精华内容
下载资源
问答
  • MQ

    2020-07-15 16:09:28
    本文记载MQ的学习笔记。

    本文记载MQ的学习笔记。

    可以参考转载链接,应该写的比我更准确更详细一点。

    MQ:message queue

    消息队列就是基础数据结构中的‘先进先出’的一种数据结构。

    MQ一直就存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。

    主要用来解决如下问题:

    1,系统解耦;

    2,流量消峰;

    3,消息分发;

    4,异步消息。

    消息中间件的组成:

    broker:消息服务器,作为server提供消息核心服务;

    producer:消息生产者,业务发起方,负责生产消息传输给broker;

    consumer:消息消费者,业务处理方,负责从broker获取消息并进行业务逻辑处理;

    topic:主题。发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ分发到不同的订阅者,实现消息的广播;

    queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收;

    message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输。

    消息中间件模式分类:

    1,点对点

    PTP点对点:使用queue作为通信载体

    功能:一个生产者P发送消息到队列Q,一个消费者C接收

    生产者实现思路:

    创建连接工厂connectionFactory,设置服务器地址127.0.0.1,端口号5672,设置用户名密码和virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。

    消费者实现思路:

    创建连接工厂connectionFactory,设置服务器地址为127.0.0.1,设置用户名密码和virtual,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,创建消费者并监听队列,从队列中读取消息。

     

    2,发布/订阅:

    pub/sub发布订阅(广播):使用topic作为通信载体

    说明:queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到一个可用的消费者。

    topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到n个订阅者都能得到一个消息的拷贝。

    这个模式是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。

    功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列和多个消费者。

    思路解读(重点理解):

    1)一个生产者,多个消费者;

    2)每一个消费者都有自己的一个队列

    3)生产者没有直接发送消息到队列中,而是发送到交换机

    4)每个消费者的队列都绑定到交换机上

    5)消息通过交换机到达每个消费者的队列

    注意:交换机没有存储消息功能,如果是消息发送到没有绑定消费队列的交换机,则消息丢失。

    具体可看rabbitmq中的实例代码,就是采用该种模式。

     

    消息中间件常用协议:

    AMQP:advanced message queueing protocol:一个提供统一消息服务的应用层标准高级消息队列协议。

    MQTT:message queueing telemetry transport:消息队列遥测传输。

    STOMP:streaming text orientated message protocol: 流向文本定向消息协议。

    XMPP:extensible messaging and prensence protocol:基于可扩展标记语言(XML)的协议.

    其他基于TCP/IP自定义的协议。

    常见消息中间件MQ介绍

    1,RocketMQ:阿里系下开源的一款分布式、队列模型的消息中间件,原名metaq,主要用于订单交易系统。

    具有如下特点:

    能够保证严格的消息顺序;

    提供针对消息的过滤功能;

    提供丰富的消息拉取模式;

    高校的订阅者水平扩展能力;

    实时的消息订阅机制;

    亿级消息堆积能力。

    2,RabbitMQ:使用Erlang编写的一个开源的消息队列。本身支持很多协议:AMQP,XMPP,SQTP,STOMO,使得它变得非常重量级,更适合企业级的开发,对路由,负载均衡,数据持久化都有很好的支持。

    3,ActiveMQ:Apache下的一个子项目。

    4,Reidis:使用C语言开发的一个key-value的Nosql数据库,开发维护很活跃,虽然是一个key-value数据库存储系统,但是它本身支持MQ功能,所以可以完全当作一个轻量级的队列服务使用。但是,如果入队数据大小超过了10K,redis则慢的无法忍受,出队时,无论数据大小,redis都表现出非常好的性能。

    5,Kafka:Apache下的一个子项目,使用scala实现的一个高性能分布式publish/subscribe消息队列系统。

    6,zeroMQ :号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,偏重于实时数据通信场景。但是开发人员需要自己组合多种技术框架,开发成本高。

     

     

     

    展开全文
  • MQ

    2020-08-31 20:04:11
    1、为什么使用MQ 核心:解耦,异步,削峰 解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统...
    1、为什么使用MQ

    核心:解耦,异步,削峰

    • 解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统 产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一 条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统 压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超 时等情况。
      就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但 是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。
    • 异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库 要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应 给用户,总时长是 3 + 5 = 8ms。
    • 削峰:减少高峰时期对服务器压力。
    2、MQ优缺点

    优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。
    缺点有以下几个:

    • 系统可用性降低
      系统引入的外部依赖越多,越容易挂掉。万一 MQ 挂了,MQ 一挂,整套系统崩溃,你不就完了?
    • 系统复杂度提高
      硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的 顺序性?问题一大堆。
    • 一致性问题
      A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那 里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。
    3、Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别?

    对于吞吐量来说kafka和RocketMQ支撑高吞吐,ActiveMQ和RabbitMQ比他们低一个数量级。对于延迟量来说RabbitMQ是最低的。

    • 1.从社区活跃度
      按照目前网络上的资料,RabbitMQ 、activeM 、ZeroMQ 三者中,综合来看,RabbitMQ 是首选。
    • 2.持久化消息比较
      ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息 不会丢失的机制。
    • 3.综合技术实现
      可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
      RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手 动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。
    • 4.高并发
      毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。
    • 5.比较关注的比较, RabbitMQ 和 Kafka
      RabbitMq 比Kafka 成熟,在可用性上,稳定性上,可靠性上, RabbitMq 胜于 Kafka (理论上)。
      另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志 (消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq 。
      还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多。
    4、如何保证高可用的?

    RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例 子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

    • 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式
    • 普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接 到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量 的,就是说让集群中多个节点来服务某个 queue 的读写操作。
    • 镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像 集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每 个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消 息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台, 就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节 点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据 同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第 一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重! RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的 完整数据。

    Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这 个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放 一部分数据。这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每 个机器就放一部分数据。Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候, leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度 太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可 以提高容错性。因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都 有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一 旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会 返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)消费的时候, 只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息 才会被消费者读到。

    5、如何保证消息的可靠传输?如果消息丢了怎么办

    数据的丢失问题,可能出现在生产者、MQ、消费者中

    • 生产者丢失:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥 的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异 常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提 交事务channel.txCommit。吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的 消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告 诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息 接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一 定时间还没接收到这个消息的回调,那么你可以重发。事务机制和cnofirm机制最大的不同在于,事务 机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就 可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收 到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。
    • MQ中丢失:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之 后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会 丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久 化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上 去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。持久化可以跟生产者那边的confirm机制配合起来,只有消息被持 久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢 了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也 有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
    • 消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了, RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来说,就 是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的 时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那 RabbitMQ 就认为你还 没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
    7、 如何解决消息队列的延时以及过期失效问题?消息队列满了以后 该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

    消息积压处理办法:临时紧急扩容:
    先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
    新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时 的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
    接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做 法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。 MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消 息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑 了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是 批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据 了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们 就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把 白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
    mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了, 咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消 费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据 吧。

    8、设计MQ的思路

    比如说这个消息队列系统,我们从以下几个角度来考虑一下:
    首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设 计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一 个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移, 增加机器,不就可以存放更多数据,提供更高的吞吐量了?
    其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就 丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能 是很高的,这就是 kafka 的思路。
    其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用 保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
    能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。

    展开全文
  • RabbitMQClientUtil是MQ的测试工具类,他封装了fanout、direct、topic三种exchange模式,并包括发送数据和接收数据。 Test1、Test2是测试类 使用maven管理,在pom.xml文件中引入如下代码: <!-- Rabbitmq工具包...
  • activity MQ 做简单的即时聊天工具

    千次阅读 2019-01-09 11:41:28
    最近项目上需要一个activity Mq的即时通讯,这两天开始学习,写篇文章记录一下学习心得,也希望有大神能指点一二。 进入正题,首先是安装activityMq,直接去官网下载下来解压就可以了,然后进入bin目录根据操作系统...

    最近项目上需要一个activity Mq的即时通讯,这两天开始学习,写篇文章记录一下学习心得,也希望有大神能指点一二。

    进入正题,首先是安装activityMq,直接去官网下载下来解压就可以了,然后进入bin目录根据操作系统版本进入文件夹(我的是Windows64位),双击activitymq.bat进入小黑框(应该不会出现问题,倒霉如我也没在这上遇到坑,如果真的有,请百度之后告诉我解决方案(:)。

    小黑框如果长这样那就算成功了。

    启动之后就可以写代码了(应该是代码写完之后启动它。。。。好像差不多?。。。。。。

    如果使用maven,直接到官网去copy就好,如果使用jar,直接到你刚刚下的那个压缩包里边找一个叫activemq-all.jar的就好。

    正式写代码:

    po封装类:

    package com.mq.demo;
    
    /**
     * @author HTC
     * @create 2019-01-03 16:50
     * @desc 一个简单的ajax模板
     **/
    public class Po {
        private String State;
        private String mag;
        private String userName;
    
        public Po(String state, String mag, String userName) {
            State = state;
            this.mag = mag;
            this.userName = userName;
        }
        public Po(String state, String mag) {
            State = state;
            this.mag = mag;
        }
        public Po() {}
    
        public String getState() {
            return State;
        }
    
        public void setState(String state) {
            State = state;
        }
    
        public String getMag() {
            return mag;
        }
    
        public void setMag(String mag) {
            this.mag = mag;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    }
    

    消息发送端的代码:

    package com.mq.demo;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @author HTC
     * @create 2019-01-04 20:05
     * @desc 消息发送方
     **/
    public class MqSender {
    
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session=null;
        Destination destination;
        MessageProducer producer;
        MessageConsumer messageConsumer;
        /*初始化*/
        public  boolean start(){
            //连接工厂
            connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
            try {
                //连接
                connection=connectionFactory.createConnection();
                //启动
                connection.start();
                //创建session
                session=connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                //消息目的地
    //            destination = session.createQueue("FirstQueue");
                destination=session.createQueue("Javcekon-MQ");
                //消息生产者
                producer = session.createProducer(destination);
                //设置不持久化
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } catch (JMSException e) {
                e.printStackTrace();
                close();
                return false;
            }
            return true;
        }
    
        /*发送消息*/
        public Po send(String text){
            Po po=new Po();
            try {
    //            创建消息
                TextMessage textMessage=session.createTextMessage(text);
    //            发送消息
                producer.send(textMessage);
                 //如果输入exit,则退出,没有写退出后对消息的解决方案
                if("exit".equals(text)){
                    close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
                close();
                po=new Po("0","error");
            }
            po=new Po("1",text);
            return po;
        }
    
            /*关闭*/
        public void close(){
            try {
                connection.close();
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    消息接收方的代码:
     

    package com.mq.demo;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author HTC
     * @create 2019-01-04 8:42
     * @desc 消费者
     **/
    public class Redriver {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory=null;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection=null;
        // Session: 一个发送或接收消息的线程
        Session session=null;
        // Destination :消息队列,消息的存放地
        Destination destination=null;
        // MessageProducer:消息发送者
        MessageConsumer messageConsumer=null;
    
        public void start(){
            // TextMessage message;
            // 构造ConnectionFactory实例对象
            //tcp://127.0.0.1:61616 是对方的消息队列的地址,因为是给自己发消息,所以发送方和接收方地址一样
            connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
            // 构造从工厂得到连接对象
            try {
                connection=connectionFactory.createConnection();
                connection.start();
                session=connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("Javcekon-MQ");
                messageConsumer = session.createConsumer(destination);
            } catch (JMSException e) {
                e.printStackTrace();
                close();
            }
        }
    
            /*将收到的消息添加到list*/
            List<Po>list=new ArrayList<>();
            /*接收消息*/
            public List<Po> GetMessage(){
                try {
                    /*初始化*/
                    start();
                    while (true) {
                        //设置接收者接收消息的时间
                        TextMessage message = (TextMessage) messageConsumer.receive(500);
                        //如果为空则跳出循环
                        if ( message!=null) {
                        //封装消息
                            Po po=new Po();
                            po.setUserName(message.getJMSMessageID());
                            po.setMag(message.getText());
                            list.add(po);
                        }
                        else {
                            break;
                        }
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }finally {
                    close();
                }
                return list;
            }
    
            public void close(){
                try {
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
    }
    

    ok,要用的东西都齐全了,开搞!

    我用的是springboot,直接上配置源码(其他的框架也一样,如果只想写纯java自己动手改一下就odk了)

    #server start
    server.port=8081
    #server end
    
    #thymeleaf start
    spring.thymeleaf.mode=HTML5
    spring.thymeleaf.encoding=UTF-8
    spring.thymeleaf.servlet.content-type=text/html
    #开发时关闭缓存,不然没法看到实时页面
    spring.thymeleaf.cache=false
    
    #static 文件夹下的静态文件访问路径
    spring.mvc.static-path-pattern=/static/**
    #thymeleaf end

    一个极其简陋的客户端界面:

    它是通过定时向后台发送消息接收请求的方式来获得消息的,所以会看到有时候没得一条消息,有时候几十条消息,,,,(毕竟是几秒钟才请求一次,消息积压得多)

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>消息</title>
        <script type="text/javascript" src="/static/jquery-1.7.2.min.js"></script>
    </head>
    <body>
            <input type="text" id="inputInfo">
    
            <button id="send">发送</button><br/>
    
            <textarea id="textarea" style="background-color: azure;width: 400px;height: 500px"></textarea>
    
    </body>
        <script>
            $("#send").click(function (e) {
                var text = document.getElementById("inputInfo").value;
                var textarea = document.getElementById("textarea");
                textarea.innerHTML+="我:"+text+"。\n";
                text.innerText="";
                $.ajax({
                    type:"post",
                    url:"/Main/send",
                    dataType:"json",
                    data:{"text":text},
                    success:function (data) {
                        if(data.State=="0"){
                            alert("消息发送失败");
                        }
                    }
                })
            });
            //定时到后台获取消息
            ref = setInterval(function(){
                var textarea = document.getElementById("textarea");
                $.ajax({
                    type:"post",
                    url:"/Main/getMessage",
                    dataType:"json",
                    success:function (data) {
                        console.log(data);
                        for(var i=0;i<data.length;i++){
                            textarea.innerHTML+=+data[i].userName+":"+data[i].mag+"。\n";
                        }
                    }
                })
            },2500);
    
        </script>
    </html>

    一个极其简陋的后台controller:

    package com.mq.demo;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import java.util.List;
    
    /**
     * @author HTC
     * @create 2019-01-03 16:41
     * @desc 用于页面跳转
     **/
    @Controller
    @RequestMapping("/Main")
    public class MainController {
        MqSender mqSender=new MqSender();
        /**
        * @Author: HTC
        * @Description: 用于消息发送
        * @Param: String text
        * @return: Po po
        * @Date: 2019/1/4
        */
        @RequestMapping("/send")
        @ResponseBody
        public Po sendMasage(@RequestParam("text") String text){
            Po po=null;
            po=mqSender.send(text);
            return po;
        }
        
        /**
        * @Author: HTC
        * @Description: 在进入发送消息的HTML页面前就开始初始化mq
        * @Param: null
        * @return: String 页面
        * @Date: 2019/1/4
        */
        @RequestMapping("toSendMassage")
        public String toSendMassage(){
            //页面加载前初始化Mq
            boolean b=mqSender.start();
            if(!b){
                return "error";
            }
            return "SendMassage";
        }
        /**
        * @Author: HTC
        * @Description: 从消费者获得消息
        * @Param: null
        * @return: List<Po> list
        * @Date: 2019/1/4
        */
        @RequestMapping("getMessage")
        @ResponseBody
        public List<Po> toGetMessage(){
            Redriver redriver=new Redriver();
            List<Po> list=redriver.GetMessage();
            return list;
        }
    
    }
    

    跑一个?大概就这样了(我是给自己的消费者发消息,所以返回的是一样的)

    注意:

    正常情况是A有一个客户端A1和一个消费者端A2,B也有一个客户端B1和一个消费者端B2,如果A要给B发消息,是A1发消息给B2,所以A1的地址是填的B2的。

    下一步是把ajax轮询替换成webSocket。

    ------------------------------------------------------------------雷区警戒线----------------------------------------------------------------------------

    坑1:

    我发了20条信息,却只接到了15条?消息似乎消失了?

    填坑:

    1.你得看看是不是在某些地方消费了那些消息,毕竟有多个消费者消费消息的时候,你的消费者不一定会抢得到那么多。。。。

    你可以去你的管理界面看看,如果ajax轮询因为你的代码习惯或者什么什么之类的问题导致了它在每次查询的时候都创建了一个新的消费者(而且因为你还在发消息它还没法关掉),那可能消息就莫名丢失了。

    坑2:

    我消息发得好好的,为啥接收的消息就顺序是乱的?

    填坑:

    在这里有两个关于时间的设置:第一个是: TextMessage message = (TextMessage) messageConsumer.receive(500);这是设置消费者消息接收的时间间隔(就叫“后台时间”),可以减少资源占用(毕竟不用时时刻刻盯着有没得消息过来,没那么累,,,,我就这么理解的)

    第二个是Html页面的请求消息接收的定时器(就叫“前台时间”了): ref = setInterval(function(){.........省略实现代码.....},2500);

    如果你的手速(输入的速度)大于这两个时,并且前台时间小于后台时间的话,输入的东西就会乱掉。(手速是个好东西,可是。。。。。。不说了,,,,,,其实这个坑是可以控制的,毕竟人的输入速度不可能小于0.5秒吧,把值设置得低点就好了)。

    异常:javax.jms.InvalidClientIDException: Broker: localhost - Client: HTC already connected from tcp://127.0.0.1:54914

    其实就是一个端口只能开一个页面,多开页面是不存在的,至于解决方案嘛,暂时没有。。。。。。。

    --------------------------------------------------------------我不是系统的生产者,我只是bug的搬运工----------------------------------------

    上一篇文章:springboot整合dubbo之dubbo管理平台搭建以及服务搭建SpringBoot 学习笔记(第五天)

    展开全文
  • mq

    千次阅读 2012-08-31 02:00:39
    这个工具作为消息网络拓扑图工具。再一次感谢各位朋友对我们的信任。 现在,我们正式启动 activeMQ 笑脸计划。它的目的不再是给大家提供解决问题的方向,而是直接解决大家碰到的各种问题,给大家带去笑脸。它将是...
    在接触
    activeMQ
    的这一段时间里,我们还是保持开始对它的态度,它是个优秀的开源消息中间件。消息中间件是个非常重要的搭建企业应用系统的重要组件,我们在不断深入分析
    activeMQ
    的过程中,发现直到
    5.1
    这个版本,都还是存在不少问题,有些是很致命,但正因为如此,我们更加坚定了要全面掌握
    activeMQ
    ,我们不想重新做“轮子”,但我们要具备在轮子坏了或不好用的情况下,要能独立解决碰到的这些问题。下面我们通过分析网友提出的一个典型的问题场景,来作为我们指南针计划的结束。
    
    Queue作为activeMQ里面一个很重要的通讯方式,网友的场景如下:
    测试queue持久化消息时,发送接收20W条消息。打开消息消费者,连上再断开,反复进行这步操作,能接收到消息,接收端有时候会阻塞,但不能完全接收完20W条消息。(其实5000条就会发生问题,不用20W这么多)
           相关背景知识:
           因为这是5.1版本的一个非常严重的bug,所以我们会比较详细的进行分析。(我们在最终解决问题后,上activeMQ官网上发现它最新的源码是解决了该问题的,但这并不影响这个问题的典型性)。下面我们将从3个方面来分析:Queue消息的接收和发送、内存使用机制、消息的审查(audit)、消息在文件中的存储机制。
    l
    Queue消息的接收和发送
    queue接收消息.JPG
    2009-8-11 09:22 上传
    下载附件 (21.81 KB)


    Queue接收消息并发给需要的消费者,具体过程如下:
    1.
    Queue
    从消息生产者接收消息。
    2.
    Queue
    使用一个“存储指针”来接收这些消息。当内存有空闲区域时,“存储指针”把消息放到内存中,当内存不够时,则把消息们存入磁盘文件。
    3.
    当有活动的(active)的消息消费者时,Queue会首先把“存储指针”的内存中的消息送给消费者,当内存的消息被消费掉,则从磁盘文件中再读入其他的消息(出问题处),直至消息都被消费掉了。
    其中最关键的方法是 Queue 类里的 doPageIn()

    l
    内存使用机制
    activeMQ为了适应企业级的365*24的使用,在内存使用方面非常慎重,任何消息只有在内存里有空闲区域时,才能放到内存里,之后才能发给消费者。当消息被消费者消耗掉了后,确认信息会发给activeMQQueue接收到这些确认消息后,会把那些被确认的消息所占用的内存释放掉。

    l
    消息的审查(audit)
    为了防止消息的重复发送,activeMQ采用了一个审查机制,它负责审查某条消息是否重复。它是一个最近最久未使用算法(LRU)队列。每个队列元素它是一个bit数组,它的运行机制如下所示:
    ActiveMQMessageAudit处理细节.JPG
    2009-8-11 09:22 上传
    下载附件 (15.24 KB)



           消息是一个个按照顺序进入bit数组,具体算法answer = (index - firstIndex) / BitArray.LONG_SIZE,其中:
    BitArray.LONG_SIZE是每个bit数组的大小。
    Index是消息的编号。(它是按照+1顺序增加的)
    firstIndex是整个LRU队列的首Index,这个值会经常变化,因为当达到LRU的上限时,老的一批就被清除了,firstIndex += BitArray.LONG_SIZE(出问题处)

    l
    消息在文件中的存储机制
    存放在文件中的消息,它们是按照如下方式进行组织的:
    Queue消息存储.JPG
    2009-8-11 09:22 上传
    下载附件 (9.63 KB)


    每个消息都知道它的上一个和下一个消息,当它自身被删除后,相应的关系会进行调整。

    问题原因分析:

    因为 activeMQ在编码实现的时候,原本的想法应该是这样的:
    1.
    从生产者接收消息,如果Queue有可用的内存就放在内存中,没有则存入文件中。

    2.
    Queue发送消息给消费者时,先发送已经保存在内存中的消息。

    3.
    当内存中消息发送完后,顺序读入(这里是关键)文件中的消息,通过消息的审查机制,确认不是重复消息,则放入内存中供后续操作使用。

    但是 activeMQ5.1版本的实现,问题就出在第三步的顺序读入。因为从文件中读入它有个先决条件,那就是必须要有可用的内存,如果没有可用的话,就放弃本次消息读入,并且应该放弃这次读取操作。但是5.1版本是继续往下读,这就导致顺序错乱,使得当内存可用的时候,读入的消息在进行审查的时候,发生错误,错误认为它们是重复消息。这就导致发送20W条消息,不能保证完全收到。

    解决方案:
    KahaReferenceStore的方法recoverNextMessages里的
    if (entry != null) {

    int count = 0;


    do {


    ReferenceRecord msg = messageContainer.getValue(entry);


    if (msg != null ) {



    if ( recoverReference(listener, msg)) {


    count++;


    lastBatchId = msg.getMessageId();


    }


    } else {


    lastBatchId = null;



    }


    batchEntry = entry;


    entry = messageContainer.getNext(entry);


    } while (entry != null && count < maxReturned && listener.hasSpace());


    }


    改为

          if (entry != null) {



    int count = 0;


    do {


    ReferenceRecord msg = messageContainer.getValue(entry);


    testTheNextMsgId(msg.getMessageId().toString());


    if (msg != null )


    {



    if ( recoverReference(listener, msg))


    {


    count++;


    lastBatchId = msg.getMessageId();


    batchEntry = entry;


    entry = messageContainer.getNext(entry);


    }


    else


    {



    break;


    }


    }


    else


    {



    lastBatchId =
    null;


    batchEntry = entry;


    entry = messageContainer.getNext(entry);


    }


    }
    while (entry != null && count < maxReturned && listener.hasSpace());



    }



    activeMQ 指南针计划的结束,但它又是个新开始,我们通过这个计划收获了我们想要的东西了,同时我们不仅为各位朋友答疑解疑,也提供了 activemqSpanner 这个工具作为消息网络拓扑图工具。再一次感谢各位朋友对我们的信任。
    现在,我们正式启动 activeMQ 笑脸计划。它的目的不再是给大家提供解决问题的方向,而是直接解决大家碰到的各种问题,给大家带去笑脸。它将是一个长期坚持的事情,任何关于 activeMQ 使用过程的疑惑、问题、 bug 、功能改进,都可以在这个计划里交流。所有在笑脸计划中提出的问题、功能改进、解决方案,都将完全通过网络无偿分享给所有人。在接触activeMQ的这一段时间里,我们还是保持开始对它的态度,它是个优秀的开源消息中间件。消息中间件是个非常重要的搭建企业应用系统的重要组件,我们在不断深入分析activeMQ的过程中,发现直到5.1这个版本,都还是存在不少问题,有些是很致命,但正因为如此,我们更加坚定了要全面掌握activeMQ,我们不想重新做“轮子”,但我们要具备在轮子坏了或不好用的情况下,要能独立解决碰到的这些问题。下面我们通过分析网友提出的一个典型的问题场景,来作为我们指南针计划的结束。
    Queue作为activeMQ里面一个很重要的通讯方式,网友的场景如下:
    测试queue持久化消息时,发送接收20W条消息。打开消息消费者,连上再断开,反复进行这步操作,能接收到消息,接收端有时候会阻塞,但不能完全接收完20W条消息。(其实5000条就会发生问题,不用20W这么多)
           相关背景知识:
           因为这是5.1版本的一个非常严重的bug,所以我们会比较详细的进行分析。(我们在最终解决问题后,上activeMQ官网上发现它最新的源码是解决了该问题的,但这并不影响这个问题的典型性)。下面我们将从3个方面来分析:Queue消息的接收和发送、内存使用机制、消息的审查(audit)、消息在文件中的存储机制。
    l
    Queue消息的接收和发送
    queue接收消息.JPG
    2009-8-11 09:22 上传
    下载附件 (21.81 KB)


    Queue接收消息并发给需要的消费者,具体过程如下:
    1.
    Queue
    从消息生产者接收消息。
    2.
    Queue
    使用一个“存储指针”来接收这些消息。当内存有空闲区域时,“存储指针”把消息放到内存中,当内存不够时,则把消息们存入磁盘文件。
    3.
    当有活动的(active)的消息消费者时,Queue会首先把“存储指针”的内存中的消息送给消费者,当内存的消息被消费掉,则从磁盘文件中再读入其他的消息(出问题处),直至消息都被消费掉了。
    其中最关键的方法是Queue类里的doPageIn()

    l
    内存使用机制
    activeMQ为了适应企业级的365*24的使用,在内存使用方面非常慎重,任何消息只有在内存里有空闲区域时,才能放到内存里,之后才能发给消费者。当消息被消费者消耗掉了后,确认信息会发给activeMQQueue接收到这些确认消息后,会把那些被确认的消息所占用的内存释放掉。

    l
    消息的审查(audit)
    为了防止消息的重复发送,activeMQ采用了一个审查机制,它负责审查某条消息是否重复。它是一个最近最久未使用算法(LRU)队列。每个队列元素它是一个bit数组,它的运行机制如下所示:
    ActiveMQMessageAudit处理细节.JPG
    2009-8-11 09:22 上传
    下载附件 (15.24 KB)



           消息是一个个按照顺序进入bit数组,具体算法answer = (index - firstIndex) / BitArray.LONG_SIZE,其中:
    BitArray.LONG_SIZE是每个bit数组的大小。
    Index是消息的编号。(它是按照+1顺序增加的)
    firstIndex是整个LRU队列的首Index,这个值会经常变化,因为当达到LRU的上限时,老的一批就被清除了,firstIndex += BitArray.LONG_SIZE(出问题处)

    l
    消息在文件中的存储机制
    存放在文件中的消息,它们是按照如下方式进行组织的:
    Queue消息存储.JPG
    2009-8-11 09:22 上传
    下载附件 (9.63 KB)


    每个消息都知道它的上一个和下一个消息,当它自身被删除后,相应的关系会进行调整。

    问题原因分析:

    因为activeMQ在编码实现的时候,原本的想法应该是这样的:
    1.
    从生产者接收消息,如果Queue有可用的内存就放在内存中,没有则存入文件中。

    2.
    Queue发送消息给消费者时,先发送已经保存在内存中的消息。

    3.
    当内存中消息发送完后,顺序读入(这里是关键)文件中的消息,通过消息的审查机制,确认不是重复消息,则放入内存中供后续操作使用。

    但是activeMQ5.1版本的实现,问题就出在第三步的顺序读入。因为从文件中读入它有个先决条件,那就是必须要有可用的内存,如果没有可用的话,就放弃本次消息读入,并且应该放弃这次读取操作。但是5.1版本是继续往下读,这就导致顺序错乱,使得当内存可用的时候,读入的消息在进行审查的时候,发生错误,错误认为它们是重复消息。这就导致发送20W条消息,不能保证完全收到。

    解决方案:
    KahaReferenceStore的方法recoverNextMessages里的
    if (entry != null) {

    int count = 0;


    do {


    ReferenceRecord msg = messageContainer.getValue(entry);


    if (msg != null ) {



    if ( recoverReference(listener, msg)) {


    count++;


    lastBatchId = msg.getMessageId();


    }


    } else {


    lastBatchId = null;



    }


    batchEntry = entry;


    entry = messageContainer.getNext(entry);


    } while (entry != null && count < maxReturned && listener.hasSpace());


    }


    改为

          if (entry != null) {



    int count = 0;


    do {


    ReferenceRecord msg = messageContainer.getValue(entry);


    testTheNextMsgId(msg.getMessageId().toString());


    if (msg != null )


    {



    if ( recoverReference(listener, msg))


    {


    count++;


    lastBatchId = msg.getMessageId();


    batchEntry = entry;


    entry = messageContainer.getNext(entry);


    }


    else


    {



    break;


    }


    }


    else


    {



    lastBatchId =
    null;


    batchEntry = entry;


    entry = messageContainer.getNext(entry);


    }


    }
    while (entry != null && count < maxReturned && listener.hasSpace());



    }



    activeMQ 指南针计划的结束,但它又是个新开始,我们通过这个计划收获了我们想要的东西了,同时我们不仅为各位朋友答疑解疑,也提供了 activemqSpanner 这个工具作为消息网络拓扑图工具。再一次感谢各位朋友对我们的信任。
    现在,我们正式启动 activeMQ 笑脸计划。它的目的不再是给大家提供解决问题的方向,而是直接解决大家碰到的各种问题,给大家带去笑脸。它将是一个长期坚持的事情,任何关于 activeMQ 使用过程的疑惑、问题、 bug 、功能改进,都可以在这个计划里交流。所有在笑脸计划中提出的问题、功能改进、解决方案,都将完全通过网络无偿分享给所有人。
    展开全文
  • 何为MQ以及为何要用MQ

    2020-07-16 18:13:25
    MQ是一直存在,不过随着分布式/微服务架构的流行,成了解决微服务之间问题的常用工具。 应用解耦 假设有系统A、B、C。 A系统要与B、C系统交互,调用B、C系统,假设没有使用MQ。使用直连调用的话。 A、B、C系统必须...
  • MQ服务

    2019-07-29 20:06:00
    MQ (IBM MQMQ传递主干,在世界屡获殊荣。 它帮您搭建企业服务总线(ESB)的基础传输层。IBM WebSphere MQ为SOA提供可靠的消息传递。它为经过验证的消息传递主干, 全方位、 多用途的数据传输, 并帮助您搭建...
  • 吃透 MQ

    2021-06-01 22:26:59
    点击上方蓝色“方志朋”,选择“设为星标” 回复“666”获取独家整理的学习资料! 本文主要讲解 MQ 的通用知识,让大家先弄明白:如果让你来设计一个 MQ,该如何下手?需要考虑哪些问题...
  • mq jms

    2017-04-13 00:08:11
    MQ、JMS以及ActiveMQ ms 的一个标准或者说是一个协议. 通常用于企业级应用的消息传递. 主要有topic 消息(1 对多), queue 消息(1对1). activemq 是一个jms 的实现, apache 出的. 另外还其它的实现...
  • 0MQ

    万次阅读 2017-05-30 22:18:09
    ØMQ - The Guide Table of Contents By Pieter Hintjens, CEO of iMatix Please use the issue tracker for all comments and errata(勘误表). This version covers the latest stable(稳定的) relea
  • 关于IBM MQ

    2019-02-21 09:13:09
    消息队列+发送队列+消息通道 接收通道名称与发送端的发送通道名称要一致,修改通道信息后要执行 start channle(chlname...常用的MQ命令  66.0.42.240 用户 mqm/mqm 88.0.52.40 mq队列服务器:mqm/1qaz2wsx 二代:8...
  • 配置mq

    2016-11-16 18:36:00
    mq的实现可以是apache的,也可以是ibm的,配置不同的地方是connectionFactory和queue和topic应用的包不同 <!-- 配置链接器,注入apache的实现 --> <bean id="connectionFactory" class="org.spring...
  • MQ消息队列简介

    2019-07-25 12:23:15
    MQ生产者者往消息队列中写消息,消费可以读取队列中的消息。 下一代编程 : 响应式编程 spring boot 2.2提供非阻塞 RSocket协议 spring-boot-starter-data-redis-reactive, spring-boot-starter-dat...
  • MQ入门简介

    千次阅读 2015-12-02 14:52:52
    MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过...
  • 浅谈MQ

    2019-10-02 14:40:13
    MQ被程序员叫做消息中间件,字面意思用来处理处于两方中间消息的一个工具,异步RPC(远程过程调用)。 yes/no 优点:以高并发、高流量系统为出发点,如果数据量并不大使用MQ反而带来后期维护的困难等问题。 ...
  • MQ企业架构

    2014-03-17 15:20:08
     Websphere MQ 是 IBM 功能强大的消息传送中间件产品,它以其成熟的技术和世界领先的产品向我们提供了的功能丰富、可靠易用的异构平台间实现可靠信息传递的成熟解决方案。使用 MQ 消息传递产品可以帮助业务应用在...
  • ActivityMQ安装部署

    千次阅读 2018-02-27 19:36:28
    尤其是对于为银行等金融机构开发的同事,我们知道监管机构使用的MQ队列都是IBM的供应商,如果开发时使用Rabbit或者Rocket南面上线或者生产环境下会有一些不一样,所以ActivityMQ还是一款比较常用的MQ工具。...
  • 一、MQ简介及特点 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是...
  • MQ消息

    万次阅读 2015-03-30 23:40:48
    我们再进一步抽象,用户业务就是消息的"生产者",它将消息发布到消息管理器。邮件业务就是 消息的"消费者",它将收到的消息进行处理。邮局可以订阅很多种杂志,杂志都是通过某种编号来区分;消息管理器也可以管理...
  • 分布式RPC和MQ

    2019-08-09 10:31:24
    分布式系统(distributed system)是建立在网络之上的软件...分布式消息队列(MQ) 为什么使用 MQ? 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。 应用解耦 - 系统间通过消息通信,不用关心其他系...
  • Rabbit MQ 入门

    千次阅读 2020-06-18 14:15:23
    Rabbit MQ是一个通用的消息中间件,支持AMQP,STOMP,MQTT等多种协议 安装# 在mac OSX下可以使用如下命令来安装 rabbitmq brew install rabbitmq 基本命令# ls -al ~/rabbitmq/3.7.14/sbin/ total 1104 ...
  • 关于MQ那些事

    2019-03-14 14:07:27
    那么消息队列MQ有什么套路呢?(这个话题转换生硬度连我自己都怕!) 使用消息队列场景和好处 使用消息队列会带来什么问题,有什么解决方案 如何使用MQ(以ActiveMQ为例) 1.消息队列的应用场景和好处: 异步-...
  • WebSphere MQ基础概念

    千次阅读 2016-10-19 09:21:55
    WebSphere MQ基础概念1.简介MQ是IBM开发的一款功能强大的消息中间件,通过通用的消息队列模型实现不同应用和系统之间的可靠数据通信,简化系统开发和集成,实现类似SOA的可重用架构。 IBM MQ提供消息封装和消息排队...
  • MQ消息中间件技术

    万次阅读 多人点赞 2016-03-30 14:46:14
    消息是MQ中最小的概念,本质上就是一段数据,它能被一个或者多个应用程序所理解,是应用程序之间传递的信息载体。
  • WEBSPHERE MQ实践

    2019-04-11 12:03:05
    WEBSPHERE MQ实践 (由于本文格式已经错乱,请在http://www.itpub.net/thread-1175076-1-1.html下载相关doc文件及附件) 最近以来,工作中一直在使用WebSphere的 ...
  • 消息队列 MQ

    2018-01-31 16:20:47
    消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件,产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息...
  • mq系列rabbitmq

    2018-01-15 08:39:46
    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 13,809
精华内容 5,523
关键字:

mq生产工具类