kafka读取数据到spark_spark读取kafka数据到hdfs - CSDN
  • sparkStreaming读取kafka的两种方式

    千次阅读 2018-04-20 15:34:46
    第一个是:Receiver-base 基于Receiver的kafka数据消费模式就是先把数据从kafka中读取出来然后缓存内存然后再定时处理。这里要引入一个参数配置spark.... Receiver-basede kafka读取模式是基于Kafka的高阶(high-...
    第一个是:Receiver-base
    基于Receiver的kafka数据消费模式就是先把数据从kafka中读取出来然后缓存到内存然后再定时处理。这里要引入一个参数配置spark.streaming.receiver.writeAheadLog.enable 防止集群闪退偏移量没做好造成数据丢失。
    Receiver-basede kafka读取模式是基于Kafka的高阶(high-level)api来实现对卡夫卡的数据消费,再提交spark Streaming任务后 Spark集群会指定Receiver专门持续不断的异步读取kafka的数据
    读取事件间隔以及每次读取offsets范围可以由参数来配置,读取的数据保存在Receiver中,具体StoragelLevel方式由用户指定比如仅仅是内存,当driver触发Batch任务的时候,Receiver中的数据会转移到剩余的Executor中去执行。在执行完之后Receiver会相应更新Zookeeper的offsets如果要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true

    第二个是:Direct
    这种方式是延迟的,也就说当action真正触发时才会去kafka里面接收数据,因此不存在currentbuffer的概念,他把kafka每个分区里的数据映射为kafkaRdd概念

    一般用前者因为:
    提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based 那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
    监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。



    sparkstreaming读取kafka的两种方式
    原创 2017年08月27日 13:59:28
    spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存到内存中;另一种是定时批量读取。
    这两种方式分别是:
    • Receiver-base
    • Direct

    一 、Receiver-base:
    Spark官方最先提供了基于Receiver的Kafka数据消费模式。不过这种方式是先把数据从kafka中读取出来,然后缓存在内存,再定时处理。如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了,存在程序失败丢失数据的可能,后在Spark 1.2时引入一个配置参数spark.streaming.receiver.writeAheadLog.enable以规避此风险。
    Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:


    spark streaming启动过后,会选择一台excetor作为ReceiverSupervior
    1:Reciver的父级ReciverTracker分发多个job(task)到不同的executor,并启动ReciverSupervisor.

    2:ReceiverSupervior会启动对应的实例reciver(kafkareciver,TwitterReceiver),并调用onstart()

    3:kafkareciver在通过onstart()启动后就开启线程源源不断的接收数据,并交给ReceiverSupervior,通过ReceiverSupervior.store函数一条一条接收

    4:ReceiverSupervior会调用BlockGenertor.adddata填充数据。

    所有的中间数据都缓存在BlockGenertor
    1:首先BlockGenertor维护了一个缓冲区,currentbuffer,一个无限长度的arraybuffer。为了防止内存撑爆,这个currentbuffer的大小可以被限制,通过设置参数spark.streaming.reciver.maxRate,以秒为单位。currentbuffer所使用的内存不是storage(负责spark计算过程中的所有存储,包括磁盘和内存),而是珍贵的计算内存。所以currentbuffer应该被限制,防止占用过多计算内存,拖慢任务计算效率,甚至有可能拖垮Executor甚至集群。

    2:维护blockforpushing队列,它是等待被拉到到BlockManager的中转站。它是currentbuffer和BlockManager的中间环节。它里面的每一个元素其实就是一个currentbuffer。

    3:维护两个定时器,其实就是一个生产-消费模式。blockintervaltimer定时器,负责生产端,定时将currentbuffer放进blockforpushing队列。blockforpushingthread负责消费端,定时将blockforpushing里的数据转移到BlockManager。


    Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming计算引擎时,我们优先考虑采用此种方式来读取数据,具体的代码如下:


    如上述代码,函数getKafkaInputStream提供了zookeeper, topic, groupId, numReceivers, partition以及ssc,其传入函数分别对应:

    • zookeeper: ZooKeeper连接信息
    • topic: Kafka中输入的topic信息
    • groupId: consumer信息
    • numReceivers: 打算开启的receiver个数, 并用来调整并发
    • partition: Kafka中对应topic的分区数

    以上几个参数主要用来连接Kafka并读取Kafka数据。具体执行的步骤如下:

    • Kafka相关读取参数配置,其中 zookeeper.connect即传入进来的zookeeper参数;auto.offset.reset设置从topic的最新处开始读取数据;zookeeper.connection.timeout.ms指zookeepr连接超时时间,以防止网络不稳定的情况;fetch.message.max.bytes则是指单次读取数据的大小;group.id则是指定consumer。
    • 指定topic的并发数,当指定receivers个数之后,但是由于receivers个数小于topic的partition个数,所以在每个receiver上面会起相应的线程来读取不同的partition。
    • 读取Kafka数据,numReceivers的参数在此用于指定我们需要多少Executor来作为Receivers,开多个Receivers是为了提高应用吞吐量。
    • union用于将多个Receiver读取的数据关联起来。


    二、Direct:
    这种方式是延迟的。也就是说当action真正触发时才会去kafka里接数据。因此不存在currentbuffer的概念。它把kafka每个分区里的数据,映射为KafkaRdd的概念。题外话,在structured streaming中,也已经向DataFrame和DataSet统一了,弱化了RDD的概念。
    真正与kafka打交道的是KafkaCluster,全限定名: org.apache.spark.streaming.kafka.KafkaCluster。包括设备kafka各种参数,连接,获取分区,以及偏移量,设置偏移量范围等。
    Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:


    Spark Streaming提供了一些重载读取Kafka数据的方法,本文中关注两个基于Scala的方法,这在我们的应用场景中会用到,具体的方法代码如下:
    • 方法createDirectStream中,ssc是StreamingContext;kafkaParams的具体配置见Receiver-based之中的配置,与之一样;这里面需要指出的是fromOffsets ,其用来指定从什么offset处开始读取数据。



    • 方法createDirectStream中,该方法只需要3个参数,其中kafkaParams还是一样,并未有什么变化,不过其中有个配置auto.offset.reset可以用来指定是从largest或者是smallest处开始读取数据;topic是指Kafka中的topic,可以指定多个。具体提供的方法代码如下:



    在实际的应用场景中,我们会将两种方法结合起来使用,大体的方向分为两个方面:
    • 应用启动。当程序开发并上线,还未消费Kafka数据,此时从largest处读取数据,采用第二种方法;
    • 应用重启。因资源、网络等其他原因导致程序失败重启时,需要保证从上次的offsets处开始读取数据,此时就需要采用第一种方法来保证我们的场景
    总体方向上,我们采用以上方法满足我们的需要,当然具体的策略我们不在本篇中讨论,后续会有专门的文章来介绍。从largest或者是smallest处读Kafka数据代码实现如下:


    程序失败重启的逻辑代码如下:


    代码中的fromOffsets参数从外部存储获取并需要处理转换,其代码如下:


    该方法提供了从指定offsets处读取Kafka数据。如果发现读取数据异常,我们认为是offsets失败,此种情况去捕获这个异常,然后从largest处读取Kafka数据。

    Receive_base VS   Direct两种方式的优缺点:
    Direct方式具有以下方面的优势:
    1、简化并行(Simplified Parallelism)。不现需要创建以及union多输入源,Kafka topic的partition与RDD的partition一一对应
    2、高效(Efficiency)。Receiver-based保证数据零丢失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此种方式需要保存两份数据,浪费存储空间也影响效率。而Direct方式则不存在这个问题。
    3、强一致语义(Exactly-once semantics)。High-level数据由Spark Streaming消费,但是Offsets则是由Zookeeper保存。通过参数配置,可以实现at-least once消费,此种情况有重复消费数据的可能。
    4、降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
    5、降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
    6、鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

    Direct方式的缺点:
    • 提高成本。Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
    • 监控可视化。Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发。


    Receive-base优点:
    1、Kafka的high-level数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少用户的工作量以及代码量而且相对比较简单。

    Receive-base的缺点:
    1、防数据丢失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable参数,配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
    2、单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
    3、在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费
    4、提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。5、Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
    6、采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度
    展开全文
  • SparkStreaming读取kafka数据的两种方式

    千次阅读 2018-11-18 22:21:42
    Receive是使用的高级API,需要消费者连接Zookeeper来读取数据。是由Zookeeper来维护偏移量,不用我们来手动维护,这样的话就比较简单一些,减少了代码量。但是天下没有免费的午餐,它也有很多缺点: 1.导致丢失数据...

    Receive

    Receive是使用的高级API,需要消费者连接Zookeeper来读取数据。是由Zookeeper来维护偏移量,不用我们来手动维护,这样的话就比较简单一些,减少了代码量。但是天下没有免费的午餐,它也有很多缺点:
    1.导致丢失数据。它是由Executor内的Receive来拉取数据并存放在内存中,再由Driver端提交的job来处理数据。这样的话,如果底层节点出现错误,就会发生数据丢失。
    2.浪费资源。可以采取WALs方式将数据同步到高可用数据存储平台上(HDFS,S3),那么如果再发生错误,就可以从中再次读取数据。但是这样会导致同样的数据存储了两份,浪费了资源。
    3.可能会导致重复读取数据。对于公司来说,一些数据宁可丢失了一小小部分也不能重复读取,但是这种由Zookeeper来记录偏移量的方式,可能会因为Spark和Zookeeper不同步,导致一份数据读取了两次。
    4.效率低。因为是分批次执行的,它是接收数据,直到达到了设定的时间间隔,才可是进行计算。而且我们在KafkaUtils.createStream()中设定的partition数量,只会增加receive的数量,不能提高并行计算的效率,但我们可以设定不同的Group和Topic创建DStream,然后再用Union合并DStream,提高并行效率。

    官网Receive的架构图如下:
    在这里插入图片描述

    Direct

    Direct方式则采用的是低层次的API,直接连接kafka服务器上读取数据。需要我们自己去手动维护偏移量,代码量稍微大些。不过这种方式的优点有:
    1.当我们读取Topic下的数据时,它会自动对应Topic下的Partition生成相对应数量的RDD Partition,提高了计算时的并行度,提高了效率。
    2.它不需要通过WAL来维持数据的完整性。采取Direct直连方式时,当数据发生丢失,只要kafka上的数据进行了复制,就可以根据副本来进行数据重新拉取。
    3.它保证了数据只消费一次。因为我们将偏移量保存在一个地方,当我们读取数据时,从这里拿到数据的起始偏移量和读取偏移量确定读取范围,通过这些我们可以读取数据,当读取完成后会更新偏移量,这就保证了数据只消费一次。

    官方的Direct架构图如下:
    在这里插入图片描述

    总结

    在spark1.3以后的版本中,direct方式取代了receive方式,当然在公司中,使用的都是direct方式。从上面对比也能看出receive方式的效率低,而且数据完整性也很让人担忧,当我们采取direct方式时,完全不用为这两点所担忧,可以根据自己想读取的范围进行读取,底层失败后也能通过副本来进行数据恢复。
    在接下来的博客中,我会向大家分别介绍三种采用Direct存储偏移量的方式。
    1.存储于HBase
    2.存储于Zookeeper
    3.存储于MySQL
    如果我说的有哪点错误的地方,希望能帮我指正出来。

    									 summed up by JiaMingcan
    									 转载请署名:JiaMingcan
    
    展开全文
  • Spark Streaming读取Kafka数据的两种方式

    千次阅读 2019-04-20 16:49:40
    Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-streaming-kafka-0-10。在使用时应注意以下几点: spark-streaming-kafka-0-8...

    Kafka在0.80.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8spark-streaming-kafka-0-10。在使用时应注意以下几点:

    1. spark-streaming-kafka-0-8兼容Kafka 0.8.2.1及以后的版本, 从Spark 2.3.0开始,对Kafka 0.8支持已被标记为过时。

    2. spark-streaming-kafka-0-10兼容Kafka 0.10.0及以后的版本, 从Spark 2.3.0开始, 此API是稳定版。

    3. 如果Kafka版本大于等于0.10.0,且Spark版本大于等于Spark 2.3.0,应使用spark-streaming-kafka-0-10

    本文总结spark-streaming-kafka-0-8中两种读取Kafka数据的方式:createStreamcreateDirectStream


    基于Receiver方式

    POM依赖

     <dependencies>
         <!--spark-streaming-->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.11</artifactId>
             <version>2.2.2</version>
         </dependency>
    
         <!--spark-streaming-kafka-plugin-->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
             <version>2.2.2</version>
         </dependency>
     </dependencies>
    

    示例一

     // 1、Kafka配置
     // 配置zookeeper集群、消费者组
     val kafkaParams = Map(
       "zookeeper.connect" -> "localhost:2181",
       "group.id" -> groupID)
    
     // 2、topic_name与numThreads的映射
     // topic有几个partition,就写几个numThreads。
     // 每个partition对应一个单独线程从kafka取数据到Spark Streaming
     val topics = Map(topicName -> numThreads)
    
     // 3、ReceiverInputDStream
     // 注意:应先import kafka.serializer.StringDecoder再import org.apache.spark.streaming._
     val kafkaStream= KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       topics,
       StorageLevel.MEMORY_AND_DISK_SER_2)
          
    

    示例二

     // 1、topic_name与numThreads的映射
     // topic有几个partition,就写几个numThreads。
     // 每个partition对应一个单独线程从kafka取数据到Spark Streaming
     val topics = Map(topicName -> numThreads)
    
     // 2、ReceiverInputDStream
     // 底层先根据zkQuorum、groupId 构造kafkaParams,
     // 然后再调用createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel)
     val kafkaStream=KafkaUtils.createStream(
       ssc=ssc,
       zkQuorum="localhost:2181",
       groupId = groupID,
       topics,
       StorageLevel.MEMORY_AND_DISK_SER_2
     )
       
    

    特点

    1. 需要使用单独的Receiver线程来异步获取Kafka数据。

    2. Receiver底层实现中使用了Kafka高级消费者API,因此,不需要自己管理Offset,只需指定Zookeeper和消费者组GroupID,系统便会自行管理。

    3. 执行过程: Spark Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据,获取的数据先存储在Receiver中(存储方式由StorageLevel决定),后续,当Batch Job触发后,这些数据会被转移到剩下的Executor中被处理。处理完毕后,Receiver会自动更新Zookeeper中的Offset。

    4. 默认情况下,程序失败或Executor宕掉后可能会丢失数据,为避免数据丢失,可启用预写日志(Write Ahead Log,WAL)。将Receiver收到的数据再备份一份到更可靠的系统如HDFS分布式文件中,以冗余的数据来换取数据不丢失。

    5. 生产下,为保证数据完全不丢失,一般需要启用WAL。启用WAL,在数据量较大,网络不好情况下,会严重降低性能。


    基于Direct(No Receiver)方式

    POM依赖

     <dependencies>
         <!--spark-streaming-->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.11</artifactId>
             <version>2.3.1</version>
         </dependency>
    
         <!--spark-streaming-kafka-plugin-->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
             <version>2.3.1</version>
         </dependency>
     </dependencies>
    
    

    示例一

     // 1、Kafka配置
     // auto.offset.reset=latest 无提交的offset时,从最新的开始消费
     // enable.auto.commit=false 禁用后台自动提交offset,自己手动管理
     val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> "localhost:9092",
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "auto.offset.reset" -> "latest",
       "enable.auto.commit" -> (false: java.lang.Boolean),
       "group.id" -> groupID)
    
     // 2、DirectKafkaInputDStream
     // LocationStrategies:本地策略。为提升性能,可指定Kafka Topic Partition的消费者所在的Executor。
     // LocationStrategies.PreferConsistent:一致性策略。一般情况下用这个策略就OK。将分区尽可能分配给所有可用Executor。
     // LocationStrategies.PreferBrokers:特殊情况,如果Executor和Kafka Broker在同一主机,则可使用此策略。
     // LocationStrategies.PreferFixed:特殊情况,当Kafka Topic Partition负荷倾斜,可用此策略,手动指定Executor来消费特定的Partition.
     // ConsumerStrategies:消费策略。
     // ConsumerStrategies.Subscribe/SubscribePattern:可订阅一类Topic,且当新Topic加入时,会自动订阅。一般情况下,用这个就OK。
     // ConsumerStrategies.Assign:可指定要消费的Topic-Partition,以及从指定Offset开始消费。
     val kafkaStream=KafkaUtils.createDirectStream[String,String](
       ssc,
       LocationStrategies.PreferConsistent,
       ConsumerStrategies.Subscribe[String,String](List(topicName),kafkaParams)
     )
    
    

    示例二

     // 1、Kafka配置
     // auto.offset.reset=latest 无提交的offset时,从最新的开始消费
     // enable.auto.commit=false 禁用后台自动提交offset,自己手动管理
     val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> "localhost:9092",
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "auto.offset.reset" -> "latest",
       "enable.auto.commit" -> (false: java.lang.Boolean),
       "group.id" -> groupID)
    
     // 2、DirectKafkaInputDStream
     // LocationStrategies.PreferConsistent:一致性策略。
     // ConsumerStrategies.Assign:从指定Topic-Partition的Offset开始消费。
     val initOffset=Map(new TopicPartition(topicName,0)->10L)
     val kafkaStream=KafkaUtils.createDirectStream[String,String](
       ssc,
       LocationStrategies.PreferConsistent,
       ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
     )
    
    

    特点

    1. 不需要使用单独的Receiver线程从Kafka获取数据。

    2. 使用Kafka简单消费者API,不需要ZooKeeper参与,直接从Kafka Broker获取数据。

    3. 执行过程:Spark Streaming Batch Job触发时,Driver端确定要读取的Topic-Partition的OffsetRange,然后由Executor并行从Kafka各Partition读取数据并计算。

    4. 为保证整个应用EOS, Offset管理一般需要借助外部存储实现。如Mysql、HBase等。

    5. 由于不需要WAL,且Spark Streaming会创建和Kafka Topic Partition一样多的RDD Partition,且一一对应,这样,就可以并行读取,大大提高了性能。

    6. Spark Streaming应用启动后,自己通过内部currentOffsets变量跟踪Offset,避免了基于Receiver的方式中Spark Streaming和Zookeeper中的Offset不一致问题。

    展开全文
  • 首先确定: 自己的idea开发环境和spark集群的开发环境一致,jdk, scala 版本都一直,而且在idea中测试可以消费到kafka集群中的数据: 集群 spark为1.6.1,本地idea也是spark1.6依赖。并且将sparkstream2kafka程序...

    首先确定: 自己的idea开发环境和spark集群的开发环境一致,jdk, scala 版本都一直,而且在idea中测试可以消费到kafka集群中的数据:
    集群 spark为1.6.1,本地idea也是spark1.6依赖。并且将sparkstream2kafka程序发布到spark集群也是正常运行,但是获取不到kafka中的数据。后来经过自己大量的测试终于找到了答案(百度了好久没有):

    代码:

    package spark_api.kafka
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import spark_api.kafka.datamodel.TestModel
    import spark_api.kafka.serializer.{StringDecoder, TestModelSerializer}
    
    object SparkStreaming2kafkaConsumerCustomTestModeByCreateDirectStream {
      var status: Boolean = false
      def main(args: Array[String]): Unit = {
        var Array(a,b) = args
        val conf = new SparkConf().setAppName(a).setMaster(b)
        val scc: StreamingContext = new StreamingContext(conf,Seconds(2))
        val sc = scc.sparkContext
        sc.hadoopConfiguration.addResource("core-site.xml")
        sc.hadoopConfiguration.addResource("hdfs-site.xml")
    
        val kafkaParam = Map("zookeeper.connect"->"node1:2181,node2:2181,node3:2181,node4:2181","group.id"->a,"zookeeper.connection.timeout.ms" -> "100000"
        ,"metadata.broker.list"->"node1:9092,node2:9092,node3:9092,node4:9092")
        val topics=Set("TestModel")
        val kafkaRdd = KafkaUtils.createDirectStream[String,TestModel,StringDecoder,TestModelSerializer](scc,kafkaParam,topics)
    
        kafkaRdd.foreachRDD(rdd=>{
          println("准备解析数据。。。")
          consumer2KakfaJQ(rdd) //此方法在 本地 和spark集群都可以正常获得kafka数据,
          //consumerKafka(rdd)//在 本地 可以获得kafka数据,在spark集群则无效
    
        })
        kafkaRdd.start()
        scc.start()
        scc.awaitTermination()
    
      }
    
      private def consumer2KakfaJQ(rdd: RDD[(String, TestModel)]) = {
        val len = rdd.collect().length
        println("数据为: " + len)
        if (len > 0) {
          rdd.collect().foreach(yz => {   //集群可以拿取kafka数据
            println("查看数据" + yz._1 + "  :  " + yz._2)
          })
        }//
      }
    
      private def consumerKafka(rdd: RDD[(String, TestModel)]) = {
        rdd.foreach(yz => {  //集群拿不到kafka数据
          println("开始解析数据:数据为: " + yz._2.toString);
          status = true
          println("当前线程ID: " + Thread.currentThread().getId)
        }
        ) //
        println("当前线程ID: " + Thread.currentThread().getId)
        println(status)
        if (status) {
          println("数据解析完毕。。。")
          status = false
        } else {
          println("当前获取不到数据!!!")
        }
      }
    }

    总结:原来是rdd.foreach 导致在spark集群获取不到kafka中的数据,需要换成 rdd.collect().foreach,就可以获得数据,具体原因上明确,还请大神告知。

    展开全文
  • Spark Streaming+Kafka spark 写入 kafka

    千次阅读 2018-09-14 17:47:49
    Spark streaming接收Kafka数据 基于Receiver的方式 直接读取方式 Sparkkafka中写入数据 Spark streaming+Kafka应用 Spark streaming+Kafka调优 合理的批处理时间(batchDuration) 合理的Kafka拉取量...
  • sparkstreaming读取kafka的两种方式

    万次阅读 2017-08-27 15:15:00
    spark streaming提供了两种获取方式,一种是同storm一样,实时读取缓存内存中;另一种是定时批量读取。 这两种方式分别是: Receiver-base Direct 一 、Receiver-base: Spark官方最先提供了基于Receiver...
  •  1、对于银行应用日志,一个系统建一个topic,每台主机对应一个partition,规则为,flume采集时,同一个应用,数据同一个topic,一个主机,送一个partition,这样做是为了同一个日志的数据在一个partition中,...
  • SparkStreaming 读取kafka数据(spark2.3.2)

    千次阅读 2019-04-29 16:11:48
    流处理一般都会涉及到kafka,所以用SparkStreaming读取kafka数据是流处理的必备技能之一。 1.先在pom.xml文件中添加 ${spark.version} 即你的spark的版本,我sparkspark 2.3.2。我kafkakafka_2.11-0.10.2.2 .....
  • 使用kafka的高层次Consumer api来实现的,Receiver从kafka中获取的数据都是存储在spark executor的内存中,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而...
  • 简单了解一下Kafka:是一种高...当SparkStreaming与Kafka做集成的时候Kafka成了Streaming的高级数据源,由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来...
  • spark stream从kafka读取数据,10秒间隔;需要缓存当天数据用于业务分析。 思路1:定义static rdd用于union每次接收的rdd;用window窗口(窗口长1小时,滑动步长20分钟);union之后checkpoint。 但是发现在利用...
  • java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 解决方法:tranformation最后一步产生的那个RDD必须有相应Action操作,例如massages.print()等
  • 文章目录一、 题目题目和数据二、 pom依赖三、建表语句四、 连接kafka配置类五、 自定义分区类六、 读取数据并发送数据七、 消费数据,把数据存储mysql 一、 题目 题目和数据 链接: ...
  • 代码: /** * * @description: * @author: wanjintao ...import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import o
  • SparkStreaming从kafka读取文件流时(Java),默认是utf-8的,如果源文件的编码不是utf-8,那就会出现乱码现象,但是kafka的官网参数里没有关于编码的参数,不过kafka的源码里面是有的。源码如下: ...
  •   spark Streaming从kafka读取数据的方式分为Receiver和Direct两种方式 Receiver方式 ​   Receiver是使用kafka的高层次Consumer API来实现的,Receiver从kafka中获取数据存储在Spark Executor的内存之中,...
  • 首先说明环境: ...kafka 2.4.0 2.11.12 2.2.1 废话不多说直接上代码 #首先是依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11&l...
  • spark读取kafka数据写入hbase

    千次阅读 2018-05-18 16:22:11
    package com.prince.demo.test import java.util.UUID import com.typesafe.config.{Config, ...import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put impo...
  • spark-streaming-kafka-demo 使用Springboot框架,Sparkstreaming监听Kafka消息,Redis记录已读Kafka偏移量,Spark统计单词出现次数,最后写入Hive表。 代码参考:...
  • receiver从Kafka中获取的数据都存储在Spark Executor的内存中,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零...
1 2 3 4 5 ... 20
收藏数 13,186
精华内容 5,274
关键字:

kafka读取数据到spark