精华内容
下载资源
问答
  • 2018-11-30 23:14:00

    Trident 是一个基于Storm构建的上层的Micro-Batching系统,它简化了Storm的拓扑构建过程并且提供了类似于窗口、聚合以及状态管理等等没有被Storm原生支持的功能

    转载于:https://www.cnblogs.com/huiandong/p/10047305.html

    更多相关内容
  • “ void-files”目录包含“ trident-core”软件包将安装的所有文件的1:1映射。 “ void-files”目录直接映射到根文件系统(“ /”)。 示例:安装后,“ void-files / boot / loader.conf.local”变为“ /boot/...
  • 硅产品知识产权(SIP)平台解决方案和数字信号处理器(DSP)内核授权厂商CEVA公司宣布,Trident Microsystems已获得CEVA的DSP技术授权以实现其机顶盒和电视系统产品线的解调(demodulation)功能。Trident Microsystems...
  • 三叉戟除雾网 NTIRE 2020 NonHomogeneous Dehazing Challenge (CVPR Workshop 2020)第一个解决方案。 环境: Ubuntu16.04 Python3.6 英伟达 GPU+CUDA8 依存关系: 预训练模型==0.7.4 火炬视觉==0.2.1 ...
  • trident-7.0.jar

    2018-04-01 11:19:00
    java swing用户交互界面的美观开发工具包,便于界面开发。
  • storm_trident_state

    2021-06-30 05:45:08
    storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。
  • Trident Project-开源

    2021-05-12 22:45:49
    杜克大学图书馆的数字存储库和元数据编辑计划
  • Trident数据手册.pdf

    2019-10-20 03:15:08
    Trident数据手册pdf,主要介绍Trident特点及技术规格,Trident 择是一款 3U 机架抽取式 KVM 切换器,配有一体化抽取式键盘和 3X17 英寸 LCD 显示屏,使用了高对比度的显示器(50:1),可以折叠放入 3U 机架内。
  • Trident Extension -crx插件

    2021-04-02 14:16:22
    语言:English 三叉戟扩展 我们的Dialoga PBX是我们WebRTC服务产品的核心组件。 它结合了高级动态呼叫控制和路由引擎的所有优点以及功能齐全的呼叫者交互解决方案。 所有这些都可以通过易于使用的自我管理和报告界面...
  • Storm Trident实战之计算网站PV.rar
  • TridentNet

    2019-02-19 14:54:17
    本文致力于在深度学习目标检测问题中,提高对小目标的检测率
  • Trident8493_NVR.tar.gz

    2020-01-03 08:29:49
    建议选择4G或8G的小U盘,U盘的格式为FAT32,在U盘中新建一个T16的文件夹,将附件Trident8493_NVR.tar直接拷贝到T16文件夹中升级,插入U盘后点击系统维护升级。附件不要解压作直接拷贝。不要使用制作过U盘启动的U盘,...
  • 三叉戟 具有AI兼容性的Emulsion Web浏览器。
  • substance.jar和trident.jar

    热门讨论 2015-04-26 08:05:42
    Java界面GUI设计难看,所以用换肤所需的两个包substance.jar和trident.jar,方便换肤,怎样使用百度一下就可以
  • 目前,Trident使用Trident库允许使用Python编写的模块充当,以提供一些基本说明。 Trident尝试使用尽可能少的外部模块,并允许用户仅使用标准库来运行Trident 。这样做是为了确保与主机的最大兼容性,并允许专注于...
  • 含5篇近两年经典目标检测文献:Cascade R-CNN.pdf、CornerNet.pdf、RetinaNet.pdf、TridentNet.pdf、YOLOv3.pdf
  • trident教程

    2019-10-02 17:40:19
    (一)理论基础更多理论以后再补充,或者参考书籍1、trident是什么?Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput ...

     



    (一)理论基础
    更多理论以后再补充,或者参考书籍
    1、trident是什么?
    Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.

    简单的说,trident是storm的更高层次抽象,相对storm,它主要提供了2个方面的好处:

    (1)提供了更高层次的抽象,将常用的count,sum等封装成了方法,可以直接调用,不需要自己实现。

    (2)提供了一次原语,如groupby等。

    (3)提供了事务支持,可以保证数据均处理且只处理了一次。

    2、trident每次处理消息均为batch为单位,即一次处理多个元组。


    3、事务类型

    关于事务类型,有2个比较容易混淆的概念:spout的事务类型以及事务状态。

    它们都有3种类型,分别为:事务型、非事务型和透明事务型。

    (1)spout

    spout的类型指定了由于下游出现问题导致元组需要重放时,应该怎么发送元组。

    事务型spout:重放时能保证同一个批次发送同一批元组。可以保证每一个元组都被发送且只发送一个,且同一个批次所发送的元组是一样的。

    非事务型spout:没有任何保障,发完就算。

    透明事务型spout:同一个批次发送的元组有可能不同的,它可以保证每一个元组都被发送且只发送一次,但不能保证重放时同一个批次的数据是一样的。这对于部分失效的情况尤其有用,假如以kafka作为spout,当一个topic的某个分区失效时,可以用其它分区的数据先形成一个批次发送出去,如果是事务型spout,则必须等待那个分区恢复后才能继续发送。

    这三种类型可以分别通过实现ITransactionalSpout、ITridentSpout、IOpaquePartitionedTridentSpout接口来定义。

     

    (2)state

    state的类型指定了如果将storm的中间输出或者最终输出持久化到某个地方(如内存),当某个批次的数据重放时应该如果更新状态。state对于下游出现错误的情况尤其有用。

    事务型状态:同一批次tuple提供的结果是相同的。

    非事务型状态:没有回滚能力,更新操作是永久的。

    透明事务型状态:更新操作基于先前的值,这样由于这批数据发生变化,对应的结果也会发生变化。透明事务型状态除了保存当前数据外,还要保存上一批数据,当数据重放时,可以基于上一批数据作更新。




    (二)看官方提供的示例

    package org.ljh.tridentdemo;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentState;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.builtin.Count;
    import storm.trident.operation.builtin.FilterNull;
    import storm.trident.operation.builtin.MapGet;
    import storm.trident.operation.builtin.Sum;
    import storm.trident.testing.FixedBatchSpout;
    import storm.trident.testing.MemoryMapState;
    import storm.trident.tuple.TridentTuple;
    
    
    public class TridentWordCount {
        public static class Split extends BaseFunction {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(" ")) {
                    collector.emit(new Values(word));
                }
            }
        }
    
        public static StormTopology buildTopology(LocalDRPC drpc) {
            FixedBatchSpout spout =
                    new FixedBatchSpout(new Fields("sentence"), 3, new Values(
                            "the cow jumped over the moon"), new Values(
                            "the man went to the store and bought some candy"), new Values(
                            "four score and seven years ago"),
                            new Values("how many apples can you eat"), new Values(
                                    "to be or not to be the person"));
            spout.setCycle(true);
    
            //创建拓扑对象
            TridentTopology topology = new TridentTopology();
            
            //这个流程用于统计单词数据,结果将被保存在wordCounts中
            TridentState wordCounts =
                    topology.newStream("spout1", spout)
                            .parallelismHint(16)
                            .each(new Fields("sentence"), new Split(), new Fields("word"))
                            .groupBy(new Fields("word"))
                            .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                                    new Fields("count")).parallelismHint(16);
            //这个流程用于查询上面的统计结果
            topology.newDRPCStream("words", drpc)
                    .each(new Fields("args"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                    .each(new Fields("count"), new FilterNull())
                   .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
            return topology.build();
        }
    
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setMaxSpoutPending(20);
            if (args.length == 0) {
                LocalDRPC drpc = new LocalDRPC();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
                for (int i = 0; i < 100; i++) {
                    System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                    Thread.sleep(1000);
                }
            } else {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
            }
        }
    }
    





    实例实现了最基本的wordcount功能,然后将结果输出。关键步骤如下:



    1、定义了输入流

            FixedBatchSpout spout =
                    new FixedBatchSpout(new Fields("sentence"), 3, new Values(
                            "the cow jumped over the moon"), new Values(
                            "the man went to the store and bought some candy"), new Values(
                            "four score and seven years ago"),
                            new Values("how many apples can you eat"), new Values(
                                    "to be or not to be the person"));
            spout.setCycle(true);



    (1)使用FixedBatchSpout创建一个输入spout,spout的输出字段为sentence,每3个元组作为一个batch。
    (2)数据不断的重复发送。


    2、统计单词数量

            TridentState wordCounts =
                    topology.newStream("spout1", spout)
                            .parallelismHint(16)
                            .each(new Fields("sentence"), new Split(), new Fields("word"))
                            .groupBy(new Fields("word"))
                            .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                                    new Fields("count")).parallelismHint(16);



    这个流程用于统计单词数据,结果将被保存在wordCounts中。6行代码的含义分别为:

    (1)首先从spout中读取消息,spout1定义了zookeeper中用于保存这个拓扑的节点名称。

    (2)并行度设置为16,即16个线程同时从spout中读取消息。

    (3)each中的三个参数分别为:输入字段名称,处理函数,输出字段名称。即从字段名称叫sentence的数据流中读取数据,然后经过new Split()处理后,以word作为字段名发送出去。其中new Split()后面介绍,它的功能就是将输入的内容以空格为界作了切分。

    (4)将字段名称为word的数据流作分组,即相同值的放在一组。

    (5)将已经分好组的数据作统计,结果放到MemoryMapState,然后以count作为字段名称将结果发送出去。这步骤会同时存储数据及状态,并将返回TridentState对象。

    (6)并行度设置。

    3、输出统计结果

            topology.newDRPCStream("words", drpc)
                    .each(new Fields("args"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                    .each(new Fields("count"), new FilterNull())
                   .aggregate(new Fields("count"), new Sum(), new Fields("sum"));



    这个流程从上述的wordCounts对象中读取结果,并返回。6行代码的含义分别为:

    (1)等待一个drpc调用,从drpc服务器中接受words的调用来提供消息。调用代码如下:

    drpc.execute("words", "cat the dog jumped")
    (2)输入为上述调用中提供的参数,经过Split()后,以word作为字段名称发送出去。

    (3)以word的值作分组。

    (4)从wordCounts对象中查询结果。4个参数分别代表:数据来源,输入数据,内置方法(用于从map中根据key来查找value),输出名称。

    (5)过滤掉空的查询结果,如本例中,cat和dog都没有结果。

    (6)将结果作统计,并以sum作为字段名称发送出去,这也是DRPC调用所返回的结果。如果没有这一行,最后的输出结果

    DRPC RESULT: [["cat the dog jumped","the",2310],["cat the dog jumped","jumped",462]]
    加上这一行后,结果为:
    DRPC RESULT: [[180]]

    4、split的字义

        public static class Split extends BaseFunction {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(" ")) {
                    collector.emit(new Values(word));
                }
            }
        }


    注意它最后会发送数据。

    5、创建并启动拓扑

        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setMaxSpoutPending(20);
            if (args.length == 0) {
                LocalDRPC drpc = new LocalDRPC();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
                for (int i = 0; i < 100; i++) {
                    System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                    Thread.sleep(1000);
                }
            } else {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
            }
        }


    (1)当无参数运行时,启动一个本地的集群,及自已创建一个drpc对象来输入。
    (2)当有参数运行时,设置worker数量为3,然后提交拓扑到集群,并等待远程的drpc调用。


     

    (三)使用kafka作为数据源的一个例子

     

     

    package com.netease.sytopology;
    
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    import storm.kafka.BrokerHosts;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.builtin.Count;
    import storm.trident.testing.MemoryMapState;
    import storm.trident.tuple.TridentTuple;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    /*
     * 本类完成以下内容
     */
    public class SyTopology {
    
        public static final Logger LOG = LoggerFactory.getLogger(SyTopology.class);
    
        private final BrokerHosts brokerHosts;
    
        public SyTopology(String kafkaZookeeper) {
            brokerHosts = new ZkHosts(kafkaZookeeper);
        }
    
        public StormTopology buildTopology() {
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            // TransactionalTridentKafkaSpout kafkaSpout = new
            // TransactionalTridentKafkaSpout(kafkaConfig);
            OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
            TridentTopology topology = new TridentTopology();
    
            // TridentState wordCounts =
            topology.newStream("kafka4", kafkaSpout).
            each(new Fields("str"), new Split(),
                    new Fields("word")).groupBy(new Fields("word"))
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                            new Fields("count")).parallelismHint(16);
            // .persistentAggregate(new HazelCastStateFactory(), new Count(),
            // new Fields("aggregates_words")).parallelismHint(2);
    
    
            return topology.build();
        }
    
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            String kafkaZk = args[0];
            SyTopology topology = new SyTopology(kafkaZk);
            Config config = new Config();
            config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
    
            String name = args[1];
            String dockerIp = args[2];
            config.setNumWorkers(9);
            config.setMaxTaskParallelism(5);
            config.put(Config.NIMBUS_HOST, dockerIp);
            config.put(Config.NIMBUS_THRIFT_PORT, 6627);
            config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));
            StormSubmitter.submitTopology(name, config, topology.buildTopology());
    
        }
    
        static class Split extends BaseFunction {
            public void execute(TridentTuple tuple, TridentCollector collector) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(",")) {
                    try {
                        FileWriter fw = new FileWriter(new File("/home/data/test/ma30/ma30.txt"),true);
                        fw.write(word);
                        fw.flush();
                        fw.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    collector.emit(new Values(word));
                    
                }
            }
        }
    }
    



     

    本例将从kafka中读取消息,然后对消息根据“,”作拆分,并写入一个本地文件。
    1、定义kafka想着配置
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "ma30", "storm");
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    其中ma30是订阅的topic名称。


    2、从kafka中读取消息并处理

            topology.newStream("kafka4", kafkaSpout).
            each(new Fields("str"), new Split(),new Fields("word")).
            groupBy(new Fields("word"))
            .persistentAggregate(new MemoryMapState.Factory(), new Count(),
                            new Fields("count")).parallelismHint(16);

    (1)指定了数据来源,并指定zookeeper中用于保存数据的位置,即保存在/transactional/kafka4。
    (2)指定处理方法及发射的字段。
    (3)根据word作分组。

    (4)计数后将状态写入MemoryMapState

     

    提交拓扑:

    storm jar target/sytopology2-0.0.1-SNAPSHOT.jar com.netease.sytopology.SyTopology 192.168.172.98:2181/kafka test3 192.168.172.98

    此时可以在/home/data/test/ma30/ma30.txt看到split的结果

     

    转载于:https://www.cnblogs.com/jinhong-lu/p/4634980.html

    展开全文
  • trident-demo:三叉戟示威

    2021-05-16 06:10:28
    此拓扑用于演示两件事:仅处理一次语义和Trident的有状态处理。 我希望通过在本地创建的多节点群集中运行此拓扑来演示这些功能。 当拓扑运行时,我们将杀死所有工作节点,并通过查看redis服务器中存储的最终计数来...
  • Bolt/Trident API 实现 该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 < groupId>com.github.fhuss</ groupId> < artifactId>...
  • Trident-Shards

    2021-03-31 07:44:34
    Trident Shards是一个数据包,允许使用从上古守护者掉落的碎片制作三叉戟 安装 将trident-shards-dp.zip文件放入您的世界的datapacks文件夹中 将trident-shards.zip文件放入资源包文件夹 告示 没有资源包,三叉戟...
  • trident:Mach-O钩库

    2021-04-28 08:19:22
    将#include "trident.h" trident/文件夹复制到您的项目目录中,并#include "trident.h" 。 调用hook例程来设置钩子。 请注意,这应该从注入的库中完成,以挂钩特定图像中的函数。 void hook(const char *target, ...
  • Storm_Trident

    2016-08-18 14:42:07
    storm_Trident例子
  • Trident 必须从 Clojure 提供的所有原语。 准备好? 抓住你愿意的船只,让我们这样做! 安装 Marceline 可从 clojars 获得。 将以下内容添加到项目的deps 。 [yieldbot/marceline "0.3.1-SNAPSHOT"] 请注意,...
  • Trident项目是为满足以下要求而开发的自动密码喷涂工具: 可以部署在多个云平台/执行提供商上的能力 能够根据目标的帐户锁定政策安排喷涂活动 出于操作安全目的而增加身份验证尝试所源自的IP池的能力 快速扩展功能...
  • Trident Topology实例

    2019-02-22 20:58:43
    转载自:https://blog.csdn.net/l1028386804/article/details/79120204 ...实例一   通过实例来了解Trident topology。需求是收集医学诊断报告来判断是否有疾病暴发。这个topology会处理的医学诊断事件包括如...

    转载自:https://blog.csdn.net/l1028386804/article/details/79120204
        https://blog.csdn.net/jediael_lu/article/details/76794843

    实例一

      通过实例来了解Trident topology。需求是收集医学诊断报告来判断是否有疾病暴发。这个topology会处理的医学诊断事件包括如下信息:

    LatitudeLongitudeTimestampDiagnosis Code(ICD9-CM)
    39.9522-75.162401/21/2018 at 11:00 AM320.0(Hemophilus meningitis)
    40.3588-75.626901/21/2018 at 11:30 AM324.0(Intracranial abscess)

      每个事件包括事件发生时的全球定位系统(GPS)的位置坐标,经度和维度使用十进制小数表示,事件还包括ICD9-CM编码,表示诊断结果,以及事件发生的时间戳。
      为了判断是否有疾病暴发,系统会按照地理位置来统计各种疾病代码在一段时间内出现的次数。为了简化例子,按照城市划分诊断结果的地理位置。实际系统会对地理位置做出更精细的划分。
      另外,实例中会按小时对针对时间进行分组。使用移动平均值来计算趋势。
    最后使用简单的阈值来判断是否有疾病暴发。如果某个时间时间发生的系数超过了阈值,系统会发生告警。同时,为了维护历史记录,还需要将每个城市、小时、疾病的统计量持久化存储。

    一、Trident Spout
      Trident引入了"数据批次"的概念,不像Storm的spout,Trident spout必须成批的发送tuple。每个Batch会分配一个唯一的事务标识符。spout基于约定决定batch的组成方式,spout有三种约定:非事务型、事务型、非透明型。
      非事务型spout:对batch的组成部分不提供保证,并且可能出现重复,两个不同的batch可能含有相同的tuple
      事务型spout:保证batch是非重复的,并且batch总是包含相同的tuple。
      非透明型spout:保证数据是非重复的,但不能保证batch的内容是不变的。

      DiagnosisEventSpout如下:

    public class DiagnosisEventSpout implements ITridentSpout<Long> {
    
        private BatchCoordinator<Long> coordinator = new DefaultCoordinator();
        private Emitter<Long>          emitter     = new DiagnosisEventEmitter();
    
        @Override
        public BatchCoordinator<Long> getCoordinator(String txStateId, Map conf, TopologyContext context) {
            return coordinator;
        }
    
        @Override
        public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
            return emitter;
        }
    
        @Override
        public Map getComponentConfiguration() {
            return null;
        }
    
        @Override
        public Fields getOutputFields() {
            return new Fields("event");
        }
    }
    

      如上述代码中的getOutputFields()方法所示,该spout发射一个字段event,值是由Emitter实例发射的DiagnosisEvent,DefultCoordinator类来进行协调:

    public class DefaultCoordinator implements ITridentSpout.BatchCoordinator<Long>, Serializable {
        private static final Logger LOG = LoggerFactory.getLogger(DefaultCoordinator.class);
    
        @Override
        public boolean isReady(long txid) {
            return true;
        }
    
        @Override
        public void close() {
        }
    
        @Override
        public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
            LOG.info("Initializing Transaction [" + txid + "]");
            return null;
        }
    
        @Override
        public void success(long txid) {
            LOG.info("Successful Transaction [" + txid + "]");
        }
    }
    
    public class DiagnosisEventEmitter implements ITridentSpout.Emitter<Long>, Serializable {
        AtomicInteger successfulTransactions = new AtomicInteger(0);
    
        @Override
        public void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {
            for (int i = 0; i < 10000; i++) {
                List<Object> events = new ArrayList<>();
                double lat = (double) (-30 + (int) (Math.random() * 75));
                double lng = (double) (-120 + (int) (Math.random() * 70));
                long time = System.currentTimeMillis();
    
                String diagnosisCode = Integer.toString(320 + (int) (Math.random() * 7));
                DiagnosisEvent event = new DiagnosisEvent(lat, lng, time, diagnosisCode);
                events.add(event);
                collector.emit(events);
            }
        }
    
        @Override
        public void success(TransactionAttempt tx) {
            successfulTransactions.incrementAndGet();
        }
    
        @Override
        public void close() {
        }
    
    }
    

      Emitter#emitBatch函数接收的参数coordinatorMeta是由coordinator生成的。
    发送的工作在emitBatch()中进行。随机分配了一个经纬度,使用System.currentTimeStamp()方法生成时间戳,使用320-327之间的诊断码。
      DiagosisEvent是一个简单的JavaBean。时间戳使用long变量存储,存储的是时间的秒数。经度和维度使用double存储。

    public class DiagnosisEvent implements Serializable {
        private static final long serialVersionUID = 1L;
    
        public double lat;
        public double lng;
        public long time;
        public String diagnosisCode;
    
        public DiagnosisEvent(double lat, double lng, long time, String diagnosisCode) {
            super();
            this.time = time;
            this.lat = lat;
            this.lng = lng;
            this.diagnosisCode = diagnosisCode;
        }
    }
    

    二、Trident tology
      使用Trident创建tology:

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        DiagnosisEventSpout spout = new DiagnosisEventSpout();
        Stream inputStream = topology.newStream("event", spout);
    
        inputStream.each(new Fields("event"), new DiseaseFilter())
                .each(new Fields("event"), new CityAssignment(), new Fields("city"))
                .each(new Fields("event", "city"), new HourAssignment(), new Fields("hour", "cityDiseaseHour"))
                .groupBy(new Fields("cityDiseaseHour"))
                .persistentAggregate(new OutbreakTrendFactory(), new Count(), new Fields("count")).newValuesStream()
                .each(new Fields("cityDiseaseHour", "count"), new OutbreakDetector(), new Fields("alert"))
                .each(new Fields("alert"), new DispatchAlert(), new Fields());
        return topology.build();
    }
    

      DiagnosisEventSpout函数发射疾病事件,然后事件由DiseaseFilter函数过滤,过滤掉不关心的疾病事件。之后,事件由CityAssignment函数赋值一个对应的城市名。然后HourAssignment函数复制一个表示小时的时间戳,并且增加一个Key CityDiseaseHour到tuple的字段中,这个Key包括城市、小时和疾病代码。后续就使用这个Key进行分组统计并使用persistAggregate函数对统计量持久性存储。统计量传递给OutbreakDetector函数,如果统计量超过阈值,OutbreakDetector向后发送一个告警信息。最后DispatchAlert接收到告警信息,记录日志,并且流程结束。

    三、Trident filter
      为了通过疾病代码过滤事件,需要利用Trident filter。Trident提供BaseFilter类可以方便的对tuple过滤,滤除系统不需要的tuple。

    public class DiseaseFilter extends BaseFilter {
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(DiseaseFilter.class);
    
        @Override
        public boolean isKeep(TridentTuple tuple) {
            DiagnosisEvent diagnosis = (DiagnosisEvent) tuple.getValue(0);
            int code = Integer.parseInt(diagnosis.diagnosisCode);
            if (code <= 322) {
                LOG.debug("Emitting disease [" + diagnosis.diagnosisCode + "]");
                return true;
            }
            else {
                LOG.debug("Filtering disease [" + diagnosis.diagnosisCode + "]");
                return false;
            }
        }
    }
    

      Filter操作结果返回true的tuple将会被发送到下游进行操作,如果返回false,该tuple就不会发送到下游。在数据流上使用each(inputFields, filter)方法,可以将这个过滤器应用到每个tuple中。

    四、Trident function
      STorm还提供了一个更通用的功能接口function。function和Storm的bolt类似,读取tuple并且发送新的tuple。其中一个区别是Trident function只能添加数据。function发送数据时,将新字段添加到tuple中,并不会删除或者变更已有的字段。
      和Storm的bolt类似,function实现了一个包括逻辑的方法execute。function的实现也可以选用TridentCollector来发送tuple到新的function中。用这种方式,function也可以用来过滤tuple,起到filter的作用。
      来看下CityAssignment类:

    public class CityAssignment extends BaseFunction {
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(CityAssignment.class);
    
        private static Map<String, double[]> CITIES = new HashMap<>();
    
        static {
            // Initialize the cities
            CITIES.put("PHL", new double[]{39.875365, -75.249524});
            CITIES.put("NYC", new double[]{40.71448, -74.00598});
            CITIES.put("SF", new double[]{-31.4250142, -62.0841809});
            CITIES.put("LA", new double[]{-34.05374, -118.24307});
        }
    
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            DiagnosisEvent diagnosis = (DiagnosisEvent) tuple.getValue(0);
            double leastDistance = Double.MAX_VALUE;
            String closestCity = "NONE";
            for (Map.Entry<String, double[]> city : CITIES.entrySet()) {
                double R = 6371; // km
                double x = (city.getValue()[0] - diagnosis.lng) * Math.cos((city.getValue()[0] + diagnosis.lng) / 2);
                double y = (city.getValue()[1] - diagnosis.lat);
                double d = Math.sqrt(x * x + y * y) * R;
                if (d < leastDistance) {
                    leastDistance = d;
                    closestCity = city.getKey();
                }
            }
            List<Object> values = new ArrayList<>();
            values.add(closestCity);
    
            LOG.info("Closest city to lat=[" + diagnosis.lat + "], lng=[" + diagnosis.lng + "] == [" + closestCity
                    + "], d=[" + leastDistance + "]");
    
            collector.emit(values);
        }
    }
    

      使用静态初始化方式建立了一个城市地图。在execute()方法中,函数遍历城市计算事件和城市之间的距离。function声明的字段数量必须和它发射出的值的字段数一致。
      接下来,HourAssignment用来转化Unix时间戳,对事件进行时间分组操作。HourAssignment代码如下:

    public class HourAssignment extends BaseFunction {
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(HourAssignment.class);
    
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            DiagnosisEvent diagnosis = (DiagnosisEvent) tuple.getValue(0);
            String city = (String) tuple.getValue(1);
    
            long timestamp = diagnosis.time;
            long hourSinceEpoch = timestamp / 1000 / 60 / 60;
            LOG.info("Key =  [" + city + ":" + hourSinceEpoch + "]");
            String key = city + ":" + diagnosis.diagnosisCode + ":" + hourSinceEpoch;
    
            List<Object> values = new ArrayList<>();
            values.add(hourSinceEpoch);
            values.add(key);
            collector.emit(values);
        }
    }
    

      HourAssignment发射小时的数据,以及由城市、疾病代码、小时组合而成的key。实际上,这个组合值会作为聚合计数的唯一标识符。

      再来看最后两个function用来侦测疾病暴发并改进。OutbreakDetector类代码如下:

    public class OutbreakDetector extends BaseFunction {
        private static final long serialVersionUID = 1L;
    
        public static final int THRESHOLD = 10000;
    
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String key = (String) tuple.getValue(0);
            Long count = (Long) tuple.getValue(1);
    
            if (count > THRESHOLD) {
                List<Object> values = new ArrayList<>();
                values.add("Outbreak detected for [" + key + "]");
                collector.emit(values);
            }
        }
    }
    

      这个function提取出了特定城市、疾病、时间的发生次数,并且检查计数是否超过了设定的阈值。若超过,则发送一个新的字段包括一条告警信息。
      DispatchAlert功能就是发布一个告警(并且结束程序):

    public class DispatchAlert extends BaseFunction {
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(CityAssignment.class);
    
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String alert = (String) tuple.getValue(0);
            LOG.error("ALERT RECEIVED [" + alert + "]");
            LOG.error("Dispatch the national guard!");
            System.exit(0);
        }
    }
    

    五、Trident state
      接下来要完成持久化的操作,来看OutbreakTrendFactory类:

    public class OutbreakTrendFactory implements StateFactory {
        private static final long serialVersionUID = 1L;
    
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new OutbreakTrendState(new OutbreakTrendBackingMap());
        }
    }
    

      工厂类返回一个State对象,Storm用它来持久化存储信息。在Storm中,有三种类型的状态。
      1)非事务型:没有回滚能力,更新操作是永久性的,commit操作会被忽略;
      2)重复事务型:由同一批tuple提供的结果是幂等的;
      3)不透明事务型: 更新操作基于先前的值,这样一批数据组成不同,持久化的数据也会变。

      在分布式环境下,数据可能被重放,为了支持计数和状态更新,Trident将状态更新操作进行序列化,使用不同的状态更新模式对重放和错误数据进行容错。
      来看OutBreakTrendState和OutbreakTrendBackingMap:

    public class OutbreakTrendState extends NonTransactionalMap<Long> {
        protected OutbreakTrendState(OutbreakTrendBackingMap outbreakBackingMap) {
            super(outbreakBackingMap);
        }
    }
    
    public class OutbreakTrendBackingMap implements IBackingMap<Long> {
        private static final Logger LOG = LoggerFactory.getLogger(OutbreakTrendBackingMap.class);
    
        private Map<String, Long> storage = new ConcurrentHashMap<>();
    
        @Override
        public List<Long> multiGet(List<List<Object>> keys) {
            List<Long> values = new ArrayList<>();
            for (List<Object> key : keys) {
                Long value = storage.get(key.get(0));
                if (value == null) {
                    values.add(0L);
                }
                else {
                    values.add(value);
                }
            }
            return values;
        }
    
        @Override
        public void multiPut(List<List<Object>> keys, List<Long> vals) {
            for (int i = 0; i < keys.size(); i++) {
                LOG.info("Persisting [" + keys.get(i).get(0) + "] ==> [" + vals.get(i) + "]");
                storage.put((String) keys.get(i).get(0), vals.get(i));
            }
        }
    }
    

      在示例的topology中,实际上没有固化存储数据。只是简单的将数据放入ConcurrentHashMap中。显然,对于多个机器的环境下,这样是不可行的。然而,BackingMap是一个抽象。只需要将传入的MapState对象的backing map的实例替换就可以更换持久层的实现。

      最后执行topology:

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("cdc", conf, buildTopology());
        Thread.sleep(200000);
        cluster.shutdown();
    }
    

    实例二

      该示例是将消息中的内容提取出来成name, age, title, tel4个field,然后通过project只保留name字段供统计,接着按照name分区后,为每个分区进行聚合,最后将聚合结果通过state写入map中。

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        DiagnosisEventSpout spout = new DiagnosisEventSpout();
    
        Stream inputStream = topology
                .newStream("tridentStateDemoId", spout)
                .parallelismHint(3) //设置并行度
                .shuffle()
                .parallelismHint(3)
                .each(new Fields("msg"), new Split(), new Fields("name", "age", "title", "tel"))
                .parallelismHint(3)
                .project(new Fields("name")) //不需要发射age、title、tel字段
                .parallelismHint(3)
                .partitionBy(new Fields("name"));
    
        inputStream.partitionAggregate(new Fields("name"), new NameCountAggregator(), new Fields("nameSumKey", "nameSumValue"))
                .partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"), new NameSumUpdater());
    
        return topology.build();
    }
    

      这里涉及了一些trident常用的API,但project等相对容易理解,这里介绍下partitionAggregate的用法。代码中对partitionAggregate的使用:

    .partitionAggregate(new Fields("name"), new NameCountAggregator(),
                new Fields("nameSumKey", "nameSumValue"))
    

      第一、三个参数分别表示输入流与输出流的名称。中间的NameCountAggregator是一个Aggregator的对象,它定义了如何对输入流进行聚合。

      先看下Aggregator接口的定义,这个接口有3个方法:

    public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T val, TridentTuple tuple, TridentCollector collector);
        void complete(T val, TridentCollector collector);
    }
    

      init方法在处理batch之前被调用。init的返回值是一个表示聚合状态的对象,该对象会被传递到aggregate和complete方法。
      aggregate方法为每个在batch分区的输入元组所调用,更新状态。
      complete方法是当batch分区的所有元组已经被aggregate方法处理完后被调用。

      NameCountAggregator实现了该接口:

    public class NameCountAggregator implements Aggregator<Map<String, Integer>> {
        private static final Logger LOG = LoggerFactory.getLogger(NameCountAggregator.class);
    
        private static final long serialVersionUID = -5141558506999420908L;
    
        @Override
        public Map<String, Integer> init(Object batchId, TridentCollector collector) {
            LOG.info("init {}", batchId);
            return new HashMap<>();
        }
    
        //判断某个名字是否已经存在于map中,若无,则put,若有,则递增
        @Override
        public void aggregate(Map<String, Integer> map, TridentTuple tuple, TridentCollector collector) {
            String key = tuple.getString(0);
            if (map.containsKey(key)) {
                Integer tmp = map.get(key);
                map.put(key, ++tmp);
            }
            else {
                map.put(key, 1);
            }
        }
    
        //将聚合后的结果emit出去
        @Override
        public void complete(Map<String, Integer> map, TridentCollector collector) {
            if (map.size() > 0) {
                for (Map.Entry<String, Integer> entry : map.entrySet()) {
                    LOG.info("Thread.id={}, | {} | {}", Thread.currentThread().getId(), entry.getKey(), entry.getValue());
                    collector.emit(new Values(entry.getKey(), entry.getValue()));
                }
                map.clear();
            }
        }
    
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
    
        }
    
        @Override
        public void cleanup() {
    
        }
    }
    

      init方法初始化了一个HashMap对象,这个对象会作为参数传给aggregate和complete方法,对一个batch只执行一次。aggregate方法对于batch内的每一个tuple均执行一次。这里将这个batch内的名字出现的次数放到init方法所初始化的map中。complete中会将aggregate处理完的结果发送出去,实际上可以在任何地方emit,比如在aggregate里面。这个方法对于一个batch也只执行一次。

      topology中将结果写入state:

    partitionPersist(
                new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),
                new NameSumUpdater());
    

      它的定义为:

    TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)
    

      其中的第二个参数比较容易理解,就是输入流的名称,这里是名字与它出现的个数。下面先看一下Facotry:

    public class NameSumStateFactory implements StateFactory {
    
        private static final long serialVersionUID = 8753337648320982637L;
    
        @Override
        public State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {
            return new NameSumState();  
        } 
    }
    

      它实现了StateFactory,只有一个方法makeState,返回一个State类型的对象。

      NameSumUpdater这个类继承自BaseStateUpdater,updateState对batch的内容进行处理,这里是将batch的内容放到一个map中,然后调用setBulk方法:

    public class NameSumUpdater extends BaseStateUpdater<NameSumState> {
        private static final long serialVersionUID = -6108745529419385248L;
    
        public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {
            Map<String, Integer> map = new HashMap<>();
            for (TridentTuple t : tuples) {
                map.put(t.getString(0), t.getInteger(1));
            }
            state.setBulk(map);
        }
    }
    

      状态类NameSumState是state最核心的类,它实现了大部分的逻辑。NameSumState实现了State接口:

    public class NameSumState implements State {
        
        private Map<String, Integer> map = new HashMap<>();
        
        @Override
        public void beginCommit(Long txid) {
    
        }
    
        @Override
        public void commit(Long txid) {
    
        }
    
        public void setBulk(Map<String, Integer> map) {
            // 将新到的tuple累加至map中
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                String key = entry.getKey();
                if (this.map.containsKey(key)) {
                    this.map.put(key, this.map.get(key) + map.get(key));
                } else {
                    this.map.put(key, entry.getValue());
                }
            }
            System.out.println("-------");
            // 将map中的当前状态打印出来。
            for (Map.Entry<String, Integer> entry : this.map.entrySet()) {
                String Key = entry.getKey();
                Integer Value = entry.getValue();
                System.out.println(Key + "|" + Value);
            }
        }
    }
    

      beginCommit、commit分别在提交之前与提交成功的时候调用。另外NameSumState还定义了如何处理NameSumUpdater传递的消息setBulk方法。setBulk方法中即将NameSumUpdater传送过来的内容写入一个HashMap中,并打印出来。 此处将state记录在一个HashMap中,如果需要记录在其它地方,如mysql,则使用jdbc写入mysql代替map操作即可。


      state的应用步骤相当简单,原理也很简单。NameSumStateFactory()指定了将结果保存在哪里,NameSumUpdater()指定了更新state的逻辑,如将当前数据和原有数据相加等。

      state应用的一些注意事项:
      (1)使用state,不再需要比较事务id,在数据库中同时写入多个值等内容,而是专注于逻辑实现
      (2)除了实现State接口,更常用的是实现MapState接口。
      (3)在拓扑中指定了StateFactory,这个工厂类找到相应的State类。而Updater则每个批次均会调用它的方法。State中则定义了如何保存数据,这里将数据保存在内存中的一个HashMap,还可以保存在mysql, hbase等等。
      (4)trident会自动比较txid的值,如果和当前一样,则不更改状态,如果是当前txid的下一个值,则更新状态。这种逻辑不需要用户处理。
      (5)如果需要实现透明事务状态,则需要保存当前值与上一个值,在update的时候2个要同时处理。即逻辑由自己实现。在本例子中,大致思路是在NameSumState中创建2个HashMap,分别对应当前与上一个状态的值,而NameSumUpdater每次更新这2个Map。

    state与MapState的差异
      由上面可以看出,state需要自己指定如何更新数据

    if (this.map.containsKey(key)) {
                this.map.put(key, this.map.get(key) + map.get(key));
            } else {
                this.map.put(key, entry.getValue());
            }
    }
    

      这里是将原有的值,加上新到的值。而MapState会根据选择的类型(Transactional, Opaque, NonTransactional)定义好逻辑,只要定义如何向state中读写数据即可。

      MapState将State的aggreate与persistent 这两部分操作合在一起了,由方法名也可以看出。在State中最后2步是partitionAggregate()与partitionPersistent(),而在MapState中最后1步是persistentAggregate()
      事实上,查看persistentAggregate()的实现,它最终也是分成aggregate和persistent两个步骤的。

    public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        return aggregate(inputFields, agg, functionFields)
                .partitionPersist(spec,
                        TridentUtils.fieldsUnion(_groupFields, functionFields),
                        new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),
                        TridentUtils.fieldsConcat(_groupFields, functionFields)); 
    }
    

    1、persistentAggregate
      Trident有另外一种更新State的方法叫做persistentAggregate。如下:

    TridentTopology topology = new TridentTopology();          
    TridentState wordCounts =  
          topology.newStream("spout1", spout)  
            .each(new Fields("sentence"), new Split(), new Fields("word"))  
            .groupBy(new Fields("word"))  
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))  
    

      persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:

    public interface MapState<T> extends State {  
        List<T> multiGet(List<List<Object>> keys);  
        List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);  
        void multiPut(List<List<Object>> keys, List<T> vals);  
    }  
    

      当在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:

    public interface Snapshottable<T> extends State {  
        T get();  
        T update(ValueUpdater updater);  
        void set(T o);  
    }  
    

      MemoryMapState 和 MemcachedState 都实现了上面的2个接口。


      在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:

    public interface IBackingMap<T> {  
        List<T> multiGet(List<List<Object>> keys);   
        void multiPut(List<List<Object>> keys, List<T> vals);   
    }  
    

      multiGet 的参数是一个List,可以根据key来查询数据,key本身也是一个List,以方便多个值组合成key的情形。 multiPut的参数是一个List类型的keys和一个List类型的values,它们的size应该是相等的,把这些值写入state中。

      OpaqueMap’s会用OpaqueValue的value来调用multiPut方法,TransactionalMap’s会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。
      Trident还提供了一种CachedMap类来进行自动的LRU cache。
      另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象.
      可以看看 MemcachedState的实现,从而了解怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许选择使用opaque transactional, transactional, 还是 non-transactional 语义的。

      实现一个MapState,可以实现IBackingMap接口(mutliGet()/multiPut),并且实现StateFactory接口(makeState()),返回一个State对象,这是常见的用法。


      
      以事务型状态为例,看一下整个存储过程的逻辑:
      首先,persistentAggregate收到一批数据,它的第一个参数返回的是事务型的MapState。然后,TransactionalMap在multiUpdate中会判断这个事务的txid与当前state中的txid是否一致。如果txid一致的话,则保持原来的值即可,如果txid不一致,则更新数值。 如果更新数据呢?它是拿新来的值和state中的原有的值,使用persistentAggregate中第2个参数定义的类方法作聚合计算。

    MapState读写mysql示例
    (1)MysqlMapStateFactory

    public class MysqlMapStateFactory<T> implements StateFactory {
    
        private static final long serialVersionUID = 1987523234141L;
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return  TransactionalMap.build((IBackingMap<TransactionalValue>) new MysqlMapStateBacking());
        }
    }
    

      很简单,就一行,返回一个IBacking对象。这里使用的Transactioal,当然还可以使用NonTransactional和Opaque。

    (2)MysqlMapStateBacking
      最核心的还是multiGet()和multiPut:

        @Override
        public List<TransactionalValue> multiGet(List<List<Object>> keys) {
            if (stmt == null) {
                stmt = getStatment();
            }
            List<TransactionalValue> values = new ArrayList<TransactionalValue>();
    
            for (List<Object> key : keys) {
    
                String sql = "SELECT req_count FROM edt_analysis where id='" + key.get(0) + "'";
                LOG.debug("============sql: " + sql);
                try (ResultSet rs = stmt.executeQuery(sql)) {
                    if (rs.next()) {
                        LOG.info("Get value:{} by key:{}", rs.getObject(1), key);
                        values.add(derialize(rs.getObject(1)));
                    } else {
                        values.add(null);
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            return values;
        }
    
        @Override
        public void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) {
            if (stmt == null) {
                stmt = getStatment();
            }
    
            for (int i = 0; i < keys.size(); i++) {
                String sql = "replace into edt_analysis values('" + keys.get(i).get(0) + "','" + serialize(vals.get(i))
                        + "')";
                LOG.debug("===================put sql " + sql);
                try {
                    stmt.execute(sql);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    

      但mysql与redis之类的不同,它需要将一个TransactionalValue对象转换为mysql中的一行数据,同理,需要将mysql中的一行数据转换为一个TransactionalValue对象:

    // 将数据库中的varchar转换为TransactionalValue对象
    private TransactionalValue derialize(Object object) {
        String value[] = object.toString().split(",");
        return new TransactionalValue(Long.parseLong(value[0]), Long.parseLong(value[1]));
    }
    
    // 将TransactionalValue转换为String
    private String serialize(TransactionalValue transactionalValue) {
        return transactionalValue.getTxid() + "," + transactionalValue.getVal();
    }
    

      这是使用了最简单的方式,只有2列,一行是key,一列是value,value中保存了txid及真实的value,之间以逗号分隔。


    以HBaseMapState为例分析MapState代码调用全过程
    1、调用过程
    (1)SubtopologyBolt implements ITridentBatchBolt这个bolt在完成一个batch的处理后会调用finishBatch(BatchInfo batchInfo)
    (2)然后调用PartitionPersistProcessor implements TridentProcessor这个处理器的finishBatch(ProcessorContext processorContext)
    (3)接着调用MapCombinerAggStateUpdater implements StateUpdater<MapState>updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector)
    (4)再接着调用TransactionalMap<T> implements MapState<T>multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)
    (5)最后就是调用用户定义的MapState类(如HBaseMapState)的multiGet()multiPut()方法了。

      ITridentBatchBolt,简单的说就是一个blot被处理完后,会调用finishBatch()方法,然后这个方法会调用MapState()框架的updateState(),接着调用mutliUpdate(),最后调用用户定义的multiGet()和multiPut()。

    展开全文
  • Trident是用Kotlin编写的Minecraft服务器API和实现。 为什么使用三叉戟? 又快又轻! Trident尽其所能整合并发! 干净的! Trident不使用Mojang的任何代码。 美丽的API! 要使Trident的API达到最佳状态,人们有很...
  • Storm Trident

    2021-03-11 15:04:31
    Trident是Storm的延伸。像Storm一样,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm之上提供高级抽象以及有状态流处理和低延迟分布式查询。Trident使用喷嘴和螺栓,但这些底层组件在执行前由Trident...

    Trident是Storm的延伸。像Storm一样,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm之上提供高级抽象以及有状态流处理和低延迟分布式查询。

    Trident使用喷嘴和螺栓,但这些底层组件在执行前由Trident自动生成。Trident具有功能,过滤器,连接,分组和聚合。

    Trident处理流作为一系列被称为交易的批次。通常,这些小批量的大小将取决于数千或数百万个元组,取决于输入流。这样,Trident不同于Storm,它执行元组处理。

    批处理概念与数据库事务非常相似。每笔交易都分配一个交易ID。一旦所有处理完成,交易即被视为成功。但是,处理一个事务元组失败将导致整个事务被重新传输。对于每个批次,Trident将在交易开始时调用beginCommit,并在结束时进行提交。

    Trident拓扑

    Trident API公开了使用“TridentTopology”类创建Trident拓扑的简单选项。基本上,Trident拓扑接收来自喷口的输入流并且在该流上执行有序的操作序列(过滤,聚合,分组等)。Storm元组被Trident元组取代,螺栓被操作取代。一个简单的Trident拓扑可以创建如下

    TridentTopology topology = new TridentTopology();

    Trident元组

    Trident元组是一个已命名的值列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是可以由Trident拓扑处理的基本数据单元。

    Trident喷口

    Trident喷口与Storm喷口相似,具有使用Trident功能的附加选项。实际上,我们仍然可以使用我们在Storm拓扑中使用的IRichSpout,但它本质上不具有事务性,我们将无法使用Trident提供的优势。

    具有使用Trident功能的所有功能的基本喷嘴是“ITridentSpout”。它支持事务性和不透明事务语义。其他喷嘴是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。

    除了这些通用喷嘴之外,Trident还有很多Trident喷嘴的实例。其中一个是FeederBatchSpout喷口,我们可以使用它轻松发送Trident元组的命名列表,而无需担心批处理,并行性等问题。

    FeederBatchSpout的创建和数据馈送可以按照如下所示完成

    TridentTopology topology = new TridentTopology();

    FeederBatchSpout testSpout = new FeederBatchSpout(

    ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));

    topology.newStream("fixed-batch-spout", testSpout)

    testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

    Trident操作

    Trident依靠“Trident操作”来处理Trident元组的输入流。Trident

    API具有许多内置操作来处理从简单到复杂的流处理。这些操作从简单验证到复杂的Trident元组分组和聚合。让我们来看看最重要和最常用的操作。

    过滤

    过滤器是用于执行输入验证任务的对象。Trident过滤器获取Trident元组字段的子集作为输入,并根据某些条件是否满足返回true或false。如果返回true,则元组保存在输出流中;

    否则,该元组将从流中移除。过滤器将基本上继承自 BaseFilter 类并实现 isKeep 方法。以下是过滤器操作的示例实现

    public class MyFilter extends BaseFilter {

    public boolean isKeep(TridentTuple tuple) {

    return tuple.getInteger(1) % 2 == 0;

    }

    }

    输入

    [1, 2]

    [1, 3]

    [1, 4]

    输出

    [1, 2]

    [1, 4]

    可以使用“每个”方法在拓扑中调用过滤器函数。“Fields”类可用于指定输入(Trident元组的子集)。示例代码如下

    TridentTopology topology = new TridentTopology();

    topology.newStream("spout", spout)

    .each(new Fields("a", "b"), new MyFilter())

    功能

    函数 是用于在单个Trident元组上执行简单操作的对象。它需要Trident元组字段的子集并发出零个或更多新的Trident元组字段。

    函数 基本上从 BaseFunction 类继承并实现了 execute 方法。下面给出了一个示例实现 -

    public class MyFunction extends BaseFunction {

    public void execute(TridentTuple tuple, TridentCollector collector) {

    int a = tuple.getInteger(0);

    int b = tuple.getInteger(1);

    collector.emit(new Values(a + b));

    }

    }

    输入

    [1, 2]

    [1, 3]

    [1, 4]

    输出

    [1, 2, 3]

    [1, 3, 4]

    [1, 4, 5]

    就像过滤器操作一样,可以使用 每种 方法在拓扑中调用函数操作。示例代码如下

    TridentTopology topology = new TridentTopology();

    topology.newStream("spout", spout)

    .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

    聚合

    聚集是用于对输入批处理或分区或流执行聚合操作的对象。Trident有三种类型的聚合。他们如下

    聚集 - 孤立地聚集每批Trident元组。 在聚合过程中,元组最初使用全局分组重新分区,以将同一批次的所有分区合并到一个分区中。

    partitionAggregate - 聚合每个分区,而不是整个批次的Trident元组。 分区聚合的输出完全替换了输入元组。分区聚合的输出包含单个字段元组。

    persistentaggregate - 在所有批次的所有Trident元组上聚合并将结果存储在内存或数据库中。

    TridentTopology topology = new TridentTopology();

    // aggregate operation

    topology.newStream("spout", spout)

    .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

    .aggregate(new Count(), new Fields(“count”))

    // partitionAggregate operation

    topology.newStream("spout", spout)

    .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

    .partitionAggregate(new Count(), new Fields(“count"))

    // persistentAggregate - saving the count to memory

    topology.newStream("spout", spout)

    .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

    可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“count”聚合器是内置聚合器之一,它使用“CombinerAggregator”实现,具体实现如下

    public class Count implements CombinerAggregator {

    @Override

    public Long init(TridentTuple tuple) {

    return 1L;

    }

    @Override

    public Long combine(Long val1, Long val2) {

    return val1 + val2;

    }

    @Override

    public Long zero() {

    return 0L;

    }

    }

    分组

    分组操作是一种内置操作,可以通过 groupBy

    方法调用。groupBy方法通过在指定的字段上执行partitionBy来重新分区流,然后在每个分区内将它的组字段相等的元组分组在一起。通常,我们使用“groupBy”和“persistentAggregate”来获得分组聚合。示例代码如下

    TridentTopology topology = new TridentTopology();

    // persistentAggregate - saving the count to memory

    topology.newStream("spout", spout)

    .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

    .groupBy(new Fields(“d”)

    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

    合并和加入

    合并和连接可以分别使用“合并”和“连接”方法完成。合并合并一个或多个流。连接类似于合并,除了连接使用来自双方的三叉形元组字段来检查和连接两个流。而且,加入只能在批次级别下工作。示例代码如下

    TridentTopology topology = new TridentTopology();

    topology.merge(stream1, stream2, stream3);

    topology.join(stream1, new Fields("key"), stream2, new Fields("x"),

    new Fields("key", "a", "b", "c"));

    状态维护

    Trident提供了状态维护机制。状态信息可以存储在拓扑本身中,否则可以将其存储在单独的数据库中。原因是维护一个状态,即如果任何元组在处理期间失败,则重试失败的元组。这在更新状态时会产生问题,因为您不确定此元组的状态是否已更新过。如果元组在更新状态之前失败了,那么重试元组将使状态稳定。但是,如果元组在更新状态后失败,则重试同一元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤来确保消息只处理一次

    小批量处理元组。

    为每个批次分配一个唯一的ID。如果批次重试,则会给出相同的唯一ID。

    状态更新在批次中排序。例如,第二批次的状态更新将不可能,直到第一批次的状态更新完成。

    分布式RPC

    分布式RPC用于查询和检索Trident拓扑的结果。Storm有一个内置的分布式RPC服务器。分布式RPC服务器接收来自客户端的RPC请求并将其传递给拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,该服务器由分布式RPC服务器重定向到客户端。Trident的分布式RPC查询像普通的RPC查询一样执行,除了这些查询是并行运行的。

    何时使用Trident?

    和许多用例一样,如果需求只处理查询一次,我们可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下,很难实现一次处理。因此Trident对那些需要精确处理一次的用例非常有用。Trident并非针对所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理了状态。

    Trident的工作例子

    我们将把我们在前一节中制定的呼叫日志分析器应用程序转换为Trident框架。由于其高级API,Trident应用相对简单Storm相对容易。Storm基本上需要执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。最后,我们将使用

    LocalDRPC 类启动DRPC服务器,并使用 LocalDRPC 类的 执行 方法搜索一些关键字。

    格式化通话信息

    FormatCall类的用途是格式化包含“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下所示 -

    编码:FormatCall.java

    import backtype.storm.tuple.Values;

    import storm.trident.operation.BaseFunction;

    import storm.trident.operation.TridentCollector;

    import storm.trident.tuple.TridentTuple;

    public class FormatCall extends BaseFunction {

    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

    String fromMobileNumber = tuple.getString(0);

    String toMobileNumber = tuple.getString(1);

    collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));

    }

    }

    CSVSplit

    CSVSplit类的用途是根据“逗号(,)”分割输入字符串并发送字符串中的每个单词。该函数用于解析分布式查询的输入参数。完整的代码如下 -

    编码:CSVSplit.java

    import backtype.storm.tuple.Values;

    import storm.trident.operation.BaseFunction;

    import storm.trident.operation.TridentCollector;

    import storm.trident.tuple.TridentTuple;

    public class CSVSplit extends BaseFunction {

    @Override

    public void execute(TridentTuple tuple, TridentCollector collector) {

    for(String word: tuple.getString(0).split(",")) {

    if(word.length() > 0) {

    collector.emit(new Values(word));

    }

    }

    }

    }

    日志分析器

    这是主要的应用程序。最初,应用程序将使用 FeederBatchSpout

    初始化TridentTopology并提供来电者信息。Trident拓扑流可以使用TridentTopology类的 newStream

    方法创建。同样,可以使用TridentTopology类的 newDRCPStream

    方法创建Trident拓扑DRPC流。一个简单的DRCP服务器可以使用LocalDRPC类创建。 LocalDRPC

    具有执行搜索某个关键字的方法。完整的代码如下。

    编码:LogAnalyserTrident.java

    import java.util.*;

    import backtype.storm.Config;

    import backtype.storm.LocalCluster;

    import backtype.storm.LocalDRPC;

    import backtype.storm.utils.DRPCClient;

    import backtype.storm.tuple.Fields;

    import backtype.storm.tuple.Values;

    import storm.trident.TridentState;

    import storm.trident.TridentTopology;

    import storm.trident.tuple.TridentTuple;

    import storm.trident.operation.builtin.FilterNull;

    import storm.trident.operation.builtin.Count;

    import storm.trident.operation.builtin.Sum;

    import storm.trident.operation.builtin.MapGet;

    import storm.trident.operation.builtin.Debug;

    import storm.trident.operation.BaseFilter;

    import storm.trident.testing.FixedBatchSpout;

    import storm.trident.testing.FeederBatchSpout;

    import storm.trident.testing.Split;

    import storm.trident.testing.MemoryMapState;

    import com.google.common.collect.ImmutableList;

    public class LogAnalyserTrident {

    public static void main(String[] args) throws Exception {

    System.out.println("Log Analyser Trident");

    TridentTopology topology = new TridentTopology();

    FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",

    "toMobileNumber", "duration"));

    TridentState callCounts = topology

    .newStream("fixed-batch-spout", testSpout)

    .each(new Fields("fromMobileNumber", "toMobileNumber"),

    new FormatCall(), new Fields("call"))

    .groupBy(new Fields("call"))

    .persistentAggregate(new MemoryMapState.Factory(), new Count(),

    new Fields("count"));

    LocalDRPC drpc = new LocalDRPC();

    topology.newDRPCStream("call_count", drpc)

    .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

    topology.newDRPCStream("multiple_call_count", drpc)

    .each(new Fields("args"), new CSVSplit(), new Fields("call"))

    .groupBy(new Fields("call"))

    .stateQuery(callCounts, new Fields("call"), new MapGet(),

    new Fields("count"))

    .each(new Fields("call", "count"), new Debug())

    .each(new Fields("count"), new FilterNull())

    .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

    Config conf = new Config();

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("trident", conf, topology.build());

    Random randomGenerator = new Random();

    int idx = 0;

    while(idx < 10) {

    testSpout.feed(ImmutableList.of(new Values("1234123401",

    "1234123402", randomGenerator.nextInt(60))));

    testSpout.feed(ImmutableList.of(new Values("1234123401",

    "1234123403", randomGenerator.nextInt(60))));

    testSpout.feed(ImmutableList.of(new Values("1234123401",

    "1234123404", randomGenerator.nextInt(60))));

    testSpout.feed(ImmutableList.of(new Values("1234123402",

    "1234123403", randomGenerator.nextInt(60))));

    idx = idx + 1;

    }

    System.out.println("DRPC : Query starts");

    System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));

    System.out.println(drpc.execute("multiple_call_count", "1234123401 -

    1234123402,1234123401 - 1234123403"));

    System.out.println("DRPC : Query ends");

    cluster.shutdown();

    drpc.shutdown();

    // DRPCClient client = new DRPCClient("drpc.server.location", 3772);

    }

    }

    构建和运行应用程序

    完整的应用程序有三个Java代码。他们如下 -

    FormatCall.java

    CSVSplit.java

    LogAnalyerTrident.java

    应用程序可以通过使用以下命令来构建 -

    javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

    该应用程序可以通过使用以下命令运行 -

    java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

    输出

    应用程序启动后,应用程序将输出关于集群启动过程,操作处理,DRPC服务器和客户端信息以及集群关闭过程的完整详细信息。该输出将显示在控制台上,如下所示。

    DRPC : Query starts

    [["1234123401 - 1234123402",10]]

    DEBUG: [1234123401 - 1234123402, 10]

    DEBUG: [1234123401 - 1234123403, 10]

    [[20]]

    DRPC : Query ends

    展开全文
  • Use 'True' for the last trident block. """ super().__init__() assert num_branch == len(dilations) self.num_branch = num_branch self.concat_output = concat_output self.test_branch_idx = test_...
  • Trident-MySQL

    2021-02-04 14:47:29
    使用事物TridentTopology 持久化数据到MySQL1、构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays;importjava.util.Map;importorg.apache.storm.Config;importorg.apache.storm.Local...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 38,728
精华内容 15,491
关键字:

Trident