精华内容
下载资源
问答
  • RocketMQ 消费多线程 源码

    千次阅读 2017-12-30 01:18:00
    RocketMQ 消费多线程 源码 博客分类: MQ /** * Wrapped push consumer.in fact,it works as remarkable as the pull consumer * * @author shijia.wxr * @since 2013-7-24 */ public class ...
    RocketMQ 消费者 多线程 源码 博客分类: MQ
    /**
     * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
     *
     * @author shijia.wxr<vintage.wang@gmail.com>
     * @since 2013-7-24
     */
    public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { 
    
    
       /**
         * Minimum consumer thread number
         */
        private int consumeThreadMin = 20;
        /**
         * Max consumer thread number
         */
        private int consumeThreadMax = 64;

     

     

      

    com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
    
    public void start() throws MQClientException {
            switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
                    this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(),
                    this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
    
                this.checkConfig();
    
                this.copySubscription();
    
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
    
                this.mQClientFactory =
                        MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
                            this.rpcHook);
    
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
                    .getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                this.pullAPIWrapper = new PullAPIWrapper(//
                    mQClientFactory,//
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                }
                else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore =
                                new LocalFileOffsetStore(this.mQClientFactory,
                                    this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore =
                                new RemoteBrokerOffsetStore(this.mQClientFactory,
                                    this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                    }
                }
                this.offsetStore.load();
    
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this,
                                (MessageListenerOrderly) this.getMessageListenerInner());
                }
                else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this,
                                (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                //消费者服务
                this.consumeMessageService.start();
    
                boolean registerOK =
                        mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group["
                            + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please."
                            + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
                }
    
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
            default:
                break;
            }
    
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    
            this.mQClientFactory.rebalanceImmediately();
        }

     

     

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        private static final Logger log = ClientLogger.getLog();
        private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
        private final DefaultMQPushConsumer defaultMQPushConsumer;
        private final MessageListenerConcurrently messageListener;
        private final BlockingQueue<Runnable> consumeRequestQueue;
        private final ThreadPoolExecutor consumeExecutor;
        private final String consumerGroup;
    
        private final ScheduledExecutorService scheduledExecutorService;
    
    
        public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
                MessageListenerConcurrently messageListener) {
            this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
            this.messageListener = messageListener;
    
            this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
            this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
            this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    
            this.consumeExecutor = new ThreadPoolExecutor(//
                this.defaultMQPushConsumer.getConsumeThreadMin(),//
                this.defaultMQPushConsumer.getConsumeThreadMax(),//
                1000 * 60,//
                TimeUnit.MILLISECONDS,//
                this.consumeRequestQueue,//
                new ThreadFactoryImpl("ConsumeMessageThread_"));
    
            this.scheduledExecutorService =
                    Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
                        "ConsumeMessageScheduledThread_"));
        }
    
    
        public void start() {
        }

      

    @Override
        public void submitConsumeRequest(//
                final List<MessageExt> msgs, //
                final ProcessQueue processQueue, //
                final MessageQueue messageQueue, //
                final boolean dispatchToConsume) {
            final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
            if (msgs.size() <= consumeBatchSize) {
                ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                this.consumeExecutor.submit(consumeRequest);
            }
            else {
                for (int total = 0; total < msgs.size();) {
                    List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                    for (int i = 0; i < consumeBatchSize; i++, total++) {
                        if (total < msgs.size()) {
                            msgThis.add(msgs.get(total));
                        }
                        else {
                            break;
                        }
                    }
    
                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                    this.consumeExecutor.submit(consumeRequest);
                }
            }
        }

       

    com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
        public void pullMessage(final PullRequest pullRequest) {
            final ProcessQueue processQueue = pullRequest.getProcessQueue();
            if (processQueue.isDropped()) {
                log.info("the pull request[{}] is droped.", pullRequest.toString());
                return;
            }
    
            pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    
            try {
                this.makeSureStateOK();
            }
            catch (MQClientException e) {
                log.warn("pullMessage exception, consumer state not ok", e);
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException);
                return;
            }
    
            if (this.isPause()) {
                log.warn("consumer was paused, execute pull request later. instanceName={}",
                    this.defaultMQPushConsumer.getInstanceName());
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenSuspend);
                return;
            }
    
            long size = processQueue.getMsgCount().get();
            if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
                if ((flowControlTimes1++ % 1000) == 0) {
                    log.warn("the consumer message buffer is full, so do flow control, {} {} {}", size,
                        pullRequest, flowControlTimes1);
                }
                return;
            }
    
            if (!this.consumeOrderly) {
                if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                    this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
                    if ((flowControlTimes2++ % 1000) == 0) {
                        log.warn("the queue's messages, span too long, so do flow control, {} {} {}",
                            processQueue.getMaxSpan(), pullRequest, flowControlTimes2);
                    }
                    return;
                }
            }
    
            final SubscriptionData subscriptionData =
                    this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (null == subscriptionData) {
                // 由于并发关系,即使找不到订阅关系,也要重试下,防止丢失PullRequest
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException);
                log.warn("find the consumer's subscription failed, {}", pullRequest);
                return;
            }
    
            final long beginTimestamp = System.currentTimeMillis();
    
            PullCallback pullCallback = new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    if (pullResult != null) {
                        pullResult =
                                DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                                    pullRequest.getMessageQueue(), pullResult, subscriptionData);
    
                        switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                                pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
    
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                            else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
    
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                                    pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(),
                                    pullResult.getMsgFoundList().size());
    
                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);
    
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                }
                                else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
    
                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                    || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",//
                                    pullResult.getNextBeginOffset(),//
                                    firstMsgOffset,//
                                    prevRequestOffset);
                            }
    
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",//
                                pullRequest.toString(), pullResult.toString());
    
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
    
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(
                                            pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
    
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest
                                            .getMessageQueue());
    
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl
                                            .removeProcessQueue(pullRequest.getMessageQueue());
    
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    }
                                    catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                        }
                    }
                }

     

    转载于:https://my.oschina.net/xiaominmin/blog/1599045

    展开全文
  • 问题RocketMQ重复消费问题RocketMQ线程过高问题线上场景场景一:重复消费场景:生产有这么一种场景,我们在RocketMQ中对一个topic创建了16个tag,不同总类的信息放到不同的tag中,在消费端每个tag对应三个线程组成...

    问题

    RocketMQ重复消费问题

    RocketMQ线程过高问题

    线上场景

    场景一:重复消费

    场景:生产有这么一种场景,我们在RocketMQ中对一个topic创建了16个tag,不同总类的信息放到不同的tag中,在消费端每个tag对应三个线程组成group去消费消息。消费服务在线上是集群部署,是使用docker进行部署的。

    问题1:tag中的消息发生了稳定的重复性消费。

    排查:首先我们发现重复消费的次数和线上集群的台数是一致的,所以这个时候就去查看配置信息,然后发现没有配置错误,在多方试错的情况下,最后在rocketmq的监控页面发现ClientId获取的IP竟然是一样的。

    820bfa2f1fe3ed2453a40db5c2c35543.png

    这时候阅读RocketMQ的源码,我们在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl的start方法中看到下面这行代码

    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

    点进去看到

    677273ce11d53f4d34b71e1cd39ed6a9.png

    41faf0439ea0ec6078c488b6734013fe.png

    从上面图中的代码以及我们看到的RocketMQ的监控图可以明白一点,rocketmq在docker部署中通过getLocalAddress方法获取出来的IP是一样,如果你不设置instanceName和unitName,那么多台机器上面使用的就是一个instance。这样可能会造成重复消费,那么为什么instanceName一致就会造成重复消费呢?接着往下看

    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

    start方法

    // 一个JVM中的所有消费组、生产者持有同一个MQClientInstance,MQClientInstance只会启动一次

    mQClientFactory.start();

    org.apache.rocketmq.client.impl.factory.MQClientInstance

    public void start() throws MQClientException {

    ...

    this.rebalanceService.start();

    ...

    }

    org.apache.rocketmq.client.impl.consumer.RebalanceService

    @Override

    public void run() {

    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {

    // 该线程默认20s执行一次rebalance

    this.waitForRunning(waitInterval);

    this.mqClientFactory.doRebalance();

    }

    log.info(this.getServiceName() + " service end");

    }

    org.apache.rocketmq.client.impl.factory.MQClientInstance

    public void doRebalance() {

    // 遍历注册的所有已经注册的消费者,对消费者执行rebalance

    for (Map.Entry entry : this.consumerTable.entrySet()) {

    MQConsumerInner impl = entry.getValue();

    if (impl != null) {

    try {

    impl.doRebalance();

    } catch (Throwable e) {

    log.error("doRebalance exception", e);

    }

    }

    }

    }

    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

    @Override

    public void doRebalance() {

    if (!this.pause) {

    // 每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象

    this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

    }

    }

    org.apache.rocketmq.client.impl.consumer.RebalanceImpl

    public void doRebalance(final boolean isOrder) {

    Map subTable = this.getSubscriptionInner();

    if (subTable != null) {

    // 遍历订阅信息对每个主题的队列进行重新负载

    for (final Map.Entry entry : subTable.entrySet()) {

    final String topic = entry.getKey();

    try {

    this.rebalanceByTopic(topic, isOrder);

    } catch (Throwable e) {

    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

    log.warn("rebalanceByTopic Exception", e);

    }

    }

    }

    }

    this.truncateMessageQueueNotMyTopic();

    }

    org.apache.rocketmq.client.impl.consumer.RebalanceImpl

    private void rebalanceByTopic(final String topic, final boolean isOrder) {

    switch (messageModel) {

    case BROADCASTING: {

    ...

    }

    case CLUSTERING: {

    // 从主题订阅信息缓存表中获取该topic的队列信息

    Set mqSet = this.topicSubscribeInfoTable.get(topic);

    // topic分布在多个broker上,但是每个broker都存有所有的消费者信息,因为消费者启动的时候需要像所有的broker注册信息

    // 这里获取的是当前topic下消费者组里所有的消费者客户端ID

    List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

    // 对cidAll和mqAll排序,确保所有消费者结果一致,这样一个消费队列就只能被一个消费者分配

    Collections.sort(mqAll);

    Collections.sort(cidAll);

    // 默认为AllocateMessageQueueAveragely

    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

    allocateResult = strategy.allocate(//

    this.consumerGroup, //

    this.mQClientFactory.getClientId(), //

    mqAll, //

    cidAll);

    }

    }

    }

    我们知道RocketMQ不管push还是pull其实底层的实现都是pull,我们看到最后发现他会根据topic和group从broker那里获取出来所有cunsumer client,如果clientId相同,那么在broker上面只有一个,获取出来的是一样,那么拉取的MessageQueue就是一样的。于是我们就可以给consumer的instanceName设置一个随机值

    场景二:线程剧增

    问题2: 设置完随机值以后确实不重复消费了,但是发现服务器的线程飙升。

    排查:jstack下来线上日志,发现里面有很多netty以及rocketmq相关的线程,于是我们再次进到源码中。这里我就不详细跟踪代码了

    473f94c7205b76c2608583bac43098be.png

    我们从这里可以看到consumer端起了很多线程,报错与broker建立链接的线程,这里面会级联产生多个netty相关的线程,然后是定时任务的线程,以及拉取消息的线程和负载均衡的线程。于是我们把instanceName的随机性与服务绑定,而不是与tag绑定,这样就可以做到一台服务器以他instance

    结论

    对于同一个jvm实例我们只需要一个instance实例即可,对于多个jvm我们要区分,不然集群消费会隐式的变为广播消费

    参考

    展开全文
  • kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka ...

    之前在讨论顺序消息的一些知识,看到有个读者的留言如下:

    这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/RocketMQ 消费线程模型。

    Kafka

    kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。

    1、每个线程维护一个 KafkaConsumer

    这样相当于一个进程内拥有多个消费者,也可以说消费组内成员是有多个线程内的 KafkaConsumer 组成的。

    image

    但其实这个消费模型是存在很大问题的,从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大,项目中一般不用该线程模型去消费。

    2、单 KafkaConsumer 实例 + 多 worker 线程

    针对第一个线程模型的缺点,我们可采取 KafkaConsumer 实例与消息消费逻辑解耦,把消息消费逻辑放入单独的线程中去处理,线程模型如下:

    image

    从消费线程模型可看出,当 KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性,在公司内部使用的多线程消费模型就是用的单 KafkaConsumer 实例 + 多 worker 线程模型。

    但这个消费模型由于消费逻辑是利用多线程进行消费的,因此并不能保证其消息的消费顺序,在这里我们可以引入阻塞队列的模型,一个 woker 线程对应一个阻塞队列,线程不断轮训从阻塞队列中获取消息进行消费,对具有相同 key 的消息进行取模,并放入相同的队列中,实现顺序消费, 消费模型如下:

    image

    但是以上两个消费线程模型,存在一个问题:

    在消费过程中,如果 Kafka 消费组发生重平衡,此时的分区被分配给其它消费组了,如果拉取回来的消息没有被消费,虽然 Kakfa 可以实现 ConsumerRebalanceListener 接口,在新一轮重平衡前主动提交消费偏移量,但这貌似解决不了未消费的消息被打乱顺序的可能性?

    因此在消费前,还需要主动进行判断此分区是否被分配给其它消费者处理,并且还需要锁定该分区在消费当中不能被分配到其它消费者中(但 kafka 目前做不到这一点)。

    参考 RocketMQ 的做法:

    在消费前主动调用 ProcessQueue#isDropped 方法判断队列是否已过期,并且对该队列进行加锁处理(向 broker 端请求该队列加锁)。

    RocketMQ

    RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已为你准备好了你的需求,它本身的消费模型就是单 consumer 实例 + 多 worker 线程模型,有兴趣的小伙伴可以从以下方法观摩 RocketMQ 的消费逻辑:

    org.apache.rocketmq.client.impl.consumer.PullMessageService#run
    

    RocketMQ 会为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理,ConsumeMessageService 有两个子接口:

    // 并发消息消费逻辑实现类
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
    // 顺序消息消费逻辑实现类
    org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
    

    其中,ConsumeMessageConcurrentlyService 内部有一个线程池,用于并发消费,同样地,如果需要顺序消费,那么 RocketMQ 提供了 ConsumeMessageOrderlyService 类进行顺序消息消费处理。

    经过对 Kafka 消费线程模型的思考之后,从 ConsumeMessageOrderlyService 源码中能够看出 RocketMQ 能够实现局部消费顺序,我认为主要有以下两点:

    1)RocketMQ 会为每个消息队列建一个对象锁,这样只要线程池中有该消息队列在处理,则需等待处理完才能进行下一次消费,保证在当前 Consumer 内,同一队列的消息进行串行消费。

    2)向 Broker 端请求锁定当前顺序消费的队列,防止在消费过程中被分配给其它消费者处理从而打乱消费顺序。

    总结

    经过这篇文章的分析后,尝试回答文章开头的那个问题:

    1)多分区的情况下:

    如果想要保证 Kafka 在消费时要保证消费的顺序性,可以使用每个线程维护一个 KafkaConsumer 实例,并且是一条一条地去拉取消息并进行消费(防止重平衡时有可能打乱消费顺序);对于能容忍消息短暂乱序的业务(话说回来, Kafka 集群也不能保证严格的消息顺序),可以使用单 KafkaConsumer 实例 + 多 worker 线程 + 一条线程对应一个阻塞队列消费线程模型。

    1)单分区的情况下:

    由于单分区不存在重平衡问题,以上两个线程模型的都可以保证消费的顺序性。

    另外如果是 RocketMQ,使用 MessageListenerOrderly 监听消费可保证消息消费顺序。

    很多人也有这个疑问:既然 Kafka 和 RocketMQ 都不能保证严格的顺序消息,那么顺序消费还有意义吗?

    一般来说普通的的顺序消息能够满足大部分业务场景,如果业务能够容忍集群异常状态下消息短暂不一致的情况,则不需要严格的顺序消息。

    如果你对文章还有什么疑问和补充或者发现文中有错误的地方,欢迎留言,我们一起探讨。

    展开全文
  • Rocketmq 消费者在高峰期希望手动减少消费线程数,通过DefaultMQPushConsumer.updateCorePoolSize方法可以调用内部的ThreadPoolExecutor.setCorePoolSize设置多线程核心线程数。 那么是否能够通过调整参数动态调整...

    背景

    Rocketmq 消费者在高峰期希望手动减少消费线程数,通过DefaultMQPushConsumer.updateCorePoolSize方法可以调用内部的ThreadPoolExecutor.setCorePoolSize设置多线程核心线程数。

    那么是否能够通过调整参数动态调整Rocketmq消费者呢。

    结论

    • 多线程ThreadPoolExecutor.setCorePoolSize可以修改核心线程数,但是减少核心线程数不一定生效
    • 核心线程销毁的前提是至少在keepAliveTime内没有新的任务提交

    动态调整消费线程实现方案

    • 可以通过调整核心线程数减少RocketMQ 消费线程数
      1. 先挂起消费者consumer.suspend()
      2. 调用consumer.updateCorePoolSize更新核心线程数
      3. 然后休眠至少1分钟以上,等任务全部消费完成,1分钟是基于ConsumeMessageConcurrentlyService中创建线程池默认参数1000*60 TimeUnit.MILLISECONDS得到的, 还需要加上本地队列堆积任务消费完成时间
      4. 恢复消费者consumer.resume()
    consumer.suspend();
    consumer.updateCorePoolSize(3);
    try {
    	TimeUnit.SECONDS.sleep(65000L);
     } catch (Exception e) {
    	log.error("InterruptException", e);
    }
    consumer.resume();
    
    • 增加消费线程数,直接通过consumer.updateCorePoolSize方法就可以实现

    测试

    ThreadTest.java

    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.ThreadFactoryImpl;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class ThreadTest {
       public static void main(String[] args) throws InterruptedException {
           ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                   10,
                   50,
                   1000 * 60,
                   TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<>(),
                   new ThreadFactoryImpl("test" + "_" + "ConsumeMessageThread_"));
    
           for (int i = 0; i < 1000; i++) {
               threadPoolExecutor.submit(new Runnable() {
                   @SneakyThrows
                   @Override
                   public void run() {
                       Thread.sleep(5);
                       log.info("hello");
                   }
               });
           }
           log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
           Thread.sleep(10000L);
           threadPoolExecutor.setCorePoolSize(3);
           log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
           // Thread.sleep(1000*60); // 如果休眠时间大于KeepAliveTime将会只有3个线程
           Thread.sleep(1000L);  // 休眠时间不够时仍然有10个线程
           for (int i = 0; i < 1000; i++) {
               threadPoolExecutor.submit(new Runnable() {
                   @SneakyThrows
                   @Override
                   public void run() {
                       Thread.sleep(10);
                       log.info("hello2");
                   }
               });
           }
       }
    }
    

    实验证明setCorePoolSize在设置为3个线程以后,在第二批任务提交还是有10个线程在工作, 但是如果在第二批任务提交前休眠时间大于keepAliveTime以后则只会有3个工作线程

    原理

    源码部分主要看是ThreadPoolExecutor中的workers变量,setCorePoolSize()方法,runWorker()方法,getTask()方法

    • 一个work在执行runWorker()方法时只有在获取任务getTask()方法返回null以后才会终止循环,然后销毁
    • getTask()方法从任务队列中拿取任务等待keepAliveTime超时以后才会有可能返回null
        // 工作workers, work只有在获取任务超时以后才会从workers中删除
        private final HashSet<Worker> workers = new HashSet<Worker>();
        
        public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
            // 减少核心线程数以后进入interruptIdleWorkers方法
                interruptIdleWorkers();
            else if (delta > 0) {
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    
         private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            // 在interruptIdleWorkers方法中只是将work的线程中断,并没有从workers删除
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
       final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
               // 重点是getTask()方法获取task失败才会中断循环
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
       private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 超时以后进入这里的if返回null然后work才会被销毁
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
       private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                // 这里才真正将worker删除
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }
    
    展开全文
  • rocketMq设置消费者单线程消费

    千次阅读 2019-12-18 21:31:02
    // 订阅指定topic下所有消息,注意:一个consumer对象可以订阅个topic consumer.subscribe(rocketMQConfig.getSmsReceiptTopic(), "*"); consumer.setVipChannelEnabled(false); // 设置Consumer第一次启动是从...
  • rocketmq批量消费

    万次阅读 2019-12-02 19:31:28
    rocketmq默认就是可以批量消费的,但需要设置个参数一起配合。 我们只需要知道他是怎么消费的,就可以很精准的设置他的批量消费参数。 我们看看DefaultMQPushConsumer源码中的这几个参数: /** * 消费消息线程,...
  • ClancyH2019-03-12 14:05:37 +08:00发现问题原因了,应该是处理程序挂在通过 Druid 获取数据库连接上了,对应的线程信息"Thread-0" #10 prio=5 os_prio=0 tid=0x00007fd8d4491000 nid=0x3d0e waiting on condition ...
  • 每个consumer都有一个pullMessageSrvice,启动线程接受pullReques. 第一个pullReuqest是balanceService里发出的. RequestCode.pull_message 最终汇总到pullAPIWrapper.pullKernelImpl去请求 ? 为什么说拉数据都是在一...
  • RocketMQ消息支持的模式:  消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay。 消息同步发送:  普通消息的发送和接收在前面已经演示过了,在前面的案例中是基于同步消息发送模式。也...
  • RocketMQ多线程场景生产和消费TPS测试

    千次阅读 2016-06-30 23:09:00
    多线程环境下,测试producer端的TPS 和 consumer端的TPS。   2、评测指标 (1)生产者producer TPS、线程个数、发送成功数量、发送失败数量、接收成功数量、接收失败数量、发送消息成功总耗时。 ...
  • 前言RocketMQ配置中有一个设置项为transferMsgByHeap,即是否通过堆内存传输数据。在文章“RocketMQ存储--同步刷盘和异步刷盘”中对其进行过梳理。那transf...
  • RocketMQ是用了两把锁: 1)向 Broker 端请求锁定当前顺序消费的队列,防止在消费过程中被分配...kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsum.
  • RocketMQ之Broker线程模型

    千次阅读 2019-03-26 09:51:01
    1.概述 RocketMQ是阿里开源的一款高性能、高吞吐量的分布式...网上关于RocketMQ各方面(如RPC通信、消息存储、消费发送、消息消费等)的介绍非常,也比较详细,本文在此就不重复了,感兴趣的同学可以百度一下。...
  • RocketMQ顺序消费

    2020-05-10 17:59:10
    一、rocketmq顺序消费的原理 1、消息的有序性是指消息的消费顺序能够与消息的发送顺序一致。...消费的时候通过一个队列只会被一个线程取到 ,第二个线程无法访问这个队列 来保证队列有序性。rocketmq可以
  • RocketMQ消费模式

    2021-01-14 06:41:46
    RocketMQ消费模式有2种查看一下源码,在默认情况下,就是集群消费(CLUSTERING)。另一种消费模式,是广播消费(BROADCASTING)。其实,对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点...
  • 正确处理kafka多线程消费的姿势

    万次阅读 多人点赞 2019-08-03 14:21:12
    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息。通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步、解耦、削峰等几大好处,而且开始考虑最大...
  • RocketMQ消费优化

    千次阅读 2019-01-03 20:42:18
    消息团队一直致力于RocketMQ的性能优化,双十一前进行了低延时(毛刺)优化,保障了双十一万亿消息的流转如丝般顺滑,在2016年双十一种,MetaQ以接近万亿的消息总量支撑着全集团数千个应用,在系统解耦、削峰填谷、...
  • RocketMQ中的线程名称

    2019-12-15 01:14:43
    Boss线程池 NioEventLoopGroup NettyNIOBoss_1 Worker线程池 NioEventLoopGroup NettyServerNIOSelector_3_1 NettyServerNIOSelector_3_2 NettyServerNIOSelector_3_3 Executor线程池 DefaultEventExecutorGroup ...
  • rocketmq 两个线程同时消费一个消息

    千次阅读 2017-12-27 11:14:00
    运维同事 部署时 有一个实例用root账户重启的, 然后该实例出现两个线程同时消费一个消息的情况,并且后台查看该消费group delay很。 图一 host1机器两个实例 一个为root启动 图二 rocketmq 监控后台 该...
  • rocketMq - 并发消费过程

    千次阅读 2018-03-20 19:26:00
    rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。 并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发...
  • 并发消费使用示例: public class BalanceComuser { public static void main(String[] args) throws Exception { // 实例化消息生产者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer...
  • RocketMQ消费

    2021-01-17 15:53:54
    但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者...
  • springboot整合rocketmq实现顺序消费

    千次阅读 2020-03-18 12:10:57
    消息队列依然成为当下非常火热的中间件,而rocketmq作为...rocketmq在发送消息的时候,是将消息发送到不同的队列(queue,也有人称之为分区)中,然后消费端从个队列中读取消息进行消费,很明显,在这种全局模式...
  • 保证RocketMQ消息顺序消费的关键主要有以下几点: 保证生产者消费者用同一topic 保证生产者消费者用同一topic下的同一个queue(默认一个topic下有4个queue) 发消息的时候用一个线程去发送消息 消费的时候 只用...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,237
精华内容 4,494
关键字:

多线程消费rocketmq