精华内容
下载资源
问答
  • kafka消费
    千次阅读
    2022-04-18 11:23:25

    什么是kafka消费者组

      kafka消费者组(Consumer Group)是kafka提供的可扩展且具有容错性的消费者机制。
      它是一个组,所以内部有可以有多个消费者,这些消费者共用一个ID(Group ID),一个组内的所有消费者共同协作,完成对订阅的主题的所有分区进行消费。其中一个主题中的一个分区只能由一个消费者消费。

    消费者组的特性

    1. 一个消费者组可以有多个消费者。
    2. Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。
    3. 每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费。消费者组之间不影响。

    为什么出现消费者组

      我们知道的消息引擎模型有:点对点模型和发布/订阅模型。传统的消息引擎就是这两大类。这两大类消息引擎,都有各自适合的应用场景,也都有不适应的场景。
      点对点的模型,每消费一个消息之后,被消费的消息就会被删除。如果我们需要多个消费者消费同一个消息队列时,就不能使用点对点模型了。
      发布订阅模型,支持多个消费者消费同一个消息队列,但是发布订阅模型中,消费者订阅了一个主题后,就要订阅主题的所有分区。这总方式既不灵活,也会影响消息的真是投递效果。
      消费者组就避开了上述两种模型的缺陷,有兼容了他们的优点。
      首先消费者之间彼此独立,互不影响。可以订阅同一个主题并且互不干扰。再加上Broker端的消息留存机制,kafka的消费者组就完美的解决了上面的问题。kafka使用一种消费者组(Consumer Group)机制,就同时实现了传统消息引擎系统的两大模型:如果所有的消费者实例都属于一个消费者组那就是点对点模型,如果所有消费者实例各自是独立的消费者那就是发布订阅模型。
      因为上面消费组的第三个特性。所以消费者组的消费者实例数最好等于该消费者组订阅的主题中的分区数。如果实例数量多于分区数,那多余的实例将永远不会工作,除非有其他实例挂掉。

    针对Consumer Group,Kafka如何管理位移(offset)?

      这个问题需要区分新老版本。首先他们的存储方式都是使用类似于map的KV对实现的存储。key是分区,v对应Consumer消费该分区的最新位移。我们可以这样理解,但是实际的存储要比这个复杂的多。
      新老版本的区别在于位移存储的位置:老版本是将位移存放到zookeeper中,而zookeeper是一个分布式的协调服务框架,kafka重度依赖它实现的各种各样的协调管理。将位移存到zookeeper中的做法,显而易见的好处是减少了kafka broker端的状态保存开销。可以自由的扩缩容,实现超强的伸缩性。
      但是由于zookeeper这类元框架其实并不适合进行频繁的写更新。而Consumer Group的位移更新却是一个非常频繁的更新操作。所以并不是很适合将位移存在zookeeper中。
      于是新版本将位移保存在kafka内部主题中。就是:_consumer_offsets。

    kafka的Rebalance(重平衡)

    定义

      Rebalance本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区。例如:某个Group下有20个Consumer实例,它订阅了一个有100个分区的Topic。正常情况下,kafka会给每个实例分配5个分区。这个分配的过程叫做Rebalance。
      rebalance发生时,Group下的所有Consumer实例都会协调在一起共同参与。具体怎么分配,是有分配策略协助的。分配策略以后再总结。

    触发条件

    1. 组成员数发生变化。比如有实例进入或者离开组。
    2. 订阅的主题数发生变更。
    3. 订阅主题的分区数发生变更。

    问题

      rebalance有一个比较大的问题。那就是再Rebalance过程中,所有的实例都会停止消费,等Rebalance完成。这就导致Rebalance过程中无法提供服务。而且,Rebalance的过程还很慢。所以我们要尽量避免Rebalance的发生。

    更多相关内容
  • Kafka快速入门(Kafka消费者)

    千次阅读 2022-05-01 19:59:21
    Kafka 消费者 1. Kafka 消费方式 2 Kafka 消费者工作流程 2.1 消费者总体工作流程 2.2 消费者组原理 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 ...

    Kafka 消费者

    1. Kafka 消费方式

    在这里插入图片描述

    2 Kafka 消费者工作流程

    2.1 消费者总体工作流程

    在这里插入图片描述

    2.2 消费者组原理

    Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

    消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

    消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

    在这里插入图片描述

    在这里插入图片描述

    消费者组初始化流程

    1、coordinator:辅助实现消费者组的初始化和分区的分配。

    ​ coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量)

    ​ 例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator

    作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

    在这里插入图片描述

    消费者组详细消费流程

    在这里插入图片描述

    2.3 消费者重要参数

    -参数名称-描述
    bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。
    key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
    group.id标记消费者所属的消费者组。
    enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
    auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s
    auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?earliest:自动重置偏移量到最早的偏移量。latest:默认,自动重置偏移量为最新的偏移量。none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。anything:向消费者抛异常。
    offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。
    heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms的 1/3。
    session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
    max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
    fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
    fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
    fetch.max.bytes默认Default:52428800(50m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)ormax.message.bytes(topicconfig)影响。
    max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条

    3. 消费者 API

    3.1 独立消费者案例(订阅主题)

    1)需求:

    创建一个独立消费者,消费 first 主题中数据。

    在这里插入图片描述

    注意在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。

    2)实现步骤

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer {
         public static void main(String[] args) {
             // 1.创建消费者的配置对象
             Properties properties = new Properties();
             // 2.给消费者配置对象添加参数
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "hadoop102:9092");
             // 配置序列化 必须
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
    
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            StringDeserializer.class.getName());
             // 配置消费者组(组名任意起名) 必须
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
             // 创建消费者对象
             KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
             // 注册要消费的主题(可以消费多个主题)
             ArrayList<String> topics = new ArrayList<>();
             topics.add("first");
             kafkaConsumer.subscribe(topics);
             // 拉取数据打印
             while (true) {
                 // 设置 1s 中消费一批数据
                 ConsumerRecords<String, String> consumerRecords = 
                 kafkaConsumer.poll(Duration.ofSeconds(1));
                 // 打印消费到的数据
                 for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {
                  System.out.println(consumerRecord);
              }
         }
     }
    }
    

    3)测试

    在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据

    bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    >hello
    

    在 IDEA 控制台观察接收到的数据。

    3.2 独立消费者案例(订阅分区)

    1)需求:创建一个独立消费者,消费 first 主题 0 号分区的数据。

    在这里插入图片描述

    2)实现步骤

    // 消费某个主题的某个分区数据
     ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
     topicPartitions.add(new TopicPartition("first", 0));
     kafkaConsumer.assign(topicPartitions);
    

    3.3 消费者组案例

    1)需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

    在这里插入图片描述

    2)案例实操

    (1)复制一份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的两个消费者。

    (2)启动代码中的生产者发送消息,在 IDEA 控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);)。

    (3)重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能有一个消费者消费到数据

    4 分区的分配以及再平衡

    1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

    2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
    可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

    在这里插入图片描述

    -参数名称-描述
    heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。
    session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
    max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
    partition.assignment.strategy消费者分区分配策略,默认策略是Range +CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range 、RoundRobin 、 Sticky 、CooperativeSticky

    4.1 Range 以及再平衡

    1Range分区策略原理

    在这里插入图片描述

    2Range分区分配策略案例

    ​ (1)修改主题 first 为 7 个分区。

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7
    

    注意:分区数可以增加,但是不能减少。

    ​ (2)复制 CustomConsumer 类,创建 CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动 3 个消费者。

    ​ (3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

    说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

    ​ (4)观看 3 个消费者分别消费哪些分区的数据。

    3Range分区分配再平衡案例

    (1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

    ​ 1 号消费者:消费到 3、4 号分区数据。

    ​ 2 号消费者:消费到 5、6 号分区数据。

    ​ 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。

    说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

    (2)再次重新发送消息观看结果(45s 以后)。

    ​ 1 号消费者:消费到 0、1、2、3 号分区数据。

    ​ 2 号消费者:消费到 4、5、6 号分区数据。

    说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

    4.2 RoundRobin 以及再平衡

    1RoundRobin分区策略原理

    在这里插入图片描述

    2RoundRobin分区分配策略案例

    (1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代

    码中修改分区分配策略为 RoundRobin

    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
    

    (2)重启 3 个消费者,重复发送消息的步骤,观看分区结果。

    3RoundRobin分区分配再平衡案例

    (1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

    ​ 1 号消费者:消费到 2、5 号分区数据

    ​ 2 号消费者:消费到 4、1 号分区数据

    ​ 0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费

    说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

    (2)再次重新发送消息观看结果(45s 以后)。

    ​ 1 号消费者:消费到 0、2、4、6 号分区数据

    ​ 2 号消费者:消费到 1、3、5 号分区数据

    说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

    4.3 Sticky 以及再平衡

    粘性分区定义可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

    粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

    1)需求

    ​ 设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

    2)步骤

    ​ (1)修改分区分配策略为粘性。

    注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

    // 修改分区分配策略
    ArrayList<String> startegys = new ArrayList<>();
    startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
    

    ​ (2)使用同样的生产者发送 500 条消息

    ​ 可以看到会尽量保持分区的个数近似划分分区。

    3Sticky分区分配再平衡案例

    ​ (1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

    ​ 1 号消费者:消费到 2、5、3 号分区数据。

    ​ 2 号消费者:消费到 4、6 号分区数据。

    0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。

    说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

    ​ (2)再次重新发送消息观看结果(45s 以后)。

    ​ 1 号消费者:消费到 2、3、5 号分区数据。

    ​ 2 号消费者:消费到 0、1、4、6 号分区数据。

    说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

    5. offset 位移

    5.1 offset 的默认维护位置

    ​ __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

    1)消费 offset 案例

    (0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

    (1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

    (2)采用命令行方式,创建一个新的 topic。

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2
    

    (3)启动生产者往 atguigu 生产数据。

    bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
    

    (4)启动消费者消费 atguigu 数据。

    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic atguigu --group test
    

    注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

    (5)查看消费者消费主题__consumer_offsets。

    bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter 
    "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
    
    [offset,atguigu,1]::OffsetAndMetadata(offset=7, 
    leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
    expireTimestamp=None)
    [offset,atguigu,0]::OffsetAndMetadata(offset=8, 
    leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
    expireTimestamp=None)
    

    5.2 自动提交 offset

    为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。5s

    自动提交offset的相关参数:

    • enable.auto.commit:是否开启自动提交offset功能,默认是true

    • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

    在这里插入图片描述

    -参数名称-描述
    enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
    auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s

    1)消费者自动提交 offset

     // 是否自动提交 offset
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
     // 提交 offset 的时间周期 1000ms,默认 5s
     properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000)
    

    5.3 手动提交 offset

    ​ 虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因

    此Kafka还提供了手动提交offset的API。

    ​ 手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)。两者的相

    同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成

    功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败

    commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。

    commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

    在这里插入图片描述

    1)同步提交 offset

    ​ 由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提

    交的效率比较低。以下为同步提交 offset 的示例。

    // 是否自动提交 offset
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
    //消费
    
    // 同步提交 offset
    consumer.commitSync();
    

    2)异步提交offset

    ​ 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

    // 是否自动提交 offset
     properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    //消费
    
    // 异步提交 offset
     consumer.commitAsync();
    

    5.4 指定 Offset 消费

    auto.offset.reset = earliest | latest | none 默认是 latest。

    ​ 当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

    (1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning

    (2)latest(默认值):自动将偏移量重置为最新偏移量。

    (3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

    在这里插入图片描述

    (4)任意指定 offset 位移开始消费

    // 1 创建一个消费者
    // 2 订阅一个主题
    
    //指定offset
    Set<TopicPartition> assignment= new HashSet<>();
     while (assignment.size() == 0) {
         kafkaConsumer.poll(Duration.ofSeconds(1));
         // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
         assignment = kafkaConsumer.assignment();
     }
     // 遍历所有分区,并指定 offset 从 1700 的位置开始消费
     for (TopicPartition tp: assignment) {
     	kafkaConsumer.seek(tp, 1700);
     }
    

    注意:每次执行完,需要修改消费者组名;

    5.5 指定时间消费

    在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

    //1 创建一个消费者
    // 2 订阅一个主题
    ....
        
    Set<TopicPartition> assignment = new HashSet<>();
     while (assignment.size() == 0) {
     kafkaConsumer.poll(Duration.ofSeconds(1));
         // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
         assignment = kafkaConsumer.assignment();
     }
     HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
     // 封装集合存储,每个分区对应一天前的数据
     for (TopicPartition topicPartition : assignment) {
         timestampToSearch.put(topicPartition, 
         System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
     }
     // 获取从 1 天前开始消费的每个分区的 offset
     Map<TopicPartition, OffsetAndTimestamp> offsets = 
     kafkaConsumer.offsetsForTimes(timestampToSearch);
     // 遍历每个分区,对每个分区设置消费时间。
     for (TopicPartition topicPartition : assignment) {
         OffsetAndTimestamp offsetAndTimestamp = 
         offsets.get(topicPartition);
         // 根据时间指定开始消费的位置
         if (offsetAndTimestamp != null){
            kafkaConsumer.seek(topicPartition, 
            offsetAndTimestamp.offset());
          }
     }
    

    5.6 漏消费和重复消费

    重复消费已经消费了数据,但是 offset 没提交。

    漏消费先提交 offset 后消费,有可能会造成数据的漏消费。

    在这里插入图片描述

    怎么能做到既不漏消费也不重复消费呢?

    6. 消费者事务

    如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。

    在这里插入图片描述

    7 数据积压(消费者如何提高吞吐量)

    1)如果是Kafka消费能力不足,则可以考虑增 加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)

    在这里插入图片描述

    2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

    在这里插入图片描述

    -参数名称-描述
    fetch.max.bytes默认Default:52428800(50m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)ormax.message.bytes(topicconfig)影响。
    max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条

    笔记来自b站尚硅谷

    展开全文
  • 这篇博客呢,就跟大家一起聊一下 kafka 消费者如何消费的?如何避免重复消费消费流程:一般我们消费测试是不会变的,都使用默认的,也就是第一种,range策略。默认策略,保证基本是均衡的。 计算公式 : n = 分区...

    一、前言

    前面博客小编向大家分享了 kafka如何保证消息不丢失?,基本是从producer和broker来分析的,producer要支持重试和acks,producer要做好副本和及时刷盘落地。

    这篇博客呢,就跟大家一起聊一下 kafka 消费者如何消费的?如何避免重复消费?

    二、消费者消费流程

    消费流程:

    1. 从zk获取要消费的partition 的leader的位置 以及 offset位置
    2. 拉数据,这里拉数据是直接从broker的pagecash拉取,零拷贝 ,所以很快。
    3. 如果pagecash数据不全,就会从磁盘中拉取,并发送
    4. 消费完成后,可以手动提交offset,也可以自动提交offset。
      在这里插入图片描述

    消费策略有哪些?如何配置

    一般我们消费测试是不会变的,都使用默认的,也就是第一种,range策略。

    • Range 范围分配策略(默认)

    默认策略,保证基本是均衡的。
    计算公式 :
    n = 分区数/消费者数
    m = 分区数%消费者数
    前m个消费者,消费n+1个,剩余的消费n个
    在这里插入图片描述
    在这里插入图片描述
    eg:12个partition,9个消费者
    12/9 = 1
    12%9 = 3
    前3台 消费2个partition,后6台各消费1个partition。

    • RoundRobin 轮询

    先根据topic 和 topic的partition的hashcode进行一个排序,然后以轮询的方式分配给各个消费者。

    在这里插入图片描述

    • stricky粘性分配策略

    在没有reblence的时候和轮询策略一样
    当发生rebalence的时候,尽可能的保证与上一次分配一致

    比如默认是
    在这里插入图片描述
    比如consumer2 挂了,topicA p1 和topicB p2就没有消费者了,这个时候要进行消费组的rebalence。
    在这里插入图片描述

    然后按照轮询策略分配一下。
    在这里插入图片描述

    可以在配置消费配置的时候,指定消费策略:

    //Range
    propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RangeAssignor.class);
    
    //RoundRobin
    propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RoundRobinAssignor.class);
    
    //stricky
    propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);
    
    

    什么是零拷贝?

    普通把文件发送到远程服务器的方法:
    在这里插入图片描述
    1.读磁盘内容,拷贝到内核缓冲区
    2.cpu把内核缓冲区数据拷贝到用户空间缓冲区
    3.调用write(),把用户空间缓冲区数据拷贝到内核的Socket Buffer中
    4.把sb中的数据拷贝到网卡缓冲区 NIC Buffer ,网卡在传输

    从上面的流程看, 1和3 其实是多余的,用户和内核相互转换,会带来cpu上下文切换,对cpu性能有影响。

    零拷贝 就是对这两次的拷贝忽略掉,应用程序可以直接把磁盘中的数据从内核中,直接传输到socket,不用互相拷贝。其中用到了Direct Memory Access 技术,可以把数据直接从内核空间传递到网卡设备,kafka中把数据直接从磁盘复制到 pagecash,给消费者读取,如图:

    在这里插入图片描述
    在这里插入图片描述
    零拷贝其实不是没有拷贝,只是减少了不必要的拷贝次数,比如内核到用户空间的拷贝。
    linux 中使用sendfile()实现零拷贝
    java中nio用到零拷贝,比如filechannel.transferTo()。

    mmap 文件映射机制:把磁盘文件映射到内存,用户通过修改内存,就可以修改磁盘文件。提高io效率,减少了复制开销。

    三、如何避免重复消费?

    分析原因:

    1.生产者重复提交
    2.rebalence引起重复消费

    超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

    解决方案:

    1.提高消费速度

    • 增加消费者
    • 多线程消费
    • 异步消费
    • 调整消费处理时间

    2.幂等处理

    • 消费者设置幂等校验

    • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能。

    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    

    四、如何顺序消费?

    我们知道kafka,整个topic有多个partition,每个partition内的消息是有顺序的。

    五、如何延迟消费?

    kafka是无状态的,没有延迟的功能。pulsar和rabbitmq实现更加方便。
    在这里插入图片描述
    开发延迟推送服务,定时检索延迟消息,发送给kafka。

    六、频繁rebanlence怎么解决?

    再均衡,保证所有消费者相对均衡消费。rebalence的时候,所有消费者,停止消费,直到rebanlence完成。

    触发时机:
    1.consumer个数变化
    2.订阅topic个数变化
    3.订阅的topic的partition变化

    解决方案:

    使用消息队列Kafka版时消费客户端频繁出现Rebalance

    频繁出现rebalence,可能是消费者的消费时间过长,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance。

    1.参数调整:
    session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
    max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。
    max.poll.interval.ms: 该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

    2.尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
    3.减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。

    附:批量消费代码

    import com.ctrip.framework.apollo.ConfigService;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class BehaviorConsumerConfig {
    
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
                propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);
    
            propsMap.put("security.protocol", protocol);
            propsMap.put("ssl.truststore.location", truststoreLocation.replaceAll("file://", ""));
            propsMap.put("ssl.truststore.password", truststorePassword);
            propsMap.put("login.config.location", loginConfigLocation);
            propsMap.put("sasl.mechanism", mechanism);
            return propsMap;
        }
    
        @Bean("batchContainerFactory")
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    
            // 并发创建的消费者数量
            factory.setConcurrency(4);
            factory.getContainerProperties().setPollTimeout(3000);
    
            //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
            factory.setBatchListener(true);
            return factory;
        }
    }
    

    七、小结

    本篇我们基本上把消费者的消费梳理干净了,以及消费会遇到的 重复消费,顺序消费,延迟消费等问题都也解释了给出了解决方案。方案一通百通。

    展开全文
  • kafka 消费机制

    千次阅读 2021-10-06 09:26:21
    一、本文介绍了kafka的基础概念:topic、partition、broker、consumer、consumer group和producer。 Topic 一个Topic代表了一类资源,一种事件。比如用户上传的数据可以是一个topic,系统产生的事件也可以是一个...

    一、本文介绍了kafka的基础概念:topic、partition、broker、consumer、consumer group和producer。

    1. Topic
      一个Topic代表了一类资源,一种事件。比如用户上传的数据可以是一个topic,系统产生的事件也可以是一个topic

    2. Broker
      一个broker代表一个kafka实例,通常建议一台物理机配置一个kafka实例,因为配置多个磁盘的IO限制也注定了性能不会提升太多

    3. Partition
      一个Topic可以创建多个Partition,一个partition就是一个存储kafka数据的文件(称为log),每个partition内部,消息是顺序排列的。

    由于Partition是顺序写磁盘,不需要关心锁的问题,保证了Kafka的高吞吐量;每个partition内的数据是有序的。多个partition可以解决磁盘IO的性能限制,同时,也可以通过指定数据发送给kafka时的key,key被用来决定数据放到哪个partition,这样就消费数据时,某个partition的数据就是按照某个规则有序的了。

    Partition和broker没有必然联系(可以参考Partition分配到broker的策略),但是一个partition只能位于一个broker上。

    1. Consumer
      消费者,消费Kafka中存储的数据,一个consumer可以消费多个partition(同topic/不同topic)

    2. Consumer Group
      消费者组。一个消费者组可以有多个消费者,当使用高级消费者时,只需要指定订阅(subscribe)的topic;使用低级消费时,则需要指定consumer分配(assign)的topic和partition,,指定哪个partition就消费哪个,没有限制。
      一个partition只能被同一个消费者组的一个订阅的consumer(高级)消费,但是可以被不同消费者组的多个consumer消费(订阅/分配均可),或者被同一个组的一个订阅的consumer和任意个分配的consumer(低级)消费;这个限制是由client端实现的,详情参见本系列第二篇文章。

    3. Offset
      offset记录了某个consumer group在一个partition中消费数据的位置。由partition和group唯一确定。

    4. Producer
      生产者,生产数据到Kafka

    示意图如下:

    topic a:
    p1 被groupMconsumerA 消费
    p2 被groupMconsumerB 消费
    实现了一个topic被两个同group独立的consumer消费,提升消费速度。

    topic b:
    p1 被groupMconsumerB和groupNconsumerC同时消费
    p2 被groupMconsumerB和groupNconsumerC同时消费
    实现了多播的效果,所有到topicb的数据,consumerB和C同时接收到
    在这里插入图片描述
    【参考链接】

    Kafka背景及架构介绍
    KafkaPartition与Broker的映射关系

    二、本文主要介绍Kafka中的topic、partition、offset的概念,和kafka java使用consumer时,高级消费和低级消费不同场景下的区别,通过本文,大致能够了解kafka是怎么保证至少消费一次,以及什么情况下会出现重复消费和丢失数据。

    Offset
    kafka高吞吐量的保证是Partition是顺序写磁盘,同样消费也是顺序的,offset维护了一个group的消费者在当前partition消费的数据位置。

    当一个consumer启动后,会查询服务端的offset作为本地offset;
    运行中poll数据使用的是本地offset,不再查询server;
    每poll完一批数据,自动更新本地offset
    server端也会维护一个offset,新版kafka offset是维护在一个topic中,旧版维护在zookeeper
    提交offset是指:使用本地的offset/指定的offset 去更新server端的offset,但是本地offset不会改变

    自动提交
    自动提交策略下,是每隔指定时间,由kafka-clients自动提交本地维护的offset,默认本地offset=poll的数量+1。(本地offset可以通过seek方法修改)
    但是会出现数据丢失的情况,比如poll了一批数据没有处理完,但是到时间了已经提交了offset,然后程序终止了,下次启动会从新的offset’启动,没有处理的数据丢失了

    手动提交

    1. 不指定offset:同上,也是提交本地维护的offset,默认本地offset=poll的数量+1。
      这种模式下,数据处理完毕(保存/丢弃)后再手动提交,解决了自动模式下的数据丢失问题,但是可能存在消费完的数据,offset没有提交成功,重复消费数据的问题(可以通过数据库事务解决)
    2. 指定offset:更新server端offset为指定值,但是本地offset不会更新,所以在consumer没有重启的情况下,是不会消费到重复数据。

    Consumer消费数据
    一个大致的流程如下
    在这里插入图片描述
    消费有两种指定topic的方式:subscribe和assign,两种方式主要区别在于partition的分配,前者是由kafka-clients分配的(高级消费),而后者是我们手动指定的(低级消费)。

    注意:Consumer线程不安全,不能多线程共用

    高级消费
    API
    对应于KafkaConsumer.subscribe()方法。可以接受的参数为

    1subscribe(java.util.Collection<java.lang.String> topics)
    2subscribe(java.util.Collection<java.lang.String> topics,ConsumerRebalanceListener listener)
    3subscribe(java.util.regex.Pattern pattern)
    4subscribe(java.util.regex.Pattern pattern,ConsumerRebalanceListener listener)

    所以,其只接受订阅某个topic,而不能具体指定partition。

    介绍
    使用高级消费时,假定,1-N个consumer,属于同一个group。根据订阅的consumer的个数,由kafka-clinets根据指定的分配策略分配每个consumer消费的partition。注意:必须使用合理的分配策略,否则可能出现一些consumer没有分配partition的情况。

    若N>partition num(所有topic的partition总和), 则一些consumer不会被分配partition
    若N< partition num,则某些consumer会消费多个partition
    当消费多个partition时,消费每个分区内的消息是有序的,但消费多个分区之间的消息是无序的(可以在消费记录中获得当前记录的partition)

    partition分配策略

    1. range: 得到topic-partitions关系,得到topic-consumers关系,然后,按照topic进行分配,即topic的所有partition按顺序分配到其所有的consumer上,举例:topicA-3partition, topicB-1partition, 4 consumers, 过程是,A的3个partition分配到consumer1-3,B的1个partition分配到consumer1,consumer4空闲,所以使用的最大线程数=max(topic*partition)

    2. roundrobin:topics和patition组合,上述例子,就是ta-0,ta-1,ta-2,tb-0,然后四个取hashcode得到顺序,然后挨个分配到consumer上(要求:每一个consumer消费的topics有相同的streams&&这个消费组中每个consumer消费的topics必须完全相同)

    上面的文字可能有描述不准确或不清楚的地方,这里列出了官方对着两种策略的解释:

    RoundRobin: The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.) (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance and thread-id within that instance. Therefore, round-robin assignment is allowed only if: a) Every topic has the same number of streams within a consumer instance b) The set of subscribed topics is identical for every consumer instance within the group.

    Range: Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0,p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 ->C2-0, p4 -> C2-1

    reblance
    订阅模式下,每加入或者离开一个consumer,都会触发consumer reblance,重新为每个消费者分配partition。

    reblance的过程发生了什么?查看kafka-clients源码可以发现:
    AbstractCoordinator 有详细说明调用subscribe方法发生了以下的事请

    consumer注册到到服务端
    coordinator(server端维护的一个服务)查找所有的该组consumer,选取leader
    如果auto commit为true,所有的consumer提交本地offset到服务端;为false则不提交
    leader通过coordinator获取服务端所有的partition和offset,并使用策略重新分配partition,结果返回给coordinator,coordinator下发分配结果到所有consumer(即join和leave的reblance)。
    所以高级消费者集群时,新加入的consumer,如果是auto-commit则会提交offset,若未处理完可能会丢失数据;否则不提交,会重复消费数据。离开consumer,若未提交offset离开,则会重复消费数据;若自动提交了但是未消费,则会丢失数据。

    低级消费
    API
    对应于KafkaConsumer.assign()方法,指定TopicPartition的集合

    1assign(java.util.Collection partitions)

    介绍
    使用低级消费时,直接指定consumer消费某个topic的某个partition,不再由kafka-clients分配,这种情况下,第一篇文章中已经提到,是可以多个同组消费者消费同一个partition的。

    所以当同一个消费组指定重复的partition时,会消费到重复的数据(完全重复的数据,因为poll的offset是本地维护的),但是server端只有一个offset!server的offset被两个consumer更新,会出现冲突和错乱,这种模式下,需要开发者自己保证同一个消费组的消费着具有不重复的partition。

    高级or低级?
    如何抉择,主要取决于复杂性和数据一致性的取舍,即reblance带来的影响和手动分配带来的复杂的取舍。

    数据丢失/重复消费
    高级消费partition的分配是由kafka-clients完成的,但是会查询server端的信息,所以集群环境下,当没有指定partition时,每加入/离开一个消费者,kafka-clients都会重新平衡partition的分配,这个时候,如果有消费完成但是没有提交的offset,reblance时则会造成数据的重复消费或者数据丢失(具体是哪种情况,要看offset的提交策略)。低级消费则不会发生reblance!

    注意:Spring-kafka多线程消费的配置下,指定topic和partition时,也是低级消费,其线程和partition的分配策略见后续spring-kafka的教程。

    reblance影响性能
    每次reblance都要重新分配,如果partition比较多的情况下,重新分配将会消耗大量的时间。

    低级消费时的高可用
    如果使用低级消费,当一个consumer退出时,其partition不会再分配给其他消费者,数据将会堆积在kafka中!所以务必要保证退出的消费者能重新运行。
    【参考链接】
    1.Kafka-偏移量提交
    2.更详细的reblance过程

    本文主要介绍kafka原生api的使用,关于kafka apache官方的文档页面只有简单的说明,不过所有的使用说明都在apache kafka java doc文档页面,每个类的文档都有详细的使用说明,源码中也有详细的注释。
    原生api的使用比较简单,直接创建Consumer或者Producer对象即可,注意:由于Consumer线程不安全,不得多线程公用,且最好使用final变量

    更多参考api docs

    自动提交

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
    

    手动提交

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
     buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
     insertIntoDb(buffer);
     consumer.commitSync();
     buffer.clear();
     }
     }
    

    多线程

    public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
    
     public void run() {
     try {
     consumer.subscribe(Arrays.asList("topic"));
     while (!closed.get()) {
     ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
     // Handle new records
     }
     } catch (WakeupException e) {
     // Ignore exception if closing
     if (!closed.get()) throw e;
     } finally {
     consumer.close();
     }
     }
    
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
     closed.set(true);
     consumer.wakeup();
     }
     }
    

    低级消费

    String topic = "foo";
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));
    
    展开全文
  • kafka消费模式

    千次阅读 2022-03-21 11:00:10
    kafka消费模式
  • 聊聊Kafka(三)Kafka消费者与消费组

    千次阅读 2021-01-07 19:30:35
    Kafka消费者与消费组简介消费者概念入门消费者、消费组心跳机制消息接收必要参数配置订阅反序列化位移提交消费者位移管理再均衡避免重平衡消费者拦截器消费组管理什么是消费者组消费者位移(consumer position)位移...
  • Java实现kafka消费

    千次阅读 2021-11-30 11:41:50
    消费者基本配置及代码: #spring.kafka.client-id= spring.kafka.bootstrap-servers=localhost:9092 ###########【消费者基本配置】########### #spring.kafka.consumer.client-id= spring.kafka.consumer....
  • kafka消费数据

    千次阅读 2022-04-08 17:01:23
    kafka消费数据最简单的实现方法 from kafka import KafkaConsumer def consumer_kafka(): kafkaServers = [] topic = "" consumer = KafkaConsumer(topic, bootstrap_servers=kafkaServers, auto_offset_reset...
  • 正常使用kafka消费者,接收消息时,会出现消息循环无法结束问题,增加参数 consumer_timeout_ms:超时时间(毫秒),超过指定时间没有获取到消息关闭kafka。(例子如下) consumer.py文件: from kafka import ...
  • kafka消费

    千次阅读 2021-11-18 20:12:22
    1、同一个topic,每个消费者组都可以拿到相同的全部数据。 2、消费者多于分区数: 2.1 创建一个测试用的单分区topic-test 2.2 创建一个分组group1 2.3 在group1中启动两个消费者 producer:a,b,c consumer1:无 ...
  • kafka消费者Rebalance机制

    千次阅读 2022-04-02 11:31:40
    rebalance就是说如果消费组里的消费者数量有变化或消费的分区数有变化,kafka会重新分配消费消费分区的关系。比如consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,...
  • kafka消费指定分区数据

    千次阅读 2022-03-08 16:08:49
    kafka消费指定分区数据
  • kafka 消费者的工作流程

    千次阅读 2022-04-10 12:02:32
    kafka消费者 1.1 kafka消费的方式 consumer采用从broker中主动拉取数据。 Kafka采用这种方式。 pull模式不足之处是,如 果Kafka没有数 据,消费者可能会陷入循环中,一直返回 空数据。 不采用push(推)模式...
  • 跟我学Kafka:Kafka消费组运维详解

    千次阅读 热门讨论 2021-05-16 21:20:59
    kafka系列第二篇,该系列将见证笔者从Kafka小白进阶专家之路,欢迎各位小伙伴关注,一起前行。
  • Kafka消费者流程源码解析

    千次阅读 2022-03-11 21:39:07
    本文章简单流程追踪一下Kafka从服务端拉取数据到客户端的流程。 看完本文,你将会大致了解消费者数据消费的过程。 2.消费者示例 Properties properties = new Properties(); properties.put(ConsumerConfig....
  • 一 重复消费和漏消费 1.1 重复消费 重复消费:已经消费了数据,但是 offset 没提交。 1.2 漏消费 ...1.如果是Kafka消费能力不足,消费者数量较少,则可以考虑增 加Topic的分区数,并且同时提升消费...
  • java使用kafka消费topic数据

    千次阅读 2022-03-08 15:38:00
    java使用kafka消费topic数据
  • Kafka消费者组和offset

    千次阅读 2022-02-22 15:46:28
    新版Kafka已将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic。如何将多个consumer组成消费者组?同时组成消费者组在消费消息时,对应的offset文件是如何变化的?本文的生产消费模型图...
  • kafka消费者分区消费策略

    千次阅读 2020-12-27 22:24:58
    而从消费者一端来看,consumer连接到kafka集群之后,是基于什么样的分区策略进行消息消费的呢? kafka消息消费原理 如上图所示,kafka的设计架构让它从开始就为分布式而生,上图是一个简单的消息消费示意图,我们...
  • 分区分配策略、Range、RoundRobin、Sticky 以及再平衡、offset 位移、自动提交offset、手动提交 offset、指定 Offset 消费、指定时间消费、漏消费和重复消费消费者事务、数据积压
  • kafka消费者组消费数据问题

    千次阅读 2022-03-08 16:25:23
    kafka消费者组消费数据问题
  • Kafka消费者读取数据

    千次阅读 2022-03-03 20:03:07
    import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; ...
  • kafka消费延迟或者重复消费原因

    千次阅读 2020-12-30 21:01:43
    简介由于项目中需要使用kafka作为消息队列,并且项目是基于spring-boot来进行构建的,所以项目采用了spring-kafka作为原生kafka的一个扩展库进行使用。先说明一下版本:spring-boot 的版本是1.4.0.RELEASEkafka 的...
  • kafka消费消息-java版-demo

    千次阅读 2021-12-09 20:04:59
    try{ while(true){ //1000是超时设定,如果有定时要求,可设置,否则建议设置个比较大的值 //通常consumer拿到足够多的数据,会立即返回,否则会阻塞 //poll返回则认为是成功消费了消息,如果发现消费慢需要分析是...
  • kafka 消费者详解

    千次阅读 2022-02-09 17:40:54
    kafka消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? 顾名思义,消费者就是从kafka集群消费数据的客户端, 如下图,展示了一个消费者从一个...
  • kafka消费者参数配置

    千次阅读 2021-02-07 21:51:35
    参数默认配置 auto.commit.interval.ms = 5000 check.crcs = true client.id = connections.max.idle.ms = 540000 enable.auto.commit = false ... 如果Kafka中还有消息没有消费的话,会马上去读,而不需要等待。
  • 带你认识kafka消费者组

    千次阅读 2021-03-30 23:55:59
    kafka消费者组单播与多播topic与消费者组消费者组其他应用 单播与多播 创建topic ,这里创建名为yangcan,副本因子和分区都为1的topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-...
  • 环境:出现12台集群一个kafka节点,消费组一个java-data,消费者6个 问题:运行几个小时后,开始逐步出现消费者被coordinator提出消费组,但是程序进程未退出,正常运行中。 怀疑问题 网络问题,不生产数据,一直挂...
  • 启动kafka消费者报错

    千次阅读 2022-01-22 10:32:08
    当我们启动kafka消费者的时候 bin/kafka-console-consumer.sh --zookeeper spark-local:2181 --topic myTopic 出现了这个问题:no brokers found when trying to rebalance 那就是kafka没有启动,在kafka启动之前要...
  • kafka消费延迟问题查找

    千次阅读 2020-12-10 20:32:04
    最近线上偶尔爆出kafka消费延迟,但是系统的数据量并不大,为什么会延迟呢? 具体分析如下。 思路: 1.查看机器中数据积压情况,是否是因为数据量过大导致的消费延迟。 2.统计数据发送kafka成功到数据消费出来(还未...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 115,449
精华内容 46,179
关键字:

kafka消费

友情链接: 水泵自动化.zip