精华内容
下载资源
问答
  • 公司Ubuntu12.04服务器突然开机起来后就卡住不动,打印kernel offset信息 目前已尝试以下方法,均失败 1:开机启动选择Ubuntu选项时,选择recovery mode启动,仍然卡住,显示kernel offset信息 2:在开机选项...
  • 主要介绍了springboot中如何实现kafa指定offset消费,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要给大家介绍了关于mysql查询时offset过大影响性能的原因和优化的相关资料,并在文末跟大家分享了MYSQL中limit,offset的区别,需要的朋友可以参考借鉴,下面随着小编来一起学习学习吧
  • 主要介绍了使用limit,offset分页场景时为什么会慢,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口
  • 在Sql Server 2012之前,实现分页主要是使用ROW_NUMBER(),在SQL Server2012,可以使用Offset ...Rows Fetch Next ... Rows only的方式去实现分页数据查询,具体代码详解大家参考下本
  • 仿真比较器的offset

    2018-08-29 10:53:11
    比较器仿真必用,详细的仿真过程,以及EDA软件的充分介绍。
  • offset函数基础教程

    2014-03-05 14:08:46
    Excel的offset函数基础教程,从零开始哦
  • offset()函数使用方法

    2013-08-15 21:38:25
    offset()函数使用方法,这里有几个例子,可以看一下就非常明白这个用法了
  • Kafka偏移量(Offset)管理

    千次阅读 2020-03-12 19:29:20
    1.定义 Kafka中的每个partition都由一系列有序的、不...Offset记录着下一条将要发送给Consumer的消息的序号。 流处理系统常见的三种语义: 最多一次 每个记录要么处理一次,要么根本不处理 至少一次 这...

    1.定义

    Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。

    Offset记录着下一条将要发送给Consumer的消息的序号。

    流处理系统常见的三种语义:

    最多一次每个记录要么处理一次,要么根本不处理
    至少一次这比最多一次强,因为它确保不会丢失任何数据。但是可能有重复的
    有且仅有一次每条记录将被精确处理一次,没有数据会丢失,也没有数据会被多次处理

    The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

    1. At most once: Each record will be either processed once or not processed at all.
    2. At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensure that no data will be lost. But there may be duplicates.
    3. Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

     2.Kafka offset Management with Spark Streaming

    Offset首先建议存放到Zookeeper中,Zookeeper相比于HBASE等来说更为轻量级,且是做HA(高可用性集群,High Available)的,offset更安全。

    对于offset管理常见的两步操作:

    • 保存offsets
    • 获取offsets

    3.环境准备

    启动一个Kafka生产者,测试使用topic:tp_kafka:

    ./kafka-console-producer.sh --broker-list hadoop000:9092 --topic tp_kafka

    启动一个Kafka消费者:

    ./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_kafka

    在IDEA中生产数据:

    package com.taipark.spark;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import java.util.Properties;
    import java.util.UUID;
    
    public class KafkaApp {
    
        public static void main(String[] args) {
            String topic = "tp_kafka";
    
            Properties props = new Properties();
            props.put("serializer.class","kafka.serializer.StringEncoder");
            props.put("metadata.broker.list","hadoop000:9092");
            props.put("request.required.acks","1");
            props.put("partitioner.class","kafka.producer.DefaultPartitioner");
            Producer<String,String> producer = new Producer<>(new ProducerConfig(props));
    
            for(int index = 0;index <100; index++){
                KeyedMessage<String, String> message = new KeyedMessage<>(topic, index + "", "taipark" + UUID.randomUUID());
                producer.send(message);
            }
            System.out.println("数据生产完毕");
    
        }
    }
    
    

    4.第一种offset管理方式:smallest

    Spark Streaming链接Kafka统计个数:

    package com.taipark.spark.offset
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object Offset01App {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")
        val ssc = new StreamingContext(sparkConf,Seconds(10))
    
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "hadoop000:9092",
          "auto.offset.reset" -> "smallest"
        )
        val topics = "tp_kafka".split(",").toSet
        val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    
        messages.foreachRDD(rdd=>{
          if(!rdd.isEmpty()){
            println("Taipark" + rdd.count())
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    

    再生产100条Kafka数据->Spark Streaming接受:

    但这时如果Spark Streaming停止后重启:

    会发现这里重头开始计数了,原因是代码里将auto.offset.reset的值设置为了smallest。(kafka-0.10.1.X版本之前)

    5.第二种offset管理方式:checkpoint

    在HDFS中创建一个/offset文件夹:

    hadoop fs -mkdir /offset
    

    使用Checkpoint:

    package com.taipark.spark.offset
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
    
    object Offset01App {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")
    
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "hadoop000:9092",
          "auto.offset.reset" -> "smallest"
        )
        val topics = "tp_kafka".split(",").toSet
        val checkpointDirectory = "hdfs://hadoop000:8020/offset/"
        def functionToCreateContext():StreamingContext = {
          val ssc = new StreamingContext(sparkConf,Seconds(10))
          val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
          //设置checkpoint
          ssc.checkpoint(checkpointDirectory)
          messages.checkpoint(Duration(10*1000))
    
          messages.foreachRDD(rdd=>{
            if(!rdd.isEmpty()){
              println("Taipark" + rdd.count())
            }
          })
    
          ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext _)
    
    
    
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    

    注:IDEA修改HDFS用户,在设置里的VM options中:

    -DHADOOP_USER_NAME=hadoop

    先启动:

    发现消费了之前的100条。这是停止之后,生产100条,再启动:

    发现这里只读取了上次结束到这次启动之间的100条,而不是像smallest一样读取之前所有条数。

    但是checkpiont存在问题,如果采用这种方式管理offset,只要业务逻辑发生了变化,则checkpoint就没有作用了。因为其调用的是getOrCreate()。

    6.第三种offset管理方式:手动管理偏移量

    思路:

    1. 创建StreamingContext
    2. 从Kafka获取数据  <== 拿到offset
    3. 根据业务逻辑进行处理
    4. 将处理结果写入外部存储 ==>保存offset
    5. 启动程序等待线程终止
    package com.taipark.spark.offset
    
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object Offset01App {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Offset01App")
        val ssc = new StreamingContext(sparkConf,Seconds(10))
    
    
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "hadoop000:9092",
          "auto.offset.reset" -> "smallest"
        )
        val topics = "tp_kafka".split(",").toSet
        //从某地获取偏移量
        val fromOffsets = Map[TopicAndPartition,Long]()
    
        val messages = if(fromOffsets.size == 0){  //从头消费
          KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
        }else{  //从指定偏移量消费
    
          val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key,mm.message())
          KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)
    
          )
        }
    
        messages.foreachRDD(rdd=>{
          if(!rdd.isEmpty()){
            //业务逻辑
            println("Taipark" + rdd.count())
    
            //将offset提交保存到某地
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            offsetRanges.foreach(x =>{
              //提交如下信息提交到外部存储
              println(s"${x.topic} ${x.partition} ${x.fromOffset} ${x.untilOffset}")
            })
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    
    • 先保存offset后保存数据可能导致数据丢失
    • 先保存数据后保存offset可能导致数据重复执行

    解决方式1:实现幂等(idempotent)

    在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

    解决方式2:事务 (transaction)

    1.数据库事务可以包含一个或多个数据库操作,但这些操作构成一个逻辑上的整体。

    2.构成逻辑整体的这些数据库操作,要么全部执行成功,要么全部不执行。

    3.构成事务的所有操作,要么全都对数据库产生影响,要么全都不产生影响,即不管事务是否执行成功,数据库总能保持一致性状态。

    4.以上即使在数据库出现故障以及并发事务存在的情况下依然成立。

    将业务逻辑与offset保存放在一个事务里,仅执行一次。

    7.Kafka-0.10.1.X版本之后的auto.kafka.reset:

    earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    latest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    nonetopic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

     

    展开全文
  • 1、Kafka Offset 管理–Checkpoint 启用Spark Streaming的checkpoint是存储偏移量最简单的方法。 流式checkpoint专门用于保存应用程序的状态, 比如保存在HDFS上, 在故障时能恢复。 Spark Streaming的checkpoint...

    1、Kafka Offset 管理–Checkpoint

    1. 启用Spark Streaming的checkpoint是存储偏移量最简单的方法。
    2. 流式checkpoint专门用于保存应用程序的状态, 比如保存在HDFS上,
      在故障时能恢复。
    3. Spark Streaming的checkpoint无法跨越应用程序进行恢复。
    4. Spark 升级也将导致无法恢复。
    5. 在关键生产应用, 不建议使用spark检查点的管理offset方式。

    代码如下:

    /**
      * 用checkpoint记录offset
      * 优点:实现过程简单
      * 缺点:如果streaming的业务更改,或别的作业也需要获取该offset,是获取不到的
      */
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
    
    object StreamingWithCheckpoint {
      def main(args: Array[String]) {
        //val Array(brokers, topics) = args
        val processingInterval = 2
        val brokers = "node01:9092,node02:9092,node03:9092"
        val topics = "mytest1"
        // Create context with 2 second batch interval
        val sparkConf = new SparkConf().setAppName("ConsumerWithCheckPoint").setMaster("local[2]")
        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
        val checkpointPath = "hdfs://node01:9000/spark_checkpoint1"
    
        def functionToCreateContext(): StreamingContext = {
          val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
          val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    
          ssc.checkpoint(checkpointPath)
          messages.checkpoint(Duration(8 * processingInterval.toInt * 1000))
          messages.foreachRDD(rdd => {
            if (!rdd.isEmpty()) {
              println("################################" + rdd.count())
            }
    
          })
          ssc
        }
    
        // 如果没有checkpoint信息,则新建一个StreamingContext
        // 如果有checkpoint信息,则从checkpoint中记录的信息恢复StreamingContext
        // createOnError参数:如果在读取检查点数据时出错,是否创建新的流上下文。
        // 默认情况下,将在错误上引发异常。
        val context = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
        context.start()
        context.awaitTermination()
      }
    }
    // 以上案例测试过程:
    // 模拟消费者向mytest1插入10条数据,
    // 强制停止streaming,
    // 再插入20条数据并启动streaming查看读取的条数为20条
    

    2、Kafka Offset 管理–Zookeeper

    1. 路径:
      val zkPath = s"{kakfaOffsetRootPath}/{groupName}/{o.topic}/{o.partition}"
    2. 如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset
    3. 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream的起始位置

    代码如下:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.JavaConversions._
    
    object KafkaZKManager  extends Serializable{
      /**
        * 创建zookeeper客户端
        */
      val client = {
        val client = CuratorFrameworkFactory
          .builder
          .connectString("node01:2181/kafka0.9") // zk中kafka的路径
          .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
          .namespace("mykafka") // 命名空间:mykafka
          .build()
        client.start()
        client
      }
    
      val kafkaOffsetRootPath = "/consumers/offsets"
    
      /**
        * 确保zookeeper中的路径是存在的
        * @param path
        */
      def ensureZKPathExists(path: String): Unit = {
        if (client.checkExists().forPath(path) == null) {
          client.create().creatingParentsIfNeeded().forPath(path)
        }
      }
    
      def storeOffsets(offsetsRanges:Array[OffsetRange], groupName:String) = {
        for (o <- offsetsRanges) {
          val zkPath = s"${kafkaOffsetRootPath}/${groupName}/${o.topic}/${o.partition}"
          ensureZKPathExists(zkPath)
          // 保存offset到zk
          client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
        }
      }
    
      /**
        * 用于获取offset
        * @param topic
        * @param groupName
        * @return
        */
      def getFromOffsets(topic : String,groupName : String): (Map[TopicAndPartition, Long], Int) = {
        // 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream 的起始位置
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
        val zkTopicPath = s"${kafkaOffsetRootPath}/${groupName}/${topic}"
        // 确保zookeeper中的路径是否存在
        ensureZKPathExists(zkTopicPath)
     	// 获取topic中,各分区对应的offset
        val offsets: mutable.Buffer[(TopicAndPartition, Long)] = for {
          // 获取分区
          p <- client.getChildren.forPath(zkTopicPath)
        } yield {
          //遍历路径下面的partition中的offset
          val data = client.getData.forPath(s"$zkTopicPath/$p")
          //将data变成Long类型
          val offset = java.lang.Long.valueOf(new String(data)).toLong
          println("offset:" + offset)
          (TopicAndPartition(topic, Integer.parseInt(p)), offset)
        }
    
        if(offsets.isEmpty) {
          (offsets.toMap,0)
        }else{
          (offsets.toMap,1)
        }
      }
    
      def main(args: Array[String]): Unit = {
        val processingInterval = 2
        val brokers = "node01:9092,node02:9092,node03:9092"
        val topic = "mytest1"
        val sparkConf = new SparkConf().setAppName("KafkaZKManager").setMaster("local[2]")
        // Create direct kafka stream with brokers and topics
        val topicsSet = topic.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")
    
        val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
    
        // 读取kafka数据
        val messages = createMyDirectKafkaStream(ssc, kafkaParams, topic, "group01")
    
        messages.foreachRDD((rdd,btime) => {
          if(!rdd.isEmpty()){
            println("==========================:" + rdd.count() )
            println("==========================btime:" + btime )
          }
          // 消费到数据后,将offset保存到zk
          storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "group01")
        })
    
        ssc.start()
        ssc.awaitTermination()
       }
    
      def createMyDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], topic: String, groupName: String): InputDStream[(String, String)] = {
        // 获取offset
        val (fromOffsets, flag) = getFromOffsets( topic, groupName)
        var kafkaStream : InputDStream[(String, String)] = null
        if (flag == 1) {
          // 这个会将kafka的消息进行transform,最终kafak的数据都会变成(topic_name, message)这样的tuple
          val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
          println("fromOffsets:" + fromOffsets)
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        } else {
          // 如果未保存,根据kafkaParam的配置使用最新或者最旧的offset
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
        }
        kafkaStream
      }
    
    }
    

    启动zk命令:node1为master节点

    zkCli.sh  -timeout 5000  -r  -server  node1:2181
    

    在这里插入图片描述

    3、Kafka Offset 管理–Hbase

    1. 基于Hbase的通用设计, 使用同一张表保存可以跨越多个spark streaming程序的topic的offset

    2. rowkey = topic名称 + groupid + streaming的batchtime.milliSeconds . 尽管
      batchtime.milliSeconds不是必须的, 但是它可以看到历史的批处理任务对offset的管理情况。

    3. kafka的offset保存在下面的表中,列簇为offsets, 30天后自动过期
      Hbase表结构:后面的时间为秒级
      create ‘spark_kafka_offsets’, {NAME=>‘offsets’, TTL=>2592000}

    4. offset的获取场景
      场景1:Streaming作业首次启动。 通过zookeeper来查找给定topic中分区的数量,然后返回“0”
      作为所有topic分区的offset。
      场景2:长时间运行的Streaming作业已经停止,新的分区被添加到kafka的topic中。 通过
      zookeeper来查找给定topic中分区的数量, 对于所有旧的topic分区,将offset设置为HBase中的最新偏移量。 对于所有新的topic分区,它将返回“0”作为offset。
      场景3:长时间运行的Streaming作业已停止,topic分区没有任何更改。 在这种情况下,HBase
      中发现的最新偏移量作为每个topic分区的offset返回。

      代码如下

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.ZkUtils
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object KafkaHbaseManager {
      // 保存offset到hbase
      def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                      hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf(hbaseTableName))
        val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
        val put = new Put(rowKey.getBytes())
        for (offset <- offsetRanges) {
          put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
            Bytes.toBytes(offset.untilOffset.toString))
        }
        table.put(put)
        conn.close()
      }
    
      // 从zookeeper中获取topic的分区数
      def getNumberOfPartitionsForTopicFromZK(TOPIC_NAME: String, GROUP_ID: String,
                                              zkQuorum: String, zkRootDir: String, sessTimeout: Int, connTimeOut: Int): Int = {
        val zkUrl = zkQuorum + "/" + zkRootDir
        val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, sessTimeout, connTimeOut)
        val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false)
        // 获取分区数量
        val zkPartitions = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
        println(zkPartitions)
        zkClientAndConn._1.close()
        zkClientAndConn._2.close()
        zkPartitions
      }
    
      // 获取hbase的offset
      def getLastestOffsets(TOPIC_NAME: String, GROUP_ID: String, hTableName: String,
                            zkQuorum: String, zkRootDir: String, sessTimeout: Int, connTimeOut: Int): Map[TopicAndPartition, Long] = {
    
        // 连接zk获取topic的partition数量
        val zKNumberOfPartitions = getNumberOfPartitionsForTopicFromZK(TOPIC_NAME, GROUP_ID, zkQuorum, zkRootDir, sessTimeout, connTimeOut)
    
        val hbaseConf = HBaseConfiguration.create()
    
        // 获取hbase中最后提交的offset
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf(hTableName))
        val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
        val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
        val scan = new Scan()
        val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
        val result = scanner.next()
    
        var hbaseNumberOfPartitions = 0 // 在hbase中获取的分区数量
        if (result != null) {
          // 将分区数量设置为hbase表的列数量
          hbaseNumberOfPartitions = result.listCells().size()
        }
    
        val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
        if (hbaseNumberOfPartitions == 0) { // 如果没有保存过offset
          // 初始化kafka为开始
          for (partition <- 0 until zKNumberOfPartitions) {
            fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), 0))
          }
    
        } else if (zKNumberOfPartitions > hbaseNumberOfPartitions) { // 如果zk的partition数量大于hbase的partition数量,说明topic增加了分区,就需要对分区做单独处理
          // 处理新增加的分区添加到kafka的topic
          for (partition <- 0 until zKNumberOfPartitions) {
            val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
              Bytes.toBytes(partition.toString)))
            fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), fromOffset.toLong))
          }
          // 对新增加的分区将它的offset值设为0
          for (partition <- hbaseNumberOfPartitions until zKNumberOfPartitions) {
            fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), 0))
          }
        } else { // 如果既没有新增加的分区,也不是第一次运行
          // 获取上次运行的offset
          for (partition <- 0 until hbaseNumberOfPartitions) {
            val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
              Bytes.toBytes(partition.toString)))
            fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), fromOffset.toLong))
          }
        }
    
        scanner.close()
        conn.close()
        fromOffsets.toMap
      }
    
      def main(args: Array[String]): Unit = {
        val processingInterval = 2
        val brokers = "node01:9092,node02:9092,node03:9092"
        val topics = "mytest1"
        // Create context with 2 second batch interval
        val sparkConf = new SparkConf().setAppName("kafkahbase").setMaster("local[2]")
        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")
    
        val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
        val groupId = "testp"
        val hbaseTableName = "spark_kafka_offsets"
    
        // 获取kafkaStream
        //val kafkaStream = createMyDirectKafkaStream(ssc, kafkaParams, zkClient, topicsSet, "testp")
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
        // 获取offset
        val fromOffsets = getLastestOffsets("mytest1", groupId, hbaseTableName, "node01:2181,node02:2181,node03:2181", "kafka0.9", 30000, 30000)
    
        var kafkaStream: InputDStream[(String, String)] = null
        kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    
        kafkaStream.foreachRDD((rdd, btime) => {
          if (!rdd.isEmpty()) {
            println("==========================:" + rdd.count())
            println("==========================btime:" + btime)
            saveOffsets(topics, groupId, rdd.asInstanceOf[HasOffsetRanges].offsetRanges, hbaseTableName, btime)
          }
    
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    在这里插入图片描述

    4、Kafka Offset 管理–Kafka本身

    stream.foreachRDD { rdd =>
    
    val offsetRanges =
    
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
    // some time later, after outputs have completed
    
    stream.asInstanceOf[CanCommitOffsets].commitAsync(off
    
    setRanges)
    
    }
    

    kafka自己管理offset官网链接:

    http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself

    5、Kafka Offset 管理–HDFS

    1. 可以将offset保存在HDFS上
    2. 与其他系统(Zookeeper、Hbase)相比, HDFS具有更高
      的延迟。 此外, 如果管理不当, 在HDFS上写入每个批次的
      offsetRanges可能会导致小文件问题
    展开全文
  • kafka中offset使用原理

    千次阅读 2020-05-24 21:07:47
    在使用kafka时,从消费端来说,基本上大家在使用的时候,一般是通过一个消息监听器监听具体的topic以及对应的partition,接收消息即可,但有必要深入了解一下关于kafka的offset原理 kafka在设计上和其他的消息中间...

    前言

    在使用kafka时,从消费端来说,基本上大家在使用的时候,一般是通过一个消息监听器监听具体的topic以及对应的partition,接收消息即可,但有必要深入了解一下关于kafka的offset原理

    kafka在设计上和其他的消息中间其中有一个不同点是,kafka中存在一个offset的概念,即偏移量,而这个偏移量是需要消费端进行记录的,即producer将消息发到broker上之后,当某个消费者订阅了这个topic之后,consumer需要自己记录每次的消费位置,以便下次知道从哪个位置开始消费消息,这个即offset的来源,简单的原理图如下
    在这里插入图片描述
    既然消费者要知道自己每次的消费位移,那么对于消费者来说,就需要一种机制,提交每次的消费位移,以便各自的分区能够准确知道各分区中消息的位置如何

    对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。

    当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commitÿ

    展开全文
  • kafka 自定义存储offset 到mysql中

    千次阅读 2019-11-22 15:01:32
    kafka0.9版本之前,offset存储在zookeeper,0.9版本以及之后,默认offset存储在kafka的一个内置的topic中。除此之外,kafka还可以选择自定义存储offsetoffset的维护是相当繁琐的,因为需要考虑到消费者的...

    kafka0.9版本之前,offset存储在zookeeper,0.9版本以及之后,默认offset存储在kafka的一个内置的topic中。除此之外,kafka还可以选择自定义存储offset。

    offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalance。

    当有新的消费者加入消费者组、已有的消费者推出消费者组或者锁订阅的主题的分区发生变化,就会触发到分区的重新分区,重新分区的过程叫做Rebalance

    消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定为到每个分区最近提交的offset位置继续消费。

    要实现自定义存储offset,需要借助ConsumerRebalanceListener,其中提交和获取的offset的方法,需要根据所选的offset存储系统自行实现:

    1:定义producer

    object KafkaProducerTest {
      val props = new Properties()
      //    kafka集群 broker-list
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cm01:9092,cm02:9092,cm03:9092")
      //    acks确认机制
      props.put(ProducerConfig.ACKS_CONFIG, "all")
      //    重试次数
      props.put(ProducerConfig.RETRIES_CONFIG, "1")
      //    批次大小
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
      //    等待时间
      props.put(ProducerConfig.LINGER_MS_CONFIG, "1")
      //    RecordAccumulator缓冲区大小
      props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
      //    key序列化
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
      //    value序列化
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    
      private val producer = new KafkaProducer[String, String](props)
    
      def generateHash(input: String): String = {
        val digest: MessageDigest = MessageDigest.getInstance("MD5")
        val random: Int = Random.nextInt(1000)
        digest.update((input + random).getBytes())
        val bytes: Array[Byte] = digest.digest()
        val bi = new BigInteger(1, bytes)
        val string: String = bi.toString(16)
        string.substring(0, 3) + input + random
      }
    
      def sendData = {
        val topic: String = "mysql_store_offset"
    
        producer.send(new ProducerRecord[String, String](
          topic,
          generateHash("mysql_store_offset"),
          s"${Random.nextInt(1000)}\t金锁家庭财产综合险(家顺险)\t1\t金锁家庭财产综合险(家顺险)\t213\t自住型家财险\t10\t家财保险\t44\t人保财险\t23:50.0"
        ), (metadata: RecordMetadata, exception: Exception) => {
          if (exception != null) {
            println(
              s"""
                 |----------------------------
                 |topic   partition   offset
                 |${metadata.topic()} ${metadata.partition()} ${metadata.offset()}
                 |----------------------------
                 |""".stripMargin)
          }
          else {
            exception.printStackTrace()
          }
        })
    
      }
    
      def main(args: Array[String]): Unit = {
        for (i <- 0 to 100){
          sendData
        }
        producer.close()
      }
    }

    2:传统的mysql操作类

    object DBUtils {
      private val URL: String = "jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
      private val DRIVER: String = "com.mysql.jdbc.Driver"
      private val USERNAME: String = "root"
      private val PASSWORD: String = "123456"
    
    
      /**
       * 获取特定消费者组,主题,分区下的偏移量
       *
       * @param consumer_group 消费者组
       * @param sub_topic      主题
       * @param partition_id   分区
       * @return offset
       */
      def query(consumer_group: String, sub_topic: String, partition_id: Int) = {
        Class.forName(DRIVER)
        val conn: Connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)
        var offset: Long = 0
        val sql: String = "select sub_topic_partition_offset from offset where consumer_group=? and sub_topic=? and sub_topic_partition_id=?"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        ps.setString(1, consumer_group)
        ps.setString(2, sub_topic)
        ps.setInt(3, partition_id)
        val set: ResultSet = ps.executeQuery()
        while (set.next()) {
          offset = set.getLong("sub_topic_partition_offset")
        }
        set.close()
        ps.close()
        conn.close()
        offset
      }
    
      /**
       * 根据特定消费者组,主题,分区,更新偏移量
       *
       * @param offset [[Offset]]
       * @return
       */
      def update(offset: Offset) = {
        Class.forName(DRIVER)
        val conn: Connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)
        val sql: String = "replace into offset values(?,?,?,?,?)"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        ps.setString(1, offset.consumer_group)
        ps.setString(2, offset.sub_topic)
        ps.setInt(3, offset.sub_topic_partition_id)
        ps.setLong(4, offset.sub_topic_partition_offset)
        ps.setString(5, offset.timestamp)
        ps.executeUpdate()
        ps.close()
        conn.close()
      }
    
      def main(args: Array[String]): Unit = {
        DBs.setupAll()
        //    query("", "", 0).foreach(println)
        update(Offset("test", "test", 0, 0, System.currentTimeMillis().toString))
        DBs.close()
      }
    }

    3:定义consumer

    object KafkaConsumerTest {
      private val group: String = "mysql_offset"
      private val topic: String = "mysql_store_offset"
      private val props = new Properties()
      //    kafka集群
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cm01:9092,cm02:9092,cm03:9092")
      //    消费者组,只要group.id相同,就属于同一个消费者组
      props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
      //    自动提交offset
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
      //    key反序列化
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
      //    value反序列化
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    
      lazy val consumer = new KafkaConsumer[String, String](props)
    
      def executor = {
        //    订阅主题
        consumer.subscribe(util.Arrays.asList(topic),
          new ConsumerRebalanceListener {
            //        重rebalance之前将记录进行保存
            override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
              partitions.forEach(partition => {
                //            获取分区
                val sub_topic_partition_id: Int = partition.partition()
                //            对应分区的偏移量
                val sub_topic_partition_offset: Long = consumer.position(partition)
                DBUtils.update(Offset(group, topic, sub_topic_partition_id, sub_topic_partition_offset, TimeUtils.tranTimeToString(System.currentTimeMillis().toString)))
              })
            }
    
            //rebalance之后读取之前的消费记录,继续消费
            override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
              partitions.forEach(partition => {
                val sub_topic_partition_id: Int = partition.partition()
                val offset = DBUtils.query(group, topic, sub_topic_partition_id)
                consumer.seek(partition, offset)
              })
            }
          })
        while (true) {
          val records: ConsumerRecords[String, String] = consumer.poll(100)
          val offsets = new util.ArrayList[Offset]()
          records.forEach(record => {
            offsets.add(Offset(group, topic, record.partition(), record.offset(), TimeUtils.tranTimeToString(record.timestamp().toString)))
            println(
              s"""
                 |---------------------------------------------------------------
                 |group       topic       partition       offset        timestamp
                 |$group  $topic  ${record.partition()} ${record.offset()}  ${record.timestamp()}
                 |---------------------------------------------------------------
                 |""".stripMargin)
          })
          offsets.forEach(offset => {
            DBUtils.update(offset)
          })
          offsets.clear()
        }
      }
    
      def main(args: Array[String]): Unit = {
        executor
      }
    }

    4:mysql数据截图

    使用replace的话,必须有相应的主键作为限制,不然起不到我们想要的目的

    replace:根据三个主键,先去查询是否存在三个主键对应值得存在,不存在的话直接insert,存在的话就覆盖

    根据需求,如果需要加入consumer_id的话,那就同样可以设置为4号主键,动态的数据不能设置为主键,动手尝试一下就知道其中的奥妙了

    不懂的地方都可以私信我,第一时间回复你(刷单,网络兼职,招聘IT讲师,猎头之类的不要私信我)

    展开全文
  • TypeError: can't subtract offset-naive and offset-aware datetimes 原来是两个相减的时间时区不一致 # -*- coding: utf-8 -*- from datetime import datetime import pytz now1 = datetime.now(tz=pytz.UTC) ...
  • Kafka提交offset机制

    千次阅读 2019-07-18 14:22:14
    在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比...
  • offset offset 即偏移量,使用 offset 系列相关属性可以 动态的 获取该元素的位置(偏移)、大小等,如: 元素距离带有定位父元素的位置 获取元素自身的大小(宽度高度) 注:返回的数值不带单位 offset 系列...
  • Spark整合Kafka并手动维护offset

    千次阅读 2019-09-04 14:30:19
    //维护offset:为了方便我们对offset的维护/管理,spark提供了一个类,帮我们封装offset的数据 val offsetRanges : Array [ OffsetRange ] = rdd . asInstanceOf [ HasOffsetRanges ] . offsetRanges for ( ...
  • kafka 的 auto.offset.reset 含义详解

    万次阅读 多人点赞 2018-11-22 20:34:52
    最近也是有人问我kafka的auto.offset.reset设置为earliest后怎么结果和自己想象的不一样呢,相信很多人都对这个参数心存疑惑,今天来详细讲解一下: kafka-0.10.1.X版本之前:auto.offset.reset 的值为smallest,和,...
  • 对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。 单词“offset...
  • Kafka有offset的概念,offset记录每个groupId对于每个topic的每个partition里已经提交的读取位置。当comsumer程序失败重启时,可以从这个位置重新读取数据。 可以通过如下方法查看一个groupId的offset: root@h2:~# ...
  • flink手动维护kafka的offset

    千次阅读 2019-08-01 20:39:13
    手动维护offset的好处,你可以记录每个时间点的offset,如果上游日志异常,你可以把你记录的offset和时间戳拿出来,找出对应时间点的offset,去修复历史数据 不废话,写过spark的,看了我的代码就知道如何实现了,这...
  • kafka的offset理解

    万次阅读 2020-03-16 10:35:40
    kafka的offset理解 kafka是顺序读写,具备很好的吞吐量。实现原理是 每次生产消息时,都是往对应partition的文件中追加写入,而消息的被读取状态是由consumer来维护的 所以每个partition中offset一般都是连续递增的...
  • PostgreSQL LIMIT和OFFSET

    千次阅读 2019-12-04 17:36:23
    PostgreSQL中LIMIT和OFFSET关键字 LIMIT和OFFSET允许你只检索查询剩余部分产生的行的一部分: LIMIT : 限制取多少条数据。 OFFSET : 跳过多少条数据然后取后续数据。 LIMIT 和 OFFSET 关键字在查询时可以单独使用...
  • 汇编offset

    千次阅读 2020-02-06 17:24:27
    start:mov ax,offset start s: mov ax,offset s codeseg ends end start mov ax,offset start 相当于mov ax,0,因为start是代码段中的标号,他所标记的指令是代码中的第一条指令,偏移地址为0 mov ax,of...
  • I was googling and reading Kafka documentation but I couldn’t find out the max value of a consumer offset and whether there is offset wraparound after max value. I understand offset is an Int64 value...
  • Kafka自动提交offset设置

    千次阅读 2021-02-04 13:59:44
    kafka自动提交offset的频率,默认是5000ms,就是5s 如果将enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。 The frequency in milliseconds that the consumer offsets are ...
  • Kafka offset管理

    千次阅读 2019-01-10 09:40:37
    Kafka offset管理 消费者在消费的过程中需要记录自己消费了多少数据,即消费 Offset。Kafka Offset 是Consumer Position,与 Broker 和 Producer 都无关。每个 Consumer Group、每个 Topic 的每个Partition 都有...
  • Kafka Consumer重置Offset

    万次阅读 2019-06-25 17:59:46
    在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。 在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作...
  • http://www.cnblogs.com/bitzhuwei/p/polygon-offset-for-stitching-andz-fighting.html 什么是stitching和z-fighting 在OpenGL中,如果想绘制一个多边形同时绘制其边界,可是先使用多边形模式GL_FILL绘制物体,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 674,985
精华内容 269,994
关键字:

offset