精华内容
下载资源
问答
  • Flink两阶段提交

    2021-05-24 17:10:08
  • Flink两阶段提交 1.EXACTLY_ONCE语义 2.Kafka的幂等性和事务 2.1 幂等性 2.2 事务 3.两阶段提交协议 4.TwoPhaseCommitSinkFunction 参考文献 Flink两阶段提交 1.EXACTLY_ONCE语义 ...

    begin transaction之后:数据先写入broker的内存中(临时目录下的临时文件)

    precommit开始时:flush到磁盘,关闭该临时文件

    commit开始时:把precommit的文件移动到真实的目的路径下,在broker中开始对消费者可见。

    当然有延迟,比如1min做一次checkpoint,则每隔1min下游才能消费到数据

    ------------------------------------------------

    1 在checkpoint之间是一直begin_transaction,之后开始checkpoint后是pre_commit, 数据写入kafka broker, 同时在transaction协调器以及 消费者处理器之间写入本次事务的元信息;等到本次从source到sink之间的checkpoint全部完成,job manager完成checkpoint时,sink的producer才正式发出事务commit,这时候kafka之前写入broker的log才会对消费者显示有效可以读取。

    以下信息取自:https://zhuanlan.zhihu.com/p/111304281

    具体代码层面:

    flink kafkasink的TwoPhaseCommitSinkFunction仍然留了以下四个抽象方法待子类来实现:

    beginTransaction():开始一个事务,返回事务信息的句柄。

    preCommit():预提交(即提交请求)阶段的逻辑。

    commit():正式提交阶段的逻辑。

    abort():取消事务

    这四个方法实际上是flink对于sink端支持两阶段事务提交中间件的抽象,具体针对kafka sink来说,应用了kafka对应的两阶段提交的方法。

    1.1 预提交阶段

    FlinkKafkaProducer011.preCommit()方法的实现很简单。其中的flush()方法实际上是代理了KafkaProducer.flush()方法。

    那么preCommit()方法是在哪里使用的呢?答案是TwoPhaseCommitSinkFunction.snapshotState()方法。从前面的类图可以得知,TwoPhaseCommitSinkFunction也继承了CheckpointedFunction接口,所以2PC是与检查点机制一同发挥作用的。

    每当需要做checkpoint时,JobManager就在数据流中打入一个屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后,触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。

    1.2 提交阶段

    FlinkKafkaProducer011.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。

    该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。

    该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交之。

    1.3 回退阶段

    可见,只有在所有检查点都成功完成这个前提下,写入才会成功。这符合前文所述2PC的流程,其中JobManager为协调者,各个算子为参与者(不过只有sink一个参与者会执行提交)。一旦有检查点失败,notifyCheckpointComplete()方法就不会执行。如果重试也不成功的话,最终会调用abort()方法回滚事务。
     

    普遍意义上flink的两阶段提交是这样的:

    Let’s discuss how to extend a `TwoPhaseCommitSinkFunction` on a simple file-based example. We need to implement only four functions and present their implementations for an exactly-once file sink:

    1. beginTransaction - to begin the transaction, we create a temporary file in a temporary directory on our destination file system. Subsequently, we can write data to this file as we process it.
    2. preCommit - on pre-commit, we flush the file, close it, and never write to it again. We’ll also start a new transaction for any subsequent writes that belong to the next checkpoint.
    3. commit - on commit, we atomically move the pre-committed file to the actual destination directory. Please note that this increases the latency in the visibility of the output data.
    4. abort - on abort, we delete the temporary file.

    为了开始事务,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理该文件时将数据写入该文件。

    预提交-在预提交时,我们刷新文件,关闭它,再也不写入它。我们还将为属于下一个检查点的任何后续写入启动一个新事务。

    提交-在提交时,我们原子地将预提交的文件移动到实际的目标目录。请注意,这会增加输出数据可见性的延迟。

    中止-中止时,我们删除临时文件。

     

    2 kafka的幂等性可以防止重复提交。每个producer可以用户显式的指定transactionId, 这样producer Id以及topic、partition和生产者发送的每条消息的递增的seq Id.这些可以唯一确定一条消息,相当于唯一键,防止broker内部数据重复写入。

    https://blog.csdn.net/longlovefilm/article/details/104908006

    ------------------------------------------------

     flush操作(将文件缓存刷到磁盘上)。

    https://blog.csdn.net/weixin_29970399/article/details/113583615?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_v2~rank_aggregation-11-113583615.pc_agg_rank_aggregation&utm_term=flush+linux+%E5%86%99%E6%96%87%E4%BB%B6&spm=1000.2123.3001.4430

    ------------------------------------------------

    1 FlinkKafkaProducer继承了TwoPhaseCommitSinkFunction

    2 TwoPhaseCommitSinkFunction类继承了RichSinkFunction类且实现了CheckPointedFunction接口以及CheckPointListener接口
    3 FlinkKafkaProducer重写了TwoPhaseCommitSinkFunction类中的invoke()方法以及CheckPointedFunction接口中的initializeState()方法和snapshotState()方法和CheckPointListener接口中的notifyCheckPointComplete()方法
    4 数据写出需要进行的几个步骤:
    (1)首先会调父类TwoPhaseCommitSinkFunction中的initializeState()方法初始化状态,如果以前有失败的事务,则会针对失败的事务再次提交-调用initializeState()->recoverAndCommitInternal()->recoverAndCommit()
    ->commit(),且initializeState()方法每个subTask只会调用一次

    (2)调用FlinkKafkaProducer的invoke()方法,直到执行snapshotState()方法中的checkpoint,因为进行了checkpoint,所以事务失败了也会进行回滚,回到事务失败前的状态再次进行处理
    (3)会调snapshotState()方法中的precommit()方法
    (4)再调notifyCheckPointComplete()方法中的commit()方法

    (5)最后调snapshotState()方法中的老状态清除新状态添加的方法

    https://blog.csdn.net/m0_47444428/article/details/112860481?utm_medium=distribute.pc_relevant_t0.none-task-blog-OPENSEARCH-1.control&dist_request_id=ab5ba192-0737-44e3-8879-3777dfc2bfe7&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-OPENSEARCH-1.control

    ------------------------------------------------

    文章目录

     

    Flink两阶段提交

    1.EXACTLY_ONCE语义

    EXACTLY_ONCE语义简称EOS,指的是每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次,Flink一直宣称自己支持EOS,实际上主要是对于Flink应用内部来说的,对于外部系统(端到端)则有比较强的限制

    • 外部系统写入支持幂等性
    • 外部系统支持以事务的方式写入

    Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,并在Kafka Producer的connector中实现了它,支持了对外部Kafka Sink的EXACTLY_ONCE语义。

    2.Kafka的幂等性和事务

    Kafka在0.11版本之前只能保证At-Least-Once或At-Most-Once语义,从0.11版本开始,引入了幂等发送和事务,从而开始保证EXACTLY_ONCE语义,下面来看看Kafka中幂等发送和事务的原理:

    2.1 幂等性

    在未引入幂等性时,Kafka正常发送和重试发送消息流程图如下:
    在这里插入图片描述
    在这里插入图片描述
    为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。

    Producer发送每条消息<Topic, Partition>对于Sequence Number会从0开始单调递增,broker端会为每个<PID, Topic, Partition>维护一个序号,每次commit一条消息此序号加一,对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)正好大1,则Broker会接受它,否则将其丢弃:

    • 序号比Broker维护的序号大1以上,说明存在乱序。
    • 序号比Broker维护的序号小,说明此消息以及被保存,为重复数据。

    有了幂等性,Kafka正常发送和重试发送消息流程图如下:
    在这里插入图片描述
    在这里插入图片描述
    幂等性机制仅解决了单分区上的数据重复和乱序问题,对于跨session和所有分区的重复和乱序问题不能得到解决。于是需要引入事务。

    2.2 事务

    事务是指所有的操作作为一个原子,要么都成功,要么都失败,而不会出现部分成功或部分失败的可能。举个例子,比如小明给小王转账1000元,那首先小明的账户会减去1000,然后小王的账户会增加1000,这两个操作就必须作为一个事务,否则就会出现只减不增或只增不减的问题,因此要么都失败,表示此次转账失败。要么都成功,表示此次转账成功。分布式下为了保证事务,一般采用两阶段提交协议。

    为了解决跨session和所有分区不能EXACTLY-ONCE问题,Kafka从0.11开始引入了事务。

    为了支持事务,Kafka引入了Transacation Coordinator来协调整个事务的进行,并可将事务持久化到内部topic里,类似于offset和group的保存。

    用户为应用提供一个全局的Transacation ID,应用重启后Transacation ID不会改变。为了保证新的Producer启动后,旧的具有相同Transaction ID的Producer即失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。有了Transaction ID后,Kafka可保证:

    • 跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的Producer停止工作。
    • 跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。

    KPI-98对Kafka事务原理进行了详细介绍,完整的流程图如下:
    在这里插入图片描述

    1. Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址

    2. 找到Transaction Coordinator后,具有幂等特性的Producer必须发起InitPidRequest请求以获取PID。

      2.1 当设置了TransactionalId
      如果开启了事务特性,设置了TransactionalId,则TransactionalId会和InitPidRequest请求一起传递,并且在步骤2a中将TransactionalId和对应的PID持久化到事务日志中,这使我们能够将TransactionalId的相同PID返回给producer的未来实例,从而使恢复或中止先前未完成的事务成为可能。除了返回PID外,InitPidRequest还会执行如下任务:

      • 增加该PID对应的epoch。具有相同PID但epoch小于该epoch的其它Producer(如果有)新开启的事务将被拒绝。
      • 恢复(Commit或Abort)之前的Producer未完成的事务(如果有)。
        注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer即可开始新的事务。

      2.2 当没有设置TransactionalId
      如果事务特性未开启,InitPidRequest可发送至任意Broker,并且会得到一个全新的唯一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性。

    3. 调用beginTransaction()方法开启一个事务,Producer本地会记录已经开启了事务,但Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

    4. Consume-Transform-Produce
      这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。

      4.1 AddPartitionsToTxnRequest
      一个Producer可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤4.1所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>设置COMMIT或者ABORT标记(如上图中步骤5.2所示)。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。

      4.2 ProduceRequest
      Producer通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和Sequence Number。该过程如上图中步骤4.2所示。

      4.3 AddOffsetCommitsToTxnRequest
      为了提供事务性,Producer新增了sendOffsetsToTransaction方法,该方法将多组消息的发送和消费放入同一批处理内。

      该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,如上图中步骤4.3所示。该方法会阻塞直到收到响应。TxnOffsetCommitRequest作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets中,如上图步骤4.3a

      4.4 TxnOffsetCommitRequest
      作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets中,如上图步骤4.4所示。在此过程中,Consumer Coordinator会通过PID和对应的epoch来验证是否应该允许该Producer的该请求。
      这里需要注意:

      • 写入__consumer_offsets的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset尚未Commit,也即对应的消息尚未被完成处理。
      • Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的Offset,因为此时这些更新操作尚未被COMMIT或ABORT。
    5. 提交或回滚事务
      一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

      5.1 EndTxnRequest
      commitTransaction方法使得Producer写入的数据对下游Consumer可见。abortTransaction方法通过Transaction Marker将Producer写入的数据标记为Aborted状态。下游的Consumer如果将isolation.level设置为READ_COMMITTED,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。

      无论是Commit还是Abort,Producer都会发送EndTxnRequest请求给Transaction Coordinator,并通过标志位标识是应该Commit还是Abort。

      收到该请求后,Transaction Coordinator会进行如下操作

      • 将PREPARE_COMMIT或PREPARE_ABORT消息写入Transaction Log,如上图中步骤5.1所示
      • 通过WriteTxnMarker请求以Transaction Marker的形式将COMMIT或ABORT信息写入用户数据日志以及Offset Log中,如上图中步骤5.2所示
      • 最后将COMMIT或ABORT信息写入Transaction Log中,如上图中步骤5.3所示。

      上述第二步是实现将一组读操作与写操作作为一个事务处理的关键。因为Producer写入的数据Topic以及记录Comsumer Offset的Topic会被写入相同的Transactin Marker,所以这一组读操作与写操作要么全部COMMIT要么全部ABORT。

      5.2 WriteTxnMarkerRequest
      上面提到的WriteTxnMarkerRequest由Transaction Coordinator发送给当前事务涉及到的每个<Topic, Partition>的Leader。收到该请求后,对应的Leader会将对应的COMMIT(PID)或者ABORT(PID)控制信息写入日志,如上图中步骤5.2所示。

      该控制消息向Broker以及Consumer表明对应PID的消息被Commit了还是被Abort了。

      这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息。

      5.2 写入最终的COMMIT或ABORT消息
      写完所有的Transaction Marker后,Transaction Coordinator会将最终的COMMIT或ABORT消息写入Transaction Log中以标明该事务结束,如上图中步骤5.3所示。

      此时,Transaction Log中大多数关于该事务的消息全部可以移除。当然,由于Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时不再保留。

      我们只需要保留已完成事务的PID和时间戳,因此最终可以为生产者删除TransactionalId-> PID映射。

    3.两阶段提交协议

    两阶段提交指的是一种协议,经常用来实现分布式事务,可以简单理解为预提交+实际提交,一般分为协调器Coordinator(以下简称C)和若干事务参与者Participant(以下简称P)两种角色。
    在这里插入图片描述

    1. C先将prepare请求写入本地日志,然后发送一个prepare的请求给P
    2. P收到prepare请求后,开始执行事务,如果执行成功返回一个Yes或OK状态给C,否则返回No,并将状态存到本地日志。
    3. C收到P返回的状态,如果每个P的状态都是Yes,则开始执行事务Commit操作,发Commit请求给每个P,P收到Commit请求后各自执行Commit事务操作。如果至少一个P的状态为No,则会执行Abort操作,发Abort请求给每个P,P收到Abort请求后各自执行Abort事务操作。
      注:C或P把发送或接收到的消息先写到日志里,主要是为了故障后恢复用,类似WAL
      在这里插入图片描述
      在这里插入图片描述

    4.TwoPhaseCommitSinkFunction

    Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,封装了两阶段提交逻辑,并在Kafka Sink connector中实现了TwoPhaseCommitSinkFunction,依赖Kafka版本为0.11+,TwoPhaseCommitSinkFunction具体实现如下:
    在这里插入图片描述

    //TwoPhaseCommitSinkFunction
    	public void initializeState(FunctionInitializationContext context) throws Exception {
    		// 当我们通过提交中事务恢复状态时,我们并不知道事务已经提交了,还是在checkpoint执行完成(在master端)和在此处通知checkpoint完成之间失败了。
    		// 通常情况下是已经提交了,因为checkpoint在master执行完成和在此处通知checkpoint完成之间的窗口非常小
    		// 如果在第一次checkpoint完成之前失败,则可能没有任何事务,或者扩容的情况下,会有一部分新的task没有事务
    		// 如果发生缩容事件,或者由于“notifycheckpointcomplete()”方法中讨论的原因,我们可以检查多个事务。
    		state = context.getOperatorStateStore().getListState(stateDescriptor);
    		boolean recoveredUserContext = false;
    		// 从上一次恢复
    		if (context.isRestored()) {
    			for (State<TXN, CONTEXT> operatorState : state.get()) {
    				userContext = operatorState.getContext();
    				List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
    				for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
    					recoverAndCommitInternal(recoveredTransaction);
    				}
    				recoverAndAbort(operatorState.getPendingTransaction().handle);
    				if (userContext.isPresent()) {
    					finishRecoveringContext();
    					recoveredUserContext = true;
    				}
    			}
    		}
    		if (!recoveredUserContext) {
    			userContext = initializeUserContext();
    		}
    		this.pendingCommitTransactions.clear();
    		//创建生产者事务,并返回句柄
    		currentTransactionHolder = beginTransactionInternal();
    	}
    
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
    		long checkpointId = context.getCheckpointId();
    		//预提交,如果语义为EXACTLY_ONCE,执行flush操作
    		preCommit(currentTransactionHolder.handle);
    		//pendingCommitTransactions插入当次检查点对应的currentTransactionHolder,包含事务生产者的实例(对于EXACTLY_ONCE模式)
    		pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    		//这里又初始化了一次包含事务生产者的实例(对于EXACTLY_ONCE模式),并赋给currentTransactionHolder
    		currentTransactionHolder = beginTransactionInternal();
    		//清空state
    		state.clear();
    		//
    		state.add(new State<>(
    			this.currentTransactionHolder,
    			new ArrayList<>(pendingCommitTransactions.values()),
    			userContext));
    	}
    
    	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    		// 可能出现以下几种情况
    		// (1) 从最近的检查点触发并完成的事务恰好只有一个。 这应该很常见,在这种情况下只需提交该事务即可。
    		// (2) 由于上一次checkpoint被跳过导致这里有多个正在进行的事务,这是一种罕见的情况,但可能在以下情况下发生:
    		//     - 上一次checkpoint未能持久化metadata(存储系统临时中断)但可以保留一个连续的检查点(此处通知的检查点)
    		//     - 其他task未能在上一次checkpoint持久化他们的状态,但未触发失败,因为他们可以保持其状态并将其成功保存在连续的检查点中(此处通知的检查点)
    		//    在这两种情况下,前一个检查点都不会达到提交状态,但此检查点总是希望包含前一个检查点,并覆盖自上一个成功检查点以来的所有更改。因此,我们需要提交所有待提交的事务。
    		// (3) 多个事务处于待提交状态,但检查点完成通知与最新的不相关。这是可能的,因为通知消息可能会延迟(在极端情况下,直到触发下一个检查点之后到达)并且可能会有并发的重叠检查点(新的检查点在上一个完全完成之前启动)。
    		// ==> 永远不会有我们这里没有待提交事务的情况
    		
    		//待提交的事务版本和事务句柄
    		Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
    		Throwable firstError = null;
    		while (pendingTransactionIterator.hasNext()) {
    			Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
    			Long pendingTransactionCheckpointId = entry.getKey();
    			TransactionHolder<TXN> pendingTransaction = entry.getValue();
    			if (pendingTransactionCheckpointId > checkpointId) {
    				continue;
    			}
    			try {
    				//提交事务(最终调用commitTransaction)
    				commit(pendingTransaction.handle);
    			} catch (Throwable t) {
    				//
    			}
    			pendingTransactionIterator.remove();
    		}
    	}
    

     

    //FlinkKafkaProducer011
    	public void initializeState(FunctionInitializationContext context) throws Exception {
    		//如果检查点未开启,语义设置为NONE
    		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    			semantic = Semantic.NONE;
    		}
    		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
    			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    		//初始化事务ID生成器
    		transactionalIdsGenerator = new TransactionalIdsGenerator(
    			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
    			getRuntimeContext().getIndexOfThisSubtask(),
    			getRuntimeContext().getNumberOfParallelSubtasks(),
    			kafkaProducersPoolSize,
    			SAFE_SCALE_DOWN_FACTOR);
    		if (semantic != Semantic.EXACTLY_ONCE) {
    			nextTransactionalIdHint = null;
    		} else {
    			//如果为EXACTLY_ONCE语义,初始化nextTransactionalIdHint(初始化lastParallelism和nextFreeTransactionalId为0),后面用来生成多个事务ID
    			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
    			if (transactionalIdHints.size() > 1) {
    			} else if (transactionalIdHints.size() == 0) {
    				nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);
    				abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
    			} else {
    				nextTransactionalIdHint = transactionalIdHints.get(0);
    			}
    		}
    		//调用父类的initializeState方法
    		super.initializeState(context);
    	}
    
    
    	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    		//调用父类的snapshotState方法
    		super.snapshotState(context);
    		//清空nextTransactionalIdHintState
    		nextTransactionalIdHintState.clear();
    		//避免每个task上做同样的操作,这里只在第一个task上完成nextFreeTransactionalId的初始化
    		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
    			long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
    			if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
    				nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
    			}
    			nextTransactionalIdHintState.add(new NextTransactionalIdHint(
    				getRuntimeContext().getNumberOfParallelSubtasks(),
    				nextFreeTransactionalId));
    		}
    	}
    

     

    Flink Kafka Sink执行两阶段提交的流程图大致如下:
    在这里插入图片描述
    假设一种场景,从Kafka Source拉取数据,经过一次窗口聚合,最后将数据发送到Kafka Sink,如下图:
    在这里插入图片描述

    1. JobManager向Source发送Barrier,开始进入pre-Commit阶段,当只有内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些已定义的状态变量即可。当chckpoint成功时Flink负责提交这些写入,否则就终止取消掉它们。
    2. 当Source收到Barrier后,将自身的状态进行保存,后端可以根据配置进行选择,这里的状态是指消费的每个分区对应的offset。然后将Barrier发送给下一个Operator。
      在这里插入图片描述
    3. 当Window这个Operator收到Barrier之后,对自己的状态进行保存,这里的状态是指聚合的结果(sum或count的结果),然后将Barrier发送给Sink。Sink收到后也对自己的状态进行保存,之后会进行一次预提交。
      在这里插入图片描述
    4. 预提交成功后,JobManager通知每个Operator,这一轮检查点已经完成,这个时候,Kafka Sink会向Kafka进行真正的事务Commit。
      在这里插入图片描述

    以上便是两阶段的完整流程,提交过程中如果失败有以下两种情况

    1. Pre-commit失败,将恢复到最近一次CheckPoint位置
    2. 一旦pre-commit完成,必须要确保commit也要成功

    因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。

    参考文献

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-98±+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-1.Findingatransactioncoordinator–theFindCoordinatorRequest
    https://cloud.tencent.com/developer/article/1149669
    https://segmentfault.com/a/1190000019329884
    https://www.cnblogs.com/felixzh/p/10184762.html

     

    转自:https://blog.csdn.net/lisenyeahyeah/article/details/90288231

     

    更多企业内的技术应用和使用技巧,请移步至我的公众号【程序员实用技能】

    图片

    展开全文
  • 顾名思义,2PC将分布式事务分成了个阶段,个阶段分别为提交请求(投票)和提交(执行)。协调者根据参与者的响应来决定是否需要真正地执行事务,具体流程如下。 提交请求(投票)阶段 协调者向所有参与者发送...

    在分布式系统中,为了让每个节点都能够感知到其他节点的事务执行状况,需要引入一个中心节点来统一处理所有节点的执行逻辑,这个中心节点叫做协调者(coordinator),被中心节点调度的其他业务节点叫做参与者(participant)。

    接下来正式介绍2PC。顾名思义,2PC将分布式事务分成了两个阶段,两个阶段分别为提交请求(投票)和提交(执行)。协调者根据参与者的响应来决定是否需要真正地执行事务,具体流程如下。

    提交请求(投票)阶段

    • 协调者向所有参与者发送prepare请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。
    • 参与者执行事务中包含的操作,并记录undo日志(用于回滚)和redo日志(用于重放),但不真正提交。
    • 参与者向协调者返回事务操作的执行结果,执行成功返回yes,否则返回no。

    提交(执行)阶段

    分为成功与失败两种情况。

    若所有参与者都返回yes,说明事务可以提交:

    • 协调者向所有参与者发送commit请求。
    • 参与者收到commit请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回ack。
    • 协调者收到所有参与者的ack消息,事务成功完成。

    若有参与者返回no或者超时未返回,说明事务中断,需要回滚:

    • 协调者向所有参与者发送rollback请求。
    • 参与者收到rollback请求后,根据undo日志回滚到事务执行前的
    • 协调者收到所有参与者的ack消息,事务回滚完成。
      下图分别示出这两种情况:
      提交成功:
      提交失败:

    Flink基于2PC的事务性写入

    Flink提供了基于2PC的SinkFunction,名为TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。它的第一层类继承关系如下:
    但是TwoPhaseCommitSinkFunction仍然留了以下四个抽象方法待子类来实现:

       protected abstract TXN beginTransaction() throws Exception;
        protected abstract void preCommit(TXN transaction) throws Exception;
        protected abstract void commit(TXN transaction);
        protected abstract void abort(TXN transaction);
    
    • beginTransaction():开始一个事务,返回事务信息的句柄。

    • preCommit():预提交(即提交请求)阶段的逻辑。

    • commit():正式提交阶段的逻辑。

    • abort():取消事务。

    下面以Flink与Kafka的集成来说明2PC的具体流程。注意这里的Kafka版本必须是0.11及以上,因为只有0.11+的版本才支持幂等producer以及事务性,从而2PC才有存在的意义。

    开始事务

    看下FlinkKafkaProducer011类实现的beginTransaction()方法:

    @Override
        protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
            switch (semantic) {
                case EXACTLY_ONCE:
                    FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
                    producer.beginTransaction();
                    return new KafkaTransactionState(producer.getTransactionalId(), producer);
                case AT_LEAST_ONCE:
                case NONE:
                    // 如果是已经有事务存在,就无需每次都状态kafka事务生产者,直接复用,否则就创建一个非事务生产者
                    final KafkaTransactionState currentTransaction = currentTransaction();
                    if (currentTransaction != null && currentTransaction.producer != null) {
                        return new KafkaTransactionState(currentTransaction.producer);
                    }
                    return new KafkaTransactionState(initNonTransactionalProducer(true));
                default:
                    throw new UnsupportedOperationException("Not implemented semantic");
            }
    }
    

    如果在Flink里面明确要求exactly once语义时,就会创建事务生产者并且启动事务。

    预提交阶段

    FlinkKafkaProducer011.preCommit()方法的实现很简单。其中的flush()方法实际上是代理了KafkaProducer.flush()方法。

     @Override
        protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
            switch (semantic) {
                case EXACTLY_ONCE:
                case AT_LEAST_ONCE:
                    flush(transaction);
                    break;
                case NONE:
                    break;
                default:
                    throw new UnsupportedOperationException("Not implemented semantic");
            }
            checkErroneous();
        }
    

    那么preCommit()方法是在哪里使用的呢?答案是TwoPhaseCommitSinkFunction.snapshotState()方法。从前面的类图可以得知,TwoPhaseCommitSinkFunction也继承了CheckpointedFunction接口,所以2PC是与检查点机制一同发挥作用的。

     @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // 下面就是类2pc事务实现,提交就绪,并且记录事务日志
            checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
    
            long checkpointId = context.getCheckpointId();
            LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
    
            preCommit(currentTransactionHolder.handle);
            pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
            LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
    
            currentTransactionHolder = beginTransactionInternal();
            LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    
            state.clear();
            state.add(new State<>(
                this.currentTransactionHolder,
                new ArrayList<>(pendingCommitTransactions.values()),
                userContext));
    }
    

    结合Flink检查点的原理,可以用下图来形象地表示预提交阶段的流程:
    每当需要做checkpoint时,JobManager就在数据流中打入一个屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后,触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。

    提交阶段

    FlinkKafkaProducer011.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。

       @Override
        protected void commit(KafkaTransactionState transaction) {
            if (transaction.isTransactional()) {
                try {
                    transaction.producer.commitTransaction();
                } finally {
                    recycleTransactionalProducer(transaction.producer);
                }
            }
        }
    

    该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。

    @Override
        public final void notifyCheckpointComplete(long checkpointId) throws Exception {
            Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
            checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
            Throwable firstError = null;
    
            while (pendingTransactionIterator.hasNext()) {
                Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
                Long pendingTransactionCheckpointId = entry.getKey();
                TransactionHolder<TXN> pendingTransaction = entry.getValue();
                if (pendingTransactionCheckpointId > checkpointId) {
                    continue;
                }
                LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                    name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
                logWarningIfTimeoutAlmostReached(pendingTransaction);
                try {
                    commit(pendingTransaction.handle);
                } catch (Throwable t) {
                    if (firstError == null) {
                        firstError = t;
                    }
                }
                LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
                pendingTransactionIterator.remove();
            }
            if (firstError != null) {
                throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                    firstError);
            }
    }
    

    该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交之。这阶段的流程可以用下图来表示:
    可见,只有在所有检查点都成功完成这个前提下,写入才会成功。这符合前文所述2PC的流程,其中JobManager为协调者,各个算子为参与者(不过只有sink一个参与者会执行提交)。一旦有检查点失败,notifyCheckpointComplete()方法就不会执行。如果重试也不成功的话,最终会调用abort()方法回滚事务。

      @Override
        protected void abort(KafkaTransactionState transaction) {
            if (transaction.isTransactional()) {
                transaction.producer.abortTransaction();
                recycleTransactionalProducer(transaction.producer);
            }
        }
    

    2PC的缺点

    1、协调者存在单点问题。如果协调者挂了,整个2PC逻辑就彻底不能运行。

    2、执行过程是完全同步的。各参与者在等待其他参与者响应的过程中都处于阻塞状态,大并发下有性能问题。

    3、仍然存在不一致风险。如果由于网络异常等意外导致只有部分参与者收到了commit请求,就会造成部分参与者提交了事务而其他参与者未提交的情况。

    展开全文
  • 此时我们说checkpoint就是整个应用的全局状态,当然也包含pre-commit阶段提交的外部状态。当出现崩溃时,我们可以回滚状态到最新已成功完成快照时的时间点。整个过程为预提交阶段 2、commit阶段:下一步就是通知...

    1、pre-commit阶段:当checkpoint barrier在所有operator都传递了一遍且对应的快照也都成功完成之后,这个pre-commit阶段才算完成。该过程中所有创建的快照都被视为是checkpoint的一部分。此时我们说checkpoint就是整个应用的全局状态,当然也包含pre-commit阶段提交的外部状态。当出现崩溃时,我们可以回滚状态到最新已成功完成快照时的时间点。整个过程为预提交阶段

    2、commit阶段:下一步就是通知所有的operator,告诉它们checkpoint已成功完成。这便是两阶段提交协议的第二个阶段:commit阶段。该阶段中JobManager会为应用中每个operator发起checkpoint已完成的回调逻辑。

    3、两阶段提交,最核心是要上下游都要支持事务,否则消费的数据不能重新消费或者写入的数据不能撤回。比如上游kafka天然支持重新读取数据,下游kafka开启幂和事务保证数据不重复 等操作。因为redis不支持写入事务,所以不能保证精准一次

    4、mysql相关的两阶段提交代码 https://blog.51cto.com/simplelife/2401521

     

    https://www.dazhuanlan.com/2020/03/23/5e788dc5c786c/?__cf_chl_jschl_tk__=7a0d8a8dbfa3f226d3643928fa62d00c681a59ec-1609407060-0-AfVEiXUt4YDdYSIdJM24zkngKlzJTpYDBlYpV2qU9GA4_RXnzYnDdP9zvw5dP95_UmJvA_8ovIrH7DkmuXSzz2hgvIsOFlQu7GnapIrEhPtaLSWf4ALrZh3T_3ZvgwRuM1xeuAV-1mPBEnVHqEUFp5l3D0kb-LjcjuA55ZxkQ-dstLQumKPNscUDxa9tSyMkUGHfCpt1Q3rTkrKymUxrPvciv7KgddCO3SvdfrHfnmcfkwmCpWvzsOQ5NNLdvEBSsYXYFsTYkj88_d8UuuuhXqqT-yqge7Bh-0eihytokwS8Wu2WDkICpaDnnQvbgnVJ0Ro88aGPQN7gcNYEWPtypGc

    展开全文
  • 最近在一边学习,一边将Flink流处理技术应用到公司的业务场景中。目前会通过采集方式将数据库的数据变更写入到Kafka,后面通过Flink处理后,落地到数仓中,因为目前数仓对外提供的是一些接口服务,所以目前还是以...
  • 3.在Flink KafkaProducer中继承了TwoPhaseCommitSinkFunction来实现阶段提交的功能(要弄清楚阶段分别干了什么事) 该类下 有四个子类 protected abstract TXN beginTransaction() throws Exceptio...
  • 一、前言 根据维基百科的定义,两阶段提交(Two-phase Commit,简称2PC)是巨人们用来解决分布式系统架构下的所有...本文基于Flink 1.12.4,和大家一起拜读Flink两阶段提交的源码。 二、2PC简介 1. 定义 根据维基百科的
  • 一、flink Exactly-Once与At-Least-Once 关于消息的消费、处理语义可以分为三类: 1. at most once : 至多一次,表示一条消息不管后续处理成功与否只会被消费处理一次,那么就存在数据丢失可能 2\. exactly ...
  • 阶段提交 1 FlinkKafkaProducer继承了TwoPhaseCommitSinkFunction 类 2 TwoPhaseCommitSinkFunction类继承了RichSinkFunction类且实现了CheckPointedFunction接口以及CheckPointListener接口 3 FlinkKafka...
  • flink的事务之阶段提交

    千次阅读 2020-04-14 10:31:41
    场景描述:阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。本文来介绍它的相关细节以及它在Flink中的典型应用场景。。
  • Flink阶段提交流程的个人理解

    千次阅读 2021-01-12 15:13:17
    commit都会被中止,flink回滚到最近成功完成的checkpoint),所有的Subtask都会收到JobManager发来的NotifyCheckpointComplete消息, 此时对Sink做commit,完成阶段提交的过程, 5.3、此时这个周期发送的数据才会对...
  • 两段提交: 预提交 确认提交 Flink通过checkpoint来保存数据是否处理完成的状态 由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件...
  • Flink实现阶段提交 参考 在Flink 1.4.0之前,Flink只能做到应用程序内的精确一次处理(exactly-once semantic),而无法做到端到端的精确一次(end-to-end exactly-once semantics)处理。 要提供端到端的精确一...
  • 在上一章中介绍了阶段提交的基本思路以及如何根据checkpoint机制来实现阶段提交思路,flink给出来阶段提交抽象实现TwoPhaseCommitSinkFunction与具体实现FlinkKafkaProducer011。 3....
  • 项目中需要使用Flink消费Kafka中的数据,然后使用二阶段提交的方式写入到MySQL里面。网上找到了一大堆相关的例子,但是没有一个是靠谱的,全TM是复制粘贴而且还是不能用的那种! 开发+调试,浪费了我好几天的时间,哎...
  • Reference: [1]Flink两阶段提交 [2]Flink 之 MySQL二阶提交
  • checkpoint.isDiscarded()) { // 待所有的 ack task 执行完成,才会 notifyCheckpointComplete 这也算是阶段提交 // flink task 是横向的,即每个 operator chain 的所有 subTask 都 acknowCheckpoint, 这...
  • 95-241-100-源码-Flink语义-Flink的exectly-once系列之阶段提交概述.pdf
  • 95-241-102-源码-Flink语义-Flink的exectly-once系列之阶段提交实现分析.pdf

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,910
精华内容 1,964
关键字:

flink两段提交