2019-03-04 15:07:36 weixin_42003671 阅读数 644
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

@羲凡——只为了更好的活着

SparkStreaming 读取kafka中数据(spark2.3.2)

流处理一般都会涉及到kafka,所以用SparkStreaming读取kafka中数据是流处理的必备技能之一。

1.先在pom.xml文件中添加

${spark.version} 即你的spark的版本,我spark是spark 2.3.2。我kafka是kafka_2.11-0.10.2.2

<!--kafka-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
	<version>${spark.version}</version>
	<scope>compile</scope>
</dependency>
<!--Kafka 0.10 Source For Structured Streaming-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
	<version>${spark.version}</version>
	<scope>compile</scope>
</dependency>
2.实例代码:
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies,KafkaUtils,LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestKafkaDirectStreamHA {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestDirectStreamHA")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val checkpointDir = "/aarontest/sparkstreaming/checkpoint/TestDirectStreamHA"

    def creatingFunc(): StreamingContext = {
      val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
      ssc.checkpoint(checkpointDir)
      val topic = Set("aarontest")
      val kafkaParams = Map(
        "bootstrap.servers" -> "deptest104:9092,deptest105:9092,deptest108:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "KafkaDirectStreamHA",
        //"auto.offset.reset" -> "latest",//从最新的开始消费,如果送指定消费offsets者需要去掉此行
        "enable.auto.commit" -> (false: java.lang.Boolean))
      val offsets = Map(
        new TopicPartition("aarontest", 0) -> 10L,
        new TopicPartition("aarontest", 1) -> 10L,
        new TopicPartition("aarontest", 1) -> 10L
      )
      val directKafkaDStream = KafkaUtils
        .createDirectStream(ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topic, kafkaParams, offsets))
        .map(_.value())
      val resultDStream = directKafkaDStream
        .mapPartitions(iter => {
          iter.filter(_.length > 0).filter(_.nonEmpty).flatMap(_.split(" ")).map((_, 1))
        }).reduceByKey(_ + _)
      resultDStream.print()
      ssc
    }

    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }
}
3.结果展示:
-------------------------------------------
Time: 1551682105000 ms
-------------------------------------------

-------------------------------------------
Time: 1551682110000 ms
-------------------------------------------
(successed,1)
(spark,1)
(read,1)
(kafka,1)

-------------------------------------------
Time: 1551682115000 ms
-------------------------------------------
4.注意事项:

a.使用checkpoint保障sparkstreaming的高可用,即在代码停止运行后重启,任能继续消费kafka数据
b.如果kafka中不产生数据,代码任继续输出时间戳
c.auto.offset.reset表示从最新的开始消费,如果指定消费offsets则不需要此参数

====================================================================

@羲凡——只为了更好的活着

若对博客中有任何问题,欢迎留言交流

2017-09-16 11:47:46 u014033218 阅读数 417
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

spark读取kafka数据 createStream和createDirectStream的区别
1、KafkaUtils.createDstream
构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上
A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量
B、对于不同的group和topic可以使用多个receivers创建不同的DStream
C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
2.KafkaUtils.createDirectStream
区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api
优点:
A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。
B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中
C、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

2017-09-25 19:26:00 xiesq5112 阅读数 5741
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

简介:
目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时间间隔可以自定义)数据刷新的效果。

应用场景:
业务库系统做多维分析的时候,数据来源各不相同。很多历史数据都是每天定时跑批生成。但是做分析产品,对于T+0日的数据, 则不好取。对于T+0日的数据,目前我采取的解决方案就是Spark Streaming 读取Kafka写入到Elasticsearch,业务系统通过查询历史数据和T+0日数据,得到一个数据实时展示的效果。


先介绍一下内容涉及的几个版本:

<java.version>1.8</java.version>
<spark.version>1.6.2</spark.version>
<scala.version>2.10.6</scala.version>
<elasticsearch.version>5.2.0</elasticsearch.version>
<kafka.version>1.0</kafka.version>

下面是Spring boot搭建的项目结构:

这里写图片描述

之前学习的时候,参考的spark版本1.6.2,kafka版本是0.8的,但是后面自己做项目的kafka版本是1.0的。我把对应的kafka_2.10-0.8.2.1.jar改成kafka_2.10-0.10.0.0.jar 但是遇到了下面的这个异常:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
    at scala.util.Either$RightProjection.flatMap(Either.scala:523)
    at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
    at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
    at scala.util.Either$RightProjection.flatMap(Either.scala:523)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at com.midea.magiccube.spark.LoanInfoStatistic.getActionDStream(LoanInfoStatistic.java:210)
    at com.midea.magiccube.spark.LoanInfoStatistic.main(LoanInfoStatistic.java:69)

主要内容是:java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker,经过一番了解后,初步估计是kafka版本和spark版本不兼容,于是我又将版本回退,发现能够跑通。

pom.xml内容如下:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.7</java.version>
        <spark.version>1.6.2</spark.version>
        <scala.version>2.10.6</scala.version>
        <elasticsearch.version>5.2.0</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <!-- spark 相关 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- spark to es 相关支持 -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-13_2.10</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

接下来就是开发具体的Spark Streaming 读写的代码了。

  1. 配置SparkConf对象并初始化es配置参数。

    SparkConf sc = new SparkConf();
    sc.setAppName("Name").setMaster("local[2]");
    sc.set("es.nodes", IP);
    sc.set("es.index.auto.create", "true");
    sc.set("es.mapping.id", "id");
    sc.set("es.port", PORT);
    
  2. 绑定sc参数,并设置循环取数时间间隔为5s

    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));
    jssc.checkpoint("E:/checkpoint");
    
  3. 设置kafka配置信息,KafkaUtils.createDirectStream()方法读取信息得到 JavaPairDStream< String,String>对象dStream。

  4. dStream.mapToPair()解析kafka数据并封装成JavaPairDStream< String, 自定义实体> entityDStream对象。

  5. entityDStream.transform()将数据转化为JavaDStream dataDStream方便写入ES。

  6. 接着将数据写入ES,JavaEsSparkStreaming.saveToEs(dataDStream, “索引名”);

  7. 最后启动和关闭对象JavaStreamingContext jssc

    jssc.start();
    jssc.awaitTermination();
    jssc.close();
    

这里只是记录了操作的流程和开发中遇到的一些问题,我觉得重难点在于RDD的各种转换逻辑处理操作。这里没有细化下去,太广了。记录好了配置及处理流程,以后需要用时再去复习一下就能够快速熟悉,从而继续高效开发。

好记性不如烂笔头,主要还是为了方便自己以后查看。可能记录得有些简单了。如有疑问可私信沟通。

2018-12-03 23:59:57 m0_37914799 阅读数 422
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

  spark Streaming从kafka中读取数据的方式分有两种,Receiver读取和Direct读取。

Receiver方式
​   Receiver是使用kafka的高层次Consumer API来实现的,Receiver从kafka中获取数据存储在Spark Executor的内存之中,当Spark Streaming启动job时,job会去处理那些数据。由于它是依靠底层来实现的,数据写在缓存中,在默认配置的情况下,可能会因为底层发生故障从而导致数据的丢失。

​   因此,要想保证数据可靠性,实现数据的零丢失,就需要启动高可用机制。首先在启动高可用之前,必须要启用Spark Streaming的预写日志机制(Write Ahead Log,简称WAL),该机制会同步地将接受到的kafka数据写入到分布式文件系统HDFS上的预写日志中。所以,即使底层节点出现了问题,也可以使用预写日志中的数据进行恢复。

注意点
​  1、kafka中topic的partition与Spark中RDD的partition没有关系,在使用KafkaUtils.createStream()中,提高partition的数据只会增加Receiver。也就是说,读取kafka中的topic partition的线程数量,不会增加Spark处理数据的并行度。

  ​2、可以创建多个kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

  ​3、如果基于容错的文件系统,比如HDFS启用了预写入日志机制,接收到的数据都会被复制一份到预写入日志中。所以在kafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_DISK_SER.

Direct方式
​   Receiver是在Spark1.3以后的版本引入的,这种机制可以确保更加健壮的机制。这种方式会周期性地查询kafka,来获得每个topic的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用kafka的简单consumer api来获取kafka指定offset。
Direct的优点

  读取速度快:直接到kafka拿数据消费,不会存到内存再消费。
  简单并行读取:如果要读取多个partition,不需要创建多个输入kafka流并将其合并。使用directStream,Spark Streaming将创建于使用kafka分区一样多的RDD分区,这些分区将全部从kafka并行读取数据。所以在kafka和RDD分区之间有一对一的映射关系。

  ​ 高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制,由于数据需要被复制两份(kafka自己本身的可靠机制会复制一份数据,还有一份需要复制到WAL中),所以效率比较底下。而基于Direct的方式,不依赖Receiver,也不需要开启WAL机制,只要kafka中做了数据的复制,就可以通过kafka的副本进行恢复,可以减少复制次数提高性能。

​   正好一次语义(一次且仅一次事务机制):Receiver中使用kafka的高级API在Zookeeper中存储消耗过的偏移量(offset),这是kafka传统的消费数据的方式。这种方法(结合提前写入日志)虽然可以保证数据的零丢失(即至少一次语义),但是在有些情况下可能导致有些记录被消费两次。出现这种情况的原因是Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间不一致。
  基于direct的方式,使用kafka的简单API,Spark Streaming自己就负责追踪消费的offset,并保存到checkpoint中,Spark Streaming自己跟踪偏移量,这样就消除了Spark Streaming与Zookeeper与kafka之间数据的不一致性。因此,Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。spark自己一定是同步的,因此可以保证数据是消费一次且仅一次消费。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是等幂的,或者是保存结果和偏移量的原子事物。由于数据消费偏移量是保存在checkpoint中,因此,后续想使用kafka高级API消费数据,需要手动更新zookeeper中的偏移量。

2019-10-10 16:24:30 qq_27814951 阅读数 24
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

spark streaming 从kafka读取数据,将流处理结果写入mysql

//spark streaming 从kafka读取数据,将流处理结果写入mysql
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import java.sql.{PreparedStatement,Connection,DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, Minutes}
import org.apache.log4j.Logger
import org.apache.log4j.Level

object KafkaWordCount{
    def main(args:Array[String]){
	  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "use_a_separate_group_id_for_each_stream",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean))
                
		//val numThreads = 3  //每个topic的分区数
        //val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
	    val sc = new SparkConf().setAppName("ZkWordCount")
        val ssc = new StreamingContext(sc, Seconds(10))
	    ssc.checkpoint("hdfs://nameservice1/test/checkpoint") //设置检查点
		val topics = Array("zklog")
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
         )
        
		//val line = stream.map(record => (record.key, record.value))
	    val lines = stream.map(record => (record.value))
        val words = lines.flatMap(_.split(","))
        val pair = words.map(x => (x,1))
        val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(1),Seconds(10),3) //这行代码的含义在下一节的窗口转换操作中会有介绍
        wordCounts.print()
		//下面是新增的语句,把DStream保存到MySQL数据库中     
        wordCounts.foreachRDD(rdd => {
			def func(records: Iterator[(String,Int)]) {
				var conn: Connection = null
				var stmt: PreparedStatement = null
				try {
					val url = "jdbc:mysql://172.16.104.64:3306/bigdata"
					val user = "admin"
					val password = "Admin@Bigdata510630"  
					conn = DriverManager.getConnection(url, user, password)
					records.foreach(p => {
						val sql = "insert into zklog(information,count) values (?,?)"
						stmt = conn.prepareStatement(sql);
						stmt.setString(1, p._1.trim)
						stmt.setInt(2,p._2.toInt)
						stmt.executeUpdate()
					})
				} catch {
					case e: Exception => e.printStackTrace()
				} finally {
					if (stmt != null) {
						stmt.close()
					}
					if (conn != null) {
					conn.close()
					}
				}
			}

			val repartitionedRDD = rdd.repartition(3)
			repartitionedRDD.foreachPartition(func)
		})
        ssc.start
        ssc.awaitTermination
	}
}
没有更多推荐了,返回首页