精华内容
参与话题
问答
  • 一 ,知识 : 1 ,json 数据集 : 理论 Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json() 去加载一个 Dataset[String] 或者一个 JSON 文件 ...

    一 ,知识 :

    1 ,json 数据集 : 理论

    1. Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row].
    2. 可以通过 SparkSession.read.json() 去加载一个 Dataset[String] 或者一个 JSON 文件
    3. json 文件 :
    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    
    1. 支持数据类型 :
      1 ,基本类型 :Int, String, …
      2 ,样例类
    2. 导入 : 隐式转换
      import spark.implicits._

    2 ,json 数据集 : 操作

    1. 读文件 :
      val peopleDF = spark.read.json(path)
    2. 打印元数据信息 :
      peopleDF.printSchema()
    3. 建表 :
      peopleDF.createOrReplaceTempView(“people”)
    4. 查询 :
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    

    3 ,jdbc 数据集 : 理论

    1. 可读 :可以从数据库读数据。
    2. 可写 : 能把数据写出到关系型数据库。
    3. 将关系型数据的驱动包,放在 spark 的驱动路径下。
      cd /export/servers/spark-2.3.1-bin-hadoop2.6/jars

    4 ,jdbc 数据集 : 操作

    1. 拿数据 :
    // Loading data from a JDBC source
    val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://master01:3306/rdd").option("dbtable", " rddtable").option("user", "root").option("password", "hive").load()
    
    1. 拿数据 : 另一种方式
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "hive")
    val jdbcDF2 = spark.read.jdbc("jdbc:mysql://master01:3306/rdd", "rddtable", connectionProperties)
    
    1. 存数据 :
    jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://master01:3306/rdd").option("dbtable", "rddtable2").option("user", "root").option("password", "hive").save()
    
    1. 存数据 2 :
    jdbcDF2.write.jdbc("jdbc:mysql://master01:3306/mysql", "db", connectionProperties)
    

    5 ,sparkSql 运行架构 :

    1. Spark SQL 运行架构 :
      1 ,Spark SQL 对 SQL 语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。
      2 ,Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。
      3 ,Spark SQL 由Core、Catalyst、Hive、Hive-ThriftServer 四部分构成:
    2. Core : 负责处理数据的输入和输出,如获取数据,查询结果输出成 DataFrame 等
    3. Catalyst : 负责处理整个查询过程,包括解析、绑定、优化等
    4. Hive : 负责对 Hive 数据进行处理
    5. Hive-ThriftServer : 主要用于对 hive 的访问

    6 ,TreeNode :

    1. 逻辑计划、表达式等都可以用tree来表示,它只是在内存中维护,并不会进行磁盘的持久化,分析器和优化器对树的修改只是替换已有节点。
    2. TreeNode有2个直接子类,QueryPlan和Expression。QueryPlam下又有LogicalPlan和SparkPlan. Expression是表达式体系,不需要执行引擎计算而是可以直接处理或者计算的节点,包括投影操作,操作符运算等

    7 ,Rule & RuleExecutor

    Rule 就是指对逻辑计划要应用的规则,以到达绑定和优化。他的实现类就是 RuleExecutor。优化器和分析器都需要继承 RuleExecutor。每一个子类中都会定义 Batch、Once、FixPoint . 其中每一个Batch 代表着一套规则,Once 表示对树进行一次操作,FixPoint 表示对树进行多次的迭代操作。RuleExecutor 内部提供一个 Seq[Batch]属性,里面定义的是RuleExecutor的处理逻辑,具体的处理逻辑由具体的Rule子类实现。

    在这里插入图片描述
    在这里插入图片描述

    8 , Spark SQL 运行原理

    1. 使用 SessionCatalog 保存元数据
      在解析SQL语句之前,会创建SparkSession,或者如果是2.0之前的版本初始化SQLContext,SparkSession只是封装了SparkContext和SQLContext的创建而已。会把元数据保存在SessionCatalog中,涉及到表名,字段名称和字段类型。创建临时表或者视图,其实就会往SessionCatalog注册
    2. 解析SQL,使用ANTLR生成未绑定的逻辑计划
      当调用SparkSession的sql或者SQLContext的sql方法,我们以2.0为准,就会使用SparkSqlParser进行解析SQL. 使用的ANTLR进行词法解析和语法解析。它分为2个步骤来生成Unresolved LogicalPlan:
    # 词法分析:Lexical Analysis,负责将token分组成符号类
    # 构建一个分析树或者语法树AST
    
    1. 使用分析器Analyzer绑定逻辑计划
      在该阶段,Analyzer会使用Analyzer Rules,并结合SessionCatalog,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。
    2. 使用优化器Optimizer优化逻辑计划
      优化器也是会定义一套Rules,利用这些Rule对逻辑计划和Exepression进行迭代处理,从而使得树的节点进行和并和优化
    3. 使用SparkPlanner生成物理计划
      SparkSpanner使用Planning Strategies,对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan.
    4. 使用QueryExecution执行物理计划
      此时调用SparkPlan的execute方法,底层其实已经再触发JOB了,然后返回RDD

    二 ,SparkSQL 实战

    1 ,数据 : 货品交易数据集

    1. 时间表,商品详情,订单表
    2. 表关系 :
      每个订单可能包含多个货品,每个订单可以产生多次交易,不同的货品有不同的单价。
      在这里插入图片描述

    2 ,加载数据 : tbStock ( 样例类,读文件,转 DS )

    1. 定义样例类 :
      case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
    2. 读文件 :
      val tbStockRdd = spark.sparkContext.textFile(“tbStock.txt”)
    3. 把数据转换成 DS :
      val tbStockDS = tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
    4. 得到表 : tbStockDS.show()

    3 ,加载数据 :tbStockDetail ( 样例类,读文件,转 DS )

    case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable
    val tbStockDetailRdd = spark.sparkContext.textFile("tbStockDetail.txt")
    val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr=> tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
    tbStockDetailDS.show()
    

    4 ,加载数据 :tbDate ( 样例类,读文件,转 DS )

    case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int) extends Serializable
    val tbDateRdd = spark.sparkContext.textFile("tbDate.txt")
    val tbDateDS = tbDateRdd.map(_.split(",")).map(attr=> tbDate(attr(0),attr(1).trim().toInt, attr(2).trim().toInt,attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
    tbDateDS.show()
    

    5 ,注册表 :

    tbStockDS.createOrReplaceTempView("tbStock")
    tbDateDS.createOrReplaceTempView("tbDate")
    tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
    

    6 ,计算所有订单中每年的销售单数、销售总额

    1. 思路 :
      三个表连接后以count(distinct a.ordernumber)计销售单数,sum(b.amount)计销售总额
    2. 订单数 :
    spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show
    
    1. 解析 :
      1 ,每年 : GROUP BY
      2 ,三表联查 : JOIN
      3 ,总数 : COUNT(DISTINCT a.ordernumber)
      4 ,总额 : SUM(b.amount)

    7 ,计算所有订单每年最大金额订单的销售额

    1. 每天每个订单的销售总额 :
    spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show
    
    1. 用上一步的结果作为基础表,与实践表联查,得到每年,每个订单的最大额
    spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show
    

    8 ,计算所有订单中每年最畅销货品 :

    没得最多,就最畅销 ( amount 最高,就最畅销 )
    在这里插入图片描述

    1. 每年,每个货品,卖出去多少 :
    spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show
    
    1. 每年,卖的最多的货品 : ( 得到:年,货品总额 )
    spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show
    
    1. join 到商品 id :
      用最大销售额和统计好的每个货品的销售额join,以及用年join,集合得到最畅销货品那一行信息
    spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show
    
    展开全文
  • 1 SparkSQL概述 Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块 1.1 Spark的由来 Hive是早期唯一运行在Hadoop上的SQL-on-Hadoop工具 => 之后又出现:Drill、Impala、Shark Spark的前身是...

    1 SparkSQL概述

    image-20201130202515060

    Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块

    1.1 Spark的由来

    Hive是早期唯一运行在Hadoop上的SQL-on-Hadoop工具 => 之后又出现:Drill、Impala、Shark

    Spark的前身是Shark,Shark使得SQL-on-Hadoop的性能比Hive提高了10-100倍。

    SparkSQL抛弃了Shark的代码,汲取了Shark的一些优点,性能得到极大提升:

    • 数据兼容方面:Spark不仅兼容Hive,还可以从RDD、parquet文件、JSON文件获取数据。
    • 性能优化方面:除了采用In-Memory Columnar Storage、byte-code generation等优化技术外、还将引进Cost Model对查询进行动态评估、获取最佳物理计划等。
    • 组件拓展方面:SQL语法解析器、分析器还有优化器都可以重新定义、扩展。

    Spark为了简化RDD的开发,提高编程开发效率,提供了两个编程抽象,类似于SparkCore种的RDD

    • DataFrame
    • DataSet

    1.2 SparkSQL特点

    • 易整合:整合了SQL查询和Spark编程

    • 统一的数据访问:使用相同的方式连接不同的数据源

    • 兼容Hive:在已有的仓库上直接运行SQL或HiveQL

    • 标准数据连接:通过JDBC或者ODBC连接

    1.3 DataFrame

    DataFrame是一种以RDD为基础的分布式数据集,类似于传统数库中的二维表格。

    Dataframe与RDD的区别在于:前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

    • DataFrame底层其实就是一种DataSet[Row]的DataSet

    1.4 DataSet

    DataSet是分布式数据集合,是DataFrame的扩展,是一个强类型集合。

    就相当于一个封装的对象,对象的属性就是DataSet的结构。

    1.5 RDD & DataFrame & DataSet三者关系

    • 1 三者出现的时间顺序:

      Spark1.0 => RDD

      Spark1.3 => DataFrame

      Spark1.6 => DataSet

    • 三者的共性

      • RDD、DataFrame、DataSet全是spark平台下的分布式弹性数据集
      • 三者都有惰性机制,在创建、转换不会立即执行,只有遇到action算子才会执行
      • 三者有许多共同的函数:如filter、排序等
      • 对DataFrame和DataSet操作都需要import spark.implicits._,在创建好的SparkSession对象后导入
      • 三者都有partition的概念
      • 三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心内存溢出
      • DataFrame和DataSet均可以使用模式匹配获取各个字段的值和类型
    • a、DataFrame是在RDD上进行扩展,将数据增加了结构信息
      b、DataSet是在DataFrame的基础上进行扩展,增加数据的类型。
      c、DataFrame是DataSet的一个特例,即为数据类型ROW的DataSet

    三者相互转换:

    image-20201130224642706

    2 SparkSQL核心编程

    在SparkCore中如果执行应用程序需要先构建上下文环境对象SparkContext;

    SparkSQL可以理解为对SparkCore的封装,不仅仅在RDD模型上进行了封装,上下文环境也进行了封装。

    2.1 SparkSession

    SparkSession内部封装了SparkContext,所以实际的计算是由SparkContext完成的。当我们使用spark-shell的时候,spark框架内部会创建一个名叫spark的SparkSession对象;就像前面的sc的SparkContext对象一样

    image-20201130205503752

    2.2 DataFrame

    ①创建DataFrame

    方式1:通过Spark数据源创建

    步骤1:打开spark-shell.cmd,查看Spark支持创建文件的数据源格式

    image-20201130134516576

    步骤2:在spark的bin/data目录中创建user.json文件

    {"id":1, "name":"Tom", "age":24 }
    {"id":2, "name":"Jerry", "age":34 }
    {"id":3, "name":"Mike", "age":54 }
    

    步骤3:读取json文件创建DataFrame

    scala> spark.read.json("data/user.json")
    res0: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
    

    步骤4:展示结果:

    scala> spark.read.json("data/user.json").show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 24|  1|  Tom|
    | 34|  2|Jerry|
    | 54|  3| Mike|
    +---+---+-----+
    

    方式2:从一个存在的RDD进行转换

    方式3:从Hive Table查询返回

    ②SQL语法

    SQL语法风格指查询数据的时候使用SQL语句查询,必须要有临时视图或者全局视图来辅助。

    步骤1:读取JSON文件创建DataFrame

    scala> val df = spark.read.json("data/user.json")
    

    步骤2:对DataFrame创建一个临时表

    scala> df.createOrReplaceTempView("people")
    

    步骤3:通过SQL语句实现查询全表

    scala> spark.sql("select * from people").show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 24|  1|  Tom|
    | 34|  2|Jerry|
    | 54|  3| Mike|
    +---+---+-----+
    //也可以将查询结果赋值给一个变量
    scala> val sqlDF = spark.sql("select * from people")
    sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
    
    scala> sqlDF.show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 24|  1|  Tom|
    | 34|  2|Jerry|
    | 54|  3| Mike|
    +---+---+-----+
    

    注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。

    注意:使用全局临时表需要全路径访问,如:global_temp.people

    步骤4:对DataFrame创建一个全局表

    scala> df.createOrReplaceGlobalTempView("people1")
    

    步骤5:通过SQL语句实现查询全表

    scala> spark.sql("select * from global_temp.people1").show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 24|  1|  Tom|
    | 34|  2|Jerry|
    | 54|  3| Mike|
    +---+---+-----+
    

    步骤6:创建一个新的SparkSession查找表

    scala> spark.newSession().sql("select * from global_temp.people1").show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 24|  1|  Tom|
    | 34|  2|Jerry|
    | 54|  3| Mike|
    +---+---+-----+
    

    ③DSL语法

    DataFrame提供了一个特定领域语言(domain-specific language, DSL)管理结构化的数据,可以在Scala,Java等中使用DSL,使用DSL就不用创建临时视图了。

    步骤1:创建一个DataFrame

    scala> val df = spark.read.json("data/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
    

    步骤2:查看DataFrame的Schema信息

    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
    

    步骤3:只查看“name”列的信息

    scala> df.select("name").show()
    +-----+
    | name|
    +-----+
    |  Tom|
    |Jerry|
    | Mike|
    +-----+
    

    步骤4:查看“username”列数据以及“age+1”数据

    涉及到运算的时候,每列都必须使用$,或者采用引号表达式:单引号+字段名

    scala> df.select($"age"+1).show
    scala> df.select('age+1).show
    +---------+
    |(age + 1)|
    +---------+
    |       25|
    |       35|
    |       55|
    +---------+
    

    步骤5:给列取别名

    注意:使用$或者单引号+字段名,表示列中的每条数据,就不能和其他使用双引号的字段一起使用。

    scala> df.select('name,'age+1 as "newage").show
    +-----+------+
    | name|newage|
    +-----+------+
    |  Tom|    25|
    |Jerry|    35|
    | Mike|    55|
    +-----+------+
    

    步骤6:查看age大于30的数据

    scala> df.filter('age>30).show
    +---+---+-----+
    |age| id| name|
    +---+---+-----+
    | 34|  2|Jerry|
    | 54|  3| Mike|
    +---+---+-----+
    

    步骤7:按照age分组,查看数据条数

    scala> df.groupBy("age").count.show
    +---+-----+
    |age|count|
    +---+-----+
    | 54|    1|
    | 34|    1|
    | 24|    1|
    +---+-----+
    

    ④RDD转化为DataFrame

    RDD.toDF()

    • 在IDEA中开发程序时,如果需要RDD与DF和DS之间互相操作,那么需要引入import spark.implicits._

      这里的spark不是Scala中的包名,而是创建sparkSession对象的变量名称,所以必须要创建SparkSession对象再导入。

    • spark对象不能使用var声明,因为Scala只支持val修饰的对象的导入。

    scala> val idRDD = sc.makeRDD(List(1, 2, 3, 4))
    idRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24
    scala> idRDD.toDF("id").show
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
    

    实际开发中,一般通过样例类将RDD转换为DataFrame

    scala> case class User(name:String,age:Int)
    defined class User
    
    scala> sc.makeRDD(List(("zhangsan", 30), ("lisi", 40))).map(t=>User(t._1,t._2)).toDF.show
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 30|
    |    lisi| 40|
    +--------+---+
    

    方式2:

    scala> val df = sc.makeRDD(List(("zhangsan", 23), ("lisi", 45))).toDF("name", "age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> df.show
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 23|
    |    lisi| 45|
    +--------+---+
    

    ⑤DataFrame转换为RDD

    scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> val rdd = df.rdd
    rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[93] at rdd at <console>:25
    
    scala> val array = rdd.collect
    array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
    

    注意:此时得到的RDD存储类型为Row

    scala> array
    res26: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
    
    scala> array(0)
    res27: org.apache.spark.sql.Row = [zhangsan,30]
    
    scala> array(0)(0)
    res28: Any = zhangsan
    

    2.3 DataSet

    DataSet是强数据类型的数据集合,需要提供对应的类型信息

    ①创建DataSet

    使用样例类序列创建DataSet
    scala> case class Person(name:String, age:Long)
    defined class Person
    
    scala> val caseClassDS = Seq(Person("zhangsan", 22)).toDS()
    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
    
    scala> caseClassDS.show
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 22|
    +--------+---+
    
    使用基本类型的序列创建DataSet

    实际的使用中很少把序列转化成DataSet,更多的是通过RDD得到DataSet

    scala> val ds = Seq(1, 2, 3, 4, 5).toDS
    ds: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> ds.show
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    +-----+
    

    ②RDD转换为DataSet

    SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table结构,case类属性通过反射变成了表的列名。

    scala> val ds = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).map(t=>User(t._1, t._2)).toDS
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    
    scala> ds.show
    +--------+---+
    |    name|age|
    +--------+---+
    |zhangsan| 30|
    |    lisi| 49|
    +--------+---+
    

    ③DataSet转换为RDD

    scala> val ds = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).map(t=>User(t._1, t._2)).toDS
    
    scala> val rdd = ds.rdd
    rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[103] at rdd at <console>:25
    
    scala> rdd.collect
    res34: Array[User] = Array(User(zhangsan,30), User(lisi,49))
    

    ④DataFrame和DataSet转换

    DataFrame转换成DataSet

    scala> val df = sc.makeRDD(List(("zhangsan", 23), ("lisi", 45))).toDF("name", "age")
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> val ds = df.as[User]
    ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
    

    DataSet转换成DataFrame

    scala> val df = ds.toDF
    df: org.apache.spark.sql.DataFrame = [name: string, age: int]
    

    2.4 三者的转换

    • 转换成RDD:

      DF.rdd

      DS.rdd

    • 转换成DataFrame

      RDD.toDF(数据的结构) RDD.map(t=>样例类属性).toDF

      DS.toDF()

    • 转换成DataSet

      RDD.map(t=>样例类属性).toDS

      DF.as[类型]

    2.5 IDEA开发SparkSQL

    ①添加依赖

    spark-core和spark-sql

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
        </dependencies>
    

    ②创建SparkSQL环境

    • SparkSession的对象不是new出来的,是通过SparkSession的伴生对象.builder()的getOrCreate()构建环境对象。

    • 另外是哪一个环境,还需要SparkConf()环境和appname

    方式1:

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL").getOrCreate()
    

    方式2:

    val conf = new SparkConf().setMaster("local").setAppName("spark-sql")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    

    创建Spark环境后,一般情况要导入spark的implicits._

    主要目的是为了使用DSL语法的时候可以使用sparkSQL的隐式转换;提供RDD转DataFrame等的方法

        //创建spark环境
        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL").getOrCreate()
        //这里的spark不是包名,而是SparkSession的对象名
        //一般情况下,需要在创建SparkSession对象后,增加导入
        import spark.implicits._
    

    spark提供了一个sparkContext变量,可以获取SparkContext对象

    可以通过这个sc对象创建RDD

    val sc = spark.sparkContext
    

    ③读取文件创建DataFrame

        //TODO DataFrame
        //创建的JSON文件中的整个文件的数据应该符合JSON的语法规则
        //RDD读取文件的时候是一行一行读取的,所以SparkSQL读取JSON文件时,要求一行数据符合JSON格式即可
        val df = spark.read.json("input/users.json")
        df.show()
    

    ④SQL语法 & DSL语法

        //SQL
        df.createOrReplaceTempView("user")
        spark.sql("select * from user").show()
        spark.sql("select avg(age) as newAge from user").show()
    
        //DSL
        //DSL语法需要在当前环境中引入SparkSQL的隐式转换规则
        df.select("id", "name").show()
        df.select('name, 'age + 1).show()
        df.select($"age" + 1).show()
    	//使用DSL查询表中全部数据
    	df.select('*).show()
    

    ⑤创建DataSet

        //TODO DataSet
        val seq = Seq(1, 2, 3, 4)
        val ds: Dataset[Int] = seq.toDS()
        val list = List("1", "2", "3")
        val ds1 = list.toDS()
        ds.show()
        ds1.show()
    
        val list1 = List((1, "zhangsan", 23), (2, "lisi", 33))
        val rdd = spark.sparkContext.makeRDD(list1)
        rdd.map{
          case (id, name, age) => User(id, name, age)
        }.toDS().show()
    

    ⑥RDD & DataFrame & DataSet

    RDD

        //TODO RDD转DataFrame
        val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 23), (2, "lisi", 45)))
        val df = rdd.toDF("id", "name", "age")
        df.show()
        //TODO RDD转DataSet
        val ds = rdd.toDS() //这样直接转,没有字段名
        ds.show()
        rdd.map{
          case (id, name, age) => User(id, name, age)
        }.toDS().show()   //通过map+模式匹配可以添加字段名
    

    DataFrame

    //TODO DataFrame转RDD
    val df = spark.read.json("input/users.json")
    val rdd = df.rdd
    rdd.foreach(println)
    //TODO DataFrame转DataSet
    val ds: Dataset[User] = df.as[User]
    ds.show() //注意这里的类型!需要将int改成bigInt才对!
    

    DataSet

    //TODO DataSet转RDD
    val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 23), (2, "lisi", 43)))
    val ds: Dataset[User] = rdd.map {
      case (id, name, age) => User(id, name, age)
    }.toDS()
    
    val rdd1 = ds.rdd
    rdd1.foreach(println)
    
    //TODO DataSet转DataFrame
    ds.toDF().show()
    ds.toDF("id1", "name1", "age1").show()
    

    2.6 用户自定义函数

    ①UDF

    来一行处理一行

    object SparkSql05_udf {
      def main(args: Array[String]): Unit = {
    
        //1 创建spark环境
        val conf = new SparkConf().setMaster("local").setAppName("spark-sql")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        import spark.implicits._
        //2 读取json文件,创建DataFrame
        val df: DataFrame = spark.read.json("input/users.json")
        //3 创建临时视图
        df.createOrReplaceTempView("user")
        //4 TODO 注册udf自定义函数
        spark.udf.register("prefixName", (name:String) => {"Name : " + name})
    
        //5 TODO 使用udf自定义函数
        spark.sql("select prefixName(name) from user").show()
        
        //6 关闭环境
        spark.stop()
      }
    

    ②UDAF-弱类型

    计算平均工资

    弱类型的UserDefinedAggregateFunction已经过时了,Spark3.0以后就使用强类型的Aggregator替代了。

    @deprecated("Aggregator[IN, BUF, OUT] should now be registered as a UDF" +
      " via the functions.udaf(agg) method.", "3.0.0")
    

    spark3.0之前的弱类型的UDAF,需要实现UserDefinedAggregateFunction,不需要定义泛型,只需要重写8个方法就可以了

    image-20201201005817474

    object SparkSql06_udaf {
      def main(args: Array[String]): Unit = {
    
        //1 创建spark环境
        val conf = new SparkConf().setMaster("local").setAppName("spark-sql")
        val spark = SparkSession.builder().config(conf).getOrCreate()
        import spark.implicits._
        //2 读取json文件,创建DataFrame
        val df: DataFrame = spark.read.json("input/users.json")
        //3 创建临时视图
        df.createOrReplaceTempView("user")
    
        //5 创建聚合函数
        val udaf = new MyAvg
        //6 TODO 注册UDAF函数
        spark.udf.register("myAvg", udaf)
    	
        //UDF函数称之为用户自定义函数,但是这个函数不能聚合,只能对每一条进行处理
        // zhangsan => "Name : zhangsan"
        //4 如果想使用sql完成聚合功能,那么必须采用特殊的函数:用户自定义聚合函数UDAF
        spark.sql("select myAvg(age) from user").show()
    
        //关闭环境
        spark.stop()
      }
      // 自定义年龄平均值的聚合函数
      // 继承UserDefinedAggregateFunction类
      // 重写方法
      class MyAvg extends UserDefinedAggregateFunction{
        //输入数据的结构(年龄)
        override def inputSchema: StructType = {
          StructType(Array(StructField("age", LongType)))
        }
        // 缓冲区数据的结构(年龄总和,用户的数量)
        override def bufferSchema: StructType = {
          StructType(Array(
            StructField("total", LongType),
            StructField("count", LongType)
          ))
        }
        // 聚合函数的输出结构类型
        override def dataType: DataType = LongType
        //稳定性
        override def deterministic: Boolean = true
        //缓冲区初始化操作
        override def initialize(buffer: MutableAggregationBuffer): Unit = {
          buffer.update(0, 0L)
          buffer.update(1, 0L)
        }
        //用户输入的值更新缓冲区的值
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
          buffer.update(0, buffer.getLong(0) + input.getLong(0))
          buffer.update(1, buffer.getLong(1) + 1)
        }
        //合并缓冲区的数据;这是因为分布式计算
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
          buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
          buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
        }
        //计算聚合函数的结果
        override def evaluate(buffer: Row): Any = {
          buffer.getLong(0) / buffer.getLong(1)
        }
      }
    }
    

    ③UDAF-强类型

    object SparkSql08_udaf {
      def main(args: Array[String]): Unit = {
        //创建spark环境
        val spark = SparkSession.builder().master("local[*]").appName("spark-sql").getOrCreate()
        import spark.implicits._
        //读取文件创建df
        val df = spark.read.json("input/users.json")
        //创建临时表
        df.createOrReplaceTempView("user")
        //TODO 创建自定义聚合函数
        val myAvg = new myAvg()
        //TODO 向spark注册函数
        spark.udf.register("myAvg", functions.udaf(myAvg))
        //TODO SQL本身就是弱类型操作,支持弱类型的聚合函数,不能直接支持强类型的聚合函数
        //  需要使用functions.udaf()
        //使用SQL语法,查询平均年龄
        spark.sql("select myAvg(age) from user").show() 
        //关闭环境
        spark.stop()
      }
      case class AvgBuffer(var total: Long, var count: Long)
      //TODO 自定义年龄平均值的聚合函数(强类型)
      //  1. 继承org.apache.spark.sql.expressions.Aggregator
      //  2. 定义泛型
      //  IN:年龄-Long
      //  BUF:AvgBuffer
      //  OUT:平均年龄-Long
      class myAvg extends Aggregator[Long, AvgBuffer, Long]{
        //缓冲区的初始化操作
        override def zero: AvgBuffer = {
          AvgBuffer(0L, 0L)
        }
        //将年龄数据和缓冲区的数据进行聚合
        override def reduce(b: AvgBuffer, a: Long): AvgBuffer = {
          b.total += a
          b.count += 1
          b
        }
        //分布式下的多个缓冲区合并
        override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
          b1.total += b2.total
          b1.count += b2.count
          b1
        }
        //计算结果
        override def finish(reduction: AvgBuffer): Long = {
          reduction.total / reduction.count
        }
        //这两个是固定值
        override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
      }
    }
    

    ④UDAF-强类型(早期版本)

    Spark3.0版本之前,虽然也能使用Aggregator(),但是无法将强类型的聚合函数使用在SQL语法中。

    那么哪个时候怎么自定义聚合函数呢?

    • 方式1:使用弱类型的UserDefinedAggregateFunction
    • 方式2:将数据的一行当成对象传递给聚合函数,然后使用DSL语法,将聚合函数转换成查询列。
    object SparkSql09_udaf_1 {
      def main(args: Array[String]): Unit = {
    
        //创建spark环境
        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL-udaf").getOrCreate()
        import spark.implicits._
        //读取文件创建df
        val df = spark.read.json("input/users.json")
        //创建临时视图
        df.createOrReplaceTempView("user")
    
        //需求:计算平均工资
        //TODO 创建自定义聚合函数
        val myAvg = new MyAvg
    
        //TODO 使用DSL语法将聚合函数转换成查询列
        val ds: Dataset[User] = df.as[User]
        ds.select(myAvg.toColumn).show()
    
    
        //spark.sql("select myAvg(age) from user").show()
        //TODO Spark3.0版本之前,无法将强类型聚合函数使用在SQL文中
        //  那么哪个时候怎么做的呢?方式1:使用弱类型的UserDefinedAggregateFunction
        //                        方式2:将数据的一行当成对象传递给聚合函数,使用DSL语法查询
    
        //关闭环境
        spark.stop()
      }
      case class User(id: Long, name: String, age: Long)
      case class AvgBuffer(var total: Long, var count: Long)
    
      //TODO 自定义年龄平均值的聚合函数(3.0版本以前的强类型)
      //  1. 继承Aggregator :org.apache.spark.sql.expressions.Aggregator
      //  2. 定义泛型
      //  IN:User(将一行数据作为输入)
      //  BUF:AvgBuffer
      //  OUT:Long
      //  3. 重写方法
      class MyAvg extends Aggregator[User, AvgBuffer, Long]{
        //缓冲区初始化
        override def zero: AvgBuffer = {
          AvgBuffer(0L, 0L)
        }
        //输入的年龄和缓冲区聚合
        override def reduce(b: AvgBuffer, user: User): AvgBuffer = {
          b.total += user.age
          b.count +=1
          b
        }
        //分布式多个缓冲区的合并
        override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
          b1.total += b2.total
          b1.count += b2.count
          b1
        }
        //返回计算的结果
        override def finish(reduction: AvgBuffer): Long = {
          reduction.total / reduction.count
        }
        //默认的两个值
        override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
      }
    }
    

    2.7 数据的加载和保存

    SparkSQL提供了通用API的保存数据和加载数据的方式。

    spark提供的文件

    image-20201201211741344

    ①加载数据

    spark.read.load是加载数据的通用方法

    spark.read.load默认加载的数据格式为:parquet

    如果文件格式读取错误,会报下面的错误。

    scala> spark.read.load("examples/src/main/resources/users.parquet").show
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
    
    Caused by: java.lang.RuntimeException: file:/opt/module/spark-local/data/user.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [51, 48, 125, 10]
    

    按照文件的格式读取数据:

    • format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”
    • load("…"):在"csv"、“jdbc”、“json”、“orc”、"parquet"和"textFile"格式下需要传入加载数据的路径
    scala> spark.read.format("json").load("data/user.json").show
    +---+---+--------+                                                              
    |age| id|username|
    +---+---+--------+
    | 20|  1|zhangsan|
    | 24|  2|    lisi|
    | 40|  3|  wangwu|
    | 30|  4| zhaoliu|
    +---+---+--------+
    

    直接在文件上进行查询:文件格式. `文件路径

    scala> spark.sql("select * from json.`/opt/module/spark-local/data/user.json`").show
    +---+---+--------+
    |age| id|username|
    +---+---+--------+
    | 20|  1|zhangsan|
    | 24|  2|    lisi|
    | 40|  3|  wangwu|
    | 30|  4| zhaoliu|
    +---+---+--------+
    

    ②保存数据

    scala> var df = spark.read.format("json").load("data/user.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]
    
    scala> df.write.
    bucketBy   csv   format   insertInto   jdbc   json   mode   option   options   orc   parquet   partitionBy   save   saveAsTable   sortBy   text
    

    spark的保存数据的通用:df.write.save(“输出路径”)

    • save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径
    scala> df.write.save("data/output")
    
    [atguigu@hadoop102 output]$ ll
    总用量 4
    -rw-r--r--. 1 atguigu atguigu 969 12月  1 21:01 part-00000-b97909ab-e827-41bc-9a90-1b2784529339-c000.snappy.parquet
    -rw-r--r--. 1 atguigu atguigu   0 12月  1 21:01 _SUCCESS
    

    spark保存为指定的文件类型

    format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile”

    scala> df.write.format("json").save("data/output1")
    
    atguigu@hadoop102 output1]$ ll
    总用量 4
    -rw-r--r--. 1 atguigu atguigu 153 12月  1 21:04 part-00000-1f990b58-3a9e-4d0a-be95-71ce27deb5a0-c000.json
    -rw-r--r--. 1 atguigu atguigu   0 12月  1 21:04 _SUCCESS
    

    保存路径的文件保存模式

    因为默认情况下,save保存的路径文件如果存在了,会报错;可以加上.mode(“模式“);当存在相同文件可以追加,覆盖,忽略等。

    SaveMode是一个枚举类,其中的常量包括:

    Scala/Java Any Language Meaning
    SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
    SaveMode.Append “append” 如果文件已经存在则追加
    SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
    SaveMode.Ignore “ignore” 如果文件已经存在则忽略
    scala> df.write.format("csv").mode("append").save("data/output1")
    
    [atguigu@hadoop102 output1]$ ll
    总用量 8
    -rw-r--r--. 1 atguigu atguigu 153 12月  1 21:04 part-00000-1f990b58-3a9e-4d0a-be95-71ce27deb5a0-c000.json
    -rw-r--r--. 1 atguigu atguigu  49 12月  1 21:11 part-00000-b83d3429-0531-42db-b347-d582d6bcd82d-c000.csv
    -rw-r--r--. 1 atguigu atguigu   0 12月  1 21:11 _SUCCESS
    

    ③Parquet

    Spark SQL默认数据源为Parquet格式,Parquet是一种能够有效存储嵌套数据的列式存储格式。

    • 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format。
    • 修改配置项spark.sql.sources.default可以修改默认的数据源格式

    加载数据

    scala> spark.read.load("/opt/module/spark-local/examples/src/main/resources/users.parquet").show
    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+
    

    保存数据

    scala> var df = spark.read.json("/opt/module/data/input/people.json")
    //保存为parquet格式
    scala> df.write.mode("append").save("/opt/module/data/output")
    

    ④JSON

    Spark SQL 能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载JSON 文件

    • Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。
    • image-20201201212600347

    步骤1:导入隐式转换

    import spark.implicits._
    

    步骤2:加载JSON文件

    val path = "/opt/module/spark-local/people.json"
    val peopleDF = spark.read.json(path)
    

    步骤3:创建临时表

    peopleDF.createOrReplaceTempView("people")
    

    步骤4:数据查询

    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    +------+
    |  name|
    +------+
    |Justin|
    +------+
    

    ⑤CSV

    csv是通过,分割的;单纯的读取csv格式的文件,会把标题字段也当作是一行数据读取。

    如果要正常的读取数据,需要通过option选项设定分隔符seq;inferSchema;header设为true表示有头信息,也就是标题。

    scala> spark.read.format("csv").load("examples/src/main/resources/people.csv").show
    +------------------+
    |               _c0|
    +------------------+
    |      name;age;job|
    |Jorge;30;Developer|
    |  Bob;32;Developer|
    +------------------+
    
    scala> spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("examples/src/main/resources/people.csv").show
    +-----+---+---------+
    | name|age|      job|
    +-----+---+---------+
    |Jorge| 30|Developer|
    |  Bob| 32|Developer|
    +-----+---+---------+
    

    image-20201201233539093

    ⑥MySQL

    SparkSQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame。需要在启动spark-shell的之前,把mysql-connector-java-5.1.27-bin.jar的jar包放到/opt/module/spark-local/jars下。

    步骤1:提前在hadoop102的mysql中创建表

    CREATE DATABASE `spark-sql`;
    CREATE TABLE USER(id INT, NAME VARCHAR(20), age INT);
    INSERT INTO USER VALUES(1, "zhangsan", 23);
    INSERT INTO `user` VALUES(2, "lisi", 45);
    

    步骤2:通过option去设置连接MySQL的url,driver驱动类,登录的用户名,密码,表名等。

    scala> spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark-sql").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "123456").option("dbtable", "user").load().show
    +---+--------+---+
    | id|    name|age|
    +---+--------+---+
    |  1|zhangsan| 23|
    |  2|    lisi| 45|
    +---+--------+---+
    

    步骤3:向MySQL中写入数据

    然后在mysql中就能查看到新增的数据

    scala> val df = sc.makeRDD(List((3, "Tom", 88))).toDF("id", "name", "age")
    df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
    
    scala> df.write.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark-sql").option("user", "root").option("password", "123456").option("dbtable", "user").mode("append").save
    

    ⑦Hive

    内嵌的Hive

    Hive 的元数据存储在 derby 中, 默认仓库地址:$SPARK_HOME/spark-warehouse

    初次使用spark-sql,用到hive的时候,会自动在SPARK_HOME创建saprk的warehouse,和metastore_db

    在实际使用中, 几乎没有任何人会使用内置的 Hive

    scala> spark.sql("show tables").show
    。。。
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    +--------+---------+-----------+
    
    scala> spark.sql("create table aa(id int)")
    
    。。。
    
    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|       aa|      false|
    +--------+---------+-----------+
    

    向表加载本地数据

    scala> spark.sql("load data local inpath 'input/ids.txt' into table aa")
    
    。。。
    
    scala> spark.sql("select * from aa").show
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    +---+
    
    外部的Hive
    • 步骤1:Spark接管Hive需要将hive-site.xml拷贝到spark-local的conf/文件下
    • 步骤2:把MySQL的驱动拷贝到jars/目录下
    • 步骤3:如果访问不到hdfs,则需要把core-site.xml和hdfs.site.xml拷贝到conf/目录下
    • 步骤4:重启spark-shell

    在IDEA中使用外部的Hive创建库,创建表,导入数据

    步骤1:导入依赖

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    

    步骤2:将hive-site.xml文件拷贝到项目的resources目录中。

    步骤3:代码实现

    • 在代码的最前面指定代理用户: System.setProperty(“HADOOP_USER_NAME”, “atguigu”)
    • 修改数据仓库地址:.config(“spark.sql.warehouse.dir”,“hdfs://hadoop102:8020/user/hive/warehouse”)
    • **添加Hive支持!**enableHiveSupport()
    object SparkSql10_hive {
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "atguigu")
        //创建spark环境
        val conf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
        val spark = SparkSession.builder().enableHiveSupport().config(conf).config("spark.sql.warehouse.dir",
        "hdfs://hadoop102:8020/user/hive/warehouse").getOrCreate()
        spark.sql("use hive_db")
    
        spark.sql(
          """
            |CREATE TABLE `user_visit_action`(
            |  `date` string,
            |  `user_id` bigint,
            |  `session_id` string,
            |  `page_id` bigint,
            |  `action_time` string,
            |  `search_keyword` string,
            |  `click_category_id` bigint,
            |  `click_product_id` bigint,
            |  `order_category_ids` string,
            |  `order_product_ids` string,
            |  `pay_category_ids` string,
            |  `pay_product_ids` string,
            |  `city_id` bigint)
            |row format delimited fields terminated by '\t'
            |""".stripMargin)
        spark.sql(
          """
            |load data local inpath 'input/user_visit_action.txt' into table hive_db.user_visit_action
            |""".stripMargin)
    
        spark.sql(
          """
            |CREATE TABLE `product_info`(
            |  `product_id` bigint,
            |  `product_name` string,
            |  `extend_info` string)
            |row format delimited fields terminated by '\t'
            |""".stripMargin)
        spark.sql(
          """
            |load data local inpath 'input/product_info.txt' into table hive_db.product_info
            |""".stripMargin)
    
        spark.sql(
          """
            |CREATE TABLE `city_info`(
            |  `city_id` bigint,
            |  `city_name` string,
            |  `area` string)
            |row format delimited fields terminated by '\t'
            |""".stripMargin)
        spark.sql(
          """
            |load data local inpath 'input/city_info.txt' into table hive_db.city_info
            |""".stripMargin)
    
        //关闭环境
        spark.stop()
    
      }
    }
    
    运行SparkSQL CLI
    [atguigu@hadoop102 jars]$ cd /opt/module/spark-local/
    [atguigu@hadoop102 spark-local]$ bin/spark-sql
    
    spark-sql (default)> 
    
    运行Spark beeline
    展开全文

空空如也

1 2 3 4 5 ... 17
收藏数 335
精华内容 134
关键字:

sparksql原理