精华内容
下载资源
问答
  • sparksql

    2021-08-13 16:28:50
    sparksql介绍 sparksql是spark用来处理结构化数据的一个模板,他提供了要给编程抽象叫做dataframe并且作为分布式sql查询引擎的作用 sparksqlsparksql转化为rdd,然后提交到集群执行,执行效率快 hive的应用其实...

    sparksql介绍

    sparksql是spark用来处理结构化数据的一个模板,他提供了要给编程抽象叫做dataframe并且作为分布式sql查询引擎的作用

    sparksql将sparksql转化为rdd,然后提交到集群执行,执行效率快

    hive的应用其实是对应不会写java的开发人员,但是会写sql的数据库提供的是mr的一种简化

    sparksql其实是对之前学习的sparkcore中rdd的一种简化,用sql的语言可以对rdd编程进行开发

    spark是有处理上限的,10PB,超过这个范围还是要使用hive来处理的,hive的处理上限是100PB级别

    sparksql优点

    1.易整合

    2.统一的数据访问方式

    3.兼容hive

    4.标准的数据连接

    sparksql的操作方式

    sparksql的数据抽象

    对于sparkcore而言对数据的操作需要首先转换成rdd,对rdd可以使用各种算子进行处理,最终对数据进行统一操作,所以我们将rdd看做是对数据的封装(抽象)

    对于sparksql而言,对数据进行操作的也需要进行转换,这里提供了两个新的抽象,分别是dataframe和dataset

    RDD vs DataFrames vs DataSet

    首先从版本上看

    RDD

    rdd是一个懒执行的不可变的可以支持functional(函数式编程)的并行数据集合.

    rdd的最大好处就是简单,API的人性化程度很高

    rdd的劣势是性能限制,它是一个jvm驻内存对象,这也就决定了存在gc的限制和数据增加时java序列化成本的升高

    DataFrame

    简单来说dataframe是rdd+schema的集合

    什么是schema?

    之前我们学过mysql数据库,在数据库中schema是数据库的组织和结构模式中包含了schema对象,可以是表(table)列(column)数据类型(data type),视图(view),存储过程(stored procedures),关系(relationships),主键(primary key)外键 (foreign key)等schema代表的就是一张表

    与rdd类似,dataframe也是一个分布式的数据容器,然而dataframe更像传统数据库的二维表格,处理数据以外,还记录数据的结构信息,即schema,同时,与hive类似dataframe也支持嵌套数据类型(struct,array和map),从api易用性的角度上看,dataframeapi提供的是一套高层的关系操作,比函数式的rddapi更加友好,门槛更低,由于与r和pandas的dataframe类似,sparkdataframe很好的继承了传统单机数据分析的开发体验

    dataframe是为数据提供了schema的视图,可以把他当做数据库中的一张表来对待,

    dataframe也是懒执行的

    性能比rdd要高,主要有两方面

    定制化内存管理

    优化执行计划

    dataframe的劣势在于编译器缺少的类型安全检查,导致运行时出错,

    dataframe只是知道字段但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候见擦汗是否类型失败的,比如你可以对一个string进行减法操作,在执行的时候才会报错,而dataset不仅仅知道字段而且世道字段类型,所以有更加严格的错误检查

    dataset

    是dataframeapi的一个扩展,是spark最新的数据抽象,

    用户友好的api风格,既具有类型安全检查,也具有dataframe的查询优化特性,dataset支持编程解码器,当需要访问非堆上的数据式可以避免反序列化整个对象,提高了效率,

    样例类被用来在dataset重定义数据的结构信息,样例类中每个属性的名称直接映射到dataset中定义的数据的结构信息,样例类中的每个属性的名称直接映射到dataset中的字段名称,

    dataframe是dataset的特例,dataframe=dataset[row],所以可以通过as方法,将dataframe转换为dataset,row是一个类型,更car person这些类型一样,所有的表结构信息,我们都用row来表示

    dataset是强类型,比如可以有dataset[car],dataset[person]

    dataset[row]这个类似于我们学习的方形row就是泛型类型

    三者的共性

    1.rdd ,dataframe,dataset全都是spark平台下的分布式弹性数据库,为处理超大型数据提供便利

    2.三者都有惰性机制,在进行创建,转换,如map方法时,不会立即执行,只有在遇到action如foreach是,三者才会开始遍历运算,极端情况下,如果代码里面有创建转换,,但是后面没有action中使用对应的结果,在执行时会被直接跳过

    3.三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心内存溢出

    4.三者都有partition的概念

    5.三者都有许多的共同函数,如filter,排序等

    6.在对dataframe和dataset进行操作许多操作都需要这个包进行支持

    7.dataframe和dataset均可使用模式匹配获取各个自读你的值和类型

    三者的区别

    rdd

    rdd一般不和spark mlib同时使用

    rdd不支持sparksql操作

    dataframe

    与rdd和dataset不同,dataframe每一行的类型固定为row,只有通过解析才能获取各个字段的值,每一列的值没法直接访问

    dataframe与dataset一般不与spark mlib同时使用

    dataframe与dataset均支持sparkslql的操作,比如select ,groupby之类,还能注册临时表/视窗,进行sql语句操作

    dataframe与dateset支持一些特别翻遍的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

    利用这样的保存方式,可以方便的获取字段名和列的对应,而且分隔符(delimiter)可以自由指定

    dataset

    dataset和dataframe拥有完全相同的成员函数,区别只是每一样的数据类型不同,

    dataframe也可以叫dataset[row]每一行的类型是row,不解析,每一行究竟有哪些字段,各个字段优势什么类型都无从得知,只能用上面提到的getAS方法或者童星汇总的七条提出到的模式匹配拿出特定字段

    而dataset中每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

    dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用dataframe即dataset[row]就能比较好的解决问题

    sparksql应用操作

    在老版本中,saprksql提供了两种sql查询的起始点,一个叫sqlcontext,用于spark自己提供的sql查询,一个叫hivecontext,用于连接hive 的查询,sparksession提供 的是spark新的sql查询起始点,实际上是sqlContext和hivecontext的组合,所以在sqlcontext和hivecontext上可以用的api在sparksession上同样是可以使用的,sparksession内部封装了sparkcontext,所以计算实际上是由sparkcontext完成的

    spark-shell基本操作

    spark.read.json("/opt/software/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json")
    df.show()
    df.filter($"age">21).show
    df.createOrReplaceTempView("person")
    spark.sql("select * from persons").show()
    spark.sql("select * from persons where age > 21).show()

    idea编写sparksql

    maven pom

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    sparksession的三种创建方式

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    /**
      * SparkSession三种创建方式
      */
    object SparkSQLDemo {
      def main(args: Array[String]): Unit = {
        /**
          * 创建SparkSession方式1
          * builder用于创建一个SparkSession。
          * appName设置App的名字
          * master设置运行模式(集群模式不用设置)
          * getOrCreate 进行创建
          */
        val sparks1=SparkSession.builder().appName("SparkSQLDemo").master("local").getOrCreate()
        /**
          * 创建SparkSession方式2
          * 先通过SparkConf创建配置对象
          * SetAppName设置应用的名字
          * SetMaster设置运行模式(集群模式不用设置)
          * 在通过SparkSession创建对象
          * 通过config传入conf配置对象来创建SparkSession
          * getOrCreate 进行创建
          */
        val conf=new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        val sparks2=SparkSession.builder().config(conf).getOrCreate()
        /**
          * 创建SparkSession方式3(操作hive)
          * uilder用于创建一个SparkSession。
          * appName设置App的名字
          * master设置运行模式(集群模式不用设置)
          * enableHiveSupport 开启hive操作
          * getOrCreate 进行创建
          */
          val sparkh = SparkSession.builder().appName("SparkSQLDemo").master("local).enableHiveSupport().getOrCreate()
        
        //关闭
        sparks1.stop()
        sparks2.stop()
        sparkh.stop()
      }
    }
    ​

    数据转换

    rdd转换为dataframe

    直接手动确定

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    ​
    import org.apache.spark.rdd.RDD
    /**
      * RDD--->DataFrame 直接手动确定
      */
    object SparkSQLDemo {
      def main(args: Array[String]): Unit = {
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        //SQLContext要依赖SparkContext
        val sc = new SparkContext(conf)
        //从指定的地址创建RDD
        val lineRDD = sc.textFile("dir/people.txt").map(_.split(","))
        //这里是将数据转换为元组,数据量少可以使用这种方式
        val tuple: RDD[(String, Int)] = lineRDD.map(x => (x(0),x(1).trim().toInt))
        val spark = SparkSession.builder().config(conf).getOrCreate()
        //如果需要RDD于DataFrame之间操作,那么需要引用 import spark.implicits._ [spark不是包名,是SparkSession对象]
        import spark.implicits._
        val frame: DataFrame = tuple.toDF("name","age")
        frame.show()
        sc.stop();
        spark.stop();
      }
    }

    通过反射获取schema

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    /**
      * RDD--->DataFrame 通过反射推断Schema
      */
    object SparkSQLDemo {
      def main(args: Array[String]): Unit = {
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        //SQLContext要依赖SparkContext
        val sc = new SparkContext(conf)
        //从指定的地址创建RDD
        val lineRDD = sc.textFile("dir/people.txt").map(_.split(","))
        //除了这个方式之外我们还可以是用样例类的形式来做复杂的转换操作
        val tuple = lineRDD.map(x => People(x(0),x(1).trim().toInt));
        val spark = SparkSession.builder().config(conf).getOrCreate()
        import spark.implicits._
        //此时不需要使用指定列名会根据样例类中定义的属性来确定类名
        val frame: DataFrame = tuple.toDF
        frame.show()
        sc.stop()
      }
    }
    case class People(name:String,age:Int)

    通过structType直接指定schema

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    /**
      * RDD--->DataFrame 通过StructType直接指定Schema
      */
    object SparkSQLDemo {
      def main(args: Array[String]): Unit = {
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        //创建SparkContext对象
        val sc = new SparkContext(conf)
        //创建SparkSession对象
        val spark = SparkSession.builder().config(conf).getOrCreate()
        //从指定的地址创建RDD
        val lineRDD = sc.textFile("dir/people.txt").map(_.split(","))
        //通过StructType直接指定每个字段的schema
        val schema = StructType(
          List(
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )
        //将RDD映射到rowRDD
        val rowRDD = lineRDD.map(p => Row(p(0), p(1).trim.toInt))
        //将schema信息应用到rowRDD上
        val peopleDataFrame = spark.createDataFrame(rowRDD, schema)
        val frame: DataFrame = peopleDataFrame.toDF
        frame.show()
        sc.stop()
      }
    }

    dataframe转换成rdd

    无论通过那种范式获取的dataframe都可以使用一个方法转换
    val frameToRDD:RDD[Row] = frame.rdd
    frameToRdd.foreache(x=>println(x.getString(0)+","+x.getInt(1)))

    rdd转换为dataset

    通过反射获取schema(样例类模式)

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    /**
      * RDD--->DataSet 通过反射获取Scheam
      */
    object SparkSQLDemo {
      def main(args: Array[String]): Unit = {
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        //创建SparkContext对象
        val sc = new SparkContext(conf)
        //创建SparkSession对象
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val value: RDD[String] = sc.textFile("dir/people.txt")
        import spark.implicits._
        val dataSet: Dataset[People] = value.map {
          x =>
            val para = x.split(",");
            People(para(0), para(1).trim().toInt);
        }.toDS()
        dataSet.show()
        sc.stop()
      }
    }
    case class People(name:String,age:Int)
    ​

    dataset转换为rdd

    //调用rdd方法
    val dataSetTORDD:RDD[People] = dataSet.rdd
    dataSetTORDD.foreach(x => println(x.name+","+x.age));

    dataset转换为dataframe

    //调用toDF方法,直接服用case class中定义的属性
    val frame: DataFrame = dataSet.toDF()
    frame.show()

    dataframe转换为dataset

     val value: Dataset[People] = frame.as[People]
     case class People(name:String,age:Int)

    总结:

    sparksql支持两种类型分别为dataset和dataframe,这两种类型都支持从rdd转换为dataset或dataframe

    rdd转dataframe有三种方法是

    1,直接转换使用元素的模式存储再转换

    2.使用样例类的模式匹配schema再转换(反射的方式)

    3.structType直接指定schema再转换

    rdd转dataset

    使用样例类的模式匹配schema再转换

    其余读取文件的方式可以直接获取对应的dataframe

    dataset和dataframe之间的相互转换

    dataset转换dataframe

    调用toDF方法直接用case class中定义的属性

    dataframe转换为dataset

    调用as[对应样例类类名]

    dataset对象或dataframe对象调用rdd方法就可以转换为rdd

    数据操作方法

    dsl语言风格

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    ​
    object SparkSQL {
      def main(args:Array[String]):Unit = {
      //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val df: DataFrame = spark.read.json("dir/people.json")
        //DSL风格语法:
        df.show()
        import spark.implicits._
        // 打印Schema信息
        df.printSchema()
        df.select("name").show()
        df.select($"name", $"age" + 1).show()
        df.filter($"age" > 21).show()
        df.groupBy("age").count().show()
        spark.stop()
      }
    }

    sql风格

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    ​
    object SparkSQL {
      def main(args:Array[String]):Unit = {
      //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val df: DataFrame = spark.read.json("dir/people.json")
        //SQL风格语法:
        //临时表是Session范围内的,Session退出后,表就失效了
        //一个SparkSession结束后,表自动删除
        df.createOrReplaceTempView("people")
        val sqlDF = spark.sql("SELECT * FROM people")
        sqlDF.show()
        //如果想应用范围内有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp.people
        //应用级别内可以访问,一个SparkContext结束后,表自动删除 一个SparkContext可以多次创建SparkSession
        //使用的比较少
        df.createGlobalTempView("people")
       //创建名后需要必须添加global_temp才可以
        spark.sql("SELECT * FROM global_temp.people").show()
        spark.newSession().sql("SELECT * FROM global_temp.people").show()
        spark.stop()
      }
    }

    需要打包上传集群,可以在集群中使用这个jar包

    在安装spark目录下进入到bin目录执行spark-submit

    /usr/local/spark/bin/spark-submit \

    --class 需要执行的类的全限定名(从包开始到类名结束) \

    --master 指定主节点 \

    上传jar所在的位置 \

    数据输入路径 \

    数据输出路径 -->没有可以不写

    sparksql自定义函数

    udf函数

    用户自定义函数

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.{SparkConf}
    // 需求:实现字符串拼接
    object SparkSQL {
      def main(args:Array[String]):Unit = {
      //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        val df: DataFrame = spark.read.json("dir/people.json")
        //注册函数,在整个应用中可以使用
        val addName = spark.udf.register("addName", (x: String) => "Name:" + x)
        df.createOrReplaceTempView("people")
        spark.sql("Select addName(name), age from people").show()
        spark.stop()
      }
    }

    udaf函数

    用户自定义聚合函数

    udaf函数支持dataframe(弱类型)

    通过继承userdefinedaggregateFunction来实现用户自定义聚合函数,

    弱类型指的是在编译阶段是无法确定数据类型的,而是在运行阶段才能创建类型

    udaf函数支持dataset(强类型)

    通过继承aggregator来实现强类型自定义聚合函数,

    在编译阶段就确定了数据类型

    开窗函数

    说明:
    rank()跳跃函数,有两个第二名时后面跟着的是第四名
    dense_rank()连续函数,有两个第二名后面仍然跟着第三名
    over()开窗函数:
        在使用聚合函数后,会将多行变成一行,而开窗函数是将一行变成多行
        而且在使用聚合函数后,如果要显示其他的列必须将列加入到groupby中
        而使用开窗函数后,可以不适用groupby,直接将所有信息显示出来,
        开窗函数适用于在每行的最后一列添加聚合函数的结果
    常用开窗函数:
        1.为每条数据显示聚合信息,(聚合函数() over())
        2.为每条数据提供分组的聚合函数结果(聚合函数() over (partition by 字段) as 别名)
            --按照字段分组,分组后进行计算
        3.与排名函数一起使用,(row number() over(over by 字段名)as 别名)
            1.row_number() over(partition by ... order by)
            2.rank() over(partition by .. order by ...)
            3.dense_rank() over(partition by ... order by ...)
            4.count() over(partition by ... order by ...)
            5.max()
            6.min()
            7.avg()
            8.first_value()
            9.last_value()
            10.sum()
            11.lag()
            12.lead()
    lag和lead获取的结果集中,按一定排序所排列的当前行的上下相邻若干个offset的某个行的某个列(不用结果集的自关联)
    lad,lead分别是向前,向后
    lag和lead有三个参数第一个参数是列名,第二个参数是偏移的offset,第三个 参数是超出记录窗口时的默认值
    ​

    集成hive

    apachehive是hadoop上的sql引擎,sparksql编译时可以包含hive支持,也可以不包含,包含hive支持的spark sql可以支持hive 表访问,udf(用户自定义函数)以及hive查询语言(hiveql/hql)等,需要强调的是如果在spark中包含hive的库,并不需要事先安装hive

    一般来说,最好还是在编译sparksql时引入 hive支持,这样就可以使用这些特性了

    使用内置hive

    ps:需要注意内置hive是非常容易出现问题的
    1.先启动集群/opt/software/spark-2.2.0-bin-hadoop2.7/sbin/start-all.sh
    2.进入到spark-shell模式/opt/software/spark-2.2.0-bin-hadoop2.7/bin/spark-shell --master spark://hadoop01:7077
    3.在spark-shell下操作hive
    spark.sql("show tables").show 查询所有hive的表
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING)") 创建表
    spark.sql("LOAD DATA LOCAL INPATH '/opt/software/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt' INTO TABLE src")  添加数据
    spark.sql("SELECT * FROM src").show 查询表中数据
    会出现一个问题FileNotFoundException 没有找到文件
    通过在主节点和从节点查询可以发现,主节点存在spark-warehouse目录 目录中是存在数据的
    但是在从节点中没有这个文件夹,所以此时将文件夹分发到从节点
    scp -r ./spark-warehouse/ root@hadoop02:$PWD 
    再次执行查询
    ps:这样表面看查询是没什么问题了,但是实际问题在于是讲master节点上的数据分发到从节点上的,那么不可能说每次操作有了数据都执行拷贝操作,所以此时就需要使用HDFS来进行存储数据了
    所以先将所有从节点上的spark-warehouse删除掉
    删除后将主节点上的spark-warehouse和metastor_db删除掉
    然后在启动集群的时候添加一个命令
     --conf spark.sql.warehouse.dir=hdfs://hadoop01:8020/spark_warehouse
     此方法只做一次启动即可 后续再启动集群的时候就无需添加这个命了 因为记录在metastore_db中了
     ps:spark-sql中可以直接使用SQL语句操作

    集成外部hive

    将hive-site.xml软连接到spark安装目录下的conf目录下[主节点有即可]
    ln -s /usr/local/hive/conf/hive-site.xml /usr/local/spark/conf/hive-site.xml
    ​
    打开sparkshell注意带上hive元数据库的jdbc客户端
    将mysql驱动jar包拷贝到spark的bin目录下
    ./spark-shell --master spark://leetom:7077 --jars mysql-connector-java-5.1.36.jar
    ​
    做完外部hive连接需要注意,因为hive-site.xml文件是在spark的conf目录下,若直接启动spark-shell无论是单机版还是几区都会出现报错error creating tarnsactional connection factory 原因在于启动时会加载hive-site.xml文件,所以必须加载jar路径,为了以后使用建议删除软链接,需要的时候再做外部hive连接,
    rm-rf 软链接

    总结:

    若要把sparksql连接到一个部署好的hive上,你必须把hive-site.xml复制到spark的配置文件目录中($SPARK_HOME/conf),即使没有部署好hive,sparksql也可以运行,需要注意的是,如果没有部署好hive,sparksql会在当前的工作目录中创建自己的hive元数据仓库,叫做metastore_db,此外,如果尝试使用hiveql中的create table(并非create external table)语句来创建表,这些表会被凡在你默认的文件系统中,/user/hive/warehouse目录中(如果你的classpath中有配好的hdfs-site.xml,默认的文件系统就是hdfs,否则就是本地文件系统

    通过代码操作

    需要有hadoop本地环境

    import org.apache.spark.sql.{Row, SparkSession}
    object HiveCode {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("HiveCode")
          .config("spark.sql.warehouse.dir", "D:\\spark-warehouse")
          .master("local[*]")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._
        import spark.sql
       // sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)")
        sql("LOAD DATA LOCAL INPATH  'dir/kv1.txt' INTO TABLE src_1")
        sql("SELECT * FROM src_1").show()
        sql("SELECT COUNT(*) FROM src_1").show()
        val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key")
        sqlDF.as("mixing").show()
        val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
        recordsDF.createOrReplaceTempView("records")
        sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show()
      }
    }
    case class Record(key: Int, value: String)
    如果本地出现ErrorWhileinstantiating
    org.apache.spark.sql.hive.HiveSessionStateBuilder和权限异常问题
    在cmd命令进入到d:/hadoop2.7.6/bin目录执行命令
    winutils.exe chmod 777 /tmp/hive

    连接服务器hive

    需要添加hive-site.xml hdfs-site.xml core-site.xml

    import java.io.File
    ​
    import org.apache.spark.sql.{Row, SparkSession}
    ​
    object HiveCode {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("HiveCode")
          .config("spark.sql.warehouse.dir", "hdfs://hadoop01:8020/spark_warehouse")
          .master("spark://hadoop01:7077")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._
        import spark.sql
        //sql("CREATE TABLE IF NOT EXISTS src_1 (key INT, value STRING)")
       // sql("LOAD DATA LOCAL INPATH 'dir/kv1.txt' INTO TABLE src_1")
        sql("SELECT * FROM src_1").show()
      sql("SELECT COUNT(*) FROM src_1").show()
       val sqlDF = sql("SELECT key, value FROM src_1 WHERE key < 10 ORDER BY key")
       sqlDF.as("mixing").show()
        val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
        recordsDF.createOrReplaceTempView("records")
        sql("SELECT * FROM records r JOIN src_1 s ON r.key = s.key").show()
      }
    }
    case class Record(key: Int, value: String)

    sparksql的输入和输出

    spark的输入

    写法一:

    SparkSession对象.read.json("路径")

    SparkSession对象.read.jdbc("路径")

    SparkSession对象.read.csv("路径")

    SparkSession对象.read. parquet("路径") Parquet格式经常在Hadoop生态圈中被使用,它也支持Spark SQL的全部数据型

    SparkSession对象.read.orc("路径")

    SparkSession对象.read.table("路径")

    SparkSession对象.read.text("路径")

    SparkSession对象.read. textFile("路径")

    写法2:

    SparkSession对象.read.format("json").load("路径")

    ps:若不执行format默认是parquet格式

    sparksql输出

    写法一:

    DataFrame或DataSet对象.write.json("路径")

    DataFrame或DataSet对象.write.jdbc("路径")

    DataFrame或DataSet对象.write.csv("路径")

    DataFrame或DataSet对象.write.parquet("路径")

    DataFrame或DataSet对象.write.orc("路径")

    DataFrame或DataSet对象.write.table("路径")

    DataFrame或DataSet对象.write.text("路径")

    写法二:

    DataFrame或DataSet对象.write.fomat("jdbc").中间可能其他的参数.save()

    ps:典型的是saveMode模式 即 mode方法

    scala/javaany languagemeaning
    SaveMode.ErrorIfExists(default)"error" (default)如果文件存在,则报错
    SaveMode.Append"append"追加
    SaveMode.OverWrite"overwrite"覆写
    SaveMode.Ignore"ignore"数据存在,则忽略

    若不执行format默认是parquet格式

    jdbc操作

    从mysql中将数据获取

    maven.pom

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
    import java.util.Properties
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    ​
    object SparkSQLAndMySQL {
      def main(args: Array[String]): Unit = {
         val spark = SparkSession.builder().appName("SparkSQLAndMySQL").master("local").getOrCreate()
         //读取方式一
        val connectionProperties = new Properties()
        connectionProperties.put("user", "root")
        connectionProperties.put("password", "123456")
        val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/db1", "t_student",connectionProperties)
        jdbcDF.show();
    ​
        //方式二
        val jdbcDF2 = spark.read.format("jdbc")
    .option("url","jdbc:mysql://localhost:3306/db1").option("dbtable","t_student").option("user","root").option("password","123456").load()
        jdbcDF2.show()
    ​
        spark.stop()
      }
    }

    将数据输出到mysql中

    import java.util.Properties
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    ​
    object SparkSQLAndMySQL {
      def main(args: Array[String]): Unit = {
         val spark = SparkSession.builder()
          .appName("SparkSQLAndMySQL").master("local")
          .getOrCreate()
        //读取数据
         val frame: DataFrame = spark.read.json("dir/employees.json")
        //输出方法一
        val connectionProperties = new Properties()
        connectionProperties.put("user", "root")
        connectionProperties.put("password", "123456")
        //表可以不存在,通过读取的数据可以直接生成表
        frame.write.jdbc("jdbc:mysql://localhost:3306/db1","employees",connectionProperties)
    ​
        //输出方式二
        frame.write.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/db1")
          .option("dbtable", "employees1")
          .option("user", "root")
          .option("password", "123456")
          .save()
       //输出方式三 执行创建表的列名和数据类型 数据类型不能大写
        frame.write
          .option("createTableColumnTypes", "name varchar(200),salary int")
          .jdbc("jdbc:mysql://localhost:3306/db1", "employees2", connectionProperties)
        spark.stop()
      }
    }

    展开全文
  • SparkSQL

    2021-05-04 09:59:25
    SparkSQLSparkSQL概述什么是SparkSQL特点DataFrameDataSetSparkSQL编程SparkSessionDataFrame创建从Spark数据源进行创建1.我们先创建一个json文件,并上传到linux2.spark.read3.展示结果从RDD转换、HiveTable返回SQL...

    SparkSQL

    什么是SparkSQL

    SparkSQL是Spark用来结构化数据的一个模块,它提供了两个编程抽象:DataFrame、DataSet,并且作为分布式SQL查询引擎的作用。
    将SparkSQL转换为RDD,然后提交至集群,效率非常高

    特点

    1.易整合
    2.统一的数据访问方式
    3.兼容Hive
    4.标准的数据连接

    DataFrame

    与RDD类似,DataFrame也是一个分布式的数据容器。而DataFrame更像传统数据库的二维表格,除记录数据以外,还记录数据的结构信息schema,同时,和Hive类似,DataFrame也支持嵌套数据类型(map、array、struct)
    在这里插入图片描述
    RDD类型以Person类型作为参数,但是Spark框架本身并不了解Person的内部结构,而右侧的DataFrame却提供了详细的结构信息,使得SparkSQL清楚的直到数据集中包含了哪些列(字段)

    DataSet

    1)是Dataframe API的一个扩展,是Spark最新的数据抽象。
    2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
    3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
    4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
    5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。
    6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
    7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。

    SparkSQL编程

    SparkSession

    旧的版本中,SparkSQL提供了两种SQL:SQLContext、HiveContext
    SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以SQLContext和HiveContext上可用的API在SparkSession上也是可用的。SparkSession内部封装了sparkContext,所以实际上是由SparkContext完成的
    spark中的SparkSession对象

    scala> spark
    res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4d22265c
    

    DataFrame

    创建

    可以读取的格式:

    scala> spark.read.
    csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
    
    

    创建DataFrame有三种方式:通过Spark数据源创建、从一个存在的RDD进行转换、还可以从HiveTable查询返回

    从Spark数据源进行创建

    1.我们先创建一个json文件,并上传到linux

    {“age”:18,“name”:“yyx”}
    {“age”:19,“name”:“dd”}
    {“age”:20,“name”:“nn”}

    2.spark.read

    本地路径读取:

    可以看到创建成功DataFrame
    scala> spark.read.json("file:///opt/module/spark/data/test_sparkSQL/people.json")
    res10: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    3.展示结果
    scala> val df = spark.read.json("file:///opt/module/spark/data/test_sparkSQL/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.show
    +---+----+
    |age|name|
    +---+----+
    | 18| yyx|
    | 19|  dd|
    | 20|  nn|
    +---+----+
    
    

    从RDD转换、HiveTable返回

    SQL风格语法

    1.创建一个DataFrame
    2.对DataFrame创建一个临时视图(视图名为person)

    之所以是试图而不是表格,是因为表格可以修改,而Spark RDD通常都是val,不支持修改

    scala> df.createOrReplaceTempView("person")
    

    3.通过SQL语句查询:

    scala> val dfSQL = spark.sql("select * from person")
    dfSQL: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> dfSQL.show
    +---+----+
    |age|name|
    +---+----+
    | 18| yyx|
    | 19|  dd|
    | 20|  nn|
    +---+----+
    
    scala> val dfSQL_morethan19 = spark.sql("select * from person where age > 19")
    dfSQL_morethan19: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> dfSQL
    dfSQL   dfSQL_morethan19
    
    scala> dfSQL_morethan19.show
    +---+----+
    |age|name|
    +---+----+
    | 20|  nn|
    +---+----+
    

    注意,临时表是Session范围内的,Session退出后,表就会失效,如果想应用范围内有效,就要使用全局表,使用全局表时要全路径访问,比如:global_temp.people
    创建一个全局表,并使用

    scala> df.createGlobalTempView("people")
    
    scala> spark.sql("select * from global_temp.people").show
    +---+----+
    |age|name|
    +---+----+
    | 18| yyx|
    | 19|  dd|
    | 20|  nn|
    +---+----+
    

    在新的Session查询:

    scala> spark.newSession().sql("select * from global_temp.people").show
    +---+----+
    |age|name|
    +---+----+
    | 18| yyx|
    | 19|  dd|
    | 20|  nn|
    +---+----+
    
    

    而之前的person表在新的Session查询:

    scala> spark.newSession().sql("select * from person").show
    org.apache.spark.sql.AnalysisException: Table or view not found: person; line 1 pos 14
    
    

    DSL风格语法

    创建一个DateFrame

    scala> val df = spark.read.json("file:///opt/module/spark/data/test_sparkSQL/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    

    查看DataFrame信息

    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    
    

    查看name列数据

    scala> df.select("name").show()
    +----+
    |name|
    +----+
    | yyx|
    |  dd|
    |  nn|
    +----+
    

    查看name列并且age列+1,错误用法:
    会被认为是一整个字段age1

    scala> df.select("name","age"+1).show()
    org.apache.spark.sql.AnalysisException: cannot resolve '`age1`' given input columns: [age, name];;
    'Project [name#80, 'age1]
    
    

    正确方法:
    用$必须都用

    scala> df.select($"name",$"age"+1).show
    +----+---------+
    |name|(age + 1)|
    +----+---------+
    | yyx|       19|
    |  dd|       20|
    |  nn|       21|
    +----+---------+
    

    查询age大于等于19的数据filter

    scala> df.filter($"age">=19).show
    +---+----+
    |age|name|
    +---+----+
    | 19|  dd|
    | 20|  nn|
    +---+----+
    

    按照age分组:

    scala> df.groupBy("age").count.show
    +---+-----+                                                                     
    |age|count|
    +---+-----+
    | 19|    1|
    | 18|    1|
    | 20|    1|
    +---+-----+
    
    

    RDD转换为DataFrame

    RDD与DF、DS之间的操作,要引入import spark.implicits._

    scala> import spark.implicits._
    import spark.implicits._
    此处spark是sparkSession对象名
    

    创建RDD

    scala> val rdd = sc.makeRDD(Array(1,2,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[68] at makeRDD at <console>:27
    

    转换

    scala> rdd.toDF.show
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    +-----+
    
    scala> rdd.toDF("id").show
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
    

    再来一次

    scala> val rdd = sc.makeRDD(Array((1,"zhangsan"),(2,"lisi"),(3,"wangwu")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[75] at makeRDD at <console>:27
    
    scala> rdd.toDF("id","name").show
    +---+--------+
    | id|    name|
    +---+--------+
    |  1|zhangsan|
    |  2|    lisi|
    |  3|  wangwu|
    +---+--------+
    
    

    通过事先创建类(给与字段)来转换

    scala> case class user(name:String,age:Int)
    defined class user
    

    转换:

    scala> dataRDD.map(x=>user(x._1,x._2)).toDF.show
    +-----+---+
    | name|age|
    +-----+---+
    |  yyx| 21|
    |  dlq| 22|
    |nulmi|  2|
    +-----+---+
    

    DataFrame转为RDD

    直接调用.rdd即可,注意类型

    scala> val df = dataRDD.map(x=>user(x._1,x._2)).toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> df.rdd
    res9: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[57] at rdd at <console>:31
    
    

    DataSet

    1.创建一个user类:(类名要大写,我忘了…)

    scala> case class user(name:String,age:Int)
    defined class user
    

    创建DataSet

    scala> val caseUserDS = Seq(user("yyx",21)).toDS
    caseUserDS: org.apache.spark.sql.Dataset[user] = [name: string, age: int]
    
    

    DS可以调用的方法:

    scala> caseUserDS.
    agg                       createTempView   foreach            mapPartitions       schema                 toJavaRDD           
    alias                     crossJoin        foreachPartition   na                  select                 toLocalIterator     
    apply                     cube             groupBy            orderBy             selectExpr             toString            
    as                        describe         groupByKey         persist             show                   transform           
    cache                     distinct         head               printSchema         sort                   union               
    checkpoint                drop             inputFiles         queryExecution      sortWithinPartitions   unionAll            
    coalesce                  dropDuplicates   intersect          randomSplit         sparkSession           unpersist           
    col                       dtypes           isLocal            randomSplitAsList   sqlContext             where               
    collect                   except           isStreaming        rdd                 stat                   withColumn          
    collectAsList             explain          javaRDD            reduce              storageLevel           withColumnRenamed   
    columns                   explode          join               registerTempTable   take                   withWatermark       
    count                     filter           joinWith           repartition         takeAsList             write               
    createGlobalTempView      first            limit              rollup              toDF                   writeStream         
    createOrReplaceTempView   flatMap          map                sample              toJSON                                
    

    RDD转换为DataSet

    scala> val dsuser = userRDD.map(line => {user(line._1,line._2)}).toDS
    dsuser: org.apache.spark.sql.Dataset[user] = [name: string, age: int]
    
    

    DataSet转换为RDD

    注意类型

    scala> dsuser.rdd
    res14: org.apache.spark.rdd.RDD[user] = MapPartitionsRDD[61] at rdd at <console>:31
    
    

    DataSet和DataFrame互相操作

    DataSet=>DataFrame

    要导入隐式转换

    import spark.implicits._
    
    scala> val dfuser = dsuser.toDF
    dfuser: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    

    DataFrame=>DataSet

    要添加类名

    scala> dfuser.as[user]
    res15: org.apache.spark.sql.Dataset[user] = [name: string, age: int]
    
    

    RDD、DataSet、DataFrame的共性

    1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
    2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。
    3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
    4、三者都有partition的概念
    5、三者有许多共同的函数,如filter,排序等
    6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持
    import spark.implicits._
    7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

    三者区别

    1. RDD:
      1)RDD一般和spark mlib同时使用
      2)RDD不支持sparksql操作
    2. DataFrame:
      1)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值,如:
    testDF.foreach{
      line =>
        val col1=line.getAs[String]("col1")
        val col2=line.getAs[String]("col2")
     }
    

    2)DataFrame与Dataset一般不与spark mlib同时使用
    3)DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如:

    dataDF.createOrReplaceTempView("tmp")
    spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
    

    4)DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
    3. Dataset:
    1)Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
    2)DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

    IDEA操作SparkSQL

    首先、导入依赖

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object Spark_SQLTest {
      def main(args: Array[String]): Unit = {
        //创建连接
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SQL")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        val frame: DataFrame = spark.read.json("F:\\people.json") //就用之前的
        frame.show()
        // 关闭资源
        spark.close()
      }
    }
    
    

    结果:

    +---+----+
    |age|name|
    +---+----+
    | 18| yyx|
    | 19|  dd|
    | 20|  nn|
    +---+----+
    

    用视图的方式查询:

    package Spark.sparksql
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object Spark_SQLTest {
      def main(args: Array[String]): Unit = {
        //创建连接
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SQL")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        val frame: DataFrame = spark.read.json("F:\\people.json") //就用之前的
        frame.createOrReplaceTempView("people") // 视图
        val re: DataFrame = spark.sql("select * from people") // 用表的方式查询(视图)
        re.show()
        //frame.show()
        // 关闭资源
        spark.close()
      }
    }
    
    

    转换:记得添加隐式转换

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    object Spark_Trans {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("trans").setMaster("local[*]")
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        // 要转换就要添加隐式转换(spark不是包名)
        import spark.implicits._
        //实现RDD=》DF=》DS=》DF=》RDD
        val dataRDD: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(Array((1, "zhangsan", 5), (2, "李四", 18), (3, "王五", 58)))
        val frame: DataFrame = dataRDD.toDF("id", "name", "age")
        val dataset: Dataset[User] = frame.as[User]
        dataset.show() // 展示一下
        println("==============================")
        val frame1: DataFrame = dataset.toDF()
        frame1.rdd.foreach(x=>{
          println("***************")
          println(x)
        })
    
        /*
         +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|zhangsan|  5|
        |  2|      李四| 18|
        |  3|      王五| 58|
        +---+--------+---+
        
        
        
        
        ***************
        [2,李四,18]
        ***************
        [3,王五,58]
        ***************
        [1,zhangsan,5]
         */
      }
    }
    case class User(id:Int,name:String,age:Int)//转换类
    

    自定义函数

    UDF

    spark.udf.register(函数名,逻辑)

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object Spark_SQLUDF {
      def main(args: Array[String]): Unit = {
        //自定义udf函数
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SQL")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        import spark.implicits._
        val frame: DataFrame = spark.read.json("F:\\people.json") //就用之前的
        frame.createOrReplaceTempView("users")
        //注册函数
        val addHead: UserDefinedFunction = spark.udf.register("addHead", { x: String => "name" + x })
        val re: DataFrame = spark.sql("select addHead(name) from users")
        re.show()
        spark.close()
      }
    }
    
    

    结果:

    +-----------------+
    |UDF:addHead(name)|
    +-----------------+
    |          nameyyx|
    |           namedd|
    |           namenn|
    +-----------------+
    

    UDAF聚合函数

    继承UserDefinedAggregateFunction,并实现方法

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, UserDefinedFunction}
    import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    object Spark_SQLDAF {
      def main(args: Array[String]): Unit = {
        //自定义udaf函数
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SQL")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        val frame: DataFrame = spark.read.json("F:\\people.json") //就用之前的
        frame.createOrReplaceTempView("users")
        //自定义UDAF
        //创建函数对象
        val myAvg = new MyAvg
        val myAvg1: UserDefinedAggregateFunction = spark.udf.register("MyAvg", myAvg)
        // 注册好之后
    
        val frame1: DataFrame = spark.sql("select myavg(age) from users")
        frame1.show()
    
        spark.close()
      }
    }
    class MyAvg extends UserDefinedAggregateFunction{
      override def inputSchema: StructType = {
        // 函数输入的数据结构
        new StructType().add("age",LongType)//输入age,类型为Long
      }
    
      override def bufferSchema: StructType = {
        //计算时数据结构
        new StructType().add("sum",LongType).add("count",LongType)//需要这两个数据来计算
      }
      // 函数返回的数据类型
      override def dataType: DataType = DoubleType
    
      //函数是否稳定:相同输入是否有相同输出
      override def deterministic: Boolean = true
    
      //计算之前缓冲区初始化
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
    
      //根据查询结果更新缓冲区数据
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        buffer(1) = buffer.getLong(1) + 1
      }
    
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        // 缓冲区联合
      buffer1(0) = buffer1.getLong(0)+buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1)+buffer2.getLong(1)
      }
      // 计算最终结果
      override def evaluate(buffer: Row): Any = {
        buffer.getLong(0).toDouble / buffer.getLong(1)
      }
    }
    

    结果:

    +----------+
    |myavg(age)|
    +----------+
    |      19.0|
    +----------+
    

    强聚合方法:
    结果一样

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, UserDefinedFunction}
    import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}
    import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
    
    object Spark_SQLDAF {
      def main(args: Array[String]): Unit = {
        //自定义udf函数
        val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SQL")
        //创建SparkSession对象
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        import spark.implicits._
        val frame: DataFrame = spark.read.json("F:\\people.json") //就用之前的
        frame.createOrReplaceTempView("users")
        //自定义UDAF
        //创建函数对象
        val myAvg = new MyAvg
        // 将聚合函数转换为查询的列
        val column: TypedColumn[UserBean, Double] = myAvg.toColumn.name("avg")
        // 用DFS风格
        val ds: Dataset[UserBean] = frame.as[UserBean]
        ds.select(column).show()
    
        spark.close()
      }
    }
    
    case class UserBean(name: String, age: Int)
    
    case class AvgBuffer(var sum: Int,var cont: Int) //这里会做修改,所以用var
    
    // 强聚合函数
    
    class MyAvg extends Aggregator[UserBean, AvgBuffer, Double] {
      override def zero: AvgBuffer = { //初始化
        AvgBuffer(0, 0)
      }
    
      override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
        b.sum = b.sum + a.age
        b.cont = b.cont + 1
        b
      }
    
      //合并
      override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
        b1.sum = b1.sum + b2.sum
        b1.cont = b1.cont + b2.cont
        b1
      }
    
      override def finish(reduction: AvgBuffer): Double = {
        reduction.sum / reduction.cont//计算
      }
    
      override def bufferEncoder: Encoder[AvgBuffer] =Encoders.product
    
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
    
    

    SparkSQL数据源

    通过加载/保存方法

    手动指定选项

    SparkSQL默认数据源为Parquet格式,不过,可以进行临时修改或通过配置spark.sql.sources.default永久修改:我们通过spark自带的例子来加载数据:

    scala> val df = spark.read.load("file:///opt/module/spark/examples/src/main/resources/users.parquet")
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
    
    scala> df.show
    21/05/05 09:48:06 WARN hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
    
    

    如果我们想读json文件,除了直接json外,还可以:
    效果和read.json一样

    scala> val df = spark.read.format("json").load("file:///opt/module/spark/data/test_sparkSQL/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.show
    +---+----+
    |age|name|
    +---+----+
    | 18| yyx|
    | 19|  dd|
    | 20|  nn|
    +---+----+
    

    并且,我们可以通过SparkSession提供的read.load的方法通过加载数据,使用write和save保存数据
    (会自动创建文件)

    scala> df.write.format("json").save("file:///opt/module/spark/outputTest")
    {"age":18,"name":"yyx"}
    {"age":19,"name":"dd"}
    {"age":20,"name":"nn"}
    
    

    如果我们想保存其它格式也可以,只不过输出的未必能看懂

    scala> df.write.format("parquet").save("file:///opt/module/spark/outputTest")
    

    在这里插入图片描述

    当然,再次保存会报错(默认是会error的),我们可以通过mode进行修改:
    追加就不会报错了

    scala> df.write.format("parquet").save("file:///opt/module/spark/outputTest")
    

    JDBC

    SparkSQL可以通过JDBC的方式从关系型数据库中读取数据,创建DataFrame,通过对DataFrame的计算,还可以再将数据返回到数据库
    要将相关驱动数据库的jar包导入
    在这里插入图片描述
    开始练习从MySQL读取数据:
    先看一下mysql数据库:
    在这里插入图片描述

    scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop01:3306/test").option("dbtable", "teacher").option("user", "root").option("password", "000000").load()
    jdbcDF: org.apache.spark.sql.DataFrame = [name: string, sum: string]
    
    scala> jdbcDF.show
    +-----------+---+
    |       name|sum|
    +-----------+---+
    |    chenfei|  2|
    |      ddddd|  1|
    |      lihua|  4|
    |rongzhuqing|  3|
    +-----------+---+
    
    

    还可以将信息封装一下:

    scala> val connectionProperties = new java.util.Properties()
    connectionProperties: java.util.Properties = {}
    
    scala> connectionProperties.put("user", "root")
    res36: Object = null
    
    scala> connectionProperties.put("password", "000000")
    res37: Object = null
    
    scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop01:3306/test", "teacher", connectionProperties)
    jdbcDF2: org.apache.spark.sql.DataFrame = [name: string, sum: string]
    
    scala> jdbcDF2.show
    +-----------+---+
    |       name|sum|
    +-----------+---+
    |    chenfei|  2|
    |      ddddd|  1|
    |      lihua|  4|
    |rongzhuqing|  3|
    +-----------+---+
    
    

    将数据写入MySQL

    scala> val jdbcWrDF = jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop01:3306/test").option("dbtable", "testTable").option("user", "root").option("password", "000000").save()
    jdbcWrDF: Unit = ()
    
    

    同样,也可以用封装:

    
    scala> jdbcDF2.write.jdbc("jdbc:mysql://hadoop01:3306/test", "tableTest", connectionProperties)
    
    

    在这里插入图片描述

    Hive

    内嵌Hive

    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    +--------+---------+-----------+
    
    scala> spark.sql("create table teacher(id int)").show
    
    scala> spark.sql("insert into teacher values(5)").show
    
    scala> spark.sql("select * from teacher").show
    +---+
    | id|
    +---+
    |  5|
    +---+
    
    

    外部Hive

    首先将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下。
    其次,要有mysql连接jar包
    在这里插入图片描述
    前提是hive开启
    重新进入shell

    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|  student|      false|
    | default|     test|      false|
    +--------+---------+-----------+
    

    成功

    [yyx@hadoop01 spark]$ bin/spark-sql
    spark-sql (default)> select * from student;
    id	name
    1001	zhangsan
    1002	lisi
    1003	wangwu
    
    

    不太好看

    展开全文
  • sparkSQL

    2019-07-07 11:13:06
    Spark SQL还可用于从现有Hive安装中读取数据。从其他编程语言中运行SQL时,结果将作为Dataset/DataFrame返回,使用命令行或JDBC / ODBC与SQL接口进行交互。 Dataset是一个分布式数据集合在Spark 1.6提供一个新的...

    Spark SQL

    Spark SQL是用于结构化数据处理的一个模块。同Spark RDD 不同地方在于Spark SQL的API可以给Spark计算引擎提供更多地信息,例如:数据结构、计算算子等。在内部Spark可以通过这些信息有针对对任务做优化和调整。这里有几种方式和Spark SQL进行交互,例如Dataset API和SQL等,这两种API可以混合使用。Spark SQL的一个用途是执行SQL查询。 Spark SQL还可用于从现有Hive安装中读取数据。从其他编程语言中运行SQL时,结果将作为Dataset/DataFrame返回,使用命令行或JDBC / ODBC与SQL接口进行交互。

    Dataset是一个分布式数据集合在Spark 1.6提供一个新的接口,Dataset提供RDD的优势(强类型,使用强大的lambda函数)以及具备了Spark SQL执行引擎的优点。Dataset可以通过JVM对象构建,然后可以使用转换函数等(例如:map、flatMap、filter等),目前Dataset API支持Scala和Java 目前Python对Dataset支持还不算完备。

    Data Frame是命名列的数据集,他在概念是等价于关系型数据库。DataFrames可以从很多地方构建,比如说结构化数据文件、hive中的表或者外部数据库,使用Dataset[row]的数据集,可以理解DataFrame就是一个Dataset[Row].

    SparkSession

    • 依赖
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    

    Spark中所有功能的入口点是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder():

    val spark = SparkSession.builder()
              .appName("hellosql")
              .master("local[10]")
               .getOrCreate()
    //一般都需要引入改隐试转换 主要是 将 RDD 转换为 DataFrame/Dataset
    import spark.implicits._
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    

    Dataset

    Dataset与RDD类似,但是,它们不使用Java序列化或Kryo,而是使用专用的Encoder来序列化对象以便通过网络进行处理或传输。虽然Encoder和标准序列化都负责将对象转换为字节,但Encoder是动态生成的代码,并使用一种格式,允许Spark执行许多操作,如过滤,排序和散列,而无需将字节反序列化为对象。

    • 集合Case-Class
    case class Person(id:Int,name:String,age:Int,sex:Boolean)
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
        .appName("hellosql")
        .master("local[10]")
        .getOrCreate()
        import spark.implicits._
    
        val dataset: Dataset[Person] = List(Person(1,"zhangsan",18,true),Person(2,"wangwu",28,true)).toDS()
        dataset.select($"id",$"name").show()
    
        //关闭Spark日志
        spark.sparkContext.setLogLevel("FATAL")
        spark.stop()
    }
    
    • 元组
     case class Person(id:Int,name:String,age:Int,sex:Boolean)
      def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
              .appName("hellosql")
              .master("local[10]")
              .getOrCreate()
          import spark.implicits._
    
        val dataset: Dataset[(Int,String,Int,Boolean)] = List((1,"zhangsan",18,true),(2,"wangwu",28,true)).toDS()
          dataset.select($"_1",$"_2").show()//元组没有具体类做支撑,因此引入列指定以tuple._下标的形式
    
        //关闭Spark日志
        spark.sparkContext.setLogLevel("FATAL")
        spark.stop()
      }
    
    • 加载json数据
    {"name":"张三","age":18}
    {"name":"lisi","age":28}
    {"name":"wangwu","age":38}
    
    
    case class Person(name: String, age: Long)
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
      .master("local[5]")
      .appName("spark session")
      .getOrCreate()
      spark.sparkContext.setLogLevel("FATAL")
      import spark.implicits._
    
      val dataset = spark.read.json("D:///Persion.json").as[Person]
      dataset.show()
    
      spark.stop()
    }
    

    Data Frame

    Data Frame是命名列的数据集,他在概念是等价于关系型数据库。DataFrames可以从很多地方构建,比如说结构化数据文件、hive中的表或者外部数据库,使用Dataset[row]的数据集,可以理解DataFrame就是一个Dataset[Row].

    • 加载json文件
    val spark = SparkSession.builder()
    .appName("hellosql")
    .master("local[10]")
    .getOrCreate()
    import spark.implicits._
    
    val frame = spark.read.json("file:///f:/person.json")
    frame.show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    
    • case-class
    case class Person(name:String,age:Long)
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
        .appName("hellosql")
        .master("local[10]")
        .getOrCreate()
        import spark.implicits._
    
        List(Person("zhangsan",18),Person("王五",20)).toDF("uname","uage").show()
        //关闭Spark日志
        spark.sparkContext.setLogLevel("FATAL")
        spark.stop()
    }
    
    • 元组
    case class Person(name:String,age:Long)
      def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
              .appName("hellosql")
              .master("local[10]")
              .getOrCreate()
          import spark.implicits._
    
          List(("zhangsan",18),("王五",20)).toDF("name","age").show()
        //关闭Spark日志
        spark.sparkContext.setLogLevel("FATAL")
        spark.stop()
      }
    
    • 通过 RDD 转换(灵活)
    def main(args: Array[String]): Unit = {
    	val spark = SparkSession.builder()
    	  .appName("hellosql")
    	  .master("local[10]")
    	  .getOrCreate()
    	
    	val lines = spark.sparkContext.parallelize(List("zhangsan,20", "lisi,30"))
    		  //先将字符串进行切分,得到Rdd[Row(zhangsan,20)]
    		  .map(line => Row(line.split(",")(0), line.split(",")(1).toInt))
    		  
    	//定义要转换成的frame的结构类型(结构属性(名称,类型,是否可以为空))
    	val structType = new StructType(Array(StructField("name",StringType,true),StructField("age",IntegerType,true)))
    	
    	//创建DataFrame(RDD[Row]数据,结构类型)
    	val frame = spark.createDataFrame(lines,structType)
    	
    	frame.show()
    	
    	//关闭Spark日志
    	spark.sparkContext.setLogLevel("FATAL")
    	spark.stop()
    

    DataFrame 算子操作

    如下格式数据

    Michael,29,2000,true
    Andy,30,5000,true
    Justin,19,1000,true
    Kaine,20,5000,true
    Lisa,19,1000,false
    

    select

    //数据格式 Michael,29,2000,true
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("hellosql")
          .master("local[10]")
          .getOrCreate()
        import spark.implicits._//必须引入隐式转换,否则相关附加功能优化不能实现
    
        var rdd=spark.sparkContext.textFile("file:///D:/WorkSpace02/people.txt")
          //先将字符串进行切分,得到Rdd[Row(Michael,29,2000,true)]
          .map(_.split(","))
          .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
        //定义要转换成frame的结构类型的结构属性(名称,类型,是否可以为空))
        var fields=new StructField("name",StringType,true)::
          new StructField("age",IntegerType,true)::
          new StructField("salary",DoubleType,true)::
          //::表示追加,给内容为Nil(Null)的集合向前追加,相当于Array(StructField("name",StringType,true),StructField("age",IntegerType,true)...)
          new StructField("sex",BooleanType,true)::Nil
        //创建DataFrame(RDD[Row]数据,结构类型)
        val frame = spark.createDataFrame(rdd, StructType(fields)).as("t_user")
        
        //执行select算子($"名称"引入结构属性名称)
        frame.select($"name",$"age",$"salary",$"sex",$"salary"*12 as "年薪")
             .show()
    
        //关闭Spark日志
        spark.sparkContext.setLogLevel("FATAL")
        spark.stop()
      }
    
    +-------+---+-----+------+-------+
    |   name|age|  sex|salary|   年薪|
    +-------+---+-----+------+-------+
    |Michael| 29| true|2000.0|24000.0|
    |   Andy| 30| true|5000.0|60000.0|
    | Justin| 19| true|1000.0|12000.0|
    |  Kaine| 20| true|5000.0|60000.0|
    |   Lisa| 19|false|1000.0|12000.0|
    +-------+---+-----+------+-------+
    

    filter

    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
    .map(_.split(","))
    .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::Nil
    spark.createDataFrame(rdd,StructType(fields))
        .select($"name",$"age",$"sex",$"salary",$"salary" * 12 as "年薪")
        .filter($"name" === "Michael" or $"年薪" <  60000)
    .show()
    
    +-------+---+-----+------+-------+
    |   name|age|  sex|salary|   年薪|
    +-------+---+-----+------+-------+
    |Michael| 29| true|2000.0|24000.0|
    | Justin| 19| true|1000.0|12000.0|
    |   Lisa| 19|false|1000.0|12000.0|
    +-------+---+-----+------+-------+
    

    where

    //Michael,29,2000,true
    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
    .map(_.split(","))
    .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::Nil
    spark.createDataFrame(rdd,StructType(fields))
    .select($"name",$"age",$"sex",$"salary",$"salary" * 12 as "year_salary") //不允许别名中有 中文 bug
    .where("(name = 'Michael') or ( year_salary <= 24000) ")
    .show()
    
    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
      .map(_.split(","))
      .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::Nil
    spark.createDataFrame(rdd,StructType(fields))
        .select($"name",$"age",$"sex",$"salary",$"salary" * 12 as "年薪")
        .where($"name" === "Michael" or $"年薪" <= 24000)
        .show()
    

    withColumn

    //Michael,29,2000,true
    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
    .map(_.split(","))
    .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::Nil
    spark.createDataFrame(rdd,StructType(fields))
    .select($"name",$"age",$"sex",$"salary",$"salary" * 12 as "年薪")
    .where($"name" === "Michael" or $"年薪" <= 24000)
    .withColumn("年终奖",$"年薪" * 0.8)
    .show()
    
    +-------+---+-----+------+-------+-------+
    |   name|age|  sex|salary|   年薪| 年终奖|
    +-------+---+-----+------+-------+-------+
    |Michael| 29| true|2000.0|24000.0|19200.0|
    | Justin| 19| true|1000.0|12000.0| 9600.0|
    |   Lisa| 19|false|1000.0|12000.0| 9600.0|
    +-------+---+-----+------+-------+-------+
    

    groupBy

    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
    .map(_.split(","))
    .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::Nil
    spark.createDataFrame(rdd,StructType(fields))
        .select($"age",$"sex")
        .groupBy($"sex")
        .avg("age")
    .show()
    
    +-----+--------+
    |  sex|avg(age)|
    +-----+--------+
    | true|    24.5|
    |false|    19.0|
    +-----+--------+
    

    agg

    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
    .map(_.split(","))
    .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::Nil
    import org.apache.spark.sql.functions._
    spark.createDataFrame(rdd,StructType(fields))
    .select($"age",$"sex",$"salary")
    .groupBy($"sex")
    //执行agg算子,算子内可以使用sum.avg.max.min.count组函数
    .agg(sum($"salary") as "toatalSalary",avg("age") as "avgAge",max($"salary"))
    .show()
    
    +-----+------------+------+-----------+
    |  sex|toatalSalary|avgAge|max(salary)|
    +-----+------------+------+-----------+
    | true|     13000.0|  24.5|     5000.0|
    |false|      1000.0|  19.0|     1000.0|
    +-----+------------+------+-----------+
    

    join

    准备一下数据dept.txt

    1,销售部门
    2,研发部门
    3,媒体运营
    4,后勤部门
    

    people.txt

    Michael,29,2000,true,1
    Andy,30,5000,true,1
    Justin,19,1000,true,2
    Kaine,20,5000,true,2
    Lisa,19,1000,false,3
    
    //Michael,29,2000,true,1
    var rdd=  spark.sparkContext.textFile("file:///D:/people.txt")
    .map(_.split(","))
    .map(arr=>Row(arr(0),arr(1).trim().toInt,arr(2).trim().toDouble,arr(3).trim().toBoolean,arr(4).trim().toInt))
    
    var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true):: new StructField("sex",BooleanType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    val user = spark.createDataFrame(rdd,StructType(fields)).as("user")
    
    var dept =  spark.sparkContext.textFile("file:///D:/dept.txt")
    .map(line =>(line.split(",")(0).toInt,line.split(",")(1)))
    .toDF("deptno","deptname").as("dept")
    
    user.select($"name",$"user.deptno")
    .join(dept,$"dept.deptno" === $"user.deptno")
    .show()
    
    +-------+------+------+--------+
    |   name|deptno|deptno|deptname|
    +-------+------+------+--------+
    |Michael|     1|     1|销售部门|
    |   Andy|     1|     1|销售部门|
    |   Lisa|     3|     3|媒体运营|
    | Justin|     2|     2|研发部门|
    |  Kaine|     2|     2|研发部门|
    +-------+------+------+--------+
    

    drop

    userDF.select($"deptno",$"salary" )
    .groupBy($"deptno")
    .agg(sum($"salary") as "总薪资",avg($"salary") as "平均值",max($"salary") as "最大值")
    .join(deptDF,$"dept.deptno" === $"user.deptno")
    .drop($"dept.deptno")
    .show()
    
    +------+-------+------------------+-------+--------+
    |deptno| 总薪资|            平均值| 最大值|deptname|
    +------+-------+------------------+-------+--------+
    |     1|43000.0|14333.333333333334|20000.0|销售部门|
    |     2|38000.0|           19000.0|20000.0|研发部门|
    +------+-------+------------------+-------+--------+
    

    orderBy

    userDF.select($"deptno",$"salary" )
    .groupBy($"deptno")
    .agg(sum($"salary") as "总薪资",avg($"salary") as "平均值",max($"salary") as "最大值")
    .join(deptDF,$"dept.deptno" === $"user.deptno")
    .drop($"dept.deptno")
    .orderBy($"总薪资" asc)
    .show()
    
    +------+-------+------------------+-------+--------+
    |deptno| 总薪资|            平均值| 最大值|deptname|
    +------+-------+------------------+-------+--------+
    |     2|38000.0|           19000.0|20000.0|研发部门|
    |     1|43000.0|14333.333333333334|20000.0|销售部门|
    +------+-------+------------------+-------+--------+
    

    map

    userDF.map(row => (row.getString(0),row.getInt(1))).show()
    
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 28|
    +--------+---+
    

    默认情况下SparkSQL会在执行SQL的时候将序列化里面的参数数值,一般情况下系统提供了常见类型的Encoder,如果出现了没有的Encoder,用户需要声明 隐式转换Encoder

    //声明 隐式转换Encoder
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    userDF.map(row => row.getValuesMap[Any](List("name","age","salary")))
    .foreach(map=>{
      var name=map.getOrElse("name","")
      var age=map.getOrElse("age",0)
      var salary=map.getOrElse("salary",0.0)
      println(name+" "+age+" "+salary)
    })
    

    flatMap

    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    userDF.flatMap(row => row.getValuesMap(List("name","age")))
        .map(item => item._1 +" -> "+item._2)
        .show()
    
    +---------------+
    |          value|
    +---------------+
    |name -> Michael|
    |      age -> 29|
    |   name -> Andy|
    |      age -> 30|
    | name -> Justin|
    |      age -> 19|
    |  name -> Kaine|
    |      age -> 20|
    |   name -> Lisa|
    |      age -> 19|
    +---------------+
    
    • limit (take(n))
        //创建spark
        val spark = SparkSession.builder().master("local").appName("testlimit").getOrCreate()
        //导入隐式注入
        import spark.implicits._
        //读入dept文件的数据并转换成dataframe
        val dept = spark.sparkContext.textFile("D://suns/dept.txt")
          .map(line => (line.split(",")(0).toInt, line.split(",")(1)))
          .toDF("deptno", "deptname")
          .as("dept")
        //将people文件中的数据读入rdd
        val personRDD = spark.sparkContext.textFile("D://suns/people.txt")
          .map(_.split(","))
          .map(arr => Row(arr(0), arr(1).toInt, arr(2).toDouble, arr(3).toBoolean, arr(4).toInt))
        // 通过StructType直接指定每个字段的schema
        val fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType)::new StructField("sex",BooleanType)::new StructField("deptno",IntegerType)::Nil
       //定义person 相关dataframe
        val person = spark.createDataFrame(personRDD,StructType(fields)).as("person")
        //查询
    person.select($"name",$"age",$"salary",$"deptno" as "p_deptno")
      //表连接
        .join(dept,$"dept.deptno"===$"p_deptno")
      //删除字段
        .drop("person.deptno")
      //排序
          .orderBy($"deptno")
      //分页   (这里取前三条)
          .limit(3)
        .show()
        spark.sparkContext.setLogLevel("FATAL")//关闭日志打印
        spark.stop()
    
    |  name|deptno|deptname|
    +------+------+--------+
    |  Lisa|     3|媒体运营|
    |Justin|     2|研发部门|
    | Kaine|     2|研发部门|
    +------+------+--------+
    

    SQL获取DataFrame

    Michael,29,20000,true,MANAGER,1
    Andy,30,15000,true,SALESMAN,1
    Justin,19,8000,true,CLERK,1
    Kaine,20,20000,true,MANAGER,2
    Lisa,19,18000,false,SALESMAN,2
    

    sql查询

        //创建spark
        val spark = SparkSession.builder().master("local[3]").appName("testsparksql").getOrCreate()
       //读入文件内容  创建rdd
        val personRDD = spark.sparkContext.textFile("D://suns/people.txt")
          .map(_.split(","))
          .map(arr => Row(arr(0), arr(1).toInt, arr(2).toDouble, arr(3).toBoolean, arr(4), arr(5).toInt))
        // 通过StructType直接指定每个字段的schema
        var fields=new StructField("name",StringType,true)::new StructField("age",IntegerType,true)::new StructField("salary",DoubleType,true)::new StructField("sex",BooleanType,true)::new StructField("job",StringType,true)::new StructField("deptno",IntegerType,true)::Nil
        //将personRDD转换为dataframe
        val personFrame = spark.createDataFrame(personRDD,StructType(fields)).as("person")
        //创建视图
        personFrame.createTempView("t_person")
        //书写sql
        spark.sql("select * from t_person").show()
    
        //关闭日志打印
        spark.sparkContext.setLogLevel("FATAL")
        //停止spark
        spark.stop()
    

    group by

    val rdd = spark.sparkContext.textFile("file:///D:/people.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0), tokens(1).toInt, tokens(2).toDouble, tokens(3).toBoolean, tokens(4), tokens(5).toInt)
    })
    var fields=new StructField("name",StringType,true)::
    new StructField("age",IntegerType,true)::
    new StructField("salary",DoubleType,true)::
    new StructField("sex",BooleanType,true)::
    new StructField("job",StringType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    val userDF = spark.createDataFrame(rdd,StructType(fields))
    
    //创建一个视图
    userDF.createTempView("t_user")
    
    spark.sql("select deptno,max(salary),avg(salary),sum(salary),count(1) from t_user group by deptno").show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    
    • having 过滤
    val rdd = spark.sparkContext.textFile("file:///D:/people.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0), tokens(1).toInt, tokens(2).toDouble, tokens(3).toBoolean, tokens(4), tokens(5).toInt)
    })
    var fields=new StructField("name",StringType,true)::
    new StructField("age",IntegerType,true)::
    new StructField("salary",DoubleType,true)::
    new StructField("sex",BooleanType,true)::
    new StructField("job",StringType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    val userDF = spark.createDataFrame(rdd,StructType(fields))
    
    //创建一个视图
    userDF.createTempView("t_user")
    
    spark.sql("select deptno,max(salary),avg(salary),sum(salary),count(1) total from t_user group by deptno having total > 2 ")
    .show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    
    • 表连接 join
    val userRdd = spark.sparkContext.textFile("file:///D:/people.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0), tokens(1).toInt, tokens(2).toDouble, tokens(3).toBoolean, tokens(4), tokens(5).toInt)
    })
    val deptRdd = spark.sparkContext.textFile("file:///D:/dept.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0).toInt, tokens(1))
    })
    var userFields=new StructField("name",StringType,true)::
    new StructField("age",IntegerType,true)::
    new StructField("salary",DoubleType,true)::
    new StructField("sex",BooleanType,true)::
    new StructField("job",StringType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    var deptFields=new StructField("deptno",IntegerType,true)::
    new StructField("name",StringType,true)::Nil
    
    spark.createDataFrame(userRdd,StructType(userFields)).createTempView("t_user")
    spark.createDataFrame(deptRdd,StructType(deptFields)).createTempView("t_dept")
    
    spark.sql("select u.*,d.name from t_user u left join t_dept d on u.deptno=d.deptno")
    .show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    
    • limit
    val userRdd = spark.sparkContext.textFile("file:///D:/people.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0), tokens(1).toInt, tokens(2).toDouble, tokens(3).toBoolean, tokens(4), tokens(5).toInt)
    })
    val deptRdd = spark.sparkContext.textFile("file:///D:/dept.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0).toInt, tokens(1))
    })
    var userFields=new StructField("name",StringType,true)::
    new StructField("age",IntegerType,true)::
    new StructField("salary",DoubleType,true)::
    new StructField("sex",BooleanType,true)::
    new StructField("job",StringType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    var deptFields=new StructField("deptno",IntegerType,true)::
    new StructField("name",StringType,true)::Nil
    
    spark.createDataFrame(userRdd,StructType(userFields)).createTempView("t_user")
    spark.createDataFrame(deptRdd,StructType(deptFields)).createTempView("t_dept")
    
    spark.sql("select u.*,d.name from t_user u left join t_dept d on u.deptno=d.deptno order by u.age asc limit 8")
    .show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    
    • 子查询
    val userRdd = spark.sparkContext.textFile("file:///D:/people.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0), tokens(1).toInt, tokens(2).toDouble, tokens(3).toBoolean, tokens(4), tokens(5).toInt)
    })
    val deptRdd = spark.sparkContext.textFile("file:///D:/dept.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0).toInt, tokens(1))
    })
    var userFields=new StructField("name",StringType,true)::
    new StructField("age",IntegerType,true)::
    new StructField("salary",DoubleType,true)::
    new StructField("sex",BooleanType,true)::
    new StructField("job",StringType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    var deptFields=new StructField("deptno",IntegerType,true)::
    new StructField("name",StringType,true)::Nil
    
    spark.createDataFrame(userRdd,StructType(userFields)).createTempView("t_user")
    spark.createDataFrame(deptRdd,StructType(deptFields)).createTempView("t_dept")
    
    spark.sql("select * from (select name,age,salary from t_user)")
    .show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    

    开窗函数

    在正常的统计分析中 ,通常使用聚合函数作为分析,聚合分析函数的特点是将n行记录合并成一行,在数据库的统计当中还有一种统计称为开窗统计,开窗函数可以实现将一行变成多行。可以将数据库查询的每一条记录比作是一幢高楼的一层, 开窗函数就是在每一层开一扇窗, 让每一层能看到整装楼的全貌或一部分。

    查询每个部门员工信息,并返回本部门的平均薪资

    Michael,29,20000,true,MANAGER,1
    Andy,30,15000,true,SALESMAN,1
    Justin,19,8000,true,CLERK,1
    Kaine,20,20000,true,MANAGER,2
    Lisa,19,18000,false,SALESMAN,2
    
    +-------+---+-------+-----+--------+------+
    |   name|age| salary|  sex|     job|deptno|
    +-------+---+-------+-----+--------+------+
    |Michael| 29|20000.0| true| MANAGER|     1|
    |   Jimi| 25|20000.0| true|SALESMAN|     1|
    |   Andy| 30|15000.0| true|SALESMAN|     1|
    | Justin| 19| 8000.0| true|   CLERK|     1|
    |  Kaine| 20|20000.0| true| MANAGER|     2|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|
    +-------+---+-------+-----+--------+------+
    
    val userRdd = spark.sparkContext.textFile("file:///D:/people.txt")
    .map(line => {
        val tokens = line.split(",")
        Row(tokens(0), tokens(1).toInt, tokens(2).toDouble, tokens(3).toBoolean, tokens(4), tokens(5).toInt)
    })
    
    var userFields=new StructField("name",StringType,true)::
    new StructField("age",IntegerType,true)::
    new StructField("salary",DoubleType,true)::
    new StructField("sex",BooleanType,true)::
    new StructField("job",StringType,true)::
    new StructField("deptno",IntegerType,true)::Nil
    
    
    spark.createDataFrame(userRdd,StructType(userFields)).createTempView("t_user")
    
    spark.sql("select *, avg(salary) over(partition by deptno) as avgSalary from t_user")
    .show()
    
    //关闭Spark日志
    spark.sparkContext.setLogLevel("FATAL")
    spark.stop()
    
    +-------+---+-------+-----+--------+------+------------------+
    |   name|age| salary|  sex|     job|deptno|         avgSalary|
    +-------+---+-------+-----+--------+------+------------------+
    |Michael| 29|20000.0| true| MANAGER|     1|14333.333333333334|
    |   Andy| 30|15000.0| true|SALESMAN|     1|14333.333333333334|
    | Justin| 19| 8000.0| true|   CLERK|     1|14333.333333333334|
    |  Kaine| 20|20000.0| true| MANAGER|     2|           19000.0|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|           19000.0|
    +-------+---+-------+-----+--------+------+------------------+
    

    ROW_NUMBER()

    统计员工在部门内部薪资排名

    spark.sql("select * , ROW_NUMBER() over(partition by deptno order by salary DESC) as rank from t_user")
          .show()
    
    +-------+---+-------+-----+--------+------+----+
    |   name|age| salary|  sex|     job|deptno|rank|
    +-------+---+-------+-----+--------+------+----+
    |Michael| 29|20000.0| true| MANAGER|     1|   1|
    |   Andy| 30|15000.0| true|SALESMAN|     1|   2|
    | Justin| 19| 8000.0| true|   CLERK|     1|   3|
    |  Kaine| 20|20000.0| true| MANAGER|     2|   1|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|   2|
    +-------+---+-------+-----+--------+------+----+
    

    统计员工在公司所有员工的薪资排名

    spark.sql("select * , ROW_NUMBER() over(order by salary DESC) as rank from t_user")
          .show()
    
    +-------+---+-------+-----+--------+------+----+
    |   name|age| salary|  sex|     job|deptno|rank|
    +-------+---+-------+-----+--------+------+----+
    |Michael| 29|20000.0| true| MANAGER|     1|   1|
    |  Kaine| 20|20000.0| true| MANAGER|     2|   2|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|   3|
    |   Andy| 30|15000.0| true|SALESMAN|     1|   4|
    | Justin| 19| 8000.0| true|   CLERK|     1|   5|
    +-------+---+-------+-----+--------+------+----+
    

    可以看出ROW_NUMBER()函数只能计算结果在当前开窗函数中的顺序。并不能计算排名。

    DENSE_RANK()

    计算员工在公司薪资排名

    val sql="select * , DENSE_RANK() over(order by salary DESC)  rank  from t_emp"
    spark.sql(sql).show()
    
    +-------+---+-------+-----+--------+------+----+
    |   name|age| salary|  sex|     job|deptno|rank|
    +-------+---+-------+-----+--------+------+----+
    |Michael| 29|20000.0| true| MANAGER|     1|   1|
    |   Jimi| 25|20000.0| true|SALESMAN|     1|   1|
    |  Kaine| 20|20000.0| true| MANAGER|     2|   1|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|   2|
    |   Andy| 30|15000.0| true|SALESMAN|     1|   3|
    | Justin| 19| 8000.0| true|   CLERK|     1|   4|
    +-------+---+-------+-----+--------+------+----+
    

    计算员工在公司部门薪资排名

    val sql="select * , DENSE_RANK() over(partition by deptno order by salary DESC)  rank  from t_emp"
    spark.sql(sql).show()
    
    +-------+---+-------+-----+--------+------+----+
    |   name|age| salary|  sex|     job|deptno|rank|
    +-------+---+-------+-----+--------+------+----+
    |Michael| 29|20000.0| true| MANAGER|     1|   1|
    |  Kaine| 20|20000.0| true| MANAGER|     2|   1|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|   2|
    |   Andy| 30|15000.0| true|SALESMAN|     1|   3|
    | Justin| 19| 8000.0| true|   CLERK|     1|   4|
    +-------+---+-------+-----+--------+------+----+
    

    RANK()

    该函数和DENSE_RANK()类似,不同的是RANK计算的排名顺序不连续。

    计算员工在公司部门薪资排名

    val sql="select * , RANK() over(partition by deptno order by salary DESC)  rank  from t_user"
    spark.sql(sql).show()
    
    +-------+---+-------+-----+--------+------+----+
    |   name|age| salary|  sex|     job|deptno|rank|
    +-------+---+-------+-----+--------+------+----+
    |Michael| 29|20000.0| true| MANAGER|     1|   1|
    |  Kaine| 20|20000.0| true| MANAGER|     2|   1|
    |   Lisa| 19|18000.0|false|SALESMAN|     2|   3|
    |   Andy| 30|15000.0| true|SALESMAN|     1|   4|
    | Justin| 19| 8000.0| true|   CLERK|     1|   5|
    +-------+---+-------+-----+--------+------+----+
    

    自定义函数

    单行函数

    spark.udf.register("yearSalary",(job:String,salary:Double)=> {
        job match {
            case "MANAGER" => salary * 14
            case "SALESMAN" => salary * 16
            case "CLERK" => salary * 13
            case _ => salary*12
        }
    })
    spark.sql("select name,salary, yearSalary(job,salary) as yearSalary from t_user")
    .show()
    

    聚合函数

    无类型聚合( Spark SQL)

    • order.txt
    1,苹果,4.5,2,001
    2,橘子,2.5,5,001
    3,机械键盘,800,1,002
    
    • MySumAggregateFunction
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructType}
    
    class MySumAggregateFunction extends UserDefinedAggregateFunction{
      //说明输出参数,name 参数无所谓
      override def inputSchema: StructType = {
        new StructType().add("price",DoubleType).add("count",IntegerType)
      }
      //最终输出结果的Schema
      override def bufferSchema: StructType = {
          new StructType().add("totalCost",DoubleType)
      }
     // 统计结果值类型
      override def dataType: DataType = DoubleType
      //一般不需要做额外实现,直接返回true
      override def deterministic: Boolean = true
      //设置统计初始值
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        //初始化第一个参数的值是0
        buffer.update(0,0.0)
      }
      //局部计算
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val price = input.getAs[Double](0)
        val count = input.getAs[Int](1)
        val historyCost = buffer.getDouble(0)
        buffer.update(0,historyCost+(price*count))
      }
      //计算在最终结果,要将结果更新到buffer1
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        var totalCost=buffer1.getDouble(0)+buffer2.getDouble(0)
        buffer1.update(0,totalCost)
      }
      //执行最终的返回结果
      override def evaluate(buffer: Row): Any = {
        buffer.getDouble(0)
      }
    }
    
    • 按照userid统计用户消费
     case class OrderLog(price: Double, count: Int,userid:String)
      def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
              .appName("hellosql")
              .master("local[10]")
              .getOrCreate()
        import spark.implicits._
        
    
        var orderDF=  spark.sparkContext.textFile("file:///D:/order.txt")
          .map(_.split(","))
          .map(arr=> OrderLog(arr(2).toDouble,arr(3).toInt,arr(4)))
          .toDF().createTempView("t_order")
    
        spark.udf.register("customsum",new MySumAggregateFunction())
        spark.sql("select userid , customsum(price,count) as totalCost from t_order group by userid").show()
    
        //关闭Spark日志
        spark.sparkContext.setLogLevel("FATAL")
        spark.stop()
      }
    
    • 结果
    +------+---------+
    |userid|totalCost|
    +------+---------+
    |   001|     21.5|
    |   002|    800.0|
    +------+---------+
    

    有类型聚合|强类型聚合 (DataFrame API )

    • AverageState
    case class AverageState(var sum: Double, var total: Int)
    
    • MyAggregator
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.expressions.Aggregator
    
    class MyAggregator extends Aggregator[GenericRowWithSchema,AverageState,Double] {
      //初始值
      override def zero: AverageState = AverageState(0.0,0)
    
      //局部合并
      override def reduce(b: AverageState, a: GenericRowWithSchema): AverageState ={
          var sum=b.sum + a.getAs[Int]("count") * a.getAs[Double]("price")
          var count=b.total+1
          b.copy(sum,count)
      }
      //最终合并
      override def merge(b1: AverageState, b2: AverageState): AverageState = {
        b1.copy(b1.sum+b2.sum,b1.total+b2.total)
      }
      //最终输出结果
      override def finish(reduction: AverageState): Double = {
        reduction.sum/reduction.total
      }
      //中间计算结果
      override def bufferEncoder: Encoder[AverageState] = {
        Encoders.product[AverageState]
      }
    //最终输出结果
      override def outputEncoder: Encoder[Double] = {
        Encoders.scalaDouble
      }
    }
    
    • 使用
     var orderDF=  spark.sparkContext.textFile("file:///D:/order.txt")
          .map(_.split(","))
          .map(arr=> OrderLog(arr(2).toDouble,arr(3).toInt,arr(4)))
          .toDF()
        val avg = new MyAggregator().toColumn.name("avgCost")
    
        orderDF.groupBy($"userid").agg(avg).show()
    
    展开全文
  • SparkSql技术

    2018-08-16 13:04:03
    一:为什么sparkSQL? 3 1.1:sparkSQL的发展历程 3 1.1.1:hive and shark 3 1.1.2:Shark和sparkSQL 4 1.2:sparkSQL的性能 5 1.2.1:内存列存储(In-Memory Columnar Storage) 6 1.2.2:字节码生成技术...
  • 28 sparkSQL

    2020-08-03 17:55:14
    文章目录28 sparkSQL1.sparkSQL的概述2.RDD以及df以及ds3.dataframe的创建以及操作4.dataset介绍5.编程方式实现sparkSQL查询6.数据源7.sparkSQL当中的分析函数8.sparkSQL当中的自定义函数 28 sparkSQL 1.sparkSQL的...

    28 sparkSQL

    1.sparkSQL的概述

    什么是Spark SQL
    在这里插入图片描述

    结构化:mysql当中的表,字段个数一定,字段的类型也一定了
    半结构化:类似于xml或者json
    非结构化:类似于音频或者视频
    
    sparkCore   ==>  RDD数据抽象   ==>  sparkContext 操作数据对象
    
    sparkSQL    ==> dataFrame/dataSet数据抽象   ==> SparkSession操作数据对象
    

    在这里插入图片描述
    sparkSQL特点:

    1.易整合
    
    2.统一的数据访问
    
    3.兼容hive:sparkSQL可以无缝的直接使用hql语句来做数据分析。可以使用spark  on  hive,将hive底层的执行引擎直接给换成spark的执行引擎
    
    4.标准的数据连接
    

    2.RDD以及df以及ds

    在这里插入图片描述
    dataset是最晚出来的一个数据抽象,dataset的出现主要是为了提供同一个数据抽象访问

    dataset可能在未来的版本里面统一rdd以及dataframe,替换掉rdd以及df,只剩下ds

    rdd以及df以及dataset都是弹性分布式数据集

    df提供schema的定义。scheam定义了df当中数据的字段的个数,以及字段的类型。schema相当于定义了表的规范,然后就可以将df当中的数据映射成为一张表,通过sql语句的方式去操作它

    dataframe也是懒执行的

    dataframe摆脱了jvm堆内存的限制,避免了频繁的GC。dataFrame可以将数据保存到物理内存里面去
    
    dataframe缺少编译器的类型安全的检查,使用的是运行期的检查  int  c = 1/0
    

    dataset是dataframe的扩展

    dataset使用的是编译期类型检查   int a = "hello"。编译期的检查更加安全一点,代码没有可能打包到集群上面去运行
    rdd以及dataframe以及dataset三者之间可以相互的进行转化
    

    三者的共性在这里插入图片描述

    三者的区别
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    3.dataframe的创建以及操作

    3.1 读取文本文件创建DataFrame

    第一种方式:通过RDD配合case class 样例类来将文本文件的数转换 成为DF

    1.读取文本文件成为一个rdd => 2.将rdd给切开,然后将数据转换成为RDD[CASE Class] => 3.将RDD[case  class]  转换成为DF
    

    第一步:创建文本文件

    在linux的/export/servers/路径下创建文本文件
    cd /export/servers/
    vim person.txt
    1 zhangsan 20
    2 lisi 29
    3 wangwu 25
    4 zhaoliu 30
    5 tianqi 35
    6 kobe 40
    

    第二步:定义RDD

    使用spark-shell 进入spark客户端
    cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/
    bin/spark-shell --master local[2]
    val lineRDD = sc.textFile("file:///export/servers/person.txt").map(x => x.split(" "))
    

    第三步:定义case class样例类

    case class Person(id:Int,name:String,age:Int)
    

    第四步:关联RDD与case class

    val personRDD = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))
    

    第五步:将RDD转换成DF

    val personDF  = personRDD.toDF
    

    注意:DF也可以转换成为RDD,直接使用DF调用rdd方法即可

    scala> personDF.rdd.collect
    res38: Array[org.apache.spark.sql.Row] = Array([1,zhangsan,20], [2,lisi,29], [3,wangwu,25], [4,zhaoliu,30], [5,tianqi,35], [6,kobe,40])
    

    第二种方式:通过sparkSession构建DataFrame

    val personDF2 = spark.read.text("file:///export/servers/person.txt")
    

    3.2 读取json文件创建DataFrame

    spark给我们提供了json格式的示例文件,路径在
    /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/src/main/resources/people.json
    我们可以直接通过spark解析json数据进行创建DF

    val jsonDF = spark.read.json("file:///export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/src/main/resources/people.json")
    

    3.3 读取parquet列式存储格式文件创建DataFrame

    spark也给我们提供了parquet格式的数据,我们也可以通过spark直接解析parquet格式的数据来进行创建DF,示例文件的路径在
    /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/src/main/resources/users.parquet

    val parquetDF = spark.read.parquet("file:///export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/src/main/resources/users.parquet")
    parquetDF.show
    parquetDF.printSchema 打印DataFrame的Schema信息
    

    3.4 DF的语法操作风格

    DSL语法操作风格:

    查看某一个字段四种语法
    
    scala> personDF.select("name").show
    personDF.select($"name").show
    personDF.select(col("name")).show
    personDF.select(personDF.col("name")).show
    
    将年龄值加1 
     personDF.select($"name",$"age",$"age" + 1).show
     
    过滤age大于等于25
      personDF.filter($"age" > 25).show
      
    统计年龄大于30岁一共多少人
      personDF.fi
      lter(col("age")>30).count()
      
    按照年龄进行分区,统计每组有多少人
       personDF.groupBy($"age").count.show
    

    SQL语法操作风格:

    直接使用sql语句来实现数据的分析
    

    1.将DataFrame注册成表

    scala> personDF.registerTempTable("t_person")
    warning: there was one deprecation warning; re-run with -deprecation for details
    

    2.查询年龄最大的前两名

    scala> spark.sql("select * from t_person order by age desc limit 2 ").show
    

    3.显示表的Schema信息

    scala> spark.sql("desc t_person").show
    

    4.查询年龄大于30的人的信息

    scala>  spark.sql("select * from t_person where age > 30").show
    

    4.dataset介绍

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    4.1 创建DataSet

    创建dataSet一共可以有四种方式

    1.从集合当中创建

    scala> val ds1 = spark.createDataset(1 to 10)
    

    2.从已经存在的rdd当中进行创建

    val personRDD = sc.textFile("file:///export/servers/person.txt")
    val ds2 =spark.createDataset(personRDD)
    

    3.通过样例类配合集合创建ds

    case class Person(name:String,age:Int)
    val personDataList = List(Person("zhangsan",18),Person("lisi",28))
    val personDS = personDataList.toDS
    

    4.使用df转化生成ds

    case class Person(name:String,age:Long)
     val jsonDF = spark.read.json("file:///export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/src/main/resources/people.json")
     val jsonDS = jsonDF.as[Person]
    

    4.2 DataFrame与DataSet互相转换

    在这里插入图片描述

    在这里插入图片描述

    5.编程方式实现sparkSQL查询

    在这里插入图片描述

    5.1 编写Spark SQL程序实现RDD转换成DataFrame

    第一步:创建maven工程并导入依赖jar包

    <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.2.0</spark.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.5</version>
            </dependency>
        </dependencies>
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                        <!--    <verbal>true</verbal>-->
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.1.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    

    5.1.1 第一种方式创建DF:通过反射配合样例类推断Schema

    package cn.itcast.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    
    case class Person(id: Int, name: String, age: Int)
    
    object SparkDF {
      /**
       * 需求:读取文件,生成RDD,配合样例类,将RDD转换成DF
       * 然后用DSL以及SQL语法进行操作
       * @param args
       */
      def main(args: Array[String]): Unit = {
        //读取文件成为RDD,对RDD进行切割
    
        //定义样例类,将RDD转换成为 RDD[case class]
        //将RDD转换成为df
        //使用dsl以及sql语法来操作
    
        //获取sparkSession对象
        val sparkSession: SparkSession = SparkSession.builder().appName("sparkSQL").master("local[2]").getOrCreate()
        //通过sparkContext来读取文件
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("WARN")
        val fileRDD: RDD[String] = sparkContext.textFile("file:///E:\\大数据资料\\大数据实时资料\\3、Spark\\Spark第三天\\资料\\person.txt")
        //对RDD进行切割
        val map: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
        val personRDD: RDD[Person] = map.map(x => Person(x(0).toInt,x(1),x(2).toInt))
        //导入隐式转换的包
        import sparkSession.implicits._
        //得到personDF 将RDD转换成为了DF
        val personDF: DataFrame = personRDD.toDF()
        println("=====================DSL语法风格操作开始======================")
        //查看所有数据
        personDF.show()
        //查看schema信息
        personDF.printSchema()
        //查看name和age字段,并且将age字段值+1
        personDF.select($"name",$"age",$"age"+1).show()   //默认显示20行
        //统计年龄大于30岁的人
        val count: Long = personDF.filter($"age">30).count()
        println(count)
        //按照年龄进行分组,查看每组多少人
        personDF.groupBy($"age").count().show()
        println("=====================DSL语法风格操作结束======================")
    
        println("=====================SQL语法风格操作开始======================")
        //将DF注册成为一张表
        personDF.registerTempTable("person1")
        personDF.createTempView("person2")
        personDF.createOrReplaceTempView("person3")
        sparkSession.sql("select * from person1").show()
        sparkSession.sql("select * from person2").show()
        sparkSession.sql("select * from person3").show()
    
        sparkSession.sql("select * from person1 left join person2 on person1.id = person2.id").show()
    
        sparkSession.sql("select * from person1 orderby age limit 2").show()
    
        sparkSession.sql("select age from person1 group by age").show()
    
    
    
        println("=====================SQL语法风格操作结束======================")
        sparkContext.stop()
        sparkSession.close()
      }
    
    }
    

    DSL语法风格
    在这里插入图片描述
    SQL语法风格
    在这里插入图片描述

    5.1.2 第二种方式创建DF:通过StructType配合Row直接指定Schema

    package cn.itcast.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    object SparkDF2 {
      //通过Row对象以及StructType来实现DF创建
      def main(args: Array[String]): Unit = {
        //获取sparkSession
        val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("DF2").getOrCreate()
        //通过sparkContext读取文件,得到RDD
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("WARN")
        //获取RDD,只不过RDD[String]  ==> RDD[Row]
        val fileRDD: RDD[String] = sparkContext.textFile("file:///E:\\大数据资料\\大数据实时资料\\3、Spark\\Spark第三天\\资料\\person.txt")
        val arrayRDD: RDD[Array[String]] = fileRDD.map(x =>x.split(" "))
        val rowRDD: RDD[Row] = arrayRDD.map(x => Row(x(0).toInt, x(1),x(2).toInt))
        val structType: StructType = new StructType().add("id",IntegerType).add("name",StringType).add("age",IntegerType)
        //跟两个参数 RDD[Row] StructType
        val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD,structType)
        dataFrame.show()
        sparkContext.stop()
        sparkSession.close()
      }
    }
    

    5.2 编写Spark SQL程序操作HiveContext

    在这里插入图片描述

    5.2.1 第一步:添加pom依赖

    		<dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
    

    5.2.2 第二步:代码实现

    package cn.itcast.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession
    
    object SparkHive {
      //需求:创建一张表,加载student.csv文件
      def main(args: Array[String]): Unit = {
        //通过spark使用hive语法,直接创建hive表,然后加载数据即可
        //获取sparkSession
        val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("sparkHive").enableHiveSupport().getOrCreate() //开启对hive语法的支持,数据保存在deby数据库
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("WARN")
        sparkSession.sql("create table if not exists student (id int, name string,age int ) row format delimited fields  terminated by ','")
        //加载数据
        sparkSession.sql("load data local inpath './data/student.csv' overwrite into table student")
        sparkSession.sql("select * from student").show()
        sparkContext.stop()
        sparkSession.close()
      }
    }
    

    6.数据源

    6.1 SparkSql从MySQL中加载数据

    第一步:添加jdbc连接驱动jar包

    <dependency>
    	<groupId>mysql</groupId>
    	<artifactId>mysql-connector-java</artifactId>
    	<version>5.1.38</version>
    </dependency>
    

    第二步:开发代码,读取mysql数据库当中的数据

    package cn.itcast.sparksql.jdbc
    
    import java.util.Properties
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object SparkRead {
      def main(args: Array[String]): Unit = {
        //获取sparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("sparkjdbc").master("local[2]").getOrCreate()
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("WARN")
        //sparksession.read.jdbc 读取数据库当中的数据
        // (url: String, table: String, properties: Properties)
        val url = "jdbc:mysql://192.168.1.26:3306/scy"
        val table = "t_product"
        val properties: Properties = new Properties()
        properties.setProperty("user","root")
        properties.setProperty("password","123456")
    
        val jdbc: DataFrame = sparkSession.read.jdbc(url,table,properties)
        jdbc.show()
        sparkContext.stop()
        sparkSession.close()
      }
    }
    

    6.2 通过spark-shell运行加载mysql当中的数据

    (1)、启动spark-shell(必须指定mysql的连接驱动包)

    bin/spark-shell  \
    --master spark://node1:7077 \
    --executor-memory 1g \
    --total-executor-cores 2 \
    --jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
    --driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar
    

    (2)、从mysql中加载数据

    val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.77.21:3306/userdb", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "emp", "user" -> "root", "password" -> "admin")).load()
    

    6.3 SparkSql将数据写入到MySQL中

    在这里插入图片描述
    本地模式提交

    package cn.itcast.sparksql.jdbc
    
    import java.util.Properties
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    
    case class Person(id: Int,name:String,age:Int)
    
    object SparkWrite {
      //需求:读取person.txt,成为一个rdd,将rdd分割,配合样例类,转化成为DF
      //将DF注册成为一张表,从表中查询数据,将结果保存到MySQL去
      def main(args: Array[String]): Unit = {
        val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("sparkWrite").getOrCreate()
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("WARN")
        //读取文件
        val fileRDD: RDD[String] = sparkContext.textFile("file:///E:\\大数据资料\\大数据实时资料\\3、Spark\\Spark第三天\\资料\\person.txt")
        val arrayRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
        //将RDD[Array[String]] -> RDD[Person]
        val personRDD: RDD[Person] = arrayRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))
        import sparkSession.implicits._
        //将personRDD转换成为DF
        val personDF: DataFrame = personRDD.toDF()
        personDF.createOrReplaceTempView("person")
        //通过sql语句从表中查询数据,将结果保存到MySQL去
        val result: DataFrame = sparkSession.sql("select * from person")
        val url = "jdbc:mysql://192.168.1.26:3306/scy"
        val table = "person"
        val properties: Properties = new Properties()
        properties.setProperty("user","root")
        properties.setProperty("password","123456")
        result.write.mode(SaveMode.Append).jdbc(url,table,properties)
        sparkContext.stop()
        sparkSession.close()
      }
    }
    

    在这里插入图片描述
    (2)集群上提交
    用maven将程序打包
    修改代码进行打包
    修改master提交地址

    //获取sparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("spark2Mysql").getOrCreate()
    //  .master("local[2]")  打包集群运行,不用再指定master的地址了,等会儿提交jar包的时候,我们手动指定
    

    修改文件读取路径

    val arrRDD: RDD[Array[String]] = sparkContext.textFile(args(0)).map(x => x.split(" "))
    

    (3)(3)将Jar包提交到spark集群

    spark-submit --master spark://node1:7077 \
    --class cn.itcast.spark.sql.Spark2Mysql \
    --executor-memory 1g \
    --total-executor-cores 2 \
    --jars /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
    --driver-class-path /export/servers/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java-5.1.38.jar \
    original-day03-1.0-SNAPSHOT.jar \
    hdfs://node1.hadoop.com:8020/person.txt
    

    7.sparkSQL当中的分析函数

    在这里插入图片描述
    在这里插入图片描述

    分组求topN

    row_number()
    
    rank over
    
    dense_rank  over()
    

    2、代码实现如下

    package cn.itcast.sparksql.demo4
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object SparkFunction {
      /**
       * 需求:求每个班级分数最高的两个学生
       * @param args
       */
      def main(args: Array[String]): Unit = {
        //sparkSession可以直接读取json格式的数据,转换成为DF
        val sparkSession: SparkSession = SparkSession.builder().appName("sparkFunc").master("local[2]").getOrCreate()
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("WARN")
        val jsonDF: DataFrame = sparkSession.read.json("file:///E:\\大数据资料\\大数据实时资料\\3、Spark\\Spark第三天\\资料\\score.txt")
        jsonDF.createOrReplaceTempView("score")
        //通过sql,实现数据的分析
        //第一步
        sparkSession.sql("select name,clazz,score, rank() over(partition by clazz order by score desc) as rankNum from score").show()
        //第二步
        sparkSession.sql("select * from (select name,clazz,score, rank() over(partition by clazz order by score desc) as rankNum from score) temp where temp.rankNum <=2").show()
    
        sparkContext.stop()
        sparkSession.close()
      }
    }
    

    在这里插入图片描述

    在这里插入图片描述

    8.sparkSQL当中的自定义函数

    UDF:一行数据进入,一行数据出来

    UDAF:用户自定义聚合函数 多行数据进入,一行数据出来 例如count,sum ,avg

    UDTF:一行数据进入,多行数据出来 latel view

    8.1 UDF实战 需求:将每一行数据转换成大写

    有数据格式如下:

    helloworld
    abc
    study
    smallWORD
    
    package cn.itcast.sparksql.demo5
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.api.java.UDF1
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    case class Line(lineStr:String)
    
    object SparkUDF {
      //需求:读取文件,注册成为一张表,注册udf函数,使用udf函数,将小写转大写
      def main(args: Array[String]): Unit = {
          //获取sparkSession
          val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("sparkUDF").getOrCreate()
          val sparkContext: SparkContext = sparkSession.sparkContext
          sparkContext.setLogLevel("WARN")
          val fileRDD: RDD[String] = sparkContext.textFile("file:///E:\\大数据资料\\大数据实时资料\\3、Spark\\Spark第三天\\资料\\udf.txt")
          val lineRDD: RDD[Line] = fileRDD.map(x => Line(x))
          //读取数据转换成RDD[Line]
          import sparkSession.implicits._
          val lineDF: DataFrame = lineRDD.toDF()
          //将DF注册成为一张表
          lineDF.createOrReplaceTempView("line_tab")
          //name udf returnType
          //注册用户自定义udf函数
          sparkSession.udf.register("bigger",new UDF1[String,String] {
            override def call(t1: String):String = {
              t1.toUpperCase()
            }
          },StringType)
          sparkSession.sql("select lineStr, bigger(lineStr) as big from line_tab").show()
          sparkContext.stop()
          sparkSession.close()
      }
    }
    

    在这里插入图片描述

    8.2 UDAF实战

    在这里插入图片描述

    package cn.itcast.sparksql.demo5
    
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    
    class SparkFunctionUDAF extends UserDefinedAggregateFunction{
      //输入的数据类型的schema
      override def inputSchema: StructType = {
        StructType(StructField("input",LongType)::Nil)
      }
      //缓冲区数据类型schema,说白了就是转换之后的数据的schema
      //求平均值  总金额  总人数
      override def bufferSchema: StructType = {
        StructType(StructField("sum",LongType)::StructField("total",LongType)::Nil)
      }
      //返回值的数据类型
      override def dataType: DataType = {
        DoubleType
      }
      //确定是否相同的输入会有相同的输出
      override def deterministic: Boolean = {
        true
      }
      //初始化内部数据结构
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
      }
      //更新数据内部结构
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        //所有的金额相加
        buffer(0) = buffer.getLong(0) + input.getLong(0)
        //一共有多少条数据
        buffer(1) = buffer.getLong(1) + 1
      }
      //来自不同分区的数据进行合并
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      //计算输出数据值
      override def evaluate(buffer: Row): Any = {
        buffer.getLong(0).toDouble / buffer.getLong(1)
      }
    }
    
    object SparkFunctionUDAF {
      def main(args: Array[String]): Unit = {
        //获取sparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("sparkUDAF").master("local[2]").getOrCreate()
        //通过sparkSession读取json文件得到DataFrame
        val employeeDF: DataFrame = sparkSession.read.json("file:///E:\\大数据资料\\大数据实时资料\\3、Spark\\Spark第三天\\资料\\udaf.txt")
        //通过DataFrame创建临时表
        employeeDF.createOrReplaceTempView("employee_table")
        //注册我们的自定义UDAF函数
        sparkSession.udf.register("avgSal", new SparkFunctionUDAF)
        //调用我们的自定义UDAF函数
        sparkSession.sql("select avgSal(salary) from employee_table").show()
    
        sparkSession.close()
      }
      }
    

    9.sparkSQL整合hive以及sparkSQL使用

    sparkSQL官方文档:

    http://spark.apache.org/docs/2.2.0/sql-programming-guide.html
    

    在这里插入图片描述
    1、sparkSQL整合hive
    第一步:将hive-site.xml拷贝到spark安装家路径的conf目录下

    node3执行以下命令来拷贝hive-site.xml到所有的spark安装服务器上面去
    cd /export/servers/hive-1.1.0-cdh5.14.0/conf
    cp hive-site.xml  /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
    scp hive-site.xml  node2:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
    scp hive-site.xml  node1:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/conf/
    

    第二步:将mysql的连接驱动包拷贝到spark的jars目录下

    node3执行以下命令将连接驱动包拷贝到spark的jars目录下,三台机器都要进行拷贝
    cd /export/servers/hive-1.1.0-cdh5.14.0/lib
    cp mysql-connector-java-5.1.38.jar /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
    scp mysql-connector-java-5.1.38.jar node02:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
    scp mysql-connector-java-5.1.38.jar node01:/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/jars/
    

    第三步:测试sparksql整合hive是否成功

    先启动hadoop集群,在启动spark集群,确保启动成功之后node1执行命令:
    cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
    bin/spark-sql --master spark://node1:7077 --executor-memory 1G --total-executor-cores 2
    指明master地址、每一个executor的内存大小、一共所需要的核数、
    mysql数据库连接驱动。
    执行成功后的界面:进入到spark-sql 客户端命令行界面
    
    查看当前有哪些数据库, 并创建数据库
    show databases;
    create database sparkhive;
    

    在这里插入图片描述
    **加粗样式**

    展开全文
  • SparkSQL简介

    2019-05-15 17:49:24
    1、SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是...
  • sparkSQL讲解

    2019-03-08 14:33:12
    我觉得官方文档很详细,研究完文档,再加点个人开发经验,玩转sparkSQL妥妥的呀!哈哈... Spark SQL,DataFrames和Datasets Guide Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API...
  • zeppelin是spark的web版本...一Zeppelin安装 (前提是spark已经安装好) 1 下载https://zeppelin.apache.org/download.html(下载编译好的bin版) 2 解压运行:sh bin/zeppelin-daemon.sh start 3 权限问题:chown...
  • sparkSQL读写数据到MySQL前言sparkSQL加载MySQL表中的数据sparkSQL写数据到MySQLsparkSQL写数据部署到集群pom依赖 前言 spark sql可以通过 JDBC 从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一...
  • SparkSql整合Hive

    2019-09-24 17:13:49
    采用 SparkSql 与 hive 进行整合,通过 SparkSql 读取 hive 中表的元数据,把 HiveHQL 底层采用 MapReduce 来处理任 务,导致性能慢的特点,改为更加强大的 Spark 引擎来进行相应的分析处理 二:整合前提 1.安装好...
  • SparkSql 使用

    千次阅读 2015-01-29 16:06:05
    SparkSql无需安装, 直接初始化SQLContext即可 //启动spark-shell bin/spark-shell --master spark://hadoop1:7077 --executor-memory 3g //RDD演示 val sqlContext= new org.apache.spark.sql....
  • sparksql资料

    2021-02-25 15:33:20
    SparkSQL,顾名思义它是spark生态技术体系中的构建在spark core基础上的基于SQL的计算模块。spark sql的前身叫做shark,早期的sql解析、计算引擎完全来自于hive,但是shark执行速度比hive要高出几个量级。在15年中旬...
  • SparkSQL核心编程

    2021-04-01 22:23:07
    SparkSQL核心编程 1. 构建SparkSQL环境 在老的版本中,SparkSQL提供了两种SQL查询起始点:一个SQLContext,用于Spark自己提供的SQL查询;一个HiveContext,用于连接Hive的查询。 SparkSession是Spark最新的SQL查询...
  • 大数据之SparkSQL

    2021-05-09 22:21:33
    大数据之sparkSQL
  • sparkSQL操作

    2020-12-18 12:15:10
    在Linux中安装好MySQL数据库 MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,目前属于 Oracle 旗下产品。 1.更新软件源,获取最新版本 2.设置mysql的root用户的密码 重新输入 3.启动mysql服务器,...
  • SparkSQL整合 HIVE

    2018-11-13 11:41:41
    SparkSQL整合 HIVE 安装hive(配置好) 将配置好的hive-site.xml放入$SPARK-HOME/conf目录下 将mysql-xxx-connector-xx.jar放到spark集群中节点的lib目录下 启动spark-shell时指定mysql连接驱动位置 先启动spark...
  • sparkSQL整合hivesparkSQL整合hivesparkSQL整合hive步骤示例数据库保存在...把hive安装目录下conf文件夹里的hive-site.xml拷贝到集群每一个spark安装目录下对应的conf文件夹中 目的:让spark知道存放hive表的元数据的M
  • Hive环境搭建 hive下载:http://archive-primary.cloudera.com/cdh5/cdh/5/hive-1.1.0-cdh5.7.0.tar.gz wget ... ...tar -zxvf hive-1.1.0-cdh...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,707
精华内容 1,482
关键字:

sparksql安装