精华内容
下载资源
问答
  • SparkStreaming之checkpoint机制使用讲解
    千次阅读
    2019-04-09 23:59:37

    一 什么类型的数据需要使用checkpoint?

    Spark Streaming是最需要进行容错的,因为一般都是7 * 24小时运转,所以需要将足够的信息checkpoint到容错的存储系统上,比如HDFS,从而能够让他从失败中进行恢复。有两种数据需要被进行checkpoint:

    1、元数据checkpoint:

    # 配置信息:创建spark streaming应用程序的配置信息,比如SparkConf中的信息

    # DStream的操作信息:定义了应用程序计算逻辑的DStream操作信息

    # 未处理的batch信息:那些job正在排队,还没处理的batch信息

    2、数据checkpoint

    将实时计算过程中产生的RDD的数据保存到可靠的存储系统之中。对于一些将多个batch的数据进行聚合的,有状态的转换操作,这是很有用的,在这种操作中,生成的RDD是依赖于之前的batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长。

    要避免由于依赖链条越拉越长,导致的一起变得越来越长的失败恢复时间,有状态的转换操作执行过程中产生的RDD,会定期的被checkpoint到可靠的存储系统上,比如HDFS.从而消减RDD的依赖链条,进行而缩短失败恢复时候的RDD恢复时间。

    所以:元数据checkpoint主要是为了从driver失败中恢复过来;而RDD checkpoint主要是为了,使用到有状态的转换操作的时候,能够在其生产出的数据丢失时进行快速的恢复。

    二 什么时候启用checkpoint机制?

    2.1 使用了有状态的转换操作

    比如updateStateByKey或者reduceByKeyAndWindow操作

    2.2 要保证可以从driver失败中进行恢复

    比如元数据的checkpoint需要启用

    当然如果不是必须要从driver失败中恢复或者没有使用到转换操作,那么也就无需启用checkpoint,这样反而有助于提升性能

    三 如何启用checkpoint机制?

    3.1 对于有状态的转换操作,启用checkpoint机制,是比较简单的,定期将其产生的RDD数据checkpoint。可以通过配置容错文件系统,比如HDFS的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。

    3.2 如果为了要从driver失败中恢复,那么启用checkpoint机制是比较复杂的。需要改写spark streaming应用程序

    第一步:

    当应用程序第一次启动的时候,需要创建一个StreamingContext,并且调用其start方法进行启动。当driver从失败中恢复过来时,需要从checkpoint目录记录的元数据中恢复出来一个StreamingContext。

    如下代码所示:

    val checkpointDir = "hdfs://hdfs-cluster/user/spark/chkdir01"

    def createContext():StreamingContext = {

    val conf = new SparkConf().setAppName("Driver Checkpoint").setMaster("local[*]")

    val ssc = new StreamingContext(conf,Seconds(2))

    val hostname = "hadoop-all-01"

    val port = 9999;

    val lines = ssc.socketTextStream(hostname,port)

    ssc.checkpoint(checkpointDir)

    ssc

    }

    val context = StreamingContext.getOrCreate(checkpointDir,createContext)

    context.start()

    context.awaitTermination()

    第二步:

    必须确保Driver可以在失败时,自动重启。要是能够从Driver失败中恢复过来,运行spark streaming应用程序的集群,就必须监控driver的运行的过程,并且在它失败的时候将它重启,对于standalone需要配置supervise driver,在它失败时将其重启

    在spark-submit中,添加--deploy-mode参数,默认值是client,即在提交应用程序的机器上启动driver,但是要能够重启driver就必须设置为cluster,此外需要添加--supervise参数

    ref:https://www.2cto.com/net/201711/697288.html

    更多相关内容
  • 目录1 业务场景2 初始化环境2.1 创建 Topic2.2 模拟日志数据2.3 StreamingContextUtils 工具类3 实时数据ETL存储4 实时状态更新统计4.1 updateStateByKey 函数4.2 mapWithState 函数5 实时窗口统计 1 业务场景 百度...


    1 业务场景

    百度搜索风云榜(http://top.baidu.com/)以数亿网民的单日搜索行为作为数据基础,以搜索关键词为统计对象建立权威全面的各类关键词排行榜,以榜单形式向用户呈现基于百度海量搜索数据的排行信息,线上覆盖十余个行业类别,一百多个榜单在这里插入图片描述
    在【热点榜单】中,可以看到依据搜索关键词实时统计各种维度热点,下图展示【实时热点】。在这里插入图片描述
    仿【百度搜索风云榜】对用户使用百度搜索时日志进行分析:【百度搜索日志实时分析】,主要业务需求如下三个方面:在这里插入图片描述

    • 业务一:搜索日志数据存储HDFS,实时对日志数据进行ETL提取转换,存储HDFS文件系统;
    • 业务二:百度热搜排行榜Top10,累加统计所有用户搜索词次数,获取Top10搜索词及次数;
    • 业务三:近期时间内热搜Top10,统计最近一段时间范围(比如,最近半个小时或最近2个小时)
      内用户搜索词次数,获取Top10搜索词及次数;
      开发Maven Project中目录结构如下所示:
      在这里插入图片描述

    2 初始化环境

    编程实现业务之前,首先编写程序模拟产生用户使用百度搜索产生日志数据和创建工具类StreamingContextUtils提供StreamingContext对象与从Kafka接收数据方法。

    2.1 创建 Topic

    启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

    # 1. 启动Zookeeper 服务
    zookeeper-daemon.sh start
    # 2. 启动Kafka 服务
    kafka-daemon.sh start
    # 3. Create Topic
    kafka-topics.sh --create --topic search-log-topic \
    --partitions 3 --replication-factor 1 --zookeeper node1.oldlut.cn:2181/kafka200
    # List Topics
    kafka-topics.sh --list --zookeeper node1.oldlut.cn:2181/kafka200
    # Producer
    kafka-console-producer.sh --topic search-log-topic --broker-list node1.oldlut.cn:9092
    # Consumer
    kafka-console-consumer.sh --topic search-log-topic \
    --bootstrap-server node1.oldlut.cn:9092 --from-beginning
    

    2.2 模拟日志数据

    模拟用户搜索日志数据,字段信息封装到CaseClass样例类【SearchLog】类,代码如下:

    package cn.oldlut.spark.app.mock
    
    /**
     * 用户百度搜索时日志数据封装样例类CaseClass
     * <p>
     *
     * @param sessionId 会话ID
     * @param ip        IP地址
     * @param datetime  搜索日期时间
     * @param keyword   搜索关键词
     */
    case class SearchLog(
                          sessionId: String, //
                          ip: String, //
                          datetime: String, //
                          keyword: String //
                        ) {
      override def toString: String = s"$sessionId,$ip,$datetime,$keyword"
    }
    
    `` `
    模拟产生搜索日志数据类 【 MockSearchLogs 】 具体代码如下 :
    `` `
    package cn.oldlut.spark.app.mock
    
    import java.util.{Properties, UUID}
    import org.apache.commons.lang3.time.FastDateFormat
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer
    import scala.util.Random
    
    /**
     * 模拟产生用户使用百度搜索引擎时,搜索查询日志数据,包含字段为:
     * uid, ip, search_datetime, search_keyword
     */
    object MockSearchLogs {
      def main(args: Array[String]): Unit = {
        // 搜索关键词,直接到百度热搜榜获取即可
        val keywords: Array[String] = Array("罗志祥", "谭卓疑", "当当网", "裸海蝶", "张建国")
        // 发送Kafka Topic
        val props = new Properties()
        props.put("bootstrap.servers", "node1.oldlut.cn:9092")
        props.put("acks", "1")
        props.put("retries", "3")
        props.put("key.serializer", classOf[StringSerializer].getName)
        props.put("value.serializer", classOf[StringSerializer].getName)
        val producer = new KafkaProducer[String, String](props)
        val random: Random = new Random()
        while (true) {
          // 随机产生一条搜索查询日志
          val searchLog: SearchLog = SearchLog(
            getUserId(), //
            getRandomIp(), //
            getCurrentDateTime(), //
            keywords(random.nextInt(keywords.length)) //
          )
          println(searchLog.toString)
          Thread.sleep(10 + random.nextInt(100))
          val record = new ProducerRecord[String, String]("search-log-topic", searchLog.toString)
          producer.send(record)
        }
        // 关闭连接
        producer.close()
      }
    
      /**
       * 随机生成用户SessionId
       */
      def getUserId(): String = {
        val uuid: String = UUID.randomUUID().toString
        uuid.replaceAll("-", "").substring(16)
      }
    
      /**
       * 获取当前日期时间,格式为yyyyMMddHHmmssSSS
       */
      def getCurrentDateTime(): String = {
        val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
        val nowDateTime: Long = System.currentTimeMillis()
        format.format(nowDateTime)
      }
    
      /**
       * 获取随机IP地址
       */
      def getRandomIp(): String = {
        // ip范围
        val range: Array[(Int, Int)] = Array(
          (607649792, 608174079), //36.56.0.0-36.63.255.255
          (1038614528, 1039007743), //61.232.0.0-61.237.255.255
          (1783627776, 1784676351), //106.80.0.0-106.95.255.255
          (2035023872, 2035154943), //121.76.0.0-121.77.255.255
          (2078801920, 2079064063), //123.232.0.0-123.235.255.255
          (-1950089216, -1948778497), //139.196.0.0-139.215.255.255
          (-1425539072, -1425014785), //171.8.0.0-171.15.255.255
          (-1236271104, -1235419137), //182.80.0.0-182.92.255.255
          (-770113536, -768606209), //210.25.0.0-210.47.255.255
          (-569376768, -564133889) //222.16.0.0-222.95.255.255
        )
        // 随机数:IP地址范围下标
        val random = new Random()
        val index = random.nextInt(10)
        val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
        //println(s"ipNumber = ${ipNumber}")
        // 转换Int类型IP地址为IPv4格式
        number2IpString(ipNumber)
      }
    
      /**
       * 将Int类型IPv4地址转换为字符串类型
       */
      def number2IpString(ip: Int): String = {
        val buffer: Array[Int] = new Array[Int](4)
        buffer(0) = (ip >> 24) & 0xff
        buffer(1) = (ip >> 16) & 0xff
        buffer(2) = (ip >> 8) & 0xff
        buffer(3) = ip & 0xff
        // 返回IPv4地址
        buffer.mkString(".")
      }
    }
    

    运行应用程序,源源不断产生日志数据,发送至Kafka(同时在控制台打印),截图如下:在这里插入图片描述

    2.3 StreamingContextUtils 工具类

    所有SparkStreaming应用都需要构建StreamingContext实例对象,并且从采用New KafkaConsumer API消费Kafka数据,编写工具类【StreamingContextUtils】,提供两个方法:

    • 方法一:getStreamingContext,获取StreamingContext实例对象在这里插入图片描述
    • 方法二:consumerKafka,消费Kafka Topic中数据在这里插入图片描述
      具体代码如下:
    package cn.oldlut.spark.app
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * 工具类提供:构建流式应用上下文StreamingContext实例对象和从Kafka Topic消费数据
     */
    object StreamingContextUtils {
      /**
       * 获取StreamingContext实例,传递批处理时间间隔
       *
       * @param batchInterval 批处理时间间隔,单位为秒
       */
      def getStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
        // i. 创建SparkConf对象,设置应用配置信息
        val sparkConf = new SparkConf()
          .setAppName(clazz.getSimpleName.stripSuffix("$"))
          .setMaster("local[3]")
          // 设置Kryo序列化
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
        // ii.创建流式上下文对象, 传递SparkConf对象和时间间隔
        val context = new StreamingContext(sparkConf, Seconds(batchInterval))
        // iii. 返回
        context
      }
    
      /**
       * 从指定的Kafka Topic中消费数据,默认从最新偏移量(largest)开始消费
       *
       * @param ssc       StreamingContext实例对象
       * @param topicName 消费Kafka中Topic名称
       */
      def consumerKafka(ssc: StreamingContext, topicName: String): DStream[ConsumerRecord[String, String]] = {
        // i.位置策略
        val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
        // ii.读取哪些Topic数据
        val topics = Array(topicName)
        // iii.消费Kafka 数据配置参数
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "node1.oldlut.cn:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "group_id_streaming_0001",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        // iv.消费数据策略
        val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
          topics, kafkaParams
        )
        // v.采用新消费者API获取数据,类似于Direct方式
        val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
          ssc, locationStrategy, consumerStrategy
        )
        // vi.返回DStream
        kafkaDStream
      }
    }
    

    3 实时数据ETL存储

    实时从Kafka Topic消费数据,提取ip地址字段,调用【ip2Region】库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为10秒,完整代码如下:

    package cn.oldlut.spark.app.etl
    
    import cn.oldlut.spark.app.StreamingContextUtils
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream
    import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}
    
    /**
     * 实时消费Kafka Topic数据,经过ETL(过滤、转换)后,保存至HDFS文件系统中,BatchInterval为:10s
     */
    object StreamingETLHdfs {
      def main(args: Array[String]): Unit = {
        // 1. 获取StreamingContext实例对象
        val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 10)
        // 2. 从Kafka消费数据,使用Kafka New Consumer API
        val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
          .consumerKafka(ssc, "search-log-topic")
        // 3. 数据ETL:过滤不合格数据及转换IP地址为省份和城市,并存储HDFS上
        kafkaDStream.foreachRDD { (rdd, time) =>
          // i. message不为null,且分割为4个字段
          val kafkaRDD: RDD[ConsumerRecord[String, String]] = rdd.filter { record =>
            val message: String = record.value()
            null != message && message.trim.split(",").length == 4
          }
          // ii. 解析IP地址
          val etlRDD: RDD[String] = kafkaRDD.mapPartitions { iter =>
            // 创建DbSearcher对象,针对每个分区创建一个,并不是每条数据创建一个
            val dbSearcher = new DbSearcher(new DbConfig(), "dataset/ip2region.db")
            iter.map { record =>
              val Array(_, ip, _, _) = record.value().split(",")
              // 依据IP地址解析
              val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
              val region: String = dataBlock.getRegion
              val Array(_, _, province, city, _) = region.split("\\|")
              // 组合字符串
              s"${record.value()},$province,$city"
            }
          }
          // iii. 保存至文件
          val savePath = s"datas/streaming/etl/search-log-${time.milliseconds}"
          if (!etlRDD.isEmpty()) {
            etlRDD.coalesce(1).saveAsTextFile(savePath)
          }
        }
        // 4.启动流式应用,一直运行,直到程序手动关闭或异常终止
        ssc.start()
        ssc.awaitTermination()
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    

    运行模拟日志数据程序和ETL应用程序,查看实时数据ETL后保存文件,截图如下:在这里插入图片描述

    4 实时状态更新统计

    实 时 累 加 统 计 用 户 各 个 搜 索 词 出 现 的 次 数 , 在 SparkStreaming 中 提 供 函 数【updateStateByKey】实现累加统计,Spark 1.6提供【mapWithState】函数状态统计,性能更好,实际应用中也推荐使用。

    4.1 updateStateByKey 函数

    状态更新函数【updateStateByKey】表示依据Key更新状态,要求DStream中数据类型为【Key/Value】对二元组,函数声明如下:
    在这里插入图片描述
    将每批次数据状态,按照Key与以前状态,使用定义函数【updateFunc】进行更新,示意图如下:在这里插入图片描述

    文档: http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#updatestatebykey-operation
    针对搜索词词频统计WordCount,状态更新逻辑示意图如下:在这里插入图片描述
    以前的状态数据,保存到Checkpoint检查点目录中,所以在代码中需要设置Checkpoint检查点目录:在这里插入图片描述
    完整演示代码如下:

    package cn.oldlut.spark.app.state
    
    import cn.oldlut.spark.app.StreamingContextUtils
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream
    
    /**
     * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
     */
    object StreamingUpdateState {
      def main(args: Array[String]): Unit = {
        // 1. 获取StreamingContext实例对象
        val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
        // TODO: 设置检查点目录
        ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")
        // 2. 从Kafka消费数据,使用Kafka New Consumer API
        val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
          .consumerKafka(ssc, "search-log-topic")
        // 3. 对每批次的数据进行搜索词次数统计
        val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
          val reduceRDD = rdd
            // 过滤不合格的数据
            .filter { record =>
              val message: String = record.value()
              null != message && message.trim.split(",").length == 4
            }
            // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
            .map { record =>
              val keyword: String = record.value().trim.split(",").last
              keyword -> 1
            }
            // 按照单词分组,聚合统计
            .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
          reduceRDD // 返回
        }
        /*
        def updateStateByKey[S: ClassTag](
        // 状态更新函数
        updateFunc: (Seq[V], Option[S]) => Option[S]
        ): DStream[(K, S)]
        第一个参数:Seq[V]
        表示的是相同Key的所有Value值
        第二个参数:Option[S]
        表示的是Key的以前状态,可能有值Some,可能没值None,使用Option封装
        S泛型,具体类型有业务具体,此处是词频:Int类型
        */
        val stateDStream: DStream[(String, Int)] = reduceDStream.updateStateByKey(
          (values: Seq[Int], state: Option[Int]) => {
            // a. 获取以前状态信息
            val previousState = state.getOrElse(0)
            // b. 获取当前批次中Key对应状态
            val currentState = values.sum
            // c. 合并状态
            val latestState = previousState + currentState
            // d. 返回最新状态
            Some(latestState)
          }
        )
        // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
        stateDStream.print()
        // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
        ssc.start()
        ssc.awaitTermination()
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    

    运行应用程序,通过WEB UI界面可以发现,将以前状态保存到Checkpoint检查点目录中,更新时在读取。在这里插入图片描述
    此外,updateStateByKey函数有很多重载方法,依据不同业务需求选择合适的方式使用。

    4.2 mapWithState 函数

    Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。在这里插入图片描述
    这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高;在这里插入图片描述
    需要构建StateSpec对象,对状态State进行封装,可以进行相关操作,类的声明定义如下:在这里插入图片描述
    状态函数【mapWithState】参数相关说明:在这里插入图片描述
    完整演示代码如下:

    package cn.oldlut.spark.app.state
    
    import cn.oldlut.spark.app.StreamingContextUtils
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{State, StateSpec, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream
    
    /**
     * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
     */
    object StreamingMapWithState {
      def main(args: Array[String]): Unit = {
        // 1. 获取StreamingContext实例对象
        val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
        // TODO: 设置检查点目录
        ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")
        // 2. 从Kafka消费数据,使用Kafka New Consumer API
        val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
          .consumerKafka(ssc, "search-log-topic")
        // 3. 对每批次的数据进行搜索词进行次数统计
        val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
          val reduceRDD: RDD[(String, Int)] = rdd
            // 过滤不合格的数据
            .filter { record =>
              val message: String = record.value()
              null != message && message.trim.split(",").length == 4
            }
            // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
            .map { record =>
              val keyword: String = record.value().trim.split(",").last
              keyword -> 1
            }
            // 按照单词分组,聚合统计
            .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
          // 返回
          reduceRDD
        }
        // TODO: 4、实时累加统计搜索词搜索次数,使用mapWithState函数
        /*
        按照Key来更新状态的,一条一条数据的更新状态
        def mapWithState[StateType: ClassTag, MappedType: ClassTag](
        spec: StateSpec[K, V, StateType, MappedType]
        ): MapWithStateDStream[K, V, StateType, MappedType]
        a. 通过函数源码发现参数使用对象
        StateSpec 实例对象
        b. StateSpec
        表示对状态封装,里面涉及到相关数据类型
        c. 如何构建StateSpec对象实例呢??
        StateSpec 伴生对象中function函数构建对象
        def function[KeyType, ValueType, StateType, MappedType](
        // 从函数名称可知,针对每条数据更新Key的转态信息
        mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
        ): StateSpec[KeyType, ValueType, StateType, MappedType]
        */
        // 状态更新函数,针对每条数据进行更新状态
        val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
          // (KeyType, Option[ValueType], State[StateType]) => MappedType
          (keyword: String, countOption: Option[Int], state: State[Int]) => {
            // a. 获取当前批次中搜索词搜索次数
            val currentState: Int = countOption.getOrElse(0)
            // b. 从以前状态中获取搜索词搜索次数
            val previousState = state.getOption().getOrElse(0)
            // c. 搜索词总的搜索次数
            val latestState = currentState + previousState
            // d. 更行状态
            state.update(latestState)
            // e. 返回最新省份销售订单额
            (keyword, latestState)
          }
        )
        // 调用mapWithState函数进行实时累加状态统计
        val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
        // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
        stateDStream.print()
        // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
        ssc.start()
        ssc.awaitTermination()
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    

    运行程序可以发现,当Key(搜索单词)没有出现时,不会更新状态,仅仅更新当前批次中出现的Key的状态。
    mapWithState 实现有状态管理主要是通过两点:a)、历史状态需要在内存中维护,这里必需的了,updateStateBykey也是一样;b)、自定义更新状态的mappingFunction,这些就是具体的业务功能实现逻辑了(什么时候需要更新状态)在这里插入图片描述
    首先数据像水流一样从左侧的箭头流入,把mapWithState看成一个转换器的话,mappingFunc就是转换的规则,流入的新数据(key-value)结合历史状态(通过key从内存中获取的历史状态)进行一些自定义逻辑的更新等操作,最终从红色箭头中流出。

    5 实时窗口统计

    SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档:

    http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#window-operations
    

    在实际项目中,很多时候需求:每隔一段时间统计最近数据状态,并不是对所有数据进行统计,称为趋势统计或者窗口统计,SparkStreaming中提供相关函数实现功能,业务逻辑如下:在这里插入图片描述
    针对用户百度搜索日志数据,实现【近期时间内热搜Top10】,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数。窗口函数【window】声明如下,包含两个参数:窗口大小(WindowInterval,每次统计数据范围)和滑动大小(每隔多久统计一次),都必须是批处理时间间隔BatchInterval整数倍。在这里插入图片描述
    案例完整实现代码如下,为了演示方便,假设BatchInterval为2秒,WindowInterval
    为4秒,SlideInterval为2秒。

    package cn.oldlut.spark.app.window
    
    import cn.oldlut.spark.app.StreamingContextUtils
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
     * 批处理时间间隔:BatchInterval = 2s
     * 窗口大小间隔:WindowInterval = 4s
     * 滑动大小间隔:SliderInterval = 2s
     */
    object StreamingWindow {
      def main(args: Array[String]): Unit = {
        // Streaming应用BatchInterval
        val BATCH_INTERVAL: Int = 2
        // Streaming应用窗口大小
        val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2
        val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1
        // 1. 获取StreamingContext实例对象
        val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)
        // 2. 从Kafka消费数据,使用Kafka New Consumer API
        val kafkaDStream: DStream[String] = StreamingContextUtils
          .consumerKafka(ssc, "search-log-topic")
          .map(record => record.value())
        // TODO: 添加窗口,设置对应参数
        /*
        def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
        警告信息:
        ERROR KafkaRDD: Kafka ConsumerRecord is not serializable.
        Use .map to extract fields before calling .persist or .window
        */
        val windowDStream: DStream[String] = kafkaDStream.window(
          Seconds(WINDOW_INTERVAL), Seconds(SLIDER_INTERVAL)
        )
        // 4. 对每批次的数据进行搜索词进行次数统计
        val countDStream: DStream[(String, Int)] = windowDStream.transform { rdd =>
          val resultRDD = rdd
            // 过滤不合格的数据
            .filter(message => null != message && message.trim.split(",").length == 4)
            // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
            .map { message =>
              val keyword: String = message.trim.split(",").last
              keyword -> 1
            }
            // 按照单词分组,聚合统计
            .reduceByKey((tmp, item) => tmp + item)
          // 返回
          resultRDD
        }
        // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
        countDStream.print()
        // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
        ssc.start()
        ssc.awaitTermination()
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    

    SparkStreaming中同时提供将窗口Window设置与聚合reduceByKey合在一起的函数,为了更加方便编程。在这里插入图片描述
    使用【reduceByKeyAndWindow】函数,修改上述代码,实现窗口统计,具体代码如下:

    package cn.oldlut.spark.app.window
    
    import cn.oldlut.spark.app.StreamingContextUtils
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
     * 批处理时间间隔:BatchInterval = 2s
     * 窗口大小间隔:WindowInterval = 4s
     * 滑动大小间隔:SliderInterval = 2s
     */
    object StreamingReduceWindow {
      def main(args: Array[String]): Unit = {
        // Streaming应用BatchInterval
        val BATCH_INTERVAL: Int = 2
        // Streaming应用窗口大小
        val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2
        val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1
        // 1. 获取StreamingContext实例对象
        val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)
        // 2. 从Kafka消费数据,使用Kafka New Consumer API
        val kafkaDStream: DStream[String] = StreamingContextUtils
          .consumerKafka(ssc, "search-log-topic")
          .map(recored => recored.value())
        // 3. 对每批次的数据进行搜索词进行次数统计
        val etlDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
          val etlRDD = rdd
            // 过滤不合格的数据
            .filter(message => null != message && message.trim.split(",").length == 4)
            // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
            .map { message =>
              val keyword: String = message.trim.split(",").last
              keyword -> 1
            }
          etlRDD // 返回
        }
        // 4. 对获取流式数据进行ETL后,使用窗口聚合函数统计计算
        /*
        def reduceByKeyAndWindow(
        reduceFunc: (V, V) => V, // 聚合函数
        windowDuration: Duration, // 窗口大小
        slideDuration: Duration // 滑动大小
        ): DStream[(K, V)]
        */
        val resultDStream: DStream[(String, Int)] = etlDStream.reduceByKeyAndWindow(
          (tmp: Int, value: Int) => tmp + value, //
          Seconds(WINDOW_INTERVAL), //
          Seconds(SLIDER_INTERVAL) //
        )
        // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
        resultDStream.print()
        // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
        ssc.start()
        ssc.awaitTermination()
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
    
    展开全文
  • 一、Spark Streaming处理框架: Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,可以使用诸如map、reduce、join等高级函数进行复杂算法的处理。...二、SparkStreaming实时任务如何开发? 1. ...

    一、Spark Streaming处理框架:

    Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,可以使用诸如map、reduce、join等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,处理结果保存到HDFS,数据库等。

    二、SparkStreaming实时任务如何开发?

    1. 数据的输入
    1.1 socket(测试开发的时候使用起来很方便。)
    1.2 HDFS(使用得很少)
    1.3 Flume(也是很少)
    1.4 自定义数据源(用得很少,我们公司里面没有出现过,但是不代表没有用。)
    1.5 Kafka   真正企业里面使用的是kafka
    2. 数据的处理:
    企业里面怎么用?
    2.1 RDD的那些算子
    2.2 transform
    2.3 updateStateByKey
    2.4 mapWithState
    2.5 Window窗口的计算
    3. 数据的输出
    3.1 print(测试的时候使用)
    3.2 foreachRDD(允许用户对Dstream每一批数据对应的RDD本身做任意操作,企业里面也是使用的这个api)
    这个就是真正项目上线的时候需要使用的API。
    存入kafka,mysql,codis,reids,hbase
    比如公司里面上班:
    电梯:批处理,或者说是离线处理。
    离线,数据量大
    商场里面购物:
    扶梯:实时处理,处理的是流数据
    实时,每次处理的 数据量不大

    三、spark组件类比:

    SparkCore:核心计算引擎
    1. 核心的抽象 RDD
    2. 程序的入口
    val conf=new SparkConf
    val sc=new SparkContext(conf)
    后面无非就是一些算子对RDD进行各种操作。
    SparkStreaming
    1. 核心的抽象 DStream(一个DStream包括多个RDD,加了时间维度(隔一定时间执行一套RDD),不同时间RDD变换)
    2. 程序的入口
    val conf=new SparkConf()
    val ssc=new StremaingContext(conf,Seoncdss(1))
    SparkSQL:
    1. 核心的抽象 DataFrame/DataSet
    2. 程序的入口
    spark1.x:    val sqlContext=new SQLContext(conf)
    spark2.x:      val spark=SparkSessionxxx
    后面的操作无非就是对dataFream/dataset进行各种算子的操作

    三、Sparkstreaming架构:

    – Client:负责向Spark Streaming中灌入数据(flume kafka)

    • 整个架构由3个模块组成:

    – Master:记录Dstream之间的依赖关系或者血缘关系,并负责任务调度以生成新的RDD

    Worker:①从网络接收数据并存储到内存中  ②执行RDD计算

    spark中driver=AM , executor=worker节点

    四、SparkStreaming作业提交

    • Network Input Tracker:跟踪每一个网络received数据,并且将其映射到相应的input Dstream上

    • Job Scheduler:周期性的访问DStream Graph并生成Spark Job,将其交给Job Manager执行

    • Job Manager:获取任务队列,并执行Spark任务

    五、SparkStreaming窗口操作

    Spark提供了一组 窗口操作,通过滑动窗口技术对大规模数据的增量更新进行统计分析

    • Window Operation: 定时进行一定时间段内的数据处理(上图time 3 4 5  每个2秒,一共6秒)

    • 任何基于窗口操作需要指定两个参数:

    – 窗口总长度(window length)10s

    – 滑动时间间隔(slide interval)  2s

    执行代码前先启动nc -lk 9999

    执行代码:整个窗口长度10s,每2秒打印一次
    改代码:
    1.上代码改为seconds(10),second(3),报错,必须为scc seconds的整数倍
    2.改为seconds(9),second(2)也出错,如下图,也必须为上scc seconds整数倍

    六、Sparkstreaming全局统计

    • 如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制

    • 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份

    • 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在内存数据丢失的时候,可以从checkpoint中恢复数据

    左10s统计结果,右再过2s统计结果,最后 全局合并统计updatestatebykey,要开启checkpoint且先nc -lp 9999

    再输入7个a后

    Sparkstreaming容错性分析

    (RDD容错靠血缘关系DAG,sparkstreaming靠WAL)

    实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之初,Spark Streaming就支持driver和worker节点的错误恢复。

    1. Worker容错:spark和rdd的保证worker节点的容错性。spark streaming构建在spark之上,所以它的worker节点也是同样的容错机制

    •2.Driver容错:依赖WAL(WriteAheadLog)持久化日志

    – 启动WAL需要做如下的配置

    – 1:给streamingContext设置checkpoint的目录,该目录必须是HADOOP支持的文件系统hdfs,用来保存WAL和

    做Streaming的checkpoint

    – 2:spark.streaming.receiver.writeAheadLog.enable 设置为truereceiver才有WAL

    Sparkstreaming中WAL简介

    • Spark应用分布式运行的,如果driver进程挂了,所有的executor进程将不可用,保存在这些进程所

    持有内存中的数据将会丢失。为了避免这些数据的丢失,Spark Streaming中引入了一个WAL.

    • WAL在文件系统和数据库中用于数据操作的持久化,先把数据写到一个持久化的日志中,然后对数

    据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。

    • 如果WAL 启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持

    久性,此外,如果只有在数据写入到log中之后接收器才向数据源确认,这样drive重启后那些保存在

    内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。

    WAL工作原理

    driver=AM  ,  executor=worker节点           
    block+文件数据(代码)WAL   两部分结合

    1. 蓝色的箭头表示接收的数据:

    – 接收器把数据流打包成块存储在executor的内存中,如果开启了WAL,将会把数据写入到存在容错文

    件系统的日志文件中

    2. 青色的箭头表示提醒driver:

    – 接收到的数据块的元信息发送给driver中的StreamingContext, 这些元数据包括:executor内存中数据

    块的引用ID和日志文件中数据块的偏移信息

    3. 红色箭头表示处理数据:

    每一个批处理间隔,StreamingContext使用块信息用来生成RDD和jobs. SparkContext执行这些job用

    于处理executor内存中的数据块

    4. 黄色箭头表示checkpoint这些计算:

    – 以便于恢复。流式处理会周期的被checkpoint到文件中

    Sparkstreaming消费kafka

    Spark Streaming 接受数据的方式有两种: 只有receive有wal,direct不需要

    Receiver-based Approach:offset存储在zookeeper,由Receiver维护,Spark获取数据存入executor中,调用

    Kafka高阶API

    Direct Approach (No Receivers):offset自己存储和维护,由Spark维护,且可以从每个分区读取数据,调用Kafka低阶API

    SparkstreamingonKafkaDirect

    1. Direct的方式是会直接操作kafka底层的元数据信息

    2. 由于直接操作的是kafka,kafka就相当于底层的文件系统(对应receiver的executor内存)。

    3. 由于底层是直接读数据,没有所谓的Receiver,直接是周期性(Batch Intervel)的查询kafka,

    处理数据的时候,我们会使用基于kafka原生的Consumer api来获取kafka中特定范围(offset

    范围)中的数据。

    4. 读取多个kafka partition,Spark也会创建RDD的partition ,这个时候RDD的partition和

    kafka的partition是一致的。

    5. 不需要开启wal机制,从数据零丢失的角度来看,极大的提升了效率,还至少能节省一倍的磁盘

    空间。从kafka获取数据,比从hdfs获取数据,因为zero copy的方式,速度肯定更快。

    Direct与 Receiver对比

    从容错角度:

    – Receiver(高层次的消费者API):在失败的情况下,有些数据很有可能会被处理不止一次。 接收到的数

    据被可靠地保存到WAL中,但是还没有来得及更新Zookeeper中Kafka偏移量。导致数据不一致性:

    Streaming知道数据被接收,但Kafka认为数据还没被接收。这样系统恢复正常时,Kafka会再一次发送这

    些数据。at least once

    – Direct(低层次消费者API):给出每个batch区间需要读取的偏移量位置,每个batch的Job被运行时,

    对应偏移量的数据从Kafka拉取,偏移量信息也被可靠地存储(checkpoint),在从失败中恢复可以直接

    读取这些偏移量信息。exactly once

    Direct API消除了需要使用WAL的Receivers的情况,而且确保每个Kafka记录仅被接收一次并被高效地接收

    。这就使得我们可以将Spark Streaming和Kafka很好地整合在一起。总体来说,这些特性使得流处理管道拥

    有高容错性,高效性,而且很容易地被使用。

    展开全文
  • 大数据之实时处理SparkStreaming

    千次阅读 2019-05-26 15:45:43
    Spark Streamingspark核心API的一个扩展,可以实现高吞吐量、有容错机制的实时流数据处理。 支持多种数据源获取数据:Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后保存在HDFS、...

    1. Spark Streaming基础知识

    Spark Streaming是spark核心API的一个扩展,可以实现高吞吐量、有容错机制的实时流数据处理。
    支持多种数据源获取数据:Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后保存在HDFS、DataBase等。

    Spark Streaming将接收的实时流数据,按照一定时间间隔,对数据进行批次划分,交给Spark Engine引擎处理,最终得到一批批的结果。

    Dstream可以看做一组RDDs,即RDD的一个序列

    Dstream:Spark Streaming提供了表示连续数据流、高度抽象的被称为离散流的Dstream;
    任何对Dstream的操作都会转变为对底层RDD的操作;

    spark streaming任务的两个重要部分

    • 一个静态的 RDD DAG 的模板,来表示处理逻辑;
    • 一个动态的工作控制器,将连续的 streaming data 切分数据片段,并按照模板复制出新的 RDD DAG 的实例,对数据片段进行处理;

    我们在考虑的时候,可以认为,RDD 加上 batch 维度就是 DStream,DStream 去掉 batch 维度就是 RDD —— 就像 RDD = DStream at batch T。

    Dstream之间的转换所形成的依赖关系全部保存在DstreamGraph中,DstreamGraph简化了DAG scheduler,去除了多余的关系,只保留有用的依赖关系。DstreamGraph是RDD Graph的模板。

    整体架构由3个模块组成:

    • Master:记录Dstream之间的依赖关系,并负责任务调度以生成新的RDD
    • Worker:从网络接收数据并存储到executor 执行RDD计算
    • Client:负责向Spark Streaming中传输数据
    窗口操作

    spark提供了一组窗口操作,通过滑动窗口技术对大规模数据的增量更新进行统计分析
    Window Operation:定时进行一定时间段内的数据处理

    全局统计量

    全局统计量需要使用updataStateByKey算子,必须设置一个checkpoint目录,开启checkpoint机制以便在内存数据丢失的情况下可以从checkpoint中恢复数据。

    2. 容错性

    实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之处,Spark Streaming就支持driver和worker节点的错误恢复。

    • Worker容错:spark和rdd的设计保证了集群中worker节点的容错性。spark streaming构建在spark之上,所以它的worker节点也是同样的容错机制。
    • Driver容错:依赖WAL机制持久化日志
    2.1 Write Ahead Logs

    WAL使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。
    值得注意的是WAL开启了以后会减少Spark Streaming处理数据的吞吐,因为所有接收的数据会被写到容错的文件系统上,这样文件系统的吞吐和网络带宽将成为瓶颈。

    启动WAL需要做如下的配置

    1. 给streamingContext设置checkpoint的目录,该目录必须是Hadoop支持的文件系统hdfs,用来保存WAL和做Streaming的checkpoint
    2. spark.streaming.receiver.writeAheadLog.enable 设置为true (只有receiver方式才有WAL机制)

    实现细节

    下面讲解下WAL的工作原理。过一下Spark Streaming的架构

    当一个Spark Streaming应用启动了(例如driver启动), 相应的StreamingContext使用SparkContet去启动receiver,receiver是一个长时间执行的作业,这些接收器接收并保存这些数据到Spark的executor进程的内存中,这些数据的生命周期如下图所示:

    1. 蓝色的箭头表示接收的数据,接收器把数据流打包成块,存储在executor的内存中,如果开启了WAL,将会把数据写入到存在容错文件系统的日志文件中
    2. 青色的箭头表示提醒driver, 接收到的数据块的元信息发送给driver中的StreamingContext, 这些元数据包括:executor内存中数据块的引用ID和日志文件中数据块的偏移信息
    3. 红色箭头表示处理数据,每一个批处理间隔,StreamingContext使用块信息用来生成RDD和jobs. SparkContext执行这些job用于处理executor内存中的数据块
    4. 黄色箭头表示checkpoint这些计算,以便于恢复。流式处理会周期的被checkpoint到文件中

    当一个失败的driver重启以后,恢复流程如下

    1. 黄色的箭头用于恢复计算,checkpointed的信息是用于重启driver,重新构造上下文和重启所有的receiver
    2. 青色箭头恢复块元数据信息,所有的块信息对已恢复计算很重要
    3. 重新生成未完成的job(红色箭头),会使用到2恢复的元数据信息
    4. 读取保存在日志中的块(蓝色箭头),当job重新执行的时候,块数据将会直接从日志中读取,
    5. 重发没有确认的数据(紫色的箭头)。缓冲的数据没有写到WAL中去将会被重新发送。
    2.2 checkpoint机制

    Spark Streaming周期性的把应用数据存储在HDFS/S3这样的可靠存储系统中以供恢复时使用的机制被称为检查点机制

    3. Spark Streaming接收数据的两种方式

    3.1 Receiver-based Approach

    Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。kafka数据流由Receiver来接收,首先读入到executor进程的memory中,开启WAL机制后数据会被预写持久化到日志中(HDFS)。然后将数据块的元信息发送到driver中,同样的,元信息会预写到HDFS中,streaming context会将Dstream Graph和元数据信息相结合转化为一个streaming任务逻辑任务并被checkpoint机制记录在本地文件系统HDFS中以保证容错性。接下来,streaming任务被转化为一个spark context任务,由底层的spark来执行,将spark任务发送到指定的worker上开始运行任务。存储在zookeeper中,由Receiver来维护。当通过WAL机制将数据成功预写到HDFS之后,Receivers会相应更新ZooKeeper的offsets。

    3.2 Direct Approach

    Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要Receiver来持续不断读取数据。kafka看做一个底层文件系统,当batch数据的offsetRange(topic、partition、fromOffset、untilOffset)元数据传送给spark driver之后,batch任务被触发,由Executor使用pull模式拉取相应offset range范围的数据,并参与其他Executor的数据计算。offset由kafka自己来维护,driver来决定读取多少offsets,并将offsets交由checkpoints来维护。从此过程可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据。

    1. Direct的方式直接操作kafka底层的元数据信息
    2. 由于直接操作的是kafka,kafka相当于底层的文件系统
    3. 由于底层是直接读数据,没有所谓的Receiver,直接是周期性的查询kafka,处理数据的时候,我们会使用基于kafka原生的Consumer api来获取kafka特定offset范围的数据
    4. 读取多少个kafka partition,Spark也会创建相应的RDD的partition。RDD的partition和kafka的partition是一一对应的。
    5. 不需要开启WAL机制,从数据丢失的角度来看,极大地提升了效率,节省磁盘空间。从kafka获取数据,比从hdfs获取数据,速度更快(zero-copy)
    3.3 kafka partition和RDD的partition之间的关系

    Receiver方式:Topic中partition负载均衡平均分配到读入到consumer group并将partition的数据读入到executor进程中,然后,为每一个partition分配一个或多个task线程

    Direct方式:Topic中partition负载均衡平均分配到读入到consumer group。kafka partition和RDD的partition是一一对应关系。每一个partition由一个task线程来执行,相应的partition产生对应的offset,偏移量被记录在ZooKeeper。

    Topic每一个partition对应的offset目录为:/consumers/group_test/offsets/badou/0

    4. 从容错性角度分析

    Receiver:失败的情况下,有些数据很可能会被不止一次的接收。接收的数据通过WAL机制被保存到HDFS中,但是还没来得及更新zookeeper中的对应partition的offset就发生故障,导致数据不一致性。streaming知道数据被接收,但是kafka认为数据没有被接收。当系统恢复正常时,kafka会重新发送这些数据。(at least once)

    Direct:给出每个batch范围需要读取的偏移量,每个batch的job被运行时,对应offset range的数据会被拉取,数据处理完成后,offset信息被可靠地存储(checkpoint)。失败后重新恢复时可以直接读取这些偏移量信息。(exactly once)

    展开全文
  • SparkStreaming底层原理讲解

    千次阅读 2019-05-30 21:37:06
    Spark Streaming 是流式处理框架,是Spark ApI的扩展,支持可扩展、高吞吐量、容错的实时数据流处理。 实时数据的来源:kafka,flume,Twitter,ZeroMQ或者TCP Socket,并且可以使用高级功能的复杂算子,来处理流的数据...
  • 一、SparkStreaming概述 Spark内置对象: sparkconf: SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数。 SparkContext: **SparkContext为Spark的主要入口...
  • 1.Spark Streaming简介 1.1 概述 1.2 术语定义 1.3 Storm和Spark Streaming比较 2.运行原理 2.1 Streaming架构 2.2 编程模型 2.2.1 如何使用Spark Streaming 2.2.2DStream的输入源 2.2.3DStream的操作 2.3...
  • 本节主要讲解【CDN日志分析】这个实例,讲解2018大数据行业发展趋势,重点领域应用。同时以【CDN日志分析】为案例出发讲解Spark开发流程及实践。从多角度完善分享大数据开发流程。
  • Spark进行CDN日志分析视频教程,主要讲解CDN日志分析这个实例,讲解2018大数据行业发展趋势,重点领域应用。同时以CDN日志分析为案例出发讲解Spark开发流程及实践。从多角度完善分享大数据开发流程。
  • spark 入门 课程目标: 了解spark概念 知道spark的特点(与hadoop对比) 独立实现spark local模式的启动 1.1 spark概述 1、什么是spark 基于内存的计算引擎,它的计算速度非常快。但是仅仅只涉及到数据的...
  • CDN日志分析视频教程,主要讲解CDN日志分析这个实例,讲解2018大数据行业发展趋势,重点领域应用。同时以【CDN日志分析】为案例出发讲解Spark开发流程及实践。从多角度完善分享大数据开发流程。
  • Spark Streaming+Kafka spark 写入 kafka

    千次阅读 2018-09-14 17:47:49
    Spark streaming接收Kafka数据 基于Receiver的方式 直接读取方式 Spark向kafka中写入数据 Spark streaming+Kafka应用 Spark streaming+Kafka调优 合理的批处理时间(batchDuration) 合理的Kafka拉取量...
  • 文章目录一、Spark SQL概述二、SparkSQL版本1)SparkSQL的演变之路2)shark与SparkSQL对比3)SparkSession三、RDD、DataFrames和DataSet1)三者关联关系1)RDD1、核心概念2、RDD简单操作3、RDD API1)Transformation...
  • 本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解 大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开发和web可视化交互设计。项目代码托管于github,大家...
  • 在前面的文章中,总结了SparkStreaming入门级的文章,了解到SparkStreaming是一种微批处理的"实时"流技术,在实际场景中,当我们使用SparkStreaming开发好功能并通过测试之后部署到生产环境,那么之后就会7*24不间断...
  • 从智能商业的角度来讲,数据的结果代表了用户的反馈,获取结果的及时性就显得尤为重要,快速的获取数据反馈能够帮助公司更快的做出决策,更好的进行产品迭代,实时数仓在这一过程中起到了不可替代的作用。...
  • 现在工作中正在使用flink,避免对Spark流式处理的遗忘,在此进行总结。主要分为以下几个方面,均附有实际代码: Spark Streaming简介 ... Spark Streaming容错性分析 Spark Streaming整合Kafka案例 ...
  • 爱奇艺实时流处理项目实战 (Spark Streaming) 张长志技术全才...
  • 本节介绍如何编写 Spark Streaming 应用程序,由简到难讲解使用几个核心概念来解决实际应用问题。流数据模拟器在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境,首先需要定义...
  • 性能分析的概览: 1,队列是否积累。 2,executor是否倾斜。 3,task数据是否倾斜。 4,gc是否严重。   目录 前言 Spark streaming接收Kafka数据 基于Receiver的方式 直接读取方式 Spark向kafka中...
  • Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据...
  • 本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka在舆情项目中的应用,最后将自己在Spark Streaming+kafka的实际优化中的一些经验进行归纳总结。(如有任何纰漏欢迎补充来...
  • 说明Spark Streaming的原理说明的文章很多,这里不做介绍。本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些优化说明 spark streaming:...
  • 在 1.2 节中已经跟大家详细介绍了 Flink,那么在本节就主要 Blink、Spark Streaming、Structured Streaming 和 Storm 的区别。 Flink Flink 是一个针对流数据和批数据分布式处理的引擎,在某些对实时性要求非常高的...
  • SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部...
  • Spark Streaming 是批处理的流式实时计算框架,支持从多种数据源获取数据,如 Kafka、TCP sockets、文件系统等。它可以使用诸如 map、reduce、join 等高级函数进行复杂算法的处理,最后还可以将处理结果存储到文件...
  • 来源:http://blog.csdn.net/eric_sunah/article/details/54096057?utm_source=tuicool&utm_medium=referral 说明 Spark Streaming的原理说明的文章很多,这里不做介绍。...spark streaming:ht
  • 手把手视频详细讲解项目开发全过程,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 课程简介 知识点介绍、代码演示、逻辑分析、灵活举例、使用图形的方式详细演示代码的流程和细节、整合企业级实战案例,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,228
精华内容 891
关键字:

sparkstreaming实时日志分析项目讲解