精华内容
下载资源
问答
  • 一文入门流处理开发

    万次阅读 2020-05-29 14:30:04
    Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化...

    一、Flink介绍

    Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

    二、部署环境

    操作系统环境:

    flink支持Linux, Mac OS X, 和 Windows环境部署,本次部署选择Linux环境部署。

    JDK:要求Java 7或者更高

    三、下载软件

    jdk1.8.0_144

    flink-1.4.2-bin-hadoop26-scala_2.11.tgz

    四、部署步骤

    1、JDK安装步骤此处省略,安装后验证下JDK环境

    $ java -version
    
    openjdk version "1.8.0_144"
    
    OpenJDKRuntimeEnvironment(build 1.8.0_144-b01)
    
    OpenJDK64-BitServer VM (build 25.144-b01, mixed mode)
    

    2、安装部署flink 本文介绍flink部署分为两种模式:local,standalone。下面依次介绍这两种模式的部署方式。

    找到下载的flink压缩包,进行解压

    $ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz
    

    首先是local模式,最为简单。

    $ cd flink-1.4.2
    
    $ bin/start-local.sh
    
    Starting job manager
    

    我们可以通过查看日志确认是否启动成功

    $ tailf flink-csap-taskmanager-0-XXXX.log
    
    2018-05-0310:07:53,718 INFO  org.apache.flink.runtime.filecache.FileCache- User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c
    
    2018-05-0310:07:53,725 INFO  org.apache.flink.runtime.taskmanager.TaskManager- StartingTaskManager actor at akka://flink/user/taskmanager#-524742300.
    
    2018-05-0310:07:53,725 INFO  org.apache.flink.runtime.taskmanager.TaskManager- TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc@ LY1F-R021707-VM14.local(dataPort=55234)
    
    2018-05-0310:07:53,726 INFO  org.apache.flink.runtime.taskmanager.TaskManager- TaskManager has 1 task slot(s).
    
    2018-05-0310:07:53,727 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
    
    2018-05-0310:07:53,730 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
    
    2018-05-0310:07:53,848 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Successful registration at JobManager(akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.
    
    2018-05-0310:07:53,851 INFO  org.apache.flink.runtime.taskmanager.TaskManager- Determined BLOB server address to be localhost/127.0.0.1:52382.Starting BLOB cache.
    
    2018-05-0310:07:53,858 INFO  org.apache.flink.runtime.blob.PermanentBlobCache- Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b
    
    2018-05-0310:07:53,861 INFO  org.apache.flink.runtime.blob.TransientBlobCache- Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778
    

    JobManager进程将会在8081端口上启动一个WEB页面,我们可以通过浏览器到hostname:8081中查看相关的信息。可以打开页面查看到相关信息,说明local模式部署是没问题的。

    下面来看一下standlone部署方式。

    安装JDK,解压压缩包,都是一样的。不一样的是我们要修改解压后的flink配置文件。然后在集群主机间做免密,免密操作方法。

    修改conf/flink-conf.yaml,我们将jobmanager.rpc.address的值设置成你master节点的IP地址。此外,我们通过jobmanager.heap.mb和taskmanager.heap.mb配置参数来设置每个节点的JVM能够分配的最大内存。从配置参数名字可以看出,这个参数的单位是MB,如果某些节点拥有比你之前设置的值更多的内存时,我们可以在那个节通过FLINKTMHEAP参数类覆盖值钱的设置。

    我们需要把所有将要作为worker节点的IP地址存放在conf/slaves文件中,在conf/slaves文件中,每个IP地址必须放在一行,如下:

    192.168.0.100
    
    192.168.0.101
    
    .
    
    .
    
    .
    
    192.168.0.150
    

    然后将修改好的flink包整理复制到集群各个节点。每个节点flink路径保持一致。然后启动集群

    $ bin/start-cluster.sh
    

    查看日志是否成功。

    以上是部署方法,部署成功后,我们来跑一个demo程序,验证一下Flink的流处理功能,对其有个初步的了解。

    flink为了更好的让大家理解,已经给大家提供了一些demo代码,demo的jar包可以在/examples/streaming首先看一下demo代码:

    objectSocketWindowWordCount{
    
    
    
    def main(args: Array[String]) : Unit= {
    
    
    
    // the port to connect to
    
            val port: Int= try{
    
    ParameterTool.fromArgs(args).getInt("port")
    
    } catch{
    
    case e: Exception=> {
    
    System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
    
    return
    
    }
    
    }
    
    
    
    // get the execution environment
    
            val env: StreamExecutionEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
    
    
    
    // 
    get input data by connecting to the socket
    
            val text = env.socketTextStream(
    "localhost"
    , port, 
    '\n'
    )
    
    
    
    // parse the data, group it, window it, and aggregate the counts
    
            val windowCounts = text
    
    .flatMap { w => w.split(
    "\\s"
    ) }
    
    .map { w => 
    WordWithCount
    (w, 
    1
    ) }
    
    .keyBy(
    "word"
    )
    
    .timeWindow(
    Time
    .seconds(
    5
    ), 
    Time
    .seconds(
    1
    ))
    
    .sum(
    "count"
    )
    
    
    
    // print the results with a single thread, rather than in parallel
    
            windowCounts.
    print
    ().setParallelism(
    1
    )
    
    
    
            env.execute(
    "Socket Window WordCount"
    )
    
    }
    
    
    
    // Data type for words with count
    
    caseclass
    WordWithCount
    (word: 
    String
    , count: 
    Long
    )
    
    }
    

    这个demo是监控端口,然后对端口输入单子进行wordcount的程序。

    运行demo,首先打开一个窗口进行端口数据输入:

    $ nc -l 9001
    
    hello
    
    hello
    
    word
    
    world
    

    然后运行demo监控端口单词输入统计:

    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001
    

    运行后可以看到结果统计:

    $ more flink-csap-taskmanager-0-XXX.out.1
    
    hello : 1
    
    hello : 1
    
    word : 1
    
    world : 1
    

    五、IEDA开发环境搭建

    1、安装java环境
    此处略去,这个你已经会了~

    2、安装maven
    参考Maven安装与配置

    3、配置IDEA
    参考如何使用IntelliJ IDEA 配置Maven
    4、pom文件设置

    </properties>
    
    <dependencies>
    
    <dependency>
    
    <groupId>org.scala-lang</groupId>
    
    <artifactId>scala-library</artifactId>
    
    <version>${scala.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-java</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-table_${scala.binary.version}</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    
    <groupId>org.apache.flink</groupId>
    
    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
    
    <version>${flink.version}</version>
    
    </dependency>
    
    <dependency>
    

    5、代码示例

    import org.apache.flink.api.common.functions.FlatMapFunction;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import org.apache.flink.util.Collector;
    
    
    
    /**
    
     * Author: qincf
    
     * Date: 2018/11/02
    
     * Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
    
     *       先在目标主机1.1.1.1机器上执行nc -l 9000
    
     */
    
    publicclassStreamingWindowWordCount{
    
    publicstaticvoid main(String[] args) throwsException{
    
    //定义socket的端口号
    
    int port;
    
    try{
    
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
                port = parameterTool.getInt("port");
    
    }catch(Exception e){
    
    System.err.println("没有指定port参数,使用默认值9000");
    
                port = 9000;
    
    }
    
    //获取运行环境
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    //连接socket获取输入的数据
    
    DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");
    
    //计算数据
    
    DataStream<WordWithCount> windowCount = text.flatMap(newFlatMapFunction<String, WordWithCount>() {
    
    publicvoid flatMap(String value, Collector<WordWithCount> out) throwsException{
    
    String[] splits = value.split("\\s");
    
    for(String word:splits) {
    
    out.collect(newWordWithCount(word,1L));
    
    }
    
    }
    
    })//打平操作,把每行的单词转为<word,count>类型的数据
    
    //针对相同的word数据进行分组
    
    .keyBy("word")
    
    //指定计算数据的窗口大小和滑动窗口大小
    
    .timeWindow(Time.seconds(2),Time.seconds(1))
    
    .sum("count");
    
    //获取可视化JSON
    
    System.out.println(env.getExecutionPlan());
    
    //把数据打印到控制台,使用一个并行度
    
            windowCount.print().setParallelism(1);
    
    //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
    
            env.execute("streaming word count");
    
    
    
    
    
    
    
    }
    
    
    
    /**
    
         * 主要为了存储单词以及单词出现的次数
    
         */
    
    publicstaticclassWordWithCount{
    
    publicString word;
    
    publiclong count;
    
    publicWordWithCount(){}
    
    publicWordWithCount(String word, long count) {
    
    this.word = word;
    
    this.count = count;
    
    }
    
    
    
    @Override
    
    publicString toString() {
    
    return"WordWithCount{"+
    
    "word='"+ word + '\''+
    
    ", count="+ count +
    
    '}';
    
    }
    
    }
    
    
    
    }
    

    6、测试步骤

    首先在1.1.1.1机器上使用nc命令模拟数据发送

    nc -l 
    9000
    

    然后在IEDA中运营StreamingWindowWordCount程序 在主机上输入字符

    [root@data01]# nc -l 9000
    
    a
    
    a
    
    b
    
    c
    
    d
    
    d
    

    此时运行程序后,IDEA中会打印处结果

    WordWithCount{word='a', count=1}
    
    WordWithCount{word='a', count=2}
    
    WordWithCount{word='b', count=1}
    
    WordWithCount{word='d', count=1}
    
    WordWithCount{word='c', count=1}
    
    WordWithCount{word='c', count=1}
    
    WordWithCount{word='a', count=1}
    
    WordWithCount{word='d', count=1}
    
    WordWithCount{word='b', count=1}
    

    大家会看到,wordcount的结果。仔细看还有一串json输出,这部分是什么呢?代码中加了一个打印执行计划的部分:

    /获取可视化JSON
    
    System.out.println(env.getExecutionPlan());
    

    Flink提供了一个可视化执行计划的结果,类似Spark的DAG图,把json粘贴到Flink Plan Visualizer可以看到执行计划图:

    现在你已经搭建好Flink开发环境了,可以开启你的流处理旅程了,更多教程可以参考Flink官网。

    展开全文
  • NiFi流处理引擎

    千次阅读 2017-10-24 19:26:10
    有特点的流处理引擎NiFi 流处理不止有flink、storm、spark streaming,今天介绍一个大家不一定用得很多,但是却很有特点的东西,NiFi。 前面写了flink的文章,其实流处理不止有flink、storm、spark ...

    有特点的流处理引擎NiFi

    流处理不止有flink、storm、spark streaming,今天介绍一个大家不一定用得很多,但是却很有特点的东西,NiFi。

    前面写了flink的文章,其实流处理不止有flink、storm、spark streaming,说实话这些其实都是比较传统的流处理框架。今天介绍一个大家不一定用得很多,但是却很有特点的东西,NiFi。

    NiFi的来源

    Apache NiFi项目,它是一种实时数据流处理 系统,在去年由美国安全局(NSA)开源并进入Apache社区,NiFi初始的项目名称是Niagarafiles。当NiFi项目开源之后,一些早先在NSA的开发者们创立了初创公司Onyara,Onyara随之继续NiFi项目的开发并提供相关的支持。Hortonworks公司收购了Onyara并将其开发者整合到自己的团队中,形成HDF(Hortonworks Data Flow)平台。

    NiFi的特点

    下面是官方的一些关键能力介绍,可以认真看看:

    Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:

    • Web-based user interface
    • Seamless experience between design, control, feedback, and monitoring
    • Highly configurable
    • Loss tolerant vs guaranteed delivery
    • Low latency vs high throughput
    • Dynamic prioritization
    • Flow can be modified at runtime
    • Back pressure
    • Data Provenance
    • Track dataflow from beginning to end
    • Designed for extension
    • Build your own processors and more
    • Enables rapid development and effective testing
    • Secure
    • SSL, SSH, HTTPS, encrypted content, etc...
    • Multi-tenant authorization and internal authorization/policy management

    总结来说,做为一个流处理引擎,NiFi的核心差异化能力主要有两点:

    丰富的算子

    整合了大量数据源的处理能力,详细的可以登录nifi官网

    (https://nifi.apache.org/docs.html)详细看各个算子的能力,下面列一列算子,让大家有个感觉,,还是相当丰富的。

    Processors

    • AttributeRollingWindow 1.3.0
    • AttributesToJSON 1.3.0
    • Base64EncodeContent 1.3.0
    • CaptureChangeMySQL 1.3.0
    • CompareFuzzyHash 1.3.0
    • CompressContent 1.3.0
    • ConnectWebSocket 1.3.0
    • ConsumeAMQP 1.3.0
    • ConsumeEWS 1.3.0
    • ConsumeIMAP 1.3.0
    • ConsumeJMS 1.3.0
    • ConsumeKafka 1.3.0
    • ConsumeKafka_0_10 1.3.0
    • ConsumeKafkaRecord_0_10 1.3.0
    • ConsumeMQTT 1.3.0
    • ConsumePOP3 1.3.0
    • ConsumeWindowsEventLog 1.3.0
    • ControlRate 1.3.0
    • ConvertAvroSchema 1.3.0
    • ConvertAvroToJSON 1.3.0
    • ConvertAvroToORC 1.3.0
    • ConvertCharacterSet 1.3.0
    • ConvertCSVToAvro 1.3.0
    • ConvertExcelToCSVProcessor 1.3.0
    • ConvertJSONToAvro 1.3.0
    • ConvertJSONToSQL 1.3.0
    • ConvertRecord 1.3.0
    • CreateHadoopSequenceFile 1.3.0
    • DebugFlow 1.3.0
    • DeleteDynamoDB 1.3.0
    • DeleteGCSObject 1.3.0
    • DeleteHDFS 1.3.0
    • DeleteS3Object 1.3.0
    • DeleteSQS 1.3.0
    • DetectDuplicate 1.3.0
    • DistributeLoad 1.3.0
    • DuplicateFlowFile 1.3.0
    • EncryptContent 1.3.0
    • EnforceOrder 1.3.0
    • EvaluateJsonPath 1.3.0
    • EvaluateXPath 1.3.0
    • EvaluateXQuery 1.3.0
    • ExecuteFlumeSink 1.3.0
    • ExecuteFlumeSource 1.3.0
    • ExecuteProcess 1.3.0
    • ExecuteScript 1.3.0
    • ExecuteSQL 1.3.0
    • ExecuteStreamCommand 1.3.0
    • ExtractAvroMetadata 1.3.0
    • ExtractCCDAAttributes 1.3.0
    • ExtractEmailAttachments 1.3.0
    • ExtractEmailHeaders 1.3.0
    • ExtractGrok 1.3.0
    • ExtractHL7Attributes 1.3.0
    • ExtractImageMetadata 1.3.0
    • ExtractMediaMetadata 1.3.0
    • ExtractText 1.3.0
    • ExtractTNEFAttachments 1.3.0
    • FetchAzureBlobStorage 1.3.0
    • FetchDistributedMapCache 1.3.0
    • FetchElasticsearch 1.3.0
    • FetchElasticsearch5 1.3.0
    • FetchElasticsearchHttp 1.3.0
    • FetchFile 1.3.0
    • FetchFTP 1.3.0
    • FetchGCSObject 1.3.0
    • FetchHBaseRow 1.3.0
    • FetchHDFS 1.3.0
    • FetchParquet 1.3.0
    • FetchS3Object 1.3.0
    • FetchSFTP 1.3.0
    • FuzzyHashContent 1.3.0
    • GenerateFlowFile 1.3.0
    • GenerateTableFetch 1.3.0
    • GeoEnrichIP 1.3.0
    • GetAzureEventHub 1.3.0
    • GetCouchbaseKey 1.3.0
    • GetDynamoDB 1.3.0
    • GetFile 1.3.0
    • GetFTP 1.3.0
    • GetHBase 1.3.0
    • GetHDFS 1.3.0
    • GetHDFSEvents 1.3.0
    • GetHDFSSequenceFile 1.3.0
    • GetHTMLElement 1.3.0
    • GetHTTP 1.3.0
    • GetIgniteCache 1.3.0
    • GetJMSQueue 1.3.0
    • GetJMSTopic 1.3.0
    • GetKafka 1.3.0
    • GetMongo 1.3.0
    • GetSFTP 1.3.0
    • GetSNMP 1.3.0
    • GetSolr 1.3.0
    • GetSplunk 1.3.0
    • GetSQS 1.3.0
    • GetTCP 1.3.0
    • GetTwitter 1.3.0
    • HandleHttpRequest 1.3.0
    • HandleHttpResponse 1.3.0
    • HashAttribute 1.3.0
    • HashContent 1.3.0
    • IdentifyMimeType 1.3.0
    • InferAvroSchema 1.3.0
    • InvokeHTTP 1.3.0
    • InvokeScriptedProcessor 1.3.0
    • ISPEnrichIP 1.3.0
    • JoltTransformJSON 1.3.0
    • ListAzureBlobStorage 1.3.0
    • ListDatabaseTables 1.3.0
    • ListenBeats 1.3.0
    • ListenHTTP 1.3.0
    • ListenLumberjack 1.3.0
    • ListenRELP 1.3.0
    • ListenSMTP 1.3.0
    • ListenSyslog 1.3.0
    • ListenTCP 1.3.0
    • ListenUDP 1.3.0
    • ListenWebSocket 1.3.0
    • ListFile 1.3.0
    • ListFTP 1.3.0
    • ListGCSBucket 1.3.0
    • ListHDFS 1.3.0
    • ListS3 1.3.0
    • ListSFTP 1.3.0
    • LogAttribute 1.3.0
    • LogMessage 1.3.0
    • LookupAttribute 1.3.0
    • LookupRecord 1.3.0
    • MergeContent 1.3.0
    • ModifyBytes 1.3.0
    • ModifyHTMLElement 1.3.0
    • MonitorActivity 1.3.0
    • Notify 1.3.0
    • ParseCEF 1.3.0
    • ParseEvtx 1.3.0
    • ParseSyslog 1.3.0
    • PartitionRecord 1.3.0
    • PostHTTP 1.3.0
    • PublishAMQP 1.3.0
    • PublishJMS 1.3.0
    • PublishKafka 1.3.0
    • PublishKafka_0_10 1.3.0
    • PublishKafkaRecord_0_10 1.3.0
    • PublishMQTT 1.3.0
    • PutAzureBlobStorage 1.3.0
    • PutAzureEventHub 1.3.0
    • PutCassandraQL 1.3.0
    • PutCloudWatchMetric 1.3.0
    • PutCouchbaseKey 1.3.0
    • PutDatabaseRecord 1.3.0
    • PutDistributedMapCache 1.3.0
    • PutDynamoDB 1.3.0
    • PutElasticsearch 1.3.0
    • PutElasticsearch5 1.3.0
    • PutElasticsearchHttp 1.3.0
    • PutElasticsearchHttpRecord 1.3.0
    • PutEmail 1.3.0
    • PutFile 1.3.0
    • PutFTP 1.3.0
    • PutGCSObject 1.3.0
    • PutHBaseCell 1.3.0
    • PutHBaseJSON 1.3.0
    • PutHDFS 1.3.0
    • PutHiveQL 1.3.0
    • PutHiveStreaming 1.3.0
    • PutHTMLElement 1.3.0
    • PutIgniteCache 1.3.0
    • PutJMS 1.3.0
    • PutKafka 1.3.0
    • PutKinesisFirehose 1.3.0
    • PutKinesisStream 1.3.0
    • PutLambda 1.3.0
    • PutMongo 1.3.0
    • PutParquet 1.3.0
    • PutRiemann 1.3.0
    • PutS3Object 1.3.0
    • PutSFTP 1.3.0
    • PutSlack 1.3.0
    • PutSNS 1.3.0
    • PutSolrContentStream 1.3.0
    • PutSplunk 1.3.0
    • PutSQL 1.3.0
    • PutSQS 1.3.0
    • PutSyslog 1.3.0
    • PutTCP 1.3.0
    • PutUDP 1.3.0
    • PutWebSocket 1.3.0
    • QueryCassandra 1.3.0
    • QueryDatabaseTable 1.3.0
    • QueryDNS 1.3.0
    • QueryElasticsearchHttp 1.3.0
    • QueryRecord 1.3.0
    • QueryWhois 1.3.0
    • ReplaceText 1.3.0
    • ReplaceTextWithMapping 1.3.0
    • ResizeImage 1.3.0
    • RouteHL7 1.3.0
    • RouteOnAttribute 1.3.0
    • RouteOnContent 1.3.0
    • RouteText 1.3.0
    • ScanAttribute 1.3.0
    • ScanContent 1.3.0
    • ScrollElasticsearchHttp 1.3.0
    • SegmentContent 1.3.0
    • SelectHiveQL 1.3.0
    • SetSNMP 1.3.0
    • SplitAvro 1.3.0
    • SplitContent 1.3.0
    • SplitJson 1.3.0
    • SplitRecord 1.3.0
    • SplitText 1.3.0
    • SplitXml 1.3.0
    • SpringContextProcessor 1.3.0
    • StoreInKiteDataset 1.3.0
    • TailFile 1.3.0
    • TransformXml 1.3.0
    • UnpackContent 1.3.0
    • UpdateAttribute 1.3.0
    • UpdateCounter 1.3.0
    • UpdateRecord 1.3.0
    • ValidateCsv 1.3.0
    • ValidateXml 1.3.0
    • Wait 1.3.0
    • YandexTranslate 1.3.0

    Controller Services

    • AvroReader 1.3.0
    • AvroRecordSetWriter 1.3.0
    • AvroSchemaRegistry 1.3.0
    • AWSCredentialsProviderControllerService 1.3.0
    • CouchbaseClusterService 1.3.0
    • CSVReader 1.3.0
    • CSVRecordSetWriter 1.3.0
    • DBCPConnectionPool 1.3.0
    • DistributedMapCacheClientService 1.3.0
    • DistributedMapCacheServer 1.3.0
    • DistributedSetCacheClientService 1.3.0
    • DistributedSetCacheServer 1.3.0
    • FreeFormTextRecordSetWriter 1.3.0
    • GCPCredentialsControllerService 1.3.0
    • GrokReader 1.3.0
    • HBase_1_1_2_ClientMapCacheService 1.3.0
    • HBase_1_1_2_ClientService 1.3.0
    • HiveConnectionPool 1.3.0
    • HortonworksSchemaRegistry 1.3.0
    • IPLookupService 1.3.0
    • JettyWebSocketClient 1.3.0
    • JettyWebSocketServer 1.3.0
    • JMSConnectionFactoryProvider 1.3.0
    • JsonPathReader 1.3.0
    • JsonRecordSetWriter 1.3.0
    • JsonTreeReader 1.3.0
    • PropertiesFileLookupService 1.3.0
    • ScriptedLookupService 1.3.0
    • ScriptedReader 1.3.0
    • ScriptedRecordSetWriter 1.3.0
    • SimpleCsvFileLookupService 1.3.0
    • SimpleKeyValueLookupService 1.3.0
    • StandardHttpContextMap 1.3.0
    • StandardSSLContextService 1.3.0
    • XMLFileLookupService 1.3.0

    Reporting Tasks

    • AmbariReportingTask 1.3.0
    • ControllerStatusReportingTask 1.3.0
    • DataDogReportingTask 1.3.0
    • MonitorDiskUsage 1.3.0
    • MonitorMemory 1.3.0
    • ScriptedReportingTask 1.3.0
    • SiteToSiteBulletinReportingTask 1.3.0
    • SiteToSiteProvenanceReportingTask 1.3.0
    • SiteToSiteStatusReportingTask 1.3.0
    • StandardGangliaReporter 1.3.0

    优雅的界面

    就是我个人认为非常不错的界面,可以看到非常详细的数据流向。

    NiFi

    NiFi在Hortonworks的定位

    因为NiFi可以对来自多种数据源的流数据进行处理,Hortonworks认为HDF平台非常适合用于物联网 (IoAT)的数据处理。HDF中的数据流动可以是多个方向,甚至是点对点的,用户可以同收集到的数据流进行交互,这种交互甚至可以延伸到数据源,比如一些传感器或是设备。按照Hortonworks公司的说法,HDF产品是对HDP产品的补充,前者主要处理移动中的数据,而后者基于Hadoop技术,主要负责从静止的数据中获取洞察。可以看一看Hortonworks官方宣传对HDF的定位,已经号称是端到端流数据处理分析。

    Hortonworks DataFlow (HDF) provides the only end-to-end platform that collects, curates, analyzes and acts on data in real-time, on-premises or in the cloud, with a drag-and-drop visual interface. HDF is an integrated solution with Apache Nifi/MiNifi, Apache Kafka, Apache Storm and Druid.

    HDF三大部分

    上图是概要介绍HDF三大部分,The HDF streaming data analytics platform includes data Flow Management, Stream Processing, and Enterprise Services.Nifi是作为数据管理和接入,可以延伸部署到边缘网关的重要能力。

    结语

    如果你的项目中也有同样的对多数据源的处理诉求,NiFi是个不错的选择。


    展开全文
  • 批处理和流处理

    千次阅读 2019-04-22 10:44:05
    目录 1批处理 1.1 Apache Hadoop ...2流处理 2.1Apache Storm 2.1.1 流处理模式 2.1.2 优势和局限 2.1.3Storm组成原理 2.1.4Storm主要的编程概念 2.1.5 总结 2.2 Apache Samza 2.2.1 流处理模式 2....

    目录

    1 批处理

    1.1 Apache Hadoop

    1.1.1 批处理模式:

    1.1.2 优势和局限:

    1.1.3 总结:

    2 流处理

    2.1 Apache Storm

    2.1.1 流处理模式

    2.1.2 优势和局限

    2.1.3 Storm组成原理

    2.1.4 Storm主要的编程概念

    2.1.5 总结

    2.2 Apache Samza

    2.2.1 流处理模式

    2.2.2 优势和局限

    2.2.3 总结

    3 混合处理系统:批处理和流处理

    3.1 Apache Spark

    3.1.1 批处理模式

    3.1.2 流处理模式

    3.1.3 优势和局限

    3.1.4 总结

    3.2.1 流处理模型

    3.2.2 批处理模型

    3.2.3 优势和局限

    3.2.4 总结


            大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性、规模,以及价值在最近几年才经历了大规模扩展。

    1 批处理

             批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。

            Batch Tasks 批任务:你可以将batch任务理解为,已经用一个固定时间间隔(specific time interval)分组的数据点(influxdb中,每条数据被称为a data point,对应一个时间点的状态信息)集合。我们也可以称为这是一个数据窗口(window of data)。那么当我们启动一个批任务,kapacitor会按时间段切分来请求influxdb,这样可以避免在内存中缓存太多数据

          批处理模式中使用的数据集通常符合下列特征:

    • 有界:批处理数据集代表数据的有限集合
    • 持久:数据通常始终存储在某种类型的持久存储位置中
    • 大量:批处理操作通常是处理极为海量数据集的唯一方法

            批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。

            需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析

             大量数据的处理需要付出大量时间,因此批处理不适合对处理时间要求较高的场合。批任务适合以下情况:

    • 需要对一段时间的数据运行聚合函数运算,比如求这些数据点的平均值、最大值、最小值。
    • 警告功能不需要运行每个数据点,或者说这些数据所代表的状态,并不会频繁地变化。(比如某互联网公司的智能车项目,收集到的数据所代表的状态就是频繁变化的,比如精确到度的车辆朝向变化)。
    • 降低采集的频度很大(downsample),庞大的数据中只需要最明显的那部分数据就能说明整体状态的变化。
    • 一点额外的运行延迟不会对你的整体业务产生过大影响。
    • 当集群运行时序数据库时,kapacitor处理数据的速度慢于数据写入的速度。(处理逻辑较为复杂,而数据写入量很大)

    1.1 Apache Hadoop

           Apache Hadoop是一种专用于批处理的处理框架。Hadoop是首个在开源社区获得极大关注的大数据框架。基于谷歌有关海量数据处理所发表的多篇论文与经验的Hadoop重新实现了相关算法和组件堆栈,让大规模批处理技术变得更易用。

          新版Hadoop包含多个组件,即多个层,通过配合使用可处理批数据:

    • HDFS:HDFS是一种分布式文件系统层,可对集群节点间的存储和复制进行协调。HDFS确保了无法避免的节点故障发生后数据依然可用,可将其用作数据来源,可用于存储中间态的处理结果,并可存储计算的最终结果。
    • YARN:YARN是Yet Another Resource Negotiator(另一个资源管理器)的缩写,可充当Hadoop堆栈的集群协调组件。该组件负责协调并管理底层资源和调度作业的运行。通过充当集群资源的接口,YARN使得用户能在Hadoop集群中使用比以往的迭代方式运行更多类型的工作负载。
    • MapReduce:MapReduce是Hadoop的原生批处理引擎。

    1.1.1 批处理模式:

          Hadoop的处理功能来自MapReduce引擎。MapReduce的处理技术符合使用键值对的map、shuffle、reduce算法要求。基本处理过程包括:

    1. 从HDFS文件系统读取数据集
    2. 将数据集拆分成小块并分配给所有可用节点
    3. 针对每个节点上的数据子集进行计算(计算的中间态结果会重新写入HDFS)
    4. 重新分配中间态结果并按照键进行分组
    5. 通过对每个节点计算的结果进行汇总和组合对每个键的值进行“Reducing”
    6. 将计算而来的最终结果重新写入 HDFS

    1.1.2 优势和局限:

            由于这种方法严重依赖持久存储,每个任务需要多次执行读取和写入操作,因此速度相对较慢。但另一方面由于磁盘空间通常是服务器上最丰富的资源,这意味着MapReduce可以处理非常海量的数据集。同时也意味着相比其他类似技术,Hadoop的MapReduce通常可以在廉价硬件上运行,因为该技术并不需要将一切都存储在内存中。MapReduce具备极高的缩放潜力,生产环境中曾经出现过包含数万个节点的应用。

            MapReduce的学习曲线较为陡峭,虽然Hadoop生态系统的其他周边技术可以大幅降低这一问题的影响,但通过Hadoop集群快速实现某些应用时依然需要注意这个问题。

            围绕Hadoop已经形成了辽阔的生态系统,Hadoop集群本身也经常被用作其他软件的组成部件。很多其他处理框架和引擎通过与Hadoop集成也可以使用HDFS和YARN资源管理器。

    1.1.3 总结:

            Apache Hadoop及其MapReduce处理引擎提供了一套久经考验的批处理模型,最适合处理对时间要求不高的非常大规模数据集。通过非常低成本的组件即可搭建完整功能的Hadoop集群,使得这一廉价且高效的处理技术可以灵活应用在很多案例中。与其他框架和引擎的兼容与集成能力使得Hadoop可以成为使用不同技术的多种工作负载处理平台的底层基础。

    流处理

            流处理系统会对随时进入系统的数据进行计算。相比批处理模式,这是一种截然不同的处理方式。流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作

           流处理中的数据集是“无边界”的,这就产生了几个重要的影响:

    • 完整数据集只能代表截至目前已经进入到系统中的数据总量。
    • 工作数据集也许更相关,在特定时间只能代表某个单一数据项。
    • 处理工作是基于事件的,除非明确停止否则没有“尽头”。处理结果立刻可用,并会随着新数据的抵达继续更新。

            流处理系统可以处理几乎无限量的数据,但同一时间只能处理一条(真正的流处理)或很少量(微批处理,Micro-batch Processing)数据,不同记录间只维持最少量的状态。虽然大部分系统提供了用于维持某些状态的方法,但流处理主要针对副作用更少,更加功能性的处理(Functional processing)进行优化。

            功能性操作主要侧重于状态或副作用有限的离散步骤。针对同一个数据执行同一个操作会忽略其他因素产生相同的结果,此类处理非常适合流处理,因为不同项的状态通常是某些困难、限制,以及某些情况下不需要的结果的结合体。因此虽然某些类型的状态管理通常是可行的,但这些框架通常在不具备状态管理机制时更简单也更高效。

            此类处理非常适合某些类型的工作负载。有近实时处理需求的任务很适合使用流处理模式。分析、服务器或应用程序错误日志,以及其他基于时间的衡量指标是最适合的类型,因为对这些领域的数据变化做出响应对于业务职能来说是极为关键的。流处理很适合用来处理必须对变动或峰值做出响应并且关注一段时间内变化趋势的数据

            流任务适合以下情况:

    • 需要实时地转化每个数据点。(严格地讲,批任务可以这么做,但需要考虑延迟,或者说时效性。)
    • 数据是报警数据,不能容忍太多的延迟。
    • 涉及的数据会被频繁地查询,上文说到过,influxdb的读取能力并不乐观,所以通过流任务,可以有效降低压力。(比如说一个库是统计用户的掉线情况的,如果你直接从这个库创建一个反映总掉线次数的dashboard,那么每次有人查看这个dashboard,数据库都要进行一次统计,这样就可以使用流任务将掉线次数实时计算并存入新的measurement)
    • 流任务通过数据点的时间戳来获取时间信息,所以不会与一个给定的数据点是否进入时间窗产生竞争关系。

           流任务,就像是使用消息队列时候对数据源再创建一个订阅,任何写入influxdb的数据点也会被写入kapacitor。所以需要牢记,流任务要求大量的内存分配。流数据任务并不局限于influxdb,你也可以直接从telegraf获取数据输入,然后不经influxdb直接传送给kapacitor。

    2.1 Apache Storm

            Apache Storm是一种侧重于极低延迟的流处理框架,也许是要求近实时处理的工作负载的最佳选择。该技术可处理非常大量的数据,通过比其他解决方案更低的延迟提供结果。

    2.1.1 流处理模式

           Storm的流处理可对框架中名为Topology(拓扑)的DAG(Directed Acyclic Graph,有向无环图)进行编排。这些拓扑描述了当数据片段进入系统后,需要对每个传入的片段执行的不同转换或步骤。

           拓扑包含:

    • Stream:普通的数据流,这是一种会持续抵达系统的无边界数据。
    • Spout:位于拓扑边缘的数据流来源,例如可以是API或查询等,从这里可以产生待处理的数据。
    • Bolt:Bolt代表需要消耗流数据,对其应用操作,并将结果以流的形式进行输出的处理步骤。Bolt需要与每个Spout建立连接,随后相互连接以组成所有必要的处理。在拓扑的尾部,可以使用最终的Bolt输出作为相互连接的其他系统的输入。

            Storm背后的想法是使用上述组件定义大量小型的离散操作,随后将多个组件组成所需拓扑。默认情况下Storm提供了“至少一次”的处理保证,这意味着可以确保每条消息至少可以被处理一次,但某些情况下如果遇到失败可能会处理多次。Storm无法确保可以按照特定顺序处理消息。

            为了实现严格的一次处理,即有状态处理,可以使用一种名为Trident的抽象。严格来说不使用Trident的Storm通常可称之为Core Storm。Trident会对Storm的处理能力产生极大影响,会增加延迟,为处理提供状态,使用微批模式代替逐项处理的纯粹流处理模式。

            为避免这些问题,通常建议Storm用户尽可能使用Core Storm。然而也要注意,Trident对内容严格的一次处理保证在某些情况下也比较有用,例如系统无法智能地处理重复消息时。如果需要在项之间维持状态,例如想要计算一个小时内有多少用户点击了某个链接,此时Trident将是你唯一的选择。尽管不能充分发挥框架与生俱来的优势,但Trident提高了Storm的灵活性。

           Trident拓扑包含:

    • 流批(Stream batch):这是指流数据的微批,可通过分块提供批处理语义。
    • 操作(Operation):是指可以对数据执行的批处理过程。

    2.1.2 优势和局限

            目前来说Storm可能是近实时处理领域的最佳解决方案。该技术可以用极低延迟处理数据,可用于希望获得最低延迟的工作负载。如果处理速度直接影响用户体验,例如需要将处理结果直接提供给访客打开的网站页面,此时Storm将会是一个很好的选择。

            Storm与Trident配合使得用户可以用微批代替纯粹的流处理。虽然借此用户可以获得更大灵活性打造更符合要求的工具,但同时这种做法会削弱该技术相比其他解决方案最大的优势。话虽如此,但多一种流处理方式总是好的。

            Core Storm无法保证消息的处理顺序。Core Storm为消息提供了“至少一次”的处理保证,这意味着可以保证每条消息都能被处理,但也可能发生重复。Trident提供了严格的一次处理保证可以在不同批之间提供顺序处理,但无法在一个批内部实现顺序处理。

            在互操作性方面,Storm可与Hadoop的YARN资源管理器进行集成,因此可以很方便地融入现有Hadoop部署。除了支持大部分处理框架,Storm还可支持多种语言,为用户的拓扑定义提供了更多选择。

    2.1.3 Storm组成原理

            集群环境配置下的Storm存在两类节点:主控节点工作节点。此外,为了实现集群的状态维护和配置管理,还需要一类特殊的节点:协调节点。整体架构如下图:

    (1)主控节点,即运行nimbus守护进程的节点。 nimbus负责在集群分发的代码,将任务分配给其他机器,并负责故障监测。

    (2)工作节点,即运行supervisor守护进程的节点。

             supervisor监听分配所在机器,根据nimbus的委派,在必要时启动和关闭工作进程。(工作节点是实时数据处理作业运行的节点)

           其中,计算在节点上的物理单元是worker,也即工作进程;计算的逻辑单元是executor,也即计算线程。(有点像spark) 然而计算的作业逻辑单元是topology,也称拓扑;计算的任务逻辑单元是task(还是有点像spark)。

           每个worker执行特定topology的executor子集,每个executor执行一个或多个task。

        一个topology主要有两类组件(component):spoutbolt。分别是流失数据在topology中的起始单元和处理单元。

    (3)协调节点,即运行Zookeeper服务端进程的节点。

            Zookeeper是一种分布式的状态协同服务,通过放松一致性的要求,为应用建立高层的协同原语(阻塞和更强一致性的要求),当前分布式系统中,广泛应用于状态监控和配置管理。

    2.1.4 Storm主要的编程概念

    1. spout  是流式处理的源头,是一个计算的起始单元,它封装数据源中的数据为storm可以识别的数据项。 spout可以从消息中间件中(如kafka、kestrel等)中读取数据产生流式元祖数据,也可以从其他接口如Twitter streaming API直接获取流式数据。
    2. bolt 是处理过程单元,从输入流中获取一定数量的数据项处理后,将结果作为输出流发送。流式数据处理的业务逻辑,大部分是在bolt中实现的,如各类函数、过滤器、连接操作、聚集操作、数据库操作等。
    3. topology是由spout和bolt为点组成的网络,网络中的边表示一个bolt订阅了某个或某个其他bolt或spout的输出流。topology可以是任意复杂多阶段流计算的网络,在Storm集群中提交后立即运行。

    2.1.5 总结

            对于延迟需求很高的纯粹的流处理工作负载,Storm可能是最适合的技术。该技术可以保证每条消息都被处理,可配合多种编程语言使用。由于Storm无法进行批处理,如果需要这些能力可能还需要使用其他软件。如果对严格的一次处理保证有比较高的要求,此时可考虑使用Trident。不过这种情况下其他流处理框架也许更适合。

    2.2 Apache Samza

           Apache Samza是一种与Apache Kafka消息系统紧密绑定的流处理框架。虽然Kafka可用于很多流处理系统,但按照设计,Samza可以更好地发挥Kafka独特的架构优势和保障。该技术可通过Kafka提供容错、缓冲,以及状态存储。

            Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

            Samza可使用YARN作为资源管理器。这意味着默认情况下需要具备Hadoop集群(至少具备HDFS和YARN),但同时也意味着Samza可以直接使用YARN丰富的内建功能。

    2.2.1 流处理模式

            Samza依赖Kafka的语义定义流的处理方式。Kafka在处理数据时涉及下列概念:

    • Topic(话题):进入Kafka系统的每个数据流可称之为一个话题。话题基本上是一种可供消耗方订阅的,由相关信息组成的数据流。
    • Partition(分区):为了将一个话题分散至多个节点,Kafka会将传入的消息划分为多个分区。分区的划分将基于键(Key)进行,这样可以保证包含同一个键的每条消息可以划分至同一个分区。分区的顺序可获得保证。
    • Broker(代理):组成Kafka集群的每个节点也叫做代理。
    • Producer(生成方):任何向Kafka话题写入数据的组件可以叫做生成方。生成方可提供将话题划分为分区所需的键。
    • Consumer(消耗方):任何从Kafka读取话题的组件可叫做消耗方。消耗方需要负责维持有关自己分支的信息,这样即可在失败后知道哪些记录已经被处理过了。

            由于Kafka相当于永恒不变的日志,Samza也需要处理永恒不变的数据流。这意味着任何转换创建的新数据流都可被其他组件所使用,而不会对最初的数据流产生影响。

    2.2.2 优势和局限

            乍看之下,SamzaKafka类查询系统的依赖似乎是一种限制,然而这也可以为系统提供一些独特的保证和功能,这些内容也是其他流处理系统不具备的。例如Kafka已经提供了可以通过低延迟方式访问的数据存储副本,此外还可以为每个数据分区提供非常易用且低成本的多订阅者模型。所有输出内容,包括中间态的结果都可写入到Kafka,并可被下游步骤独立使用。这种对Kafka的紧密依赖在很多方面类似于MapReduce引擎对HDFS的依赖。虽然在批处理的每个计算之间对HDFS的依赖导致了一些严重的性能问题,但也避免了流处理遇到的很多其他问题。

            Samza与Kafka之间紧密的关系使得处理步骤本身可以非常松散地耦合在一起。无需事先协调,即可在输出的任何步骤中增加任意数量的订阅者,对于有多个团队需要访问类似数据的组织,这一特性非常有用。多个团队可以全部订阅进入系统的数据话题,或任意订阅其他团队对数据进行过某些处理后创建的话题。这一切并不会对数据库等负载密集型基础架构造成额外的压力。

             直接写入Kafka还可避免回压(Backpressure)问题。回压是指当负载峰值导致数据流入速度超过组件实时处理能力的情况,这种情况可能导致处理工作停顿并可能丢失数据。按照设计,Kafka可以将数据保存很长时间,这意味着组件可以在方便的时候继续进行处理,并可直接重启动而无需担心造成任何后果。

             Samza可以使用以本地键值存储方式实现的容错检查点系统存储数据。这样Samza即可获得“至少一次”的交付保障,但面对由于数据可能多次交付造成的失败,该技术无法对汇总后状态(例如计数)提供精确恢复。

             Samza提供的高级抽象使其在很多方面比Storm等系统提供的基元(Primitive)更易于配合使用。目前Samza只支持JVM语言,这意味着它在语言支持方面不如Storm灵活。

    2.2.3 总结

            对于已经具备或易于实现Hadoop和Kafka的环境,Apache Samza是流处理工作负载一个很好的选择。Samza本身很适合有多个团队需要使用(但相互之间并不一定紧密协调)不同处理阶段的多个数据流的组织。Samza可大幅简化很多流处理工作,可实现低延迟的性能。如果部署需求与当前系统不兼容,也许并不适合使用,但如果需要极低延迟的处理,或对严格的一次处理语义有较高需求,此时依然适合考虑。

    混合处理系统:批处理和流处理

            一些处理框架可同时处理批处理和流处理工作负载。这些框架可以用相同或相关的组件和API处理两种类型的数据,借此让不同的处理需求得以简化。

           如你所见,这一特性主要是由Spark和Flink实现的,下文将介绍这两种框架。实现这样的功能重点在于两种不同处理模式如何进行统一,以及要对固定和不固定数据集之间的关系进行何种假设

          虽然侧重于某一种处理类型的项目会更好地满足具体用例的要求,但混合框架意在提供一种数据处理的通用解决方案。这种框架不仅可以提供处理数据所需的方法,而且提供了自己的集成项、库、工具,可胜任图形分析、机器学习、交互式查询等多种任务。

    3.1 Apache Spark

           Apache Spark是一种包含流处理能力的下一代批处理框架。与Hadoop的MapReduce引擎基于各种相同原则开发而来的Spark主要侧重于通过完善的内存计算和处理优化机制加快批处理工作负载的运行速度。

           Spark可作为独立集群部署(需要相应存储层的配合),或可与Hadoop集成并取代MapReduce引擎。

    3.1.1 批处理模式

            与MapReduce不同,Spark的数据处理工作全部在内存中进行,只在一开始将数据读入内存,以及将最终结果持久存储时需要与存储层交互。所有中间态的处理结果均存储在内存中。

            虽然内存中处理方式可大幅改善性能,Spark在处理与磁盘有关的任务时速度也有很大提升,因为通过提前对整个任务集进行分析可以实现更完善的整体式优化。为此Spark可创建代表所需执行的全部操作,需要操作的数据,以及操作和数据之间关系的Directed Acyclic Graph(有向无环图),即DAG,借此处理器可以对任务进行更智能的协调。

           为了实现内存中批计算,Spark会使用一种名为Resilient Distributed Dataset(弹性分布式数据集),即RDD的模型来处理数据。这是一种代表数据集,只位于内存中,永恒不变的结构。针对RDD执行的操作可生成新的RDD。每个RDD可通过世系(Lineage)回溯至父级RDD,并最终回溯至磁盘上的数据。Spark可通过RDD在无需将每个操作的结果写回磁盘的前提下实现容错。

    Spark之RDDhttps://www.cnblogs.com/qingyunzong/p/8899715.html

    3.1.2 流处理模式

            流处理能力是由Spark Streaming实现的。Spark本身在设计上主要面向批处理工作负载,为了弥补引擎设计和流处理工作负载特征方面的差异,Spark实现了一种叫做微批(Micro-batch)*的概念。在具体策略方面该技术可以将数据流视作一系列非常小的“批”,借此即可通过批处理引擎的原生语义进行处理。

           Spark Streaming会以亚秒级增量对流进行缓冲,随后这些缓冲会作为小规模的固定数据集进行批处理。这种方式的实际效果非常好,但相比真正的流处理框架在性能方面依然存在不足。

    3.1.3 优势和局限

           使用Spark而非Hadoop MapReduce的主要原因是速度。在内存计算策略和先进的DAG调度等机制的帮助下,Spark可以用更快速度处理相同的数据集。

            Spark的另一个重要优势在于多样性。该产品可作为独立集群部署,或与现有 Hadoop集群集成。该产品可运行批处理和流处理,运行一个集群即可处理不同类型的任务。

            除了引擎自身的能力外,围绕Spark还建立了包含各种库的生态系统,可为机器学习、交互式查询等任务提供更好的支持。相比MapReduce,Spark任务更是“众所周知”地易于编写,因此可大幅提高生产力。

            为流处理系统采用批处理的方法,需要对进入系统的数据进行缓冲。缓冲机制使得该技术可以处理非常大量的传入数据,提高整体吞吐率,但等待缓冲区清空也会导致延迟增高。这意味着Spark Streaming可能不适合处理对延迟有较高要求的工作负载

            由于内存通常比磁盘空间更贵,因此相比基于磁盘的系统,Spark成本更高。然而处理速度的提升意味着可以更快速完成任务,在需要按照小时数为资源付费的环境中,这一特性通常可以抵消增加的成本。

            Spark内存计算这一设计的另一个后果是,如果部署在共享的集群中可能会遇到资源不足的问题。相比Hadoop MapReduce,Spark的资源消耗更大,可能会对需要在同一时间使用集群的其他任务产生影响。从本质来看,Spark更不适合与Hadoop堆栈的其他组件共存一处。

    3.1.4 总结

    Spark是多样化工作负载处理任务的最佳选择。Spark批处理能力以更高内存占用为代价提供了无与伦比的速度优势。对于重视吞吐率而非延迟的工作负载,则比较适合使用Spark Streaming作为流处理解决方案。

            Apache Flink是一种可以处理批处理任务的流处理框架。该技术可将批处理数据视作具备有限边界的数据流,借此将批处理任务作为流处理的子集加以处理。为所有处理任务采取流处理为先的方法会产生一系列有趣的副作用。

            这种流处理为先的方法也叫做Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理作为主要处理方法,使用流作为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。

    3.2.1 流处理模型

           Flink的流处理模型在处理传入数据时会将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:

    • Stream(流)是指在系统中流转的,永恒不变的无边界数据集
    • Operator(操作方)是指针对数据流执行操作以产生其他数据流的功能
    • Source(源)是指数据流进入系统的入口点
    • Sink(槽)是指数据流离开Flink系统后进入到的位置,槽可以是数据库或到其他系统的连接器

           为了在计算过程中遇到问题后能够恢复,流处理任务会在预定时间点创建快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。

           此外Flink的流处理能力还可以理解“事件时间”这一概念,这是指事件实际发生的时间,此外该功能还可以处理会话。这意味着可以通过某种有趣的方式确保执行顺序和分组。

    3.2.2 批处理模型

            Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型不再从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用完全相同的运行时。

            Flink可以对批处理工作负载实现一定的优化。例如由于批处理操作可通过持久存储加以支持,Flink可以不对批处理工作负载创建快照。数据依然可以恢复,但常规处理操作可以执行得更快。

            另一个优化是对批处理任务进行分解,这样即可在需要的时候调用不同阶段和组件。借此Flink可以与集群的其他用户更好地共存。对任务提前进行分析使得Flink可以查看需要执行的所有操作、数据集的大小,以及下游需要执行的操作步骤,借此实现进一步的优化。

    3.2.3 优势和局限

            Flink目前是处理框架领域一个独特的技术。虽然Spark也可以执行批处理和流处理,但Spark的流处理采取的微批架构使其无法适用于很多用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。

            Flink的很多组件是自行管理的。虽然这种做法较为罕见,但出于性能方面的原因,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不同,待处理数据的特征发生变化后Flink无需手工优化和调整,并且该技术也可以自行处理数据分区和自动缓存等操作。

            Flink会通过多种方式对工作进行分许进而优化任务。这种分析在部分程度上类似于SQL查询规划器对关系型数据库所做的优化,可针对特定任务确定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一起。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。

            在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也可以查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了类似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。

            Flink能很好地与其他组件配合使用。如果配合Hadoop 堆栈使用,该技术可以很好地融入整个环境,在任何时候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还可以运行为其他处理框架,例如Hadoop和Storm编写的任务。

            目前Flink最大的局限之一在于这依然是一个非常“年幼”的项目。现实环境中该项目的大规模部署尚不如其他处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深入的研究。随着快速开发周期的推进和兼容包等功能的完善,当越来越多的组织开始尝试时,可能会出现越来越多的Flink部署。

    3.2.4 总结

           Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少量批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,因此可以很方便地进行评估。快速进展的开发工作使其值得被大家关注。

     

     

     

     

     

     

     

     

    https://blog.51cto.com/13957185/2287427

    https://www.jianshu.com/p/ff0c1fd9dc68

    https://www.jianshu.com/p/5cc07eae1a0c

    https://www.cnblogs.com/yangsy0915/p/5118950.html

    展开全文
  • 流处理框架对比

    千次阅读 2018-03-22 23:59:59
    分布式流处理是对无边界数据集进行连续不断的处理、聚合和分析的过程,与MapReduce一样是一种通用计算框架,期望延迟在毫秒或者秒级别。这类系统一般采用有向无环图(DAG)。DAG是任务链的图形化表示,用它来描述...

    分布式流处理是对无边界数据集进行连续不断的处理、聚合和分析的过程,与MapReduce一样是一种通用计算框架,期望延迟在毫秒或者秒级别。这类系统一般采用有向无环图(DAG)。DAG是任务链的图形化表示,用它来描述流处理作业的拓扑。在选择不同的流处理系统时,通常会关注以下几点:

    • 运行时和编程模型:平台框架提供的编程模型决定了许多特色功能,编程模型要足够处理各种应用场景。
    • 函数式原语:流处理平台应该能提供丰富的功能函数,比如,map或者filter这类易扩展、处理单条信息的函数;处理多条信息的函数aggregation;跨数据流、不易扩展的操作join等。
    • 状态管理:大部分应用都需要保持状态处理的逻辑。流处理平台应该提供存储、访问和更新状态信息。
    • 消息传输保障:消息传输保障一般有三种:at most once,at least once和exactly once
      1. At most once:消息传输机制是每条消息传输零次或者一次,即消息可能会丢失;
      2. A t least once:意味着每条消息会进行多次传输尝试,至少一次成功,即消息传输可能重复但不会丢失;
      3. Exactly once:消息传输机制是每条消息有且只有一次,即消息传输既不会丢失也不会重复。
    • 容错:流处理框架中的失败会发生在各个层次,比如,网络部分,磁盘崩溃或者节点宕机等。流处理框架应该具备从所有这种失败中恢复,并从上一个成功的状态(无脏数据)重新消费。
    • 性能:延迟时间(Latency),吞吐量(Throughput)和扩展性(Scalability)是流处理应用中极其重要的指标。
    • 平台的成熟度和接受度:成熟的流处理框架可以提供潜在的支持,可用的库,甚至开发问答帮助。选择正确的平台会在这方面提供很大的帮助。

    运行时和编程模型 


         运行时和编程模型是一个系统最重要的特质,因为它们定义了表达方式、可能的操作和将来的局限性。因此,运行时和编程模型决定了系统的能力和适用场景。实现流处理系统有两种完全不同的方式:

    1. 原生流处理:指所有输入的记录一旦到达即会一个接着一个进行处理。示例如下:
    2. 微批处理:把输入的数据按照某种预先定义的时间间隔(典型的是几秒钟)分成短小的批量数据,流经流处理系统。示例如下:

     

    两种方法都有其先天的优势和不足,原生流处理的优势在于它的表达方式。数据一旦到达立即处理,这些系统的延迟性远比其它微批处理要好。除了延迟性外,原生流处理的状态操作也容易实现。一般原生流处理系统为了达到低延迟和容错性会花费比较大的成本,因为它需要考虑每条记录。原生流处理的负载均衡也是个问题。比如,我们处理的数据按key分区,如果分区的某个key是资源密集型,那这个分区很容易成为作业的瓶颈。

    微批处理。将流式计算分解成一系列短小的批处理作业,也不可避免的减弱系统的表达力。像状态管理或者join等操作的实现会变的困难,因为微批处理系统必须操作整个批量数据。并且,batch interval会连接两个不易连接的事情:基础属性和业务逻辑。相反地,微批处理系统的容错性和负载均衡实现起来非常简单,因为微批处理系统仅发送每批数据到一个worker节点上,如果一些数据出错那就使用其它副本。微批处理系统很容易建立在原生流处理系统之上。

    编程模型一般分为组合式和声明式。组合式编程提供基本的构建模块,它们必须紧密结合来创建拓扑。新的组件经常以接口的方式完成。相对应地,声明式API操作是定义的高阶函数。它允许我们用抽象类型和方法来写函数代码,并且系统创建拓扑和优化拓扑。声明式API经常也提供更多高级的操作(比如,窗口函数或者状态管理)。

    主流开源流处理系统


     主源开源的流处理系统如下图,暂时不介绍商业的系统,比如Google MillWheel或者Amazon Kinesis,也不会涉及很少使用的Intel GearPump或者Apache Apex

    Apache Storm最开始是由Nathan Marz和他的团队于2010年在数据分析公司BackType开发的,后来BackType公司被Twitter收购,接着Twitter开源Storm并在2014年成为Apache顶级项目。毋庸置疑,Storm成为大规模流数据处理的先锋,并逐渐成为工业标准。Storm是原生的流处理系统,提供low-level的API。Storm使用Thrift来定义topology和支持多语言协议,使得我们可以使用大部分编程语言开发,Scala自然包括在内

    Trident是对Storm的一个更高层次的抽象,Trident最大的特点以batch的形式进行流处理。Trident简化topology构建过程,增加了窗口操作、聚合操作或者状态管理等高级操作,这些在Storm中并不支持。相对应于Storm的At most once流传输机制,Trident提供了Exactly once传输机制。Trident支持Java,Clojure和Scala

     当前Spark是非常受欢迎的批处理框架,包含Spark SQL,MLlib和Spark Streaming。Spark的运行时是建立在批处理之上,因此后续加入的Spark Streaming也依赖于批处理,实现了微批处理。接收器把输入数据流分成短小批处理,并以类似Spark作业的方式处理微批处理。Spark Streaming提供高级声明式API(支持Scala,Java和Python)

    Samza最开始是专为LinkedIn公司开发的流处理解决方案,并和LinkedIn的Kafka一起贡献给社区,现已成为基础设施的关键部分。Samza的构建严重依赖于基于log的Kafka,两者紧密耦合。Samza提供组合式API,当然也支持Scala

    Flink是个相当早的项目,开始于2008年,但只在最近才得到注意。Flink是原生的流处理系统,提供high level的API。Flink也提供API来像Spark一样进行批处理,但两者处理的基础是完全不同的。Flink把批处理当作流处理中的一种特殊情况。在Flink中,所有的数据都看作流,是一种很好的抽象,因为这更接近于现实世界

    快速的介绍流处理系统之后,让我们以下面的表格来更好清晰的展示它们之间的不同:

    容错性


     流处理系统的容错性与生俱来的比批处理系统难实现。当批处理系统中出现错误时,我们只需要把失败的部分简单重启即可;但对于流处理系统,出现错误就很难恢复。因为线上许多作业都是7 x 24小时运行,不断有输入的数据。流处理系统面临的另外一个挑战是状态一致性,因为重启后会出现重复数据,并且不是所有的状态操作是幂等的。

    以是流处理框架容错性处理方案:

    Apache Storm:Storm使用上游数据备份和消息确认的机制来保障消息在失败之后会重新处理。消息确认原理:每个操作都会把前一次的操作处理消息的确认信息返回。Topology的数据源备份它生成的所有数据记录。当所有数据记录的处理确认信息收到,备份即会被安全拆除。失败后,如果不是所有的消息处理确认信息收到,那数据记录会被数据源数据替换。这保障了没有数据丢失,但数据结果会有重复,这就是at-least once传输机制。

    Storm采用取巧的办法完成了容错性,对每个源数据记录仅仅要求几个字节存储空间来跟踪确认消息。纯数据记录消息确认架构,尽管性能不错,但不能保证exactly once消息传输机制,所有应用开发者需要处理重复数据。Storm存在低吞吐量和流控问题,因为消息确认机制在反压下经常误认为失败。

    Spark Streaming:Spark Streaming实现微批处理,容错机制的实现跟Storm不一样的方法。微批处理的想法相当简单。Spark在集群各worker节点上处理micro-batches。每个micro-batches一旦失败,重新计算就行。因为micro-batches本身的不可变性,并且每个micro-batches也会持久化,所以exactly once传输机制很容易实现。

    Samza:Samza的实现方法跟前面两种流处理框架完全不一样。Samza利用消息系统Kafka的持久化和偏移量。Samza监控任务的偏移量,当任务处理完消息,相应的偏移量被移除。消息的偏移量会被checkpoint到持久化存储中,并在失败时恢复。但是问题在于:从上次checkpoint中修复偏移量时并不知道上游消息已经被处理过,这就会造成重复。这就是at least once传输机制。

    Apache Flink:Flink的容错机制是基于分布式快照实现的,这些快照会保存流处理作业的状态(本文对Flink的检查点和快照不进行区分,因为两者实际是同一个事物的两种不同叫法。Flink构建这些快照的机制可以被描述成分布式数据流的轻量级异步快照,它采用Chandy-Lamport算法实现。)。如果发生失败的情况,系统可以从这些检查点进行恢复。Flink发送checkpoint的栅栏(barrier)到数据流中(栅栏是Flink的分布式快照机制中一个核心的元素),当checkpoint的栅栏到达其中一个operator,operator会接所有收输入流中对应的栅栏(比如,图中checkpoint n对应栅栏n到n-1的所有输入流,其仅仅是整个输入流的一部分)。所以相对于Storm,Flink的容错机制更高效,因为Flink的操作是对小批量数据而不是每条数据记录。但也不要让自己糊涂了,Flink仍然是原生流处理框架,它与Spark Streaming在概念上就完全不同。Flink也提供exactly once消息传输机制。

    状态管理


    状态管理大部分大型流处理应用都涉及到状态。相对于无状态的操作(其只有一个输入数据,处理过程和输出结果),有状态的应用会有一个输入数据和一个状态信息,然后处理过程,接着输出结果和修改状态信息。因此,我们不得不管理状态信息,并持久化。我们期望一旦因某种原因失败,状态能够修复。状态修复有可能会出现小问题,它并不总是保证exactly once,有时也会出现消费多次,但这并不是我们想要的。

    据我们所知,Storm提供at-least once的消息传输保障。那我们又该如何使用Trident做到exactly once的语义。概念上貌似挺简单,你只需要提交每条数据记录,但这显然不是那么高效。所以你会想到小批量的数据记录一起提交会优化。Trident定义了几个抽象来达到exactly once的语义,见下图,其中也会有些局限。

    Spark Streaming是微批处理系统,它把状态信息也看做是一种微批量数据流。在处理每个微批量数据时,Spark加载当前的状态信息,接着通过函数操作获得处理后的微批量数据结果并修改加载过的状态信息。

    Samza实现状态管理是通过Kafka来处理的。Samza有真实的状态操作,所以其任务会持有一个状态信息,并把状态改变的日志推送到Kafka。如果需要状态重建,可以很容易的从Kafka的topic重建。为了达到更快的状态管理,Samza也支持把状态信息放入本地key-value存储中,所以状态信息不必一直在Kafka中管理,见下图。不幸的是,Samza只提供at-least once语义,exactly once的支持也在计划中

    Flink提供状态操作,和Samza类似。Flink提供两种类型的状态:一种是用户自定义状态;另外一种是窗口状态。如图,第一个状态是自定义状态,它和其它的的状态不相互作用。这些状态可以分区或者使用嵌入式Key-Value存储状态[文档一和二]。当然Flink提供exactly-once语义。下图展示Flink长期运行的三个状态。

    小结


    对于延迟性来说,微批处理一般在秒级别,大部分原生流处理在百毫秒以下,调优的情况下Storm可以很轻松的达到十毫秒。同时也要记住,消息传输机制保障,容错性和状态恢复都会占用机器资源。例如,打开容错恢复可能会降低10%到15%的性能,Storm可能降低70%的吞吐量。

    总之,天下没有免费的午餐。对于有状态管理,Flink会降低25%的性能,Spark Streaming降低50%的性能。也要记住,各大流处理框架的所有操作都是分布式的,通过网络发送数据是相当耗时的,所以尽量利用数据本地性,也尽量优化你的应用的序列化。

    项目成熟度:Storm是第一个主流的流处理框架,后期已经成为长期的工业级的标准,并在像Twitter,Yahoo,Spotify等大公司使用。Spark Streaming是最近最流行的Scala代码实现的流处理框架。现在Spark Streaming被公司(Netflix, Cisco, DataStax, Intel, IBM等)日渐接受。Samza主要在LinkedIn公司使用。Flink是一个新兴的项目,很有前景。你可能对项目的贡献者数量也感兴趣。Storm和Trident大概有180个代码贡献者;整个Spark有720多个;根据github显示,Samza有40个;Flink有超过130个代码贡献者。

    High level API:具有high level API的流处理框架会更简洁和高效;

    Storm:Storm非常适合任务量小但速度要求高的应用。如果你主要在意流处理框架的延迟性,Storm将可能是你的首先。但同时也要记住,Storm的容错恢复或者Trident的状态管理都会降低整体的性能水平。也有一个潜在的Storm更新项目-Twitter的Heron,Heron设计的初衷是为了替代Storm,并在每个单任务上做了优化但同时保留了API。

    Spark Streaming:如果你得基础架构中已经设计到Spark,那Spark Streaming无疑是值得你尝试的。因为你可以很好的利用Spark各种library。如果你需要使用Lambda架构,Spark Streaming也是一个不错的选择。但你要时刻记住微批处理的局限性,以及它的延迟性问题。

    Samza:如果你想使用Samza,那Kafka应该是你基础架构中的基石,好在现在Kafka已经成为家喻户晓的组件。像前面提到的,Samza一般会搭配强大的本地存储一起,这对管理大数据量的状态非常有益。它可以轻松处理上万千兆字节的状态信息,但要记住Samza只支持at least once语义。

    Flink:Flink流处理系统的概念非常不错,并且满足绝大多数流处理场景,也经常提供前沿的功能函数,比如,高级窗口函数或者时间处理功能,这些在其它流处理框架中是没有的。同时Flink也有API提供给通用的批处理场景。但你需要足够的勇气去上线一个新兴的项目,并且你也不能忘了看下Flink的roadmap。

     

    展开全文
  • flink的批处理和流处理

    千次阅读 2019-04-17 10:40:28
    1.流处理系统 2.批处理系统 3.flink的流处理和批处理 4.flink的流处理和批处理代码的区别 流处理系统与批处理系统最大不同在于节点间的数据传输方式 1.流处理系统 流处理系统,其节点间数据传输的标准模型...
  • 流处理基本介绍

    万次阅读 多人点赞 2017-01-23 10:14:56
    1. 什么是流处理 一种被设计来处理无穷数据集的数据处理系统引擎 2. 流处理的几个概念 1. 无穷数据(Unbounded data):一种持续生成,本质上是无穷尽的数据集。它经常会被称为“流数据”。然而,用流和批次来...
  • 而Flink专注的是无限流处理,那么他是怎么做到批处理的呢? 无限流处理:输入数据没有尽头;数据处理从当前或者过去的某一个时间 点开始,持续不停地进行 另一种处理形式叫作有限流处理,即从某一个时间点开始处理...
  • Flink运行时之流处理程序生成流图

    千次阅读 2017-02-05 22:11:01
    DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。
  • flink流处理demo

    千次阅读 2018-11-14 14:04:56
    flink流处理demo import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache....
  • Flink的流处理与批处理 Flink通过执行引擎,能够同时支持批处理与流处理任务。 在执行引擎这一层,流处理系统与批处理系统最大的不同在于节点的数据传输方式。 流处理系统 对于一个流处理系统,其节点间数据传输...
  • Flink流处理之迭代案例

    千次阅读 2016-11-29 20:14:27
    当前Flink将迭代的重心集中在批处理...但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显。
  • 大数据之实时流处理常用框架

    万次阅读 2018-05-07 10:53:59
    实时流处理简单概述:实时是说整个流处理相应时间较短,流式技算是说数据是源源不断的,没有尽头的。实时流处理一般是将业务系统产生的数据进行实时收集,交由流处理框架进行数据清洗,统计,入库,并可以通过可视化...
  • 主流流处理框架比较

    千次阅读 2019-03-29 20:21:20
    分布式流处理是对无边界数据集进行连续不断的处理、聚合和分析。它跟 MapReduce 一样是一种通用计算,但我们期望延迟在毫秒或者秒级别。这类系统一般采用有向无环图(DAG)。 DAG 是任务链的图形化表示,我们用它来...
  • java8的流处理

    千次阅读 2017-12-29 01:54:52
    java8 Stream 流处理 map filter reduce
  • 大数据流处理框架介绍

    千次阅读 2018-10-26 19:52:25
    实时流处理简单概述:实时是说整个流处理相应时间较短,流式技算是说数据是源源不断的,没有尽头的。实时流处理一般是将业务系统产生的数据进行实时收集,交由流处理框架进行数据清洗,统计,入库,并可以通过可视化...
  • Kafka基础-流处理

    千次阅读 2018-10-26 17:10:20
    1. 什么是流处理? 首先,让我们说一下什么是数据流(也称为事件流)?它是无边界数据集的抽象说法,无边界意味着无限且不断增长,因为随着时间的推移,新数据会不断地到来。 除了无边界的特性之外,事件流模型...
  • Flink流处理入门和socket发送数据

    千次阅读 2019-08-02 17:41:01
    Flink流处理入门和socket发送数据Flink初步使用Flink流处理程序运行前提Socket发送数据 这块笔记是学习陈世敏老师的大规模数据和大数据系统分析时候做的大作业的一块内容,其中题目是Flink和Spark调研,研究大数据的...
  • Flink流处理之迭代任务

    千次阅读 2016-12-12 21:21:00
    前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类:
  • 什么是流处理流处理实现了什么效果) 流处理的设计思想 有哪些流处理技术并分析其优缺点、实用场景 具体在Spark Streaming上体现的优势不足 总结 转载:实时处理与流处理 ...
  • Flink流计算编程--流处理引擎的选型

    千次阅读 2016-09-14 17:00:53
    流处理引擎 Flink
  • Kafka数据流:让流处理更轻松

    千次阅读 2016-03-16 17:53:58
    Kafka Streams是一个使用Apache Kafka用于构建分布流处理应用的Java库。这将是即将更新Kafka-0.10版本的一部分,并且已经提供可以很容易试用的预览版。   使用Kafka Stream构建一个流处理应用如下所示:   ...
  • [Flink基础]--什么是流处理

    千次阅读 2018-09-29 12:56:46
    什么是流处理? Data Artisans由ApacheFlink®的原始创建者创建,我们花了很长时间来解决流处理领域的问题。在这篇介绍性文章中,我们将提供有关流处理和Apache Flink适合的视角。要了解更多信息,您可以下载有关...
  • Flink流处理过程的部分原理分析

    万次阅读 2018-12-19 23:20:07
    文章目录前言 前言 ...笔者做为一个研究存储模块出身的人,最近在研读Flink流处理的部分原理,小小作番总结。很多时候,以存储的眼光来看待计算过程中的处理过程,还是有很多不一样的地方的。 ...
  • Spark持续流处理与Flink比对

    千次阅读 2018-11-19 19:41:17
    Spark流处理 Spark从2.3版本开始引入了持续流式处理模型,可将流处理延迟降低至毫秒级别,让 Structured Streaming 达到了一个里程碑式的高度;使用 Pandas UDF 提升 PySpark 的性能;为 Spark 应用程序提供 ...
  • 最近正好把几片big data application方面的文章读完,...Storm twitter的流处理 https://lschacker.gitbooks.io/bigdata-intro/content/storm.html StreamScope / rStream 微软的流处理 https://lschacker.git
  • java 8 流处理字符串

    千次阅读 2018-09-24 12:11:21
    java 8 流处理字符串 java 8 引入新的Stream api,方便我们使用声明方式处理数据。本文我们将说明如何使用Stream api分割逗号分割字符串至list,以及如何连接字符串数组至逗号分割字符串,也会介绍如何使用Stream api...
  • Apache Flink流处理(一)

    千次阅读 2018-11-26 14:39:53
    Apache Flink是一个分布式流处理器,它使用直接且富有表现力的API来实现有状态的流处理程序。它以容错的方式高效地大规模运行这类应用程序。Flink于2014年4月加入Apache软件基金会作为孵化项目,并于2015年1月成为...
  • 大数据架构及流处理架构

    千次阅读 2015-07-08 13:50:19
    一、大数据架构 二、流处理架构 说明: 参考文档:《Gearpump 基于Akka的新一代流处理引擎》
  • STL的流处理类学习

    千次阅读 2017-02-18 17:44:53
    之前一直用STL提供的流处理类写一些代码,但也没有认真总结过,对于很多东西还是不理解,这两天看了一下做一些总结,总体的类图关系如下: 下面是借鉴网上的一张图,哪些类包含在哪里也写清楚了 这些流处理类...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 88,268
精华内容 35,307
关键字:

流处理