See social network users cryptocurrency addresses and balances. Powered by transparent blockchains. https://iseeyour.cash/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

501 lines
22 KiB

from nitter_scraper import NitterScraper
from datetime import timedelta
from bs4 import BeautifulSoup
import datetime as dt
import hashlib
import random
import httpx
import json
import yaml
import time
import re
import os
# Database
import mysql.connector
from mysql.connector import Error
# DATABASE SQLALCHEMY
import aiohttp
import asyncio
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import create_async_engine
import sqlalchemy as sa
from sqlalchemy import text
# Load the site configuration once at import time. The context manager closes
# the file handle as soon as the JSON has been parsed.
with open('data.json') as f:
    data = json.load(f)
# A positive ISYC_DEBUG clears the ranking filter. An unset variable is treated
# as 0 so that importing the module does not fail with KeyError.
if int(os.environ.get("ISYC_DEBUG", "0")) > 0:
    data["ranking_filter"] = []
teddit_instances = ["incogsnoo.com"]#, "teddit.kavin.rocks", "teddit.ggc-project.de", "teddit.zaggy.nl"]
# NOTE(review): this session is created at import time, outside any running
# event loop, and is not referenced in the visible part of the file - confirm
# it is still needed.
my_session = aiohttp.ClientSession(trust_env=True)
# Currency ids supported throughout the module (regexes, balances, prices).
currencies = ("btc", "eth", "nano")
# Regex definitions
# Each pattern yields the full address from findall(): the eth/nano patterns
# wrap the whole address in one capture group, and the btc pattern uses a
# non-capturing group so findall() still returns the complete match.
addr_patterns = {
    # The prefix is an alternation ("bc1", "1" or "3"). The previous character
    # class "[bc1|13]" matched any single one of b, c, 1, | or 3 and therefore
    # accepted ordinary long words starting with "b"/"c" and text after a "|".
    "btc": re.compile(r"\b(?:bc1|[13])[a-zA-HJ-NP-Z0-9]{25,43}\b"),
    "eth": re.compile(r"\b(0x[a-fA-F0-9]{40})\b"),
    "nano": re.compile(r"\b(nano_[13][1-9a-z]{59})\b")
}
##########################################
######## REDDIT RELATED FUNCTIONS ########
##########################################
async def use_pushshift_api(username, comments, submissions):
    """Scan a user's pushshift comments for crypto addresses and store them.

    Comments created before the ``last_checked`` value returned for the user
    are skipped, unless the user row was created by this very call.

    Args:
        username: Reddit username the content belongs to.
        comments: Comment objects (dicts) as returned by pushshift.
        submissions: Submission objects as returned by pushshift.
            NOTE(review): accepted but not scanned; only ``comments`` are
            read here.

    Returns:
        1 once the comments have been processed.
    """
    uid, new = await get_uid_add_user(username)
    lastc = await get_user_last_checked_and_update(uid)
    text_list = []
    for comment_json in comments:
        # last_checked is written with utcnow(), so the comment timestamp has
        # to be converted to naive UTC (not local time) before comparing.
        created = dt.datetime.fromtimestamp(
            comment_json['created_utc'], tz=dt.timezone.utc
        ).replace(tzinfo=None)
        # if the comment has already been checked
        if not new and created < lastc:
            continue
        try:
            url = f"https://td.r3d.red{comment_json['permalink']}"
        except KeyError:
            # No permalink in the payload: rebuild the thread URL from the
            # link id ("<kind>_<id>") and the subreddit name.
            _id = comment_json['link_id'].split("_")[1]
            subreddit = comment_json['subreddit']
            url = f"https://td.r3d.red/r/{subreddit}/comments/{_id}"
        text_list.append({
            "text": comment_json['body'],
            "url": url
        })
    # Skip the balance look-ups entirely when there is nothing new to scan.
    if text_list:
        await find_all_addresses_and_add_them(text_list, uid, "reddit")
    return 1
async def use_teddit_api(username, user_content):
    """Scan a user's teddit posts and comments for crypto addresses.

    Items created before the ``last_checked`` value returned for the user are
    skipped, unless the user row was created by this very call.

    Args:
        username: Reddit username the content belongs to.
        user_content: List of item dicts from the teddit user API. Each item
            is read for ``type``, ``created_utc``, ``permalink`` and either
            ``body_html`` (type "t1") or ``title`` (any other type).

    Returns:
        1 once every item has been processed.
    """
    uid, new = await get_uid_add_user(username)
    lastc = await get_user_last_checked_and_update(uid)
    for item in user_content:
        # last_checked is written with utcnow(), so compare in naive UTC.
        created = dt.datetime.fromtimestamp(
            item['created_utc'], tz=dt.timezone.utc
        ).replace(tzinfo=None)
        # if the comment has already been checked
        if not new and created < lastc:
            continue
        # t1 = Comment, t3 = Post
        markup = item['body_html'] if item['type'] == "t1" else item['title']
        comment = BeautifulSoup(markup, "lxml").text
        await find_addresses_in_text_and_add_them(comment, uid, "reddit", item['permalink'])
    return 1
async def get_crypto_addresses_from_reddit(username):
    """Collect a Reddit user's content and store any crypto addresses found.

    pushshift is tried first. If that request or its decoding fails for any
    reason, a randomly chosen teddit instance is used instead.

    NOTE(review): ``httpx.get`` is a synchronous call, so the event loop is
    blocked while each request is in flight.

    Args:
        username: Reddit username to look up.

    Returns:
        -1 if pushshift answered but has neither comments nor submissions,
        -2 if the teddit fallback returned no posts, otherwise the result of
        the scanning helper (1 on success).
    """
    try:
        user_comments = json.loads(httpx.get(f"https://api.pushshift.io/reddit/search/comment/?author={username}&limit=1000").content)['data']
        user_submissions = json.loads(httpx.get(f"https://api.pushshift.io/reddit/search/submission/?author={username}&limit=1000").content)['data']
    except Exception:
        # ``Exception`` rather than a bare except, so task cancellation and
        # KeyboardInterrupt are not mistaken for a pushshift outage.
        user_comments = user_submissions = None

    if user_comments is not None and user_submissions is not None:  # Source is pushshift
        if not user_comments and not user_submissions:
            return -1
        return await use_pushshift_api(username, user_comments, user_submissions)

    # Source is Teddit
    teddit = random.choice(teddit_instances)
    user_content = json.loads(httpx.get(f"https://{teddit}/u/{username}?sort=new&t=year&api").content)['posts']
    if not user_content:
        return -2
    return await use_teddit_api(username, user_content)
##########################################
######## TWITTER RELATED FUNCTIONS #######
##########################################
async def get_crypto_addresses_from_twitter(username):
    """Scan a Twitter profile and its tweets (via Nitter) for crypto addresses.

    The profile biography is scanned first, then every tweet that is not a
    retweet. Tweets older than the ``last_checked`` value returned for the
    user are skipped unless the user row was created by this very call.

    Args:
        username: Twitter handle to look up.

    Returns:
        1 on success (including "nothing new to scan"), -1 if the profile or
        tweets could not be fetched, the profile is missing, or any other
        error occurred while processing.
    """
    with NitterScraper(host=data['nitter-ip'], port=8080, container_name="nitter") as nitter:
        try:
            tweets = nitter.get_tweets(username, pages=25, with_replies=True)
            profile = nitter.get_profile(username, not_found_ok=True)
        except Exception:
            print("User does not exist - get_crypto_addresses_from_twitter")
            return -1  # User does not exist

        # NOTE(review): the processing below stays inside the scraper context
        # in case ``get_tweets`` yields lazily - confirm against nitter_scraper.
        try:
            # The user row is looked up (or created) before the profile check.
            uid, new = await get_uid_add_user(username)
            lastc = await get_user_last_checked_and_update(uid)

            if not profile:  # If profile does not exist, user does not exist too.
                return -1

            # Scan profile bio for addresses
            try:
                await find_addresses_in_text_and_add_them(profile.biography, uid, 'twitter', f'/{username}')
            except Exception:
                print("Warning - No biography")

            text_list = []
            for tweet in tweets:
                # Skip retweets
                if tweet.is_retweet is True:
                    continue
                # Skip tweets that were already checked on a previous run.
                if not new and tweet.time < lastc:
                    continue
                text_list.append({
                    "text": tweet.text,
                    "url": tweet.tweet_url
                })

            if text_list:
                await find_all_addresses_and_add_them(text_list, uid, "twitter")
            return 1  # Success
        except Exception:
            # ``Exception`` rather than a bare except, so task cancellation
            # is not reported as a missing user.
            print("Error - get_crypto_addresses_from_twitter")
            return -1  # User does not exist
##########################################
####### GENERAL UTILITY FUNCTIONS ########
##########################################
async def get_crypto_addresses(username):
    """Run the Reddit scan and then the Twitter scan for ``username``.

    Returns a ``(reddit_result, twitter_result)`` tuple holding whatever each
    scan returned: 1 on success, or a negative code (-1 / -2) when the user
    or their content could not be found.
    """
    return (
        await get_crypto_addresses_from_reddit(username),
        await get_crypto_addresses_from_twitter(username),
    )
async def check_balance(address, currency):
    """Return the balance of ``address`` in whole coins.

    Args:
        address: Address string to look up.
        currency: One of "btc", "eth" or "nano".

    Returns:
        The balance as a float in the currency's main unit. 0.0 is returned
        when the lookup or parsing fails, and for an unsupported currency.
    """
    if currency == "btc":
        url = f"https://blockchain.info/q/addressbalance/{address}"
        try:
            r = httpx.get(url)
            # The endpoint answers with the balance in satoshi as plain text.
            return float(r.text) / 100000000.0
        except Exception:
            return 0.0
    if currency == "eth":
        url = f"https://www.etherchain.org/account/{address}"
        try:
            r = httpx.get(url, timeout=5.0)
            soup = BeautifulSoup(r.content, features="html.parser")
            eth_balance = soup.find_all('span', class_="badge-success")[0].text
            # Drop the last three characters of the badge text before parsing
            # (presumably a unit suffix - TODO confirm against the page).
            return float(eth_balance[:-3])
        except Exception:
            return 0.0
    if currency == "nano":
        url = f"https://api.nanocrawler.cc/v2/accounts/{address}"
        try:
            r = httpx.get(url)
            account_info = json.loads(r.content)
            # Use the same conversion as find_all_addresses_and_add_them
            # (whole balance string divided by 1e30). Slicing the first five
            # characters gave values unrelated to the real balance.
            return float(account_info['account']['balance']) / 1e30
        except Exception:
            return 0.0
    # Unsupported currency: report an empty balance, like every failure path.
    return 0.0
async def get_user_data(username):
    """Return the stored addresses of ``username``, scraping first if needed.

    An unknown user is scraped on the spot. A known user is re-scraped when
    their ``last_checked`` timestamp is more than six hours old. Blacklisted
    addresses found in the database are deleted and left out of the result.

    Args:
        username: Social-network username to look up.

    Returns:
        A dict mapping each currency id to a list of ``address`` table rows.
        When scraping fails, each currency maps to -1 (user not found) or
        -2 (user has no content) instead.
    """
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    async with engine.begin() as conn:
        known = await conn.execute(text("SELECT user_id FROM user WHERE username=:x"),
                                   [{"x": username}])
        if not known.first():
            result_reddit, result_twitter = await get_crypto_addresses(username)
            if result_reddit == -1 and result_twitter == -1:  # -1 = User does not exist
                return {currency: -1 for currency in currencies}
            if result_reddit == -2 and result_twitter == -2:  # -2 = User has no content
                return {currency: -2 for currency in currencies}

    async with engine.begin() as conn:
        row = (await conn.execute(text("SELECT user_id FROM user WHERE username=:x"),
                                  [{"x": username}])).first()
        if row is None:
            # Mixed failure codes (e.g. -2 from Reddit and -1 from Twitter)
            # can leave the user row uncreated; report "not found" rather
            # than crashing on ``None[0]``.
            return {currency: -1 for currency in currencies}
        uid = row[0]

        last_checked = await conn.execute(text("SELECT last_checked FROM user WHERE user_id=:x"),
                                          [{"x": uid}])
        last_checked = last_checked.first()[0]
        # last_checked is written with utcnow(), so the freshness check has to
        # use UTC as well rather than the server's local time.
        if last_checked < dt.datetime.utcnow() - timedelta(hours=6):
            result_reddit, result_twitter = await get_crypto_addresses(username)
            await get_user_last_checked_and_update(uid)
            if result_reddit + result_twitter < 0:
                return {currency: -1 for currency in currencies}

        addresses = {}
        for currency in currencies:
            rows = await conn.execute(text("SELECT * FROM address WHERE user=:uid AND currency=:currency"),
                                      [{"uid": uid, "currency": currency}])
            kept = []
            for addr_row in rows.all():
                # Blacklisted addresses are purged from the table and dropped
                # from the result; row[0] is compared against the blacklist.
                if currency in data["blacklist"] and addr_row[0] in data["blacklist"][currency]:
                    await conn.execute(text("DELETE FROM address WHERE currency=:currency AND address_id=:addr"),
                                       [{"currency": currency, "addr": addr_row[0]}])
                else:
                    kept.append(addr_row)
            addresses[currency] = kept
    return addresses
async def get_price():
    """Read the stored price of every supported currency.

    Returns a ``(prices, last_check)`` tuple: ``prices`` maps each currency id
    to column 1 of its ``currency`` row, and ``last_check`` is column 2 of the
    "btc" row (the last check is the same for all currencies).
    """
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    rows = {}
    async with engine.begin() as conn:
        for currency in currencies:
            result = await conn.execute(text(f"SELECT * FROM currency WHERE id='{currency}'"))
            rows[currency] = result.first()
    # last check is same for all currencies
    last_check = rows["btc"][2]
    # keep only the price column
    prices = {}
    for currency, row in rows.items():
        prices[currency] = row[1]
    return prices, last_check
async def update_price():
    """Refresh the USD price of every supported currency from CoinGecko.

    One request is made per currency. The price and the time of this check
    are then written to the ``currency`` table.
    """
    # CoinGecko ids are not guaranteed to equal our currency symbols.
    coingecko_ids = {
        "btc": "bitcoin",
        "eth": "ethereum",
        "nano": "nano",
    }
    prices = {}
    for currency in currencies:
        currency_id = coingecko_ids[currency]
        url = f"https://api.coingecko.com/api/v3/simple/price?ids={currency_id}&vs_currencies=usd"
        payload = json.loads(httpx.get(url).content)
        prices[currency] = str(payload[currency_id]['usd'])

    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    async with engine.begin() as conn:
        date = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        for currency in currencies:
            # Bound parameters instead of f-string interpolation, and a single
            # statement per currency for both columns.
            await conn.execute(
                text("UPDATE currency SET price=:price, last_check=:date WHERE id=:id"),
                [{"price": prices[currency], "date": date, "id": currency}])
        await conn.commit()
async def update_db_address(address, currency, balance, source, url, uid):
    """Insert or refresh one address row belonging to user ``uid``.

    When no row exists for this (address, user) pair a new one is inserted
    with today's date as ``last_interacted``. Otherwise only the balance of
    the existing row is updated. Returns the result of the write statement.
    """
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    today = str(dt.datetime.today().strftime("%Y-%m-%d"))
    async with engine.begin() as conn:
        lookup = await conn.execute(text("SELECT * FROM address WHERE address_id=:aid AND user=:uid"),
                                    [{"aid": address, "uid": uid}])
        if lookup.first() is not None:
            # Known address: only the balance changes.
            outcome = await conn.execute(text("""UPDATE address SET balance=:balance WHERE address_id=:aid AND user=:uid"""),
                                         [{"balance": balance, "aid": address, "uid": uid}])
        else:
            # New address: store it together with where it was found.
            new_row = {"aid": address, "currency": currency, "balance": balance,
                       "lin": today, "src": source, "url": url, "uid": uid}
            outcome = await conn.execute(text("""INSERT INTO address
(address_id, currency, balance, last_interacted, source, url, user)
VALUES (:aid, :currency, :balance, :lin, :src, :url, :uid)"""),
                                         [new_row])
        await conn.commit()
        return outcome
async def x(i):
    """Wait one second, then hand back ``i`` unchanged."""
    return await asyncio.sleep(1, result=i)
async def get_user_last_checked_and_update(uid):
    """Return the user's previous ``last_checked`` and set it to now (UTC).

    The stored value is read BEFORE it is overwritten. Reading it afterwards
    would always return the timestamp just written, which makes callers treat
    all existing content as already checked.

    Args:
        uid: ``user_id`` of the row to read and update.

    Returns:
        The ``last_checked`` value stored before this call.
    """
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    async with engine.begin() as conn:
        previous = await conn.execute(text("SELECT last_checked FROM user WHERE user_id=:x"),
                                      [{"x": uid}])
        lastc = previous.first()[0]
        await conn.execute(text("UPDATE user SET last_checked=:date WHERE user_id=:uid"),
                           [{"date": dt.datetime.utcnow(), "uid": uid}])
        await conn.commit()
    return lastc
async def get_user_last_checked_social_and_update(uid, social="r"):
    """Return the user's previous per-network check time and set it to now.

    The stored value is read BEFORE it is overwritten, so the caller receives
    the timestamp of the previous check rather than the one just written.

    Args:
        uid: ``user_id`` of the row to read and update.
        social: "r" selects the ``last_checked_r`` column, "t" selects
            ``last_checked_t``.

    Returns:
        The value of the selected column before this call.

    Raises:
        ValueError: if ``social`` is neither "r" nor "t".
    """
    columns = {"r": "last_checked_r", "t": "last_checked_t"}
    if social not in columns:
        raise ValueError(f"unknown social network code: {social!r}")
    # The column name comes from the fixed mapping above, never from the
    # caller, so it is safe to place it in the statement text.
    column = columns[social]

    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    async with engine.begin() as conn:
        previous = await conn.execute(text(f"SELECT {column} FROM user WHERE user_id=:x"),
                                      [{"x": uid}])
        lastc = previous.first()[0]
        await conn.execute(text(f"UPDATE user SET {column}=:date WHERE user_id=:uid"),
                           [{"date": dt.datetime.utcnow(), "uid": uid}])
        await conn.commit()
    return lastc
async def get_uid_add_user(username):
    """Look up the ``user_id`` for ``username``, creating the row if needed.

    Returns a ``(user_id, created)`` tuple. ``created`` is True when the row
    was inserted by this call and False when the user was already stored.
    """
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    find_user = text("SELECT user_id FROM user WHERE username=:x")
    async with engine.begin() as conn:
        row = (await conn.execute(find_user, [{"x": username}])).first()
        if row:
            return row[0], False

        # Unknown user: insert with the same timestamp for every date column,
        # then read back the id the database assigned.
        now = str(dt.datetime.utcnow())
        await conn.execute(text("INSERT INTO user VALUES (0, :username, :date, :date, :date)"),
                           [{"username": username, "date": now}])
        row = (await conn.execute(find_user, [{"x": username}])).first()
        await conn.commit()
        return row[0], True
async def find_all_addresses_and_add_them(text_list, uid, source):
    """Find crypto addresses in many texts and store them with their balance.

    Only the first address of each currency in each text is used. Ethereum
    and Bitcoin balances are fetched with one request per currency; Nano
    balances are fetched with one request per address. No request is made
    for a currency without any address.

    Args:
        text_list: List of ``{"text": ..., "url": ...}`` dicts to scan.
        uid: ``user_id`` the addresses belong to.
        source: Name of the network the texts came from (e.g. "reddit").
    """
    addr_lists = {currency: [] for currency in currencies}
    url_address = {}
    # Find addresses in text
    for entry in text_list:
        for currency in currencies:
            match = addr_patterns[currency].search(entry['text'])
            if match is None:
                continue
            address = match.group(0)
            if currency in data['blacklist'] and address in data['blacklist'][currency]:
                continue
            # Each address is queried and stored once; the URL of the last
            # text mentioning it is kept.
            if address not in addr_lists[currency]:
                addr_lists[currency].append(address)
            url_address[address] = entry['url']

    # Check all Ethereum found addresses
    if addr_lists["eth"]:
        # Look URLs up case-insensitively in case the API echoes addresses in
        # a different case than they were written in the text.
        eth_urls = {addr.lower(): url_address[addr] for addr in addr_lists["eth"]}
        eth_text_list = ",".join(addr_lists["eth"])
        url = f"https://api.etherscan.io/api?module=account&action=balancemulti&address={eth_text_list}&tag=latest&apikey={data['etherscan-api-key']}"
        eth_data = json.loads(httpx.get(url).content)
        if eth_data['status'] == '1':
            for account in eth_data["result"]:
                balance = round(float(account['balance']) / 1e18, 5)
                await update_db_address(account['account'], 'eth', balance, source,
                                        eth_urls[account['account'].lower()], uid)
        else:
            print(eth_data['result'])

    # Check all Bitcoin found addresses
    if addr_lists["btc"]:
        btc_text_list = "|".join(addr_lists["btc"])
        url = f"""https://blockchain.info/balance?active={btc_text_list}"""
        btc_data = json.loads(httpx.get(url).content)
        if 'error' not in btc_data:
            for account in addr_lists["btc"]:
                balance = round(float(btc_data[account]['final_balance']) / 1e8, 5)
                await update_db_address(account, 'btc', balance, source, url_address[account], uid)
        else:
            print(btc_data['message'])

    # check all found Nano addresses
    for address in addr_lists["nano"]:
        url = f"https://api.nanocrawler.cc/v2/accounts/{address}"
        nano_data = json.loads(httpx.get(url).content)
        if 'error' not in nano_data:
            balance = round(float(nano_data['account']['balance']) / 1e30, 5)
        else:
            balance = 0.0
        # Store the URL where THIS address was found, not the URL of whichever
        # text happened to be scanned last.
        await update_db_address(address, 'nano', balance, source, url_address[address], uid)
async def find_addresses_in_text_and_add_them(text, uid, source, url):
    """Scan one text for the first address of each currency and store it.

    For every supported currency the first regex match in ``text`` is taken.
    Unless that address is blacklisted, its balance is looked up and the
    address is written to the database for user ``uid``.
    """
    for currency in currencies:
        # We find addresses using regex stored in addr_patterns
        found = addr_patterns[currency].findall(text)
        if not found:
            continue
        address = found[0]
        blacklisted = currency in data['blacklist'] and address in data['blacklist'][currency]
        if blacklisted:
            continue
        balance = await check_balance(address, currency)
        await update_db_address(address, currency, balance, source, url, uid)
async def get_rankings(max_rank):
    """Return the top ``max_rank`` users ordered by total USD holdings.

    Each user's balances are converted to USD with the stored prices and
    summed. Usernames listed in the configured ranking filter are excluded.
    The result is a list of ``[position, username, "$amount"]`` entries with
    positions starting at 1.
    """
    prices, _ = await get_price()
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")

    # One WHEN branch per currency, each multiplying by its bound price.
    when_clauses = "".join(
        f"WHEN A.currency = '{currency}' THEN (A.balance*:{currency}_price) "
        for currency in currencies
    )
    query = (
        "SELECT U.username, SUM( "
        "CASE "
        + when_clauses +
        "ELSE 0 "
        "END "
        ") AS total_usd "
        "FROM user AS U "
        "JOIN address AS A "
        "ON U.user_id = A.user "
        "WHERE U.username NOT IN :ranking_filter "
        "GROUP BY U.username "
        "ORDER BY total_usd DESC "
        "LIMIT :max_rank "
        ";"
    )

    params = {}
    for currency in currencies:
        params[f"{currency}_price"] = prices[currency]
    # empty string to prevent empty list
    params["ranking_filter"] = ["", *data["ranking_filter"]]
    params["max_rank"] = max_rank

    async with engine.begin() as conn:
        res = await conn.execute(text(query), [params])
        ranking = [
            [position, row[0], f"${row[1]:,.2f}"]
            for position, row in enumerate(res, start=1)
        ]
    return ranking
async def get_global_stats():
    """Return ``(address_count, user_count)``: the row totals of both tables."""
    engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4")
    async with engine.begin() as conn:
        address_row = (await conn.execute(text("SELECT COUNT(*) FROM address"))).first()
        user_row = (await conn.execute(text("SELECT COUNT(*) FROM user"))).first()
    return address_row[0], user_row[0]