精华内容
下载资源
问答
  • 首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么数据!当数据量特别大的时候,我们对...

    消费端限流

    1. 为什么要对消费端限流

    假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

    当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

    2.限流的 api 讲解

    RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

    /*** Request specific "quality of service" settings.* These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
    • prefetchSize:0,单条消息大小限制,0代表不限制
    • prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
    • global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。
    • 注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

    3.如何对消费端进行限流

    • 首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
    • 第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
    • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

    这是生产端代码,与前几章的生产端代码没有做任何改变,主要的操作集中在消费端。

    import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class QosProducer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String routingKey = "item.add"; //5. 发送 String msg = "this is qos msg"; for (int i = 0; i < 10; i++) { String tem = msg + " : " + i; channel.basicPublish(exchangeName, routingKey, null, tem.getBytes()); System.out.println("Send message : " + tem); } //6. 关闭连接 channel.close(); connection.close(); }}

    这里我们创建了两个消费者,以方便验证限流api中的 global 参数设置为 true 时不起作用.。整体结构如下图所示,两个 Consumer 都绑定在同一个队列上,这样的话两个消费者将共同消费发送的10条消息。

    4dc3878d43629026932a8e33f192e33c.png
    import com.rabbitmq.client.*;import java.io.IOException;public class QosConsumer { public static void main(String[] args) throws Exception { //1. 创建一个 ConnectionFactory 并进行设置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通过连接工厂来创建连接 Connection connection = factory.newConnection(); //3. 通过 Connection 来创建 Channel final Channel channel = connection.createChannel(); //4. 声明 String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String queueName1 = "test_qos_queue_1"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic
    展开全文
  • Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于 大数据实时处理领域。 在传统消息队列中分为两种,一种是同步消息队列,即让用户等待流程完成: 一种叫异步消息队列,即你的请求我...

    传统消息队列

    在信息系统传输信息中,不可能依靠某一性能来决定先后顺序,应该统一按照先来后到排队。
    Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于
    大数据实时处理领域。
    在传统消息队列中分为两种,一种是同步消息队列,即让用户等待流程完成:
    在这里插入图片描述
    一种叫异步消息队列,即你的请求我收到了,我先给你弄着,你先去忙其他的事情吧:
    在这里插入图片描述

    消息队列最大的优点有两个:解耦与削峰。同时还有很多其他优点:

    • 解耦
      允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    • 可恢复性
      系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所
      以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
    • 缓冲
      有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致
      的情况。
    • 灵活性 & 峰值处理能力
      在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。
      如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
    • 异步通信
      很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户
      把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    消费模式

    消息队列中服务端(消费者)获取队列中客户端(生产者)发送的信息的行为被称为消费。
    消费模式主要分为两种:

    点对点模式

    在这里插入图片描述

    队列中的一条消息由一个专门的消费者进行消费,消费者受到这条消息并确认后,队列就会删除这条消息,防止重复访问。

    发布/订阅模式

    在这里插入图片描述
    生产者将数据推送入队列,同一条消息会被所有消费者消费,而消费有两种情况:

    • 消费者主动拉取消息(消费者饿了会自己吃饭,吃饱了就不吃了)
    • 队列向消费者推送消息(队列主动喂消费者,消费者要么撑死,要么饿死)

    kafka选择的是消费者拉取消息,这样可以保证消费者处理自己可以处理的消息量。

    展开全文
  • 2工作模式 消费者监听同一个队列消费者监听一个队列,但是消费者中只有一个会成功消费 3订阅模式 个消息队列,每个消息有一个消费者监听 。生产者发的消息可以被每一个消费者接收。 4 路由模式 一...

    异步化处理, 时间换空间

    优点:1异步处理响应速度快2削峰3高可用4扩展性 解耦合

    缺点:1,时效性差,降低用户体,2系统复杂了

    mq的模式

     1简单模式,一个消费只有一个消费者  生产者创建消息发到队列,消费者从队列消费。

     2工作模式  多个消费者监听同一个队列, 多个消费者监听一个队列,但是多个消费者中只有一个会成功消费

     3订阅模式  多个消息队列,每个消息有一个消费者监听 。生产者发的消息可以被每一个消费者接收。

     4 路由模式 一个交换机绑定多个消息队列,没个消息队列独有自己的唯一key。每个消息队列有一个消息者监听

     

    mq的保证机制

       1保证发送者发送成功 :添加事务,很费资源,一般不用。

       2消息确认和return机制:

              消息确认:提供是否成功发送交换机。return机制是确认消息分发到队列。具体操作是 发送的时候,开启消息确认机制。 之后接收消息确认。(有同步,有异步);

             return机制:发的的时候开启return机制处理添加 return 监听器 回调函数。

    延时机制,死信队列实现消息延时

           队列是及时的,没有直接设置延时的的功能,用TTL(Time To Live)实现,

                             消息和消息队列都可以设置失效时间,设置消息的时间的时候一般是一样的,一般不能前大后小的,这样的话大的消费不了,小的那个消费不了。 消息队列用的少,一般设置队列消息。那个同时设置的时候小的延时时间。实现方式,定义时间的私                           信,故意让他过期,过期后转到新的队列消费,实现延时功能。

     使用场景:

          1异步 例如:注册后发邮件,下订单发消息

          2重试机制发邮件问题

          3延时队列 做限时取消的

          4消息通信

          5秒杀抢购-----队列拦截请求

            

    展开全文
  • 前几篇我们介绍了如果通过RabbitMQ发布一个简单的消息,再到工作队列消费者进行消费,最后再到工作队列的分发与消息的应答机制(ACK);之前我们分享的这几种模式,都是被消费之后就...

    前几篇我们介绍了如果通过RabbitMQ发布一个简单的消息,再到工作队列,多个消费者进行消费,最后再到工作队列的分发与消息的应答机制(ACK);

    之前我们分享的这几种模式,都是被消费之后就从队列中被删除了,理想状态下不会被重复消费,试想我们另外一种场景,比如我之前做的小说业务,用户在登录成功后,需要将临时账户的金币和书架的书籍信息同步到正式账户。

    如果我们跟登录融合在一块,登录成功之后,如果用户账户或者书架同步失败,那么势必影响我们整个登录的体验。为了更好地做到用户无感知,不需要用户做更多的操作,那么我们就使用消息队列的方式,来进行异步同步。

    发布订阅模式

    这就是我们一个用户数据同步的流程图,也是RabbitMQ发布订阅的流程图,大家可能注意到了中间怎么多了一个交换机

    这里要注意,使用发布订阅模式,这里必须将交换机与队列进行绑定,如果不绑定,直接发送消息,这个消息是不会发送到任何队列的,更不会被消费。

    交换机种类

    交换机总共分四种类型:分别是direct、topic、headers、fanout。这次我们主要讲fanout,因为这是我们本次需要用到的交换机类型。

    fanout顾名思义就是广播模式。它会把消息推送给所有订阅它的队列。

    代码

    生产者

    public class Send {
    
        /**
         * 交换机名称
         */
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 获取连接
            Connection connection = MQConnectUtil.getConnection();
    
            // 创建通道
            Channel channel = connection.createChannel();
    
            // 声明交换机  fanout:分发模式,分裂
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 消息内容
            String msg = "我是一个登录成功的消息";
    
            // 发送消息
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
    
            System.out.println("消息发送成功:" + msg);
    
            channel.close();
            connection.close();
        }
    }
    

    消费者-同步账户

    public class Consumer1 {
    
        /**
         * 交换机名称
         */
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        private final static String QUEUE_NAME = "test_topic_publish_account";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 获取连接
            Connection connection = MQConnectUtil.getConnection();
    
            // 创建通道
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 将队列绑定到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 保证一次只接收一个消息,保证rabbitMQ每次将消息发送给闲置的消费者
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
    
                    String msg = new String(body, StandardCharsets.UTF_8);
    
                    System.out.println("同步账户[1]:" + msg);
    
                    Thread.sleep(1000);
    
                    // 手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            // 监听队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    

    消费者-同步书架

    public class Consumer2 {
    
        /**
         * 交换机名称
         */
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        private final static String QUEUE_NAME = "test_topic_publish_book_case";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 获取连接
            Connection connection = MQConnectUtil.getConnection();
    
            // 创建通道
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 将队列绑定到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 保证一次只接收一个消息,保证rabbitMQ每次将消息发送给闲置的消费者
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
    
                    String msg = new String(body, StandardCharsets.UTF_8);
    
                    System.out.println("同步书架[2]:" + msg);
    
                    Thread.sleep(1000);
    
                    // 手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            // 监听队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    

    总结

    那么基于这样的需要同步用户数据的需求,那么为了保证各数据同步之间互不影响,降低耦合性,那么我们就可以使用多个队列,进行用户数据的同步。提升整个系统的高可用。

    日拱一卒,功不唐捐

    今日推荐

    RabbitMQ工作队列之公平分发消息与消息应答(ACK)

    如何利用RabbitMQ生产一个简单的消息

    RabbitMQ如何高效的消费消息

    好文章,我在看

    展开全文
  • 所有的consumer可以连接到同一个topic,然后消息会以RoundRobin的形式,轮训发送给每一个consumer,一个消息只会下发给一个consumer。如果一个consumer挂掉了,所有发给它但是还没有ack的...
  • Activemq实现Mysql与SolrCloud同步策略

    千次阅读 2017-09-05 20:24:00
    应用场景: ...再插入商品之后,发送一个商品id到队列上面去,是生产者。solr服务负责消费,监听商品添加消息,接收消息后,将对应的商品信息同步到索引库。 代码: 1.生产者 编码 @Autowired pr
  • 出现了重复消费的问题,同一个消息被重复消费次,导致了用户端收到了条重复的消息,最终排查发现,是因为消费者在处理消息的方法onMessage中有异常没有捕获到,导致异常上抛,被consumeMessage捕获并判定为...
  • 前情出现了重复消费的问题,同一个消息被重复消费次,导致了用户端收到了条重复的消息,最终排查发现,是因为消费者在处理消息的方法onMessage中有异常没有捕获到,导致异常上抛,被consumeMessage捕获并判定...
  • Java线程安全

    2019-10-19 18:09:38
    线程能够提高CPU的使用效率,为用户提供宏观上同时执行个动作的效果,强大的功能背后也存在线程安全性问题。个线程同时操作共享变量导致程序输出结果与设计...产品队列一个容量大小有限制的容器,如果生产出...
  • ActiveMQ集群配置及使用

    千次阅读 2017-08-13 18:39:26
    客户端集群:让个消费者消费同一个队列 Broker clusters:个Broker之间同步消息 Master Slave:实现高可用 客户端配置: ActiveMQ失效转移(falilover): 允许当其中一台消息服务器宕机时,客户端在...
  • rocketMq之MQ介绍(

    2020-12-06 11:44:28
    系统耦合高,容错性就越低,如下图,任意一个子系统出现故障,都会导致订单系统出现异常 当子系统出现故障,进行恢复了,能消费MQ的消息并进行返回,订单系统不会出现系统异常 流量削峰 流量瞬间猛增,导致...
  • Kafka 特性

    2019-01-03 09:46:00
    Kafka 特性 ...Kafka支持消费者来读取同一个消息流上面的数据,再同一个用户组下面,消费者消息读取完毕以后,其他的消费者将不可以重复读取,这种方案可以应用到集服务器对同一个消息流进...
  • 目录

    2019-09-16 14:29:55
    第一章 rabbitmq简介 第二章 rabbitmq在mac上的安装 第三章 启动rabbitmq的webUI 第四章 第一个rabbitmq...第五章 消费者监听同一个队列 附1 rabbitmq常用命令 附2 rabbitmq用户管理、角色管理与权限管理 ...
  • 比如我有这样一个案例,用户需要同步,而且需要同步到个系统,那么我们只需要队列添加一个主题,其他的子系统订阅该主题。分别处理自己的同步逻辑。 这样就实现了代码的解耦。 实现代码 1.生产者 import ...
  • 一个队列中,如果有消费者,那么消费者之间对于同一个消息的关系是竞争的关系。 例如:短信服务部署个,只需要有一个节点成功发送即可。 3.2 Pub / Sub订阅模式 引入交换机:生产者把消息发给交换机,...

空空如也

空空如也

1 2 3 4
收藏数 77
精华内容 30
关键字:

多用户消费同一个队列