精华内容
参与话题
问答
  • SparkSQL

    千次阅读 2020-04-22 11:01:53
    SparkSQL基本介绍 什么是SparkSQL? SparkSQL底层的数据抽象 什么是DataFrame?? 什么是DataSet?? SparkSQL查询数据的形态 添加Schema的方式 通过StructType指定Schema代码流程 利用反射机制推断Schema代码...

    目录

     

    累加器的作用

    广播变量的作用

    SparkSQL基本介绍

    什么是SparkSQL?

    SparkSQL底层的数据抽象

    什么是DataFrame??

    什么是DataSet??

    SparkSQL查询数据的形态

    添加Schema的方式

    通过StructType指定Schema代码流程

    利用反射机制推断Schema代码流程


    累加器的作用

    累加器accumulators:累加器支持在所有不同节点之间进行累加计算

     

    广播变量的作用

    在每个机器上缓存一份、不可变的、只读的、相同的变量,该节点每个任务都能访问。起到节省资源和优化的作用。

     

     

    SparkSQL基本介绍

     

    什么是SparkSQL?

    用于处理结构化数据的Spark模块。

     

     

    SparkSQL底层的数据抽象

    DataFrame和DataSet

    Hive和SparkSQL的对比

    Hive是将sql转化成MapReduce进行计算(降低学习成本、提高开发效率)

    SparkSQL是将sql转化成rdd集进行计算(降低学习成本、提高开发效率)

     

     

    什么是DataFrame??

    DataFrame是以RDD为基础的带有Schema元信息的分布式数据集。

                                                                          

     

    什么是DataSet??

    含有类型信息的DataFrame就是DataSet

    (DataSaet=DataFrame+类型= Schema+RDD*n+类型)

     

    SparkSQL查询数据的形态

    1. 类似方法调用,领域特定语言(DSL)。

    personDF.select($"id",$"name",$"age"+1).filter($"age">25).show

    1. SQL语句

    spark.sql("select * from personDFT where age >25").show

     

    添加Schema的方式

    第1种:指定列名添加Schema

    第2种:通过StructType指定Schema

    第3种:编写样例类,利用反射机制推断Schema

    指定列名添加Schema代码流程

     

    1  创建sparksession

    2 创建sc

    3 读取数据并加工

    4 设置表结构   ttRDD.toDF("id","name","age")

    5  注册成表并查询

    6 关闭sc    sparksession

     

    通过StructType指定Schema代码流程

    1  创建sparksession

    2 创建sc

    3 读取数据并加工

    4 设置表结构   

    types.StructType(
        //   字段类型  (字段名,字段类型,是否为空)
        List(StructField("id",IntegerType,true)
        )
      )

    5 创建DS  DF

      val ttDF: DataFrame = spark.createDataFrame(RowRDD,structTable)

    6  注册成表并查询
    7 关闭sc    sparksession

     

     

    利用反射机制推断Schema代码流程

    准备样例类

    1 创建sparksession
    2 创建sc

    3 读取数据并加工
    val PersonRDD: RDD[Person] = ttRDD.map(z=>Person(z(0).toInt,z(1),z(2).toInt))
    4 RDD转DF
       val personDF: DataFrame = PersonRDD.toDF()

    5 注册成表并查询

    6 关闭sc    sparksession

     

     

    展开全文
  • sparksql

    2020-06-26 08:15:03
    sparksql1、课程目标2、sparksql概述2.1 sparksql前世今生2.2 sparksql是什么3、sparksql特性4、DataFrame4.1 dataFrame是什么4.2 RDD与DataFrame优缺点对比5、DataSet5.1 dataSet是什么5.2 dataFrame和dataSet互相...

    1、课程目标

    • 1、掌握sparksql原理
    • 2、掌握DataFrame和DataSet数据结构和使用方式
    • 3、掌握通过sparksql来开发简单的应用程序

    2、sparksql概述

    2.1 sparksql前世今生

    • spark是专门为spark设计的大规模数据仓库系统
    • spark需要依赖于hive的代码,同时也依赖于spark的版本
    • 随着数据处理的复杂度越来越高,并且性能也要求很高
    • 发现之前hivesql底层运行mapreduce代码这种思想限制了shark的发展
    • 最后就把shark这个框架废弃了,把工作的重点转移到sparksql

    2.2 sparksql是什么

    • Spark SQL is Apache Spark’s module for working with structured data

    • sparksql是spark的一个用来处理结构化数据的模块

    • 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

    3、sparksql特性

    • 1、易整合
      • 可以使用sparksql与spark应用程序进行混合使用
      • 同时也可以使用不同的语言进行代码开发
        • java
        • scala
        • python
        • R
    • 2、统一的数据源访问
      • sparksql可以使用一种相同的方式来访问外部的数据源
        • SparkSession.read.文件格式方法(该格式的文件路径)
    • 3、兼容hive
      • sparksql可以支持hivesql的语法,使用sparksql来操作hivesql
    • 4、支持行业标准的数据库连接
      • sparksql支持jdbc或者是odbc来连接上数据库

    4、DataFrame

    4.1 dataFrame是什么

    	dataFrame它的前身是schmeRDD,schemaRDD是直接继承自RDD,在spark1.3.0之后把schemaRDD改为DataFrame,它不在直接继承自RDD,而是自己实现了RDD的一些方法。
    
    	在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
    

    4.2 RDD与DataFrame优缺点对比

    • RDD

      • 优点
        • 1、编译时类型安全
          • 通过RDD开发程序,在编译的时候会进行类型检查
        • 2、具有面向对象编程的风格
      • 缺点
        • 1、序列化和反序列化性能开销很大
          • 数据在跨进程进行网络传输的时候,需要先把数据内容本身和数据的结构信息进行序列化,后期通过反序列化来恢复得到该对象。
        • 2、构建大量的对象会带来频繁的GC
          • 对象的创建一般是使用heap堆中的内存去存储,如果内存空间不足需要进行gc,把一些不是进行使用的对象清除掉,来腾出更多的内存空间。不断进行gc的后果导致线程任务不断的暂停,任务执行的效率是比较低。
    • dataFrame

      • dataFrame它引入了schema和off-heap概念

      • 优点

        • 1、dataFrame由于引入了schema(数据的结构信息–元数据信息),这个时候数据在进行网络传输的时候只需要序列化数据本身就可以了,对于数据的结构信息可以省略掉。它是解决了rdd序列化和反序列化性能开销很大这个缺点

        • 2、dataFrame由于引入了off-heap(直接使用操作系统层面上的内存,不在使用堆中的内存)

          • 这里大量的对象创建就不在heap堆中,直接使用操作系统层面上的内存,后期就可以保证堆中空间是比较充足,就不会导致频繁的gc。它是解决了RDD构建大量的对象会带来频繁的GC
      • 缺点

        • dataFrame引入了schema和off-heap分别解决了rdd的缺点,同时它丢失了rdd的优点。
          • 1、编译时类型不安全
          • 2、不具备面向对象编程的风格

    5、DataSet

    5.1 dataSet是什么

    	DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。
    

    5.2 dataFrame和dataSet互相转换

    • 1、dataFrame转换成dataSet
      • val ds=df.as[强类型]
      • val ds=df.as[String]
    • 2、dataSet转换成dataFrame
      • val df=ds.toDF
    • 补充:
      • 可以把dataFrame和dataSet调用rdd这个方法,获取得到一个rdd。

    5.3 创建DataSet

    • 1、通过sparkSession调用方法createDataset构建

      val ds=spark.createDataset(List(1,2,3,4))
      val ds=spark.createDataset(sc.textFile("/person.txt"))
      
    • 2、通过一个rdd转换生成一个dataSet

    val ds=sc.textFile("/person.txt").toDS
    
    • 3、通过一个dataFrame转换生成一个dataSet

      val ds=df.as[类型]
      
    • 4、通过一个已经存在dataSet转换生成一个新的dataSet

      ds.map(x=>x+" beijing").show
      

    6、基于IDEA开发代码将rdd转换成dataFrame

    • 1、引入依赖

              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.1.3</version>
              </dependency>
      

    6.1 利用反射机制

    • 1、代码开发
    package cn.itcast.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    //todo:实现将rdd转换成dataFrame,利用了反射机制(定义一个样例类)
    
    case class Person(id:Int,name:String,age:Int)
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
            //1、创建SparkSession
            val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
    
           //2、创建SparkContext
              val sc: SparkContext = spark.sparkContext
              sc.setLogLevel("warn")
    
          //3、读取数据文件
            val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" "))
    
        //4、将rdd1与样例类进行关联
            val personRDD: RDD[Person] = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
    
        //5、将RDD转换成dataFrame
            //手动导入隐式转换
            import  spark.implicits._
            val personDF: DataFrame = personRDD.toDF
    
        //6、操作
          //-------------------DSL风格语法-----------------start
           //打印schema
           personDF.printSchema()
          //展示数据,默认展示前20条数据
           personDF.show()      //name:zhangsanxxxxxxxxxxxxxxxxxxxxxxxx
          //展示第一个数据
          println(personDF.first())
          personDF.head(3).foreach(println)
    
          //获取name字段对应的数据
          personDF.select("name").show()
          personDF.select($"name").show()
          personDF.select(new Column("name")).show()
    
        //查询多个字段的结果数据
         personDF.select("name","id","age").show()
    
        //实现age+1操作
        personDF.select($"age"+1).show()
    
        //过滤出年龄大于30的用户信息
          personDF.filter($"age" >30).show()
          println(personDF.filter($"age" >30).count())
    
        //按照年龄进行分组统计
          personDF.groupBy("age").count().show()
    
        //-------------------DSL风格语法-----------------end
    
    
        //-------------------SQL风格语法-----------------start
          personDF.createTempView("person")
          spark.sql("select * from person").show()
          spark.sql("select * from person where age >30").show()
          spark.sql("select * from person  where id=6").show()
          spark.sql("select count(*) from person group by age ").show()
    
        //-------------------SQL风格语法-----------------end
         sc.stop()
         spark.stop()
      }
    }
    

    展开全文
  • SparkSql

    2020-04-21 18:17:15
    SparkSQL基本介绍 什么是SparkSQL? 用于处理结构化数据的Spark模块。 可以通过DataFrame和DataSet处理数据。 SparkSQL特点 1、易整合 可以使用java、scala、python、R等语言的API操作。 2、统一的数据访问 连接到...

    SparkSQL基本介绍

    什么是SparkSQL?
    用于处理结构化数据的Spark模块。
    可以通过DataFrame和DataSet处理数据。

    SparkSQL特点
    1、易整合
    可以使用java、scala、python、R等语言的API操作。
    2、统一的数据访问
    连接到任何数据源的方式相同。
    3、兼容Hive
    4、标准的数据连接(JDBC/ODBC)

    SQL优缺点
    优点:表达非常清晰,难度低、易学习。
    缺点:复杂的业务需要复杂的SQL, 复杂分析,SQL嵌套较多。机器学习较难实现。

    Hive和SparkSQL的对比
    Hive是将sql转化成MapReduce进行计算
    SparkSQL是将sql转化成rdd集进行计算

    SparkSQL中的两个抽象

    什么RDD??
    弹性分布式数据集。

    DataFrame
    什么是DataFrame??
    DataFrame是以RDD为基础的带有元信息的分布式数据集。
    (DataFrame=Schema+RDD*n)

    什么是DataSet??
    含有类型信息的DataFrame就是DataSet
    (DataSaet=DataFrame+类型= Schema+RDD*n+类型)
    DataSet包含了DataFrame的功能

    Spark初体验

    SparkSQL驱动为SparkSession
    SparkSession可以执行SparkSQL也可以执行HiveSQL
    //读取数据
    val lineRDD= sc.textFile(“hdfs://node01:8020/tt.txt”).map(_.split(" "))
    //实例样例类(类似于表的结构)
    case class Person(id:Int, name:String, age:Int)
    //遍历数据,将数据填充到样例类中
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //将RDD转换成DataFrame
    val personDF = personRDD.toDF
    //查看数据
    personDF.show
    //输出表结构
    personDF.printSchema
    //将DataFrame注册为张表
    personDF.createOrReplaceTempView(“t_person”)
    //通过SQL语句进行查询
    spark.sql(“select id,name from t_person where id > 3”).show

    使用SparkSession对象(spark)直接读取数据,读取文本文件是没有元数据信息,读取json文件有元数据信息。

    创建DataSet
    1.通过spark.createDataset创建Dataset
    val fileRdd = sc.textFile(“hdfs://node01:8020/person.txt”) //RDD[String]
    val ds1 = spark.createDataset(fileRdd) //DataSet[String]
    ds1.show

    2.通RDD.toDS方法生成DataSet

    case class Person(name:String, age:Int)
    val data = List(Person(“zhangsan”,20),Person(“lisi”,30)) //List[Person]
    val dataRDD = sc.makeRDD(data)
    val ds2 = dataRDD.toDS //Dataset[Person]
    ds2.showS
    3.通过DataFrame.as[泛型]转化生成DataSet
    case class Person(name:String, age:Long)
    val jsonDF= spark.read.json(“file:///export/servers/spark/examples/src/main/resources/people.json”)
    val jsonDS = jsonDF.as[Person] //DataSet[Person]
    jsonDS.show

    SparkSQL查询数据的形态
    1、类似方法调用,领域特定语言(DSL)。
    准备数据
    val lineRDD= sc.textFile(“hdfs://node01:8020/tt.txt”).map(_.split(" "))
    case class Person(id:Int, name:String, age:Int)
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    val personDF = personRDD.toDF
    personDF.show
    查询
    personDF.select(“id”,“name”,“age”).show
    personDF.select("id","id",“name”,"age"+1).showpersonDF.select("age"+1).show personDF.select(“id”,"name","name",“age”+1).filter($“age”>25).show

    2、SQL语句
    注册成一张表
    personDF.createOrReplaceTempView(“t_person”)
    查询
    spark.sql(“select * from t_person”).show
    spark.sql("select * from personDFT ").show
    spark.sql(“select * from personDFT where age >25”).show
    总结:
    1.DataFrame和DataSet都可以通过RDD来进行创建
    2.也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过RDD+Schema
    3.通过josn/parquet会有完整的约束
    4.不管是DataFrame还是DataSet都可以注册成表,之后就可以使用SQL进行查询了! 也可以使用DSL!

    通过IDEA编写SparkSQL代码

    创建DataFrame/DataSet
    第1种:指定列名添加Schema
    第2种:通过StructType指定Schema
    第3种:编写样例类,利用反射机制推断Schema

    第一种:指定列名添加Schema

    def main(args: Array[String]): Unit = {

    //1 创建sparksession
    val spark: SparkSession = SparkSession.builder().appName(“Demo01”).master(“local[*]”).getOrCreate()

    //2 创建sc
    val sc: SparkContext = spark.sparkContext

    //3 读取数据并加工
    val ttDatas: RDD[String] = sc.textFile(“F:\传智播客\传智专修学院\第二学期\12\05-Spark\资料\tt.txt”)
    val ttRDD: RDD[(Int, String, Int)] = ttDatas.map(a=>a.split(" ")).map(z=>(z(0).toInt,z(1),z(2).toInt))

    //4 设置表结构
    //引入隐式转换
    import spark.implicits._
    //转化成DF
    val ttDF: DataFrame = ttRDD.toDF(“id”,“name”,“age”)

    //5 注册成表并查询
    ttDF.show()
    ttDF.printSchema()
    //6 关闭sc sparksession
    sc.stop()
    spark.stop()

    }

    第二种方法
    def main(args: Array[String]): Unit = {

    //1 创建sparksession
    val spark: SparkSession = SparkSession.builder().appName(“Demo01”).master(“local[*]”).getOrCreate()
    //2 创建sc
    val sc: SparkContext = spark.sparkContext
    //3 读取数据并加工
    val ttDatas: RDD[String] = sc.textFile(“F:\传智播客\传智专修学院\第二学期\12\05-Spark\资料\tt.txt”)
    val RowRDD: RDD[Row] = ttDatas.map(a=>a.split(" ")).map(z=>Row(z(0).toInt,z(1),z(2).toInt))

    //4 设置表结构
    // 表结构类型
    val structTable: StructType = types.StructType(
    // 字段类型 (字段名,字段类型,是否为空)
    List(StructField(“id”,IntegerType,true),
    StructField(“name”,StringType,true),
    StructField(“age”,IntegerType,true)
    )
    )

    //创建DS DF
    val ttDF: DataFrame = spark.createDataFrame(RowRDD,structTable)

    //5 注册成表并查询
    ttDF.show()
    ttDF.printSchema()
    ttDF.createOrReplaceTempView(“ttDF”)
    spark.sql(“select * from ttDF where age>25”).show()

    //6 关闭sc sparksession
    sc.stop()
    spark.stop()
    }

    第三种方法
    //准备临时存储数据的样例类
    case class Person(id:Int,name:String,age:Int)

    def main(args: Array[String]): Unit = {
    //1 创建sparksession
    val spark: SparkSession = SparkSession.builder().appName(“Demo01”).master(“local[*]”).getOrCreate()
    //2 创建sc
    val sc: SparkContext = spark.sparkContext
    //3 读取数据并加工
    val ttDatas: RDD[String] = sc.textFile(“F:\传智播客\传智专修学院\第二学期\12\05-Spark\资料\tt.txt”)
    val ttRDD: RDD[Array[String]] = ttDatas.map(a=>a.split(" "))
    val PersonRDD: RDD[Person] = ttRDD.map(z=>Person(z(0).toInt,z(1),z(2).toInt))
    // 引入隐式转换
    import spark.implicits._

    //RDD转DF
    val personDF: DataFrame = PersonRDD.toDF()

    //5 注册成表并查询
    personDF.show()
    personDF.printSchema()
    personDF.createOrReplaceTempView(“personDF”)
    spark.sql(“select * from personDF where age>25”).show()

    //6 关闭sc sparksession
    sc.stop()
    spark.stop()
    }

    SparkSQL查询数据的两种风格

    //第一种数据查询方法
    personDF.createOrReplaceTempView(“personDF”)
    spark.sql(“select * from personDF where age>25”).show()

    //第二种方法dsl
    personDF.select(“name”,“age”).filter($“age”>25).show()

    RDD、DF、DS三者之间的转化

    //RDD、DF、DS相互转化
    //RDD -> DF DS
    //在引入隐式转换的前提下
    PersonRDD.toDF()
    PersonRDD.toDS()

    //DF ->RDD DS
    personDF.rdd
    val dataSet: Dataset[Person] = personDF.as[Person]

    //DS ->rdd DF
    dataSet.rdd
    dataSet.toDF()

    /*
    转换成RDD .rdd
    转换成DF .toDF()
    转换成DS
    RDD->DS .toDS()
    DF->DS .as[Person]
    */

    SparkSQL的数据读取与写入
    数据读取
    spark.read.json(“D:\data\output\json”).show()
    spark.read.csv(“D:\data\output\csv”).toDF(“id”,“name”,“age”).show()
    spark.read.parquet(“D:\data\output\parquet”).show()
    val prop = new Properties()
    prop.setProperty(“user”,“root”)
    prop.setProperty(“password”,“root”)
    spark.read.jdbc(
    “jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8”,“person”,prop).show()

    数据写入
    personDF.write.json(“D:\data\output\json”)
    personDF.write.csv(“D:\data\output\csv”)
    personDF.write.parquet(“D:\data\output\parquet”)
    val prop = new Properties()
    prop.setProperty(“user”,“root”)
    prop.setProperty(“password”,“root”)
    personDF.write.mode(SaveMode.Overwrite).jdbc(
    “jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8”,“person”,prop)

    Spark SQL自定义函数
    1.UDF(User-Defined-Function)
    输入一行,输出一行 (将每行内小写数据转换成大写)
    2.UDAF(User-Defined Aggregation Funcation)
    输入多行,输出一行 (聚和算法:求平均数/最大值/最小值)
    3.UDTF(User-Defined Table-Generating Functions)
    输入一行,输出多行 (数据拆分)

    UDF案例
    def main(args: Array[String]): Unit = {
    //1 创建sparksession
    val spark: SparkSession = SparkSession.builder().appName(“Demo01”).master(“local[*]”).getOrCreate()
    //2 创建sc
    val sc: SparkContext = spark.sparkContext
    //3 读取数据并加工
    val udfDatas: RDD[String] = sc.textFile(“F:\传智播客\传智专修学院\第二学期\12\05-Spark\资料\udf.txt”)

    import spark.implicits._
    val udfDF = udfDatas.toDF()

    //编写UDF函数 将数据中的小写字符转换为大写
    spark.udf.register(“toUpperAdd123”,(str:String)=>{
    //根据业务逻辑对数据进行处理
    //str.toUpperCase()+ " 123"
    //str.length10
    str.length
    10/2/2.toDouble
    })

    udfDF.show()
    udfDF.createOrReplaceTempView(“udfTable”)
    spark.sql("select value , toUpperAdd123(value) from udfTable ").show()

    sc.stop()
    spark.stop()
    }

    自定义UDAF
    利用UDAF实现平均工资的计算

    数据如下
    {“name”:“Michael”,“salary”:3000}
    {“name”:“Andy”,“salary”:4500}
    {“name”:“Justin”,“salary”:3500}
    {“name”:“Berta”,“salary”:4000}
    代码如下
    class GetAvg extends UserDefinedAggregateFunction{
    //设置 输入的数据的类型
    override def inputSchema: StructType = {
    StructType(List(StructField(“input”,LongType)))
    }

    //设置缓存中间结果数据
    //sum : 每次的临时的和
    //total: 临时的总次数
    override def bufferSchema: StructType = {
    StructType(List(StructField(“sum”,LongType),StructField(“total”,LongType)))
    }

    //最终的返回的数据的类型
    override def dataType: DataType ={
    DoubleType
    }

    //相同的输入否会有相同的输出
    override def deterministic: Boolean = {
    true
    }

    //设置X 个变量 每个变量进行初始化数据
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //算法总和/总数
    // buffer(0)用于记录临时的数据和
    buffer(0)=0L
    //buffer(1)用于记录临时的数据条数
    buffer(1)=0L
    }

    /*
    {“name”:“Michael”,“salary”:3000}
    {“name”:“Andy”,“salary”:4500}

    {“name”:“Justin”,“salary”:3500}
    {“name”:“Berta”,“salary”:4000}
    */

    //List(1,2,3,4,5,6).reduce((a,b)=>a+b)
    /*
    1 a=1 b=2
    2 a=3 b=3
    3 a=6 b=4
    4 a=10 b=5
    5
    6
    */
    //RDD 中含有多个分区 update是计算一个分区内的数据
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //临时输入的总金额
    //buffer.getLong(0) 上次缓存的数据
    //input.getLong(0) 最新输入的数据
    buffer(0)=buffer.getLong(0)+input.getLong(0)

    //临时输入的总数量(条数)
    buffer(1)=buffer.getLong(1)+1
    

    }

    //上面的update有一个rdd分区就会有几个最终结果(上面的有两个结果【临时金额和/临时数量】)
    //合并计算不同分区的结果
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //累加第一个分区金额总和 与第二个分区金额总=最终的中金额
    buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
    //累加第一个分区次数 与第二个分区次数=最终的总次数
    buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
    }

    //计算最终的平均值
    override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble/buffer.getLong(1).toDouble
    }
    }

    def main(args: Array[String]): Unit = {

    //1 创建sparksession
    val spark: SparkSession = SparkSession.builder().appName(“Demo01”).master(“local[*]”).getOrCreate()
    val udafJsonDatas: DataFrame = spark.read.json(“file:///F:\传智播客\传智专修学院\第二学期\12\05-Spark\资料\udaf.json”)

    udafJsonDatas.show()
    udafJsonDatas.createOrReplaceTempView(“UdafTable”)

    //注册一个UDAF函数
    spark.udf.register(“GetAvg”,new GetAvg())

    //查看薪水
    spark.sql(“select GetAvg(salary) from UdafTable”).show()
    spark.sql(“select avg(salary) from UdafTable”).show()

    spark.stop()

    }

    最终结果

    展开全文
  • sparkSQL

    2020-03-01 20:32:29
    sparksql前身是shark,shark基本是全抄了hive,问题就是优化啥的,hive人家是mapreduce的进程级并行,我们这spark数据抽象是RDD是线程级并行,所以shark执行优化依赖与hive跟我们方向就错了,而且导致了shark兼容...

    sparksql前身是shark,shark基本是全抄了hive,问题就是优化啥的,hive人家是mapreduce的进程级并行,我们这spark数据抽象是RDD是线程级并行,所以shark执行优化依赖与hive跟我们方向就错了,而且导致了shark兼容hive时出现了线程安全问题,shark又开发了一套独立维护的打了补丁的hive源码分支,累的不行,所以spark直接新开发了这个sparksql,基于spark来做,Hive on Spark依然存在,
    Spark SQL在兼容hive仅依赖于hql解析,hive元数据,换句话说,就是hql被解析成AST抽象语法树后,Sparksql就接手了,SparkSQL的执行计划和优化都有catalyst(函数是关系查询优化框架)负责
    SparkSQL增加了DataFrame,数据可以来自于RDD,也可以来自Hive、HDFS、Cassandra等外部数据源,也可以json,支持三种语言JAVA python scala
    Spark2.0以上版本,Spark使用全新的SparkSession接口来替代SparkContext以及HiveContext接口

    展开全文
  • sparkSQL自定义函数sparkSQL自定义函数代码 sparkSQL自定义函数 关键函数 sparkSession.udf.register 有两种风格: 面向对象式风格,通过实现匿名内部类来实现自定义功能 面向函数式风格(一般选这种,比较简洁,...
  • sparkSQL操作hiveSQLsparkSQL操作hiveSQLsparkSQL操作hiveSQL来操作本地文件sparkSQL操作hiveSQL来操作HDFS上的hivepom依赖 sparkSQL操作hiveSQL sparkSQL操作hiveSQL并不是sparkSQL on hive。 sparkSQL操作hiveSQL...
  • sparkSQL文档

    2018-04-28 14:23:43
    本文详细的描述了sparksql的一些应用,带你快速的了解
  • SparkSQL 笔记 01

    2021-01-07 08:40:22
    目录SparkSQL1. 基础概念2.DataFrame3.SparkSql程序开发(1.x,2.x)(1)SparkSQL1.x(2)SparkSQL2.x SparkSQL 1. 基础概念 Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且...

空空如也

1 2 3 4 5 ... 20
收藏数 9,725
精华内容 3,890
关键字:

sparksql