2019-02-16 17:12:58 qq_31407011

A brief analysis of the Spark source code
Spark SQL was the only newly added library among those shipped with Spark 1.0, which shows how important it is.
Topics covered:

  • Spark SQL execution flow;
  • a comparison of Hive on Spark and Hive on Hadoop.

  1. Spark SQL execution flow

    Let us first look at how a SQL statement is executed in general.
    For example: select n1,n2,n3 from tableN where n1 = ?
    Here n1, n2, n3 are the columns to be returned, tableN is the data source, and n1 = ? is the filter condition.
    A SQL statement goes through the following steps:
    - parsing;
    - binding;
    - optimization;
    - execution.
    Parsing produces a syntax tree; each node of the tree is an execution rule, and the tree as a whole is the execution strategy.
    SQL on Spark, which we look at next, naturally goes through the same parse/optimize/execute stages:
    - SqlParser produces a logical plan tree;
    - the analyzer and optimizer apply a set of rules to the logical plan;
    - a physical plan is generated from the optimized logical plan;
    - Spark RDDs are generated;
    - Spark executes the RDDs.
    As a new feature, Spark SQL introduces SQLContext and SchemaRDD (renamed DataFrame in later releases); the sketch below shows how to inspect these plan stages.
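    A minimal sketch for inspecting the stages (not from the original post; it assumes Spark 2.x, a local SparkSession and a hypothetical temp view named tableN):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[*]").appName("plan-demo").getOrCreate()
    import spark.implicits._

    // a tiny made-up data set registered as the temp view "tableN"
    Seq(("a", 1, 10), ("b", 2, 20)).toDF("n1", "n2", "n3").createOrReplaceTempView("tableN")

    // explain(true) prints the parsed, analyzed and optimized logical plans
    // as well as the physical plan that finally runs as RDD operations
    spark.sql("select n1, n2, n3 from tableN where n1 = 'a'").explain(true)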

  2. Hive on Spark vs. Hive on Hadoop

    The HiveQL execution process:
    - parse the SQL into a syntax tree;
    - semantic analysis;
    - generate a logical plan;
    - generate a query plan;
    - optimization;
    - generate MapReduce jobs.
    Spark provides the HiveContext interface, and HiveContext extends SQLContext; a small usage sketch follows the list below.
    Hive deals with two kinds of data:
    - schema data: the database definitions and table structures, stored in the metastore;
    - row data: the files being analyzed themselves.
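    A minimal usage sketch of HiveContext (Spark 1.x style, not from the original post; sc is assumed to be an existing SparkContext and the table name is hypothetical):

    import org.apache.spark.sql.hive.HiveContext

    // HiveContext extends SQLContext and resolves table definitions (schema data)
    // from the Hive metastore; the query itself scans the row data (the files)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("select n1, n2, n3 from tableN where n1 = 'a'").collect().foreach(println)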

    To be continued and revised…

2018-07-05 17:59:02 qq_24674131

Taking the top N within each group is a very common requirement, for example:

 

  1. the names and scores of the top three students in each class;
  2. the names of the top three cities by some metric in each province;

and so on. The core idea is to take the top N records within a given partition (a class, a province, ...).

Test data format:

Each line contains three space-separated fields: class, student name, score.

Below is a demo that finds, for each class, the names and scores of its top three students.

1. Implementation with Spark Core:

import org.apache.spark.sql.Row

// read the test data into an RDD
val rddtext = sc.textFile("file:///C:/Users/chunyuhe/Desktop/test1.txt")

// wrap each line into a Row (used below to build a temporary table for Spark SQL)
val rowrdd = rddtext.map(m => Row(m.split(" ")(0), m.split(" ")(1), m.split(" ")(2).toInt))

/**
 * Spark Core implementation of group top N
 */
val classrdd = rddtext.map(x => {
  // split out the individual fields
  val classname = x.split(" ")(0)
  val name = x.split(" ")(1)
  val grade = x.split(" ")(2)
  // build a (class, (name, grade)) pair that is convenient to aggregate
  (classname, (name, grade.toInt))
}).groupByKey // group by class

classrdd.foreach(x => println(x))

classrdd.map(m => {
  val classname = m._1
  // turn the students of one class into an array, sort by grade descending and take the top three
  val top3 = m._2.toArray.sortWith(_._2 > _._2).take(3)
  (classname, top3)
}).foreach(m => {
  println("Top 3 grades of class " + m._1 + ":")
  m._2.foreach(x => println(x))
})

The output is:
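Note that groupByKey pulls every record of a class into memory before sorting. For large groups, a variant that only keeps a running top 3 per class while aggregating avoids this; a sketch under that assumption (not part of the original post, same space-separated input as above):

// keep only the 3 best (name, grade) pairs per class during aggregation,
// instead of materializing whole groups as groupByKey does
val pairrdd = rddtext.map { line =>
  val f = line.split(" ")
  (f(0), (f(1), f(2).toInt))
}

val top3ByClass = pairrdd.aggregateByKey(List.empty[(String, Int)])(
  (acc, v) => (v :: acc).sortBy(-_._2).take(3), // fold one record into a partition-local top 3
  (a, b) => (a ++ b).sortBy(-_._2).take(3)      // merge two partition-local top-3 lists
)

top3ByClass.collect().foreach { case (classname, top3) =>
  println("Top 3 grades of class " + classname + ": " + top3.mkString(", "))
}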

2. Implementation with Spark SQL:

import scala.collection.mutable
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// implicit conversions and the sql() shortcut
import spark.implicits._
import spark.sql

// define the table schema
val schema = StructType(mutable.ArraySeq(
  StructField("classname", StringType, true),
  StructField("name", StringType, true),
  StructField("grade", IntegerType, true)))

// combine the schema with the Row RDD to build a DataFrame
val tablerow = spark.createDataFrame(rowrdd, schema)

// register the DataFrame as a named temporary view
tablerow.createTempView("testtable")
val tetrow = sql("select * from testtable")
// tetrow.show()

// use the Spark SQL window function row_number():
//   PARTITION BY names the column to open the window on
//   ORDER BY names the column to sort by within each window
val resultrow = sql("""
  select a.classname, a.name, a.grade
  from (select classname, name, grade,
               row_number() OVER (PARTITION BY classname ORDER BY grade DESC) rank
        from testtable) as a
  where a.rank <= 3
  """)

resultrow.show()

Output:
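The same top-N-per-group query can also be written with the DataFrame API instead of a SQL string; a short sketch on the tablerow DataFrame built above (not from the original post):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// same logic as the SQL above: number rows within each class by grade descending
val w = Window.partitionBy("classname").orderBy(col("grade").desc)
tablerow.withColumn("rank", row_number().over(w))
  .where(col("rank") <= 3)
  .show()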

That's all for this post. I hope it helps, and comments and corrections are welcome!

2018-01-05 10:51:19 zhangliangaws

Spark-SQL


1. Run directly via the spark-sql script

Be sure to specify the resources to use:
spark-sql --executor-cores 1 --executor-memory 2g

2. Run via the Beeline tool (as the mr user)

beeline -u jdbc:hive2://localhost:18000 -n mr

Official HiveQL language manual:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual
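Both tools can also run a single statement inline with -e, which is handy for quick checks (illustrative commands only, reusing the settings above):

spark-sql --executor-cores 1 --executor-memory 2g -e "show tables"
beeline -u jdbc:hive2://localhost:18000 -n mr -e "show tables"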

Example 1

hadoop fs -put /sample_fpgrowth.txt /user/mr/fpgrowth
CREATE EXTERNAL TABLE sample_fpgrowth(
        x1        STRING)
        LOCATION '/user/mr/fpgrowth/';

Example 2

hadoop fs -mkdir /user/mr/airline

hadoop fs -put /international-airline-passengers.csv /user/mr/airline/
CREATE EXTERNAL TABLE international_airline_passenger (
        x1        INT,
        x2        DOUBLE)
row format delimited fields terminated by ',' LOCATION '/user/mr/airline/';

Example 3 (works on Hive 0.14 or later)

drop table if exists test_zl;
create table test_zl (x1 INT, x2 DOUBLE, x3 DOUBLE) row format delimited fields terminated by ',';
insert overwrite table test_zl values (1,2,3); -- the VALUES form of INSERT is supported since Hive 0.14
insert into table test_zl values (2,2,3);

Iris dataset

create table iris ( sepal_length  double, sepal_width double, petal_length double, petal_width double, species string) row format delimited fields terminated by ',' LOCATION '/user/mr/iris/';

Titanic dataset

create table titanic_train (age int,passenger_class string,sex string,  no_of_siblings_or_spouses_on_board int, no_of_parents_or_children_on_board int, passenger_fare double, survived string) row format delimited fields terminated by ';' LOCATION '/user/mr/titanic_train/';

create table titanic_unlabeled (age int,passenger_class string,sex string,  no_of_siblings_or_spouses_on_board int, no_of_parents_or_children_on_board int, passenger_fare double) row format delimited fields terminated by ',' LOCATION '/user/mr/titanic_test/';

Basket dataset

create table basket (basket string) row format delimited fields terminated by ';' LOCATION '/user/mr/basket/';
2017-08-23 20:53:52 bluejoe2000

When Spark SQL wraps a Seq(s1, s2, s3, …), each element si of the Seq is wrapped into a Row:
if si is a plain value, a Row with a single column named value is produced;
if si is an N-tuple, a Row with N columns is produced.

In particular, if the N-tuple is a 1-tuple, it is treated as a non-tuple, i.e. a Row with a single value column is produced.

scala> Seq(("bluejoe"),("alex")).toDF().show
+-------+
|  value|
+-------+
|bluejoe|
|   alex|
+-------+

scala> Seq("bluejoe","alex").toDF().show
+-------+
|  value|
+-------+
|bluejoe|
|   alex|
+-------+

scala> Seq(("bluejoe",1),("alex",0)).toDF().show
+-------+---+
|     _1| _2|
+-------+---+
|bluejoe|  1|
|   alex|  0|
+-------+---+

I wrote the following test case specifically to verify this:

    import org.apache.spark.sql.SparkSession
    import org.junit.{Assert, Test}

    @Test
    def testEncoderSchema() {
        val spark = SparkSession.builder.master("local[4]").getOrCreate()
        val sqlContext = spark.sqlContext
        import sqlContext.implicits._
        import org.apache.spark.sql.catalyst.encoders.encoderFor

        // all three type expressions denote the same type, so the schemas must match
        val schema1 = encoderFor[String].schema
        val schema2 = encoderFor[(String)].schema
        val schema3 = encoderFor[((String))].schema

        Assert.assertEquals(schema1, schema2)
        Assert.assertEquals(schema1, schema3)
    }
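If named columns are preferred over the default _1/_2, toDF also accepts explicit column names; a small sketch (the names here are purely illustrative):

scala> Seq(("bluejoe", 1), ("alex", 0)).toDF("name", "flag").show
+-------+----+
|   name|flag|
+-------+----+
|bluejoe|   1|
|   alex|   0|
+-------+----+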
2018-04-29 15:57:00 weixin_34247299
# export by:
spark.sql("SET -v").show(n=200, truncate=False)
key value meaning
spark.sql.adaptive.enabled false When true, enable adaptive query execution.
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 67108864b The target post-shuffle input size in bytes of a task.
spark.sql.autoBroadcastJoinThreshold 10485760 Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run, and file-based data source tables where the statistics are computed directly on the files of data.
spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins.
spark.sql.cbo.enabled false Enables CBO for estimation of plan statistics when set true.
spark.sql.cbo.joinReorder.dp.star.filter false Applies star-join filter heuristics to cost based join enumeration.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm.
spark.sql.cbo.joinReorder.enabled false Enables join reorder in CBO.
spark.sql.cbo.starSchemaDetection false When true, it enables join reordering based on star schema detection.
spark.sql.columnNameOfCorruptRecord _corrupt_record The name of internal column for storing raw/un-parsed JSON and CSV records that fail to parse.
spark.sql.crossJoin.enabled false When false, we will throw an error if a query contains a cartesian product without explicit CROSS JOIN syntax.
spark.sql.extensions Name of the class used to configure Spark Session extensions. The class should implement Function1[SparkSessionExtension, Unit], and must have a no-args constructor.
spark.sql.files.ignoreCorruptFiles false Whether to ignore corrupt files. If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned.
spark.sql.files.maxPartitionBytes 134217728 The maximum number of bytes to pack into a single partition when reading files.
spark.sql.files.maxRecordsPerFile 0 Maximum number of records to write out to a single file. If this value is zero or negative, there is no limit.
spark.sql.groupByAliases true When true, aliases in a select list can be used in group by clauses. When false, an analysis exception is thrown in the case.
spark.sql.groupByOrdinal true When true, the ordinal numbers in group by clauses are treated as the position in the select list. When false, the ordinal numbers are ignored.
spark.sql.hive.caseSensitiveInferenceMode INFER_AND_SAVE Sets the action to take when a case-sensitive schema cannot be read from a Hive table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file formats such as Parquet are. Spark SQL must use a case-preserving schema when querying any table backed by files containing case-sensitive field names or queries may not return accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the case-sensitive schema from the underlying data files and write it back to the table properties), INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).
spark.sql.hive.filesourcePartitionFileCacheSize 262144000 When nonzero, enable caching of partition file metadata in memory. All tables share a cache that can use up to specified num bytes for file metadata. This conf only has an effect when hive filesource partition management is enabled.
spark.sql.hive.manageFilesourcePartitions true When true, enable metastore partition management for file source tables as well. This includes both datasource and converted Hive tables. When partition management is enabled, datasource tables store partition in the Hive metastore, and use the metastore to prune partitions during query planning.
spark.sql.hive.metastorePartitionPruning true When true, some predicates will be pushed down into the Hive metastore so that unmatching partitions can be eliminated earlier. This only affects Hive tables not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC for more information).
spark.sql.hive.thriftServer.singleSession false When set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.
spark.sql.hive.verifyPartitionPath false When true, check all the partition paths under the table's root directory when reading data stored in HDFS.
spark.sql.optimizer.metadataOnly true When true, enable the metadata-only query optimization that use the table's metadata to produce the partition columns instead of table scans. It applies when all the columns scanned are partition columns and the query has an aggregate operator that satisfies distinct semantics.
spark.sql.orc.filterPushdown false When true, enable filter pushdown for ORC files.
spark.sql.orderByOrdinal true When true, the ordinal numbers are treated as the position in the select list. When false, the ordinal numbers in order/sort by clause are ignored.
spark.sql.parquet.binaryAsString false Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
spark.sql.parquet.cacheMetadata true Turns on caching of Parquet schema metadata. Can speed up querying of static data.
spark.sql.parquet.compression.codec snappy Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo.
spark.sql.parquet.enableVectorizedReader true Enables vectorized parquet decoding.
spark.sql.parquet.filterPushdown true Enables Parquet filter push-down optimization when set to true.
spark.sql.parquet.int64AsTimestampMillis false When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the extended type. In this mode, the microsecond portion of the timestamp value will be truncated.
spark.sql.parquet.int96AsTimestamp true Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.mergeSchema false When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available.
spark.sql.parquet.respectSummaryFiles false When true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn't be enabled before knowing what it means exactly.
spark.sql.parquet.writeLegacyFormat false Whether to follow Parquet's format specification when converting Parquet schema to Spark SQL schema and vice versa.
spark.sql.pivotMaxValues 10000 When doing a pivot without specifying values for the pivot column this is the maximum number of (distinct) values that will be collected without error.
spark.sql.session.timeZone Etc/UTC The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.
spark.sql.shuffle.partitions 80 The default number of partitions to use when shuffling data for joins or aggregations.
spark.sql.sources.bucketing.enabled true When false, we will treat bucketed table as normal table
spark.sql.sources.default parquet The default data source to use in input/output.
spark.sql.sources.parallelPartitionDiscovery.threshold 32 The maximum number of paths allowed for listing files at driver side. If the number of detected paths exceeds this value during partition discovery, it tries to list the files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and LibSVM data sources.
spark.sql.sources.partitionColumnTypeInference.enabled true When true, automatically infer the data types for partitioned columns.
spark.sql.statistics.fallBackToHdfs false If the table statistics are not available from table metadata enable fall back to hdfs. This is useful in determining if a table is small enough to use auto broadcast joins.
spark.sql.streaming.checkpointLocation The default location for storing checkpoint data for streaming queries.
spark.sql.streaming.metricsEnabled false Whether Dropwizard/Codahale metrics will be reported for active streaming queries.
spark.sql.streaming.numRecentProgressUpdates 100 The number of progress updates to retain for a streaming query
spark.sql.thriftserver.scheduler.pool Set a Fair Scheduler pool for a JDBC client session.
spark.sql.thriftserver.ui.retainedSessions 200 The number of SQL client sessions kept in the JDBC/ODBC web UI history.
spark.sql.thriftserver.ui.retainedStatements 200 The number of SQL statements kept in the JDBC/ODBC web UI history.
spark.sql.variable.substitute true This enables substitution using syntax like ${var} ${system:var} and ${env:var}.
spark.sql.warehouse.dir file:/home/buildbot/datacalc/spark-warehouse/ The default location for managed databases and tables.
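Many of the spark.sql.* settings above can be read and changed on a running session; a minimal sketch (the value 200 is only an example):

// read and change a SQL config at runtime
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", "200")

// the same change expressed in SQL
spark.sql("SET spark.sql.shuffle.partitions=200")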

Other Spark SQL configs:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
https://github.com/unnunique/Conclusions/blob/master/AADocs/bigdata-docs/compute-components-docs/sparkbasic-docs/standalone.md
