135 lines
4.7 KiB
Python
135 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
#-*- coding:utf-8 -*-
|
|
import pymysql
|
|
import threading
|
|
from taosrest import connect, TaosRestConnection, TaosRestCursor
|
|
import time
|
|
|
|
con_mysql = pymysql.connect(host='127.0.0.1',
|
|
database='location',
|
|
user='root',
|
|
password='password',
|
|
charset='utf8')
|
|
|
|
con_taosrest: TaosRestConnection = connect(url="http://127.0.0.1:6041",
|
|
user="root",
|
|
password="password",
|
|
timeout=300)
|
|
|
|
step = 10000
|
|
taoscnt = 0
|
|
start_time = time.time()
|
|
lock = threading.RLock()
|
|
"""
|
|
DROP TABLE IF EXISTS location.location;
|
|
CREATE STABLE location.location ( \
|
|
locationtime TIMESTAMP, \
|
|
lat DOUBLE, \
|
|
lng DOUBLE, \
|
|
speed DOUBLE) \
|
|
TAGS (loginname NCHAR(50), \
|
|
username NCHAR(50), \
|
|
deviceimei NCHAR(50), \
|
|
devicemodel NCHAR(100), \
|
|
devicetype BOOL);
|
|
"""
|
|
|
|
class scheduler():
|
|
def __init__(self):
|
|
self.start = 0
|
|
self.sql_text = 'SELECT Id FROM location.location ORDER BY Id DESC LIMIT 1;'
|
|
cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
|
|
lock.acquire()
|
|
cursor_mysql.execute(self.sql_text)
|
|
self.maxsteplines = cursor_mysql.fetchall()
|
|
lock.release()
|
|
cursor_mysql.close()
|
|
self.maxstep = self.maxsteplines[0]['Id']
|
|
|
|
def getfetchid(self):
|
|
global step
|
|
while self.start <= self.maxstep:
|
|
self.start += step
|
|
return (self.start)
|
|
|
|
def processinsert():
|
|
global step
|
|
global taoscnt
|
|
global start_time
|
|
startid = scheduler.getfetchid()
|
|
endid = startid + step
|
|
while startid:
|
|
endid = startid + step
|
|
cursor_mysql = con_mysql.cursor()
|
|
lock.acquire()
|
|
cursor_mysql.execute('SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname , \
|
|
LoginName, \
|
|
MAX(UserName) AS UserName, \
|
|
DeviceIMEI, \
|
|
IFNULL(MAX(DeviceModel),\'\') AS DeviceModel, \
|
|
IFNULL(MAX(DeviceType),\'\') AS DeviceType\
|
|
FROM location.location \
|
|
WHERE Id > %s \
|
|
AND Id <= %s \
|
|
GROUP BY LoginName,DeviceIMEI;' % (startid,endid))
|
|
tbnamecur = cursor_mysql.fetchall()
|
|
lock.release()
|
|
cursor_mysql.close()
|
|
|
|
for tbname,loginname,username,deviceimei,devicemodel,devicetype in tbnamecur:
|
|
cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
|
|
lock.acquire()
|
|
cursor_mysql.execute(f'SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname, \
|
|
LocationTime, \
|
|
Lat, \
|
|
Lng, \
|
|
Speed \
|
|
FROM location.location \
|
|
WHERE LoginName = \"%s\" \
|
|
AND DeviceIMEI = \"%s\" \
|
|
AND Id > %d \
|
|
AND Id <= %d;' % (loginname,deviceimei,startid,endid))
|
|
submission = cursor_mysql.fetchall()
|
|
lock.release()
|
|
cursor_mysql.close()
|
|
sql="INSERT INTO location.`%s` USING location.location TAGS (\'%s\',\'%s\',\'%s\',\'%s\',%s) VALUES ".replace("''" ,"Null" ) % (tbname,loginname,username,deviceimei,devicemodel,devicetype)
|
|
if submission:
|
|
for row in submission:
|
|
sql += '(\'{}\',{},{},{}) '.format(row["LocationTime"],row["Lat"],row["Lng"],row["Speed"])
|
|
sql += ";"
|
|
print(sql)
|
|
cursor_taosrest: TaosRestCursor = con_taosrest.cursor()
|
|
#lock.acquire()
|
|
cursor_taosrest.execute(sql)
|
|
taoscnt += cursor_taosrest.rowcount
|
|
#lock.release()
|
|
cursor_taosrest.close()
|
|
stime = time.time() - start_time
|
|
avgrow = int(taoscnt / stime)
|
|
print("数据已迁移%d行,耗时%d秒,平均行%d/秒" % (taoscnt,stime,avgrow))
|
|
startid = scheduler.getfetchid()
|
|
|
|
def threads_scheduler(threads_num):
|
|
global start_time
|
|
threads = []
|
|
for i in range(threads_num):
|
|
td = threading.Thread(target=processinsert, name='th'+str(i+1))
|
|
threads.append(td)
|
|
for t in threads:
|
|
t.setDaemon(True)
|
|
t.start()
|
|
#t.join()
|
|
#for t in threads:
|
|
#t.join()
|
|
while threading.active_count() !=1:
|
|
pass
|
|
else:
|
|
end_time = time.time()
|
|
deltatime = end_time - start_time
|
|
totalhour = int(deltatime / 3600)
|
|
totalminute = int((deltatime - totalhour * 3600) / 60)
|
|
totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
|
|
print("数据全部迁移完毕,总计耗时:%d小时%d分%d秒!" %(totalhour, totalminute, totalsecond))
|
|
if __name__=='__main__':
|
|
scheduler = scheduler()
|
|
threads_scheduler(8) |