精华内容
下载资源
问答
  • Strom

    千次阅读 2019-04-19 08:46:07
    Strom学习

    Strom简介(官网http://storm.apache.org/)

    1. Strom是实时的、分布式、具备高容错的计算系统
    2. 数据处理位置:
    • strom进程常驻内存
    • strom数据不经过磁盘,在内存中处理
    1. 发展:
    • 2013年,Storm进入Apache社区进行孵化
    • 2014年9月,晋级成为了Apache顶级项目。国内外各大网站使用,例如雅虎、阿里、百度
    1. 架构
    • Nimbus
    • Supervisor
    • Worker
    • topology
    1. 编程模型
    • DAG (Topology)
    • Spout :nextTuple(核心)、open初始化、declareOutput
    • Bolt:executor(核心)、prepare初始化、declareOutput
    1. 数据传输
    • ZMQ
      ZeroMQ 开源的消息传递框架,并不是一个MessageQueue
    • Netty
      Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)
    1. 高可靠性
    • 异常处理
    • 消息可靠性保障机制
    1. 可维护性
    • Storm UI图形化监控接口
      在这里插入图片描述
    1. 流式处理
    • 流式处理(异步):客户端提交数据进行结算,并不会等待数据计算结算
    • 逐条处理:如ETL
    • 统计分析:
      例子:计算pv、uv、访问热点以及某些数据的聚合、加和、平均等
      客户端提交数据之后,计算完成结果存储到Redis、HBase、MySQL或者其他MQ当中
      客户端并不关心最终结果是多少
      在这里插入图片描述
    1. 实时请求
    • 实时请求应答服务(同步)
      客户端提交数据请求之后,立刻取得计算结果并返回给客户端
    • Drpc
    • 实时请求处理
      例:图片特征提取
      在这里插入图片描述
    1. Strom和MR区别
    • Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。
    • MapReduce:为TB、PB级别数据设计的批处理计算框架。
      在这里插入图片描述

    Strom计算模型

    在这里插入图片描述
    在这里插入图片描述

    1. Topology – DAG有向无环图的实现
    • 对于Storm实时计算逻辑的封装。即,由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构
    • 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止
      (区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)
    1. Tuple – 元组
      Stream中最小数据组成单元
    2. Stream – 数据流
    • 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做Stream
    • Stream声明时需给其指定一个Id(默认为Default)
      实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId
    1. Spout – 数据源
    • 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中
    • 一个Spout可以发送多个数据流(Stream)
      可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
    • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算
    1. Bolt – 数据流处理组件
    • 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
    • 一个Bolt可以发送多个数据流(Stream)
      可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
    • Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑
    1. Stream Grouping – 数据流分组(即数据分发策略)

    Strom API

    Spout继承BaseRichSpout重写其方法:

    1. open(Map conf,TopologyContext context,SpoutOutputCollector collector)初始化方法
      在这里插入图片描述
    • conf 可以获取配置
    • context 上下文环境
    • collector 往下游发送数据
    1. nextTuple()核心方法:storm框架会一直调用此方法,每当调用这个方法时,往下游发送数据
      在这里插入图片描述

    2. declareOutputFields(OutputFieldsDeclarer declarer)声明输出的数据格式
      在这里插入图片描述

    Bolt继承BaseRichBolt重写其方法

    1. prepare(Map stormConf,TopologyContext context,OutputCollector collector)初始化
    2. execute(Tuple input)拿上游传来的数据
      在这里插入图片描述
    3. declareOutputFields(OutputFieldsDeclarer declarer)声明输出的数据格式

    TestTopology

    package com.sxt.storm.test;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.AuthorizationException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    
    public class TestTopology {
    
        public static void main(String[] args) {
    
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("myspout",new TestSpout());
            topologyBuilder.setBolt("mybolt",new TestBolt()).shuffleGrouping("myspout");
    
            Config conf = new Config();
    //        conf.setNumWorkers(4);
            StormTopology topology = topologyBuilder.createTopology();
    
            if (args.length > 0) {
                try {
    
                    StormSubmitter.submitTopology(args[0], conf, topology);
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, topology);
            }
        }
    }
    

    在这里插入图片描述
    下游mybolt衔接上游mysout,mybolt;可能还有下游,如mybolt1,那么mybolt1要衔接上游mybolt,以此类推

    分组策略

    1. Shuffle Grouping
      随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。轮询,平均分配
    2. Fields Grouping
      按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。
    3. All Grouping
      广播发送,对于每一个tuple,所有的bolts都会收到
    4. Global Grouping
      全局分组,把tuple分配给task id最低的task 。
    5. None Grouping
      不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
    6. Direct Grouping
      指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
    7. Local or shuffle grouping
      本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
    8. customGrouping
      自定义,相当于mapreduce那里自己去实现一个partition一样。
    package com.sxt.storm.grouping;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.AuthorizationException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    public class Main {
    
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args) {
    
    		TopologyBuilder builder = new TopologyBuilder();
    
    		builder.setSpout("spout", new MySpout(), 2);
    	  
    //		 shuffleGrouping其实就是随机往下游去发,不自觉的做到了负载均衡
    //		builder.setBolt("bolt", new MyBolt(),2).shuffleGrouping("spout");
    
    		// fieldsGrouping其实就是MapReduce里面理解的Shuffle,根据fields求hash来取模
    //		builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id"));
    
    		// 只往一个里面发,往taskId小的那个里面去发送
    		builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");
    
    		// 等于shuffleGrouping
    //		builder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");
    
    		// 广播
    		builder.setBolt("bolt", new MyBolt(), 5).allGrouping("spout");
    
    		// Map conf = new HashMap();
    		// conf.put(Config.TOPOLOGY_WORKERS, 4);
    		Config conf = new Config();
    
    		conf.setDebug(false);
    		conf.setMessageTimeoutSecs(30);
    
    		if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, builder.createTopology());
            }
    
    	}
    
    }
    

    架构

    Storm架构设计

    在这里插入图片描述

    1. Nimbus
    • 资源调度
    • 任务分配
    • 接收jar包
    1. Supervisor
    • 接收nimbus分配的任务
    • 启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)
    1. Worker
    • 运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)
    • worker任务类型,即spout任务、bolt任务两种
    • 启动executor(executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务)
    1. Zookeeper

    Storm 架构设计与Hadoop架构对比

    在这里插入图片描述

    Storm 任务提交流程

    在这里插入图片描述

    Storm 本地目录树

    在这里插入图片描述

    Storm Zookeeper目录树

    在这里插入图片描述

    Flume+Kafka+Storm架构设计

    在这里插入图片描述

    在这里插入图片描述

    • 采集层:实现日志收集,使用负载均衡策略
    • 消息队列:作用是解耦及不同速度系统缓冲
    • 实时处理单元:用Storm来进行数据处理,最终数据流入DB中
    • 展示单元:数据可视化,使用WEB框架展示

    美团Flume架构
    http://tech.meituan.com/mt-log-system-arch.html

    Flume的负载均衡
    http://flume.apache.org/FlumeUserGuide.html#load-balancing-sink-processor

    集群搭建和UI界面

    单机部署

    1. 环境准备:
      Java 6+
      Python 2.6.6+
    2. 上传、解压安装包
    3. 在storm目录中创建logs目录
      mkdir logs
    4. ./storm help
    5. 启动Zookeeper
      ./bin/storm dev-zookeeper >> ./logs/zk.out 2>&1 &
    6. 启动Nimbus
      ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
    7. 启动Storm UI
      ./bin/storm ui >> ./logs/ui.out 2>&1 &
    8. 启动Supervisor
      ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
    9. 启动Logviewer
      ./bin/storm logviewer &
      查看日志:http://192.168.78.101:8000/log?file=wc-1-1523947796-worker-6703.log
      Jps查看是否启动正常
      在这里插入图片描述

    集群部署

    1. 环境准备:
      Java 6+
      Python 2.6.6+
    2. 部署ZooKeeper
      版本3.4.5+ (高版本Zookeeper实现了对于自身持久化数据的定期删除功能)
      (autopurge.purgeInterval; autopurge.snapRetainCount)
    3. 上传、解压安装包
    4. 在storm目录中创建logs目录
      $ mkdir logs
    5. 修改配置文件
      storm.yaml
      Yet Another Markup Language (yaml)
    6. 配置文件内容:
      storm.zookeeper.servers:
    • “node1”
    • “node2”
    • “node3”
      storm.local.dir: “/tmp/storm”
      nimbus.host: “node1"
      supervisor.slots.ports:
      • 6700
      • 6701
      • 6702
      • 6703
    1. 启动Zookeeper集群
    2. 在node1上启动Nimbus
      ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
      ./bin/storm ui >> ./logs/ui.out 2>&1 &
    3. 在node2、node3上启动Supervisor
      (按照配置每个Supervisor上启动4个slots)
      ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &

    UI界面

    1. 启动Storm UI
    2. ./storm ui >> ./logs/ui.out 2>&1 &
    3. 通过http://node1:8080/访问

    通信机制

    1. Worker进程间的数据通信
    • ZMQ
      ZeroMQ 开源的消息传递框架,并不是一个MessageQueue
    • Netty
      Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)
    1. Worker内部的数据通信
    • Disruptor
      实现了“队列”的功能。
      可以理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另一边消费者并行取出消息数据处理。

    Worker内部的消息传递机制
    在这里插入图片描述

    并发机制

    在这里插入图片描述

    1. Worker – 进程
    • 一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)
    • 这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成
    1. Executor – 线程
    • Executor是由Worker进程中生成的一个线程
    • 每个Worker进程中会运行拓扑当中的一个或多个Executor线程
    • 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。
    1. Task
    • 实际执行数据处理的最小单元
    • 每个task即为一个Spout或者一个Bolt

    Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整
    (默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)

    //设置Worker进程数
    Config.setNumWorkers(int workers)
    
    //设置Executor线程数
    TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
    TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
    //其中, parallelism_hint即为executor线程数
    
    //设置Task数量
    ComponentConfigurationDeclarer.setNumTasks(Number val)
    
    //例:
    Config conf = new Config() ;
    conf.setNumWorkers(2);
    
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("spout", new MySpout(), 1);
    topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
                   .setNumTasks(4)
                   .shuffleGrouping("blue-spout);
    

    在这里插入图片描述
    在这里插入图片描述

    • Rebalance – 再平衡
      即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量

    • 支持两种调整方式:
      1、通过Storm UI
      2、通过Storm CLI

    • 通过Storm CLI动态调整:
      例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
      将mytopology拓扑worker进程数量调整为5个
      “ blue-spout ” 所使用的线程数量调整为3个
      “ yellow-bolt ”所使用的线程数量调整为10个

    容错机制

    1、集群节点宕机
    Nimbus服务器
    单点故障?
    非Nimbus服务器
    故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行

    2、进程挂掉
    Worker
    挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上
    Supervisor
    无状态(所有的状态信息都存放在Zookeeper中来管理)
    快速失败(每当遇到任何异常情况,都会自动毁灭)
    Nimbus
    无状态(所有的状态信息都存放在Zookeeper中来管理)
    快速失败(每当遇到任何异常情况,都会自动毁灭)

    3、消息的完整性
    在这里插入图片描述
    从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,以及句子当中单词的tuple等)
    由这些消息就构成了一棵tuple树
    当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性

    Acker – 消息完整性的实现机制
    Storm的拓扑当中特殊的一些任务
    负责跟踪每个Spout发出的Tuple的DAG(有向无环图)

    ack机制

    storm 中有一个系统级别的组件是 acker,acker 追踪从 spout 发射出
    的流 ID(msgId)在每一个 task 中生成的 tuple 是否完成。spout 或者 bolt
    在处理完 tuple 后,都会告诉 acker 我已经处理完了该源 tuple(如
    tupleId=1),如果 emit 一个 tuple 的话,同时会告诉 acker 我发射了一个
    tuple(如 tupleId=2),如果在大量的高并发的消息的情况下,传统的在内存中
    跟踪执行情况的方式,内存的开销会非常大,甚至内存溢出。acker 巧妙的利
    用了 xor 的机制,只需要维护一个 msgId 的标记位即可,处理方法是 acker 在
    初始的时候,对每个 msgId 初始化一个校验值 ack-val(为 0),在处理完 tuple
    和 emit tuple 的时候,会先对这两个个值做 xor 操作,生成的中间值再和
    acker 中的当前校验值 ack-val 做 xor 生成新的 ack-val 值,当所有的 tuple
    都处理完成都得到确认,那么最后的 ack-val 自然就为 0 了(因为每一个
    tuple,从 emit 到 ack 都是经过两次 xor 操作,所以最后的 结果为 0 可以由上
    面的那个公式可以验证出来)。

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    1.storm 不存数据,负责计算
    2.容错能力基于异或
    a) Msg ->acker
    b) Acker 状态
    A xor A = 0.
    A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。
    storm 中使用的巧妙方法就是基于这个定理。具体过程是这样的:在 spout 中
    系统会为用户指定的 message id 生成一个对应的 64 位整数,作为一个 root
    id。root id 会传递给 acker 及后续的 bolt 作为该消息单元的唯一标识。同时
    无论是 spout 还是 bolt 每次新生成一个 tuple 的时候,都会赋予该 tuple 一
    个 64 位的整数的 id。Spout 发射完某个 message id 对应的源 tuple 之后,会
    告知 acker 自己发射的 root id 及生成的那些源 tuple 的 id。而 bolt 呢,每
    次接受到一个输入 tuple 处理完之后,也会告知 acker 自己处理的输入 tuple
    的 id 及新生 成的那些 tuple 的 id。Acker 只需要对这些 id 做一个简单的异或
    运算,就能判断出该 root id 对应的消息单元是否处理完成了。
    http://www.tuicool.com/articles/vErmIb

    DRPC

    DRPC (Distributed RPC)

    分布式远程过程调用

    • DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。

    • DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
      (其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

    • DRPC设计目的:
      为了充分利用Storm的计算能力实现高密度的并行实时计算。
      (Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

    • 客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
      在这里插入图片描述

    定义DRPC拓扑:

    方法1:
    通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
    该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现
    在这里插入图片描述
    方法2:
    直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
    需要手动设定好开始的DRPCSpout以及结束的ReturnResults在这里插入图片描述

    运行模式:

    1、本地模式
    在这里插入图片描述
    2、远程模式(集群模式)

    修改配置文件conf/storm.yaml
    drpc.servers:
    - "node1“

    启动DRPC Server
    bin/storm drpc &

    通过StormSubmitter.submitTopology提交拓扑

    在这里插入图片描述
    案例:
    Twitter 中某个URL的受众人数统计
    在这里插入图片描述
    在这里插入图片描述

    展开全文
  • strom

    2019-11-28 14:08:24
  • pg-strom, PG Strom开发知识库 pgpg strom是PostgreSQL数据库的定制扫描提供程序模块。 它是用于使用GPU设备进行accelarate顺序扫描,hash-基于表的Join 和聚合函数。 它的基本概念是CPU和GPU应该集中在它们具有优势...
  • strom基础

    2019-06-14 21:11:41
    strom 经典图谱: strom基础 Topologies Streams Spouts Bolts Stream groupings Reliability Tasks Workers Configuration 1、Topologies 一个topology是spouts和bolts组成的图, 通过stream ...

     

    strom 经典图谱:

    strom基础

    strom基础

    1. Topologies
    2. Streams
    3. Spouts
    4. Bolts
    5. Stream groupings
    6. Reliability
    7. Tasks
    8. Workers
    9. Configuration

    1、Topologies

    一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来,如下图:
    strom基础
    一个topology会一直运行直到你手动kill掉,Storm自动重新分配执行失败的任务, 并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。
    运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:

    storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

    这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到Nimbus并且上传jar包。
    Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言创建的topology。上面的方面是用JVM-based语言提交的最简单的方法。


    2、Streams

    消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定义类型(只要实现相应的序列化器)。
    每个消息流在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id 。
    Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。


    3、Spouts

    消息源spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple, 但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
    消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。
    Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
    另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。


    4、Bolts

    所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。
    Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
    Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
    Bolts的主要方法是execute, 它以一个tuple作为输入,bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。


    5、Stream groupings

    定义一个topology的其中一步是定义每个bolt接收什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配数据给bolts上面的多个tasks。
    Storm里面有7种类型的stream grouping

    1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
    2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts里的一个task, 而不同的userid则会被分配到不同的bolts里的task。
    3. All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
    4. Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
    5. Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
    6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
    7. Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

    6、Reliability

    Storm保证每个tuple会被topology完整的执行。Storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而形成树状结构),并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置,如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会儿重新发射这个tuple。
    为了利用Storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必须要通知storm。这一切是由OutputCollector来完成的。通过emit方法来通知一个新的tuple产生了,通过ack方法通知一个tuple处理完成了。
    Storm的可靠性我们在第四章会深入介绍。


    7、Tasks

    每一个spout和bolt会被当作很多task在整个集群里执行。每一个executor对应到一个线程,在这个线程上运行多个task,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder类的setSpout和setBolt来设置并行度(也就是有多少个task)。


    8、Workers

    一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker。


    9、Configuration

    Storm里面有一堆参数可以配置来调整Nimbus, Supervisor以及正在运行的topology的行为,一些配置是系统级别的,一些配置是topology级别的。default.yaml里面有所有的默认配置。你可以通过定义个storm.yaml在你的classpath里来覆盖这些默认配置。并且你也可以在代码里面设置一些topology相关的配置信息(使用StormSubmitter)。

    展开全文
  • This generated nvme-strom.upgrade.diff file, but from here I have no idea what I need to do to proceed to the stpe 2. <p>Thanks for your time and interesting posts.</p><p>该提问来源于开源项目:...
  • Email Strom

    2018-01-05 03:38:50
    title: Email Strom tags: confluence email storm categories: 工作日志 date: 2016-10-25 18:18:54 鉴于目前confluence大家使用起来越来越多,部分同学反映邮件过多。类似于朋友圈点击like后后面文章做的任何...

    title: Email Strom tags:

    • confluence
    • email
    • storm categories: 工作日志 date: 2016-10-25 18:18:54

    鉴于目前confluence大家使用起来越来越多,部分同学反映邮件过多。类似于朋友圈点击like后后面文章做的任何修改都会被邮件通知到。

    因此在confluence上增加了email strom 功能

    具体使用如下

    选择个人头像修改profile

    此插件会将保存拆分成保存和 Save&Notify,当需要周知修改时请填写changeList并选择Save&Notify按钮

    净化网络环境,从你我做起~~~

    展开全文
  • Strom优化指南

    千次阅读 2017-03-27 15:04:08
    摘要:本文主要讲了笔者使用Strom中的一些优化建议
  • strom-core-1.0.2

    2018-06-11 15:44:25
    这个是strom 1.0.2 的jar 包,版本比较老了,但是还是
  • Strom集群搭建

    2018-06-14 19:53:55
    Strom集群搭建 1. 准备 Jdk strom的部分代码用java编写,需要依赖jdk,我用的是1.7 Python storm依赖python,如果系统自带的python为2.6以下版本需要升级,可以直接在终端输入:python 查看系统自带python版本 ...
  • Pg strom compile issue

    2020-12-09 02:03:33
    <div><p>Hello, I'm trying to compile on ubuntu 14 pg_trom but I get this error : <p>src / main.c : 31 : 29 : fatal error : utils / ruleutils.h : No such file or directory ...kaigai/pg_strom</p></div>
  • STROM简介

    2014-10-29 17:17:04
    STORM就像hadoop上的mapreduce一样是一种计算框架:  在hadoop上运行的是"Mapreduce jobs",在Storm上运行的是"topologies", 与MapReduce job不一样...Strom集群上有两种节点:"master node" 以及"worker nodes".  ma
  • strom介绍,包括出现背景,应用场景,环境搭建,基本架构。
  • Samza与Strom

    千次阅读 2016-07-13 10:59:01
    Samza官方与Strom的对比文档的翻译版本
  • Strom基本概念

    2018-12-14 15:21:59
    Strom是分布式的实时计算系统,处理速度很快,可以达到毫秒级别,处理数据是一条一条的处理。组成是由一个个topology(拓扑)组成,一个拓扑可以包含多个spout和多个blot。 spout只负责接收数据,将数据转换为Tuple...
  • 此压缩包是Strom和kafka整合所需要的所有的jar包,导入即可使用,如果需要这些分布式集群的安装过程,可以关注我的博客,里面有详细的安装教程
  • pg-strom on openSuSE?

    2020-11-25 06:11:39
    <div><p>Hi, Please, is anyone using pg-strom on opensuse? Thanks, Carlos Camargo</p><p>该提问来源于开源项目:heterodb/pg-strom</p></div>
  • Strom集群配置

    2018-07-25 16:16:19
    将topology 发布到Strom 集群,将预先打包成jar 文件的topology 和配置信息提 交(submitting)到nimbus 服务器上。一旦nimbus 接收到了topology 的压缩包,会将jar 包分发到足够数量的supervisor 节点上。当...
  • Strom优化

    2019-03-23 01:15:53
    NULL 博文链接:https://contentprovider.iteye.com/blog/1041946
  • Hadoop与strom

    2017-09-14 11:29:19
    Hadoop是批量计算,Strom是流式计算 两者面向的领域也不完全相同,一个是批量处理,基于任务调度的;另外一个是实时处理,基于流。
  • #strom
  • Strom官网信息

    2017-09-15 09:51:32
    Strom官网首页:http://storm.apache.org/index.html Storm集群创建:http://storm.apache.org/releases/current/Setting-up-a-Storm-cluster.html Storm包下载:http://storm.apache.org//downl
  • GPU数据库PG_strom的安装及使用,包括postgresql的安装, PG_strom的安装。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,330
精华内容 532
关键字:

strom