2018-01-16 09:52:40 pengchengqing 阅读数 1206
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2456 人正在学习 去看看 肖滨

Kafka 0.10 与 Spark Streaming 流集成在设计上与0.8 Direct Stream 方法类似。它提供了简单的并行性,Kafka分区和Spark分区之间的1:1对应,以及对偏移量和元数据的访问。然而,由于新的集成使用了新的  Kafka consumer API 而不是简单的API,所以在使用方面有显著的差异。这个版本的集成被标记为实验性的,因此API有可能发生变化。

链接:

 对于使用 SBT/maven 定义的 Scala/Java 项目,使用如下的依赖:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.2.1
不要手动的添加org.apache.kafka 依赖(比如, kafka-clients), spark-streaming-kafka-0-10  已经有了合适的过渡的依赖,不同版本之间的不兼容问题是很难处理的。

Creating a Direct Stream

Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

流中的每一项都是一个 ConsumerRecord

参考kafka的消费者的配置文档可以看到更多的参数。如果你的Spark 批量的执行时间 比默认的 Kafka的 心跳 会话超时时间大,需要适当的增加 heartbeat.interval.ms 和session.timeout.ms,对于超过5分钟的,batch处理,需要需改broker上的 group.max.session.timeout.ms。注意,例子中设置enable.auto.commit 为 false,详细讨论参考下面的 Offset 存储。

LocationStrategies
新的Kafka consumer API  将会提前获取消息到缓存中。因而这个对于性能是非常重要的,因为 Spark的集成 在executors 保持缓存的consumers 而不是在每个batch上重新创建,并且优先在consumer的本机上调度分区。
在大多数情况下,你应该使用 Locationstrategies.PreferConsistent,如上所示。这将会使得分区平均分不到executors上。如果你的executors 和你的 kafka brokers 在在同一个主机上,使用 PreferBrokers,这种方式 会优先调度 Kafka leader 上的分区。如果你在不同的分区上有一个严重的倾斜,使用PreferFixed。这种方式使得你可以指定一个明确的分区和主机的映射关系(任何未指定的分区将使用一致的位置)。
用户缓存的默认最大大小为64。  如果你希望处理超过 kafka分区数(64* executors 的数量),你可以通过修改
spark.streaming.kafka.consumer.cache.maxCapacity。
如果你希望关闭 Kafka consumers的缓存,你可以设置spark.streaming.kafka.consumer.cache.enabled 为false。
关闭需要解决 SPARK-19185描述的问题。这个属性将会在以后的版本中删除, SPARK-19185。

缓存的key是 按照 topicpartition和group.id,所以每一个createDirectStream需要一个单独的group.id

ConsumerStrategies

新的Kafka consumer API  对特定的topics 有不同的方式,其中的一些需要大量的object实例化设置,消费策略提供了一个抽象,使得Spark 能够获得在checkpoint重启的时候正确配置。

上述的ConsumerStrategies.Subscribe,能够订阅一个固定的topics的集合。SubscribePattern 能够根据你感兴趣的topics进行匹配。需要注意的是,不同于 0.8的集成, 使用subscribe or SubscribePattern 可以支持在运行的streaming中增加分区。Assign 可以指定一个固定的分区集合。这三种策略都有重载构造函数,允许您指定特定分区的起始偏移量。如果你有特定的消费需求,设置不符合上面的选项,consumerstrategy是一个公共类,您可以扩展。

Creating an RDD

如果你有一个适用于batch处理的场景,你可以为一个特定的 offset 范围创建一个 RDD。
// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
需要注意的是,你不能使用PreferBrokers,因为没有Stream,就没有一个driver上的消费者呢能够自动的查找broker的元数据。如果需要的话,使用PreferFixed来实现元数据的查找。

Obtaining Offsets 获得偏移量

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
注意:HasOffsetRanges 的转化只会在createDirectStream结果的第一个方法中执行成功,后续方法中则不行。要知道,RDD分区和Kafka分区的一对一的映射关系在任何的shuffer和repartion(比如,reduceBykey 或者 window())操作后都无法保持。

Storing Offsets 保存偏移量

Kafka 在失败的时的传输语义 取决于如何和什么时候 存储偏移量。 Spark 输出的操作是 at-least-once。因此,如果想要达到 exactly-once的语义要求,你要么存储在一个幂等操作后存储offsets,或者在一个原子的事物内存储便宜量。为了增加可好性 ,如何存储offsets,这里你有三个选择:checkPoints,Kafka 本身,自己的数据存储

Checkpoints

如果开启了Spark checkpointing, offsets 可以存储在checkpoint上,实现很简单,但是有缺点。你的输出必须是幂等的,因为会输出重复的结果;事物不可行。另外,如果应用的代码改变了,是不能从chekpoint恢复的。对于有假话升级的话,需要和老的代码一起运行新的代码(由于 输出是幂等的,所以不能有冲突),但是对于一些意外的需要修改代码的失败,除非有一个其他的方式知道如何开始消费offset,不然就会丢数据。

Kafka itself

Kafka 有一个offset 提交的api, 能够存储特定kafkatopic的offsets。默认情况下,新的消费者会定期自动提交offsets,
















2016-03-06 16:34:01 zhong_han_jun 阅读数 17317
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2456 人正在学习 去看看 肖滨

开发环境

windows7 64、intellij idea 14.1.5、spark-1.5.2、scala 2.0.4、java1.7、maven3.05

将spark中的assembly包引入即可使用local模式运行相关的scala任务,注意不要使用scala2.11,非要使用的话先用这个版本的scala编译一遍spark哈

代码部分

pom文件

先附上pom.xml中的jar包依赖部分

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.10.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.10.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>

        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.29</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.5.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>com.jolbox</groupId>
            <artifactId>bonecp</artifactId>
            <version>0.8.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.2-1002.jdbc4</version>
        </dependency>

    </dependencies>

mysql连接池

连接池,利用boncp创建连接池供使用,引用自网上的代码,详细可看参考资料

/**
 * Created by Administrator on 2016/2/29.
 * 参考资料 http://www.myexception.cn/mysql/1934808.html
 */
import java.sql.{Connection, ResultSet}
import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

object ConnectionPool {

  val logger = LoggerFactory.getLogger(this.getClass)
  private val connectionPool = {
    try{
      Class.forName("com.mysql.jdbc.Driver")
      val config = new BoneCPConfig()
      config.setJdbcUrl("jdbc:mysql://localhost:3306/test")
      config.setUsername("etl")
      config.setPassword("xxxxx")
      config.setLazyInit(true)

      config.setMinConnectionsPerPartition(3)
      config.setMaxConnectionsPerPartition(5)
      config.setPartitionCount(5)
      config.setCloseConnectionWatch(true)
      config.setLogStatementsEnabled(false)

      Some(new BoneCP(config))
    } catch {
      case exception:Exception=>
        logger.warn("Error in creation of connection pool"+exception.printStackTrace())
        None
    }
  }
  def getConnection:Option[Connection] ={
    connectionPool match {
      case Some(connPool) => Some(connPool.getConnection)
      case None => None
    }
  }
  def closeConnection(connection:Connection): Unit = {
    if(!connection.isClosed) {
      connection.close()

    }
  }
}

streaming处理程序

ssc程序,数据示例如下,删除了一些关键数据(不影响此次处理),__开头的key为系统自带属性.

__clientip=10.10.9.153&paymentstatus=0&__opip=&memberid=89385239&iamount=1&itype=16&oper_res=1&channeltype=8&__timestamp=1457252427&productid=112&selectbank=&icount=0&ordersrc=web&paymentip=61.159.104.134&orderdate=2016-03-06 16:19:55&subjecttype=zheanaiMessenger&oper_type=1&paydate=&orderamount=259.0&paymentchannel=16&oper_time=2016-03-06 16:20:27&orderid=127145727&iunit=month&bussinessid=80125727&isuse=0
__clientip=10.10.9.175&paymentstatus=0&__opip=&memberid=89378034&iamount=12&itype=17&oper_res=1&channeltype=75&__timestamp=1457252429&productid=124&selectbank=&icount=0&ordersrc=100&paymentip=59.37.137.119&orderdate=2016-03-06 16:20:29&subjecttype=zheanaiMessenger&oper_type=0&paydate=&orderamount=388.0&paymentchannel=1028&oper_time=2016-03-06 16:20:29&orderid=127145736&iunit=month&bussinessid=8012580&isuse=0
__clientip=10.10.9.153&paymentstatus=0&__opip=&memberid=75372899&iamount=12&itype=16&oper_res=1&channeltype=&__timestamp=1457252286&productid=131&selectbank=&icount=0&ordersrc=web&paymentip=113.226.244.206&orderdate=2016-03-06 16:18:06&subjecttype=zheanaiMessenger&oper_type=0&paydate=&orderamount=99.0&paymentchannel=307&oper_time=2016-03-06 16:18:06&orderid=127145700&iunit=month&bussinessid=80125477&isuse=0
__clientip=10.10.9.175&paymentstatus=0&__opip=&memberid=87634711&iamount=1&itype=16&oper_res=1&channeltype=8&__timestamp=1457252432&productid=129&selectbank=&icount=0&ordersrc=web&paymentip=114.246.35.251&orderdate=2016-03-06 16:19:05&subjecttype=zheanaiMessenger&oper_type=1&paydate=&orderamount=19.0&paymentchannel=16&oper_time=2016-03-06 16:20:32&orderid=127145713&iunit=month&bussinessid=66213022&isuse=0
__clientip=10.10.9.153&paymentstatus=0&__opip=&memberid=89172717&iamount=12&itype=17&oper_res=1&channeltype=77&__timestamp=1457252371&productid=124&selectbank=&icount=0&ordersrc=4&paymentip=111.126.43.83&orderdate=2016-03-06 16:19:31&subjecttype=zheanaiMessenger&oper_type=0&paydate=&orderamount=388.0&paymentchannel=1116&oper_time=2016-03-06 16:19:31&orderid=127145723&iunit=month&bussinessid=8012568&isuse=0

主要操作如下
读取,ssc自带的receiver
解析(valueSplit方法 处理成kv格式)
过滤filterRegex,类似sql中的where条件放弃一些不需要的数据,比如只需要买单的数据而不要下单数据
转换,getPlatform、getFormatDate,类似case when
创建了一个class命名为result,重写了toString方法。该class存放从kafka中处理后的所有需要的数据字段。
写入mysql,insertIntoMySQL,方法在每个partition中调用
另外代码中使用了getOrCreate以便恢复,利用了计数器简单统计了一下有效记录数
代码如下

import java.text.SimpleDateFormat
import java.util.Date

import com.zhenai.SqlConnection.ConnectionPool
import java.sql.Connection

import org.apache.log4j.PropertyConfigurator
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Time,Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

import scala.collection.mutable.Map

/**
 * Created by Administrator on 2016/2/25.
 */

object KafkaStreaming {

  val logger = LoggerFactory.getLogger(this.getClass)
  PropertyConfigurator.configure(System.getProperty("user.dir")+"\\src\\log4j.properties")

  case class result(ftime:String,hour:String,orderid:Long,memberid:Long,platform:String,iamount:Double,orderamount:Double)extends Serializable{
    override  def toString: String="%s\t%s\t%d\t%d\t%s\t%.2f\t%.2f".format(ftime, hour,orderid,memberid,platform,iamount,orderamount)
  }


  def getFormatDate(date:Date,format:SimpleDateFormat): String ={
    format.format(date)
  }
  def stringFormatTime(time:String,simpleformat:SimpleDateFormat): Date ={
    simpleformat.parse(time)
  }

  // kafka中的value解析为Map
  def valueSplit(value:String): Map[String,String] ={
    val x = value.split("&")
    val valueMap:Map[String,String] = Map()
    x.foreach { kvs =>
      if (!kvs.startsWith("__")){
        val kv = kvs.split("=")
        if (kv.length==2) {
          valueMap += (kv(0) -> kv(1))
        }
      }

    }
    valueMap
  }

  // 实现类似where的条件,tips:优先过滤条件大的减少后续操作
  def filterRegex(map:Map[String,String]): Boolean ={
    //过滤操作类型,控制为支付操作
    val oper_type = map.getOrElse("oper_type","-1")
    if(!oper_type.equals("2") && !oper_type.equals("3"))
      return false
    // 过滤未支付成功记录
    if(!map.getOrElse("paymentstatus","0").equals("1"))
      return false
    // 过滤无效支付ip
    val paymentip =  map.getOrElse("paymentip",null)
    if (paymentip.startsWith("10.10")||paymentip.startsWith("183.62.134")||paymentip.contains("127.0.0.1"))
      return false
    return true
  }
  // 实现类似 case when的方法,上报的p字段不一定为数值
  def getPlatform(p:String,x:Int): String ={
    val platformname = (p,x) match{
     case (p,x) if(Array[String]("1","2","3").contains(p)) => "wap"
      case (p,x) if(Array[String]("4","8").contains(p)&& x!=18) =>"andriod"
      case (p,x) if((Array[String]("5","7","51","100").contains(p))&&(p!=18)) => "ios"
      case _ => "pc"
    }
    platformname
  }
// 数据库写入
  def insertIntoMySQL(con:Connection,sql:String,data:result): Unit ={
    // println(data.toString)
    try {
      val ps = con.prepareStatement(sql)
      ps.setString(1, data.ftime)
      ps.setString(2, data.hour)
      ps.setLong(3,data.orderid)
      ps.setLong(4, data.memberid)
      ps.setString(5, data.platform)
      ps.setDouble(6, data.iamount)
      ps.setDouble(7, data.orderamount)
      ps.executeUpdate()
      ps.close()

    }catch{
      case exception:Exception=>
        logger.error("Error in execution of query "+exception.getMessage+"\n-----------------------\n"+exception.printStackTrace()+"\n-----------------------------")
    }
  }
  def createContext(zkqurm:String,topic:scala.Predef.Map[String,Int],checkPointDir:String): StreamingContext ={


    val simpleformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    val timeFormat = new SimpleDateFormat("HH:mm")

    val sql ="insert into t_ssc_toufang_result_mi(ftime,hour,orderid,memberid,platform,iamount,orderamount) values(?,?,?,?,?,?,?);"


    val conf = new SparkConf()
    conf.setAppName("Scala Streaming read kafka")
    // VM option -Dspark.master=local
    // conf.setMaster("local[4]")
    val sc = new SparkContext(conf)

    val totalcounts = sc.accumulator(0L,"Total count")

    val ssc =  new StreamingContext(sc,Seconds(60))
    //ssc.checkpoint(checkPointDir)
    //统计各平台最近一分钟实时注册收入 时间段,平台,金额,订单数
    val lines = KafkaUtils.createStream(ssc, zkqurm, "mytopic_local",topic).map(_._2)

    val filterRecord = lines.filter(x => !x.isEmpty).map(valueSplit).filter(filterRegex).map{x =>
      val orderdate = stringFormatTime(x.getOrElse("orderdate",null),simpleformat)
      val day = getFormatDate(orderdate,dateFormat)
      val hour = getFormatDate(orderdate,timeFormat)
      var orderamount = x.getOrElse("orderamount","0").toDouble
      if (x.getOrElse("oper_type",-1)==3)
        orderamount = -1*orderamount
      val res = new result(
        day
        ,hour
        ,x.getOrElse("orderid",null).toLong
        ,x.getOrElse("memberid",null).toLong
        ,getPlatform(x.getOrElse("ordersrc",null),x.getOrElse("itype",null).toInt)
        ,x.getOrElse("iamount","0").toDouble
        ,orderamount
      )
      res
    }

    filterRecord.foreachRDD((x: RDD[result],time: Time) =>{
      if(!x.isEmpty()) {
        // 打印一下这一批batch的处理时间段以及累计的有效记录数(不含档次)
        println("--"+new DateTime(time.milliseconds).toString("yyyy-MM-dd HH:mm:ss")+"--totalcounts:"+totalcounts.value+"-----")
        x.foreachPartition{res =>
          {
            if(!res.isEmpty){
              val connection = ConnectionPool.getConnection.getOrElse(null)
              res.foreach {
                      r: result =>totalcounts.add(1L)
                        insertIntoMySQL(connection, sql, r)
              }
              ConnectionPool.closeConnection(connection)
            }
          }
        }
      }
    })

    ssc
  }
// =================================================================

  def  main(args:Array[String]): Unit ={
    val zkqurm = "10.10.10.177:2181,10.10.10.175:2181,10.10.10.179:2181"

    val topic = scala.Predef.Map("t_fw_00015"->30)
    val checkPointDir ="/user/root/sparkcheck"
    val ssc = StreamingContext.getOrCreate(checkPointDir,
      () => {
        createContext(zkqurm, topic,checkPointDir)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

补充log4j.propertites文件代码

log4j.rootLogger=WARN,stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=mapreduce_test.log
log4j.appender.R.MaxFileSize=10MB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
log4j.logger.com.codefutures=WARN

打包成jar运行

maven

由于是在idea intellij上配置的maven工程,直接使用maven打包了

build artifacts,操作流程如下

配置好要打包的类:   file -> project structure -> artifacts,操作如下

111
选择main方法,点击确认,结果如下
2222
发现结果包名为自己的项目名称,这里可以调整,选中jar一行右键重命名;引入了所有的类,选中全部右键remove掉,当然也可以根据需要选择保留需要的依赖,这里避免最终jar过大选择全部删掉
333
结果如下,注意有个输出路径,build完之后在这个路径下找到jar包
4
点击确认返回界面,左上的导航栏选择 build ->build artifacts ,选择刚刚配置的名字KafkaSsc:jar进行build
这里写图片描述
这里写图片描述
完成之后可到输出路径下发现有个jar包KafkaStreaming.jar,可以使用winrar打开看看里面都写了什么

运行

上传到linux下进行运行,指定两个jar包使用local模式,命令如下    
spark-submit --master local[4]  \
--class "com.xxxx.streaming.KafkaStreaming" \
--jars spark-streaming-kafka-assembly_2.10-1.5.0.jar,mysql-connector-java-5.1.29.jar \
KafkaStreaming.jar

运行情况如下,错误信息为log4j.properties未设置好,这个代码注释掉或者修改路径即可
这里写图片描述

一些个人想法

以上代码的不足
1、未考虑写入mysql失败的情况,处理方法:可以在insertIntoMySQL中做容错处理,这个不行我写到另一个地方存放起来,后台再起一个线程定时把这些数据写到目标mysql中去
2、代码可以更简洁一点、mysql、zk等参数应当支持配置文件处理
3、读取kafka数据建议KafkaUtils.createStream改为低阶api的实现KafkaUtils.createDirectStream去读取。具体可自行查询或者查看我的下一篇博客对这两个的一些总结

关于使用连接池说明:
1、不希望每写一条记录都创建一个连接,资源消耗大
2、分布式集群中,我们不知道这个数据记录会在那一台机,但是可以知道的是至少每一个partition里所有数据都是在一台机的
3、针对每个parition创建连接相对来说也很耗费资源,在处理时间段内整个ssc是大量sc的job组成的对rdd处理队列,存在多个mysql的长连接是必要的
为何不返回到driver端来执行呢?
个人认为在数据量不大的情况下这是可行的,完全可以返回所有的数据,并且批量写入mysql;但是如果数据量大的话很影响效率

关于实时业务场景的使用
细心的同学可能已经发现,在上面的ssc程序中并没有使用reduce之类的聚合操作,这个其实关系到一个业务场景,这个数据出来主要是用于报表的;首先这个数据量实在是很小,每分钟有效的不超过10条记录(实际上可以直接mysql搞掂,当然并没有这样干);另外针对收入这一块,运营人员可能有很多维度需要查询,而且需求是变动的,这个时候数据还是尽量明细保留下来的好,避免需求变动带来的频繁修改代码。

以上,如有不足欢迎留言讨论

错误说明

错误一、scala版本错误
Exception in thread “main” java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
解决方法:改为使用2.10版本

错误二、客户端调试ssc程序访问kafka异常
WARN - [mytopic_local_ZA6600-1456571483267-f287bec2-leader-finder-thread], Failed to find leader for Set([mytopic,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(mytopic)] from broker [ArrayBuffer(id:0,host:bj-230,port:9092)] failed
at kafka.client.ClientUtils.fetchTopicMetadata(ClientUtils.scala:72)atkafka.client.ClientUtils.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManagerLeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)atkafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)Causedby:java.nio.channels.ClosedChannelExceptionatkafka.network.BlockingChannel.send(BlockingChannel.scala:100)atkafka.producer.SyncProducer.liftedTree11(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
… 3 more

解决方案
仔细看错误返回 from broker [ArrayBuffer(id:0,host:bj-230,port:9092)] failed 这个访问的broker的host竟然是bj-230,是kafka服务所在的服务器,我客户端压根没有这个hostname的映射
这里把它直接修改为ip地址,打开server.properties,增加配置项advertised.host.name=10.10.10.230,重启kafka服务;另一种处理方法也可以在hosts中配置ip映射

2019-08-14 09:13:03 wutian713 阅读数 991
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2456 人正在学习 去看看 肖滨

spark-streaming-kafka-demo

使用Springboot框架,Sparkstreaming监听Kafka消息,Redis记录已读Kafka偏移量,Spark统计单词出现次数,最后写入Hive表。

代码参考:https://github.com/JunjianS/spark-streaming-kafka-demo

注意事项

  1. 版本信息
  • Kafka:2.12-2.3.0
  • Spark:1.6.0
  • Redis:4.x
  • Hadoop:2.6.0-cdh5.15.2
  1. 读取Kafka

    采用Direct方式:此方式不使用接收器接收数据,而是周期性查询Kafka中每个主题+分区中的最新偏移量,并相应地定义要在每批中处理的偏移量范围。

   JavaInputDStream<String> dStream = KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    String.class,
					          StringDecoder.class,
					          StringDecoder.class,
					          String.class,
					          kafkaParams,
					          offsets,
					          new MessageAndMeta());

正常处理RDD后,更新Offset到Redis

   // 更新offset
   for (OffsetRange offsetRange : offsetRanges.get()) {
		setOffsetToRedis(offsetRange);
   }

每次启动前从Redis获取上次读取的Offset

   // 获取消费kafka的offset
   getOffset(topicsSet);

处理数据的作业启动后,Kafka consumerAPI读取Kafka中定义的偏移量范围(类似于从文件系统读取文件),每60s读取一次Kafka消息,这个间隔时间建议根据kafka写入速度自行设置,Sparkstreaming的微批处理模式,Spark处理后每批写一个数据文件(直接写hdfs或者hive表),如果每批读取的文件太少,会造成大量小文件(大量小文件的问题请自行bing)。当消息为空时,Spark任务会生成空文件,为避免生成空文件,在操作前对RDD进行非空判断

  if (!rowRDD.isEmpty()) {
  //操作RDD
  }
  1. RDD操作

    示例是计算单词频次,Spark的transformation算子用到的function需要序列化,所以如果使用匿名类,那匿名类所在的宿主类也必须能序列化,所以示例中把flatMap、reduceByKey等算子的function单独定义了类,放在util目录下。

    spark任务默认会在Hive表数据文件目录生成staging文件,可以配置到统一目录定时清理

  hiveContext.sql("set hive.exec.stagingdir = /tmp/staging/.hive-staging")
  1. 运行
    • 本地运行
      按照Springboot的方式直接运行主类 SprakStreamingMain
    • 集群环境运行
      • 打包

        Spark不支持使用spring-boot-maven-plugin打包的springboot项目结构,所以本项目使用maven-shade-plugin插件打包成一个fat的jar包;因为集群中
        一般都有相关的jar包,所有Spark相关的jar包都不需要打进jar包,在pom中把scope设置成provided。

      • 运行

        有多种运行模式,

Master参数 含义
local 使用1个worker线程在本地运行Spark应用程序
local[K] 使用K个worker线程在本地运行Spark应用程序
local. 使用所有剩余worker线程在本地运行Spark应用程序
spark://HOST:PORT 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序
mesos://HOST:PORT 连接到Mesos集群,以便在该集群上运行Spark应用程序
yarn-client 以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行。
yarn-cluster 以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运行。

举个使用yarn集群的例子,通过参数properties-file指定springboot配置文件

spark-submit --master yarn-cluster  --num-executors 2 --driver-memory 128m --executor-memory 128m --executor-cores 2  --class com.sjj.SprakStreamingMain --properties-file /root/application.properties spark-demo-boot.jar
2018-12-02 15:35:39 qq_40825218 阅读数 236
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2456 人正在学习 去看看 肖滨

Spark Streaming with Kafka integration

  • 在这里我们开始介绍如何配置spark Streaming去接受来自kafka的数据,有两个方法能够做到:
    • 1.老方法,使用的是Receivers和kafka的高级API
    • 2.新方法,取消了使用Receivers
    • 他们拥有不同的编程模型,代表特征和保证语义,所以,阅读来获得更多的细节,两个方法都是被考虑经过现在spark版本的稳定API实现的

Approach 1:Receiver-based Approach(基于接收器的方法)

  • 这个方法是使用一个接收器来接受数据,这个接收器是使用kafka高级API的工具,对于所有的接收器来说,kafka的数据接受是通过一个数据接收器,此接收器是存在于spark Executor中的,之后的任务启动是通过spark Streaming来处理数据。
  • 然而在默认的配置下,此方法有可能会发生错误从而丢失数据(详情请看Receiver reliability),为确保数据不丢失,不得不另外添加一个Write-Ahead Log(WAL)在spark Streaming中,kafka中所有接收到的数据会同步存储在WAL分布式文件系统HDFS上 ,所以,数据发生错误的时候可被恢复
  • 该方式如何设置在流式应用中:
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum],[consumer group id],[per-topic number of Kafka partitions to consume]

需要注意的点

  • 1.在Kafka中的Topic Partitions不能与Spark Streaming中RDD产生的分区相关联,所以,增加topic特定的分区(KafkaUtils.createStream():仅仅能增加一个拥有简单接收器的consumer线程的数量),它并不能增加接受数据的并行度
  • 多个kafka输入DSstreams可以被不同的group和topic创建,为了使用多个Receiver来并行接受数据
  • 如果你启用WAL with 一个多副本文件系统如HDFS,这些已经接受到的数据将被复制在日志中,因此输入流的存储级别为存储级别StorageLevel

Apporach2:Direct Approach(No Receivers)无接收器

  • 这个新的缺少接受器(直连)的方式是在spark1.3版本发布的,为了保证更加健壮的端对端担保,取代了使用接收器接收数据,此方法周期性的查询在Kafka中每个topic+partition的最近offset,还包括定义了offset range同时执行每个批次,当任务开启,开始处理数据后,kafka的低阶API就开始读取来自kafka中被定义过的offset的范围(简单的读取文件从文件系统中)
    优点

  • 1.更加简明的并行机制:

    不需要创建多个kafka stream来输入和合并;通过directStream,spark Streaming可以创建一个和kafka中分区一致的多个RDD Partition来提供消费,此形式会并行读取kafka中的所有数据,所以这是一个一对一的映射关系

  • 2.更高效
    Receiver-based approach实现零数据丢失需要让数据存储在一个WAL中,是一种远程的数据复制。数据被有效的获取和复制了两次,一次是通过kafka,第二次是wal,所以是低效的方式;
    为了高效,取消了接收器,因此不要WAL。由于有一个充足的kafka保留区域,消息能够从kafka中```

  • 3.明确语义

    第一种方法使用kafka的高级API在zookeeper中存储消费后的offset,这是一个传统的方法去消费kafka中的数据,虽然这个方法能够保证数据零丢失,在发生错误后小概率许多记录被消费两次。此错误的发生时因为在spark streaming数据可靠接受和zookeeper跟踪的offset相互矛盾(数据不一致)

    所以在直连方式中我们使用底层kafka API来做跟踪offset,而不使用zookeeper跟踪offset;在直连方式中,offset的追踪是根据spark streaming的checkpoint,消除了在spark streaming和zookeeper/kafka之间的矛盾(数据不一致),所以,每一条记录只会被spark streaming有效读取一次。

    此方法的缺点是,不会更新zookeeper中的offset,因此以zookeeper作为基础的kafka镜像监控工具不会显示进度。但是,可以在每个批次中访问由此方法处理的偏移量,并自己更新zookeeper的offset

2019-01-08 09:46:36 xcg132566 阅读数 260
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2456 人正在学习 去看看 肖滨

kafka和spark总结

本文涉及到的技术版本号:

  • scala 2.11.8
  • kafka1.1.0
  • spark2.3.1

kafka简介

kafka是一个分布式流平台,流媒体平台有三个功能

  • 发布和订阅记录流
  • 以容错的持久化的方式存储记录流
  • 发生数据时对流进行处理

kafka通常用于两大类应用

  • 构件在系统或应用程序之间可靠获取数据的实时数据管道
  • 构件转换或响应数据流的实时流应用程序

kafka的几个概念

  • kafka运行在集群上,或一个或多个能跨越数据中心的服务器上
  • kafka集群上存储流记录的称为topic
  • kafka的topic里,每一条记录包括一个key、一个value和一个时间戳timestamp

kafka有四个核心API

  • Producer API

    生产者api允许应用程序发布一个记录流到一个或多个kafka的topic

  • Consumer API

    消费者api允许应用程序订阅一个或多个topic并且接受处理传送给消费者的数据流

  • Streams API

    流api允许应用程序作为一个流处理器,从一个或多个输入topic中消费输入流,并生产一个输出流到一个或多个输出topic中

  • Connector API

    连接器api允许构建和运行中的kafka的topic连接到现有的应用程序或数据系统中重用生产者或消费者。例如关系数据库的连接器可以捕获对表的每一个更改操作

kafka中的客户端和服务端之间是通过简单、高性能的语言无关的TCP协议完成的,该协议已经版本化并且高版本向低版本向后兼容。

topics

topic为kafka为记录流提供的核心抽象,类似于数据通道,并且topic是发布记录和订阅的核心。

kafka的topic是多用户的,一个topic可以有0个、1个或多个消费者订阅记录

对于每一个topic,kafka集群都维护了一个如下所示的分区记录:
topic

其中每一个分区都是有序的不可变的记录序列,并且新数据是不断的追加到结构化的记录中。分区中的记录每个都分配了一个offset作为ID,它唯一标识分区中的每个记录。

kafka集群默认是保存所有记录,无论是否被消费过,但是可以通过配置保留时间策略。例如如果设置数据保留策略为两天,则超过两天的数据将被丢弃释放空间。kafka的性能受数据大小影响不大,因此长时间的存储数据并不是太大的问题。

其中,kafka 的消费者唯一对topic中的每一个分区都可以设置偏移量offset,标识当前消费者从哪个分区的哪一条数据开始消费,消费者通过对偏移量的设置可以灵活的对topic进行消费。如下图
offset

消费者控制自己的偏移量就意味着kafka的消费者是轻量的,消费者之间互不影响。

topic记录中的分区有多种用途,首先它允许topic扩展到超出单台服务器适合的大小。每个分区都需要有适合托管分区的服务器,而topic可以有很多分区,因此一个topic可以处理任意数量的数据。另外这些分区作为并行的单位,效率很高,这也是相当重要的一点。

分配

记录分区分布在kafka集群服务器上,每个服务器共同处理数据并请求分区的共享。每个分区都可以在可用服务器的数量上进行复制,以此实现容错。

每一个分区都会有一个服务器作为leader,0个或多个服务器作为followers。leader处理分区的所有读取和写入请求,而follower被动的复制leader。如果leader出错,则其中一个follower会自动称为新的leader。集群中的每个服务器都充当某分区的leader和其他分区的follower,因此能在集群中达到负载均衡。

生产者

生产者将数据发布到所选择的分区,生产者在发布数据是需要选择将数据发送到哪个分区,分配分区可以通过循环方式完成也可以根据语义分区的功能实现。

消费者

消费者使用消费者组(consumer group)标记自己。发布到topic的每个记录会被发送到每个消费者组中的一个消费者实例。所以当一个消费者组中有多个消费者实例,则记录将在该消费者组中的所有消费者之间进行有效的负载均衡。

topic接受的每一条记录都会被广播发送到每个消费者组中。示意图如下:

消费者
上图有两个机器的kafka集群,某topic有四个分区p0-p3,有两个消费者组A/B订阅该topic,消费者组A有两个消费者实例,消费者组B有四个消费者实例。

kafka中实现消费的方式是通过在消费者实例上划分分区实现,保证实例在任何时间点都是公平分配的。消费者组中的成员划分分区是由kafka协议进行动态处理。如果新实例加入该组,那新加入的实例会从改组的成员中接管一些分区。如果消费者组中的某个实例死亡,则它所划分的分区分配给该消费组的其他实例。

kafka只能提供一个分区内的记录的顺序,但是不保证多个分区的记录顺序。如果用户想保证topic中的顺序,则使用一个分区的topic即可,但这样就意味着每个消费者组中只能有一个消费者实例。

kafka提供的保证

  • 同一个生产者实例发送到特定topic的特定分区的两条数据M1和M2,并且M1发送早于M2,则M1将拥有更低的偏移量,即可以保证插入顺序。
  • 消费者可以按照记录存储的顺序消费记录
  • 对于复制因子为N的topic,最多可以容忍N-1个服务器故障,而不会丢失提交到topic中的记录

kafka常用命令

  • 启动Zookeeper server

    bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  • 启动Kafka server

    nohup bin/kafka-server-start.sh config/server.properties &
    
  • 停止Kafka server

    bin/kafka-server-stop.sh
    
  • 停止Zookeeper server

    bin/zookeeper-server-stop.sh
    
  • producer

    bin/kafka-console-producer.sh --broker-list 192.168.20.133:9092 --topic realtime
    
  • consumer

    bin/kafka-console-consumer.sh --zookeeper 192.168.20.133:2181 --topic realtime --from-beginning
    
  • 查看所有topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
  • 创建一个topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic realtime0103
    
  • 查看topic详情

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic realtime0103
    
  • 删除topic

    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic realtime0103
    

java操作kafka

引入jar包

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.0</version>
  </dependency>

Producer

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.20.133:9092,192.168.20.134:9092,192.168.20.135:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    String topic = "realtime0103";

    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    String value = "{'name':'1','value':1}" ; 
    
    //设定分区规则,分区为0,1,2
    int partation = KafkaProducerClient.count.get() % 3;

    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,partation, "key1",value );
      
    producer.send(record).get();
  }
}

Customer

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;


public class CustomerDemo {

  private static KafkaConsumer<String, String> consumer;
  private static String inputTopic;

  @SuppressWarnings({ "unchecked", "rawtypes" })
  public static void main(String[] args) {
    String groupId = "group1";
    inputTopic = "realtime0103";
    String brokers = "192.168.20.133:9092";

    consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId));
    
    //分配topic 某分区的offset
    TopicPartition part0 = new TopicPartition(inputTopic, 0);
    TopicPartition part1 = new TopicPartition(inputTopic, 1);
    TopicPartition part2 = new TopicPartition(inputTopic, 2);
    OffsetAndMetadata offset0 = new OffsetAndMetadata(1);
    OffsetAndMetadata offset1 = new OffsetAndMetadata(2);
    OffsetAndMetadata offset2 = new OffsetAndMetadata(3);
    Map<TopicPartition,OffsetAndMetadata> offsetMap = new HashMap<>();
    offsetMap.put(part0,offset0);
    offsetMap.put(part1,offset1);
    offsetMap.put(part2,offset2);
    //提交offset信息
    consumer.commitSync(offsetMap);
    
    start();

  }

  private static Properties createConsumerConfig(String brokers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("auto.commit.enable", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

  private static void start() {
    consumer.subscribe(Collections.singletonList(inputTopic));

        System.out.println("Reading topic:" + inputTopic);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record: records) {
                String ip = record.key();
                String event = record.value();
                System.out.println(event);
            }
            consumer.commitSync();
        }

  }
}

spark操作kafka

IDEA配置搭建spark scala开发环境(Windows)

  • 安装jdk8并配置环境变量
  • 安装scala2.11并配置环境变量(本文安装2.11.8)
  • 安装IDEA
  • IDEA安装SBT和Scala插件
  • File->New->Project 创建新项目,选择Scala->sbt->next

新建项目

  • 选择项目名称、位置、Java版本号、sbt版本和Scala版本,Finish

选择版本

  • 打开build.sbt,添加相关依赖

    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
    
  • 刷新sbt项目,下载依赖:

刷新

  • 编写业务代码,可以使用以下的使用spark Structured Streaming连接kafka处理流部分代码
  • 设置打包规则
    • File->Project Sturcture->Artifacts 点击绿色加号设置打jar包规则

      Artifacts

    • 选择Module和Main class,JAR file from libraries选择copy to output…即不将外部jar打包到jar文件中

      Artifacts

    • 导航栏 Build->Build Artifacts ,打包成jar,将jar包上传到spark集群

  • 运行程序:
    • 以上配置打出jar包包含项目jar包和多个依赖jar包,提交spark作业时,可以使用–jars逗号隔开配置引用多个外部jar

      cd $SPARK_HOME
      ./bin/spark-submit --master spark://192.168.20.133:7077 --jars /root/interface-annotations-1.4.0.jar,/root/async-1.4.1.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class com.xuchg.app.Application /root/spark-kafka.jar
      

使用spark Structured Streaming连接kafka处理流

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object Main extends App {

  //spark读取kafka示例
  Logger.getLogger("org").setLevel(Level.ERROR)
  val kafkaAddress = "192.168.20.133:9092"
  val zookeeper = "192.168.20.133:2181"
  val topic = "realtime0103"
  val topicOffset = "{\""+topic+"\":{\"0\":0,\"1\":0,\"2\":0}}"
  val sparkSession = SparkSession
    .builder()
    .config(new SparkConf()
      .setMaster("local[2]")
      .set("spark.streaming.stopGracefullyOnShutdown","true")//设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
      .set("spark.submit.deployMode","cluster")
      .set("spark.executor.memory","4g")//worker内存
      .set("spark.driver.memory","4g")
      .set("spark.cores.max","2")//设置最大核心数
    )
    .appName(getClass.getName)
    .getOrCreate()

  def createStreamDF(spark:SparkSession):DataFrame = {
    import spark.implicits._
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaAddress)
      .option("zookeeper.connect", zookeeper)
      .option("subscribe", topic)
      .option("startingOffsets", topicOffset)
      .option("enable.auto.commit", "false")
      .option("failOnDataLoss", false)
      .option("includeTimestamp", true)
      .load()
    df
  }

  var df = createStreamDF(sparkSession)

  val query = df.writeStream
    .format("console")
    .start()

  query.awaitTermination()
}

监控spark和kafka

此处根据实际应用情况使用两种监控方法,解决两个不同问题

  • 解决spark启动和停止处理的动作,例如监听spark停止时处理或记录完所有计算

    //定义监听类继承SparkListener,并重写相关方法
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
    class AppListener extends SparkListener{
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        //监控spark停止方法,可以处理spark结束的动作
        println("application 关闭")
      }
    
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
        println("application 启动")
      }
    }
    
    //在主类中注册监听类
    sparkSession.sparkContext.addSparkListener(new AppListener)
    
  • 监控spark的查询,例如spark读取kafka流的偏移量offset,可以监听并记录下来,下次启动spark可以直接从该偏移量offset进行消费,不会消费相同的数据

    sparkSession.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
      }
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        //服务出现问题而停止
        println("Query terminated: " + queryTerminated.id)
      }
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        var progress = queryProgress.progress
        var sources = progress.sources
        if(sources.length>0){
          var a = 0
          for(a <- 0 to sources.length - 1){
            var offsetStr = sources.apply(a).startOffset
            if(offsetStr!=null){
              println("检测offset是否变化 -- " + offsetStr)
            }
          }
        }
      }
    })
    

    运行结果如下:可以看到对topic的每个分区的偏移量都可以获取到
    offset

管理和停止spark程序

在spark集群主节点配置并使用spark history server可以实现对spark作业进行管理和监控

  • 配置spark history server

    • 修改$SPARK_HOME/conf/spark-defaults.conf,如果不存在则从模板复制一份

      cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
      vi $SPARK_HOME/conf/spark-defaults.conf
      
    • 修改配置文件如下:

      spark.eventLog.enabled           true
      spark.eventLog.dir               hdfs://192.168.20.133:9000/spark-history
      spark.eventLog.compress          true
      # 注意ip地址需要根据情况更改,9000为hdfs默认端口号,如hdfs端口号不是9000则需要更改
      
    • 创建hdfs目录

      $HADOOP_HOME/bin/hdfs dfs -mkdir /spark-history
      
    • 配置$SPARK_HOME/conf/spark-env.sh文件:

      • 如果不存在则从模板复制:

        cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
        
      • 编辑$SPARK_HOME/conf/spark-env.sh,结尾添加:

        export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://192.168.20.133:9000/spark-history"
        # 18080为history server的访问端口号,192.168.20.133:9000为hdfs的ip和端口,根据实际情况修改
        
    • 打开防火墙18080端口

    • 执行命令打开history server服务

      $SPARK_HOME/sbin/start-history-server.sh
      
  • 代码中sparkSession创建时添加配置:

    //设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
    new SparkConf().set("spark.streaming.stopGracefullyOnShutdown","true")
    
  • 使用shell关掉某一个正在运行的spark作业:

    • spark作业关闭原理
      每一个spark作业都由一个appId唯一标识,而每一个作业包含多个Executors执行器,这些Executors中包含1个或几个id为driver的驱动程序,它是执行开发程序中的 main方法的进程。如果驱动程序停止,则当前spark作业就结束了。

    • spark关闭过程

      • 获取某appId的spark作业的driver节点的ip和端口,可以通过spark history server提供的页面或提供的api进行获取。此处介绍页面获取,后面会介绍api获取

        finddriver

      • 根据获取的driver的端口号对spark作业发送停止命令,当然有时ctrl+c和在监控页面上都是可以直接停止的,但此处只提用shell停止,应用场景更广泛。

        centod7:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|awk -F= '{print $2}'|xargs kill -15
        centos6:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|xargs kill -15
        

        注意:centos6和centos7稍有不同,而且此处使用kill -15而不是kill -9,kill -9会直接杀死进程,可能会导致丢失数据。而kill -15是发送停止命令,不会直接杀死进程。

    • 通过以上内容可以实现在spark集群主节点部署web服务接收并远程调用执行shell语句来达到远程动态启动(可传参)和停止spark作业,大体如下:

      • 远程调用接口传参启动spark作业,此时记录下spark运行的appid

      • 通过调用spark history server提供的REST Api获取当前作业driver进程的端口号:

        http://192.168.20.133:18080/api/v1/applications/{appId}/allexecutors
        

        driver

      • 通过获取到的端口号可以向spark集群主节点发送停止命令到该端口进程即可

示例项目地址:github
kafka官网
spark官网

Flume、Kafka、Spark streaming整合

博文 来自: huonan_123
没有更多推荐了,返回首页