精华内容
下载资源
问答
  • Spark Streaming

    千次阅读 2020-04-22 11:11:15
    什么是Spark Streaming 什么是DStream 阐明RDD、DataFrame、DataSet、DStream数据抽象之间的关系。 SparkStreaming代码过程 窗口宽度和滑动距离的关系 0.8版本SparkStreaming集成kafka的差异 Receiver接收方式...

    目录

     

    什么是Spark Streaming

    什么是DStream

    阐明RDD、DataFrame、DataSet、DStream数据抽象之间的关系。

    SparkStreaming代码过程

    窗口宽度和滑动距离的关系

    0.8版本SparkStreaming集成kafka的差异

    Receiver接收方式

    Direct直连方式

    什么是Structured Streaming

    Structured Streaming模型

    Structured Streaming应用场景

    详细描述下图内容


    什么是Spark Streaming

     

    Spark Streaming是一个基于Spark Core之上的实时计算框架

     

    什么是DStream

    代表持续性的输入的数据流和经过各种Spark算子操作后的输出的结果数据流。

    本质上就是按照时间间隔划分成一批一批的连续的RDD

     

     

    阐明RDD、DataFrame、DataSet、DStream数据抽象之间的关系。

     

    DStream=RDD1(t1)+ RDD2(t2)+ RDD3(t3)+ RDD4(t4)+….

    DataSet  =  DataFrame+类型  =   RDD+结构+类型

    DataFrame  =  RDD+结构

     

     

    SparkStreaming代码过程

         1 创建sparkConf

    2 创建一个sparkcontext
    3 创建streamingcontext
    4 接收数据并根据业务逻辑进行计算

    5 开启计算任务
    6 等待关闭

     

     

    窗口宽度和滑动距离的关系

     

    0.8版本SparkStreaming集成kafka的差异

    Receiver接收方式

    1. 多个Receiver接受数据效率高,但有丢失数据的风险。
    2. 开启日志(WAL)可防止数据丢失,但写两遍数据效率低。
    3. Zookeeper维护offset有重复消费数据可能。
    4. 使用高层次的API

    Direct直连方式

    1. 不使用Receiver,直接到kafka分区中读取数据
    2. 不使用日志(WAL)机制。
    3. Spark自己维护offset
    4. 使用低层次的API

     

     

     

    什么是Structured Streaming

    Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎

     

    Structured Streaming模型

    是一个不断增长的动态表格,新数据被持续不断地添加到表格的末尾

    对动态数据源进行实时查询,就是对当前的表格内容执行一次 SQL 查询。

     

    Structured Streaming应用场景

    将数据源映射为类似于关系数据库中的表,(SparkSQL中的DF/DS)

    然后将经过计算得到的结果映射为另一张表.

     

     

    详细描述下图内容

    这是一个 计算  WorldCount (每个单词 出现的 次数) 流  

    输入 数据: 

    在 10:01   01分的时候 输入  单词 : cat dog dog dog    (剖析 : 肉眼可见  cat 出现了一次  dog 出现了三次)

     所有  计算出来   cat 1 dog 3   最后进行输出模式

    10:02    和 10:03 一致  (只是 新加了 单词 并重新计算)

    展开全文
  • Spark Streaming 转向 Structured Streaming

    千次阅读 2019-05-23 20:33:24
    Spark 团队对 Spark Streaming 的维护将会越来越少,Spark 2.4 版本的 Release Note 里面甚至一个 Spark Streaming 相关的 ticket 都没有。相比之下,Structured Streaming 有将近十个 ticket 说明。所以,是时候...

    导读

    Spark 团队对 Spark Streaming 的维护将会越来越少,Spark 2.4 版本的 Release Note 里面甚至一个 Spark Streaming 相关的 ticket 都没有。相比之下,Structured Streaming 有将近十个 ticket 说明。所以,是时候舍弃 Spark Streaming 转向 Structured Streaming 了,当然理由并不止于此。

    今天这篇文章将重点分析 Spark Streaming 的不足,以及 Structured Streaming 的设计初衷和思想。文章主要参考 2018 年 Sigmod 上面的论文:《Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark》。

    首先可以注意到论文标题中的 Declarative API,中文一般叫做声明式编程 API。一般直接看到这个词可能不知道什么意思,但是当我们列出它的对立单词:Imperative API,中文一般叫命令式编程 API,仿佛一切都明了了。是的,没错,Declarative 只是表达出我们想要什么,而 Imperative 则是说为了得到什么我们需要做哪些东西一个个说明。举个例子,我们要一个糕点,去糕点店直接定做,告诉店员我们要什么样式的糕点,然后店员去给我们做出来,这就是 Declarative。而 Imperative 对应的就是面粉店了。

    0. Spark Streaming 的不足

    在开始正式介绍 Structured Streaming 之前有一个问题还需要说清楚,就是 Spark Streaming 存在哪些不足?总结一下主要有下面几点:

    使用 Processing Time 而不是 Event Time

    首先解释一下,Processing Time 是数据到达 Spark 被处理的时间,而 Event Time 是数据自带的属性,一般表示数据产生于数据源的时间。比如 IoT 中,传感器在 12:00:00 产生一条数据,然后在 12:00:05 数据传送到 Spark,那么 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。我们知道 Spark Streaming 是基于 DStream 模型的 micro-batch 模式,简单来说就是将一个微小时间段,比如说 1s 的流数据当成批数据来处理。如果我们要统计某个时间段的一些数据,毫无疑问应该使用 Event Time,但是因为 Spark Streaming 的数据切割是基于 Processing Time,这样就导致使用 Event Time 特别困难。

     Complex Low-level API

    这点比较好理解,DStream (Spark Streaming 的数据模型)提供的 API 类似 RDD 的 API,非常 Low level。当我们编写 Spark Streaming 程序的时候,本质上就是要去构造 RDD 的 DAG 执行图,然后通过 Spark Engine 运行。这样导致的一个问题是,DAG 可能会因为开发者的水平参差不齐而导致执行效率上的天壤之别。这样导致开发者的体验非常不好,也是任何一个基础框架不想看到的(基础框架的口号一般都是:你们专注于自己的业务逻辑就好,其他的交给我)。这也是很多基础系统强调 Declarative 的一个原因。

     Reason about end-to-end application

    这里的 end-to-end 指的是直接 input 到 out,比如 Kafka 接入 Spark Streaming 然后再导出到 HDFS 中。DStream 只能保证自己的一致性语义是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Streaming 输出到外部存储的语义往往需要用户自己来保证。而这个语义保证写起来也非常有挑战性,比如为了保证 output 的语义是 exactly-once,需要 output 的存储系统具有幂等的特性,或者支持事务性写入,这对于开发者来说都不是一件容易的事情。

     批流代码不统一

    尽管批流本是两套系统,但是这两套系统统一起来确实很有必要,我们有时候确实需要将我们的流处理逻辑运行到批数据上面。关于这一点,最早在 2014 年 Google 提出 Dataflow 计算服务的时候就批判了 streaming/batch 这种叫法,而是提出了 unbounded/bounded data 的说法。DStream 尽管是对 RDD 的封装,但是我们要将 DStream 代码完全转换成 RDD 还是有一点工作量的,更何况现在 Spark 的批处理都用 DataSet/DataFrame API 了。

    1. Structured Streaming 介绍

    Structured Streaming 在 Spark 2.0 版本于 2016 年引入,设计思想参考了很多其他系统的思想,比如区分 processing time 和 event time,使用 relational 执行引擎提高性能等。同时也考虑了和 Spark 其他组件更好的集成。

    Structured Streaming 和其他系统的显著区别主要如下:

    • Incremental query model:Structured Streaming 将会在新增的流式数据上不断执行增量查询,同时代码的写法和批处理 API (基于 Dataframe 和 Dataset API)完全一样,而且这些 API 非常的简单。
    • Support for end-to-end application:Structured Streaming 和内置的 connector 使的 end-to-end 程序写起来非常的简单,而且 "correct by default"。数据源和 sink 满足 "exactly-once" 语义,这样我们就可以在此基础上更好地和外部系统集成。
    • 复用 Spark SQL 执行引擎:我们知道 Spark SQL 执行引擎做了非常多优化工作,比如执行计划优化、codegen、内存管理等。这也是 Structured Streaming 取得高性能和高吞吐的一个原因。

     

     

    2. Structured Streaming 核心设计

    下面我们看一下 Structured Streaming 的核心设计。

    • Input and Output:Structured Streaming 内置了很多 connector 来保证 input 数据源和 output sink 保证 exactly-once 语义。

    而实现 exactly-once 语义的前提是:

    1. Input 数据源必须是可以 replay 的,比如 Kafka,这样节点 crash 的时候就可以重新读取 input 数据。常见的数据源包括 Amazon Kinesis, Apache Kafka 和文件系统。
    2. Output sink 必须要支持写入是幂等的。这个很好理解,如果 output 不支持幂等写入,那么一致性语义就是 at-least-once 了。另外对于某些 sink, Structured Streaming 还提供了原子写入来保证 exactly-once 语义。
    • API:Structured Streaming 代码编写完全复用 Spark SQL 的 batch API,也就是对一个或者多个 stream 或者 table 进行 query。query 的结果是 result table,可以以多种不同的模式(append, update, complete)输出到外部存储中。另外,Structured Streaming 还提供了一些 Streaming 处理特有的 API:Trigger, watermark, stateful operator。
    • Execution:复用 Spark SQL 的执行引擎。Structured Streaming 默认使用类似 Spark Streaming 的 micro-batch 模式,有很多好处,比如动态负载均衡、再扩展、错误恢复以及 straggler (straggler 指的是哪些执行明显慢于其他 task 的 task)重试。除了 micro-batch 模式,Structured Streaming 还提供了基于传统的 long-running operator 的 continuous 处理模式。
    • Operational Features:利用 wal 和状态存储,开发者可以做到集中形式的 Rollback 和错误恢复。还有一些其他 Operational 上的 feature,这里就不细说了。

    3. Structured Streaming 编程模型

    可能是受到 Google Dataflow 的批流统一的思想的影响,Structured Streaming 将流式数据当成一个不断增长的 table,然后使用和批处理同一套 API,都是基于 DataSet/DataFrame 的。如下图所示,通过将流式数据理解成一张不断增长的表,从而就可以像操作批的静态数据一样来操作流数据了。

    在这个模型中,主要存在下面几个组成部分:

    • Input Unbounded Table:流式数据的抽象表示
    • Query:对 input table 的增量式查询
    • Result Table:Query 产生的结果表
    • Output:Result Table 的输出

     

    下面举一个具体的例子,NetworkWordCount,代码如下:

    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    // Generate running word count
    val wordCounts = words.groupBy("value").count()
    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()

    代码实际执行流程可以用下图来表示。把流式数据当成一张不断增长的 table,也就是图中的 Unbounded table of all input。然后每秒 trigger 一次,在 trigger 的时候将 query 应用到 input table 中新增的数据上,有时候还需要和之前的静态数据一起组合成结果。query 产生的结果成为 Result Table,我们可以选择将 Result Table 输出到外部存储。输出模式有三种:

    • Complete mode:Result Table 全量输出
    • Append mode (default):只有 Result Table 中新增的行才会被输出,所谓新增是指自上一次 trigger 的时候。因为只是输出新增的行,所以如果老数据有改动就不适合使用这种模式。
    • Update mode:只要更新的 Row 都会被输出,相当于 Append mode 的加强版。

    和 batch 模式相比,streaming 模式还提供了一些特有的算子操作,比如 window、watermark、stateful operator 等。

    window,下图是一个基于 event-time 统计 window 内事件的例子。

    import spark.implicits._
    val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
    // Group the data by window and word and compute the count of each group
    val windowedCounts = words.groupBy(
    window("eventTime", "10 minutes", "5 minutes"),
    $"word"
    ).count()

    如下图所示,窗口大小为 10 分钟,每 5 分钟 trigger 一次。在 12:11 时候收到了一条 12:04 的数据,也就是 late data (什么叫 late data 呢?就是 Processing Time 比 Event Time 要晚),然后去更新其对应的 Result Table 的记录。

    watermark,是也为了处理 ,很多情况下对于这种 late data 的时效数据并没有必要一直保留太久。比如说,数据晚了 10 分钟或者还有点有,但是晚了 1 个小时就没有用了,另外这样设计还有一个好处就是中间状态没有必要维护那么多。watermark 的形式化定义为 max(eventTime) - threshold,早于 watermark 的数据直接丢弃。

    import spark.implicits._
    val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
    // Group the data by window and word and compute the count of each group
    val windowedCounts = words
    .withWatermark("eventTime", "10 minutes")
    .groupBy(
    window("eventTime", "10 minutes", "5 minutes"),
    $"word")
    .count()

    用下图表示更加形象。在 12:15 trigger 时 watermark 为 12:14 - 10m = 12:04,所以 late date (12:08, dog; 12:13, owl) 都被接收了。在 12:20 trigger 时 watermark 为 12:21 - 10m = 12:11,所以 late data (12:04, donkey) 都丢弃了。

    除此之后 Structured Streaming 还提供了用户可以自定义状态计算逻辑的算子:

    • mapGroupsWithState
    • flatMapGroupsWithState

    看名字大概也能看出来 mapGroupsWithState 是 one -> one,flatMapGroupsWithState 是 one -> multi。这两个算子的底层都是基于 Spark Streaming 的 updateStateByKey。

    4. Continuous Processing Mode

    好,终于要介绍到“真正”的流处理了,我之所以说“真正”是因为 continuous mode 是传统的流处理模式,通过运行一个 long-running 的 operator 用来处理数据。之前 Spark 是基于 micro-batch 模式的,就被很多人诟病不是“真正的”流式处理。continuous mode 这种处理模式只要一有数据可用就会进行处理,如下图所示。epoch 是 input 中数据被发送给 operator 处理的最小单位,在处理过程中,epoch 的 offset 会被记录到 wal 中。另外 continuous 模式下的 snapshot 存储使用的一致性算法是 Chandy-Lamport 算法。

    这种模式相比与 micro-batch 模式缺点和优点都很明显。

    • 缺点是不容易做扩展
    • 优点是延迟更低

    关于为什么延迟更低,下面两幅图可以做到一目了然。

    5. 一致性语义

    对于 Structured Streaming 来说,因为有两种模式,所以我们分开讨论。

    micro-batch 模式 可以提供 end-to-end 的 exactly-once 语义。原因是因为在 input 端和 output  端都做了很多工作来进行保证,比如 input 端 replayable + wal,output 端写入幂等。

    continuous 模式 只能提供 at-least-once 语义。关于 continuous mode 的官方讨论的实在太少,甚至只是提了一下。在和 @李呈祥 讨论之后觉得应该还是 continuous mode 由于要尽可能保证低延迟,所以在 sink 端没有做一致性保证。

    6. Benchmark

        Structured Streaming 的官方论文里面给出了 Yahoo! Streaming Benchmark 的结果,Structured Streaming 的 throughput 大概是 Flink 的 2 倍和 Kafka Streaming 的 90 多倍。

    7. 总结

    总结一下,Structured Streaming 通过提供一套 high-level 的 declarative API 使得流式计算的编写相比 Spark Streaming 简单容易不少,同时通过提供 end-to-end 的 exactly-once 语义。

    8. 闲扯

    最后,闲扯一点别的。Spark 在多年前推出基于 micro-batch 模式的 Spark Streaming 必然是基于当时 Spark Engine 最快的方式,尽管不是真正的流处理,但是在吞吐量更重要的年代,还是尝尽了甜头。而 Spark 的真正基于 continuous 处理模式的 Structured Streaming 直到 Spark 2.3 版本才真正推出,从而导致近两年让 Flink 尝尽了甜头(当然和 Flink 的优秀的语义模型存在很大的关系)。

    在实时计算领域,目前来看,两家的方向都是朝着 Google DataFlow 的方向。由 Spark 的卓越核心 SQL Engine 助力的 Structured Streaming,还是风头正劲的 Flink,亦或是其他流处理引擎,究竟谁将占领统治地位,还是值得期待一下的。

    展开全文
  • spark streaming

    千次阅读 2018-08-03 10:39:59
    Spark Streaming 课程目标 掌握Spark Streaming的原理 熟练使用Spark Streaming完成流式计算任务 Spark Streaming介绍 Spark Streaming概述 什么是Spark Streaming Spark Streaming类似于Apache ...

    Spark Streaming

    1. 课程目标
      1. 掌握Spark Streaming的原理
      2. 熟练使用Spark Streaming完成流式计算任务
    2. Spark Streaming介绍
      1. Spark Streaming概述
        1. 什么是Spark Streaming

    Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

        1. 为什么要学习Spark Streaming

     

    1. 易用

    1. 容错

    1. 易整合到Spark体系

        1. Spark与Storm的对比

    Spark

    Storm

    开发语言:Scala

    开发语言:Clojure

    编程模型:DStream

    编程模型:Spout/Bolt

     

    1. DStream
      1. 什么是DStream

    Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:

    对数据的操作也是按照RDD为单位来进行的

    计算过程由Spark engine来完成

      1. DStream相关操作

    DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

     

        1. Transformations on DStreams

    Transformation

    Meaning

    map(func)

    Return a new DStream by passing each element of the source DStream through a function func.

    flatMap(func)

    Similar to map, but each input item can be mapped to 0 or more output items.

    filter(func)

    Return a new DStream by selecting only the records of the source DStream on which func returns true.

    repartition(numPartitions)

    Changes the level of parallelism in this DStream by creating more or fewer partitions.

    union(otherStream)

    Return a new DStream that contains the union of the elements in the source DStream and otherDStream.

    count()

    Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.

    reduce(func)

    Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel.

    countByValue()

    When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

    reduceByKey(func, [numTasks])   

    When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.

    join(otherStream, [numTasks])

    When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.

    cogroup(otherStream, [numTasks])

    When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.

    transform(func)     

    Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.

    updateStateByKey(func)

    Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

     

    特殊的Transformations

     

    1. UpdateStateByKey Operation

    UpdateStateByKey原语用于记录历史记录,上文中Word Count示例中就用到了该特性。若不用UpdateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不再保存

     

    1. Transform Operation

    Transform原语允许DStream上执行任意的RDD-to-RDD函数。通过该函数可以方便的扩展Spark API。此外,MLlib(机器学习)以及Graphx也是通过本函数来进行结合的。

     

    1. Window Operations

    Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态

        1. Output Operations on DStreams

    Output Operations可以将DStream的数据输出到外部的数据库或文件系统,当某个Output Operations原语被调用时(与RDD的Action相同),streaming程序才会开始真正的计算过程。

    Output Operation

    Meaning

    print()

    Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.

    saveAsTextFiles(prefix, [suffix])

    Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

    saveAsObjectFiles(prefix, [suffix])

    Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

    saveAsHadoopFiles(prefix, [suffix])

    Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

    foreachRDD(func)

    The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

    1. 实战
      1. 用Spark Streaming实现实时WordCount

    架构图:

    1. 安装并启动生产者

    首先在一台Linux(ip:192.168.10.101)上用YUM安装nc工具

    yum install -y nc

     

    启动一个服务端并监听9999端口

    nc -lk 9999

     

    1. 编写Spark Streaming程序
    package com.qf.spark.streaming
    
      
    
      import com.qf.spark.util.LoggerLevel
    
      import org.apache.spark.SparkConf
    
      import org.apache.spark.streaming.{Seconds, StreamingContext}
    
      
    
      object NetworkWordCount {
    
      def main(args: Array[String]) {
    
        //设置日志级别
    
        LoggerLevel.setStreamingLogLevels()
    
        //创建SparkConf并设置为本地模式运行
    
        //注意local[2]代表开两个线程
    
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    
        //设置DStream批次时间间隔为2秒
    
        val ssc = new StreamingContext(conf, Seconds(2))
    
        //通过网络读取数据
    
        val lines = ssc.socketTextStream("192.168.10.101", 9999)
    
        //将读到的数据用空格切成单词
    
        val words = lines.flatMap(_.split(" "))
    
        //将单词和1组成一个pair
    
        val pairs = words.map(word => (word, 1))
    
        //按单词进行分组求相同单词出现的次数
    
        val wordCounts = pairs.reduceByKey(_ + _)
    
        //打印结果到控制台
    
        wordCounts.print()
    
        //开始计算
    
        ssc.start()
    
        //等待停止
    
        ssc.awaitTermination()
    
      }
    
    }

     

    3.启动Spark Streaming程序:由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序

    注意:要指定并行度,如在本地运行设置setMaster("local[2]"),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1

     

    4.在Linux端命令行中输入单词

     

    5.在IDEA控制台中查看结果

    问题:结果每次在Linux段输入的单词次数都被正确的统计出来,但是结果不能累加!如果需要累加需要使用updateStateByKey(func)来更新状态,下面给出一个例子:

    package com.qf.spark.streaming
    
      
    
      import com.qf.spark.util.LoggerLevel
    
      import org.apache.spark.{HashPartitioner, SparkConf}
    
      import org.apache.spark.streaming.{StreamingContext, Seconds}
    
      
    
      object NetworkUpdateStateWordCount {
    
      /**
    
        * String : 单词 hello
    
        * Seq[Int] :单词在当前批次出现的次数
    
        * Option[Int] : 历史结果
    
        */
    
      val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    
        //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    
        iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
    
      }
    
      
    
      def main(args: Array[String]) {
    
        LoggerLevel.setStreamingLogLevels()
    
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    
        val ssc = new StreamingContext(conf, Seconds(5))
    
        //做checkpoint 写入共享存储中
    
        ssc.checkpoint("c://aaa")
    
        val lines = ssc.socketTextStream("192.168.10.100", 9999)
    
        //reduceByKey 结果不累加
    
        //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    
        //updateStateByKey结果可以累加但是需要传入一个自定义的累加函数:updateFunc
    
        val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    
        results.print()
    
        ssc.start()
    
        ssc.awaitTermination()
    
      }
    
    }
    
     

    4.2Spark Streaming整合Kafka完成网站点击流实时统计

    1. 安装并配置zk
    2. 安装并配置Kafka
    3. 启动zk
    4. 启动Kafka
    5. 创建topic

    bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181 \

    --replication-factor 3 --partitions 3 --topic urlcount

    6.编写Spark Streaming应用程序

    package com.qf.spark.streaming
    
      
    
      package com.qf.spark
    
      
    
      import org.apache.spark.{HashPartitioner, SparkConf}
    
      import org.apache.spark.storage.StorageLevel
    
      import org.apache.spark.streaming.kafka.KafkaUtils
    
      import org.apache.spark.streaming.{Seconds, StreamingContext}
    
      
    
      object UrlCount {
    
      val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    
        iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
    
      }
    
      
    
      def main(args: Array[String]) {
    
        //接收命令行中的参数
    
        val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    
        //创建SparkConf并设置AppName
    
        val conf = new SparkConf().setAppName("UrlCount")
    
        //创建StreamingContext
    
        val ssc = new StreamingContext(conf, Seconds(2))
    
        //设置检查点
    
        ssc.checkpoint(hdfs)
    
        //设置topic信息
    
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    
        //重Kafka中拉取数据创建DStream
    
        val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    
        //切分数据,截取用户点击的url
    
        val urls = lines.map(x=>(x.split(" ")(6), 1))
    
        //统计URL点击量
    
        val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    
        //将结果打印到控制台
    
        result.print()
    
        ssc.start()
    
        ssc.awaitTermination()
    
      }
    
    }

     

    展开全文
  • 最近在看 Structured Streaming,先出个简单对比图,Spark Streaming VS Structured Streaming 以及 Structured Streaming内部的两种模式 MicroBatch Streaming VS Continous Streaming。后期深入再做总结和补充...

    一 前言

    最近在看 Structured Streaming,先出个简单对比图,Spark Streaming VS Structured Streaming 以及 Structured Streaming内部的两种模式 MicroBatch Streaming VS Continous Streaming。后期深入再做总结和补充。

    参考:

    《图解Spark核心技术与案例实战》

    二 Spark Streaming VS Structured Streaming

    对比项

    Spark Streaming

    Structured Streaming

    基于RDD抽象数据

    DStream(RDD不同时间点的集合,是RDD抽象数据集合)。DStream上有转换和输出操作

    基于DataSet/DataFrame,由于二维表有数据类型,可以做优化,如encoder。它提供了toRDD接口,最终也是基于RDD的Job

    是否数据结构化

    可以非结构化???

    结构化数据

    Job执行流程

    先执行DStreamGraphe+JobScheduler再执行DAGSchedule。 

    作业先注册到DStreamGraph,到达批处理时间时,才根据DStreamGraph生成作业并处理该批处理时间内接受的数据

    先执行SQL Engine再执行DAGSchedule。

    SQL Engine(解析SQL语句,生成logical plan,优化logical plan,生成physical plan,优化physical plan)

    是否批处理

    是,设置BatchDuration,每个duration的数据流为一批处理源

    MicroBatchExecution:是批处理

    ContinousExecution:可以单条数据增量式处理(这个目前由触发器的时间决定)

    最小延期

    0.5~2s

    MicroBatchExecution:100ms

    ContinousExecution:1ms

    处理引擎

    Spark Streaming引擎(DStreamGraphe+JobScheduler)+SparkCore引擎(DAGScheduler)

    Spark SQL引擎(SQL Engine) + SparkCore引擎(DAGScheduler)

    三、 MicroBatch Streaming VS Continous Streaming (In Spark2.4 Structured Streaming)

    Structured Streaming处理模式

    MicroBatch模式

    Continous 模式

    统一的用户编程模式

    Input => Query => Result => Output(complete/update/append)

    -----------------------trigger--------------------------->

    统一的设计模式(from code view)

    source => transformation/action(catalyst->spark.execute) => sink => output => [checkpoint]

    样例代码及对应的代码模块

    Trigger类

    Trigger.ProcessingTime

    Trigger.Continuous

    对应execution

    MicroBatchExecution

    ContinuousExecution

    Trigger工作模式

    以一定间隔(interval)调度计算逻辑,间隔为0时,上批次调用完成后,立即进入下一批次调用一直调用,退化为类似sparkstreaming的micro batch的流处理

    以一定间隔(interval)查看流计算状态

    DAG调度次数

    等于Trigger 的 interval间隔触发次数

    一次,由ContinuousWriteRDD.compute里 while (!context.isInterrupted() && !context.isCompleted()) 完成

    目前Spark2.3-2.4支持API

    支持API丰富,如汇聚,关联等操作

    仅简单的projection类(map,select等)

    TriggerExecutor类型

    ProcessingTimeExecutor

    ProcessingTimeExecutor/

    OneTimeExecutor

    Databrick测试的延迟级别

    100ms

    https://databricks.com/wp-content/uploads/2018/03/image6-1.png

    1ms

    https://databricks.com/wp-content/uploads/2018/03/image5-2.png

    Refer to: https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

         
    展开全文
  • 前言 引用Spark commiter(gatorsmile)的话:“从Spark-2.X版本后,Spark streaming就进入维护模式,Spark streaming是低阶API,给码农用的,各种坑;Structured streaming是给人设计的API,简单易用。由于太忙,...
  • 我先简单的说一下Spark streaming,然后在想对比的说一下Structured Streaming Spark streaming的概述 Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的...
  • SparkStreaming

    2019-04-14 13:16:06
    SparkStreaming 概述 Spark Streaming可以轻松构建可扩展的容错流应用程序 1)便于使用 2) 容错 3)Spark集成 计算框架 log-》flume-》kafka-》sparkStreaming-》hdfs || mysql -》hive||hbase Dstream 数据...
  • structure streaming

    千次阅读 2020-03-04 21:47:40
    structure steaming在spark streaming上进行了全新架构,持续处理模式支持很快很快,微批处理模式支持毫秒响应,select where groupBy map filter flatMap操作都支持 支持sparkSQL,数据抽象是dataframe 和DataSet ...
  • 文章目录课程目标1、sparkStreaming概述1.1 SparkStreaming是什么1.2 SparkStreaming的组件 课程目标 说出Spark Streaming的特点 说出DStreaming的常见操作api 能够应用Spark Streaming实现实时数据处理 能够应用...
  • Mapreduce streaming

    2017-05-16 15:27:38
    Mapreduce streaming 用hadoop,作为Java界的小残废,只能紧紧抱住Streaming框架的大腿,Hadoop MapReduce和HDFS采用Java实现,默认提供Java编程接口,另外提供了C++编程接口和Streaming框架。Streaming框架允许...
  • Hadoop Streaming

    2018-08-01 11:50:23
    官方文档:...Hadoop streaming is a utility that comes with the Hadoop distribution. The utility ...
  • spark streaming导入的依赖为: org.apache.spark spark-streaming-kafka-0-10_2.11 ${spark.version} 而structed streaming导入的依赖不同: org.apache.spark spark-sql-kafka-0-10_2.11 ${spark.version} ...
  • 【Spark Streaming】Spark Streaming的使用

    千次阅读 2020-04-21 09:06:18
    一、Spark Streaming引入 集群监控 一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等 要针对硬件的...
  • Structred StreamingStreaming Query分析

    千次阅读 2016-12-28 10:41:52
    Structred StreamingStreaming Query分析 在用户的应用程序中,用户会调用DataStreamWriter.start()方法发起一个Streaming query。 在DataStreamWriter中,会调用df.sparkSession.sessionState....
  • SparkStreaming 介绍

    千次阅读 多人点赞 2020-04-14 17:49:04
    Spark Streaming引入 新的场景需求 ●集群监控 一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等 要...
  • sparkstreaming

    万次阅读 2017-10-14 12:54:28
    Spark Streaming实时计算框架介绍   随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐、用户行为分析等。 Spark ...
  • Streaming System 第一章:Streaming 101

    千次阅读 2018-12-13 16:09:47
    Streaming101起源于在O'really上发表的两篇博客,原文如下:https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102 其中对流式...
  • 前言 又是一个超长的标题(摊手┓( ´∀` )┏)。Spark Streaming 历史比较悠久,也确实非常...反倒是Structured Streaming, 吐槽点比较多,但是到目前,我们经过一番实践,觉得是时候丢掉Spark Streaming 升级到...
  • Hive Streaming

    千次阅读 2016-01-20 17:50:15
    1.Hive Streaming介绍 在前面我们看到了UDF、UDTF、UDAF的实现并不是很简单,而且还要求对Java比较熟悉,而Hive设计的初衷是方便那些非Java人员使用。因此,Hive提供了另一种数据处理方式——Streaming,这样就可以...
  • Spark Streaming1、Spark Streaming是什么?2、Spark Streaming特点易用容错易整合3、SparkStreaming与Storm的对比 1、Spark Streaming是什么? Spark Streaming类似于Apache Storm,用于流式数据的处理。 Spark ...
  • Hadoop streaming详解

    千次阅读 2017-03-14 17:20:17
    Hadoop streamingHadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard ...
  • Spark Streaming原理 Spark Streaming 是基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。 Spark Streaming计算流程 Spark ...
  • 转载:是时候放弃 Spark Streaming, 转向 Structured Streaming 了 正如在之前的那篇文章中 Spark Streaming 设计原理 中说到 Spark 团队之后对 Spark Streaming 的维护可能越来越少,Spark 2.4 版本的 Release ...
  • StructStreaming 代码

    千次阅读 2020-04-22 12:00:30
    //需求 统计年龄小于25岁的人群的爱好排行榜 读取文件 package day26 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql...object StructStreaming_files { d...
  • 1.Spark Streaming:大数据实时计算介绍 2.Spark Streaming:DStream基本工作原理 3.Spark Streaming:StreamingContext详解技能点 4.Spark Streaming:输入DStream和Receiver详解 5.Spark Streaming:DStream的...
  • 上一篇博客《StructuredStreaming是何方神圣》已经介绍了StructuredStreaming的相关概念,这篇博客我们介绍基于StructuredStreaming进行实时流算子开发,并将结果输出到kafka中。 StructuredStreaming使用的数据...
  • 一、为何要有StructuredStreaming 二、StructuredStreaming的特性 1、结构化流式处理 2、基于Event-Time聚合&延迟数据处理 3、容错性 Structured Streaming是Spark新提出的一种实时流的框架,以前是...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 53,519
精华内容 21,407
关键字:

streaming