精华内容
下载资源
问答
  • Spark DataFrame ETL教程

    2018-01-21 22:17:00
    实际上,ETL就是一个对数据进行批处理的过程,一个ETL程序就是一个批处理脚本,执行时能将一堆数据转化成我们需要的形式。 每个接触过数据批处理的工程师,都走过ETL的流程,只是没有意识到而已。按照ETL过程的框架...

    前言

    ETL是 Extract-Transform-Load的缩写,也就是抽取-转换-加载,在数据工作中是非常重要的部分。实际上,ETL就是一个对数据进行批处理的过程,一个ETL程序就是一个批处理脚本,执行时能将一堆数据转化成我们需要的形式。
    每个接触过数据批处理的工程师,都走过ETL的流程,只是没有意识到而已。按照ETL过程的框架来重新认识数据批处理,有利于我们更清晰地编写批处理脚本。
    在单机范围内的数据量下,使用python的pandas包就可以非常方便地完成数据批处理工作。但当数据量达到1G以上时,pandas处理起来就有些力不从心了,到数据量达到1T以上,只能以分块的方式存储在分布式系统上时,pandas就无能为力了。在当前的技术背景下,典型的场景就是数据存储在Hive on HDFS上。要做ETL,就需要新的工具。Hadoop生态下,原生的工具是MapReduce计算模型,通常用Java编写,比较复杂,每次计算的中间结果也需要进行磁盘存取,非常费时。Spark是一个MPP架构的计算引擎,相比MapReduce,Spark 有DataFrame(又名 Schema RDD), 以表的形式来储存数据,无论是理解还是操作,都更为简单,还支持Python,在许多需要使用函数作参数的场合,非常好用。

    本教程将介绍如何使用pyspark.sql模块,操作Spark DataFrame,从Hive中读取数据,经过一系列转换,最后存入Hive中。Spark的DataFrame和pandas的DataFrame的概念很相似,只是操作略有不同,如果读者有pandas的使用经验,很容易就能快速上手。
    教程只是为了方便读者快速入门,想要更好地开发Spark程序,仍然需要详细了解Spark的API接口,对python环境下,Hive的ETL来说,研究pyspark.sql模块下的内容就足够了,可以参考官方文档

    环境:Spark的API随版本不同会有较大变化,目前比较流行的版本是1.6和2.2,本文使用Spark 1.6.0,语言为Python 2.7。默认数据都储存在Hive中,Hadoop集群带有yarn。

    冒烟测试

    学习一门语言或者软件的第一步,永远都是冒烟测试。最经典的冒烟测试就是输出Hello World。但对ETL来说,一个打印"Hello World"的Spark程序是没什么用的。所以我们这里讲讲如何打印一张表,这张表有一行数据,列名为t,值为"Hello World"。

    Spark的核心是SparkContext,它提供了Spark程序的运行环境。而SqlContext则是由SparkContext产生,提供了对数据库表的访问接口。因为这里数据库的环境是Hive,通常使用SqlContext的派生类HiveContext。在Spark提供的交互式环境中,会在启动时自动创建环境,生成SparkContext和HiveContext的实例。在pyspark的交互式环境中,SparkContext实例名为sc,HiveContext实例名为sqlContext。

    交互式操作只在学习和调试时使用,实际工作中还是要靠命令行执行脚本。在脚本中我们就需要自己生成SparkContext和HiveContext了。基本操作代码如下:

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext,HiveContext
    sc = SparkContext(appName="Hello World") #  appName就是这个Spark程序的名字,在DEBUG时有用
    hc = HiveContext(sc)
    df = hc.createDataFrame([["Hello World"]],['t']) # 创建一个DataFrame,第一个参数是数据,一个二维列表,第二个参数是表头,一个列表)
    first_cell = df.collect()[0][0] # 取第一个单元格的值
    df.show() # 将表打印到屏幕上
    print(first_cell)

    将这段代码保存成文件hello.py,在终端中进入到该文件所在目录,输入命令spark-submit --master yarn hello.py ,然后就可以看到屏幕上输出如下,冒烟测试就算完成了。

    +-----------+
    |          t|
    +-----------+
    |Hello World|
    +-----------+
    Hello World

    指令解释:spark-submit就是spark的执行程序,master yarn是spark-submit的参数,指定yarn作为计算调度的中心。最后hello.py就是我们的ETL程序。

    Extract 抽取

    ETL的第一步就是从数据源抽取数据,在Spark中就是从Hive里读取数据。

    Hive虽然实质上是个MapReduce接口的封装,但从上层抽象模型来看,有最基本的Schema、Table和Column,还有一套类SQL语法,可以说就是一个典型的关系数据库模型,因此在ETL过程中,我们完全可以把Hive当成一个关系数据库来看待。

    抽取的常用方法由两种,一种是直接读取Hive表,一种是通过Hive QL读取。
    都需要以HiveContext的实例作为入口,结果返回一个Spark DataFrame,为了检查结果,可以使用show方法查看DataFrame的数据。

    假设我们有一个名为test 的库,里面有一张表为t1,数据结构如下:

    a b c
    1 2 3
    4 5 6
    7 8 9

    直接读取Hive表

    HiveContext对读取操作提供统一的接口- DataFrameReader,HiveContext的实例的read属性就可以获取到这个接口。
    当然,这个接口也能用来读取Hive的数据,read.table就可获取到表的数据,代码如下

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="extract")
    hc = HiveContext(sc) # 生成HiveContext实例
    t =hc.read.table("test.t1")
    t.show() 

    Hive QL读取

    实质是让HiveContext将HiveQL传给Hive,让Hive执行后,将查询结果封装成Spark DataFrame返回。在处理过程比较简单,或者需要大量设置别名时,比较有用(因为Spark批量设置别名不太方便),但不推荐写太过复杂的Hive QL,因为Hive 执行Hive QL的实质是把Hive QL转成MapReduce执行,在计算效率上是不如Spark的。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="extract")
    hc = HiveContext(sc)
    hc.sql("use test") 
    t = hc.sql("select * from t1")
    t.show() 

    Load 加载

    为什么不先讲Trasform呢?因为Trasform的操作很多,先讲Load有助于快速上手完成一个初级的ETL程序。
    类似于读取,HiveContext也提供了统一的写接口,名为DataFrameWriter.调用write属性即可获取。

    写入的具体方式也很多,不过为了快速上手,只讲最关键的一些东西。

    mode 写入方式

    如果表已经存在,该如何操作。

    • append 追加: 在尾部追加数据
    • overwrite 覆写: 覆盖原有数据
    • error 错误: 抛出异常
    • ignore忽略 : 自动跳过

    因为Hive on HDFS的关系,更新表最快的方式是全表覆写。对于需要更新原有的ETL,一般都是全表重写,只需要追加的,就可以用追加。

    format 文件格式

    在Hive on HDFS中,数据实质上是以文件的形式保存的。不同的文件格式,在压缩容量、支持数据类型和查询速度上都有所不同。textfile,avro,sequence,parquet,json等。目前我常用的格式是text和parquet,如果不设置文件格式,默认会使用Hive表的文件格式,如果Hive表不存在,则使用Hive表的默认格式textfile

    加载新表

    了解了上面的操作之后,我们就可以开始写加载部分的代码了,只需要使用一个saveAsTable方法就行了,非常简单。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="load")
    hc = HiveContext(sc)
    hc.sql("use test") 
    t1 = hc.sql("select a as a1,b as b1,c as c1 from t1")
    t1.write.saveAsTable("test.t2",format="parquet",mode="overwrite") # 将t1的三个列改名后存成t2表
    t2.read.table("test.t2")
    t2.show()

    转换

    转换是ETL过程中最复杂的部分,去掉抽取和加载,剩下的全都是转换,包含的内容是非常多的,常见的有筛选、聚合、多列合并或计算,列赋值,根据不同的需要有不同的处理方法。由于Spark的转换操作较为啰嗦,所以推荐把部分简单的操作通过Hive QL的方式,在抽取步骤中交由Hive完成,这样有助于精简代码,提高可读性,降低维度难度。
    下面就讲一讲Spark DataFrame 转换部分的基本概念和操作。

    向量化编程

    对于日常用Java来做数据批处理的工程师来说,可能更习惯用for循环来逐条处理数据。但这样做在操作上是很不方便的,也不太利于阅读理解。在科学计算的语境下,数据总是以DataFrame的形式储存,也就是一张表。数据处理操作通常是对这张表的某些行或者某些列来进行处理。比如,“令t1表的a列中数字大于2的值的,全部都等于2”,或者“给t1表新加一常数列d,值为99”,这样的操作在向量化编程的语境下,就是一个调用API接口的操作,比for循环容易被理解。
    可以类比pandas。在pandas中,也主要是通过向量化编程的方式来处理数据,虽然提供了迭代器的接口,可以一行行地读取数据,但一般以表作为修改对象的操作,主要是以API接口来完成,不推荐使用迭代器来做行级修改。一来操作不方便,二来运算速度未必能比优化过的API接口快。
    Spark是分布式执行的,数据分散在各个机器上,背后有一套调度系统来控制数据计算负载。如果用for循环来处理,就是把负载都加在了执行脚本的机器上,一般来说执行脚本的机器都是不储存数据的master,实际上这一过程就会导致需要把数据从slave传到master上,无谓地增加了网络负担。所以,在Spark脚本里,严禁使用原生的python for循环来处理SparkData Frame,即使要用,也应该使用Spark提供的API接口。

    基本操作对象

    在Spark DataFrame语境下,操作对象主要有三个:DataFrame,Row,Column。

    • DataFrame: DataFrame就是一张表,有表头和若干行数据。这张表是一个有序、可迭代的集合。
    • Row:DataFrame 集合中的元素就是Row。每个Row储存一行数据,有相同的属性,这些属性和表头同名。DataFrame没有API接口可以直接获取到某个Row,但可以通过Colect方法获取到Row对象的list,再从中获取指定的Row。
    • Column:Column与数据的实际结构无关,是一个操作上的概念。在实际的转换操作中,绝大多数都是对若干列进行数学运算、拼接、映射等等。取DataFrame中的一列,得到的就是一个Column对象。

    事实上,最常用的主要是DataFrame和Column,Row很少用到。其中,DataFrame是核心,一个ETl过程,实质就是从抽取一个DataFrame开始,经过一系列的DataFrame变换,得到一个与目标一致的DataFrame,然后写入到目标数据库中去。Column在其中扮演着中间点的角色,比如取DataFrame的多个列,拼接合成一个新列,然后把这个新列加到原本的DataFrame中去。

    基本操作分类

    上面提到了,DataFrame是核心操作对象。其实在Spark中,真正意义上的核心操作对象是RDD,一个有序的,分布式储存在内存中的操作对象。DataFrame就是一个特殊的RDD——Schema RDD。所有的DataFrame操作,都可以归类为两种基本操作:转化(Transformation)和行动(action)。转换操作是不会触发Spark的实际计算的,即使转换过程中出现了错误,在执行到这一行代码时,也不会报错。直到执行了行动操作之后,才会真正让Spark执行计算,这时候才会抛出在转化过程中出现的错误。这在DEBU时,尤其是交互式编程环境下,可能会导致问题代码定位错误,需要特别注意。

    • Transform:典型的转换操作有读(read),筛选(filter)、拼接(union)等等,只要这个过程只改变DataFrame的形态,而不需要实际取出DataFrame的数据进行计算,都属于转换。理论上来说,ETL过程中的Transfrom过程,主干流程只会有转换操作,不会有Action操作。
    • Action:典型的动作操作有计数(count),打印表(show),写(write)等,这些操作都需要真正地取出数据,就会触发Spark的计算。

    筛选

    filter(cond):筛选出满足条件cond的行。cond可以填字符串,格式和SQL中的where子句一样,也可以填Bool类型的Column对象,比如 df['a']>1。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    t1 = df.filter("a > 1 and c < 9")
    t1.show() # 输出 4,5,6 这一行
    t2 = df.filter( (df['b']<5) & (df['c']<8)) # 可以使用&或|对两个bool列进行逻辑运算,但必须要用圆括号括起,限定运算顺序。
    t2.show() # 输出 1,2,3 这一行

    赋值,加列

    withColumn(col_name,col):col_name是列名,col是列值,必须是一个Column对象。
    赋值和加列操作是相同的,col_name存在,就是赋值,否则就是加列。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    t1 = df.withColumn("c",df['c']+1)
    t1.show() # c的值全都增加了1
    t2 = df.withColumn("d",df['a']+1)
    t2.show() # 增加了新一列d

    删除列

    drop(col_name):col_name为列名。该方法会返回一个删除col_name列的DataFrame

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    t = df.drop("c")
    t.show() # 只有 a,b两列

    给列取名

    alias(col_name):通常和select配合使用,请看下面的例子

    选取列

    select(*cols):cols为列名或列对象。
    赋值和删除操作,每次只能改加减一列数据,如果想要批量地改变,尤其是调整列顺序的时候,就非常有用了。在ETL中,当需要计算的列很多时,通常就是逐个计算出不同的列对象,最后用select把它们排好顺序。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    a1 = (df['a']+1).alias("a1") # 新增一个列对象,取名为a1
    t = df.select("a",a1,"b") # 如果用字符串,必须是df中存在的列名。
    t.show() # 显示a, a_1,b 三列

    生成Column对象

    在赋值的例子里,Column对象是由原DataFrame的Column经过简单的数学运算或逻辑运算得到的,但如果我们想生成一些更特殊的Column呢?比如常数列或者自己定义复杂的规则。
    Spark提供了pyspark.sql.functions,含有丰富的接口,其中就有我们需要的东西。篇幅有限,只能介绍一些常用的,更多的还是需要去看官方文档。

    常数列

    lit(value):value数必须是必须为pyspark.sql.types支持的类型,比如int,double,string,datetime等

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    from pyspark.sql.functions import lit
    from datetime import datetime
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    t = df.withColumn("constant",lit(datetime(2018,1,1,2,3,4,999)))
    t.show(truncate=False)

    取整

    round、floor:和Python的标准函数用法一致,只是数字换成列名

    条件分支

    when(cond,value):符合cond就取value,value可以是常数也可以是一个列对象,连续可以接when构成多分支
    otherwise(value):接在when后使用,所有不满足when的行都会取value,若不接这一项,则取Null。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    from pyspark.sql.functions import when
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    t = df.withColumn("when",when(df['a']==1,"a=1").when(df['b']==5,df['b']%5).otherwise("other"))
    t.show() # 生成when列,值分别为 a=1,0,other

    日期和时间

    current_date():当前日期,返回一个date列
    current_timestamp():当前时刻,返回一个timestamp列
    date_add(start, days):日期正向偏移,start为开始时间,必须是Column或字符串对象,指向一个date或timestamp列,days为偏移天数。
    date_sub(start, days):类似date_add,但是负向偏移。
    date_format(date, format): 日期格式化,date为要格式化的时间,必须是Column或字符串对象,指向一个date或timestamp列,days为偏移天数,format为格式化的字符串,具体参考Hive QL的date_format函数。
    datediff(end, start):计算天数差

    自定义规则

    udf(f, returnType=StringType): 自定义处理函数,f为自定义的处理函数,returnType为f的返回类型,必须为pyspark.sql.types支持的类型,如果不填,会默认自动转化为String类型。udf会返回一个函数,可以当做列函数使用。
    这在处理逻辑非常复杂时很有用。比如对身份证号进行校验计算,然后取出有效的身份证号的第1,4,10位,这个复杂流程很难用Spark提供的API拼接起来,只能自己写。
    作为教程,就不写太复杂的函数了。
    自定义函数f的传入参数为列的值。

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
    def f(a,b,c):
        r=0
        if a==1:
            r=1
        elif b==5:
            r=2
        return r
    
    col_match = udf(f,IntegerType())
    t = df.withColumn("col_match",col_match("a","b","c"))
    t.show() # 生成col_match列,值分别为 a=1,2,0

    排序

    Spark支持多字段,升降序排序。
    可以使用orderBy和sort,因为操作比较简单也符合直觉,这里略去例子,详情可以看文档。

    聚合

    Spark 支持直接聚合,也支持分组聚合。聚合的表达方式非常多,这里仅选取常用的。

    直接聚合

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    from pyspark.sql.functions import sum
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c'])
    t = df.agg(sum("a"))
    print(t.collect()[0][0]) # 打印 12

    分组聚合

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    from pyspark.sql.functions import sum,max
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c'])
    t = df.groupBy("b").agg(sum("a"),max("c"))
    t.show() 

    输出:

    +---+------+------+
    |  b|sum(a)|max(c)|
    +---+------+------+
    |  1|     5|     3|
    |  2|     7|     9|
    +---+------+------+

    窗口函数

    有一类分析需求,是需要分组计算,但保持数据的粒度不变的。比如通过成绩表,按班计算的学生的成绩排名,加一列到原本的成绩表中,整个表的每一行仍然表示一名学生。这种分析需求称为窗口分析,比如说每个班,就是一个窗口,在这个窗口中,计算出班级成绩排名,再并到原表中。
    这种分析,首先要创建一个窗口,然后再使用窗口函数来进行计算。Spark提供了丰富的窗口函数,可以满足各类分析需求。

    创建窗口

    使用pyspark.sql.Window对象可以创建一个窗口,最简单的窗口可以什么都没有,但一般不推荐这样做。可以使用partitionBy进行分组,使用orderBy进行排序,比如

    from pyspark.sql import Window
    window = Window.partitionBy("a").orderBy("b")

    窗口函数使用示例

    rank():根据窗口中partitionBy进行分组,以orderBy排序

    # -*- coding: UTF-8 -*-
    from pyspark import SparkContext, HiveContext
    from pyspark.sql.functions import rank,desc
    from pyspark.sql import Window
    sc = SparkContext(appName="transform")
    hc = HiveContext(sc)
    score = [
        ['a','a_1',90],
        ['a','a_2',80],
        ['a','a_3',85],
        ['b','b_1',70],
        ['b','b_2',80],
        ['b','b_3',75],
        ['c','c_1',90]
    ]
    df = hc.createDataFrame(score,['class','student','score'])
    class_window = Window.partitionBy("class").orderBy(desc("score")) #降序排列
    class_rank = rank().over(class_window) 
    class_row_number = row_number().over(class_window) #窗口函数(xxx).over(window),就是一般的用法
    t = df.withColumn("rank",class_rank)
    t.show() 

    按班级,分数从高到低,生成排名

    +-----+-------+-----+----+
    |class|student|score|rank|
    +-----+-------+-----+----+
    |    a|    a_1|   90|   1|
    |    a|    a_3|   85|   2|
    |    a|    a_2|   80|   3|
    |    b|    b_2|   80|   1|
    |    b|    b_3|   75|   2|
    |    b|    b_1|   70|   3|
    |    c|    c_1|   90|   1|
    +-----+-------+-----+----+

    缓存

    在实际业务中,常常会碰到这种需求:需要把一个计算结果,稍加不同的改动,分别存为不同的表。比如,ETL中为了保证出错后能重试,就会要求除了保存转换计算结果之外,还要备份一份到备份表里。备份表通常是按天分区的,每个区存当天的转换计算结果。而应用表则不分区,只存最新一天的计算结果。
    在完成这一需求时,如果是先保存应用表,然后再添加分区列后添加到分区表,就会触发两次完整的计算流程,时间翻倍。而如果有缓存,就不一样了。我们可以在计算到最终结果时,缓存一下这张表,然后把它保存为应用表,再添加分区列保存为分区表。那么,实际计算中,到缓存操作为止的计算,只会触发一次,实际消耗时间为1次到最终结果的计算+1次加分区列,远小于2次计算的时间。当某些中间结果需要反复使用时,缓存可以给我们带来极大的效率提升。当然,相应地,内存也会占用更多,还是应该根据具体情况决定如何取舍。缓存的方法很简单,只要让DataFrame对象执行cache方法就行了:df.cache()

    转载于:https://www.cnblogs.com/longfei-aot/p/8325843.html

    展开全文
  • 动手教程(Hands-on Tutorials) I recently migrated data between 2 different apps which each had their own database.我最近在2个不同的应用程序之间迁移了数据,每个应用程序都有各自的数据库。 One app was ...

    动手教程(Hands-on Tutorials)

    I recently migrated data between 2 different apps which each had their own database.

    我最近在2个不同的应用程序之间迁移了数据,每个应用程序都有各自的数据库。

    One app was written in Ruby on Rails and consumed Github’s job APIs. The other was a production Django app with a frontend that displayed the data.

    一个应用程序是用Ruby on Rails编写的,并使用了Github的工作API。 另一个是带有前端的Django生产应用程序,用于显示数据。

    Here is a quick tutorial on how I migrated that data using Ruby’s Kiba ETL library.

    这是有关如何使用Ruby的Kiba ETL库迁移数据的快速教程。

    In the process you’ll learn to write ETL pipelines with Kiba.

    在此过程中,您将学习如何使用Kiba编写ETL管道。

    We’ll grab job data from Github’s jobs API, save it into a database, and then use Kiba to transfer it to another database. In-between, we’ll parse the original JSON into multiple fields.

    我们将从Github的jobs API中获取工作数据,将其保存到数据库中,然后使用Kiba将其传输到另一个数据库中。 在这两者之间,我们将原始JSON解析为多个字段。

    Here we go!

    开始了!

    创建数据库 (Create the databases)

    Lets create source and destination databases from the command line.

    让我们从命令行创建源数据库和目标数据库。

    $ psql -d template1;
    $ create database source_db;
    $ create database destination_db;
    $ \q

    Note: \q exits the SQL terminal.

    注意: \q退出SQL终端。

    Now open your favorite SQL editor (I’m using Dbeaver) and create a table in each.

    现在打开您喜欢SQL编辑器(我正在使用Dbeaver )并在每个表中创建一个表。

    In the source database, run.

    在源数据库中,运行。

    CREATE TABLE jobs(
    id serial PRIMARY KEY,
    data json
    );

    In the destination database, run.

    在目标数据库中,运行。

    CREATE TABLE jobs(
    id serial PRIMARY KEY,
    title text,
    company text
    );

    Now we’re setup to insert Github job data into the source database!

    现在,我们准备将Github作业数据插入源数据库!

    将Github数据保存到源数据库 (Save Github data into source db)

    First create a directory called /etl, in which we’ll create all our files. Then cd into it.

    首先创建一个目录/etl ,在其中创建所有文件。 然后cd进去。

    Inside, create a Gemfile.

    在内部,创建一个Gemfile

    source 'https://rubygems.org'
    gem 'kiba'
    gem 'rake', '~> 11.2', '>= 11.2.2'
    gem 'faraday'

    And install those libraries with bundle install.

    并使用bundle install安装这些库。

    Then create a file called api.rb and copy/paste the following code.

    然后创建一个名为api.rb的文件并复制/粘贴以下代码。

    require 'pg'
    require 'faraday'class Apidef self.retrieve
    # Request data
    response = ::Faraday.get('https://jobs.github.com/positions.json?') # Open connection to db
    conn = PG::Connection.open(dbname: 'source_db') # Parse response
    body = JSON.parse(response.body) # Iterate on each job in the response and save to the local db
    body.each do |job|
    conn.exec('insert into jobs (data) values ($1);',
    [ job.to_json ])
    end end
    endApi.retrieve

    This requests job data from Github and drops it into our source database.

    这将从Github请求作业数据并将其放入我们的源数据库中。

    Run it.

    运行。

    $ ruby api.rb

    And check in your database editor to ensure it was properly inserted.

    并签入数据库编辑器以确保已正确插入它。

    Image for post

    设置木场 (Setup Kiba)

    We’re going to require a couple files here, inside /etl.

    我们将在/etl内需要几个文件。

    $ touch Rakefile
    $ touch common.rb

    In Rakefile, copy/paste.

    Rakefile ,复制/粘贴。

    require 'kiba'
    require_relative 'common'task :kiba_run do
    puts "Starting..." Kiba.run(
    Kiba.parse do
    source Source, db_name: 'source_db'
    destination Destination, db_name: 'destination_db'
    end
    )
    puts "...Done"
    end

    The Rakefile calls the code that we’re going to write in a second in our common.rb file.

    Rakefile调用了我们将在秒钟common.rb文件中编写的代码。

    And in common.rb, copy/paste.

    common.rb ,复制/粘贴。

    require 'pg'class Source
    def initialize(db_name:)
    @conn = PG::Connection.open(dbname: db_name)
    end def each
    rows = @conn.exec('select * from jobs') rows.each do |row|
    yield(row['data'])
    end

    @conn.close
    end
    end
    class Destination
    def initialize(db_name:)
    @conn = PG::Connection.open(dbname: db_name)
    end def write(data)
    data = JSON.parse(data) # We insert every row as it's own transaction. Not efficient!
    @conn.exec(
    'insert into jobs (title, company, location) values ($1, $2, $3);',
    [ data['title'], data['company'], data['location'] ])
    end def close
    @conn.close
    end
    end

    Per the Kiba API, the Source class defines how we load data from the source database. And the Destination class defines how we insert it into our destination database.

    根据Kiba API, Source类定义了如何从源数据库加载数据。 Destination类定义了如何将其插入到目标数据库中。

    Now run Kiba to migrate the data between databases.

    现在运行Kiba在数据库之间迁移数据。

    $ rake kiba_run

    And then inspect the destination database to ensure it worked.

    然后检查目标数据库以确保其正常工作。

    Image for post

    Booya! It worked!

    ya! 有效!

    结论 (Conclusion)

    Migrating data between databases is a given once your app/startup reaches any significant size.

    一旦您的应用程序/启动达到任何重要大小,就可以在数据库之间迁移数据。

    Often this will be from a production db to a data warehouse, but it could also be migrating data between the databases of different microservices.

    通常这是从生产数据库到数据仓库,但也可能是在不同微服务的数据库之间迁移数据。

    Either way, it’s good to be familiar with some basic technology for accomplishing this.

    无论哪种方式,最好都熟悉一些基本的技术来完成此操作。

    In all honesty, Ruby isn’t great for data science or data engineering, but Kiba is an easy-to-use and reliable tool if your team only writes Ruby, or your data’s scale isn’t too large.

    坦白地说,Ruby不适用于数据科学或数据工程,但是如果您的团队仅编写Ruby,或者您的数据规模不太大,Kiba是一种易于使用且可靠的工具。

    翻译自: https://towardsdatascience.com/a-beginner-etl-tutorial-in-ruby-with-kiba-89d1437e0328

    展开全文
  • 入门级ETL教程,跟着教程走一遍,就会懂基本的ETL流程了。kettle是ETL过程中最基本、最好用的工具,将各种数据源整合输出你想要的指定格式的数据
  • ETL培训教程

    2017-09-07 13:55:43
    ETL 顾名思义, 即数据抽取(Extract)、转换(Transform)、装载 (Load)的过程,它是构建数据仓库的重要环节。 ETL负责将分布的、异构数据源中的数据如关系数据、平面数据文件等抽取 到临时中间层后进行清洗、...
  • ETL入门教程

    千次阅读 2019-01-29 09:55:40
    ETL详解 1.1 ETL https://www.cnblogs.com/yjd_hycf_space/p/7772722.html ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程。 ETL的设计分三部分:数据抽取、数据的清洗转换、数据的加载...

    来源:我是码农,转载请保留出处和链接!

    本文链接:http://www.54manong.com/?id=1214

    1 ETL详解

    1.1 ETL

    https://www.cnblogs.com/yjd_hycf_space/p/7772722.html

    ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程。

    ETL的设计分三部分:数据抽取、数据的清洗转换、数据的加载。

    1.1.1 ElasticSearch

    全文搜索引擎:http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html

    1.1.2 Kibana

    通过Kibana,能够对Elasticsearch中的数据进行可视化并在Elastic Stack进行操作。

    Kibana核心产品搭载了一批经典功能:柱状图、线状图、饼图、旭日图等。

    https://www.elastic.co/cn/products/kibana

    Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana,可以通过各种图表进行高级数据分析及展示

    1.1.3 Logstash

    https://www.elastic.co/cn/products/logstash

    Logstash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据存储到数据库。

    Logstash支持各种输入选择,可以在同一时间从众多常用来源捕捉时间。能够以连续的流式传输方式,轻松地从日志、指标、Web应用、数据存储以及各种AWS服务采集数据。

    Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

    2 ETL启动报错

    2.0.1 启动检查没有通过

    修改/etc/security/limits.conf

    修改/etc/security/limits.d/90-nproc.conf

    3 ETL实例

    3.1 elasticsearch常用请求

    3.1.1 查看索引目录(装有es的机器上执行)

    curl "localhost:9200/_cat/indices?v"

    3.1.2 创建customer索引

    curl -XPUT "localhost:9200/customer?pretty"

    3.1.3 给customer索引创建文档

    curl -XPUT "localhost:9200/customer/external/1?pretty" -d '{"name":"TEST"}'

    3.1.4 在索引上查找文档

    curl -XGET "localhost:9200/customer/external/1?pretty"

    3.1.5 修改索引下的文档

    curl -XPOST "localhost:9200/customer/external/1/_update?pretty" -d ' { "doc": { "name": "Lenovo","Location":"Beijing" }}'

    3.2 ELK日志采集

    ELK+syslog+nginx访问日志收集+分词处理

    http://blog.51cto.com/lrtao2010/1949334

    https://www.2cto.com/kf/201610/560348.html

    https://blog.csdn.net/qq_22211217/article/details/80764568

    3.2.1 控制台输入,控制台输出测试

    编辑配置文件

    将集群中的logstash停止,然后通过命令启动并指定刚创建的配置文件。

    logstash -f test.conf

    3.2.2 集群中修改配置文件logstash-data-source

    添加配置组

    添加配置如下

    检测/var/log/ambary-server/ambary-server.log日志文件

    查看ElasticSearch索引

    curl "node18.sleap.com:9200/_cat/indices?v"

    查看kibana展示

    http://node16.sleap.com:5601

    3.2.3 问题

    logstash配置文件如何编写(过滤部分)

    http://www.cnblogs.com/yincheng/p/logstash.html

    kibana如何使用

    https://www.elastic.co/cn/products/kibana

    https://www.elastic.co/guide/cn/kibana/current/dashboard.html

    ElasticSearch中数据如何查看

    https://www.yiibai.com/elasticsearch/elasticsearch-getting-start.html

    3.3 ELK与kafka整合

    参考连接

    https://sematext.com/blog/kafka-connect-elasticsearch-how-to/

    https://www.cnblogs.com/smartloli/p/6978645.html

    https://www.confluent.io/blog/the-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/

    http://www.cnblogs.com/JetpropelledSnake/p/10057545.html

    http://www.demodashi.com/demo/10181.html

    https://blog.csdn.net/qq_37502106/article/details/79262721

    3.3.1 日志处理流程

    使用java把日志传入kafka,然后通过kafka将日志发送给logstash,logstash再将日志写入elasticsearch,这样elasticsearch就有了日志数据了,最后,则使用kibana将存放在elasticsearch中的日志数据显示出来,并且可以做实时的数据图表分析等等。

    zookeeper查看leader节点:echo stat | nc node17.sleap.com 2181

    3.3.2 kafka常用命令

    https://www.cnblogs.com/xtdxs/p/7112683.html

    创建topic data1

    kafka-topics --create --zookeeper node17.sleap.com:2181 --replication-factor 2 --partitions 1 --topic data2

    查看所有topic

    kafka-topics --list --zookeeper node17.sleap.com:2181

    创建消费者消费topic data1

    kafka-console-consumer --zookeeper node17.sleap.com:2181 --topic data2 --from-beginning

    创建生产者

    kafka-console-producer --broker-list node17.sleap.com:6667 --topic data2

    3.3.3 查看ElasticSearch索引

    curl "node18.sleap.com:9200/_cat/indices?v"

    curl –XGET "node18.sleap.com:9200/kafka-logstash/_search"

    3.3.4 kafka配置参数

    https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

    https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

    https://segmentfault.com/a/1190000016595992

    logstash高低版本配置不同:(具体配置可查看对应版本的官网配置介绍)

    https://www.elastic.co/guide/en/logstash/index.html

    logstash 5

    bootstrap_servers => “node17.sleap.com:6667”

    topics => [“data1”]

    logstash 2.4

    zk_connect => “node17.sleap.com:2181”

    topic_id => “data1”

    控制台输入,控制台输出stdin-stdout.conf

    input {

      stdin {}

    }

    output {

      stdout {

        codec => rubydebug

      }

    }

    kafka输入数据到logstash

    input-kafka.conf

    input {

      kafka {

        bootstrap_servers => "10.110.181.39:6667"

        topics => ["data1"]

        type => "kafka.logstash"

      }

    }

    output {

      if [type] == "kafka.logstash" {

        stdout {

          codec => rubydebug

        }

      }

    }

    logstash输出数据到kafka

    input {

      stdin {}

    }

    output {

      kafka {

        bootstrap_servers => "10.110.181.39:9092"

        topic_id => "data1"

      }

    }

    采集ambari-server.log到elasticsearch

    # test ambari log

    ambari-log.conf

    input {

      file {

        path => "/var/log/ambari-server/ambari-server.log"

        start_position => beginning

        type => "ambari.log"

      }

    }

    filter {

     

    }

    output {

      if [type] == "ambari.log" {

        elasticsearch {

          hosts => ["node17.sleap.com:9200","node18.sleap.com:9200"]

          index => "ambari-log"

        }

      }

    }

    查看logstash插件版本是否与kafka版本兼容。

    ./bin/logstash-plugin list –verbose

    logstash-input-kafka (5.1.7)

    logstash-output-kafka (5.1.6)

    logstash_LEAP3.4.4.0-5.4.1+ldh1.2.0+c0001-b0048.el6.x86_64.rpm

    kafka_LEAP3.4.4.0-0.9.0+ldh1.2.0+c0001-b0061.el6.x86_64.rpm

    elasticsearch_LEAP3.4.4.0-5.4.1+ldh1.2.0+c0001-b0061.NOARCH.rpm

    https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

    kafka版本不兼容:Kafka Connect: versions <= 0.10.0 or >= 0.10.2

    plugins-inputs-kafka配置

    https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-kafka.html

    3.3.5 kafka问题: Error reading field 'topics'

    https://blog.csdn.net/badyting/article/details/56667873

    3.4 hive数据导入ElasticSearch

    https://blog.csdn.net/qianshangding0708/article/details/50388750

    http://www.voidcn.com/article/p-ftrfzdop-bqu.html

    3.5 ElasticSearch数据导入hive

    https://blog.csdn.net/shan1369678/article/details/51331296

    4 本地验证

    centos7 下kafka的安装介绍:

    https://segmentfault.com/a/1190000012990954

    centos7 安装部署ELK 6.2.4:

    http://blog.51cto.com/andyxu/2124697

    Logstash连接kafka指南

    https://wdxtub.com/2016/08/18/logstash-kafka-guide/

    启动zookeeper

    zookeeper-server-start.sh kafka_2.12-1.0.0/config/zookeeper.properties > /dev/null 2>&1 &

    启动kafka

    kafka-server-start.sh kafka_2.12-1.0.0/config/server.properties > /dev/null 2>&1 &

    创建topic

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic data1

    查看topic

    kafka-topics.sh --list --zookeeper localhost:2181

    创建消费者

    kafka-console-consumer.sh  --zookeeper 10.110.181.50:2181 --topic data1 --from-beginning

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

    创建生产者

    kafka-console-producer.sh --broker-list localhost:9092 --topic test

    删除topic

    kafka-topics.sh --zookeeper localhost:2181 --topic data1

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf

    查看elasticsearch的index

    查看input-kafka的内容

    /usr/share/logstash/bin/logstash -f input-kafka.conf 

    5 ELK实例

    kafka接收数据,导入到logstash,过滤出特定ip日志,存储在hive。

    展开全文
  • ETL详细教程+笔记.zip

    2021-01-07 16:17:42
    ETL详细教程+笔记.zip ETL概念 ETL讲解 数据仓库 ETL开发 ...
  • ETL测试教程

    2018-09-05 16:40:00
    在我们了解ETL测试之前,先了解有关商业智能和数据仓库的重要性。 让我们开始吧 - 什么是BI? 商业智能是收集原始数据或业务数据并将其转化为有用和更有意义的信息的过程。 原始数据是一个组织每日事务的记录,...

    在我们了解ETL测试之前,先了解有关商业智能和数据仓库的重要性。
    让我们开始吧 -

    什么是BI?

    商业智能是收集原始数据或业务数据并将其转化为有用和更有意义的信息的过程。
    原始数据是一个组织每日事务的记录,如与客户的互动,财务管理和员工管理等。
    这些数据将用于“报告,分析,数据挖掘,数据质量和解释,预测分析”。

    什么是数据仓库?

    数据仓库是一个数据库,专为查询和分析而设计,而不是事务处理。
    通过集成来自多个异构源的数据构建数据仓库,使公司或组织能够整合来自多个来源的数据,并将分析工作与事务工作分开。
    数据转化为高质量信息,以满足各级用户的所有企业报告要求。

     

     
    ETL测试或数据仓库测试:终极指南

     

    什么是ETL?

    ETL代表Extract-Transform-Load,它是将数据从源系统加载到数据仓库的过程。
    从OLTP(联机事务处理)数据库中提取数据,将其转换为与数据仓库模式匹配并加载到数据仓库数据库中。
    许多数据仓库还包括非OLTP系统的数据,例如文本文件,遗留系统和电子表格。

    看看它是如何工作的

    例如,有零售公司有不同的部门,如销售,营销,物流等。他们每个都是独立处理客户信息,而且存储数据的方式是截然不同的。
    销售部门按客户名称存储,营销部门按客户编号存储。

    现在,如果他们想检查客户的历史,并想知道由于不同的营销活动,他/她购买的不同的产品?这将是非常困难的。

    解决方案是使用数据仓库来使用ETL将来自不同来源的信息存储在统一的结构中。
    ETL可以将不同的数据集转换为统一的结构。后来使用BI工具从这些数据中获得有意义的见解和报告。

    下图给出了ETL过程的ROAD MAP

     

     
    ETL测试或数据仓库测试:终极指南

     

    1. Extract

      • 提取相关数据
    2. Transform

      • 将数据转换为DW(数据仓库)格式

      • 构建键 - 键是唯一标识实体的一个或多个数据属性。
        各种键是主键,备用键,外键,复合键,代理键。
        数据仓库管理这些键,并且不允许再去分配给任何其他实体。

      • 清理数据:提取数据后,将进入下一个阶段,清理和规范数据。
        清理在数据中排除以及识别和修复错误。
        规范意味着解决那些不兼容的数据之间的冲突,以便它们可以在企业数据仓库中使用。
        除此之外,该系统还创建了用于诊断源系统问题并提高数据质量的meta-data(元数据)。

    3. 加载

      • 将数据加载到DW(数据仓库)

      • 构建聚合 - 创建聚合是汇总和存储事实表中可用的数据,以便提高最终用户查询的性能。

    什么是ETL测试?

    完成ETL测试是为了确保经过业务转换后从源加载到目标的数据准确无误。
    它还涉及在源和目标之间使用的各个中间阶段的数据的验证。

    ETL测试流程

    与其他测试过程类似,ETL也经过不同的阶段。
    ETL测试过程的不同阶段如下

     

     
    ETL测试或数据仓库测试:终极指南

     

    ETL测试分五个阶段进行

    1. 识别数据源和需求
    2. 数据采集
    3. 实现业务逻辑和维度建模
    4. 构建和填充分析的多维立方体
    5. 构建报告

     

     
    ETL测试或数据仓库测试:终极指南

     

    ETL测试的类型

    测试类型测试过程
    生产验证测试 “Table balancing”或“production reconciliation”这种类型的ETL测试是在数据移入生产系统时进行的。为了支持您的业务决策,生产系统中的数据必须按正确的顺序排列。 Informatica数据验证选项提供ETL测试自动化和管理功能,以确保生产系统不会受到数据的影响。
    源到目标测试(验证测试) 执行这种类型的测试是为了验证转换的数据值是否是符合预期。
    应用升级 这种类型的ETL测试可以自动生成,节省了大量的测试开发时间。这种类型的测试检查从旧的应用程序或库提取的数据是否与库或新应用程序中的数据完全相同。
    元数据测试 元数据测试包括数据类型检查,数据长度检查和索引/约束检查的测试。
    数据完整性测试 要验证所有预期的数据是否从源中加载到目标中,则完成数据的完整性测试。其中一些测试可以运行去比较和验证源和目标之间的计数,聚合和实际数据的列,简单转换或不转换。
    数据精度测试 这种测试是为了确保数据按预期精确地加载和转换。
    数据转换测试 在许多情况下,测试数据转换不能通过编写一个源SQL查询并将输出与目标进行比较来实现。可能需要针对每一行运行多个SQL查询来验证转换规则。
    数据质量检测 数据质量测试包括语法和引用测试。为了避免在业务流程期间由于日期或订单号等造成的任何错误。
    语法测试:将根据无效字符,字符模式,大小写不正确等报告脏数据。
    引用测试:将根据数据模型检查数据。例如:客户ID数据质量测试包括数字检查,日期检查,精确检查,数据检查,空检查等。
    增量ETL测试 此测试是通过添加新数据来检查旧数据和新数据的数据完整性。增量测试验证插入和更新是否在增量ETL过程中按预期进行处理。
    GUI /导航测试 这个测试是为了检查前端报告的导航或GUI方面。

    如何创建ETL测试用例

    ETL测试是一种可以应用于信息管理行业不同工具和数据库的概念。
    ETL测试的目标是确保通过业务转换后从源到目标的数据准确无误。 它还涉及在源和目标之间各个中间阶段使用的数据的验证。

    在执行ETL测试时,ETL测试仪将始终使用两个文档

    1. ETL映射表: ETL映射表包含源表和目标表的所有信息,包括每个列以及它们在引用表中的引用。
      ETL测试人员需要对SQL查询感到满意,因为ETL测试可能涉及用多种连接写大的查询语句以在ETL的任何阶段验证数据。
      在编写数据验证查询时,ETL映射表提供了重要的帮助。

    2. 源和目标的DB Schema:应该方便地验证映射表中的任何细节。

    ETL测试场景和测试用例

    测试场景测试用例
    映射文档验证 验证映射文档是否提供相符的ETL信息。每次更改日志应保留在映射文档中。
    验证 1. 根据相应的映射文档验证源表和目标表结构。
    2. 源数据类型和目标数据类型应该相同
    3. 源和目标数据类型的长度应相等
    4. 验证是否指定了数据字段类型和格式
    5. 源数据类型长度不​​应小于目标数据类型长度
    6. 根据映射文档验证表中列的名称。
    约束验证 确保按预期为特定表定义约束
    数据一致性问题 1. 特定属性的数据类型和长度可能在文件或表中不同,但语义定义相同。
    2. 滥用完整性约束
    完整性问题 1. 确保所有预期的数据都加载到目标表中。
    2. 比较源和目标之间的记录计数。
    3. 检查任何被拒绝的记录
    4. 检查数据不应在目标表的列中被截断
    5. 检查边界值分析
    6. 比较加载到仓库的数据和源数据之间的关键字段的唯一值
    正确性问题 1. 数据拼写错误或记录错误
    2. 空,非唯一或超出范围的数据
    转换 转换
    数据质量 1. 数字检查:需要数字检查和验证
    2. 日期检查:他们必须遵循日期格式,它应该在所有记录中相同
    3. 精度检查
    4. 数据检查
    5. 空检查
    空验证 为指定“Not Null”的特定列验证空值。
    重复检查 1. 需要验证唯一键,主键和任何其他列应该是唯一的,如果业务需求具有任何重复的行
    检查从源中的多个列提取并组合成一列的任何列中是否存在任何重复值
    根据客户端要求,需要确保仅在目标中的多个列的组合不会重复
    日期验证 日期值用在ETL开发中使用许多领域
    1. 获取行创建日期
    2. 根据ETL开发视角识别活动记录
    3. 根据业务需求视角确定活动记录
    4. 有时根据日期值生成更新和插入。
    数据完整性验证 1. 验证源表和目标表中的完整数据集减去最佳解决方案中的查询
    2. 我们需要源减去目标和目标减去源
    3. 如果减法查询返回任何值,那么这些值应被视为不匹配行
    4.使用intersect语句在源和目标之间相交返回的计数,应与源表和目标表的单独的计数相匹配
    5. 如果行的查询返回的行和计数相交小于源计数或目标表,那么我们可以考虑重复的行存在。
    数据清理 在加载到暂存区域之前,应删除不必要的列。

    ETL错误的类型

    错误类型描述
    用户界面错误bug * 与应用程序的GUI相关
    * 字体样式,字体大小,颜色,对齐方式,拼写错误,导航等
    边界值分析(BVA)相关的bug 最小值和最大值
    等价类分类(ECP)相关的bug 有效和无效的类型
    输入/输出bug * 有效值不被接受
    * 无效值被接受
    计算bug * 精度错误
    * 最终输出错误
    加载条件错误 * 不允许多个用户
    * 不允许客户预期的加载
    Race Condition错误 * 系统崩溃和挂起
    * 系统无法运行客户端平台
    版本控制错误 * 没有标志匹配
    没有版本信息可用
    这通常发生在回归测试中
    H / W错误 设备没有响应应用程序
    帮助源错误 帮助文件中的错误

     

     
    ETL测试或数据仓库测试:终极指南

     

    错误类型描述
    用户界面错误bug * 与应用程序的GUI相关
    * 字体样式,字体大小,颜色,对齐方式,拼写错误,导航等
    边界值分析(BVA)相关的bug 最小值和最大值
    等价类分类(ECP)相关的bug 有效和无效的类型
    输入/输出bug * 有效值不被接受
    * 无效值被接受
    计算bug * 精度错误
    * 最终输出错误
    加载条件错误 * 不允许多个用户
    * 不允许客户预期的加载
    Race Condition错误 * 系统崩溃和挂起
    * 系统无法运行客户端平台
    版本控制错误 * 没有标志匹配
    没有版本信息可用
    这通常发生在回归测试中
    H / W错误 设备没有响应应用程序
    帮助源错误 帮助文件中的错误

    数据库测试与ETL测试的区别

    ETL测试数据库测试
    验证数据是否按预期移动 主要目标是检查数据是否遵循数据模型中定义的规则/标准
    验证源和目标中的计数是否匹配
    验证数据转换是否符合预期
    验证没有孤立记录和维护外主键关系
    验证ETL期间外部主键关系是否保留 验证没有冗余表,数据库被最佳地标准化
    验证是否重复的加载数据中 验证是否在需要的列中丢失数据

    ETL测试的职责

    ETL测试的主要职责分为三类

    • 阶段表/ SFS或MFS
    • 业务转型逻辑应用
    • 应用转换后,从阶段文件或表格中加载目标表。

    ETL测试的一些职责是

    • 测试ETL软件
    • 测试ETL数据仓库的组件
    • 执行后端数据驱动测试
    • 创建,设计和执行测试用例,测试计划和测试工具
    • 确定问题并为潜在问题提供解决方案
    • 审核需求和设计规范
    • 数据传输和测试平面文件
    • 编写SQL查询用于各种场景,如计数测试

    ETL性能测试和调优

    ETL 性能测试是确保ETL系统能够处理多个用户和事务负载的确认测试。
    性能调优的目标是通过消除性能瓶颈来优化会话性能。
    要调整或提高会话的性能,您必须找出性能瓶颈并将其消除。
    在源和目标数据库,映射,会话和系统中可以找到性能瓶颈。
    用于性能测试的最佳工具之一是Informatica。

    ETL测试自动化

    ETL测试的一般方法是使用SQL脚本或做数据的“ eyeballing”。这些ETL测试方法耗时,容易出错,很少提供完整的测试覆盖。
    加快,提高覆盖率,降低成本,提高
    生产和开发环境中ETL测试的缺陷发现率,自动化是需要的时间。一个这样的工具是Informatica。

    ETL测试的最佳实践

    1. 确保数据正确转换
    2. 没有任何数据丢失和截断,数据应该被加载到数据仓库中
    3. 确保ETL应用程序适当地拒绝并使用默认值替换并报告无效数据
    4. 需要确保在规定的预期时间范围内加载数据仓库的数据,以确认可扩展性和性能
    5. 所有方法都应该有适当的单元测试,而不管可见性
    6. 要测量其有效性,所有单元测试都应使用适当的覆盖率测试技术
    7. 为每个测试用例争取一个断言
    8. 创建针对异常的单元测试


    作者:杰克家的猫
    链接:https://www.jianshu.com/p/f56c808cd2c3
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    转载于:https://www.cnblogs.com/xjx767361314/p/9593026.html

    展开全文
  • ETL高级教程

    2013-10-23 09:36:37
    ETL高级教程 1,Kettle跨平台使用。 例如:在AIX下(AIX是IBM商用UNIX操作系统,此处在LINUX/UNIX同样适用),运行Kettle的相关步骤如下: 1)进入到Kettle部署的路径 2)执行 chmod *.sh,将所有shell文件...
  • 教程为记录本人的学习关键部分,故如果看不懂可以联系博主,只讲关键ETL操作部分 1.全量ETL过程 示例: (1)项目创建完成后,创建一个新的SSIS包 我这里就叫ODSCustomer2.dtsx好了 (2)创建数据连接 ...
  • CloverETL使用教程

    千次阅读 2016-08-03 16:18:59
    CloverETL介绍CloverETL是一个基于Java的开源的ETL框架,同时还包含了一个 ETL设计器——CloverETL Designer。核心的算法就是一个数据流网络。 CloverETL支持大多数主流数据库系统,并且它是一个跨平台产品,支持...
  • 教程名称:数据抽取转换(ETL教程大全课程目录:【】2.0数据仓库与ETL技术【】BI项目中ETL设计与思考【】DataStage(ETL)技术总结【】ETL Automation 操作手册【】ETL Automation 软件【】ETL BI 经营分析 数据抽取...
  • kettle之ETL一步到位 阿里云大学,腾讯云特骋讲师,曾任光华电子大数据...
  • kettle之ETL一步到位视频教程

    千人学习 2020-04-29 17:02:03
    kettle从入门到精通,让你真正掌所掌商业智能的能力。了解真正的ETL过程 此系列课程共20集,有真实项目,本课为完整版。
  • ETL工具KETTLE培训教程

    2018-07-10 14:46:21
    ETL工具KETTLE详细讲解,深入浅出了解ETLkettle的使用
  • ETL高级教程kettle

    2012-07-05 09:39:49
     在Job下的start模块,有一个定时功能,可以每日,每周等方式进行定时,对于周期性的ETL,很有帮助。  a.使用资源库(repository)登录时,默认的用户名和密码是admin/admin。  b.当job是存放在资源库(一般...
  • OWB的ETL工具教程

    千次阅读 2017-09-28 22:30:01
    此外,ETL的部署不局限于你当前正在工作的服务器,OWB让你能够在一台服务器上设计ETL过程,然后将设计的步骤部署到另一台服务器上,如果你想要,还可以部署到更多的服务器上。 这个操作背后的整个程序是怎样一个...
  • ETL工具Kettle使用教程

    2021-01-19 10:55:40
    Kettle使用教程之数据同步 Kettle从文本中导入大量到数据库 Kettle使用教程之Job使用 Kettle使用教程之安装与资源库的创建
  • ETL工具-Kettle Spoon教程

    万次阅读 多人点赞 2018-09-21 14:56:03
     ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程),对于企业或行业应用来说,我们经常会遇到各种数据的处理,转换,迁移,了解并掌握一种etl工具的使用,必不可少,支持图形化的GUI设计界面,...
  • 教程名称:SQL Server 2005 ETL专家系列视频教程课程目录:【】1.SQL Server DTS的前世今生【】10.ETL系列答疑【】2.SQL Server 2005 Integration Service的基本任务【】3.SQL Server 2005 Integration Service的...
  • kettle 入门教程 ETL 基础 介绍

    千次阅读 2020-07-21 18:26:54
    kettle是一个ETL(Extract, Transform and Load)数据抽取、转换、载入工具,ETL工具在数据仓库项目使用非常频繁,kettle也可以应用在以下一些场景: 在不同应用或数据库之间整合数据 把数据库中的数据导出...
  • ETL高级教程学习笔记

    2007-12-29 14:38:00
    ODS的定义似乎业内没有一个统一的标准,教程里理解的是一个业务系统数据库的快照.教程推荐业务系统的数据先导到这个ODS层数据库中,虽然是快照,不过也可以适当的加些转换或者标识,比如加派生列标识数据是从哪里来的,...
  • Kettle的建立数据库连接、...Kettle简介:Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个...
  • Kettle ETL工具设置教程

    2011-12-23 11:31:58
     在Job下的start模块,有一个定时功能,可以每日,每周等方式进行定时,对于周期性的ETL,很有帮助。  a.使用资源库(repository)登录时,默认的用户名和密码是admin/admin。  b.当job是存放在资源库...
  • 使用PostgresOperator执行SQL完成ETL任务 通过搜集信息,了解到PostgresOperator能执行SQL,并且还支持传参数.能解决大多数ETL任务中的传参问题.传参使用的是Python的Jinjia模块. 创建DAG 首先创建一个test_param_sql...
  • Kettle的建立数据库连接、...Kettle简介:Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个...

空空如也

空空如也

1 2 3 4 5 ... 17
收藏数 326
精华内容 130
关键字:

etl教程