精华内容
下载资源
问答
  • 2.1无状态分组过滤器和有状态分组过滤器的区别: 无状态分组过滤器:只根据单个IP分组携带的信息确定是否过滤掉该IP。(基于单个IP分组) 有状态分组过滤器:不仅根据IP分组携带的信息,而且还根据IP分组所属的...

    一、防火墙概述

    1.1引出防火墙的原因:

    安全的网络系统既要保障正常的数据交换过程,又要能够检测用于实施攻击的数据交换过程。阻止用于实施攻击的数据交换过程需要做到以下两点:一是能够在网络间传输,或者用户终端输入输出的信息流中检测出用于实施攻击的信息流;二是能够丢弃检测出用于实施攻击的信息流。
    防火墙位于网络之间,或者用户终端与网络之间。具有以下功能:一是能够检测出用于实施攻击的信息流,并阻断这样的信息流;二是能够允许正常信息流通过。

    防火墙工作机制:依据配置的安全策略允许或阻断操作。

    1.2防火墙分类:

    个人防火墙
    只保护单台计算机,用于对进出计算机的信息流实施控制,因此,个人防火墙通常是分组过滤器。
    无状态分组过滤器:只根据单个IP分组携带的信息确定是否过滤掉该IP。
    有状态分组过滤器:不仅根据IP分组携带的信息,而且还根据IP分组所属的会话的状态确定是否过滤掉该IP分组。
    网络防火墙
    通常位于内网和外网之间的连接点,对内网中的资源实施保护。目前作为网络防火墙的主要是:分组过滤器、电路层代理、应用层网关。

    分组过滤器:根据用户指定的安全策略对内网和外网之间传输的信息流实施控制,它对信息流的发送端和接收端是透明的,所以分组过滤器的存在不需要改变终端访问网络的方式。

    电路层代理:终端先向电路代理层请求建立TCP连接,电路层代理在完成对终端用户的身份鉴别后,和服务器建立TCP连接,并将这两个TCP绑定在一起。

    应用层网关:对应用层数据进行内容安全检查,应用层各字段值是否正确?请求消息和响应消息是否匹配?文件内容是否包含进制传播的非法内容或病毒?

    防火墙功能—防火墙的功能主要包含以下几个:
    1.服务控制:只允网络间相互交换和特定服务相关的信息流
    2.方向控制:只允许网络之间交换与由属于某个特定网络的终端发起的3特定服务相关的信息流。
    3.用户控制:不同网络之间只允许传输与授权访问用户合法访问网络资源相关的信息流。
    4.行为控制:不同网络只允许传输与行为合理的网络资源访问过程相关的信息流。

    防火墙的局限性:
    1.无法防御网络内部终端发起的攻击。
    2.不能阻止病毒的传播。
    3.无法防御利用防火墙安全策略允许的信息传输过程实施的攻击

    二、分组过滤器

    2.1无状态分组过滤器和有状态分组过滤器的区别:

    无状态分组过滤器:只根据单个IP分组携带的信息确定是否过滤掉该IP。(基于单个IP分组)
    有状态分组过滤器:不仅根据IP分组携带的信息,而且还根据IP分组所属的会话的状态确定是否过滤掉该IP分组。(基于一个会话)
    分组过滤器一般指无状态分组过滤器,通过制定规则对每一个IP分组的单向传输过程独立进行控制,但实际应用中常常需要针对某个服务相关的一组IP分组的传输过程实施控制,这种控制过程一是双向的,完成服务过程中需要双向传输IP分组;二是相关性,同一传输方向,不同顺序的IP分组之间存在相关性,两个不同传输方向的IP分组之间存在相关性,这种情况下的IP分组传输过程需要有状态分组过滤器机制实施控制。

    判别是否是响应消息的依据:一是响应消息是属于终端A发起建立的与Web服务器之间的TCP连接的TCP报文,即TCP报文的源和目的IP地址、源和目的端口号等于标识该TCP连接的两端插口。二是响应消息和终端A发送给Web服务器的请求消息存在相关性,即如果终端A发送的是建立TCP连接请求报文,则响应消息是同意建立TCP连接的响应报文,如果终端A发送的是HTTP请求报文,则响应消息是HTTP响应报文。这就意味着终端A至Web服务器传输方向可以通过无状态分组过滤器实现允许与终端A访问Web服务器相关的TCP报文正常转发的访问控制,而Web服务器至终端A方向的过滤规则必须根据当前TCP连接状态和终端A刚发送给Web服务器的请求报文的内容动态设置,这就是有状态分组过滤器的本质含义。

    无分组状态过滤器
    过滤规则:由一组属性值(源IP地址、目的IP地址、源和目的端口号、协议类型)和操作组成,如果某个IP分组携带的信息和构成规则的一组属性值匹配,意味着该IP分组和规则匹配,对该IP分组实施规则制定的操作。
    过滤规则格式:
    协议类型= ,源IP地址= ,源端口号= ,目的IP地址= ,目的端口号= ;操作。
    两种过滤规则集设置方法:
    1)黑名单—是列出所有禁止传输的IP分组类型,没有明确禁止的IP分组类型都是允许传输的。
    2)白名单—是列出所有允许传输的IP分组类型,没有明确允许的IP分组类型都是禁止传播的。

    路由器R1接口1输入方向的过滤规则集如下:
    协议类型=TCP,源IP地址=192.1.1.1/32,源端口号=,目的IP地址=192.1.2.7/32,目的端口号=80;正常转发。
    协议类型=TCP,源IP地址=192.1.1.7/32,源端口号=21,目的IP地址=192.1.2.1/32,目的端口号=
    ;正常转发。
    协议类型=TCP,源IP地址=192.1.1.7/32,源端口号=20,目的IP地址=192.1.2.1/32,目的端口号=;正常转发。
    协议类型=
    ,源IP地址=any,目的IP地址=any;丢弃。
    路由器R2接口2输入方向的过滤规则集如下:
    协议类型=TCP,源IP地址=192.1.2.1/32,源端口号=,目的IP地址=192.1.1.7/32,目的端口号=21;正常转发。
    协议类型=TCP,源IP地址=192.1.2.1/32,源端口号=
    ,目的IP地址=192.1.2.1/32,目的端口号=20;正常转发。
    协议类型=TCP,源IP地址=192.1.2.7/32,源端口号=80,目的IP地址=192.1.2.1/32,目的端口号=;正常转发。
    协议类型=
    ,源IP地址=any,目的IP地址=any;丢弃。

    有状态分组过滤器:
    引出有状态分组过滤器的原因:
    但上述过滤规则中直接允许Web服务器发送的、源端口号为80的TCP报文沿着Web服务器至终端A方向传输
    一是只允许由终端A发起建立与Web服务器之间的TCP连接;
    二是没有规定这种传输过程必须在由终端A发起建立与Web服务器之间的TCP连接后进行,也就是没有作用顺序限制;
    三是由于需要用两端插口标识TCP连接,因此,上述过滤规则并没有明确指出只有属于由终端A发起建立与Web服务器之间的TCP连接的TCP报文才能沿着Web服务器至终端A方向传输。

    针对上述三点,有状态分组过滤器的工作原理如下:
    1.终端A至Web服务器传输方向上的过滤规则允许终端A传输与终端A发起访问Web服务器的操作有关的TCP报文;
    2.初始状态下,Web服务器至终端A传输方向上的过滤规则拒绝一切IP分组;
    3.只有当终端A至Web服务器传输方向上传输了与终端A发起访问Web服务器的操作相关的TCP报文之后,Web服务器至终端A传输方向才允许传输作为对应的响应报文的TCP报文。

    有状态分组过滤器根据功能分为会话层和应用层两种类型的有状态分组过滤器。这里的会话层是指分组过滤器检查信息的深度限于与会话相关的信息,与OSI体系结构的会话层没有关系。应用层是指分组过滤器检查信息的深度涉及应用层协议数据单元(PDU)中有关的字段。
    1)会话层有状态分组过滤器
    一个方向配置允许发起创建某个会话的IP分组传输的过滤规则集。创建会话之后,所有属于该会话的报文可以从两个方向传输。也就是说一旦终端A发出请求建立与Web服务器之间的TCP请求报文,路由器R1在会话表中创建一个会话;创建该会话之后,所有属于该会话的TCP报文允许经过路由器R1接口1输入输出。
    除了TCP会话(用TCP连接两端端口号标识会话),还可以是UDP会话(用报文两端端口号标识会话)、ICMP会话(用报文两端地址、请求报文标识符和序号标识会话)
    2)应用层有状态分组过滤器
    它与会话层分组过滤器有以下不同:
    1.应用层有状态分组过滤器需要分析应用层协议数据单元,所以过滤规则中要指定应用层协议。
    2.一个方向需要配置允许传输请求报文的过滤规则,另一个方向自动生成允许传输该请求报文对应响应报文的过滤规则。
    3.应用层检查响应报文与请求报文的相关性。

    展开全文
  • 防火墙的基本概念 防火墙是指能够隔离组织内部网络与公共互联网,允许某些分组通过,...1.无状态分组过滤器 基于特定的规则对分组是通过还是丢弃进行决策 2.有状态分组过滤器 跟踪每个TCP连接建立、拆除,...

     

    防火墙的基本概念

     

    防火墙是指能够隔离组织内部网络与公共互联网,允许某些分组通过,而阻止其他分组进入或离开内部网络的软件、硬件或者软件硬件结合的一种设施。

    前提是从外部到内部和从内部到外部的所有流量都经过防火墙。

     

     

    防火墙分类

     

    1.无状态分组过滤器

    基于特定的规则对分组是通过还是丢弃进行决策

     

    2.有状态分组过滤器

    跟踪每个TCP连接建立、拆除,根据状态确定是否允许分组通过

     

    3.应用网关

    鉴别用户身份或针对授权用户开放特定服务

     

     

    入侵检测系统IDS

     

    入侵检测系统(IDS)是当观察到潜在的恶意流量时,能够产生警告的设备或系统

     

     

     

    展开全文
  • 1.无状态分组过滤器 典型部署在内部网络网络边缘路由器上的防火墙, 路由器逐个检查数据报,根据访问控制表(Access Control Lists ,ACL)实现防火墙规则。 2.有状态分组过滤器 跟踪每个TCP连接建立、 拆除, ...

    1. 防火墙基本概念

    防火墙是能够隔离组织内部网络与公共互联网, 允许某些分组通过, 而阻止其他分组进入或离开内部网络的软件、 硬件或者软件硬件结合的一种设施。

     

    2. 防火墙分类

    1. 无状态分组过滤器

    典型部署在内部网络和网络边缘路由器上的防火墙, 路由器逐个检查数据报,根据访问控制表(Access Control Lists ,ACL)实现防火墙规则。

    2. 有状态分组过滤器

    跟踪每个TCP连接建立、 拆除, 根据状态确定是否允许分组通过。

    3. 应用网关

    应用网关实现授权用户通过网关访问外部网络的服务。

     

    3. 入侵监测系统IDS

    入侵检测系统(Intrusion Detection System,IDS)是当观察到潜在的恶意流量时, 能够产生警告的设备或系统。

    展开全文
  • 2、修正客户端花屏问题 (本地经过数天测试问题) [2011-07-20] 1、修正部分物品设置名称颜色值无效问题 2、修正客户端报错闪屏问题 3、修正脚本命令 CHECKRANGEMONCOUNT 检测地图范围 4、更新装备到期时间显示...
  • 客户端数据绑定支持 服务器端数据绑定支持 带可观察对象的实时数据源支持 单列多列排序 单值多值过滤 自定义布局模板支持 具有会话或本地存储模式的数据表状态持久性 客户端服务器端分页 单,多单切换行...
  • stormTrident

    2019-07-03 20:11:19
    如果您熟悉Pig或Cascading等高级批处理工具,Trident的概念将非常熟悉 - Trident具有连接,聚合,分组,功能和过滤器。除此之外,Trident还添加了基元,用于在任何数据库或持久性存储之上执行有状态的增量处理...

    Storm Trident

    Trident是一个高级抽象,用于在Storm之上进行实时计算。它允许您无缝混合高吞吐量(每秒数百万条消息),有状态流处理和低延迟分布式查询。如果您熟悉Pig或Cascading等高级批处理工具,Trident的概念将非常熟悉 - Trident具有连接,聚合,分组,功能和过滤器。除此之外,Trident还添加了基元,用于在任何数据库或持久性存储之上执行有状态的增量处理。 Trident具有一致,exactly-once的语义,因此很容易推理Trident拓扑。

    Trident Kafka集成

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
    
    //工具类   获取KafkaSpout(用作lowlevel输入)和KafkaTridentSpoutOpaque(用作trident不透明事务输入)
    public class KafkaSpoutUtils {
        public static KafkaSpout<String, String> buildKafkaSpout(String boostrapServers, String topic){
    //连接参数
            KafkaSpoutConfig<String,String> kafkaspoutConfig=KafkaSpoutConfig.builder(boostrapServers,topic)
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")       //反序列化
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                //指定组id
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"g1")
                //关闭空值读取(默认关闭)
                    .setEmitNullTuples(false)
                //设置订阅后产生的数据才可以读取
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                //设置数据至少正常消费一次
                    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
                    .setMaxUncommittedOffsets(10)//一旦分区积压有10个未提交offset,Spout停止poll数据,解决Storm背压问题
    
                    .build();
            return new KafkaSpout<String, String>(kafkaspoutConfig);
        }
        //可以保证精准一次更新,推荐使用
        public static KafkaTridentSpoutOpaque<String,String> buildKafkaSpoutOpaque(String boostrapServers, String topic){
            KafkaTridentSpoutConfig<String, String> kafkaOpaqueSpoutConfig = KafkaTridentSpoutConfig.builder(boostrapServers, topic)
                    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG,"g1")
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                //指定数据如何对接(即:record->tuple)
                    .setRecordTranslator(new Func<ConsumerRecord<String, String>, List<Object>>() {
                        public List<Object> apply(ConsumerRecord<String, String> record) {
                            return new Values(record.key(),record.value(),record.timestamp());
                        }
                    },new Fields("key","value","timestamp"))
                    .build();
            return new KafkaTridentSpoutOpaque<String, String>(kafkaOpaqueSpoutConfig);
        }
    }
    
    public static void main(String[] args) throws Exception {
        TridentTopology tridentTopology=new TridentTopology();
    
        tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                .peek((TridentTuple input) ->{
                    System.out.println(input);
                });
    
        new LocalCluster().submitTopology("tridentTopology",new Config(),tridentTopology.build());
    }
    

    常见算子介绍

    Map算子

    将一个Tuple转换为另外一个Tuple,如果用户修改了Tuple元素的个数,需要指定输出的Fields

    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                    .map((tuple)-> new Values("Hello~"+tuple.getStringByField("value")),new Fields("name"))
                    .peek((tuple) -> System.out.println(tuple));
    

    Filter

    过滤上游输入的Tuple将满足条件的Tuple向下游输出。

     tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
         //BaseFilter为抽象类不能使用lamda表达式
         .filter(new Fields("value"), new BaseFilter() {
             @Override
             public boolean isKeep(TridentTuple tuple) {
                 System.out.println(tuple.getFields());
                 return !tuple.getStringByField("value").contains("error");
             }
         })
         .peek((tuple) -> System.out.println(tuple));
    

    flatMap

    将一个Tuple,转换为多个Tuple,如果修改了Tuple的数目,需要指定输出的Fields

     tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                    .flatMap((tuple)->{
                        List<Values> list=new ArrayList<>();
                        String[] tokens = tuple.getStringByField("value").split("\\W+");
                        for (String token : tokens) {
                            list.add(new Values(token));
                        }
                        return list;
                    },new Fields("word"))
                    .peek((tuple) -> System.out.println(tuple));
    

    each

    参数传递可以是BaseFunction(添加fields)和BaseFilter(等价于Filter)

    • basefunction
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
        .each(new Fields("value"), new BaseFunction() {
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                collector.emit(new Values(tuple.getStringByField("value")));
            }
        }, new Fields("other"))
        .peek((tuple) -> System.out.println(tuple));
    
    • baseFilter
    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
        .each(new Fields("value"), new BaseFilter() {
            @Override
            public boolean isKeep(TridentTuple tuple) {
                return !tuple.getStringByField("value").contains("error");
            }
        })
        .peek((tuple) -> System.out.println(tuple));
    

    project

    投影/过滤Tuple中无用field

    tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
        .project(new Fields("value","timestamp"))
        .peek((tuple) -> System.out.println(tuple));
    

    ### 分区和聚合(无状态)

    public class KafkaTridentTopology {
        public static void main(String[] args) throws Exception {
            TridentTopology tridentTopology=new TridentTopology();
    
            tridentTopology.newStream("KafkaSpoutOpaque",KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092","topic01"))
                .parallelismHint(3)
                .project(new Fields("value"))
                .flatMap((tuple)-> {
                    List<Values> list=new ArrayList<>();
                    String[] tokens = tuple.getStringByField("value").split("\\W+");
                    for (String token : tokens) {
                        list.add(new Values(token));
                    }
                    return list;
                },new Fields("word"))
                .map((tuple)->new Values(tuple.getStringByField("word"),1),new Fields("key","count"))
                .partition(new PartialKeyGrouping(new Fields("key")))
                .parallelismHint(5)
                //分区聚合
                .partitionAggregate(new Fields("key","count"),new CountAggregater(),new Fields("word","total"))
                .peek((tuple) -> System.out.println(tuple));
            new LocalCluster().submitTopology("tridentTopology",new Config(),tridentTopology.build());
        }
    }
    

    CountAggregater

    public class CountAggregater extends BaseAggregator<Map<String,Integer>> {
        @Override
        public Map<String, Integer> init(Object batchId, TridentCollector collector) {
            return new HashMap<>();
        }
    
        @Override
        public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
            String word = tuple.getStringByField("key");
            Integer count=tuple.getIntegerByField("count");
    
    
            if(val.containsKey(word)){
                count= val.get(word)+count;
            }
            val.put(word,count);
        }
    
        @Override
        public void complete(Map<String, Integer> val, TridentCollector collector) {
            for (Map.Entry<String, Integer> entry : val.entrySet()) {
                collector.emit(new Values(entry.getKey(),entry.getValue()));
            }
            val.clear();
        }
    }
    
    

    状态管理

    实时计算要解决的一个关键问题是如何管理状态,以便在面对故障和重试时更新是幂等的。消除故障是不可能的,因此当节点死亡或出现其他问题时,需要重试批次。问题是 - 如何进行状态更新(无论是外部数据库还是拓扑内部的状态),以便每条消息只处理一次? 这是一个棘手的问题,可以通过以下示例进行说明。假设您正在对流进行计数聚合,并希望将运行计数存储在数据库中。如果您只在数据库中存储计数并且是时候为批处理应用状态更新,则无法知道您之前是否应用了该状态更新。可以在之前尝试批处理,成功更新数据库,然后在稍后的步骤中失败。或者之前可能已尝试批处理并且无法更新数据库。你只是不知道。

    Trident通过做两件事来解决这个问题:

    • 每个批次都有一个称为“transaction ID”的唯一ID。如果重试批次,它将具有完全相同的事务ID。
    • 批次之间有序的状态更新。也就是说,在批处理2的状态更新成功之前,将不会应用批处理3的状态更新。

    使用这两个原则,您可以使用状态更新实现一次性语义。您可以做的不是将计数存储在数据库中,而是将事务ID与数据库中的计数存储为原子值。然后,在更新计数时,您只需将数据库中的事务ID与当前批次的事务ID进行比较。如果它们相同,则跳过更新 - 由于强大的排序,您确定数据库中的值包含当前批次。如果它们不同,则增加计数。

    当然,您不必在拓扑中手动执行此逻辑。这个逻辑由State抽象包装并自动完成。您的State对象也不需要实现事务ID技巧:如果您不想支付在数据库中存储事务ID的成本,则您不必这样做。在这种情况下,状态将在失败的情况下具有至少一次处理的语义(对于您的应用程序可能没问题)

    事务状态管理

    • 事务state
      • 1、失败事务ID不变且事务中的Tuple不变
      • 2、保证状态更新严格有序
    key - [txid,value]
    

    txid不同 更新value、txid相同跳过

    • Opaque 事务
      • 1、失败事务ID不变,但是同一个批次的tuple可以不同(允许分区数据丢失)
      • 2、保证状态更新严格有序
    public class TransactionalValue<T> {
        T val;
        Long txid;
        ...
    }
    
    key - [txid,prevValue,value]
    
    public class OpaqueValue<T> {
        Long currTxid;
        T prev;
        T curr;
        ...
    }
    

    统计案例

    public class KafkaTridentTopology {
        public static void main(String[] args) throws Exception {
            TridentTopology tridentTopology = new TridentTopology();
    
    
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
                    .setHost("CentOSA")
                    .setPort(6379)
                    .build();
            Options<OpaqueValue> options=new Options<OpaqueValue>();
            options.dataTypeDescription=new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,"mapstate");
            options.serializer=new JSONOpaqueSerializer();
    
    
            tridentTopology.newStream("KafkaSpoutOpaque", KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
                //筛选读取的数据(这里只读取value属性)
                    .project(new Fields("value"))
                    .flatMap((tuple) -> {
                        List<Values> list = new ArrayList<>();
                        String[] tokens = tuple.getStringByField("value").split("\\W+");
                        for (String token : tokens) {
                            list.add(new Values(token));
                        }
                        return list;
                    }, new Fields("word"))
                    .groupBy(new Fields("word"))
                //汇总聚合
                    .persistentAggregate(RedisMapState.opaque(jedisPoolConfig,options),new Fields("word"),new Count(),new Fields("count"))
                //将tridentstate转换成stream
                    .newValuesStream()
                    .peek(new Consumer() {
                        @Override
                        public void accept(TridentTuple input) {
                            System.out.println(input);
                        }
                    });
    
            Config config = new Config();
    
            new LocalCluster().submitTopology("tridentTopology", config, tridentTopology.build());
        }
    }
    

    自定义State

    • 写一个State的实现
    public class RedisIpState implements State {
    
        @Override
        public void beginCommit(Long txid) {
    
        }
        @Override
        public void commit(Long txid) {
    
        }
        //批量查询 stateQuery
         public List<String> bachRetrive(List<TridentTuple> tuples) {
             
         }
        //批量修改 partitionPersisit
        public void batchUpdate(List<TridentTuple> tuples) {
    
        }
    }
    
    • 写一个StateFactory
    public class MyStateFactory implements StateFactory{
         @Override
        public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new RedisIpState(jedisPoolConfig);
        }
    }
    

    stateQuery

    public class IpQueryFunction extends BaseQueryFunction<RedisIpState, String> {
        @Override
        public List<String> batchRetrieve(RedisIpState state, List<TridentTuple> tuples) {
            return state.bachRetrive(tuples);
        }
    
        @Override
        public void execute(TridentTuple tuple, String lastIp, TridentCollector collector) {
            System.out.println(tuple);
            collector.emit(new Values(lastIp));
        }
    }
    
    TridentState ipstate = tridentTopology.newStaticState(MyStateFactory实例);
    
    
    tridentTopology.newStream("KafkaSpoutOpaque", KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
        .project(new Fields("value"))
        .map((tuple)-> {
            String value = tuple.getStringByField("value");
            String[] tokens = value.split("\\s+");
            return new Values(tokens[1],tokens[4] );
        },new Fields("userid","ip"))
        .stateQuery(ipstate,new Fields("userid","ip"),new IpQueryFunction(),new Fields("hip"))
        .peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                System.out.println(input);
            }
        });
    

    partitionPersist

    public class UserIPSateUpdater extends BaseStateUpdater<RedisIpState> {
        @Override
        public void updateState(RedisIpState state, List<TridentTuple> tuples, TridentCollector collector) {
            state.batchUpdate(tuples);
        }
    }
    
    
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder().
            setHost("CentOSA")
            .setPort(6379).build();
    
    //INFO 001 2019:10:10 10:00:00 1.202.251.26
    tridentTopology.newStream("KafkaSpoutOpaque", KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
                    .project(new Fields("value"))
                    .map((tuple)-> {
                        String value = tuple.getStringByField("value");
                        String[] tokens = value.split("\\s+");
                        return new Values(tokens[1],tokens[4] );
                    },new Fields("userid","ip"))
                    .partitionPersist(MyStateFactory实例,
                            new Fields("userid","ip"),new UserIPSateUpdater(),new Fields());
    Config config = new Config();
    

    窗口

    • SlidingWindow
    public class TridentWindowDemo {
        public static void main(String[] args) throws Exception {
            TridentTopology tridentTopology = new TridentTopology();
    
            tridentTopology.newStream("KafkaSpoutOpaque",
                    KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
                    .project(new Fields("value"))
                    .flatMap((tuple) -> {
                        List<Values> list = new ArrayList<>();
                        String[] tokens = tuple.getStringByField("value").split("\\W+");
                        for (String token : tokens) {
                            list.add(new Values(token));
                        }
                        return list;
                    }, new Fields("word"))
                    .map((tuple) -> new Values(tuple.getStringByField("word"), 1), new Fields("key", "count"))
                    .slidingWindow(
                            BaseWindowedBolt.Duration.seconds(10),
                            BaseWindowedBolt.Duration.seconds(5),
                            new InMemoryWindowsStoreFactory(),
                            new Fields("key","count"),
                            new WordCountAggregator(),
                            new Fields("key","total")
                    )
                    .peek(new Consumer() {
                        @Override
                        public void accept(TridentTuple input) {
                            System.out.println(input);
                        }
                    });
    
            new LocalCluster().submitTopology("aa",new Config(),tridentTopology.build());
        }
    }
    
    • TridentTopology
    TridentTopology tridentTopology = new TridentTopology();
    
    WindowConfig wc= SlidingDurationWindow.of(BaseWindowedBolt.Duration.seconds(10),
            BaseWindowedBolt.Duration.seconds(5));
    
    tridentTopology.newStream("KafkaSpoutOpaque",
            KafkaSpoutUtils.buildKafkaSpoutOpaque("CentOSA:9092,CentOSB:9092,CentOSC:9092", "topic01"))
            .project(new Fields("value"))
            .flatMap((tuple) -> {
                List<Values> list = new ArrayList<>();
                String[] tokens = tuple.getStringByField("value").split("\\W+");
                for (String token : tokens) {
                    list.add(new Values(token));
                }
                return list;
            }, new Fields("word"))
            .map((tuple) -> new Values(tuple.getStringByField("word"), 1), new Fields("key", "count"))
            .window(wc,
                    new InMemoryWindowsStoreFactory(),
                    new Fields("word","count"),
                    new WordCountAggregator(),
                    new Fields("word","total")
            )
            .peek(new Consumer() {
                @Override
                public void accept(TridentTuple input) {
                    System.out.println(input);
                }
            });
    
    new LocalCluster().submitTopology("aa",new Config(),tridentTopology.build());
    

    Storm对象序列化

    • Storm-1.x版本

    在storm中流动的数据流格式可以多种多样,数据流可以以各种格式的形式在task之间进行传递。因为storm中可以对数据格式自动进行序列化,但是也只是对于一些常见格式能进行序列化,其中包括int, short, long, float, double, bool, byte, string, byte arrays,也就是说用户可以直接在task之间传递这些类型而不需要做其它的操作,但是对于一些其它类型,或是自己定义的一些类型(比如要在task之间传递一个对象格式),就需要自己进行序列化.

    public class UserOrder implements Serializable {
        private Integer userid;
        private String username;
        private String itemname;
        private double cost;
        ...
    }
    
    Config conf = new Config();
    //如果是Storm-1.x版本,需要在Tuple中传递实体类,需要注册改实体类
    //目前测试版本是Storm-2.0.0,不需注册序列化
    conf.registerSerialization(UserOrder.class);
    StormSubmitter.submitTopology("localDemo",conf,tridentTopology.build());
    

    目前如果使用最新的Storm-2.0 用户只需要自定义实体类型实现序列化接口即可,无需注册序列化。

    展开全文
  • Storm Trident

    2019-06-22 14:01:27
    如果您熟悉Pig或Cascading等高级批处理工具,Trident的概念将非常熟悉 - Trident具有连接,聚合,分组,功能和过滤器。除此之外,Trident还添加了基元,用于在任何数据库或持久性存储之上执行有状态的增量处理。 ...
  • 问题1-2:能否说:“电路交换面向连接是等同的,而分组交换和无连接是等同的”? 问题1-3:因特网使用的IP协议是连接的,因此其传输是不可靠的。这样容易使人们感到因特网很不可靠。那么为什么当初不把因特网的...
  • 问题1-2:能否说:“电路交换面向连接是等同的,而分组交换和无连接是等同的”? 问题1-3:因特网使用的IP协议是连接的,因此其传输是不可靠的。这样容易使人们感到因特网很不可靠。那么为什么当初不把因特网的...
  • 静态路由和有类别查找  当路由选择表进程检查一条使用中间地址(路由选择表中作为下一跳引用的IP地址)的可解析的静态路由时,这个检查总是在有类别方式下完成的,无论是否使用ip classless命令如果在路由选择表...
  • TCPIP详解--共三卷

    2015-11-30 17:17:21
    29.6.3 无状态 358 29.6.4 例子:服务器崩溃 358 29.6.5 等幂过程 360 29.7 第3版的NFS 360 29.8 小结 361 第30章 其他的TCP/IP应用程序 363 30.1 引言 363 30.2 Finger协议 363 30.3 Whois协议 364 30.4 Archie、...
  • 6.2 分组框(Group) 80 6.3 选项卡(TabFolder) 81 6.3.1 选项卡的基本构成 81 6.3.2 设置底部显示选项卡 82 6.3.3 设置选项卡图标 82 6.3.4 选项卡的常用方法 83 6.4 自定义选项卡(CTabFolder ) 83...
  • 6.2 分组框(Group) 80 6.3 选项卡(TabFolder) 81 6.3.1 选项卡的基本构成 81 6.3.2 设置底部显示选项卡 82 6.3.3 设置选项卡图标 82 6.3.4 选项卡的常用方法 83 6.4 自定义选项卡(CTabFolder ) 83...
  • 6.2.4 选择地禁用视图状态 6.2.5 视图状态安全 6.3 在页面间传送信息 6.3.1 查询字符串 6.3.2 跨页回发 6.4 cookie 6.5 会话状态 6.5.1 会话架构 6.5.2 使用会话状态 6.5.3 配置会话状态 ...
  • 实例257 利用HAVING语句过滤分组数据 实例258 HAVING语句应用在多表查询中 第8章 SQL嵌入ADO.NET高级应用 8.1 虚拟数据表:视图的应用 实例259 查询视图中的员工工资数据 实例260 获取当前数据库中的全部用户...
  • 第7章 键盘操作和状态栏特效 7.1 按功能键返回首页 7.2 回车实现Tab键功能 7.3 Ctrl+Enter提交数据 7.4 IE中屏蔽退格建(Back Space) 7.5 屏蔽键盘所有键 7.6 JavaScript捕获方向键 7.7 状态栏变化信息 7.8 状态栏...
  • Python Cookbook

    2013-07-31 22:33:26
    5.15 根据姓的首字母将人名排序和分组 214 第6章 面向对象编程 217 引言 217 6.1 温标的转换 223 6.2 定义常量 225 6.3 限制属性的设置 227 6.4 链式字典查询 229 6.5 继承的替代方案-自动托管 231 6.6 在...
  • 6.2.4 选择地禁用视图状态 184 6.2.5 视图状态安全 185 6.3 在页面间传送信息 186 6.3.1 查询字符串 187 6.3.2 跨页回发 188 6.4 cookie 193 6.5 会话状态 194 6.5.1 会话架构 194 6.5.2 使用会话...
  • TCP_IP详解卷1

    热门讨论 2010-12-29 10:53:54
    29.6.3 无状态 358 29.6.4 例子:服务器崩溃 358 29.6.5 等幂过程 360 29.7 第3版的NFS 360 29.8 小结 361 第30章 其他的TCP/IP应用程序 363 30.1 引言 363 30.2 Finger协议 363 30.3 Whois协议 364 30.4 Archie、...
  • 实例030 打开关闭输入法编辑 实例031 使用键盘控制窗体的移动 实例032 虚拟键盘操作 实例033 多功能键盘 第2篇 windows系统开发篇 第3章 获取系统相关信息 3.1 获取计算机系统信息 实例034 获取系统时间 实例035...
  • Visual C++编程技巧精选500例.pdf

    热门讨论 2012-09-01 15:01:50
    033 如何设置文件对话框过滤器? 034 如何设置文件对话框多重选择功能? 035 如何设置文件对话框打开时的目录位置? 036 如何从文件对话框中选择文件夹? 037 如何从文件对话框中新建文件夹? 038 如何在文件对话框中预览...
  • FilmmakerProV11注册机

    2011-12-23 15:00:01
     针对高级的数据库开发人员,FileMaker Pro 11 改进了脚本创建编辑模式,根据计算结果的关系字段进行入口过滤,及创建布局文件夹,通过简单地拖放操作管理多布局。  新的合作方式  FileMaker Pro 11 同时推出了...
  • 18.1.4 过滤器 18.1.5 断言 18.2 事件日志 18.2.1 事件日志体系架构 18.2.2 事件日志类 18.2.3 创建事件源 18.2.4 写入事件日志 18.2.5 资源文件 18.3 性能监控 18.3.1 性能监控类 18.3.2 性能计数器的构建 18.3.3 ...
  • 查看/设置学生教师的 Internet Explorer 网络钓鱼过滤器。 查看/设置学生教师的 Internet Explorer 保护模式策略。 查看学生教师的 NetSupport 防护状态。 编辑远程系统的注册表。 从远程系统在您的 PC 上调用...
  • 查看/设置学生教师的 Internet Explorer 网络钓鱼过滤器。 查看/设置学生教师的 Internet Explorer 保护模式策略。 查看学生教师的 NetSupport 防护状态。 编辑远程系统的注册表。 从远程系统在您的 PC 上调用...
  • 子控件包括饼图+圆环图+曲线图+柱状图+柱状分组图+横向柱状图+横向柱状分组图+合格率控件+百分比控件+进度控件+设备状态面板+表格数据+地图控件(包括动态闪烁点+迁徙图等)+视频控件+其他控件等。 二级界面可以自由...
  • WPF编程宝典 part1

    2015-07-20 23:33:59
    21.2 过滤、排序与分组 588 21.2.1 过滤集合 588 21.2.2 过滤DataTable对象 591 21.2.3 排序 592 21.2.4 分组 593 21.2.5 实时成型 598 21.3 小结 599 第22章 列表、树网格 601 22.1 ListView控件 601 22.1.1 使用...
  • 测试培训教材

    2014-04-01 12:10:48
    视图->筛选/排序->设置筛选/排序 设置排序字段 设置过滤条件 修改需求 拷贝需求项Cruise Reservation 重命名需求项Cruise Reservation_Copy_1为Hotel Reservation 移动需求项到Reservations ...
  • 实例189 通过过滤器控制页面输出内容 实例190 使用过滤器自动生成静态页面 实例191 文件上传过滤器 实例192 权限验证过滤器 7.2 监听器的应用 实例193 监听在线用户 实例194 应用监听器使服务器端免登录 第8...
  • 在ASP.Net课程的一开始,不是直接教学员怎么拖ASP.Net控件进行快速开发,而是通过ashx的模式开发原始的动态网站,让学员明白“请求—处理—响应模型”、“Http协议、Http无状态”、“c#代码渲染生成浏览器端...

空空如也

空空如也

1 2 3 4
收藏数 70
精华内容 28
关键字:

无状态分组过滤器和有状态分组过滤器