精华内容
下载资源
问答
  • kafka学习总结(处理多个consumer消费topic数据一次)2017年05月12日 11:53:50阅读数:11260最近遇到一个问题,由于kafka接收数据进行处理所花费的时间较长,导致kafka队列中有堆积,然后就想开启很多个consumer...
    kafka学习总结(处理多个consumer只消费topic数据一次)

    最近遇到一个问题,由于kafka接收数据进行处理所花费的时间较长,导致kafka队列中有堆积,然后就想开启很多个consumer但是不怎么会用,报了一些错误,通过一天的学习了解,终于可以让多个consumer共同消费topic中的数据了。

    写入数据和读取数据

    使用3个producer同时对一个topic写入数据,其中使用2个group组来对数据进行读取,其中topic中的partitions定为2。在每个group下又创建2个consumer进行消费数据。

    在项目刚开始,我只在topic中设置了一个partitions就是只有一个消费者来消费我传入的数据,但是由于项目的变化,消费的太慢写入的太快,给kafka带来了数据堆积,于是我又加了一个consumer来进行数据的消费,由于刚开始没有给group创建ID使用默认ID,但是发现我每个consumer消费的数据是相同的,并没有达到我的需求。kafka的一条数据会被我的2个consumer同时消费,消费2次,并没有增加效率,而且还给系统带来负担,后台查询官网API发现group中是有ID的,如果没有创建就自动使用默认ID这个一定要注意。其次是一个partition对应一个consumer,如果consumer的数量大于Topic中partition的数量就会有的consumer接不到数据(设置ID不使用默认ID的情况下)。 
    为了满足的我业务需求我做了一下调整: 
    增加topic中partition中的数量。 
    相应增加consumer的数量 consumer的数量<=partition的数量 
    这里需要强调的是不同的group组之间不做任何影响,就如同我一个group做python机器学习。另一个做Spark计算,这2个group的数据都是相互不影响的,这也是kafka很好用的东西。 
    下面java的代码如下需要对给group添加一个name:

    consumer1

    public static Properties props;
        static {
            props = new Properties();
            props.put("zookeeper.connect", "node1:2181/kafka");
            props.put("serializer.class", StringEncoder.class.getName());
            props.put("metadata.broker.list", "node1:9092");
            props.put("group.id", "group"); // group组的名字 (做group组区分)
            props.put("group.name", "1"); // 当前group组中的名字
                                            // (在相同的group组中做consumer的qufe)
        }
    
        public static void main(String[] args) throws InterruptedException {
            String topic = "zhu1";
            ConsumerConnector consumer = Consumer
                    .createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // 取哪一个topic 取几条数据
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                    .createMessageStreams(topicCountMap);
            final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(
                    topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                String item = new String(iterator.next().message());
                // String msg;
                // try {
                // msg = new String(item.getBytes("gbk"),"utf-8");
                System.out.println("收到消息:" + item);
                Thread.sleep(2000);
                // } catch (UnsupportedEncodingException e) {
                // // TODO Auto-generated catch block
                // e.printStackTrace();
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    consumer2:

    public static Properties props;
        static {
            props = new Properties();
            props.put("zookeeper.connect", "node1:2181/kafka");
            props.put("serializer.class", StringEncoder.class.getName());
            props.put("metadata.broker.list", "node1:9092");
            props.put("group.id", "group"); // group组的名字 (做group组区分)
            props.put("group.name", "2"); // 当前group组中的名字
                                            // (在相同的group组中做consumer的qufe)
        }
    
        public static void main(String[] args) throws InterruptedException {
            String topic = "zhu1";
            ConsumerConnector consumer = Consumer
                    .createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // 取哪一个topic 取几条数据
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                    .createMessageStreams(topicCountMap);
            final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(
                    topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                String item = new String(iterator.next().message());
                // String msg;
                // try {
                // msg = new String(item.getBytes("gbk"),"utf-8");
                System.out.println("收到消息:" + item);
                Thread.sleep(2000);
                // } catch (UnsupportedEncodingException e) {
                // // TODO Auto-generated catch block
                // e.printStackTrace();
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    这里我的producer写入是使用python写的:

        topic = client.topics["zhu1"]
        producer = topic.get_sync_producer()
        count = 0
        print "2"
        while(1):
            producer.produce("test test"+str(count))
            time.sleep(2)
            count+=1
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    结果如下图所示: 
    2个consumer的结果

    到了这里已经实现读入kafka多对多的消费了 ,第一次写博客希望对大家有帮助! 
    但是在python中出现了一些问题,希望有大神可以解决。 
    在python中我是使用pykafka进行kafka调用的我设置好group的id后貌似并没有起作用,2个consumer读到的内容都是相同的内容,但是2个consumer在同一个group中,所以很是纠结。希望后续可以进行解决。

    展开全文
  • 注意点:一个group中的consumer数 需小于 topic中的partitions分区数(一个partitions只能个consumer进行消费;一个consumer可以消费多个partitions) kafka命令: 查看topic详情:./bin/kafka-topics.sh --...

    需求:一个topic消息队列设置3个partitions分区,设置一个group组中3个consumer进行消费
    注意点:一个group中的consumer数 需小于 topic中的partitions分区数(一个partitions只能被一个consumer进行消费;一个consumer可以消费多个partitions)
    kafka命令:
             查看topic详情:./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

             修改topic分区:./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 5

    展开全文
  • kafka多个consumer消费topic数据一次

    万次阅读 2018-01-05 15:47:36
    最近遇到一个问题,由于kafka接收数据进行处理所花费的时间较长,导致kafka队列中有堆积,然后就想开启很多个consumer但是不怎么会用,报了一些错误,通过一天的学习了解,终于可以让多个consumer共同消费topic中的...

    最近遇到一个问题,由于kafka接收数据进行处理所花费的时间较长,导致kafka队列中有堆积,然后就想开启很多个consumer但是不怎么会用,报了一些错误,通过一天的学习了解,终于可以让多个consumer共同消费topic中的数据了


    使用3个producer同时对一个topic写入数据,其中使用2个group组来对数据进行读取,其中topic中的partitions定为2。在每个group下又创建2个consumer进行消费数据。

    在项目刚开始,我只在topic中设置了一个partitions就是只有一个消费者来消费我传入的数据,但是由于项目的变化,消费的太慢写入的太快,给kafka带来了数据堆积,于是我又加了一个consumer来进行数据的消费,由于刚开始没有给group创建ID使用默认ID,但是发现我每个consumer消费的数据是相同的,并没有达到我的需求。kafka的一条数据会被我的2个consumer同时消费,消费2次,并没有增加效率,而且还给系统带来负担,后台查询官网API发现group中是有ID的,如果没有创建就自动使用默认ID这个一定要注意。其次是一个partition对应一个consumer,如果consumer的数量大于Topic中partition的数量就会有的consumer接不到数据(设置ID不使用默认ID的情况下)。
    为了满足的我业务需求我做了一下调整:
    增加topic中partition中的数量。
    相应增加consumer的数量 consumer的数量<=partition的数量
    这里需要强调的是不同的group组之间不做任何影响,就如同我一个group做python机器学习。另一个做Spark计算,这2个group的数据都是相互不影响的,这也是kafka很好用的东西。
    下面java的代码如下需要对给group添加一个name:

    consumer1

    public static Properties props;
        static {
            props = new Properties();
            props.put("zookeeper.connect", "node1:2181/kafka");
            props.put("serializer.class", StringEncoder.class.getName());
            props.put("metadata.broker.list", "node1:9092");
            props.put("group.id", "group"); // group组的名字 (做group组区分)
            props.put("group.name", "1"); // 当前group组中的名字
                                            // (在相同的group组中做consumer的qufe)
        }
    
        public static void main(String[] args) throws InterruptedException {
            String topic = "zhu1";
            ConsumerConnector consumer = Consumer
                    .createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // 取哪一个topic 取几条数据
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                    .createMessageStreams(topicCountMap);
            final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(
                    topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                String item = new String(iterator.next().message());
                // String msg;
                // try {
                // msg = new String(item.getBytes("gbk"),"utf-8");
                System.out.println("收到消息:" + item);
                Thread.sleep(2000);
                // } catch (UnsupportedEncodingException e) {
                // // TODO Auto-generated catch block
                // e.printStackTrace();


    consumer2:

    public static Properties props;
        static {
            props = new Properties();
            props.put("zookeeper.connect", "node1:2181/kafka");
            props.put("serializer.class", StringEncoder.class.getName());
            props.put("metadata.broker.list", "node1:9092");
            props.put("group.id", "group"); // group组的名字 (做group组区分)
            props.put("group.name", "2"); // 当前group组中的名字
                                            // (在相同的group组中做consumer的qufe)
        }
    
        public static void main(String[] args) throws InterruptedException {
            String topic = "zhu1";
            ConsumerConnector consumer = Consumer
                    .createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1); // 取哪一个topic 取几条数据
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
                    .createMessageStreams(topicCountMap);
            final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(
                    topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                String item = new String(iterator.next().message());
                // String msg;
                // try {
                // msg = new String(item.getBytes("gbk"),"utf-8");
                System.out.println("收到消息:" + item);
                Thread.sleep(2000);
                // } catch (UnsupportedEncodingException e) {
                // // TODO Auto-generated catch block
                // e.printStackTrace();
    这里我的producer写入是使用python写的:

    topic = client.topics["zhu1"]
        producer = topic.get_sync_producer()
        count = 0
        print "2"
        while(1):
            producer.produce("test test"+str(count))
            time.sleep(2)
            count+=1
    结果如下图所示:


    到了这里已经实现读入kafka多对多的消费了 ,第一次写博客希望对大家有帮助!
    但是在python中出现了一些问题,希望有大神可以解决。
    在python中我是使用pykafka进行kafka调用的我设置好group的id后貌似并没有起作用,2个consumer读到的内容都是相同的内容,但是2个consumer在同一个group中,所以很是纠结。希望后续可以进行解决。

    转载:http://blog.csdn.net/zhaishujie/article/details/71713794


    关于消费,还可以参考:https://www.cnblogs.com/huxi2b/p/4757098.html

    http://blog.csdn.net/qq_20641565/article/details/59746101

    http://blog.csdn.net/qq_20641565/article/details/60810174

    http://blog.csdn.net/qq_20641565/article/details/64440425



    展开全文
  • https://www.oschina.net/question/2558468_2145935
    展开全文
  • 只会有一个consumer能够读取到数据, 其它的consumer是无法消费到数据的. 谁要是知道怎么做,恳请告知!
  • public class kafkaConsumer extends Thread {private String topic;public kafkaConsumer() {super();}@Overridepublic void run() {List topicList = Arrays.asList("topic1","topic2","topic3","topic4","topic5...
  • Kafka是一个推送的消息框架,支持java、python、c/c++等待。本次与Kafka接触了一段时间,...一个主题可以分为多个分区,其封装的c++库已经能够按顺序从不同的分区中将数据取出来了,所以还是相对方便的。  我只写...
  • 将这个主题的消息分发给两个(或者多个消费消费,(不能消费相同的消息) 1.图解 2.关键注解@kafkaListener @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @...
  • rocketmq建议一个服务对应一个topic,但是一个服务下会有多个不同的业务消息,同时rocketmq建议不同的业务消息对应不同的tag,当SpringBoot整合RocketMQ时,设置多个消费者发生报错 问题复现 RocketMQ创建多个消费者...
  • kafka消费多个topic配置

    万次阅读 2019-04-30 14:01:42
    我们的项目是springboot集成kafka的,配置文件在application.yml里如下: ... consumer: bootstrap: maxPollRecords: 1000 enableAutoCommit: true autoCommitIntervalMs: 100 sessionTimeoutMs: ...
  • 1 同一订阅组内不同Consumer实例订阅不同topic消费混乱问题调查 图1: 背景说明: 如图1左半部分,假设目前的关系如下: broker: 两,broker_a和broker_b topic:两topic1和topic2,每个topic在每...
  • 最近做的项目的一版本... 由于刚刚从事JAVA开发这行业不久,所以在使用到这MQ的时候本人还是比较陌生的,于是花了一些时间去学习,当然学习的时候能记住的东西还是不的,因为在学习理论的时候,没有真正的...
  • 我使用springboot kafka 使用同一个消费组内配置多个消费者(因此会有多个@KafkaListener监听器),监听多个topic下的指定分区,如图所示,是这样配置吧?但使用@TopicPartition 时报错!提示:TopicPartition ...
  • 很多时候,我们会在多个地方同时订阅一个 topic,但是发现消费者只能执行一个 后注册消费者会顶替之前注册的消费者 原因 在 subscribeTable和 subscriptionInner方法中,是使用 Map 集合的方式存储topic订阅者 ...
  •  kafka消费者有一熟悉groupId 就是一个topic中的消息只能同一groupId的消费者中的一个消费消费。 这groupId,在配置消费者时指定。 但是问题来了,怎么实现让一个topic可以让不同group消费呢。 这为也...
  • Flink实时消费多个topic数据

    千次阅读 2021-06-02 22:17:48
    // 启动executor,执行任务 env.execute("Socket stream word count") } } 开启两Kafka生产者,分别是test1、test2两个topic,输入测试数据: 可以看到数据都打印在控制台了: 喜欢的小伙伴点关注吧~~~
  • Kafka Consumertopic关系

    2021-01-24 12:40:59
    本质上kafka只支持Topic; 每个group中可以有多个consumer,每个consumer属于一个consumer group;... 对于Topic中的一条特定的消息,只会订阅此Topic的每个group中的其中一个consumer消费,此消
  • 最近很多人问我,sparkstreaming怎么消费多个topic的数据,自己维护offest,其实这个跟消费一个topic是一样的,但还是有很多问我,今天就简单的写一个demo,供大家参考,直接上代码吧,已经测试过了.我把offest存到redis里了...
  • 说真,这问题看上去很简单,但“得益”与kafka-python...直接上代码吧from kafka import SimpleClient, KafkaConsumerfrom kafka.common import OffsetRequestPayload, TopicPartitiondef get_topic_offset(broker...
  • 今天开发了一将很长的mq消息分段发送的功能,自测一下,发现测试环境老是有部分消息没有被消费。看了下后天的消费者,一订阅组里面有两个消费者,但是topic不是我订阅的topic,应该是另外一个消费者的topic。 ...
  • /** * Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 */ //consumer.setStartFromGroupOffsets(); final SingleOutputStreamOperator> source = env.addSource(consumer).flatMap( ...
  • 关于同一进程配置多个groupId消费同一个Topic的问题   同一个进程中配置多个groupId消费同一个topic,期望的结果是都可以消费到这个topic,而实际上,只会有一个groupId能消费这个topic。所以这个groupId的配置也...
  • 这个问题是最近一个朋友问我的,用sparkstreaming消费kafka的多个topic,怎么获取topic的信息,然后根据不同topic的数据做不同的逻辑处理.其实这个问题非常简单,...
  • 在一服务里起了两个消费者,并且这两个消费者属于同一组,但是这两个消费者所订阅的Topic是不相同的,出现了只有一个消费者在消费消息,另外一消费消息 分析 经过走查broker端代码发现如下代码: 重点...
  • kafka生产者与线程消费者demo producer生产者代码 package com.cg.kafka; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer....
  • 需要依赖的jar包,包括但不限于 <!-- spring核心包 --> <groupId>org.springframework <artifactId>spring-core ${spring.version} <groupId>org.springframework <artifactId>spring-web ...消费...
  • 一、kafkaConsumerClient代码 import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 51,144
精华内容 20,457
关键字:

topic被多个consumer消费