See social network users cryptocurrency addresses and balances. Powered by transparent blockchains.
https://iseeyour.cash/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
501 lines
22 KiB
501 lines
22 KiB
from nitter_scraper import NitterScraper |
|
from datetime import timedelta |
|
from bs4 import BeautifulSoup |
|
import datetime as dt |
|
import hashlib |
|
import random |
|
import httpx |
|
import json |
|
import yaml |
|
import time |
|
import re |
|
import os |
|
|
|
# Database |
|
import mysql.connector |
|
from mysql.connector import Error |
|
|
|
# DATABASE SQLALCHEMY |
|
import aiohttp |
|
import asyncio |
|
from sqlalchemy.future import select |
|
from sqlalchemy.ext.asyncio import create_async_engine |
|
|
|
|
|
import sqlalchemy as sa |
|
from sqlalchemy import text |
|
|
|
f = open('data.json') |
|
data = json.load(f) |
|
if int(os.environ["ISYC_DEBUG"]) > 0: |
|
data["ranking_filter"] = [] |
|
|
|
teddit_instances = ["incogsnoo.com"]#, "teddit.kavin.rocks", "teddit.ggc-project.de", "teddit.zaggy.nl"] |
|
|
|
|
|
my_session = aiohttp.ClientSession(trust_env=True) |
|
|
|
currencies = ("btc", "eth", "nano") |
|
|
|
# Regex definitions |
|
addr_patterns = { |
|
"btc": re.compile(r"\b[bc1|13][a-zA-HJ-NP-Z0-9]{25,43}\b"), |
|
"eth": re.compile(r"\b(0x[a-fA-F0-9]{40})\b"), |
|
"nano": re.compile(r"\b(nano_[13][1-9a-z]{59})\b") |
|
} |
|
|
|
########################################## |
|
######## REDDIT RELATED FUNCTIONS ######## |
|
########################################## |
|
async def use_pushshift_api(username, comments, submissions): |
|
""" Given comments and submissions form a certain username retrieved from pushshift, it scraps all content |
|
newer than last_checked and finds any crypto addresses and updates them in the database. Returns 1 if successful. |
|
""" |
|
uid, new = await get_uid_add_user(username) |
|
lastc = await get_user_last_checked_and_update(uid) |
|
text_list = [] |
|
for f in asyncio.as_completed([x(i) for i in comments]): |
|
comment_json = await f |
|
comment_text = comment_json['body'] |
|
try: |
|
url = f"https://td.r3d.red{comment_json['permalink']}" |
|
except: |
|
_id = comment_json['link_id'].split("_")[1] |
|
subreddit = comment_json['subreddit'] |
|
url = f"https://td.r3d.red/r/{subreddit}/comments/{_id}" |
|
|
|
# if the comment has already been checked |
|
if (dt.datetime.fromtimestamp(comment_json['created_utc']) < lastc) and new == False: |
|
continue |
|
|
|
comment_obj = { |
|
"text": comment_text, |
|
"url": url |
|
} |
|
text_list.append(comment_obj) |
|
await find_all_addresses_and_add_them(text_list, uid, "reddit") |
|
return 1 |
|
|
|
async def use_teddit_api(username, user_content): |
|
''' Given user_content form a certain username retrieved from teddit, scraps all comments newer than |
|
last_checked and finds any crypto addresses and updates them in the database. Returns 1 if successful. |
|
''' |
|
uid, new = await get_uid_add_user(username) |
|
lastc = await get_user_last_checked_and_update(uid) |
|
|
|
if user_content[0]['type'] == "t1": |
|
last_comment = BeautifulSoup(user_content[0]['body_html'], "lxml").text |
|
else: |
|
last_comment = BeautifulSoup(user_content[0]['title'], "lxml").text |
|
|
|
for f in asyncio.as_completed([x(i) for i in user_content]): |
|
# t1 = Comment, t3 = Post |
|
comment_json = await f |
|
if comment_json['type'] == "t1": |
|
comment = BeautifulSoup(comment_json['body_html'], "lxml").text |
|
else: |
|
comment = BeautifulSoup(comment_json['title'], "lxml").text |
|
|
|
url = comment_json['permalink'] |
|
# if the comment has already been checked |
|
if (dt.datetime.fromtimestamp(comment_json['created_utc']) < lastc) and new == False: |
|
continue |
|
|
|
await find_addresses_in_text_and_add_them(comment, uid, "reddit", url) |
|
return 1 |
|
|
|
|
|
async def get_crypto_addresses_from_reddit(username): |
|
reddit_result = 1 |
|
teddit=random.choice(teddit_instances) |
|
source = 0 |
|
try: |
|
user_comments = json.loads(httpx.get(f"https://api.pushshift.io/reddit/search/comment/?author={username}&limit=1000").content)['data'] |
|
user_submissions = json.loads(httpx.get(f"https://api.pushshift.io/reddit/search/submission/?author={username}&limit=1000").content)['data'] |
|
source = 1 |
|
if len(user_comments) == 0 and len(user_submissions) == 0: |
|
return -1 |
|
except: |
|
user_content = json.loads(httpx.get(f"https://{teddit}/u/{username}?sort=new&t=year&api").content)['posts'] |
|
source = 2 |
|
if not user_content: |
|
return -2 |
|
|
|
if source == 1: # Source is pushshift |
|
return await use_pushshift_api(username, user_comments, user_submissions) |
|
else: # Source is Teddit |
|
return await use_teddit_api(username, user_content) |
|
|
|
########################################## |
|
######## TWITTER RELATED FUNCTIONS ####### |
|
########################################## |
|
async def get_crypto_addresses_from_twitter(username): |
|
with NitterScraper(host=data['nitter-ip'], port=8080, container_name="nitter") as nitter: |
|
try: |
|
tweets = nitter.get_tweets(username, pages=25, with_replies=True) |
|
profile = nitter.get_profile(username, not_found_ok=True) |
|
except: |
|
print("User does not exist - get_crypto_addresses_from_twitter") |
|
return -1 # User does not exist |
|
try: |
|
# if the comment has already been checked |
|
uid, new = await get_uid_add_user(username) |
|
lastc = await get_user_last_checked_and_update(uid) |
|
|
|
# Scan profile bio for addresses |
|
if profile: |
|
try: |
|
await find_addresses_in_text_and_add_them(profile.biography, uid, 'twitter', f'/{username}') |
|
except: |
|
print("Warning - No biography") |
|
else: # If profile does not exist, user does not exist too. |
|
return -1 |
|
|
|
text_list = [] |
|
for tweet in asyncio.as_completed([x(i) for i in tweets]): |
|
tweet = await tweet |
|
|
|
# Skip retweets |
|
if tweet.is_retweet is True: |
|
continue |
|
|
|
# Extract tweet data |
|
date = tweet.time |
|
text = tweet.text |
|
tweet_url = tweet.tweet_url |
|
if (date < lastc) and new == False: |
|
continue |
|
comment_obj = { |
|
"text": text, |
|
"url": tweet_url |
|
} |
|
text_list.append(comment_obj) |
|
|
|
if text_list: |
|
await find_all_addresses_and_add_them(text_list, uid, "twitter") |
|
else: |
|
return 1 # Success |
|
return 1 |
|
except: |
|
print("Error - get_crypto_addresses_from_twitter") |
|
return -1 # User does not exist |
|
|
|
########################################## |
|
####### GENERAL UTILITY FUNCTIONS ######## |
|
########################################## |
|
async def get_crypto_addresses(username): |
|
'''It tries to get content from pushshift, if it does not succeed it will fallback to Teddit. |
|
It will return -1 if the user does not exist. It will return -2 if user has no content. |
|
''' |
|
reddit_result = await get_crypto_addresses_from_reddit(username) |
|
twitter_result = await get_crypto_addresses_from_twitter(username) |
|
return reddit_result, twitter_result |
|
|
|
async def check_balance(address, currency): |
|
''' Given an address and an currency it will find its balance and return it in the biggest units. |
|
''' |
|
if currency == "btc": |
|
url = f"https://blockchain.info/q/addressbalance/{address}" |
|
try: |
|
r = httpx.get(url) |
|
sats = r.text |
|
return float(sats)/100000000.0 |
|
except: |
|
return 0.0 |
|
|
|
if currency == "eth": |
|
url = f"https://www.etherchain.org/account/{address}" |
|
try: |
|
r = httpx.get(url, timeout=5.0) |
|
soup = BeautifulSoup(r.content, features="html.parser") |
|
eth_balance = soup.find_all('span', class_="badge-success")[0].text |
|
return float(eth_balance[:-3]) |
|
except: |
|
return 0.0 |
|
|
|
if currency == "nano": |
|
url = f"https://api.nanocrawler.cc/v2/accounts/{address}" |
|
try: |
|
r = httpx.get(url) |
|
data = json.loads(r.content) |
|
return float(data['account']['balance'][0:5])/1000000.0 |
|
except: |
|
return 0.0 |
|
|
|
async def get_user_data(username): |
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
result = 0 |
|
async with engine.begin() as conn: |
|
uid = await conn.execute(text("SELECT user_id FROM user WHERE username=:x"), |
|
[{"x": username}]) |
|
if not uid.first(): |
|
result_reddit, result_twitter = await get_crypto_addresses(username) |
|
|
|
if result_reddit == -1 and result_twitter == -1: # -1 = User does not exist |
|
return {currency: -1 for currency in currencies} |
|
if result_reddit == -2 and result_twitter == -2: # -2 = User has no content |
|
return {currency: -2 for currency in currencies} |
|
#await conn.commit() |
|
|
|
async with engine.begin() as conn: |
|
uid = await conn.execute(text("SELECT user_id FROM user WHERE username=:x"), |
|
[{"x": username}]) |
|
uid = uid.first() |
|
uid = uid[0] |
|
|
|
last_checked = await conn.execute(text("SELECT last_checked FROM user WHERE user_id=:x"), |
|
[{"x": uid}]) |
|
last_checked = last_checked.first()[0] |
|
|
|
if last_checked < dt.datetime.now()-timedelta(hours=6): |
|
result_reddit, result_twitter = await get_crypto_addresses(username) |
|
lastc = await get_user_last_checked_and_update(uid) |
|
result = result_reddit+result_twitter |
|
if result < 0: |
|
return {currency: -1 for currency in currencies} |
|
|
|
name = await conn.execute(text("SELECT username FROM user WHERE user_id=:x"), [{"x": uid}]) |
|
|
|
addresses = {} |
|
for currency in currencies: |
|
addr_data = await conn.execute(text(f"SELECT * FROM address WHERE user=:uid AND currency='{currency}'"), [{"uid": uid}]) |
|
addr_data = addr_data.all() |
|
# check if there are any blacklisted addresses in addr_data |
|
for row in addr_data[:]: # copy ([:]) because of remove() |
|
if currency in data["blacklist"] and row[0] in data["blacklist"][currency]: |
|
await conn.execute(text(f"DELETE FROM address WHERE currency=:currency AND address_id=:addr"), [{"currency": currency, "addr": row[0]}]) |
|
addr_data.remove(row) |
|
addresses[currency] = addr_data |
|
|
|
return addresses |
|
|
|
async def get_price(): |
|
''' It gets the price from the database for each supported crypto. |
|
Also returns the date from the last check of the price. |
|
''' |
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
async with engine.begin() as conn: |
|
prices = {currency: (await conn.execute(text(f"SELECT * FROM currency WHERE id='{currency}'"))).first() for currency in currencies} |
|
# last check is same for all currencies |
|
last_check = prices["btc"][2] |
|
# filter by price |
|
prices = {currency: data[1] for currency, data in prices.items()} |
|
return prices, last_check |
|
|
|
|
|
async def update_price(): |
|
''' Updates the price for each crypto on the database using the Coingecko API. |
|
''' |
|
# currency symbols with coingecko ids, currency_id is not guarrantied to be the same as currency_name for all currencies |
|
currencies_with_ids = ( |
|
("btc", "bitcoin"), |
|
("eth", "ethereum"), |
|
("nano", "nano") |
|
) |
|
urls = {currency: f"https://api.coingecko.com/api/v3/simple/price?ids={currency_id}&vs_currencies=usd" for currency, currency_id in currencies_with_ids} |
|
response = {currency: httpx.get(urls[currency]).content for currency in currencies} |
|
prices = {currency: str(json.loads(response[currency])[currency_id]['usd']) for currency, currency_id in currencies_with_ids} |
|
|
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
async with engine.begin() as conn: |
|
date = str(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) |
|
for currency in currencies: |
|
await conn.execute(text(f"UPDATE currency SET price = '{prices[currency]}' WHERE id='{currency}'")) |
|
await conn.execute(text(f"UPDATE currency SET last_check = '{date}' WHERE id='{currency}'")) |
|
await conn.commit() |
|
|
|
async def update_db_address(address, currency, balance, source, url, uid): |
|
''' Given an address, a currency id, a balance and a user_id this function updates the database; if the address does |
|
not exist, it creates an entry for it. If it does exist it updates it with the new balance. |
|
''' |
|
date = str(dt.datetime.today().strftime("%Y-%m-%d")) |
|
# If it is a new address, we add it to the Database |
|
|
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
async with engine.begin() as conn: |
|
exists = await conn.execute(text("SELECT * FROM address WHERE address_id=:aid AND user=:uid"), |
|
[{"aid": address, "uid": uid}]) |
|
|
|
if exists.first() is None: # Address is not in he database |
|
res = await conn.execute(text("""INSERT INTO address |
|
(address_id, currency, balance, last_interacted, source, url, user) |
|
VALUES (:aid, :currency, :balance, :lin, :src, :url, :uid)"""), |
|
[{"aid": address, "currency": currency, "balance": balance, |
|
"lin": date, "src":source, "url":url, "uid": uid}]) |
|
await conn.commit() |
|
return res |
|
else: # If it already exists, we update its data. |
|
res = await conn.execute(text("""UPDATE address SET balance=:balance WHERE address_id=:aid AND user=:uid"""), |
|
[{"balance": balance, "aid": address, "uid": uid}]) |
|
await conn.commit() |
|
return res |
|
|
|
async def x(i): |
|
await asyncio.sleep(1) |
|
return i |
|
|
|
async def get_user_last_checked_and_update(uid): |
|
''' Given a user_id it returns the last_checked parameter from the user and updates it |
|
to the datetime.utcnow() date. |
|
''' |
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
lastc = 0 |
|
async with engine.begin() as conn: |
|
await conn.execute(text("UPDATE user SET last_checked=:date WHERE user_id=:uid"), |
|
[{"date": dt.datetime.utcnow(), "uid": uid}]) |
|
lastc = await conn.execute(text("SELECT last_checked FROM user WHERE user_id=:x"), |
|
[{"x": uid}]) |
|
lastc = lastc.first()[0] |
|
await conn.commit() |
|
return lastc |
|
|
|
async def get_user_last_checked_social_and_update(uid, social="r"): |
|
''' Given a user_id it returns the last_checked parameter from the user and updates it |
|
to the datetime.utcnow() date. |
|
''' |
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
lastc = 0 |
|
async with engine.begin() as conn: |
|
if social == "r": |
|
await conn.execute(text("UPDATE user SET last_checked_r=:date WHERE user_id=:uid"), |
|
[{"date": dt.datetime.utcnow(), "uid": uid}]) |
|
lastc = await conn.execute(text("SELECT last_checked_r FROM user WHERE user_id=:x"), |
|
[{"x": uid}]) |
|
if social == "t": |
|
await conn.execute(text("UPDATE user SET last_checked_t=:date WHERE user_id=:uid"), |
|
[{"date": dt.datetime.utcnow(), "uid": uid}]) |
|
lastc = await conn.execute(text("SELECT last_checked_t FROM user WHERE user_id=:x"), |
|
[{"x": uid}]) |
|
lastc = lastc.first()[0] |
|
await conn.commit() |
|
return lastc |
|
|
|
async def get_uid_add_user(username): |
|
""" Given a username string, if it does not exist it creates an entry to the database for it and returns its new user_id |
|
and a True boolean that indicates that this user has just been created. |
|
Otherwise, it only returns the user_id and a False boolean indicating that this user was already in the database. |
|
""" |
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
async with engine.begin() as conn: |
|
res = await conn.execute(text("SELECT user_id FROM user WHERE username=:x"), |
|
[{"x": username}]) |
|
uid = res.first() |
|
if not uid: |
|
date = str(dt.datetime.utcnow()) |
|
await conn.execute(text("INSERT INTO user VALUES (0, :username, :date, :date, :date)"), |
|
[{"username": username, "date": date}]) |
|
|
|
res = await conn.execute(text("SELECT user_id FROM user WHERE username=:x"), |
|
[{"x": username}]) |
|
uid = res.first() |
|
await conn.commit() |
|
return uid[0], True |
|
return uid[0], False |
|
|
|
async def find_all_addresses_and_add_them(text_list, uid, source): |
|
'''Given a list of strings and urls pairs (text, url), it finds btc/eth addresses on these texts |
|
and adds them to the database with their balance. It makes a single request for all addresses of a currency.''' |
|
|
|
addr_lists = {currency: list() for currency in currencies} |
|
|
|
url_address = {} |
|
|
|
# Find addresses in text |
|
for text in text_list: |
|
for currency in currencies: |
|
if addr_patterns[currency].findall(text['text']): |
|
address = addr_patterns[currency].findall(text['text'])[0] |
|
if currency not in data['blacklist'] or address not in data['blacklist'][currency]: |
|
addr_lists[currency].append(address) |
|
url_address.update({address:text['url']}) |
|
|
|
# Check all Ethereum found addresses |
|
eth_text_list = ",".join(addr_lists["eth"]) |
|
url = f"https://api.etherscan.io/api?module=account&action=balancemulti&address={eth_text_list}&tag=latest&apikey={data['etherscan-api-key']}" |
|
eth_request = httpx.get(url) |
|
eth_data = json.loads(eth_request.content) |
|
if eth_data['status'] == '1': |
|
for account in eth_data["result"]: |
|
balance = round((float(account['balance'])/1000000000000000000.0), 5) |
|
await update_db_address(account['account'], 'eth', balance, source, url_address[account['account']], uid) |
|
else: |
|
print(eth_data['result']) |
|
|
|
# Check all Bitcoin found addresses |
|
btc_text_list = "|".join(addr_lists["btc"]) |
|
url = f"""https://blockchain.info/balance?active={btc_text_list}""" |
|
btc_request = httpx.get(url) |
|
btc_data = json.loads(btc_request.content) |
|
if 'error' not in btc_data: |
|
for account in addr_lists["btc"]: |
|
balance = round((float(btc_data[account]['final_balance'])/100000000.0), 5) |
|
await update_db_address(account, 'btc', balance, source, url_address[account], uid) |
|
else: |
|
print(btc_data['message']) |
|
|
|
# check all found Nano addresses |
|
for address in addr_lists["nano"]: |
|
url = f"https://api.nanocrawler.cc/v2/accounts/{address}" |
|
r = httpx.get(url) |
|
nano_data = json.loads(r.content) |
|
if not 'error' in nano_data: |
|
balance = round((float(nano_data['account']['balance'])/1000000000000000000000000000000.0), 5) |
|
else: |
|
balance = 0.0 |
|
await update_db_address(address, 'nano', balance, source, text['url'], uid) |
|
|
|
|
|
|
|
async def find_addresses_in_text_and_add_them(text, uid, source, url): |
|
# We find addresses using regex stored in addr_patterns |
|
for currency in currencies: |
|
if addr_patterns[currency].findall(text): |
|
address = addr_patterns[currency].findall(text)[0] |
|
if currency not in data['blacklist'] or address not in data['blacklist'][currency]: |
|
balance = await check_balance(address, currency) |
|
await update_db_address(address, currency, balance, source, url, uid) |
|
|
|
async def get_rankings(max_rank): |
|
prices, dt = await get_price() |
|
|
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
async with engine.begin() as conn: |
|
res = await conn.execute( |
|
text( |
|
"SELECT U.username, SUM( " |
|
"CASE " + |
|
"".join(f"WHEN A.currency = '{currency}' THEN (A.balance*:{currency}_price) " for currency in currencies) + |
|
"ELSE 0 " |
|
"END " |
|
") AS total_usd " |
|
"FROM user AS U " |
|
"JOIN address AS A " |
|
"ON U.user_id = A.user " |
|
"WHERE U.username NOT IN :ranking_filter " |
|
"GROUP BY U.username " |
|
"ORDER BY total_usd DESC " |
|
"LIMIT :max_rank " |
|
";" |
|
), |
|
[{ |
|
**{f"{currency}_price": prices[currency] for currency in currencies}, |
|
# empty string to prevent empty list |
|
"ranking_filter": ["", *data["ranking_filter"]], |
|
"max_rank": max_rank |
|
}] |
|
) |
|
|
|
ranking = [] |
|
for i, r in enumerate(res, start=1): |
|
ranking.append([i, r[0], f"${r[1]:,.2f}"]) |
|
|
|
return ranking |
|
|
|
async def get_global_stats(): |
|
engine = create_async_engine(f"mysql+aiomysql://root:{data['db-password']}@{data['db-name']}:3307/iseeyourcash?charset=utf8mb4") |
|
async with engine.begin() as conn: |
|
tot_addr_count = await conn.execute(text("SELECT COUNT(*) FROM address")) |
|
tot_addr_count = tot_addr_count.first()[0] |
|
tot_user_count = await conn.execute(text("SELECT COUNT(*) FROM user")) |
|
tot_user_count = tot_user_count.first()[0] |
|
return tot_addr_count, tot_user_count
|
|
|