精华内容
下载资源
问答
  • SpringAMQP详解

    2021-01-28 21:29:29
    一,Spring AMQP介绍 Java API 方式编程,有什么问题? Spring 封装 RabbitMQ 的时候,它做了什么事情? 1、管理对象(队列、交换机、绑定) 2、封装方法(发送消息、接收消息) Spring AMQP 是对 Spring 基于 AMQP ...

    一,Spring AMQP介绍

    Java API 方式编程,有什么问题?

    Spring 封装 RabbitMQ 的时候,它做了什么事情?

    1、管理对象(队列、交换机、绑定)

    2、封装方法(发送消息、接收消息)

    Spring AMQP 是对 Spring 基于 AMQP 的消息收发解决方案,它是一个抽象层,不依赖于特定的 AMQP Broker 实现和客户端的抽象,所以可以很方便地替换。比如我们可以使用 spring-rabbit 来实现。

    二,Spring AMQP核心组件

    1,ConnectionFactory

    Spring AMQP 的连接工厂接口,用于创建连接。CachingConnectionFactory 是ConnectionFactory 的一个实现类。

    2,RabbitAdmin

    RabbitAdmin 是 AmqpAdmin 的实现,封装了对 RabbitMQ 的基础管理操作,比如对交换机、队列、绑定的声明和删除等。

    /**
     * @author yhd
     * @createtime 2021/1/28 19:24
     * @description Spring AMQP configuration class
     */
    @SpringBootConfiguration
    public class AMQPConfig {
    
        private static final String EXCHANGE_NAME = "amqp.yhd.exchange";
    
        private static final String QUEUE_NAME = "amqp.yhd.queue";
    
        private static final String ROUTING_KEY = "amqp.admin";
    
        @Bean
        public ConnectionFactory factory() {
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setAddresses("121.199.31.160");
            factory.setPort(5672);
            factory.setUsername("root");
            factory.setPassword("root");
            return factory;
        }
    
        @Bean
        public AmqpAdmin amqpAdmin( ConnectionFactory factory) {
            RabbitAdmin admin = new RabbitAdmin(factory);
            //声明一个交换机 交换机名  是否持久化  是否自动删除
            admin.declareExchange(new DirectExchange(EXCHANGE_NAME, true, false));
            //队列名 持久化 是否批处理  自动删除
            admin.declareQueue(new Queue(QUEUE_NAME, true, false, false));
            //声明一个绑定 队列名 ,绑定类型,交换机名,路由键 参数
            admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, ROUTING_KEY, null));
            return admin;
        }
    }
    

    为什么我们在配置文件(Spring)或者配置类(SpringBoot)里面定义了交换机、队列、绑定关系,并没有直接调用 Channel 的 declare 的方法,Spring 在启动的时候就可以帮我们创建这些元数据?这些事情就是由 RabbitAdmin 完成的。

    RabbitAdmin 实 现 了 InitializingBean 接 口 , 里 面 有 唯 一 的 一 个 方 法afterPropertiesSet(),这个方法会在 RabbitAdmin 的属性值设置完的时候被调用。

    在 afterPropertiesSet ()方法中,调用了一个 initialize()方法。这里面创建了三个Collection,用来盛放交换机、队列、绑定关系。

    最后依次声明返回类型为 Exchange、Queue 和 Binding 这些 Bean,底层还是调用了 Channel 的 declare 的方法。

    declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
    declareQueues(channel, queues.toArray(new Queue[queues.size()]));
    declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
    

    3,Message

    Message 是 Spring AMQP 对消息的封装。两个重要的属性:body:消息内容。 messageProperties:消息属性。

    4,RabbitTemplate 消息模板

    RabbitTemplate 是 AmqpTemplate 的一个实现(目前为止也是唯一的实现),用来简化消息的收发,支持消息的确认(Confirm)与返回(Return)。跟 JDBCTemplate一 样 , 它 封 装 了 创 建 连 接 、 创 建 消 息 信 道 、 收 发 消 息 、 消 息 格 式 转 换(ConvertAndSend→Message)、关闭信道、关闭连接等等操作。

    针对于多个服务器连接,可以定义多个 Template。可以注入到任何需要收发消息的地方使用。

        /**
         * return callback   &&  confirm callable
         *
         * @param factory
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
            RabbitTemplate template = new RabbitTemplate(factory);
            template.setMandatory(true);
            template.setReturnCallback((Message message,
                                        int replyCode,
                                        String replyText,
                                        String exchange,
                                        String routingKey) -> {
    
            });
    
            template.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
                if (ack) {
                    log.info("消息确认成功!");
                } else {
                    log.info("消息确认失败!");
                }
            });
            return template;
        }
    

    5,Messager Listener 消息监听

    MessageListener

    MessageListener 是 Spring AMQP 异步消息投递的监听器接口,它只有一个方法onMessage,用于处理消息队列推送来的消息,作用类似于 Java API 中的 Consumer。

    MessageListenerContainer

    MessageListenerContainer可以理解为MessageListener的容器,一个Container只有一个 Listener,但是可以生成多个线程使用相同的 MessageListener 同时消费消息。

    Container 可以管理 Listener 的生命周期,可以用于对于消费者进行配置。

    例如:动态添加移除队列、对消费者进行设置,例如 ConsumerTag、Arguments、并发、消费者数量、消息确认模式等等。

        /**
         * 消息监听器容器
         * @param connectionFactory
         * @return
         */
        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            //监听的队列
            container.setQueues(new Queue(QUEUE_NAME, true, false, false));
            // 最小消费者数
            container.setConcurrentConsumers(1);
            // 最大的消费者数量
            container.setMaxConcurrentConsumers(5);
            //是否重回队列
            container.setDefaultRequeueRejected(false);
            //签收模式
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
            //消费端的标签策略
            container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());
            return container;
        }
    

    在 SpringBoot2.0 中新增了一个 DirectMessageListenerContainer。

    MessageListenerContainerFactory

    Spring 去整合 IBM MQ、JMS、Kafka 也是这么做的。

        /**
         * 
         * @param connectionFactory
         * @return
         */
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            factory.setAcknowledgeMode(AcknowledgeMode.NONE);
            factory.setAutoStartup(true);
            return factory;
        }
    

    可以在消费者上指定,当我们需要监听多个 RabbitMQ 的服务器的时候,指定不同的 MessageListenerContainerFactory。

    @Slf4j
    @Component
    @PropertySource("classpath:application.properties")
    @RabbitListener(queues = "${amqp.yhd.queue}", containerFactory = "rabbitListenerContainerFactory")
    public class FirstConsumer {
    
        @RabbitHandler
        public void process(@Payload String message) {
            log.info("First Queue received msg : {}", message);
        }
    }
    

    继承关系

    在这里插入图片描述

    6,转换器 MessageConvertor

    MessageConvertor 的 作用?

    RabbitMQ 的消息在网络传输中需要转换成 byte[](字节数组)进行发送,消费者需要对字节数组进行解析。

    在 Spring AMQP 中,消息会被封装为 org.springframework.amqp.core.Message对象。消息的序列化和反序列化,就是处理 Message 的消息体 body 对象。

    如果消息已经是 byte[]格式,就不需要转换。

    如果是 String,会转换成 byte[]。

    如果是 Java 对象,会使用 JDK 序列化将对象转换为 byte[](体积大,效率差)。

    在 调 用 RabbitTemplate 的 convertAndSend() 方 法 发 送 消 息 时 , 会 使 用MessageConvertor 进行消息的序列化,默认使用 SimpleMessageConverter。

    在某些情况下,我们需要选择其他的高效的序列化工具。如果我们不想在每次发送消息时自己处理消息,就可以直接定义一个 MessageConvertor。

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
    

    MessageConvertor 如何 工作?

    调 用 了 RabbitTemplate 的 convertAndSend() 方 法 时 会 使 用 对 应 的MessageConvertor 进行消息的序列化和反序列化。

    序列化:Object —— Json —— Message(body) —— byte[]

    反序列化:byte[] ——Message —— Json —— Object

    有 哪些 MessageConvertor ?

    在 Spring 中提供了一个默认的转换器:SimpleMessageConverter。

    Jackson2JsonMessageConverter(RbbitMQ 自带):将对象转换为 json,然后再转换成字节数组进行传递。

    如何 自定义 MessageConverter ?

    例如:我们要使用 Gson 格式化消息:

    创建一个类,实现 MessageConverter 接口,重写 toMessage()和 fromMessage()方法。

    toMessage(): Java 对象转换为 Message
    fromMessage(): Message 对象转换为 Java 对象
    

    三,SpringBoot集成RabbitMQ

    为什么没有定义 Spring AMQP 的任何一个对象,也能实现消息的收发?Spring Boot 做了什么?

    老套路

    源码:RabbitAutoConfiguration

    在这里插入图片描述

    展开全文
  • SpringAmqp参数说明

    2021-09-02 17:03:50
    @Component public class Listener { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange",
    @Component
    public class Listener {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "spring.test.queue", durable = "true"),
                exchange = @Exchange(
                        value = "spring.test.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC
                ),
                key = {"#.#"}))
        public void listen(String msg){
            System.out.println("接收到消息:" + msg);
        }
    }
    
    • @Componet:类上的注解,注册到Spring容器
    • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
      • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
        • value:这个消费者关联的队列。值是@Queue,代表一个队列
        • exchange:队列所绑定的交换机,值是@Exchange类型
        • key:队列和交换机绑定的RoutingKey

    类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

    展开全文
  • 一个使用springamqp实现的异步消息队列的股票系统,来自springamqp的官网,对于学习springamqp很有帮助。
  • SpringAMQP整合RabbitMQ-五种工作模式Demo

    千次阅读 多人点赞 2021-10-31 10:08:14
    一.MQ基本概念 1.概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器,是存储消息的中间件 ...AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用

    一.MQ基本概念

    1.概述

    • MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器,是存储消息的中间件
    • 分布式系统通信有两种方式,一种是直接远程调用(例如Feign),一种是借助类似于rabbitmq的第三方中间件完成间接通信,即异步通讯

    在这里插入图片描述
    2.MQ的优势

    • 应用解耦:提高系统容错性和可维护性
    • 异步提速:提升用户体验和系统吞吐量
    • 削峰填谷:提高系统稳定性

    3.几种常见MQ的对比

    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般

    追求可用性:Kafka、 RocketMQ 、RabbitMQ

    追求可靠性:RabbitMQ、RocketMQ

    追求吞吐能力:RocketMQ、Kafka

    追求消息低延迟:RabbitMQ、Kafka

    二.RabbitMQ

    1.AMQP

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等

    在这里插入图片描述
    2.RabbitMQ

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。
    Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛

    3.RabbitMQ中的相关概念

    在这里插入图片描述

    • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
    • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
    • Connection:publisher/consumer 和 broker 之间的 TCP 连接
    • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销
    • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到
      queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
    • Queue:消息最终被送到这里等待 consumer 取走
    • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    请添加图片描述

    三.Docker部署RabbitMQ

    拉取镜像

    docker pull rabbitmq:management
    

    启动docker

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
    

    四.SpringAMQP

    SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便

    SpringAMQP提供了三个功能:

    • 自动声明队列、交换机及其绑定关系
    • 基于注解的监听器模式,异步接收消息
    • 封装了RabbitTemplate工具,用于发送消息

    五.RabbitMQ工作模式

    工作模式说明
    Basic Queue 简单模式一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
    Work Queue 工作队列模式一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
    Publish/subscribe 发布订阅模式需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
    Routing 路由模式需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
    Topic 通配符模式需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

    1.Basic Queue 简单模式

    请添加图片描述

    • publisher:消息生产者,也就是要发送消息的程序
    • consumer:消息的消费者,会一直等待消息到来
    • queue:消息队列,类似一个邮箱,可以缓存消息;发布者向其中投递消息,消费者从其中取出消息

    代码实现

    publisher服务端

    ①引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    ②配置MQ地址,在publisher服务的application.yml中添加配置

    spring:
      rabbitmq:
        host: 101.43.16.42 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: henrik # 用户名
        password: yao # 密码
    

    ③在publisher服务中编写测试类SpringAmqpTest,利用RabbitTemplate实现消息发送

    @SpringBootTest
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            // 队列名称
            String queueName = "simple.queue";
            // 消息
            String message = "hello, spring amqp!";
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    

    consumer服务端

    ①引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    ②配置MQ地址,在consumer服务的application.yml中添加配置

    spring:
      rabbitmq:
        host: 101.43.16.42 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: henrik # 用户名
        password: yao # 密码
    

    ③consumer服务的listener包中新建一个类SpringRabbitListener

    @Component
    public class SpringRabbitListener {
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            System.out.println("spring 消费者接收到消息:【" + msg + "】");
        }
    }
    

    ④启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息


    所有操作均需引入依赖和配置rabbitmq,后续demo不再重复


    2.Work queues 工作队列模式

    请添加图片描述

    • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息
    • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
    • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
    • Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可

    publisher服务端

    publisher服务中的SpringAmqpTest类中添加一个测试方法

    /**
         * workQueue
         * 向队列中不停发送消息,模拟消息堆积。
         */
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    

    consumer服务端

    为了模拟多个消费者绑定同一个队列,在consumer服务的SpringRabbitListener中添加2个新的方法

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
    

    3.Pub/Sub 订阅模式

    请添加图片描述
    在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

    • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Consumer:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、
      递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    三种类型的交换机

    1. Fanout:广播,将消息交给所有绑定到交换机的队列
    2. Direct:定向,把消息交给符合指定routing key 的队列
    3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
      路由规则的队列,那么消息会丢失

    consumer端

    在springboot添加配置类,借助springboot自动装配bean,声明队列和交换机

    @Configuration
    public class FanoutConfig {
        /**
         * 声明交换机
         * @return Fanout类型交换机
         */
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("henrik.fanout");
        }
    
        /**
         * 第1个队列
         */
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
    
        /**
         * 第2个队列
         */
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
    

    publisher服务端

    在publisher服务的SpringAmqpTest类中添加测试方法

    @Test
    public void testFanoutExchange() {
        // 队列名称
        String exchangeName = "henrik.fanout";
        // 消息
        String message = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
    

    consumer服务端

    在consumer服务的SpringRabbitListener中添加两个方法,作为消费者

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }
    

    4.Routing 路由模式

    在这里插入图片描述

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
    • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
    • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routing key 与消息的 Routing key 完全一致,才会接收到消息
      图解:
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

    consumer服务端

    基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
    在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
    

    publisher服务端

    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
    

    5.Topics 通配符模式

    在这里插入图片描述

    • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符
    • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
    • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc
      或者 item.insert,item.* 只能匹配 item.insert

    请添加图片描述

    • Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
    • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news

    publisher服务端

    /**
         * topicExchange
         */
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "henrik.topic";
        // 消息
        String message = "喜报!孙悟空大战哥斯拉,胜!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
    

    consumer服务端

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "henrik.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "henrik.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
    

    六.消息转换器

    Spring会把发送的消息序列化为字节发送给MQ,接收消息的时候,会把字节反序列化为Java对象。只不过,默认情况下Spring采用的序列化方式是JDK序列化。JDK序列化存在下列问题:数据体积过大,有安全漏洞,可读性差。
    所以JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
    ①引入依赖

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
    

    ②配置消息转换器,在启动类中添加一个bean即可

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    展开全文
  • SpringAMQP Connecion Factory

    2021-04-27 14:55:27
    文章目录AMQP ComponentMessageExchangeQueueBindingConnection FactoryPooledChannelConnectionFactoryThreadChannelConnectionFactoryCachingConnectionFactory连接工厂的配置设置连接名称监听连接阻塞事件模拟...

    AMQP Component


    Message

    Message是消息体body[]与消息属性MessageProperties的封装,如下所示:

    public class Message {
    
        private final MessageProperties messageProperties;
    
        private final byte[] body;
    
        public Message(byte[] body, MessageProperties messageProperties) {
            this.body = body;
            this.messageProperties = messageProperties;
        }
    
        public byte[] getBody() {
            return this.body;
        }
    
        public MessageProperties getMessageProperties() {
            return this.messageProperties;
        }
    }
    

    Exchange

    Exchange是收发消息的中转站,其中属性定义如下所示,见名知意:

    public interface Exchange {
    
        String getName();
    
        String getExchangeType();
    
        boolean isDurable();
    
        boolean isAutoDelete();
    
        Map<String, Object> getArguments();
    
    }
    

    Queue

    Queue是接受消息的组件,Exchange通过对应的Routing key路由到响应的Queue,该类如下所示:

    public class Queue  {
    
        private final String name;
    
        private volatile boolean durable;
    
        private volatile boolean exclusive;
    
        private volatile boolean autoDelete;
    
        private volatile Map<String, Object> arguments;
    
        /**
         * The queue is durable, non-exclusive and non auto-delete.
         *
         * @param name the name of the queue.
         */
        public Queue(String name) {
            this(name, true, false, false);
        }
    
        // Getters and Setters omitted for brevity
    
    }
    

    Binding

    Binding是将ExchangeQueue绑定起来的组件,如下所示:

    new Binding(someQueue, someDirectExchange, "foo.bar");
    

    同时还提供了BindingBuilder,使用链式语法的形式来构造Binding,如下所示

    Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
    

    Connection Factory


    Spring提供了三个连接工厂供我们选择,常用的CachingConnectionFactory。这三个工厂都支持publisher confirmations。对于大多数用例,应该使用PooledChannelConnectionFactory。如果您想确保严格的消息排序而不需要使用范围操作,那么可以使用ThreadChannelConnectionFactory。如果你想使用相关的发布者确认,或者你想通过它的CacheMode打开多个连接,应该使用CachingConnectionFactory。

    PooledChannelConnectionFactory

    该工厂基于Apache Pool2管理一个连接和两个通道池。 一个池用于事务性通道,另一个池用于非事务性通道。 池是具有默认配置的GenericObjectPool。 提供回调以配置池; 有关更多信息,请参考Apache文档。

    @Bean
    PooledChannelConnectionFactory pcf() throws Exception {
        ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
        rabbitConnectionFactory.setHost("localhost");
        PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
        pcf.setPoolConfigurer((pool, tx) -> {
            if (tx) {
                // configure the transactional pool
            }
            else {
                // configure the non-transactional pool
            }
        });
        return pcf;
    }
    

    ThreadChannelConnectionFactory

    这个工厂管理一个连接和两个ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。这个工厂确保同一个线程上的所有操作使用相同的通道(只要通道保持打开)。这促进了严格的消息排序,而不需要作用域操作。为了避免内存泄漏,如果您的应用程序使用了许多短命线程,您必须调用工厂的closeThreadChannel()来释放通道资源。

    CachingConnectionFactory

    第三个工厂是CachingConnectionFactory,这也是常用的工厂,见名知意,它具有缓存的功能,通过CacheMode来设置,它有以下两个缓存模式:

    • CHANNEL 缓存通道,默认缓存模式。
    • CONNECTION 缓存连接和每个连接中的通道。

    默认通道缓存大小为25

    DEFAULT_CHANNEL_CACHE_SIZE = 25
    

    CachingConnectionFactory还提供了一个connectionLimit属性用于限制连接数的大小。当超出了最大连接数时,通过等待channelCheckoutTimeout毫秒获取连接。

    private Connection connectionFromCache() {
    	...
    	if (cachedConnection == null && countOpenConnections() >= this.connectionLimit) {
    			cachedConnection = waitForConnection(now);
    	}
    	...
    }
    private ChannelCachingConnectionProxy waitForConnection(long now) {
    	...
    		if (countOpenConnections() >= this.connectionLimit) {
    			try {
    				this.connectionMonitor.wait(this.channelCheckoutTimeout);
    				cachedConnection = findIdleConnection();
    			}
    			catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    				throw new AmqpException("Interrupted while waiting for a connection", e);
    			}
    		}
    	...
    }
    
    

    缓存的大小不是限制channel的数量,它只是限制能缓存的数量。如果在RabbitMQ Admin UI界面看到许多channel正在快速的创建和关闭,则需要扩大channel的缓存大小。

    channelCheckoutTimeout参数大于0时,它会限制channel的数量,如果达到限制,则调用线程阻塞,直到通道可用或达到此超时,在这种情况下将引发 AmqpTimeoutException。在缓存模式为Connection时,会通过Semaphore来控制获取channel的数量。

    this.checkoutPermits.put(cachedConnection, new Semaphore(this.channelCacheSize));
    

    在获取channel中会判断channelCheckoutTimeout是否大于0,获取许可证。

    private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
    	...
    	if (this.channelCheckoutTimeout > 0) {
    		permits = obtainPermits(connection);
    	}
    	...
    	if (channel == null) {
    		try {
    			channel = getCachedChannelProxy(connection, channelList, transactional);
    		}
    		catch (RuntimeException e) {
    			if (permits != null) {
    				permits.release();
    				if (logger.isDebugEnabled()) {
    					logger.debug("Could not get channel; released permit for " + connection + ", remaining:"
    							+ permits.availablePermits());
    				}
    			}
    			throw e;
    		}
    	}
    	return channel
    }
    private Semaphore obtainPermits(ChannelCachingConnectionProxy connection) {
    	Semaphore permits;
    	permits = this.checkoutPermits.get(connection);
    	if (permits != null) {
    		try {
    			if (!permits.tryAcquire(this.channelCheckoutTimeout, TimeUnit.MILLISECONDS)) {
    				throw new AmqpTimeoutException("No available channels");
    			}
    			if (logger.isDebugEnabled()) {
    				logger.debug(
    						"Acquired permit for " + connection + ", remaining:" + permits.availablePermits());
    			}
    		}
    		catch (InterruptedException e) {
    			Thread.currentThread().interrupt();
    			throw new AmqpTimeoutException("Interrupted while acquiring a channel", e);
    		}
    	}
    	else {
    		throw new IllegalStateException("No permits map entry for " + connection);
    	}
    	return permits;
    }
    

    框架中使用的通道(例如RabbitTemplate)会可靠地返回到缓存中。如果您在框架之外创建通道(例如,通过直接访问连接并调用createChannel()),则必须(通过关闭)可靠地返回它们,在finally块中,以避免通道耗尽。

    连接工厂的配置


    设置连接名称

    //方式一
    connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
    
    //方式二
    @Bean
    public SimplePropertyValueConnectionNameStrategy cns() {
        return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
    }
    
    @Bean
    public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        ...
        connectionFactory.setConnectionNameStrategy(cns);
        return connectionFactory;
    }
    

    监听连接阻塞事件

    对于内存告警和磁盘报警,使用BlockedListener进行监听。AbstractConnectionFactory 通过其内部的 BlockedListener 实现,分别发出 connectionblockeddeventconnectionunblockeddevent

    connectionFactory.addConnectionListener(new ConnectionListener() {
        @Override
        public void onCreate(Connection connection) {
            connection.addBlockedListener(new BlockedListener() {
                @Override
                public void handleBlocked(String s) throws IOException {
                    System.out.println("我被阻塞了");
                }
    
                @Override
                public void handleUnblocked() throws IOException {
                    System.out.println("我未被阻塞");
                }
            });
            System.out.println("我被创建了");
        }
    }
    

    模拟内存告警

    使用rabbitmqctl set_vm_memory_high_watermark 0.01命令将内存阈值从0.4调回0.01,当发送一条消息,会在Rabbit Admin UI界面在看到对应节点的内存信息已经报红了。此时会触发connectionblockeddevent

    模拟磁盘告警

    使用rabbitmqctl set_disk_free_limit 170G设置空闲磁盘大小如果小于该值对应Rabbit Admin Ui界面节点就会报红。

    从版本1.7.7开始,提供了一个AmqpResourceNotAvailableException,当SimpleConnection.createChannel()不能创建通道时抛出该异常(例如,因为达到了channelMax限制并且缓存中没有可用的通道)。您可以在RetryPolicy中使用此异常来恢复某些回退之后的操作。

    自动恢复

    4.0.x客户端默认启用自动恢复。虽然与此特性兼容,但Spring AMQP有自己的恢复机制,通常不需要客户机恢复特性。我们建议禁用amqp-client自动恢复,以避免在代理可用但连接尚未恢复时获得AutoRecoverConnectionNotCurrentlyOpenException实例。你可能会注意到这个异常,例如,当在RabbitTemplate中配置了一个RetryTemplate,甚至当故障转移到集群中的另一个broker时。由于自动恢复连接是在计时器上恢复的,因此使用Spring AMQP的恢复机制可以更快地恢复连接。从版本1.7.1开始,Spring AMQP禁用AMQP-Client自动恢复,除非你显式地创建自己的RabbitMQ连接工厂,并将其提供给CachingConnectionFactory。默认情况下,由RabbitConnectionFactoryBean创建的RabbitMQ ConnectionFactory实例也禁用了该选项。

    多连接工厂配置

    通过SimpleRoutingConnectionFactory配置多个连接工厂:

    // 连接工厂配置
    @Bean("connectionFactory1")
    public ConnectionFactory connectionFactory1() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("123456");
        connectionFactory.setConnectionNameStrategy(connectionFactory1 -> "MY_CONNECTION1");
        //设置缓存模式为连接
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        return connectionFactory;
    }
    
    @Bean("connectionFactory2")
    public ConnectionFactory connectionFactory2() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("newadmin");
        connectionFactory.setPassword("123456");
        connectionFactory.setConnectionNameStrategy(connectionFactory1 -> "MY_CONNECTION2");
        connectionFactory.setVirtualHost("test");
    
        //设置缓存模式为连接
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        return connectionFactory;
    }
    //路由连接工厂配置
    @Bean("simpleConnectionFactory")
    @Primary //优先注入该连接工厂
    public ConnectionFactory simpleConnectionFactory(@Qualifier("connectionFactory1")ConnectionFactory one,@Qualifier("connectionFactory2")ConnectionFactory two) {
        SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
        Map<Object, ConnectionFactory> map = new HashMap<>();
        map.put("one",one);
        map.put("two",two);
        simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
        return simpleRoutingConnectionFactory;
    }
    

    发送消息

    ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    RabbitTemplate amqpTemplate = context.getBean(RabbitTemplate.class);
    for (int i = 0; i < 10; i++) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //使用connectionFactory2
                SimpleResourceHolder.bind(amqpTemplate.getConnectionFactory(),"two");
                amqpTemplate.convertAndSend("Hello World");
                System.out.println("Sent: Hello World");
                //解绑使用connectionFactory2
                SimpleResourceHolder.unbind(amqpTemplate.getConnectionFactory());
                countDownLatch.countDown();
            }
        });
    }
    countDownLatch.await();
    executorService.shutdownNow();
    

    Connection and Channel Listeners

    简单列举Connection和Channel的创建事件。关闭、停止事件未列出。

    connectionFactory.addConnectionListener(new ConnectionListener() {
        @Override
        public void onCreate(Connection connection) {
            connection.addBlockedListener(new BlockedListener() {
                @Override
                public void handleBlocked(String s) throws IOException {
    
                }
    
                @Override
                public void handleUnblocked() throws IOException {
    
                }
            });
        }
    });
    connectionFactory.addChannelListener(new ChannelListener() {
        @Override
        public void onCreate(Channel channel, boolean transactional) {
    
        }
    });
    

    定义客户端属性

    Rabbit Admin UI界面中能够看到。

    connectionFactory.getRabbitConnectionFactory().getClientProperties().put("Name","WuHan");
    
    展开全文
  • SpringAMQP--FanoutExchange

    2021-11-10 00:00:11
    来表示所有不同类型的交换机: 在consumer中创建一个类,声明队列和交换机: import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework....
  • SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 提供了三个功能: 自动声明队列、交换机及其绑定关系 基于注解的监听器模式,异步接收消息 封装了...
  • 文章目录Rabbit MQ 和Spring AMQP学习三(消息的可靠性)Producer端对于消息投递的可靠性可能会出现问题的地方解决措施如何确定消息能否到达对应的Exchange到达Exchange之后,如何确定消息可以到达对应的...
  • 一、简单模式 简单模式是最简单的消息模式,它...import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configurat..
  • SpringAMQP

    2021-10-29 10:57:42
    SpringAMQP SpringAMQP官网 Spring AMQP是基于AMQP协议定义的一套API规范,提供模板来发送和接收消息。 spring-amqp是基础抽象;spring-rabbit是底层默认实现 特征 侦听器容器,用于异步处理入站消息 用于...
  • 文章目录RabbitMQ和Spring AMQP学习二RabbitMQ消息模型BrokerConnectionChannel背景使用Java ClientSpring AMQP使用详解消息队列使用规范哪些配置该配置在生产者端?哪些配置该配置在消费者端?使用...
  • SpringAMQP-消息转换器

    2021-10-15 17:18:40
    一、声明队列 /** * 声明队列 * @return */ @Bean public Queue objectQueue2() { return new Queue("object.Queue"); } 二、测试 发送 @Test public void testSendObjectMessage() { ...
  • I want to implement Spring AMQP example for sending and receiving Java Objects using listener. I tried this:Send Java ObjectConnectionFactory connectionFactory = new CachingConnectionFactory("localhos...
  • spring-amqp-2.0.3.RELEASE.jar
  • spring配置rabbitMQ(Spring AMQP

    千次阅读 2017-09-05 17:06:16
    本人rabbitMQ版本是3.6.5,此处省略rabbitMQ的安装方式。 windows的安装方式(十分详细):http://blog.csdn.net/chwshuang/article/details/50543878...linux的安装方式:待尝试 一、maven依赖 ... amqp-client 4.
  • Basic Queue 简单队列模型 在父工程mq-demo中引入依赖 <!--AMQP依赖,包含RabbitMQ-->...spring-boot-starter-amqp</artifactId> </dependency> 消息发送 首先配置MQ地址,在publ
  • 我们正在使用Spring rabbitmq进行项目.我们目前正在测试不同的故障转移方案,以防止生产中出现任何进一我们正在使用以下属性:????听众:??????类型:直接??????直接:????????确认模式:手动我们面临的是,当网络出现...
  • SpringAMQP的使用

    2021-09-09 17:52:05
    SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配。
  • Spring AMQP

    2021-08-30 17:19:49
    文章目录引入使用依赖配置配置消费者@Componet@RabbitListener@QueueBinding模板类AmqpTemplate发送方什么时候发?发送什么内容?...在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的J
  • Spring AMQP 笔记

    2021-10-26 15:32:23
    AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议 概念 Publisher:发布者 Consumer:消费者 Message:消息 Queue:消息队列 Exchange:交换机,把发送的消息按照...
  • RabbitMQ\Spring AMQP

    2021-03-31 15:57:36
    AMQP的消息模型更加丰富 常见MQ产品 ActiveMQ:基于JMS RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好 RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会 Kafka:分布式消息系统,高吞吐量 RabbitMQ ...
  • } } 完成后启动application启动类,然后就可以接收消息了,如下 注意:消费完后队列里的消息会被销毁,所以再去rabbitmq的队列里就看不见已经消费过的消息了 下面通过SpringAMQP创建做一个工作队列(work模型),就是...
  • 文章目录Rabbit MQ和Spring AMQP学习四(JSON消息体)Jackson2JsonMessageConverter配置Producer端配置Consumer端配置Consumer端接收JSON Rabbit MQ和Spring AMQP学习四(JSON消息体) 实际应用场景中,消息的发送和...
  • 利用channel将消费者与队列绑定 3.SpringAMQP SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 SpringAmqp的官方地址:...
  • 使用Spring AMQP - spring-amqpSpring Boot在这里,我试图实施死信交换 . 我正在向队列发送消息,如果发生某些业务异常,那么它应该将消息发送到“dlq”队列并在那里等待5秒然后它应该进入队列再次处理..... 5次...
  • 【RabbitMQ】学习笔记-p2(SpringAMQP

    多人点赞 热门讨论 2021-11-15 19:54:10
    SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 SpringAmqp的官方地址:https://spring.io/projects/spring-amqp 什么是SpringAMQPSpringAMQP提供了三...
  • 注册流程+SpringAMQP

    2021-12-02 21:12:21
    基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机: @RabbitListener(bindings = @QueueBinding...
  • 文章目录RabbitMQ和Spring AMQP学习五(延迟队列)使用场景使用前准备选择方案安装插件编码声明延迟exchange查看控制台发送消息设置延迟时间简单验证示例代码参考 RabbitMQ和Spring AMQP学习五(延迟队列) 使用场景...
  • spring amqp异步队列例子helloworld

    千次阅读 2014-11-08 12:09:06
    在官方例子https://github.com/spring-projects/spring-amqp-samples/tree/master/helloworld进行清理 一.Producer端: 1.定义一个队列名,配置连接工厂,发送工具类.
  • import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 模糊匹配 * 定义路由键匹配规则 * a.b.c...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,814
精华内容 13,925
关键字:

springamqp

spring 订阅
友情链接: esign.zip