精华内容
下载资源
问答
  • hbase海量数据导入

    2016-06-29 17:56:58
    最近有个需求要对mysql的全量数据迁移到hbase,虽然hbase的设计非常利于高效的读取,但是它的compaction实现对海量数据写入造成非常大的影响,数据到一定量之后,就开始抽风。  分析hbase的实现,不管其运行的机制...
    最近有个需求要对mysql的全量数据迁移到hbase,虽然hbase的设计非常利于高效的读取,但是它的compaction实现对海量数据写入造成非常大的影响,数据到一定量之后,就开始抽风。 
    
    分析hbase的实现,不管其运行的机制,其最终存储结构为分布式文件系统中的hfile格式。 
    刚好hbase的源代码中提供一个HFileOutputFormat类,分析其源代码可以看到: 
    Java代码   收藏代码
    1. /** 
    2.  * Copyright 2009 The Apache Software Foundation 
    3.  * 
    4.  * Licensed to the Apache Software Foundation (ASF) under one 
    5.  * or more contributor license agreements.  See the NOTICE file 
    6.  * distributed with this work for additional information 
    7.  * regarding copyright ownership.  The ASF licenses this file 
    8.  * to you under the Apache License, Version 2.0 (the 
    9.  * "License"); you may not use this file except in compliance 
    10.  * with the License.  You may obtain a copy of the License at 
    11.  * 
    12.  *     http://www.apache.org/licenses/LICENSE-2.0 
    13.  * 
    14.  * Unless required by applicable law or agreed to in writing, software 
    15.  * distributed under the License is distributed on an "AS IS" BASIS, 
    16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    17.  * See the License for the specific language governing permissions and 
    18.  * limitations under the License. 
    19.  */  
    20. package org.apache.hadoop.hbase.mapreduce;  
    21.   
    22. import java.io.IOException;  
    23. import java.util.Map;  
    24. import java.util.TreeMap;  
    25.   
    26. import org.apache.hadoop.conf.Configuration;  
    27. import org.apache.hadoop.fs.FileSystem;  
    28. import org.apache.hadoop.fs.Path;  
    29. import org.apache.hadoop.hbase.HConstants;  
    30. import org.apache.hadoop.hbase.KeyValue;  
    31. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
    32. import org.apache.hadoop.hbase.io.hfile.Compression;  
    33. import org.apache.hadoop.hbase.io.hfile.HFile;  
    34. import org.apache.hadoop.hbase.regionserver.StoreFile;  
    35. import org.apache.hadoop.hbase.util.Bytes;  
    36. import org.apache.hadoop.mapreduce.RecordWriter;  
    37. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
    38. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
    39. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    40. import org.mortbay.log.Log;  
    41.   
    42. /** 
    43.  * Writes HFiles. Passed KeyValues must arrive in order. 
    44.  * Currently, can only write files to a single column family at a 
    45.  * time.  Multiple column families requires coordinating keys cross family. 
    46.  * Writes current time as the sequence id for the file. Sets the major compacted 
    47.  * attribute on created hfiles. 
    48.  * @see KeyValueSortReducer 
    49.  */  
    50. public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {  
    51.   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)  
    52.   throws IOException, InterruptedException {  
    53.     // Get the path of the temporary output file   
    54.     final Path outputPath = FileOutputFormat.getOutputPath(context);  
    55.     final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();  
    56.     Configuration conf = context.getConfiguration();  
    57.     final FileSystem fs = outputdir.getFileSystem(conf);  
    58.     // These configs. are from hbase-*.xml  
    59.     final long maxsize = conf.getLong("hbase.hregion.max.filesize"268435456);  
    60.     final int blocksize = conf.getInt("hfile.min.blocksize.size"65536);  
    61.     // Invented config.  Add to hbase-*.xml if other than default compression.  
    62.     final String compression = conf.get("hfile.compression",  
    63.       Compression.Algorithm.NONE.getName());  
    64.   
    65.     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {  
    66.       // Map of families to writers and how much has been output on the writer.  
    67.       private final Map<byte [], WriterLength> writers =  
    68.         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);  
    69.       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;  
    70.       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());  
    71.   
    72.       public void write(ImmutableBytesWritable row, KeyValue kv)  
    73.       throws IOException {  
    74.         long length = kv.getLength();  
    75.         byte [] family = kv.getFamily();  
    76.         WriterLength wl = this.writers.get(family);  
    77.         if (wl == null || ((length + wl.written) >= maxsize) &&  
    78.             Bytes.compareTo(this.previousRow, 0this.previousRow.length,  
    79.               kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {  
    80.           // Get a new writer.  
    81.           Path basedir = new Path(outputdir, Bytes.toString(family));  
    82.           if (wl == null) {  
    83.             wl = new WriterLength();  
    84.             this.writers.put(family, wl);  
    85.             if (this.writers.size() > 1throw new IOException("One family only");  
    86.             // If wl == null, first file in family.  Ensure family dir exits.  
    87.             if (!fs.exists(basedir)) fs.mkdirs(basedir);  
    88.           }  
    89.           wl.writer = getNewWriter(wl.writer, basedir);  
    90.           Log.info("Writer=" + wl.writer.getPath() +  
    91.             ((wl.written == 0)? """, wrote=" + wl.written));  
    92.           wl.written = 0;  
    93.         }  
    94.         kv.updateLatestStamp(this.now);  
    95.         wl.writer.append(kv);  
    96.         wl.written += length;  
    97.         // Copy the row so we know when a row transition.  
    98.         this.previousRow = kv.getRow();  
    99.       }  
    100.   
    101.       /* Create a new HFile.Writer. Close current if there is one. 
    102.        * @param writer 
    103.        * @param familydir 
    104.        * @return A new HFile.Writer. 
    105.        * @throws IOException 
    106.        */  
    107.       private HFile.Writer getNewWriter(final HFile.Writer writer,  
    108.           final Path familydir)  
    109.       throws IOException {  
    110.         close(writer);  
    111.         return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),  
    112.           blocksize, compression, KeyValue.KEY_COMPARATOR);  
    113.       }  
    114.   
    115.       private void close(final HFile.Writer w) throws IOException {  
    116.         if (w != null) {  
    117.           StoreFile.appendMetadata(w, System.currentTimeMillis(), true);  
    118.           w.close();  
    119.         }  
    120.       }  
    121.   
    122.       public void close(TaskAttemptContext c)  
    123.       throws IOException, InterruptedException {  
    124.         for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {  
    125.           close(e.getValue().writer);  
    126.         }  
    127.       }  
    128.     };  
    129.   }  
    130.   
    131.   /* 
    132.    * Data structure to hold a Writer and amount of data written on it.  
    133.    */  
    134.   static class WriterLength {  
    135.     long written = 0;  
    136.     HFile.Writer writer = null;  
    137.   }  
    138. }  


    可以看到,它的工作流程就是首先根据你的配置文件初始化,然后写成hfile的格式。 
    这里我做了个偷懒的demo: 
    Java代码   收藏代码
    1. HFileOutputFormat hf = new HFileOutputFormat();  
    2.         HBaseConfiguration conf = new HBaseConfiguration();  
    3.         conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml"));  
    4.         conf.set("mapred.output.dir""/tmp");  
    5.         conf.set("hfile.compression", Compression.Algorithm.LZO.getName());  
    6.         TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());  
    7.         RecordWriter writer = hf.getRecordWriter(context);  
    8.         KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"),  
    9.                                    System.currentTimeMillis(), Bytes.toBytes("test"));  
    10.         KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"),  
    11.                                     System.currentTimeMillis(), Bytes.toBytes("123"));  
    12.         KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"),  
    13.                                     System.currentTimeMillis(), Bytes.toBytes("test"));  
    14.         KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"),  
    15.                                     System.currentTimeMillis(), Bytes.toBytes("123"));  
    16.         writer.write(null, kv);  
    17.         writer.write(null, kv1);  
    18.         writer.write(null, kv3);  
    19.         writer.write(null, kv4);  
    20.         writer.close(context);  

    执行然之后,会在hdfs的/tmp目录下生成一份文件。 注意批量写数据的时候一定要保证key的有序性  
    这个时候,hbase自己提供的一个基于jruby的loadtable.rb脚本就可以发挥作用了。 
    它的格式是loadtable.rb 你希望的表明 hdfs路径: 
    hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/ 
    执行完之后: 
    运行./hbase shell 
    >list 
    就会显示刚才导入的offer表了。 
    展开全文
  • Hbase海量数据导入HbaseHbase数据导入Hdfs 文章目录Hbase--海量数据导入HbaseHbase数据导入Hdfs一:海量数据导入Hbase1.代码实现2.遇到的错误以及注意要点二:数据从Hbase导入到Hdfs1.代码实现2.遇到的错误...

    Hbase–海量数据导入Hbase和Hbase数据导入Hdfs

    一:海量数据导入Hbase

    1.代码实现
    package hbasePut;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TestMiniMRClientCluster.MyReducer;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class HdfstrueHbase {
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		//加载配置文件
    		Configuration conf=new Configuration();
    		//将core.site  hdfs.site放在src下
    		//设置高可用的集群
    		conf.set("fs.defaultFS", "hdfs://bd1906/");
    		//指定zookeeper的访问路径
    		conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
    		//启动一个job
    		Job job=Job.getInstance(conf);
    		/**
    		 * 指定job的主要入口
    		 * class的获取
    		 * 1:类名,class
    		 * 2:对象,getClass
    		 * 3.Class.forname
    		 */
    		job.setJarByClass(HdfstrueHbase.class);
    		//指定job中mapper对应的类
    		job.setMapperClass(MyMapper.class);
    		//指定reduce对应的类
    		//job.setReducerClass(MyReducer.class);
    		
    		/**
    		 * 泛型:只在代码编译时后生效,代码运行的时候自动删除
    		 * 框架---map---reduce---hadfs
    		 */
    		//指定map输出的key  value的类型
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		
    		
    		//指定reduce输出的key value的类型
    		job.setOutputKeyClass(NullWritable.class);
    		job.setOutputValueClass(Put.class);
    		
    		//指定输入  
    		Path inpath=new Path("/user/hbase/ratings.dat");
    		FileInputFormat.addInputPath(job, inpath);
    		
    		//指定输出路径 最终结果存储 值得hdfs的路径,输出的路径一定不能存在
    		//借助一个工具类指定表名
    		/**
    		 * 参数1:表名
    		 * 参数2:reduce的对应类
    		 * 参数三:job
    		 */
    		TableMapReduceUtil.initTableReducerJob("user_hdfs", MyReducer.class, job, 
    				null, null, null, null, false);
    		
    		//真正的提交job,不会打印日志
    		//job.submit()
    		job.waitForCompletion(true);
    	}
    	/**
    	 * key:行键
    	 * value:其他数据信息
    	 *
    	 */
    	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
    		Text mk=new Text();
    		Text mv=new Text();
    		int i = 0;
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			//1::1193::5::978300760
    			i++;
    			String line = value.toString();
    			String[] datas = line.split("::");
    			mk.set(datas[0]+i);
    			mv.set(datas[1]+"\t"+datas[2]+"\t"+datas[3]);
    			context.write(mk, mv);
    		}
    	}
    	/**
    	 * 泛型1:泛型2:map端输入
    	 * 泛型3:reduce端输出的key
    	 *泛型4:省略,mutation对象---put|delete
    	 *
    	 *解析map发送的数据进行封装
    	 */
    	static class MyReducer extends TableReducer<Text, Text, NullWritable>{
    		@Override
    		protected void reduce(Text key, Iterable<Text> value,Context context)
    				throws IOException, InterruptedException {
    			for (Text v : value) {
    				String[] datas = v.toString().split("\t");
    				Put p=new Put(key.getBytes());
    				//1::1193::5::978300760
    				p.addColumn("info".getBytes(), "version".getBytes(), datas[0].getBytes());
    				p.addColumn("info".getBytes(), "amount".getBytes(), datas[1].getBytes());
    				p.addColumn("info".getBytes(), "id".getBytes(), datas[2].getBytes());
    				//将put对象写出到hbase中
    				context.write(NullWritable.get(), p);
    			}
    		}
    	}
    }
    
    
    2.遇到的错误以及注意要点

    1)这里使用的hdfs配置的是高可用集群,需要将linux中Hbase的安装配置文件中的core.site和hdfs.site放在src下,不然不能识别;

    2)出现错误:Exception in thread “main” java.lang.IllegalArgumentException: Pathname /D:/bigdata_soft/hbase-1.2.6/lib/hbase-prefix-tree-1.2.6.jar from hdfs://bd1906/D:/bigdata_soft/hbase-1.2.6/lib/hbase-prefix-tree-1.2.6.jar is not a valid DFS filename.这个是jar包冲突导致的错误,解决办法:

    • 调整jar包的导入顺序: hadoop导入在前, hbase 导入在后
    • 代码TableMapReduceUtil.initTableReducerJob(“user_hdfs”, MyReducer.class,job,null,null,null,null,false);

    3)TableMapReduceUtil工具类的initTableReducerJob()方法可以用来指定路径,和解决jar包冲突,是一个好东西。

    4)如果使用TableMapReduceUtil指定了路径,前面的job指定的路径就可以删掉了

    5)一定要注意路径问题,如果在hdfs中路径错误,可能在map阶段处理的时候回造成数组下表越界的问题

    二:数据从Hbase导入到Hdfs

    1.代码实现

    这里是一个例题:求相同年龄的人数,将结果写到hdfs上面

    package HbaseTsxt;
    
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class HbaseToHdfs {
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		Configuration conf=HBaseConfiguration.create();
    		conf.set("fs.defaultFS", "hdfs://bd1906/");//mapreduce相关的,设置为高可用
    		conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");//指定zookeeper
    		//Connection connection=ConnectionFactory.createConnection(conf);
    		//启动一个工作
    		Job job=Job.getInstance(conf);
    		//指定工作的入口
    		job.setJarByClass(HbaseToHdfs.class);
    		//指定reduce的入口
    		job.setReducerClass(MyReduce.class);
    		//指定map的入口
    		job.setMapperClass(MyMapper.class);
    		//指定map输出的key,value
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		//指定reduce输出的key,value
    		//job.setOutputKeyClass(Text.class);
    		//job.setOutputValueClass(IntWritable.class);
    		//指定hbase的表
    		Scan scan=new Scan();
    		//scan.addColumn(family, qualifier);
    		TableMapReduceUtil.initTableMapperJob("HbaseAPI:HbaseAPI01", scan, MyMapper.class, Text.class, IntWritable.class, job,false);
    		//指定输出
    		FileSystem fs=FileSystem.get(conf);
    		Path output=new Path("/user/hbase/data/hbasetohdfs");
    		if(fs.exists(output)){
    			fs.delete(output, true);
    		}
    		//FileOutputFormat指定工作的输出路径
    		FileOutputFormat.setOutputPath(job, output);
    		//提交工作
    		job.waitForCompletion(true);
    		
    	}
    	/**
    	 * 泛型1:输出的key的类型
    	 * 泛型2:输出的value的类型
    	 *
    	 */
    	static class MyMapper extends TableMapper<Text, IntWritable>{
    		/**
    		 * 参数1:行键对象
    		 * 参数2:行键那一行的结果集
    		 */
    		//创建输入输出对象
    		Text mk=new Text();
    		IntWritable mv=new IntWritable();
    		@Override
    		protected void map(ImmutableBytesWritable key, Result value,
    				Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			//listCells()方法取出所有的单元格
    			List<Cell> Cells = value.listCells();
    			//遍历单元格,做k,v处理
    			for (Cell cell : Cells) {
    				String qualifier=Bytes.toString(CellUtil.cloneQualifier(cell));
    				if (qualifier.equals("age")) {
    					String k=Bytes.toString(CellUtil.cloneValue(cell));
    					mk.set(k);
    					mv.set(1);
    					context.write(mk, mv);
    				}
    			}
    		}
    	}
    	/**
    	 * 参数1:输入key的类型
    	 * 参数2:输入value的类型
    	 * 参数2:输出key的类型
    	 * 参数4:输出value的类型
    	 */
    	static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    		IntWritable mv=new IntWritable();
    		@Override
    		/**
    		 * 参数1:输入的key
    		 * 参数2:map端的value集合
    		 * key相同的启动一个reducetask,对values处理,v累=累加
    		 */
    		protected void reduce(Text key, Iterable<IntWritable> values,
    				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    			int count=0;
    			for (IntWritable v : values) {
    				count+=v.get();
    			}
    			mv.set(count);
    			context.write(key, mv);
    		}
    		
    	}
    }
    
    
    2.遇到的错误以及注意要点

    1)写出的路径存在导致的错误:Exception in thread “main” org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://bd1906/user/hbase/data/hbasetohdfs already exists 解决办法:每次指定输出到hdfs上面的时候进行判断,如果路径存在就删除路径下的文件

    if(fs.exists(output)){
    			fs.delete(output, true);
    		}
    

    2)权限问题,导致无法连接,可以加上权限管理

    System.setProperty("HADOOP_USER_NAME", "hadoop");
    
    展开全文
  • hbase海量数据的全量导入方法,大数据导入
  • 博主之前简单介绍了,HBase写入数据的原理与HBase表在hdfs上的存储结构,还搞了搞HBase提供的导入工具ImportTsv, 想了解更多ImportTsv使用,请戳HBase导入海量数据之使用ImportTsv向HBase导入大量数据 ...

    前言

    博主之前简单介绍了,HBase写入数据的原理与HBase表在hdfs上的存储结构,还搞了搞HBase提供的导入工具ImportTsv, 想了解更多ImportTsv使用,请戳HBase导入海量数据之使用ImportTsv向HBase中导入大量数据

    今天咱们了解下Buckload

    如下图示,充分解释了Buckload的导入原理,通过MapReduce程序在hdfs中直接生成HFlie文件,然后将HFile文件移动到HBase中对应表的HDFS目录下
    在这里插入图片描述

    • ImportTsv是命令行导入,Buckload可以自定义程序生成HFile,再进行导入,由此可见,BuckLoad比较灵活

    1. BuckLoad导入数据到HBase,程序编写步骤

    1. 编写mapper程序,无论是map还是reduce,输出类型必须是<ImmutableBytesWritable,Put>或者<ImmutableBytesWritable,Keyvalue>
    2. 编写map方法,包含处理数据的逻辑。
    3. 将处理后的数据写到hdfs中
    4. 配置MapReduce任务的输入/输出格式,输入/输出类型,输入/输出目录等
    5. 使用BuckLoad方式导入数据,有两种方法:
      • 代码: 创建LoadIncrementalHFiles对象,调用doBulkLoad方法,加载刚才MapReduce程序生成HFile到表中即可。
        doBulkLoad有两种,HTable已经过时了,现在推荐使用第一种,
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(OUTPUT_PATH),admin,table,connection,getRegionLocator(tableName.valueOf(tableName)))
    
    • 命令行:在命令行中使用如下命令
    HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase classpath` hadoop jar $HBASE_HOME/lib/hbase-server-version.jar completebulkload <生成的HFile路径> <表名称> 
    

    2. 实例展示

    2.1 背景

    这有一个用户浏览网站的日志,分隔符为逗号

    共有四列:

    1. 手机号反转(避免Region热点)
    2. 手机号
    3. Mac地址
    4. 用户访问记录(用&&分割)访问记录内容:时间戳-agent-访问目录-上行流量-下行流量
    56279618741,14781697265,65:85:36:f9:b1:c0,
    1539787307-Mozilla/5.0 Macintosh; Intel Mac OS X 10_10_1 AppleWebKit/537.36 KHTML like Gecko Chrome/37.0.2062.124 Safari/537.36-https://dl.lianjia.com/ershoufang/102100802770.html-13660-6860
    &&1539786398-Mozilla/5.0 Windows NT 5.1 AppleWebKit/537.36 KHTML like Gecko Chrome/36.0.1985.67 Safari/537.36-https://dl.lianjia.com/ershoufang/102100576898.html-1959-91040
    &&1539785462-Mozilla/5.0 Windows NT 10.0 AppleWebKit/537.36 KHTML like Gecko Chrome/40.0.2214.93 Safari/537.36-https://dl.lianjia.com/ershoufang/102100762258.html-12177-53132
    

    2.2 数据大小

    12.48GB,共1999940条数据

    2.3 需求

    • 手机号反转做Rowkey,
    • 将手机号,Mac地址,用户访问地址插入到Info列簇的phoneNumber,macAddress,userView列中,
    • 将用户访问记录转为json格式

    2.4 代码实现

    1. 编写Mapper程序和map方法

    需要注意的是,要对rowkey的长度进行判断,筛选出rowkey长度大于0的,否则会报错

    public static class BuckLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] dataLine = value.toString().split(",");
                // 手机号反转
                String phoneNumberReverse = dataLine[0];
                // 手机号
                String phoneNumber = dataLine[1];
                //  mac地址
                String macAddress = dataLine[2];
                //  用户访问浏览历史
                String userViewHistory = dataLine[3];
                //  解析用户访问浏览历史
                String[] userViewHistoryParse = userViewHistory.split("&&");
                //  创建StringBuffer用户拼接json
                StringBuffer stringBuffer = new StringBuffer();
                stringBuffer.append("[");
                for (String view : userViewHistoryParse) {
                    //  拼接json
                    String[] viewDetail = view.split("-");
                    String time = viewDetail[0];
                    String userAgent = viewDetail[1];
                    String visitUrl = viewDetail[2];
                    String upFlow = viewDetail[3];
                    String downFlow = viewDetail[4];
                    String json = "{\"time\":\"" + time + "\",\"userAgent\":\"" + userAgent + "\",\"visitUrl\":\"" + visitUrl + "\",\"upflow\":\"" + upFlow + "\",\"downFlow\":\"" + downFlow + "\"}";
                    stringBuffer.append(json);
                    stringBuffer.append(",");
                }
                stringBuffer.append("]");
                stringBuffer.deleteCharAt(stringBuffer.lastIndexOf(","));
                userViewHistory = stringBuffer.toString();
                //  将手机号反转作为rowkey
                ImmutableBytesWritable rowkey = new ImmutableBytesWritable(phoneNumberReverse.getBytes());
                // 筛选出rowkey为0的rowkey,某则导入的时候会报错
                if (rowkey.getLength()>0){
                    //  将其他列数据插入到对应列族中
                    Put put = new Put(phoneNumberReverse.getBytes());
                    put.addColumn("info".getBytes(), "phoneNumber".getBytes(), phoneNumber.getBytes());
                    put.addColumn("info".getBytes(), "macAddress".getBytes(), macAddress.getBytes());
                    put.addColumn("info".getBytes(), "userViewHistory".getBytes(), userViewHistory.getBytes());
                    context.write(rowkey, put);
                }
            }
        }
    
    1. 编写Reduce任务配置
    • 导入包的时候,注意导入的FileInputFormat和FileOutputFormat是下面这两个包
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public static void main(String[] args) throws Exception {
            final String INPUT_PATH= "hdfs://cluster/louisvv/weblog-20181121.txt";
            final String OUTPUT_PATH= "hdfs://cluster/louisvv/HFileOutput";
            Configuration conf=HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.1.22,192.168.1.50,192.168.1.51");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("zookeeper.znode.parent", "/hbase-unsecure");
            conf.set("hbase.master", "192.168.1.22:16010");
            String tableName="user-view";
            Connection connection = null;
            try {
                // 创建hbase connection
                connection = ConnectionFactory.createConnection(conf);
                //  获取hbase admin
                Admin admin=connection.getAdmin();
                //  创建hbase table
                Table table = connection.getTable(TableName.valueOf(tableName));
                //  设置mapreduce job相关内容
                Job job=Job.getInstance(conf);
                job.setJarByClass(BuckLoadImport.class);
                //  设置mapper class
                job.setMapperClass(BuckLoadImport.BuckLoadMap.class);
                //  设置map输出key类型为ImmutableBytesWritable
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                //  设置map输出value类型为put
                job.setMapOutputValueClass(Put.class);
    
                //  设置job的输出格式为HFileOutputFormat2
                job.setOutputFormatClass(HFileOutputFormat2.class);
    
                // 设置文件输入输出路径
                FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
                FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
    
                //  设置HFileOutputFormat2
                HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf(tableName)));
                //  等待程序退出
                job.waitForCompletion(true);
    
    1. 编写好job的配置后,等待MapReduce程序运行完毕,创建LoadIncrementalHFiles,调用doBulkLoad方法
     //  使用buckload方式导入刚才MapReduce程序生成的HFile
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(OUTPUT_PATH),admin,table,connection.getRegionLocator(TableName.valueOf(tableName)));
    
    1. 程序编写好了后,打包,上传到服务器上

    在执行程序之前,需要创建表,如果不创建,则会自动创建
    建表语句:

    create 'user-view',
     {NAME => 'desc', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'false', REPLICATION_SCOPE => '1'}, 
    {NAME => 'info', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'false', REPLICATION_SCOPE => '1'},SPLITS => ['0','1', '2', '3', '4','5','6','7','8','9']
    
    1. 运行程序
    hadoop jar /louisvv/HBase-test.jar cn.louisvv.weblog.hbase.BuckLoadImport
    

    截取部分MapReduce日志如下:

    通过日志,可以看到,一共输入1999940条数据,输出1999936条数据,过滤了4条有问题的数据

    18/11/23 13:30:43 INFO mapreduce.Job: Running job: job_1542881108771_0004
    18/11/23 13:31:30 INFO mapreduce.Job:  map 0% reduce 0%
    省略....
    18/11/23 14:07:33 INFO mapreduce.Job:  map 100% reduce 100%
    18/11/23 14:07:37 INFO mapreduce.Job: Job job_1542881108771_0004 completed successfully
    18/11/23 14:07:38 INFO mapreduce.Job: Counters: 49
    	File System Counters
    		FILE: Number of bytes read=18234502087
    		FILE: Number of bytes written=36506399063
    		FILE: Number of read operations=0
    		FILE: Number of large read operations=0
    		FILE: Number of write operations=0
    		HDFS: Number of bytes read=13423862333
    		HDFS: Number of bytes written=3778584104
    		HDFS: Number of read operations=1051
    		HDFS: Number of large read operations=0
    		HDFS: Number of write operations=30
    	Job Counters 
    		Launched map tasks=200
    		Launched reduce tasks=11
    		Data-local map tasks=200
    		Total time spent by all maps in occupied slots (ms)=4528492
    		Total time spent by all reduces in occupied slots (ms)=3817650
    		Total time spent by all map tasks (ms)=2264246
    		Total time spent by all reduce tasks (ms)=1908825
    		Total vcore-milliseconds taken by all map tasks=2264246
    		Total vcore-milliseconds taken by all reduce tasks=1908825
    		Total megabyte-milliseconds taken by all map tasks=9274351616
    		Total megabyte-milliseconds taken by all reduce tasks=7818547200
    	Map-Reduce Framework
    		Map input records=1999940
    		Map output records=1999936
    		Map output bytes=18226502217
    		Map output materialized bytes=18234515161
    		Input split bytes=20400
    		Combine input records=0
    		Combine output records=0
    		Reduce input groups=1927972
    		Reduce shuffle bytes=18234515161
    		Reduce input records=1999936
    		Reduce output records=5783916
    		Spilled Records=3999872
    		Shuffled Maps =2200
    		Failed Shuffles=0
    		Merged Map outputs=2200
    		GC time elapsed (ms)=365192
    		CPU time spent (ms)=5841130
    		Physical memory (bytes) snapshot=570273415168
    		Virtual memory (bytes) snapshot=1170857234432
    		Total committed heap usage (bytes)=627039010816
    	Shuffle Errors
    		BAD_ID=0
    		CONNECTION=0
    		IO_ERROR=0
    		WRONG_LENGTH=0
    		WRONG_MAP=0
    		WRONG_REDUCE=0
    	File Input Format Counters 
    		Bytes Read=13423769333
    	File Output Format Counters 
    		Bytes Written=3778584104
    
    1. 在hdfs上查看生产的HFile文件
    • 生成的HFile目录,发现其中有一个info目录,是生成的列族目录
      在这里插入图片描述
    • 查看info目录下的内容,生产的是Region文件
      在这里插入图片描述
    1. 使用BuckLoad方法向表中导入数据:
      我这里使用的是命令行方式导入,命令如下:
    HADOOP_CLASSPATH=`/usr/hdp/2.6.0.3-8/hbase/bin/hbase classpath` hadoop jar hbase-server-1.1.2.2.6.0.3-8.jar completebulkload /yw/HFileOutput user-view
    
    数据导入成功,部分日志如下:
    
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:java.library.path=:/usr/hdp/2.6.0.3-8/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64:/usr/hdp/2.6.0.3-8/hadoop/lib/native
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:os.version=3.10.0-514.el7.x86_64
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:user.name=hdfs
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/hdfs
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Client environment:user.dir=/usr/hdp/2.6.0.3-8/hbase/lib
    18/11/23 16:29:55 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ai-main:2181,ai-node3:2181,ai-node4:2181 sessionTimeout=90000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@757f675c
    18/11/23 16:29:55 INFO zookeeper.ClientCnxn: Opening socket connection to server ai-node4/192.168.1.51:2181. Will not attempt to authenticate using SASL (unknown error)
    18/11/23 16:29:55 INFO zookeeper.ClientCnxn: Socket connection established to ai-node4/192.168.1.51:2181, initiating session
    18/11/23 16:29:55 INFO zookeeper.ClientCnxn: Session establishment complete on server ai-node4/192.168.1.51:2181, sessionid = 0x366665b1dbf0295, negotiated timeout = 60000
    18/11/23 16:29:57 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x46c3a14d connecting to ZooKeeper ensemble=ai-main:2181,ai-node3:2181,ai-node4:2181
    18/11/23 16:29:57 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ai-main:2181,ai-node3:2181,ai-node4:2181 sessionTimeout=90000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@38fc5554
    18/11/23 16:29:57 INFO zookeeper.ClientCnxn: Opening socket connection to server ai-node3/192.168.1.50:2181. Will not attempt to authenticate using SASL (unknown error)
    18/11/23 16:29:57 INFO zookeeper.ClientCnxn: Socket connection established to ai-node3/192.168.1.50:2181, initiating session
    18/11/23 16:29:57 INFO zookeeper.ClientCnxn: Session establishment complete on server ai-node3/192.168.1.50:2181, sessionid = 0x2673ae5cb901733, negotiated timeout = 60000
    18/11/23 16:29:57 WARN mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://cluster/yw/HFileOutput/_SUCCESS
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:58 INFO hfile.CacheConfig: CacheConfig:disabled
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/f20edfdb89fc4630ae8c3791887d4852 first=80000042581 last=89999917251
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/fb6d6313abed41ef8fd5352442887031 first=00000006731 last=09999955271
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/ffa5997038414dceb9eb3b42d67b8adc first=70000014781 last=79999981941
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/4eaee167b73c41688d66440294a006d9 first=40000093231 last=49999941151
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/0c71bccc45704d129e0d0f8afce6ae5f first=1 last=19999956131
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/8b967a2cad6940619537382a2156a83c first=90000069581 last=99999997631
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/2907e292f624470ca71e4253491563f2 first=30000029371 last=39999882551
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/a67fd52d0125424b873c9ed49c0d8a7d first=20000123931 last=29999959681
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/00dcb6dc63c74d9a86a8d1ca1802b681 first=50000024931 last=59999976981
    18/11/23 16:29:59 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/yw/HFileOutput/info/c95917012c834d7991bf77830806370e first=60000015751 last=69999815851
    18/11/23 16:29:59 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
    18/11/23 16:29:59 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x2673ae5cb901733
    18/11/23 16:29:59 INFO zookeeper.ZooKeeper: Session: 0x2673ae5cb901733 closed
    18/11/23 16:29:59 INFO zookeeper.ClientCnxn: EventThread shut down
    
    1. 验证
      使用hbase shell 查看数据是否存在,就拿这条数据进行测试
    56279618741,14781697265,65:85:36:f9:b1:c0,
    1539787307-Mozilla/5.0 Macintosh; Intel Mac OS X 10_10_1 AppleWebKit/537.36 KHTML like Gecko Chrome/37.0.2062.124 Safari/537.36-https://dl.lianjia.com/ershoufang/102100802770.html-13660-6860
    &&1539786398-Mozilla/5.0 Windows NT 5.1 AppleWebKit/537.36 KHTML like Gecko Chrome/36.0.1985.67 Safari/537.36-https://dl.lianjia.com/ershoufang/102100576898.html-1959-91040
    &&1539785462-Mozilla/5.0 Windows NT 10.0 AppleWebKit/537.36 KHTML like Gecko Chrome/40.0.2214.93 Safari/537.36-https://dl.lianjia.com/ershoufang/102100762258.html-12177-53132
    

    进入hbase shell,查找该用户浏览信息

    hbase(main):002:0> get 'user-view','56279618741'
    COLUMN                                          CELL                                                                                                                                       
     info:macAddress                                timestamp=1542953074902, value=65:85:36:f9:b1:c0                                                                                           
     info:phoneNumber                               timestamp=1542953074902, value=14781697265                                                                                                 
     info:userViewHistory                           timestamp=1542953074902, value=[{"time":"1539787307","userAgent":"Mozilla/5.0 Macintosh; Intel Mac OS X 10_10_1 AppleWebKit/537.36 KHTML li
                                                    ke Gecko Chrome/37.0.2062.124 Safari/537.36","visitUrl":"https://dl.lianjia.com/ershoufang/102100802770.html","upflow":"13660","downFlow":"
                                                    6860"},{"time":"1539786398","userAgent":"Mozilla/5.0 Windows NT 5.1 AppleWebKit/537.36 KHTML like Gecko Chrome/36.0.1985.67 Safari/537.36",
                                                    "visitUrl":"https://dl.lianjia.com/ershoufang/102100576898.html","upflow":"1959","downFlow":"91040"},{"time":"1539785462","userAgent":"Mozi
                                                    lla/5.0 Windows NT 10.0 AppleWebKit/537.36 KHTML like Gecko Chrome/40.0.2214.93 Safari/537.36","visitUrl":"https://dl.lianjia.com/ershoufan
                                                    g/102100762258.html","upflow":"12177","downFlow":"53132"}]                                                      
    3 row(s) in 0.0420 seconds
    
    展开全文
  • 数据导入HBase时,若是小批量的数据,使用HBase提供的API就可以满足需求。 如果要灌入大量数据,使用API的方式导入,会占用大量的RegionServer的资源,影响该RegionServer上其他表的查询。 为了解决这种问题,HBase...

    前言

    • 数据导入HBase时,若是小批量的数据,使用HBase提供的API就可以满足需求。
    • 如果要灌入大量数据,使用API的方式导入,会占用大量的RegionServer的资源,影响该RegionServer上其他表的查询。

    为了解决这种问题,HBase官方提供了两种基于MapReduce的大量数据导入的方法:

    1. ImportTSV
    2. BuckLoad

    1. HBase导入数据的写入流程

    在这里插入图片描述

    1. 客户端Client向HRegionServer发起请求,要求写入数据。
    2. HRegionServer先将数据写入HLog中,即图中WAL过程。=》防止数据丢失
    3. HRegionServer将数据写入Memstore中,做缓存。
    4. 当内存中的数据达到阀值时,将数据Flush(刷写)到硬盘中,即写入到HFile中,并同时删除MemStore和HLog中的数据。

    2. HBase在hdfs中的存储结构

    我们知道,HBase的数据最终是以HFile的形式存储在hdfs中的,在hdfs中hbase的目录结构如下

    /HBase指定的存储目录/HBase表/表分布的Region/列族/HFile
    
    • 举个例子:
      现在有一张user表,表中有两个列族infodesc,user的region分布情况如下:(如下图示HBase WEB UI界面)
      在这里插入图片描述
    • 在hdfs中user表存储路径如下所示,无视两个.开头的文件,其他的目录名称均为上图中,Region对应的code码
      在这里插入图片描述
    • 点进去一个Region目录,目录内容如下图:
      其中info和desc这两个目录,就是HBase创建表时定义的列族,HFile存放在该目录下
      在这里插入图片描述
    • 点进去info列族这个文件夹,可以看到这个列族下的HFile
      在这里插入图片描述

    3. ImportTSV初次见面,请多指教~

    ImportTsv是HBase提供的一个命令行工具,将存储在hdfs上的数据文件,通过指定的分隔符解析后,导入到HBase表中。

    使用ImportTsv的方式导入数据与正常写入流程不同的是,跳过了WAL(Client写入到HLog)、Memcache(Client写入到MemStore)与Flush的过程,直接将HFlie文件移动到HBase表空间目录下即可,不影响HRegionServer的性能

    ImportTsv包含两种方式,能够将数据导入到HBase表中:

    1. 第一种,使用TableOutputformat在reduce端插入数据,但是这种方式导入大批量数据的时候有可能会存在问题,尤其是列比较多的宽表导入的时候,会出现RegionTooBusyException,导致数据丢失,如果数据量少,可以使用该方法
    2. 第二种(强烈推荐):先使用MapReduce程序生成HFlie文件,再执行LoadlncrementaHFlies命令,将文件移动到HBase表中对应的存储目录下。

    4. ImportTsv用法

    4.1 命令如下

    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
    

    默认情况下,是第一种导入方法,即将数据在reduce端直接导入到HBase中

    4.2 参数说明

    • -Dimporttsv.columns=a,b,c 指定导入到HBase表中的列
    • <tablename> 表名
    • <inputdir> 数据文件在hdfs上的目录地址(直接写目录地址即可,例:/data/data.txt

    如上参数,是默认直接导入所需基本的参数,如下是根据具体上传的文件指定参数

    • 如果数据文件不是使用默认的分隔符 \t进行分割,需要指定文件分隔符:
      -Dimporttsv.separator="," 指定分隔符为逗号
    • 如果使用第二种方法,要生成HFlie文件,添加如下参数:
      -Dimporttsv.bluck.output=/path/for/output 指定生成的HFlie文件在hdfs的存储目录地址

    另外,还有一些运行参数可供选择

    • -Dimporttsv.skip.bad.lines=true / false
      在导入过程中,如果有不符合分割标准的行,被称之为badlines,设置是否跳过,如果不跳过,则Map-Reduce任务停止
    • -Dimporttsv.timestamp=currentTimeAsLong 指定导入的时间戳
    • -Dimporttsv.mapper.class=my.Mapper
      指定用户自定义的Mapper程序,用于替换默认的Mapper:org.apache.hadoop.hbase.mapreduce.TsvImporterMapper
    • -Dmapreduce.job.name=jobName 用户指定MapReduce任务名称
    • -Dcreate.table= yes/no
      如果HBase中没有创建表,是否使用ImportTSV工具创建该表,如果设置为no,则在HBase中表必须存在
    • -Dno.strict= true / false 是否忽略HBASE表中的列族检查,默认为false

    如果考虑提升导入性能,可以参考如下参数

    • -Dmapreduce.map.speculative=false 关闭Map端推断
    • -Dmapreduce.reduce.speculative=false 关闭Reduce端推断

    5. 使用ImportTsv导入HBase实例

    5.1 数据文件user.txt

    这里有一个数据文件,user.txt, 文件内容如下:如有雷同,纯属巧合
    在这里插入图片描述
    对应的列分别为:手机号反转、姓名、性别、年龄、身份证号、户籍地址、生日
    现要将数据文件导入到user表中,user表建表语句如下:
    info列簇是用户基本信息
    desc列簇是其他描述信息

    HBase创建表如下

    create 'user',
    {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false',REPLICATION_SCOPE=>'1'},
    {NAME => 'desc', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536',IN_MEMORY => 'false', BLOCKCACHE => 'false',REPLICATION_SCOPE=>'1'},
    SPLITS => ['0','1', '2', '3', '4','5','6','7','8','9']
    

    5.2 需求

    需要将user.txt文件中的数据导入到user表中的info列族,row-key是用户手机号反转

    5.3 第一种方式导入(简单了解下,重点还是第二种)

    • 指定分隔符为逗号
    • 指定导入的列:第一列为HBASE_ROW_KEY,其他列都是info列族下的name,gender,age,idNumber,address,birthday列
    • 指定需要导入的表名:user
    • 指定数据文件在hdfs上的目录地址:/louisvv/user.txt
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,info:idNumber,info:address,info:birthday \
    user /louisvv/user.txt
    

    5.4 第二种方式导入(强烈推荐,重点介绍)

    • 指定分隔符
    • 指定产生的HFlie在hdfs上存放的临时目录: /louisvv/hfile_tmp
    • 指定导入的列:第一列为hbase_row_key, 其他列都是info列族下的name,gender,age,idNumber,address,birthday列
    • 指定需要导入的表名:user
    • 指定数据文件在hdfs上的目录地址:/louisvv/user.txt
    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," \
    -Dimporttsv.bulk.output=/louisvv/hfile_tmp \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,info:idNumber,info:address,info:birthday \
    user /yw/user.txt
    
    • 截取Map-Reduce部分日志如下:
      可以看到MR的基本信息,共2000002(二百万)条数据,ImportTsv 处理的Bad Lines=0,即所有数据都成功的导入了,整个过程大约花费了1分20秒左右的时间,可以看出ImportTsv的效率还是挺快的。
    2018-10-15 16:54:24,570 INFO  [main] mapreduce.Job:  map 0% reduce 0%
    2018-10-15 16:54:35,670 INFO  [main] mapreduce.Job:  map 20% reduce 0%
    2018-10-15 16:54:38,688 INFO  [main] mapreduce.Job:  map 33% reduce 0%
    2018-10-15 16:54:45,730 INFO  [main] mapreduce.Job:  map 50% reduce 0%
    2018-10-15 16:54:54,795 INFO  [main] mapreduce.Job:  map 70% reduce 0%
    2018-10-15 16:54:57,812 INFO  [main] mapreduce.Job:  map 83% reduce 0%
    2018-10-15 16:55:05,853 INFO  [main] mapreduce.Job:  map 100% reduce 0%
    2018-10-15 16:55:13,891 INFO  [main] mapreduce.Job:  map 100% reduce 69%
    2018-10-15 16:55:16,905 INFO  [main] mapreduce.Job:  map 100% reduce 73%
    2018-10-15 16:55:19,920 INFO  [main] mapreduce.Job:  map 100% reduce 76%
    2018-10-15 16:55:22,935 INFO  [main] mapreduce.Job:  map 100% reduce 80%
    2018-10-15 16:55:25,949 INFO  [main] mapreduce.Job:  map 100% reduce 83%
    2018-10-15 16:55:28,964 INFO  [main] mapreduce.Job:  map 100% reduce 86%
    2018-10-15 16:55:31,976 INFO  [main] mapreduce.Job:  map 100% reduce 90%
    2018-10-15 16:55:34,989 INFO  [main] mapreduce.Job:  map 100% reduce 93%
    2018-10-15 16:55:38,003 INFO  [main] mapreduce.Job:  map 100% reduce 97%
    2018-10-15 16:55:41,014 INFO  [main] mapreduce.Job:  map 100% reduce 100%
    2018-10-15 16:55:41,022 INFO  [main] mapreduce.Job: Job job_1539592382677_0007 completed successfully
    2018-10-15 16:55:41,149 INFO  [main] mapreduce.Job: Counters: 50
    	File System Counters
    		FILE: Number of bytes read=437400267
    		FILE: Number of bytes written=875354426
    		FILE: Number of read operations=0
    		FILE: Number of large read operations=0
    		FILE: Number of write operations=0
    		HDFS: Number of bytes read=133551726
    		HDFS: Number of bytes written=188589865
    		HDFS: Number of read operations=11
    		HDFS: Number of large read operations=0
    		HDFS: Number of write operations=3
    	Job Counters 
    		Launched map tasks=2
    		Launched reduce tasks=1
    		Data-local map tasks=2
    		Total time spent by all maps in occupied slots (ms)=302200
    		Total time spent by all reduces in occupied slots (ms)=281168
    		Total time spent by all map tasks (ms)=37775
    		Total time spent by all reduce tasks (ms)=35146
    		Total vcore-milliseconds taken by all map tasks=37775
    		Total vcore-milliseconds taken by all reduce tasks=35146
    		Total megabyte-milliseconds taken by all map tasks=154726400
    		Total megabyte-milliseconds taken by all reduce tasks=143958016
    	Map-Reduce Framework
    		Map input records=2000002
    		Map output records=2000002
    		Map output bytes=431420770
    		Map output materialized bytes=437400273
    		Input split bytes=182
    		Combine input records=2000002
    		Combine output records=1999629
    		Reduce input groups=1999281
    		Reduce shuffle bytes=437400273
    		Reduce input records=1999629
    		Reduce output records=11995686
    		Spilled Records=3999258
    		Shuffled Maps =2
    		Failed Shuffles=0
    		Merged Map outputs=2
    		GC time elapsed (ms)=4894
    		CPU time spent (ms)=206900
    		Physical memory (bytes) snapshot=6744469504
    		Virtual memory (bytes) snapshot=16308625408
    		Total committed heap usage (bytes)=7036469248
    	ImportTsv
    		Bad Lines=0
    	Shuffle Errors
    		BAD_ID=0
    		CONNECTION=0
    		IO_ERROR=0
    		WRONG_LENGTH=0
    		WRONG_MAP=0
    		WRONG_REDUCE=0
    	File Input Format Counters 
    		Bytes Read=133551544
    	File Output Format Counters 
    		Bytes Written=188589865
    
    • 接下来要将生产的临时HFlie文件导入到HBase中
    • 指定刚刚的HFlie目录地址:/louisvv/hfile_tmp
    • 指定需要导入的表:user
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /louisvv/hfile_tmp user
    
    • 运行日志如下
    2018-10-15 17:01:52,416 INFO  [main-SendThread(ai-main:2181)] zookeeper.ClientCnxn: Session establishment complete on server ai-main/192.168.1.22:2181, sessionid = 0x166666be050001d, negotiated timeout = 60000
    2018-10-15 17:01:52,781 WARN  [main] mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://cluster/louisvv/hfile_tmp/_SUCCESS
    2018-10-15 17:01:53,249 INFO  [LoadIncrementalHFiles-0] hfile.CacheConfig: CacheConfig:disabled
    2018-10-15 17:01:53,300 INFO  [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy]
    2018-10-15 17:01:53,331 INFO  [LoadIncrementalHFiles-0] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://cluster/louisvv/hfile_tmp/info/1c6888ef7b014b90908e48c47597ca0a first=13010007758 last=18999999393
    2018-10-15 17:01:53,742 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
    2018-10-15 17:01:53,743 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x166666be050001d
    2018-10-15 17:01:53,744 INFO  [main] zookeeper.ZooKeeper: Session: 0x166666be050001d closed
    2018-10-15 17:01:53,745 INFO  [main-EventThread] zookeeper.ClientCnxn: EventThread shut down
    
    • 验证
      使用hbase shell对数据进行验证,因为数据太多了,不能使用scan操作,这里就随机挑一个row_key为68398017181,我们进行一下get
      在这里插入图片描述
      查询结果如下:
      这里看到,中文在HBase中是以十六进制保存的,我们需要验证一下name字段是否和我们查询的相符即可
    	hbase(main):003:0> get 'user','68398017181'
    COLUMN                                          CELL                                                                                                                                       
     info:address                                   timestamp=1539756695119, value=\xE6\x89\xB6\xE9\xA3\x8E\xE5\x8E\xBF                                                                        
     info:age                                       timestamp=1539756695119, value=36                                                                                                          
     info:birthday                                  timestamp=1539756695119, value=198xxxxx                                                                                                   
     info:gender                                    timestamp=1539756695119, value=\xE5\xA5\xB3                                                                                                
     info:idNumber                                  timestamp=1539756695119, value=610XXXXXXXXXXX5584                                                                                          
     info:name                                      timestamp=1539756695119, value=\xE5\xBA\x9E\xE7\xBE\x8E\xE8\xAF\x97                                                                        
    6 row(s) in 0.0530 seconds
    

    我使用python程序进行转码:
    结果正确,则数据导入正确

    >>> print '\xE5\xBA\x9E\xE7\xBE\x8E\xE8\xAF\x97'.decode('utf-8')
    庞美诗
    

    如果喜欢本文章,请用小手点个赞~

    展开全文
  • 海量数据存储,HBase表中的数据能够容纳上百亿行*上百万列。2.面向列的存储,数据在表中是按照列进行存储的,能够动态的增加列并对列进行各种操作。3.准实时查询,HBase在海量的数据量下能够接近准实时的查询(百毫秒...
  • 通过Bulk Load导入HBase海量数据

    千次阅读 2015-07-16 19:27:19
    如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源,一个比较高效便捷的方法就是使用“Bulk Load”方法,即HBase提供的HFileOutputFormat类。 它是利用hbase数据信息按照特定格式存储在...
  • HBase海量数据存储

    2020-04-20 11:33:48
    1.海量数据存储,HBase表中的数据能够容纳上百亿行*上百万列。 2.面向列的存储,数据在表中是按照列进行存储的,能够动态的增加列并对列进行各种操作。 3.准实时查询,HBase在海量的数据量下能够接近准实时的查询...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 6,989
精华内容 2,795
关键字:

hbase海量数据导入