rabbitmq 订阅
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 展开全文
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
信息
开发公司
Rabbit
简    称
MQ
构    成
以高性能、健壮以及可伸缩性出名的 Erlang 写成
释    义
一种程序对程序的通信方法
中文名
消息队列
外文名
Message Queue
rabbitmq简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
收起全文
精华内容
下载资源
问答
  • RabbitMQ教程_3 配置

    万次阅读 2021-01-08 14:53:56
    https://gitee.com/fakerlove/rabbitmq 文章目录3. 配置3.1 rabbitmq 所有命令3.2 命令介绍启动查看状态关闭所有ctl 指令所有插件3.3 Web 页面3.3.1...rabbitmq-defaults rabbitmq-env rabbitmq-queues rabbitmq-upgrad

    https://gitee.com/fakerlove/rabbitmq

    3. 配置

    3.1 rabbitmq 所有命令

    rabbitmq-defaults     rabbitmq-env          rabbitmq-queues       rabbitmq-upgrade      rabbitmqctl           
    rabbitmq-diagnostics  rabbitmq-plugins      rabbitmq-server       rabbitmqadmin 
    

    有这么多命令

    3.2 命令介绍

    启动

    rabbitmq-server
    

    后台启动

    rabbitmq-server  -detached
    

    查看状态

    rabbitmqctl status
    

    关闭

    rabbitmqctl stop
    

    所有ctl 指令

    Help:
    
       autocomplete                  Provides command name autocomplete variants
       help                          Displays usage information for a command
       version                       Displays CLI tools version
    
    Nodes:
    
       await_startup                 Waits for the RabbitMQ application to start on the target node
       reset                         Instructs a RabbitMQ node to leave the cluster and return to its virgin state
       rotate_logs                   Instructs the RabbitMQ node to perform internal log rotation
       shutdown                      Stops RabbitMQ and its runtime (Erlang VM). Monitors progress for local nodes. Does not require a PID file path.
       start_app                     Starts the RabbitMQ application but leaves the runtime (Erlang VM) running
       stop                          Stops RabbitMQ and its runtime (Erlang VM). Requires a local node pid file path to monitor progress.
       stop_app                      Stops the RabbitMQ application, leaving the runtime (Erlang VM) running
       wait                          Waits for RabbitMQ node startup by monitoring a local PID file. See also 'rabbitmqctl await_online_nodes'
    
    Cluster:
    
       await_online_nodes            Waits for <count> nodes to join the cluster
       change_cluster_node_type      Changes the type of the cluster node
       cluster_status                Displays all the nodes in the cluster grouped by node type, together with the currently running nodes
       force_boot                    Forces node to start even if it cannot contact or rejoin any of its previously known peers
       force_reset                   Forcefully returns a RabbitMQ node to its virgin state
       forget_cluster_node           Removes a node from the cluster
       join_cluster                  Instructs the node to become a member of the cluster that the specified node is in
       rename_cluster_node           Renames cluster nodes in the local database
       update_cluster_nodes          Instructs a cluster member node to sync the list of known cluster members from <seed_node>
    
    Replication:
    
       cancel_sync_queue             Instructs a synchronising mirrored queue to stop synchronising itself
       sync_queue                    Instructs a mirrored queue with unsynchronised mirrors (follower replicas) to synchronise them
    
    Users:
    
       add_user                      Creates a new user in the internal database
       authenticate_user             Attempts to authenticate a user. Exits with a non-zero code if authentication fails.
       change_password               Changes the user password
       clear_password                Clears (resets) password and disables password login for a user
       delete_user                   Removes a user from the internal database. Has no effect on users provided by external backends such as LDAP
       list_users                    List user names and tags
       set_user_tags                 Sets user tags
    
    Access Control:
    
       clear_permissions             Revokes user permissions for a vhost
       clear_topic_permissions       Clears user topic permissions for a vhost or exchange
       list_permissions              Lists user permissions in a virtual host
       list_topic_permissions        Lists topic permissions in a virtual host
       list_user_permissions         Lists permissions of a user across all virtual hosts
       list_user_topic_permissions   Lists user topic permissions
       list_vhosts                   Lists virtual hosts
       set_permissions               Sets user permissions for a vhost
       set_topic_permissions         Sets user topic permissions for an exchange
    
    Monitoring, observability and health checks:
    
       list_bindings                 Lists all bindings on a vhost
       list_channels                 Lists all channels in the node
       list_ciphers                  Lists cipher suites supported by encoding commands
       list_connections              Lists AMQP 0.9.1 connections for the node
       list_consumers                Lists all consumers for a vhost
       list_exchanges                Lists exchanges
       list_hashes                   Lists hash functions supported by encoding commands
       list_queues                   Lists queues and their properties
       list_unresponsive_queues      Tests queues to respond within timeout. Lists those which did not respond
       ping                          Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it
       report                        Generate a server status report containing a concatenation of all server status information for support purposes
       schema_info                   Lists schema database tables and their properties
       status                        Displays status of a node
    
    Parameters:
    
       clear_global_parameter        Clears a global runtime parameter
       clear_parameter               Clears a runtime parameter.
       list_global_parameters        Lists global runtime parameters
       list_parameters               Lists runtime parameters for a virtual host
       set_global_parameter          Sets a runtime parameter.
       set_parameter                 Sets a runtime parameter.
    
    Policies:
    
       clear_operator_policy         Clears an operator policy
       clear_policy                  Clears (removes) a policy
       list_operator_policies        Lists operator policy overrides for a virtual host
       list_policies                 Lists all policies in a virtual host
       set_operator_policy           Sets an operator policy that overrides a subset of arguments in user policies
       set_policy                    Sets or updates a policy
    
    Virtual hosts:
    
       add_vhost                     Creates a virtual host
       clear_vhost_limits            Clears virtual host limits
       delete_vhost                  Deletes a virtual host
       list_vhost_limits             Displays configured virtual host limits
       restart_vhost                 Restarts a failed vhost data stores and queues
       set_vhost_limits              Sets virtual host limits
       trace_off                     
       trace_on                      
    
    Configuration and Environment:
    
       decode                        Decrypts an encrypted configuration value
       encode                        Encrypts a sensitive configuration value
       environment                   Displays the name and value of each variable in the application environment for each running application
       set_cluster_name              Sets the cluster name
       set_disk_free_limit           Sets the disk_free_limit setting
       set_log_level                 Sets log level in the running node
       set_vm_memory_high_watermark  Sets the vm_memory_high_watermark setting
    
    Definitions:
    
       export_definitions            Exports definitions in JSON or compressed Erlang Term Format.
       import_definitions            Imports definitions in JSON or compressed Erlang Term Format.
    
    Feature flags:
    
       enable_feature_flag           Enables a feature flag on target node
       list_feature_flags            Lists feature flags
    
    Operations:
    
       close_all_connections         Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node
       close_connection              Instructs the broker to close the connection associated with the Erlang process id
       eval                          Evaluates a snippet of Erlang code on the target node
       eval_file                     Evaluates a file that contains a snippet of Erlang code on the target node
       exec                          Evaluates a snippet of Elixir code on the CLI node
       force_gc                      Makes all Erlang processes on the target node perform/schedule a full sweep garbage collection
       resume_listeners              Resumes client connection listeners making them accept client connections again
       suspend_listeners             Suspends client connection listeners so that no new client connections are accepted
    
    Queues:
    
       delete_queue                  Deletes a queue
       purge_queue                   Purges a queue (removes all messages in it)
    
    Deprecated:
    
       hipe_compile                  DEPRECATED. This command is a no-op. HiPE is no longer supported by modern Erlang versions
       node_health_check             DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node. See https://www.rabbitmq.com/monitoring.html#health-checks instead
    
    Use 'rabbitmqctl help <command>' to learn more about a specific command
    

    所有插件

    rabbitmq-plugins list
    

    3.3 Web 页面

    3.3.1 页面介绍

    image-20191126162026720

    • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
    • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
    • Exchanges:交换机,用来实现消息的路由
    • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

    3.3.2 Admin用户和虚拟主机管理

    添加用户

    • 超级管理员(administrator)

      可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    • 监控者(monitoring)

      可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

    • 策略制定者(policymaker)

      可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

    • 普通管理者(management)

      仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

    • 其他

      无法登陆管理控制台,通常就是普通的生产者和消费者。

    创建虚拟主机

    为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

    绑定虚拟主机和用户

    展开全文
  • 必知必会 RabbitMQ面试题 33道(附答案)

    万次阅读 多人点赞 2021-04-19 00:21:36
    点击关注公众号,回复000获取优质资料前言 大家好,我是老田。今天我们来分享RabbitMQ消息队列。其中,MQ(Message Queue)翻译过来就是消息队列的意思。RabbitMQ...

    点击关注公众号,回复000获取优质资料

    前言

    大家好,我是老田。

    今天我们来分享RabbitMQ消息队列。 其中,MQ(Message Queue)翻译过来就是消息队列的意思。RabbitMQ作为消息队列中的优秀平台且开源,被很多公司使用。RabbitMQ服务器是用Erlang语言编写的,基于AMQP,本篇给大家总结了29道RabbitMQ知识点或者说面试题,可以收藏一波了,持续更新中...

    Rabbit核心知识总结

    下面使用一张思维导图来总结Rabbit消息队列:

    需要思维导图的,扫描加我微信,免费赠送

    RabbitMQ的30道题目如下

    1.RabbitMQ是什么?

    2.RabbitMQ特点?

    3.AMQP是什么?

    4.AMQP协议3层?

    5.AMQP模型的几大组件?

    6.怎么理解生产者Producer、消费者Consumer?

    7.为什么需要消息队列?

    8.Broker服务节点?

    9.消息队列有什么优缺点

    10.如何保证消息的可靠性?

    11.RoutingKey路由键?

    12.Binding绑定?

    13.交换器4种类型?

    14.生产者消息运转?

    15.消费者接收消息过程?

    16.交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理?

    17.什么是死信队列?

    18.导致的死信的有哪些原因?

    19.何为延迟队列?

    20.什么是优先级队列?

    21.熟悉RabbitMQ的事务机制吗?

    22.熟悉发送确认机制吗?

    23.消费者获取消息的方式?

    24.消费者某些原因无法处理当前接受的消息如何来拒绝?

    25.消息传输保证层级?

    26.vhost是什么?

    27.说说集群中的节点类型?

    28.熟悉队列结构吗?

    29.RabbitMQ中消息可能有的几种状态?

    30.在何种场景下使用了消息中间件?

    31.生产者如何将消息可靠投递到MQ?

    32.如何保证RabbitMQ消息队列的高可用?

    1. MQ如何将消息可靠投递到消费者?

    1.RabbitMQ是什么?

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    PS:也可能直接问什么是消息队列?消息队列就是一个使用队列来通信的组件

    2.RabbitMQ特点?

    可靠性: RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。

    灵活的路由 : 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个 交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。

    扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。

    高可用性 : 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。

    多种协议: RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。

    多语言客户端 :RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。

    管理界面 : RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等。

    令插件机制 : RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自 己的插件。

    3.AMQP是什么?

    RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

    RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

    4.AMQP协议3层?

    Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。

    Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。

    TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

    5.AMQP模型的几大组件?

    • 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。

    • 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。

    • 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

    6.说说生产者Producer和消费者Consumer?

    生产者

    • 消息生产者,就是投递消息的一方。

    • 消息一般包含两个部分:消息体(payload)和标签(Label)。

    消费者

    • 消费消息,也就是接收消息的一方。

    • 消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。

    7.为什么需要消息队列?

    从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。

    从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥10亿日活的微信。此时,我们需要有一个「工具」来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。因此,消息队列就应运而生了。

    它常用来实现:异步处理服务解耦流量控制(削峰)

    8.说说Broker服务节点、Queue队列、Exchange交换器?

    • Broker可以看做RabbitMQ的服务节点。一般请下一个Broker可以看做一个RabbitMQ服务器。

    • Queue:RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。

    • Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。

    9.消息队列有什么优缺点

    优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。缺点有以下几个:

    • 系统可用性降低 系统引入的外部依赖越多,越容易挂掉。万一 MQ 挂了,MQ 一挂,整套系统崩 溃,你不就完了?

    • 系统复杂度提高 硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?

    • 怎么保证消息传递的顺序性?问题一大堆。

    • 一致性问题 A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致 了。

    10.如何保证消息的可靠性?

    消息到MQ的过程中搞丢,MQ自己搞丢,MQ到消费过程中搞丢。

    生产者到RabbitMQ:事务机制和Confirm机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。

    RabbitMQ自身:持久化、集群、普通模式、镜像模式。

    RabbitMQ到消费者:basicAck机制、死信队列、消息补偿机制。

    11.什么是RoutingKey路由键?

    生产者将消息发送给交换器的时候,会指定一个RoutingKey,用来指定这个消息的路由规则,这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

    12.Binding绑定?

    通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,这样RabbitMq就知道如何正确路由消息到队列了。

    13.交换器4种类型?

    主要有以下4种。

    • fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

    • direct:把消息路由到BindingKey和RoutingKey完全匹配的队列中。

    • topic:

    • 匹配规则:

    RoutingKey 为一个 点号'.': 分隔的字符串。比如: java.xiaoka.show

    BindingKeyRoutingKey一样也是点号“.“分隔的字符串。

    BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个

    headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到。

    14.生产者消息运转?

    1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。

    2.Producer声明一个交换器并设置好相关属性。

    3.Producer声明一个队列并设置好相关属性。

    4.Producer通过路由键将交换器和队列绑定起来。

    5.Producer发送消息到Broker,其中包含路由键、交换器等信息。

    6.相应的交换器根据接收到的路由键查找匹配的队列。

    7.如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。

    8.关闭信道。

    9.管理连接。

    15.消费者接收消息过程?

    1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel)。

    2.向Broker请求消费响应的队列中消息,可能会设置响应的回调函数。

    3.等待Broker回应并投递相应队列中的消息,接收消息。

    4.消费者确认收到的消息,ack

    5.RabbitMq从队列中删除已经确定的消息。

    6.关闭信道。

    7.关闭连接。

    16.交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理?

    • mandatory :true 返回消息给生产者。

    • mandatory: false 直接丢弃。

    17.死信队列?

    DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

    18.导致的死信的几种原因?

    • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false

    • 消息TTL过期。

    • 队列满了,无法再添加。

    19.延迟队列?

    存储对应的延迟消息,指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

    20.优先级队列?

    • 优先级高的队列会先被消费。

    • 可以通过x-max-priority参数来实现。

    • 当消费速度大于生产速度且Broker没有堆积的情况下,优先级显得没有意义。

    21.事务机制?

    RabbitMQ 客户端中与事务机制相关的方法有三个:

    channel.txSelect  用于将当前的信道设置成事务模式。

    channel . txCommit 用于提交事务 。

    channel . txRollback 用于事务回滚,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,通过txRollback来回滚。

    22.发送确认机制?

    生产者把信道设置为confirm确认模式,设置后,所有再改信道发布的消息都会被指定一个唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息到达对应的目的地了。

    23.消费者获取消息的方式?

    24.消费者某些原因无法处理当前接受的消息如何来拒绝?

    channel .basicNack channel .basicReject

    25.消息传输保证层级?

    At most once:最多一次。消息可能会丢失,但不会重复传输。

    At least once:最少一次。消息绝不会丢失,但可能会重复传输。

    Exactly once:  恰好一次,每条消息肯定仅传输一次。

    26.了解Virtual Host吗?

    每一个RabbitMQ服务器都能创建虚拟的消息服务器,也叫虚拟主机(virtual host),简称vhost。

    默认为“/”。

    27.集群中的节点类型?

    内存节点:ram,将变更写入内存。

    磁盘节点:disc,磁盘写入操作。

    RabbitMQ要求最少有一个磁盘节点。

    28.队列结构?

    通常由以下两部分组成?

    rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack) 等。

    backing_queue:是消息存储的具体形式和引擎,并向 rabbit amqqueue process提供相关的接口以供调用。

    29.RabbitMQ中消息可能有的几种状态?

    alpha: 消息内容(包括消息体、属性和 headers) 和消息索引都存储在内存中 。

    beta: 消息内容保存在磁盘中,消息索引保存在内存中。

    gamma: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有 。

    delta: 消息内容和索引都在磁盘中 。

    30.在何种场景下使用了消息中间件?

    • 接口之间耦合比较严重

    • 面对大流量并发时,容易被冲垮

    • 存在性能问题

    31.生产者如何将消息可靠投递到MQ?

    1.Client发送消息给MQ

    2.MQ将消息持久化后,发送Ack消息给Client,此处有可能因为网络问题导致Ack消息无法发送到Client,那么Client在等待超时后,会重传消息;

    3.Client收到Ack消息后,认为消息已经投递成功。

    32 . MQ如何将消息可靠投递到消费者?

    1.MQ将消息push给Client(或Client来pull消息)

    2.Client得到消息并做完业务逻辑

    3.Client发送Ack消息给MQ,通知MQ删除该消息,此处有可能因为网络问题导致Ack失败,那么Client会重复消息,这里就引出消费幂等的问题;

    4.MQ将已消费的消息删除

    33.如何保证RabbitMQ消息队列的高可用?

    RabbitMQ 有三种模式:单机模式普通集群模式镜像集群模式

    单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式

    普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。

    镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据(元数据指RabbitMQ的配置数据)还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。

    参考:http://nl03r.cn/BQwhi

    总结

    由于是专门应对面试的,肯定不会对每个知识点都细说,我们只要找到被问的概率相关对高的知识点和题目。

    推荐阅读

    MySQL的25个连环炮

    线程池的12个连环炮

    并发编程基础的12个连环炮

    JVM的18个连环炮

    HashMap的31连环炮,我倒在第5个上

    Redis的43连环炮,试试你能扛住几个

    面试官:线上系统CPU飙高怎么办?

    别不信,98%的程序员都是这样的

    期待你的点赞、在看、转发,先谢谢啦!

    展开全文
  • RabbitMQ

    万次阅读 多人点赞 2019-10-29 23:14:20
    文章目录RabbitMQ 使用场景服务解耦流量削峰异步调用rabbitmq 基本概念ExchangeMessage QueueBinding KeyRouting Keyrabbitmq安装安装erlang语言库rabbitmq官方精简的Erlang语言包下载和安装安装socat依赖socat依赖...

    Rabbitmq

    RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密

    耦合

    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

    解耦

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对

    低流量

    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了

    流量峰值

    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景

    流量销峰

    异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢

    阻塞

    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作

    异步调用

    rabbitmq 基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    rabbitmq

    Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    exchange

    Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    rabbitmq安装

    Docker 启动Rabbitmq

    下载镜像,rabbitmq:management 镜像中已经安装了管理界面

    docker pull rabbitmq:management
    

    关闭防火墙

    systemctl stop firewalld
    systemctl disable firewalld
     
    # 重启 docker 系统服务
    systemctl restart docker
    

    配置管理员用户名和密码

    mkdir /etc/rabbitmq
    vim /etc/rabbitmq/rabbitmq.conf
    
    # 添加两行配置:
    default_user = admin
    default_pass = admin
    

    启动Rabbitmq

    docker run -d --name rabbit \
    -p 5672:5672 \
    -p 15672:15672 \
    -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
    -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
    rabbitmq:management
    
    

    访问管理控制台 http://192.168.64.140:15672
    用户名密码是 admin

    离线安装

    下载离线安装包文件

    上传离线安装包

    • rabbitmq-install 目录上传到 /root

    切换到rabbitmq-install目录

    cd rabbitmq-install
    

    安装

    rpm -ivh *.rpm
    

    Yum在线安装

    以下内容来自 RabbitMQ 官方手册

    rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
    
    
    # centos7 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    # centos6 用这个
    cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
    [bintray-rabbitmq-server]
    name=bintray-rabbitmq-rpm
    baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
    gpgcheck=0
    repo_gpgcheck=0
    enabled=1
    EOF
    
    
    yum makecache
    
    yum install socat
    
    wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
    rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
    
    yum install rabbitmq-server
    

    启动rabbitmq服务器

    # 设置服务,开机自动启动
    systemctl enable rabbitmq-server
    
    # 启动服务
    systemctl start rabbitmq-server
    

    rabbitmq管理界面

    启用管理界面

    # 开启管理界面插件
    rabbitmq-plugins enable rabbitmq_management
    
    # 防火墙打开 15672 管理端口
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --reload
    

    重启RabbitMQ服务

    systemctl restart rabbitmq-server
    

    访问

    访问服务器的15672端口,例如:

    http://192.168.64.140:15672

    添加用户

    添加用户

    # 添加用户
    rabbitmqctl add_user admin admin
    
    # 新用户设置用户为超级管理员
    rabbitmqctl set_user_tags admin administrator
    

    设置访问权限

    访问权限
    访问权限

    开放客户端连接端口

    # 打开客户端连接端口
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --reload
    
    • 主要端口介绍
      • 4369 – erlang发现口
      • 5672 – client端通信口
      • 15672 – 管理界面ui端口
      • 25672 – server间内部通信口

    rabbitmq六种工作模式

    简单模式

    简单

    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    • 发送消息的程序是生产者
    • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    • 消费者等待从队列接收消息

    简单模式

    pom.xml

    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.tedu</groupId>
    	<artifactId>rabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<dependencies>
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>5.4.3</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>1.8.0-alpha2</version>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.8.0</version>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    

    生产者发送消息

    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    

    消费者接收消息

    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    工作模式

    工作

    工作模式

    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    生产者发送消息

    这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		//参数:queue,durable,exclusive,autoDelete,arguments
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		while (true) {
    		    //控制台输入的消息发送到rabbitmq
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			//如果输入的是"exit"则结束生产者进程
    			if ("exit".equals(msg)) {
    				break;
    			}
    			//参数:exchage,routingKey,props,body
    			ch.basicPublish("", "helloworld", null, msg.getBytes());
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者接收消息

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    
    				//遍历字符串中的字符,每个点使进程暂停一秒
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    运行测试

    运行:

    • 一个生产者
    • 两个消费者

    生产者发送多条消息,
    如: 1,2,3,4,5. 两个消费者分别收到:

    • 消费者一: 1,3,5
    • 消费者二: 2,4

    rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

    消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

    如果生产者发送以下消息:

    1…

    2

    3

    4

    5

    两个消费者分别收到:

    • 消费者一: 1…, 3, 5
    • 消费者二: 2, 4

    当消费者一收到所有消息后,要话费7秒时间来处理第一条消息,这期间如果关闭该消费者,那么1未处理完成,3,5则没有被处理

    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("helloworld", false, callback, cancel);
    	}
    }
    
    

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。

    忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存

    可以使用下面命令打印工作队列中未确认消息的数量

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    当处理消息时异常中断, 可以选择让消息重回队列重新发送.
    nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)
    

    合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

    合理分发

    消息持久化

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    //第二个参数是持久化参数durable
    ch.queueDeclare("helloworld", true, false, false, null);
    

    由于之前我们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错, 所以这里我们定义一个不同名字的队列"task_queue"

    //定义一个新的队列,名为 task_queue
    //第二个参数是持久化参数 durable
    ch.queueDeclare("task_queue", true, false, false, null);
    

    生产者和消费者代码都要修改

    这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

    //第三个参数设置消息持久化
    ch.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                msg.getBytes());
    

    下面是"工作模式"最终完成的生产者和消费者代码

    生产者代码

    package rabbitmq.workqueue;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class Test3 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue", true,false,false,null);
    
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第三个参数设置消息持久化
    			ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者代码

    package rabbitmq.workqueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test4 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//第二个参数设置队列持久化
    		ch.queueDeclare("task_queue",true,false,false,null);
    		
    		System.out.println("等待接收数据");
    		
    		ch.basicQos(1); //一次只接收一条消息
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    				for (int i = 0; i < msg.length(); i++) {
    					if (msg.charAt(i)=='.') {
    						try {
    							Thread.sleep(1000);
    						} catch (InterruptedException e) {
    						}
    					}
    				}
    				System.out.println("处理结束");
    				//发送回执
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//autoAck设置为false,则需要手动确认发送回执
    		ch.basicConsume("task_queue", false, callback, cancel);
    	}
    }
    
    

    发布订阅模式

    发布订阅

    发布订阅

    在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。

    为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。

    在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。

    最终, 消息会被广播到所有消息接受者

    Exchanges 交换机

    RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

    有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

    fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。

    我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。

    但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。

    要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

    //自动生成队列名
    //非持久,独占,自动删除
    String queueName = ch.queueDeclare().getQueue();
    

    绑定 Bindings

    绑定

    我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

    //指定的队列,与指定的交换机关联起来
    //成为绑定 -- binding
    //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
    ch.queueBind(queueName, "logs", "");
    

    现在, logs交换机将会向我们指定的队列添加消息

    列出绑定关系:

    rabbitmqctl list_bindings

    完成的代码

    完成代码

    生产者

    生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

    package rabbitmq.publishsubscribe;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为logs的交换机,交换机类型为fanout
    		//这一步是必须的,因为禁止发布到不存在的交换。
    		ch.exchangeDeclare("logs", "fanout");
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第一个参数,向指定的交换机发送消息
    			//第二个参数,不指定队列,由消费者向交换机绑定队列
    			//如果还没有队列绑定到交换器,消息就会丢失,
    			//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
    			ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。
    ReceiveLogs.java代码:

    package rabbitmq.publishsubscribe;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 logs 的交换机, 它的类型是 fanout
    		ch.exchangeDeclare("logs", "fanout");
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		//把该队列,绑定到 logs 交换机
    		//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
    		ch.queueBind(queueName, "logs", "");
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    路由模式

    路由

    路由模式

    在上一小节,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。

    在这一节,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    绑定 Bindings

    在上一节,我们已经创建了队列与交换机的绑定。使用下面这样的代码:

    ch.queueBind(queueName, "logs", "");
    

    绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

    绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

    ch.queueBind(queueName, EXCHANGE_NAME, "black");
    

    bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

    直连交换机 Direct exchange

    上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。

    前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

    我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置

    路由模式

    其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

    这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有blackgreen路由键的消息将转到Q2。而所有其他消息都将被丢弃。

    多重绑定 Multiple bindings

    多重绑定

    使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

    发送日志

    我们将在日志系统中使用这个模型。我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。

    和前面一样,我们首先需要创建一个exchange:

    //参数1: 交换机名
    //参数2: 交换机类型
    ch.exchangeDeclare("direct_logs", "direct");
    

    接着来看发送消息的代码

    //参数1: 交换机名
    //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    //参数3: 其他配置属性
    //参数4: 发布的消息数据 
    ch.basicPublish("direct_logs", "error", null, message.getBytes());
    

    订阅

    接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:

    ch.queueBind(queueName, "logs", "info");
    ch.queueBind(queueName, "logs", "warning");
    

    完整的代码

    完整代码

    生产者

    package rabbitmq.routing;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		String[] a = {"warning", "info", "error"};
    		
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//随机产生日志级别
    			String level = a[new Random().nextInt(a.length)];
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("direct_logs", level, null, msg.getBytes());
    			System.out.println("消息已发送: "+level+" - "+msg);
    			
    		}
    
    		c.close();
    	}
    }
    
    

    消费者

    package rabbitmq.routing;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 direct_logs 的交换机, 它的类型是 "direct"
    		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入接收的日志级别,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 direct_logs 交换机
    		//允许使用多个 bindingKey
    		for (String level : a) {
    			ch.queueBind(queueName, "direct_logs", level);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    主题模式

    主题

    在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。

    虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

    在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。

    这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。

    要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

    • * 可以通配单个单词。
    • # 可以通配零个或多个单词。

    用一个例子来解释这个问题是最简单的

    主题

    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    • Q1对所有橙色的动物感兴趣。
    • Q2想接收关于兔子和慢速动物的所有消息。

    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    完成的代码

    生产者

    package rabbitmq.topic;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".contentEquals(msg)) {
    				break;
    			}
    			System.out.print("输入routingKey: ");
    			String routingKey = new Scanner(System.in).nextLine();
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
    			
    			System.out.println("消息已发送: "+routingKey+" - "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    package rabbitmq.topic;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入bindingKey,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\\s");
    		
    		//把该队列,绑定到 topic_logs 交换机
    		//允许使用多个 bindingKey
    		for (String bindingKey : a) {
    			ch.queueBind(queueName, "topic_logs", bindingKey);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    

    RPC模式

    RPC

    如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC.

    在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

    客户端

    在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

    RPCClient client = new RPCClient();
    String result = client.call("4");
    System.out.println( "第四个斐波那契数是: " + result);
    

    回调队列 Callback Queue

    使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

    //定义回调队列,
    //自动生成对列名,非持久,独占,自动删除
    callbackQueueName = ch.queueDeclare().getQueue();
    
    //用来设置回调队列的参数对象
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    //发送调用消息
    ch.basicPublish("", "rpc_queue", props, message.getBytes());
    

    消息属性 Message Properties

    AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:

    deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。

    contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json

    replyTo:通常用于指定回调队列。

    correlationId:将RPC响应与请求关联起来非常有用。

    关联id (correlationId):

    在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

    这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

    小结

    rpc

    RPC的工作方式是这样的:

    • 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
    • 请求被发送到rpc_queue队列。
    • RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
    • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。

    完成的代码

    服务器端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCServer {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		/*
    		 * 定义队列 rpc_queue, 将从它接收请求信息
    		 * 
    		 * 参数:
    		 * 1. queue, 对列名
    		 * 2. durable, 持久化
    		 * 3. exclusive, 排他
    		 * 4. autoDelete, 自动删除
    		 * 5. arguments, 其他参数属性
    		 */
    		ch.queueDeclare("rpc_queue",false,false,false,null);
    		ch.queuePurge("rpc_queue");//清除队列中的内容
    		
    		ch.basicQos(1);//一次只接收一条消息
    		
    		
    		//收到请求消息后的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//处理收到的数据(要求第几个斐波那契数)
    				String msg = new String(message.getBody(), "UTF-8");
    				int n = Integer.parseInt(msg);
    				//求出第n个斐波那契数
    				int r = fbnq(n);
    				String response = String.valueOf(r);
    				
    				//设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
    				BasicProperties replyProps = new BasicProperties.Builder()
    						.correlationId(message.getProperties().getCorrelationId())
    						.build();
    				/*
    				 * 发送响应消息
    				 * 1. 默认交换机
    				 * 2. 由客户端指定的,用来传递响应消息的队列名
    				 * 3. 参数(关联id)
    				 * 4. 发回的响应消息
    				 */
    				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
    				//发送确认消息
    				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
    			}
    		};
    		
    		//
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
    		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
    	}
    
    	protected static int fbnq(int n) {
    		if(n == 1 || n == 2) return 1;
    		
    		return fbnq(n-1)+fbnq(n-2);
    	}
    }
    
    

    客户端

    package rabbitmq.rpc;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCClient {
    	Connection con;
    	Channel ch;
    	
    	public RPCClient() throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		con = f.newConnection();
    		ch = con.createChannel();
    	}
    	
    	public String call(String msg) throws Exception {
    		//自动生成对列名,非持久,独占,自动删除
    		String replyQueueName = ch.queueDeclare().getQueue();
    		//生成关联id
    		String corrId = UUID.randomUUID().toString();
    		
    		//设置两个参数:
    		//1. 请求和响应的关联id
    		//2. 传递响应数据的queue
    		BasicProperties props = new BasicProperties.Builder()
    				.correlationId(corrId)
    				.replyTo(replyQueueName)
    				.build();
    		//向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
    		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
    		
    		//用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
    		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    		
    		//接收响应数据的回调对象
    		DeliverCallback deliverCallback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				//如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
    				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
    					//把收到的响应数据,放入阻塞集合
    					response.offer(new String(message.getBody(), "UTF-8"));
    				}
    			}
    		};
    
    		CancelCallback cancelCallback = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		//开始从队列接收响应数据
    		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
    		//返回保存在集合中的响应数据
    		return response.take();
    	}
    	
    	public static void main(String[] args) throws Exception {
    		RPCClient client = new RPCClient();
    		while (true) {
    			System.out.print("求第几个斐波那契数:");
    			int n = new Scanner(System.in).nextInt();
    			String r = client.call(""+n);
    			System.out.println(r);
    		}
    	}
    }
    
    

    virtual host

    在RabbitMQ中叫做虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

    创建virtual host: /pd

    • 进入虚拟机管理界面

    虚拟主机

    • 添加新的虚拟机’/pd’,名称必须以"/"开头

    虚拟主机

    • 查看添加的结果

    结果

    设置虚拟机的用户访问权限

    点击 /pd 虚拟主机, 设置用户 admin 对它的访问权限

    权限

    拼多商城整合 rabbitmq

    当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中

    当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统

    我们需要应对流量峰值,让流量曲线变得平缓,如下图

    削峰

    订单存储的解耦

    为了进行流量削峰,我们引入 rabbitmq 消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;而订单消费者应用从消息队列接收订单消息,并把订单保存到数据库

    订单

    这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存

    生产者-发送订单

    pom.xml 添加依赖

    spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.yml

    添加RabbitMQ的连接信息

    spring:
      rabbitmq:
        host: 192.168.64.140
        port: 5672
        virtualHost: /pd
        username: admin
        password: admin
    
    

    修改主程序 RunPdAPP

    在主程序中添加下面的方法创建Queue实例

    当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,代码在RabbitAdmin.declareQueues()方法中

    	@Bean
    	public Queue getQueue() {
    		Queue q = new Queue("orderQueue", true);
    		return q;
    	}
    

    修改 OrderServiceImpl

    	//RabbitAutoConfiguration中创建了AmpqTemplate实例
    	@Autowired
    	AmqpTemplate amqpTemplate;
    
        //saveOrder原来的数据库访问代码全部注释,添加rabbitmq消息发送代码
    	public String saveOrder(PdOrder pdOrder) throws Exception {
    		String orderId = generateId();
    		pdOrder.setOrderId(orderId);
    
    		amqpTemplate.convertAndSend("orderQueue", pdOrder);
    		return orderId;
    
    		
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		//
    		//		
    		//		PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    		//		pdOrder.setShippingName(pdShipping.getReceiverName());
    		//		pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    		//		pdOrder.setStatus(1);// 
    		//		pdOrder.setPaymentType(1);
    		//		pdOrder.setPostFee(10D);
    		//		pdOrder.setCreateTime(new Date());
    		//
    		//		double payment = 0;
    		//		List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    		//		for (ItemVO itemVO : itemVOs) {
    		//			PdOrderItem pdOrderItem = new PdOrderItem();
    		//			String id = generateId();
    		//			//String id="2";
    		//			pdOrderItem.setId(id);
    		//			pdOrderItem.setOrderId(orderId);
    		//			pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    		//			pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    		//			pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    		//			pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    		//
    		//			payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    		//			pdOrderItemMapper.insert(pdOrderItem);
    		//		}
    		//		pdOrder.setPayment(payment);
    		//		pdOrderMapper.insert(pdOrder);
    		//		return orderId;
    	}
    
    

    消费者-接收订单,并保存到数据库

    pd-web项目复制为pd-order-consumer

    复制项目

    修改 application.yml

    把端口修改成 81

    server:
      port: 81
    
    spring:
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8
        username: root
        password: root
    
      rabbitmq:
        host: 192.168.64.140
        port: 5672
        virtualHost: /pd
        username: admin
        password: admin
    
    mybatis:
      #typeAliasesPackage: cn.tedu.ssm.pojo
      mapperLocations: classpath:com.pd.mapper/*.xml
    
    logging:
      level: 
        cn.tedu.ssm.mapper: debug
    

    删除无关代码

    pd-order-consumer项目只需要从 RabbitMQ 接收订单数据, 再保存到数据库即可, 所以项目中只需要保留这部分代码

    • 删除 com.pd.controller 包
    • 删除 com.pd.payment.utils 包
    • 删除无关的 Service,只保留 OrderService 和 OrderServiceImpl

    新建 OrderConsumer

    package com.pd;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.pd.pojo.PdOrder;
    import com.pd.service.OrderService;
    
    @Component
    public class OrderConsumer {
        //收到订单数据后,会调用订单的业务代码,把订单保存到数据库
    	@Autowired
    	private OrderService orderService;
    
        //添加该注解后,会从指定的orderQueue接收消息,
        //并把数据转为 PdOrder 实例传递到此方法
    	@RabbitListener(queues="orderQueue")
    	public void save(PdOrder pdOrder)
    	{
    		System.out.println("消费者");
    		System.out.println(pdOrder.toString());
    		try {
    			orderService.saveOrder(pdOrder);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    修改 OrderServiceImpl 的 saveOrder() 方法

    	public String saveOrder(PdOrder pdOrder) throws Exception {
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		//
    		//		amqpTemplate.convertAndSend("orderQueue", pdOrder);
    		//		return orderId;
    		//
    		//		
    		//		
    		//		String orderId = generateId();
    		//		pdOrder.setOrderId(orderId);
    		
    		//从RabbitMQ接收的订单数据,
    		//已经在上游订单业务中生成过id,这里不再重新生成id
    		//直接获取该订单的id
    		String orderId = pdOrder.getOrderId();
    
    		
    		PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
    		pdOrder.setShippingName(pdShipping.getReceiverName());
    		pdOrder.setShippingCode(pdShipping.getReceiverAddress());
    		pdOrder.setStatus(1);// 
    		pdOrder.setPaymentType(1);
    		pdOrder.setPostFee(10D);
    		pdOrder.setCreateTime(new Date());
    
    		double payment = 0;
    		List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
    		for (ItemVO itemVO : itemVOs) {
    			PdOrderItem pdOrderItem = new PdOrderItem();
    			String id = generateId();
    			//String id="2";
    			pdOrderItem.setId(id);
    			pdOrderItem.setOrderId(orderId);
    			pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
    			pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
    			pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
    			pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
    
    			payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
    			pdOrderItemMapper.insert(pdOrderItem);
    		}
    		pdOrder.setPayment(payment);
    		pdOrderMapper.insert(pdOrder);
    		return orderId;
    	}   
    

    手动确认

    application.yml

    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual
    

    OrderConsumer

    package com.pd;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.pd.pojo.PdOrder;
    import com.pd.service.OrderService;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class OrderConsumer {
        //收到订单数据后,会调用订单的业务代码,把订单保存到数据库
    	@Autowired
    	private OrderService orderService;
    
        //添加该注解后,会从指定的orderQueue接收消息,
        //并把数据转为 PdOrder 实例传递到此方法
    	@RabbitListener(queues="orderQueue")
    	public void save(PdOrder pdOrder, Channel channel, Message message)
    	{
    		System.out.println("消费者");
    		System.out.println(pdOrder.toString());
    		try {
    			orderService.saveOrder(pdOrder);
    			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    		} catch (Exception e) {
    			e.printStackTrace();
    		} 
    	}
    }
    
    展开全文
  • RabbitMQ教程(万字教程)

    万次阅读 多人点赞 2018-08-05 23:14:56
    RabbitMQ教程。RabbitMQ面试题。消息队列(MQ),本质是个队列,队列中存放的内容是message。MQ用于不同进程Process/线程Thread之间通信。本文介绍RabbitMQ的使用。RabbitMQ实战教程。

    【订阅专栏合集,作者所有付费文章都能看(持续更新)】

    推荐【Kafka教程https://bigbird.blog.csdn.net/article/details/108770504
    推荐【rabbitmq教程https://bigbird.blog.csdn.net/article/details/81436980
    推荐【Flink教程https://blog.csdn.net/hellozpc/article/details/109413465
    推荐【JVM面试与调优教程https://bigbird.blog.csdn.net/article/details/113888604
    推荐【SpringBoot全套教程https://blog.csdn.net/hellozpc/article/details/107095951
    推荐【SpringCloud教程

    展开全文
  • rabbitmq导致丢失数据---线上问题

    万次阅读 2021-01-26 10:44:42
    rabbitmq导致丢失数据—线上问题 数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ来分析一下吧。 生产者弄丢了数据 生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络...
  • 在linux下安装配置rabbitMQ详细教程

    万次阅读 多人点赞 2017-12-20 17:34:47
    安装Erlang ... 从Erlang Solution安装(此方式安装的erlang版本较高,和下文教程中rabbitMQ的版本不一致,建议安装高版本的rabbitMQ) # 添加erlang solutions源 $ wget https://packages.erlang...
  • RabbitMq安装教程(超详细)

    万次阅读 多人点赞 2020-08-14 11:42:31
    RabbitMq安装教程RabbitMq简介安装准备工具 RabbitMq简介 ##1.1消息队列中间件简介 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性...
  • centos7下RabbitMQ的安装

    千次阅读 2021-06-09 20:52:44
    RabbitMQ官网:...# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management centos安装 官网安装教程:https://www.rabbitmq.com/install-rpm.html 这里安
  • rabbitmq导致丢失数据—线上问题 数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ来分析一下吧。 1.如何保证生产者消息不丢失 以mq实现异步消峰为例,系统需要实现发送手机验证码,采用rabbitmq...
  • RabbitMq分布式事务解决方案第一篇

    万次阅读 2021-03-07 10:35:40
    可以考虑使用rocketMq事务消息 下面要介绍的是使用rabbitMq如何解决分布式事务 在分布式事务解决方案中,提到了一种思路,叫做柔性事务解决方案,柔性在这里的含义可以理解为尽最大可能满足数据的最终一致性,它结合...
  • Windows下RabbitMQ安装及配置

    万次阅读 多人点赞 2018-08-30 10:58:47
    rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,Rabbit MQ 是建立在Erlang OTP平台上。 1、安装Erlang ...
  • rabbitmq面试题

    万次阅读 多人点赞 2018-12-12 09:53:59
    1.什么是rabbitmq 采用AMQP高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦 2.为什么要使用rabbitmq 1.在分布式系统下具备异步,削峰,负载均衡等一系列高级...
  • RabbitMQ教程_6 搭建集群

    万次阅读 2021-01-08 14:57:32
    默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问 架构图 ​ 核心解决问题: 当集群中...
  • (七)Linux环境搭建RabbitMQ

    万次阅读 2020-08-10 17:48:00
    搭建工具 1、Xshell:用于连接linux终端,方便操作。 2、Xftp:用于传输文件。 准备工作 ...1、安装Rabbitmq的语言环境 - erlang rpm -ivh erlang-20.1.7-1.el6.x86_64.rpm erl -v :测试是否安装成
  • ubuntu安装rabbitmq教程 避坑

    万次阅读 2020-09-28 00:39:35
    Ubuntu安装RabbitMQ教程 摘要 本篇主要给大家介绍ubuntu安装rabbitmq的过程以及相关遇到的坑,同时也避免自己忘记,所以写下这篇博文供大家学习。 相关版本 操作系统:Linux阿里云服务器 ubuntu:18.04(root用户) ...
  • rabbitmq官方的六种工作模式

    万次阅读 多人点赞 2018-09-04 14:58:21
    1.RabbitMq 1.1介绍 RabbitMQ是一个消息代理:它接受并转发消息。你可以把它当成一个邮局:当你想邮寄信件的时候,你会把信件放在投递箱中,并确信邮递员最终会将信件送到收件人的手里。在这个例子中,RabbitMQ就...
  • 一、问题背景 说明:项目中使用了rabbitmq,工程代码用SpringBoot对 rabbitmq集群进行了集成,现在用户服务启动报错,信息如下: 二、问题分析 1、代码中配置vhost为lolaage 2、查看rabbitmq中vhosts列表 root@...
  • docker安装rabbitmq无法进入管理页面

    万次阅读 2020-10-06 11:19:16
    说明:为什么不直接安装 docker pull rabbitmq 这个,因为这个安装后,开启对应端口后是不能直接访问它的管理后台,需要额外的命令开启,后面会将这种情况 容器运行,对应的端口开启 docker run -di --name=my...
  • 前言:前面两章我们已经开始了从安装到编写简单的demo的过程: ...RabbitMQ的学习(一):Windows下安装及配置RabbitMQ,erlang环境变量 RabbitMQ的学习(二):简单的java demo实现RabbitMQ的发送与接收 ...
  • laravel+rabbitMQ

    千次阅读 2020-09-14 16:38:31
    laravel+rabbitMQ前言一、安装Erlang环境1.安装erlang前安装下依赖文件2.去erlang官网下载erlang安装包3.解压4.编译安装5.配置环境变量二、安装rabbitmq1.下载2.解压3.操作4.添加用户二、laravel安装rabbitmq1....
  • RabbitMQ启动出现的问题与解决办法

    万次阅读 2019-07-25 10:26:39
    RabbitMQ启动出现的问题与解决办法 如果下面的文章解决不了您的问题,可以关注公众号:程序员开发者社区,点击与我联系,加我微信。尽量为您解答。 回复:谷歌插件 。 可以使用chrome 访问 google 了 。百度搜索...
  • docker安装RabbitMQ

    万次阅读 多人点赞 2020-08-30 13:18:27
    docker安装RabbitMQ 查看仓库里的RabbitMQ docker search rabbitmq 安装RabbitMQ docker pull rabbitmq 这里是直接安装最新的,如果需要安装其他版本在rabbitmq后面跟上版本号即可 启动RabbitMQ docker run -d --...
  • Centos7 安装rabbitmq详细教程

    万次阅读 多人点赞 2018-10-15 15:36:39
    一、RabbitMQ简单介绍 RabbitMQ就是当前最主流的消息中间件之一。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、...
  • rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装插件完成 四、在SpringBoot整合RabbitMQ 1、引入 RabbitMQ 依赖 org.springframework.boot spring-boot-starter-amqp 2、配置 RabbitMQ 信息 spring...
  • Linux安装RabbitMq步骤流程

    万次阅读 2020-01-09 17:47:38
    1:下载RabbitMq依赖的erlang语言安装包 wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 最新的22版本 2:若缺少epel-release 依赖 yum install epel-release 3:安装erlang软件包 ...
  • Linux下安装RabbitMQ

    万次阅读 多人点赞 2019-07-13 18:42:45
    一、RabbitMQ介绍 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发...
  • docker 安装rabbitMQ

    万次阅读 2020-11-24 18:42:53
    搜索rabbitMq,进入官方的镜像,可以看到以下几种类型的镜像;我们选择带有“mangement”的版本(包含web管理页面); 拉取镜像 docker pull rabbitmq 查看所有镜像 docker images 安装和web界面启动 ...
  • SpringBoot:接入RabbitMQ(完整版)

    万次阅读 2020-07-06 16:55:15
    处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收 // spring.rabbitmq.listener.simple.acknowledge-mode=manual Long deliveryTag = (Long)message.getHeaders().get...
  • RabbitMQ启动失败解决

    万次阅读 2019-06-14 15:04:01
    使用systemctl start rabbitmq-server.service启动rabbitmq时报错: [root@rhos5-rabbitmq1 ~]# systemctl start rabbitmq-server Jun 13 21:03:08 jeckersb-f20 systemd[1]: Starting RabbitMQ broker... Jun 13 21...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 205,104
精华内容 82,041
关键字:

rabbitmq