精华内容
下载资源
问答
  • 21 利用分布式消息队列降低系统耦合
    千次阅读
    2021-05-17 13:55:55

    在这里插入图片描述

    国内某大型互联网企业经常因为对同行的产品进行微创新,然后推岀自己的产品而遭人诟病,不讨论这种做法是否合适,我们分析这些产品,发现大多数都比原创产品有 更好的用户体验。这些产品常常后来居上,更速度地推岀新功能,吸引用户注意,进而 占据市场。

    微信从发布到拥有1亿用户,仅仅用了一年的时间。而据说摇一摇这个功能是两个 实习生用一个星期就开发完成上线的。

    使用TOP( Taobao Open API ), 一个技术熟练的淘宝客网站开发工程师只需要用几个 晚上的业余时间就可以开发部署一个炫目的购物导购网站。

    如此轻易地就可以开发一个新产品,如此快速地就可以实现一个新功能,他们是如 何做到的?

    为什么有的网站必须规定系统发布日,一到发布日就如临大敌,整个技术部加班通 宵达旦;而有的网站就可以随时发布,新功能可以随时快速上线。

    这些都有赖于网站的扩展性架构设计,就是在对现有系统影响最小的情况下,系统 功能可持续扩展及提升的能力。

    经常听到各种场合中对扩展性和伸缩性的误用,包括许多资深网站架构师也常常混淆两者,用扩展性表示伸缩性。在此,我们澄清下这两个概念。

    扩展性(Extensibility)

    指对现有系统影响最小的情况下,系统功能可持续扩展或提升的能力。表现在系统 基础设施稳定不需要经常变更,应用之间较少依赖和耦合,对需求变更可以敏捷响应。 它是系统架构设计层面的开闭原则(对扩展开放,对修改关闭),架构设计考虑未来功能 扩展,当系统增加新功能时,不需要对现有系统的结构和代码进行修改。

    伸缩性(Scalability)

    指系统能够通过增加(减少)自身资源规模的方式增强(减少)自己计算处理事务的能力。如果这种增减是成比例的,就被称作线性伸缩性。在网站架构中,通常指利用 集群的方式增加服务器数量、提高系统的整体事务吞吐能力。


    1 构建可扩展的网站架构

    开发低耦合系统是软件设计的终极目标之一,这一目标驱动着软件开发技术的创新 与发展,从软件与硬件的第一次分离到操作系统的诞生;从汇编语言到面向过程的开发语言,再到面向对象的编程语言;从各种软件工具集到各种开发框架;无不体现着降低 软件系统耦合性这一终极目标。可以说,度量一个开发框架、设计模式、编程语言优劣 的重要尺度就是衡量它是不是让软件开发过程和软件产品更加低耦合。

    显而易见,低耦合的系统更容易扩展,低耦合的模块更容易复用,一个低耦合的系
    统设计也会让开发过程和维护变得更加轻松和容易管理。一个复杂度为100的系统,如 果能够分解成没有耦合的两个子系统,那么每个子系统的复杂度不是50,而可能是25。
    当然,完全没有耦合就是没有关系,也就无法组合出一个强大的系统。那么如何分解系 统的各个模块、如何定义各个模块的接口、如何复用组合不同的模块构造成一个完整的 系统,这是软件设计中最有挑战的部分。

    笔者认为,软件架构师最大的价值不在于掌握多少先进的技术,而在于具有将一个 大系统切分成N个低耦合的子模块的能力,这些子模块包含横向的业务模块,也包含纵 向的基础技术模块。这种能力一部分源自专业的技术和经验,还有一部分源自架构师对 业务场景的理解、对人性的把握、甚至对世界的认知。

    大型网站也常常意味着功能复杂,产品众多。网站为了在市场竞争中胜岀,不断推 出各种新产品,为了把握市场机会,这些产品从策划到上线,时间非常短暂,技术团队 必须在产品设计和需求分析结束之后,快速地开发完成一个新产品。同时经过长期的演 化和发展,这些产品之间的关系错综复杂,维护也变得异常困难。这些问题对网站的可 扩展架构提岀了挑战和要求。

    设计网站可扩展架构的核心思想是模块化,并在此基础之上,降低模块间的耦合性, 提高模块的复用性。
    我们在本书第6章讨论过网站通过分层和分割的方式进行架构伸缩,分层和分割也 是模块化设计的重要手段,利用分层和分割的方式将软件分割为若干个低耦合的独立的 组件模块,这些组件模块以消息传递及依赖调用的方式聚合成一个完整的系统。

    在大型网站中,这些模块通过分布式部署的方式,独立的模块部署在独立的服务器 (集群)上,从物理上分离模块之间的耦合关系,进一步降低耦合性提高复用性。

    模块分布式部署以后具体聚合方式主要有分布式消息队列和分布式服务。


    2 利用分布式消息队列降低系统耦合性

    如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响最小, 这样系统的可扩展性无疑更好一些。


    2.1 事件驱动架构

    事件驱动架构(Event Driven Architecture ):通过在低耦合的模块之间传输事件消息, 以保持模块的松散耦合,并借助事件消息的通信完成模块间合作,典型的EDA架构就是 操作系统中常见的生产者消费者模式。在大型网站架构中,具体实现手段有很多,最常 用的是分布式消息队列,如图7.1所示。
    在这里插入图片描述

    消息队列利用发布一订阅模式工作,消息发送者发布消息,一个或者多个消息接收 者订阅消息。消息发送者是消息源,在对消息进行处理后将消息发送至分布式消息队列, 消息接受者从分布式消息队列获取该消息后继续进行处理。可以看到,消息发送者和消 息接受者之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,而消息接受者只需要从分布式消息队列获取消息后进行处理,不需要知道该消息 从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务 没有任何影响,从而实现网站业务的可扩展设计。

    消息接受者在对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息 继续发送出去,等待其他消息接受者订阅处理该消息。因此基于事件(消息对象)驱动 的业务架构可以是一系列的流程。

    由于消息发送者不需要等待消息接受者处理数据就可以返回,系统具有更好的响应 延迟;同时,在网站访问高峰,消息可以暂时存储在消息队列中等待消息接受者根据自 身负载处理能力控制消息处理速度,减轻数据库等后端存储的负载压力。


    2.2 分布式消息队列

    队列是一种先进先出的数据结构,分布式消息队列可以看作将这种数据结构部署到 独立的服务器上,应用程序可以通过远程访问接口使用分布式消息队列,进行消息存取 操作,进而实现分布式的异步调用,基本原理如图7.2所示。

    消息生产者应用程序通过远程访问接口将消息推送给消息队列服务器,消息队列服 务器将消息写入本地内存队列后立即返回成功响应给消息生产者。消息队列服务器根据 消息订阅列表查找订阅该消息的消息消费者应用程序,将消息队列中的消息按照先进先出(FIFO )的原则将消息通过远程通信接口发送给消息消费者程序。
    在这里插入图片描述

    目前开源的和商业的分布式消息队列产品有很多,比较著名的如Apache ActiveMQ 等,这些产品除了实现分布式消息队列的一般功能,在可用性、伸缩性、数据一致性、性能和可管理性方面也做了很多改善。

    伸缩性方面,由于消息队列服务器上的数据可以看作是被即时处理的,因此类似 于无状态的服务器,伸缩性设计比较简单。将新服务器加入分布式消息队列集群中,通 知生产者服务器更改消息队列服务器列表即可。

    可用性方面,为了避免消费者进程处理缓慢,分布式消息队列服务器内存空间不 足造成的问题,如果内存队列已满,会将消息写入磁盘,消息推送模块在将内存队列消 息处理完以后,将磁盘内容加载到内存队列继续处理。

    为了避免消息队列服务器宕机造成消息丢失,会将消息成功发送到消息队列的消息存储在消息生产者服务器,等消息真正被消息消费者服务器处理后才删除消息。在消息 队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中其他的服务器发 布消息。

    分布式消息队列可以很复杂,比如可以支持ESB(企业服务总线)、支持SOA(面向 服务的架构)等;也可以很简单,比如用MySQL也可以当作分布式消息队列:消息生产 者程序将消息当作数据记录写入数据库,消息消费者程序查询数据库并按记录写入时间 戳排序,就实现了一个事实上的分布式消息队列,而且这个消息队列使用成熟的MySQL 运维手段,也可以达到较高的可用性和性能指标。

    更多相关内容
  • RabbitMQ如何保证消息的幂等、可靠、顺序

    千次阅读 多人点赞 2020-11-08 16:56:55
    如何保证消息的幂等 所谓的幂等其实就是保证同一条消息不会重复或者重复消费了也不会对系统数据造成异常。 出现消息重复消费的情况 拿RabbitMQ来说的话,消费者在消费完成一条消息之后会向MQ回复一个ACK(可以...

    如何保证消息的幂等性

    所谓的幂等性其实就是保证同一条消息不会重复或者重复消费了也不会对系统数据造成异常。

    出现消息重复消费的情况

    拿RabbitMQ来说的话,消费者在消费完成一条消息之后会向MQ回复一个ACK(可以配置自动ACK或者手动ACK) 来告诉MQ这条消息已经消费了。假如当消费者消费完数据后,准备回执ACK时,系统挂掉了,MQ是不知道该条消息已经被消费了。所以重启之后MQ会再次发送该条消息,导致消息被重复消费,如果此时没有做幂等性处理,可能就会导致数据错误等问题。

    如何保证消息队列消费的幂等性

    这一块应该还是要结合业务来选择合适的方法,有以下几个方案:

    • 消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。
    • 消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。
    • 针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了。

    如何保证消息的可靠性

    在将消息投入到MQ时,有可能会发生消息投递失败、消息丢失的问题,如果刚好丢失的是一些核心消息,例如money相关的,那就凉凉了…

    出现消息丢失的情况

    还是拿RabbitMQ来说…
    image

    从图中可以看到一共有以下三种可能出现消息丢失的情况:

    • 生产者弄丢了消息

    生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败

    • MQ自身弄丢了消息

    未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失;
    开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小

    • 消费者弄丢了消息

    消费者刚接收到消息还没处理完成,结果消费者挂掉了…

    保证消息可靠性的方法

    针对以上三种情况,每种情况都有对应的处理方法:

    • 生产者弄丢消息时的解决方法

    方法一:生产者在发送数据之前开启RabbitMQ的事务

    // 开启事务
    channel.txSelect
    try {
        // 这里发送消息
    } catch (Exception e) {
        channel.txRollback
        // 这里再次重发这条消息
    }
    // 提交事务
    channel.txCommit
    
    

    采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。

    方法二:开启confirm模式

    使用springboot时在application.yml配置文件中做如下配置

    spring:
      rabbitmq:
        addresses: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 发送者开启 confirm 确认机制
        publisher-confirm-type: correlated
    

    实现confirm回调接口

    @Slf4j
    public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                log.error("消息发送异常!");
                //可以进行重发等操作
            } else {
                log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
            }
        }
    }
    

    生产者发送消息时设置confirm回调

    @Slf4j
    @Configuration
    public class RabbitMqConfig {
    
         @Bean
        public ConfirmCallbackService confirmCallbackService() {
            return new ConfirmCallbackService();
        }
        
        @Bean
        public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
    
            /**
             * 消费者确认收到消息后,手动ack回执回调处理
             */
            rabbitTemplate.setConfirmCallback(confirmCallbackService());
            return rabbitTemplate;
        }
        
        //其他配置代码
        ......
    
    }
    

    小结: 事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。

    • MQ自身弄丢消息时的解决方法

    第一步: 创建queue时设置为持久化队列,这样可以保证RabbitMQ持久化queue的元数据,此时还是不会持久化queue里的数据

        @Bean(QUEUE_IOT_TOIN)
        public Queue createIotQueue() {
            return new Queue(QUEUE_IOT_TOIN, true);
        }
    

    第二步: 发送消息时将消息的deliveryMode设置为持久化,此时queue中的消息才会持久化到磁盘。

        public void sendToUploadMsg(Object obj, String routingKey) {
            try {
    
                String jsonString = JSON.toJSONString(obj);
                rabbitTemplate.convertAndSend(EXCHANGE_IOT, routingKey, jsonString, message -> {
                    //设置该条消息持久化
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }, new CorrelationData(UUIDUtil.generate()));
            } catch (Exception e) {
                log.info(routingKey + "发送消息异常!");
            }
        }
    

    同时设置queue和message持久化以后,RabbitMQ 挂了再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据,保证数据不会丢失。

    但是就算开启持久化机制,也有可能出现上面说的的消息落盘时服务挂掉的情况。这时可以考虑结合生产者的confirm机制来处理,持久化机制开启后消息只有成功落盘时才会通过confirm回调通知生产者,所以可以考虑生产者在生产消息时维护一个正在等待消息发送确认的队列,如果超过一定时间还没从confirm中收到对应消息的反馈,自动进行重发处理。

    • 消费者自身弄丢消息时的解决方法

    关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费,不过我们只需要保证幂等性就好了,重复消费也不会造成问题。

    在springboot中修改application.yml配置文件更改为手动ack模式

    spring:
      rabbitmq:
        addresses: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 发送者开启 confirm 确认机制
        publisher-confirm-type: correlated
        # 发送者开启 return 确认机制
        publisher-returns: true
        listener:
          simple:
            concurrency: 10
            max-concurrency: 10
            prefetch: 1
            auto-startup: true
            default-requeue-rejected: true
            # 设置消费端手动 ack
            acknowledge-mode: manual
            # 是否支持重试
            retry:
              enabled: true
    

    消费端手动ack参考代码:

        @RabbitHandler
        public void handlerMq(String msg, Channel channel, Message message) throws IOException {
            try {
                //业务处理代码
                ......
            
                //手动ACK
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                
            } catch (Exception e) {
                if (message.getMessageProperties().getRedelivered()) {
                    log.error("消息已重复处理失败,拒绝再次接收...", e);
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
                } else {
                    log.error("消息即将再次返回队列处理...", e);
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            }
    
    
        }
    

    RabbitMQ保证消息可靠性总结:
    image


    如何保证消息的顺序性

    消息在投入到queue的时候是有顺序,如果只是单个消费者来处理对应的单个queue,是不会出现消息错乱的问题。但是在消费的时候有可能多个消费者消费同一个queue,由于各个消费者处理消息的时间不同,导致消息未能按照预期的顺序处理。其实根本的问题就是如何保证消息按照预期的顺序处理完成

    出现消费顺序错乱的情况

    • 为了提高处理效率,一个queue存在多个consumer
      image

    • 一个queue只存在一个consumer,但是为了提高处理效率,consumer中使用了多线程进行处理
      image

    保证消息顺序性的方法

    • 将原来的一个queue拆分成多个queue,每个queue都有一个自己的consumer。该种方案的核心是生产者在投递消息的时候根据业务数据关键值(例如订单ID哈希值对订单队列数取模)来将需要保证先后顺序的同一类数据(同一个订单的数据) 发送到同一个queue当中。
      image

    • 一个queue就一个consumer,在consumer中维护多个内存队列根据业务数据关键值(例如订单ID哈希值对内存队列数取模)将消息加入到不同的内存队列中,然后多个真正负责处理消息的线程去各自对应的内存队列当中获取消息进行消费。
      image

    RabbitMQ保证消息顺序性总结:
    核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。


    参考文章:

    https://gitee.com/shishan100/Java-Interview-Advanced/blob/master/docs/high-concurrency/how-to-ensure-the-order-of-messages.md

    https://www.cnblogs.com/-wenli/p/13047059.html

    展开全文
  • 分布式消息最终一致解决方案

    千次阅读 2019-03-22 13:59:41
    多个服务之间使用自己单独维护的数据库,它们彼此之间不在同一个事务中,假如A执行成功了,B执行却失败了,而A的事务此时已经提交,无法回滚,那么最终就会导致两边数据不一致的问题;尽管很早之前就有基于两阶段...

    随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用。虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务问题,多个服务之间使用自己单独维护的数据库,它们彼此之间不在同一个事务中,假如A执行成功了,B执行却失败了,而A的事务此时已经提交,无法回滚,那么最终就会导致两边数据不一致性的问题;尽管很早之前就有基于两阶段提交的XA分布式事务,但是这类方案因为需要资源的全局锁定,导致性能极差;因此后面就逐渐衍生出了消息最终一致性、TCC等柔性事务的分布式事务方案,本文主要分析的是基于消息的最终一致性方案

    普通消息的处理流程

    image

    1. 消息生成者发送消息
    2. MQ收到消息,将消息进行持久化,在存储中新增一条记录
    3. 返回ACK给生产者
    4. MQ push 消息给对应的消费者,然后等待消费者返回ACK
    5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
    6. MQ删除消息

    普通消息处理存在的一致性问题

    我们以订单创建为例,订单系统先创建订单(本地事务),再发送消息给下游处理;如果订单创建成功,然而消息没有发送出去,那么下游所有系统都无法感知到这个事件,会出现脏数据;

    public void processOrder() {
        // 订单处理(业务操作) 
        orderService.process();
        // 发送订单处理成功消息(发送消息) 
        sendBizMsg ();
    }
    
    

    如果先发送订单消息,再创建订单;那么就有可能消息发送成功,但是在订单创建的时候却失败了,此时下游系统却认为这个订单已经创建,也会出现脏数据。

    public void processOrder() {
       // 发送订单处理成功消息(发送消息) 
        sendBizMsg ();
        // 订单处理(业务操作) 
        orderService.process();
    }
    
    

    一个错误的想法

    此时可能有同学会想,我们可否将消息发送和业务处理放在同一个本地事务中来进行处理,如果业务消息发送失败,那么本地事务就回滚,这样是不是就能解决消息发送的一致性问题呢?

    @Transactionnal
    public void processOrder() {
        try{
            // 订单处理(业务操作) 
            orderService.process(); 
            // 发送订单处理成功消息(发送消息) 
            sendBizMsg ();
        }catch(Exception e){
             事务回滚;   
        }
    }
    
    

    消息发送的异常情况分析

    可能的情况一致性
    订单处理成功,然后突然宕机,事务未提交,消息没有发送出去一致
    订单处理成功,由于网络原因或者MQ宕机,消息没有发送出去,事务回滚一致
    订单处理成功,消息发送成功,但是MQ由于其他原因,导致消息存储失败,事务回滚一致
    订单处理成功,消息存储成功,但是MQ处理超时,从而ACK确认失败,导致发送方本地事务回滚不一致

    从上面的情况分析,我们可以看到,使用普通的处理方式,无论如何,都无法保证业务处理与消息发送两边的一致性,其根本的原因就在于:远程调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。 笔者就曾经在项目中出现类似的情况,调用方先在本地写数据,然后发起RPC服务调用,但是处理方由于DB数据量比较大,导致处理超时,调用方在出现超时异常后,直接回滚本地事务,从而导致调用方这边没数据,而处理方那边数据却已经写入了,最终导致两边业务数据的不一致。为了保证两边数据的一致性,我们只能从其他地方寻找新的突破口。

    事务消息

    由于传统的处理方式无法解决消息生成者本地事务处理成功消息发送成功两者的一致性问题,因此事务消息就诞生了,它实现了消息生成者本地事务与消息发送的原子性,保证了消息生成者本地事务处理成功与消息发送成功的最终一致性问题。

    事务消息处理的流程

    image

    1. 事务消息与普通消息的区别就在于消息生产环节,生产者首先预发送一条消息到MQ(这也被称为发送half消息)

    2. MQ接受到消息后,先进行持久化,则存储中会新增一条状态为待发送的消息

    3. 然后返回ACK给消息生产者,此时MQ不会触发消息推送事件

    4. 生产者预发送消息成功后,执行本地事务

    5. 执行本地事务,执行完成后,发送执行结果给MQ

    6. MQ会根据结果删除或者更新消息状态为可发送

    7. 如果消息状态更新为可发送,则MQ会push消息给消费者,后面消息的消费和普通消息是一样的

    注意点:由于MQ通常都会保证消息能够投递成功,因此,如果业务没有及时返回ACK结果,那么就有可能造成MQ的重复消息投递问题。因此,对于消息最终一致性的方案,消息的消费者必须要对消息的消费支持幂等,不能造成同一条消息的重复消费的情况。

    事务消息异常情况分析

    异常情况一致性处理异常方法
    消息未存储,业务操作未执行一致
    存储待发送消息成功,但是ACK失败,导致业务未执行(可能是MQ处理超时、网络抖动等原因)不一致MQ确认业务操作结果,处理消息(删除消息)
    存储待发送消息成功,ACK成功,业务执行(可能成功也可能失败),但是MQ没有收到生产者业务处理的最终结果不一致MQ确认业务操作结果,处理消息(根据就业务处理结果,更新消息状态,如果业务执行成功,则投递消息,失败则删除消息)
    业务处理成功,并且发送结果给MQ,但是MQ更新消息失败,导致消息状态依旧为待发送不一致同上

    支持事务消息的MQ

    现在目前较为主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事务消息。据笔者了解,早年阿里对MQ增加事务消息也是因为支付宝那边因为业务上的需求而产生的。因此,如果我们希望强依赖一个MQ的事务消息来做到消息最终一致性的话,在目前的情况下,技术选型上只能去选择RocketMQ来解决。上面我们也分析了事务消息所存在的异常情况,即MQ存储了待发送的消息,但是MQ无法感知到上游处理的最终结果。对于RocketMQ而言,它的解决方案非常的简单,就是其内部实现会有一个定时任务,去轮训状态为待发送的消息,然后给producer发送check请求,而producer必须实现一个check监听器,监听器的内容通常就是去检查与之对应的本地事务是否成功(一般就是查询DB),如果成功了,则MQ会将消息设置为可发送,否则就删除消息。

    常见的问题

    1. 问:如果预发送消息失败,是不是业务就不执行了?

      答:是的,对于基于消息最终一致性的方案,一般都会强依赖这步,如果这个步骤无法得到保证,那么最终也 就不可能做到最终一致性了。

    2. 问:为什么要增加一个消息预发送机制,增加两次发布出去消息的重试机制,为什么不在业务成功之后,发送失败的话使用一次重试机制?

      答:如果业务执行成功,再去发消息,此时如果还没来得及发消息,业务系统就已经宕机了,系统重启后,根本没有记录之前是否发送过消息,这样就会导致业务执行成功,消息最终没发出去的情况。

    3. 如果consumer消费失败,是否需要producer做回滚呢?

      答:这里的事务消息,producer不会因为consumer消费失败而做回滚,采用事务消息的应用,其所追求的是高可用最终一致性,消息消费失败的话,MQ自己会负责重推消息,直到消费成功。因此,事务消息是针对生产端而言的,而消费端,消费端的一致性是通过MQ的重试机制来完成的。

    4. 如果consumer端因为业务异常而导致回滚,那么岂不是两边最终无法保证一致性?

      答:基于消息的最终一致性方案必须保证消费端在业务上的操作没障碍,它只允许系统异常的失败,不允许业务上的失败,比如在你业务上抛出个NPE之类的问题,导致你消费端执行事务失败,那就很难做到一致了。

    由于并非所有的MQ都支持事务消息,假如我们不选择RocketMQ来作为系统的MQ,是否能够做到消息的最终一致性呢?答案是可以的。

    基于本地消息的最终一致性

    image

    基于本地消息的最终一致性方案的最核心做法就是在执行业务操作的时候,记录一条消息数据到DB,并且消息数据的记录与业务数据的记录必须在同一个事务内完成,这是该方案的前提核心保障。在记录完成后消息数据后,后面我们就可以通过一个定时任务到DB中去轮训状态为待发送的消息,然后将消息投递给MQ。这个过程中可能存在消息投递失败的可能,此时就依靠重试机制来保证,直到成功收到MQ的ACK确认之后,再将消息状态更新或者消息清除;而后面消息的消费失败的话,则依赖MQ本身的重试来完成,其最后做到两边系统数据的最终一致性。基于本地消息服务的方案虽然可以做到消息的最终一致性,但是它有一个比较严重的弊端,每个业务系统在使用该方案时,都需要在对应的业务库创建一张消息表来存储消息。针对这个问题,我们可以将该功能单独提取出来,做成一个消息服务来统一处理,因而就衍生出了我们下面将要讨论的方案。

    独立消息服务的最终一致性

    image

    独立消息服务最终一致性本地消息服务最终一致性最大的差异就在于将消息的存储单独地做成了一个RPC的服务,这个过程其实就是模拟了事务消息的消息预发送过程,如果预发送消息失败,那么生产者业务就不会去执行,因此对于生产者的业务而言,它是强依赖于该消息服务的。不过好在独立消息服务支持水平扩容,因此只要部署多台,做成HA的集群模式,就能够保证其可靠性。在消息服务中,还有一个单独地定时任务,它会定期轮训长时间处于待发送状态的消息,通过一个check补偿机制来确认该消息对应的业务是否成功,如果对应的业务处理成功,则将消息修改为可发送,然后将其投递给MQ;如果业务处理失败,则将对应的消息更新或者删除即可。因此在使用该方案时,消息生产者必须同时实现一个check服务,来供消息服务做消息的确认。对于消息的消费,该方案与上面的处理是一样,都是通过MQ自身的重发机制来保证消息被消费。

    总结:上游事务提交之后,在基于MQ的场景下就不考虑回滚了。失败的可能是由于网络、服务宕机所导致,文章中提到说业务上执行是无障碍的。如果下游服务长时间没有恢复,那么就应该设置告警,在这里有几种机制来解决一些牛皮癣类型的问题,假如上游消息始终发送失败(这种可能性基本不存在除非代码是假的)这种情况我们可以设置报警机制比如发生异常时可以打印日志,发送短信,发送邮件,将异常订单保存到数据库,这些措施可以同时用于下游一些异常订单,同时也可以在发生异常的时候新建一个异常Topic的消息提示,让人工来介入数据订正。

    展开全文
  • RabbitMQ消息最终一致解决方案

    千次阅读 2019-09-29 10:50:38
    RabbitMQ消息最终一致解决方案 随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用。虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战...

    RabbitMQ消息最终一致性解决方案
    随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用。虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务问题,多个服务之间使用自己单独维护的数据库,它们彼此之间不在同一个事务中,假如A执行成功了,B执行却失败了,而A的事务此时已经提交,无法回滚,那么最终就会导致两边数据不一致性的问题;尽管很早之前就有基于两阶段提交的XA分布式事务,但是这类方案因为需要资源的全局锁定,导致性能极差;因此后面就逐渐衍生出了消息最终一致性、TCC等柔性事务的分布式事务方案,本文主要分析的是基于消息的最终一致性方案。

    01 普通消息的处理流程
    MQ消息最终一致性解决方案
    在这里插入图片描述

    消息生成者发送消息
    MQ收到消息,将消息进行持久化,在存储中新增一条记录
    返回ACK给生产者
    MQ push 消息给对应的消费者,然后等待消费者返回ACK
    如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
    MQ删除消息
    1.1 普通消息处理存在的一致性问题

    我们以订单创建为例,订单系统先创建订单(本地事务),再发送消息给下游处理;如果订单创建成功,然而消息没有发送出去,那么下游所有系统都无法感知到这个事件,会出现脏数据;

    public void processOrder() {

    // 订单处理(业务操作)

    orderService.process();

    // 发送订单处理成功消息(发送消息)

    sendBizMsg ();

    }

    如果先发送订单消息,再创建订单;那么就有可能消息发送成功,但是在订单创建的时候却失败了,此时下游系统却认为这个订单已经创建,也会出现脏数据。

    public void processOrder() {
    // 发送订单处理成功消息(发送消息)
    sendBizMsg ();
    // 订单处理(业务操作)
    orderService.process();
    }
    1.2 一个错误的想法

    此时可能有同学会想,我们可否将消息发送和业务处理放在同一个本地事务中来进行处理,如果业务消息发送失败,那么本地事务就回滚,这样是不是就能解决消息发送的一致性问题呢?

    @Transactionnal
    public void processOrder() {
    try{
    // 订单处理(业务操作)
    orderService.process();
    // 发送订单处理成功消息(发送消息)
    sendBizMsg ();
    }catch(Exception e){
    事务回滚;
    }
    }
    1.3 消息发送的异常情况分析

    可能的情况 一致性 订单处理成功,然后突然宕机,事务未提交,消息没有发送出去 一致 订单处理成功,由于网络原因或者MQ宕机,消息没有发送出去,事务回滚 一致 订单处理成功,消息发送成功,但是MQ由于其他原因,导致消息存储失败,事务回滚 一致 订单处理成功,消息存储成功,但是MQ处理超时,从而ACK确认失败,导致发送方本地事务回滚 不一致 从上面的情况分析,我们可以看到,使用普通的处理方式,无论如何,都无法保证业务处理与消息发送两边的一致性,其根本的原因就在于:远程调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。 笔者就曾经在项目中出现类似的情况,调用方先在本地写数据,然后发起RPC服务调用,但是处理方由于DB数据量比较大,导致处理超时,调用方在出现超时异常后,直接回滚本地事务,从而导致调用方这边没数据,而处理方那边数据却已经写入了,最终导致两边业务数据的不一致。为了保证两边数据的一致性,我们只能从其他地方寻找新的突破口。

    02 事务消息
    由于传统的处理方式无法解决消息生成者本地事务处理成功与消息发送成功两者的一致性问题,因此事务消息就诞生了,它实现了消息生成者本地事务与消息发送的原子性,保证了消息生成者本地事务处理成功与消息发送成功的最终一致性问题。

    03 事务消息处理的流程
    MQ消息最终一致性解决方案
    在这里插入图片描述

    事务消息与普通消息的区别就在于消息生产环节,生产者首先预发送一条消息到MQ(这也被称为发送half消息)
    MQ接受到消息后,先进行持久化,则存储中会新增一条状态为待发送的消息
    然后返回ACK给消息生产者,此时MQ不会触发消息推送事件
    生产者预发送消息成功后,执行本地事务
    执行本地事务,执行完成后,发送执行结果给MQ
    MQ会根据结果删除或者更新消息状态为可发送
    如果消息状态更新为可发送,则MQ会push消息给消费者,后面消息的消费和普通消息是一样的
    注意点:由于MQ通常都会保证消息能够投递成功,因此,如果业务没有及时返回ACK结果,那么就有可能造成MQ的重复消息投递问题。因此,对于消息最终一致性的方案,消息的消费者必须要对消息的消费支持幂等,不能造成同一条消息的重复消费的情况。

    3.1 事务消息异常情况分析

    异常情况 一致性 处理异常方法 消息未存储,业务操作未执行 一致 无 存储待发送消息成功,但是ACK失败,导致业务未执行(可能是MQ处理超时、网络抖动等原因) 不一致 MQ确认业务操作结果,处理消息(删除消息) 存储待发送消息成功,ACK成功,业务执行(可能成功也可能失败),但是MQ没有收到生产者业务处理的最终结果 不一致 MQ确认业务操作结果,处理消息(根据就业务处理结果,更新消息状态,如果业务执行成功,则投递消息,失败则删除消息) 业务处理成功,并且发送结果给MQ,但是MQ更新消息失败,导致消息状态依旧为待发送 不一致 同上

    04 支持事务消息的MQ
    现在目前较为主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事务消息。据笔者了解,早年阿里对MQ增加事务消息也是因为支付宝那边因为业务上的需求而产生的。因此,如果我们希望强依赖一个MQ的事务消息来做到消息最终一致性的话,在目前的情况下,技术选型上只能去选择RocketMQ来解决。上面我们也分析了事务消息所存在的异常情况,即MQ存储了待发送的消息,但是MQ无法感知到上游处理的最终结果。对于RocketMQ而言,它的解决方案非常的简单,就是其内部实现会有一个定时任务,去轮训状态为待发送的消息,然后给producer发送check请求,而producer必须实现一个check监听器,监听器的内容通常就是去检查与之对应的本地事务是否成功(一般就是查询DB),如果成功了,则MQ会将消息设置为可发送,否则就删除消息。

    05 常见的问题
    (1)问:如果预发送消息失败,是不是业务就不执行了?

    答:是的,对于基于消息最终一致性的方案,一般都会强依赖这步,如果这个步骤无法得到保证,那么最终也 就不可能做到最终一致性了。

    (2)问:为什么要增加一个消息预发送机制,增加两次发布出去消息的重试机制,为什么不在业务成功之后,发送失败的话使用一次重试机制?

    答:如果业务执行成功,再去发消息,此时如果还没来得及发消息,业务系统就已经宕机了,系统重启后,根本没有记录之前是否发送过消息,这样就会导致业务执行成功,消息最终没发出去的情况。

    (3)问:如果consumer消费失败,是否需要producer做回滚呢?

    答:这里的事务消息,producer不会因为consumer消费失败而做回滚,采用事务消息的应用,其所追求的是高可用和最终一致性,消息消费失败的话,MQ自己会负责重推消息,直到消费成功。因此,事务消息是针对生产端而言的,而消费端,消费端的一致性是通过MQ的重试机制来完成的。

    (4)问:如果consumer端因为业务异常而导致回滚,那么岂不是两边最终无法保证一致性?

    答:基于消息的最终一致性方案必须保证消费端在业务上的操作没障碍,它只允许系统异常的失败,不允许业务上的失败,比如在你业务上抛出个NPE之类的问题,导致你消费端执行事务失败,那就很难做到一致了。

    由于并非所有的MQ都支持事务消息,假如我们不选择RocketMQ来作为系统的MQ,是否能够做到消息的最终一致性呢?答案是可以的。

    06 基于本地消息的最终一致性
    MQ消息最终一致性解决方案
    在这里插入图片描述

    基于本地消息的最终一致性方案的最核心做法就是在执行业务操作的时候,记录一条消息数据到DB,并且消息数据的记录与业务数据的记录必须在同一个事务内完成,这是该方案的前提核心保障。在记录完成后消息数据后,后面我们就可以通过一个定时任务到DB中去轮训状态为待发送的消息,然后将消息投递给MQ。这个过程中可能存在消息投递失败的可能,此时就依靠重试机制来保证,直到成功收到MQ的ACK确认之后,再将消息状态更新或者消息清除;而后面消息的消费失败的话,则依赖MQ本身的重试来完成,其最后做到两边系统数据的最终一致性。基于本地消息服务的方案虽然可以做到消息的最终一致性,但是它有一个比较严重的弊端,每个业务系统在使用该方案时,都需要在对应的业务库创建一张消息表来存储消息。针对这个问题,我们可以将该功能单独提取出来,做成一个消息服务来统一处理,因而就衍生出了我们下面将要讨论的方案。

    07 独立消息服务的最终一致性
    MQ消息最终一致性解决方案
    在这里插入图片描述

    独立消息服务最终一致性与本地消息服务最终一致性最大的差异就在于将消息的存储单独地做成了一个RPC的服务,这个过程其实就是模拟了事务消息的消息预发送过程,如果预发送消息失败,那么生产者业务就不会去执行,因此对于生产者的业务而言,它是强依赖于该消息服务的。不过好在独立消息服务支持水平扩容,因此只要部署多台,做成HA的集群模式,就能够保证其可靠性。在消息服务中,还有一个单独地定时任务,它会定期轮训长时间处于待发送状态的消息,通过一个check补偿机制来确认该消息对应的业务是否成功,如果对应的业务处理成功,则将消息修改为可发送,然后将其投递给MQ;如果业务处理失败,则将对应的消息更新或者删除即可。因此在使用该方案时,消息生产者必须同时实现一个check服务,来供消息服务做消息的确认。对于消息的消费,该方案与上面的处理是一样,都是通过MQ自身的重发机制来保证消息被消费。

    总结:上游事务提交之后,在基于MQ的场景下就不考虑回滚了。失败的可能是由于网络、服务宕机所导致,文章中提到说业务上执行是无障碍的。如果下游服务长时间没有恢复,那么就应该设置告警,在这里有几种机制来解决一些牛皮癣类型的问题,假如上游消息始终发送失败(这种可能性基本不存在除非代码是假的)这种情况我们可以设置报警机制比如发生异常时可以打印日志,发送短信,发送邮件,将异常订单保存到数据库,这些措施可以同时用于下游一些异常订单,同时也可以在发生异常的时候新建一个异常Topic的消息提示,让人工来介入数据订正。

    展开全文
  • 分别是优先级队列、消息顺序消息分发、持久化。 正文 目录 前言 正文 优先级队列 消息顺序 消息分发 持久化 优先级队列 顾名思义,优先级高的具备优先消费的特权。 设置方式是在声明队列的时候...
  • 微信公众号一次订阅消息

    千次阅读 2018-04-11 07:51:00
    开发者可以通过一次订阅消息授权让微信用户授权第三方移动应用(接入说明)或公众号,获得发送一次订阅消息给到授权微信用户的机会。授权微信用户可以不需要关注公众号。微信用户每授权一次,开发者可获得一次下发...
  • RabbitMQ消息中间件极速入门与实战 课程 RabbitMQ学习——高级特性 RabbitMQ消息中间件技术精讲(三)—— 深入...什么是生产端的可靠投递 保障消息的成功发出 保障MQ节点的成功接收 发送端收到MQ节点(Br...
  • 简单说一下流程: 在页面带参数跳转到...用户订阅消息的结果,最后由后端触发订阅消息的发送,后端调用https://api.weixin.qq.com/cgi-bin/message/template/subscribe?access_token=ACCESS_..
  • RabbitMQ消息幂等之全局唯一ID

    千次阅读 2019-07-21 20:36:20
    消息幂等,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将...
  • Kafka 幂等,事物,消息可靠

    千次阅读 2018-06-07 16:46:53
    本文结合在使用Kafka中的使用,和遇到的问题1.Kafka中如何保障发送消息的可靠?首先我们在创建一个Producer是,可以设置的一些参数如下:1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后...
  • Kafka 消息可靠

    千次阅读 2018-07-25 08:50:03
    在 Kafka 工作机制 一文提及了 Kafka 消息的不可靠。本文就 Kafka 消息的三种不可靠(重复、丢失、乱序),分析它们出现的内部原因和解决办法。 作者:王克锋 出处:...
  • 消息中间件(消息队列)

    千次阅读 2022-02-17 13:52:30
    消息中间件
  • RabbitMQ之消息的可靠

    千次阅读 多人点赞 2020-03-25 01:21:01
    那么消息传输的可靠就至关重要了,说到这里,我们先看一下Rabbit MQ的整个工作流程: 从上面一张图中,可以很容易知道,消息是由生产者发出,流经Broker(信道->交换机->队列),再到消费者消费完毕,这就是...
  • 消息鉴别机制

    千次阅读 2018-10-09 16:47:22
    消息鉴别/认证(message authentication) 消息认证是一种允许通信者验证所收...* 还可能希望验证消息的时效(时间戳)及实体间消息流的相对顺序(序列号) 上述都属于数据完整范畴   消息鉴别机制: ...
  • 一个密码系统的安全主要与两个方面的因素有关。 (1)一个是所使用密码算法本身的保密强度。密码算法的保密强度取决于密码设计水平、破译技术等。可以说一个密码系统所使用密码算法的保密强度是该系统安全的...
  • 消息队列,问题与处理方案梳理

    千次阅读 2022-04-10 15:31:19
    1、如何保证消息不被重复消费? 一、为什么会出现重复消费的问题? RabbitMQ、RocketMQ、Kafka 都有可能出现重复消费的问题,...也许是一个 Controller 接口被重复调用了 2 次,没有做接口幂等导致的;也可能是推送
  • 我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练,知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合。场景:现...
  • 幂等浅谈 GameKing 2017.01.07 21:08* 字数 1155 阅读 467评论 0喜欢 1 概述 幂等原本是数学上的概念,即使公式:f(x)=f(f(x)) 能够成立的数学性质。用在编程领域,则意为对同一个系统,...
  • FreeRTOS消息队列

    万次阅读 多人点赞 2019-01-31 17:55:04
    当等待的时间超过了指定的阻塞时间,即使队列中尚有效数据,任务也会自动从阻塞态转移为就绪态。 由于队列可以被多个任务读取,所以对单个队列而言,也可能有多个任务处于阻塞状态以等待队列数据有效。这种情况下...
  • 史上最全的消息队列总结,我确实不信你能看完!!
  • 完整和认证

    千次阅读 2018-07-11 08:55:28
    完整和认证 密码学Hash函数 Hash函数H将可变长度的数据块M作为输入,产生固定长度的Hash值 h = H(M)。一个“好”的Hash函数具有以下的特点: 对于大的输入集合使用该函数,产生的输出结果均匀地分布且看...
  • 消息队列及消息中间件

    千次阅读 2018-08-03 10:08:35
    它具有 低耦合、可靠投递、广播、流量控制、最终一致 等一系列功能。 当前使用较多的 消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分 数据库 如 Redis、MySQL 以及 phxsql ...
  • Kafka、RabbitMQ、RocketMQ等 消息中间件 介绍和对比

    万次阅读 多人点赞 2019-09-05 17:59:00
    文章目录1、前言2、概念2.1、MQ简介2.2、MQ特点2.2.1、先进先出2.2.2、发布订阅2.2.3、持久化2.2.4、分布式3、消息中间件性能究竟哪家强?3.1、Kafka3.2、RabbitMQ3.3、RocketMQ4、测试4.1、测试目的4.2、测试场景...
  • 什么是分布式消息中间件? 消息中间件的作用是什么? 消息中间件的使用场景是什么? 消息中间件选型? 初识 Kafka Kafka知识树 1 Why Kafka 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你...
  • 消息中间件产生的背景 1、在客户端与服务器进行通讯时,客户端调用后,必须等待服务对象完成处理返回结果才能继续执行,这个过程是基于请求与响应的同步过程。 2、客户与服务器对象的生命周期紧密耦合,客户进程和...
  • 关于消息队列的使用

    万次阅读 多人点赞 2019-03-05 13:58:31
    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,...
  • 它具有低耦合、可靠投递、广播、流量控制、最终一致等一系列功能,成为异步RPC的主要手段之一。 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、...
  • 随着微服务的越来越多,一致问题也越来越被重视。纠结是怎样才能ACID呢?...而本地消息表、可靠消息最终一致方案、最大努力通知方案都是不错的解决方案。 目录 一致问题 解决一致问题的模式和思路 A...
  • kafka消息确认机制

    千次阅读 2020-12-26 17:55:04
    我们知道,kafka的一个topic中,具体负责处理消息的是分区,一个分区可能存在多个副本,因此在producer端向broker发送消息时,ACK表示消息成功发送到分区后,broker返回给发端的一种可靠机制 producerACKs参数 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 531,214
精华内容 212,485
关键字:

性无消息