精华内容
下载资源
问答
  • kafka 命令行 创建topic 查看topic详情 生产消费数据,查看偏移量,修改分区偏移量(多方法),修改分区数量 1.知识点 1)Topic相关:创建Topic、删除Topic、查看Topic列表、查看Topic详细信息 2)生产相关:往...

    kafka 命令行 创建topic 查看topic详情 生产消费数据,查看偏移量,修改分区偏移量(多方法),修改分区数量

    1.知识点

    1)Topic相关:创建Topic、删除Topic、查看Topic列表、查看Topic详细信息

    2)生产者相关:往某个Topic中生产数据

    3)消费者相关:从某个Topic中消费数据

    4)消费组(group)相关:查看消费者group、查看消费者消费情况(消费至那个offset/积压数据量多少)

    5)修改topic下某个消费者(groupid)所有分区或某个分区的offset为任意指定偏移量

    2.实现命令

    以下命令若非特别说明均在kafka安装目录的bin目录下执行

    1)Topic相关:创建Topic、删除Topic、查看Topic详细信息、查看Topic列表、修改topic分区数

    TOP相关的命令执行连接zookeeper 端口2181

    创建Topic

    # 创建一个3分区1副本名为test的topic,必须指定分区数 --partitions 和副本数--replication-factor,其中副本数量不能超过kafka节点(broker)数量
    ./kafka-topics.sh --zookeeper localhost:2181  --topic test --partitions 3 --replication-factor 1 --create
    

    删除Topic

    # 删除名为test的topic
    # 删除topic时只有在kafka安装目录config目录下的server.properties中将delete.topic.enable 设置为true topic才会真实删除,否则只是标记为删除,实则不会删除
    ./kafka-topics.sh --zookeeper localhost:2181  --topic test  --delete
    

    查看某个Topic 分区 副本信息

    # 查看名为test的topic的详细信息,分区 副本的数量
    ./kafka-topics.sh --zookeeper localhost:2181  --topic test --describe
    

    查看有那些Topic

    # 查看kafka中创建了那些topic
    ./kafka-topics.sh  --zookeeper localhost:2181 --list
    

    修改topic的分区数

    # 将名为test的topic 修改为4个分区
    # 注意 分区数只能增加不能减少
    ./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 4 --topic test
    

    2)生产者相关:往某个topic中生产数据

    生产者相关命令执行连接broke-list 端口9092

    使用命令行往某个topic中写入数据

    # 使用命令行 给名为 test 的topic 中生产数据
    # 执行以下命令,然后在命令行中写入要发送kafka的数据回车即可发送数据到kafka
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    如果执行上述命令报错:

    在这里插入图片描述

    则将命令中的localhost 更换为主机名即可。

    3)消费者相关:从某个topic中消费数据

    消费者相关可连接zookeeper 端口2181 或者bootstrap 端口9092, 0.8版本及以下版本kafka只能连接zookeeper,0.9版本及以上版本建议连接bootstrap ,但也可连接zookeeper

    消费某个topic中的最新数据

    # 0.8版本及以下的的kafka 使用如下命令test topic中的数据
    ./kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test
    # 指定消费10条数据
    ./kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test --max-messages 10
    # 0.9版本及以上的kafka建议使用如下命令进行消费,当然也可使用上一条命令消费
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
    

    消费某个topic中最老的数据

    # 0.8版本及以下的的kafka 使用如下命令test topic中的数据
    ./kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test --from-beginning
    # 0.9版本及以上的kafka建议使用如下命令进行消费,当然也可使用上一条命令消费
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    消费某个topic中的数据并指定groupid

    # 在命令行消费某个topic中的数据通过/config/consumer.properties 配置文件指定groupid 
    # 0.8版本及以下的的kafka 使用如下命令test topic中的数据
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties
    # 0.9版本及以上的kafka建议使用如下命令进行消费,当然也可使用上一条命令消费
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties
    

    将/config/consumer.properties配置文件中groupid对应的offset删除,该groupid重置为未使用状态

    # 使用这条命令会从最新消息开始消费,会将之前groupid记录的offset重置,并重新开始记录
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties  --delete-consumer-offsets
    # 使用consumer.properties 不可以和--from-beginning一同使用 除非与--delete-consumer-offsets一同使用
    # 使用这条命令会从头开始消费数据,会将之前groupid记录的offset重置,并重新开始记录
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties  --delete-consumer-offsets --from beginning
    

    4)消费组(group)相关:查看消费者group、查看消费者消费情况(消费至那个offset/积压数据量多少)

    查看有那些消费者group

    # 0.8版本及以下的的kafka 使用如下命令查看有那些消费者group
    ./kafka-consumer-groups.sh  --zookeeper localhost:2181 --list
    # 0.9版本及以上的kafka建议使用如下命令查看有那些消费者group,当然也可使用上一条命令消费
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    

    查看某个消费者消费情况(消息队列堆积情况)

    # 0.8版本及以下的的kafka 使用如下命令查看名为testgroup 的消费组的消费情况
    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group testgroup --describe
    # 0.9版本及以上的的kafka 使用如下命令查看名为testgroup 的消费组的消费情况
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe
    

    5)修改某个消费组的偏移量(offset)

    通过zk客户端对topic的分区修改offset 为任意偏移量

    # 独立安装的zk,进入zookeeper安装目录的bin目录下,使用如下命令进入zk客户端
    ./zkCli.sh -server localhost:2181
    # 非独立安装的的zk, 直接在kafka安装目录bin目录下,使用如下命令进入zk客户端
    ./zookeeper-shell.sh  localhost:2181
    # 进入zk客户端后可查看某个分区的偏移量 例如名为test的topic的消费者组 test-consumer-group 0分区的offset的消费情况
    get /consumers/test-consumer-group/offsets/test/0
    # 设置名为test的topic的消费者组 test-consumer-group 0分区的offset 为1000
    set /consumers/test-consumer-group/offsets/test/0  1000
    

    通过kafka内置的kafka.tools.UpdateOffsetsInZK类实现修改某个topic 的消费组(config/consumer.properties中配置的groupid)的所有分区的偏移量为最新(latest)或者最旧(earliest)

    # 将名为test的topic的消费组(groupid必须从consumer.properties获取,即需要将需要修改的groupid写入consumer.properties配置文件)所有分区的offset设置为最早earliest
    ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest ../config/consumer.properties test
    # 将名为test的topic的消费组(groupid必须从consumer.properties获取,即需要将需要修改的groupid写入consumer.properties配置文件)所有分区的offset设置为最新latest
    ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest ../config/consumer.properties test
    

    0.11.0.0及以上版本修改偏移量可使用Kafka自带的kafka-consumer-groups.sh脚本

    # 以下可将--zookeeper localhost:2181 更换为--bootstrap-server localhost:9092 高版本的消费者建议连接bootstrap
    # 将test topic的消费组test-consumer-group的0分区的偏移量设置为最新
    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test:0 --reset-offsets --to-earliest –execute
    # 将test topic的消费组test-consumer-group的0和1分区的偏移量设置为最旧
    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test:0,1 --reset-offsets --to-latest –execute
    # 将test topic的消费组test-consumer-group的所有分区的偏移量设置为1000
    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test --reset-offsets --to-offset 1000 –execute
    # --reset-offsets后可以跟的其他用法:--to-current:把位移调整到分区当前位移
    # --reset-offsets后可以跟的其他用法:--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
    # --reset-offsets后可以跟的其他用法:--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
    
    展开全文
  • 由于Zookeeper并不适合大批的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。...

    我们在kafka的log文件中发现了还有很多以 __consumer_offsets_的文件夹;总共50个;

    由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

    __consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。
    __consumer_offsets 的每条消息格式大致如图所示
    在这里插入图片描述
    可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

    考虑到一个 kafka 生成环境中可能有很多consumerconsumer group,如果这些 consumer 同时提交位移,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将负载分散到不同的 __consumer_offsets 分区上。

    一般情况下,当集群中第一次有消费者消费消息时会自动创建__consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50。

    1. 消费Topic消息

    打开一个session a,执行下面的消费者命令 ;指定了消费组:szz1-group; topic:szz1-test-topic

    bin/kafka-console-consumer.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic
    

    2.产生消息

    打开一个新的session b,执行生产消息命令

    bin/kafka-console-producer.sh --broker-list  xxx1:9092,xxx2:9092,xxx3:9092  --topic szz1-test-topic
    

    发送几条消息
    在这里插入图片描述

    然后可以看到刚刚打开的 session a 消费了消息;
    在这里插入图片描述

    3. 查看指定消费组的消费位置offset

    bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group
    

    在这里插入图片描述
    可以看到图中 展示了每个partition 对应的消费者id; 因为只开了一个消费者; 所以是这个消费者同时消费3个partition;
    CURRENT-OFFSET: 当前消费组消费到的偏移量
    LOG-END-OFFSET: 日志最后的偏移量
    CURRENT-OFFSET = LOG-END-OFFSET 说明当前消费组已经全部消费了;

    那么我把 session a 关掉;现在没有消费者之后; 我再发送几条消息看看;
    在这里插入图片描述
    我发送了2条消息之后, partition-0 partition-1LOG-END-OFFSET: 日志最后的偏移量分别增加了1; 但是CURRENT-OFFSET: 当前消费组消费到的偏移量 保持不变;因为没有被消费;

    重新打开一个消费组 继续消费*

    重新打开session之后, 会发现控制台输出了刚刚发送的2条消息; 并且偏移量也更新了
    在这里插入图片描述

    4. 从头开始消费 --from-beginning

    如果我们用新的消费组去消费一个Topic,那么默认这个消费组的offset会是最新的; 也就是说历史的不会消费
    例如下面我们新开一个session c ;消费组设置为szz1-group3

    bin/kafka-console-consumer.sh --bootstrap-server   xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3    --topic szz1-test-topic
    

    查看消费情况

     bin/kafka-consumer-groups.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092  --describe --group szz1-group3
    

    在这里插入图片描述
    可以看到CURRENT-OFFSET = LOG-END-OFFSET ;

    如何让新的消费组/者 从头开始消费呢? 加上参数 --from-beginning

    5.如何确认 consume_group 在哪个__consumer_offsets-? 中

    Math.abs(groupID.hashCode()) % numPartitions

    6. 查找__consumer_offsets 分区数中的消费组偏移量offset

    上面的 3. 查看指定消费组的消费位置offset 中,我们知道如何查看指定的topic消费组的偏移量;
    那还有一种方式也可以查询

    先通过 consume_group 确定分区数; 例如 "szz1-group".hashCode()%50=32; 那我们就知道 szz-group消费组的偏移量信息存放在 __consumer_offsets_32中;
    通过命令

     bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
    

    在这里插入图片描述
    前面的 是key 后面的是value;key由 消费组+Topic+分区数 确定; 后面的value就包含了 消费组的偏移量信息等等

    然后接着我们发送几个消息,并且进行消费; 上面的控制台会自动更新为新的offset;

    7 查询topic的分布情况

    bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC名称
    
    展开全文
  • 1.概述 有这么一种情况,假设你的flink任务在运行中,但是,因为某一个时刻...# 0.8版本及以下的的kafka 使用如下命令查看有那些消费者group ./kafka-consumer-groups.sh --zookeeper localhost:2181 --list # 0.9版本.

    在这里插入图片描述

    1.概述

    有这么一种情况,假设你的flink任务在运行中,但是,因为某一个时刻,日志突然暴增,但是你的数据又不是特别重要的,比如日志数据,但是你的分析结果又需要实时性,此时可以丢弃一些数据,让数据保持实时,但是最好是flink任务不重启,该怎么做呢?

    2.查看消费组

    查看有那些消费者group

    # 0.8版本及以下的的kafka 使用如下命令查看有那些消费者group
    ./kafka-consumer-groups.sh  --zookeeper localhost:2181 --list
    # 0.9版本及以上的kafka建议使用如下命令查看有那些消费者group,当然也可使用上一条命令消费
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    

    但是这两种方案,都可能获取不到完整的消费组信息。

    3.查看LAG

    查看某个消费者消费情况(消息队列堆积情况)

    # 0.8版本及以下的的kafka 使用如下命令查看名为testgroup 的消费组的消费情况
    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group testgroup --describe
    
    # 0.9版本及以上的的kafka 使用如下命令查看名为testgroup 的消费组的消费情况
    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe
    

    4.groupid重置为未使用状态

    将/config/consumer.properties配置文件中groupid对应的offset删除,该groupid重置为未使用状态

    使用这条命令会从最新消息开始消费,会将之前groupid记录的offset重置,并重新开始记录

    
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties  --delete-consumer-offsets
    
    
    # 使用consumer.properties 不可以和--from-beginning一同使用 除非与--delete-consumer-offsets一同使用
    # 使用这条命令会从头开始消费数据,会将之前groupid记录的offset重置,并重新开始记录
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties  --delete-consumer-offsets --from beginning
    

    5.修改某个消费组的偏移量

    5.1 zk修改

    通过zk客户端对topic的分区修改offset 为任意偏移量

    独立安装的zk,进入zookeeper安装目录的bin目录下,使用如下命令进入zk客户端

    ./zkCli.sh -server localhost:2181
    

    非独立安装的的zk, 直接在kafka安装目录bin目录下,使用如下命令进入zk客户端

    ./zookeeper-shell.sh  localhost:2181
    
    

    #进入zk客户端后可查看某个分区的偏移量 例如名为test的topic的消费者组 test-consumer-group 0分区的offset的消费情况

    get /consumers/test-consumer-group/offsets/test/0
    
    

    设置名为test的topic的消费者组 test-consumer-group 0分区的offset 为1000

    set /consumers/test-consumer-group/offsets/test/0  1000
    
    

    注意
    有些情况下,消费组不存在,此时我们可以直接在zk中,创建消费组,然后更改它的offset,也能实现。

    5.2 UpdateOffsetsInZK

    通过kafka内置的kafka.tools.UpdateOffsetsInZK类实现修改某个topic 的消费组(config/consumer.properties中配置的groupid)的所有分区的偏移量为最新(latest)或者最旧(earliest)

    #将名为test的topic的消费组(groupid必须从consumer.properties获取,即需要将需要修改的groupid写入consumer.properties配置文件)所有分区的offset设置为最早earliest

    ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest ../config/consumer.properties test
    
    

    将名为test的topic的消费组(groupid必须从consumer.properties获取,即需要将需要修改的groupid写入consumer.properties配置文件)所有分区的offset设置为最新latest

    ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest ../config/consumer.properties test
    

    5.3 脚本

    0.11.0.0及以上版本修改偏移量可使用Kafka自带的kafka-consumer-groups.sh脚本

    以下可将–zookeeper localhost:2181 更换为–bootstrap-server localhost:9092 高版本的消费者建议连接bootstrap

    将test topic的消费组test-consumer-group的0分区的偏移量设置为最新

    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test:0 --reset-offsets --to-earliest –execute
    

    将test topic的消费组test-consumer-group的0和1分区的偏移量设置为最旧

    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test:0,1 --reset-offsets --to-latest –execute
    

    将test topic的消费组test-consumer-group的所有分区的偏移量设置为1000

    ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test --reset-offsets --to-offset 1000 –execute
    
    1. --reset-offsets后可以跟的其他用法:--to-current:把位移调整到分区当前位移
    2. --reset-offsets后可以跟的其他用法:--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
    3. --reset-offsets后可以跟的其他用法:--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
    展开全文
  • Kafka提交偏移量的五种方式

    千次阅读 2019-02-01 17:31:55
    讨论偏移量我们首先要知道如何查看偏移量消费者目前消费的偏移量 ./kafka-consumer-groups.sh --describe --bootstrap-server 192.168.153.128:9092 --group ConsumerGroup3 Consumer group 'ConsumerGroup3' ...

    讨论偏移量我们首先要知道如何查看偏移量及消费者目前消费的偏移量

    ./kafka-consumer-groups.sh --describe --bootstrap-server 192.168.153.128:9092 --group ConsumerGroup3

    Consumer group 'ConsumerGroup3' has no active members:可以看到我们这个消费者组现在是没有消费者的

    test_topic这个topic其他分区没有消息,只有分区2有消费记录:

    CURRENT-OFFSET:目前已被消费者消费的消息偏移量

    LOG-END-OFFSET:消息最大偏移量

    LAG:消息堆积的条数

    1、自动提交方式

    这种提交方式有两个很重要的参数:

    enable.auto.commit=true(是否开启自动提交,true or false)

    auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)

    每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。

    重复消费:如果我们设auto.commit.interval.ms=60000,16:34首次提交偏移量62,此时又拉取了2条消息,此时分区2对应的消费者宕机,发生了分区再均衡,(分区的所有权从一个消费者转到另一个消费者被称为再均衡。一般新增消费者,消费者关闭或改变分区数都会发生再均衡)分区2的消息由另一个消费者消费,新的消费者会读取16:34提交的那个偏移量,这样就会发生重复消费了,我们来实践一下:

    我们开两个消费端consumer1和consumer2:

    private static final String TOPIC_NAME = "test_topic";
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers",
                    "192.168.153.128:9092,192.168.153.128:9093,192.168.153.128:9094");
            props.put("group.id", "ConsumerGroup3");
            /* 是否自动确认offset */
            props.put("enable.auto.commit", "true");
            /* 自动确认offset的时间间隔 */
            props.put("auto.commit.interval.ms", "60000");
            props.put("session.timeout.ms", "30000");
    //        props.put("auto.offset.reset", "earliest");
            props.put("auto.offset.reset", "latest");
    
            // 序列化类
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    
            try {
                for (; ; ) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            } finally {
                consumer.close();
            }
        }

    生产者:

            producer = new KafkaProducer<>(props);
            for (int i = 0; i < 2; i++) {
    	        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2,
    	                "key01", "连衣裙" + i);
    	        try {
    	            RecordMetadata result = producer.send(record).get();
    	            System.out.printf("同步发送:%s,分区:%d,offset:%d\n", result.topic(),
    	                    result.partition(), result.offset());
    	        } catch (Exception e) {
    	            e.printStackTrace();
    	        }
            }

    生产者生产了2条数据:

    同步发送:test_topic,分区:2,offset:62
    同步发送:test_topic,分区:2,offset:63

    consumer2消费的消息

    2019-01-31 17:02:17 消费消息:topic=test_topic, partition=2, offset=62, key=key01, value=连衣裙0
    2019-01-31 17:02:17 消费消息:topic=test_topic, partition=2, offset=63, key=key01, value=连衣裙1

    此时我们关闭consumer2,去服务器执行./kafka-consumer-groups.sh --describe --bootstrap-server 192.168.153.128:9092 --group ConsumerGroup3

    我们再看consumer1消费的消息

    2019-01-31 17:02:50 消费消息:topic=test_topic, partition=2, offset=62, key=key01, value=连衣裙0
    2019-01-31 17:02:50 消费消息:topic=test_topic, partition=2, offset=63, key=key01, value=连衣裙1

    可以看到发生了重复消费。

    丢失消息:消费者一次poll100条新消息,并且提交了偏移量,此时消费者还没处理完,就宕机了,又发生了再均衡,由另一个消费者消费该分区的消息,新的消费者会读取旧消费者最后一次提交的偏移量,此时就会发生消息丢失了。我们来实践一下:

    生产者:

          for (int i = 0; i < 10; i++) {
    	        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 2,
    	                "key01", "连衣裙" + i);
    	        try {
    	            RecordMetadata result = producer.send(record).get();
    	            System.out.printf("同步发送:%s,分区:%d,offset:%d\n", result.topic(),
    	                    result.partition(), result.offset());
    	        } catch (Exception e) {
    	            e.printStackTrace();
    	        }
            }

    生产10条消息:

    同步发送:test_topic,分区:2,offset:208
    同步发送:test_topic,分区:2,offset:209
    同步发送:test_topic,分区:2,offset:210
    同步发送:test_topic,分区:2,offset:211
    同步发送:test_topic,分区:2,offset:212
    同步发送:test_topic,分区:2,offset:213
    同步发送:test_topic,分区:2,offset:214
    同步发送:test_topic,分区:2,offset:215
    同步发送:test_topic,分区:2,offset:216
    同步发送:test_topic,分区:2,offset:217

    consumer2:

    for (; ; ) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()){
           System.out.println("consumer2消费了这批消息!");
        }
        for (ConsumerRecord<String, String> record : records){
            new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					Thread.sleep(60000);//用阻塞来模拟数据处理过程所消耗的时间
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    			    }
    			System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    					                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    	        }
    	   }).start();
        }
    }
    consumer2消费了这批消息!
    consumer2消费了这批消息!
    consumer2消费了这批消息!
    consumer2消费了这批消息!
    consumer2消费了这批消息!
    consumer2消费了这批消息!

    可见consumer2已经接收到了这批消息但并未进行逻辑处理,因为提交偏移量的周期时间比处理数据的时间小,所以已经提交了偏移量,此时我们关闭consumer2,再发送10条消息

    同步发送:test_topic,分区:2,offset:218
    同步发送:test_topic,分区:2,offset:219
    同步发送:test_topic,分区:2,offset:220
    同步发送:test_topic,分区:2,offset:221
    同步发送:test_topic,分区:2,offset:222
    同步发送:test_topic,分区:2,offset:223
    同步发送:test_topic,分区:2,offset:224
    同步发送:test_topic,分区:2,offset:225
    同步发送:test_topic,分区:2,offset:226
    同步发送:test_topic,分区:2,offset:227

    我们看一下consumer1

    for (; ; ) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()){
            System.out.println("consumer1消费了这批消息!");
        }
        for (ConsumerRecord<String, String> record : records){
            new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					Thread.sleep(60000);//用阻塞来模拟数据处理过程所消耗的时间
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    					                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    			}
    		}).start();
        }
    }
    consumer1消费了这批消息!
    consumer1消费了这批消息!
    consumer1消费了这批消息!
    consumer1消费了这批消息!
    consumer1消费了这批消息!
    consumer1消费了这批消息!
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=220, key=key01, value=连衣裙2
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=218, key=key01, value=连衣裙0
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=219, key=key01, value=连衣裙1
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=221, key=key01, value=连衣裙3
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=222, key=key01, value=连衣裙4
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=223, key=key01, value=连衣裙5
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=225, key=key01, value=连衣裙7
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=224, key=key01, value=连衣裙6
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=227, key=key01, value=连衣裙9
    2019-02-32 09:27:13 消费消息:topic=test_topic, partition=2, offset=226, key=key01, value=连衣裙8

    其实我们不看consumer1可能也知道208到217之间的那10条消息其实已经丢失了。consumer1是从220开始消费的。

    由此我们知道自动提交的方式是有弊端的,如果你是同步处理数据,再均衡时很容易发生消息重复,如果你是异步处理数据,则易发生数据丢失,这都是我们不想看到的。

    2、提交当前偏移量

    enable.auto.commit=false,用commitSync()提交由poll方法返回的最新偏移量,如果提交成功马上返回,提交失败则抛出异常。

    for (; ; ) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()){
            System.out.println("consumer1消费了这批消息!");
        }
        for (ConsumerRecord<String, String> record : records){
    		System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    			                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }

    3、异步提交

    手动提交不足之处在于提交请求后,broker响应之前应用程序会一直阻塞。这样就会限制应用程序的吞吐量。虽然可以通过降低提交频率来提升吞吐量,但一旦发生再均衡,会增加重复消息的数量。

    for (; ; ) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()){
           System.out.println("consumer1消费了这批消息!");
        }
        for (ConsumerRecord<String, String> record : records){
    	    System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    			                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }

    异步提交还可以有回调

    for (; ; ) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()){
           continue;
        }
        for (ConsumerRecord<String, String> record : records){
    		System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    			                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
    					
    		@Override
    		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e)       
            {
    			if (e != null) {
    				System.out.println(offsets.toString());
    				System.out.println(e.toString());
    			}
    		}
    	});
    }

    4、同步异步组合提交

    如果提交失败发生在关闭消费者或者再均衡前的最后一次提交,那么就要确保提交能够成功。这个时候就需要使用同步异步组合提交。

    try {
        for (; ; ) {
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
           if (records.isEmpty()){
               continue;
           }
           for (ConsumerRecord<String, String> record : records){
    		   System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    			                record.topic(), record.partition(), record.offset(), record.key(), record.value());
           }
           consumer.commitAsync();
        }
    } finally {
       try{
          consumer.commitSync();
       } finally{
          consumer.close();
       }
    }

    5、提交特定偏移量

    如果想要在批次中间提交偏移量,消费者API允许在调用commitSync和commitAsync时传递希望提交的分区和偏移量。

    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    int count = 0; 		
    try {
        for (; ; ) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()){
               continue;
            }
            for (ConsumerRecord<String, String> record : records){
    			System.out.printf(DateUtil.getDate() + " 消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",
    			                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    			currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset(), "no metadata"));
    			if (count%1000==0) {
    				consumer.commitAsync(currentOffsets, null);
    			}
    			count++;
            }
        }
    } finally {
        try{
            consumer.commitSync();
        } finally{
            consumer.close();
        }
    }

    参考文献的文章写得很好,大家可以去看看。

    参考文献:https://blog.csdn.net/ljheee/article/details/81605754

    展开全文
  • kafka消费者

    2018-05-10 17:13:08
    kafka客户端从kafka集群消费消息(记录)。它会透明地处理kafka集群中服务器的故障。它获取集群内数据的分区,也和服务器进行交互,允许消费者组进行负载平衡消费。...偏移量消费者的位置kafka为每个分...
  • kafka客户端 kafkatool.

    2020-07-21 18:17:24
    kafka客户端,方便查看提供者和消费者的数据,有关消费者偏移量,一目了然,有二级制和UTF-8模式查看消费者内容 查看消费者组
  • spark消费kafka的两种方式 直连方式的两种 自动和手动 自动 自动偏移量维护kafka 0.10 之前的...查看记录消费者偏移量的路径 _consumer_offsets 案例: 注:先启动zookeeper 再启动kafka集群 命...
  • kafka直连方式消费多个topic

    万次阅读 多人点赞 2018-11-14 22:48:34
    一个消费者组可以消费多个topic,以前写过一篇一个消费者消费一个topic的,这次的是一个消费者组通过直连方式消费多个topic,做了小测试,结果是正确的,通过查看zookeeper的客户端,zookeeper记录了偏移量 ...
  • 一、Kafka工具 Kafka工具是用于管理和使用Apache Kafka®集群的... 查看消费者偏移量,包括Apache Storm Kafkaspout消费者 以漂亮的格式显示JSON和XML消息 添加和删除主题以及其他管理功能 将单个消息从您的分.
  • kafka命令行操作

    2021-03-09 17:11:46
    kafka命令行操作zookeeper管理命令启动zookeeper查看状态重新启动停止zookeeperkafka命令行操作启动关闭topic系列(2181是zookeeper的端口)查看当前所有topic新建topic删除topic详情topic增加topic分区数目查看...
  • kafka 维护 消费偏移量的 情况2.1 查看某个topic被那几个group消费2.2 查看指定 group.id 的消费者消费情况查看成员信息查看状态3. zookeeper 维护 消费偏移量的 情况 1.视界 1.概述 我们先启动消费者,再启动生产...
  • kafka常用shell命令

    2020-05-11 17:43:45
    kakfa常用命令,kafka服务启动和停止命令,创建topic命令,查看topic列表,查看topic最大偏移量,控制台发送和接收消息,查看某个消费者组的当前偏移量和最大偏移量,未消费数据量
  • kafka集群搭建

    2021-04-29 23:56:18
    1 规划2 Zookeeper集群准备3 安装包准备4 安装4.1 解压4.2 配置环境变量4.3修改server.properties4.4 同步Kafka安装目录5 ...Kafka集群6 测试6.1创建主题6.2 创建生产者6.3 创建消费6.4 动态消费6.5 查看消费者偏移量...
  • Kafka UI – Kafka的免费Web UI Kafka UI是一个免费的开源Web UI,用于监视和管理Apache Kafka集群。...查看消费者组—查看按分区停放的偏移量,合并和按分区滞后 浏览消息-使用JSON,纯文本和Avro编码浏览消息 动态
  • 查看数据,偏移量,键,时间戳和标题 使用架构注册表自动编码的Avro消息反序列化 配置视图 日志视图 删除记录 清空主题(删除一个主题中的所有记录) 排序视图 按分区过滤 开始时间过滤 使用搜索字符串过滤数据 ...
  • 显示消费者偏移量和滞后历史以及消费者/生产者消息吞吐量历史的图表。 最新发布的主题消息(要求Web浏览器支持WebSocket) 此外,控制台还提供了描述的JSON API。 可以使用可通过URL http:// [主机名] : [端口]...
  • 查看kafka消息列表 创建topic 删除topic 消息生产 消息消费 查看某个topic详情 修改分区数 三.可能会用到的其它命令 通过命令行的方式修改Kafka topic配置 快速定位某个topic异常的分区 消费端从任意指定的偏移量...
  • kafka日常之重要操作

    2020-09-24 22:50:37
    brokerZookeeper 配置Kafka 配置topicKafka 创建 topic查看当前有多少个 tpoic启动生产者产生消息启动消费者读取消息查看某个 topic 详细信息查看所有 topic 详细信息获取目前对应topic的offset获取消费组状态和用到...
  • 1、启动集群每个节点的进程:  2、创建 topic  3、查看已经创建的所有 kafka topic  ...6、查看某 topic 某个分区的偏移量最大值和最小值  7、增加 topic 分区数  8、删除 Topic  ...
  • 查看消费者组-每个分区的停放偏移量,合并延迟和每个分区滞后 创建新主题 查看ACL 支持Azure事件中心 要求 Java 11或更高版本 Kafka(版本0.11.0或更高版本)或Azure Event Hub 可选的附加集成: 架构注册表 入门...
  • OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户身打造,我们会为OCE企业提供更好的技术支持,比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等,如果贵司Logi-KafkaManager上了生产,快...
  • Kafka-shell常用操作

    2019-09-04 01:57:53
    kafka 的shell操作(基于topic的增删改查 shell) 目录 1. 启动节点的Kafka进程 2.... 3.... 查看分区偏移量 8. 增加topic分区数 9. 删除topic 10. 查看队列信息 11. 查看指定队列消费数据 1....
  • 一个小应用程序来监视kafka消费者的进度和它们的延迟的队列。 KafkaOffsetMonitor是用来实时监控Kafka集群中的consumer以及在队列中的位置(偏移量)。 你可以查看当前的消费者组,每个topic队列的所有partition...
  • OCE是一个认证机制和交流平台,为滴滴Logi-KafkaManager生产用户身打造,我们会为OCE企业提供更好的技术支持,比如专属的技术沙龙、企业一对一的交流机会、专属的答疑群等,如果贵司Logi-KafkaManager上了生产,快...
  • 新版本消费者(支持0.9版本+) 查看某一个具体的Topic 修改Topic 删除Topic 查询topic每个partition的偏移量 创建topic $KAFKA_HOME/bin/kafka-topics.sh --create --topic channel.test.HelloWorld --...

空空如也

空空如也

1 2
收藏数 38
精华内容 15
关键字:

查看kafka消费者偏移量