精华内容
下载资源
问答
  • SparkSql 数据类型转换

    千次阅读 2019-09-18 10:40:08
    数据类型转换这个在任何语言框架中都会涉及到,看起来非常简单,不过要把所有的数据类型都掌握还是需要一定的时间历练的 SparkSql数据类型 数字类型 ByteType:代表一个字节的整数。范围是-128到127 ShortType:...

    前言

    数据类型转换这个在任何语言框架中都会涉及到,看起来非常简单,不过要把所有的数据类型都掌握还是需要一定的时间历练的

    SparkSql数据类型

    数字类型
    • ByteType:代表一个字节的整数。范围是-128到127
    • ShortType:代表两个字节的整数。范围是-32768到32767
    • IntegerType:代表4个字节的整数。范围是-2147483648到2147483647
    • LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807
    • FloatType:代表4字节的单精度浮点数
    • DoubleType:代表8字节的双精度浮点数
    • DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成
    • StringType:代表一个字符串值
    • BinaryType:代表一个byte序列值
    • BooleanType:代表boolean值
    • Datetime类型
      TimestampType:代表包含字段年,月,日,时,分,秒的值
      DateType:代表包含字段年,月,日的值
    复杂类型
    • ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。containsNull用来指明ArrayType中的值是否有null值
    • MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。valueContainsNull用来指明MapType中的值是否有null值
    • StructType(fields):表示一个拥有StructFields (fields)序列结构的值
      StructField(name, dataType, nullable):代表StructType中的一个字段,字段的名字通过name指定,dataType指定field的数据类型,nullable表示字段的值是否有null值。

    Spark Sql数据类型和Scala数据类型对比

    sparksql 数据类型scala数据类型
    ByteTypeByte
    ShortTypeShort
    IntegerTypeInt
    LongTypeLong
    FloatTypeFloat
    DoubleTypeDouble
    DecimalTypescala.math.BigDecimal
    StringTypeString
    BinaryTypeArray[Byte]
    BooleanTypeBoolean
    TimestampTypejava.sql.Timestamp
    DateTypejava.sql.Date
    ArrayTypescala.collection.Seq
    MapTypescala.collection.Map
    StructTypeorg.apache.spark.sql.Row
    StructFieldThe value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

    Spark Sql数据类型转换案例

    一句话描述:调用Column类的cast方法

    如何获取Column类

    这个之前写过

    df("columnName")            // On a specific `df` DataFrame.
    col("columnName")           // A generic column not yet associated with a DataFrame.
    col("columnName.field")     // Extracting a struct field
    col("`a.column.with.dots`") // Escape `.` in column names.
    $"columnName"               // Scala short hand for a named column.
    
    测试数据准备
    1,tom,23
    2,jack,24
    3,lily,18
    4,lucy,19
    
    spark入口代码
    val spark = SparkSession
          .builder()
          .appName("test")
          .master("local[*]")
          .getOrCreate()
    
    测试默认数据类型
    spark.read.
          textFile("./data/user")
          .map(_.split(","))
          .map(x => (x(0), x(1), x(2)))
          .toDF("id", "name", "age")
          .dtypes
          .foreach(println)
    

    结果:

    (id,StringType)
    (name,StringType)
    (age,StringType)
    

    说明默认都是StringType类型

    把数值型的列转为IntegerType
     import spark.implicits._
        spark.read.
          textFile("./data/user")
          .map(_.split(","))
          .map(x => (x(0), x(1), x(2)))
          .toDF("id", "name", "age")
          .select($"id".cast("int"), $"name", $"age".cast("int"))
          .dtypes
          .foreach(println)
    

    结果:

    (id,IntegerType)
    (name,StringType)
    (age,IntegerType)
    
    Column类cast方法的两种重载
    • 第一种
      def cast(to: String): Column
      Casts the column to a different data type, using the canonical string representation of the type. The supported types are:
      string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.
    // Casts colA to integer.
    df.select(df("colA").cast("int"))
    Since
    1.3.0
    
    • 第二种
      def cast(to: DataType): Column
      Casts the column to a different data type.
    // Casts colA to IntegerType.
    import org.apache.spark.sql.types.IntegerType
    df.select(df("colA").cast(IntegerType))
    
    // equivalent to
    df.select(df("colA").cast("int"))
    
    展开全文
  • 前几天接触了 SparkSQL,通过自定义数据源可以完成各种数据库的读取和写入。我好像嗅到了数据中台的调调,封装一个扩展性强的小架架把 hbase,mysql,redis各种数据源都整合一下,再用并发多线程,对象池之类的优化...

    下面这一段是废话,时间紧的兄弟直接跳过:

    前几天接触了 SparkSQL,通过自定义数据源可以完成各种数据库的读取和写入。我好像嗅到了数据中台的调调,封装一个扩展性强的小架架把 hbase,mysql,redis各种数据源都整合一下,再用并发多线程,对象池之类的优化一下性能,再招一个3000块的小表哥,多么优秀的开源节流,是不是又可以找老板涨工资了!考验架构能力的时候到了,不想当架构师的程序员不是一个好男人!好了,做梦时间结束,进入正题:

    我们知道 hbase 最终是把数据转成了 HFile 文件,HFile 是 hadoop 的二进制格式文件,所以从 hbase 读出来的也是二进制字节流,那么要如何获取每个 Column 的数据类型呢?

    其实这是一个伪命题,因为 SparkSQL 已经帮我们实现了!

    在自定义数据源需要实现的第二层接口(DataSourceReader)需要实现这样一个方法:

    override def readSchema(): StructType = {
        structType
      }

    StructType 就是保存字段类型信息的,进入 DataSourceReader 接口看下方法调用:

    private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
        case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
        case _ =>
          reader.createDataReaderFactories().asScala.map {
            new RowToUnsafeRowDataReaderFactory(
    _, 
    reader.readSchema() // here
    ): DataReaderFactory[UnsafeRow]
          }.asJava
      }

    进入 RowToUnsafeRowDataReaderFactory 

    class RowToUnsafeRowDataReaderFactory(rowReaderFactory: DataReaderFactory[Row], schema: StructType)
      extends DataReaderFactory[UnsafeRow] {
    
      override def preferredLocations: Array[String] = rowReaderFactory.preferredLocations
    
      override def createDataReader: DataReader[UnsafeRow] = {
        new RowToUnsafeDataReader(
          rowReaderFactory.createDataReader, // 这里的 Reader 就是我们自己实现的 DataReader
          RowEncoder.apply(schema).resolveAndBind() // 这是 column 的类型信息
        )
      }
    }

    继续进入 RowToUnsafeDataReader 

    class RowToUnsafeDataReader(val rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
      extends DataReader[UnsafeRow] {
    
      override def next: Boolean = rowReader.next
    
      /**
        * rowReader 就是我们自己实现的 DataReader 对象
        * 这里他用了一个对象代理了我们自己实现的 DataReader 对象
        * get() 的时候 用具有 column 类型信息的 encoder 对我们返回的 Row 对象做了一次转换
        */
      override def get: UnsafeRow = encoder.toRow(rowReader.get).asInstanceOf[UnsafeRow]
    
      override def close(): Unit = rowReader.close()
    }

    而我们自定义数据源重写 DataReader 的时候,只需要获取一个数组对象array,然后直接调用 Row.fromSeq(array) 转成 Row 对象返回即可

    override def get(): Row = {
        // 将查询的数据直接生成 Iterator 对象,get()直接iterator.next
        val result: Result = datas.next() 
    
        // 拆分 hbase 的列族和列名,获取结果,将结果封装成一个 String 数组
        val strings: Array[String] = cfcc.split(",").map(eachCfcc => {
          val strings: Array[String] = eachCfcc.trim.split(":")
          // 直接转 String
          Bytes.toString(result.getValue(strings(0).trim.getBytes(), strings(1).trim.getBytes()))
        })
        Row.fromSeq(strings)
      }

     

    展开全文
  • 【Spark】Spark SQL 数据类型转换

    千次阅读 2020-02-22 11:41:53
    数据类型转换这个在任何语言框架中都会涉及到,看起来非常简单,不过要把所有的数据类型都掌握还是需要一定的时间历练。 SparkSql数据类型 数字类型 ByteType:代表一个字节的整数。范围是-128到127 ShortType:...

    前言

    数据类型转换这个在任何语言框架中都会涉及到,看起来非常简单,不过要把所有的数据类型都掌握还是需要一定的时间历练。

    SparkSQL数据类型

    数字类型
    • ByteType:代表一个字节的整数。范围是-128到127
    • ShortType:代表两个字节的整数。范围是-32768到32767
    • IntegerType:代表4个字节的整数。范围是-2147483648到2147483647
    • LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807
    • FloatType:代表4字节的单精度浮点数
    • DoubleType:代表8字节的双精度浮点数
    • DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成
    • StringType:代表一个字符串值
    • BinaryType:代表一个byte序列值
    • BooleanType:代表boolean值
    • Datetime类型
      TimestampType:代表包含字段年,月,日,时,分,秒的值
      DateType:代表包含字段年,月,日的值
    复杂类型
    • ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。containsNull用来指明ArrayType中的值是否有null值
    • MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。valueContainsNull用来指明MapType中的值是否有null值
    • StructType(fields):表示一个拥有StructFields (fields)序列结构的值
      StructField(name, dataType, nullable):代表StructType中的一个字段,字段的名字通过name指定,dataType指定field的数据类型,nullable表示字段的值是否有null值。

    Spark SQL数据类型和Scala数据类型对比

    sparksql 数据类型scala数据类型
    ByteTypeByte
    ShortTypeShort
    IntegerTypeInt
    LongTypeLong
    FloatTypeFloat
    DoubleTypeDouble
    DecimalTypescala.math.BigDecimal
    StringTypeString
    BinaryTypeArray[Byte]
    BooleanTypeBoolean
    TimestampTypejava.sql.Timestamp
    DateTypejava.sql.Date
    ArrayTypescala.collection.Seq
    MapTypescala.collection.Map
    StructTypeorg.apache.spark.sql.Row
    StructFieldThe value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

    Spark SQL数据类型转换案例

    一句话描述:调用Column类的cast方法

    如何获取Column类

    这个之前写过

    df("columnName")            // On a specific `df` DataFrame.
    col("columnName")           // A generic column not yet associated with a DataFrame.
    col("columnName.field")     // Extracting a struct field
    col("`a.column.with.dots`") // Escape `.` in column names.
    $"columnName"               // Scala short hand for a named column.
    
    测试数据准备
    1,tom,23
    2,jack,24
    3,lily,18
    4,lucy,19
    
    spark入口代码
    val spark = SparkSession
          .builder()
          .appName("test")
          .master("local[*]")
          .getOrCreate()
    
    测试默认数据类型
    spark.read.
          textFile("./data/user")
          .map(_.split(","))
          .map(x => (x(0), x(1), x(2)))
          .toDF("id", "name", "age")
          .dtypes
          .foreach(println)
    

    结果:

    (id,StringType)
    (name,StringType)
    (age,StringType)
    

    说明默认都是StringType类型

    把数值型的列转为IntegerType
     import spark.implicits._
        spark.read.
          textFile("./data/user")
          .map(_.split(","))
          .map(x => (x(0), x(1), x(2)))
          .toDF("id", "name", "age")
          .select($"id".cast("int"), $"name", $"age".cast("int"))
          .dtypes
          .foreach(println)
    

    结果:

    (id,IntegerType)
    (name,StringType)
    (age,IntegerType)
    
    Column类cast方法的两种重载
    • 第一种
      def cast(to: String): Column
      Casts the column to a different data type, using the canonical string representation of the type. The supported types are:
      string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.
    // Casts colA to integer.
    df.select(df("colA").cast("int"))
    Since
    1.3.0
    
    • 第二种
      def cast(to: DataType): Column
      Casts the column to a different data type.
    // Casts colA to IntegerType.
    import org.apache.spark.sql.types.IntegerType
    df.select(df("colA").cast(IntegerType))
    // equivalent to
    df.select(df("colA").cast("int"))
    
    展开全文
  • 行列转换的其实是一个很常用的... 列转行对数据的要求为 column 的数据类型是 string ,使用实例如下 原数据如下 2018-01,项目1,100 2018-01,项目2,200 2018-01,项目3,300 2018-01,项目3,400 2018-02,项目1,1000 2018-

    行列转换的其实是一个很常用的数据分析操作,用在数据的拼接与拆分上,实现一些普通的函数无法实现的效果

    列转行

    首先为大家介绍的是列转行函数,涉及到的内建函数有,collect_list 列转行之后不去重,collect_set 列转行之后去重。 列转行是对一列数据进行聚合的操作,且要求这一列的数据类型是 string 使用实例如下

    原数据如下

    2018-01,项目1,100
    2018-01,项目2,200
    2018-01,项目3,300
    2018-01,项目3,400
    2018-02,项目1,1000
    2018-02,项目2,2000
    2018-03,项目x,999
    

    sql如下

    spark.sql("select yue, collect_set(project) projects,sum(shouru) zsr  from sr group by yue")
    展开全文
  • sparkSQL的三种数据类型【RDD--DF--DS】之间的相互转换 RDD结果: DF结果: DS结果:
  • 最近在做弄sparksql,在读取mysql数据的时候发现一个问题, 在数据库将字段定义成tinyint,并且长度为1的时候,读取到spark里面,被转换成Boolean类型的字段了. 测试表定义 CREATE TABLE `test1` ( `id` bigint(4) NOT ...
  • 使用spark读取parquet文件时,例如读取在file:///E:/test/...而我们的文件内容中的数据结构是: val struct = StructType( Array( StructField("uid", StringType), StructField("time", StringType), Struc...
  • 项目中需要实现一个udf完成一个数据转换的功能,网上的例子都是比较简单的那种,比如字符串变换或者字符串长度统计这种简单功能。我这个稍微复杂一些,需要传入数据字典,字段要和数据字典中的字段进行比对,然后...
  • 文章目录一、sparksql概述二、sparksql四大特性三、DataFrame简介DataFrame与RDD的区别DataFrame与RDD的优缺点读取数据源创建DataFrame读取json文件创建DataFrame四、DataFrame常用操作DSL风格语法SQL风格语法六、...
  • 【Spark SQL】扩展 ---- DataFrame 数据类型转换 (cast使用) package 大数据应用赛_2020 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.IntegerType object Exam2 { def main...
  • 1.有类型转换算子 (1)转换 1.flatMap 通过 flatMap 可以将一条数据转为一个数组, 后再展开这个数组放入 Dataset import spark.implicits._ val ds = Seq("hello world", "hello pc").toDS() ds.flatMap( _.split(...
  • 文章目录1 、SparkSQL读取Json文件2、RDD转换成DataFrame2.1 用toDF()方式2.2 把原生RDD转换成RDD[Row],再和定义好的StructType匹配 1 、SparkSQL读取Json文件 先随便造两份Json格式数据。 [hadoop@vm01 data]$ ...
  • 数据格式 原格式 日期 时间 种类 监测站1数据 监测站…数据 String Int String Double Double 数据清洗 PM2.5表、O3表… 时间 监测站 数据 String(“yyyy-MM-dd-HH“) String Double 这样会...
  • sparkSQL是spark的一个模块,可以和RDD进行混合编程、支持标准的数据源、可以集成和替代Hive、可以提供JDBC\ODBC服务器功能。 sparkSQL里面有两个新的数据抽象:DataFrame和DataSet。 DataFrame: SQL操作主要涉及到...
  • sparkSql(hive) 复合数据类型的使用

    千次阅读 2017-01-09 22:48:09
    在Hive中可以使用复合数据类型,有三种常用的类型:Array 数组,Map 字典,Struct结构。在sparkSql中RDD可以转换成非常灵活的DataFrame, 但是如果需要将数据结构完整的存储为Hive表,那么在RDD到DataFrame转换中...
  • SparkSQL数据处理分析 基于DSL分析 ​​​​​​​基于SQL分析 第一步、注册为临时视图 第二步、编写SQL,执行分析 ​​​​​​​SparkSQL数据处理分析 在SparkSQL模块中,将结构化数据封装到DataFrame或...
  • RDD转换成DataFrame并读取数据利用反射机制通过StructType动态指定Schemapom依赖 利用反射机制 应用场景:在开发代码之前,可以事先确定好DataFrame的schema信息,通过反射机制后期直接映射成DataFrame的schema信息 ...
  • 首先在本地创建一个json文件,名字叫json_schema_infer.json,文件中数据的格式如下: {"name":"liguohui","gender":"M","height&...
  • 直接针对HDFS等任何可以构建为RDD数据,进行SparkSQL的sql查询 2、SparkSQL支持RDD转换成DataFrame的方式如下: 1>反射方式; 2>通过编程接口创建DataFrame; 方法一:使用createDataFrame方法; val ...
  • SparkSQL数据抽象 DataFrame 引入 就易用性而言,对比传统的MapReduce API,Spark的RDD API有了数量级的飞跃并不为过。然而,对于没有MapReduce和函数式编程经验的新手来说,RDD API仍然存在着一定的门槛。 另一...
  • Sql 数据类型转换

    2015-06-28 10:58:45
    将数字转换成字符串,如果是整数类型的话,可以使用str()函数直接来转换,不过用str函数不能转带有浮点数。 declare @score float; set @score=1.6; select str(@score); 输出结果是2。这明显是不对的。 ...
  • spark抽象数据列表 RDD DataFrame DataSet 相同点: 全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利 三者都是惰性机制,在执行Transform操作时不会立即执行,在遇到action操作时会正式...
  • SparkSql DataFrame转换为RDD测试数据样式转换 测试数据样式 caizhiyuan,25,18696478962 kongjingqi,25,16987159638 caizhiyuan,25,15277840314 kongjingqi,25,13568520990 caizhiyuan,25,11859201666 kongjingqi,25...
  • SparkSQL数据

    2020-03-02 15:14:39
    SparkSQL支持通过DataFrame接口操作多种数据源。DataFrame可以进行关系型转换操作,也能用来创建临时表。创建临时表后可以对它进行SQL查询。本章节描述了使用Spark数据源loading和saving数据的一般方法,然后是一些...
  • 数据源信息样例: GCSL00000673,0,JL225390810101,1,286.5,286.5 GCSL00000673,1,84126312010104,1,329.7,329.7 GCSL00000673,2,24126312011502,1,412.2,412.2 GCSL00000673,3,84126388563204,1,372.9,372.9...
  • sparkSQL加载数据 1.read加载数据 scala> spark.read. csv format jdbc json load option options orc parquet schema table text textFile 注意:加载数据的相关参数需写到上述方法中,如:textFile需传入加载...
  • SparkSQL提供了通用的保存数据数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据SparkSQL默认读取和保存的文件格式为parquet。 1)加载数据 spark.read.load是加载数据的...
  • SparkSQL读取和写出数据的几种方式

    千次阅读 2019-11-29 15:40:21
    parquet是一种列式存储格式的文件类型。存储时可以通过牛X的压缩算法节省存储空间,读取数据时只需要读取所需的列,提高读取性能。 二、JSON格式的数据 三、通过JDBC读取数据库中的数据 四、Hive中的数据 所需的pom...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,102
精华内容 2,040
热门标签
关键字:

sparksql数据类型转换