spark sql_spark sql 包含 - CSDN
精华内容
参与话题
  • sparkSQL实战详解

    千次阅读 2018-04-14 12:15:36
    摘要 如果要想真正的掌握sparkSQL编程,首先要对sparkSQL的整体框架以及sparkSQL到底能帮助我们解决什么问题有一个整体的认识,然后就是对各个层级关系有一个清晰的认识后,才能真正的掌握它,对于sparkSQL整体框架...

    摘要  

            如果要想真正的掌握sparkSQL编程,首先要对sparkSQL的整体框架以及sparkSQL到底能帮助我们解决什么问题有一个整体的认识,然后就是对各个层级关系有一个清晰的认识后,才能真正的掌握它,对于sparkSQL整体框架这一块,在前一个博客已经进行过了一些介绍,如果对这块还有疑问可以看我前一个博客:http://9269309.blog.51cto.com/9259309/1845525。本篇博客主要是对sparkSQL实战进行讲解和总结,而不是对sparkSQL源码的讲解,如果想看源码的请绕道。

        再多说一点,对于初学者,本人坚持的观点是不要一上来就看源码,这样的效果不是很大,还浪费时间,对这个东西还没有大致掌握,还不知道它是干什么的,上来就看源码,门槛太高,而且看源码对个人的提升也不是很高。我们做软件开发的,我们开发的顺序也是,首先是需求,对需求有了详细的认识,需要解决什么问题,然后才是软件的设计,代码的编写。同样,学习框架也是,我们只有对这个框架的需求,它需要解决什么问题,它需要干什么工作,都非常了解了,然后再看源码,这样效果才能得到很大的提升。对于阅读源代码这一块,是本人的一点看法,说的对与错,欢迎吐槽j_0057.gif......!

     

    1、sparkSQL层级

         当我们想用sparkSQL来解决我们的需求时,其实说简单也简单,就经历了三步:读入数据 -> 对数据进行处理  -> 写入最后结果,那么这三个步骤用的主要类其实就三个:读入数据和写入最后结果用到两个类HiveContext和SQLContext,对数据进行处理用到的是DataFrame类,此类是你把数据从外部读入到内存后,数据在内存中进行存储的基本数据结构,在对数据进行处理时还会用到一些中间类,用到时在进行讲解。如下图所示:

    wKioL1fMOmTAfg8zAAEV37NvmPo273.png-wh_50

     

    2、HiveContext和SQLContext

       把HiveContext和SQLContext放在一起讲解是因为他们是差不多的,因为HiveContext继承自SQLContext,为什么会有两个这样的类,其实与hive和sql有关系的,虽然hive拥有HQL语言,但是它是一个类sql语言,和sql语言还是有差别的,有些sql语法,HQL是不支持的。所以他们还是有差别的。选择不同的类,最后执行的查询引擎的驱动是不一样的。但是对于底层是怎么区别的这里不做详细的介绍,你就知道一点,使用不同的读数据的类,底层会进行标记,自动识别是使用哪个类进行数据操作,然后采用不同的执行计划执行操作,这点在上一篇sparkSQL整体框架中进行了介绍,这里不做介绍。当从hive库中读数据的时候,必须使用HiveContext来进行读取数据,不然在进行查询的时候会出一些奇怪的错。其他的数据源两者都可以选择,但是最好使用SQLContext来完成。因为其支持的sql语法更多。由于HiveContext是继承自SQLContext,这里只对SQLContext进行详细的介绍,但是以下这些方法是完全可以用在HiveContext中的。其实HiveContext类就扩展了SQLContext的两个我们可以使用的方法(在看源码时以protected和private开头的方法都是我们不能使用的,这个是scala的控制逻辑,相反,不是以这两个关键字标记的方法是我们可以直接使用的方法):analyze(tableName:String)和refreshTable(tableName:String)。

     

    方法用途
    analyze方法这个我们一般使用不到,它是来对我们写的sql查询语句进行分析用的,一般用不到。
    refreshTable方法

    当我们在sparkSQL中处理的某个表的存储位置发生了变换,但是我们在内存的metaData中缓存(cache)了这张表,则需要调用这个方法来使这个缓存无效,需要重新加载。

     

     

    2.1 读数据

          我们在解决我们的需求时,首先是读入数据,需要把数据读入到内存中去,读数据SQLContext提供了两个方法,我们提供两个数据表,为了便于演示,我采用的是用JSON格式进行存储的,写成这样的格式,但是可以保存为.txt格式的文件。

    wKioL1fMQtmxKqkVAADPd4NtRww164.png-wh_50

    1、第一种数据读入:这种是对数据源文件进行操作。

    1
    2
    3
    4
    5
    import org.apache.spark.sql.SQLContext
    val sql = new SQLContext(sc) //声明一个SQLContext的对象,以便对数据进行操作
    val peopleInfo = sc.read.json("文件路径")
    //其中peopleInfo返回的结果是:org.apache.spark.sql.DataFrame =
    // [age: bigint, id: bigint, name: string],这样就把数据读入到内存中了

            写了这几行代码后面总共发生了什么,首先sparkSQL先找到文件,以解析json的形式进行解析,同时通过json的key形成schema,scheam的字段的顺序不是按照我们读入数据时期默认的顺序,如上,其字段的顺序是通过字符串的顺序进行重新组织的。默认情况下,会把整数解析成bigint类型的,把字符串解析成string类型的,通过这个方法读入数据时,返回值得结果是一个DataFrame数据类型。

        DataFrame是什么?其实它是sparkSQL处理大数据的基本并且是核心的数据结构,是来存储sparkSQL把数据读入到内存中,数据在内存中进行存储的基本数据结构。它采用的存储是类似于数据库的表的形式进行存储的。我们想一想,一个数据表有几部分组成:1、数据,这个数据是一行一行进行存储的,一条记录就是一行,2、数据表的数据字典,包括表的名称,表的字段和字段的类型等元数据信息。那么DataFrame也是按照行进行存储的,这个类是Row,一行一行的进行数据存储。一般情况下处理粒度是行粒度的,不需要对其行内数据进行操作,如果想单独操作行内数据也是可以的,只是在处理的时候要小心,因为处理行内的数据容易出错,比如选错数据,数组越界等。数据的存储的形式有了,数据表的字段和字段的类型都存放在哪里呢,就是schema中。我们可以调用schema来看其存储的是什么。

    1
    2
    3
    4
    peopleInfo.schema
    //返回的结果是:org.apache.spark.sql.types.StructType = 
    //StructType(StructField(age,LongType,true), StructField(id,LongType,true),
    // StructField(name,StringType,true))

    可以看出peopleInfo存储的是数据,schema中存储的是这些字段的信息。需要注意的是表的字段的类型与scala数据类型的对应关系:bigint->Long,int -> Int,Float -> Float,double -> Double,string ->  String等。一个DataFrame是有两部分组成的:以行进行存储的数据和scheam,schema是StructType类型的。当我们有数据而没有schema时,我们可以通过这个形式进行构造从而形成一个DataFrame。

     

    read函数还提供了其他读入数据的接口:

    函数用途

    json(path:String)

    读取json文件用此方法
    table(tableName:String)读取数据库中的表
    jdbc(url: String,table: String,predicates:Array[String],connectionProperties:Properties)

     

    通过jdbc读取数据库中的表
    orc(path:String)读取以orc格式进行存储的文件
    parquet(path:String)读取以parquet格式进行存储的文件
    schema(schema:StructType)这个是一个优化,当我们读入数据的时候指定了其schema,底层就不会再次解析schema从而进行了优化,一般不需要这样的优化,不进行此优化,时间效率还是可以接受

     

    2、第二种读入数据:这个读入数据的方法,主要是处理从一个数据表中选择部分字段,而不是选择表中的所有字段。那么这种需求,采用这个数据读入方式比较有优势。这种方式是直接写sql的查询语句。把上述json格式的数据保存为数据库中表的格式。需要注意的是这种只能处理数据库表数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    val peopleInfo = sql.sql("""
    |select
    | id,
    | name,
    | age
    |from peopleInfo
    """.stripMargin)//其中stripMargin方法是来解析我们写的sql语句的。
    //返回的结果是和read读取返回的结果是一样的:
    //org.apache.spark.sql.DataFrame =
    // [age: bigint, id: bigint, name: string]

    需要注意的是其返回的schmea中字段的顺序和我们查询的顺序还是不一致的。

     

    2.2  写入数据

     

    写入数据就比较的简单,因为其拥有一定的模式,按照这个模式进行数据的写入。一般情况下,我们需要写入的数据是一个DataFrame类型的,如果其不是DataFrame类型的我们需要把其转换为

    DataFrame类型,有些人可能会有疑问,数据读入到内存中,其类型是DataFrame类型,我们在处理数据时用到的是DataFrame类中的方法,但是DataFrame中的方法不一定返回值仍然是DataFrame类型的,同时有时我们需要构建自己的类型,所以我们需要为我们的数据构建成DataFrame的类型。把没有schema的数据,构建schema类型,我所知道的就有两种方法。

     

    1、通过类构建schema,还以上面的peopleInfo为例子。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    val sql = new SQLContext(sc) //创建一个SQLContext对象
    import sql.implicits._ //这个sql是上面我们定义的sql,而不是某一个jar包,网上有很多
                           //是import sqlContext.implicits._,那是他们定义的是
                           //sqlContext = SQLContext(sc),这个是scala的一个特性
    val people = sql.textFile("people.txt")//我们采用spark的类型读入数据,因为如果用
                                          //SQLContext进行读入,他们自动有了schema
    case clase People(id:Int,name:String,age:Int)//定义一个类
    val peopleInfo = people.map(lines => lines.split(","))
                            .map(p => People(p(0).toInt,p(1),p(2).toInt)).toDF
                            //这样的一个toDF就创建了一个DataFrame,如果不导入
                            //sql.implicits._,这个toDF方法是不可以用的。

    上面的例子是利用了scala的反射技术,生成了一个DataFrame类型。可以看出我们是把RDD给转换为DataFrame的。

     

    2、直接构造schema,以peopelInfo为例子。直接构造,我们需要把我们的数据类型进行转化成Row类型,不然会报错。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    val sql = new SQLContext(sc) //创建一个SQLContext对象
    val people = sc.textFile("people.txt").map(lines => lines.split(","))
    val peopleRow = sc.map(p => Row(p(0),p(1),(2)))//把RDD转化成RDD(Row)类型
    val schema = StructType(StructFile("id",IntegerType,true)::
                            StructFile("name",StringType,true)::
                            StructFile("age",IntegerType,true)::Nil)
    val peopleInfo = sql.createDataFrame(peopleRow,schema)//peopleRow的每一行的数据
                                                          //类型一定要与schema的一致
                                                          //否则会报错,说类型无法匹配
                                                          //同时peopleRow每一行的长度
                                                          //也要和schema一致,否则
                                                          //也会报错

    构造schema用到了两个类StructType和StructFile,其中StructFile类的三个参数分别是(字段名称,类型,数据是否可以用null填充)

    采用直接构造有很大的制约性,字段少了还可以,如果有几十个甚至一百多个字段,这种方法就比较耗时,不仅要保证Row中数据的类型要和我们定义的schema类型一致,长度也要一样,不然都会报错,所以要想直接构造schema,一定要细心细心再细心,本人就被自己的不细心虐惨了,处理的字段将近一百,由于定义的schema和我的数据类型不一致,我就需要每一个字段每一个字段的去确认,字段一多在对的时候就容易疲劳,就这样的一个错误,由于本人比较笨,就花费了一个下午的时间,所以字段多了,在直接构造schema的时候,一定要细心、细心、细心,重要的事情说三遍,不然会死的很惨。

     

    好了,现在我们已经把我们的数据转化成DataFrame类型的,下面就要往数据库中写我们的数据了

    写数据操作:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    val sql = new SQLContext(sc) 
    val people = sc.textFile("people.txt").map(lines => lines.split(","))
    val peopleRow = sc.map(p => Row(p(0),p(1),(2)))
    val schema = StructType(StructFile("id",IntegerType,true)::
                            StructFile("name",StringType,true)::
                            StructFile("age",IntegerType,true)::Nil)
    val peopleInfo = sql.createDataFrame(peopleRow,schema)
    peopleInfo.registerTempTable("tempTable")//只有有了这个注册的表tempTable,我们
                                             //才能通过sql.sql(“”“ ”“”)进行查询
                                             //这个是在内存中注册一个临时表用户查询
    sql.sql.sql("""
    |insert overwrite table tagetTable
    |select
    | id,
    | name,
    | age
    |from tempTable
    """.stripMargin)//这样就把数据写入到了数据库目标表tagetTable中

    有上面可以看到,sparkSQL的sql()其实就是用来执行我们写的sql语句的。

     

    好了,上面介绍了读和写的操作,现在需要对最重要的地方来进行操作了啊。

     

     

    2.3 通过DataFrame中的方法对数据进行操作

           

            在介绍DataFrame之前,我们还是要先明确一下,sparkSQL是用来干什么的,它主要为我们提供了怎样的便捷,我们为什么要用它。它是为了让我们能用写代码的形式来处理sql,这样说可能有点不准确,如果就这么简单,只是对sql进行简单的替换,要是我,我也不学习它,因为我已经会sql了,会通过sql进行处理数据仓库的etl,我还学习sparkSQL干嘛,而且学习的成本又那么高。sparkSQL肯定有好处了,不然也不会有这篇博客啦。我们都知道通过写sql来进行数据逻辑的处理时有限的,写程序来进行数据逻辑的处理是非常灵活的,所以sparkSQL是用来处理那些不能够用sql来进行处理的数据逻辑或者用sql处理起来比较复杂的数据逻辑。一般的原则是能用sql来处理的,尽量用sql来处理,毕竟开发起来简单,sql处理不了的,再选择用sparkSQL通过写代码的方式来处理。好了废话不多说了,开始DataFrame之旅。

           sparkSQL非常强大,它提供了我们sql中的正删改查所有的功能,每一个功能都对应了一个实现此功能的方法。

     

    对schema的操作

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    val sql = new SQLContext(sc)
    val people = sql.read.json("people.txt")//people是一个DataFrame类型的对象
     
    //数据读进来了,那我们查看一下其schema吧
     
    people.schema
     
    //返回的类型
    //org.apache.spark.sql.types.StructType = 
    //StructType(StructField(age,LongType,true), 
    //           StructField(id,LongType,true),
    //           StructField(name,StringType,true))
     
    //以数组的形式分会schema
     
    people.dtypes
     
    //返回的结果:
    //Array[(String, String)] = 
    //       Array((age,LongType), (id,LongType), (name,StringType))
     
     
    //返回schema中的字段
     
    people.columns
     
    //返回的结果:
    //Array[String] = Array(age, id, name)  
     
    //以tree的形式打印输出schema
     
    people.printSchema
     
    //返回的结果:
    //root
    // |-- age: long (nullable = true)
    // |-- id: long (nullable = true)
    // |-- name: string (nullable = true)

    对表的操作,对表的操作语句一般情况下是不常用的,因为虽然sparkSQL把sql查的每一个功能都封装到了一个方法中,但是处理起来还是不怎么灵活一般情况下我们采用的是用sql()方法直接来写sql,这样比较实用,还更灵活,而且代码的可读性也是很高的。那下面就把能用到的方法做一个简要的说明。

     

    方法(sql使我们定义的sql = new SQLContext(sc)) df是一个DataFrame对象实例说明
    sql.read.table(tableName)读取一张表的数据
    df.where(),          df.filter()

    过滤条件,相当于sql的where部分;

    用法:选择出年龄字段中年龄大于20的字段。

    返回值类型:DataFrame

     

    df.where("age >= 20"),df.filter("age >= 20")

    df.limit()

    限制输出的行数,对应于sql的limit

    用法:限制输出一百行

    返回值类型:DataFrame

     

    df.limit(100)

    df.join()

    链接操作,相当于sql的join

    对于join操作,下面会单独进行介绍

    df.groupBy()

    聚合操作,相当于sql的groupBy

    用法:对于某几行进行聚合

    返回值类型:DataFrame

     

    df.groupBy("id")

    df.agg()求聚合用的相关函数,下面会详细介绍
    df.intersect(other:DataFrame)

     

    求两个DataFrame的交集
    df.except(other:DataFrame)求在df中而不在other中的行
    df.withColumn(colName:String,col:Column)

    增加一列

    df.withColumnRenamed(exName,newName)对某一列的名字进行重新命名

    df.map(),

    df.flatMap,

    df.mapPartitions(),

    df.foreach()

    df.foreachPartition()

    df.collect()

    df.collectAsList()

    df.repartition()

    df.distinct()

    df.count()

    这些方法都是spark的RDD的基本操作,其中在DataFrame类中也封装了这些方法,需要注意的是这些方法的返回值是RDD类型的,不是DataFrame类型的,在这些方法的使用上,一定要记清楚返回值类型,不然就容易出现错误
    df.select()

    选取某几列元素,这个方法相当于sql的select的功能

    用法:返回选择的某几列数据

    返回值类型:DataFrame

     

    df.select("id","name")

    以上是两个都是一写基本的方法,下面就详细介绍一下join和agg,na,udf操作

     

    2.4 sparkSQL的join操作

     

        spark的join操作就没有直接写sql的join操作来的灵活,在进行链接的时候,不能对两个表中的字段进行重新命名,这样就会出现同一张表中出现两个相同的字段。下面就一点一点的进行展开用到的两个表,一个是用户信息表,一个是用户的收入薪资表:

    wKioL1fVFKHBWx8yAADS8HGHT7Q431.png-wh_50  wKioL1fVFMzjucKzAAEUkKPh1AA902.png-wh_50

     

    1、内连接,等值链接,会把链接的列合并成一个列

     

    1
    2
    3
    4
    5
    val sql = new SQLContext(sc)
    val pInfo = sql.read.json("people.txt")
    val pSalar = sql.read.json("salary.txt")
    val info_salary = pInfo.join(pSalar,"id")//单个字段进行内连接
    val info_salary1 = pInfo.join(pSalar,Seq("id","name"))//多字段链接

    返回的结果如下图:

    单个id进行链接 (一张表出现两个name字段)                                                两个字段进行链接

    wKiom1fVFlTwjQbwAAH3nXZEh58669.png-wh_50                                                    wKioL1fVFlTxwDx-AAFsuYdckaE090.png-wh_50

     

    2、join还支持左联接和右链接,但是其左联接和右链接和我们sql的链接的意思是一样的,同样也是在链接的时候不能对字段进行重新命名,如果两个表中有相同的字段,则就会出现在同一个join的表中,同事左右链接,不会合并用于链接的字段。链接用的关键词:outer,inner,left_outer,right_outer

    1
    2
    3
    4
    5
    //单字段链接
    val left = pInfo.join(pSalar,pInfo("id"=== pSalar("id"),"left_outer")
    //多字段链接
    val left2 = pInfo.join(pSalar,pInfo("id"=== pSalar("id") and 
                    pInfo("name"=== pSalar("name"),"left_outer")

    返回的结果:

    单字段链接                                                               多字段链接

    wKioL1fVGQiT_JQgAAF-BueVQ8A567.png-wh_50                  wKioL1fVGXeSkNMpAAE5TN2DOHg791.png-wh_50

     

    由上可以发现,sparkSQL的join操作还是没有sql的join灵活,容易出现重复的字段在同一张表中,一般我们进行链接操作时,我们都是先利用registerTempTable()函数把此DataFrame注册成一个内部表,然后通过sql.sql("")写sql的方法进行链接,这样可以更好的解决了重复字段的问题。

     

    2.5 sparkSQL的agg操作

        

         其中sparkSQL的agg是sparkSQL聚合操作的一种表达式,当我们调用agg时,其一般情况下都是和groupBy()的一起使用的,选择操作的数据表为:

    wKioL1fVFMzjucKzAAEUkKPh1AA902.png-wh_50

    1
    2
    3
    4
    val pSalar = new SQLContext(sc).read.json("salary.txt")
    val group = pSalar.groupBy("name").agg("salary" -> "avg")
    val group2 = pSalar.groupBy("id","name").agg("salary" -> "avg")
    val group3 = pSalar.groupBy("name").agg(Map("id" -> "avg","salary"->"max"))

    得到的结过如下:

       group的结果                                         group2                                           group3

    wKiom1fVHoODlce6AAD2HXgd44M826.png-wh_50   wKioL1fVHoOi8ZByAAELMO8I5kw789.png-wh_50    wKiom1fVHoSDVySrAAEKInS_Hh8231.png-wh_50 

     

    使用agg时需要注意的是,同一个字段不能进行两次操作比如:agg(Map("salary" -> "avg","salary" -> "max"),他只会计算max的操作,原因很简单,agg接入的参数是Map类型的key-value对,当key相同时,会覆盖掉之前的value。同时还可以直接使用agg,这样是对所有的行而言的。聚合所用的计算参数有:avg,max,min,sum,count,而不是只有例子中用到的avg

     

     

    2.6 sparkSQL的na操作

       

         sparkSQL的na方法,返回的是一个DataFrameFuctions对象,此类主要是对DataFrame中值为null的行的操作,只提供三个方法,drop()删除行,fill()填充行,replace()代替行的操作。很简单不做过多的介绍。

     

    3、总结

     

            我们使用sparkSQL的目的就是为了解决用写sql不能解决的或者解决起来比较困难的问题,在平时的开发过程中,我们不能为了高逼格什么样的sql问题都是用sparkSQL,这样不是最高效的。使用sparkSQL,主要是利用了写代码处理数据逻辑的灵活性,但是我们也不能完全的只使用sparkSQL提供的sql方法,这样同样是走向了另外一个极端,有上面的讨论可知,在使用join操作时,如果使用sparkSQL的join操作,有很多的弊端。为了能结合sql语句的优越性,我们可以先把要进行链接的DataFrame对象,注册成内部的一个中间表,然后在通过写sql语句,用SQLContext提供的sql()方法来执行我们写的sql,这样处理起来更加的合理而且高效。在工作的开发过程中,我们要结合写代码和写sql的各自的所长来处理我们的问题,这样会更加的高效。

           写这篇博客,花费了我两周的时间,由于工作比较忙,只有在业余时间进行思考和总结。也算对自己学习的一个交代。关于sparkSQL的两个类HiveContext和SQLContext提供的udf方法,如果用好了udf方法,可以使我们代码的开发更加的简洁和高效,可读性也是很强的。由于在代码中注册udf方法,还有很多很细的知识点需要注意,我准备在另外写一篇博客进行详细的介绍。

    展开全文
  • SparkSQL基础

    2018-12-01 18:33:10
    SparkSQL概述 SparkSQL是Spark的结构化数据处理模块。特点如下: 数据兼容:可从Hive表、外部数据库(JDBC)、RDD、Parquet 文件、JSON 文件获取数据; 组件扩展:SQL 语法解析器、分析器、优化器均可重新定义; ...

    SparkSQL概述

    SparkSQL是Spark的结构化数据处理模块。特点如下:

    • 数据兼容:可从Hive表、外部数据库(JDBC)、RDD、Parquet 文件、JSON 文件获取数据;
    • 组件扩展:SQL 语法解析器、分析器、优化器均可重新定义;
    • 性能优化:内存列存储、动态字节码生成等优化技术,内存缓存数据;
    • 多语言支持:Scala、Java、Python;
      Shark即Hive on Spark,Shark的设计导致了两个问题:
    • 执行计划优化完全依赖于Hive,不方便添加新的优化策略;
    • Spark是线程级并行,而MapReduce是进程级并行。Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支;

    Spark团队汲取了shark的优点重新设计了Spark Sql,使之在数据兼容、性能优化、组件扩展等方面得到极大的提升:
    数据兼容:不仅兼容Hive,还可以从RDD、parquet文件、Json文件获取数据、支持从RDBMS获取数据;
    性能优化:采用内存列式存储、自定义序列化器等方式提升性能;
    组件扩展:SQL的语法解析器、分析器、优化器都可以重新定义和扩展。

    Spark SQL 是 Spark 中用于处理结构化数据的模块;
    Spark SQL相对于RDD的API来说,提供更多结构化数据信息和计算方法;
    Spark SQL 提供更多额外的信息进行优化,可以通过SQL或DataSet API方式同Spark SQL进行交互。无论采用哪种方法,哪种语言进行计算操作,实际上都用相同的执行引擎,使用者可以在不同的API中进行切换,选择一种最自然的方式完成数据操作。
    Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据。
    从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了,Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

    Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范;

    Spark第一代API:RDD

    优点:
    1)编译时类型安全,编译时就能检查出类型错误;
    2)面向对象的编程风格,直接通过class.name的方式来操作数据;
    idAge.filter(.age > “”) // 编译时报错, int不能跟与String比较
    idAgeRDDPerson.filter(
    .age > 25) // 直接操作一个个的person对象

    缺点:
    1)序列化和反序列化的性能开销,无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化;
    2)GC的性能开销,频繁的创建和销毁对象, 势必会增加GC。

    Spark第二代API:DataFrame

    DataFrame的前身是SchemaRDD。Spark1.3更名为DataFrame。不继承RDD,自己实现了RDD的大部分功能。可以在DataFrame上调用RDD的方法转化成另外一个RDD;
    DataFrame可以看做分布式Row对象的集合,提供了由列组成的详细模式信息,使其可以得到优化。DataFrame 不仅有比RDD更多的算子,还可以进行执行计划的优化;
    Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。

    DataFrame核心特征:
    Schema : 包含了以ROW为单位的每行数据的列的信息; Spark通过Schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了;

    off-heap : Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存;

    Tungsten:新的执行引擎;

    Catalyst:新的语法解析框架;

    Spark第二代API:DataFrame

    优点:
    off-heap类似于地盘, schema类似于地图, Spark有地图又有自己地盘了, 就可以自己说了算了, 不再受JVM的限制, 也就不再受GC的困扰了,通过schema和off-heap, DataFrame克服了RDD的缺点。对比RDD提升计算效率、减少数据读取、底层计算优化;

    缺点:
    DataFrame克服了RDD的缺点, 但是却丢了RDD的优点。DataFrame不是类型安全的, API也不是面向对象风格的。

    // API不是面向对象的
    idAgeDF.filter(idAgeDF.col(“age”) > 25)
    // 不会报错, DataFrame不是编译时类型安全的
    idAgeDF.filter(idAgeDF.col(“age”) > “”)

    Spark第三代API:Dataset;Dataset的核心:Encoder

    DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多操作(过滤、排序、hash)的时候不用进行反序列化。

    1)编译时的类型安全检查。性能极大的提升,内存使用极大降低、减少GC、极大的减少网络数据的传输、极大的减少scala和java之间代码的差异性。

    2)DataFrame每一个行对应了一个Row。而Dataset的定义更加宽松,每一个record对应了一个任意的类型。DataFrame只是Dataset的一种特例。

    3)不同于Row是一个泛化的无类型JVM object, Dataset是由一系列的强类型JVM object组成的,Scala的case class或者Java class定义。因此Dataset可以在编译时进行类型检查。

    4)Dataset以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。

    5)Dataset创立需要一个显式的Encoder,把对象序列化为二进制。

    SparkSQL API

    SparkSession:Spark的一个全新的切入点,统一Spark入口;

    Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,包括是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),用户不但可以使用DataFrame和Dataset的各种API,学习Spark的难度也会大大降低。
    在这里插入图片描述

    Dataset是一个类(RDD是一个抽象类,而Dataset不是抽象类),其中有三个参数:
    SparkSession(包含环境信息)
    QueryExecution(包含数据和执行逻辑)
    Encoder[T]:数据结构编码信息(包含序列化、schema、数据类型)

    Row

    Row是一个泛化的无类型JVM object

    Schema

    DataFrame(即带有Schema信息的RDD)
    Spark通过Schema就能够读懂数据

    什么是schema?
    DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,DataFrame中的数据结构信息,即为schema。
    // 最便捷
    val schema6 = (new StructType).
    add(“name”, “string”, false).
    add(“age”, “integer”, false).
    add(“height”, “integer”, false)

    Spark提供了一整套用于操纵数据的DSL
    (DSL :Domain Specified Language,领域专用语言)
    DSL在语义上与SQL关系查询非常相近

    Dataset的创建

    在这里插入图片描述
    在这里插入图片描述
    // 1、由range生成Dataset
    val numDS = spark.range(5, 100, 5)
    numDS.orderBy(desc(“id”)).show(5)
    numDS.describe().show

    // 2、由集合生成Dataset
    case class Person(name:String, age:Int, height:Int)
    // 注意 Seq 中元素的类型
    val seq1 = Seq(Person(“Jack”, 28, 184), Person(“Tom”, 10, 144), Person(“Andy”, 16, 165))
    val ds1 = spark.createDataset(seq1)
    ds1.show
    val seq2 = Seq((“Jack”, 28, 184), (“Tom”, 10, 144), (“Andy”, 16, 165))
    val ds2 = spark.createDataset(seq2)
    ds2.show
    // 3、集合转成DataFrame,并修改列名
    val seq1 = ((“Jack”, 28, 184), (“Tom”, 10, 144), (“Andy”, 16, 165))
    val df1 = spark.createDataFrame(seq1).withColumnRenamed("_1", “name1”).
    withColumnRenamed("_2", “age1”).withColumnRenamed("3", “height1”)
    df1.orderBy(desc(“age1”)).show(10)
    val df2 = spark.createDataFrame(seq1).toDF(“name”, “age”, “height”) // 简单!2.0.0的新方法
    // 4、RDD 转成DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.

    val arr = Array((“Jack”, 28, 184), (“Tom”, 10, 144), (“Andy”, 16, 165))
    val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
    val schema = StructType( StructField(“name”, StringType, false) :: StructField(“age”, IntegerType, false) ::
    StructField(“height”, IntegerType, false) :: Nil)
    val rddToDF = spark.createDataFrame(rdd1, schema)
    rddToDF.orderBy(desc(“name”)).show(false)
    // 5、RDD 转成 Dataset / DataFrame
    val rdd2 = spark.sparkContext.makeRDD(arr).map(f=>Person(f._1, f._2, f.3))
    val ds2 = rdd2.toDS()
    val df2 = rdd2.toDF()
    ds2.orderBy(desc(“name”)).show(10)
    df2.orderBy(desc(“name”)).show(10)
    // 6、rdd 转成 Dataset
    val ds3 = spark.createDataset(rdd2)
    ds3.show(10)
    // 7 读取文件
    val df5 = spark.read.csv(“file:///C:/Users/Administrator/Desktop/Spark-SQL/raw_user.csv”)
    df5.show()
    逗号分隔值(Comma-Separated Values,CSV,也称为字符分隔值,因为分隔字符也可以不是逗号),其文件以纯文本形式存储表格数据(数字和文本)。纯文本意味着该文件是一个字符序列,不含必须像二进制数字那样被解读的数据。CSV文件由任意数目的记录组成,记录间以某种换行符分隔;每条记录由字段组成,字段间的分隔符是其它字符或字符串,最常见的是逗号或制表符。通常,所有记录都有完全相同的字段序列。通常都是纯文本文件。
    // 8 读取文件,详细参数
    import org.apache.spark.sql.types.

    val schema2 = StructType( StructField(“name”, StringType, false) ::
    StructField(“age”, IntegerType, false) ::
    StructField(“height”, IntegerType, false) :: Nil)
    val df7 = spark.read.options(Map((“delimiter”, “,”), (“header”, “false”))). schema(schema2).csv(“file:///home/spark/t01.csv”)
    df7.show()

    spark.read 方法

    val df = spark.read.csv(“file:\\\C:\Users\Administrator\Desktop\spark\score.csv”)
    //自动类型推断
    val df = spark.read.option(“inferschema”,“true”).csv(“file:///home/spark/data/sparksql/t01.csv”)
    //以逗号为分隔符,不带表头
    val df = spark.read.options(Map((“delimiter”, “,”), (“header”, “false”))). schema(schema6).csv(“file:///home/spark/data/sparksql/t01.csv”)

    RDD、DataFrame、Dataset的共性与区别

    共性:
    1、RDD、DataFrame、Dataset都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;

    2、三者都有惰性机制。在进行创建、转换时(如map方法),不会立即执行;只有在遇到Action时(如foreach) ,才会开始遍历运算。极端情况下,如果代码里面仅有创建、转换,但后面没有在Action中使用对应的结果,在执行时会被直接跳过;

    3、三者都有partition的概念,进行缓存(cache)操作、还可以进行检查点(checkpoint)操作;

    4、三者有许多共同的函数,如map、filter,排序等;

    5、在对DataFrame和Dataset进行操作时,很多情况下需要 spark.implicits._ 进行支持;

    三者之间的转换

    RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

    • DataFrame/Dataset 转 RDD:
      // 这个转换很简单
      val rdd1=testDF.rdd
      val rdd2=testDS.rdd

    • RDD 转 DataFrame:

    // 一般用元组把一行的数据写在一起,然后在toDF中指定字段名
    import spark.implicits._
    val testDF = rdd.map {line=>
    (line._1,line._2)
    }.toDF(“col1”,“col2”)

    • RDD 转 Dataet:
      // 核心就是要定义case class
      import spark.implicits._
      case class Coltest(col1:String, col2:Int)
      val testDS = rdd.map{line=>Coltest(line._1,line._2)}.toDS
    • Dataset 转 DataFrame:
      // 这个转换简单,只是把 case class 封装成Row
      import spark.implicits._
      val testDF = testDS.toDF
    • DataFrame 转 Dataset:
      // 每一列的类型后,使用as方法(as方法后面还是跟的case class,这个是核心),转成Dataset。
      import spark.implicits._
      case class Coltest … …
      val testDS = testDF.as[Coltest]
      特别注意:
      在使用一些特殊操作时,一定要加上import spark.implicits._ 不然toDF、toDS无法使用
      case class Person(name:String, age:Int, height:Int)
      val arr = Array((“Jack”, 28, 184), (“Tom”, 10, 144), (“Andy”, 16, 165))
      val rdd1 = sc.makeRDD(arr)
      val df = rdd1.toDF()
      val df = rdd1.toDF(“name”, “age”, “height”)
      df.as[Person]
      备注:DF转为DS时要求:二者的列名相同
      以下操作均报错:
      rdd1.toDF().as[Person]
      rdd1.toDF(“name1”, “age”, “height”).asPerson
      在这里插入图片描述

    DSL

    备注:
    1、在官方文档及源码中并没有action、transformation的提法
    2、这一部分的内容比较新

    数据类型

    在这里插入图片描述

    Transformation

    与RDD类似的操作
    map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、distinct、dropDuplicates、describe()
    存储相关
    cacheTable、persist、checkpoint、unpersist、cache
    select相关
    列的多种表示、select、selectExpr
    drop、withColumn、 withColumnRenamed、cast(内置函数)
    where相关
    where、filter
    groupBy相关
    groupBy、agg、max、min、avg(mean)、sum、count(后面5个为内置函数)
    orderBy相关
    orderBy、sort
    join相关
    join
    集合相关
    union、unionAll、intersect、except
    空值处理
    na.fill、na.drop

    // map、flatMap操作(与RDD基本类似)
    df1.map(row=>row.getAsInt).show
    // filter
    df1.filter(“sal>3000”).show
    // randomSplit(与RDD类似,将DF、DS按给定参数分成多份)
    val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
    df2(0).count
    df2(1).count
    df2(2).count

    // 取10行数据生成新的DataSet
    val df2 = df1.limit(10)

    // distinct,去重
    val df2 = df1.union(df1)
    df2.distinct.count

    // dropDuplicates,按列值去重
    df2.dropDuplicates.show
    df2.dropDuplicates(“mgr”, “deptno”).show
    df2.dropDuplicates(“mgr”).show
    df2.dropDuplicates(“deptno”).show
    // 返回全部列的统计(count、mean、stddev、min、max)
    ds1.describe().show

    // 返回指定列的统计
    ds1.describe(“sal”).show
    ds1.describe(“sal”, “comm”).show

    Actions

    df1.count

    // 缺省显示20行
    df1.union(df1).show()
    // 显示2行
    df1.show(2)
    // 不截断字符
    df1.toJSON.show(false)
    // 显示10行,不截断字符
    df1.toJSON.show(10, false)
    spark.catalog.listFunctions.show(10000, false)

    // collect返回的是数组, Array[org.apache.spark.sql.Row]
    val c1 = df1.collect()

    // collectAsList返回的是List, List[org.apache.spark.sql.Row]
    val c2 = df1.collectAsList()

    // 返回 org.apache.spark.sql.Row
    val h1 = df1.head()
    val f1 = df1.first()

    // 返回 Array[org.apache.spark.sql.Row],长度为3
    val h2 = df1.head(3)
    val f2 = df1.take(3)

    // 返回 List[org.apache.spark.sql.Row],长度为2
    val t2 = df1.takeAsList(2)
    // 结构属性
    df1.columns // 查看列名
    df1.dtypes // 查看列名和类型
    df1.explain() // 参看执行计划
    df1.col(“name”) // 获取某个列
    df1.printSchema // 常用

    select相关

    // 列的多种表示方法(5种)。使用""、""col()ds("")//:;使spark.implicitis.;df1.select(""、'、col()、ds("") // 注意:不要混用;必要时使用spark.implicitis._;并非每个表示在所有的地方都有效 df1.select(“ename”, $“hiredate”, $“sal”).show
    df1.select(“ename”, “hiredate”, “sal”).show
    df1.select('ename, 'hiredate, 'sal).show
    df1.select(col(“ename”), col(“hiredate”), col(“sal”)).show
    df1.select(df1(“ename”), df1(“hiredate”), df1(“sal”)).show

    // 下面的写法无效,其他列的表示法有效
    df1.select(“ename”, “hiredate”, “sal”+100).show
    df1.select(“ename”, “hiredate”, “sal+100”).show

    // 可使用expr表达式(expr里面只能使用引号)
    df1.select(expr(“comm+100”), expr(“sal+100”), expr(“ename”)).show
    df1.selectExpr(“ename as name”).show
    df1.selectExpr(“power(sal, 2)”, “sal”).show
    df1.selectExpr(“round(sal, -3) as newsal”, “sal”, “ename”).show
    // drop、withColumn、 withColumnRenamed、casting
    // drop 删除一个或多个列,得到新的DF
    df1.drop(“mgr”)
    df1.drop(“empno”, “mgr”)

    // withColumn,修改列值
    val df2 = df1.withColumn(“sal”, $“sal”+1000)
    df2.show

    // withColumnRenamed,更改列名
    df1.withColumnRenamed(“sal”, “newsal”)

    // 备注:drop、withColumn、withColumnRenamed返回的是DF

    // cast,类型转换(cast是函数,hive中也有类似的函数,用法基本类似)
    df1.selectExpr(“cast(empno as string)”).printSchema

    import org.apache.spark.sql.types._
    df1.select('empno.cast(StringType)).printSchema

    where 相关

    // where操作
    df1.filter(“sal>1000”).show
    df1.filter(“sal>1000 and job==‘MANAGER’”).show

    // filter操作
    df1.where(“sal>1000”).show
    df1.where(“sal>1000 and job==‘MANAGER’”).show

    groupBy 相关

    // groupBy、max、min、mean、sum、count(与df1.count不同)
    df1.groupBy(“Job”).sum(“sal”).show
    df1.groupBy(“Job”).max(“sal”).show
    df1.groupBy(“Job”).min(“sal”).show
    df1.groupBy(“Job”).avg(“sal”).show
    df1.groupBy(“Job”).count.show

    // agg
    df1.groupBy().agg(“sal”->“max”, “sal”->“min”, “sal”->“avg”, “sal”->“sum”, “sal”->“count”).show
    df1.groupBy(“Job”).agg(“sal”->“max”, “sal”->“min”, “sal”->“avg”, “sal”->“sum”, “sal”->“count”).show

    // 这种方式更好理解
    df1.groupBy(“Job”).agg(max(“sal”), min(“sal”), avg(“sal”), sum(“sal”), count(“sal”)).show
    // 给列取别名
    df1.groupBy(“Job”).agg(max(“sal”), min(“sal”), avg(“sal”), sum(“sal”), count(“sal”)).withColumnRenamed(“min(sal)”, “min1”).show
    // 给列取别名,最简便
    df1.groupBy(“Job”).agg(max(“sal”).as(“max1”), min(“sal”).as(“min2”), avg(“sal”).as(“avg3”), sum(“sal”).as(“sum4”), count(“sal”).as(“count5”)).show

    orderBy、sort 相关

    // orderBy
    df1.orderBy(“sal”).show
    df1.orderBy("sal").showdf1.orderBy("sal").show df1.orderBy(“sal”.asc).show
    df1.orderBy('sal).show
    df1.orderBy(col(“sal”)).show
    df1.orderBy(df1(“sal”)).show

    df1.orderBy("sal".desc).showdf1.orderBy(sal).showdf1.orderBy(deptno,sal).show//sortdf1.sort("sal").showdf1.sort("sal".desc).show df1.orderBy(-'sal).show df1.orderBy(-'deptno, -'sal).show // sort,以下语句等价 df1.sort("sal").show df1.sort(“sal”).show
    df1.sort($“sal”.asc).show
    df1.sort('sal).show
    df1.sort(col(“sal”)).show
    df1.sort(df1(“sal”)).show

    df1.sort($“sal”.desc).show
    df1.sort(-'sal).show
    df1.sort(-'deptno, -'sal).show

    join 相关

    // 1、笛卡尔积
    df1.crossJoin(df1).count
    // 2、等值连接(连接字段仅显示一次)
    df1.join(df1, Seq(“empno”, “ename”)).show
    ds1.join(ds2, “sname”).show
    ds1.join(ds2, Seq(“sname”), “inner”).show
    ds1.join(ds2, ds1(“sname”)===ds2(“sname”), “inner”).show
    // 10种join的连接方式(下面有9种,还有一种是笛卡尔积)
    ds1.join(ds2, “sname”).show
    ds1.join(ds2, Seq(“sname”), “inner”).show

    ds1.join(ds2, Seq(“sname”), “left”).show
    ds1.join(ds2, Seq(“sname”), “left_outer”).show

    ds1.join(ds2, Seq(“sname”), “right”).show
    ds1.join(ds2, Seq(“sname”), “right_outer”).show

    ds1.join(ds2, Seq(“sname”), “outer”).show
    ds1.join(ds2, Seq(“sname”), “full”).show
    ds1.join(ds2, Seq(“sname”), “full_outer”).show

    // 类似于集合求交
    ds1.join(ds2, Seq(“sname”), “left_semi”).show
    // 类似于集合求差
    ds1.join(ds2, Seq(“sname”), “left_anti”).show

    备注:DS在join操作之后变成了DF

    集合相关

    // union、unionAll、intersect、except。集合的交、并、差
    val ds3 = ds1.select(“sname”)
    val ds4 = ds2.select(“sname”)

    // union 求并集,不去重
    ds3.union(ds4).show

    // unionAll、union 等价;unionAll过期方法,不建议使用
    ds3.unionAll(ds4).show

    // intersect 求交
    ds3.intersect(ds4).show

    // except 求差
    ds3.except(ds4).show

    空值处理

    // NaN 非法值
    math.sqrt(-1.0); math.sqrt(-1.0).isNaN()

    df1.show
    // 删除所有列的空值和NaN
    df1.na.drop.show

    // 删除某列的空值和NaN
    df1.na.drop(Array(“mgr”)).show

    // 对全部列填充;对指定单列填充;对指定多列填充
    df1.na.fill(1000).show
    df1.na.fill(1000, Array(“comm”)).show
    df1.na.fill(Map(“mgr”->2000, “comm”->1000)).show

    // 对指定的值进行替换
    df1.na.replace(“comm” :: “deptno” :: Nil, Map(0 -> 100, 10 -> 100)).show

    // 查询空值列或非空值列。isNull、isNotNull为内置函数
    df1.filter(“comm is null”).show
    df1.filter($“comm”.isNull).show
    df1.filter(col(“comm”).isNull).show

    df1.filter(“comm is not null”).show
    df1.filter(col(“comm”).isNotNull).show

    时间日期函数

    // 各种时间函数
    df1.select(year("hiredate")).showdf1.select(weekofyear("hiredate")).show df1.select(weekofyear(“hiredate”)).show
    df1.select(minute("hiredate")).showdf1.select(dateadd("hiredate")).show df1.select(date_add(“hiredate”, 1), $“hiredate”).show
    df1.select(current_date).show
    df1.select(unix_timestamp).show

    val df2 = df1.select(unix_timestamp as “unixtime”)
    df2.select(from_unixtime($“unixtime”)).show

    // 计算年龄
    df1.select(round(months_between(current_date, $“hiredate”)/12)).show

    json 数据源

    // 读数据(txt、csv、json、parquet、jdbc)
    val df2 = spark.read.json(“data/employees.json”)
    df2.show
    // 备注:SparkSQL中支持的json文件,文件内容必须在一行中

    // 写文件
    df1.select(“ename”, “sal”).write.format(“csv”).save(“data/t2”)
    df1.select(“ename”, “sal”).write
    .option(“header”, true)
    .format(“csv”).save(“data/t2”)

    DF、DS对象上的SQL语句

    // 注册为临时视图。
    // 有两种形式:createOrReplaceTempView / createTempView
    df1.createOrReplaceTempView(“temp1”)
    spark.sql(“select * from temp1”).show

    df1.createTempView(“temp2”)
    spark.sql(“select * from temp2”).show

    // 使用下面的语句可以看见注册的临时表
    spark.catalog.listTables.show

    备注:
    1、spark.sql返回的是DataFrame;
    2、如果TempView已经存在,使用createTempView会报错;
    3、SQL的语法与HQL兼容;

    展开全文
  • Spark计算引擎之SparkSQL详解

    万次阅读 2019-08-06 16:44:48
    一、Spark SQL 二、Spark SQL 1.Spark SQL概述 1.1.Spark SQL的前世今生 Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。Shark建立在Hive的代码基础上,并通过将Hive的部分物理执行计划交换出来...

    一、Spark SQL

    二、 Spark SQL

    1. Spark SQL概述

    1.1. Spark SQL的前世今生

        Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。Shark建立在Hive的代码基础上,并通过将Hive的部分物理执行计划交换出来。这个方法使得Shark的用户可以加速Hive的查询,但是Shark继承了Hive的大且复杂的代码使得Shark很难优化和维护,同时Shark依赖于Spark的版本。随着我们遇到了性能优化的上限,以及集成SQL的一些复杂的分析功能,我们发现Hive的MapReduce设计的框架限制了Shark的发展。在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。

    1.2. 什么是Spark SQL

    Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

    相比于Spark RDD API,Spark SQL包含了对结构化数据和在其上运算的更多信息,Spark SQL使用这些信息进行了额外的优化,使对结构化数据的操作更加高效和方便。

    有多种方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但无论是哪种API或者是编程语言,它们都是基于同样的执行引擎,因此你可以在不同的API之间随意切换,它们各有各的特点,看你喜欢那种风格。

    1.3. 为什么要学习Spark SQL 

    我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群中去执行,大大简化了编写MapReduce程序的复杂性,由于MapReduce这种计算模型执行效率比较慢,所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群中去运行,执行效率非常快!

    1.易整合

     

    将sql查询与spark程序无缝混合,可以使用java、scala、python、R等语言的API操作。

     

    2.统一的数据访问

     

    以相同的方式连接到任何数据源。

    3.兼容Hive

     

    支持hiveSQL的语法。

    4.标准的数据连接

     

    可以使用行业标准的JDBC或ODBC连接。

    2. DataFrame

    2.1. 什么是DataFrame

    DataFrame的前身是SchemaRDD,从Spark 1.3.0开始SchemaRDD更名为DataFrame。与SchemaRDD的主要区别是:DataFrame不再直接继承自RDD,而是自己实现了RDD的绝大多数功能。你仍旧可以在DataFrame上调用rdd方法将其转换为一个RDD。

    在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。DataFrame可以从很多数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表。

    2.2. DataFrame与RDD的区别

    RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。DataFrame和普通的RDD的逻辑框架区别如下所示:

     

    上图直观地体现了DataFrame和RDD的区别。

    左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解 Person类的内部结构。

    而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。这样看起来就像一张表了,DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。

    此外DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存. 由于Spark理解schema, 所以知道该如何操作。

    RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

    有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。

    不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

     

    2.3. DataFrame与RDD的优缺点

    RDD的优缺点:

    优点:

    (1)编译时类型安全 

    编译时就能检查出类型错误

    (2)面向对象的编程风格 

    直接通过对象调用方法的形式来操作数据

    缺点:

    (1)序列化和反序列化的性能开销 

    无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。

    (2)GC的性能开销 

    频繁的创建和销毁对象, 势必会增加GC

    DataFrame通过引入schema和off-heap(不在堆里面的内存,指的是除了不在堆的内存,使用操作系统上的内存),解决了RDD的缺点, Spark通过schame就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了;通过off-heap引入,可以快速的操作数据,避免大量的GC。但是却丢了RDD的优点,DataFrame不是类型安全的, API也不是面向对象风格的。

     

    2.4. 读取数据源创建DataFrame

    2.4.1 读取文本文件创建DataFrame

      在spark2.0版本之前,Spark SQL中SQLContext是创建DataFrame和执行SQL的入口,利用hiveContext通过hive sql语句操作hive表数据,兼容hive操作,并且hiveContext继承自SQLContext。在spark2.0之后,这些都统一于SparkSession,SparkSession 封装了 SparkContext,SqlContext,通过SparkSession可以获取到SparkConetxt,SqlContext对象。

     

    (1)在本地创建一个文件,有三列,分别是id、name、age,用空格分隔,然后上传到hdfs上。person.txt内容为:

    1 zhangsan 20

    2 lisi 29

    3 wangwu 25

    4 zhaoliu 30

    5 tianqi 35

    6 kobe 40

    上传数据文件到HDFS上:

    hdfs dfs -put person.txt  /

     

    (2)在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割

    先执行 spark-shell --master local[2]

    val lineRDD= sc.textFile("/person.txt").map(_.split(" "))

     

     

    (3)定义case class(相当于表的schema)

    case class Person(id:Int, name:String, age:Int)

     

    (4)将RDD和case class关联

    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

     

    (5)将RDD转换成DataFrame

    val personDF = personRDD.toDF

     

     

    (6)对DataFrame进行处理

    personDF.show

     

    personDF.printSchema

     

    (7)、通过SparkSession构建DataFrame

    使用spark-shell中已经初始化好的SparkSession对象spark生成DataFrame

    val dataFrame=spark.read.text("/person.txt")

     

    2.4.2 读取json文件创建DataFrame

    (1)数据文件

    使用spark安装包下的

    /opt/bigdata/spark/examples/src/main/resources/people.json文件

    (2)在spark shell执行下面命令,读取数据

    val jsonDF= spark.read.json("file:///opt/bigdata/spark/examples/src/main/resources/people.json")

     

     

    (3)接下来就可以使用DataFrame的函数操作

     

     

    2.4.3 读取parquet列式存储格式文件创建DataFrame

    (3)数据文件

    使用spark安装包下的

    /opt/bigdata/spark/examples/src/main/resources/users.parquet文件

     

    (2)在spark shell执行下面命令,读取数据

    val parquetDF=spark.read.parquet("file:///opt/bigdata/spark/examples/src/main/resources/users.parquet")

    (3)接下来就可以使用DataFrame的函数操作

    3.DataFrame常用操作

    3.1.  DSL风格语法

    DataFrame提供了一个领域特定语言(DSL)以方便操作结构化数据。下面是一些使用示例

    (1)查看DataFrame中的内容,通过调用show方法

    personDF.show

    (2)查看DataFrame部分列中的内容

    查看name字段的数据

    personDF.select(personDF.col("name")).show

    查看name字段的另一种写法

    查看 name 和age字段数据personDF.select(col("name"), col("age")).show

    (3)打印DataFrame的Schema信息

    personDF.printSchema

    (4)查询所有的name和age,并将age+1

    personDF.select(col("id"), col("name"), col("age") + 1).show

    也可以这样:

    personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show

    (5)过滤age大于等于25的,使用filter方法过滤

    personDF.filter(col("age") >= 25).show

    (6)统计年龄大于30的人数

    personDF.filter(col("age")>30).count()

    (7)按年龄进行分组并统计相同年龄的人数

    personDF.groupBy("age").count().show

    3.2. SQL风格语法

     DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL查询,结果将作为一个DataFrame返回。

    如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

    personDF.registerTempTable("t_person")

    (1)查询年龄最大的前两名

    spark.sql("select * from t_person order by age desc limit 2").show

    (2)显示表的Schema信息

    spark.sql("desc t_person").show

    (3)查询年龄大于30的人的信息

    spark.sql("select * from t_person where age > 30 ").show

    4.DataSet

    4.1.  什么是DataSet

    DataSet是分布式的数据集合。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map/flatmap/filter)进行多种操作。

     

    4.2.  DataFrame、DataSet、RDD的区别

    假设RDD中的两行数据长这样:

    那么DataFrame中的数据长这样:

    那么Dataset中的数据长这样:

    或者长这样(每行数据是个Object):

    DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。

    (1)DataSet可以在编译时检查类型

    (2)并且是面向对象的编程接口

    相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,这会浪费大量的时间,这也是引入Dataset的一个重要原因。

    4.3.  DataFrame与DataSet的互转

    DataFrame和DataSet可以相互转化。

    (1)DataFrame转为 DataSet

    df.as[ElementType]这样可以把DataFrame转化为DataSet。

    (2)DataSet转为DataFrame 

    ds.toDF()这样可以把DataSet转化为DataFrame。

     

    4.4.  创建DataSet

    (1)通过spark.createDataset创建

    (2)通toDS方法生成DataSet

    (3)通过DataFrame转化生成

    使用as[]转换为DataSet

    更多DataSet操作API地址:

    http://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.sql.Dataset

    三、 以编程方式执行Spark SQL查询

    1. 编写Spark SQL程序实现RDD转换DataFrame

    前面我们学习了如何在Spark Shell中使用SQL完成查询,现在我们来实现在自定义的程序中编写Spark SQL查询程序。

    在Spark SQL中有两种方式可以在DataFrame和RDD进行转换,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。

    第二种方法通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。

    首先在maven项目的pom.xml中添加Spark SQL的依赖

     

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-sql_2.11</artifactId>

        <version>2.0.2</version>

    </dependency>

     

    1.1. 通过反射推断Schema

    Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被反射读取并成为表的列名。这种RDD可以高效的转换为DataFrame并注册为表。

    代码如下:

    package cn.itcast.sql

    import org.apache.spark.SparkContext

    import org.apache.spark.rdd.RDD

    import org.apache.spark.sql.{DataFrame, SparkSession}

     

    /**

      * RDD转化成DataFrame:利用反射机制

      */

    //todo:定义一个样例类Person

    case class Person(id:Int,name:String,age:Int) extends Serializable

     

    object InferringSchema {

     

      def main(args: Array[String]): Unit = {

          //todo:1、构建sparkSession 指定appName和master的地址

        val spark: SparkSession = SparkSession.builder()

                                  .appName("InferringSchema")

                                  .master("local[2]").getOrCreate()

          //todo:2、从sparkSession获取sparkContext对象

          val sc: SparkContext = spark.sparkContext

          sc.setLogLevel("WARN")//设置日志输出级别

          //todo:3、加载数据

          val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")

          //todo:4、切分每一行记录

          val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))

          //todo:5、将RDD与Person类关联

          val personRDD: RDD[Person] = lineArrayRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt))

          //todo:6、创建dataFrame,需要导入隐式转换

          import spark.implicits._

          val personDF: DataFrame = personRDD.toDF()

     

        //todo-------------------DSL语法操作 start--------------

        //1、显示DataFrame的数据,默认显示20行

        personDF.show()

        //2、显示DataFrame的schema信息

        personDF.printSchema()

        //3、显示DataFrame记录数

        println(personDF.count())

        //4、显示DataFrame的所有字段

        personDF.columns.foreach(println)

        //5、取出DataFrame的第一行记录

        println(personDF.head())

        //6、显示DataFrame中name字段的所有值

        personDF.select("name").show()

        //7、过滤出DataFrame中年龄大于30的记录

        personDF.filter($"age" > 30).show()

        //8、统计DataFrame中年龄大于30的人数

        println(personDF.filter($"age">30).count())

        //9、统计DataFrame中按照年龄进行分组,求每个组的人数

        personDF.groupBy("age").count().show()

        //todo-------------------DSL语法操作 end-------------

     

        //todo--------------------SQL操作风格 start-----------

        //todo:将DataFrame注册成表

        personDF.createOrReplaceTempView("t_person")

        //todo:传入sql语句,进行操作

     

        spark.sql("select * from t_person").show()

     

        spark.sql("select * from t_person where name='zhangsan'").show()

     

        spark.sql("select * from t_person order by age desc").show()

        //todo--------------------SQL操作风格 end-------------

     

     

        sc.stop()

      }

    }

     

    1.2. 通过StructType直接指定Schema

    当case class不能提前定义好时,可以通过以下三步通过代码创建DataFrame

    (1)将RDD转为包含row对象的RDD

    (2)基于structType类型创建schema,与第一步创建的RDD相匹配

    (3)通过sparkSession的createDataFrame方法对第一步的RDD应用

    schema创建DataFrame

    package cn.itcast.sql

     

    import org.apache.spark.SparkContext

    import org.apache.spark.rdd.RDD

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

     

    /**

      * RDD转换成DataFrame:通过指定schema构建DataFrame

      */

    object SparkSqlSchema {

      def main(args: Array[String]): Unit = {

          //todo:1、创建SparkSession,指定appName和master

          val spark: SparkSession = SparkSession.builder()

                                    .appName("SparkSqlSchema")

                                    .master("local[2]")

                                    .getOrCreate()

          //todo:2、获取sparkContext对象

        val sc: SparkContext = spark.sparkContext

          //todo:3、加载数据

        val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")

          //todo:4、切分每一行

        val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))

          //todo:5、加载数据到Row对象中

        val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))

          //todo:6、创建schema

        val schema:StructType= StructType(Seq(

                                          StructField("id", IntegerType, false),

                                          StructField("name", StringType, false),

                                          StructField("age", IntegerType, false)

                                        ))

     

         //todo:7、利用personRDD与schema创建DataFrame

        val personDF: DataFrame = spark.createDataFrame(personRDD,schema)

     

        //todo:8、DSL操作显示DataFrame的数据结果

        personDF.show()

     

        //todo:9、将DataFrame注册成表

        personDF.createOrReplaceTempView("t_person")

        

        //todo:10、sql语句操作

        spark.sql("select * from t_person").show()

     

        spark.sql("select count(*) from t_person").show()

     

     

        sc.stop()

      }

    }

     

    2. 编写Spark SQL程序操作HiveContext

    HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类,也就是说兼容SqlContext;

    2.1. 添加pom依赖

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-hive_2.11</artifactId>

        <version>2.0.2</version>

    </dependency>

    2.2. 代码实现

    package itcast.sql

     

    import org.apache.spark.sql.SparkSession

    /**

      * todo:支持hive的sql操作

      */

    object HiveSupport {

      def main(args: Array[String]): Unit = {

          val warehouseLocation = "D:\\workSpace_IDEA_NEW\\day2017-10-12\\spark-warehouse"

          //todo:1、创建sparkSession

         val spark: SparkSession = SparkSession.builder()

           .appName("HiveSupport")

           .master("local[2]")

           .config("spark.sql.warehouse.dir", warehouseLocation)

           .enableHiveSupport() //开启支持hive

           .getOrCreate()

        spark.sparkContext.setLogLevel("WARN")  //设置日志输出级别

        import spark.implicits._

        import spark.sql

     

        //todo:2、操作sql语句

        sql("CREATE TABLE IF NOT EXISTS person (id int, name string, age int) row format delimited fields terminated by ' '")

        sql("LOAD DATA LOCAL INPATH '/person.txt' INTO TABLE person")

        sql("select * from person ").show()

        spark.stop()

      }

    }

     

     

     

     

     

     

    四、 数据源

    1. JDBC

    Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

    1.1. SparkSql从MySQL中加载数据

    1.1.1 通过IDEA编写SparkSql代码

    package itcast.sql

    import java.util.Properties

    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**

      * todo:Sparksql从mysql中加载数据

      */

    object DataFromMysql {

      def main(args: Array[String]): Unit = {

          //todo:1、创建sparkSession对象

          val spark: SparkSession = SparkSession.builder()

            .appName("DataFromMysql")

            .master("local[2]")

            .getOrCreate()

        //todo:2、创建Properties对象,设置连接mysql的用户名和密码

        val properties: Properties =new Properties()

        properties.setProperty("user","root")

        properties.setProperty("password","123456")

        //todo:3、读取mysql中的数据

        val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.200.150:3306/spark","iplocaltion",properties)

        //todo:4、显示mysql中表的数据

        mysqlDF.show()

        spark.stop()

      }

    }

    执行查看效果:

    1.1.2 通过spark-shell运行

    (1)、启动spark-shell(必须指定mysql的连接驱动包)

     

    spark-shell \

    --master spark://hdp-node-01:7077 \

    --executor-memory 1g \

    --total-executor-cores  2 \

    --jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \

    --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar

     

    (2)、从mysql中加载数据

    val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.200.100:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocaltion", "user" -> "root", "password" -> "123456")).load()

     

    (3)、执行查询

    1.2. SparkSql将数据写入到MySQL中

    1.2.1 通过IDEA编写SparkSql代码

    (1)编写代码

    package itcast.sql

    import java.util.Properties

    import org.apache.spark.rdd.RDD

    import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

    /**

      * todo:sparksql写入数据到mysql中

      */

    object SparkSqlToMysql {

      def main(args: Array[String]): Unit = {

        //todo:1、创建sparkSession对象

          val spark: SparkSession = SparkSession.builder()

            .appName("SparkSqlToMysql")

            .getOrCreate()

        //todo:2、读取数据

          val data: RDD[String] = spark.sparkContext.textFile(args(0))

        //todo:3、切分每一行,

        val arrRDD: RDD[Array[String]] = data.map(_.split(" "))

        //todo:4、RDD关联Student

        val studentRDD: RDD[Student] = arrRDD.map(x=>Student(x(0).toInt,x(1),x(2).toInt))

        //todo:导入隐式转换

        import spark.implicits._

        //todo:5、将RDD转换成DataFrame

        val studentDF: DataFrame = studentRDD.toDF()

        //todo:6、将DataFrame注册成表

        studentDF.createOrReplaceTempView("student")

        //todo:7、操作student表 ,按照年龄进行降序排列

        val resultDF: DataFrame = spark.sql("select * from student order by age desc")

     

        //todo:8、把结果保存在mysql表中

          //todo:创建Properties对象,配置连接mysql的用户名和密码

          val prop =new Properties()

          prop.setProperty("user","root")

          prop.setProperty("password","123456")

     

      resultDF.write.jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)

     

        //todo:写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错

        //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)

        spark.stop()

      }

    }

    //todo:创建样例类Student

    case class Student(id:Int,name:String,age:Int)

     

     

    (2)用maven将程序打包

    通过IDEA工具打包即可

     

    (3)将Jar包提交到spark集群

    spark-submit \

    --class itcast.sql.SparkSqlToMysql \

    --master spark://hdp-node-01:7077 \

    --executor-memory 1g \

    --total-executor-cores 2 \

    --jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar  \

    --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \

    /root/original-spark-2.0.2.jar  /person.txt

     


    (4)查看mysql中表的数据

     

     

    展开全文
  • Spark 之 SQL 学习笔记

    千次阅读 2019-01-29 21:56:16
    目录 概述 为什么学习Spark SQL: Spark SQL的版本迭代 SparkSession sparkSession概念解释: 特点 创建SparkSession 在spark-shell中创建 在IDEA中创建SparkSession RDD,DataFrame 和 DataSet ...Da...

    目录

    概述

    为什么学习Spark SQL:

    Spark SQL的版本迭代

    SparkSession

    sparkSession概念解释:

    特点

    创建SparkSession

    在spark-shell中创建

    在IDEA中创建SparkSession

    RDD,DataFrame 和 DataSet

    RDD的局限性

    什么是DataFrame

    特点

    DataFrame解释

    DataFrame编程

    DataSet

    为什么产生DataSet

    解释

    为什么需要 DataFrame 和 DataSet

    Spark SQL 程序编写步骤

    创建DataFrame

    DataFrame常用操作

    DSL风格语法

    SQL风格语法

    DataFrame 支持的操作

    以编程方式执行Spark SQL

    编写 Spark SQL 查询程序

    提交Spark任务

    数据源

    通用的load和save功能

    Save Model

    JDBC

    从 MySQL 中加载数据(Spark Shell 方式)

    将数据写入 MySQL 中(Spark Submit 方式)

    JSON

    Parquet Files

    Spark On Yarn

    Spark 整合 Hive

    SparkSQL自定义聚合函数

    SparkSQL 定义普通函数 

    定义 SparkSQL 的自定义聚集函数 

    SparkSQL 常用窗口分析函数


    概述

    (版本:Spark 2.3.2)

    Spark SQL 是 Spark 用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通过 RDD 获取)的一个模块,它提供了一个编程抽象叫做 DataFrame 并且作为分布式 SQL 查询引擎的作用。

    外部的结构化数据源包括 JSON、Parquet(默认)、RMDBS、Hive 等。当前 Spark SQL 使用 Catalyst优化器来对 SQL 进行优化,从而得到更加高效的执行方案。并且可以将结果存储到外部系统。


    为什么学习Spark SQL:

    • 首先

    我们已经学习了 Hive,它是将 Hive SQL 转换成 MapReduce 然后提交到集群上执行,大大简化了编写 MapReduce 的程序的复杂性,由于 MapReduce 这种计算模型执行效率比较慢。所以 Spark SQL 就应运而生,它的工作机制是将 Spark SQL 的 SQL 查询转换成 Spark Core 的应用程序,然后提交到集群执行,执行效率非常快!

    • 其次,SparkSQL的特点
    1. 容易整合
    2. 统一的数据访问格式
    3. 兼容Hive
    4. 标准的数据连接

    Spark SQL的版本迭代

    1. SparkSQL 的前身是 Shark。由于 Shark 自身的不完善,2014 年 6 月 1 日 Reynold Xin 宣布:停止对 Shark 的开发。SparkSQL 抛弃原有 Shark 的代码,汲取了 Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive 兼容性等,重新开发 SparkSQL。
    2. Spark-1.1:2014 年 9 月 11 日,发布 Spark1.1.0。Spark 从 1.0 开始引入 SparkSQL(Shark不再支持升级与维护)。Spark1.1.0 变化较大是 SparkSQL 和 MLlib
    3. Spark-1.3:增加 DataFrame 新 API
    4. Spark-1.4:增加窗口分析函数
    5. Spark 1.5:钨丝计划。Hive 中有 UDF 与 UDAF,Spark 中对 UDF 支持较早UDAF:User Defined Aggregate Function 用户自定义聚合函数,直到 Spark 1.5.x 才引入的最新特性
    6. spark-1.6:执行的 sql 中可以增加"--"注释,Spark-1.5/1.6 的新特性,引入 DataSet 的概念
    7. Spark-2.x:SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口

    SparkSession

    sparkSession概念解释:

    SparkSession 是 Spark-2.0 引如的新概念。SparkSession 为用户提供了统一的切入点,来让用户学习 Spark 的各项功能。在 Spark 的早期版本中,SparkContext 是 Spark 的主要切入点,由于 RDD 是主要的 API,我们通过 sparkContext 来创建和操作 RDD。对于每个其他的 API,我们需要使用不同的 context。

    例如:
    对于 Spark Streaming,我们需要使用 StreamingContext
    对于 Spark SQL,使用 SQLContext
    对于 Hive,使用 HiveContext

    但是随着 DataSet 和 DataFrame 的 API 逐渐成为标准的 API,就需要为他们建立接入点。所以在 Spark2.0 中,引入SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession封装了 SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveCont也被保存下来。SparkSession 实质上是 SQLContext 和 HiveContext 的组合,所以在 SQLContext 和 HiveContext上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 SparkContext 完成的。

    特点

    1. 为用户提供一个统一的切入点使用 Spark 各项功能
    2. 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
    3. 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
    4. 与 Spark 交互之时不需要显示的创建 SparkConf、SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中
    5. SparkSession 提供对 Hive 特征的内部支持:用 HiveQL 写 SQL 语句,访问 Hive UDFs,从Hive 表中读取数据。

    创建SparkSession

    在spark-shell中创建

    [hadoop@hadoop02 ~]$ ~/apps/spark-2.3.1-bin-hadoop2.7/bin/spark-shell \
    > --master spark://hadoop02:7077 \
    > --executor-memory 512m \
    > --total-executor-cores 1
    

    SparkSession 会被自动初始化一个对象叫做 spark,为了向后兼容,Spark-Shell 还提供了一个sparkContext 的初始化对象,方便用户操作:

    在IDEA中创建SparkSession

    RDD,DataFrame 和 DataSet

    RDD的局限性

    RDD 仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义。它需要用户自己优化程序,对程序员要求较高,从不同数据源读取数据相对困难,读取到不同格式的数据都必须用户自己定义转换方式合并多个数据源中的数据也较困难。

    SparkCore的RDD编程

    1)首先要找到程序入口(SparkContext)
    2)通过程序入口构建一个 RDD(核心的抽象 RDD)
    3)对写 RDD 进行 Transformation 或者 Action 的操作
    4)对最后的结果进行处理(输出或者存入数据库等)

    什么是DataFrame

    由于 RDD 的局限性,Spark 产生了 DataFrame,其中 Schema 是就是元数据,是语义描述信息。在 Spark1.3 之前,DataFrame 被称为SchemaRDD。以行为单位构成的分布式数据集合,按照列赋予不同的名称。对 select、fileter、aggregation 和 sort 等操作符的抽象。

    DataFrame = RDD+Schema = SchemaRDD

    特点

    1. 内部数据无类型,统一为 Row
    2. DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame
    3. DataFrame 自带优化器 Catalyst,可以自动优化程序
    4. DataFrame 提供了一整套的 Data Source API

    与 RDD 类似,DataFrame 也是一个分布式数据容器。然而 DataFrame 更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即 Schema。同时,与 Hive 类似,DataFrame也支持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的 RDD API 

    DataFrame解释

    A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala APIDataFrameis simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

    翻译:

    DataFrame 是按列名的方式去组织的一个分布式的数据集(RDD),就像关系型数据库里面的一张表,(或者说好比是 R/Python 语言里面的 DataFrame),不过 SparkSQL 这儿的方法比 R/Python 语言里面的 DataFrame 提供的操作方法更丰富,DataFrame 的数据源有如下:结构化的文件,Hive 里面的表,外部的数据库(MySQL 等),已经存在的 RDD。DataFrame 提供了 Scala,Java,Python,R 的编程 API,在 Scala 或者 Java 编程中,一个 DataFrame 表示以行组织的 Rows 的数据集合,在 Scala 的 API 中,DataFrame 就可以看做是 Dataset[Row]的另一种称呼,但是,在 Java 的 API 中,开发者必须使用 Dataset<Row>去表示一个 DataFrame。

    DataFrame编程

    Spark SQL 编程:

    1. 首先要找到程序入口(SQLContext),新版本 Spark-2.x 之后寻找 SparkSession
    2. 通过程序入口构建一个 DataFrame(核心的抽象 DataFrame)
    3. 对 DataFrame 做各种操作。最重要就是编写 SQL 语句
    4. 对得到的结果数据进行处理(打印输出或者存入数据库等)

    DataSet

    为什么产生DataSet

    由于 DataFrame 的数据类型统一是 Row,所以 DataFrame 也是有缺点的。Row 运行时类型检查,比如 salary 是字符串类型,下面语句也只有运行时才进行类型检查。所以,Spark SQL 引入了 Dataset,扩展了 DataFrame API,提供了编译时类型检查,面向对象风格的 API。但是Dataset 可以和 DataFrame、RDD 相互转换。DataFrame=Dataset[Row],可见 DataFrame 是一种特殊的 Dataset。

    dataframe.filter("salary>1000").show()

    解释

    A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

    翻译:

    一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口,它提供了RDD 的优点(强类型化,能够使用强大的 lambda 函数)与 Spark SQL 执行引擎的优点。一个 Dataset 可以从 JVM 对象来构造并且使用转换功能(map, flatMap, filter,等等)。Dataset API 在 Scala 和 Java 是可用的。Python 不支持 Dataset API。但是由于 Python 的动态特性,许多Dataset API 的优点已经可用了 (也就是说,你可能通过 name 天生的 row.columnName 属性访问一行中的字段)。这种情况和 R 相似。

    为什么需要 DataFrame 和 DataSet

    Spark SQL提供了两种方式读取操作数据:1. SQL 查询   2. DataFrame 和 Dataset API。但是,SQL 语句虽然简单,但是 SQL 的表达能力却是有限的,DataFrame 和 Dataset 可以采用更加通用的语言(Scala 或 Python)来表达用户的查询请求。此外,Dataset 可以更快捕捉错误,因为 SQL 是运行时捕获异常,而 Dataset 是编译时检查错误。

    Spark SQL 程序编写步骤

    1. 创建 SparkSession 对象
    2. 创建 DataFrame 或 Dataset
    3. 在 DataFrame 或 Dataset 之上进行转换和 Action
    4. 返回结果(保存结果到 HDFS 中,或直接打印出来)

    创建DataFrame

    数据文件 :

    1. 在本地创建一个文件,有五列,分别是 id、name、sex、age、department,用逗号分隔,然后上传到 HDFS 上:hdfs dfs -put student.txt /student
    2. 在 spark shell 执行下面命令,读取数据,将每一行的数据使用列分隔符分割:val lineRDD = sc.textFile("hdfs://myha01/student/student.txt").map(_.split(","))
    3. 定义 case class(相当于表的 schema):case class Student(id:Int, name:String, sex:String, age:Int, department:String)
    4. 将 RDD 和 case class 关联:val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2), x(3).toInt, x(4)))
    5. 将 RDD 转换成 DataFrame:Spark-2.3 : val studentDF = spark.createDataFrame(studentRDD) 或者 Spark-1.6 : val studentDF = studentRDD.toDF
    6. 对 DataFrame 进行处理:studentDF.show 或者 studentDF.printSchema

    DataFrame常用操作

    DSL风格语法

    //打印 DataFrame 的 Schema 信息
    studentDF.printSchema

    DSL风格语法示例:

    //查看 DataFrame 部分列中的内容
    studentDF.select("name", "age").show
    studentDF.select(col("name"), col("age")).show
    studentDF.select(studentDF.col("name"), studentDF.col("age")).show
    
    //查询所有的 name 和 age,并将 age+1
    studentDF.select(col("id"), col("name"), col("age") + 1).show
    studentDF.select(studentDF ("id"), studentDF ("name"), studentDF ("age") + 1).show
    
    
    // 按年龄进行分组并统计相同年龄的人数
    studentDF.groupBy("age").count().show()

    SQL风格语法

    注:如果想使用 SQL 风格的语法,需要将 DataFrame 注册成表

    老版本写法:

    • studentDF.registerTempTable("t_student")

    新版本写法:

    1. Session 范围内的临时表:studentDF.createOrReplaceTempView(“t_student”)只在Session范围内有效,Session 结束临时表自动销毁
    2. 全局范围内的临时表:studentDF.createGlobalTempView(“t_student”)所有 Session 共享

    SQL风格语法示例:

    // 查询年龄最大的前五名
    sqlContext.sql("select * from t_student order by age desc limit 5").show
    
    // 显示表的 Schema 信息
    sqlContext.sql("desc t_student ").show
    
    // 统计学生数超过 6 个的部门和该部门的学生人数。并且按照学生的个数降序排序
    sqlContext.sql("select department, count(*) as total from t_student group by department having total > 6 order by total desc").show
    
    

    DataFrame 支持的操作

    以编程方式执行Spark SQL

    编写 Spark SQL 查询程序

    package com.mazh.spark.sql
    import org.apache.spark.sql.{SQLContext, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    
    //case class 一定要事先放到外面定义好
    case class Student(id: Int, name: String, sex: String, age: Int, department: String)
    object StudentSparkSQL {
         def main(args: Array[String]) {
             //创建 SparkConf()并设置 App 名称
             val conf = new SparkConf().setAppName("FirstSparkSQLAPP--Student")
             //SQLContext 要依赖 SparkContext
             val sc = new SparkContext(conf)
             //创建 SQLContext
             val sqlContext = new SQLContext(sc)
             //从指定的地址创建 RDD
             val lineRDD = sc.textFile(args(0)).map(_.split(","))
             //创建 case class
             //将 RDD 和 case class 关联
             val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2), x(3).toInt,x(4)))
             //导入隐式转换,如果不导入无法将 RDD 转换成 DataFrame
             //将 RDD 转换成 DataFrame
             import sqlContext.implicits._
             val studentDF = studentRDD.toDF
             //注册表
             studentDF.registerTempTable("t_student")
             //传入 SQL
             val df = sqlContext.sql("select department, count(*) as total from t_student group by department having total > 6 order by total desc")
             //将结果以 JSON 的方式存储到指定位置
             df.write.json(args(1))
             //停止 Spark Context
             sc.stop()
        }
    }
    

    提交Spark任务

    $SPARK_HOME/bin/spark-submit \
    --class com.mazh.spark.sql.StudentSparkSQL \
    --master spark://hadoop02:7077,hadoop04:7077 \
    /home/hadoop/Spark_SQL-1.0-SNAPSHOT.jar \
    hdfs://myha01/student/student.txt \
    hdfs://myha01/student/output_sparksql

    数据源

    通用的load和save功能

    编写普通的 load 和 save 功能

    spark.read.load("hdfs://myha01/spark/sql/input/users.parquet").select("name","favorite_color").write.save("hdfs://myha01/spark/sql/output")
    

    指定 load 和 save 的特定文件格式

    spark.read.format("json").load("hdfs://myha01/spark/sql/input/people.json").select("name", "age").write.format("csv").save("hdfs://myha01/spark/sql/csv")

    Save Model

    JDBC

    Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对 DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。

    从 MySQL 中加载数据(Spark Shell 方式)

    启动 Spark Shell,必须指定 mysql 连接驱动 jar 包

    启动本机的单进程 Shell:
    $SPARK_HOME/bin/spark-shell \
    --jars $SPARK_HOME/mysql-connector-java-5.1.40-bin.jar \
    --driver-class-path $SPARK_HOME/mysql-connector-java-5.1.40-bin.jar 
    
    启动连接 Spark 集群的 Shell:
    $SPARK_HOME/bin/spark-shell \
    --master spark://hadoop02:7077,hadoop04:7077 \
    --jars $SPARK_HOME/mysql-connector-java-5.1.40-bin.jar \
    --driver-class-path $SPARK_HOME/mysql-connector-java-5.1.40-bin.jar 

    从 mysql 中加载数据

    val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://hadoop02:3306/spider", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "lagou", "user" -> "root", "password" -> "root")).load()

    将数据写入 MySQL 中(Spark Submit 方式)

    package com.mazh.spark.sql
    
    import java.util.Properties
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField,StructType}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkSQL_JDBC {
        def main(args: Array[String]) {
            val conf = new SparkConf().setAppName("SparkSQL_JDBC")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)
            //通过并行化创建 RDD
            // val studentRDD = sc.parallelize(Array("1 huangbo 33", "2 xuzheng 44", "3 wangbaoqiang 55")).map(_.split(" "))
            //通过读取文件创建 RDD
            val studentRDD = sc.textFile(args(0)).map(_.split(","))
            //通过 StructType 直接指定每个字段的 schema
            val schema = StructType(
                List(
                     StructField("id", IntegerType, true),
                     StructField("name", StringType, true),
                     StructField("sex", StringType, true),
                     StructField("age", IntegerType, true),
                     StructField("department", StringType, true)
                )
            )
            //将 RDD 映射到 rowRDD
            val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim,p(3).toInt,p(4).trim))
            //将 schema 信息应用到 rowRDD 上
            val studentDataFrame = sqlContext.createDataFrame(rowRDD, schema)
            //创建 Properties 存储数据库相关属性
            val prop = new Properties()
            prop.put("user", "root")
            prop.put("password", "root")
            //将数据追加到数据库
            studentDataFrame.write.mode("append").jdbc("jdbc:mysql://hadoop02:3306/spider","student", prop)
            //停止 SparkContext
            sc.stop()
        }
    }

    准备数据:student.txt 存储在 HDFS 上的/student 目录中
    给项目打成 jar 包,上传到客户端
    提交任务给 Spark 集群:

    $SPARK_HOME/bin/spark-submit \
    --class com.mazh.spark.sql.SparkSQL_JDBC \
    --master spark://hadoop02:7077,hadoop04:7077 \
    --jars $SPARK_HOME/mysql-connector-java-5.1.40-bin.jar \
    --driver-class-path $SPARK_HOME/mysql-connector-java-5.1.40-bin.jar \
    /home/hadoop/Spark_WordCount-1.0-SNAPSHOT.jar \
    hdfs://myha01/student/student.txt

    结果展示:

    JSON

    代码:

    object TestSparkSQL_ReadJSON {
        def main(args: Array[String]): Unit = {
            // 构建 SparkSQL 程序的编程入口对象 SparkSession
            val sparkSession:SparkSession = SparkSession.builder()
              .appName("MyFirstSparkSQL")
              .config("someKey", "someValue")
              .master("local")
              .getOrCreate()
    
             // 方式 1
             val df1 = sparkSession.read.json("D:\\bigdata\\json\\people.json")
    
             // 方式 2
             val df2 = sparkSession.read.format("json").load("D:\\bigdata\\json\\people.json")
        }
    }

    Parquet Files

    代码:

    object TestSparkSQL_ReadParquet {
        def main(args: Array[String]): Unit = {
            // 构建 SparkSQL 程序的编程入口对象 SparkSession
            val sparkSession:SparkSession = SparkSession.builder()
              .appName("MyFirstSparkSQL")
              .config("someKey", "someValue")
              .master("local")
              .getOrCreate()
    
              // 方式 1
              val df1 = sparkSession.read.parquet("D:\\bigdata\\parquet\\people.parquet")
    
              // 方式 2
              val df2 = sparkSession.read.format("parquet").load("D:\\bigdata\\json\\people.json")
        }
    }

    Spark On Yarn

    参照博客:https://blog.csdn.net/Jerry_991/article/details/85042305

    Spark 整合 Hive

    参照博客:https://blog.csdn.net/Jerry_991/article/details/84000097

    SparkSQL自定义聚合函数

    SparkSQL 定义普通函数 

    要点:spark.udf.register(“function_name”, function)

     /*
      *    第一步:获取程序入口
      */
        val sparkConf = new SparkConf()
        sparkConf.setAppName("SparkSQL_UAF_Length").setMaster("local")
        val sparkContext = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sparkContext)
    
     /*
      *    第二步:获取到一个DataFrame,然后注册为一张表
      *    
      *    JDBC:三个参数
      *    url:String
      *    table:String
      *    properties:Properties
      */ 
        val url = "jdbc:mysql://hadoop02:3306/bigdata"
        val table = "student"
        val properties = new Properties()
        properties.put("user","root")
        properties.put("password","root")
        val studentDF:DataFrame = sqlContext.read.jdbc(url,table,properties)
        
      /*
       *    第三步:把这个dataFrame注册为一张临时表
       */
        studentDF.createTempView("student")
    
      /*
       *    第四步:定义一个函数
       */  
        sqlContext.udf.register("strLength",(x:String) => x.length)
    
      /*
       *    第五步:使用这个函数做一个操作,求出某个字段的长度
       */
        sqlContext.sql("select strLength(name) as name_len from student").show()
        
      /*
       *    第六步:程序完结,关闭资源
       */
        sparkContext.stop()
        
         
        

    定义 SparkSQL 的自定义聚集函数 

    要点:Class MyUDAF extends UserDefinedAggregationFunction,spark.udf.register("function_name", function)

    object SparkSQL_UDAF_AvgAge extends UserDefinedAggregateFunction{
        /**
         * 定义输入的数据的类型
         */
         override def inputSchema: StructType = StructType(
             StructField("age", DoubleType, true) :: Nil
         )
    
        /**
         * 定义辅助字段:
         *
         * 1、辅助字段 1:用来记录所有年龄之和 total
         * 2、辅助字段 2:用来总记录所有学生的个数 count
         */
         override def bufferSchema: StructType = StructType(
             StructField("total", DoubleType, true)::
             StructField("count", IntegerType, true)::
             Nil
         )
    
         /**
         * 计算学生的平均年龄 计算公式: 学生年龄的总和 / 学生总数
         *
         * 所以要初始化要两个辅助字段:
         * total : 0.0
         * count : 0
         */
         override def initialize(buffer: MutableAggregationBuffer): Unit = {
             buffer.update(0, 0.0)
             buffer.update(1, 0)
         }
    
        /**
         * 每次给一条记录, 然后进行累加。进行累加变量 buffer 的状态更新
         * 这是一个局部操作。
         */
         override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
             val lastTotal = buffer.getDouble(0)
             val lastCount = buffer.getInt(1)
             val currentSalary = input.getDouble(0)
             buffer.update(0,lastTotal + currentSalary)
             buffer.update(1,lastCount+1)
         }
    
        /**
         * 当局部操作完成,最后需要一个全局合并的操作
         * 就相当于是 reducer 阶段的最终合并
         */
         override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
             val total1 = buffer1.getDouble(0)
             val count1 = buffer1.getInt(1)
             val total2 = buffer2.getDouble(0)
             val count2 = buffer2.getInt(1)
             buffer1.update(0, total1 + total2)
             buffer1.update(1, count1 + count2)
         }
    
        /**
         * 计算平均年龄
         */
         override def evaluate(buffer: Row): Any = {
             val total = buffer.getDouble(0)
             val count = buffer.getInt(1)
             total / count
         }
    
         /**
          * 返回结果数据类型
          */
         override def dataType: DataType = DoubleType
    
        /**
         * 输入和输出的字段类型是否匹配。也即是否一致
         */
         override def deterministic: Boolean = true
    }

    使用测试:

     

    SparkSQL 常用窗口分析函数

    (待整理)

    展开全文
  • Spark SQL 教程

    千次阅读 2018-12-25 17:56:21
    一、什么是Spark SQL Spark SQL是Spark用来处理结构化数据的一个模块,它提供了两个编程抽象分别叫做DataFrame和DataSet,它们用于作为分布式SQL查询引擎。从下图可以查看RDD、DataFrames与DataSet的关系。...
  • SparkSQL(一)

    千次阅读 2018-12-12 00:37:02
    spark1.0版本就已经退出SparkSQL最早叫shark Shark是基于spark框架并且兼容hive,执行SQL执行引擎,因为底层使用了Spark,比MR的Hive普遍要快上两倍左右,当数据全部load到内存中,此时会比Hive快上10倍以上,...
  • 第二十四记·Spark SQL配置及使用

    千次阅读 2018-11-27 14:32:13
    SparkSQL是spark的一个模块,主入口是SparkSession,将SQL查询与Spark程序无缝混合。DataFrames和SQL提供了访问各种数据源(通过JDBC或ODBC连接)的常用方法包括Hive,Avro,Parquet,ORC,JSON和JDBC。您甚至可以跨...
  • spark Sql

    千次阅读 2019-10-15 18:37:36
    概述1 spark历史2 Spark-SQL 概述2.1 特点2.2 作用2.3 Spark SQL架构图3 Dataset演进历史3.1 RDD3.1.1 优点3.1.2 缺点3.2 DataFrame3.2.1 优点3.2.2 缺点3.2.3 核心特征3.3 Dataset3.3.1 区别3.3.2 特点4 SparkSQL ...
  • Spark SQL入门

    千次阅读 2019-04-11 17:38:50
    1、SQL结合spark有两条线: Spark SQL和Hive on Spark(还在开发状态,不稳定,暂时不建议使用)。 #Hive on Spark是在Hive中的,使用Spark作为hive的执行引擎,只需要在hive中修改一个参数即可: ...
  • Spark SQL入门用法与原理分析

    万次阅读 2017-01-12 15:05:07
    sparkSQL是为了让开发人员摆脱自己编写RDD等原生Spark代码而产生的,开发人员只需要写一句SQL语句或者调用API,就能生成(翻译成)对应的SparkJob代码并去执行,开发变得更简洁, 1. API 2.原理 3.Catalyst解析器 4...
  • SparkSql

    2018-04-15 09:06:48
    SparkSql 1.sparkSQL概述 Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。它是sparkSQL的底层抽象 有多种方式去使用Spark SQL,包括SQL...
  • SparkSQL

    2018-10-17 23:15:46
    SparkSQL的shuffle过程 SparkSQL结构化数据 SparkSQL解析 SparkSQL的shuffle过程 Spark SQL的核心是把已有的RDD,带上Schema信息,然后注册成类似sql里的”Table”,对其进行sql查询。这里面主要分两部分,一...
  • 总结:Hive,Hive on Spark和SparkSQL区别

    万次阅读 多人点赞 2020-09-11 00:43:36
    Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优化上的一些总结 ...
  • 1、SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是...
  • SparkSQL相关语法总结

    千次阅读 2017-03-23 11:35:03
    1.in 不支持子查询 eg. select * from src where key in(select key from test); 支持查询个数 eg. select * from src where key in(1,2,3,4,5); in 40000个 耗时25.766秒 in 80000个 耗时78.827秒 ...
  • 开源OLAP引擎测评报告(SparkSql、Presto、Impala、HAWQ、ClickHouse、GreenPlum) 易观CTO 郭炜 序 现在大数据组件非常多,众说不一,在每个企业不同的使用场景里究竟应该使用哪个引擎呢?这是易观Spark实战营出品的...
  • SparkSQL代码案例

    2019-04-10 15:50:10
    package com.netcloud.bigdata.sparksql import java.util.Properties import org.apache.spark.sql.{SaveMode, SparkSession} /** * DataFrame的创建 * 从已经存在的RDD生成,从hive表、或者其他数据源(本地...
  • 适合小白入门的IDEA开发SparkSQL详细教程

    千次阅读 多人点赞 2020-04-12 10:51:40
        ...于是在正式开始学习了之后,决定整理一篇适合像我一样的小白级别都能看得懂的IDEA操作SparkSQL教程,于是就有了下文…        ...
  • 1、SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是...
  • sparksql性能调优

    千次阅读 2015-09-16 00:16:16
    性能优化参数 代码实例import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext;...
1 2 3 4 5 ... 20
收藏数 19,204
精华内容 7,681
关键字:

spark sql