精华内容
下载资源
问答
  • Kafka的幂等性与事务性理解
    千次阅读
    2020-09-08 12:53:27

        最近在深入理解Flink的Exactly-Once,发现Flink Checkpoint只能保障Flink程序内部的一致性,无法保证Sink到外部系统的Exactly-Once语义。但是Sink到外部如果实现了TwoPhaseCommitSinkFunction这个抽象类就能实现端到端的Exactly-Once语义,而Kafka刚好也实现了这个这个类,所以先来研究下Kafka的Exactly-Once是怎么实现的。

        在Producer向Kafka发送消息的时候,如果消息成功被写入到日志文件里面,消息就不会丢失了(多副本机制)。但是如果中间发生网络中断等异常,Producer没有接受到Ack消息,无法判断消息是否已经提交了,此时选择重复发送消息到Kafka,这样就会造成数据的重复写入(At Least Once)。因此在0.11.0.0版本中,Kafka引入了幂等性和事务性,以此来实现Exactly-Once语义。

     

    Kafka幂等性:

    幂等,就是指多接口的多次调用所产生的结果和只调用一次是一致的。没有幂等性的情况下就会重复发送数据,如下图所示:

        Kafka的幂等性机制能保证单个分区不会重复写入数据,而实现幂等性的核心就是引入了producer id 和 sequence number这两个概念。

        Kafka内部会自动为每个Producer分配一个producer id(PID),broker端会为producer每个Partition维护一个<PID,Partition> -> sequence number映射。sequence number时从0开始单调递增的。

    对于新接受到的消息,broker端会进行如下判断:

    1. 如果新消息的sequence number正好是broker端维护的<PID,Partition> -> sequence number大1,说broker会接受处理这条消息。
    2. 如果新消息的sequence number比broker端维护的sequence number要小,说明时重复消息,broker可以将其直接丢弃
    3. 如果新消息的sequence number比broker端维护的sequence number要大过1,说明中间存在了丢数据的情况,那么会响应该情况,对应的Producer会抛出OutOfOrderSequenceException。

     

    Producer如何开启幂等性:

        Properties.put(“enable.idempotence”,true);

        如果使用的深入的话,还需要修改下producer端的retries、acks、max.in.flignt.requests.per.connection这几个参数,当然默认值已经能够应付大部分情况了,也不太建议修改。

     

    Kafka事务性:

        Kafka事务性主要是为了解决幂等性无法跨Partition运作的问题,事务性提供了多个Partition写入的原子性,即写入多个Partition要么全部成功,要么全部失败,不会出现部分成功部分失败这种情况。Flink正是基于Kafka的事务性,实现了端到端的Exactly Once语义 (ps: 当然FLink配合其他系统也可以实现Exactly Once语义,比如参考链接中的MySQL,我这里想说明的是source和sink都是Kafka的情况下,可以做到端到端Exactly Once)。

    在幂等性<producer id, Partition> -> sequence number的基础之上,Kafka还引入了如下这些角色来实现事务性:
        1.    TransactionalId 
        2.    _transaction_state(Topic)
        3.    Producer epoch
        4.    ControlBatch (又叫Control Message、Transaction Marker) 
        5.    TransactionCoordinator

        TransactionalId是需要用户显示进行设置的,用于唯一标识某个producer。这里为啥要重新引入一个TransactionalId而不是使用幂等性中引入的producer id呢?
        这是因为producer id 在producer每次重启的时候都会更新为一个新值,如果没有TransactionalId,那么producer在Fail恢复后就不能abort上次未完成的事务了。TransactionalId可以唯一标识一个事务操作,便于这个事务的所有操作都能在用一个TransactionCoordinator(负责事务真正执行的Kafka内部角色) 上进行处理。
        
        事务日志记录在_transaction_state Topic中。TransactionCoordinator如果发生异常进行恢复或者新选举时,可以通过读取_transaction_state 中的事务日志信息,来恢复其状态信息。

        Producer epoch配合TransactionalId用于唯一标识最新的那个producer,它是一个单调递增的值,在每次初始化事务的时候递增(在KafkaProducer.initTransactions()方法中,每个producer通过transactionId获取producer id的时候同时获取到这个值)。它的作用如下,如果有两个producer使用了相同的transactionId,那么比较旧的那个producer会抛出异常,避免事务干扰。

        ControlBatch是producer产生的并写入到Topic的特殊消息,ControlBatch一共有两种类型:COMMIT和ABORT,分别表示事务已经成功提交或者被成功中止。

        Producer 在持久化数据时跟之前一样,按照条件持久化到硬盘(数据会有一个标识,标识这条或这批数据是不是事务写入的数据),当收到 Transaction Marker 时,把这个 ControlBatch(Transaction Marker)数据也直接写入这个 Partition 中,这样在处理 Consumer 消费时,就可以根据 ControlBatch信息做相应的处理。真正执行时,Producer 只需要告诉 TransactionCoordinator 当前事务可以 commit,然后再由 TransactionCoordinator 来向其涉及到的 Topic-Partition 的 leader 发送 Transaction Marker 数据,这里减轻了 Client 的压力,而且 TransactionCoordinator 会做一些优化,如果这个目标 Broker 涉及到多个事务操作,是可以共享这个 TCP 连接的;

        对于Consumer来说ControlBatch是不可见,它们是用来让broker告知consumer之前拉取的消息是否被原子性提交,如下如所示:

    在这里插入图片描述

     

        TransactionCoordinator 是每个producer在broker端都会对应分配到的这么一个角色,负责事务的真正执行,并跟进记录事务当前所处的状态。

     

    Producer如何开启事务:

    程序必须给producer显式指定唯一的transactionId:

        properties.put("transactionId"," ***id*** ")

        程序默认会将幂等性设置为true,无需用户手动设置。

    同时Consumer端要注意设置一个与事务性相关的参数:

        isolation.level    默认值为read_uncommitted,意思是Consumer应用可以消费到未提交的事务。如果设置成 read_committed,Consumer应用就不能消费到未提交的事务。

        同时要将enable.auto.commit参数设置为false

    producer事务使用代码示例如下:

    /**

         * 在一个事务内,即有生产消息又有消费消息

         */

        public void consumeTransferProduce() {

            // 0. 构建生产者属性

            Properties pro = new Properties()

            pro.put(...key序列化设定...);

            pro.put(...value序列化设定...);

            pro.put("transactionId"," ***id*** ")

            // 1.构建生产者

            Producer producer = new Producer(properties);

            // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作

            producer.initTransactions();

            while (true) {

                // 3.开启事务

                producer.beginTransaction();

                List<String> records = readCsvFile(path);

                try {

                    for (String record : records) {

                          producer.send(new ProducerRecord<String, String>("test",record ));

                    }

                    // 4.事务提交

                    producer.commitTransaction();

                } catch (Exception e) {

                    // 4.放弃事务

                    producer.abortTransaction();

                }

            }

        }

     

    TransactionCoordinator 执行事务操作时,整体流程如下图所示:

     

    1. 查找Tranaction Corordinator

        Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址,返回对应的node_id,host,port信息。

    2. 初始化事务 initTransaction

        Producer发送InitpidRequest给Transaction Coordinator,获取pid。同时Transaction Coordinator会在Transaciton Log中记录这<TransactionId,pid>的映射关系这一消息。同时消息内部还包含了transactio_status事务状态信息:Empty/Ongoing/PrepareCommit/PrepareAbort/CompleteCommit/CompleteAbort/Dead.

        另外,它还会做两件事:

    • 恢复(Commit或Abort)之前的Producer未完成的事务
    • 对PID对应的epoch进行递增,保证producer事务的唯一性。

    只要开启了幂等特性即必须执行InitpidRequest,而无须考虑该Producer是否开启了事务特性。

    3. 开始事务beginTransaction

        执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。这个操作并没有通知Transaction Coordinator,因为Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

    4. read-process-write流程

        一旦Producer开始发送消息,Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为开始。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。

        在注册<Transaction, Topic, Partition>到Transaction Log后,生产者发送数据,虽然没有还没有执行commit或者abort,但是此时消息已经保存到Broker上了。即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。

    5. 事务提交或终结 commitTransaction/abortTransaction

    在Producer执行commitTransaction/abortTransaction时,Transaction Coordinator会执行一个二阶段提交:

    • 第一阶段,将Transaction Log内的该事务状态设置为PREPARE_COMMITPREPARE_ABORT
    • 第二阶段,将Transaction Marker写入该事务涉及到的所有消息(即将消息标记为committedaborted)。这一步骤Transaction Coordinator会发送给当前事务涉及到的每个<Topic, Partition>的Leader,Broker收到该请求后,会将对应的Transaction Marker控制信息写入日志。

        一旦Transaction Marker写入完成,Transaction Coordinator会将最终的COMPLETE_COMMITCOMPLETE_ABORT状态写入Transaction Log中以标明该事务结束。

     

        如上所述的事务性是能保证单个Producer的事务性。但是在Flink中会创建多个Producer,这个时候配合Flink中的Checkpoint机制,能保证多个Producer的事务性,实现端到端的Exactly-Once语义
        虽然Producer端保证了事务性,但是Consumer端很难保证一个已经commit的事务所有msg都会被消费,有以下几个原因:
            1. 对于 compacted topic,在一个事务中写入的数据可能会被新的值覆盖;
            2. 一个事务内的数据,可能会跨多个 log segment,如果旧的 segmeng 数据由于过期而被清除,那么这个事务的一部分数据就无法被消费到了;
            3. Consumer 在消费时可以通过 seek 机制,随机从一个位置开始消费,这也会导致一个事务内的部分数据无法消费;
            4. Consumer 可能没有订阅这个事务涉及的全部 Partition。

     

        对于一个 Partition 而言,offset 小于 LSO (Last Stable Offset)的数据,全都是已经确定的数据,这个主要是对于事务操作而言,在这个 offset 之前的事务操作都是已经完成的事务(已经 commit 或 abort),如果这个 Partition 没有涉及到事务数据,那么 LSO 就是其 HW(水位)。多个producer同时提交事务时,可能会出现先提交的事务未完成,但是事务已经完成的情况,此时后提交的事务的数据也不能被读取到,因为LSO结合事务时,保证之前的数据都是已提交的事务,所以此时会造成数据被消费的延迟,所以在实际的生产场景中,尽量避免 long transaction 这种操作,而且 long transaction可能也会容易触发事务超时

     

    参考:

        https://www.cnblogs.com/smartloli/p/11922639.html (Kafka幂等性图解)

        https://blog.csdn.net/u011669700/article/details/80000744 (Kafka部分参数介绍)

        https://www.jianshu.com/p/5bdd9a0d7d02 (Flink写入MySQL实现Exactly-Once)

        https://blog.51cto.com/simplelife/2401411(Flink的二阶段提交)
        https://www.cnblogs.com/createweb/p/11971846.html (Flink Exactly Once写入Kafka出错)

        https://www.jianshu.com/p/64c93065473e(Kafka事务流程图)

        https://blog.csdn.net/mlljava1111/article/details/81180351(事务 java Demo)

        https://blog.csdn.net/muyimo/article/details/91439222(Kafka事务性分析)

        https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
        https://www.confluent.io/blog/transactions-apache-kafka/

    更多相关内容
  • 最大努力通知事务主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致,比如充值平台与运营商、支付对接、商户通知等等跨平台、跨企业的系统间业务交互场景;而事务...
  • 在前面的文章中我们提到了柔性事务遵循BASE理论,满足最终一致,柔性事务主要分为补偿型和通知型。补偿型事务又分TCC、Saga,通知事务事务消息、最大努力通知型。补偿型事务都是同步的,通知事务都是异步的...

    目录

    【前言】

    TCC

    Saga

     【通知型事务】

    本地消息表

    MQ事务消息

    最大努力通知

    总结


    分布式事务:

    分布式事务(一)、CAP,BASE理论

    分布式事务(二)、刚性事务之 2PC、3PC


    【前言】

    在前面的文章中我们提到了柔性事务遵循BASE理论,满足最终一致性,柔性事务主要分为补偿型通知型。补偿型事务又分TCC、Saga,通知型事务分事务消息、最大努力通知型。补偿型事务都是同步的,通知型事务都是异步的。本篇介绍柔性事务的这几种实现方案:TCC、Saga、事务消息、本地消息表、最大努力通知。

     

    TCC

    TCC(Try-Confirm-Cancel)采用补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。TCC模型通过对业务逻辑的分解来实现分布式事务:

    1. Try:尝试执行业务,完成所有业务检查(一致性),预留必要的业务资源(准隔离性)。
    2. Confirm:确认执行业务,不再做业务检查。只使用Try阶段预留的业务资源,Confirm操作满足幂等性。
    3. Cancel: 若业务执行失败,则取消执行业务并释放Try阶段预留的业务资源。

    TCC分布式事务模型包括如下三部分:

    • 主业务服务:负责发起并完成整个业务活动。
    • 从业务服务:是整个业务活动的参与方,实现 Try、Confirm、Cancel 操作,供主业务服务调用。
    • 事务管理器:管理整个业务活动,包括记录事务状态,调用从业务服务的 Confirm/Cancel 操作等。

    TCC对比2PC

    可以看出,TCC也是把事务分成了两个阶段,Try是阶段一,Confirm 和 Cancel 是阶段二的两个分支。这有点像2PC(二阶段提交),但其实他们是不一样的,下面是他们的区别:

    1. 2PC和3PC都是数据库层面的操作,对于开发人员无感知;而TCC是业务层的操作,对开发人员来说具有较高的开发成本。
    2. 2PC是一个整体的长事务,是刚性事务;而TCC是一组本地短事务,是柔性事务;
    3. 2PC是全局锁定资源,所有参与者阻塞等待事务管理器的通知;而TCC的资源锁定在于Try操作,业务方可以灵活选择业务资源的锁定粒度。

    ​​​​​TCC优缺点

    优点:

    • 应用可以自定义数据操作的粒度,降低了锁冲突,提升吞吐量。

    缺点: 

    • 应用侵入性强, Try、Confirm、Cancel 三个阶段都需要业务逻辑实现。
    • 需要根据网络、系统故障等不同失败原因实现不同的回滚策略, 实现难度大,一般借助 TCC 开源框架,ByteTCC,TCC-transaction,Himly。

    Saga

    Saga模型是把一个分布式事务拆分为多个本地事务,每个本地事务都有相应的执行模块和补偿模块(对应TCC中的Confirm和Cancel),当Saga事务中任意一个本地事务出错时,可以通过调用相关的补偿方法恢复之前的事务,达到事务最终一致性。

    Saga 模型由三部分组成

    • LLT(Long Live Transaction):由一个个本地事务组成的事务链
    • 本地事务:事务链由一个个子事务(本地事务)组成,LLT = T1+T2+T3+...+Ti。
    • 补偿:每个本地事务 Ti 有对应的补偿 Ci。

    Saga 的执行顺序

    • 正常情况: T1,T2,T3,...,Ti
    • 异常情况: T1,T2,T3,..Ti,Ci,...C3,C2,C1

    Saga 两种恢复策略

    • 向后恢复(Backward Recovery):撤销掉之前所有成功子事务。如果任意本地子事务失败,则补偿已完成的事务。如异常情况的执行顺序T1,T2,T3,..Ti,Ci,...C3,C2,C1。
    • 向前恢复(Forward Recovery):即重试失败的事务,适用于必须要成功的场景,该情况下不需要Ci。执行顺序:T1,T2,...,Tj(失败),Tj(重试),...,Ti。

    Saga 模型可以满足事务的三个特性ACD

    • 原子性:Saga 协调器协调事务链中的本地事务要么全部提交,要么全部回滚。
    • 一致性:Saga 事务可以实现最终一致性。
    • 持久性:基于本地事务,所以这个特性可以很好实现。

    Saga缺乏隔离性会带来脏读,幻读,不可重复读的问题。由于Saga 事务和 TCC 事务一样,都是强依靠业务改造,因此需要在业务设计上去解决这个问题:

    • 在应⽤层⾯加⼊逻辑锁的逻辑。
    • Session 层⾯隔离来保证串⾏化操作。
    • 业务层⾯采⽤预先冻结数据的方式隔离此部分数据。
    • 业务操作过程中通过及时读取当前状态的⽅式获取更新。

     实现Saga的注意事项

    1. Ti和Ci必须是幂等的。如向后恢复和向前恢复时候如果不是幂等操作会导致数据不一致。
    2. Ci必须是能够成功的,如果无法成功则需要人工介入。
    3. Ti->Ci和Ci->Ti的执行结果必须是一样的。

    Saga对比TCC

    Saga和TCC都是补偿型事务,他们的区别为:

    劣势:

    • 无法保证隔离性;

    优势:

    • 一阶段提交本地事务,无锁,高性能;
    • 事件驱动模式,参与者可异步执行,高吞吐;
    • Saga 对业务侵入较小,只需要提供一个逆向操作的Cancel即可;而TCC需要对业务进行全局性的流程改造; 

     【通知型事务】

    上面提到柔性事务主要分为补偿型通知型,我们已经介绍了补偿型的TCC,Saga模型,下面继续介绍通知型事务。

    通知型事务的主流实现是通过MQ(消息队列)来通知其他事务参与者自己事务的执行状态,MQ组件的引入有效的将事务参与者进行解耦,各参与者都可以异步执行,所以通知型事务又被称为异步事务。通知型事务主要适用于那些需要异步更新数据,并且对数据的实时性要求较低的场景,主要包含事务消息最大努力通知事务两种。

    事务消息:主要适用于内部系统的数据最终一致性保障,因为内部相对比较可控,如订单和购物车、收货与清算、支付与结算等等场景;

    最大努力通知:主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致性,比如充值平台与运营商、支付对接等等跨网络系统级别对接;

    普通消息是无法解决本地事务执行和消息发送的一致性问题的。因为消息发送是一个网络通信的过程,发送消息的过程就有可能出现发送失败、或者超时的情况。超时有可能发送成功了,有可能发送失败了,消息的发送方是无法确定的,所以此时消息发送方无论是提交事务还是回滚事务,都有可能不一致性出现。因此通知型事务的难点在于投递消息和参与者自身本地事务的一致性保障。目前业界解决这个一致性的方案有两个分支:

    • 基于MQ自身的事务消息方案
    • 基于DB的本地消息表方案 

    本地消息表

    ​​​​​本地消息表最初由eBay 提出来解决分布式事务的问题。是目前业界使用的比较多的方案之一,它的核心思想就是将分布式事务拆分成本地事务进行处理。

    流程

    发送消息方:

    • 需要有一个消息表,记录着消息状态相关信息。
    • 业务数据和消息表在同一个数据库,要保证它俩在同一个本地事务。
    • 在本地事务中处理完业务数据和写消息表操作后,通过写消息到 MQ 消息队列。
    • 消息会发到消息消费方,如果发送失败,即进行重试。

    消息消费方:

    • 处理消息队列中的消息,完成自己的业务逻辑。
    • 如果本地事务处理成功,则表明已经处理成功了。
    • 如果本地事务处理失败,那么就会重试执行。
    • 如果是业务层面的失败,给消息生产方发送一个业务补偿消息,通知进行回滚等操作。

    生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

    本地消息表优缺点:

    优点:

    • 本地消息表建设成本比较低,实现了可靠消息的传递确保了分布式事务的最终一致性。

    缺点:

    • 本地消息表与业务耦合在一起,难于做成通用性,不可独立伸缩。
    • 本地消息表是基于数据库来做的,而数据库是要读写磁盘IO的,因此在高并发下是有性能瓶颈的

    MQ事务消息

    基于MQ的事务消息方案主要依靠MQ的半消息机制来实现投递消息和参与者自身本地事务的一致性保障。半消息机制实现原理其实借鉴的2PC的思路,是二阶段提交的广义拓展。

    半消息:在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知MQ发送;

    流程

    1. 事务发起方首先发送半消息到MQ;
    2. MQ通知发送方消息发送成功;
    3. 在发送半消息成功后执行本地事务;
    4. 根据本地事务执行结果返回commit或者是rollback;
    5. 如果消息是rollback, MQ将丢弃该消息不投递;如果是commit,MQ将会消息发送给消息订阅方;
    6. 订阅方根据消息执行本地事务;
    7. 订阅方执行本地事务成功后再从MQ中将该消息标记为已消费;
    8. 如果执行本地事务过程中,执行端挂掉,或者超时,MQ服务器端将不停的询问producer来获取事务状态;
    9. Consumer端的消费成功机制有MQ保证; 

    MQ事务消息对比本地消息表

    MQ事务消息:

    • 需要MQ支持半消息机制或者类似特性,在重复投递上具有比较好的去重处理;
    • 具有比较大的业务侵入性,需要业务方进行改造,提供对应的本地操作成功的回查功能;

    DB本地消息表:

    • 使用了数据库来存储事务消息,降低了对MQ的要求,但是增加了存储成本;
    • 事务消息使用了异步投递,增大了消息重复投递的可能性;

    最大努力通知

    最大努力通知方案的目标,就是发起通知方通过一定的机制,最大努力将业务处理结果通知到接收方。本质是通过引入定期校验机制实现最终一致性,对业务的侵入性较低,适合于对最终一致性敏感度比较低、业务链路较短的场景。

    最大努力通知解决方案:要实现最大努力通知,可以采用 MQ 的 ACK 机制。

    1. 业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
    2. 主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。
    3. 主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息。
    4. 业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。
    5. 如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。

    特点

    1. 用到的服务模式:可查询操作、幂等操作;
    2. 被动方的处理结果不影响主动方的处理结果;
    3. 适用于对业务最终一致性的时间敏感度低的系统;
    4. 适合跨企业的系统间的操作,或者企业内部比较独立的系统间的操作,比如银行通知、商户通知等;

    总结

    方案对比:

    属性2PCTCCSaga本地消息表事务消息尽最大努力通知
    事务一致性
    复杂性
    业务侵入性
    使用局限性
    性能
    维护成本

     

    希望本文对你有帮助,请点个赞鼓励一下作者吧~ 谢谢!

    展开全文
  • 点击上方蓝色字体,选择“设为星标”优质文章,及时送达本文来源:https://dwz.cn/730BLvt0上篇文章主要介绍了分布式事务解决方案之2PC、TCC,传送门,这篇文章将主要讲...

    点击上方蓝色字体,选择“设为星标”

    优质文章,及时送达

    本文来源:https://dwz.cn/730BLvt0

    上篇文章主要介绍了分布式事务解决方案之2PC、TCC,传送门,这篇文章将主要讲解分布式事务解决方案之可靠消息最终一致性、最大努力通知。

    • 分布式事务解决方案之可靠消息最终一致性

      • 什么是可靠消息最终一致性事务

      • 解决方案

        • 本地消息表方案

        • RocketMQ事务消息方案

    • 分布式事务解决方案之最大努力通知

      • 什么是最大努力通知

      • 解决方案

    • 分布式事务对比分析

    1.分布式事务解决方案之可靠消息最终一致性

    1.1 什么是可靠消息最终一致性事务

    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致

    此方案是利用消息中间件完成,如下图:

    事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件 之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事 务问题。

    因此可靠消息最终一致性方案要解决以下几个问题:

    1.本地事务与消息发送的原子性问题 本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实 现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最 终一致性方案的关键问题。

    先来尝试下这种操作,先发送消息,再操作数据库:

    begin transaction; 
        //1.发送MQ
        //2.数据库操作
    commit transation;
    

    这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。

    你立马想到第二种方案,先进行数据库操作,再发送消息:

    begin transaction;
        //1.数据库操作
        //2.发送MQ
    commit transation;
    

    这种情况下貌似没有问题,如果发送MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数 据库回滚,但MQ其实已经正常发送了,同样会导致不一致。

    2.事务参与方接收消息的可靠性

    事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

    3.消息重复消费的问题

    由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重 复消费。

    要解决消息重复消费的问题就要实现事务参与方的方法幂等性。

    1.2 解决方案

    1.2.1 本地消息表方案

    本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后 通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

    下面以注册送积分为例来说明:

    下例共有两个微服务交互,用户服务和积分服务,用户服务负责添加用户,积分服务负责增加积分。

    交互流程如下:

    1、用户注册 

    用户服务在本地事务新增用户和增加 ”积分消息日志“。(用户表和消息表通过本地事务保证一致) 下边是伪代码

    begin transaction; 
        //1.新增用户
        //2.存储积分消息日志
    commit transation;
    

    这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性

    2、定时任务扫描日志

    如何保证将消息发送给消息队列呢?

    经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息 中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。

    3、消费消息

    如何保证消费者一定能消费到消息呢?

    这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重 试向消费者来发送消息。

    积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复 投递此消息。

    由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性

    1.2.2 RocketMQ事务消息方案

    RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项 目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之 上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消 息,为分布式事务实现提供了便利性支持。

    RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的 设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系 统发生异常时依然能够保证达成事务的最终一致性。

    在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ 内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。

    执行流程如下:为方便理解我们还以注册送积分的例子来描述 整个流程。

    Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责 新增积分。

    1、Producer 发送事务消息

    Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注 意此时这条消息消费者(MQ订阅方)是无法消费到的。本例中,Producer 发送 ”增加积分消息“ 到MQ Server。

    2、MQ Server回应消息发送成功 MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

    3、Producer 执行本地事务 Producer 端执行业务代码逻辑,通过本地数据库事务控制 本例中,Producer 执行添加用户操作。

    4、消息投递 若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积 分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删 除”增加积分消息“ 。

    MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即 程序执行正常则自动回应ack。

    5、事务回查 如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer 来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

    以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此 只需关注本地事务的执行状态即可。

    RoacketMQ提供RocketMQLocalTransactionListener接口:

    public interface RocketMQLocalTransactionListener {
     /**
     ‐ 发送prepare消息成功此方法被回调,该方法用于执行本地事务
     ‐ @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
     ‐ @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
     ‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
     */
     RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); 
     /**
     ‐ @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
     ‐ @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
     */
     RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
    

    发送事务消息:

    以下是RocketMQ提供用于发送事务消息的API:

        TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); 
        producer.setNamesrvAddr("127.0.0.1:9876"); 
        producer.start();
       //设置TransactionListener实现
       producer.setTransactionListener(transactionListener); 
       //发送事务消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    

    1.3 小结

    可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性, RocketMQ作为 消息中间件,

    RocketMQ主要解决了两个功能:

    • 本地事务与消息发送的原子性问题。

    • 事务参与方接收消息的可靠性。可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

    2.分布式事务解决方案之最大努力通知

    2.1 什么是最大努力通知

    最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:

    交互流程:

    1. 账户系统调用充值系统接口

    2. 充值系统完成支付处理向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知

    3. 账户系统接收到充值结果通知修改充值状态。

    4. 账户系统未接收到通知会主动调用充值系统的接口查询充值结果

    通过上边的例子我们总结最大努力通知方案的目标:

    目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

    具体包括:

    1. 有一定的消息重复通知机制。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

    2. 消息校对机制。如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息 信息来满足需求。

    最大努力通知与可靠消息一致性有什么不同

    1、解决方案思想不同

    可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知 方来保证。

    最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接 收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方

    2、两者的业务应用场景不同

    可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。

    最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。

    3、技术解决方向不同

    可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。

    最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消 息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

    2.2 解决方案

    通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知

    方案1:

    本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:

    1. 发起通知方将通知发给MQ。使用普通消息机制将通知发给MQ。注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。(后边会讲)

    2. 接收通知方监听 MQ

    3. 接收通知方接收消息,业务处理完成回应ack

    4. 接收通知方若没有回应ack则MQ会重复通知。

    5. 接收通知方可通过消息校对接口来校对消息的一致性。

    方案2:

    本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:

    交互流程如下:

    1. 发起通知方将通知发给MQ。使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。

    2. 通知程序监听 MQ,接收MQ的消息。方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。通知程序若没有回应ack则MQ会重复通知。

    3. 通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消 息。接收通知方可通过消息校对接口来校对消息的一致性。

    方案1和方案2的不同点:

    1. 方案1中接收通知方与MQ接口,即接收通知方案监听 MQ,此方案主要应用与内部应用之间的通知。

    2. 方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收 通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。

    3.分布式事务对比分析:

    在了解各种分布式事务的解决方案后,我们了解到各种方案的优缺点:

    2PC 最大的诟病是一个阻塞协议。RM在执行分支事务后需要等待TM的决定,此时服务会阻塞并锁定资源。由于其 阻塞机制和最差时间复杂度高, 因此,这种设计不能适应随着事务涉及的服务数量增加而扩展的需要,很难用于并 发较高以及子事务生命周期较长 (long-running transactions) 的分布式服务中。

    如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处 理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使 得降低锁冲突、提高吞吐量成为可能。而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现 try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实 现不同的回滚策略。典型的使用场景:满,登录送优惠券等。

    可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入消息机制后,同步的事务操作变为基于消 息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。典型的使用场景:注 册送积分,登录送优惠券等。

    最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业 务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后 续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景:银行通知、支付结果 通知等。

    最大努力通知是分布式事务中要求最低的一种,适用于一些最终一致性时间敏感度低的业务;允许发起通知方处理业 务失败,在接收通知方收到通知后积极进行失败处理,无论发起通知方如何处理结果都会不影响到接收通知方的后 续处理;发起通知方需提供查询执行情况接口,用于接收通知方校对结果。典型的使用场景:银行通知、支付结果 通知等。


    2PCTCC可靠消息最大努力通知
    一致性强一致性最终一致最终一致最终一致
    吞吐量
    实现复杂度

    总结:在条件允许的情况下,我们尽可能选择本地事务单数据源,因为它减少了网络交互带来的性能损耗,且避免了数据 弱一致性带来的种种问题。若某系统频繁且不合理的使用分布式事务,应首先从整体设计角度观察服务的拆分是否 合理,是否高内聚低耦合?是否粒度太小?分布式事务一直是业界难题,因为网络的不确定性,而且我们习惯于拿 分布式事务与单机事务ACID做对比。

    无论是数据库层的XA、还是应用层TCC、可靠消息、最大努力通知等方案,都没有完美解决分布式事务问题,它们 不过是各自在性能、一致性、可用性等方面做取舍,寻求某些场景偏好下的权衡。


    ❤️「转发」和「在看」,是对我最大的支持❤️
    
    展开全文
  • 一次搞定数据库事务

    千人学习 2019-12-13 21:56:38
    第一部分:彻底明白事务的四个特性:原子、一致、隔离、持久,用场景和事例来讲解。   第二部分:实战讲数据库事务的6中并发异常:回滚丢失、覆盖丢失、脏读、幻读、不可重复读、MVCC精讲。   第...
  • kafka事务性实现

    万次阅读 多人点赞 2019-06-11 15:15:38
    这篇文章是 Kafka Exactly-Once 实现系列的第二篇,主要讲述 Kafka 事务性的实现,这部分的实现要比幂等性的实现复杂一些,幂等性实现是事务性实现的基础,幂等性提供了单会话单 Partition Exactly-Once 语义的实现...

    转自:http://matt33.com/2018/11/04/kafka-transaction/ 侵删

    这篇文章是 Kafka Exactly-Once 实现系列的第二篇,主要讲述 Kafka 事务性的实现,这部分的实现要比幂等性的实现复杂一些,幂等性实现是事务性实现的基础,幂等性提供了单会话单 Partition Exactly-Once 语义的实现,正是因为 Idempotent Producer 不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,不期望出现中间状态。这就是 Kafka Transactions 希望解决的问题,简单来说就是能够实现 atomic writes across partitions,本文以 Apache Kafka 2.0.0 代码实现为例,深入分析一下 Kafka 是如何实现这一机制的。

    Apache Kafka 在 Exactly-Once Semantics(EOS)上三种粒度的保证如下(来自 Exactly-once Semantics in Apache Kafka):

    1. Idempotent Producer:Exactly-once,in-order,delivery per partition;
    2. Transactions:Atomic writes across partitions;
    3. Exactly-Once stream processing across read-process-write tasks;

    第二种情况就是本文讲述的主要内容,在讲述整个事务处理流程时,也顺便分析第三种情况。

    Kafka Transactions

    Kafka 事务性最开始的出发点是为了在 Kafka Streams 中实现 Exactly-Once 语义的数据处理,这个问题提出之后,在真正的方案讨论阶段,社区又挖掘了更多的应用场景,也为了尽可能覆盖更多的应用场景,在真正的实现中,在很多地方做了相应的 tradeoffs,后面会写篇文章对比一下 RocketMQ 事务性的实现,就能明白 Kafka 事务性实现及应用场景的复杂性了。

    Kafka 的事务处理,主要是允许应用可以把消费和生产的 batch 处理(涉及多个 Partition)在一个原子单元内完成,操作要么全部完成、要么全部失败。为了实现这种机制,我们需要应用能提供一个唯一 id,即使故障恢复后也不会改变,这个 id 就是 TransactionnalId(也叫 txn.id,后面会详细讲述),txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义。

    当用户使用 Kafka 的事务性时,Kafka 可以做到的保证:

    1. 跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
    2. 跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);
    3. 跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。

    上面是从 Producer 的角度来看,那么如果从 Consumer 角度呢?Consumer 端很难保证一个已经 commit 的事务的所有 msg 都会被消费,有以下几个原因:

    1. 对于 compacted topic,在一个事务中写入的数据可能会被新的值覆盖;
    2. 一个事务内的数据,可能会跨多个 log segment,如果旧的 segmeng 数据由于过期而被清除,那么这个事务的一部分数据就无法被消费到了;
    3. Consumer 在消费时可以通过 seek 机制,随机从一个位置开始消费,这也会导致一个事务内的部分数据无法消费;
    4. Consumer 可能没有订阅这个事务涉及的全部 Partition。

    简单总结一下,关于 Kafka 事务性语义提供的保证主要以下三个:

    1. Atomic writes across multiple partitions.
    2. All messages in a transaction are made visible together, or none are.
    3. Consumers must be configured to skip uncommitted messages.

    事务性示例

    Kafka 事务性的使用方法也非常简单,用户只需要在 Producer 的配置中配置 transactional.id,通过 initTransactions() 初始化事务状态信息,再通过 beginTransaction() 标识一个事务的开始,然后通过 commitTransaction() 或 abortTransaction() 对事务进行 commit 或 abort,示例如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("client.id", "ProducerTranscationnalExample");
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "test-transactional");
    props.put("acks", "all");
    KafkaProducer producer = new KafkaProducer(props);
    producer.initTransactions();
    
    try {
        String msg = "matt test";
        producer.beginTransaction();
        producer.send(new ProducerRecord(topic, "0", msg.toString()));
        producer.send(new ProducerRecord(topic, "1", msg.toString()));
        producer.send(new ProducerRecord(topic, "2", msg.toString()));
        producer.commitTransaction();
    } catch (ProducerFencedException e1) {
        e1.printStackTrace();
        producer.close();
    } catch (KafkaException e2) {
        e2.printStackTrace();
        producer.abortTransaction();
    }
    producer.close();
    

    事务性的 API 也同样保持了 Kafka 一直以来的简洁性,使用起来是非常方便的。

    事务性要解决的问题

    回想一下,前面一篇文章中关于幂等性要解决的问题(幂等性要解决的问题),事务性其实更多的是解决幂等性中没有解决的问题,比如:

    1. 在写多个 Topic-Partition 时,执行的一批写入操作,有可能出现部分 Topic-Partition 写入成功,部分写入失败(比如达到重试次数),这相当于出现了中间的状态,这并不是我们期望的结果;
    2. Producer 应用中间挂之后再恢复,无法做到 Exactly-Once 语义保证;

    再来分析一下,Kafka 提供的事务性是如何解决上面两个问题的:

    1. 如果启用事务性的话,涉及到多个 Topic-Partition 的写入时,这个事务操作要么会全部成功,要么会全部失败,不会出现上面的情况(部分成功、部分失败),如果有 Topic-Partition 无法写入,那么当前这个事务操作会直接 abort;
    2. 其实应用做到端到端的 Exactly-Once,仅仅靠 Kafka 是无法做到的,还需要应用本身做相应的容错设计,以 Flink 为例,其容错设计就是 checkpoint 机制,作业保证在每次 checkpoint 成功时,它之前的处理都是 Exactly-Once 的,如果中间作业出现了故障,恢复之后,只需要接着上次 checkpoint 的记录做恢复即可,对于失败前那个未完成的事务执行回滚操作(abort)就可以了,这样的话就是实现了 Flink + Kafka 端到端的 Exactly-Once(这只是设计的思想,具体的实现后续会有文章详细解揭秘)。

    事务性实现的关键

    对于 Kafka 的事务性实现,最关键的就是其事务操作原子性的实现。对于一个事务操作而言,其会涉及到多个 Topic-Partition 数据的写入,如果是一个 long transaction 操作,可能会涉及到非常多的数据,如何才能保证这个事务操作的原子性(要么全部完成,要么全部失败)呢?

    1. 关于这点,最容易想到的应该是引用 2PC 协议(它主要是解决分布式系统数据一致性的问题)中协调者的角色,它的作用是统计所有参与者的投票结果,如果大家一致认为可以 commit,那么就执行 commit,否则执行 abort:
      • 我们来想一下,Kafka 是不是也可以引入一个类似的角色来管理事务的状态,只有当 Producer 真正 commit 时,事务才会提交,否则事务会还在进行中(实际的实现中还需要考虑 timeout 的情况),不会处于完成状态;
      • Producer 在开始一个事务时,告诉【协调者】事务开始,然后开始向多个 Topic-Partition 写数据,只有这批数据全部写完(中间没有出现异常),Producer 会调用 commit 接口进行 commit,然后事务真正提交,否则如果中间出现异常,那么事务将会被 abort(Producer 通过 abort 接口告诉【协调者】执行 abort 操作);
      • 这里的协调者与 2PC 中的协调者略有不同,主要为了管理事务相关的状态信息,这就是 Kafka Server 端的 TransactionCoordinator 角色;
    2. 有了上面的机制,是不是就可以了?很容易想到的问题就是 TransactionCoordinator 挂的话怎么办?TransactionCoordinator 如何实现高可用?
      • TransactionCoordinator 需要管理事务的状态信息,如果一个事务的 TransactionCoordinator 挂的话,需要转移到其他的机器上,这里关键是在 事务状态信息如何恢复? 也就是事务的状态信息需要很强的容错性、一致性
      • 关于数据的强容错性、一致性,存储的容错性方案基本就是多副本机制,而对于一致性,就有很多的机制实现,其实这个在 Kafka 内部已经实现(不考虑数据重复问题),那就是 min.isr + ack 机制;
      • 分析到这里,对于 Kafka 熟悉的同学应该就知道,这个是不是跟 __consumer_offset 这个内部的 topic 很像,TransactionCoordinator 也跟 GroupCoordinator 类似,而对应事务数据(transaction log)就是 __transaction_state 这个内部 topic,所有事务状态信息都会持久化到这个 topic,TransactionCoordinator 在做故障恢复也是从这个 topic 中恢复数据;
    3. 有了上面的机制,就够了么?我们再来考虑一种情况,我们期望一个 Producer 在 Fail 恢复后能主动 abort 上次未完成的事务(接上之前未完成的事务),然后重新开始一个事务,这种情况应该怎么办?之前幂等性引入的 PID 是无法解决这个问题的,因为每次 Producer 在重启时,PID 都会更新为一个新值:
      • Kafka 在 Producer 端引入了一个 TransactionalId 来解决这个问题,这个 txn.id 是由应用来配置的;
      • TransactionalId 的引入还有一个好处,就是跟 consumer group 类似,它可以用来标识一个事务操作,便于这个事务的所有操作都能在一个地方(同一个 TransactionCoordinator)进行处理;
    4. 再来考虑一个问题,在具体的实现时,我们应该如何标识一个事务操作的开始、进行、完成的状态?正常来说,一个事务操作是由很多操作组成的一个操作单元,对于 TransactionCoordinator 而言,是需要准确知道当前的事务操作处于哪个阶段,这样在容错恢复时,新选举的 TransactionCoordinator 才能恢复之前的状态:
      • 这个就是事务状态转移,一个事务从开始,都会有一个相应的状态标识,直到事务完成,有了事务的状态转移关系之后,TransactionCoordinator 对于事务的管理就会简单很多,TransactionCoordinator 会将当前事务的状态信息都会缓存起来,每当事务需要进行转移,就更新缓存中事务的状态(前提是这个状态转移是有效的)。

    上面的分析都是个人见解,有问题欢迎指正~

    下面这节就讲述一下事务性实现的一些关键的实现机制(对这些细节不太感兴趣或者之前没有深入接触过 Kafka,可以直接跳过,直接去看下一节的事务流程处理,先去了解一下一个事务操作的主要流程步骤)。

    TransactionCoordinator

    TransactionCoordinator 与 GroupCoordinator 有一些相似之处,它主要是处理来自 Transactional Producer 的一些与事务相关的请求,涉及的请求如下表所示(关于这些请求处理的详细过程会在下篇文章详细讲述,这里先有个大概的认识即可):

    请求类型用途说明
    ApiKeys.FIND_COORDINATORTransaction Producer 会发送这个 FindCoordinatorRequest 请求,来查询当前事务(txn.id)对应的 TransactionCoordinator,这个与 GroupCoordinator 查询类似,是根据 txn.id 的 hash 值取模找到对应 Partition 的 leader,这个 leader 就是该事务对应的 TransactionCoordinator
    ApiKeys.INIT_PRODUCER_IDProducer 初始化时,会发送一个 InitProducerIdRequest 请求,来获取其分配的 PID 信息,对于幂等性的 Producer,会随机选择一台 broker 发送请求,而对于 Transaction Producer 会选择向其对应的 TransactionCoordinator 发送该请求(目的是为了根据 txn.id 对应的事务状态做一些判断)
    ApiKeys.ADD_PARTITIONS_TO_TXN将这个事务涉及到的 topic-partition 列表添加到事务的 meta 信息中(通过 AddPartitionsToTxnRequest 请求),事务 meta 信息需要知道当前的事务操作涉及到了哪些 Topic-Partition 的写入
    ApiKeys.ADD_OFFSETS_TO_TXNTransaction Producer 的这个 AddOffsetsToTxnRequest 请求是由 sendOffsetsToTransaction() 接口触发的,它主要是用在 consume-process-produce 的场景中,这时候 consumer 也是整个事务的一部分,只有这个事务 commit 时,offset 才会被真正 commit(主要还是用于 Failover)
    ApiKeys.END_TXN当提交事务时, Transaction Producer 会向 TransactionCoordinator 发送一个 EndTxnRequest 请求,来 commit 或者 abort 事务

    TransactionCoordinator 对象中还有两个关键的对象,分别是:

    1. TransactionStateManager:这个对象,从名字应该就能大概明白其作用是关于事务的状态管理,它会维护分配到这个 TransactionCoordinator 的所有事务的 meta 信息;
    2. TransactionMarkerChannelManager:这个主要是用于向其他的 Broker 发送 Transaction Marker 数据,关于 Transaction Marker,第一次接触的人,可能会有一些困惑,什么是 Transaction Marker,Transaction Marker 是用来解决什么问题的呢?这里先留一个疑问,后面会来解密。

    总结一下,TransactionCoordinator 主要的功能有三个,分别是:

    1. 处理事务相关的请求;
    2. 维护事务的状态信息;
    3. 向其他 Broker 发送 Transaction Marker 数据。

    Transaction Log(__transaction_state)

    在前面分析中,讨论过一个问题,那就是如果 TransactionCoordinator 故障的话应该怎么恢复?怎么恢复之前的状态?我们知道 Kafka 内部有一个事务 topic __transaction_state,一个事务应该由哪个 TransactionCoordinator 来处理,是根据其 txn.id 的 hash 值与 __transaction_state 的 partition 数取模得到,__transaction_state Partition 默认是50个,假设取模之后的结果是2,那么这个 txn.id 应该由 __transaction_state Partition 2 的 leader 来处理。

    对于 __transaction_state 这个 topic 默认是由 Server 端的 transaction.state.log.replication.factor 参数来配置,默认是3,如果当前 leader 故障,需要进行 leader 切换,也就是对应的 TransactionCoordinator 需要迁移到新的 leader 上,迁移之后,如何恢复之前的事务状态信息呢?

    正如 GroupCoordinator 的实现一样,TransactionCoordinator 的恢复也是通过 __transaction_state 中读取之前事务的日志信息,来恢复其状态信息,前提是要求事务日志写入做相应的不丢配置。这也是 __transaction_state 一个重要作用之一,用于 TransactionCoordinator 的恢复,__transaction_state 与 __consumer_offsets 一样是 compact 类型的 topic,其 scheme 如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    Key => Version TransactionalId
        Version => 0 (int16)
        TransactionalId => String
    
    Value => Version ProducerId ProducerEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnEntryLastUpdateTime TxnStartTime
        Version => 0 (int16)
        ProducerId => int64
        ProducerEpoch => int16
        TxnTimeoutDuration => int32
        TxnStatus => int8
        TxnPartitions => [Topic [Partition]]
            Topic => String
            Partition => int32
        TxnLastUpdateTime => int64
        TxnStartTime => int64
    

    Transaction Marker

    终于讲到了 Transaction Marker,这也是前面留的一个疑问,什么是 Transaction Marker?Transaction Marker 是用来解决什么问题的呢?

    Transaction Marker 也叫做 control messages,它的作用主要是告诉这个事务操作涉及的 Topic-Partition Set 的 leaders 当前的事务操作已经完成,可以执行 commit 或者 abort(Marker 主要的内容就是 commit 或 abort),这个 marker 数据由该事务的 TransactionCoordinator 来发送的。我们来假设一下:如果没有 Transaction Marker,一个事务在完成后,如何执行 commit 操作?(以这个事务涉及多个 Topic-Partition 写入为例)

    1. Transactional Producer 在进行 commit 时,需要先告诉 TransactionCoordinator 这个事务可以 commit 了(因为 TransactionCoordinator 记录这个事务对应的状态信息),然后再去告诉这些 Topic-Partition 的 leader 当前已经可以 commit,也就是 Transactional Producer 在执行 commit 时,至少需要做两步操作;
    2. 在 Transactional Producer 通知这些 Topic-Partition 的 leader 事务可以 commit 时,这些 Topic-Partition 应该怎么处理呢?难道是 commit 时再把数据持久化到磁盘,abort 时就直接丢弃不做持久化?这明显是问题的,如果这是一个 long transaction 操作,写数据非常多,内存中无法存下,数据肯定是需要持久化到硬盘的,如果数据已经持久化到硬盘了,假设这个时候收到了一个 abort 操作,是需要把数据再从硬盘清掉?

      • 这种方案有一个问题是:已经持久化的数据是持久化到本身的日志文件,还是其他文件?如果持久化本来的日志文件中,那么 consumer 消费到一个未 commit 的数据怎么办?这些数据是有可能 abort 的,如果是持久化到其他文件中,这会涉及到数据多次写磁盘、从磁盘清除的操作,会影响其 server 端的性能;

      再看下如果有了 Transaction Marker 这个机制后,情况会变成什么样?

      1. 首先 Transactional Producer 只需要告诉 TransactionCoordinator 当前事务可以 commit,然后再由 TransactionCoordinator 来向其涉及到的 Topic-Partition 的 leader 发送 Transaction Marker 数据,这里减轻了 Client 的压力,而且 TransactionCoordinator 会做一些优化,如果这个目标 Broker 涉及到多个事务操作,是可以共享这个 TCP 连接的;
      2. 有了 Transaction Marker 之后,Producer 在持久化数据时就简单很多,写入的数据跟之前一样,按照条件持久化到硬盘(数据会有一个标识,标识这条或这批数据是不是事务写入的数据),当收到 Transaction Marker 时,把这个 Transaction Marker 数据也直接写入这个 Partition 中,这样在处理 Consumer 消费时,就可以根据 marker 信息做相应的处理。

    Transaction Marker 的数据格式如下,其中 ControlMessageType 为 0 代表是 COMMIT,为 1 代表是 ABORT:

    1
    2
    3
    4
    5
    6
    7
    
    ControlMessageKey => Version ControlMessageType
        Version => int16
        ControlMessageType => int16
    
    TransactionControlMessageValue => Version CoordinatorEpoch
        Version => int16
        CoordinatorEpoch => int32
    

    这里再讲一个额外的内容,对于事务写入的数据,为了给消息添加一个标识(标识这条消息是不是来自事务写入的),数据格式(消息协议)发生了变化,这个改动主要是在 Attribute 字段,对于 MessageSet,Attribute 是16位,新的格式如下:

    1
    
    | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
    

    对于 Message,也就是单条数据存储时(其中 Marker 数据都是单条存储的),在 Kafka 中,只有 MessageSet 才可以做压缩,所以 Message 就没必要设置压缩字段,其格式如下:

    1
    
    | Unused (1-7) | Control Flag(0) |
    

    Server 端事务状态管理

    TransactionCoordinator 会维护相应的事务的状态信息(也就是 TxnStatus),对于一个事务,总共有以下几种状态:

    状态状态码说明
    Empty0Transaction has not existed yet
    Ongoing1Transaction has started and ongoing
    PrepareCommit2Group is preparing to commit
    PrepareAbort3Group is preparing to abort
    CompleteCommit4Group has completed commit
    CompleteAbort5Group has completed abort
    Dead6TransactionalId has expired and is about to be removed from the transaction cache
    PrepareEpochFence7We are in the middle of bumping the epoch and fencing out older producers

    其相应有效的状态转移图如下:

    Server 端 Transaction 的状态转移图Server 端 Transaction 的状态转移图

    正常情况下,对于一个事务而言,其状态状态流程应该是 Empty –> Ongoing –> PrepareCommit –> CompleteCommit –> Empty 或者是 Empty –> Ongoing –> PrepareAbort –> CompleteAbort –> Empty。

    Client 端事务状态管理

    Client 的事务状态信息主要记录本地事务的状态,当然跟其他的系统类似,本地的状态信息与 Server 端的状态信息并不完全一致(状态的设置,就像 GroupCoodinator 会维护一个 Group 的状态,每个 Consumer 也会维护本地的 Consumer 对象的状态一样)。Client 端的事务状态信息主要用于 Client 端的事务状态处理,其主要有以下几种:

    1. UNINITIALIZED:Transactional Producer 初始化时的状态,此时还没有事务处理;
    2. INITIALIZING:Transactional Producer 调用 initTransactions() 方法初始化事务相关的内容,比如发送 InitProducerIdRequest 请求;
    3. READY:对于新建的事务,Transactional Producer 收到来自 TransactionCoordinator 的 InitProducerIdResponse 后,其状态会置为 READY(对于已有的事务而言,是当前事务完成后 Client 的状态会转移为 READY);
    4. IN_TRANSACTION:Transactional Producer 调用 beginTransaction() 方法,开始一个事务,标志着一个事务开始初始化;
    5. COMMITTING_TRANSACTION:Transactional Producer 调用 commitTransaction() 方法时,会先更新本地的状态信息;
    6. ABORTING_TRANSACTION:Transactional Producer 调用 abortTransaction() 方法时,会先更新本地的状态信息;
    7. ABORTABLE_ERROR:在一个事务操作中,如果有数据发送失败,本地状态会转移到这个状态,之后再自动 abort 事务;
    8. FATAL_ERROR:转移到这个状态之后,再进行状态转移时,会抛出异常;

    Client 端状态如下图:

    Client 端 Transaction 的状态转移图Client 端 Transaction 的状态转移图

    事务性的整体流程

    有了前面对 Kafka 事务性关键实现的讲述之后,这里详细讲述一个事务操作的处理流程,当然这里只是重点讲述事务性相关的内容,官方版的流程图可参考Kafka Exactly-Once Data Flow,这里我做了一些改动,其流程图如下:

    consume-process-produce 事务的处理流程consume-process-produce 事务的处理流程

    这个流程是以 consume-process-produce 场景为例(主要是 kafka streams 的场景),图中红虚框及 4.3a 部分是关于 consumer 的操作,去掉这部分的话,就是只考虑写入情况的场景。这种只考虑写入场景的事务操作目前在业内应用也是非常广泛的,比如 Flink + Kafka 端到端的 Exactly-Once 实现就是这种场景,下面来详细讲述一下整个流程。

    1. Finding a TransactionCoordinator

    对于事务性的处理,第一步首先需要做的就是找到这个事务 txn.id 对应的 TransactionCoordinator,Transaction Producer 会向 Broker (随机选择一台 broker,一般选择本地连接最少的这台 broker)发送 FindCoordinatorRequest 请求,获取其 TransactionCoordinator。

    怎么找到对应的 TransactionCoordinator 呢?这个前面已经讲过了,主要是通过下面的方法获取 __transaction_state 的 Partition,该 Partition 对应的 leader 就是这个 txn.id 对应的 TransactionCoordinator。

    1
    
    def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
    

    2. Getting a PID

    PID 这里就不再介绍了,不了解的可以看前面那篇文章(Producer ID)。

    Transaction Producer 在 initializeTransactions() 方法中会向 TransactionCoordinator 发送 InitPidRequest 请求获取其分配的 PID,有了 PID,事务写入时可以保证幂等性,PID 如何分配可以参考 PID 分配,但是 TransactionCoordinator 在给事务 Producer 分配 PID 会做一些判断,主要的内容是:

    1. 如果这个 txn.id 之前没有相应的事务状态(new txn.id),那么会初始化其事务 meta 信息 TransactionMetadata(会给其分配一个 PID,初始的 epoch 为-1),如果有事务状态,获取之前的状态;
    2. 校验其 TransactionMetadata 的状态信息(参考下面代码中 prepareInitProduceIdTransit() 方法):
      1. 如果前面还有状态转移正在进行,直接返回 CONCURRENT_TRANSACTIONS 异常;
      2. 如果此时的状态为 PrepareAbort 或 PrepareCommit,返回 CONCURRENT_TRANSACTIONS 异常;
      3. 如果之前的状态为 CompleteAbort、CompleteCommit 或 Empty,那么先将状态转移为 Empty,然后更新一下 epoch 值;
      4. 如果之前的状态为 Ongoing,状态会转移成 PrepareEpochFence,然后再 abort 当前的事务,并向 client 返回 CONCURRENT_TRANSACTIONS 异常;
      5. 如果状态为 Dead 或 PrepareEpochFence,直接抛出相应的 FATAL 异常;
    3. 将 txn.id 与相应的 TransactionMetadata 持久化到事务日志中,对于 new txn.id,这个持久化的数据主要时 txn.id 与 pid 关系信息,如图中的 3a 所示。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    
    //note: producer 启用事务性的情况下,检测此时事务的状态信息
    private def prepareInitProduceIdTransit(transactionalId: String,
                                            transactionTimeoutMs: Int,
                                            coordinatorEpoch: Int,
                                            txnMetadata: TransactionMetadata): ApiResult[(Int, TxnTransitMetadata)] = {
      if (txnMetadata.pendingTransitionInProgress) {
        // return a retriable exception to let the client backoff and retry
        Left(Errors.CONCURRENT_TRANSACTIONS)
      } else {
        // caller should have synchronized on txnMetadata already
        txnMetadata.state match {
          case PrepareAbort | PrepareCommit =>
            // reply to client and let it backoff and retry
            Left(Errors.CONCURRENT_TRANSACTIONS)
    
          case CompleteAbort | CompleteCommit | Empty => //note: 此时需要将状态转移到 Empty(此时状态并没有转移,只是在 PendingState 记录了将要转移的状态)
            val transitMetadata = if (txnMetadata.isProducerEpochExhausted) {
              val newProducerId = producerIdManager.generateProducerId()
              txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds())
            } else { //note: 增加 producer 的 epoch 值
              txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds())
            }
    
            Right(coordinatorEpoch, transitMetadata)
    
          case Ongoing => //note: abort 当前的事务,并返回一个 CONCURRENT_TRANSACTIONS 异常,强制 client 去重试
            // indicate to abort the current ongoing txn first. Note that this epoch is never returned to the
            // user. We will abort the ongoing transaction and return CONCURRENT_TRANSACTIONS to the client.
            // This forces the client to retry, which will ensure that the epoch is bumped a second time. In
            // particular, if fencing the current producer exhausts the available epochs for the current producerId,
            // then when the client retries, we will generate a new producerId.
            Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
    
          case Dead | PrepareEpochFence => //note: 返回错误
            val errorMsg = s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " +
              s"This is illegal as we should never have transitioned to this state."
            fatal(errorMsg)
            throw new IllegalStateException(errorMsg)
    
        }
      }
    }
    

    3. Starting a Transaction

    前面两步都是 Transaction Producer 调用 initTransactions() 部分,到这里,Producer 可以调用 beginTransaction() 开始一个事务操作,其实现方法如下面所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    
    //KafkaProducer
    //note: 应该在一个事务操作之前进行调用
    public void beginTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        transactionManager.beginTransaction();
    }
    
    // TransactionManager
    //note: 在一个事务开始之前进行调用,这里实际上只是转换了状态(只在 producer 本地记录了状态的开始)
    public synchronized void beginTransaction() {
        ensureTransactional();
        maybeFailWithError();
        transitionTo(State.IN_TRANSACTION);
    }
    

    这里只是将本地事务状态转移成 IN_TRANSACTION,并没有与 Server 端进行交互,所以在流程图中没有体现出来(TransactionManager 初始化时,其状态为 UNINITIALIZED,Producer 调用 initializeTransactions() 方法,其状态转移成 INITIALIZING)。

    4. Consume-Porcess-Produce Loop

    在这个阶段,Transaction Producer 会做相应的处理,主要包括:从 consumer 拉取数据、对数据做相应的处理、通过 Producer 写入到下游系统中(对于只有写入场景,忽略前面那一步即可),下面有一个示例(start 和 end 中间的部分),是一个典型的 consume-process-produce 场景:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    while (true) {
        ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
        producer.beginTransaction();
        //start
        for (ConsumerRecord record : records){
            producer.send(producerRecord(“outputTopic1”, record));
            producer.send(producerRecord(“outputTopic2”, record));
        }
        producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
        //end
        producer.commitTransaction();
    }
    

    下面来结合前面的流程图来讲述一下这部分的实现。

    4.1. AddPartitionsToTxnRequest

    Producer 在调用 send() 方法时,Producer 会将这个对应的 Topic—Partition 添加到 TransactionManager 的记录中,如下所示:

    1
    2
    3
    
    //note: 如何开启了幂等性或事务性,需要做一些处理
    if (transactionManager != null && transactionManager.isTransactional())
        transactionManager.maybeAddPartitionToTransaction(tp);
    

    如果这个 Topic-Partition 之前不存在,那么就添加到 newPartitionsInTransaction 集合中,如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    //note: 将 tp 添加到 newPartitionsInTransaction 中,记录当前进行事务操作的 tp
    public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
        failIfNotReadyForSend();
    
        //note: 如果 partition 已经添加到 partitionsInTransaction、pendingPartitionsInTransaction、newPartitionsInTransaction中
        if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
            return;
    
        log.debug("Begin adding new partition {} to transaction", topicPartition);
        newPartitionsInTransaction.add(topicPartition);
    }
    

    Producer 端的 Sender 线程会将这个信息通过 AddPartitionsToTxnRequest 请求发送给 TransactionCoordinator,也就是图中的 4.1 过程,TransactionCoordinator 会将这个 Topic-Partition 列表更新到 txn.id 对应的 TransactionMetadata 中,并且会持久化到事务日志中,也就是图中的 4.1 a 部分,这里持久化的数据主要是 txn.id 与其涉及到的 Topic-Partition 信息。

    4.2. ProduceRequest

    这一步与正常 Producer 写入基本上一样,就是相应的 Leader 在持久化数据时会在头信息中标识这条数据是不是来自事务 Producer 的写入(主要是数据协议有变动,Server 处理并不需要做额外的处理)。

    4.3. AddOffsetsToTxnRequest

    Producer 在调用 sendOffsetsToTransaction() 方法时,第一步会首先向 TransactionCoordinator 发送相应的 AddOffsetsToTxnRequest 请求,如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    //class KafkaProcducer
    //note: 当你需要 batch 的消费-处理-写入消息,这个方法需要被使用
    //note: 发送指定的 offset 给 group coordinator,用来标记这些 offset 是作为当前事务的一部分,只有这次事务成功时
    //note: 这些 offset 才会被认为 commit 了
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         String consumerGroupId) throws ProducerFencedException {
        throwIfNoTransactionManager();
        TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
        sender.wakeup();
        result.await();
    }
    
    
    // class TransactionManager
    //note: 发送 AddOffsetsToTxRequest
    public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                            String consumerGroupId) {
        ensureTransactional();
        maybeFailWithError();
        if (currentState != State.IN_TRANSACTION)
            throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
                    "active transaction");
    
        log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
        AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
        enqueueRequest(handler);
        return handler.result;
    }
    

    TransactionCoordinator 在收到这个请求时,处理方法与 4.1 中的一样,把这个 group.id 对应的 __consumer_offsets 的 Partition (与写入涉及的 Topic-Partition 一样)保存到事务对应的 meta 中,之后会持久化相应的事务日志,如图中 4.3a 所示。

    4.4. TxnOffsetsCommitRequest

    Producer 在收到 TransactionCoordinator 关于 AddOffsetsToTxnRequest 请求的结果后,后再次发送 TxnOffsetsCommitRequest 请求给对应的 GroupCoordinator,AddOffsetsToTxnHandler 的 handleResponse() 的实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
    @Override
    public void handleResponse(AbstractResponse response) {
        AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
        Errors error = addOffsetsToTxnResponse.error();
    
        if (error == Errors.NONE) {
            log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
    
            // note the result is not completed until the TxnOffsetCommit returns
            //note: AddOffsetsToTnxRequest 之后,还会再发送 TxnOffsetCommitRequest
            pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
            transactionStarted = true;
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
            lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
            reenqueue();
        } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
            reenqueue();
        } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
            fatalError(error.exception());
        } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
            fatalError(error.exception());
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
        } else {
            fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
        }
    }
    

    GroupCoordinator 在收到相应的请求后,会将 offset 信息持久化到 consumer offsets log 中(包含对应的 PID 信息),但是不会更新到缓存中,除非这个事务 commit 了,这样的话就可以保证这个 offset 信息对 consumer 是不可见的(没有更新到缓存中的数据是不可见的,通过接口是获取的,这是 GroupCoordinator 本身来保证的)。

    5.Committing or Aborting a Transaction

    在一个事务操作处理完成之后,Producer 需要调用 commitTransaction() 或者 abortTransaction() 方法来 commit 或者 abort 这个事务操作。

    5.1. EndTxnRequest

    无论是 Commit 还是 Abort,对于 Producer 而言,都是向 TransactionCoordinator 发送 EndTxnRequest 请求,这个请求的内容里会标识是 commit 操作还是 abort 操作,Producer 的 commitTransaction()方法实现如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    //class KafkaProducer
    //note: commit 正在进行的事务操作,这个方法在真正发送 commit 之后将会 flush 所有未发送的数据
    //note: 如果在发送中遇到任何一个不能修复的错误,这个方法抛出异常,事务也不会被提交,所有 send 必须成功,这个事务才能 commit 成功
    public void commitTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        TransactionalRequestResult result = transactionManager.beginCommit();
        sender.wakeup();
        result.await();
    }
    
    // class TransactionManager
    //note: 开始 commit,转移本地本地保存的状态以及发送相应的请求
    public synchronized TransactionalRequestResult beginCommit() {
        ensureTransactional();
        maybeFailWithError();
        transitionTo(State.COMMITTING_TRANSACTION);
        return beginCompletingTransaction(TransactionResult.COMMIT);
    }
    

    Producer 的 abortTransaction() 方法实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    //class KafkaProducer
    //note: 取消正在进行事务,任何没有 flush 的数据都会被丢弃
    public void abortTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        TransactionalRequestResult result = transactionManager.beginAbort();
        sender.wakeup();
        result.await();
    }
    
    // class TransactionManager
    public synchronized TransactionalRequestResult beginAbort() {
        ensureTransactional();
        if (currentState != State.ABORTABLE_ERROR)
            maybeFailWithError();
        transitionTo(State.ABORTING_TRANSACTION);
    
        // We're aborting the transaction, so there should be no need to add new partitions
        newPartitionsInTransaction.clear();
        return beginCompletingTransaction(TransactionResult.ABORT);
    }
    

    它们最终都是调用了 TransactionManager 的 beginCompletingTransaction() 方法,这个方法会向其 待发送请求列表 中添加 EndTxnRequest 请求,其实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
    //note: 发送 EndTxnRequest 请求,添加到 pending 队列中
    private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
        if (!newPartitionsInTransaction.isEmpty())
            enqueueRequest(addPartitionsToTransactionHandler());
        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                producerIdAndEpoch.epoch, transactionResult);
        EndTxnHandler handler = new EndTxnHandler(builder);
        enqueueRequest(handler);
        return handler.result;
    }
    

    TransactionCoordinator 在收到 EndTxnRequest 请求后,会做以下处理:

    1. 更新事务的 meta 信息,状态转移成 PREPARE_COMMIT 或 PREPARE_ABORT,并将事务状态信息持久化到事务日志中;
    2. 根据事务 meta 信息,向其涉及到的所有 Topic-Partition 的 leader 发送 Transaction Marker 信息(也就是 WriteTxnMarkerRquest 请求,见下面的 5.2 分析);
    3. 最后将事务状态更新为 COMMIT 或者 ABORT,并将事务的 meta 持久化到事务日志中,也就是 5.3 步骤。

    5.2. WriteTxnMarkerRquest

    WriteTxnMarkerRquest 是 TransactionCoordinator 收到 Producer 的 EndTxnRequest 请求后向其他 Broker 发送的请求,主要是告诉它们事务已经完成。不论是普通的 Topic-Partition 还是 __consumer_offsets,在收到这个请求后,都会把事务结果(Transaction Marker 的格数据式见前面)持久化到对应的日志文件中,这样下游 Consumer 在消费这个数据时,就知道这个事务是 commit 还是 abort。

    5.3. Writing the Final Commit or Abort Message

    当这个事务涉及到所有 Topic-Partition 都已经把这个 marker 信息持久化到日志文件之后,TransactionCoordinator 会将这个事务的状态置为 COMMIT 或 ABORT,并持久化到事务日志文件中,到这里,这个事务操作就算真正完成了,TransactionCoordinator 缓存的很多关于这个事务的数据可以被清除了。

    小思考

    在上面讲述完 Kafka 事务性处理之后,我们来思考一下以下这些问题,上面的流程可能会出现下面这些问题或者很多人可能会有下面的疑问:

    1. txn.id 是否可以被多 Producer 使用,如果有多个 Producer 使用了这个 txn.id 会出现什么问题?
    2. TransactionCoordinator Fencing 和 Producer Fencing 分别是什么,它们是用来解决什么问题的?
    3. 对于事务的数据,Consumer 端是如何消费的,一个事务可能会 commit,也可能会 abort,这个在 Consumer 端是如何体现的?
    4. 对于一个 Topic,如果既有事务数据写入又有其他 topic 数据写入,消费时,其顺序性时怎么保证的?
    5. 如果 txn.id 长期不使用,server 端怎么处理?
    6. PID Snapshot 是做什么的?是用来解决什么问题?

    下面,来详细分析一下上面提到的这些问题。

    如果多个 Producer 使用同一个 txn.id 会出现什么情况?

    对于这个情况,我们这里直接做了一个相应的实验,两个 Producer 示例都使用了同一个 txn.id(为 test-transactional-matt),Producer 1 先启动,然后过一会再启动 Producer 2,这时候会发现一个现象,那就是 Producer 1 进程会抛出异常退出进程,其异常信息为:

    1
    2
    3
    4
    5
    6
    
    org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    	at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
    	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
    	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    	at com.matt.test.kafka.producer.ProducerTransactionExample.main(ProducerTransactionExample.java:68)
    Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    

    这里抛出了 ProducerFencedException 异常,如果打开相应的 Debug 日志,在 Producer 1 的日志文件会看到下面的日志信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
    [2018-11-03 12:48:52,495] DEBUG [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Transition from state COMMITTING_TRANSACTION to error state FATAL_ERROR (org.apache.kafka.clients.producer.internals.TransactionManager)
    org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    [2018-11-03 12:48:52,498] ERROR [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
    org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    [2018-11-03 12:48:52,599] INFO [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
    [2018-11-03 12:48:52,599] DEBUG [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Beginning shutdown of Kafka producer I/O thread, sending remaining records. (org.apache.kafka.clients.producer.internals.Sender)
    [2018-11-03 12:48:52,601] DEBUG Removed sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,601] DEBUG Removed sensor with name connections-created: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,602] DEBUG Removed sensor with name successful-authentication: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,602] DEBUG Removed sensor with name failed-authentication: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,602] DEBUG Removed sensor with name bytes-sent-received: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,603] DEBUG Removed sensor with name bytes-sent: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,603] DEBUG Removed sensor with name bytes-received: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,604] DEBUG Removed sensor with name select-time: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,604] DEBUG Removed sensor with name io-time: (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,604] DEBUG Removed sensor with name node--1.bytes-sent (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,605] DEBUG Removed sensor with name node--1.bytes-received (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,605] DEBUG Removed sensor with name node--1.latency (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,605] DEBUG Removed sensor with name node-33.bytes-sent (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,606] DEBUG Removed sensor with name node-33.bytes-received (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,606] DEBUG Removed sensor with name node-33.latency (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,606] DEBUG Removed sensor with name node-35.bytes-sent (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,606] DEBUG Removed sensor with name node-35.bytes-received (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,606] DEBUG Removed sensor with name node-35.latency (org.apache.kafka.common.metrics.Metrics)
    [2018-11-03 12:48:52,607] DEBUG [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Shutdown of Kafka producer I/O thread has completed. (org.apache.kafka.clients.producer.internals.Sender)
    [2018-11-03 12:48:52,607] DEBUG [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Kafka producer has been closed (org.apache.kafka.clients.producer.KafkaProducer)
    [2018-11-03 12:48:52,808] ERROR Forcing producer close! (com.matt.test.kafka.producer.ProducerTransactionExample)
    [2018-11-03 12:48:52,808] INFO [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
    [2018-11-03 12:48:52,808] DEBUG [Producer clientId=ProducerTransactionExample, transactionalId=test-transactional-matt] Kafka producer has been closed (org.apache.kafka.clients.producer.KafkaProducer)
    

    Producer 1 本地事务状态从 COMMITTING_TRANSACTION 变成了 FATAL_ERROR 状态,导致 Producer 进程直接退出了,出现这个异常的原因,就是抛出的 ProducerFencedException 异常,简单来说 Producer 1 被 Fencing 了(这是 Producer Fencing 的情况)。因此,这个问题的答案就很清除了,如果多个 Producer 共用一个 txn.id,那么最后启动的 Producer 会成功运行,会它之前启动的 Producer 都 Fencing 掉(至于为什么会 Fencing 下一小节会做分析)。

    Fencing

    关于 Fencing 这个机制,在分布式系统还是很常见的,我第一个见到这个机制是在 HDFS 中,可以参考我之前总结的一篇文章 HDFS NN 脑裂问题,Fencing 机制解决的主要也是这种类型的问题 —— 脑裂问题,简单来说就是,本来系统这个组件在某个时刻应该只有一个处于 active 状态的,但是在实际生产环境中,特别是切换期间,可能会同时出现两个组件处于 active 状态,这就是脑裂问题,在 Kafka 的事务场景下,用到 Fencing 机制有两个地方:

    1. TransactionCoordinator Fencing;
    2. Producer Fencing;

    TransactionCoordinator Fencing

    TransactionCoordinator 在遇到上 long FGC 时,可能会导致 脑裂 问题,FGC 时会 stop-the-world,这时候可能会与 zk 连接超时导致临时节点消失进而触发 leader 选举,如果 __transaction_state 发生了 leader 选举,TransactionCoordinator 就会切换,如果此时旧的 TransactionCoordinator FGC 完成,在还没来得及同步到最细 meta 之前,会有一个短暂的时刻,对于一个 txn.id 而言就是这个时刻可能出现了两个 TransactionCoordinator。

    相应的解决方案就是 TransactionCoordinator Fencing,这里 Fencing 策略不像离线场景 HDFS 这种直接 Kill 旧的 NN 进程或者强制切换状态这么暴力,而是通过 CoordinatorEpoch 来判断,每个 TransactionCoordinator 都有其 CoordinatorEpoch 值,这个值就是对应 __transaction_statePartition 的 Epoch 值(每当 leader 切换一次,该值就会自增1)。

    明白了 TransactionCoordinator 脑裂问题发生情况及解决方案之后,来分析下,Fencing 机制会在哪里发挥作用?仔细想想,是可以推断出来的,只可能是 TransactionCoordinator 向别人发请求时影响才会比较严重(特别是乱发 admin 命令)。有了 CoordinatorEpoch 之后,其他 Server 在收到请求时做相应的判断,如果发现 CoordinatorEpoch 值比缓存的最新的值小,那么 Fencing 就生效,拒绝这个请求,也就是 TransactionCoordinator 发送 WriteTxnMarkerRequest 时可能会触发这一机制。

    Producer Fencing

    Producer Fencing 与前面的类似,如果对于相同 PID 和 txn.id 的 Producer,Server 端会记录最新的 Epoch 值,拒绝来自 zombie Producer (Epoch 值小的 Producer)的请求。前面第一个问题的情况,Producer 2 在启动时,会向 TransactionCoordinator 发送 InitPIDRequest 请求,此时 TransactionCoordinator 已经有了这个 txn.id 对应的 meta,会返回之前分配的 PID,并把 Epoch 自增 1 返回,这样 Producer 2 就被认为是最新的 Producer,而 Producer 1 就会被认为是 zombie Producer,因此,TransactionCoordinator 在处理 Producer 1 的事务请求时,会返回相应的异常信息。

    Consumer 端如何消费事务数据

    在讲述这个问题之前,需要先介绍一下事务场景下,Consumer 的消费策略,Consumer 有一个 isolation.level 配置,这个是配置对于事务性数据的消费策略,有以下两种可选配置:

    1. read_committed: only consume non-­transactional messages or transactional messages that are already committed, in offset ordering.
    2. read_uncommitted: consume all available messages in offset ordering. This is the default value.

    简单来说就是,read_committed 只会读取 commit 的数据,而 abort 的数据不会向 consumer 显现,对于 read_uncommitted 这种模式,consumer 可以读取到所有数据(control msg 会过滤掉),这种模式与普通的消费机制基本没有区别,就是做了一个 check,过滤掉 control msg(也就是 marker 数据),这部分的难点在于 read_committed 机制的实现。

    Last Stable Offset(LSO)

    在事务机制的实现中,Kafka 又设置了一个新的 offset 概念,那就是 Last Stable Offset,简称 LSO(其他的 Offset 概念可参考 Kafka Offset 那些事),先看下 LSO 的定义:

    The LSO is defined as the latest offset such that the status of all transactional messages at lower offsets have been determined (i.e. committed or aborted).

    对于一个 Partition 而言,offset 小于 LSO 的数据,全都是已经确定的数据,这个主要是对于事务操作而言,在这个 offset 之前的事务操作都是已经完成的事务(已经 commit 或 abort),如果这个 Partition 没有涉及到事务数据,那么 LSO 就是其 HW(水位)。

    Server 处理 read_committed 类型的 Fetch 请求

    如果 Consumer 的消费策略设置的是 read_committed,其在向 Server 发送 Fetch 请求时,Server 端只会返回 LSO 之前的数据,在 LSO 之后的数据不会返回。

    这种机制有没有什么问题呢?我现在能想到的就是如果有一个 long transaction,比如其 first offset 是 1000,另外有几个已经完成的小事务操作,比如:txn1(offset:1100~1200)、txn2(offset:1400~1500),假设此时的 LSO 是 1000,也就是说这个 long transaction 还没有完成,那么已经完成的 txn1、txn2 也会对 consumer 不可见(假设都是 commit 操作),此时受 long transaction 的影响可能会导致数据有延迟

    那么我们再来想一下,如果不设计 LSO,又会有什么问题呢?可能分两种情况:

    1. 允许读未完成的事务:那么 Consumer 可以直接读取到 Partition 的 HW 位置,对于未完成的事务,因为设置的是 read_committed 机制,所以不能对用户可见,需要在 Consumer 端做缓存,这个缓存应该设置多大?(不限制肯定会出现 OOM 的情况,当然也可以现在 client 端持久化到硬盘,这样的设计太过于复杂,还需要考虑 client 端 IO、磁盘故障等风险),明显这种设计方案是不可行的;
    2. 如果不允许读未完成的事务:相当于还是在 Server 端处理,与前面的区别是,这里需要先把示例中的 txn1、txn2 的数据发送给 Consumer,这样的设计会带来什么问题呢?
      1. 假设这个 long transaction commit 了,其 end offset 是 2000,这时候有两种方案:第一种是把 1000-2000 的数据全部读出来(可能是磁盘读),把这个 long transaction 的数据过滤出来返回给 Consumer;第二种是随机读,只读这个 long transaction 的数据,无论哪种都有多触发一次磁盘读的风险,可能影响影响 Server 端的性能;
      2. Server 端需要维护每个 consumer group 有哪些事务读了、哪些事务没读的 meta 信息,因为 consumer 是随机可能挂掉,需要接上次消费的,这样实现就复杂很多了;
      3. 还有一个问题是,消费的顺序性无法保证,两次消费其读取到的数据顺序可能是不同的(两次消费启动时间不一样);

    从这些分析来看,个人认为 LSO 机制还是一种相当来说 实现起来比较简单、而且不影响原来 server 端性能、还能保证顺序性的一种设计方案,它不一定是最好的,但也不会差太多。在实际的生产场景中,尽量避免 long transaction 这种操作,而且 long transaction可能也会容易触发事务超时。

    Consumer 如何过滤 abort 的事务数据

    Consumer 在拉取到相应的数据之后,后面该怎么处理呢?它拉取到的这批数据并不能保证都是完整的事务数据,很有可能是拉取到一个事务的部分数据(marker 数据还没有拉取到),这时候应该怎么办?难道 Consumer 先把这部分数据缓存下来,等后面的 marker 数据到来时再确认数据应该不应该丢弃?(还是又 OOM 的风险)有没有更好的实现方案?

    Kafka 的设计总是不会让我们失望,这部分做的优化也是非常高明,Broker 会追踪每个 Partition 涉及到的 abort transactions,Partition 的每个 log segment 都会有一个单独只写的文件(append-only file)来存储 abort transaction 信息,因为 abort transaction 并不是很多,所以这个开销是可以可以接受的,之所以要持久化到磁盘,主要是为了故障后快速恢复,要不然 Broker 需要把这个 Partition 的所有数据都读一遍,才能直到哪些事务是 abort 的,这样的话,开销太大(如果这个 Partition 没有事务操作,就不会生成这个文件)。这个持久化的文件是以 .txnindex 做后缀,前面依然是这个 log segment 的 offset 信息,存储的数据格式如下:

    1
    2
    3
    4
    5
    6
    
    TransactionEntry =>
        Version => int16
        PID => int64
        FirstOffset => int64
        LastOffset => int64
        LastStableOffset => int64
    

    有了这个设计,Consumer 在拉取数据时,Broker 会把这批数据涉及到的所有 abort transaction 信息都返回给 Consumer,Server 端会根据拉取的 offset 范围与 abort transaction 的 offset 做对比,返回涉及到的 abort transaction 集合,其实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
      val abortedTransactions = ListBuffer.empty[AbortedTxn]
      for ((abortedTxn, _) <- iterator()) {
        if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
          abortedTransactions += abortedTxn //note: 这个 abort 的事务有在在这个范围内,就返回
    
        if (abortedTxn.lastStableOffset >= upperBoundOffset)
          return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
      }
      TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
    }
    

    Consumer 在拿到这些数据之后,会进行相应的过滤,大概的判断逻辑如下(Server 端返回的 abort transaction 列表就保存在 abortedTransactions 集合中,abortedProducerIds 最开始时是为空的):

    1. 如果这个数据是 control msg(也即是 marker 数据),是 ABORT 的话,那么与这个事务相关的 PID 信息从 abortedProducerIds 集合删掉,是 COMMIT 的话,就忽略(每个这个 PID 对应的 marker 数据收到之后,就从 abortedProducerIds 中清除这个 PID 信息);
    2. 如果这个数据是正常的数据,把它的 PID 和 offset 信息与 abortedTransactions 队列(有序队列,头部 transaction 的 first offset 最小)第一个 transaction 做比较,如果 PID 相同,并且 offset 大于等于这个 transaction 的 first offset,就将这个 PID 信息添加到 abortedProducerIds 集合中,同时从 abortedTransactions 队列中删除这个 transaction,最后再丢掉这个 batch(它是 abort transaction 的数据);
    3. 检查这个 batch 的 PID 是否在 abortedProducerIds 集合中,在的话,就丢弃,不在的话就返回上层应用。

    这部分的实现确实有些绕(有兴趣的可以慢慢咀嚼一下),它严重依赖了 Kafka 提供的下面两种保证:

    1. Consumer 拉取到的数据,在处理时,其 offset 是严格有序的;
    2. 同一个 txn.id(PID 相同)在某一个时刻最多只能有一个事务正在进行;

    这部分代码实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    
    private Record nextFetchedRecord() {
        while (true) {
            if (records == null || !records.hasNext()) { //note: records 为空(数据全部丢掉了),records 没有数据(是 control msg)
                maybeCloseRecordStream();
    
                if (!batches.hasNext()) {
                    // Message format v2 preserves the last offset in a batch even if the last record is removed
                    // through compaction. By using the next offset computed from the last offset in the batch,
                    // we ensure that the offset of the next fetch will point to the next batch, which avoids
                    // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
                    // fetching the same batch repeatedly).
                    if (currentBatch != null)
                        nextFetchOffset = currentBatch.nextOffset();
                    drain();
                    return null;
                }
    
                currentBatch = batches.next();
                maybeEnsureValid(currentBatch);
    
                if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                    //note: 需要做相应的判断
                    // remove from the aborted transaction queue all aborted transactions which have begun
                    // before the current batch's last offset and add the associated producerIds to the
                    // aborted producer set
                    //note: 如果这个 batch 的 offset 已经大于等于 abortedTransactions 中第一事务的 first offset
                    //note: 那就证明下个 abort transaction 的数据已经开始到来,将 PID 添加到 abortedProducerIds 中
                    consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
    
                    long producerId = currentBatch.producerId();
                    if (containsAbortMarker(currentBatch)) {
                        abortedProducerIds.remove(producerId); //note: 这个 PID(当前事务)涉及到的数据已经处理完
                    } else if (isBatchAborted(currentBatch)) { //note: 丢弃这个数据
                        log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
                                      "offsets {} to {}",
                                  partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                        nextFetchOffset = currentBatch.nextOffset();
                        continue;
                    }
                }
    
                records = currentBatch.streamingIterator(decompressionBufferSupplier);
            } else {
                Record record = records.next();
                // skip any records out of range
                if (record.offset() >= nextFetchOffset) {
                    // we only do validation when the message should not be skipped.
                    maybeEnsureValid(record);
    
                    // control records are not returned to the user
                    if (!currentBatch.isControlBatch()) { //note: 过滤掉 marker 数据
                        return record;
                    } else {
                        // Increment the next fetch offset when we skip a control batch.
                        nextFetchOffset = record.offset() + 1;
                    }
                }
            }
        }
    }
    

    Consumer 消费数据时,其顺序如何保证

    有了前面的分析,这个问题就很好回答了,顺序性还是严格按照 offset 的,只不过遇到 abort trsansaction 的数据时就丢弃掉,其他的与普通 Consumer 并没有区别。

    如果 txn.id 长期不使用,server 端怎么处理?

    Producer 在开始一个事务操作时,可以设置其事务超时时间(参数是 transaction.timeout.ms,默认60s),而且 Server 端还有一个最大可允许的事务操作超时时间(参数是 transaction.timeout.ms,默认是15min),Producer 设置超时时间不能超过 Server,否则的话会抛出异常。

    上面是关于事务操作的超时设置,而对于 txn.id,我们知道 TransactionCoordinator 会缓存 txn.id 的相关信息,如果没有超时机制,这个 meta 大小是无法预估的,Server 端提供了一个 transaction.id.expiration.ms 参数来配置这个超时时间(默认是7天),如果超过这个时间没有任何事务相关的请求发送过来,那么 TransactionCoordinator 将会使这个 txn.id 过期。

    PID Snapshot 是做什么的?用来解决什么问题?

    对于每个 Topic-Partition,Broker 都会在内存中维护其 PID 与 sequence number(最后成功写入的 msg 的 sequence number)的对应关系(这个在上面幂等性文章应讲述过,主要是为了不丢补充的实现)。

    Broker 重启时,如果想恢复上面的状态信息,那么它读取所有的 log 文件。相比于之下,定期对这个 state 信息做 checkpoint(Snapshot),明显收益是非常大的,此时如果 Broker 重启,只需要读取最近一个 Snapshot 文件,之后的数据再从 log 文件中恢复即可。

    这个 PID Snapshot 样式如 00000000000235947656.snapshot,以 .snapshot 作为后缀,其数据格式如下:

    1
    2
    3
    4
    
    [matt@XXX-35 app.matt_test_transaction_json_3-2]$ /usr/local/java18/bin/java -Djava.ext.dirs=/XXX/kafka/libs kafka.tools.DumpLogSegments --files 00000000000235947656.snapshot
    Dumping 00000000000235947656.snapshot
    producerId: 2000 producerEpoch: 1 coordinatorEpoch: 4 currentTxnFirstOffset: None firstSequence: 95769510 lastSequence: 95769511 lastOffset: 235947654 offsetDelta: 1 timestamp: 1541325156503
    producerId: 3000 producerEpoch: 5 coordinatorEpoch: 6 currentTxnFirstOffset: None firstSequence: 91669662 lastSequence: 91669666 lastOffset: 235947651 offsetDelta: 4 timestamp: 1541325156454
    

    在实际的使用中,这个 snapshot 文件一般只会保存最近的两个文件。

    中间流程故障如何恢复

    对于上面所讲述的一个事务操作流程,实际生产环境中,任何一个地方都有可能出现的失败:

    1. Producer 在发送 beginTransaction() 时,如果出现 timeout 或者错误:Producer 只需要重试即可;
    2. Producer 在发送数据时出现错误:Producer 应该 abort 这个事务,如果 Produce 没有 abort(比如设置了重试无限次,并且 batch 超时设置得非常大),TransactionCoordinator 将会在这个事务超时之后 abort 这个事务操作;
    3. Producer 发送 commitTransaction() 时出现 timeout 或者错误:Producer 应该重试这个请求;
    4. Coordinator Failure:如果 Transaction Coordinator 发生切换(事务 topic leader 切换),Coordinator 可以从日志中恢复。如果发送事务有处于 PREPARE_COMMIT 或 PREPARE_ABORT 状态,那么直接执行 commit 或者 abort 操作,如果是一个正在进行的事务,Coordinator 的失败并不需要 abort 事务,producer 只需要向新的 Coordinator 发送请求即可。

    陆陆续续写了几天,终于把这篇文章总结完了。


    参考:

    1. Exactly Once Delivery and Transactional Messaging in Kafka
    2. Idempotent Producer
    3. Exactly-once Semantics in Apache Kafka
    4. Transactional Messaging in Kafka
    5. Transactions in Apache Kafka
    展开全文
  • 基于最新Spring 5.x,详细介绍了Spring 事务源码,包括BeanFactoryTransactionAttributeSourceAdvisor注解事务通知器源码解析。
  • 在分布式系统中一次操作需要由多个服务协同完成,这种由不同的服务之间通过网络协同完成...本文详解介绍七种常见分布式事务的原理以及优缺点和适用场景(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知
  • 微服务架构下的事务一致保证

    千次阅读 2019-04-04 14:07:56
    今天我给大家分享的题目是微服务架构下的事务一致保证。 主要内容包括4部分: 传统分布式事务不是微服务中一致的最佳选择 微服务架构中应满足数据最终一致原则 微服务架构实现最终一致的三种模式 对账...
  • 最大努力通知型方案(一般跨平台通知比较常用)1、基于可靠消息最终一致方案场景:对应支付系统会计异步记账业务;银行通知结果信息存储与驱动订单处理。2、TCC方案场景:对应支付系统的订单账户操作:订单处理、...
  • 分布式事务10_最大努力通知

    千次阅读 2018-02-16 21:35:05
    分布式事务10_最大努力通知形势更多干货分布式事务处理一分布式事务二分布式事务处理三分布式事务四_基于可靠消息的最终一致分布式事务五_基于可靠消息的最终一致_异常流程分布式事务六_常规MQ队列分布式事务七_...
  • 本文先介绍原理,再编码实践,通过Spring Cloud Stream框架,结合使用rocketmq来实现事务性消息。
  • 数据库怎么保证(分布式)事务一致

    万次阅读 多人点赞 2019-03-15 10:15:43
    浅谈事务与一致问题 原文地址 https://www.jianshu.com/p/f0a1b00a6002 在高并发场景下,分布式储存和处理已经是常用手段。但分布式的结构势必会带来“不一致”的麻烦问题,而事务正是解决这一问题而引入的一种...
  • 点击上方“朱小厮的博客”,选择“设为星标”后台回复”加群“获取公众号专属群聊入口随着业务的快速发展、业务复杂度越来越高,传统单体应用逐渐暴露出了一些问题,例如开发效率低、可维护差、架构...
  • 一、单机事务的延伸 二、九十年代的XA事务 三、常见的分布式事务方案 事务补偿 本地消息表 消息队列 四、分布式事务框架Seata seata结构简介,TC、TM、RM。 AT模式 一阶段(prepare):所有RM解析当前sql,自动生成...
  • 前言 方案简介 ...最大努力通知方案: 适用场景: 对于业务最终一致的时间敏感度比较低的。 实现: 1.主要由业务活动的主动方,在完成相关业务处理之后,向业务活动的被动方发送消息;消息允许...
  • 分布式事务课程列表: 第01节--课程介绍 第02节--解决方案的效果演示 第03节--常用的分布式事务解决方案介绍 第04节--消息发送一致方案探讨(可靠消息的前提保障) 第05节--JMS规范的消息发送与接收特点 第...
  • mq实现分布式事务-补偿事务一致CAP原则Rocket mq实现思路Rabbit mq实现思路需要考虑的问题后记 严格的来说,消息中间件并不能实现分布式事务,而是通过事后补偿机制,达到和分布式事务一样的数据一致。这里主要...
  • 分布式事务领域开山之作,冰河与猫大人带你深入理解分布式事务,强烈建议收藏!!
  • 分布式柔性事务详解--基于事务型MQ

    千次阅读 2020-07-05 16:29:18
    最大努力通知事务主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致,比如充值平台与运营商、支付对接、商户通知等等跨平台、跨企业的系统间业务交互场景;而事务...
  • 分布式事务数据一致解决方案

    千次阅读 2020-09-03 01:01:53
    2PC 两阶段提交 由一个事务协调器器去通知事务执行者准备事务、提交或回滚事务,它解决了数据一致问题,但是通信时间、资源锁定时间太长,系统的可用受到影响 利用RocketMq的消息事务机制,将生产者本地...
  • 中国会计师事务所百强名单正式公布! 最新中国会计师事务所百强名单正式公布! 2019年5月23日,中注协终于公布了《2018年度业务收入前100家会计师事务所信息》,最新一期的会计师事务所百强名单终于出炉! 今年...
  • 背景:订单完成支付,通知商户商户系统接口必须实现幂等订单服务提供商户订单查询接口流程:消息生产端:完成事件 -&gt; 调用消息服务,发送消息消息消费端:接收消息 -&gt; 调用通知服务(判断该消息未...
  • 来源:https://0x9.me/YgaUc在之前的文章中,我们已经学习总结了分布式事务的两种解决方案。我说分布式事务之TCC我说分布式事务之最大努力通知事务本文我们...
  • 分布式事务一致

    千次阅读 2018-05-25 17:18:45
    有人的地方,就有江湖有江湖的地方,就有纷争问题的起源在电商等业务中,系统一般由多个独立的服务组成,如何解决分布式调用时候数据的一致? 具体业务场景如下,比如一个业务操作,如果同时调用服务 A、B、C,...
  • 所以就采用了RocketMQ的事务消息来实现分布式事务的一致。 RocketMQ 前文中的阿里的rocketMq集成的ons框架配置以及普通、延时、定时消息实现的文章 一、事务消息类型 RockectMQ事务消息提供了X/Open XA的分布式...
  • 面试官问我知道的分布式事务,我一口气说了六种

    万次阅读 多人点赞 2020-08-14 11:07:10
    不过暖男为了保证文章的完整确保所有人都听得懂,我还是得先说说 ACID,然后再来介绍下什么是分布式事务和常见的分布式事务包括 2PC、3PC、TCC、本地消息表、消息事务、最大努力通知事务 严格意义上的事务实现...
  • 互联网大背景下,微服务盛行,平时开发中难免会遇到分布式事务问题。...文章目录分布式事务产生背景数据库水平拆分微服务拆分分布式事务解决方案TCCXA事务(2PC)最终一致(消息事务)错误场景一错误

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 140,193
精华内容 56,077
关键字:

一般事务性通知