精华内容
下载资源
问答
  • datax因为脏数据降速问题解决

    千次阅读 2020-11-30 16:47:06
    一言以蔽之:datax可能会因为脏数据太多导致频繁回滚操作,进一步让jvm内存触发gc,让速度降低到0,可以在sql语句中规避脏数据的写入来规避 1.问题 datax使用类型转换触发jvm gc然后降速至0失去响应。 ->脏数据为...

    一言以蔽之:datax可能会因为脏数据太多导致频繁回滚操作,进一步让jvm内存触发gc,让速度降低到0,可以在sql语句中规避脏数据的写入来规避
    1.问题

    datax使用类型转换触发jvm gc然后降速至0失去响应。

    ->脏数据为什么会触发gc

    ->脏数据导致datax回滚写入降速

    a.首先是开始就低速,发生在动态基线策略里面

    b.到八十万条左右的时候就开始低速,发生在动态基线和状态评价里面,这时日志伴随脏数据记录输出。

    c.id会从2049开始,是否有一个2048的块大小进行抽取,然后过一段时间才会变回1

    d.出现问题在后面,也就是metaspace快满的时候,是否是jvm或者mysql内存不够用,导致sql语句的执行出现问题,但是为什么这样的问题前面没出现,后面才出现。为什么后面会出现这种问题,是sql先失效导致的脏数据,再导致jvm,还是jvmgc导致sql失效产生脏数据。

    可能原因:

    1.数据问题

    2.jvm内存不够用

    应该是这个原因,不确定是jvm的问题还是mysql的问题,如果是不转换数据格式,那么很快就能完成也没有jvm info信息的生成:

    metaspace占用达到了97%,速度就降下来了,compress class space也到了90,触发了gc
    在这里插入图片描述

    2020-11-27 10:02:22.070 [job-0] INFO  VMInfo - 
    	 [delta cpu info] => 
    		curDeltaCpu                    | averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
    		-1.00%                         | -1.00%                         | -1.00%                         | -1.00%
                            
    
    	 [delta memory info] => 
    		 NAME                           | used_size                      | used_percent                   | max_used_size                  | max_percent                    
    		 PS Eden Space                  | 239.66MB                       | 75.96%                         | 239.66MB                       | 75.96%                         
    		 Code Cache                     | 10.74MB                        | 90.48%                         | 10.74MB                        | 90.48%                         
    		 Compressed Class Space         | 1.93MB                         | 90.98%                         | 1.93MB                         | 90.98%                         
    		 PS Survivor Space              | 2.53MB                         | 19.47%                         | 2.53MB                         | 19.47%                         
    		 PS Old Gen                     | 5.81MB                         | 0.85%                          | 5.81MB                         | 0.85%                          
    		 Metaspace                      | 18.64MB                        | 97.48%                         | 18.64MB                        | 97.48%                         
    
    	 [delta gc info] => 
    		 NAME                 | curDeltaGCCount    | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | curDeltaGCTime     | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
    		 PS MarkSweep         | 0                  | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             | 0.000s             
    		 PS Scavenge          | 0                  | 16                 | 16                 | 0                  | 0.000s             | 0.194s             | 0.194s             | 0.000s           
    

    初始值:

    MEMORY_NAME                    | allocation_size                | init_size                      
    	PS Eden Space                  | 256.00MB                       | 256.00MB                       
    	Code Cache                     | 240.00MB                       | 2.44MB                         
    	Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
    	PS Survivor Space              | 42.50MB                        | 42.50MB                        
    	PS Old Gen                     | 683.00MB                       | 683.00MB                       
    	Metaspace                      | -0.00MB                        | 0.00MB                         
    
    

    3.为什么70万左右出现脏数据 ,前面都没出现。

    拟解决方案:

    1.调高jvm内存,调高maxerror值或者设置maxerror值为百分比。

    2.检查日志确定数据是否有问题。

    3.确定脏数据的处理方式

    解决过程记录:

    1.jvm部分

    首先排查jvm部分的问题,这里查看

    2.使用较少数据量来测试,是否发生降速,我把一周的数据抽取计划换成了一天,发现了一个问题。就是当脏数据很多,datax会停下来。

    是否是脏数据的问题,脏数据太多,我们封装的datax没有对相关状态的反馈。
    在这里插入图片描述

    可以在core.json看到我设置了1000个脏数据,实际上,报错十几个二十几个脏数据的时候有可能会触发jvm的gc,然后就没了。

    会不会是这个collector类太大了,然后又不能释放,导致失败 如果不转换类型,那么就不会有这样的脏数据收集,所以会成功。:

    jvm的gc

    (实际上应该是脏数据导致回滚,回滚类占用太大)

    脏数据拟解决方案:

    1.一方面是脏数据的发现与处理,可以配置直接丢弃?

    由于这是往数据库写入的时候出现的问题。也就是说回滚会影响速度,是在数据库写入才发现的脏数据,所以是否直接丢弃这些脏数据并不能解决降速的问题,降速问题更多的可能是因为数据库因为回滚收到了影响,能够恢复。还可能因为这些回滚操作需要更多的内存空间让datax执行,而内存空间不够,导致了gc进一步降低了datax抽取的速度。

    可能方法:
    在这里插入图片描述

    错误收集没有停止,但是依然降速,而且伴着gc的出现。
    在这里插入图片描述

    2.确定脏数据的产生方式:

    考虑到如果不转换字段类型,那么是可以直接抽取不出问题的。那么这里我们尝试先不转换抽取到一张表中,然后根据转换抽取策略里面的脏数据信息去查看,是否存在脏数据,存在什么类型的脏数据,在sql语句中对这些脏数据进行规避。

    可以看到不转换的value字段里面有字符串,那么怎么在sql语句里面避免这些字符串抽取出来:

    在这里插入图片描述

    可以用when value REGEXP’[^0-9.]’=0 then value 判定是否为数值

    SELECT parent_type, child_type, province, CASE WHEN NAME LIKE “磁盘使用率%” THEN “磁盘使用率” WHEN NAME LIKE “磁盘使用量-C” THEN “磁盘使用量” WHEN NAME LIKE “%磁盘io%” THEN “磁盘io” ELSE NAME END AS NAME, be_monitored_ip, unit, CASE WHEN VALUE REGEXP ‘[^0-9.]’ = 0 THEN VALUE WHEN VALUE LIKE “正常” THEN “0” ELSE “1” END AS VALUE , create_time FROM data_c WHERE value != ‘’ AND ( BINARY NAME LIKE “cpu使用率” OR BINARY NAME LIKE “cpu频率” OR BINARY NAME LIKE “cpu温度” OR BINARY NAME LIKE “磁盘使用率-C” OR BINARY NAME LIKE “磁盘使用量-C” OR BINARY NAME LIKE “%磁盘io%” OR BINARY NAME LIKE “磁盘大小” OR BINARY NAME LIKE “内存大小” OR BINARY NAME LIKE “内存使用率” OR BINARY child_type = “中间件” ) AND create_day LIKE “2020111%” LIMIT 100

    SELECT
    	parent_type,
    	child_type,
    	province,
    CASE
    		
    		WHEN NAME LIKE "磁盘使用率%" THEN
    		"磁盘使用率" 
    		WHEN NAME LIKE "磁盘使用量-C" THEN
    		"磁盘使用量" 
    		WHEN NAME LIKE "%磁盘io%" THEN
    		"磁盘io" ELSE NAME 
    	END AS NAME,
    	be_monitored_ip,
    	unit,
    CASE
    		
    	WHEN 
    	VALUE
    		REGEXP '[^0-9.]' = 0 THEN
    			
    		VALUE
    			
    		WHEN 
    		VALUE
    			LIKE "正常" THEN
    				"0" ELSE "1" 
    		END AS 
    		VALUE
    			,
    			create_time 
    		FROM
    			data_cloud_monitor 
    		WHERE
    			`value` != '' 
    			AND (
    				BINARY NAME LIKE "cpu使用率" 
    				OR BINARY NAME LIKE "cpu频率" 
    				OR BINARY NAME LIKE "cpu温度" 
    				OR BINARY NAME LIKE "磁盘使用率-C" 
    				OR BINARY NAME LIKE "磁盘使用量-C" 
    				OR BINARY NAME LIKE "%磁盘io%" 
    				OR BINARY NAME LIKE "磁盘大小" 
    				OR BINARY NAME LIKE "内存大小" 
    				OR BINARY NAME LIKE "内存使用率" 
    				OR BINARY child_type = "中间件" 
    			) 
    		AND create_day LIKE "2020111%" 
    	LIMIT 100
    

    这里遇到了新的问题,sql语句使用到了[],我这里封装datax的json文件生成放到tomcat的web应用报错,这并不是每个人都会遇到的。
    在这里插入图片描述

    错误原因:

    当在浏览器中访问时 URL中带有特殊字符,如花括号冒号时,就会出现这个错误。

    例如:http://localhost:8080/index.do?{id:123}

    解决方法:

    1、去除URL中的特殊字符;

    3、使用 Post 方法提交数据

    4、更换低版本的Tomcat来规避这种问题。

    5、在 conf/catalina.properties 添加或者修改:

    5.1 添加 tomcat.util.http.parser.HttpParser.requestTargetAllow=|{}

    5.2 修改tomcat/conf/catalina.properties的配置文件

    Tomcat在 7.0.73, 8.0.39, 8.5.7 版本后,添加了对于http头的验证。

    具体来说,就是添加了些规则去限制HTTP头的规范性

    org.apache.tomcat.util.http.parser.HttpParser#IS_NOT_REQUEST_TARGET[]中定义了一堆not request target

    if(IS_CONTROL[i] || i > 127 || i == 32 || i == 34 || i == 35 || i == 60 || i == 62 || i == 92 || i == 94 || i == 96 || i == 123 || i == 124 || i == 125) {

    IS_NOT_REQUEST_TARGET[i] = true;

    }

    转换过来就是以下字符(对应10进制ASCII看):

    键盘上那些控制键:(<32或者=127)

    非英文字符(>127)

    空格(32)

    双引号(34)

    #(35)

    <(60)

    >(62)

    反斜杠(92)

    ^(94)

    ~(96)

    {(123)

    }(124)

    |(125)

    更改配置文件,重启服务器后,并没有解决问题。由于我的项目以前是使用tomcat7的所以这里回退到了tomcat7能够正常地完成抽取,这里我就不继续深入了。

    3.另一方面是设置jvm

    实际上按照逻辑来说,通过避免脏数据的写入才是解决问题的正确思路,降速问题应该就是由于脏数据写入,然后回滚操作使速度降低。脏数据太多让回滚操作的类内存爆了,调高内存感觉不是一个很合理的方法。通过sql解决脏数据在我这里可行。就不继续深入了。

    后期计划:

    根据需求看看是否有必要在tomcat更高版本寻找解决方案,目前通过降低tomcat版本+避免过多脏数据的写入来解决降速问题。

    展开全文
  • datax修复\N脏数据

    千次阅读 2020-09-18 21:04:53
    下载datax源码 修改datax源码plugin-unstructured-storage-util下的UnstructuredStorageReaderUtil.class 加上一个判断,因为在hdfs中,null值存储的是 \N ,所以需要把它转换成 null存储到Mysql中 if (columnValue....

    下载datax源码
    修改datax源码plugin-unstructured-storage-util下的UnstructuredStorageReaderUtil.class

    加上一个判断,因为在hdfs中,null值存储的是 \N ,所以需要把它转换成 null存储到Mysql中

    Type type = Type.valueOf(columnType.toUpperCase());
    // it's all ok if nullFormat is null
    if (columnValue.equals(nullFormat) || columnValue.equals("\\N")) {
       LOG.info("********** string的 原始值 为 "+columnValue+" ******");
       columnValue = null;
    }
    

    打包
    在这里插入图片描述
    你可以只打包自己的需要的

    然后会生成/${DATAX_Home}/target/datax/datax/plugin/xxreader
    在这里插入图片描述

    最后替换官网下的datax.tar.gz中的xxreader和xxwriter

    博主公众号
    求关注
    在这里插入图片描述

    展开全文
  • --------------------'+fn) logfxrs=logfind(fn,'脏数据.*\n.*record') for rr in logfxrs: print('脏数据',rr) flogall.write(rr) flogzang.write(rr) logfxrs = logfind(fn, '失败.*') for rr in logfxrs: print('...
    # coding=utf-8
    import codecs
    
    import os, re, datetime
    
    def get_file_path(root_path,file_list,dir_list):
        #获取该目录下所有的文件名称和目录名称
        dir_or_files = os.listdir(root_path)
        for dir_file in dir_or_files:
            #获取目录或者文件的路径
            dir_file_path = os.path.join(root_path,dir_file)
            #判断该路径为文件还是路径
            if os.path.isdir(dir_file_path):
                dir_list.append(dir_file_path)
                #递归获取所有文件和目录的路径
                get_file_path(dir_file_path,file_list,dir_list)
            else:
                #print(dir_file_path)
                #print(type(dir_file_path))
                #print(chardet.detect(dir_file_path.encode()))
                if dir_file_path.endswith(".log"):
                    file_list.append(dir_file_path)
    
    
    def logfind(filename,regexp):
        f = open(filename, "rb")
        file_content = f.read().decode('utf-8','ignore')
        f.close()
        # re.finditer(regexp,file_content,re.M|re.I)
        # searchObj = re.search(r'(.*)' + item2 + '.*', file_content, re.M | re.I)
        pattern = re.compile(regexp)  # 查找数字
        result1 = pattern.findall(str(file_content))
        return result1
    
    
    def dblist(dir_path):
        file_list = []
        # 用来存放所有的目录路径
        dir_list = []
        get_file_path(dir_path, file_list, dir_list)
        print(file_list)
        print(dir_list)
        return file_list
    
    
    if __name__ == "__main__":
        # 根目录路径
        root_path=r"F:\内蒙\datax\log"
        # root_path = r"F:\内蒙\datax\log\test"
    
        # 用来存放所有的文件路径
        file_list = []
        # 用来存放所有的目录路径
        dir_list = []
        get_file_path(root_path, file_list, dir_list)
        # print(file_list)
        # print(dir_list)
    
        flogall = open(".\\logerrlist.log", 'w', newline='', encoding='utf-8')
        flogzang = open(".\\logerrlist脏数据.log", 'w', newline='', encoding='utf-8')
        flogdberr = open(".\\logerrlist数据库错.log", 'w', newline='', encoding='utf-8')
    
    
    
        for fn in file_list:
            print('-------------'+fn)
            flogall.write('\n-----------------------------------------'+fn)
            flogzang.write('\n-----------------------------------------'+fn)
            flogdberr.write('\n-----------------------------------------'+fn)
            logfxrs=logfind(fn,'脏数据.*\n.*record')
            for rr in logfxrs:
                print('脏数据',rr)
                flogall.write(rr)
                flogzang.write(rr)
            logfxrs = logfind(fn, '失败.*')
            for rr in logfxrs:
                print('失败',rr)
                flogall.write(rr)
                flogdberr.write(rr)
            logfxrs = logfind(fn, '异常.*')
            for rr in logfxrs:
                print('异常',rr)
                flogall.write(rr)
                flogdberr.write(rr)
    
        flogall.close()
        flogzang.close()
        flogdberr.close()

    展开全文
  • mysql的null值通过datax抽取到hdfs,会变成引号,这不是我们所需要的,所以需要修改一下datax的源码
  • DataX读Oracle到ODPS脏数据少列问题

    千次阅读 2018-08-29 20:40:42
    同事用DataX同步数据到ODPS(MaxCompute)的时候,出现了脏数据。 1,找到该条脏数据的主键,值得一提的是,有些时候配置的JSON,主键的顺序在比较靠后的位置,所以需要把主键放在前面。这样报错日志,才会把主键...

    定位问题

    同事用DataX同步数据到ODPS(MaxCompute)的时候,出现了脏数据。

    1,找到该条脏数据的主键,值得一提的是,有些时候配置的JSON,主键的顺序在比较靠后的位置,所以需要把主键放在前面。这样报错日志,才会把主键打印出来,至于为什么后面会讲。

    2,去源段数据库查询这条记录,发现有部分字段乱码。

    3,通过CAST函数、TO_CHAR函数、CONVERT函数、DUMP函数,等坚定这个函数,在存储上就是乱码,与字符集无关。而且该记录是Number(14,10)的,经过多次转换,发现该字段的值十六进制(DUMP(COL,'1016'))是bd,e,ed。很匪夷所思

    4,姑且认定源段脏数据,但是为什么脏数据的记录,报错:源表的列个数小于目标表的列个数,源表列数是:9 目的表列数是:86 ,数目不匹配。。。。。。

    5,于是定位源码问题

     

     

    6,查看源码

    OracleReader

    public static class Task extends Reader.Task {
    
    		private Configuration readerSliceConfig;
    		private CommonRdbmsReader.Task commonRdbmsReaderTask;
    
    		@Override
    		public void init() {
    			this.readerSliceConfig = super.getPluginJobConf();
    			this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(
    					DATABASE_TYPE ,super.getTaskGroupId(), super.getTaskId());
    			this.commonRdbmsReaderTask.init(this.readerSliceConfig);
    		}
    
    		@Override
    		public void startRead(RecordSender recordSender) {
    			int fetchSize = this.readerSliceConfig
    					.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);
                       //在这里
    			this.commonRdbmsReaderTask.startRead(this.readerSliceConfig,
    					recordSender, super.getTaskPluginCollector(), fetchSize);
    		}
    
    		@Override
    		public void post() {
    			this.commonRdbmsReaderTask.post(this.readerSliceConfig);
    		}
    
    		@Override
    		public void destroy() {
    			this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
    		}
    
    	}

     CommonRbdmsReaderTask

     public void startRead(Configuration readerSliceConfig,
                                  RecordSender recordSender,
                                  TaskPluginCollector taskPluginCollector, int fetchSize) {
                String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
                String table = readerSliceConfig.getString(Key.TABLE);
    
                PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
    
                LOG.info("Begin to read record by Sql: [{}\n] {}.",
                        querySql, basicMsg);
                PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
                queryPerfRecord.start();
    
                Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
                        username, password);
    
                // session config .etc related
                DBUtil.dealWithSessionConfig(conn, readerSliceConfig,
                        this.dataBaseType, basicMsg);
    
                int columnNumber = 0;
                ResultSet rs = null;
                try {
                    rs = DBUtil.query(conn, querySql, fetchSize);
                    queryPerfRecord.end();
    
                    ResultSetMetaData metaData = rs.getMetaData();
                    columnNumber = metaData.getColumnCount();
    
                    //这个统计干净的result_Next时间
                    PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
                    allResultPerfRecord.start();
    
                    long rsNextUsedTime = 0;
                    long lastTime = System.nanoTime();
                    while (rs.next()) {
                        rsNextUsedTime += (System.nanoTime() - lastTime);
                        //这里-------------------------------
                        this.transportOneRecord(recordSender, rs,
                                metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                        lastTime = System.nanoTime();
                    }
    
                    allResultPerfRecord.end(rsNextUsedTime);
                    //目前大盘是依赖这个打印,而之前这个Finish read record是包含了sql查询和result next的全部时间
                    LOG.info("Finished read record by Sql: [{}\n] {}.",
                            querySql, basicMsg);
    
                }catch (Exception e) {
                    throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
                } finally {
                    DBUtil.closeDBResources(null, conn);
                }
            }

    transportOneRecord

     protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
                    ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
                    TaskPluginCollector taskPluginCollector) {
                Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
                recordSender.sendToWriter(record);
                return record;
            }

    buildRecord

    protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
            		TaskPluginCollector taskPluginCollector) {
            	Record record = recordSender.createRecord();
    
                try {
                    for (int i = 1; i <= columnNumber; i++) {
                        switch (metaData.getColumnType(i)) {
    
                        case Types.CHAR:
                        case Types.NCHAR:
                        case Types.VARCHAR:
                        case Types.LONGVARCHAR:
                        case Types.NVARCHAR:
                        case Types.LONGNVARCHAR:
                            String rawData;
                            if(StringUtils.isBlank(mandatoryEncoding)){
                                rawData = rs.getString(i);
                            }else{
                                rawData = new String((rs.getBytes(i) == null ? EMPTY_CHAR_ARRAY : 
                                    rs.getBytes(i)), mandatoryEncoding);
                            }
                            record.addColumn(new StringColumn(rawData));
                            break;
    
                        case Types.CLOB:
                        case Types.NCLOB:
                            record.addColumn(new StringColumn(rs.getString(i)));
                            break;
    
                        case Types.SMALLINT:
                        case Types.TINYINT:
                        case Types.INTEGER:
                        case Types.BIGINT:
                            record.addColumn(new LongColumn(rs.getString(i)));
                            break;
    
                        case Types.NUMERIC:
                        case Types.DECIMAL:
                            record.addColumn(new DoubleColumn(rs.getString(i)));
                            break;
    
                        case Types.FLOAT:
                        case Types.REAL:
                        case Types.DOUBLE:
                            record.addColumn(new DoubleColumn(rs.getString(i)));
                            break;
    
                        case Types.TIME:
                            record.addColumn(new DateColumn(rs.getTime(i)));
                            break;
    
                        // for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
                        case Types.DATE:
                            if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
                                record.addColumn(new LongColumn(rs.getInt(i)));
                            } else {
                                record.addColumn(new DateColumn(rs.getDate(i)));
                            }
                            break;
    
                        case Types.TIMESTAMP:
                            record.addColumn(new DateColumn(rs.getTimestamp(i)));
                            break;
    
                        case Types.BINARY:
                        case Types.VARBINARY:
                        case Types.BLOB:
                        case Types.LONGVARBINARY:
                            record.addColumn(new BytesColumn(rs.getBytes(i)));
                            break;
    
                        // warn: bit(1) -> Types.BIT 可使用BoolColumn
                        // warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
                        case Types.BOOLEAN:
                        case Types.BIT:
                            record.addColumn(new BoolColumn(rs.getBoolean(i)));
                            break;
    
                        case Types.NULL:
                            String stringData = null;
                            if(rs.getObject(i) != null) {
                                stringData = rs.getObject(i).toString();
                            }
                            record.addColumn(new StringColumn(stringData));
                            break;
    
                        default:
                            throw DataXException
                                    .asDataXException(
                                            DBUtilErrorCode.UNSUPPORTED_TYPE,
                                            String.format(
                                                    "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
                                                    metaData.getColumnName(i),
                                                    metaData.getColumnType(i),
                                                    metaData.getColumnClassName(i)));
                        }
                    }
                } catch (Exception e) {
                    if (IS_DEBUG) {
                        LOG.debug("read data " + record.toString()
                                + " occur exception:", e);
                    }
                    //TODO 这里识别为脏数据靠谱吗?
                    taskPluginCollector.collectDirtyRecord(record, e);
                    if (e instanceof DataXException) {
                        throw (DataXException) e;
                    }
                }
                return record;
            }

    执行到这时发生异常

             case Types.DECIMAL:
                  record.addColumn(new DoubleColumn(rs.getString(i)));
                  break;

    于是考虑是否是getString的时候,结果集得不到这一列的值?

    查看源码

    T4CNumberAccessor.getString()

    String getString(int var1) throws SQLException {
            String var2 = super.getString(var1);
            if (var2 != null && this.definedColumnSize > 0 && var2.length() > this.definedColumnSize) {
                var2 = var2.substring(0, this.definedColumnSize);
            }
    
            return var2;
        }

    父类NumberCommonAccessor

     String getString(int var1) throws SQLException {
            Object var2 = null;
            if (this.rowSpaceIndicator == null) {
                SQLException var15 = DatabaseError.createSqlException(this.getConnectionDuringExceptionHandling(), 21);
                var15.fillInStackTrace();
                throw var15;
            } else if (this.rowSpaceIndicator[this.indicatorIndex + var1] == -1) {
                return (String)var2;
            } else {
                byte[] var3 = this.rowSpaceByte;
                int var4 = this.columnIndex + this.byteLength * var1 + 1;
                byte var5 = var3[var4 - 1];
                byte[] var6 = new byte[var5];
                System.arraycopy(var3, var4, var6, 0, var5);
                NUMBER var7 = new NUMBER(var6);
                String var8 = NUMBER.toString(var6);
                int var9 = var8.length();
                if (var8.startsWith("0.") || var8.startsWith("-0.")) {
                    --var9;
                }
    
                if (var9 <= 38) {
                    /* ----------- 这里 ---------*/
                    return var7.toText(38, (String)null).trim();
                } else {
                    var8 = var7.toText(-44, (String)null);
                    int var10 = var8.indexOf(69);
                    int var11 = var8.indexOf(43);
                    if (var10 == -1) {
                        var10 = var8.indexOf(101);
                    }
    
                    int var12;
                    for(var12 = var10 - 1; var8.charAt(var12) == '0'; --var12) {
                        ;
                    }
    
                    String var13 = var8.substring(0, var12 + 1);
                    String var14 = null;
                    if (var11 > 0) {
                        var14 = var8.substring(var11 + 1);
                    } else {
                        var14 = var8.substring(var10 + 1);
                    }
    
                    return (var13 + "E" + var14).trim();
                }
            }
        }

    关键地方  toText函数

      public String toText(int var1, String var2) throws SQLException {
            return _getLnxLib().lnxnuc(this.shareBytes(), var1, var2);
        }
    lnxnuc该函数,将字节数组转成对应的字符
     var30[var15] = var4[0];
     var30[var15 + 1] = var4[41];
     var30[var15 + 2] = var4[10];
     var30[var15 + 3] = var4[0];
     var30[var15 + 4] = var4[0];

    转换时,因为字符问题,转换异常导致数组越界抛出异常。

    回到DataX,DirtyRecord打印JSON数据信息

    @Override
    	public String toString() {
    		return JSON.toJSONString(this.columns);
    	}

    该记录之后被判为脏数据,后面的列不再考虑,直接进入下一循环。

    解决问题

    1,脏数据的字段,先去除,然后人工集成,走merge逻辑。

    2,修改CommonRdbmsReader代码,如果发现异常,抛出异常后,继续处理接下来的字段。因为是通过

     for (int i = 1; i <= columnNumber; i++) {
                        //获取每一列的Type
                        switch (metaData.getColumnType(i)) {
                        /* code */
      }
    }

    遍历每一列来做,所以遇到脏数据,把该数据写成指定的DirtyData标记。接着往里写。但不建议这么做。

    3,开放脏数据限制,毕竟脏数据并不需要集成,如果可控范围內,就让他过滤掉脏数据把。

    展开全文
  • DataX-web启动后执行器在线机器为0.0.0开头的地址,删除这个执行器后添加一个执行器,注册方式为手动录入,在线机器地址改为你的地址。 结果系统仍不能调度成功。 查看日志报错jdbc4....
  • datax值转换使用以及源码分析

    千次阅读 2019-07-05 17:15:08
    脏数据处理 什么是脏数据? 目前主要有三类脏数据: Reader读到不支持的类型、不合法的值。 不支持的类型转换,比如:Bytes转换为Date。 写入目标端失败,比如:写mysql整型长度超长 如何处理脏数据 ...
  • Datax的配置及使用

    千次阅读 2019-09-20 17:21:13
    DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能 DataX目前已经有了比较全面的插件体系,主流的...
  • Datax-数据抽取同步利器

    千次阅读 2020-12-19 19:55:18
    Datax概览DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。为了解决异构数据源同步问题,DataX将...
  • 一、根据日期进行增量数据抽取1.页面任务配置打开菜单任务管理页面,选择添加任务按下图中5个步骤进行配置1.任务类型选DataX任务2.辅助参数选择时间自增3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此...
  • datax数据导出

    千次阅读 2019-10-16 17:56:01
    场景:mysql单表transaction数据量达到20亿,占服务器磁盘太多需要导出,但是普通方法导出太慢,这里借助datax工具,对transaction表按日期进行分表后将数据导入对应日期的表内。 1、需要安装的环境: (1)jdk ...
  • 准备 需要依赖(我的把源码下来,本地install,远程仓库是否有这两个依赖不确定) <dependency> <...datax-core</artifactId> <version>0.0.1-SNAPSHOT</version>
  • error stdoutplugincollector 脏数据 其他网站说是什么版本,什么数据类型错误,不一而足,但是对我个人而言不好使,没办法,喝口凉水,告诉自己这时候不要慌,看下日志上方的Engin信息,看下reader下的column信息,...
  • 原标题:基于 DataX 数据库基础表数据同步 万标2015年加入去哪儿技术团队。目前在金融事业部/支付中心,测试工程师岗位,对技术有浓厚兴趣。一、 问题及背景在此之前,新增的基础数据在上线后,是通过手工在各测试...
  • DataX导入与导出数据

    2021-12-08 15:23:37
    DataX导入与导出数据1、Datax安装2、从MySQL导出数据2.1、将MySQL中的student数据库中的student表导入hive中2.1.1、前提准备2.1.2、编写脚本2.1.3、执行脚本2.2、将MySQL中的数据导入hbase2.2.1、前提准备2.2.2、...
  • 阿里云DataWorks数据集成(DataX)架构&实践分享 分享嘉宾:罗海伟阿里云 编辑整理:约理中国科学院大学 目录 阿里云DataWorks数据集成(DataX)架构&实践分享 ▌为什么需要数据集成 数据集成的应用...
  • datax数据,从 mysql 到 phoenix

    千次阅读 2019-06-10 15:47:44
    2、读取列数据类型中 int 和 float 可能会导致脏数据,适当的使用 long 和 double类型替换 3、通常 hbase.phoenix 中的表名和列名都是大写,脚本中相对应的要注意大小写 脏数据的产生原因会有很多 4 输出到 hbase...
  • datax导入1000w行有脏数据2. 尝试500w,200w,100w,50w,只有50w的可以3. 尝试减去不同类型文件,只要达到一定大小就会出现脏数据4. 检查上传到服务器的文件,出现数据缺失5. 检查python脚本处理替换空值后的文件,...
  • dataX大数据同步

    2020-12-21 14:39:17
    安装直接解压就行dataX需要python2.x我之前已经安装了anaconda3,自带的是python3.7的版本,这边就需要切换conda create --name python27 python=2.7创建一个名为python27的环境,指定python版本为python2.7,它会...
  • DataX简单使用

    2021-02-06 16:58:19
    背景最近在重构权限管理系统(PMS),因此在验证新开发功能的行为是否和旧功能...迁移方案迁移的基本思路是写转换sql语句,查出数据并导入目标库的目标表思路1查出的数据导出到Excel,然后通过Excel导入到目的库。这种...
  • ERROR StdoutPluginCollector - 脏数据: {"exception":"Could not retrieve transation read-only status server","record":[{"byteSize":3,"index":0,"rawDat 原因:MySQL版本与DataX指定的mysql的连接驱动的版本...
  • DataX在mysql间数据迁移操作DataX概览MysqlReader插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。不同于其他关系型数据库,...
  • DataX 数据转储笔记

    千次阅读 2019-08-02 15:05:56
    一、关于DataX DataX 之所以会被应用到,就是为了解决不同数据库之间不同...DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需...
  • 1、datax软件已经在/home/oracle目录下安装好 2、将oracle_to_oracle_sigle.sh放在/home/oracle/datax/script下 3、将oracle_to_oracle.json放在/home/oracle/datax/job 4、将table_name.txt放在/home/oracle/datax/...
  • 数据迁移工具datax--介绍

    千次阅读 2018-09-05 11:33:56
    文章来源...​ DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功...
  • 数据同步工具datax

    千次阅读 2020-05-16 23:24:15
    DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套...
  • 从MySQL导入数据到hive上是没问题的,从hive上导出数据时,提示: [您的配置错误.]. - 列配置信息有错误. 因为您配置的任务中,源头读取字段数:1 与 目的表要写入的字段数:4 不相等. 请检查您的配置并作出修改. ...
  • {"core": {"transport": {"channel": {"speed": {"record": "10000","byte": 1048576}}}},"job": {"setting": {"speed": {"channel": 2,"byte":1048576,"record":10000}},"content": [{"reader": {"name": "hdfsread...
  • 初识数据源同步利器--DataX

    千次阅读 2019-11-29 19:23:44
    一、DataX是什么? DataX 是一个异构数据源离线同步工具,致力于实现包括...为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 461
精华内容 184
关键字:

datax脏数据

友情链接: BCB-vcl.rar