精华内容
下载资源
问答
  • 消息
    万次阅读
    2022-07-19 10:53:20

    概念介绍

    • 事务消息:提供类似XA或Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。
    • 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了RocketMQ服务端,但是RocketMQ服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
    • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

    分布式事务消息的优势

    RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

    典型场景

    在淘宝购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅交易订单消息,做相应的业务处理,即可保证最终的数据一致性。

    交互流程

    事务消息交互流程如下图所示。

    1656920164647.png

    事务消息发送步骤如下:

    1. 生产者将半事务消息发送至RocketMQ服务端。
    2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
    3. 生产者开始执行本地事务逻辑。
    4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
    1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

    事务消息回查步骤如下:

    1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

    示例代码

    事务消息生产者

    public enum LocalTransactionState {
        COMMIT_MESSAGE,
        ROLLBACK_MESSAGE,
        UNKNOW,
    }
    

    事务消息发送完成本地事务后,可在execute方法中返回以下三种状态:

    • COMMIT_MESSAGE:提交事务,允许消费者消费该消息。
    • ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
    • UNKNOW:暂时无法判断状态,等待固定时间以后消息队列RocketMQ版服务端根据回查规则向生产者进行消息回查。

    创建事务消息的Producer时必须指定TransactionListener的实现类,处理异常情况下事务消息的回查。

    回查规则:本地事务执行完成后,若服务端收到的本地事务返回状态为TransactionStatus.Unknow,或生产者应用退出导致本地事务未提交任何状态。则服务端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

    回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。

    package com.morris.rocketmq.transaction;
    
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    
    /**
     * 事务消息生产者
     */
    public class TransactionProducer {
    
        public static void main(String[] args) throws Exception {
    
            TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-demo");
            producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
    
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            });
            producer.setExecutorService(executorService);
    
            // 指定事务会查的实现类
            producer.setTransactionListener(new TransactionListener() {
                private final AtomicInteger transactionIndex = new AtomicInteger(0);
    
                private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    int value = transactionIndex.getAndIncrement();
                    System.out.println(Thread.currentThread().getName()+  "-executeLocalTransaction:" + new String(msg.getBody()) + ",value=" + value);
                    int status = value % 3;
                    localTrans.put(msg.getTransactionId(), status);
                    return LocalTransactionState.UNKNOW;
                }
    
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.println(Thread.currentThread().getName()+  "-checkLocalTransaction:" + new String(msg.getBody()));
                    Integer status = localTrans.get(msg.getTransactionId());
                    if (null != status) {
                        switch (status) {
                            case 0:
                                return LocalTransactionState.COMMIT_MESSAGE;
                            case 1:
                                return LocalTransactionState.UNKNOW;
                            case 2:
                                return LocalTransactionState.ROLLBACK_MESSAGE;
                        }
                    }
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
    
            producer.start();
    
            for(int i = 0; i < 10; i++) {
                Message message = new Message("TransactionTopic", ("transactionDemo" + i).getBytes());
                // 发送事务消息
                producer.sendMessageInTransaction(message, i);
                System.out.println(message);
            }
        }
    
    }
    

    第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。

    设置方式如下:

    Message message = new Message();
    message.putUserProperties(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
    

    说明:因为系统默认的回查间隔,第一次消息回查的实际时间会向后有0秒~30秒的浮动。

    例如:指定消息的第一次消息最快回查时间设置为60秒,系统在第58秒时达到定时的回查时间,但设置的60秒未到,所以该消息不在本次回查范围内。等待间隔30秒后,下一次的系统回查时间在第88秒,该消息才符合条件进行第一次回查,距设置的最快回查时间延后了28秒。

    事务消息消费者

    事务消息的Group ID不能与其他类型消息的Group ID共用。与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据Group ID去查询生产者客户端。

    package com.morris.rocketmq.transaction;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    
    /**
     * 事务消息消费者
     */
    public class TranscationConsumer {
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者,指定组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group");
            // 指定Namesrv地址信息.
            consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
            // 订阅Topic
            consumer.subscribe("TransactionTopic", "*");
            //负载均衡模式消费
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // 注册回调函数,处理消息
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            //启动消息者
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

    使用说明

    1. 事务消息不支持延时消息和批量消息。
    2. 事务回查的间隔时间:BrokerConfig.transactionCheckInterval,通过Broker的配置文件设置好。
    3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
    4. 事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。
    5. 事务性消息可能不止一次被检查或消费。
    6. 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
    7. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
    8. 事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。
    更多相关内容
  • 想查看微信好友撤回的消息?Python帮你搞定

    万次阅读 多人点赞 2019-12-01 13:50:59
    要说微信最让人恶心的发明,消息撤回绝对能上榜。 比如你现在正和女朋友用微信聊着天,或者跟自己喜欢的女孩子聊着天,一个不留神,你没注意到对方发的消息就被她及时撤回了,这时你很好奇,好奇她到底发了什么?...

    要说微信最让人恶心的发明,消息撤回绝对能上榜。

    比如你现在正和女朋友用微信聊着天,或者跟自己喜欢的女孩子聊着天,一个不留神,你没注意到对方发的消息就被她及时撤回了,这时你很好奇,好奇她到底发了什么?于是你打算问问她发了什么,结果她回一句"没什么"。这一回复,让你的好奇心更加强烈了,顿时就感觉消息撤回这一功能就是用来折磨人的。

    那么有没有什么办法能够知道你心爱的她(他)到底撤回了什么呢?不要着急,Python帮你搞定。

    模块介绍

    本篇文章将用Python实现微信的防撤回功能,针对微信操作,Python有一个十分强大的库:itchat。相信没有使用过也有所耳闻吧。官方是这样描述它的:

    Project description
    itchat is a open souce wechat api project for personal account.
    
    It enables you to access your personal wechat account through command line.
    

    翻译过来就是:itchat是一个针对个人帐户的开放式微信api项目,它使您可以通过命令行访问您的个人微信帐户。

    既然是针对微信的开发,我们就离不开这个模块的协助,所以,首先下载该模块:

    pip install itchat
    

    也可以在开发工具Pycharm中直接导入该模块,Pycharm会提示你下载。

    模块初体验

    考虑到应该有些人从来没有使用过该模块,这里对该模块进行一个简单的入门。

    1、如何登陆微信

    既然要操作微信,那么摆在我们面前的问题就是如何登录微信,登录微信非常简单,直接看代码:

    import itchat
    
    itchat.login()
    

    没错,一句代码即可完成登录,运行之后就会弹出一个二维码,扫描之后在手机上授权登录,控制台就会提示是否登录成功。

    Login successfully as Y
    

    这样就说明登录成功了。

    这里需要注意一个问题,就是你会发现每次运行程序都要扫描二维码登录,这样未免太麻烦,有没有办法只扫描一次,以后就自动登录了呢?这当然是可以的。

    import itchat
    
    itchat.auto_login(hotReload=True)
    

    通过函数名也能知道该方法可以实现自动登录,运行程序,扫码登录之后会在项目路径下创建一个itchat.pkl文件,该文件用于存储登录的状态,所以千万不要动它,如果你想换一个微信账号登录,就要先把这个文件删除,因为该文件记录的是上一个微信的状态,删除之后即可登录。

    需要注意:这种方式只能保证你在短时间内无需重复登录,时间长了,还是需要重新扫码登录的。

    进行到这里,有些人可能会发现自己的微信登录不上的情况,据我所知,有些新注册的微信和长期不使用的微信是无法登录网页版微信的,所以这里也会导致登录不上。如果登录不上,那也是没有办法的,下面的内容也就没有意义了。

    2、获取好友列表

    登录上微信之后,我们来用一用itchat模块提供的一些api,比如获取好友列表。

    import itchat
    
    itchat.auto_login(hotReload=True)
    friends = itchat.get_friends()  # 好友列表
    print(friends)
    

    使用get_friends()函数即可获取到好友列表的所有好友信息,包括昵称、备注名、地址、个性签名、性别等等。

    这里我随意地复制了一个好友的个人信息,当然由于隐私问题,这里的部分信息我用"*"号代替了,我们重点是分析一下这些信息的内容。比如最开始的UserName,这是用户的唯一标识,相当于身份证号码,你的每个好友都会有这样一个标识,每个好友之间肯定都是不一样的;然后是NickName,这是好友的昵称;HeadImgUrl是好友的头像地址;RemarkName是你对好友的备注名;Province是省份等等,这里就不一一介绍了,感兴趣的话可以自己去了解一下。

    3、如何发送消息给好友

    如何发送一条消息给指定的好友呢?也非常简单:

    import itchat
    
    itchat.auto_login(hotReload=True)
    itchat.send('Hello World', toUserName='@f9e42aafa1175b38b60a0be4d651a34c77f2528d9b7784e7aaf415090eca8fa6')
    

    此时的UserName就派上用场了,也就是好友的唯一标识,这样,我们就给该标识对应的好友发送了一条消息,所以,我们可以这样改进程序:

    import itchat
    
    itchat.auto_login(hotReload=True)
    friends = itchat.get_friends()
    nickName = '诚信通授权渠道商-老曾'
    for i in friends:
        if '诚信通授权渠道商-老曾' == i['NickName']:
            itchat.send('Hello World', toUserName=i['UserName'])
            break
    
    

    这样,就可以指定发送给任意好友,通过好友的昵称在好友列表中进行检索,找到的话,就获取该好友的UserName,然后发送消息,也可以通过对好友的备注名(RemarkName)查找,大家可以自己尝试。

    4、装饰器

    关于itchat模块还有很多功能,这里就不作过多讲解了,我们只讲关于这次程序的知识点,这里是最后一个内容,装饰器。

    关于装饰器,一时半会还讲不清楚,这里只是简单介绍一下,装饰器的作用就是用于拓展原来函数功能的一种函数,目的是在不改变原函数名(或类名)的情况下,给函数增加新的功能。

    例如现在有一个函数fun(),你并不知晓函数的实现原理,你肯定也不能去修改这个函数的代码,而你需要给该函数添加一个输出开始运行时间和结束运行时间的功能,该如何实现呢?这个时候就可以使用装饰器。

    import time
     
    def show_time(fun):
        def inner():
            print(time.time())
            fun()
            print(time.time())
        return inner   
     
    @show_time
    def fun():
        pass
     
    fun()
    
    

    该如何理解这段程序呢?首先@show_time即是使用一个装饰器show_time,此时会将装饰的函数,也就是fun()作为参数传递给装饰器show_time(),我们知道函数作为返回值的话,执行的其实是该函数,所以程序会执行内部函数inner(),此时输出开始运行时间,然后调用fun()函数(原有的功能不能丢),最后输出结束运行时间。这样就通过装饰器实现了一个函数的功能扩展,这也是典型的面向切面编程思想。

    如何获取好友发送的消息

    准备工作做完了,接下来就进入正题了,对于上面的知识点,大家一定要掌握,如果不懂的话,接下来的代码你可能会很懵。

    首先,我们看看该如何获取到好友发送的消息。

    import itchat
    
    itchat.auto_login(hotReload=True)
    
    
    @itchat.msg_register(itchat.content.TEXT)
    def resever_info(msg):
        print(msg)
    
    
    itchat.run() #保持运行
    
    

    itchat模块提供了@itchat.msg_register装饰器来监听消息,比如这里我们自定义了一个resever_info()函数,并用装饰器对消息进行监听,装饰器中传入了itchat.content.TEXT类型,这样监听的就是文本消息,监听到输入之后,装饰器就会将文本消息传入resever_info()的参数中。所以,msg就是监听到的消息内容。

    对于@itchat.msg_register装饰器,它不仅可以监听文本,还可以监听语音、图片、地图、名片、视频等等,为了方便,这里我们导入itchat模块下的content模块中的全部内容,因为这些消息类型都是在该模块下声明的。

    TEXT       = 'Text'
    MAP        = 'Map'
    CARD       = 'Card'
    NOTE       = 'Note'
    SHARING    = 'Sharing'
    PICTURE    = 'Picture'
    RECORDING  = VOICE = 'Recording'
    ATTACHMENT = 'Attachment'
    VIDEO      = 'Video'
    FRIENDS    = 'Friends'
    SYSTEM     = 'System'
    
    INCOME_MSG = [TEXT, MAP, CARD, NOTE, SHARING, PICTURE,
        RECORDING, VOICE, ATTACHMENT, VIDEO, FRIENDS, SYSTEM]
    
    

    还有要注意的地方,最后记得调用itchat的run()函数,保持程序运行,否则程序就直接结束了。

    接下来我们就可以测试一下了,我让我的好友发了一条消息给我,控制台就输出了如下内容:
    在这里插入图片描述

    内容很多,我们只挑重要的看。例如FromUserName,这是发送者的标识;ToUserName,这是接收者的标识;Content,这当然就是文本内容了;CreateTime,这是发送时间;注意最后的两个值:Type,这是消息类型,这里是文本类型Text,然后Text也是文本内容,所以如果想取出好友发送的消息内容的话,用Content和Text都可以。分析过后,取出内容就很简单了:

    import itchat
    import time
    from itchat.content import *  # 导入itchat下的content模块
    
    itchat.auto_login(hotReload=True)
    
    
    @itchat.msg_register(TEXT)
    def resever_info(msg):
        info = msg['Text']  # 取出文本消息
        info_type = msg['Type']  # 取出消息类型
        fromUser = itchat.search_friends(userName=msg['FromUserName'])['NickName']
        ticks = msg['CreateTime']  # 获取信息发送的时间
        time_local = time.localtime(ticks)
        dt = time.strftime("%Y-%m-%d %H:%M:%S", time_local)  # 格式化日期
        print("发送人:" + fromUser + '\n消息类型:' + info_type + '\n发送时间:' + dt + '\n消息内容:' + info)
    
    
    itchat.run()
    
    

    这里用到了time模块,用于格式化日期。

    为了测试方便,我就自己发了一条消息给别人,自己发的消息也是会被监听的,看运行结果:

    发送人:Y
    消息类型:Text
    发送时间:2019-11-28 16:19:13
    消息内容:土鳖
    
    

    再来试试语音和图片能获取到吗?我们回到刚才的代码:

    import itchat
    from itchat.content import *  # 导入itchat下的content模块
    
    itchat.auto_login(hotReload=True)
    
    
    @itchat.msg_register(TEXT)
    def resever_info(msg):
        print(msg)
    
    
    itchat.run()
    
    

    运行之后,发送语音和图片试试,不管怎么发,控制台就是没反应,这是当然的了,我们还没对语音和图片进行监听呢,修改代码:

    import itchat
    from itchat.content import *  # 导入itchat下的content模块
    
    itchat.auto_login(hotReload=True)
    
    
    @itchat.msg_register([TEXT, PICTURE, RECORDING])	#添加了对图片和语音的监听
    def resever_info(msg):
        print(msg)
    
    
    itchat.run()
    
    

    再运行试试,先发送一张图片,再发送一段语音,控制台输出了两段内容,由于篇幅过长,就不贴出来了,无非还是那些信息,发送者,接收者,日期,消息内容等等,这里只需注意图片和语音的内容:

    'Type': 'Picture', 'Text': <function get_download_fn.<locals>.download_fn at 0x0000000003574158>
    'Type': 'Recording', 'Text': <function get_download_fn.<locals>.download_fn at 0x0000000002CFED08>
    
    

    这是一段地址,通过它我们就能够将图片和语音保存起来。

    如何保存好友发送的图片和语音

    下面我们对好友发送的图片和语音进行保存。

    import itchat
    import os
    from itchat.content import *  # 导入itchat下的content模块
    
    itchat.auto_login(hotReload=True)
    temp = 'C:/Users/Administrator/Desktop/CrawlerDemo' + '/' + '撤回的消息'
    # 如果不存在该文件夹,就创建
    if not os.path.exists(temp):
        os.mkdir(temp)
    
    
    @itchat.msg_register([TEXT, PICTURE, RECORDING])
    def resever_info(msg):
        info = msg['Text']  # 取出文本消息
        info_type = msg['Type']  # 取出消息类型
        name = msg['FileName']  # 取出语音(图片)文件名
    
        if info_type == 'Recording':
            # 保存语音
            info(temp + '/' + name)
        elif info_type == 'Picture':
            # 保存图片
            info(temp + '/' + name)
    
    
    itchat.run()
    
    

    运行起来,然后发送一张图片和一条语音,就会在指定目录下生成两个文件:
    在这里插入图片描述

    如何监听好友撤回了消息

    到这里,我们其实已经完成了消息监听,只需要稍加修改即可,但是这个程序是有缺陷的,因为不是所有消息我们都需要去保存的,好友正常发送过来的消息我们直接就能看到,保存下来不是多此一举吗?我们的目的是想知道好友撤回了什么内容,这就涉及到如何监听好友是否撤回了消息这一问题了。其实也非常简单,Content模块为我们提供了NOTE类型,该类型指的是系统消息。

    所以我们可以自定义一个函数用来监听系统消息:

    import itchat
    from itchat.content import *  # 导入itchat下的content模块
    
    
    itchat.auto_login(hotReload=True)
    
    @itchat.msg_register(NOTE)
    def note_info(msg): # 监听系统消息
        print(msg)
    
    
    itchat.run()
    
    

    运行程序,我们撤回一条消息测试一下,输出结果如下:

    ......
    'DisplayName': '', 'ChatRoomId': 0, 'KeyWord': '', 'EncryChatRoomId': '', 'IsOwner': 0}>, 'Type': 'Note', 'Text': '你撤回了一条消息'}
    ......
    
    

    这里截取了部分内容,会发现,撤回消息的文本内容为"你撤回了一条消息",所以要想知道好友是否撤回了消息就非常简单了,判断msg['Text'] == '你撤回了一条消息'即可。

    实现微信防撤回程序

    关于程序每个步骤的代码到这里就分析完了,接下来是对所有代码的汇总,也是整个程序的完整代码:

    import itchat
    from itchat.content import *
    import os
    import time
    import xml.dom.minidom	# 解析xml模块
    
    # 这是保存撤回消息的文件目录(如:图片、语音等),这里已经写死了,大家可以自行修改
    temp = 'C:/Users/Administrator/Desktop/CrawlerDemo' + '/' + '撤回的消息'
    if not os.path.exists(temp):
        os.mkdir(temp)
    
    itchat.auto_login(True)	# 自动登录
    
    dict = {}	# 定义一个字典
    
    
    # 这是一个装饰器,给下面的函数添加新功能
    # 能够捕获好友发送的消息,并传递给函数参数msg
    @itchat.msg_register([TEXT, PICTURE, FRIENDS, CARD, MAP, SHARING, RECORDING, ATTACHMENT, VIDEO])  # 文本,语音,图片
    def resever_info(msg):
        global dict	# 声明全局变量
    
        info = msg['Text']  # 取出消息内容
        msgId = msg['MsgId']  # 取出消息标识
        info_type = msg['Type']  # 取出消息类型
        name = msg['FileName']  # 取出消息文件名
        # 取出消息发送者标识并从好友列表中检索
        fromUser = itchat.search_friends(userName=msg['FromUserName'])['NickName']
        ticks = msg['CreateTime']  # 获取信息发送的时间
        time_local = time.localtime(ticks)
        dt = time.strftime("%Y-%m-%d %H:%M:%S", time_local)  # 格式化日期
        # 将消息标识和消息内容添加到字典
        # 每一条消息的唯一标识作为键,消息的具体信息作为值,也是一个字典
        dict[msgId] = {"info": info, "info_type": info_type, "name": name, "fromUser": fromUser, "dt": dt}
        
    
    @itchat.msg_register(NOTE)  # 监听系统提示
    def note_info(msg):
        # 监听到好友撤回了一条消息
        if '撤回了一条消息' in msg['Text']:
            # 获取系统消息中的Content结点值
            content = msg['Content']
            # Content值为xml,解析xml
            doc = xml.dom.minidom.parseString(content)
            # 取出msgid标签的值
            result = doc.getElementsByTagName("msgid")
            # 该msgId就是撤回的消息标识,通过它可以在字典中找到撤回的消息信息
            msgId = result[0].childNodes[0].nodeValue
            # 从字典中取出对应消息标识的消息类型
            msg_type = dict[msgId]['info_type']
            if msg_type == 'Recording':	# 撤回的消息为语音
                recording_info = dict[msgId]['info']  # 取出消息标识对应的消息内容
                info_name = dict[msgId]['name'] # 取出消息文件名
                fromUser = dict[msgId]['fromUser'] # 取出发送者
                dt = dict[msgId]['dt'] # 取出发送时间
                recording_info(temp + '/' + info_name) # 保存语音
                # 拼接提示消息
                send_msg = '【发送人:】' + fromUser + '\n' + '发送时间:' + dt + '\n' + '撤回了一条语音'
                itchat.send(send_msg, 'filehelper') # 将提示消息发送给文件助手
                # 发送保存的语音
                itchat.send_file(temp + '/' + info_name, 'filehelper')
                del dict[msgId] # 删除字典中对应的消息
                print("保存语音")
            elif msg_type == 'Text':
                text_info = dict[msgId]['info'] # 取出消息标识对应的消息内容
                fromUser = dict[msgId]['fromUser'] # 取出发送者
                dt = dict[msgId]['dt'] # 取出发送时间
                # 拼接提示消息
                send_msg = '【发送人:】' + fromUser + '\n' + '发送时间:' + dt + '\n' + '撤回内容:' + text_info
                # 将提示消息发送给文件助手
                itchat.send(send_msg, 'filehelper')
                del dict[msgId] # 删除字典中对应的消息
                print("保存文本")
            elif msg_type == 'Picture':
                picture_info = dict[msgId]['info'] # 取出消息标识对应的消息内容
                fromUser = dict[msgId]['fromUser'] # 取出发送者
                dt = dict[msgId]['dt'] # 取出发送时间
                info_name = dict[msgId]['name'] # 取出文件名
                picture_info(temp + '/' + info_name) # 保存图片
                # 拼接提示消息
                send_msg = '【发送人:】' + fromUser + '\n' + '发送时间:' + dt + '\n' + '撤回了一张图片'
                itchat.send(send_msg, 'filehelper') # 将图片发送给文件助手
                # 发送保存的语音
                itchat.send_file(temp + '/' + info_name, 'filehelper')
                del dict[msgId] # 删除字典中对应的消息 
                print("保存图片")
    
    
    itchat.run()
    
    

    这样,一个完整的防撤回程序就完成了,如果你对于前面的铺垫能够掌握得很好的话,这个程序对你来说就是小菜一碟,每一句代码的注释我都有写,应该很容易看懂。

    测试程序

    到了激动人心的测试环节,我们来测试一下这个程序是否编写成功了。

    在这里插入图片描述

    我向我的好友发送了三条消息,分别是文本、图片和语音,接着我一一撤回,然后,微信程序就自动向文件传输助手发送了三条消息:
    在这里插入图片描述

    到这里,这个程序就基本完成了。你们在测试的时候也可以叫自己的好友、同学发给你几条消息,然后撤回看看是否能够成功获取到撤回的消息。

    撤回的消息发给别人肯定不行,这样不仅泄露了隐私,也会骚扰到别人,所以这里我选择将撤回的消息发送给文件传输助手,如何将消息发送给文件传输助手也很简单:

    itchat.send(send_msg, toUserName='filehelper')
    
    

    toUserName传入filehelper即可,这样,如果对方撤回了消息,你就可以前往文件传输助手查看对方究竟撤回了什么。

    说说我遇到的一些坑

    这个程序说它难,其实并不难,但我也在编写的过程中遇到了一些坑,一开始我是一条消息一条消息地进行测试,发现程序是正常的,但我连续撤回几条消息,却发现程序出现了Bug。比如我一开始发送了一张图片和一段文字,结果我撤回这两条消息后,得到的却是两段文字。后面我才醒悟过来,是后面的消息覆盖了前面的消息,导致了这个结果,所以在程序中,我定义了一个字典,用于存放好友输入的消息,当监听到消息被撤回时,就通过撤回消息产生的内容中的msgId去和字典中的匹配,匹配到的就是被撤回的消息,然后进行操作即可。

    使用教程

    想使用该程序非常简单,实现微信防撤回程序节点下有程序的完整代码,直接复制粘贴到你自己的python文件,然后运行该文件即可,运行后会产生一个二维码,用手机验证登录即可。
    当然,你也可以选择将该程序打包成可执行的exe文件,这样运行更加方便,打包方式:
    首先打开cmd窗口,下载pyinstaller模块,有的话就不用下载了,下载指令:pip insall pyinstaller,此时我们通过cmd窗口进入到python文件目录,比如我这里
    在这里插入图片描述
    那就进入到该目录下:
    在这里插入图片描述
    然后执行下面这条指令:

    pyinstaller -F wechat.py
    

    后面是需要打包的文件名,执行命令后,就会在文件同级目录下生成一个dist文件夹。
    在这里插入图片描述
    进入该文件夹,就看到我们的.exe文件了,然后双击执行即可。

    最后

    这个程序目前只实现了监听好友的文本、图片、语音类型的消息,对于其它类型的消息,还有群聊的消息都是无法监听到的,感兴趣的话大家可以自己试着实现一下。

    因为自己也是刚刚接触这个模块,文中的程序可能会出现一些意想不到的Bug,但目前我测试来看是没有问题的,如有问题,欢迎评论区留言。

    展开全文
  • 消息中间件(消息队列)

    万次阅读 2022-02-17 13:52:30
    消息中间件


    简介

    MQ(message queue)消息队列,也叫消息中间件。

    消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

    它是类似于数据库一样需要独立部署在服务器上的一种应用,提供接口给其他系统调用。

    JMS规范

    消息中间件是遵守JMS(java message service)规范的一种软件(大多数消息中间件遵守JMS规范)。

    要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。现在既有开源的提供者也有专有的提供者。
    开源的提供者包括:Apache ActiveMQ、Kafka、WebMethods、阿里的RocketMQ等。

    专业术语

    • 提供者:实现JMS规范的中间件服务器。
    • 客户端:发送或者接受消息的应用程序。
    • 生产者:创建并发送消息的客户端。
    • 消费者:接受并处理消息的客户端。
    • 消息:应用程序之间传递的内容。
    • 队列:一个容纳那些被发送的等待阅读的消息的区域,一旦消息被消费,将被从队列中移走。
    • 主题 :一种支持发送消息给多个订阅者的机制。
    • 消息模式:在客户端之间传递消息的方式,JSM中定义了点对点模式(发送者接收者)和发布订阅模式(发布者订阅者)。

    消息模式

    点对点模式:Point-to-Point(P2P)

    消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

    消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
    在这里插入图片描述

    1. 每个消息只有一个消费者。一旦被消费,消息就不再在消息队列中。

    2. 提供者和消费者之间在时间上没有依赖性。当提供者发送了消息之后,不管消费者有没有正在运行,它不会影响到消息被发送到队列。

    3. 每条消息仅会传送给一个消费者。可能会有多个消费者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个消费者所消费。

    4. 消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者。当已被消费时,就会从队列头部将它们删除(除非使用了消息优先级)。

    5. 消费者在成功接收消息之后需向队列应答成功。

    queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。

    发布订阅模式:Publish/Subscribe(Pub/Sub)

    消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
    在这里插入图片描述

    1. 每个消息可以有多个消费者。
    2. 发布者和订阅者之间有时间上的依赖性。针对某个主题的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
    3. 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
    4. 每条消息都会传送给称为订阅者的多个消息消费者。订阅者有许多类型,包括持久型、非持久型和动态型。
    5. 发布者通常不会知道哪一个订阅者正在接收主题消息。
    6. 消息被推送给消费者。这意味着消息会传送给消费者,而无须请求。

    topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。

    消息消费方式

    1. 同步

      订阅者或消费者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。

    2. 异步
      订阅者或消费者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

    JMS规范接口

    • ConnectionFactor接口(连接工厂)
      用于创建连接到消息中间件的连接工厂。

      创建Connection对象的工厂,根据消息类型的不同,用户将使用队列连接工厂QueueConnectionFactory或者主题连接工厂TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

    • Connection接口(连接)
      Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装),代表了应用程序和消息服务器之间的通信链路。

      Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

    • Destination接口(目标)
      Destination是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。

      它是消息生产者的消息发送目标或者说消息消费者的消息来源。

      • 对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);
      • 对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

      所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

    • Session接口(会话)
      Session是我们操作消息的接口。表示一个单线程的上下文,用于发送和接收消息。

      由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。
      可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

    • MessageProducer接口(消息生产者)
      消息生产者由Session创建,并用于将消息发送到Destination。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

      同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

    • MessageConsumer接口(消息消费者)
      消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。

      可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

    • Message接口(消息)
      是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分。

      1. 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
      2. 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
      3. 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口非常灵活,并提供了许多方式来定制消息的内容。

      消息接口非常灵活,并提供了许多方式来定制消息的内容。

    • MessageListener(监听器)
      消息监听器,如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。

      EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

    消息中间件作用

    1.系统解耦

    系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。

    2.异步通信

    消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    对于一些非必须及时处理的业务,通过消息队列可以优化系统响应时间。提升系统性能。

    3.流量削峰

    使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    4.数据采集

    分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

    5.可恢复性

    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。

    许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

    6.可扩展性

    在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

    7.顺序保证

    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

    消息中间件协议

    1.AMQP协议

    AMQP(Advanced Message Queuing Protocol)高级消息队列协议,一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

    优点:可靠、通用

    部分相关产品:

    • RabbitMQ
      一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台。
    • OpenAMQ
      AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS。
    • Apache Qpid
      Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NET。
    • Zyre
      一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力。

    2.MQTT协议

    MQTT(Message Queuing Telemetry Transport)消息队列遥测传输,是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。

    该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

    优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

    3.STOMP协议

    STOMP(Streaming Text Orientated Message Protocol)流文本定向消息协议,是一种为MOM(Message Oriented Middleware)面向消息的中间件设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

    优点:命令模式(非topic\queue模式)

    部分相关产品:

    • ActiveMQ

    4.XMPP协议

    XMPP(Extensible Messaging and Presence Protocol)可扩展消息处理现场协议,是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。

    核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

    优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

    5.基于TCP/IP自定义协议

    有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

    主流消息中间件

    ActiveMQ

    • 非常成熟,功能比较完备,大量公司使用;
    • 社区越来越不活跃,维护越来越少,几个月才发一次版;
    • 偶尔会有较低概率丢失消息;
    • 多数使用目的主要是用于解耦和异步通信,较少在大规模吞吐的场景中使用。

    RabbitMQ

    • 比较成熟,功能比较完备,大量公司使用;
    • Erlang语言开发,性能极其好,延时很低;
    • 比较好用,社区活跃,几乎每个月都发布几个版本;
    • 吞吐量万级,和其他相比会略低一些,这是因为他做的实现机制比较重;
    • Erlang开发,语言难度大,很难读源码,很难定制和掌控。基本只能依赖于开源社区的快速维护和修复bug。
    • 集群动态扩展会很麻烦,这主要是erlang语言本身带来的问题。

    RocketMQ

    • 文档相对来说简单一些,接口简单易用(接口不是按照标准JMS规范);
    • 阿里大规模应用,有保障(阿里日处理消息上百亿之多),可以做到大规模吞吐,性能也非常好;
    • 分布式扩展也很方便;
    • 社区比较活跃,维护还可以;
    • 可靠性和可用性都不错;
    • 支撑大规模的topic数量;
    • 支持复杂MQ业务场景;
    • Java语言编写,我们可以自己阅读源码。

    Kafka

    • 仅提供较少的核心功能;

    • 提供超高的吞吐量;

    • ms级的延迟;

    • 极高的可用性以及可靠性;

    • 分布式可以任意扩展;

    • 一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用;

    • topic的大幅增加会导致吞吐量的大幅度下降;

      所以尽量保证topic数量不要过多,以保证其超高吞吐量。如果要支撑大规模topic,需要增加更多的机器资源

    • 消息有可能重复消费;

    • 天然适合大数据实时计算以及日志收集,在大数据领域中以及日志采集得以广泛使用。

    4种MQ对比

    特性ActiveMQRabbitMQRocketMQKafka
    成熟度成熟成熟比较成熟成熟日志领域
    社区活跃度较高
    开发语言JavaErlangJavaScala
    跨语言支持,Java优先语言无关只支持Java支持,Java优先
    支持协议AMQP、MQTT、STOMP、OpenWireAMQP、MQTT、STOMPMQTT、TCPKafka
    JMS规范支持支持支持得不够好不支持
    持久化内存、文件、数据库内存、文件磁盘文件磁盘文件
    可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
    单机吞吐量万级万级万级十万级
    消息延迟毫秒级微秒级毫秒级毫秒级
    可靠性有较低的概率丢失数据有较低的概率丢失数据经过参数优化配置,可以做到0丢失经过参数优化配置,消息可以做到0丢失
    事务支持支持支持支持
    集群支持支持支持支持
    负载均衡支持支持支持支持
    文档完备完备完备完备
    是否开源开源开源开源开源
    所属社区/公司ApacheRabbitApacheApache
    消息服务默认端口616165672109119092
    管理后台单独部署
    管理后台默认端口8161156728080-
    部署方式独立、嵌入独立独立独立
    评价产品成熟,功能齐全,大量公司使用;有较低概率丢失消息;社区不够活跃,版本维护较少,公司产品重心不在该产品上Erlang开发,性能好,延迟低;大量公司使用;社区比较活跃;但erlang语言难度大,集群动态扩容很麻烦功能较为完善,社区比较活跃;还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用

    消息分发策略对比:

    消息分发策略ActiveMQRabbitMQRocketMQKafka
    发布订阅支持支持支持支持
    轮询分发支持支持-支持
    公平分发-支持-支持
    重发支持支持支持-
    消息拉取-支持支持支持

    MQ的选择

    最早大家用ActiveMQ。但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃。

    后来大家用RabbitMQ。但是确实erlang语言阻止了大量的java工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态,但是确实人是开源的,比较稳定的支持,活跃度也高。

    现在确实越来越多的公司会去用RocketMQ。

    • 对于中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;
    • 对于大型公司,基础架构研发实力较强,用RocketMQ是很好的选择;
    • 大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题。社区活跃度很高,何况Kafka几乎是全世界这个领域的规范制定者。
    展开全文
  • RabbitMQ消息模型详解

    千次阅读 2022-03-03 15:03:32
    RabbitMQ消息模型详解,原理详解,代码展示。其中包括简单消息模型,工作模型,订阅发布模型,订阅模型-Fanout,订阅模型-Direct。

    目录

    一、消息队列

    什么是消息队列

     AMQP和JMS

     常见MQ产品

    二、RabbitMQ

    三、五种消息模型

    四、简单消息模型

    代码演示

    获取连接

    生产者

    消费者

    手动ACK

    五、工作模式

    代码演示:

    生产者

    消费者1和消费者2

    六、发布订阅模式

    七、订阅模型-Fanout

    演示代码:

    生产者

     消费者1和消费者2

    八、订阅模型-Direct

    代码演示:

    生产者

    消费者1和消费者2

    九、订阅模型-Topic

    代码演示

    生产者

    消费者1和消费者2

    十、如何避免消息丢失?---持久化

    交换机持久化​

    队列持久化

    队列持久化​


    一、消息队列

    什么是消息队列

    消息队列,即MQ,Message Queue。

    消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦

     AMQP和JMS

    MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

    两者间的区别和联系:

    • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
    • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

     常见MQ产品

    ActiveMQ:基于JMS

    • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
    • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
    • Kafka:分布式消息系统,高吞吐量

    二、RabbitMQ

    RabbitMQ是基于AMQP的一款消息管理系统

    官网: Messaging that just works — RabbitMQ

    官方教程:RabbitMQ Tutorials — RabbitMQ

    安装教程:小小张自由—>张有博_CSDN博客-C#编程基础,项目实战,Java进阶领域博主

    RabbitMQ 基本概念

     

    Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

    Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

    Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

    Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    Connection
    网络连接,比如一个TCP连接。

    Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

    Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

    Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

    Broker
    表示消息队列服务器实体。

    三、五种消息模型

    RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

    但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

    四、简单消息模型

    RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

    RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

    P(producer/ publisher):生产者,一个发送消息的用户应用程序。

    C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

    队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

    总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

    代码演示

    引入依赖

     <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
           <version>2.0.6.RELEASE</version>
    </dependency>

    配置文件

    spring:
      rabbitmq:
        host: 192.168.99.99
        username: leyou
        password: leyou
        virtual-host: /leyou
    

    获取连接

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    
    public class ConnectionUtil {
        /**
         * 建立与RabbitMQ的连接
         * @return
         * @throws Exception
         */
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("192.168.99.99");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("/leyou");
            factory.setUsername("leyou");
            factory.setPassword("leyou");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    
    }

    生产者

    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    /**
     * 生产者
     */
    public class Send {
    
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道,使用通道才能完成消息相关的操作
            Channel channel = connection.createChannel();
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            for (int i = 0; i < 10; i++) {
                // 向指定的队列中发送消息
                message=message+i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    
                System.out.println(" [x] Sent '" + message + "'");
            }
    
    
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    消费者

    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    /**
     * 消费者
     */
    public class Recv {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                }
            };
            // 监听队列,第二个参数:是否自动进行消息确认。
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。

    如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

    因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

    • 自动ACK:消息一旦被接收,消费者自动发送ACK

    • 手动ACK:消息接收后,不会发送ACK,需要手动调用

    手动ACK

    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    /**
     * 消费者,手动进行ACK
     */
    public class Recv2 {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    //int i=1/0;
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                    // 手动进行ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列,第二个参数false,手动进行ACK
            // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    五、工作模式

    工作队列或者竞争消费者模式

    工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取

    这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。

    接下来我们来模拟这个流程:

    P:生产者:任务的发布者

    C1:消费者,领取任务并且完成任务,假设完成速度较快

    C2:消费者2:领取任务并完成任务,假设完成速度慢
     

    我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

    代码演示:

    生产者

    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    // 生产者
    public class Send {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 循环发布任务
            for (int i = 0; i < 50; i++) {
                // 消息内容
                String message = "task .. " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 2);
            }
            // 关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    消费者1和消费者2

    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    // 消费者1
    public class Recv {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 设置每个消费者同时只能处理一条消息
            //channel.basicQos(1);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                    try {
                        // 模拟完成任务的耗时:1000ms
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                    // 手动ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列。
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    
    
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    //消费者2
    public class Recv2 {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 设置每个消费者同时只能处理一条消息
            //channel.basicQos(1);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                    // 手动ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列。
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    面试题:避免消息堆积?

    1)采用workqueue,多个消费者监听同一队列。

    2)接收到消息以后,而是通过线程池,异步消费。

    六、发布订阅模式

    解读:

    1、1个生产者,多个消费者

    2、每一个消费者都有自己的一个队列

    3、生产者没有将消息直接发送到队列,而是发送到了交换机

    4、每个队列都要绑定到交换机

    5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

    X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    Exchange类型有以下几种:

    Fanout:广播,将消息交给所有绑定到交换机的队列

    Direct:定向,把消息交给符合指定routing key 的队列 

    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    七、订阅模型-Fanout

    Fanout,也称为广播。

    流程图:

     在广播模式下,消息发送流程是这样的:

    • 1) 可以有多个消费者

    • 2) 每个消费者有自己的queue(队列)

    • 3) 每个队列都要绑定到Exchange(交换机)

    • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

    • 5) 交换机把消息发送给绑定过的所有队列

    • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

    演示代码:

    生产者

    • 1) 声明Exchange,不再声明Queue

    • 2) 发送消息到Exchange,不再发送到Queue

    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
    
            // 声明exchange,指定类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
    
            // 消息内容
            String message = "Hello everyone";
            // 发布消息到Exchange
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [生产者] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

     消费者1和消费者2

    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    //消费者1
    public class Recv {
        private final static String QUEUE_NAME = "fanout_exchange_queue_1";
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    // 消费者2
    public class Recv2 {
        private final static String QUEUE_NAME = "fanout_exchange_queue_2";
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    八、订阅模型-Direct

    在广播模式中,生产者发布消息,所有消费者都可以获取所有消息。

    在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 

    但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

    在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

    消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

    X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

    C1:消费者,其所在队列指定了需要routing key 为 error 的消息

    C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

    代码演示:

    生产者

    此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    /**
     * 生产者,模拟为商品服务
     */
    public class Send {
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 消息内容
            String message = "商品删除了, id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

    消费者1和消费者2

    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    /**
     * 消费者1
     */
    public class Recv {
        private final static String QUEUE_NAME = "direct_exchange_queue_1";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    /**
     * 消费者2
     */
    public class Recv2 {
        private final static String QUEUE_NAME = "direct_exchange_queue_2";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    九、订阅模型-Topic

    Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

    通配符规则:

    `#`:匹配一个或多个词 ​

    `*`:匹配不多不少恰好1个词

    代码演示

    生产者

    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    /**
     * 生产者,模拟为商品服务
     */
    public class Send {
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为topic
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 消息内容
            String message = "新增商品 : id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

    消费者1和消费者2

    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    /**
     * 消费者1
     */
    public class Recv {
        private final static String QUEUE_NAME = "topic_exchange_queue_1";
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    十、如何避免消息丢失?---持久化

    1) 消费者的ACK机制。可以防止消费者丢失消息。

    2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。

    是可以将消息进行持久化

    要将消息持久化,前提是:队列、Exchange都持久化

    交换机持久化

    队列持久化

    队列持久化

    如果本篇博客对您有一定的帮助,大家记得留言+点赞+收藏哦。 

    展开全文
  • 【RocketMQ】消息的存储设计

    万次阅读 2022-07-19 11:35:42
    CommitLog:存储消息的元数据 ConsumerQueue:存储消息在CommitLog的索引 IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程 ...
  • RocketMQ(五)RocketMQ消息生产及消息储存机制
  • 延迟消息的五种实现方案

    万次阅读 多人点赞 2021-01-12 12:09:06
    针对延迟消息,本文分享五种不同的实现方案,并逐一讨论各种方案的大致实现和优缺点。
  • RocketMQ(八)RocketMQ延时消息

    千次阅读 2022-03-22 13:22:34
    消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息。 二、延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。...
  • 十二张图,踹开消息队列的大门

    千次阅读 多人点赞 2021-07-08 22:18:24
    消息队列,应用广泛,面试必问。一篇文章,十二张图,我们一起走进消息队列的世界。
  • RabbitMQ如何防止消息丢失及重复消费

    千次阅读 2022-03-20 14:48:19
    文章目录RabbitMQ如何防止消息丢失及重复消费一、消息丢失1.1、生产者没有成功把消息发送到MQ1.1.1、confirm(发布确认)机制1.1.2、事务机制1.2、RabbitMQ接收到消息之后丢失了消息1.3、消费者弄丢了消息二、如何...
  • 消息队列

    千次阅读 多人点赞 2019-09-19 21:42:59
    消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用...
  • RabbitMQ消息队列常见面试题总结

    万次阅读 多人点赞 2021-03-22 02:46:39
    RabbitMQ消息队列常见面试题总结; 1、什么是消息队列?消息队列的优缺点? 2、Kafka、ActiveMQ、RabbitMQ、RocketMQ的区别? 3、如何保证消息不被重复消费? 4、如何保证消息不丢失,进行可靠性传输? 5、如何保证...
  • 15 Redis 实现消息队列

    万次阅读 2021-12-06 21:13:44
    15 Redis 实现消息队列前言一、消息队列的消息存取需求二、基于 Streams 的消息队列解决方案总结 前言 消息队列要能支持组件通信消息的快速读写,而 Redis 本身支持数据的高速访问,正好可以满足消息队列的读写性能...
  • 什么是消息队列?

    万次阅读 多人点赞 2021-08-17 20:13:42
    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,...
  • 一、RocketMQ 支持 3 种消息发送方式 : 1、同步消息(sync message ) producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 。 2、异步消息(async message) producer向 broker ...
  • RT-Thread快速入门-消息队列

    千次阅读 2022-03-18 12:47:08
    首发,公众号【一起学嵌入式】 哈哈,RT-Thread 快速入门系列文章登上官方论坛 “今日...上一篇介绍了消息邮箱,本篇文章介绍线程(任务)间通信的另一种方式——消息队列。 消息队列在实际项目中应用较多,建议初.
  • 如何用Python记录微信撤回的消息

    万次阅读 多人点赞 2021-11-03 21:15:55
    想查看微信好友撤回的消息?Python帮你搞定 因此要是下文中有什么讲的不清楚的地方,大家也可以参考上面这篇文档。(总感觉腾讯云里面抓来一篇教自动化爬微信的工具的文档,em…不得不说,腾讯心挺大啊!) 一、pip ...
  • 2、多个进程可同时向一个消息队列发送消息,也可以同时从一个消息队列中接收消息。发送进程把消息发送到队列尾部,接受进程从消息队列头部读取消息消息一旦被读出就从队列中删除。 二、结构 1、消息队列中消息本身...
  • 队列是为了任务与任务、任务与中断之间的通信而准备的,可以在任务与任务、任务与中断之间传递消息,队列中可以存储有限的、大小固定的数据项目。 任务与任务、任务与中断之间要交流的数据保存在队列中,叫做队列...
  • 通俗易懂讲消息队列

    万次阅读 多人点赞 2020-03-05 17:00:21
    一、什么是消息队列? 消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。 消息队列,一般我们会简称它为MQ(Message Queue),嗯...
  • 用Python实现自动发消息,自定义内容,太省事了!

    万次阅读 多人点赞 2021-08-16 11:27:22
    有时候让了解放双手,让电脑来帮我们自动发一些我们想要发的消息,挺省力的,比如说白天写好了演讲稿,晚上要在群里进行文字演讲,那么我们就可以用脚本来实现自动复制、粘贴和发送文字的功能,从而解放我们自己,...
  • 基于消息推送的聊天工具

    千次下载 热门讨论 2013-06-08 15:20:46
    本应用是基于百度云推送的一款轻量级聊天工具,包含多个开源项目库,同时本代码也已经开源,欢迎访问我的博客主页:http://blog.csdn.net/weidi1989,由于时间仓促,错误与疏忽之处在所难免,希望各位朋友们以邮件的...
  • 密码学入门(6):消息认证码

    千次阅读 2022-02-09 22:03:24
    消息认证码的实现方式 HMAC 认证加密 重放攻击 防御重放攻击的方式 消息认证码无法解决的问题 参考 上次讲到的单向散列函数只能保证数据传输的完整性,不能提供认证功能,即 Bob 不能确定消息是否来自 Alice,只能...
  • RocketMQ延迟消息的代码实战及原理分析

    万次阅读 多人点赞 2020-07-07 09:44:14
    RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠、万亿级容量、灵活可伸缩的消息发布与订阅服务。
  • 非日志的可靠消息传输 非日志的可靠消息传输 非日志的可靠消息传输 系统间的数据量管道 队列模式 √ √ √ √ 订阅模式 √ √ √ √ 回复模式 √ √
  • MQ消息队列

    万次阅读 2022-02-22 10:27:16
    我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。 消息队列是分布式系统中重要的组件之一。使用消息队列主要是为了通过异步处理提高系统性能和削峰、...
  • JavaWeb后台自动向前台发送消息

    千次下载 热门讨论 2014-07-09 10:12:04
    JavaWeb项目后台向前推送消息,主要是是利用第三方包Comet4J,附件中是MyEclipse开发的源码,可直接运行。
  • 事务消息流程介绍 中文图更友好 上图说明了事务消息的大致方案,其中分为两个流程: 正常事务消息的发送及提交(黑线走的流程) 事务消息的补偿流程(黑线+红线走的流程) 1)正常事务消息的发送及提交 (1) 发送...
  • 背景IM消息作为闲鱼用户重要的交易咨询工具,核心目标有两点,第一是保证用户的消息不丢失,第二是保证用户的消息及时送达接收方。IM消息根据消息的接收方设备是否在线,分为离线和在线推送,数据...
  • 消息队列”是在消息的传输过程中保存消息的容器。 “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。 消息被发送到队列中。“消息队列”是在...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,999,481
精华内容 1,199,792
关键字:

消息

友情链接: object3.rar