streaming_streamingcontext获取不到数据 - CSDN
精华内容
参与话题
  • Streaming简介

    2013-07-24 11:27:17
    Streaming简介  Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义重大,今天简单说一下。 Streaming的原理是用Java实现一个...

    Streaming简介 

    Streaming框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义重大,今天简单说一下。

    Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用MapReduce Java接口获取key/value对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成key/value对输出。 

     

    Streaming优点

    1 开发效率高,便于移植

    只要按照标准输入输出格式进行编程,就可以满足hadoop要求。因此单机程序稍加改动就可以在集群上进行使用。 同样便于测试

    只要按照 cat input | mapper | sort | reducer > output 进行单机测试即可。

    如果单机测试通过,大多数情况是可以在集群上成功运行的,只要控制好内存就好了。

        2 提高程序效率

    有些程序对内存要求较高,如果用java控制内存毕竟不如C/C++。

    Streaming不足

        1 Hadoop Streaming默认只能处理文本数据,无法直接对二进制数据进行处理 

        2 Streaming中的mapperreducer默认只能向标准输出写数据,不能方便地处理多路输出 

    具体参数介绍

     

    -input    <path>

    输入数据路径

    -output   <path>

    输出数据路径

    -mapper  <cmd|JavaClassName>

    mapper可执行程序或Java类

    -reducer  <cmd|JavaClassName>

    reducer可执行程序或Java类

    -file            <file>        Optional

    分发本地文件

    -cacheFile       <file>        Optional

    分发HDFS文件

    -cacheArchive   <file>         Optional

    分发HDFS压缩文件

    -numReduceTasks  <num>    Optional

    reduce任务个数

    -jobconf | -D NAME=VALUE    Optional

    作业配置参数

    -combiner <JavaClassName>   Optional

    Combiner Java

    -partitioner <JavaClassName>  Optional

    Partitioner Java

    -inputformat <JavaClassName> Optional

    InputFormat Java

    -outputformat <JavaClassName>Optional

    OutputFormat Java

    -inputreader <spec>            Optional

    InputReader配置

    -cmdenv   <n>=<v>           Optional

    传给mapper和reducer的环境变量

    -mapdebug <path>             Optional

    mapper失败时运行的debug程序

    -reducedebug <path>           Optional

    reducer失败时运行的debug程序

    -verbose                      Optional

    详细输出模式

     

     下面是对各个参数的详细说明:

    -input <path>:指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入。

    -output <path>:指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次。

    -mapper:指定mapper可执行程序或Java类,必须指定且唯一。

    -reducer:指定reducer可执行程序或Java类,必须指定且唯一。

    -file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件

    -numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。

    -jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml特别建议-jobconf mapred.job.name='My Job Name'设置作业名,使用-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级,使用-jobconf mapred.job.map.capacity=M设置同时最多运行Mmap任务,使用-jobconf mapred.job.reduce.capacity=N设置同时最多运行Nreduce任务。

    常见的作业配置参数如下表所示: 

    mapred.job.name

    作业名

    mapred.job.priority

    作业优先级(-jobconf mapred.job.queue.name=<queue_name> mapred.job.priority=<VERY_LOW|LOW|NORMAL|HIGH|VERY_HIGH>)

    mapred.job.map.capacity

    最多同时运行map任务数

    mapred.job.reduce.capacity

    最多同时运行reduce任务数

    hadoop.job.ugi

    作业执行权限

    mapred.map.tasks

    map任务个数

    mapred.reduce.tasks

    reduce任务个数

    mapred.job.groups

    作业可运行的计算节点分组

    mapred.task.timeout

    任务没有响应(输入输出)的最大时间

    mapred.compress.map.output

    map的输出是否压缩

    mapred.map.output.compression.codec

    map的输出压缩方式

    mapred.output.compress

    reduce的输出是否压缩

    mapred.output.compression.codec

    reduce的输出压缩方式

    stream.map.output.field.separator

    map输出分隔符

     -combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发。

    -partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitonerIntHashPartitioner

    -inputformat, -outputformat:指定inputformatoutputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormatOutputFormat接口。如果不指定,默认使用TextInputFormatTextOutputFormat

    -cmdenv NAME=VALUE:给mapperreducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。

    -mapdebug, -reducedebug:分别指定mapperreducer程序失败时运行的debug程序。

    -verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。

    展开全文
  • Spark Streaming 介绍及架构——基础篇

    千次阅读 2018-07-19 09:41:03
    Spark Streaming是Spark API核心的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。 数据可以从许多来源获取,如Kafka,Flume,Kinesis或TCP套接字,并且可以使用复杂的算法进行处理,这些算法...

    1 概述

    官方网站
    Spark Streaming是Spark core API的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。 数据可以从许多来源获取,如Kafka,Flume,Kinesis或TCP sockets,并且可以使用复杂的算法进行处理,这些算法使用诸如map,reduce,join和window等高级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将Spark的机器学习和图形处理算法应用于数据流。

    总的来说我们可以从三点进行考虑:输入—–计算—–输出。正如下图所示:
    这里写图片描述
    1. 输入:可以从Kafka,Flume,HDFS等获取数据
    2. 计算:我们可以通过map,reduce,join等一系列算子通过spark计算引擎进行计算(基本和RDD一样,使用起来更方便。)
    3. 输出:可以输出到HDFS,数据库,HBase等。

    2 处理数据的特点

    在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流并将数据分成批,然后由Spark引擎处理,以批量生成最终结果流。

    这里写图片描述
    从图中也能看出它将输入的数据分成多个batch进行处理,严格来说spark streaming 并不是一个真正的实时框架,因为他是分批次进行处理的。

    Spark Streaming提供了一个高层抽象,称为discretized stream或DStream,它表示连续的数据流。 DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列RDD。

    3 wordcount代码演示进行进一步认识

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SocketWordCountApp {
    
      def main(args: Array[String]): Unit = {
        //创建SparkConf
        val conf=new SparkConf().setAppName("SocketWordCountApp").setMaster("local[2]")
        //通过conf 得到StreamingContext,底层就是创建了一个SparkContext
        val ssc=new StreamingContext(conf,Seconds(5))
        //通过socketTextStream创建一个DSteam,可以看出这里返回的是ReceiverInputDStream[T],后面从源码进行分析
        val DStream=ssc.socketTextStream("192.168.137.130",9998)
        //wc (看看是不是和RDD中的wc一样呢)
        DStream.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey(_+_).print()
    
        // 开始计算
        ssc.start()
        // 等待计算结束
        ssc.awaitTermination()
      }
    }
    

    我们在通过nc命令像端口9998输入数据;

    [hadoop@hadoop ~]$ nc -lp 9998  
    a,a,a,a,b,b,b
    • 查看结果
    (b,3)
    (a,4)

    4 初始化StreamingContext

    要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。StreamingContext对象也可以从现有的SparkContext对象创建。

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf, Seconds(1))

    当一个Context被定义,你必须做以下的事情:
    1. 通过定义输入DStream来创建输入源。
    2. 通过在DStream上应用转换操作和输出操作来定义流计算。
    3. 使用StreamContext.start()开始来接收数据和处理数据。
    4. 使用StreamContext.awaitTermination()来等待计算完成(手动或者因错误终止)。
    5. 可以StreamContext.stop()来手动停止计算(一般不会停止)。

    注意

    a.一旦一个StreamingContext被启动,就不能再设置或添加新的流计算。
    b.一旦一个StreamingContext被停止,就不能重新启动。
    c.同一时间内,在JVM内部只有一个StreamingContext处于活跃状态。
    d.默认情况下使用stop()方法停止StreamingContext的同时也会停止SparkContext,如果执行停止
    StreamingContext,可以将stop()的可选参数设置为false。
    e.SparkContext可以复用,即用来创建多个StreamingContext,只要在创建新的StreamingContext时,
    之前创建的StreamingContext是处于stop状态即可(SparkContext没有被停止)。
    

    5 Discretized Streams (DStreams)

    Discretized Streams或DStream是Spark Streaming提供的基本抽象。 它表示连续的数据流,即从源接收的输入数据流或通过转换输入流生成的已处理数据流。 在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象。 DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。
    这里写图片描述

    在DStream上应用的任何操作都会转化为对每个RDD的操作。例如,wordcount案例中(下面会进行代码演示),flatMap操作应用于DStream行中的每个RDD,以生成单词DStream的RDD。 这在下图中显示。
    这里写图片描述

    这些基础RDD转换由Spark引擎计算。 DStream操作隐藏了大部分这些细节,并为开发人员提供了更高级别的API以方便使用。 这些操作将在后面的章节中详细讨论。

    6 spark streaming架构

    我们应该知道spark有很多种运行模式,下面通过spark on yarn (cluster模式)的模式图进行介绍,所以想要对spark streaming的运行架构进行理解,你要知道在yarn上提交作业的流程(可以参考该篇博客),以及spark的运行流程(参考该篇博客),下面是我在网上找的一幅图,我们根据这幅图 进行一个学习:
    这里写图片描述
    下面对于这幅图进行详细的剖析:
    符号表示:1,2,3….代表Spar on Yarn启动流程 ;(1)(2)(3)….代表Spark Streaming执行过程。
    1. 通过spark client提交作业到RM
    2. ResouceManager为该作业分配第一个Container,并与对应的NodeManager通信,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationMaster)
    3. NodeManager启动Spark AppMaster。
    4. Spark AppMaster并向ResourceManager AsM注册,用户就可以通过UI查看作业情况。
    5. ResourceManager通知NodeManager分配Container。(每个container的对应一个executor)
    6. NodeManager准备资源,并分配给executor。

    Spark Streaming执行过程

    spark on Yarn模式Driver运行在NM的container之中,运行application的main()函数并自动创建SparkContext,在SparkContext之上会创建一个 StreamingContext(因为途中并没有标出,这里说明下)。
    (1)SparkContext向资源管理器注册并申请运行Executor资源;
    (2)Executor会启动Receive接收数据(Data Received),分批处理。
    (3)Receive接收到数据后汇报给streamingcontext(底层调用的sparkcontext),他会以多个副本存储,默认两个(后面进行源码解读就知道了。)
    (4)Spark ApplicationMaster和executor(container)进行交互,分配task。
    (5)每个executor上会运行多个task执行任务。

    最后把结果保存在HDFS上。

    Saprk Streaming数据处理过程

    这里写图片描述
    首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。这里也和上面所说的一致。

    7 源码解析

    前面我们说了StreamingContext,底层就是创建了一个SparkContext,我们从源码中进行证明:
    new StreamingContext(conf,Seconds(5))

    /**
       * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
       * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
       * @param batchDuration the time interval at which streaming data will be divided into batches
       */
      def this(conf: SparkConf, batchDuration: Duration) = {
        this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
      }
    
    
    我们new StreamingContext(conf,Seconds(5)) 其实调用的上面的方法,
    1.传递一个SparkConf应该不陌生把,指定Spark参数的org.apache.spark.SparkConf对象;
    2.Duration流式数据分成批次的时间间隔
    
    

    我们 接着看看这句话StreamingContext.createNewSparkContext(conf)

      private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
        new SparkContext(conf)
      }
    
    可以看到底层就是给我们创建了一个SparkContext
    通过socketTextStream创建一个DSteam,可以看出这里返回的是ReceiverInputDStream[T],从源码进行分析:

    ssc.socketTextStream("192.168.137.130",9998)

    /**
       * Creates an input stream from TCP source hostname:port. Data is received using
       * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
       * lines.
       * @param hostname      Hostname to connect to for receiving data
       * @param port          Port to connect to for receiving data
       * @param storageLevel  Storage level to use for storing the received objects
       *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
       * @see [[socketStream]]
       */
      def socketTextStream(
          hostname: String,
          port: Int,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
        socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
      }
    
    1.通过tcp socket监听hostname:port,接受字节(UTF8编码,`\ n`分隔)
    2.这里我们看到了默认的存储级别StorageLevel.MEMORY_AND_DISK_SER_2,因为他是一个默认参数,所以我们直接使用了默认的就木有传递。(和前面对应了吧)
    (还有一个问题,还记得spark中缓存级别吗???)
    3.返回ReceiverInputDStream
    4.调用了socketStream方法
    

    继续查看socketStream方法

    /**
       * Creates an input stream from TCP source hostname:port. Data is received using
       * a TCP socket and the receive bytes it interpreted as object using the given
       * converter.
       * @param hostname      Hostname to connect to for receiving data
       * @param port          Port to connect to for receiving data
       * @param converter     Function to convert the byte stream to objects
       * @param storageLevel  Storage level to use for storing the received objects
       * @tparam T            Type of the objects received (after converting bytes to objects)
       */
      def socketStream[T: ClassTag](
          hostname: String,
          port: Int,
          converter: (InputStream) => Iterator[T],
          storageLevel: StorageLevel
        ): ReceiverInputDStream[T] = {
        new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
      }
    
    这些参数大家应该明白了吧,我们继续看看SocketInputDStream[T]到底是什么吧。
    class SocketInputDStream[T: ClassTag](
        _ssc: StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](_ssc) {
    
    SocketInputDStream这个类继承了ReceiverInputDStream,感觉快看到希望了啊。
    
    /**
     * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
     * that has to start a receiver on worker nodes to receive external data.
     * Specific implementations of ReceiverInputDStream must
     * define [[getReceiver]] function that gets the receiver object of type
     * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
     * to the workers to receive data.
     * @param _ssc Streaming context that will execute this input stream
     * @tparam T Class type of the object of this stream
     */
    abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
      extends InputDStream[T](_ssc) {
    
    1.ReceiverInputDStream是一个抽象列继承了InputDStream,必须在工作节点上启动接收器才能接收外部数据,
    2.ReceiverInputDStream 通过getReceiver函数获取类型的接收器对象,即org.apache.spark.streaming.receiver.Receiver,
    3.Receiver的作用是接受数据。

    InputDStream[T](_ssc)是什么呢?

    /**
     * This is the abstract base class for all input streams. This class provides methods
     * start() and stop() which are called by Spark Streaming system to start and stop
     * receiving data, respectively.
     * Input streams that can generate RDDs from new data by running a service/thread only on
     * the driver node (that is, without running a receiver on worker nodes), can be
     * implemented by directly inheriting this InputDStream. For example,
     * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
     * new files and generates RDDs with the new files. For implementing input streams
     * that requires running a receiver on the worker nodes, use
     * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
     *
     * @param _ssc Streaming context that will execute this input stream
     */
    abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
      extends DStream[T](_ssc) {
    
    1.这是所有输入流的抽象基类。这个类提供了方法由Spark Streaming系统调用的start()和stop()来启动和停止
    接收数据。
    2.输入流会被差分成多个rdd,运行在每一个线程中。
    3.驱动程序节点(即,不在工作节点上运行接收器)可以 通过直接继承此InputDStream实现。例如,
    FileInputDStream,InputDStream的一个子类。产生一个新文件或者生成多个rdd都会产生新文件,
    这是通过driver监视HDFS目录
    4.用于实现输入流需要在工作节点上运行接收器,请使用
    [[org.apache.spark.streaming.dstream.ReceiverInputDStream]]作为父类。
    
    这里我们可以看到InputDStream继承DStream,终于看到了DStream,全村人的希望啊.
    **
     * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
     * sequence of RDDs (of the same type) representing a continuous stream of data (see
     * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
     * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
     * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
     * transforming existing DStreams using operations such as `map`,
     * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
     * periodically generates a RDD, either from live data or by transforming the RDD generated by a
     * parent DStream.
     *
     * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
     * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
     * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
     * `join`. These operations are automatically available on any DStream of pairs
     * (e.g., DStream[(Int, Int)] through implicit conversions.
     *
     * A DStream internally is characterized by a few basic properties:
     *  - A list of other DStreams that the DStream depends on
     *  - A time interval at which the DStream generates an RDD
     *  - A function that is used to generate an RDD after each time interval
     */
    abstract class DStream[T: ClassTag] (
        @transient private[streaming] var ssc: StreamingContext
      ) extends Serializable with Logging {
    
    这一大段注释,,头大。。。
    其实就是我们前面说的输入-----计算-----输出:
    可以通过TCP socket,Kafka,Flume 输入数据,转化为DStream,
    通过一些算子 `map`, `filter` and`window`进行计算
    
    DStream内部具有几个基本属性:
     * - DStream依赖的其他DStream列表(个人感觉就是RDD之间的依赖关系)
     * - DStream生成RDD的时间间隔(可自行设置)
     * - 每个时间间隔后用于生成RDD的函数(生成多个RDD)

    终于从头到尾给看完了哈,进行了一个简单的介绍,不知道小伙伴有没有理解呢。

    8 输入数据流

    从源端接受的数据代表输入数据流,通过接受输入数据会产生一个DStream,例如我们上面进行的wc中val DStream=ssc.socketTextStream("192.168.137.130",9998)这句代码接收数据后返回一个DStream, 每个输入DStream都与Receiver对象相关联,Receiver对象从源接收数据并将其存储在Spark的内存中进行处理。

    Spark Streaming提供了两类内置streaming sources。
    基本来源:StreamingContext API中直接可用的来源。 示例:文件系统和socket connections。
    高级来源:可通过额外的实用程序课程获得Kafka,Flume,Kinesis等来源。 这些要求链接部分中讨论的额外依赖关系。(后面我们会进行讲解)

    注意,如果您想在流式传输应用程序中并行接收多个数据流,则可以创建多个输入DStream(在性能调整部分中进一步讨
    论)。 这将创建多个接收器,它将同时接收多个数据流。 但请注意,Spark worker / executor是一项长期运行的任
    务,因此它占用了分配给Spark Streaming应用程序的核心之一。 因此,重要的是要记住,Spark Streaming应用程序
    需要分配足够的内核(或线程,如果在本地运行)来处理接收到的数据以及运行接收器。
    

    对local进行进一步理解
    1. local :用一个工作线程在本地运行Spark
    2. local[K]:使用K工作线程在本地运行Spark(理想情况下,将其设置为您计算机上的核心数)。
    3. local[K,F]:使用K工作线程,最多为F在本地运行Spark(可以通过参数spark.task.maxFailures设置)
    4. local[*]:使用与您的计算机上的逻辑内核一样多的工作线程在本地运行Spark。

    在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”作为主URL。 这两者中的任何一个都意味着只有一个线程将用于本地运行任务。 如果您使用的是基于接收器的输入DStream(例如套接字,Kafka,Flume等),那么receive接收器对象将占用一个线程,那就意味着没有足够的线程来处理数据。因此,在本地运行时,请始终用“local [n]”作为主URL,其中n>要运行的接收器的数量。在群集上运行,分配给Spark Streaming应用程序的内核数量必须多于接收器的数量。 否则系统将接收数据,但无法处理它。

    展开全文
  • Streaming(DataStream API): 概念介绍

    千次阅读 2018-09-20 00:11:09
    Streaming(DataStream API) 原文参考: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#collection-data-sources Overview Flink DataStream Api 编程指南 在...
    1. Streaming(DataStream API)

    原文参考:

    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#collection-data-sources

      1. Overview
        1. Flink DataStream Api 编程指南

    Flink中的DataStream 程序在数据流(data streams)上实现了各种转换(transformation)操作(,filter,updating,state,window,aggregating )Data Streams 可以从各种数据源(message queue,socket,fiels )中被创建。产生的结果可以输出到各种sink(目的地),比如将它写入到数据文件或一些标准的输出当中。Flink 程序可以在各种环境中运行,如 standlone ,嵌入到其他程序中。Flink能在本地的JVM中执行,也可以在集群中运行(yarn.

     

    flink Api的基本概念请参照 basic concepts 

     

    为了创建你自己的Flink DataStream 程序,我们鼓励你一开始使用 anatomy of a Flink Program 并逐步的添加 stream transformations. 下面的章节将为添加一些operations(翻者注:Flink 中的任何的transformations)和高级特性做一些引用说明

     

    Example 程序案例

    Data Source 数据源

    DataStream transformation

    Data sink 数据输出

    Iterations

    Execution Parametes 执行参数

      Fault Tolerance(故障容错)

      Controlling Latency (延迟控制)

    Debugging

      Local Execution Envionment

      Collection Data Sources

      Iterator Data Sink

    Where to go next (下一站)?

     

        1.  Example Program

    下面的代码是一段完整的基于窗口的 word count 应用的例子,单词的数量来源于一个5秒窗口的socket . 你可以复制到本地并运行它。

    Java 代码片段

    public class WindowWordCount {
    
        public static void main(String[] args) throws Exception {
    
      
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
      
    
            DataStream<Tuple2<String, Integer>> dataStream = env
    
                    .socketTextStream("localhost", 9999)
    
                    .flatMap(new Splitter())
    
                    .keyBy(0)
    
                    .timeWindow(Time.seconds(5))
    
                    .sum(1);
    
      
    
            dataStream.print();
    
      
    
            env.execute("Window WordCount");
    
        }
    
      
    
        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
            @Override
    
            public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
    
                for (String word: sentence.split(" ")) {
    
                    out.collect(new Tuple2<String, Integer>(word, 1));
    
                }
    
            }
    
        }
    
    }

     

     

     

    Scala 代码片段

    import org.apache.flink.streaming.api.scala._
    
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
    
      * Created by yuanhailong on 2018/9/19.
    
      */
    
    object WindowWordCount {
    
      def main(args: Array[String]) {
    
      
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val text = env.socketTextStream("localhost", 9999)
    
      
    
        val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    
          .map { (_, 1) }
    
          .keyBy(0)
    
          .timeWindow(Time.seconds(5))
    
          .sum(1)
    
      
    
        counts.print()
    
      
    
        env.execute("Window Stream WordCount")
    
      }
    
    }
    
     

     

    为了运行这个例子,首先你需要启动在命令行终端用netcat 启动一个输入流:

     

    nc –lk 9999

     

    只要输入一些词就会返回一些新的单词。这些词将会成为word count 程序的输入。如果你想看到的结果大于1 。你只要重复的输入5秒钟之内相同的词即可。(如果你的输入不够快,你可以增加窗口大小)

     

        1. Data Sources [数据源]

    数据源表示你的程序从哪里读取数据。通过StreamExecutionEnvironment.addSource(sourceFunction). 你能添加数据源到你的程序中。Flink 实现了几种数据源函数(function) ,但你可以通过实现SourceFunction 自定义数据源[翻者注:SourceFunction并行度1]。如果你想要实现多个并行度的数据源函数你可以通过实现ParallelSourceFunction 接口或者扩展RichParallelSourceFunction 

     

           有一些预先定义的数据源来源于StreamExecutionEnvironment

     

    file-based[基于文件的]:

    1. readTextFile(path):读取文本文件,file 遵循TextInputFormat 规范,文本文件中的数据每一行作为一个字符串返回。
    2. readFile(fileinputFormat,path): 通过指定文件的输入格式来读取数据文件
    3. readFile(fileInputFormat, path, watchType, interval, pathFilter) :这个方法的调用实际是通过上面两个方法中的一个来实现的。它使用给定的fileInputFormat读取指定路径下面的文件。根据提供的watchType. 数据源可能周期性(根据interval ms)的监控Path路径下的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY)。或者仅处理一次当前路径下面的数据然后退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter排除不需要处理的数据。

    IMPLEMENTATION(实现):

    在内部,Flink 将读数据程序划分为两个子任务(sub-task) ,也就是目录监控和数据读取。每个子任务通过独立的条目实现。监控是通过并行度为1的任务实现的。然而数据读取时通过多个任务并行实现的。并行度等于Job任务的并行度。目录监控任务去监控目录(根据watchType 周期性的监控或仅读取一次),找到文件,切割文件,并切割文件到下游readers .   readers将读取实际的数据。每个切割的文件仅被一个readers 读取。然而一个readers 可以读取多个文件。

    IMPORTANT NOTES(特别注意):

    1. 如果watchType 被设置为 FileProcessingMode.PROCESS_CONTINUOUSLY。当files被修改的时候,它的整个内容将会被重新处理。这就会破坏“exactly-once”的语义,当追加数据到文件的末尾将导致所有的数据都会被重新处理。
    2. 如果watchType 被设置为FileProcessingMode.PROCESS_ONCE. 数据源只会被扫描一次然后退出,无需等待readers完成文件内容的读取[这里指的是监控服务]。当然readers 会持续读取文件内容直到文件内容读取完成.关闭source 将会导致此后的信息不会再有检查点。这将导致在节点失败后恢复变慢,因为Job需要从上一个检查点恢复

     

    Socked-Based:

    1. socketTextStream: Socket中读取数据。通过指定分隔符切割数据

     

    Collection-Based:

    1. fromCollection(Seq): java  Java.util.Collection 中创建data stream,集合中所有的元素必须具备相同的数据类型
    2. fromCollection(Iterator):从Iterator中创建data stream. 该类指定迭代器返回的元素的数据类型。
    3. fromElements(elements: _*) : 从一系列的对象中创建data stream. 所有的对象必须具备相同的类型
    4. fromParallelCollection(SplittableIterator):Iterator中创建data stream. 该类指定迭代器返回的元素的数据类型。
    5. generateSequence(from, to) :在给定的范围类生成一系列的数字
        1. DataStream transformations

    参见 operators  

        1. Data Sinks

    Data sinks 消费 DataStream中的数据并将数据输出到files,socket,其他额外系统或print Flink 有多种输出格式它封装了DataStream上的背后的多种operators

    1. writeAsText() / TextOutputFormat: 写元素一行作为一个String . 这个Strings 通过调用每个元素的toString() 方法来获取。
    2. writeAsCsv(...) / CsvOutputFormat: 用逗号分隔value写元组(tuple). Row Filed分隔符可配置。Value通过调用toString() 方法来获取。
    3. print() / printToErr():打印每个元素toString()value到标准输出。
    4. writeUsingOutputFormat() / FileOutputFormat:方法和自定义文件输出的基础类。支持自定义的对象到字节的转换
    5. writeToSocket:根据SerializationSchema 写元素到Socket
    6. addSink : 调用自定义的sink 函数。Flink 自带了多重sink 函数(如Apache kafka

     

    注意,在DataStrem上的Write()方法主要是为了调试的目的。他们不会参加flinkchekpoint操作。这就意味着它使用的是“at-least-once”语义。数据如何刷写到目标系统依赖于实现的OutputFormat. 这就意味着不是发送到目标系统的数据会立即展现出来。当然,在失败的场景中,这些数据可能会丢失。

     

        为了可靠性,strema exactly-once 传递到文件系统,可以使用flink-connector-filesystem

     

        1. Iterations

    Iterative streaming(迭代流)程序实现一个step 函数,并将其嵌入到IterativeStream中。由于一个DataStream程序可能永远都不会完成,因此没有最大的迭代次数。相反,你需要指定那些stream需要返回到iteration并且通过splitfilter transformation 指定那些需要输出到下游。在这里,我们有一个iteration例子。代码的主体部分是一个简单的map 转换 ,并通过返回的元素区分不同的元素返回给下游。

    val iteratedStream = someDataStream.iterate(
      iteration => {
        val iterationBody = iteration.map(/* this is executed many times */)
        (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
    })

     

    例如: 这里有一个程序冲一个整数中持续减1,直到它等于0

    val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
    val iteratedStream = someIntegers.iterate(
      iteration => {
        val minusOne = iteration.map( v => v - 1)
        val stillGreaterThanZero = minusOne.filter (_ > 0)
        val lessThanZero = minusOne.filter(_ <= 0)
        (stillGreaterThanZero, lessThanZero)
      }
    )

     

        1. Execution Parameters

    StreamExecutionEnvironment  包含ExecutionConfig  ExecutionConfig 允许为Flink运行时设置一些配置参数。

    更多的参数参见execution configuration 。这些参数属于DataStream API:

    1. setAutoWatermarkInterval(long millseconds):设置watermark发射的频率。通过getAutoWatermarkInterval可以得到当前的watermarkvalue.
          1. Fault Tolerance(故障容错)

    State & Checkpointing 描述了如何启用Flink的checkpoint 机制。

          1. Controlling Latency

    默认情况下,数据元素在网络上不是一对一的传输(如果这样将会导致不必要的网络延迟)而是先缓存起来。缓存(在两台机器上实际传输的对象)的大小在flink的配置文件中能被配置。为了更好的吞吐量这往往是一个好方法,但是当数据不足够快的时候会导致一定的数据延迟。为了控制吞吐量和延迟。在execution 环境上你可以通过env.setBufferTimeout(timeoutMillis)设置缓存等待被填满的最大等待时间。这样即使缓存区没有被填满也会被自动发送。这个timeout的默认值时100 ms

     

    Usage:

    val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
    env.setBufferTimeout(timeoutMillis)
    env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

     

    为了最大的吞吐量。set setBufferTimeout(-1),这样会移除timeout并且只有当缓存区填满的时候才能被发送。为了最小的延迟,设置timeout = 0 来关闭缓存。Timeout=0 应该要去避免,因为这会引起服务性能下降。

     

        1. Debugging

    在分布式集群中运行分布式程序之前,一个很好的办法是确定实现的算法按照期待的方式运行。因此,实现数据分析的程序通常是一个结果检查,调试,改善提高的过程。

    Flink IDE内通过本地调试的方式提供了数据分析程序开发处理的特性,注入测试,收集数据。本小节将给一些提示如何开发Flink程序。

     

          1. Local Execution Enviroment

    LocalStreamEnvironment 在同一的JVM内启动内创建Flink System.如果你是在IDE里面启动LocalEnvironment 。你可以在你的代码中打断点这样就很容易去调试了。

         

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val lines = env.addSource(/* some source */)
    // build your program
    env.execute()

     

          1. Collection Data Sources

    Flink 为方便调试通过java collections 提供了一些特殊的数据源,一旦程序通过测试,source sink 很容易被替换。

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    // Create a DataStream from a list of elements
    val myInts = env.fromElements(1, 2, 3, 4, 5)
    // Create a DataStream from any Collection
    val data: Seq[(String, Int)] = ...
    val myTuples = env.fromCollection(data)
    // Create a DataStream from an Iterator
    val longIt: Iterator[Long] = ...
    val myLongs = env.fromCollection(longIt)

     

     

          1. Iterator Data Sink

    Flink 为调试和测试的目的提供了收集DataStream 结果的sink .可以像下面这样使用:

    import org.apache.flink.streaming.experimental.DataStreamUtils
    import scala.collection.JavaConverters.asScalaIteratorConverter
    val myResult: DataStream[(String, Int)] = ...
    val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

     

    注:在flink 1.5.0  flink-streaming-contrib 被移除了。使用flink-streaming-java flink-streaming-scala 来替代

     

        1. Where to go next(下一步)?
    1. Operators: stream operators 规范说明
    2. Event Time:介绍flink的时间概念
    3. State & Fault Tolerance:解释如何开发有状态的应用
    4. Connectors:描述有效的输入输出Connectors
    展开全文
  • 大数据系列之(一) Streaming模式基础知识 大数据 分布式系统 2016-01-07 20:18:23 发布 您的评价:   0.0 收藏 0收藏 作者:Tyler Akidau译者:张磊原文: ...

    译者摘要

    现在大数据,云计算已经成为互联网的标配,但是现在主流的大数据处理依旧是使用batch模式,batch模式就是将数据按某种规则分成块,然后对整个块跑计算逻辑,缺点是延迟太高(至少是分钟),常用的工具就是Hadoop。在日益变化的需求面前,高延迟越来越不能忍受,因此Streaming模式应运而生,他最大的特点就是低延迟,最快能到毫秒级别,常用的Streaming工具主要是storm,spark等,但是这些工具都有各自的优缺点,功能上不能完全取代batch,这篇文章就是想深入分析什么样的Streaming系统能彻底替代batch,并最终打败batch。

    尽管现在市场上对streaming越来越关注,但是相对于batch来说,大部分streaming系统还不是很成熟,所以streaming系统正处于积极开发和进步中。

    作为一个在分布式steaming系统上工作过5年的从业者(MillWeel, Cloud DataFlow), 我非常荣幸给大家分享streaming系统能解决什么问题,以及batch系统和streaming系统在语义上的区别,这非常重要,会帮助我们更好地使用它。内容众多,我会分成两篇文章来讲述:

    1. Streaming 101 : 本篇会先讲述一些最基础的知识和澄清一些专业术语,之后我们深入地讲述时间的作用,以及数据处理常用方法,这主要包括batch和streaming两种模式。
    2. The Dataflow Model : 第二篇用一些实际例子讲述Cload Dataflow, 它综合了batch和streaming两种模式。之后,我回总结现有batch和streaming系统在语义上的相同点和不同点。

    下面是一篇很长的文章,不要分心。(译者:原文是nerdy,主要指呆滞但是专注的技术宅)

    背景知识

    本节讲述一些非常重要的背景知识,帮助理解剩下的部分。

    • 术语 : 任何复杂的问题都需要精确的术语。我会尝试把一些被滥用的术语重新定义,所以让大家能明白我用他们的时候在说什么。
    • 功能 :我会列举现有streaming系统常见的缺点,然后提出一个我觉得streaming系统应该有的功能,这些功能能够解决现有或者将来数据处理的需求。
    • 时间问题(Time Domains) : 介绍两个数据处理中跟时间相关的两个定义,并分析他们是怎么互相联系,然后指出这两个时间给数据处理带来怎样的困难。

    术语:什么是streaming?

    在我们深入之前,我想先解决:什么是streaming。现在‘Streaming’有很多不一样的意义,让我们误解streaming到底能解决什么问题,所以我一定要先 精确 定义它。

    问题的本质是:很多术语不是告诉我们他们是什么,而是告诉我们他们是怎么实现的,比如Unbounded data processing无穷数据处理和Approximate resulte近似结果处理被认为就是streaming,而实际上只是streaming常常用来解决这类问题,其实batch也能解决。没有精确定义的streaming被人们狭隘的认为streaming只是拥有某些常被称做‘streaming’的特性,比如近似结果。一个设计合理的streaming系统是可以提供 correct正确 ,consistent一致 并且 repeatable可重复计算 的结果,就像现在所有批处理引擎,我觉得 streaming定义 是:一种能处理无穷数据的数据处理引擎 。(为了这个定义更完整,我必须强调它包含了我们常说的streaming和micro-batch)。

    下面是一些经常和‘streaming’联系在一起的几个术语,我分别为他们给出了更精确,更清楚的定义,我建议整个工业界都应该采纳它们:

    1. Unbounded Data无穷数据 :一种持续产生并且无穷的数据,他们经常被称为‘streaming data’流数据,但是用streaming和batch去描述数据是错误的,正如我刚才说的,这两个词隐喻只能用某种类型 执行引擎 去处理这类数据。数据只分成两类:无穷和有限,不能说数据是streaming或者batch。我建议当我说infinite ‘streaming’ data无穷流的数据请直接用unbounded data无穷数据,当我们说finite batch data有限‘batch’数据时直接用bounded data有限数据。(译者:作者只是强调streaming和batch不能用来描述数据,它们只是数据处理的方式。)
    2. Unbounded data processing无穷数据处理 :一种用来处理无穷数据的方式。我自己也常常使用streaming来代表这种方式,但是不对的,batch也能实现它。因此,为了减少误解,我不会用streaming来代替它,我会直接使用无穷数据处理这个术语。
    3. 低延迟,近似结果 :他们经常跟流处理引擎连在一起。批处理系统通常不是设计用来解决低延迟近似结果,这是现状,不代表它不可以。事实上,批处理引擎是可以提供近似结果的。因此,低延迟就是低延迟(译者:延迟是指从系统收到消息,到完成整个消息计算的时间差),近似结果就是近似结果(译者:很多计算开销很大,近似结果用很少的开销就能提供可控的精度的正确性,比如yahoo最近开源的Sketches 就能解决unique users count),不要用streaming流处理引擎去代表他们。

    到此,我已经定义了什么是流处理引擎:一种用来处理无穷数据的引擎,对于其他术语,我不会用streaming来替代他们。我们已经在Cloud Dataflow中使用这些术语了,我建议其他人也可以采用它们。

    (译者:在下面的文章中,我会直接用streaming和batch来代替流处理引擎和批处理引擎,这样可以少敲几个字,敲字很辛苦)

    泛化streaming的功能

    下面让我们看看streaming到底能做什么和不能做什么,特别是设计合理的streaming能做什么。长久以来,streaming被工业界狭隘地认为是提供低延迟,近似结果,批处理是用来提供正确结果的,比如 Lambda Architecture (译者注:下文我会用Lambda架构表示,我理解为啥叫它Lambda)。

    如果你熟悉Lambda架构,你应该知道这个架构最基本的思想就是使用批处理引擎和流处理引擎,执行一样的计算,流处理引擎提供低延迟不精确结果(可能是使用近似算法,或者流处理就不准备提供正确性),之后批处理引擎提供正确的结果,覆盖流处理引擎的结果。起初,这个架构,由 Nathna Marz, Storm创始人 提出,真的获得不少成功,因为这个想法在当时看来真是不错,那时候流处理引擎还不令人满意,批处理引擎也不能像我们预期的那样发展得很好,Lambda架构却提供一种短期解决方案,并且很多项目真的用了。Lambda架构也存在缺点:维护成本高,你需要维护两套独立的系统,而且需要在最后合并他们的结果。

    作为一个在 强一致 流处理引擎工作多年的从业人员,我不同意Lambda架构的出发点,而且我特别喜欢 Jay Kreps’ Questioning the Lambda Architecture 这篇文章,它很有远见地质疑了双引擎的必要性,并且给出很强的理由。Kreps使用repeatability可重放的消息队列系统 kafka 去解决repeatability可重放问题,然后他提出一种新的架构:Kappa架构:基本思想就是只使用一套合理设计的引擎。虽然我不认为这个概念需要一个新的名字,但是我非常赞同他的思想。

    (译者:可重放是指你可以随时回到一个时间点顺序读取任何信息,kafka能做到这点,但现实是kafka也有开销,比如你的model需要一年的历史数据,你会让kafka存下一年的数据?基本上不会,你应该把数据存在开销更低的系统,比如hadoop,但是streaming系统可以读取回这些历史数据)

    现在我们应该更近一步,我认为:设计合理的streaming是可以提供比batch更多的功能,到那时我们不再需要batch。(译者:当前所有streaming比batch占用更多的资源,从商业上说batch一定会持续存在直到streaming能更加高效利用资源)Flink就是利用这个想法创造了一个完全的streaming系统,并且支持batch模式,我非常喜欢它。

    随着streaming系统越来越成熟,它将提供一种无穷流处理的框架,并且让Lambda架构消失在历史中。我相信它已经在发生。如果我想彻底打败batch,我们需要完成两件事:

    正确性:这让streaming和batch能够等同。

    本质上,正确性取决于consistent一致的存储。steaming需要一种类似checkpointing持久化存储,正如Kreps在它这篇文字所写,这种存储在机器挂掉的情况也能保证一致性。当几年前Spark streaming首次出现时,它在streaming世界里就是一致性的代名词。一切都在向前发展,仍有很多streaming系统不提供强一致性,我实在是不理解为啥at-most-once仍然存在,但是实际情况它还在。(译者:如果at-most-once指的是系统保证这个消息最多被处理一次,其他方式是:at-least-once至少一次和exactly-once只有一次,作者想表达的是最多一次就是系统会丢数据,除非你不关心正确性,否则这种方式就不该存在。但实际是上实现三种方式的开销不一样,随着系统越来越成熟,可能三种开销就不会那么大,到那时候估计就没人愿意使用最多一次的方式了。)

    重要的事情再说一次:强一致性必须要exactly-once处理,否则无正确性可言,而且只有这样才能追上并且最终超越batch系统。除非你毫不在乎结果的正确性,否则我真诚地建议你放弃任何不提供强一致性的streaming系统,不要浪费时间在他们身上。

    如果你真的想学习如何给streaming系统设计强一致性,我推荐你可以读读 MillWheel 和 Spark Streaming 的论文,这两篇论文都花费了很长的时间讲述一致性。本文时间有限,就不在此详述了。 (译者:还没读,看完会给大家分享下)

    时间工具:这让streaming超越batch。

    当我们处理无穷无序数据时,时间工具是一切的基础。当前,越来越多的需求要求我们处理无穷无序数据,而现有batch系统(包括大多数streaming系统)都缺乏必要的工具去解决这个困难。我会用本文剩下的部分和下一篇文章着重解释它。(译者:无序是难点,大部分分布式系统都不可能提供顺序保证,这里时间工具是指系统提供api,让我们自己控制超时以及如何按时间分块,下面会有详述。)

    我们首先会了解时间问题中重要的概念,而且深入分析什么是无穷无序数据的时间差,之后我会用剩下的部分讲解batch和streaming系统处理有限和无穷数据的常用方法。

    Event Time和 Processing Time

    • Event time发生时间 :事件实际触发时间(译者:我常叫client time,比如你了解手机app5分钟活跃度,那Event time就是你实际用手机的时间,由手机app打的时间戳)
    • Processing time处理时间 :时间被系统获知的时间(译者:我常叫server time,当事件进入这个系统的时间,大部分是应用层收到消息后加的时间戳。)

    (译者:下面我会直接用event time 和processing time)

    很多需求都不关注event time,这样的生活会简单很多,但是还是有不少需求是关心的,比如为带时序的用户行为建立特征,大多数付费应用和很多异常检查。(译者:广告的attribution就是带时序的行为,你只能在看过广告后点击)

    完美情况下,Event time和processing time应该永远是相等的,事件发生后立即被处理。现实是残酷的,两者的时间差不仅不是0,而是一个与输入,执行引擎和硬件相关的函数。下面几个是经常影响时间差:

    * 有限的共享资源:比如网络阻塞,网络分区,不独占条件下的共享CPU

    * 软件:分布式系统逻辑,竞争

    * 数据本身的特征:包括key的分布,吞吐量差异,无序(比如:很多人在坐飞机时关闭飞行模式使用手机,等手机网络恢复后手机会把事件发给手机)

    下图就是一个真实的event time和processing time差异图:

    黑色点线代表两个事件完全一致。在这个例子中,系统延迟在中间到最低点。横向距离差代表两个时间的时间差,基本上这个时间差都是由延迟导致。

    这两个时间没有固定的相关性(译者:不能简单的用一个函数去计算),所以如果你真的关心event time,你就不能用processing time去分析你的数据。不幸的是大多数现有系统都是用processing time去处理无穷数据,他们一般都会将输入按processing time用一些临时的分界线拆分小块去处理。

    如果你们系统关心正确性,那就千万不能用processing time去分块,否则一部分消息会因此被分到错误的块,无正确性而言。我们会在下面看到更多这样的例子。

    即使我们用event time作为分界线,其实也不能实现正确性。如果event time和processing time之间没有一个可预测的关系,你怎么能确定你已经收到所有消息?(比如:你要统计5分钟的数据,你怎么能保证收到所有5分钟的数据)现在大部分数据处理系统都依赖某种“完整性”,但是这么做让他们处理无穷数据遇到严重的困难。(译者:完整性一般都是用超时来实现,等一段时间发现没有了就放弃)

    我们应该设计工具能够让我们生活在这种不确定性中(译者:不确定性是指时间差不能预测)。当新数据到底时,我们可以获取或者修改老数据,所有系统都应该自然而然去优化“完整性”,而不是认为它至少可有可无的语义。(译者:优化完整性的意思是系统能够提供api控制超时。)

    在我们深入如何实现类似Cloud Dataflow数据模型前,我们先了解一个更有用的话题:常见数据处理方式。

    数据处理方式

    现在我们可以开始谈一些重要的数据处理模式了:batch和streaming(我把micro-batch归到streaming中,因为两者差异在这里不是很重要)。

    有限数据

    处理有限数据很简单,大多数都已经熟悉。在下图中,左边是一个完整数据集,中间是数据处理引擎(一般是batch,当然一些设计合理的streaming也可以),比如 MapReduce ,右边则是处理的结果:

    更让我们感兴趣的是无穷数据,我们会先分析传统的batch,然后分析常见streaming或者micro-batch。

    无穷数据 —— batch

    虽然batch从字面上看不像用来处理无穷数据,但是从它出生就已经被业界使用了。大家都能想到,只要我们能把无穷数据分块,我们就能用batch引擎出处理。

    固定窗口

    最常见的方法就是把数据拆分成固定大小的块,然后依次处理各个块数据。特别是日志数据,我们可以自然而然把数据拆分成以时间为单位的树状结构,这个很直观,因为基本上就是按event time分块。

    实际上,大部分系统仍然要处理完整性问题:如果一些事件由于网络延迟到达,怎么办?如果消息是全球的,但是存在一个公共地方,怎么办?如果事件来自手机,怎么办?我们可能需要用一些特别的办法解决它们:比如延迟事件处理直到所有数据都到达,或者重新计算整个数据集只要有新的数据到达。

    ** Sessions 序列 **这个比简单分块复杂,我们需要把事件拆的更细。Sessions一般是指把事件按活跃间隔拆分。如果在batch中计算sessions,你会出现一个Session横跨多个块,比如下图红色部分。当然,当你增加每个batch的时间间隔,这种横跨多个batch的概率会降低,但是它会带来更高的延迟。另外一个方法是增加额外的逻辑出处理这种情况,代价是逻辑复杂。

    两者都不是完美的方法,更好的方法是用streaming的方式处理session,我们下面会讲。

    无穷数据 —— streaming

    streaming从开始就是设计用来处理无穷数据,这跟大多数batch引擎不太一样。正如我们上面说的,大多数分布式系统的数据不只是无穷,还是一些其他让人讨厌的特性:

    • 无序数据 :意味着你需要处理时序问题,如果你关心event time。(译者:所有事件从绝对时间上看,一定都是有顺序的,如果一切都是单机,你一定定保证顺序。)
    • event time时间差 :你不能保证在X+Y时能看到大部分X发生的数据,这里Y是固定的数值。(译者:还是强调消息到底的不确定性)

    因此出现了一批处理这类数据的方法,我大致把他们分为4类:

    • Time-agnostic 时间无关的逻辑
    • Approximation 近似算法
    • Processing time分块
    • Event time分块

    我们简单分析下他们

    Time-agnostic

    这类处理逻辑完全跟时间没关心,只是更数据本身有关,batch和streaming在处理这种逻辑时没有本质区别,所以大部分streaming都支持。当然batch引擎也支持,你可以用任意方法把数据分块,再依次处理。下面是一些实际例子:

    Filtering过滤

    最基础的逻辑,比如你在处理web日志,你想要某些域名的数据,你可以在每个事件到底的时候丢掉那些不想要的,跟无穷,无序,时间差一点关系都没有。

    Inner-joins

    当join两类数据时,你只关系两者都到达的情况,如果一方先到,你可以把它存起来,等第二个到底之后取回前一个,然后计算。当然你想回收一些单个事件占用的资源,那就是跟时间有关了。(译者:超时回收)

    当然如果要支持某些outer-join,就存在数据完整性问题:当你看到一方时,你怎么知道另一方是否还会来?你肯定不知道,除非你设计超时,超时又跟时间相关,本质上又是另一种形式的分块,我们会详细分析。

    近似算法

    第二大类就是近似算法,比如近似Top-N,streaming K-kmeans等等。近似算法的好处是开销低,可以处理无穷数据。坏处是这类算法数量有限,而且实现复杂,近似的本质限制他们的使用,不可能所有场景都能用近似。这些算法大多数都有一些时间特征,比如decay,当然他们大多用processing time。另一个很重要的特点就是他们都提供可控的错误率。如果错误率可以根据数据到达的顺序预测出来,这些错误率就可以忽略不计,即便是无穷数据,这点很重要,你最好能记住这点。

    近似算法让人很兴奋,但本质上是另一种形式的时间无关算法。

    分块

    剩下两个方法都是讲如何将数据按时间分块。我想先讲明白啥是windowing,它就是把无穷或者有限数据按分界线拆分成有限的块。下图是三种不同的分块策略:

    • Fixed windows 固定窗口 :按时间分成固定大小的块。
    • Sliding windows 滑动窗口 :更一般的固定窗口,滑动窗口一般都是固定长度加上固定时间段。如果段小于长度,滑动窗口就是重叠的,否则就是sampling窗口,只使用部分数据。跟fixed窗口一样,大部分滑动窗口都是对齐的,有时候我们用不对齐的方式优化性能。
    • Sessions 序列 :动态窗口,用不活跃段将所有事件分成一个个session,所以session是一串连续的事件,不活跃段都是用超时时间。session一般用来分析用户行为数据,取决于实际时间情况,不能预先计算。sessions是经典的不对齐窗口,因为没有两个人的数据是完全相同的。

    我们会看看用processing time和event time去分窗口到达有什么不同,当然从processing time开始,因为它更常用。

    ** Processing time分块 **

    系统只需要保存来的数据直到一段时间完成。比如:5分钟分块,系统可以保存5分钟的数据,然后认为所有数据都到了,发给下游处理就行。这种方式有下面几个很好的特性:

    • 简单 :你不需要管时间是否乱,只需要保存来的数据,到时扔给下游
    • 完整性 :系统能清楚的知道数据是否完整,没必要处理“晚来”的数据。
    • 如果你想推断一些关于上游的情况,只需要用processing time。 : 监控系统就是最好的例子,比如你想知道一个全球化web service的QPS,使用processing time计算QPS是最好的方法。

    当然processing time也有缺点: 如果数据含有event time,并且你想用processing time来分块解决,那么这些数据必须是有序的。 不幸的是分布式上游大部分无法保证有序。

    一个简单的例子:手机app想了解用户使用情况。如果一个手机断网一段时间,这段时间的数据不会即时发到服务器端直到手机再次连网。我们无法用processing time分块获得任何有用的信息。

    另一个例子:有些分布式上游在正常情况下可以保证event time顺序,但是不代表上游一直能保持一样的时间差。比如一个全球系统,由于网络问题,可能某个州的网络导致很高的延迟,如果你是processing time分块,分块的数据已经不能代表当时的情况了,可能是新旧数据的混合。

    我们希望这两个例子都是用event time分块,保证逻辑的正确性。

    ** Event time分块 **

    Event time分块能让我们观察到上游数据在事件发生时的情况,这是最正确的方法。但是,大多数数据处理系统都没能从语义很好的支持它,虽然我们知道任何强一致性的系统(Hadoop或者Spark Streaming)经过一些修改是能解决的。(译者:作者说从语义上支持就是系统能保证按Processing time分块,我常用的工具都没有这样的语义)

    下图显示了一个按event time分成一小时的块:

    白色的线代表事件的processing time不同于event time。毋容置疑,event time分块一定能保证event time正确性。

    另一个好处是你可以创建动态大小的块,比如session,但是不存在session跨越多个固定的块。

    当然,任何事情都是有代价的,包括event time分块。它有两个很明显的缺点,因为块必须比实际长度存活更久:

    • Buffering :更长的生命要求我们保存更多的数据。很幸运,现在持久化已经是数据处理系统中最便宜的资源了,相对于CPU,网络带宽和内存来说,因此buffer不是太大的问题,至少比设计强一致性存储和内存cache要容易。很多聚合操作不需要保存整个输入,比如和或者均值。
    • Completeness 完整性 :我们都没有好方法知道所有数据都到了,我们如何知道何时发布结果?而且我们没法简单计算“何时”。(译者:发布数据是指让下游感知,比如把数据结果更细到DB)对于大部分情况,系统能使用类似MillWheel的watermarks去可靠地预测数据是否完整(我会在下一章讲)。但是在某些情况下,正确性非常重要,想想计费系统,我们唯一的方法是提供一个方法让系统自己能够控制什么时候发布数据,并且让系统自己能反复修改最终数据。完整性是一个非常有趣的话题,最好能用一些强有力的例子来说明,我会在下一篇分享。

    总结

    这篇包含太多信息了。如果你读到这里,你应该受到表扬,我们已经完成了一半了,让我们回顾下我们讲了什么,并且知道下一篇讲什么。让人高兴的是这篇无聊些,但是下一篇一定有趣。

    回顾

    • 澄清术语,特别是streaming的定义只限于执行引擎,而将其他术语,比如unbounded data和近似算法都放在streaming的概念下。
    • 分析了设计正确的batch和streaming系统,总结出 streaming是batch的功能超集 ,Lambda架构最终会被streaming取代。
    • 提出两个重要的概念,能够帮助streaming追赶并超越batch: 完整性 和 时间工具
    • 列出了 event time和processing time 的关系,并且指出两个时间给我们带来的困难,根据完整性的概念,提出系统应该能够拥抱在时间上的变化,提供完整精确的结果。
    • 分析了常用数据处理方法,包括bounded和unbounded数据,主要是batch和streaming引擎,并且把unbounded数据处理分成4类: time-agnostic, approximation, windowing by processing time, and windowing by event time 。

    下一篇

    • 从更高层面分析 数据处理 的概念,主要从4个方面入手: what, where, when, and how 。
    • 详细分析如何用Dataflow Model来完成各种不同的需求。他讲帮助你更彻底的理解啥是event time和processing time,也包括一个新的概念:watermarks
    • 比较现有数据处理系统,特别是一些最要的特性,让我们更好的选择他们,并且鼓励大家改善他们,帮助我实现我的最终目标:让streaming成为大数据处理的最好形式。
    展开全文
  • 一、为何要有StructuredStreaming 二、StructuredStreaming的特性 1、结构化流式处理 2、基于Event-Time聚合&amp;延迟数据处理 3、容错性 Structured Streaming是Spark新提出的一种实时流的框架,以前是...
  • Java流(Stream)简介

    千次阅读 2017-11-24 11:54:39
    要讨论流,我们先来谈谈集合,这是最容易上手的方式了。 Java 8中的集合支持一个新的stream方法,它会返回一个流(接口定义在java.util.stream.Stream里)。那么, 流到底是什么呢?简短的定义就是“从支持数据处理...
  • 干货 | Spark Streaming 和 Flink 详细对比

    千次阅读 2018-08-11 00:39:08
    原文详见:https://mp.weixin.qq.com/s/Fb1cW0oN7xYeb1oI2ixtgQ
  • spark streaming实时分析处理时,处理的数据可能会出现重复,需要根据唯一的key进行处理,谁知道怎么处理
  • spark.streaming.kafka.maxRatePerPartition这个参数是控制吞吐量的,一般和spark.streaming.backpressure.enabled=true一起使用。那么应该怎么算这个值呢。 如例要10分钟的吞吐量控制在5000,0000,kafka分区是10个...
  • https://blog.csdn.net/ntk1986/article/details/80755888
  • sparkstreaming

    万次阅读 2017-12-11 11:12:14
    Spark Streaming实时计算框架介绍   随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐、用户行为分析等。 Spark ...
  • Spark Streaming与Storm的对比分析

    千次阅读 2015-12-14 16:39:28
    Spark Streaming与Storm的对比分析
  • Unity中StreamingAssets文件夹中的资源,在打包时会原封不动的包含到包体中,如图我在StreamingAssets文件夹的资源:当我打包apk后,用解压软件查看apk的内部资源,会在assets文件夹下找到StreamingAssets中的资源。...
  • 使用springboot架构在通过feign调用方法的时候报错:cannot retry due to redirection, in streaming mode,通过swagger测试所有的get delete请求都可以跑通,只有post put带body的请求会出现500的异常: ...
  • 30分钟概览Spark Streaming 实时计算

    万次阅读 2019-09-08 15:24:09
    什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念...
  • 出现该现象即为你的纹理池不够用了,增加纹理池的大小...你需要在 \Engine\Config\ConsoleVariables.ini 里添加 “r.Streaming.PoolSize=2000” ,保存重启引擎就可以 或者在项目设置里关掉texture streaming
  • Darwin Streaming Server 安装流程

    万次阅读 2013-11-25 23:51:36
    Darwin StreamingServer 安装流程 Darwin StreamingServer 支持开放源代码和基于标准的实时传输协议/实时流协议(RTP / RTSP)、MPEG-4 和MP3 流协议。 一、安装前的准备 Darwin StreamingServer 的下载 Quicktime ...
  • spark 参数调优11-Spark Streaming

    千次阅读 2018-09-06 15:56:23
    spark参数调优系列 目录地址: ...   11 Spark Streaming spark.streaming.backpressure.enabled 反压,默认false,详细了解请移步https://blog.csdn.net/zyzzxycj/article/detai...
  • spark.streaming.concurrentJobs参数分析

    千次阅读 2016-04-14 16:30:40
    最近,在spark streaming 调优时,发现个增加job并行度的参数spark.streaming.concurrentJobs,spark 默认值为1,当增加为2时(在spark-default中配置),如遇到处理速度慢 streaming application UI 中会有两个...
  • NoClassDefFoundError: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
1 2 3 4 5 ... 20
收藏数 103,932
精华内容 41,572
关键字:

streaming