2017-02-19 23:09:37 xuyaoqiaoyaoge 阅读数 4892
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1686 人正在学习 去看看 李飞

kafka

kafka中文教程

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.
Apache kafka是消息中间件的一种。
一 、术语介绍
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。每个topic都具有这两种模式:(队列:消费者组(consumer group)允许同名的消费者组成员瓜分处理;发布订阅:允许你广播消息给多个消费者组(不同名))。
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker,比如flume采集机就是Producer。
Consumer
消息消费者,向Kafka broker读取消息的客户端。比如Hadoop机器就是Consumer。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

二、使用场景
1、Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的”事务性”“消息传输担保(消息确认机制)”“消息分组”等企业级特性;kafka只能使用作为”常规”的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
2、Websit activity tracking
kafka可以作为”网站活性跟踪”的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
3、Log Aggregation
kafka的特性决定它非常适合作为”日志收集中心”;application可以将操作日志”批量”“异步”的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
4、它应用于2大类应用
构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
构建实时流的应用程序,对数据流进行转换或反应。

三、分布式
Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

四、消息处理顺序
Kafka保证消息的顺序不变。 在这一点上Kafka做的更好,尽管并没有完全解决上述问题。 Kafka采用了一种分而治之的策略:分区。 因为Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

五、安装
kafka安装和启动

六、Key和Value
Kafka是一个分布式消息系统,Producer生产消息并推送(Push)给Broker,然后Consumer再从Broker那里取走(Pull)消息。Producer生产的消息就是由Message来表示的,对用户来讲,它就是键-值对。

Message => Crc MagicByte Attributes Key Value

kafka会根据传进来的key计算其分区,但key可以不传,可以为null,空的话,producer会把这条消息随机的发送给一个partition。

这里写图片描述

MessageSet用来组合多条Message,它在每条Message的基础上加上了Offset和MessageSize,其结构是:

MessageSet => [Offset MessageSize Message]

它的含义是MessageSet是个数组,数组的每个元素由三部分组成,分别是Offset,MessageSize和Message,它们的含义分别是:
这里写图片描述

七、小例子
1.启动ZooKeeper
进入kafka目录,加上daemon表示在后台启动,不占用当前的命令行窗口。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
如果要关闭,下面这个
bin/zookeeper-server-stop.sh
ZooKeeper 的端口号是2181,输入jps查看进程号是QuorumPeerMain
2.启动kafka
在server.properties中加入,第一个是保证你删topic可以删掉,第二个不然的话就报topic找不到的错误:
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
然后:
bin/kafka-server-start.sh -daemon config/server.properties
如果要关闭,下面这个
bin/kafka-server-stop.sh
Kafka的端口号是9092,输入jps查看进程号是Kafka
3.创建一个主题(topic)
创建一个名为“test”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
创建好之后,可以通过运行以下命令,查看已创建了哪些topic:
bin/kafka-topics.sh –list –zookeeper localhost:2181
查看具体topic的信息:
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
4.发送消息
启动kafka生产者:
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
5.接收消息
新开一个命令行窗口,启动kafka消费者:
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
6.最后
在producer窗口中输入消息,可以在consumer窗口中显示:
这里写图片描述
这里写图片描述

spark streaming

spark中文学习指南

Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。

Spark Streaming的优势在于:
能运行在100+的结点上,并达到秒级延迟。
使用基于内存的Spark作为执行引擎,具有高效和容错的特性。
能集成Spark的批处理和交互查询。
为实现复杂的算法提供和批处理类似的简单接口。

首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。
在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。
正如Spark Streaming最初的目标一样,它通过丰富的API和基于内存的高速计算引擎让用户可以结合流式处理,批处理和交互查询等应用。因此Spark Streaming适合一些需要历史数据和实时数据结合分析的应用场合。当然,对于实时性要求不是特别高的应用也能完全胜任。另外通过RDD的数据重用机制可以得到更高效的容错处理。

当一个上下文(context)定义之后,你必须按照以下几步进行操作:
定义输入源;
准备好流计算指令;
利用streamingContext.start()方法接收和处理数据;
处理过程将一直持续,直到streamingContext.stop()方法被调用。

可以利用已经存在的SparkContext对象创建StreamingContext对象:

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

窗口函数
对于spark streaming中的窗口函数,参见:
窗口函数解释

对非(K,V)形式的RDD 窗口化reduce:
1.reduceByWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

对(K,V)形式RDD 按Key窗口化reduce:
1.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc)
从效率来说,应选择带有invReduceFunc的方法。

可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支:

 dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

spark执行时间是少了,但数据库压力比较大,会一直占资源。

小例子:

package SparkStreaming

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object Spark_streaming_Test {
  def main(args: Array[String]): Unit = {
    //local[2]表示在本地建立2个working线程
    //当运行在本地,如果你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来说是不足的,因为作为receiver的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。
    //所以至少得2个核,也就是local[2]
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //时间间隔是1秒
    val ssc = new StreamingContext(conf, Seconds(1))
    //有滑动窗口时,必须有checkpoint
    ssc.checkpoint("F:\\checkpoint")
    //DStream是一个基类
    //ssc.socketTextStream() 将创建一个 SocketInputDStream;这个 InputDStream 的 SocketReceiver 将监听服务器 9999 端口
    //ssc.socketTextStream()将 new 出来一个 DStream 具体子类 SocketInputDStream 的实例。
    val lines = ssc.socketTextStream("192.168.1.66", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    //    val lines = ssc.textFileStream("F:\\scv")
    val words = lines.flatMap(_.split(" ")) // DStream transformation
    val pairs = words.map(word => (word, 1)) // DStream transformation
    //    val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
    //每隔3秒钟,计算过去5秒的词频,显然一次计算的内容与上次是有重复的。如果不想重复,把2个时间设为一样就行了。
    //    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(5), Seconds(3))
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(3))
    windowedWordCounts.filter(x => x._2 != 0).print()
    //    wordCounts.print() // DStream output,打印每秒计算的词频
    //需要注意的是,当以上这些代码被执行时,Spark Streaming仅仅准备好了它要执行的计算,实际上并没有真正开始执行。在这些转换操作准备好之后,要真正执行计算,需要调用如下的方法
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //在StreamingContext上调用stop()方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false
    //一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)
    ssc.stop()
  }
}

1.启动
start-dfs.sh
start-yarn.sh
这里写图片描述
2.终端输入:
nc -lk 9999
然后在IEDA中运行spark程序。由于9999端口中还没有写东西,所以运行是下图:

这里写图片描述
只有时间,没有打印出东西。然后在终端输入下面的东西,也可以从其他地方复制进来。
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
这时,IDEA的控制台就输出下面的东西。
这里写图片描述

3.下面运行带时间窗口的,注意如果加了时间窗口就必须有checkpoint
输入下面的,不要一次全输入,一次输个几行。
checkpoint
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
ni hao a
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
hello world
hello hadoop
hadoop love
love cat
cat love rabbit

先是++–的那种:
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
再然后是不++–的那种:
这里写图片描述
这里写图片描述
++–的那种是因为把过去的RDD也带进来计算了,所以出现了0这个情况,为了避免这种情况只能在打印前过滤掉0的再打印。而没有++–的那种情况是不需要这样做的。

Checkpointing
在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。就可以通过streamingContext.checkpoint(checkpointDirectory)方法来做。
默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过dstream.checkpoint来设置。需要注意的是,随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。

package streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2017/3/12.
  */

object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context") //如果没有出现这句话,说明StreamingContext是从checkpoint里面加载的
    val outputFile = new File(outputPath) //输出文件的目录
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) //时间间隔是1秒
    ssc.checkpoint(checkpointDirectory) //设置一个目录用于保存checkpoint信息

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    val windowedWordCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
    windowedWordCounts.checkpoint(Seconds(10))//一般推荐设置为 batch duration 的5~10倍,即StreamingContext的第二个参数的5~10倍
    windowedWordCounts.print()
    Files.append(windowedWordCounts + "\n", outputFile, Charset.defaultCharset())
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.exit(1)
    }
    val ip = args(0)
    val port = args(1).toInt
    val checkpointDirectory = args(2)
    val outputPath = args(3)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}

优化
1.数据接收的并行水平
创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。因此允许数据并行接收,提高整体的吞吐量。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

多输入流或者多receiver的可选的方法是明确地重新分配输入数据流(利用inputStream.repartition()),在进一步操作之前,通过集群的机器数分配接收的批数据。
2.任务序列化
运行kyro序列化任何可以减小任务的大小,从而减小任务发送到slave的时间。

    val conf = new SparkConf().setAppName("analyse_domain_day").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

3.设置合适的批间隔时间(即批数据的容量)
批处理时间应该小于批间隔时间。如果时间间隔是1秒,但处理需要2秒,则处理赶不上接收,待处理的数据会越来越多,最后就嘣了。
找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能满足数据处理速率,你可以通过检查端到端的延迟值来判断(可以在Spark驱动程序的log4j日志中查看”Total delay”或者利用StreamingListener接口)。如果延迟维持稳定,那么系统是稳定的。如果延迟持续增长,那么系统无法跟上数据处理速率,是不稳定的。你能够尝试着增加数据处理速率或者减少批容量来作进一步的测试。

DEMO

spark流操作kafka有两种方式:
一种是利用接收器(receiver)和kafaka的高层API实现。
一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)。

相比基于Receiver方式有几个优点:
1、不需要创建多个kafka输入流,然后Union他们,而使用DirectStream,spark Streaming将会创建和kafka分区一样的RDD的分区数,而且会从kafka并行读取数据,Spark的分区数和Kafka的分区数是一一对应的关系。
2、第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次:一次是被Kafka复制;另一次是写入到WAL中。
Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
3、Receiver方式读取kafka,使用的是高层API将偏移量写入ZK中,虽然这种方法可以通过数据保存在WAL中保证数据的不对,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。
第二种方式不采用ZK保存偏移量,消除了两者的不一致,保证每个记录只被Spark Streaming操作一次,即使是在处理失败的情况下。如果想更新ZK中的偏移量数据,需要自己写代码来实现。
由于直接操作的是kafka,kafka就相当于你底层的文件系统。这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次。

首先去maven的官网上下载jar包
spark-streaming_2.10-1.6.2.jar
spark-streaming-kafka_2.10-1.6.2.jar
我的Scala是2.10的,spark是1.6.0的,下载的spark.streaming和kafka版本要与之对应,spark-streaming_2.10-1.6.2.jar中2.10是Scala版本号,1.6.2是spark版本号。当然下载1.6.1也行。
需要添加 kafka-clients-0.8.2.1.jar以及kafka_2.10-0.8.2.1.jar
这里的2.10是Scala版本号,0.8.2.1是kafka的版本号。就下这个版本,别的版本不对应,会出错。

在kafka的配置文件里面:
delete.topic.enable=true
host.name=192.168.1.66
zookeeper.connect=192.168.1.66:2181
我这里写主机名的话,各种报错,所以干脆就写IP地址了。
启动kafka以及ZK的步骤和kafka 1-2是一样的。
进入/kafka_2.10-0.8.2.1 新建一个主题:
bin/kafka-topics.sh –create –zookeeper 192.168.1.66:2181 –replication-factor 1 –partitions 1 –topic test
启动一个生产者:
bin/kafka-console-producer.sh –broker-list 192.168.1.66:9092 –topic test
在自己的电脑上运行spark程序后,在命令行输入:
这里写图片描述
在控制台会显示:
这里写图片描述

package SparkStreaming

//TopicAndPartition是对 topic和partition的id的封装的一个样例类
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import kafka.serializer.StringDecoder

object SparkStreaming_Kafka_Test {

  val kafkaParams = Map(
    //kafka broker的IP加端口号,这个是必须的
    "metadata.broker.list" -> "192.168.1.66:9092",
    // "group.id" -> "group1",
    /*此配置参数表示当此groupId下的消费者,
     在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),
     smallest表示最小offset,即从topic的开始位置消费所有消息.*/
    "auto.offset.reset" -> "smallest"
  )

  val topicsSet = Set("test")

  //  val zkClient = new ZkClient("xxx:2181,xxx:2181,xxx:2181",Integer.MAX_VALUE,100000,ZKStringSerializer)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming_Kafka_Test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("F:\\checkpoint")
    /*
    KafkaUtils.createDirectStream[
       [key的数据类型], [value的数据类型], [key解码的类], [value解码的类] ](
       streamingContext, [Kafka配置的参数,是一个map], [topics的集合,是一个set])
       */
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2) //取value
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。所以说checkpoint就已经可以保证容错性了。
如果需要把偏移量写入ZK,首先在工程中新建一个包:org.apache.spark.streaming.kafka,然后建一个KafkaCluster类:

package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map {
      kv => kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  def setConsumerOffsetMetadata(groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}

然后在主函数中:

// 手动更新ZK偏移量,使得基于ZK偏移量的kafka监控工具可以使用
    messages.foreachRDD(rdd => {
      // 先处理消息
      val lines = rdd.map(_._2) //取value
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.foreach(println)
      // 再更新offsets
      //spark内部维护kafka偏移量信息是存储在HasOffsetRanges类的offsetRanges中
      //OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移。
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        val topicAndPartition = TopicAndPartition("test", offsets.partition)
        val o = kc.setConsumerOffsets("group1", Map((topicAndPartition, offsets.untilOffset)),8)
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })

下面是用kafka的API自己写一个程序读取文件,作为kafka的生产者,需要将Scala和kafka的所有的jar包都导入,lib文件夹下面的都导入进去。
如果没有2台电脑,可以开2个开发环境,IDEA作为消费者,eclipse作为生产者。
生产者代码如下:

package spark_streaming_kafka_test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MakeRealtimeDate extends Thread {

    private Producer<Integer, String> producer;

    public MakeRealtimeDate() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "192.168.1.66:2181");
        props.put("metadata.broker.list", "192.168.1.66:9092");
        ProducerConfig pc = new ProducerConfig(props);
        producer = new Producer<Integer, String>(pc);
    }

    public void run() {
        while (true) {
            File file = new File("C:\\Users\\Administrator\\Desktop\\wordcount.txt");
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            String lineTxt = null;
            try {
                while ((lineTxt = reader.readLine()) != null) {
                    System.out.println(lineTxt);
                    producer.send(new KeyedMessage<Integer, String>("test", lineTxt));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        new MakeRealtimeDate().start();
    }

}

先启动之前写的sparkstreaming消费者统计单词个数的程序,然后再启动我们现在写的这个生产者程序,最后就会在IDEA的控制台中看到实时结果。

2018-08-17 11:18:42 u013817676 阅读数 389
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1686 人正在学习 去看看 李飞

本文实现kafka与Spark Streaming之间的通信,其中Kafka端producer实现使用Java,Spark Streaming端Consumer使用Python实现。

首先安装kafka与spark streaming环境,kafka测试连通测试参考上文,本文的实验环境都为本地单机版本。

Kafka

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class producer {
    private final static String TOPIC = "data-message";
    private final static String BOOTSTRAP_SERVER = "127.0.0.1:9092";


    public static Producer<String,String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        return new KafkaProducer<>(props);
    }

    // 实现自定义partition
    public static int partition(long time){
        if(time%2 == 0)
            return 0;
        else
            return 1;
    }
    public static void runProducer() throws Exception{
        final Producer<String,String> producer = createProducer();
        long time  = System.currentTimeMillis();
        long curTime = time;
        try{
            while(true){
                curTime = System.currentTimeMillis();
                if(curTime-time == 10000){
                    final ProducerRecord<String,String> record =
                            new ProducerRecord<>(TOPIC, partition(curTime) ,"JP_"+curTime,"AUX|989|bid|276|"+curTime);
                    RecordMetadata metadata = producer.send(record).get();
                    long elapsedTime = System.currentTimeMillis() - time;
                    System.out.printf("sent record(key=%s value=%s) " +
                            "meta(partition=%d, offset=%d) time=%d\n",
                    record.key(), record.value(), metadata.partition(),
                    metadata.offset(), elapsedTime);
                    curTime = time = System.currentTimeMillis();
                }
            }
        } finally {
            producer.flush();
            producer.close();
        }   
    }
    public static void main(String[] args) throws Exception{
            runProducer();
    }
}

Spark Streaming实现了Spark Steaming两者通信方式,createStream和createDirectStream

import os

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import configparser

def startReceiver(config,topics,ssc):
    #connect kafka
    kafkaStreams = [KafkaUtils.createStream(ssc,config.get('oppo','zookeeper'),
                                       config.get('oppo','consumer'),topics) for _ in range(int(config.get('oppo','numStreams')))]
    uniStream = ssc.union(*kafkaStreams)

    stream = uniStream.map(lambda x: x[0])

    stream.pprint()

    ssc.start()
    ssc.awaitTermination()

def startDirect(config,topic,ssc):
    brokerList = config.get('oppo','brokerList')
    #connect kafka
    kafkaStreams = KafkaUtils.createDirectStream(ssc,[config.get('oppo','topic')],
                                                  {"metadata.broker.list":brokerList})
    stream = kafkaStreams.map(lambda x: x[1])
    stream.pprint()

    ssc.start()
    ssc.awaitTermination()

if __name__ == '__main__':
    config = configparser.SafeConfigParser()
    config.read("properties.conf")

    sc = SparkContext(appName=config.get('oppo', 'appName'))
    sc.setLogLevel(config.get('oppo', 'logLevel'))

    # create Streaming Context
    # deal with internal 10 seconds
    ssc = StreamingContext(sc, 10)

    topic = config.get('oppo', 'topic')
    topics = {topic: 0, topic: 1}

    #startReceiver(config,topics,ssc)
    startDirect(config,topic,ssc)

properties.conf配置文件

[oppo]
appName = SparkStreamingKafka
logLevel = WARN
topic = data-message
partitions = 2
zookeeper=127.0.0.1:2181
numStreams = 2
consumer = spark-streaming
brokerList=127.0.0.1:9092
2018-08-26 16:56:55 u013256816 阅读数 581
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1686 人正在学习 去看看 李飞

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


在分布式环境下,Spark集群采用的是主从架构。如下图所示,在一个Spark集群中,有一个节点负责中央协调,调度各个分布式工作节点,这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(Executor)节点。驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的进程运行。驱动器节点和所有的执行器节点一起被称为一个Spark应用(Application)。

Spark应用通过一个叫做集群管理器(Cluster Manager)的外部服务在集群中的机器上启动。Spark自带的集群管理器被称为独立集群管理器。Spark也能运行在YARN、Mesos、Kubernetes这类开源集群管理器上。

Spark驱动器节点是执行你程序中的main()方法的进程。它执行用户编写的用来创建SparkContext、创建RDD,以及进行RDD的转换操作和行动操作的代码。其实,当你启动spark-shell时,你就启动了一个Spark驱动程序。驱动程序一旦停止,Spark应用也就结束了。

驱动器程序在Spark应用中有两个职责:把用户程序转为任务以及为执行器节点调度任务。

Spark驱动器程序负责把用户程序转为多个物理执行的单元,这些单元也被称为任务(Task)。任务是Spark中最小的工作单元,用户程序通常要启动成百上千的独立任务。从上层来看,所有的Spark程序都遵循同样的结构:程序从输入数据创建一系列RDD,再使用转换操作派生出新的RDD,最后使用行动操作收集或存储结果RDD中的数据。Spark程序其实是隐式地创建出了一个由操作组成的逻辑上的有向无环图(Directed Acyclic Graph,简称DAG)。当驱动器程序运行时,它会把这个逻辑图转为物理执行计划。

有了物理执行计划之后,Spark驱动器程序必须在各执行器进程间协调任务的调度。执行器进程启动后,会向驱动器进程注册自己。因此,驱动器进程始终对应用中所有的执行器节点有完整的记录。每个执行器节点代表一个能够处理任务和存储RDD数据的进程。

Spark驱动器程序会根据当前的执行器节点集合,尝试把所有任务基于数据所在位置分配给合适的执行器进程。当任务执行时,执行器进程会把缓存数据存储起来,而驱动器进程同样会跟踪这些缓存数据的位置,并且利用这些位置信息来调度以后的任务,以尽量减少数据的网络传输。

Spark执行器节点是一种工作进程,负责在Spark作业中运行任务,任务间相互独立。Spark应用启动时,执行器节点就被同步启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有执行器节点发生了异常或崩溃,Spark应用也可以继续执行。执行器进程有两大作用:第一,它们负责运行组成Spark应用的任务,并将结果返回给驱动器进程;第二,它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在执行器进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

Spark依赖于集群管理器来启动执行器节点,而在某些特殊的情况下,也依赖集群管理器来启动驱动器节点。集群管理器是Spark中的可插拔式组件,这样既可选择Spark自带的独立集群管理,也可以选择前面所提及的YARN、Mesos之类的外部集群管理器。

不论你使用的是哪一种集群管理器,你都可以使用Spark提供的统一脚本spark-submit将你的应用提交到那种集群管理器上。通过不同的配置选项,spark-submit可以连接到相应的集群管理器上,并控制应用所使用的资源数量。在使用某些特定集群管理器时,spark-submit也可以将驱动器节点运行在集群内部(比如一个YARN的工作节点)。但对于其他的集群管理器,驱动器节点只能被运行在本地机器上。

在集群上运行spark应用的详细过程为:1. 用户通过spark-submit脚本提交应用。2. spark-submit脚本启动驱动器程序,调用用户定义的main()方法。3. 驱动器程序与集群管理器通信,申请资源以启动执行器节点。4. 集群管理器为驱动器程序启动执行器节点。5. 驱动器进行执行用户应用中的操作。根据程序中所定义的对RDD的转换操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进行。6. 任务在执行器程序中进行计算并保存结果。7. 如果驱动器程序的main()方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


2018-05-14 19:46:32 CRISPY_RICE 阅读数 1299
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1686 人正在学习 去看看 李飞

一、准备环境: 创建Kafka Topic和HBase表

1. 在kerberos环境下创建Kafka Topic

1.1 因为kafka默认使用的协议为PLAINTEXT,在kerberos环境下需要变更其通信协议: 在${KAFKA_HOME}/config/producer.propertiesconfig/consumer.properties下添加

security.protocol=SASL_PLAINTEXT

1.2 在执行前,需要在环境变量中添加KAFKA_OPT选项,否则kafka无法使用keytab:

export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf"

其中kafka_jaas.conf内容如下:

cat /usr/ndp/current/kafka_broker/conf/kafka_jaas.conf

KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka/hzadg-mammut-platform3.server.163.org@BDMS.163.COM";
};

1.3 创建新的topic:

bin/kafka-topics.sh --create --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --replication-factor 1 --partitions 1 --topic spark-test

1.4 创建生产者:

bin/kafka-console-producer.sh  --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties

1.5 测试消费者:

bin/kafka-console-consumer.sh --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --bootstrap-server hzadg-mammut-platform2.server.163.org:6667 --topic spark-test --from-beginning --new-consumer  --consumer.config ./config/consumer.properties

2. 创建HBase表

2.1 kinit到hbase账号,否则无法创建hbase表

kinit -kt /etc/security/keytabs/hbase.service.keytab hbase/hzadg-mammut-platform2.server.163.org@BDMS.163.COM

 ./bin/hbase shell
> create 'recsys_logs', 'f'

二、编写Spark代码

编写简单的Spark Java程序,功能为: 从Kafka消费信息,同时将batch内统计的数量写入Hbase中,具体可以参考项目:

https://github.com/LiShuMing/spark-demos

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netease.spark.streaming.hbase;

import com.netease.spark.utils.Consts;
import com.netease.spark.utils.JConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class JavaKafkaToHBaseKerberos {
  private final static Logger LOGGER = LoggerFactory.getLogger(JavaKafkaToHBaseKerberos.class);

  private static HConnection connection = null;
  private static HTableInterface table = null;

  public static void openHBase(String tablename) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    synchronized (HConnection.class) {
      if (connection == null)
        connection = HConnectionManager.createConnection(conf);
    }

    synchronized (HTableInterface.class) {
      if (table == null) {
        table = connection.getTable("recsys_logs");
      }
    }
  }

  public static void closeHBase() {
    if (table != null)
      try {
        table.close();
      } catch (IOException e) {
        LOGGER.error("关闭 table 出错", e);
      }
    if (connection != null)
      try {
        connection.close();
      } catch (IOException e) {
        LOGGER.error("关闭 connection 出错", e);
      }
  }

  public static void main(String[] args) throws Exception {
    String hbaseTable = JConfig.getInstance().getProperty(Consts.HBASE_TABLE);
    String kafkaBrokers = JConfig.getInstance().getProperty(Consts.KAFKA_BROKERS);
    String kafkaTopics = JConfig.getInstance().getProperty(Consts.KAFKA_TOPICS);
    String kafkaGroup = JConfig.getInstance().getProperty(Consts.KAFKA_GROUP);

    // open hbase
    try {
      openHBase(hbaseTable);
    } catch (IOException e) {
      LOGGER.error("建立HBase 连接失败", e);
      System.exit(-1);
    }

    SparkConf conf = new SparkConf().setAppName("JavaKafakaToHBase");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaBrokers);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", kafkaGroup);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);
    // 在kerberos环境下,这个配置需要增加
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

    // Create direct kafka stream with brokers and topics
    final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topicsSet.toArray(new String[0])), kafkaParams)
        );

    JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
      private static final long serialVersionUID = -1801798365843350169L;

      @Override
      public String call(ConsumerRecord<String, String> record) {
        return record.value();
      }
    }).filter(new Function<String, Boolean>() {
      private static final long serialVersionUID = 7786877762996470593L;

      @Override
      public Boolean call(String msg) throws Exception {
        return msg.length() > 0;
      }
    });

    JavaDStream<Long> nums = lines.count();

    nums.foreachRDD(new VoidFunction<JavaRDD<Long>>() {
      private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");

      @Override
      public void call(JavaRDD<Long> rdd) throws Exception {
        Long num = rdd.take(1).get(0);
        String ts = sdf.format(new Date());
        Put put = new Put(Bytes.toBytes(ts));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(num));
        table.put(put);
      }
    });

    ssc.start();
    ssc.awaitTermination();
    closeHBase();
  }
}

三、 编译并在Yarn环境下运行

3.1 切到项目路径下,编译项目:

mvn clean package

3.2 运行Spark环境

  • 由于executor需要访问kafka,所以需要将Kafka授权过的kerberos用户下发至executor中;
  • 由于集群环境的hdfs也是kerberos加密的,需要通过spark.yarn.keytab/spark.yarn.principal配置可以访问Hdfs/HBase的keytab信息;

在项目目录下执行如下:

/usr/ndp/current/spark2_client/bin/spark-submit \
--files ./kafka_client_jaas.conf,./kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
--conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
--class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
--master yarn  \
--deploy-mode client \
./target/spark-demo-0.1.0-jar-with-dependencies.jar  

其中kafka_client_jaas.conf文件具体内容如下:

cat kafka_client_jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
renewTicket=true
keyTab="./kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka/hzadg-mammut-platform1.server.163.org@BDMS.163.COM";
};

3.2 执行结果

这里写图片描述

这里写图片描述

2018-12-29 16:40:26 qq_16457097 阅读数 1079
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1686 人正在学习 去看看 李飞

spark到kafka的消息消费--SparkKafkaDriverHAZooKeeperOps

流式计算中最重要的消息的消费

当我们使用spark做准实时计算的时候,很大场景都是和kafka的通信,总结下spark使用kafka的注意事项,下面上代码
package com.aura.bigdata.spark.scala.streaming.p1

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions

/**
  * 使用的zookeeper来管理sparkdriver读取的offset偏移量
  * 将kafka对应的topic的offset保存到的路径
  *
  * 约定,offset的保存到路径
  * /xxxxx/offsets/topic/group/partition/
  *     0
  *     1
  *     2
  *
  * bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
  */
object _07SparkKafkaDriverHAZooKeeperOps {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)

        if(args == null || args.length < 4) {
            println(
                """
                  |Parameter Errors! Usage: <batchInterval> <zkQuorum> <groupId> <topics>
                  |batchInterval        : 批次间隔时间
                  |zkQuorum             : zookeeper url地址
                  |groupId              : 消费组的id
                  |topic                : 读取的topic
                """.stripMargin)
            System.exit(-1)
        }
        val Array(batchInterval, zkQuorum, group, topic) = args
        val kafkaParams = Map[String, String](
            "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
            "auto.offset.reset"-> "smallest"
        )

        val conf = new SparkConf().setMaster("local[2]").setAppName("_06SparkKafkaDirectOps2")

        def createFunc():StreamingContext = {
            val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
            //读取kafka的数据
            val messages = createMessage(ssc, kafkaParams, topic, group)
            //业务操作
            messages.foreachRDD((rdd, bTime) => {
                if(!rdd.isEmpty()) {
                    println("###########################->RDD count: " + rdd.count)
                    println("###########################->RDD count: " + bTime)
                    //所有的业务操作只能在这里完成 这里的处理逻辑和rdd的操作一模一样
                }
                //处理完毕之后将偏移量保存回去
                storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, topic, group)
            })
            ssc
        }

        //开启的高可用的方式 要从失败中恢复过来
        val ssc = StreamingContext.getActiveOrCreate(createFunc _)
        ssc.start()
        ssc.awaitTermination()
    }

    def storeOffsets(offsetRanges: Array[OffsetRange], topic: String, group: String): Unit = {
        val zkTopicPath = s"/offsets/${topic}/${group}"
        for (range <- offsetRanges) {//每一个range中都存储了当前rdd中消费之后的偏移量
            val path = s"${zkTopicPath}/${range.partition}"
            ensureZKExists(path)
            client.setData().forPath(path, (range.untilOffset + "").getBytes())
        }
    }
    /*
      * 约定,offset的保存到路径 ----->zookeeper
      * /xxxxx/offsets/topic/group/partition/
      *     0
      *     1
      *     2
     */
    def createMessage(ssc:StreamingContext, kafkaParams:Map[String, String], topic:String, group:String):InputDStream[(String, String)] = {
        //从zookeeper中读取对应的偏移量,返回值适应fromOffsets和flag(标志位)
        val (fromOffsets, flag)  = getFromOffsets(topic, group)

        var message:InputDStream[(String, String)] = null
        if(!flag) {
            //有数据-->zookeeper中是否保存了SparkStreaming程序消费kafka的偏移量信息
            //处理第一次以外,从这个接口读取kafka对应的数据
            val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
            message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        } else {
            //第一次读取的时候
            message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
        }
        message
    }

    //从zookeeper中读取kafka对应的offset --->
    def getFromOffsets(topic:String, group:String): (Map[TopicAndPartition, Long], Boolean) = {
        ///xxxxx/offsets/topic/group/partition/
        val zkTopicPath = s"/offsets/${topic}/${group}"
        ensureZKExists(zkTopicPath)

        //如果有直接读取对应的数据
        val offsets = for{p <- JavaConversions.asScalaBuffer(
            client.getChildren.forPath(zkTopicPath))} yield {
//                p --->分区所对应的值
            val offset = client.getData.forPath(s"${zkTopicPath}/${p}")
            (TopicAndPartition(topic, p.toInt), new String(offset).toLong)
        }
        if(!offsets.isEmpty) {
            (offsets.toMap, false)
        } else {
            (offsets.toMap, true)
        }
    }

    def ensureZKExists(zkTopicPath:String): Unit = {
        if(client.checkExists().forPath(zkTopicPath) == null) {//zk中没有没写过数据
            client.create().creatingParentsIfNeeded().forPath(zkTopicPath)
        }
    }

    val client = {//代码块编程 zk(servlet)--->Curator(SpringMVC/Struts2)
        val client = CuratorFrameworkFactory.builder()
                    .namespace("mykafka")//命名空间就是目录意思
                    .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .build()
        client.start()
        client
    }
}

总结

在spark的使用是特别注意使用kafka的时候要处理消息的偏移量。

【1】http://www.aboutyun.com
【2-hive优化】https://blog.csdn.net/yu0_zhang0/article/details/81776459

没有更多推荐了,返回首页