  • Spark custom external data source

    1,000+ reads 2019-08-03 13:06:27

    To implement a Spark external data source, you first need to understand these traits:

    BaseRelation: defines the schema of the data and is the basis for turning our data into an RDD[Row]
    RelationProvider: the provider of a relation; it creates the BaseRelation
    TableScan: reads the data and builds rows, returning all of the data
    PrunedScan: column pruning
    PrunedFilteredScan: column pruning plus filter push-down

    InsertableRelation: a relation that data can be written back into

    Note the following assumptions documented for InsertableRelation:
    /**
     * A BaseRelation that can be used to insert data into it through the insert method.
     * If overwrite in insert method is true, the old data in the relation should be overwritten with
     * the new data. If overwrite in insert method is false, the new data should be appended.
     *
     * InsertableRelation has the following three assumptions.
     *   1. It assumes that the data (Rows in the DataFrame) provided to the insert method
     *      exactly matches the ordinal of fields in the schema of the BaseRelation.
     *   2. It assumes that the schema of this relation will not be changed.
     *      Even if the insert method updates the schema (e.g. a relation of JSON or Parquet data may have a
     *      schema update after an insert operation), the new schema will not be used.
     *   3. It assumes that fields of the data provided in the insert method are nullable.
     *      If a data source needs to check the actual nullability of a field, it needs to do it in the
     *      insert method.
     *
     * @since 1.3.0
     */
    @InterfaceStability.Stable
    trait InsertableRelation {
      def insert(data: DataFrame, overwrite: Boolean): Unit
    }
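    To make the contract concrete, here is a minimal, hedged sketch of an InsertableRelation; the class name and the CSV-based storage are illustrative assumptions, not something taken from the article or from Spark itself:

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
    import org.apache.spark.sql.types.StructType

    // Hypothetical relation backed by CSV files under `path`.
    class SimpleTextRelation(override val sqlContext: SQLContext,
                             path: String,
                             override val schema: StructType)
      extends BaseRelation with InsertableRelation {

      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        // Per the assumptions above: the column order of `data` must already match
        // `schema`, and the relation's schema is assumed not to change afterwards.
        val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
        data.write.mode(mode).csv(path)
      }
    }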


    Next, let's take a quick look at JDBCRelation:

    private[sql] case class JDBCRelation(
        parts: Array[Partition],
        jdbcOptions: JDBCOptions)(@transient val sparkSession: SparkSession)
      extends BaseRelation
      with PrunedFilteredScan
      with InsertableRelation

    Since it extends BaseRelation, which is an abstract class, it must implement BaseRelation's members:
    override def sqlContext: SQLContext = sparkSession.sqlContext

    override val needConversion: Boolean = false

    override val schema: StructType = {
      val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
      jdbcOptions.customSchema match {
        case Some(customSchema) => JdbcUtils.getCustomSchema(
          tableSchema, customSchema, sparkSession.sessionState.conf.resolver)
        case None => tableSchema
      }
    }

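    For reference, this built-in JDBCRelation is what gets instantiated when you read through the "jdbc" format; below is a minimal read that exercises the same PrunedFilteredScan path (the connection details, table and column names are placeholders, not from the article):

    import org.apache.spark.sql.SparkSession

    object JdbcReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("jdbc-relation-demo").master("local[2]")
          .getOrCreate()

        // Reading via the "jdbc" format constructs a JDBCRelation; its
        // PrunedFilteredScan pushes the selected columns and the WHERE
        // filter down into the SQL sent to the database.
        val df = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test") // hypothetical URL
          .option("dbtable", "employee")                     // hypothetical table
          .option("user", "root")
          .option("password", "secret")
          .load()

        df.select("id", "name").where("salary > 10000").show()

        spark.stop()
      }
    }

    With that reference in mind, here is a complete minimal custom text data source: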

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
    import org.apache.spark.sql.types.StructType
    
    /**
      * @Author: lih
      * @Date: 2019/8/2 11:40 PM
      * @Version 1.0
      */
    class DefaultSource extends RelationProvider with SchemaRelationProvider{
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
        createRelation(sqlContext,parameters,null)
      }
    
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
        val path = parameters.get("path")
    
        path match {
          case Some(p)=> new TextDataSourceRelation(sqlContext,p,schema)
          case _=> throw new IllegalArgumentException("path is required ...")
        }
    
    
      }
    }
    
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types._
    
    /**
      * @Author: lih
      * @Date: 2019/8/2 11:41 PM
      * @Version 1.0
      */
    class TextDataSourceRelation(override val sqlContext: SQLContext,
                                 path: String,
                                 userSchema: StructType
                                ) extends BaseRelation with TableScan with Logging with Serializable {
    
      override def schema: StructType = {
        if (userSchema != null) {
          userSchema
        } else {
          StructType(
            StructField("id", LongType, false) ::
              StructField("name", StringType, false) ::
              StructField("gender", StringType, false) ::
              StructField("salar", LongType, false) ::
              StructField("comm", LongType, false) :: Nil
          )
        }
      }
    
    
      override def buildScan(): RDD[Row] = {
        logError("this is ruozedata custom buildScan...")

        // Read every file under `path` as (fileName, content) pairs and keep only the content.
        val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)
        val schemaField = schema.fields

        // Combine the raw lines with the schema: split each line on commas, map the
        // gender code to a readable string, then cast each column to the type
        // declared in the schema.
        val rows = rdd.map(fileContent => {
          val lines = fileContent.split("\n")
          val data = lines.map(_.split(",").map(x => x.trim)).toSeq

          val result = data.map(x => x.zipWithIndex.map {
            case (value, index) =>
              val columnName = schemaField(index).name

              castTo(if (columnName.equalsIgnoreCase("gender")) {
                if (value == "0") {
                  "男"
                } else if (value == "1") {
                  "女"
                } else {
                  "未知"
                }
              } else {
                value
              }, schemaField(index).dataType)
          })
          result.map(x => Row.fromSeq(x))
        })

        rows.flatMap(x => x)
      }

      // Cast a raw string value to the Catalyst type declared in the schema.
      def castTo(value: String, dataType: DataType) = {
        dataType match {
          case _: DoubleType => value.toDouble
          case _: LongType   => value.toLong
          case _: StringType => value
        }
      }
    }
    

    Data:
    1,li,0,100000,2000
    2,zhang,1,20000,23223

    import org.apache.spark.sql.SparkSession

    object Test {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName).master("local[8]")
          .getOrCreate()
    
    
        val df = spark
          .read
          .format("com.ztgx.datasourse.DefaultSource")
          .load("file:///Users/mac/Desktop/1.txt")
    
        df.show()
    
        spark.stop()
      }
    
    }
    
    

    Result:

    +---+-----+------+------+-----+
    | id| name|gender| salar| comm|
    +---+-----+------+------+-----+
    |  1|   li|     男|100000| 2000|
    |  2|zhang|     女| 20000|23223|
    +---+-----+------+------+-----+
    
  • BaseRelation: represents the relation, or table, of the underlying data source from which a DataFrame is produced; it defines how the schema is produced. RelationProvider: takes a list of parameters and returns a BaseRelation object. TableScan: a full scan over the data and its schema, ...

    Data Source API

    Basic Interfaces

    • BaseRelation: represents the relation (table) of the underlying data source from which a DataFrame is produced; it defines how the schema is provided.
    • RelationProvider: takes a map of parameters and returns a BaseRelation object.
    • TableScan: performs a full scan using the relation's schema and returns an unfiltered RDD.
    • DataSourceRegister: defines a short name (alias) for the data source.

    Providers

    • SchemaRelationProvider: lets the user supply a custom schema.
    • CreatableRelationProvider: lets the user create a new relation from a DataFrame (the write path).

    Scans

    • PrunedScan: eliminates unneeded columns before building rows.
    • PrunedFilteredScan: eliminates unneeded columns and additionally filters on column values.
    • CatalystScan: an experimental interface for a more direct hook into the query planner.

    Relations

    • InsertableRelation: inserts data. Three assumptions: 1. the data passed to insert matches the field order of the schema defined in the BaseRelation; 2. the schema does not change; 3. all fields of the inserted data are treated as nullable.
    • HadoopFsRelation: data sources backed by a Hadoop-compatible file system.

    Output Interfaces

    This part only comes into play if you use HadoopFsRelation. For reference, the key member of each of the traits above is sketched below.
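    These are the required members, paraphrased from the org.apache.spark.sql.sources package in Spark 2.x (signatures only, a reference sketch rather than the full source):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.StructType

    trait RelationProvider {
      def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
    }

    trait SchemaRelationProvider {
      def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation
    }

    trait CreatableRelationProvider {
      def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation
    }

    trait DataSourceRegister {
      def shortName(): String
    }

    trait TableScan {
      def buildScan(): RDD[Row]
    }

    trait PrunedScan {
      def buildScan(requiredColumns: Array[String]): RDD[Row]
    }

    trait PrunedFilteredScan {
      def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
    }

    trait InsertableRelation {
      def insert(data: DataFrame, overwrite: Boolean): Unit
    }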

    Preparation

    Data format

    The data source is a plain text file: fields are separated by commas and records by newlines. The format is:

     

    //id, name, gender (1 = male, 0 = female), salary, expenses
    10001,Alice,0,30000,12000
    

    Creating the project

    Create a Maven project in IDEA and add the corresponding Spark and Scala dependencies.

     

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    
    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>
    
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>
    
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
    </dependencies>
    

    Writing the custom data source

    Creating the schema

    To provide the schema you must create a class named DefaultSource (the Spark source code requires this name: if the class is not called DefaultSource, Spark reports a "DefaultSource class not found" error when the format is given as a package name).
    It also needs to extend RelationProvider and SchemaRelationProvider: RelationProvider creates the relation for the data, and SchemaRelationProvider allows an explicit schema to be supplied.
    In DefaultSource.scala, when the file exists, create the corresponding Relation that reads it from the given path.

    DefaultSource.scala:

     

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
    import org.apache.spark.sql.types.StructType

    class DefaultSource
        extends RelationProvider
        with SchemaRelationProvider {
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
        createRelation(sqlContext,parameters,null)
      }
    
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
        val path=parameters.get("path")
        path match {
          case Some(p) => new TextDataSourceRelation(sqlContext,p,schema)
          case _ => throw new IllegalArgumentException("Path is required for custom-datasource format!!")
        }
      }
    }
    

    When writing the Relation, implement BaseRelation and override schema for the custom data source (for parquet/csv/json files the schema can be obtained directly from the data).
    Also implement Serializable so the relation can be shipped across the network.
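    As a hedged aside on the self-describing formats point: schema inference for such files is a one-liner. The object name and path below are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    object SchemaInferenceSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("schema-inference").master("local[2]")
          .getOrCreate()

        // For JSON/Parquet (and CSV with header + inferSchema) Spark derives the schema itself.
        val inferred: StructType = spark.read.json("/tmp/people.json").schema
        println(inferred.treeString)

        spark.stop()
      }
    }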

    TextDataSourceRelation.scala:

     

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.BaseRelation
    import org.apache.spark.sql.types._

    class TextDataSourceRelation(override val sqlContext : SQLContext, path : String, userSchema : StructType)
        extends BaseRelation
          with Serializable {
      override def schema: StructType = {
        if(userSchema!=null){
          userSchema
        }else{
          StructType(
            StructField("id",IntegerType,false) ::
            StructField("name",StringType,false) ::
            StructField("gender",StringType,false) ::
            StructField("salary",LongType,false) ::
            StructField("expenses",LongType,false) :: Nil)
        }
      }
    }
    

    With the code above, we can quickly check whether the correct schema comes back.
    In the test, read the file with sqlContext.read, point format at the package of the custom data source, and call printSchema() to verify the schema.

     

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.format("com.edu.spark.text").load("/Users/Downloads/data")
      println("output schema...")
      df.printSchema()
      println("Application Ended...")
    }
    

    The schema printed is:

     

    output schema...
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
     |-- gender: string (nullable = false)
     |-- salary: long (nullable = false)
     |-- expenses: long (nullable = false)
    

    The printed schema matches the one we defined.

    Reading data

    To read data, TextDataSourceRelation also needs to implement TableScan and its buildScan() method.
    This method returns the data as an RDD of Row, one Row per record.
    The file is read with wholeTextFiles on the given path, which returns (fileName, content) pairs.
    After reading, each line is split on commas, the numeric gender field is mapped to the corresponding string, and each value is cast to the type declared in the schema.

    The cast helper:

     

    import org.apache.spark.sql.types._

    object Util {
      def castTo(value : String, dataType : DataType) ={
        dataType match {
          case _ : IntegerType => value.toInt
          case _ : LongType => value.toLong
          case _ : StringType => value
        }
      }
    }
    

    The TableScan implementation:

     

    override def buildScan(): RDD[Row] = {
        println("TableScan: buildScan called...")
        val schemaFields = schema.fields
        // Reading the file's content
        val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(f => f._2)
        
        val rows = rdd.map(fileContent => {
          val lines = fileContent.split("\n")
          val data = lines.map(line => line.split(",").map(word => word.trim).toSeq)
          val tmp = data.map(words => words.zipWithIndex.map{
            case (value, index) =>
              val colName = schemaFields(index).name
              Util.castTo(
                if (colName.equalsIgnoreCase("gender")) {
                  if(value.toInt == 1) {
                    "Male"
                  } else {
                    "Female"
                  }
                } else {
                  value
                }, schemaFields(index).dataType)
          })
          tmp.map(s => Row.fromSeq(s))
        })
        rows.flatMap(e => e)
    }
    
    

    Test that the data can be read:

     

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.format("com.edu.spark.text").load("/Users/Downloads/data")
      df.show()
      println("Application Ended...")
    }
    

    The data returned:

     

    +-----+---------------+------+------+--------+
    |   id|           name|gender|salary|expenses|
    +-----+---------------+------+------+--------+
    |10002|    Alice Heady|Female| 20000|    8000|
    |10003|    Jenny Brown|Female| 30000|  120000|
    |10004|     Bob Hayden|  Male| 40000|   16000|
    |10005|    Cindy Heady|Female| 50000|   20000|
    |10006|     Doug Brown|  Male| 60000|   24000|
    |10007|Carolina Hayden|Female| 70000|  280000|
    +-----+---------------+------+------+--------+
    

    Writing data

    Two output formats are implemented here: a custom format and JSON.
    Extend CreatableRelationProvider and implement its createRelation method.

     

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
    import org.apache.spark.sql.types.StructType

    class DefaultSource
        extends RelationProvider
        with SchemaRelationProvider
        with CreatableRelationProvider {
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
        createRelation(sqlContext,parameters,null)
      }
    
      override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
        val path=parameters.get("path")
        path match {
          case Some(p) => new TextDataSourceRelation(sqlContext,p,schema)
          case _ => throw new IllegalArgumentException("Path is required for custom-datasource format!!")
        }
      }
    
      override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
        val path = parameters.getOrElse("path", "./output/") //can throw an exception/error, it's just for this tutorial
        val fsPath = new Path(path)
        val fs = fsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
    
        mode match {
          case SaveMode.Append => sys.error("Append mode is not supported by " + this.getClass.getCanonicalName); sys.exit(1)
          case SaveMode.Overwrite => fs.delete(fsPath, true)
          case SaveMode.ErrorIfExists => sys.error("Given path: " + path + " already exists!!"); sys.exit(1)
          case SaveMode.Ignore => sys.exit()
        }
    
        val formatName = parameters.getOrElse("format", "customFormat")
        formatName match {
          case "customFormat" => saveAsCustomFormat(data, path, mode)
          case "json" => saveAsJson(data, path, mode)
          case _ => throw new IllegalArgumentException(formatName + " is not supported!!!")
        }
        createRelation(sqlContext, parameters, data.schema)
      }
      private def saveAsJson(data : DataFrame, path : String, mode: SaveMode): Unit = {
        /**
          * Here we use the DataFrame API to store the data as JSON;
          * you could use any other API or way of saving it.
          */
        data.write.mode(mode).json(path)
      }
    
      private def saveAsCustomFormat(data : DataFrame, path : String, mode: SaveMode): Unit = {
        /**
          * Here we save the data as a simple text file whose values are separated by "|",
          * but you could store it in any way you like.
          */
        val customFormatRDD = data.rdd.map(row => {
          row.toSeq.map(value => value.toString).mkString("|")
        })
        customFormatRDD.saveAsTextFile(path)
      }
    }
    

    Test code:

     

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.format("com.edu.spark.text").load("/Users/Downloads/data")
      //save the data
        df.write.options(Map("format" -> "customFormat")).mode(SaveMode.Overwrite).format("com.edu.spark.text").save("/Users//Downloads/out_custom/")
        df.write.options(Map("format" -> "json")).mode(SaveMode.Overwrite).format("com.edu.spark.text").save("/Users//Downloads/out_json/")
        df.write.mode(SaveMode.Overwrite).format("com.edu.spark.text").save("/Users//Downloads/out_none/")
      println("Application Ended...")
    }
    

    Output:

    Custom format:

     

    10002|Alice Heady|Female|20000|8000
    10003|Jenny Brown|Female|30000|120000
    10004|Bob Hayden|Male|40000|16000
    10005|Cindy Heady|Female|50000|20000
    10006|Doug Brown|Male|60000|24000
    10007|Carolina Hayden|Female|70000|280000
    

    JSON format:

     

    {"id":10002,"name":"Alice Heady","gender":"Female","salary":20000,"expenses":8000}
    {"id":10003,"name":"Jenny Brown","gender":"Female","salary":30000,"expenses":120000}
    {"id":10004,"name":"Bob Hayden","gender":"Male","salary":40000,"expenses":16000}
    {"id":10005,"name":"Cindy Heady","gender":"Female","salary":50000,"expenses":20000}
    {"id":10006,"name":"Doug Brown","gender":"Male","salary":60000,"expenses":24000}
    {"id":10007,"name":"Carolina Hayden","gender":"Female","salary":70000,"expenses":280000}
    

    Column pruning

    Extend PrunedScan and implement its buildScan method so that only the required columns are returned.

     

    override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
        println("PrunedScan: buildScan called...")
        
        val schemaFields = schema.fields
        // Reading the file's content
        val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(f => f._2)
        
        val rows = rdd.map(fileContent => {
          val lines = fileContent.split("\n")
          val data = lines.map(line => line.split(",").map(word => word.trim).toSeq)
          val tmp = data.map(words => words.zipWithIndex.map{
            case (value, index) =>
              val colName = schemaFields(index).name
              val castedValue = Util.castTo(if (colName.equalsIgnoreCase("gender")) {if(value.toInt == 1) "Male" else "Female"} else value,
                schemaFields(index).dataType)
              if (requiredColumns.contains(colName)) Some(castedValue) else None
          })
        
          tmp.map(s => Row.fromSeq(s.filter(_.isDefined).map(value => value.get)))
        })
        
        rows.flatMap(e => e)
    }
    

    Test code:

     

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.format("com.edu.spark.text").load("/Users/Downloads/data")
      //select some specific columns
      df.createOrReplaceTempView("test")
      spark.sql("select id, name, salary from test").show()
      println("Application Ended...")
    }
    

    Output:

     

    +-----+---------------+------+
    |   id|           name|salary|
    +-----+---------------+------+
    |10002|    Alice Heady| 20000|
    |10003|    Jenny Brown| 30000|
    |10004|     Bob Hayden| 40000|
    |10005|    Cindy Heady| 50000|
    |10006|     Doug Brown| 60000|
    |10007|Carolina Hayden| 70000|
    +-----+---------------+------+
    

    Filtering

    Extend PrunedFilteredScan and implement its buildScan method, returning only the required columns and applying the pushed-down filters.

     

    override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        println("PrunedFilterScan: buildScan called...")
        
        println("Filters: ")
        filters.foreach(f => println(f.toString))
        
        var customFilters: Map[String, List[CustomFilter]] = Map[String, List[CustomFilter]]()
        filters.foreach( f => f match {
          case EqualTo(attr, value) =>
            println("EqualTo filter is used!!" + "Attribute: " + attr + " Value: " + value)
        
            /**
              * Since only one filter is implemented for now, keeping a list per attribute may not
              * seem to make much sense: an attribute can only be equal to one value at a time, so
              * why store several filters for it?
              * It becomes useful once there is more than one filter on the same attribute, for
              * example:
              *   attr > 5 && attr < 10
              * For such cases it is better to keep a list.
              * You can add more filters to this code and try them; here only the equalTo filter is
              * implemented to illustrate the concept.
              */
            customFilters = customFilters ++ Map(attr -> {
              customFilters.getOrElse(attr, List[CustomFilter]()) :+ new CustomFilter(attr, value, "equalTo")
            })
          case GreaterThan(attr,value) =>
            println("GreaterThan Filter is used!!"+ "Attribute: " + attr + " Value: " + value)
            customFilters = customFilters ++ Map(attr -> {
              customFilters.getOrElse(attr, List[CustomFilter]()) :+ new CustomFilter(attr, value, "greaterThan")
            })
          case _ => println("filter: " + f.toString + " is not implemented by us!!")
        })
        
        val schemaFields = schema.fields
        // Reading the file's content
        val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(f => f._2)
        
        val rows = rdd.map(file => {
          val lines = file.split("\n")
          val data = lines.map(line => line.split(",").map(word => word.trim).toSeq)
        
          val filteredData = data.map(s => if (customFilters.nonEmpty) {
            var includeInResultSet = true
            s.zipWithIndex.foreach {
              case (value, index) =>
                val attr = schemaFields(index).name
                val filtersList = customFilters.getOrElse(attr, List())
                if (filtersList.nonEmpty && !CustomFilter.applyFilters(filtersList, value, schema)) {
                  includeInResultSet = false
                }
            }
            if (includeInResultSet) s else Seq()
          } else s)
        
          val tmp = filteredData.filter(_.nonEmpty).map(s => s.zipWithIndex.map {
            case (value, index) =>
              val colName = schemaFields(index).name
              val castedValue = Util.castTo(if (colName.equalsIgnoreCase("gender")) {
                if (value.toInt == 1) "Male" else "Female"
              } else value,
                schemaFields(index).dataType)
              if (requiredColumns.contains(colName)) Some(castedValue) else None
          })
        
          tmp.map(s => Row.fromSeq(s.filter(_.isDefined).map(value => value.get)))
        })
        
        rows.flatMap(e => e)
    }
    

    Test code:

     

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.format("com.edu.spark.text").load("/Users/Downloads/data")
      //filter data
      df.createOrReplaceTempView("test")
      spark.sql("select id,name,gender from test where salary == 50000").show()
    
      println("Application Ended...")
    }
    

    Output:

     

    +-----+-----------+------+
    |   id|       name|gender|
    +-----+-----------+------+
    |10005|Cindy Heady|Female|
    +-----+-----------+------+
    

    Registering the custom data source

    Implement DataSourceRegister's shortName.

    The implementation:

     

    override def shortName(): String = "udftext"
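
    Note that shortName() only takes effect if the provider also mixes in DataSourceRegister. A minimal compilable sketch of that wiring follows; the class name and the placeholder createRelation are illustrative, and the real DefaultSource keeps the createRelation overloads shown earlier:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

    class ShortNameOnlySource extends RelationProvider with DataSourceRegister {

      override def shortName(): String = "udftext"

      override def createRelation(sqlContext: SQLContext,
                                  parameters: Map[String, String]): BaseRelation =
        throw new UnsupportedOperationException("placeholder: see the full DefaultSource above")
    }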
    

    Then, under the resources directory, create the file META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following content:

     

    com.edu.spark.text.DefaultSource
    

    Test code:

     

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.format("udftext").load("/Users/Downloads/data")
      df.show()
      println("Application Ended...")
    }
    

    Output:

     

    +-----+---------------+------+------+--------+
    |   id|           name|gender|salary|expenses|
    +-----+---------------+------+------+--------+
    |10002|    Alice Heady|Female| 20000|    8000|
    |10003|    Jenny Brown|Female| 30000|  120000|
    |10004|     Bob Hayden|  Male| 40000|   16000|
    |10005|    Cindy Heady|Female| 50000|   20000|
    |10006|     Doug Brown|  Male| 60000|   24000|
    |10007|Carolina Hayden|Female| 70000|  280000|
    +-----+---------------+------+------+--------+
    

    You can also add a shorthand reader for the custom data source by extending DataFrameReader with an implicit class:

     

    object ReaderObject {
      implicit class UDFTextReader(val reader: DataFrameReader) extends AnyVal{
        def udftext(path:String) = reader.format("udftext").load(path)
      }
    }
    

    Test code (the implicit conversion has to be imported so it applies to DataFrameReader):

     

    object TestApp extends App {
      println("Application Started...")
      val conf=new SparkConf().setAppName("spark-custom-datasource")
      val spark=SparkSession.builder().config(conf).master("local[2]").getOrCreate()
      val df=spark.sqlContext.read.udftext("/Users/Downloads/data")
      df.show()
      println("Application Ended...")
    }
    

    The output is the same as above, so it is not repeated here.

    Appendix: CustomFilter.scala

     

    import org.apache.spark.sql.types.StructType

    case class CustomFilter(attr : String, value : Any, filter : String)

    object CustomFilter {
      def applyFilters(filters : List[CustomFilter], value : String, schema : StructType): Boolean = {
        var includeInResultSet = true

        val schemaFields = schema.fields
        val index = schema.fieldIndex(filters.head.attr)
        val dataType = schemaFields(index).dataType
        val castedValue = Util.castTo(value, dataType)

        filters.foreach(f => {
          val givenValue = Util.castTo(f.value.toString, dataType)
          f.filter match {
            case "equalTo" =>
              includeInResultSet = castedValue == givenValue
              println("custom equalTo filter is used!!")
            case "greaterThan" =>
              // Compare with ">" per type instead of equals(), which tested
              // equality rather than "greater than".
              includeInResultSet = (castedValue, givenValue) match {
                case (a: Int, b: Int)       => a > b
                case (a: Long, b: Long)     => a > b
                case (a: String, b: String) => a > b
                case _                      => false
              }
              println("custom greaterThan filter is used!!")
            case _ => throw new UnsupportedOperationException("this filter is not supported!!")
          }
        })

        includeInResultSet
      }
    }
    

     

  • Reading from a custom Spark external data source... Spark lets you define your own external data sources so you can read data into a DataFrame however you need; the usage is val df = spark.sqlContext.read.format(“com.spark.datasource”) .option("",) .op...
  • import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider} import org.apache.spark.sql.types.StructType /** * Created by rana on 29/9/16....
  • Table of contents 1.1 External DataSource. The tag-model code needs to read and write HBase tables, so an HBaseTools utility class is written that provides read ... Spark SQL exposes a series of interfaces for plugging in external data sources that developers can implement; the interfaces live in org.apache.spark.
  • Keywords: Spark SQL reading HBase, Spark SQL custom external data source. A previous article covered operating HBase tables through Hive with Spark SQL. ... Here, based on Spark 1.4.0, a Spark SQL custom external data source for accessing HBase tables is introduced briefly. In HBase the table
  • Spark SQL external data sources

    2019-04-24 17:04:10
    External data sources: package com.kaola.bigdata.sparksql03 import org.apache.spark.sql.SparkSession object DataSourceAPIApp { def main(args: Array[String]): Unit = { val spark = SparkSession .builder(...
  • Handling external data sources with Spark

    2018-06-01 22:49:32
    Background: 1. data is stored in the system in all kinds of formats; 2. loading and saving data is not easy (e.g. between Hive and MySQL); 3. data comes in many types and is hard to parse; 4. converting data formats; 5. format conversion; 6. users want a quick, convenient way to ... from different ... Introduced in: the external data source API appeared in Spark SQL 1.2...
  • Spark SQL external data sources

    2020-04-26 11:34:05
    Spark SQL external data sources: 1. Introduction; 1.1 Multi-data-source support; 1.2 Read data formats ...
  • Analysis starting from the JDBC code entry point: source-code analysis // A class that extends BaseRelation must be able to produce its schema as a `StructType`. Concrete implementations should also extend one of the descendant Scan traits. abstract class BaseRelation { def sqlContext: SQLContext ...
  • Reading Spark's connections to external data sources

    1,000+ reads 2016-10-21 12:51:45
    Project source: https://github.com/hortonworks-spark/shc Note: for certain reasons there has not yet been enough time for a deeper reading, so this article focuses on the data source registration and write flows; follow-up posts will continue. Registration flow location: org.apache.spark.sql.execution....
  • In addition, Spark provides JdbcRDD for reading data from MySQL tables. Call RDD#foreachPartition to save each partition's data into a MySQL table; when saving, consider reducing the number of RDD partitions and using batch inserts to improve performance. Example: saving the WordCount result...
  • Spark supports the following six core data sources, and the Spark community provides readers for over a hundred more, covering the vast majority of use cases: CSV, JSON, Parquet, ORC, JDBC/ODBC connections, plain-text files. Note: all of the test files below can be...
  • Reading external data sources with Spark

    2019-10-04 00:03:11
    Table of contents: reading JSON files; reading CSV and TSV files; reading SequenceFile; reading ObjectFile data; reading data from HDFS (calling the Hadoop API explicitly); reading data from MySQL. Reading JSON files: def main(args: Array[String]): Unit = { val conf = new ...
  • Spark supports the following six core data sources, and the Spark community provides readers for over a hundred more, covering the vast majority of use cases: CSV, JSON, Parquet, ORC, JDBC/ODBC connections, plain-text files. Note: all of the test files below can be downloaded from this repository...
  • 1. Data notes and miscellany. 1. External Data Source API (external data sources) ...
  • JDBC external data sources; the underlying principles of a JDBC implementation; implementing your own custom external data source
  • Operating on external data sources with Spark SQL. 1. Spark SQL and Hive integration. Option 1: access Hive tables from spark-shell; 1. copy hive-site.xml to ${SPARK_HOME}/conf; 2. put the MySQL driver in ${SPARK_HOME}/jars; query results. Option 2: access Hive from IDEA; 1. hive-site....
  • * @Description TODO implement a custom WordCount program * */ object WordCountWriteToMySQL { /** * used for writing to MySQL * @param part: each partition object */ def saveToMySQL(part: Iterator[(String, Int)]): Unit = { ...
  • Spark SQL operations: built-in functions 1; Spark SQL operations: built-in functions 2; Spark SQL functions: UDFs (user-defined functions); external data sources supported by Spark SQL. For how to create the Maven project, see my blog post on implementing the Spark API in a Maven project: wordcount and printing...
  • Spark SQL External DataSource (external data sources)

    1,000+ reads 2015-10-02 11:53:47
    http://blog.csdn.net/oopsoom/article/details/42061077 1. Spark SQL External ... Spark SQL exposes a series of interfaces for plugging in external data sources that developers can implement. This lets Spark SQL support more types of data sources, such as json, parquet,
