2017-02-19 23:09:37 xuyaoqiaoyaoge 阅读数 4893
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1710 人正在学习 去看看 李飞

kafka

kafka中文教程

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.
Apache kafka是消息中间件的一种。
一 、术语介绍
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。每个topic都具有这两种模式:(队列:消费者组(consumer group)允许同名的消费者组成员瓜分处理;发布订阅:允许你广播消息给多个消费者组(不同名))。
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker,比如flume采集机就是Producer。
Consumer
消息消费者,向Kafka broker读取消息的客户端。比如Hadoop机器就是Consumer。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

二、使用场景
1、Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的”事务性”“消息传输担保(消息确认机制)”“消息分组”等企业级特性;kafka只能使用作为”常规”的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
2、Websit activity tracking
kafka可以作为”网站活性跟踪”的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
3、Log Aggregation
kafka的特性决定它非常适合作为”日志收集中心”;application可以将操作日志”批量”“异步”的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
4、它应用于2大类应用
构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
构建实时流的应用程序,对数据流进行转换或反应。

三、分布式
Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

四、消息处理顺序
Kafka保证消息的顺序不变。 在这一点上Kafka做的更好,尽管并没有完全解决上述问题。 Kafka采用了一种分而治之的策略:分区。 因为Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

五、安装
kafka安装和启动

六、Key和Value
Kafka是一个分布式消息系统,Producer生产消息并推送(Push)给Broker,然后Consumer再从Broker那里取走(Pull)消息。Producer生产的消息就是由Message来表示的,对用户来讲,它就是键-值对。

Message => Crc MagicByte Attributes Key Value

kafka会根据传进来的key计算其分区,但key可以不传,可以为null,空的话,producer会把这条消息随机的发送给一个partition。

这里写图片描述

MessageSet用来组合多条Message,它在每条Message的基础上加上了Offset和MessageSize,其结构是:

MessageSet => [Offset MessageSize Message]

它的含义是MessageSet是个数组,数组的每个元素由三部分组成,分别是Offset,MessageSize和Message,它们的含义分别是:
这里写图片描述

七、小例子
1.启动ZooKeeper
进入kafka目录,加上daemon表示在后台启动,不占用当前的命令行窗口。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
如果要关闭,下面这个
bin/zookeeper-server-stop.sh
ZooKeeper 的端口号是2181,输入jps查看进程号是QuorumPeerMain
2.启动kafka
在server.properties中加入,第一个是保证你删topic可以删掉,第二个不然的话就报topic找不到的错误:
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
然后:
bin/kafka-server-start.sh -daemon config/server.properties
如果要关闭,下面这个
bin/kafka-server-stop.sh
Kafka的端口号是9092,输入jps查看进程号是Kafka
3.创建一个主题(topic)
创建一个名为“test”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
创建好之后,可以通过运行以下命令,查看已创建了哪些topic:
bin/kafka-topics.sh –list –zookeeper localhost:2181
查看具体topic的信息:
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
4.发送消息
启动kafka生产者:
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
5.接收消息
新开一个命令行窗口,启动kafka消费者:
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
6.最后
在producer窗口中输入消息,可以在consumer窗口中显示:
这里写图片描述
这里写图片描述

spark streaming

spark中文学习指南

Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。

Spark Streaming的优势在于:
能运行在100+的结点上,并达到秒级延迟。
使用基于内存的Spark作为执行引擎,具有高效和容错的特性。
能集成Spark的批处理和交互查询。
为实现复杂的算法提供和批处理类似的简单接口。

首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。
在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。
正如Spark Streaming最初的目标一样,它通过丰富的API和基于内存的高速计算引擎让用户可以结合流式处理,批处理和交互查询等应用。因此Spark Streaming适合一些需要历史数据和实时数据结合分析的应用场合。当然,对于实时性要求不是特别高的应用也能完全胜任。另外通过RDD的数据重用机制可以得到更高效的容错处理。

当一个上下文(context)定义之后,你必须按照以下几步进行操作:
定义输入源;
准备好流计算指令;
利用streamingContext.start()方法接收和处理数据;
处理过程将一直持续,直到streamingContext.stop()方法被调用。

可以利用已经存在的SparkContext对象创建StreamingContext对象:

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

窗口函数
对于spark streaming中的窗口函数,参见:
窗口函数解释

对非(K,V)形式的RDD 窗口化reduce:
1.reduceByWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

对(K,V)形式RDD 按Key窗口化reduce:
1.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc)
从效率来说,应选择带有invReduceFunc的方法。

可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支:

 dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

spark执行时间是少了,但数据库压力比较大,会一直占资源。

小例子:

package SparkStreaming

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object Spark_streaming_Test {
  def main(args: Array[String]): Unit = {
    //local[2]表示在本地建立2个working线程
    //当运行在本地,如果你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来说是不足的,因为作为receiver的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。
    //所以至少得2个核,也就是local[2]
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //时间间隔是1秒
    val ssc = new StreamingContext(conf, Seconds(1))
    //有滑动窗口时,必须有checkpoint
    ssc.checkpoint("F:\\checkpoint")
    //DStream是一个基类
    //ssc.socketTextStream() 将创建一个 SocketInputDStream;这个 InputDStream 的 SocketReceiver 将监听服务器 9999 端口
    //ssc.socketTextStream()将 new 出来一个 DStream 具体子类 SocketInputDStream 的实例。
    val lines = ssc.socketTextStream("192.168.1.66", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    //    val lines = ssc.textFileStream("F:\\scv")
    val words = lines.flatMap(_.split(" ")) // DStream transformation
    val pairs = words.map(word => (word, 1)) // DStream transformation
    //    val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
    //每隔3秒钟,计算过去5秒的词频,显然一次计算的内容与上次是有重复的。如果不想重复,把2个时间设为一样就行了。
    //    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(5), Seconds(3))
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(3))
    windowedWordCounts.filter(x => x._2 != 0).print()
    //    wordCounts.print() // DStream output,打印每秒计算的词频
    //需要注意的是,当以上这些代码被执行时,Spark Streaming仅仅准备好了它要执行的计算,实际上并没有真正开始执行。在这些转换操作准备好之后,要真正执行计算,需要调用如下的方法
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //在StreamingContext上调用stop()方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false
    //一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)
    ssc.stop()
  }
}

1.启动
start-dfs.sh
start-yarn.sh
这里写图片描述
2.终端输入:
nc -lk 9999
然后在IEDA中运行spark程序。由于9999端口中还没有写东西,所以运行是下图:

这里写图片描述
只有时间,没有打印出东西。然后在终端输入下面的东西,也可以从其他地方复制进来。
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
这时,IDEA的控制台就输出下面的东西。
这里写图片描述

3.下面运行带时间窗口的,注意如果加了时间窗口就必须有checkpoint
输入下面的,不要一次全输入,一次输个几行。
checkpoint
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
ni hao a
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
hello world
hello hadoop
hadoop love
love cat
cat love rabbit

先是++–的那种:
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
再然后是不++–的那种:
这里写图片描述
这里写图片描述
++–的那种是因为把过去的RDD也带进来计算了,所以出现了0这个情况,为了避免这种情况只能在打印前过滤掉0的再打印。而没有++–的那种情况是不需要这样做的。

Checkpointing
在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。就可以通过streamingContext.checkpoint(checkpointDirectory)方法来做。
默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过dstream.checkpoint来设置。需要注意的是,随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。

package streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2017/3/12.
  */

object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context") //如果没有出现这句话,说明StreamingContext是从checkpoint里面加载的
    val outputFile = new File(outputPath) //输出文件的目录
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) //时间间隔是1秒
    ssc.checkpoint(checkpointDirectory) //设置一个目录用于保存checkpoint信息

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    val windowedWordCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
    windowedWordCounts.checkpoint(Seconds(10))//一般推荐设置为 batch duration 的5~10倍,即StreamingContext的第二个参数的5~10倍
    windowedWordCounts.print()
    Files.append(windowedWordCounts + "\n", outputFile, Charset.defaultCharset())
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.exit(1)
    }
    val ip = args(0)
    val port = args(1).toInt
    val checkpointDirectory = args(2)
    val outputPath = args(3)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}

优化
1.数据接收的并行水平
创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。因此允许数据并行接收,提高整体的吞吐量。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

多输入流或者多receiver的可选的方法是明确地重新分配输入数据流(利用inputStream.repartition()),在进一步操作之前,通过集群的机器数分配接收的批数据。
2.任务序列化
运行kyro序列化任何可以减小任务的大小,从而减小任务发送到slave的时间。

    val conf = new SparkConf().setAppName("analyse_domain_day").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

3.设置合适的批间隔时间(即批数据的容量)
批处理时间应该小于批间隔时间。如果时间间隔是1秒,但处理需要2秒,则处理赶不上接收,待处理的数据会越来越多,最后就嘣了。
找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能满足数据处理速率,你可以通过检查端到端的延迟值来判断(可以在Spark驱动程序的log4j日志中查看”Total delay”或者利用StreamingListener接口)。如果延迟维持稳定,那么系统是稳定的。如果延迟持续增长,那么系统无法跟上数据处理速率,是不稳定的。你能够尝试着增加数据处理速率或者减少批容量来作进一步的测试。

DEMO

spark流操作kafka有两种方式:
一种是利用接收器(receiver)和kafaka的高层API实现。
一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)。

相比基于Receiver方式有几个优点:
1、不需要创建多个kafka输入流,然后Union他们,而使用DirectStream,spark Streaming将会创建和kafka分区一样的RDD的分区数,而且会从kafka并行读取数据,Spark的分区数和Kafka的分区数是一一对应的关系。
2、第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次:一次是被Kafka复制;另一次是写入到WAL中。
Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
3、Receiver方式读取kafka,使用的是高层API将偏移量写入ZK中,虽然这种方法可以通过数据保存在WAL中保证数据的不对,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。
第二种方式不采用ZK保存偏移量,消除了两者的不一致,保证每个记录只被Spark Streaming操作一次,即使是在处理失败的情况下。如果想更新ZK中的偏移量数据,需要自己写代码来实现。
由于直接操作的是kafka,kafka就相当于你底层的文件系统。这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次。

首先去maven的官网上下载jar包
spark-streaming_2.10-1.6.2.jar
spark-streaming-kafka_2.10-1.6.2.jar
我的Scala是2.10的,spark是1.6.0的,下载的spark.streaming和kafka版本要与之对应,spark-streaming_2.10-1.6.2.jar中2.10是Scala版本号,1.6.2是spark版本号。当然下载1.6.1也行。
需要添加 kafka-clients-0.8.2.1.jar以及kafka_2.10-0.8.2.1.jar
这里的2.10是Scala版本号,0.8.2.1是kafka的版本号。就下这个版本,别的版本不对应,会出错。

在kafka的配置文件里面:
delete.topic.enable=true
host.name=192.168.1.66
zookeeper.connect=192.168.1.66:2181
我这里写主机名的话,各种报错,所以干脆就写IP地址了。
启动kafka以及ZK的步骤和kafka 1-2是一样的。
进入/kafka_2.10-0.8.2.1 新建一个主题:
bin/kafka-topics.sh –create –zookeeper 192.168.1.66:2181 –replication-factor 1 –partitions 1 –topic test
启动一个生产者:
bin/kafka-console-producer.sh –broker-list 192.168.1.66:9092 –topic test
在自己的电脑上运行spark程序后,在命令行输入:
这里写图片描述
在控制台会显示:
这里写图片描述

package SparkStreaming

//TopicAndPartition是对 topic和partition的id的封装的一个样例类
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import kafka.serializer.StringDecoder

object SparkStreaming_Kafka_Test {

  val kafkaParams = Map(
    //kafka broker的IP加端口号,这个是必须的
    "metadata.broker.list" -> "192.168.1.66:9092",
    // "group.id" -> "group1",
    /*此配置参数表示当此groupId下的消费者,
     在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),
     smallest表示最小offset,即从topic的开始位置消费所有消息.*/
    "auto.offset.reset" -> "smallest"
  )

  val topicsSet = Set("test")

  //  val zkClient = new ZkClient("xxx:2181,xxx:2181,xxx:2181",Integer.MAX_VALUE,100000,ZKStringSerializer)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming_Kafka_Test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("F:\\checkpoint")
    /*
    KafkaUtils.createDirectStream[
       [key的数据类型], [value的数据类型], [key解码的类], [value解码的类] ](
       streamingContext, [Kafka配置的参数,是一个map], [topics的集合,是一个set])
       */
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2) //取value
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。所以说checkpoint就已经可以保证容错性了。
如果需要把偏移量写入ZK,首先在工程中新建一个包:org.apache.spark.streaming.kafka,然后建一个KafkaCluster类:

package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map {
      kv => kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  def setConsumerOffsetMetadata(groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}

然后在主函数中:

// 手动更新ZK偏移量,使得基于ZK偏移量的kafka监控工具可以使用
    messages.foreachRDD(rdd => {
      // 先处理消息
      val lines = rdd.map(_._2) //取value
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.foreach(println)
      // 再更新offsets
      //spark内部维护kafka偏移量信息是存储在HasOffsetRanges类的offsetRanges中
      //OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移。
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        val topicAndPartition = TopicAndPartition("test", offsets.partition)
        val o = kc.setConsumerOffsets("group1", Map((topicAndPartition, offsets.untilOffset)),8)
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })

下面是用kafka的API自己写一个程序读取文件,作为kafka的生产者,需要将Scala和kafka的所有的jar包都导入,lib文件夹下面的都导入进去。
如果没有2台电脑,可以开2个开发环境,IDEA作为消费者,eclipse作为生产者。
生产者代码如下:

package spark_streaming_kafka_test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MakeRealtimeDate extends Thread {

    private Producer<Integer, String> producer;

    public MakeRealtimeDate() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "192.168.1.66:2181");
        props.put("metadata.broker.list", "192.168.1.66:9092");
        ProducerConfig pc = new ProducerConfig(props);
        producer = new Producer<Integer, String>(pc);
    }

    public void run() {
        while (true) {
            File file = new File("C:\\Users\\Administrator\\Desktop\\wordcount.txt");
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            String lineTxt = null;
            try {
                while ((lineTxt = reader.readLine()) != null) {
                    System.out.println(lineTxt);
                    producer.send(new KeyedMessage<Integer, String>("test", lineTxt));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        new MakeRealtimeDate().start();
    }

}

先启动之前写的sparkstreaming消费者统计单词个数的程序,然后再启动我们现在写的这个生产者程序,最后就会在IDEA的控制台中看到实时结果。

2016-08-04 23:34:56 sun_qiangwei 阅读数 1262
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1710 人正在学习 去看看 李飞

在之前的文章中,曾经提到了,如何在使用 Kafka Direct API 处理消费时,将每个Partition的offset写到Zookeeper中,并且在应用重新启动或者应用升级时,可以通过读取Zookeeper中的offset恢复之前的处理位置,进而继续工作。而本篇文章则将要介绍另外一个 Spark Streaming + Kafka 的利器 – Kafka-spark-consumer 项目。


一、项目简介

项目名称:Kafka-spark-consumer
项目地址:https://github.com/dibbhatt/kafka-spark-consumer
在项目的 README.md 中,已经对这个项目有了一个详细的介绍,此处就不对里面的内容进行详细的说明了,想了解的同学可自行去了解,总结一句话:牛。

我在这边需要强调的是:Kafka-spark-consumer 项目在运行的过程中会把 topic 的每个 partition 的 offsets 写到 Zookeeper 中,当我们对 Driver 程序进行升级 或者 需要重新启动 Driver 程序的时候,Kafka-spark-consumer 可以从Zookeeper中恢复相关内容并继续执行。


二、构建测试程序

程序的主要功能:从kafka中 kafka_direct topic 中处理消息,统计每个batch中单词出现的次数。
2.1、添加依赖jar包,此处使用的maven方式。

<dependencies>
  <dependency>
    <groupId>dibbhatt</groupId>
    <artifactId>kafka-spark-consumer</artifactId>
    <version>1.0.6</version>
  </dependency>
</dependencies>
<repositories>
  <repository>
    <id>SparkPackagesRepo</id>
    <url>http://dl.bintray.com/spark-packages/maven</url>
  </repository>
</repositories>

由于需要下载相关依赖的jar包,所以我在下载的时候花了很长时间。

2.2、具体测试代码,如下:

public class KafkaSparkConsumerTest{

    public static JavaStreamingContext createContext(){

        Properties props = new Properties();
        //Kafka所使用的Zookeeper的IP地址
        props.put("zookeeper.hosts", "192.168.1.151");
        //Kafka所使用的Zookeeper的端口
        props.put("zookeeper.port", "2181");
        //Kafka在Zookeeper中,保存broker 服务器的路径
        props.put("zookeeper.broker.path", "/brokers");
        //配置目标 topic
        props.put("kafka.topic", "kafka_direct");
        //配置用来标识此程序作为consumer的编号
        props.put("kafka.consumer.id", "54321");
        //配置用来存储offset的Zookeeper
        props.put("zookeeper.consumer.connection", "192.168.1.151:2181");
        //配置存储offset的基础path
        props.put("zookeeper.consumer.path", "/kafka_spark_consumer");
        //********以下是可选参数 ******************/
        //配置是否强制从第一条消息开始处理,默认是从当时能获取到的最后一条记录开始处理
        props.put("consumer.forcefromstart", "true");
        props.put("consumer.fetchsizebytes", "1048576");
        props.put("consumer.fillfreqms", "250");
        props.put("consumer.backpressure.enabled", "true");

        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("KafkaSparkConsumerTest")
                .set("spark.streaming.receiver.writeAheadLog.enable", "false");

       JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));
       jsc.checkpoint("/checkpoint");

       /*
        * 由于ReceiverLauncher.launch的返回值为JavaDStream<MessageAndMetadata>类型的,
        * 而我们现在所关心的是消息中的数据,所以直接调用了 MessageAndMetadata中的
        * getPayload()方法并构造为 String类型的
        * 而MessageAndMetadata中包含了很多有用的内容,例如:consumer,topic,partition
        * ,offset,key,payload,而具体的含义从名称上就可以看出来了。
        */
        JavaDStream<String> lines = ReceiverLauncher.launch(jsc, props,3, StorageLevel.MEMORY_ONLY()).map(new Function<consumer.kafka.MessageAndMetadata, String>() {

            @Override
            public String call(consumer.kafka.MessageAndMetadata v1) throws Exception {
                return new String( v1.getPayload());
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    public Iterable<String> call(
                           String event)
                            throws Exception {
                        return Arrays.asList(event);
                    }
                });

        JavaPairDStream<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {

                    public Tuple2<String, Integer> call(
                            String word) throws Exception {
                        return new Tuple2<String, Integer>(
                                word, 1);
                    }
                });

        JavaPairDStream<String, Integer> wordsCount = pairs
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2)
                            throws Exception {
                        return v1 + v2;
                    }
                });

        wordsCount.print();

        return jsc;
    }

    public static void main(String[] args)  throws Exception{
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
              return createContext();
            }
          };

        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate("/checkpoint", factory);

        jsc.start();

        jsc.awaitTermination();
        jsc.close();
    }

}

2.3、准备测试环境。
当前 kafka_direct topic 中的各个partition的offset信息:
这里写图片描述

当前ZooKeeper中的path信息:
这里写图片描述
从截图中可以看到 程序中使用到的 /kafka_spark_consumer 路径目前还不存在。

2.4、运行Spark Streaming 程序,观察命令打印及Zookeeper中的变化:
Spark Streaming 程序的输出:
这里写图片描述
从输出中可以看到,它将我之前的测试数据一并打印了,非常符合程序中的设定 consumer.forcefromstart=true 的参数。

Zookeeper中的变化:
这里写图片描述
从截图中可以看到,已经将 kafka_direct 中的每个 partition 的offset 保存到了Zookeeper中了,其中最末节点的内容如下:
这里写图片描述
这个截图中的offset的值是40,正好与准备数据中的Kafka Manager中关于每个partition的offset的值是一样的。

2.5、删除checkpoint中的数据,并向kafka_direct中增加了四条消息,如下图:
这里写图片描述

2.6、再次运程Spark Streaming ,看其是否输出了4个单词的统计结果,如下图:
这里写图片描述
完全没有问题!!!

2016-05-15 23:45:15 T1DMzks 阅读数 1635
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1710 人正在学习 去看看 李飞
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;
 
import scala.Tuple2;
 
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
 
/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */
 
public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
 
  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }
 
    StreamingExamples.setStreamingLogLevels();
 
    String brokers = args[0];
    String topics = args[1];
 
    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
      JavaStreamingContext jssc;
     jssc = new (sparkConf, Durations.seconds(2));
 
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
 
    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );
 
    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    wordCounts.print();
 
    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}

 

2019-03-14 18:05:21 zimiao552147572 阅读数 537
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1710 人正在学习 去看看 李飞

 


KafkaUtils.createDirectStream方式(推荐使用createDirectStream,不推荐使用createDstream)
        1.createDirectStream方式不同于createDstream的Receiver接收数据的方式,createDirectStream定期地从kafka的topic下对应的partition中查询最新的偏移量,
          再根据偏移量范围在每个batch(批次)里面处理数据,Spark通过调用kafka简单的消费者Api(低级api)读取一定范围的数据。
        2.createDirectStream方式相比基于Receiver的createDstream方式有几个优点: 
            1.简化并行
                不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka中的topic分区数相同的rdd的分区数,
                而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的topic分区数是一一对应的关系。
            2.高效     
                createDstream方式实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,
                第一次是接受kafka中topic的数据,另一次是写到WAL中。而不使用createDstream的receiver的这种方式则消除了这个问题。 
            3.恰好一次语义(Exactly-once-semantics,即EOS)
                createDstream方式的Receiver读取kafka数据是通过 kafka的高级api 把偏移量写入zookeeper中,
                虽然这种方法可以通过把数据保存在WAL中保证数据不丢失,但是可能会因为Spark Streaming和zookeeper中保存的偏移量不一致
                而导致数据被消费了多次。
                createDirectStream方式的恰好一次语义(Exactly-once-semantics,即EOS) 则通过实现 kafka低级api,
                topic中的偏移量仅仅被StreamingContext保存在客户端程序的checkpoint目录中,topic中的偏移量则不再保存在zookeeper中,
                那么程序只需要到checkpoint目录中读取topic中的偏移量即可,便从而消除了zookeeper和StreamingContext中偏移量不一致的问题,
                但是同时也造成无法使用基于zookeeper的kafka监控工具来读取到topic的相关信息。

        3.编写Spark Streaming应用程序(createDirectStream方式的版本)
            package cn.itcast.dstream.kafka
            import kafka.serializer.StringDecoder
            import org.apache.spark.{SparkConf, SparkContext}
            import org.apache.spark.streaming.{Seconds, StreamingContext}
            import org.apache.spark.streaming.dstream.{DStream, InputDStream}
            import org.apache.spark.streaming.kafka.KafkaUtils

            // 利用sparkStreaming对接kafka实现单词计数:createDirectStream 采用Direct(低级API)
            object SparkStreamingKafka_Direct 
            {
                    def main(args: Array[String]): Unit = 
                {
                          //1、创建sparkConf。setMaster("local[2]"):本地模式运行 
                    //setMaster("local[2]"):本地模式运行,启动两个线程
                    //设置master的地址local[N] ,n必须大于1,其中1个线程负责去接受数据,另一线程负责处理接受到的数据
                            val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingKafka_Direct").setMaster("local[2]")
                            //2、创建sparkContext
                            val sc = new SparkContext(sparkConf)
                    //设置日志输出的级别
                            sc.setLogLevel("WARN")
                        //3、构建StreamingContext对象,每个批处理的时间间隔,即以多少秒内的数据划分为一个批次 ,当前设置 以5秒内的数据 划分为一个批次  
                            val ssc = new StreamingContext(sc, Seconds(5))
                    //createDirectStream采用低级API的关系,topic中的偏移量仅仅被StreamingContext保存在客户端程序的checkpoint目录中,
                       //topic中的偏移量则不再保存在zookeeper中。此处设置checkpoint路径,当前项目下有一个Kafka_Direct目录
                          ssc.checkpoint("./Kafka_Direct")
                            //4、配置kafka相关参数:配置每个kafka服务器的IP:端口,还有"group.id"(消费者组groupId)
                    //配置group.id:多个Consumer的group.id都相同的话,表示多个Consumer都在同一个消费组group中
                            val kafkaParams=Map("metadata.broker.list"->"NODE1:9092,NODE2:9092,NODE3:9092","group.id"->"Kafka_Direct")
                            //5、定义topic的主题名。
                    //此处没使用topics.split("分隔符").toSet。topic中数据还可设置以分隔符进行切割,进而切割出每行数据
                            val topics=Set("kafka_spark")
                            //6、通过 KafkaUtils.createDirectStream 接收 kafka数据,这里采用是kafka低级api偏移量不受zookeeper管理
                            val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
                            //7、获取kafka中topic中的数据
                    //获取元组中的值格式:元组名._下标。
                         //map(_._2):表示map遍历出的每个元素是元祖的同时,并获取元祖中下标为2的值,下标从1开始。
                         //partition.foreach(tuple=>{tuple._2} 和 map(_._2) 均都为“元组名._下标值2”,最终取出的数据才为真正要进行处理的数据。
                              val topicData: DStream[String] = dstream.map(_._2)
                            //8、切分每一行,每个单词计为1
                    //flatMap(_.split(" ")) 流的扁平化,最终输出的数据类型为一维数组Array[String],所有单词都被分割出来作为一个元素存储到同一个一维数组Array[String]
                    //map((_,1))每个单词记为1,即(单词,1),表示每个单词封装为一个元祖,其key为单词,value为1,返回MapPartitionRDD数据
                            val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
                            //9、相同单词出现的次数累加
                            //分组聚合:reduceByKey(_+_) 相同单词出现的次数累加,表示对相同的key(单词)对应的value进行累加计算 
                            val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
                            //10、打印输出
                            result.print()
                            //开启计算
                            ssc.start()
                            ssc.awaitTermination()
                        }
            }

        4.查看对应的效果
            1.向topic中添加数据,通过shell命令向topic发送消息
                cd /root/kafka
                格式:bin/kafka-console-producer.sh --broker-list kafka的IP:9092 --topic 主题名
                例子:bin/kafka-console-producer.sh --broker-list NODE1:9092 --topic kafka_spark


            2.查看控制台的输出:

 

============================================================

spark streaming从kafka获取数据,计算处理后存储到redis


因为数据要存入redis中,获取redis客户端代码如下
package com.fan.spark.stream  
   
import org.apache.commons.pool2.impl.GenericObjectPoolConfig  
import redis.clients.jedis.JedisPool  
   
/** 
  * Created by http://www.fanlegefan.com on 17-7-21. 
  */  
object RedisClient {  
  val redisHost = "127.0.0.1"  
  val redisPort = 6379  
  val redisTimeout = 30000  
   
  lazy val pool = newJedisPool(newGenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)  
  lazy val hook = newThread {  
    override def run = {  
      println("Execute hook thread: " + this)  
      pool.destroy()  
    }  
  }  
   
  sys.addShutdownHook(hook.run)  
}  


实时计算代码如下
package com.fan.spark.stream  
   
import java.text.SimpleDateFormat  
import java.util.Date  
   
import kafka.serializer.StringDecoder  
import org.apache.spark.streaming.kafka.KafkaUtils  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.{SparkConf, SparkContext}  
   
/** 
  * Created by http://www.fanlegefan.com on 17-7-21. 
  */  
object UserActionStreaming 
{     
  def main(args: Array[String]): Unit = 
  {  
    val df = newSimpleDateFormat("yyyyMMdd")  
    //设置master的地址local[N] ,n必须大于1,其中1个线程负责去接受数据,另一线程负责处理接受到的数据
    val sparkConf = newSparkConf().setAppName("pvuv").setMaster("local[3]")  
   
    val sc = newSparkContext(sparkConf)  
    //构建StreamingContext对象,每个批处理的时间间隔,即以多少秒内的数据划分为一个批次 ,当前设置 以 1 秒内的数据 划分为一个批次,每一个batch(批次)就构成一个RDD数据集
    //DStream就是一个个batch(批次)的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集
    val ssc = newStreamingContext(sc, Seconds(10))  
    ssc.checkpoint("/home/work/IdeaProjects/sparklearn/checkpoint")  
   
    val group = "test"  
    val topics = "test"  
    val topicSets = topics.split(",").toSet  //topic中的数据以","作为分隔符,即相当于把","作为回车换行的标志,进而切割出每行数据
    val kafkaParams = Map[String, String](  
      "metadata.broker.list"-> "localhost:9092",  
      "group.id"-> group  
    )  

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSets)  
    //1.sparkStreaming将会创建和kafka中的topic分区数相同的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的topic分区数是一一对应的关系。
    //2.如果kafka创建topic的命令中指定分区数为3个的话,那么就会有对应的3个RDD,所以必须使用stream.foreachRDD遍历每个RDD。
    //  同时每个RDD中又会有多个partition分区,所以必须使用rdd.foreachPartition遍历RDD中的每个partition分区。
    //  而每个partition分区中存储有topic中的数据,因此需要使用partition.foreach遍历每个partition分区中的多个元祖tuple,
    //  而每个元祖tuple封装一条数据,因此使用tuple._2 即通过“元组名._下标值2”的方式才能获取topic中真正的每行数据。
    //3.如果kafka创建topic的命令中不指定分区数的话,那么使用 stream.map(_._2) 即通过“元组名._下标值2”的方式便能获取topic中存储的所有数据。
    stream.foreachRDD(rdd=>rdd.foreachPartition(partition=>
    {  
      val jedis = RedisClient.pool.getResource  
      partition.foreach(tuple=>
      {  
        //获取元组中的值格式:元组名._下标。
        //map(_._2):表示map遍历出的每个元素是元祖的同时,并获取元祖中下标为2的值,下标从1开始。
        //partition.foreach(tuple=>{tuple._2} 和 map(_._2) 均都为“元组名._下标值2”,最终取出的数据才为真正要进行处理的数据。
        //因为使用topics.split(",").toSet,即topic中的数据把","分隔符作为回车换行的标志,所以此处获取出的数据为一行数据
        val line = tuple._2  
        //对一行数据以","分隔符进行切割,一行数据被分为4个小字符串并封装到字符串数组中
        val arr = line.split(",")  
        val user = arr(0)  //字符串数组(下标):取出一行字符串数据中的第一个被分割出来的小字符串
        val page = arr(1)  
        val money = arr(2)  
        val day = df.format(newDate(arr(3).toLong))  
        //uv  
        jedis.pfadd(day  + "_"+ page , user)  
        //pv  
        jedis.hincrBy(day+"_pv", page, 1)  
        //sum  
        jedis.hincrByFloat(day+"_sum", page, money.toDouble)  
      })  
    }))  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  

 

2017-06-15 15:13:24 tlqfreedom 阅读数 4912
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1710 人正在学习 去看看 李飞
如下是pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.demo</groupId>
	<artifactId>spark-streaming-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spark-streaming-demo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<spark.version>1.6.2</spark.version>
		<mysql-connector.version>5.1.35</mysql-connector.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql-connector.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid</artifactId>
			<version>1.0.31</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.stratio.datasource</groupId>
			<artifactId>spark-mongodb_2.11</artifactId>
			<version>0.12.0</version>
		</dependency>
	</dependencies>
</project>


代码如下:

package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkstreamingOnDirectKafka {
	public static JavaStreamingContext createContext() throws Exception {
		SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
		// jsc.checkpoint("/user/tenglq/checkpoint");

		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
		kafkaParams.put("auto.offset.reset", "smallest");
		Set<String> topics = new HashSet<String>();
		topics.add("tlqtest3");

		final Map<String, String> params = new HashMap<String, String>();
		params.put("driverClassName", "com.mysql.jdbc.Driver");
		params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
		params.put("username", "root");
		params.put("password", "root123456");

		Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
		DataSource ds = DruidDataSourceFactory.createDataSource(params);
		Connection conn = ds.getConnection();
		Statement stmt = conn.createStatement();
		ResultSet rs = stmt.executeQuery("SELECT topic,partition,offset FROM kafka_offsets WHERE topic = 'tlqtest3'");
		while (rs.next()) {
			TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
			offsets.put(topicAndPartition, rs.getLong(3));
		}

		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

		JavaDStream<String> lines = null;

		if (offsets.isEmpty()) {
			JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
					String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
			lines = pairDstream
					.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
						private static final long serialVersionUID = 1L;

						public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
							OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
							offsetRanges.set(offsets);
							return rdd;
						}
					}).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
						private static final long serialVersionUID = 1L;

						public Iterable<String> call(Tuple2<String, String> t) throws Exception {
							return Arrays.asList(t._2);
						}
					});
		} else {
			JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
					StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
					new Function<MessageAndMetadata<String, String>, String>() {

						private static final long serialVersionUID = 1L;

						public String call(MessageAndMetadata<String, String> v1) throws Exception {
							return v1.message();
						}
					});
			lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
				private static final long serialVersionUID = 1L;

				public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
					OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
					offsetRanges.set(offsets);
					return rdd;
				}
			});

		}

		lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;

			public void call(JavaRDD<String> rdd) throws Exception {
				// 操作rdd
				List<String> map = rdd.collect();
				String[] array = new String[map.size()];
				System.arraycopy(map.toArray(new String[map.size()]), 0, array, 0, map.size());
				List<String> l = Arrays.asList(array);
				Collections.sort(l);
				for (String value : l) {
					System.out.println(value);
				}

				// 保存offset
				DataSource ds = DruidDataSourceFactory.createDataSource(params);
				Connection conn = ds.getConnection();
				Statement stmt = conn.createStatement();
				for (OffsetRange offsetRange : offsetRanges.get()) {
					ResultSet rs = stmt.executeQuery("select count(1) from kafka_offsets where topic='"
							+ offsetRange.topic() + "' and partition='" + offsetRange.partition() + "'");
					if (rs.next()) {
						int count = rs.getInt(1);
						if (count > 0) {
							stmt.executeUpdate("update kafka_offsets set offset ='" + offsetRange.untilOffset()
									+ "'  where topic='" + offsetRange.topic() + "' and partition='"
									+ offsetRange.partition() + "'");
						} else {
							stmt.execute("insert into kafka_offsets(topic,partition,offset) values('"
									+ offsetRange.topic() + "','" + offsetRange.partition() + "','"
									+ offsetRange.untilOffset() + "')");
						}
					}

					rs.close();
				}

				stmt.close();
				conn.close();
			}

		});

		return jsc;
	}

	public static void main(String[] args) {
		JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
			public JavaStreamingContext create() {
				try {
					return createContext();
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
			}
		};

		// JavaStreamingContext jsc =
		// JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);

		JavaStreamingContext jsc = factory.create();

		jsc.start();

		jsc.awaitTermination();
		jsc.close();

	}
}


kafka-spark-hive

阅读数 61

没有更多推荐了,返回首页