精华内容
参与话题
问答
  • spark-sql执行hivesql

    千次阅读 2018-08-27 14:40:02
    spark提交命令有spark-shell、spark-submit、spark-sqlspark-sql的提交命令很少见吧,不过已经被我发掘出来了。 我们执行hive命令一般都是 hive -e 'select * from xx' 或者 hive -f /home/hadoop/xx....

    spark提交命令有spark-shell、spark-submit、spark-sql,spark-sql的提交命令很少见吧,不过已经被我发掘出来了。
    我们执行hive命令一般都是

    hive -e 'select * from xx'
    或者 
    hive -f /home/hadoop/xx.hql -d dt=2018-01-01
    

    但是hive底层使用mr执行速度实在不忍直视,安装hive on spark又太麻烦了,怎么办呢?其实,spark也有基于hive执行sql脚本的提交任务方式,就是spark-sql

    spark-sql --master yarn-client -e 'select * from xx'
    spark-sql --master yarn-client  dt=2018-01-01 -f '/home/hadoop/xx.hql'
    

    不过spark对机器内存性能要求很高,容易执行失败,如果spark-sql执行失败,出现内存溢出的情况,还是使用hive比较稳定。
    这里spark-sql能查询到hive表是怎么配置的呢?只需要把hive-sit.xml复制到spark安装目录的conf目录下即可。
    spark-sql缺点:执行语句insert overwrite table xx…在结果目录会有大量小文件,容易内存溢出执行失败

    我的GitHub
    QQ:2541692705
    邮箱:loyalwilliams@163.com
    微信扫一扫关注我,没事可以玩玩小游戏



    我想去流浪,我想去读书,若有机会,江湖再见
    扫一扫,领取红包,就当奖励你我付出的努力

    展开全文
  • Spark SQL详解

    千次阅读 2018-09-26 09:06:17
    熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业...

    转自:https://mp.weixin.qq.com/s/SGhYBxGd5qCVfeM70DRFTw

    发家史

    熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);

    同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。
    Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行计划生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。

    Spark SQL

    spark sql提供了多种接口:

    1. 纯Sql 文本

    2. dataset/dataframe api

    当然,相应的,也会有各种客户端:

    sql文本,可以用thriftserver/spark-sql

    编码,Dataframe/dataset/sql

     

     Dataframe/Dataset API简介

     

    Dataframe/Dataset也是分布式数据集,但与RDD不同的是其带有schema信息,类似一张表。

    可以用下面一张图详细对比Dataset/dataframe和rdd的区别:

    Dataset是在spark1.6引入的,目的是提供像RDD一样的强类型、使用强大的lambda函数,同时使用spark sql的优化执行引擎。到spark2.0以后,DataFrame变成类型为Row的Dataset,即为:

    type DataFrame = Dataset[Row]

     

    所以,很多移植spark1.6及之前的代码到spark2+的都会报错误,找不到dataframe类。

    基本操作
     

    val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)
    df.show()
    import spark.implicits._
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy("age").count().show()
    spark.stop()

    分区分桶 排序
     

    分桶排序保存hive表
    df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)
    分区以parquet输出到指定目录
    df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
    分区分桶保存到hive表
    df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

    cube rullup pivot

    cube
    sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()
    rull up
    sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()
    pivot 只能跟在groupby之后
    sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()

     

    SQL编程

    Spark SQL允许用户提交SQL文本,支持一下三种手段编写sql文本:

    1. spark 代码

    2. spark-sql的shell

    3. thriftserver

    支持Spark SQL自身的语法,同时也兼容HSQL。

    1. 编码

    要先声明构建SQLContext或者SparkSession,这个是SparkSQL的编码入口。早起的版本使用的是SQLContext或者HiveContext,spark2以后,建议使用的是SparkSession。

    1. SQLContext
    new SQLContext(SparkContext)
    
    2. HiveContext
    new HiveContext(spark.sparkContext)
    
    3. SparkSession
    不使用hive元数据:
    val spark = SparkSession.builder()
     .config(sparkConf) .getOrCreate()
    使用hive元数据
    val spark = SparkSession.builder()
     .config(sparkConf) .enableHiveSupport().getOrCreate()

    使用

    val df =spark.read.json("examples/src/main/resources/people.json") 
    df.createOrReplaceTempView("people") 
    spark.sql("SELECT * FROM people").show()

    2. spark-sql脚本

    spark-sql 启动的时候类似于spark-submit 可以设置部署模式资源等,可以使用

    bin/spark-sql –help 查看配置参数。 

    需要将hive-site.xml放到${SPARK_HOME}/conf/目录下,然后就可以测试

    show tables;
    
    select count(*) from student;

    3. thriftserver

    thriftserver jdbc/odbc的实现类似于hive1.2.1的hiveserver2,可以使用spark的beeline命令来测试jdbc server。

    安装部署
    1). 开启hive的metastore
    bin/hive --service metastore 
    2). 将配置文件复制到spark/conf/目录下
    3). thriftserver
    sbin/start-thriftserver.sh --masteryarn  --deploy-mode client
    对于yarn只支持client模式
    4). 启动bin/beeline
    5). 连接到thriftserver
    !connect jdbc:hive2://localhost:10001

    用户自定义函数 

    1. UDF

    定义一个udf很简单,例如我们自定义一个求字符串长度的udf。

    val len = udf{(str:String) => str.length}
    spark.udf.register("len",len)
    val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
    ds.createOrReplaceTempView("employees")
    ds.show()
    spark.sql("select len(name) from employees").show()

    2. UserDefinedAggregateFunction

    定义一个UDAF

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    
    object MyAverageUDAF extends UserDefinedAggregateFunction {
     //Data types of input arguments of this aggregate function
     definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)
     //Data types of values in the aggregation buffer
     defbufferSchema:StructType = {
       StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)
     }
     //The data type of the returned value
     defdataType:DataType = DoubleType
     //Whether this function always returns the same output on the identical input
     defdeterministic: Boolean = true
     //Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to
     // standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides
     // the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still
     // immutable.
     definitialize(buffer:MutableAggregationBuffer): Unit = {
       buffer(0) = 0L
       buffer(1) = 0L
     }
     //Updates the given aggregation buffer `buffer` with new input data from `input`
     defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={
       if(!input.isNullAt(0)) {
         buffer(0) = buffer.getLong(0)+ input.getLong(0)
         buffer(1) = buffer.getLong(1)+ 1
       }
     }
     // Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`
     defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={
       buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)
       buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)
     }
     //Calculates the final result
     defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)
    }

    使用UDAF

    val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
    ds.createOrReplaceTempView("employees")
    ds.show()
    spark.udf.register("myAverage", MyAverageUDAF)
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()

    3. Aggregator

    定义一个Aggregator

    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    import org.apache.spark.sql.expressions.Aggregator
    case class Employee(name: String, salary: Long)
    case class Average(var sum: Long, var count: Long)
    
    object MyAverageAggregator extends Aggregator[Employee, Average, Double] {
    
     // A zero value for this aggregation. Should satisfy the property that any b + zero = b
     def zero: Average = Average(0L, 0L)
     // Combine two values to produce a new value. For performance, the function may modify `buffer`
     // and return it instead of constructing a new object
     def reduce(buffer: Average, employee: Employee): Average = {
       buffer.sum += employee.salary
       buffer.count += 1
       buffer
     }
     // Merge two intermediate values
     def merge(b1: Average, b2: Average): Average = {
       b1.sum += b2.sum
       b1.count += b2.count
       b1
     }
     // Transform the output of the reduction
     def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
     // Specifies the Encoder for the intermediate value type
     def bufferEncoder: Encoder[Average] = Encoders.product
     // Specifies the Encoder for the final output value type
     def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }

    使用

    spark.udf.register("myAverage2", MyAverageAggregator)
    import spark.implicits._
    val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
    ds.show()
    val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show() 

     

    数据源

     

    1. 通用的laod/save函数
    可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text

    
     

    val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

    默认的是parquet,可以通过spark.sql.sources.default,修改默认配置。
    2. Parquet 文件

    val parquetFileDF =spark.read.parquet("people.parquet") 
    peopleDF.write.parquet("people.parquet")

    3. ORC 文件

    val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
    ds.write.mode("append").orc("/opt/outputorc/")
    spark.read.orc("/opt/outputorc/*").show(1)

    4. JSON

    ds.write.mode("overwrite").json("/opt/outputjson/")
    spark.read.json("/opt/outputjson/*").show()

    5. Hive 表

    spark 1.6及以前的版本使用hive表需要hivecontext。

    Spark2开始只需要创建sparksession增加enableHiveSupport()即可。

    val spark = SparkSession
    .builder()
    .config(sparkConf)
    .enableHiveSupport()
    .getOrCreate()
    
    spark.sql("select count(*) from student").show()

    6. JDBC

    写入mysql

    wcdf.repartition(1).write.mode("append").option("user", "root")
     .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

    从mysql里读

    val fromMysql = spark.read.option("user", "root")
     .option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())

    7. 自定义数据源

    自定义source比较简单,首先我们要看看source加载的方式

    指定的目录下,定义一个DefaultSource类,在类里面实现自定义source。就可以实现我们的目标。

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}

    class DefaultSource  extends DataSourceV2 with ReadSupport {

     def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
    }

     

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    class SimpleDataSourceReader extends DataSourceReader {

     def readSchema() = StructType(Array(StructField("value", StringType)))

     def createDataReaderFactories = {
       val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
       factoryList.add(new SimpleDataSourceReaderFactory())
       factoryList
     }
    }

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}

    class SimpleDataSourceReaderFactory extends
     DataReaderFactory[Row] with DataReader[Row] {
     def createDataReader = new SimpleDataSourceReaderFactory()
     val values = Array("1", "2", "3", "4", "5")

     var index = 0

     def next = index < values.length

     def get = {
       val row = Row(values(index))
       index = index + 1
       row
     }

     def close() = Unit
    }

    使用

    val simpleDf = spark.read
     .format("bigdata.spark.SparkSQL.DataSources")
     .load()

    simpleDf.show()

    优化器及执行计划

    1. 流程简介

    整体流程如下:

    总体执行流程如下:从提供的输入API(SQL,Dataset, dataframe)开始,依次经过unresolved逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据cost based优化,选取一条物理计划进行执行.

    简单化成四个部分:

    1). analysis
    
    Spark 2.0 以后语法树生成使用的是antlr4,之前是scalaparse。
    
    2). logical optimization
    
    常量合并,谓词下推,列裁剪,boolean表达式简化,和其它的规则
    
    3). physical planning
    
    eg:SortExec          
    
    4). Codegen
    
    codegen技术是用scala的字符串插值特性生成源码,然后使用Janino,编译成java字节码。Eg: SortExec

    2. 自定义优化器

    1). 实现

    继承Rule[LogicalPlan]

    2). 注册

    spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)

    3). 使用

    selectExpr("amountPaid* 1")

    3. 自定义执行计划

    主要是实现重载count函数的功能

    1).  物理计划:

    继承SparkLan实现doExecute方法

    2). 逻辑计划

    继承SparkStrategy实现apply

    3). 注册到Spark执行策略:

    spark.experimental.extraStrategies =Seq(countStrategy)

    4). 使用

    spark.sql("select count(*) fromtest")
    展开全文
  • Spark SQL入门基础

    千次阅读 2018-06-27 18:05:34
    Spark SQL简介 从Shark说起 ...Shark即hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划、翻译执行计划优化等逻辑,可以近似认为将物理...Shark的出现,使得SQL-o...

    Spark SQL简介

    ###从Shark说起
    Shark即hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划、翻译执行计划优化等逻辑,可以近似认为将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高。

    Shark的设计导致了两个问题:

    • 执行计划优化完全依赖于Hive,不方便添加新的优化策略。

    • 因为Shark是线程级并行,而MapReduce是进程级。因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。

    2014年6月1日Shark项目和SparkSQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放在SparkSQL项目上。至此,Shark的发展画上了句号,也因此发展出了两个方向:SparkSQL和Hive on Spark

    • Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive。

    • Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

    Spark SQL设计

    在这里插入图片描述
    Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

    Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。Spark SQL提供DataFrame API,可以对内部和外部各种数据源执行各种关系操作。

    Spark SQL可以支持大量的数据源和数据分析算法。Spark SQL可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力。

    DataFrame

    DataFrame使得Spark具备了对大规模结构化数据的处理能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算能力。

    • RDD是分布式的Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的。

    • DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息。
      Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。
      RDD分布式对象的集合。

    DataFrame的创建

    从Spark2.0开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。

    Spark支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

    现在在"/usr/local/spark/examples/src/main/resources/"这个目录下有两个样例数据people.json和people.txt。

    people.json文件的内容如下:
    {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}

    people.txt文件的内容如下:
    Michael,29 Andy,30 Justin,19

    编写代码读取文件数据,创建DataFrame。

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().getOrCreate()
    //是支持RDDs转换为DataFrames及后续sql操作
    import spark.implictis._
    val df = spark.read.json("file://usr/local/spark/examples/src/main/resources/people.json")
    df.show()
    //打印模式信息
    df.printSchema()
    df.select(df("name"), df("age")+1).show()
    //分组聚合
    df.groupBy("age").count().show()
    

    从RDD到DataFrame

    利用反射机制推断RDD模式

    在利用反射机制推断RDD模式时,需要首先顶一个case class。因为,只有case class才能被Spark隐式地转换为DataFrame。

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    import spark.implicits._ //支持把一个RDD隐式转换为一个DataFrame
    import spark.implicits._
    case class Person(name:String, age:Long) # 定义case class
    val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    	.map(_.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF()
    peopleDF.createOrReplaceTempView("people") #必须注册为临时表才能供下面的查询使用
    //最终生成一个DataFrame
    val personsRDD = spark.sql("select name, age from people where age>20")
    //DataFrame中的么个元素都是一行记录,包含name和age两个字段,分别用t(0),t(1)来获取值
    personsRDD.map(t=>"Name:"+t(0)+","+"Age:"+t(1)).show()
    

    使用编程方式定义RDD模式

    当无法提前定义case clas时,就需要采用编程方式定义RDD模式。

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    //定义一个模式字符串
    val schemaString="name age"
    //根据模式字符串生成模式
    val fields = schemaString.split(" ").map(fieldName=>StructField(fieldName, StringType, nullable=true))
    val schema = StructType(fields) //模式中包含name和age两个字段
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0), attributes(1).trim))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    val results = spark.sql("select name,age from people")
    results.map(attributes=>"name"+attributes(0)+"."+"age:"+attributes(1)).show()
    
    

    把DataFrame保存成文件

    • 第一个方法
    val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
    peopleDF.select("name","age").write.format("csv")
    	.save("file:///usr/local/spark/examples/src/main/resources/newpeople.csv")
    

    write.format()支持输出json,parquet,jdbc.orc,libsvm,csv,text等格式文件。

    • 第二种方式
    val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
    //转换成rdd然后再保存
    peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
    

    读取和保存parquet数据

    Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。

    Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合使用的组件有:

    • 查询引擎:Hive,Impala,Pig,Presto等
    • 计算框架:MapReduce,Spark,Cascading等
    • 数据模型:Avro,Thrift,Protocol Buffers, POJOs

    从parquet文件中加载数据生成DataFrame

    import spark.implicits._
    val parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet")
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("select * from parquetFile")
    namesDF.foreach(attributes=>println("Name:"+attributes(0)+"favorite color:"+attributes(1)))
    

    将DataFrame保存成parquet文件

    import spark.implicits._
    val parquetFileDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
    peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
    

    读取和插入MySQL

    准备工作:

    • 下载MySQL的JDBC驱动,比如mysql-connector-java-5.1.40.tar.gz
    • 把该驱动程序拷贝到spark的安装目录“/usr/local/spark/jars”下
    • 启动一个spark-shell,启动Spark Shell时,必须指定mysql连接驱动jar包
    $ ./bin/spark-shell --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
    

    在MySQL数据库中创建了一个名称为spark的数据库,并创建了一个名称为student的表。
    执行以下命令连接数据库,读取数据,并显示:

    val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark")
    	.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student")
    	.option("user", "root").option("password","hadoop").load()
    jdbcDF.show()
    

    向student表中插入两条记录:

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    //下面我们设置两条数据表示两个学生信息
    val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26", "4 Guanhua M 27")).map(_.split(" "))
    
    //下面要设置模式信息
    val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("gender", StringType, true), StructField("age", IntegerType, true)))
    
    //下面创建Row对象,每个Row对象都是rowRDD中的一行
    val rowRDD = studentRDD.map(p=>Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    
    //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
    val studentDF = spark.createDataFrame(rowRDD, schema)
    
    //下面创建一个prop变量用来保存jdbc连接参数
    val prop = new Properties()
    prop.put("user", "root") // 表示用户名是root
    prop.put("password", "hadoop") //表示密码是hadoop
    prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序
    
    //连接数据库,采用append模式,表示追加记录到数据表中
    studentDF.wirte.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "student", prop)
    
    展开全文
  • Spark SQL入门

    千次阅读 2018-03-31 20:45:50
    1、SQL结合spark有两条线: Spark SQL和Hive on Spark(还在开发状态,不稳定,暂时不建议使用)。 #Hive on Spark是在Hive中的,使用Spark作为hive的执行引擎,只需要在hive中修改一个参数即可: # set hive....

    1、SQL结合spark有两条线:

    Spark SQL和Hive on Spark(还在开发状态,不稳定,暂时不建议使用)。

    #Hive on Spark是在Hive中的,使用Spark作为hive的执行引擎,只需要在hive中修改一个参数即可:

    # set hive.execution.engine=spark

     

    2、Spark SQL

    a.概述:

            Spark SQL是Spark处理数据的一个模块,跟基本的Spark RDD的API不同,Spark SQL中提供的接口将会提供给Spark更多关于结构化数据和计算的信息。其本质是,Spark SQL使用这些额外的信息去执行额外的优化,这儿有几种和Spark SQL进行交互的方法,包括SQL和Dataset API,当使用相同的执行引擎时,API或其它语言对于计算的表达都是相互独立的,这种统一意味着开发人员可以轻松地在不同的API之间进行切换。

    b.SQL:

            Spark SQL的一大用处就是执行SQL查询语句,Spark SQL也可以用来从Hive中读取数据,当我们使用其它编程语言来运行一个SQL语句,结果返回的是一个Dataset或者DataFrame.你可以使用命令行,JDBC或者ODBC的方式来与SQL进行交互。

    c.Dataset和DataFrame

            Dataset是一个分布式数据集合。Dataset是一个在Spark 1.6版本之后才引入的新接口,它既拥有了RDD的优点(强类型、能够使用强大的lambda函数),又拥有Spark SQL的优点(用来一个经过优化的执行引擎)。你可以将一个JVM对象构造成一个Dataset,之后就可以使用一些transformations操作啦。我们可以使用scala,java来访问Dataset API,不支持python哦,当然,由于python的动态特性,很多的Dataset API是可以使用的,R语言也是一样哦。

            DataFrame是Dataset中一个有名字的列。从概念上,它等价于关系型数据库中的一张表,或者等价于R/Python中的Data Frame,但它在底层做了更好的优化。构造DataFrame的数据源很多:结构化的数据文件、hive表、外部数据库、已经存在的RDD。DataFrame 的API支持java,scal.python,R

     

    3、面试题

    RDD  VS  DataFrame

    esgd 

    a.基于RDD的编程,不同语言性能是不一样的,而DataFrame是一样的,因为底层会有一个优化器先将代码进行优化。

    b.对于RDD,暴露给执行引擎的信息只有数据的类型,如RDD[Student]装的是Student,而对于DataFrame,对于外部可见的信息有字段类型,字段key,字段value等。

    c.RDD是一个数组,DataFrame是一个列式表。

     

    4、Spark SQL愿景

    a.写更少的代码 

    b.读更少的数据(压缩,存储格式,列裁剪)

    c.对于不同语言的应用程序让优化器自动进行优化

     

    5、Spark SQL架构

    客户端->未解析的逻辑执行计划(Schema Catalog 将schema作用在数据上)->逻辑执行计划->优化过后的逻辑执行计划->物理执行计划->Spark引擎。

    #Spark SQL 要使用hive中的表,需要将hive-site.xml加入spark的配置文件目录。

     

    6、执行计划(Hive 或Spark SQL)

    explain extended +查询语句

     

    7、SparkSession

    添加依赖:

    <dependency>

        <groupId>org.spark.apache</groupId>

        <artifactId>spark-sql_2.11</artifactId>    ##2.11位scala版本

        <version>${spark.version}</version>

    </dependency>

            Spark中所有功能的入口点是SparkSession类,我们可以使用SparkSession.builder()来创建一个SparkSession,具体如下(scala):

     

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

               可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。

            在Spark 2.0之后,SparkSession内置了对于hive特性的支持,允许使用HiveQL来书写查询语句访问UDF,以及从Hive表中读取数据。使用这些特性,你不需要进行任何Hive的设置。

     

    8、创建DataFrame

            通过SparkSession,应用程序可以从一个现有的RDD、Hive表、Spark数据源来创建一个DataFrame。

            以下创建DataFrame是基于JSON格式的文件:

     

    val df = spark.read.json("examples/src/main/resources/people.json")
    
    // Displays the content of the DataFrame to stdout
    df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

             可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。

     

     

     

    9、无类型的Dataset操作(又称DataFrame 操作)

          上面提到的,在Spark 2.0时,在java或者scala API中,DataFrame是Dataset的行,这些操作也被称为“非类型转换”,与“类型化转换”相比,具有强类型的Scala/Java Dataset。

            这儿包括一些使用Dataset处理结构化数据的例子:

     

    // This import is needed to use the $-notation
    import spark.implicits._
    // Print the schema in a tree format
    df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
    // Select only the "name" column
    df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+
    
    // Select everybody, but increment the age by 1
    df.select($"name", $"age" + 1).show()
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+
    
    // Select people older than 21
    df.filter($"age" > 21).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+
    
    // Count people by age
    df.groupBy("age").count().show()   //groupBy返回一个Dataset,count返回一个DataFrame.
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+

    可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。

    对于可以在数据集上执行的操作类型的完整列表,请参阅API Documentation

    除了简单的列引用和表达式之外,数据集还拥有丰富的函数库,包括字符串操作、日期算术、常见的数学运算等等。完整列表查看 DataFrame Function Reference.

     

    10、以编程方式运行SQL查询语句

            SparkSession中的SQL函数可以让应用程序以编程的方式运行SQL查询语句,让结果返回一个DataFrame。

     

    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。

     

    11、全局临时视图

            Spark SQL中的临时视图作用域仅仅在于创建该视图的会话窗口,如果窗口关闭,该视图也终止。如果你想要一个在所有会话中都生效的临时视图,并且即使应用程序终止该视图仍然存活,你可以创建一个全局临时视图。 全局临时视图与系统保存数据库global_temp相关联,我们必须使用规范的名字来定义它,比如:SELECT * FROM global_temp.view1.

     

    // Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")
    
    // Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    
    // Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    

    可以在spark repo下的"examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 路径下找到所有例子的代码。

     

    12、创建Dataset

            Dataset有点像RDD,但它并不是使用java或Kryo这样的序列化方式,而是使用专用的编码器将对象进行序列化,以便于在网络上进行处理和传输。虽然编码器和标准的序列化都可以将对象转成字节,但编码器产生动态的代码,它使用的格式允许Spark在不执行反序列化的情况下去执行像过滤、排序、哈希等许许多多的操作。

        

    case class Person(name: String, age: Long)
    
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+
    
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    
    // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

     

     

     

    展开全文
  • Spark SQL原理与应用

    千次阅读 2018-08-12 13:48:24
    Spark SQL原理与应用
  • Spark SQL 教程

    千次阅读 2018-12-25 17:56:21
    一、什么是Spark SQL Spark SQLSpark用来处理结构化数据的一个模块,它提供了两个编程抽象分别叫做DataFrame和DataSet,它们用于作为分布式SQL查询引擎。从下图可以查看RDD、DataFrames与DataSet的关系。   ...
  • Spark SQL-概述

    千次阅读 2019-11-20 21:39:01
    1.什么是Spark SQL Spark SQLSpark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。 对比Hive,它是将Hive SQL转换成MapReduce然后提交到集群上...
  • Spark SQL 简单使用

    2018-07-09 06:42:03
    环境:scala 版本2.11.8,spark 版本2.0.1,使用 Intellij IDEA 来开发。 准备工作:创建maven项目可以从官网上找到我们建项目时使用的 archetype 至于具体怎么创建项目,请参考一个朋友的文章Intellij IDEA 创建 ...
  • 第二十四记·Spark SQL配置及使用

    千次阅读 2018-09-09 14:54:50
    SparkSQL是spark的一个模块,主入口是SparkSession,将SQL查询与Spark程序无缝混合。DataFrames和SQL提供了访问各种数据源(通过JDBC或ODBC连接)的常用方法包括Hive,Avro,Parquet,ORC,JSON和JDBC。您甚至可以跨...
  • spark-shell & spark-sql 使用

    千次阅读 2019-07-03 22:31:42
    spark-shell 启动 spark-shell [hadoop@hadoop2 bin]$ ./spark-shell --master local[2] \ --jars /opt/module/hive-1.2.2/lib/mysql-connector-java-5.1.27-bin.jar 通过 spark-shell 操作 hive ...
  • spark Sql

    千次阅读 多人点赞 2019-10-15 08:19:25
    spark sql一.概述1 spark历史2 Spark-SQL 概述2.1 特点2.2 作用2.3 Spark SQL架构图3 Dataset演进历史3.1 RDD3.1.1 优点3.1.2 缺点3.2 DataFrame3.2.1 优点3.2.2 缺点3.2.3 核心特征3.3 Dataset3.3.1 区别3.3.2 特点...
  • Spark SQL

    2020-09-27 19:24:23
    因此SparkSQL应允而生,可以理解为将sql语句转换为RDD和各种算子依赖,然后提交到集群中允许,继承了Spark的特点,因此SparkSQL执行效率非常高,同时在sql语句的转换过程中会翻译成尽可能高效率的算子依赖相较于不熟
  • SparkSQL 学习笔记

    千次阅读 2018-12-29 14:24:27
    为什么学习Spark SQLSpark SQL的版本迭代 SparkSession sparkSession概念解释: 特点 创建SparkSession 在spark-shell中创建 在IDEA中创建SparkSession RDD,DataFrame 和 DataSet RDD的局限性 什么是...
  • 1. 编写Spark SQL程序实现RDD转换成DataFrame 2. 花式查询汇总写法 3. 相互转换 4. Spark SQL完成WordCount 5. 编写Spark SQL程序操作HiveContext
  • 如果想创建一个数据库用户,并且为数据库赋值权限,可以参考:http://blog.csdn.net/tototuzuoquan/article/details/527855042.将配置好的hive-site.xml、core-site.xml、hdfs-site.xml放入$SPARK_HOME/conf目录下...
  • spark-sql(三)---spark-sql性能测试

    千次阅读 2017-12-04 14:56:33
    sparksql大数据量下测试性能 2.数据环境从网上扒过来的数据,某些网站泄漏的帐号信息,数据重复冗余很少。处理了一下,在原基础上增大了数据量。 准备的数据量大,是保证结果误差更小,也检测下sparksql数据处理...
  • 背景介绍 在日常工作中,我们可以在一个shell脚本里面利用spark-sql -e执行sql脚本,而且可以传参数,但是存在一个问题,shell脚本对格式要求严格,而且shell脚本和spark-sql的脚本混在一起,内容庞大凌乱,不好管理...
  • 关键字:spark-shell on yarn、spark-sql on yarn 前面的文章《Spark On Yarn:提交Spark应用程序到Yarn》介绍了将Spark应用程序提交到Yarn上运行。有时候在做开发测试的时候,需要使用spark-shell和spark-sql命令行...
  • CDH内嵌Spark版本不支持spark-sql,因为cloudera在推自己的impala,但是有些场景需要用到Spark-sql时,比如Kylin企业版想要加快构建需要用到SparkSQL,大概的思路就是换jar包,很多资料,千篇一律是要加入hive&...
  • 启动spark-sql执行sql时,在监控页面中看到该Application的Name是SparkSQL:hadoop000(其中hadoop000是测试机器的hostname),就有个想法,修改下该application的name,在spark-sql --help中看到--name NAME的参数设置...
  • Spark-SQL的安装及使用

    千次阅读 2016-06-09 15:24:48
    安装步骤 下载编译了Hive的Spark版本 ...CDH自带的Spark都是没有带Hive的,若直接运行./spark-sql,会报错如下Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.cli.CliDriver at java.net.URLCl
  • 关键字:spark-shell on yarn、spark-sql on yarn 前面的文章《Spark On Yarn:提交Spark应用程序到Yarn》介绍了将Spark应用程序提交到Yarn上运行。有时候在做开发测试的时候,需要使用spark-shell和spark-sql命令行...
  • spark-shell、spark-sql 都是是一个独立的 spark application,启动几个就是几个 spark application每次都要重新启动申请资源。 用thriftserver,无论启动多少个客户端(beeline),只要是连在一个thriftserver上,...
  • 错误描述:Exception in thread "main" org.apache.spark.SparkException: Application application_1479132065716_00 原因:我的是JDK版本过高,使用spark2.0.1+hadoop2.5.2+hive0.13+JDK1.7没问题 二、 错误...
  • spark-core 和spark-sql的区别

    千次阅读 2018-11-10 17:45:25
    Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL)。即Spark SQLSpark Core封装而来的!  Spark SQLSpark Core的基础上针对结构化数据处理进行很多优化和改进,  简单来讲:  Spark SQL ...
  • Spark-SQL导出查询结果的两种方式

    千次阅读 2018-09-06 18:07:49
    为了分析数据的需要,我们需要导出Spark-SQL的查询结果,通过Spark SQL CLI有两种方式。 1. 使用“-e”参数 与“Hive”一样,Spark SQL CLI也支持“-e”参数,使用方式如下: # 分成多行是为了便于阅读 # ...
  • 对于熟悉Scala开发的人来说,对于spark-sql的使用,直接jar包中写入代码处理就能轻松实现动态语句的执行。 但是对于我,不打算学习Scala和Java语言,但是又想定时执行时间推延的周期、定时任务,该肿么办? spark-...
  • spark-sql调优

    2020-08-18 14:18:46
    Hivesql底层把sql解析成了mapreduce程序,Shark是把sql语句解析成了Spark任务 随着性能优化的上限,以及集成SQL的一些复杂的分析功能,发现Hive的MapReduce思想限制了Shark的发展。 最后Databricks公司终止对Shark的...
  • spark-sql cli 参数及使用

    千次阅读 2019-02-21 18:24:17
    原文地址 ...很难找到spark-sql cli使用的教程,总结下 一、启动方法 /data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-m...
  • spark-SQL -e -f 的用法

    千次阅读 2019-07-16 16:14:22
    spark-sql -help 出现用法提示 spark-sql --database bigdata -e "select * from dw_results limit 1000;" > /home/results.txt --database 选择对应的hive数据库 -e 对应的sql语句 --num-exectors 10 ...

空空如也

1 2 3 4 5 ... 20
收藏数 84,904
精华内容 33,961
关键字:

spark-sql