精华内容
下载资源
问答
  • Trident

    2018-11-30 23:14:00
    Trident是一个基于Storm构建的上层的Micro-Batching系统,它简化了Storm的拓扑构建过程并且提供了类似于窗口、聚合以及状态管理等等没有被Storm原生支持的功能 转载于:...

    Trident 是一个基于Storm构建的上层的Micro-Batching系统,它简化了Storm的拓扑构建过程并且提供了类似于窗口、聚合以及状态管理等等没有被Storm原生支持的功能

    转载于:https://www.cnblogs.com/huiandong/p/10047305.html

    展开全文
  • trident

    2016-10-25 23:53:47
    Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial

    ----------------

    Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。


    举例说明

    让我们一起来看一个Trident的例子。在这个例子中,我们主要做了两件事情:

    1. 从一个流式输入中读取语句病计算每个单词的个数
    2. 提供查询给定单词列表中每个单词当前总数的功能
    因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入:

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                   new Values("the cow jumped over the moon"),
                   new Values("the man went to the store and bought some candy"),
                   new Values("four score and seven years ago"),
                   new Values("how many apples can you eat"),
    spout.setCycle(true);
    

    这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:

    TridentTopology topology = new TridentTopology();        
    TridentState wordCounts =
         topology.newStream("spout1", spout)
           .each(new Fields("sentence"), new Split(), new Fields("word"))
           .groupBy(new Fields("word"))
           .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
           .parallelismHint(6);
    

    让我们一起来读一下这段代码。我们首先创建了一个TridentTopology对象。TridentTopology类相应的接口来构造Trident计算过程中的所有内容。我们在调用了TridentTopology类的newStream方法时,传入了一个spout对象,spout对象会从外部读取数据并输出到当前topology当中,从而在topology中创建了一个新的数据流。在这个例子中,我们使用了上面定义的FixedBatchSpout对象。输入数据源同样也可以是如Kestrel或者Kafka这样的队列服务。Trident会再Zookeeper中保存一小部分状态信息来追踪数据的处理情况,而在代码中我们指定的字符串“spout1”就是Zookeeper中用来存储metadata信息的Znode节点

    Trident在处理输入stream的时候会把输入转换成若干个tuple的batch来处理。比如说,输入的sentence stream可能会被拆分成如下的batch:

    Batched stream

    一般来说,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,这完全取决于你的输入的吞吐量。

    Trident提供了一系列非常成熟的批量处理的API来处理这些小batch. 这些API和你在Pig或者Cascading中看到的非常类似, 你可以做group by's, joins, aggregations, 运行 functions, 执行 filters等等。当然,独立的处理每个小的batch并不是非常有趣的事情,所以Trident提供了很多功能来实现batch之间的聚合的结果并可以将这些聚合的结果存储到内存,Memcached, Cassandra或者是一些其他的存储中。同时,Trident还提供了非常好的功能来查询实时状态。这些实时状态可以被Trident更新,同时它也可以是一个独立的状态源。

    回到我们的这个例子中来,spout输出了一个只有单一字段“sentence”的数据流。在下一行,topology使用了Split函数来拆分stream中的每一个tuple,Split函数读取输入流中的“sentence”字段并将其拆分成若干个word tuple。每一个sentence tuple可能会被转换成多个word tuple,比如说"the cow jumped over the moon" 会被转换成6个 "word" tuples. 下面是Split的定义:

    public class Split extends BaseFunction {
       public void execute(TridentTuple tuple, TridentCollector collector) {
           String sentence = tuple.getString(0);
           for(String word: sentence.split(" ")) {
               collector.emit(new Values(word));                
           }
       }
    }
    

    如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每个单词作为一个tuple输出。

    topology的其他部分计算单词的个数并将计算结果保存到了持久存储中。首先,word stream被根据“word”字段进行group操作,然后每一个group使用Count聚合器进行持久化聚合。persistentAggregate会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过我们可以很简单的把这些数据保存到其他的存储当中,如 Memcached, Cassandra等。如果我们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就可以,这当中的"serverLocations"是Memcached cluster的主机和端口号列表:

    .persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
    MemcachedState.transactional()
    

    persistentAggregate存储的数据就是所有batch聚合的结果。

    Trident非常酷的一点就是它是完全容错的,拥有者有且只有一次处理的语义。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

    persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。

    下面这部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词当天的个数。这些查询是想普通的RPC调用那样被执行的,要说不同的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

    DRPCClient client = new DRPCClient("drpc.server.location", 3772);
    System.out.println(client.execute("words", "cat dog the man");
    // prints the JSON-encoded result, e.g.: "[[5078]]"
    

    如你所见,除了这是并发执行在storm cluster上之外,这看上去就是一个正常的RPC调用。这样的简单查询的延时通常在10毫秒左右。当然,更负责的DRPC调用可能会占用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

    这个分布式查询的实现如下所示:

    topology.newDRPCStream("words")
           .each(new Fields("args"), new Split(), new Fields("word"))
           .groupBy(new Fields("word"))
           .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
           .each(new Fields("count"), new FilterNull())
           .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    

    我们仍然是使用TridentTopology对象来创建DRPC stream,并且我们将这个函数命名为“words”。这个函数名会作为第一个参数在使用DRPC Client来执行查询的时候用到。

    每个DRPC请求会被当做只有一个tuple的batch来处理。在处理的过程中,以这个输入的单一tuple来表示这个请求。这个tuple包含了一个叫做“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

    首先,我们使用Splict功能把入参拆分成独立的单词。然后对“word” 进行group by操作,之后就可以使用stateQuery来在上面代码中创建的TridentState对象上进行查询。stateQuery接受一个数据源(在这个例子中,就是我们的topolgoy所计算的单词的个数)以及一个用于查询的函数作为输入。在这个例子中,我们使用了MapGet函数来获取每个单词的出现个数。由于DRPC stream是使用跟TridentState完全同样的group方式(按照“word”字段进行group),每个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

    接下来,我们用FilterNull这个过滤器把从未出现过的单词给去掉,并使用Sum这个聚合器将这些count累加起来。最终,Trident会自动把这个结果发送回等待的客户端。

    Trident在如何最大程度的保证执行topogloy性能方面是非常智能的。在topology中会自动的发生两件非常有意思的事情:

    1. 读取和更新状态的操作 (比如说 persistentAggregate 和 stateQuery) 会自动的是batch的形式操作状态。 如果有20次更新需要被同步到存储中,Trident会自动的把这些操作汇总到一起,只做一次读一次写,而不是进行20次读20次写的操作。因此你可以在很方便的执行计算的同时,保证了非常好的性能。
    2. Trident的聚合器已经是被优化的非常好了的。Trident并不是简单的把一个group中所有的tuples都发送到同一个机器上面进行聚合,而是在发送之前已经进行过一次部分的聚合。打个比方,Count聚合器会先在每个partition上面进行count,然后把每个分片count汇总到一起就得到了最终的count。这个技术其实就跟MapReduce里面的combiner是一个思想。

    让我们再来看一下Trident的另外一个例子。

    Reach

    下一个例子是一个纯粹的DRPC topology。这个topology会计算一个给定URL的reach。那么什么事reach呢,这里我们将reach定义为有多少个独立用户在Twitter上面expose了一个给定的URL,那么我们就把这个数量叫做这个URL的reach。要计算reach,你需要tweet过这个URL的所有人,然后找到所有follow这些人的人,并将这些follower去重,最后就得到了去重后的follower的数量。如果把计算reach的整个过程都放在一台机器上面,就太勉强了,因为这会需要进行数千次数据库调用以及上一次的tuple的读取。如果使用Storm和Trident,你就可以把这些计算步骤在整个cluster中进行并发。

    这个topology会读取两个state源。一个用来保存URL以及tweet这个URL的人的关系的数据库。还有一个保持人和他的follower的关系的数据库。topology的定义如下:

    TridentState urlToTweeters =
           topology.newStaticState(getUrlToTweetersState());
    TridentState tweetersToFollowers =
           topology.newStaticState(getTweeterToFollowersState());
    
    topology.newDRPCStream("reach")
           .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
           .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
           .shuffle()
           .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
           .parallelismHint(200)
           .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
           .groupBy(new Fields("follower"))
           .aggregate(new One(), new Fields("one"))
           .parallelismHint(20)
           .aggregate(new Count(), new Fields("reach"));
    

    这个topology使用newStaticState方法创建了TridentState对象来代表一种外部存储。使用这个TridentState对象,我们就可以在这个topology上面进行动态查询了。和所有的状态源一样,在数据库上面的查找会自动被批量执行,从而最大程度的提升效率。

    这个topology的定义是非常直观的 - 只是一个简单的批量处理job。首先,查询urlToTweeters数据库来得到tweet过这个URL的人员列表。这个查询会返回一个列表,因此我们使用ExpandList函数来把每一个反悔的tweeter转换成一个tuple。

    接下来,我们来获取每个tweeter的follower。我们使用shuffle来把要处理的tweeter分布到toplology运行的每一个worker中并发去处理。然后查询follower数据库从而的到每个tweeter的follower。你可以看到我们为topology的这部分分配了很大的并行度,这是因为这部分是整个topology中最耗资源的计算部分。

    然后我们在follower上面使用group by操作进行分组,并对每个组使用一个聚合器。这个聚合器只是简单的针对每个组输出一个tuple “One”,再count “One” 从而的到不同的follower的数量。“One”聚合器的定义如下:

    public class One implements CombinerAggregator<Integer> {
       public Integer init(TridentTuple tuple) {
           return 1;
       }
    
       public Integer combine(Integer val1, Integer val2) {
           return 1;
       }
    
       public Integer zero() {
           return 1;
       }        
    }
    

    这是一个"汇总聚合器", 它会在传送结果到其他worker汇总之前进行局部汇总,从而来最大程度上提升性能。Sum也是一个汇总聚合器,因此以Sum作为topology的最终操作是非常高效的。

    接下来让我们一起来看看Trident的一些细节。

    Fields and tuples

    Trident的数据模型就是TridentTuple - 一个有名的值的列表。在一个topology中,tuple是在一系列的处理操作(operation)中增量生成的。operation一般以一组子弹作为输入并输出一组功能字段。Operation的输入字段经常是输入tuple的一个子集,而功能字段则是operation的输出。

    看一下如下这个例子。假定你有一个叫做“stream”的stream,它包含了“x”,"y"和"z"三个字段。为了运行一个读取“y”作为输入的过滤器MyFilter,你可以这样写:

    stream.each(new Fields("y"), new MyFilter())
    

    假定MyFilter的实现是这样的:

    public class MyFilter extends BaseFilter {
       public boolean isKeep(TridentTuple tuple) {
           return tuple.getInteger(0) < 10;
       }
    }

    这会保留所有“y”字段小于10的tuples。TridentTuple传个MyFilter的输入将只有字段“y”。这里需要注意的是,当选择输入字段时,Trident会自动投影tuple的一个子集,这个操作是非常高效的。

    让我们一起看一下“功能字段”是怎样工作的。假定你有如下这个功能:

    public class AddAndMultiply extends BaseFunction {
       public void execute(TridentTuple tuple, TridentCollector collector) {
           int i1 = tuple.getInteger(0);
           int i2 = tuple.getInteger(1);
           collector.emit(new Values(i1 + i2, i1 * i2));
       }
    }
    

    这个函数接收两个数作为输入并输出两个新的值:“和”和“乘积”。假定你有一个stream,其中包含“x”,"y"和"z"三个字段。你可以这样使用这个函数:

    stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
    

    输出的功能字段被添加到输入tuple中。因此这个时候,每个tuple中将会有5个字段"x", "y", "z", "added", 和 "multiplied". "added" 和"multiplied"对应于AddAndMultiply输出的第一和第二个字段。

    另外,我们可以使用聚合器来用输出字段来替换输入tuple。如果你有一个stream包含字段"val1"和"val2",你可以这样做:

    stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
    

    output stream将会只包含一个叫做“sum”的字段,这个sum字段就是“val2”的累积和。

    在group之后的stream上,输出将会是被group的字段以及聚合器输出的字段。举例如下:

    stream.groupBy(new Fields("val1"))
         .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
    

    在这个例子中,输出将包含字段"val1" 和 "sum".

    State

    在实时计算领域的一个主要问题就是怎么样来管理状态并能轻松应对错误和重试。消除错误的影响是非常重要的,因为当一个节点死掉,或者一些其他的问题出现时,那行batch需要被重新处理。问题是-你怎样做状态更新来保证每一个消息被处理且只被处理一次?

    这是一个很棘手的问题,我们可以用接下来的例子进一步说明。假定你在做一个你的stream的计数聚合,并且你想要存储运行时的count到一个数据库中去。如果你只是存储这个count到数据库中,并且想要进行一次更新,我们是没有办法知道同样的状态是不是以前已经被update过了的。这次更新可能在之前就尝试过,并且已经成功的更新到了数据库中,不过在后续的步骤中失败了。还有可能是在上次更新数据库的过程中失败的,这些你都不知道。

    Trident通过做下面两件事情来解决这个问题:

    1. 每一个batch被赋予一个唯一标识id“transaction id”。如果一个batch被重试,它将会拥有和之前同样的transaction id
    2. 状态更新是以batch为单位有序进行的。也就是说,batch 3的状态更新必须等到batch 2的状态更新成功之后才可以进行。

    有了这2个原则,你就可以达到有且只有一次更新的目标。你可以将transaction id和count一起以原子的方式存到数据库中。当更新一个count的时候,需要判断数据库中当前batch的transaction id。如果跟要更新的transaction id一样,就跳过这次更新。如果不同,就更新这个count。

    当然,你不需要在topology中手动处理这些逻辑。这些逻辑已经被封装在Stage的抽象中并自动进行。你的Stage object也不需要自己去实习transaction id的跟踪操作。如果你想了解更多的关于如何实现一个Stage以及在容错过程中的一些取舍问题,可以参照这篇文章.

    一个Stage可以采用任何策略来存储状态。它可以存储到一个外部的数据库,也可以在内存中保持状态并备份到HDFS中。Stage并不需要永久的保持状态。比如说,你有一个内存版的Stage实现,它保存最近X个小时的数据并丢弃老的数据。可以把 Memcached integration 作为例子来看看State的实现.

    Execution of Trident topologies

    Trident的topology会被编译成尽可能高效的Storm topology。只有在需要对数据进行repartition的时候(如groupby或者shuffle)才会把tuple通过network发送出去,如果你有一个trident如下:

    Compiling Trident to Storm 1

    它将会被编译成如下的storm topology:

    Compiling Trident to Storm 2

    Conclusion

    Trident使得实时计算更加优雅。你已经看到了如何使用Trident的API来完成大吞吐量的流式计算,状态维护,低延时查询等等功能。Trident让你在获取最大性能的同时,以更自然的一种方式进行实时计算。

    展开全文
  • Losing Trident

    2021-01-07 06:35:03
    <p>When you throw a trident, it comes back if the trident is enchanted with loyalty <h3>What behaviour is observed: <p>When thrown and the player dies shortly after, the trident is lost. Even when the...
  • Trident是Storm中最为核心的概念,在做Strom开发的过程中,绝大部分情况下我们都会使用Trident,而不是使用传统的Spout、Bolt。Trident是Storm原语的高级封装,学会Trident之后,将会使得我们Storm开发变得非常简单...

    Trident是Storm中最为核心的概念,在做Strom开发的过程中,绝大部分情况下我们都会使用Trident,而不是使用传统的Spout、Bolt。Trident是Storm原语的高级封装,学会Trident之后,将会使得我们Storm开发变得非常简单。

    一、什么是Storm Trident ?

    简而言之:Trident是编写Storm Topology的一套高级框架,是对传统Spout、Bolt的高级封装。在学习Trident之前,我们都是都Spout、Bolt的相关API来编写一个Topology,在学习了Trident之后,我们会使用Trident API来编写Topology。

    可以将StormTopology与TridentTopology的关系,类比为JDBC与ORM框架(mybatis、hibernate)之间的关系,后者是前者的高级封装,功能相同,但是可以极大的减少我们的开发的工作量。

    当然,就像我们学习JDBC与ORM框架一样,JDBC可能很容易理解,但是学习ORM框架,可能就相对复杂一点。甚至学习ORM框架的时间可能要比学习JDBC的时间要更长,但是一旦我们学会了ORM框架,可能就再也不想去使用JDBC了,因为ORM框架可以帮助我们更高效的进行开发。

    学习Trident也一样,可能我们学习理解许多新的概念,但是学会了会极大的提高我们的开发效率。

    Storm原语中,最重要的就是Spout、Bolt、Grouping等概念。

    Trident对于Storm原语的抽象主要也就是针对这些基本概念的抽象。主要体现在:Trident Spout,Operation、State 。

    Trident Spout是针对Storm原语中的Spout进行的抽象

    Operation是针对Bolt、Grouping等概念的抽象

    State是新提出的概念,实际上就是数据持久化的接口。

    通常情况下,新的概念意味着要使用新的API。但是归根结底,还是底层还是通过storm原语来实现。在Trident中,我们使用TridentTopology表示一个拓扑,而在Storm原语中,我们使用StormTopology来表示一个拓扑。TridentTopology最终会被转换成StormTopology。在接下来的内容,我们将首先介绍TridentTopology的构建过程,以及TridentTopology如何转化为StormTopology。

    二、TridentTopology与StormToplogy

    1、API区别:

    在Trident中,有着新的一套构建Topology的API,我们先通过从代码层面上对比来进行分析:

    StormTopology:由传统的Spout和Bolt的API编写的Topology,最终是通过TopologyBuilder对象来创建的,返回的结果就是StormTopology对象。StormTopologytopology= topologyBuilder.createTopology();

    TridentTopology:由Trident的API编写的Topology,因为在Trident的API中,使用TridentTopology来表示一个Topology,是直接new出来的。TridentTopology tridentTopology = new TridentTopology();

    单独提出这两个概念,是为了以后的区分。以后我们提到StormTopology时,表示的就是以Spout、Bolt等这些API创建的Topology,而提到TridentTopology表示的就是以Trident的API创建的Topology。

    2、创建Topology的区别

    我们以单词计数案例WordCountApp(超链接)进行对比

    StormTopologyTopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word-reader" , new WordReader(),4);

    builder.setBolt("word-normalizer" , new WordNormalizer(),3).shuffleGrouping("word-reader" );

    builder.setBolt("word-counter" , new WordCounter(),1).fieldsGrouping("word-normalizer" , new Fields("word"));

    StormTopology topology = builder .createTopology();

    TridentTopology:TridentTopology tridentTopology = new TridentTopology();

    tridentTopology.newStream("word-reader-stream" , new WordReader()).parallelismHint(16)

    .each( new Fields("line" ), new NormalizeFunction(), new Fields("word" ))

    .groupBy( new Fields("word" ))

    .persistentAggregate( new MemoryMapState.Factory(), new Sum(), new Fields("sum" ));

    StormTopology stormTopology = tridentTopology.build();

    对比:

    在StormTopology中,我们都是通过TopologyBuilder的setSpout、setBolt的方式来创建Topology,然后通过Grouping策略指定Bolt的数据来源和分组策略。

    在TridentTopology中,我们使用TridentTopology来创建Topology,整个创建过程中,都是流式编程风格的。要注意的是,在Trident中,我们依然使用了WordReader这个Spout,但是并没有使用Bolt,而是使用了类似于each、persistentAggregate这样方法,来取代Bolt的功能。关于这些方法的作用再之后会详细介绍,目前只要知道Bolt的作用被一些方法取代了即可。

    三、TridentTopology与StormToplogy的联系

    二者的联系主要是:TridentTopology最终会被编译成StormTopology。请再次查看上述构建构建TridentTopology的最后一句代码。StormTopology stormTopology = tridentTopology.build();

    这句代码的的返回结果还是StormTopology对象,这实际上意味着,TridentTopology最终还是会被编译成StandardTopology。这很容易理解,就像ORM框架与JDBC一样,ORM框架只是一层封装,最终还是要通过JDBC操作数据库。而TridentTopology是高级封装,但是最终还是要通过编译StormTopology来运行。

    注意:这一点是非常重要的。上面我们已经提到,在Trident中,依然要指定Spout,但是用了一系列其他的方法如each、persistentAggregate等(当然不止这些),代替了Bolt的功能。那么这里又提到,TridentTopology会被编译成StormTopology,实际上就意味着Storm最终会将这些方法转换为一个或多个Bolt。我们要了解Trident是如何工作的,就必须要了解,这些方法的最终是如何被转换为Bolt的。最简单的查看方式,就是查看编译后的StormTopology的getSpout,getBolt方法来看。

    在后面我们会详细介绍,TridentTopology是如何转换为StormTopology的。目前,我们只需要知道TridentTopology最终是会转换为StormTopology即可。

    事实上,在Trident框架会将调用的所有方法转换为一个个Node。Node类型如下:

    newStream方法中参数Spout转换为SpoutNode,将调用的each方法,persistentAggregate等方法,转换为一个个ProcesserNode,而groupBy等操作,转换为一个PartitionNode,最终组成一个对象图,最后根据这个对象图,来将TridentTopology转换为StormTopology。当然,光说不练假把式,我们通过分析源码进行简单说明。

    首先说明Spout转换为SpoutNode对象

    其次说明each、persistentAggregate转换为ProcesserNode

    最后说明如何根据SpoutNode和ProcesserNode将TridentTopology转换为StormTopology

    1、Spout转换为SpoutNode

    首先,我们看一下TridentTopology对象的newStream方法:

    7fe63b99d2aecb36c50fcb817c459663.png

    可以看到,可以接受五种类型的Spout,以下是这五个方法的实现:

    04d2b5e9a614b58beec38a9ab8f72c3d.png

    我们可以看到,这五种方法,最终调用的实际上只有两个,并且在这两个方法中,最终都将Spout转换为了SpoutNode对象。

    2、each、persistentAggregate转换为ProcessorNode

    我们可以查看Stream对象的源码, 找到each、persistentAggregate两个方法内容

    0056f13ea4155ea89d8d7ddc8a7bf251.png

    上图显示了这两个方法,这种都被转换为一个ProcessorNode,最终添加到Topology中。

    3、最后我们看一下,TridentTopology.build()的实现

    由于源码内容比较多,我们只分析感兴趣的地方,其中红色加深的地方,是目前最为关注的:public StormTopology build() {

    ...

    TridentTopologyBuilder builder = new TridentTopologyBuilder();

    Map spoutIds = genSpoutIds( spoutNodes);

    Map boltIds = genBoltIds( mergedGroups);

    // SpoutNode维护了Spout类型,根据类型转换为对应的Spout

    for(SpoutNode sn : spoutNodes ) {

    Integer parallelism = parallelisms.get(grouper .nodeGroup(sn));

    if(sn .type == SpoutNode.SpoutType.DRPC) {

    builder.setBatchPerTupleSpout(spoutIds .get(sn), sn.streamId ,

    (IRichSpout) sn. spout, parallelism , batchGroupMap.get(sn ));

    } else {

    ITridentSpout s;

    if(sn .spout instanceof IBatchSpout) {

    s = new BatchSpoutExecutor((IBatchSpout)sn .spout );

    } else if(sn .spout instanceof ITridentSpout) {

    s = (ITridentSpout) sn. spout;

    } else {

    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");

    // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)

    }

    builder.setSpout(spoutIds .get(sn), sn.streamId, sn. txId, s, parallelism , batchGroupMap .get(sn));

    }

    }

    for(Group g : mergedGroups ) {

    if(!isSpoutGroup( g)) {

    Integer p = parallelisms.get(g );

    Map streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);

    //将调用each、processAggregate方法后的ProcessorNode,转换为Bolt

    BoltDeclarer d = builder.setBolt(boltIds .get(g), new SubtopologyBolt(graph, g .nodes , batchGroupMap ), p,

    committerBatches(g, batchGroupMap), streamToGroup);

    Collection inputs = uniquedSubscriptions(externalGroupInputs(g ));             //根据调用GroupBy等方法转换成的PartitionNode进行Grouping策略

    for(PartitionNode n : inputs ) {

    Node parent = TridentUtils.getParent( graph, n );

    String componentId;

    if(parent instanceof SpoutNode) {

    componentId = spoutIds .get(parent);

    } else {

    componentId = boltIds.get(grouper .nodeGroup(parent));

    }

    d.grouping( new GlobalStreamId(componentId , n.streamId ), n.thriftGrouping);

    }

    }

    }

    return builder .buildTopology();

    }

    展开全文
  • 实用的Storm Trident教程 本教程以的的出色为基础。 流浪者的设置基于Taylor Goetz的。 Hazelcast状态代码基于wurstmeister的。 看看随附的。 本教程的结构 浏览Part * .java,了解Trident的基础知识 使用Skeleton....
  • Trident是用Kotlin编写的Minecraft服务器API和实现。 为什么使用三叉戟? 又快又轻! Trident尽其所能整合并发! 干净的! Trident不使用Mojang的任何代码。 美丽的API! 要使Trident的API达到最佳状态,人们有很...
  • Storm Trident

    2019-06-22 14:01:27
    如果您熟悉Pig或Cascading等高级批处理工具,Trident的概念将非常熟悉 - Trident具有连接,聚合,分组,功能和过滤器。除此之外,Trident还添加了基元,用于在任何数据库或持久性存储之上执行有状态的增量处理。 ...

    概述

    Trident是一个高级抽象,用于在Storm之上进行实时计算。它允许您无缝混合高吞吐量(每秒数百万条消息),有状态流处理和低延迟分布式查询。如果您熟悉Pig或Cascading等高级批处理工具,Trident的概念将非常熟悉 - Trident具有连接,聚合,分组,功能和过滤器。除此之外,Trident还添加了基元,用于在任何数据库或持久性存储之上执行有状态的增量处理。 Trident具有一致, exactly-once 的语义,因此很容易推理Trident拓扑。

    Trident和kafka集成

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
    
    public class KafkaSpoutUtils {
        public static KafkaSpout<String, String> buildKafkaSpout(String boostrapServers, String topic){
    
            KafkaSpoutConfig<String,String> kafkaspoutConfig=KafkaSpoutConfig.builder(boostrapServers,topic)
                    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"g1")
                    .setEmitNullTuples(false)
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
                    .setMaxUncommittedOffsets(10)//一旦分区积压有10个未提交offset,Spout停止poll数据,解决Storm背压问题
    
                    .build();
            return new KafkaSpout<String, String>(kafkaspoutConfig);
        }
        //可以保证精准一次更新,推荐使用
        public static KafkaTridentSpoutOpaque<String,String> buildKafkaSpoutOpaque(String boostrapServers, String topic){
            KafkaTridentSpoutConfig<String, String> kafkaOpaqueSpoutConfig = KafkaTridentSpoutConfig.builder(boostrapServers, topic)
                    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"g1")
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                    .setRecordTranslator(new Func<ConsumerRecord<String, String>, List<Object>>() {
                        public List<Object> apply(ConsumerRecord<String, String> record) {
                            return new Values(record.key(),record.value(),record.timestamp());
                        }
                    },new Fields("key","value","timestamp"))
                    .build();
            return new KafkaTridentSpoutOpaque<String, String>(kafkaOpaqueSpoutConfig);
        }
    }
    
    public static void main(String[] args) throws Exception {
        TridentTopology tridentTopology=new TridentTopology();
    
        tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                .peek((TridentTuple input) ->{
                    System.out.println(input);
                });
    
        new LocalCluster().submitTopology("tridentTopology",new Config(),tridentTopology.build());
    }
    

    常见算子

    • Map算子
      将一个Tuple转换为另外一个Tuple,如果用户修改了Tuple元素的个数,需要指定输出的Fields
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                    .map((tuple)-> new Values("Hello~"+tuple.getStringByField("value")),new Fields("name"))
                    .peek((tuple) -> System.out.println(tuple));
    
    • Filter
      过滤上游输入的Tuple将满足条件的Tuple向下游输出。
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
         .filter(new Fields("value"), new BaseFilter() {
             @Override
             public boolean isKeep(TridentTuple tuple) {
                 System.out.println(tuple.getFields());
                 return !tuple.getStringByField("value").contains("error");
             }
         })
         .peek((tuple) -> System.out.println(tuple));
    
    • flatMap
      将一个Tuple,转换为多个Tuple,如果修改了Tuple的数目,需要指定输出的Fields
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                    .flatMap((tuple)->{
                        List<Values> list=new ArrayList<>();
                        String[] tokens = tuple.getStringByField("value").split("\\W+");
                        for (String token : tokens) {
                            list.add(new Values(token));
                        }
                        return list;
                    },new Fields("word"))
                    .peek((tuple) -> System.out.println(tuple));
    
    • each
      参数传递可以是BaseFunction(添加fields)和BaseFilter(等价于Filter)
      basefunction
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
        .each(new Fields("value"), new BaseFunction() {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                collector.emit(new Values(tuple.getStringByField("value")));
            }
        }, new Fields("other"))
        .peek((tuple) -> System.out.println(tuple));
    

    basefilter

    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
        .each(new Fields("value"), new BaseFilter() {
            @Override
            public boolean isKeep(TridentTuple tuple) {
                return !tuple.getStringByField("value").contains("error");
            }
        })
        .peek((tuple) -> System.out.println(tuple));
    
    • project
      投影/过滤Tuple中无用field
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
        .project(new Fields("value","timestamp"))
        .peek((tuple) -> System.out.println(tuple));
    

    分区和聚合

    public class KafkaTridentTopology {
        public static void main(String[] args) throws Exception {
            TridentTopology tridentTopology=new TridentTopology();
    
            tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                .parallelismHint(3)
                .project(new Fields("value"))
                .flatMap((tuple)-> {
                    List<Values> list=new ArrayList<>();
                    String[] tokens = tuple.getStringByField("value").split("\\W+");
                    for (String token : tokens) {
                        list.add(new Values(token));
                    }
                    return list;
                },new Fields("word"))
                .map((tuple)->new Values(tuple.getStringByField("word"),1),new Fields("key","count"))
                .partition(new PartialKeyGrouping(new Fields("key")))
                .parallelismHint(5)
                .partitionAggregate(new Fields("key","count"),new CountAggregater(),new Fields("word","total"))
                .peek((tuple) -> System.out.println(tuple));
    
    
            new LocalCluster().submitTopology("tridentTopology",new Config(),tridentTopology.build());
        }
    }
    

    CountAggregater

    public class CountAggregater extends BaseAggregator<Map<String,Integer>> {
        @Override
        public Map<String, Integer> init(Object batchId, TridentCollector collector) {
            return new HashMap<>();
        }
    
        @Override
        public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
            String word = tuple.getStringByField("key");
            Integer count=tuple.getIntegerByField("count");
    
    
            if(val.containsKey(word)){
                count= val.get(word)+count;
            }
            val.put(word,count);
        }
    
        @Override
        public void complete(Map<String, Integer> val, TridentCollector collector) {
            for (Map.Entry<String, Integer> entry : val.entrySet()) {
                collector.emit(new Values(entry.getKey(),entry.getValue()));
            }
            val.clear();
        }
    }
    
    

    Trident 状态管理

    Trident以容错方式管理状态,以便在重试和失败时状态更新是幂等的。这使您可以推理Trident拓扑,就好像每条消息都被精确处理一次。 在进行状态更新时,可以实现各种级别的容错。在开始之前,让我们看一个示例,说明实现一次性语义所需的技巧。假设您正在对流进行计数聚合,并希望将运行计数存储在数据库中。现在假设您在数据库中存储了一个表示计数的值,并且每次处理新tuple时都会增加计数。
    发生故障时,将重放tuple。这会在执行状态更新(或任何有副作用的事物)时出现问题 - 您不知道以前是否曾基于此tuple成功更新状态。也许你以前从未处理过tuple,在这种情况下你应该增加计数。也许你已经处理了tuple并成功递增了计数,但是tuple在另一个步骤中处理失败。在这种情况下,您不应增加计数。或许您之前看过tuple但在更新数据库时出错。在这种情况下,您应该更新数据库。
    通过将计数存储在数据库中,您不知道之前是否已处理此tuple。因此,您需要更多信息才能做出正确的决定。 Trident提供以下语义,足以实现一次性处理语义:

    1. tuple作为小批量处理(参见教程)
    2. 每批元组都有一个称为“事务ID”(txid)的唯一ID。如果批量重播,则给出完全相同的txid。
    3. 批次之间订购状态更新。也就是说,在批处理2的状态更新成功之前,将不会应用批处理3的状态更新。

    在容错方面有三种可能的喷口:“非事务性(non-transactional)”,“事务性(transactional)”和“不透明事务性(opaque transactional)”。同样,在容错方面有三种可能的状态:“非事务性(non-transactional)”,“事务性(transactional)”和“不透明事务性(opaque transactional)”

    Transactional spouts(事务)

    Trident将tuple作为小批量处理,每个批次都被赋予唯一的事务ID。spout的属性根据它们可以提供的关于每批中的含量的保证而变化。事务性spout具有以下属性:

    • 给定txid的批次始终相同。(失败事务ID不变且 事务中的Tuple不变)
    • 批处理tuple之间没有重叠(tuple是一批或另一批,永远不是多tuple)。
    • 每个tuple都在一个批处理中(没有跳过元组)
    //txid不同 更新value、txid相同跳过
    key - [txid,value]
    

    流被分成永不改变的固定批次,Storm为Kafka实现了一个transactional spout。

    Opaque transactional spouts(不透明的事务)

    不透明的事务性spout不保证txid中的一批tuple保持不变。不透明的事务性spout具有以下属性:

    • 失败事务ID不变,但是同一个批次的tuple可以不同(允许分区数据丢失)。
    • 保证状态更新严格有序

    窗口

    • slidingWindow
    public class TridentWindowDemo {
        public static void main(String[] args) throws Exception {
            TridentTopology tridentTopology = new TridentTopology();
    
            tridentTopology.newStream("KafkaSpoutOpaque",
                    KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
                    .project(new Fields("value"))
                    .flatMap((tuple) -> {
                        List<Values> list = new ArrayList<>();
                        String[] tokens = tuple.getStringByField("value").split("\\W+");
                        for (String token : tokens) {
                            list.add(new Values(token));
                        }
                        return list;
                    }, new Fields("word"))
                    .map((tuple) -> new Values(tuple.getStringByField("word"), 1), new Fields("key", "count"))
                    .slidingWindow(
                            BaseWindowedBolt.Duration.seconds(10),
                            BaseWindowedBolt.Duration.seconds(5),
                            new InMemoryWindowsStoreFactory(),
                            new Fields("key","count"),
                            new WordCountAggregator(),
                            new Fields("key","total")
                    )
                    .peek(new Consumer() {
                        @Override
                        public void accept(TridentTuple input) {
                            System.out.println(input);
                        }
                    });
    
            new LocalCluster().submitTopology("aa",new Config(),tridentTopology.build());
        }
    }
    
    • TridentTopology
    TridentTopology tridentTopology = new TridentTopology();
    
    WindowConfig wc= SlidingDurationWindow.of(BaseWindowedBolt.Duration.seconds(10),
            BaseWindowedBolt.Duration.seconds(5));
    
    tridentTopology.newStream("KafkaSpoutOpaque",
            KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
            .project(new Fields("value"))
            .flatMap((tuple) -> {
                List<Values> list = new ArrayList<>();
                String[] tokens = tuple.getStringByField("value").split("\\W+");
                for (String token : tokens) {
                    list.add(new Values(token));
                }
                return list;
            }, new Fields("word"))
            .map((tuple) -> new Values(tuple.getStringByField("word"), 1), new Fields("key", "count"))
            .window(wc,
                    new InMemoryWindowsStoreFactory(),
                    new Fields("word","count"),
                    new WordCountAggregator(),
                    new Fields("word","total")
            )
            .peek(new Consumer() {
                @Override
                public void accept(TridentTuple input) {
                    System.out.println(input);
                }
            });
    
    new LocalCluster().submitTopology("aa",new Config(),tridentTopology.build());
    
    展开全文
  • Storm_Trident

    2016-08-18 14:42:07
    storm_Trident例子
  • storm Trident

    2018-11-20 15:21:43
    Trident topology. Trident 在storm上提供了高层抽象,抽象掉了事务处理和状态管理的细节. Trident topology trident 引入了"数据批次概念" batch每个batch会分配一个唯一的事务标识符,spout基于决定batch...
  • TridentNet

    2020-10-19 17:39:36
    论文标题:Scale-Aware Trident Networks for Object Detection 年份及出处:ICCV 2019 研究尺度变化问题 首先研究感受野对目标检测性能的影响 贡献: 首次指出了不管是图像金字塔,还是SSD、FPN,都是利用不同大小...
  • 一、概述Storm Trident中的核心数据模型就是“Stream”,也就是说,Storm Trident处理的是Stream,但是实际上Stream是被成批处理的,Stream被切分成一个个的Batch分布到集群中,所有应用在Stream上的函数最终会应用...
  • trident教程

    2019-10-02 17:40:19
    (一)理论基础更多理论以后再补充,或者参考书籍1、trident是什么?Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput ...
  • Trident中有对状态数据进行读取和写入操作的一流抽象工具。状态既可以保存在拓扑内部,比如保存在内容中并由HDFS存储,也可以通过外部存储(比如Memcached或Cassandra)存储在数据库中。而对于Trident的API而言,这两...
  • Trident Topology

    2018-04-10 15:58:18
    Trident在Storm上提供了高层抽象,Trident抽象掉了事物处理状态和状态管理细节,他可以让一批tuple进行离散的事务处理,并且提供了一些抽象函数,允许topology在数据上执行函数功能,过滤和聚合等操作. 在Trident中引入...
  • 初见Trident

    2018-09-07 15:37:00
    1、trident是什么?Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful str...
  • Trident是基于Storm进行实时留处理的高级抽象,提供了对实时流4的聚集,投影,过滤等操作,从而大大减少了开发Storm程序的工作量。Trident还提供了针对数据库或则其他持久化存储的有状态的,增量的更新操作的原语。...
  • Trident-Shards-源码

    2021-03-31 07:44:34
    Trident Shards是一个数据包,允许使用从上古守护者掉落的碎片制作三叉戟 安装 将trident-shards-dp.zip文件放入您的世界的datapacks文件夹中 将trident-shards.zip文件放入资源包文件夹 告示 没有资源包,三叉戟...
  • 一、认识storm trident trident可以理解为storm批处理的高级抽象,提供了分组、分区、聚合、函数等操作,提供一致性和恰好一次处理的语义。 1)元祖被作为batch处理 2)每个batch的元祖都被指定唯一的一个事物id,如果...
  • 目前,Trident使用Trident库允许使用Python编写的模块充当,以提供一些基本说明。 Trident尝试使用尽可能少的外部模块,并允许用户仅使用标准库来运行Trident 。这样做是为了确保与主机的最大兼容性,并允许专注于...
  • 使用事物TridentTopology 持久化数据到MySQL1、构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays;importjava.util.Map;importorg.apache.storm.Config;importorg.apache.storm.Local...
  • storm trident

    2018-06-06 11:00:40
    Trident是在storm基础上,一个以实时计算为目标的高度抽象。 它在提供处理大吞吐量数据能力(每秒百万次消息)的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批处理工具...
  • Added Soulbound Trident

    2020-12-02 01:22:13
    <p>Added the Soulbound trident <h2>Changes <p>Added the Soulbound trident <h2>Related Issues <h2>Testability <ul><li>[x] I have fully tested the proposed changes and promise that they will not ...
  • Trident Mark

    2018-11-14 14:41:59
    import java.util.HashMap; import java.util.Map; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated....import org.apache.storm.trident...
  • 一.trident的介绍trident的英文意思是三叉戟,在这里我的理解是因为之前我们通过之前的学习topology spout bolt去处理数据是没有问题的,但trident的对spout bolt更高层次的一个抽象,其实现功能是一样的,只不过是...
  • Trident数据手册.pdf

    2019-10-20 03:15:08
    Trident数据手册pdf,主要介绍Trident特点及技术规格,Trident 择是一款 3U 机架抽取式 KVM 切换器,配有一体化抽取式键盘和 3X17 英寸 LCD 显示屏,使用了高对比度的显示器(50:1),可以折叠放入 3U 机架内。
  • Trident for Gmail-crx插件

    2021-03-23 12:40:32
    Trident for Gmail Trident-基于WebRTC技术的PBX我们的PBX Trident是WebRTC服务产品的核心组件。 它结合了先进的动态呼叫控制和呼叫路由引擎的所有优势,以及围绕易于使用的融合式自我管理和报告界面的全功能呼叫者...
  • Apache Storm Trident

    2019-10-02 19:14:51
    Trident是Storm的延伸。像Storm,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm上提供高级抽象,以及状态流处理和低延迟分布式查询。 Trident使用spout和bolt,但是这些低级组件在执行之前由Tri...
  • trident 这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法。 我在这里只关注代码设计,而不关注监督或冗余之类的部署良好实践。 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不移至下...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,118
精华内容 2,047
关键字:

trident