streaming_streamingassets - CSDN
精华内容
参与话题
  • Streaming简介

    2013-07-24 11:27:17
    Streaming简介  Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义重大,今天简单说一下。 Streaming的原理是用Java实现一个...

    Streaming简介 

    Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义重大,今天简单说一下。

    Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用MapReduce Java接口获取key/value对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成key/value对输出。 

     

    Streaming优点

    1 开发效率高,便于移植

    只要按照标准输入输出格式进行编程,就可以满足hadoop要求。因此单机程序稍加改动就可以在集群上进行使用。 同样便于测试

    只要按照 cat input | mapper | sort | reducer > output 进行单机测试即可。

    如果单机测试通过,大多数情况是可以在集群上成功运行的,只要控制好内存就好了。

        2 提高程序效率

    有些程序对内存要求较高,如果用java控制内存毕竟不如C/C++。

    Streaming不足

        1 Hadoop Streaming默认只能处理文本数据,无法直接对二进制数据进行处理 

        2 Streaming中的mapperreducer默认只能向标准输出写数据,不能方便地处理多路输出 

    具体参数介绍

     

    -input    <path>

    输入数据路径

    -output   <path>

    输出数据路径

    -mapper  <cmd|JavaClassName>

    mapper可执行程序或Java类

    -reducer  <cmd|JavaClassName>

    reducer可执行程序或Java类

    -file            <file>        Optional

    分发本地文件

    -cacheFile       <file>        Optional

    分发HDFS文件

    -cacheArchive   <file>         Optional

    分发HDFS压缩文件

    -numReduceTasks  <num>    Optional

    reduce任务个数

    -jobconf | -D NAME=VALUE    Optional

    作业配置参数

    -combiner <JavaClassName>   Optional

    Combiner Java

    -partitioner <JavaClassName>  Optional

    Partitioner Java

    -inputformat <JavaClassName> Optional

    InputFormat Java

    -outputformat <JavaClassName>Optional

    OutputFormat Java

    -inputreader <spec>            Optional

    InputReader配置

    -cmdenv   <n>=<v>           Optional

    传给mapper和reducer的环境变量

    -mapdebug <path>             Optional

    mapper失败时运行的debug程序

    -reducedebug <path>           Optional

    reducer失败时运行的debug程序

    -verbose                      Optional

    详细输出模式

     

     下面是对各个参数的详细说明:

    -input <path>:指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入。

    -output <path>:指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次。

    -mapper:指定mapper可执行程序或Java类,必须指定且唯一。

    -reducer:指定reducer可执行程序或Java类,必须指定且唯一。

    -file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件

    -numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。

    -jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml特别建议-jobconf mapred.job.name='My Job Name'设置作业名,使用-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级,使用-jobconf mapred.job.map.capacity=M设置同时最多运行Mmap任务,使用-jobconf mapred.job.reduce.capacity=N设置同时最多运行Nreduce任务。

    常见的作业配置参数如下表所示: 

    mapred.job.name

    作业名

    mapred.job.priority

    作业优先级(-jobconf mapred.job.queue.name=<queue_name> mapred.job.priority=<VERY_LOW|LOW|NORMAL|HIGH|VERY_HIGH>)

    mapred.job.map.capacity

    最多同时运行map任务数

    mapred.job.reduce.capacity

    最多同时运行reduce任务数

    hadoop.job.ugi

    作业执行权限

    mapred.map.tasks

    map任务个数

    mapred.reduce.tasks

    reduce任务个数

    mapred.job.groups

    作业可运行的计算节点分组

    mapred.task.timeout

    任务没有响应(输入输出)的最大时间

    mapred.compress.map.output

    map的输出是否压缩

    mapred.map.output.compression.codec

    map的输出压缩方式

    mapred.output.compress

    reduce的输出是否压缩

    mapred.output.compression.codec

    reduce的输出压缩方式

    stream.map.output.field.separator

    map输出分隔符

     -combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发。

    -partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitonerIntHashPartitioner

    -inputformat, -outputformat:指定inputformatoutputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormatOutputFormat接口。如果不指定,默认使用TextInputFormatTextOutputFormat

    -cmdenv NAME=VALUE:给mapperreducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。

    -mapdebug, -reducedebug:分别指定mapperreducer程序失败时运行的debug程序。

    -verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。

    展开全文
  • Streaming 102批处理之外的流处理世界

    千次阅读 2017-09-18 17:55:51
    Steaming 101介绍了基本的术语,有限数据(bounded)VS无限数据(unbounded),然后是批处理和流处理的区别,在介绍完术语之后,阐述了事件时间和处理时间这两个重要概念,在Steaming 102中增加了3个新的概念:Watermark...

    简单回顾和路线图

          在Streaming 101中,首先澄清了一些术语,介绍了有限数据 VS无限数据。有限数据源具有有限的大小,通常被称为“批处理”数据。无限数据源源可能具有无限大小,通常被称为“流”数据。在后边将会尽量避免使用批处理和流式传输来修饰数据源,因为这些名称带有一些令人误解和限制性的含义。

          然后,解释了批处理和流处理引擎之间的区别:批处理引擎是设计优先考虑有限数据(现在批处理引擎提供了micro-batch的方式处理流式数据),而流处理引擎设计用于处理无限数据。目标是在描述执行引擎时使用批处理和流处理。

          定义完术语之后,介绍了与处理有限数据有关的两个重要的基本概念。首先阐述事件时间(事件发生的时间)和处理时间(数据在系统中被处理的时刻)之间的关键区别。这为Streaming 101中提出的主要论文提供了基础:如果关心事件实际发生时间,则必须基于事件的事件时间,而不是处理时间。

          接下来介绍了窗口的概念(即沿时间边界切分数据集),这是一种常用的方法,用于应对无限数据源的数据处理,无限数据源理论上永远不会结束。窗口策略的最常见且简单的例子是固定和滑动的窗口,更复杂的窗口类型,例如会话窗口(其中窗口由数据本身的特征决定,捕获每个用户的活动会话窗口,会话窗口之后紧接着是用户的不活动期)也比较广泛的用法。

          除了前文中介绍的概念,现在再介绍3个新的概念:

    • • Watermark

          Watermark是相对于事件时间的输入完整性的概念。Watermark表示一个时间X,表示所有事件时间<X的所有数据到到齐了。因此,当处理无限数据源时,Watermark作为进度的度量。

    • • 触发器

          触发器是一种由外部条件触发,来表明何时计算窗口结果的机制。触发器可以让我们灵活的选择何时计算结果并发送给下游,而且随着数据的不停的到来,窗口的可以产生多次输出。所以,窗口结束前可以先提供近似结果,并且能够灵活应对上游数据的变化(可能是上游发送的数据修正)或者数据延迟到达(例如,移动场景在某人的离线时,某人的电话记录了各种动作及其事件时间,然后在重新获得连接时继续上传这些事件进行处理)。

    • • 累积

          累积模式指定在同一窗口中观察到的多个结果之间的关系。这些结果可能完全相互之间完全独立,或者它们之间可能存在重叠。不同的累积模式具有不同的语义和与计算成本,适用于不同的场景。

          最后,在回答无限数据处理中的4个问题时,更容易搞清楚这些概念和它们之间的关联关系:

    • What 计算的结果是什么?
            Pipeline中的转换来决定结果。例如计算总和,构建直方图,训练机器学习模型等等。它也是经典批处理回答的问题。

    • Where 在事件时间中的哪个位置计算结果?
            这个问题是通过在Pipeline中使用事件时间窗口来回答的。这包括从Streaming 101(固定,滑动和会话)窗口的常见示例,似乎没有窗口概念的用例(例如,Streaming 101中描述的时间不可知处理;经典批处理也通常属于此类别)和其他更复杂的窗口类型,如时间有限的拍卖。还要注意,它可以包括处理时间窗口,如果在记录到达系统时将入口时间指定为记录的事件时间。

    • When 在处理时间中的哪个时刻触发计算结果?
            通过使用Watermark和触发器来回答的这个问题。这个主题有无穷的变化,但最常见的模式是在给定窗口的输入完成时使用Watermak来描绘,触发器允许提前计算结果(对于在窗口完成之前发出的推测性的、部分的结果)和延迟计算结果(Watermark只是预估窗口的数据全部到达,并不是100%确定,在Watermark声明给定窗口的全部到达之后,也有可能会有隶属于该窗口的数据到达)。

    • How 如何修正结果?
            这个问题由所使用的累积类型回答:丢弃(其中结果是相互独立和不同的),累加(后来的结果建立在先前的结果上),累加和撤销(当前的累加值和上次触发的值撤销一起发送)。

          后边将一一讨论这些问题,试图让大量清楚哪些概念与What/Where/When/How中的哪个问题有关。

    Streaming 101 回顾

          首先,回顾一下Streaming 101中提出的一些概念,这一次还将提供一些具体的例子使这些概念更具体。

    What:Transform(变换)

          经典批处理中Transform解决了以下问题:“要计算什么结果?”许多人可能已经熟悉经典的批处理,所以我们将以它为基础,添加所有其他概念,更便于理解。

          对于这一部分,我们来看一个示例:计算由10个整数值组成的简单数据集中的数的总和。这么说有点抽象,在实际中,可以想象一个游戏,10个人组成一个团队,每个人的最终得分相加,就是团队的成绩。也可以想象计费和使用情况的监控使用情况这样的场景。

          对于每个示例,将包括一个简短的Dataflow Java SDK伪代码片段,以使Pipeline的定义更具体。因为是伪代码,所以有时会省略细节(如使用具体的I / O源)、使用简称(Java中的当前触发器名称太冗长)。除了这些(大部分我在Postscript中明确列举)的小事情之外,其它基本上是真实的Dataflow SDK代码。稍后还将提供一个链接到实际的代码演练,可以编译和运行自己的类似例子感兴趣的人,可以实际尝试一下。
          如果熟悉像Spark Streaming或Flink这样的计算引擎,那么在看Dataflow示例代码的时候就会容易一些。接下来开始让人崩溃的旅程,在Dataflow中有两个基本的原语:

    • PCollection

          表示可以由PTransfrom并行处理的数据集,(因此名称开始处的“P”),可以是任意规模的数据集。

    • PTransform转换

          处理PCollection并创建新的PCollection。 PTransform可以执行元素转换,它们可以将多个元素聚合在一起,或者它们可以是其他PTransform的组合。

    这里写图片描述
    图1 Transform变换的类型

          如果觉得不甚明白,或者只是想要参考,可以看Google Dataflow Java SDK文档。

          为了对例子说明,假设我们从一个PCollection <KV <String,Integer >>命名为“input”(即由一个键/值对的字符串和整数组成的PCollection,其中字符串是类似团队名称,整数是相应团队中个人的得分)。在现实世界的Pipeline中,通过从I / O源读取PCollection原始数据(例如日志记录)获得输入,将日志记录解析为适当的键/值对,然后将其转换为PCollection <KV <String,Integer >> 。为了在第一个例子中清楚起见,将包括所有这些步骤的伪代码,但是在随后的例子中,删除了I / O和解析部分。

          因此,对于简单地从I / O源读取数据的管道,解析出团队/分数键值对,并计算每队的得分数,代码如下:

    PCollection<String> raw = IO.read(...);
            PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
            PCollection<KV<String, Integer>> scores = input
              .apply(Sum.integersPerKey());

    列表1.计算总和Pipeline。从数据源读取键值对数据,键是String类型(团队名称),值是Integer类型(团队中各人得分)。然后对每个键(团队)计算总和(团队得分),并输出。

          对于所有的示例,在每个示例的Pipeline的代码片段之后,有1个该Pipeline执行的动画示例。动画中看到一个Key(即一个团队)的10条输入数据执行Pipeline的样子;在一个实际的Pipeline中,类似的操作将有成千上万个,在很多台机器上并行执行。为了能清晰的说明,示例中演示了1个Key的执行过程。

          动画中每个点代表一个输入或输出数据,输入和输出都有两个时间维度:事件时间(X轴)和处理时间(Y轴)。因此,Pipeline按照处理时间维度执行,从下向上延伸,如加粗的上升白线所示。输入是圆圈,圆圈内的数字代表该特定记录的值。输入开始是灰色圆圈,随着Pipeline处理变换颜色。

          当Pipeline处理到某一个值的时候,会将其累加并记录在State中,并最终将聚合结果作为输出。State和输出由矩形表示,矩形顶部不断变化的数字表示累加值,矩形覆盖的区域表示到当前处理时刻,所有矩形中的数据已经被处理。对于清单1中的Pipeline,在经典批处理引擎上执行时,会看起来像这样(请注意,点击下面的图片启动动画,然后会一直循环播放):

    c9ce5cefab8d16db487342717cee477acffa7dfe.jpg?image_play_button_size=2x&image_crop_resized=960x594&image_play_button=1&image_play_button_color=7b796ae0

    图2.经典的批处理

          上边是在批处理引擎上执行Pipeline,累加输入的数据,直到所有输入处理完毕(由顶部的虚线绿色线表示),此时输出为51。在此示例中,因为没有应用任何特定的窗口,所以在事件时间维度上计算了整个数据集的总和; 因此,用于累加的State和输出的矩形覆盖X轴的整体。 但是,如果要处理一个无限数据源,那么经典批处理将是不够的,不能等待输入结束,因为它实际上永远不会结束。 所以需要的使用在Streaming 101中提到的一个概念是—窗口。因此,想要回答第二个问题“在事件时间的哪个位置计算结果?”,现在先简要回顾一下窗口。

    Where: 窗口

          窗口化是把数据在时间上进行切分。常见的窗口策略有:固定窗口、滑动窗口、会话窗口。
    这里写图片描述
    图3. 窗口类型示例,每个窗口类型中包含了3个key,图中呈现了对齐的窗口(应用在整个数据集上)和非对齐窗口(应用在数据子集上).

          为了更好地了解实际中的如何使用窗口,来看一下的上边提到的整数求和Pipeline,使用了长度为2分钟的时间窗口。 使用Dataflow SDK,只需要简单的添加Window.into Transform变化即可(以蓝色文本突出显示):

    PCollection<KV<String, Integer>> scores = input
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
      .apply(Sum.integersPerKey());

    列表2. 使用窗口的求和代码

          回想一下,Dataflow提供了一种统一的批处理和流处理模型,因为语义上,批处理只是流处理的一个子集。 因此,首先在批处理引擎上执行此Pipeline; 在批处理引擎上执行此Pipeline,原理简单明了,可以很好的跟在流处理引擎上执行做对比。

    8f42fed76aca326248ee220b2d0e6daece412c20.jpg?image_play_button_size=2x&image_crop_resized=960x594&image_play_button=1&image_play_button_color=7b796ae0

    图4在批处理引擎上执行基于窗口的求和

          如前所述,输入在不断累加存储在State中,直到输入被完全处理,然后产生输出。 然而,在这种情况下,不是1个输出,而是4个:在时间上切分成了4个事件时间窗口,每个窗口产生一个输出。

          到此为止,我们重新回顾了Streaming 101中引入的两个主要概念:事件时间和处理时间之间的关系以及窗口。 再进一步,开始阐述本节开头提到的新概念:Watermark,触发器和累积。 下边开始Streaming 102。

    Streaming 102

          在上边我们看到了在批引擎上执行使用了窗口的Pipeline。 理想情况下,我们希望具有较低的处理延迟,同时也希望能涵盖对无限数据集的处理。 切换到流式引擎是朝着正确方向迈出的一步。批处理引擎可以明确的知道,每个窗口的数据是否完整了(即,有限数据集中所有的数据都被处理了),但目前缺乏确定无限数据集的完整性的实用方法。 接下来介绍Watermark。

    When: Watermark

          Watermark是问题答案的前半部分:“在处理时间维度上,什么时候计算窗口的结果?”

          Watermark是事件时间中输入完整性的时间概念。 换句话说,它们是系统根据当前处理的数据的事件时间判断处理进度和完整性的方法(有限或无限数据集,在无限数据的情况下作用更为明显)。

          回想一下Streaming 101中这个图,这里稍作修改,其中描述了事件时间和处理时间之间的偏差,大多数真实世界的分布式数据处理系统中时间偏差一直在变化。

    这里写图片描述
    图5. 事件时间进度, 偏差, 和watermark

          图中表示现实(reality)的曲折红线本质上是Watermark。随着处理时间的推移,它跟踪事件时间完整性的进度。在概念上,可以将Watermark视为函数F(P) - > E,在处理时间中选取一个点,返回事件时间的一个点。 (更准确地说,对函数的输入实际上是在Pipeline中观察到Watermark的点上游的一切的当前状态:输入源,缓冲数据,正在处理的数据等;但在概念上,将其视为从处理时间到事件时间的映射更简单。)事件时间点E是系统认为事件时间小于E的所有数据都到齐了。换句话说,这是一个断言,不再有更多的事件时间少于E的数据。根据Watermark的类型:完美的或启发式的,这种断言可能是严格的保证或经过训练的猜测:

    • 理想Watermark

      在完全了解所有输入数据的情况下,可以构建理想的Watermark; 在这种情况下,没有延迟数据; 所有数据都提前或准时到达。

    • 启发式Watermark

      对于许多分布式输入源,完全了解输入数据是不切实际的,在这种情况下,最佳选择是提供启发式水印。 启发式Watermark使用任何有关输入的信息(分区,分区内排序,文件增长率等),以提供尽可能准确的进度估计。 在许多情况下,启发式Watermark可以预测的非常准确。 即使如此,使用启发式Watermark意味着它有时可能是不正确的的,这将导致有些数据被划分为延迟数据。 我们将在下面的触发器部分中了解如何处理延迟数据。

          Watermark是一个有趣和复杂的话题,未来会写一篇新的文章专门阐述。 现在,为了更好地了解Watermark的作用以及缺点,我们来看一下使用Watermark的流处理引擎的两个例子,以确定何时在清单2中执行使用窗口的Pipeline时实现输出。 左边的例子使用理想的Watermark; 右边的一个使用启发式Watermark

    1ca8c3335e912cd17061ca15889d2e1c27098de2.jpg?image_play_button_size=2x&image_crop_resized=960x344&image_play_button=1&image_play_button_color=7b796ae0

    图6. 在流处理引擎上使用理想(左)和推测(右)Watermark进行基于窗口的求和

          在这两种情况下,当Watermark通过窗口的末尾时,窗口被触发计算。两个执行的主要区别在于右侧的Watermark计算中使用的启发式算法,值9因为迟到的问题而没有被计算在内,这大大改变了水印的形状[3]。这些例子突出了Watermark的两个缺点,具体如下:

    • 太慢
            当任何类型的Watermark,由于已知的未处理数据(例如,由于网络带宽约束而缓慢增长的输入日志)被正确地延迟时,如果结果的计算只依赖Watermark的触发,将直接导致输出结果的延迟。
            这在左图中是最明显的,即迟到的9阻止所有后续窗口的Watermark,即使这些窗口的输入数据已经更早的达到并且是完整的了。第二个窗口[12:02,12:04]尤其明显,从窗口中的第一个值到达到窗口计算并输出结果的时间需要将近七分钟。这个例子中的启发式Watermark要稍微好一点(五分钟直到输出),但这不意味着启发式Watermark从来不会受到其他Watermark滞后的影响;在本例子选择了特殊的数据突出了这种对比。
            这里的重点在于:Watermark提供了一个非常有用的完整性概念,从延迟的角度来看,只考虑完整性是不够的。想象一下一个仪表板,显示重要的指标,按小时或天显示。我们不太可能想等待一整个小时或一天才能查看当前窗口的结果;这是使用经典批处理系统为这种系统提供数据的痛点之一。相反,随着输入的演变和最终的完成,这些窗口的结果会随着时间的推移而持续并不断的更新更好一些。

    • 太快
            当一个启发式Watermark比实际的Watermark更快的向前推进时,会导致原来没有延迟的数据变成了延迟数据。这是在右边的例子中发生的情况:在第1个窗口的输入数据全部到达之前,Watermark进入第1个窗口的末尾,导致错误的输出值为5而不是14。这个缺点是严格的启发式Watermark的问题;既然是启发式就意味着有时会是错误的。因此,如果关心正确性,单纯依靠Watermark确定何时计算结果并发出是不够的。

          在Streaming 101中,对关于无限数据流的强大的无序处理不足的完整性概念作了一些强调。Watermark太慢或太快,是这些论据的基础。完全依赖于完整性概念的系统无法同时获得低延迟和正确性。触发器是解决这些缺点解决方案。

    When:触发器

          触发器是问题的另一半答案:“在处理时间维度上,什么时候该计算窗口的结果并发出?”触发器用来表明在处理时间维度上的哪个时刻该触发窗口计算结果(尽管触发器本身可能会根据其他时间触发,例如在事件时间维度上使用Watermark)。 窗口的每个特定输出都称为窗口的窗格(Pane)。

          用于触发的信号的示例包括:

    • Watermark进度
      即事件时间进度,是图6中我们已经看到的隐式版本,当Watermark通过窗口的末尾时,计算结果并输出[4]。另一个用例是在窗口的生命周期末尾时触发垃圾回收,稍后我们将看到一个例子。

    • 处理时间进度
      对于提供定期的定期更新是有用的,因为处理时间(不像事件时间)总是均匀地,没有延迟地演进。

    • 元素计数
      在窗口中累积有n条数据之后触发。

    • 标点或其他依赖于数据的触发器
      其中记录的一些记录或特征(例如,EOF元素或刷新事件)指示应当生成输出。

          除了基于具体信号触发的简单触发器之外,还有复合触发器,允许创建更复杂的触发逻辑。示例复合触发器如下:

    • Repeat重复触发器
      重复触发器特别适用于处理时间触发器以提供定期更新。

    • AND触发器(逻辑AND)
      所有子触发器都符合触发条件才触发(例如,在Watermark通过窗口结束之后,我们观察到一个终止的标点符号记录),它才会触发。

    • Or触发器(逻辑或)
      子触发器中的任何一个符合触发条件都会引起触发(例如,在水印通过窗口结束之后或我们观察到终止的标点符号记录)时。

    • Sequence触发器
      以子触发器按照预定义的顺序触发子依次触发。

          为了使触发器的概念更具体,继续介绍图6中使用的隐式默认触发器,将其添加到清单2中的代码中:

    PCollection<KV<String, Integer>> scores = input
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                   .triggering(AtWatermark()))
      .apply(Sum.integersPerKey());

    清单3. 明确设定默认触发器

          对触发器能够做什么有了基本的了解,可以开始考虑解决Watermark太慢或太快的问题。在这两种情况下,我们希望在Watermark超过窗口末尾之前或之后能够有机会计算窗口的结果,并能够提供持续更新的机制(除了Watermark超过窗口末尾那一刻)。所以,需要一些多次触发触发器。那么问题就变成了:多次触发用来干什么?

    • 在太慢的情况下,即提供提前的推测结果,我们可能应该假设任何给定的窗口可能有稳定的输入数据,因为我们知道(通过窗口的早期阶段),我们观察到的这个窗口的输入是非常不完整的。因此,当处理时间提前(例如,每分钟一次)时周期性地触发可能是明智的,因为触发发射的数量将不依赖于实际观察到的窗口的数据量;在最坏的情况下,我们只会得到稳定的定期触发发射。

    • 在太快的情况下(即,由于启发式Watermark可能存在错误的推测,所以需要一种机制去能够处理延时数据去修正计算结果),假设Watermark基于相对准确的启发式(通常是相当安全的假设)。在这种情况下,预计不会经常看到延迟很久的数据,但是在实际中确实存在挺多延迟数据,不过结果很快会得到修正。每收到1个延时数据触发一次的策略,能够让我们更快的修正更新计算结果,但是由于预期的后期数据不频繁,应该不会给系统带来大的冲击。请注意,这些只是示例:如果有不同的应用场景,可以自由选择不同的触发器(或选择不适用触发器)。

          最后,我们需要协调这些各种触发器的时间安排:提前,准时和延迟。我们可以使用Sequence触发器和一个特殊的OrFinally触发器组合来实现,OrFinally触发器用来中止这个组合触发器。

    PCollection<KV<String, Integer>> scores = input
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                   .triggering(Sequence(
            Repeat(AtPeriod(Duration.standardMinutes(1)))
                       .OrFinally(AtWatermark()),
                     Repeat(AtCount(1))))
      .apply(Sum.integersPerKey());

    清单4. 手动设定提前也延迟触发

          如上所示,伪代码看起来不错,给出了提前、准时、延迟触发的常用模式,使用略有不便,对应在Dataflow中,提供了一个定制(但语义上相当的)API,使得更容易使用这样的触发器,如下所示:

     PCollection<KV<String, Integer>> scores = input
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                   .triggering(
                     AtWatermark()
                       .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                       .withLateFirings(AtCount(1))))
      .apply(Sum.integersPerKey());

    清单5. 使用early/late api指定提前和延迟触发

          在流处理引擎上执行清单4或清单5中的示例(包含了理想的Watermark和启发式的Watermak),如下所示:

    a12e5efa572e0fb6d0c5ae9d5db1094676f6ef53.jpg?image_play_button_size=2x&image_crop_resized=960x344&image_play_button=1&image_play_button_color=7b796ae0

    图 7. 在流处理引上执行基于窗口的求和,使用提前和延迟触发

          这个版本对图6有两个明显的改进:
          

    • • 对于第二个窗口中的“Watermark太慢”的情况,[12:02,12:04]:我们现在每分钟提供一次定期的提前计算。在理想的Watermark案例中,差异最为突出,其中时间到首次输出从近7分钟降至3分半;在启发式案例中也有明显改善。这两个版本现在都可以随着时间的推移而稳定地进行计算和修正计算结果(具有值7,14,然后是22的窗格),降低了从数据输入到得到计算结果之间的延迟。
            
    • • 对于第一个窗口中的“启发式Watermark太快”的情况,[12:00,12:02]:当9的值延迟到达时,立即将其合并到一个值为14的新的修正的窗格中。
      这些新触发器的一个有趣的副作用是,它们有效地使理想和启发式Watermark版本之间的输出模式标准化。而图6中的两个版本是截然不同的,而现在这两个版本看起来很相似。

          此时剩下的最大差异是窗口生命周期。在理想的Watermark案例中,当Watermark超过窗口末尾后,窗口过期,窗口中的数据再也不会被处理,可以被安全的回收。在启发式Watermark案例中,我们仍然需要保留窗口一段时间来处理延迟数据。但是到目前为止,系统没有任何好的方式知道每个窗口需要保留多长时间。这是最大允许延迟的用武之地。

    When: 最大允许延迟(超过即可回收)

          在谈到最后一个问题(“如何修正结果?”)之前,先来聊一下持续运行、乱序数据流处理系统中的实际面对的问题:垃圾收集。在图7中的启发式Watermark示例中,每个窗口的持续状态在该示例的整个生命周期内都会持续;这是必要的,以便在到达时适当地处理延迟数据。但是,尽管能够保留所有持续状态,直到数据全部处理完毕。实际上,当处理无限数据源时,一直保留给定窗口的状态(包括元数据)通常是不切实际;最终会耗尽内存、磁盘等的空间。
          因此,任何现实世界的无序处理系统都需要提供一些限制其正在处理的窗口的生命周期的方法。最简洁的实现方在系统内定义一个最大允许延迟的边界,即限制任何给定记录最晚到达时间(相对于Watermark时间)不能超过这个时间;任何超过这个时间的数据不再处理,直接丢弃掉。定义了最大允许延迟之后,还需要准确地确定窗口需要保留的时间:直到Watermark超过了窗口的末尾时间+最大允许延迟时间[5]。允许系统丢弃超过最大延迟的数据,还能够节省系统的计算资源。
          由于最大允许延迟和Watermark之间的相互作用有点晦涩,举个例子说明一下。我们来看一下清单5 /图7中的启发式Watermark Pipeline,并添加1分钟的最大允许延迟时间(请注意,之所以选择1分钟是为了更好的在图中说明概念,在现实世界中需要根据场景来确定合理的最大允许延迟时间):

    PCollection<KV<String, Integer>> scores = input
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                   .triggering(
                     AtWatermark()         .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                       .withLateFirings(AtCount(1)))
           .withAllowedLateness(Duration.standardMinutes(1)))
      .apply(Sum.integersPerKey());

    清单6. 带有最大允许延迟的提前和延迟触发

          这个Pipeline的执行看起来像下面的图8所示,添加了以下功能来突出显示允许的延迟效应:

    • • 加粗的水平方向的白线表示当前的处理时间,注释为Lateness Horizon的小刻度表示窗口的最大延时线(事件时间)。

    • • 一旦Watermark超过窗口的最大延迟线,该窗口将被回收,这意味着窗口的所有信息都将被销毁。

    • • 虚线的矩形,表示窗口关闭时窗口覆盖的时间范围(在处理时间和事件时间两个维度上),一个小尾巴向右延伸,以表示窗口的延迟水平(与Watermark对照)。

          在这个图中,为第一个窗口添加了一个额外的延迟数据6。6相对窗口是迟到的,但仍然在允许的最大延迟时间范围内,所以6被累加修正计算结果为11。然而,9 超过了最大允许延迟,直接被丢弃掉。

    4bc76118539bfe60869ec06e6b6919b6e48d0ff2.jpg?image_play_button_size=2x&image_crop_resized=960x643&image_play_button=1&image_play_button_color=7b796ae0

    图8:在流处理引擎上切分窗口并计算,窗口使用提前、延迟触发,并设置了最大允许延迟时间

          关于最大迟延线的两个最后的说明:

    • • 要完全清楚,如果正在从能够提供理想Watermark的资源中获取数据,则无需处理延迟数据,而0秒的最大延迟时间将是最佳的。这是我们在图7的理想Watermark部分中看到的。

    • • 有一些例外情况不需要指定最大延迟,即使在使用启发式Watermark时,比如在数据覆盖的时间的范围内,对有限的key进行统计,(例如,数据覆盖的时间的范围内内,按照Web浏览器分组,统计网站的总访问次数)。在这种情况下,系统中活动窗口的数量受限于使用的Key的数量。只要Key的数量仍然很低,就无须通过设置最大允许的延迟时间来限制窗口的生命周期。

          接下来继续讨论第4个也是最后一个问题。

    How: 累积

          随着时间推移,触发器被用于为一个窗口生成多个窗格,我们发现自己面临最后一个问题:“如何随着时间修正结果?”在我们已经看到的例子中,每个窗格建立在其前一个窗格基础之上。然而,实际上有三种不同的累积方式[6]:

    • 丢弃

          每当窗格计算完毕时,任何存储的窗口状态都将被丢弃。这意味着每个窗格与之前的窗格都是相互独立的。当下游消费者本身正在执行某种累加时,例如当将整数发送到希望自己计算总和的系统时,丢弃模式是有用的,下游系统将数据累加在一起形成最后的结果。

    • 累加

          如图7所示,每当窗格计算完毕时,保留该窗格所有的状态,未来输入的数据会累加到并更新现有状态。这意味着窗格是建立在前面窗格的基础之上的。以后的结果可以简单地覆盖以前的结果,例如在诸如BigTable或HBase的键/值存储中存储输出结果时,累加模式很有用。

    • 累加和撤销

          像累加模式一样,但是当生成新窗格时,同时会为前一个窗格生成1个独立的撤销。撤销(与新的累加结果一起)本质上是在表达,“我以前告诉过你的结果是X,但我错了。撤销我上次告诉你的X,并将其替换为Y.“有两种情况,撤销是特别有用的:

    • • 当下游消费者将不同维度的数据重新分组时,新值可能会与先前的值不同,因此最终会在不同的组中。在这种情况下,新值不能简单的覆盖旧值;需要从旧组中删除旧值,然后将新值添加到新组中。
    • • 当使用动态窗口(例如,后边会有更详细的介绍)时,由于窗口合并,新值可能会替换多个旧窗口。在这种情况下,只从新窗口的信息中难以确定哪些旧窗口中需要撤销。对于旧的窗口进行明确的撤销使得任务变得简单明了。

          放在一起看时,每个类型的累加语义会更清晰。考虑图7中第二个窗口的三个窗格(事件时间范围[12:02,12:04))。下表显示了三个支持的累加模式(每个窗格的值)将在三种支持的累加模式中显示(累积模式是图7中使用的特定模式):

    这里写图片描述
    表1. 对比三种累积模式,使用了图7中的第2个窗口

    • 丢弃
      每个窗格仅包含在特定窗格中到达的值。因此,该窗口的最终值是最后一个窗格的值,而不是总和。但是,如果要自己计算所有独立窗格,将得到正确的答案22.这就是为什么丢弃模式下,下游消费者自己在窗格上执行聚合时很有用。

    • 累加
      如图7所示,每个窗格包含在该特定窗格中到达的值以及前一个窗格中的所有值。因此,最后一个窗格的值是该窗口所有值的总和22。但是,如果要自己累加该窗口的所有窗格,则会对来自窗格2和窗格1的输入进行双重和三重计数,给出的总和是不正确的51.这就是为什么累加模式是最有用的,可以简单地用新的值覆盖以前的值,新值已经包含了迄今为止收到的所有数据。

    • 累积和撤销
      每个窗格都包含一个新的累加模式值以及前一个窗格值的撤销。因此,观察到的最后(非撤销)值以及所有窗格(包括撤销)的总和是正确的答案22.这就是为什么撤回是如此强大。

          下边的代码示例了丢弃模式:

    PCollection<KV<String, Integer>> scores = input
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                   .triggering(
                     AtWatermark()            .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                       .withLateFirings(AtCount(1)))
                   .discardingFiredPanes())
      .apply(Sum.integersPerKey());
    清单7. 丢弃模式与提前和延迟触发       在流处理引擎上使用启发式Watermark的效果如下所示:

    39c7bdd39bb0cff10c8650807b255f3f34fad0a8.jpg?image_play_button_size=2x&image_crop_resized=960x674&image_play_button=1&image_play_button_color=7b796ae0

    PCollection<KV<String, Integer>> scores = input
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                   .triggering(
                     AtWatermark()
       .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                       .withLateFirings(AtCount(1)))
                   .accumulatingAndRetractingFiredPanes())
      .apply(Sum.integersPerKey());

    清单8. 累加和撤销模式,使用提前/延迟触发

          在流处理引擎的执行如下所示:

    3fef7a569ee1cf9e44c4a4859ef059c9c815fde5.jpg?image_play_button_size=2x&image_crop_resized=960x674&image_play_button=1&image_play_button_color=7b796ae0

    图10. 在流处理引擎上,执行累加和撤销模式,使用提前/延迟触发

          由于每个窗口的窗格都是重叠的,所以看起来有点乱。 撤销用红色表示,它与蓝色窗格结合重叠,看起来略带紫色。 在一个给定的窗格中略微移动了两个输出的值(并用逗号分隔),使它们更容易区分。
    将图9,7(启发式)和10的三张图动画效果的最终图放在一起,提供了三种模式的很好的视觉对比:

    这里写图片描述
    图 11. 丢弃模式、累加模式、累加&撤回模式对比

          从左到右是丢弃模式、累加模式、累加&撤回模式,三种模式需要的存储和计算成本依次递增。可以想像,在存储和计算成本方面,呈现的顺序(丢弃,累加,累加和撤回)的模式都相继更昂贵。 为此,累积提供了一种新的维度,在正确性,延迟和成本的之间进行权衡。

    小结

          到此为止,我们接触了4个问题:

    • What要计算什么结果? Transform.
    • Where在事件时间中的哪个位置计算结果? 窗口
    • When在处理时间中的哪个时刻触发计算结果?Watermark和触发
    • How如何修正结果? 累积

          我们目前只了解了一种窗口类型:基于事件时间的固定窗口。 从Streaming 101中我们提到了多种窗口,其中有两个是今天要详细阐述的:基于处理时间的固定窗口,基于事件时间的会话窗口。

    When/Where: 基于处理时间的窗口

          处理时间窗口重要的原因有两个:

    • • 对于某些使用情况,例如使用情况监控(例如,Web服务流量QPS),希望在收到数据流时分析数据,处理时间窗口绝对是适当的方法。

    • • 对于事件发生时间很重要的(例如,分析用户行为趋势,计费,评分等)的场景,处理时间窗口绝对是错误的选择,要能够清晰的区分哪些场景合适。

          因此,值得深入了解处理时间窗口和事件时间窗口之间的差异,特别是考虑到当今大多数流处理系统中广泛使用了处理时间窗口。
    当使用模型时,例如在这篇文章中提到的,作为一等公民的窗口是严格基于事件时间的,有两种方法可以用来实现处理时间窗口:

    • 触发器:忽略事件时间(即,使用跨越所有事件时间的全局窗口),并使用触发器在处理时间轴中提供该窗口的快照。
    • 进入时间:即数据到达系统的时间,将入口时间分配为数据作为事件时间,并使用正常的事件时间窗口进行后续处理。 SparkStreaming中的处理时间就是这么做的。

          请注意,这两种方法或多或少等同,但在在多处理步骤Pipeline的情况下略有不同:在触发器版本中,每个处理步骤都使用处理时间切分窗口,步骤之间相互独立,因此例如窗口X中的数据为 一个阶段可能会在下一阶段的窗口X-1或X + 1中; 在进入时间版本中,一旦将数据归于窗口X中,由于不同的处理步骤时间使用Watermark同步处理进度(Dataflow的做法),在整个处理过程中都会一直属于窗口X。对micro-batch来说( Spark Streaming的做法),micro-batch的边界或其他因素,是在引擎级别协调处理。

          正如一直强调的,处理时间窗口的最大缺点是,当输入的顺序发生变化时,窗口的内容会发生变化。 为了更具体地说明这一点,我们来看这三种用例:

    • • 事件时间窗口
    • • 使用触发器的处理时间窗口
    • • 使用进入时间的处理时间窗口

          我们将每个窗口应用到两个不同的输入数据集(总共有6个变体)。 两个输入数据包含完全相同的事件(即相同的值,发生在相同的事件时间),但顺序不同。 第1个数据集跟我们之前例子中的顺序一致,颜色为白色; 第二个数据集调整了事件的处理顺序,如下图12所示,为紫色。

    基于事件时间的窗口Event-time windowing

          为了建立一个基线,我们首先将基于事件时间的使用启发式Watermark的固定窗口处理两个顺序不同的数据集。 我们将重用清单5 /图7中的提前/延迟处理的代码,即如果如下。 左边实际上是我们以前看到的; 右边是第二个数据集的结果。 这里要注意的一点是:尽管输出的整体形状不同(由于处理时间不同),四个窗口的最终结果保持不变:14,22,3和12:

    2b26fd31ca9629d521fe80d821c0b76de941ef74.jpg?image_play_button_size=2x&image_crop_resized=960x344&image_play_button=1&image_play_button_color=7b796ae0

    图13. 基于事件时间窗口处理两个内容一样但顺序不一样的输入数据集

    使用触发器的处理时间窗口

          现在我们来比较上述两种处理时间方法。 首先,将尝试触发器方法。使用处理时间窗口达到效果,需要考虑以下三个方面:

    • 窗口: 使用全局事件时间窗口,本质上是以事件窗格模拟处理时间窗口

    • 触发: 根据处理时间窗口的期望大小,在处理时间维度上周期性触发。

    • • 累加: 使用丢弃模式来保持窗格彼此独立,从而让每个窗格都像一个独立的处理时间窗口。

    相应的代码看起来像清单9; 请注意,全局窗口是默认的,因此没有具体的覆盖策略:

    PCollection<KV<String, Integer>> scores = input
      .apply(Window.triggering(
                      Repeatedly(AtPeriod(Duration.standardMinutes(2))))
                   .discardingFiredPanes())
      .apply(Sum.integersPerKey());
    清单9. 在全局事件窗口上使用重发触发器、丢弃模式,模拟处理时间窗口       当流处理引擎上输入两个不同顺序的数据集的时候,结果如下图14所示。 有趣的笔记与这个数字:       由于我们了基于事件时间的窗格模拟处理时间窗口,所以在处理时间轴中勾画了“窗口”,这意味着窗口宽度是在Y轴上度量而不是X轴。由于处理时间窗口对输入数据的顺序敏感,在两个数据集中,每个窗口包含的数据都是不同的,即时事件发生的时间相同。 在左边我们得到12,21,18,而在右边我们得到7,36,4。

    328a112908da71728e15466166124afe085e802c.jpg?image_play_button_size=2x&image_crop_resized=960x301&image_play_button=1&image_play_button_color=7b796ae0

    图14. 使用触发器模拟模拟处理时间窗口,处理两个内容一但顺序不一样的数据集

    使用入口时间的处理时间窗口

          最后,我们来看看通过将输入数据的事件时间映射为入口时间来实现的处理时间窗口。在代码方面,这里有四个方面值得一提:

    • 时移
      当数据到达时,数据的事件时间被入口时间(数据到达时的处理时间)覆盖。请注意,我们目前在Dataflow中没有标准API来处理时间,尽管我们接下来会可能会使用伪代码I / O源中的虚构方法来代表下面的代码。对于Google Cloud Pub / Sub,只需在发布消息时将消息的timestampLabel字段留空;对于其他来源,需要查阅源代码文档。

    • 窗口
      返回使用标准的固定事件时间窗口。

    • 触发
      由于入口时间提供了计算理想Watermark的能力,所以可以使用默认触发器,在这种情况下,当Watermark通过窗口的末尾时,触发器会隐式触发一次。

    • 累积模式
      由于我们每个窗口只有一个输出,所以累积模式是无关紧要的。

    实际的代码可能看起来像这样:

    PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
    PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
    PCollection<KV<String, Integer>> scores = input
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
      .apply(Sum.integersPerKey());
    清单10. 明确的默认触发器
          在流式引擎上的执行将如下面的图15所示。当数据到达时,它们的事件时间被覆盖为它们的进入时间(即到达时的处理时间),导致在理想Watermark线上的向右水平移位。该图中有趣的注释: - 与其他处理时间窗口示例一样,当输入的顺序变化时,即使输入的值和事件时间保持不变,我们也会得到不同的结果。 - 与其他示例不同,窗口在事件时间维度上(因此沿X轴)重新划分了。尽管如此,这些窗口并不是原生的事件时间窗口;而是我们将处理时间简单地映射到事件时间上,擦除每个输入的原始记录,并用新的记录代替它,而事件的时间是表示Pipeline首次收到到数据的时间。 - 尽管如此,由于使用了Watermark,触发器仍然在与之前的处理时间示例完全相同的时间触发。此外,所产生的输出值与该示例相同,如左侧的12,21,18,右侧的7,36,4。 - 由于使用入口时间,所以理想的Watermark是可能的,所以实际的Watermark与理想的Watermark相匹配,斜率为1,向右上方延伸。

    54b515cfcbf6f0b237c953f1681547597c6cfbcd.jpg?image_play_button_size=2x&image_crop_resized=960x316&image_play_button=1&image_play_button_color=7b796ae0

    图15. 使用入口时间的处理时间窗口,处理两个内容一样但顺序不同的数据集

          虽然看到不同的方法可以实现处理时间窗口很有趣,但是这里的大部分内容是自从第一篇文章以来一直提到的:事件时间窗口与顺序无关,至少在极限情况下如此(实际 在处理过程中的窗格可能会不同,直到输入完成),而处理时间窗口不是。 如果关心事件实际发生的时间,必须使用事件时间窗口,否则计算结果是无意义的。

    Where: 会话窗口

          现在来看一下我最喜欢的特性之一:动态的、数据驱动的窗口,称为会话窗口。
          会话是一种特殊类型的窗口,它捕获数据中的一段活动,在不活动一段时间后窗口中止。 它们在数据分析中特别有用,因为可以让我们看到某一个特定用户在一段时间内的行为。 这可以让我们分析会话内的活动的相关性,基于会话的长度来推断用户的参与水平等。
          从窗口的角度来看,会话窗口在两个方面很有趣:

    • • 它们是数据驱动窗口的示例:窗口的位置和大小是输入数据本身来决定,而不是在时间内基于某些预定义模式,如固定和滑动窗口。

    • • 它们也是不对齐的窗口的示例,即,窗口并不将数据一视同仁,而是将数据的特定子集(例如,每个用户)进行切分。
      这与对齐的窗口(如固定和滑动窗口)形成对比,这些窗口通常对数据一视同仁,进行切分。

          对于一些用例,可以提前在一个会话中的数据中标记一个共同标识符(例如,在线的的视频播放器,定时发出心跳包,心跳包内容是服务质量信息,对于任何给定的一次观看,分配一个会话ID,所有的心跳信息中都添加这个会话ID)。在这种情况下,会话更容易构建(按照会话ID区分会话),本质上是按键分组的一种形式。

          然而,在更一般的情况下(即,实际会话提前并不知道),会话只能从从数据中构建出来。当处理无序数据时,这变得特别棘手。
          提供一般会话支持的关键是,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口包含单个记录,每个记录中的每个记录与下一个记录的间隔不超过预先定义的间隔。因此,即使会话中的数据乱序了,也可以简单地通过将各个数据的重叠窗口合并在一起来构建最终会话。
    这里写图片描述
    图16. 未合并的原始会话窗口和合并之后的会话窗口

          下边看一个代码示例,以清单8中的代码为基础,修改为使用会话窗口:

    PCollection<KV<String, Integer>> scores = input
      .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
                   .triggering(
                     AtWatermark()
                       .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                       .withLateFirings(AtCount(1)))
                   .accumulatingAndRetractingFiredPanes())
      .apply(Sum.integersPerKey());
    清单11. 基于会话窗口,提前和延迟触发,使用累加和撤销模式       在流处理引擎上执行如下所示:

    62ad3e2bf2f3477c14d54600bbc8eeff27b3b0d6.jpg?image_play_button_size=2x&image_crop_resized=960x674&image_play_button=1&image_play_button_color=7b796ae0

    图17. 基于会话窗口,提前和延迟触发,使用累加和撤销模式

          这里有很多事情,所以我将介绍一些它:

    • • 当遇到具有值为5的第一个记录时,它被放置在从该记录的事件时间开始的单个原始会话窗口中,窗口宽度为会话窗口的超时时长,例如超时时长为1分钟,会话窗口宽度为1分钟。在后边遇到的任何窗口与该窗口重叠的都应该隶属于同一个会话,并且合并到此窗口中。

    • • 第二个到达记录是7,它类似地放在自己的原始会话窗口中,因为它不与5的窗口重叠。

    • • 同时,Watermark已经过第一个窗口的末尾,所以在12:06之前,包含值5的窗口被物化为准时的窗口。此后不久,当处理时间正好为12:06的时候,第二个窗口也被物化为具有值7的推测结果。

    • 我们接下来观察一系列的记录,3,4和3,这3个会话窗口相互重叠。因此,它们都被合并在一起,并且在12:07的时候提前触发,发出一个值为10的单个窗口。

    • • 当8到达不久之后,它与具有值7的会话和与值10的会话重叠。所有这三个因此被合并在一起,形成具有值25的新的组合会话。当Watermark然后通过这个会话的末尾时,它物化了一个包含值25的新会话以及之前发布的两个窗口的撤消,但后来被并入它:7和10。

    • • 当9到达延迟到达时,类似的舞蹈发生在9号晚上,与值为5的原始会话,和值为25的会话变成了一个更大的值为39的一个较大的会话。值39和窗口25、5的撤销被立即延迟触发。

          窗口是极其有用的工具,可以非常容易的将流处理的维度分解成不同的可组合的部分。这样可以让我们更多地关注业务逻辑,而不是花费很多的精力处理数据的格式相关的问题。
          如果不相信,请看Spark的文章,介绍如何在Spark Streaming上手动构建会话(请注意,这不足以指出他们; Spark人员刚刚完成了足够的工作,有人实际上打扰了记录在其上构建特定种类的会话支持所需要的麻烦;对于大多数其他系统来说,我不能说相同)。这很相关,他们甚至没有做适当的事件时间会话,或提供投机或晚期的启动,也不会撤退。(这一段是原文,随着Spark的新版本的发布,对Spark的描述已经过时了)

    结尾

          现在我们已经深入了解了建立强大的流处理系统的基础,快速回顾一下我们所讲的内容。首先,我们提到的主要概念:

    • • 事件时间与处理时间:事件发生时间和被数据处理系统处理的时间之间的重要区别。

    • • 窗口:通常使用的方法是通过在时间边界(通过处理时间或事件时间)对其进行切分来管理无限数据,尽管我们将数据流模型中的窗口定义缩小仅表示事件时间内)。

    • • Watermark:事件时间进度的概念,为在无限数据上运行的乱序处理系统提供了估计窗口数据完整性的手段。

    • • 触发器:用于精确指定在合适计算窗口结果的机制,对于特定用例是有意义的。

    • • 累积:在单个窗口被多次触发计算的情况下,随着触发持续的修正窗口结果。

          其次,我们用来构建我们探索的四个问题(我承诺我不会再在此之后再读)

    • What 要计算出什么结果? =变换Transform
    • Where事件在哪里结果计算? =窗口
    • When 在处理时间维度上什么时候计算窗口结果?=Watermark+触发器
    • How如何不断的修正计算结果? =累积

          第三,最后一点,这种流处理模式所带来的灵活性(最终,需要做的是在处理数据的各种要素之间取得平衡,如正确性,延迟和成本),回顾一下,通过少量的代码修改,对相同的数据集处理而得到的输出的变化如

    结束!

    展开全文
  • Spark Streaming 介绍及架构——基础篇

    千次阅读 2018-07-19 09:41:03
    Spark Streaming是Spark API核心的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。 数据可以从许多来源获取,如Kafka,Flume,Kinesis或TCP套接字,并且可以使用复杂的算法进行处理,这些算法...

    1 概述

    官方网站
    Spark Streaming是Spark core API的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。 数据可以从许多来源获取,如Kafka,Flume,Kinesis或TCP sockets,并且可以使用复杂的算法进行处理,这些算法使用诸如map,reduce,join和window等高级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将Spark的机器学习和图形处理算法应用于数据流。

    总的来说我们可以从三点进行考虑:输入—–计算—–输出。正如下图所示:
    这里写图片描述
    1. 输入:可以从Kafka,Flume,HDFS等获取数据
    2. 计算:我们可以通过map,reduce,join等一系列算子通过spark计算引擎进行计算(基本和RDD一样,使用起来更方便。)
    3. 输出:可以输出到HDFS,数据库,HBase等。

    2 处理数据的特点

    在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流并将数据分成批,然后由Spark引擎处理,以批量生成最终结果流。

    这里写图片描述
    从图中也能看出它将输入的数据分成多个batch进行处理,严格来说spark streaming 并不是一个真正的实时框架,因为他是分批次进行处理的。

    Spark Streaming提供了一个高层抽象,称为discretized stream或DStream,它表示连续的数据流。 DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列RDD。

    3 wordcount代码演示进行进一步认识

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SocketWordCountApp {
    
      def main(args: Array[String]): Unit = {
        //创建SparkConf
        val conf=new SparkConf().setAppName("SocketWordCountApp").setMaster("local[2]")
        //通过conf 得到StreamingContext,底层就是创建了一个SparkContext
        val ssc=new StreamingContext(conf,Seconds(5))
        //通过socketTextStream创建一个DSteam,可以看出这里返回的是ReceiverInputDStream[T],后面从源码进行分析
        val DStream=ssc.socketTextStream("192.168.137.130",9998)
        //wc (看看是不是和RDD中的wc一样呢)
        DStream.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey(_+_).print()
    
        // 开始计算
        ssc.start()
        // 等待计算结束
        ssc.awaitTermination()
      }
    }
    

    我们在通过nc命令像端口9998输入数据;

    [hadoop@hadoop ~]$ nc -lp 9998  
    a,a,a,a,b,b,b
    • 查看结果
    (b,3)
    (a,4)

    4 初始化StreamingContext

    要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。StreamingContext对象也可以从现有的SparkContext对象创建。

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf, Seconds(1))

    当一个Context被定义,你必须做以下的事情:
    1. 通过定义输入DStream来创建输入源。
    2. 通过在DStream上应用转换操作和输出操作来定义流计算。
    3. 使用StreamContext.start()开始来接收数据和处理数据。
    4. 使用StreamContext.awaitTermination()来等待计算完成(手动或者因错误终止)。
    5. 可以StreamContext.stop()来手动停止计算(一般不会停止)。

    注意

    a.一旦一个StreamingContext被启动,就不能再设置或添加新的流计算。
    b.一旦一个StreamingContext被停止,就不能重新启动。
    c.同一时间内,在JVM内部只有一个StreamingContext处于活跃状态。
    d.默认情况下使用stop()方法停止StreamingContext的同时也会停止SparkContext,如果执行停止
    StreamingContext,可以将stop()的可选参数设置为false。
    e.SparkContext可以复用,即用来创建多个StreamingContext,只要在创建新的StreamingContext时,
    之前创建的StreamingContext是处于stop状态即可(SparkContext没有被停止)。
    

    5 Discretized Streams (DStreams)

    Discretized Streams或DStream是Spark Streaming提供的基本抽象。 它表示连续的数据流,即从源接收的输入数据流或通过转换输入流生成的已处理数据流。 在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象。 DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。
    这里写图片描述

    在DStream上应用的任何操作都会转化为对每个RDD的操作。例如,wordcount案例中(下面会进行代码演示),flatMap操作应用于DStream行中的每个RDD,以生成单词DStream的RDD。 这在下图中显示。
    这里写图片描述

    这些基础RDD转换由Spark引擎计算。 DStream操作隐藏了大部分这些细节,并为开发人员提供了更高级别的API以方便使用。 这些操作将在后面的章节中详细讨论。

    6 spark streaming架构

    我们应该知道spark有很多种运行模式,下面通过spark on yarn (cluster模式)的模式图进行介绍,所以想要对spark streaming的运行架构进行理解,你要知道在yarn上提交作业的流程(可以参考该篇博客),以及spark的运行流程(参考该篇博客),下面是我在网上找的一幅图,我们根据这幅图 进行一个学习:
    这里写图片描述
    下面对于这幅图进行详细的剖析:
    符号表示:1,2,3….代表Spar on Yarn启动流程 ;(1)(2)(3)….代表Spark Streaming执行过程。
    1. 通过spark client提交作业到RM
    2. ResouceManager为该作业分配第一个Container,并与对应的NodeManager通信,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationMaster)
    3. NodeManager启动Spark AppMaster。
    4. Spark AppMaster并向ResourceManager AsM注册,用户就可以通过UI查看作业情况。
    5. ResourceManager通知NodeManager分配Container。(每个container的对应一个executor)
    6. NodeManager准备资源,并分配给executor。

    Spark Streaming执行过程

    spark on Yarn模式Driver运行在NM的container之中,运行application的main()函数并自动创建SparkContext,在SparkContext之上会创建一个 StreamingContext(因为途中并没有标出,这里说明下)。
    (1)SparkContext向资源管理器注册并申请运行Executor资源;
    (2)Executor会启动Receive接收数据(Data Received),分批处理。
    (3)Receive接收到数据后汇报给streamingcontext(底层调用的sparkcontext),他会以多个副本存储,默认两个(后面进行源码解读就知道了。)
    (4)Spark ApplicationMaster和executor(container)进行交互,分配task。
    (5)每个executor上会运行多个task执行任务。

    最后把结果保存在HDFS上。

    Saprk Streaming数据处理过程

    这里写图片描述
    首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。这里也和上面所说的一致。

    7 源码解析

    前面我们说了StreamingContext,底层就是创建了一个SparkContext,我们从源码中进行证明:
    new StreamingContext(conf,Seconds(5))

    /**
       * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
       * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
       * @param batchDuration the time interval at which streaming data will be divided into batches
       */
      def this(conf: SparkConf, batchDuration: Duration) = {
        this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
      }
    
    
    我们new StreamingContext(conf,Seconds(5)) 其实调用的上面的方法,
    1.传递一个SparkConf应该不陌生把,指定Spark参数的org.apache.spark.SparkConf对象;
    2.Duration流式数据分成批次的时间间隔
    
    

    我们 接着看看这句话StreamingContext.createNewSparkContext(conf)

      private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
        new SparkContext(conf)
      }
    
    可以看到底层就是给我们创建了一个SparkContext
    通过socketTextStream创建一个DSteam,可以看出这里返回的是ReceiverInputDStream[T],从源码进行分析:

    ssc.socketTextStream("192.168.137.130",9998)

    /**
       * Creates an input stream from TCP source hostname:port. Data is received using
       * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
       * lines.
       * @param hostname      Hostname to connect to for receiving data
       * @param port          Port to connect to for receiving data
       * @param storageLevel  Storage level to use for storing the received objects
       *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
       * @see [[socketStream]]
       */
      def socketTextStream(
          hostname: String,
          port: Int,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
        socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
      }
    
    1.通过tcp socket监听hostname:port,接受字节(UTF8编码,`\ n`分隔)
    2.这里我们看到了默认的存储级别StorageLevel.MEMORY_AND_DISK_SER_2,因为他是一个默认参数,所以我们直接使用了默认的就木有传递。(和前面对应了吧)
    (还有一个问题,还记得spark中缓存级别吗???)
    3.返回ReceiverInputDStream
    4.调用了socketStream方法
    

    继续查看socketStream方法

    /**
       * Creates an input stream from TCP source hostname:port. Data is received using
       * a TCP socket and the receive bytes it interpreted as object using the given
       * converter.
       * @param hostname      Hostname to connect to for receiving data
       * @param port          Port to connect to for receiving data
       * @param converter     Function to convert the byte stream to objects
       * @param storageLevel  Storage level to use for storing the received objects
       * @tparam T            Type of the objects received (after converting bytes to objects)
       */
      def socketStream[T: ClassTag](
          hostname: String,
          port: Int,
          converter: (InputStream) => Iterator[T],
          storageLevel: StorageLevel
        ): ReceiverInputDStream[T] = {
        new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
      }
    
    这些参数大家应该明白了吧,我们继续看看SocketInputDStream[T]到底是什么吧。
    class SocketInputDStream[T: ClassTag](
        _ssc: StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](_ssc) {
    
    SocketInputDStream这个类继承了ReceiverInputDStream,感觉快看到希望了啊。
    
    /**
     * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
     * that has to start a receiver on worker nodes to receive external data.
     * Specific implementations of ReceiverInputDStream must
     * define [[getReceiver]] function that gets the receiver object of type
     * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
     * to the workers to receive data.
     * @param _ssc Streaming context that will execute this input stream
     * @tparam T Class type of the object of this stream
     */
    abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
      extends InputDStream[T](_ssc) {
    
    1.ReceiverInputDStream是一个抽象列继承了InputDStream,必须在工作节点上启动接收器才能接收外部数据,
    2.ReceiverInputDStream 通过getReceiver函数获取类型的接收器对象,即org.apache.spark.streaming.receiver.Receiver,
    3.Receiver的作用是接受数据。

    InputDStream[T](_ssc)是什么呢?

    /**
     * This is the abstract base class for all input streams. This class provides methods
     * start() and stop() which are called by Spark Streaming system to start and stop
     * receiving data, respectively.
     * Input streams that can generate RDDs from new data by running a service/thread only on
     * the driver node (that is, without running a receiver on worker nodes), can be
     * implemented by directly inheriting this InputDStream. For example,
     * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
     * new files and generates RDDs with the new files. For implementing input streams
     * that requires running a receiver on the worker nodes, use
     * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
     *
     * @param _ssc Streaming context that will execute this input stream
     */
    abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
      extends DStream[T](_ssc) {
    
    1.这是所有输入流的抽象基类。这个类提供了方法由Spark Streaming系统调用的start()和stop()来启动和停止
    接收数据。
    2.输入流会被差分成多个rdd,运行在每一个线程中。
    3.驱动程序节点(即,不在工作节点上运行接收器)可以 通过直接继承此InputDStream实现。例如,
    FileInputDStream,InputDStream的一个子类。产生一个新文件或者生成多个rdd都会产生新文件,
    这是通过driver监视HDFS目录
    4.用于实现输入流需要在工作节点上运行接收器,请使用
    [[org.apache.spark.streaming.dstream.ReceiverInputDStream]]作为父类。
    
    这里我们可以看到InputDStream继承DStream,终于看到了DStream,全村人的希望啊.
    **
     * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
     * sequence of RDDs (of the same type) representing a continuous stream of data (see
     * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
     * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
     * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
     * transforming existing DStreams using operations such as `map`,
     * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
     * periodically generates a RDD, either from live data or by transforming the RDD generated by a
     * parent DStream.
     *
     * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
     * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
     * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
     * `join`. These operations are automatically available on any DStream of pairs
     * (e.g., DStream[(Int, Int)] through implicit conversions.
     *
     * A DStream internally is characterized by a few basic properties:
     *  - A list of other DStreams that the DStream depends on
     *  - A time interval at which the DStream generates an RDD
     *  - A function that is used to generate an RDD after each time interval
     */
    abstract class DStream[T: ClassTag] (
        @transient private[streaming] var ssc: StreamingContext
      ) extends Serializable with Logging {
    
    这一大段注释,,头大。。。
    其实就是我们前面说的输入-----计算-----输出:
    可以通过TCP socket,Kafka,Flume 输入数据,转化为DStream,
    通过一些算子 `map`, `filter` and`window`进行计算
    
    DStream内部具有几个基本属性:
     * - DStream依赖的其他DStream列表(个人感觉就是RDD之间的依赖关系)
     * - DStream生成RDD的时间间隔(可自行设置)
     * - 每个时间间隔后用于生成RDD的函数(生成多个RDD)

    终于从头到尾给看完了哈,进行了一个简单的介绍,不知道小伙伴有没有理解呢。

    8 输入数据流

    从源端接受的数据代表输入数据流,通过接受输入数据会产生一个DStream,例如我们上面进行的wc中val DStream=ssc.socketTextStream("192.168.137.130",9998)这句代码接收数据后返回一个DStream, 每个输入DStream都与Receiver对象相关联,Receiver对象从源接收数据并将其存储在Spark的内存中进行处理。

    Spark Streaming提供了两类内置streaming sources。
    基本来源:StreamingContext API中直接可用的来源。 示例:文件系统和socket connections。
    高级来源:可通过额外的实用程序课程获得Kafka,Flume,Kinesis等来源。 这些要求链接部分中讨论的额外依赖关系。(后面我们会进行讲解)

    注意,如果您想在流式传输应用程序中并行接收多个数据流,则可以创建多个输入DStream(在性能调整部分中进一步讨
    论)。 这将创建多个接收器,它将同时接收多个数据流。 但请注意,Spark worker / executor是一项长期运行的任
    务,因此它占用了分配给Spark Streaming应用程序的核心之一。 因此,重要的是要记住,Spark Streaming应用程序
    需要分配足够的内核(或线程,如果在本地运行)来处理接收到的数据以及运行接收器。
    

    对local进行进一步理解
    1. local :用一个工作线程在本地运行Spark
    2. local[K]:使用K工作线程在本地运行Spark(理想情况下,将其设置为您计算机上的核心数)。
    3. local[K,F]:使用K工作线程,最多为F在本地运行Spark(可以通过参数spark.task.maxFailures设置)
    4. local[*]:使用与您的计算机上的逻辑内核一样多的工作线程在本地运行Spark。

    在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”作为主URL。 这两者中的任何一个都意味着只有一个线程将用于本地运行任务。 如果您使用的是基于接收器的输入DStream(例如套接字,Kafka,Flume等),那么receive接收器对象将占用一个线程,那就意味着没有足够的线程来处理数据。因此,在本地运行时,请始终用“local [n]”作为主URL,其中n>要运行的接收器的数量。在群集上运行,分配给Spark Streaming应用程序的内核数量必须多于接收器的数量。 否则系统将接收数据,但无法处理它。

    展开全文
  • Streaming(DataStream API): 概念介绍

    千次阅读 2018-09-20 00:11:09
    Streaming(DataStream API) 原文参考: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#collection-data-sources Overview Flink DataStream Api 编程指南 在...
    1. Streaming(DataStream API)

    原文参考:

    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#collection-data-sources

      1. Overview
        1. Flink DataStream Api 编程指南

    Flink中的DataStream 程序在数据流(data streams)上实现了各种转换(transformation)操作(,filter,updating,state,window,aggregating )Data Streams 可以从各种数据源(message queue,socket,fiels )中被创建。产生的结果可以输出到各种sink(目的地),比如将它写入到数据文件或一些标准的输出当中。Flink 程序可以在各种环境中运行,如 standlone ,嵌入到其他程序中。Flink能在本地的JVM中执行,也可以在集群中运行(yarn.

     

    flink Api的基本概念请参照 basic concepts 

     

    为了创建你自己的Flink DataStream 程序,我们鼓励你一开始使用 anatomy of a Flink Program 并逐步的添加 stream transformations. 下面的章节将为添加一些operations(翻者注:Flink 中的任何的transformations)和高级特性做一些引用说明

     

    Example 程序案例

    Data Source 数据源

    DataStream transformation

    Data sink 数据输出

    Iterations

    Execution Parametes 执行参数

      Fault Tolerance(故障容错)

      Controlling Latency (延迟控制)

    Debugging

      Local Execution Envionment

      Collection Data Sources

      Iterator Data Sink

    Where to go next (下一站)?

     

        1.  Example Program

    下面的代码是一段完整的基于窗口的 word count 应用的例子,单词的数量来源于一个5秒窗口的socket . 你可以复制到本地并运行它。

    Java 代码片段

    public class WindowWordCount {
    
        public static void main(String[] args) throws Exception {
    
      
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
      
    
            DataStream<Tuple2<String, Integer>> dataStream = env
    
                    .socketTextStream("localhost", 9999)
    
                    .flatMap(new Splitter())
    
                    .keyBy(0)
    
                    .timeWindow(Time.seconds(5))
    
                    .sum(1);
    
      
    
            dataStream.print();
    
      
    
            env.execute("Window WordCount");
    
        }
    
      
    
        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
            @Override
    
            public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
    
                for (String word: sentence.split(" ")) {
    
                    out.collect(new Tuple2<String, Integer>(word, 1));
    
                }
    
            }
    
        }
    
    }

     

     

     

    Scala 代码片段

    import org.apache.flink.streaming.api.scala._
    
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
    
      * Created by yuanhailong on 2018/9/19.
    
      */
    
    object WindowWordCount {
    
      def main(args: Array[String]) {
    
      
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val text = env.socketTextStream("localhost", 9999)
    
      
    
        val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    
          .map { (_, 1) }
    
          .keyBy(0)
    
          .timeWindow(Time.seconds(5))
    
          .sum(1)
    
      
    
        counts.print()
    
      
    
        env.execute("Window Stream WordCount")
    
      }
    
    }
    
     

     

    为了运行这个例子,首先你需要启动在命令行终端用netcat 启动一个输入流:

     

    nc –lk 9999

     

    只要输入一些词就会返回一些新的单词。这些词将会成为word count 程序的输入。如果你想看到的结果大于1 。你只要重复的输入5秒钟之内相同的词即可。(如果你的输入不够快,你可以增加窗口大小)

     

        1. Data Sources [数据源]

    数据源表示你的程序从哪里读取数据。通过StreamExecutionEnvironment.addSource(sourceFunction). 你能添加数据源到你的程序中。Flink 实现了几种数据源函数(function) ,但你可以通过实现SourceFunction 自定义数据源[翻者注:SourceFunction并行度1]。如果你想要实现多个并行度的数据源函数你可以通过实现ParallelSourceFunction 接口或者扩展RichParallelSourceFunction 

     

           有一些预先定义的数据源来源于StreamExecutionEnvironment

     

    file-based[基于文件的]:

    1. readTextFile(path):读取文本文件,file 遵循TextInputFormat 规范,文本文件中的数据每一行作为一个字符串返回。
    2. readFile(fileinputFormat,path): 通过指定文件的输入格式来读取数据文件
    3. readFile(fileInputFormat, path, watchType, interval, pathFilter) :这个方法的调用实际是通过上面两个方法中的一个来实现的。它使用给定的fileInputFormat读取指定路径下面的文件。根据提供的watchType. 数据源可能周期性(根据interval ms)的监控Path路径下的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY)。或者仅处理一次当前路径下面的数据然后退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter排除不需要处理的数据。

    IMPLEMENTATION(实现):

    在内部,Flink 将读数据程序划分为两个子任务(sub-task) ,也就是目录监控和数据读取。每个子任务通过独立的条目实现。监控是通过并行度为1的任务实现的。然而数据读取时通过多个任务并行实现的。并行度等于Job任务的并行度。目录监控任务去监控目录(根据watchType 周期性的监控或仅读取一次),找到文件,切割文件,并切割文件到下游readers .   readers将读取实际的数据。每个切割的文件仅被一个readers 读取。然而一个readers 可以读取多个文件。

    IMPORTANT NOTES(特别注意):

    1. 如果watchType 被设置为 FileProcessingMode.PROCESS_CONTINUOUSLY。当files被修改的时候,它的整个内容将会被重新处理。这就会破坏“exactly-once”的语义,当追加数据到文件的末尾将导致所有的数据都会被重新处理。
    2. 如果watchType 被设置为FileProcessingMode.PROCESS_ONCE. 数据源只会被扫描一次然后退出,无需等待readers完成文件内容的读取[这里指的是监控服务]。当然readers 会持续读取文件内容直到文件内容读取完成.关闭source 将会导致此后的信息不会再有检查点。这将导致在节点失败后恢复变慢,因为Job需要从上一个检查点恢复

     

    Socked-Based:

    1. socketTextStream: Socket中读取数据。通过指定分隔符切割数据

     

    Collection-Based:

    1. fromCollection(Seq): java  Java.util.Collection 中创建data stream,集合中所有的元素必须具备相同的数据类型
    2. fromCollection(Iterator):从Iterator中创建data stream. 该类指定迭代器返回的元素的数据类型。
    3. fromElements(elements: _*) : 从一系列的对象中创建data stream. 所有的对象必须具备相同的类型
    4. fromParallelCollection(SplittableIterator):Iterator中创建data stream. 该类指定迭代器返回的元素的数据类型。
    5. generateSequence(from, to) :在给定的范围类生成一系列的数字
        1. DataStream transformations

    参见 operators  

        1. Data Sinks

    Data sinks 消费 DataStream中的数据并将数据输出到files,socket,其他额外系统或print Flink 有多种输出格式它封装了DataStream上的背后的多种operators

    1. writeAsText() / TextOutputFormat: 写元素一行作为一个String . 这个Strings 通过调用每个元素的toString() 方法来获取。
    2. writeAsCsv(...) / CsvOutputFormat: 用逗号分隔value写元组(tuple). Row Filed分隔符可配置。Value通过调用toString() 方法来获取。
    3. print() / printToErr():打印每个元素toString()value到标准输出。
    4. writeUsingOutputFormat() / FileOutputFormat:方法和自定义文件输出的基础类。支持自定义的对象到字节的转换
    5. writeToSocket:根据SerializationSchema 写元素到Socket
    6. addSink : 调用自定义的sink 函数。Flink 自带了多重sink 函数(如Apache kafka

     

    注意,在DataStrem上的Write()方法主要是为了调试的目的。他们不会参加flinkchekpoint操作。这就意味着它使用的是“at-least-once”语义。数据如何刷写到目标系统依赖于实现的OutputFormat. 这就意味着不是发送到目标系统的数据会立即展现出来。当然,在失败的场景中,这些数据可能会丢失。

     

        为了可靠性,strema exactly-once 传递到文件系统,可以使用flink-connector-filesystem

     

        1. Iterations

    Iterative streaming(迭代流)程序实现一个step 函数,并将其嵌入到IterativeStream中。由于一个DataStream程序可能永远都不会完成,因此没有最大的迭代次数。相反,你需要指定那些stream需要返回到iteration并且通过splitfilter transformation 指定那些需要输出到下游。在这里,我们有一个iteration例子。代码的主体部分是一个简单的map 转换 ,并通过返回的元素区分不同的元素返回给下游。

    val iteratedStream = someDataStream.iterate(
      iteration => {
        val iterationBody = iteration.map(/* this is executed many times */)
        (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
    })

     

    例如: 这里有一个程序冲一个整数中持续减1,直到它等于0

    val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
    val iteratedStream = someIntegers.iterate(
      iteration => {
        val minusOne = iteration.map( v => v - 1)
        val stillGreaterThanZero = minusOne.filter (_ > 0)
        val lessThanZero = minusOne.filter(_ <= 0)
        (stillGreaterThanZero, lessThanZero)
      }
    )

     

        1. Execution Parameters

    StreamExecutionEnvironment  包含ExecutionConfig  ExecutionConfig 允许为Flink运行时设置一些配置参数。

    更多的参数参见execution configuration 。这些参数属于DataStream API:

    1. setAutoWatermarkInterval(long millseconds):设置watermark发射的频率。通过getAutoWatermarkInterval可以得到当前的watermarkvalue.
          1. Fault Tolerance(故障容错)

    State & Checkpointing 描述了如何启用Flink的checkpoint 机制。

          1. Controlling Latency

    默认情况下,数据元素在网络上不是一对一的传输(如果这样将会导致不必要的网络延迟)而是先缓存起来。缓存(在两台机器上实际传输的对象)的大小在flink的配置文件中能被配置。为了更好的吞吐量这往往是一个好方法,但是当数据不足够快的时候会导致一定的数据延迟。为了控制吞吐量和延迟。在execution 环境上你可以通过env.setBufferTimeout(timeoutMillis)设置缓存等待被填满的最大等待时间。这样即使缓存区没有被填满也会被自动发送。这个timeout的默认值时100 ms

     

    Usage:

    val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
    env.setBufferTimeout(timeoutMillis)
    env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

     

    为了最大的吞吐量。set setBufferTimeout(-1),这样会移除timeout并且只有当缓存区填满的时候才能被发送。为了最小的延迟,设置timeout = 0 来关闭缓存。Timeout=0 应该要去避免,因为这会引起服务性能下降。

     

        1. Debugging

    在分布式集群中运行分布式程序之前,一个很好的办法是确定实现的算法按照期待的方式运行。因此,实现数据分析的程序通常是一个结果检查,调试,改善提高的过程。

    Flink IDE内通过本地调试的方式提供了数据分析程序开发处理的特性,注入测试,收集数据。本小节将给一些提示如何开发Flink程序。

     

          1. Local Execution Enviroment

    LocalStreamEnvironment 在同一的JVM内启动内创建Flink System.如果你是在IDE里面启动LocalEnvironment 。你可以在你的代码中打断点这样就很容易去调试了。

         

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val lines = env.addSource(/* some source */)
    // build your program
    env.execute()

     

          1. Collection Data Sources

    Flink 为方便调试通过java collections 提供了一些特殊的数据源,一旦程序通过测试,source sink 很容易被替换。

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    // Create a DataStream from a list of elements
    val myInts = env.fromElements(1, 2, 3, 4, 5)
    // Create a DataStream from any Collection
    val data: Seq[(String, Int)] = ...
    val myTuples = env.fromCollection(data)
    // Create a DataStream from an Iterator
    val longIt: Iterator[Long] = ...
    val myLongs = env.fromCollection(longIt)

     

     

          1. Iterator Data Sink

    Flink 为调试和测试的目的提供了收集DataStream 结果的sink .可以像下面这样使用:

    import org.apache.flink.streaming.experimental.DataStreamUtils
    import scala.collection.JavaConverters.asScalaIteratorConverter
    val myResult: DataStream[(String, Int)] = ...
    val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

     

    注:在flink 1.5.0  flink-streaming-contrib 被移除了。使用flink-streaming-java flink-streaming-scala 来替代

     

        1. Where to go next(下一步)?
    1. Operators: stream operators 规范说明
    2. Event Time:介绍flink的时间概念
    3. State & Fault Tolerance:解释如何开发有状态的应用
    4. Connectors:描述有效的输入输出Connectors
    展开全文
  • 大数据系列之(一) Streaming模式基础知识 大数据 分布式系统 2016-01-07 20:18:23 发布 您的评价:   0.0 收藏 0收藏 作者:Tyler Akidau译者:张磊原文: ...
  • Java流(Stream)简介

    千次阅读 2017-11-24 11:54:39
    要讨论流,我们先来谈谈集合,这是最容易上手的方式了。 Java 8中的集合支持一个新的stream方法,它会返回一个流(接口定义在java.util.stream.Stream里)。那么, 流到底是什么呢?简短的定义就是“从支持数据处理...
  • 干货 | Spark Streaming 和 Flink 详细对比

    千次阅读 2018-08-11 00:39:08
    原文详见:https://mp.weixin.qq.com/s/Fb1cW0oN7xYeb1oI2ixtgQ
  • spark streaming实时分析处理时,处理的数据可能会出现重复,需要根据唯一的key进行处理,谁知道怎么处理
  • spark.streaming.kafka.maxRatePerPartition这个参数是控制吞吐量的,一般和spark.streaming.backpressure.enabled=true一起使用。那么应该怎么算这个值呢。 如例要10分钟的吞吐量控制在5000,0000,kafka分区是10个...
  • sparkstreaming

    万次阅读 2017-12-11 11:12:14
    Spark Streaming实时计算框架介绍   随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐、用户行为分析等。 Spark ...
  • Resources与StreamingAssets文件夹的区别

    千次阅读 2017-08-07 09:27:01
    FR:海涛高软(QQ技术交流群:386476712)
  • https://blog.csdn.net/ntk1986/article/details/80755888
  • Spark Streaming与Storm的对比分析

    千次阅读 2015-12-14 16:39:28
    Spark Streaming与Storm的对比分析
  • Unity中StreamingAssets文件夹中的资源,在打包时会原封不动的包含到包体中,如图我在StreamingAssets文件夹的资源:当我打包apk后,用解压软件查看apk的内部资源,会在assets文件夹下找到StreamingAssets中的资源。...
  • 出现该现象即为你的纹理池不够用了,增加纹理池的大小...你需要在 \Engine\Config\ConsoleVariables.ini 里添加 “r.Streaming.PoolSize=2000” ,保存重启引擎就可以 或者在项目设置里关掉texture streaming
  • Darwin Streaming Server 安装流程

    万次阅读 2013-11-25 23:51:36
    Darwin StreamingServer 安装流程 Darwin StreamingServer 支持开放源代码和基于标准的实时传输协议/实时流协议(RTP / RTSP)、MPEG-4 和MP3 流协议。 一、安装前的准备 Darwin StreamingServer 的下载 Quicktime ...
  • 使用springboot架构在通过feign调用方法的时候报错:cannot retry due to redirection, in streaming mode,通过swagger测试所有的get delete请求都可以跑通,只有post put带body的请求会出现500的异常: ...
  • 30分钟概览Spark Streaming 实时计算

    万次阅读 2019-09-08 15:24:09
    什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念...
  • spark 参数调优11-Spark Streaming

    千次阅读 2018-09-06 15:56:23
    spark参数调优系列 目录地址: ...   11 Spark Streaming spark.streaming.backpressure.enabled 反压,默认false,详细了解请移步https://blog.csdn.net/zyzzxycj/article/detai...
  • NoClassDefFoundError: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
1 2 3 4 5 ... 20
收藏数 104,150
精华内容 41,660
关键字:

streaming