  • Kafka multi-threaded production and consumption


    1. Kafka producer

    Since version 0.9 the Kafka client has been implemented in Java. The producer, KafkaProducer, is a thread-safe object, so we recommend using it as a singleton, with multiple threads sharing one instance.

    package com.kafka.singleton;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.Random;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class KafkaProducerSingleton {

        private static final Logger LOGGER = LoggerFactory
                .getLogger(KafkaProducerSingleton.class);

        private static KafkaProducer<String, String> kafkaProducer;

        private Random random = new Random();

        private String topic;

        private int retry;

        private KafkaProducerSingleton() {
        }

        /**
         * Static inner holder class for lazy initialization.
         *
         * @author tanjie
         */
        private static class LazyHandler {

            private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
        }

        /**
         * Singleton accessor; KafkaProducer is thread-safe, so one instance
         * can be shared by multiple threads.
         */
        public static final KafkaProducerSingleton getInstance() {
            return LazyHandler.instance;
        }

        /**
         * Initialize the Kafka producer from kafka.properties on the classpath.
         */
        public void init(String topic, int retry) {
            this.topic = topic;
            this.retry = retry;
            if (null == kafkaProducer) {
                Properties props = new Properties();
                InputStream inStream = null;
                try {
                    inStream = this.getClass().getClassLoader()
                            .getResourceAsStream("kafka.properties");
                    props.load(inStream);
                    kafkaProducer = new KafkaProducer<String, String>(props);
                } catch (IOException e) {
                    LOGGER.error("Failed to initialize kafkaProducer: " + e.getMessage(), e);
                } finally {
                    if (null != inStream) {
                        try {
                            inStream.close();
                        } catch (IOException e) {
                            LOGGER.error("Failed to close kafka.properties stream: " + e.getMessage(), e);
                        }
                    }
                }
            }
        }

        /**
         * Send a message through kafkaProducer.
         *
         * @param message the message payload
         */
        public void sendKafkaMessage(final String message) {
            /**
             * Partitioning rules:
             * 1. If a partition is specified, the message goes only to that partition.
             * 2. If both a partition and a key are specified, the message still goes to
             *    the specified partition and the key is ignored.
             * 3. If neither a partition nor a key is specified, messages are spread
             *    across the topic's partitions.
             * 4. If only a key is specified, the partition is chosen by hash(key).
             */
            // random.nextInt(3) assumes the topic has 3 partitions
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    topic, random.nextInt(3), "", message);
            // send() is asynchronous: it appends the record to a buffer and returns
            // immediately, which lets the producer batch records for efficiency.
            // KafkaProducer is thread-safe, so a single instance can send messages.
            kafkaProducer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata,
                        Exception exception) {
                    if (null != exception) {
                        LOGGER.error("Failed to send Kafka message: " + exception.getMessage(),
                                exception);
                        retryKafkaMessage(message);
                    }
                }
            });
        }

        /**
         * Retry after a failed send, at most {@code retry} times.
         */
        private void retryKafkaMessage(final String retryMessage) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    topic, random.nextInt(3), "", retryMessage);
            for (int i = 1; i <= retry; i++) {
                try {
                    kafkaProducer.send(record);
                    return;
                } catch (Exception e) {
                    // just log and let the loop retry; a recursive call here would
                    // retry far more than `retry` times
                    LOGGER.error("Failed to send Kafka message: " + e.getMessage(), e);
                }
            }
        }

        /**
         * Close the producer.
         */
        public void close() {
            if (null != kafkaProducer) {
                kafkaProducer.close();
            }
        }

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public int getRetry() {
            return retry;
        }

        public void setRetry(int retry) {
            this.retry = retry;
        }

    }

       HandlerProducer

      

    package com.kafka.singleton;

    public class HandlerProducer implements Runnable {

        private String message;

        public HandlerProducer(String message) {
            this.message = message;
        }

        @Override
        public void run() {
            KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
                    .getInstance();
            kafkaProducerSingleton.init("test_find", 3);
            System.out.println("Current thread: " + Thread.currentThread().getName()
                    + ", Kafka singleton instance: " + kafkaProducerSingleton);
            kafkaProducerSingleton.sendKafkaMessage("Sent message" + message);
        }

    }

        kafka.properties

    bootstrap.servers=master:9092,slave1:9092,slave2:9092
    # leader-only acknowledgement
    acks=1
    # client-level retries disabled; retries are handled manually in the code above
    retries=0
    # batch size in bytes
    batch.size=1000
    compression.type=gzip
    #buffer.memory=33554432
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

      2. Kafka consumer

    package com.kafka.consumer;

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public final class Kafka_Consumer {

        /**
         * The Kafka consumer is NOT thread-safe.
         */
        private final KafkaConsumer<String, String> consumer;

        private ExecutorService executorService;

        public Kafka_Consumer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "ip:port"); // placeholder address
            props.put("group.id", "group");
            // disable auto-commit; offsets are committed manually after processing
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test_find"));
        }

        public void execute() {
            executorService = Executors.newFixedThreadPool(3);
            while (true) {
                // poll() never returns null; check for an empty batch instead
                ConsumerRecords<String, String> records = consumer.poll(10);
                if (!records.isEmpty()) {
                    // caveat: the workers call commitSync() on this same consumer
                    // while this thread keeps polling, which KafkaConsumer does not
                    // allow (it can throw ConcurrentModificationException); see the
                    // discussion of KafkaConsumer thread-safety later in this page
                    executorService.submit(new ConsumerThread(records, consumer));
                }
            }
        }

        public void shutdown() {
            try {
                if (consumer != null) {
                    consumer.close();
                }
                if (executorService != null) {
                    executorService.shutdown();
                    if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("Timeout");
                    }
                }
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }

    }

        ConsumerThread

    package com.kafka.consumer;

    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    /**
     * Multiple worker threads consume concurrently, so per-partition ordering
     * is hard to guarantee.
     *
     * @author tanjie
     */
    public final class ConsumerThread implements Runnable {

        private ConsumerRecords<String, String> records;

        private KafkaConsumer<String, String> consumer;

        public ConsumerThread(ConsumerRecords<String, String> records,
                KafkaConsumer<String, String> consumer) {
            this.records = records;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records
                        .records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println("Current thread: " + Thread.currentThread() + ", "
                            + "offset: " + record.offset() + ", " + "topic: "
                            + record.topic() + ", " + "partition: " + record.partition()
                            + ", " + "message: " + record.value());
                }
                // commit the consumed offset manually, only after the messages
                // have actually been processed
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1)
                        .offset();
                consumer.commitSync(Collections.singletonMap(partition,
                        new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

   Main method

    public static void main(String[] args) {
        Kafka_Consumer kafka_Consumer = new Kafka_Consumer();
        try {
            // note: execute() loops forever, so as written the sleep and the
            // finally block are never reached
            kafka_Consumer.execute();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_Consumer.shutdown();
        }
    }

     3. Running the example

         Start the consumer first, then the producer. The output looks like this.

         Consumer:

    Current thread: Thread[pool-1-thread-1,5,main], offset: 44, topic: test_find, partition: 1, message: Sent message:1
    Current thread: Thread[pool-1-thread-2,5,main], offset: 45, topic: test_find, partition: 1, message: Sent message:2
    Current thread: Thread[pool-1-thread-1,5,main], offset: 46, topic: test_find, partition: 1, message: Sent message:3
    Current thread: Thread[pool-1-thread-1,5,main], offset: 39, topic: test_find, partition: 0, message: Sent message:4
    Current thread: Thread[pool-1-thread-2,5,main], offset: 47, topic: test_find, partition: 1, message: Sent message:5
    Current thread: Thread[pool-1-thread-3,5,main], offset: 40, topic: test_find, partition: 0, message: Sent message:6
    Current thread: Thread[pool-1-thread-2,5,main], offset: 37, topic: test_find, partition: 2, message: Sent message:7
    Current thread: Thread[pool-1-thread-2,5,main], offset: 38, topic: test_find, partition: 2, message: Sent message:8
    Current thread: Thread[pool-1-thread-1,5,main], offset: 48, topic: test_find, partition: 1, message: Sent message:9
    Current thread: Thread[pool-1-thread-2,5,main], offset: 39, topic: test_find, partition: 2, message: Sent message:10

        Producer (test driver):

      

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    import com.kafka.singleton.HandlerProducer;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = { "classpath:applicationContext.xml" })
    public class Kafka生产_多线程单实例 {

        @Test
        public void testSendMessageSingleton() throws InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            for (int i = 1; i <= 10; i++) {
                Thread.sleep(1000);
                executor.submit(new HandlerProducer(":" + i));
            }
        }

    }

       

    Current thread: pool-1-thread-1, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-2, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-3, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-1, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-2, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-3, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-1, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-2, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-3, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475
    Current thread: pool-1-thread-1, Kafka singleton instance: com.kafka.singleton.KafkaProducerSingleton@15eb475

     

    Copyright notice: this is the blogger's original article; reproduction without the blogger's permission is prohibited. https://blog.csdn.net/charry_a/article/details/79621324
  • The right way to handle multi-threaded Kafka consumption


    In a recent project we used Kafka as a load-distributing relay between modules, to receive messages from different product lines in real time and consume them in a distributed, near-real-time fashion. Using Kafka as the inter-module relay brings the usual MQ benefits: asynchrony, decoupling, peak shaving, and so on. More importantly, it enables horizontal scaling: when a downstream system hits a performance bottleneck, spinning up a few more instances on the container platform quickly raises consumption capacity, with no change to the overall architecture. In theory, however large our data volume grows, the architecture stays highly available. Along the way we also ran into some problems:

    1. Messages gradually piled up; consumption could not keep pace.

    2. A consumer instance died for some abnormal reason, losing a small amount of data.

    For the backlog problem, studying how Kafka multi-threaded consumption works solved it. So understanding the multi-threaded Consumer model matters a great deal for handling multi-threaded Kafka consumption correctly.

    Kafka multi-threaded consumption patterns

    Before discussing the patterns, let's look at Kafka's own thread model and the cause of ConcurrentModificationException. From the official documentation:

    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

    The ConcurrentModificationException comes from the following code:

      /**
         * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
         * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
         * supported).
         * @throws IllegalStateException if the consumer has been closed
         * @throws ConcurrentModificationException if another thread already has the lock
         */
        private void acquire() {
            ensureNotClosed();
            long threadId = Thread.currentThread().getId();
            if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
            refcount.incrementAndGet();
        }

    The acquire method is called as the first statement of most public KafkaConsumer methods, to detect whether the same KafkaConsumer is *currently* being called from multiple threads.

    What does "currently" mean? A look at KafkaConsumer's commitAsync makes it clear.

     @Override
        public void commitAsync(OffsetCommitCallback callback) {
            acquire(); // take the reference
            try {
                commitAsync(subscriptions.allConsumed(), callback);
            } finally {
                release(); // drop the reference
            }
        }

    KafkaConsumer's release method simply drops the reference held by the thread currently operating on the instance.

     /**
         * Release the light lock protecting the consumer from multi-threaded access.
         */
        private void release() {
            if (refcount.decrementAndGet() == 0)
                currentThread.set(NO_CURRENT_THREAD);
        }

    From the code above we can distill the key rule for multi-threaded Kafka: a KafkaConsumer instance must only ever be operated by one thread.

    Below are the two multi-threaded models I believe Kafka supports. Since only a single thread may touch the KafkaConsumer instance, multi-threading can only be applied to processing the polled ConsumerRecord list. The patterns are as follows.

    • Pattern one: one Consumer per thread; at most as many threads can consume the topic concurrently as the topic has partitions (a minimal sketch follows after this list).

    • Pattern two: one Consumer plus multiple processing threads; a single thread operates the KafkaConsumer, while the other threads consume the ConsumerRecord list.

    Note: pattern two can also run multiple Consumers, up to the total partition count. The only difference between the patterns is that in pattern two a single Consumer feeds multiple processing threads, whereas in pattern one each Consumer processes on a single thread.
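    A minimal sketch of pattern one, with each thread owning its own private KafkaConsumer so that no instance is ever shared (the class name, topic name, and properties are placeholders invented for illustration):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerLoop implements Runnable {

        private final Properties props;

        public ConsumerLoop(Properties props) {
            this.props = props;
        }

        @Override
        public void run() {
            // the consumer is created, used, and closed entirely on this thread,
            // so the single-thread rule above is never violated
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("some-topic"));
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        // process the record on this same thread
                    }
                }
            }
        }
    }

    // Start at most as many loops as the topic has partitions:
    // for (int i = 0; i < partitionCount; i++) new Thread(new ConsumerLoop(props)).start();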

    Comparing the two patterns (the original comparison table is not preserved; paraphrasing the "Multi-threaded Processing" notes in the Kafka javadoc): pattern one is the easiest to implement, needs no inter-thread coordination, and keeps per-partition ordering trivially, but its thread count is capped by the partition count and each consumer adds a connection to the brokers. Pattern two scales processing threads independently of partitions, but makes ordering across processors and manual offset commits harder.

    Implementing the multi-threaded consumption patterns

    Both patterns are implemented here on top of spring-kafka; standing on the shoulders of giants means seeing further and working less overtime. What follows is the concrete implementation of pattern two; pattern one is just a simplification of it.

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Value("${kafka.bootstrap-servers}")
        private String servers;
    
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch-size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
    
        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${kafka.consumer.group.id}")
        private String groupId;
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
    
        @Value("${msg.consumer.max.poll.records}")
        private int maxPollRecords;
    
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        public ProducerFactory producerFactory() {
            return new DefaultKafkaProducerFactory(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate kafkaTemplate() {
            return new KafkaTemplate(producerFactory());
        }
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setBatchListener(true);
            // The concurrency set here is the number of Consumers; it can be 1 up
            // to the total partition count, but the summed concurrency across all
            // machine instances must not exceed the partition count. If the total
            // is lower, some Consumer instance will consume more than one partition.
            factory.setConcurrency(2);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    
        public ConsumerFactory<String, Object> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            return propsMap;
        }
    
    }
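    For reference, the @Value placeholders above (including the thread-pool and topic keys used by BaseConsumer below) imply an external configuration roughly like the following; all values here are illustrative assumptions, not the author's actual settings:

    kafka.bootstrap-servers=localhost:9092
    kafka.producer.retries=0
    kafka.producer.batch-size=16384
    kafka.producer.linger=1
    kafka.consumer.enable.auto.commit=false
    kafka.consumer.session.timeout=30000
    kafka.consumer.group.id=demo-group
    kafka.consumer.auto.offset.reset=latest
    msg.consumer.max.poll.records=100
    msg.consumer.topic=demo-topic
    kafka.consumer.thread.min=4
    kafka.consumer.thread.max=4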

    The business-specific code lives in BaseConsumer:

    public abstract class BaseConsumer implements ApplicationListener<ConsumerStoppedEvent> {
    
        private static final Logger LOG = LoggerFactory.getLogger(BaseConsumer.class);
    
        @Value("${kafka.consumer.thread.min}")
        private int consumerThreadMin;
    
        @Value("${kafka.consumer.thread.max}")
        private int consumerThreadMax;
    
        private ThreadPoolExecutor consumeExecutor;
    
        private volatile boolean isClosePoolExecutor = false;
    
        @PostConstruct
        public void init() {
    
            this.consumeExecutor = new ThreadPoolExecutor(
                    getConsumeThreadMin(),
                    getConsumeThreadMax(),
                    // Different min and max here mean little: the pool only grows
                    // past the core size once the work queue is full, and an
                    // unbounded LinkedBlockingQueue never fills up. The guiding
                    // principle is to keep each poll batch small enough to stay
                    // manageable; otherwise, if the service instance suddenly dies,
                    // everything pulled but unacknowledged is consumed again.
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());
        }
    
        /**
         * Invoked when spring-kafka signals that this Consumer is stopping.
         * @param event the Consumer-stopped event
         */
        @Override
        public void onApplicationEvent(ConsumerStoppedEvent event) {
    
            isClosePoolExecutor = true;
            closeConsumeExecutorService();
    
        }
    
        private void closeConsumeExecutorService() {
    
            if (!consumeExecutor.isShutdown()) {
    
                ThreadUtil.shutdownGracefully(consumeExecutor, 120, TimeUnit.SECONDS);
                LOG.info("consumeExecutor stopped");
    
            }
    
        }
    
        @PreDestroy
        public void doClose() {
            if (!isClosePoolExecutor) {
                closeConsumeExecutorService();
            }
        }
    
        @KafkaListener(topics = "${msg.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
        public void onMessage(List<String> msgList, Acknowledgment ack) {
    
            CountDownLatch countDownLatch = new CountDownLatch(msgList.size());
    
            for (String message : msgList) {
                submitConsumeTask(message, countDownLatch);
            }
    
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                LOG.error("countDownLatch exception ", e);
            }
    
        // this batch is fully processed; commit the offset manually
            ack.acknowledge();
            LOG.info("finish commit offset");
    
        }
    
        private void submitConsumeTask(String message, CountDownLatch countDownLatch) {
            consumeExecutor.submit(() -> {
                try {
                    onDealMessage(message);
                } catch (Exception ex) {
                    LOG.error("on DealMessage exception:", ex);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
    
        /**
         * Subclasses implement this with the business logic for one message.
         * @param message the Kafka message
         */
        protected abstract void onDealMessage(String message);
    
        private int getConsumeThreadMax() {
            return consumerThreadMax;
        }
    
        private int getConsumeThreadMin() {
            return consumerThreadMin;
        }
    
        public void setConsumerThreadMax(int consumerThreadMax) {
            this.consumerThreadMax = consumerThreadMax;
        }
    
        public void setConsumerThreadMin(int consumerThreadMin) {
            this.consumerThreadMin = consumerThreadMin;
        }
    }
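    BaseConsumer leaves onDealMessage abstract, so a concrete consumer only supplies business logic. A minimal hypothetical subclass (class name and body invented for illustration):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    @Component
    public class DemoConsumer extends BaseConsumer {

        private static final Logger LOG = LoggerFactory.getLogger(DemoConsumer.class);

        @Override
        protected void onDealMessage(String message) {
            // runs on the consumeExecutor pool, one task per message
            LOG.info("handling message: {}", message);
        }
    }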

    The closeConsumeExecutorService method exists so that, when a service instance exits abnormally or is killed during a multi-datacenter rollout, the records already pulled down are consumed as far as possible. Finally, here is the shutdownGracefully implementation; RocketMQ's source does this well, so it is borrowed here with thanks.

      public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
            // Disable new tasks from being submitted.
            executor.shutdown();
            try {
                // Wait a while for existing tasks to terminate.
                if (!executor.awaitTermination(timeout, timeUnit)) {
                    executor.shutdownNow();
                    // Wait a while for tasks to respond to being cancelled.
                    if (!executor.awaitTermination(timeout, timeUnit)) {
                        LOGGER.warn(String.format("%s didn't terminate!", executor));
                    }
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted.
                executor.shutdownNow();
                // Preserve interrupt status.
                Thread.currentThread().interrupt();
            }
        }

    Back to the second problem: how to avoid losing a small amount of data when a consumer instance dies. With the approach above, an instance (Docker container or physical machine) dying no longer loses data, because we pull first, process, and only then manually commit the offset to Kafka. Even in the worst case, where closeConsumeExecutorService runs during shutdown before all pulled data has been processed, the offset has not been committed, so that data is not lost either: it is pulled and consumed again once the instance recovers.

    The code above still has one tiny wrinkle: during a dual-datacenter switchover, an instance may leave part of a batch unconsumed, and those messages will be consumed again next time. We handle this at the business level with a flag in the configuration center: flipping the flag stops the chosen datacenter from pulling Kafka messages, so we can then operate safely without worrying about duplicate consumption.

    These are some notes from my own experience with Kafka. Omissions are inevitable; corrections are welcome, and every day is a chance to improve.

     

  • Several ways to consume Kafka with multiple threads


    Kafka API: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

    A Zhihu answer on multi-threaded consumption: https://www.zhihu.com/question/57483708/answer/153185829

    1. New high-level API: one consumer per thread.

    import com.atguigu.datacosumer.util.PropertyUtil;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @author wade
     * @create 2019-03-20 12:26
     */
    public class MyTestThreads {
        public static void main(String[] args) {
            /*
            Sample output:
            a consumed ddd, partition 0, offset 15
            b consumed eee, partition 2, offset 17
            a consumed wade, partition 1, offset 17
            a consumed pual, partition 0, offset 16
             */
    
            // KafkaConsumer is not thread-safe; sharing one instance across threads throws
            // ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
            new Thread(new MyConsumerThread(),"a").start();
    
            new Thread(new MyConsumerThread(),"b").start();
    
    
        }
    }
    
    class MyConsumerThread implements Runnable {
        KafkaConsumer<String, String> consumer;
        public  MyConsumerThread(){
            Properties properties = PropertyUtil.properties;
           consumer = new KafkaConsumer<>(properties);
    
        }
        @Override
        public void run() {
            consumer.subscribe(Arrays.asList("dai"));
            while(true){
                ConsumerRecords<String,String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                System.out.println(Thread.currentThread().getName() + " consumed " + record.value() + ", partition " + record.partition() + ", offset " + record.offset());
                }
    
            }
    
        }
        //./kafka-consumer-groups.sh --bootstrap-server hadoop103:9092 --new-consumer --group g1  --describe
        /*
        TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    dai                            0          19              19              0          consumer-1-de94a73e-78ab-4097-9a4b-05a44b0efdfa   /192.168.11.1                  consumer-1
    dai                            1          20              20              0          consumer-1-de94a73e-78ab-4097-9a4b-05a44b0efdfa   /192.168.11.1                  consumer-1
    dai                            2          20              20              0          consumer-2-da47cf63-fcc8-44b1-ae9e-760dba4df284   /192.168.11.1
         */
    }
    

    2. Old high-level API: one consumer with three threads.

    import com.atguigu.datacosumer.util.PropertyUtil;
    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import org.apache.commons.lang.ObjectUtils;
    import scala.Int;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * @author wade
     * @create 2019-03-20 15:50
     */
    public class MyTestThreads2 {
    
    
        public static void main(String[] args) {
        // the old high-level consumer API, which still connects through ZooKeeper
            Properties properties = PropertyUtil.properties;
            ConsumerConfig config = new ConsumerConfig(properties);
            ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
            HashMap<String, Integer> map = new HashMap<>();
            map.put("dai",3);
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = connector.createMessageStreams(map);
    
            List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get("dai");
    
    
            for (KafkaStream<byte[], byte[]> stream : kafkaStreams) {
                new Thread(new MyThreads(stream)).start();
            }
        }
    }
    class MyThreads implements Runnable {
        KafkaStream<byte[], byte[]> stream = null ;
    
        public MyThreads (KafkaStream<byte[], byte[]> stream){
            this.stream = stream;
        }
    
        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while (iterator.hasNext()){
                MessageAndMetadata<byte[], byte[]> metadata = iterator.next();
                System.out.println(
                        Thread.currentThread().getName() + " consumed =>> " + new String(metadata.message()) +
                                ", topic => " + metadata.topic() +
                                ", partition => " + metadata.partition() +
                                ", offset => " + metadata.offset()
                );
            }
        }
    }
    //     ./kafka-consumer-groups.sh --zookeeper hadoop103:2181 --group g1  --describe
    /*
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID
    dai                            0          19              20              1          g1_DESKTOP-HGSVH26-1553070141911-a078e09d
    dai                            1          20              22              2          g1_DESKTOP-HGSVH26-1553070141911-a078e09d
    dai                            2          20              21              1          g1_DESKTOP-HGSVH26-1553070141911-a078e09d
    
     */
    

    3. New high-level API: decoupling consumption from processing.

    import com.atguigu.datacosumer.util.PropertyUtil;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.Properties;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * @author wade
     * @create 2019-03-20 18:04
     */
    public class MyThreadsTest4 {
        public static void main(String[] args) throws InterruptedException {
            KafkaConsumer<String, String> consumer;
            Properties properties = PropertyUtil.properties;
            consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Arrays.asList("dai"));
            ConsumerRecords<String, String> records ;
    
        /*
        Thread-safe: take() blocks when empty, put() blocks when full; shared
        across threads, much like a message queue.
         */
            LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue();
    
    
                new Thread(new MyThread4(list),"bb").start();
    
                new Thread(new MyThread4(list),"aa").start();
    
                while (true){
    
                    records = consumer.poll(1000);
    
                    list.put(records);
            // Consider enabling manual commit here: the default auto-commit
            // can commit offsets late, so a restart re-consumes messages,
            // and it still cannot rule out data loss.
         // consumer.commitAsync();
            }
    
    
    
        }
    }
    /**
     * Decoupling consumption from processing:
     * one or more consumer threads do all of the data consumption and hand the
     * ConsumerRecords instances to a blocking queue that a pool of processing
     * threads drains.
     * Pro: consumption and processing threads scale independently, so a single
     *      consumer can feed many processing threads, and the thread count is no
     *      longer limited by the partition count.
     *      (Without decoupling, consuming and processing happen together and,
     *      because of offset commits, consumer threads are capped by the
     *      partition count; extra threads just idle. Decoupled, only the
     *      consumer threads stay capped; processing threads are unlimited.)
     * Con: ordering is a problem; with several processing threads, data taken
     *      from the queue earlier may actually finish processing later.
     * Con: manually committing offsets becomes hard; both data loss and
     *      duplicate consumption are possible.
     *
     * 2. Decouple Consumption and Processing
     * Another alternative is to have one or more consumer threads that do all data consumption and hands off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing.
     * This option likewise has pros and cons:
     * PRO: This option allows independently scaling the number of consumers and processors.
     *      This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
     * CON: Guaranteeing order across the processors requires particular care as the threads will execute independently an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of thread execution timing.
     *      For processing that has no ordering requirements this is not a problem.
     * CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
     *      There are many possible variations on this approach. For example each processor thread can have its own queue,
     *      and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
     */
    
    class MyThread4 implements Runnable {
    
        LinkedBlockingQueue<ConsumerRecords<String, String>> list ;
    
        public MyThread4 (LinkedBlockingQueue<ConsumerRecords<String, String>> list){
            this.list = list;
        }
    
        @Override
        public void run() {
            while (true) {
                ConsumerRecords<String, String> consumerRecords;
                try {
                    consumerRecords = list.take();
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(Thread.currentThread().getName()
                            + " consumed: " + consumerRecord.value()
                            + ", partition: " + consumerRecord.partition()
                            + ", offset: " + consumerRecord.offset()
                    );
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
    
            }
        }
    }

     

  • Multi-threaded Kafka producers and consumers

    Contents

    1. Multi-threaded producer

    2. Multi-threaded consumer

    2.1 Why the Consumer needs multiple threads

    2.2 The multi-threaded Kafka Consumer model types

    2.2.1 Model one: multiple Consumers, each with its own thread

    2.2.2 Model two: one Consumer with multiple worker threads


    1. Multi-threaded producer

    Since version 0.9 the Kafka client has been implemented in Java. The producer, KafkaProducer, is a thread-safe object, so we recommend using it as a singleton, with multiple threads sharing one instance.

    Code:

    • ProducerThread
    package com.qibo.base.controller.kafkaThread;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.log4j.Logger;
    
    import com.qibo.base.controller.kafka.MQDict;
    
    public class ProducerThread implements Runnable {
    	static Logger log = Logger.getLogger(Producer.class);
    
    	private static KafkaProducer<String, String> producer = null;
    
	/*
	 * Initialize the producer
	 */
    	static {
    		Properties configs = initConfig();
    		producer = new KafkaProducer<String, String>(configs);
    	}
    
	/*
	 * Initialize the configuration
	 */
    	private static Properties initConfig() {
    		Properties props = new Properties();
    		props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
    		props.put("acks", "1");
    		props.put("retries", 0);
    		props.put("batch.size", 16384);
    		props.put("key.serializer", StringSerializer.class.getName());
    		props.put("value.serializer", StringSerializer.class.getName());
    		return props;
    	}
    
    	@Override
    	public void run() {
    		System.out.println("主线程序号:"+Thread.currentThread().getId()+" ");
    		int j = 0;
    		while (true) {
    			j++;
			// the message record
    			ProducerRecord<String, String> record = null;
    			for (int i = 0; i < 10; i++) {
    				record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "value" + i);
				// send the message asynchronously
    				producer.send(record, new Callback() {
    					@Override
    					public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    						if (null != e) {
    							log.info("send error" + e.getMessage());
    						} else {
    							System.out.println("线程序号:"+Thread.currentThread().getId()+" "+String.format("发送信息---offset:%s,partition:%s", recordMetadata.offset(),
    									recordMetadata.partition()));
    						}
    					}
    				});
    			}
    			// producer.close();
    			try {
    				
    				Thread.sleep(3000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			if (j > 5)
    				break;
    		}
    
    	}
    
    }
    
    • Invocation

    	@RequestMapping("/sendThread")
    	public void sendThread() {
    		ExecutorService runnableService = Executors.newFixedThreadPool(3);
    		runnableService.submit(new ProducerThread());
    		runnableService.submit(new ProducerThread());
    		runnableService.submit(new ProducerThread());
    		runnableService.shutdown();
    	}

    Result:

    Three threads run the task, but KafkaProducer is thread-safe, so they all share the single static instance. (Console screenshot not preserved.)

    With multiple partitions, the messages are spread across different partitions. (Screenshot not preserved.)

    2. Multi-threaded consumer

    2.1 Why the Consumer needs multiple threads

    Suppose we are building a notification module that lets users subscribe to notifications/messages sent by other users, built on Apache Kafka: publishers write messages into the Kafka cluster through the Producer API, and subscribers read them through a Consumer. The initial architecture looks like this. (Architecture diagram not preserved.)

    As the user count grows, so does the notification volume, until at some threshold Producers generate messages faster than Consumers can consume them, and unconsumed messages pile up in the brokers. Kafka persists unconsumed messages well, but its retention mechanisms (retention time, partition size, message key) will eventually delete messages that are never consumed. And from a business standpoint, a high-latency notification system is close to worthless. So a multi-threaded Consumer model is essential.

    Retention settings:

    # delete messages older than the configured number of hours
    log.retention.hours=16
    # delete old messages once the log exceeds the configured size
    log.retention.bytes=1073741824

    2.2 The multi-threaded Kafka Consumer model types

    There are two types of Consumer-based multi-threaded models:

    Model one: multiple Consumers, each with its own thread. (Architecture diagram not preserved.)

    Model two: one Consumer with multiple worker threads. (Architecture diagram not preserved.)

    (The pros-and-cons comparison table of the two approaches is not preserved.)

    The default partition count for new topics can be set with num.partitions in $KAFKA_HOME/config/server.properties; it can also be given as a parameter when creating a topic, or changed after creation with the tools Kafka provides, as sketched below.
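    As a sketch of the programmatic route, assuming Kafka 0.11+ where the Java AdminClient is available (broker address and topic name are placeholders):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class TopicAdmin {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            try (AdminClient admin = AdminClient.create(props)) {
                // create a topic with 3 partitions and replication factor 1
                admin.createTopics(Collections.singleton(new NewTopic("demo-topic", 3, (short) 1)))
                     .all().get();
            }
        }
    }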

    2.2.1 Model one: multiple Consumers, each with its own thread

    • ConsumerThread
    
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    
    import com.qibo.base.controller.kafka.Consumer;
    import com.qibo.base.controller.kafka.MQDict;
    
    public class ConsumerThread implements Runnable {
    
    	static Logger log = Logger.getLogger(Consumer.class);
    
	// each ConsumerThread must own its own KafkaConsumer: the class is not
	// thread-safe, and a static instance would be shared by all three tasks
	private final KafkaConsumer<String, String> consumer;
    
	/**
	 * Initialize the consumer (one per instance, matching model one)
	 */
	public ConsumerThread() {
		Properties configs = initConfig();
		consumer = new KafkaConsumer<String, String>(configs);
		consumer.subscribe(Arrays.asList(MQDict.CONSUMER_TOPIC));
	}
    
	/**
	 * Initialize the configuration
	 */
    	private static Properties initConfig() {
    		Properties props = new Properties();
    		props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
    		props.put("group.id", MQDict.CONSUMER_GROUP_ID);
    		props.put("enable.auto.commit", MQDict.CONSUMER_ENABLE_AUTO_COMMIT);
    		props.put("auto.commit.interval.ms", MQDict.CONSUMER_AUTO_COMMIT_INTERVAL_MS);
    		props.put("session.timeout.ms", MQDict.CONSUMER_SESSION_TIMEOUT_MS);
    		props.put("max.poll.records", MQDict.CONSUMER_MAX_POLL_RECORDS);
    		props.put("auto.offset.reset", "earliest");
    		props.put("key.deserializer", StringDeserializer.class.getName());
    		props.put("value.deserializer", StringDeserializer.class.getName());
    		return props;
    	}
    
    	@Override
    	public void run() {
    		System.out.println("主线程序号:"+Thread.currentThread().getId()+" ");
    
    //		int i = 1 ;
    		while (true) {
    			ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT); 
    			records.forEach((ConsumerRecord<String, String> record) -> {
    				
    				log.info("线程序号:"+Thread.currentThread().getId()+" partition:"+record.partition()+" 收到消息: key ===" + record.key() + " value ====" + record.value() + " topic ==="
    						+ record.topic());
    			});
//			 i++;
//	            // each poll returns at most CONSUMER_MAX_POLL_RECORDS = 10 records
//	            if (i > 5) {
//	                consumer.commitSync();
//	                break;
//	            }
    		}
    //		  consumer.close();
    	}
    
    }
    

    controller:

    	@RequestMapping("/receiveThread")
    	public void receiveThread() {
    		ExecutorService runnableService = Executors.newFixedThreadPool(3);
    		runnableService.submit(new ConsumerThread());
    		runnableService.submit(new ConsumerThread());
    		runnableService.submit(new ConsumerThread());
    		runnableService.shutdown();
    	}

     

    Result:

    With multiple partitions, each consumer in the group is assigned one partition; any consumers beyond the partition count are assigned nothing and stay idle. (Screenshot not preserved.)

    2.2.2 Model two: one Consumer with multiple worker threads

    The producer is the same as in model one.

    ConsumerThreadHandler:

    
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class ConsumerThreadHandler implements Runnable {
    
    	private ConsumerRecord consumerRecord;
    
    	public ConsumerThreadHandler(ConsumerRecord consumerRecord) {
    		this.consumerRecord = consumerRecord;
    	}
    
    	@Override
    	public void run() {
		// plug in the actual business logic here
    		System.out.println("Consumer Message:" + consumerRecord.value() + ",Partition:" + consumerRecord.partition()
    				+ "Offset:" + consumerRecord.offset());
    	}
    
    }
    

    ConsumerThread2:

    
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.log4j.Logger;
    
    import com.qibo.base.controller.kafka.Consumer;
    import com.qibo.base.controller.kafka.MQDict;
    
    public class ConsumerThread2 implements Runnable {
    
    	static Logger log = Logger.getLogger(Consumer.class);
    
	// model two: a single Consumer instance, polled by one thread only
	private static KafkaConsumer<String, String> consumer;
    
    	private ExecutorService executor;
    
	/**
	 * Initialize the consumer
	 */
    	static {
    		Properties configs = initConfig();
    		consumer = new KafkaConsumer<String, String>(configs);
    		consumer.subscribe(Arrays.asList(MQDict.CONSUMER_TOPIC));
    	}
    
	/**
	 * Initialize the configuration
	 */
    	private static Properties initConfig() {
    		Properties props = new Properties();
    		props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
    		props.put("group.id", MQDict.CONSUMER_GROUP_ID);
    		props.put("enable.auto.commit", MQDict.CONSUMER_ENABLE_AUTO_COMMIT);
    		props.put("auto.commit.interval.ms", MQDict.CONSUMER_AUTO_COMMIT_INTERVAL_MS);
    		props.put("session.timeout.ms", MQDict.CONSUMER_SESSION_TIMEOUT_MS);
    		props.put("max.poll.records", MQDict.CONSUMER_MAX_POLL_RECORDS);
    		props.put("auto.offset.reset", "earliest");
    		props.put("key.deserializer", StringDeserializer.class.getName());
    		props.put("value.deserializer", StringDeserializer.class.getName());
    		return props;
    	}
    
    	@Override
    	public void run() {
    		System.out.println("主线程序号:" + Thread.currentThread().getId() + " ");
    		 executor = new ThreadPoolExecutor(3,3,0L, TimeUnit.MILLISECONDS,
    	                new ArrayBlockingQueue<Runnable>(4), new ThreadPoolExecutor.CallerRunsPolicy());
    	        while (true){
	        	// poll(100) waits up to 100 ms; the batch size is capped by max.poll.records
	            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
    	            for (ConsumerRecord<String,String> item : consumerRecords){
    	                executor.submit(new ConsumerThreadHandler(item));
    	            }
    	        }
    		
    		
    		
    	}
    
    }
    

    controller:

	@RequestMapping("/receiveThread2")
	public void receiveThread2() {
		// Model two uses a single Consumer, so submit exactly one polling task;
		// the worker pool inside ConsumerThread2 supplies the parallelism.
		ExecutorService runnableService = Executors.newFixedThreadPool(1);
		runnableService.submit(new ConsumerThread2());
		runnableService.shutdown();
	}

    With this in place, the consumer polls a batch of up to 100 records at a time and hands each record to the worker thread pool for business processing.
