2016-08-26 13:46:33 refuil 阅读数 4321
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1840 人正在学习 去看看 李飞

SparkSQL前身是Shark,Shark强烈依赖于Hive。Spark原来没有做SQL多维度数据查询工具,后来开发了Shark,Shark依赖于Hive的解释引擎,部分在Spark中运行,还有一部分在Hadoop中运行。所以讲SparkSQL必须讲Hive。

一、spark on hive原理与搭建

1. hive的本质

1)Hive是分布式数据仓库,同时又是查询引擎,所以SparkSQL取代的只是Hives的查询引擎,在企业实际生产环境下,Hive+SparkSQL是目前最为经典的数据分析组合。

2)Hive本身就是一个简单单机版本的软件,主要负责:

A) 把HQL翻译成Mapper(s)-Reducer-Mapper(s)的代码,并且可能产生很多MapReduce的JOB。
B)把生成的MapReduce代码及相关资源打包成jar并发布到Hadoop集群中运行(这一切都是自动的)

3)Hive本身的架构如下所示: 
hive架构

可以通过CLI(命令终端)、JDBC/ODBC、Web GUI访问Hive。 
JavaEE或.net程序可以通过Hive处理,再把处理的结果展示给用户。 
也可以直接通过Web页面操作Hive。 
※ Hive本身只是一个单机版本的的软件,怎么访问HDFS的呢? 
=> 在Hive用Table的方式插入数据、检索数据等,这就需要知道数据放在HDFS的什么地方以及什么地方属于什么数据,Metastore就是保存这些元数据信息的。Hive通过访问元数据信息再去访问HDFS上的数据。

可以看出HDFS不是一个真实的文件系统,是虚拟的,是逻辑上的,HDFS只是一套软件而已,它是管理不同机器上的数据的,所以需要NameNode去管理元数据。DataNode去管理数据。 
Hive通过Metastore和NameNode打交道。

2. Hive安装和配置实战

Spark1.6.1中SparkSQL可以指定具体的Hive的版本。 
1) 从apache官网下载hive-1.2.1,并解压到/home/richard目录。 
http://mirrors.cnnic.cn/apache/hive/hive-1.2.1/ 
2) 配置.bashrc,追加以下内容,或者/etc/profile:

  1. export HIVE_HOME=/home/richard/hive-1.2.1
  2. export HIVE_CONF_DIR=/$HIVE_HOME/conf
  3. export PATH=$PATH:$HIVE_HOME/bin

3)进入/home/richard/hive-1.2.1/conf目录,生成hive-env.sh:

  1. cp hive-default.xml.template hive-site.xml

配置:

  1. export HADOOP_HOME=/opt/hadoop-2.6.0
  2. export HIVE_HOME=/opt/hive-1.2.1/
  3. export HIVE_CONF_DIR=/opt/hive-1.2.1/conf

Hive默认情况下放元数据的数据库是Derby,遗憾的是Derby是单用户,所以在生产环境下一般会采用支持多用户的数据库来进行MetaStore,且进行Master-Slaves主从读写分离和备份(一般Master节点负责写数据,Slaves节点负责读数据)。最常用的是MySQL。

cp hive-env.sh.template hive-env.sh 
再生成hive-site.xml,并配置如下:

  1. <configuration>
  2. <!-- WARNING!!! This file is auto generated for documentation purposes ONLY! -->
  3. <!-- WARNING!!! Any changes you make to this file will be ignored by Hive. -->
  4. <!-- WARNING!!! You must make your changes in hive-site.xml instead. -->
  5. <!-- Hive Execution Parameters -->
  6. <property>
  7. <name>javax.jdo.option.ConnectionURL</name>
  8. <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true</value>
  9. <description>JDBC connect string for a JDBC metastore</description>
  10. </property>
  11. <property>
  12. <name>javax.jdo.option.ConnectionDriverName</name>
  13. <value>com.mysql.jdbc.Driver</value>
  14. <description>Driver class name for a JDBC metastore</description>
  15. </property>
  16. <property>
  17. <name>javax.jdo.option.ConnectionUserName</name>
  18. <value>root</value>
  19. <description>username to use against metastore database</description>
  20. </property>
  21. <property>
  22. <name>javax.jdo.option.ConnectionPassword</name>
  23. <value>778899..</value>
  24. <description>password to use against metastore database</description>
  25. </property>
  26. <property>
  27. <name>hive.metastore.warehouse.dir</name>
  28. <value>/user/hive/warehouse</value>
  29. <description>location of default database for the warehouse</description>
  30. </property>
  31. </configuration>

Hive中的DataBase和表其实就是HDFS上的目录和简单的文本文件。简单的文本文件中有几列数据,每列数据的类型无法直接从文本文件中得知。但当数据放入Hive中,Hive就把元数据放入Mysql中了,这样就可以基于数据的表进行查询了。

4) MYSQL的安装和配置 
root用户下执行yum -y install mysql-server即可自动安装 
执行rpm -qa mysql-server可以查看是否安装成功及安装的mysql版本。 
参考redhat4.4.7如何安装mysql(非yum 安装) 
http://blog.csdn.net/choice_jj/article/details/8827649 
http://jingyan.baidu.com/article/1974b289acebd0f4b0f77469.html 
设置密码问题: 
http://blog.csdn.net/rogerzhanglijie/article/details/9182125

hive打开错误参考:

  1. 主要是按照《DT-大数据 hive1.2.1mysql作为元数据库搭建》文档

http://www.aboutyun.com/thread-11131-1-1.html

另外还需要下载驱动包(mysql-connector-java-5.1.35.tar.gz)将.jar文件放到这个目录下:

  1. cd /usr/local/hive/apache-hive-1.2.1/lib/

启动hive报错

  1. Logging initialized using configuration in jar:file:/opt/hive-1.2.1/lib/hive-common-1.2.1.jar!/hive-log4j.properties
  2. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
  3. Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
  4. Caused by: java.lang.reflect.InvocationTargetException
  5. Caused by: javax.jdo.JDOFatalDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true, username = hive. Terminating connection pool (set lazyInit to true if you expect to start your database after
  6. ...
  1. root@master:/opt# netstat -tnlp | grep 3306
  2. tcp 0 0 127.0.0.1:3306 0.0.0.0:* LISTEN 5090/mysqld
  3. //mysql的监听IP应该是0.0.0.0
  4. //在vi /etc/mysql/my.cnf 中修改
  5. killall mysqld 后守护进程还会启动mysql,等于重启

另外报错:

  1. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  2. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  3. [ERROR] Terminal initialization failed; falling back to unsupported

原因是hadoop目录下存在老版本jline: 
/hadoop-2.5.2/share/hadoop/yarn/lib: 
-rw-r--r-- 1 root root 87325 Mar 10 18:10 jline-0.9.94.jar

解决方法是:

  1. hive下的新版本jlineJAR包拷贝到hadoop下:
  2. cp /hive/apache-hive-1.1.0-bin/lib/jline-2.12.jar ./

5) Hive的表有两种基本类型:一种是内部表(这种表数据属于Hive本身,即如果原来的数据在HDFS的其他地方,此时数据会通过HDFS移动到Hive所在目录,如果删除Hive中的该表的话数据和元数据均会被删除),一种是外部表(这种表数据不属于Hive数据仓库,元数据中会表达具体数据在哪里,使用时和内部表的使用一样,只是如果通过Hive去删除的话,删除的只是元数据,并没有删除数据本身)

6)Hive扩展(Hive的数据存储-转载)

Hive是基于Hadoop分布式文件系统的,它的数据存储在Hadoop分布式文件系统中。Hive本身是没有专门的数据存储格式,也没有为数据建立索引,只需要在创建表的时候告诉Hive数据中的列分隔符和行分隔符,Hive就可以解析数据。所以往Hive表里面导入数据只是简单的将数据移动到表所在的目录中(如果数据是在HDFS上;但如果数据是在本地文件系统中,那么是将数据复制到表所在的目录中)。

Hive中主要包含以下几种数据模型:Table(表),External Table(外部表),Partition(分区),Bucket(桶)(本博客会专门写几篇博文来介绍分区和桶)。

1、表:Hive中的表和关系型数据库中的表在概念上很类似,每个表在HDFS中都有相应的目录用来存储表的数据,这个目录可以通过${HIVE_HOME}/conf/hive-site.xml配置文件中的 hive.metastore.warehouse.dir属性来配置,这个属性默认的值是/user/hive/warehouse(这个目录在 HDFS上),我们可以根据实际的情况来修改这个配置。如果我有一个表wyp,那么在HDFS中会创建/user/hive/warehouse/wyp 目录(这里假定hive.metastore.warehouse.dir配置为/user/hive/warehouse);wyp表所有的数据都存放在这个目录中。这个例外是外部表。

2、外部表:外部表指向已经在HDFS中存在的数据,可以创建Partition。它和内部表在元数据的组织上是相同的,而实际数据的存储则有较大的差异。内部表的创建过程和数据加载过程这两个过程可以分别独立完成,也可以在同一个语句中完成,在加载数据的过程中,实际数据会被移动到数据仓库目录中;之后对数据对访问将会直接在数据仓库目录中完成。删除表时,表中的数据和元数据将会被同时删除。而外部表只有一个过程,加载数据和创建表同时完成(CREATE EXTERNAL TABLE ……LOCATION),实际数据是存储在LOCATION后面指定的 HDFS 路径中,并不会移动到数据仓库目录中。当删除一个External Table时,仅删除该链接。

3、分区:在Hive中,表的每一个分区对应表下的相应目录,所有分区的数据都是存储在对应的目录中。比如wyp 表有dt和city两个分区,则对应dt=20131218,city=BJ对应表的目录为/user/hive/warehouse /dt=20131218/city=BJ,所有属于这个分区的数据都存放在这个目录中。

4、桶:对指定的列计算其hash,根据hash值切分数据,目的是为了并行,每一个桶对应一个文件(注意和分区的区别)。比如将wyp表id列分散至16个桶中,首先对id列的值计算hash,对应hash值为0和16的数据存储的HDFS目录为:/user /hive/warehouse/wyp/part-00000;而hash值为2的数据存储的HDFS 目录为:/user/hive/warehouse/wyp/part-00002。如果想应用很多的Map任务这样是不错的选择。 
Hive数据抽象结构图

3. 使用Hive分析搜索数据

参考: 
http://lqding.blog.51cto.com/9123978/1751030

启动HDFS/Yarn。注意如果要使用Hive进行查询就需要启动Yarn。 
启动Hive。 
通过show databases;可以查看数据库。默认database只有default。 
选取搜狗实验的三个数据源:http://download.labs.sogou.com/dl/q.html

  1. hadoop dfs -mkdir /library/sogou
  2. hadoop dfs -put ./SogouQ1.txt /library/sogou
  3. hadoop dfs -put ./SogouQ2.txt /library/sogou
  4. hadoop dfs -put ./SogouQ3.txt /library/sogou
  1. create table SogouQ3(ID STRING, WEBSESSION STRING, WORD STRING, S_SEQ INT, C_SEQ INT, WEBSITE STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; //建表
  2. LOAD DATA INPATH '/library/sogou/SogouQ3.txt' OVERWRITE INTO TABLE SogouQ3; //加载数据

查询搜索排名第一,点击排名为第一的结果: 
select count(*) from sogouq1 where S_SEQ=1 and C_SEQ=1;

搜索日志中,关注排名前五的内容,并且给出分别为多少次: 
select WORD, count(WORD) as COUNTWord from SogouQ1 group by WORD order by COUNTWord desc limit 5

查询例子mysql

mysql -u root -p

  1. creeate database hive;
  2. use hive;
  3. create table person(name String,age int);
  4. insert into person values(‘richard’,’34’);
  5. select * from person;即可查询。

到此不需要启动spark,只需要启动HDFS。

4. Spark SQL on Hive配置及实战

1)spark on hive 配置

  1. 切换到spar的conf目录下使用vi hive-site.xml创建hive-site.xml.并填写如下内容
  1. <configuration>
  2. <property>
  3. <name>hive.metastore.uris</name>
  4. <value>thrift://master:9083</value>
  5. <description>thrift URI for the remote metastore.Used by metastore client to connect to remote metastore. </description>
  6. </property>
  7. </configuration>

因为用sparksql操作hive实际上是把hive 当做数据仓库。数据仓库肯定有元数据和数据本身。要访问真正的数据就要访问他的元数据。所以只需要配置hive.metastore.uris即可。(不需在每台机器上配置)

2)启动集群

  1. 启动dfs 服务start-dfs.sh
  2. 启动hive 数据仓库服务 hive --service metastore >metastore.log 2>& 1&
  3. 启动spark服务start-all.sh
  4. 启动sparkshell ./spark-shell –master spark://master:7077

3)案例实战

  1. Spark on hive 实战在spark-shell 模式下
  1. val hiveContext= new org.apache.spark.sql.hive.HiveContext(sc)
  2. hiveContext.sql("use hive") //使用hive 数据库
  3. hiveContext.sql("show tables").collect.foreach(println) // 查询数据库中的表
  4. hiveContext.sql("select count(*) from sogouq1").collect.foreach(println)//(注意此时使用的是spark的引擎)
  5. hiveContext.sql(“select count(*) from sogouq2 where website like '%baidu%'”).collect.foreach(println)
  6. hiveContext.sql(“select count(*) from sogouq2 where s_seq=1 and c_seq=1 and website like '%baidu%'”).collect.foreach(println)
  1. 不基于hive 的实战代码,在spark-shell 模式下
  1. scala> sqlContext
  2. res8: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@35920655(可以创建很多hiveContexthivecongtext连接的是数据仓库,程序本身(spark中的job),job在程序中并行运行,如果都hive的数据,如果用句柄,对句柄的占用比较麻烦,所有用多个实例。从查询的角度来说,有多个实例也很正常)
  3. val df =sqlcontext.read.json(“library/examples/src/main/resources/people.json”) //读取json 数据
  4. df.show()
  5. df.printSchema
  6. df.select(“name”).show()
  7. df.select(df(“name”),df(“age”)+1).show()
2014-09-02 14:10:28 wbj0110 阅读数 56
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1840 人正在学习 去看看 李飞

Spark SQL也公布了很久,今天写了个程序来看下Spark SQL、Spark Hive以及直接用Hive执行的效率进行了对比。以上测试都是跑在YARN上。
  首先我们来看看我的环境:

  1. 3台DataNode,2台NameNode,每台机器20G内存,24核
  2. 数据都是lzo格式的,共336个文件,338.6 G
  3. 无其他任务执行

如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

三个测试都是执行

1 select count(*), host, module
2 from ewaplog
3 group by host, module
4 order by host, module;

下面我们先来看看Spark SQL核心的代码(关于Spark SQL的详细介绍请参见Spark官方文档,这里我就不介绍了。):

01 /**
02  * User: 过往记忆
03  * Date: 14-8-13
04  * Time: 下午23:16
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1090
07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  * 过往记忆博客微信公共帐号:iteblog_hadoop
09  */
10  
11 JavaSparkContext ctx = ...
12 JavaSQLContext sqlCtx = ...
13 JavaRDD<Entry> stringJavaRDD = ctx.textFile(args[0]).map(
14       new Function<String, Entry>() {
15             @Override
16             public Entry call(String str) throws Exception {
17                 String[] split = str.split("\u0001");
18                 if (split.length < 3) {
19                     return new Entry("""""");
20                 }
21  
22                 return new Entry(split[0], split[1], split[2]);
23             }
24 });
25  
26 JavaSchemaRDD schemaPeople = sqlCtx.applySchema(stringJavaRDD, Entry.class);
27 schemaPeople.registerAsTable("entry");
28 JavaSchemaRDD teenagers = sqlCtx.sql("select count(*), host, module " +
29                 "from entry " +
30                 "group by host, module " +
31                 "order by host, module");
32  
33 List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
34      public String call(Row row) {
35           return row.getLong(0) + "\t" +
36                   row.getString(1) + "\t" + row.getString(2);
37      }
38 }).collect();
39  
40 for (String name : teenagerNames) {
41             System.out.println(name);
42 }

Spark Hive核心代码:

01 /**
02  * User: 过往记忆
03  * Date: 14-8-23
04  * Time: 下午23:16
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1090
07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  * 过往记忆博客微信公共帐号:iteblog_hadoop
09  */
10 JavaHiveContext hiveContext =....;
11 JavaSchemaRDD result = hiveContext.hql("select count(*), host, module " +
12                 "from ewaplog " +
13                 "group by host, module " +
14                 "order by host, module");
15 List<Row> collect = result.collect();
16 for (Row row : collect) {
17     System.out.println(row.get(0) + "\t" + row.get(1) + "\t" + row.get(2));
18 }

  大家可以看到Spark Hive核心代码里面的SQL语句和直接在Hive上面执行一样,在执行这个代码的时候,需要确保ewaplog存在。而且在运行这个程序的时候需要依赖Hive的一些jar包,需要依赖Hive的元数据等信息。对Hive的依赖比较大。而Spark SQL直接读取lzo文件,并没有涉及到Hive,相比Spark Hive依赖性这方便很好。Spark SQL直接读取lzo文件,然后将数据存放在RDD中,applySchema方法将JavaRDD转换成JavaSchemaRDD,我们来看看文档是怎么来描述的

  At the core of this component is a new type of RDD, SchemaRDD. SchemaRDDs are composed Row objects along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.

转换成JavaSchemaRDD之后,我们可以用用registerAsTable将它注册到表中,之后就可以通过JavaSQLContext的sql方法来执行相应的sql语句了。
  用Maven编译完上面的程序之后,放到Hadoop集群上面运行:

1 iteblog@Spark $ spark-submit --master yarn-cluster 
2                              --jars lib/spark-sql_2.10-1.0.0.jar
3                              --class SparkSQLTest
4                              --queue queue1
5                              ./spark-1.0-SNAPSHOT.jar
6                              /home/wyp/test/*.lzo

分别经过了20分钟左右的时间,Spark SQL和Spark Hive都可以运行完,结果如下:

01 39511517    bokingserver1   CN1_hbase_android_client
02 59141803    bokingserver1   CN1_hbase_iphone_client
03 39544052    bokingserver2   CN1_hbase_android_client
04 59156743    bokingserver2   CN1_hbase_iphone_client
05 23472413    bokingserver3   CN1_hbase_android_client
06 35251936    bokingserver3   CN1_hbase_iphone_client
07 23457708    bokingserver4   CN1_hbase_android_client
08 35262400    bokingserver4   CN1_hbase_iphone_client
09 19832715    bokingserver5   CN1_hbase_android_client
10 51003885    bokingserver5   CN1_hbase_iphone_client
11 19831076    bokingserver6   CN1_hbase_android_client
12 50997314    bokingserver6   CN1_hbase_iphone_client
13 30526207    bokingserver7   CN1_hbase_android_client
14 50702806    bokingserver7   CN1_hbase_iphone_client
15 54844214    bokingserver8   CN1_hbase_android_client
16 88062792    bokingserver8   CN1_hbase_iphone_client
17 54852596    bokingserver9   CN1_hbase_android_client
18 88043401    bokingserver9   CN1_hbase_iphone_client
19 54864322    bokingserver10  CN1_hbase_android_client
20 88041583    bokingserver10  CN1_hbase_iphone_client
21 54891529    bokingserver11  CN1_hbase_android_client
22 88007489    bokingserver11  CN1_hbase_iphone_client
23 54613917    bokingserver12  CN1_hbase_android_client
24 87623763    bokingserver12  CN1_hbase_iphone_client

  为了比较基于Spark的任务确实比基于Mapreduce的快,我特意用Hive执行了同样的任务,如下:

01 hive> select count(*), host, module from ewaplog
02     > group by host, module order by host, module;
03  
04 Job 0: Map: 2845  Reduce: 364   Cumulative CPU: 17144.59 sec
05 HDFS Read: 363542156311 HDFS Write: 36516 SUCCESS
06 Job 1: Map: 1  Reduce: 1   Cumulative CPU: 4.82 sec
07 HDFS Read: 114193 HDFS Write: 1260 SUCCESS
08 Total MapReduce CPU Time Spent: 0 days 4 hours 45 minutes 49 seconds 410 msec
09 OK
10 39511517    bokingserver1   CN1_hbase_android_client
11 59141803    bokingserver1   CN1_hbase_iphone_client
12 39544052    bokingserver2   CN1_hbase_android_client
13 59156743    bokingserver2   CN1_hbase_iphone_client
14 23472413    bokingserver3   CN1_hbase_android_client
15 35251936    bokingserver3   CN1_hbase_iphone_client
16 23457708    bokingserver4   CN1_hbase_android_client
17 35262400    bokingserver4   CN1_hbase_iphone_client
18 19832715    bokingserver5   CN1_hbase_android_client
19 51003885    bokingserver5   CN1_hbase_iphone_client
20 19831076    bokingserver6   CN1_hbase_android_client
21 50997314    bokingserver6   CN1_hbase_iphone_client
22 30526207    bokingserver7   CN1_hbase_android_client
23 50702806    bokingserver7   CN1_hbase_iphone_client
24 54844214    bokingserver8   CN1_hbase_android_client
25 88062792    bokingserver8   CN1_hbase_iphone_client
26 54852596    bokingserver9   CN1_hbase_android_client
27 88043401    bokingserver9   CN1_hbase_iphone_client
28 54864322    bokingserver10  CN1_hbase_android_client
29 88041583    bokingserver10  CN1_hbase_iphone_client
30 54891529    bokingserver11  CN1_hbase_android_client
31 88007489    bokingserver11  CN1_hbase_iphone_client
32 54613917    bokingserver12  CN1_hbase_android_client
33 87623763    bokingserver12  CN1_hbase_iphone_client
34 Time taken: 1818.706 seconds, Fetched: 24 row(s)

  从上面的显示我们可以看出,Hive执行同样的任务用了30分钟,而Spark用了20分钟,也就是省了1/3的时间,还是很快的。在运行的过程中,我发现Spark消耗内存比较大,在程序运行期间,三个子节点负载很高,整个队列的资源消耗了一半以上。我想如果集群的机器数据更多的话,Spark的运行速度应该还会有一些提升。好了今天就说到这,欢迎关注本博客。

 转载自过往记忆(http://www.iteblog.com/)

 
2017-05-02 16:38:06 a123demi 阅读数 5495
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1840 人正在学习 去看看 李飞

        上一篇文章我们实现了Java+Spark+Hive+Maven实现和异常处理,测试的实例是打包运行在linux环境,但当直接在Windows系统运行时,会有Hive相关异常的输出,本文将帮助您如何在Windows系统上集成Hadoop+Spark+Hive开发环境。

一.开发环境

系统:windows 7

JDK:jdk1.7

eclipse:Mars.2 Release (4.5.2)

Hadoop:hadoop-2.6.5

Spark:spark-1.6.2-bin-hadoop2.6

Hive:hive-2.1.1

二.前期准备

1.系统环境配置

JDK,Hadoop和Spark配置系统环境

2.Hadoop相关文件

winutils.exe和hadoop.dll,下载地址:hadoop2.6.5中winutils和hadoop

将上面2个文件放置..\hadoop-2.6.5\bin目录下;

将winutils.exe同时放置到C:\Windows\System32目录下;

3.新建tmp/hive目录

在应用工程目录中新建tmp/hive目录,由于我的工程是放置在E盘,顾可以在E盘新建tmp/hive目录

三.hive配置

1.Hive环境

本系统的Hive是部署在远程linux集群环境上的。主安装目录ip地址:10.32.19.50:9083

具体Hive在linux环境的部署,请查看相关文档,本文不介绍。

2.Windows中hive-site.xml文件配置

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<!-- 配置数据源存储地址 -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

	<!-- 配置是否本地 -->
    <property>
        <name>hive.metastore.local</name>
        <value>false</value>
    </property>

	<!-- 配置数据源地址 -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://10.32.19.50:9083</value>
    </property>
    
</configuration>


windows中hive-site.xml配置


四.实例测试

需求:查询hive数据,eclipse正常显示

1.实例工程结构


实例工程

2.pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.lm.hive</groupId>
	<artifactId>SparkHive</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>SparkHive</name>
	<url>http://maven.apache.org</url>

	 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

 
    <dependencies>
        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.10.2.0</version>
        </dependency>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>
    
    <build>
		<sourceDirectory>src/main/java</sourceDirectory>
		<testSourceDirectory>src/main/test</testSourceDirectory>

		<plugins>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass></mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.2.1</version>
				<executions>
					<execution>
						<goals>
							<goal>exec</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<executable>java</executable>
					<includeProjectDependencies>true</includeProjectDependencies>
					<includePluginDependencies>false</includePluginDependencies>
					<classpathScope>compile</classpathScope>
					<mainClass></mainClass>
				</configuration>
			</plugin>

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<showWarnings>true</showWarnings>
				</configuration>
			</plugin>

		</plugins>
	</build>
</project>




3.测试用例实现

package com.lm.hive.SparkHive;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Spark sql获取Hive数据
 *
 */
public class App 
{
    public static void main( String[] args ) 
    {
        SparkConf sparkConf = new SparkConf().setAppName("SparkHive").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        
        //不要使用SQLContext,部署异常找不到数据库和表
        HiveContext hiveContext = new HiveContext(sc);
//        SQLContext sqlContext = new SQLContext(sc);
        //查询表前10条数据
        hiveContext.sql("select * from bi_ods.owms_m_locator limit 10").show();
        
        sc.stop();
    }
}

4.测试结果展示


测试结果展示


代码下载地址:eclispe集成hadoop+spark+hive开发实例代码

2020-01-19 23:21:09 Earl211 阅读数 10
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1840 人正在学习 去看看 李飞


在日常的etl开发的过程中,不管是使用spark,或者是hive来做开发,经常会遇见任务跑的特别慢,或者任务出现oom,或者数据量并不大,但是任务就是跑的特别慢等等情况。

关于任务的优化,我将自己工作中用到的一些小的trick分享出来。

数据倾斜

数据倾斜是什么?数据倾斜是在计算数据的时候,数据的分散度不够,大量的数据集中在一台或者几台机器上计算,导致整个计算过程过慢甚至失败。
举个例子,1TB的数据,十台机器并行计算,由于数据分区不合理,9台机器上各承担了10GB的计算任务,剩下910GB的数据在一台机器上。因而负载大的机器,需要的资源过大,运行的时间过长等等问题。

数据倾斜的表现

  • hadoop中的数据倾斜
    • 一个或者多个reduce卡主
    • 各种oom报错
      *单个reduce, 读写的数据量极大
  • spark中的数据倾斜
    • Excutor lost, OOM, shuffle过程出错
    • Driver OOM
    • 单个Executor执行特别就,整个任务卡在某个阶段不能执行
    • 正常运行的任务突然失败

问题查找

  • hadoop中的问题查找
    • 读取的数据源是否数据倾斜
    • group或者on的时候,是否有大量相同的key
  • spark中的问题查找
    • 读取的数据源是否数据倾斜
    • spark计算中的数据倾斜,只可能发生在shuffle操作阶段。查找代码,啊中的程序
    • 读取数据源,按照猜测对某几个字段进行group统计,定位倾斜的字段
    • 看log,根据spark ui,确定是哪个stage特别慢。然后根据stage,定位代码

问题解决

这个问题的解决主要从三个方面进行着手解决:业务逻辑、程序层面和调参方面
业务逻辑

  1. 如果读取的数据源倾斜比较严重,则上游写入数据平均一些

  2. 将直接聚合,改为二级甚至三级聚合。比如:
    select key1, xxx from a group by key1.
    改为
    select key1, xxx from ( select key1, key2, key3, xxx from a group by key1, key2, key3) group by key1

  3. 与2类似,如果没有存在的字段用于多重聚合。可以生成随机key,实现多重聚合。

  4. 区别对待,将倾斜的key的数据单独处理。比如倾斜的key,增加并行度等等方式。这里如何判断倾斜的key,可以使用历史数据或者随机sample的方式判断。或者倾斜的key单独join或者group不要等操作

  5. 自定义分区,需要用户自定义实现分区策略,这种方法效果比较明显。

程序层面

  1. 使用group by 提到count(dinstinc xxx)操作

调仓方面

  1. 不管是hive还是spark,如果有小表与大表join,可以使用mapjoin提到reduce join
  2. hsql,可以调整map, reduce的个数,来缓解数据倾斜问题
  3. 更多相关参数,可以去官网撸一遍配置

拆数据

另外一个关于任务优化的办法,就是化大为小,将大任务拆成小任务。下面提供几个拆数据的思路。

  1. 时间维度,将天任务改成小时任务,甚至将小时任务改成分钟任务甚至秒级别的实时任务。这样单次处理的任务数据量会小很多,比较好处理。
  2. 按照某个维度,对数据进行拆分,使用多个任务进行并行处理

合数据

天下合久必分,分久必合。如果一旦数据拆分特别细,导致hive分区过多,则会出现任务特别慢的情况。这种情况,可能就要考虑合并文件

预计算

这个思路是将可以提前计算的数据提前计算好,一旦等待的数据ready之后,可以更快的计算处理。
举个例子,原本一个任务需要读取A和B两份数据,A数据第二天凌晨1点就好了,B数据需要到第二天10点才能ready。如果在B数据好了之后再开始数据,则任务需要运行1个小时。但是通过修改代码,可以将A数据提前计算好,等到B数据ready之后,再进行计算,只需要20分钟。业务就可以看到数据了。

其他零零散散的任务优化技巧

  1. 大量的spark小任务,可以使用多线程提升效率
  2. 当涉及到大量数据任务的数据流的时候,优化数据产出流程或者重新建模,可能会更快的产出最终的数据。

参考资料

  1. https://blog.csdn.net/u010039929/article/details/55044407
  2. https://zhuanlan.zhihu.com/p/64240857
2015-12-24 20:43:40 sparkexpert 阅读数 4205
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1840 人正在学习 去看看 李飞

利用Eclipse进行Spark开发过程中,特别是针对Hive开发时,经常会碰到这样一个问题,就是无法找到metastore。而一旦找不到的时候,hive会自动创建一个临时的本地的metastore,其提示INFO信息如下:

15/12/24 20:46:02 INFO HiveContext: Initializing execution hive, version 1.2.1
15/12/24 20:46:02 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
15/12/24 20:46:02 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
15/12/24 20:46:03 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/12/24 20:46:03 INFO ObjectStore: ObjectStore, initialize called
15/12/24 20:46:03 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/12/24 20:46:03 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/12/24 20:46:17 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
15/12/24 20:46:19 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/12/24 20:46:19 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/12/24 20:46:28 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/12/24 20:46:28 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/12/24 20:46:30 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
15/12/24 20:46:30 INFO ObjectStore: Initialized ObjectStore
15/12/24 20:46:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
15/12/24 20:46:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
15/12/24 20:46:32 INFO HiveMetaStore: Added admin role in metastore
15/12/24 20:46:32 INFO HiveMetaStore: Added public role in metastore
15/12/24 20:46:33 INFO HiveMetaStore: No user is added in admin role, since config is empty
15/12/24 20:46:33 INFO HiveMetaStore: 0: get_all_databases
15/12/24 20:46:33 INFO audit: ugi=ndscbigdata ip=unknown-ip-addr cmd=get_all_databases
15/12/24 20:46:33 INFO HiveMetaStore: 0: get_functions: db=default pat=*
15/12/24 20:46:33 INFO audit: ugi=ndscbigdata ip=unknown-ip-addr cmd=get_functions: db=default pat=*
15/12/24 20:46:33 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
15/12/24 20:46:36 INFO SessionState: Created local directory: /tmp/1f162c4e-9707-467f-8adc-f8124c3e929a_resources
15/12/24 20:46:36 INFO SessionState: Created HDFS directory: /tmp/hive/ndscbigdata/1f162c4e-9707-467f-8adc-f8124c3e929a
15/12/24 20:46:36 INFO SessionState: Created local directory: /tmp/ndscbigdata/1f162c4e-9707-467f-8adc-f8124c3e929a
15/12/24 20:46:36 INFO SessionState: Created HDFS directory: /tmp/hive/ndscbigdata/1f162c4e-9707-467f-8adc-f8124c3e929a/_tmp_space.db
15/12/24 20:46:36 INFO HiveContext: default warehouse location is /user/hive/warehouse
15/12/24 20:46:36 INFO HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
15/12/24 20:46:36 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
15/12/24 20:46:36 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
15/12/24 20:46:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/24 20:46:37 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/12/24 20:46:37 INFO ObjectStore: ObjectStore, initialize called
15/12/24 20:46:37 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/12/24 20:46:37 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored


这种问题的解决是个很烦人的问题,在spark shell是可以搞定的,为什么spark eclipse就不行呢,肯定是hive-site.xml配置的问题。于是针对这个问题,进行拷贝配置就可以。

进行一个简单的测试,如从一个自己建的表中进行创建,果然KO。


spark hive执行树

阅读数 973

Spark on Hive开发demo

阅读数 371

spark整合hive

阅读数 140

没有更多推荐了,返回首页