精华内容
下载资源
问答
  • 数据质量监控工具-Apache Griffin

    万次阅读 多人点赞 2019-01-12 15:40:58
    文章目录一、概述二、Apache Griffin2.1 特性2.1.1 数据质量指标说明2.2 优势2.3 工作流程2.4 系统架构2.5 数据验证逻辑2.6 Demo2.7 后台提交监控任务 一、概述 随着业务发展和数据量的增加,大数据应用开发已成为...

    一、概述

    随着业务发展和数据量的增加,大数据应用开发已成为部门应用开发常用的开发方式,由于部门业务特点的关系,spark和hive应用开发在部门内部较为常见。当处理的数据量达到一定量级和系统的复杂度上升时,数据的唯一性、完整性、一致性等等校验就开始受到关注,而通常做法是根据业务特点,额外开发job如报表或者检查任务,这样会比较费时费力。

    目前遇到的表大部分在几亿到几十亿的数据量之间,并且报表数量在不断增加,在这种情况下,一个可配置、可视化、可监控的数据质量工具就显得尤为重要了。Griffin 数据质量监控工具正是可以解决前面描述的数据质量问题的开源解决方案。

    二、Apache Griffin

    Griffin起源于eBay中国,并于2016年12月进入Apache孵化器,Apache软件基金会2018年12月12日正式宣布Apache Griffin毕业成为Apache顶级项目。

    Griffin是属于模型驱动的方案,基于目标数据集合或者源数据集(基准数据),用户可以选择不同的数据质量维度来执行目标数据质量的验证。支持两种类型的数据源:batch数据和streaming数据。对于batch数据,我们可以通过数据连接器从Hadoop平台收集数据。对于streaming数据,我们可以连接到诸如Kafka之类的消息系统来做近似实时数据分析。在拿到数据之后,模型引擎将在spark集群中计算数据质量。

    2.1 特性

    • 度量:精确度、完整性、及时性、唯一性、有效性、一致性。
    • 异常监测:利用预先设定的规则,检测出不符合预期的数据,提供不符合规则数据的下载。
    • 异常告警:通过邮件或门户报告数据质量问题。
    • 可视化监测:利用控制面板来展现数据质量的状态。
    • 实时性:可以实时进行数据质量检测,能够及时发现问题。
    • 可扩展性:可用于多个数据系统仓库的数据校验。
    • 可伸缩性:工作在大数据量的环境中,目前运行的数据量约1.2PB(eBay环境)。
    • 自助服务:Griffin提供了一个简洁易用的用户界面,可以管理数据资产和数据质量规则;同时用户可以通过控制面板查看数据质量结果和自定义显示内容。

    2.1.1 数据质量指标说明

    • 精确度:度量数据是否与指定的目标值匹配,如金额的校验,校验成功的记录与总记录数的比值。
    • 完整性:度量数据是否缺失,包括记录数缺失、字段缺失,属性缺失。
    • 及时性:度量数据达到指定目标的时效性。
    • 唯一性:度量数据记录是否重复,属性是否重复;常见度量为hive表主键值是否重复。
    • 有效性:度量数据是否符合约定的类型、格式和数据范围等规则。
    • 一致性:度量数据是否符合业务逻辑,针对记录间的逻辑的校验,如:pv一定是大于uv的,订单金额加上各种优惠之后的价格一定是大于等于0的。

    2.2 优势

    • 可配置、可自定义的数据质量验证。
    • 基于spark的数据分析,可以快速计算数据校验结果。
    • 历史数据质量趋势可视化。

    2.3 工作流程

    1. 注册数据,把想要检测数据质量的数据源注册到griffin。
    2. 配置度量模型,可以从数据质量维度来定义模型,如:精确度、完整性、及时性、唯一性等。
    3. 配置定时任务提交spark集群,定时检查数据。
    4. 在门户界面上查看指标,分析数据质量校验结果。

    2.4 系统架构

    Griffin 系统主要分为:数据收集处理层(Data Collection&Processing Layer)、后端服务层(Backend Service Layer)和用户界面(User Interface),如图:
    Griffin技术组件

    系统数据处理分层结构图:
    Griffin架构图

    系统处理流程图:
    Griffin工作流

    2.5 数据验证逻辑

    2.5.1 精确度验证(accurancy),从hive metadata中加载数据源,校验精确度

    • 选择source表及列
    • 选择target表及列
    • 选择字段比较规则(大于、小于或者相等)
    • 通过一个公式计算出结果:
      精确度计算公式
    • 最后在控制面板查看精确度趋势
      精确度趋势图

    2.5.2 数据统计分析(profiling)

    • 选择需要进行分析的数据源,配置字段等信息。
    • 简单的数据统计:用来统计表的特定列里面值为空、唯一或是重复的数量。例如统计字段值空值记录数超过指定一点阈值,则可能存在数据丢失的情况。
    • 汇总统计:用来统计最大值、最小值、平均数、中值等。例如统计年龄列的最大值最小值判断是否存在数据异常。
    • 高级统计:用正则表达式来对数据的频率和模式进行分析。例如邮箱字段的格式验证,指定规则的数据验证。
    • 数据分析机制主要是基于Spark的MLlib提供的列汇总统计功能,它对所有列的类型统计只计算一次。
    • 控制面板分析数据
      数据统计图

    2.5.3 异常检测

    • 异常检测的目标是从看似正常的数据中发现异常情况,是一个检测数据质量问题的重要工具。通过使用BollingerBands和MAD算法来实现异常检测功能,可以发现数据集中那些远远不符合预期的数据。

    • 以MAD作为例子,一个数据集的MAD值反映的是每个数据点与均值之间的距离。可以通过以下步骤来得到MAD值:

      • 算出平均值
      • 算出每一个数据点与均值的差
      • 对差值取绝对值
      • 算出这些差值取绝对值之后的平均值

      公式如下:
      MAD公式

      通过异常检测可以发现数据值的波动大小是否符合预期,数据的预期值则是在对历史趋势的分析中得来的,用户可以根据检测到的异常来调整算法中必要的参数,让异常检测更贴近需求。

    2.6 Demo

    以检测供应商账单明细表的同步精确度为例,配置数据检测,如图:

    • 选择数据源

    • 选择账单明细源表字段
      选择源表字段

    • 选择账单明细目标表字段
      选择目标表字段

    • 设置源表和目标表的校验字段映射关系
      设置映射关系

    • 选择数据分区、条件和是否输出结果文件。(无分区表可以跳过)
      选择分区条件

    • 设置验证项目名称和描述,提交后就可以在列表看到度量的信息了
      设置名称和描述

    度量列表

    创建了数据模型度量后,需要相应的spark定时任务来执行分析,接下来就是创建spark job和调度信息了

    • 在job菜单下,选择Create Job
      创建job

    创建job界面中需要选择源表和目标表数据范围,如上图所示是选择t-1到当前的数据分区,即昨天的数据分区。设置定时表达式,提交任务后即可在job列表中查看:
    job任务执行

    • 到这里,数据验证度量和分析任务都已配置完成,后面还可根据你的指标设置邮件告警等监控信息,接下来就可以在控制面板上监控你的数据质量了,如图:

    2.7 后台提交监控任务

    除了用户在控制面板创建数据验证任务,也可以通过后台生成指标信息,提交spark任务进行数据检测,提供了良好的数据接入和配置的扩展性,api配置数据检测可查看官网快速指引

    实时数据检测目前未有界面配置,可以通过api的方式提交实时数据监控,详细内容可以参考:Streaming Use Cases

    written by 赖泽坤@vipshop.com

    参考文档

    展开全文
  • 数据质量监控工具-Apache Griffin本地安装和调试

    千次阅读 热门讨论 2019-06-16 18:26:19
    数据质量监控工具-Apache Griffin本地安装和调试 1 、Apache Griffin简介 Griffin起源于eBay中国,并于2016年12月进入Apache孵化器,Apache软件基金会2018年12月12日正式宣布Apache Griffin毕业成为Apache顶级项目。...

    数据质量监控工具-Apache Griffin本地安装和调试


    1 、Apache Griffin简介
    Griffin起源于eBay中国,并于2016年12月进入Apache孵化器,Apache软件基金会2018年12月12日正式宣布Apache Griffin毕业成为Apache顶级项目。
    Griffin是属于模型驱动的方案,基于目标数据集合或者源数据集(基准数据),用户可以选择不同的数据质量维度来执行目标数据质量的验证。支持两种类型的数据源:batch数据和streaming数据。对于batch数据,我们可以通过数据连接器从Hadoop平台收集数据。对于streaming数据,我们可以连接到诸如Kafka之类的消息系统来做近似实时数据分析。在拿到数据之后,模型引擎将在spark集群中计算数据质量。
    2、代码下载
    https://github.com/apache/griffin 选择master分支,下载zip包,或者利用git,直接克隆代码
    3、本地安装
    把代码导入idea,等待所有依赖的jar包的下载完成,整个过程可能需要10分钟左右
    在这里插入图片描述
    4.修改配置文件
    4.1 修改service模块下的pom.xml文件,把支持mysql的profIle的注释放开
    原来的pom.xml:
    在这里插入图片描述
    修改之后的pom.xml
    在这里插入图片描述
    4.2修改service模块下的application.properties
    在这里插入图片描述
    修改前:
    在这里插入图片描述
    修改后:
    在这里插入图片描述
    4.3 修改ui模块下的angular下的environment.ts
    修改前
    在这里插入图片描述
    修改后:
    在这里插入图片描述
    5、执行初始化脚本
    初始化脚本在service模块下的resource目录下,因为使用的是mysql数据库,所以执行的是mysql的初始化脚本:
    在这里插入图片描述
    6、启动项目
    6.1、启动serve模块
    直接debug运行service模块下的GriffinWebApplication这个类即可:
    在这里插入图片描述
    因为这里我没有改hive的元数据信息配置,这个项目启动的时候就会去找连接hive的元数据,所以项目启动的时候会报错,但这并不会影响项目的正常启动
    在这里插入图片描述
    在这里插入图片描述
    6.2启动前端
    先把ui模块进行编译,编译之后去到node_modules目录下中的./bin目录 执行如下命令:
    ng serve --port 8081
    这里的端口不能是8080,因为8080已经被service模块占用了
    在这里插入图片描述
    6.3在浏览器中打开
    http://localhost:8081

    出现这个页面说明本地可以正常启动了,后台默认是没有用户名和密码的,直接点击登录就可以了
    点击登录之后看到的页面;
    在这里插入图片描述
    7、断点调试
    7.1 在后台的service模块下的JobController类上打上断点
    在这里插入图片描述
    在浏览器上切换到job的页面,会发出查询所有Jobs的请求:
    在这里插入图片描述
    发现后台的断点已经进到了:
    在这里插入图片描述
    如果本地安装启动过程,遇到问题无法解决,可以添加微信,备注:apache griffin(否则有可能不通过)
    在这里插入图片描述

    展开全文
  • Apache Griffin 开源的数据质量分析工具

    千次阅读 热门讨论 2019-09-26 21:50:47
    文章目录一 简介二 架构Apache Griffin具备的能力Apache Griffin处理数据的方式Apache Griffin架构图Apache Griffin的工作流三 环境部署1,部署jdk版本2,部署mysql版本3,部署hadoop版本4,部署hive版本5,部署...


    一 简介

    apache Griffin是一个开源的大数据数据质量解决方案,它支持批处理流模式两种数据质量检测方式,可以从不同维度(比如离线任务执行完毕后检查源端和目标端的数据数量是否一致、源表的数据空值数量等)度量数据资产,从而提升数据的准确度、可信度。
    官方网站的介绍:Big Data Quality Solution For Batch and Streaming 官方介绍:http://griffin.apache.org/#about_page

    二 架构

    Apache Griffin具备的能力

    引入官方文档
    在这里插入图片描述
    意思就是Apache Griffn具备提供明确的数据质量的定义域,这个通常覆盖了大多数数据质量的问题,同时能够支持用户自定义数据质量的标准,通过扩展DSL(Apache Griffn定义),用户能够自定义扩展自己的数据定义功能

    Apache Griffin处理数据的方式

    官方文档
    在这里插入图片描述
    1,对于数据质量的定义,用户可以通过Apache Griffin UI功能,对于他们关注的数据进行质量定义,例如准确性,完整性,及时性等
    2,数据指标计算,Apache Griffin基于数据质量的维度定义,从流模式(kafka模型),批处理(定时功能)的方式抽取元数据进行计算,
    3,数据质量结果落盘,数据质量报告作为度量将被逐出指定的目标。
    4,apache Griffin提供了易于在ApacheGriffin平台上提供任何新的数据质量要求并编写综合逻辑以定义其数据质量的插件扩展。

    Apache Griffin架构图

    在这里插入图片描述
    各个部分的主要职能:

    • Define:主要负责定义数据质量统计的维度,比如数据质量统计的时间跨度、统计的目标(源端和目标端的数据数量是否一致,数据源里某一字段的非空的数量、不重复值的数量、最大值、最小值、top5的值数量等
    • Measure:主要负责执行统计任务,生成统计结果
    • Analyze:主要负责保存与展示统计结果
      Apache Griffin的元数据来源:kafka,hadoop,RDBMS(关系型数据库),
      Apache Griffin的运行指标模型:基于Spark

    Apache Griffin的工作流

    1,注册数据,把想要检测数据质量的数据源注册到griffin。
    2,配置度量模型,可以从数据质量维度来定义模型,如:精确度、完整性、及时性、唯一性等。
    3,配置定时任务提交spark集群,定时检查数据。
    4,在门户界面上查看指标,分析数据质量校验结果。
    在这里插入图片描述

    三 环境部署

    1,部署jdk版本

    步骤略(jdk版本要求1.8以上)

    2,部署mysql版本

    步骤略

    3,部署hadoop版本

    (1)下载hadoop版本 :https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/core/ 2.7.7版本
    (2)上传到/opt/hadoop,没有hadoop目录可以自己创建 mkdir /opt/hadoop(也可以自行创建其他目录)

    tar  hadoop-2.7.7.tar.gz
    vi /etc/profile
    

    追加Hadoop目录

    export HADOOP_HOME=/opt/hadoop/hadoop-2.7.7 
    export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    source /etc/profile
    

    追加hadoop的jdk环境变量‘

    cd $HADOOP_HOME/etc/hadoop
    vi hadoop-env.sh
    

    在文件中追加JDK环境变量

    export JAVA_HOME=/usr/local/jdk(实际自己的jdk部署目录)
    

    编辑core-site.xml文件

    cd  $HADOOP_HOME/etc/hadoop/conf
    vi core-site.xml
    <configuration> 
    <property>   
    		<name>fs.default.name</name>    
    		 <value>hdfs://localhost:9000</value> 
    </property> 
    </configuration>
    

    编辑hdfs-site.xml文件

    <configuration>  
        <property>  
            <name>dfs.name.dir</name>  
            <value>/usr/hadoop/hdfs/name</value>  
            <description>namenode上存储hdfs名字空间元数据 </description>   
        </property>  
      
        <property>  
            <name>dfs.data.dir</name>  
            <value>/usr/hadoop/hdfs/data</value>  
            <description>datanode上数据块的物理存储位置</description>  
        </property>  
      
      
        <!-- 设置hdfs副本数量 -->  
        <property>  
            <name>dfs.replication</name>  
            <value>1</value>  
        </property>  
    </configuration>  
    

    配置mapred-site.xml,刚开始安装的时候文件名是mapred-site.xml.template,重命名为mapred-site.xml

    <configuration>  
        <!-- 通知框架MR使用YARN -->  
        <property>  
            <name>mapreduce.framework.name</name>  
            <value>yarn</value>  
        </property>  
    </configuration>
    

    配置yarn-site.xml

    <configuration>  
        <!-- reducer取数据的方式是mapreduce_shuffle -->  
        <property>  
            <name>yarn.nodemanager.aux-services</name>  
            <value>mapreduce_shuffle</value>  
        </property>  
    </configuration>  
    

    4,部署hive版本

    参考 :https://www.cnblogs.com/caoxb/p/11333741.html

    5,部署Spark版本

    参考: https://blog.csdn.net/k393393/article/details/92440892

    6,部署Livy版本

    参考:添加链接描述

    7,部署Elasticsearch5版本

    参考:https://blog.csdn.net/fiery_heart/article/details/85265585

    8,部署Zookeeper

    基于kafka的时候需要

    四 使用实例

    基于Apache Griffin Hive数据库源数据计算

    http://griffin.apache.org/docs/quickstart.html
    本地化举例演示:
    (1)访问Apache Griffin可视化界面数据
    在这里插入图片描述
    (2)设置指标模型界面
    在这里插入图片描述
    (3)配置源数据和目标数据,以及对应的指标模型结果数据
    在这里插入图片描述
    (4)按照步骤,配置引擎结果
    在这里插入图片描述
    (5),配置任务的执行Job
    在这里插入图片描述
    (6),点击保存
    设置Job任务:
    (1)job任务配置页面设置
    在这里插入图片描述

    基于Apache Griffin Kafka源数据计算

    http://griffin.apache.org/docs/usecases.html (待分析,这个我们不会使用流数据源处理)

    五 源码分析

    https://github.com/apache/griffin 基于griffin- 0.4.0-rc0版本
    个人github:https://github.com/zcswl7961/apache-griffin-expand
    源码模块:
    在这里插入图片描述
    service:spring boot代码,做web配置 和监控界面服务端数据
    measure: scala 代码,Spark定时任务代码
    ui:前端界面

    数据依赖配置模块:
    application.properties
    在这里插入图片描述
    env:流和批处理
    在这里插入图片描述

    1,任务调度源码

    (1)首先是由前端进行作业任务保存之后,调用JobController的 POST /jobs方法,判断对应的是为批处理还是流处理作业任务,如果为批处理,创建BatchJob数据,然后保存本地的quartz的job
    在这里插入图片描述
    同时执行jobService.addJob(triggerKey, batchJob, BATCH);方法,创建定时任务,执行SparkSubmitJob job作业
    在这里插入图片描述
    initParam(jd);初始化相关参数信息,包括从JobDetail中获取measure,jobInstance,获取livy.url的配置信息,
    setLivyConf(); 设置livy任务提交的相关参数赋值给livyConfMap实例,主要是对于sparkProperties.json文件的解析,同时追加了一个raw参数
    在这里插入图片描述
    saveJobInstance(jd);通过livy提交spark任务,同时将当前的任务执行历史存入到本地quartz库中,进入到saveJobInstance方法中,首先执行post2Livy方法 ,首先设置了livy任务提交的HttpEntity
    在这里插入图片描述
    最终的livy任务提交中,调用的接口是 url:http://192.168.239.171:8998/batches ,body信息为:

    {
        "file":"hdfs://localhost:8020/griffin/griffin-measure.jar",
        "className":"org.apache.griffin.measure.Application",
        "name":"griffin",
        "queue":"default",
        "numExecutors":2,
        "executorCores":1,
        "driverMemory":"1g",
        "executorMemory":"1g",
        "conf":{
            "spark.yarn.dist.files":"hdfs://localhost:8020/home/spark_conf/hive-site.xml"
        },
        "files":[
    
        ],
        "args":[
            "{
      "spark" : {
        "log.level" : "WARN"
      },
      "sinks" : [ {
        "type" : "CONSOLE",
        "config" : {
          "max.log.lines" : 10
        }
      }, {
        "type" : "HDFS",
        "config" : {
          "path" : "hdfs://localhost:8020/griffin/persist",
          "max.persist.lines" : 10000,
          "max.lines.per.file" : 10000
        }
      }, {
        "type" : "ELASTICSEARCH",
        "config" : {
          "method" : "post",
          "api" : "http://localhost:9200/griffin/accuracy",
          "connection.timeout" : "1m",
          "retry" : 10
        }
      } ],
      "griffin.checkpoint" : [ ]
    }",
            "{
      "measure.type" : "griffin",
      "id" : 3355,
      "name" : "schedule-job-zcg",
      "owner" : "test",
      "description" : "test",
      "deleted" : false,
      "timestamp" : 1569477360000,
      "dq.type" : "ACCURACY",
      "sinks" : [ "ELASTICSEARCH", "HDFS" ],
      "process.type" : "BATCH",
      "data.sources" : [ {
        "id" : 3358,
        "name" : "source",
        "connectors" : [ {
          "id" : 3359,
          "name" : "source1569548839003",
          "type" : "HIVE",
          "version" : "1.2",
          "predicates" : [ ],
          "data.unit" : "1day",
          "data.time.zone" : "",
          "config" : {
            "database" : "griffin_demo",
            "table.name" : "demo_src",
            "where" : "dt='20190927' AND hour = '09'"
          }
        } ],
        "baseline" : false
      }, {
        "id" : 3360,
        "name" : "target",
        "connectors" : [ {
          "id" : 3361,
          "name" : "target1569548846717",
          "type" : "HIVE",
          "version" : "1.2",
          "predicates" : [ ],
          "data.unit" : "1day",
          "data.time.zone" : "",
          "config" : {
            "database" : "griffin_demo",
            "table.name" : "demo_tgt",
            "where" : "dt='20190927' AND hour = '09'"
          }
        } ],
        "baseline" : false
      } ],
      "evaluate.rule" : {
        "id" : 3356,
        "rules" : [ {
          "id" : 3357,
          "rule" : "source.id=target.id",
          "dsl.type" : "griffin-dsl",
          "dq.type" : "ACCURACY",
          "out.dataframe.name" : "accuracy"
        } ]
      },
      "measure.type" : "griffin"
    }",
            "raw,raw"
        ]
    }
    

    2,livy任务提交spark

    livy接收到service提交的任务之后,提交到spark,spark接受的到任务之后,进行执行,首先是获取hadoop中配置的fileName:hdfs://localhost:8020/griffin/griffin-measure.jar,通过获取对应的className进行执行任务调度
    在这里插入图片描述

    3,measure引擎计算

           首先measuer计算是依赖于measuer.jar ,同时spark通过访问hadoop中的上传的measuer.jar进行执行,这个配置是在griffin源码中的sparkProperties.json的配置信息中。
    在这里插入图片描述
    在去讲解源码之前,首先大致介绍一下Spark和Hadoop

    3.1 Hadoop

    首先,Hadoop是为了解决大数据存储大数据分析的一套开源的分布式基础架构,
    Hadoop有两大核心:HDFS和MapReducer

    • HDFS(Hadoop Distributed File
      System)是可扩展、容错、高性能的分布式文件系统,异步复制,一次写入多次读取,主要负责存储。
    • MapReduce 为分布式计算框架,包含map(映射)和 reduce(归约)过程,负责在 HDFS 上进行计算。

    HDFS 就像一个传统的分级文件系统,可以进行创建、删除、移动或重命名文件或文件夹等操作,与 Linux 文件系统类似。
    基础的文件操作命令:
    在这里插入图片描述
    MapReduce:MapReduce 是 Google 提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(归纳)”以及它们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。
    当前的软件实现是指定一个 Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的 Reduce(归纳)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
    在这里插入图片描述
    mapReduce的执行示例:
    MapReduce的执行示例

    3.2 Spark

    Spark是用于大规模数据处理的统一分析引擎。
    Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。
    Spark的基本运行流程:

    • (1)构建Spark
      Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
    • (2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
    • (3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task
      Scheduler。Executor向SparkContext申请Task
    • (4)Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
    • (5)Task在Executor上运行,运行完毕释放所有资源。

    在这里插入图片描述

    3.2 measuer源码

    进入到meauser模块中,执行Application.scala类,首先是获取启动类传递的两个参数args
    val envParamFile = args(0)
    val dqParamFile = args(1)
    envParamFile:表示对应环境配置信息,包括对应的spark的日志级别,数据源的输出目的地,

    {
    	//对应的spark的日志级别
        "spark":{
            "log.level":"WARN"
        },
        //数据源的输出目的地
        "sinks":[
            {
                "type":"CONSOLE",
                "config":{
                    "max.log.lines":10
                }
            },
            {
                "type":"HDFS",
                "config":{
                    "path":"hdfs://localhost:8020/griffin/persist",
                    "max.persist.lines":10000,
                    "max.lines.per.file":10000
                }
            },
            {
                "type":"ELASTICSEARCH",
                "config":{
                    "method":"post",
                    "api":"http://localhost:9200/griffin/accuracy",
                    "connection.timeout":"1m",
                    "retry":10
                }
            }
        ],
        "griffin.checkpoint":[
    
        ]
    }
    

    dbParamFile:表示对应的执行任务的数据配置,包括对应的数据源的配置,计算规则信息

    {
        "measure.type":"griffin",
        "id":3355,
        "name":"schedule-job-zcg",
        "owner":"test",
        "description":"test",
        "deleted":false,
        "timestamp":1569492180000,
        "dq.type":"ACCURACY",
        "sinks":[
            "ELASTICSEARCH",
            "HDFS"
        ],
        "process.type":"BATCH",
        "data.sources":[
            {
                "id":3358,
                "name":"source",
                "connectors":[
                    {
                        "id":3359,
                        "name":"source1569548839003",
                        "type":"HIVE",
                        "version":"1.2",
                        "predicates":[
    
                        ],
                        "data.unit":"1day",
                        "data.time.zone":"",
                        "config":{
                            "database":"griffin_demo",
                            "table.name":"demo_src",
                            "where":"dt='20190927' AND hour = '09'"
                        }
                    }
                ],
                "baseline":false
            },
            {
                "id":3360,
                "name":"target",
                "connectors":[
                    {
                        "id":3361,
                        "name":"target1569548846717",
                        "type":"HIVE",
                        "version":"1.2",
                        "predicates":[
    
                        ],
                        "data.unit":"1day",
                        "data.time.zone":"",
                        "config":{
                            "database":"griffin_demo",
                            "table.name":"demo_tgt",
                            "where":"dt='20190927' AND hour = '09'"
                        }
                    }
                ],
                "baseline":false
            }
        ],
        "evaluate.rule":{
            "id":3356,
            "rules":[
                {
                    "id":3357,
                    "rule":"source.id=target.id",
                    "dsl.type":"griffin-dsl",
                    "dq.type":"ACCURACY",
                    "out.dataframe.name":"accuracy"
                }
            ]
        }
    }
    

    Application.scala核心代码:

    object Application extends Loggable {
    
      def main(args: Array[String]): Unit = {
        // info(args.toString)
        val args = new Array[String](2)
        // 测试代码
        args(0) = "{\n  \"spark\":{\n    \"log.level\":\"WARN\",\n    \"config\":{\n      \"spark" +
          ".master\":\"local[*]\"\n    }\n  },\n  \"sinks\":[\n    {\n      \"type\":\"CONSOLE\",\n  " +
          "    \"config\":{\n        \"max.log.lines\":10\n      }\n    },\n    {\n      " +
          "\"type\":\"HDFS\",\n      \"config\":{\n        " +
          "\"path\":\"hdfs://localhost:8020/griffin/batch/persist\",\n        \"max.persist" +
          ".lines\":10000,\n        \"max.lines.per.file\":10000\n      }\n    },\n    {\n      " +
          "\"type\":\"ELASTICSEARCH\",\n      \"config\":{\n        \"method\":\"post\",\n        " +
          "\"api\":\"http://192.168.239.171:9200/griffin/accuracy\",\n        \"connection" +
          ".timeout\":\"1m\",\n        \"retry\":10\n      }\n    }\n  ],\n  \"griffin" +
          ".checkpoint\":[\n\n  ]\n}";
        args(1) = "{\n  \"name\":\"accu_batch\",\n  \"process.type\":\"batch\",\n  \"data" +
          ".sources\":[\n    {\n      \"name\":\"source\",\n      \"baseline\":true,\n      " +
          "\"connectors\":[\n        {\n          \"type\":\"avro\",\n          \"version\":\"1.7\"," +
          "\n          \"config\":{\n            \"file.name\":\"src/test/resources/users_info_src" +
          ".avro\"\n          }\n        }\n      ]\n    },\n    {\n      \"name\":\"target\",\n     " +
          " \"connectors\":[\n        {\n          \"type\":\"avro\",\n          \"version\":\"1.7\"," +
          "\n          \"config\":{\n            \"file.name\":\"src/test/resources/users_info_target" +
          ".avro\"\n          }\n        }\n      ]\n    }\n  ],\n  \"evaluate.rule\":{\n    " +
          "\"rules\":[\n      {\n        \"dsl.type\":\"griffin-dsl\",\n        \"dq" +
          ".type\":\"accuracy\",\n        \"out.dataframe.name\":\"accu\",\n        \"rule\":\"source" +
          ".user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND " +
          "source.last_name = target.last_name AND source.address = target.address AND source.email =" +
          " target.email AND source.phone = target.phone AND source.post_code = target.post_code\"\n " +
          "     }\n    ]\n  },\n  \"sinks\":[\n    \"CONSOLE\",\n    \"ELASTICSEARCH\"\n  ]\n}";
        if (args.length < 2) {
          error("Usage: class <env-param> <dq-param>")
          sys.exit(-1)
        }
    
        val envParamFile = args(0)
        val dqParamFile = args(1)
    
        info(envParamFile)
        info(dqParamFile)
    
        // read param files
        // args(0)信息,将其转换成对应的EnvConfig对象,
        val envParam = readParamFile[EnvConfig](envParamFile) match {
          case Success(p) => p
          case Failure(ex) =>
            error(ex.getMessage, ex)
            sys.exit(-2)
        }
        // args(2)信息,将其转换成对应的DQConfig配置信息
        val dqParam = readParamFile[DQConfig](dqParamFile) match {
          case Success(p) => p
          case Failure(ex) =>
            error(ex.getMessage, ex)
            sys.exit(-2)
        }
        val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
    
        // choose process
        // 选择对应的进程对象进行执行,这里面的就是BatchDQApp
        val procType = ProcessType(allParam.getDqConfig.getProcType)
        val dqApp: DQApp = procType match {
          case BatchProcessType => BatchDQApp(allParam)
          case StreamingProcessType => StreamingDQApp(allParam)
          case _ =>
            error(s"${procType} is unsupported process type!")
            sys.exit(-4)
        }
    
        startup
    
        // (1)初始化griffin定时任务执行环境
        // 具体代码见下个代码块,主要逻辑是创建sparkSession和注册griffin自定义的spark udf
        // dq app init
        dqApp.init match {
          case Success(_) =>
            info("process init success")
          case Failure(ex) =>
            error(s"process init error: ${ex.getMessage}", ex)
            shutdown
            sys.exit(-5)
        }
    
        // dq app run
        // (2)执行对应的定时任务作业,这里的处理就是批处理任务,
        val success = dqApp.run match {
          case Success(result) =>
            info("process run result: " + (if (result) "success" else "failed"))
            result
    
          case Failure(ex) =>
            error(s"process run error: ${ex.getMessage}", ex)
    
            if (dqApp.retryable) {
              throw ex
            } else {
              shutdown
              sys.exit(-5)
            }
        }
    
        // dq app end
        dqApp.close match {
          case Success(_) =>
            info("process end success")
          case Failure(ex) =>
            error(s"process end error: ${ex.getMessage}", ex)
            shutdown
            sys.exit(-5)
        }
    
        shutdown
    
        if (!success) {
          sys.exit(-5)
        }
      }
    
      private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
        val paramReader = ParamReaderFactory.getParamReader(file)
        paramReader.readConfig[T]
      }
    
      private def startup(): Unit = {
      }
    
      private def shutdown(): Unit = {
      }
    
    }
    

    首先进入到**(1)**中 初始化griffiin定时任务执行环境源码,进入到BatchDQApp的init方法

    def init: Try[_] = Try {
        // build spark 2.0+ application context
        val conf = new SparkConf().setAppName(metricName)
        conf.setAll(sparkParam.getConfig)
        conf.set("spark.sql.crossJoin.enabled", "true")
        sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
        sqlContext = sparkSession.sqlContext
    
        // register udf
        GriffinUDFAgent.register(sqlContext)
      }
    

    该段代码主要的功能是根据dqParam中的name属性,创建对应的SparkSession,同时获取对应的SqlContext,
    SparkSession是spark2.0引入的新概念,SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
    在最后的代码中

    GriffinUDFAgent.register(sqlContext)
    

    注册udf,udf是hive中支持的自定义函数,其中udf即最基本的自定义函数,类似to_char,to_data等,其中还有udaf和udtf两种类型:

    • UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等
    • UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group
      by之后使用的sum,avg等
    • UDTF(User-Defined Table-Generating
      Functions),用户自定义生成函数,有点像stream里面的flatMap

    源码中,通过GriffinUDFs注册了基础的udf函数,index_of,matches,reg_replace,通过GriffinUDAggFs注册了udaf函数(这部分可以进行扩展)

    def register(sqlContext: SQLContext): Unit = {
        sqlContext.udf.register("index_of", indexOf _)
        sqlContext.udf.register("matches", matches _)
        sqlContext.udf.register("reg_replace", regReplace _)
      }
    
    object GriffinUDAggFs {
    
      def register(sqlContext: SQLContext): Unit = {
      }
    
    }
    

    然后,进入到执行对应的定时任务作业 ,这一部分是spark计算的核心代码所在

    def run: Try[Boolean] = Try {
        // start time
        val startTime = new Date().getTime
    
        val measureTime = getMeasureTime
        val contextId = ContextId(measureTime)
    
        // 根据配置获取数据源 即dq对应的data.sources配置,
        // get data sources
        val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
        // 数据源初始化操作
        dataSources.foreach(_.init)
    
        // 创建griffin执行上下文
        // create dq context
        val dqContext: DQContext = DQContext(
          contextId, metricName, dataSources, sinkParams, BatchProcessType
        )(sparkSession)
    
        // 根据对应的sink的配置,输出结果到console和elasticsearch中
        // start id
        val applicationId = sparkSession.sparkContext.applicationId
        dqContext.getSink().start(applicationId)
    
        // 创建数据检测对比job信息
        // build job
        val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
    
        // dq job execute
        val result = dqJob.execute(dqContext)
    
        // end time
        val endTime = new Date().getTime
        dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")
    
        // clean context
        dqContext.clean()
    
        // finish
        dqContext.getSink().finish()
    
        result
      }
    

    run方法中,主要的几大功能

    • 1,根据对应的配置args(1)获取数据源,即args(1)DQConfig配置中的data.sources配置
    • 2,数据源的初始化
    • 3,创建Girffin执行的上下文
    • 4,根据对应的skin配置,输出结果到console和elasticsearch中(配置中)
    • 5,创建数据检测对应的job信息
    • 6,执行任务作业

    首先进入到根据配置获取数据源,传递的参数有sparkSession,dqParam.getDataSources即dq对应的data.sources配置

    def getDataSources(sparkSession: SparkSession,
                         ssc: StreamingContext,
                         dataSources: Seq[DataSourceParam]
                        ): Seq[DataSource] = {
        dataSources.zipWithIndex.flatMap { pair =>
          val (param, index) = pair
          getDataSource(sparkSession, ssc, param, index)
        }
      }
    

    getDataSource()方法中,第一个参数是对应的SparkSession,第二个参数是StreamingContext(这里是null),第三个参数是数据源配置,第四个参数是index

    private def getDataSource(sparkSession: SparkSession,
                                ssc: StreamingContext,
                                dataSourceParam: DataSourceParam,
                                index: Int
                               ): Option[DataSource] = {
        val name = dataSourceParam.getName
        val connectorParams = dataSourceParam.getConnectors
        val timestampStorage = TimestampStorage()
    
        // for streaming data cache
        val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
          sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
    
        val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
          DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
            timestampStorage, streamingCacheClientOpt) match {
              case Success(connector) => Some(connector)
              case _ => None
            }
        }
    
        Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
      }
    

    调用DataConnectorFactory.getDataConnector函数获取对应的DataConnector对象
    第一个参数是SparkSession
    第二个参数在batch模式下是null
    第三个参数为dbparam配置中data.sources中的connectors参数
    第四个参数为时间戳
    第五个参数为streaming data cache,为null

    def getDataConnector(sparkSession: SparkSession,
                           ssc: StreamingContext,
                           dcParam: DataConnectorParam,
                           tmstCache: TimestampStorage,
                           streamingCacheClientOpt: Option[StreamingCacheClient]
                          ): Try[DataConnector] = {
        val conType = dcParam.getType
        val version = dcParam.getVersion
        Try {
          // 数据源映射
          conType match {
            case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
            case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
            case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
            case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
            case KafkaRegex() =>
              getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
            case _ => throw new Exception("connector creation error!")
          }
        }
      }
    

    最终,我们能看到griffin的meauser默认的数据源配置有以下几种,hiveavrotextDirkafka
    这里,我们以Hive数据源来分析对应的创建过程,以及数据的执行过程

    /**
      * batch data connector for hive table
      * 接收三个参数:1,SparkSession 2,connectors配置信息,3,timestampStorage缓存对象
      */
    case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                      dcParam: DataConnectorParam,
                                      timestampStorage: TimestampStorage
                                     ) extends BatchDataConnector {
    
    //connectors配置信息下的config配置
      val config = dcParam.getConfig
    
      //数据库
      val Database = "database"
      //表
      val TableName = "table.name"
      val Where = "where"
    	
      //config配置下的database库
      val database = config.getString(Database, "default")
      //config配置下的table
      val tableName = config.getString(TableName, "")
      //config下的where配置信息
      val whereString = config.getString(Where, "")
    
      //关联
      val concreteTableName = s"${database}.${tableName}"
      val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
    
      //sparkSql执行sql语句,返回对应的DataFrame和TimeRate数据
      def data(ms: Long): (Option[DataFrame], TimeRange) = {
        val dfOpt = try {
          val dtSql = dataSql
          info(dtSql)
          val df = sparkSession.sql(dtSql)
          val dfOpt = Some(df)
          val preDfOpt = preProcess(dfOpt, ms)
          preDfOpt
        } catch {
          case e: Throwable =>
            error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
            None
        }
        val tmsts = readTmst(ms)
        (dfOpt, TimeRange(ms, tmsts))
      }
    
    
      private def tableExistsSql(): String = {
    //    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
        s"tableName LIKE '${tableName}'"
      }
    
      private def metaDataSql(): String = {
        s"DESCRIBE ${concreteTableName}"
      }
    
      private def dataSql(): String = {
        val tableClause = s"SELECT * FROM ${concreteTableName}"
        if (wheres.length > 0) {
          val clauses = wheres.map { w =>
            s"${tableClause} WHERE ${w}"
          }
          clauses.mkString(" UNION ALL ")
        } else tableClause
      }
    
    }
    
    

    HiveBatchDataConnector对象首先是继承了BatchDataConnnector,并且BatchDataConnector继承了DataConnector对象,其中,HiveBatchDataConnector实现了DataConnector对象中的data方法,这个是一个比较重要的方法:

    def data(ms: Long): (Option[DataFrame], TimeRange) = {
        val dfOpt = try {
          val dtSql = dataSql
          info(dtSql)
          val df = sparkSession.sql(dtSql)
          val dfOpt = Some(df)
          val preDfOpt = preProcess(dfOpt, ms)
          preDfOpt
        } catch {
          case e: Throwable =>
            error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
            None
        }
        val tmsts = readTmst(ms)
        (dfOpt, TimeRange(ms, tmsts))
      }
    

    首先,获取SparkSql的执行sql语句,通过dataSql方法,然后通过SparkSession执行sql语句,获取对应的DataFrame,同时执行DataConnector方法中的preProcess方法,封装最终的DataFrame

    def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
        // new context
        val context = createContext(ms)
    
        val timestamp = context.contextId.timestamp
        val suffix = context.contextId.id
        val dcDfName = dcParam.getDataFrameName("this")
    
        try {
          saveTmst(timestamp)    // save timestamp
    
          dfOpt.flatMap { df =>
            val (preProcRules, thisTable) =
              PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
    
            // init data
            context.compileTableRegister.registerTable(thisTable)
            context.runTimeTableRegister.registerTable(thisTable, df)
    
            // build job
            val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
    
            // job execute
            preprocJob.execute(context)
    
            // out data
            val outDf = context.sparkSession.table(s"`${thisTable}`")
    
            // add tmst column
            val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
    
            // clean context
            context.clean()
    
            Some(withTmstDf)
          }
    
        } catch {
          case e: Throwable =>
            error(s"pre-process of data connector [${id}] error: ${e.getMessage}", e)
            None
        }
      }
    }
    

    关于data方法的执行,我们可以在后面的源码中看到,
    最终,我们会根据一系列的配置信息,初始化对应的数据源,接着,

    dataSources.foreach(_.init)
    

    执行dataSources的初始化方法,以Hive的DataSource为例,init方法没有具体实现内容

    接着,执行

    val dqContext: DQContext = DQContext(
          contextId, metricName, dataSources, sinkParams, BatchProcessType
        )(sparkSession)
    

    创建griffin的上下文DQContext对象

    /**
     *  每一个spark计算的context唯一的上下文对象数据
      * dq context: the context of each calculation
      * unique context id in each calculation
      * access the same spark session this app created
      */
    case class DQContext(contextId: ContextId,
                         name: String,
                         dataSources: Seq[DataSource],
                         sinkParams: Seq[SinkParam],
                         procType: ProcessType
                        )(@transient implicit val sparkSession: SparkSession) {
    
      val sqlContext: SQLContext = sparkSession.sqlContext
    
      //编译
      val compileTableRegister: CompileTableRegister = CompileTableRegister()
      //运行环境
      val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sqlContext)
    
      val dataFrameCache: DataFrameCache = DataFrameCache()
    
      val metricWrapper: MetricWrapper = MetricWrapper(name)
      val writeMode = WriteMode.defaultMode(procType)
    
      //数据源名称
      val dataSourceNames: Seq[String] = {
        // sort data source names, put baseline data source name to the head
        val (blOpt, others) = dataSources.foldLeft((None: Option[String], Nil: Seq[String])) { (ret, ds) =>
          val (opt, seq) = ret
          if (opt.isEmpty && ds.isBaseline) (Some(ds.name), seq) else (opt, seq :+ ds.name)
        }
        blOpt match {
          case Some(bl) => bl +: others
          case _ => others
        }
      }
      dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
    
      def getDataSourceName(index: Int): String = {
        if (dataSourceNames.size > index) dataSourceNames(index) else ""
      }
    
      implicit val encoder = Encoders.STRING
      val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
    
      //加载数据
      val dataSourceTimeRanges = loadDataSources()
    
      def loadDataSources(): Map[String, TimeRange] = {
        dataSources.map { ds =>
          (ds.name, ds.loadData(this))
        }.toMap
      }
    
      printTimeRanges
    
      private val sinkFactory = SinkFactory(sinkParams, name)
      private val defaultSink: Sink = createSink(contextId.timestamp)
    
      def getSink(timestamp: Long): Sink = {
        if (timestamp == contextId.timestamp) getSink()
        else createSink(timestamp)
      }
    
      def getSink(): Sink = defaultSink
    
      private def createSink(t: Long): Sink = {
        procType match {
          case BatchProcessType => sinkFactory.getSinks(t, true)
          case StreamingProcessType => sinkFactory.getSinks(t, false)
        }
      }
    
      def cloneDQContext(newContextId: ContextId): DQContext = {
        DQContext(newContextId, name, dataSources, sinkParams, procType)(sparkSession)
      }
    
      def clean(): Unit = {
        compileTableRegister.unregisterAllTables()
        runTimeTableRegister.unregisterAllTables()
    
        dataFrameCache.uncacheAllDataFrames()
        dataFrameCache.clearAllTrashDataFrames()
      }
    
      private def printTimeRanges(): Unit = {
        if (dataSourceTimeRanges.nonEmpty) {
          val timeRangesStr = dataSourceTimeRanges.map { pair =>
            val (name, timeRange) = pair
            s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
          }.mkString(", ")
          println(s"data source timeRanges: ${timeRangesStr}")
        }
      }
    
    }
    

    创建DQContext对象,传递四个参数
    第一个参数为对应的sparkSession
    第二个参数为当前执行任务的name信息,也就是dqparam中的name属性
    第三个参数为上面获取的数据源配置信息
    第四个参数为envparam中配置的sinks配置信息
    第五个参数为批处理类型
    重点看一下

    //加载数据
      val dataSourceTimeRanges = loadDataSources()
    

    加载数据源中的数据信息,调用DataSource中的loadData()方法,这个时候,我们会看到会执行对应每一个数据源DataConnector实现的data方法,因为我们是以对应的Hive数据源为例,进入到HiveBatchDataConnector类中的data方法,重点研究一下SparkSql的执行逻辑

      def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
        // new context
        //创建一个新的DQContext
        val context = createContext(ms)
    
        val timestamp = context.contextId.timestamp
        val suffix = context.contextId.id
        val dcDfName = dcParam.getDataFrameName("this")
    
        try {
          saveTmst(timestamp)    // save timestamp
    
          dfOpt.flatMap { df =>
            val (preProcRules, thisTable) =
              PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
    
            // init data
            context.compileTableRegister.registerTable(thisTable)
            context.runTimeTableRegister.registerTable(thisTable, df)
    
            // build job
            val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
    
            // job execute
            preprocJob.execute(context)
    
            // out data
            val outDf = context.sparkSession.table(s"`${thisTable}`")
    
            // add tmst column
            val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
    
            // clean context
            context.clean()
    
            Some(withTmstDf)
          }
    
        } catch {
          case e: Throwable =>
            error(s"pre-process of data connector [${id}] error: ${e.getMessage}", e)
            None
        }
      }
    }
    

    创建build job信息,进入到

    // build job
    val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
    

    这里源码有个疑惑,在创建DQContext的时候,执行了一次loadDataSources(),内部是创建了一次buildDQJob(),为什么外面又创建了一次??

    首先是获取dqparma中的evaluate.rule配置规则,在DQJobBuilder中的buildDQJob方法中,首先是根据dataSource创建 steps

    def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
        // build steps by datasources
        val dsSteps = context.dataSources.flatMap { dataSource =>
          DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
        }
        // build steps by rules
        /**
         * SeqDQStep(List(SparkSqlTransformStep(__missRecords,SELECT `source`.* FROM `source` LEFT JOIN `target` ON coalesce(`source`.`user_id`, '') = coalesce(`target`.`user_id`, '') AND upper(`source`.`first_name`) = upper(`target`.`first_name`) AND coalesce(`source`.`last_name`, '') = coalesce(`target`.`last_name`, '') AND coalesce(`source`.`address`, '') = coalesce(`target`.`address`, '') AND coalesce(`source`.`email`, '') = coalesce(`target`.`email`, '') AND coalesce(`source`.`phone`, '') = coalesce(`target`.`phone`, '') AND coalesce(`source`.`post_code`, '') = coalesce(`target`.`post_code`, '') WHERE (NOT (`source`.`user_id` IS NULL AND `source`.`first_name` IS NULL AND `source`.`last_name` IS NULL AND `source`.`address` IS NULL AND `source`.`email` IS NULL AND `source`.`phone` IS NULL AND `source`.`post_code` IS NULL)) AND (`target`.`user_id` IS NULL AND `target`.`first_name` IS NULL AND `target`.`last_name` IS NULL AND `target`.`address` IS NULL AND `target`.`email` IS NULL AND `target`.`phone` IS NULL AND `target`.`post_code` IS NULL),Map(),true), SparkSqlTransformStep(__missCount,SELECT COUNT(*) AS `miss` FROM `__missRecords`,Map(),false), SparkSqlTransformStep(__totalCount,SELECT COUNT(*) AS `total` FROM `source`,Map(),false), SparkSqlTransformStep(accu,
                 SELECT A.total AS `total`,
                        A.miss AS `miss`,
                        (A.total - A.miss) AS `matched`,
                        coalesce( (A.total - A.miss) / A.total, 1.0) AS `matchedFraction`
                 FROM (
                   SELECT `__totalCount`.`total` AS total,
                          coalesce(`__missCount`.`miss`, 0) AS miss
                   FROM `__totalCount` LEFT JOIN `__missCount`
                 ) AS A
             ,Map(),false), MetricWriteStep(accu,accu,DefaultFlattenType,None), RecordWriteStep(__missRecords,__missRecords,None,None)))
         */
        val ruleSteps = ruleParams.flatMap { ruleParam =>
          DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
        }
        // metric flush step
        val metricFlushStep = MetricFlushStep()
        /**
         * ++ 用于连接两个集合
         * :+ 用于在集合尾部追加集合
         * +: 用于在集合头部追加集合
         */
        DQJob(dsSteps ++ ruleSteps :+ metricFlushStep)
      }
    

    然后再创建rulesteps,调用DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)

    def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam
                                 ): Option[DQStep] = {
        val dslType = ruleParam.getDslType
        val dsNames = context.dataSourceNames
        val funcNames = context.functionNames
        val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
          .flatMap(_.buildDQStep(context, ruleParam))
        dqStepOpt.toSeq.flatMap(_.getNames).foreach(name =>
          context.compileTableRegister.registerTable(name)
        )
        dqStepOpt
      }
    

    首先是创建对应的RuleParamStepBuilder,然后调用buildDQStep方法

    def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
        val steps = buildSteps(context, param) //这个没看懂
        if (steps.size > 1) Some(SeqDQStep(steps))
        else if (steps.size == 1) steps.headOption
        else None
      }
    

    根据griffin官方的含义,这个根据对应的解析器dfs进行结对对应的执行步骤

    Apache Girffin DSL涉及详见:
    https://github.com/apache/griffin/blob/master/griffin-doc/measure/dsl-guide.md

    五 技术栈

    Spark :学习参考:https://www.cnblogs.com/qingyunzong/category/1202252.html
    Hadoop
    Hive
    Livy
    Quartz

    六 问题

    1,Apache Giffin目前的数据源是支持HIVE,TXT,文件,avro文件和实时数据源kafka,mysql和其他关系型数据库的扩展需要自己进行扩展

    2,Apache Griffin进行Mesausre生成之后,会形成Spark大数据执行规则模板,shu的最终提交是交给了Spark执行,需要懂Spark进行扩展

    3,Apache Griffin中的源码中,只有针对于接口层的数据使用的是Spring Boot,measure关于Spark定时任务的代码为scala 语言,扩展的时候需要在measure中进行扩展,需要了解一下对应的scala脚本

    4,将对应的measuer.jar包put到hadoop中的/griffin/目录下,es中的metric指标任然没有数据。(已解决)

    官方网站上给的两个解决的方法1,service/src/main/resources/env/env_batch.json里的ES配置信息不正确 这个已经修复了 2,执行spark任务的yarn服务器上没有配置ES服务器的hostname,连接异常(这个不太明白)

    部署参考

    https://blog.csdn.net/github_39577257/article/details/90607081
    http://griffin.apache.org/docs/quickstart-cn.html

    展开全文
  • 数据质量稽核工具-datacheck

    万次阅读 2017-07-19 17:38:12
    一个简单的数据质量稽核自动化工具,通过配置稽核sql,自动化发送报警。 实现常见的稽核规则的检查,例:数据的一致性、完整性、及时性检查,指标的历史波动检查、关联检查、指标平衡检查、其他根据实际业务制定的...

    简介

    一个简单的数据质量稽核自动化工具,通过配置稽核sql,自动化发送报警。
    实现常见的稽核规则的检查,例:数据的一致性、完整性、及时性检查,指标的历史波动检查、关联检查、指标平衡检查、其他根据实际业务制定的业务规则检查
    功能支持
    1、支持mysql、greenplum数据源
    2、配置sql,自动化执行,如果sql返回结果有数据,自动发送报警
    3、不同sql规则可设置优先级
    4、可控制并发


    项目源码

    https://github.com/xmingyang/datacheck
    展开全文
  • 质量数据分析工具软件的应用

    千次阅读 2019-07-26 11:12:14
    时代在发展,社会在进步。大数据在时代的发展中占据着举足轻重的地位,大数据和人工智能技术的结合造就了新一代人工智能产业。...以高效能数据处理、数据访问和数据传输为核心问题的大数据处理系统是支撑...
  • Profiler工具用了一下,不能覆盖大数据量的情况
  • 最好是免费的,有试用版也ok,开源的也可以。最好是那种对数据质量评估/分析有量化结果的表格的,谢谢各位。
  • 不论是高校还是其他行业,管理信息系统数据质量的重要性不言而喻。在业务层面,数据质量问题不仅会影响单点业务的正常开展,还会影响与该业务相关的其他业务顺利开展;在决策层面,数据质量问题会直接影响到管理层的...
  • 7种常用的数据标注工具

    千次阅读 2019-12-13 13:03:20
    标注工具数据标注行业的基础,一款好用的标注工具是提升标注效率与产出高质量标注数据的关键。 常用的数据标注工具主要有以下几种:2D框、语义分割、多边形分割、点标注、线标注、视频标注、3D立方体标注等。 1....
  • 【数据治理】数据质量探查

    千次阅读 2019-05-28 15:42:54
    结合这段时间做数据质量分析的过程,构思了数据质量探查工具的实现思路(下图)。简单来说,这个工具应该是基于对表数据的分析结果,归纳并定义出质量检查规则,这两个过程应该是反复迭代进行。其中, 数据探查部分...
  • 数据挖掘工具

    千次阅读 2018-09-06 19:18:14
    一、产品简介: 顶尖数据挖掘平台(TipDM)是广州泰迪智能科技有限公司自主研发的一个数据挖掘工具,使用JAVA语言开发,能从各种数据源获取数据,建立各种不同的数据挖掘模型(目前已集成数十种预测算法和分析技术,...
  • 数据治理工具

    千次阅读 2019-01-02 10:43:39
    Data Governance Tools - Evaluation Criteria, Big Data Governance, 内容: 前言 第一部分:导论: 1:数据治量简介 ...数据治理的支柱 ... 2:企业数据管理参考架构 ...数据治理工具 摘要 第...
  • 数据质量控制与数据治理

    千次阅读 2018-09-29 17:18:08
    数据质量控制与数据治理 背景 对于一个公司来说,最重要的资产是数据,数据的核心价值可以理解为核心商业价值,我个人认为是体现在两方面,一是能为企业带来更多的盈利,二是能为企业规避风险。 实现数据价值就需要...
  • 数据清洗工具OpenRefine

    万次阅读 2014-02-27 13:39:31
    数据清洗工具OpenRefine作者:chszs,转载需注明。博客主页:http://blog.csdn.net/chszs数据经常被称为一座金矿,尤其是在当今数据驱动的经济环境下更是如此。怎样把数据集在OpenRefine中进行转换,优化数据质量...
  • 数据仓库的数据质量

    千次阅读 2011-08-31 22:20:41
    (一)数据质量的衡量标准、好处和问题   数据质量的好坏是决定一个数据仓库成功的关键,但是需要从那些方面衡量数据仓库中数据的质量呢? 可以从下列方面衡量系统中的数据质量:  准确性:存储在系统中的关于...
  • 数据仓库之数据质量建设(深度好文)

    千次阅读 多人点赞 2021-09-24 11:17:29
    数仓建设真正的难点不在于数仓设计,而在于后续业务发展起来,业务线变的庞大之后的数据治理,而数据治理的范围非常广,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在这么多治理内容中,大家想下最重要的...
  • 数据质量评估标准

    万次阅读 2018-05-31 21:44:55
    数据质量是保证数据应用的基础,它的评估标准主要包括四个方面,完整性、一致性、准确性、及时性。评估数据是否达到预期设定的质量要求,就可以通过这四个方面来进行判断。完整性完整性指的是数据信息是否存在缺失的...
  • Python常用数据分析工具

    千次阅读 2019-04-17 16:09:37
    Python数据分析工具 Python 本身的数据分析功能不强,需要安装一些第三方扩展库来增强它的能力。常用 的库有Numpy 、Scipy 、Matplotlib 、Pandas 、Scikit-Learn 、Keras 和Gensim 等,下面将对 这些库的安装和...
  • 数据质量监控

    千次阅读 2019-04-20 22:10:03
    数据质量监控 原创: 木东居士 木东居士 4天前 0x00 概述 随着大数据时代的带来,数据的应用也日趋繁茂,越来越多的应用和服务都基于数据而建立,数据的重要性不言而喻。而且,数据质量是数据分析和数据挖掘结论有效...
  • ETL数据抽取工具

    万次阅读 2018-06-19 16:20:51
    ETL负责将分布的、异构数据源中的数据如关系数据、平面数据文件等抽取到临时中间层后进行清洗、转换、集成,最后加载到数据仓库或数据集市中,成为联机分析处理、数据挖掘的基础。旗鼓相当:Datastage与Powercenter...
  • 最近在做相关项目,评估了多个数据标注工具,也接触了几家数据标注公司和平台,总结如下,供各位参考。 视频数据标注平台(标注外包公司) 数据标注公司的工作比较多样,但视频标注对工具要求稍高一些,能在线上做的...
  • 最近在做人脸识别相关项目,评估了多个数据标注工具,也接触了几家数据标注公司和平台,总结如下,供各位参考。 图片数据标注平台(标注外包公司) 数据标注公司的工作比较多样,图片标注是最普通的。视频标注、语音...
  • 数据收集及数据分析工具介绍

    万次阅读 2018-01-13 00:00:00
    以下文章内容,来自草堂君的新书《人人都会数据分析-从生活实例学统计》。因为新书中增添和细化了很多知识点,所以草堂君会逐步将...随着数据分析在各个领域发挥越来越重要的作用,针对不同数据分析步骤的软硬件工具
  • 数据交换工具Kettle

    万次阅读 热门讨论 2008-05-29 16:43:00
    网上搜集了一些关于开源数据交换工具Kattle的文章,特收藏如下:文章一:ETL和Kettle简介ETL即数据抽取(Extract)、转换(Transform)、装载(Load)的过程。它是构建数据仓库的重要环节。数据仓库是面向主题的、...
  • 代码质量工具总结

    万次阅读 2018-10-13 11:59:02
    代码质量检测工具 PMD FindBugs checkstyle SonarLint/SonarQube alibaba coding guiding 仅仅有质量检测工具还不够,还需要代码审查工具,加以人工审查辅助 代码审查工具简介 Review Board Gerrit GitLab...
  • 数据质量监控Griffin——使用

    千次阅读 2021-07-07 14:42:13
    数据质量监控griffin: 地址:http://XXXXXXXXX:4200/#/health 账号:admin 密码:123456 二、Griffin是干什么的? 官方介绍 大数据模块是大数据平台中数据方案的一个功能组件,Griffin(以下简称Griffin)是一个...
  • 数据仓库的目的是构建面向分析的集成化数据...Informatica平台是一套完善的技术,可支持多项复杂的企业级数据集成计划,包括:企业数据集成、大数据管理、数据质量、数据治理、主数据管理、数据安全和云数据集成等。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 299,183
精华内容 119,673
关键字:

数据质量工具