精华内容
下载资源
问答
  • java多线程实现mq消息处理超时监控

    千次阅读 2017-06-09 10:04:22
    项目运用rabbitMq 实现的分布式架构,每台机器既是生成者也是消费者,改项目依赖于外部调度服务,mq的consumer listener引用其他组的jar包(实现对外部资源的调用),消息消费是单线程的,在此调用jar包操作业务或者...

     

    项目运用rabbitMq 实现的分布式架构,每台机器既是生成者也是消费者,改项目依赖于外部调度服务,mq的consumer listener引用其他组的jar包(实现对外部资源的调用),消息消费是单线程的,在此调用jar包操作业务或者是个人代码写的有问题,可能导致线程死锁,或者其他代码问题(http请求未设置超时),导致消息消费被卡住,最终消息堆积。导致正常业务垮掉。既然consumer没配置消息的超时时间,java支持多线程,那么可以用来完美解决,worker线程执行任务,protect线程监听worker线程判断执行时间,(其实这两个线程都是在同时互相监听,各自执行完毕stop未执行完成的线程)不啰嗦直接上代码

     

     BaseThreadUtil类 worker 线程和protect线程集成该抽象类

    /**
     * Created by liweigao on 2017/4/25.
     */
    public abstract class BaseThreadUtil extends Thread {
    
    
      public abstract void execute();//抽象方法需要子类实现
    
      private String threadName = "";
    
      //在父类重写run方法,在子类只要重写execute方法就可以了
      @Override
      public void run() {
        super.run();
        execute();
      }
    
      //在需要回调数据的地方(两个子类需要),声明一个接口
      public static interface Callback {
    
        public void complete();
      }
    
      //2.创建接口对象
      public Callback callback;
    
      public String getThreadName() {
        return threadName;
      }
    
      public void setThreadName(String threadName) {
        this.threadName = threadName;
      }
    }
    

     worker 线程类 执行主要业务线程。

    /**
     * Created by liweigao on 2017/4/25.
     */
    public class WorkerThread extends BaseThreadUtil {
    
      private Logger logger = LoggerFactory.getLogger(WorkerThread.class);
    
      private Runnable runnable;
    
      public WorkerThread(Runnable runnable, String threadName) {
        this.runnable = runnable;
        if (threadName != null && !"".equals(threadName)) {
          super.setThreadName(threadName);
        } else {
          super.setThreadName("worker thread");
        }
      }
    
      @Override
      public void execute() {
        StopWatch stopWatch=new StopWatch();
        stopWatch.start();
        if (runnable != null) {
          runnable.run();
        }
        stopWatch.stop();
    //    System.out.println("线程:" + super.getThreadName() + "执行完毕,开始执行回调……");
        logger.debug("线程:" + super.getThreadName() + "执行完毕,开始执行回调……耗时:"+stopWatch.getTotalTimeMillis() +"ms");
        //任务执行完毕  执行回调
        callback.complete();
      }
    }

     protect 线程类。

     

    /**
     * Created by liweigao on 2017/4/25.
     */
    public class ProtectThread extends BaseThreadUtil {
    
      private Logger logger = LoggerFactory.getLogger(ProtectThread.class);
    
      private Integer timeout = 6000;
    
      public ProtectThread(Integer timeout, String threadName) {
        this.timeout = timeout;
        if (threadName != null && !"".equals(threadName)) {
          super.setThreadName(threadName);
        } else {
          super.setThreadName("protect thread");
        }
      }
    
      @Override
      public void execute() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        try {
          Thread.sleep(timeout);
        } catch (InterruptedException e) {
          logger.error(e.getMessage(), e);
    
        }
        stopWatch.stop();
    //    System.out.println("线程:" + super.getThreadName() + "执行完毕,开始执行回调……");
        logger.debug(
            "线程:" + super.getThreadName() + "执行完毕,开始执行回调……耗时:" + stopWatch.getTotalTimeMillis() + "ms");
        //线程任务执行完毕 执行回调
        callback.complete();
      }
    }

     baseStopUtil 类 也可以叫回调类,处理未完成线程。

    /**
     * Created by liweigao on 2017/4/25.
     */
    public class BaseStopUtil implements Callback {
    
      private Logger logger = LoggerFactory.getLogger(BaseStopUtil.class);
      BaseThreadUtil baseCallBackUtil;
    
      // 获取对象
      public BaseStopUtil(BaseThreadUtil baseCallBackUtil) {
        this.baseCallBackUtil = baseCallBackUtil;
      }
    
      @Override
      public void complete() {
    //    System.out.println("线程:" + baseCallBackUtil.getThreadName() + "被停掉……");
        logger.debug("线程:" + baseCallBackUtil.getThreadName() + "被停掉……");
        if (baseCallBackUtil.isAlive()) {
          baseCallBackUtil.stop();
        }
      }
    }

     

    ListenThreadConsumer 封装对外调用方法,启动两个线程,配置超时时间。
    
    /**
     * Created by liweigao on 2017/4/25.
     */
    public class ListenThreadConsumer {
    
      /**
       * 过期时间
       */
      private Integer timeout;
    
      /**
       * 任务
       */
      private Runnable runnable;
    
      /**
       * 检测间隔时间 默认1000
       */
    //  private Integer spacetime;
      public ListenThreadConsumer(Integer timeout, Runnable runnable) {
        this.timeout = timeout;
        this.runnable = runnable;
      }
    
      public void execute() {
    
        ProtectThread protectThread = new ProtectThread(timeout, "");
        WorkerThread workerThread = new WorkerThread(runnable, "");
        protectThread.callback = new BaseStopUtil(workerThread);
        workerThread.callback = new BaseStopUtil(protectThread);
        protectThread.start();
        workerThread.start();
      }
    }
    
     那么就可以很简单的运用到自己的程序中了。main方法测试。
      public static void main(String[] args) {
        ListenThreadConsumer listenThreadConsumer = new ListenThreadConsumer(100, new Runnable() {
          @Override
          public void run() {
            System.out.println("这是我的测试……………………");
          }
        });
        //执行任务以及监控
        listenThreadConsumer.execute();
      }
    打印日志:
    这是我的测试…………………… 
    2017-04-28 18:56:50.916 [Thread-1] DEBUG c.w.s.c.util.thread.WorkerThread-[34] - 线程:worker thread执行完毕,开始执行回调……耗时:1ms 
    2017-04-28 18:56:50.922 [Thread-1] DEBUG c.w.s.c.util.thread.BaseStopUtil-[23] - 线程:protect thread被停掉…… 
    
    以上完毕,可直接copy复用。以上信息欢迎大神吐槽,欢迎提建议。互相监控,其中哪一个线程有问题,都会被停掉。

     

    展开全文
  • 多线程分批处理数据

    千次阅读 2019-06-05 17:17:06
    然后使用多线程进行异步发送 步骤 1.初始化数据 2.定义每个线程分批发送大小 3.计算每个线程需要分批跑的数据 4.进行分批发送 实例: 实体类: package com.emple.entity; import java.security.SecureRa...

    需求:

    目前蚂蚁课堂有10万个用户,现在蚂蚁课堂需要做活动,给每一个用户发送一条祝福短信
    

    思路:
    在这里插入图片描述
    就是将数据分成几个部分,这里用集合。然后使用多线程进行异步发送
    步骤
    1.初始化数据
    2.定义每个线程分批发送大小
    3.计算每个线程需要分批跑的数据
    4.进行分批发送

    实例:
    实体类:

    package com.emple.entity;
    
    import java.security.SecureRandom;
    
    /**
     * @author shkstart
     * @date 2019/6/5- 16:14
     */
    public class UserEntity {
        private String userId;
        private String userName;
    
        public UserEntity() {
        }
    
        public UserEntity(String userId, String userName) {
            this.userId = userId;
            this.userName = userName;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
        @Override
        public String toString() {
            return "UserEntity{" +
                    "userId='" + userId + '\'' +
                    ", userName='" + userName + '\'' +
                    '}';
        }
    }
    
    

    这里要用到一个分页工具,将List数据分割为几个list
    工具类 ListUtils:

    package com.emple;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class ListUtils {
    	/**
    	 *
    	 * @methodDesc: 功能描述:(list 集合分批切割)
    	 * @author: 余胜军
    	 * @param: @param
    	 *             list
    	 * @param: @param
    	 *             pageSize
    	 * @param: @return
    	 * @createTime:2017年8月7日 下午9:30:59
    	 * @returnType:@param list 切割集合
    	 * @returnType:@param pageSize 分页长度
    	 * @returnType:@return List<List<T>> 返回分页数据
    	 * @copyright:上海每特教育科技有限公司
    	 */
    
    	static public <T> List<List<T>> splitList(List<T> list, int pageSize) {
    		int listSize = list.size();
    		int page = (listSize + (pageSize - 1)) / pageSize;
    		List<List<T>> listArray = new ArrayList<List<T>>();
    		for (int i = 0; i < page; i++) {
    			List<T> subList = new ArrayList<T>();
    			for (int j = 0; j < listSize; j++) {
    				int pageIndex = ((j + 1) + (pageSize - 1)) / pageSize;
    				if (pageIndex == (i + 1)) {
    					subList.add(list.get(j));
    				}
    				if ((j + 1) == ((j + 1) * pageSize)) {
    					break;
    				}
    			}
    			listArray.add(subList);
    		}
    		return listArray;
    	}
    }
    
    

    发送类:

    package com.emple;
    
    import com.emple.entity.UserEntity;
    
    import javax.annotation.processing.SupportedSourceVersion;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author shkstart
     * @date 2019/6/5- 16:17
     */
    class UserSendThread implements Runnable{
        private List<UserEntity> listUser;
        public UserSendThread(List<UserEntity> listUser){
            this.listUser = listUser;
        }
        @Override
        public void run() {
            for (UserEntity userEntity :listUser){
                System.out.println("线程name:"+Thread.currentThread().getName()+userEntity.toString());
            }
            System.out.println();
        }
    }
    
    
    public class BatchSms {
        public static void main(String[] args) {
            //1.初始化数据
            List<UserEntity> initUser = initUser();
            //2.定义每个线程分批发送大小
            int userCount = 2;
            //3.计算每个线程需要分批跑的数据
            List<List<UserEntity>> lists = ListUtils.splitList(initUser, userCount);
            for (int i = 0; i < lists.size(); i++) {
                List<UserEntity> userEntities = lists.get(i);
                UserSendThread userSendThread = new UserSendThread(userEntities);
                Thread thread = new Thread(userSendThread,"线程"+i);
                thread.start();
    
            }
            //4.进行分批发送
    
        }
       static private List<UserEntity> initUser(){
            ArrayList<UserEntity> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add(new UserEntity("userId:"+i,"userName:"+i));
            }
            return list;
        }
    }
    
    

    运行结果:
    在这里插入图片描述
    注意:
    小项目可以这样实现,大项目不行。因为大项目考虑数量,用线程池管理 mq实现

    展开全文
  • java mq源码 java高并发学习 多线程数据源 redis rabbitmq 高并发 负载均衡 主从数据库
  • MQ监听器下多线程安全问题

    千次阅读 2019-07-18 22:05:24
    今天在写MQ队列监听器队列的时候,突然有一个疑问,局部变量会不会受到多线程的影响,所以为此做了一个实验。 实验 我在监听器中定义了两个变量,一个是类变量,一个是局部变量。然后设置监听器的并发线程为2个、...

    背景

    今天在写MQ队列监听器队列的时候,突然有一个疑问,局部变量会不会受到多线程的影响,所以为此做了一个实验。

    实验

    我在监听器中定义了两个变量,一个是类变量,一个是局部变量。然后设置监听器的并发线程为2个、一次抓取线程的消息个数为10个,具体代码如下所示:

    public int countPublic = 0;
    
    @Override
    @RabbitHandler
    @RabbitListener(queues = "amz_advertisement:big_info", containerFactory = "rabbitListenerContainerFactoryAmzAdvertisementBig")
    public void process(Message msg, Channel channel) {
        int countPrivate = 0;
        try {
            countPrivate++;
            countPublic++;
            logger.error("countPrivate:{},countPublic:{}", countPrivate, countPublic);
        } catch (Exception e) {
            logger.error("bigInfo错误,错误信息为:{}", ExceptionUtil.formatException(e));
        }
    }
    

    然后循环发送100个消息,推送到指定队列中,具体代码如下所示:

    @RequestMapping("/sendMQ/{message}")
    public Object sendMQ(@PathVariable("message")  String message) throws InterruptedException {
        for (int i=0;i<100;i++){
            Thread.sleep(100);
            amqpTemplateDelay.convertAndSend("amz_advertisement:big_info", "我是测试amz_advertisement:info的测试数据");
        }
        return "发送成功";
    }
    

    结果如下所示:

    RabbitListenerEndpointContainer#0-1 [BigInfoAdvertistingListener.java: 43] - countPrivate:1,countPublic:1

    RabbitListenerEndpointContainer#0-1 [BigInfoAdvertistingListener.java: 43] - countPrivate:1,countPublic:3

    RabbitListenerEndpointContainer#0-2 [BigInfoAdvertistingListener.java: 43] - countPrivate:1,countPublic:3

    RabbitListenerEndpointContainer#0-2 [BigInfoAdvertistingListener.java: 43] - countPrivate:1,countPublic:4

    从结果中,我们可以很明显的看到:局部变量一致都是1没有递增,但是类变量一直在递增。

    这是因为Spring默认以单例模式创建对象,所以多线程模式下类变量就会存在线程安全问题。但是局部变量,在多线程模式下是线程安全的,各个线程之间的局部变量都是独享的。

    总结

    有时候我们写代码不注意,就会把局部变量写成类变量,这个时候如果是多线程模式,那将是灾难性的。所以我们平时写代码一定要严谨,不要变量位置随意放置。今天的初试多线程就到这边,下一讲博主将会深入的给大家介绍复杂多线程应用场景。

    林老师带你学编程https://wolzq.com

    想要更多干货、技术猛料的孩子,快点拿起手机扫码关注我,我在这里等你哦~

    展开全文
  • 假设你是大型电商互联网企业的架构师,基于kafka、zookeeper,多线程等技术、基于单台服务器,MQ生产方发送1千万个User对象,User对象包含id,name这两个属性。要求:实现以下两种技术功能:1、并发场景:消息生产方...
  • 正确处理kafka多线程消费的姿势

    万次阅读 多人点赞 2019-08-03 14:21:12
    通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步、解耦、削峰等几大好处,而且开始考虑最大的好处,可以实现架构的水平扩展,下游系统出现性能瓶颈,容器平台伸缩增加一些实例消费能力很快就提上来了,整体...

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息。通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步、 解耦、 削峰等几大好处,而且开始考虑最大的好处,可以实现架构的水平扩展,下游系统出现性能瓶颈,容器平台伸缩增加一些实例消费能力很快就提上来了,整体系统架构上不用任何变动。理论上,我们项目数据量再大整体架构上高可用都没有问题。在使用kafka过程中也遇到一些问题:

    1. 消息逐渐积压,消费能力跟不上;

    2.某个消费者实例因为某些异常原因挂掉,造成少量数据丢失的问题。

    针对消费积压的问题,通过研究kafka多线程消费的原理,解决了消费积压的问题。所以,理解多线程的Consumer模型是非常有必要,对于我们正确处理kafka多线程消费很重要。

    kafka多线程消费模式

    说kafka多线程消费模式前,我们先来说下kafka本身设计的线程模型和ConcurrentmodificationException异常的原因。见官方文档:

    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

    ConcurrentmodificationException异常的出处见以下代码:

      /**
         * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
         * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
         * supported).
         * @throws IllegalStateException if the consumer has been closed
         * @throws ConcurrentModificationException if another thread already has the lock
         */
        private void acquire() {
            ensureNotClosed();
            long threadId = Thread.currentThread().getId();
            if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
            refcount.incrementAndGet();
        }

    该方法acquire 会在KafkaConsumer的大部分公有方法调用第一句就判断是否正在同一个KafkaConsumer被多个线程调用。

    "正在"怎么理解呢?我们顺便看下KafkaConsumer的commitAsync 这个方法就知道了。

     @Override
        public void commitAsync(OffsetCommitCallback callback) {
            acquire(); // 引用开始
            try {
                commitAsync(subscriptions.allConsumed(), callback);
            } finally {
                release(); //引用释放
            }
        }
    

    我们看KafkaConsumer的release方法就是释放正在操作KafkaConsumer实例的引用。

     /**
         * Release the light lock protecting the consumer from multi-threaded access.
         */
        private void release() {
            if (refcount.decrementAndGet() == 0)
                currentThread.set(NO_CURRENT_THREAD);
        }

    通过以上的代码理解,我们可以总结出来kafka多线程的要点: kafka的KafkaConsumer必须保证只能被一个线程操作

    下面就来说说,我理解的Kafka能支持的两种多线程模型,首先,我们必须保证操作KafkaConsumer实例的只能是一个线程,那我们要想多线程只能用在消费ConsumerRecord List上动心思了。下面列举我理解的kafka多线程消费模式。

    • 模式一  1个Consumer模型对应一个线程消费,最多可以有topic对应的partition个线程同时消费Topic。

                

     

    • 模式二 1个Consumer和多个线程消费模型,保证只有一个线程操作KafkaConsumer,其它线程消费ConsumerRecord列表。

    注意 第二种模式其实也可以支持多个Consumer,用户最多可以启用partition总数个Consumer实例,然后,模式二跟模式一唯一的差别就是模式二在单个Consuemr里面是多线程消费,而模式一单个Consumer里面是单线程消费。

    以上两种kafka多线程消费模式优缺点对比:

    kafka多线程消费模式实现    

    关于多线程消费模式具体实现都是选择基于spring-kafka实现,毕竟站在巨人肩膀上,站的高望的远少加班???,以下就是模式二的具体实现,模式一的话就是对模式二的简化,具体实现如下。

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Value("${kafka.bootstrap-servers}")
        private String servers;
    
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch-size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
    
        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${kafka.consumer.group.id}")
        private String groupId;
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
    
        @Value("${msg.consumer.max.poll.records}")
        private int maxPollRecords;
    
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        public ProducerFactory producerFactory() {
            return new DefaultKafkaProducerFactory(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate kafkaTemplate() {
            return new KafkaTemplate(producerFactory());
        }
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setBatchListener(true);
            // 此处并发度设置的都是Consumer个数,可以设置1到partition总数,
            // 但是,所有机器实例上总的并发度之和必须小于等于partition总数
            // 如果,总的并发度小于partition总数,有一个Consumer实例会消费超过一个以上partition
            factory.setConcurrency(2);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    
        public ConsumerFactory<String, Object> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            return propsMap;
        }
    
    }

    具体业务代码在BaseConsumer:

    public abstract class BaseConsumer implements ApplicationListener<ConsumerStoppedEvent> {
    
        private static final Logger LOG = LoggerFactory.getLogger(BaseConsumer.class);
    
        @Value("${kafka.consumer.thread.min}")
        private int consumerThreadMin;
    
        @Value("${kafka.consumer.thread.max}")
        private int consumerThreadMax;
    
        private ThreadPoolExecutor consumeExecutor;
    
        private volatile boolean isClosePoolExecutor = false;
    
        @PostConstruct
        public void init() {
    
            this.consumeExecutor = new ThreadPoolExecutor(
                    getConsumeThreadMin(),
                    getConsumeThreadMax(),
                    // 此处最大最小不一样没啥大的意义,因为消息队列需要达到 Integer.MAX_VALUE 才有点作用,
                    // 矛盾来了,我每次批量拉下来不可能设置Integer.MAX_VALUE这么多,
                    // 个人觉得每次批量下拉的原则 觉得消费可控就行,
                    // 不然,如果出现异常情况下,整个服务示例突然挂了,拉下来太多,这些消息会被重复消费一次。
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());
        }
    
        /**
         * 收到spring-kafka 关闭Consumer的通知
         * @param event 关闭Consumer 事件
         */
        @Override
        public void onApplicationEvent(ConsumerStoppedEvent event) {
    
            isClosePoolExecutor = true;
            closeConsumeExecutorService();
    
        }
    
        private void closeConsumeExecutorService() {
    
            if (!consumeExecutor.isShutdown()) {
    
                ThreadUtil.shutdownGracefully(consumeExecutor, 120, TimeUnit.SECONDS);
                LOG.info("consumeExecutor stopped");
    
            }
    
        }
    
        @PreDestroy
        public void doClose() {
            if (!isClosePoolExecutor) {
                closeConsumeExecutorService();
            }
        }
    
        @KafkaListener(topics = "${msg.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
        public void onMessage(List<String> msgList, Acknowledgment ack) {
    
            CountDownLatch countDownLatch = new CountDownLatch(msgList.size());
    
            for (String message : msgList) {
                submitConsumeTask(message, countDownLatch);
            }
    
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                LOG.error("countDownLatch exception ", e);
            }
    
            // 本次批量消费完,手动提交
            ack.acknowledge();
            LOG.info("finish commit offset");
    
        }
    
        private void submitConsumeTask(String message, CountDownLatch countDownLatch) {
            consumeExecutor.submit(() -> {
                try {
                    onDealMessage(message);
                } catch (Exception ex) {
                    LOG.error("on DealMessage exception:", ex);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
    
        /**
         * 子类实现该抽象方法处理具体消息的业务逻辑
         * @param message kafka的消息
         */
        protected abstract void onDealMessage(String message);
    
        private int getConsumeThreadMax() {
            return consumerThreadMax;
        }
    
        private int getConsumeThreadMin() {
            return consumerThreadMin;
        }
    
        public void setConsumerThreadMax(int consumerThreadMax) {
            this.consumerThreadMax = consumerThreadMax;
        }
    
        public void setConsumerThreadMin(int consumerThreadMin) {
            this.consumerThreadMin = consumerThreadMin;
        }
    }

    其中,closeConsumeExecutorService方法就是为了服务实例异常退出或者多机房上线kill的情况下,尽最大可能保证本次拉下来的任务被消费掉。最后,附上closeConsumeExecutorService实现,觉得RocketMQ源码这个实现的不错,就借用过来了,在此表示感谢。

      public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
            // Disable new tasks from being submitted.
            executor.shutdown();
            try {
                // Wait a while for existing tasks to terminate.
                if (!executor.awaitTermination(timeout, timeUnit)) {
                    executor.shutdownNow();
                    // Wait a while for tasks to respond to being cancelled.
                    if (!executor.awaitTermination(timeout, timeUnit)) {
                        LOGGER.warn(String.format("%s didn't terminate!", executor));
                    }
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted.
                executor.shutdownNow();
                // Preserve interrupt status.
                Thread.currentThread().interrupt();
            }
        }

    下面回到使用kafka遇到的第二个问题,怎么解决消费者实例因为某些原因挂掉,造成少量数据丢失的问题。其实,通过我们上面的写法,已经不会出现因为某些原因服务实例(docker、物理机)挂掉,丢数据的情况。因为我们是先拉取后消费,消费完才手动提交kafka确认offset。实在还存在万一退出时候调用的closeConsumeExecutorService方法还没有消费完数据,表示这个时候offset肯定没有手动提交,这一部分数据也不会丢失,会在服务实例恢复了重新拉取消费。

    以上的代码存在极小的可能瑕疵,比如,我们双机房切换上线,某机房实例有一部分数据没有消费,下次会重复消费的问题。其实,这个问题我们在业务上通过在配置中心配置一个标识符来控制,当改变标识符控制某些机房停止拉取kafka消息,这个时候我们就可以安全操作,不担心kafka没有消费完,下次重复消费的问题了。

    以上自己使用kafka过程中一些心得体会,难免有所遗漏,感谢指出,知错能改,每天进步?。

     

    展开全文
  • 多线程对JSON数据解析

    千次阅读 2019-06-02 07:33:48
    数据库交易表中每天会产生大量的交易数据数据为前端系统传来(从MQ获取)。数据格式为json格式。 有问题的难点: 当客户表存在已有客户时,需要更新客户的最新信息,否则需要将客户信息落地。 需要调用其他...
  • 因为数据量大概有500万甚至更,所以采用了每次查询5000条,开4线程去更新.使用countDownLatch做的同步,4个线程把5000条数据处理完以后再查5000,一直到所有数据全部处理完. 实际运行的时候就是,主线程反复查询到的...
  • UNIX多线程数据共享与线程同步

    千次阅读 2009-09-01 23:41:00
    UNIX多线程数据共享与线程同步http://blog.csdn.net/hwz119/archive/2007/06/22/1662156.aspx 作者:杨海平 姚洪利 本文选自:中国计算机报 2001年12月18日 在UNIX中,一个进程让另外实体进行某项事务而采取的操作...
  • springboot +activeMQ 多线程消费慢问题

    千次阅读 2019-08-21 17:35:24
    环境 springboot1.5.1.RELEAS activemq 5-15.19 问题描述 在直接使用springboot 中jmsMessagingTemplate直接接收消息队列的内容的时候,会...通过网上资料的查阅,发现了多线程情况下,消息消费慢的问题,很可...
  • 首先,叙述一下当前面临的问题所在。当前系统通过接口调用其他系统的数据,返回的数据达到10万...接口拿到数据后可以暂存到Redis或kafka再者是MQ队列中,以提高接口直接的相率。当然了如果项目团队允许,分布式的Hba...
  • RabbitMQ消费多线程

    千次阅读 2021-01-18 16:50:57
    项目中有一个业务需要先把数据从接口入到原始表,再通过MQ机制,让业务处理程序监听并处理。跑了几天以后发现有大量消息堆积,通过rabbitmq控制台看到Ready了几万条数据,Pulish是3/s左右,但Consumer ack则1/s左右...
  • RabbitMQ设置多线程处理队列消息

    千次阅读 2019-05-17 19:08:32
    @RabbitListener注解指定消费方法,默认...可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。 1、RabbitmqConfig.java中添加容器工厂配置: @Bean("customContainerFac...
  • 所以,从实践中得出的结论是:异构多数据源保证实时数据一致性是一个伪命题,缓存的使用具有不可逆的特性,大多数情况下缓存不能当作真实数据源。 预设场景 欲实现移动App人员位置管理,要求: 1、展示用户总数和...
  • MQ顺序消费解决方案

    千次阅读 2020-09-03 16:18:03
    消息消费:多消费者并行消费,或消费者内部启用多线程消费。异常重试。 2.为什么要保证消息的顺序 : 如果业务上通过消息发送的数据是有前后顺序关系时,则必须保证消息的顺序。 例如:通过MQ同步数据时,如果增...
  • 线程池消费MQ消息队列解决方案

    千次阅读 2019-07-03 13:54:01
    就想着能不能用线程池来消费MQ里的任务,但是问题来了,如果使用线程池的话线程池满了之后会有决绝接收。而且线程池里的队列如果存了很消息,重启服务的时候会造成消息的丢失。怎么办好呢,思来想去,还是继承...
  • 单机压测工具JMH JMH Java准测试工具套件 什么是... } } ProducerType生产者线程模式 ProducerType有两种模式 Producer.MULTI和Producer.SINGLE 默认是MULTI,表示在多线程模式下产生sequence 如果确认是单线程生产者,...
  • 在开发中,遇到了这样一个问题,我们使用ActivateMQ来接收处理消息,然后调用人工智能的算法去处理数据,但是算法处理的速度太慢,跟不上消息的接收速度,限制于硬件的问题,算法也没办法增加更的服务器来进行并发...
  • 今天看视频,里面讲了一个经典的例子,是...队列的使用,而且很好的利用多线程并发的处理了任务,提高了吞吐量.首先看生产端:package com.jvm.activemq.bhz.mq; import com.jvm.util.PropertiesUtil; import org.apache...
  • 生产者与消费者读前建议深入理解Java线程安全——银行欠我400万!!!一分钟用睡前小故事图解Java线程之死锁一、情景设计二、引入生产者与消费者三、情景的代码再现 读前建议 如果需要对本文有更加深刻的了解建议...
  • 消息队列和多线程的选择

    千次阅读 2020-06-17 11:33:40
    可靠性要求高时选择消息队列:消息队列和多线程两者并不冲突,多线程可以作为队列的生产者和消费者。 使用外部的消息队列时,第一是可以提高应用的稳定性,当程序fail后,已经写入外部消息队列的数据依旧是保存的,...
  • 多线程 队列 线程执行器 调度 生产者与消费者进行 消息队列入队出列
  • RPC,MQ,数据同步

    千次阅读 2017-08-15 11:09:20
    RPC,MQ,数据同步三大基础组件服务使用说明 转至元数据结尾 Created by 李江, last modified on 五月 22, 2015 转至元数据起始 概述 这个文档的目的是想更清楚的展示RPC...
  • Java 多线程常见问题

    2018-11-24 23:04:01
    1、多线程有什么用? 1)发挥多核CPU的优势 随着工业的进步,现在的笔记本、台式机乃至商用的应用服务器至少也都是双核的,4核、8核甚至16核的也都不少见,如果是单线程的程序,那么在双核CPU上就浪费了50%,在4...
  • basic-cornerstone:Java基础(Java8,Java11),包含并发编程,同步和锁,线程线程池,泛型,数据结构,算法... 设计:Java实现设计模式basic:Java基础代码实践leet:Leetcode刷题nettt:netty库实践Spring:...
  • 多线程+Quartz+ActiveMQ

    2016-11-16 02:02:18
    从MongoDB中获取满足条件的数据 并 发送至MQ,发送MQ有个条件:先去MQ中获取队列剩余数,(我设置了MQ队列最大数200),若剩余数超过200则不发送 } 现在的问题是:控制不了了,现在队列数已经超过200了,怎么办?
  • 在实际应用中,线程往往会共享一些数据,并且各个线程之间的状态和行为是相互影响的。线程之间的影响有两种,一种是线程间的互斥,另一种是线程间的同步。 ---线程安全(Thread-Safe) ---线程间的互斥 ---...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 32,512
精华内容 13,004
关键字:

多线程如何消费mq的数据