精华内容
下载资源
问答
  • 最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致时间敏感度低的业务,且被动方处理结果 不影响主动方的处理结果。典型的使用场景:如银行通知、商户通知等。最大努力通知型的...

    5.0 柔性事务:最大努力通知

     2018-02-05 02:51:05  5,956  0

        最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低的业务,且被动方处理结果 不影响主动方的处理结果。典型的使用场景:如银行通知、商户通知等。最大努力通知型的实现方案,一般符合以下特点:

        1、不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失(不可靠消息)。

        2、定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询(主动方提供查询接口),恢复丢失的业务消息。

     

        举例来说:笔者曾经做过一个短信发送平台,背景是公司内部有多个业务都有发送短信的需求,如果每个业务独立实现短信发送功能,存在功能实现上的重复。因此专门做了一个短信平台项目,所有的业务方都接入这个短信平台,来实现发送短信的功能。简化后的架构如下所示: 

    E14FE394-01AF-45BE-A371-8B46611884BD.png

    短信发送流程如下:

    1、业务方将短信发送请求提交给短信平台

    2、短信平台接收到要发送的短信,记录到数据库中,并标记其状态为”已接收"

    3、短信平台调用外部短信发送供应商的接口,发送短信。外部供应商的接口也是异步将短信发送到用户手机上,因此这个接口调用后,立即返回,进入第4步。

    4、更新短信发送状态为"已发送"

    5、短信发送供应商异步通知短信平台短信发送结果。而通知可能失败,因此最多只会通知N次。

    6、短信平台接收到短信发送结果后,更新短信发送状态,可能是成功,也可能失败(如手机欠费)。到底是成功还是失败并不重要,重要的是我们知道了这调短信发送的最终结果

    7、如果最多只通知N次,如果都失败了的话,那么短信平台将不知道短信到底有没有成功发送。因此短信发送供应商需要提供一个查询接口,以方便短信平台驱动的去查询,进行定期校对。

     

        在这个案例中,短信发送供应商通知短信平台短信发送结果的过程中,就是最典型的最大努力通知型方案,通知了N次就不再通知。通过提供一个短信结果查询接口,让短信平台可以进行定期的校对。而由于短信发送业务的时间敏感度并不高,比较适合采用这个方案。

     

        需要注意的是,短信结果查询接口很重要,必须要进行定期校对。因为后期要进行对账,笔者在做这个项目的时候,一个月的短信发送总量在高峰期可以达到1亿条左右,即使一条短信只要5分钱,一个月就有500W。

        (这个5分钱是笔者估计的,在这么大的量的情况,一条短信到底需要多少钱我也不知道。因为商务对接过程,是没有我靓丽的身影的。总之短信发送要花很多钱,如果短信发送供应商说短信都发送成功了,而短信平台却一条成功的记录都没有,出现这种扯皮的情况就不好了)

     

        最后提一点:当当网开源数据库中间件sharding-jdbc使用了最大努力通知型来实现分库分表情况下数据的一致性,说实话,个人觉得最大努力通知型不太适合于这种场景。目前,sharding-jdbc也正在开发另一种柔性事务方案TCC,我们后面将要讲解。 

    展开全文
  • 聊聊分布式应用的分布式事务之最大努力通知事务 本文我们将学习到另一种常见的柔性事务解决方案:消息一致性事务方案。 对于TCC型事务,跨系统的调用均是基于服务间的直接调用,即很大程度上是同步调用。基于TCC...

    聊聊分布式应用的分布式事务2PC/3PC
    聊聊分布式应用的分布式事务TCC
    聊聊分布式应用的分布式事务之最大努力通知型事务
    聊聊分布式应用的分布式事务之消息最终一致性事务

    本文我们将学习到另一种常见的柔性事务解决方案:消息一致性事务方案。核心思想是将分布式事务拆分成本地事务进行处理,不同本地事务之间通过消息传递、确认和回滚

    对于TCC型事务,跨系统的调用均是基于服务间的直接调用,即很大程度上是同步调用。基于TCC方案能够保证主子事务同时成功,同时失败。

    但实际开发中,由于多方面的考虑,我们会将服务拆分为异步方式,一般是基于MQ进行服务间的解耦,服务发起方执行完本地业务操作后发送一条消息给到消息中间件(比如:RocketMQ、RabbitMQ、Kafka、ActiveMQ等),被动方服务从MQ中消费该消息并进行业务处理,从而形成业务上的闭环。

    这种场景下,我们还是希望异步的多个业务操作同时成功,同时失败,基于TCC的同步型事务解决方案就不可行了,这时就需要祭出可靠消息最终一致性方案。

    ① 实现可靠消息服务

    首先按照惯例我们先看一下该方案的简略的结构图,如下
    在这里插入图片描述

    ② 核心流程1:上游投递消息

    调用开始业务主动方(之后称为主动方)预先发送一条消息到消息服务(图中中间的部分)中,消息中包含后续的业务操作所必须的业务参数,消息服务接收到该消息后存储消息到消息存储中,并设置消息状态为 “待确认”。如果消息存储失败则直接返回消息持久化失败,本次业务操作结束。

    当主动方接收到消息存储结果后,开始执行本地的业务操作,根据本地事务提交的结果,调用消息服务的接口。这里分为两种状态:

    • 如果本地事务执行成功,就调用消息服务确认消息状态,更新为待发送
    • 如果本地事务执行失败,就调用消息服务删除消息(一般是逻辑删除,更新消息状态为已回滚

    2-1当状态为第1种(主动方本地事务执行成功,消息被更新为待发送),消息服务就将该消息发送到MQ中,并更新消息状态为“已发送”

    注意:对于消息状态的更新和投递消息到MQ中间件的操作应在消息服务同一个方法中,并开启本地事务。为什么要这么做呢?

    因为我们的目的是:保证消息状态更新和消息投递同时成功同时失败。

    这里还是有两种情况:

    • 如果更新消息状态失败,则应当抛出异常回滚事务,不投递消息到MQ中。
    • 如果投递MQ失败(需要捕获异常),需要主动抛出异常触发本地事务回滚。
    • 前两步要同时成功同时失败

    2-2当状态是第2种,即本地事务执行失败。

    业务主动方需要调用可靠消息事务的删除消息操作,消息服务从消息持久化存储中删除该消息(设置消息状态为已回滚)。


    ③ 被动方应用接收消息

    被动方服务订阅主题后只需要等待MQ投递消息即可。

    当消息投递成功后,被动方服务消费该消息并执行本地业务操作,当本地业务执行成功,被动方服务调用消息服务,返回本地业务执行成功。

    可靠消息服务根据业务唯一参数(订单号结合消息id)设置消息状态为 “已完成”

    整个过程中,作为被动方服务需要尽最大努力将业务向最终状态推进,最终成功或者失败并通知消息服务置消息状态为完成的终态。


    ④ 如何保证消息不丢失–即保证消息可靠投递

    这里分为多种情况进行讨论。

    开始阶段,主动方应用提交 待确认 消息时出错,此时主动方会直接感知到提交失败,业务直接返回失败,不处理后续的流程。

    主动方应用执行完成本地事务之后,通知可靠消息服务确认或者删除消息阶段,出了问题:例如通知可靠消息服务失败、本地业务执行异常、可靠消息接收到提交请求后投递消息到MQ中失败等问题,如何解决?

    这类情况即出现业务卡在中间态,其实没关系,因为此时消息持久化状态会一直处于 “待确认” 状态。

    对于这种情况,我们只需要在可靠消息服务后台开启一个定时任务,定时扫描 “待确认”状态的中间状态消息。当消息处于 “待确认”状态,表明主动方应用已经开始执行本地业务操作,但业务状态未知,因此我们需要对主动方本地业务执行进行回查操作。

    这个阶段我们要在主动方应用中暴露一个回调查询接口,可靠消息服务会调用该接口,根据消息中的业务参数回查本地事务执行状态。如果主动方业务返回执行成功,则表明当前消息可以投递,此时可靠消息服务更新消息状态为 “待发送”,同时投递消息到MQ,并更新消息状态为已发送

    如果可靠消息服务(通过回查接口)询问主动方业务执行结果,返回执行失败,那么可靠消息服务需要删除该消息(逻辑删除,设置消息状态为已回滚)。

    通过上述的流程,我们可以保证可靠消息服务一定会努力尝试完成消息到MQ的投递过程,即主动方业务执行与消息发送一定同时成功,同时失败。


    ⑤ 如何保证消息不丢失–业务被动方对消息100%接收成功

    如果消息投递成功,但业务被动方消费消息出现问题,如:消费失败、未收到消息投递(传说中的丢消息)等,该如何处理呢?

    因为“未收到消息投递”的情况在消息服务高可用的情况下机会不会出现,而消费失败是业务级别的异常,因此我们同样可以采用在可靠消息服务后台起定时任务的方式,检查消息状态。

    对长时间处于 “已发送” 未变更状态为 “已完成” 的消息进行重新投递操作,这个扫描的时间我们要根据业务执行时间自行调整,比如:1min。

    对这类型消息重新投递到MQ之后,MQ会推送消息给消费方重新进行业务的处理操作。这个过程要在业务层实现消费的幂等性,保证同一条消息在多次投递之后,只会进行一次完整的业务逻辑处理。

    整个流程中,从消息的发送,到消息的消费阶段都能保证消息与本地事务执行状态一致,即使上下游会有短暂的状态不一致,在经过一个处理的时间窗口之后,在全局上,数据能够实现最终一致性。

    整个流程中,我们能保证:

    • 业务主动方本地事务提交失败,业务被动方不会收到消息的投递。
    • 只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)。

    这个机制就是基于消息中间件的异步流程中的最终一致性保证方案。

    参考博文:
    分布式事务之消息最终一致性事务

    展开全文
  • 上面这件事一般IBasicBolt可以罩住,更多的方法可以使用IRichBolt。 一个topology里面的acker数量是可以...storm用一致哈希来把spout-tuple-id对应给acker,因为tuple知道自己的祖宗,所以他可以算出通知哪个acker

    Storm可靠性相关


    Storm可靠性的设计与它的Acker有很大关系,先让我用比较拙劣的语句简单描述下。

    Storm的tuple,被OutputCollector emit的时候——这个称为archoring(生成新的tuples),需要指定和它相关的之前的tuple,并且要指定executor完之后ack之类的api,这样就能建立一颗可追踪的tuple树。如:

    public class SplitSentence extends BaseRichBolt {
            OutputCollector _collector;
            
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                _collector = collector;
            }
    
            public void execute(Tuple tuple) {
                String sentence = tuple.getString(0);
                for(String word: sentence.split(" ")) {
                    _collector.emit(tuple, new Values(word));
                }
                _collector.ack(tuple);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }        
        }
    上面这件事一般IBasicBolt可以罩住,更多的方法可以使用IRichBolt。
    一个topology里面的acker数量是可以设置的,然后tuple比较多的话可以多设置几个acker,提高效率。每个tuple有一个64位的id,acker利用这个id来追踪tuple,且会知道这个tuple他的祖宗们,也就是只要继续跟踪新的tuple就可以了,因为祖宗的id会被传递下去。
    storm用一致性哈希来把spout-tuple-id对应给acker,因为tuple知道自己的祖宗,所以他可以算出通知哪个acker来ack(所有的根tuple是知道的,hash好了之后,以后的子tuple去对应的地方ack)。acker会维护tuple树上的各个tuple,当他知道这个树完成处理了,就会通知某个对应的task。
    acker task不显示跟踪整个tuple树,不然会占据很多内存,acker使用一个恒定的20 bytes来针对每个spout tuple。一个acker存一个spout-tuple-id的时候,存两个值:一个是task id,用于关联task;第二个是 ack val,一个64位的数。ack val是整个树状态的一个表示,把所有的tuple id异或起来,当ack val=0,就知道整棵tuple树成功完成了,否则失败,然后就可以通知task了。
    在以上的可靠性之下,如果:

    1.  task fail了,tuple没有被ack。超时机制保证这个tuple以后再被重新处理
    2. Acker挂 。这个acker跟踪的tuple都超时,都会重新处理
    3. Spout挂了。消息源重新发送消息。
    所以,storm的可靠性机制是完全分布式的,可伸缩的并且高度容错的。

    以上内容可以具体参考wiki:Guaranteeing-message-processing

    Acker更多设计可以参考: twitter-storm-code-analysis-acker-merchanism



    Storm事务性相关


    State in Trident

    Trident在读写有状态的数据源方面是有着一流的抽象封装的。状态即可以保留在topology的内部,比如说内存和HDFS,也可以放到外部存储当中,比如说Memcached或者Cassandra。这些都是使用同一套Trident API。

    Trident以一种容错的方式来管理状态以至于当你在更新状态的时候你不需要去考虑错误以及重试的情况。这种保证每个消息被处理有且只有一次的原理会让你更放心的使用Trident的topology。

    在进行状态更新时,会有不同的容错级别。在外面一起来讨论这点之前,让我们先通过一个例子来说明一下如果想要坐到有且只有一次处理的必要的技巧。假定你在做一个关于某stream的计数聚合器,你想要把运行中的计数存放到一个数据库中。如果你在数据库中存了一个值表示这个计数,每次你处理一个tuple之后,就将数据库存储的计数加一。

    当错误发生,truple会被重播。这就带来了一个问题:当状态更新的时候,你完全不知道你是不是在之前已经成功处理过这个tuple。也许你之前从来没处理过这个tuple,这样的话你就应该把count加一。另外一种可能就是你之前是成功处理过这个tuple的,但是这个在其他的步骤处理这个tuple的时候失败了,在这种情况下,我们就不应该将count加一。再或者,你接受到过这个tuple,但是上次处理这个tuple的时候在更新数据库的时候失败了,这种情况你就应该去更新数据库。

    如果只是简单的存计数到数据库的话,你是完全不知道这个tuple之前是否已经被处理过了的。所以你需要更多的信息来做正确的决定。Trident提供了下面的语义来实现有且只有一次被处理的目标。

    • Tuples 是被分成小的集合被批量处理的 (see the tutorial)
    • 每一批tuples被给定一个唯一ID作为事务ID (txid). 当这一批tuple被重播时, txid不变.
    • 批与批之间的状态更新时严格顺序的。比如说第三批tuple的状态的更新必须要等到第二批tuple的状态更新成功之后才可以进行.
    有了这些定义,你的状态实现可以检测到当前这批tuple是否以前处理过,并根据不同的情况进行不同的处理。你需要才去的行动取决于你的输入spout。有三种不同类型的可以容错的spout: 非事务的,事务的,以及不透明事务的spout。对应的,也有3种容错的状态:非事务的,事务的,以及不透明事务的状态。让我们一起来看看每一种spout类型能够支持什么样的容错类型。


    Transactional spouts

    记住,Trident是以小批量(batch)的形式在处理tuple,并且每一批都会分配一个唯一的transaction id。 不同的spout会根据他们可以给予不同的批量tuple的guarantee的能力有不同的属性。一个transactional spout会有如下这些属性:
    1. 有着同样txid的batch一定是一样的。当重播一个txid对应的batch时,一定会重播和之前对应txid的batch中同样的tuples。
    2. 各个batch之间是没有交集的。每个tuple只能属于一个batch
    3. 每一个tuple都属于一个batch,无一例外
    这是一类非常容易理解的spout, tuple 流被划分为固定的batch并且永不改变。trident-kafka 有一个transactional spout的实现。

    你也许会问:为什么我们不总是使用transactional spout?这很容易理解。一个原因是并不是所有的地方都需要容错的。举例来说,TransactionalTridentKafkaSpout 工作的方式是给定一个txid的batch所包含的一个属于一个topic的来自于所有Kafka partition的tuple序列。一旦这个batch被发出,在任何时候如果这个batch被重新发出时,它必须包含原来所有的tuple以满足 transactional spout的语义。现在我们假定一个batch被TransactionalTridentKafkaSpout所发出,这个batch没有被成功处理,并且同时kafka的一个节点也down掉了。你就无法像之前一样重播一个完全一样的batch(因为kakfa的节点down掉,该topic的一部分partition可能会无法使用),整个处理会被中断。

    这也就是"opaque transactional" spouts(不透明事务spout)存在的原因- 他们对于丢失源节点这种情况是容错的,仍然能够帮你达到有且只有一次处理的语义。后面会对这种spout有所介绍。

    (当然,在Kafka开启replication功能时,transactional spout也是可以做到容错的)

    在外面来讨论"opaque transactional" spout之前,我们先来看看你应该怎样设计一个State来实现transactional spout的有且只有一次执行的语义。这个State的类型是"transactional state" 并且它利用了任何一个txid总是对应同样的tuple序列这个语义。

    假如说你有一个用来计算单词出现次数的topology,你想要将单词的出现次数以key/value对的形式存储到数据库中。key就是单词,value就是这个这个单词出现的次数。你已经看到只是存储一个数量是不足以知道你是否已经处理过一个batch的。你可以通过将value和txid一起存储到数据库中。这样的话,当更新这个count之前,你可以先去比较数据库中存储的txid和现在要存储的txid。如果一样,就跳过什么都不做,因为这个value之前已经被处理过了。如果不一样,就执行存储。这个逻辑可以工作的前提就是txid永不改变,并且Trident保证状态的更新是在batch之间严格顺序进行的。

    考虑下面这个例子的运行逻辑, 假定你在处理一个txid为3的包含下面tuple的batch:
    ["man"]  
    ["man"]  
    ["dog"] 
     假定数据库中当前保存了下面这样的key/value 对:
    man => [count=3, txid=1]  
    
    dog => [count=4, txid=3]  
    
    apple => [count=10, txid=2]  
    单词“man”对应的txid是1. 因为当前的txid是3,你可以确定你还没有为这个batch中的tuple更新过这个单词的数量。所以你可以放心的给count加2并更新txid为3. 与此同时,单词“dog”的txid和当前的txid是相同的,因此你可以跳过这次更新。此时数据库中的数据如下:

    man => [count=5, txid=3]  
    
    dog => [count=4, txid=3]  
    
    apple => [count=10, txid=2] 
    接下来我们一起再来看看 opaque transactional spout已经怎样去为这种spout设计相应的state。 

    Opaque transactional spouts

    #xhe_tmpurl正如之前说过的,opaque transactional spout并不能确保一个txid所对应的batch的一致性。一个opaque transactional spout有如下属性:
    • 每个tuple只在一个batch中被成功处理。然而,一个tuple在一个batch中被处理失败后,有可能会在另外的一个batch中被成功处理
    OpaqueTridentKafkaSpout 是一个拥有这种属性的spout,并且它是容错的,即使Kafak的节点丢失。当OpaqueTridentKafkaSpout 发送一个batch的时候, 它会从上个batch成功结束发送的位置开始发送一个tuple序列。这就确保了永远没有任何一个tuple会被跳过或者被放在多个batch中被多次成功处理的情况.
    使用opaque transactional spout,再使用和transactional spout相同的处理方式:判断数据库中存放的txid和当前txid去做对比已经不好用了。这是因为在state的更新过程之间,batch可能已经变了。
    你只能在数据库中存储更多的信息。除了value和txid,你还需要存储之前的数值在数据库中。让我们还是用上面的例子来说明这个逻辑。假定你当前batch中的对应count是“2”, 并且我们需要进行一次状态更新。而当前数据库中存储的信息如下:
    { 
      value = 4,  
    
      prevValue = 1,  
    
      txid = 2  
    } 
    如果你当前的txid是3, 和数据库中的txid不同。那么就将value中的值设置到prevValue中,根据你当前的count增加value的值并更新txid。更新后的数据库信息如下:
    { 
      value = 6,  
    
      prevValue = 4,  
    
      txid = 3  
    }  
    现在外面再假定你的当前txid是2,和数据库中存放的txid相同。这就说明数据库里面value中的值包含了之前一个和当前txid相同的batch的更新。但是上一个batch和当前这个batch可能已经完全不同了,以至于我们需要无视它。在这种情况下,你需要在prevValue的基础上加上当前count的值并将结果存放到value中去。数据库中的信息如下所示:
    { 
      value = 3,  
    
      prevValue = 1,  
    
      txid = 2  
    } 
    因为Trident保证了batch之间的强顺序性,因此这种方法是有效的。一旦Trident去处理一个新的batch,它就不会重新回到之前的任何一个batch。并且由于opaque transactional spout确保在各个batch之间是没有共同成员的,每个tuple只会在一个batch中被成功处理,你可以安全的在之前的值上进心更新。 

    Non-transactional spouts

    Non-transactional spout(非事务spout)不确保每个batch中的tuple的规则。所以他可能是最多被处理一次的,如果tuple被处理失败就不重发的话。同时他也可能会是至少处理一次的,如果tuple在不同的batch中被多次成功处理的时候。无论怎样,这种spout是不可能实现有且只有一次被成功处理的语义的。

    Summary of spout and state types

    Opaque transactional state有着最为强大的容错性。但是这是以存储更多的信息作为代价的。Transactional states 需要存储较少的状态信息,但是仅能和 transactional spouts协同工作. Finally, non-transactional state所需要存储的信息最少,但是却不能实现有且只有一次被成功处理的语义。

    State和Spout类型的选择其实是一种在容错性和存储消耗之间的权衡,你的应用的需要会决定那种组合更适合你。

    State APIs

    你已经看到一些错综复杂的方法来实现有且只有一次被执行的语义。Trident这样做的好处把所有容错想过的逻辑都放在了State里面。 作为一个用户,你并不需要自己去处理复杂的txid,存储多余的信息到数据库中,或者是任何其他类似的事情。你只需要写如下这样简单的code:
    TridentTopology topology = new TridentTopology();          
    
    TridentState wordCounts =  
    
          topology.newStream("spout1", spout)  
    
            .each(new Fields("sentence"), new Split(), new Fields("word"))  
    
            .groupBy(new Fields("word"))  
    
            .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))                  
    
            .parallelismHint(6);  
    所有管理opaque transactional state所需的逻辑都在MemcachedState.opaque方法的调用中被涵盖了,除此之外,数据库的更新会自动以batch的形式来进行以避免多次访问数据库。
    State的基本接口只包含下面两个方法:

    public interface State {  
    
        void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream  
    
        void commit(Long txid);  
    
    }  
    当一个State更新开始时,以及当一个State更新结束时你都会被告知,并且会告诉你该次的txid。Trident并没有对你的state的工作方式有任何的假定。
    假定你自己搭了一套数据库来存储用户位置信息,并且你想要在Trident中去访问这个数据。你的state的实现应该有用户信息的set、get方法
    public class LocationDB implements State {  
    
        public void beginCommit(Long txid) {      
    
        }  
    
        public void commit(Long txid) {      
    
        }
          
        public void setLocation(long userId, String location) {  
    
          // code to access database and set location  
    
        }  
    
        public String getLocation(long userId) {  
    
          // code to get location from database  
    
        }  
    }  
    然后你还需要提供给Trident一个StateFactory来在Trident的task中创建你的State对象。LocationDB 的 StateFactory可能会如下所示:
    public class LocationDBFactory implements StateFactory {  
       public State makeState(Map conf, int partitionIndex, int numPartitions) {  
          return new LocationDB();  
       }   
    }  
    Trident提供了一个QueryFunction接口用来实现Trident中在一个source state上查询的功能。同时还提供了一个StateUpdater来实现Trident中更新source state的功能。比如说,让我们写一个查询地址的操作,这个操作会查询LocationDB来找到用户的地址。让我们以怎样在topology中实现该功能开始,假定这个topology会接受一个用户id作为输入数据流。

    TridentTopology topology = new TridentTopology();  
    
    TridentState locations = topology.newStaticState(new LocationDBFactory());  
    
    topology.newStream("myspout", spout)  
            .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))  
    接下来让我们一起来看看QueryLocation 的实现应该是什么样的:
    public class QueryLocation extends BaseQueryFunction<LocationDB, String> {  
    
        public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {  
    
            List<String> ret = new ArrayList();  
    
            for(TridentTuple input: inputs) {  
    
                ret.add(state.getLocation(input.getLong(0)));  
    
            }  
            return ret;  
        }  
    
        public void execute(TridentTuple tuple, String location, TridentCollector collector) {  
            collector.emit(new Values(location));  
        }      
    }  
    QueryFunction的执行分为两部分。首先Trident收集了一个batch的read操作并把他们统一交给batchRetrieve。在这个例子中,batchRetrieve会接受到多个用户id。batchRetrieve应该返还一个和输入tuple数量相同的result序列。result序列中的第一个元素对应着第一个输入tuple的结果,result序列中的第二个元素对应着第二个输入tuple的结果,以此类推。
    你可以看到,这段代码并没有想Trident那样很好的利用batch的优势,而是为每个输入tuple去查询了一次LocationDB。所以一种更好的操作LocationDB方式应该是这样的:
    public class LocationDB implements State {  
    
        public void beginCommit(Long txid) {     
     
        }  
    
        public void commit(Long txid) {      
    
        }  
    
        public void setLocationsBulk(List<Long> userIds, List<String> locations) {  
    
          // set locations in bulk  
    
        }  
          
        public List<String> bulkGetLocations(List<Long> userIds) {  
    
          // get locations in bulk  
    
        }  
    }  
    接下来,你可以这样改写上面的QueryLocation:
    public class QueryLocation extends BaseQueryFunction<LocationDB, String> {  
    
        public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {  
    
            List<Long> userIds = new ArrayList<Long>();  
    
            for(TridentTuple input: inputs) {  
                userIds.add(input.getLong(0));  
            }  
    
            return state.bulkGetLocations(userIds);  
        }  
    
        public void execute(TridentTuple tuple, String location, TridentCollector collector) {  
            collector.emit(new Values(location));  
        }  
    } 
    通过有效减少访问数据库的次数,这段代码比上一个实现会高效的多。如何你要更新State,你需要使用StateUpdater接口。下面是一个StateUpdater的例子用来将新的地址信息更新到LocationDB当中。

    public class LocationUpdater extends BaseStateUpdater<LocationDB> {  
    
        public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {  
    
            List<Long> ids = new ArrayList<Long>();  
    
            List<String> locations = new ArrayList<String>();  
    
            for(TridentTuple t: tuples) {  
    
                ids.add(t.getLong(0));  
    
                locations.add(t.getString(1));  
            }  
            state.setLocationsBulk(ids, locations);  
        }  
    }  
    下面列出了你应该如何在Trident topology中使用上面声明的LocationUpdater:
    TridentTopology topology = new TridentTopology();  
    
    TridentState locations =   
        topology.newStream("locations", locationsSpout)  
            .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())  
    partitionPersist 操作会更新一个State。其内部是将 State和一批更新的tuple交给StateUpdater,由StateUpdater完成相应的更新操作。
    在这段代码中,只是简单的从输入的tuple中提取处userid和对应的location,并一起更新到State中。
    partitionPersist 会返回一个TridentState对象来表示被这个Trident topoloy更新过的location db。 然后你就可以使用这个state在topology的任何地方进行查询操作了。
    同时,你也可以看到我们传了一个TridentCollector给StateUpdaters。 emit到这个collector的tuple就会去往一个新的stream。在这个例子中,我们并没有去往一个新的stream的需要,但是如果你在做一些事情,比如说更新数据库中的某个count,你可以emit更新的count到这个新的stream。然后你可以通过调用TridentState#newValuesStream方法来访问这个新的stream来进行其他的处理。

    persistentAggregate

    Trident有另外一种更新State的方法叫做persistentAggregate。 你在之前的word count例子中应该已经见过了,如下:
    TridentTopology topology = new TridentTopology();          
    
    TridentState wordCounts =  
    
          topology.newStream("spout1", spout)  
    
            .each(new Fields("sentence"), new Split(), new Fields("word"))  
    
            .groupBy(new Fields("word"))  
    
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))  

    persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:
    public interface MapState<T> extends State {  
    
        List<T> multiGet(List<List<Object>> keys);  
    
        List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);  
    
        void multiPut(List<List<Object>> keys, List<T> vals);  
    
    }  

    当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:
    public interface Snapshottable<T> extends State {  
    
        T get();  
    
        T update(ValueUpdater updater);  
    
        void set(T o);  
    
    } 

    MemoryMapState 和 MemcachedState 都实现了上面的2个接口。

    Implementing Map States

    在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:
    public interface IBackingMap<T> {  
    
        List<T> multiGet(List<List<Object>> keys);   
    
        void multiPut(List<List<Object>> keys, List<T> vals);   
    
    }  

    OpaqueMap's会用OpaqueValue的value来调用multiPut方法,TransactionalMap's会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。
    Trident还提供了一种CachedMap类来进行自动的LRU cache。
    另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象。
    大家可以看看 MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。 


    以上内容可以具体参考wiki: Trident-state


    (全文完)

    展开全文
  • 一般做法就是将数据库的操作以及发送消息放到一个事务中。如果数据库操作或者发送消息失败,则回滚事务即可。如果事务提交成功,消息发出去了以后,服务B处理消息出现异常,则我们只需要在fix调问题以后retry ...

    写数据库同时发mq消息事务一致性的一种解决方案


    事件驱动(event driven)的系统设计,服务之间的交互大多数都是通过消息队列中间件,那么我们都会面临一个微服务之间数据一致性的问题。

    假设如下场景:服务A在一个事务中包含数据库更新操作,然后发送消息给MQ通知服务B

    一般做法就是将数据库的操作以及发送消息放到一个事务中。如果数据库操作或者发送消息失败,则回滚事务即可。如果事务提交成功,消息发出去了以后,服务B处理消息出现异常,则我们只需要在fix调问题以后retry message就能够保证数据最终一致性。(当然在发送消息之前我们会将这个消息存储下来,redis或者数据库都可以,之后再做housekeep删除)。

    但是我们会面临这样一个问题,假如服务B需要拿到服务A数据库更新操作以后的数据状态,但是我们的数据库操作跟发送消息是在一个事务里面的,那就可能存在服务B已经在处理消息了,但是服务A这边的事务还未提交,那么服务B从数据库中拿到的数据状态还是原来的数据状态。

    所以对于这个问题,个人觉得比较好的做法就是,发送消息不要放在数据库事务中,等到数据库提交事务以后,会有一个回调事件(ATER_COMMIT),然后在这个回调事件中发送消息即可。

    在这里插入图片描述

    1. begin tx 开启本地事务
    2. do work 执行业务操作
    3. insert message 向同实例消息库插入消息
    4. end tx 事务提交
    5. send message 网络向 server 发送消息
    6. reponse server 回应消息
    7. delete message 如果 server 回复成功则删除消息
    8. scan messages 补偿任务扫描未发送消息
    9. send message 补偿任务补偿消息
    10. delete messages 补偿任务删除补偿成功的消息

    跟人觉得以上是一种比较好的解决方案,发送消息前回将消息存储下来,如果服务消费方出现异常,只需要retry即可。
    该方案在这篇文章有详细描述:去哪儿QMQ

    这是一种设计了,那如何落实到代码实现呢?在这里我记录的是使用spring的事件机制以及提供的注解来实现,在事务提交以后回调事件发送消息。

    • 定义一个evnet (假设就是要发给MQ的event)
    public class MyTransactionEvent {
        private String name;
    
        public MyTransactionEvent2(String name) {
            this.name = name;
        }
    
        public String getName() {
            return name;
        }
    }
    
    • 在数据库事务中发布定义的 MyTransactionEvent
    @Service
    @Slf4j
    @RequiredArgsConstructor
    public class FooService {
    
        private final ApplicationEventPublisher publisher;
        private final FooRepository fooRepository;
    
        @Transactional
        public boolean saveFoo(FooEntity fooEntity) throws InterruptedException {
            log.error("start insert foo");
            fooRepository.save(fooEntity);
            publisher.publishEvent(new MyTransactionEvent(fooEntity.getFooName()));
            log.error("end insert foo");
            Thread.currentThread().sleep(2000);
            log.error("to commit insert");
            return true;
        }
     }
    
    • 订阅 MyTransactionEvent
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        public void afterCommit(MyTransactionEvent event) {
            log.error("after commit then send event {}", event);
            log.error("after commit then send event {}", event.getName());
        }
    

    可以在同一个class中加入这个订阅方法,也可以将这个订阅方法放在另一个class,要交给spring去管理。
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 告诉spring是在事务提交以后触发这个方法,并且事务中发布的事件是 MyTransactionEvent ,该方法才会触发。

    完整代码查看:github

    展开全文
  • Spring——事务管理

    2020-08-03 21:39:16
    文章目录Spring——事务管理(1)配置声名式事务:(2)使用Spring管理事务,注意头文件的约束导入 : tx(3) 配置事务通知(4)配置aop织入事务 Spring——事务管理 提供声名式事务,允许用户自定义切面。 事务:...
  • 其实这篇文章和springCloud无关,但是属于微服务不可避免的一个问题,所以拿在这里说一说。...柔性事务中又有可靠消息最终一致,TCC,最大努力通知三种解决方案。这里来说一说可靠消息最终一致。 ...
  • 分布式事务

    千次阅读 2020-01-12 20:31:37
    一、概述 面试时经常被问到分布式事务问题,今天我们...常用的一般是后四种。 常用的分布式事务解决方案: XA 方案 TCC 方案 本地消息表 可靠消息最终一致方案 最大努力通知方案 二、XA 所谓的 XA 方案,即:两...
  • 分布式事务方案

    2020-01-04 22:41:57
    数据库事务acid cap理论 consistency(强)一致,... 一般使用ap组合,base理论是ap的扩展,不需要强一致,只要最终一致 分布式事务 2pc(两阶段提交协议,prepare commit) XA方案 1TM通知各个RM执行业务,RM执...
  • 一、分布式事务方案:最终一致事务补偿、TCC、两阶段提交、最大能力通知等。具体结合业务场景。...最大努力通知型方案(一般跨平台通知比较常用)1、基于可靠消息最终一致方案场景:对应支付系统会计异步记
  • 分布式事务知识点

    2020-09-09 14:54:27
    最大努力通知方案 2、两阶段提交方案/XA方案 XA方案即两阶段提交,有一个事务管理器的概念,负责协调多个数据库的事务。 第一阶段:事务协调器要求每个设计到事物的数据库预提交此操作,并反映是否可以提交; ...
  • 分布式事务项目实践

    2017-10-07 11:31:56
    首先本人关于分布式事务的基础知识来自于吴水成老师的分布式事务教程。...在项目中主要使用的事务处理就是最终一致、最大努力通知和TCC,其中最终一致使用比较普遍,最大努力通知一般用于和第三方对接的情况比较多
  • TCC型分布式事务介绍

    2018-06-08 22:55:51
    分布式事务的产生是由于需要同时对多个数据...一般而言,满足ACID的事务的为钢性事务,满足BASE理论的为柔性事务。其中,柔性事务大致可以分为以下四种:两阶段型补偿型异步确保型最大努力通知型TCC型事务TCC事物 ...
  • 一、分布式事务方案:最终一致事务补偿、TCC、两阶段提交、最大能力通知等。具体结合业务场景。很多大型企业自主研发了自己的分布式事务解决方案,如:支付宝 XTS,去哪儿 QMQ。 1.基于可靠消息的最终一致解决...
  • 前言分布式事务的产生是由于需要同时对多...钢性事务与柔性事务一般而言,满足ACID的事务的为钢性事务,满足BASE理论的为柔性事务。其中,柔性事务大致可以分为以下四种:两阶段型补偿型异步确保型最大努力通知型TCC...
  • 在介绍了分布式事务的理论基础后,针对不同的分布式场景常见的解决方案有2PC、TCC、可靠消息最终一致、最大努力通知这几种,这次我们来介绍2PC。 什么是2PC 2PC(2 prepare phase commit phase):俩阶段提交,俩...
  • 一般是以下几种方案: XA 方案 TCC 方案 本地消息表 可靠消息最终一致方案 最大努力通知方案 一、两阶段提交方案/XA方案 所谓的 XA 方案,即:两阶段提交,有一个事务管理器的概念,负责协调多个数据库...
  • 前言分布式事务的产生是由于需要同时对多...钢性事务与柔性事务一般而言,满足ACID的事务的为钢性事务,满足BASE理论的为柔性事务。其中,柔性事务大致可以分为以下四种:两阶段型补偿型异步确保型最大努力通知型TCC...
  • 只要聊到你做了分布式系统,必问分布式事务,你对分布式事务一无所知的话,确实会很坑,你起码得知道有哪些方案,一般怎么来做,每个方案的优缺点是什么。 现在面试,分布式系统成了标配,而分布式系统带来的分布式...
  • 常见的分布式事务,两阶段提交方案/XA方案(行业一般称为2PC)方案,TCC方案(try confirm cancel)也叫三阶段方案或者3PC方案,本地消息表,可靠消息最终一致方案,最大努力通知方案。今天,我们着重介绍两阶段...
  • 一套分布式事务标准,使用了两阶段提交来保证分布式事务的完整。 包含的三种角色: AP:Application,表示应用程序。 RM:Resource Manager,表示资源管理器,比如数据库。 TM:Transaction Manager,表示事务...
  • 只要聊到你做了分布式系统,必问分布式事务,你对分布式事务一无所知的话,确实会很坑,你起码得知道有哪些方案,一般怎么来做,每个方案的优缺点是什么。 现在面试,分布式系统成了标配,而分布式系统带来的分布式...
  • 柔性事务中又有可靠消息最终一致,TCC,最大努力通知三种解决方案。这里来说一说可靠消息最终一致。 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 可靠消息...
  • 我们已经了解了四种分布式事务解决方案,2PC【链接】、TCC【链接】、可靠消息最终一致【链接】、最大努力通知【链接】,每种解决方案我们通过案例开发进行学习,本章节我们结合互联网金融项目中的业务场景,来进行...
  • SAP R/3 事务码速查手册SAP R/3 事务码速查手册目录 1 CA 交叉应用组件 11 1.1 CA 交叉应用组件 11 1.2 CA-EUR-CNV 本地货币改变 11 1.3 CA-DMS 文档管理系统 14 1.4 CA-CL 分类系统 15 1.4.1 CA-CL-CHR 特性 17 1.5...
  • 重试机制原理:@RabbitListtener 底层实现,使用AOP进行拦截,如果程序没有抛出异常,自动提交事务:如果AOP使用异常通知拦截 获取异常信息的话,自动实现补偿机制,该消息会一直缓存到RabbitMQ服务器端存放,一直重...
  • 邮件接口

    2019-05-29 16:31:48
    答:通知类邮件,一般是发送时效很强的验证码邮件,用户操作通知邮件等,内容纯事务类,不包含广告内容。  A.通知类邮件常见分类包括:  (一)账号相关:账号激活、信息验证、账号绑定、密码修改、密码取回等...

空空如也

空空如也

1 2 3 4 5 6
收藏数 113
精华内容 45
关键字:

一般事务性通知