kafka读取 spark_spark 读取kafka - CSDN
  • Spark streaming接收Kafka数据 基于Receiver的方式 直接读取方式 Sparkkafka中写入数据 Spark streaming+Kafka应用 Spark streaming+Kafka调优 合理的批处理时间(batchDuration) 合理的Kafka拉取量...

    目录

    前言

    在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark streaming从kafka中不断拉取数据进行词频统计。本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka在舆情项目中的应用,最后将自己在Spark Streaming+kafka的实际优化中的一些经验进行归纳总结。(如有任何纰漏欢迎补充来踩,我会第一时间改正^v^)

    Spark streaming接收Kafka数据

    用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

    基于Receiver的方式

    这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。如下图:
    Receiver图形解释
    在使用时,我们需要添加相应的依赖包:

    <dependency><!-- Spark Streaming Kafka -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    而对于Scala的基本使用方式如下:

    import org.apache.spark.streaming.kafka._
    
     val kafkaStream = KafkaUtils.createStream(streamingContext, 
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

    还有几个需要注意的点:

    • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
    • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

    直接读取方式

    在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图:

    这种方法相较于Receiver方式的优势在于:

    • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
    • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
    • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

    以上主要是对官方文档[1]的一个简单翻译,详细内容大家可以直接看下官方文档这里不再赘述。

    不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然zookeeper就保存了当前消费的offset值,那么如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,我们是直接从kafka来读数据,那么offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类,我将其放在了github上:
    Spark streaming+Kafka demo
    示例中KafkaManager是一个通用类,而KafkaCluster是kafka源码中的一个类,由于包名权限的原因我把它单独提出来,ComsumerMain简单展示了通用类的使用方法,在每次创建KafkaStream时,都会先从zooker中查看上次的消费记录offsets,而每个batch处理完成后,会同步offsets到zookeeper中。

    Spark向kafka中写入数据

    上文阐述了Spark如何从Kafka中流式的读取数据,下面我整理向Kafka中写数据。与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。
    最直接的做法我们可以想到如下这种方式:

    input.foreachRDD(rdd =>
      // 不能在这里创建KafkaProducer
      rdd.foreachPartition(partition =>
        partition.foreach{
          case x:String=>{
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            val producer = new KafkaProducer[String,String](props)
            val message=new ProducerRecord[String, String]("output",null,x)
            producer.send(message)
          }
        }
      )
    ) 

    但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

    1. 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    
    object KafkaSink {
      import scala.collection.JavaConversions._
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }
    1. 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:
    // 广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", Conf.brokers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      log.warn("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    这样我们就能在每个executor中愉快的将数据输入到kafka当中:

    //输出到kafka
    segmentedStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreach(record => {
          kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
          // do something else
        })
      }
    })

    Spark streaming+Kafka应用

    WeTest舆情监控对于每天爬取的千万级游戏玩家评论信息都要实时的进行词频统计,对于爬取到的游戏玩家评论数据,我们会生产到Kafka中,而另一端的消费者我们采用了Spark Streaming来进行流式处理,首先利用上文我们阐述的Direct方式从Kafka拉取batch,之后经过分词、统计等相关处理,回写到DB上(至于Spark中DB的回写方式可参考我之前总结的博文:Spark踩坑记——数据库(Hbase+Mysql)),由此高效实时的完成每天大量数据的词频统计任务。

    Spark streaming+Kafka调优

    Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

    合理的批处理时间(batchDuration)

    几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,如下图:

    合理的Kafka拉取量(maxRatePerPartition重要)

    对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time,如下图:

    缓存反复使用的Dstream(RDD)

    Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数,如下图:

    设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

    设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
    在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    使用高性能的算子

    这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

    • 使用reduceByKey/aggregateByKey替代groupByKey
    • 使用mapPartitions替代普通map
    • 使用foreachPartitions替代foreach
    • 使用filter之后进行coalesce操作
    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

    使用Kryo优化序列化性能

    这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。
    在Spark中,主要有三个地方涉及到了序列化:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    结果

    经过种种调试优化,我们最终要达到的目的是,Spark Streaming能够实时的拉取Kafka当中的数据,并且能够保持稳定,如下图所示:

    当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到Processing Time这一柱形图中有一Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。

     

    转载自:

    https://www.cnblogs.com/xlturing/p/6246538.html#spark%E5%90%91kafka%E4%B8%AD%E5%86%99%E5%85%A5%E6%95%B0%E6%8D%AE

    展开全文
  • kafkaspark总结

    2019-01-08 09:46:36
    kafkaspark总结 本文涉及到的技术版本号: scala 2.11.8 kafka1.1.0 spark2.3.1 kafka简介 kafka是一个分布式流平台,流媒体平台有三个功能 发布和订阅记录流 以容错的持久化的方式存储记录流 发生数据时对流...

    kafka和spark总结

    本文涉及到的技术版本号:

    • scala 2.11.8
    • kafka1.1.0
    • spark2.3.1

    kafka简介

    kafka是一个分布式流平台,流媒体平台有三个功能

    • 发布和订阅记录流
    • 以容错的持久化的方式存储记录流
    • 发生数据时对流进行处理

    kafka通常用于两大类应用

    • 构件在系统或应用程序之间可靠获取数据的实时数据管道
    • 构件转换或响应数据流的实时流应用程序

    kafka的几个概念

    • kafka运行在集群上,或一个或多个能跨越数据中心的服务器上
    • kafka集群上存储流记录的称为topic
    • kafka的topic里,每一条记录包括一个key、一个value和一个时间戳timestamp

    kafka有四个核心API

    • Producer API

      生产者api允许应用程序发布一个记录流到一个或多个kafka的topic

    • Consumer API

      消费者api允许应用程序订阅一个或多个topic并且接受处理传送给消费者的数据流

    • Streams API

      流api允许应用程序作为一个流处理器,从一个或多个输入topic中消费输入流,并生产一个输出流到一个或多个输出topic中

    • Connector API

      连接器api允许构建和运行中的kafka的topic连接到现有的应用程序或数据系统中重用生产者或消费者。例如关系数据库的连接器可以捕获对表的每一个更改操作

    kafka中的客户端和服务端之间是通过简单、高性能的语言无关的TCP协议完成的,该协议已经版本化并且高版本向低版本向后兼容。

    topics

    topic为kafka为记录流提供的核心抽象,类似于数据通道,并且topic是发布记录和订阅的核心。

    kafka的topic是多用户的,一个topic可以有0个、1个或多个消费者订阅记录

    对于每一个topic,kafka集群都维护了一个如下所示的分区记录:
    topic

    其中每一个分区都是有序的不可变的记录序列,并且新数据是不断的追加到结构化的记录中。分区中的记录每个都分配了一个offset作为ID,它唯一标识分区中的每个记录。

    kafka集群默认是保存所有记录,无论是否被消费过,但是可以通过配置保留时间策略。例如如果设置数据保留策略为两天,则超过两天的数据将被丢弃释放空间。kafka的性能受数据大小影响不大,因此长时间的存储数据并不是太大的问题。

    其中,kafka 的消费者唯一对topic中的每一个分区都可以设置偏移量offset,标识当前消费者从哪个分区的哪一条数据开始消费,消费者通过对偏移量的设置可以灵活的对topic进行消费。如下图
    offset

    消费者控制自己的偏移量就意味着kafka的消费者是轻量的,消费者之间互不影响。

    topic记录中的分区有多种用途,首先它允许topic扩展到超出单台服务器适合的大小。每个分区都需要有适合托管分区的服务器,而topic可以有很多分区,因此一个topic可以处理任意数量的数据。另外这些分区作为并行的单位,效率很高,这也是相当重要的一点。

    分配

    记录分区分布在kafka集群服务器上,每个服务器共同处理数据并请求分区的共享。每个分区都可以在可用服务器的数量上进行复制,以此实现容错。

    每一个分区都会有一个服务器作为leader,0个或多个服务器作为followers。leader处理分区的所有读取和写入请求,而follower被动的复制leader。如果leader出错,则其中一个follower会自动称为新的leader。集群中的每个服务器都充当某分区的leader和其他分区的follower,因此能在集群中达到负载均衡。

    生产者

    生产者将数据发布到所选择的分区,生产者在发布数据是需要选择将数据发送到哪个分区,分配分区可以通过循环方式完成也可以根据语义分区的功能实现。

    消费者

    消费者使用消费者组(consumer group)标记自己。发布到topic的每个记录会被发送到每个消费者组中的一个消费者实例。所以当一个消费者组中有多个消费者实例,则记录将在该消费者组中的所有消费者之间进行有效的负载均衡。

    topic接受的每一条记录都会被广播发送到每个消费者组中。示意图如下:

    消费者
    上图有两个机器的kafka集群,某topic有四个分区p0-p3,有两个消费者组A/B订阅该topic,消费者组A有两个消费者实例,消费者组B有四个消费者实例。

    kafka中实现消费的方式是通过在消费者实例上划分分区实现,保证实例在任何时间点都是公平分配的。消费者组中的成员划分分区是由kafka协议进行动态处理。如果新实例加入该组,那新加入的实例会从改组的成员中接管一些分区。如果消费者组中的某个实例死亡,则它所划分的分区分配给该消费组的其他实例。

    kafka只能提供一个分区内的记录的顺序,但是不保证多个分区的记录顺序。如果用户想保证topic中的顺序,则使用一个分区的topic即可,但这样就意味着每个消费者组中只能有一个消费者实例。

    kafka提供的保证

    • 同一个生产者实例发送到特定topic的特定分区的两条数据M1和M2,并且M1发送早于M2,则M1将拥有更低的偏移量,即可以保证插入顺序。
    • 消费者可以按照记录存储的顺序消费记录
    • 对于复制因子为N的topic,最多可以容忍N-1个服务器故障,而不会丢失提交到topic中的记录

    kafka常用命令

    • 启动Zookeeper server

      bin/zookeeper-server-start.sh config/zookeeper.properties &
      
    • 启动Kafka server

      nohup bin/kafka-server-start.sh config/server.properties &
      
    • 停止Kafka server

      bin/kafka-server-stop.sh
      
    • 停止Zookeeper server

      bin/zookeeper-server-stop.sh
      
    • producer

      bin/kafka-console-producer.sh --broker-list 192.168.20.133:9092 --topic realtime
      
    • consumer

      bin/kafka-console-consumer.sh --zookeeper 192.168.20.133:2181 --topic realtime --from-beginning
      
    • 查看所有topic

      bin/kafka-topics.sh --list --zookeeper localhost:2181
      
    • 创建一个topic

      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic realtime0103
      
    • 查看topic详情

      bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic realtime0103
      
    • 删除topic

      bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic realtime0103
      

    java操作kafka

    引入jar包

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
      </dependency>
    

    Producer

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ProducerDemo {
    
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.20.133:9092,192.168.20.134:9092,192.168.20.135:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        String topic = "realtime0103";
    
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String value = "{'name':'1','value':1}" ; 
        
        //设定分区规则,分区为0,1,2
        int partation = KafkaProducerClient.count.get() % 3;
    
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,partation, "key1",value );
          
        producer.send(record).get();
      }
    }
    

    Customer

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    
    public class CustomerDemo {
    
      private static KafkaConsumer<String, String> consumer;
      private static String inputTopic;
    
      @SuppressWarnings({ "unchecked", "rawtypes" })
      public static void main(String[] args) {
        String groupId = "group1";
        inputTopic = "realtime0103";
        String brokers = "192.168.20.133:9092";
    
        consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId));
        
        //分配topic 某分区的offset
        TopicPartition part0 = new TopicPartition(inputTopic, 0);
        TopicPartition part1 = new TopicPartition(inputTopic, 1);
        TopicPartition part2 = new TopicPartition(inputTopic, 2);
        OffsetAndMetadata offset0 = new OffsetAndMetadata(1);
        OffsetAndMetadata offset1 = new OffsetAndMetadata(2);
        OffsetAndMetadata offset2 = new OffsetAndMetadata(3);
        Map<TopicPartition,OffsetAndMetadata> offsetMap = new HashMap<>();
        offsetMap.put(part0,offset0);
        offsetMap.put(part1,offset1);
        offsetMap.put(part2,offset2);
        //提交offset信息
        consumer.commitSync(offsetMap);
        
        start();
    
      }
    
      private static Properties createConsumerConfig(String brokers, String groupId) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put("group.id", groupId);
            props.put("auto.commit.enable", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            return props;
        }
    
      private static void start() {
        consumer.subscribe(Collections.singletonList(inputTopic));
    
            System.out.println("Reading topic:" + inputTopic);
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record: records) {
                    String ip = record.key();
                    String event = record.value();
                    System.out.println(event);
                }
                consumer.commitSync();
            }
    
      }
    }
    

    spark操作kafka

    IDEA配置搭建spark scala开发环境(Windows)

    • 安装jdk8并配置环境变量
    • 安装scala2.11并配置环境变量(本文安装2.11.8)
    • 安装IDEA
    • IDEA安装SBT和Scala插件
    • File->New->Project 创建新项目,选择Scala->sbt->next

    新建项目

    • 选择项目名称、位置、Java版本号、sbt版本和Scala版本,Finish

    选择版本

    • 打开build.sbt,添加相关依赖

      libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
      libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"
      libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"
      libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
      
    • 刷新sbt项目,下载依赖:

    刷新

    • 编写业务代码,可以使用以下的使用spark Structured Streaming连接kafka处理流部分代码
    • 设置打包规则
      • File->Project Sturcture->Artifacts 点击绿色加号设置打jar包规则

        Artifacts

      • 选择Module和Main class,JAR file from libraries选择copy to output…即不将外部jar打包到jar文件中

        Artifacts

      • 导航栏 Build->Build Artifacts ,打包成jar,将jar包上传到spark集群

    • 运行程序:
      • 以上配置打出jar包包含项目jar包和多个依赖jar包,提交spark作业时,可以使用–jars逗号隔开配置引用多个外部jar

        cd $SPARK_HOME
        ./bin/spark-submit --master spark://192.168.20.133:7077 --jars /root/interface-annotations-1.4.0.jar,/root/async-1.4.1.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class com.xuchg.app.Application /root/spark-kafka.jar
        

    使用spark Structured Streaming连接kafka处理流

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    object Main extends App {
    
      //spark读取kafka示例
      Logger.getLogger("org").setLevel(Level.ERROR)
      val kafkaAddress = "192.168.20.133:9092"
      val zookeeper = "192.168.20.133:2181"
      val topic = "realtime0103"
      val topicOffset = "{\""+topic+"\":{\"0\":0,\"1\":0,\"2\":0}}"
      val sparkSession = SparkSession
        .builder()
        .config(new SparkConf()
          .setMaster("local[2]")
          .set("spark.streaming.stopGracefullyOnShutdown","true")//设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
          .set("spark.submit.deployMode","cluster")
          .set("spark.executor.memory","4g")//worker内存
          .set("spark.driver.memory","4g")
          .set("spark.cores.max","2")//设置最大核心数
        )
        .appName(getClass.getName)
        .getOrCreate()
    
      def createStreamDF(spark:SparkSession):DataFrame = {
        import spark.implicits._
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaAddress)
          .option("zookeeper.connect", zookeeper)
          .option("subscribe", topic)
          .option("startingOffsets", topicOffset)
          .option("enable.auto.commit", "false")
          .option("failOnDataLoss", false)
          .option("includeTimestamp", true)
          .load()
        df
      }
    
      var df = createStreamDF(sparkSession)
    
      val query = df.writeStream
        .format("console")
        .start()
    
      query.awaitTermination()
    }
    

    监控spark和kafka

    此处根据实际应用情况使用两种监控方法,解决两个不同问题

    • 解决spark启动和停止处理的动作,例如监听spark停止时处理或记录完所有计算

      //定义监听类继承SparkListener,并重写相关方法
      import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
      class AppListener extends SparkListener{
        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
          //监控spark停止方法,可以处理spark结束的动作
          println("application 关闭")
        }
      
        override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
          println("application 启动")
        }
      }
      
      //在主类中注册监听类
      sparkSession.sparkContext.addSparkListener(new AppListener)
      
    • 监控spark的查询,例如spark读取kafka流的偏移量offset,可以监听并记录下来,下次启动spark可以直接从该偏移量offset进行消费,不会消费相同的数据

      sparkSession.streams.addListener(new StreamingQueryListener() {
        override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
          println("Query started: " + queryStarted.id)
        }
        override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
          //服务出现问题而停止
          println("Query terminated: " + queryTerminated.id)
        }
        override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
          var progress = queryProgress.progress
          var sources = progress.sources
          if(sources.length>0){
            var a = 0
            for(a <- 0 to sources.length - 1){
              var offsetStr = sources.apply(a).startOffset
              if(offsetStr!=null){
                println("检测offset是否变化 -- " + offsetStr)
              }
            }
          }
        }
      })
      

      运行结果如下:可以看到对topic的每个分区的偏移量都可以获取到
      offset

    管理和停止spark程序

    在spark集群主节点配置并使用spark history server可以实现对spark作业进行管理和监控

    • 配置spark history server

      • 修改$SPARK_HOME/conf/spark-defaults.conf,如果不存在则从模板复制一份

        cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
        vi $SPARK_HOME/conf/spark-defaults.conf
        
      • 修改配置文件如下:

        spark.eventLog.enabled           true
        spark.eventLog.dir               hdfs://192.168.20.133:9000/spark-history
        spark.eventLog.compress          true
        # 注意ip地址需要根据情况更改,9000为hdfs默认端口号,如hdfs端口号不是9000则需要更改
        
      • 创建hdfs目录

        $HADOOP_HOME/bin/hdfs dfs -mkdir /spark-history
        
      • 配置$SPARK_HOME/conf/spark-env.sh文件:

        • 如果不存在则从模板复制:

          cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
          
        • 编辑$SPARK_HOME/conf/spark-env.sh,结尾添加:

          export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://192.168.20.133:9000/spark-history"
          # 18080为history server的访问端口号,192.168.20.133:9000为hdfs的ip和端口,根据实际情况修改
          
      • 打开防火墙18080端口

      • 执行命令打开history server服务

        $SPARK_HOME/sbin/start-history-server.sh
        
    • 代码中sparkSession创建时添加配置:

      //设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
      new SparkConf().set("spark.streaming.stopGracefullyOnShutdown","true")
      
    • 使用shell关掉某一个正在运行的spark作业:

      • spark作业关闭原理
        每一个spark作业都由一个appId唯一标识,而每一个作业包含多个Executors执行器,这些Executors中包含1个或几个id为driver的驱动程序,它是执行开发程序中的 main方法的进程。如果驱动程序停止,则当前spark作业就结束了。

      • spark关闭过程

        • 获取某appId的spark作业的driver节点的ip和端口,可以通过spark history server提供的页面或提供的api进行获取。此处介绍页面获取,后面会介绍api获取

          finddriver

        • 根据获取的driver的端口号对spark作业发送停止命令,当然有时ctrl+c和在监控页面上都是可以直接停止的,但此处只提用shell停止,应用场景更广泛。

          centod7:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|awk -F= '{print $2}'|xargs kill -15
          centos6:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|xargs kill -15
          

          注意:centos6和centos7稍有不同,而且此处使用kill -15而不是kill -9,kill -9会直接杀死进程,可能会导致丢失数据。而kill -15是发送停止命令,不会直接杀死进程。

      • 通过以上内容可以实现在spark集群主节点部署web服务接收并远程调用执行shell语句来达到远程动态启动(可传参)和停止spark作业,大体如下:

        • 远程调用接口传参启动spark作业,此时记录下spark运行的appid

        • 通过调用spark history server提供的REST Api获取当前作业driver进程的端口号:

          http://192.168.20.133:18080/api/v1/applications/{appId}/allexecutors
          

          driver

        • 通过获取到的端口号可以向spark集群主节点发送停止命令到该端口进程即可

    示例项目地址:github
    kafka官网
    spark官网

    展开全文
  • Spark读取kafka的两种方式 spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。 这两种方式分别是: Receiver-base Direct Receiver-base: Spark官方最先提供了...

    Spark读取kafka的两种方式

    spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。
    这两种方式分别是:

    • Receiver-base
    • Direct

    Receiver-base:

    Spark官方最先提供了基于Receiver的Kafka数据消费模式。不过这种方式是先把数据从kafka中读取出来,然后缓存在内存,再定时处理。如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了,存在程序失败丢失数据的可能,后在Spark 1.2时引入一个配置参数spark.streaming.receiver.writeAheadLog.enable以规避此风险。
    Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程:

    spark streaming启动过后,会选择一台excetor作为ReceiverSupervior
    1:Reciver的父级ReciverTracker分发多个job(task)到不同的executor,并启动ReciverSupervisor.
    2:ReceiverSupervior会启动对应的实例reciver(kafkareciver,TwitterReceiver),并调用onstart()
    3:kafkareciver在通过onstart()启动后就开启线程源源不断的接收数据,并交给ReceiverSupervior,通过ReceiverSupervior.store函数一条一条接收
    4:ReceiverSupervior会调用BlockGenertor.adddata填充数据。

    所有的中间数据都缓存在BlockGenertor

    1:首先BlockGenertor维护了一个缓冲区,currentbuffer,一个无限长度的arraybuffer。为了防止内存撑爆,这个currentbuffer的大小可以被限制,通过设置参数spark.streaming.reciver.maxRate,以秒为单位。currentbuffer所使用的内存不是storage(负责spark计算过程中的所有存储,包括磁盘和内存),而是珍贵的计算内存。所以currentbuffer应该被限制,防止占用过多计算内存,拖慢任务计算效率,甚至有可能拖垮Executor甚至集群。
    2:维护blockforpushing队列,它是等待被拉到到BlockManager的中转站。它是currentbuffer和BlockManager的中间环节。它里面的每一个元素其实就是一个currentbuffer。
    3:维护两个定时器,其实就是一个生产-消费模式。blockintervaltimer定时器,负责生产端,定时将currentbuffer放进blockforpushing队列。blockforpushingthread负责消费端,定时将blockforpushing里的数据转移到BlockManager。

    Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming计算引擎时,我们优先考虑采用此种方式来读取数据。

    Direct:

    这种方式是延迟的。也就是说当action真正触发时才会去kafka里接数据。因此不存在currentbuffer的概念。它把kafka每个分区里的数据,映射为KafkaRdd的概念。题外话,在structured streaming中,也已经向DataFrame和DataSet统一了,弱化了RDD的概念。
    真正与kafka打交道的是KafkaCluster,全限定名: org.apache.spark.streaming.kafka.KafkaCluster。包括设备kafka各种参数,连接,获取分区,以及偏移量,设置偏移量范围等。
    Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。

    Spark Streaming提供了一些重载读取Kafka数据的方法,本文中关注两个基于Scala的方法,这在我们的应用场景中会用到,具体的方法代码如下:
    方法createDirectStream中,ssc是StreamingContext;kafkaParams的具体配置见Receiver-based之中的配置,与之一样;这里面需要指出的是fromOffsets ,其用来指定从什么offset处开始读取数据。

    方法createDirectStream中,该方法只需要3个参数,其中kafkaParams还是一样,并未有什么变化,不过其中有个配置auto.offset.reset可以用来指定是从largest或者是smallest处开始读取数据;topic是指Kafka中的topic,可以指定多个。

    在实际的应用场景中,我们会将两种方法结合起来使用,大体的方向分为两个方面:

    应用启动。当程序开发并上线,还未消费Kafka数据,此时从largest处读取数据,采用第二种方法;
    应用重启。因资源、网络等其他原因导致程序失败重启时,需要保证从上次的offsets处开始读取数据,此时就需要采用第一种方法来保证我们的场景

    Receive_base VS Direct两种方式的优缺点:

    Direct方式具有以下方面的优势:

    1、简化并行(Simplified Parallelism)。不现需要创建以及union多输入源,Kafka topic的partition与RDD的partition一一对应
    2、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此种方式需要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。
    3、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,但是Offsets则是由Zookeeper保存。通过参数配置,可以实现at-least once消费,此种情况有重复消费数据的可能。
    4、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
    5、降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
    6、鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

    Direct方式的缺点:

    提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
    监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。

    Receive-base优点:

    Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。

    Receive-base的缺点:

    1、防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
    2、单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
    3、在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费。
    4、提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。5、Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
    6、采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度

    代码示例请参考:

    https://blog.csdn.net/weixin_44455388/article/details/102818048

    展开全文
  • spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。 这两种方式分别是: Receiver-base Direct 一 、Receiver-base: Spark官方最先提供了基于Receiver...

    spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。

    这两种方式分别是:

    • Receiver-base

    • Direct


    一 、Receiver-base:
    Spark官方最先提供了基于Receiver的Kafka数据消费模式。不过这种方式是先把数据从kafka中读取出来,然后缓存在内存,再定时处理。如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了,存在程序失败丢失数据的可能,后在Spark 1.2时引入一个配置参数spark.streaming.receiver.writeAheadLog.enable以规避此风险。
    Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:


    spark streaming启动过后,会选择一台excetor作为ReceiverSupervior
    1:Reciver的父级ReciverTracker分发多个job(task)到不同的executor,并启动ReciverSupervisor.

    2:ReceiverSupervior会启动对应的实例reciver(kafkareciver,TwitterReceiver),并调用onstart()

    3:kafkareciver在通过onstart()启动后就开启线程源源不断的接收数据,并交给ReceiverSupervior,通过ReceiverSupervior.store函数一条一条接收

    4:ReceiverSupervior会调用BlockGenertor.adddata填充数据。

    所有的中间数据都缓存在BlockGenertor
    1:首先BlockGenertor维护了一个缓冲区,currentbuffer,一个无限长度的arraybuffer。为了防止内存撑爆,这个currentbuffer的大小可以被限制,通过设置参数spark.streaming.reciver.maxRate,以秒为单位。currentbuffer所使用的内存不是storage(负责spark计算过程中的所有存储,包括磁盘和内存),而是珍贵的计算内存。所以currentbuffer应该被限制,防止占用过多计算内存,拖慢任务计算效率,甚至有可能拖垮Executor甚至集群。

    2:维护blockforpushing队列,它是等待被拉到到BlockManager的中转站。它是currentbuffer和BlockManager的中间环节。它里面的每一个元素其实就是一个currentbuffer。


    3:维护两个定时器,其实就是一个生产-消费模式。blockintervaltimer定时器,负责生产端,定时将currentbuffer放进blockforpushing队列。blockforpushingthread负责消费端,定时将blockforpushing里的数据转移到BlockManager。



    Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming计算引擎时,我们优先考虑采用此种方式来读取数据,具体的代码如下:


    如上述代码,函数getKafkaInputStream提供了zookeeper, topic, groupId, numReceivers, partition以及ssc,其传入函数分别对应:


    • zookeeper: ZooKeeper连接信息

    • topic: Kafka中输入的topic信息

    • groupId: consumer信息

    • numReceivers: 打算开启的receiver个数, 并用来调整并发

    • partition: Kafka中对应topic的分区数


    以上几个参数主要用来连接Kafka并读取Kafka数据。具体执行的步骤如下:


    • Kafka相关读取参数配置,其中 zookeeper.connect即传入进来的zookeeper参数;auto.offset.reset设置从topic的最新处开始读取数据;zookeeper.connection.timeout.ms指zookeepr连接超时时间,以防止网络不稳定的情况;fetch.message.max.bytes则是指单次读取数据的大小;group.id则是指定consumer。

    • 指定topic的并发数,当指定receivers个数之后,但是由于receivers个数小于topic的partition个数,所以在每个receiver上面会起相应的线程来读取不同的partition。

    • 读取Kafka数据,numReceivers的参数在此用于指定我们需要多少Executor来作为Receivers,开多个Receivers是为了提高应用吞吐量。

    • union用于将多个Receiver读取的数据关联起来。



    二、Direct:

    这种方式是延迟的。也就是说当action真正触发时才会去kafka里接数据。因此不存在currentbuffer的概念。它把kafka每个分区里的数据,映射为KafkaRdd的概念。题外话,在structured streaming中,也已经向DataFrame和DataSet统一了,弱化了RDD的概念。

    真正与kafka打交道的是KafkaCluster,全限定名: org.apache.spark.streaming.kafka.KafkaCluster。包括设备kafka各种参数,连接,获取分区,以及偏移量,设置偏移量范围等。

    Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:


    Spark Streaming提供了一些重载读取Kafka数据的方法,本文中关注两个基于Scala的方法,这在我们的应用场景中会用到,具体的方法代码如下:

    • 方法createDirectStream中,ssc是StreamingContext;kafkaParams的具体配置见Receiver-based之中的配置,与之一样;这里面需要指出的是fromOffsets ,其用来指定从什么offset处开始读取数据。




    • 方法createDirectStream中,该方法只需要3个参数,其中kafkaParams还是一样,并未有什么变化,不过其中有个配置auto.offset.reset可以用来指定是从largest或者是smallest处开始读取数据;topic是指Kafka中的topic,可以指定多个。具体提供的方法代码如下:




    在实际的应用场景中,我们会将两种方法结合起来使用,大体的方向分为两个方面:

    • 应用启动。当程序开发并上线,还未消费Kafka数据,此时从largest处读取数据,采用第二种方法;

    • 应用重启。因资源、网络等其他原因导致程序失败重启时,需要保证从上次的offsets处开始读取数据,此时就需要采用第一种方法来保证我们的场景

    总体方向上,我们采用以上方法满足我们的需要,当然具体的策略我们不在本篇中讨论,后续会有专门的文章来介绍。从largest或者是smallest处读Kafka数据代码实现如下:



    程序失败重启的逻辑代码如下:


    代码中的fromOffsets参数从外部存储获取并需要处理转换,其代码如下:


    该方法提供了从指定offsets处读取Kafka数据。如果发现读取数据异常,我们认为是offsets失败,此种情况去捕获这个异常,然后从largest处读取Kafka数据。

    Receive_base VS   Direct两种方式的优缺点:
    Direct方式具有以下方面的优势:
    1、简化并行(Simplified Parallelism)。不现需要创建以及union多输入源,Kafka topic的partition与RDD的partition一一对应
    2、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此种方式需要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。
    3、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,但是Offsets则是由Zookeeper保存。通过参数配置,可以实现at-least once消费,此种情况有重复消费数据的可能。
    4、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
    5、降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
    6、鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

    Direct方式的缺点:
    • 提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。

    • 监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。



    Receive-base优点:
    1、Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。

    Receive-base的缺点:
    1、防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
    2、单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
    3、在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费
    4、提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。5、Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
    6、采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度





    展开全文
  • 简单了解一下Kafka:是一种高...当SparkStreaming与Kafka做集成的时候Kafka成了Streaming的高级数据源,由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来...

    简单了解一下Kafka:是一种高吞吐量的分布式发布订阅消息系统。依赖Zookeeper,因此搭建Kafka的时候需要事先搭建好Zookeeper。体系结构如下:

    当SparkStreaming与Kafka做集成的时候Kafka成了Streaming的高级数据源,由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程

    pom文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>ZDemo5</groupId>
        <artifactId>ZDemo5</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <spark.version>2.1.0</spark.version>
            <scala.version>2.11</scala.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
        </dependencies>
    
    </project>

    启动kafka:bin/kafka-server-start.sh config/server.properties &       ---后台方式启动

    创建topic:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic mydemo2

    发布消息:bin/kafka-console-producer.sh --broker-list bigdata111:9092 --topic mydemo2

    SparkStreaming读取Kafka数据源由两种模式: 

    模式一:Receiver 的方式,Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。

    这种方法吞吐量不高

    代码实现:

    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.storage.StorageLevel
    
    object KafkaRecciver {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("hdfs://bigdata111:9000/checkpoint")
        //创建kafka对象   生产者 和消费者 
        //模式1 采取的是 receiver 方式  reciver 每次只能读取一条记录
        val topic = Map("mydemo2" -> 1)
        //直接读取的方式  由于kafka 是分布式消息系统需要依赖Zookeeper
        val data = KafkaUtils.createStream(ssc, "192.168.128.111:2181", "mygroup", topic, StorageLevel.MEMORY_AND_DISK)
        //数据累计计算
        val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
        //进行数据统计当前值加上之前的值
        var total = curVal.sum
        //最初的值应该是0
        var previous = preVal.getOrElse(0)
        //Some 代表最终的返回值
        Some(total+previous)
      }
       val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print()
       //启动ssc
       ssc.start()
       ssc.awaitTermination()
        
      }
    }

    模式二:与基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。这种模式可以有效提高系统的吞吐量

    代码实现:

    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kafka.KafkaUtils
    import kafka.serializer.StringDecoder
    
    object KafkaDirector {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        //构建conf ssc 对象
        val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
        val ssc = new StreamingContext(conf,Seconds(3))
        //设置数据检查点进行累计统计单词
        ssc.checkpoint("hdfs://192.168.128.111:9000/checkpoint")
        //kafka 需要Zookeeper  需要消费者组
        val topics = Set("mydemo2")
        //                                     broker的原信息                                  ip地址以及端口号
        val kafkaPrams = Map[String,String]("metadata.broker.list" -> "192.168.128.111:9092")
        //                                          数据的输入了类型    数据的解码类型
        val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaPrams, topics)
        val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
          //进行数据统计当前值加上之前的值
          var total = curVal.sum
          //最初的值应该是0
          var previous = preVal.getOrElse(0)
          //Some 代表最终的但会值
          Some(total+previous)
        }
        //统计结果
        val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print() 
        //启动程序
        ssc.start()
        ssc.awaitTermination()
        
      }
    }

    结果展示:

    -------------------------------------------
    Time: 1536325032000 ms
    -------------------------------------------
    (love,1)
    (Beijing,1)
    (I,1)
    
    -------------------------------------------
    Time: 1536325035000 ms
    -------------------------------------------
    (love,2)
    (Beijing,1)
    (I,2)
    (Shanghai,1)
    
    -------------------------------------------
    Time: 1536325038000 ms
    -------------------------------------------
    (love,2)
    (Beijing,1)
    (I,2)
    (Shanghai,1)
    
    -------------------------------------------
    Time: 1536325041000 ms
    -------------------------------------------
    (love,2)
    (Beijing,1)
    (I,2)
    (Shanghai,1)
    

     

    展开全文
  • SparkStreaming 读取kafka中数据(spark2.3.2) 流处理一般都会涉及到kafka,所以用SparkStreaming读取kafka中数据是流处理的必备技能之一。 1.先在pom.xml文件中添加 ${spark.version} 即你的spark的版本,我spark是...
  • Receive是使用的高级API,需要消费者连接Zookeeper来读取数据。是由Zookeeper来维护偏移量,不用我们来手动维护,这样的话就比较简单一些,减少了代码量。但是天下没有免费的午餐,它也有很多缺点: 1.导致丢失数据...
  • 第一个是:Receiver-base 基于Receiver的kafka数据消费模式就是先把数据从kafka中读取出来然后缓存到内存然后再定时处理。这里要引入一个参数配置spark.... Receiver-basede kafka读取模式是基于Kafka的高阶(high-...
  • 作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据... 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版
  • 待更新
  • 解决方法:基于 Flume+Kafka+Spark Streaming 的框架对这些任务的输出日志进行实时监控,当检测到日志出现Error的信息就发送邮件给项目的负责人。 目的:通过这个小项目熟悉基于 Flume+Kafka+Spark Streaming 框架...
  • Kafka 简介 kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。 kafka...
  • Impala 操作/读写 Kudu,... spark读取kudu表导出数据为parquet文件(spark kudu parquet) kudu 导入/导出 数据 Kudu 分页查询的两种方式 map、flatMap(流的扁平化)、split 的区别 Spark(SparkSql) 写数据到 ...
  • Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-streaming-kafka-0-10。在使用时应注意以下几点: spark-streaming-kafka-0-8...
  • Spark是一个用来是实现快速而通用的集群计算的平台。Spark是UC Berkeley AMP Lab(加州大学伯克利分销的AMP实验室)所开源的类MapReduce的通用并行框架, 现在已经是Apache中的一个顶级项目。Spark使用Scala语言开发...
  • 每个公司想要进行数据分析或数据挖掘,收集日志、ETL都是第一步...首先,业务日志会通过Nginx(或者其他方式,我们是使用Nginx写入日志)每分钟写入到磁盘中,现在我们想要使用Spark分析日志,就需要先将磁盘中的文件上
  • 本文主要记录使用SparkStreaming从Kafka读取数据,并计算WordCount 主要内容: 1.本地模式运行SparkStreaming 2.yarn-client模式运行 相关文章:1.Spark之PI本地2.Spark之WordCount集群3.SparkStreaming之读取...
  • 文章目录单机版环境搭建及相关DEMOFlumeFlume基本介绍与架构Flume安装部署案例实操Kafka环境搭建Kafka控制台的一些命令操作Java API控制KafkaFlume+Kafka配合SparkSpark 简介Spark环境搭建在Spark Shell 中运行代码...
  • Kafka总结(一):Kafka概述 Kafka总结(二):Kafka核心组件 Kafka总结(三):Kafka核心流程分析 Kafka总结(四):Kafka命令操作 Kafka总结(五):API编程详解 Kafka总结(六):Kafka Stream详解 Kafka...
  • spark stream从kafka读取数据,10秒间隔;需要缓存当天数据用于业务分析。 思路1:定义static rdd用于union每次接收到的rdd;用window窗口(窗口长1小时,滑动步长20分钟);union之后checkpoint。 但是发现在利用...
1 2 3 4 5 ... 20
收藏数 12,909
精华内容 5,163
关键字:

kafka读取 spark