162 lines
6.0 KiB
Python
162 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
|
#-*- coding:utf-8 -*-
|
|
import pymysql
|
|
import taos
|
|
import time
|
|
|
|
con_mysql = pymysql.connect(host='127.0.0.1',
|
|
database='location',
|
|
user='root',
|
|
password='password',
|
|
charset='utf8')
|
|
|
|
con_taos = taos.connect(host="127.0.0.1",
|
|
user="root",
|
|
password="password",
|
|
database="location",
|
|
port=6030,
|
|
config="/etc/taos",
|
|
timezone="Asia/Shanghai")
|
|
|
|
"""
|
|
##mysql 获取汇总数据
|
|
DROP TABLE IF EXISTS location.locationgroup;
|
|
CREATE TABLE location.locationgroup (
|
|
Id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
|
|
tbname varchar(50),
|
|
LoginName varchar(50),
|
|
UserName varchar(50),
|
|
DeviceIMEI varchar(50),
|
|
DeviceModel varchar(100),
|
|
DeviceType smallint(6),
|
|
PRIMARY KEY (Id),
|
|
KEY `NON-DeviceIMEI` (DeviceIMEI),
|
|
KEY `NON-DeviceModel` (DeviceModel),
|
|
KEY `NON-DeviceType` (DeviceType),
|
|
KEY `NON-LoginName` (LoginName)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8
|
|
;
|
|
|
|
INSERT INTO location.locationgroup(tbname,LoginName,UserName,DeviceIMEI)
|
|
SELECT DISTINCT CONCAT('t',LoginName,'_',DeviceIMEI) AS tbname,
|
|
LoginName,
|
|
UserName,
|
|
DeviceIMEI
|
|
FROM location.tb_locationhis;
|
|
|
|
INSERT INTO location.locationgroup(tbname,LoginName,UserName,DeviceIMEI,DeviceModel,DeviceType)
|
|
SELECT DISTINCT CONCAT('t',LoginName,'_',DeviceIMEI) AS tbname,
|
|
LoginName,
|
|
UserName,
|
|
DeviceIMEI,
|
|
DeviceModel,
|
|
DeviceType
|
|
FROM location.tb_location a
|
|
WHERE NOT EXISTS (SELECT 1
|
|
FROM location.locationgroup b
|
|
WHERE b.LoginName=a.LoginName
|
|
AND b.DeviceIMEI=a.DeviceIMEI);
|
|
|
|
##td 建库建表
|
|
DROP DATABASE IF EXISTS location;
|
|
CREATE DATABASE location;
|
|
ALTER DATABASE location CACHEMODEL 'both';
|
|
ALTER DATABASE location CACHESIZE 16;
|
|
ALTER DATABASE location KEEP 120;
|
|
|
|
DROP TABLE IF EXISTS location.location;
|
|
CREATE STABLE location.location ( \
|
|
locationtime TIMESTAMP, \
|
|
lat DOUBLE, \
|
|
lng DOUBLE, \
|
|
speed DOUBLE) \
|
|
TAGS (loginname NCHAR(50), \
|
|
username NCHAR(50), \
|
|
deviceimei NCHAR(50), \
|
|
devicemodel NCHAR(100), \
|
|
devicetype BOOL);
|
|
"""
|
|
|
|
start_time = time.time()
|
|
cursor_mysql = con_mysql.cursor()
|
|
cursor_mysql.execute('SELECT tbname,LoginName,UserName,DeviceIMEI,DeviceModel,DeviceType FROM location.locationgroup;')
|
|
tbnamecur = cursor_mysql.fetchall()
|
|
cursor_mysql.close()
|
|
|
|
read_end_time = time.time()
|
|
readdeltatime = read_end_time - start_time
|
|
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
print("%s: ☆读取所有子表数据成功!,耗时:%d秒!" % (now_time,readdeltatime) )
|
|
|
|
for tbname,loginname,username,deviceimei,devicemodel,devicetype in tbnamecur:
|
|
table_start_time = time.time()
|
|
cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
|
|
cursor_mysql.execute('SELECT Id FROM location.tmp_location WHERE LoginName = \"%s\" ORDER BY Id DESC;' % loginname)
|
|
countlines = cursor_mysql.fetchall()
|
|
cursor_mysql.close()
|
|
count = countlines[0]['Id']
|
|
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
print("%s: ☆读取数据总量、分组成功!\n%s: ☆开始分组写入TDengine..." % (now_time,now_time))
|
|
i = 0
|
|
j = 10000
|
|
taoscnt = 0
|
|
for i in range(0, count, 10000):
|
|
group_start_time = time.time()
|
|
groupcount = 0
|
|
cursor_taos = con_taos.cursor()
|
|
cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
|
|
cursor_mysql.execute(f'SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname, \
|
|
LocationTime, \
|
|
Lat, \
|
|
Lng, \
|
|
Speed \
|
|
FROM location.tmp_location \
|
|
WHERE LoginName = \"%s\" \
|
|
AND Id > %d \
|
|
AND Id <= %d;' % (loginname,i,j))
|
|
submission = cursor_mysql.fetchall()
|
|
cursor_mysql.close()
|
|
sql="INSERT INTO location.`%s` USING location.location TAGS (\'{}\',\'{}\',\'{}\',\'{}\',{}) VALUES ".format(tbname,loginname,username,deviceimei,devicemodel,devicetype).replace(",)" ,",False)" )
|
|
if submission:
|
|
sub_start_time = time.time()
|
|
for row in submission:
|
|
sql += '(\'{}\',{},{},{}) '.format(row["LocationTime"],row["Lat"],row["Lng"],row["Speed"])
|
|
sql += ";"
|
|
#print(sql)
|
|
#cursor_taos: TaosRestCursor = con_taos.cursor()
|
|
cursor_taos.execute(sql)
|
|
subtaoscnt = cursor_taos.rowcount
|
|
groupcount += subtaoscnt
|
|
taoscnt += subtaoscnt
|
|
#time.sleep(3)
|
|
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
subspeed = subtaoscnt / ( sub_start_time - time.time() )
|
|
print("%s: ☆☆☆☆☆子表%s又迁移了%i行,平均速度%d行/秒!" % (now_time,tbname,subtaoscnt,subspeed))
|
|
#cursor_taos.close()
|
|
else:
|
|
i += 10000
|
|
j += 10000
|
|
continue
|
|
i += 10000
|
|
j += 10000
|
|
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
groupspeed = groupcount / ( group_start_time - time.time() )
|
|
print("%s: ☆☆☆☆子表'%s'已迁移%i行!,平均速度%d行/秒!" % (now_time,tbname,groupspeed))
|
|
cursor_taos.close()
|
|
table_end_time = time.time()
|
|
tabledeltatime = table_end_time - table_start_time
|
|
tabletotalhour = int(tabledeltatime / 3600)
|
|
tabletotalminute = int((tabledeltatime - tabletotalhour * 3600) / 60)
|
|
tabletotalsecond = int(tabledeltatime - tabletotalhour * 3600 - tabletotalminute * 60)
|
|
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
print("%s: ☆☆子表'%s'数据迁移完毕,耗时:%d小时%d分%d秒!" %(now_time,tbname,tabletotalhour,tabletotalminute,tabletotalsecond))
|
|
end_time = time.time()
|
|
deltatime = end_time - start_time
|
|
totalhour = int(deltatime / 3600)
|
|
totalminute = int((deltatime - totalhour * 3600) / 60)
|
|
totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
|
|
now_time = time.strftime('%Y-%m-%d %H:%M:%S')
|
|
print("%s: ☆数据全部迁移完毕,总计耗时:%d小时%d分%d秒!" %(now_time,totalhour,totalminute,totalsecond))
|
|
|
|
con_mysql.close()
|
|
con_taos.close() |