精华内容
下载资源
问答
  • kafka consumer 停止消费topic

    万次阅读 2018-04-07 12:23:10
    现象在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令查看,kafka 已经没有consumer 的信息了。实验用例实验 kafka ...

    现象

    在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令查看,kafka 已经没有consumer 的信息了。

    实验用例

    实验 kafka consumer 实现:

    package com.muhao.kafka;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    public class MyKafKaConsumer {
    	public static void main(String[] args) throws InterruptedException {
    		Properties props = new Properties();
    		props.put("bootstrap.servers", "192.168.220.10:9092");
    		props.put("group.id", "test");
    		props.put("enable.auto.commit", "true");
    		props.put("auto.commit.interval.ms", "1000");
    		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    		consumer.subscribe(Arrays.asList("my-topic"));
    		while (true) {
    			ConsumerRecords<String, String> records = consumer.poll(100);
    			for (ConsumerRecord<String, String> record : records) {
    				// 一次停止10 秒钟,如果上一次得到超过30条消息,就会出现kafka consumer停止消费的现象
    				Thread.sleep(10000L);
    				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    				
    			}
    		}
    
    	}
    }
    

    实验 kafka producer 实现:

    package com.muhao.kafka;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    
    public class MyKafkaProducer {
    	public static void main(String[] args) {
    		Properties props = new Properties();
    		 props.put("bootstrap.servers", "192.168.220.10:9092");
    		 props.put("acks", "all");
    		 props.put("retries", 0);
    		 props.put("batch.size", 16384);
    		 props.put("linger.ms", 1);
    		 props.put("buffer.memory", 33554432);
    		 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    		 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    		 Producer<String, String> producer = new KafkaProducer<>(props);
    		 for (int i = 0; i < 1000; i++)
    		     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), "hello message is number : "+Integer.toString(i)));
    
    		 producer.close();
    	}
    }

    启动 kafka consumer ,成功运行后在 kafka命令行执行 

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe  --group test


    发现kafka consumer 已经注册到kafka集群中了。

    kafka producer发送消息让kafka consumer 消费,但是consumer是阻塞的,等待5分钟时候,运行命令行


    发现kafka集群已经没有了 consumer的消息,但是程序仍在运行。


    解决及建议

    这种现象也是纠结了好长时间,查看源码,终于明白了,原来是在 kafka consumer 运行时,要和kafka集群的协调节点做心跳交流,这也是kafka集群给consumer做负载均衡的条件。但是但是consumer内部也会有一个计时器,记录上一次向 kafka 集群 poll 的时间,另外心跳线程会检测该现在距上一次poll的时间,如果该时间差超过了设定时间(kafka consumer默认的是 5分钟),就会想kafka集群发出leaveGroup,这时kafka集群会注销掉该consumer 的信息。

    建议:kafka consumer 在消费消息时,不要使用阻塞方法,比如blockqueue、网络发送设置超时时间……
    总得一句就是上下两次poll 的时间间隔不要超过5分钟(默认的时间)。

    展开全文
  • kafka-python 停止消费

    千次阅读 2018-07-12 13:45:03
    kafka-python 停止消费使用kafka-python时,不消费数据,也没有异常换为pykafka库后解决使用例子from pykafka import KafkaClient client = KafkaClient(hosts='127.0.0.1:9092')topic = client.topics['logsget'] ...

    kafka-python 停止消费

    使用kafka-python时,不消费数据,也没有异常


    换为pykafka库后解决

    使用例子

    1. from pykafka import KafkaClient
    2. client = KafkaClient(hosts='127.0.0.1:9092')
    3. topic = client.topics['logsget']
    4. consumer = topic.get_simple_consumer()
    5. for msg in consumer:
    6. if msg is not None:
    7. print msg.value


    pykafka中consumer_group的设置

    先看符合要求的代码

    kafka生产者

    import logging
    import logger
    from pykafka import KafkaClient
    client = KafkaClient(hosts="**")
    logging.info(client.topics)
    
    input=raw_input('please enter your topic here:')
    topic = client.topics[input]
    
    producer=topic.get_sync_producer()
    while True:
        event = raw_input("Add what to event log?: ('Q' to end.): ")
        if event == 'Q':
            break
        else:
           # msg = event.encode('UTF-8', 'ignore')
    
            producer.produce(event)                                
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    kafka消费者:

    import logging
    from pykafka import KafkaClient
    client = KafkaClient(hosts="**")
    
    input=raw_input('please enter the topic here:')
    topic = client.topics[input]
    print input
    print topic
    i=0
    
    #if you want to consumer next time,you can change the consumer_group
    consumer=topic.get_simple_consumer(consumer_group='sec3',auto_commit_enable=True,auto_commit_interval_ms=1,consumer_id='sec2')
    
    for message in consumer:
        if message is not None:
            print message.offset,message.value
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    那么先运行一次 
    consumer group为sec2的时候,正常得到消息。再试一次,读取不到了。

    https://img-blog.csdn.net/20161123170220803

    修改group_id:

    https://img-blog.csdn.net/20161123170550969

    修改之后,实现kafka的订阅原理:消费之后不能再次消费,如果想得到数据必须修改group_id。


    探索过程

    看了下,在java中,group_id应该很好配置:

      private static ConsumerConfig createConsumerConfig()
        {
            Properties props = new Properties();
            props.put("zookeeper.connect", "**");
            props.put("group.id", "password");
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    然后自己也尝试:

    consumer=topic.get_simple_consumer(consumer_group='sec3',consumer_id='sec2')
    • 1

    但是每次再消费的时候总是能得到这个数据。 
    但显然不行,目前kafka每秒处理几千条。那么一天就上千万条了。我下次再消费的数据的时候,是不希望得到历史的这么多数据的。 
    查了很多资料,没有提到这个问题的,全是复制别人的东西,还不如看官方文档。于是看文档

    >>>from pykafka import KafkaClient
    >>>client = KafkaClient(hosts="**")
    >>>topic = client.topics[input]
    >>>consumer=topic.get_simple_consumer()
    >>>help(consumer)
    • 1
    • 2
    • 3
    • 4
    • 5

    pykafka基本文档也就这些。没办法,只能挨个看参数有什么用了。 
    加入了commit_enable 的参数之后,终于符合要求了。

    展开全文
  • 收到生产环境告警,一直好好消费的客户端突然有天消费不到消息,事后找出原因:kafka生产者部分节点升级kafka-client的版本,导致发送到kafka的消息版本变高,由于服务端版本没有变,导致低版本的消费端向服务器获取...

    收到生产环境告警,一直好好消费的客户端突然有天消费不到消息,事后找出原因:kafka生产者部分节点升级kafka-client的版本,导致发送到kafka的消息版本变高,由于服务端版本没有变,导致低版本的消费端向服务器获取消息时,服务端报异常

    [2021-01-26 17:50:59,952] ERROR [KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=ucLoginLog1,partitions=[{partition=2,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
    java.lang.IllegalArgumentException: Magic v1 does not support record headers
            at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
            at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
            at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
            at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
            at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
            at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
            at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
            at scala.Option.map(Option.scala:146)
            at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
            at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
            at scala.Option.flatMap(Option.scala:171)
            at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
            at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
            at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
            at scala.collection.Iterator$class.foreach(Iterator.scala:891)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
            at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
            at scala.collection.AbstractIterable.foreach(I

    排查过程试过调整偏移量:发现消费到特定位置就不消费,事后来看之前能消费的消息是低版本推送过来的消息,后面的消息为高版本发送的消息,服务端响应消费者的时候抛出版本不一致的消息异常,消费者没有获取到偏移量,导致消费者无法加入当前组,消费者线程挂起

    通过源码分析:

    结合异常堆栈消息查看kafka服务端源码具体分析如下:

     

    在Broker拉取到Kakfa消息后,调用fetchResponseCallback回调方法,创建返回信息时,会校验消费者Api版本,如果低于当前Broker版本与向下转换消息,最终调用的是MemoryRecordsBuilder类的方法appendWithOffset

     

    抛出异常:java.lang.IllegalArgumentException: Magic v1 does not support record headers

     

    得出结论,kafka虽然解耦了架构,但还是涉及到版本问题的兼容性,对生产应急影响很大。

     

     

     

    展开全文
  • brew install kafka ==> zookeeper To have launchd start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zk...
    brew install kafka
    
    ==> zookeeper
    To have launchd start zookeeper now and restart at login:
      brew services start zookeeper
    Or, if you don't want/need a background service you can just run:
      zkServer start
    ==> kafka
    To have launchd start kafka now and restart at login:
      brew services start kafka
    Or, if you don't want/need a background service you can just run:
      zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
    

    kafka 启动需要zookeeper

    zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
    

    创建一个主题

    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    查看主题

    kafka-topics --list --zookeeper localhost:2181
    

    2.测试生产者(producer)与消费者(consumer)

    kafka-console-producer --broker-list localhost:9092 --topic test
    kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
    
    展开全文
  • kafka消费

    2020-06-05 20:22:40
    kafka消费者 文章目录kafka消费者1.重要概念消费者和消费消费者和消费组的简单使用2. 消息接收2.1 重要参数2.2 订阅主题和分区2.3 位移提交(1)重复消费和消息丢失(2)自动提交(3)手动同步提交和手动异步...
  • kafka消费

    千次阅读 2016-03-23 20:08:47
    第一步:参看消费者的基本情况 查看mwbops系统,【Consumer监控】-->【对应的consumerId】 ...如果Owner是空,说明消费端的程序已经跟Kafka断开连接,应该排查消费端是否正常,return; 如果Owner不为空,就是
  • 关于kafka重新消费数据问题

    万次阅读 2016-08-25 22:58:16
    我们在使用consumer消费数据时,有些情况下我们需要对已经消费过的数据进行重新消费,这里介绍kafka中两种重新消费数据的方法。 1.修改offset 我们在使用consumer消费的时候,每个topic会产生一个偏移量,这个...
  • kafka如何消费消息

    万次阅读 多人点赞 2018-05-25 11:32:26
    转载自:http://generalcode.cn/archives/255消费者与消费组假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例...Kafka消费者是消费组的一部分,当多个消费者形...
  • kafka 消费

    2018-03-28 11:04:03
    前置资料 kafkakafka消费中的问题及解决方法:情况1:问题:脚本读取kafka 数据,写入到数据库,有时候出现MySQL server has gone away,导致脚本死掉。再次启动,这过程中的kafka数据丢失。原因:MySQL ...
  • kafka重复消费问题

    千次阅读 2018-02-06 17:28:08
    kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。而我们项目中的consumer消费能力比较低,导致取出的一批数据在session.timeout.ms时间内没有处理完成,自动提交...
  • kafka停止consumer,producer等进程

    千次阅读 2018-04-22 17:28:18
    kafka停止consumer,producer等进程 1.官网给出的方案: You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server in order via ...
  • 解决Kafka无法消费问题终极大法:重置Kafka 环境介绍:zookeeper集群(三节点)+Kafka集群(三节点) 重置步骤: 1. 停止kafka 2. 删除kafka存储目录(server.properties文件log.dirs配置 3. 删除zookeeper上与kafka...
  • Kafka消费者概念

    2018-12-26 10:20:36
    应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 。 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法。如果不先理解 这些概念,就难以理解如何使用消费者 API...
  • kafka 暂停消费

    千次阅读 2018-10-16 15:48:00
    kafkaListener 需要指定id,例如这里是:full-part-id。 @KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group") public void ...
  • Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。 既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。 组内的...
  • 初始 Kafka Consumer 消费

    千次阅读 2019-11-24 19:10:07
    温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。 1、KafkaConsumer 概述 根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征: 在 KafkaKafkaConsumer 是线程不安全的。 2.2.1 版本的...
  • 但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,...
  • I am doing Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer in http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html)....
  • Kafka重复消费和丢失数据研究

    万次阅读 2016-12-31 22:15:59
    Kafka重复消费原因、数据丢失 底层根本原因:已经消费了数据,但是offset没提交。 原因1:强行kill线程,导致消费后的数据,offset没有提交。 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用...
  • Kafka消费

    2020-12-14 20:00:36
    与其他一些消息中间件不同的是:在Kafka消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。每个消费者...
  • Kafka消费者-从Kafka读取数据

    千次阅读 2020-07-26 17:24:55
    (1)Customer和Customer Group (1)两种常用的消息模型 队列模型(queuing)和发布-订阅...Kafka为这两种模型提供了单一的消费者抽象模型:消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个.

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,876
精华内容 4,750
关键字:

kafka停止消费