hbase_hbase shell - CSDN
hbase 订阅
HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。 展开全文
HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。
信息
系    统
Google
属    性
开源数据库
结    构
分布式存储系统
来    源
Fay Chang 所撰写的“Bigtable
中文名
HBase
隶    属
Apache的Hadoop项目
HBase结构介绍
HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。与FUJITSU Cliq等商用大数据产品不同,HBase是Google Bigtable的开源实现,类似Google Bigtable利用GFS作为其文件存储系统,HBase利用Hadoop HDFS作为其文件存储系统;Google运行MapReduce来处理Bigtable中的海量数据,HBase同样利用Hadoop MapReduce来处理HBase中的海量数据;Google Bigtable利用 Chubby作为协同服务,HBase利用Zookeeper作为对应。 [1]  上图描述Hadoop EcoSystem中的各层系统。其中,HBase位于结构化存储层,Hadoop HDFS为HBase提供了高可靠性的底层存储支持,Hadoop MapReduce为HBase提供了高性能的计算能力,Zookeeper为HBase提供了稳定服务和failover机制。 此外,Pig和Hive还为HBase提供了高层语言支持,使得在HBase上进行数据统计处理变的非常简单。 Sqoop则为HBase提供了方便的RDBMS数据导入功能,使得传统数据库数据向HBase中迁移变的非常方便。
收起全文
精华内容
参与话题
  • 在本课程中,主要讲述了HBase详细的架构原理及特点、HBase内部各个角色的详细介绍、安装配置、HBase的Shell操作、新旧版本的读写数据详细流程、HBase的API操作、使用MapReduce以及Hive对HBase数据分析、Rowkey设计、...
  • Hbase原理、基本概念、基本架构

    万次阅读 多人点赞 2020-01-07 13:49:13
    HBase是一个构建在HDFS上的分布式列存储系统; HBase是基于Google BigTable模型开发的,典型的key/value系统; HBase是Apache Hadoop生态系统中的重要一员,主要用于海量结构化数据存储; 从逻辑上讲,HBase将数据...

     

    • 概述
     



    HBase是一个构建在HDFS上的分布式列存储系统;
    HBase是基于Google BigTable模型开发的,典型的key/value系统;
    HBase是Apache Hadoop生态系统中的重要一员,主要用于海量结构化数据存储;
    从逻辑上讲,HBase将数据按照表、行和列进行存储。
    与hadoop一样,Hbase目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。
    Hbase表的特点
    大:一个表可以有数十亿行,上百万列
    无模式:每行都有一个可排序的主键和任意多的列,列可以根据需要动态的增加,同一张表中不同的行可以有截然不同的列
    面向列:面向列(族)的存储和权限控制,列(族)独立检索
    稀疏:空(null)列并不占用存储空间,表可以设计的非常稀疏;
    数据多版本:每个单元中的数据可以有多个版本,默认情况下版本号自动分配,是单元格插入时的时间戳;
    数据类型单一:Hbase中的数据都是字符串,没有类型。

    • Hbase数据模型

    Hbase逻辑视图

     

    注意上图中的英文说明

    Hbase基本概念

    RowKey:是Byte array,是表中每条记录的“主键”,方便快速查找,Rowkey的设计非常重要。
    Column Family:列族,拥有一个名称(string),包含一个或者多个相关列
    Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加
    Version Number:类型为Long,默认值是系统时间戳,可由用户自定义
    Value(Cell):Byte array
    • Hbase物理模型

    每个column family存储在HDFS上的一个单独文件中,空值不会被保存。
    Key 和 Version number在每个 column family中均有一份;
    HBase 为每个值维护了多级索引,即:<key, column family, column name, timestamp>

    物理存储:
    1、Table中所有行都按照row key的字典序排列;
    2、Table在行的方向上分割为多个Region;
    3、Region按大小分割的,每个表开始只有一个region,随着数据增多,region不断增大,当增大到一个阈值的时候,region就会等分会两个新的region,之后会有越来越多的region;
    4、Region是Hbase中分布式存储和负载均衡的最小单元,不同Region分布到不同RegionServer上。

    5、Region虽然是分布式存储的最小单元,但并不是存储的最小单元。Region由一个或者多个Store组成,每个store保存一个columns family;每个Strore又由一个memStore和0至多个StoreFile组成,StoreFile包含HFile;memStore存储在内存中,StoreFile存储在HDFS上。

     

    • HBase架构及基本组件

     

    Hbase基本组件说明:

     

    Client

    包含访问HBase的接口,并维护cache来加快对HBase的访问,比如region的位置信息

    Master

    为Region server分配region

    负责Region server的负载均衡

    发现失效的Region server并重新分配其上的region

    管理用户对table的增删改查操作

    Region Server

    Regionserver维护region,处理对这些region的IO请求

    Regionserver负责切分在运行过程中变得过大的region

    Zookeeper作用

    通过选举,保证任何时候,集群中只有一个master,Master与RegionServers 启动时会向ZooKeeper注册

    存贮所有Region的寻址入口

    实时监控Region server的上线和下线信息。并实时通知给Master

    存储HBase的schema和table元数据

    默认情况下,HBase 管理ZooKeeper 实例,比如, 启动或者停止ZooKeeper

    Zookeeper的引入使得Master不再是单点故障

     

    Write-Ahead-Log(WAL)

    该机制用于数据的容错和恢复:

    每个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操作写入MemStore的同时,也会写一份数据到HLog文件中(HLog文件格式见后续),HLog文件定期会滚动出新的,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知到,HMaster首先会处理遗留的 HLog文件,将其中不同Region的Log数据进行拆分,分别放到相应region的目录下,然后再将失效的region重新分配,领取 到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复

    HBase容错性
    Master容错:Zookeeper重新选择一个新的Master
    无Master过程中,数据读取仍照常进行;
    无master过程中,region切分、负载均衡等无法进行;
    RegionServer容错:定时向Zookeeper汇报心跳,如果一旦时间内未出现心跳,Master将该RegionServer上的Region重新分配到其他RegionServer上,失效服务器上“预写”日志由主服务器进行分割并派送给新的RegionServer
    Zookeeper容错:Zookeeper是一个可靠地服务,一般配置3或5个Zookeeper实例
    Region定位流程:

    寻找RegionServer

    ZooKeeper--> -ROOT-(单Region)--> .META.--> 用户表

    -ROOT-
    表包含.META.表所在的region列表,该表只会有一个Region;

    Zookeeper中记录了-ROOT-表的location。

    .META.

    表包含所有的用户空间region列表,以及RegionServer的服务器地址。

    • Hbase使用场景

    storing large amounts  of data(100s of TBs)
    need high write throughput
    need efficient random access(key lookups) within large data sets
    need to scale gracefully with data
    for structured and semi-structured data
    don't need fullRDMS capabilities(cross row/cross table transaction, joins,etc.)

    大数据量存储,大数据量高并发操作

    需要对数据随机读写操作

    读写访问均是非常简单的操作

    • Hbase与HDFS对比

    两者都具有良好的容错性和扩展性,都可以扩展到成百上千个节点;
    HDFS适合批处理场景
    不支持数据随机查找
    不适合增量数据处理

    不支持数据更新

     

    • 参考文档:

    1、http://www.alidata.org/archives/1509(存储模型比较详细)

    2、http://www.searchtb.com/2011/01/understanding-hbase.html(技术框架以及存储模型)

    3、http://wenku.baidu.com/view/b46eadd228ea81c758f578f4.html(读和写的流程比较详细)

    展开全文
  • HBase详解(很全面)

    万次阅读 多人点赞 2020-03-19 17:53:56
    【转自:http://jiajun.iteye.com/blog/899632】一、简介historystarted by chad walters and jim2006.11 G release paper on BigTable2007.2 inital HBase prototype created as Hadoop contrib2007.10 First ...
    
    
    

    一、 简介

    history

    started by chad walters and jim

    2006.11 G release paper on BigTable

    2007.2 inital HBase prototype created as Hadoop contrib

    2007.10 First useable Hbase

    2008.1 Hadoop become Apache top-level project and Hbase becomes subproject

    2008.10 Hbase 0.18,0.19 released

     

    hbase是bigtable的开源山寨版本。是建立的hdfs之上,提供高可靠性、高性能、列存储、可伸缩、实时读写的数据库系统。

    它介于nosql和RDBMS之间,仅能通过主键(row key)和主键的range来检索数据,仅支持单行事务(可通过hive支持来实现多表join等复杂操作)。主要用来存储非结构化和半结构化的松散数据。

    与hadoop一样,Hbase目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。

     

    HBase中的表一般有这样的特点:

    1 大:一个表可以有上亿行,上百万列

    2 面向列:面向列(族)的存储和权限控制,列(族)独立检索。

    3 稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏。

     

    下面一幅图是Hbase在Hadoop Ecosystem中的位置。

    二、 逻辑视图

     

    HBase以表的形式存储数据。表有行和列组成。列划分为若干个列族(row family)


    Row Key 

    与nosql数据库们一样,row key是用来检索记录的主键。访问hbase table中的行,只有三种方式:

    1 通过单个row key访问

    2 通过row key的range

    3 全表扫描

    Row key行键 (Row key)可以是任意字符串(最大长度是 64KB,实际应用中长度一般为 10-100bytes),在hbase内部,row key保存为字节数组。

    存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)

    注意:

    字典序对int排序的结果是1,10,100,11,12,13,14,15,16,17,18,19,2,20,21,…,9,91,92,93,94,95,96,97,98,99。要保持整形的自然序,行键必须用0作左填充。

    行的一次读写是原子操作 (不论一次读写多少列)。这个设计决策能够使用户很容易的理解程序在对同一个行进行并发更新操作时的行为。

     

    列族

    hbase表中的每个列,都归属与某个列族。列族是表的chema的一部分(而列不是),必须在使用表之前定义。列名都以列族作为前缀。例如courses:history , courses:math 都属于 courses 这个列族。

    访问控制、磁盘和内存的使用统计都是在列族层面进行的。实际应用中,列族上的控制权限能 帮助我们管理不同类型的应用:我们允许一些应用可以添加新的基本数据、一些应用可以读取基本数据并创建继承的列族、一些应用则只允许浏览数据(甚至可能因 为隐私的原因不能浏览所有数据)。

     

    时间戳

    HBase中通过row和columns确定的为一个存贮单元称为cell。每个 cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由hbase(在数据写入时自动 )赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。每个 cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。

    为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式。一是保存数据的最后n个版本,二是保存最近一段时间内的版本(比如最近七天)。用户可以针对每个列族进行设置。

     

    Cell

    {row key, column( =<family> + <label>), version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。

     

    三、 物理存储

    1 已经提到过,Table中的所有行都按照row key的字典序排列。

    2 Table 在行的方向上分割为多个Hregion。

    3 region按大小分割的,每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,Hregion就会等分会两个新的Hregion。当table中的行不断增多,就会有越来越多的Hregion。

    4 Hregion是Hbase中分布式存储和负载均衡的最小单元。最小单元就表示不同的Hregion可以分布在不同的HRegion server上。但一个Hregion是不会拆分到多个server上的。

    5 HRegion虽然是分布式存储的最小单元,但并不是存储的最小单元。

    事实上,HRegion由一个或者多个Store组成,每个store保存一个columns family。

    每个Strore又由一个memStore和0至多个StoreFile组成。如图:

    StoreFile以HFile格式保存在HDFS上。

    HFile的格式为:

    Trailer部分的格式:

    HFile分为六个部分:

    Data Block 段–保存表中的数据,这部分可以被压缩

    Meta Block 段 (可选的)–保存用户自定义的kv对,可以被压缩。

    File Info 段–Hfile的元信息,不被压缩,用户也可以在这一部分添加自己的元信息。

    Data Block Index 段–Data Block的索引。每条索引的key是被索引的block的第一条记录的key。

    Meta Block Index段 (可选的)–Meta Block的索引。

    Trailer–这一段是定长的。保存了每一段的偏移量,读取一个HFile时,会首先 读取Trailer,Trailer保存了每个段的起始位置(段的Magic Number用来做安全check),然后,DataBlock Index会被读取到内存中,这样,当检索某个key时,不需要扫描整个HFile,而只需从内存中找到key所在的block,通过一次磁盘io将整个 block读取到内存中,再找到需要的key。DataBlock Index采用LRU机制淘汰。

    HFile的Data Block,Meta Block通常采用压缩方式存储,压缩之后可以大大减少网络IO和磁盘IO,随之而来的开销当然是需要花费cpu进行压缩和解压缩。

    目标Hfile的压缩支持两种方式:Gzip,Lzo。

     

    HLog(WAL log)

    WAL 意为Write ahead log(http://en.wikipedia.org/wiki/Write-ahead_logging),类似mysql中的binlog,用来 做灾难恢复只用,Hlog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。

    每个Region Server维护一个Hlog,而不是每个Region一个。这样不同region(来自不同table)的日志会混在一起,这样做的目的是不断追加单个 文件相对于同时写多个文件而言,可以减少磁盘寻址次数,因此可以提高对table的写性能。带来的麻烦是,如果一台region server下线,为了恢复其上的region,需要将region server上的log进行拆分,然后分发到其它region server上进行恢复。

    HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是”写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。HLog Sequece File的Value是HBase的KeyValue对象,即对应HFile中的KeyValue,可参见上文描述。

     

    四、 系统架构

     

     

    Client

    1 包含访问hbase的接口,client维护着一些cache来加快对hbase的访问,比如regione的位置信息。

     

    Zookeeper

    1 保证任何时候,集群中只有一个master

    2 存贮所有Region的寻址入口。

    3 实时监控Region Server的状态,将Region server的上线和下线信息实时通知给Master

    4 存储Hbase的schema,包括有哪些table,每个table有哪些column family

     

    Master

    1 为Region server分配region

    2 负责region server的负载均衡

    3 发现失效的region server并重新分配其上的region

    4 GFS上的垃圾文件回收

    5 处理schema更新请求

     

    Region Server

    1 Region server维护Master分配给它的region,处理对这些region的IO请求

    2 Region server负责切分在运行过程中变得过大的region

    可以看到,client访问hbase上数据的过程并不需要master参与(寻址访问zookeeper和region server,数据读写访问regione server),master仅仅维护者table和region的元数据信息,负载很低。

     

    五、关键算法 / 流程

    region定位

    系统如何找到某个row key (或者某个 row key range)所在的region

    bigtable 使用三层类似B+树的结构来保存region位置。

    第一层是保存zookeeper里面的文件,它持有root region的位置。

    第二层root region是.META.表的第一个region其中保存了.META.z表其它region的位置。通过root region,我们就可以访问.META.表的数据。

    .META.是第三层,它是一个特殊的表,保存了hbase中所有数据表的region 位置信息。

    说明:

    1 root region永远不会被split,保证了最需要三次跳转,就能定位到任意region 。

    2.META.表每行保存一个region的位置信息,row key 采用表名+表的最后一样编码而成。

    3 为了加快访问,.META.表的全部region都保存在内存中。

    假设,.META.表的一行在内存中大约占用1KB。并且每个region限制为128MB。

    那么上面的三层结构可以保存的region数目为:

    (128MB/1KB) * (128MB/1KB) = = 2(34)个region

    4 client会将查询过的位置信息保存缓存起来,缓存不会主动失效,因此如果client上的缓存全部失效,则需要进行6次网络来回,才能定位到正确的region(其中三次用来发现缓存失效,另外三次用来获取位置信息)。

     

    读写过程

    上文提到,hbase使用MemStore和StoreFile存储对表的更新。

    数据在更新时首先写入Log(WAL log)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并 且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。于此同时,系统会在zookeeper中 记录一个redo point,表示这个时刻之前的变更已经持久化了。(minor compact)

    当系统出现意外时,可能导致内存(MemStore)中的数据丢失,此时使用Log(WAL log)来恢复checkpoint之后的数据。

    前面提到过StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更 新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(major compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行split,等分为两个StoreFile。

    由于对表的更新是不断追加的,处理读请求时,需要访问Store中全部的 StoreFile和MemStore,将他们的按照row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,合并的过程还是比较快。

    写请求处理过程

    1 client向region server提交写请求

    2 region server找到目标region

    3 region检查数据是否与schema一致

    4 如果客户端没有指定版本,则获取当前系统时间作为数据版本

    5 将更新写入WAL log

    6 将更新写入Memstore

    7 判断Memstore的是否需要flush为Store文件。

     

    region分配

    任何时刻,一个region只能分配给一个region server。master记录了当前有哪些可用的region server。以及当前哪些region分配给了哪些region server,哪些region还没有分配。当存在未分配的region,并且有一个region server上有可用空间时,master就给这个region server发送一个装载请求,把region分配给这个region server。region server得到请求后,就开始对此region提供服务。

     

    region server上线

    master使用zookeeper来跟踪region server状态。当某个region server启动时,会首先在zookeeper上的server目录下建立代表自己的文件,并获得该文件的独占锁。由于master订阅了server 目录上的变更消息,当server目录下的文件出现新增或删除操作时,master可以得到来自zookeeper的实时通知。因此一旦region server上线,master能马上得到消息。

     

    region server下线

    当region server下线时,它和zookeeper的会话断开,zookeeper而自动释放代表这台server的文件上的独占锁。而master不断轮询 server目录下文件的锁状态。如果master发现某个region server丢失了它自己的独占锁,(或者master连续几次和region server通信都无法成功),master就是尝试去获取代表这个region server的读写锁,一旦获取成功,就可以确定:

    1 region server和zookeeper之间的网络断开了。

    2 region server挂了。

    的其中一种情况发生了,无论哪种情况,region server都无法继续为它的region提供服务了,此时master会删除server目录下代表这台region server的文件,并将这台region server的region分配给其它还活着的同志。

    如果网络短暂出现问题导致region server丢失了它的锁,那么region server重新连接到zookeeper之后,只要代表它的文件还在,它就会不断尝试获取这个文件上的锁,一旦获取到了,就可以继续提供服务。

     

    master上线

    master启动进行以下步骤:

    1 从zookeeper上获取唯一一个代码master的锁,用来阻止其它master成为master。

    2 扫描zookeeper上的server目录,获得当前可用的region server列表。

    3 和2中的每个region server通信,获得当前已分配的region和region server的对应关系。

    4 扫描.META.region的集合,计算得到当前还未分配的region,将他们放入待分配region列表。

     

    master下线

    由于master只维护表和region的元数据,而不参与表数据IO的过 程,master下线仅导致所有元数据的修改被冻结(无法创建删除表,无法修改表的schema,无法进行region的负载均衡,无法处理region 上下线,无法进行region的合并,唯一例外的是region的split可以正常进行,因为只有region server参与),表的数据读写还可以正常进行。因此master下线短时间内对整个hbase集群没有影响。从上线过程可以看到,master保存的 信息全是可以冗余信息(都可以从系统其它地方收集到或者计算出来),因此,一般hbase集群中总是有一个master在提供服务,还有一个以上 的’master’在等待时机抢占它的位置。

     

    六、访问接口

    • HBase Shell
    • Java clietn API
    • HBase non-java access
      • languages talking to the JVM
        • Jython interface to HBase
        • Groovy DSL for HBase
        • Scala interface to HBase
      • languages with a custom protocol
        • REST gateway specification for HBase
        • 充分利用HTTP协议:GET POST PUT DELETE

    §

        • text/plain
        • text/xml
        • application/json
        • application/x-protobuf
      • Thrift gateway specification for HBase
        • java
        • cpp
        • rb
        • py
        • perl
        • php
    • HBase Map Reduce
    • Hive/Pig

    七、结语:

    全文对 Hbase做了 简单的介绍,有错误之处,敬请指正。未来将结合 Hbase 在淘宝数据平台的应用场景,在更多细节上进行深入。

     

    参考文档

    Bigtable: A Distributed Storage System for Structured Data

    HFile: A Block-Indexed File Format to Store Sorted Key-Value Pairs for a thorough introduction Hbase Architecture 101

    Hbase source code

     

    展开全文
  • Hbase从入门到入坑

    万次阅读 多人点赞 2020-09-04 20:22:24
    一 什么是HBASE 二 安装HBASEhbase初体验 四 HBASE客户端API操作 五 HBASE运行原理 5.1 master职责 5.2 Region Server 职责 5.3 zookeeper集群所起作用 5.4 HBASE读写数据流程 5.5 hbase:meta表 5.6 ...
    1. 本博客已迁移至微信公众号!将不再更新
    2. 关注公众号即可获得免费学习资源,获得免费指导!!!
    3. 公众号后续将会持续更新clickhouse,sparkstreaming,flink,数仓建模,用户画像,实时计算,推荐系统,实时数仓等内容,感兴趣的朋友可以关注
    4. 不定期会有朋友的面经分享

    目录

    一 什么是HBASE

    二 安装HBASE

    三 hbase初体验

    四 HBASE客户端API操作

    五 HBASE运行原理

    5.1 master职责

    5.2 Region Server 职责

    5.3 zookeeper集群所起作用

    5.4 HBASE读写数据流程

    5.4.1 写数据流程

    5.4.2 读数据流程

    5.4.3 数据flush过程

    5.4.4 数据合并过程

    5.5 hbase:meta表

    5.6 Region Server内部机制

    六.HBASE优化

    6.1 高可用

    6.2 预分区

    6.3 RowKey设计

    6.4 内存优化

    6.5 基础优化


    一 什么是HBASE

    HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。

    HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。

    HBASE是Google Bigtable的开源实现,但是也有很多不同之处。比如:Google Bigtable利用GFS作为其文件存储系统,HBASE利用Hadoop HDFS作为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE同样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby作为协同服务,HBASE利用Zookeeper作为对应。

    HBASE与mysql、oralce、db2、sqlserver等关系型数据库不同,它是一个NoSQL数据库(非关系型数据库)

    1. Hbase的表模型与关系型数据库的表模型不同:
    2. Hbase的表没有固定的字段定义;
    3. Hbase的表中每行存储的都是一些key-value对
    4. Hbase的表中有列族的划分,用户可以指定将哪些kv插入哪个列族
    5. Hbase的表在物理存储上,是按照列族来分割的,不同列族的数据一定存储在不同的文件中
    6. Hbase的表中的每一行都固定有一个行键,而且每一行的行键在表中不能重复
    7. Hbase中的数据,包含行键,包含key,包含value,都是byte[ ]类型,hbase不负责为用户维护数据类型
    8. HBASE对事务的支持很差

    HBASE相比于其他nosql数据库(mongodb、redis、cassendra、hazelcast)的特点:

    Hbase的表数据存储在HDFS文件系统中

    从而,hbase具备如下特性:存储容量可以线性扩展; 数据存储的安全性可靠性极高!

    二 安装HBASE

    HBASE是一个分布式系统

    其中有一个管理角色:  HMaster(一般2台,一台active,一台backup)

    其他的数据节点角色:  HRegionServer(很多台,看数据容量)

    2.1 安装准备

    需要先有一个java环境

    首先,要有一个HDFS集群,并正常运行; regionserver应该跟hdfs中的datanode在一起

    其次,还需要一个zookeeper集群,并正常运行

    然后,安装HBASE

    角色分配如下:

    Hdp01:  namenode  datanode  regionserver  hmaster  zookeeper

    Hdp02:  datanode   regionserver  zookeeper

    Hdp03:  datanode   regionserver  zookeeper

    2.2 安装步骤

    解压hbase安装包

    修改hbase-env.sh

    export JAVA_HOME=/root/apps/jdk1.7.0_67

    export HBASE_MANAGES_ZK=false

    修改hbase-site.xml

    <configuration>

    <!-- 指定hbase在HDFS上存储的路径 -->

            <property>

                    <name>hbase.rootdir</name>

                    <value>hdfs://hdp01:9000/hbase</value>

            </property>

    <!-- 指定hbase是分布式的 -->

            <property>

                    <name>hbase.cluster.distributed</name>

                    <value>true</value>

            </property>

    <!-- 指定zk的地址,多个用“,”分割 -->

            <property>

                    <name>hbase.zookeeper.quorum</name>

                    <value>hdp01:2181,hdp02:2181,hdp03:2181</value>

            </property>

    </configuration>

    修改 regionservers

    hdp01

    hdp02

    hdp03

    2.3 启动hbase集群

    bin/start-hbase.sh

    启动完后,还可以在集群中找任意一台机器启动一个备用的master

    bin/hbase-daemon.sh start master

    新启的这个master会处于backup状态

    三 hbase初体验

    3.1 启动hbase命令行客户端

    bin/hbase shell

    Hbase> list     // 查看表

    Hbase> status   // 查看集群状态

    Hbase> version  // 查看集群版本

    3.2 hbase表模型的特点

    1. 一个表,有表名
    2. 一个表可以分为多个列族(不同列族的数据会存储在不同文件中)
    3. 表中的每一行有一个“行键rowkey”,而且行键在表中不能重复
    4. 表中的每一对kv数据称作一个cell
    5. hbase可以对数据存储多个历史版本(历史版本数量可配置)
    6. 整张表由于数据量过大,会被横向切分成若干个region(用rowkey范围标识),不同region的数据也存储在不同文件中
    7. hbase会对插入的数据按顺序存储:

         要点一:首先会按行键排序

         要点二:同一行里面的kv会按列族排序,再按k排序

    3.3 hbase的表中能存储什么数据类型

    hbase中只支持byte[]

    此处的byte[] 包括了: rowkey,key,value,列族名,表名

    3.4 hbase命令行客户端操作

    名称

    命令表达式

    创建表

    create '表名', '列族名1','列族名2','列族名N'

    查看所有表

    list

    描述表

    describe  ‘表名’

    判断表存在

    exists  '表名'

    判断是否禁用启用表

    is_enabled '表名'

    is_disabled ‘表名’

    添加记录      

    put  ‘表名’, ‘rowKey’, ‘列族 : 列‘  ,  '值'

    查看记录rowkey下的所有数据

    get  '表名' , 'rowKey'

    查看表中的记录总数

    count  '表名'

    获取某个列族

    get '表名','rowkey','列族'

    获取某个列族的某个列

    get '表名','rowkey','列族:列’

    删除记录

    delete  ‘表名’ ,‘行名’ , ‘列族:列'

    删除整行

    deleteall '表名','rowkey'

    删除一张表

    先要屏蔽该表,才能对该表进行删除

    第一步 disable ‘表名’ ,第二步  drop '表名'

    清空表

    truncate '表名'

    查看所有记录

    scan "表名"  

    查看某个表某个列中所有数据

    scan "表名" , {COLUMNS=>'列族名:列名'}

    更新记录

    就是重写一遍,进行覆盖,hbase没有修改,都是追加

    3.4.1 建表

    create 't_user_info','base_info','extra_info'

                          表名      列族名   列族名

    3.4.2 插入数据

    hbase(main):011:0> put 't_user_info','001','base_info:username','zhangsan'

    0 row(s) in 0.2420 seconds

     

    hbase(main):012:0> put 't_user_info','001','base_info:age','18'

    0 row(s) in 0.0140 seconds

     

    hbase(main):013:0> put 't_user_info','001','base_info:sex','female'

    0 row(s) in 0.0070 seconds

     

    hbase(main):014:0> put 't_user_info','001','extra_info:career','it'

    0 row(s) in 0.0090 seconds

     

    hbase(main):015:0> put 't_user_info','002','extra_info:career','actoress'

    0 row(s) in 0.0090 seconds

     

    hbase(main):016:0> put 't_user_info','002','base_info:username','liuyifei'

    0 row(s) in 0.0060 seconds

    3.4.3 查询方式一 scan扫描

    hbase(main):017:0> scan 't_user_info'

    ROW                               COLUMN+CELL                                                                                     

     001                              column=base_info:age, timestamp=1496567924507, value=18                                         

     001                              column=base_info:sex, timestamp=1496567934669, value=female                                     

     001                              column=base_info:username, timestamp=1496567889554, value=zhangsan                              

     001                              column=extra_info:career, timestamp=1496567963992, value=it                                     

     002                              column=base_info:username, timestamp=1496568034187, value=liuyifei                              

     002                              column=extra_info:career, timestamp=1496568008631, value=actoress    

    3.4.4 查询方式二 get单行数据

    hbase(main):020:0> get 't_user_info','001'

    COLUMN                            CELL                                                                                            

     base_info:age                    timestamp=1496568160192, value=19                                                               

     base_info:sex                    timestamp=1496567934669, value=female                                                           

     base_info:username               timestamp=1496567889554, value=zhangsan                                                         

     extra_info:career                timestamp=1496567963992, value=it                                                               

    4 row(s) in 0.0770 seconds

    3.4.5 删除一个kv数据

    hbase(main):021:0> delete 't_user_info','001','base_info:sex'

    0 row(s) in 0.0390 seconds

    删除整行数据

    hbase(main):024:0> deleteall 't_user_info','001'

    0 row(s) in 0.0090 seconds

    hbase(main):025:0> get 't_user_info','001'

    COLUMN                            CELL                                                                                            

    0 row(s) in 0.0110 seconds

    3.4.6 删除整个表

    hbase(main):028:0> disable 't_user_info'

    0 row(s) in 2.3640 seconds

    hbase(main):029:0> drop 't_user_info'

    0 row(s) in 1.2950 seconds

    hbase(main):030:0> list

    TABLE                                                                                                                             

    0 row(s) in 0.0130 seconds

    => []

    3.5 Hbase重要特性-排序特性(行键)

    与nosql数据库们一样,row key是用来检索记录的主键。访问HBASE table中的行,只有三种方式:

    1.通过单个row key访问

    2.通过row key的range(正则)

    3.全表扫描

    Row key行键 (Row key)可以是任意字符串(最大长度 是 64KB,实际应用中长度一般为 10-100bytes),在HBASE内部,row key保存为字节数组。存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)

    插入到hbase中去的数据,hbase会自动排序存储:

    排序规则:  首先看行键,然后看列族名,然后看列(key)名; 按字典顺序

    Hbase的这个特性跟查询效率有极大的关系

    比如:一张用来存储用户信息的表,有名字,户籍,年龄,职业....等信息

    然后,在业务系统中经常需要:

    查询某个省的所有用户

    经常需要查询某个省的指定姓的所有用户

    思路:如果能将相同省的用户在hbase的存储文件中连续存储,并且能将相同省中相同姓的用户连续存储,那么,上述两个查询需求的效率就会提高!!!

    做法:将查询条件拼到rowkey内

    四 HBASE客户端API操作

    4.1 简洁版

    HbaseClientDDL 

    package cn.hbase.demo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.regionserver.BloomType;
    import org.junit.Before;
    import org.junit.Test;
    
    
    /**
     *  
     *  1、构建连接
     *  2、从连接中取到一个表DDL操作工具admin
     *  3、admin.createTable(表描述对象);
     *  4、admin.disableTable(表名);
    	5、admin.deleteTable(表名);
    	6、admin.modifyTable(表名,表描述对象);	
     *  
     *
     */
    public class HbaseClientDDL {
    	Connection conn = null;
    	
    	@Before
    	public void getConn() throws Exception{
    		// 构建一个连接对象
    		Configuration conf = HBaseConfiguration.create(); // 会自动加载hbase-site.xml
    		conf.set("hbase.zookeeper.quorum", "hdp-01:2181,hdp-02:2181,hdp-03:2181");
    		
    		conn = ConnectionFactory.createConnection(conf);
    	}
    	
    	
    	
    	/**
    	 * DDL
    	 * @throws Exception 
    	 */
    	@Test
    	public void testCreateTable() throws Exception{
    
    		// 从连接中构造一个DDL操作器
    		Admin admin = conn.getAdmin();
    		
    		// 创建一个表定义描述对象
    		HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("user_info"));
    		
    		// 创建列族定义描述对象
    		HColumnDescriptor hColumnDescriptor_1 = new HColumnDescriptor("base_info");
    		hColumnDescriptor_1.setMaxVersions(3); // 设置该列族中存储数据的最大版本数,默认是1
    		
    		HColumnDescriptor hColumnDescriptor_2 = new HColumnDescriptor("extra_info");
    		
    		// 将列族定义信息对象放入表定义对象中
    		hTableDescriptor.addFamily(hColumnDescriptor_1);
    		hTableDescriptor.addFamily(hColumnDescriptor_2);
    		
    		
    		// 用ddl操作器对象:admin 来建表
    		admin.createTable(hTableDescriptor);
    		
    		// 关闭连接
    		admin.close();
    		conn.close();
    		
    	}
    	
    	
    	/**
    	 * 删除表
    	 * @throws Exception 
    	 */
    	@Test
    	public void testDropTable() throws Exception{
    		
    		Admin admin = conn.getAdmin();
    		
    		// 停用表
    		admin.disableTable(TableName.valueOf("user_info"));
    		// 删除表
    		admin.deleteTable(TableName.valueOf("user_info"));
    		
    		
    		admin.close();
    		conn.close();
    	}
    	
    	// 修改表定义--添加一个列族
    	@Test
    	public void testAlterTable() throws Exception{
    		
    		Admin admin = conn.getAdmin();
    		
    		// 取出旧的表定义信息
    		HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("user_info"));
    		
    		
    		// 新构造一个列族定义
    		HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("other_info");
    		hColumnDescriptor.setBloomFilterType(BloomType.ROWCOL); // 设置该列族的布隆过滤器类型
    		
    		// 将列族定义添加到表定义对象中
    		tableDescriptor.addFamily(hColumnDescriptor);
    		
    		
    		// 将修改过的表定义交给admin去提交
    		admin.modifyTable(TableName.valueOf("user_info"), tableDescriptor);
    		
    		
    		admin.close();
    		conn.close();
    		
    	}
    	
    	
    	/**
    	 * DML -- 数据的增删改查
    	 */
    	
    	
    
    }
    

    HbaseClientDML

    package cn.hbase.demo;
    
    
    import java.util.ArrayList;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.Before;
    import org.junit.Test;
    
    public class HbaseClientDML {
    	Connection conn = null;
    	
    	@Before
    	public void getConn() throws Exception{
    		// 构建一个连接对象
    		Configuration conf = HBaseConfiguration.create(); // 会自动加载hbase-site.xml
    		conf.set("hbase.zookeeper.quorum", "hdp-01:2181,hdp-02:2181,hdp-03:2181");
    		
    		conn = ConnectionFactory.createConnection(conf);
    	}
    	
    	
    	/**
    	 * 增
    	 * 改:put来覆盖
    	 * @throws Exception 
    	 */
    	@Test
    	public void testPut() throws Exception{
    		
    		// 获取一个操作指定表的table对象,进行DML操作
    		Table table = conn.getTable(TableName.valueOf("user_info"));
    		
    		// 构造要插入的数据为一个Put类型(一个put对象只能对应一个rowkey)的对象
    		Put put = new Put(Bytes.toBytes("001"));
    		put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("张三"));
    		put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
    		put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("北京"));
    		
    		
    		Put put2 = new Put(Bytes.toBytes("002"));
    		put2.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("李四"));
    		put2.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes("28"));
    		put2.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("上海"));
    	
    		
    		ArrayList<Put> puts = new ArrayList<>();
    		puts.add(put);
    		puts.add(put2);
    		
    		
    		// 插进去
    		table.put(puts);
    		
    		table.close();
    		conn.close();
    		
    	}
    	
    	
    	/**
    	 * 循环插入大量数据
    	 * @throws Exception 
    	 */
    	@Test
    	public void testManyPuts() throws Exception{
    		
    		Table table = conn.getTable(TableName.valueOf("user_info"));
    		ArrayList<Put> puts = new ArrayList<>();
    		
    		for(int i=0;i<100000;i++){
    			Put put = new Put(Bytes.toBytes(""+i));
    			put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("张三"+i));
    			put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes((18+i)+""));
    			put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("北京"));
    			
    			puts.add(put);
    		}
    		
    		table.put(puts);
    		
    	}
    	
    	/**
    	 * 删
    	 * @throws Exception 
    	 */
    	@Test
    	public void testDelete() throws Exception{
    		Table table = conn.getTable(TableName.valueOf("user_info"));
    		
    		// 构造一个对象封装要删除的数据信息
    		Delete delete1 = new Delete(Bytes.toBytes("001"));
    		
    		Delete delete2 = new Delete(Bytes.toBytes("002"));
    		delete2.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"));
    		
    		ArrayList<Delete> dels = new ArrayList<>();
    		dels.add(delete1);
    		dels.add(delete2);
    		
    		table.delete(dels);
    		
    		
    		table.close();
    		conn.close();
    	}
    	
    	/**
    	 * 查
    	 * @throws Exception 
    	 */
    	@Test
    	public void testGet() throws Exception{
    		
    		Table table = conn.getTable(TableName.valueOf("user_info"));
    		
    		Get get = new Get("002".getBytes());
    		
    		Result result = table.get(get);
    		
    		// 从结果中取用户指定的某个key的value
    		byte[] value = result.getValue("base_info".getBytes(), "age".getBytes());
    		System.out.println(new String(value));
    		
    		System.out.println("-------------------------");
    		
    		// 遍历整行结果中的所有kv单元格
    		CellScanner cellScanner = result.cellScanner();
    		while(cellScanner.advance()){
    			Cell cell = cellScanner.current();
    			
    			byte[] rowArray = cell.getRowArray();  //本kv所属的行键的字节数组
    			byte[] familyArray = cell.getFamilyArray();  //列族名的字节数组
    			byte[] qualifierArray = cell.getQualifierArray();  //列名的字节数据
    			byte[] valueArray = cell.getValueArray(); // value的字节数组
    			
    			System.out.println("行键: "+new String(rowArray,cell.getRowOffset(),cell.getRowLength()));
    			System.out.println("列族名: "+new String(familyArray,cell.getFamilyOffset(),cell.getFamilyLength()));
    			System.out.println("列名: "+new String(qualifierArray,cell.getQualifierOffset(),cell.getQualifierLength()));
    			System.out.println("value: "+new String(valueArray,cell.getValueOffset(),cell.getValueLength()));
    			
    		}
    		
    		table.close();
    		conn.close();
    		
    	}
    	
    	
    	/**
    	 * 按行键范围查询数据
    	 * @throws Exception 
    	 */
    	@Test
    	public void testScan() throws Exception{
    		
    		Table table = conn.getTable(TableName.valueOf("user_info"));
    		
    		// 包含起始行键,不包含结束行键,但是如果真的想查询出末尾的那个行键,那么,可以在末尾行键上拼接一个不可见的字节(\000)
    		Scan scan = new Scan("10".getBytes(), "10000\001".getBytes());
    		
    		ResultScanner scanner = table.getScanner(scan);
    		
    		Iterator<Result> iterator = scanner.iterator();
    		
    		while(iterator.hasNext()){
    			
    			Result result = iterator.next();
    			// 遍历整行结果中的所有kv单元格
    			CellScanner cellScanner = result.cellScanner();
    			while(cellScanner.advance()){
    				Cell cell = cellScanner.current();
    				
    				byte[] rowArray = cell.getRowArray();  //本kv所属的行键的字节数组
    				byte[] familyArray = cell.getFamilyArray();  //列族名的字节数组
    				byte[] qualifierArray = cell.getQualifierArray();  //列名的字节数据
    				byte[] valueArray = cell.getValueArray(); // value的字节数组
    				
    				System.out.println("行键: "+new String(rowArray,cell.getRowOffset(),cell.getRowLength()));
    				System.out.println("列族名: "+new String(familyArray,cell.getFamilyOffset(),cell.getFamilyLength()));
    				System.out.println("列名: "+new String(qualifierArray,cell.getQualifierOffset(),cell.getQualifierLength()));
    				System.out.println("value: "+new String(valueArray,cell.getValueOffset(),cell.getValueLength()));
    			}
    			System.out.println("----------------------");
    		}
    	}
    	
    	@Test
    	public void test(){
    		String a = "000";
    		String b = "000\0";
    		
    		System.out.println(a);
    		System.out.println(b);
    		
    		
    		byte[] bytes = a.getBytes();
    		byte[] bytes2 = b.getBytes();
    		
    		System.out.println("");
    		
    	}
    	
    	
    
    }
    

    4.2 完整版

    package com.zgcbank.hbase;
    
    import java.util.ArrayList;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.FilterList.Operator;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    public class HbaseTest {
    
    	/**
    	 * 配置ss
    	 */
    	static Configuration config = null;
    	private Connection connection = null;
    	private Table table = null;
    
    	@Before
    	public void init() throws Exception {
    		config = HBaseConfiguration.create();// 配置
    		config.set("hbase.zookeeper.quorum", "master,work1,work2");// zookeeper地址
    		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
    		connection = ConnectionFactory.createConnection(config);
    		table = connection.getTable(TableName.valueOf("user"));
    	}
    
    	/**
    	 * 创建一个表
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void createTable() throws Exception {
    		// 创建表管理类
    		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
    		// 创建表描述类
    		TableName tableName = TableName.valueOf("test3"); // 表名称
    		HTableDescriptor desc = new HTableDescriptor(tableName);
    		// 创建列族的描述类
    		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
    		// 将列族添加到表中
    		desc.addFamily(family);
    		HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
    		// 将列族添加到表中
    		desc.addFamily(family2);
    		// 创建表
    		admin.createTable(desc); // 创建表
    	}
    
    	@Test
    	@SuppressWarnings("deprecation")
    	public void deleteTable() throws MasterNotRunningException,
    			ZooKeeperConnectionException, Exception {
    		HBaseAdmin admin = new HBaseAdmin(config);
    		admin.disableTable("test3");
    		admin.deleteTable("test3");
    		admin.close();
    	}
    
    	/**
    	 * 向hbase中增加数据
    	 * 
    	 * @throws Exception
    	 */
    	@SuppressWarnings({ "deprecation", "resource" })
    	@Test
    	public void insertData() throws Exception {
    		table.setAutoFlushTo(false);
    		table.setWriteBufferSize(534534534);
    		ArrayList<Put> arrayList = new ArrayList<Put>();
    		for (int i = 21; i < 50; i++) {
    			Put put = new Put(Bytes.toBytes("1234"+i));
    			put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"+i));
    			put.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes(1234+i));
    			arrayList.add(put);
    		}
    		
    		//插入数据
    		table.put(arrayList);
    		//提交
    		table.flushCommits();
    	}
    
    	/**
    	 * 修改数据
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void uodateData() throws Exception {
    		Put put = new Put(Bytes.toBytes("1234"));
    		put.add(Bytes.toBytes("info"), Bytes.toBytes("namessss"), Bytes.toBytes("lisi1234"));
    		put.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes(1234));
    		//插入数据
    		table.put(put);
    		//提交
    		table.flushCommits();
    	}
    
    	/**
    	 * 删除数据
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void deleteDate() throws Exception {
    		Delete delete = new Delete(Bytes.toBytes("1234"));
    		table.delete(delete);
    		table.flushCommits();
    	}
    
    	/**
    	 * 单条查询
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void queryData() throws Exception {
    		Get get = new Get(Bytes.toBytes("1234"));
    		Result result = table.get(get);
    		System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    		System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("namessss"))));
    		System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex"))));
    	}
    
    	/**
    	 * 全表扫描
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void scanData() throws Exception {
    		Scan scan = new Scan();
    		//scan.addFamily(Bytes.toBytes("info"));
    		//scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
    		scan.setStartRow(Bytes.toBytes("wangsf_0"));
    		scan.setStopRow(Bytes.toBytes("wangwu"));
    		ResultScanner scanner = table.getScanner(scan);
    		for (Result result : scanner) {
    			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    			//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
    			//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
    		}
    	}
    
    	/**
    	 * 全表扫描的过滤器
    	 * 列值过滤器
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void scanDataByFilter1() throws Exception {
    
    		// 创建全表扫描的scan
    		Scan scan = new Scan();
    		//过滤器:列值过滤器
    		SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),
    				Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
    				Bytes.toBytes("zhangsan2"));
    		// 设置过滤器
    		scan.setFilter(filter);
    
    		// 打印结果集
    		ResultScanner scanner = table.getScanner(scan);
    		for (Result result : scanner) {
    			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    			//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
    			//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
    		}
    
    	}
    	/**
    	 * rowkey过滤器
    	 * @throws Exception
    	 */
    	@Test
    	public void scanDataByFilter2() throws Exception {
    		
    		// 创建全表扫描的scan
    		Scan scan = new Scan();
    		//匹配rowkey以wangsenfeng开头的
    		RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^12341"));
    		// 设置过滤器
    		scan.setFilter(filter);
    		// 打印结果集
    		ResultScanner scanner = table.getScanner(scan);
    		for (Result result : scanner) {
    			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    			//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
    			//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
    		}
    
    		
    	}
    	
    	/**
    	 * 匹配列名前缀
    	 * @throws Exception
    	 */
    	@Test
    	public void scanDataByFilter3() throws Exception {
    		
    		// 创建全表扫描的scan
    		Scan scan = new Scan();
    		//匹配rowkey以wangsenfeng开头的
    		ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
    		// 设置过滤器
    		scan.setFilter(filter);
    		// 打印结果集
    		ResultScanner scanner = table.getScanner(scan);
    		for (Result result : scanner) {
    			System.out.println("rowkey:" + Bytes.toString(result.getRow()));
    			System.out.println("info:name:"
    					+ Bytes.toString(result.getValue(Bytes.toBytes("info"),
    							Bytes.toBytes("name"))));
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")) != null) {
    				System.out.println("info:age:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    								Bytes.toBytes("age"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex")) != null) {
    				System.out.println("infi:sex:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    								Bytes.toBytes("sex"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name")) != null) {
    				System.out
    				.println("info2:name:"
    						+ Bytes.toString(result.getValue(
    								Bytes.toBytes("info2"),
    								Bytes.toBytes("name"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age")) != null) {
    				System.out.println("info2:age:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    								Bytes.toBytes("age"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("sex")) != null) {
    				System.out.println("info2:sex:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    								Bytes.toBytes("sex"))));
    			}
    		}
    		
    	}
    	/**
    	 * 过滤器集合
    	 * @throws Exception
    	 */
    	@Test
    	public void scanDataByFilter4() throws Exception {
    		
    		// 创建全表扫描的scan
    		Scan scan = new Scan();
    		//过滤器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)
    		FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
    		//匹配rowkey以wangsenfeng开头的
    		RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^wangsenfeng"));
    		//匹配name的值等于wangsenfeng
    		SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("info"),
    				Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
    				Bytes.toBytes("zhangsan"));
    		filterList.addFilter(filter);
    		filterList.addFilter(filter2);
    		// 设置过滤器
    		scan.setFilter(filterList);
    		// 打印结果集
    		ResultScanner scanner = table.getScanner(scan);
    		for (Result result : scanner) {
    			System.out.println("rowkey:" + Bytes.toString(result.getRow()));
    			System.out.println("info:name:"
    					+ Bytes.toString(result.getValue(Bytes.toBytes("info"),
    							Bytes.toBytes("name"))));
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")) != null) {
    				System.out.println("info:age:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    								Bytes.toBytes("age"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex")) != null) {
    				System.out.println("infi:sex:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    								Bytes.toBytes("sex"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name")) != null) {
    				System.out
    				.println("info2:name:"
    						+ Bytes.toString(result.getValue(
    								Bytes.toBytes("info2"),
    								Bytes.toBytes("name"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age")) != null) {
    				System.out.println("info2:age:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    								Bytes.toBytes("age"))));
    			}
    			// 判断取出来的值是否为空
    			if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("sex")) != null) {
    				System.out.println("info2:sex:"
    						+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    								Bytes.toBytes("sex"))));
    			}
    		}
    		
    	}
    
    	@After
    	public void close() throws Exception {
    		table.close();
    		connection.close();
    	}
    
    }

    4.3 MapReduce操作Hbase

    4.3.1 实现方法

    Hbase对MapReduce提供支持,它实现了TableMapper类和TableReducer类,我们只需要继承这两个类即可。

    1、写个mapper继承TableMapper<Text, IntWritable>

    参数:Text:mapper的输出key类型; IntWritable:mapper的输出value类型。

          其中的map方法如下:

    map(ImmutableBytesWritable key, Result value,Context context)

     参数:key:rowKey;value: Result ,一行数据; context上下文

    2、写个reduce继承TableReducer<Text, IntWritable, ImmutableBytesWritable>

    参数:Text:reducer的输入key; IntWritable:reduce的输入value;

     ImmutableBytesWritable:reduce输出到hbase中的rowKey类型。

          其中的reduce方法如下:

    reduce(Text key, Iterable<IntWritable> values,Context context)

    参数: key:reduce的输入key;values:reduce的输入value;

    4.3.2 准备表

    1、建立数据来源表‘word’,包含一个列族‘content’

    向表中添加数据,在列族中放入列‘info’,并将短文数据放入该列中,如此插入多行,行键为不同的数据即可

    2、建立输出表‘stat’,包含一个列族‘content’

    3、通过Mr操作Hbase的‘word’表,对‘content:info’中的短文做词频统计,并将统计结果写入‘stat’表的‘content:info中’,行键为单词

    4.3.3 实现

    package com.zgcbank.hbase;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    /**
     * mapreduce操作hbase
     *
     */
    public class HBaseMr {
    	/**
    	 * 创建hbase配置
    	 */
    	static Configuration config = null;
    	static {
    		config = HBaseConfiguration.create();
    		config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
    		config.set("hbase.zookeeper.property.clientPort", "2181");
    	}
    	/**
    	 * 表信息
    	 */
    	public static final String tableName = "word";//表名1
    	public static final String colf = "content";//列族
    	public static final String col = "info";//列
    	public static final String tableName2 = "stat";//表名2
    	/**
    	 * 初始化表结构,及其数据
    	 */
    	public static void initTB() {
    		HTable table=null;
    		HBaseAdmin admin=null;
    		try {
    			admin = new HBaseAdmin(config);//创建表管理
    			/*删除表*/
    			if (admin.tableExists(tableName)||admin.tableExists(tableName2)) {
    				System.out.println("table is already exists!");
    				admin.disableTable(tableName);
    				admin.deleteTable(tableName);
    				admin.disableTable(tableName2);
    				admin.deleteTable(tableName2);
    			}
    			/*创建表*/
    				HTableDescriptor desc = new HTableDescriptor(tableName);
    				HColumnDescriptor family = new HColumnDescriptor(colf);
    				desc.addFamily(family);
    				admin.createTable(desc);
    				HTableDescriptor desc2 = new HTableDescriptor(tableName2);
    				HColumnDescriptor family2 = new HColumnDescriptor(colf);
    				desc2.addFamily(family2);
    				admin.createTable(desc2);
    			/*插入数据*/
    				table = new HTable(config,tableName);
    				table.setAutoFlush(false);
    				table.setWriteBufferSize(500);
    				List<Put> lp = new ArrayList<Put>();
    				Put p1 = new Put(Bytes.toBytes("1"));
    				p1.add(colf.getBytes(), col.getBytes(),	("The Apache Hadoop software library is a framework").getBytes());
    				lp.add(p1);
    				Put p2 = new Put(Bytes.toBytes("2"));p2.add(colf.getBytes(),col.getBytes(),("The common utilities that support the other Hadoop modules").getBytes());
    				lp.add(p2);
    				Put p3 = new Put(Bytes.toBytes("3"));
    				p3.add(colf.getBytes(), col.getBytes(),("Hadoop by reading the documentation").getBytes());
    				lp.add(p3);
    				Put p4 = new Put(Bytes.toBytes("4"));
    				p4.add(colf.getBytes(), col.getBytes(),("Hadoop from the release page").getBytes());
    				lp.add(p4);
    				Put p5 = new Put(Bytes.toBytes("5"));
    				p5.add(colf.getBytes(), col.getBytes(),("Hadoop on the mailing list").getBytes());
    				lp.add(p5);
    				table.put(lp);
    				table.flushCommits();
    				lp.clear();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			try {
    				if(table!=null){
    					table.close();
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	/**
    	 * MyMapper 继承 TableMapper
    	 * TableMapper<Text,IntWritable> 
    	 * Text:输出的key类型,
    	 * IntWritable:输出的value类型
    	 */
    	public static class MyMapper extends TableMapper<Text, IntWritable> {
    		private static IntWritable one = new IntWritable(1);
    		private static Text word = new Text();
    		@Override
    		//输入的类型为:key:rowKey; value:一行数据的结果集Result
    		protected void map(ImmutableBytesWritable key, Result value,
    				Context context) throws IOException, InterruptedException {
    			//获取一行数据中的colf:col
    			String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));// 表里面只有一个列族,所以我就直接获取每一行的值
    			//按空格分割
    			String itr[] = words.toString().split(" ");
    			//循环输出word和1
    			for (int i = 0; i < itr.length; i++) {
    				word.set(itr[i]);
    				context.write(word, one);
    			}
    		}
    	}
    	/**
    	 * MyReducer 继承 TableReducer
    	 * TableReducer<Text,IntWritable> 
    	 * Text:输入的key类型,
    	 * IntWritable:输入的value类型,
    	 * ImmutableBytesWritable:输出类型,表示rowkey的类型
    	 */
    	public static class MyReducer extends
    			TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values,
    				Context context) throws IOException, InterruptedException {
    			//对mapper的数据求和
    			int sum = 0;
    			for (IntWritable val : values) {//叠加
    				sum += val.get();
    			}
    			// 创建put,设置rowkey为单词
    			Put put = new Put(Bytes.toBytes(key.toString()));
    			// 封装数据
    			put.add(Bytes.toBytes(colf), Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));
    			//写到hbase,需要指定rowkey、put
    			context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
    		}
    	}
    	
    	public static void main(String[] args) throws IOException,
    			ClassNotFoundException, InterruptedException {
    		config.set("df.default.name", "hdfs://master:9000/");//设置hdfs的默认路径
    		config.set("hadoop.job.ugi", "hadoop,hadoop");//用户名,组
    		config.set("mapred.job.tracker", "master:9001");//设置jobtracker在哪
    		//初始化表
    		initTB();//初始化表
    		//创建job
    		Job job = new Job(config, "HBaseMr");//job
    		job.setJarByClass(HBaseMr.class);//主类
    		//创建scan
    		Scan scan = new Scan();
    		//可以指定查询某一列
    		scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));
    		//创建查询hbase的mapper,设置表名、scan、mapper类、mapper的输出key、mapper的输出value
    		TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,Text.class, IntWritable.class, job);
    		//创建写入hbase的reducer,指定表名、reducer类、job
    		TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }

    五 HBASE运行原理

    5.1 master职责

    1.管理监控HRegionServer,实现其负载均衡。

    2.处理region的分配或转移,比如在HRegion split时分配新的HRegion;在HRegionServer退出时迁移其负责的HRegion到其他        HRegionServer上。

    3.处理元数据的变更

    4.管理namespace和table的元数据(实际存储在HDFS上)。

    5.权限控制(ACL)。

    6.监控集群中所有HRegionServer的状态(通过Heartbeat和监听ZooKeeper中的状态)。

    5.2 Region Server 职责

    1. 管理自己所负责的region数据的读写。
    2. 读写HDFS,管理Table中的数据。
    3. Client直接通过HRegionServer读写数据(从HMaster中获取元数据,找到RowKey所在的HRegion/HRegionServer后)。
    4. 刷新缓存到HDFS
    5. 维护Hlog
    6. 执行压缩
    7. 负责处理Region分片

    5.3 zookeeper集群所起作用

    1. 存放整个HBase集群的元数据以及集群的状态信息。
    2. 实现HMaster主从节点的failover。

    注: HMaster通过监听ZooKeeper中的Ephemeral节点(默认:/hbase/rs/*)来监控HRegionServer的加入和宕机。

    在第一个HMaster连接到ZooKeeper时会创建Ephemeral节点(默认:/hbasae/master)来表示Active的HMaster,其后加进来的HMaster则监听该Ephemeral节点

    如果当前Active的HMaster宕机,则该节点消失,因而其他HMaster得到通知,而将自身转换成Active的HMaster,在变为Active的HMaster之前,它会在/hbase/masters/下创建自己的Ephemeral节点。

    5.4 HBASE读写数据流程

    5.4.1 写数据流程

    客户端现在要插入一条数据,rowkey=r000001, 这条数据应该写入到table表中的那个region中呢?

    1/ 客户端要连接zookeeper, 从zk的/hbase节点找到hbase:meta表所在的regionserver(host:port);

    2/ regionserver扫描hbase:meta中的每个region的起始行健,对比r000001这条数据在那个region的范围内;

    3/ 从对应的 info:server key中存储了region是有哪个regionserver(host:port)在负责的;

    4/ 客户端直接请求对应的regionserver;

    5/ regionserver接收到客户端发来的请求之后,就会将数据写入到region中

    5.4.2 读数据流程

    客户端现在要查询rowkey=r000001这条数据,那么这个流程是什么样子的呢?

    1/ 首先Client连接zookeeper, 找到hbase:meta表所在的regionserver;

    2/ 请求对应的regionserver,扫描hbase:meta表,根据namespace、表名和rowkey在meta表中找到r00001所在的region是由那个regionserver负责的;

    3/找到这个region对应的regionserver

    4/ regionserver收到了请求之后,扫描对应的region返回数据到Client

    (先从MemStore找数据,如果没有,再到BlockCache里面读;BlockCache还没有,再到StoreFile上读(为了读取的效率);

    如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。)

    注:客户会缓存这些位置信息,然而第二步它只是缓存当前RowKey对应的HRegion的位置,因而如果下一个要查的RowKey不在同一个HRegion中,则需要继续查询hbase:meta所在的HRegion,然而随着时间的推移,客户端缓存的位置信息越来越多,以至于不需要再次查找hbase:meta Table的信息,除非某个HRegion因为宕机或Split被移动,此时需要重新查询并且更新缓存。

    5.4.3 数据flush过程

    1)当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;

    2)并将数据存储到HDFS中;

    3)在HLog中做标记点。

    5.4.4 数据合并过程

    1)当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并;

    2)当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理;

    3)当HregionServer宕机后,将HregionServer上的hlog拆分,然后分配给不同的HregionServer加载,修改.META.;

    4)注意:HLog会同步到HDFS。

    5.5 hbase:meta表

    hbase:meta表存储了所有用户HRegion的位置信息:

    Rowkey:tableName,regionStartKey,regionId,replicaId等;

    info列族:这个列族包含三个列,他们分别是:

    info:regioninfo列:

    regionId,tableName,startKey,endKey,offline,split,replicaId;

    info:server列:HRegionServer对应的server:port;

    info:serverstartcode列:HRegionServer的启动时间戳。

    5.6 Region Server内部机制

    • WAL即Write Ahead Log,在早期版本中称为HLog,它是HDFS上的一个文件,如其名字所表示的,所有写操作都会先保证将数据写入这个Log文件后,才会真正更新MemStore,最后写入HFile中。WAL文件存储在/hbase/WALs/${HRegionServer_Name}的目录中
    • BlockCache是一个读缓存,即“引用局部性”原理(也应用于CPU,分空间局部性和时间局部性,空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。
    • HRegion是一个Table中的一个Region在一个HRegionServer中的表达。一个Table可以有一个或多个Region,他们可以在一个相同的HRegionServer上,也可以分布在不同的HRegionServer上,一个HRegionServer可以有多个HRegion,他们分别属于不同的Table。HRegion由多个Store(HStore)构成,每个HStore对应了一个Table在这个HRegion中的一个Column Family,即每个Column Family就是一个集中的存储单元,因而最好将具有相近IO特性的Column存储在一个Column Family,以实现高效读取(数据局部性原理,可以提高缓存的命中率)。HStore是HBase中存储的核心,它实现了读写HDFS功能,一个HStore由一个MemStore 和0个或多个StoreFile组成。
    • MemStore是一个写缓存(In Memory Sorted Buffer),所有数据的写在完成WAL日志写后,会 写入MemStore中,由MemStore根据一定的算法将数据Flush到地层HDFS文件中(HFile),通常每个HRegion中的每个 Column Family有一个自己的MemStore。
    • HFile(StoreFile) 用于存储HBase的数据(Cell/KeyValue)。在HFile中的数据是按RowKey、Column Family、Column排序,对相同的Cell(即这三个值都一样),则按timestamp倒序排列。
    • FLUSH详述
    1. 每一次Put/Delete请求都是先写入到MemStore中,当MemStore满后会Flush成一个新的StoreFile(底层实现是HFile),即一个HStore(Column Family)可以有0个或多个StoreFile(HFile)。
    2. 当一个HRegion中的所有MemStore的大小总和超过了hbase.hregion.memstore.flush.size的大小,默认128MB。此时当前的HRegion中所有的MemStore会Flush到HDFS中。
    3. 当全局MemStore的大小超过了hbase.regionserver.global.memstore.upperLimit的大小,默认40%的内存使用量。此时当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush顺序是MemStore大小的倒序(一个HRegion中所有MemStore总和作为该HRegion的MemStore的大小还是选取最大的MemStore作为参考?有待考证),直到总体的MemStore使用量低于hbase.regionserver.global.memstore.lowerLimit,默认38%的内存使用量。
    4. 当前HRegionServer中WAL的大小超过了hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs的数量,当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush使用时间顺序,最早的MemStore先Flush直到WAL的数量少于hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs这里说这两个相乘的默认大小是2GB,查代码,hbase.regionserver.max.logs默认值是32,而hbase.regionserver.hlog.blocksize默认是32MB。但不管怎么样,因为这个大小超过限制引起的Flush不是一件好事,可能引起长时间的延迟

    六.HBASE优化

    6.1 高可用

    在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。

    1.关闭HBase集群(如果没有开启则跳过此步)

    [atguigu@hadoop102 hbase]$ bin/stop-hbase.sh

    2.在conf目录下创建backup-masters文件

    [atguigu@hadoop102 hbase]$ touch conf/backup-masters

    3.在backup-masters文件中配置高可用HMaster节点

    [atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters

    4.将整个conf目录scp到其他节点

    [atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/

    [atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/

    6.2 预分区

    每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。

    1.手动设定预分区

    hbase> create 'staff1','info','partition1',SPLITS => ['1000','2000','3000','4000']

    2.生成16进制序列预分区

    create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

    3.按照文件中设置的规则预分区

    创建splits.txt文件内容如下:

    aaaa

    bbbb

    cccc

    dddd

    然后执行:

    create 'staff3','partition3',SPLITS_FILE => 'splits.txt'

    4.使用JavaAPI创建预分区

    //自定义算法,产生一系列Hash散列值存储在二维数组中

    byte[][] splitKeys = 某个散列值函数

    //创建HBaseAdmin实例

    HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());

    //创建HTableDescriptor实例

    HTableDescriptor tableDesc = new HTableDescriptor(tableName);

    //通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表

    hAdmin.createTable(tableDesc, splitKeys);

    6.3 RowKey设计

    一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。

    1.生成随机数、hash、散列值

    比如:

    原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7

    原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd

    原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913

    在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。

    2.字符串反转

    20170524000001转成10000042507102

    20170524000002转成20000042507102

    3.字符串拼接

    20170524000001_a12e

    20170524000001_93i7

    6.4 内存优化

    HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。

    6.5 基础优化

    1.允许在HDFS的文件中追加内容

    hdfs-site.xml、hbase-site.xml

    属性:dfs.support.append

    解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。

    2.优化DataNode允许的最大文件打开数

     

     

    hdfs-site.xml

    属性:dfs.datanode.max.transfer.threads

    解释:HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096

    3.优化延迟高的数据操作的等待时间

    hdfs-site.xml

    属性:dfs.image.transfer.timeout

    解释:如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。

    4.优化数据的写入效率

    mapred-site.xml

    属性:

    mapreduce.map.output.compress

    mapreduce.map.output.compress.codec

    解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。

    5.设置RPC监听数量

    hbase-site.xml

    属性:hbase.regionserver.handler.count

    解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。

    6.优化HStore文件大小

    hbase-site.xml

    属性:hbase.hregion.max.filesize

    解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。

    7.优化hbase客户端缓存

    hbase-site.xml

    属性:hbase.client.write.buffer

    解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。

    8.指定scan.next扫描HBase所获取的行数

    hbase-site.xml

    属性:hbase.client.scanner.caching

    解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。

    9.flush、compact、split机制

    当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。

    涉及属性:

    128M就是Memstore的默认阈值

    hbase.hregion.memstore.flush.size:134217728

    即:这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。

    hbase.regionserver.global.memstore.upperLimit:0.4

    hbase.regionserver.global.memstore.lowerLimit:0.38

    即:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit

    ================================================================================

    以后博客的内容都是通过微信公众号链接的形式发布,之后迁移到公众号的文章都会重新修正,也更加详细,对于以前博客内容里面的错误或者理解不当的地方都会在公众号里面修正。

    欢迎关注我的微信公众号,以后我会发布更多工作中总结的技术内容。

    展开全文
  • 大数据之hbase详解

    2019-09-02 10:22:27
    HBase的原型是Google的BigTable论文,受到了该论文思想的启发,目前作为Hadoop的子项目来开发维护,用于支持结构化的数据存储.HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价...
  • Phoenix学习笔记

    2020-10-05 20:55:40
    目录 一、Phoenix是什么 二、Phoenix命令操作 2.1 基本命令 2.2 表操作 2.2.1 创建表 2.2.2 显示所有表 ...2.4 HBase 表映射 ...3.2 配置hbase支持Phoenix创建二级索引 3.2覆盖索引 3.3本地索引与全局..

    目录

    一、Phoenix是什么

    Phoenix 将 Hbase的数据模型映射到关系世界,让开发人员可以是使用SQL去操作HBase。

    Phoenix与HBase之间的映射关系如下表

    hbase

    phoenix

    namespace

    database

    table

    table

    rowkey

    Primary key

    column family

    column qulifier

    Column

    Phoenix特点如下:

    • 将 SQL 编译为 HBase 扫描
    • 确定扫描RowKey的最佳开始和结束位置   
    • 扫描并行执行
    • 将where子句推送到服务器端的过滤器
    • 通过协处理器进行聚合操作  coprocesssor select count(*) from stu;
    • 完美支持HBase二级索引创建
    • DML命令以及通过DDL命令创建和操作表
    • 容易集成:如Spark,Hive,Pig,Flume和Map Reduce

    二、Phoenix命令操作

    2.1 基本命令

    !table查看表信息 
    !describe tablename可以查看表字段信息
    !history可以查看执行的历史SQL
    !dbinfo
    !index tb;查看tb的索引
    !help查看其他操作

    2.2 表操作

    2.2.1 创建表

            Hbase是区分大小写的,Phoenix 默认会把sql语句中的小写转换成大写,再建表,如果不希望转换,需要将表名,字段名等使用引号。HBase默认phoenix表的主键对应到ROW,column family 名为0,也可以在建表的时候指定column family。创建的表如果是联合主键,会把联合主键当作rowkey。

    create table test (id varchar primary key,name varchar,age integer );

    2.2.2 显示所有表

    0: jdbc:phoenix:hadoop003:2181> !tables

    2.2.3 删除表

    drop table "cftest";

     

    2.3 数据操作

    2.3.1 全字段插入

    upsert into "population" values('NY','NewYork',8143197); //数据存在则更新

    2.3.2 部分字段插入

    upsert into "population" (STATE,CITY) values('CA','Los Angeles');

    2.3.3 删除数据

    delete  from "population" where state='CA' and city='Los Angeles';

     

    2.4 HBase 表映射

            如果hbase中已经有表存在,在phoenix中是看不到的,需要将hbase的表映射到phoenix中,如果只需要在phoenix中对该表进行查询操作可以映射为视图,在phoenix中视图只能查询,不能插入数据;如果还需要在phoenix中对该表进行数据增删改查可以映射为表

    2.4.1 视图映射

    (1)先在hbase中准备好表,并添加数据

    create 'emp','info'
    
    put 'emp','7368','info:ename','TOM'
    put 'emp','7368','info:job','CLERK'
    put 'emp','7368','info:mgr','7902'
    put 'emp','7368','info:sal','700.0'
    put 'emp','7368','info:hiredate','1980-12-17'
    put 'emp','7368','info:deptno','10'
    put 'emp','7368','info:comm','200'
    
    put 'emp','7369','info:ename','SMITH'
    put 'emp','7369','info:job','SALES'
    put 'emp','7369','info:mgr','7902'
    put 'emp','7369','info:sal','800.0'
    put 'emp','7369','info:hiredate','1986-12-17'
    put 'emp','7369','info:deptno','20'
    put 'emp','7369','info:comm','300'

    (2)在phoenix中创建视图,并查看数据

    // 创建表
    create view  "emp"(
      "empno" varchar primary key, 
      "info"."ename" varchar, 
      "info"."job" varchar, 
      "info"."mgr" varchar, 
      "info"."hiredate" varchar, 
      "info"."sal" varchar, 
      "info"."comm" varchar, 
      "info"."deptno" varchar
    ) column_encoded_bytes=0;
    
    
    // 查询数据
    select * from "emp";
    
    // 插入数据会报错
    upsert into "emp" ("empno","ename","job") values ('8000','cat','SALES');

    (3)删除视图

    drop view "emp";

    2.4.2 表映射

    // 创建表
    create table  "emp"(
      "empno" varchar primary key, 
      "info"."ename" varchar, 
      "info"."job" varchar, 
      "info"."mgr" varchar, 
      "info"."hiredate" varchar, 
      "info"."sal" varchar, 
      "info"."comm" varchar, 
      "info"."deptno" varchar
    )column_encoded_bytes=0;
    
    // 查询表
    select * from "emp";
    
    // 可以插入数据
    upsert into "emp" ("empno","ename","job") values ('8000','cat','SALES');

    三、Phoenix 索引

    3.1 Phoenix 索引介绍

            HBase 只能通过 rowkey 进行搜索, 一般把 rowkey 称作一级索引. 在很长的一段时间里 HBase 就只支持一级索引.

            HBase 里面只有 rowkey 作为一级索引, 如果要对库里的非 rowkey 字段进行数据检索和查询, 往往要通过 MapReduce/Spark 等分布式计算框架进行,硬件资源消耗和时间延迟都会比较高。

            为了 HBase 的数据查询更高效、适应更多的场景, 诸如使用非 rowkey 字段检索也能做到秒级响应,或者支持各个字段进行模糊查询和多字段组合查询等, 因此需要在 HBase 上面构建二级索引, 以满足现实中更复杂多样的业务需求。

            从 0.94 版本开始, HBase 开始支持二级索引。

    3.2 配置hbase支持Phoenix创建二级索引

    (1)向hbase的配置文件hbase-site.xml中添加如下配置

    <property>
        <name>hbase.regionserver.wal.codec</name>
        <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
    </property>
    
    <property>
        <name>hbase.region.server.rpc.scheduler.factory.class</name>
        <value>org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory</value>
    <description>Factory to create the Phoenix RPC Scheduler that uses separate queues for index and metadata updates</description>
    </property>
    
    <property>
        <name>hbase.rpc.controllerfactory.class</name>
        <value>org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory</value>
        <description>Factory to create the Phoenix RPC Scheduler that uses separate queues for index and metadata updates</description>
    </property>
    <property>
        <name>hbase.master.loadbalancer.class</name>
        <value>org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer</value>
    </property>
    
    <property>
        <name>hbase.coprocessor.master.classes</name>
        <value>org.apache.phoenix.hbase.index.master.IndexMasterObserver</value>
    </property>

    (2)同时把以前zookeeper的配置的端口号去掉

    (3)将配置文件分发,重启Phoenix

     

    3.2 覆盖索引

            Phoenix特别强大,为我们提供了覆盖索引, 所谓覆盖索引就是从索引中直接获取查询结果。一旦从索引中找到数据,我们就不需要从主表再去查询数据了。可以将我们关心的数据捆绑在索引行中,从而节省了读取时间开销。

            要使用覆盖索引需要注意:1、select查询列中包含在索引列中,2、where条件包含索引列或者复合索引的前导列。

    //创建测试用例
    create table indextest (
    id char(4) not null primary key,
    age integer,
    name varchar,
    company varchar,
    school varchar)column_encoded_bytes=0;
    
    upsert into indextest values('0001',18,'张三','张三公司','张三学校');
    upsert into indextest values('0002',19,'李四','李四公司','李四学校');
    upsert into indextest values('0003',20,'王五','王五公司','王五学校');
    upsert into indextest values('0004',21,'赵六','赵六公司','赵六学校');
    
    
    // 创建基于company和school的覆盖索引,并将name列绑定在索引上。
    create index cover_indextest_index on indextest(company,school) include (name);
    
    // 可以执行如下sql,观察是否走索引。
    explain select name from indextest where school ='张三学校';  //不走
    explain select school from indextest where company ='张三学校';   //走
    
    // 删除索引   
    // drop index 索引名称 on 表名;
    drop index cover_indextest_index on indextest;
    
    
    // 接下来创建一个单字段覆盖索引看看,会走索引吗?
    create index cover_indextest_index_company on indextest(company) include (name);
    explain select name from indextest where company ='张三公司';
    //当使用*号作为字段去检索时,走的FULL SCAN。
    explain select  *  from indextest where company ='张三公司';
    // 这个时候你需要在查询时强行指定索引。
    explain select /*+index(indextest cover_indextest_index_company)*/ * from indextest where company='张三公司';
    
    
    // 创建包含多个字段的联合索引
    create index index_indextest_name_company on indextest(name,company);
    // 如下查询走索引
    explain select name from indextest where name ='张三';
    // 如下查询不走索引
    explain select name from indextest where company ='张三公司';

    3.3 本地索引与全局索引

        本地索引:local index索引数据会在原表添加新的列族存储原表中,侵入性强, 适合写多读少的情况。索引数据和数据表的数据是存放在相同的服务器中的,避免了在写操作的时候往不同服务器的索引表中写索引带来的额外开销。查询的字段不是索引字段索引表也会被使用,这会带来查询速度的提升。

        全局索引:即global index,索引数据会存储上单独的一张表中是默认的索引格式适合读多写少的情况写数据的时候会消耗大量开销,因为索引表也要更新,而索引表是分布在不同的数据节点上的,跨节点的数据传输带来了较大的性能消耗。在读取索引列的数据的时候 Phoenix 会选择索引表来降低查询消耗的时间。如果想查询的字段不是索引字段的话索引表不会被使用,也就是说不会带来查询速度的提升。

    // 建表
    create table indextest (
    id char(4) not null primary key,
    age integer,
    name varchar,
    company varchar,
    school varchar)column_encoded_bytes=0;
    // 创建全局索引
    0: jdbc:phoenix:> create index index_indextest_name on indextest(name);
    
    // 在Hbase客户端查看,并没有多出新的列族,只有默认的0族,而查看phoenix中的表,多了个索引表。
    hbase(main):024:0> describe 'INDEXTEST'
    // 将刚刚创建的全局索引删除,再创建本地索引,在Hbase客户端查看,发现多了索引列族。
    drop index index_indextest_name on indextest;
    create local index index_indextest_name on indextest(name);

    3.4 索引异步创建

            默认情况下,创建索引时,会在CREATE INDEX调用期间同步填充索引。但是如果数据表太大的话,会耗时巨大。从4.5开始,通过在索引创建DDL语句中包含ASYNC关键字,可以异步完成索引的填充,可以理解为让它进入后台慢慢去创建索引,而不是卡在那里等着创建完索引才退出命令。

    (1)在phoenix执行 并插入一些数据

    drop table indextest;
    
    create table indextest (
    id char(10) not null primary key,
    age integer,
    name varchar,
    company varchar,
    school varchar)column_encoded_bytes=0;
    

    (2)创建maven项目,引入hbase依赖

    <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        
        <dependency>
    	    <groupId>org.apache.phoenix</groupId>
    	    <artifactId>phoenix-core</artifactId>
    	    <version>4.8.0-HBase-1.2</version>
    	</dependency>
    </dependencies>

    (3)采用工具类,插入10w条数据测试。Jdbc连接工具

    package com.bigdata.myphoenix;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 功能介绍:使用jdbc对数据库操作:查询、更新(插入/修改/删除)、批量更新
     */
    public class DButil {
    
        private final Logger LOGGER = LoggerFactory.getLogger(getClass());
    
        /**jdbc的链接*/
        private Connection conn = null;
    
        /**准备sql*/
        private PreparedStatement ps = null;
    
        {
            initConnection();
        }
    
        /**
         * @param sql
         * @param params 参数
         * 功能介绍:更新操作(修改,删除,插入)
         */
        public int executeUpdate(String sql, Object[] params) {
            if(null == conn){
                initConnection();
            }
            try {
                ps = conn.prepareStatement(sql);
                if (params.length != 0) {
                    for (int i = 0; i < params.length; i++) {
                        ps.setObject(i + 1, params[i]);
                    }
                }
                int rows = ps.executeUpdate();
                conn.commit();
                return rows;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeUpdate();
            }
            return 0;
        }
    
        /**
         * @param sql
         * @param list
         * 功能介绍:批量更新
         */
        public void batchUpdate(String sql, List<Object[]> list) {
    
            if(null == conn){
                initConnection();
            }
            try {
                ps = conn.prepareStatement(sql);
                //关闭自动提交事务
                conn.setAutoCommit(false);
                //防止内存溢出
                final int batchSize = 1000;
                //记录插入数量
                int count = 0;
                int size = list.size();
                Object[] obj = null;
                for (int i = 0; i < size; i++) {
                    obj = list.get(i);
                    for (int j = 0; j < obj.length; j++) {
                        ps.setObject(j + 1, obj[j]);
                    }
                    ps.addBatch();
                    if (++count % batchSize == 0) {
                        ps.executeBatch();
                        conn.commit();
                    }
                }
                ps.executeBatch();
                conn.commit();
                conn.setAutoCommit(true);
            } catch (SQLException e) {
                e.printStackTrace();
                try {
                    conn.rollback();
                    conn.setAutoCommit(true);
                } catch (SQLException e1) {
                    e1.printStackTrace();
                }
            } finally {
                //关闭资源
                closeUpdate();
            }
        }
    
        /**
         * @param sql
         * @param params
         * 功能介绍:查询操作
         */
        public List<Map<String, Object>> executeQuery(String sql, Object[] params) {
            if(null == conn){
                initConnection();
            }
            ResultSet rs = null;
            List<Map<String, Object>> list = null;
            try {
                ps = conn.prepareStatement(sql);
                if (params != null) {
                    for (int i = 0; i < params.length; i++) {
                        ps.setObject(i + 1, params[i]);
                    }
                }
    
                long startTime = System.currentTimeMillis();
                rs = ps.executeQuery();
                LOGGER.info("UserBigTableService sql-executeQuery-time: " + (System.currentTimeMillis() - startTime) + "ms");
    
                list = new ArrayList<>();
                //移动光标,如果新的当前行有效,则返回 true;如果不存在下一行,则返回 false
                while (rs.next()) {
                    ResultSetMetaData rsmd = rs.getMetaData();
                    Map<String, Object> map = new HashMap<>(16);
                    for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                        map.put(rsmd.getColumnName(i), rs.getObject(i));
                    }
                    list.add(map);
                }
                return list;
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuery(rs);
            }
            return null;
        }
    
        /**
         * @param sql
         * @param params
         * 功能介绍:查询操作一条记录
         */
        public Map<String, Object> query (String sql, Object[] params) {
            if(null == conn){
                initConnection();
            }
            ResultSet rs = null;
            Map<String, Object> map = null;
            try {
                ps = conn.prepareStatement(sql);
                if (params != null) {
                    for (int i = 0; i < params.length; i++) {
                        ps.setObject(i + 1, params[i]);
                    }
                }
                rs = ps.executeQuery();
                //移动光标,如果新的当前行有效,则返回 true;如果不存在下一行,则返回 false
                while (rs.next()) {
                    ResultSetMetaData rsmd = rs.getMetaData();
                    map = new HashMap<>(16);
                    for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                        map.put(rsmd.getColumnName(i), rs.getObject(i));
                    }
                    //若有多条记录,取第一条。
                    break;
                }
                return map;
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuery(rs);
            }
            return null;
        }
    
        /**
         * 初始化连接
         */
        private void initConnection() {
            try {
                //local
               conn = DriverManager.getConnection("jdbc:phoenix:hadoop003");
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 功能介绍:关闭更新资源
         */
        private void closeUpdate() {
            try {
                if (ps != null) {
                    ps.close();
                }
    
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * @param rs 功能介绍:关闭查询资源
         */
        private void closeQuery(ResultSet rs) {
            try {
                if (rs != null) {
                    rs.close();
                }
    
                if (ps != null) {
                    ps.close();
                }
    
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    
    }

    (4)测试类

    package com.bigdata.myphoenix;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestPhoenix {
    
    	public static void main(String[] args) {
    		DButil dButil = new DButil();
    		
    		int id = 1;
    		String sql = "upsert into indextest values(?,?,?,?,?)";
    		for(int i = 0;i < 10*10000;i++){
    			Object[]  arr = new Object[5];
    			arr[0] =  id+"";
    			arr[1] =  18;
    			arr[2] =  "张三"+id;
    			arr[3] = "张三 公司"+id;
    			arr[4] =  "张三 学校"+id;
    			dButil.executeUpdate(sql, arr);
    			id ++;
    			System.out.println(id);
    		}
    	}
    
    }

    (5)执行插入数据操作,等待10w条数据插入完毕。也可以在phoenix执行count语句查看

    使用异步索引,需要两步。

    第一步:创建异步索引

    create  index index_indextest_name on indextest(name) async;

    这时,因为还没有真正生成索引,因此查看执行计划,不会走索引的。

    explain select name from indextest where name='张三姓名';

    第二步:通过HBase命令行单独启动填充索引表的map reduce作业,这时会提交yarn任务,为刚才的表创建索引。

    [root@hadoop003 hbase-1.3.1]# bin/hbase org.apache.phoenix.mapreduce.index.IndexTool --data-table INDEXTEST --index-table INDEX_INDEXTEST_NAME --output-path /hbase/index/indextest/index_indextest_name

    等待yarn任务完成,这时再查看执行计划,就会看到查询走索引了。

    只有当map reduce作业完成时,才会激活索引并开始在查询中使用。

    output-path选项用于指定用于写入HFile的HDFS目录

     

    展开全文
  • 需要先安装好hbase集群,phoenix只是一个工具,只需要在一台机器上安装就可以了 1、下载安装包 从对应的地址下载:http://mirrors.cnnic.cn/apache/phoenix/ 这里我们使用的是 phoenix-4.8.2-HBase-1.2-bin.tar.gz...
  • Hbase&Phoenix学习笔记

    2018-10-10 17:12:51
    hadoop fs -ls /apps/hbase/data/data/h3c/log_suspect_track_history_spark/hadoop fs - ls /apps/hbase/data/data/h3c/log_suspect_track_history_spark/ 统计某张Hbase表中ROW_KEY的分布情况 ./h...
  • HBase全网最佳学习资料汇总 https://yq.aliyun.com/articles/169085?spm=5176.100239.blogrightarea176102.19.pH0StL 学术界关于HBase在物联网/车联网/互联网/金融/高能物理等八大场景的理论研究 ...
  • hbase 详解

    千次阅读 2019-07-10 16:06:42
    1.hbase简介 1.1.什么是hbaseHBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。   HBASE的目标是存储并处理大型的数据,更具体来...
  • HBase学习之一】HBase简介

    千次阅读 2018-09-19 21:10:50
    目录   一、简介 二、HBase使用场景 2.1 历史数据存储类应用(约占七成) ... HBase - Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上...
  • HBase详细概述

    万次阅读 多人点赞 2016-06-12 19:40:02
    本文首先简单介绍了HBase,然后重点讲述了HBase的高并发和实时处理数据 、HBase数据模型、HBase物理存储、HBase系统架构、HBase三维有序存储、HBase调优、HBase Shell访问,HLog、HFile等。
  • 1、什么是hbase HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。 HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的...
  • HBase常用操作之namespace

    万次阅读 多人点赞 2015-06-30 10:15:19
    HBase中,namespace命名空间指对一组表的逻辑分组,类似RDBMS中的database,方便对表在业务上划分。Apache HBase从0.98.0, 0.95.2两个版本开始支持namespace级别的授权操作,HBase全局管理员可以创建、修改和回收...
  • Hbase启动和停止命令

    万次阅读 2017-11-04 08:48:24
    启动HBase集群: bin/start-hbase.sh 单独启动一个HMaster进程: bin/hbase-daemon.sh start master 单独停止一个HMaster进程: bin/hbase-daemon.sh stop master 单独启动一个HRegionServer...
  • HBase配置web界面

    万次阅读 2017-04-13 16:05:01
    为了学习个Hbase,所有的配置都正常,也能正常启动,但就是访问不到web界面。用某老师的话说,这就有点闹心了哈!仔细一看原来版本不一样,1.0.1版本的hbase的master web 默认是不运行的,所以需要自己配置默认端口...
  • Hbase 统计表行数的3种方式总结

    万次阅读 2015-06-03 12:47:34
    有些时候需要我们去统计某一个hbase表的行数,由于hbase本身不支持SQL语言,只能通过其他方式实现。可以通过一下几种方式实现hbase表的行数统计工作: 1.count命令 最直接的方式是在hbase shell中执行count的命令...
  • linux下Hbase的常用shell命令

    万次阅读 2016-11-01 21:54:23
    HBase Shell和HBase交互 HBase常用shell语句 创建表 添加记录 查看所有记录 查看表中记录数 删除记录 删除表linux下查看hbase的安装路径find / -name hbaseHBase Shell和HBase交互/usr/hdp/2.3.4.
  • 1.我使用的是稳定版本的hbasehbase-1.2.4-bin.tar.gz 下载地址:http://www-eu.apache.org/dist/ 里面有个stable版本的2.配置环境变量,集群上的每个节点都要配置 vim ~/.bashrcexport HBASE_HOME=/usr/local/...
  • Hbase权限控制

    万次阅读 2017-06-19 13:09:45
    Hbase权限配置、使用手册 1 Hbase权限控制简介 Hbase的权限控制是通过AccessController Coprocessor协处理器框架实现的,可实现对用户的RWXCA的权限控制。 2 配置 配置hbase-site.xml CM主页→点击hbase(进入Hbase...
1 2 3 4 5 ... 20
收藏数 142,797
精华内容 57,118
关键字:

hbase