精华内容
下载资源
问答
  • 搭建Scala、Flink开发环境,使用IDEA开发Scala程序 一、基于IDEA安装scala File——Settings——Plugins搜索Scala安装,安装后重启IDEA使Scala插件生效 二、配置Flink开发环境 <dependencies> <!--...

    IDEA搭建Scala、Flink开发环境,使用Scala语言开发Flink程序

    一、基于IDEA安装scala

    File——Settings——Plugins搜索Scala安装,安装后重启IDEA使Scala插件生效
    在这里插入图片描述

    二、配置Flink开发环境

    使用Scala语言开发Flink程序的时候需要添加下面的配置

        <dependencies>
    
    		<!--配置flin
    展开全文
  • 注:本篇章的flink学习均是基于java开发语言 我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢? 前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)? - Batch Analytics,...


    注:本篇章的flink学习均是基于java开发语言

    我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?

    前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?

    image-20210307225350028

    - Batch Analytics,右边是 Streaming Analytics。批量计算: 统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)

    - Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。

    (0)开发程序所需依赖

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.2</flink.version>
    </properties>
    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
    </dependencies>
        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <plugins>
                <!-- 编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
                <!-- 打包插件(会包含所有依赖) -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!--
                                            zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!-- 设置jar包的入口类(可选) -->
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    (1)获取执行环境

    flink程序开发,首要的便是需要获取其执行环境!

    ex:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    

    或者:

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    

    如果使用StreamExecutionEnvironment 默认便是流式处理环境

    但是flink1.12 开始,流批一体,我们可以自己指定当前计算程序的环境模式

    指定为自动模式:AUTOMATIC

    此设置后,flink将会自动识别数据源类型

    有界数据流,则会采用批方式进行数据处理

    无界束流,则会采用流方式进行数据处理

    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    

    强制指定为批数据处理模式:BATCH

    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    

    强制指定为流数据处理模式:STREAMING

    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    

    注意点:

    在flink中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATICSTREAMING,不可以指定为BATCH否则程序会报错!

    (2)加载/创建数据源

    flink,是一个计算框架,在计算的前提,肯定是要有数据来源啊!

    flink可以从多种场景读取加载数据,例如 各类DB 如MysqlSQL SERVERMongoDB、各类MQ 如KafkaRabbitMQ、以及很多常用数据存储场景 如redis文件(本地文件/HDFS)scoket

    我们在加载数据源的时候,便知道,该数据是有界还是无界了!

    ex:

    flink读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为flink程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本身MQ中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待MQ中有数据到来,然后处理。


    flink读取指定某个文件中的数据,那么此数据源是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!

    ex:

    从集合中读取数据:

    DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");
    

    那么,这是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。

    scoket中读取数据:

     DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
    

    这是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。

    (3)数据转换处理

    数据转换处理,就是flink使用算子,对从数据源中获取的数据进行数据加工处理(例如 数据转换,计算等等)

    例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。

    demo示例:

    DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                                                               "java,scala,php", "java,scala", "java");
    // 数据处理
    DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String element, Collector<String> out) throws Exception {
            String[] wordArr = element.split(",");
            for (String word : wordArr) {
                out.collect(word);
            }
        }
    });
    flatMap.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    });
    

    (4)处理后数据放置/输出

    将计算后的数据,进行放置(输出/存储),可以很地方,从什么地方读取数据,自然也可以将计算结果输出到该地点。

    例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket

    ex:输出到控制台

    source.print();
    

    (5)执行计算程序

    flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动

    启动示例:

    // 1.准备环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置模式 (流、批、自动)
    // 2.加载数据源
    // 3.数据转换
    // 4.数据输出
    // 5.执行程序
    env.execute();
    //或者 env.execute("指定当前计算程序名");
    

    (6)完整示例

    public class FlinkDemo {
        public static void main(String[] args) throws Exception {
            // 1.准备环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置运行模式
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 2.加载数据源
            DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                    "java,scala,php", "java,scala", "java");
            // 3.数据转换
            DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String element, Collector<String> out) throws Exception {
                    String[] wordArr = element.split(",");
                    for (String word : wordArr) {
                        out.collect(word);
                    }
                }
            });
            //DataStream 下边为DataStream子类
            SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    return value.toUpperCase();
                }
            });
            // 4.数据输出
            source.print();
            // 5.执行程序
            env.execute("flink-hello-world");
        }
    }
    

    IDEA执行后,输出结果:

    image-20210331222234445

    前边序号可以理解为多线程执行时的线程名字!

    展开全文
  • Flink什么

    2021-04-18 12:03:03
    Flink是一个专注于流处理的框架,它相比较与spark的流式处理,flink的延迟更短,时间颗粒度更细,且flink也是使用scala语言编写 但注意flin只是专注于流式计算,并不是说不可以做离线计算,它的离线计算和spark的...

    Flink是一个专注于流处理的框架,它相比较与spark的流式处理,flink的延迟更短,时间颗粒度更细,且flink也是使用scala语言编写

    但注意flink只是专注于流式计算,并不是说不可以做离线计算,它的离线计算和spark的差不多,但是它的在线计算,摒弃了spark的很多缺点,如spark通常情况下要依赖于开窗时间和滑动时间来进行一批一批的处理数据,无法做到真正的流式处理,且不同批次的结果想要累加或者一起计算则需要两个状态更细的开窗方法,而flink则没有这种限制,它是真正意义上的流式处理可以从负无穷到正无穷的计算,不分批次,每一条数据都将会被处理

    它的开发流程和spark差不多,只是方法有时有些不同而已,编程方式见代码,这里要重点说的是任务的提交方式以及集群的启动方式!!!!

    Flink的任务提交方式很特别,它不像hadoop的hadoop jar,也不是spark的spark-submit,它的提交和任务的查看可以在管理页面进行,但是对于管理页面来说有一个坑爹的事情,就是集群配置文件master中写的是那个,就必须在那台机器上运行启动命令,不然虽然可以正常启动,但是可用task没有办法进行心跳机制,会发现flink管理页面上的task都是0,管理页面的默认端口是8081,当然也可以通过命令行提交任务

    管理页面的样子如下,这个管理页面你需要重Flink的Master结点的端口打开,这里只是给大家看一下样子,详细使用大家看我其他的资料
    在这里插入图片描述
    命令行提交时使用如下命令
    在这里插入图片描述
    也可以用命令停止
    在这里插入图片描述
    同时Flink流处理的时候还有很多数据源获取方式,甚至flink的流处理可以直接获取批处理那样的数据源,如文件,只是处理的时候是和流处理一样都是一条一条数据出来的,而不是普通批处理那样一整批的处理

    下面是我自己整理了一个pom,供给大家演示Flink用

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.wy</groupId>
        <artifactId>flink</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    	<!--flink -hdfs-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.7.2</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.2</version>
            </dependency>
            <!-- flink sql模块 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.7.2</version>
            </dependency>
    
    	<!-- 保存mysql -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.6</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.11</artifactId>
                <version>1.7.0</version>
            </dependency>
    
    
        </dependencies>
    
        <build>
            <pluginManagement>
                <plugins>
                    <!-- 编译scala的插件 -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                    </plugin>
                    <!-- 编译java的插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.5.1</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
    
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    下面给大家看一下flink的批处理怎么写

    package wy
    
    /**
      * 导入时一定要将org.apache.flink.api.scala包下所有的东西
      * 全部导入不然会发生无法隐式转换等错误
      */
    import org.apache.flink.api.scala._
    
    object WorldCount {
      def main(args: Array[String]): Unit = {
        /**
          * 开头等价于spark的Spark容器获取
          * 同时我们一般直接使用getExecutionEnvironment就可以
    	  * 原因是Flink最开始获取运行容器有二个方法,分别是获取集群运行资源和获取本地运行资源
    	  * 但是开发时改来改去的麻烦,所以Flink后期提供了getExecutionEnvironment方法
    	  * 该方法可以底层调用了之前的两种运行资源的获取自己判断获取那种资源
          */
        val exe = ExecutionEnvironment.getExecutionEnvironment
    
        /**
          * 此更改整个程序的并行数,这个和spark的有些不同
          * Spark并行数设置通过运行时的cores设置就行
          * 但是flink的有些不一样,且我们一般都会适量设置,一般不使用默认的个数
          * 因为Flink自己会获取运行资源,默认个数为所有可用的task数
          * 我们设置时flink的并行数最好是控制在可用task数之内
          * 可用task数在管理页面中可以查看到
          * task设置少了任务效率受影响
          * 设置多了,就会发生大于可用task的数量的进程会和其他进程争抢task资源
          * 挣不到就会死锁等问题发生特别麻烦
          * 且flink改变并行数有不同的优先级的:
          * 在集群中运行任务时设置的并行数优先级最低
          * 在ExecutionEnvironment对象设置的并行数倒数优先级倒数第二
          * 在方法上设置的并行数优先级最高,如:sum(1).setParallelism()
          */
        exe.setParallelism(1)
    
        /**
          * 加载一个文件进行单词统计算法,使用链式调用
          * 这里就可以发现flink中将spark的reduceByKey分解成
          * groupBy和sum了
          */
        val data = exe.readTextFile("D:\\IdeaObj\\flink\\src\\main\\resources\\world")
        data.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
    
      }
    }
    
    

    下面再给大家演示一下流处理

    package wy
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    
    object SWorldCount {
      def main(args: Array[String]): Unit = {
    
        val exe = StreamExecutionEnvironment.getExecutionEnvironment
    
        //val Sdata = exe.socketTextStream(args(0),args(1).toInt)
        val Sdata = exe.socketTextStream("hdp1",44444)
        Sdata.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1).print()
    
        /**
          *和Spark一样需要一个方法使得流计算程序执行,并命名程序
          */
        exe.execute("WorldCount")
    
      }
    }
    
    
    展开全文
  • Flink

    2020-07-16 16:31:34
    Flink


    简介

    Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。Flink 已经可以在所有常见的集群环境中运行 (YARN/ Mesos/ Kubernetes),并以 in-memory 的速度和任意的规模进行计算。

    • Source:必须是流式数据,eg:MySQL/ SQL Server/ Kafka/ RabbitMQ;
    • Sink:既可以是流式数据,也可以是批式数据,eg: MySQL/ SQL Server/ HDFS/ Hive/ Kafka/ RabbitMQ/ ElasticSearch;

    Flink的用途:

    • 实时数仓。包括对数据的实时清洗、归并、结构化,以及对传统数仓的补充和优化;
    • 实时报表。包括对双11等营销活动的分析大屏、实时的数据化运营;
    • 流数据分析。包括对实时指标的计算,例如金融风控指标等,以及个性化定制、精准推荐等;
    • 实时监控。包括对用户行为进行实时监测和分析、并进行预警等;

    由于Flink项目中很多华人开发者,因此有较多中文版的学习资料,但这些学子资料都是以由深到浅的方式介绍 Flink 的原理、概念,缺乏入门的demo,让初学者一头雾水。这一点上,Flink 的官方文档是不如 Spark 的官方文档的。


    命令

    安装

    # Flink下载
    wget https://mirror-hk.koddos.net/apache/flink/flink-1.10.3/flink-1.10.3-bin-scala_2.11.tgz
    
    # 解压
    tar -zxvf /root/software/flink-1.10.3-bin-scala_2.11.tgz
    
    # 以单节点形式启动Flink;注意,这里不能用 `sh start-cluster.sh` 形式启动,这里是flink的bug
    ./bin/start-cluster.sh
    
    # 查看Flink WebUI:在浏览器访问 8081 端口即可
    
    # 添加Kafka命令到环境变量,方便快速启动
    vim /etc/profile  # 打开配置文件
    export PATH="/root/flink-1.10.3/bin:$PATH"  # 增加一行
    source /etc/profile  # 变量生效
    

    测试

    通过官方自带demo,检查Flink是否可以正常使用

    # 第一步、启动一个Linux会话窗口,在这个会话中手动输入流数据
    nc -l 9999
    
    # 第二步、启动另外一个Linux会话窗口,这样就启动了一个Flink任务
    flink run ./bin/examples/streaming/SocketWindowWordCount.jar --port 9999
    
    # 第三步、登录Flink WebUI查看任务执行状态
    
    # 第四步、在第一个会话窗口中随意输入一些单词
    
    # 第五步、查看结果。在Flink WenUI - Job Manager - Stdout 也可以查看
    vim ./log/flink-root-taskexecutor-0-VM-232-72-centos.log
    

    其它

    # Flink 1.8.0
    # 安装目录 [192.168.102.85] home/devsa_dev/flink-1.8.0/
    ./bin/start-cluster.sh  # 以StandAlone模式启动Flink集群
    ./bin/stop-cluster.sh  # 启动Flink集群
    ./bin/flink --version  # 查看Flink版本
    
    ./bin/flink run examples/streaming/WordCount.jar  # 提交demo任务,WordCount
    ./bin/flink run -d examples/streaming/TopSpeedWindowing.jar  # 提交demo任务,TopSpeedWindowing
    ./bin/sql-client.sh embedded  # 启动Flink-SQL客户端
    ./bin/start-scala-shell.sh  # 启动Flink的Scala shell入口
    
    ./bin/flink -h  # 查看Flink的命令行参数
    ./bin/flink run -h  # 查看run命令的参数
    ./bin/flink list -m 127.0.0.1:8081  # 查看任务列表
    ./bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb  # 停止任务
    
    ./bin/flink run -m yarn-cluster ./example/batch/WordCount.jar  # 以yarn模式提交作业
    

    Flink的API

    Flink 的API可以使用 Java / Scala / Python 来调用。

    Stateful Stream Processing

    Flink 最底层的API。它将通过过程函数(Process Function)嵌入到 DataStream API 中,允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
    DataStream API 是 Flink API 中比较底层的一种,在实际的生产中,需要用户较大的开发工作量,但同时它也提供了更为灵活的表达能力,适用于需要精细操作的情况;而绝大多数业务逻辑,都可以用 Flink SQL 来实现。

    DataStream / DataSet API

    Flink 提供的核心 API,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。

    Table API

    Table API 与 Flink SQL 联系非常紧密。
    Flink API 中最顶层的抽象,可以做流批统一处理。Flink SQL 是基于Apache Calcite来实现的标准SQL;它对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。它也是使用起来最简单的 API。
    Table API 是以 表 为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

    SQL

    Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。


    Flink SQL

    Flink SQL 使用类似 ANSI-SQL 的语言,操作流式数据,从而实现业务逻辑的实时计算;其本质是调用了封装在 Table API & SQL 中的 Flink 函数,也是 Flink API 中最高级、最简单的形式。通过简单的 SQL 语言就可以实现实时计算,从而实现业务与技术的解耦,这是 Flink 比 Spark Streaming 高明的地方。但是,Flink SQL 并不适用于过于复杂的业务逻辑的实现。

    # Flink中常见的窗口函数
    select  -- 统计每五分钟、每个城市的骑车人数
        city
        , tumble_end(start_time, interval '5' minute) window_end  -- 时间窗口,每5分钟统计一次
        , count(*) cnt
    from t_ride
    WHERE is_del = 0
    group by city
        , tumble(start_time, interal '5' minute)
    having count(*) >= 5
    ;
    
    
    insert into sink_kafka_TenMinPsgCnts  -- 将结果写入Kafka
    select
        tumble_start(ride_time, interval '10' minutes) cnt_start  -- 每10分钟的搭乘的乘客数
        , tumble_end(ride_time, interval '10' minutes) cnt_end
        , cast(sum(psgCnt) as bigint) cnt
    from t_rides
    group by tumble(ride_time, interval '10' minutes)
    ;
    
    
    -- 将结果写入Elasticsearch
    insert into sink_elasticsearch_AreaCnts
    select
        city  -- 每个区域出发的行车数
        , count(*) cnt
    from t_rides
    where is_del = 0
    group by city
    ;
    

    Table API

    依赖

    < !-- Flink Table API的依赖 -- >
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>
    
    < !-- Flink Table API在本地IDE运行的依赖 -- >
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>
    

    DataStream / DataSet API

    Flink 的这不分 API 操作与 Spark Streaming有非常多的相似之处。

    展开全文
  • Flink:Flink-SQL开发

    千次阅读 2020-07-27 18:08:40
    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对...
  • flink实例开发-详细使用指南

    万次阅读 2018-03-10 17:50:00
    Flink入门及实战-上: ...flink实例开发-详细使用指南 配置一个maven项目 编写一个flink程序 编程实战:编写一个向kafka写数据的程序在集群运行 flink整合kafka 在本指南中,我们将从头开始,从flink项...
  • Effective Apache Flink(一)Apache Flink什么? 目录 Effective Apache Flink(一)Apache Flink什么? 一、架构 处理无界和有界数据 部署应用到任意地方 运行任意规模应用 利用内存性能 二、应用 流...
  • study_java_flink 学习项目,java语言flink组件开发 flink官网: ://flink.apache.org/
  • 什么FlinkFlink能用来做什么

    千次阅读 2021-02-08 17:16:38
    Flink什么? Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。 Apache Flink 功能强大,支持...
  • 【大数据Flink系列】 Flink 开发环境搭建

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 9,425
精华内容 3,770
关键字:

flink用什么语言开发