精华内容
下载资源
问答
  • TridentNet

    2019-02-19 14:54:17
    本文致力于在深度学习目标检测问题中,提高对小目标的检测率
  • Trident项目是新一代多线程,高性能和无尘Minecraft服务器的实现。 最新发布的 获得JAR 方法一:自己构建 如果您确定我们的分发形式有问题,或者您想要在获取JAR文件之前进行一些修改,则希望直接从源代码进行构建...
  • 一组用 Storm Trident 编写的应用程序。 应用 用法 建造 $ git clone git@github.com:mayconbordin/trident-examples.git $ cd trident-examples/ $ mvn -P < profile> package 使用local配置文件以本地模式运行...
  • 硅产品知识产权(SIP)平台解决方案和数字信号处理器(DSP)内核授权厂商CEVA公司宣布,Trident Microsystems已获得CEVA的DSP技术授权以实现其机顶盒和电视系统产品线的解调(demodulation)功能。Trident Microsystems...
  • 三叉戟除雾网 NTIRE 2020 NonHomogeneous Dehazing Challenge (CVPR Workshop 2020)第一个解决方案。 环境: Ubuntu16.04 Python3.6 英伟达 GPU+CUDA8 依存关系: 预训练模型==0.7.4 火炬视觉==0.2.1 ...
  • trident-7.0.jar

    2018-04-01 11:19:00
    java swing用户交互界面的美观开发工具包,便于界面开发。
  • trident 一个性能监控和JVM虚拟机运行时监控组件 效果 请求响应时间监控 可逐层展开调用树 请求时序 详细模式下,可以通过甘特图观察请求时序 虚拟机状态监控 包括了: heap和perm区的使用情况展示 新、老代的GC回收...
  • Trident Extension -crx插件

    2021-04-02 14:16:22
    语言:English 三叉戟扩展 我们的Dialoga PBX是我们WebRTC服务产品的核心组件。 它结合了高级动态呼叫控制和路由引擎的所有优点以及功能齐全的呼叫者交互解决方案。 所有这些都可以通过易于使用的自我管理和报告界面...
  • Trident数据手册.pdf

    2019-10-20 03:15:08
    Trident数据手册pdf,主要介绍Trident特点及技术规格,Trident 择是一款 3U 机架抽取式 KVM 切换器,配有一体化抽取式键盘和 3X17 英寸 LCD 显示屏,使用了高对比度的显示器(50:1),可以折叠放入 3U 机架内。
  • Trident8493_NVR.tar.gz

    2020-01-03 08:29:49
    建议选择4G或8G的小U盘,U盘的格式为FAT32,在U盘中新建一个T16的文件夹,将附件Trident8493_NVR.tar直接拷贝到T16文件夹中升级,插入U盘后点击系统维护升级。附件不要解压作直接拷贝。不要使用制作过U盘启动的U盘,...
  • substance.jar和trident.jar

    热门讨论 2015-04-26 08:05:42
    Java界面GUI设计难看,所以用换肤所需的两个包substance.jar和trident.jar,方便换肤,怎样使用百度一下就可以
  • ###必读 把大数进行分片,根据数据中某个字段分组 Origin_Stream.partitionAggregate(new Fields("a","b") , new Test(), new Fields("A1","B1")).partitionPersist(new LocationDBFactory(), new Fields("A1","B1")...
  • trident教程

    2019-10-02 17:40:19
    (一)理论基础更多理论以后再补充,或者参考书籍1、trident是什么?Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput ...

     



    (一)理论基础
    更多理论以后再补充,或者参考书籍
    1、trident是什么?
    Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.

    简单的说,trident是storm的更高层次抽象,相对storm,它主要提供了2个方面的好处:

    (1)提供了更高层次的抽象,将常用的count,sum等封装成了方法,可以直接调用,不需要自己实现。

    (2)提供了一次原语,如groupby等。

    (3)提供了事务支持,可以保证数据均处理且只处理了一次。

    2、trident每次处理消息均为batch为单位,即一次处理多个元组。


    3、事务类型

    关于事务类型,有2个比较容易混淆的概念:spout的事务类型以及事务状态。

    它们都有3种类型,分别为:事务型、非事务型和透明事务型。

    (1)spout

    spout的类型指定了由于下游出现问题导致元组需要重放时,应该怎么发送元组。

    事务型spout:重放时能保证同一个批次发送同一批元组。可以保证每一个元组都被发送且只发送一个,且同一个批次所发送的元组是一样的。

    非事务型spout:没有任何保障,发完就算。

    透明事务型spout:同一个批次发送的元组有可能不同的,它可以保证每一个元组都被发送且只发送一次,但不能保证重放时同一个批次的数据是一样的。这对于部分失效的情况尤其有用,假如以kafka作为spout,当一个topic的某个分区失效时,可以用其它分区的数据先形成一个批次发送出去,如果是事务型spout,则必须等待那个分区恢复后才能继续发送。

    这三种类型可以分别通过实现ITransactionalSpout、ITridentSpout、IOpaquePartitionedTridentSpout接口来定义。

     

    (2)state

    state的类型指定了如果将storm的中间输出或者最终输出持久化到某个地方(如内存),当某个批次的数据重放时应该如果更新状态。state对于下游出现错误的情况尤其有用。

    事务型状态:同一批次tuple提供的结果是相同的。

    非事务型状态:没有回滚能力,更新操作是永久的。

    透明事务型状态:更新操作基于先前的值,这样由于这批数据发生变化,对应的结果也会发生变化。透明事务型状态除了保存当前数据外,还要保存上一批数据,当数据重放时,可以基于上一批数据作更新。




    (二)看官方提供的示例

    package org.ljh.tridentdemo;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentState;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.builtin.Count;
    import storm.trident.operation.builtin.FilterNull;
    import storm.trident.operation.builtin.MapGet;
    import storm.trident.operation.builtin.Sum;
    import storm.trident.testing.FixedBatchSpout;
    import storm.trident.testing.MemoryMapState;
    import storm.trident.tuple.TridentTuple;
    
    
    public class TridentWordCount {
        public static class Split extends BaseFunction {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(" ")) {
                    collector.emit(new Values(word));
                }
            }
        }
    
        public static StormTopology buildTopology(LocalDRPC drpc) {
            FixedBatchSpout spout =
                    new FixedBatchSpout(new Fields("sentence"), 3, new Values(
                            "the cow jumped over the moon"), new Values(
                            "the man went to the store and bought some candy"), new Values(
                            "four score and seven years ago"),
                            new Values("how many apples can you eat"), new Values(
                                    "to be or not to be the person"));
            spout.setCycle(true);
    
            //创建拓扑对象
            TridentTopology topology = new TridentTopology();
            
            //这个流程用于统计单词数据,结果将被保存在wordCounts中
            TridentState wordCounts =
                    topology.newStream("spout1", spout)
                            .parallelismHint(16)
                            .each(new Fields("sentence"), new Split(), new Fields("word"))
                            .groupBy(new Fields("word"))
                            .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                                    new Fields("count")).parallelismHint(16);
            //这个流程用于查询上面的统计结果
            topology.newDRPCStream("words", drpc)
                    .each(new Fields("args"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                    .each(new Fields("count"), new FilterNull())
                   .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
            return topology.build();
        }
    
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setMaxSpoutPending(20);
            if (args.length == 0) {
                LocalDRPC drpc = new LocalDRPC();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
                for (int i = 0; i < 100; i++) {
                    System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                    Thread.sleep(1000);
                }
            } else {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
            }
        }
    }
    





    实例实现了最基本的wordcount功能,然后将结果输出。关键步骤如下:



    1、定义了输入流

            FixedBatchSpout spout =
                    new FixedBatchSpout(new Fields("sentence"), 3, new Values(
                            "the cow jumped over the moon"), new Values(
                            "the man went to the store and bought some candy"), new Values(
                            "four score and seven years ago"),
                            new Values("how many apples can you eat"), new Values(
                                    "to be or not to be the person"));
            spout.setCycle(true);



    (1)使用FixedBatchSpout创建一个输入spout,spout的输出字段为sentence,每3个元组作为一个batch。
    (2)数据不断的重复发送。


    2、统计单词数量

            TridentState wordCounts =
                    topology.newStream("spout1", spout)
                            .parallelismHint(16)
                            .each(new Fields("sentence"), new Split(), new Fields("word"))
                            .groupBy(new Fields("word"))
                            .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                                    new Fields("count")).parallelismHint(16);



    这个流程用于统计单词数据,结果将被保存在wordCounts中。6行代码的含义分别为:

    (1)首先从spout中读取消息,spout1定义了zookeeper中用于保存这个拓扑的节点名称。

    (2)并行度设置为16,即16个线程同时从spout中读取消息。

    (3)each中的三个参数分别为:输入字段名称,处理函数,输出字段名称。即从字段名称叫sentence的数据流中读取数据,然后经过new Split()处理后,以word作为字段名发送出去。其中new Split()后面介绍,它的功能就是将输入的内容以空格为界作了切分。

    (4)将字段名称为word的数据流作分组,即相同值的放在一组。

    (5)将已经分好组的数据作统计,结果放到MemoryMapState,然后以count作为字段名称将结果发送出去。这步骤会同时存储数据及状态,并将返回TridentState对象。

    (6)并行度设置。

    3、输出统计结果

            topology.newDRPCStream("words", drpc)
                    .each(new Fields("args"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                    .each(new Fields("count"), new FilterNull())
                   .aggregate(new Fields("count"), new Sum(), new Fields("sum"));



    这个流程从上述的wordCounts对象中读取结果,并返回。6行代码的含义分别为:

    (1)等待一个drpc调用,从drpc服务器中接受words的调用来提供消息。调用代码如下:

    drpc.execute("words", "cat the dog jumped")
    (2)输入为上述调用中提供的参数,经过Split()后,以word作为字段名称发送出去。

    (3)以word的值作分组。

    (4)从wordCounts对象中查询结果。4个参数分别代表:数据来源,输入数据,内置方法(用于从map中根据key来查找value),输出名称。

    (5)过滤掉空的查询结果,如本例中,cat和dog都没有结果。

    (6)将结果作统计,并以sum作为字段名称发送出去,这也是DRPC调用所返回的结果。如果没有这一行,最后的输出结果

    DRPC RESULT: [["cat the dog jumped","the",2310],["cat the dog jumped","jumped",462]]
    加上这一行后,结果为:
    DRPC RESULT: [[180]]

    4、split的字义

        public static class Split extends BaseFunction {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(" ")) {
                    collector.emit(new Values(word));
                }
            }
        }


    注意它最后会发送数据。

    5、创建并启动拓扑

        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setMaxSpoutPending(20);
            if (args.length == 0) {
                LocalDRPC drpc = new LocalDRPC();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
                for (int i = 0; i < 100; i++) {
                    System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                    Thread.sleep(1000);
                }
            } else {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
            }
        }


    (1)当无参数运行时,启动一个本地的集群,及自已创建一个drpc对象来输入。
    (2)当有参数运行时,设置worker数量为3,然后提交拓扑到集群,并等待远程的drpc调用。


     

    (三)使用kafka作为数据源的一个例子

     

     

    package com.netease.sytopology;
    
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    import storm.kafka.BrokerHosts;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.builtin.Count;
    import storm.trident.testing.MemoryMapState;
    import storm.trident.tuple.TridentTuple;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    /*
     * 本类完成以下内容
     */
    public class SyTopology {
    
        public static final Logger LOG = LoggerFactory.getLogger(SyTopology.class);
    
        private final BrokerHosts brokerHosts;
    
        public SyTopology(String kafkaZookeeper) {
            brokerHosts = new ZkHosts(kafkaZookeeper);
        }
    
        public StormTopology buildTopology() {
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            // TransactionalTridentKafkaSpout kafkaSpout = new
            // TransactionalTridentKafkaSpout(kafkaConfig);
            OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
            TridentTopology topology = new TridentTopology();
    
            // TridentState wordCounts =
            topology.newStream("kafka4", kafkaSpout).
            each(new Fields("str"), new Split(),
                    new Fields("word")).groupBy(new Fields("word"))
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                            new Fields("count")).parallelismHint(16);
            // .persistentAggregate(new HazelCastStateFactory(), new Count(),
            // new Fields("aggregates_words")).parallelismHint(2);
    
    
            return topology.build();
        }
    
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            String kafkaZk = args[0];
            SyTopology topology = new SyTopology(kafkaZk);
            Config config = new Config();
            config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
    
            String name = args[1];
            String dockerIp = args[2];
            config.setNumWorkers(9);
            config.setMaxTaskParallelism(5);
            config.put(Config.NIMBUS_HOST, dockerIp);
            config.put(Config.NIMBUS_THRIFT_PORT, 6627);
            config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
            StormSubmitter.submitTopology(name, config, topology.buildTopology());
    
        }
    
        static class Split extends BaseFunction {
            public void execute(TridentTuple tuple, TridentCollector collector) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(",")) {
                    try {
                        FileWriter fw = new FileWriter(new File("/home/data/test/ma30/ma30.txt"),true);
                        fw.write(word);
                        fw.flush();
                        fw.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    collector.emit(new Values(word));
                    
                }
            }
        }
    }
    



     

    本例将从kafka中读取消息,然后对消息根据“,”作拆分,并写入一个本地文件。
    1、定义kafka想着配置
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    其中ma30是订阅的topic名称。


    2、从kafka中读取消息并处理

            topology.newStream("kafka4", kafkaSpout).
            each(new Fields("str"), new Split(),new Fields("word")).
            groupBy(new Fields("word"))
            .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                            new Fields("count")).parallelismHint(16);

    (1)指定了数据来源,并指定zookeeper中用于保存数据的位置,即保存在/transactional/kafka4。
    (2)指定处理方法及发射的字段。
    (3)根据word作分组。

    (4)计数后将状态写入MemoryMapState

     

    提交拓扑:

    storm jar target/sytopology2-0.0.1-SNAPSHOT.jar com.netease.sytopology.SyTopology 192.168.172.98:2181/kafka test3 192.168.172.98

    此时可以在/home/data/test/ma30/ma30.txt看到split的结果

     

    转载于:https://www.cnblogs.com/jinhong-lu/p/4634980.html

    展开全文
  • substance和trident javaGUI界面美化用到的包substance和trident javaGUI界面美化用到的包
  • 利用Trident topology实现预测疾病暴发的实例完整实例源码,具体详情参见博文:http://blog.csdn.net/l1028386804/article/details/79120204
  • Trident

    2018-11-30 23:14:00
    Trident是一个基于Storm构建的上层的Micro-Batching系统,它简化了Storm的拓扑构建过程并且提供了类似于窗口、聚合以及状态管理等等没有被Storm原生支持的功能 转载于:...

    Trident 是一个基于Storm构建的上层的Micro-Batching系统,它简化了Storm的拓扑构建过程并且提供了类似于窗口、聚合以及状态管理等等没有被Storm原生支持的功能

    转载于:https://www.cnblogs.com/huiandong/p/10047305.html

    展开全文
  • Storm_Trident

    2016-08-18 14:42:07
    storm_Trident例子
  • 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial----------------Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时...

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial

    ----------------

    Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。


    举例说明

    让我们一起来看一个Trident的例子。在这个例子中,我们主要做了两件事情:

    1. 从一个流式输入中读取语句病计算每个单词的个数
    2. 提供查询给定单词列表中每个单词当前总数的功能
    因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入:

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                   new Values("the cow jumped over the moon"),
                   new Values("the man went to the store and bought some candy"),
                   new Values("four score and seven years ago"),
                   new Values("how many apples can you eat"),
    spout.setCycle(true);
    

    这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:

    TridentTopology topology = new TridentTopology();        
    TridentState wordCounts =
         topology.newStream("spout1", spout)
           .each(new Fields("sentence"), new Split(), new Fields("word"))
           .groupBy(new Fields("word"))
           .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
           .parallelismHint(6);
    

    让我们一起来读一下这段代码。我们首先创建了一个TridentTopology对象。TridentTopology类相应的接口来构造Trident计算过程中的所有内容。我们在调用了TridentTopology类的newStream方法时,传入了一个spout对象,spout对象会从外部读取数据并输出到当前topology当中,从而在topology中创建了一个新的数据流。在这个例子中,我们使用了上面定义的FixedBatchSpout对象。输入数据源同样也可以是如Kestrel或者Kafka这样的队列服务。Trident会再Zookeeper中保存一小部分状态信息来追踪数据的处理情况,而在代码中我们指定的字符串“spout1”就是Zookeeper中用来存储metadata信息的Znode节点

    Trident在处理输入stream的时候会把输入转换成若干个tuple的batch来处理。比如说,输入的sentence stream可能会被拆分成如下的batch:

    Batched stream

    一般来说,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,这完全取决于你的输入的吞吐量。

    Trident提供了一系列非常成熟的批量处理的API来处理这些小batch. 这些API和你在Pig或者Cascading中看到的非常类似, 你可以做group by's, joins, aggregations, 运行 functions, 执行 filters等等。当然,独立的处理每个小的batch并不是非常有趣的事情,所以Trident提供了很多功能来实现batch之间的聚合的结果并可以将这些聚合的结果存储到内存,Memcached, Cassandra或者是一些其他的存储中。同时,Trident还提供了非常好的功能来查询实时状态。这些实时状态可以被Trident更新,同时它也可以是一个独立的状态源。

    回到我们的这个例子中来,spout输出了一个只有单一字段“sentence”的数据流。在下一行,topology使用了Split函数来拆分stream中的每一个tuple,Split函数读取输入流中的“sentence”字段并将其拆分成若干个word tuple。每一个sentence tuple可能会被转换成多个word tuple,比如说"the cow jumped over the moon" 会被转换成6个 "word" tuples. 下面是Split的定义:

    public class Split extends BaseFunction {
       public void execute(TridentTuple tuple, TridentCollector collector) {
           String sentence = tuple.getString(0);
           for(String word: sentence.split(" ")) {
               collector.emit(new Values(word));                
           }
       }
    }
    

    如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每个单词作为一个tuple输出。

    topology的其他部分计算单词的个数并将计算结果保存到了持久存储中。首先,word stream被根据“word”字段进行group操作,然后每一个group使用Count聚合器进行持久化聚合。persistentAggregate会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过我们可以很简单的把这些数据保存到其他的存储当中,如 Memcached, Cassandra等。如果我们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就可以,这当中的"serverLocations"是Memcached cluster的主机和端口号列表:

    .persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
    MemcachedState.transactional()
    

    persistentAggregate存储的数据就是所有batch聚合的结果。

    Trident非常酷的一点就是它是完全容错的,拥有者有且只有一次处理的语义。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

    persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。

    下面这部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词当天的个数。这些查询是想普通的RPC调用那样被执行的,要说不同的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

    DRPCClient client = new DRPCClient("drpc.server.location", 3772);
    System.out.println(client.execute("words", "cat dog the man");
    // prints the JSON-encoded result, e.g.: "[[5078]]"
    

    如你所见,除了这是并发执行在storm cluster上之外,这看上去就是一个正常的RPC调用。这样的简单查询的延时通常在10毫秒左右。当然,更负责的DRPC调用可能会占用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

    这个分布式查询的实现如下所示:

    topology.newDRPCStream("words")
           .each(new Fields("args"), new Split(), new Fields("word"))
           .groupBy(new Fields("word"))
           .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
           .each(new Fields("count"), new FilterNull())
           .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    

    我们仍然是使用TridentTopology对象来创建DRPC stream,并且我们将这个函数命名为“words”。这个函数名会作为第一个参数在使用DRPC Client来执行查询的时候用到。

    每个DRPC请求会被当做只有一个tuple的batch来处理。在处理的过程中,以这个输入的单一tuple来表示这个请求。这个tuple包含了一个叫做“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

    首先,我们使用Splict功能把入参拆分成独立的单词。然后对“word” 进行group by操作,之后就可以使用stateQuery来在上面代码中创建的TridentState对象上进行查询。stateQuery接受一个数据源(在这个例子中,就是我们的topolgoy所计算的单词的个数)以及一个用于查询的函数作为输入。在这个例子中,我们使用了MapGet函数来获取每个单词的出现个数。由于DRPC stream是使用跟TridentState完全同样的group方式(按照“word”字段进行group),每个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

    接下来,我们用FilterNull这个过滤器把从未出现过的单词给去掉,并使用Sum这个聚合器将这些count累加起来。最终,Trident会自动把这个结果发送回等待的客户端。

    Trident在如何最大程度的保证执行topogloy性能方面是非常智能的。在topology中会自动的发生两件非常有意思的事情:

    1. 读取和更新状态的操作 (比如说 persistentAggregate 和 stateQuery) 会自动的是batch的形式操作状态。 如果有20次更新需要被同步到存储中,Trident会自动的把这些操作汇总到一起,只做一次读一次写,而不是进行20次读20次写的操作。因此你可以在很方便的执行计算的同时,保证了非常好的性能。
    2. Trident的聚合器已经是被优化的非常好了的。Trident并不是简单的把一个group中所有的tuples都发送到同一个机器上面进行聚合,而是在发送之前已经进行过一次部分的聚合。打个比方,Count聚合器会先在每个partition上面进行count,然后把每个分片count汇总到一起就得到了最终的count。这个技术其实就跟MapReduce里面的combiner是一个思想。

    让我们再来看一下Trident的另外一个例子。

    Reach

    下一个例子是一个纯粹的DRPC topology。这个topology会计算一个给定URL的reach。那么什么事reach呢,这里我们将reach定义为有多少个独立用户在Twitter上面expose了一个给定的URL,那么我们就把这个数量叫做这个URL的reach。要计算reach,你需要tweet过这个URL的所有人,然后找到所有follow这些人的人,并将这些follower去重,最后就得到了去重后的follower的数量。如果把计算reach的整个过程都放在一台机器上面,就太勉强了,因为这会需要进行数千次数据库调用以及上一次的tuple的读取。如果使用Storm和Trident,你就可以把这些计算步骤在整个cluster中进行并发。

    这个topology会读取两个state源。一个用来保存URL以及tweet这个URL的人的关系的数据库。还有一个保持人和他的follower的关系的数据库。topology的定义如下:

    TridentState urlToTweeters =
           topology.newStaticState(getUrlToTweetersState());
    TridentState tweetersToFollowers =
           topology.newStaticState(getTweeterToFollowersState());
    
    topology.newDRPCStream("reach")
           .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
           .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
           .shuffle()
           .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
           .parallelismHint(200)
           .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
           .groupBy(new Fields("follower"))
           .aggregate(new One(), new Fields("one"))
           .parallelismHint(20)
           .aggregate(new Count(), new Fields("reach"));
    

    这个topology使用newStaticState方法创建了TridentState对象来代表一种外部存储。使用这个TridentState对象,我们就可以在这个topology上面进行动态查询了。和所有的状态源一样,在数据库上面的查找会自动被批量执行,从而最大程度的提升效率。

    这个topology的定义是非常直观的 - 只是一个简单的批量处理job。首先,查询urlToTweeters数据库来得到tweet过这个URL的人员列表。这个查询会返回一个列表,因此我们使用ExpandList函数来把每一个反悔的tweeter转换成一个tuple。

    接下来,我们来获取每个tweeter的follower。我们使用shuffle来把要处理的tweeter分布到toplology运行的每一个worker中并发去处理。然后查询follower数据库从而的到每个tweeter的follower。你可以看到我们为topology的这部分分配了很大的并行度,这是因为这部分是整个topology中最耗资源的计算部分。

    然后我们在follower上面使用group by操作进行分组,并对每个组使用一个聚合器。这个聚合器只是简单的针对每个组输出一个tuple “One”,再count “One” 从而的到不同的follower的数量。“One”聚合器的定义如下:

    public class One implements CombinerAggregator<Integer> {
       public Integer init(TridentTuple tuple) {
           return 1;
       }
    
       public Integer combine(Integer val1, Integer val2) {
           return 1;
       }
    
       public Integer zero() {
           return 1;
       }        
    }
    

    这是一个"汇总聚合器", 它会在传送结果到其他worker汇总之前进行局部汇总,从而来最大程度上提升性能。Sum也是一个汇总聚合器,因此以Sum作为topology的最终操作是非常高效的。

    接下来让我们一起来看看Trident的一些细节。

    Fields and tuples

    Trident的数据模型就是TridentTuple - 一个有名的值的列表。在一个topology中,tuple是在一系列的处理操作(operation)中增量生成的。operation一般以一组子弹作为输入并输出一组功能字段。Operation的输入字段经常是输入tuple的一个子集,而功能字段则是operation的输出。

    看一下如下这个例子。假定你有一个叫做“stream”的stream,它包含了“x”,"y"和"z"三个字段。为了运行一个读取“y”作为输入的过滤器MyFilter,你可以这样写:

    stream.each(new Fields("y"), new MyFilter())
    

    假定MyFilter的实现是这样的:

    public class MyFilter extends BaseFilter {
       public boolean isKeep(TridentTuple tuple) {
           return tuple.getInteger(0) < 10;
       }
    }

    这会保留所有“y”字段小于10的tuples。TridentTuple传个MyFilter的输入将只有字段“y”。这里需要注意的是,当选择输入字段时,Trident会自动投影tuple的一个子集,这个操作是非常高效的。

    让我们一起看一下“功能字段”是怎样工作的。假定你有如下这个功能:

    public class AddAndMultiply extends BaseFunction {
       public void execute(TridentTuple tuple, TridentCollector collector) {
           int i1 = tuple.getInteger(0);
           int i2 = tuple.getInteger(1);
           collector.emit(new Values(i1 + i2, i1 * i2));
       }
    }
    

    这个函数接收两个数作为输入并输出两个新的值:“和”和“乘积”。假定你有一个stream,其中包含“x”,"y"和"z"三个字段。你可以这样使用这个函数:

    stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
    

    输出的功能字段被添加到输入tuple中。因此这个时候,每个tuple中将会有5个字段"x", "y", "z", "added", 和 "multiplied". "added" 和"multiplied"对应于AddAndMultiply输出的第一和第二个字段。

    另外,我们可以使用聚合器来用输出字段来替换输入tuple。如果你有一个stream包含字段"val1"和"val2",你可以这样做:

    stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
    

    output stream将会只包含一个叫做“sum”的字段,这个sum字段就是“val2”的累积和。

    在group之后的stream上,输出将会是被group的字段以及聚合器输出的字段。举例如下:

    stream.groupBy(new Fields("val1"))
         .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
    

    在这个例子中,输出将包含字段"val1" 和 "sum".

    State

    在实时计算领域的一个主要问题就是怎么样来管理状态并能轻松应对错误和重试。消除错误的影响是非常重要的,因为当一个节点死掉,或者一些其他的问题出现时,那行batch需要被重新处理。问题是-你怎样做状态更新来保证每一个消息被处理且只被处理一次?

    这是一个很棘手的问题,我们可以用接下来的例子进一步说明。假定你在做一个你的stream的计数聚合,并且你想要存储运行时的count到一个数据库中去。如果你只是存储这个count到数据库中,并且想要进行一次更新,我们是没有办法知道同样的状态是不是以前已经被update过了的。这次更新可能在之前就尝试过,并且已经成功的更新到了数据库中,不过在后续的步骤中失败了。还有可能是在上次更新数据库的过程中失败的,这些你都不知道。

    Trident通过做下面两件事情来解决这个问题:

    1. 每一个batch被赋予一个唯一标识id“transaction id”。如果一个batch被重试,它将会拥有和之前同样的transaction id
    2. 状态更新是以batch为单位有序进行的。也就是说,batch 3的状态更新必须等到batch 2的状态更新成功之后才可以进行。

    有了这2个原则,你就可以达到有且只有一次更新的目标。你可以将transaction id和count一起以原子的方式存到数据库中。当更新一个count的时候,需要判断数据库中当前batch的transaction id。如果跟要更新的transaction id一样,就跳过这次更新。如果不同,就更新这个count。

    当然,你不需要在topology中手动处理这些逻辑。这些逻辑已经被封装在Stage的抽象中并自动进行。你的Stage object也不需要自己去实习transaction id的跟踪操作。如果你想了解更多的关于如何实现一个Stage以及在容错过程中的一些取舍问题,可以参照这篇文章.

    一个Stage可以采用任何策略来存储状态。它可以存储到一个外部的数据库,也可以在内存中保持状态并备份到HDFS中。Stage并不需要永久的保持状态。比如说,你有一个内存版的Stage实现,它保存最近X个小时的数据并丢弃老的数据。可以把 Memcached integration 作为例子来看看State的实现.

    Execution of Trident topologies

    Trident的topology会被编译成尽可能高效的Storm topology。只有在需要对数据进行repartition的时候(如groupby或者shuffle)才会把tuple通过network发送出去,如果你有一个trident如下:

    Compiling Trident to Storm 1

    它将会被编译成如下的storm topology:

    Compiling Trident to Storm 2

    Conclusion

    Trident使得实时计算更加优雅。你已经看到了如何使用Trident的API来完成大吞吐量的流式计算,状态维护,低延时查询等等功能。Trident让你在获取最大性能的同时,以更自然的一种方式进行实时计算。

    专题一:Trident State 详解

    一、什么是Trident State

    直译过来就是trident状态,这里的状态主要涉及到Trident如何实现一致性语义规则,Trident的计算结果将被如何提交,如何保存,如何更新等等。我们知道Trident的计算都是以batch为单位的,但是batch在中的tuple在处理过程中有可能会失败,失败之后bach又有可能会被重播,这就涉及到很多事务一致性问题。Trident State就是管理这些问题的一套方案,与这套方案对应的就是Trident State API。这样说可能还比较抽象,下面就用一个例子具体说明一下。

    1.1 举例具体例子来说明

    假设有这么一个需求,统计一个数据流中各个单词出现的数量,并把单词和其数量更新到数据库中。假设我们在数据库中只有两个字段,单词和其数量,在计数过程中,如果遇到相同的单词则就把其数量加一。但是这么做有一个问题,如果某个单词是被重播的单词,就有可能导致这个单词被多加了一遍。因此,在数据库中只保存单词和其数量两个字段是无法做到“数据只被处理一次”的语义要求的。

    1.2 Trident是怎么解决这个问题的呢?

    Trident定义了如下语义规则:

    1. 所有的Tuple都是以batch的形式处理的
    2. 每个batch都会被分配一个唯一的“transaction id”(txid),如果batch被重发,txid不变
    3. 各个batch状态的更新是有序的,也就是说batch2一定会在batch3之前更新

    有了这三个规则,我们就可以通过txid知道batch是否被处理过,然后就可以根据实际情况来更新状态信息了。很明显,要满足这几个语义规则,就需要spout来支持,因为把tuple封装成batch,分配txid等等都是有spout来负责的。

    但是在具体应用场景中,storm应该能够提供不同的容错级别,因为某些情况下我们并不需要强一致性。为了更灵活的处理,Trident提供了三类spout,分别是:

    1. Transactional spouts : 事务spout,提供了强一致性
    2. Opaque Transactional spouts:不透明事务spout,提供了弱一致性
    3. No-Transactional spouts:非事务spout,对一致性无法保证
    
    • 注意,所有的Trident Spout都是以batch的形式发送数据,每个batch也都会分配一个唯一的txid,决定它们有不同性质的地方在于它们对各自的batch提供了什么样的保证

    1.3 Trident State的类型

    我们已经知道Trident 提供了三种类型的spout来服务Trident State管理,那么对应的Trident State也有三种类型:

    1. Transactional
    2. Opaque Transactional 
    3. No-Transactional
    
    • 二、各类Trident Spout详解

    2.1 Transactional spouts

    Transactional spouts对batch的发送提供了如下保证:

    1. 相同txid的batch完全一样,如果一个batch被重播,重播的batch的txid及其所有tuple和原batch的完全一致
    2. 两个batch中的tuple不会有重合
    3. 每个tuple都在batch中,不会有batch漏掉某个tuple

    这三个特性是“最完美”的保证,也最容易理解,Stream被分割成固定的batch,而且不会改变。Storm就提供了一个Transactional spout的实现:TransactionalTridentKafkaSpout

    我们现在再看上面1.1节提到的那个实例,我们要把单词和其数量保存在数据库中,为了保证“数据只被处理一次”,除了要保存单词和数量两个字段之外,我们再加一个字段txid。在更新数据时,我们先对比一下当前的数据的txid和数据库中数据的txid,若txid相同,说明是被重播的数据,直接跳过即可,如果不同,则把两个数值相加即可。

    下面具体说明一下,假设当前处理的batch的txid=3,其中的tuples为:

    [man]
    [man]
    [dog]
    
    • 再假设数据库中保存的数据为:
    man => [count=3, txid=1]
    dog => [count=4, txid=3]
    apple => [count=10, txid=2]
    
    • 数据库中“man”单词的txid为1,而当前batch的txid为3,说明当前batch中的“man”单词未被累加过,所以需要把当前batch中”man”的个数累加到数据库中。数据库中“dog”单词的txid为3,和当前batch的txid相同,说明已经被累计过了直接跳过。最终数据库中的结果变为:
    man => [count=5, txid=1]
    dog => [count=4, txid=3]
    apple => [count=10, txid=2]
    
    • 总结一下整个处理过程:
    if(database txid=current txid){//两次更新的txid相同
         跳过;
    }else{
         用current value替换掉database value;
    }
    
    • 2.2 Opaque Transactional spouts

    上面已经提到过,并不是所有情形下都需要保证强一致性。例如在TransactionalTridentKafkaSpout中(关于Kafka相关介绍,点这里),如果它的一个batch中的tuples来自一个topic的所有partitions,如果要满足Transactionnal Spout语义的话,一旦这个batch因为某些失败而被重发,重发batch中的所有tuple必须与这个batch中的完全一致,而恰好kafka集群某个节点down掉导致这个topic其中一个partition无法使用,那么就会导致这个batch无法凑齐所有tuple(无法获取失败partition上的数据),整个处理过程被挂起。而Opaque Transactional spouts就可以解决这个问题。

    Opaque Transactional spouts提供了如下保证:

    - 每个tuple只在一个batch中被成功处理,如果一个batch中的tuple处理失败的话,会被后面的batch继续处理
    
    • 怎么理解这个特性呢,简要来说就OpaqueTransactional spout和Transactional spouts基本差不多,只是在Opaque Transactional spout中,相同txid的batch中的tuple集合可能不一样OpaqueTridentKafkaSpout就是符合这种特性的spout的,所以它可以容忍kafka节点失败。

    因为重播的batch中的tuple集合可能不一样,所以对于Opaque Transactional Spout,就不能根据txid是否一致来决定是否需要更新状态了。我们需要在数据库中保存更多的状态信息,除了单词名,数量、txid之外,我们还需要保存一个pre-value来记录前一次计算的值。我们再用上面例子具体说明一下。

    假设数据库中的记录如下:

    { value = 4,
      preValue = 1,
      txid = 2
    }
    
    • 假设当前batch的count值为2,txid=3。因为当前txid和数据库中的不同,我们需要把preValue替换成value的值,累计value值,然后更新txid值为3,结果如下:
    { value = 6,
      prevValue = 4,
      txid = 3
    }
    
    • 再假设当前batch的count值为1,txid=2。这是当前txid和数据库中的相同,虽然两个txid值相同,但由于两个batch的内容已经变了,所以上次的更新可以忽略掉,需要对数据库中的value值进行重新计算,即把当前值和preValue值相加,结果如下:
    { value =3,
      prevValue = 1,
      txid = 2
    }
    
    • 总结一下整个处理过程:
    if(database txid=current txid){
     value=preValue+current value;//重新更新value
     //preValue不变;
    }else{
     preValue=value;//更新preValue
     value=preValue+current value;//更新value
     txid=current txid;//更新txid
    }
    
    • 2.3 No-Transactional spouts

    No-Transactional spouts对每个batch的内容不做任何保证。如果失败的batch没被重发,它有会出现“最多被处理一次”的请况,如果tuples被多个batch处理,则会发生“最少被处理一次的情况”,很难保证“数据只被处理一次”的情况。

    三、Spout和State的类型总结

    下面这个表格描述了“数据只被处理一次”的spout/state的类型组合:

    这里写图片描述

    总的来说, Opaque transactional states即有一定的容错性又能保证数据一致性,但它的代价是需要在数据库中保存更多的状态信息(txid和preValue)。Transactional states虽然需要较少的状态信息(txid),但是它需要transactional spouts的支持。non-transactional states需要在数据库中保存最少的状态信息但难以保证“数据只被处理一次”的语义。

    因此,在实际应用中,spout和state类型的选择需要根据我们具体应用需求来决定,当然在容错性和增加存储代价之间也需要做个权衡。

    四、State API

    上面讲的看上去有点啰嗦,庆幸的是Trident State API 在内部为我们实现了所有状态管理的逻辑,我们不需要再进行诸如对比txid,在数据库中存储多个值等操作,仅需要简单调用Trident API即可,例如:

    TridentTopology topology = new TridentTopology();       
    TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))               
        .parallelismHint(6);
    
    • 所有的管理Opaque transactional states状态的逻辑都在MemcachedState.opaque()方法内部实现了。另外,所有的更新操作都是以batch为单位的,这样减少了对数据库的调用次数,极大的提高了效率。下面就向大家介绍一下和Trident State 相关的API。

    4.1 State接口

    Trident API中最基本的State接口只有两个方法:

    public interface State {
        void beginCommit(Long txid);
        void commit(Long txid);
    }
    
    • State接口只定义了状态什么时候开始更新,什么时候结束更新,并且我们都能获得一个txid。具体这个State如何工作,如何更新State,如何查询State,Trident并没有对此作出限制,我们可以自己任意实现。

    假设我们有一个Location数据库,我们要通过Trident查新和更新这个数据库,那么我们可以自己实现这样一个LocationDB State,因为我们需要查询和更新,所以我们为这个LocationDB 可以添加对Location的get和set的实现:

    public class LocationDB implements State {
        public void beginCommit(Long txid) {   
        }
        public void commit(Long txid) {   
        }
        public void setLocation(long userId, String location) {
      // code to access database and set location
        }
        public String getLocation(long userId) {
      // code to get location from database
        }
     }
    
    • 4.2 StateFactory工厂接口

    Trident提供了State Factory接口,我们实现了这个接口之后,Trident 就可以通过这个接口获得具体的Trident State实例了,下面我们就实现一个可以制造LocationDB实例的LocationDBFactory:

    public class LocationDBFactory implements StateFactory {
       public State makeState(Map conf, int partitionIndex, int numPartitions) {
          return new LocationDB();
       } 
    }
    
    • 4.3 QueryFunction接口

    这个接口是用来帮助Trident查询一个State,这个接口定义了两个方法:

    public interface QueryFunction<S extends State, T> extends EachOperation {
        List<T> batchRetrieve(S state, List<TridentTuple> args);
        void execute(TridentTuple tuple, T result, TridentCollector collector);
    }
    
    • 接口的第一个方法batchRetrieve()有两个参数,分别是要查询的State源和查询参数,因为trident都是以batch为单位处理的,所以这个查询参数是一个List<TridentTuple>集合。关于第二个方法execute()有三个参数,第一个代表查询参数中的某个tuple,第二个代表这个查询参数tuple对应的查询结果,第三个则是一个消息发送器。下面就看一个QuaryLocation的实例:
    public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
        public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for(TridentTuple input: inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }
    
    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
        }   
    }
    
    • QueryLocation接收到Trident发送的查询参数,参数是一个batch,batch中tuple内容是userId信息,然后batchRetrieve()方法负责从State源中获取每个userId对应的的location。最终batchRetrieve()查询的结果会被execute()方法发送出去。

    但这里有个问题,batchRetrieve()方法中针对每个userid都做了一次查询State操作,这样处理显然效率不高,也不符合Trident所有操作都是针对batch的原则。所以,我们要对LocationDB这个State做一下改造,提供一个bulkGetLocations()方法来替换掉getLocation()方法,请看改造后的LocationDB的实现:

    public class LocationDB implements State {
        public void beginCommit(Long txid) {   
        }
        public void commit(Long txid) {   
        }
        public void setLocationsBulk(List<Long> userIds, List<String> locations) {
      // set locations in bulk
        }
        public List<String> bulkGetLocations(List<Long> userIds) {
      // get locations in bulk
        }
     }
    
    • 我们可以看到,改造的LocationDB对Location的查询和更新都是批量操作的,这样显然可以提高处理效率。此时,我们再稍微改一下QueryFunction中batchRetrieve()方法:
    public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
        public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
            List<Long> userIds = new ArrayList<Long>();
            for(TridentTuple input: inputs) {
                userIds.add(input.getLong(0));
            }
            return state.bulkGetLocations(userIds);
        }
    
        public void execute(TridentTuple tuple, String location, TridentCollector collector) {
            collector.emit(new Values(location));
        }   
    }
    
    • QueryLocation在topology中可以这么使用:
    TridentTopology topology = new TridentTopology();
    topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
    
    • 4.4 UpdateState接口

    当我们要更新一个State源时,我们需要实现一个UpdateState接口。UpdateState接口只提供了一个方法:

    public interface StateUpdater<S extends State> extends Operation {
        void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);
    }
    
    • 下面我们来具体看一下LocationUpdater的实现:
    public class LocationUpdater extends BaseStateUpdater<LocationDB> {
        public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
            List<Long> ids = new ArrayList<Long>();
            List<String> locations = new ArrayList<String>();
            for(TridentTuple t: tuples) {
                ids.add(t.getLong(0));
                locations.add(t.getString(1));
            }
            state.setLocationsBulk(ids, locations);
        }
    }
    
    • 对于LocationUpdater在topology中可以这么使用:
    TridentTopology topology = new TridentTopology();
    TridentState locations = 
    topology.newStream("locations", locationsSpout)
    
    • 通过调用Trident Stream的partitionPersist方法可以更新一个State。在上面这个实例中,LocationUpdater接收一个State和要更新的batch,最终通过调用LocationFactory制造的LocationDB中的setLocationsBulk()方法把batch中的userid及其location批量更新到State中。

    partitionPersist操作会返回一个TridentState对象,这个对象即是被TridentTopology更新后的LocationDB,所以,我们可以在topology中续继续对这个返回的State做查询操作。

    另外一点需要注意的是,从上面StateUpdater接口可以看出,在它的updateState()方法中还提供了一个TridentCollector,因此在执行StateUpdate的同时仍然可以形成一个新的Stream。若要操作StateUpdater形成的Stream,可以通过调用TridentState.newValueStream()方法实现。

    五、persistentAggregate

    Trident另一个update state的方法时persistentAggregate,请看下面word count的例子:

    TridentTopology topology = new TridentTopology();       
    TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
    
    • 5.1 MapState接口

    persistentAggregate是在partitionPersist之上的另一个抽象,它会对Trident Stream进行聚合之后再把聚合结果更新到State中。在上面这个例子中,因为聚合的是一个groupedStream,Trident要求这种情况下State需要实现MapState接口,被grouped的字段会被做为MapSate的key,被grouped的数据计算的结果会被做为MapSate的value。MapSate接口定义如下:

    public interface MapState<T> extends State {
        List<T> multiGet(List<List<Object>> keys);
        List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
        void multiPut(List<List<Object>> keys, List<T> vals);
    }
    

    5.2 Snapshottable接口

    如果我们聚合的不是一个groupedStream,Trident要求我们的State实现Snapshottable接口:

    public interface Snapshottable<T> extends State {
        T get();
        T update(ValueUpdater updater);
        void set(T o);
    }
    

    六、Map States的实现

    在Trident中实现MapState很简单,大部分工作Trident已经替我们做了。OpaqueMap,TransactionalMap, 和NonTransactionalMap类已经替我们完成了和容错相关的处理逻辑. 我们仅仅提供一个 IBackingMap的实现类即可, IBackingMap的接口定义如下:

    public interface IBackingMap<T> {
        List<T> multiGet(List<List<Object>> keys); 
        void multiPut(List<List<Object>> keys, List<T> vals); 
    }
    
    • OpaqueMap’s调用的multiPut将会把value值自动封装成OpaqueValue来处理, TransactionalMap’s 将会把value封装成TransactionalValue再进行处理, 而NonTransactionalMaps 则不会对value做处理,直接传递给topology。

    另外, 
    Trident提供的CachedMap 类会对Map中的key/value做自动的LRU缓存 。 
    Trident提供的SnapshottableMap类会把MapState转换成SnapShottable对象(把MapState中的所有key/value对聚合成一个固定的key)。

    详细更详细的了解整个MapState的实现过程,请查看 MemcachedState 的实现,MemcachedState除了把上面介绍的相关接口整合到一起之外,还提供了对opaque transactional, transactional, non-transactional三个语义规则的支持。



    Storm专题二:Storm Trident API 使用详解


    在Trident中有五种操作类型:
    1. Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输     
    2. Repartitioning:数据流重定向,单纯的改变数据流向,不会改变数据内容,这部分会有网络传输
    3. Aggragation:聚合操作,会有网络传输
    4. Grouped streams上的操作
    5. Merge和Join
    小结:上面提到了Trident实际上是通过把函数应用到每个节点的Batch上的数据以实现并行,而应用的这些函数就是TridentAPI,下面我们就具体介绍一下TridentAPI的各种操作。   

    二、Trident五种操作详解

    2.1 Apply Locally本地操作:操作都应用在本地节点的Batch上,不会产生网络传输

    2.1.1 Functions:函数操作

         函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面,如果一个function不输出tuple,那就意味这这个tuple被过滤掉了,下面举例说明:
    • 定义一个Function:
    [java]  view plain  copy
    1. public class MyFunction extends BaseFunction {  
    2.   @Override  
    3.   public void execute(TridentTuple tuple, TridentCollector collector) {  
    4.        for ( int i = 0; i < tuple.getInteger(0); i++) {  
    5.           collector.emit( new Values(i));  
    6.       }  
    7.  }  
         小结:Function实际上就是对经过Function函的tuple做一些操作以改变其内容。
    • 比如我们处理一个“mystream”的数据流,它有三个字段分别是[“a”, “b”, “c”] ,数据流中tuple的内容是:
         [1, 2, 3] [4, 1, 6] [3, 0, 8]
    • 我们运行我们的Function:  
    [java]  view plain  copy
    1. java mystream.each(new Fields("b"), new MyFunction(), new Fields("d")));  
         它意思是接收输入的每个tuple “b”字段得值,把函数结算结果做为新字段“d”追加到每个tuple后面,然后发射出去。
    • 最终运行结果会是每个tuple有四个字段[“a”, “b”, “c”, “d”],每个tuple的内容变成了:
         [1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]
        小结:我们注意到,如果一个function发射多个tuple时,每个发射的新tuple中仍会保留原来老tuple的数据。

    2.1.2 Filters:过滤操作
    • Filters很简单,接收一个tuple并决定是否保留这个tuple。举个例子,定义一个Filter:
    [java]  view plain  copy
    1. public class MyFilter extends BaseFilter {  
    2.     public boolean isKeep(TridentTuple tuple) {  
    3.           return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;  
    4.     }  
    5.   }  
    • 假设我们的tuples有这个几个字段 [“a”, “b”, “c”]: 
         [1, 2, 3] [2, 1, 1] [2, 3, 4]
    • 然后运行我们的Filter:
    [java]  view plain  copy
    1. java mystream.each(new Fields("b""a"), new MyFilter());  
    • 则最终得到的tuple是 :
         [2, 1, 1]

         说明第一个和第三个不满足条件,都被过滤掉了。

         小结:Filter就是一个过滤器,它决定是否需要保留当前tuple。

    2.1.3 PartitionAggregate
        PartitionAggregate的作用对每个Partition中的tuple进行聚合,与前面的函数在原tuple后面追加数据不同,PartitionAggregate的输出会直接替换掉输入的tuple,仅数据PartitionAggregate中发射的tuple。下面举例说明:
    • 定义一个累加的PartitionAggregate:
    [java]  view plain  copy
    1. java mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));  
    • 假设我们的Stream包含两个字段 [“a”, “b”],各个Partition的tuple内容是:
         ``` Partition 0: [“a”, 1] [“b”, 2]

         Partition 1: [“a”, 3] [“c”, 8]

         Partition 2: [“e”, 1] [“d”, 9] [“d”, 10] ```
    • 输出的内容只有一个字段“sum”,值是:
         ``` Partition 0: [3]

         Partition 1: [11]

         Partition 2: [20] ```

        TridentAPI提供了三个聚合器的接口:CombinerAggregator, ReducerAggregator, and Aggregator.

    我们先看一下CombinerAggregator接口:     
    [java]  view plain  copy
    1. public interface CombinerAggregator <T> extends Serializable {  
    2.         T init(TridentTuple tuple);  
    3.         T combine(T val1, T val2);  
    4.         T zero();  
    5.    }  
        CombinerAggregator接口只返回一个tuple,并且这个tuple也只包含一个field。init方法会先执行,它负责预处理每一个接收到的tuple,然后再执行combine函数来计算收到的tuples直到最后一个tuple到达,当所有tuple处理完时,CombinerAggregator会发射zero函数的输出,举个例子:
    • 定义一个CombinerAggregator实现来计数:  
    [java]  view plain  copy
    1. public class CombinerCount implements CombinerAggregator<Integer>{  
    2.     @Override  
    3.     public Integer init(TridentTuple tuple) {  
    4.           return 1;  
    5.     }  
    6.     @Override  
    7.     public Integer combine(Integer val1, Integer val2) {  
    8.            
    9.           return val1 + val2;  
    10.     }  
    11.     @Override  
    12.     public Integer zero() {  
    13.           return 0;  
    14.     }  
    15.   }  
         小结:当你使用aggregate 方法代替PartitionAggregate时,CombinerAggregator的好处就体现出来了,因为Trident会自动优化计算,在网络传输tuples之前做局部聚合。

    我们再看一下ReducerAggregator:
    [java]  view plain  copy
    1. public interface ReducerAggregator <T> extends Serializable {  
    2.         T init();  
    3.         T reduce(T curr, TridentTuple tuple);  
    4.     }  
         ReducerAggregator通过init方法提供一个初始值,然后为每个输入的tuple迭代这个值,最后生产处一个唯一的tuple输出,下面举例说明:
    • 定义一个ReducerAggregator接口实现技术器的例子:
    [java]  view plain  copy
    1. public class ReducerCount implements ReducerAggregator<Long>{  
    2.     @Override  
    3.     public Long init() {  
    4.           return 0L;  
    5.     }  
    6.     @Override  
    7.     public Long reduce(Long curr, TridentTuple tuple) {  
    8.           return curr + 1;  
    9.     }  
    10. }  
    最后一个是Aggregator接口,它是最通用的聚合器,它的形式如下:
      
    [java]  view plain  copy
    1. public interface Aggregator<T> extends Operation {  
    2.       T init(Object batchId, TridentCollector collector);  
    3.       void aggregate(T val, TridentTuple tuple, TridentCollector collector);  
    4.       void complete(T val, TridentCollector collector);  
    5.  }  
        Aggregator接口可以发射含任意数量属性的任意数据量的tuples,并且可以在执行过程中的任何时候发射:
    1. init:在处理数据之前被调用,它的返回值会作为一个状态值传递给aggregate和complete方法
    2. aggregate:用来处理每一个输入的tuple,它可以更新状态值也可以发射tuple
    3. complete:当所有tuple都被处理完成后被调用     
        下面举例说明:
    • 定义一个实现来完成一个计数器:
    
    
    [java]  view plain  copy
    1.  public class CountAgg extends BaseAggregator<CountState>{  
    2.    static class CountState { long count = 0; }  
    3.    @Override  
    4.    public CountState init(Object batchId, TridentCollector collector) {  
    5.          return new CountState();  
    6.    }  
    7.    @Override  
    8.    public void aggregate(CountState val, TridentTuple tuple, TridentCollector collector) {  
    9.         val. count+=1;  
    10.    }  
    11.    @Override  
    12.    public void complete(CountState val, TridentCollector collector) {  
    13.         collector.emit( new Values(val. count));  
    14.    }  
    15. }  
        有时候我们需要同时执行多个聚合器,这在Trident中被称作chaining,使用方法如下:
    
    
    [java]  view plain  copy
    1. java mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd();  
        这点代码会在每个Partition上运行count和sum函数,最终输出一个tuple:[“count”, “sum”]
    projection:投影操作
         投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”]
         运行如下代码:   
    
    
    [java]  view plain  copy
    1. java mystream.project(new Fields("b""d"))  
        则输出的流仅包含 [“b”, “d”]字段。
    2.2 Repartitioning重定向操作
         重定向操作是如何在各个任务间对tuples进行分区。分区的数量也有可能改变重定向的结果。重定向需要网络传输,下面介绍下重定向函数:
    1. shuffle:通过随机分配算法来均衡tuple到各个分区
    2. broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
    3. partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
    4. global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
    5. batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
    6. Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping
    2.3 Aggragation聚合操作
         Trident有aggregate和 persistentAggregate方法来做聚合操作。aggregate是独立的运行在Stream的每个Batch上的,而persistentAggregate则是运行在Stream的所有Batch上并把运算结果存储在state source中。
         运行aggregate方法做全局聚合。当你用到  ReducerAggregator或Aggregator时,Stream首先被重定向到一个分区中,然后其中的聚合函数便在这个分区上运行。当你用到CombinerAggregator时,Trident会首先在每个分区上做局部聚合,然后把局部聚合后的结果重定向到一个分区,因此使用CombinerAggregator会更高效,可能的话我们需要优先考虑使用它。
         下面举个例子来说明如何用aggregate进行全局计数:
    
    [java]  view plain  copy
    1. java mystream.aggregate(new Count(), new Fields("count"));  
    和paritionAggregate一样,aggregators的聚合也可以串联起来,但是如果你把一个 CombinerAggregator和一个非CombinerAggregator串联在一起,Trident是无法完成局部聚合优化的。
    2.4 grouped streams
          GroupBy操作是根据特定的字段对流进行重定向的,还有,在一个分区内部,每个相同字段的tuple也会被Group到一起,下面这幅图描述了这个场景:
         如果你在grouped Stream上面运行aggregators,聚合操作会运行在每个Group中而不是整个Batch。persistentAggregate也能运行在GroupedSteam上,不过结果会被保存在MapState中,其中的key便是分组的字段。
         当然,aggregators在GroupedStreams上也可以串联。
    2.5 Merge和Joins:
    api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:   
    [java]  view plain  copy
    1. java topology.merge(stream1, stream2, stream3);  
    另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。join时候的tuple会包含:  
     1. join的字段,如Stream1中的key和Stream2中的x    
     2. 所有非join的字段,根据传入join方法的顺序,a和b分别代表steam1的val1和val2,c代表Stream2的val1          
         当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。    
    
    
    
    
    
    

    专题三:Storm Trident API 实践

    一、概要     

    1.1 Storm(简介)

         Storm是一个实时的可靠地分布式流计算框架。
         具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(可以是logs,clicks,sensor data)、通过Storm对消息进行计算聚合等预处理、把处理结果持久化到NoSQL数据库或者HDFS做进一步深入分析。

    1.2 Trident(简介)

         Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API之外,它以batch(一组tuples)为单位进行处理,这样一来,可以使得一些处理更简单和高效。
         我们知道把Bolt的运行状态仅仅保存在内存中是不可靠的,如果一个node挂掉,那么这个node上的任务就会被重新分配,但是之前的状态是无法恢复的。因此,比较聪明的方式就是把storm的计算状态信息持久化到database中,基于这一点,trident就变得尤为重要。因为在处理大数据时,我们在与database打交道时通常会采用批处理的方式来避免给它带来压力,而trident恰恰是以batch groups的形式处理数据,并提供了一些聚合功能的API。

    二、Trident API 实践

         Trident其实就是一套API,但现阶段网上关于Trident API中各个函数的用法含义资料不多,下面我就根据一些英文资料和自己的理解,详细介绍一下Trident API各个函数的用法和含义。阅读本文需要有一定的Trident API基础。

    2.1 each() 方法

         作用:操作batch中的每一个tuple内容,一般与Filter或者Function函数配合使用。
         下面通过一个例子来介绍each()方法,假设我们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。我们可以通过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。

    2.1.1 Filter类:过滤tuple

         一个通过actor字段过滤消息的Filter:
    [java]  view plain  copy
    1. public static class PerActorTweetsFilter extends BaseFilter {  
    2.   String actor;  
    3.   
    4.   public PerActorTweetsFilter(String actor) {  
    5.     this.actor = actor;  
    6.   }  
    7.   @Override  
    8.   public boolean isKeep(TridentTuple tuple) {  
    9.     return tuple.getString(0).equals(actor);  
    10.   }  
    11. }  
       Topology:  
    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .each(new Fields("actor""text"), new PerActorTweetsFilter("dave"))  
    3.   .each(new Fields("actor""text"), new Utils.PrintFilter());  
         从上面例子看到,each()方法有一些构造参数
    1. 第一个构造参数:作为Field Selector,一个tuple可能有很多字段,通过设置Field,我们可以隐藏其它字段,仅仅接收指定的字段(其它字段实际还在)。
    2. 第二个是一个Filter:用来过滤掉除actor名叫"dave"外的其它消息。

    2.1.2 Function类:加工处理tuple内容

         一个能把tuple中text内容变成大写的Function:
    [java]  view plain  copy
    1. public static class UppercaseFunction extends BaseFunction {  
    2.   @Override  
    3.   public void execute(TridentTuple tuple, TridentCollector collector) {  
    4.     collector.emit(new Values(tuple.getString(0).toUpperCase()));  
    5.   }  
    6. }  
         Topology:
    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .each(new Fields("actor""text"), new PerActorTweetsFilter("dave"))  
    3.   .each(new Fields("text""actor"), new UppercaseFunction(), new Fields("uppercased_text"))  
    4.   .each(new Fields("actor""text""uppercased_text"), new Utils.PrintFilter());  
         首先,UppercaseFunction函数的输入是Fields("text", "actor"),其作用是把其中的"text"字段内容都变成大写。
         其次,它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function之后的tuple内容多了一个"uppercased_text",并且这个字段排在最后面。

    2.1.3 Field Selector与project

       我们需要注意的是,上面每个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,如果想要彻底丢掉它们,我们就需要用到project()方法。
       投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”],运行如下代码:
    [java]  view plain  copy
    1. java mystream.project(new Fields("b""d"))  
       则输出的流仅包含 [“b”, “d”]字段。

    2.2 parallelismHint()方法和partitionBy()

    2.2.1 parallelismHint()

         指定Topology的并行度,即用多少线程执行这个任务。我们可以稍微改一下我们的Filter,通过打印当前任务的partitionIndex来区分当前是哪个线程。
    Filter:
    [java]  view plain  copy
    1. public static class PerActorTweetsFilter extends BaseFilter {  
    2.   
    3.   private int partitionIndex;  
    4.   private String actor;  
    5.   
    6.   public PerActorTweetsFilter(String actor) {  
    7.     this.actor = actor;  
    8.   }  
    9.   @Override  
    10.   public void prepare(Map conf, TridentOperationContext context) {  
    11.     this.partitionIndex = context.getPartitionIndex();  
    12.   }  
    13.   @Override  
    14.   public boolean isKeep(TridentTuple tuple) {  
    15.     boolean filter = tuple.getString(0).equals(actor);  
    16.     if(filter) {  
    17.       System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);  
    18.     }  
    19.     return filter;  
    20.   }  
    21. }  
    Topology:
    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .each(new Fields("actor""text"), new PerActorTweetsFilter("dave"))  
    3.   .parallelismHint(5)  
    4.   .each(new Fields("actor""text"), new Utils.PrintFilter());  
         如果我们指定执行Filter任务的线程数量为5,那么最终的执行结果会如何呢?看一下我们的测试结果:
    [plain]  view plain  copy
    1. I am partition [4] and I have kept a tweet by: dave  
    2. I am partition [3] and I have kept a tweet by: dave  
    3. I am partition [0] and I have kept a tweet by: dave  
    4. I am partition [2] and I have kept a tweet by: dave  
    5. I am partition [1] and I have kept a tweet by: dave  
         我们可以很清楚的发现,一共有5个线程在执行Filter。
         如果我们想要2个Spout和5个Filter怎么办呢?如下面代码所示,实现很简单。
    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .parallelismHint(2)  
    3.   .shuffle()  
    4.   .each(new Fields("actor""text"), new PerActorTweetsFilter("dave"))  
    5.   .parallelismHint(5)  
    6.   .each(new Fields("actor""text"), new Utils.PrintFilter());  

    2.2.2 partitionBy()和重定向操作(repartitioning operation)  

         我们注意到上面的例子中用到了shuffle(),shuffle()是一个重定向操作。那什么是重定向操作呢?重定向定义了我们的tuple如何被route到下一处理层,当然不同的层之间可能会有不同的并行度,shuffle()的作用是把tuple随机的route下一层的线程中,而partitionBy()则根据我们的指定字段按照一致性哈希算法route到下一层的线程中,也就是说,如果我们用partitionBy()的话,同一个字段名的tuple会被route到同一个线程中。
         比如,如果我们把上面代码中的shuffle()改成partitionBy(new Fields("actor")),猜一下结果会怎样?
    [plain]  view plain  copy
    1. I am partition [2] and I have kept a tweet by: dave  
    2. I am partition [2] and I have kept a tweet by: dave  
    3. I am partition [2] and I have kept a tweet by: dave  
    4. I am partition [2] and I have kept a tweet by: dave  
         测试结果正如我们上面描述的那样,相同字段的tuple被route到了同一个partition中。
    重定向操作有如下几种:
    1. shuffle:通过随机分配算法来均衡tuple到各个分区
    2. broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
    3. partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
    4. global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
    5. batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
    6. Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

    2.3 聚合(Aggregation)

         我们前面讲过,Trident的一个很重要的特点就是它是以batch的形式处理tuple的。我们可以很容易想到的针对一个batch的最基本操作应该就是聚合。Trident提供了聚合API来处理batches,来看一个例子:

    2.3.1 Aggregator:

    [java]  view plain  copy
    1. public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {  
    2.   
    3.   @Override  
    4.   public Map<String, Integer> init(Object batchId, TridentCollector collector) {  
    5.     return new HashMap<String, Integer>();  
    6.   }  
    7.   
    8.   @Override  
    9.   public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {  
    10.     String location = tuple.getString(0);  
    11.     val.put(location, MapUtils.getInteger(val, location, 0) + 1);  
    12.   }  
    13.   
    14.   @Override  
    15.   public void complete(Map<String, Integer> val, TridentCollector collector) {  
    16.     collector.emit(new Values(val));  
    17.   }  
    18. }  
     Topology:
    
    
    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))  
    3.   .each(new Fields("location_counts"), new Utils.PrintFilter());  
         这个aggregator很简单:计算每一个batch的location的数量。通过这个例子我们可以看到Aggregator接口:
    1. init():当刚开始接收到一个batch时执行
    2. aggregate():在接收到batch中的每一个tuple时执行
    3. complete():在一个batch的结束时执行     
         我们前面讲过aggregate()方法是一个重定向方法,因为它会随机启动一个单独的线程来进行这个聚合操作。
         下面我们来看一下测试结果:
    [plain]  view plain  copy
    1. [{USA=3, Spain=1, UK=1}]  
    2. [{USA=3, Spain=2}]  
    3. [{France=1, USA=4}]  
    4. [{USA=4, Spain=1}]  
    5. [{USA=5}]  
         我们可以看到打印的结果,其中每一条的和都是5,这是因为我们的Spout的每个batch中tuple数量设置的是5,所以每个线程的计算结果也会是5。 除此之外,Trident还提供了其它两个Aggregator接口: CombinerAggregator, ReducerAggregator,具体使用方法请参考Trident API。

    2.3.2 partitionAggregate():

         如果我们将上面的Topology稍微改造一下,猜一下结果会是如何?

    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .partitionBy(new Fields("location"))  
    3.   .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))  
    4.   .parallelismHint(3)  
    5.   .each(new Fields("location_counts"), new Utils.PrintFilter());  

         我们一起来分析一下,首先partitionBy()方法将tuples按其location字段重定向到下一处理逻辑,而且相同location字段的tuple一定会被分配到同一个线程中处理。其次,partitionAggregate()方法,注意它与Aggregate不同,它不是一个重定向方法,它仅仅是对当前partition上的各个batch执行聚合操作。因为我们根据location进行了重定向操作,测试数据一共有4个location,而当前一共有3个partition,因此可以猜测我们的最终测试结果中,有一个partition会处理两个location的batch,最终测试结果如下:
    [plain]  view plain  copy
    1. [{France=10, Spain=5}]  
    2. [{USA=63}]  
    3. [{UK=22}]  
         需要注意的是,partitionAggregate虽然也是聚合操作,但与上面的Aggregate完全不同,它不是一个重定向操作。

    2.4 groupBy

         我们可以看到上面几个例子的测试结果,其实我们通常想要的是每个location的数量是多少,那该怎么处理呢?看下面这个Topology:
    [java]  view plain  copy
    1. topology.newStream("spout", spout)  
    2.   .groupBy(new Fields("location"))  
    3.   .aggregate(new Fields("location"), new Count(), new Fields("count"))  
    4.   .each(new Fields("location""count"), new Utils.PrintFilter());  
         我们先看一下执行的结果:
    [plain]  view plain  copy
    1. ...  
    2. [France, 25]  
    3. [UK, 2]  
    4. [USA, 25]  
    5. [Spain, 44]  
    6. [France, 26]  
    7. [UK, 3]  
    8. ...  
         上面这段代码计算出了每个location的数量,即使我们的Count函数没有指定并行度。这就是groupBy()起的作用,它会根据指定的字段创建一个GroupedStream,相同字段的tuple都会被重定向到一起,汇聚成一个group。groupBy()之后是aggregate,与之前的聚合整个batch不同,此时的aggregate会单独聚合每个group。我们也可以这么认为,groupBy会把Stream按照指定字段分成一个个stream group,每个group就像一个batch一样被处理。

         不过需要注意的是,groupBy()本身并不是一个重定向操作,但如果它后面跟的是aggregator的话就是,跟的是partitionAggregate的话就不是。


    三、总结 

         Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特点以batch的形式处理stream。
         一些最基本的操作函数有Filter、Function,Filter可以过滤掉tuple,Function可以修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。
         聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操作。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操作,它们会把stream重定向到一个partition中进行聚合操作。
         重定向操作会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操作,不会产生网络传输。
         GroupBy会根据指定字段,把整个stream切分成一个个grouped stream,如果在grouped stream上做聚合操作,那么聚合就会发生在这些grouped stream上而不是整个batch。如果groupBy后面跟的是aggregator,则是重定向操作,如果跟的是partitionAggregate,则不是重定向操作。

         上面所以的例子都可以在github上找到: https://github.com/pereferrera/trident-hackaton/  


    展开全文
  • 每个想要DevSecOps的人的路线图。 :scroll: 目录 :thought_balloon: 路线图 :nut_and_bolt: 工具 在应用DevSecOps上花费了大量时间来搜索,比较和制定有关工具的决策。 这些工具列表是帮助您减少不必要的时间并...
  • 目前,Trident使用Trident库允许使用Python编写的模块充当,以提供一些基本说明。 Trident尝试使用尽可能少的外部模块,并允许用户仅使用标准库来运行Trident 。这样做是为了确保与主机的最大兼容性,并允许专注于...
  • 实用的Storm Trident教程 本教程以的的出色为基础。 流浪者的设置基于Taylor Goetz的。 Hazelcast状态代码基于wurstmeister的。 看看随附的。 本教程的结构 浏览Part * .java,了解Trident的基础知识 使用Skeleton....
  • TridentNet算法笔记

    千次阅读 2019-06-18 22:50:21
    论文:Scale-Aware Trident Networks for Object Detection 论文链接:https://arxiv.org/abs/1901.01892 代码链接:https://github.com/TuSimple/simpledet/tree/master/models/tridentnet 目标检测中物体的尺度...

    论文:Scale-Aware Trident Networks for Object Detection
    论文链接:https://arxiv.org/abs/1901.01892
    代码链接:https://github.com/TuSimple/simpledet/tree/master/models/tridentnet

    目标检测中物体的尺度变化一直是关注的热点,毕竟要兼顾大尺寸目标和小尺寸目标有一定难度。早些年的图像金字塔(image pyramid),如Figure1(a)所示,通过缩放输入图像实现不同尺寸目标的检测,效果很不错,现在许多比赛中仍会采用多尺度测试提升模型效果,缺点就是速度太慢,实际项目中很难使用。之前的SNIP、SNIPER、AutoFocus等算法算是图像金字塔的优化版,目的是减少不必要的计算,提速比较明显,但离实际应用还有点差距。

    特征金字塔是另外一种尝试,代表作就是FPN,如Figure1(b)所示,效果上不如图像金字塔,但是速度方面有优势。

    图像金字塔和特征金字塔本质上都是希望不同尺度的目标有不同的感受野,这样提取到的特征才比较全面,因此TridentNet算法从感受野入手,通过引入空洞卷积增加网络的感受野,从而实现不同尺度目标的检测,如Figure1(c)所示,比较吸引我的地方在于算法整体上非常简洁,而且效果很赞。另外之前还有一篇关于目标检测算法中感受野的研究:DetNet,可以参考博客:DetNet算法笔记
    在这里插入图片描述

    那么感受野和检测效果之间到底存在什么关系?作者做了一个关于感受野和检测效果之间的联系的对比实验,实验结果如Table1所示。这个实验通过修改Faster RCNN算法的特征提取网络中卷积层的dilation参数控制感受野大小,当dilation参数为1时等效于常规卷积层。可以看出不同尺度目标的最高AP值对应的dilation参数(也就是不同感受野)是不同的,而且存在明显的规律,这说明针对目标尺度大小设计对应的感受野可以使检测模型的整体效果达到最佳,这也是TridentNet算法的主要思想
    在这里插入图片描述

    有了Table1的结论,接下来就可以设计TridentNet了,如Figure2所示。
    第1个改进点是将原本特征提取网络(backbone)的单支路卷积层替换成3个支路且dilated参数不同的dilated卷积层,这就是论文中提到的多分枝(multi-branch)思想。以特征提取网络ResNet为例,就是将residual block中的3×3卷积层替换成Figure2中的3支路3×3卷积层,dilated参数分别为1、2、3。
    第2个改进点是权重共享(weight sharing among branches),是指3个支路的卷积层参数是共享的(差别仅在于dilated参数),这么做的原因是一方面可以减少网络前向计算的时间,另一方面网络学到的参数有更好的泛化能力。还有一个好处在于inference,文中提到了一种快速inference做法:选择一个分支的输出作为最终结果,假如没有权重共享,那么单分支的结果很难近似多分支结果。
    第3个改进点是指定尺度过滤训练(scale-aware training scheme),是指不同dilated参数的3个支路分别检测不同尺度的目标。还记得Table1的实验结果吗?不同尺度的目标所对应的网络最佳感受野是不同的,因此可以为这3条支路分配不同尺度的目标(和SNIP的思想有点类似),比如对于dalated参数为3的支路而言,感受野更大,大尺度目标的检测效果好,因此就分配尺度较大的目标,实现上可以通过判断RoI的尺寸后将尺度符合定义的目标输入该支路进行训练。这种方法减少了每条支路所训练的目标尺寸差异,虽然训练样本也少了,但由于权重是共享的,所以效果不会下降,可以看Table2的对比实验。
    在这里插入图片描述

    实验结果
    Table2是关于TridentNet提出的多分枝、权重共享和指定尺度过滤训练这3个部分的对比实验,可以看出多分枝和权重共享的效果是很明显的。指定尺度过滤的实验效果没有太出彩,作者在文中也指明了原因可能是过拟合导致的,因为对尺度做了过滤,所以相当于每个分支的训练样本减少了。在尺度过滤的基础上添加权重共享可以有效减少这种过拟合,因为权重共享操作相当于在每次的参数更新中所有训练样本都做了贡献(可以参考e和c的对比)。同时在Table2中用到2个baseline,一个是以ResNet101为backbone的Faster RCNN算法,另一个是以ResNet101-Deformable为backbone的Faster RCNN算法,deformable结构的设计初衷是为了解决物体形变问题,形变和尺度变化并不完全一样,尺度变化更强调目标整体尺寸的大小变化,可以看到TridentNet算法在deformable结构中仍然有所提升,说明设计更加灵活的感受野依然有所帮助。
    在这里插入图片描述

    Table7是TridentNet算法和目前前沿算法的效果对比,48.4的mAP应该是目前效果最好的。
    在这里插入图片描述

    论文中实验比较丰富,比如分支数量的选择、在主网络的哪些部分添加trident block等,比较有说服力,整体上来看解决方案非常干净、清爽,很难得。

    展开全文
  • Trident API 概览

    2017-06-06 13:58:50
    Trident API 概览  在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来;翻译完以后,也发现 在自己的翻译中也有很多地方是表达不清楚的··...
  • Trident Topology实例

    2019-02-22 20:58:43
    转载自:https://blog.csdn.net/l1028386804/article/details/79120204 ...实例一   通过实例来了解Trident topology。需求是收集医学诊断报告来判断是否有疾病暴发。这个topology会处理的医学诊断事件包括如...
  • TridentNet解读

    千次阅读 2019-04-25 12:01:56
    检测领域一直存在一个scale variation问题,大小物体对视野域的需要是不一样的,deformable cnn的...(目标检测算法trident network引发的思考)[ https://blog.csdn.net/diligent_321/article/details/86531659 ]
  • trident原理及编程指南

    千次阅读 2017-08-13 20:06:53
    trident原理及编程指南@(STORM)[storm, 大数据]trident原理及编程指南 一理论介绍 一trident是什么 二trident处理单位 三事务类型 1spout类型 2state类型 3实现恰好一次的spout与state组合类型 二编程指南 1定义输入...
  • 此拓扑用于演示两件事:仅处理一次语义和Trident的有状态处理。 我希望通过在本地创建的多节点群集中运行此拓扑来演示这些功能。 当拓扑运行时,我们将杀死所有工作节点,并通过查看redis服务器中存储的最终计数来...
  • Storm Trident API 使用详解

    千次阅读 2017-12-12 14:44:13
     Storm Trident中的核心数据模型就是“Stream”,也就是说,Storm Trident处理的是Stream,但是实际上Stream是被成批处理的,Stream被切分成一个个的Batch分布到集群中,所有应用在Stream上的函数最终会应用到每个...
  • Scale-Aware Trident Networks for Object Detection &Summary 检测任务中存在目标尺寸多样化的问题,为了解决这一问题,作者采用的做法如下: 为了使模型对不同尺寸目标的“表达能力”近似,作者借鉴了SNIP的...
  • Trident简介 Trident拥有一流的抽象,可以读取和写入有状态的来源。状态可以是拓扑的内部 - 例如,保存在内存中并由HDFS支持 - 或者外部存储在Memcached或Cassandra等数据库中。在任何一种情况下,Trident API都...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 35,214
精华内容 14,085
关键字:

trident