精华内容
下载资源
问答
  • 使用Spark SQL构建交互式查询引擎

    千次阅读 2016-07-09 16:23:00
    StreamingPro 原来仅仅是用来作为Spark Streaming的一个配置化+SQL封装,然而不经意间,已经涵盖了批处理,交互式查询等多个方面。今天就讲讲如何使用StreamingPro构建一个交互式查询引擎。 准备工作 下载Streaming...
        

    StreamingPro目前已经涵盖流式/批处理,以及交互查询三个领域,实现配置和SQL化

    前言

    StreamingPro 原来仅仅是用来作为Spark Streaming的一个配置化+SQL封装,然而不经意间,已经涵盖了批处理,交互式查询等多个方面。今天就讲讲如何使用StreamingPro构建一个交互式查询引擎。

    准备工作

    • 下载StreamingPro

    README中有下载地址

    如果你使用了 Spark 2.0 版本,则要下载对应页面上的Spark 安装包。因为目前Spark 2.0 默认支持Scala 2.11。我提供了一个机遇Scala 2.10版本的。

    我们假设您将文件放在了/tmp目录下。

    同时建立一个只有test.json,下里面的内容有:

    {}
    

    假设你把它放在了/tmp目录。

    启动StreamingPro

    Local模式:

    cd  $SPARK_HOME
    
    ./bin/spark-submit   --class streaming.core.StreamingApp \
    --master local[2] \
    --name sql-interactive \
    /tmp/streamingpro-0.4.1-SNAPSHOT-online-1.6.1.jar    \
    -streaming.name sql-interactive    \
    -streaming.job.file.path file:///tmp/test.json \
    -streaming.platform spark   \
    -streaming.rest true   \
    -streaming.driver.port 9004   \
    -streaming.spark.service true
    

    访问

    http://127.0.0.1:9004/sqlui
    

    后可进入查询界面:

    1063603-c281b8eddb8c8b82.png
    Snip20160709_5.png

    目前支持elasticsearch 索引,HDFS Parquet 等的查询,并且支持多表查询。

    除了交互式界面以外,也支持接口查询:

    http://127.0.0.1:9004/runtime/spark/sql
    

    参数支持:

    参数名 示例 说明
    tableName.abc hdfs://cluster/tmp/a.parquet 索引或者parquet路径,其中abc是SQL中的表名称
    sql SELECT count(distinct(mid)) as a ,floor(floor(time/100)/5)5 as b FROM abc group by floor(floor(time/100)/5)5 查询SQL
    loader_clzz.abc org.elasticsearch.spark.sql 驱动类,如果是parquet文件则可简写为parquet
    loader_param.abc.es.nodes node1 不同驱动可能会有自己一些特定的参数,比如es类的需要通过es.nodes传递ES集群在哪

    上面的参数都是成套出现,你可以配置多套,从而映射多张表。

    集群模式:

    cd  $SPARK_HOME
    
    ./bin/spark-submit   --class streaming.core.StreamingApp \
    --master yarn-cluster \
    --name sql-interactive \
    /tmp/streamingpro-0.2.1-SNAPSHOT-dev-1.6.1.jar    \
    -streaming.name sql-interactive    \
    -streaming.platform spark   \
    -streaming.rest true   \
    -streaming.job.file.path file:///tmp/test.json \
    -streaming.driver.port 9004   \
    -streaming.spark.service true
    

    接着进入spark-ui界面获取driver的地址,就可以访问了。

    服务发现

    因为集群模式,driver的地址是变化的,所以一旦集群启动后我们需要注册到某个地方,从而能然前端知道。目前支持zookeeper的方式,在启动命令行中添加如下几个参数:

    -streaming.zk.servers 127.0.0.1:2181 \
    -streaming.zk.conf_root_dir  /streamingpro/jack
    

    之后前端程序只要访问

    /streamingpro/jack/address
    

    就能获取IP和端口了。

    展开全文
  • 处理框架在某种意义上可称为处理引擎,处理框架按照所处理的数据状态分为批处理框架、流式处理框架及交互式处理框架。 批处理框架 一种计算大规模数据集的方法 批处理模式使用的数据集通常有如下特征 有界:数据...

    大数据计算框架

    计算框架

    处理框架在某种意义上可称为处理引擎,处理框架按照所处理的数据状态分为批处理框架、流式处理框架及交互式处理框架。

    批处理框架

    • 一种计算大规模数据集的方法

    • 批处理模式使用的数据集通常有如下特征

      • 有界:数据集是数据的有界集合
      • 持久:数据通常存储在某种类型的持久存储系统中,如HDFS或数据库
      • 大量:通常为海量的数据集
    • 批处理需要访问全体记录才能完成计算工作,因此较费时

    流式处理框架

    • 用来短时间内高容错的处理持续不断,动态产生的数据

    • 流式处理框架一般采用有向无环图((Directed Acyclic Graph, DAG)模型)

      • 图中节点分为两类
      • 1.数据输入点
      • 2.数据计算节点
      • 从外部系统不断传入的实时数据则流经这些节点,把它们串接起来。
    • 三个重要特点

      • (1)完整数据集只能代表截至目前已经进入系统中的数据总量。
      • (2)工作数据集会更加相关,在特定时间只能代表某个单一数据项。
      • (3)处理工作是基于事件的,除非明确停止,否则没有“尽头”。处理结果立即可用,并会随着新数据的抵达继续更新。
    • 此类处理非常适合某些类型的工作负载,有近实时处理需求的任务很适合使用流式处理

    • Apache Storm是一种侧重于极低延迟的流式处理框架,也是要求近实时处理的工作负载的最佳选择。该框架可处理非常大量的数据,提供结果比其他解决方案具有更低的延迟。同时,Spark Streaming也提供这种流式的处理模式。

    交互式处理框架

    • 交互式查询数据,SQL-on-Hadoop指为分布式数据存储提供SQL查询功能。

    (1)MapReduce

    简介

    • MapReduce是Hadoop大数据处理框架的处理引擎
    • MapReduce对历史的批量数据的处理具有很强的优势,且用户能够基于此引擎轻松地编写应用程序,以实现分布式的并行数据处理。
    • MapReduce源于Google公司的一篇论文,它借鉴了分而治之的思想,将一个数据处理过程拆分为主要的Map(映射)和Reduce(化简)两步。

    MapReduce编程的特点

    • 1.开发简单
    • 2.可扩展性强
    • 3.容错性强

    MapReduce的计算模型

    • 简易模型

      • map

        • 将数据抽象为key-value对形式
        • map()函数以key-value对作为函数进行处理,产生一系列新的key-value对作为中间结果输出到本地
        • MapReduce计算框架将这些中间结果按key排列,并将key值相同的数据发给reduce函数处理]
      • reduce

        • reduce()函数以Key和对应的Value的集合作为输入,经过reduce()函数的处理后,产生另一系列Key-Value对作为最终输出,写入HDFS。
      • 在这里插入图片描述

    • MapReduce能够解决的问题有一个共同特点

      • 任务可以被分解成多个子问题,且这些子问题相对独立,彼此之间不会有牵制;待并行处理完这些子问题后,任务便被解决了。

    使用MapReduce的分布式计算框架来分析网站log日志(灰常有效)

    • 一般网站都需要对网站的PV(URL被访问的次数)和UV(URL被不同IP访问的次数)进行数据统计,这些数据来源于网站服务器上的log日志。这些log日志记录其他机器访问服务器的IP、时间、状态码等信息。

      • 在这里插入图片描述
    • 1.输入

      • 因为大型网站的log日志文件非常庞大,先调用MapReduce库将输入的日志文件分成多个数据片段,也称作分片(split)。每个分片的大小默认为64MB。
      • 将每个分片的文本信息转换为初始的<Key,Value>形式,作为Map任务的输入。
      • 在这里插入图片描述
    • 2.map任务

      • 每个分片交给一个Map任务进行处理,Map任务不断地从对应的分片中解析出一个个键-值对,并调用map()函数进行处理,输出新的键值对,并将输出结果存于HDFS中
      • 在这里插入图片描述
    • 3.shuffle处理

      • 亦叫做数据混洗,代表map()函数产生输出到reduce的输入的整个过程
      • 它首先会对所有的Map输出的中间结果按照Key进行合并,Key值相同的合并一起,在此例中是将访问URL的所有访客IP进行合并,然后按Key值的大小进行排序。Map输出的键-值对经过Shuffle操作后形成<Key,list of Value>的键-值对
      • Shuffle是MapReduce的核心,由MapReduce内部代码实现,它的职责是把Map的输出结果有效地传到Reduce。
    • 4.reduce任务

      • 系统根据reduce任务的个数进行分区
      • 在这里插入图片描述
    • 5.输出

      • MapReduce最终结果存于HDFS中,供后续查询分析使用

    MapReduce的资源管理框架

    • 一开始MapReduce不仅完成分布式计算任务,也负责整个Hadoop集群的资源管理和调度,简称MRv1,但MRv1框架具有多计算框架支持不足的缺点,后来专门开发了全新的资源管理框架——YARN(Yet Another Resource Negotiator)。

    • MRv1

      • MRv1计算框架是主从架构,支撑MapReduce计算框架的是JobTracker和TaskTracker两类后台进程

      • 架构

        • 在这里插入图片描述

        • JobTracker

          • 集群主节点负责任务调度和集群资源监控,不参与具体计算
          • 一个Hadoop集群只有一个JobTracker,存在单点故障的可能性,一旦故障,这一集群上所有正在运行的任务将全部失败
          • TaskTracker会通过周期性的Heartbeat信息向JobTracker汇报当前的健康状况和状态
          • JobTracker会根据各个TaskTracker发送过来的Heartbeat信息综合考虑TaskTracker的资源剩余量、作业优先级、作业提交时间等因素,为TaskTracker分配合适的任务。
        • TaskTracker

          • 从节点,主要负责汇报Heartbeat信息和执行JobTracker的命令。
          • 一个集群中可以有多个TaskTracker,但是一个节点上只有一个
          • TaskTracer周期地向JobTracker发送Heartbeat信息后,JobTracker根据Heartbeat信息和当前作业的运行情况为TaskTracker下达命令,包括启动任务、提交任务、杀死任务、杀死作业和重新初始化5种命令
        • Client

      • 弊端

        • (1)槽被设定为Map槽和Reduce槽,因此,某一时刻Map槽或Reduce槽会出现紧缺情况,降低了槽的使用率。
        • (2)不能动态设置槽的数据量,可能会导致一个TaskTracker资源使用率过高或过低。
        • (3)提交的作业是多样化的,如果一个任务需要1GB内存,则会产生资源浪费;如果一个任务需要3GB内存,则会发生资源抢占。
    • YARN

      • 简介

        • 因为MRv1的种种不足,如可靠性差、多计算框架支持不足、资源利用率低,Apache社区着手下一代Hadoop的开发,提出了一个通用的架构——统一资源管理和调度平台,此平台直接导致了YARN和Mesos的出现。
        • YARN的架构也是主从架构
        • 在MRv2中,YARN接管了所有资源管理调度的功能,同时还兼容异构的计算框架
      • 架构

        • 在这里插入图片描述

        • YARN服务由ResourceManager和NodeManager两类进程组成,Container是YARN的资源表示模型,任何计算类型的作业都可以在Container中。YARN是双层调度模型,ResourceManager是中央调度器,ApplicationMaster是YARN的二级调度器,运行在Container中。

        • ResourceManager

          • ResourceManager是集群中所有资源的管理者,负责集群中所有资源的管理。
          • 定期接受各个NodeManager的资源汇报情况并进行汇总和调度
        • NodeManager

          • NodeManager是YARN集群中的单个节点的代理,管理YARN集群中的单个计算节点,负责保持与ResourceManager同步,跟踪节点的健康状况;
          • 管理各个Container的生命周期,监控每个Container的资源使用情况,管理分布式缓存并管理各个Container生成的日志,提供不同的YARN应用可能需要的辅助服务。
        • ApplicationMaster

          • ApplicationMaster是YARN架构中比较特殊的组件,其生命周期随着应用的开始而开始,随着应用的结束而结束。它是集群中应用程序的进程。每个应用程序都有自己专属的ApplicationMaster,不同计算框架如MapReduce和Spark的ApplicationMaster的实现也是不同的。它负责向ResourceManager申请资源,在对应的NodeManager上启动Container来执行任务,同时在应用运行过程中不断监控这些任务的状态。
    • MRv1和YARN的对比

      • 在这里插入图片描述

    (2)Spark

    基本知识

    • Spark是通用的并行计算框架,由加州大学伯克利分校的AMP实验室开发于2009年,并于2010年开源,2013年成长为Apache旗下的大数据领域活跃的开源项目之一。
    • Hadoop用于解决高吞吐、批量处理的业务场景,如5.2节举例的离线计算网站UV,但如果需要实时查看网站浏览量统计信息,Hadoop就无法满足,而Spark通过内存计算能极大地提高大数据处理速度,能够满足此要求。同时,Spark还支持流式计算、SQL查询、机器学习等。

    特点

    • 1.快速处理能力
    • 2.易于使用
    • 3.通用性强
    • 4.可用性高

    spark的生态系统

    • Spark的生态系统以Spark Core为核心,能够读取传统文件、HDFS、HBase等数据源,利用Standalone、YARN、EC2和Mesos等资源调度管理,完成应用程序的分析与处理。这些应用程序来自Spark的不同组件,如Spark shell交互式批处理方式、Spark Streaming的实时流处理应用、Spark SQL的即席查询,MLlib的机器学习、GrapX的图处理等。

    • 在这里插入图片描述

    • Spark Core

      • Spark Core 是整个生态系统的核心组件,是一个分布式大数据处理框架。
    • Spark Streaming

      • Spark Streaming是一个实时流式处理系统,针对实时数据流进行高吞吐、高容错的处理

      • 可以对多种数据源如Kafka、Flume或TCP sockets等进行类似Map、Reduce和Join等复杂操作,并将结果保存到文件系统、数据库或应用程序的实时仪表盘中。

      • Spark Streaming提供的处理引擎和RDD编程模型可以同时进行批处理和流处理,它使用的是数据离散化处理方式,能够进行秒级以下的数据批处理。在Spark Streaming处理过程中,Receiver并行接收数据,并将数据缓存至Spark工作节点的内存中。经过系统延迟优化后,Spark引擎对短任务能够进行批处理。Spark可基于数据的来源及可用资源情况,将任务动态地分配给工作节点

      • 使用离散化流数据使Spark Streaming具有以下优点。

        • (1)动态负载均衡
        • (2)快速故障恢复
      • 在这里插入图片描述

    • Spark SQL

      • 前身为Shark,即HIve on Spark
      • Spark SQL允许开发人员直接处理RDD,也可查询在Hive上存储的外部数据。Spark SQL的一个重要特点是能够统一处理关系表和RDD,使开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。
    • BlinkDB

      • BlinkDB是一个用于在海量数据上运行交互式SQL查询的大规模并行查询引擎,允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
      • BlinkDB采用自适应的优化框架,从原始数据随着时间的推移建立并维护一组多维样本,且遵循动态样本选择策略。
    • MLlib

      • MLlib是Spark提供的机器学习框架。它的目标是让机器学习的门槛更低,让一些可能并不了解机器学习的用户能够使用MLlib。MLlib目前已经提供了基础统计、分类、回归、决策树、随机森林、朴素贝叶斯、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘等多种数理统计、概率论、数据挖掘方面的数学算法。
    • GraphX

      • GraphX是分布式图计算框架。它提供了对图的抽象Graph,Graph由顶点、边及边权值3种结构组成。对Graph的所有操作最终都会转换成RDD操作来完成,即对图的计算在逻辑上等价于一系列的RDD转换过程。目前,GraphX已经封装了最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可自行选择使用。

    Spark的架构与原理

    • 基本架构

      • 在这里插入图片描述

      • (1)Cluster Manager

        • 它是Spark的集群管理器,负责资源的分配与管理。Cluster Manager属于一级分配,它将各个Worker上的内存、CPU等资源分配给应用程序。YARN、Mesos、EC2等都可以作为Spark的集群管理器,如YARN的ResourceManager可以作为Spark的Cluster Manager使用。
      • (2)Worker

        • Worker是Spark的工作节点。对Spark应用程序来说,由Cluster Manager分配得到资源的Worker节点主要负责创建Executor,将资源和任务进一步分配给Executor,并同步资源信息给Cluster Manager。在Spark on YARN模式下,Worker就是NoteManager节点。
      • (3)Executor

        • Executor是具体应用运行在Worker节点上的一个进程,该进程负责运行某些Task,并且负责将数据存到内存或磁盘上,每个应用都有各自独立的一批Executor。Executor同时与Worker、Driver App保持信息的同步。
      • (4)DriverApp

        • 它属于客户端驱动程序,用于将任务程序转化为RDD和DAG,并与Cluster Manager进行通信与调度。
    • 工作流程

      • 在这里插入图片描述
      • (1)构建Spark 应用程序的运行环境,启动SparkContext。
      • (2)SparkContext向资源管理器申请运行Executor资源。
      • (3)资源管理器分配资源给Executor,并由Worker启动Executor。
      • (4)Executor向SparkContext申请Task任务。
      • (5)SparkContext根据应用程序构建DAG。
      • (6)将DAG划分为多个Stage。
      • (7)把每个Stage需要的任务集发送给TaskScheduler。
      • (8)TaskScheduler将Task发送给Executor运行,Executor在执行任务过程中需要与DriverApp通信,告知目前任务的执行状态。
      • (9)Task在Executor上运行完后,释放所有资源。

    Spark RDD的基本知识

    • RDD的定义

      • RDD(Resilient Distributed Datasets)是一种可扩展的弹性分布式数据集,是Spark最基本的数据抽象,表示一个只读、分区且不变的数据集合,是一种分布式的内存抽象。
      • RDD不具备Schema的数据结构,可以基于任何数据结构创建,如tuple(元组)、dict(字典)和list(列表)等。
    • 与RDD类似的分布式共享内存(Distributed Shared Memory,DSM)也是分布式的内存抽象,但两者是不同的。与DSM相比,RDD模型有两个优势:

      • ①RDD中的批量操作在运行时会根据数据存放的位置来调度任务;
      • ②对扫描类型的操作,如果内存不足以缓存整个RDD,就进行部分缓存,避免内存溢出。
      • 在这里插入图片描述
    • RDD的优点

      • (1)Partition(译为分区或分片)是Spark数据集的基本组成单位。
      • (2)在Spark中,RDD的计算是以分片为单位的,每个RDD都会实现计算函数,以达到RDD的转换操作。
      • (3)RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。
      • (4)当前,Spark实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。
      • (5)列表,允许存储/读取每个Partition的优先位置(Preferred location)。
    • RDD操作类型

      • 在Spark编程中,开发者需要编写DriverApp来连接Worker。
      • DriverApp定义一个或多个RDD及相关的行动操作。Worker将经过一系列操作后的RDD分区数据保存在内存中。在Spark中,RDD的操作大致可以分为4类,分别为创建(Create)、转换(Transformation)、控制(Contral)和行动(Action)。
    • RDD的依赖关系

      • RDD只支持粗粒度转换,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

      • 一个RDD包含一个或多个分区,RDD的每个分区分布在集群的不同节点中,每个分区实际是一个数据集合的片段。在构建DAG的过程中,系统会将RDD用依赖关系串联起来。每个RDD都有其依赖,除了最顶级的RDD,这些依赖分为窄依赖和宽依赖。

      • 窄依赖指每个父RDD的分区都至多被一个子RDD的分区使用;而宽依赖指多个子RDD的分区依赖一个父RDD的分区。例如Map、Filter操作是一种窄依赖,而Join、GroupByKey操作属于宽依赖,如图5-11所示。

    • RDD的作业调度

      • 当对RDD执行转换操作时,调度器会根据RDD的“血统”来构建由若干Stage组成的DAG,每个Stage包含尽可能多的连续窄依赖转换。调度器按照DAG的顺序进行计算,并最终得到目标结果。

      • 调度器根据数据存储位置向各节点分配任务,并采用延时调度机制。例如某个任务需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给节点;如果内存不包含该分区,调度器会找到包含该RDD的较佳位置,并把任务分配给其所在节点。

      • 图5-12展示了Spark中的RDD作业调度DAG,其中有A、B、C、D、E、F、G这7个RDD,每个RDD有多个分区。在这个图中,系统根据RDD的宽依赖将整个作业分为3个Stage,其中Stage 2内部的窄依赖则以流水线的形式执行,Stage 1与Stage 2执行完成后执行Stage 3。

    XMind: ZEN - Trial Version

    展开全文
  • 基于Hadoop的SQL一直在被持续地改进...重点介绍它是如何以数量级加速大数据查询,以及在2.0版里面为交互式BI所提供的新特性,包括对雪花模型的支持和流式建立数据立方。 Apache Kylin是什么? Kylin是一个在Hadoop上...

    基于Hadoop的SQL一直在被持续地改进,但是一个查询要等几分钟到几小时还是非常得正常。在这篇博文里,我们将会介绍开源的分布式分析引擎Apache Kylin。重点介绍它是如何以数量级加速大数据查询,以及在2.0版里面为交互式BI所提供的新特性,包括对雪花模型的支持和流式建立数据立方。

    Apache Kylin是什么?

    Kylin是一个在Hadoop上的OLAP引擎。如图1所示,Kylin位于Hadoop之上,向上层的应用提供了基于标准SQL接口的关系型数据。

    为大数据带来交互式的BI

    图1 Apache Kylin的位置。图片由Yang Li友情提供

    Kylin可以处理大数据集,从查询延迟上讲是很快的,这也是它和其他基于Hadoop的SQL的区别。比如,我们所知道的使用Kylin的最大的生产系统实例是在今日头条,一个中国的新闻推送应用。头条有一个包含3万亿条记录的表,对它的平均查询响应时间低于1秒。下一节我们会讨论Kylin是怎么实现这么快的查询。

    Kylin引擎的另一个特点是它可以支持复杂的数据模型。 例如,太平洋保险(CPIC,中国的一个保险集团公司)有一个多达60维的模型。 Kylin提供标准的JDBC / ODBC / RestAPI接口,可实现与任何SQL应用程序的连接。

    Kyligence还开发了一个在线演示,展示了使用10亿条航班记录的BI体验。你可以查看这里来了解。比如,在旧金山国际机场过去20年里延误最久的航班是哪个。(使用用户名“analyst”和密码“analyst”登录,选择“airline_cube”,拖放维度和事实数据来查询这个数据集)

    一个零售业场景:展示Kylin的速度

    Kylin比传统的基于Hadoop的SQL要快,是因为它采用了预计算来在SQL执行前先行一步。例如,设想一个零售业务场景,你需要处理非常多的订单,每个订单包含多个商品。如果想知道订单取消和退货造成的影响,一个分析人员可能需要写一个查询来在某个时间段内按照“returnflas(退货标记)”和“orderstatus(订单状态)”来汇总收入进行汇报,如图2 所示。图里面显示了这个查询被编译成的关系表达式,也叫执行计划。

    为大数据带来交互式的BI

    图2 一个典型的OLAP查询的时间复杂度。图片由Yang Li友情提供

    从这个执行计划可以很容易地看出,这个执行的时间复杂度至少是O(N),这里N是表里的总行数,因为每行都要至少被访问一次。同时我们假定要关联的表都已经很好地被分区和索引过了,因此花费比较大的关联操作也可以在线性的时间复杂度上完成,但在实际场景里这种情况是不大可能的。

    这个O(N)的时间复杂度并不好,因为这意味着如果数据量增长十倍,则查询时间也会慢10倍。现在一个查询需要花1秒钟,那么以后随着数据的增长,这个时间会变成10秒甚至是100秒。我们想要的是无论数据量大小,这个查询时间都是固定不变的。

    Kylin的解决方法是预计算。整体思路是,如果我们提前知道查询的模式,我们就能预先计算出整个执行的一部分。在上面这个例子里,就是预先计算Aggregate、Join和表扫描操作。生成的结果是一个立方体理论里的数据立方(或者可以把它叫做“实例化的总结表”,如果这样听起来觉得更好的话)。

    如图3所示,最初的执行计划就被转换成了基于数据立方之上。这个数据立方体包含了按照“returnflag(退货标记)”、“orderstatus(订单状态)”和“date(日期)”进行汇总的记录。因为退货标记和订单状态是一个固定的数量,而日期范围被限定在3年(大概1000天)。这就意味着这个数据立方体里的行数最多就是“标记数×状态数×天数”,对O定义的时间复杂度来说就是一个常量。这个新的执行计划将会保证不管源数据有多大都有一个固定的执行时间。这就我们想要的效果!

    为大数据带来交互式的BI

    图3. 通过预计算实现从O(N) 到O(1)。图片由Yang Li友情提供

    Kylin的架构一览

    如我们所见,Kylin是一个依赖于预计算的系统。其核心是基于经典的立方体理论,并发展成一个大数据上的SQL解决方案(见图4)。它使用模型和立方体概念来定义预计算的空间。构建引擎从数据源载入数据,并在使用MapReduce或Spark的分布式系统上进行预计算。被计算出来的立方体被存储在HBase里。当一个查询来到时,Kylin把它编译成一个关系表达式,找到匹配的模型,并基于预计算好的数据立方体而不是原始数据执行这个表达式。

    为大数据带来交互式的BI

    图4 Apache Kylin的架构。图片由Yang Li友情提供

    这里的关键就是建模。如果你对数据以及分析的需求有非常好的理解,你是可以用正确的模型和立方体定义来找到正确的预计算。接着,绝大多数(如果不是全部)的查询都可以被转化成对此立方体的查询。如图5所示,执行时间复杂度可以被降低到O(1),从而获得非常快的查询速度,无论原始数据有多大。

    为大数据带来交互式的BI

    图5 O(N) 和O(1)的对比。图片由Yang Li友情提供

    (延展阅读:一个展示Kylin在不同数据量级别上拥有一致的表现的星形模型基准测试。)

    Kylin 2.0的特性

    对雪花模型的支持和TPC-H基准测试

    Kylin 2.0引入了对元数据建模的增强,并且可以支持开箱即用的雪花模型。为了演示建模和SQL能力上的改进,我们进行了用Kylin 2.0运行TPC-H查询的基准测试。

    需要注意的是,这里的目标并不是想与其他人的TPC-H结果进行比较。一方面,根据TPC-H规范,不允许在表之间进行预先计算,因此在这个意义上,Kylin不能算是有效的TPC-H系统。另一方面,我们还没有对这些查询进行性能调优。改善的空间还是很大的。

    根据相同的零售场景,让我们快速查看一些有趣的TPC-H查询。图6是TPC-H查询07。SQL里面的字太小,可能看不清楚,但它给出了查询的复杂性的粗略感觉。该图是执行计划,强调了预计算(白色)与在线计算(蓝色)的部分。很容易看出,大部分关系运算符是预先计算的。剩下的Sort / Proj / Filter在很少的记录上工作,因此查询可以超快。在相同的硬件和相同的数据集上,Kylin用了0.17秒完成,而Hive + Tez对此查询运行了35.23秒。这显示了预计算所带来的差异。

    为大数据带来交互式的BI

    图6 TPC-H的查询07。图片由Yang Li友情提供

    图7所示的TPC-H查询11是另一个例子。这个查询有四个子查询,比前一个更复杂。 其执行计划包括两个分支,每个分支从预先计算的立方体载入数据。 分支结果再连接起来,这是一个复杂的在线计算。随着在线计算部分的增加,预计算的部分减少,Kylin的运行时间更长:3.42秒。 然而,完全在线计算的Hive + Tez仍然要慢一点,运行时间为15.87秒。

    为大数据带来交互式的BI

    图7 TPC-H的查询11。图片由Yang Li友情提供

    (有关Kylin和TPC-H的更多详细信息,请参阅此链接。此链接包含可以在你自己的环境中重复基准测试的步骤,以及我们在两个不同Hadoop集群中测试的所有TPC-H查询的性能结果。)

    为近实时分析准备的流式立方体

    预先计算给ETL流程增加了额外的时间,这在实时场景中会成为一个问题。为了解决这个问题,Kylin现在支持增量加载新添加的数据,而不会影响历史数据。 已有RestAPI可用于自动触发增量构建。每日构建一次是最常见的,现在更频繁的数据加载也是可行的。

    从1.6版开始,Kylin可以直接从Kafka获取数据,并进行近乎实时的数据分析。使用基于内存的立方体算法,微型增量构建可以在几分钟内完成。生成的结果是许多小的立方体分片,可以给查询提供实时的结果。

    为了展示这个特性是如何运作的,我们构建了一个演示网站来实时分析Twitter消息。它运行在一个八个节点的AWS集群上,有三个Kafka broker。输入是一个Twitter样本源,每秒有超过10K条消息。立方体的平均复杂度是:九个维度和三个测量数据。增量构建是每两分钟触发一次,并在三分钟内完成。总体而言,系统在实时性上有五分钟的延迟。

    为大数据带来交互式的BI

    图8 近实时的Twitter分析。图片由Yang Li友情提供

    该演示按照语言和设备维度显示了Twitter消息的趋势。在图8中可以看到,美国白天的英文消息量上升,而亚洲语言的消息量在夜间下降。演示里还有一个标签云,用以显示最新的热门话题。在标签云下面是最热门标签的趋势。所有图表都是实时到最近五分钟。

    总结

    Apache Kylin是Hadoop上一个流行的OLAP引擎。通过使用预计算技术,它可以使SQL对大数据的查询速度有数量级的加快,并使交互式BI和其他在线应用程序能够直接在大数据上运行。

    Kylin 2.0是最新版本,可以在这里下载。新版本的特性包括:Hadoop上的小于秒级的SQL延迟;雪花模型的支持和成熟的SQL功能;流式立方体用于近实时分析;内置时间/窗口/百分位数功能;和可以将构建时间缩短一半的Spark构建立方体。



    本文转自d1net(转载)

    展开全文
  • 简介:Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的 DataStream 接口或者标准 SQL 语言来快速实现一个分布式高可用的流式应用,通过内部的 Java JIT、off-heap 内存管理等技术...

    简介: Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的 DataStream 接口或者标准 SQL 语言来快速实现一个分布式高可用的流式应用,通过内部的 Java JIT、off-heap 内存管理等技术优化性能,并且有完整的 Source、Sink、WebUI、Metrics 等功能集成,让 Flink 几乎成为了流式计算的事实标准。

    作者:田志声

    前言

    Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的 DataStream 接口或者标准 SQL 语言来快速实现一个分布式高可用的流式应用,通过内部的 Java JIT、off-heap 内存管理等技术优化性能,并且有完整的 Source、Sink、WebUI、Metrics 等功能集成,让 Flink 几乎成为了流式计算的事实标准。

    但是当处理海量数据的时候,很容易出现各种异常和性能瓶颈,这时我们需要优化系统性能时,常常需要分析程序运行行为和性能瓶颈。Profiling 技术是一种在应用运行时收集程序相关信息的动态分析手段,常用的 JVM Profiler 可以从多个方面对程序进行动态分析,如 CPU、Memory、Thread、Classes、GC 等,其中 CPU Profiling 的应用最为广泛。CPU Profiling 经常被用于分析代码的执行热点,如“哪个方法占用 CPU 的执行时间最长”、“每个方法占用 CPU 的比例是多少”等等,通过 CPU Profiling 得到上述相关信息后,研发人员就可以轻松针对热点瓶颈进行分析和性能优化,进而突破性能瓶颈,大幅提升系统的吞吐量。

    本文介绍我们在做性能优化常用的火焰图以及为如何集成火焰图到通用的 Flink 作业中。

    火焰图介绍

    火焰图是《性能之巅》作者以及 DTrace 等一系列 Linux 系统优化工具作者 Brendan Gregg 大神的作品之一,可以非常清晰地展示应用程序的函数调用栈以及函数调用时间占比,基本原理是通过各种 agent 在程序运行时采样并输出日志,使用 FlameGraph 工具把日志提取出来输出可在浏览器交互式查看的 SVG图片。

    Uber 开源了 jvm-profiler 项目,介绍如何为 Spark 应用和 Java 应用添加火焰图支持,但是目前 Flink 社区和 jvm-profiler 官网都还没有相关的使用教程。

    1.jpg

    实际上基于 JVM 的程序都可以使用这个工具,本文将基于 jvm-profiler 来介绍如何生成 Flink 作业的火焰图。

    下载和编译 jvm-profiler

    git clone git clone https://github.com/uber-common/jvm-profiler.git
    
    mvn clean install -DskipTests=true -Dcheckstyle.skip -Dfast -T 8C

    编译好了之后,将项目 target 目录下的 jvm-profiler-1.0.0.jar 复制一份到 flink 的 lib 目录下面。

    cp target/jvm-profiler-1.0.0.jar /usr/local/flink-1.11.1/lib

    下载 FlameGraph

    由于 jvm-profiler 支持生成火焰图需要的日志文件,将日志转化成交互式 SVG 图片还是使用 Brendan Gregg 的FlameGraph 工具。

    git clone https://github.com/brendangregg/FlameGraph.git

    下载项目源码即可,后面会使用 flamegraph.pl 工具来生成图片文件。

    配置 Flink

    对于 Flink 应用,我们只需要在 TaskManager 中注入打点的 Java agent 即可,这里测试,我就使用本地 standalone 模式,修改 Flink conf 目录下的 flink-conf.yaml 文件,添加一下如下配置:

    env.java.opts.taskmanager: "-javaagent:/usr/local/flink-1.11.1/lib/jvm-profiler-1.0.0.jar=sampleInterval=50"

    目前最小的采样间隔就是 50 毫秒,然后启动集群和运行一个 Flink 作业:

    ./bin/start-cluster.sh
    
    
    //运行一个作业
    ./bin/flink run ./examples/streaming/StateMachineExample.jar

    运行之后可以看到 TaskManager 的 stdout 里面打印如下:

    2.jpg

    因为已经注入 Java agent,因此在标准输出中会定期添加火焰图所需要的打点数据,然后使用下面的命令提取相关日志,并且使用 jvm-profiler 和 FlameGraph 提供的工具来生成 SVG 图片文件。

    //1、提取 stdout 文件中的相关日志
    
    cat log/flink-zhisheng-taskexecutor-0-zhisheng.out | grep "ConsoleOutputReporter - Stacktrace:" | awk '{print substr($0,37)}' > stacktrace.json
    
    
    //2、在 jvm-profiler 目录下执行下面命令
    
    python ./stackcollapse.py -i /usr/local/flink-1.11.1/stacktrace.json > stacktrace.folded
    
    
    //3、在 FlameGraph 目录下执行下面命令生成 SVG 图片
    
    ./flamegraph.pl /Users/zhisheng/Documents/github/jvm-profiler/stacktrace.folded > stacktrace.svg

    然后用浏览器打开刚才生成的 SVG 图片就可以看到火焰图信息。

    3.jpg

    总结

    本文主要目的在于教大家如何利用 jvm-profiler 去生成 Flink 作业的运行火焰图,这样可以在遇到性能瓶颈问题的时候会很方便大家去定位问题,关于如何去读懂生成的火焰图,后面可以再分享系列文章。

     

    原文链接
    本文为阿里云原创内容,未经允许不得转载。

    展开全文
  • Bokeh是一款针对现代Web浏览器呈现功能的交互式可视化库。Bokeh通过Python(或其他语言)以快速简单的方式为超大型或流式数据集提供高性能交互的漂亮而且结构简单的多功能图形。 为了提供高级自定义所需的简单性和强大...
  • 我们现在在切页面布局的使用常用的单位是px,这是一个绝对单位,webapp的屏幕适配有很多中做法,例如:流式布局、限死宽度,还有就是通过响应来做,但是这些方案都不是最佳的解决方法。 1、流式布局: 例如流式...
  • 允许同时以低内存/ CPU占用空间将SSE流式传输到众多客户端 使用提供的Docker映像,Heroku()或 ()易于部署 可以在生产中服务资产 保留有关已定义作业的统计信息,并随附一个仪表板以对其进行监视() 可以对...
  • 大数据处理系统可分为批式(batch)大数据和流式(streaming)大数据两类。...基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间。基于实时数据流的数据处理(streaming d
  • 和UDP这种“滚珠”的协议不同(一份数据就是一个udp packet),TCP以报文段的方式传递数据,其大小受网络链路的限制。在SYN报文段中互相通告最大报文段长(MSS)。所以业务层交付的数据,会被TCP拆分/合并为合适的...
  • 点击上方 "zhisheng"关注,星标或置顶一起成长Flink 从入门到精通系列文章前言Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的DataStream 接口或者标准 SQL 语言来快速实现一个...
  • 简介Flink是目前最流行的大数据及流式计算框架之一,用户可以使用Java/Scala/Python的DataStream接口或者标准SQL语言来快速实现一个分布式高可用的流式应用,通过内部的Java JIT、off-heap内存管理等技术优化性能,...
  • 本次分享主要分为以下四个...EMR下层可以访问各种各样的存储,比如对象存储OSS、集群内部自建的HDFS以及流式数据等。用户可以利用EMR处理海量数据和进行快速分析,也能够支持用户在上面做机器学习以及数据清洗等工作...
  • 前言Flink 是目前最流行的大数据及流式计算框架之一,用户可以使用 Java/Scala/Python 的 DataStream 接口或者标准 SQL 语言来快速实现一个分布式高可用的流式应用,通过内部的 Java JIT、off-heap 内存管理等技术...
  • 2)如何看待微信公众号阅读方式从瀑布流式退回到原来卡片阅读的情况? 1)不能。 朋友圈是作为“发现”的子功能。发现社交不仅仅是朋友圈的动态,还有一系列如陌生人社交的附近的人、漂流瓶;如目前基于社交关系...
  • 推出 KSQL 是为了降低流式处理的门槛,为处理 Kafka 数据提供简单而完整的可交互式 SQL 接口。KSQL 目前可以支持多种流式操作,包括聚合(aggregate)、连接(join)、时间窗口(window)、会话(session),等等。
  • 交互式监视模式下启动测试运行程序。 有关更多信息,请参见关于的部分。 npm run build 构建生产到应用程序build文件夹。 它在生产模式下正确捆绑了React,并优化了构建以获得最佳性能。 最小化构建,文件名...
  • KSQL降低了进入流处理的门槛,提供了一个简单的、完全交互式的SQL接口,用于处理Kafka的数据。你不再需要用Java或Python这样的编程语言编写代码了!KSQL是开源的(Apache2.0许可)、分布式的、可扩展的、可靠的和实时...
  • SparkStreaming是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业。本文阐释了SparkStreaming的架构及编程模型,并结合实践对其核心技术进行了深入的...2.基于历史数据的交互式查询(interactiveque
  • KSQL:Apache Kafka的开源流式SQL

    千次阅读 2017-09-08 17:28:22
    原文:Neha Narkhede 作者:Madison Moore 翻译:lloog 译者注:作者从好处、特点、下...KSQL降低了进入流处理的门槛,提供了一个简单的、完全交互式的SQL接口,用于处理Kafka的数据。你不再需要用Java或Pytho...
  • 推出 KSQL 是为了降低流式处理的门槛,为处理 Kafka 数据提供简单而完整的可交互式 SQL 接口。KSQL 目前可以支持多种流式操作,包括聚合(aggregate)、连接(join)、时间窗口(window)、会话(session),等等。 ...
  • 和 Spark 类似,两者都希望提供一个统一功能的计算平台给用户,都在尝试建立一个统一的平台以运行批量,流式交互式,图处理,机器学习等应用。 1.1部署模式 Flink 集群的部署,本身不依赖 Hadoop 集群,如果...
  • sed命令总结

    2015-07-18 13:53:21
    sed是一种非交互式流式编辑器,是英文stream editor的缩写。其工作原理为把当前处理的行存储在临时缓冲区中,称为“模式空间”(pattern space),接着用sed命令处理缓冲区中的内容,处理完成后,把缓冲区的内容送往...
  • 无论是做linux下的驱动开发,还是Windows CE平台下的驱动开发,对流驱动模型一定非常熟悉。在此架构下,把所有的硬件设备都看成文件,和设备的交互其实就是读写文件,也就是数据流动。这样开发驱动不仅简单实用,...

空空如也

空空如也

1 2 3 4 5 ... 17
收藏数 332
精华内容 132
关键字:

交互式流式