精华内容
下载资源
问答
  • 外部数据源

    2018-05-21 12:23:37
    不同压缩格式,不同存储接口,用户肯定希望从不同数据源收集数据方便、快速从不同数据源()经过混合处理(json直接和parqent jion)再将结果以特定格式写回到指定系统上去sparksql 1.2====》外部数据源api问题...

    产生背景

    概述

    目标

    操作parquet文件数据

    操作hive数据

    操作mysql数据

    统一


    产生背景

    每一个spark都是以加载数据开始,经过一系列处理,最后存储到其他地方;

    不同格式,不同压缩格式,不同存储接口,用户肯定希望从不同数据源收集数据

    方便、快速从不同数据源()经过混合处理(json直接和parqent jion)再将结果以特定格式写回到指定系统上去

    sparksql 1.2====》外部数据源api

    问题:

    1.加载、保存数据并不简单,比如从关系数据库sqoop加载到hdfs然后..

    2.解析原生数据(text/json/parquet)

    3.转换数据格式

    数据集存储在不同的存储系统、格式上面



    api概述:

    一种扩展方式,将外部数据源整合到sparl sql中

    读写各种格式(指定格式和路径local,分布式)


    目标:

    开发人员:是否需要把代码合并到spark源码中?比如访问微博(按照dataframe api实现)

    对于使用人员:非常容易加载保存

    支持:build-in json parquet jdbc 其他数据源:packages:外部的,非spark内置的,一个网站


    操作 parqeut文件数据

    展开全文
  • 加载外部数据源  可以加载好多种外部数据源的格式,例如:csv,text,json,parquet等。我们在这里讲解下json和parquet格式。  json:  代码:  def main(args: Array[String]): Unit = { val ...
    加载外部数据源

     

     

      可以加载好多种外部数据源的格式,例如:csv,text,json,parquet等。我们在这里讲解下json和parquet格式。

      json:

        代码:

          def main(args: Array[String]): Unit = {

            val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
            import spark.implicits._
            val df_json = spark.read.json("file:///d:/测试数据/sample1.json")  //读取json文件存储为DataFrame
            df_json.printSchema()     //打印文件的字段和字段类型
          }

        加载Json格式后,df会自动识别Schema,如下:

     

    df_json.printSchema()  -->
                root
             |-- city: string (nullable = true)
             |-- count: string (nullable = true)
             |-- id: string (nullable = true)
             |-- info: string (nullable = true)
             |-- infocode: string (nullable = true)
             |-- pois: array (nullable = true)
             |    |-- element: struct (containsNull = true)
             |    |    |-- adcode: string (nullable = true)
             |    |    |-- address: string (nullable = true)
             |    |    |-- adname: string (nullable = true)
             |    |    |-- alias: string (nullable = true)
             |    |    |-- biz_ext: struct (nullable = true)
             |    |    |    |-- cost: string (nullable = true)
             |    |    |    |-- hotel_ordering: string (nullable = true)
             |    |    |    |-- lowest_price: string (nullable = true)
             |    |    |    |-- meal_ordering: string (nullable = true)
             |    |    |    |-- rating: string (nullable = true)
             |    |    |    |-- seat_ordering: string (nullable = true)
             |    |    |    |-- star: array (nullable = true)
             |    |    |    |    |-- element: string (containsNull = true)
             |    |    |    |-- ticket_ordering: string (nullable = true)
             |    |    |-- biz_type: string (nullable = true)
             |    |    |-- business_area: string (nullable = true)
             |    |    |-- children: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- citycode: string (nullable = true)
             |    |    |-- cityname: string (nullable = true)
             |    |    |-- discount_num: string (nullable = true)
             |    |    |-- distance: string (nullable = true)
             |    |    |-- email: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- entr_location: string (nullable = true)
             |    |    |-- event: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- exit_location: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- gridcode: string (nullable = true)
             |    |    |-- groupbuy_num: string (nullable = true)
             |    |    |-- id: string (nullable = true)
             |    |    |-- importance: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- indoor_data: struct (nullable = true)
             |    |    |    |-- cmsid: string (nullable = true)
             |    |    |    |-- cpid: string (nullable = true)
             |    |    |    |-- floor: string (nullable = true)
             |    |    |    |-- truefloor: string (nullable = true)
             |    |    |-- indoor_map: string (nullable = true)
             |    |    |-- location: string (nullable = true)
             |    |    |-- match: string (nullable = true)
             |    |    |-- name: string (nullable = true)
             |    |    |-- navi_poiid: string (nullable = true)
             |    |    |-- pcode: string (nullable = true)
             |    |    |-- photos: array (nullable = true)
             |    |    |    |-- element: struct (containsNull = true)
             |    |    |    |    |-- title: string (nullable = true)
             |    |    |    |    |-- url: string (nullable = true)
             |    |    |-- pname: string (nullable = true)
             |    |    |-- poiweight: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- postcode: string (nullable = true)
             |    |    |-- recommend: string (nullable = true)
             |    |    |-- shopid: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- shopinfo: string (nullable = true)
             |    |    |-- tag: string (nullable = true)
             |    |    |-- tel: string (nullable = true)
             |    |    |-- timestamp: array (nullable = true)
             |    |    |    |-- element: string (containsNull = true)
             |    |    |-- type: string (nullable = true)
             |    |    |-- typecode: string (nullable = true)
             |    |    |-- website: string (nullable = true)
             |-- school_name: string (nullable = true)
             |-- status: string (nullable = true)
             |-- suggestion: struct (nullable = true)
             |    |-- cities: array (nullable = true)
             |    |    |-- element: string (containsNull = true)
             |    |-- keywords: array (nullable = true)
             |    |    |-- element: string (containsNull = true)

     

      执行查询:

              df_json.createOrReplaceTempView("sample")
              val df1 = spark.sql("select school_name from sample")
              df1.show(10)
                +------------+
            | school_name|
            +------------+
            |三明市尤溪县城南中心小学|
            |     三亚市第七小学|
            |        天涯小学|
            |     三亚市榆红小学|
            |  三明市泰宁县下渠学校|
            |      三亚丰和学校|
            |        铜川学校|
            |  三明市三元区岩前小学|
            |  三亚市天涯区桶井小学|
            |  三亚市吉阳区红沙小学|
            +------------+

      注意:如果Json格式不严谨,sparkSQL能将问题数据解析出来

     +--------------------+----+-----+------+----+--------+--------------------+------------+------+--------------------+
            |     _corrupt_record|city|count|    id|info|infocode|                pois| school_name|status|          suggestion|
            +--------------------+----+-----+------+----+--------+--------------------+------------+------+--------------------+
            |                null| 三明市|  186|216938|  OK|   10000|[[350426,城关镇解放路3号...|三明市尤溪县城南中心小学|     1|[WrappedArray(),W...|
            |{"id":"419293","s...|null| null|  null|null|    null|                null|        null|  null|                null|
            |                null| 三亚市|   25|420278|  OK|   10000|[[460204,225国道与31...|        天涯小学|     1|[WrappedArray(),W...|
            |                null| 三亚市|   73|420132|  OK|   10000|[[460203,15路;17路;...|     三亚市榆红小学|     1|[WrappedArray(),W...|
            |                null| 三明市|  578|216867|  OK|   10000|[[350429,松山寺悟旺法师,...|  三明市泰宁县下渠学校|     1|[WrappedArray(),W...|
            |                null| 三亚市|  745|419818|  OK|   10000|[[460203,月北路附近,吉阳...|      三亚丰和学校|     1|[WrappedArray(),W...|
            |                null| 上海市|  891|  4121|  OK|   10000|[[310107,62路;323路...|        铜川学校|     1|[WrappedArray(),W...|
            |                null| 三明市|  900|216812|  OK|   10000|[[350403,星桥大路线,三元...|  三明市三元区岩前小学|     1|[WrappedArray(),W...|
            |                null| 三亚市|   81|420277|  OK|   10000|[[460204,凤凰镇桶井小学对...|  三亚市天涯区桶井小学|     1|[WrappedArray(),W...|
            |                null| 三亚市|   70|419991|  OK|   10000|[[460203,2路,吉阳区,[...|  三亚市吉阳区红沙小学|     1|[WrappedArray(),W...|
            +--------------------+----+-----+------+----+--------+--------------------+------------+------+--------------------+
            only showing top 10 rows

      parquet:

        代码:

          def main(args: Array[String]): Unit = {

            val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
            import spark.implicits._
            val df_parquet = spark.read.parquet("file:///D:/测试数据/sogou/parquet/part-00000-30a2dca2-91f2-4ea6-8993-    c22870048d6f.snappy.parquet")
            df_parquet.show(10);
          }

      运行结果:

          +--------------------+
          |         value|
          +--------------------+
          |20111230000005 57...|
          |20111230000005 66...|
          |20111230000007 b9...|
          |20111230000008 69...|
          |20111230000008 f2...|
          |20111230000009 96...|
          |20111230000009 69...|
          |20111230000009 59...|
          |20111230000010 f5...|
          |20111230000010 28...|
          +--------------------+
            only showing top 10 rows

    存储至外部数据源

     

      

      1.parquet格式存储
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
          import spark.implicits._
          val df = spark.read.textFile("file:///D:/测试数据/sogou/SogouQ3.txt/*")
          df.repartition(1).write.parquet("file:///D:/测试数据/sogou/parquet/")
        }
      2.orcFile
        需添加hive依赖
          <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-hive_2.11</artifactId>
           <version>2.1.2</version>
           <scope>provided</scope>
          </dependency>

    解决加载数据乱码问题

     

    说明:如果数据源默认不是UTF-8的编码集,那么加载数据将会产生乱码,解决方法如下:
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
        import spark.implicits._=
        val rdd = spark.sparkContext.hadoopFile("file:///D:/测试数据/sogou/SogouQ3.txt/*",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],1)
                      .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
                      .map(x=>x.split("\t"))
                      .map(x=>new sogou(x(0),x(1),x(2),x(3),x(4),x(5)))
        val df = spark.createDataFrame(rdd)
        df.printSchema()
        df.show(10)
      }

      因为textfile底层调用hadoopfile,还用调用textinputformat,编码格式默认为utf-8,所以这样加载数据才不会出现乱码的问题。

     

    转载于:https://www.cnblogs.com/lyr999736/p/10224774.html

    展开全文
  • Python-使用外部数据源 该存储库包含在Python中使用外部数据源进行写入和读取外部数据源的示例。 包括文件合并
  • Spark 外部数据源调用代码,CSV文件 和HIVE读取方式。
  • 知识库系统与外部数据源接口的研究以实践认知世界,以实事构架世界,知识库系统与外部数据源接口的研究总...该文档为知识库系统与外部数据源接口的研究,是一份很不错的参考资料,具有较高参考价值,感兴趣的可以...
  • SparkSQL外部数据源

    2020-09-13 18:44:12
    Spark SQL 外部数据源 一、简介         1.1 多数据源支持         1.2 读数据格式        ...

    Spark SQL 外部数据源

    一、简介

    1.1 多数据源支持

    Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景。

    • CSV
    • JSON
    • Parquet
    • ORC
    • JDBC/ODBC connections
    • Plain-text files

    1.2 读数据格式

    所有读取 API 遵循以下调用格式:

    // 格式
    DataFrameReader.format(...).option("key", "value").schema(...).load()
    
    // 示例
    spark.read.format("csv")
    .option("mode", "FAILFAST")          // 读取模式
    .option("inferSchema", "true")       // 是否自动推断 schema
    .option("path", "path/to/file(s)")   // 文件路径
    .schema(someSchema)                  // 使用预定义的 schema      
    .load()
    

    读取模式有以下三种可选项:

    读模式 描述
    permissive 当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中
    dropMalformed 删除格式不正确的行
    failFast 遇到格式不正确的数据时立即失败

    1.3 写数据格式

    // 格式
    DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
    
    //示例
    dataframe.write.format("csv")
    .option("mode", "OVERWRITE")         //写模式
    .option("dateFormat", "yyyy-MM-dd")  //日期格式
    .option("path", "path/to/file(s)")
    .save()
    

    写数据模式有以下四种可选项:

    Scala/Java 描述
    SaveMode.ErrorIfExists 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式
    SaveMode.Append 数据以追加的方式写入
    SaveMode.Overwrite 数据以覆盖的方式写入
    SaveMode.Ignore 如果给定的路径已经存在文件,则不做任何操作

    二、CSV

    CSV 是一种常见的文本文件格式,其中每一行表示一条记录,记录中的每个字段用逗号分隔。

    2.1 读取CSV文件

    自动推断类型读取读取示例:

    spark.read.format("csv")
    .option("header", "false")        // 文件中的第一行是否为列的名称
    .option("mode", "FAILFAST")      // 是否快速失败
    .option("inferSchema", "true")   // 是否自动推断 schema
    .load("/usr/file/csv/dept.csv")
    .show()
    

    使用预定义类型:

    import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
    //预定义数据格式
    val myManualSchema = new StructType(Array(
        StructField("deptno", LongType, nullable = false),
        StructField("dname", StringType,nullable = true),
        StructField("loc", StringType,nullable = true)
    ))
    spark.read.format("csv")
    .option("mode", "FAILFAST")
    .schema(myManualSchema)
    .load("/usr/file/csv/dept.csv")
    .show()
    

    2.2 写入CSV文件

    df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
    

    也可以指定具体的分隔符:

    df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
    

    三、JSON

    3.1 读取JSON文件

    spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
    

    需要注意的是:默认不支持一条数据记录跨越多行 (如下),可以通过配置 multiLinetrue 来进行更改,其默认值为 false

    // 默认支持单行
    {"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
    
    //默认不支持多行
    {
      "DEPTNO": 10,
      "DNAME": "ACCOUNTING",
      "LOC": "NEW YORK"
    }
    

    3.2 写入JSON文件

    df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
    

    四、Parquet

    Parquet 是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是 Spark 是默认的文件格式。

    4.1 读取Parquet文件

    spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
    

    2.2 写入Parquet文件

    df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
    

    2.3 可选配置

    Parquet 文件有着自己的存储规则,因此其可选配置项比较少,常用的有如下两个:

    读写操作 配置项 可选值 默认值 描述
    Write compression or codec None,
    uncompressed,
    bzip2,
    deflate, gzip,
    lz4, or snappy
    None 压缩文件格式
    Read mergeSchema true, false 取决于配置项 spark.sql.parquet.mergeSchema 当为真时,Parquet 数据源将所有数据文件收集的 Schema 合并在一起,否则将从摘要文件中选择 Schema,如果没有可用的摘要文件,则从随机数据文件中选择 Schema。

    更多可选配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html


    五、ORC

    ORC 是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中常用的文件格式。

    5.1 读取ORC文件

    spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
    

    4.2 写入ORC文件

    csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
    

    六、SQL Databases

    Spark 同样支持与传统的关系型数据库进行数据读写。但是 Spark 程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的 jars 目录中。下面示例使用的是 Mysql 数据库,使用前需要将对应的 mysql-connector-java-x.x.x.jar 上传到 jars 目录下。

    6.1 读取数据

    读取全表数据示例如下,这里的 help_keyword 是 mysql 内置的字典表,只有 help_keyword_idname 两个字段。

    spark.read
    .format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")            //驱动
    .option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //数据库地址
    .option("dbtable", "help_keyword")                    //表名
    .option("user", "root").option("password","root").load().show(10)
    

    从查询结果读取数据:

    val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root").option("password", "root")
    .option("dbtable", pushDownQuery)
    .load().show()
    
    //输出
    +---------------+-----------+
    |help_keyword_id|       name|
    +---------------+-----------+
    |              0|         <>|
    |              1|     ACTION|
    |              2|        ADD|
    |              3|AES_DECRYPT|
    |              4|AES_ENCRYPT|
    |              5|      AFTER|
    |              6|    AGAINST|
    |              7|  AGGREGATE|
    |              8|  ALGORITHM|
    |              9|        ALL|
    |             10|      ALTER|
    |             11|    ANALYSE|
    |             12|    ANALYZE|
    |             13|        AND|
    |             14|    ARCHIVE|
    |             15|       AREA|
    |             16|         AS|
    |             17|   ASBINARY|
    |             18|        ASC|
    |             19|     ASTEXT|
    +---------------+-----------+
    

    也可以使用如下的写法进行数据的过滤:

    val props = new java.util.Properties
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    val predicates = Array("help_keyword_id < 10  OR name = 'WHEN'")   //指定数据过滤条件
    spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() 
    
    //输出:
    +---------------+-----------+
    |help_keyword_id|       name|
    +---------------+-----------+
    |              0|         <>|
    |              1|     ACTION|
    |              2|        ADD|
    |              3|AES_DECRYPT|
    |              4|AES_ENCRYPT|
    |              5|      AFTER|
    |              6|    AGAINST|
    |              7|  AGGREGATE|
    |              8|  ALGORITHM|
    |              9|        ALL|
    |            604|       WHEN|
    +---------------+-----------+
    

    可以使用 numPartitions 指定读取数据的并行度:

    option("numPartitions", 10)
    

    在这里,除了可以指定分区外,还可以设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中。

    val colName = "help_keyword_id"   //用于判断上下界的列
    val lowerBound = 300L    //下界
    val upperBound = 500L    //上界
    val numPartitions = 10   //分区综述
    val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
                                 colName,lowerBound,upperBound,numPartitions,props)
    

    想要验证分区内容,可以使用 mapPartitionsWithIndex 这个算子,代码如下:

    jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
        val buffer = new ListBuffer[String]
        while (iterator.hasNext) {
            buffer.append(index + "分区:" + iterator.next())
        }
        buffer.toIterator
    }).foreach(println)
    

    执行结果如下:help_keyword 这张表只有 600 条左右的数据,本来数据应该均匀分布在 10 个分区,但是 0 分区里面却有 319 条数据,这是因为设置了下限,所有小于 300 的数据都会被限制在第一个分区,即 0 分区。同理所有大于 500 的数据被分配在 9 分区,即最后一个分区。
    在这里插入图片描述

    6.2 写入数据

    val df = spark.read.format("json").load("/usr/file/json/emp.json")
    df.write
    .format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
    .option("user", "root").option("password", "root")
    .option("dbtable", "emp")
    .save()
    

    七、Text

    Text 文件在读写性能方面并没有任何优势,且不能表达明确的数据结构,所以其使用的比较少,读写操作如下:

    7.1 读取Text数据

    spark.read.textFile("/usr/file/txt/dept.txt").show()
    

    7.2 写入Text数据

    df.write.text("/tmp/spark/txt/dept")
    

    八、数据读写高级特性

    8.1 并行读

    多个 Executors 不能同时读取同一个文件,但它们可以同时读取不同的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每一个都将成为 DataFrame 中的一个分区,并由可用的 Executors 并行读取。

    8.2 并行写

    写入的文件或数据的数量取决于写入数据时 DataFrame 拥有的分区数量。默认情况下,每个数据分区写一个文件。

    8.3 分区写入

    分区和分桶这两个概念和 Hive 中分区表和分桶表是一致的。都是将数据按照一定规则进行拆分存储。需要注意的是 partitionBy 指定的分区和 RDD 中分区不是一个概念:这里的分区表现为输出目录的子目录,数据分别存储在对应的子目录中。

    val df = spark.read.format("json").load("/usr/file/json/emp.json")
    df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
    

    输出结果如下:可以看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。

    在这里插入图片描述

    8.3 分桶写入

    分桶写入就是将数据按照指定的列和桶数进行散列,目前分桶写入只支持保存为表,实际上这就是 Hive 的分桶表。

    val numberBuckets = 10
    val columnToBucketBy = "empno"
    df.write.format("parquet").mode("overwrite")
    .bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
    

    8.5 文件大小管理

    如果写入产生小文件数量过多,这时会产生大量的元数据开销。Spark 和 HDFS 一样,都不能很好的处理这个问题,这被称为“small file problem”。同时数据文件也不能过大,否则在查询时会有不必要的性能开销,因此要把文件大小控制在一个合理的范围内。

    在上文我们已经介绍过可以通过分区数量来控制生成文件的数量,从而间接控制文件大小。Spark 2.2 引入了一种新的方法,以更自动化的方式控制文件大小,这就是 maxRecordsPerFile 参数,它允许你通过控制写入文件的记录数来控制文件大小。

     // Spark 将确保文件最多包含 5000 条记录
    df.write.option(“maxRecordsPerFile”, 5000)
    

    九、可选配置附录

    9.1 CSV读写可选配置

    读\写操作 配置项 可选值 默认值 描述
    Both seq 任意字符 ,(逗号) 分隔符
    Both header true, false false 文件中的第一行是否为列的名称。
    Read escape 任意字符 \ 转义字符
    Read inferSchema true, false false 是否自动推断列类型
    Read ignoreLeadingWhiteSpace true, false false 是否跳过值前面的空格
    Both ignoreTrailingWhiteSpace true, false false 是否跳过值后面的空格
    Both nullValue 任意字符 “” 声明文件中哪个字符表示空值
    Both nanValue 任意字符 NaN 声明哪个值表示 NaN 或者缺省值
    Both positiveInf 任意字符 Inf 正无穷
    Both negativeInf 任意字符 -Inf 负无穷
    Both compression or codec None,
    uncompressed,
    bzip2, deflate,
    gzip, lz4, or
    snappy
    none 文件压缩格式
    Both dateFormat 任何能转换为 Java 的
    SimpleDataFormat 的字符串
    yyyy-MM-dd 日期格式
    Both timestampFormat 任何能转换为 Java 的
    SimpleDataFormat 的字符串
    yyyy-MMdd’T’HH:mm:ss.SSSZZ 时间戳格式
    Read maxColumns 任意整数 20480 声明文件中的最大列数
    Read maxCharsPerColumn 任意整数 1000000 声明一个列中的最大字符数。
    Read escapeQuotes true, false true 是否应该转义行中的引号。
    Read maxMalformedLogPerPartition 任意整数 10 声明每个分区中最多允许多少条格式错误的数据,超过这个值后格式错误的数据将不会被读取
    Write quoteAll true, false false 指定是否应该将所有值都括在引号中,而不只是转义具有引号字符的值。
    Read multiLine true, false false 是否允许每条完整记录跨域多行

    9.2 JSON读写可选配置

    读\写操作 配置项 可选值 默认值
    Both compression or codec None,
    uncompressed,
    bzip2, deflate,
    gzip, lz4, or
    snappy
    none
    Both dateFormat 任何能转换为 Java 的 SimpleDataFormat 的字符串 yyyy-MM-dd
    Both timestampFormat 任何能转换为 Java 的 SimpleDataFormat 的字符串 yyyy-MMdd’T’HH:mm:ss.SSSZZ
    Read primitiveAsString true, false false
    Read allowComments true, false false
    Read allowUnquotedFieldNames true, false false
    Read allowSingleQuotes true, false true
    Read allowNumericLeadingZeros true, false false
    Read allowBackslashEscapingAnyCharacter true, false false
    Read columnNameOfCorruptRecord true, false Value of spark.sql.column&NameOf
    Read multiLine true, false false

    9.3 数据库读写可选配置

    属性名称 含义
    url 数据库地址
    dbtable 表名称
    driver 数据库驱动
    partitionColumn,
    lowerBound, upperBoun
    分区总数,上界,下界
    numPartitions 可用于表读写并行性的最大分区数。如果要写的分区数量超过这个限制,那么可以调用 coalesce(numpartition) 重置分区数。
    fetchsize 每次往返要获取多少行数据。此选项仅适用于读取数据。
    batchsize 每次往返插入多少行数据,这个选项只适用于写入数据。默认值是 1000。
    isolationLevel 事务隔离级别:可以是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即标准事务隔离级别。
    默认值是 READ_UNCOMMITTED。这个选项只适用于数据读取。
    createTableOptions 写入数据时自定义创建表的相关配置
    createTableColumnTypes 写入数据时自定义创建列的列类型

    数据库读写更多配置可以参阅官方文档:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

    参考资料

    1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
    2. https://spark.apache.org/docs/latest/sql-data-sources.html
    展开全文
  • 文章目录1.1、External DataSource   标签模型编码中需要从HBase表读写数据,编写 HBaseTools 工具类,其中提供 read ...Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现,接口在 org.apache.spark.

    一、SparkSQL自定义外部数据源

      标签模型编码中需要从HBase表读写数据,编写 HBaseTools 工具类,其中提供 read 和write 方法,传递参数读写表的数据,但是能否实现类似SparkSQL读写MySQL数据库表数据时如下格式:

    在这里插入图片描述

    1.1、External DataSource

      自从Spark 1.3的发布,Spark SQL开始正式支持外部数据源。Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现,接口在 org.apache.spark.sql.sources 包下:interfaces.scala
    在这里插入图片描述
    https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

    1.1、主要两个类为: BaseRelationRelationProvider

     &mesp;如果要实现一个外部数据源,比如hbase数据源,支持Spark SQL操作HBase数据库。那么就必须定义HBaseRelation来继承BaseRelation,同时也要定义DefaultSource实现一个RelationProvider

    • 1)、BaseRelation
      • 代表了一个抽象的数据源;
      • 该数据源由一行行有着已知schema的数据组成(关系表);
      • 展示从DataFrame中产生的底层数据源的关系或者表;
      • 定义如何产生schema信息;
    • 2)、RelationProvider
      • 顾名思义,根据用户提供的参数(parameters)返回一个数据源(BaseRelation)
      • 一个Relation的提供者,创建BaseRelation

    下图表示从SparkSQL提供 外部数据源(External DataSource加载数据 时,需要继承的类说明如下:
    在这里插入图片描述

    1.2、BaseRelation

    1.2.1、BaseRelation是外部数据源的抽象,里面主要存放了schema的映射 。

    abstract class BaseRelation {
    	def sqlContext: SQLContext
    	def schema: StructType
    }
    // 如果自定义Relation,必须重写schema,就是必须描述对于外部数据源的Schema。
    

    在这里插入图片描述

    1.2.2、从外部数据源加载(读取)数据和保存(写入)数据时,提供不同接口实现

    1.2.2.1、 加载数据接口

    提供4种 Scan 策略,加载数据

    在这里插入图片描述

    默认的Scan为 TableScan其中方法 buildScan :定义如何查询外部数据源

    在这里插入图片描述

    其他加载数据Trait的Scan说明

    • PrunedScan :列裁剪的,可以传入指定的列,不需要的列不会从外部数据源加载。
    • PrunedFilteredScan :列裁剪➕过滤,在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。
    • CatalystScan :Catalyst的支持传入expressions来进行Scan,支持列裁剪和Filter。

    1.2.2.2、 保存数据接口

    InsertableRelation :保存数据的Relation
    在这里插入图片描述

    1.3、RelationProvider

      RelationProvider : 获取参数列表,返回一个BaseRelation对象 。要实现这个接口,需要接受 传入的参数 ,来生成对应的·、、External Relation,就是 一个反射生产外部数据源Relation的接口 ,接口Trait定义如下:

    在这里插入图片描述

      上述表示加载数据时构建Relation对象的ProviderRelationProvider,同样保存数据时构建Relation对象的ProviderCreatableRelationProvider ,代码如下:

    在这里插入图片描述

    1.2.3、SparkSQL集成Elasticsearch 代码结构:

    在这里插入图片描述

    1.2.4、SparkSQL集成Kudu 代码结构:

    在这里插入图片描述

      按照上述接口说明,实现自定义外部数据源从HBase表读写数据,包【 com.chb.tags 】结构如下:
    在这里插入图片描述

    类的继承结构图,如下所示:

    在这里插入图片描述

    1.4、自定义HBaseRelation

      自定义 HBaseRelation 类,继承 BaseRelationTableScanInsertableRelation ,此外实现序列化接口 Serializable ,所有类声明如下,其中实现 Serializable 接口为了保证对象可以被序列化和反序列化。

    package com.chb.spark.hase
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.{Put, Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
    import org.apache.spark.sql.types.StructType
    
    /**
     * 自定义外部数据源:从HBase表加载数据和保存数据值HBase表
     */
    class HbaseRelation(context: SQLContext,
                        params: Map[String, String],
                        userSchema: StructType
                       ) extends BaseRelation with TableScan with InsertableRelation with Serializable {
      // 将读写HBase表数据时参数属性定义为常量,方便后续使用:
      // 连接HBase数据库的属性名称
      val HBASE_ZK_QUORUM_KEY: String = "hbase.zookeeper.quorum"
      val HBASE_ZK_QUORUM_VALUE: String = "zkHosts"
      val HBASE_ZK_PORT_KEY: String = "hbase.zookeeper.property.clientPort"
      val HBASE_ZK_PORT_VALUE: String = "zkPort"
      val HBASE_TABLE: String = "hbaseTable"
      val HBASE_TABLE_FAMILY: String = "family"
      val SPERATOR: String = ","
      val HBASE_TABLE_SELECT_FIELDS: String = "selectFields"
      val HBASE_TABLE_ROWKEY_NAME: String = "rowKeyColumn"
    
      /**
       * SQLContext实例对象
       */
      override def sqlContext: SQLContext = context
    
      /**
       * DataFrame的Schema信息
       */
      override def schema: StructType = userSchema
    
      /**
       * 如何从HBase表中读取数据,返回RDD[Row]
       */
      override def buildScan(): RDD[Row] = {
        // 1. 设置HBase中Zookeeper集群信息
        val conf: Configuration = new Configuration()
        conf.set(HBASE_ZK_QUORUM_KEY, params(HBASE_ZK_QUORUM_VALUE))
        conf.set(HBASE_ZK_PORT_KEY, params(HBASE_ZK_PORT_VALUE))
    
        // 2. 设置读HBase表的名称
        conf.set(TableInputFormat.INPUT_TABLE, params(HBASE_TABLE))
    
        // 3. 设置读取列簇和列名称
        val scan: Scan = new Scan()
        // 3.1. 设置列簇
        val familyBytes = Bytes.toBytes(params(HBASE_TABLE_FAMILY))
        scan.addFamily(familyBytes)
        // 3.2. 设置列名称
        val fields = params(HBASE_TABLE_SELECT_FIELDS).split(SPERATOR)
        fields.foreach { field =>
          scan.addColumn(familyBytes, Bytes.toBytes(field))
        }
        // 3.3. 设置scan过滤
        conf.set(
          TableInputFormat.SCAN, //
          Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray) //
        )
    
        // 4. 调用底层API,读取HBase表的数据
        val datasRDD: RDD[(ImmutableBytesWritable, Result)] =
          sqlContext.sparkContext
            .newAPIHadoopRDD(
              conf,
              classOf[TableInputFormat],
              classOf[ImmutableBytesWritable],
              classOf[Result]
            )
    
        // 5. 转换为RDD[Row]
        val rowsRDD: RDD[Row] = datasRDD.map { case (_, result) =>
          // 5.1. 列的值
          val values: Seq[String] = fields.map { field =>
            Bytes.toString(result.getValue(familyBytes,
              Bytes.toBytes(field)))
          }
          // 5.2. 生成Row对象
          Row.fromSeq(values)
        }
    
        // 6. 返回
        rowsRDD
      }
    
      /**
       * 将数据DataFrame写入到HBase表中
       *
       * @param data      数据集
       * @param overwrite 保存模式
       */
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        // 1. 数据转换
        val columns: Array[String] = data.columns
        val putsRDD: RDD[(ImmutableBytesWritable, Put)] = data.rdd.map {
          row =>
            // 获取RowKey
            val rowKey: String = row.getAs[String](params(HBASE_TABLE_ROWKEY_NAME))
            // 构建Put对象
            val put = new Put(Bytes.toBytes(rowKey))
            // 将每列数据加入Put对象中
            val familyBytes = Bytes.toBytes(params(HBASE_TABLE_FAMILY))
            columns.foreach { column =>
              put.addColumn(
                familyBytes, Bytes.toBytes(column), //
                Bytes.toBytes(row.getAs[String](column)) //
              )
            }
            // 返回二元组
            (new ImmutableBytesWritable(put.getRow), put)
        }
    
        // 2. 设置HBase中Zookeeper集群信息
        val conf: Configuration = new Configuration()
        conf.set(HBASE_ZK_QUORUM_KEY, params(HBASE_ZK_QUORUM_VALUE))
        conf.set(HBASE_ZK_PORT_KEY, params(HBASE_ZK_PORT_VALUE))
    
        // 3. 设置读HBase表的名称
        conf.set(TableOutputFormat.OUTPUT_TABLE, params(HBASE_TABLE))
    
        // 4. 保存数据到表
        putsRDD.saveAsNewAPIHadoopFile(
          s"/apps/hbase/${params(HBASE_TABLE)}-" +
            System.currentTimeMillis(),
          classOf[ImmutableBytesWritable], //
          classOf[Put], //
          classOf[TableOutputFormat[ImmutableBytesWritable]], //
          conf //
        )
      }
    }
    
    

    1.5、自定义 DefaultSource

      自定义类 DefaultSource,继承 RelationProviderCreatableRelationProvider 便于读写数据,此外继承 Serializable为了更好序列化和反序列化对象

    package com.chb.spark.hase
    
    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
     * 自定义外部数据源HBase,提供BaseRelation对象,用于加载数据和保存数据
     */
    class DefaultSource extends RelationProvider with CreatableRelationProvider with Serializable {
      // 参数信息
      val HBASE_TABLE_SELECT_FIELDS: String = "selectFields"
    
      val SPERATOR: String = ","
    
      /**
       * 返回BaseRelation实例对象,提供加载数据功能
       *
       * @param sqlContext SQLContext实例对象
       * @param parameters 参数信息
       * @return
       */
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]):
      BaseRelation = {
        // 1. 定义Schema信息
        val schema: StructType = StructType(
          parameters(HBASE_TABLE_SELECT_FIELDS)
            .split(SPERATOR)
            .map { field =>
              StructField(field, StringType, nullable = true)
            }
        )
        // 2. 创建HBaseRelation对象
        val relation = new HbaseRelation(sqlContext, parameters, schema)
        // 3. 返回对象
        relation
      }
    
      /**
       * 返回BaseRelation实例对象,提供保存数据功能
       *
       * @param sqlContext SQLContext实例对象
       * @param mode       保存模式
       * @param parameters 参数
       * @param data       数据集
       * @return
       */
      override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
        // 1. 创建HBaseRelation对象
        val relation = new HbaseRelation(sqlContext, parameters,
          data.schema)
        // 2. 插入数据
        relation.insert(data, overwrite = true)
        // 3. 返回对象
        relation
      }
    
    }
    
    

    1.6、测试功能代码

    package com.chb.tags.test.hbase
    
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    
    /**
     * 测试自定义外部数据源实现从HBase表读写数据接口
     */
    object HBaseSQLTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          .master("local[4]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()
    
        import spark.implicits._
    
        // 读取数据
        val usersDF: DataFrame = spark.read
          .format("com.chb.tags.spark.hbase")
          .option("zkHosts", "ch1")
          .option("zkPort", "2181")
          .option("hbaseTable", "tbl_tag_users")
          .option("family", "detail")
          .option("selectFields", "id,gender")
          .load()
        usersDF.printSchema()
        usersDF.cache()
        usersDF.show(10, truncate = false)
    
    
        // 保存数据
        usersDF.write
          .mode(SaveMode.Overwrite)
          .format("com.chb.tags.spark.hbase")
          .option("zkHosts", "chb")
          .option("zkPort", "2181")
          .option("hbaseTable", "tbl_users")
          .option("family", "info")
          .option("rowKeyColumn", "id")
          .save()
        spark.stop()
      }
    }
    
    

    1.7、注册数据源

      在SparkSQL提供外部数据源接口中提供 DataSourceRegister 类,实现 shortName 方法,方便调用函数使用简写名称:

    trait DataSourceRegister {
    /**
    * The string that represents the format that this data source provider
    uses. This is
    * overridden by children to provide a nice alias for the data source.
    For example:
    *
    * {{{
    * override def shortName(): String = "parquet"
    * }}}
    *
    * @since 1.5.0
    */
    def shortName(): String
    }
    

    默认数据源类 DefaultSource ,继承 DataSourceRegister ,实现其中 shortame 方法,核心代码:

    /**
    * 自定义外部数据源HBase,提供BaseRelation对象,用于加载数据和保存数据
    */
    class DefaultSource extends RelationProvider
    with CreatableRelationProvider with DataSourceRegister with
    Serializable{
    /**
    * 数据源使用简短名称
    */
    override def shortName(): String = "hbase"
    }
    

    需要进行相关配置,实现如下方式加载与保存数据至HBase:
    在这里插入图片描述

    将实现SparkSQL外部数据源接口类: DefaultSource 进行注册,方便开发使用,首先看看SparkSQL与Kafka集成时注册方式如下:

    在这里插入图片描述

    所以在项目【 resources 】目录下创建库目录【 META-INF/services 】,并且创建文件【 org.apache.spark.sql.sources.DataSourceRegister】,内容为数据源主类【 com.chb.tags.spark.hbase.DefaultSource 】,如下图:
    在这里插入图片描述

    注意:在资源文件目录 resources 下创建目录【 META-INF/services 】为两层目录,不要写错。

    二、条件过滤

      在开发 商业属性(消费特征)标签 时,每个标签计算时处理【 最近半年内订单数据 】,需要如下图所示:

    在这里插入图片描述

    所以在从数据源中读取数据时,需要先设置过滤条件(filter)再读取(read)数据

    在【订单表】数据中字段【modified】标识了数据最后修改时间,可以以此作为数据所属时间范围的判断。

    需要修改SparkSQL外部数据源HBase开发接口实现类代码,完成设置过滤条件筛选数据的功能。

    2.1、Hbase Filter

      在HBase中提供 过滤器 ,在读取扫描数据时进行过滤,其中SingleColumnValueFilter 针对 某个字段的值进行比较过滤 获取数据,支持如下几种比较操作 CompareOp

    public enum CompareOp {
    /** less than */
    LESS,
    /** less than or equal to */
    LESS_OR_EQUAL,
    /** equals */
    EQUAL,
    /** not equal */
    NOT_EQUAL,
    /** greater than or equal to */
    GREATER_OR_EQUAL,
    /** greater than */
    GREATER,
    /** no operation */
    NO_OP,
    }
    

    枚举比较器 CompareOp 中各个枚举的含义:
    在这里插入图片描述

    构建 SingleColumnValueFilter 实例对象,构造方法如下:
    在这里插入图片描述

    在MySQL数据库中针对订单表【 tbl_tag_orders 】数据进行如下查询统计。

    SELECT MIN(modified) AS min_date, MAX(modified) AS max_date FROM
    tbl_tag_orders ;
    /*
    +---------------------+---------------------+
    | min_date | max_date |
    +---------------------+---------------------+
    | 2019-07-19 00:56:42 | 2019-09-25 22:40:55 |
    +---------------------+---------------------+
    */
    SELECT COUNT(1) AS total FROM tbl_tag_orders WHERE modified >= '2019-09-01'
    ;
    /*
    +--------+
    | total |
    +--------+
    | 113735 |
    +--------+
    
    */
    SELECT COUNT(1) AS total FROM tbl_tag_orders WHERE modified < '2019-09-01'
    ;
    /*
    +-------+
    | total |
    +-------+
    | 6390 |
    +-------+
    */
    SELECT COUNT(1) AS total FROM tbl_tag_orders ;
    /*
    +--------+
    | total |
    +--------+
    | 120125 |
    +--------+
    */
    

    2.1.1、从HBase表读取数据时,设置过滤条件:【 modified < '2019-09-01'在这里插入图片描述

    2.2、HbaseRelation优化,实现Filter

    展开全文
  • 自定义Spark的外部数据源读取文件建立scala工程自定义数据源BaseRelationRelationProviderTableScan Spark提供了自定义外部数据源的功能,可以根据自身需要自定义数据读取获取dataframe,写法是 val df = spark....
  • 如何使用微搭低代码平台外部数据源
  • Spark SQL 外部数据源

    2018-11-22 16:43:02
    Spark SQL 外部数据源 1. 概述 外部数据源API方便快速从不同的数据源(json,parquet,rdbms)引入处理数据,经过混合处理,写回到指定文件系统上去。 2. 操作parquet文件数据 读数据 spark.read.format(“parquet”)...
  • 外部用户界面 用于外部数据源NCB的UI组件
  • Spark 外部数据源

    2017-12-27 17:03:31
    External Data Sources rdbms,need JDBC jarsParquet...已使用avro外部数据源为例: 参考【https://spark-packages.org/】这data Sources部分 With spark-shell or spark-submit $ bin/spark-shel
  • 勤哲电子表格服务器 MY SQ外部数据源教程 ES引用 作者: 日期: ES2010引用MYSQL外部数据源教程 整理安哥 qq:29154754 2 一 安装mysql的ODBC驱动程序mysql官网有下载 二 mysqlODBC数据源配置 1mysql驱动程序安装好后点...
  • spark自定义外部数据源

    千次阅读 2019-08-03 13:06:27
    对于spark外部数据源来说,要先了解这几个类 BaseRelation:定义数据的schema信息,把我们的数据转成RDD[Row] RelationProvider:是一个relation的提供者,创建BaseRelation TableScan:读取数据并构建行,拿出所有的...
  • spark02 外部数据源

    2021-03-09 15:56:48
    spark02 外部数据源 read text import spark.implicits._ val df = spark.read.format("text").load("data/input/ck") df.map(x => { val splits = x.getString(0).split(" ") (splits(0), splits(1)) })....
  • Spark连接外部数据源

    2021-01-11 08:21:07
    Spark连接外部数据源spark -> hivespark集成hiveIDEA连接hive spark -> hive spark集成hive 将hive110/conf目录下的hive-site.xml复制到spark/conf目录下; cp /opt/software/hadoop/hive110/conf/hive-...
  • Spark SQL外部数据源

    2019-04-24 17:04:10
    外部数据源 package com.kaola.bigdata.sparksql03 import org.apache.spark.sql.SparkSession object DataSourceAPIApp { def main(args: Array[String]): Unit = { val spark = SparkSession .builder(...
  • Spark SQL读取外部数据源 1、Spark SQL可以加载任何地方的数据,例如mysql,hive,hdfs,hbase等,而且支持很多种格式如json, parquet, avro, csv格式。 2、通过外部数据源API读取各种格式的数据,会得到一个...
  • Spark处理外部数据源

    2018-06-01 22:49:32
    产生背景:1.数据以各种格式存储在系统中2加载和保存数据不容易(Hive和mysql之间)3.数据存在各种类型,不好解析4.转换数据格式5.格式转换6.用户希望方便快速从不同...出现时间:Spark Sql1.2出现了外部数据源API...
  • 1、产生背景  用户需求:方便快速从不同的数据源(json、parquet、rdbms),经过混合处理(json join parquet),再将处理结果以特定的格式(son、... 外部数据源API  Loading and saving Data is not easy  Pars...
  • 自定义DataObject / ModelAdmin实现,可与外部数据源一起使用 目的是有一种简单的方法将ModelAdmin和GridField与来自外部数据源的数据一起使用。 只需创建一个扩展ExternalDataObject的模型并实现您自己的get() , ...
  • Spark SQL Spark SQL是Spark的核心组件之一(2014.4 ...支持多种外部数据源:Parquet、JSON、RDBMS等 SparkContext SQLContext Spark SQL的编程入口 HiveContext SQLContext的子集,包含更多功能 SparkSession(S
  • 实时更新Excel文档外部数据源的数据 单元格区域、Excel 表、数据透视表或数据透视图均可以连接到外部数据源(数据源:用于连接数据库的一组存储的“源”信息。数据源包含数据库服务器的名称和位置、数据库驱动程序...
  • spark2.0中,提供了两种扩展外部数据源的接口, 第一种外部数据源为文件,第二种外部数据源为系统 spark内部调用外部数据源包的类是下面,包括解析BaseRelation,提取schema等 package org.apache.spark.sql....
  • 实现一个外部数据源

    2019-01-05 22:56:26
    文章目录JDBCrelationprovider模仿...要定义一个外部数据源可以参考 JDBCrelation和JDBCrelationprovider。 JDBCrelation相当于用户可以使用里面的方法实现数据s...
  • 网络EXCEL平台外部数据源注册数据字段对应规则,如果部份没有的可以简单修改增加
  • 一:介绍 ...随着Spark1.2的发布,Spark SQL开始正式支持外部数据源。Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现。 这使得Spark SQL支持了更多的类型数据源,如json, parquet, a...
  • ES2010引用MYSQL外部数据源教程 整理:安哥 qq:29154754 zxa2000@163.com 20130827 一安装mysql的ODBC驱动程序mysql官网有下载 二mysqlODBC数据源配置 1mysql驱动程序安装好后点开始/管理工具/数据源 点添加 点完成 ...
  • 外部数据源 该项目说明了 Spark 2.3.0 中引入的新 V2 Apache Spark 外部数据源 API。 它包括: 一个简单的内存数据库系统 (ExampleDB),它支持说明 API 特性所需的所有数据访问范式 一系列不同复杂度的数据源,全部...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,388
精华内容 2,955
关键字:

外部数据源