精华内容
参与话题
问答
  • Spark SQL 简单使用

    2018-07-09 06:42:03
    环境:scala 版本2.11.8,spark 版本2.0.1,使用 Intellij IDEA 来开发。 准备工作:创建maven项目可以从官网上找到我们建项目时使用的 archetype 至于具体怎么创建项目,请参考一个朋友的文章Intellij IDEA 创建 ...

    环境:scala 版本2.11.8,spark 版本2.0.1,使用 Intellij IDEA 来开发。 
    准备工作:

    创建maven项目

    可以从官网上找到我们建项目时使用的 archetype 
    这里写图片描述 
    至于具体怎么创建项目,请参考一个朋友的文章Intellij IDEA 创建 spark/scala 项目 
    这个是前一段时间发现的一个朋友,强烈推荐大家去转转。 
    好了,这样一来就默认大家创建好了项目…

    1 第一个例子

    1.1 创建 SparkSession

    这里写图片描述
    官方文档如是说。 
    那么我们可以按照这个例子来写。

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
                .builder()
                .appName("sql test")
                .master("local")
                .getOrCreate()
    
    import spark.implicits._            
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    1.2 创建 DataFrames

    这里写图片描述

    //创建dataframe
    val df = spark.read.json("C:\\Users\\Administrator\\Desktop\\people.json")
    
    df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    就像官网写的那样,我们可以调用 show() 方法来打印出 df 的数据。这里我是把官网上的示例给放到了指定的目录。当然,我们也可以自己创建一个 json 文件,格式如下:

    {"name":"Signal"}
    {"name":"May j Lee","age":20}
    {"name":"Jay Chou","age":36}
    {"name":"Jack Chen","age":60}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当然,还有一些其他操作,我就不一一敲了,官网上给出的示例非常详细。用到类似的了就去官网上查… 
    这里写图片描述

    我们还可以使用 SQL 语句来操作: 
    这里写图片描述

    不过在我们使用 SQL 进行操作之前,需要使用 createOrReplaceTempView() 方法,熟悉 SQL 语句人肯定都知道”视图(view)“,接下来这个就是我们要操作的对象。

    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("select * from people")
    sqlDF.show()
    • 1
    • 2
    • 3

    2 第二个例子

    在第一个例子中,我们是根据一个 json 文件进行了一系列的操作,接下来我们是不是可以创建一个呢?

    这里我们需要使用到的对象是 DataSets

    2.1 创建 DataSets

    这里写图片描述

    我们还是参照官网的例子来写:

    def createDataSetsTest(spark:SparkSession): Unit ={
      import spark.implicits._
    
      //创建 DataSet
      val caseClassDS = Seq(Person("Jack",80),Person("Rose",76)).toDS()
      caseClassDS.show()
    
      //保存
      caseClassDS.write.mode(SaveMode.Overwrite).json("C:\\Users\\Administrator\\Desktop\\SIGNAL-TEMP\\person.json")
    
      val primitiveDS = Seq(1,2,3).toDS()
      val ds = primitiveDS.map(_+1).collect()
      ds.foreach(println)
    
      val path = "C:\\Users\\Administrator\\Desktop\\SIGNAL-TEMP\\person.json"
      val personDS = spark.read.json(path).as[Person]
    
      personDS.show()
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    接下我们可以调用这个 createDataSets(…) 来看看到底发生了些什么事情。

    object SqlJson {
      case class Person(name:String,age:Long)
    
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("spark json").master("local").getOrCreate()
    
        import spark.implicits._
    
        createDataFrameTest(spark)
      }
    }  
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里写图片描述

    于是就像图片上那样,我们成功保存了 json 对象,并且我们还将它输出到了控制台。

    这里写图片描述

    这里大家可能疑惑的就是 SavaMode 了,别担心,官网有: 
    这里写图片描述

    说起来保存,我们还可以保存为 parquet。 
    这里写图片描述

    ...
    //保存
    df.write.mode(SaveMode.Overwrite).parquet("C:\\Users\\Administrator\\Desktop\\SIGNAL-TEMP\\sparkSql.parquet")
    
    //读取
    val parquetFile = spark.read.parquet("C:\\Users\\Administrator\\Desktop\\SIGNAL-TEMP\\sparkSql.parquet")
    
    parquetFile.where($"key"===1).select($"key",$"value".as("v")).collect().foreach(println)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    同样的,会在我们指定的路径下新建一个文件夹: 
    这里写图片描述

    但是对于这个文件就没有那么友好了,我们看不懂^@^

    3 Spark 与 Hive

    我们可以通过 Spark 在 Hive 中创建一张表,并且导入数据。 
    这次我们需要将代码打成 jar 包,放到集群下去运行。

    package spark.connect
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Created by SIGNAL on 2016/11/28.
      */
    object HiveTest {
      val spark = SparkSession.builder().appName("Spark connect to Hive").enableHiveSupport().getOrCreate()
    
      def main(args: Array[String]) {
        import spark.implicits._
        import spark.sql
    
        val createTable = "create table if not exists spark_hive_test(col1 int,col2 string,col3 string) row format delimited fields terminated by '|'"
        spark.sql(createTable)
        sql("load data local inpath '/home/signal/spark_hive_test.txt' overwrite into table spark_hive_test")
    
        val df = sql("select * from spark_hive_test")
    
        df.show()
    
        spark.stop()
    
      }
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    Intellij IDEA 打 jar 包比较麻烦,不如 Eclipse 来的顺溜。 
    Ctrl + Alt + Shift + s 之后,出现 Project Structure,然后如图,然后自己倒腾… 
    这里写图片描述

    …于是经过一番折腾之后,我以为大家 jar 包打好了。 
    然后我们可以发送到Linux上,方便操作就放到 master 节点上的 /homa/signal 目录下了: 
    这里写图片描述

    大家还记得 hive 的建表语法吧,记得我们在建表时指定的分割符是什么不? 
    这里写图片描述

    可以看到我们指定的分割符是”|“,而且,导入到 hive 中的时候我们指定了文件名和文件的路径,也就我们当前所在的 /home/signal,接下来我们需要在 /home/siganl 目录下创建一个 spark_hive_test.txt 的文件,文件格式如下:

    [root@master signal]# vi spark_hive_test.txt 
    4|Signal|male
    5|Kevin|male
    6|Max|female
    
    "spark_hive_test.txt" 3L, 40C written
    [root@master signal]# 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这一系列操作完了之后,我们就可以运行 spark-submit 了。但是,我们需要确保的是 Mysql 的服务是启动的。

    [root@master signal]# service mysql status
     SUCCESS! MySQL running (1678)
    
    [root@master signal]# 
    • 1
    • 2
    • 3
    • 4

    那么我们就”提交“吧: 
    这里写图片描述

    [root@master signal]# spark-submit --class spark.connect.HiveTest --master[1] SparkHive.jar
    
    ...
    +----+------+------+
    |col1|  col2|  col3|
    +----+------+------+
    |   4|Signal|  male|
    |   5| Kevin|  male|
    |   6|   Max|female|
    +----+------+------+
    
    16/11/28 19:46:35 INFO SparkUI: Stopped Spark web UI at http://192.168.38.129:4040
    16/11/28 19:46:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    16/11/28 19:46:35 INFO MemoryStore: MemoryStore cleared
    16/11/28 19:46:35 INFO BlockManager: BlockManager stopped
    16/11/28 19:46:35 INFO BlockManagerMaster: BlockManagerMaster stopped
    16/11/28 19:46:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    16/11/28 19:46:35 INFO SparkContext: Successfully stopped SparkContext
    16/11/28 19:46:35 INFO ShutdownHookManager: Shutdown hook called
    16/11/28 19:46:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-59c859cc-611b-4aac-adce-567eb8376ffa
    [root@master signal]# 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    控制台是正常输出了,我们可以到 Hive 中看看是否真的创建成功了。 
    启动 hiveserver2,然后使用 beeline 访问:

    [root@master bin]# hiveserver2 & > /dev/null 2>&1 
    [1] 7712
    
    [root@master bin]# beeline -u jdbc:hive2://master:10000 -n root
    ...
    Connected to: Apache Hive (version 2.1.0)
    Driver: Hive JDBC (version 2.1.0)
    16/11/28 19:56:12 [main]: WARN jdbc.HiveConnection: Request to set autoCommit to false; Hive does not support autoCommit=false.
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    Beeline version 2.1.0 by Apache Hive
    0: jdbc:hive2://master:10000>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里写图片描述

    我们可以看到,在 default 下是有 spark_hive_test 这张表的。

    0: jdbc:hive2://master:10000> use default;
    OK
    No rows affected (0.243 seconds)
    
    0: jdbc:hive2://master:10000> select * from spark_hive_test;
    OK
    +-----------------------+-----------------------+-----------------------+--+
    | spark_hive_test.col1  | spark_hive_test.col2  | spark_hive_test.col3  |
    +-----------------------+-----------------------+-----------------------+--+
    | 4                     | Signal                | male                  |
    | 5                     | Kevin                 | male                  |
    | 6                     | Max                   | female                |
    +-----------------------+-----------------------+-----------------------+--+
    3 rows selected (2.319 seconds)
    0: jdbc:hive2://master:10000> 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    真的就像我们在控制台里看到的那样,我们的hive中有了那张表,并且数据也加载进去了。

    4 Spark 与 HBase

    package spark.connect
    
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by SIGNAL on 2016/11/28.
      */
    object HBaseTest {
      val conf = new SparkConf().setAppName("Spark connect to HBase").setMaster("local")
      val sc = new SparkContext(conf)
    
    
      def main(args: Array[String]) {
        val hconf = HBaseConfiguration.create()
        hconf.set(TableInputFormat.INPUT_TABLE,"student")
    
        val hbaseRDD = sc.newAPIHadoopRDD(hconf,classOf[TableInputFormat]
          ,classOf[ImmutableBytesWritable]
          ,classOf[Result])
    
        val rowRDD = hbaseRDD.map(x =>(Bytes.toString(x._1.get()), Bytes.toString(x._2.getValue("i".getBytes,"name".getBytes()))))
    
        println(hbaseRDD.count())
    
        rowRDD.foreach(println _)
    
        sc.stop()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    这个就没什么好说的了,跟我们使用 java 访问 HBase 差不太多,只不过我们这次使用的是 scala 。 
    确保我们的 hbase 启动,我们就可以在 Intellij IDEA 中运行了。 
    这里写图片描述

    然后运行,我们会在控制台上看到 student 表中 i 列族下的 name 一列的数据了: 
    这里写图片描述

    当然,我们有好多种运行方式,可以放到集群中去运行,还可以在 yarn 下运行。大家可以参照官网上 spark-submit 来操作,就不一一演示了。

    5 Spark 与 Mysql

    package spark.connect
    
    import java.sql.{Connection, DriverManager}
    import java.util.Properties
    
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by SIGNAL on 2016/11/28.
      */
    object MysqlTest {
      val conf = new SparkConf().setAppName("Spark connect to Mysql").setMaster("local")
      val sc = new SparkContext(conf)
    
      def createConnection(): Connection ={
        Class.forName("com.mysql.jdbc.Driver").newInstance()
    //    DriverManager.getConnection("jdbc:mysql://localhost:3306/test?user=root&password=signal")
        DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","signal")
    
      }
    
      def formMysql(): Unit ={
        val spark = SparkSession.builder.appName("spark connect to mysql").master("local").getOrCreate()
    
        import spark.implicits._
    
        val fromrd = spark.read.jdbc("jdbc:mysql://localhost:3306/gov?user=root&password=signal","app_user",new Properties())
        fromrd.show()
      }
    
      def main(args: Array[String]): Unit = {
        val mysqlData = new JdbcRDD(sc,createConnection,"select * from tb_user where user_id >= ? and user_id <= ? ",0,1000,3,r=>r)
    
        mysqlData.map(r => (r.getLong("user_id"),r.getInt("age"),r.getString("user_name") )).foreach(println _)
    
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    这个跟 HBase 一样,都是比较简单的操作,但是我只是从数据库中读取数据,往数据库中写数据都差不多,看看 API 很快就会上手,也就不写了。

    这么一来,Spark SQL 也就介绍的差不多了,大家还有看不懂的就去看官方文档吧,多看看,对着敲几遍代码,去 API 里看看方法。

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/M_SIGNALs/article/details/53380786
    展开全文
  • SparkSQL(一)

    千次阅读 2018-12-12 00:37:02
    Shark是基于spark框架并且兼容hive,执行SQL执行引擎,因为底层使用了Spark,比MR的Hive普遍要快上两倍左右,当数据全部load到内存中,此时会比Hive快上10倍以上,SparkSQL就是一种交互式查询应用服务 特点 1.内存列...

    简介

    spark1.0版本就已经退出SparkSQL最早叫shark
    Shark是基于spark框架并且兼容hive,执行SQL执行引擎,因为底层使用了Spark,比MR的Hive普遍要快上两倍左右,当数据全部load到内存中,此时会比Hive快上10倍以上,SparkSQL就是一种交互式查询应用服务

    特点

    1.内存列存储–可以大大优化内存的使用率,减少内存消耗,避免GC对大量数据性能的开销
    2.字节码生成技术–可以使用动态 的字节码技术优化性能
    3.Scala代码的优化
    SparkSql 的官网是: http://spark.apache.org/sql/
    在这里插入图片描述
    在这里插入图片描述
    SparkSQL是spark用来处理结构化的一个模块,它提供一个抽象的数据集DataFrame,并且是作为分布式SQL查询引擎的应用。

    为什么要学习SparkSQL?

    之前已经学习了hive,它将HiveSQL转换成MR,然后提交到集群上去执行,减少编写MR查询的复杂性,但是因为采用计算框架,所以执行效率比较慢,所以spark就应运而生。
    在这里插入图片描述

    1.易整合
    2.统一的数据访问方式
    3.兼容hive
    4.提供了统一的数据连接方式(JDBC/ODBC)

    DataFrames

    与RDD类型,DataFrame也是是一个分布式数据容器,然而DataFrame更像传统数据库中二维表格
    除了记录数据之外,还记数据的结构信息(schema),同时与Hive类型,DataFrame也支持嵌套数据类型(struct,map,array),从API易用角度来看,DataFrame提供更高级的API,比函数RDDAPI更加友好。

    RDD和DataFrame的区别

    在这里插入图片描述

    创建DataFrames

    1)spark-shell版本
    spark中已经创建好了SparkContext和SQLContext对象
    2)代码
    spark-shell --master spark://hadoop1:7077 --executor-memory 512m --total-executor-cores 2
    //创建了一个数据集,实现了并行化
    val seq= Seq((“1”,“xiaoming”,15),(“2”,“xiaohong”,20),(“3”,“xiaobi”,10))
    在这里插入图片描述
    在这里插入图片描述

    将当前的rdd对象转换为DataFrame对象(数据信息和数据结构信息存储到DataFrame)
    //_1:string,_2:string,3:int
    //在使用toDF进行转换的时候,空参的情况下。默认是
    +数据 作为列名,数字从1开始逐渐递增
    在这里插入图片描述

    在这里插入图片描述
    _1:列名,String当前列的数据类型
    //查看数据 show 算子来打印,show是一个action类型 算子
    df.show
    在这里插入图片描述
    在这里插入图片描述

    DSL 风格语法

    1.查询:

    df.select("name").show
    df.select("name","age")..show
    //条件过滤
    df.select("name","age").filter("age >10").show
    //参数必须是一个字符串,filter中的表达式也需要时一个字符串
    

    在这里插入图片描述
    //2.参数是类名col (“列名”)
    df.select(“name”,“age”).filter(col(“age”) >10).show
    在这里插入图片描述
    //3.分组统计个数

    df.groupBy("age").count().show()
    

    在这里插入图片描述
    //4.打印DataFrame结构信息

    df.printSchema
    

    在这里插入图片描述

    在这里插入图片描述

    Sql 风格语法:
    1.将DataFrame注册成表(临时表),表会被存储
    df.registerTempTable(“t_person”)
    在这里插入图片描述
    查询语法:需要通过SQLContext对象调用sql方法写入sql语句(两种)
    sqlContext.sql(“select name,age from t_person where age > 10”).show
    在这里插入图片描述
    sqlContext.sql(“select name,age from t_person order by age desc limit 2”).show
    在这里插入图片描述

    Hive中orderby和sortby的区别?

    1.使用orderby全局排序
    2.使用distribute和sort进行分组排序
    3.distribute by + sort by 通过当前distribute by 设定字段为key,数据会被hash到不同reduce机器上
    4.然后同sort by 会对同一个reduce机器上的数据进行局部排序

    5.orderby 是全局有序 distribute sort by :局部有序,全局无序

    结构表信息
    sqlContext.sql(“desc t_person”).show

    以编码的形式来执行sparkSQL

    先将工程中的maven添加配置

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>
    

    第一种通过反射方式推断

    SparkSQLDemo1.scala

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * sparkSQL --就是查询
      */
    object SparkSQLDemo1 {
      def main(args: Array[String]): Unit = {
        //之前在spark-shell中,sparkContext和SQLContext是创建好的 所以不需要创建
        //因为是代码编程,需要进行创建
        val conf = new SparkConf().setAppName("SparkSQLDemo1").setMaster("local")
        val sc  =new SparkContext(conf)
        //创建SQLContext对象
        val sqlc = new SQLContext(sc)
        //集群中获取数据生成RDD
         val lineRDD: RDD[Array[String]] = sc.textFile("hdfs://hadoop2:8020/Person.txt").map(_.split(" "))
        //lineRDD.foreach(x => println(x.toList))
    
        //将获取数据 关联到样例类中
        val personRDD: RDD[Person] = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))
        import sqlc.implicits._
        //toDF相当于反射,这里若要使用的话,需要导入包
        /**
          * DataFrame [_1:int,_2:String,_3:Int]
          * spark-shell 数据是一个自己生成并行化数据并没有使用样例类来 存数据而是直接使用
          * 直接调用toDF的时候,使用就是默认列名 _+数字  数字从1开始逐渐递增
          * 可以在调用toDF方法的时候指定类的名称(指定名称多余数据会报错)
          *
          * 列名不要多余,也不要少于
          * 也就是说列名要和数据一一对应
          *
          * 使用代码编程数据是存储到样例类中,样例类中的构造方法中的参数就是对应的列名
          * 所以通过toDF可以直接获取对应的属性名作为列名使用
          * 同时也可以自定义列名
          *
          */
        val personDF: DataFrame = personRDD.toDF()
    //val personDF: DataFrame = personRDD.toDF("ID","NAME","AGE")
        personDF.show()
    
        //使用Sql语法
        //注册临时表,这个表相当于存储在 SQLContext中所创建对象中
        personDF.registerTempTable("t_person")
        val sql = "select  * from t_person where age > 20 order by age"
        //查询
        val res = sqlc.sql(sql)
        //  def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
        //默认打印是20行
        res.show()
    
        //固化数据
        //将数据写到文件中mode是以什么形式写  写成什么文件
        /**
          * def mode(saveMode: String): DataFrameWriter = {
          *     this.mode = saveMode.toLowerCase match {
          * case "overwrite" => SaveMode.Overwrite  -复写
          * case "append" => SaveMode.Append -- 追加
          * case "ignore" => SaveMode.Ignore
          * case "error" | "default" => SaveMode.ErrorIfExists
          * case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
          * "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
          *
          */
    //    res.write.mode("append").json("out3")
    //    hdfs://hadoop2:8020/out111")
        //除了这两种还可以csv模式,json模式
        //csv在 1.6.3 spark中需要第三方插件,才能使用能使用,,,,2.0之后自动集成
        //这个方法不要使用因为在2.0会被删除
        res.write.mode("append").save("hdfs://hadoop2:8020/out111")
      }
    case class Person(id:Int,name:String,age:Int)
    }
    
    

    第二通过StructType

    SparkSQLStructTypeDemo.scala

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkSQLStructTypeDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkSQLStructTypeDemo").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlcontext = new SQLContext(sc)
    
        //获取数据并拆分
        val lineRDD =  sc.textFile("hdfs://hadoop2:8020/Person.txt").map(_.split(" "))
        //创建StructType对象  封装了数据结构(类似于表的结构)
        val structType: StructType = StructType {
          List(
            //列名   数据类型 是否可以为空值
            StructField("id", IntegerType, false),
            StructField("name", StringType, true),
            StructField("name", IntegerType, false)
    
            //列需要和数据对应,但是StructType这种可以:
            /**
              * 列的数据大于数据,所对应列的值应该是null
              * 列数是不能小于数据,不然会抛出异常
              *  StructField("oop", IntegerType, false)
              *   StructField("poo", IntegerType, false)
              */
          )
        }
        //将数据进行一个映射操作
        val rowRDD: RDD[Row] = lineRDD.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt))
        //将RDD转换为DataFrame
        val personDF: DataFrame = sqlcontext.createDataFrame(rowRDD,structType)
        personDF.show()
      }
    }
    

    1.将当前程序打包操作提交到集群,需要做 一定的更改 ,注意path路径 修改为 args(下标)

    模式
    spark-submit
    –class 类名(类的全限定名(包名+类名))
    –master spark://集群:7077
    /root/jar包路径
    输入数据路径
    输出路径数据

    2.查看运行结果(多个文件的情况下)
    hdfs dfs -cat /输入文件路径/part-r-* //这个代表查看多个文件

    JDBC数据源

    SparkSql可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,在通过对DataFrame的一系列操作,还可以将数据写到关系型数据库中

    使用spark-shell
    必须执行mysql的连接驱动jar

     spark-shell --master spark://hadoop1:7077 --executor-memory 512m --total-executor-cores 2 --jars /root/mysql-connector-java-5.1.32.jar --driver-class-path /root/mysql-connector-java-5.1.32.jar
    

    将数据写入到Mysql中

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object DataFormeInputJDBC {
    /*  def createSC(AppName:String,Master:String):SparkContext = {
    
      }
      def createSC(AppName:String,Master:String,sc:SparkContext):SQLContext = {
    
      }*/
      def main(args: Array[String]): Unit = {
        val conf = new  SparkConf().setAppName("DataFormeInputJDBC").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        //获取数据拆分
        val lines = sc.textFile("hdfs://hadoop1:8020/Person.txt").map(_.split(" "))
    
        // StructType 存的表结构
        val structType: StructType = StructType(Array(StructField("id", IntegerType, false),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
        //开始映射
        val rowRDD: RDD[Row] = lines.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt))
        //将当前RDD转换为DataFrame
        val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
    
        //创建一个用于写入mysql配置信息
        val prop = new Properties()
        prop.put("user","root")
        prop.put("password","123")
        prop.put("driver","com.mysql.jdbc.Driver")
        //提供mysql的URL
        val jdbcurl = "jdbc:mysql://hadoop1:3306/mydb1"
    
        //表名
        val table = "person"
        //数据库要对,表若不存在会自动创建并存储
        //需要将数据写入到jdbc
        //propertities的实现是HashTable
        personDF.write.mode("append").jdbc(jdbcurl,table,prop)
      println("插入数据成功")
        sc.stop()
      }
    }
    

    HIVE-on-Spark

    hive底层是通过MR进行计算,将其改变为SparkCore来执行

    配置步骤
    1.在不是高可用集群的前提下,只需要将Hadoop安装目录中的core-site.xml拷贝到spark的配置conf文件目录下即可
    2.将hive安装路径下的hive-site.xml拷贝到spark的配置conf配置文件目录下即可
    注意
    若是高可用:需要将hadoop安装路径下的core-site,xml和hdfs-site.xml拷到spark的conf目录下

    操作完成后建议重启集群
    通过sparksql来操作,需要在spark安装路径中sbin目录

    启动:
    spark-sql  \
    --master spark://hadoop1:7077 \
    --executor-memory 512m  \
    --total-executor-cores 2 \
    --jars /root/mysql-connector-java-5.1.32.jar \
    --driver-class-path  /root/mysql-connector-java-5.1.32.jar
    基本操作:
    1.创建表:
    create table person1(id int,name string,age int)row format delimited fields terminated by ' '
    2.加载数据:(本地加载)
    load data local inpath '/root/Person.txt' into table person1;
    3.查询:
    select * from person1;
    select name,age from person where age > 20 order by age;
    4.删除
    drop table person
    

    内部表和外部表

    表没有被external修饰的 都是内部表,被修饰的就是外部表
    hive本身不能存储数,依托于HDFS。
    区别
    内部表存储数据被删除,同时会删除数据和原信息
    外部表存储数据被删除,仅仅会删除元数据,HDFS中存储的数据会被表留下来

    展开全文
  • SparkSQL,顾名思义,就是Spark生态体系中的构建在SparkCore基础之上的一个基于SQL的计算模块。SparkSQL的前身叫Shark,最开始的底层代码优化,sql的解析,执行引擎等等完全基于HIve,Shark的执行速度要比Hive高出一...

    SparkSQL

    一、SparkSQL的发展

    1.1、概述

    SparkSQL,顾名思义,就是Spark生态体系中的构建在SparkCore基础之上的一个基于SQL的计算模块。SparkSQL的前身叫Shark,最开始的底层代码优化,sql的解析,执行引擎等等完全基于HIve,Shark的执行速度要比Hive高出一个数量级,但是Hive的发展制约了Shark,所以在15年中旬的时候,Shark项目结束,重新独立出来一个项目,就是SparkSQL,不再依赖Hive,做了独立的发展,逐渐的形成两条互相独立的业务:Sparksql,和Hive-OnSpark。在SparkSQL发展的过程中,同时也吸收了Shark有些特点:基于内存的列存储,动态字节码优化技术

    1.2、特点

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    1.3、总结

    SparkSQL就是Spark生态体系中用于处理结构化数据的一个模块。结构化数据是什么?存储在关系型数据库中的数据,就是结构化数据;半结构化数据是什么?类似xml、json等的格式的数据被称之为半结构化数据;非结构化数据是什么?音频、视频、图片等为非结构化数据。

    换句话说,SparkSQL处理的就是二维表数据。

    二、SparkSQL的编程模型(即代码风格:DSL和SQL)

    2.1、编程模型简介

    主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

    - SQL

    ​ SQL不用多说,就和Hive操作一样,但是需要清楚一点的时候,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。

    ​ 同时支持,通用sql和hiveql。

    - DataFrame和Dataset

    ​ DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。

    ​ Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。

    ​ 一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

    2.2、RDD、DataFrame、DataSet的对比

    2.2.1、RDD

    弹性分布式数据集。可分区,不可变(只读),内部元素可以并行计算。是Spark对数据进行的一种抽象,RDD就是一种数据结构,里面包含了数据和操作数据的方法(可以片面的将RDD理解为集合)
    RDD的弹性:
    1、可以自动切换内存和磁盘(当内存不够时,存入磁盘)
    2、当某一个RDD的数据丢失时,可以通过血缘关系追溯到上一个RDD重新计算,不用从头计算
    3、当某一个任务或阶段执行失败后,会自动进行重试,默认4次
    4、RDD中的数据是分区的,分区的大小可以自由设置和细粒度调整
    RDD的分布式:
    RDD的数据可以存放在多个节点上
    数据集:
    就是一个存放数据的集合

    相对于与DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)

    2.2.2、DataFrame

    DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD),下面有图片很形象的表现出来了两者的差别:

    假设RDD中的两行数据长这样:
    在这里插入图片描述
    那么DataFrame中的数据长这样:
    在这里插入图片描述

    从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法:DataFrame API,下面会说到。

    有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快!

    2.2.3、DataSet

    相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束
    如下图:

    假设RDD中的两行数据长这样:
    在这里插入图片描述

    那么Dataset中的数据长这样:
    在这里插入图片描述

    或者长这样(每行数据是个Object):
    在这里插入图片描述

    使用Dataset API的程序,会经过Spark SQL的**优化器(Catalyst)**进行优化。

    目前仅支持Scala、Java API,尚未提供Python的API,相比DataFrame,Dataset还提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,很烦,这也是引入Dataset的一个重要原因。

    2.2.4、RDD、DataFrame、DataSet的对比

    1、RDD和DataFrame的比较
    RDD描述的是数据结构,以及提供了操作方式,并且具有弹性特点,也可以理解为RDD是一张二维表
    DataFrame以前叫SchemaRDD,从名字上就可以知道,比RDD多了一个Schema,而schema就是元数据
    元数据有表名,表头,字段,字段类型
    DataFrame易用性更好,底层可以自动优化。即使你写的sql比较复杂,运行速度也非常快
    2、RDD和DataSet的比较
    相同点:DataSet也引入了RDD的强类型推断,也是在RDD的每行数据加了类型约束
    不同点:DataSet还可以映射成java对象
    运行时,DataSet也会自动优化
    3、DataFrame与DataSet的比较
    相同点:都有schema
    不同点:DataFrame 没有 编译器检查机制
    DataSet 编译器检查机制

    三、SparkSQL的编程入口

    在SparkSQL中的编程模型,不在是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。
    SparkSession的构建需要依赖SparkConf或者SparkContext。
    使用工厂构建器(Builder方式)模式创建SparkSession。

    四、IDEA中编写SparkSQL的入口

    4.1、在IDEA中创建SparkSQL模块

    只要引入SparkSQL相关的依赖即可,如下:

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.3</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    

    4.2、下面是创建SparkSQL入口的语法:SQLContext,HiveContext,SparkSession

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * sparksql的程序入口方法
     */
    object AppAccessDemo {
        def main(args: Array[String]): Unit = {
            //spark2.0以前,有两个,分别是sqlContext和hiveContext
            val conf: SparkConf = new SparkConf().setMaster("local").setAppName("test")
            val sc = new SparkContext(conf)
    
            //第一种:获取一个sqlContext
            val sqlContext = new SQLContext(sc)
            val df: DataFrame = sqlContext.read.json("data/a.json")
            df.show()
    
            //第二种:获取一个hiveContext
            val hiveContext = new HiveContext(sc)
            val frame: DataFrame = hiveContext.table("")
            frame.show()
    
            //第三种:使用spark2.0以后的SparkSession对象
            val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
            val df: DataFrame = spark.read.json("data/a.json")
            df.show()
            sc.stop()
    
        }
    }
    

    五、SparkSQL的基本编程

    5.1、SparkSession的创建方式

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object _02SparkSessionCreateMethod {
        def main(args: Array[String]): Unit = {
            //第一种:
            val spark: SparkSession =SparkSession.builder().appName("test").master("local").getOrCreate()
            val df: DataFrame = spark.read.json("data/a.json")
            df.show()
    
            //第二种
            val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local")
            val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
            val df: DataFrame = spark.read.json("data/a.json")
            df.show()
    
            //第三种
            val spark: SparkSession = SparkSession.builder().appName("test")
                .master("local")
                .enableHiveSupport() //开启访问hive的支持
                .getOrCreate()
    
            spark.table("").show()
        }
    }
    
    

    5.2、基本编程

    package com.xxx.SparkSQL.Day01
    
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    object _03SparkSqlFirst {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
    
            //读取一个json文件,获得一个DataFrame对象,数据格式如下:
            // {"movie":"1193","rate":"5","datetime":"978300760","uid":"1"}
            val df: DataFrame = spark.read.json("data/a.json")
    
            //show():是一个行动算子,用于将结果打印到控制台,相当于sql的 select * from tableName
            //df.show()  //默认显示前20行
            //df.show(30,true)  指定要显示的行数,和字段的值的长度如果超出了30个字符是否要截断显示  就是一个
            //cell中的内容大于30个字符只显示30个字符,并且右对齐  ,false表示全部显示,左对齐
            df.show()
    
            //printSchema():此方法将显示二维表的结构(元数据:表明,表头,字段名,字段类型)
            df.printSchema()
            //也可以这样写,比较繁琐,不常用
            val schema: StructType = df.schema
            schema.printTreeString()
    
            //具体的一些查询
            df.select("movie","rate","uid").show()
    
            //导入SparkSession中的隐式转换操作,增强SQL的功能.注意:spark不是包名,而是sparkSeesion对象的名称
            import spark.implicits._
            //这时就可以使用变量了,变量可以用.的方式来调用方法
            df.select(($"movie").+(10),$"rate" +10,$"datetime").show()
            //还有一种字段的写法  使用new Column
            df.select(new Column("movie"),new Column("uid")).show()
    
            //起别名
            df.select($"movie" as("电影"),$"rate".as("评分"),$"datetime".as("时间")).show()
    
            //做聚合,统计
            df.select($"movie" as("电影")).groupBy("movie").count().show()
    
            //条件查询
            df.select($"movie" as("电影"),$"rate".as("评分"),$"datetime".as("时间"))
                .where("rate > 4 and datetime < 956354156")
                .show()
    
    
            /*
            上面的写法都是使用DataFrame或者DataSet,下面写SQL的写法
            要用SQL的写法必须事先将对应的数据映射为一张表才行,有四种映射方法
            df.createOrReplaceGlobalTempView():   global是全局的意思,表示整个spark程序中都可以访问到
            df.createGlobalTempView()                              没有global:仅当前任务中可以访问的
            df.createOrReplaceTempView()          replace:   有replace,表示如果存在就会替换,不存在也创建
            df.createTempView()                              无replace, 如果存在就报错,不存在就创建
            注意:建议使用  createOrReplaceTempView 或 createTempView
             */
            df.registerTempTable("movie") //在spark2.0之后就抛弃了换用下面的方法
            df.createOrReplaceTempView("movie")
            //使用SQL语法进行查询
            spark.sql(
                """
                  |select
                  |*
                  |from movie
                  |where rate > 4
                  |""".stripMargin).show()
            //也可以提前定义SQL语句
            val sql = "select * from movie where rate > 4"
            spark.sql(sql).show()
            
            //关闭SparkSession(用完关闭,养成好习惯)
            spark.stop()
        }
    }
    
    

    5.3、SparkSQL编程模型的操作

    5.3.1、DataFrame的构建

    构建方式有三种:
    1、就是上面使用的用read()方法读取文件,返回一个DataFrame
    2、通过javaBean+反射来构建
    3、通过动态编码的方式来构建

    package com.xxx.SparkSQL.Day01
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    
    object _04CreateDataFrameMethod {
        def main(args: Array[String]): Unit = {
    
            javaBean
            dynamicProgramme
        }
    
        def javaBean: Unit = {
            val spark: SparkSession = SparkSession.builder().master("local").appName("create").getOrCreate()
            import spark.implicits._
    
            //描述一个java的学生集合对象,要使用java中对应的class
            val students = List(new Student(1001, "zs", "f", 23),
                new Student(1002, "ls", "m", 23),
                new Student(1003, "ww", "f", 25),
                new Student(1004, "zl", "f", 26))
    
            //调用createDataFrame方法需要导包
            import scala.collection.JavaConversions._
            //调用SparkSession的createDataFrame方法
            val df: DataFrame = spark.createDataFrame(students, classOf[Student])
            df.show()
    
            spark.stop()
        }
    
        def dynamicProgramme: Unit = {
            val spark: SparkSession = SparkSession.builder().master("local").appName("create").getOrCreate()
            import spark.implicits._
    
            //获取一个RDD对象,要使用SparkContext,SparkSession对象里可以直接获取
            //Row:行,就是代表了二维表中的一行记录
            val rdd1: RDD[Row] = spark.sparkContext.parallelize(List(
                Row(1001, "zs", "m", 23),
                Row(1002, "ls", "m", 24),
                Row(1003, "ww", "m", 25),
                Row(1004, "zl", "m", 26),
                Row(1005, "xq", "m", 27)
            ))
    
            //获取RDD的描述信息(RDD数据的元数据)
            val schema: StructType = StructType(List(
                StructField("id", DataTypes.IntegerType, false), //false代表不可为空,true可为空
                StructField("name", DataTypes.StringType, false),
                StructField("gender", DataTypes.StringType, false),
                StructField("age", DataTypes.IntegerType, false)
            ))
    
            val df: DataFrame = spark.createDataFrame(rdd1, schema)
            df.show()
            
            spark.stop()
        }
    }
    
    

    说明,这里学习三个新的类:

    Row:代表的是二维表中的一行记录,或者就是一个Java对象
    StructType:是该二维表的元数据信息,是StructField的集合
    StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)

    • 总结:

    这两种方式,都是非常常用,但是动态编程更加的灵活,因为javabean的方式的话,提前要确定好数据格式类型,后期无法做改动。

    5.3.2、DataSet的构建

    DataSet就是DataFrame的升级版,创建方式和DataFrame类似,但有所不同

    package com.xxx.SparkSQL.Day01
    
    import org.apache.spark.sql.{Dataset, SparkSession}
    
    /**
     * DataSet就是DataFrame的升级版
     *
     *  spark.createDataset(List|Seq|Array)
     * 创建方式是一样的,都可以使用动态编程方式
     * 1. 参数是scala的集合对象
     * 2. 有一个隐式参数:需要导入spark的隐式方法
     * 3. 集合的元素类型是一个样例类
     */
    object _05CreateDataSetMethod {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession.builder().master("local").appName("create").getOrCreate()
            import spark.implicits._
    
            //维护一个普通集合
            val girls = List(
                Girls(1001,"zs","m",23),
                Girls(1002,"ls","m",24),
                Girls(1003,"ww","m",25),
                Girls(1004,"zl","m",26)
            )
    
            val ds: Dataset[Girls] = spark.createDataset(girls)
            ds.show()
    
            spark.stop()
        }
    }
    
    case class Girls(id: Int, name: String, gender: String, age: Int)
    

    在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过

    5.3.3、RDD和DataFrame以及DataSet之间的相互转换

    1、RDD转DataFrame,DataSet

    import spark.implicits._
    rdd.toDF()
    rdd.toDS()
    

    2、DataFrame,DataSet转RDD

    df.rdd
    ds.rdd
    

    3、DataFrame转DataSet

    DataFrame无法直接转DataSet,但是可以调用算子实现转换(orderBy,groupBy等)

    val ds: Dataset[Row] = df.orderBy($"deptno".desc)
    

    4、DatSet转DataFrame

    ds.toDF
    

    在这里插入图片描述

    展开全文
  • Spark实战(六)spark SQL + hive(Python版)

    千次阅读 2019-01-10 13:54:29
    一、hive环境准备 ...CREATE USER 'spark'@'%' IDENTIFIED BY '123456'; GRANT all privileges ON hive.* TO 'spark'@'%'; flush privileges; 2、环境配置    将配置好的hive-site.xml放入$SPARK-HOME/...

    一、hive环境准备

    1、安装hive

       按照hive安装步骤安装好hive
    CREATE USER 'spark'@'%' IDENTIFIED BY '123456';
    GRANT all privileges ON hive.* TO 'spark'@'%';
    

    flush privileges;

    2、环境配置

       将配置好的hive-site.xml放入$SPARK-HOME/conf目录下,,下载mysql连接驱动,并拷贝至spark/lib目录下

       启动spark-shell时指定mysql连接驱动位置

    bin/spark-shell --master spark://mini1:7077 --executor-memory 1g --total-executor-cores 2 --driver-class-path /home/hadoop/spark/lib/mysql-connector-java-5.1.35-bin.jar
    

    二、Spark SQL

    1、概述

       Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

    2、DataFrames

       与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。

    在这里插入图片描述

    3、操作实例

    from pyspark import Row
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructField, StringType, StructType
    
    if __name__ == "__main__":
        spark = SparkSession\
                .builder\
                .appName("PythonWordCount")\
                .master("local")\
                .getOrCreate()
        sc = spark.sparkContext
        line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day4\\person.txt").map(lambda x: x.split(' '))
        # personRdd = line.map(lambda p: Row(id=p[0], name=p[1], age=int(p[2])))
        # personRdd_tmp = spark.createDataFrame(personRdd)
        # personRdd_tmp.show()
    
        #读取数据
        schemaString = "id name age"
        fields = list(map(lambda fieldName: StructField(fieldName, StringType(), nullable=True), schemaString.split(" ")))
        schema = StructType(fields)
    
        rowRDD = line.map(lambda attributes: Row(attributes[0], attributes[1],attributes[2]))
        peopleDF = spark.createDataFrame(rowRDD, schema)
        peopleDF.createOrReplaceTempView("people")
        results = spark.sql("SELECT * FROM people")
        results.rdd.map(lambda attributes: "name: " + attributes[0] + "," + "age:" + attributes[1]).foreach(print)
    
        # SQL风格语法
        # personRdd_tmp.registerTempTable("person")
        # spark.sql("select * from person where age >= 20 order by age desc limit 2").show()
    	#方法风格语法
        # personRdd_tmp.select("name").show()
        # personRdd_tmp.select(personRdd_tmp['name'], personRdd_tmp['age'] + 1).show()
        # personRdd_tmp.filter(personRdd_tmp['age'] > 21).show()
        # personRdd_tmp.groupBy("age").count().show()
    
        
        # personRdd_tmp.createOrReplaceTempView("people")
        # sqlDF = spark.sql("SELECT * FROM people")
        # sqlDF.show()
    
        # personRdd_tmp.createGlobalTempView("people")
        # spark.sql("SELECT * FROM global_temp.people").show()
        #
        # spark.newSession().sql("SELECT * FROM global_temp.people").show()
    
    	# 保存为指定格式
        # people = line.map(lambda p: (p[0],p[1], p[2].strip()))
        # schemaString = "id name age"
        #
        # fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
        # # # 通过StructType直接指定每个字段的schema
        # schema = StructType(fields)
        # schemaPeople = spark.createDataFrame(people, schema)
        # schemaPeople.createOrReplaceTempView("people")
        # results = spark.sql("SELECT * FROM people")
        # results.write.json("D:\\code\\hadoop\\data\\spark\\day4\\personout.txt")
        # results.write.save("D:\\code\\hadoop\\data\\spark\\day4\\personout1")
    
        # results.show()
    
       结果如下:

    在这里插入图片描述
       保存为JSON文件如下

    在这里插入图片描述

    在这里插入图片描述

       从mysql中读写数据
    	#JDBC
        # sqlContext = SQLContext(sc)
        # df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/hellospark",
        #                                             driver="com.mysql.jdbc.Driver", dbtable="(select * from actor) tmp",
        #                                             user="root", password="123456").load()
    
        # schemaPeople.write \
        #     .format("jdbc") \
        #     .option("url", "jdbc:mysql://localhost:3306/hellospark")\
        #     .option("dbtable", "person")\
        #     .option("user", "root")\
        #     .option("password", "123456")\
        #     .mode("append")\
        #     .save()
    

    三、Spark Sql + Hive

       需要指定warehouse_location,并将配置好的hive-site.xml放入$SPARK-HOME/conf目录下

    from os.path import abspath	
    from pyspark.sql import SparkSession
    # from pyspark.sql.types import StructField, StringType, StructType
    
    if __name__ == "__main__":
        # spark = SparkSession\
        #         .builder\
        #         .appName("PythonWordCount")\
        #         .master("spark://mini1:7077")\
        #         .getOrCreate()
        # sc = spark.sparkContext
        warehouse_location = abspath('spark-warehouse')
    
        spark = SparkSession \
            .builder \
            .appName("Python Spark SQL Hive integration example") \
            .config("spark.sql.warehouse.dir", warehouse_location) \
            .enableHiveSupport() \
            .getOrCreate()
    
        # spark.sql("CREATE TABLE IF NOT EXISTS person(id STRING, name STRING,age STRING) row format delimited fields terminated by ' '")
        # spark.sql("LOAD DATA INPATH 'hdfs://mini1:9000/person/data/person.txt' INTO TABLE person")
        spark.sql("SELECT * FROM person").show()
    
        # line = sc.textFile("hdfs://mini1:9000/person/data/person.txt").map(lambda x: x.split(' '))
        # people = line.map(lambda p: (p[0],p[1], p[2].strip()))
        # schemaString = "id name age"
        #
        # fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
        # # # 通过StructType直接指定每个字段的schema
        # schema = StructType(fields)
        # schemaPeople = spark.createDataFrame(people, schema)
        # schemaPeople.createOrReplaceTempView("people")
        #
        # schemaPeople.write \
        #     .format("jdbc") \
        #     .option("url", "jdbc:mysql://192.168.62.132:3306/hellospark")\
        #     .option("dbtable", "person")\
        #     .option("user", "root")\
        #     .option("password", "123456you")\
        #     .mode("append")\
        #     .save()
    
        spark.stop()
        # sc.stop()
    
       还可以使用org.apache.spark.sql.hive.HiveContext来实现。
    展开全文
  • Spark SQL

    2020-11-23 14:24:43
    hive将SQL转为MapReduce SparkSql可以简单理解将SQL转为RDD+优化在执行 spark处理数据类型 Spark 的 RDD 主要用于处理 非结构化数据 和 半结构化数据结构化 SparkSQL中的SQL主要用于处理 结构化数据(较为规范的半...
  • Spark SQL 使用示例

    2019-08-24 00:50:46
    1、Spark SQL 1.1、Spark SQL概述 Spark SQLSpark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。 Hive它是将Hive SQL转换成MapReduce然后提交到集群上...
  • spark Sql

    千次阅读 多人点赞 2019-10-15 08:19:25
    spark sql一.概述1 spark历史2 Spark-SQL 概述2.1 特点2.2 作用2.3 Spark SQL架构图3 Dataset演进历史3.1 RDD3.1.1 优点3.1.2 缺点3.2 DataFrame3.2.1 优点3.2.2 缺点3.2.3 核心特征3.3 Dataset3.3.1 区别3.3.2 特点...
  • 第二十四记·Spark SQL配置及使用

    千次阅读 2018-09-09 14:54:50
    SparkSQL是spark的一个模块,主入口是SparkSession,将SQL查询与Spark程序无缝混合。DataFrames和SQL提供了访问各种数据源(通过JDBC或ODBC连接)的常用方法包括Hive,Avro,Parquet,ORC,JSON和JDBC。您甚至可以跨...
  • SparkSQL 学习笔记

    千次阅读 2018-12-29 14:24:27
    为什么学习Spark SQLSpark SQL的版本迭代 SparkSession sparkSession概念解释: 特点 创建SparkSession 在spark-shell中创建 在IDEA中创建SparkSession RDD,DataFrame 和 DataSet RDD的局限性 什么是...
  • Spark计算引擎之SparkSQL详解

    万次阅读 2019-08-06 16:44:48
    一、Spark SQL 二、Spark SQL 1.Spark SQL概述 1.1.Spark SQL的前世今生 Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。Shark建立在Hive的代码基础上,并通过将Hive的部分物理执行计划交换出来...
  • Spark SQL入门用法与原理分析

    万次阅读 2017-01-05 11:48:14
    sparkSQL是为了让开发人员摆脱自己编写RDD等原生Spark代码而产生的,开发人员只需要写一句SQL语句或者调用API,就能生成(翻译成)对应的SparkJob代码并去执行,开发变得更简洁, 1. API 2.原理 3.Catalyst解析器 4...
  • spark sql

    千次阅读 2014-11-08 11:00:28
    Spark SQLSQL 语句的处理和关系型数据库对 SQL 语句的处理采用了类似的方法,首先会将 SQL 语句进行解析(Parse),然后形成一个 Tree,在后续的如绑定、优化等处理过程都是对 Tree 的操作,而操作的方法是采用 Rule,...
  • 总结:Hive,Hive on SparkSparkSQL区别

    万次阅读 多人点赞 2017-08-04 22:36:07
    Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优化上的一些总结 ...
  • SparkSQL(6)——Spark SQL JDBC

    千次阅读 2018-10-01 18:41:24
    Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame。 通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。 SparkSQL从MySQL中加载数据 package com.fgm.sparksql import java....
  • 编写Spark SQL程序实现RDD转换成DataFrame Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD...
  • SparkSQL(4)——Spark SQL DataSet操作

    千次阅读 2018-09-30 22:12:14
    DataSet1、DataSet是什么?2、DataFrame与DataSet的区别3、DataFrame与DataSet互相转换DataFrame转为 DataSetDataSet转为DataFrame4、DataSet的创建从一个已经存在的scala集合来构建从一个已经存在的rdd中来构建通过...
  • Spark Sql

    2019-09-17 14:05:13
    Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。 SparkSQL的由来 SparkSQL的前身是Shark。在Hadoop...
  • 这里写目录标题数据读写初识 DataFrameReader初识 DataFrameWriter读写 ...SparkSQL 的一个非常重要的目标就是完善数据读取, 所以 SparkSQL 中增加了一个新的框架, 专门用于读取外部数据源, 叫做 DataFrameReader impo
  • 主要参考书籍:《Spark SQL 内核剖析》 参考博客:...文章目录1.Spark执行流程概述SparkSQL的使用一般步骤:SparkSQL语句的转换2.Catalyst基础的SQL优化器理论:Catalyst...
  • Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种: 第1种:指定列名添加Schema 第2种:通过StructType指定Schema 第3种:编写样例类,利用反射...
  • SparkSQL Spark SQLSpark的一个组件,能够很好的处理结构化数据
  • Spark SQL 函数全集

    万次阅读 多人点赞 2018-03-23 09:31:01
    org.apache.spark.sql.functions是一个Object,提供了约两百多个函数。 大部分函数与Hive的差不多。 除UDF函数,均可在spark-sql中直接使用。 经过import org.apache.spark.sql.functions._ ,也可以用于...
  • spark sql 例子

    万次阅读 2017-05-31 14:23:35
    该文主要展示的是spark sql 例子 (内容是找了份oracle的例子,翻译成spark sql的) 1、需要准备好四张表,既四个文本文件逗号分隔 2、为这四张表创建好schema,并注册成表 3、时间处理有小部分改动
  • Spark SQL详解

    千次阅读 2018-09-26 09:06:17
    熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业...
  • hive表已经创建好了,详见: hive实例:搜狗用户搜索日志 配置: 1. 把core-site.xml和hive-site.xml复制到spark的conf目录下 ...2. 把mysql-connector-java-5.1.47-bin.jar复制到spark的jars目录...
  • Spark SQL入门基础

    千次阅读 2018-06-27 18:05:34
    Spark SQL简介 从Shark说起 Shark即hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划、翻译执行计划优化等逻辑,可以近似认为将物理执行计划从MapReduce作业...
  • Spark SQL原理与应用

    千次阅读 2018-08-12 13:48:24
    Spark SQL原理与应用
  • Flink SQL vs Spark SQL

    千次阅读 2019-03-10 20:57:17
    Spark已经在大数据分析领域确立了事实得霸主地位,而Flink则得到了阿里系的亲赖前途一片光明。我们今天会SparkSQL和FlinkSQL的执行流程进行一个梳理。并提...
  • 1. 使用Scala语言操作Spark SQL, 将RDD转为DataFrame
  • Spark SQL入门

    千次阅读 2018-03-31 20:45:50
    1、SQL结合spark有两条线: Spark SQL和Hive on Spark(还在开发状态,不稳定,暂时不建议使用)。 #Hive on Spark是在Hive中的,使用Spark作为hive的执行引擎,只需要在hive中修改一个参数即可: # set hive....

空空如也

1 2 3 4 5 ... 20
收藏数 84,725
精华内容 33,890
关键字:

spark sql