kafka spark 拉数据_spark消费kafka数据更新redis数据 - CSDN
  • Spark streaming接收Kafka数据 基于Receiver的方式 直接读取方式 Sparkkafka中写入数据 Spark streaming+Kafka应用 Spark streaming+Kafka调优 合理的批处理时间(batchDuration) 合理的Kafka拉取量...

    目录

    前言

    在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark streaming从kafka中不断拉取数据进行词频统计。本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka在舆情项目中的应用,最后将自己在Spark Streaming+kafka的实际优化中的一些经验进行归纳总结。(如有任何纰漏欢迎补充来踩,我会第一时间改正^v^)

    Spark streaming接收Kafka数据

    用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

    基于Receiver的方式

    这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。如下图:
    Receiver图形解释
    在使用时,我们需要添加相应的依赖包:

    <dependency><!-- Spark Streaming Kafka -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    而对于Scala的基本使用方式如下:

    import org.apache.spark.streaming.kafka._
    
     val kafkaStream = KafkaUtils.createStream(streamingContext, 
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

    还有几个需要注意的点:

    • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
    • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

    直接读取方式

    在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图:

    这种方法相较于Receiver方式的优势在于:

    • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
    • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
    • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

    以上主要是对官方文档[1]的一个简单翻译,详细内容大家可以直接看下官方文档这里不再赘述。

    不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然zookeeper就保存了当前消费的offset值,那么如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,我们是直接从kafka来读数据,那么offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类,我将其放在了github上:
    Spark streaming+Kafka demo
    示例中KafkaManager是一个通用类,而KafkaCluster是kafka源码中的一个类,由于包名权限的原因我把它单独提出来,ComsumerMain简单展示了通用类的使用方法,在每次创建KafkaStream时,都会先从zooker中查看上次的消费记录offsets,而每个batch处理完成后,会同步offsets到zookeeper中。

    Spark向kafka中写入数据

    上文阐述了Spark如何从Kafka中流式的读取数据,下面我整理向Kafka中写数据。与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。
    最直接的做法我们可以想到如下这种方式:

    input.foreachRDD(rdd =>
      // 不能在这里创建KafkaProducer
      rdd.foreachPartition(partition =>
        partition.foreach{
          case x:String=>{
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            val producer = new KafkaProducer[String,String](props)
            val message=new ProducerRecord[String, String]("output",null,x)
            producer.send(message)
          }
        }
      )
    ) 

    但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

    1. 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    
    object KafkaSink {
      import scala.collection.JavaConversions._
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }
    1. 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:
    // 广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", Conf.brokers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      log.warn("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    这样我们就能在每个executor中愉快的将数据输入到kafka当中:

    //输出到kafka
    segmentedStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreach(record => {
          kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
          // do something else
        })
      }
    })

    Spark streaming+Kafka应用

    WeTest舆情监控对于每天爬取的千万级游戏玩家评论信息都要实时的进行词频统计,对于爬取到的游戏玩家评论数据,我们会生产到Kafka中,而另一端的消费者我们采用了Spark Streaming来进行流式处理,首先利用上文我们阐述的Direct方式从Kafka拉取batch,之后经过分词、统计等相关处理,回写到DB上(至于Spark中DB的回写方式可参考我之前总结的博文:Spark踩坑记——数据库(Hbase+Mysql)),由此高效实时的完成每天大量数据的词频统计任务。

    Spark streaming+Kafka调优

    Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

    合理的批处理时间(batchDuration)

    几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,如下图:

    合理的Kafka拉取量(maxRatePerPartition重要)

    对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time,如下图:

    缓存反复使用的Dstream(RDD)

    Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数,如下图:

    设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

    设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
    在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    使用高性能的算子

    这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

    • 使用reduceByKey/aggregateByKey替代groupByKey
    • 使用mapPartitions替代普通map
    • 使用foreachPartitions替代foreach
    • 使用filter之后进行coalesce操作
    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

    使用Kryo优化序列化性能

    这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。
    在Spark中,主要有三个地方涉及到了序列化:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    结果

    经过种种调试优化,我们最终要达到的目的是,Spark Streaming能够实时的拉取Kafka当中的数据,并且能够保持稳定,如下图所示:

    当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到Processing Time这一柱形图中有一Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。

     

    转载自:

    https://www.cnblogs.com/xlturing/p/6246538.html#spark%E5%90%91kafka%E4%B8%AD%E5%86%99%E5%85%A5%E6%95%B0%E6%8D%AE

    展开全文
  • sparkkafka写入数据

    2017-09-07 09:56:23
    在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark streaming从kafka中不断拉取数据进行词频统计。...

    前言

    在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark streaming从kafka中不断拉取数据进行词频统计。本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka在舆情项目中的应用,最后将自己在Spark Streaming+kafka的实际优化中的一些经验进行归纳总结。(如有任何纰漏欢迎补充来踩,我会第一时间改正^v^)

    Spark streaming接收Kafka数据

    用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

    基于Receiver的方式

    这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。如下图:
    Receiver图形解释
    在使用时,我们需要添加相应的依赖包:

    <dependency><!-- Spark Streaming Kafka -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    而对于Scala的基本使用方式如下:

    import org.apache.spark.streaming.kafka._
    
     val kafkaStream = KafkaUtils.createStream(streamingContext, 
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

    还有几个需要注意的点:

    • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
    • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

    直接读取方式

    在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图:

    这种方法相较于Receiver方式的优势在于:

    • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
    • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
    • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

    以上主要是对官方文档[1]的一个简单翻译,详细内容大家可以直接看下官方文档这里不再赘述。

    不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然zookeeper就保存了当前消费的offset值,那么如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,我们是直接从kafka来读数据,那么offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类,我将其放在了github上:
    Spark streaming+Kafka demo
    示例中KafkaManager是一个通用类,而KafkaCluster是kafka源码中的一个类,由于包名权限的原因我把它单独提出来,ComsumerMain简单展示了通用类的使用方法,在每次创建KafkaStream时,都会先从zooker中查看上次的消费记录offsets,而每个batch处理完成后,会同步offsets到zookeeper中。

    Spark向kafka中写入数据

    上文阐述了Spark如何从Kafka中流式的读取数据,下面我整理向Kafka中写数据。与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。
    最直接的做法我们可以想到如下这种方式:

    input.foreachRDD(rdd =>
      // 不能在这里创建KafkaProducer
      rdd.foreachPartition(partition =>
        partition.foreach{
          case x:String=>{
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            val producer = new KafkaProducer[String,String](props)
            val message=new ProducerRecord[String, String]("output",null,x)
            producer.send(message)
          }
        }
      )
    ) 

    但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

    1. 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    
    object KafkaSink {
      import scala.collection.JavaConversions._
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }
    1. 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:
    // 广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", Conf.brokers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      log.warn("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    这样我们就能在每个executor中愉快的将数据输入到kafka当中:

    //输出到kafka
    segmentedStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreach(record => {
          kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
          // do something else
        })
      }
    })

    Spark streaming+Kafka应用

    WeTest舆情监控对于每天爬取的千万级游戏玩家评论信息都要实时的进行词频统计,对于爬取到的游戏玩家评论数据,我们会生产到Kafka中,而另一端的消费者我们采用了Spark Streaming来进行流式处理,首先利用上文我们阐述的Direct方式从Kafka拉取batch,之后经过分词、统计等相关处理,回写到DB上(至于Spark中DB的回写方式可参考我之前总结的博文:Spark踩坑记——数据库(Hbase+Mysql)),由此高效实时的完成每天大量数据的词频统计任务。

    Spark streaming+Kafka调优

    Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

    合理的批处理时间(batchDuration)

    几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,如下图:

    合理的Kafka拉取量(maxRatePerPartition重要)

    对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time,如下图:

    缓存反复使用的Dstream(RDD)

    Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数,如下图:

    设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

    设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
    在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    使用高性能的算子

    这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

    • 使用reduceByKey/aggregateByKey替代groupByKey
    • 使用mapPartitions替代普通map
    • 使用foreachPartitions替代foreach
    • 使用filter之后进行coalesce操作
    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

    使用Kryo优化序列化性能

    这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。
    在Spark中,主要有三个地方涉及到了序列化:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    结果

    经过种种调试优化,我们最终要达到的目的是,Spark Streaming能够实时的拉取Kafka当中的数据,并且能够保持稳定,如下图所示:

    当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到Processing Time这一柱形图中有一Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。

    转自:http://www.cnblogs.com/xlturing/p/6246538.html点击打开链接

    展开全文
  • SparkKafka整合原理

    2019-01-08 19:55:40
    sparkkafka整合有2种方式 1、receiver 顾名思义:就是有一个线程负责获取数据,这个... 注意:这里的获取数据并不是从kafka(pull) 而是接收数据,具体原理是该receiver线程发送请求到kafka,这个请求包含对...

    spark和kafka整合有2种方式

    1、receiver

    顾名思义:就是有一个线程负责获取数据,这个线程叫receiver线程

    解释:

    1、Spark集群中的某个executor中有一个receiver线程,这个线程负责从kafka中获取数据

     注意:这里的获取数据并不是从kafka中拉(pull) 而是接收数据,具体原理是该receiver线程发送请求到kafka,这个请求包含对kafka中每个partition的消费偏移量(offset),然后由kafka主动的推送数据到spark中,再有该receiver线程负责接收数据

    2、当receiver线程接收到数据后会做备份处理,即把数据备份到其他的executor中,也可能会备份到这个receiver线程所在节点的executor中

    3、当备份完毕后该线程会把每个partition的消费偏移量在zookeeper中修改,(新版本的kafka的offset 保存在kafka集群中)

    4、修改完offset后,该receiver线程会把"消费"的数据告诉Driver

    5、Driver分发任务时会根据每个executor上的数据,根据数据本地性发送

    问题:

    当第三步执行完后,对于kafka来说这一批数据已经消费完成,那么如果此时Driver挂掉,那么这一批数据就会丢失,为了解决这个问题,有一个叫WAL逾写日志的概念,即把一部分数据存储在HDFS上,当Driver回复后可以从HDFS上获取这部分数据,但是开启WAL性能会受到很大的影响

    2、dirct

    直接连接:即每个executor直接取kafka获取数据

    1、首先Driver程序会定时(batchInterval)的向executor中发送任务(4个)

      >> 问题1:Driver怎么知道要把任务发送到哪个executor中呢?

         >> Driver会调用Kafka的接口获取某个partition位于哪个节点上,根据这个来获取这些信息并发送任务到指定的节点,这就类似于Spark集群处理HDFS上的文件数据,Spark是可以知道某些文件的block在那些节点上,就是spark调用了HDFS的相关接口

      >> 问题2:为什么是4个任务?

          >> 这个个数由消费的topic的partition的个数决定,因为spark会对每个partition开启一个任务,所以任务数是kafka的某个topic的partition数

    2、当每个任务确定了处理那个partition中的数据,则就有任务本身去kafka获取数据

    总结:目前公司中第二种方式使用比较多,这样也有一个问题就是说当kafka中某个topic加了ACL验证,那么这种方式是不能消费加了ACL的topic中的数据,因为kafka客户端的ACL验证需要客户端配置一个环境变量在System的Properties中,在local模式下可以实现,因为local模式下启动一个虚拟机实例,即只对应一个System,而在集群模式下,要启动多个进程,即启动多个虚拟机实例,所以System的全局属性没有办法配置

    如何学习大数据?学习没有资料?

    想学习大数据开发技术,Hadoop,spark,云计算,数据分析等技术,在这里向大家推荐一个学习资料分享群:894951460,里面有大牛已经整理好的相关学习资料,希望对你们有所帮助。

    展开全文
  • 下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考。文章写的通俗...
        

    下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考。文章写的通俗易懂,搭配代码,供大家参考。

    本文的作者是来自TalkingData的数据工程师张伟。

    SS 是 Spark 上的一个流式处理框架,可以面向海量数据实现高吞吐量、高容错的实时计算。SS 支持多种类型数据源,包括 Kafka、Flume、twitter、zeroMQ、Kinesis 以及 TCP sockets 等。SS 实时接收数据流,并按照一定的时间间隔(下文称为“批处理时间间隔”)将连续的数据流拆分成一批批离散的数据集;然后应用诸如 map、reduce、join 和 window 等丰富的 API 进行复杂的数据处理;最后提交给 Spark 引擎进行运算,得到批量结果数据,因此其也被称为准实时处理系统。而结果也能保存在很多地方,如 HDFS,数据库等。另外 SS 也能和 MLlib(机器学习)以及 GraphX(图计算)完美融合。

    640

    Spark Streaming 支持多种类型数据源

    Spark Streaming 基础概念

    DStream

    Discretized Stream 是 SS 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。DStream 本质上是一个以时间为键,RDD 为值的哈希表,保存了按时间顺序产生的 RDD,而每个 RDD 封装了批处理时间间隔内获取到的数据。SS 每次将新产生的 RDD 添加到哈希表中,而对于已经不再需要的 RDD 则会从这个哈希表中删除,所以 DStream 也可以简单地理解为以时间为键的 RDD 的动态序列。如下图:

    640

    窗口时间间隔

    窗口时间间隔又称为窗口长度,它是一个抽象的时间概念,决定了 SS 对 RDD 序列进行处理的范围与粒度,即用户可以通过设置窗口长度来对一定时间范围内的数据进行统计和分析。假如设置批处理时间间隔为 1s,窗口时间间隔为 3s。如下图,DStream 每 1s 会产生一个 RDD,红色边框的矩形框就表示窗口时间间隔,一个窗口时间间隔内最多有 3 个 RDD,Spark Streaming 在一个窗口时间间隔内最多会对 3 个 RDD 中的数据进行统计和分析。

    640

    滑动时间间隔

    滑动时间间隔决定了 SS 程序对数据进行统计和分析的频率。它指的是经过多长时间窗口滑动一次形成新的窗口,滑动时间间隔默认情况下和批处理时间间隔相同,而窗口时间间隔一般设置的要比它们两个大。在这里必须注意的一点是滑动时间间隔和窗口时间间隔的大小一定得设置为批处理时间间隔的整数倍。

    如下图,批处理时间间隔是 1 个时间单位,窗口时间间隔是 3 个时间单位,滑动时间间隔是 2 个时间单位。对于初始的窗口 time 1-time 3,只有窗口时间间隔满足了才触发数据的处理。这里需要注意的一点是,初始的窗口有可能覆盖的数据没有 3 个时间单位,但是随着时间的推进,窗口最终会覆盖到 3 个时间单位的数据。当每个 2 个时间单位,窗口滑动一次后,会有新的数据流入窗口,这时窗口会移去最早的两个时间单位的数据,而与最新的两个时间单位的数据进行汇总形成新的窗口(time3-time5)。

    640

    Spark Streaming 读取 Kafka 数据

    Spark Streaming 与 Kafka 集成接收数据的方式有两种:

    • Receiver-based Approach

    • Direct Approach (No Receivers)

    Receiver-based Approach

    这个方法使用了 Receivers 来接收数据。Receivers 的实现使用到 Kafka 高级消费者 API。对于所有的 Receivers,接收到的数据将会保存在 Spark executors 中,然后由 SS 启动的 Job 来处理这些数据。

    然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在 SS 中使用 WAL 日志,这是在 Spark 1.2.0 才引入的功能,这使得我们可以将接收到的数据保存到 WAL 中(WAL 日志可以存储在 HDFS 上),所以在失败的时候,我们可以从 WAL 中恢复,而不至于丢失数据。

    架构图如下:

    640

    使用方式:

    (1) 导入 Kafka 的 Spark Streaming 整合包

    640

    (2) 创建 DStream

    640

    640

    需要注意的几点:

    1) Kafka 的 topic 和 partition 并不和 SS 生成的 RDD 的 partition 相对应,所以上面代码中 topicMap 里增加 threads 只能增加使用一个 receiver 消费这个 topic 的线程数,它并不能增加 Spark 处理数据的并行数,因为每个 input DStream 在一个 worker 机器上只创建一个接受单个数据流的 receiver。

    2) 可以为不同 topic 和 group 创建多个 DStream 来使用多个 receiver 并行的接受数据。例如:一个单独的 Kafka input DStream 接受两个 topic 的数据可以分为两个 Kafka input DStream,每个只接受一个 topic 的数据,这样可以并行的接受速度从而提高整体吞吐量。

    3) 如果开启了 wal 来保证数据不丢失话,需要设置 checkpoint 目录,并且像上面代码一样指定数据序列化到 hdfs 上的方式(比如:StorageLevel.MEMORY_AND_DISK_SER)

    4) 建议每个批处理时间间隔周期接受到的数据最好不要超过接受 Executor 的内存 (Storage) 的一半。

    要描述清楚 Receiver-based Approach ,我们需要了解其接收流程,分析其内存使用,以及相关参数配置对内存的影响。

    数据接收流程

    当执行 SS 的 start 方法后,SS 会标记 StreamingContext 为 Active 状态,并且单独起个线程通过 ReceiverTracker 将从 ReceiverInputDStreams 中获取的 receivers 以并行集合的方式分发到 worker 节点,并运行他们。worker 节点会启动 ReceiverSupervisor。接着按如下步骤处理:

    1) ReceiverSupervisor 会启动对应的 Receiver(这里是 KafkaReceiver)

    2) KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调用 ReceiverSupervisor.pushSingle 方法填充数据,注意,这里是一条一条填充的。

    3) ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。

    到目前为止,整个过程不会有太多内存消耗,正常的一个线性调用。所有复杂的数据结构都隐含在 BlockGenerator 中。

    BlockGenerator 存储结构

    BlockGenerator 会复杂些,重要的数据存储结构有四个:

    1) 维护了一个缓存 currentBuffer ,这是一个变长的数组的 ArrayBuffer。currentBuffer 并不会被复用,而是每个 spark.streaming.blockInterval 都会新建一个空的变长数据替换老的数据作为新的 currentBuffer,然后把老的对象直接封装成 Block 放入到 blocksForPushing 的队列里,BlockGenerator 会负责保证 currentBuffer 只有一个。currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为 spark.streaming.receiver.maxRate,是单个 Receiver 每秒钟允许添加的条数。这个是 Spark 内存控制的第一步,填充 currentBuffer 是阻塞的,消费 Kafka 的线程直接做填充。

    2) 维护了一个 blocksForPushing 的阻塞队列,size 默认为 10 个 (1.6.3 版本),可通过 spark.streaming.blockQueueSize 进行配置。该队列主要用来实现生产 - 消费模式,每个元素其实是一个 currentBuffer 形成的 block。

    3) blockIntervalTimer 是一个定时器。其实是一个生产者,负责将当前 currentBuffer 的数据放到 blocksForPushing 中,并新建一个 currentBuffer。通过参数 spark.streaming.blockInterval 设置,默认为 200ms。放的方式很简单,直接把 currentBuffer 做为 Block 的数据源。这就是为什么 currentBuffer 不会被复用。

    4) blockPushingThread 也是一个定时器,负责将 Block 从 blocksForPushing 取出来,

    然后交给 BlockManagerBasedBlockHandler.storeBlock。10 毫秒会取一次,不可配置。到这一步,才真的将数据放到了 Spark 的 BlockManager 中。

    下面我们会详细分析每一个存储对象对内存的使用情况:

    currentBuffer

    首先自然要说下 currentBuffer,它缓存的数据会被定时器每隔 spark.streaming.blockInterval(默认 200ms)的时间拿走,这个缓存用的是 Spark 的运行时内存(我们使用的是静态内存管理模式,默认应该是 heap *0.2,如果是统一内存管理模式的话应该是 heap*0.25),而不是 storage 内存。如果 200ms 期间你从 Kafka 接受的数据足够大,则这部分内存很容易 OOM 或者进行大量的 GC,导致 receiver 所在的 Executor 极容易挂掉或者处理速度也很慢。如果你在 SparkUI 发现 Receiver 挂掉了,考虑有没有可能是这个问题。

    blocksForPushing

    blocksForPushing 这个是作为 currentBuffer 和 BlockManager 之间的中转站。默认存储的数据最大可以达到 10*currentBuffer 大小。一般不大可能有问题,除非你的 spark.streaming.blockInterval 设置的比 10ms 还小,官方推荐最小也要设置成 50ms,只要你不设置的过大,这块不用太担心。

    blockPushingThread

    blockPushingThread 负责从 blocksForPushing 获取数据,并且写入 BlockManager。blockPushingThread 只写他自己所在的 Executor 的 blockManager, 也就是一个 receiver 每个批处理时间间隔周期的数据都会被一个 Executor 接收。 这是导致内存被撑爆的最大风险,在数据量很大的情况下,会导致 Receiver 所在的 Executor 直接挂掉。

    对应的解决方案在上面需要注意的建议 4) 有提到,也可以使用多个 Receiver 来消费同一个 topic,降低每个 receiver 接收的数据量, 使用类似下面的代码

    640

    前面我们提到,SS 的消费速度可以设置上限,其实 SS 也可以根据之前的周期处理情况来自动调整下一个周期处理的数据量。你可以通过将 spark.streaming.backpressure.enabled 设置为 true 打开该功能。算法的论文可参考: Socc 2014: Adaptive Stream Processing using Dynamic Batch Sizing , 还是有用的,我现在也都开启着。 另外,Spark 里除了这个 Dynamic, 还有一个就是 Dynamic Allocation, 也就是 Executor 数量会根据资源使用情况,自动分配资源。具体见官网文档。

    Direct Approach (No Receivers)

    和基于 Receiver 接收数据不一样,这种方式定期地从 Kafka 的 topic+partition 中查询最新的偏移量,再根据定义的偏移量范围在每个批处理时间间隔里面处理数据。当作业需要处理的数据来临时,Spark 通过调用 Kafka 的低级消费者 API 读取一定范围的数据。这个特性目前还处于试验阶段,而且仅仅在 Scala 和 Java 语言中提供相应的 API。

    和基于 Receiver 方式相比,这种方式主要有一些几个优点:

    (1)简化并行。

    我们不需要创建多个 Kafka 输入流,然后 union 他们。而使用 DirectStream,SS 将会创建和 Kafka 分区一样的 RDD 分区个数,而且会从 Kafka 并行地读取数据,也就是说 Spark 分区将会和 Kafka 分区有一一对应的关系,这对我们来说很容易理解和使用;

    (2)高效。

    第一种实现零数据丢失是通过将数据预先保存在 WAL 中,这将会复制一遍数据,这种方式实际上很不高效,因为这导致了数据被拷贝两次:一次是被 Kafka 复制;另一次是写到 WAL 中。但是本方法因为没有 Receiver,从而消除了这个问题,所以不需要 WAL 日志;

    (3)恰好一次语义(Exactly-once semantics)。

    第一种实现中通过使用 Kafka 高层次的 API 把偏移量写入 Zookeeper 中,这是读取 Kafka 中数据的传统方法。虽然这种方法可以保证零数据丢失,但是还是存在一些情况导致数据会丢失,因为在失败情况下通过 SS 读取偏移量和 Zookeeper 中存储的偏移量可能不一致。而本文提到的方法是通过 Kafka 低层次的 API,并没有使用到 Zookeeper,偏移量仅仅被 SS 保存在 Checkpoint 中。这就消除了 SS 和 Zookeeper 中偏移量的不一致,而且可以保证每个记录仅仅被 SS 读取一次,即使是出现故障。

    但是本方法唯一的坏处就是没有更新 Zookeeper 中的偏移量,所以基于 Zookeeper 的 Kafka 监控工具将会无法显示消费的状况。但是你可以通过自己手动地将偏移量写入到 Zookeeper 中。

    架构图如下:

    640

    使用方式:

    640

    其中 fromOffsets 是指定的 topic 和 partition 开始读取的 offset 起始值,方法如下:

    640

    个人认为,DirectApproach 更符合 Spark 的思维。我们知道,RDD 的概念是一个不变的,分区的数据集合。我们将 Kafka 数据源包裹成了一个 KafkaRDD,RDD 里的 partition 对应的数据源为 Kafka 的 partition。唯一的区别是数据在 Kafka 里而不是事先被放到 Spark 内存里。其实包括 FileInputStream 里也是把每个文件映射成一个 RDD, 比较好奇,为什么一开始会有 Receiver-based Approach,额外添加了 Receiver 这么一个概念。

    DirectKafkaInputDStream

    SS 通过 Direct Approach 接收数据的入口自然是 KafkaUtils.createDirectStream 了。在调用该方法时,会先创建protected val kc = new KafkaCluster(KafkaParams)

    KafkaCluster 这个类是真实负责和 Kafka 交互的类,该类会获取 Kafka 的 partition 信息, 接着会创建 DirectKafkaInputDStream。 此时会获取每个 Topic 的每个 partition 的 offset。 如果配置成 smallest 则拿到最早的 offset, 否则拿最近的 offset。

    每个 DirectKafkaInputDStream 也会持有一个 KafkaCluster 实例。到了计算周期后,对应的 DirectKafkaInputDStream .compute 方法会被调用, 此时做下面几个操作:

    1) 获取对应 Kafka Partition 的 untilOffset。这样就确定了需要获取数据的 offset 的范围,同时也就知道了需要计算多少数据了

    2) 构建一个 KafkaRDD 实例。这里我们可以看到,每个计算周期里,DirectKafkaInputDStream 和 KafkaRDD 是一一对应的

    3) 将相关的 offset 信息报给 InputInfoTracker

    4) 返回该 RDD

    KafkaRDD 的组成结构

    KafkaRDD 包含 N(N=Kafka 的 partition 数目) 个 KafkaRDDPartition, 每个 KafkaRDDPartition 其实只是包含一些信息,譬如 topic,offset 等,真正如果想要拉数据,是通过 KafkaRDDIterator 来完成,一个 KafkaRDDIterator 对应一个 KafkaRDDPartition。整个过程都是延时过程,也就是说数据其实都还在 Kafka 里,直到有实际的 action 被触发,才会主动去 Kafka 拉数据。

    限速

    Direct Approach (NoReceivers) 的接收方式也是可以限制接受数据的量的。你可以通过设置 spark.streaming.kafka.maxRatePerPartition 来完成对应的配置。需要注意的是,这里是对每个 Partition 进行限速。所以你需要事先知道 Kafka 有多少个分区,才好评估系统的实际吞吐量,从而设置该值。

    相应的,spark.streaming.backpressure.enabled 参数在 Direct Approach 中也是继续有效的。

    Receiver-based Approach VS Direct Approach (No Receivers)

    经过上面对两种数据接收方案的介绍,我们发现, Receiver-based Approach 存在各种内存折腾,对应的 Direct Approach (No Receivers) 则显得比较纯粹简单些,这也给其带来了较多的优势,主要有如下几点:

    1) 因为按需要拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了。这个在 Receiver-based Approach 就比较麻烦,你需要通过 spark.streaming.blockInterval 等参数来调整。

    2) 数据默认就被分布到了多个 Executor 上。Receiver-based Approach 你需要做特定的处理,才能让 Receiver 分不到多个 Executor 上。

    3) Receiver-based Approach 的方式,一旦你的 Batch Processing 被 delay 了,或者被 delay 了很多个 batch, 那估计你的 Spark Streaming 程序离崩溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你 delay 了很多个 batch time, 你内存中的数据只有这次处理的。

    4) Direct Approach (No Receivers) 直接维护了 Kafka offset, 可以保证数据只有被执行成功了,才会被记录下来,通过 checkpoint 机制。如果采用 Receiver-based Approach,消费 Kafka 和数据处理是被分开的,这样就很不好做容错机制,比如系统宕掉了。所以你需要开启 WAL, 但是开启 WAL 带来一个问题是,数据量很大,对 HDFS 是个很大的负担,而且也会给实时程序带来比较大延迟。

    我原先以为 Direct Approach 因为只有在计算的时候才拉取数据,可能会比 Receiver-based Approach 的方式慢,但是经过我自己的实际测试,总体性能 Direct Approach 会更快些,因为 Receiver-based Approach 可能会有较大的内存隐患,GC 也会影响整体处理速度。

    如何保证数据接收的可靠性

    SS 自身可以做到 at least once 语义, 具体方式是通过 CheckPoint 机制。

    CheckPoint 机制

    CheckPoint 会涉及到一些类,以及他们之间的关系:DStreamGraph 类负责生成任务执行图,而 JobGenerator 则是任务真实的提交者。任务的数据源则来源于 DirectKafkaInputDStream,checkPoint 一些相关信息则是由类 DirectKafkaInputDStreamCheckpointData 负责。

    好像涉及的类有点多,其实没关系,我们完全可以不用关心他们。先看看 checkpoint 都干了些啥,checkpoint 其实就序列化了一个类而已:

    以下是其中的类成员:

    640

    其他的都比较容易理解,最重要的是 graph,该类全路径名是:

    里面有两个核心的数据结构是:

    640

    inputStreams 对应的就是 DirectKafkaInputDStream 了。

    再进一步,DirectKafkaInputDStream 有一个重要的对象:

    640

    checkpointData 里则有一个 data 对象,里面存储的内容也很简单

    640

    就是每个 batch 的唯一标识 time 对象,以及每个 KafkaRDD 对应的的 Kafka 偏移信息。

    而 outputStreams 里则是 RDD, 如果你存储的时候做了 foreach 操作,那么应该就是 forEachRDD 了,他被序列化的时候是不包含数据的。

    经过上面的分析,我们发现:

    1) checkpoint 是非常高效的。没有涉及到实际数据的存储。一般大小只有几十 K,因为只存了 Kafka 的偏移量等信息。

    2) checkpoint 采用的是序列化机制,尤其是 DStreamGraph 的引入,里面包含了可能如 ForeachRDD 等,而 ForeachRDD 里面的函数应该也会被序列化。如果采用了 CheckPoint 机制,而你的程序包做了做了变更,恢复后可能会有一定的问题(这个在测试过程中碰到过)。

    接着我们看看 JobGenerator 是怎么提交一个真实的 batch 任务的,分析在什么时间做 checkpoint 操作,从而保证数据的高可用:

    1) 产生 jobs

    2) 成功则提交 jobs 然后异步执行

    3) 失败则会发出一个失败的事件

    4) 无论成功或者失败,都会发出一个 DoCheckpoint 事件。

    5) 当任务运行完成后,还会再调用一次 DoCheckpoint 事件。

    只要任务运行完成后没能顺利执行完 DoCheckpoint 前 crash, 都会导致这次 Batch 被重新调度。也就说无论怎样,不存在丢数据的问题,而这种稳定性是靠 checkpoint 机制以及 Kafka 的可回溯性来完成的。

    那现在会产生一个问题,假设我们的业务逻辑会对每一条数据都处理,则:

    1) 我们没有处理一条数据

    2) 我们可能只处理了部分数据

    3) 我们处理了全部数据

    根据我们上面的分析,无论如何,这次失败了,都会被重新调度,那么我们可能会重复处理数据。有可能事最后失败的那一批次数据的一部分,也可能是全部,但不会更多了。

    业务需要做事务,保证 Exactly Once 语义

    这里业务场景被区分为两个:

    1) 幂等操作

    2) 业务代码需要自身添加事物操作

    所谓幂等操作就是重复执行不会产生问题,如果是这种场景下,你不需要额外做任何工作。但如果你的应用场景是不允许数据被重复执行的,那只能通过业务自身的逻辑代码来解决了。

    这个 SS 倒是也给出了官方方案:

    640

    这代码什么含义呢? 就是说针对每个 partition 的数据,产生一个 uniqueId, 只有这个 partion 的所有数据被完全消费,则算成功,否则算失败,要回滚。下次重复执行这个 uniqueId 时,如果已经被执行成功过的,则 skip 掉。这样,就能保证数据 Exactly Once 语义了。

    总结

    根据我的实际经验,目前 Direct Approach 稳定性个人感觉比 Receiver-based Approach 更好些,推荐使用 Direct Approach 方式和 Kafka 进行集成,并且开启相应的 checkpoint 功能,保证数据接收的稳定性,Direct Approach 模式本身可以保证数据 at least once 语义,如果你需要 Exactly Once 语义时,需要保证你的业务是幂等,或者保证了相应的事务。


    640?wx_fmt=png



    640?wx_fmt=jpeg

    展开全文
  • spark streaming拉取kafka数据, 结合sparkSql dataframe hive存储计算,输出到mysql. 数据清洗过程比较复杂,没办法,上游给的屡一样的数据,正则去解析并全量按时间取最新一条去重。 每天kafka数据5千万条。1...
  • spark streaming每个 job的数据量 与以下几个参数有关。 1. 批次间隔时间,例如5秒拉取一次 2. 自己配置的 每个partition 一次最少拉取的条数 假设5秒一个批次 ,kafka 5个partition,配置每个partition最少拉取...
  • 简单解析一下DirectKafkaInputDStream的概念1:获取kafka数据 由于DirectKafkaInputDStream 不是继承自ReceiverInputDStream。所以不需要有Receiver 所以也就没有获取数据,然后将数据存在内存block里面,也就没有...
  • --conf spark.streaming.kafka.maxRatePerPartition=10000 \ 单位数据条数
  • Spark读取kafka的两种方式 spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。 这两种方式分别是: Receiver-base Direct Receiver-base: Spark官方最先提供了...
  • 接 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四) https://georgedage.blog.csdn.net/article/details/103508619 先对上篇做一个回顾,在上一篇我们编写消费者,并且使用sparkStreaming对kafka中...
  • 1.springboot+kafka的注解、客户端,消费数据为ConsumerRecord<?, ?> record对象,通过record获取: 参考链接:https://blog.csdn.net/weixin_39249427/article/details/100012629 kafka+SparkStreaming...
  • 之前讲述了spark如何从kafka中消费数据,这次来将一下spark如何将数据写入到kafka中。 一、spark写消息到kafka中 直接在spark或者sparkstreaming每一批次处理结束后,在rdd.foreachPartition方法体内创建new ...
  • 1 ,kafkaStreaming 架构模型 : 2 ,代码思路 : 3 ,spark 两种 API : ...kafka 数据,默认保存 7 天。 从 zk 读取 offset 。 创建 kafka 消费者,消费数据。 5 ,sparkStreaming 的各种数据源 : 文件数据...
  • 第一个是:Receiver-base 基于Receiver的kafka数据消费模式就是先把数据kafka中读取出来然后缓存到内存然后再定时处理。这里要引入一个参数配置spark.streaming.receiver.writeAheadLog.enable 防止集群闪退偏移量...
  • 前面有说道spark-streaming的简单demo,也有说到kafka成功跑通的例子,这里就结合二者,也是常用的使用之一。1.相关组件版本 首先确认版本,因为跟之前的版本有些不一样,所以才有必要记录下,另外仍然没有使用...
  • 昨天面试中被问到kafka怎么做到对于数据的不...假如消费了一段时间之后,kafka挂掉了,这时候需要将sparkstreaming起来,然后继续进行消费。那么这时候是不是又进行从头开始消费了呢?不是的,因为kafka中有一个...
  • kafka kafka中文教程 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑...
  • receiver从Kafka中获取的数据都存储在Spark Executor的内存中,然后Spark Streaming启动的job会去处理那些数据 要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL...
  • spark streaming是以batch的方式来...不过这两种方式都是先把数据kafka中读取出来,然后缓存在内存或者第三方,再定时处理。如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了。 而spark streaming提供了
  • spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;...Spark官方最先提供了基于Receiver的Kafka数据消费模式。不过这种方式是先把数据kafka中读取出来,然后缓存在内存,再定
1 2 3 4 5 ... 20
收藏数 2,188
精华内容 875
关键字:

kafka spark 拉数据