精华内容
下载资源
问答
  • HBase批量写入数据

    千次阅读 2019-04-02 09:38:43
    HBase批量写入数据

    一、HBase安装

    1、上传解压

    2、修改环境变量

    vi /etc/profile

    export HBASE_HOME=/home/hadoop/hbase
    export PATH=$PATH:$HBASE_HOME/bin
    

    3、修改配置文件

    vi hbase-env.sh

    export JAVA_HOME=/usr/jdk/
    export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
    export HBASE_MANAGES_ZK=false
    

    vi hbase-site.xml

    <configuration>
    <property>
    <name>hbase.master</name>
    <value>master1:60000</value>
    </property>
    <property>
    <name>hbase.master.maxclockskew</name> 
    <value>180000</value>
    </property>
    <property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop-cluster1/hbase</value>
    </property>
    <property>
    <name>hbase.cluster.distributed</name> 
    <value>true</value>
    </property>
    <property>
    <name>hbase.zookeeper.quorum</name>
    <value>master1ha,master2,master2ha</value>
    </property>
    <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hadoop/hbase/tmp/zookeeper</value>
    </property>
    </configuration>
    

    vi regionservers

    h2slave1
    h2slave2
    h2slave3
    

    4、发送到其他机器

    scp –r /home/hadoop/hbase hadoop@slave1:/home/hadoop
    scp –r /home/hadoop/hbase hadoop@slave2:/home/hadoop
    scp –r /home/hadoop/hbase hadoop@slave3:/home/hadoop
    

    5、启动

    start-hbase.sh
    

    6、访问

    http://master:60010/ 
    

    二、HBase数据模型

    在这里插入图片描述

    Row Key(行健): 用来检索记录的主键。

    访问HBase table中的行,只有三种方式:
    1.通过单个row key访问
    2.通过row key的range(正则)
    3.全表扫描

    Columns Family(列簇):

    HBase表中的每个列,都归属于某个列族。

    三、HBase命令

    创建表

    create '表名', '列族名1','列族名2','列族名N'
    

    查看所有表

    list
    

    描述表

    describe  ‘表名’
    

    判断表存在

    exists  '表名'
    

    判断是否禁用启用表

    is_enabled '表名'
    is_disabled ‘表名’
    

    添加记录

    put  ‘表名’, ‘rowKey’, ‘列族 : 列‘  ,  '值'
    

    查看记录rowkey下的所有数据

    get  '表名' , 'rowKey'
    

    查看表中的记录总数

    count  '表名'
    

    获取某个列族

    get '表名','rowkey','列族'
    

    获取某个列族的某个列

    get '表名','rowkey','列族:列’
    

    删除记录

    delete  ‘表名’ ,‘行名’ , ‘列族:列'
    

    删除整行

    deleteall '表名','rowkey'
    

    删除一张表

    先要屏蔽该表,才能对该表进行删除
    第一步 disable ‘表名’ ,第二步  drop '表名'
    

    清空表

    truncate '表名'
    

    查看所有记录

    scan "表名"  
    

    查看某个表某个列中所有数据

    scan "表名" , {COLUMNS=>'列族名:列名'}
    

    四、Java API操作HBase

    Hbase依赖zookeeper

    public class HbaseTest {
       /**
         * 配置ss
         */
        static Configuration config = null;
        private Connection connection = null;
        private Table table = null;
    
    	@Before
    	public void init() throws Exception {
    		config = HBaseConfiguration.create();// 配置
    		config.set("hbase.zookeeper.quorum", "master1-vsp.com");// zookeeper地址
    		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
    		connection = ConnectionFactory.createConnection(config);
    		table = connection.getTable(TableName.valueOf("test4"));//表名
    	}
    
    	/**
    	 * 创建一个表
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	public void createTable() throws Exception {
    		// 创建表管理类
    		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
    		// 创建表描述类
    		TableName tableName = TableName.valueOf("test4"); // 表名称
    		HTableDescriptor desc = new HTableDescriptor(tableName);
    		// 创建列族的描述类
    		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
    		// 将列族添加到表中
    		desc.addFamily(family);
    		HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
    		// 将列族添加到表中
    		desc.addFamily(family2);
    		// 创建表
    		admin.createTable(desc); // 创建表
    	}
        
    	/**
    	 * 删除一个表
    	 * 
    	 * @throws Exception
    	 */
    	@Test
    	@SuppressWarnings("deprecation")
    	public void deleteTable() throws MasterNotRunningException,
    			ZooKeeperConnectionException, Exception {
    		HBaseAdmin admin = new HBaseAdmin(config);
    		admin.disableTable("test4");
    		admin.deleteTable("test4");
    		admin.close();
    	}
    
    	/**
    	 * 向hbase中增加数据
    	 * @throws Exception
    	 */
    	@SuppressWarnings({ "deprecation", "resource" })
    	@Test
    	public void insertData() throws Exception {
    		
    		Put put = new Put("wangsanfeng_1234".getBytes());
    		put.add("info".getBytes(),"name".getBytes(),"zhangsan".getBytes());
    		put.add("info".getBytes(),"age".getBytes(),"23".getBytes());
    		put.add("info".getBytes(),"sex".getBytes(),"nan".getBytes());
    		put.add("info".getBytes(),"address".getBytes(),"safa".getBytes());
    	
    		table.put(put);
    		
    	}
    	
    	 /**
          * 单条查询
    	  * @throws Exception
    	  */
    		@Test
    		public void queryData() throws Exception {
    			Get get = new Get(Bytes.toBytes("1234"));
    			Result result = table.get(get);
    			System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("namessss"))));
    			System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex"))));
    		}
    	
    		/**
    		 * 全表扫描
    		 * @throws Exception
    		 */
    		@Test
    		public void scanData() throws Exception {
    			Scan scan = new Scan();
    			//scan.addFamily(Bytes.toBytes("info"));
    			//scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
    			scan.setStartRow(Bytes.toBytes("wangsf_0"));
    			scan.setStopRow(Bytes.toBytes("wangwu"));
    			ResultScanner scanner = table.getScanner(scan);
    			for (Result result : scanner) {
    				System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    				System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    				//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
    				//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
    			}
    		}
    	
    		/**
    		 * 全表扫描的过滤器  列值过滤器
    		 * @throws Exception
    		 */
    		@Test
    		public void scanDataByFilter1() throws Exception {
    	
    			// 创建全表扫描的scan
    			Scan scan = new Scan();
    			//过滤器:列值过滤器
    			SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),
    					Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
    					Bytes.toBytes("zhangsan2"));
    			// 设置过滤器
    			scan.setFilter(filter);
    	
    			// 打印结果集
    			ResultScanner scanner = table.getScanner(scan);
    			for (Result result : scanner) {
    				System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    				System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    				//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
    				//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
    			}
    	
    		}
    		
    		/**
    		 * rowkey过滤器
    		 * @throws Exception
    		 */
    		@Test
    		public void scanDataByFilter2() throws Exception {
    			
    			// 创建全表扫描的scan
    			Scan scan = new Scan();
    			//匹配rowkey以wangsenfeng开头的
    			RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^12341"));
    			// 设置过滤器
    			scan.setFilter(filter);
    			// 打印结果集
    			ResultScanner scanner = table.getScanner(scan);
    			for (Result result : scanner) {
    				System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("password"))));
    				System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
    				//System.out.println(Bytes.toInt(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("password"))));
    				//System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"))));
    			}			
    		}
    		
    		/**
    		 * 匹配列名前缀过滤器
    		 * @throws Exception
    		 */
    		@Test
    		public void scanDataByFilter3() throws Exception {
    			
    			// 创建全表扫描的scan
    			Scan scan = new Scan();
    			//匹配rowkey以wangsenfeng开头的
    			ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
    			// 设置过滤器
    			scan.setFilter(filter);
    			// 打印结果集
    			ResultScanner scanner = table.getScanner(scan);
    			for (Result result : scanner) {
    				System.out.println("rowkey:" + Bytes.toString(result.getRow()));
    				System.out.println("info:name:"
    						+ Bytes.toString(result.getValue(Bytes.toBytes("info"),
    								Bytes.toBytes("name"))));
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")) != null) {
    					System.out.println("info:age:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    									Bytes.toBytes("age"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex")) != null) {
    					System.out.println("infi:sex:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    									Bytes.toBytes("sex"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name")) != null) {
    					System.out
    					.println("info2:name:"
    							+ Bytes.toString(result.getValue(
    									Bytes.toBytes("info2"),
    									Bytes.toBytes("name"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age")) != null) {
    					System.out.println("info2:age:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    									Bytes.toBytes("age"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("sex")) != null) {
    					System.out.println("info2:sex:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    									Bytes.toBytes("sex"))));
    				}
    			}
    			
    		}
    		
    		/**
    		 * 过滤器集合
    		 * @throws Exception
    		 */
    		@Test
    		public void scanDataByFilter4() throws Exception {
    			
    			// 创建全表扫描的scan
    			Scan scan = new Scan();
    			//过滤器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)
    			FilterList filterList = new FilterList(Operator.MUST_PASS_ONE);
    			//匹配rowkey以wangsenfeng开头的
    			RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^wangsenfeng"));
    			//匹配name的值等于wangsenfeng
    			SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("info"),
    					Bytes.toBytes("name"), CompareFilter.CompareOp.EQUAL,
    					Bytes.toBytes("zhangsan"));
    			filterList.addFilter(filter);
    			filterList.addFilter(filter2);
    			// 设置过滤器
    			scan.setFilter(filterList);
    			// 打印结果集
    			ResultScanner scanner = table.getScanner(scan);
    			for (Result result : scanner) {
    				System.out.println("rowkey:" + Bytes.toString(result.getRow()));
    				System.out.println("info:name:"
    						+ Bytes.toString(result.getValue(Bytes.toBytes("info"),
    								Bytes.toBytes("name"))));
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")) != null) {
    					System.out.println("info:age:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    									Bytes.toBytes("age"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("sex")) != null) {
    					System.out.println("infi:sex:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info"),
    									Bytes.toBytes("sex"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name")) != null) {
    					System.out
    					.println("info2:name:"
    							+ Bytes.toString(result.getValue(
    									Bytes.toBytes("info2"),
    									Bytes.toBytes("name"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age")) != null) {
    					System.out.println("info2:age:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    									Bytes.toBytes("age"))));
    				}
    				// 判断取出来的值是否为空
    				if (result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("sex")) != null) {
    					System.out.println("info2:sex:"
    							+ Bytes.toInt(result.getValue(Bytes.toBytes("info2"),
    									Bytes.toBytes("sex"))));
    				}
    			}
    }
    

    五、HBase批量写入数据

    	/**
    	 * 配置类
    	 */
    	public class ServerConfigs {
    	
    		public static final String ZK = "10.68.24.161:2181";//zookeper配置
    	    
    	    public static final String TOPIC = "ad_upload_event";
    	 
    	    //public static final String BROKER_LIST = "172.17.245.23:9092";//kalfa配置
    	 
    	    public static final String GROUP_ID = "test_group";
    	 
    	    public static final int CACHE_LIST_SIZE = 100; //批量提交数据条数
    	}
    	
    	/**
          * 工具类
          */
    	public class HBaseUtils {
    	    
    		ThreadLocal<List<Put>> threadLocal = new ThreadLocal<List<Put>>();
    	    HBaseAdmin admin = null;
    	    Connection conn = null;
    	    
    	    /**
    	     * 根据表名获取到HTable实例
    	     */
    	    public HTable getTable(String tableName) {
    	 
    	        HTable table = null;
    	        try {
    	            // table = new HTable(configuration, tableName);
    	            final TableName tname = TableName.valueOf(tableName);
    	            table = (HTable) conn.getTable(tname);
    	 
    	        } catch (IOException e) {
    	            e.printStackTrace();
    	        }
    	 
    	        return table;
    	    }
    	    
    	    /**
    	     * 添加单条记录到HBase表
    	     *
    	     * @param tableName HBase表名
    	     * @param rowkey    HBase表的rowkey
    	     * @param cf        HBase表的columnfamily
    	     * @param column    HBase表的列key
    	     * @param value     写入HBase表的值value
    	     */
    	    public void put(String tableName, String rowkey, String cf, String column, String value) {
    	 
    	        HTable table = getTable(tableName);
    	        Put put = new Put(Bytes.toBytes(rowkey));
    	        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
    	        try {
    	            table.put(put);
    	        } catch (IOException e) {
    	            e.printStackTrace();
    	        }
    	    }
    	    
    	    
    	    /**
    	     * 批量添加记录到HBase表,同一线程要保证对相同表进行添加操作!
    	     *
    	     * @param tableName HBase表名
    	     * @param rowkey    HBase表的rowkey
    	     * @param cf        HBase表的columnfamily
    	     * @param column    HBase表的列key
    	     * @param value     写入HBase表的值value
    	     */
    	    public void bulkput(String tableName, String rowkey, String cf, String column, String value) {
    	        try {
    	            List<Put> list = threadLocal.get();
    	            if (list == null) {
    	                list = new ArrayList<Put>();
    	            }
    	            Put put = new Put(Bytes.toBytes(rowkey));
    	            put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
    	            list.add(put);
    	            if (list.size() >= ServerConfigs.CACHE_LIST_SIZE) {
    	                HTable table = getTable(tableName);
    	                table.put(list);
    	                list.clear();
    	            } else {
    	                threadLocal.set(list);
    	            }
    	       //  table.flushCommits();
    	        } catch (IOException e) {
    	            e.printStackTrace();
    	        }
    	    }
    	 
    	
    	    private static HBaseUtils instance = null;
    	    
    	    public static synchronized HBaseUtils getInstance() {
    	        if (null == instance) {
    	            instance = new HBaseUtils();
    	        }
    	        return instance;
    	    }
    	    
    	    
    	    private HBaseUtils() {
    	        Configuration configuration = new Configuration();
    	        configuration.set("hbase.zookeeper.quorum", ServerConfigs.ZK);
    	        configuration.set("hbase.rootdir", "hdfs://10.68.24.161:8020/hbase");
    	 
    	        try {
    	            conn = ConnectionFactory.createConnection(configuration);
    	            admin = new HBaseAdmin(configuration);
    	        } catch (IOException e) {
    	            e.printStackTrace();
    	        }
    	    }
    	
    	}
    	
        /**
          * 双线程,插入20W条数据,用了20s
          */
        public class Test {
    		  
           public static void main(String[] args) {
     
    	       //HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
    	       //System.out.println(table.getName().getNameAsString());
    	       long start = System.currentTimeMillis();
    	
    	       String tableName = "test4";
    	       // String rowkey = "1";
    	       for (int i = 0; i < 100000; i++) {
    	           HBaseUtils.getInstance().bulkput(tableName, i + "", "info", "name", String.valueOf(100321 + i));
    	       }
    	       new Thread(new Runnable() {
    	           public void run() {
    	               for (int i = 100000; i < 200000; i++) {
    	                   HBaseUtils.getInstance().bulkput("test4", i + "", "info", "name", String.valueOf(100321 + i));
    	               }
    	           }
    	       }).start();
    	 
    	       System.out.println(System.currentTimeMillis() - start);
    	   }
    	 
    }
    

    更多java、大数据学习面试资料,请扫码关注我的公众号:
    在这里插入图片描述

    展开全文
  • 想优化hbase批量写入速度,搞了个偏门套路,也不知道会不会出问题: 1)使用HTable , hTable = new HTable(conf, TableName.valueOf("storm_wxgz")); hTable.setWriteBufferSize(10 * 1024 * 1024);//10M ...

    想优化hbase批量写入速度,搞了个偏门套路,也不知道会不会出问题:

       1)使用HTable ,

    hTable = new HTable(conf, TableName.valueOf("storm_wxgz"));
    hTable.setWriteBufferSize(10 * 1024 * 1024);//10M
    hTable.setAutoFlushTo(false);

    2)使用定时器,每一秒去刷新一次

     

     

    也不知道对查询是否有影响。

     

    展开全文
  • scala实现hbase批量写入

    千次阅读 2019-08-19 18:29:28
    一、编程环境: 1、JDK1.8 2、scala2.11.7 3、hadoop3.0.0、hbase2.1.0、spark2.4.0 ...因为我这里使用使用的是cdh发行版本,hbase的jar包位置为:/opt/cloudera/parcels/CDH/lib/hbase/lib/ ...

    一、编程环境:

    1、JDK1.8

    2、scala2.11.7

    3、hadoop3.0.0、hbase2.1.0、spark2.4.0

    操作系统:centos7.6,编译器使用idea2019

    二、实现步骤:

    1、添加hbase的jar包

    因为我这里使用使用的是cdh发行版本,hbase的jar包位置为:/opt/cloudera/parcels/CDH/lib/hbase/lib/

    将该目录下的jar文件拷贝至特定目录,如下:

    #mkdir /data/lib/hbase

    #cp /opt/cloudera/parcels/CDH/lib/hbase/lib/* /data/lib/hbase

    2、使用idea创建scala项目,maven添加如下依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <hadoop.version>3.0.0</hadoop.version>
      </properties>
    
      <dependencyManagement>
        <dependencies>
          <!-- Camel BOM -->
          <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-parent</artifactId>
            <version>2.24.1</version>
            <scope>import</scope>
            <type>pom</type>
          </dependency>
        </dependencies>
      </dependencyManagement>
    
      <dependencies>
    
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-core</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-scala</artifactId>
        </dependency>
    
        <!-- scala -->
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>2.11.7</version>
        </dependency>
        <dependency>
          <groupId>org.scala-lang.modules</groupId>
          <artifactId>scala-xml_2.11</artifactId>
          <version>1.0.6</version>
        </dependency>
    
        <!-- logging -->
        <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-api</artifactId>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-core</artifactId>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-slf4j-impl</artifactId>
          <scope>runtime</scope>
        </dependency>
    
        <!-- testing -->
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-test</artifactId>
          <scope>test</scope>
        </dependency>
    
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.6.6</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.2.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.2.0</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.2.0</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-core</artifactId>
          <version>1.2.1</version>
        </dependency>
    
    <!--    <dependency>-->
    <!--      <groupId>org.apache.hbase</groupId>-->
    <!--      <artifactId>hbase-client</artifactId>-->
    <!--      <version>2.1.0</version>-->
    <!--    </dependency>-->
    
    <!--    <dependency>-->
    <!--      <groupId>org.apache.hbase</groupId>-->
    <!--      <artifactId>hbase-server</artifactId>-->
    <!--      <version>2.1.0</version>-->
    <!--    </dependency>-->
    
    <!--    <dependency>-->
    <!--      <groupId>org.apache.hbase</groupId>-->
    <!--      <artifactId>hbase-common</artifactId>-->
    <!--      <version>2.1.0</version>-->
    <!--    </dependency>-->
    
    <!--    <dependency>-->
    <!--      <groupId>org.apache.hbase</groupId>-->
    <!--      <artifactId>hbase</artifactId>-->
    <!--      <version>2.1.0</version>-->
    <!--      <type>pom</type>-->
    <!--    </dependency>-->
    
    <!--    <dependency>-->
    <!--      <groupId>org.apache.hbase</groupId>-->
    <!--      <artifactId>hbase-mapreduce</artifactId>-->
    <!--      <version>2.1.0</version>-->
    <!--    </dependency>-->
    
        <dependency>
          <groupId>org.glassfish</groupId>
          <artifactId>javax.el</artifactId>
          <version>3.0.1-b08</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
            <exclusion>
              <artifactId>jackson-databind</artifactId>
              <groupId>com.fasterxml.jackson.core</groupId>
            </exclusion>
          </exclusions>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
            <exclusion>
              <artifactId>jackson-databind</artifactId>
              <groupId>com.fasterxml.jackson.core</groupId>
            </exclusion>
          </exclusions>
        </dependency>
    
        <dependency>
          <groupId>org.gdal</groupId>
          <artifactId>gdal</artifactId>
          <version>2.4.0</version>
        </dependency>
    
    
      </dependencies>

    注意:这里引入的hadoop版本尽量和系统中安装的版本一致,以免出现代码冲突的情况。

    3、idea添加hbase的jar包

    点击idea的file-Project Structure设置,将步骤1中创建的jar包引入,如下:

    4、添加maven的打包工具,这一步非必要,如果使用idea自带的打包工具无法正常打包,可以使用该方式。

    <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.5.5</version>
            <configuration>
              <archive>
                <manifest>
                  <mainClass>**this is your mainclass**</mainClass>
                </manifest>
              </archive>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
    
            <executions>
              <execution>
                <id>make-assemble</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>

    5、编写代码

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object HbaseOpe {
      def writeTxtToHbase()={
        val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate()
    
        val sc = spark.sparkContext
    
        //表名称
        val tablename= "tb:table3"
    
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    
        val jobConf = new JobConf(hbaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
    
        //hdfs文件
        val txtpath="/ZF/2018001.txt"
        val txtRdd=sc.textFile(txtpath)
        txtRdd.map(_.split(",")).map(arr=>{
          val put = new Put(Bytes.toBytes(arr(0)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col1"),Bytes.toBytes(arr(1)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col2"),Bytes.toBytes(arr(2)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col3"),Bytes.toBytes(arr(3)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col4"),Bytes.toBytes(arr(4)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col5"),Bytes.toBytes(arr(5)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col6"),Bytes.toBytes(arr(6)))
          put.addColumn(Bytes.toBytes("static"),Bytes.toBytes("col7"),Bytes.toBytes(arr(7)))
          (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      }
    
    }

    使用spark-submit提交,我这里测试的是10000000条记录,7列数据,三台主机,共耗时4分钟左右。

    展开全文
  • 5.hbase 批量导入数据

    千次阅读 2019-03-13 09:21:02
    导入HBase 自定义MR 生成HFile 导入HBase 注意事项 源码下载 Bulk Loading 向hbase写入数据常用两种方式: 客户端 API写入 Mapreduce任务TableOutputFormat格式输出 然而,对于超大量的数据写入,这两种...
     
     
    

    Bulk Loading

    向hbase写入数据常用两种方式:

    • 客户端 API写入
    • Mapreduce任务TableOutputFormat格式输出

    然而,对于超大量的数据写入,这两种方式都不合适,会非常占用内存和耗时而且JVM GC和Hbase compaction都会急剧增加。为此,Hbase提供了另一种很好的解决方式——Bulk Loading,顾名思义就是批量导入,它通过预生成Hbase存储格式文件Hfile然后直接拷贝到对应RegionServer的方式减少导入时间和集群性能损耗。

    包括如下几步:

    • 从数据库或其他源抽取待写入数据,写入HBase所在HDFS集群
    • 数据预生成为HFile文件,注意生成的HFile文件和导入HDFS的文件差不多大,主要保证集群空间足够大
    • 导入数据到HBase

    后两步又分为不同场景处理:

    • 对于Hbase列不变的表,可以使用预定义工具importtsv来完成整个过程
    • 但是对于列一直在动态变化的表,必须自己写MR程序来完成整个过程

    Importtsv

    使用importtsv工具生成HFile必须保证导入HDFS的文件内容格式为tsv,也就是每行内容是使用相同分隔符分割的同字段数的内容,第一字段对应row key,后面每列对应hbase表的一字段(Qualifier),类似如下:

    row1^v11^v12^v13
    row2^v21^v22^v23
    row3^v31^v32^v33
    

    生成HFile

    首先,如下生成HFile,参数表示从HDFS处$input读入数据到表$table中,数据以^分割,对应的数据表字段名为依次为c1、c2、c3,生成的HFile保存在$output处。

    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.4.4.jar importtsv \
            -Dimporttsv.columns=HBASE_ROW_KEY,b:c1,b:c2,b:c3 \
    		-Dimporttsv.separator=^ \
            -Dimporttsv.bulk.output=$output \
            $table $input \
            >> $log_file 2>&1
    

    导入HBase

    然后,如下将生成的HFile导入Hbase 表$table中。

    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.4.4.jar completebulkload $output $table >> log/${table}-load.log 2>&1
    

    自定义MR

    使用importtsv固然很好,但是如果导入的数据列名是动态变化的,比如如下每行数据对应的hbase 字段名和个数都不一样,这种情况就没法用importtsv来处理,需要自己写MR来处理生成HFile和加载到HBase。

    row1 (k1,v11) (k2,v12)
    row2 (k2,v22) (k3,v23) (k4,v24)
    row3 (k3,v31)
    

    生成HFile

    这种情况需要自行考虑MR的输入,因为具体每列数据对应的字段名到底是什么,是没办法像importtsv一样提前指定的,因此必须在输入数据中包含此信息,比如如下,用|分别分割行键、字段名列表、对应值列表。

    row1|k1^k2|v11^v22
    row2|k2^k3^k4|v22^c23^v24
    row3|k3|v31
    
    • 设定格式

    这里为了生成HFile格式,MR的输入输出和中间map结果设置如下,最终生成文件格式为HFileOutputFormat2。

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    
    • 设定 Mapper

    任务的Mapper部分主要完成读入并拆分数据,生成指定格式中间文件即可,如下,按照之前说的数据输入,先按照|拆分出三部分,按照rowKey,colKey和colVal填入put中写入文件即可,这里的列族COL_FAMILY是强制指定的。

    	public void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {
    		String line = value.toString();
            String[] list = line.split("\\|", -1);
            long length = list.length;
    
            //每行数据输入格式为rowkey|c1^c2^c3|v1^v2^v3
            if( length != 3 || list[1].isEmpty() || list[2].isEmpty()) {
                return;
            }
    
            String Rowkey=list[0];
    
            String[] colKey = list[1].split("\\^", -1);
            String[] colVal = list[2].split("\\^", -1);
            if (colKey.length!=colVal.length){
                return;
            }
    
            long colLen = colKey.length;
    
            //拼装rowkey和put
            ImmutableBytesWritable PutRowkey=new ImmutableBytesWritable(Rowkey.getBytes());
            Put put=new Put(Rowkey.getBytes());
    
            for (int i= 0; i<colLen; i++){
                if (!colVal[i].isEmpty()){
                    put.addColumn(COL_FAMILY.getBytes(), colKey[i].getBytes(), colVal[i].getBytes());
                }
            }
    
            context.write(PutRowkey, put);
        }
    
    • 设定Reducer

    任务的Reducer不用自己指定,如下HFileOutputFormat2.configureIncrementalLoad会根据hbase对应表信息设置reducer和对应的reducer 个数。

    //配置MapReduce作业,以执行增量加载到给定表中
    Configuration hbaseConf=HBaseConfiguration.create();
    hbaseConf.addResource(new Path("/home/wenzhou/hbase/conf/hbase-site.xml"));
    
    Connection conn=ConnectionFactory.createConnection(hbaseConf);
    Table table=conn.getTable(TableName.valueOf(tname));
    Admin admin=conn.getAdmin();
    
    HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf(tname)));
    

    导入HBase

    确认上述数据生成成功后,如下即可加载到hbase表中。

    //生成的HFile Bulkload导入
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);
    loader.doBulkLoad(new Path(outputReal), admin, table, conn.getRegionLocator(TableName.valueOf(tname)));
    

    注意事项

    • 实际reducer个数和表的region个数相同,大量数据首次导入必须预分区,否则默认只有一个redion对应一个reducer,很容易Mem Overflow且数据倾斜厉害导致耗时很长

    源码下载

    对应源码下载链接,可直接用于生产环境

    开发参考文档

    原创,转载请注明来自

    展开全文
  • 第10章 HBase:Hadoop数据库10.7 HBase 批量导入10.7.1 批量导入数据的方法向HBase表中导入一条数据可以使用HBase Shell的put命令或HBase API的Put类,但是面对海量数据如何批量导入呢? 一般有三种方法:ImportTsv...
  • hbase超大量数据导入方式,原理就是提前生成相应的Hfile文件,再导入hbase中。 要导入数据的列相同的情况,使用现成的预定义工具importtsv 要导入数据的列不相同的情况,自定义MR实现 importtsv 说明:$output为...
  • Hbase批量导入&批量删除

    千次阅读 2019-04-01 12:11:37
    * 将list按行写入到txt文件中 * @param strings phonelist * @param path 输出路径 * @throws Exception 异常 */ private void writeFileContext(List<String> strings, String path) throws Exception { ...
  • Hbase 批量写入操作

    2013-11-17 15:21:29
    Code  Nmr List List Size Code Avarage Elapsed Time (sec) 1 List&lt;Put&gt;batchAllRows; 250.000 table.setAutoFlush(false); for (Put mRow : batchAllRows) {...}ta...
  • HBase 批量导入测试

    2015-04-22 02:49:02
    导入方法1. 采用mapreduce, 在mapper里按行产生Put, 然后用HTable.put, 不用reducer. 测试性能不忍直视, 直接排除。 导入方法2. 采用mapreduce, 先生成HFile, 再用LoadIncrementalHFiles. 用时1小时50分钟,才完成...
  • hbase批量导入之bulkloader使用实战

    千次阅读 2015-11-19 17:04:01
    HBase大量导入数据时,使用client方式将会导致大量网络IO以及损耗集群计算资源,Hbase官方的bulkloader可以很好解决这个场景。 bulkloader支持将写成HFile格式的数据直接放入HBase,这个过程分为: 1.准备HFile...
  • 文章目录BulkLoad将大规模数据导入HBase一、数据准备二、上传到HDFS上三、通过MR生成Hfile文件四、加载到HBase中五、查看数据HBase中数据 BulkLoad将大规模数据导入HBase 有个1个T的文件,我要导入Hbase有什么方法?...
  • 在用该方法之前,请确保hadoop、hbase、phoenix已经启动。在phoenix的安装目录下使用以下代码,启动MRHADOOP_CLASSPATH=/opt/hbase-0.98.8-hadoop2/conf hadoop jar phoenix-4.9.0-HBase-0.98-client.jar org.apache...
  • * HBase操作工具类:缓存模式多线程批量提交作业到hbase * * @Auther: ning.zhang * @Email: ning.zhang@phicomm.com * @CreateDate: 2018/7/30 */ public class HBaseUtils { ThreadLocal&lt;List&...
  • 1.创建一个hadoop项目(导入hadoop相关包,hbase-0.90.5.jar,zookeeper-3.3.2.jar) 2.创建测试数据(2.txt 上传至hdfs://127.0.0.1:9000/tmp/2.txt) 1150,content,email,xx@gmail.com1152,content,email,xx@cc....
  • 2、部署hbase1.0.0 3、下载phoenix-4.6.0-HBase-1.0。下载地址(http://mirror.nus.edu.sg/apache/phoenix/phoenix-4.6.0-HBase-1.0/bin/phoenix-4.6.0-HBase-1.0-bin.tar.gz) 4、phoenix 集成HBas...
  • HBASE批量数据导入

    2014-06-26 17:19:09
    把MYSQL中的数据导入hbase中,采用HBASE自带的BULK加载工具完成。过程分三步: 1.从mysql中导出数据为CSV或TSV格式的文本文件 2.利用importtsv工具转换Tsv文件为hbase的数据文件格式HFILE 3.利用...
  • spark批量导入hbase

    千次阅读 2016-12-17 10:19:27
    hbase批量导入

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 15,079
精华内容 6,031
关键字:

hbase批量写入