mysql2td_codeberg/mysql2tdreal.py

162 lines
6.0 KiB
Python

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import pymysql
import taos
import time
# Source side: MySQL server on localhost, schema `location`.
mysql_options = {
    'host': '127.0.0.1',
    'database': 'location',
    'user': 'root',
    'password': 'password',
    'charset': 'utf8',
}
con_mysql = pymysql.connect(**mysql_options)

# Target side: TDengine server on localhost, database `location`, using the
# client configuration directory /etc/taos.
taos_options = {
    'host': "127.0.0.1",
    'user': "root",
    'password': "password",
    'database': "location",
    'port': 6030,
    'config': "/etc/taos",
    'timezone': "Asia/Shanghai",
}
con_taos = taos.connect(**taos_options)
"""
## MySQL: build the summary table (one row per TDengine sub-table)
DROP TABLE IF EXISTS location.locationgroup;
CREATE TABLE location.locationgroup (
Id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
tbname varchar(50),
LoginName varchar(50),
UserName varchar(50),
DeviceIMEI varchar(50),
DeviceModel varchar(100),
DeviceType smallint(6),
PRIMARY KEY (Id),
KEY `NON-DeviceIMEI` (DeviceIMEI),
KEY `NON-DeviceModel` (DeviceModel),
KEY `NON-DeviceType` (DeviceType),
KEY `NON-LoginName` (LoginName)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
INSERT INTO location.locationgroup(tbname,LoginName,UserName,DeviceIMEI)
SELECT DISTINCT CONCAT('t',LoginName,'_',DeviceIMEI) AS tbname,
LoginName,
UserName,
DeviceIMEI
FROM location.tb_locationhis;
INSERT INTO location.locationgroup(tbname,LoginName,UserName,DeviceIMEI,DeviceModel,DeviceType)
SELECT DISTINCT CONCAT('t',LoginName,'_',DeviceIMEI) AS tbname,
LoginName,
UserName,
DeviceIMEI,
DeviceModel,
DeviceType
FROM location.tb_location a
WHERE NOT EXISTS (SELECT 1
FROM location.locationgroup b
WHERE b.LoginName=a.LoginName
AND b.DeviceIMEI=a.DeviceIMEI);
## TDengine: create the database and the super table
DROP DATABASE IF EXISTS location;
CREATE DATABASE location;
ALTER DATABASE location CACHEMODEL 'both';
ALTER DATABASE location CACHESIZE 16;
ALTER DATABASE location KEEP 120;
DROP TABLE IF EXISTS location.location;
CREATE STABLE location.location ( \
locationtime TIMESTAMP, \
lat DOUBLE, \
lng DOUBLE, \
speed DOUBLE) \
TAGS (loginname NCHAR(50), \
username NCHAR(50), \
deviceimei NCHAR(50), \
devicemodel NCHAR(100), \
devicetype BOOL);
"""
# Load the sub-table catalogue from MySQL: one tuple per row of
# location.locationgroup, in the column order of the SELECT below.
# start_time also marks the beginning of the whole migration run.
start_time = time.time()

catalogue_query = 'SELECT tbname,LoginName,UserName,DeviceIMEI,DeviceModel,DeviceType FROM location.locationgroup;'
cursor_mysql = con_mysql.cursor()
cursor_mysql.execute(catalogue_query)
tbnamecur = cursor_mysql.fetchall()
cursor_mysql.close()

# Report how long the catalogue read took (whole seconds, via %d).
readdeltatime = time.time() - start_time
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
read_report = "%s: ☆读取所有子表数据成功!,耗时:%d秒!" % (now_time, readdeltatime)
print(read_report)
# Width of one migration window, in MySQL Id values: each window is read with
# one SELECT and written with one TDengine INSERT.
BATCH_SIZE = 10000


def _sql_string(value):
    """Return value as a single-quoted SQL string literal, or NULL for None."""
    if value is None:
        return "NULL"
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def _sql_number(value):
    """Return value as a bare SQL numeric literal, or NULL for None."""
    return "NULL" if value is None else str(value)


def _rows_per_second(rows, started):
    """Return rows divided by the seconds elapsed since started (0.0 if none elapsed)."""
    elapsed = time.time() - started
    return rows / elapsed if elapsed > 0 else 0.0


def _timestamp():
    """Return the current local time formatted for log lines."""
    return time.strftime('%Y-%m-%d %H:%M:%S')


# Copy the rows of every catalogue entry (one TDengine sub-table each) from
# location.tmp_location into TDengine, one Id window at a time.
for tbname, loginname, username, deviceimei, devicemodel, devicetype in tbnamecur:
    table_start_time = time.time()

    # Only the highest Id is needed to bound the windows, so ask MySQL for
    # MAX(Id) instead of fetching every Id. MAX() yields NULL when nothing
    # matches, which is treated as "no rows to migrate".
    # NOTE(review): rows are restricted to this device as well as this login.
    # The original filtered on LoginName only, which copied the rows of every
    # device of a login into each of that login's sub-tables. Confirm that
    # tmp_location.DeviceIMEI matches locationgroup.DeviceIMEI.
    with con_mysql.cursor() as cursor_mysql:
        cursor_mysql.execute(
            'SELECT MAX(Id) FROM location.tmp_location '
            'WHERE LoginName = %s AND DeviceIMEI = %s;',
            (loginname, deviceimei),
        )
        max_id_row = cursor_mysql.fetchone()
    count = (max_id_row[0] if max_id_row else None) or 0

    now_time = _timestamp()
    print("%s: ☆读取数据总量、分组成功!\n%s: ☆开始分组写入TDengine..." % (now_time, now_time))

    # The INSERT header is the same for every window of this sub-table.
    # The sub-table is created on first use from the super table with these
    # tags; a missing or zero device type is stored as false.
    insert_prefix = (
        "INSERT INTO location.`{}` USING location.location "
        "TAGS ({},{},{},{},{}) VALUES "
    ).format(
        tbname,
        _sql_string(loginname),
        _sql_string(username),
        _sql_string(deviceimei),
        _sql_string(devicemodel),
        "true" if devicetype else "false",
    )

    taoscnt = 0  # rows written to this sub-table so far
    for lower in range(0, count, BATCH_SIZE):
        upper = lower + BATCH_SIZE
        group_start_time = time.time()

        # Read one window: lower < Id <= upper.
        with con_mysql.cursor(cursor=pymysql.cursors.DictCursor) as cursor_mysql:
            cursor_mysql.execute(
                'SELECT LocationTime, Lat, Lng, Speed '
                'FROM location.tmp_location '
                'WHERE LoginName = %s AND DeviceIMEI = %s '
                'AND Id > %s AND Id <= %s;',
                (loginname, deviceimei, lower, upper),
            )
            submission = cursor_mysql.fetchall()

        # Id ranges can be sparse for a given device; skip empty windows.
        if not submission:
            continue

        sub_start_time = time.time()
        values = " ".join(
            "({},{},{},{})".format(
                _sql_string(row["LocationTime"]),
                _sql_number(row["Lat"]),
                _sql_number(row["Lng"]),
                _sql_number(row["Speed"]),
            )
            for row in submission
        )
        sql = insert_prefix + values + ";"

        # One TDengine cursor per window, always closed even if the insert fails.
        cursor_taos = con_taos.cursor()
        try:
            cursor_taos.execute(sql)
            subtaoscnt = cursor_taos.rowcount
        finally:
            cursor_taos.close()
        taoscnt += subtaoscnt

        # Speed of the write phase alone for this window.
        subspeed = _rows_per_second(subtaoscnt, sub_start_time)
        print("%s: ☆☆☆☆☆子表%s又迁移了%i行,平均速度%d行/秒!" % (_timestamp(), tbname, subtaoscnt, subspeed))

        # Running total for the sub-table, with the speed of the whole window
        # (read + write).
        groupspeed = _rows_per_second(subtaoscnt, group_start_time)
        print("%s: ☆☆☆☆子表'%s'已迁移%i行!,平均速度%d行/秒!" % (_timestamp(), tbname, taoscnt, groupspeed))

    # Report the wall-clock time spent on this sub-table as h/m/s.
    tabledeltatime = time.time() - table_start_time
    tabletotalminute, tabletotalsecond = divmod(int(tabledeltatime), 60)
    tabletotalhour, tabletotalminute = divmod(tabletotalminute, 60)
    now_time = _timestamp()
    print("%s: ☆☆子表'%s'数据迁移完毕,耗时:%d小时%d分%d秒!" % (now_time, tbname, tabletotalhour, tabletotalminute, tabletotalsecond))
# Report the total wall-clock time of the run (measured from start_time) as
# hours / minutes / seconds, then release both database connections.
end_time = time.time()
deltatime = end_time - start_time
totalminute, totalsecond = divmod(int(deltatime), 60)
totalhour, totalminute = divmod(totalminute, 60)
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
# The minute unit was missing from the message, so minutes and seconds were
# printed as one fused number.
print("%s: ☆数据全部迁移完毕,总计耗时:%d小时%d分%d秒!" % (now_time, totalhour, totalminute, totalsecond))
con_mysql.close()
con_taos.close()