精华内容
下载资源
问答
  • Spark本机SQL引擎 用于Spark SQL的本机引擎,具有矢量化SIMD优化 在线文件 您可以在找到所有Native SQL Engine文档。 介绍 Spark SQL与基于行的结构化数据配合得很好。 它使用WholeStageCodeGen通过Java JIT代码来...
  • Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
  • 主要给大家介绍了关于Spark SQL操作JSON字段的小技巧,文中通过示例代码介绍的非常详细,对大家学习或者使用spark sql具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
  • Learning Spark SQL epub

    2017-10-06 21:17:19
    Learning Spark SQL 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
  • Spark SQL 表达式计算

    2017-05-16 10:23:05
    表达式计算在Spark SQL中随处可见,本演讲将简介表达式、UDF、UDAF、UDTF的概念,主要的API,以及如何扩展Spark SQL函数库。本演讲还将提及Catalyst在计划阶段和Project Tungsten在执行层做的优化,以及未来性能提升...
  • Spark SQL 实验

    2018-08-27 09:12:03
    Spark SQL 详细介绍 实验介绍 有需要的尽快下载吧
  • Spark SQL入门与实践指南》、Hadoop权威指南(中文第3版)、Hadoop源代码分析(完整版)、Spark快速大数据分析
  • 使用spark读取hbase中的数据,并插入到mysql中
  • spark-tpc-ds-performance-test:使用TPC-DS基准测试Spark SQL性能
  • spark Sql

    万次阅读 多人点赞 2019-10-15 08:19:25
    spark sql一.概述1 spark历史2 Spark-SQL 概述2.1 特点2.2 作用2.3 Spark SQL架构图3 Dataset演进历史3.1 RDD3.1.1 优点3.1.2 缺点3.2 DataFrame3.2.1 优点3.2.2 缺点3.2.3 核心特征3.3 Dataset3.3.1 区别3.3.2 特点...

    一.概述

    1 spark历史

    前身: shark (即Hive on Spark)
            hive 进程维护 , shark 线程维护
    新入口:SparkSession
    RDD----->DataFrame------->Dataset
    基本数据类型:Row,schema,StructType,StructField
    支持: scala,Java python,R
    shark:
    	执行计划优化完全依赖于Hive,不方便添加新的优化策略;
    	Spark是线程级并行,而MapReduce是进程级并行。
    	Spark在兼容Hive的实现上存在线程安全问题,导致Shark
    		不得不使用另外一套独立维护的打了补丁的Hive源码分支;
    Spark SQL:
    	作为Spark生态的一员继续发展,而不再受限于Hive,
    	只是兼容Hive;Hive on Spark作为Hive的底层引擎之一
    	Hive可以采用Map-Reduce、Tez、Spark等引擎
    

    2 Spark-SQL 概述

    2.1 特点

    1. 数据兼容:不仅兼容Hive,还可以从RDD、parquet文件、Json文件获取数据、支持从RDBMS获取数据
    2. 性能优化:采用内存列式存储、自定义序列化器等方式提升性能;
    3. 组件扩展:SQL的语法解析器、分析器、优化器都可以重新定义和扩展
    4. 兼容: Hive兼容层面仅依赖HiveQL解析、Hive元数据。
      从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了,Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责
    5. 支持: 数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据;
      Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范;

    2.2 作用

    1. Spark 中用于处理结构化数据的模块;
    2. 相对于RDD的API来说,提供更多结构化数据信息和计算方法
    3. 可以通过SQL或DataSet API方式同Spark SQL进行交互,

    2.3 Spark SQL架构图

    在这里插入图片描述

    3 Dataset演进历史

    RDD------------------->DataFrame-------------------->Dataset
    0.0            		     1.3        			       1.6
    

    在这里插入图片描述

    3.1 RDD

    3.1.1 优点

    1. 编译时类型安全,编译时就能检查出类型错误;
    2. 面向对象的编程风格,直接通过class.name的方式来操作数据;
      idAge.filter(.age > “”) // 编译时报错, int不能跟与String比较
      idAgeRDDPerson.filter(
      .age > 25) // 直接操作一个个的person对象

    3.1.2 缺点

    1. 序列化和反序列化的性能开销,无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化反序列化;
    2. GC的性能开销,频繁的创建和销毁对象, 势必会增加GC

    3.2 DataFrame

    3.2.1 优点

    1. off-heap类似于地盘, schema类似于地图, 有自己地盘了, 不再受JVM的限制, 也就不再受GC的困扰了
    2. 通过schema和off-heap, DataFrame克服了RDD的缺点。对比RDD提升计算效率、减少数据读取、底层计算优化;

    3.2.2 缺点

    1. DataFrame克服了RDD的缺点,但是却丢了RDD的优点。
    2. DataFrame不是类型安全的,API也不是面向对象风格的。
      // API不是面向对象的
      idAgeDF.filter(idAgeDF.col(“age”) > 25)
      // 不会报错, DataFrame不是编译时类型安全的
      idAgeDF.filter(idAgeDF.col(“age”) > “”)

    3.2.3 核心特征

    1. DataFrame的前身是SchemaRDD,不继承RDD,自己实现了RDD的大部分功能,在DataFrame上调用RDD的方法转化成另外一个RDD
    2. DataFrame可以看做分布式Row对象的集合,DataFrame 不仅有比RDD更多的算子,还可以进行执行计划的优化;
    3. DataFrame表示为DataSet[Row],即DataSet的子集
    4. Row :被 DataFrame 自动实现,一行就是一个Row对象
    5. Schema :包含了以ROW为单位的每行数据的列的信息; Spark通过Schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了
    6. off-heap : Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存
    7. Tungsten:新的执行引擎
    8. Catalyst:新的语法解析框架

    3.3 Dataset

    Spark第三代API:Dataset;Dataset的核心:Encoder

    3.3.1 区别

    DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多操作(过滤、排序、hash)的时候不用进行反序列化。

    3.3.2 特点

    1. 编译时的类型安全检查,性能极大的提升,内存使用极大降低、减少GC、极大的减少网络数据的传输,极大的减少scala和java之间代码的差异性。
    2. DataFrame每一个行对应了一个Row。而Dataset的定义更加宽松,每一个record对应了一个任意的类型。DataFrame只是Dataset的一种特例。
    3. 不同于Row是一个泛化的无类型JVM object, Dataset是由一系列的强类型JVM object组成的,Scala的case class或者Java class定义。因此Dataset可以在编译时进行类型检查
    4. Dataset以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。
    5. Dataset创立需要一个显式的Encoder,把对象序列化为二进制。

    4 SparkSQL API

    在这里插入图片描述
    SparkSession:Spark的一个全新的切入点,统一Spark入口;

    4.1创建SparkSession

    val spark = SparkSession.builder
    .appName()
    .enableHiveSupport()
    .getOrCreate()
    .master()
    spark.conf.set(“spark.sql.shuffle.partitions”,6)
    spark.conf.set(“spark.executor.memory”, “2g”)

    4.2 核心API

    1. sparkSession: spark入口
      统一封装SparkConf,SparkContext,SQLContext, 配置运行参数,读取文件,创建数据,使用SQL
    2. Dataset:
      统一Dataset接口,其中DataFrame==Dataset[Row]
      基本实现了类似RDD的所有算子
      column: Dataset的列对象
      包括对列操作的基本函数
    3. ROW : DataFrame的行对象
      包括对行操作的基本函数
    4. Encoder : 序列化
      支持常用的数据类型,可以直接序列化,也支持case class自定义数据对象进行序列化
    5. functions: Dataset的内置函数
      支持丰富的操作函数(聚合,collection… …)
    6. SQlImplict: 隐式转换
      其中scala对象RDD转换成DF/DS ,DF/DS使用Map/FlatMap方法等;
      要采用的隐式转换格式的
      val spark= SparkSession.()
      import spark.implicts._
      注意 : Dataset是一个类(RDD是一个抽象类,而Dataset不是抽象类),其中有三个参数:
      SparkSession(包含环境信息)
      QueryExecution(包含数据和执行逻辑)
      Encoder[T]:数据结构编码信息(包含序列化、schema、数据类型)

    5 基本操作

    5.1 Row

    import org.apache.spark.sql.Row
    //创建行对象
    val row1=Row(1,”ss”,12,2.2)
    //访问
    row1(0)
    row1.getInt(0)//要与数据的类型对应
    row1.getAsInt

    5.2 Schema

    DataFrame(即带有Schema信息的RDD)Spark通过Schema就能够读懂数据
    DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,DataFrame中的数据结构信息,即为schema。

    5.3 Schema & StructType & StructField

    import org.apache.spark.sql.types._

    1. val s=(new StructType)
      .add(“列名”,”类型”,可是否为空,”备注”)
      .add(“列名”,”类型”,true/false,”备注”)
      val s1=(new StructType)
      .add(“列名”,IntType,可是否为空,”备注”)
      .add(“列名”,StringType,可是否为空,”备注”)
    2. val s2=(new StructType)
      .add(StructField(“列名”,IntType,false))
      .add(StructField(“列名”,StringType,true))
    3. val s3=StructType(StructField(“列名”,StringType,true)::
      StructField(“列名”,IntType,true)::Nil )
      val s4=StructType( (List/Sep)
      (StructField(“列名”,StringType,true)::
      StructField(“列名”,IntType,true)::Nil ))

    6 Dataset & DataFrame

    Spark提供了一整套用于操纵数据的DSL
    (DSL :Domain Specified Language,领域专用语言)
    DSL在语义上与SQL关系查询非常相近

    6.1 Dataset& DataFrame 的创建

    在这里插入图片描述在这里插入图片描述

    1. 由range生成Dataset
      val numDS = spark.range(5, 100, 5)
      //降序显示前五个
      numDS.orderBy(desc(“id”)).show(5)
      //显示总数,平均数,偏差,最大值,最小值
      numDS.describe().show
    2. 多列由集合生成Dataset
      case class Person(name:String, age:Int, height:Int)
      val seq1 = Seq(Person(“Jack”, 28, 184), Person(“Tom”, 10, 144))
      val ds1 = spark.createDataset(seq1)
      ds1.show
      val seq2 = Seq((“Jack”, 28, 184), (“Tom”, 10, 144))
      val ds2 = spark.createDataset(seq2)
      ds2.show
    3. 集合转成DataFrame,并修改列名必须有类型
      val seq1 =[Sep/List] ((“Jack”, 28, 184), (“Tom”, 10, 144), (“Andy”, 16, 165))
      val df1 = spark.createDataFrame(seq1)
      .withColumnRenamed("_1", “name1”)
      .withColumnRenamed("_2",“age1”)
      .withColumnRenamed("_3", “height1”)
      df1.orderBy(desc(“age1”)).show(10)
      val df2 = spark.createDataFrame(seq1).toDF(“name”, “age”, “height”).show
      // 简单!2.0.0的新方法
      createDataset 无法运用toDS 修改列名

    6.1.2 read 的格式

    在这里插入图片描述

    import org.apache.spark.sql.types._
    val schema2 = StructType( StructField("name", StringType, false) :: 
                              StructField("age",  IntegerType, false) :: 
                              StructField("height", IntegerType, false) ::  Nil)
    options(    Map(k,v)	  )                
    	val df = spark.read.options(Map(("delimiter", ","),
    					("header", "false")))
    					.schema(schema2)
    					.csv("file:///home/spark/t01.csv")
    option().option()					
    	val df1=spark.read.option("header", "true")
    					.option("inferschema","true")
    					.csv("data/emp.dat") 
    	df.show()	
    
    delimiter //分隔符
    header //是否要头部作为列名
    schema //设置格式列名
    inferschema  //当不指定schema时自动推测列的格式
    

    val df = spark.read.csv(“file:///home/spark/data/sparksql/t01.csv”)
    在这里插入图片描述
    val df = spark.read.option(“inferschema”,“true”).csv(“file:///home/spark/data/sparksql/t01.csv”)
    在这里插入图片描述
    val df = spark.read.options(Map((“delimiter”, “,”), (“header”, “false”))). schema(schema6).csv(“file:///home/spark/data/sparksql/t01.csv”)
    val df = spark.read.csv(“file:///home/spark/data/sparksql/t01.csv”)
    在这里插入图片描述

    6.1.3 ** 从MySQL读取数据 **

    // 读取数据库中的数据
    val jdbcDF = spark.read.format("jdbc").
    				option("url", "jdbc:mysql://localhost:3306/spark").
    				option("driver","com.mysql.jdbc.Driver").
    				option("dbtable", "student").
    				option("user", "hive").
    				option("password", "hive").load()
    jdbcDF.show
    jdbcDF.printSchema
    

    备注:
    1、将jdbc驱动拷贝到$SPARK_HOME/jars目录下,是最简单的做法;
    2、明白每一个参数的意思,一个参数不对整个结果出不来;
    3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。

    7 RDD & DataFrame & DataSet 相互转换

    在这里插入图片描述

    7.1 共性

    1、RDD、DataFrame、Dataset都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;

    2、三者都有惰性机制。在进行创建、转换时(如map方法),不会立即执行;只有在遇到Action时(如foreach) ,才会开始遍历运算。极端情况下,如果代码里面仅有创建、转换,但后面没有在Action中使用对应的结果,在执行时会被直接跳过;

    3、三者都有partition的概念,进行缓存(cache)操作、还可以进行检查点(checkpoint)操作;

    4、三者有许多相似的函数,如map、filter,排序等;

    5、在对DataFrame和Dataset进行操作时,很多情况下需要 spark.implicits._ 进行支持;

    7.2 DataFrame

    DataFrame(DataFrame 是 Dataset[Row]的别名):
    DataFrame = RDD[Row] + schema

    1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值;

    2、DataFrame与Dataset一般与spark ml同时使用;

    3、DataFrame与Dataset均支持sparksql的操作,比如select,groupBy之类,还能注册临时视图,进行sql语句操作;

    4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然;

    7.3 Dataset

    Dataset = RDD[case class].toDS

    1、Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同;

    2、DataFrame 定义为 Dataset[Row]。每一行的类型是Row,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用前面提到的getAS方法或者模式匹配拿出特定字段;

    3、Dataset每一行的类型都是一个case class,在自定义了case class之后可以很自由的获得每一行的信息;

    7.3 转换

    RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换
    在这里插入图片描述

    7.3.1DataFrame/Dataset 转 RDD:

    // 这个转换很简单
    val rdd1=testDF.rdd
    val rdd2=testDS.rdd
    

    7.3.2RDD 转 DataFrame:

    // 一般用元组把一行的数据写在一起,然后在toDF中指定字段名
    import spark.implicits._
    val testDF = rdd.map {line=>
          (line._1,line._2)
        }.toDF("col1","col2")
    

    7.3.3RDD 转 DataSet:

    // 核心就是要定义case class
    import spark.implicits._
    case class Coltest(col1:String, col2:Int)
    val testDS = rdd.map{line=>Coltest(line._1,line._2)}.toDS
    

    7.3.4Dataset 转 DataFrame:

    // 这个转换简单,只是把 case class 封装成Row
    import spark.implicits._
    val testDF = testDS.toDF
    

    7.3.5DataFrame 转 Dataset:

    // 每一列的类型后,使用as方法(as方法后面还是跟的case class,这个是核心),转成Dataset。
    import spark.implicits._
    case class Coltest … …
    val testDS = testDF.as[Coltest]
    

    特别注意:
    在使用一些特殊操作时,一定要加上import spark.implicits._ 不然toDF、toDS无法使用

    case class Person(name:String, age:Int, height:Int)
    val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    val rdd1 = sc.makeRDD(arr)
    val df = rdd1.toDF()
    val df = rdd1.toDF("name", "age", "height")
    df.as[Person]
    

    二. spark sql 算子

    1.数据类型

    import org.apache.spark.sql.types._
    在这里插入图片描述

    2. Actions

    df1.count	
    
    // 缺省只会显示20行(show里不写时)
    df1.union(df1).show()
    
    // 显示2行
    df1.show(2)
    
    // 不截断字符
    df1.toJSON.show(false)
    
    // 显示10行,不截断字符
    df1.toJSON.show(10, false)
    spark.catalog.listFunctions.show(10000, false)
    
    // collect返回的是数组, Array[org.apache.spark.sql.Row]
    val c1 = df1.collect()
    
    // collectAsList返回的是List, List[org.apache.spark.sql.Row]
    val c2 = df1.collectAsList() 
    
    // 返回 org.apache.spark.sql.Row
    val h1 = df1.head()  
    val f1 = df1.first()
    
    // 返回 Array[org.apache.spark.sql.Row],长度为3
    val h2 = df1.head(3)
    val f2 = df1.take(3)
    
    // 返回 List[org.apache.spark.sql.Row],长度为2
    val t2 = df1.takeAsList(2)
    
    case class Person(name:String, age:Int, height:Int)
    val seq1 = Seq(Person("Jack", 28, 184), 
    Person("Tom", 10, 144), Person("Andy", 16, 165))
    val ds1 = spark.createDataset(seq1)
    ds1.reduce{ (f1, f2) => Person("sum", f1.age+f2.age, f1.height+f2.height) }
    
    // 结构属性
    df1.columns			// 查看列名
    df1.dtypes			// 查看列名和类型
    df1.explain()		// 参看执行计划
    df1.col("name")		// 获取某个列
    df1.printSchema		// 常用
    

    3. Transformations

    // map、flatMap操作(与RDD基本类似)
    df1.map(row=>row.getAs[Int](0)).show
    
    case class Peoples(age:Int, names:String)
    
    val seq1 = Seq(Peoples(30, "Tom, Andy, Jack"), Peoples(20, "Pand, Gate, Sundy"))
    val ds1 = spark.createDataset(seq1)
    val ds2 = ds1.map(x => (x.age+1, x.names))
    ds2.show
    
    val ds3 = ds1.flatMap{ x =>
      val a = x.age
      val s = x.names.split(",").map(name => (a, name.trim))
      s
    }
    ds3.show
    
    // filter
    df1.filter("sal>3000").show
    	
    // randomSplit(与RDD类似,将DF、DS按给定参数分成多份)
    val df2  = df1.randomSplit(Array(0.5, 0.6, 0.7))
    df2(0).count
    df2(1).count
    df2(2).count
    
    // 取10行数据生成新的DataSet
    val df2 = df1.limit(10)
    
    // distinct,去重
    val df2 = df1.union(df1)
    df2.distinct.count
    
    // dropDuplicates,按列值去重
    df2.dropDuplicates.show
    df2.dropDuplicates("mgr", "deptno").show
    df2.dropDuplicates("mgr").show
    df2.dropDuplicates("deptno").show
    
    // 返回全部列的统计(count、mean、stddev、min、max)
    ds1.describe().show
    
    // 返回指定列的统计
    ds1.describe("sal").show
    ds1.describe("sal", "comm").show
    
    // 存储相关的方法,与RDD的方法一致
    import org.apache.spark.storage.StorageLevel
    spark.sparkContext.setCheckpointDir("hdfs://node1:8020/checkpoint")
    
    df1.show()
    df1.checkpoint()
    df1.cache()
    df1.persist(StorageLevel.MEMORY_ONLY)
    df1.count()
    df1.unpersist(true)
    

    4. select相关

    // 列的多种表示方法(5种)。使用""、$""、'、col()、ds("")
    // 注意:不要混用;必要时使用spark.implicitis._;并非每个表示在所有的地方都有效
    df1.select($"ename", $"hiredate", $"sal").show
    df1.select("ename", "hiredate", "sal").show
    df1.select('ename, 'hiredate, 'sal).show
    df1.select(col("ename"), col("hiredate"), col("sal")).show
    df1.select(df1("ename"), df1("hiredate"), df1("sal")).show
    
    // 下面的写法无效,其他列的表示法有效
    df1.select("ename", "hiredate", "sal"+100).show
    df1.select("ename", "hiredate", "sal+100").show
    
    // 可使用expr表达式(expr里面只能使用引号)
    df1.select(expr("comm+100"), expr("sal+100"), expr("ename")).show
    df1.selectExpr("ename as name").show
    df1.selectExpr("power(sal, 2)", "sal").show
    //四舍五入,负数取小数点以前的位置,正数取小数点后的位数
    df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show
    

    drop、withColumn、 withColumnRenamed、casting

    // drop 删除一个或多个列,得到新的DF
    df1.drop("mgr")
    df1.drop("empno", "mgr")
    
    // withColumn,修改列值
    val df2 = df1.withColumn("sal", $"sal"+1000)
    df2.show
    
    // withColumnRenamed,更改列名
    df1.withColumnRenamed("sal", "newsal")
    
    // 备注:drop、withColumn、withColumnRenamed返回的是DF
    
    df1.selectExpr("cast(empno as string)").printSchema
    
    import org.apache.spark.sql.types._
    df1.select('empno.cast(StringType)).printSchema
    

    5.where 相关

    // where操作
    df1.filter("sal>1000").show
    df1.filter("sal>1000 and job=='MANAGER'").show
    
    // filter操作
    df1.where("sal>1000").show
    df1.where("sal>1000 and job=='MANAGER'").show
    

    6. groupBy 相关

    // groupBy、max、min、mean、sum、count(与df1.count不同)
    df1.groupBy("列名").sum("sal").show
    df1.groupBy("Job").max("sal").show
    df1.groupBy("Job").min("sal").show
    df1.groupBy("Job").avg("sal").show
    df1.groupBy("Job").count.show
    
    // agg
    df1.groupBy().agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show
    df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show
    df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show
    
    // 这种方式更好理解
    df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"), sum("sal"), count("sal")).show
    // 给列取别名
    df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"), sum("sal"), count("sal")).withColumnRenamed("min(sal)", "min1").show
    // 给列取别名,最简便
    df1.groupBy("Job").agg(max("sal").as("max1"), min("sal").as("min2"), avg("sal").as("avg3"), sum("sal").as("sum4"), count("sal").as("count5")).show
    

    7. orderBy、sort 相关

    // orderBy
    
    df1.orderBy("sal").show
    df1.orderBy($"sal").show
    df1.orderBy($"sal".asc).show
    df1.orderBy('sal).show
    df1.orderBy(col("sal")).show
    df1.orderBy(df1("sal")).show
    //降序	
    df1.orderBy($"sal".desc).show
    df1.orderBy(-'sal).show
    df1.orderBy(-'deptno, -'sal).show
    
    // sort,以下语句等价
    
    df1.sort("sal").show
    df1.sort($"sal").show
    df1.sort($"sal".asc).show
    df1.sort('sal).show
    df1.sort(col("sal")).show
    df1.sort(df1("sal")).show
    //降序	
    df1.sort($"sal".desc).show
    df1.sort(-'sal).show
    df1.sort(-'deptno, -'sal).show
    

    8. 集合相关(交、并、差)

    // union、unionAll、intersect、except。集合的交、并、差
    val ds3 = ds1.select("sname")
    val ds4 = ds2.select("sname")
    
    // union 求并集,不去重
    ds3.union(ds4).show
    
    // unionAll、union 等价;unionAll过期方法,不建议使用
    ds3.unionAll(ds4).show
    
    // intersect 求交
    ds3.intersect(ds4).show
    
    // except 求差
    ds3.except(ds4).show
    

    9. join 相关(DS在join操作之后变成了DF)

    // 10种join的连接方式(下面有9种,还有一种是笛卡尔积)
    ds1.join(ds2, "sname").show
    ds1.join(ds2, Seq("sname"), "inner").show
    
    ds1.join(ds2, Seq("sname"), "left").show
    ds1.join(ds2, Seq("sname"), "left_outer").show
    
    ds1.join(ds2, Seq("sname"), "right").show
    ds1.join(ds2, Seq("sname"), "right_outer").show
    
    ds1.join(ds2, Seq("sname"), "outer").show
    ds1.join(ds2, Seq("sname"), "full").show
    ds1.join(ds2, Seq("sname"), "full_outer").show
    
    ds1.join(ds2, Seq("sname"), "left_semi").show
    ds1.join(ds2, Seq("sname"), "left_anti").show
    
    备注:DS在join操作之后变成了DF
    val ds1 = spark.range(1, 10)
    val ds2 = spark.range(6, 15)
    // 类似于集合求交
    ds1.join(ds2, Seq("id"), "left_semi").show
    // 类似于集合求差
    ds1.join(ds2, Seq("id"), "left_anti").show
    

    10.空值处理

    // NaN 非法值
    math.sqrt(-1.0); math.sqrt(-1.0).isNaN()
    
    df1.show
    // 删除所有列的空值和NaN
    df1.na.drop.show
    
    // 删除某列的空值和NaN
    df1.na.drop(Array("mgr")).show
    
    // 对全部列填充;对指定单列填充;对指定多列填充
    df1.na.fill(1000).show
    df1.na.fill(1000, Array("comm")).show
    df1.na.fill(Map("mgr"->2000, "comm"->1000)).show
    
    // 对指定的值进行替换
    df1.na.replace("comm" :: "deptno" :: Nil, Map(0 -> 100, 10 -> 100)).show
    
    // 查询空值列或非空值列。isNull、isNotNull为内置函数
    df1.filter("comm is null").show
    df1.filter($"comm".isNull).show
    df1.filter(col("comm").isNull).show
    
    df1.filter("comm is not null").show
    df1.filter(col("comm").isNotNull).show
    

    11. 时间日期函数

    // 各种时间函数
    df1.select(year($"hiredate")).show
    df1.select(weekofyear($"hiredate")).show
    df1.select(minute($"hiredate")).show
    df1.select(date_add($"hiredate", 1), $"hiredate").show
    df1.select(current_date).show
    df1.select(unix_timestamp).show
    
    val df2 = df1.select(unix_timestamp as "unixtime")
    df2.select(from_unixtime($"unixtime")).show
    
    // 计算年龄
    df1.select(round(months_between(current_date, $"hiredate")/12)).show
    

    12. json 数据源 建表

    // 读数据(txt、csv、json、parquet、jdbc)
    val df2 = spark.read.json("data/employees.json")
    df2.show
    // 备注:SparkSQL中支持的json文件,文件内容必须在一行中
    
    // 写文件
    df1.select("ename", "sal").write.format("csv").save("data/t2")
    df1.select("ename", "sal").write
    .option("header", true)
    .format("csv").save("data/t2")
    
    建表:
    val data =spark.read.json("/project/weibo/*.json")
    data.createOrReplaceTempView("t2")
     spark.sql("create table sparkproject.weibo as select * from t2")
     spark.sql("select * from  sparkproject.weibo limit 5").show
    

    13. DF、DS对象上的SQL语句

    // 注册为临时视图。
    // 有两种形式:createOrReplaceTempView / createTempView 
    df1.createOrReplaceTempView("temp1")
    spark.sql("select * from temp1").show
    
    df1.createTempView("temp2")
    spark.sql("select * from temp2").show
    
    // 使用下面的语句可以看见注册的临时表
    spark.catalog.listTables.show	
    
    备注:
    1、spark.sql返回的是DataFrame;
    2、如果TempView已经存在,使用createTempView会报错;
    3、SQL的语法与HQL兼容;
    

    三 窗体函数 (不断地累计计算)

    1. 语法形式

    分析函数的语法结构一般是:

    分析函数名(参数) OVER (PARTITION BY子句 ORDER BY子句 ROWS/RANGE子句)
    
    分析函数名:sum、max、min、count、avg等聚集函数,或lead、lag行比较函数 或 排名函数等;
    over:关键字,表示前面的函数是分析函数,不是普通的集合函数;
    分析子句:over关键字后面挂号内的内容;分析子句又由以下三部分组成:
    partition by:分组子句,表示分析函数的计算范围,不同的组互不相干;
    ORDER BY:排序子句,表示分组后,组内的排序方式;
    ROWS/RANGE:窗口子句,是在分组(PARTITION BY)后,组内的子分组(也称窗口),是分析函数的计算范围窗口。窗口有两种,ROWS和RANGE;
    

    在这里插入图片描述
    spark.sql("""
    SELECT cookieid, createtime, pv,
    SUM(pv) OVER(PARTITION BY cookieid ORDER BY createtime) AS pv1,
    SUM(pv) OVER(PARTITION BY cookieid ORDER BY createtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pv2
    FROM t1
    “”").show

    ROWS BETWEEN,也叫做Window子句
    当同一个select查询中存在多个窗口函数时,他们相互之间是没有影响的.每个窗口函数应用自己的规则;
    
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
    ROWS BETWEEN … AND …(开始到结束,位置不能交换)
    UNBOUNDED PRECEDING:从第一行开始
    CURRENT ROW:当前行
    
    第一行:UNBOUNDED PRECEDING
    最后一行:UNBOUNDED FOLLOWING
    前 n 行:n PRECEDING
    后 n 行:n FOLLOWING
    

    在这里插入图片描述

    2. rows & range

    	range是逻辑窗口,是指定当前行对应值的范围取值,行数不固定,
    只要行值在范围内,对应列都包含在内。
    
    	rows是物理窗口,即根据order by 子句排序后,取的前N行及后N行的数据计算
    (与当前行的值无关,只与排序后的行号相关)。
    “ROWS” 是按照行数进行范围定位的,
    而“RANGE”则是按照值范围进行定位的,
    这两个不同的定位方式 主要用来处理并列排序的情况	前后遇到相同的值的时候会进行累加
    

    在这里插入图片描述

    3.partition by & order by的组合

    1. 同时存在

       spark.sql("""
       SELECT cookieid, createtime, pv,
              SUM(pv) OVER(PARTITION BY cookieid ORDER BY createtime) AS pv1
       FROM t1
       """).show
      

    正常显示
    2) partition by出现

    	spark.sql("""
    	SELECT cookieid, createtime, pv,
    	       SUM(pv) OVER(PARTITION BY cookieid ) AS pv1
    	FROM t1
    	""").show
    

    没有排序仅分组内没有一个个迭代计算
    都是分组的计算值
    3) order by 出现
    只有排序 两两相加再加上,上一个的和(两两是计算机分组的数)
    4) 都没有
    显示总计算值

    4. 排名函数

    row_number() 是没有重复值的排序(即使两行记录相等也是不重复的)
    rank() 是跳跃排序,两个第二名下来就是第四名
    dense_rank() 是连续排序,两个第二名仍然跟着第三名
    NTILE(n),用于将分组数据按照顺序切分成n片

    spark.sql("""
    SELECT cookieid, createtime, pv,
           row_number() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank1,
           rank()       OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank2,
           dense_rank() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank3,
           ntile(3)      OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank4
    FROM t1
    """).show
    

    5. 行函数:lag、lead

    lag(field, N) 取前N行的值
    lead(field, N) 取后N行的值
    注意:取前/后N行的值是当前行往前/后数第n行的值

    first_value,取分组内排序后,截止到当前行,第一个值
    根据组内排序获得第一行的
    last_value,取分组内排序后,截止到当前行,最后一个值
    也就是当前行的值

    spark.sql("""
      select cookieid, createtime, pv,
             lag(pv)    over (PARTITION BY cookieid ORDER BY pv) as col1,
             lag(pv, 1) over (PARTITION BY cookieid ORDER BY pv) as col2,
             lag(pv, 2) over (PARTITION BY cookieid ORDER BY pv) as col3
        from t1
      order by cookieid""").show
    
    spark.sql("""
      select cookieid, createtime, pv,
             lead(pv)    over (PARTITION BY cookieid ORDER BY pv) as col1,
             lead(pv, 1) over (PARTITION BY cookieid ORDER BY pv) as col2,
             lead(pv, 2) over (PARTITION BY cookieid ORDER BY pv) as col3
        from t1
      order by cookieid""").show
    
    spark.sql("""
      select cookieid, createtime, pv,
             lead(pv, -2) over (PARTITION BY cookieid ORDER BY pv) as col1,
             lag(pv, 2)   over (PARTITION BY cookieid ORDER BY pv) as col2
        from t1
      order by cookieid""").show
    

    6. first_value、last_value

    first_value,取分组内排序后,截止到当前行,第一个值
    根据组内排序获得第一行的
    last_value,取分组内排序后,截止到当前行,最后一个值
    也就是当前行的值

    // first_value,取分组内排序后,截止到当前行,第一个值
    spark.sql("""
    SELECT cookieid, createtime, pv,
           row_number()  OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank1,
           first_value(createtime) OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank2,
           first_value(pv) OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank3
    FROM t1
    """).show
    
    // last_value,取分组内排序后,截止到当前行,最后一个值
    spark.sql("""
    SELECT cookieid, createtime, pv,
           row_number()  OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank1,
           last_value(createtime) OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank2,
           last_value(pv) OVER(PARTITION BY cookieid ORDER BY pv desc) AS rank3
    FROM t1
    """).show
    
    // 备注:lag、lead、first_value、last_value 不支持窗口子句
    

    7.合并记录(collect_set,concat_ws,collect_list)

    都是内置的函数
    concat_ws:实现多行记录合并成一行
    collect_set:对记录去重
    collect_list:不对记录去重在这里插入图片描述案例:

    case class UserAddress(userid:String, address:String)
    val userinfo = Seq(UserAddress("a", "address1"), UserAddress("a", "address2"), UserAddress("a", "address2"), UserAddress("b", "address3"), UserAddress("c", "address4"))
    val ds1 = spark.createDataset(userinfo)
    ds1.createOrReplaceTempView("t1")
    
    // SQL语句
    val df2 = spark.sql("""
      select userid, concat_ws(';', collect_set(address))  as addSet,
                     concat_ws(',', collect_list(address)) as addList,
                     collect_list(address) as addSet1,
                     collect_list(address) as addList1
        from t1 
      group by userid
    """)
    df2.printSchema
    df2.show
    
    // DSL语法
    ds1.groupBy($"userid").
       agg(collect_list($"address").alias("address1"), 
            collect_set($"address").alias("address2")).show
    

    8.展开记录(explode)

    在这里插入图片描述

    //合并
    val ds2 = ds1.groupBy($"userid").agg(collect_set($"address") as "address")
    
    // 拆分
    // SQL语句
    ds2.createOrReplaceTempView("t2")
    // explode:将一行中复杂的array或者map结构拆分成多行
    spark.sql("select userid, explode(address) from t2").show
    
    // DSL语法
    ds2.select($"userid", explode($"address")).show
    

    9.left semi & anti join

    半连接:左半连接实现了类似in、exists的查询语义,输出符合条件的左表内容;

    反连接:两表关联,只返回主表的数据,并且只返回主表与子表没关联上的数据,这种连接就叫反连接。反连接一般就是指的 not in 和 not exists;

    // 等价的SQL(左半连接、in、exists)这几个等价
    // 效率最低
    sql("""
      (select id from t1
      intersect
      select id from t2)
      order by id
    """).show
    
    spark.sql("""
      select t1.id 
        from t1 left semi join t2 on (t1.id=t2.id)
    """).show
    
    spark.sql("""
      select t1.id 
        from t1
       where t1.id in (select id from t2)
    """).show
    
    spark.sql("""
      select t1.id 
        from t1
       where exists (select id from t2 where t1.id = t2.id)
    """).show
    备注:三条查询语句使用了相同的执行计划
    

    在这里插入图片描述

    // 等价的SQL(左反连接、in、exists)
    spark.sql("""
      select t1.id 
        from t1 left anti join t2 on (t1.id=t2.id)
    """).show
    
    spark.sql("""
      select t1.id 
        from t1
       where t1.id not in (select id from t2)
    """).show
    
    spark.sql("""
      select t1.id 
        from t1
       where not exists (select id from t2 where t1.id = t2.id)
    """).show
    

    在这里插入图片描述
    备注:
    1、Hive在0.13版本中才实现(not) in/exists功能。在低版本的Hive中只能使用left semi/anti join;
    2、传统的数据库中,历史上exists先执行外层查询,再执行内层查询,与in相反;现在的优化引擎一般情况下能够做出正确的选择;
    3、在数据量小的情况下,三者等同。数据量大推荐使用join,具体还要看执行计划

    四.UDF

    UDF: 自定义函数。函数的输入、输出都是一条数据记录,类似于Spark SQL中普通的数学或字符串函数,从实现上看就是普通的Scala函数;
    为了解决一些复杂的计算,并在SQL函数与Scala函数之间左右逢源
    UDF的参数视为数据表的某个列;
    书写规范:

    1.注册版

    1. import spark.implicits._
    2. def funName(参数:类型)={函数体} //自定义函数
    3. spark.udf.register(“fun1”, funName _ )
      fun1 :是sql中要用的函数
      funName _ :自定义的函数名+空格+下划线
      // 注册函数
      4)val x=spark.sql(“select id, fun1(colname) from tbName ”)

    2.非注册版

    1. import org.apache.spark.sql.functions._
      import spark.implicits._
    2. val fun2=udf((参数:类型,length:Int)=>参数.length>length)
    3. val getData=DataFrame类型数据.filter(fun2($ ”参数”,lit(10)))
      $ : 可以接收的数据会当成Column对象($符号来包裹一个字符串表示一个Column)
      当不用注册时要有udf包住自定义函数—>udf函数

    3. 案例

    import org.apache.spark.sql.{Row, SparkSession}
    
    object UDFDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("UDFDemo")
          .master("local[*]")
          .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
    
        val data = List(("scala", "author1"), ("spark", "author2"), ("hadoop", "author3"), ("hive", "author4"), ("strom", "author5"), ("kafka", "author6"))
        val df = spark.createDataFrame(data).toDF("title", "author")
        df.createTempView("books")
    
        // 定义函数并注册
        def len1(bookTitle: String):Int = bookTitle.length
        spark.udf.register("len1", len1 _)
        // UDF可以在select语句、where语句等多处使用
        spark.sql("select title, author, len1(title) from books").show
        spark.sql("select title, author from books where len1(title)>5").show
    
        // UDF可以在DataFrame、Dataset的API中使用
        import spark.implicits._
        df.filter("len1(title)>5").show
        // 不能通过编译
        //df.filter(len1($"title")>5).show
        // 能通过编译,但不能执行
        //df.select("len1(title)").show
        // 不能通过编译
        //df.select(len1($"title")).show
    
        // 如果要在DSL语法中使用$符号包裹字符串表示一个Column,需要用udf方法来接收函数。这种函数无需注册
        import org.apache.spark.sql.functions._
        val len2 = udf((bookTitle: String) => bookTitle.length)
        df.filter(len2($"title")>5).show
        df.select(len2($"title")).show
    
        // 不使用UDF
        df.map{case Row(title: String, author: String) => (title, author, title.length)}.show
    
        spark.stop()
      }
    }
    

    五. UDAF

    UDAF :用户自定义聚合函数。函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作(多条数据输入,一条数据输出);类似于在group by之后使用的sum、avg等函数
    在这里插入图片描述
    在这里插入图片描述

    abstract class UserDefinedAggregateFunction extends Serializable{
    def inputSchema : StructType
    //inputSchema用于定义与DataFrame列有关的输入样式
    
    def bufferSchema : StructType
    //bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema;
    
    def dataType : DataFrame
    //dataType标明了UDAF函数的返回值类型;
    
    def deterministic : Boolean
    //deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果;
    
    def initialize ( buffer : MutableAggregationBuffer) : Unit
    //initialize对聚合运算中间结果的初始化;
    
    def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
    //update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始;
    UDAF的核心计算都发生在update函数中;
    update函数的第二个参数input: Row对应的并非DataFrame的行,
    而是被inputSchema投影了的行;
    
    def merge (buffer1 : MutableAggregationBuffer , buffer2 : Row):Unit
    //merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中;
    
    def evluate ( buffer :Row ): Any                       
    //evaluate函数完成对聚合Buffer值的运算,得到最终的结果
     }
    

    普通的UDF不支持数据的聚合运算。如当要对销售数据执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。
    书写UDAF 先继承UserDefinedAggregateFunction接口
    在重写他的方法
    def update ( buffer : MutableAggregationBuffer , input :Row) :Unit
    // UDAF的核心计算都发生在update函数中。
    // 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
    // update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
    // update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
    // 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期

    案例

    class YearOnYearBasis extends UserDefinedAggregateFunction {
      // UDAF与DataFrame列有关的输入样式
      override def inputSchema: StructType 
      				= new StructType()
    				  .add("sales", DoubleType)
    				  .add("saledate", StringType)
    
      // UDAF函数的返回值类型
      override def dataType: DataType = DoubleType
    
      // 缓存中间结果
      override def bufferSchema: StructType 
      					= new StructType()
      					.add("year2014", DoubleType)
      					.add("year2015", DoubleType)
    
      // 布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。通常用true
      override def deterministic: Boolean = true
    
      // initialize就是对聚合运算中间结果的初始化
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0
        buffer(1) = 0.0
      }
    
      // UDAF的核心计算都发生在update函数中。
      // 扫描每行数据,都会调用一次update,输入buffer(缓存中间结果)、input(这一行的输入值)
      // update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始
      // update函数的第二个参数input: Row对应的是被inputSchema投影了的行。
      // 本例中每一个input就应该只有两个Field的值,input(0)代表销量,input(1)代表销售日期
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {	  
        val salenumber = input.getAs[Double](0)
        input.getString(1).take(4) match {
          case "2014" => buffer(0) = buffer.getAs[Double](0) + salenumber
          case "2015" => buffer(1) = buffer.getAs[Double](1) + salenumber
          case _ => println("ERROR!")
        }
      }
    
      // 合并两个分区的buffer1、buffer2,将最终结果保存在buffer1中
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
        buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
      }
    
      // 取出buffer(缓存的值)进行运算,得到最终结果
      override def evaluate(buffer: Row): Double = {
        println(s"evaluate : ${buffer.getDouble(0)}, ${buffer.getDouble(1)}")
        if (buffer.getDouble(0) == 0.0) 0.0
        else (buffer.getDouble(1) - buffer.getDouble(0)) / buffer.getDouble(0)
      }
    }
    
    object UDAFDemo {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)
        val spark = SparkSession.builder()
          .appName(s"${this.getClass.getCanonicalName}")
          .master("local[*]")
          .getOrCreate()
    
        val sales = Seq(
          (1, "Widget Co",        1000.00, 0.00,    "AZ", "2014-01-02"),
          (2, "Acme Widgets",     2000.00, 500.00,  "CA", "2014-02-01"),
          (3, "Widgetry",         1000.00, 200.00,  "CA", "2015-01-11"),
          (4, "Widgets R Us",     2000.00, 0.0,     "CA", "2015-02-19"),
          (5, "Ye Olde Widgete",  3000.00, 0.0,     "MA", "2015-02-28") )
    
        val salesDF = spark.createDataFrame(sales).toDF("id", "name", "sales", "discount", "state", "saleDate")
        salesDF.createTempView("sales")
    
        val yearOnYear = new YearOnYearBasis
        spark.udf.register("yearOnYear", yearOnYear)
        spark.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales").show()
    
        spark.stop()
      }
    }
    

    在这里插入图片描述

    六. 从MySQL读取数据

    // 读取数据库中的数据
    val jdbcDF = spark.read.format("jdbc").
    				option("url", "jdbc:mysql://localhost:3306/spark").
    				option("driver","com.mysql.jdbc.Driver").
    				option("dbtable", "student").
    				option("user", "hive").
    				option("password", "hive").load()
    jdbcDF.show
    jdbcDF.printSchema
    

    备注:
    1、将jdbc驱动拷贝到$SPARK_HOME/jars目录下,是最简单的做法;
    2、明白每一个参数的意思,一个参数不对整个结果出不来;
    3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。

    在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

    展开全文
  • Spark SQL分批入库

    2018-04-13 12:46:58
    List<row> list= spark.sql(sql).collectAsList(),获或者其他方法将数据存在List里面,然后就list转为 Dataset分批入库
  • Spark SQL Introduction

    2018-07-01 17:02:17
    关于spark sql的英文讲义,通过讲义的学习,可以对spark sql有一定的了解
  • Baidu基于Spark SQL构建即席查询平台
  • Spark SQL源码概览.zip

    2020-01-12 17:22:15
    Spark SQL源码概览.zip Spark SQL源码概览.zip Spark SQL源码概览.zip Spark SQL源码概览.zipSpark SQL源码概览.zip
  • the basics of Spark SQL and its role in Spark applications. After the initial familiarization with Spark SQL, we will focus on using Spark SQL to execute tasks that are common to all big data projects
  • hive表已经创建好了,详见: hive实例:搜狗用户搜索日志 配置: 1. 把core-site.xml和hive-site.xml复制到spark的conf目录下 ...2. 把mysql-connector-java-5.1.47-bin.jar复制到spark的jars目录...

     

    hive表已经创建好了,详见: hive实例:搜狗用户搜索日志

     

    配置:

    1. 把core-site.xml和hive-site.xml复制到spark的conf目录下

    core-site.xml在hadoop的配置目录下,hive-site.xml在hive的配置目录下

    2. 把mysql-connector-java-5.1.47-bin.jar复制到spark的jars目录

    3. 修改spark的hive-site.xml

    /spark/conf/hive-site.xml,加上

    <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop01:9083</value>

    </property>

    <property>

    <name>hive.metastore.schema.verification</name>

    <value>false</value>

    </property>

    解释:

    hive.metastore.uris:sparksql 连接到这里,这里是hive的metastore,用于获取hive表

    另一个是禁用metastore的版本检测

    4.开启hive的metastore元数据库

    spark sql想要使用hive的表,还需要hive开启metastore

    hive --service metastore &

    启动后放后台就可以,供spark sql使用

    或者用hive --service metastore,没研究他们的区别,好像一样

     

     

    如果启动metastore时遇到版本不一致的错误

    MetaException(message:Hive Schema version 2.3.0 does not match metastore's schema version 1.2.0 Metastore is not upgraded or corrupt)

    就修改hive的配置文件/hive/conf/hive-site.xml,

    也加上禁用metastore的版本检测即可

    <property>

    <name>hive.metastore.schema.verification</name>

    <value>false</value>

    </property>

     

     

     

    下面的常用操作还可以参考一下官方文档:

    http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

     

    使用:

    启动spark shell:

    [root@hadoop01 ~]# spark-shell

    scala>

     

    测试:

    (1)查看hive的元数据仓库metastore有啥数据库

    scala> spark.sql("show databases").show

    +------------+

    |databaseName|

    +------------+

    | default|

    | hive|

    +------------+

    scala>

     

    (2)查看hive的元数据仓库metastore有啥表

    scala> val priors = spark.sql("show tables")

    priors: org.apache.spark.sql.DataFrame = [database: string, tableName: string ... 1 more field]

    scala> priors.show

    +--------+---------+-----------+

    |database|tableName|isTemporary|

    +--------+---------+-----------+

    | default| sogou| false|

    +--------+---------+-----------+

    scala>

     

    (3)查看sogou表有多少条数据

    scala> val x = spark.sql("select count(*) from sogou")

    x: org.apache.spark.sql.DataFrame = [count(1): bigint]

    scala> x.show

    +--------+

    |count(1)|

    +--------+

    | 10000|

    +--------+

    scala>

     

    (4)新建一张表

    scala> spark.sql("create table if not exists people(id int,name string)")

    res4: org.apache.spark.sql.DataFrame = []

    scala> spark.sql("show tables").show()

    +--------+---------+-----------+

    |database|tableName|isTemporary|

    +--------+---------+-----------+

    | default| people| false|

    | default| sogou| false|

    +--------+---------+-----------+

    scala>

     

    (5)插入数据

    scala> spark.sql("insert into people values (1,'a'),(2,'b'),(3,'c')")

    19/05/28 17:29:48 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!

    res7: org.apache.spark.sql.DataFrame = []

    scala> spark.sql("select * from people").show()

    +---+----+

    | id|name|

    +---+----+

    | 1| a|

    | 2| b|

    | 3| c|

    +---+----+

    scala>

     

     

    (6)导入本地数据

    本地文件/test/age.txt为:

    a,18

    b,24

    c,22

    新建一个表,以逗号分割(sql语句参考hive):

    spark.sql("create table if not exists people_age(name string,age int) row format delimited fields terminated by ','")

    导入(sql语句参考hive):

    scala> spark.sql("load data local inpath '/test/age.txt' into table people_age")

    19/05/28 23:20:51 ERROR KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!

    res10: org.apache.spark.sql.DataFrame = []

    scala> spark.sql("select * from people_age").show

    +----+---+

    |name|age|

    +----+---+

    | a| 18|

    | b| 24|

    | c| 22|

    +----+---+

    scala>

     

    (7)连表查询,筛选并倒序输出

    scala> spark.sql("select people.id,people.name,people_age.age from people join people_age on people.name=people_age.name where people_age.age>18 order by age desc").show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 2| b| 24|

    | 3| c| 22|

    +---+----+---+

    scala>

    如果想要设置只输出n条,再加上limit n

     

    (8)保存为表,存在本地

    val resultDF = spark.sql("select people.id,people.name,people_age.age from people join people_age on people.name=people_age.name where people_age.age>18 order by age")

    保存为永久表:

    resultDF.write.saveAsTable("people_result")

    scala> spark.sql("show tables").show

    +--------+-------------+-----------+

    |database| tableName|isTemporary|

    +--------+-------------+-----------+

    | default| people| false|

    | default| people_age| false|

    | default|people_result| false|

    | default| sogou| false|

    +--------+-------------+-----------+

     

    可以直接用spark的table方法加载已经保存的永久表:

    scala> spark.table("people_result").show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 3| c| 22|

    | 2| b| 24|

    +---+----+---+

    scala>

     

     

     

    以上都是使用sql语句,也可以使用dsl语句,还有rdd的相关操作

    首先创建一个dataframe:

    scala> val df = spark.sql("select * from people_result")

    df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    或者:

    scala> val df = spark.table("people_result")

    df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    scala> df.show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 3| c| 22|

    | 2| b| 24|

    +---+----+---+

    dsl语句

    (1)查询、筛选、过滤

    scala> df.select("name").show //df.select("*").show

    +----+

    |name|

    +----+

    | c|

    | b|

    +----+

    scala> df.where("id=2").show //df.select("name").where("id=2").show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 2| b| 24|

    +---+----+---+

    scala> df.filter("age<23").show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 3| c| 22|

    +---+----+---+

    (3)排序

    scala> df.sort("id").show //df.sort($"id".desc).show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 2| b| 24|

    | 3| c| 22|

    +---+----+---+

    (4)toDF新建dataframe,toDF方法可以指定列名

    scala> val df_new = df.toDF

    df_new: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

    scala> df_new.show

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 3| c| 22|

    | 2| b| 24|

    +---+----+---+

    toDF方法可以指定列名

    scala> val df_new = df.toDF("newTD","newName","newAge")

    df_new: org.apache.spark.sql.DataFrame = [newTD: int, newName: string ... 1 more field]

    scala> df_new.show

    +-----+-------+------+

    |newTD|newName|newAge|

    +-----+-------+------+

    | 3| c| 22|

    | 2| b| 24|

    +-----+-------+------+

    scala>

     

     

    rdd操作:

    dataframe也是rdd,是对rdd的封装,相当于rdd+表头schema

    collect、count、cache、filter、sort、join、sample等 rdd操作都是可用的

    scala> df.collect

    res35: Array[org.apache.spark.sql.Row] = Array([3,c,22], [2,b,24])

    scala> df.take(2)

    res36: Array[org.apache.spark.sql.Row] = Array([3,c,22], [2,b,24])

    scala> df.first

    res38: org.apache.spark.sql.Row = [3,c,22]

    scala> df.count

    res39: Long = 2

    scala> df.cache

    res40: df.type = [id: int, name: string ... 1 more field]

    scala> df.sample(0.5).show //df.sample(false,0.5).show false代表不放回抽样

    +---+----+---+

    | id|name|age|

    +---+----+---+

    | 2| b| 24|

    +---+----+---+

     

    Dataframe转换为RDD:

    val rdd1 = df.rdd

     

     

     

    spark自带的操作hive例子在:

    examples/src/main/scala/org/apache/spark/examples/sql/

    SparkHiveExample.scala

    展开全文
  • Spark调优 | Spark SQL参数调优

    万次阅读 2019-07-26 09:45:29
    Spark SQL里面有很多的参数,而且这些参数在Spark官网中没有明确的解释,可能是太多了吧,可以通过在spark-sql中使用set -v 命令显示当前spark-sql版本支持的参数。 本文讲解最近关于在参与hive往spark迁移过程中...

    前言
    Spark SQL里面有很多的参数,而且这些参数在Spark官网中没有明确的解释,可能是太多了吧,可以通过在spark-sql中使用set -v 命令显示当前spark-sql版本支持的参数。

    本文讲解最近关于在参与hive往spark迁移过程中遇到的一些参数相关问题的调优。

    内容分为两部分,第一部分讲遇到异常,从而需要通过设置参数来解决的调优;第二部分讲用于提升性能而进行的调优。

    异常调优
    spark.sql.hive.convertMetastoreParquet
    parquet是一种列式存储格式,可以用于spark-sql 和hive 的存储格式。在spark中,如果使用using parquet的形式创建表,则创建的是spark 的DataSource表;而如果使用stored as parquet则创建的是hive表。

    spark.sql.hive.convertMetastoreParquet默认设置是true, 它代表使用spark-sql内置的parquet的reader和writer(即进行反序列化和序列化),它具有更好地性能,如果设置为false,则代表使用 Hive的序列化方式。

    但是有时候当其设置为true时,会出现使用hive查询表有数据,而使用spark查询为空的情况.

    但是,有些情况下在将spark.sql.hive.convertMetastoreParquet设为false,可能发生以下异常(spark-2.3.2)。

    java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
        at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36)
    

    这是因为在其为false时候,是使用hive-metastore使用的元数据进行读取数据,而如果此表是使用spark sql DataSource创建的parquet表,其数据类型可能出现不一致的情况,例如通过metaStore读取到的是IntWritable类型,其创建了一个WritableIntObjectInspector用来解析数据,而实际上value是LongWritable类型,因此出现了类型转换异常。

    与该参数相关的一个参数是spark.sql.hive.convertMetastoreParquet.mergeSchema, 如果也是true,那么将会尝试合并各个parquet 文件的schema,以使得产生一个兼容所有parquet文件的schema。

    spark.sql.files.ignoreMissingFiles && spark.sql.files.ignoreCorruptFiles
    这两个参数是只有在进行spark DataSource 表查询的时候才有效,如果是对hive表进行操作是无效的。

    在进行spark DataSource 表查询时候,可能会遇到非分区表中的文件缺失/corrupt 或者分区表分区路径下的文件缺失/corrupt 异常,这时候加这两个参数会忽略这两个异常,这两个参数默认都是false,建议在线上可以都设为true.

    其源码逻辑如下,简单描述就是如果遇到FileNotFoundException, 如果设置了ignoreMissingFiles=true则忽略异常,否则抛出异常;如果不是FileNotFoundException 而是IOException(FileNotFoundException的父类)或者RuntimeException,则认为文件损坏,如果设置了ignoreCorruptFiles=true则忽略异常。

    catch {
    case e: FileNotFoundException if ignoreMissingFiles =>
      logWarning(s"Skipped missing file: $currentFile", e)
      finished = true
      null
    // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
    case e: FileNotFoundException if !ignoreMissingFiles => throw e
    case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
      logWarning(
      s"Skipped the rest of the content in the corrupted file: $currentFile", e)
      finished = true
      null
      }
    

    spark.sql.hive.verifyPartitionPath
    上面的两个参数在分区表情况下是针对分区路径存在的情况下,分区路径下面的文件不存在或者损坏的处理。而有另一种情况就是这个分区路径都不存在了。这时候异常信息如下:

    java.io.FileNotFoundException: File does not exist: hdfs://hz-cluster10/user/da_haitao/da_hivesrc/haitao_dev_log/integ_browse_app_dt/day=2019-06-25/os=Android/000067_0
    

    而spark.sql.hive.verifyPartitionPath参数默认是false,当设置为true的时候会在获得分区路径时对分区路径是否存在做一个校验,过滤掉不存在的分区路径,这样就会避免上面的错误。

    spark.files.ignoreCorruptFiles && spark.files.ignoreMissingFiles
    这两个参数和上面的spark.sql.files.ignoreCorruptFiles很像,但是区别是很大的。在spark进行DataSource表查询时候spark.sq.files.*才会生效,而spark如果查询的是一张hive表,其会走HadoopRDD这条执行路线。

    所以就会出现,即使你设置了spark.sql.files.ignoreMissingFiles的情况下,仍然报FileNotFoundException的情况,异常栈如下, 可以看到这里面走到了HadoopRDD,而且后面是org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrappe可见是查询一张hive表。

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 107052 in stage 914.0 failed 4 times, most recent failure: Lost task 107052.3 in stage 914.0 (TID 387381, hadoop2698.jd.163.org, executor 266): java.io.FileNotFoundException: File does not exist: hdfs://hz-cluster10/user/da_haitao/da_hivesrc/haitao_dev_log/integ_browse_app_dt/day=2019-06-25/os=Android/000067_0
            at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
            at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
            at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
            at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
            at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
            at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:371)
            at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:252)
            at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:99)
            at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
            at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
            at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
    

    此时可以将spark.files.ignoreCorruptFiles && spark.files.ignoreMissingFiles设为true,其代码逻辑和上面的spark.sql.file.*逻辑没明显区别,此处不再赘述。

    性能调优

    除了遇到异常需要被动调整参数之外,我们还可以主动调整参数从而对性能进行调优。

    spark.hadoopRDD.ignoreEmptySplits
    默认是false,如果是true,则会忽略那些空的splits,减小task的数量。

    spark.hadoop.mapreduce.input.fileinputformat.split.minsize
    是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task.

    spark.sql.autoBroadcastJoinThreshold && spark.sql.broadcastTimeout
    用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力,而broadcastTimeout是用于控制Broadcast的Future的超时时间,默认是300s,可根据需求进行调整。

    spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
    该参数是用于开启spark的自适应执行,这是spark比较老版本的自适应执行,后面的targetPostShuffleInputSize是用于控制之后的shuffle 阶段的平均输入数据大小,防止产生过多的task。

    intel大数据团队开发的adaptive-execution相较于目前spark的ae更加实用,该特性也已经加入到社区3.0之后的roadMap中,令人期待。

    spark.sql.parquet.mergeSchema
    默认false。当设为true,parquet会聚合所有parquet文件的schema,否则是直接读取parquet summary文件,或者在没有parquet summary文件时候随机选择一个文件的schema作为最终的schema。

    spark.sql.files.opencostInBytes
    该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。

    spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
    1或者2,默认是1. MapReduce-4815 详细介绍了 fileoutputcommitter 的原理,实践中设置了 version=2 的比默认 version=1 的减少了70%以上的 commit 时间,但是1更健壮,能处理一些情况下的异常。

    Spark SQL 参数表(spark-2.3.2)

    keyvaluemeaning
    spark.sql.adaptive.enabledTRUEWhen true, enable adaptive query execution.
    spark.sql.adaptive.shuffle.targetPostShuffleInputSize67108864bThe target post-shuffle input size in bytes of a task.
    spark.sql.autoBroadcastJoinThreshold209715200Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscanhas been run, and file-based data source tables where the statistics are computed directly on the files of data.
    spark.sql.broadcastTimeout300000msTimeout in seconds for the broadcast wait time in broadcast joins.
    spark.sql.cbo.enabledFALSEEnables CBO for estimation of plan statistics when set true.
    spark.sql.cbo.joinReorder.dp.star.filterFALSEApplies star-join filter heuristics to cost based join enumeration.
    spark.sql.cbo.joinReorder.dp.threshold12The maximum number of joined nodes allowed in the dynamic programming algorithm.
    spark.sql.cbo.joinReorder.enabledFALSEEnables join reorder in CBO.
    spark.sql.cbo.starSchemaDetectionFALSEWhen true, it enables join reordering based on star schema detection.
    spark.sql.columnNameOfCorruptRecord_corrupt_recordThe name of internal column for storing raw/un-parsed JSON and CSV records that fail to parse.
    spark.sql.crossJoin.enabledTRUEWhen false, we will throw an error if a query contains a cartesian product without explicit CROSS JOIN syntax.
    spark.sql.execution.arrow.enabledFALSEWhen true, make use of Apache Arrow for columnar data transfers. Currently available for use with pyspark.sql.DataFrame.toPandas, and pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. The following data types are unsupported: BinaryType, MapType, ArrayType of TimestampType, and nested StructType.
    spark.sql.execution.arrow.maxRecordsPerBatch10000When using Apache Arrow, limit the maximum number of records that can be written to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.
    spark.sql.extensionsName of the class used to configure Spark Session extensions. The class should implement Function1[SparkSessionExtension, Unit], and must have a no-args constructor.
    spark.sql.files.ignoreCorruptFilesFALSEWhether to ignore corrupt files. If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned.
    spark.sql.files.ignoreMissingFilesFALSEWhether to ignore missing files. If true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned.
    spark.sql.files.maxPartitionBytes134217728The maximum number of bytes to pack into a single partition when reading files.
    spark.sql.files.maxRecordsPerFile0Maximum number of records to write out to a single file. If this value is zero or negative, there is no limit.
    spark.sql.function.concatBinaryAsStringFALSEWhen this option is set to false and all inputs are binary,functions.concat returns an output as binary. Otherwise, it returns as a string.
    spark.sql.function.eltOutputAsStringFALSEWhen this option is set to false and all inputs are binary, elt returns an output as binary. Otherwise, it returns as a string.
    spark.sql.groupByAliasesTRUEWhen true, aliases in a select list can be used in group by clauses. When false, an analysis exception is thrown in the case.
    spark.sql.groupByOrdinalTRUEWhen true, the ordinal numbers in group by clauses are treated as the position in the select list. When false, the ordinal numbers are ignored.
    spark.sql.hive.caseSensitiveInferenceModeINFER_AND_SAVESets the action to take when a case-sensitive schema cannot be read from a Hive table’s properties. Although Spark SQL itself is not case-sensitive, Hive compatible file formats such as Parquet are. Spark SQL must use a case-preserving schema when querying any table backed by files containing case-sensitive field names or queries may not return accurate results. Valid options include INFER_AND_SAVE (the default mode– infer the case-sensitive schema from the underlying data files and write it back to the table properties), INFER_ONLY (infer the schema but don’t attempt to write it to the table properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).
    spark.sql.hive.convertMetastoreParquetTRUEWhen set to true, the built-in Parquet reader and writer are used to process parquet tables created by using the HiveQL syntax, instead of Hive serde.
    spark.sql.hive.convertMetastoreParquet.mergeSchemaFALSEWhen true, also tries to merge possibly different but compatible Parquet schemas in different Parquet data files. This configuration is only effective when “spark.sql.hive.convertMetastoreParquet” is true.
    spark.sql.hive.filesourcePartitionFileCacheSize262144000When nonzero, enable caching of partition file metadata in memory. All tables share a cache that can use up to specified num bytes for file metadata. This conf only has an effect when hive filesource partition management is enabled.
    spark.sql.hive.manageFilesourcePartitionsTRUEWhen true, enable metastore partition management for file source tables as well. This includes both datasource and converted Hive tables. When partition management is enabled, datasource tables store partition in the Hive metastore, and use the metastore to prune partitions during query planning.
    spark.sql.hive.metastore.barrierPrefixesA comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*).
    spark.sql.hive.metastore.jarsbuiltinLocation of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options: “ 1. “builtin” Use Hive 1.2.1, which is bundled with the Spark assembly when -Phive is enabled. When this option is chosen, spark.sql.hive.metastore.versionmust be either 1.2.1 or not defined. 2. “maven” Use Hive jars of specified version downloaded from Maven repositories. 3. A classpath in the standard format for both Hive and Hadoop.
    spark.sql.hive.metastore.sharedPrefixescom.mysql.jdbc,A comma separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j.
    org.postgresql,
    com.microsoft.sqlserver,
    oracle.jdbc
    spark.sql.hive.metastore.version1.2.1Version of the Hive metastore. Available options are0.12.0 through 2.1.1.
    spark.sql.hive.metastorePartitionPruningTRUEWhen true, some predicates will be pushed down into the Hive metastore so that unmatching partitions can be eliminated earlier. This only affects Hive tables not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC for more information).
    spark.sql.hive.thriftServer.asyncTRUEWhen set to true, Hive Thrift server executes SQL queries in an asynchronous way.
    spark.sql.hive.thriftServer.singleSessionFALSEWhen set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.
    spark.sql.hive.verifyPartitionPathFALSEWhen true, check all the partition paths under the table’s root directory when reading data stored in HDFS.
    spark.sql.hive.version1.2.1deprecated, please use spark.sql.hive.metastore.version to get the Hive version in Spark.
    spark.sql.inMemoryColumnarStorage.batchSize10000Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
    spark.sql.inMemoryColumnarStorage.compressedTRUEWhen set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data.
    spark.sql.inMemoryColumnarStorage.enableVectorizedReaderTRUEEnables vectorized reader for columnar caching.
    spark.sql.optimizer.metadataOnlyTRUEWhen true, enable the metadata-only query optimization that use the table’s metadata to produce the partition columns instead of table scans. It applies when all the columns scanned are partition columns and the query has an aggregate operator that satisfies distinct semantics.
    spark.sql.orc.compression.codecsnappySets the compression codec used when writing ORC files. If either compression or orc.compress is specified in the table-specific options/properties, the precedence would be compression, orc.compress,spark.sql.orc.compression.codec.Acceptable values include: none, uncompressed, snappy, zlib, lzo.
    spark.sql.orc.enableVectorizedReaderTRUEEnables vectorized orc decoding.
    spark.sql.orc.filterPushdownFALSEWhen true, enable filter pushdown for ORC files.
    spark.sql.orderByOrdinalTRUEWhen true, the ordinal numbers are treated as the position in the select list. When false, the ordinal numbers in order/sort by clause are ignored.
    spark.sql.parquet.binaryAsStringFALSESome other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
    spark.sql.parquet.compression.codecsnappySets the compression codec used when writing Parquet files. If either compression or parquet.compression is specified in the table-specific options/properties, the precedence would be compression,parquet.compression, spark.sql.parquet.compression.codec. Acceptable values include: none, uncompressed, snappy, gzip, lzo.
    spark.sql.parquet.enableVectorizedReaderTRUEEnables vectorized parquet decoding.
    spark.sql.parquet.filterPushdownTRUEEnables Parquet filter push-down optimization when set to true.
    spark.sql.parquet.int64AsTimestampMillisFALSE(Deprecated since Spark 2.3, please set spark.sql.parquet.outputTimestampType.) When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the extended type. In this mode, the microsecond portion of the timestamp value will betruncated.
    spark.sql.parquet.int96AsTimestampTRUESome Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
    spark.sql.parquet.int96TimestampConversionFALSEThis controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive & Spark.
    spark.sql.parquet.mergeSchemaFALSEWhen true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available.
    spark.sql.parquet.outputTimestampTypeINT96Sets which Parquet timestamp type to use when Spark writes data to Parquet files. INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value.
    spark.sql.parquet.recordLevelFilter.enabledFALSEIf true, enables Parquet’s native record-level filtering using the pushed down filters. This configuration only has an effect when ‘spark.sql.parquet.filterPushdown’ is enabled.
    spark.sql.parquet.respectSummaryFilesFALSEWhen true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn’t be enabled before knowing what it means exactly.
    spark.sql.parquet.writeLegacyFormatFALSEWhether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior versions, when converting Parquet schema to Spark SQL schema and vice versa.
    spark.sql.parser.quotedRegexColumnNamesFALSEWhen true, quoted Identifiers (using backticks) in SELECT statement are interpreted as regular expressions.
    spark.sql.pivotMaxValues10000When doing a pivot without specifying values for the pivot column this is the maximum number of (distinct) values that will be collected without error.
    spark.sql.queryExecutionListenersList of class names implementing QueryExecutionListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.
    spark.sql.redaction.options.regex(?i)urlRegex to decide which keys in a Spark SQL command’s options map contain sensitive information. The values of options whose names that match this regex will be redacted in the explain output. This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex.
    spark.sql.redaction.string.regexRegex to decide which parts of strings produced by Spark contain sensitive information. When this regex matches a string part, that string part is replaced by a dummy value. This is currently used to redact the output of SQL explain commands. When this conf is not set, the value fromspark.redaction.string.regex is used.
    spark.sql.session.timeZoneAsia/ShanghaiThe ID of session local timezone, e.g. “GMT”, “America/Los_Angeles”, etc.
    spark.sql.shuffle.partitions4096The default number of partitions to use when shuffling data for joins or aggregations.
    spark.sql.sources.bucketing.enabledTRUEWhen false, we will treat bucketed table as normal table
    spark.sql.sources.defaultparquetThe default data source to use in input/output.
    spark.sql.sources.parallelPartitionDiscovery.threshold32The maximum number of paths allowed for listing files at driver side. If the number of detected paths exceeds this value during partition discovery, it tries to list the files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and LibSVM data sources.
    spark.sql.sources.partitionColumnTypeInference.enabledTRUEWhen true, automatically infer the data types for partitioned columns.
    spark.sql.sources.partitionOverwriteModeSTATICWhen INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn’t delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn’t affect Hive serde tables, as they are always overwritten with dynamic mode.
    spark.sql.statistics.fallBackToHdfsTRUEIf the table statistics are not available from table metadata enable fall back to hdfs. This is useful in determining if a table is small enough to use auto broadcast joins.
    spark.sql.statistics.histogram.enabledFALSEGenerates histograms when computing column statistics if enabled. Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histogram. Note that collecting histograms takes extra cost. For example, collecting column statistics usually takes only one table scan, but generating equi-height histogram will cause an extra table scan.
    spark.sql.statistics.size.autoUpdate.enabledFALSEEnables automatic update for table size once table’s data is changed. Note that if the total number of files of the table is very large, this can be expensive and slow down data change commands.
    spark.sql.streaming.checkpointLocationThe default location for storing checkpoint data for streaming queries.
    spark.sql.streaming.metricsEnabledFALSEWhether Dropwizard/Codahale metrics will be reported for active streaming queries.
    spark.sql.streaming.numRecentProgressUpdates100The number of progress updates to retain for a streaming query
    spark.sql.thriftserver.scheduler.poolSet a Fair Scheduler pool for a JDBC client session.
    spark.sql.thriftserver.ui.retainedSessions200The number of SQL client sessions kept in the JDBC/ODBC web UI history.
    spark.sql.thriftserver.ui.retainedStatements200The number of SQL statements kept in the JDBC/ODBC web UI history.
    spark.sql.ui.retainedExecutions1000Number of executions to retain in the Spark UI.
    spark.sql.variable.substituteTRUEThis enables substitution using syntax like ${var} ${system:var} and ${env:var}.
    spark.sql.warehouse.dir/user/warehouseThe default location for managed databases and tables.
    展开全文
  • Spark SQL 2.3.0:深入浅出,看了下,还行,希望对大家有帮助
  • spark sql的练习题

    千次阅读 2020-04-18 16:51:31
    2.2、统计出姓“王”男生和女生的各有多少人 3、请使用Structured Streaming读取department_info文件夹写的csv文件 3.1统计出各个院系的分别多少条信息 4、请使用spark sql读取student_score文件夹写的csv文件 4.1、...

    1、使用Structured Streaming读取Socket数据,把单词和单词的反转组成 json 格式写入到当前目录中的file文件夹中
    2、请使用Structured Streaming读取student_info文件夹写的csv文件,
    2.1、统计出文件中的男女生各有多少人
    2.2、统计出姓“王”男生和女生的各有多少人
    3、请使用Structured Streaming读取department_info文件夹写的csv文件
    3.1统计出各个院系的分别多少条信息
    4、请使用spark sql读取student_score文件夹写的csv文件
    4.1、统计出每个班级的最高分数
    4.2、统计出男生最高分
    4.3、统计出女生最高分
    4.4、分别统计出男生和女生的分数前三名
    4.5、统计出分数在500分以上的人数
    4.7、统计出分数在300分以下的人中男女各占多少
    5.请使用Spark sql读取class_info文件夹写的csv文件
    5.1、统计出哪个院系的专业最多
    5.2、统计出计算机学院中有多少专业
    5.3、统计出经济管理学院的会计和工商管理的班级数
    5.4、分别统计出每个学院的班级最多的专业
    5.5、统计出班级编号以2开头的所有的专业名称
    以下是sparksql的练习题
    表(一)Student (学生表)
    属性名 数据类型 可否为空 含 义
    Sno varchar (20) 否 学号
    Sname varchar (20) 否 学生姓名
    Ssex varchar (20) 否 学生性别
    Sbirthday datetime 可 学生出生年月
    Class varchar (20) 可 学生所在班级
    表(二)Course(课程表)
    属性名 数据类型 可否为空 含 义
    Cno varchar (20) 否 课程号
    Cname varchar (20) 否 课程名称
    Tno varchar (20) 否 教工编号
    表(三)Score(成绩表)
    属性名 数据类型 可否为空 含 义
    Sno varchar (20) 否 学号
    Cno varchar (20) 否 课程号
    Degree Decimal(4,1) 可 成绩
    表(四)Teacher(教师表)
    属性名 数据类型 可否为空 含 义
    Tno varchar (20) 否 教工编号
    Tname varchar (20) 否 教工姓名
    Tsex varchar (20) 否 教工性别
    Tbirthday datetime 可 教工出生年月
    Prof varchar (20) 可 职称
    Depart varchar (20) 否 教工所在部门
    表1-2数据库中的数据
    表(一)Student
    Sno Sname Ssex Sbirthday class
    108 丘东 男 1977-09-01 95033
    105 匡明 男 1975-10-02 95031
    107 王丽 女 1976-01-23 95033
    101 李军 男 1976-02-20 95033
    109 王芳 女 1975-02-10 95031
    103 陆君 男 1974-06-03 95031
    表(二)Course
    Cno Cname Tno
    3-105 计算机导论 825
    3-245 操作系统 804
    6-166 数字电路 856
    9-888 高等数学 831
    表(三)Score
    Sno Cno Degree
    103 3-245 86
    105 3-245 75
    109 3-245 68
    103 3-105 92
    105 3-105 88
    109 3-105 76
    101 3-105 64
    107 3-105 91
    108 3-105 78
    101 6-166 85
    107 6-166 79
    108 6-166 81
    表(四)Teacher
    Tno Tname Tsex Tbirthday Prof Depart
    804 李诚 男 1958-12-02 副教授 计算机系
    856 张旭 男 1969-03-12 讲师 电子工程系
    825 王萍 女 1972-05-05 助教 计算机系
    831 刘冰 女 1977-08-14 助教 电子工程系
    6、查询Student表中“95031”班或性别为“女”的同学记录。
    7、以Class降序,升序查询Student表的所有记录。
    8、以Cno升序、Degree降序查询Score表的所有记录。
    9、查询“95031”班的学生。
    10、查询Score表中的最高分的学生学号和课程号。(子查询或者排序)
    11、查询每门课的平均成绩。
    12、查询Score表中至少有5名学生选修的并以3开头的课程的平均分数。
    13、查询分数大于70,小于90的Sno列。
    14、查询所有学生的Sname、Cno和Degree列。
    15、查询所有学生的Sno、Cname和Degree列。
    16、查询所有学生的Sname、Cname和Degree列。
    17、查询“95033”班学生的平均分。
    18、查询所有选修“计算机导论”课程的“女”同学的成绩表。
    19、查询选修“3-105”课程的成绩高于“109”号同学成绩的所有同学的记录。
    20、查询score中选学多门课程的同学中分数为非最高分成绩的记录。
    21、查询成绩高于学号为“109”、课程号为“3-105”的成绩的所有记录。
    22、查询和学号为105的同学同年出生的所有学生的Sno、Sname和Sbirthday列。
    23、查询“张旭“教师任课的学生成绩
    24、查询选修某课程的同学人数多于4人的教师姓名。
    25、查询95033班和95031班全体学生的记录。
    26、查询存在有85分以上成绩的课程Cno.
    27、查询出“计算机系“教师所教课程的成绩表。
    28、查询“计算机系”与“电子工程系“不同职称的教师的Tname和Prof。
    29、查询选修编号为“3-105“课程且成绩至少高于选修编号为“3-245”的同学的Cno、Sno和Degree,并按Degree从高到低次序排序。
    30、查询选修编号为“3-105”且成绩高于选修编号为“3-245”课程的同学的Cno、Sno和Degree.
    31、查询所有教师和同学的name、sex和birthday.
    32、查询所有“女”教师和“女”同学的name、sex和birthday.
    33、查询成绩比该课程平均成绩低的同学的成绩表。
    34、查询所有任课教师的Tname和Depart.
    35、查询所有未讲课的教师的Tname和Depart.
    36、查询至少有2名男生的班号。
    37、查询Student表中不姓“王”的同学记录。
    38、查询Student表中每个学生的姓名和年龄。将函数运用到spark sql中去计算,可以直接拿String的类型计算不需要再转换成数值型 默认是会转换成Double类型计算浮点型转整型
    39、查询Student表中最大和最小的Sbirthday日期值。 时间格式最大值,最小值
    40、以班号和年龄从大到小的顺序查询Student表中的全部记录。 查询结果排序
    41、查询“男”教师及其所上的课程。
    42、查询最高分同学的Sno、Cno和Degree列。
    43、查询和“李军”同性别的所有同学的Sname.
    44、查询和“李军”同性别并同班的同学Sname.
    45、查询所有选修“计算机导论”课程的“男”同学的成绩表。
    46、查询Student表中的所有记录的Sname、Ssex和Class列。
    47、查询教师所有的单位即不重复的Depart列。
    48、查询Student表的所有记录
    49、查询Score表中成绩在60到80之间的所有记录。
    50、查询Score表中成绩为85,86或88的记录。
    代码:

    11、使用Structured Streaming读取Socket数据,把单词和单词的反转组成 json 格式写入到当前目录中的file文件夹中
    object day {
      //1、请使用Structured Streaming读取Socket数据,统计出每个单词的个数
      def main(args: Array[String]): Unit = {
        //1.创建SparkSession
        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("day1").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        val frame: DataFrame = spark.readStream
          .option("host", "hadoop01")
          .option("port", 9999)
          .format("socket")
          .load()
        import  spark.implicits._
        val dataDS: Dataset[String] = frame.as[String]
        val wordDF = dataDS.flatMap(_.split(" "))
          .map({ x => (x, x.reverse) }).toDF("before", "reverse")
        //wordDF.show()
        //输出数据
        wordDF.writeStream
          .format("json")
          .option("path","F:\\第四学期的大数据资料\\day02四月份资料\\第二周\\day05\\4.16号练习题50道2.0\\file")
          .option("checkpointLocation","json")//必须指定 checkpoint 目录,否则报错
          .trigger(Trigger.ProcessingTime(0))
          .start()
          .awaitTermination()
      }
    }
    2、请使用Structured Streaming读取student_info文件夹写的csv文件,
    2.1、统计出文件中的男女生各有多少人
    2.2、统计出姓“王”男生和女生的各有多少人
    object day2 {
      def main(args: Array[String]): Unit = {
        //创建SparkSession
        val spark: SparkSession =
        SparkSession.builder().master("local[*]").appName("day2").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        //准备数据结构
        val Schema: StructType = new StructType()
          .add("id", "integer")
          .add("name", "string")
          .add("sex", "string")
          .add("classs", "string")
          .add("date", "string")
        //接受数据
        import spark.implicits._
        val dataDF: DataFrame =
          spark.readStream.schema(Schema).csv("F:\\第四学期的大数据资料\\day02四月份资料\\第二周\\day05\\4.16号练习题50道2.0\\student_info")
        // 根据业务处理和计算数据
        val result: Dataset[Row] =
        //2.1、统计出文件中的男女生各有多少人
          dataDF.selectExpr("sex").groupBy("sex").count().sort($"count".desc)
        //2.2、统计出姓“王”男生和女生的各有多少人
        dataDF.select("name", "sex").where("name like '%王%'")
          .groupBy("sex").count().sort($"count".desc)
        //输出数据
        result.writeStream
          .format("console")
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime(0))
          .start()
          .awaitTermination()
      }
    }
    3、请使用Structured Streaming读取department_info文件夹写的csv文件
    3.1统计出各个院系的分别多少条信息
    def main(args: Array[String]): Unit = {
        //创建SparkSession
        val spark: SparkSession =
          SparkSession.builder().master("local[*]").appName("day2").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        //准备数据结构
        val Schema: StructType = new StructType()
          .add("id", "integer")
          .add("name", "string")
        //接受数据
        import spark.implicits._
        val dataDF: DataFrame =
          spark.readStream.schema(Schema).csv("F:\\第四学期的大数据资料\\day02四月份资料\\第二周\\day05\\4.16号练习题50道2.0\\department_info")
        // 根据业务处理和计算数据
        val result: Dataset[Row] =
        //3.1统计出各个院系的分别多少条信息
          dataDF.select("name")
            .groupBy("name").count().sort($"count".desc)
        //输出数据
        result.writeStream
          .format("console")
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime(0))
          .start()
          .awaitTermination()
      }
    4、请使用spark sql读取student_score文件夹写的csv文件
    4.1、统计出每个班级的最高分数
    4.2、统计出男生最高分
    4.3、统计出女生最高分
    4.4、分别统计出男生和女生的分数前三名
    4.5、统计出分数在500分以上的人数
    4.7、统计出分数在300分以下的人中男女各占多少
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("day04").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        val frame: DataFrame = spark.read.csv("F:\\第四学期的大数据资料\\day02四月份资料\\第二周\\day05\\4.16号练习题50道2.0\\student_score")
        import spark.implicits._
        //将RDD转成toDF
        val personDF: DataFrame = frame.toDF("id", "name", "sex", "classs", "score")
        //打印数据
        personDF.createOrReplaceTempView("student")
        //4.1、统计出每个班级的最高分数
        spark.sql("select classs,max(score) from student group by classs ").show()
        //4.2、统计出男生最高分
        spark.sql("select sex,max(score) from student where sex = '男' group by sex").show()
        //4.3、统计出女生最高分
        spark.sql("select sex,max(score) from student where sex = '女' group by sex").show()
        //4.4、分别统计出男生和女生的分数前三名
        spark.sql("select name,classs,score,sex,row_number() over(partition by sex order by score desc) rk from student having rk<=3 ").show()
        //4.5、统计出分数在500分以上的人数
        spark.sql("select count(score) from student where score > 500 ").show()
        //4.7、统计出分数在300分以下的人中男女各占多少
        spark.sql("select sex,count(sex) from (select * from student where score<300)  group by sex").show()
      }
    5.请使用Spark sql读取class_info文件夹写的csv文件
    5.1、统计出哪个院系的专业最多
    5.2、统计出计算机学院中有多少专业
    5.3、统计出经济管理学院的会计和工商管理的班级数
    5.4、分别统计出每个学院的班级最多的专业
    5.5、统计出班级编号以2开头的所有的专业名称
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("day05").getOrCreate()
        val sc: SparkContext = spark.sparkContext
        sc.setLogLevel("WARN")
        val frame: DataFrame = spark.read.csv("F:\\第四学期的大数据资料\\day02四月份资料\\第二周\\day05\\4.16号练习题50道2.0\\class_info")
        import spark.implicits._
        //将RDD转成toDF
        val personDF: DataFrame = frame.toDF("id", "name", "date", "classs")
        //打印数据
        personDF.createOrReplaceTempView("classI")
        //5.1、统计出哪个院系的专业最多
        spark.sql("select classs,max(name) from classI group by classs").show()
        //5.2、统计出计算机学院中有多少专业
         spark.sql("select count(name) from classI where classs ='计算机学院'").show()
        //5.3、统计出经济管理学院的会计和工商管理的班级数
        spark.sql("select subStr(name,0,2),count(*) from classI where classs ='经济管理学院' and name like '会计%' or name like '工商管理%' group by subStr(name,0,2)").show()
        //5.4、分别统计出每个学院的班级最多的专业
        spark.sql("select max(name),classs from classI group by classs").show()
        //5.5、统计出班级编号以2开头的所有的专业名称
        spark.sql("select id,name from classI where id like '_2%'").show()
      }
    以下是sparksql的练习题
    package demo02
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object day006 {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("day006").master("local").getOrCreate()
        import spark.implicits._
        val jdbcDF1 = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8")
          .option("dbtable", "student")
          .option("user", "root")
          .option("password", "root")
          .load()
    
        val frame1: DataFrame = jdbcDF1.toDF()
        frame1.createOrReplaceTempView("student")
    
        val jdbcDF2 = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8")
          .option("dbtable", "Score")
          .option("user", "root")
          .option("password", "root")
          .load()
        val frame2: DataFrame = jdbcDF2.toDF()
        frame2.createOrReplaceTempView("Score")
    
        val jdbcDF3 = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8")
          .option("dbtable", "Course")
          .option("user", "root")
          .option("password", "root")
          .load()
        val frame3: DataFrame = jdbcDF3.toDF()
        frame3.createOrReplaceTempView("Course")
    
        val jdbcDF4 = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8")
          .option("dbtable", "teacher")
          .option("user", "root")
          .option("password", "root")
          .load()
        val frame4: DataFrame = jdbcDF4.toDF()
        frame4.createOrReplaceTempView("teacher")
    
        //6、查询Student表中“95031”班或性别为“女”的同学记录。
         spark.sql("select * from student where  Class = 95031 and Ssex ='女'").show()
        //7、以Class降序,升序查询Student表的所有记录。
         spark.sql("select * from student  order by Class desc").show()
        // 8、以Cno升序、Degree降序查询Score表的所有记录。
          spark.sql("select * from Score order by Cno asc,Degree desc").show()
        //9、查询“95031”班的学生。
         spark.sql("select * from student where Class = 95031").show()
        //    10、查询Score表中的最高分的学生学号和课程号。(子查询或者排序)
         spark.sql("select SNO,CNO from Score where Degree=(select MAX(Degree) from Score)").show()
        //    11、查询每门课的平均成绩。
         spark.sql("select Cno,AVG(Degree)  from Score group by Cno").show()
        //    12、查询Score表中至少有5名学生选修的并以3开头的课程的平均分数。
        spark.sql("select AVG(Degree ) from Score where Cno like '3%' group by Cno having COUNT(Cno)>5").show()
        //    13、查询分数大于70,小于90的Sno列。
         spark.sql("select sno from Score where Degree between 70 and 90").show()
        //    14、查询所有学生的Sname、Cno和Degree列。
         spark.sql("select Sname,Cno,Degree from student join Score on student.Sno=Score.Sno").show()
        //    15、查询所有学生的Sno、Cname和Degree列。
        spark.sql("select Sno,Cname,degree from Score join Course on Course.Cno=Score.Cno").show()
        //    16、查询所有学生的Sname、Cname和Degree列。
        spark.sql("select student.Sname,Cname,degree from student join Score on student.Sno=Score.Sno join Course on Course.Cno=Score.Cno").show()
        //    17、查询“95033”班学生的平均分。
        spark.sql("select AVG(Degree) from Score,student where student.Sno=Score.Sno and Class='95033'").show()
        //    18、查询所有选修“计算机导论”课程的“女”同学的成绩表。
        spark.sql("select Sno,Degree from Score where Sno in (select Sno from student where Ssex='女') and Cno in (select Cno from Course where Cname='计算机导论')").show()
        //    19、查询选修“3-105”课程的成绩高于“109”号同学成绩的所有同学的记录。
        spark.sql("select * from student,Score where Score.Cno='3-105' and student.Sno=Score.Sno and Score.Degree>(select Degree from Score where Cno='3-105' and Sno='109')").show()
        //    20、查询score中选学多门课程的同学中分数为非最高分成绩的记录。
         spark.sql("select * from Score a where Degree <(select MAX(degree) from Score b where a.Cno=b.Cno) and Sno in(select Sno from Score group by Sno having count(*)>1)").show()
        //    21、查询成绩高于学号为“109”、课程号为“3-105”的成绩的所有记录。
        spark.sql("select * from student,Score where student.Sno=Score.Sno and Score.Degree>(select Degree from Score where Cno='3-105' and Sno='109')").show()
        //    22、查询和学号为105的同学同年出生的所有学生的Sno、Sname和Sbirthday列。
        spark.sql("select Sno,Sname,Sbirthday from student where year(student.Sbirthday)=(select year(Sbirthday) from student where Sno='105')").show()
        //    23、查询“张旭“教师任课的学生成绩
        spark.sql("select Degree from Score,Teacher,Course where Teacher.Tname='张旭' and Teacher.Tno=Course.Tno and Course.Cno=Score.Cno").show()
        //    24、查询选修某课程的同学人数多于4人的教师姓名。
        spark.sql("select Tname from Teacher where Tno in (select Tno from Course where Cno in (select Cno from Score group by Cno having COUNT(*)>4))").show()
        //    25、查询95033班和95031班全体学生的记录。
         spark.sql("select * from student where Class='95033' or Class='95031'").show()
        //    26、查询存在有85分以上成绩的课程Cno.
         spark.sql("select distinct cno from Score where Degree>85").show()
        //    27、查询出“计算机系“教师所教课程的成绩表。
        spark.sql("select sno,Cno ,Degree from Score where Cno in (select Cno from Course where Tno in (select tno from Teacher where Depart='计算机系'))").show()
        //    28、查询“计算机系”与“电子工程系“不同职称的教师的Tname和Prof。
        spark.sql("select Tname,Prof from Teacher a where Prof not in(select Prof from Teacher b where a.Depart!=b.Depart)").show()
        //    29、查询选修编号为“3-105“课程且成绩至少高于选修编号为“3-245”的同学的Cno、Sno和Degree,并按Degree从高到低次序排序。
        spark.sql("select Cno,Sno,Degree from Score a where (select Degree from Score b where Cno='3-105' and b.Sno=a.Sno)>=(select Degree from Score c where Cno='3-245' and c.Sno=a.Sno) order by Degree desc").show()
        //    30、查询选修编号为“3-105”且成绩高于选修编号为“3-245”课程的同学的Cno、Sno和Degree.
        spark.sql("select Cno,Sno,Degree from Score a where (select Degree from Score b where Cno='3-105' and b.Sno=a.Sno)>(select Degree from Score c where Cno='3-245' and c.Sno=a.Sno)").show()
        //    31、查询所有教师和同学的name、sex和birthday.
        spark.sql("select distinct Sname as name,Ssex as sex,Sbirthday as birthday from student union select distinct Tname as name,Tsex as sex,Tbirthady as birthday from Teacher").show()
        //    32、查询所有“女”教师和“女”同学的name、sex和birthday.
        spark.sql("select distinct Sname as name,Ssex as sex,Sbirthday as birthday from student where Ssex='女' union select distinct Tname as name,Tsex as sex,Tbirthady as birthday from Teacher where Tsex='女'").show()
        //    33、查询成绩比该课程平均成绩低的同学的成绩表。
        spark.sql("select Sno,Cno,Degree from Score a where a.Degree<(select AVG(Degree) from Score b where a.Cno=b.Cno)").show()
        //    34、查询所有任课教师的Tname和Depart.
        spark.sql("select Tname,Depart from Teacher where tno in (select tno from course where Cno in (select distinct Cno from Score))").show()
        //    35、查询所有未讲课的教师的Tname和Depart.
        spark.sql("select Tname,Depart from Teacher where Tname not in (select distinct Tname from Teacher,Course,Score where Teacher.Tno=Course.Tno and Course.Cno=Score.Cno)").show()
        //    36、查询至少有2名男生的班号。
        spark.sql("select Class FROM student where Ssex='男' group by Class having COUNT(*)>1").show()
        //    37、查询Student表中不姓“王”的同学记录。
         spark.sql("select * from student where Sname not like '王%'").show()
        //    38、查询Student表中每个学生的姓名和年龄。将函数运用到spark sql中去计算,可以直接拿String的类型计算不需要再转换成数值型 默认是会转换成Double类型计算浮点型转整型
         spark.sql("select Sname,YEAR(GETDATE())-year(Sbirthday) from student").show()
        //    39、查询Student表中最大和最小的Sbirthday日期值。 时间格式最大值,最小值
       spark.sql("select MAX(Sbirthday) as 最大,MIN(Sbirthday) as 最小 from student").show()
        //    40、以班号和年龄从大到小的顺序查询Student表中的全部记录。 查询结果排序
         spark.sql("select * from student order by Class desc,Sbirthday asc").show()
        //    41、查询“男”教师及其所上的课程。
         spark.sql("select Tname,Cname from Teacher,Course where Tsex='男' and Teacher.Tno=Course.Tno").show()
        //    42、查询最高分同学的Sno、Cno和Degree列。
        spark.sql("select Sno,Cno,Degree from Score where degree=(select MAX(Degree)from Score)").show()
        //    43、查询和“李军”同性别的所有同学的Sname.
         spark.sql("select Sname from student where Ssex=(select Ssex from student where Sname='李军') and Sname not in ('李军')").show()
        //    44、查询和“李军”同性别并同班的同学Sname.
        spark.sql("select Sname from student where Ssex=(select Ssex from student where Sname='李军') and Sname not in ('李军') and Class=(select Class from student where Sname='李军')").show()
        //    45、查询所有选修“计算机导论”课程的“男”同学的成绩表。
         spark.sql("select Sno,Degree from Score where Sno in (select Sno from student where Ssex='男') and Cno in (select Cno from Course where Cname='计算机导论')").show()
        //    46、查询Student表中的所有记录的Sname、Ssex和Class列。
        spark.sql("select Sname,Ssex,Class from student").show()
        //    47、查询教师所有的单位即不重复的Depart列。
         spark.sql("select distinct Depart from Teacher").show()
        //    48、查询Student表的所有记录
        spark.sql("select * from student").show()
        //    49、查询Score表中成绩在60到80之间的所有记录。
        spark.sql("select * from Score where Degree between 60 and 80").show()
        //    50、查询Score表中成绩为85,86或88的记录。
        spark.sql("select * from Score where Degree in (85,86,88)").show()
      }
    }
    
    
    展开全文
  • pyspark系列5-Spark SQL介绍

    千次阅读 2021-04-29 14:05:41
    文章目录一.Spark SQL的概述1.1 Spark SQL 来源1.2 从代码看Spark SQL的特点1.3 从代码运行速度看来看Spark SQL二.Spark SQL数据抽象2.1 DataFrame2.2 Dataset三.Spark SQL 操作数据库3.1 Spark SQL操作Hive数据库...
  • spark sql——6. spark sql操作hbase

    千次阅读 2019-05-30 23:10:22
    在hbase建一张表,使用spark sql操作它 参考: https://blog.csdn.net/eyeofeagle/article/details/84571756 https://blog.csdn.net/eyeofeagle/article/details/89943913 hbase数据准备: 进入hbase shell...
  • Spark SQL详解

    千次阅读 2018-09-26 09:06:17
    熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业...
  • Spark SQL 结构化数据文件处理 详解

    千次阅读 2020-05-25 11:58:35
    Spark SQLSpark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API...
  • spark sql 例子

    万次阅读 2017-05-31 14:23:35
    该文主要展示的是spark sql 例子 (内容是找了份oracle的例子,翻译成spark sql的) 1、需要准备好四张表,既四个文本文件逗号分隔 2、为这四张表创建好schema,并注册成表 3、时间处理有小部分改动
  • Spark SQL作用及其架构

    千次阅读 2018-05-03 22:58:59
    对于Spark SQL的学习:本文首先会介绍Spark SQL的产生背景,知道了产生背景我们开始学习Spark SQL的作用和特点,最后介绍其原理。下一篇文章会介绍如何使用Spark SQL。 官网地址 2 Spark SQL产生背景 ...
  • Spark Sql编程

    千次阅读 2020-01-03 19:36:57
    第1章 Spark SQL概述 1.1 什么是Spark SQL Spark SQLSpark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。 我们已经学习了Hive,它是将Hive SQL转换...
  • spark sql——5. spark sql操作mysql表

    千次阅读 2019-05-30 23:08:48
    目标: 1.jdbc到mysql,读mysql的表并load成dataframe 2.对dataframe执行dsl、sql语句 ...spark自带的案例在: /examples/src/.../sql/SQLDataSourceExample.scala jar包: jdbc的jar包为mysql...
  • Spark SQL-概述

    千次阅读 2019-11-20 21:39:01
    1.什么是Spark SQL Spark SQLSpark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。 对比Hive,它是将Hive SQL转换成MapReduce然后提交到集群上...
  • Spark SQL 大数据处理

    千次阅读 2018-03-03 16:26:57
    InfoQ 上有学者对 Spark 的大数据处理,做了一些归纳演讲 我尝试着对这些演讲做翻译,加入了一些...Big Data Processing with Apache Spark - Part 2 : Spark SQL https://www.infoq.com/articles/apache-spark...
  • Spark SQL 自适应执行优化引擎

    千次阅读 2020-02-22 17:13:21
    在本篇文章中,笔者将给大家带来 Spark SQL 中关于自适应执行引擎(Spark Adaptive Execution)的内容。在之前的文章中,笔者介绍过 Flink SQL,目前 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 100,734
精华内容 40,293
关键字:

sparksql