sparksql中有hive吗

2019-03-01 19:54:07 suojie123 阅读数 56
//与hive表进行连接:
 // 1.一种通过conf.set 设置或直接将hive配置hive-site.xml文件拷贝工程
 // 2.加载mysql驱动

 //从hive中读取数据,驱动不是SQLContext,而是HiveContext
 val hsc=new HiveContext(sc)
 //执行hql语句
 hsc.sql("select * from hive.person").show()
 //执行的临时表
 val df=hsc.sql("select * from hive.person")
 df.registerTempTable("t_per")
 hsc.sql("select * from t_per").show()

 //把结果输出到hive中
val sourceRDD= sc.parallelize(List((1,"wang",12),(2,"afds",34)))
val id= StructField("id",IntegerType,true)
val name= StructField("name",StringType,true)
val age=  StructField("age",IntegerType,true)
val st=StructType(List(id,name,age))

 val mapRDD= sourceRDD.map(
   line=>{
     Row( line._1,
       line._2,
       line._3)
   }
 )
 hsc.createDataFrame(mapRDD,st).registerTempTable("per")
 val hiveDF=hsc.sql("select * from per")
 //输出到hive,hive表自动创建,如果表存在,报错:Exception in thread "main" org.apache.spark.sql.AnalysisException:  Table `hive`.`person` already exists.;
 hiveDF.write.saveAsTable("hive.person")

 

2018-05-10 22:14:43 qq_38899793 阅读数 3766

我们在做Spark开发的时候有时候需要用SparkSQL将数据写入Hive表中,今天就来看看SparkSQL与Hive的整合。

SparkSQL就是借助的Hive构建的数据仓库。

一、首先要配置Hive-site.xml。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://hadoop1:3306/hive?createDatabaseIfNotExist=true</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>aura</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>aura</value>
  </property>

</configuration>

二、把Hive-site.xml配置文件 复制到 $SPARK_HOME/conf目录下,在idea中开发需要将core-site.xml、hdfs-site.xml、hive-site.xml放在resources包下。

三、涉及驱动包的问题:

spark-sql --driver-class-path /usr/local/soft/spark/lib/mysql-connector-java-5.1.10.jar      //mysql驱动

切记SPARK_CLASSPATH这一行必须的配置:(否则报错)
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.15-bin.jar:$SPARK_CLASSPATH

这里有点模糊,到底是怎么做的?以后再更新把。

四、Maven配置:

如果spark和Hive需要整合,那么需要在pom.xml文件中添加如下依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>1.6.3</version>
    <scope>provided</scope>
</dependency>

五、使用方式:

Spark1.6:

val conf = new SparkConf().setMaster("local").setAppName("HiveSourceTest")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("create table worker (id string,salary double,bon double,dep int)")
hiveContext.sql("load data local inpath '/root/worker.txt' into table worker")
hiveContext.sql("select * from worker").show()

Spark2.0:

val sparkSession = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .config("spark.sql.warehouse.dir", "hdfs://hadoop1:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()
spark.sql("")           //与1.6写法一致

可以看出主要就是创建表和载入数据这两个语句。



2015-12-11 17:10:00 weixin_30247781 阅读数 61

由于我Spark采用的是Cloudera公司的CDH,并且安装的时候是在线自动安装和部署的集群。最近在学习SparkSQL,看到SparkSQL on HIVE。下面主要是介绍一下如何通过SparkSQL在读取HIVE的数据。

(说明:如果不是采用CDH在线自动安装和部署的话,可能需要对源码进行编译,使它能够兼容HIVE。

编译的方式也很简单,只需要在Spark_SRC_home(源码的home目录下)执行如下命令:

./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.6.0-cdh5.4.4 -Phive

编译好了之后,会在lib目录下多几个jar包。)

下面我主要介绍一下我使用的情况:

1、为了让Spark能够连接到Hive的原有数据仓库,我们需要将Hive中的hive-site.xml文件拷贝到Spark的conf目录下,这样就可以通过这个配置文件找到Hive的元数据以及数据存放。

在这里由于我的Spark是自动安装和部署的,因此需要知道CDH将hive-site.xml放在哪里。经过摸索。该文件默认所在的路径是:/etc/hive/conf 下。

同理,spark的conf也是在/etc/spark/conf。

此时,如上所述,将对应的hive-site.xml拷贝到spark/conf目录下即可

  如果Hive的元数据存放在Mysql中,我们还需要准备好Mysql相关驱动,比如:mysql-connector-java-5.1.22-bin.jar。

2、编写测试代码

    val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
    val sc=new SparkContext(conf)
    
    //create hivecontext
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    
    sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ")   //这里需要注意数据的间隔符
    
    sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src  ");
    
    sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)
    
    sc.stop()

 3、下面列举一下出现的问题:

(1)如果没有将hive-site.xml拷贝到spark/conf目录下,会出现:

分析:从错误提示上面就知道,spark无法知道hive的元数据的位置,所以就无法实例化对应的client。

解决的办法就是必须将hive-site.xml拷贝到spark/conf目录下

(2)测试代码中没有加sc.stop会出现如下错误:

ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException

在代码最后一行添加sc.stop()解决了该问题。

 

转载于:https://www.cnblogs.com/ljy2013/p/5039641.html

2017-08-04 22:36:07 MrLevo520 阅读数 34689

Hive on Mapreduce

Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优化上的一些总结

Hive on Mapreduce执行流程

这里写图片描述

执行流程详细解析

  • Step 1:UI(user interface) 调用 executeQuery 接口,发送 HQL 查询语句给 Driver
  • Step 2:Driver 为查询语句创建会话句柄,并将查询语句发送给 Compiler, 等待其进行语句解析并生成执行计划
  • Step 3 and 4:Compiler 从 metastore 获取相关的元数据
  • Step 5:元数据用于对查询树中的表达式进行类型检查,以及基于查询谓词调整分区,生成计划
  • Step 6 (6.1,6.2,6.3):由 Compiler 生成的执行计划是阶段性的 DAG,每个阶段都可能会涉及到 Map/Reduce job、元数据的操作、HDFS 文件的操作,Execution Engine 将各个阶段的 DAG 提交给对应的组件执行。
  • Step 7, 8 and 9:在每个任务(mapper / reducer)中,查询结果会以临时文件的方式存储在 HDFS 中。保存查询结果的临时文件由 Execution Engine 直接从 HDFS 读取,作为从 Driver Fetch API 的返回内容。

Hive on Mapreduce特点

  1. 关系数据库里,表的加载模式是在数据加载时候强制确定的(表的加载模式是指数据库存储数据的文件格式),如果加载数据时候发现加载的数据不符合模式,关系数据库则会拒绝加载数据,这个就叫“写时模式”,写时模式会在数据加载时候对数据模式进行检查校验的操作。**Hive在加载数据时候和关系数据库不同,hive在加载数据时候不会对数据进行检查,也不会更改被加载的数据文件,而检查数据格式的操作是在查询操作时候执行,这种模式叫“读时模式”。**在实际应用中,写时模式在加载数据时候会对列进行索引,对数据进行压缩,因此加载数据的速度很慢,但是当数据加载好了,我们去查询数据的时候,速度很快。但是当我们的数据是非结构化,存储模式也是未知时候,关系数据操作这种场景就麻烦多了,这时候hive就会发挥它的优势。
  2. 关系数据库一个重要的特点是可以对某一行或某些行的数据进行更新、删除操作,hive不支持对某个具体行的操作,hive对数据的操作只支持覆盖原数据和追加数据。Hive也不支持事务和索引。更新、事务和索引都是关系数据库的特征,这些hive都不支持,也不打算支持,原因是hive的设计是海量数据进行处理,全数据的扫描时常态,针对某些具体数据进行操作的效率是很差的,对于更新操作,hive是通过查询将原表的数据进行转化最后存储在新表里,这和传统数据库的更新操作有很大不同。
  3. Hive也可以在hadoop做实时查询上做一份自己的贡献,那就是和hbase集成,hbase可以进行快速查询,但是hbase不支持类SQL的语句,那么此时hive可以给hbase提供sql语法解析的外壳,可以用类sql语句操作hbase数据库。
  4. Hive可以认为是MapReduce的一个封装、包装。Hive的意义就是在业务分析中将用户容易编写、会写的Sql语言转换为复杂难写的MapReduce程序,从而大大降低了Hadoop学习的门槛,让更多的用户可以利用Hadoop进行数据挖掘分析。

与传统数据库之间对比—From:Hive和传统数据库进行比较

比较项 SQL HiveQL
ANSI SQL 支持 不完全支持
更新 UPDATE\INSERT\DELETE insert OVERWRITE\INTO TABLE
事务 支持 不支持
模式 写模式 读模式
数据保存 块设备、本地文件系统 HDFS
延时
多表插入 不支持 支持
子查询 完全支持 只能用在From子句中
视图 Updatable Read-only
可扩展性
数据规模

SparkSQL

SparkSQL简介

SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-hadoop工具。但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,Shark应运而生,但又因为Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),2014年spark团队停止对Shark的开发,将所有资源放SparkSQL项目上

​ 其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

  • SparkSQL的两个组件
  1. SQLContext:Spark SQL提供SQLContext封装Spark中的所有关系型功能。可以用之前的示例中的现有SparkContext创建SQLContext。
  2. DataFrame:DataFrame是一个分布式的,按照命名列的形式组织的数据集合。DataFrame基于R语言中的data frame概念,与关系型数据库中的数据库表类似。通过调用将DataFrame的内容作为行RDD(RDD of Rows)返回的rdd方法,可以将DataFrame转换成RDD。可以通过如下数据源创建DataFrame:已有的RDD、结构化数据文件、JSON数据集、Hive表、外部数据库。

SparkSQL运行架构

类似于关系型数据库,SparkSQL也是语句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Operation–>Data Source–>Result的次序来描述的。

当执行SparkSQL语句的顺序

  1. 对读入的SQL语句进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等,从而判断SQL语句是否规范;
    • Projection:简单说就是select选择的列的集合,参考:SQL Projection
  2. 将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定(Bind),如果相关的Projection、Data Source等都是存在的话,就表示这个SQL语句是可以执行的;
  3. 一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize);
  4. 计划执行(Execute),按Operation–>Data Source–>Result的次序来进行的,在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。

Hive on Spark

hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

在hive中使用以下语句开启;当然引擎还可以使用tez,一样的方式
hive> set hive.execution.engine=spark;

Hive on Spark与SparkSql的区别

​ **hive on spark大体与SparkSQL结构类似,只是SQL引擎不同,但是计算引擎都是spark!**敲黑板!这才是重点!

我们来看下,在pyspark中使用Hive on Spark是中怎么样的体验

#初始化Spark SQL
#导入Spark SQL
from pyspark.sql import HiveContext,Row
# 当不能引入Hive依赖时
# from pyspark.sql import SQLContext,Row
# 注意,上面那一点才是关键的,他两来自于同一个包,你们区别能有多大


hiveCtx = HiveContext(sc)	#创建SQL上下文环境
input = hiveCtx.jsonFile(inputFile)	  #基本查询示例
input.registerTempTable("tweets")	#注册输入的SchemaRDD(SchemaRDD在Spark 1.3版本后已经改为DataFrame)
#依据retweetCount(转发计数)选出推文
topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")

我们可以看到,sqlcontext和hivecontext都是出自于pyspark.sql包,可以从这里理解的话,其实hive on spark和sparksql并没有太大差别

结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序。而且大家的引擎都是spark

SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。Hive原本是没有很好支持MapReduce之外的引擎的,而Hive On Tez项目让Hive得以支持和Spark近似的Planning结构(非MapReduce的DAG)。所以在此基础上,Cloudera主导启动了Hive On Spark。这个项目得到了IBM,Intel和MapR的支持(但是没有Databricks)。—From SparkSQL与Hive on Spark的比较

Hive on Mapreduce和SparkSQL使用场景

Hive on Mapreduce场景

  • Hive的出现可以让那些精通SQL技能、但是不熟悉MapReduce 、编程能力较弱与不擅长Java语言的用户能够在HDFS大规模数据集上很方便地利用SQL 语言查询、汇总、分析数据,毕竟精通SQL语言的人要比精通Java语言的多得多
  • Hive适合处理离线非实时数据

SparkSQL场景

  • Spark既可以运行本地local模式,也可以以Standalone、cluster等多种模式运行在Yarn、Mesos上,还可以运行在云端例如EC2。此外,Spark的数据来源非常广泛,可以处理来自HDFS、HBase、 Hive、Cassandra、Tachyon上的各种类型的数据。
  • 实时性要求或者速度要求较高的场所

Hive on Mapreduce和SparkSQL性能对比

具体实验参见:Spark SQL & Spark Hive编程开发, 并和Hive执行效率对比

结论:sparksql和hive on spark时间差不多,但都比hive on mapreduce快很多,官方数据认为spark会被传统mapreduce快10-100倍


更新

  • 2017.8.4 第一次更新,收集和整理

致谢

2018-10-31 10:13:58 hellozhxy 阅读数 5818

Spark SQL支持对Hive的读写操作。然而因为Hive有很多依赖包,所以这些依赖包没有包含在默认的Spark包里面。如果Hive依赖的包能在classpath找到,Spark将会自动加载它们。需要注意的是,这些Hive依赖包必须复制到所有的工作节点上,因为它们为了能够访问存储在Hive的数据,会调用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xmlcore-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目录下面。 
当使用Hive时,必须初始化一个支持Hive的SparkSession,用户即使没有部署一个Hive的环境仍然可以使用Hive。当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。 
注意:从Spark 2.0.0版本开始,hive-site.xml里面的hive.metastore.warehouse.dir属性已经被spark.sql.warehouse.dir替代,用于指定warehouse的默认数据路径(必须有写权限)。

import java.io.Serializable;  
import java.util.ArrayList;  
import java.util.List;  

import org.apache.spark.api.java.function.MapFunction;  
import org.apache.spark.sql.Dataset;  
import org.apache.spark.sql.Encoders;  
import org.apache.spark.sql.Row;  
import org.apache.spark.sql.SparkSession;  

public static class Record implements Serializable {  
  private int key;  
  private String value;  

  public int getKey() {  
    return key;  
  }  

  public void setKey(int key) {  
    this.key = key;  
  }  

  public String getValue() {  
    return value;  
  }  

  public void setValue(String value) {  
    this.value = value;  
  }  
}  

// warehouseLocation points to the default location for managed databases and tables  
String warehouseLocation = "/spark-warehouse";  
// init spark session with hive support  
SparkSession spark = SparkSession  
  .builder()  
  .appName("Java Spark Hive Example")  
  .master("local[*]")  
  .config("spark.sql.warehouse.dir", warehouseLocation)  
  .enableHiveSupport()  
  .getOrCreate();  

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");  
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");  

// Queries are expressed in HiveQL  
spark.sql("SELECT * FROM src").show();  
// +---+-------+  
// |key|  value|  
// +---+-------+  
// |238|val_238|  
// | 86| val_86|  
// |311|val_311|  
// ...  
// only showing top 20 rows  

// Aggregation queries are also supported.  
spark.sql("SELECT COUNT(*) FROM src").show();  
// +--------+  
// |count(1)|  
// +--------+  
// |    500 |  
// +--------+  

// The results of SQL queries are themselves DataFrames and support all normal functions.  
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");  

// The items in DaraFrames are of type Row, which lets you to access each column by ordinal.  
Dataset<String> stringsDS = sqlDF.map(row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING());  
stringsDS.show();  
// +--------------------+  
// |               value|  
// +--------------------+  
// |Key: 0, Value: val_0|  
// |Key: 0, Value: val_0|  
// |Key: 0, Value: val_0|  
// ...  

// You can also use DataFrames to create temporary views within a SparkSession.  
List<Record> records = new ArrayList<Record>();  
for (int key = 1; key < 100; key++) {  
  Record record = new Record();  
  record.setKey(key);  
  record.setValue("val_" + key);  
  records.add(record);  
}  
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);  
recordsDF.createOrReplaceTempView("records");  

// Queries can then join DataFrames data with data stored in Hive.  
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();  
// +---+------+---+------+  
// |key| value|key| value|  
// +---+------+---+------+  
// |  2| val_2|  2| val_2|  
// |  2| val_2|  2| val_2|  
// |  4| val_4|  4| val_4|  
// ...  
// only showing top 20 rows  

如果使用eclipse运行上述代码的话需要添加spark-hive的jars,下面是maven的配置:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.11 -->  
<dependency>  
    <groupId>org.apache.spark</groupId>  
    <artifactId>spark-hive_2.11</artifactId>  
    <version>2.1.0</version>  
</dependency>

否则的话会遇到下面错误:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.  
    at org.apache.spark.sql.SparkSession$Builder.enableHiveSupport(SparkSession.scala:815)  
    at JavaSparkHiveExample.main(JavaSparkHiveExample.java:17)  

与不同版本Hive Metastore的交互

Spark SQL对Hive的支持其中一个最重要的部分是与Hive metastore的交互,使得Spark SQL可以访问Hive表的元数据。从Spark 1.4.0版本开始,Spark SQL使用下面的配置可以用于查询不同版本的Hive metastores。需要注意的是,本质上Spark SQL会使用编译后的Hive 1.2.1版本的那些类来用于内部操作(serdes、UDFs、UDAFs等等)。

这里写图片描述

sparksql 操作hive

阅读数 3056