精华内容
下载资源
问答
  • 一、什么是可靠消息最终一致性事务 二、可靠消息最终一致性解决方案之本地消息表方案 三、可靠消息最终一致性解决方案之RocketMQ事务消息方案 四、可靠消息最终一致性解决方案之RocketMQ事务消息案例流程分析 五...

    目录

    一、什么是可靠消息最终一致性事务

    二、可靠消息最终一致性解决方案之本地消息表方案

    三、可靠消息最终一致性解决方案之RocketMQ事务消息方案

    四、可靠消息最终一致性解决方案之RocketMQ事务消息案例流程分析

    五、总结


    小伙伴们可以按照如下的分布式事务文章序列学习:

    一、什么是可靠消息最终一致性事务

    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(即消息消费方)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方,事务参与方无论如何都要消费掉,并且事务最终要达到一致。

    此方案是利用消息中间件来完成的,如下图:

    事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。

    因此可靠消息最终一致性方案要解决以下几个问题:

    1. 本地事务与消息发送的原子性问题

    本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。

    先来尝试下这种操作,先发送消息,再操作数据库:

    begin transaction;
    //1.发送MQ
    //2.数据库操作
    commit transation;

    这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。

    那么反过来呢,先进行数据库操作,再发送消息:

    begin transaction;
    //1.数据库操作
    //2.发送MQ
    commit transation;

    这种情况下,看上去好像没什么问题,如果发送MQ消息失败,就会抛出异常,导致数据库事务回滚。

    但如果是超时异常,数据库回滚,但MQ其实已经正常发送了,同样会导致不一致。

    2、事务参与方接收消息的可靠性

    事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

    3、消息重复消费的问题

    由于网络波动等原因,若某一个事务消费者节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费,这就要求我们需要实现事务参与方消费消息方法的幂等性。

    二、可靠消息最终一致性解决方案之本地消息表方案

    本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

    下面以注册送积分为例来说明:

    大体流程:

    • 1、用户微服务新增用户信息;
    • 2、增加积分消息发送日志记录;
    • 3、定时调度每隔一段时间,查询出未发送的消息发送到消息中间件中;
    • 4、发送消息到MQ中;
    • 5、MQ发送消息到事务消费方;
    • 6、积分微服务接收到MQ消息,完成赠送积分逻辑;

    如上图,涉及到用户服务和积分服务两个微服务之间的交互,用户服务负责添加用户,积分服务负责增加积分。

    交互过程具体如下:

    • 1、用户注册

    用户服务在本地事务新增用户和增加“积分消息发送日志记录”。(用户表和消息表通过本地事务保证一致)

    begin transaction;
    //1.新增用户
    //2.存储积分消息日志
    commit transation;

    这种情况下,本地数据库操作与存储消息发送日志记录处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性,要么都成功,要么都一起失败。

    • 2、定时任务扫描日志

    前面第一步消息我们已经将用户注册的消息记录到我们的数据表里面去了,我们可以借助定时调度,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。

    • 3、消费消息

    如何保证消费者一定能消费到消息呢?

    这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复投递此消息。

    由于消息会重复投递,所以积分服务的”增加积分“功能需要实现幂等性。

    三、可靠消息最终一致性解决方案之RocketMQ事务消息方案

    RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。

    1. RocketMQ 事务消息设计主要是为了解决消息生产者发送消息与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与消息生产者的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;
    2. RocketMQ本身提供的存储机制为事务消息提供了持久化能力;
    3. RocketMQ 的高可用机制以及可靠消息设计,为事务消息在系统发生异常时依然能够保证达成事务的最终一致性;

    在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决消息生产者消息发送与本地事务执行的原子性问题。

    大体流程:

    • 1、消息生产者发送事务消息到MQ服务器
    • 2、MQ回应消息发送成功
    • 3、执行本地事务
    • 4、消息生产者向MQ Server中发送commit或rollback消息
    • 5、如果MQ Server没有收到确认消息,将会进行回查事务状态
    • 6、检查本地事务状态
    • 7、根据回查的事务状态向MQ Server中发送commit或rollback消息
    • 8、如果是commit,则投递消息
    • 9、如果是rollback,则删除消息,不进行投递

    四、可靠消息最终一致性解决方案之RocketMQ事务消息案例流程分析

    为了方便小伙伴们的理解,我们还是拿新用户注册的同时赠送用户积分的案例,来具体分析一下RocketMQ处理分布式事务的大体流程。

    主要涉及三个角色:

    1. MQ消息生产者:本例中就是用户微服务,新用户注册的时候会发送一条消息"赠送用户积分"到MQ中;
    2. MQ Server:即MQ服务器
    3. MQ消息订阅者:本例中就是积分微服务,负责新增用户积分;

    RocketMQ处理分布式事务大体流程:

    1、用户微服务发送事务消息"增加积分消息"到MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的;

    2、MQ Server回应消息发送成功

    MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

    3、用户微服务执行本地事务,即执行新增用户操作,通过本地数据库事务控制。

    4、消息投递

    若消息生产者本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后,将 "增加积分消息" 状态标记为可消费,此时MQ订阅方(积分微服务)即正常消费消息;

    若消息生产者本地事务执行失败,则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后,将会删除"增加积分消息";

    MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

    5、MQ Server进行事务回查

    如果消息生产者执行本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他消息生产者来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

    以上主干流程已由RocketMQ实现,对用户来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

    RoacketMQ提供RocketMQLocalTransactionListener事务监听器接口:

    /**
     * 事务消息发送后的回调方法,此方法执行本地事务
     * 当消息发送给mq成功,此方法被回调
     * RocketMQLocalTransactionState.COMMIT:提交事务,提交后broker才允许消费者使用
     * RocketMQLocalTransactionState.ROLLBACK:回滚事务,回滚后消息将被删除,并且此时消息不允许被消费
     * RocketMQLocalTransactionState.Unknown:中间状态,表示MQ需要核对,以确定状态
     *
     * @param message
     * @param args
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object args) {
        
    }
    
    /**
     * 检查事务执行状态
     * 事务状态回查,查询是否扣减金额
     *
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        
    }

    发送事务消息:

    TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    //设置TransactionListener实现
    producer.setTransactionListener(transactionListener);
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);

    五、总结

    本文主要介绍了可靠消息一致性解决分布式事务需要解决的问题,通过分析本地消息表和RocketMQ两种可靠消息最终一致性方案,相信小伙伴对可靠消息最终一致性方案有了初步的认识,下一篇我们将通过详细的案例说明如何利用RocketMQ保证分布式的一致性。

    展开全文
  • RocketMQ实现可靠消息最终一致性 业务说明 本实例通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟两个账户的转账交易过程。 两个账户在分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个...

    RocketMQ实现可靠消息最终一致性

    业务说明

    本实例通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟两个账户的转账交易过程。
    两个账户在分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。
    交易过程是,张三 给李四转账指定金额。

    上述交易步骤,张三扣减金额与给bank2发转账消息,两个操作必须是一个整体性的事务。
    在这里插入图片描述

    程序组成部分

    数据库:MySQL-8.X (包括bank1和bank2 两个数据库)
    JDK:1.8.X
    微服务框架:SpringBoot2.3、SpringCloudHoxton
    RocketMQ:rocketmq-spring-boot-starter.2.0.4
    微服务及数据库的关系:
    dtx/rocker-bank-mq/bank1-server-mq 银行1 操作张三用户,连接bank1
    dtx/rocker-bank-mq/bank2-server-mq 银行2 操作李四用户,连接bank2

    创建数据库

    bank1库,包含张三的账户

    DROP DATABASE IF  EXISTS  `bank1`;
    CREATE DATABASE `bank1` CHARACTER SET 'utf8mb4';
    
    
    USE `bank1`;
    
    DROP TABLE IF EXISTS `account_info`;
    CREATE TABLE `account_info` (
    		id bigint ( 20 ) NOT NULL  AUTO_INCREMENT COMMENT '主键',
    		account_name VARCHAR ( 100 ) COMMENT '账户姓名',
    		account_no VARCHAR ( 100 ) COMMENT '账户卡号',
    		account_password VARCHAR ( 100 ) COMMENT '账户密码',
    		account_balance DECIMAL ( 10,2) COMMENT '账户余额',
    		PRIMARY KEY ( id ) 
    ) COMMENT = '账户表';
    
    INSERT INTO `account_info` VALUES ('1' , '张三的账户','1', '', 10000 );
    

    bank2库,包含李四的账户

    DROP DATABASE IF  EXISTS  `bank2`;
    CREATE DATABASE `bank2` CHARACTER SET 'utf8mb4';
    
    
    USE `bank2`;
    
    DROP TABLE IF EXISTS `account_info`;
    CREATE TABLE `account_info` (
    		id bigint ( 20 ) NOT NULL  AUTO_INCREMENT COMMENT '主键',
    		account_name VARCHAR ( 100 ) COMMENT '账户姓名',
    		account_no VARCHAR ( 100 ) COMMENT '账户卡号',
    		account_password VARCHAR ( 100 ) COMMENT '账户密码',
    		account_balance DECIMAL ( 10,2)  COMMENT '账户余额',
    		PRIMARY KEY ( id ) 
    ) COMMENT = '账户表';
    
    INSERT INTO `account_info` VALUES ('1' , '李四的账户','2', '', 0);
    

    每个数据库都创建 de_duplication

    DROP TABLE IF EXISTS `de_duplication`;
    CREATE TABLE `de_duplication` (
    		tx_no VARCHAR ( 64) NOT NULL COMMENT '事务ID',
    		create_time datetime COMMENT '创建时间',
    		PRIMARY KEY ( tx_no ) 
    ) COMMENT = '事务执行记录表';
    

    搭建项目

    启动RocketMQ

    (1)下载RocketMQ服务器
    下载地址:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin- release.zip
    (2)解压并启动
    启动nameserver:

    set ROCKETMQ_HOME=[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqnamesrv.cmd
    

    启动broker:

    set ROCKETMQ_HOME=[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
    

    **我使用的是docker 容器创建的 rockerMQ **

    docker-compose.yml

    version: '3.5'
    services:
      rmqnamesrv:
        image: foxiswho/rocketmq:server
        container_name: rmqnamesrv
        ports:
          - 9876:9876
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
        environment:
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        networks:
            rmq:
              aliases:
                - rmqnamesrv
    
      rmqbroker:
        image: foxiswho/rocketmq:broker
        container_name: rmqbroker
        ports:
          - 10909:10909
          - 10911:10911
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
          - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
        environment:
            NAMESRV_ADDR: "rmqnamesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        command: mqbroker -c /etc/rocketmq/broker.conf
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqbroker
    
      rmqconsole:
        image: styletang/rocketmq-console-ng
        container_name: rmqconsole
        ports:
          - 8080:8080
        environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqconsole
    
    networks:
      rmq:
        name: rmq
        driver: bridge
    

    broker.conf
    RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在 ./data/brokerconf/ 目录下创建一个名为 broker.conf 的配置文件,内容如下:

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    
    # 所属集群名字
    brokerClusterName=DefaultCluster
    
    # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
    # 在 broker-b.properties 使用: broker-b
    brokerName=broker-a
    
    # 0 表示 Master,> 0 表示 Slave
    brokerId=0
    
    # nameServer地址,分号分割
    # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    
    # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
    # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
    # brokerIP1=192.168.0.253
    
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    
    # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
    autoCreateTopicEnable=true
    
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    
    # Broker 对外服务的监听端口
    listenPort=10911
    
    # 删除文件时间点,默认凌晨4点
    deleteWhen=04
    
    # 文件保留时间,默认48小时
    fileReservedTime=120
    
    # commitLog 每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    
    # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
    # destroyMapedFileIntervalForcibly=120000
    # redeleteHangedFileInterval=120000
    # 检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    # 存储路径
    # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
    # commitLog 存储路径
    # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
    # 消费队列存储
    # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
    # 消息索引存储路径
    # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
    # checkpoint 文件存储路径
    # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
    # abort 文件存储路径
    # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
    # 限制的消息大小
    maxMessageSize=65536
    
    # flushCommitLogLeastPages=4
    # flushConsumeQueueLeastPages=2
    # flushCommitLogThoroughInterval=10000
    # flushConsumeQueueThoroughInterval=60000
    
    # Broker 的角色
    # - ASYNC_MASTER 异步复制Master
    # - SYNC_MASTER 同步双写Master
    # - SLAVE
    brokerRole=ASYNC_MASTER
    
    # 刷盘方式
    # - ASYNC_FLUSH 异步刷盘
    # - SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    # 发消息线程池数量
    # sendMessageThreadPoolNums=128
    # 拉消息线程池数量
    # pullMessageThreadPoolNums=128
    

    RocketMQ 控制台
    访问 http://rmqIP:8080 登入控制台

    在这里插入图片描述

    搭建项目

    1. pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.wry.dtx</groupId>
        <artifactId>bank1-server-mq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>bank1-server-mq</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
            <spring-cloud-alibaba.version>2.2.0.RELEASE</spring-cloud-alibaba.version>
            <mybatis-plus.version>3.3.2</mybatis-plus.version>
            <mysql.version>8.0.19</mysql.version>
            <rocketmq.version>2.0.4</rocketmq.version>
        </properties>
    
        <dependencies>
            <!--rocketmq-->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>${rocketmq.version}</version>
            </dependency>
            <!--web-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>${mybatis-plus.version}</version>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba.cloud</groupId>
                    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                    <version>${spring-cloud-alibaba.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    
    1. application.yml
    server:
      port: 8005
    
    spring:
      application:
        name: bank1-server-mq
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://39.96.3.100:3306/bank1?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC&nullCatalogMeansCurrent=true
        username: root
        password: 123456
    
    logging:
      level:
        root: info
    
    mybatis-plus:
      mapper-locations: classpath:/mapper/*.xml
      typeAliasesPackage: com.wry.dtx.bank1.entity
      global-config:
        db-config:
          field-strategy: not-empty
          id-type: auto
          db-type: mysql
    
    ribbon:
      ReadTimeout: 3000
      ConnectTimeout: 3000
    
    rocketmq:
      name-server: 39.96.3.100:9876
      producer:
        group: producer_bank1
    
    
    1. 启动类
    package com.wry.dtx.bank1;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class Bank1ServerMqApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(Bank1ServerMqApplication.class, args);
        }
    
    }
    
    

    具体代码可参考Github:https://github.com/hobbyWang/DistributedTransaction/tree/master/rocket-bank-mq

    小结

    可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了RocketMQ作为 消息中间件,
    主要解决了两个功能:
    1、本地事务与消息发送的原子性问题。
    2、事务参与方接收消息的可靠性。

    可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

    展开全文
  • 一、可靠消息最终一致性事务概述 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终...

    一、可靠消息最终一致性事务概述


    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。 此方案是利用消息中间件完成,如下图:

    事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。因此可靠消息最终一致性方案要解决以下几个问题:
    【1】本地事务与消息发送的原子性问题:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。先来尝试下这种操作,先发送消息,再操作数据库这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,据库操作失败。

    begin transaction; 
        //1.发送MQ 
        //2.数据库操作 
    commit transation;

    第二种方案,先进行数据库操作,再发送消息:这种情况下貌似没有问题,如果发送 MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但 MQ其实已经正常发送了,同样会导致不一致。

    begin transaction; 
        //1.数据库操作 
        //2.发送MQ 
    commit transation;

    【2】事务参与方接收消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
    【3】消息重复消费的问题:由于步骤2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

    二、解决方案【本地消息表方案


    本地消息表这个方案最初是 eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。 下面以注册送积分为例来说明:下例共有两个微服务交互,用户服务和积分服务,用户服务负责添加用户,积分服务负责增加积分。

    【交互流程如下】:【1】用户注册 :用户服务在本地事务新增用户和增加 “积分消息日志”。(用户表和消息表通过本地事务保证一致)下边是伪代码,这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性。

    begin transaction; 
        //1.新增用户 
        //2.存储积分消息日志 
    commit transation;

    【2】定时任务扫描日志:如何保证将消息发送给消息队列呢?经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
    【3】消费消息:如何保证消费者一定能消费到消息呢?这里可以使用 MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重试向消费者来发送消息。积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复投递此消息。由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。

    三、解决方案【RocketMQ事务消息方案


    RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。RocketMQ 事务消息设计主要为解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。在 RocketMQ 4.3后实现了完整的事务消息,实际上是对本地消息表的一个封装,将本地消息表移动到了 MQ内部,解决Producer 端的消息发送与本地事务执行的原子性问题。

    执行流程如下为方便理解我们还以注册送积分的例子来描述整个流程。Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。
    【1】Producer 发送事务消息:Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
    【2】MQ Server回应消息发送成功:MQ Server接收到 Producer 发送给的消息则回应发送成功。表示 MQ已接收到消息。
    【3】Producer 执行本地事务:Producer 端执行业务代码逻辑,通过本地数据库事务控制。本例中,Producer 执行添加用户操作。
    【4】消息投递:若 Producer 本地事务执行成功则自动向 MQServer发送 commit消息,MQ Server接收到 Commit消息后将“增加积分消息” 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息。若Producer 本地事务执行失败则自动向 MQServer发送 Rollback消息,MQ Server接收到 Rollback消息后将删除“增加积分消息”。MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里 ack默认自动回应,即程序执行正常则自动回应ack。
    【5】事务回查:如果执行 Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态(维护本地事务状态表)即可。 RoacketMQ提供 RocketMQLocalTransactionListener接口:

    public interface RocketMQLocalTransactionListener {
        /**发送prepare消息成功此方法被回调,该方法用于执行本地事务
        * @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
        * @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
        * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
        */
        RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
    
         /**@param msg 通过获取transactionId来判断这条消息的本地事务执行状态
          * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
          */
        RocketMQLocalTransactionState checkLocalTransaction(Message msg);
     }

    【6】发送事务消息:以下是 RocketMQ提供用于发送事务消息的API:

    TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    //设置TransactionListener实现
    producer.setTransactionListener(transactionListener);
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);

    四、RocketMQ实现可靠消息最终一致性事务


    业务说明】通过 RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟两个账户的转账交易过程。两个账户在分别在不同的银行(张三在 bank1、李四在 bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。 上述交易步骤,张三扣减金额与给 bank2发转账消息,两个操作必须是一个整体性的事务。


    【核心代码】:程序技术架构如下:

    【交互流程如下】:【1】Bank1向 MQ Server发送转账消息;
    【2】Bank1执行本地事务,扣减金额;
    【3】Bank2接收消息,执行本地事务,添加金额;
    【数据库】:在bank1、bank2数据库中新增 de_duplication,交易记录表(去重表),用于交易幂等控制。

    DROP TABLE IF EXISTS `de_duplication`; 
    CREATE TABLE `de_duplication` ( 
        `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, 
        `create_time` datetime(0) NULL DEFAULT NULL, 
        PRIMARY KEY (`tx_no`) USING BTREE 
    ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

    版本依赖在父工程中指定了rocketmq-spring-boot-starter的版本

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.2</version>
    </dependency>

    配置rocketMQ在application-local.propertis 中配置 rocketMQ nameServer地址及生产组。

    rocketmq.producer.group = producer_bank2
    rocketmq.name-server = 127.0.0.1:9876

    张三服务层代码

    import com.alibaba.fastjson.JSONObject;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * @author Administrator
     * @version 1.0
     **/
    @Service
    @Slf4j
    public class AccountInfoServiceImpl implements AccountInfoService {
    
        @Autowired
        AccountInfoDao accountInfoDao;
    
        @Autowired
        RocketMQTemplate rocketMQTemplate;
    
    
        //向mq发送转账消息
        @Override
        public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
    
            //将accountChangeEvent转成json
            JSONObject jsonObject =new JSONObject();
            jsonObject.put("accountChange",accountChangeEvent);
            String jsonString = jsonObject.toJSONString();
            //生成message类型
            Message<String> message = MessageBuilder.withPayload(jsonString).build();
            //发送一条事务消息
            /**
             * String txProducerGroup 生产组
             * String destination topic,
             * Message<?> message, 消息内容
             * Object arg 参数
             */
            rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);
    
        }
    
        //更新账户,扣减金额
        @Override
        @Transactional
        public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
            //幂等判断,txNo是在Ctroller中生成的 UUID,全局唯一
            if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
                return ;
            }
            //扣减金额
            accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);
            //添加事务日志
            accountInfoDao.addTx(accountChangeEvent.getTxNo());
            if(accountChangeEvent.getAmount() == 3){
                throw new RuntimeException("人为制造异常");
            }
        }
    }

    张三RocketMQLocalTransactionListener编写 RocketMQLocalTransactionListener接口实现类,实现执行本地事务和事务回查两个方法。

    import com.alibaba.fastjson.JSONObject;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.messaging.Message;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * @author Administrator
     * @version 1.0
     **/
    @Component
    @Slf4j
    //生产者组与发送消息时定义组相同
    @RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
    public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
    
        @Autowired
        AccountInfoService accountInfoService;
    
        @Autowired
        AccountInfoDao accountInfoDao;
    
        //事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
        @Override
        @Transactional
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    
            try {
                //解析message,转成AccountChangeEvent
                String messageString = new String((byte[]) message.getPayload());
                JSONObject jsonObject = JSONObject.parseObject(messageString);
                String accountChangeString = jsonObject.getString("accountChange");
                //将accountChange(json)转成AccountChangeEvent
                AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
                //执行本地事务,扣减金额
                accountInfoService.doUpdateAccountBalance(accountChangeEvent);
                //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                e.printStackTrace();
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        //事务状态回查,查询是否扣减金额
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            //解析message,转成AccountChangeEvent
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //事务id
            String txNo = accountChangeEvent.getTxNo();
            int existTx = accountInfoDao.isExistTx(txNo);
            if(existTx>0){
                return RocketMQLocalTransactionState.COMMIT;
            }else{
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }
    }

    李四服务层代码

    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    /**
     * @author Administrator
     * @version 1.0
     **/
    @Service
    @Slf4j
    public class AccountInfoServiceImpl implements AccountInfoService {
    
        @Autowired
        AccountInfoDao accountInfoDao;
    
        //更新账户,增加金额
        @Override
        @Transactional
        public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
            log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
            if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
                return ;
            }
            //增加金额
            accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
            //添加事务记录,用于幂等
            accountInfoDao.addTx(accountChangeEvent.getTxNo());
            if(accountChangeEvent.getAmount() == 4){
                throw new RuntimeException("人为制造异常");
            }
        }
    }

    【MQ监听类】:通过实现 RocketMQListener接口监听目标 Topic

    import com.alibaba.fastjson.JSONObject;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    
    /**
     * @author Administrator
     * @version 1.0
     **/
    @Component
    @Slf4j
    @RocketMQMessageListener(consumerGroup = "consumer_group_txmsg_bank2",topic = "topic_txmsg")
    public class TxmsgConsumer implements RocketMQListener<String> {
    
        @Autowired
        AccountInfoService accountInfoService;
    
        //接收消息
        @Override
        public void onMessage(String message) {
            log.info("开始消费消息:{}",message);
            //解析消息
            JSONObject jsonObject = JSONObject.parseObject(message);
            String accountChangeString = jsonObject.getString("accountChange");
            //转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //设置账号为李四的
            accountChangeEvent.setAccountNo("2");
            //更新本地账户,增加金额
            accountInfoService.addAccountInfoBalance(accountChangeEvent);
        }
    }

    五、总结


    可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了 RocketMQ作为消息中间件,RocketMQ主要解决了两个功能:
    【1】本地事务与消息发送的原子性问题;
    【2】事务参与方接收消息的可靠性;
    可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

    展开全文
  • 分布式事务解决方案之可靠消息最终一致性 什么是可靠消息最终一致性事务 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此...

    分布式事务解决方案之可靠消息最终一致性

    什么是可靠消息最终一致性事务

    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。 此方案是利用消息中间件完成,如下图: 事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件 之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事 务问题。
    在这里插入图片描述
    因此可靠消息最终一致性方案要解决以下几个问题:
    1.本地事务与消息发送的原子性问题

    本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实 现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最 终一致性方案的关键问题。 先来尝试下这种操作,先发送消息,再操作数据库。

    先来尝试下这种操作,先发送消息,再操作数据库:

    begin transaction;
    	 //1.发送MQ 
    	 //2.数据库操作 
    commit transation;
    

    这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。 你立马想到第二种方案,先进行数据库操作,再发送消息:

    begin transaction;
    	//1.数据库操作 
    	//2.发送MQ
    commit transation;
    

    这种情况下貌似没有问题,如果发送MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数 据库回滚,但MQ其实已经正常发送了,同样会导致不一致。

    2、事务参与方接收消息的可靠性
    事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

    3、消息重复消费的问题
    由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重 复消费。 要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

    解决方案

    本地消息表方案

    本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后 通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

    下面以注册送积分为例来说明:

    下例共有两个微服务交互,用户服务和积分服务,用户服务负责添加用户,积分服务负责增加积分。
    在这里插入图片描述
    交互流程如下:
    1、用户注册
    用户服务在本地事务新增用户和增加 ”积分消息日志“。(用户表和消息表通过本地事务保证一致) 下边是伪代码

    begin transaction;
    	 //1.新增用户 
    	 //2.存储积分消息日志 
    commit transation;
    

    这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性。

    2、定时任务扫描日志

    如何保证将消息发送给消息队列呢?

    经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息 中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。

    3、消费消息

    如何保证消费者一定能消费到消息呢?

    这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重 试向消费者来发送消息。 积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复 投递此消息。

    由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。

    RocketMQ事务消息方案

    RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项 目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之 上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。

    RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

    在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ 内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。
    在这里插入图片描述

    执行流程如下:
    为方便理解我们还以注册送积分的例子来描述 整个流程。
    Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。
    1、Producer 发送事务消息
    Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态,注意此时这条消息消费者(MQ订阅方)是无法消费到的。 本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
    2、MQ Server回应消息发送成功
    MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
    3、Producer 执行本地事务
    Producer 端执行业务代码逻辑,通过本地数据库事务控制。 本例中,Producer 执行添加用户操作。
    4、消息投递
    若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;
    若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删 除”增加积分消息“ 。
    MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即 程序执行正常则自动回应ack。
    5、事务回查
    如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer 来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。 以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此 只需关注本地事务的执行状态即可。

    RoacketMQ提供RocketMQLocalTransactionListener接口:

    public interface RocketMQLocalTransactionListener {
            /**
             *  发送prepare消息成功此方法被回调,该方法用于执行本地事务 
             *  @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id 
             *  @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到 
             *  @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
             */
            RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
    
            /**
             * @param msg 通过获取transactionId来判断这条消息的本地事务执行状态 
             * @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
             */
            RocketMQLocalTransactionState checkLocalTransaction(Message msg);
        }
    

    发送事务消息:
    以下是RocketMQ提供用于发送事务消息的API:

     TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); 
     producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); 
     //设置TransactionListener实现 
     producer.setTransactionListener(transactionListener;
     //发送事务消息
     SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    
    展开全文
  • 分布式事务八_可靠消息最终一致性方案更多干货分布式事务处理一分布式事务二分布式事务处理三分布式事务四_基于可靠消息的最终一致性分布式事务五_基于可靠消息的最终一致性_异常流程分布式事务六_常规MQ队列分布式...
  • 可靠消息最终一致性-分布式事务解决方案
  • 分布式事务之可靠消息最终一致性通常使用rocketmq的事务消息机制来实现。 其大致流程如下 1、Producer 发送事务消息 Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态...
  • 1. 什么是可靠消息最终一致性 可靠消息最终一致性:是指当事务发起方执行完成本地事务后发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理成功,即强调的是只要消息发给事务参与方最终事务要达到一致。 ...
  • 5. 分布式事务解决方案之可靠消息最终一致性 5.1. 什么是可靠消息最终一致性事务 可靠消息最终一致性方案是指当事务发起执行完全本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功...
  • 一、可靠消息最终一致性事务概述 二、解决方案【本地消息表方案】 三、解决方案【RocketMQ事务消息方案】 四、RocketMQ实现可靠消息最终一致性事务 五、总结 一、可靠消息最终一致性事务概述 可靠消息最终一致...
  • 可靠消息最终一致性原理 执行流程 Producer发送Prepare message到broker。 Prepare Message发送成功后开始执行本地事务。 如果本地事务执行成功的话则返回commit,如果执行失败则返回rollback。(这个是在...
  • 可靠消息最终一致性 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。 ...
  • 1.什么是可靠消息最终一致性事务 ​ 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方...
  • 什么是可靠消息最终一致性 可靠消息最终一致性方案是指:当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功. 此方案强调的是只要消息发给事务参与方最终事务要...
  • RocketMQ可靠消息最终一致性解决方案 - 用户消费赚积分业务 ================================ ©Copyright 蕃薯耀2021-05-14 https://blog.csdn.net/w995223851 什么是可靠消息最终一致性方案? 可靠消息...
  • 前面一篇文章介绍了RocketMQ实现可靠消息最终一致性事务的整体流程,大部分都是理论知识,本篇文章我们将结合前面我们使用过的【银行转账案例】详细说明如何利用RocketMQ实现可靠消息最终一致性事务。 二、案例说明...
  • 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。 此方案是利用...
  • 今天给大家分享的是分布式事务:可靠消息最终一致性 这个的意思,就是干脆不要用本地的消息了,直接基于MQ来实现事务,比如阿里的RocketMQ就支持消息事务 A系统先发送一个prepared消息到mq,如果这个prepared...
  • 可靠消息最终一致性方案 基于 MQ 来实现事务。比如阿里的 RocketMQ 就支持消息事务。 大概的意思就是: A 系统先发送一个 prepared 消息到 mq,如果这个 prepared 消息发送失败那么就直接取消操作别执行了; 如果这...
  • 柔性事务:可靠消息最终一致性 消息发送一致性:是指产生消息的业务动作与消息发送的一致。也就是说,如果业务操作成功,那么由这个业务操作所产生的消息一定要成功投递出去(一般是发送到kafka、rocketmq、rabbitmq...
  • 在《分布式事务:解决方案之可靠性消息最终一致性理论》中我们说到可靠消息最终一致性主要有两个解决方案:一个是本地消息表+MQ;一个是RocketMQ事务消息。今天我们重点演示一下第二个方案的实现细节。 1. 业务说明 ...
  • 分布式事务解决方案一之:可靠消息最终一致性
  • 如果要实现可靠消息最终一致性方案,一般你可以自己写一个可靠消息服务,实现一些业务逻辑。 首先,上游服务需要发送一条消息给可靠消息服务。这条消息说白了,你可以认为是对下游服务一个接口的调用,里面包含了...
  • 本章,我们将要介绍一种生产上最常用的分布式事务解决方案——可靠消息最终一致性方案。所谓可靠消息最终一致性方案,其实就是在分布式系统当中,把一个业务操作转换成一个消息,然后利用消息来实现事务的最终一致性...
  • 在上一篇研读《可靠消息最终一致性方案(本地消息服务)》 我介绍了通过本地消息服务来实现分布式事务最终一致性,最后也说了一下这种实现方案的优缺点,现在就来介绍一下如何通过独立消息服务实现分布式事务最终...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,008
精华内容 403
关键字:

可靠消息最终一致性