精华内容
下载资源
问答
  • 2022-03-18 17:44:59

    Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

    • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

    消费者组初始化流程

    1、coordinator:辅助实现消费者组的初始化和分区的分配

    coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量)

    1)每个consumer都 发送JoinGroup请求

    2)选出一个 consumer作为leader

    3)把要消费的topic情况 发送给leader 消费者

    4)leader会负 责制定消费方案

    5)把消费方案发给coordinator

    6)Coordinator就把消费方 案下发给各个consumer

    7)每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),也 会触发再平衡

    注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id

    代码实现

    public class CustomConsumer {
         public static void main(String[] args) {
             // 1.创建消费者的配置对象
             Properties properties = new Properties();
             // 2.给消费者配置对象添加参数
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.100:9092");
             // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
             // 配置消费者组(组名任意起名) 必须
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
             // 创建消费者对象
             KafkaConsumer<String, String> kafkaConsumer = new 
            KafkaConsumer<String, String>(properties);
             // 注册要消费的主题(可以消费多个主题)
             ArrayList<String> topics = new ArrayList<>();
             topics.add("first");
             kafkaConsumer.subscribe(topics);
             // 拉取数据打印
             while (true) {
             // 设置 1s 中消费一批数据
             ConsumerRecords<String, String> consumerRecords = 
            kafkaConsumer.poll(Duration.ofSeconds(1));
             // 打印消费到的数据
             for (ConsumerRecord<String, String> consumerRecord : 
            consumerRecords) {
             System.out.println(consumerRecord);
             }
             }
         }
    }

    指定消费某一分区

    public class CustomConsumer {
         public static void main(String[] args) {
             // 1.创建消费者的配置对象
             Properties properties = new Properties();
             // 2.给消费者配置对象添加参数
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.100:9092");
             // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
             // 配置消费者组(必须),名字可以任意起
             properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
             KafkaConsumer<String, String> kafkaConsumer = new 
            KafkaConsumer<>(properties);
             // 消费某个主题的某个分区数据
             ArrayList<TopicPartition> topicPartitions = new 
            ArrayList<>();
             topicPartitions.add(new TopicPartition("first", 0));
             kafkaConsumer.assign(topicPartitions);
             while (true){
             ConsumerRecords<String, String> consumerRecords = 
            kafkaConsumer.poll(Duration.ofSeconds(1));
             for (ConsumerRecord<String, String> consumerRecord : 
            consumerRecords) {
             System.out.println(consumerRecord);
             }
            }
       }
    }
    

    分区分配策略

    当启动多个消费者消费时,消费者会自动分区消费,每个消费者消费对应分区的内容

    Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略

    Range 是对每个 topic 而言的。 首先对同一个 topic 里面的分区按照序号进行排序,并 对消费者按照字母顺序进行排序

    假如现在有 7 个分区,3 个消费者,排序后的分区将会 是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2,通过 partitions数/consumer数 来决定每个消费者应该 消费几个分区。如果除不尽,那么前面几个消费者将会多 消费 1 个分区。

    RoundRobin 以及再平衡

    RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

    再平衡

    停止掉 0 号消费者,重新发送消息(45s 以内)

    0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成不同区域,由其他消费者执行

    (45s 以后)消费者 0 已经被踢出消费者组

    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFI
    G, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

    Sticky 以及再平衡

    粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分 区不变化。

    类似于range,但粘性分区是随机的

    // 修改分区分配策略
    ArrayList<String> startegys = new ArrayList<>();
    startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    startegys);

    offset 位移

    offset存在系统主题里,避免与zookeeper交互,带来大量的网络通信

    查看系统主题内容,在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

    默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false

    消费者

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.100:9092 --topic test --group test

    查看消费者消费主题__consumer_offsets

    bin/kafka-console-consumer.sh --topic 
    __consumer_offsets --bootstrap-server 192.168.100:9092192.168.100:9092 --consumer.config config/consumer.properties --formatter 
    "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm
    atter" --from-beginning

    __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据

    自动提交offset

    enable.auto.commit:是否开启自动提交offset功能,默认是true

    auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

    // 是否自动提交 offset
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
    true);
     // 提交 offset 的时间周期 1000ms,默认 5s
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
    1000);

    手动提交 offset

    手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相 同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成 功,并且会自动失败重试;而异步提交则没有失败重试机制,故 有可能提交失败。

    • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。

    • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

    同步提交 offset

    public class CustomConsumerByHandSync {
         public static void main(String[] args) {
             // 1. 创建 kafka 消费者配置类
             Properties properties = new Properties();
             // 2. 添加配置参数
             // 添加连接
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "192.168.6.100:9092");
             // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            "org.apache.kafka.common.serialization.StringDeserializer");
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            "org.apache.kafka.common.serialization.StringDeserializer");
             // 配置消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
             // 是否自动提交 offset
             properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
            false);
             //3. 创建 kafka 消费者
             KafkaConsumer<String, String> consumer = new 
            KafkaConsumer<>(properties);
             //4. 设置消费主题 形参是列表
             consumer.subscribe(Arrays.asList("first"));
             //5. 消费数据
             while (true){
             // 读取消息
             ConsumerRecords<String, String> consumerRecords = 
            consumer.poll(Duration.ofSeconds(1));
             // 输出消息
             for (ConsumerRecord<String, String> consumerRecord : 
            consumerRecords) {
             System.out.println(consumerRecord.value());
             }
             // 同步提交 offset
             consumer.commitSync();
             }
         }
    }

    异步提交 offset

    // 异步提交 offset
     consumer.commitAsync();

    指定 Offset 消费

    auto.offset.reset = earliest | latest | none 默认是 latest

    当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量 时(例如该数据已被删除),该怎么办?

    (1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。

    (2)latest(默认值):自动将偏移量重置为最新偏移量。

    (3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

    任意指定 offset 位移开始消费

    public class CustomConsumerSeek {
         public static void main(String[] args) {
             // 0 配置信息
                 Properties properties = new Properties();
                 // 连接
                 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                "192.168.6.100:9092");
                 // key value 反序列化
    
                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                StringDeserializer.class.getName());
    
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                StringDeserializer.class.getName());
                 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
                 // 1 创建一个消费者
                 KafkaConsumer<String, String> kafkaConsumer = new 
                KafkaConsumer<>(properties);
                // 2 订阅一个主题
                 ArrayList<String> topics = new ArrayList<>();
                 topics.add("first");
                 kafkaConsumer.subscribe(topics);
                 Set<TopicPartition> assignment= new HashSet<>();
                 while (assignment.size() == 0) {
                 kafkaConsumer.poll(Duration.ofSeconds(1));
                 // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
                 assignment = kafkaConsumer.assignment();
                 }
                 // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
                 for (TopicPartition tp: assignment) {
                 kafkaConsumer.seek(tp, 1700);
                 }
                 // 3 消费该主题数据
                 while (true) {
                 ConsumerRecords<String, String> consumerRecords = 
                kafkaConsumer.poll(Duration.ofSeconds(1));
                 for (ConsumerRecord<String, String> consumerRecord : 
                consumerRecords) {
                 System.out.println(consumerRecord);
                 }
             }
         }
    }

    注意:每次执行完,需要修改消费者组名;

    指定时间消费

    需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。 例如要求按照时间消费前一天的数据,怎么处理?

    public class CustomConsumerForTime {
         public static void main(String[] args) {
             // 0 配置信息
            Properties properties = new Properties();
             // 连接
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "192.168.6.100:9092");
             // key value 反序列化
    
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
             // 1 创建一个消费者
             KafkaConsumer<String, String> kafkaConsumer = new 
            KafkaConsumer<>(properties);
             // 2 订阅一个主题
             ArrayList<String> topics = new ArrayList<>();
             topics.add("first");
             kafkaConsumer.subscribe(topics);
             Set<TopicPartition> assignment = new HashSet<>();
             while (assignment.size() == 0) {
             kafkaConsumer.poll(Duration.ofSeconds(1));
             // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
             assignment = kafkaConsumer.assignment();
             }
             HashMap<TopicPartition, Long> timestampToSearch = new 
            HashMap<>();
             // 封装集合存储,每个分区对应一天前的数据
             for (TopicPartition topicPartition : assignment) {
             timestampToSearch.put(topicPartition, 
            System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
             }
             // 获取从 1 天前开始消费的每个分区的 offset
             Map<TopicPartition, OffsetAndTimestamp> offsets = 
            kafkaConsumer.offsetsForTimes(timestampToSearch);
             // 遍历每个分区,对每个分区设置消费时间。
             for (TopicPartition topicPartition : assignment) {
             OffsetAndTimestamp offsetAndTimestamp = 
            offsets.get(topicPartition);
             // 根据时间指定开始消费的位置
             if (offsetAndTimestamp != null){
             kafkaConsumer.seek(topicPartition, 
            offsetAndTimestamp.offset());
             }
         }
        // 3 消费该主题数据
         while (true) {
             ConsumerRecords<String, String> consumerRecords = 
            kafkaConsumer.poll(Duration.ofSeconds(1));
             for (ConsumerRecord<String, String> consumerRecord : 
            consumerRecords) {
             System.out.println(consumerRecord);
             }
             }
         }
    }
    

    漏消费和重复消费

    重复消费:已经消费了数据,但是 offset 没提交。 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

    场景1:重复消费。自动提交offset引起

    1) Consumer 每5s提交offse

    2)如果提交offset后的2s,consumer挂了

    3)再次重启consumer,则从上一次提交的 offset处继续消费,导致重复消费

    漏消费

    设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线 程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失

    消费者事务

    如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质

    数据积压

    1)如果是Kafka消费能力不足,则可以考虑增 加Topic的分区数,并且同时提升消费组的消费者 数量,消费者数 = 分区数

    2)如果是下游的数据处理不及时:提高每批次拉取的数 量。批次拉取数据过少(拉取数据/处理时间 < 生产速度), 使处理的数据小于生产的数据,也会造成数据积压

    更多相关内容
  • 公司Ubuntu12.04服务器突然开机起来后就卡住不动,打印kernel offset信息 目前已尝试以下方法,均失败 1:开机启动选择Ubuntu选项时,选择recovery mode启动,仍然卡住,显示kernel offset信息 2:在开机选项...
  • 主要介绍了springboot中如何实现kafa指定offset消费,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要介绍了使用limit,offset分页场景时为什么会慢,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • Kafka:Consumer手动提交offset

    千次阅读 2022-02-14 13:52:22
    在上一篇博客的测试样例中,Consumer都是自动提交offset,这是通过下面的配置来生效: // 开启offset自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 消费者offset自动提交到...

    在上一篇博客中介绍了使用Consumer订阅多个Topic或者多个Partition

    在上一篇博客的测试样例中,Consumer都是自动提交offset,这是通过下面的配置来生效:

            // 开启offset自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            // 消费者offset自动提交到Kafka的频率(以毫秒为单位)
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    

    如果Consumer手动提交offset,则需要将ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG设置为false

    测试代码

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.kaven</groupId>
        <artifactId>kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.0.0</version>
            </dependency>
        </dependencies>
    </project>
    

    创建Topic

    package com.kaven.kafka.admin;
    
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.KafkaFuture;
    
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    
    public class Admin {
    
        // 基于Kafka服务地址与请求超时时间来创建AdminClient实例
        private static final AdminClient adminClient = Admin.getAdminClient(
                "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094",
                "40000");
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            Admin admin = new Admin();
            // 创建Topic,Topic名称为topic1,分区数为1,复制因子为1
            admin.createTopic("topic1", 1, (short) 1);
            // 创建Topic,Topic名称为topic2,分区数为2,复制因子为1
            admin.createTopic("topic2", 2, (short) 1);
            // 创建Topic,Topic名称为topic3,分区数为2,复制因子为1
            admin.createTopic("topic3", 2, (short) 1);
            Thread.sleep(10000);
        }
    
        public static AdminClient getAdminClient(String address, String requestTimeoutMS) {
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address);
            properties.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMS);
            return AdminClient.create(properties);
        }
    
        public void createTopic(String name, int numPartitions, short replicationFactor) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            CreateTopicsResult topics = adminClient.createTopics(
                    Collections.singleton(new NewTopic(name, numPartitions, replicationFactor))
            );
            Map<String, KafkaFuture<Void>> values = topics.values();
            values.forEach((name__, future) -> {
                future.whenComplete((a, throwable) -> {
                    if(throwable != null) {
                        System.out.println(throwable.getMessage());
                    }
                    System.out.println(name__);
                    latch.countDown();
                });
            });
            latch.await();
        }
    }
    

    Producer发布消息:

    package com.kaven.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class ProducerTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            send("topic1");
            send("topic2");
            send("topic3");
        }
    
        public static void send(String name) throws ExecutionException, InterruptedException {
            Producer<String, String> producer = ProducerTest.createProducer();
            for (int i = 0; i < 7; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        name,
                        "key-" + i,
                        "value-" + i
                );
                // 异步发送并回调
                producer.send(producerRecord, (metadata, exception) -> {
                    if(exception == null) {
                        System.out.printf("topic: %s, partition: %s, offset: %s\n", name, metadata.partition(), metadata.offset());
                    }
                    else {
                        exception.printStackTrace();
                    }
                });
            }
            // 要关闭Producer实例
            producer.close();
        }
    
        public static Producer<String, String> createProducer() {
            // Producer的配置
            Properties properties = new Properties();
            // 服务地址
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
            // KEY的序列化器类
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // VALUE的序列化器类
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            return new KafkaProducer<>(properties);
        }
    }
    

    不提交offset

    Consumer订阅程序:

    package com.kaven.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    
    import java.time.Duration;
    import java.util.*;
    
    public class ConsumerTest {
    
        public static void main(String[] args) {
            commit(Arrays.asList("topic1", "topic2", "topic3"));
        }
    
        public static void commit(List<String> topicList) {
            KafkaConsumer<String, String> consumer = createConsumer();
            consumer.subscribe(topicList);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                records.forEach((record) -> {
                    System.out.printf("topic: %s, partition: %s, offset: %s, key: %s, value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });
            }
        }
    
        public static KafkaConsumer<String, String> createConsumer() {
            // Consumer的配置
            Properties properties = new Properties();
            // 服务地址
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
            // 组ID,用于标识此消费者所属的消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kaven-test");
            // 关闭offset自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // 消费者offset自动提交到Kafka的频率(以毫秒为单位)
            // properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // KEY的反序列化器类
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // VALUE的反序列化器类
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
            return new KafkaConsumer<>(properties);
        }
    }
    

    先创建Topic,然后运行Consumer订阅程序,目前还没有消息发布:
    在这里插入图片描述
    通过Producer发布消息到Topic上:

    topic: topic1, partition: 0, offset: 56
    topic: topic1, partition: 0, offset: 57
    topic: topic1, partition: 0, offset: 58
    topic: topic1, partition: 0, offset: 59
    topic: topic1, partition: 0, offset: 60
    topic: topic1, partition: 0, offset: 61
    topic: topic1, partition: 0, offset: 62
    topic: topic2, partition: 0, offset: 32
    topic: topic2, partition: 0, offset: 33
    topic: topic2, partition: 0, offset: 34
    topic: topic2, partition: 0, offset: 35
    topic: topic2, partition: 1, offset: 24
    topic: topic2, partition: 1, offset: 25
    topic: topic2, partition: 1, offset: 26
    topic: topic3, partition: 1, offset: 24
    topic: topic3, partition: 1, offset: 25
    topic: topic3, partition: 1, offset: 26
    topic: topic3, partition: 0, offset: 32
    topic: topic3, partition: 0, offset: 33
    topic: topic3, partition: 0, offset: 34
    topic: topic3, partition: 0, offset: 35
    

    此时Consumer就可以订阅到消息了,输出如下所示:

    topic: topic2, partition: 1, offset: 24, key: key-0, value: value-0
    topic: topic2, partition: 1, offset: 25, key: key-3, value: value-3
    topic: topic2, partition: 1, offset: 26, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 56, key: key-0, value: value-0
    topic: topic1, partition: 0, offset: 57, key: key-1, value: value-1
    topic: topic1, partition: 0, offset: 58, key: key-2, value: value-2
    topic: topic1, partition: 0, offset: 59, key: key-3, value: value-3
    topic: topic1, partition: 0, offset: 60, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 61, key: key-5, value: value-5
    topic: topic1, partition: 0, offset: 62, key: key-6, value: value-6
    topic: topic2, partition: 0, offset: 32, key: key-1, value: value-1
    topic: topic2, partition: 0, offset: 33, key: key-2, value: value-2
    topic: topic2, partition: 0, offset: 34, key: key-5, value: value-5
    topic: topic2, partition: 0, offset: 35, key: key-6, value: value-6
    topic: topic3, partition: 1, offset: 24, key: key-0, value: value-0
    topic: topic3, partition: 1, offset: 25, key: key-3, value: value-3
    topic: topic3, partition: 1, offset: 26, key: key-4, value: value-4
    topic: topic3, partition: 0, offset: 32, key: key-1, value: value-1
    topic: topic3, partition: 0, offset: 33, key: key-2, value: value-2
    topic: topic3, partition: 0, offset: 34, key: key-5, value: value-5
    topic: topic3, partition: 0, offset: 35, key: key-6, value: value-6
    

    因为上次消费没有提交offset,再次运行Consumer订阅程序,会重复消费消息:

    topic: topic2, partition: 0, offset: 32, key: key-1, value: value-1
    topic: topic2, partition: 0, offset: 33, key: key-2, value: value-2
    topic: topic2, partition: 0, offset: 34, key: key-5, value: value-5
    topic: topic2, partition: 0, offset: 35, key: key-6, value: value-6
    topic: topic3, partition: 1, offset: 24, key: key-0, value: value-0
    topic: topic3, partition: 1, offset: 25, key: key-3, value: value-3
    topic: topic3, partition: 1, offset: 26, key: key-4, value: value-4
    topic: topic2, partition: 1, offset: 24, key: key-0, value: value-0
    topic: topic2, partition: 1, offset: 25, key: key-3, value: value-3
    topic: topic2, partition: 1, offset: 26, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 56, key: key-0, value: value-0
    topic: topic1, partition: 0, offset: 57, key: key-1, value: value-1
    topic: topic1, partition: 0, offset: 58, key: key-2, value: value-2
    topic: topic1, partition: 0, offset: 59, key: key-3, value: value-3
    topic: topic1, partition: 0, offset: 60, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 61, key: key-5, value: value-5
    topic: topic1, partition: 0, offset: 62, key: key-6, value: value-6
    topic: topic3, partition: 0, offset: 32, key: key-1, value: value-1
    topic: topic3, partition: 0, offset: 33, key: key-2, value: value-2
    topic: topic3, partition: 0, offset: 34, key: key-5, value: value-5
    topic: topic3, partition: 0, offset: 35, key: key-6, value: value-6
    

    提交offset

    修改commit方法:

        public static void commit(List<String> topicList) {
            KafkaConsumer<String, String> consumer = createConsumer();
            consumer.subscribe(topicList);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                records.forEach((record) -> {
                    System.out.printf("topic: %s, partition: %s, offset: %s, key: %s, value: %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });
                consumer.commitSync();
            }
        }
    

    因为上一次消费没有提交offset,运行Consumer订阅程序还是会消费上一次消费过的消息,但这次消费之后提交了offset(通过consumer.commitSync())。

    topic: topic2, partition: 1, offset: 24, key: key-0, value: value-0
    topic: topic2, partition: 1, offset: 25, key: key-3, value: value-3
    topic: topic2, partition: 1, offset: 26, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 56, key: key-0, value: value-0
    topic: topic1, partition: 0, offset: 57, key: key-1, value: value-1
    topic: topic1, partition: 0, offset: 58, key: key-2, value: value-2
    topic: topic1, partition: 0, offset: 59, key: key-3, value: value-3
    topic: topic1, partition: 0, offset: 60, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 61, key: key-5, value: value-5
    topic: topic1, partition: 0, offset: 62, key: key-6, value: value-6
    topic: topic2, partition: 0, offset: 32, key: key-1, value: value-1
    topic: topic2, partition: 0, offset: 33, key: key-2, value: value-2
    topic: topic2, partition: 0, offset: 34, key: key-5, value: value-5
    topic: topic2, partition: 0, offset: 35, key: key-6, value: value-6
    topic: topic3, partition: 1, offset: 24, key: key-0, value: value-0
    topic: topic3, partition: 1, offset: 25, key: key-3, value: value-3
    topic: topic3, partition: 1, offset: 26, key: key-4, value: value-4
    topic: topic3, partition: 0, offset: 32, key: key-1, value: value-1
    topic: topic3, partition: 0, offset: 33, key: key-2, value: value-2
    topic: topic3, partition: 0, offset: 34, key: key-5, value: value-5
    topic: topic3, partition: 0, offset: 35, key: key-6, value: value-6
    

    再次运行Consumer订阅程序,就不会消费上次消费过的消息了。

    基于Partition提交offset

    Consumer订阅程序:

    package com.kaven.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.*;
    
    public class ConsumerTest {
    
        public static void main(String[] args) {
            commitWithPartition(Arrays.asList("topic1", "topic2", "topic3"));
        }
        
        public static void commitWithPartition(List<String> topicList) {
            KafkaConsumer<String, String> consumer = createConsumer();
            consumer.subscribe(topicList);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                records.partitions().forEach((partition) -> {
                    List<ConsumerRecord<String, String>> recordsWithPartition = records.records(partition);
                    recordsWithPartition.forEach((record) -> {
                        System.out.printf("topic: %s, partition: %s, offset: %s, key: %s, value: %s\n",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    });
                    // 为了测试效果,只提交分区1的offset
                    if(partition.partition() == 1) {
                        // 更新offset,即上一次最大的offset + 1
                        long newOffset = recordsWithPartition.get(recordsWithPartition.size() - 1).offset() + 1;
                        Map<TopicPartition, OffsetAndMetadata> newOffsetMap = new HashMap<>();
                        newOffsetMap.put(partition, new OffsetAndMetadata(newOffset));
                        consumer.commitSync(newOffsetMap);
                    }
                });
            }
        }
    
        public static KafkaConsumer<String, String> createConsumer() {
            // Consumer的配置
            Properties properties = new Properties();
            // 服务地址
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.9:9092,192.168.1.9:9093,192.168.1.9:9094");
            // 组ID,用于标识此消费者所属的消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kaven-test");
            // 关闭offset自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // 消费者offset自动提交到Kafka的频率(以毫秒为单位)
            // properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // KEY的反序列化器类
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // VALUE的反序列化器类
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
            return new KafkaConsumer<>(properties);
        }
    }
    

    运行Consumer订阅程序,然后使用Producer发布消息:

    topic: topic1, partition: 0, offset: 63
    topic: topic1, partition: 0, offset: 64
    topic: topic1, partition: 0, offset: 65
    topic: topic1, partition: 0, offset: 66
    topic: topic1, partition: 0, offset: 67
    topic: topic1, partition: 0, offset: 68
    topic: topic1, partition: 0, offset: 69
    topic: topic2, partition: 1, offset: 27
    topic: topic2, partition: 1, offset: 28
    topic: topic2, partition: 1, offset: 29
    topic: topic2, partition: 0, offset: 36
    topic: topic2, partition: 0, offset: 37
    topic: topic2, partition: 0, offset: 38
    topic: topic2, partition: 0, offset: 39
    topic: topic3, partition: 1, offset: 27
    topic: topic3, partition: 1, offset: 28
    topic: topic3, partition: 1, offset: 29
    topic: topic3, partition: 0, offset: 36
    topic: topic3, partition: 0, offset: 37
    topic: topic3, partition: 0, offset: 38
    topic: topic3, partition: 0, offset: 39
    

    此时Consumer就可以订阅到消息了,输出如下所示:

    topic: topic2, partition: 0, offset: 36, key: key-1, value: value-1
    topic: topic2, partition: 0, offset: 37, key: key-2, value: value-2
    topic: topic2, partition: 0, offset: 38, key: key-5, value: value-5
    topic: topic2, partition: 0, offset: 39, key: key-6, value: value-6
    topic: topic3, partition: 1, offset: 27, key: key-0, value: value-0
    topic: topic3, partition: 1, offset: 28, key: key-3, value: value-3
    topic: topic3, partition: 1, offset: 29, key: key-4, value: value-4
    topic: topic2, partition: 1, offset: 27, key: key-0, value: value-0
    topic: topic2, partition: 1, offset: 28, key: key-3, value: value-3
    topic: topic2, partition: 1, offset: 29, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 63, key: key-0, value: value-0
    topic: topic1, partition: 0, offset: 64, key: key-1, value: value-1
    topic: topic1, partition: 0, offset: 65, key: key-2, value: value-2
    topic: topic1, partition: 0, offset: 66, key: key-3, value: value-3
    topic: topic1, partition: 0, offset: 67, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 68, key: key-5, value: value-5
    topic: topic1, partition: 0, offset: 69, key: key-6, value: value-6
    topic: topic3, partition: 0, offset: 36, key: key-1, value: value-1
    topic: topic3, partition: 0, offset: 37, key: key-2, value: value-2
    topic: topic3, partition: 0, offset: 38, key: key-5, value: value-5
    topic: topic3, partition: 0, offset: 39, key: key-6, value: value-6
    

    再运行Consumer订阅程序,Consumer还可以消费到上次没有提交offset的消息(都是分区0中的消息):

    topic: topic1, partition: 0, offset: 63, key: key-0, value: value-0
    topic: topic1, partition: 0, offset: 64, key: key-1, value: value-1
    topic: topic1, partition: 0, offset: 65, key: key-2, value: value-2
    topic: topic1, partition: 0, offset: 66, key: key-3, value: value-3
    topic: topic1, partition: 0, offset: 67, key: key-4, value: value-4
    topic: topic1, partition: 0, offset: 68, key: key-5, value: value-5
    topic: topic1, partition: 0, offset: 69, key: key-6, value: value-6
    topic: topic3, partition: 0, offset: 36, key: key-1, value: value-1
    topic: topic3, partition: 0, offset: 37, key: key-2, value: value-2
    topic: topic3, partition: 0, offset: 38, key: key-5, value: value-5
    topic: topic3, partition: 0, offset: 39, key: key-6, value: value-6
    topic: topic2, partition: 0, offset: 36, key: key-1, value: value-1
    topic: topic2, partition: 0, offset: 37, key: key-2, value: value-2
    topic: topic2, partition: 0, offset: 38, key: key-5, value: value-5
    topic: topic2, partition: 0, offset: 39, key: key-6, value: value-6
    

    Consumer手动提交offset就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

    展开全文
  • kafka 自动与手动管理offset

    千次阅读 2022-03-13 10:41:25
    kafka 自动与手动管理offset

    前言

    offset顾名思义,即偏移量,我们知道消息从生产者发送到kafka的topic之后,是进入到不同的分区,在consumer未对消息进行消费之前,消息是有序存储在各个分区中;

    offset内部原理

    在之前我们了解了kafka的消费者原理之后,提出这样一个疑问,kafka怎么知道某个消费组中的消费者消费消息的进度呢?

    1、从0.9版本开始,consumer默认将offset保存在Kafka ,一个内置的topic中,该topic为__consumer_offsets;
    2、 Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中;
    这也就是说,kafka是通过 offset这个值来管理消费组消费进度的,下面是一张关于kafka的offset的原理图;

    关于offset做下面几点补充:

    • __consumer_offsets 主题里面采用 key value 的方式存储数据;
    • key 是 group.id+topic+ 分区号,value 就是当前 offset 的值;
    • 每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据;

    默认情况下,保存offset数据的系统主题是看不到的,为了查看该系统主题数据,要将下面这个参数修改为false

    exclude.internal.topics=false【在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false】

    自动提交 offset

    为了使我们能够专注于自己的业务逻辑, Kafka 提供了自动提交 offset的功能,自动提交 offset 的相关参数:
    1、enable.auto.commit  是否开启自动提交 offset 功能,默认是 true;
    默认值为 true ,消费者会自动周期性地向服务器提交偏移量
    2、auto.commit.interval.ms  自动提交 offset 的时间间隔,默认是 5s;
    如果设置了 enable.auto.commit 的值为 true , 则该值定义了消 费者偏移量向 Kafka 提交的频率, 默认 5s

     

    代码展示
    producer 端代码
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class OffsetProducer1 {
    
        public static void main(String[] args) throws Exception {
    
            // 1. 创建 kafka 生产者的配置对象
            Properties properties = new Properties();
            // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // 3. 创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            System.out.println("开始发送数据");
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 15; i++) {
                kafkaProducer.send(new ProducerRecord<>("zcy234","congge " + i));
            }
            // 5. 关闭资源
            kafkaProducer.close();
        }
    
    }

    consumer 端代码
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class OffsetConsumer1 {
    
        public static void main(String[] args) {
            // 1. 创建 kafka 消费者配置类
            Properties properties = new Properties();
            // 2. 添加配置参数
            // 添加连接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
    
            // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group2");
            // 是否自动提交 offset
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    
            // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    
            //3. 创建 kafka 消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    
            //4. 设置消费主题 形参是列表
            consumer.subscribe(Arrays.asList("zcy234"));
    
            System.out.println("准备开始消费数据");
            //5. 消费数据
            while (true){
                // 读取消息
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                // 输出消息
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
            }
    
        }
    
    }

    核心的代码即添加下面这两行配置

    // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

    运行上面的程序,效果上面和之前差不多,

     

     

    手动提交 offset

    虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset API,关于手动提交offset,做如下几点说明:
    • 手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交);
    • 两者的相 同点是,都会将本次提交的一批数据最高的偏移量提交
    • 不同点是,同步提交阻塞当前线程,一直到提交成 功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故 有可能提交失败;

    commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据;

    commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了;

     

    同步提交 offset

    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
    交的效率比较低。
    下面看同步提交offset的consumer的完整代码:
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class SyncConsumer1 {
    
        public static void main(String[] args) {
            // 1. 创建 kafka 消费者配置类
            Properties properties = new Properties();
            // 2. 添加配置参数
            // 添加连接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
    
            // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group3");
            // 是否自动提交 offset
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    
            // 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    
            //3. 创建 kafka 消费者
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    
            // 是否自动提交 offset
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            //3. 创建 kafka 消费者
            //4. 设置消费主题 形参是列表
            consumer.subscribe(Arrays.asList("zcy234"));
            System.out.println("准备开始消费数据");
    
            //5. 消费数据
            while (true){
                // 读取消息
                ConsumerRecords<String, String> consumerRecords =
                        consumer.poll(Duration.ofSeconds(1));
                // 输出消息
                for (ConsumerRecord<String, String> consumerRecord :
                        consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
    
                // 同步提交 offset
                consumer.commitSync();
            }
    
        }
    
    }
    

    仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;

     

    异步提交 offset
    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
    下面是完整的代码
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class AsyncConsumer1 {
    
        public static void main(String[] args) {
            // 1. 创建 kafka 消费者配置类
            Properties properties = new Properties();
            // 2. 添加配置参数
            // 添加连接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
    
            // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // 配置消费者组
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group5");
            // 是否自动提交 offset
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            //3. 创建 Kafka 消费者
            KafkaConsumer<String, String> consumer = new
                    KafkaConsumer<>(properties);
            //4. 设置消费主题 形参是列表
            consumer.subscribe(Arrays.asList("zcy234"));
            //5. 消费数据
            while (true) {
                // 读取消息
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                // 输出消息
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
                // 异步提交 offset
                consumer.commitAsync();
            }
        }
    }

    仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;

     

    指定 Offset 消费

    kafka中消费者在消费数据时的offset的机制有3种,默认情况下为latest,即从最近的那一次的位置开始消费;

    auto.offset.reset = earliest | latest | none 默认是 latest

    Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
    时(例如该数据已被删除),该怎么办?
    1、 earliest :自动将偏移量重置为最早的偏移量, --from-beginning;
    2、latest (默认值) :自动将偏移量重置为最新偏移量;
    3、none :如果未找到消费者组的先前偏移量,则向消费者抛出异常;

     

    于是在实际业务中可能会遇到这么一种场景,即新的消费者并不想消费最早的那一批消息,而是指定从某个offset位置开始消费;

    下面看具体的consumer端代码:

    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.*;
    
    public class SpecialOffsetConsumer1 {
    
        public static void main(String[] args) {
            // 0 配置信息
            Properties properties = new Properties();
            // 连接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
            // key value 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group6");
    
            // 1 创建一个消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅一个主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("zcy234");
            kafkaConsumer.subscribe(topics);
            Set<TopicPartition> assignment = new HashSet<>();
    
            // 获取消费者分区分配信息(有了分区分配信息才能开始消费),避免开始消费的时候分区信息还未就绪
            while (assignment.size() == 0) {
                kafkaConsumer.poll(Duration.ofSeconds(1));
                assignment = kafkaConsumer.assignment();
            }
    
            // 遍历所有分区,并指定 offset 从 5 的位置开始消费
            for (TopicPartition tp : assignment) {
                kafkaConsumer.seek(tp, 5);
            }
    
            System.out.println("准备开始消费数据");
            // 3 消费该主题数据
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    
    }

    运行这段代码,然后再次使用上面的producer发送消息,观察控制台输出效果,可以看到,数据消费的offset的位置从5开始

     

    指定时间消费
    需求:在生产环境中,比如说遇到最近消费的某一段时间的数据有异常,想重新按照时间消费?或者要求按照时间消费前一天的数据,怎么处理?

    下面看具体的代码处理

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.*;
    
    public class SpecialTimeConsumer1 {
    
        public static void main(String[] args) {
            // 0 配置信息
            Properties properties = new Properties();
            // 连接
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "101.34.23.80:9092");
            // key value 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group7");
    
            // 创建一个消费者
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("zcy234");
            kafkaConsumer.subscribe(topics);
            Set<TopicPartition> assignment = new HashSet<>();
            while (assignment.size() == 0) {
                kafkaConsumer.poll(Duration.ofSeconds(1));
                // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
                assignment = kafkaConsumer.assignment();
            }
            HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
            // 封装集合存储,每个分区对应一天前的数据
            for (TopicPartition topicPartition : assignment) {
                //用当前时间减去业务上需要回退的时间,比如这里想重新消费24个小时之前的数据
                timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
            }
    
            // 获取从 1 天前开始消费的每个分区的 offset
            Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
    
            // 遍历每个分区,对每个分区设置消费时间。
            for (TopicPartition topicPartition : assignment) {
                OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
                // 根据时间指定开始消费的位置
                if (offsetAndTimestamp != null) {
                    kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
                }
            }
        }
    }

    展开全文
  • 指定offset/指定时间消费Kafka消息

    千次阅读 2021-10-12 19:42:56
    例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后...

    一、 指定offset重新消费Kafka消息

    首先创建kafka消费服务

    @Service
    @Slf4j
    //实现CommandLineRunner接口,在springboot启动时自动运行其run方法。
    public class TspLogbookAnalysisService implements CommandLineRunner {
        @Override
        public void run(String... args) {
            //do something
        }
    }
    

    kafka消费模型建立

    kafka server中每个主题存在多个分区(partition),每个分区自己维护一个偏移量(offset),我们的目标是实现kafka consumer指定offset消费。

    在这里使用consumer-->partition一对一的消费模型,每个consumer各自管理自己的partition。

    kafka consumer partition

    @Service
    @Slf4j
    public class TspLogbookAnalysisService implements CommandLineRunner {
        //声明kafka分区数相等的消费线程数,一个分区对应一个消费线程
        private  static final int consumeThreadNum = 9;
        //特殊指定每个分区开始消费的offset
        private List<Long> partitionOffsets = Lists.newArrayList(1111,1112,1113,1114,1115,1116,1117,1118,1119);
       
        private ExecutorService executorService = Executors.newFixedThreadPool(consumeThreadNum);
    
        @Override
        public void run(String... args) {
            //循环遍历创建消费线程
            IntStream.range(0, consumeThreadNum)
                    .forEach(partitionIndex -> executorService.submit(() -> startConsume(partitionIndex)));
        }
    }
    

    kafka consumer对offset的处理

    声明kafka consumer的配置类:

    private Properties buildKafkaConfig() {
        Properties kafkaConfiguration = new Properties();
        kafkaConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        kafkaConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "");
        kafkaConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
        kafkaConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");
        kafkaConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "");
        kafkaConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "");
        kafkaConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"");
        kafkaConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");
        ...更多配置项
    
        return kafkaConfiguration;
    }
    

    创建kafka consumer,处理offset,开始消费数据任务:

    private void startConsume(int partitionIndex) {
        //创建kafka consumer
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(buildKafkaConfig());
    
        try {
            //指定该consumer对应的消费分区
            TopicPartition partition = new TopicPartition(kafkaProperties.getKafkaTopic(), partitionIndex);
            consumer.assign(Lists.newArrayList(partition));
    
            //consumer的offset处理
            if (collectionUtils.isNotEmpty(partitionOffsets)  &&  partitionOffsets.size() == consumeThreadNum) {
                Long seekOffset = partitionOffsets.get(partitionIndex);
                log.info("partition:{} , offset seek from {}", partition, seekOffset);
                consumer.seek(partition, seekOffset);
            }
            
            //开始消费数据任务
            kafkaRecordConsume(consumer, partition);
        } catch (Exception e) {
            log.error("kafka consume error:{}", ExceptionUtils.getFullStackTrace(e));
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
    

    消费数据逻辑,offset操作

    private void kafkaRecordConsume(KafkaConsumer<String, byte[]> consumer, TopicPartition partition) {
        while (true) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(TspLogbookConstants.POLL_TIMEOUT);
                //具体的处理流程
                records.forEach((k) -> handleKafkaInput(k.key(), k.value()));
    
                //🌿很重要:日志记录当前consumer的offset,partition相关信息(之后如需重新指定offset消费就从这里的日志中获取offset,partition信息)
                if (records.count() > 0) {
                    String currentOffset = String.valueOf(consumer.position(partition));
                    log.info("current records size is:{}, partition is: {}, offset is:{}", records.count(), consumer.assignment(), currentOffset);
                }
        
                //offset提交        
                consumer.commitAsync();
            } catch (Exception e) {
                log.error("handlerKafkaInput error{}", ExceptionUtils.getFullStackTrace(e));
            }
        }
    }
    

    二、指定时间消费历史Kafka消息并统计数量

    package com.lxk.weber.utils;
    
    /**
     * @author lixk
     * @create 2021/10/12
     */
    
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.UUID;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    
    
    public class ConsumeOffsetByTime {
        public static void main(String[] args) {
            /**
             * ************使用说明*******************
             * 需要5个参数:
             * 1.Server
             * 2.Topic
             * 3.开始时间,格式yyyyMMddHHmmss
             * 4.结束时间,格式yyyyMMddHHmmss
             * 5.输出文件
             * 例:java -jar ConsumeOffsetByTime .jar [Server] [Topic] 20201128090600 20201128090659 output.txt
             */
            if (args.length < 5) {
                System.out.println("***********使用说明*****************");
                System.out.println("1.Server");
                System.out.println("2.Topic");
                System.out.println("3.开始时间,格式yyyyMMddHHmmss");
                System.out.println("4.结束时间,格式yyyyMMddHHmmss");
                System.out.println("5.输出文件");
                System.out.println("java -jar ConsumeOffsetByTime .jar [Server] [Topic] 20201128090600 20201128090659 output.txt");
                
            } else {
                try {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
                    long begintime = sdf.parse(args[2]).getTime();
                    long endtime = sdf.parse(args[3]).getTime();
                    consumerByTime(args[0], UUID.randomUUID().toString(), args[1],
                            begintime, endtime, args[4]);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        static void consumerByTime(String server, String groupid, String topic, long begintime, long endtime, String outfile) {
            Properties props = new Properties();
            props.put("bootstrap.servers", server);
            props.put("group.id", groupid);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
            BufferedWriter bw = null;
            try {
                boolean isBreak;
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
                List<TopicPartition> topicPartitions = new ArrayList<>();
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                long fetchDataTime = begintime;
                for (PartitionInfo partitionInfo : partitionInfos) {
                    topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                    timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), Long.valueOf(fetchDataTime));
                }
    
                // 分配一个指定topic的分区
                consumer.assign(topicPartitions);
                // 设置时间戳
                Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
                DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                OffsetAndTimestamp offsetTimestamp = null;
    
                // 设置各分区的起始偏移量
                Iterator<Map.Entry<TopicPartition, OffsetAndTimestamp>> iterator = map.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<TopicPartition, OffsetAndTimestamp> entry = iterator.next();
                    offsetTimestamp = entry.getValue();
                    if (offsetTimestamp != null) {
                        int partition = entry.getKey().partition();
                        long timestamp = offsetTimestamp.timestamp();
                        long offset = offsetTimestamp.offset();
                        System.out.println("partition = " + partition + ", time = " + df.format(new Date(timestamp)) + ", offset = " + offset);
                        consumer.seek(entry.getKey(), offset);
                    }
                }
    
                // 拉取数据
                do {
                    isBreak = false;
                    ConsumerRecords<String, String> records = consumer.poll(1000L);
                    bw = new BufferedWriter(new FileWriter(outfile, true));
                    System.out.println("拉取中: "+ records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        bw.write(String.valueOf(record.value()) + "\r\n");
    
                        // 设置截止时间
                        if (record.timestamp() > endtime) {
                            isBreak = true;
                        }
                    }
                    bw.flush();
                    bw.close();
                } while (!isBreak);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
    
    

    展开全文
  • kafka 调整offset

    千次阅读 2021-11-29 14:39:37
    和业务部门协商确定: 可以丢弃中间未消费的数据,将指定offset(指定消费组消费topic)调整到最新值; 【kafka-消费堆积】 告警时间: 2020.04.02 10:52:22 主机名称: slhd1.op-paas.kafka001 [主题] : heartbeat_sync...
  • 消费者提交位移(offset),是消费者往一个名为`_consumer_offset`的特殊主题发送消息,消息中包含每个分区的位移量。它记录了 Consumer 要消费的下一条消息的位移。**切记是下一条消息的位移**,而不是目前最新消费...
  • } // 从最小的offset开始消费 case CONSUME_FROM_FIRST_OFFSET: { // 但是这里还是先从broker中读取offset,如果读取成功,就返回读取的offset,此时CONSUME_FROM_FIRST_OFFSET 无效 long lastOffset = offsetStore...
  • kafka不同分区重置offset

    千次阅读 2022-02-08 20:02:49
    1、因代码问题导致消费者在使用过程中逻辑不是幂等的,造成数据库插入数据,单kafka不提交offset,当再次启动kafka时,存在重复消费的情况,造成数据库流入脏数据 2、在修复数据的过程中,因kafka的offset错误重置,...
  • Kafka偏移量(Offset)管理

    千次阅读 2020-03-12 19:29:20
    1.定义 Kafka中的每个partition都由一系列有序的、不...Offset记录着下一条将要发送给Consumer的消息的序号。 流处理系统常见的三种语义: 最多一次 每个记录要么处理一次,要么根本不处理 至少一次 这...
  • 1、Kafka Offset 管理–Checkpoint 启用Spark Streaming的checkpoint是存储偏移量最简单的方法。 流式checkpoint专门用于保存应用程序的状态, 比如保存在HDFS上, 在故障时能恢复。 Spark Streaming的checkpoint...
  • kafka手动提交offset以及offset回滚

    千次阅读 2022-01-10 18:50:06
    当 auto.commit.enable 设置为false时,表示kafak的offset由消费者手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式; ackMode有以下7种值: public enum AckMode // 当每一条记录被消费者...
  • 什么叫偏移地址偏移地址就是计算机里的内存分段后,在段...关于偏移量(Offset)的概念,不仅仅只是在 C 语言中可以体现,实际上再各种编程语言中都是可以体现出来的。 通俗地讲,偏移(Offset)就是指从当前位置为起点...
  • offset在C语言中使用是什么意思啊??请教 Excel VBA Range.offset 的用法Range.Offset 属性 返回 Range 对象,它代表位于指定单元格区域的一定的偏移量位置上的区域。 语法 表达式.Offset(RowOffset, ColumnOffset) ...
  • 获取Kafka Consumer的offset

    千次阅读 2021-03-25 10:36:04
    从kafka的0.8.11版本开始,它会将consumer的offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition的数量)的很多的时候,ZooKeeper的适应性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的...
  • kafka中offset使用原理

    千次阅读 2020-05-24 21:07:47
    在使用kafka时,从消费端来说,基本上大家在使用的时候,一般是通过一个消息监听器监听具体的topic以及对应的partition,接收消息即可,但有必要深入了解一下关于kafka的offset原理 kafka在设计上和其他的消息中间...
  • TypeError: can't subtract offset-naive and offset-aware datetimes 原来是两个相减的时间时区不一致 # -*- coding: utf-8 -*- from datetime import datetime import pytz now1 = datetime.now(tz=pytz.UTC) ...
  • Flink消费kafka的offset设置

    千次阅读 2022-04-19 17:44:47
    在使用Flink自带的Kafka消费API时,我们可以像单纯的使用Kafka消费对象API对其进行相应的属性设置,例如,读取offset的方式、设置offset的方式等。但是,Flink具有checkpoint功能,保存各运算算子的状态,也包括消费...
  • offset offset 即偏移量,使用 offset 系列相关属性可以 动态的 获取该元素的位置(偏移)、大小等,如: 元素距离带有定位父元素的位置 获取元素自身的大小(宽度高度) 注:返回的数值不带单位 offset 系列...
  • 或许大家都知道,消费者端手动提交offset嘛。那么具体代码该怎么写呢?本文就基于springboot来进行消费者手动提交offset的试验。 配置 application.yml spring: kafka: # 指定 kafka 地址可以多个 bootstrap-...
  • kafka 自定义存储offset 到mysql中

    千次阅读 2019-11-22 15:01:32
    kafka0.9版本之前,offset存储在zookeeper,0.9版本以及之后,默认offset存储在kafka的一个内置的topic中。除此之外,kafka还可以选择自定义存储offsetoffset的维护是相当繁琐的,因为需要考虑到消费者的...
  • 一篇学会使用 Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试
  • Kafka消费者组和offset

    千次阅读 2022-02-22 15:46:28
    同时组成消费者组在消费消息时,对应的offset文件是如何变化的?本文的生产消费模型图如下,一个生产者生产,两个消费者消费: 1、启动zookeeper和kafka,并创建一个名为topicC的topic,不设备份,partition为3 ...
  • kafka offset入门理解

    千次阅读 2020-12-02 14:10:16
    offset是什么? 一个数字,记录了消费位置 offset有什么用? 消费者在消费数据时,发生宕机后,再次重新启动后,消费的数据需要从宕机位置开始读取 如果从头读取,有一部分消息一定出现了重复消费 如果从宕机时的消费位置...
  • GROUP:当前消费者组,通过group.id指定的值 TOPIC:当前消费的topic PARTITION:消费的分区 CURRENT-OFFSET:消费者消费到这个分区的offset LOG-END-OFFSET:当前分区中数据的最大offset LAG:当前分区未消费数据...
  • 对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。 单词“offset...
  • Spark整合Kafka并手动维护offset

    千次阅读 2019-09-04 14:30:19
    //维护offset:为了方便我们对offset的维护/管理,spark提供了一个类,帮我们封装offset的数据 val offsetRanges : Array [ OffsetRange ] = rdd . asInstanceOf [ HasOffsetRanges ] . offsetRanges for ( ...
  • 问题描述 在比较 <class 'datetime.datetime'> 类型时,抛出异常 原因 俩个做比较的,一个具有时区,一个不具有时区 解决 如果可以确认俩个时间都是本地时间可以将时区去除掉: python data = data.replace...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 768,124
精华内容 307,249
关键字:

offset