精华内容
下载资源
问答
  • Flink原理与实践
    2021-03-11 12:49:08

    初识Flink

    环境搭建和部署

    入门案例

    1,统计词出现格式
    1.1,使用Lambda表达式

    public class WordCount01 {
    	public static void main(String[] args) {
    		// 设置用户名在Linux下运行,指定HADOOP用户权限
    		System.setProperty("HADOOP_USER_NAME","root");
    		//env
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
    		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动匹配,流,批
    		
    		//source
    		DataStream<String> lines= env.fromElements("welcome to flink world", "welcome to flink", "welcome flink");
    		//transformation--etl
    		// 切割
    		DataStream<String>  words= words.flatMap(String word, Collector<String> out->{
    		Arrays.stream(word.split(" ")).foreach(out:collect)
    		}).returns(Type.STRING);
    		// 标记
    		DataStream<Tuple2<String,Integer>> wordAndOne = words.map((String value) -> Tuple2.of(value, 1))
        	.returns(Types.TUPLE(Types.STRING, Types.INT));
    	    // 分组
    	    KeyedStream<Tuple2<String,Integer>,String> grouped = wordAndOne.keyBy((KeySelector<Tuple2<String,Integer>,String>)t->t.f0);
    	    // 统计
    	    SingleOutputStreamOperator<String> result = grouped.sum(1);
    		//sink
    		//输出
    		env.print();
    		//启动并执行
    		env.execute("word count");
    	}
    }
    
    
    更多相关内容
  • Flink原理与实践-Flink的部署和配置.pdf
  • Flink原理讲解

    2019-03-14 19:29:34
    在介绍Flink前,让我们先回顾一下在处理数据时可能遇到的数据类型以及可以选择进行处理这些数据类型所选择执行模型的类型。下面有两个观点经常容易混淆,很有必要去澄清它们
  • 我们知道目前流处理的主要流行的计算引擎有,Storm,SparkStreaming。...Flink的出现完美的解决了以上问题,这也是flink出现的原因,flink不仅能提供同时支持高吞吐和exactly-once语义的实时计算,
  • Flink原理解析

    2021-10-29 09:50:27
    1.Flink是什么 ​Flink是新一代分布式流式计算引擎,用于对海量数据进行实时处理和计算,具备快速容错(支持每条消息恰好处理一次)、流批一体、低延时、支持数据乱序的特点。 ​ ​Flink的主要应用场景如下: 数据...

    1.Flink是什么

    ​Flink是新一代分布式流式计算引擎,用于对海量数据进行实时处理和计算,具备快速容错(支持每条消息恰好处理一次)、流批一体、低延时、支持数据乱序的特点。

    ​ ​Flink的主要应用场景如下:

    • 数据分析场景:流式计算平台相对于传统的基于批的计算平台具备最大优点就是实时性,典型的应用场景就是淘宝的双十一大屏和一些实时性要求比较高的数据看班
    • 事件驱动场景:相比于MetaQ或其他消息队列,​Flink可以基于​Flink Sql或者其他API进行一些复杂的计算或者过滤操作,比如基于用户行为触发风控报警等
    • ETL场景:这个比较容易理解,相对于传统的ETL工具,blink更为灵活

    2.设计原理

    容错机制

    ​ 分布式环境下的流式处理平台相对于批处理平台而言,一个非常重要的问题是如何保证分布式计算节点在发生故障并恢复后最终的计算结果是正确的。在批处理的模式下,数据有界,在任务开始前我们就可以得到需要计算的所有数据,如果节点出错,最坏的情况下只需要对数据集进行重新计算,但在流式处理模式下,数据是源源不断产生并且无界的,节点故障恢复后无法从头开始重新计算。

    分布式快照

    ​ 在网络服务中,快照是比较常用的一个容错机制,比如Redis中就有基于RDB进行数据恢复的策略,flink的快速容错机制也是基于快照的方式实现的。在单机环境下,保存快照比较简单,只需要在某个时间点暂停任务处理并将当前状态持久化即可,但在分布式系统中,由于没有一个全局时钟,想要同时对所有计算节点的状态进行保存是很难做到的(要详细了解可以参考附录:分布式系统中的时钟)。最简单的实现方式是通过类似2PC的方式将所有节点任务都停止并进行状态保存,最后统一上报,但这这种stop the world的方式会极大的增加计算延时和降低吞吐量。flink最终的解决方案是基于Chandy-Lamport算法改进而来的Asynchronous Barrier Snapshotting(异步屏障快照)算法

    Chandy-Lamport

    ​ chandy-lamport算法的核心思想是将分布式系统抽象为一个有向图,每个分布式节点作为顶点,节点与节点之间的通信信道被抽象为图的有向边(input channel,output channel),在发起快照时,每个节点会记录自身状态和input channel的状态,最终可以得到一个全局一致的快照,下面用一个例子详细解释。

    ​ 如下图所示,一个分布式系统中有两个节点P1和P2,两个节点分别有X,Y,Z三个状态,通过C12和C21两个信道可以互相发送消息

    ​ 首先P1发起了一个快照操作(同时可以上报到一个全局的快照协调系统),他会先保存自身状态[X1:0,Y1:0,Z1:0],并向下游发送一个特殊消息marker,用于通知下游节点也进行快照操作,与此同时,P2通过C21向P1发送了一条业务消息

    ​ 此时由于P1已经完成了快照,在P2节点未完成快照之前不可以进行消息处理,否则最后生成的全局快照会不一致,此时需要将它的输入信道C21中的所有消息(目前只有M1)也追加到快照中,与此同时P2接收到了P1过来的marker信息,同样进行了快照操作

    ​ 最后P1接收到来自P2的marker信息,此时两个节点快照都保存完毕(同样可以上报到一个全局的快照协调系统,告知快照操作结束),可以认为全局快照生成成功


    ​ 可以看到整体的算法还是比较简单易懂的,但要注意的是,此算法要求网络的传输是有序的,可靠的(很容易就会想到TCP协议),需要严格保证消息和marker是按发送顺序到达其他节点。

    Asynchronous Barrier Snapshotting

    ​ Chandy-Lamport虽然理解起来不复杂,但存在两个比较大的问题。1是在进行快照时节点不仅需要保存自身状态,还要保存所有input channer中的消息,这会导致快照体积比较大,并且基于快照进行恢复时效率低,2是整个系统的运行效率取决于最慢的计算节点。flink基于Chandy-Lamport算法进行了改进,它的核心思想如下

    ​ 每当需要生成全局快照时,流的source在产生的消息是插入一个带ID的特殊信息,称之为barrier(屏障),同时source会向快照协调者上报快照生成操作已启动,并记录自身offset(用于故障回复之后流的重放),这些屏障会将整个数据流切分为不同的数据集,当下游的处理节点处理到屏障时会在本地保存一份快照,并向更下游广播带有相同ID的屏障,最终当sink(流的输出端)也处理到屏障时,向协调者上报快照生成成功,还是以一个例子进行说明

    如图所示,上游节点的快照已保存成功并且向图中的task节点输出了两个数据流,由于数据流1的处理速度较快,它的屏障N先到达了图中的task处,此时节点需要暂时停止处理数据流1的消息并将消息保存在本地缓冲中,等到数据流2的屏障N也到达task时,task会先保存本地状态(快照)同时向下游广播屏障N,这个操作被称之为屏障的对齐,最终sink(输出)也进行屏障对齐之后,会向快照协调者上报全局快照生成成功,此时整个分布式系统,包括图中的task节点和上游的节点保存的快照是一致的,即只包含对于屏障N之前的消息的处理状态,如果发生故障,各节点快照中读取状态,同时source屏障N的Offset处进行消息回放即可

    Asynchronous Barrier Snapshotting算法解决了需要保存额外消息的问题,但和Chandy-Lamport算法一样,会导致整个分布式系统的计算效率受限于整个系统中最慢的节点(需要进行屏障对齐),所以Blink/Flink支持关闭屏障对齐,在这种模式下,如果数据流1的屏障先到达,task还是会继续处理数据流1的后续数据,只是在数据流2的屏障到达时,会将两个时间段内处理的消息一起保存在快照中。在Blink中,快照被称为Check Point

    批处理和支持乱序

    Flink中的时间和窗口

    ​ 在​Flink中,通过将批看作是有界的流实现了对批数据的处理(spark刚好相反,它将流看作是微形的批实现了流处理),而时间是用于将流转换为批的重要工具,在实时计算应用中有很多基于时间的计算场景,比如每隔一分钟统计最近五分钟内买家发消息数量等。在blink中,我们可以使用的时间有三种:

    • Event Time(事件时间):表示事件在产生它的设备上发生的时间,在进入Blink计算平台时就已确定且不可修改,不受进入Blink的延时和不同计算节点的机器时间和计算效率的影响。它能准确反映事件发生的先后顺序。
    • Process Time(处理时间):每个计算节点对事件进行处理的时间,因为每个节点所处的环境,性能的差异,在计算某个时间段内的数据时,计算结果是不稳定的。它反映的是事件在Blink中的处理先后顺序。
    • Ingestion time(事件进入事件):指事件进入Blink的时间,在Source处产生,与Process Time相比它不受计算节点的环境影响,在事件进入Blink时就已确定。它反映的是事件进入Blink的先后顺序。

    基于这三种时间,​Flink为我们提供了以下时间窗口(暂时不讲CountWindow):

    • 固定窗口:按固定时间点拆分的窗口,比如统计一天的数据
    • 滑动窗口:按窗口大小和滑动周期定义,比如上面说到的每隔一分钟统计最近五分钟买家发消息数
    • 会话窗口:在数据子集上捕捉一段时间内的活动,比如按超时时间定义。举个例子,用户登录30分钟无事件为超时,计算每次登录的发消息数

    处理乱序数据

    ​ 前面提到,Blink支持三种时间,其中在使用Ingestion time(事件进入时间)和EventTime(事件事件)进行计算时都可能会发生数据乱序的问题,即由于各节点所处的网络环境,硬件性能等,事件实际计算时间的先后顺序和事件实际发生或进入Blink时间的先后顺序可能是不一致的。


    如图所示,图中的数字表示时间,对流数据按时间的奇偶进行了分组计算,在进行一次并行计算之后,由于不同节点计算速度不同,原本的顺序的数据在task4处也可能变为乱序。而在处理乱序数据时,基于窗口的计算结果很可能是错误的。原因是在数据流乱序的情况下,节点无法确认在时间窗口结束时是否还会有原本位于时间窗口内的数据到来。

    ​ 如图所示,​Flink无法确认在事件4到达之后是否还会有更早的事件到来,为了解决这个问题,​Flink引入了Water Mark(水位线机制),简单来讲,Water Mark定义了​Flink在数据乱序的情况下,是否需要等待更早的数据以及需要等待多久的问题,以上图的场景为例,节点可以选择在窗口时间结束之后等待一段时间再触发窗口计算,一定程度上防止更早数据未被处理。当然,水位线的选择必须要谨慎,如果等待的时间过长,遗漏更早事件的可能性虽然会减少,但需要缓存的事件和等待时间会增加,会导致延时和吞吐量降低,如果等待的时间过短,遗漏掉更早事件的概率可能更大。但无论如何,数据的遗漏是无法避免的,Blink把窗口计算结束后到来的,早于水位线的事件称为迟到事件,我们可以对迟到数据进行单独处理,Blink提供了如下三种策略:

    • 重新激活已关闭的窗口并进行重新计算修正结果
    • 将迟到事件收集起来单读处理
    • 丢弃迟到事件

    ​Flink默认处理方式为【丢弃迟到事件】

    3.技术架构

    ​ 在了解到​Flink的两个核心原理之后,最后介绍一下整体的技术架构。​Flink整体是基于主从模式,JobManager相当于Master,TaskManager是Slave。计算任务(基于StreamAPI或者其他API开发的代码)在客户端转化为JobGraph(多节点chain为一个节点,减少不同节点之间数据流动带来的性能损耗)后会提交到JobManager,JobManager将其转化为可执行的Execution Graph(主要是对JobGraph进行了并行化操作),最终通过Resource Manager将具体的执行任务调度到TaskManager上去执行。JobManager还负责触发checkPoint(就是前面说到的全局快照)

    当然,这种模式下JobManager会存在单点问题,flink提供了两种高可用模式,第一种是Standalone,在这种模式下同时会启动多个JobManager并通过Zookeeper选举出一个Master,当JobManager发生故障时,会选举出新的Master。第二种是基于Yarn的模式,启动一个JobManger,当发生故障时会自动进行重启

    展开全文
  • 整个集群的master节点,负责整个flink集群的任务调度和资源管理,整个集群有且仅有一个活跃的JobManager。 从客户端获取提交的应用,根据TaskManager上TaskSlot使用情况,为提交的作业分配TaskSlot资源,并命令...

    参考《Flink原理、实战与性能优化》

    基本架构

    在这里插入图片描述

    client

    将作业提交到jobmanager
    用户没提交一个flink程序就会创建一个client,client会将flink程序翻译成一个JobGraph

    JobManager

    整个集群的master节点,负责整个flink集群的任务调度和资源管理,整个集群有且仅有一个活跃的JobManager。
    从客户端获取提交的应用,根据TaskManager上TaskSlot使用情况,为提交的作业分配TaskSlot资源,并命令TaskManager启动应用。
    所有checkpoints协调过程都在JobManager完成,每个taskmanager收到触发命令后,完成checkpoints操作。

    TaskManager

    负责具体任务执行(计算)和对应任务在每个节点资源的申请和管理

    flink编程模型

    source - transformation -sink

    Flink数据类型

    1.任意Java原生基本数据类型(装箱)
    2.任意Java原生基本数据类型(装箱)数组
    3.java Tuples
    4.Scala Case Class
    5.POJO

    DataStream API

    DataStream 转换操作

    • Mpa [DataStream - > DataStream]
      应用于对数据格式的变化
    • FlatMpa [DataStream - > DataStream]
      应用于一个元素产生一个或多个元素
    • Filter[DataStream - > DataStream]
      过滤
    • KeyBy [DataStream - > KeyedStream]
      按key分区
      keyby(0)//按第一个字段为分区key
    • Reduce [ KeyedStream- > DataStream]
      将KeyedStream按照自定义的reducefunction聚合
    • Aggregations [ KeyedStream- > DataStream]
      对reduce的封装,有sum、min、minby、max、maxby等
    • Union [DataStream - > DataStream]
      两个或多个stream合并成一个
    • Split [DataStream - > SplitStream]
      将一个datastream按条件拆分
    • Select [SplitStream- > DataStream ]
      对Select的结果数据集筛选
    • Iterate [DataStream - > IterativeStream - > DataStream]
      将datastream中满足条件的进行下一次迭代,不满足则发到下游datastream中
    • 物理分区操作 [SplitStream- > DataStream ]
      用于防止数据倾斜
      随机分区(Random Partitioning)、R Partioning、Rescaling Partioning 、广播操作(Broadcasting)、自定义分区(Custom Partitioning)

    时间概念与Watermark

    flink中时间分为三个概念
    事件生成时间(Event Time):在设备上发生事件的时间
    事件接入时间(Ingestion Time):接入flink系统的时间
    事件处理时间(Processing Time):执行算子的时间

    EventTime 和 Watermark
    原因:防止由于网络或系统等因素,造成事件数据不能及时到flink系统。
    简述:设置Watermark(最大延迟间隔),如果数据没有全部到达,则一直等待。

    EventTime和Watermark
    Flink的时间与watermarks详解
    在这里插入图片描述
    在这里插入图片描述

    Flink状态管理和容错

    有状态的计算
    flink程序运行中存储中间计算结果给后面的算子、function使用。存储可以是flink堆内对外内存或第三方存储介质。同理无状态计算不保存中间结果
    在这里插入图片描述
    适用场景
    用户想获取某一特定事件规则的时间、按照时间窗口求最大值、机器学习、使用历史数据计算等

    Flink状态类型
    根据数据集中是否按key分区分为KeyedState、OperatorState(NonkeyedState),这两种状态均具有两种形式,托管状态(ManagedState)形式 和 原生状态(RawState)形式

    ManagedState托管状态由Flink Runtime控制管理,将状态转换为内存Hash tables或RocksDB通过内部接口持久化到checkpoints
    RawState原生状态算子自己管理数据结构将数据转化成bytes存储在checkpoints中

    Checkpoints和Savepoints
    异步轻量级分布式快照技术Checkpoints容错机制:

    • 分布式快照可以将同一时间点Task/Operator状态数据全局统一快照处理。
    • flink在输入的数据集上间隔性生成checkpointbarrier,出现异常时,就能从上一个快照恢复算子之前的状态,保持数据一致性。
    • checkpoints一些配置:exactlyance(保证数据质量)和atleastonce(保证吞吐)语义选择、checkpoint超时时间、检查点间最小间隔(避免数据堆积)、并行检查点、外部检查点(持久化到外部系统)、checkpoint失败任务是否关闭

    以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中的Savepoint机制:

    • 目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况

    Flink中提供了StateBackend来存储和管理Checkpoints过程中的状态数据:包括基于内存的MemoryStateBackend、基于文件系统的FsStateBackend,以及基于RockDB作为存储介质的RocksDBStateBackend

    checkpoint优化

    1. 减小时间间隔
    2. 状态容量预估
    3. 异步snapshot
    4. 状态数据压缩
    5. CheckpointDelayTime

    环境部署

    官网下jar包:https://flink.apache.org/zh/downloads.html
    注意,根据Hadoop、scala版本选择flink,避免包冲突
    本地启动flink:

    ./flink-1.7.1/bin/start-cluster.sh
    

    本地停止flink:

    ./flink-1.7.1/bin/stop-cluster.sh
    

    检验启动成功(默认端口8081):
    http://localhost:8081/
    在这里插入图片描述

    代码

    wordcount code

    maven 依赖

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <!-- This dependency is required to actually execute jobs. It is currently pulled in by
                    flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    

    java code

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class wordcount {
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<String> text = env.fromElements(
                    "To be, or not to be,--that is the question:--",
                    "Whether 'tis nobler in the mind to suffer",
                    "The slings and arrows of outrageous fortune",
                    "Or to take arms against a sea of troubles,"
            );
            DataSet<Tuple2<String,Integer>> counts =
                    text.flatMap(new LineSplitter())
                            .groupBy(0)
                    .sum(1);
            counts.print();
    
        }
    
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
    
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    
                String[] tokens = value.toLowerCase().split("\\W+");//^\w中\w表示字符类(包括大小写字母,数字),后面的+号的作用在前一个字符上,即\w+,表示一个或多个\w,最少一个
    
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }
    }
    

    如何用flink启动

    ~/soft/flink-1.7.1/bin/flink run -c 包名.类名 jar包名.jar
    

    Datastream Demo Code

    /*
    运行命令:
    cd ~/工作/idea_project/lazy_project/java_project/demo && mvn clean install
    cd ~/工作/idea_project/lazy_project/java_project/demo/target && ~/soft/flink-1.7.1/bin/flink run -c base.dataStream.StreamingDemoWithMyRichPralalleSource demo-1.0-SNAPSHOT.jar
    */
    package base.dataStream;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    public class StreamingDemoWithMyRichPralalleSource {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    //        DataStreamSource<Long> streamSource = executionEnvironment.addSource(new MyParallesource()).setParallelism(1);
            DataStreamSource<Long> streamSource = executionEnvironment.addSource(new MyNoParalleSource()).setParallelism(1);
            SingleOutputStreamOperator<Long> operator = streamSource.map(new MyMapFunction());
            operator.timeWindowAll(Time.seconds(2)).sum(0).print().setParallelism(1);
            String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
            executionEnvironment.execute(jobName);
        }
    
    
    }
    class MyParallesource implements ParallelSourceFunction<Long>{
        private long count = 1L;
        private boolean isRunning = true;
    
        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (isRunning){
                sourceContext.collect(count ++);
                Thread.sleep(1000);
            }
        }
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    
    //使用并行度为1的source
    class MyNoParalleSource implements SourceFunction<Long> {//1
    
        private long count = 1L;
    
        private boolean isRunning = true;
    
        /**
         * 主要的方法
         * 启动一个source
         * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while(isRunning){
                ctx.collect(count);
                count++;
                //每秒产生一条数据
                Thread.sleep(1000);
            }
        }
        //取消一个cancel的时候会调用的方法
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    
    class MyMapFunction implements MapFunction<Long,Long>{
        @Override
        public Long map(Long value) throws Exception {
            System.out.println("接收到数据:" + value);
            return value;
        }
    }
    

    Kafka connector Demo Code

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>1.7.0</version>
            </dependency>
    
    /*
    启动本地zk,并查看状态,仅执行一次即可:
            sh ~/soft/zookeeper-3.4.9/bin/zkServer.sh start && sh ~/soft/zookeeper-3.4.9/bin/zkServer.sh status
    停止运行本地zk:
            sh ~/soft/zookeeper-3.4.9/bin/zkServer.sh stop
    */
    
    /*
    启动本地kafka(先启动zookeeper):
            nohup sh ~/soft/kafka_2.11-1.0.1/bin/kafka-server-start.sh ~/soft/kafka_2.11-1.0.1/config/server.properties &
    查看是否成功(监听端口):
            lsof -i:9092
    关闭本地kafka:
            sh ~/soft/kafka_2.11-1.0.1/bin/kafka-server-stop.sh
    */
    
    /*
    创建kafka topic(topic_name需要给为用户定义的topic名字):
            sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 【topic_name】 ; sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --list --zookeeper localhost:2181
    删除topic
            sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic_name】 && sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --list --zookeeper localhost:2181
    查看topic list:
            sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --list --zookeeper localhost:2181
    */
    
    /*
    
    测试kafka生产消费:
    创建控制台生产者:
            sh ~/soft/kafka_2.11-1.0.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 【topic_name】
    创建消费者:
            sh ~/soft/kafka_2.11-1.0.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 【topic_name】 --from-beginning
    
    */
    
    package base.dataStream;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.ArrayList;
    import java.util.Properties;
    import java.util.Random;
    
    public class kafkaDataStream1 {
        public static final String TOPIC_NAME = "topic_test";
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            return properties;
        }
    
        public static FlinkKafkaProducer<String> getProducer(){
            Properties properties = getProperties();
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(kafkaDataStream1.TOPIC_NAME, new SimpleStringSchema(), properties);
            return producer;
        }
    
        public static FlinkKafkaConsumer<String> getConsumer(){
            return new FlinkKafkaConsumer(TOPIC_NAME,new SimpleStringSchema(),getProperties());
        }
    }
    
    class KafkaProducerDemo1{
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = executionEnvironment.addSource(new MyNoParalleSourceDemo1());
    
            FlinkKafkaProducer<String> producer = kafkaDataStream1.getProducer();
            source.addSink(producer);
            executionEnvironment.execute();
        }
    }
    
    class KafkaConsumerDemo1{
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            FlinkKafkaConsumer<String> consumer = kafkaDataStream1.getConsumer();
            consumer.setStartFromLatest();
    
            env.addSource(consumer).print();
            env.execute();
        }
    }
    
    class MyNoParalleSourceDemo1 implements SourceFunction<String> {
        boolean isRuning = true;
    
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (isRuning == true){
                ArrayList<String> list = new ArrayList<>();
                list.add("第1条");
                list.add("第2条");
                list.add("第3条");
                list.add("第4条");
                list.add("第5条");
                int i = new Random().nextInt(5);
                System.out.println("============"+list.get(i));
                sourceContext.collect(list.get(i));
                Thread.sleep(2000);
            }
        }
    
        @Override
        public void cancel() {
            isRuning = false;
        }
    }
    
    
    
    展开全文
  • Flink技术原理

    2018-05-10 11:49:52
    华为教程,内容由浅入深,适合各个层次学习,欢迎大家讨论
  • Flink原理和实践.pptx

    2022-04-10 16:37:51
    Flink原理和实践.pptx
  • Flink原理.doc

    2019-05-24 10:00:13
    Flink原理原理全面讲解,是不可多得的flink资源,很值得学习
  • 又讲解了Flink的时间概念、Window的实现原理及其代码解析,Flink的容错机制原理Flink容错的关键设计、代码实现分析,Flink Job从源码到执行整个过程的解析,Flink Job的调度策略、资源管理策略、内存管理、数据...

    《Flink内核原理与实现》从系统整体视角出发,既讲解了Flink的入门、安装、流计算开发入门、监控运维等基础知识,又讲解了Flink的时间概念、Window原理及其代码解析,Flink容错机制原理、关键设计及其代码实现分析,作业从源码到执行整个过程的解析, 作业的调度策略、资源管理、类型和序列化系统、内存管理、类数据交换的关键设计和代码实现分析,RPC通信框架等深度内容。

    适合对实时计算感兴趣的大数据开发、运维领域的从业人员阅读,此外对机器学习工程技术人员也有所帮助。

     

    目录

    前言

    第1章 Flink入门

    1.1 核心特点

    1.1.1 批流一体

    1.1.2 可靠的容错能力

    1.1.3 高吞吐、低延迟

    1.1.4 大规模复杂计算

    1.1.5 多平台部署

    1.2 架构

    1.2.1 技术架构

    1.2.2 运行架构

    1.3 Flink的未来

    1.4 准备工作

    1.5 总结

    第2章 Flink应用

    2.1 Flink应用开发

    2.2 API层次

    2.3 数据流

    2.4 数据流API

    2.4.1 数据读取

    2.4.2 处理数据

    2.4.3 数据写出

    2.4.4 旁路输出

    2.5 总结

    第3章 核心抽象

    3.1 环境对象

    3.1.1 执行环境

    3.1.2 运行时环境

    3.1.3 运行时上下文

    3.2 数据流元素

    3.3 数据转换

    3.4 算子

    3.4.1 算子行为

    3.4.2 Flink算子

    3.4.3 Blink算子

    3.4.4 异步算子

    3.5 函数体系

    3.5.1 函数层次

    3.5.2 处理函数

    3.5.3 广播函数

    3.5.4 异步函数

    3.5.5 数据源函数

    3.5.6 输出函数

    3.5.7 检查点函数

    3.6 数据分区

    3.7 连接器

    3.8 分布式ID

    3.9 总结

    第4章 时间与窗口

    4.1 时间类型

    4.2 窗口类型

    4.3 窗口原理与机制

    4.3.1 WindowAssigner

    4.3.2 WindowTrigger

    4.3.3 WindowEvictor

    4.3.4 Window函数

    4.4 水印

    4.4.1 DataStream Watermark生成

    4.4.2 Flink SQL Watermark生成

    4.4.3 多流的Watermark

    4.5 时间服务

    4.5.1 定时器服务

    4.5.2 定时器

    4.5.3 优先级队列

    4.6 窗口实现

    4.6.1 时间窗口

    4.6.2 会话窗口

    4.6.3 计数窗口

    4.7 总结

    第5章 类型与序列化

    5.1 DataStream类型系统

    5.1.1 物理类型

    5.1.2 逻辑类型

    5.1.3 类型推断

    5.1.4 显式类型

    5.1.5 类型系统存在的问题

    5.2 SQL类型系统

    5.2.1 Flink Row

    5.2.2 Blink Row

    5.2.3 ColumnarRow

    5.3 数据序列化

    5.3.1 数据序列化/反序列化

    5.3.2 String序列化过程示例

    5.3.3 作业序列化

    5.3.4 Kryo序列化

    5.4 总结

    第6章 内存管理

    6.1 自主内存管理

    6.2 内存模型

    6.2.1 内存布局

    6.2.2 内存计算

    6.3 内存数据结构

    6.3.1 内存段

    6.3.2 内存页

    6.3.3 Buffer

    6.3.4 Buffer资源池

    6.4 内存管理器

    6.4.1 内存申请

    6.4.2 内存释放

    6.5 网络缓冲器

    6.5.1 内存申请

    6.5.2 内存回收

    6.6 总结

    第7章 状态原理

    7.1 状态类型

    7.1.1 KeyedState与OperatorState

    7.1.2 原始和托管状态

    7.2 状态描述

    7.3 广播状态

    7.4 状态接口

    7.4.1 状态操作接口

    7.4.2 状态访问接口

    7.5 状态存储

    7.5.1 内存型和文件型状态存储

    7.5.2 基于RocksDB的StateBackend

    7.6 状态持久化

    7.7 状态重分布

    7.7.1 OperatorState重分布

    7.7.2 KeyedState重分布

    7.8 状态过期

    7.8.1 DataStream中状态过期

    7.8.2 Flink SQL中状态过期

    7.8.3 状态过期清理

    7.9 总结

    第8章 作业提交

    8.1 提交流程

    8.1.1 流水线执行器PipelineExecutor

    8.1.2 Yarn Session提交流程

    8.1.3 Yarn Per-Job提交流程

    8.1.4 K8s Session提交流程

    8.2 Graph总览

    8.3 流图

    8.3.1 StreamGraph核心对象

    8.3.2 StreamGraph生成过程

    8.3.3 单输入物理Transformation的转换示例

    8.3.4 虚拟Transformation的转换示例

    8.4 作业图

    8.4.1 JobGraph核心对象

    8.4.2 JobGraph生成过程

    8.4.3 算子融合

    8.5 执行图

    8.5.1 ExecutionGraph核心对象

    8.5.2 ExecutionGraph生成过程

    8.6 总结

    第9章 资源管理

    9.1 资源抽象

    9.2 资源管理器

    9.3 Slot管理器

    9.4 SlotProvider

    9.5 Slot选择策略

    9.6 Slot资源池

    9.7 Slot共享

    9.8 总结

    第10章 作业调度

    10.1 调度

    10.2 执行模式

    10.3 数据交换模式

    10.4 作业生命周期

    10.4.1 作业生命周期状态

    10.4.2 Task的生命周期

    10.5 关键组件

    10.5.1 JobMaster

    10.5.2 TaskManager

    10.5.3 Task

    10.5.4 StreamTask

    10.6 作业启动

    10.6.1 JobMaster启动作业

    10.6.2 流作业启动调度

    10.6.3 批作业调度

    10.6.4 TaskManger启动Task

    10.7 作业停止

    10.8 作业失败调度

    10.8.1 默认作业失败调度

    10.8.2 遗留的作业失败调度

    10.9 组件容错

    10.9.1 容错设计

    10.9.2 HA服务

    10.9.3 JobMaster的容错

    10.9.4 ResourceManager容错

    10.9.5 TaskManager 的容错

    10.10 总结

    第11章 作业执行

    11.1 作业执行图

    11.2 核心对象

    11.2.1 输入处理器

    11.2.2 Task输入

    11.2.3 Task输出

    11.2.4 结果分区

    11.2.5 结果子分区

    11.2.6 有限数据集

    11.2.7 输入网关

    11.2.8 输入通道

    11.3 Task执行

    11.3.1 Task处理数据

    11.3.2 Task处理Watermark

    11.3.3 Task处理StreamStatus

    11.3.4 Task处理LatencyMarker

    11.4 总结

    第12章 数据交换

    12.1 数据传递模式

    12.2 关键组件

    12.2.1 RecordWriter

    12.2.2 数据记录序列化器

    12.2.3 数据记录反序列化器

    12.2.4 结果子分区视图

    12.2.5 数据输出

    12.3 数据传递

    12.3.1 本地线程内的数据传递

    12.3.2 本地线程间的数据传递

    12.3.3 跨网络的数据传递

    12.4 数据传递过程

    12.4.1 数据读取

    12.4.2 数据写出

    12.4.3 数据清理

    12.5 网络通信

    12.5.1 网络连接

    12.5.2 无流控

    12.5.3 基于信用的流控

    12.6 总结

    第13章 应用容错

    13.1 容错保证语义

    13.2 检查点与保存点

    13.3 作业恢复

    13.3.1 检查点恢复

    13.3.2 保存点恢复

    13.3.3 恢复时的时间问题

    13.4 关键组件

    13.4.1 检查点协调器

    13.4.2 检查点消息

    13.5 轻量级异步分布式快照

    13.5.1 基本概念

    13.5.2 Barrier对齐

    13.6 检查点执行过程

    13.6.1 JobMaster触发检查点

    13.6.2 TaskExecutor执行检查点

    13.6.3 JobMaster确认检查点

    13.7 检查点恢复过程

    13.8 端到端严格一次

    13.8.1 两阶段提交协议

    13.8.2 两阶段提交实现

    13.9 总结

    第14章 Flink SQL

    14.1 Apache Calcite

    14.1.1 Calcite是什么

    14.1.2 Calcite的技术特点

    14.1.3 Calcite的主要功能

    14.1.4 Calcite的核心原理

    14.2 动态表

    14.2.1 流映射为表

    14.2.2 连续查询

    14.2.3 流上SQL查询限制

    14.2.4 表到流的转换

    14.3 TableEnvironment

    14.3.1 TableEnvironment体系

    14.3.2 TableEnvironment使用示例

    14.4 Table API

    14.5 SQL API

    14.6 元数据

    14.6.1 元数据管理

    14.6.2 元数据分类

    14.7数据访问

    14.7.1Table Source

    14.7.2Table Slink

    14.8 SQL函数

    14.9 Planner关键抽象

    14.9.1 Expression

    14.9.2 ExpressionResolver

    14.9.3 Operation

    14.9.4 QueryOperation

    14.9.5 物理计划节点

    14.10 Blink Planner和Flink Planner对比

    14.11 Blink与Calcite关系

    14.12 Blink SQL执行过程

    14.12.1 从SQL到Operation

    14.12.2 Operation到Transformation

    14.13 Blink Table API执行过程

    14.13.1 Table API到Operation

    14.13.2 Operation到Transformation

    14.14 Flink与Calcite的关系

    14.15 Flink SQL执行过程

    14.15.1 SQL 到Operation

    14.15.2 Operation到DataStream/DataSet

    14.16 Flink Table API执行过程

    14.17 SQL优化

    14.18 Blink优化

    14.18.1 优化器

    14.18.2 代价计算

    14.18.3 优化过程

    14.18.4 优化规则

    14.18.5 公共子图

    14.19 Flink优化

    14.19.1 优化器

    14.19.2 优化过程

    14.19.3 优化规则

    14.20 代码生成

    14.20.1 为什么进行代码生成

    14.20.2 代码生成范围

    14.20.3 代码生成示例

    14.21 总结

    第15章 运维监控

    15.1 监控指标

    15.2 指标组

    15.3 监控集成

    15.4 指标注册中心

    15.5 指标查询服务

    15.6 延迟跟踪实现原理

    15.7 总结

    第16章 RPC框架

    16.1 Akka简介

    16.1.1 Akka是什么

    16.1.2 使用Akka

    16.1.3 Akka的通信

    16.2 RPC消息的类型

    16.3 RPC通信组件

    16.3.1 RpcGateway

    16.3.2 RpcEndpoint

    16.3.3 RpcService

    16.3.4 RpcServer

    16.3.5 AkkaRpcActor

    16.4 RPC交互过程

    16.4.1 RPC请求发送

    16.4.2 RPC请求响应

    16.5总结

    专家寄语

    参考文献

    展开全文
  • 本文会介绍 Flink 计算资源相关的一些核心概念,如:Slot、SlotSharingGroup、CoLocationGroup、Chain等。并会着重讨论 Flink 如何对计算资源进行管理和隔离,如何将计算资源利用率最大化等等。理解 Flink 中的计算...
  • Flink 原理架构总结

    千次阅读 2019-06-10 16:35:03
    一、流式任务执行过程 1.任务并行 按照自己的理解,一个流程如下图所示,除非经历shuffle过程,否则流程并行度将由source的并行度决定,比如kafka分区数目,shuffle之后的并行度,可能会改变... 原理同Spark 的S...
  • 一句话来介绍 Flink 就是 “Stateful Computations Over Streams”。四个基石:Checkpoint、State、Time、Window。 Checkpoint 机制,Flink 基于 Chandy-Lamport 算法实现了分布式一致性的快照,从而提供了 exactly...
  • flink原理与实践

    2019-07-19 12:14:43
    •第一部分:Flink 基本原理与分析
  • 在构建实时数仓的过程中,如何快速、正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题。 01 Flink CDC介绍 ...
  • Apache Flink是什么? 在当代数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题。随着雅虎对hadoop的开源,越来越多的大数据...
  • 1. Flink角色分工 在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程。 JobManager:它扮演的是集群管理者的角色,负责调度任务、协调 checkpoints、协调故障恢复、收集 Job 的状态信息,并...
  • 那么为了让大家更好地掌握 Flink 核心原理,现邀请到有多年大数据从业与分享经验的星云老师,专为学习Flink的同学开设了【Flink实战训练营】,0.02元就能入学! 在这门课程中,你会从理论到实战全面了解最火的实时...
  • 面试官如果问道,为什么使用Flink? 1.Flink 使用有状态流式计算模型实现了高吞吐,低延迟,高性能兼具实时流式计算框架。 什么是有状态计算? 流式计算中算子的中间结果数据保存在内存或者文件系统当中,等下一个...
  • 阿里 Flink 原理及架构深度解析 随着业务的发展,企业对数据的实时性要求越来越高。以电商为例,阿里在双 11 会竖起一面电子屏幕,实时展示淘宝数据,例如成交额、访问人数、订单量、下单量、成交量等等。这个...
  • flink根据时间产生的位置不同,把时间区分为三种时间概念 事件生成时间(event Time) 数据从终端产生,或者从系统中产生的时间 事件接入时间(Ingestion Time) 数据经过中间件传入到flink之后,在dataSource中接...
  • Flink原理、实战与性能优化.pdf
  • Flink原理、实战与性能优化读书笔记

    千次阅读 2019-09-26 01:20:40
    一、Flink优势 1. 目前唯一同时支持高吞吐、低延迟、高性能的分布式流式数据处理框架 2. 支持事件事件概念 3. 支持有状态计算,保持了事件原本产生的时序性,避免网络传输带来的影响 4. 支持高度灵活的窗口操作,...
  • http://yuedu.163.com/book_reader/a6a7fdfedb6246148e4eb19b617557d2_4
  • JobManager调度task,协调checkpoint的报错并进行恢复。 JobManager接收到客户端发来的打包任务信息,将信息分配给...Task slot是Flink中最小的资源单位。 目前slot不会对cpu进行隔离。同一个taskManager中的
  • 文章目录01 引言02 Flink角色03 Flink执行流程3.1 Standalone版本3.2 on yarn04 Flink Streaming Dataflow4.1 Flink相关词汇4.2 Operator传递模式4.3 Operator Chain4.4 任务槽与槽共享4.4.1 任务槽(TaskSlot)4.4.2 ...
  • flink实战--flink原理解析

    千次阅读 多人点赞 2018-09-30 15:40:22
    Flink出现的背景 我们知道目前流处理的主要流行的计算引擎有,Storm,SparkStreaming。但是这个两个计算引擎都有自己的局限性。Storm实现了低延迟,但是目前还没有实现高吞吐,也不能在故障发生的时候准确的处理...
  • Flink原理、实战与性能优化作者:张利兵 著出版日期:2019年04月文件大小:2.09M支持设备:¥50.00在线试读适用客户端:言商书局iPad/iPhone客户端:下载 Android客户端:下载PC客户端:下载更多详情:查看?...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 13,816
精华内容 5,526
关键字:

flink原理

友情链接: SCM.zip