精华内容
下载资源
问答
  • RocketMQ

    万次阅读 多人点赞 2019-07-31 19:17:34
    一、RocketMQ简介 1.1、介绍 RocketMQ是一款分布式、队列模型的消息中间件,由Metaq3.X版本改名而来,RocketMQ并不遵循包括JMS规范在内的任何规范,但是参考了各种规范不同类产品的设计思想,自己有一套...

    此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!
    #一、RocketMQ简介
    ##1.1、介绍
    RocketMQ是一款分布式、队列模型的消息中间件,由Metaq3.X版本改名而来,RocketMQ并不遵循包括JMS规范在内的任何规范,但是参考了各种规范不同类产品的设计思想,自己有一套自定义的机制,简单来说就是使用订阅主题的方式去发送和接收任务,但是支持集群和广播两种消息模式。开源项目地址:https://github.com/apache/rocketmq
    具有以下特点:
    1、能够保证严格的消息顺序
    2、提供丰富的消息拉取模式
    3、高效的订阅者水平扩展能力
    4、实时的消息订阅机制
    5、亿级消息堆积能力
    选用理由:
    1、强调集群无单点,可扩展,任意一点高可用,水平可扩展。
    2、海量消息堆积能力,消息堆积后,写入低延迟。
    3、支持上万个队列。
    4、消息失败重试机制。
    5、消息可查询。
    6、开源社区活跃。
    7、成熟度(历经多次天猫双十一海量消息考验)
    ##1.2、专业术语
    1、Producer
    消息生产者,负责产生消息,一般由业务系统负责产生消息。
    2、Consumer
    消息消费者,负责消费消息,一般是后台系统负责异步消费。
    3、Push Consumer
    Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
    4、Pull Consumer
    Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
    5、Producer Group
    一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
    6、Consumer Group
    一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
    7、Broker
    消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。
    8、广播消费
    一条消息被多个 Consumer 消费,即使返些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。
    在 CORBA Notification 规范中,消费方式都属于广播消费。
    在 JMS 规范中,相当于 JMS publish/subscribe model
    9、集群消费
    一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
    在 CORBA Notification 规范中,无此消费方式。
    在 JMS 规范中,JMS point-to-point model 与之类似,但是 RocketMQ 的集群消费功能大等于 PTP 模型。
    因为 RocketMQ 单个 Consumer Group 内的消费者类似于 PTP,但是一个 Topic/Queue 可以被多个 Consumer Group 消费。
    10、顺序消息
    消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要挃的是尿部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序収送,丏収送到同一个队列,返样 Consumer 就可以挄照 Producer 发送的顺序去消费消息。
    11、普通顺序消息
    顺序消息的一种,正常情冴下可以保证完全的顺序消息,但是一旦収生通信异常,Broker 重启,由亍队列总数収生发化,哈希叏模后定位的队列会发化,产生短暂的消息顺序丌一致。如果业务能容忍在集群异常情冴(如某个 Broker 宕机戒者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
    12、严格顺序消息
    顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器丌可用,则整个集群都丌可用,服务可用性大大降低。
    如果服务器部署为同步双写模式,此缺陷可通过备机自劢切换为主避免,丌过仍然会存在几分钟的服务丌可用。(依赖同步双写,主备自劢切换,自劢切换功能目前迓未实现)
    目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
    13、Message Queue
    在 RocketMQ 中,所有消息队列都是持丽化,长度无限的数据结构,所谓长度无限是挃队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
    也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。
    ##1.3、关键概念
    ###1.3.1、主题与标签
    主题Topic:第一级消息类型,书的标题;
    标签Tags:第二级消息类型,书的目录,可以基于Tag做简单的消息过滤,通常这已经可以满足90%的需求了,如果有更复杂的过滤场景,就需要使用rocketmq-filtersrv组件了。
    例如,主题是订单交易,那么标签可以是订单交易-创建、订单交易-付款、订单交易-完成。
    通过查看源码就可以发现:一个主题在MQ上默认会有4个Queue队列来存储该主题上的消息,Queue的数量也可以在创建主题时指定。这也是为什么,当MQ采用双Master集群方式时,如果向MQ发送100条消息,其中52条在BrokerA上,48条在BrokerB上。因为4条发给A,4条发给B…依次循环下去,最后4条是发给了A,所以A比B多存储了4条消息。
    ###1.3.2、群组
    这里写图片描述
    生产组:用于消息的发送的群组,官方推荐:一个生产组理应发送的是同一主题的消息,消息子类型再使用Tags来区分;
    消费组:用于消息的订阅处理的群组,官方推荐:一个消费组理应消费的是同一主题的消息,再使用Tags在Broker做消息过滤。
    生产组和消费组极大地方便了扩缩机器、增减处理能力等,同时只有群组名相同才会被认为是一个集群组的,RocketMQ默认情况下采用集群消费模式,所以消息每次只会随机的发给每个消费群组中的一员,这也体现了RocketMQ集群无单点、水平可扩展、任意一点高可用、支持负载均衡等特点。
    ##1.4、RocketMQ核心模块
    rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。
    rocketmq-client:提供发送、接受消息的客户端API。
    rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
    rocketmq-common:通用的一些类,方法,数据结构等。
    rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
    rocketmq-store:消息、索引存储等。
    rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!【一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件】。
    rocketmq-tools:命令行工具。
    #二、RocketMQ示例
    ##2.1、RocketMQ部署–双master方式
    可参考我的博文:“RocketMQ部署–双master方式”。
    ##2.2、HelloWorld示例
    ###2.2.1、生产者

    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
    	public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    		//实例化生产者,实例化时需要指定生产组名
    		DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
    		//设置namesrc地址,有多个的话用";"隔开
    		producer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
    		//启动生产者
    		producer.start();
    		for(int i=1;i<=100;i++){
    			//创建一条消息,指定了消息的主题topic、标签tag、消息的内容
    			Message msg = new Message("TopicQuickStart", "TagA", ("Hello RocketMQ "+i).getBytes());
    			//发送消息
    			SendResult sendResult = producer.send(msg);
    			System.out.println(sendResult);
    		}
    		//关闭生产者,main方法主线程结束,程序终止
    		producer.shutdown();
    	}
    }
    

    ###2.2.2、消费者

    import java.util.List;
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
        	//实例化消费者,实例化时需要指定消费组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            //设置namesrc地址,有多个的话用";"隔开
            consumer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //设置每次消费的消息最大数量,默认是1,即一条条拉取
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置订阅的消息主题topic和标签tags,这里订阅TopicQuickStart主题下的所有消息,所以会收到上面生产者发送的该主题下标签为TagA的消息
            consumer.subscribe("TopicQuickStart", "*");
            //注册消费监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                	//如果不设置每次消费的消息最大数量,这里的msgs里只会有一条
                	System.out.println("消息条数:"+msgs.size());
                	for(MessageExt msg : msgs){
                		System.out.println(Thread.currentThread().getName()+"收到消息:topic:"+msg.getTopic()+",tags:"+msg.getTags()+",msg:"+new String(msg.getBody()));
                	}
                	//回复RocketMQ,这条消息消费成功,如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER,即表明消息消费失败,那RocketMQ会对这条消息进行重发操作
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
     //启动消费者,main方法主线程结束后,程序不会停止,进入阻塞状态,来一条消息就触发一次监听事件
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

    可以执行多次上面消费者的main方法,也就是启动多个这样的消费者,因为在一个群组里,消息每次只会发送给群组里的一个成员,所以假设有100条消息,启动了两个同一群组的消费者,那么每个消费者各消费50条消息。可见,RocketMQ自动完成了相同群组下的消费者的负载均衡操作,而且如果想增减消费者,只需启动或者关闭消费者即可,无需任何配置,水平可扩展性好!
    如果要切换成广播消费模式,每个消费端都需进行下面的设置:
    consumer.setMessageModel(MessageModel.BROADCASTING);//设置为广播消费模式
    这样即使是同一个消费组的消费者,也都会收到订阅的所有消息,不会进行均衡消费。
    ##2.3、两类Consumer
    在RocketMQ里,Consumer分为两类:MQPullConsumer和MQPushConsumer。其实两种都是拉模式(pull),即Consumer轮询从broker拉取消息。
    push方式就是上面例子里的消费者,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumerMessage()来消费,对用户而言,感觉消息是被推送过来的。
    pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
    #三、消息重试
    ##3.1、生产端消息重试
    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败,这种消息失败重试我们可以手动设置发送失败重试的次数。

    producer.setRetryTimesWhenSendFailed(3); //设置重试次数
    producer.send(msg, 1000); //发送消息,并设置消息发送超时时间
    

    上面的代码表示消息在1S内没有发送成功就会触发重试,重试最多3次。
    ##3.2、消费端消息重试
    消费端在收到消息并处理完成会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功,如果返回了失败或者没返回就会触发重试,即MQ会把消息再发一遍。所以,发生消费端的消息重试有两种情况:1、返回了ConsumeConcurrentlyStatus.RECONSUME_LATER直接表明消费失败;2、长时间没有返回消息处理状态给MQ导致超时。
    消息重复消费
    值得注意的是,当一个消费组有多个消费者时,其中一个消费者处理消息后长时间没返回,那么MQ就会把这条消息进行重试,会发送给同一消费组的另外一个消费者进行消费。要是这时候之前的消费者又把消息处理结果返回了,那就出现了消息重复消费的问题。
    RocketMQ无法避免消息重复,如果业务对消息重复非常敏感,务必要在业务层面去重,这就要求我们一定要做好消费端幂等处理。比如每条消息都有一个唯一编号,每处理完一条消息就记录日志,当消息再来的时候判断一下本条消息是否处理过。需要注意的是,如果消费端处理消息后的结果保存在DB中,那记录日志的操作也一定要保存在这个DB中,这样才能保证事务,其中有一步失败了就会一起回滚。倘若把消息处理后的结果存在mysql里,日志却记录在redis中,然后每次消息再来的时候去redis中查看是否已经处理过,这样是错误的做法,本以为放redis里再去查询的时候速度快,可以提升性能,但是却导致事务的一致性无法保证(比如mysql操作成功了而redis操作失败了那怎么回滚呢),至少目前为止单靠spring的事务管理无法回滚两个数据源的操作,需要增加其他的组件,所以建议都在一个DB中操作。
    #四、集群
    推荐的几种 Broker 集群部署方式,这里的 Slave 不可写,但可读,类似于 Mysql 主备方式。当主节点挂了,就可以访问从节点来获取之前未消费的数据。但是因为Slave是只读的,所以不会接收生产者生产的新数据,新数据只会存储到其他的Broker主备节点上,直到宕机的主节点重新启动了才会接收新数据。至少截止到v3.2.4版本,RocketMQ还未能支持主备自动切换功能。
    ##4.1、单个 Master
    返种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
    ##4.2、多 Master 模式
    一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
    优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
    缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
    ##4.3、多 Master 多 Slave 模式,异步复制
    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
    缺点:Master 宕机、磁盘损坏等情况,会丢失少量消息。
    ##4.4、多 Master 多 Slave 模式,同步双写
    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写成功,才会向应用返回成功。
    优点:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
    缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
    #五、顺序消费
    普通模式下,使用传统的send发送消息即可,比如2.2里的示例代码,但是这种模式下不能保证消息消费顺序的一致性。假如我们在网购的时候,需要下单,那么下单需要有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成,也就是这个三个环节要有顺序,这个订单才有意义,这种场景下就需要顺序消费。
    世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!
    那通过RocketMQ怎么实现顺序消费的呢?
    答:需要顺序消费的消息在生成端必须发送到同一个主题的同一个队列中(一个主题默认4个队列),比如创建订单1、订单1付款,订单1完成这三条消息就需要在同一个队列中,创建订单2、订单2付款,订单2完成这三条消息也需要在同一队列中,但订单1和订单2的队列可以不是同一个队列。然后消费端消费时必须实现MessageListenerOrderly接口以保证一个队列只会被同一个消费端的一个线程所消费,因为队列先进先出的原则,就可以保证顺序消费了。
    比如有1个生产端和2个消费端,要保证顺序消费,示例代码如下:
    ##5.1、生产者

    public class Producer {  
        public static void main(String[] args) {  
            try {  
                DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
                producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
                producer.start();  
                for (int i = 1; i <= 5; i++) {  
      // 主题:TopicOrderTest,标签:order_1,KEY:"KEY" + i,消息内容:"order_1 " + i
                    Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
      // RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
    		// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
    		// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  //arg就是producer.send方法的最后一个参数,这里是0
                            int index = id % mqs.size();  //队列数量没有事先设置那就是4,0%4=0
                            return mqs.get(index);  //返回下标为0的队列,即这5条消息存放在0号队列中
                        }  
                    }, 0);  
                    System.out.println(sendResult);  
                }  
                for (int i = 1; i <= 5; i++) {  
                    Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());  
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  //返回下标为1的队列,即这5条消息存放在1号队列中
                        }  
                    }, 1);  
                    System.out.println(sendResult);  
                }  
                for (int i = 1; i <= 5; i++) {  
                    Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());  
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  //返回下标为2的队列,即这5条消息存放在2号队列中
                        }  
                    }, 2);  
                    System.out.println(sendResult);  
                }  
                producer.shutdown();  
            } catch (MQClientException e) {  
                e.printStackTrace();  
            } catch (RemotingException e) {  
                e.printStackTrace();  
            } catch (MQBrokerException e) {  
                e.printStackTrace();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }
    

    ##5.2、消费者1

    public class Consumer1 {    
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicOrderTest", "*");  
            /** 
             * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 
      * 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
             */  
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
            consumer.start();  
            System.out.println("Consumer1 Started.");  
        }  
    }
    

    ##5.3、消费者2

    public class Consumer2 {  
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicOrderTest", "*");  
            /** 
             * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 
      * 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
             */  
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
            consumer.start();  
            System.out.println("Consumer2 Started.");  
        }  
    }
    

    先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息。
    Consumer1消费情况如图,都按照顺序执行了
    这里写图片描述
    Consumer2消费情况如图,也都按照顺序执行了
    这里写图片描述
    #六、事务消费
    考虑生活中的场景:我们去北京庆丰包子铺吃炒肝,先去营业员那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排队拿炒肝(Action2)。思考2个问题:第一,为什么不在付款的同时,给顾客炒肝?如果这样的话,会增加处理时间,使得后面的顾客等待时间变长,相当于降低了接待顾客的能力(降低了系统的QPS)。第二,付了款,拿到的是Ticket,顾客为什么会接受?从心理上说,顾客相信Ticket会兑现炒肝。事实上也是如此,就算在最后炒肝没了,或者断电断水(系统出现异常),顾客依然可以通过Ticket进行退款操作,这样都不会有什么损失!(虽然这么说,但是实际上包子铺最大化了它的利益,如果炒肝真的没了,浪费了顾客的时间,不过顾客顶多发发牢骚,最后接受)
    生活已经告诉我们处理分布式事务,保证数据最终一致性的思路!这个Ticket(凭证)其实就是消息!
    通过RocketMQ可以实现分布式事务,比如银行A向银行B转账,银行A扣款1000,那银行B一定要加1000才行,通过RocketMQ的执行逻辑如下:
    这里写图片描述
    如上图所示,消息数据独立存储,业务和消息解耦,实质上消息的发送有2次,一条是转账消息,另一条是确认消息。发送转账消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息。
    到这里,我们先来看看基于RocketMQ的代码:
    ##6.1、消费者

    public class Consumer {  
        public static void main(String[] args) throws InterruptedException, MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeMessageBatchMaxSize(10);  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicTransactionTest", "*");  
            consumer.registerMessageListener(new MessageListenerConcurrently() {  
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                    try {  
                        for (MessageExt msg : msgs) {  
                            System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                        }  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
                    }  
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
                }  
            }); 
            consumer.start(); 
            System.out.println("transaction_Consumer Started.");  
        }  
    }
    

    ##6.2、生产者
    ###6.2.1、生产者

    public class Producer {  
        public static void main(String[] args) throws MQClientException, InterruptedException {  
            TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
            TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            // 事务回查最小并发数  
            producer.setCheckThreadPoolMinSize(2);  
            // 事务回查最大并发数  
            producer.setCheckThreadPoolMaxSize(2);  
            // 队列数  
            producer.setCheckRequestHoldMax(2000);  
            producer.setTransactionCheckListener(transactionCheckListener);  
            producer.start();   
            TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();  
            for (int i = 1; i <= 2; i++) {  
                try {  
                    Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,  
                            ("Hello RocketMQ " + i).getBytes());  
      //发送消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息
                    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);  
                    System.out.println(sendResult);  
                    Thread.sleep(10);  
                } catch (MQClientException e) {  
                    e.printStackTrace();  
                }  
            }  
            for (int i = 0; i < 100000; i++) {  
                Thread.sleep(1000);  
            }  
            producer.shutdown();  
        }  
    }
    

    ###6.2.2、执行本地事务
    TransactionExecuterImpl类用于执行本地事务如下:

    public class TransactionExecuterImpl implements LocalTransactionExecuter {  
        public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {  
            System.out.println("执行本地事务msg = " + new String(msg.getBody()));  
            System.out.println("执行本地事务arg = " + arg);  
            String tags = msg.getTags();  
            if (tags.equals("transaction2")) {  
                System.out.println("======我的操作============,失败了  -进行ROLLBACK");  
                return LocalTransactionState.ROLLBACK_MESSAGE;  //返回失败并发送回滚消息
            }  
            return LocalTransactionState.COMMIT_MESSAGE;  //返回成功并发送确认消息
            // return LocalTransactionState.UNKNOW;  
        }  
    }
    

    ###6.2.3、针对未决事务,MQ服务器回查客户端
    如果因网络问题最后发送确认消息给MQ失败了或者发送了LocalTransactionState.UNKNOW,那事务就一直没能完成,一直处于prepared状态,针对未决事务,MQ服务器会回查客户端看看到底有没有完成(目前已经被阉割啦),这时会调用TransactionCheckListener接口,所以TransactionCheckListenerImpl类实现了这个接口用于回查,代码如下:

    public class TransactionCheckListenerImpl implements TransactionCheckListener {  
        //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。  
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {  
            System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));  
            // return LocalTransactionState.ROLLBACK_MESSAGE;  
            return LocalTransactionState.COMMIT_MESSAGE;  
            // return LocalTransactionState.UNKNOW;  
        }  
    }
    

    producer端发送数据到MQ,并且处理本地事物,这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据,第二个数据失败了,不会被消费。
    因为MQ回查客户端的功能被阿里去除了,导致即使返回了LocalTransactionState.UNKNOW,TransactionCheckListenerImpl里的代码也不会被触发,所以目前事务回查这部分需要自己设计实现。
    #七、参考文章
    RocketMQ重点原理讲解:https://www.jianshu.com/p/453c6e7ff81c

    此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!

    展开全文
  • Rocketmq

    2020-03-08 20:01:35
    https://github.com/apache/rocketmq 控制台:https://github.com/apache/rocketmq-externals Spring、Springboot整合:https://github.com/apache/rocketmq-spring 官网 http://rocketmq.apache.org 文章 性能...
    展开全文
  • RocketMq

    万次阅读 2018-09-30 12:07:34
    RocketMQ简介 1.RocketMQ是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,实现了业务消峰、分布式事务的优秀框架。 ...

    RocketMQ简介

    1.RocketMQ是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,实现了业务消峰、分布式事务的优秀框架。

    2.其底层代码编写清晰优秀,采用Netty NIO框架进行数据通信

    3.摒弃了Zookeeper,内部使用更轻量级的NameServer进行网络路由,提高服务性能,并且支持消息失败重试机制。

    4.天然支持集群模型,消费者负载均衡、水平扩展能力,支持广播模式和集群模式。

    5.采用零拷贝的原理、顺序写盘、支持亿级消息堆积能力。

    6.提供丰富的消息机制,如顺序消息、事务消息等。

    产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。

    MQ 目前提供 TCP 、MQTT 两种协议层面的接入方式,支持 Java、C++ 以及 .NET 不同语言,方便不同编程语言开发的应用快速接入 MQ 消息云服务。 用户可以将应用部署在阿里云 ECS、企业自建云,或者嵌入到移动端、物联网设备中与 MQ 建立连接进行消息收发,同时本地开发者也可以通过公网接入 MQ 服务进行消息收发。

    功能概览图

    多协议接入

    支持 MQTT 协议:支持主动推送模型,多级 Topic 模型支持一次触达 1000万+ 终端,可广泛应用于物联网和社交即时通信场景。

    支持 TCP 协议:区别于 HTTP 简单的接入方式,提供更为专业、可靠、稳定的 TCP 协议的 SDK 接入。

    特色功能

    事务消息:实现类似 X/Open XA 的分布事务功能,以达到事务最终一致性状态。

    定时(延时)消息:允许消息生产者指定消息进行定时(延时)投递,最长支持40天。

    大消息:目前默认支持最大 256KB 消息,华北2 地域支持最大 4MB 消息。

    消息轨迹:通过消息轨迹,用户能清晰定位消息从发布者发出,经由 MQ 服务端,投递给消息订阅者的完整链路,方便定位排查问题。

    广播消费:允许一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。

    顺序消息:允许消息消费者按照消息发送的顺序对消息进行消费。

    重置消费进度:根据时间重置消费进度,允许用户进行消息回溯或者丢弃堆积消息。

    消息收发模型:

    MQ基本概念:

    Message:消息,消息队列中信息传递的载体。

    Message ID:消息的全局唯一标识,由 MQ 系统自动生成,唯一标识某条消息。

    Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。

    Topic:消息主题,一级消息类型,通过 Topic 对消息进行分类。

    Tag:消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。

    Producer:消息生产者,也称为消息发布者,负责生产并发送消息。

    Producer ID:一类 Producer 的标识,这类 Producer 通常生产并发送一类消息,且发送逻辑一致。

    Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。

    Consumer ID:一类 Consumer 的标识,这类 Consumer 通常接收并消费一类消息,且消费逻辑一致。

     

    RocketMQ的4个组件

    分别是nameserver、broker、producer和consumer。

    nameserver: 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。

    Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。

    Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。

    Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。

     

    集群部署结构

    工作流程

    1,启动Nameserver,Nameserver起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。

    2,Broker启动,跟所有的Nameserver保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,Nameserver集群中就有Topic跟Broker的映射关系。

    3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。

    4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Nameserver中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。

    5,Consumer跟Producer类似。跟其中一台Nameserver建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

    模块功能特性

    Nameserver

    1、Nameserver用于存储Topic、Broker关系信息,功能简单,稳定性高。多个Namesrv之间相互没有通信,单台Namesrv宕机不影响其他Namesrv与集群;即使整个Namesrv集群宕机,已经正常工作的Producer,Consumer,Broker仍然能正常工作,但新起的Producer, Consumer,Broker就无法工作。

    2、Nameserver压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesr发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。

    Broker

    1高并发读写服务

    Broker的高并发读写主要是依靠以下两点:

    消息顺序写:所有Topic数据同时只会写一个文件,一个文件满1G,再写新文件,真正的顺序写盘,使得发消息TPS大幅提高。

    消息随机读:RocketMQ尽可能让读命中系统pagecache,因为操作系统访问pagecache时,即使只访问1K的消息,系统也会提前预读出更多的数据,在下次读时就可能命中pagecache,减少IO操作。

    2负载均衡与动态伸缩

    负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。

    动态伸缩能力(非顺序消息):Broker的伸缩性体现在两个维度:Topic, Broker。

    Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。

    Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后向Namesrv注册,Producer、Consumer通过Namesrv发现新Broker,立即跟该Broker直连,收发消息。

    3高可用&高可靠

    高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。

    高可靠:所有发往broker的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电

    4Broker与Namesrv的心跳机制

    单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息,如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。

    生产者(Producer)

    Producer启动时,也需要指定Namesrv的地址,从Namesrv集群中选一台建立长连接。如果该Namesrv宕机,会自动连其他Namesrv。直到有可用的Namesrv为止。

    生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。

    生产者端的负载均衡

    生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。

    这里需要注意一点:假如某个Broker宕机,意味生产者最长需要30秒才能感知到。在这期间会向宕机的Broker发送消息。当一条消息发送到某个Broker失败后,会往该broker自动再重发2次,假如还是发送失败,则抛出发送失败异常。业务捕获异常,重新发送即可。客户端里会自动轮询另外一个Broker重新发送,这个对于用户是透明的。

    消费者

    消费者启动时需要指定Namesrv地址,与其中一个Namesrv建立长连接。消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。连接建立后,从namesrv中获取当前消费Topic所涉及的Broker,直连Broker。

    Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。

    消费者端的负载均衡

    先讨论消费者的消费模式,消费者有两种模式消费:集群消费,广播消费。

    广播消费:每个消费者消费Topic下的所有队列。

    集群消费:一个topic可以由同一个ID下所有消费者分担消费。具体例子:假如TopicA有6个队列,某个消费者ID起了2个消费者实例,那么每个消费者负责消费3个队列。如果再增加一个消费者ID相同消费者实例,即当前共有3个消费者同时消费6个队列,那每个消费者负责2个队列的消费。

    消费者端的负载均衡,就是集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列。

    RocketMQ vs. ActiveMQ vs. Kafka

    展开全文
  • rocketmq

    2018-07-09 14:18:49
    由于被测服务的底层架构的变动,由redis+zookeper改成redis+rocketmq。因此需要掌握基本的rocketmq知识。测试环境的后台日志出现异常报错:9876端口unkown;推测可能的两个原因1、端口被临时屏蔽(因测试机器为...
    

    由于被测服务的底层架构的变动,由redis+zookeper改成redis+rocketmq。因此需要掌握基本的rocketmq知识。

    测试环境的后台日志出现异常报错:9876端口unkown;

    推测可能的两个原因1、端口被临时屏蔽(因测试机器为同一区域,不太可能设置防火墙)2、rocketmq异常挂机,需要对rocketmq进行重启。命令如下:

    1、rocketmq的启动
    进入rocketMQ解压目录下的bin文件夹
    启动namesrv服务:nohup sh bin/mqnamesrv & 
    //日志目录:{rocketMQ解压目录}/logs/rocketmqlogs/namesrv.log

    启动broker服务:nohup sh bin/mqbroker &
    //日志目录:{rocketMQ解压目录}/logs/rocketmqlogs/broker.log

    以上的启动日志可以在启动目录下的nohub.out中看到

    2、rocketmq服务关闭

    关闭namesrv服务:sh bin/mqshutdown namesrv

    关闭broker服务 :sh bin/mqshutdown broker

    展开全文
  • Spring Cloud Alibaba 04_使用 RocketMQ 实现消息的生产和消费 RocketMQ 的下载安装和配置 官网下载地址:http://rocketmq.apache.org/dowloading/releases/ 在 /usr/local 下创建 rocket 文件夹,将 rocketmq-all-...
  • SpringBoot整合RocketMq
  • RocketMQ踩坑

    2020-12-22 23:55:42
    今天刚开始学 RocketMQ,在运行的时候 Name Server 就怎么也运行不起来,捣鼓了很长时间才弄好,在这里把解决方法记录下来 版本说明 操作系统:MacOS 10.13.5 RocketMQ版本: 4.5.1 情景复现 下载 rocketmq-all-...
  • RocketMQ3RocketMQ Overview

    2019-12-27 11:30:09
    RocketMQ 是什么 RocketMQ 物理部署结构 RocketMQ 逻辑部署结构
  • 2020年,RocketMQ面试题 -面试题驱动RocketMQ学习

    万次阅读 多人点赞 2019-11-08 12:41:47
    本篇文章持续更新(星期1、3、5),大概有上百道题,用这些题来驱动RocketMQ学习,在面试中也会脱颖而出!! 15 解决订单系统诸多问题的核心技术:消息中间件到底是什么? 问1:什么是“同步”调用? 答: ...
  • RocketMQ吐血总结

    万次阅读 多人点赞 2018-12-09 20:36:36
    RocketMQ吐血总结 架构   概念模型 最基本的概念模型与扩展后段概念模型  存储模型   RocketMQ吐血总结 User Guide RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并...
  • 文章目录Rocketmq整体架构 Rocketmq整体架构 RocketMQ-初体验RocketMQ(01)_RocketMQ初体验中 对 RocketMQ 架构图做了一个简短的介绍 , 接下来,我详细的数一下 如上图 ...
  • RocketMQ——安装RocketMQ

    2020-03-01 23:37:04
    本文在Linux环境下安装RocketMQ RocketMQ 4.x以上版本安装需要JDK1.8 +, Maven 首先在官网下载RocketMQ的源码包,如果没有安装Maven,首先使用wget ...
  • 文章目录RocketMQ集群基本信息目标知识预习发送方式发送结果环境搭建使用Java API操作RocketMQ—Simple Message RocketMQ集群基本信息 右侧的部署模式 ,双机互为主备 如何搭建的,请移步: RocketMQ-初体验...
  • 使用RocketMQ Console源码搭建RocketMQ Console与基本使用
  • Python RocketMQ

    2018-10-23 14:42:03
    python调用java访问RocketMQ,兼容python2和python3。 请安装jpype1
  • Rocketmq学习指南

    2020-10-08 16:51:00
    初学者学习rocketmq的文档 包含3个pdf rocketmq最佳实践pdf, rocketmq原理简介pdf, rocketmq用户指南pdf
  • springboot整合使用rocketMq

    万次阅读 热门讨论 2018-09-11 18:33:22
    前文,我们讲述了rocketMq的基本使用,接下来聊聊如何使用springboot整合使用rocketMq; 1)新建maven工程,工程结构目录如图: constants包下存放着常量信息,这里保存的是自定义的错误码,为将被异常类调用, ...
  • rocketmq-window启动rocketmq

    千次阅读 2020-01-18 10:28:57
    window启动rocketmq 下载 到官网下载最新的压缩包rocketmq-all-4.6.0-bin-release.zip并解压 rocketmq启动顺序: 先启动namesrv 再启动broker 进入rocketmq解压路径的bin目录,启动一个cmd窗口 启动namesrv start ...
  • Linux下安装RocketMQ

    2020-10-22 23:44:49
    RocketMQ
  • Rocketmq编程指南

    2018-06-19 10:47:26
    Rocketmq编程指南Rocketmq编程指南Rocketmq编程指南Rocketmq编程指南
  • RocketMQ用户指南

    2018-04-04 17:55:35
    RocketMQ用户指南RocketMQ用户指南RocketMQ用户指南RocketMQ用户指南RocketMQ用户指南
  • RocketMQ篇.Linux安装rocketmq-console-附件资源

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,463
精华内容 13,785
热门标签
关键字:

rocketmq