精华内容
下载资源
问答
  • kafka-消费者 简单的 Scala Kafka 消费者 如何构建
  • 在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费...3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。
  • 3.3.1 消费方式 consumer 采用 pull(拉)模式从 broker 中读取数据。 push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 它的目标是尽可能以最快速度传递消息,但是这样很容易...

    3.3.1 消费方式 

    consumer 采用 pull(拉)模式从 broker 中读取数据。 
    push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。
    它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。 
    pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout

    3.3.2 分区分配策略 

    一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。 

    Kafka 有两种分配策略,一是 RoundRobin,一是 Range。 

    1)RoundRobin (轮询)按照组来消费

    分区分配策略之RoundRobin

    使用轮询的策略优点:就是一个消费者组多个消费者直接消费消息最多相差1个

    缺点:使用轮询的策略有一个问题,当一个消费者组订阅的是多个topic主题,假设有一个消费者组consumergroup(consumerA订阅了主题topic1和consumerB主题topic2)consumerA消费topic1,consumerB消费topic2 ,这看起来似乎没有问题,使用轮询的策略会将消费者组订阅的主题当成一个整体。但是topic1和topic2各有三个partition分区,在kafka内部有一个TopicAndPartition这个类会将topic1和topic2的partition进行排序,假设两个经过排序之后顺序{topic1partition0,topic2partition0,topic2partition1,topic1partition2,topic1partition1,topic2partition2}   然后consumerA和consumerB轮询的拉去消息,这样consumerA就会将topic2的消息给拉取消费了这样是不是有问题?

    所以使用轮询策略条件的前提:就是一个消费者组里消费者订阅的主题是一样的,只有consumerA和consumerB都订阅了topic1和topic2,这样使用轮询的方式才不会有问题

    2)Range  (范围)默认的消费方式  按主题的方式给消费者(谁订阅了我就给谁消费)

          分区分配策略之Range

    范围range是按照范主题划分的,一个主题7个分区 3个消费者  7除以3除不尽就会分布不均,消费者1消费前topic1的前三个分区,后面两个消费者消费topic1的4和5分区   6和7分区就给消费者3消费,这种情况看起来也没有什么问题?

    缺点:假设消费者他们订阅了2个主题topic1和topic2  都是7个分区 ,由于是按主题划分的所以,消费者1就分到了topic1和topic2的1、2、3分区这样消费者1就被分到了6个分区,消费者2和消费者3只分到了4个分区,随着订阅的主题越来越多,这样消费者1和其他消费者相差越来越大,就不均衡了 

    思考一个问题:消费者消费消息什么时候重新分配?

    当消费者个数发生变的时候,

    1,假设topic1有6个分区  三个消费者A、B、C,不管用什么策略分配,假设C负责消费partition4和partition5,突然C挂掉了,这个时候partition4和partition5需不需要消费,答案当然是要,那怎么消费?当然是重新分配

    2,假设topic1有6个分区  三个消费者A、B、C、D,当消费者A服务起来的时候6个分区都分配给了A,当B起来的时候重新分配,当C起来时候也会重新分配,消费者A、B、C都分配到了2个partition,当第四个消费者D加进来的时候,会怎么办?还是上面那句话,消费者个数发生变化的时候,就会触发分区分配策略重新分配

    总结:当消费者个数发生变的时候,消费者个数可以增多或者减少,甚至可以增多至比分区数还多的时候,照样会重新分配,只是有些消费者可能被分配不到

    3.3.3 offset 的维护 

    由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。 

    Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。 
    1)修改配置文件 consumer.properties

    exclude.internal.topics=false

    2)读取 offset

    0.11.0.0 之前:
    bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties from-beginning 
    0.11.0.0 之后版本(含): 
    bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning 

    但是在新版本中

    [root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning
    Missing required argument "[bootstrap-server]"
    

    那我们不能用zookeeper了

    [root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server backup01:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning

     

     

    展开全文
  • Kafka消费者:从Kafka中读取数据

    万次阅读 多人点赞 2018-08-20 20:38:07
    本系列文章为对《Kafka:The Definitive Guide》的学习整理,希望能够帮助到...Kafka消费者相关的概念 消费者与消费组 假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个...

    本系列文章为对《Kafka:The Definitive Guide》的学习整理,希望能够帮助到大家

    应用从Kafka中读取数据需要使用KafkaConsumer订阅主题,然后接收这些主题的消息。在我们深入这些API之前,先来看下几个比较重要的概念。

    Kafka消费者相关的概念

    消费者与消费组

    假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。

    Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:

    one

    如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:

    two

    如果增加到4个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:

    four

    但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:

    more

    总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

    Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

    double

    在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来说它们属于不同的应用。

    最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

    消费组与分区重平衡

    可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

    消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

    如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

    在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

    创建Kafka消费者

    读取Kafka消息只需要创建一个kafkaConsumer,创建过程与KafkaProducer非常相像。我们需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer和group.id。其中,bootstrap.servers与创建KafkaProducer的含义一样;key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象;group.id不是严格必须的,但通常都会指定,这个参数是消费者的消费组。

    下面是一个代码样例:

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    props.put("group.id", "CountryCounter");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);

    订阅主题

    创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表,非常简单:

    nsumer.subscribe(Collections.singletonList("customerCountries"));

    这个例子中只订阅了一个customerCountries主题。另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:

    consumer.subscribe("test.*");

    拉取循环

    消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了一个非常简洁的poll方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。下面是一个代码样例:

    try {
       while (true) {  //1)
           ConsumerRecords<String, String> records = consumer.poll(100);  //2)
           for (ConsumerRecord<String, String> record : records)  //3)
           {
               log.debug("topic = %s, partition = %s, offset = %d,
                  customer = %s, country = %s\n",
                  record.topic(), record.partition(), record.offset(),
                  record.key(), record.value());
               int updatedCount = 1;
               if (custCountryMap.countainsValue(record.value())) {
                   updatedCount = custCountryMap.get(record.value()) + 1;
               }
               custCountryMap.put(record.value(), updatedCount)
               JSONObject json = new JSONObject(custCountryMap);
               System.out.println(json.toString(4))
           }
       }
    } finally {
          consumer.close(); //4
    }

    其中,代码中标注了几点,说明如下:

    • 1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
    • 2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
    • 3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
    • 4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。

    另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。

    消费者配置

    上面的例子中只设置了几个最基本的消费者参数,bootstrap.servers,group.id,key.deserializer和value.deserializer,其他的参数可以看Kafka文档。虽然我们很多情况下只是使用默认设置就行,但了解一些比较重要的参数还是很有帮助的。

    fetch.min.bytes

    这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。

    fetch.max.wait.ms

    上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。

    max.partition.fetch.bytes

    这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。

    需要注意的是,max.partition.fetch.bytes必须要比broker能够接收的最大的消息(由max.message.size设置)大,否则会导致消费者消费不了消息。另外,在上面的样例可以看到,我们通常循环调用poll方法来读取消息,如果max.partition.fetch.bytes设置过大,那么消费者需要更长的时间来处理,可能会导致没有及时poll而会话过期。对于这种情况,要么减小max.partition.fetch.bytes,要么加长会话时间。

    session.timeout.ms

    这个参数设置消费者会话过期时间,默认为3秒。也就是说,如果消费者在这段时间内没有发送心跳,那么broker将会认为会话过期而进行分区重平衡。这个参数与heartbeat.interval.ms有关,heartbeat.interval.ms控制KafkaConsumer的poll()方法多长时间发送一次心跳,这个值需要比session.timeout.ms小,一般为1/3,也就是1秒。更小的session.timeout.ms可以让Kafka快速发现故障进行重平衡,但也加大了误判的概率(比如消费者可能只是处理消息慢了而不是宕机)。

    auto.offset.reset

    这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)。

    enable.auto.commit

    这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false。如果为true,你可能需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。

    partition.assignment.strategy

    我们已经知道当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:

    • 范围(Range):对于每个主题,每个消费者负责一定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有3个分区,那么使用这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第一个消费者会多出几个分区(由主题数决定)。
    • 轮询(RoundRobin):对于所有订阅的主题分区,按顺序一一的分配给消费者。用上面的例子来说,消费者C1负责第一个主题的分区0、分区2,以及第二个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。

    partition.assignment.strategy设置了分配策略,默认为org.apache.kafka.clients.consumer.RangeAssignor(使用范围策略),你可以设置为org.apache.kafka.clients.consumer.RoundRobinAssignor(使用轮询策略),或者自己实现一个分配策略然后将partition.assignment.strategy指向该实现类。

    client.id

    这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。

    max.poll.records

    这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。

    receive.buffer.bytes、send.buffer.bytes

    这两个参数控制读写数据时的TCP缓冲区,设置为-1则使用系统的默认值。如果消费者与broker在不同的数据中心,可以一定程度加大缓冲区,因为数据中心间一般的延迟都比较大。

    提交(commit)与位移(offset)

    当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

    在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。

    假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费,如下所示:

    dup

    假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所示:

    miss

    因此,提交位移的方式会对应用有比较大的影响,下面来看下不同的提交方式。

    自动提交

    这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

    需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

    提交当前位移

    为了减少消息重复消费或者避免消息丢失,很多应用选择自己主动提交位移。设置auto.commit.offset为false,那么应用需要自己通过调用commitSync()来主动提交位移,该方法会提交poll返回的最后位移。

    为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。而如果在处理消息时发生了重平衡,那么只有当前poll的消息会重复消费。下面是一个自动提交的代码样例:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
            log.error("commit failed", e)
        }
    }

    上面代码poll消息,并进行简单的打印(在实际中有更多的处理),最后完成处理后进行了位移提交。

    异步提交

    手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.printf("topic = %s, partition = %s,
            offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
        }
        
        consumer.commitAsync();
    }

    但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

    因此,基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s,
            offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (e != null)
                    log.error("Commit failed for offsets {}", offsets, e);
            } 
        });
    }

    而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。

    混合同步提交与异步提交

    正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交,如下所示:

    try {
        while (true) {
           ConsumerRecords<String, String> records = consumer.poll(100);
           for (ConsumerRecord<String, String> record : records) {
               System.out.printf("topic = %s, partition = %s, offset = %d,
               customer = %s, country = %s\n",
               record.topic(), record.partition(),
               record.offset(), record.key(), record.value());
           }
           
           consumer.commitAsync();
        }
    } catch (Exception e) {
        log.error("Unexpected error", e);
    } finally {
        try {
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }

    在正常处理流程中,我们使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功。

    提交特定位移

    commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理。幸运的是,commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map。由于一个消费者可能会消费多个分区,所以这种方式会增加一定的代码复杂度,如下所示:

    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    
    ....
    
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
            if (count % 1000 == 0)
                consumer.commitAsync(currentOffsets, null);
            count++;
    } }

    代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交)。

    重平衡监听器(Rebalance Listener)

    在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它可能需要将已经处理过的消息位移进行提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:

    • public void onPartitionRevoked(Collection partitions):此方法会在消费者停止消费消费后,在重平衡开始前调用。
    • public void onPartitionAssigned(Collection partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。

    下面来看一个onPartitionRevoked9)的例子,该例子在消费者失去某个分区时提交位移(以便其他消费者可以接着消费消息并处理):

    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    
    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }
        
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Lost partitions in rebalance.
              Committing current
            offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
        }
    }
    
    try {
        consumer.subscribe(topics, new HandleRebalance());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
            {
                 System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                 currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
            }
            consumer.commitAsync(currentOffsets, null);
        }
    } catch (WakeupException e) {
        // ignore, we're closing
    } catch (Exception e) {
       log.error("Unexpected error", e);
    } finally {
       try {
           consumer.commitSync(currentOffsets);
       } finally {
           consumer.close();
           System.out.println("Closed consumer and we are done");
       }
    }

    代码中实现了onPartitionsRevoked()方法,当消费者失去某个分区时,会提交已经处理的消息位移(而不是poll()的最大位移)。上面代码会提交所有的分区位移,而不仅仅是失去分区的位移,但这种做法没什么坏处。

    从指定位移开始消费

    在此之前,我们使用poll()来从最后的提交位移开始消费,但我们也可以从一个指定的位移开始消费。

    如果想从分区开始端重新开始消费,那么可以使用seekToBeginning(TopicPartition tp);如果想从分区的最末端消费最新的消息,那么可以使用seekToEnd(TopicPartition tp)。而且,Kafka还支持我们从指定位移开始消费。从指定位移开始消费的应用场景有很多,其中最典型的一个是:位移存在其他系统(例如数据库)中,并且以其他系统的位移为准。

    考虑这么个场景:我们从Kafka中读取消费,然后进行处理,最后把结果写入数据库;我们既不想丢失消息,也不想数据库中存在重复的消息数据。对于这样的场景,我们可能会按如下逻辑处理:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            processRecord(record);
            storeRecordInDB(record);
            consumer.commitAsync(currentOffsets);
        }
    }

    这个逻辑似乎没什么问题,但是要注意到这么个事实,在持久化到数据库成功后,提交位移到Kafka可能会失败,那么这可能会导致消息会重复处理。对于这种情况,我们可以优化方案,将持久化到数据库与提交位移实现为原子性操作,也就是要么同时成功,要么同时失败。但这个是不可能的,因此我们可以在保存记录到数据库的同时,也保存位移,然后在消费者开始消费时使用数据库的位移开始消费。这个方案是可行的,我们只需要通过seek()来指定分区位移开始消费即可。下面是一个改进的样例代码:

    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //在消费者负责的分区被回收前提交数据库事务,保存消费的记录和位移
            commitDBTransaction();
        }
        
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            //在开始消费前,从数据库中获取分区的位移,并使用seek()来指定开始消费的位移
            for(TopicPartition partition: partitions)
                consumer.seek(partition, getOffsetFromDB(partition));
        } 
    }
    
        consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
        //在subscribe()之后poll一次,并从数据库中获取分区的位移,使用seek()来指定开始消费的位移
        consumer.poll(0);
        for (TopicPartition partition: consumer.assignment())
            consumer.seek(partition, getOffsetFromDB(partition));
    
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
            {
                processRecord(record);
                //保存记录结果
                storeRecordInDB(record);
                //保存位移
                storeOffsetInDB(record.topic(), record.partition(), record.offset());
            }
            //提交数据库事务,保存消费的记录以及位移
            commitDBTransaction();
        }

    具体逻辑见代码注释,此处不再赘述。另外注意的是,seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移(当然我们可以在seek和poll之后提交位移覆盖)。

    优雅退出

    下面我们来讨论下消费者如何优雅退出。

    在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException。如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。

    下面是一个优雅退出的样例代码:

    //注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。
    Runtime.getRuntime().addShutdownHook(new Thread() {
              public void run() {
                  System.out.println("Starting exit...");
                  //调用消费者的wakeup方法通知主线程退出
                  consumer.wakeup();
                  try {
                      //等待主线程退出
                      mainThread.join();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              } 
    });
    
    ...
    
    try {
        // looping until ctrl-c, the shutdown hook will cleanup on exit
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println(System.currentTimeMillis() + "--  waiting for data...");
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
            }
            for (TopicPartition tp: consumer.assignment())
                System.out.println("Committing offset at position:" + consumer.position(tp));
            consumer.commitSync();
        }
    } catch (WakeupException e) {
        // ignore for shutdown
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }

    反序列化

    如前所述,Kafka生产者负责将对象序列化成字节数组并发送到Kafka。消费者则需要将字节数组转换成对象,这就是反序列化做的事情。序列化与反序列化需要匹配,如果序列化使用IntegerSerializer,但使用StringDeserializer来反序列化,那么会反序列化失败。因此作为开发者,我们需要关注写入到主题使用的是什么序列化格式,并且保证写入的数据能够被消费者反序列化成功。如果使用Avro与模式注册中心(Schema Registry)来序列化与反序列化,那么事情会轻松许多,因为AvroSerializer会保证所有写入的数据都是结构兼容的,并且能够被反序列化出来。

    下面先来看下如何自定义反序列化,后面会进一步讨论如何使用Avro。

    自定义反序列化

    首先,假设序列化的对象为Customer:

    public class Customer {
         private int customerID;
         private String customerName;
         public Customer(int ID, String name) {
             this.customerID = ID;
             this.customerName = name;
         }
         public int getID() {
             return customerID;
         }
         public String getName() {
             return customerName;
         } 
    }

    根据之前的序列化策略,我们的反序列化代码如下:

    import org.apache.kafka.common.errors.SerializationException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CustomerDeserializer implements Deserializer<Customer> {
        @Override
        public void configure(Map configs, boolean isKey) {
         // nothing to configure
        }
    
        @Override
        public Customer deserialize(String topic, byte[] data) {
            int id;
            int nameSize;
            String name;
            try {
                if (data == null)
                    return null;
                if (data.length < 8)
                    throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
                ByteBuffer buffer = ByteBuffer.wrap(data);
                id = buffer.getInt();
                String nameSize = buffer.getInt();
                byte[] nameBytes = new Array[Byte](nameSize);
                buffer.get(nameBytes);
                name = new String(nameBytes, 'UTF-8');
                return new Customer(id, name);
            } catch (Exception e) {
                throw new SerializationException("Error when serializing Customer to byte[] " + e);
            }
        }
        @Override
        public void close() {
                // nothing to close
        } 
    }

    消费者使用这个反序列化的代码如下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    props.put("group.id", "CountryCounter");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer");
    
    KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
    consumer.subscribe("customerCountries")
    while (true) {
        ConsumerRecords<String, Customer> records = consumer.poll(100);
        for (ConsumerRecord<String, Customer> record : records)
        {
        System.out.println("current customer Id: " + record.value().getId() + " and current customer name: " + record.value().getName());
        } 
    }

    最后提醒下,我们并不推荐实现自定义的序列化与反序列化,因为往往这些方案并不成熟,难以维护和升级,而且容易出错。我们可以使用JSON、Thrift、Protobuf或者Avro的成熟的解决方案。

    使用Avro反序列化

    假设我们使用之前生产者Avro序列化时使用的Customer,那么使用Avro反序列化的话,我们的样例代码如下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    props.put("group.id", "CountryCounter");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //使用KafkaAvroDeserializer来反序列化Avro消息
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    //这里增加了schema.registry.url参数,获取生产者注册的消息模式
    props.put("schema.registry.url", schemaUrl);
    String topic = "customerContacts"
    
    KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
    consumer.subscribe(Collections.singletonList(topic));
    
    System.out.println("Reading topic:" + topic);
    
    while (true) {
        //这里使用之前生产者使用的Avro生成的Customer类
        ConsumerRecords<String, Customer> records = consumer.poll(1000);
        for (ConsumerRecord<String, Customer> record: records) {
            System.out.println("Current customer name is: " + record.value().getName());
        }
        consumer.commitSync();
    }

    单个消费者

    一般情况下我们都是使用消费组(即便只有一个消费者)来消费消息的,因为这样可以在增加或减少消费者时自动进行分区重平衡。这种方式是推荐的方式。在知道主题和分区的情况下,我们也可以使用单个消费者来进行消费。对于这种情况,我们需要自己给消费者分配消费分区,而不是让消费者订阅(成为消费组)主题。

    下面是一个给单个消费者指定分区进行消费的代码样例:

    List<PartitionInfo> partitionInfos = null;
    //获取主题下所有的分区。如果你知道所指定的分区,可以跳过这一步
    partitionInfos = consumer.partitionsFor("topic");
    
    if (partitionInfos != null) {
        for (PartitionInfo partition : partitionInfos)
            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
        //为消费者指定分区
        consumer.assign(partitions);
    
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record: records) {
                System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }

    除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑都是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过consumer.partitionsFor()来重新获取分区。

    展开全文
  • kafka消费者如何批量消费消息

    千次阅读 2020-07-22 14:45:04
    整理kafka消费者批量消费消息开发笔记。 kafka使用的是2.1.11.RELEASE版本 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId>...

    整理kafka消费者批量消费消息开发笔记。

    kafka使用的是2.1.11.RELEASE版本

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.11.RELEASE</version>
    </dependency>

    Springboot项目启动类屏蔽掉自动配置

    @SpringBootApplication(scanBasePackages ={"com.pengyingjun"},exclude = {KafkaAutoConfiguration.class})

    新增kafka相关配置项

    kafka.bootstrap-servers = kakfa.*.*.com:9092
    kafka.consumer.auto-commit-interval = 1000
    kafka.consumer.max-poll-records = 1000
    kafka.consumer.enable-auto-commit = true
    kafka.consumer.concurrency = 5
    kafka.consumer.group-id = pengyingjun_log
    kafka.consumer.auto-offset-reset = earliest
    kafka.consumer.log_topic = pengyingjun

    新增kafka消费者配置类

    @Configuration
    @EnableKafka
    @Slf4j
    public class KafkaConsumerConfig {
        /** 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 */
        @Value("${kafka.bootstrap-servers}")
        private String servers;
        /** 如果为true,则消费者的偏移量将在后台定期提交,默认值为true */
        @Value("${kafka.consumer.enable-auto-commit}")
        private boolean enableAutoCommit;
        /** 心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000 */
        @Value("${kafka.consumer.auto-commit-interval}")
        private String autoCommitInterval;
        /** 当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量 可选的值为latest, earliest, none*/
        @Value("${kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        /** 在监听器容器中运行的线程数 */
        @Value("${kafka.consumer.concurrency}")
        private int concurrency;
        /** 一次调用poll()操作时返回的最大记录数,默认值为500 */
        @Value("${kafka.consumer.max-poll-records}")
        private int maxPollRecords;
        /** 用于标识此使用者所属的使用者组的唯一字符串 */
        @Value("${kafka.consumer.group-id}")
        private String groupId;
    
        /**
         *  消费者批量工厂
         */
        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
            // 并发创建的消费者数量
            factory.setConcurrency(concurrency);
            // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
            factory.setBatchListener(true);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }
    
        /**
         *  消费者配置信息
         */
        private Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>(16);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 10485760);
            return props;
        }
    }

    新增kafka生产者配置类

    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    
        @Value("${kafka.bootstrap-servers}")
        private String servers;
    
        private Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>(8);
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1000);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        private ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

    消费监听逻辑代码

    @KafkaListener(topics = "pengyingjun", groupId = "pengyingjun_log", containerFactory = "kafkaListenerContainerFactory")
        public void handleHotValue(List<ConsumerRecord<?, ?>> records){
            List<String> messages = new ArrayList<>();
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                kafkaMessage.ifPresent(o -> messages.add(o.toString()));
            }
            if (messages.size() > 0) {
                //业务处理逻辑
            }
        }

    模拟大数据量消息代码

    @Test
        public void testSendKafka() throws InterruptedException {
    
            int clientTotal = 10000;
            int threadTotal = 200;
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        String log = "223.104.63.101 - - [1594828915] \"GET /click/track?s0=WxAppStart&sm0=&sk0=&sRemarks0=&t0=GoodsDetailPage&tm0=&tk0=ABCDEFG&ts0=1594828915186 HTTP/1.1\" 200  \"Mozilla/5.0 (iPhone; CPU iPhone OS 9_2_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Mobile/13D15 M\n" +
                                "icroMessenger/7.0.9(0x17000929) NetType/4G Language/zh_CN\" \"223.104.63.101\" \"click.dalingheart.com\" \"-\" \"wxapp\" \"0000070800011202756008308\" \"2\" \"-\" \"-\" \"\" \"-\" \"-\"";
                        kafkaTemplate.send("dhclick", log);
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception >>> ", e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
        }

    至此,完成了kafka批量消费需求。

    展开全文
  • kafka消费延迟问题查找

    千次阅读 2020-12-10 20:32:04
    最近线上偶尔爆出kafka消费延迟,但是系统的数据量并不大,为什么会延迟呢? 具体分析如下。 思路: 1.查看机器中数据积压情况,是否是因为数据量过大导致的消费延迟。 2.统计数据发送kafka成功到数据消费出来(还未...

    最近线上偶尔爆出kafka消费延迟,但是系统的数据量并不大,为什么会延迟呢?
    具体分析如下。

    基本思路
    1.查看机器中数据积压情况,是否是因为数据量过大导致的消费延迟。
    2.统计数据发送kafka成功到数据消费出来(还未做业务处理)的耗时。
    3.统计数据消费出来并完成业务处理的耗时。

    一、查看kafka机器的topic在每个分区的数据分配情况

    在这里插入图片描述
    可以看出该group中有一个topic,该topic有6个partition,消费者分布在两台机器上(两个ip),每台机器有三个消费者。
    重点关注每个partition的日志积压情况(查看LAG参数):
    partition-3中有0条数据积压
    partition-1中有1条数据积压
    partition-2中有3条数据积压
    partition-0中有0条数据积压
    partition-4中有0条数据积压
    partition-5中有0条数据积压
    可以看出数据积压不严重。不是数据量大导致的延迟

    二、统计数据发送kafka成功到数据消费出来(还未做业务处理)的耗时

    经过日志统计分析,基本是毫秒级别的耗时。所以不是该问题导致的延迟。

    三、统计数据消费出来并完成业务处理的耗时

    经过日志统计分析,基本是毫秒级别的耗时。但是偶尔出现20分钟的处理耗时。所以,可以肯定延迟是由于处理消费的数据时部分操作导致了。
    找到这部分日志,发现是由于业务处理中,有时需要调用一个外部的接口,结果这个接口的地址不通,在http调用时未设置超时,导致服务卡在这里等待20分钟后才超时,从而导致部分后续消费的延迟。

    四、结果处理

    最终,将http请求超时设置为15s后就解决了这个问题。

    展开全文
  • kafka消费者无法消费异常

    千次阅读 2020-01-07 18:10:17
    今天被一个kafka消费异常折磨了一天,头差点炸了,还好最后解决了它 异常:服务器:record is corrupt(记录损坏)不明原因 有可能磁盘空间不足导致 导致消费者无法正常消费消息 卡在某一个offset 不能继续消费 ...
  • kafka消费内存溢出问题

    千次阅读 2019-06-14 15:01:40
    关于kafka消费缓慢,内存以及cpu 吃尽的问题,小卡夫卡消费内存溢出问题,最近在做kafka消费优化,所以特地来记录下 spring boot项目集成kafka 注解监听消费 1 项目启动 kafka消费监听启动中 内存溢出 2 kafka消费到...
  • Apache Flink如何管理Kafka消费者offsets
  • 简单的Kafka消费者-生产者示例 运行项目的步骤 下载Kafka 0.10.0.0二进制文件cd kafka_2.11-0.10.0.0 运行zookeeper和kafka服务器bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-...
  • kafka消费过程中失败 kafka重试补偿

    千次阅读 2020-05-09 17:02:09
    kafka消费过程中失败 kafka重试补偿
  • kafka消费者分组消费的再平衡策略

    千次阅读 2018-12-24 00:06:40
    一,Kafka消费模式从kafka消费消息,kafka客户端提供两种模式: 分区消费,分组消费。分区消费对应的就是我们的DirectKafkaInputDStream分组...
  • Kafka系列文章: Kafka系列 —— 入门及应用场景 &amp; 部署 &...Kafka消费语义分析 通常架构:Flume --&gt; Kafka --&gt; Spark Srteaming Flume作为Kafka的生产者 Spark Stre...
  • Kafka消费者重置offset

    千次阅读 2020-07-28 18:53:00
    shell 命令实现 kafka重新设置group 消费的offset java 代码实现 Kafka消费者重置offset读取数据
  • kafka 消费者代码示例

    万次阅读 2016-10-28 00:27:37
    最近在公司项目中使用kafka,主要的功能是从kafka消费数据,并且将数据以对象的方式写入自定义的日志文件中,每天生产一个日志文件。 一开始使用高版本的kafka(0.9.0.0) maven配置 <groupId>org.apache.kafka ...
  • kafka的学习记录——kafka消费

    千次阅读 2018-01-05 16:57:30
    1、kafka消费实例的工作过程,从启动开始。(待补充) 2、auto.offset.reset属性:默认值为 latest earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费  latest...
  • Kafka消费者消费方式

    千次阅读 2020-02-04 21:19:42
    consumer消费方式 ...针对这一点,Kafka消费者在消费数据时会传入一个时长参数 timeout,如果当前没有 数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。 push模式,由Broker推送...
  • Kafka系列(四)Kafka消费者:从Kafka中读取数据

    万次阅读 多人点赞 2018-05-08 16:36:21
    本系列文章为对《Kafka:The Definitive Guide》的学习整理...Kafka消费者相关的概念消费者与消费组假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事...
  • Kafka消费者组是什么?

    千次阅读 2019-08-17 14:20:02
    一、Kafka消费者组是什么? 二、Kafka消费者组解决了哪些问题?
  • spring boot 1.5集成 kafka 消费者怎么自己确认消费 怎么使用@KafkaListener注解实现Acknowledgment,即消费者怎么自己提交游标
  • Kafka学习笔记(五) --使用java api监控新版Kafka消费者组的消费积压 正式开篇之前,先简单介绍下该需求产生的背景: 随着部署在生产环境中的kafka消费端应用越来越多 通过人工的方式(去机房使用Kafka监控工具或者...
  • kafka消费者--指定分区消费

    千次阅读 2020-03-12 21:56:48
    kafka消费者有两种模式, 订阅模式和分配模式, 具体区别如下 模式 不同之处 相同之处 subscribe() 使用 Kafka Group 管理,自动进行 rebalance 操作 可以在 Kafka 保存 offset assign() 用户自己...
  • 本文章对应的 kafka 版本是 kafka_2.11-0.10.0.1 版本号的含义 scala 2.11 kafka 0.10.0.1 ... kafka 0.9 及以上 有了一个大版本变化, ... 2.kafka 消费者偏移量信息 不再单纯的存储在 zookeeper 中, kafk...
  • Kafka消费积压Lag值Python代码获取

    千次阅读 2019-04-24 10:56:15
    Kafka消费积压Lag值Python代码获取 背景 根据kafka的消息堆积情况,自动扩容消费者集群实例数目。 代码及测试 from kafka import KafkaConsumer from kafka import KafkaProducer from kafka import To.....
  • 使用Apache Kafka 消费者组时,有一个为消费者分配对应分区partition的过程,我们可以使用“自动”subscribe和“手动”assign的方式。 同时进行“自动”和“手动”的分区分配是会互相影响的,有时会把事情搞糟。...
  • springboot中kafka消费之配置详解

    千次阅读 2019-08-19 10:45:46
    kafka消费者默认开启线程池,可以通过consumer.concurrency来设置消费线程数 #原始数据kafka读取 kafka.consumer.servers=IP:9092,IP:9092(kafka消费集群ip+port端口) kafka.consumer.enable.auto.commit=true...
  • kafka消费者对应partition关系

    千次阅读 2020-03-25 10:46:56
    1.kafka消费组基本概念 kafka消费topic是以group为单位来的,一个group消费一个topic。一个group能容纳多个consumer。consumer消费是以分区(partition)来的,一个consumer可以消费一个或多个partition,一个...
  • shell命令行获取kafka消费情况

    千次阅读 2020-08-18 23:26:03
    原先是用kafkaManager查看kafka消费积压情况,后面基于平衡几台服务器内存,把kafkaManger关了,需要使用kafka自带的sh查看消费情况 常用命令: 获取topic列表/usr/local/kafka/bin/kafka-topics.sh --list --...
  • kafka消费者之seek方法

    千次阅读 2020-04-14 23:49:13
    继续kafka学习之旅。...这个参数的意思是:当kafka消费者在_consumer_offset主题中找不到所属分区的offset时,该参数就派上用场了,改参数有三个可选值,latest、earilst、none。第一个取值是说,当消费者找不到...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 95,177
精华内容 38,070
关键字:

kafka消费