mysql2td_codeberg/README.md

7.7 KiB
Raw Permalink Blame History

title date tags categories
数据迁移脚本案例 2022-09-30 08:26:11
ETL
数据迁移
ETL工具

使用Python脚本将MySQL数据迁移至TDengine

项目背景:

公司调研过程时序数据库做技术储备。最开始调研TDengine2.4随时间推进官方升级2.6在2.6上做了数据迁移一批手机上传上来的定位数据使用DATAX使用mysqlreader加淘思官方提供的tdenginewriter插件做好字段对应关系写入数据按阿里DATAX效率开3个管道速度也并不快走内网平均迁移速递大概在5000条/秒到最近涛思升级TDengine为3.0版本很多附属工具基本重构支持了流式计算增加了社区版的数据删除等功能这次使用DATAX适配失败可能是改的太多了索性尝试先使用python脚本迁移下。

说干就干,官方提供了快速迁移方案,但是数据源是虚拟出来的,并没有时间主键,而且拆分子表是按量拆分然后平均分配,索性直接放弃,但是里面的多线程写入还是要参考的。

由于是python初学者类和一些库函数使用还不是很熟简简单单一把梭一条流水从头撸到尾。迁移也是尝试了很多方法遇到了很多问题这里逐一介绍。

TDengine连接建立

官方提供文档两种连接方式原生客户端restful方式。

taosc 本地客户端方式

安装驱动

安装包见官网 Linux|Windows

win安装linux解压执行 chmod+x install_client.sh && install_client.sh

配置客户端

前提是与服务端的6030端口开启TCP和UDP的放行 如果是集群服务端所有FQDN都需要添加端口两个协议的放行

  • 修改cfg配置文件(win: C:\TDengine\cfg\taos.cfg linux/etc/taos/taos.cfg) 修改 firstEP 为TDengine服务端的FirstEP
 # The end point of the first dnode in the cluster to be connected to when 
 `taosd` or `taos` is started
 firstEp                   tdengine1:6030
  • 修改host文件添加FQDN解析映射
 vim /etc/hosts
 192.168.1.xxx tdengine1
 192.168.1.xxx tdengine2
 192.168.1.xxx tdengine3
  • 验证连接
 ping tdengine1
 ping tdengine2
 ping tdengine3
 telnet tdengine1 6030
 telnet tdengine2 6030
 telnet tdengine3 6030
  • taosc验证
$ taos -h tdengine1 -u root -p

taos > show databases;

安装python库

pip uninstall taos taospy
pip install taospy

此种安装方法最高版本2.4.x使用pip_search查看最高2.5.1(2022.09.30)还是使用git源直接安装靠谱

pip install git+https://github.com/taosdata/taos-connector-python.git

但是吧github方式国内连接不畅可以下载包后来本地安装。

注意要求python版本3.8以上非官方文档上的3.6以上。

测试连接

import taos

def test_connection():
    # all parameters are optional.
    # if database is specified,
    # then it must exist.
    conn = taos.connect(host="localhost",
                        port=6030,
                        user="root",
                        password="taosdata",
                        database="log")
    print('client info:', conn.client_info)
    print('server info:', conn.server_info)
    conn.close()


if __name__ == "__main__":
    test_connection()

restful 方式

万能连接大法能发送GET、POST请求就行。

与原生连接器的一个区别是RESTful 接口是无状态的,因此 USE db_name 指令没有效果,所有 对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name这时 如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。

  • 确认服务端开启 taosadapter

firstEP服务端运行

systemctl status taosadapter

确保开放6041

lsof -nP -iTCP -sTCP:LISTEN 
  • 测试连接
curl -u root:password http://192.168.1.xxx:6041/rest/sql -d "select server_version()"

HTTP 响应码 response code 说明 200 正确返回和 C 接口错误返回 400 参数错误返回 401 鉴权失败 404 接口不存在 500 内部错误 503 系统资源不足

from taosrest import connect, TaosRestConnection, TaosRestCursor

conn: TaosRestConnection = connect(url="http://localhost:6041",
                                   user="root",
                                   password="taosdata",
                                   timeout=30)

源数据batch

拆分、循环方法

因源数据有上亿级别数据,不可能一个SELECT撸到低,首先源表必须加上自增主键,或者带索引的可以作为分片键的字段(可以现加)然后取个最大值用for循环分批次去读取如果数据处理模块、函数是要分多线程来取数据需要有个全局变量记录不同线程之间取完数据后的增量值。

子表名称拼接

这里会考虑将源表代表设备、人等可分子表的关键键值作为分组条件取出各个唯一值全量再去循环每个子表的固定batch分批拼接INSERT语句后写入。这里结合上面的分批取源表数据,会有嵌套循环的场景,

  • 先从全量数据取出固定分组然后循环每个分组内的Id做for循环这里好处是可以保证子循环里Id可以连续不会按自增来跑空
  • 先取全量Id的连续值在子循环里再找分组这里好处是分组的压力对源库压力小不至于直接给上亿条数据直接GROUP BY
  • 还有一种是先取全量Id连续值子循环里拆分组这个分组之后由于有设备、人等键的限制Id是不连续的那就再次来个for循环遍历分组内的结果直接拼出来INSERT语句。

这里各种循环,没得用函数,也没得分文件,最开始需求就是要迁移数据,有思路更清晰的应该能更好的给模块化下。

TDengine语句拼接

TDengine对标签数据和子表内数据写入和正常的SQL标准内的还稍微有些区别和其他时序类的应该很相似这里在官方文档里有段

INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
            d21002 USING meters (groupId) TAGS (2) VALUES ('2021-07-13 14:06:34.255', 10.15, 217, 0.33)
            d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31);

需要拿出标签数据写在INSERT段里,然后VALUES值按行写在括号内。

简单日志

上面讲到有实际已经到了三个循环,可以用全局变量累计每个子循环的时间、返回行数的累计,计算出平均迁移速度,返回给前台。

后台运行

nohup python3.9 mysql2tdnative.py &

实际脚本

放托管平台了

GitHub

Gitee

Codeberg

本来想写一堆,写着写着忘词了~