精华内容
下载资源
问答
  • HBase批量插入数据

    千次阅读 2019-08-06 14:18:40
    其实下文内容更适合在Spark中作为任务去执行,为了讲解,我先单独拎出来了,使用场景的话其实也很明显,就是大规模将数据写入HBase中。 关键点 大数据组件服务不可用(如断点、磁盘爆炸等)暂时不是本文内容所考虑...

    写在前面

    其实下文内容更适合在Spark中作为任务去执行,为了讲解,我先单独拎出来了,使用场景的话其实也很明显,就是大规模将数据写入HBase中。

    关键点

    1. 大数据组件服务不可用(如断点、磁盘爆炸等)暂时不是本文内容所考虑的内容。

    2. 首先,使用HBase1.0以上版本才支持的BufferedMutator,对HBase执行异步写入操作,使用mutate(List<? extends Mutation> var1)去执行写入操作。
      建议手动设定writeBufferSize参数,使用spark时设置为10 * 1024 * 1024(10MB),普通java程序设置为50 * 1024 * 1024(50MB),不宜过大。

    3. 加入容错机制,即BufferedMutator.ExceptionListener,防止在批量插入时因为触发Region Split或Region Blance等导致Region的短暂下线的相关异常,在接收到NotServingRegionException等异常时会进行一次重试。在重试之前需要将线程sleep几秒,不放心的话可以sleep几十秒,也是问题不大的。
      因为在组件可用的情况下,亿级数据一次split的操作在毫秒级的时间内就能完成,所以仅仅是一次重试便能在很大程度上减少插入数据时丢失数据的情况,并且对速度几乎无影响。

    4. 不要一次性插入几百万条数据,不仅内存吃不消而且插入速度也异常缓慢,个人实践下来每个批次3000-5000左右的数据量最佳。

    实现代码

    下面是具体代码,截取部分内容,其中关于HBase的连接和写入的表与列簇都需要自行修改。。。

            //HBase连接
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("zookeeper.znode.parent", "127.0.0.1");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            configuration.set("hbase.zookeeper.quorum", "hbase-unsecure");
            Connection connection = null;
            BufferedMutator table = null;
            try {
                //异常处理
                final BufferedMutator.ExceptionListener listener = (e, mutator) -> {
                    String failTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now());
                    System.out.println("fail time :" + failTime + " ,insert data fail,cause:" + e.getCause(0) + ",failed num:" + e.getNumExceptions());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                    //重试
                    String retryTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now());
                    System.out.println("its time to retry:" + retryTime);
                    for (int i = 0; i < e.getNumExceptions(); i++) {
                        org.apache.hadoop.hbase.client.Row row = null;
                        try {
                            row = e.getRow(i);
                            mutator.mutate((Put) row);
                        } catch (IOException ex) {
                            System.out.println("insert data fail,please check hbase status and row info : " + row);
                        }
                    }
                };
                //表名自行修改
                BufferedMutatorParams htConfig = new BufferedMutatorParams(TableName.valueOf("wangelai")).writeBufferSize(10 * 1024 * 1024).listener(listener);
                connection = ConnectionFactory.createConnection(configuration);
                table = connection.getBufferedMutator(htConfig);
                int count = 0;
                List<Put> puts = new ArrayList<>(2048);
                //模拟插入数据
                for (int i = 0; i < 10000000; i++) {
                    Put p = new Put(Bytes.toBytes(String.valueOf(i)));
                    p.addColumn(Bytes.toBytes("default"), Bytes.toBytes("id"), Bytes.toBytes(i));
                    puts.add(p);
                    count++;
                    if (count % 3000 == 0) {
                        System.out.println("count:" + count);
                        table.mutate(puts);
                        puts = new ArrayList<>(2048);
                    }
                }
                // 提交最后的内容
                System.out.println("Total count:" + count);
                table.mutate(puts);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    

    尾记

    不知不觉就到8月了,之间居然四个月没写博客,果然忙起来之后时间就过得飞快,哎。

    另外,VG加油,荣耀属于西恩!

    展开全文
  • Hbase 批量插入(mapReduce)

    千次阅读 2017-01-03 14:57:03
    Hbase利用MapReduce批量插入数据

     如果用Hbase提供的APi只能做少量的数据的Put,如果数据量很大10W以上用MapReudce还是很方便,单机测试10W数据插入Hbase不到1秒

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     */
    public class HbaseBatchImport {
    
        static class BatchMapper extends Mapper<LongWritable, Text, Text, Text> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSS");
            Text text = new Text();
    
            protected void map(LongWritable key, Text value, Context context) {
                try {
                    final String[] spliteds = value.toString().split("::");
                    Date date = new Date();
                    String dateFormat = simpleDateFormat.format(date);
                    final String rowKey = spliteds[0] + "_" + dateFormat;
                    text.set(rowKey);
                    context.write(text, value);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    
        static class BatchReducer extends TableReducer<Text, Text, NullWritable> {
    
            protected void reduce(Text key, Iterable<Text> values, Context context) {
    
                for (Text tx : values) {
                    try {
                        final String[] arrays = tx.toString().split("::");
                        Put put = new Put(key.getBytes());
                        put.addColumn("info".getBytes(), "name".getBytes(), arrays[1].getBytes());
                        put.addColumn("info".getBytes(), "address".getBytes(), arrays[2].getBytes());
                        put.addColumn("info".getBytes(), Bytes.toBytes("age"), arrays[3].getBytes());
                        context.write(NullWritable.get(), put);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    
        public static void main(String[] args) throws Exception {
    
    
            final Configuration configuration = new Configuration();
            //设置zookeeper
            configuration.set("hbase.zookeeper.quorum", "127.0.0.1");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            //设置hbase表名称
            configuration.set(TableOutputFormat.OUTPUT_TABLE, "students");
            //将该值改大,防止hbase超时退出
            configuration.set("dfs.socket.timeout", "180000");
    
            final Job job = new Job(configuration, "HBaseBatchImport");
            //设置reduce的个数
            job.setNumReduceTasks(3);
            job.setMapperClass(BatchMapper.class);
            job.setReducerClass(BatchReducer.class);
            //设置map的输出,不设置reduce的输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            //不再设置输出路径,而是设置输出格式类型
            job.setOutputFormatClass(TableOutputFormat.class);
            //设置数据的输入路径
            FileInputFormat.setInputPaths(job, "hdfs://MacBook-Pro.local:9000/input");
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    


    展开全文
  • hbase 批量插入api

    千次阅读 2015-07-27 21:18:02
    conf.set("hbase.rootdir", "hdfs://192.168.80.20:9000/hbase"); conf.set(TableOutputFormat.OUTPUT_TABLE, TableName); Job job = new Job(conf, HBaseImport.class.getSimpleName()); ...

    1、数据格式a.txt:

    1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
    1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
    1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
    1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
    1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
    1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
    1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
    1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
    1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
    1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
    1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
    1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
    1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
    1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
    1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
    1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
    1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
    1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
    1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
    1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
    1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
    1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200


    2、hbase 创建表 create 'wlan','cf'


    3、代码

    package com.utils;


    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;


    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;


    public class HBaseImport {
    static class BatchMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
    @Override
    protected void map(LongWritable key, Text value,
    Mapper<LongWritable, Text, LongWritable, Text>.Context context)
    throws IOException, InterruptedException {
    String line = value.toString();
    String[] splited = line.split("\t");
    SimpleDateFormat simpleDateFormatimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    String format = simpleDateFormatimpleDateFormat.format(new Date(Long.parseLong(splited[0].trim())));
    String rowKey=splited[1]+"_"+format;
    Text v2s = new Text();
    v2s.set(rowKey+"\t"+line);
    context.write(key, v2s);
    }
    }
    static class BatchReducer extends TableReducer<LongWritable, Text, NullWritable>{
    private String family="cf";//列族


    @Override
    protected void reduce(LongWritable arg0, Iterable<Text> v2s,
    Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)
    throws IOException, InterruptedException {
    for (Text v2 : v2s) {
    String[] splited = v2.toString().split("\t");
    String rowKey = splited[0];
    Put put = new Put(rowKey.getBytes());
    put.add(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());
    put.add(family.getBytes(), "rePortTime".getBytes(), splited[1].getBytes());
    put.add(family.getBytes(), "msisdn".getBytes(), splited[2].getBytes());
    put.add(family.getBytes(), "apmac".getBytes(), splited[3].getBytes());
    put.add(family.getBytes(), "acmac".getBytes(), splited[4].getBytes());
    put.add(family.getBytes(), "host".getBytes(), splited[5].getBytes());
    put.add(family.getBytes(), "siteType".getBytes(), splited[6].getBytes());
    put.add(family.getBytes(), "upPackNum".getBytes(), splited[7].getBytes());
    put.add(family.getBytes(), "downPackNum".getBytes(), splited[8].getBytes());
    put.add(family.getBytes(), "upPayLoad".getBytes(), splited[9].getBytes());
    put.add(family.getBytes(), "downPayLoad".getBytes(), splited[10].getBytes());
    put.add(family.getBytes(), "httpStatus".getBytes(), splited[11].getBytes());
    context.write(NullWritable.get(), put);
    }
    }
    }
    private static final String TableName = "waln_log";
    public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum","192.168.80.20,192.168.80.21,192.168.80.22");
    //conf.set("hbase.rootdir", "hdfs://cluster/hbase");
    conf.set("hbase.rootdir", "hdfs://192.168.80.20:9000/hbase");
    conf.set(TableOutputFormat.OUTPUT_TABLE, TableName);

    Job job = new Job(conf, HBaseImport.class.getSimpleName());
    TableMapReduceUtil.addDependencyJars(job);
    job.setJarByClass(HBaseImport.class);

    job.setMapperClass(BatchMapper.class);
    job.setReducerClass(BatchReducer.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TableOutputFormat.class);

    FileInputFormat.setInputPaths(job, "hdfs://192.168.80.20:9000/data");
    System.out.println("xxxxxxx1xxxxxxxx");
    job.waitForCompletion(true);
    }
    }



    展开全文
  • 1.客户端在进行大量put操作的时候,最好开启缓冲区 ...因为每一次Put都是一次RPC调用,开启缓冲区,批量插入,减少 RPC调用的次数 HTable.setAutoFlush(false); //新版本没有这个方法 2.或者使用putlist ...

    1.客户端在进行大量put操作的时候,最好开启缓冲区

    因为每一次Put都是一次RPC调用,开启缓冲区,批量插入,减少

    RPC调用的次数

    HTable.setAutoFlush(false); //新版本没有这个方法

     

    2.或者使用putlist

    展开全文
  • 利用JAVAHbase表中批量插入数据 插入数据的文件格式要求: 1.文件名格式为:命名空间名_表名_时间戳.时间戳 如:dsj_test_1624591726565.1624591726565 2.文件内格式: 第一行用于描述表结构 :行键 ,列簇名1:列名...
  • Hbase1.1.2采用javaAPI插入批量数据

    千次阅读 2015-11-13 14:40:18
    最近在做采用javaAPI批量Hbase插值的工作,记录一下Configuration cfg = HbaseConfiguration.create(); Connnection conn = ConnectionFactory.createConnection(); Table table = conn.getTable(TableName.value...
  • 标题这么麻烦,其实是为了让大家能搜到我啦,嘻嘻,这篇文章主要的内容就是本地有个txt文件,或者其他格式的文件,使用java,把这个文件里数据批量导入到hbase的某个表中。 这只是个简单的笨办法啦、适合类似我...
  • HBase批量写入数据

    千次阅读 2019-04-02 09:38:43
    HBase批量写入数据
  • hbase phoenix批量插入数据例子

    万次阅读 2013-10-18 15:43:10
    package com.hbase; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Stat
  • HBase 数据的插入可以使用Java API 来写Java 程序逐条倒入,但是不是很方便。利用Hive自带的一个Jar包,可以建立Hive和HBase的映射关系 利用Hive 的insert可以将批量数 据导入到HBase中,还可以通过 Hql 语句进行...
  • 批量插入Hbase数据

    千次阅读 2019-01-08 20:49:10
    批量Hbase插入数据,需要知道表名,列名,列簇等字段即可。指定Hbase链接     package com.cslc.asiancup.dfstohbase; import com.cslc.asiancup.utils.HbaseUtilJava; import org.apache.hadoop.conf....
  • HBase新版本Java API

    2019-01-09 21:12:56
    ###HBase新版本Java API 之前没有码全,这次增删改查全乎了,网上有很多例子,自己根据实际在用的收集总结了一下 导入的包 import java.io.IOException; import java.util.Arrays; import java.util.List; ...
  • hbase批量入库遇到的坑

    万次阅读 热门讨论 2018-01-20 15:16:05
    hbase批量入库的总结 最近这一段时间一直在研究hbase的批量入库,看似简单的问题其实埋着无数的坑...... 接下来就把我遇到的一些问题和解决的办法分享给大家,希望能让那些新接触到的人不至于像我一样走这么多...
  • HBase Shell及JavaAPI操作

    千次阅读 2017-08-12 16:36:30
    HBase Shell 及 Java API 操作
  • HBase--JavaAPI数据增删改查导入依赖以及创建连接对象和admin对象添加数据(更新)添加多条数据添加多条记录查询查询单条方式一(NavigableMap)查询单条方式二(CellScanner)查询所有一查询所有二删除 hbase版本:2.3.4 ...
  • Java,python操作Hbase 操作Hbase python操作Hbase 安装Thrift之前所需准备 安装Thrift 产生针对Python的Hbase的API 启动Thrift服务 执行python文件,对hbase进行操作 ...从Hbase批量读记录 pyt...
  • hbase client(java

    2016-04-13 13:41:00
    package hbase; import java.io.IOException;import java.util.ArrayList;import java.util.Collection;import java.util.Iterator;import java.util.List;import java.util.ListIterator;import java.util.Map; i...
  • hbase批量入库的总结

    千次阅读 2018-12-11 16:22:06
    最近这一段时间一直在研究hbase批量入库,看似简单的问题其实埋着无数的坑......&nbsp;&nbsp;接下来就把我遇到的一些问题和解决的办法分享给大家,希望能让那些新接触到的人不至于像我一样走这么多弯路。...
  • Hbase使用笔记java

    2020-08-28 14:30:46
    1.配置文件 依赖 <dependency> ...org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.1</version> <exclusions>
  • 1、批量查询数据 import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,122
精华内容 2,048
关键字:

hbase批量插入java

java 订阅