2017-06-02 09:44:35 dominic_tiger 阅读数 2920

背景:如何在数据湖平台kylo中,添加数据源(如下图,支持从phoenix数据源获取数据)
这里写图片描述

步骤

1、配置Controller Service

  • 确保在root process group下进行配置,如下图
    这里写图片描述

    -点击配置按钮进行配置
    这里写图片描述

  • 过滤选择需要的Controller Service
    这里写图片描述

  • 修改相关配置参数
    这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

  • 完成配置
    这里写图片描述

2、拷贝步骤1中的驱动包至/opt/kylo/kylo-services/plugin/下(建议)

3、修改kylo配置文件/opt/kylo/kylo-services/conf/application.properties

在文件末尾加入如下两行
nifi.service.controller-service-name.database_user
nifi.service.controller-service-name.password

注意controller-service-name为小写
nifi.service.phoenix.database_user=root
nifi.service.phoenix.password=root

4、重启kylo服务

service kylo-services restart

参考:https://stackoverflow.com/questions/42117944/how-to-add-a-database-source-to-kylo

2018-10-16 10:12:06 wiborgite 阅读数 3059

定位

Kylo定位于企业级的数据湖管理平台(Data Lake Platform),它是基于Spark和NiFi的开源数据湖编排框架,Kylo提供的主要特性包括数据获取、数据准备和数据发现,并支持元数据管理、数据治理和高级安全特性。

厂商信息

Kylo是由Teradata天睿公司开源,并被航空、保险、电信、金融服务、银行和零售行业的全球Top N公司所应用。

经典场景

通过Kylo的GUI界面,业务人员可以按照他们关心的方式来操作数据,包括:创建数据源、定义数据加载、数据预处理、转换,发布到目标系

Kylo系统介绍

系统组件

  • Kylo-ui:即前端web组件,主要包括Operations、Feed Manager和Admin三个一级功能:
  • Operations:提供仪表盘、服务状态监控、Job执行概览、告警查看、SLA调度等功能。
  • Feed Manager:提供Feed管理、目录分类、SLA、可视化查询以及数据预览等能力。
  • Admin:提供用户管理、分组管理、数据源管理、模板管理等功能。
  • kylo-services:Kylo服务后端,为前端提供Rest ful接口,并实现Job 仓库、元数据仓库的管理能力,并负责与依赖组件如ES、NiFi、Hadoop Cluster的通信。

系统依赖
Kylo的安装和运行依赖于多种外部组件,组件及其作用如下所示

概述说明如下:

MySQL/PG/MS SQL Server :kylo需要使用关系型数据库实现其元数据存储与管理

MQ:用于不同组件之间的消息通信

JDK:Kylo运行在java虚拟机中

ES/Solr:用于Hive中元数据或Feed数据的全局搜索(前提是在创建Feed时需要指定索引)

Spark、Hive、HDFS:Kylo具有调用大数据集群的能力,Kylo默认将Spark作为Hive的执行引擎。

组网模式

Kylo的自有服务和依赖服务可以分开部署,也可以部署在一台服务器上。Kylo既支持单机模式也可以集群方式。
组网结构如下所示:

与大数据系统的关系
在物理上,Kylo既独立于源系统和目标系统,同时也与CDH/HDP Cluster相互独立。但Kylo需要集成Hive、HDFS、Spark等的lib库,从而实现与大数据集群的通信。
Kylo集成的部分lib库如下所示:

与NiFi的关系
Kylo依赖NiFi实现数据流编排的能力,即Kylo中对数据的处理依赖于NiFi中的模板,在工作中需要先在NiFi中完成模板定义,然后从NiFi中导出并导入到Kylo中,如下所示:
NiFi中的数据流:

NiFi中的模板:

Kylo中的模板:

2018-10-16 20:40:13 wiborgite 阅读数 3036

Kylo功能介绍


Kylo提供服务用于生成Hive表、基于Hadoop中的数据生成schema、执行基于Spark的转换、元数据跟踪、监控feed和SLA策略、发布数据到目标系统。
Kylo前端提供的功能模块包括Operations、Feed Manager、Admin。

Operations

Operations侧重于运维管理。
Dashboard
Dashboard通过仪表盘和列表使用户快速了解系统的状态:

Services

Services则用于检测Kylo集成的服务的状态状态:

SLA Assessments

SLA Assessments提供SLA任务调度结果查看功能:

SLA Schedule

  • SLA Schedule提供SLA调度配置能力:
  • JobsJobs提供执行作业的查看功能:

  • Alerts

  • Alerts提供告警查看功能:

  • Charts

  • Charts提供系统指标的图形化展示能力:

Feed Manager

Feed Manager提供数据集以及数据流管理能力,是Kylo的核心功能。

Feeds

Feeds:用于定义数据集以及数据流,其中数据流是在Feeds使用的模板中完成定义的。Feeds支持调度配置,从而周期性的进行数据处理。

 

 Feed的血缘:

Feed的版本:

Categories

Categories表示Feed的目录,用于在逻辑上进行Feeds的组织与管理。对于进入Hive的Feeds,其Category相当于Database名称。

                    

 

SLA

SLA即Service Level Agreement,在该页面可以配置检查计划,对于不满足的生成告警等信息。

Visual Query

提供对以接入Kylo的数据源的可视化查询能力,即一站式查询各种数据源中的数据,支持SQL查询,并支持对查询结果进行导出。

支持可视化查询:

如下为实现一个Join查询

Catalog

提供Kylo以及数据源中元数据以及数据的查询能力,数据预览能力,支持SQL查询。

 

Admin

Admin提供系统管理能力。

Data Sources

Kylo支持将数据库作为数据源,并从这些数据库中抽取数据。

Domain Types

用于定义域类型。

Properties

                        

Templates

模板定义了Feeds在Kylo中的处理模式。模板为Feeds的处理提供了可视化向导以及其中的可配参数等内容,并决定了最终的数据处理流程。

这里的模板是指Apache NiFi模板,Kylo仅提供了已定义好的模板的上传与注册功能。

User

提供用户管理能力,例如用户的创建、修改等功能。

Groups

Group是一组权限的集合,用户可以加入某个组,从而获得相应的权限。

SLA Mail

用于配置SLA邮件模板。

 

全局检索

Kylo可以集成ES或Solr作为其检索引擎,实现Feeds的全局检索。Kylo允许为保存在数据湖中的数据创建索引,从而实现全局检索能力。

 

工作流程

考虑如下场景:Kylo从数据库中定期抽取数据,进行数据处理并将结果保存在Hive中。

 

Kylo优劣分析

Kylo优势

  • 支持Feed的版本管理。
  • 提供了良好的权限控制能力。
  • 集生产(Feed Manage)、监控运维(Operations)、管理(Admin)于一体。
  • 提供了良好的安全控制能力。
  • 支持Cron风格调度控制。
  • 支持数据源种类丰富。
  • 支持数据压缩和多种存储格式。
  • Kylo以及NiFi都遵循Apache License。

Kylo劣势

  • 血缘分析仅支持到Feed级别。
  • 外部依赖较多,特别是对NiFi的依赖重,用于实现数据流的模板需要在NiFi中进行。

 

功能总结

Kylo支持的数据源包括哪些

Kylo支持的数据源非常广泛,其中关系型数据库的可以在Kylo中预先定义以及管理,对于关系型数据库数据源,Kylo支持数据的可视化查询、预览等能力。

Kylo对其它类型的数据源的支持依赖于NiFi,可以在NiFi模板template中实现个性化的数据源支持。

Kylo对关系型数据库数据源的支持

接入一种RDBMS数据源

在Kylo中新增一个Mysql类型的数据源(在官方文档中仅给出MySQL/MariaDB/Oracle/PG类型的示例,但由于是通过JDBC方式实现的,因此其它类型应该也是支持的):

数据可视化查询

     在画布中添加表并支持构建join查询,支持画布向SQL的映射,也允许用户直接编辑SQL执行查询:

  • 在查询结果中用户可以查看数据的一些统计数据,如方差等
  • 查询结果可以以图形化方式展示
  • 查询结果支持导出

  

Catalog功能

对于接入Hive中的RDBMS以及Hive(应该也支持HBase),Kylo可以作为一个jdbc客户端,直接对这些数据源中的数据进行查询。

如下所示为在Kylo web中查询已接入的MySQL数据库的中kylo.AUDIT_LOG

表,包括字段查看、数据预览以及SQL查询三种功能:

  • 字段查看:
  • 数据预览
  • Query

 

非RDBMS数据源

对于非RDBMS类型的数据源的支持,该能力是由NiFi提供的,而NiFi对数据源的支持非常广泛,例如FTP、SFTP、Kafka、S3、HDFS等,如下图所示:

                           在NiFi的template中定义支持哪些数据源,在Kylo中使用该template创建Feeds时,即可从指定的数据源获取数据,如下给出而来Kylo(NiFi)预置的三种模板支持的数据源:

  • Data Ingest模板支持接入的数据源如下所示:

  • Streaming Ingest模板支持接入的数据源如下所示:

 

  • Advanced Ingest模板支持接入的数据源如下所示

应用示例

本节主要描述如何使用NiFi以及Kylo实现一个数据流,具体配置细节不做体现。

数据场景

利用sqoop将源数据库中导出数据并保存在HDFS上,让后加载到Hive中,并将该表与已有的其它表合并,最终再讲合并数据通过sqoop导出到目标数据库中。

 

实现流程

1 在NiFi中创建数据流模板dataflow001,如下所示:

2 在kylo中导入已创建好的模板,如下所示:

在导入模板时,需要指定模板的相关配置参数,示例如下所示:

3 使用已导入的模板dataflow001创建一个Feed,如下所示:

Feed中不仅包括数据接入同时也包括了数据处理的部分以及调度的执行,即一个Feed是一个完整的数据流。

 

价值点

可视化交互查询

Kylo提供的可视化交互式查询能力

  • 支持通过图形化的编排自动生成SQL,也运行用户直接编写SQL
  • 特征值的统计分析:

用户可以直接查看查询结果中各个字段(特征)的数学统计值,如下图所示:

  • 支持特征运算生成新的特征值

在查询结果页面,kylo提供了很多内置函数,通过计算可生成新的特征(字段)

 

 

  • 支持查询结果保存

用户可以将查询结果保存为csv、json、xml、orc、parquet格式的文件,也可以将查询结果直接保存到目标数据库中。

价值分析

  • 数据可视化:

数据的交互式查询是一个非常有亮点和实际应用价值的功能点,有所见即所得的效果,但目前在DEP中,实际是无法直接查询数据的,由于采用的是操作定义与操作执行分离的模式,因而所有对数据的操作也都是对用户不可见的,用户无法“看见”数据发生的变化,在后续版本中可以考虑加入类似的功能。

  • 数据下发:

Kylo支持将查询结果直接导出为文件,这个功能也是非常有用的,考虑机器学习或面向数据分析任务的场景,往往需要先提供一小部分的样例数据给下游用于建模以及模型训练,最后才是完整的数据下发。在DEP数据服务设计中可以考虑下是否支持该场景

  • 数学统计

在该功能上,kylo提供的是基于查询结果范围内的特征的数学统计,为DEP上实现该功能提供了一个思路,比如何时在多大范围的数据集上计算哪些统计指标。

全局检索

Kylo提供基于ES或Solr的全局检索能力

基于ElasticSearch或Solr搜索引擎的全局检索能力,kylo可以将所有Feeds的数据同步到搜索引擎中,从而提供全局查询能力,并可以从查询结果列表中获取数据详情,如下图所示:

价值分析

与Kylo类似,在元数据仓库工具wherehows中也提供了全局检索功能,因此在DEP后续的规划设计中,也应考虑下该功能是否有相关的应用场景,以决定是否提供类似的功能。

 

数据预览

Kylo支持对Hive以及上游数据源中数据的预览能力

Kylo除可以直接查询Hive中的数据,也支持查询已在Kylo中添加的数据源(关系型数据库)中的数据。

价值分析

可以看到类似的产品(如wherehows、dataworks、teable)都提供了数据预览的能力,DEP在V1.1中会提供HQL脚本执行能力,可以在此基础上更进一步提供数据预览能力。

数据流的迁移能力

NiFi基于xml的数据流导出导入

NiFi中通过Template来定义数据流,并且支持导出为xml格式,从而实现数据流的一次定义多次使用的能力,如下为导出的xml文件的部分内容。

价值分析

在DEP当前的设计中将所有设计数据保存在数据库中,在以后的设计中,若考虑设计态与运行态的分离,则可以参考Kylo/NiFi的实现方式(Kylo可以导入NiFi中的模板)。

 

 

 

2019-03-27 22:18:09 yitengtongweishi 阅读数 166

https://github.com/Teradata/kylo/tree/master/integrations/spark/spark-job-profiler

A Spark job capable of performing generating profile statistics against a source table, partition, or for a provided query.

流程

  • Data is typically read from a source table such as -valid and a given partition.
  • Profile statistics are generated.
  • Profiler statistics is written to -profile table.

加载数据

    @Inject
    private SparkContextService scs;

    @Inject
    private SQLContext sqlContext;
    
    DataSet dataDF = scs.toDataSet(sqlContext.createDataFrame(dataRDD, schema));

生成 Profile statistics

    ProfilerConfiguration configuration = new ProfilerConfiguration();
    
    StatisticsModel statsModel = profiler.profile(dataDF, configuration);
package com.thinkbiganalytics.spark.dataprofiler;

/*-
 * #%L
 * kylo-spark-job-profiler-api
 * %%
 * Copyright (C) 2017 ThinkBig Analytics
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;

/**
 * Helper class to hold parameters for profiler
 */
@SuppressWarnings("unused")
public class ProfilerConfiguration implements Serializable {

    private static final long serialVersionUID = -6099960489540200374L;

    private Integer decimalDigitsToDisplayConsoleOutput = 4;
    private String inputAndOutputTablePartitionKey = "partitionKey";
    private String inputTablePartitionColumnName = "processing_dttm";
    private Integer numberOfTopNValues = 5;
    private String outputDbName = "default";
    private String outputTableName = "profilestats";
    private String outputTablePartitionColumnName = "processing_dttm";
    private String sqlDialect = "hiveql";  // Hive supported HQL
    private Integer bins = 5;

    /**
     * Number of decimals to print out in console<br>
     * (not considered when writing to table)
     */
    public Integer getDecimalDigitsToDisplayConsoleOutput() {
        return decimalDigitsToDisplayConsoleOutput;
    }

    public void setDecimalDigitsToDisplayConsoleOutput(Integer decimalDigitsToDisplayConsoleOutput) {
        this.decimalDigitsToDisplayConsoleOutput = decimalDigitsToDisplayConsoleOutput;
    }

    /**
     * Partition key to read and write to
     */
    public String getInputAndOutputTablePartitionKey() {
        return inputAndOutputTablePartitionKey;
    }

    public void setInputAndOutputTablePartitionKey(String inputAndOutputTablePartitionKey) {
        this.inputAndOutputTablePartitionKey = inputAndOutputTablePartitionKey;
    }

    /**
     * Partition column name for input table
     */
    public String getInputTablePartitionColumnName() {
        return inputTablePartitionColumnName;
    }

    public void setInputTablePartitionColumnName(String inputTablePartitionColumnName) {
        this.inputTablePartitionColumnName = inputTablePartitionColumnName;
    }

    /**
     * N for top-N values to store in result table<br>
     * A required command line parameter
     */
    public Integer getNumberOfTopNValues() {
        return numberOfTopNValues;
    }

    public void setNumberOfTopNValues(Integer numberOfTopNValues) {
        this.numberOfTopNValues = numberOfTopNValues;
    }

    /**
     * Name of database to write result to
     */
    public String getOutputDbName() {
        return outputDbName;
    }

    public void setOutputDbName(String outputDbName) {
        this.outputDbName = outputDbName;
    }

    /**
     * Name of table to write result to<br>
     * A required command line parameter
     */
    public String getOutputTableName() {
        return outputTableName;
    }

    public void setOutputTableName(String outputTableName) {
        this.outputTableName = outputTableName;
    }

    /**
     * Partition column name for output table
     */
    public String getOutputTablePartitionColumnName() {
        return outputTablePartitionColumnName;
    }

    public void setOutputTablePartitionColumnName(String outputTablePartitionColumnName) {
        this.outputTablePartitionColumnName = outputTablePartitionColumnName;
    }

    /**
     * Gets the flavor of queries to run.
     */
    public String getSqlDialect() {
        return sqlDialect;
    }

    public void setSqlDialect(String sqlDialect) {
        this.sqlDialect = sqlDialect;
    }

    public Integer getBins() {
        return this.bins;
    }

    /**
     * Set the number of histogram bins to generate
     * @param bins the number of bins
     */
    public void setBins(Integer bins) {
        this.bins = bins;
    }
}
package com.thinkbiganalytics.spark.dataprofiler

import com.thinkbiganalytics.spark.dataprofiler.function.PartitionLevelModels
import com.thinkbiganalytics.spark.{DataSet, SparkContextService}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructField

/** The standard implementation of `Profiler` that uses Spark to analyze the columns.
  *
 * @param sqlContext          the Spark SQL context
 * @param sparkContextService the Spark context service
  */
class StandardProfiler(val sqlContext: SQLContext, val sparkContextService: SparkContextService) extends Profiler {
    override def profile(dataset: DataSet, profilerConfiguration: ProfilerConfiguration): StatisticsModel = {
        /* Update schema map and broadcast it*/
        val schemaMap = populateSchemaMap(dataset)

        /* Get profile statistics */
        profileStatistics(dataset, schemaMap, profilerConfiguration).orNull
    }

    /** Generates a map from column index to field type.
      *
      * @param dataset the data set
      * @return the schema map
      */
    private def populateSchemaMap(dataset: DataSet): Map[Int, StructField] = {
        dataset.schema().fields.zipWithIndex.map(tuple => (tuple._2, tuple._1)).toMap
    }

    /** Profiles the columns in the specified data set.
      *
      * @param dataset   the data set
      * @param schemaMap the schema map
      * @return the statistics model
      */
    private def profileStatistics(dataset: DataSet, schemaMap: Map[Int, StructField], profilerConfiguration: ProfilerConfiguration): Option[StatisticsModel] = {
        // Get ((column index, column value), count)
        val columnValueCounts = dataset.rdd
            .flatMap((row) => row.toSeq.zipWithIndex.map((tuple) => ((tuple._2, tuple._1), 1)))
            .reduceByKey((a, b) => a + b)

        // Generate the profile model
        val partitionLevelModels = columnValueCounts.mapPartitions(new PartitionLevelModels(schemaMap, profilerConfiguration))
        val result = if (!partitionLevelModels.isEmpty) {
            Option(partitionLevelModels.reduce((a, b) => {
                a.combine(b)
                a
            }))
        } else {
            Option.empty
        }

        if (result.isDefined) {
            // Add histogram statistics to the combined model
            for ((colIdx,field) <- schemaMap) result.get.addAggregate(colIdx, dataset, field);
        }

      result;
    }
}
package com.thinkbiganalytics.spark.dataprofiler.model;

/*-
 * #%L
 * thinkbig-spark-job-profiler-app
 * %%
 * Copyright (C) 2017 ThinkBig Analytics
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.thinkbiganalytics.spark.DataSet;
import com.thinkbiganalytics.spark.dataprofiler.ColumnStatistics;
import com.thinkbiganalytics.spark.dataprofiler.ProfilerConfiguration;
import com.thinkbiganalytics.spark.dataprofiler.StatisticsModel;
import com.thinkbiganalytics.spark.dataprofiler.columns.*;
import com.thinkbiganalytics.spark.dataprofiler.histo.HistogramStatistics;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;


/**
 * Class to store the profile statistics
 */
public class StandardStatisticsModel implements Serializable, StatisticsModel {

    private static final long serialVersionUID = -6115368868245871747L;

    private static final Logger log = LoggerFactory.getLogger(StandardStatisticsModel.class);
    private final Map<Integer, StandardColumnStatistics> columnStatisticsMap = new HashMap<>();

    @Nonnull
    private final ProfilerConfiguration profilerConfiguration;

    public StandardStatisticsModel(@Nonnull final ProfilerConfiguration profilerConfiguration) {
        this.profilerConfiguration = profilerConfiguration;
    }

    /**
     * Include a column value in calculation of profile statistics for the column
     *
     * @param columnIndex numeric index of column (0-based)
     * @param columnValue value in column
     * @param columnCount number of times value is found in column
     * @param columnField schema information of the column
     */
    public void add(Integer columnIndex, Object columnValue, Long columnCount, StructField columnField) {

        StandardColumnStatistics newColumnStatistics;

        if (!columnStatisticsMap.containsKey(columnIndex)) {
            DataType columnDataType = columnField.dataType();

            switch (columnDataType.simpleString()) {

                /* === Group 1 ===*/

                /*
                 * Hive datatype: 		TINYINT
                 * SparkSQL datatype: 	        tinyint
                 * Java datatype:		Byte
                 */
                case "tinyint":
                    newColumnStatistics = new ByteColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		SMALLINT
                 * SparkSQL datatype: 	        smallint
                 * Java datatype:		Short
                 */
                case "smallint":
                    newColumnStatistics = new ShortColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		INT
                 * SparkSQL datatype: 	        int
                 * Java datatype:		Int
                 */
                case "int":
                    newColumnStatistics = new IntegerColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		BIGINT
                 * SparkSQL datatype: 	        bigint
                 * Java datatype:		Long
                 */
                case "bigint":
                    newColumnStatistics = new LongColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 2 === */

                /*
                 * Hive datatype: 		FLOAT
                 * SparkSQL datatype: 	        float
                 * Java datatype:		Float
                 */
                case "float":
                    newColumnStatistics = new FloatColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		DOUBLE
                 * SparkSQL datatype: 	        double
                 * Java datatype:		Double
                 */
                case "double":
                    newColumnStatistics = new DoubleColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 3 === */

                /*
                 * Hive datatypes: 		STRING, VARCHAR
                 * SparkSQL datatype: 	        string
                 * Java datatype:		String
                 */
                case "string":
                    newColumnStatistics = new StringColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 4 === */

                /*
                 * Hive datatype: 		BOOLEAN
                 * SparkSQL datatype: 	        boolean
                 * Java datatype:		Boolean
                 */
                case "boolean":
                    newColumnStatistics = new BooleanColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 5 === */

                /*
                 * Hive datatype: 		DATE
                 * SparkSQL datatype: 	        date
                 * Java datatype:		java.sql.Date
                 */
                case "date":
                    newColumnStatistics = new DateColumnStatistics(columnField, profilerConfiguration);
                    break;


                /*
                 * Hive datatype: 		TIMESTAMP
                 * SparkSQL datatype: 	        timestamp
                 * Java datatype:		java.sql.Timestamp
                 */
                case "timestamp":
                    newColumnStatistics = new TimestampColumnStatistics(columnField, profilerConfiguration);
                    break;



                /* === Group 6 === */

                default:
                    /*
                     * Hive datatype: 		DECIMAL
                     * SparkSQL datatype: 	        decimal
                     * Java datatype:		java.math.BigDecimal
                     *
                     * Handle the decimal type here since it comes with scale and precision e.g. decimal(7,5)
                     */
                    String decimalTypeRegex = "decimal\\S+";
                    if (columnDataType.simpleString().matches(decimalTypeRegex)) {
                        newColumnStatistics = new BigDecimalColumnStatistics(columnField, profilerConfiguration);
                    }

                    /*
                     * Hive datatypes: CHAR, BINARY, ARRAY, MAP, STRUCT, UNIONTYPE
                     */
                    else {
                        if (log.isWarnEnabled()) {
                            log.warn("[PROFILER-INFO] Unsupported data type: {}", columnDataType.simpleString());
                        }
                        newColumnStatistics = new UnsupportedColumnStatistics(columnField, profilerConfiguration);
                    }
            }
            columnStatisticsMap.put(columnIndex, newColumnStatistics);
        }

        StandardColumnStatistics currentColumnStatistics = columnStatisticsMap.get(columnIndex);
        currentColumnStatistics.accomodate(columnValue, columnCount);
    }

    /**
     * Generates additional statistics that requires operations on the entire dataFrame
     *
     * @param columnIndex the column index
     * @param ds          the dataSet
     * @param columnField the column
     */
    public void addAggregate(Integer columnIndex, DataSet ds, StructField columnField) {

        // Generate histogram statistics (numeric columns) and add statistics to model
        if (HistogramStatistics.isNumeric(columnField)) {
            HistogramStatistics histogramStatistics = new HistogramStatistics(profilerConfiguration);
            histogramStatistics.accomodate(columnIndex, ds.javaRDD(), columnField);
            columnStatisticsMap.get(columnIndex).getStatistics().addAll(histogramStatistics.getStatistics());
        }
    }

    protected Double toDoubleFunction(Row row) {
        return row.getDouble(0);
    }

    /**
     * Combine another statistics model
     */
    public void combine(StandardStatisticsModel statisticsModel) {

        for (Integer k_columnIndex : statisticsModel.columnStatisticsMap.keySet()) {

            StandardColumnStatistics columnStatistics = columnStatisticsMap.get(k_columnIndex);
            StandardColumnStatistics v_columnStatistics = statisticsModel.columnStatisticsMap.get(k_columnIndex);

            if (columnStatistics != null) {

                columnStatistics.combine(v_columnStatistics);

            } else {
                columnStatisticsMap.put(k_columnIndex, v_columnStatistics);
            }
        }
    }

    /**
     * Print the profile statistics on console
     *
     * @return profile model string
     */
    private String printModel() {

        StringBuilder sb = new StringBuilder();
        sb.append("====== Statistics Model ======");
        sb.append("\n");

        for (Map.Entry<Integer, StandardColumnStatistics> entry : columnStatisticsMap.entrySet()) {
            sb.append("=== Column #")
                .append(entry.getKey())
                .append("\n");

            sb.append(entry.getValue().getVerboseStatistics())
                .append("\n");
        }

        sb.append("==============================");
        return sb.toString();
    }


    /**
     * Print the profile statistics on console
     */
    @Override
    public String toString() {
        return printModel();
    }


    /**
     * Get the column statistics map (column number mapped to column statistics)
     *
     * @return column statistics map
     */
    @Override
    @SuppressWarnings({"unchecked", "squid:S1905"})
    public Map<Integer, ColumnStatistics> getColumnStatisticsMap() {
        return (Map) columnStatisticsMap;
    }
}

Profiler statistics 保存至 Hive 表

    OutputWriter.writeModel(statisticsModel, profilerConfiguration, sqlContext, sparkContextService);
package com.thinkbiganalytics.spark.dataprofiler.output;

/*-
 * #%L
 * thinkbig-spark-job-profiler-app
 * %%
 * Copyright (C) 2017 ThinkBig Analytics
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.thinkbiganalytics.hive.util.HiveUtils;
import com.thinkbiganalytics.spark.DataSet;
import com.thinkbiganalytics.spark.SparkContextService;
import com.thinkbiganalytics.spark.dataprofiler.ColumnStatistics;
import com.thinkbiganalytics.spark.dataprofiler.ProfilerConfiguration;
import com.thinkbiganalytics.spark.dataprofiler.StatisticsModel;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

import javax.annotation.Nonnull;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Class to write profile statistics result to Hive table
 */
public class OutputWriter implements Serializable {

    private static final long serialVersionUID = -1250818467175932284L;

    /**
     * Write the profile statistics to Hive.
     */
    public static void writeModel(@Nonnull final StatisticsModel model, @Nonnull final ProfilerConfiguration profilerConfiguration, @Nonnull final SQLContext sqlContext,
                                  @Nonnull final SparkContextService scs) {
        final OutputWriter writer = new OutputWriter(profilerConfiguration);

        for (final ColumnStatistics column : model.getColumnStatisticsMap().values()) {
            writer.addRows(column.getStatistics());
        }

        writer.writeResultToTable(sqlContext, scs);
    }

    private final List<OutputRow> outputRows = new ArrayList<>();

    @Nonnull
    private final ProfilerConfiguration profilerConfiguration;

    private OutputWriter(@Nonnull final ProfilerConfiguration profilerConfiguration) {
        this.profilerConfiguration = profilerConfiguration;
    }

    /**
     * Helper method:
     * Check if output configuration (db, table, partition column, partition key) has been set
     */
    private boolean checkOutputConfigSettings() {
        return !((profilerConfiguration.getOutputDbName() == null)
                 || (profilerConfiguration.getOutputTableName() == null)
                 || (profilerConfiguration.getOutputTablePartitionColumnName() == null)
                 || (profilerConfiguration.getInputAndOutputTablePartitionKey() == null));
    }

    /**
     * Add multiple rows to write in output
     *
     * @param rows list of rows for output
     */
    public void addRows(List<OutputRow> rows) {
        outputRows.addAll(rows);
    }

    /**
     * Write result to Hive table
     *
     * @return boolean indicating result of write
     */
    @SuppressWarnings("unchecked")
    public boolean writeResultToTable(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs) {
        boolean retVal = false;

        if (!checkOutputConfigSettings()) {
            System.out.println("Error writing result: Output database/table/partition column/partition key not set.");
        } else if (sqlContext == null) {
            System.out.println("Error writing result: Spark context is not available.");
        } else {

            @SuppressWarnings("squid:S2095") final JavaRDD<OutputRow> outputRowsRDD = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()).parallelize(outputRows);
            DataSet outputRowsDF = scs.toDataSet(sqlContext, outputRowsRDD, OutputRow.class);
            //outputRowsDF.write().mode(SaveMode.Overwrite).saveAsTable(outputTable);

            // Since Spark doesn't support partitions, write to temp table, then write to partitioned table
            String tempTable = profilerConfiguration.getOutputTableName() + "_" + System.currentTimeMillis();
            outputRowsDF.registerTempTable(tempTable);

            createOutputTableIfNotExists(sqlContext, scs);
            writeResultToOutputTable(sqlContext, scs, tempTable);
            retVal = true;
        }

        return retVal;
    }


    /**
     * Create output table if does not exist
     */
    private void createOutputTableIfNotExists(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs) {
        String createTableSQL = "CREATE TABLE IF NOT EXISTS " + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputDbName(), profilerConfiguration.getOutputTableName()) + "\n"
                                + "(columnname STRING, metricname STRING, metricvalue STRING)\n"
                                + "PARTITIONED BY (" + profilerConfiguration.getOutputTablePartitionColumnName() + " STRING)\n"
                                + "ROW FORMAT DELIMITED\n"
                                + "FIELDS TERMINATED BY ','\n"
                                + "STORED AS TEXTFILE";

        scs.sql(sqlContext, createTableSQL);
    }


    /**
     * Write to output table
     */
    private void writeResultToOutputTable(@Nonnull final SQLContext sqlContext, @Nonnull final SparkContextService scs, @Nonnull final String tempTable) {
        String insertTableSQL = "INSERT INTO TABLE " + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputDbName(), profilerConfiguration.getOutputTableName())
                                + " PARTITION (" + HiveUtils.quoteIdentifier(profilerConfiguration.getOutputTablePartitionColumnName()) + "="
                                + HiveUtils.quoteString(profilerConfiguration.getInputAndOutputTablePartitionKey()) + ")"
                                + " SELECT columnname,metrictype,metricvalue FROM " + HiveUtils.quoteIdentifier(tempTable);

        scs.sql(sqlContext, insertTableSQL);

        System.out.println("[PROFILER-INFO] Metrics written to Hive table: "
                           + profilerConfiguration.getOutputDbName() + "." + profilerConfiguration.getOutputTableName()
                           + " Partition: (" + profilerConfiguration.getOutputTablePartitionColumnName() + "='" + profilerConfiguration.getInputAndOutputTablePartitionKey() + "')"
                           + " [" + outputRows.size() + " rows]");
    }
}
package com.thinkbiganalytics.spark.dataprofiler.output;

/*-
 * #%L
 * thinkbig-spark-job-profiler-app
 * %%
 * Copyright (C) 2017 ThinkBig Analytics
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;

/**
 * Class to represent a row in profile statistics output<br>
 * Format of output:<br>
 *
 * ColumnName, MetricType, MetricValue
 */
@SuppressWarnings("unused")
public class OutputRow implements Serializable {

    private static final long serialVersionUID = -8872905670704304249L;

    private String columnName;
    private String metricType;
    private String metricValue;

    /**
     * No-argument constructor
     */
    public OutputRow() {
        columnName = null;
        metricType = null;
        metricValue = null;
    }

    /**
     * Three-argument constructor to create a new row
     *
     * @param columnName  name of column
     * @param metricType  metric type
     * @param metricValue metric value
     */
    public OutputRow(String columnName, String metricType, String metricValue) {
        this.columnName = columnName;
        this.metricType = metricType;
        this.metricValue = metricValue;
    }

    /**
     * Get the column name
     *
     * @return column name
     */
    public String getColumnName() {
        return columnName;
    }

    /**
     * Set the column name
     *
     * @param columnName column name
     */
    public void setColumnName(String columnName) {
        this.columnName = columnName;
    }

    /**
     * Get the metric type
     *
     * @return metric type
     */
    public String getMetricType() {
        return metricType;
    }

    /**
     * Set the metric type
     *
     * @param metricType metric type
     */
    public void setMetricType(String metricType) {
        this.metricType = metricType;
    }

    /**
     * Get the metric value
     *
     * @return metric value
     */
    public String getMetricValue() {
        return metricValue;
    }

    /**
     * Set the metric value
     *
     * @param metricValue metric value
     */
    public void setMetricValue(String metricValue) {
        this.metricValue = metricValue;
    }

    /**
     * Set values for the row
     *
     * @param columnName  name of column
     * @param metricType  metric type
     * @param metricValue metric value
     */
    public void setValues(String columnName, String metricType, String metricValue) {
        this.columnName = columnName;
        this.metricType = metricType;
        this.metricValue = metricValue;
    }

    /**
     * Print verbose description of row to console
     */
    @Override
    public String toString() {
        return "OutputRow [columnName=" + columnName + ", metricType=" + metricType + ", metricValue=" + metricValue
               + "]";
    }
}

kylo UI

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

流程入口

https://github.com/Teradata/kylo/tree/master/integrations/spark/spark-job-profiler/spark-job-profiler-app/src/main/java/com/thinkbiganalytics/spark/dataprofiler/core

2018-10-19 20:01:56 Arvinzr 阅读数 970

一、下载Kylo安装包

 

kylo安装包内包含 Mysql、ElasticSearch、ActiveMQ、NiFi下载,安装部署方便  博客之家

 

kylo-0.9.1.tar  ==>  对应Nifi版本为1.6.0

 

curl -O -k https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.0.rpm

rpm -ivh elasticsearch-5.5.0.rpm

 

curl -O -k https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.0.deb

dpkg -i elasticsearch-5.5.0.deb

 

ActiveMQ

curl -O -k https://archive.apache.org/dist/activemq/5.15.0/apache-activemq-5.15.0-bin.tar.gz

 

NiFi

curl -f -O -k https://archive.apache.org/dist/nifi/1.6.0/nifi-1.6.0-bin.tar.gz

 

 

二、创建相应的用户和组

ps:该操作会将所有要安装的服务所需的用户和组设置好,一定要先执行,不然会影响后面安装

useradd -r -m -s /bin/bash nifi && useradd -r -m -s /bin/bash kylo && useradd -r -m -s /bin/bash activemq

groupadd -f kylo && groupadd -f nifi && groupadd -f activemq

 

三、创建/opt/kylo文件夹 博客之家

kylo-0.9.1.tar 上传至/opt/kylo文件夹并解压

 

 

 

四、执行向导安装

 

先执行以下脚本:

/opt/kylo/setup/install/post-install.sh

执行完会生成kylo执行脚本

 

注意:按引导操作,首先需要安装mysql5.6.5+版本的数据库

/opt/kylo/setup/setup-wizard.sh 要执行全路径

 

关于kylo登陆不上的问题解决

1.首先需要安装mysql 5.6.5+ 版本的数据库

2.安装完之后首先创建数据库

create database if not exists kylo character set utf8 collate utf8_general_ci;

3.执行/opt/kylo/setup/sql/generate-update-sql.sh

4.执行 ./setup-mysql.sh localhost root dbCloud1!

注意事项:

上两个脚本执行完成后要确认是否执行成功,不要执行报错了还在往下走流程,一定要培养自己看错误日志解决问题的习惯

1.第一次安装kylo,需要更改/opt/kylo/kylo-services/conf/application.properties该文件,其中

一是要改:

modeshape.datasource.username

modeshape.datasource.password

两处注释去掉

2.mysql驱动改为 com.mysql.jdbc.Driver

3.kylo-service下lib文件夹中上传最新mysql驱动jar包

上传完成后重启服务

 

五、启动服务

1.启动NiFi

$ service nifi start

此时所有服务都应该运行。通过运行验证:

$ service nifi status

$ service elasticsearch status

$ service activemq status

 

2.启动kylo

$ kylo-service start

$ kylo-service status

 

六、测试服务    博客之家

1.Feed Manager和Operations UI

http://127.0.0.1:8400

 

用户名:dladmin

密码:dladmin(此项在引导安装时会设置,注意记录)

2.NiFi UI

http://127.0.0.1:8079/nifi

 

 

Elasticsearch REST API

curl localhost:9200

 

ActiveMQ管理员

http://127.0.0.1:8161/admin

 

有问题可以联系我 

attachments-2018-09-CSKVzwD95ba3630b4036a.jpeg

 

Kylo 0.8.3 安装

阅读数 1167

Kylo 入坑记

阅读数 99

数据湖探索服务

阅读数 340

没有更多推荐了,返回首页