为您推荐:
精华内容
最热下载
问答
  • Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 Seata主要有两种分布式事务实现...

    一、seata是什么

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

    Seata主要有两种分布式事务实现方案,AT及TCC

    1、AT模式主要关注多 DB 访问的数据一致性,当然也包括多服务下的多 DB 数据访问一致性 问题
    2、TCC模式主要关注业务拆分,在按照业务横向扩展资源时,解决微服务间调用的一致性问题

    术语:
    TC (Transaction Coordinator) - 事务协调者
    维护全局和分支事务的状态,驱动全局事务提交或回滚。
    TM (Transaction Manager) - 事务管理器
    定义全局事务的范围:开始全局事务、提交或回滚全局事务。
    RM (Resource Manager) - 资源管理器
    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

    二、AT 模式

    基于支持本地 ACID 事务的关系型数据库。

    整体机制
    两阶段提交协议的演变:

    一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

    二阶段:

    提交异步化,非常快速地完成。
    回滚通过一阶段的回滚日志进行反向补偿。
    

    写隔离
    一阶段本地事务提交前,需要确保先拿到 全局锁 。
    拿不到 全局锁 ,不能提交本地事务。
    拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
    以一个示例来说明:

    两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

    tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。

    在这里插入图片描述
    tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

    在这里插入图片描述

    如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

    此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

    因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

    读隔离

    在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。

    如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

    在这里插入图片描述
    SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

    出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

    工作机制

    以一个示例来说明整个 AT 分支的工作过程。

    业务表:product

    在这里插入图片描述
    AT 分支事务的业务逻辑:

    update product set name = ‘GTS’ where name = ‘TXC’;

    一阶段
    过程:

    1、解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
    2、查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。

    select id, name, since from product where name = ‘TXC’;

    得到前镜像:
    在这里插入图片描述

    3、执行业务 SQL:更新这条记录的 name 为 ‘GTS’。
    4、查询后镜像:根据前镜像的结果,通过 主键 定位数据。

    select id, name, since from product where id = 1;

    得到后镜像:
    在这里插入图片描述
    5、插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。

    {
    	"branchId": 641789253,
    	"undoItems": [{
    		"afterImage": {
    			"rows": [{
    				"fields": [{
    					"name": "id",
    					"type": 4,
    					"value": 1
    				}, {
    					"name": "name",
    					"type": 12,
    					"value": "GTS"
    				}, {
    					"name": "since",
    					"type": 12,
    					"value": "2014"
    				}]
    			}],
    			"tableName": "product"
    		},
    		"beforeImage": {
    			"rows": [{
    				"fields": [{
    					"name": "id",
    					"type": 4,
    					"value": 1
    				}, {
    					"name": "name",
    					"type": 12,
    					"value": "TXC"
    				}, {
    					"name": "since",
    					"type": 12,
    					"value": "2014"
    				}]
    			}],
    			"tableName": "product"
    		},
    		"sqlType": "UPDATE"
    	}],
    	"xid": "xid:xxx"
    }
    

    6、提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁 。
    7、本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
    8、将本地事务提交的结果上报给 TC。

    二阶段-回滚

    1、收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
    2、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
    3、数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前4全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
    4、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:

    update product set name = ‘TXC’ where id = 1;

    5、提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

    二阶段-提交

    1、收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
    2、异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

    三、TCC 模式

    回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

    一阶段 prepare 行为
    二阶段 commit 或 rollback 行为

    在这里插入图片描述
    根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.

    AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:

    一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
    二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
    二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

    相应的,TCC 模式,不依赖于底层数据资源的事务支持:

    一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
    二阶段 commit 行为:调用 自定义 的 commit 逻辑。
    二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

    所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

    展开全文
    qq_20161461 2021-09-20 09:53:54
  • 原理 seata涉及到三个角色之间的交互,本文通过流程图将AT模式下的基本交互流程梳理一下,为我们以后的解析打下基础。 假设有三个微服务,分别是服务A、B、C,其中服务A中调用了服务B和服务C,TM、TC、RM三者之间的...

    原理

    seata涉及到三个角色之间的交互,本文通过流程图将AT模式下的基本交互流程梳理一下,为我们以后的解析打下基础。
    假设有三个微服务,分别是服务A、B、C,其中服务A中调用了服务B和服务C,TM、TC、RM三者之间的交互流程如下图:

    在这里插入图片描述

    • 1、服务A启动时,GlobalTransactionScanner会对有@GlobalTransaction注解的方法进行AOP增强,并生成代理,增强的代码位于GlobalTransactionalInterceptor类中,当调用@GlobalTransaction注解的方法时,增强代码首先向TC注册全局事务,表示全局事务的开始,同时TC生成XID,并返回给TM;
    • 2、服务A中调用服务B时,将XID传递给服务B;
    • 3、服务B得到XID后,访问TC,注册分支事务,并从TC获得分支事务ID,TC根据XID将分支事务与全局事务关联;
    • 4、接下来服务B开始执行SQL语句,在执行前将表中对应的数据保存一份,执行后在保存一份,将这两份记录作为回滚记录写入到数据库中,如果执行过程中没有异常,服务B最后将事务提交,并通知TC分支事务成功,服务B也会清除本地事务数据;
    • 5、服务A访问完服务B后,访问服务C;
    • 6、服务C与TC之间的交互与服务B完全一致;
    • 7、服务B和服务C都成功后,服务A通过TM通知TC全局事务成功,如果失败了,服务A也会通知TC全局事务失败;
    • 8、TC记录了全局事务下的每个分支事务,TC收到全局事务的结果后,如果结果成功,则通知RM成功,RM收到通知后清理之前在数据库中保存的回滚记录,如果失败了,则RM要查询出之前在数据库保存的回滚记录,对之前的SQL操作进行回滚。

    因为TM、RM、TC之间的交互都是通过网络完成的,很容易出现网络断开的情况,因此TC提供了四个定时线程池,定时检测系统中是否有超时事务、异步提交事务、回滚重试事务、重试提交事务,如果发现了有这四类事务,则从全局事务中获取所有的分支事务,分别调用各个分支事务完成对应的操作,依次来确保事务的一致性。

    需要考虑的问题:
    通过上面流程的分析可以发现,每次SQL操作(查询除外)时,都会增加额外了三次数据库操作;每次全局事务和分支事务开启时,都涉及到TM、RM与TC的交互;全局事务期间还要承担数据短时不一致的情况,这些都是我们在使用AT模式需要考虑的情况。

    案例

    用户购买操作:
    入口BusinessServiceImpl.java,里面三个dubbo接口 storageDubboService(扣减库存) ,orderDubboService(创建订单,里面又调用了另外的接口accountDubboService-扣减账户资金)

        /**
         * 模拟用户购买商品下单业务逻辑流程
         * @Param:
         * @Return:
         */
        @PostMapping("/buy")
        ObjectResponse handleBusiness(@RequestBody BusinessDTO businessDTO){
            LOGGER.info("请求参数:{}",businessDTO.toString());
            return businessService.handleBusiness(businessDTO);
        }
    
    /**
         * 处理业务逻辑 GlobalTransactional 开启全局事务,生成XID
         * @Param:
         * @Return:
         */
        @Override
        @GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")
        public ObjectResponse handleBusiness(BusinessDTO businessDTO) {
            System.out.println("开始全局事务,XID = " + RootContext.getXID());
            ObjectResponse<Object> objectResponse = new ObjectResponse<>();
            //1、扣减库存
            CommodityDTO commodityDTO = new CommodityDTO();
            commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
            commodityDTO.setCount(businessDTO.getCount());
            ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO);
            //2、创建订单
            OrderDTO orderDTO = new OrderDTO();
            orderDTO.setUserId(businessDTO.getUserId());
            orderDTO.setCommodityCode(businessDTO.getCommodityCode());
            orderDTO.setOrderCount(businessDTO.getCount());
            orderDTO.setOrderAmount(businessDTO.getAmount());
            ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);
    
            //打开注释测试事务发生异常后,全局回滚功能
            if (!flag) {
                throw new RuntimeException("测试抛异常后,分布式事务回滚!");
            }
    
            if (storageResponse.getStatus() != 200 || response.getStatus() != 200) {
                throw new DefaultException(RspStatusEnum.FAIL);
            }
    
            objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());
            objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());
            objectResponse.setData(response.getData());
            return objectResponse;
        }
    
    
    

    StorageDubboServiceImpl.java扣减库存

    @Service(version = "1.0.0",protocol = "${dubbo.protocol.id}",
            application = "${dubbo.application.id}",registry = "${dubbo.registry.id}",
            timeout = 3000)
    public class StorageDubboServiceImpl implements StorageDubboService {
    
        @Autowired
        private ITStorageService storageService;
    
        @Override
        public ObjectResponse decreaseStorage(CommodityDTO commodityDTO) {
            System.out.println("全局事务id :" + RootContext.getXID());
            return storageService.decreaseStorage(commodityDTO);
        }
    }
    

    下单OrderDubboServiceImpl.java,createOrder里面包含两步,扣减账户余额,生成订单

    @Service(version = "1.0.0",protocol = "${dubbo.protocol.id}",
            application = "${dubbo.application.id}",registry = "${dubbo.registry.id}",
            timeout = 3000)
    public class OrderDubboServiceImpl implements OrderDubboService {
    
        @Autowired
        private ITOrderService orderService;
    
        @Override
        public ObjectResponse<OrderDTO> createOrder(OrderDTO orderDTO) {
            System.out.println("全局事务id :" + RootContext.getXID());
            return orderService.createOrder(orderDTO);
        }
    }
    
        /**
         * 创建订单
         * @Param:  OrderDTO  订单对象
         * @Return:  OrderDTO  订单对象
         */
        @Override
        public ObjectResponse<OrderDTO> createOrder(OrderDTO orderDTO) {
            ObjectResponse<OrderDTO> response = new ObjectResponse<>();
            //扣减用户账户
            AccountDTO accountDTO = new AccountDTO();
            accountDTO.setUserId(orderDTO.getUserId());
            accountDTO.setAmount(orderDTO.getOrderAmount());
            ObjectResponse objectResponse = accountDubboService.decreaseAccount(accountDTO);
    
            //生成订单号
            orderDTO.setOrderNo(UUID.randomUUID().toString().replace("-",""));
            //生成订单
            TOrder tOrder = new TOrder();
            BeanUtils.copyProperties(orderDTO,tOrder);
            tOrder.setCount(orderDTO.getOrderCount());
            tOrder.setAmount(orderDTO.getOrderAmount().doubleValue());
            try {
                baseMapper.createOrder(tOrder);
            } catch (Exception e) {
                response.setStatus(RspStatusEnum.FAIL.getCode());
                response.setMessage(RspStatusEnum.FAIL.getMessage());
                return response;
            }
    
            if (objectResponse.getStatus() != 200) {
                response.setStatus(RspStatusEnum.FAIL.getCode());
                response.setMessage(RspStatusEnum.FAIL.getMessage());
                return response;
            }
    
            response.setStatus(RspStatusEnum.SUCCESS.getCode());
            response.setMessage(RspStatusEnum.SUCCESS.getMessage());
            return response;
        }
    

    AT TCC Sega 区别

    AT模式

    两阶段:
    1.seata拦截sql,并解析,更新业务数据,记录更新前和更新后的数据快照。此时更新后的数据,是对外可见的。
    2.执行成功,分布式事务提交。如果偶一个执行失败,开始回滚,由于第一阶段里面,数据已经更新到数据库,所以要对比当前数据和之前保存的数据快照是否一致,避免发生数据错误,完成校验后用before还原数据。

    TCC模式

    1.try:做业务检查和资源预留
    2.confirm:确认提交
    3.cancel:业务执行错误之后需要回滚的状态下执行事务的业务取消和预留。

    如:下单操作有两个步骤:资金扣减、创建订单

    try(数据准备):冻结资金、扣减账户余额。
    confirm;数据提交。直接的提交,扣减资金,调用某个分录余额。
    cancel:数据回滚

    在这里插入图片描述

    在这里插入图片描述

    AT和TCC的区别

    AT是基于支持本地ACID事务的关系型数据库

    一阶段prepare 行为,在本地事务中,同时提交数据更新前、后的快照信息
    二阶段commit: 成功后结束,异步清理之前的快照。
    二阶段rollback: 对比之前的保存的数据更新后的快照,完成数据回滚

    TCC 不依赖底层数据库的事务支持。
    一阶段prepare行为:调用自定义的prepare 逻辑
    二阶段commit,调用自定义的commit
    三阶段rollback 调用自定义rollback

    saga模式

    sage是seata提供的长事务解决方案。
    每个参与者单独提交自己的事务,当某一个参与者失败,则补偿回滚前面已经成功的参与者。
    一阶段正向服务和二阶段补偿服务都由服务开发实现
    长链路金融应用、渠道聚合等

    在这里插入图片描述

    XA模式

    XA 模式是 Seata 将会开源的另一种无侵入的分布式事务解决方案
    无侵入
    将快照数据和行锁等通过 XA 指令委托给了数据库来完成
    XA模式是分布式强一致性的解决方案,但性能低而使用较少

    展开全文
    a718515028 2021-05-10 11:27:44
  • seata是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务。 有些地方官网文档写的可能比较难以理解,这里用较为简单的方式来描述一下。 快速入门 ...

    seata是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务。

    有些地方官网文档写的可能比较难以理解,这里用较为简单的方式来描述一下。

    快速入门

    譬如你有两个微服务,一个是库存模块StorageService,一个是订单模块OrderService,主业务是用户下单,然后需要分别调用上面的两个服务,完成减库存、用户扣款和下单操作。由于两个服务是不同的服务,并且是不同的数据库,那么这就是一个典型的分布式事务场景。我们希望要么全部成功,要么全部失败。

    /**
     * 用户下单
     */
    public void purchase(String userId, String commodityCode, int orderCount) {
            //减库存
            storageService.deduct(commodityCode, orderCount);
            //扣款、生成订单
            orderService.create(userId, commodityCode, orderCount);
        }

    其中OrderService做了如下操作,扣减钱款、生成订单。库存Service就是减了一下库存。

    那么要完成这次分布式事务,只需要在purchase方法上加个注解即可。看起来确实是一个注解,解决所有。

     @GlobalTransactional
        public void purchase(String userId, String commodityCode, int orderCount) {
            ......
        }

    @GlobalTransactional注解

    被这个注解包围的方法,是怎么个执行流程,下面来看一下。

    public class TransactionalTemplate {
    
        public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    
            // 1. 获取当前全局事务实例或创建新的实例
            GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    
            // 2. 开启全局事务
            try {
                tx.begin(business.timeout(), business.name());
    
            } catch (TransactionException txe) {
                // 2.1 开启失败
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.BeginFailure);
    
            }
    
            Object rs = null;
            try {
                // 3. 调用业务服务
                rs = business.execute();
    
            } catch (Throwable ex) {
    
                // 业务调用本身的异常
                try {
                    // 全局回滚
                    tx.rollback();
    
                    // 3.1 全局回滚成功:抛出原始业务异常
                    throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
    
                } catch (TransactionException txe) {
                    // 3.2 全局回滚失败:
                    throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, ex);
    
                }
    
            }
    
            // 4. 全局提交
            try {
                tx.commit();
    
            } catch (TransactionException txe) {
                // 4.1 全局提交失败:
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
    
            }
            return rs;
        }
    
    }

    被注解包围的方法,其实就是第三步,可以看一下流程:

    第一步:初始化一个全局事务的实例,该实例拥有一个唯一的id,称之为XID,这个XID会在整个分布式事务的各个服务间流转。作为这一次分布式事务的唯一标识。

    第二步:开启全局事务,设置超时时间等属性,以避免拿不到锁时,无限的等待。

    第三步:执行逻辑,(具体的各个服务,执行各自的本地事务)。如果任何一个服务的本地事务出现异常,则进入回滚全局事务。如果回滚成功,就抛出那个本地事务失败的异常。如果回滚全局事务失败,则抛出异常原因。

    第四步:标记该次全局事务状态为完成,通知各单体服务该次事务已经完成,请继续下一步。各单体服务收到该请求后,会立刻返回成功,然后异步删除之前本地事务前保存的回滚信息。

    这里面需要注意的,和其他的常见的分布式事务不同的地方在于第三步的单体服务的执行逻辑。在生成全局事务XID并打出begin的发令枪后,各个单体服务会立刻开始自己的本地事务,而不会去关心别的服务的情况。在本地事务执行成功、失败后,会通知TC中心,说我成功、失败了。TC会记录下来每个单体服务的状态,如果全部成功了,那么就进行下一步(第四步)。如果有任何一个失败了,就通知各个单体服务,进行各自的回滚。

    很明显,这种处理方式减少了XA两阶段提交的锁的时间,而且并不依赖于数据库本身的回滚机制,靠的是TC这个server端维护这一次XID中各个单体服务的执行状态,回滚时靠着自己保存的回滚语句进行回滚。可以明显提高各事务的并发执行。

    详细流程

    来看一下单体服务具体的执行流程。

    第一步:解析sql语句,得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。

    第二步:查询老数据,根据上面的where语句sql,去数据库查询原始的数据。

    如 select * from product where name = 'TXC';得到原始的数据,如该行id=1,然后记录下来。
    

    第三步:执行第一步的sql语句,即执行update,修改数据库的该记录的值。

    第四步:查询修改后的值,select * from product where id =1.得到该行值,记录下来。

    第五步:插入回滚日志,将老值、新值以及sql语句组成一个将来可用于回滚的日志,插入到UNDO_LOG表。

    {
    	"branchId": 641789253,
    	"undoItems": [{
    		"afterImage": {
    			"rows": [{
    				"fields": [{
    					"name": "id",
    					"type": 4,
    					"value": 1
    				}, {
    					"name": "name",
    					"type": 12,
    					"value": "GTS"
    				}, {
    					"name": "since",
    					"type": 12,
    					"value": "2014"
    				}]
    			}],
    			"tableName": "product"
    		},
    		"beforeImage": {
    			"rows": [{
    				"fields": [{
    					"name": "id",
    					"type": 4,
    					"value": 1
    				}, {
    					"name": "name",
    					"type": 12,
    					"value": "TXC"
    				}, {
    					"name": "since",
    					"type": 12,
    					"value": "2014"
    				}]
    			}],
    			"tableName": "product"
    		},
    		"sqlType": "UPDATE"
    	}],
    	"xid": "xid:xxx"
    }

    第六步:向TC server注册分支,申请product表,id=1的行的全局锁。注意,这个全局锁是相对于所有可能的同时在执行的分布式事务而言的。一旦某个分支,获取了该记录的全局锁,在解锁之前,任何其他的分布式事务,不能修改该数据。

    第七步:本地事务提交,将自己的本地事务、和前面的UNDO LOG一起提交。

    第八步:将本地事务提交的结果上报给TC server。如成功、失败。

    此时TC会陆续收到各个分支的执行结果,在各分支全部提交完毕后,TC会下发最终结果给各分支。

    开始第二阶段。

    成功的情况:分支收到了TC下发的成功请求,立马返回我已OK的结果给TC,然后异步执行删除UNDO LOG的操作。因为成功了,所以用来回滚的UNDO LOG就没意义了,异步删除掉就好。

    失败的情况:

    1 分支收到了TC下发的失败请求,开始执行回滚逻辑。

    2 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。

    3 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。

    4 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句。

    update product set name = 'TXC' where id = 1;
    

    5 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

    结论:

    可以看到,整体来说,这个分布式事务是比较迅速的,在不等待全局锁的情况下,基本和本地事务没什么区别。回滚时,也不依赖数据库本身的回滚能力,都由自己业务来实现回滚操作。

    这里还是有个问题点的,就是第3步。在回滚时,发现老数据已经被其他的事务修改了,该怎么处理。

    全局锁和本地锁

    上面提到了,在操作同一个数据时,涉及了全局锁的概念。这是为了控制多个分布式事务,同时操作同一条数据时,造成的数据不一致。

    定义整个分布式事务为两个阶段:

    一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

    二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。

    那么锁的限制就是:

    一阶段本地事务提交前,需要确保先拿到 全局锁 。
    拿不到 全局锁 ,不能提交本地事务。
    拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

    举例:

    两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

    tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。

    tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

     

    如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

    此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

    因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

     

    参考:

    https://www.jianshu.com/p/77a95a0cf850

    https://github.com/seata/seata/wiki/Quick-Start

     

    展开全文
    tianyaleixiaowu 2019-07-09 20:16:44
  • Seata 分布式事务的精简使用教程和原理浅析一、说明二、Seata 简介2.1、Seata 是什么?2.2、Seata 的整体架构2.2.1、主要角色2.2.2、整体架构和工作流程图2.3、Seata 的事务模式2.3.1、Seata 的 AT 事务模式2.4、...

    一、说明

    • 本博客首先会对 Seata 进行简单介绍,然后演示如何在项目中使用 Seata 分布式事务(仅 AT 模式),并结合监控工具简单分析它的工作原理,最后再谈谈博主在工作中关于 Seata 的使用感受。
      • Seata 简介
        • 介绍一些 Seata 的重要概念,和胖友们在一些事情上达成共识。
      • Seata 通用接入流程
        • 这部分内容也属于通用知识,和具体工作环境无关。
        • 介绍 Server 端和 Client 端的使用,尤其是 Client 端使用不同微服务框架时如何解决事务传播问题。
      • Seata 分布式事务使用演示
        • 可以看做是上面 “Seata 通用接入流程” 的实战版。
        • 由于博主目前工作的公司使用的是 SpringBoot 1.x 版本,在一些配置细节上可能和某些胖友不同,但万变不离其宗,只要能理解它的核心思想,相信这些小细节问题对于各位优秀的胖友们都是小意思。
      • Seata 工作原理简单分析
        • 首先会提供通过监控工具得到的工作流程截图,让大家有个直观的认知。
        • 然后介绍代码中关键的类。
    • 胖友们可以把本博客当成是一个快速上手的参考手册,但如果想要全面系统地了解 Seata 框架,建议查看 Seata 的官方文档以及 GitHub

    二、Seata 简介

    看到博主这篇博客的胖友,想必大多都是从业 n 年的同仁,所以一些基础概念的介绍和铺垫这里就省略了,而只介绍博主认为最核心的部分。

    2.1、Seata 是什么?

    • Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

    2.2、Seata 的整体架构

    2.2.1、主要角色

    • TC (Transaction Coordinator) - 事务协调者
      • 维护全局和分支事务的状态,驱动全局事务提交或回滚。
    • TM (Transaction Manager) - 事务管理器
      • 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
    • RM (Resource Manager) - 资源管理器
      • 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
    • 事务回话信息存储
      • 非必须角色,理论上可以不引入额外组件。
      • 事务会话信息存储方式有:file本地文件(不支持HA),db数据库|redis(支持HA)
      • 但从生产实践角度来看,这个组件也是必须的,博主使用的是 MySQL
    • 注册中心
      • 非必须角色,理论上可以不引入额外组件。
      • 默认file,支持file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
      • 但从生产实践角度来看,这个组件也是必须的,博主使用的是 zk.
    • 配置中心
      • 非必须角色,理论上可以不引入额外组件。
      • 默认file,支持file、nacos 、apollo、zk、consul、etcd3、custom
      • 从生产实践角度来看,这个组件也是必须的,博主使用的是 zk.

    2.2.2、整体架构和工作流程图

    • 整体架构图

      • 在这里插入图片描述
    • 工作流程图

      • 在这里插入图片描述

    2.3、Seata 的事务模式

    • AT
    • TCC
    • SAGA
    • XA 事务模式

    2.3.1、Seata 的 AT 事务模式

    • 前提
      • 基于支持本地 ACID 事务的关系型数据库。
      • Java 应用,通过 JDBC 访问数据库。
    • 整体机制
      • 两阶段提交协议的演变:
        • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
        • 二阶段:
          • 提交异步化,非常快速地完成。
          • 回滚通过一阶段的回滚日志进行反向补偿。
    • 写隔离
      • 一阶段本地事务提交前,需要确保先拿到 全局锁 。
      • 拿不到 全局锁 ,不能提交本地事务。
      • 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁
    • 读隔离
      • 在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)
      • 如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

    2.4、微服务框架支持

    • 从上面的 2.2.2、整体架构和工作流程图 的工作流程图中可以看到,整个事务过程中,需要有一个全局的事务ID,并且需要把这个全局事务ID在各个微服务间传播。

    2.4.1、事务上下文

    • Seata 的事务上下文由 RootContext 来管理。
    • 应用开启一个全局事务后,RootContext 会自动绑定该事务的 XID,事务结束(提交或回滚完成),RootContext 会自动解绑 XID。
    • 应用可以通过 RootContext 的 API 接口(RootContext.getXID())来获取当前运行时的全局事务 XID
    • 应用是否运行在一个全局事务的上下文中,就是通过 RootContext 是否绑定 XID 来判定的

    2.4.2、事务传播

    • 服务内部的事务传播
      • 默认的,RootContext 的实现是基于 ThreadLocal 的,即 XID 绑定在当前线程上下文中
      • 所以服务内部的 XID 传播通常是天然的通过同一个线程的调用链路串连起来的。默认不做任何处理,事务的上下文就是传播下去的
      • 如果希望挂起事务上下文,则需要通过 RootContext 提供的 API 来实现:
        •   // 挂起(暂停)
            String xid = RootContext.unbind();
          
            // TODO: 运行在全局事务外的业务逻辑
          
            // 恢复全局事务上下文
            RootContext.bind(xid);
          
    • 跨服务调用的事务传播
      • 跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。
      • 只要能做到这点,理论上 Seata 可以支持任意的微服务框架。

    2.5、ORM 框架支持

    • Seata 虽然是保证数据一致性的组件,但对于 ORM 框架并没有特殊的要求,像主流的Mybatis,Mybatis-Plus,Spring Data JPA, Hibernate等都支持。这是因为ORM框架位于JDBC结构的上层,而 Seata 的 AT,XA 事务模式是对 JDBC 标准接口操作的拦截和增强

    2.6、数据库类型支持

    • AT模式支持的数据库有:MySQL、Oracle、PostgreSQL和 TiDB
    • TCC模式不依赖数据源(1.4.2版本)

    2.7、SQL 参考

    • 完整 SQL 参考传送阵
    • 需要特别注意它的使用限制:
      • 不支持 SQL 嵌套
      • 不支持多表复杂 SQL
      • 不支持存储过程、触发器
      • 不支持批量更新 SQL

    三、Seata 通用接入流程

    3.1、Seata Server 端(TC)

    1. 下载程序包并解压
      1. 当前最新版本:seata-server-1.4.2
    2. 配置事务回话信息存储方式
      1. 配置文件:seata-server-1.4.2\conf\file.conf
      2. 事务会话信息存储方式有:file本地文件(不支持HA),db数据库|redis(支持HA),这里配置为 db.
        1. ## transaction log store, only used in seata-server
          store {
          ## store mode: file、db、redis
          mode = "db"
          ## rsa decryption public key
          publicKey = ""
          
          
          ## database store property
           db {
            ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) 	etc.
              datasource = "druid"
              ## mysql/oracle/postgresql/h2/oceanbase etc.
              dbType = "mysql"
              driverClassName = "com.mysql.cj.jdbc.Driver"
              ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
              url = "jdbc:mysql://127.0.0.1:3306/seata?serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true"
              user = "root"
              password = "Andy6666"
              minConn = 5
              maxConn = 100
              globalTable = "global_table"
              branchTable = "branch_table"
              lockTable = "lock_table"
              queryLimit = 100
              maxWait = 5000
            }
          
          }
          
          
      3. 在 MySQL 数据库创建事务回话信息表
        1. -- -------------------------------- The script used when storeMode is 'db' --------------------------------
          -- the table to store GlobalSession data
          CREATE TABLE IF NOT EXISTS `global_table`
          (
          	`xid`                       VARCHAR(128) NOT NULL,
          	`transaction_id`            BIGINT,
          	`status`                    TINYINT      NOT NULL,
          	`application_id`            VARCHAR(32),
          	`transaction_service_group` VARCHAR(32),
          	`transaction_name`          VARCHAR(128),
          	`timeout`                   INT,
          	`begin_time`                BIGINT,
          	`application_data`          VARCHAR(2000),
          	`gmt_create`                DATETIME,
          	`gmt_modified`              DATETIME,
          	PRIMARY KEY (`xid`),
          	KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
          	KEY `idx_transaction_id` (`transaction_id`)
          ) ENGINE = InnoDB
          DEFAULT CHARSET = utf8;
          
          -- the table to store BranchSession data
          CREATE TABLE IF NOT EXISTS `branch_table`
          (
          	`branch_id`         BIGINT       NOT NULL,
          	`xid`               VARCHAR(128) NOT NULL,
          	`transaction_id`    BIGINT,
          	`resource_group_id` VARCHAR(32),
          	`resource_id`       VARCHAR(256),
          	`branch_type`       VARCHAR(8),
          	`status`            TINYINT,
          	`client_id`         VARCHAR(64),
          	`application_data`  VARCHAR(2000),
          	`gmt_create`        DATETIME(6),
          	`gmt_modified`      DATETIME(6),
          	PRIMARY KEY (`branch_id`),
          	KEY `idx_xid` (`xid`)
          ) ENGINE = InnoDB
          DEFAULT CHARSET = utf8;
          
          -- the table to store lock data
          CREATE TABLE IF NOT EXISTS `lock_table`
          (
          	`row_key`        VARCHAR(128) NOT NULL,
          	`xid`            VARCHAR(128),
          	`transaction_id` BIGINT,
          	`branch_id`      BIGINT       NOT NULL,
          	`resource_id`    VARCHAR(256),
          	`table_name`     VARCHAR(32),
          	`pk`             VARCHAR(36),
          	`gmt_create`     DATETIME,
          	`gmt_modified`   DATETIME,
          	PRIMARY KEY (`row_key`),
          	KEY `idx_branch_id` (`branch_id`)
          ) ENGINE = InnoDB
          DEFAULT CHARSET = utf8;
          
          
    3. 配置注册中心和配置中心
      1. 配置文件:seata-server-1.4.2\conf\registry.conf。这里博主把注册中心和配置中心都设置为 zk(胖友们可以根据情况设置成其他的,配置大同小异)。
        1.  registry {
           # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
           type = "zk"
           
           zk {
           	cluster = "default"
           	serverAddr = "127.0.0.1:2181"
           	sessionTimeout = 6000
           	connectTimeout = 2000
           	username = ""
           	password = ""
           }
           
           }
           
           config {
           # file、nacos 、apollo、zk、consul、etcd3
           type = "zk"
           
           zk {
           	cluster = "default"
           	serverAddr = "127.0.0.1:2181"
           	sessionTimeout = 6000
           	connectTimeout = 2000
           	username = ""
           	password = ""
           }
           
           }
          
          
    4. 启动 Server 端
      1. 点击 seata-server-1.4.2\bin\seata-server.bat(或 seata-server.sh)

    3.2、Seata Client 端(TM和RM)

    3.2.1、业务系统集成 Seata Client

    1. 业务数据库创建回滚日志表
      1.  -- for AT mode you must to init this sql for you business database. the seata server not need it.
         CREATE TABLE IF NOT EXISTS `undo_log`
         (
         	`branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
         	`xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
         	`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
         	`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
         	`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
         	`log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
         	`log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
         	UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
         ) ENGINE = InnoDB
         AUTO_INCREMENT = 1
         DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
        
        
    2. 业务系统添加 seata 依赖
      1.  <dependency>
             <groupId>io.seata</groupId>
             <artifactId>seata-spring-boot-starter</artifactId>
             <version>1.3.0</version>
         </dependency>
        
    3. 业务系统配置文件中配置注册中心和配置中心、事务分组名称
      1. 这里注册中心和配置中心都选择 zk(以 account-service 为例)
      2.  seata:
         	registry:
         		type: zk
         		zk:
         		server-addr: localhost:2181
         	config:
         		type: zk
         		zk:
         		server-addr: localhost:2181
         	txServiceGroup:  account-service-g
        
    4. 在 Seata 配置中心配置事务分组和TC集群的关联关系
      1. 在 zk 添加以下配置(其他配置中心大同小异):
        1. 节点名称:/seata/service.vgroupMapping.account-service-g,节点内容:default
          1. 其中 account-service-g 是事务分组名称
          2. default 是 TC 集群名称
          3. 即配置所有事务分组名称为 account-service-g 的 TM 和 RM 注册到集群名称为 default 的 TC 集群

    3.2.2、不同微服务框架解决 Seata 事务传播问题

    3.2.2.1、事务传播的原理

    • 2.4.2、事务传播 可以知道,跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。

    3.2.2.2、事务传播的解决方案

    • 远程服务调用方:
      • 发起远程服务调用时,需要把全局事务XID包含到请求信息中
    • 远程服务提供方:
      • 处理请求前,解析获取 XID 并绑定到 RootContext 中
      • 处理请求后,将 XID 从 RootContext 中解绑

    3.2.2.3、常用微服务框架的事务传播问题

    (1)、Seata + Dubbo 分布式事务
    • 如何实现 Dubbo 请求的前置和后置处理呢? 熟悉 Dubbo 的胖友可能就会说,扩展它的 Filter。
    • 没错,我们就是要扩展 Dubbo 的 Filter,在服务调用方发起请求时,设置全局事务 XID;在服务提供方处理请求前,解析获取XID,并绑定到 RootContext,处理完请求后,清理 XID。
    • 幸运的是,Seata 框架中已经默认提供了这样的 Filter,它基于 SPI 机制自动注册,项目中也可以通过 ServiceLoader.load 查看。
    • 也就是说,Seata 天然支持 Dubbo 框架的事务传播,我们什么都不需要做。
    (2)、Seata + Spring Cloud OpenFeign 分布式事务
    • Spring Cloud OpenFeign 是基于 Http 协议的微服务框架。远程服务调用方需要在 Header 中设置全局事务XID;远程服务提供方在处理请求前需要从 Http Header 中获取全局事务XID 并绑定到 RootContext,处理完请求后清理 XID。
    • 推荐的做法:
      • 远程服务调用方:通过 feign 提供的 RequestInterceptor 设置 Header。但由于 OpenFeign 默认集成了 Hystrix,通信时使用的是异步多线程通信,所以还需要自定义配置 HystrixConcurrencyStrategy,处理线程间全局事务XID传递的问题。
        • 配置 HystrixConcurrencyStrategy:
          •   @Slf4j
              @Configuration
              public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
              
              	private HystrixConcurrencyStrategy delegate;
              
              	public FeignHystrixConcurrencyStrategy() {
              		try {
              			this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
              			if (this.delegate instanceof FeignHystrixConcurrencyStrategy) {
              				return;
              			}
              			HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
              					.getInstance().getCommandExecutionHook();
              			HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
              					.getEventNotifier();
              			HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
              					.getMetricsPublisher();
              			HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
              					.getPropertiesStrategy();
              
              
              			HystrixPlugins.reset();
              			HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
              			HystrixPlugins.getInstance()
              					.registerCommandExecutionHook(commandExecutionHook);
              			HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
              			HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
              			HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
              		}
              		catch (Exception ex) {
              			log.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
              		}
              	}
              
              	@Override
              	public <T> Callable<T> wrapCallable(Callable<T> callable) {
              		if (callable instanceof SeataContextCallable) {
              			return callable;
              		}
              		return new SeataContextCallable<>(callable,
              				RequestContextHolder.getRequestAttributes());
              	}
              
              	private static class SeataContextCallable<K> implements Callable<K> {
              
              		private final Callable<K> actual;
              
              		private final String xid;
              
              		private final RequestAttributes requestAttributes;
              
              		SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
              			this.actual = actual;
              			this.requestAttributes = requestAttribute;
              			this.xid = RootContext.getXID();
              		}
              
              		@Override
              		public K call() throws Exception {
              			try {
              				RequestContextHolder.setRequestAttributes(requestAttributes);
              				if (!StringUtils.isEmpty(xid)) {
              					RootContext.bind(xid);
              				}
              				return actual.call();
              			}
              			finally {
              				if (!StringUtils.isEmpty(xid)) {
              					RootContext.unbind();
              				}
              				RequestContextHolder.resetRequestAttributes();
              			}
              		}
              
              	}
              
              }
            
        • 添加 feign RequestInterceptor 设置 Header
          •   @Slf4j
              public class FeignBasicAuthRequestInterceptor implements RequestInterceptor {
              
              	@Override
              	public void apply(RequestTemplate template) {
              		try {
              			// 支持 seata 事务传播
              			String xid = RootContext.getXID();
              			if (!StringUtils.isEmpty(xid)) {
              				template.header(RootContext.KEY_XID, xid);
              			}
              		} catch (Exception e) {
              			log.error("FeignBasicAuthRequestInterceptor apply fail。", e);
              		}
              
              	}
              }
            
            
      • 远程服务调用方:基于 WEB 拦截器做前置、后置处理。例如,基于 Spring Web 项目,可以通过 HandlerInterceptor 实现:
        •   public class SeataHandlerInterceptor implements HandlerInterceptor {
            
            	private static final Logger log = LoggerFactory
            			.getLogger(SeataHandlerInterceptor.class);
            
            	@Override
            	public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
            			Object handler) {
            		String xid = RootContext.getXID();
            		String rpcXid = request.getHeader(RootContext.KEY_XID);
            		if (log.isDebugEnabled()) {
            			log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
            		}
            
            		if (StringUtils.isBlank(xid) && rpcXid != null) {
            			RootContext.bind(rpcXid);
            			if (log.isDebugEnabled()) {
            				log.debug("bind {} to RootContext", rpcXid);
            			}
            		}
            
            		return true;
            	}
            
            	@Override
            	public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
            
            	}
            
            	@Override
            	public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
            			Object handler, Exception e) {
            		if (StringUtils.isNotBlank(RootContext.getXID())) {
            			String rpcXid = request.getHeader(RootContext.KEY_XID);
            
            			if (StringUtils.isEmpty(rpcXid)) {
            				return;
            			}
            
            			String unbindXid = RootContext.unbind();
            			if (log.isDebugEnabled()) {
            				log.debug("unbind {} from RootContext", unbindXid);
            			}
            			if (!rpcXid.equalsIgnoreCase(unbindXid)) {
            				log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
            				if (unbindXid != null) {
            					RootContext.bind(unbindXid);
            					log.warn("bind {} back to RootContext", unbindXid);
            				}
            			}
            		}
            	}
            
            }
          
    (3)、Seata + RestTemplate 分布式事务
    • RestTemplate 同样基于 Http 协议,处理方式和 Spring Cloud OpenFeign 类似。推荐的做法:
      • 远程服务调用方:
        • 定义拦截器:
          •   public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
              
              	@Override
              	public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
              			ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
              		HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
              
              		String xid = RootContext.getXID();
              
              		if (!StringUtils.isEmpty(xid)) {
              			requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
              		}
              		return clientHttpRequestExecution.execute(requestWrapper, bytes);
              	}
              
              }
            
        • 给 RestTemplate 设置拦截器
          • restTemplate.setInterceptors(Collections.singletonList(new SeataRestTemplateInterceptor()));
      • 远程服务提供方:基于 WEB 拦截器做前置、后置处理。处理方式完全和 Spring Cloud OpenFeign一样,即可以复用上面的 SeataHandlerInterceptor
    (3)、Seata + 其他微服务框架
    • 实现思路参考:3.2.2.2、事务传播的解决方案
    • 在动手实现前,先查看 seata 包中是否已经默认提供相关方案。例如,motan、grpc 框架也享受和 Dubbo 框架同样的待遇,Seata 同样也默认提供了一个拥有传播事务XID的过滤器。
    • 实在找不到现成方案,我想根据上面的思路,自己实现一套事务传播方案,相信对于各位能看到这里的胖友来说,应该是小意思。

    四、Seata 分布式事务使用演示

    • 创建 3 个项目,order-service、storage-service、account-service

    • order-service 开启事务,并通过 Spring Cloud OpenFeign 调用 storage-service 服务。

      • 在这里插入图片描述
    • storage-service 通过 RestTemplate 调用 account-service 提供的服务。

      • 在这里插入图片描述
    • account-service 提供的服务。

      • 在这里插入图片描述
    • 通过 postman 调用 order-service 的接口,可以分别看到以下日志:

      • 在这里插入图片描述

      • 在这里插入图片描述

      • 在这里插入图片描述

    • 通过日志,可以看到全局事务最终是提交的状态。而要使全局事务的回滚,只需要让其中一个分支事务失败即可,这里就不演示了。

    五、Seata 工作原理简单分析

    5.1、工作流程

    这里使用 SkyWalking 进行链路追踪,给大家展示 四、Seata 分布式事务使用演示 的完整流程

    1. 完整链路追踪:

      1. 在这里插入图片描述
    2. 对 Postman 请求的链路追踪

      1. order-service

        1. 在这里插入图片描述
      2. storage-service

        1. 在这里插入图片描述
      3. account-service

        1. 在这里插入图片描述
    3. 全局事务提交后,异步删除 undo_log

      1. 在这里插入图片描述

    5.2、主要的类

    • 2.5、ORM 框架支持 ,我们知道:
      • Seata 虽然是保证数据一致性的组件,但对于 ORM 框架并没有特殊的要求,像主流的Mybatis,Mybatis-Plus,Spring Data JPA, Hibernate等都支持。这是因为ORM框架位于JDBC结构的上层,而 Seata 的 AT,XA 事务模式是对 JDBC 标准接口操作的拦截和增强。
    • Seata 是如何对 JDBC 标准接口操作的拦截和增强???答案在 DataSourceProxy。
      • 它对 DataSource 进行了增强
      • 它的 getConnection 方法返回的是:ConnectionProxy
        • ConnectionProxy 对 Connection 进行了增强
        • 它的 prepareStatement 方法返回的是同样是一个 Proxy 代理类
    • 从 DataSource 开始,Seata 对 JDBC 的标准接口进行了层层增强。而具体每一个增强类实现做了哪些事情,本博客就不深入探讨了。
    • 已经有不少关于 DataSourceProxy 源码解读的博客,各位胖友们可以移步观看。
    展开全文
    liqing0013 2021-08-21 13:50:17
  • qq_29569183 2020-12-23 16:26:36
  • m0_60491538 2021-11-18 17:06:30
  • fyj13925475957 2020-05-11 16:19:34
  • weixin_35702787 2021-02-28 18:59:53
  • 3.74MB u012684638 2018-11-30 19:41:56
  • wwd0501 2020-09-02 18:23:20
  • weixin_39787826 2020-12-21 12:09:42
  • www1056481167 2021-02-05 17:07:33
  • qq_35890572 2020-05-27 14:04:55
  • u010046908 2019-10-22 16:44:57
  • d960704119 2021-07-14 17:17:51
  • weixin_41263382 2020-07-22 11:08:10
  • weixin_37512224 2020-10-12 21:48:42
  • weixin_34130389 2019-04-19 09:14:42
  • weixin_45538722 2021-03-22 19:31:06
  • qq_37640410 2021-04-06 11:39:14
  • hosaos 2019-04-23 20:56:51

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,420
精华内容 1,368
关键字:

seata分布式事务原理

友情链接: Source.zip