精华内容
下载资源
问答
  • SparkSQL

    千次阅读 2020-04-22 11:01:53
    SparkSQL基本介绍 什么是SparkSQL? SparkSQL底层的数据抽象 什么是DataFrame?? 什么是DataSet?? SparkSQL查询数据的形态 添加Schema的方式 通过StructType指定Schema代码流程 利用反射机制推断Schema代码...

    目录

     

    累加器的作用

    广播变量的作用

    SparkSQL基本介绍

    什么是SparkSQL?

    SparkSQL底层的数据抽象

    什么是DataFrame??

    什么是DataSet??

    SparkSQL查询数据的形态

    添加Schema的方式

    通过StructType指定Schema代码流程

    利用反射机制推断Schema代码流程


    累加器的作用

    累加器accumulators:累加器支持在所有不同节点之间进行累加计算

     

    广播变量的作用

    在每个机器上缓存一份、不可变的、只读的、相同的变量,该节点每个任务都能访问。起到节省资源和优化的作用。

     

     

    SparkSQL基本介绍

     

    什么是SparkSQL?

    用于处理结构化数据的Spark模块。

     

     

    SparkSQL底层的数据抽象

    DataFrame和DataSet

    Hive和SparkSQL的对比

    Hive是将sql转化成MapReduce进行计算(降低学习成本、提高开发效率)

    SparkSQL是将sql转化成rdd集进行计算(降低学习成本、提高开发效率)

     

     

    什么是DataFrame??

    DataFrame是以RDD为基础的带有Schema元信息的分布式数据集。

                                                                          

     

    什么是DataSet??

    含有类型信息的DataFrame就是DataSet

    (DataSaet=DataFrame+类型= Schema+RDD*n+类型)

     

    SparkSQL查询数据的形态

    1. 类似方法调用,领域特定语言(DSL)。

    personDF.select($"id",$"name",$"age"+1).filter($"age">25).show

    1. SQL语句

    spark.sql("select * from personDFT where age >25").show

     

    添加Schema的方式

    第1种:指定列名添加Schema

    第2种:通过StructType指定Schema

    第3种:编写样例类,利用反射机制推断Schema

    指定列名添加Schema代码流程

     

    1  创建sparksession

    2 创建sc

    3 读取数据并加工

    4 设置表结构   ttRDD.toDF("id","name","age")

    5  注册成表并查询

    6 关闭sc    sparksession

     

    通过StructType指定Schema代码流程

    1  创建sparksession

    2 创建sc

    3 读取数据并加工

    4 设置表结构   

    types.StructType(
        //   字段类型  (字段名,字段类型,是否为空)
        List(StructField("id",IntegerType,true)
        )
      )

    5 创建DS  DF

      val ttDF: DataFrame = spark.createDataFrame(RowRDD,structTable)

    6  注册成表并查询
    7 关闭sc    sparksession

     

     

    利用反射机制推断Schema代码流程

    准备样例类

    1 创建sparksession
    2 创建sc

    3 读取数据并加工
    val PersonRDD: RDD[Person] = ttRDD.map(z=>Person(z(0).toInt,z(1),z(2).toInt))
    4 RDD转DF
       val personDF: DataFrame = PersonRDD.toDF()

    5 注册成表并查询

    6 关闭sc    sparksession

     

     

    展开全文
  • sparksql

    2020-06-26 08:15:03
    sparksql1、课程目标2、sparksql概述2.1 sparksql前世今生2.2 sparksql是什么3、sparksql特性4、DataFrame4.1 dataFrame是什么4.2 RDD与DataFrame优缺点对比5、DataSet5.1 dataSet是什么5.2 dataFrame和dataSet互相...

    1、课程目标

    • 1、掌握sparksql原理
    • 2、掌握DataFrame和DataSet数据结构和使用方式
    • 3、掌握通过sparksql来开发简单的应用程序

    2、sparksql概述

    2.1 sparksql前世今生

    • spark是专门为spark设计的大规模数据仓库系统
    • spark需要依赖于hive的代码,同时也依赖于spark的版本
    • 随着数据处理的复杂度越来越高,并且性能也要求很高
    • 发现之前hivesql底层运行mapreduce代码这种思想限制了shark的发展
    • 最后就把shark这个框架废弃了,把工作的重点转移到sparksql

    2.2 sparksql是什么

    • Spark SQL is Apache Spark’s module for working with structured data

    • sparksql是spark的一个用来处理结构化数据的模块

    • 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

    3、sparksql特性

    • 1、易整合
      • 可以使用sparksql与spark应用程序进行混合使用
      • 同时也可以使用不同的语言进行代码开发
        • java
        • scala
        • python
        • R
    • 2、统一的数据源访问
      • sparksql可以使用一种相同的方式来访问外部的数据源
        • SparkSession.read.文件格式方法(该格式的文件路径)
    • 3、兼容hive
      • sparksql可以支持hivesql的语法,使用sparksql来操作hivesql
    • 4、支持行业标准的数据库连接
      • sparksql支持jdbc或者是odbc来连接上数据库

    4、DataFrame

    4.1 dataFrame是什么

    	dataFrame它的前身是schmeRDD,schemaRDD是直接继承自RDD,在spark1.3.0之后把schemaRDD改为DataFrame,它不在直接继承自RDD,而是自己实现了RDD的一些方法。
    
    	在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
    

    4.2 RDD与DataFrame优缺点对比

    • RDD

      • 优点
        • 1、编译时类型安全
          • 通过RDD开发程序,在编译的时候会进行类型检查
        • 2、具有面向对象编程的风格
      • 缺点
        • 1、序列化和反序列化性能开销很大
          • 数据在跨进程进行网络传输的时候,需要先把数据内容本身和数据的结构信息进行序列化,后期通过反序列化来恢复得到该对象。
        • 2、构建大量的对象会带来频繁的GC
          • 对象的创建一般是使用heap堆中的内存去存储,如果内存空间不足需要进行gc,把一些不是进行使用的对象清除掉,来腾出更多的内存空间。不断进行gc的后果导致线程任务不断的暂停,任务执行的效率是比较低。
    • dataFrame

      • dataFrame它引入了schema和off-heap概念

      • 优点

        • 1、dataFrame由于引入了schema(数据的结构信息–元数据信息),这个时候数据在进行网络传输的时候只需要序列化数据本身就可以了,对于数据的结构信息可以省略掉。它是解决了rdd序列化和反序列化性能开销很大这个缺点

        • 2、dataFrame由于引入了off-heap(直接使用操作系统层面上的内存,不在使用堆中的内存)

          • 这里大量的对象创建就不在heap堆中,直接使用操作系统层面上的内存,后期就可以保证堆中空间是比较充足,就不会导致频繁的gc。它是解决了RDD构建大量的对象会带来频繁的GC
      • 缺点

        • dataFrame引入了schema和off-heap分别解决了rdd的缺点,同时它丢失了rdd的优点。
          • 1、编译时类型不安全
          • 2、不具备面向对象编程的风格

    5、DataSet

    5.1 dataSet是什么

    	DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。
    

    5.2 dataFrame和dataSet互相转换

    • 1、dataFrame转换成dataSet
      • val ds=df.as[强类型]
      • val ds=df.as[String]
    • 2、dataSet转换成dataFrame
      • val df=ds.toDF
    • 补充:
      • 可以把dataFrame和dataSet调用rdd这个方法,获取得到一个rdd。

    5.3 创建DataSet

    • 1、通过sparkSession调用方法createDataset构建

      val ds=spark.createDataset(List(1,2,3,4))
      val ds=spark.createDataset(sc.textFile("/person.txt"))
      
    • 2、通过一个rdd转换生成一个dataSet

    val ds=sc.textFile("/person.txt").toDS
    
    • 3、通过一个dataFrame转换生成一个dataSet

      val ds=df.as[类型]
      
    • 4、通过一个已经存在dataSet转换生成一个新的dataSet

      ds.map(x=>x+" beijing").show
      

    6、基于IDEA开发代码将rdd转换成dataFrame

    • 1、引入依赖

              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.1.3</version>
              </dependency>
      

    6.1 利用反射机制

    • 1、代码开发
    package cn.itcast.sparksql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    //todo:实现将rdd转换成dataFrame,利用了反射机制(定义一个样例类)
    
    case class Person(id:Int,name:String,age:Int)
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
            //1、创建SparkSession
            val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
    
           //2、创建SparkContext
              val sc: SparkContext = spark.sparkContext
              sc.setLogLevel("warn")
    
          //3、读取数据文件
            val rdd1: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(_.split(" "))
    
        //4、将rdd1与样例类进行关联
            val personRDD: RDD[Person] = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
    
        //5、将RDD转换成dataFrame
            //手动导入隐式转换
            import  spark.implicits._
            val personDF: DataFrame = personRDD.toDF
    
        //6、操作
          //-------------------DSL风格语法-----------------start
           //打印schema
           personDF.printSchema()
          //展示数据,默认展示前20条数据
           personDF.show()      //name:zhangsanxxxxxxxxxxxxxxxxxxxxxxxx
          //展示第一个数据
          println(personDF.first())
          personDF.head(3).foreach(println)
    
          //获取name字段对应的数据
          personDF.select("name").show()
          personDF.select($"name").show()
          personDF.select(new Column("name")).show()
    
        //查询多个字段的结果数据
         personDF.select("name","id","age").show()
    
        //实现age+1操作
        personDF.select($"age"+1).show()
    
        //过滤出年龄大于30的用户信息
          personDF.filter($"age" >30).show()
          println(personDF.filter($"age" >30).count())
    
        //按照年龄进行分组统计
          personDF.groupBy("age").count().show()
    
        //-------------------DSL风格语法-----------------end
    
    
        //-------------------SQL风格语法-----------------start
          personDF.createTempView("person")
          spark.sql("select * from person").show()
          spark.sql("select * from person where age >30").show()
          spark.sql("select * from person  where id=6").show()
          spark.sql("select count(*) from person group by age ").show()
    
        //-------------------SQL风格语法-----------------end
         sc.stop()
         spark.stop()
      }
    }
    

    展开全文
  • sparkSQL

    2020-03-01 20:32:29
    sparksql前身是shark,shark基本是全抄了hive,问题就是优化啥的,hive人家是mapreduce的进程级并行,我们这spark数据抽象是RDD是线程级并行,所以shark执行优化依赖与hive跟我们方向就错了,而且导致了shark兼容...

    sparksql前身是shark,shark基本是全抄了hive,问题就是优化啥的,hive人家是mapreduce的进程级并行,我们这spark数据抽象是RDD是线程级并行,所以shark执行优化依赖与hive跟我们方向就错了,而且导致了shark兼容hive时出现了线程安全问题,shark又开发了一套独立维护的打了补丁的hive源码分支,累的不行,所以spark直接新开发了这个sparksql,基于spark来做,Hive on Spark依然存在,
    Spark SQL在兼容hive仅依赖于hql解析,hive元数据,换句话说,就是hql被解析成AST抽象语法树后,Sparksql就接手了,SparkSQL的执行计划和优化都有catalyst(函数是关系查询优化框架)负责
    SparkSQL增加了DataFrame,数据可以来自于RDD,也可以来自Hive、HDFS、Cassandra等外部数据源,也可以json,支持三种语言JAVA python scala
    Spark2.0以上版本,Spark使用全新的SparkSession接口来替代SparkContext以及HiveContext接口

    展开全文
  • SparkSql

    2019-12-13 15:45:34
    目录(SparkSql)本质(是什么)(我在试着讲明白)作用(干什么)(我在试着讲明白)架构(有什么)(我在试着讲明白)Spark SQL由core,catalyst,hive和hive-thriftserver4个部分组成。1.Catalyst执行优化器UDFUDAF开窗...

    本质(是什么)(我在试着讲明白)

    SparkSql 的官网是: http://spark.apache.org/sql/
    sparkSql 是 spark on hive 一种基于 内存 的 交互式查询 应用服务。
    spark 负责 解析优化、执行引擎。hive 负责存储
    sparkSql 底层是 DataFrame(可以理解成一个二维表格,有数据、列的schema信息)

    作用(干什么)(我在试着讲明白)

    hive :将hive sql 转换成 mr,减少了mr查询的复杂性,但慢!
    使spark脱离hive,不依赖hive 的解析优化
    sparkSql 用来处理结构化的一个模块。提供一个抽象的数据集DataFrame,作为分布式Sql查询引擎的应用。

    架构(有什么)(我在试着讲明白)

    Spark SQL由core,catalyst,hive和hive-thriftserver4个部分组成。

    core: 负责处理数据的输入/输出,从不同的数据源获取数据(如RDD,Parquet文件和JSON文件等),然后将结果查询结果输出成Data Frame。
    catalyst: 负责处理查询语句的整个处理过程,包括解析,绑定,优化,生成物理计划等。
    hive: 负责对hive数据的处理。
    hive-thriftserver:提供client和JDBC/ODBC等接口。
    而sparksql的查询优化器是catalyst,它负责处理查询语句的解析,绑定,优化和生成物理执行计划等过程,catalyst是sparksql最核心部分。

    sparkSql是spark Core之上的一个模块,sql最终都通过Catalyst翻译成类似的spark程序代码被sparkCore执行,过程也有 job、stage、task的概念
    在这里插入图片描述

    1.Catalyst执行优化器

    1.1 Catalyst最主要的数据结构是树,所有的SQL语句都会用树结构来存储,树中的每个节点都有一个类,以及0或多个子节点。Scala中定义的新的节点类型都是TreeNode这个类的子类,这些对象是不可变的。

    1.2 Catalyst另外一个重要的概念是规则,基本上,所有的优化都是基于规则的。

    1.3 执行过程

    1 分析阶段

    分析逻辑树,解决引用

    使用Catalyst规则和Catalog对象来跟踪所有数据源中的表,以解决所有未辨识的属性

    2 逻辑优化

    3 物理计划

    Catalyst会生成很多计划,并基于成本进行对比

    接受一个逻辑计划作为输入,生产一个或多个物理计划

    4 代码生成

    将Spark SQL代码编译成Java字节码

    UDF

    user defined function,用户自定义函数
    用法:
    java:sqlContext.udf().register(“StrLen”, new UDF1<String,Integer>()) 传几个参数,要实现对应的UDFXX接口,最多支持22个

    UDAF

    user defined aggregate function,用户自定义聚合函数
    count,avg,sum,min,max… 特点:多对一,select name ,count(*) from table group by name
    是聚合函数,要继承 UserDefinedAggregateFunction() 实现8个方法,最重要三个方法
    initialize
    update
    merge
    在这里插入图片描述

    开窗函数

    优缺点(我在试着讲明白)

    优点

    意整性
    统一的数据访问方式
    兼容hive
    提供了统一的数据连接方式(JDBC/ODBC)

    缺点

    流程(怎么运作)(我在试着讲明白)

    Sql运行流程

    在这里插入图片描述
    1 . 语法和词法解析:对写入的sql语句进行词法和语法解析(parse),分辨出sql语句在哪些是关键词(如select ,from 和where),哪些是表达式,哪些是projection ,哪些是datasource等,判断SQL语法是否规范,并形成逻辑计划。

    2 .绑定:将SQL语句和数据库的数据字典(列,表,视图等)进行绑定(bind),如果相关的projection和datasource等都在的话,则表示这个SQL语句是可以执行的。

    3 .优化(optimize):一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划。

    4 .执行(execute):执行前面的步骤获取最有执行计划,返回查询的数据集。

    sparkSql 运行原理分析

    1.使用SesstionCatalog保存元数据

    在解析sql语句前需要初始化sqlcontext,它定义sparksql上下文,在输入sql语句前会加载SesstionCatalog,初始化sqlcontext时会把元数据保存在SesstionCatalog中,包括库名,表名,字段,字段类型等。这些数据将在解析未绑定的逻辑计划上使用。

    2.使用Antlr生成未绑定的逻辑计划

    Spark2.0版本起使用Antlr进行词法和语法解析,Antlr会构建一个按照关键字生成的语法树,也就是生成的未绑定的逻辑计划。

    3.使用Analyzer绑定逻辑计划

    在这个阶段Analyzer 使用Analysis Rules,结合SessionCatalog元数据,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。

    4.使用Optimizer优化逻辑计划

    Opetimize(优化器)的实现和处理方式同Analyzer类似,在该类中定义一系列Rule,利用这些Rule对逻辑计划和Expression进行迭代处理,达到树的节点的合并和优化。

    5.使用SparkPlanner生成可执行计划的物理计划

    SparkPlanner使用Planning Strategies对优化的逻辑计划进行转化,生成可执行的物理计划。

    6.使用QueryExecution执行物理计划

    常用(必会)(我在试着讲明白)

    创建DataFrame的方式

    1. 读取json格式的文件
    2. 读取json格式的RDD 、DataSet
    3. 读取RDD创建DataFrame
    4. 读取parquet格式数据加载DataFrame
    5. 读取Mysql找那个的数据加载成DataFrame
    6. 读取Hive中数据加载DataFrame

    保存DataFrame的目标源

    1. parquet文件
    2. Mysql表中
    3. Hive表中

    常见问题(必知)(我在试着讲明白)

    RDD和DataFrame的区别

    在这里插入图片描述

    配置 Spark on Hive

    配置 Spark on Hive
    1.在客户端 …/conf/中创建hive-site.xml,让SparkSQL找到Hive原数据

    	<configuration>
    	<property>
    	<name>hive.metastore.uris</name>
    	<value>thrift://node1:9083</value>
    	</property>
    	</configuration>
    

    2.在Hive的服务端启动metaStore 服务 : hive --service metastore &
    3.使用spark-shell 测试 速度

    异议

    有差错或者需要补充的地方,还望大家评论指出,并详细论证,相互学习,共同进步哈!

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 10,945
精华内容 4,378
关键字:

sparksql