2019-02-26 10:04:31 wangshuminjava 阅读数 611
  • Hive大数据分析大数据仓库中应用实战

    随着大数据技术的不断发展壮大, Hive不再是大数据技术生态圈中一个普通的工具,而是在大数据分析和大数据仓库中占据着几乎不可替代的重要作用,大数据分析中Hive和Hbase、Hive和Spark SQL、Hive和Impala的结合使用愈加紧密,大数据仓库中Hive在数据仓库建模模块的作用暂时无可替代。所以,深入学好Hive是入门大数据分析、大数据仓库最好的选择。

    211 人正在学习 去看看 李飞

利用Hive进行数据分析

2016年07月06日 21:30:20 wh_springer 阅读数:16516

近十年来,随着Hadoop生态系统的不断完善,Hadoop早已成为大数据事实上的行业标准之一。面对当今互联网产生的巨大的TB甚至PB级原始数据,利用基于Hadoop的数据仓库解决方案Hive早已是Hadoop的热点应用之一。达观数据团队长期致力于研究和积累Hadoop系统的技术和经验,并构建起了分布式存储、分析、挖掘以及应用的整套大数据处理平台。本文将从Hive的原理、架构及优化等方面来分享Hive的一些心得和使用经验,希望对大家有所收货。

 

1  Hive基本原理

Hadoop是一个流行的开源框架,用来存储和处理商用硬件上的大规模数据集。对于HDFS上的海量日志而言,编写Mapreduce程序代码对于类似数据仓库的需求来说总是显得相对于难以维护和重用,Hive作为一种基于Hadoop的数据仓库解决方案应运而生,并得到了广泛应用。

Hive是基于Hadoop的数据仓库平台,由Facebook贡献,其支持类似SQL的结构化查询功能。Facebook设计开发Hive的初衷就是让那些熟悉sql编程方式的人也可以更好的利用hadoop,hive可以让数据分析人员只关注于具体业务模型,而不需要深入了解Map/Reduce的编程细节,但是这并不意味着使用hive不需要了解和学习Map/Reduce编程模型和hadoop,复杂的业务需求和模型总是存在的,对于Hive分析人员来说,深入了解Hadoop和Hive的原理和Mapreduce模型,对于优化查询总有益处。

以下先以一个简单的例子说明利用hadoop Map/Reduce程序和Hive实现hadoop word count的例子。

 

图:mapreduce和hive分别实现count

 

通过以上可以看出,hive优点:成本低,可以通过类sql语句快速实现简单或复杂的MapReduce统计。

借助于Hadoop和HDFS的大数据存储能力,数据仍然存储于Hadoop的HDFS中,Hive提供了一种类SQL的查询语言:HiveQL(HQL),对数据进行管理和分析,开发人员可以近乎sql的方式来实现逻辑,从而加快应用开发效率。(关于Hadoop、hdfs的更多知识请参考hadoop官网及hadoop权威指南)

HQL经过解析和编译,最终会生成基于Hadoop平台的Map Reduce任务,Hadoop通过执行这些任务来完成HQL的执行。

 

1.1   Hive组件

Hive的组件总体上可以分为以下几个部分:用户接口(UI)、驱动、编译器、元数据(Hive系统参数数据)和执行引擎。

 

图:Hive执行流程图

1)      对外的接口UI包括以下几种:命令行CLI,Web界面、JDBC/ODBC接口;

2)      驱动:接收用户提交的查询HQL;

3)     编译器:解析查询语句,执行语法分析,生成执行计划;

4)     元数据Metadata:存放系统的表、分区、列、列类型等所有信息,以及对应的HDFS文件信息等;

5)     执行引擎:执行执行计划,执行计划是一个有向无环图,执行引擎按照各个任务的依赖关系选择执行任务(Job)。

 

需要注意的是,元数据库一般是通过关系型数据库MySQL来存储。元数据维护了库信息、表信息、列信息等所有内容,例如表T包含哪些列,各列的类型等等。因此元数据库十分重要,需要定期备份以及支持查询的扩展性。

 

读时验证机制

与传统数据库对表数据进行写时严重不同,Hive对数据的验证方式为读时模式,即只有在读表数据的时候,hive才检查解析具体的字段、shema等,从而保证了大数据量的快速加载。

既然hive采用的读时验证机制,那么 如果表schema与表文件内容不匹配,会发生什么呢?

答案是hive会尽其所能的去读数据。如果schema中表有10个字段,而文件记录却只有3个字段,那么其中7个字段将为null;如果某些字段类型定位为数值类型,但是记录中却为非数值字符串,这些字段也将会被转换为null。简而言之,hive会努力catch读数据时遇到的错误,并努力返回。既然Hive表数据存储在HDFS中且Hive采用的是读时验证方式,定义完表的schema会自动生成表数据的HDFS目录,且我们可以以任何可能的方式来加载表数据或者利用HDFS API将数据写入文件,同理,当我们若需要将hive数据写入其他库(如oracle),也可以直接通过api读取数据再写入目标库。在实际生产环境中,当需要数据仓库之间的迁移时,就可以直接利用api将源库的数据直接写入hive库的表文件中,包括淘宝开源的datax数据交换系统都采用类似的方式来交换跨库数据。

再次注意,加载或者写入的数据内容要和表定义的schema一致,否则将会造成字段或者表为空。

 

1.2   Hive数据模型

从数据仓库的角度看,Hive是建立在Hadoop上的数据仓库基础架构,可以方便的ETL操作。Hive没有专门的数据存储格式,也没有为数据建立索引,用于可以非常自由的组织Hive中的表,只需要在创建表的时候定义好表的schema即可。Hive中包含4中数据模型:Tabel、ExternalTable、Partition、Bucket。

 

图:hive数据模型

a)         Table:类似与传统数据库中的Table,每一个Table在Hive中都有一个相应的目录来存储数据。例如:一个表t,它在HDFS中的路径为:/user/hive/warehouse/t

b)        Partition:类似于传统数据库中划分列的索引。在Hive中,表中的一个Partition对应于表下的一个目录,所有的Partition数据都存储在对应的目录中。例如:t表中包含ds和city两个Partition,则对应于ds=2014,city=beijing的HDFS子目录为:/user/hive/warehouse/t/ds=2014/city=Beijing

需要注意的是,分区列是表的伪列,表数据文件中并不存在这个分区列的数据。

c)         Buckets:对指定列计算的hash,根据hash值切分数据,目的是为了便于并行,每一个Buckets对应一个文件。将user列分数至32个Bucket上,首先对user列的值计算hash,比如,对应hash=0的HDFS目录为:/user/hive/warehouse/t/ds=2014/city=Beijing/part-00000;对应hash=20的目录为:/user/hive/warehouse/t/ds=2014/city=Beijing/part-00020

d)        External Table指向已存在HDFS中的数据,可创建Partition。Managed Table创建和数据加载过程,可以用统一语句实现,实际数据被转移到数据仓库目录中,之后对数据的访问将会直接在数据仓库的目录中完成。删除表时,表中的数据和元数据都会删除。ExternalTable只有一个过程,因为加载数据和创建表是同时完成。数据是存储在Location后面指定的HDFS路径中的,并不会移动到数据仓库中。

 

1.3   Hive翻译成MapReduce Job

Hive编译器将HQL代码转换成一组操作符(operator),操作符是Hive的最小操作单元,每个操作符代表了一种HDFS操作或者MapReduce作业。Hive中的操作符包括:

表:Hive执行常用的操作符列表

操作符

描述

TableScanOperator

扫描hive表数据

ReduceSinkOperator

创建将发送到Reducer端的<Key,Value>对

JoinOperator

Join两份数据

SelectOperator

选择输出列

FileSinkOperator

建立结果数据,输出至文件

FilterOperator

过滤输入数据

GroupByOperator

Group By 语句

MapJoinOperator

Mapjoin

LimitOperator

Limit语句

UnionOperator

Union语句

 

对于MapReduce操作单元,Hive通过ExecMapper和ExecReducer执行MapReduce任务。

对于Hive语句:

 


 
  1. INSERT OVERWRITETABLE read_log_tmp

  2. SELECT a.userid,a.bookid,b.author,b.categoryid

  3. FROM user_read_log aJOIN book_info b ON a.bookid= b.bookid;

其执行计划为:

 

图:reduce端join的任务执行流程

 

1.4   与一般SQL的区别

Hive 视图与一般数据库视图

Hive视图与一般数据库视图作用角色相同,都是基于数据规模缩减或者基于安全机制下的某些条件查询下的数据子集。Hive视图只支持逻辑视图,不支持物化视图,即每次对视图的查询hive都将执行查询任务,因此视图不会带来性能上的提升。作为Hive查询优化的一部分,对视图的查询条件语句和视图的定义查询条件语句将会尽可能的合并成一个条件查询。

 

Hive索引与一般数据库索引

相比于传统数据库,Hive只提供有限的索引功能,通过在某些字段上建立索引来加速某些操作。通常当逻辑分区太多太细,partition无法满足时,可以考虑建立索引。Hive1.2.1版本目前支持的索引类型有CompactIndexHandler和Bitmap。

CompactIndexHandler压缩索引通过将列中相同的值得字段进行压缩从而减小存储和加快访问时间。需要注意的是Hive创建压缩索引时会将索引数据也存储在Hive表中。对于表tb_index (id int, name string)而言,建立索引后的索引表中默认的三列一次为索引列(id)、hdfs文件地址(_bucketname)、偏移量(offset)。特别注意,offset列类型为array<bigint>。

Bitmap位图索引作为一种常见的索引,如果索引列只有固定的几个值,那么就可以采用位图索引来加速查询。利用位图索引可以方便的进行AND/OR/XOR等各类计算,Hive0.8版本开始引入位图索引,位图索引在大数据处理方面的应用广泛,比如可以利用bitmap来计算用户留存率(索引做与运算,效率远好于join的方式)。如果Bitmap索引很稀疏,那么就需要对索引压缩以节省存储空间和加快IO。Hive的Bitmap Handler采用的是EWAH(https://github.com/lemire/javaewah)压缩方式。

图:Hivecompact索引及bitmap索引 

从Hive索引功能来看,其主要功能就是避免第一轮mr任务的全表扫描,而改为扫描索引表。如果索引索引表本身很大,其开销仍然很大,在集群资源充足的情况下,可以忽略使用hive下的索引。

 

2     Schema设计

没有通用的schema,只有合适的schema。在设计Hive的schema的时候,需要考虑到存储、业务上的高频查询造成的开销等等,设计适合自己的数据模型。

 

2.1   设置分区表

对于Hive来说,利用分区来设计表总是必要的,分区提供了一种隔离数据和优化查询的便利的方式。特别是面对日益增长的数据规模。设置符合逻辑的分区可以避免进行全表扫描,只需加载特定某些hdfs目录的数据文件。

设置分区时,需要考虑被设置成分区的字段,按照时间分区一般而言就是一个好的方案,其好处在于其是按照不同时间粒度来确定合适大小的数据积累量,随着时间的推移,分区数量的增长是均匀的,分区的大小也是均匀的。达观数据每日处理大量的用户日志,对于user_log来说,设置分区字段为日期(天)是合理的。如果以userid字段来建立动态分区,而userid的基数是非常大的,显然分区数目是会超过hive的默认设置而执行失败。如果相对userid进行hash,我们可以以userid进行分桶(bucket),根据userid进行hash然后分发到桶中,相同hash值的userid会分发到同一个桶中。每个桶对应着一个单独的文件。

 

2.2   避免小文件

虽然分区有利于隔离数据和查询,设置过多过细的分区也会带来瓶颈,主要是因为HDFS非常容易存储大数据文件,由于分区对应着hdfs的目录结构,当存在过多的分区时,意味着文件的数目就越多,过多增长的小文件会给namecode带来巨大的性能压力。同时小文件过多会影响JOB的执行,hadoop会将一个job转换成多个task,即使对于每个小文件也需要一个task去单独处理,task作为一个独立的jvm实例,其开启和停止的开销可能会大大超过实际的任务处理时间。因此,hive表设计的分区不应该过多过细,每个目录下的文件足够大,应该是文件系统中块大小的若干倍。

 

查询避免生成小文件

既然hive或者说hadoop需要大文件,HQL执行语句也需要注意输入文件和输出文件的大小,防止生成过多小文件。hive可以通过配置参数在mr过程中合并小文件。

Map合并小文件:

 


 
  1. setmapred.max.split.size=256000000 #每个Map最大输入大小(单位:字节)

  2. set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat #执行Map前进行小文件合并

输出合并:

 


 
  1. set hive.merge.mapfiles= true #在Map-only的任务结束时合并小文件

  2. sethive.merge.mapredfiles= true #在Map-Reduce的任务结束时合并小文件

  3. set hive.merge.size.per.task= 256*1000*1000 #合并文件的大小

  4. set hive.merge.smallfiles.avgsize=16000000 #当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge

 

 


 
  1. </pre><p>一个独立的map-reduce任务进行文件merge</p><p align="left"><pre name="code" class="sql">create table user_log (user_id int,url string,source_ip string)

  2. partitionedby (dt string)

  3. clusteredby (user_id) into 96 buckets;

 

我们知道hive输出最终是mr的输出,即reducer(或mapper)的输出,有多少个reducer(mapper)输出就会生成多少个输出文件,根据shuffle/sort的原理,每个文件按照某个值进行shuffle后的结果。我们可通过设置hive.enforce.bucketing=true来强制将对应记录分发到正确桶中,或者通过添加cluster by语句以及设置setmapred.reduce.tasks=96来设置reducer的数目,从而保证输出与schema一致。根据hive的读时验证方式,正确的插入数据取决与我们自己,而不能依靠schema。

 

2.3   选择文件格式

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

 

3     查看执行计划及优化

达观的数据仓库基于Hive搭建,每日需要处理大量的计算流程,Hive的稳定性和性能至关重要。众多的任务需要我们合理的调节分配集群资源,合理的配置各参数,合理的优化查询。Hive优化包含各个方面,如job个数优化、job的map/reducer个数优化、并行执行优化等等,本节将主要从HQL查询优化角度来具体说明。

3.1   Join语句

对于上述的join语句

 


 
  1. INSERT OVERWRITETABLE read_log_tmp

  2. SELECT a.userid,a.bookid,b.author

  3. FROM user_read_log aJOIN book_info b ON a.bookid= b.bookid;

explain该查询语句后:

 

 

图:map端join的执行计划

由于表中数据为空,对于小数据量,hive会自动采取map join的方式来优化join,从mapreduce的编程模型来看,实现join的方式主要有map端join、reduce端join。Map端join利用hadoop 分布式缓存技术通过将小表变换成hashtable文件分发到各个task,map大表时可以直接判断hashtable来完成join,注意小表的hashtable是放在内存中的,在内存中作匹配,因此map join是一种非常快的join方式,也是一种常见的优化方式。如果小表够小,那么就可以以map join的方式来完成join完成。Hive通过设置hive.auto.convert.join=true(默认值)来自动完成map join的优化,而无需显示指示map join。缺省情况下map join的优化是打开的。 

Reduce端join需要reducer来完成join过程,对于上述join代码,reduce 端join的mr流程如下,

 

图:reduce端join的mapreduce过程

相比于map join, reduce 端join无法再map过程中过滤任何记录,只能将join的两张表的所有数据按照join key进行shuffle/sort,并按照join key的hash值将<key,value>对分发到特定的reducer。Reducer对于所有的键值对执行join操作,例如0号(bookid的hash值为0)reducer收到的键值对如下,其中T1、T2表示记录的来源表,起到标识作用:

 

图:reduce端join的reducer join

Reducer端join无法避免的reduce截断以及传输的大量数据都会给集群网络带来压力,从上图可以看出所有hash(bookid)% reducer_number等于0的key-value对都会通过shuffle被分发到0号reducer,如果分到0号reducer的记录数目远大于其他reducer的记录数目,显然0号的reducer的数据处理量将会远大于其他reducer,因此处理时间也会远大于其他reducer,甚至会带来内存等其他问题,这就是数据倾斜问题。对于join造成的数据倾斜问题我们可以通过设置参数setHive.optimize.skewjoin=true,让hive自己尝试解决join过程中产生的倾斜问题。

3.2   Group by语句

我们对user_read_log表按userid goup by语句来继续探讨数据倾斜问题,首先我们explain group by语句:

 

explain select userid,count(*)from user_read_log groupby userid

 

图:goupby的执行计划

 

Group by的执行计划按照userid的hash值分发记录,同时在map端也做了本地reduce,group by的shuffle过程是按照hash(userid)来分发的,实际应用中日志中很多用户都是未注册用户或者未登录,userid字段为空的记录数远大于userid不为空的记录数,当所有的空userid记录都分发到特定某一个reducer后,也会带来严重的数据倾斜问题。造成数据倾斜的主要原因在于分发到某个或某几个reducer的数据量远大于其他reducer的数据量。

对于groupby造成的数据倾斜问题,我们可以通过设置参数 

 


 
  1. set hive.map.aggr=true (开启map端combiner);

  2. set hive.groupby.skewindata=true;

这个参数的作用是做Reduce操作的时候,拿到的key并不是所有相同值给同一个Reduce,而是随机分发,然后Reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。虽然多了一轮MR任务,但是可以有效的减少数据倾斜问题可能带来的危险。

 

Hive解决数据倾斜

正确的设置Hive参数可以在某种程度上避免的数据倾斜问题,合适的查询语句也可以避免数据倾斜问题。要尽早的过滤数据和裁剪数据,减少后续处理的数据量,使得join key的数据分布较为均匀,将空字段随机赋予值,这样既可以均匀分发倾斜的数据:

 


 
  1. select userid,namefrom user_info a

  2. join (

  3. select case when userid isnull then cast(rand(47)*100000as int)

  4. else userid

  5. from user_read_log

  6. ) b on a.userid= b.userid

 

如果用户在定义schema的时候就已经预料到表数据可能会存在严重的数据倾斜问题,Hive自0.10.0引入了skew table的概念,如建表语句

 


 
  1. CREATE TABLE user_read_log (useridint,bookid,…)

  2. SKEWEDBY (userid) ON (null)[STORED AS DIRECTORIES];

需要注意的是,skewtable只是将倾斜特别严重的列的分开存储为不同的文件,每个制定的倾斜值制定为一个文件或者目录,因此在查询的时候可以通过过滤倾斜值来避免数据倾斜问题:

 


 
  1. select userid,namefrom user_info a

  2. join (

  3. select userid from user_read_log where pt=’2015’and userid isnot null

  4. ) b on a.userid= b.userid

可以看出,如果不加过滤条件,倾斜问题还是会存在,通过对skewtable加过滤条件的好处是避免了mapper的表扫描过滤操作。

3.3   Join的物理优化

Hive内部实现了MapJoinResolver(处理MapJoin)、SkewJoinResolver(处理倾斜join)、CommonJoinResolver

(处理普通Join)等类来实现join的查询物理优化(/org/apache/hadoop/hive/ql/optimizer/physical)。

CommonJoinResolver类负责将普通Join转换成MapJoin,Hive通过这个类来实现mapjoin的自动优化。对于表A和表B的join查询,会产生3个分支:

1)        以表A作为大表进行Mapjoin;

2)        以表A作为大表进行Mapjoin;

3)        Map-reduce join

由于不知道输入数据规模,因此编译时并不会决定走那个分支,而是在运行时判断走那个分支。需要注意的是要像完成上述自动转换,需要将hive.auto.convert.join.noconditionaltask设置为true(默认值),同时可以手工控制转载进内存的小表的大小(hive.auto.convert.join.noconditionaltask.size)。

MapJoinResolver 类负责迭代各个mr任务,检查每个任务是否存在map join操作,如果有,会将local map work转换成local map join work。

SkewJoinResolver类负责迭代有join操作的reducer任务,一旦单个reducer产生了倾斜,那么就会将倾斜值得数据写入hdfs,然后用一个新的map join的任务来处理倾斜值的计算。虽然多了一轮mr任务,但是由于采用的map join,效率也是很高的。良好的mr模式和执行流程总是至关重要的。

 

4     窗口分析函数

Hive提供了丰富了数学统计函数,同时也提供了用户自定义函数的接口,用户可以自定义UDF、UDAF、UDTF Hive 0.11版本开始提供窗口和分析函数(Windowingand Analytics Functions),包括LEAD、LAG、FIRST_VALUE、LAST_VALUE、RANK、ROW_NUMBER、PERCENT_RANK、CUBE、ROLLUP等。

窗口函数是深受数据分析人员的喜爱,利用窗口函数可以方便的实现复杂的数据统计分析需求,oracle、db2、postgresql等数据库中也提供了window function的功能。窗口函数与聚合函数一样,都是对表子集的操作,从结果上看,区别在于窗口函数的结果不会聚合,原有的每行记录依然会存在。

窗口函数的典型分析应用包括:

1)        按分区聚合(排序,topn问题)

2)        行间计算(时间序列分析)

3)        关联计算(购物篮分析)

我们以一个简单的行间计算的例子说明窗口函数的应用(关于其他函数的具体说明,请参考hive文档)。用户阅读行为的统计分析需要从点击书籍行为中归纳统计出来,用户在时间点T1点击了章节A,在时间点T2点击了章节B,在时间点T3点击了章节C 。用户浏览日志结构如下表所示。

USER_ID

BOOK_ID

CHAPTER_ID

LOG_TIME

1001

2001

40001

1443016010

1001

2001

40004

1443016012

1001

2001

40005

1443016310

通过对连续的用户点击日志分析,通过Hive提供的窗口分析函数可以计算出用户各章节的阅读时间。按照USER_ID、BOOKID构建窗口,并按照LOG_TIME排序,对窗口的每一条记录取相对下一条记录的LOG_TIME减去当前记录的LOG_TIME即为当前记录章节的阅读时间。

 


 
  1. SELECT

  2. Userid, bookid, chapterid, end_time – start_timeas read_time

  3. FROM

  4. (

  5. SELECT userid, bookid, chapterid, log_timeas start_time,

  6. lead(log_time,1,null) over(partitionby userid, bookidorder by log_time)as end_time

  7. FROM user_read_logwhere pt=’2015-12-01’

  8. ) a;

 

通过上述查询既可以找出2015-12-01日所有用户对每一章节的阅读时间。感谢窗口函数,否则hive将束手无策。只能通过开发mr代码或者实现udaf来实现上述功能。

窗口分析函数关键在于定义的窗口数据集及其对窗口的操作,通过over(窗口定义语句)来定义窗口。日常分析和实际应用中,经常会有窗口分析应用的场景,例如基于分区的排序、集合、统计等复杂操作。例如我们需要统计每个用户阅读时间最多的3本书:

 

  •  

 

图:行间计算示意图及代码

对上述语句explain后的结果:

 

图:行间计算的执行计划

窗口函数使得Hive的具备了完整的数据分析功能,在实际的应用环境中,达观数据分析团队大量使用hive窗口分析函数来实现较为复杂的逻辑,提高开发和迭代效率。

 

5     总结

本文在介绍Hive的原理和架构的基础上,分享了达观团队在Hive上的部分使用经验。Hive仍然处在不断的发展之中,将HQL理解成Mapreduce程序、理解Hadoop的核心能力是更好的使用和优化Hive的根本。

技术的发展日新月异,随着spark的日益完善和流行,hive社区正考虑将spark作为hive的执行引擎之一。Spark是一种基于rdd(弹性数据集)的内存分布式并行处理框架,内部集成了Spark SQL模块来实现对结构化数据的SQL功能。相比于Hadoop将大量的中间结果写入HDFS,Spark避免了中间结果的持久化,速度更快且更有利于迭代计算。 具体需要结合自身的业务需求,采取合理的框架架构,提升系统的处理能力。

 

 

参考:

1.  Hive wiki:https://cwiki.apache.org/confluence/display/Hive/Home

2.  Hive Design Docs:https://cwiki.apache.org/confluence/display/Hive/DesignDocs

3.  Hadoop: The Definitive Guide (3rd Edition)

4. Programming Hive

5. Analytical Queries with Hive:http://www.slideshare.net/Hadoop_Summit/analytical-queries-with-hive

 

2020-01-02 17:14:53 xinxindsj 阅读数 13
  • Hive大数据分析大数据仓库中应用实战

    随着大数据技术的不断发展壮大, Hive不再是大数据技术生态圈中一个普通的工具,而是在大数据分析和大数据仓库中占据着几乎不可替代的重要作用,大数据分析中Hive和Hbase、Hive和Spark SQL、Hive和Impala的结合使用愈加紧密,大数据仓库中Hive在数据仓库建模模块的作用暂时无可替代。所以,深入学好Hive是入门大数据分析、大数据仓库最好的选择。

    211 人正在学习 去看看 李飞

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>  <name>hive.aux.jars.path</name>  <value>/path/elasticsearch-hadoop.jar</value>  <description>A comma separated list (with no spaces) of the jar files</description></property>

CDH6.X 推荐的安装方法

将 elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

大数据干货丨Hive 与 ElasticSearch 的数据交互

 

大数据干货丨Hive 与 ElasticSearch 的数据交互

 

 

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(    water_type STRING,    water_level STRING,    monitor_time TIMESTAMP,    sitecode STRING,    p492 DOUBLE,    p311 DOUBLE,    status STRING)STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'TBLPROPERTIES(    'es.resource'='ods_data_day_surface*/doc',    'es.query'='?q=status:001'    'es.nodes'='sky-01','es.port'='9200',    'es.net.http.auth.user'='sky',    'es.net.http.auth.pass'='jointsky',    'es.date.format'='yyyy-MM-dd HH:mm:ss',    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'    'es.mapping.names'='waterType:water_type,monitortime:monitor_time');

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri query、query dsl、external resource 三种设置方式。

# uri (or parameter) queryes.query = ?q=costinl# query dsles.query = { "query" : { "term" : { "user" : "costinl" } } }# external resourcees.query = org/mypackage/myquery.json复制代码

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Overrideprotected Object parseDate(Long value, boolean richDate) {    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));}@Overrideprotected Object parseDate(String value, boolean richDate) {    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));}

为了兼容自定义的日期格式,需要编写自定义的日期读取类:

import org.apache.hadoop.hive.serde2.io.TimestampWritable;import org.elasticsearch.hadoop.cfg.Settings;import org.elasticsearch.hadoop.hive.HiveValueReader;import java.sql.Timestamp;import java.text.ParseException;import java.text.ParsePosition;import java.text.SimpleDateFormat;import java.util.Date;public class EsValueReader extends HiveValueReader {    private String dateFormat;    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";    @Override    public void setSettings(Settings settings) {        super.setSettings(settings);        dateFormat = settings.getProperty("es.date.format");    }    @Override    protected Object parseDate(String value, boolean richDate) {        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {            if (richDate){                if (value.length() == 16){                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));                }                if (value.length() == 13){                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));                }                if (value.length() == 10){                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));                }                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));            }            return parseString(value);        }        return super.parseDate(value, richDate);    }    /**     * 解析日期,根據指定的格式進行解析.<br>     * 如果解析錯誤,則返回null     * @param stringDate 日期字串     * @param format 日期格式     * @return 日期型別     */    private static Date parseDate(String stringDate, String format) {        if (stringDate == null) {            return null;        }        try {            return parseDate(stringDate, new String[] { format });        } catch (ParseException e) {            return null;        }    }    public static Date parseDate(String str, String... parsePatterns) throws ParseException {        return parseDateWithLeniency(str, parsePatterns, true);    }    private static Date parseDateWithLeniency(            String str, String[] parsePatterns, boolean lenient) throws ParseException {        if (str == null || parsePatterns == null) {            throw new IllegalArgumentException("Date and Patterns must not be null");        }        SimpleDateFormat parser = new SimpleDateFormat();        parser.setLenient(lenient);        ParsePosition pos = new ParsePosition(0);        for (String parsePattern : parsePatterns) {            String pattern = parsePattern;            if (parsePattern.endsWith("ZZ")) {                pattern = pattern.substring(0, pattern.length() - 1);            }            parser.applyPattern(pattern);            pos.setIndex(0);            String str2 = str;            if (parsePattern.endsWith("ZZ")) {                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");            }            Date date = parser.parse(str2, pos);            if (date != null && pos.getIndex() == str2.length()) {                return date;            }        }        throw new ParseException("Unable to parse the date: " + str, -1);    }}

上述代码的 Maven 依赖

<dependencies>    <dependency>        <groupId>org.apache.hive</groupId>        <artifactId>hive-exec</artifactId>        <version>2.1.1</version>        <scope>provided</scope>    </dependency>    <dependency>        <groupId>org.elasticsearch</groupId>        <artifactId>elasticsearch-hadoop</artifactId>        <version>6.5.4</version>        <scope>provided</scope>    </dependency></dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。

2019-05-17 17:14:37 shalyniu 阅读数 245
  • Hive大数据分析大数据仓库中应用实战

    随着大数据技术的不断发展壮大, Hive不再是大数据技术生态圈中一个普通的工具,而是在大数据分析和大数据仓库中占据着几乎不可替代的重要作用,大数据分析中Hive和Hbase、Hive和Spark SQL、Hive和Impala的结合使用愈加紧密,大数据仓库中Hive在数据仓库建模模块的作用暂时无可替代。所以,深入学好Hive是入门大数据分析、大数据仓库最好的选择。

    211 人正在学习 去看看 李飞

需求:统计各省业务量,使用hive语句分析业务,用sqoop工具进行导入导出

步骤:

  1. 利用sqoop将mysql中业务表导入hive库(两种方式)
    1. 可以在hive中执行create建表语句先创建一个与mysql表结构相同的表,再执行数据导入操作
    2. 直接进行数据导入,hive自动创建表。
    3. 我采用第二种方式,导入语句为sqoop  --connect jdbc:mysql://ip:3306/databaseName --username *** --password *** --table mysqlTabelName --hive-import --hive-overwrite --hive-database hiveDatabase   --fields-terminated-by "\t" --create-hive-table --lines-terminated-by "\n" --hive-drop-import-delims   --delete-target-dir(其余参数根据自身需要添加)
  2. 在hive中执行hql语句进行统计(两种方式)
    1. 可以在hive中执行create创建结果表结构,例如create table temp_result( total int,province string)

      row format delimited fields terminated by '\t',再执行insert into table 结果表 as 统计语句,例如insert into temp_result as select count(*) as total,province from bussiness group by province.

    2. 直接创建统计表并统计,create table temp_result as select count(*) as total,province from bussiness group by province.。。。。。这里是有个疑问的,不知道这个直接进行保存的统计表有没有表头????

  3. 将结果表通过sqoop导出到mysql(两步)

    1. 首先在mysql中创建表temp_result,结构和hive中的表结构相同,表名字可以不同

    2. 其次使用sqoop语句导出

      sqoop export --connect jdbc:mysql://ip:3306/databaseName  --username *** --password ***  -m 1 --table temp_result  --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/hiveDatabaseName/temp_result';

导出时有一个问题:关于mysql主键的问题,为了保持mysql的表和hive的表字段一致,就没创建主键,因为如果在mysql中创建自增id的时候导出就会报错,因为hive中没有id这一列,当然如果能有id最好了,需要再学习。。。

2018-04-10 22:10:40 sir9ll 阅读数 1833
  • Hive大数据分析大数据仓库中应用实战

    随着大数据技术的不断发展壮大, Hive不再是大数据技术生态圈中一个普通的工具,而是在大数据分析和大数据仓库中占据着几乎不可替代的重要作用,大数据分析中Hive和Hbase、Hive和Spark SQL、Hive和Impala的结合使用愈加紧密,大数据仓库中Hive在数据仓库建模模块的作用暂时无可替代。所以,深入学好Hive是入门大数据分析、大数据仓库最好的选择。

    211 人正在学习 去看看 李飞

数据样例:

[{“beCommentWeiboId”:”“,”beForwardWeiboId”:”“,”catchTime”:”1387157643”,”commentCount”:”682”,”content”:”喂!2014。。。2014!喂。。。”,”createTime”:”1387086483”,”info1”:”“,”info2”:”“,”info3”:”“,”mlevel”:”“,”musicurl”:[],”pic_list”:[“http://ww1.sinaimg.cn/square/47119b17jw1ebkc9b07x9j218g0xcair.jpg“,”http://ww4.sinaimg.cn/square/47119b17jw1ebkc9ebakij218g0xc113.jpg“,”http://ww2.sinaimg.cn/square/47119b17jw1ebkc9hml7dj218g0xcgt6.jpg“,”http://ww3.sinaimg.cn/square/47119b17jw1ebkc9kyakyj218g0xcqb3.jpg“],”praiseCount”:”1122”,”reportCount”:”671”,”source”:”iPhone客户端”,”userId”:”1192336151”,”videourl”:[],”weiboId”:”3655768039404271”,”weiboUrl”:”http://weibo.com/1192336151/AnoMrDstN“}]

字段描述

总共19个字段
beCommentWeiboId 是否评论
beForwardWeiboId 是否是转发微博
catchTime 抓取时间
commentCount 评论次数
content 内容
createTime 创建时间
info1 信息字段1
info2信息字段2
info3信息字段3
mlevel no sure
musicurl 音乐链接
pic_list 照片列表(可以有多个)
praiseCount 点赞人数
reportCount 转发人数
source 数据来源
userId 用户id
videourl 视频链接
weiboId 微博id
weiboUrl 微博网址

题目

  1. 组织数据
    (创建Hive表weibo_json(json string),表只有一个字段,导入所有数据,并验证查询前5条数据)
    (解析完weibo_json当中的json格式数据到拥有19个字段的weibo表中,写出必要的SQL语句)

  2. 统计微博总量 和 独立用户数

  3. 统计用户所有微博被转发的次数之和,输出top5用户,并给出次数

4.统计带图片的微博数

  1. 统计使用iphone发微博的独立用户数

  2. 将微博的点赞人数和转发人数相加求和,并将相加之和降序排列,取前10条记录,输出userid和总次数

  3. 统计微博中评论次数小于1000的用户ID与数据来源信息,将其放入视图,然后统计视图中数据来源是”ipad客户端”的用户数目

  4. 统计微博内容中出现”iphone”次数最多的用户,最终结果输出用户id和次数(注意:该次数是”iphone”的出现次数,不是出现”iphone”的微博数目)

  5. 求每天发微博次数最多的那个家伙的ID和发微博的条数

  6. 求出所有被多次引用(同一张照片出现在多条微博中,超过1条就算多条)的照片的数目

解题

组织数据

// 创建库:
create database weibo;
use weibo;

// 创建表:
create table weibo_json(json string);

// 导入数据:
load data local inpath ‘/home/hadoop/weibojson.data.json’ into table weibo_json;

// 验证:
select json from weibo_json limit 5;

// 创建19个字段的weibo表:

create table weibo(
 beCommentWeiboId string,
 beForwardWeiboId string,
 catchTime string,
 commentCount int,
 content string,
 createTime string,
 info1 string, 
 info2 string, 
 info3 string,
 mlevel string, 
 musicurl string, 
 pic_list string, 
 praiseCount int,
 reportCount int, 
 source string, 
 userId string, 
 videourl string,
 weiboId string, 
 weiboUrl string 
) row format delimited fields terminated by '\t';

插入数据

insert into table weibo 
select 
get_json_object(json,'$[0].beCommentWeiboId') beCommentWeiboId,
get_json_object(json,'$[0].beForwardWeiboId') beForwardWeiboId,
get_json_object(json,'$[0].catchTime') catchTime,
get_json_object(json,'$[0].commentCount') commentCount,
get_json_object(json,'$[0].content') content, 
get_json_object(json,'$[0].createTime') createTime,
get_json_object(json,'$[0].info1') info1, 
get_json_object(json,'$[0].info2') info2,
get_json_object(json,'$[0].info3') info3,
get_json_object(json,'$[0].mlevel') mlevel,
get_json_object(json,'$[0].musicurl') musicurl,
get_json_object(json,'$[0].pic_list') pic_list,
get_json_object(json,'$[0].praiseCount') praiseCount,
get_json_object(json,'$[0].reportCount') reportCount,
get_json_object(json,'$[0].source') source,
get_json_object(json,'$[0].userId') userId,
get_json_object(json,'$[0].videourl') videourl,
get_json_object(json,'$[0].weiboId') weiboId,
get_json_object(json,'$[0].weiboUrl') weiboUrl
from weibo_json;

统计用户所有微博被转发的次数之和,输出top5用户,并给出次数。注意:一个用户可能发过多个微博

思路:
1. 以用户id分组,求转发和
2. 按照转发量排序
select sum(reportCount) sumrep
from weibo
group by userId
order by sumrep desc limit 5;

结果
2721667
518676
477742
430532
415424

5、统计带图片的微博数(7分)
图片字段pic_list
select count(weiboId) total
from weibo
where instr(pic_list,’http’)>0
结果:

5278

统计使用iphone发微博的独立用户数

数据来源字段:source
select count(distinct userId)
from weibo
where instr(lcase(source),’iphone’)>0;

或者使用
select count(distinct userId)
from weibo
where lcase(source) like ‘%iphone%’;

将微博的点赞人数和转发人数相加求和,并将相加之和降序排列,取前10条记录,输出userid和总次数

思路:
以userid分组,统计

select count(praiseCount)+count(reportCount) total
from weibo
group by userId
order by total desc limit 10;

结果:
14328
620
516
472
428
340
308
226
210
188

统计微博中评论次数小于1000的用户ID与数据来源信息,将其放入视图,然后统计视图中数据来源是”ipad客户端”的用户数目

思路:
1. commentCount<1000
2. select userId,source

create view weibo8_view as
select userId,source
from weibo where commentCount<1000;

select count(userId)
from weibo8_view where source like ‘%皮皮%’;

统计微博内容中出现”iphone”次数最多的用户,最终结果输出用户id和次数(注意:该次数是”iphone”的出现次数,不是出现”iphone”的微博数目)

思路

  1. 以iPhone为分隔符使用split切分来源之后转换为数组,统计数组size

  2. 以userid分组最后统计用户所有出现的次数

create view weibo9_view as
select userId,size(split(lcase(content),’iphone’))-1 total
from weibo where size(split(lcase(content),’iphone’))-1>0;

select userId, sum(total) total
from weibo9_view
group by userId order by total desc limit 1;

1640601392 3
也可以用一条实现
select userId,sum(size(split(lcase(content),’iphone’))-1) total
from weibo
group by userId
order by total desc limit 1;

求每天发微博次数最多的那个家伙的ID和发微博的条数

求解步骤:

  1. 以每天和userId分组统计每天之中用户发送微博数
    create table weibo10 as
    select from_unixtime(cast(createTime as int), ‘yyyy-MM-dd’) dt,userId, count(weiboId) total
    from weibo
    group by from_unixtime(cast(createTime as int), ‘yyyy-MM-dd’),userId;

  2. 使用窗口函数生成以天数为分区,以发送微博数排序的列
    create table weibo10_2 as
    select dt,userId,total,
    row_number() over (distribute by dt sort by tota) as index
    from weibo10;

  3. 查询出每天发送微博数排名第一的字段
    select * from weibo10_2 where index<2;

求出所有被多次引用(同一张照片出现在多条微博中,超过1条就算多条)的照片的数目

思路: 以照片的url分组,统计这个分组下的weiboid数。

难点:
1. pic_list字段属于字符串类型,但是被[]包括,要先去除这个括号,再把字符串按照逗号切分成一个数组。
2. 要把照片列表中多个链接分裂之后,才能分组。

create table weibo11 as
select explode(split(substring(pic_list,2,length(pic_list)-2),’,’)) url
from weibo where pic_list!=’[]’;

select count(*) total
from weibo11
group by url having total >= 2 order by total;

2019-12-21 20:13:13 smartsteps 阅读数 46
  • Hive大数据分析大数据仓库中应用实战

    随着大数据技术的不断发展壮大, Hive不再是大数据技术生态圈中一个普通的工具,而是在大数据分析和大数据仓库中占据着几乎不可替代的重要作用,大数据分析中Hive和Hbase、Hive和Spark SQL、Hive和Impala的结合使用愈加紧密,大数据仓库中Hive在数据仓库建模模块的作用暂时无可替代。所以,深入学好Hive是入门大数据分析、大数据仓库最好的选择。

    211 人正在学习 去看看 李飞

1.集群的规划

以下安装的前提是已经部署好hadoop集群
一个简单的表格是这么创建的:

节点 功能 节点名称 节点IP 安装软件
marster 和 slave zhen(震) 192.168.2.5 namenogde
datanode
slave xun(巽) 192.168.2.6 datanode
slave li(离) 192.168.2.7 datanode
mysql
hive

2.安装步骤

1.下载hive安装包

官方下载:https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-2.3.6/apache-hive-2.3.6-bin.tar.gz

cd /home/hive/ # 上传到这个目录
tar zxvf apache-hive-2.3.6-bin.tar.gz -C /usr/local/ # 进行解压到 /usr/local 目录下
mv apache-hive-2.3.6-bin/ hive # 重命名 hive

2.利用docker安装mysql

详细请参考https://blog.csdn.net/smartsteps/article/details/103646926

3.修改hive的配置文件

  • 修改hive配置文件hive-env.sh

cd /usr/local/hive/conf
cp hive-env.sh.template hive-env.sh
修改内容
HADOOP_HOME=/home/hadoop/hadoop-2.7.5
export HIVE_CONF_DIR=/usr/local/hive/conf

在这里插入图片描述

4.增加hive-site.xml配置文件

vi hive-site.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
	 <!-- 用户名 -->
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value> 
  </property>
  <property>
  <!-- 密码 -->
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>root</value>
  </property>
  <property>
   <!-- 链接数据库,没有则生成 -->
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://192.168.2.7:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
  </property>
  <property>
   <!-- 驱动 -->
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
   <!-- 数据是否校验 -->
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
  </property>
  <property>
   <!-- 是否自动创建核心配置文件 -->
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
 </property>
 <property>
  <!-- 主机 -->
		<name>hive.server2.thrift.bind.host</name>
		<value>li</value>
   </property>
</configuration>

5.上传mysql 驱动

添加mysql的连接驱动包到hive的lib目录下
百度网盘下载
上传以后lib目录下多了一个mysql-connector-java-5.1.38.jar文件,hive和mysql链接的桥梁和驱动
在这里插入图片描述

6.添加hive的环境变量

vi /etc/profile
#hive
export HIVE_HOME=/usr/local/hive
export PATH=:HIVEHOME/bin:HIVE_HOME/bin:PATH
source /etc/profile # 让 profile 文件生效
在这里插入图片描述

3.启动HIVE

cd /usr/local/hive/bin
输入运行 hive
./hive
在这里插入图片描述

可以看到hive已经成功启动了,后续我们继续讲解hive中的数据库操作和表操作。

Hive基础

阅读数 30

没有更多推荐了,返回首页