kafka与spark原理_spark2 kafka运行原理 - CSDN
  • 1:SparkSteaming基于kafka的方式,主要有俩种,即Receiver和Derict,基于Receiver的方式,是sparkStreaming给我们提供了kafka访问的高层api的封装,而基于Direct的方式,就是直接访问,在sparkSteaming中直接去操作...

    有兴趣想学习国内整套Spark+Spark Streaming+Machine learning最顶级课程的,可加我qq  471186150。共享视频,性价比超高!

    1:SparkSteaming基于kafka获取数据的方式,主要有俩种,即Receiver和Derict,基于Receiver的方式,是sparkStreaming给我们提供了kafka访问的高层api的封装,而基于Direct的方式,就是直接访问,在sparkSteaming中直接去操作kafka中的数据,不需要前面的高层api的封装。而Direct的方式,可以对kafka进行更好的控制!同时性能也更好。

    2:实际上做kafka receiver的时候,通过receiver来获取数据,这个时候,kafka receiver是使用的kafka高层次的comsumer api来实现的。receiver会从kafka中获取数据,然后把它存储到我们具体的Executor内存中。然后spark streaming也就是driver中,会根据这获取到的数据,启动job去处理。

    3:注意事项:

    1)在通过kafka receiver去获取kafka的数据,在正在获取数据的过程中,这台机器有可能崩溃了。如果来不及做备份,数据就会丢失,切换到另外一台机器上,也没有相关数据。这时候,为了数据安全,采用WAL的方式。write  ahead log,预写日志的方式会同步的将接收到的kafka数据,写入到分布式文件系统中。但是预写日志的方式消耗时间,所以存储时建议Memory_and_Disc,不要2.如果是写到hdfs,会自动做副本。如果是写到本地,这其实有个风险,就是如果这台机器崩溃了,再想恢复过来,这个是需要时间的。

    2):我们的kafka receiver接收数据的时候,通过线程或者多线程的方式,kafka中的topic是以partition的方式存在的。sparkstreaming中的kafka receiver接收kafka中topic中的数据,也是通过线程并发的方式去获取的不同的partition,例如用五条线程同时去读取kafka中的topics中的不同的partition数据,这时你这个读取数据的并发线程数,和RDD实际处理数据的并发线程数是没任何关系的。因为获取数据时都还没产生RDD呢。RDD是Driver端决定产生RDD的。

    3)默认情况下,一个Executor中是不是只有一个receiver去接收kafka中的数据。那能不能多找一些Executor去更高的并发度,就是使用更多的机器去接收数据,当然可以,基于kafa的api去创建更多的Dstream就可以了。很多的Dstream接收kafka不同topics中的不同的数据,最后你计算的时候,再把他优联就行了。其实这是非常灵活的,因为可以自由的组合。




    kafka + spark streaming 集群

     

    前提:

    spark 安装成功,spark 1.6.0

    zookeeper 安装成功 

    kafka 安装成功

    启动集群和zookeeperkafka

    步骤:

    1:创建topictest

    kafka-topics.sh --create --zookeeper master1:2181,work1:2181,work2:2181 --replication-factor 3 --partitions 1 --topic test

    worker1启动kafka 生产者:

    root@worker1:/usr/local/kafka_2.10-0.9.0.1# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

     

     

    worker2中启动消费者:

    root@worker2:/usr/local/kafka_2.10-0.9.0.1# bin/kafka-console-consumer.sh --zookeeper master1:2181 --topic test

     

    生产者生产的消息,消费者可以消费到。说明kafka集群没问题。进入下一步。

     

    master中启动spark-shell

    ./spark-shell --master  local[2] --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0

    笔者用的spark 是 1.6.0 ,读者根据自己版本调整。

    shell中的逻辑代码(wordcount,启动完成,把下面代码直接丢进去:

     

    import org.apache.spark.SparkConf

    import org.apache.spark.streaming._

    import org.apache.spark.streaming.kafka._

    import org.apache.spark.streaming.kafka.KafkaUtils

    import org.apache.spark.streaming.{Durations, StreamingContext}

     

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    // 第二个参数是zk集群信息,zkclient host:port,生动的说明了kafka读取数据获取offset

    //等元数据等信息,是从zk里面获取的。所以要连zk

    // 第三个参数是Consumer groupID,随便写的

    //4个参数是消费的topic,以及并发读取topicPartition的线程数,这个Map指定了你

    //要消费什么topic,以及怎么消费topic

    KafkaUtils.createStream(ssc, "master:2181,worker1:2181,worker2:2181", "StreamingWordCountSelfKafkaScala", Map("test" -> 1)).map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

     

    生产者再生产消息:

     

     

    spark streaming的反应:

     

     

    返回worker2查看消费者

     

    可见,groupId不一样,相互之间没有互斥。

     

    上述是使用 createStream 方式链接kafka

     

    还有更高效的方式,请使用createDirectStream

     

    参考:

    http://spark.apache.org/docs/latest/streaming-kafka-integration.html





    展开全文
  • kafkaspark总结

    2019-01-08 09:46:36
    kafkaspark总结 本文涉及到的技术版本号: scala 2.11.8 kafka1.1.0 spark2.3.1 kafka简介 kafka是一个分布式流平台,流媒体平台有三个功能 发布和订阅记录流 以容错的持久化的方式存储记录流 发生数据时对流...

    kafka和spark总结

    本文涉及到的技术版本号:

    • scala 2.11.8
    • kafka1.1.0
    • spark2.3.1

    kafka简介

    kafka是一个分布式流平台,流媒体平台有三个功能

    • 发布和订阅记录流
    • 以容错的持久化的方式存储记录流
    • 发生数据时对流进行处理

    kafka通常用于两大类应用

    • 构件在系统或应用程序之间可靠获取数据的实时数据管道
    • 构件转换或响应数据流的实时流应用程序

    kafka的几个概念

    • kafka运行在集群上,或一个或多个能跨越数据中心的服务器上
    • kafka集群上存储流记录的称为topic
    • kafka的topic里,每一条记录包括一个key、一个value和一个时间戳timestamp

    kafka有四个核心API

    • Producer API

      生产者api允许应用程序发布一个记录流到一个或多个kafka的topic

    • Consumer API

      消费者api允许应用程序订阅一个或多个topic并且接受处理传送给消费者的数据流

    • Streams API

      流api允许应用程序作为一个流处理器,从一个或多个输入topic中消费输入流,并生产一个输出流到一个或多个输出topic中

    • Connector API

      连接器api允许构建和运行中的kafka的topic连接到现有的应用程序或数据系统中重用生产者或消费者。例如关系数据库的连接器可以捕获对表的每一个更改操作

    kafka中的客户端和服务端之间是通过简单、高性能的语言无关的TCP协议完成的,该协议已经版本化并且高版本向低版本向后兼容。

    topics

    topic为kafka为记录流提供的核心抽象,类似于数据通道,并且topic是发布记录和订阅的核心。

    kafka的topic是多用户的,一个topic可以有0个、1个或多个消费者订阅记录

    对于每一个topic,kafka集群都维护了一个如下所示的分区记录:
    topic

    其中每一个分区都是有序的不可变的记录序列,并且新数据是不断的追加到结构化的记录中。分区中的记录每个都分配了一个offset作为ID,它唯一标识分区中的每个记录。

    kafka集群默认是保存所有记录,无论是否被消费过,但是可以通过配置保留时间策略。例如如果设置数据保留策略为两天,则超过两天的数据将被丢弃释放空间。kafka的性能受数据大小影响不大,因此长时间的存储数据并不是太大的问题。

    其中,kafka 的消费者唯一对topic中的每一个分区都可以设置偏移量offset,标识当前消费者从哪个分区的哪一条数据开始消费,消费者通过对偏移量的设置可以灵活的对topic进行消费。如下图
    offset

    消费者控制自己的偏移量就意味着kafka的消费者是轻量的,消费者之间互不影响。

    topic记录中的分区有多种用途,首先它允许topic扩展到超出单台服务器适合的大小。每个分区都需要有适合托管分区的服务器,而topic可以有很多分区,因此一个topic可以处理任意数量的数据。另外这些分区作为并行的单位,效率很高,这也是相当重要的一点。

    分配

    记录分区分布在kafka集群服务器上,每个服务器共同处理数据并请求分区的共享。每个分区都可以在可用服务器的数量上进行复制,以此实现容错。

    每一个分区都会有一个服务器作为leader,0个或多个服务器作为followers。leader处理分区的所有读取和写入请求,而follower被动的复制leader。如果leader出错,则其中一个follower会自动称为新的leader。集群中的每个服务器都充当某分区的leader和其他分区的follower,因此能在集群中达到负载均衡。

    生产者

    生产者将数据发布到所选择的分区,生产者在发布数据是需要选择将数据发送到哪个分区,分配分区可以通过循环方式完成也可以根据语义分区的功能实现。

    消费者

    消费者使用消费者组(consumer group)标记自己。发布到topic的每个记录会被发送到每个消费者组中的一个消费者实例。所以当一个消费者组中有多个消费者实例,则记录将在该消费者组中的所有消费者之间进行有效的负载均衡。

    topic接受的每一条记录都会被广播发送到每个消费者组中。示意图如下:

    消费者
    上图有两个机器的kafka集群,某topic有四个分区p0-p3,有两个消费者组A/B订阅该topic,消费者组A有两个消费者实例,消费者组B有四个消费者实例。

    kafka中实现消费的方式是通过在消费者实例上划分分区实现,保证实例在任何时间点都是公平分配的。消费者组中的成员划分分区是由kafka协议进行动态处理。如果新实例加入该组,那新加入的实例会从改组的成员中接管一些分区。如果消费者组中的某个实例死亡,则它所划分的分区分配给该消费组的其他实例。

    kafka只能提供一个分区内的记录的顺序,但是不保证多个分区的记录顺序。如果用户想保证topic中的顺序,则使用一个分区的topic即可,但这样就意味着每个消费者组中只能有一个消费者实例。

    kafka提供的保证

    • 同一个生产者实例发送到特定topic的特定分区的两条数据M1和M2,并且M1发送早于M2,则M1将拥有更低的偏移量,即可以保证插入顺序。
    • 消费者可以按照记录存储的顺序消费记录
    • 对于复制因子为N的topic,最多可以容忍N-1个服务器故障,而不会丢失提交到topic中的记录

    kafka常用命令

    • 启动Zookeeper server

      bin/zookeeper-server-start.sh config/zookeeper.properties &
      
    • 启动Kafka server

      nohup bin/kafka-server-start.sh config/server.properties &
      
    • 停止Kafka server

      bin/kafka-server-stop.sh
      
    • 停止Zookeeper server

      bin/zookeeper-server-stop.sh
      
    • producer

      bin/kafka-console-producer.sh --broker-list 192.168.20.133:9092 --topic realtime
      
    • consumer

      bin/kafka-console-consumer.sh --zookeeper 192.168.20.133:2181 --topic realtime --from-beginning
      
    • 查看所有topic

      bin/kafka-topics.sh --list --zookeeper localhost:2181
      
    • 创建一个topic

      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic realtime0103
      
    • 查看topic详情

      bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic realtime0103
      
    • 删除topic

      bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic realtime0103
      

    java操作kafka

    引入jar包

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
      </dependency>
    

    Producer

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ProducerDemo {
    
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.20.133:9092,192.168.20.134:9092,192.168.20.135:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        String topic = "realtime0103";
    
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String value = "{'name':'1','value':1}" ; 
        
        //设定分区规则,分区为0,1,2
        int partation = KafkaProducerClient.count.get() % 3;
    
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,partation, "key1",value );
          
        producer.send(record).get();
      }
    }
    

    Customer

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    
    public class CustomerDemo {
    
      private static KafkaConsumer<String, String> consumer;
      private static String inputTopic;
    
      @SuppressWarnings({ "unchecked", "rawtypes" })
      public static void main(String[] args) {
        String groupId = "group1";
        inputTopic = "realtime0103";
        String brokers = "192.168.20.133:9092";
    
        consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId));
        
        //分配topic 某分区的offset
        TopicPartition part0 = new TopicPartition(inputTopic, 0);
        TopicPartition part1 = new TopicPartition(inputTopic, 1);
        TopicPartition part2 = new TopicPartition(inputTopic, 2);
        OffsetAndMetadata offset0 = new OffsetAndMetadata(1);
        OffsetAndMetadata offset1 = new OffsetAndMetadata(2);
        OffsetAndMetadata offset2 = new OffsetAndMetadata(3);
        Map<TopicPartition,OffsetAndMetadata> offsetMap = new HashMap<>();
        offsetMap.put(part0,offset0);
        offsetMap.put(part1,offset1);
        offsetMap.put(part2,offset2);
        //提交offset信息
        consumer.commitSync(offsetMap);
        
        start();
    
      }
    
      private static Properties createConsumerConfig(String brokers, String groupId) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put("group.id", groupId);
            props.put("auto.commit.enable", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            return props;
        }
    
      private static void start() {
        consumer.subscribe(Collections.singletonList(inputTopic));
    
            System.out.println("Reading topic:" + inputTopic);
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record: records) {
                    String ip = record.key();
                    String event = record.value();
                    System.out.println(event);
                }
                consumer.commitSync();
            }
    
      }
    }
    

    spark操作kafka

    IDEA配置搭建spark scala开发环境(Windows)

    • 安装jdk8并配置环境变量
    • 安装scala2.11并配置环境变量(本文安装2.11.8)
    • 安装IDEA
    • IDEA安装SBT和Scala插件
    • File->New->Project 创建新项目,选择Scala->sbt->next

    新建项目

    • 选择项目名称、位置、Java版本号、sbt版本和Scala版本,Finish

    选择版本

    • 打开build.sbt,添加相关依赖

      libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
      libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"
      libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"
      libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
      
    • 刷新sbt项目,下载依赖:

    刷新

    • 编写业务代码,可以使用以下的使用spark Structured Streaming连接kafka处理流部分代码
    • 设置打包规则
      • File->Project Sturcture->Artifacts 点击绿色加号设置打jar包规则

        Artifacts

      • 选择Module和Main class,JAR file from libraries选择copy to output…即不将外部jar打包到jar文件中

        Artifacts

      • 导航栏 Build->Build Artifacts ,打包成jar,将jar包上传到spark集群

    • 运行程序:
      • 以上配置打出jar包包含项目jar包和多个依赖jar包,提交spark作业时,可以使用–jars逗号隔开配置引用多个外部jar

        cd $SPARK_HOME
        ./bin/spark-submit --master spark://192.168.20.133:7077 --jars /root/interface-annotations-1.4.0.jar,/root/async-1.4.1.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class com.xuchg.app.Application /root/spark-kafka.jar
        

    使用spark Structured Streaming连接kafka处理流

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    object Main extends App {
    
      //spark读取kafka示例
      Logger.getLogger("org").setLevel(Level.ERROR)
      val kafkaAddress = "192.168.20.133:9092"
      val zookeeper = "192.168.20.133:2181"
      val topic = "realtime0103"
      val topicOffset = "{\""+topic+"\":{\"0\":0,\"1\":0,\"2\":0}}"
      val sparkSession = SparkSession
        .builder()
        .config(new SparkConf()
          .setMaster("local[2]")
          .set("spark.streaming.stopGracefullyOnShutdown","true")//设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
          .set("spark.submit.deployMode","cluster")
          .set("spark.executor.memory","4g")//worker内存
          .set("spark.driver.memory","4g")
          .set("spark.cores.max","2")//设置最大核心数
        )
        .appName(getClass.getName)
        .getOrCreate()
    
      def createStreamDF(spark:SparkSession):DataFrame = {
        import spark.implicits._
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaAddress)
          .option("zookeeper.connect", zookeeper)
          .option("subscribe", topic)
          .option("startingOffsets", topicOffset)
          .option("enable.auto.commit", "false")
          .option("failOnDataLoss", false)
          .option("includeTimestamp", true)
          .load()
        df
      }
    
      var df = createStreamDF(sparkSession)
    
      val query = df.writeStream
        .format("console")
        .start()
    
      query.awaitTermination()
    }
    

    监控spark和kafka

    此处根据实际应用情况使用两种监控方法,解决两个不同问题

    • 解决spark启动和停止处理的动作,例如监听spark停止时处理或记录完所有计算

      //定义监听类继承SparkListener,并重写相关方法
      import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
      class AppListener extends SparkListener{
        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
          //监控spark停止方法,可以处理spark结束的动作
          println("application 关闭")
        }
      
        override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
          println("application 启动")
        }
      }
      
      //在主类中注册监听类
      sparkSession.sparkContext.addSparkListener(new AppListener)
      
    • 监控spark的查询,例如spark读取kafka流的偏移量offset,可以监听并记录下来,下次启动spark可以直接从该偏移量offset进行消费,不会消费相同的数据

      sparkSession.streams.addListener(new StreamingQueryListener() {
        override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
          println("Query started: " + queryStarted.id)
        }
        override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
          //服务出现问题而停止
          println("Query terminated: " + queryTerminated.id)
        }
        override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
          var progress = queryProgress.progress
          var sources = progress.sources
          if(sources.length>0){
            var a = 0
            for(a <- 0 to sources.length - 1){
              var offsetStr = sources.apply(a).startOffset
              if(offsetStr!=null){
                println("检测offset是否变化 -- " + offsetStr)
              }
            }
          }
        }
      })
      

      运行结果如下:可以看到对topic的每个分区的偏移量都可以获取到
      offset

    管理和停止spark程序

    在spark集群主节点配置并使用spark history server可以实现对spark作业进行管理和监控

    • 配置spark history server

      • 修改$SPARK_HOME/conf/spark-defaults.conf,如果不存在则从模板复制一份

        cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
        vi $SPARK_HOME/conf/spark-defaults.conf
        
      • 修改配置文件如下:

        spark.eventLog.enabled           true
        spark.eventLog.dir               hdfs://192.168.20.133:9000/spark-history
        spark.eventLog.compress          true
        # 注意ip地址需要根据情况更改,9000为hdfs默认端口号,如hdfs端口号不是9000则需要更改
        
      • 创建hdfs目录

        $HADOOP_HOME/bin/hdfs dfs -mkdir /spark-history
        
      • 配置$SPARK_HOME/conf/spark-env.sh文件:

        • 如果不存在则从模板复制:

          cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
          
        • 编辑$SPARK_HOME/conf/spark-env.sh,结尾添加:

          export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://192.168.20.133:9000/spark-history"
          # 18080为history server的访问端口号,192.168.20.133:9000为hdfs的ip和端口,根据实际情况修改
          
      • 打开防火墙18080端口

      • 执行命令打开history server服务

        $SPARK_HOME/sbin/start-history-server.sh
        
    • 代码中sparkSession创建时添加配置:

      //设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
      new SparkConf().set("spark.streaming.stopGracefullyOnShutdown","true")
      
    • 使用shell关掉某一个正在运行的spark作业:

      • spark作业关闭原理
        每一个spark作业都由一个appId唯一标识,而每一个作业包含多个Executors执行器,这些Executors中包含1个或几个id为driver的驱动程序,它是执行开发程序中的 main方法的进程。如果驱动程序停止,则当前spark作业就结束了。

      • spark关闭过程

        • 获取某appId的spark作业的driver节点的ip和端口,可以通过spark history server提供的页面或提供的api进行获取。此处介绍页面获取,后面会介绍api获取

          finddriver

        • 根据获取的driver的端口号对spark作业发送停止命令,当然有时ctrl+c和在监控页面上都是可以直接停止的,但此处只提用shell停止,应用场景更广泛。

          centod7:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|awk -F= '{print $2}'|xargs kill -15
          centos6:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|xargs kill -15
          

          注意:centos6和centos7稍有不同,而且此处使用kill -15而不是kill -9,kill -9会直接杀死进程,可能会导致丢失数据。而kill -15是发送停止命令,不会直接杀死进程。

      • 通过以上内容可以实现在spark集群主节点部署web服务接收并远程调用执行shell语句来达到远程动态启动(可传参)和停止spark作业,大体如下:

        • 远程调用接口传参启动spark作业,此时记录下spark运行的appid

        • 通过调用spark history server提供的REST Api获取当前作业driver进程的端口号:

          http://192.168.20.133:18080/api/v1/applications/{appId}/allexecutors
          

          driver

        • 通过获取到的端口号可以向spark集群主节点发送停止命令到该端口进程即可

    示例项目地址:github
    kafka官网
    spark官网

    展开全文
  • Spark是一个用来是实现快速而通用的集群计算的平台。Spark是UC Berkeley AMP Lab(加州大学伯克利分销的AMP实验室)所开源的类MapReduce的通用并行框架, 现在已经是Apache中的一个顶级项目。Spark使用Scala语言开发...

    欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

    欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-and-spark-integration-1-spark-introduction/


    Spark是一个用来是实现快速而通用的集群计算的平台。Spark是UC Berkeley AMP Lab(加州大学伯克利分销的AMP实验室)所开源的类MapReduce的通用并行框架, 现在已经是Apache中的一个顶级项目。Spark使用Scala语言开发,支持Scala、Java、Python、R语言相关的API,运行于JVM之上。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性。Spark适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代计算、交互式查询、流处理等。

    如下图所示,Spark生态圈即BDAS(伯克利数据分析栈)所包含的组件有Spark Core、Spark Streaming、Spark SQL、MLlib和GraphX,它们都是由AMP实验室提供,能够无缝地继承,并提供一站式解决平台。

    Spark Core实现了Spark的基本功能,包含任务调度、内存管理、错误恢复以及与存储系统交互等模块。Spark Streaming属于Spark Core API的扩展,支持实时数据流的可扩展、高吞吐、容错的流处理。Spark SQL是Spark的一个结构化数据处理模块,提供了DataFrame/Dataset的编程抽象,可以看作是一个分布式查询引擎。从Spark2.0开始又引入了Structured Streaming,它是建立在Spark SQL之上的可扩展和高容错的流处理引擎。MLlib是Spark提供的机器学习功能的程序库,它提供了很多种机器学习算法,包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。GraphX是用来操作图的程序库,可以进行并行的图计算。

    Spark具有很强的适应性,能够读取HDFS、Cassandra、HBase等为持久层读写原生数据,资源管理采用Mesos、YARN、Kubernetes等集群资源管理模式,或者Spark自带的独立运行模式以及本地运行模式。

    Spark具有一个庞大的生态圈,付诸于生产时还需要考虑参数调配、容错处理、监控、性能优化、存储、调度、部署等多个环节,涉及到具体的方方面面,仅以一个小系列的内容是无法穷尽的。本系列的主旨也并非简单的讲解Spark,而是要讲解Kafka与Spark之间的集成细节。本系列的文章会以尽量少的篇幅让读者对Spark能够有一个初步的了解,并且会以一个合适的篇幅来讲解Kafka与Spark Streaming的集成以及Kafka与Structured Streaming的集成。

    欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-and-spark-integration-1-spark-introduction/


    欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


    展开全文
  • 今天也要努力学习 ...SparkStreaming+Kafka 1.SpringStreaming+Kafka 接受数据和发送数据 (1)SparkStreaming 接受kafka方式 基于Received的方式 基于DirectKafkaStreaming DirectKafkaStreaming...

                                               今天也要努力学习

    SparkStreaming+Kafka

    1.SpringStreaming+Kafka 接受数据和发送数据

    (1)SparkStreaming 接受kafka方式

    • 基于Received的方式
      基于Receiverd方式获取数据
      这里写图片描述
    • 基于DirectKafkaStreaming
      KafkaStream-Recevied方式

    DirectKafkaStreaming 相比较 ReceiverKafkaStreaming

    • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
    • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
    • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

    (2)Spark 发送数据至Kafka中

    一般处理方式 : 在RDD.forpartition进行操作

    input.foreachRDD(rdd =>
      // 不能在这里创建KafkaProducer
      rdd.foreachPartition(partition =>
        partition.foreach{
          case x:String=>{
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            println(x)
            val producer = new KafkaProducer[String,String](props)
            val message=new ProducerRecord[String, String]("output",null,x)
            producer.send(message)
          }
        }
      )
    ) 
    

    此方式的缺点在于每次foreach操作都需要重新创建一次kafkaProduce 主要花费时间都在 创建连接的时候.
    基于此我们以以下方式进行操作

    • 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:
     
        import java.util.concurrent.Future
        import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
        class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
          /* This is the key idea that allows us to work around running into
             NotSerializableExceptions. */
          lazy val producer = createProducer()
          def send(topic: String, key: K, value: V): Future[RecordMetadata] =
            producer.send(new ProducerRecord[K, V](topic, key, value))
          def send(topic: String, value: V): Future[RecordMetadata] =
            producer.send(new ProducerRecord[K, V](topic, value))
        }
        object KafkaSink {
          import scala.collection.JavaConversions._
          def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
            val createProducerFunc = () => {
              val producer = new KafkaProducer[K, V](config)
              sys.addShutdownHook {
                // Ensure that, on executor JVM shutdown, the Kafka producer sends
                // any buffered messages to Kafka before shutting down.
                producer.close()
              }
              producer
            }
            new KafkaSink(createProducerFunc)
          }
          def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
        }
    
    • 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:
    // 广播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", Conf.brokers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      log.warn("kafka producer init done!")
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }
    
    • 这样我们就能在每个executor中愉快的将数据输入到kafka当中:
    //输出到kafka
    segmentedStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreach(record => {
          kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
          // do something else
        })
      }
    })
    

    2.Spark streaming+Kafka调优

    2.1 批处理时间设置

    参数设置:

    2.2 合理的Kafka拉取量

    参数设置: spark.streaming.kafka.maxRatePerPartition

    2.3 缓存反复使用的Dstream(RDD)

    DStream.cache()

    2.4 设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
    
    • 1

    2.5 设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    2.6设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
    在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    2.7使用高性能的算子

    • 使用reduceByKey/aggregateByKey替代groupByKey
    • 使用mapPartitions替代普通map
    • 使用foreachPartitions替代foreach
    • 使用filter之后进行coalesce操作
    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    展开全文
  • Spark与Kafka整合原理

    2019-01-08 19:55:40
    sparkkafka整合有2种方式 1、receiver 顾名思义:就是有一个线程负责获取数据,这个线程叫receiver线程 解释: 1、Spark集群中的某个executor中有一个receiver线程,这个线程负责从kafka中获取数据  注意:...
  • 原 [Kafka与Spark集成系列二] Spark的安装及简单应用https://blog.csdn.net/u013256816/article/details/82082019版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 ...
  • 环境 kafka_2.11-0.10.0.1 hadoop-2.6.0-cdh5.7.0 spark-2.2.0-bin-2.6.0-cdh5.7.0 Receiver方式 ...构造函数中的numThreads参数,对应提高sparkstreaming的并行度并没有关系,提高只有kafka的分区...
  • 截止撰稿之时,Spark最新版本为2.3.1,如下图所示,我们可以从官网中选择spark-2.3.1-bin-hadoop2.7.tgz进行下载。 在下载过后,笔者是先将安装包拷贝至/opt目录下,然后执行相应的解压缩动作,示例如下: ...
  • 大数据实时流式数据处理是大数据应用中最为常见的场景,我们的生活也息息相关,以手机流量实时统计来说,它总是能够实时的统计出用户的使用的流量,在第一时间通知用户流量的使用情况,并且最为人性化的为用户提供...
  • 微软的ASG (应用服务集团)包含Bing,、Office,、Skype。每天产生多达5 PB以上数据,如何构建一个高扩展性的data audit服务来保证这样量级的数据完整性和实时性非常具有挑战性。本文将介绍微软ASG大数据团队如何利用...
  • 原 [Kafka与Spark集成系列一] Spark入门https://blog.csdn.net/u013256816/article/details/82081946版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 ...
  • Kafka整合Spark Streaming的两种模式:Receiver模式和Direct直连模式。本篇先来介绍Receiver模式,讲述Receiver模式的原理,以及Receiver模式存在的问题和相关的解决办法。虽然在生产中,一般都会选择Direct直连模式...
  • SparkStreaming接收Kafka数据的两种方式一、SparkStreaming + Kafka Receiver模式二、SparkStreaming + Kafka Direct模式三、Direct模式Receiver模式比较四、SparkStreaming+Kafka维护消费者offset 一、Spark...
  • kafka与spark应用

    2016-09-01 08:48:19
    kafka主要是用来做日志收集,可以收集用户的流量,点击行为,kafka目前版本是1.0的,其0.9的版本坑特别多,... 1、spark与kafka的介绍 2、spark的集群安装 3、Spark RDD函数讲解实战分析 4、Spark 的java操作...
  • Spark Streaming的介绍,运行WordCount实例,程序运行原理与Kafka的集成及常用的API介绍。
  • 原 [Kafka与Spark集成系列三] Spark编程模型https://blog.csdn.net/u013256816/article/details/82082109版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 ...
  • 1.SparkStreaming模块运行原理: 使用场景: saprkStreaming分为多个RDD: RDD进行转换(transform函数): RDD运行Action函数: 2.Kafka模块详解: 3.Kafka+Flume ...
  • 看懂本文的前提是首先要熟悉kafka,然后了解spark Streaming的运行原理与kafka结合的两种形式,然后了解flink实时流的原理与kafka结合的方式。 kafka kafka作为一个消息队列,在企业中主要用于缓存数据,当然...
1 2 3 4 5 ... 20
收藏数 9,921
精华内容 3,968
关键字:

kafka与spark原理