-
DataX
2019-03-20 10:51:31 -
datax(24):远程调试datax
2021-01-29 00:02:48一、datax开启远程debug 1、环境 本地: win10,idea专业版2020.3,datax3.0 远程: CentOS6.5,datax3.0 2、效果 3、步骤 3.1 远程开启debug /apps/datax/bin/datax.py /apps/datax/job/job.json -d 即在后面添加...一、datax开启远程debug
1、环境
本地: win10,idea专业版2020.3,datax3.0 远程: CentOS6.5,datax3.0
2、效果
3、步骤
3.1 远程开启debug
/apps/datax/bin/datax.py /apps/datax/job/job.json -d
即在后面添加-d 即可,默认端口为9999,也可以自行修改datax.py文件第35行
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
3.2 本地idea设置远程
二、远程debug原理
1、原理
Java远程调试的原理是两个VM之间通过debug协议进行通信,然后以达到远程调试的目的,两者之间可以通过socket进行通信;
我们知道,Java 程序都是运行在 Java 虚拟机上的,我们要调试 Java 程序,事实上就需要
向 Java 虚拟机请求当前运行态的状态,并对虚拟机发出一定的指令,设置一些回调
等等,那么 Java 的调试体系,就是虚拟机的一整套用于调试的工具和接口。对于 Java 虚拟机接口熟悉的人来说,您一定还记得 Java 提供了两个接口体系,JVMPI(Java Virtual Machine Profiler Interface)和 JVMDI(Java Virtual Machine Debug Interface),而它们,以及在 Java SE 5 中准备代替它们的 JVMTI(Java Virtual Machine Tool Interface),都是 Java 平台调试体系(Java Platform Debugger Architecture,JPDA)的重要组成部分。 Java SE 自 1.2.2 版就开始推出 Java 平台调试体系结构(JPDA)工具集,而从 JDK 1.3.x 开始,Java SDK 就提供了对 Java 平台调试体系结构的直接支持。顾名思义,这个体系为开发人员提供了一整套用于调试 Java 程序的 API,是一套用于开发 Java 调试工具的接口和协议。本质上说,它是我们通向虚拟机,考察虚拟机运行态的一个通道,一套工具。理解这一点对于学习 JPDA 非常重要。
换句话说,通过
JPDA 这套接口,我们就可以开发自己的调试工具
。通过这些 JPDA 提供的接口和协议,调试器开发人员就能根据特定开发者的需求,扩展定制 Java 调试应用程序,开发出吸引开发人员使用的调试工具。前面我们提到的 IDE 调试工具都是基于 JPDA 体系开发的,区别仅仅在于它们可能提供了不同的图形界面、具有一些不同的自定义功能。另外,我们要注意的是,JPDA 是一套标准,任何的 JDK 实现都必须完成这个标准,因此,通过 JPDA 开发出来的调试工具先天具有跨平台、不依赖虚拟机实现、JDK 版本无关等移植优点,因此大部分的调试工具都是基于这个体系的。- JPDA模块层次
- JPDA层次比较
更加深入的介绍文档可以参见 https://developer.ibm.com/zh/articles/j-lo-jpda1/
2、idea 和eclipse的远程debug原理
客户端(idea 、eclipse 等)之所以可以进行调试,是由于客户端 和 服务端(程序端)进行了 socket 通信,通信过程如下:
1、先建立起了 socket 连接
2、将断点位置创建了断点事件通过 JDI 接口传给了 服务端(程序端)的 VM,VM 调用 suspend 将 VM 挂起
3、VM 挂起之后将客户端需要获取的 VM 信息返回给客户端,返回之后 VM resume 恢复其运行状态
4、客户端获取到 VM 返回的信息之后可以通过不同的方式展示给客户;
- JPDA模块层次
-
datax(23):dataX调优
2021-01-28 00:34:55调优前需要先知道datax任务的执行过程; 一、调优方向 网络本身的带宽等硬件因素造成的影响; DataX本身的参数; 即当觉得DataX传输速度慢时,需要从上述两个个方面着手开始排查。 二、网络本身的带宽等硬件因素...调优前需要先知道datax任务的执行过程;
一、调优方向
- 网络本身的带宽等硬件因素造成的影响;
- DataX本身的参数;
即当觉得DataX传输速度慢时,需要从上述两个个方面着手开始排查。
二、网络本身的带宽等硬件因素造成的影响
此部分主要需要了解网络本身的情况,即从源端到目的端的带宽是多少(实际带宽计算公式),平时使用量和繁忙程度的情况,从而分析是否是本部分造成的速度缓慢。以下提供几个思路。
1,可使用从源端到目的端scp,python http,nethogs等观察实际网络及网卡速度; 2,结合监控观察任务运行时间段时,网络整体的繁忙情况,来判断是否应将任务避开网络高峰运行; 3,观察任务机的负载情况,尤其是网络和磁盘IO,观察其是否成为瓶颈,影响了速度;
三、DataX本身的参数;
全局:提升每个channel的速度
在DataX内部对每个Channel会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是1MB/s,可以根据具体硬件情况设置这个byte速度或者record速度,一般设置byte速度,比如:我们可以把单个Channel的速度上限配置为5MB
举例 { "core":{ "transport":{ "channel":{ "speed":{ "channel": 2, ## 此处为数据导入的并发度,建议根据服务器硬件进行调优 "record":-1, ##此处解除对读取行数的限制 "byte":-1, ##此处解除对字节的限制 "batchSize":2048 ##每次读取batch的大小 } } } }, "job":{ ... } }
局部:提升DataX Job内Channel并发数
并发数=taskGroup的数量每一个TaskGroup并发执行的Task数 (默认单个任务组的并发数量为5)。
提升job内Channel并发有三种配置方式:
- 配置全局Byte限速以及单Channel Byte限速,Channel个数 = 全局Byte限速 / 单Channel Byte限速
- 配置全局Record限速以及单Channel Record限速,Channel个数 = 全局Record限速 / 单Channel Record限速
- 直接配置Channel个数.
配置含义: job.setting.speed.channel : channel并发数 job.setting.speed.record : 全局配置channel的record限速 job.setting.speed.byte:全局配置channel的byte限速 core.transport.channel.speed.record:单channel的record限速 core.transport.channel.speed.byte:单channel的byte限速
举例 "setting": { "speed": { "channel": 2, "record":-1, "byte":-1, "batchSize":2048 } } } } # channel增大,为防止OOM,需要修改datax工具的datax.py文件。 # 如下所示,可根据任务机的实际配置,提升-Xms与-Xmx,来防止OOM。 # tunnel并不是越大越好,过分大反而会影响宿主机的性能。 DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
Jvm 调优
python datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json
此处根据服务器配置进行调优,切记不可太大!否则直接Exception
以上为调优,应该是可以针对每个json文件都可以进行调优。注意事项
1.当提升DataX Job内Channel并发数时,调整JVM堆参数,原因如下:
1. 当一个Job内Channel数变多后,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。 2. 例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止jvm报内存溢出等错误,调大jvm的堆参数。 3. 通常我们建议将内存设置为4G或者8G,这个也可以根据实际情况来调整 4. 调整JVM xms xmx参数的两种方式:一种是直接更改datax.py;另一种是在启动的时候,加上对应的参数,如下:python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
2.Channel个数并不是越多越好, 原因如下:
1. Channel个数的增加,带来的是更多的CPU消耗以及内存消耗。 2. 如果Channel并发配置过高导致JVM内存不够用,会出现的情况是发生频繁的Full GC,导出速度会骤降,适得其反
注意:
MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能,splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据,第三个测试不配置splitPk测试不出来效果
调优没有固定的,先了解原理,再根据原理及执行过程进行局部或者全局的调优,谢谢各位阅读
-
datax
2020-10-09 11:50:18DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步...是什么
GitHub 地址:https://github.com/alibaba/DataX
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
安装
环境要求
- Linux|Macos
- JDK(1.8以上,推荐1.8)
- Python(推荐Python2.6.X)
- Apache Maven 3.x (Compile DataX)
下载安装
-
tar.gz包下载:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
-
源码编译
git clone git@github.com:alibaba/DataX.git cd {DataX_source_code_home} mvn -U clean package assembly:assembly -Dmaven.test.skip=true //打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ cd {DataX_source_code_home} ls ./target/datax/datax/ bin conf job lib log log_perf plugin script tmp
小试牛刀
怎么执行
datax 通过bin目录下的datax.py 文件 运行json文件来进行数据同步。解压缩包里有个测试datax的json文件。
cd {DataX_source_code_home} ./bin/datax.py ./job/job.json // 运行后结果 2020-09-18 18:22:47.282 [job-0] INFO JobContainer - 任务启动时刻 : 2020-09-18 18:22:37 │ 任务结束时刻 : 2020-09-18 18:22:47 │ 任务总计耗时 : 10s │ 任务平均流量 : 253.91KB/s │ 记录写入速度 : 10000rec/s │ 读出记录总数 : 100000 │ 读写失败总数 : 0
msyql 同步到es
- mysql 脚本
ROP TABLE IF EXISTS `test`; CREATE TABLE `test` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `col_ip` varchar(100) DEFAULT NULL COMMENT 'projectcol_ipname', `col_double` double(16,2) DEFAULT NULL COMMENT 'col_double', `col_long` bigint(20) DEFAULT NULL COMMENT 'col_long', `col_integer` int(11) DEFAULT '1' COMMENT 'col_integer', `col_keyword` varchar(100) DEFAULT NULL COMMENT 'col_keyword', `col_text` varchar(100) DEFAULT NULL COMMENT 'col_text', `col_geo_point` varchar(100) DEFAULT NULL COMMENT 'col_geo_point', `col_date` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'col_date time', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC; LOCK TABLES `test` WRITE; INSERT INTO `test` (`id`, `col_ip`, `col_double`, `col_long`, `col_integer`, `col_keyword`, `col_text`, `col_geo_point`, `col_date`) VALUES (1,'1.1.1.1',19890604.00,19890604,19890604,'hello world','hello world','41.12,-71.34','2020-09-18 19:04:29'); UNLOCK TABLES;
- myql2es.json
{ "job":{ "setting":{ "speed":{ "channel":1 } }, "content":[ { "reader":{ "name":"mysqlreader", "parameter":{ "username":"root", "password":"root", "column":[ "col_ip", "col_double", "col_long", "col_integer", "col_keyword", "col_text", "col_geo_point", "col_date" ], "splitPk":"", "connection":[ { "table":[ "test" ], "jdbcUrl":[ "jdbc:mysql://127.0.0.1:3306/test" ] } ] } }, "writer":{ "name":"elasticsearchwriter", "parameter":{ "endpoint":"http://192.168.90.124:9200", "index":"mytest", "type":"default", "accessId": "1", // 不写报错,,没有认证就随写个值 "accessKey": "1", // 不写报错,没有认证就随写个值 "cleanup":true, "settings":{ "index":{ "number_of_shards":5, "number_of_replicas":0 } }, "discovery":false, "batchSize":1000, "splitter":",", "column":[ { "name":"col_ip", "type":"ip" }, { "name":"col_double", "type":"double" }, { "name":"col_long", "type":"long" }, { "name":"col_integer", "type":"integer" }, { "name":"col_keyword", "type":"keyword" }, { "name":"col_text", "type":"text" }, { "name":"col_geo_point", "type":"geo_point" }, { "name":"col_date", "type":"date" } ] } } } ] } }
- 使用kibana查看,同步成功
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D0ygDysG-1602215392914)(datax/image-20200918192804137.png)]
总结
datax 解放了我们在数据开发时候异构数据源间etl处理,使用起来还是很简单的,更多高级功能或者坑遇到再写。
-
datax(13):源码解读Column-datax中的数据类型
2021-01-22 00:38:00Column是datax中所有数据类型的基类,里面有3个属性,以及一个构造方法,外加一个枚举类; public abstract class Column { private Type type; private Object rawData; private int byteSize; public ... -
datax(4): datax.py解读
2021-01-05 15:43:36原始文件位置在 xx/DataX/core/src/main/bin/下,datax项目打包后会将文件拷贝到 xx/DataX\target\datax\datax\bin 下。 core模块的pom.xml 指定‘拷贝’datax.py文件的方式maven-assembly-plugin <plugin> ... -
datax(20):内置transformer使用
2021-01-25 11:33:27job.json使用datax的样例json,源文件在xxx\DataX\core\src\main\job\中,打包编译后在xxx\DataX\target\datax\datax\job下。本文略做修改,主要修改2出,是否打印和记录行数 { "job": { "setting": { "speed":. -
datax(10): 源码解读Communication(Datax通讯类)
2021-01-19 19:38:42前面看了datax的通讯机制,继续看源码—具体的通讯类 Communication。根据datax的运行模式的区别, 数据的收集会有些区别,这篇文章都是讲的在standalone模式下。 一、communication概述 DataX所有的统计信息都会... -
datax(22):任务分配规则
2021-01-26 23:18:25前面学习了一些源码和datax的执行,其中有一个重要的流程任务切分。今天梳理下; 一、概述 Datax根首先据配置文件,确定好channel的并发数目。然后将整个job分成一个个小的task,然后划分成组。从JobContainer的... -
datax(26):各个数据库与datax字段映射
2021-02-01 23:36:02通过源码解读Column-datax中的数据类型,可以知道datax框架中只有7(enum Type种)种数据类型,那么各个数据库的字段是如何和datax的字段进行相互映射? 一、ADB PG DataX 内部类型 ADB PG 数据类型 Long ... -
datax(11):源码解读 ContainerCommunicator
2021-01-20 00:34:37前面看了datax的 通讯类communication,现在看看在他之上包装的一个容器通信类ContainerCommunicator 一、抽象基类AbstractContainerCommunicator dataX中提供了一个基类 AbstractContainerCommunicator来处理... -
datax(5):改造升级-自动识别py环境,执行datax任务
2021-01-05 16:45:05上篇文章已经研究过datax.py文件,产生2个问题: 如果用户不是py2环境(datax默认要求环境)怎么处理; 能不能有一个脚本自动识别用户的py环境,从而执行datax任务 2 效果 在py2或py3下执行下面命令 >python ... -
datax(18):源码解读Transformer
2021-01-24 15:04:55现在很多场景都把datax当做ETL工具,datax中的各种reader相当于E(Extract),各种writer相当于L(load),那么datax中是否有T(transform)。答案是肯定的~ 一、概述transformer 作用:在生产上数据传输,一般... -
datax(19):源码解读内置Transformer
2021-01-25 00:05:42通过datax(18)已经对transformer有了初步了解,继续撸代码,看datax已经内置的5种简单类型transformer; 一、概述 目前datax内置了5种常用的transformer,分别如下 截取SubstrTransformer 填充PadTransformer ... -
DataX源码阅读汇总
2021-03-10 11:14:28将自己datax的系列文章进行汇总形成目录 DataX(1):编译打包使用 DataX(2): 通过idea搭建源码阅读+调试环境 DataX(3): win环境cmd乱码 DataX(4): dDataX.py解读 DataX(5):改造升级-自动识别py环境,执行dDataX任务 ...