消息中间件 订阅
可与OA、ERP集成的免费消息中间件Active Messenger(简称AM)是一款非常实用的企业即时通讯软件。系统提供免费的消息中间件(以com组件的方式提供),开放给第三方程序使用。 展开全文
可与OA、ERP集成的免费消息中间件Active Messenger(简称AM)是一款非常实用的企业即时通讯软件。系统提供免费的消息中间件(以com组件的方式提供),开放给第三方程序使用。
信息
背    景
国内信息化建设的日益深入
外文名
Active Messenger
中文名
消息中间件
属    性
企业即时通讯软件
消息中间件简介
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
收起全文
精华内容
参与话题
问答
  • 消息中间件RabbitMQ

    2020-06-22 16:22:39
    当下主流的消息中间件RabbitMQ、Kafka、ActiveMQ、RocketMQ等。 2、作用 1、消息中间件主要作用 冗余(存储) 扩展性 可恢复性 缓冲 异步通信 削峰 :消息队列中的常用场景,一般在秒杀或抢够活动中使用广泛。一般...

    RabbitMQ

    1、消息中间件

    1、简介

    **消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。**通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

    当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。

    2、作用

    1、消息中间件主要作用
    • 冗余(存储)
    • 扩展性
    • 可恢复性
    • 缓冲
    • 异步通信
    • 削峰 :消息队列中的常用场景,一般在秒杀或抢够活动中使用广泛。一般会因为流量过大,应用系统配置承载不了这股瞬间流量,导致系统直接挂掉,即传说中的“宕机”现象。为解决这个问题,我们会将那股巨大的流量拒在系统的上层,即将其转移至 MQ 而不直接涌入我们的接口。
    • 解耦
    2、消息中间件的两种模式
    1、P2P模式

    P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。

    P2P的特点:(点对点:Queue,不可重复消费)

    • 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中
    • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行它不会影响到消息被发送到队列
    • 接收者在成功接收消息之后需向队列应答成功
    • 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式
    2、Pub/Sub模式(发布/订阅:Topic,可以重复消费)

    Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

    Pub/Sub的特点:

    • 每个消息可以有多个消费者
    • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
    • 为了消费消息,订阅者必须保持运行的状态
    • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
    3、常用中间件介绍与对比
    1、Kafka

    Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量日志数据的互联网服务的数据收集业务。

    2、RabbitMQ

    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

    Erlang是一种通用的面向并发的编程语言

    3、RocketMQ

    RocketMQ是阿里开源的消息中间件,**它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。**它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、消息推送、日志流式处理、binglog分发等场景。


    RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

    RabbitMQ集群

    RabbiMQ简介

    RabbiMQ是Erlang开发的,集群非常方便,因为Erlang天生就是分布式语言,但其本身并不支持负载均衡,支持高并发,支持可扩展。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    Ajax 即“Asynchronous Javascript And XML”(异步 JavaScript 和 XML),是指一种创建交互式网页应用的网页开发技术。Ajax = 异步 JavaScript 和 XML

    2、RabbitMQ 特点

    • 可靠性

    • 扩展性

    • 高可用性

    • 多种协议

    • 多语言客户端

    • 管理界面

    • 插件机制

      3、什么是消息队列

      MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。

      消息传递指的是程序之间通过在消息中发送数据进行通信。

    RabbitMQ模式

    注意:RabbitMQ模式大概分为以下三种:

    (1)单机模式。

    (2)普通模式(默认的集群模式)。

    (3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA方案,在对业务可靠性要求较高的场合中比较适合)。要实现镜像模式,需要先搭建出普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。

    了解集群中的基本概念:

    RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。

    一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制。

    Broker:消息队列服务器实体
    ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
    Exchange(交换器):用于接受、分配消息;
    Routing Key:路由关键字,exchange根据这个关键字进行消息投递;
    Queue(队列):用于存储生产者的消息;
    Bindding:绑定,把exchange和queue按照路由规则绑定起来。
    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离.
    producer:消息生产者,就是投递消息的程序。
    consumer:消息消费者,就是接受消息的程序。
    channel:(信道)消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。消息推送使用的通道;
    

    在这里插入图片描述

    面试注意:集群中有两种节点

    1 内存节点:只保存状态到内存(持久的queue的持久内容将被保存到disk)

    2 磁盘节点:保存状态到内存和磁盘。—推荐

    内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了

    如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。

    普通集群准备环境

    注意,三台服务器,RabbitMQ集群节点必须在同一网段,如果是跨域,效果会变差。关闭防火墙和selinux

    修改主机名称,添加解析

    三台机器都操作:

    1. 配置hosts文件更改三台MQ节点的计算机名分别为rabbitmq-1、rabbitmq-2 和rabbitmq-3,然后修改hosts配置件

      [root@rabbitmq-1 ~]# hostnamectl set-hostname rabbitmq-1
      [root@rabbitmq-1 ~]# vim /etc/hosts
      127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
      ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
      192.168.50.138 rabbitmq-1
      192.168.50.139 rabbitmq-2
      192.168.50.140 rabbitmq-3
      

      2.三个节点配置安装rabbitmq软件

    安装依赖
    [root@rabbitmq-1 ~]# yum install -y *epel* gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel
    yum安装erlang
    [root@rabbitmq-1 ~]# wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-20.3-1.el7.centos.x86_64.rpm/download.rpm
    [root@rabbitmq-1 ~]# yum install erlang-20.3-1.el7.centos.x86_64.rpm -y
    测试;
    [root@rabbitmq-1 ~]# erl
    Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
    
    Eshell V9.3  (abort with ^G)
    1>
    
    安装rabbitmq
    [root@rabbitmq-1 ~]# wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.5/rabbitmq-server-3.7.5-1.el7.noarch.rpm
    [root@rabbitmq-1 ~]# yum install rabbitmq-server-3.7.5-1.el7.noarch.rpm -y
    
    3.启动
    [root@rabbitmq-1 ~]# systemctl daemon-reload
    [root@rabbitmq-1 ~]# systemctl start rabbitmq-server
    [root@rabbitmq-1 ~]# systemctl enable rabbitmq-server
    [root@rabbitmq-1 ~]# systemctl status rabbitmq-server
    启动方式二:
    [root@rabbitmq-1 ~]# /sbin/service rabbitmq-server status  ---查看状态
    [root@rabbitmq-1 ~]# /sbin/service rabbitmq-server start   ---启动
    每台都操作开启rabbitmq的web访问界面: 
    [root@rabbitmq-1 ~]# rabbitmq-plugins enable rabbitmq_management
    

    在这里插入图片描述

    创建用户

    注意:在一台机器操作
    添加用户和密码
    [root@rabbitmq-1 ~]# rabbitmqctl add_user soho soso
    Adding user "soho" ...
    设置为管理员
    [root@rabbitmq-1 ~]# rabbitmqctl set_user_tags soho administrator
    Setting tags for user "soho" to [administrator] ...
    查看用户
    [root@rabbitmq-1 ~]# rabbitmqctl list_users
    Listing users ...
    guest	[administrator]
    soho	[administrator]
    ...done.
    
    此处设置权限时注意'.*'之间需要有空格 三个'.*'分别代表了conf权限,read权限与write权限 例如:当没有给
    soho设置这三个权限前是没有权限查询队列,在ui界面也看不见
    [root@rabbitmq-1 ~]# rabbitmqctl set_permissions -p "/" soho ".*" ".*" ".*"
    Setting permissions for user "soho" in vhost "/" ...
    

    3台机器都操作:开启用户远程登录:

    [root@rabbitmq-1 ~]# cd /etc/rabbitmq/   
    [root@rabbitmq-1 rabbitmq]# cp /usr/share/doc/rabbitmq-server-3.7.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    [root@rabbitmq-1 rabbitmq]# ls
    enabled_plugins  rabbitmq.config
    [root@rabbitmq-1 rabbitmq]# vim rabbitmq.config
    修改如下:
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tittFqJZ-1592813522458)(assets/1564157816348.png)]

    三台机器都操作重启服务服务:
    [root@rabbitmq-1 ~]# systemctl restart rabbitmq-server
    

    查看端口

    在这里插入图片描述

    4369 -- erlang端口
    5672 --程序连接端口
    15672 -- 管理界面ui端口
    25672 -- server间内部通信端口
    

    !注意如果是云服务器,切记添加安全组端口放行。

    访问:192.168.50.138:15672

    在这里插入图片描述

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NkIjj2V8-1592813522464)(assets/1564158056319.png)]

    这里需要注意:

    rabbitmq默认管理员用户:guest 密码:guest

    新添加的用户为:soho 密码:soso

    开始部署集群三台机器都操作:

    1.首先创建好数据存放目录和日志存放目录:

    3台机器都操作

    [root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/data
    [root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/logs
    [root@rabbitmq-1 ~]# chmod 777 -R /data/rabbitmq
    [root@rabbitmq-1 ~]# chown rabbitmq.rabbitmq /data/ -R
    创建配置文件:
    [root@rabbitmq-1 ~]# vim /etc/rabbitmq/rabbitmq-env.conf
    [root@rabbitmq-1 ~]# cat /etc/rabbitmq/rabbitmq-env.conf
    RABBITMQ_MNESIA_BASE=/data/rabbitmq/data
    RABBITMQ_LOG_BASE=/data/rabbitmq/logs
    重启服务
    [root@rabbitmq-1 ~]# systemctl restart rabbitmq-server
    

    2.拷erlang.cookie

    Rabbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群。Erlang的集群中

    各节点是经由各个cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,文件是400的权限。所以必须保证各节点cookie一致,不然节点之间就无法通信.

    如果执行# rabbitmqctl stop_app 这条命令报错:需要执行

    #如果执行# rabbitmqctl stop_app 这条命令报错:需要执行
    #chmod 400 .erlang.cookie
    #chown rabbitmq.rabbitmq .erlang.cookie
    

    (官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。)

    [root@rabbitmq-1 ~]# cat /var/lib/rabbitmq/.erlang.cookie
    HOUCUGJDZYTFZDSWXTHJ
    scp的方式将rabbitmq-1节点的.erlang.cookie的值复制到其他两个节点中。
    [root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.139:/var/lib/rabbitmq/
    [root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.140:/var/lib/rabbitmq/
    

    3.将mq-2、mq-3作为内存节点加到mq-1节点集群中

    在mq-2、mq-3执行如下命令:
    [root@rabbitmq-2 ~]# rabbitmqctl stop_app  #停止节点,切记不是停止服务
    

    在这里插入图片描述

    [root@rabbitmq-2 ~]# rabbitmqctl reset   #如果有数据需要重置,没有则不用
    

    在这里插入图片描述

    注意查看回显,如果不是以上。就是错误;如果报错,重启rabbitmq服务

    将两个节点加入集群,指定角色
    [root@rabbitmq-2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1  #添加到内存节点
    Clustering node 'rabbit@rabbitmq-2' with 'rabbit@rabbitmq-1' ...
    [root@rabbitmq-2 ~]# rabbitmqctl start_app  #启动节点
    Starting node 'rabbit@rabbitmq-2' ...
     completed with 3 plugins.
    ======================================================================
    [root@rabbitmq-3 ~]# rabbitmqctl stop_app
    Stopping node 'rabbit@rabbitmq-3' ...
    [root@rabbitmq-3 ~]# rabbitmqctl reset
    Resetting node 'rabbit@rabbitmq-3' ...
    [root@rabbitmq-3 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
    Clustering node 'rabbit@rabbitmq-3' with 'rabbit@rabbitmq-1' ...
    [root@rabbitmq-3 ~]# rabbitmqctl start_app
    Starting node 'rabbit@rabbitmq-3' ...
     completed with 3 plugins.
    
    (1)默认rabbitmq启动后是磁盘节点,在这个cluster命令下,mq-2和mq-3是内存节点,
    mq-1是磁盘节点。
    (2)如果要使mq-2、mq-3都是磁盘节点,去掉--ram参数即可。
    (3)如果想要更改节点类型,可以使用命令rabbitmqctl change_cluster_node_type
    disc(ram),前提是必须停掉rabbitmq应用
    注:
    #如果有需要使用磁盘节点加入集群
     [root@rabbitmq-2 ~]# rabbitmqctl join_cluster  rabbit@rabbitmq-1
     [root@rabbitmq-3 ~]# rabbitmqctl join_cluster  rabbit@rabbitmq-1
    

    4.查看集群状态

    在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功。
    在mq-1磁盘节点上面查看
    [root@rabbitmq-1 ~]# rabbitmqctl cluster_status
    

    在这里插入图片描述

    每台机器显示出三台节点,表示已经添加成功!
    

    5.登录rabbitmq web管理控制台,创建新的队列

    打开浏览器输入http://192.168.50.138:15672,

    输入默认的Username:guest

    输入默认的Password:guest

    登录后出现如图所示的界面。

    在这里插入图片描述
    根据界面提示创建一条队列

    在这里插入图片描述

    在这里插入图片描述

    RabbitMQ镜像集群配置

    上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,队列内容不会复制。如果队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

    镜像队列是基于普通的集群模式的。

    创建镜像集群:在任意一台机器操作

    rabbitmq set_policy :设置策略

    [root@rabbitmq-1 ~]# rabbitmqctl set_policy  ha-all "^" '{"ha-mode":"all"}'
    Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
    
     "^"匹配所有的队列, ha-all 策略名称为ha-all, '{"ha-mode":"all"}' 策略模式为 all 即复制到所有节点,包含新增节点。
    

    再次查看队列已经同步到其他两台节点:

    在这里插入图片描述

    "^"匹配所有的队列, ha-all 策略名称为ha-all, ‘{“ha-mode”:“all”}’ 策略模式为 all 即复制到所有节点,包含新增节点。

    设置策略介绍:
    rabbitmqctl set_policy [-p Vhost] Name Pattern Definition
    -p Vhost: 可选参数,针对指定vhost下的queue进行设置
    Name: policy的名称,可以定义
    Pattern: queue的匹配模式(正则表达式),也就是说会匹配一组。
    Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
        ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
            all:表示在集群中所有的节点上进行镜像
            exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
            nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
        ha-params:ha-mode模式需要用到的参数
        ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
    案例:
    例如,对队列名称以hello开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy的设置命令为: 
    rabbitmqctl set_policy hello-ha “^hello” ‘{“ha-mode”:”exactly”,”ha-params”:2,”ha-sync-mode”:”automatic”}

    则此时镜像队列设置成功。

    已经部署完成

    将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致。

    三、常见问题

    常见错误:

    1、使用 rabbitmq-server -detached命令启动rabbitmq时,出现以下提示Warning: PID file not written; -detached was passed,此时使用rabbitmqctl status提示服务已启动,可知此问题不用解决。

    2、由于更改hostname文件,在每次rabbitmqctl stop或者rabbitmqctl cluster_status等,只要是rabbitmq的命令就报错,提示大概如下

    Cluster status of node rabbit@web2 ...
    Error: unable to connect to node rabbit@web2: nodedown
    
    DIAGNOSTICS
    ===========
    
    attempted to contact: [rabbit@web2]
    
    rabbit@web2:
      * connected to epmd (port 4369) on web2
      * epmd reports node 'rabbit' running on port 25672
      * TCP connection succeeded but Erlang distribution failed
    
      * Hostname mismatch: node "rabbit@mq2" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@mq2"
    
    
    current node details:
    - node name: 'rabbitmq-cli-11@web2'
    - home dir: /root
    - cookie hash: SGwxMdJ3PjEXG1asIEFpBg==
    

    此时先ps aux | grep mq,然后kill -9 该进程,然后再rabbitmq-server -detached即可解决。(即先强杀,再重新启动)

    3、使用rabbitmqctl stoprabbitmq-server -detached重新启动后,原先添加的用户admin、虚拟主机coresystem等均丢失,还需要重新添加。

    展开全文
  • Spring Boot 整合消息中间件 RabbitMQ

    万次阅读 2017-02-16 16:03:12
    RabbitMQ消息中间件的一种,实现了 AMQP 标准。消息中间件的工作过程可以用生产者-消费者模型来表示。生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的...

    RabbitMQ 是消息中间件的一种,实现了 AMQP 标准。消息中间件的工作过程可以用生产者-消费者模型来表示。生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理。消息队列常用于分布式系统之间互相信息的传递。

    对于 RabbitMQ 来说,除了生产者、消息队列、消费者这三个基本模块以外,还添加了 交换机 (Exchange) 模块。它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。

    交换机的主要作用是接收相应的消息并且绑定到指定的队列,有三种类型的交换机:

    交换机 说明
    direct 默认的交换机 (一对一)。即创建消息队列的时候,指定一个BindingKey,当生产者发送消息的时候,指定对应的Key,当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中。
    fanout 路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略。生产者其实仅关注Exchange与Route Key, 消费者仅关注Queue
    topic Topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中。

    RabbitMQ交换机

    使用 RabbitMQ 需要添加 Maven 起步依赖:

    <!-- rabbitmq -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    一些核心类如下:

    org.springframework.amqp.core.Queue: 队列
    org.springframework.amqp.core.Binding: 建立交换机与队列的绑定关系
    org.springframework.amqp.core.DirectExchange: Direct交换机
    org.springframework.amqp.core.TopicExchange: Topic交换机
    org.springframework.amqp.core.FanoutExchange: Fanout交换机
    org.springframework.amqp.support.converter.MessageConverter: 消息转换器, 如将Java类转换JSON类型发送至Broker, 从Broker处获取JSON消息转换为Java类型
    org.springframework.amqp.core.AmqpTemplate: 多用于生产者端发布消息
    org.springframework.amqp.core.AmqpAdmin: 用于Exchange, Queue等的动态管理
    

    然后通过 application.properties 中的 spring.rabbitmq.* 前缀配置属性(生产者、消费者应用都一样):

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/admin
    

    下面看一下在三种类型的交换机下的不同实现。

    1.direct交换机

    在 JavaConfig 中注册 bean(生产者和消费者的是一样的,因为监听的是同一个队列,所以队列名要先约定好):

    @Configuration
    public class RabbitConfiguration {
    	//队列名
        public static final String TRADE_QUEUE = "funds";
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        @Bean
        public Queue queue() {
            return new Queue(TRADE_QUEUE);
        }
    }
    

    1.生产者生产消息

    在 SpringBoot 中,我们使用 AmqpTemplate 去发送消息(调用 send 方法即发送):

    @Component
    public class HelloProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
        public void send() {
            amqpTemplate.convertAndSend(RabbitConfiguration.TRADE_QUEUE, "Hello, Rabbit!");
        }
    }
    

    2.消费者消费消息

    配置监听器监听指定的 Queue,当消息队列有消息的时候予以接收:

    @Component
    public class HelloConsumer {
        @RabbitListener(queues = {RabbitConfiguration.TRADE_QUEUE})
        public void processBootTask(String content) {
            System.out.println(content);
        }
    }
    

    2.fanout交换机

    要使用 fanout 交换机的话,那么生产者与消费者的配置就不一样了。

    1.生产者生产消息

    在 JavaConfig 中注册 bean:

    @Configuration
    public class RabbitConfiguration {
        public static final String DEFAULT_FANOUT_EXCHANGE = "admin.fanout";
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(DEFAULT_FANOUT_EXCHANGE);
        }
    }
    

    在 SpringBoot 中,我们使用 AmqpTemplate 去发送消息:

    @Component
    public class HelloProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
        public void send() {
            //参数一:交换机名称,参数二:发送的key,参数三:内容
            amqpTemplate.convertAndSend(RabbitConfiguration.DEFAULT_FANOUT_EXCHANGE, "", "Hello, Rabbit!");
        }
    }
    

    2.消费者消费消息

    在 JavaConfig 中注册 bean:

    @Configuration
    public class RabbitConfiguration {
        public static final String DEFAULT_FANOUT_EXCHANGE = "admin.fanout";
        public static final String FANOUT_QUEUE = "admin-" + UUID.randomUUID();
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        //配置广播路由器
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(DEFAULT_FANOUT_EXCHANGE);
        }
        //配置临时队列
        @Bean
        public Queue randomQueue() {
            return new Queue(FANOUT_QUEUE);
        }
        @Bean
        public Binding bindingExchange() {
            return BindingBuilder.bind(randomQueue()).to(fanoutExchange());
        }
    }
    

    配置监听器,监听 Queue,当消息队列有消息时,监听器就会接收到消息(也可配置多个队列接收):

    @Component
    public class HelloConsumer {
        @RabbitListener(queues = "#{rabbitConfiguration.FANOUT_QUEUE}")
        public void processBootTask(String content) {
            System.out.println(content);
        }
    }
    

    3.topic交换机

    使用 topic 交换机,生产者与消费者的配置也不一样。

    1.生产者生产消息

    在 JavaConfig 中注册 bean:

    @Configuration
    public class RabbitConfiguration {
        public static final String DEFAULT_TOPIC_EXCHANGE = "admin.topic";
        public static final String TOPIC_ROUTE_KEY = "A.B.C";
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(DEFAULT_TOPIC_EXCHANGE);
        }
    }
    

    在 SpringBoot 中,我们使用 AmqpTemplate 去发送消息:

    @Component
    public class HelloProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
        public void send() {
    	    //参数一:交换机名称,参数二:发送的key,参数三:内容
            amqpTemplate.convertAndSend(RabbitConfiguration.DEFAULT_TOPIC_EXCHANGE, RabbitConfiguration.TOPIC_ROUTE_KEY, "Hello, Rabbit!");
        }
    }
    

    RabbitMQ 将会根据参数二去寻找有没有匹配此规则的队列,如果有则把消息给它,如果有且不止一个,则把消息分发给匹配的队列 (每个队列都有消息)。

    2.消费者消费消息

    在 JavaConfig 中注册 bean:

    @Configuration
    public class RabbitConfiguration {
        public static final String DEFAULT_TOPIC_EXCHANGE = "admin.topic";
        public static final String TOPIC_QUEUE = "admin-" + UUID.randomUUID();
        //*表示一个词,#表示零个或多个词
        public static final String TOPIC_ROUTE_KEY = "#.#";
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        //配置主题路由器
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(DEFAULT_TOPIC_EXCHANGE);
        }
        //配置临时队列
        @Bean
        public Queue randomQueue() {
            return new Queue(TOPIC_QUEUE);
        }
        @Bean
        public Binding bindingExchange() {
            return BindingBuilder.bind(randomQueue()).to(topicExchange()).with(TOPIC_ROUTE_KEY);
        }
    }
    

    配置监听器,监听 Queue,当消息队列匹配此规则时,监听器就会接收到消息:

    @Component
    public class HelloConsumer {
        @RabbitListener(queues = "#{rabbitConfiguration.TOPIC_QUEUE}")
        public void processBootTask(String content) {
            System.out.println(content);
        }
    }
    

    生产端指定 Route Key 为 A.B.C, 下面是消费端绑定 Route Key 的不同情况:

    消费端绑定的Route Key 是否匹配
    A.B.C Yes
    # Yes
    A.# Yes
    . No
    A.* No
    A.B.* Yes
    A.*.C Yes
    展开全文
  • 消息中间件RabbitMQ入门消息中间件RabbitMQ松耦合架构的优势与传统RPC的区别如何选择消息中间件RabbitMQ属性简介 消息中间件RabbitMQ 消息中间件有时候又被人们成为消息队列,目前基本没有针对于消息中间件标准的...

    消息中间件RabbitMQ

    消息中间件有时候又被人们成为消息队列,目前基本没有针对于消息中间件标准的定义。一般将消息中间件认为是松耦合的分布式系统中的一个子系统,关注于数据的发送与接收,利用高效可靠的异步消息传递机制对分布式各个子系统进行解耦和集成。

    松耦合架构的优势

    1.解耦
    2.异步处理能力
    3.缓冲能力 这一点在秒杀时特别常用,削峰填谷
    4.伸缩性 加MQ机器与减MQ的机器来适应实际的并发量
    5.扩展性 增加功能时已开发完毕的功能不需要进行改动

    与传统RPC的区别

    RPC,远程过程调用。dubbo(TCP)与SpringCloud(HTTP) 是较为常用的RPC框架,相比起中间件,RPC与中间件最大的区别就是RPC是同步的,对被调用方有很强的依赖性,而对于消息中间件来说,不需要知道消息的接受方是谁,只需要往队列里面发送消息就可以了,自然有消费者来对消息进行消费。

    如何选择消息中间件

    目前常用的消息中间件大概有,ActiveMQ,RabbitMQ,RocketMQ和kafka,下表对其有个简单的比较

    ActiveMQ RabbitMQ RocketMQ kafka
    性能(单机) 6000+ 12000+ 十万级 百万级
    消息持久化 支持 支持 支持 支持
    多语言支持 支持 支持 支持

    根据自己项目的需要,个人认为,
    1.ActiveMQ,RabbitMQ:在并发量较小时,可以选用ActiveMQ,RabbitMQ,对于两者来说ActiveMQ是基于java的,而RabbitMQ是基于Erlang的,如果想要做一些插件开发又没有Erlang开发的能力,可以选择ActiveMQ。
    2.RocketMQ:是阿里社区开源的一款消息中间件,较其他几种中间件来说的话,诞生的稍微晚一点,但正因为晚也弥补的其他消息中间件的一些缺陷,在使用上会很方便。
    3.Kafka: 性能很强大,使用非常简单,在并发量很高。在需要大数据,流计算等需求时可以考虑,当然有的公司目前在记录日志时也直接使用了Kafaka,不过Kafka维护起来较为麻烦一些,偶尔会出现数据混乱的情况,对ZooKeeper有较强的依赖性。

    RabbitMQ属性简介

    RabbitMQ 使用了AMQP协议(Advanced Message Queuing Protocol),是应用层的标准高级消息队列协议,RabbitMQ的架构图如下图所示:
    在这里插入图片描述
    在RabbitMQ中有如下几种要素:
    1.生产者。可以将其理解为传统开发模式上的服务的调用方。
    2.消费者。可以将其理解为传统开发模式上的服务的被调用方,从队列中获取消息进行消费。
    3.消息:在RabbitMQ中,消息就是有效载荷,说的通俗一点就是我们需要传递的数据,比如说订单系统下单后库存系统需要扣减商品数量,就需要将订单编号还有商品编号商品数量等一些数据传递给库存系统,这些数据就是所谓的消息,当然消息里面中还有一些描述这些数据的标签。
    4.路由键:可以将其理解为一个访问地址,通过这一地址找到相应的队列
    5.交换器:可以近似理解为一个路由器,之所以能够通过路由键找到队列,就是因为队列与路由键做了绑定,并在交换区中进行了注册。
    6:队列:用与存放生产者生产的消息,等待消费者消费
    7:虚拟主机:虚拟主机(vhost)在RabbitMQ中间是一个很重要的概念,每一个虚拟主机都是互相独立的,类似于tomcat(RabbitMQ)下面有很多个war(vhost)包,每个war包下的java类都是独立的

    RabbitMQ有direct、fanout、topic、header四种交换器。
    direct交换器:需要路由键完全匹配,可以用sql查询时的“=”来理解,如果队列的路由键绑定为abc,那生产者与消费者在对消息进行操作时,必须告诉交换器我操作的是abc这一个队列,传入ab或abcd等其他不能匹配的都不能找到正确的队列
    fanout交换器:属于一种广播模式的交换器,意味着只要生产者生产了消息,在fanout交换器绑定的队列中都将进行消息的入队操作.
    topic交换器: 主题交换器,较前两种交换器来说要复杂很多,有通配符*和#两种,如 a.*和a.#*.a和#.a,具体的匹配规则这里暂时陈述了,网上有大量的文章来解释。
    header交换器:与direct交换器基本相同,使用的极少。

    消息中间件和RabbitMQ的简单介绍暂时就到这里,如果需要安装RabbitMQ的朋友可以看看我这一篇文章
    有很详细的安装教程https://editor.csdn.net/md/?articleId=103858910

    有需要学习rabbitmq原生编程的朋友可以看看我github上的示例代码https://github.com/denghang96/RabbitMQ

    展开全文
  • 简单说说消息中间件RabbitMQ(上)

    千次阅读 热门讨论 2018-10-05 21:30:20
    准备工作
    MQ和RabbitMQ

    MQ,消息队列,消息以管道的方式进行传递。其就像个“快递员”,将消息从生产者送到消费者处。RabbitMQ是消息队列中的一种。
    消息队列适用于异步返回执行时间长,且消息发送者不多关注执行结果的情形。

    中间件—消息中间件—RabbitMQ

    在这里插入图片描述
    中间件是将具体业务和底层逻辑解耦的软件,分为:MOM(消息中间件)、RPC(远程过程调用中间件)、UDA(数据访问中间件)、TPM(交易中间件)等。
    消息中间件,又称为消息队列、消息队列中间件,分为RabbitMQ、ActiveMQ、Kafka等。

    原理解析

    在这里插入图片描述
    各部分的名字和作用?
    1.Brocker:消息队列服务器实体,Rabbitmq可以作为一个选择。
    2.Exchange:消息交换机,用于接收、分配消息。指定消息按什么规则,路由到哪个队列。
    3.Queue:消息队列,用于存储生产者的消息。每个消息都会被投入到一个或者多个队列里。
    4.Binding Key:绑定关键字,用于把交换器的消息绑定到队列中,它的作用是把exchange和queue按照路由规则binding起来。
    5.Routing Key:路由关键字,用于把生产者的数据分配到交换器上。exchange根据这个关键字进行消息投递。
    6.Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不用用户的权限分离。
    7.Producer:消息生产者,就是投递消息的程序。
    8.Consumer:消息消费者,就是接受消息的程序。
    9.Channel:信道,消息推送使用的通道。可建立多个channel,每个channel代表一个会话任务。

    使用流程?
    1.消息接收客户端连接到消息队列服务器,打开一个channel。
    2.客户端声明一个exchange,并设置相关属性。
    3.客户端声明一个queue,并设置相关属性。
    4.客户端使用routing key,在exchange和queue之间建立好绑定关系。
    5.消息发布客户端投递消息到exchange。
    6.exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
    总结起来就是:生产者发送一条消息给交换机——交换机根据关键字匹配到对应的队列——将消息存入队列——消费者从队列中取出消息使用。

    why?

    我们的项目为什么要选择RabbitMQ,而不选择别的消息中间件?
    1.除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
    2.可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
    3.高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
    4.集群部署简单,正是因为Erlang使得RabbitMQ集群部署变的超级简单;
    5.社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

    5种队列模型

    在官网中有明确的说明:http://www.rabbitmq.com/getstarted.html
    1.简单队列模式:一个生产者对应一个消费者。
    在这里插入图片描述

    2.工作队列模式:一个生产者产生的消息可以供多个消费者消费,但是一个消息只能被其中一个消费者消费。
    在这里插入图片描述

    3.发布/订阅模式:多了一个交换机,生产者将消息发送到交换机上,交换机发送消息给各个队列,此时,一个消息可以被多个消费者获取。
    值得一提的是,这又叫广播模式,是最常用的模式了,在ITOO中使用的就是这种模式。
    在这里插入图片描述

    4.路由模式:又多了一个routing key,生产者发送消息带有routingkey,消费者选择自己需要的消息进行消费,也配置一个routing key。
    在这里插入图片描述

    5.主题模式:又多了一个通配符,这样消费端如果需要好几种消息的时候,不用一个一个的设置,直接用通配符可以接收自己想要的各种消息。
    在这里插入图片描述

    6.RPC:c对s说“我这有个任务需要你的帮助”,s处理完后,将结果返回给c。
    在这里插入图片描述

    展开全文
  • 消息中间件介绍 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,...
  • 利用springAMQP实现异步消息队列的日志管理 摘要: 经过前段时间的学习和铺垫,已经对spring amqp有了大概的了解。俗话说学以致用,今天就利用springAMQP来完成一个日志管理模块。大概的需求是这样的:系统中有很多...
  • 消息中间件 rabbitmq 级联-Federation

    千次阅读 2017-05-31 23:53:22
    Federation Plugin Introduction The high-level goal of the federation plugin is to transmit messages between brokers without requiring clustering. This is useful for various reasons: Loose cou
  • 消息中间件RabbitMQ-实战一

    千次阅读 2017-06-02 13:08:17
    1.安装erlang环境 ...apt-get install erlang yum install erlang 2.安装rabbitmq apt-get install rabbitmq-server http://www.rabbitmq.com/download.html 3.创建用户 (自己编译rabbitmq-s
  • RabbitMQ是一套开源的消息队列服务软件 是由LShift提供的一个AMQP(Advanced Message Queuing Protocol高级消息队列协议)的开源实现 由以高性能 健壮以及可伸缩性出名的Erlang写成 具有很高的稳定性和可靠性 1、核心...
  • 使用Erlang语言编写的一种消息中间件。 什么是消息中间件? 一种数据传送的消息传递机制,换句话说,是一种软件应用之间的通讯方式。 举个栗子: 消息中间件的作用之一是应用解耦。 拿取快递为例,前几年的...
  • 消息中间件Rabbitmq(二)-使用详解

    千次阅读 2018-02-27 15:05:38
    Rabbitmq 是基于amqp(高级消息队列协议)实现的队列技术,在他之上可以完成多种类型的消息转发模型。 下面列举一些常用的消息转发场景,在rabbitmq中是怎样实现的。 1.原理 先来看一下rabbitmq消息转发的原理,...
  • 一、RabbitMQ简介   1、什么是RabbitMQ   ...它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。  ...
  • 消息中间件RabbitMQ-主题模式(Topic)

    千次阅读 2019-06-03 22:04:12
    消息中间件RabbitMQ-主题模式(Topic)主题模式(Topic)什么是主题模式创建队列与绑定1)新建一个交换器 ,类型选择topic2)点击新建的交换器topictest3)添加匹配规则,添加后列表如下:代码实现1)goods.#2)#....

空空如也

1 2 3 4 5 ... 20
收藏数 93,352
精华内容 37,340
关键字:

消息中间件