精华内容
下载资源
问答
  • Flink读取文件数据的两种方式

    千次阅读 2019-06-24 20:00:58
    概述:Flink数据源(Data Sources)可以来自Java collections,也可以来自文件。本文主要介绍分别使用Scala和Java从CSV、Text中读取数据的方法,更详细信息可以参阅官方文档。 (1)Scala方式 /** * map算子 * ...

    概述:Flink数据源(Data Sources)可以来自Java collections,也可以来自文件。本文主要介绍分别使用Scala和Java从CSV、Text中读取数据的方法,更详细信息可以参阅官方文档

    (1)Scala方式

    /**
        * map算子
        *
        * @param env
        */
    def mapFunction(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        //    data.map((x:Int)=>x+1).print()
        //    data.map((x)=>x+1).print()
        //    data.map(x=>x+1).print()
        data.map(_ + 1).filter(_ > 5).print()
    }
    ​
    /**
        * 读取压缩文件
        *
        * @param env
        */
    def readCompressionFiles(env: ExecutionEnvironment): Unit = {
        val filePath = "E:\\compression"
        env.readTextFile(filePath).print()
    }
    ​
    /**
        * 读取嵌套文件
        *
        * @param env
        */
    def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
        val filePath = "E:\\nested"
        val parameters = new Configuration
        parameters.setBoolean("recursive.file.enumeration", true)
        env.readTextFile(filePath).withParameters(parameters).print()
    }
    ​
    /**
        * 从集合中获取数据
        *
        * @param env
        */
    def fromCollection(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val data = 1 to 10
        env.fromCollection(data).print()
    }
    ​
    /**
        * 从Text中读取数据
        *
        * @param env
        */
    def textFile(env: ExecutionEnvironment): Unit = {
        //    val filePath="E:\\temp\\people.txt"
        val filePath = "E:\\temp"
        env.readTextFile(filePath).print()
    }
    ​
    /**
        * 从csv中读取数据 使用自定义类方式
        * id,peopleName,phone,email
        *
        * @param env
        */
    def csvFile(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val filePath = "E:\\temp\\people.csv"
        //    env.readCsvFile[(Int,String,String,String)](filePath,ignoreFirstLine = true).print()
        //        env.readCsvFile[(Int,String)](filePath,ignoreFirstLine = true,includedFields =Array(0,3)).print()
        case class PeopleCsvClass(phone: String, email: String)
        //    env.readCsvFile[PeopleCsvClass](filePath,ignoreFirstLine=true,includedFields = Array(2,3)).print()
        env.readCsvFile[PeopleCsvClass](filePath, ignoreFirstLine = true, pojoFields = Array("phone", "email")).print()
    }

    (2)Java方式

     public static void mapFunction(ExecutionEnvironment env) throws Exception {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add(i);
            }
            DataSource<Integer> data = env.fromCollection(list);
            data.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer input) throws Exception {
                    return input + 1;
                }
            }).print();
        }
    ​
        public static void readRecursive(ExecutionEnvironment env) throws Exception {
            String filePath = "E:\\nested";
            Configuration parameters = new Configuration();
            parameters.setBoolean("recursive.file.enumeration", true);
            env.readTextFile(filePath).withParameters(parameters).print();
        }
    ​
        public static void formCollection(ExecutionEnvironment env) throws Exception {
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add(i);
            }
            env.fromCollection(list).print();
        }
    ​
        public static void textFile(ExecutionEnvironment env) throws Exception {
            String filePath = "E:\\temp\\people.txt";
            env.readTextFile(filePath).print();
            System.out.println("---------------------------------------");
            filePath = "E:\\temp";
            env.readTextFile(filePath).print();
        }
    ​
        public static void csvFile(ExecutionEnvironment env) throws Exception {
            String filePath = "E:\\temp\\people.csv";
            env.readCsvFile(filePath).pojoType(People.class, "id", "peopleName", "phone", "email").print();
        }

     

    展开全文
  • import org.apache.flink.streaming.api.scala._ object SourceTest2 { def main(args: Array[String]): Unit = { // 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env....
    import org.apache.flink.streaming.api.scala._
    
    object SourceTest2 {
      def main(args: Array[String]): Unit = {
        // 创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val value: DataStream[String] = env.readTextFile("D:\\Flink\\src\\main\\resources\\sensor.txt")
    
    
        value.print("stream1")
        env.execute("source test job")
      }
    }
    

    其中D:\Flink\src\main\resources\sensor.txt路径下的文件内容是:

    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4
    sensor_7,1547718202,6.7
    sensor_10,1547718205,38.1
    sensor_1,1547718207,37.2
    sensor_1,1547718212,33.5
    sensor_1,1547718215,38.1
    

    执行输出结果:

    stream1> sensor_1,1547718199,35.8
    stream1> sensor_6,1547718201,15.4
    stream1> sensor_7,1547718202,6.7
    stream1> sensor_10,1547718205,38.1
    stream1> sensor_1,1547718207,37.2
    stream1> sensor_1,1547718212,33.5
    stream1> sensor_1,1547718215,38.1
    
    展开全文
  • sensor.txt sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718207,37.2 ...import org.apache.flink.streaming.api.scala._ import org.apa

    sensor.txt

    sensor_1,1547718199,35.8
    sensor_6,1547718201,15.4
    sensor_7,1547718202,6.7
    sensor_10,1547718205,38.1
    sensor_1,1547718207,37.2
    sensor_1,1547718212,33.5
    sensor_1,1547718215,38.1
    

    TableAPI

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.{DataTypes, Table}
    import org.apache.flink.table.descriptors._
    
    /**
     * 读取kafka的数据并且转成表
     */
    object TableApiTest4 {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val tableEnv = StreamTableEnvironment.create(env)
    
        val filePath = "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
        tableEnv.connect(new FileSystem().path(filePath))
    
          .withFormat(new Csv()) // 指定csv格式,就是逗号分割的格式,因为kafka输入的数据就是这个格式的.
          //创建每个字段的名字和类型
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
          )
          // 创建表名
          .createTemporaryTable("inputTable")
    
    
        //    表的查询
        val sensorTable: Table = tableEnv.from("inputTable")
        val resultTable: Table = sensorTable
          .select('id, 'temperature)
          .filter('id === "sensor_1") //等于是 三个等号,
    
        resultTable.toAppendStream[(String, Double)].print("TableAPI")
    
    
        env.execute("table api test job")
      }
    }
    
    

    结果:

    TableAPI> (sensor_1,35.8)
    TableAPI> (sensor_1,37.2)
    TableAPI> (sensor_1,33.5)
    TableAPI> (sensor_1,38.1)
    

    FlinkSql

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.{DataTypes, Table}
    import org.apache.flink.table.descriptors._
    
    /**
     * 读取kafka的数据并且转成表
     */
    object TableApiTest4 {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val tableEnv = StreamTableEnvironment.create(env)
    
        val filePath = "D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
        tableEnv.connect(new FileSystem().path(filePath))
    
          .withFormat(new Csv()) // 指定csv格式,就是逗号分割的格式,因为kafka输入的数据就是这个格式的.
          //创建每个字段的名字和类型
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
          )
          // 创建表名
          .createTemporaryTable("inputTable")
    
    
    
        //    SQL简单查询
        val resultSqlTable: Table = tableEnv.sqlQuery(
          """
            |select id, temperature
            |from inputTable
            |where id = 'sensor_1'
              """.stripMargin)
        resultSqlTable.toAppendStream[(String, Double)].print("FlinkSql")
    
    
        env.execute("table api test job")
      }
    }
    
    

    结果:

    FlinkSql> (sensor_1,35.8)
    FlinkSql> (sensor_1,37.2)
    FlinkSql> (sensor_1,33.5)
    FlinkSql> (sensor_1,38.1)
    
    展开全文
  • Flink读取资源文件

    2020-11-11 15:24:20
    Flink读取资源文件 看下图 查看Flink源码,才发现是用getResourceAsStream()这个方法才可以读取资源文件; 如下图:

    Flink读取资源文件

    看下图

    在这里插入图片描述
    查看Flink源码,才发现是用getResourceAsStream()这个方法才可以读取资源文件;
    如下图:
    在这里插入图片描述

    展开全文
  • flink读取csv文件

    千次阅读 2020-06-03 15:35:02
    org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.9.0</version> </dependency> <!-- https://mvnrepository.com/artifact...
  • 最近总是会有需求要涉及到读取csv文件,以前总是拿到文件就去,没有仔细去看相关的方法和功能,现在结合最近的需求,记录一下spark和flink读取csv文件的操作。 (注:本文内容针对spark2.3.0以及flink1.9.1) ...
  • Flink实战(六)Flink读取Hdfs文件

    万次阅读 2019-10-11 21:39:04
    接一下以一个示例配置来介绍一下如何以Flink连接HDFS 1. 依赖HDFS pom.xml 添加依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.11...
  • Flink读写文件

    千次阅读 2020-05-15 10:57:25
    读写文件1 读取文件-readFile2 写入到文件-StreamingFileSink    2.1 在了解-StreamingFileSink之前你需要了解的知识点        2.1.1 结论    2.2 行编码        2.2.1 行编码自定义-...
  • Flink读取本地文件

    千次阅读 2019-09-17 20:24:35
    2.读取文本文件 public static void main(String[] args) throws Exception { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env....
  • 任意文件读取 任意文件写入 修复建议 漏洞描述 Flink 在 1.5.1 版本中引入了一个 REST handler,这允许攻击者将已上传的文件写入本地任意位置的文件中,并且可通过恶意修改的 HTTP 头将这些文件写入到 Flink ...
  • 其实我试图通过flink java api读取文件和csv转换。如何使用flink java api读取目录下的文件名(本地文件系统/ hdfs)根据我们的要求。 一)需要通过文件夹作为输入参数,输出参数为CSV文件名 二)需要读取从本地文件系统...
  • 读取文件的HDFS路径信息如下图 Spark读取HDFS文件代码示例如下 package demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object _00wcHDFS { def main(args: Array...
  • 1.11 flink读取本地文件例子以及细节

    千次阅读 2020-11-21 13:44:19
    可以指定文件或目录 可以指定读取模式一次性或持续性检测 代码例子 PROCESS_ONCE模式 public class FileToPrint { public static void main(final String[] args) throws Exception { ...
  • Flink 配置文件详解

    千次阅读 2019-07-18 13:03:36
    这两天正好在看现在比较火的大数据流处理框架flink,熟悉了flink的搭建过程,还是比较简单的,于是便了解了一下flink的配置文件 安装目录下主要有 flink-conf.yaml 配置、日志的配置文件、zk 配置、Flink SQL ...
  • Flink 配置文件

    2019-05-06 09:52:06
    安装目录下主要有 flink-conf.yaml 配置、日志的配置文件、zk 配置、Flink SQL Client 配置。 1、flink-conf.yaml 基础配置 # jobManager 的IP地址 jobmanager.rpc.address: localhost # JobManager 的端口号 ...
  • Flink读取HDFS上的Parquet文件生成DataSet.pdf
  • 使用Flink读取本地数据源 处理数据,导入ES中 提交Flink作业 环境 Flink :1.8.2 Elasticsearch:6.2.3 JDK:1.8 pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns=...
  • Flink配置文件详解

    2019-08-30 10:25:16
    安装目录下主要有 flink-conf.yaml 配置、日志的配置文件、zk 配置、Flink SQL Client 配置。 1、Flink-conf.yarml 基础配置 # jobManager 的IP地址 jobmanager.rpc.address: localhost # JobManager 的端口号...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 12,022
精华内容 4,808
关键字:

flink读文件