mysql2td_codeberg/mysql2td_thread.py

135 lines
4.7 KiB
Python

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import pymysql
import threading
from taosrest import connect, TaosRestConnection, TaosRestCursor
import time
con_mysql = pymysql.connect(host='127.0.0.1',
database='location',
user='root',
password='password',
charset='utf8')
con_taosrest: TaosRestConnection = connect(url="http://127.0.0.1:6041",
user="root",
password="password",
timeout=300)
step = 10000
taoscnt = 0
start_time = time.time()
lock = threading.RLock()
"""
DROP TABLE IF EXISTS location.location;
CREATE STABLE location.location ( \
locationtime TIMESTAMP, \
lat DOUBLE, \
lng DOUBLE, \
speed DOUBLE) \
TAGS (loginname NCHAR(50), \
username NCHAR(50), \
deviceimei NCHAR(50), \
devicemodel NCHAR(100), \
devicetype BOOL);
"""
class scheduler():
def __init__(self):
self.start = 0
self.sql_text = 'SELECT Id FROM location.location ORDER BY Id DESC LIMIT 1;'
cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
lock.acquire()
cursor_mysql.execute(self.sql_text)
self.maxsteplines = cursor_mysql.fetchall()
lock.release()
cursor_mysql.close()
self.maxstep = self.maxsteplines[0]['Id']
def getfetchid(self):
global step
while self.start <= self.maxstep:
self.start += step
return (self.start)
def processinsert():
global step
global taoscnt
global start_time
startid = scheduler.getfetchid()
endid = startid + step
while startid:
endid = startid + step
cursor_mysql = con_mysql.cursor()
lock.acquire()
cursor_mysql.execute('SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname , \
LoginName, \
MAX(UserName) AS UserName, \
DeviceIMEI, \
IFNULL(MAX(DeviceModel),\'\') AS DeviceModel, \
IFNULL(MAX(DeviceType),\'\') AS DeviceType\
FROM location.location \
WHERE Id > %s \
AND Id <= %s \
GROUP BY LoginName,DeviceIMEI;' % (startid,endid))
tbnamecur = cursor_mysql.fetchall()
lock.release()
cursor_mysql.close()
for tbname,loginname,username,deviceimei,devicemodel,devicetype in tbnamecur:
cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
lock.acquire()
cursor_mysql.execute(f'SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname, \
LocationTime, \
Lat, \
Lng, \
Speed \
FROM location.location \
WHERE LoginName = \"%s\" \
AND DeviceIMEI = \"%s\" \
AND Id > %d \
AND Id <= %d;' % (loginname,deviceimei,startid,endid))
submission = cursor_mysql.fetchall()
lock.release()
cursor_mysql.close()
sql="INSERT INTO location.`%s` USING location.location TAGS (\'%s\',\'%s\',\'%s\',\'%s\',%s) VALUES ".replace("''" ,"Null" ) % (tbname,loginname,username,deviceimei,devicemodel,devicetype)
if submission:
for row in submission:
sql += '(\'{}\',{},{},{}) '.format(row["LocationTime"],row["Lat"],row["Lng"],row["Speed"])
sql += ";"
print(sql)
cursor_taosrest: TaosRestCursor = con_taosrest.cursor()
#lock.acquire()
cursor_taosrest.execute(sql)
taoscnt += cursor_taosrest.rowcount
#lock.release()
cursor_taosrest.close()
stime = time.time() - start_time
avgrow = int(taoscnt / stime)
print("数据已迁移%d行,耗时%d秒,平均行%d/秒" % (taoscnt,stime,avgrow))
startid = scheduler.getfetchid()
def threads_scheduler(threads_num):
global start_time
threads = []
for i in range(threads_num):
td = threading.Thread(target=processinsert, name='th'+str(i+1))
threads.append(td)
for t in threads:
t.setDaemon(True)
t.start()
#t.join()
#for t in threads:
#t.join()
while threading.active_count() !=1:
pass
else:
end_time = time.time()
deltatime = end_time - start_time
totalhour = int(deltatime / 3600)
totalminute = int((deltatime - totalhour * 3600) / 60)
totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
print("数据全部迁移完毕,总计耗时:%d小时%d%d秒!" %(totalhour, totalminute, totalsecond))
if __name__=='__main__':
scheduler = scheduler()
threads_scheduler(8)