• 数据抽取
千次阅读
2021-02-11 04:51:33

首先创建一个数据帧(使用随机数据)：import pandas as pd

import numpy as np

from datetime import datetime, timedelta

ab = pd.DataFrame()

ab["subjectID"] = np.random.randint(5, size=200)#random list of "subjects" from 0 to 4

ab["day_number"] = np.random.randint(50, size=200)#random list of "dates" from 0 to 50

ab['real_date'] = ab.day_number.apply(lambda d: datetime(2018, 1, 1) + timedelta(days=d)) #to simulate real dates

ab["score1"] = np.random.randint(200, size=200)#meant to simulate one measurement from one subject

ab["score2"] = np.random.randint(400, size=200)#meant to simulate a second measurement

min_day = ab.real_date.min()

ab = ab.groupby(['subjectID', 'real_date']).sum() #because some subjects have more than 1 score each day

day_number score1 score2

subjectID real_date

0 2018-01-01 0 306 273

2018-01-04 3 32 60

2018-01-05 4 61 135

2018-01-08 21 477 393

2018-01-09 8 22 341

2018-01-10 9 137 30

2018-01-11 30 281 674

2018-01-14 13 183 396

2018-01-15 14 41 337

2018-01-16 15 83 50

然后用下一天的数据填充没有数据的天数：

^{pr2}$4天的下一次重采样(分组依据)：res = df.reset_index(level='subjectID').groupby('subjectID').resample('4D').first() #group by 4 days periods and keep only the first value res = res.drop(columns='subjectID') print(res.head(10)) day_number score1 score2 subjectID real_date 0 2018-01-01 0 306 273 2018-01-05 4 61 135 2018-01-09 8 22 341 2018-01-13 13 183 396 2018-01-17 18 91 46 2018-01-21 20 76 333 2018-01-25 48 131 212 2018-01-29 29 92 81 2018-02-02 32 172 55 2018-02-06 72 98 246 最后，当有超过4天的周期没有数据时，重新设置索引并处理情况：res = res.reset_index('real_date', drop=True) #the real_date has no meaning anymore res['real_date'] = res.day_number.apply(lambda d: min_day + timedelta(days=d)) #good real_date based on the day_number res = res.drop(columns='day_number') res = res.set_index('real_date', append=True) res = res.groupby(level=['subjectID', 'real_date']).first() #regroups periods with no data for more than 4 days print(res.head(10)) score1 score2 subjectID real_date 0 2018-01-01 306 273 2018-01-05 61 135 2018-01-09 22 341 2018-01-14 183 396 2018-01-19 91 46 2018-01-21 76 333 2018-01-30 92 81 2018-02-02 172 55 2018-02-10 40 218 2018-02-15 110 112 这有点复杂，但我认为这是最好的办法。虽然我不知道效率如何，但似乎也没那么糟。在 更多相关内容 • 具体来讲，大数据处理的基本流程可以分为数据抽取与集成、数据分析和数据解释等步骤。 数据抽取与集成 大数据的一个重要特点就是多样性，这就意味着数据来源极其广泛，数据类型极为繁杂。这种复杂的数据环境给... • kettle数据抽取、数据清洗、数据装换， 作业根据时间戳更新插入数据完整demo 1、先获取时间戳 2、删除目标库大于时间戳的 3、数据同步，获取源表跟目标表大于时间戳的，比较， 目标表多的删除， 少的插入更新 4... • 技术领域+数据抽取+应用工具 • 复赛数据 2021 数据抽取挑战赛复赛数据 2021 数据抽取挑战赛复赛数据 2021 数据抽取挑战赛复赛数据 2021 数据抽取挑战赛复赛数据 2021 数据抽取挑战赛复赛数据 2021 数据抽取挑战赛复赛数据 2021 数据抽取挑战赛复赛... • 随着大数据增长速度提高、数据体量增大, 数据的冗余也将会越来越大, 传统的数据软件...采取了一种更适合大数据时代的数据抽取模型, 并给出了数据抽取的判定方法。此方法具有复杂度低, 易于实现, 具有良好的估计性能。 • 通过调用kettle的API接口,实现将一个库的数据转移到另一个数据库中。附件中同时提供了抽取需要的jar包 • 主要记录了ETL中数据抽取的一些工具，并对工具进行了一部分的对比 • Connotate：Web数据抽取神器 网页数据抽取 非结构化数据抽取与分类分析 共15页.pdf • 针对关系数据库数据抽取过程中问题经常发生复制中断问题，采用数据库在线Redo日志增量数据快速定位于解析技术，设计一种Redo日志数据抽取模型，给出了INSERT、DELETE、UPDATE 3种DML操作日志数据捕获方法。... • 适合初学者学习使用kettle • 针对中文PDF格式论文元数据抽取问题，对大量中文科技论文进行分析归纳。总结出中文诊文元数据的互不包含性、排它性、重复性、顺序性和部分确定性，并据此定义简单元数据和复杂元数据的概念，应用字典匹配和支持向量... • 内容详细的简明的介绍了ETL数据抽取的原理，详细阐明了数据抽取的几种规则和方式 • 在用于ETL工具进行数据抽取的郭晨各种更通长会出现一个初始化方法在单元测试时可以完美运行单一旦加载到服务器上的时候就会出现异常，一般情况下就是少少这个jar包。 • NULL 博文链接：https://xuan0506.iteye.com/blog/1040352 • 文档内包含了最新版ETL数据抽取的说明和介绍，包含部分截图解释 • ETL（Extract-Transform-Load的缩写，即数据抽取、转换、装载的过程） • ETL即数据抽取（Extract）、转换（Transform）、装载（Load）的过程，它是构建数据仓库的重要环节。 ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程，目的是将企业中的分散、零乱、标准不统一的... # 概念 ETL即数据抽取（Extract）、转换（Transform）、装载（Load）的过程，它是构建数据仓库的重要环节。 ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程，目的是将企业中的分散、零乱、标准不统一的数据整合到一起，为企业的决策提供分析依据。ETL是BI项目重要的一个环节。通常情况下，在BI项目中ETL会花掉整个项目的1/3的时间,ETL设计的好坏直接关接到BI项目的成败。 ETL的设计分三部分：数据抽取、数据的清洗转换、数据的加载。在设计ETL的时候我们也是从这三部分出发。数据的抽取是从各个不同的数据源抽取到ODS(OperationalDataStore，操作型数据存储)中——这个过程也可以做一些数据的清洗和转换)，在抽取的过程中需要挑选不同的抽取方法，尽可能的提高ETL的运行效率。ETL三个部分中，花费时间最长的是“T”(Transform，清洗、转换)的部分，一般情况下这部分工作量是整个ETL的2/3。数据的加载一般在数据清洗完了之后直接写入DW(DataWarehousing，数据仓库)中去。 # 一、数据的抽取 这一部分需要在调研阶段做大量的工作，首先要搞清楚数据是从几个业务系统中来,各个业务系统的数据库服务器运行什么DBMS,是否存在手工数据，手工数据量有多大，是否存在非结构化的数据等等，当收集完这些信息之后才可以进行数据抽取的设计。 1、对于与存放DW的数据库系统相同的数据源处理方法 这一类数据源在设计上比较容易。一般情况下，DBMS(SQLServer、Oracle)都会提供数据库链接功能，在DW数据库服务器和原业务系统之间建立直接的链接关系就可以写Select语句直接访问。 2、对于与DW数据库系统不同的数据源的处理方法 对于这一类数据源，一般情况下也可以通过ODBC的方式建立数据库链接——如SQLServer和Oracle之间。如果不能建立数据库链接，可以有两种方式完成，一种是通过工具将源数据导出成.txt或者是.xls文件，然后再将这些源系统文件导入到ODS中。另外一种方法是通过程序接口来完成。 3、对于文件类型数据源(.txt,.xls)，可以培训业务人员利用数据库工具将这些数据导入到指定的数据库，然后从指定的数据库中抽取。或者还可以借助工具实现，如SQLServer2005的SSIS服务的平面数据源和平面目标等组件导入ODS中去。 4、增量更新的问题 对于数据量大的系统，必须考虑增量抽取。一般情况下，业务系统会记录业务发生的时间，我们可以用来做增量的标志,每次抽取之前首先判断ODS中记录最大的时间，然后根据这个时间去业务系统取大于这个时间所有的记录。利用业务系统的时间戳，一般情况下，业务系统没有或者部分有时间戳。 二、数据的清洗转换 一般情况下，数据仓库分为ODS、DW两部分。通常的做法是从业务系统到ODS做清洗，将脏数据和不完整数据过滤掉，在从ODS到DW的过程中转换，进行一些业务规则的计算和聚合。 ## 1、数据清洗 数据清洗的任务是过滤那些不符合要求的数据，将过滤的结果交给业务主管部门，确认是否过滤掉还是由业务单位修正之后再进行抽取。不符合要求的数据主要是有不完整的数据、错误的数据、重复的数据三大类。 (1)不完整的数据：这一类数据主要是一些应该有的信息缺失，如供应商的名称、分公司的名称、客户的区域信息缺失、业务系统中主表与明细表不能匹配等。对于这一类数据过滤出来，按缺失的内容分别写入不同Excel文件向客户提交，要求在规定的时间内补全。补全后才写入数据仓库。 (2)错误的数据：这一类错误产生的原因是业务系统不够健全，在接收输入后没有进行判断直接写入后台数据库造成的，比如数值数据输成全角数字字符、字符串数据后面有一个回车操作、日期格式不正确、日期越界等。这一类数据也要分类，对于类似于全角字符、数据前后有不可见字符的问题，只能通过写SQL语句的方式找出来，然后要求客户在业务系统修正之后抽取。日期格式不正确的或者是日期越界的这一类错误会导致ETL运行失败，这一类错误需要去业务系统数据库用SQL的方式挑出来，交给业务主管部门要求限期修正，修正之后再抽取。 (3)重复的数据：对于这一类数据——特别是维表中会出现这种情况——将重复数据记录的所有字段导出来，让客户确认并整理。 数据清洗是一个反复的过程，不可能在几天内完成，只有不断的发现问题，解决问题。对于是否过滤，是否修正一般要求客户确认，对于过滤掉的数据，写入Excel文件或者将过滤数据写入数据表，在ETL开发的初期可以每天向业务单位发送过滤数据的邮件，促使他们尽快地修正错误,同时也可以做为将来验证数据的依据。数据清洗需要注意的是不要将有用的数据过滤掉，对于每个过滤规则认真进行验证，并要用户确认。 ## 2、数据转换 数据转换的任务主要进行不一致的数据转换、数据粒度的转换，以及一些商务规则的计算。 (1)不一致数据转换：这个过程是一个整合的过程，将不同业务系统的相同类型的数据统一，比如同一个供应商在结算系统的编码是XX0001,而在CRM中编码是YY0001，这样在抽取过来之后统一转换成一个编码。 (2)数据粒度的转换：业务系统一般存储非常明细的数据，而数据仓库中数据是用来分析的，不需要非常明细的数据。一般情况下，会将业务系统数据按照数据仓库粒度进行聚合。 (3)商务规则的计算：不同的企业有不同的业务规则、不同的数据指标，这些指标有的时候不是简单的加加减减就能完成，这个时候需要在ETL中将这些数据指标计算好了之后存储在数据仓库中，以供分析使用。 # 三、ETL日志、警告发送 ## 1、ETL日志 ETL日志分为三类。一类是执行过程日志，这一部分日志是在ETL执行过程中每执行一步的记录，记录每次运行每一步骤的起始时间，影响了多少行数据，流水账形式。一类是错误日志，当某个模块出错的时候写错误日志，记录每次出错的时间、出错的模块以及出错的信息等。第三类日志是总体日志，只记录ETL开始时间、结束时间是否成功信息。如果使用ETL工具,ETL工具会自动产生一些日志，这一类日志也可以作为ETL日志的一部分。记录日志的目的是随时可以知道ETL运行情况，如果出错了，可以知道哪里出错。 ## 2、警告发送 如果ETL出错了，不仅要形成ETL出错日志，而且要向系统管理员发送警告。发送警告的方式多种，一般常用的就是给系统管理员发送邮件，并附上出错的信息，方便管理员排查错误。 ETL是BI项目的关键部分，也是一个长期的过程，只有不断的发现问题并解决问题，才能使ETL运行效率更高，为BI项目后期开发提供准确的数据。 展开全文 • ## 数据抽取 千次阅读 2016-10-19 13:53:51 数据抽取是指从源数据源系统抽取目的数据源系统需要的数据。实际应用中，数据源较多采用的是关系数据库。 数据抽取的方式 (一) 全量抽取 全量抽取类似于数据迁移或数据复制，它将数据源中的表或视图的... 数据抽取是指从源数据源系统抽取目的数据源系统需要的数据。实际应用中，数据源较多采用的是关系数据库 数据抽取的方式 (一) 全量抽取 全量抽取类似于数据迁移或数据复制，它将数据源中的表或视图的数据原封不动的从数 据库中抽取出来，并转换成自己的ETL 工具可以识别的格式。全量抽取比较简单。 (二) 增量抽取 增量抽取只抽取自上次抽取以来数据库中要抽取的表中新增或修改的数据。在ETL 使用过程中，增量抽取较全量抽取应用更广。如何捕获变化的数据是增量抽取的关键。对捕获方法一般有两点要求：准确性，能够将业务系统中的变化数据按一定的频率准确地捕获到；性能，不能对业务系统造成太大的压力，影响现有业务。目前增量数据抽取中常用的捕获变化数据的方法有： (a) 触发器方式（又称快照式） 在要抽取的表上建立需要的触发器，一般要建立插入、修改、删除三个触发器，每当源表中的数据发生变化，就被相应的触发器将变化的数据写入一个临时表，抽取线程从临时表中抽取数据，临时表中抽取过的数据被标记或删除。 优点：数据抽取的性能高，ETL 加载规则简单，速度快，不需要修改业务系统表结构，可以实现数据的递增加载。 缺点：要求业务表建立触发器，对业务系统有一定的影响，容易对源数据库构成威胁。 (b) 时间戳方式 它是一种基于快照比较的变化数据捕获方式，在源表上增加一个时间戳字段，系统中更新修改表数据的时候，同时修改时间戳字段的值。当进行数据抽取时，通过比较上次抽取时间与时间戳字段的值来决定抽取哪些数据。有的数据库的时间戳支持自动更新，即表的其它字段的数据发生改变时，自动更新时间戳字段的值。有的数据库不支持时间戳的自动更新，这就要求业务系统在更新业务数据时，手工更新时间戳字段。 优点：同触发器方式一样，时间戳方式的性能也比较好，ETL 系统设计清晰，源数据抽取相对清楚简单，可以实现数据的递增加载。 缺点：时间戳维护需要由业务系统完成，对业务系统也有很大的倾 入性（加入额外的时间戳字段），特别是对不支持时间戳的自动更新的数据库，还要求业务系统进行额外的更新时间戳操作；另外，无法捕获对时间戳以前数据的delete和update 操作，在数据准确性上受到了一定的限制。 (c) 全表删除插入方式 每次ETL 操作均删除目标表数据，由ETL 全新加载数据。 优点：ETL 加载规则简单，速度快。 缺点：对于维表加外键不适应，当业务系统产生删除数据操作时，综合数据库将不会记录到所删除的历史数据，不可以实现数据的递增加载；同时对于目标表所建立的关联关系，需要重新进行创建。 (d) 全表比对方式 全表比对的方式是ETL 工具事先为要抽取的表建立一个结构类似的临时表，该临时表记录源表主键以及根据所有字段的数据计算出来，每次进行数据抽取时，对源表和临时表进行的比对，如有不同，进行Update 操作，如目标表没有存在该主键值，表示该记录还没有，即进行Insert 操作。 优点：对已有系统表结构不产生影响，不需要修改业务操作程序，所有抽取规则由ETL完成，管理维护统一，可以实现数据的递增加载，没有风险。 缺点：ETL 比对较复杂，设计较为复杂，速度较慢。与触发器和时间戳方式中的主动通知不同，全表比对方式是被动的进行全表数据的比对，性能较差。当表中没有主键或唯一列且含有重复记录时，全表比对方式的准确性较差。 (e)日志表方式 在业务系统中添加系统日志表，当业务数据发生变化时，更新维护日志表内容，当作ETL 加载时，通过读日志表数据决定加载那些数据及如何加载。 优点：不需要修改业务系统表结构，源数据抽取清楚，速度较快。可以实现数据的递增加载。 缺点：日志表维护需要由业务系统完成，需要对业务系统业务操作程序作修改，记录日志信息。日志表维护较为麻烦，对原有系统有较大影响。工作量较大，改动较大，有一定风险。 (f) Oracle 变化数据捕捉（CDC 方式） 通过分析数据库自身的日志来判断变化的数据。Oracle 的改变数据捕获（CDC，Changed Data Capture）技术是这方面的代表。CDC 特性是在Oracle9i 数据库中引入的。CDC 能够帮助你识别从上次抽取之后发生变化的数据。利用CDC，在对源表进行insert、update 或 delete 等操作的同时就可以提取数据，并且变化的数据被保存在数据库的变化表中。这样就可以捕获发生变化的数据，然后利用数据库视图以一种可控的方式提供给目标系统。CDC 体系结构基于发布/订阅模型。发布者捕捉变化数据并提供给订阅者。订阅者使用从发布者那里获得的变化数据。通常，CDC 系统拥有一个发布者和多个订阅者。发布者首先需要识别捕获变化数据所需的源表。然后，它捕捉变化的数据并将其保存在特别创建的变化表中。它还使订阅者能够控制对变化数据的访问。订阅者需要清楚自己感兴趣的是哪些变化数据。一个订阅者可能不会对发布者发布的所有数据都感兴趣。订阅者需要创建一个订阅者视图来访问经发布者授权可以访问的变化数据。CDC 分为同步模式和异步模式，同步模式实时的捕获变化数据并存储到变化表中，发布者与订阅都位于同一数据库中；异步模式则是基于Oracle 的流复制技术。 优点：提供了易于使用的API 来设置CDC 环境，缩短ETL 的时间。不需要修改业务系统表结构，可以实现数据的递增加载。 缺点：业务系统数据库版本与产品不统一，难以统一实现，实现过程相对复杂，并且需深入研究方能实现。或者通过第三方工具实现，价格昂贵。 展开全文 • kettle数据清洗抽取，全量对比记录，包含列转行，增加序列，字段拆分，对比记录 • 一、Kettle数据抽取概览 1. 文件抽取 （1）处理文本文件 （2）处理XML文件 2. 数据库抽取 二、变化数据捕获 1. 基于源数据的CDC 2. 基于触发器的CDC 3. 基于快照的CDC 4. 基于日志的CDC 三、使用Sqoop抽取... 目录 一、Kettle数据抽取概览 1. 文件抽取 （1）处理文本文件 （2）处理XML文件 2. 数据库抽取 二、变化数据捕获 1. 基于源数据的CDC 2. 基于触发器的CDC 3. 基于快照的CDC 4. 基于日志的CDC 三、使用Sqoop抽取数据 1. Sqoop简介 2. 使用Sqoop抽取数据 3. Sqoop优化 （1）调整Sqoop命令行参数 （2）调整数据库 四、小结 本篇介绍如何利用Kettle提供的转换步骤和作业项实现Hadoop数据仓库的数据抽取，即ETL过程中的Extract部分。首先简述Kettle中几种抽取数据的组件，然后讲述变化数据捕获（Change Data Capture，CDC），以及Kettle如何支持不同的CDC技术。Hadoop生态圈中的Sqoop工具可以直接在关系数据库和HDFS或Hive之间互导数据，而Kettle支持Sqoop输入、输出作业项。最后我们使用Kettle里的Sqoop作业项以及基于时间戳的CDC转换实现销售订单示例的数据抽取过程，将MySQL中的源数据抽取到Hive的rds数据库中。 数据抽取是一个艰难的工作，因为数据源是多样和复杂的。在传统数据仓库环境下，数据通常来源于事务类应用系统，大部分这类系统都是把数据存储在MySQL、Oracle或SQL Server等关系数据库中。一般要从业务角度进行抽取，这也是一个挑战，从技术上来看，最好能使用JDBC直连数据库。如果数据库不是关系型的或者没有可用的驱动，一般就需要使用具有固定分隔符的文本文件来获取数据。还有一种情况是数据属于外部系统，不能直连，使用文本文件交换数据是唯一选择。除此之外，Kettle提供了几种方法来访问互联网数据，如通过RSS或者Salesforce.com网站直连，或者通过Web Service等。 # 一、Kettle数据抽取概览 Kettle大部分数据抽取类的步骤都放在“输入”类别下。输入类的步骤，顾名思义就是从外部数据源抽取数据，把数据输入到Kettle的数据流中。Kettle 8.3的输入类下有37个步骤，其中最常用的是“文本文件输入”和“表输入”。一般来说准备要读取的数据（尤其是文件类数据）的功能往往在作业里完成，实际读取数据才在转换这一层。各个步骤和作业项的功能选项，大都能直接从选项名称了解其含义。详细说明可使用Kettle在线帮助文档。在菜单条上选择“帮助” -> “显示欢迎屏幕” -> “Documentation”打开在线帮助文档。 ## 1. 文件抽取 Kettle在转换里提供了文件基本的读写操作，对于文件的其它操作（移动、复制、创建、删除、比较、压缩、解压缩等）都在“文件管理”作业项中。在使用“文本文件输出”步骤前，不必先创建一个文件。如果文件不存在，该步骤会自动创建一个。下面介绍两种最常用的处理场景，即从文本文件与XML文件抽取数据。 ### （1）处理文本文件 文本文件可能是使用ETL工具处理的最简单的一种数据源，读写文本文件没有太多技巧。文本文件易于交换，压缩比较高，任何文本编辑器都可以用于打开文本文件。总体说有以下两类文本文件： • 固定分隔符文件：这种文件里，每列都由特定字符分隔。通常这类文件也称为CSV（逗号分隔值）文件或TSV（制表符分隔值）文件。 • 固定宽度文件：每列都有指定的长度。尽管固定宽度文件的格式非常明确，但也需要一些时间来定义。Kettle在“固定宽度文件输入”的“获取字段”选项里提供了一些辅助工具，但如果要在分隔符文件和固定宽度文件之间选择，最好还是选择分隔符文件。 对于这两种文件，都可以选择文件编码。UTF-8是通常情况下的标准编码格式，但其它编码格式，如ANSI或UTF-8-BOM也在广泛使用。为了正常读取文件内容，必须要设置正确的文件编码。文件编辑软件能够查看文件编码，如使用Notepad++打开文件，选择“编码”菜单即可查看或修改当前文件的编码。 “CSV文件输入”是基本的文本文件输入步骤，CSV文件是一种用具有固定列分隔符的文本文件。在处理这种文件之前要确定分隔符和字段。“CSV文件输入”步骤和与之相似的“固定宽度文件输入”步骤都不太适合一次处理多个文件，这两个步骤其实都是“文本文件输入”步骤的简化版。“文本文件输入”步骤是一个功能强大的步骤，也是处理文本文件的首选步骤。其主要功能如下： • 从前一个步骤读取文件名。 • 一次运行读取多个文件。 • 从.zip或.gzip压缩文件中读取文件。 • 不用指定文件结构就可以显示文件内容。注意需要指定文件格式（DOS、UNIX或Mixed），因为Kettle需要知道文件的换行符。在DOS用\r\n代表换行，UNIX只用\n代表换行。 • 指定转义字符，用来读取字段数据里包含分隔符的字段。通常的转义字符是反斜线（\）。 • 错误处理。 • 过滤。 • 指定本地化的日期格式。 当然使用这些功能是有代价的，“文本文件输入”步骤比“CSV文件输入”步骤和“固定宽度文件输入”步骤需要占用更多内存和CPU处理能力。 下面看一个Kettle处理的常见场景。假设有一组zip压缩文件，每个zip文件中包含若干文本文件，所有文本文件具有相同的格式。需求是将文本文件中的记录抽取到数据库表中，并且标明每条记录所属的文本文件和zip文件。在“Kettle构建Hadoop ETL实践（一）：ETL与Kettle”里介绍Kettle虚拟文件系统时，我们知道了Kettle使用Apache的通用VFS作为文件处理接口，能够直接读取zip压缩包中的多个文件，本例将使用这一特性。 我们用的例子文件是a.zip和b.zip，a.zip中包含1.txt和2.txt两个文件，b.zip中包含3.txt和4.txt两个文件。文本文件具有三个字段，以逗号作为列分隔符。4个文本文件的内容如下，反斜杠是转义字符： # 1.txt 11,1a\,aa,2020-01-01 01:01:01 12,1b\,bb,2020-01-01 02:02:02 13,1c\,cc,2020-01-01 03:03:03 # 2.txt 21,2a\,aa,2020-02-02 01:01:01 22,2b\,bb,2020-02-02 02:02:02 23,2c\,cc,2020-02-02 03:03:03 # 3.txt 31,3a\,aa,2020-03-03 01:01:01 32,3b\,bb,2020-03-03 02:02:02 33,3c\,cc,2020-03-03 03:03:03 # 4.txt 41,4a\,aa,2020-04-04 01:01:01 42,4b\,bb,2020-04-04 02:02:02 43,4c\,cc,2020-04-04 03:03:03 创建的目标表如下，c1、c2、c3三个字段对应分别对应文本文件中的三列，c4字段存储记录所属的文件名： create table t_txt ( c1 int(11) default null, c2 varchar(20) default null, c3 datetime default null, c4 varchar(100) default null); 创建的Kettle转换如图5-1所示，包含“自定义常量数据”、“获取文件名”、“文本文件输入”、“表输出”四个步骤。 “自定义常量数据”步骤用于定义zip和txt的文件名。当然也可以直接在“获取文件名”步骤中的“文件或目录”写死所要读取的文件名。这里使用“自定义常量数据”步骤的目的是想使输入的文件名参数化，当需要从不同的文件抽取时，只需修改这个步骤，而后面的步骤都不用变更。 在“自定义常量数据”步骤里的“元数据”标签页中创建两个字符串类型的字段zip和txt，然后在“数据”标签页中给这两个字段赋值如图5-2所示。注意两个字段值的写法。zip字段以zip协议开头，后面是zip文件的绝对路径，以‘!/’结尾。txt字段值为正则表达式，表示zip包中所有‘.txt’后缀的文件。 “获取文件名”步骤的设置如图5-3所示。选中“文件名定义在字段里”选项，“从字段获取文件名”选择“zip”，“从字段获取通配符”选择“txt”。这两个字段的值从前一步骤传递过来。 下一步骤是“文本文件输入”步骤。首先要确定文件的结构，打开“文本文件输入”步骤设置对话框，在“文件”标签页中点击“浏览”按钮，找到其中一个zip文件，然后点击“增加”按钮把这个文件添加到“选中的文件”列表中，如“zip:/root/kettle_hadoop/5/a.zip!/”。现在可以点击“文件”标签页中的“显示文件内容”按钮打开这个文件，可以看到这个文件的列分隔符、是否带有表头和封闭符等信息。我们可以使用这些信息来设置“内容”标签页里的选项，本例具体如图5-4所示。 定义完文件格式后，再选择“字段”标签页并点击“获取字段”按钮。Kettle会尽量判断出每个字段的数据类型，本例中如图5-5所示。 为了验证设置的正确性，点击“预览记录”按钮，如果出现预览的数据，说明设置正确。下一步需要把“获取文件名”步骤和“文本文件输入”步骤连接起来。回到“文本文件输入”步骤的“文件”标签页，选中“从以前的步骤接受文件名”和“从以前的步骤接受字段名”，并选中“获取文件名”步骤作为文件名的来源，选中filename字段作为文件名的字段，该字段由“获取文件名”步骤所生成。注意现在不能再使用“预览记录”选项，只能在该步骤上选择转换里的预览。 我们注意到在“文本文件输入”步骤里也有路径和文件名正则表达式选项，但最好把选择文件的过程单独放在“获取文件名”步骤里。因为“获取文件名”步骤可以从前面的步骤获得路径名和文件名的正则表达式，这样比较灵活。而且“文本文件输入”步骤本身不能获取到文件名。 最后一个步骤是“表输出”，将文件内容装载到数据库表中。在该步骤中勾选“指定数据库字段”选项，然后在“数据库字段”标签页点击“获取字段”按钮，在“插入的字段”列表中将会出现前面步骤数据流中的所有字段。只需要选择表字段及其对应的流字段，本例中为： c1 Field_000 c2 Field_001 c3 Field_002 c4 filename 保存并执行该转换后，t_txt表中数据如下： mysql> select * from t_txt; +------+-------+---------------------+-----------------------------------------------+ | c1 | c2 | c3 | c4 | +------+-------+---------------------+-----------------------------------------------+ | 11 | 1a,aa | 2020-01-01 01:01:01 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt | | 12 | 1b,bb | 2020-01-01 02:02:02 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt | | 13 | 1c,cc | 2020-01-01 03:03:03 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt | | 21 | 2a,aa | 2020-02-02 01:01:01 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt | | 22 | 2b,bb | 2020-02-02 02:02:02 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt | | 23 | 2c,cc | 2020-02-02 03:03:03 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt | | 31 | 3a,aa | 2020-03-03 01:01:01 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt | | 32 | 3b,bb | 2020-03-03 02:02:02 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt | | 33 | 3c,cc | 2020-03-03 03:03:03 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt | | 41 | 4a,aa | 2020-04-04 01:01:01 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt | | 42 | 4b,bb | 2020-04-04 02:02:02 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt | | 43 | 4c,cc | 2020-04-04 03:03:03 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt | +------+-------+---------------------+-----------------------------------------------+ 12 rows in set (0.00 sec) ### （2）处理XML文件 XML是扩展标识语言（eXtensible Markup Language）的缩写，是一种在平面文件中定义数据结构和内容的开放标准。XML格式非常流行，很多系统都使用这种格式交换数据。XML实际是一种遵照规范的结构化的文本文件，可以使用文本编辑器打开。Kettle里有四种验证XML数据是否有效的方法。 • 验证XML文件是否有效：只验证XML是否有完整的开始和结束标签，及各层嵌套的结构是否完整。 • DTD验证：检查XML文件的结构是否符合DTD（Data Type Definition）文件的要求。DTD可以是一个独立的文件，也可以包含在XML文件中。 • XSD验证（作业）：检查XML文件的结构是否符合XML Schema定义文件的要求。 • XSD验证（转换）：和上面相同，但XML是在数据流的字段里（如数据库的列里包含XML格式数据）。 可以使用“Get data from XML”步骤读取XML文件。读取XML文件的主要障碍就是分析嵌套的文件结构。从这个步骤输出的数据流是平面的没有嵌套的数据结构，可以存储在关系数据库中。与之相反，“Add XML”步骤用来把平面数据构造成嵌套形式的XML格式数据。 如果想把XML转成其它格式，如另一种格式的XML文件、平面文件或HTML文件，要使用“XSL transformation”步骤。XSL是扩展样式语言（eXtensible Stylesheet Language）的缩写，这是一种用来转换XML文档的XML语言。转换里的“XSD validator”步骤验证数据流里的XML格式的数据，作业里的“XSD validator”作业项用于验证一个完整的XML文件。 XML是一种非常灵活的格式，可以用来表达很多种数据结构，下面看一个简单的示例。首先准备一个XML文档，然后创建一个转换，从该文档抽取数据，并把数据保存在一个MySQL表中。最后再创建一个功能相反的转换，从MySQL表中抽取数据并保存成XML文件。 示例XML文档sample.xml的内容如下： <rows> <info> <infodata user="user1"> <data>data1</data> </infodata> <infodata user="user2"> <data>data2</data> </infodata> </info> <row> <parameter> <user>user1</user> <password>pass1</password> </parameter> <parameter> <user>user2</user> <password>pass2</password> </parameter> <parameter> <user>user3</user> <password>pass3</password> </parameter> </row> </rows> <rows>节点下包括了一个<info>节点和一个<row>节点。这两个节点分别又包含一组<infodata>节点和一组<parameter>节点。<infodata>节点具有属性user。<parameter>节点下的<user>节点包括了某个<infodata>节点的user属性值。 对应MySQL表t_xml结构如下： mysql> desc t_xml; +----------+-------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +----------+-------------+------+-----+---------+-------+ | rn | int(11) | YES | | NULL | | | username | varchar(20) | YES | | NULL | | | pass | varchar(20) | YES | | NULL | | | info | varchar(20) | YES | | NULL | | | xmlfile | varchar(50) | YES | | NULL | | +----------+-------------+------+-----+---------+-------+ 5 rows in set (0.01 sec) rn存储记录行号，username和pass字段分别存储XML文档中<user>和<password>节点的值，info字段保存<data>节点的值，xmlfile保存XML文件名。 如图5-6所示的转换从sample.xml文件抽取数据并转载到数据库表中。 这个转换只有“Get data from XML”和“表输出”两个步骤。“Get data from XML”步骤从静态XML文件读取数据，并输出XML节点值，本质上是将一个层次结构平面化展开的过程。 在该步骤的“文件”标签页选择要读取的XML文件。点击“浏览”按钮选择本地的sample.xml文件，然后点击“增加”按钮，/root/kettle_hadoop/5/sample.xml将出现在“选中的文件和目录”列表中。“内容”标签页定义XML文件格式，如图5-7所示。 标签页里最重要的属性是“循环读取路径”。这里需要设置一个XPath表达式。XPath表达式将从XML文档中过滤出一个节点集，就是XML节点的一个集合。集合里的每一个节点都将被解析为一行记录，并放到输出流中。本例中设置为/rows/row/parameter。如果已经在“文件”标签页中指定了一个XML文件，可以点击“获取XML文档的所有路径”按钮帮助设置XPath属性。这个按钮获取了XML文档里的全部路径，如图5-8所示。 “内容”标签页里还包括以下属性。 • 编码：用来定义XML文档的编码。如果XML文档本身没有指定编码，就要用到这个选项。通常情况下，XML文档的编码在文件头定义，例如：<?xml version="1.0" encoding="UTF-8"?> • 考虑命名空间：如果文档使用了命名空间，需要选中该选项。 • 忽略注释：通常情况下，XML注释也被看作是节点，如果要忽略注释节点，要选中该选项。 • 验证XML：如果想在抽取数据前使用DTD验证，要选中该选项。 • 使用标记：该选项用于“字段”标签页的设置，在后面讨论。 • 忽略空文件：如果指定的文件是空，不会抛出异常。 • 如果没有文件不要报告错误：如果指定的文件不存在，不会抛出异常。 • 限制：限制生成的最大记录行数，默认值为0，意味着对每一个抽取到的XML节点都生成一条记录。 • 用于截取数据的XML路径（大文件）：一般情况下，XML文档一次性读如内存，读取路径XPath表达式可以应用于整个文档。但如果XML文档非常大，XPath表达式匹配到的所有XML节点不能一次放入内存中，此时就需要指定另一个XPath表达式把XML文档分成多块，就是这里的XML截取路径。这个用于把XML文档分块的XPath路径不支持全部的XPath语法，只能使用斜线分隔的节点名这种语法格式，不支持命名空间和谓词表达式。另外截取路径XPath必须是读取路径的上一级或同级目录。 • 输出中包括文件名/文件名字段：如果使用XML文件作为源，该选项可以在输出流中增加一个字段保存XML文件名。“文件名字段”选项设置新增字段的字段名。 • 输出中包括行号/行数字段：该选项可以为每一个数据行生成一个序列号。“行数字段”选项设置行号字段的字段名。 • 将文件增加到结果文件中：如果使用了XML文件，选中该选项把文件添加到结果文件列表中。在父作业中就可以再处理这个文件。 在“内容”标签页中已经使用XPath表达式匹配了XML节点集。“字段”标签页用来从XML节点抽取字段，如图5-9所示。 列表中的前两行是点击“获取字段”自动得到的。“名称”列用来设置要抽取的字段名。“XML路径”列使用XPath表达式指定从哪里获得字段的值。XPath表达式用来匹配XML数据行里的字段。下面详细说一下第三行data字段获取。 “字段”标签页里的XPath表达式支持一种非标准化的称为token的扩展形式。token用来参数化XPath表达式，它可以把字段值绑定到XPath表达式里。本例中data字段的XPath是../../info/infodata[@user=@_user-]/data。../代表的返回上一层，当前路径是/rows/row/parameter，因此对应的绝对路经为/rows/info。infodata[@user=@_user-]这一段指的是infodata目录下满足条件的用户，也就是token的作用所在。@user所引用的是infodata节点的user属性值，表达式@_user-就是token，这个token包括一个@符号，一条下划线，然后是字段名user，最后是一个短横线。可以看到token的功能和数据库中表join相似，user1的用户名密码等属性没有和data数据在一个读取路径下，那么通过token我们就可以像表一样给它们连接起来，得到user1的数值data1。 如果要使用token，需要选中“内容”标签页里的“使用标记”复选框。另外使用token有以下几个限制： • XML文档中被引用的节点（<infodata>）必须出现在引用它的节点（<user>）之前。 • token里使用的字段（本例的user）必须出现在使用token的字段（本例的data）之前。 • token语法只对“字段”标签页中的XPath表达式有效，不能用于“内容”标签页中的XPath表达式。 本例中的第二个步骤是“表输出”，只要连接到目标数据库表，勾选“指定数据库字段”选项，然后在“数据库字段”标签页定义表字段与流字段的关系如下： username user pass password info data xmlfile filename rn rn 保存并成功执行转换后，表t_xml数据如下： mysql> select * from test.t_xml; +------+----------+-------+-------+----------------------------------+ | rn | username | pass | info | xmlfile | +------+----------+-------+-------+----------------------------------+ | 1 | user1 | pass1 | data1 | /root/kettle_hadoop/5/sample.xml | | 2 | user2 | pass2 | data2 | /root/kettle_hadoop/5/sample.xml | | 3 | user3 | pass3 | NULL | /root/kettle_hadoop/5/sample.xml | +------+----------+-------+-------+----------------------------------+ 3 rows in set (0.00 sec) 图5-10所示的转换执行一个反向的过程，读取数据库表数据，然后用数据生成XML节点。“表输入”步骤连接的数据库表就是上个转换所装载的t_xml。 “Add XML”步骤用于生成XML节点。对输入流里的每一行，该步骤会添加一个包含XML字符串的新字段，并把这一行发送到下一个步骤中。在配置对话框里有“内容”和“字段”两个标签页，可以设置生成的XML节点的名称、属性、内容等。本例中的内容标签页各选项值如下： • 编码：UTF-8 • Output Value：xmlvaluename • 根XML元素：ROW • Omit XML header：勾选 • Omit null values from XML result：勾掉 “内容”这个标签名字有一点令人迷惑，它实际用于设置生成的XML节点的属性，而不是它的内容。“编码”下拉列表用来指定一个编码（默认UTF-8）。“Output Value”属性设置保存XML节点的字段名。“根XML元素”属性设置XML节点的名称。注意，节点名称目前是一个字符串常量，不能指定一个字段来动态设置节点名称。“Omit XML header”复选框用来只生成XML片段，以后合并到其它XML文档中。对于最外层的节点来说，一定要清除这个选项，以便生成带有XML定义的XML文档。“Omit null values from XML result”复选框可以用来控制对NULL的展现，是对文档内容的设置。 “字段”标签页用来控制如何使用输入流字段生成XML文档的内容或属性。可以通过点击“获取字段”按钮，自动得到从前面的步骤输出的字段，本例中为表t_xml的 rn、username、pass、info四个字段，如图5-11所示。 输入流字段可以通过四种方式来构成XML文档。 • 生成“根XML元素”的子节点，把字段内容作为子节点的内容。表格中的“Element name”用来设置节点名。 • 生成“根XML元素”的属性，把字段内容作为属性的内容。这种方式需要把表格里的“属性”列设置为Y，并把“Attribute parent name”列留空。 • 把字段内容作为“根XML元素”的文本内容。这种方式的配置和上面的第一种方式的配置非常类似。唯一的不同之处是必须使用“根XML元素”的名字作为节点的名字。尽管配置变化不大，最后效果相差却很大：不会生成子节点，字段的值作为“根XML元素”节点的内容。 • 生成“根XML元素”的子节点，把字段内容作为子节点的属性。这种方式的配置和第二种方式类似。不同之处就是需要在“Attribute parent name”列中输入要设置的节点的名字。 如果字段中有NULL值，默认情况下会产生一个空节点或属性值。可以选中“内容”标签页中的“Omit null values from XML result”选项来忽略这样的节点或属性值。 执行转换后，xmlvaluename字段的值如下，可以点击“Add XML”步骤右键菜单中的Preview菜单项来查看。 <Row><rn>1</rn><username>user1</username><pass>pass1</pass><info>data1</info></Row> <Row><rn>2</rn><username>user2</username><pass>pass2</pass><info>data2</info></Row> <Row><rn>3</rn><username>user3</username><pass>pass3</pass><info></info></Row> ## 2. 数据库抽取 本节讨论如何从传统关系型数据库抽取数据，从“表输入”步骤开始，用示例解释这个步骤里的参数和变量如何工作。源数据表就用处理文本文件时创建的t_txt表。“表输入”步骤的功能实际上是向所连接的数据库发送select查询语句，并将查询结果返回到输出流中。 可以有两种参数化的查询方法：使用参数和使用变量替换。使用参数的方法需要在“表输入”步骤前面有一个步骤，用来给“表输入”步骤提供一个或多个参数，这些参数替换“表输入”步骤的SQL语句里的问号。这种方法的配置窗口如图5-12所示。 这个例子中的“自定义常量数据”步骤定义了两个常量a和b，数据类型分别是String和Date，两个常量的数据这就是后面“表输入”步骤查询语句中替换两个问号的数据。例如在“自定义常量数据”步骤的“数据”标签页中给常量a和b分别赋值‘a’和‘2020-02-02’，则转换执行时，“表输入”步骤的查询语句实际为： SELECT c1 , c2 , c3 , c4 FROM t_txt where c2 like concat('%','a','%') and c3 >='2020-02-02'; 点击“表输入”步骤右键的Preview菜单项预览数据，显示如下： 21 2a,aa 2020/02/02 01:01:01.000000000 zip:file:///root/kettle_hadoop/5/a.zip!/2.txt 31 3a,aa 2020/03/03 01:01:01.000000000 zip:file:///root/kettle_hadoop/5/b.zip!/3.txt 41 4a,aa 2020/04/04 01:01:01.000000000 zip:file:///root/kettle_hadoop/5/b.zip!/4.txt “表输入”步骤中的主要选项含义如下。 • 允许简易转换：选中此选项后，在可能情况下避免转换进行不必要的数据类型转换，可以显著提高性能。 • 替换SQL语句里的变量：选择此选项可替换脚本中的变量。此特性提供了使用变量替换的测试功能。 • 从步骤插入数据：选择提供替换SQL语句中问号参数数据的步骤。 • 执行每一行：选择此选项可对每一输入行执行查询。 • 记录数量限制：指定要从数据库中读取的行数，缺省值0表示读取所有行。 本例的“自定义常量数据”步骤只用来演示，实际使用中，最好用其它步骤替换它。在本篇后面的CDC部分能看到一个类似的例子。 第二种参数化查询方法是使用变量，变量要在使用变量的转换之前的转换中进行设置。设置变量的转换如图5-13所示，设置变量的转换往往是作业里的第一个转换。 两个变量var_c2和var_c3的值来自前面的“自定义常量数据”步骤里a和b定义的值。在后面转换的“表输入”步骤中可以使用这些变量，查询里的变量名被变量的值替换。使用变量的表输入步骤如图5-14所示。 为了查看转换的执行结果，使用“文本文件输出”步骤将表输入步骤的查询结果写入一个文本文件。上面两个转换都在一个作业里，图5-15显示了这两个转换，第一个转换时设置变量，第二个转换使用变量作为表输入步骤的参数。 本例中常量a和b的值分别为‘b’和‘2020-01-01’。执行作业后，生成的文本文件内容如下： [root@localhost 5]# more file.txt 12 1b,bb 2020-01-01 02:02:02 zip:file:///root/kettle_hadoop/5/a.zip!/1.txt 22 2b,bb 2020-02-02 02:02:02 zip:file:///root/kettle_hadoop/5/a.zip!/2.txt 32 3b,bb 2020-03-03 02:02:02 zip:file:///root/kettle_hadoop/5/b.zip!/3.txt 42 4b,bb 2020-04-04 02:02:02 zip:file:///root/kettle_hadoop/5/b.zip!/4.txt # 二、变化数据捕获 抽取数据是ETL处理过程的第一个步骤，也是数据仓库中最重要和最具有挑战性的部分，适当的数据抽取是成功建立数据仓库的关键。从源抽取数据导入数据仓库或过渡区有两种方式，可以从源把数据抓取出来（拉），也可以请求源把数据发送（推）到数据仓库。影响选择数据抽取方式的一个重要因素是操作型系统的可用性和数据量，这是抽取整个数据还是仅仅抽取自最后一次抽取以来的变化数据的基础。我们考虑以下两个问题： • 需要抽取哪部分源数据加载到数据仓库？有两种可选方式，完全抽取和变化数据捕获。 • 数据抽取的方向是什么？有两种方式，拉模式，即数据仓库主动去源系统拉取数据；推模式，由源系统将自己的数据推送给数据仓库。 对于第二个问题来说，通常要改变或增加操作型业务系统的功能是非常困难的，这种困难不仅是技术上的，还有来自于业务系统用户及其开发者的阻力。理论上讲，数据仓库不应该要求对源系统做任何改造，实际上也很少由源系统推数据给数据仓库。因此对这个问题的答案比较明确，大都采用拉数据模式。下面我们着重讨论第一个问题。 如果数据量很小并且易处理，一般来说采取完全源数据抽取，就是将所有的文件记录或所有的数据库表数据抽取至数据仓库。这种方式适合基础编码类型的源数据，比如邮政编码、学历、民族等。基础编码型源数据通常是维度表的数据来源。如果源数据量很大，抽取全部数据是不可行的，那么只能抽取变化的源数据，即最后一次抽取以来发生了变化的数据。这种数据抽取模式称为变化数据捕获，简称CDC，常被用于抽取操作型系统的事务数据，比如销售订单、用户注册，或各种类型的应用日志记录等。 CDC大体可以分为两种，一种是侵入式的，另一种是非侵入式的。所谓侵入式的是指CDC操作会给源系统带来性能的影响。只要CDC操作以任何一种方式对源库执行了SQL语句，就可以认为是侵入式的CDC。常用的四种CDC方法是：基于源数据的CDC、基于触发器的CDC、基于快照的CDC、基于日志的CDC，其中前三种是侵入性的。表5-1总结了四种CDC方案的特点。  源数据 触发器 快照 日志 能区分插入/更新 否 是 是 是 周期内，检测到多次更新 否 是 否 是 能检测到删除 否 是 是 是 不具有侵入性 否 否 否 是 支持实时 否 是 否 是 不依赖数据库 是 否 是 否 表5-1 四种CDC方案比较 ## 1. 基于源数据的CDC 基于源数据的CDC要求源数据里有相关的属性字段，抽取过程可以利用这些属性字段来判断哪些数据是增量数据。最常见的属性字段有以下两种。 • 时间戳：这种方法至少需要一个更新时间戳，但最好有两个，一个插入时间戳，表示记录何时创建，一个更新时间戳，表示记录最后一次更新的时间。 • 序列：大多数数据库系统都提供自增功能。如果数据库表列被定义成自增的，就可以很容易地根据该列识别出新插入的数据。 这种方法的实现较为简单，假设表t1中有一个时间戳字段last_inserted，t2表中有一个自增序列字段id，则下面SQL语句的查询结果就是新增的数据，其中{last_load_time}和{last_load_id}分别表示ETL系统中记录的最后一次数据装载时间和最大自增序列号。 select * from t1 where last_inserted > {last_load_time}; select * from t2 where id > {last_load_id}; 通常需要建立一个额外的数据库表存储上一次更新时间或上一次抽取的最后一个序列号。实践中，一般是在一个独立的模式下或在数据过渡区里创建这个参数表。下面来看Kettle里使用时间戳方式CDC的例子。前一篇建立的ETL示例模型中，source.sales_order表的entry_date字段表示订单录入的时间。我们还需要把上一次装载时间存储在属性文件或参数表里。先使用下面的脚本在hive里的rds库中建立一个名为cdc_time的时间戳表，并设置初始数据。 use rds; drop table if exists cdc_time ; create table cdc_time ( id int, last_load date, current_load date) clustered by (id) into 1 buckets stored as orc tblproperties ('transactional'='true'); insert into table cdc_time select 1, '1971-01-01', '1971-01-01' ; 后面的Kettle转换中需要对cdc_time执行行级更新，因此该表必须分桶、使用ORC格式、设置支持事务。id字段用于分桶，不做更新操作。时间戳有last_load和current_load两个字段。之所以需要两个字段，是因为抽取到的数据可能会多于本次需要处理的数据。比如，两点执行ETL过程，则零点到两点这两个小时的数据不会在本次处理。为了确定这个截至时间点，需要给时间戳设定一个上限条件，即这里的current_load字段值。本示例的时间粒度为每天，时间戳只要保留日期部分即可，因此数据类型选为date。最开始这个两个时间戳都设置成一个早于所有业务数据的时间，当开始装载时，current_load时间戳设置为当前时间。 该表的逻辑描述如下。 1. 装载作业开始后，作业要先把current_load设置成作业的开始日期，可以通过如图5-16的“设置系统日期”转换实现。 在“获取系统信息”步骤里创建一个当前日期的字段cur_date，以及一个前一天的日期pre_date字段，然后将两个字段的数据复制分发到“插入/更新”步骤和“字段选择”步骤。“插入/更新”步骤的“更新字段”部分，用流里的字段“cur_date”去更新表里的字段“current_load”。另外还要设置“用来查询的关键字”部分，把表的“current_load”条件设置为“IS NOT NULL”即可。其含义是当“current_load”为空时执行插入，否则执行更新操作。 在“字段选择”步骤的“元数据”标签页中，修改pre_date字段的类型为“Date”，格式为“yyyy-MM-dd”。格式化的前一天日期值传递给“设置变量”步骤，该步骤将pre_date字段值定义为一个变量PRE_DATE，用于将日期拼接到上传至HDFS的文件名中。变量活动类型（作用域）为“Valid in the root job”，即调用该转换的所有作业均可使用该变量。 2. 从sales_order表里抽取数据的查询使用开始日期和结束日期，如图5-17所示的“装载销售订单表”转换所示。 “表输入”步骤获取到cdc_time表的last_load和current_load日期。“数据库连接步骤”用前一步骤获得的last_load和current_load值替换查询语句中问号标识的参数。通过比较表字段entry_date的值判断新增的数据。这里假设源系统中销售订单记录一旦入库就不再改变，或者可以忽略改变。也就是说销售订单是一个随时间变化单向追加数据的表。sales_order表中有两个关于时间的字段，order_date表示订单时间，entry_date表示订单数据实际插入表里的时间，在后面第九篇“（九）事实表技术”讨论“迟到的事实”时就会看到两个时间可能不同。那么用哪个字段作为CDC的时间戳呢？设想这样的情况，一个销售订单的订单时间是2020年1月1日，实际插入表里的时间是2020年1月2日，ETL每天0点执行，抽取前一天的数据。如果按order_date抽取数据，条件为where order_date >= '2020-01-02' AND order_date < '2020-01-03'，则2020年1月3日0点执行的ETL不会捕获到这个新增的订单数据，所以应该以entry_date作为CDC的时间戳。 最后将新增数据通过“Hadoop file output”步骤上传到rds.sales_order表对应的HDFS目录下。“文件”标签页中的“Hadoop Cluster”选择CDH631，“Folder/File”输入“/user/hive/warehouse/rds.db/sales_order/sales_order_${PRE_DATE}”，其中${PRE_DATE}引用的就是图5-16中“设置变量”步骤定义的变量。“内容”标签页指定分隔符为逗号，格式选择Unix，编码为UTF-8。“字段”标签页选择sales_order表中全部六个字段。 3. 如果转换中没有发生任何错误，要把current_load字段里的值复制到last_load字段里，用如图5-18所示的“SQL”作业项实现。如果转换中发生了错误，时间戳需要保持不变，以便后面再次执行。 将上述转换和作业项放到一个作业中，如图5-19所示。 首次作业成功执行后，hive表sales_order所对应的HDFS目录下生成了一个带有前一天日期的文件： [root@manager~]#hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/ Found 1 items -rw-r--r-- 3 root hive 5892 2020-09-28 13:38 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-24.txt [root@manager~]# rds.sales_order装载全部100条销售订单记录，rds.cdc_time的last_load和current_load均更新为当前日期： hive> use rds; OK hive> select * from sales_order; OK 1 6 2 2020-03-01 20:13:34 2020-03-01 20:13:34 3777.00 2 4 2 2020-03-03 19:07:07 2020-03-03 19:07:07 9227.00 ... 99 3 1 2020-08-29 01:20:11 2020-08-29 01:20:11 9058.00 100 1 2 2020-08-31 09:43:38 2020-08-31 09:43:38 5607.00 Time taken: 2.41 seconds, Fetched: 100 row(s) hive> select * from cdc_time; OK 1 2020-09-25 2020-09-25 hive> 基于时间戳和自增序列的方法是CDC最简单的实现方式，也是最常用的方法，但它的缺点也很明显，主要如下： • 不能区分插入和更新操作。只有当源系统包含了插入时间戳和更新时间戳两个字段，才能区别插入和更新，否则不能区分。 • 不能记录删除记录的操作。不能捕获到删除操作，除非是逻辑删除，即记录没有被真的删除，只是做了逻辑上的删除标志。 • 无法识别多次更新。如果在一次同步周期内，数据被更新了多次，只能同步最后一次更新操作，中间的更行操作将会丢失。 • 不具有实时能力。时间戳和基于序列的数据抽取一般适用于批量操作，不适合于实时场景下的数据抽取。 这种方法是具有侵入性的，如果操作型系统中没有时间戳或时间戳信息是不可用的，那么不得不通过修改源系统把时间戳包含进去，要求修改操作型系统的表包含一个新的时间戳字段。有些数据库系统可以自动维护timestamp类型的值。如在MySQL中只要如下定义，当执行insert或update操作时，所影响数据行的ts字段将会自动更新为当前时间： alter table t1 add column ts timestamp default current_timestamp on update current_timestamp; 而有些数据库系统，需要建立一个触发器，在修改一行时更新时间戳字段的值。下面是一个Oracle数据库的例子。当t1表上执行了insert或update操作时，触发器会将last_updated字段更新为当前系统时间。 alter table t1 add last_updated date; create or replace trigger trigger_on_t1_change before insert or update on t1 for each row begin :new.last_updated := sysdate; end; / 在实施这些操作前必须被源系统的拥有者所接受，并且要仔细评估对源系统产生的影响。 ## 2. 基于触发器的CDC 当执行INSERT、UPDATE、DELETE这些SQL语句时，可以激活数据库里的触发器，并执行一些动作，就是说触发器可以用来捕获变更的数据并把数据保存到中间临时表里。然后这些变更的数据再从临时表中取出，被抽取到数据仓库的过渡区里。大多数场合下，不允许向操作型数据库里添加触发器（业务数据库的变动通常都异常慎重），而且这种方法会降低系统的性能，所以此方法用的并不是很多。 作为直接在源数据库上建立触发器的替代方案，可以使用源数据库的复制功能，把源数据库上的数据复制到备库上，在备库上建立触发器以提供CDC功能。尽管这种方法看上去过程冗余，且需要额外的存储空间，但实际上这种方法非常有效，而且没有侵入性。复制是大部分数据库系统的标准功能，如MySQL、Oracle和SQL Server等都有各自的数据复制方案。 一个类似于内部触发器的例子是Oracle的物化视图日志。这种日志被物化视图用来识别改变的数据，并且这种日志对象能够被最终用户访问。一个物化视图日志可以建立在每一个需要捕获变化数据的源表上。之后任何时间在源表上对任何数据行做修改时，都有一条记录插入到物化视图日志中表示这一行被修改了。如果想使用基于触发器的CDC机制，并且源数据库是Oracle，这种物化视图日志方案是很方便的。物化视图日志依赖于触发器，但是它们提供了一个益处是，建立和维护这个变化数据捕获系统已经由Oracle自动管理了。我们甚至可以在物化视图上建立自己的触发器，每次物化视图刷新时，触发器基于刷新时间点的物化视图日志归并结果，在一些场景下（只要记录两次刷新时间点数据的差异，不需要记录两次刷新之间的历史变化）可以简化应用处理。下面是一个Oracle物化视图的例子。每条数据的变化可以查询物化视图日志表mlog$_tbl1，两个刷新时间点之间的数据差异，可以查询mv_tbl1_tri表。

-- 建立mv测试表
create table tbl1(a number,b varchar2 (20));
create unique index tbl1_pk on tbl1 (a);
alter table tbl1 add (constraint tbl1_pl primary key(a));
-- 建立mv日志，单一表聚合视图的快速刷新需要指定including new values子句
create materialized view log on tbl1 including new values;
-- 建立mv
create materialized view mv_tbl1 build immediate refresh fast
next sysdate + 1/24
as select * from tbl1;
-- 建立trigger测试表
create table mv_tbl1_tri (a number,b varchar (20),c varchar (20));
-- 建立trigger
create or replace trigger tri_mv
after delete or insert or update
on mv_tbl1
referencing new as new old as old
for each row
begin
case
when inserting then
insert into mv_tbl1_tri values (:new.a, :new.b, 'insert');
when updating then
insert into mv_tbl1_tri values (:new.a, :new.b, 'update');
when deleting then
insert into mv_tbl1_tri values (:old.a, :old.b, 'delete');
end case;
exception
when others then
raise;
end tri_mv;
/
-- 对表tbl1进行一系列增删改操作
-- ...

-- 手工刷新mv
exec dbms_mview.refresh('mv_tbl1');
-- 查看物化视图日志
select * from mlog$_tbl1; -- 检查trigger测试表 select * from mv_tbl1_tri;  ## 3. 基于快照的CDC 如果没有时间戳，也不允许使用触发器，就要使用快照表了。可以通过比较源表和快照表来获得数据变化。快照就是一次性抽取源系统中的全部数据，把这些数据装载到数据仓库的过渡区中。下次需要同步时，再从源系统中抽取全部数据，并把全部数据也放到数据仓库的过渡区中，作为这个表的第二个版本，然后再比较这两个版本的数据，从而找到变化。 有多个方法可以获得这两个版本数据的差异。假设表有两个列id和name，id是主键列。该表的第一、二个版本的快照表名为snapshot_1、snapshot_2。下面的SQL语句在主键id列上做全外链接，并根据主键比较的结果增加一个标志字段，I表示新增，U表示更新，D代表删除，N代表没有变化。外层查询过滤掉没有变化的记录。 select * from (select case when t2.id is null then 'D' when t1.id is null then 'I' when t1.name <> t2.name then 'U' else 'N' end as flag, case when t2.id is null then t1.id else t2.id end as id, t2.name from snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) a where flag <> 'N'; 当然，这样的SQL语句需要数据库支持全外链接，对于MySQL这样不支持全外链接的数据库，可以使用类似下面的SQL语句： select 'U' as flag, t2.id as id, t2.name as name from snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.id where t1.name != t2.name union all select 'D' as flag, t1.id as id, t1.name as name from snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.id where t2.id is null union all select 'I' as flag, t2.id as id, t2.name as name from snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.id where t1.id is null; Kettle里的“合并记录”步骤能够比较两个表的差异。该步骤读取两个使用关键字排序的输入数据流，并基于数据流里的关键字比较其它字段。可以选择要比较的字段，并设置一个标志字段，作为比较结果输出字段。我们用示例模型里的source.sales_order表做个例子。 1. 先把source.sales_order表复制到另一个数据库中。 create table test.sales_order select * from source.sales_order; 2. 创建一个用于快照CDC的转换，如图5-20所示。 创建两个“表输入”步骤，一个连接source.sales_order，另一个连接test.sales_order，SQL查询语句如下： SELECT order_number , customer_number , product_code , order_date , entry_date , order_amount FROM sales_order order by order_number; 然后添加一个“合并记录”步骤，如图5-21所示。 把两个表输入步骤都连接到“合并记录”步骤，在步骤中在选择新旧数据源，设置标志字段名，该字段的值为new、changed、deleted或identical，分别标识新增、修改、删除和没有变化的记录。另外设置主键字段和需要比较的字段。 为了过滤没有发生变化的数据，在后面加一个“过滤记录”步骤，过滤条件是“flagfield=identical”，把所有没有变化的数据发送到“空操作”步骤，把新增、修改、删除的数据发送到“数据同步”步骤，该步骤可以根据标志字段自动进行新增、修改、删除等操作。“一般”和“高级”标签页的配置如图5-22所示。 根据数据流中flagfield字段的值决定要执行的插入、更新或删除操作。当目标表test.sales_order中的order_number与数据流order_number相同时，更新目标表的全部六个字段。 3. 验证转换 初始source.sales_order和test.sales_order两个表数据相同： +--------------+-----------------+--------------+---------------------+---------------------+--------------+ | order_number | customer_number | product_code | order_date | entry_date | order_amount | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ ... | 98 | 2 | 1 | 2020-08-27 14:02:35 | 2020-08-27 14:02:35 | 8144.00 | | 99 | 3 | 1 | 2020-08-29 01:20:11 | 2020-08-29 01:20:11 | 9058.00 | | 100 | 1 | 2 | 2020-08-31 09:43:38 | 2020-08-31 09:43:38 | 5607.00 | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ 100 rows in set (0.00 sec) 对source.sales_order数据做一些修改： insert into source.sales_order values (101,1,1,now(),now(),100); delete from source.sales_order where order_number=99; update source.sales_order set order_amount=5606 where order_number=100; 预览“合并记录”步骤的数据： order_number customer_number product_code order_date entry_date order_amount flagfield ... 98 2 1 2020/08/27 14:02:35.000000000 2020/08/27 14:02:35.000000000 8144.0 identical 99 3 1 2020/08/29 01:20:11.000000000 2020/08/29 01:20:11.000000000 9058.0 deleted 100 1 2 2020/08/31 09:43:38.000000000 2020/08/31 09:43:38.000000000 5606.0 changed 101 1 1 2020/09/24 16:53:56.000000000 2020/09/24 16:53:56.000000000 100.0 new 成功执行转换后，test.sales_order的数据已经和source.sales_order同步： +--------------+-----------------+--------------+---------------------+---------------------+--------------+ | order_number | customer_number | product_code | order_date | entry_date | order_amount | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ ... | 98 | 2 | 1 | 2020-08-27 14:02:35 | 2020-08-27 14:02:35 | 8144.00 | | 100 | 1 | 2 | 2020-08-31 09:43:38 | 2020-08-31 09:43:38 | 5606.00 | | 101 | 1 | 1 | 2020-09-24 16:53:56 | 2020-09-24 16:53:56 | 100.00 | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ 100 rows in set (0.00 sec) 4. 恢复原数据 insert into source.sales_order values (99,3,1,'2020-08-29 01:20:11','2020-08-29 01:20:11',9058); update source.sales_order set order_amount=5607 where order_number=100; delete from source.sales_order where order_number=101; 基于快照的CDC可以检测到插入、更新和删除的数据，这是相对于基于时间戳的CDC方案的优点。它的缺点是需要大量的存储空间来保存快照，因为比较的是两个全量数据集合。同样的原因，当表很大时，这种查询会有比较严重的性能问题。 ## 4. 基于日志的CDC 最复杂的和最没有侵入性的CDC方法是基于日志的方式。数据库会把每个插入、更新、删除操作记录到日志里。如使用MySQL数据库，只要在数据库服务器中启用二进制日志（设置log_bin服务器系统变量），之后就可以实时从数据库日志中读取到所有数据库写操作，并使用这些操作来更新数据仓库中的数据。现在十分流行的canal就是基于这个原理，将自己模拟成一个从库，接收主库的二进制日志，从而捕获数据变化。 也可以手工解析二进制日志，将其转为可以理解的格式，然后再把里面的操作按照顺序读取出来。MySQL提供了一个叫做mysqlbinlog的日志读取工具。这个工具可以把二进制的日志格式转换为可读的格式，然后就可以把这种格式的输出保存到文本文件里，或者直接把这种格式的日志应用到MySQL客户端用于数据还原操作。mysqlbinlog工具有很多命令行参数，其中最重要的一组参数可以设置开始/截止时间戳，这样能够只从日志里截取一段时间的日志。另外，日志里的每个日志项都有一个序列号，也可以用来做偏移操作。MySQL的日志提供了上述两种方式来防止CDC过程发生重复或丢失数据的情况。下面是使用mysqlbinlog的两个例子。 mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456 mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt 第一条命令将jbms_binlog.000002文件中从120偏移量以后的操作应用到一个MySQL数据库中。第二条命令将jbms_binlog.000002文件中一段时间的操作格式化输出到一个文本文件中。 其它数据库也有类似的方法，下面再来看一个使用Oracle日志分析的实例。有个项目提出的需求是这样的：部署两个相同的Oracle数据库A、B，两个库之间没有网络连接，要定期把A库里的数据复制到B库。要求：1. 应用程序不做修改。2. 实现增量数据更新，并且不允许重复数据导入。 Oracle提供了DBMS_LOGMNR系统包可以分析归档日志。我们只要将A库的归档日志文件通过离线介质拷贝到B库中，再在B库上使用DBMS_LOGMNR解析归档日志，最后将格式化后的输出应用于B库。使用DBMS_LOGMNR分析归档日志并redo变化的方案如下： 1. A库上线前数据库需要启用归档日志。 2. 每次同步数据时对A库先执行一次日志切换，然后拷贝归档日志文件到B库，拷贝后删除A库的归档日志。 3. 在B库上使用DBMS_LOGMNR分析归档日志文件并重做变化。 因为网不通，手工拷贝文件的工作不可避免，所以可以认为上述步骤均为手工操作。第1步为上线前的数据库准备，是一次性工作；第2、3步为周期性工作。对于第3步，可以用PL/SQL脚本实现。首先在B库机器上上规划好目录，这里D:\logmine为主目录，D:\logmine\redo_log存放从A库拷贝来的归档日志文件。然后在B库上执行一次初始化对象脚本，建立一个外部表，存储归档日志文件名称。 create or replace directory logfilename_dir as 'D:\logmine\'; grant read, write on directory logfilename_dir to u1; conn user1/password1 begin excute immediate 'create table logname_ext (logfile_name varchar2(300)) organization external (type oracle_loader default directory data_dir logfilename_dir location (''log_file_name.txt''))'; exception when others then if sqlcode = -955 then -- 名称已由现有对象使用 null; else raise; end if; end; / 每次数据同步时要做的工作是： 1. 拷贝A库归档日志文件到B的D:\logmine\redo_log目录。 2. 执行D:\logmine\create_ext_table.bat。 3. 前面步骤成功执行后，删除第1步拷贝的归档日志文件。 create_ext_table.bat脚本文件内容如下： echo off dir /a-d /b /s D:\logmine\redo_log\*.log > D:\logmine\log_file_name.txt sqlplus user1/password1 @D:\logmine\create_ext_table.sql create_ext_table.sql脚本文件的内容如下： begin for x in (select logfile_name from logname_ext) loop dbms_logmnr.add_logfile(x.logfile_name); end loop; end; / execute dbms_logmnr.start_logmnr(options => dbms_logmnr.committed_data_only); begin for x in (select sql_redo from v$logmnr_contents
-- 只应用U1用户模式的数据变化，一定要按提交的SCN排序
where table_space != 'SYSTEM' and instr(sql_redo,'"U1".') > 0
order by commit_scn)
loop
execute immediate x.sql_redo;
end loop;
end;
/

exit;

使用基于数据库的日志工具也有缺陷，即只能用来处理一种特定的数据库，如果要在异构的数据库环境下使用基于日志的CDC方法，就要使用Oracle GoldenGate之类的商业软件。

# 三、使用Sqoop抽取数据

有了前面的讨论和实验，我们现在已经可以处理从源系统获取数据的各种情况。回想上一篇建立的销售订单示例，源系统的MySQL数据库中已经添加好测试数据，Hive中建立了rds数据库作为过渡区，dw库存储维度表和事实表。这里我们将使用一种新的工具将MySQL数据抽取到Hive的rds库中，它就是Sqoop。

## 1. Sqoop简介

第一代Sqoop的设计目标很简单：

• 在企业级数据仓库、关系数据库、文档系统和HDFS、HBase或Hive之间导入导出数据。
• 基于客户端的模型。
• 连接器使用厂商提供的驱动。
• 没有集中的元数据存储。
• 只有Map任务，没有Reduce任务，数据传输和转化都由Mappers提供。
• 可以使用Oozie调度和管理Sqoop作业。

Sqoop1是用Java开发的，完全客户端驱动，严重依赖于JDBC，可以使用简单的命令行命令导入导出数据。例如：

# 把MySQL中testdb.PERSON表的数据导入HDFS
sqoop import --connect jdbc:mysql://localhost/testdb --table PERSON --username test --password 123456

上面这条命令形成一系列任务：

• 生成MySQL的SQL代码。
• 执行MySQL的SQL代码。
• 生成Map作业。
• 执行Map作业。
• 数据传输到HDFS。
# 将HDFS上/user/localadmin/CLIENTS目录下的文件导出到MySQL的testdb.CLIENTS_INTG表中
sqoop export --connect jdbc:mysql://localhost/testdb --table CLIENTS_INTG --username test --password 123456 --export-dir /user/localadmin/CLIENTS

上面这条命令形成一系列任务：

• 生成Map作业。
• 执行Map作业。
• 生成MySQL的SQL代码。
• 向MySQL的testdb.CLIENTS_INTG表插入数据

Sqoop2体系结构比Sqoop1复杂得多，它被设计用来解决Sqoop1的问题，主要体现在易用性、可扩展性和安全性三个方面。

易用性
Sqoop1需要客户端的安装和配置，而Sqoop2是在服务器端安装和配置。这意味着连接器只在一个地方统一配置，由管理员角色管理，操作员角色使用。类似地，只需要在一台服务器上配置JDBC驱动和数据库连接。Sqoop2还有一个基于Web的服务：前端是命令行接口（CLI）和浏览器，后端是一个元数据知识库。用户可以通过交互式的Web接口进行导入导出，避免了错误选项和繁冗步骤。Sqoop2还在服务器端整合了Hive和HBase。Oozie通过REST API管理Sqoop任务，这样当安装一个新的Sqoop连接器后，无需在Oozie中安装它。

可扩展性
在Sqoop2中，连接器不再受限于JDBC的SQL语法，如不必指定database、table等，甚至可以定义自己使用的SQL方言。例如，Couchbase不需要指定表名，只需在充填或卸载操作时重载它。通用的功能将从连接器中抽取出来，使之只负责数据传输。在Reduce阶段实现通用功能，确保连接器可以从将来的功能性开发中受益。连接器不再需要提供与其它系统整合等下游功能，因此，连接器的开发者不再需要了解所有Sqoop支持的特性。

安全性
Sqoop1用户是通过执行sqoop命令运行Sqoop。Sqoop作业的安全性主要由是否对执行Sqoop的用户信任所决定。Sqoop2将作为基于应用的服务，通过按不同角色连接对象，支持对外部系统的安全访问。为了进一步安全，Sqoop2不再允许生成代码、请求直接访问Hive或HBase，也不对运行的作业开放访问所有客户端的权限。Sqoop2将连接作为一级对象，包含证书的连接一旦生成，可以被不同的导入导出作业多次使用。连接由管理员生成，被操作员使用，因此避免了最终用户的权限泛滥。此外，连接可以被限制只能进行某些基本操作，如导入导出，还可通过限制同一时间打开连接的总数和一个禁止连接的选项来管理资源。

当前的Sqoop2还缺少Sqoop1的某些特性，因此Cloudera的建议是，只有当Sqoop2完全满足需要的特性时才使用它，否则继续使用Sqoop1。CDH 6.3.1中只包含Sqoop1，版本为1.4.7。

## 2. 使用Sqoop抽取数据

在销售订单示例中使用Sqoop1进行数据抽取。表5-2汇总了示例中维度表和事实表用到的源数据表及其抽取模式。

 源数据表 rds库中的表 dw库中的表 抽取模式 customer customer customer_dim 整体、拉取 product product product_dim 整体、拉取 sales_order sales_order order_dim、sales_order_fact 基于时间戳的CDC、拉取

表5-2 销售订单抽取模式

对于customer、product这两个表采用整体拉取的方式抽数据。ETL通常是按一个固定的时间间隔，周期性定时执行的，因此对于整体拉取的方式而言，每次导入的数据需要覆盖上次导入的数据。Kettle作业中的“Sqoop import”作业项，可以调用Sqoop命令，从关系数据库抽取数据到HDFS或hive表。我们使用该作业项将源库中的customer、product两表数据全量覆盖导入hive表所对应的HDFS目录，而调用图5-19所示的作业，实现对sales_order表的增量数据导入。整体作业如图5-23所示。

“Sqoop import customer”作业项选项设置如图5-24所示。

源库表为MySQL的customer表，目标为CDH631集群中，hive库表rds.customer所对应的HDFS目录/user/hive/warehouse/rds.db/customer。点击“Advanced Options”，将显示所有Sqoop所支持的命令行参数。通过点击“List View”或“Command Line View”图标，参数将分别以列表或命令行形式展现。这里只需设置“delete-target-dir”参数的值为true。Sqoop import要求目标HDFS的目录为空，为了能够幂等执行作业，需要设置delete-target-dir参数。所谓幂等操作指的是其执行任意多次所产生的影响均与一次执行的影响相同。这样就能在导入失败或修复bug后可以再次执行该操作，而不用担心重复执行会对系统造成数据混乱。定义好的作业项等价于以下sqoop命令：

sqoop import --connect jdbc:mysql://node3:3306/source --delete-target-dir --password 123456 --table customer --target-dir /user/hive/warehouse/rds.db/customer --username root

下面测试增量导入。前面介绍基于时间戳的CDC时，我们已经首次执行过装载sales_order表的作业，cdc_time表的日期为'2020-09-25'。现在向MySQL源库增加两条数据：

use source;
set @customer_number := floor(1 + rand() * 6);
set @product_code := floor(1 + rand() * 2);
set @order_date := from_unixtime(unix_timestamp('2020-09-26') + rand() * (unix_timestamp('2020-09-27') - unix_timestamp('2020-09-26')));
set @amount := floor(1000 + rand() * 9000);

insert into sales_order
values (101,@customer_number,@product_code,@order_date,@order_date,@amount);

set @customer_number := floor(1 + rand() * 6);
set @product_code := floor(1 + rand() * 2);
set @order_date := from_unixtime(unix_timestamp('2020-09-27') + rand() * (unix_timestamp('2020-09-28') - unix_timestamp('2020-09-27')));
set @amount := floor(1000 + rand() * 9000);

insert into sales_order
values (102,@customer_number,@product_code,@order_date,@order_date,@amount);

commit;

上面的语句向sales_order插入了两条记录，一条是9月26日的，另一条是9月27日的：

+--------------+-----------------+--------------+---------------------+---------------------+--------------+
| order_number | customer_number | product_code | order_date          | entry_date          | order_amount |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
...
|          101 |               4 |            1 | 2020-09-26 21:51:18 | 2020-09-26 21:51:18 |      3402.00 |
|          102 |               4 |            1 | 2020-09-27 06:15:43 | 2020-09-27 06:15:43 |      6963.00 |
+--------------+-----------------+--------------+---------------------+---------------------+--------------+
102 rows in set (0.01 sec)

下面执行图5-23所示的Kettle作业。customer、product重新全量覆盖装载数据，sales_order表只装载最新的两条数据。作业成功执行后，HDFS目录/user/hive/warehouse/rds.db/sales_order/下有两个文件：

[root@manager~]#hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/
Found 2 items
-rw-r--r--   3 root hive       5892 2020-09-28 13:38 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-24.txt
-rw-r--r--   3 root hive        120 2020-09-28 15:32 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-27.txt
[root@manager~]#

rds.sales_order表数据如下：

hive> select * from rds.sales_order;
OK
1    6    2    2020-03-01 20:13:34    2020-03-01 20:13:34    3777.00
2    4    2    2020-03-03 19:07:07    2020-03-03 19:07:07    9227.00
...
101    4    1    2020-09-26 21:51:18    2020-09-26 21:51:18    3402.00
102    4    1    2020-09-27 06:15:43    2020-09-27 06:15:43    6963.00
Time taken: 3.168 seconds, Fetched: 102 row(s)
hive>

时间戳表rds.cdc_time数据也已经更新为当前日期：

hive> select * from rds.cdc_time;
OK
1    2020-09-28    2020-09-28
Time taken: 1.369 seconds, Fetched: 1 row(s)
hive>

作业的执行结果符合预期。

## 3. Sqoop优化

当使用Sqoop在关系数据库和HDFS之间传输数据时，有多个因素影响其性能。可以通过调整Sqoop命令行参数或数据库参数优化Sqoop的性能。本节简要描述这两种优化方法。

### （1）调整Sqoop命令行参数

可以调整下面的Sqoop参数优化性能。

• batch：该参数的语法是--batch，指示使用批处理模式执行底层的SQL语句。在导出数据时，该参数能够将相关的SQL语句组合在一起批量执行。也可以使用有效的API在JDBC接口中配置批处理参数。
• boundary-query：指定导入数据的范围值。当仅使用split-by参数指定的分隔列不是最优时，可以使用boundary-query参数指定任意返回两个数字列的查询。它的语法如下：--boundary-query select min(id), max(id) from <tablename>。在配置boundary-query参数时，查询语句中必须连同表名一起指定min(id)和max(id)。如果没有配置该参数，缺省时Sqoop使用select min(<split-by>), max(<split-by>) from <tablename>查询找出分隔列的边界值。
• direct：该参数的语法是--direct，指示在导入数据时使用关系数据库自带的工具（如果存在的话），如MySQL的mysqlimport。这样可以比jdbc连接的方式更为高效地将数据导入到关系数据库中。
• Dsqoop.export.records.per.statement：在导出数据时，可以将Dsqoop.export.records.per.statement参数与批处理参数结合在一起使用。该参数指示在一条insert语句插入的行数。当指定了这个参数时，Sqoop运行下面的插入语句：INSERT INTO table VALUES (...), (...), (...),...;某些情况下这可以提升近一倍的性能。
• fetch-size：导入数据时，指示每次从数据库读取的记录数。使用下面的语法：--fetch-size=<n>,其中<n>表示Sqoop每次必须取回的记录数，缺省值为1000。可以基于读取的数据量、可用的内存和带宽大小适当增加fetch-size的值。某些情况下这可以提25%的性能。
• num-mappers：该参数的语法为--num-mappers <number of map tasks>，用于指定并行数据导入的map任务数，缺省值为4。应该将该值设置成低于数据库所支持的最大连接数。
• split-by：该参数的语法为--split-by <column name>，指定用于Sqoop分隔工作单元的列名，不能与--autoreset-to-one-mapper选项一起使用。如果不指定列名，Sqoop基于主键列分隔工作单元。

### （2）调整数据库

为了优化关系数据库的性能，可执行下面的任务：

• 为精确调整查询，分析数据库统计信息。
• 将不同的表空间存储到不同的物理硬盘。
• 预判数据库的增长。
• 使用explain plan类似的语句调整查询语句。
• 导入导出数据时禁用外键约束。
• 导入数据前删除索引，导入完成后再重建。
• 优化JDBC URL连接参数。
• 确定使用最好的连接接口。

# 四、小结

展开全文
• 数据抽取的概念2. 数据的分类3. JSON数据概述及解析3.1 JSON数据格式3.2 解析库json3.2.1 json序列化3.2.2 json反序列化4. jsonpath4.1 使用4.2 使用示例5. Python专用JSON解析库pickle 1. 数据抽取的概念 原创...
• 今天小编就为大家分享一篇关于抽取oracle数据到mysql数据库的实现过程，小编觉得内容挺不错的，现在分享给大家，具有很好的参考价值，需要的朋友一起跟随小编来看看吧
• 其中本软件产品提供了数据抽取、数据清洗、数据转换、数据校验、数据补丁等数据操作主要功能。  数据抽取，即将源数据库表抽取到目标数据库，数据表结构不变；  数据清洗，即整理、清洗需要转换的源数据。  ...
• 数据抽取，也叫做数据拆分，它是指保留，抽取原数据表中的某些数据形成一个新的数据表，主要方法有字段拆分、记录抽取和随机抽取。 1.1字段拆分 抽取某一字段的部分信息，形成一个新的字段 1.1.1按位置...
• 数据抽取是数据集成平台中一个非常重要的功能，主要负责不同数据源和不同数据库的数据同步
• 常见的数据抽取模式有4种：全量覆盖抽取，全量追加，增量和增量滚动。 全量覆盖 例子：假设第一天源头库有100条数据，第二天源头库新增10条，源头库即110条。那么第一天抽取：目标库100条，第二天抽取会删除昨天抽取...
• 数据抽取是指从源数据源系统抽取需要的数据。实际应用中，数据源较多采用的是关系数据库。总体而言，数据抽取的常见方法有两大类，一是基于查询式的，一是基于日志的。 基于查询式的数据抽取 基于查询式的数据抽取...

...