精华内容
下载资源
问答
  • CDH SQOOP 2实例

    千次阅读 2016-12-13 15:39:25
    1. CDH sqoop2环境搭建 添加服务,选择sqoop2,然后下一步,遇到错误修改错误。 2. 脚本命令查看 参考 http://sqoop.apache.org/ http://sqoop.apache.org/docs/1.99.5/Sqoop5MinutesDemo.html   2.1 查看连接 ...

       以前数据抽取都是kettle ETL,测试一下sqoop

    1. CDH sqoop2环境搭建

    添加服务,选择sqoop2,然后下一步,遇到错误修改错误。


    2. 脚本命令查看

    参考 http://sqoop.apache.org/

    http://sqoop.apache.org/docs/1.99.5/Sqoop5MinutesDemo.html

     

    2.1 查看连接

    show connector

     

    CDH5.5支持四种类型

    JDBC,kite,hdfs,kafka;其中kafka只支持接收

     

    2.2 查看连接

    show link

     

    2.3 查看任务

    show job

    2.4 创建连接

    sqoop:000> create link -c 1

    数据序号是connector编号,然后不能类型填写不同参数

     

    2.5 创建任务

    sqoop:000> create job -f 1 -t 2

    从一个link(目标数据源)到另一个link(目的数据源),编号是link编号,编号名字不能重复

    2.6 启动任务

    sqoop:000> start job -j 1

    启动所选的任务,编号是任务编号

     

    3. Java代码

    3.1 实例从mysql到hdfs

    将student表抽到hdfs

     

     

    packagecom.bocom;
     
    importorg.apache.sqoop.client.SqoopClient;
    importorg.apache.sqoop.model.MConnector;
    importorg.apache.sqoop.model.MDriverConfig;
    importorg.apache.sqoop.model.MFromConfig;
    importorg.apache.sqoop.model.MJob;
    importorg.apache.sqoop.model.MLink;
    importorg.apache.sqoop.model.MLinkConfig;
    importorg.apache.sqoop.model.MSubmission;
    importorg.apache.sqoop.model.MToConfig;
    importorg.apache.sqoop.submission.counter.Counter;
    importorg.apache.sqoop.submission.counter.CounterGroup;
    importorg.apache.sqoop.submission.counter.Counters;
    importorg.apache.sqoop.validation.Status;
     
    publicclass MysqlToHDFS {
        public static void main(String[] args) {
            sqoopTransfer();
        }
        public static void sqoopTransfer() {
            //初始化
            String url ="http://server4:12000/sqoop/";
            SqoopClient client = newSqoopClient(url);
            client.setServerUrl(url);
            int num=55;
           
            //创建一个源链接 JDBC
            long fromConnectorId = 1;
            MLink fromLink =client.createLink(fromConnectorId);
            fromLink.setName("JDBC"+num);
    //       fromLink.setCreationUser("liuwei");
            MLinkConfig fromLinkConfig =fromLink.getConnectorLinkConfig();
           fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.3.142/mysql");
           fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
           fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
           fromLinkConfig.getStringInput("linkConfig.password").setValue("1234");
            Status fromStatus =client.saveLink(fromLink);
            System.out.println(fromStatus);
            if(fromStatus.canProceed()) {
             System.out.println("创建JDBC Link成功,ID为:" + fromLink.getPersistenceId());
            } else {
             System.out.println("创建JDBC Link失败");
            }
         
           
           
            //创建一个目的地链接HDFS
            long toConnectorId = 3;
            MLink toLink =client.createLink(toConnectorId);
            toLink.setName("HDFS"+num);
           toLink.setCreationUser("admln");
            MLinkConfig toLinkConfig =toLink.getConnectorLinkConfig();
            System.out.println(toLinkConfig);
           toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://server4:8020/");
            Status toStatus =client.saveLink(toLink);
            if(toStatus.canProceed()) {
             System.out.println("创建HDFS Link成功,ID为:" + toLink.getPersistenceId());
            } else {
             System.out.println("创建HDFS Link失败");
            }
           
          //创建一个任务
            long fromLinkId =fromLink.getPersistenceId();
            long toLinkId =toLink.getPersistenceId();
            MJob job = client.createJob(fromLinkId,toLinkId);
            job.setName("MySQL to HDFSjob"+num);
           job.setCreationUser("hadoop");
            //设置源链接任务配置信息
            MFromConfig fromJobConfig = job.getFromJobConfig();
           System.out.println("****"+fromJobConfig);
           fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("mysql");
           fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("student");
            fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
    //       fromJobConfig.getStringInput("fromJobConfig.columns").setValue("id,name");
            MToConfig toJobConfig =job.getToJobConfig();
           System.out.println("===="+toJobConfig);
    //       toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
           toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
           toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
            toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqoop/student");
            MDriverConfig driverConfig =job.getDriverConfig();
           driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
     
            Status status = client.saveJob(job);
           System.out.println("===="+status);
            if(status.canProceed()) {
             System.out.println("JOB创建成功,ID为:"+ job.getPersistenceId());
            } else {
             System.out.println("JOB创建失败。");
            }
           
           
           
          //启动任务
            long jobId = job.getPersistenceId();
    //        long jobId = 4;
            MSubmission submission =client.startJob(jobId);
            System.out.println("JOB提交状态为 : " + submission.getStatus());
            while(submission.getStatus().isRunning()&& submission.getProgress() != -1) {
              System.out.println("进度 : " + String.format("%.2f%%", submission.getProgress() * 100));
              //三秒报告一次进度
              try {
                Thread.sleep(3000);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
            }
            System.out.println("JOB执行结束... ...");
            System.out.println("Hadoop任务ID为:" + submission.getExternalJobId());
            Counters counters =submission.getCounters();
            if(counters != null) {
              System.out.println("计数器:");
              for(CounterGroup group : counters) {
                System.out.print("\t");
               System.out.println(group.getName());
                for(Counter counter : group) {
                  System.out.print("\t\t");
                 System.out.print(counter.getName());
                  System.out.print(": ");
                 System.out.println(counter.getValue());
                }
              }
            }
           if(submission.getError().getErrorSummary() != null) {
              System.out.println("JOB执行异常,异常信息为 : " +submission.getError());
            }
            System.out.println("HDFS通过sqoop传输数据到MySQL统计执行完毕");
        }
    }


    3.2 实例从hdfs到mysql

    插入到student_test表

    Select count(*)from student_test

    共一百万数据

     

    packagecom.bocom;
     
    importorg.apache.sqoop.client.SqoopClient;
    importorg.apache.sqoop.model.MDriverConfig;
    importorg.apache.sqoop.model.MFromConfig;
    importorg.apache.sqoop.model.MJob;
    importorg.apache.sqoop.model.MLink;
    importorg.apache.sqoop.model.MLinkConfig;
    importorg.apache.sqoop.model.MSubmission;
    importorg.apache.sqoop.model.MToConfig;
    importorg.apache.sqoop.submission.counter.Counter;
    importorg.apache.sqoop.submission.counter.CounterGroup;
    importorg.apache.sqoop.submission.counter.Counters;
    importorg.apache.sqoop.validation.Status;
     
    publicclass HDFSToMysql {
        public static void main(String[] args) {
            sqoopTransfer();
        }
        public static void sqoopTransfer() {
            //初始化
            String url ="http://server4:12000/sqoop/";
            SqoopClient client = newSqoopClient(url);
            int num=205;
           
            //创建一个源链接 HDFS
            long fromConnectorId = 3;
            MLink fromLink =client.createLink(fromConnectorId);
            fromLink.setName("HDFSconnector"+num);
           fromLink.setCreationUser("admln");
            MLinkConfig fromLinkConfig =fromLink.getConnectorLinkConfig();
           fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://server4:8020/");
            Status fromStatus =client.saveLink(fromLink);
            if(fromStatus.canProceed()) {
             System.out.println("创建HDFS Link成功,ID为:" + fromLink.getPersistenceId());
            } else {
             System.out.println("创建HDFS Link失败");
            }
            //创建一个目的地链接 JDBC
            long toConnectorId = 1;
            MLink toLink = client.createLink(toConnectorId);
            toLink.setName("JDBCconnector"+num);
           toLink.setCreationUser("admln");
            MLinkConfig toLinkConfig =toLink.getConnectorLinkConfig();
           toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.3.142/mysql");
           toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
           toLinkConfig.getStringInput("linkConfig.username").setValue("root");
           toLinkConfig.getStringInput("linkConfig.password").setValue("1234");
            Status toStatus =client.saveLink(toLink);
            if(toStatus.canProceed()) {
             System.out.println("创建JDBC Link成功,ID为:" + toLink.getPersistenceId());
            } else {
             System.out.println("创建JDBC Link失败");
            }
           
            //创建一个任务
            long fromLinkId =fromLink.getPersistenceId();
            long toLinkId =toLink.getPersistenceId();
            MJob job = client.createJob(fromLinkId,toLinkId);
            job.setName("HDFS to MySQLjob"+num);
            job.setCreationUser("admln");
            //设置源链接任务配置信息
            MFromConfig fromJobConfig =job.getFromJobConfig();
           fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/sqoop/student/");
           
            //创建目的地链接任务配置信息
            MToConfig toJobConfig =job.getToJobConfig();
           toJobConfig.getStringInput("toJobConfig.schemaName").setValue("mysql");
           toJobConfig.getStringInput("toJobConfig.tableName").setValue("student_test");
           //toJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
            // set the driver config values
    //        MDriverConfig driverConfig =job.getDriverConfig();
    //       driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);//这句还没弄明白
            Status status = client.saveJob(job);
            if(status.canProceed()) {
             System.out.println("JOB创建成功,ID为:"+ job.getPersistenceId());
            } else {
             System.out.println("JOB创建失败。");
            }
           
            //启动任务
            long jobId = job.getPersistenceId();
            MSubmission submission =client.startJob(jobId);
            System.out.println("JOB提交状态为 : " + submission.getStatus());
           while(submission.getStatus().isRunning() &&submission.getProgress() != -1) {
              System.out.println("进度 : " + String.format("%.2f%%", submission.getProgress() * 100));
              //三秒报告一次进度
              try {
                Thread.sleep(3000);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
            }
            System.out.println("JOB执行结束... ...");
            System.out.println("Hadoop任务ID为:" + submission.getExternalJobId());
            Counters counters =submission.getCounters();
            if(counters != null) {
              System.out.println("计数器:");
              for(CounterGroup group : counters) {
                System.out.print("\t");
               System.out.println(group.getName());
                for(Counter counter : group) {
                 System.out.print("\t\t");
                  System.out.print(counter.getName());
                  System.out.print(": ");
                 System.out.println(counter.getValue());
                }
              }
            }
           if(submission.getError().getErrorSummary() != null) {
              System.out.println("JOB执行异常,异常信息为 : " +submission.getError());
            }
            System.out.println("HDFS通过sqoop传输数据到MySQL统计执行完毕");
        }
    }


    展开全文
  • CDH配置sqoop

    2021-01-21 13:38:48
    配置mysql驱动 下载mysql驱动到cdh的jars路径下 mysql驱动下载地址 root@cdh1:/opt/cloudera/parcels/CDH/jars# ll mysql-connector-java-5.1.49-bin.jar -rw-r--r-- 1 root root 1006906 1月 21 09:17 mysql-...

    添加sqoop服务

    1. 打开cluster
      在这里插入图片描述
    2. 添加服务
      在这里插入图片描述
    3. 选择sqoop2,然后一路继续即可
      在这里插入图片描述

    在这里插入图片描述
    4. 安装完成后查看服务

    在这里插入图片描述

    配置mysql驱动

    1. 下载mysql驱动到cdh的jars路径下

    mysql驱动下载地址

    root@cdh1:/opt/cloudera/parcels/CDH/jars# ll mysql-connector-java-5.1.49-bin.jar 
    -rw-r--r-- 1 root root 1006906 1月  21 09:17 mysql-connector-java-5.1.49-bin.jar
    
    
    1. 给sqoop配置mysql驱动,创建软连接
    root@cdh8:/opt/cloudera/parcels/CDH/lib/sqoop/lib# ln -s ../../../jars/mysql-connector-java-5.1.49-bin.jar mysql-connector-java-5.1.49-bin.jar
    

    4.连接mysql查看数据库

    root@cdh3:~# sqoop list-databases --connect jdbc:mysql://cdh3:3306/ --username xxx --password xxxx
    Warning: /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/bin/../lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
    Please set $ACCUMULO_HOME to the root of your Accumulo installation.
    21/01/21 13:31:41 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.14.2
    21/01/21 13:31:41 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
    21/01/21 13:31:41 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
    Thu Jan 21 13:31:41 CST 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
    information_schema
    
    展开全文
  • Sqoop配置与基本操作

    2020-11-18 16:19:58
    Sqoop配置与基本操作一、Sqoop配置1、解压文件2、拷贝jar包3、配置sqoop.env4、配置环境变量5、启动mysql,hadoop,hive服务6、测试二、Sqoop基本操作 一、Sqoop配置 1、解压文件 #解压sqoop压缩文件 [root@chust01...

    一、Sqoop配置

    1、解压文件

    #解压sqoop压缩文件
    	[root@chust01 hadoop]# tar -zxvf /opt/download/hadoop/sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C ./
    #查看解压后的文件,改名为简易版名称sqoop146
    	[root@chust01 hadoop]# ls
    	[root@chust01 hadoop]# mv sqoop-1.4.6.bin__hadoop-2.0.4-alpha/ sqoop146
    	[root@chust01 hadoop]# ls
    #查看权限,权限低的话可以授权为777权限
    	[root@chust01 hadoop]# ls -la
    

    2、拷贝jar包

    #进入lib下面
    	[root@chust01 hadoop]# cd sqoop146/lib/
    #拷贝mysql
    	[root@chust01 lib]# cp /opt/download/hadoop/mysql-connector-java-5.1.32.jar ./
    #拷贝hadoop-common
    	[root@chust01 lib]# cp /opt/software/hadoop/hadoop260/share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar ./
    #拷贝hadoop-hdfs
    	[root@chust01 lib]# cp /opt/software/hadoop/hadoop260/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar ./
    #拷贝hadoop-mapreduce-client-core
    	[root@chust01 lib]# cp /opt/software/hadoop/hadoop260/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0-cdh5.14.2.jar ./
    

    3、配置sqoop.env

    #进入到conf/目录下,查看env文件,复制一份出来改个名字然后配置,防止报错
    	[root@chust01 lib]# cd ../conf/
    	[root@chust01 conf]# ls
    	oraoop-site-template.xml  sqoop-env-template.sh    sqoop-site.xml
    	sqoop-env-template.cmd    sqoop-site-template.xml
    	[root@chust01 conf]# cp sqoop-env-template.sh sqoop-env.sh
    	[root@chust01 conf]# ls
    	oraoop-site-template.xml  sqoop-env-template.cmd  sqoop-site-template.xml
    	sqoop-env.sh              sqoop-env-template.sh   sqoop-site.xml
    #编辑sqoop-env.sh
    	[root@chust01 conf]# vi sqoop-env.sh
    #sqoop-env.sh中5个地方的路径配置上,并且删掉前面的#号
    	#Set path to where bin/hadoop is available
    	export HADOOP_COMMON_HOME=/opt/software/hadoop/hadoop260
    	#Set path to where hadoop-*-core.jar is available
    	export HADOOP_MAPRED_HOME=/opt/software/hadoop/hadoop260/share/hadoop/mapreduce
    	#set the path to where bin/hbase is available
    	export HBASE_HOME=/opt/software/hadoop/hbase120
    	#Set the path to where bin/hive is available
    	export HIVE_HOME=/opt/software/hadoop/hive110
    	#Set the path for where zookeper config dir is
    	export ZOOCFGDIR=/opt/software/hadoop/zookeeper345
    

    4、配置环境变量

    	#在sqoop主目录新建一个日志目录mylog,出了问题可以查看该日志
    		[root@chust01 sqoop146]# mkdir mylog
    	#配置常规项bin和日志目录mylog
    		export JAVA_HOME=/opt/software/jdk180
    		export HADOOP_HOME=/opt/software/hadoop/hadoop260
    		export HIVE_HOME=/opt/software/hadoop/hive110
    		export ZK_HOME=/opt/software/hadoop/zookeeper345
    		export HBASE_HOME=/opt/software/hadoop/hbase120
    		export SQOOP_HOME=/opt/software/hadoop/sqoop146export PATH=$SQOOP_HOME/bin:$HBASE_HOME/bin:$ZK_HOME/bin:$ZK_HOME/sbin:$HIVE_HOME/bin:$HIVE_HOME/lib:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$JAVA_HOME/bin:$PATH
    		export CLASS_PATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
    		export LOGDIR=$SQOOP_HOME/mylog/
    	#激活,把配置信息读到内存中
    		[root@chust01 conf]# source /etc/profile
    		[root@chust01 conf]# echo $SQOOP_HOME
    

    5、启动mysql,hadoop,hive服务

    	[root@chust01 conf]# systemctl status mysql
    	[root@chust01 conf]# start-all.sh
    	[root@chust01 conf]# nohup hive --service metastore>/dev/null 2>&1 &
    	[root@chust01 conf]# nohup hive --service hiveserver2>/dev/null 2>&1 &
    

    6、测试

    #测试命令,连上和登陆数据库,连接上之后会出现当前数据库的列表,否则失败。在虚拟机中中,回车默认是提交符,需要增加\转义符
    
    sqoop list-databases \
    --connect jdbc:mysql://192.168.221.140:3306 \
    --username root \
    --password kb10
    

    二、Sqoop基本操作

    sqoop 是 apache 旗下一款“Hadoop 和关系数据库服务器之间传送数据”的工具,本质还是一个命令行工具,核心功能就是导入和导出:
    导入数据:MySQL导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统
    导出数据:从 Hadoop 的文件系统中导出数据到关系数据库mysql 等

    1、mysql->hdfs,把mysql(RDBMS)的数据导入到hdfs

    测试1:
    全部导入

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --table staff \
    --target-dir '/kb10/test1119' \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by '\t'
    

    测试2:
    导入指定部分的列

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --target-dir '/kb10/test111901' \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by '\t' \
    --columns name,sex \
    --table staff
    

    测试3:
    导入部分行,select查询结果导入,如果用的双引号,$CONDITIONS前面要加转义符,否则会被识别为shell的变量报错。

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --target-dir '/kb10/test111902' \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by '\t' \
    --query 'select name,sex from staff where id<=1 and $CONDITIONS;'
    

    测试4:
    使用sqoop关键词筛选查询导入数据,使用where子句

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --target-dir '/kb10/test111903' \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by '\t' \
    --table staff \
    --where 'id=1'
    

    2、hdfs->mysql,使用sqoop把hdfs里的文件导出到mysql(RDBMS)里

    需要现在mysql里新建一个表,这里新建了一个数据库company,新建表的名字叫hive_staff
    a).在MySQL创建需要的表结构:

    	create table hive_staff(
    	id int,
    	name varchar(50),
    	sex varchar(20),
    	);
    

    b).写下面的命令

    sqoop export \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --table hive_staff \
    --num-mappers 1 \
    --export-dir '/kb10/test1119' \
    --input-fields-terminated-by '\t'
    

    如果文件是压缩格式的,使用hcatalog才能传;如果不是,也可以用hcatalog传
    需要先配环境变量,目录为hive目录下的hcatalog,配置完即可使用hcatalog-database、table直接传hive表至MySQL里。不过要注意MySQL里varchar设定的字符串范围

    sqoop export \
    --connect jdbc:mysql://single:3306/dim_intes \
    --username root \
    --password root \
    --table dim_users \
    --num-mappers 3 \
    --hcatalog-database dwd_intes \
    --hcatalog-table dwd_users
    

    3、mysql->hive,从mysql(RDBMS)中导入到HIVE

    该过程分为两步,第一步将数据导入到 HDFS,第二步将导入到 HDFS 的数据迁移到Hive 仓库,第一步默认的临时目录暂时未知,需要再研究下

    • 新建好表
    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --table staff \
    --num-mappers 1 \
    --hive-import \
    --fields-terminated-by '\t' \
    --hive-overwrite \
    --delete-target-dir \
    --hive-table xym.staff_hive
    
    sqoop import  \
    --connect jdbc:mysql://192.168.221.140:3306/retail_db  \
    --username root  \
    --password kb10 \
    --table orders \
    --hive-import \
    --create-hive-table \
    --hive-database retail_db \
    --hive-table orders \
    --m 3
    

    4、mysql->hbase,把mysql(RDBMS)的数据导入到 Hbase

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --table staff \
    --columns 'id,name,sex' \
    --hbase-create-table \
    --hbase-table kb10:hbase_staff2 \
    --column-family 'info' \
    --hbase-row-key 'id' \
    --num-mappers 1 \
    --split-by id \
    --hbase-bulkload
    

    split-by是类似分区的
    bulkload是用sqoop去完成HDFS路径的转移和指定

    4.1、bulkload可能遇到的问题

    1、使用hbase中的reduce默认配置为1
    2、数据量超过某个范围会失败
    解决方案:
    1、预分区表(启动多个reduce)

    create "ns:bulkloadtable","family",{SPLITS=>["1","2","3","4","5","6","7","8","9"]}
    

    2、修改单核HRegion的大小上限(分裂阀值,默认大小为1G)

    <property>
    <name>hbase.hregion.max.filesize</name>
    <value>10737418240</value>
    </property>
    

    3、修改单分区单列簇的HFile文件数量上限(默认32个)

    <property>
    <name>hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily</name>
    <value>3200</value>
    </property>
    

    5、hbase->hive,hbase导出到hive

    这个步骤不需要连接操作mysql,但需要提前把hive和hbase里的jar包互相拷贝,相应的jar包如下(在hive的bin目录下):

    #hive的lib下面的jar包拷到hbase的lib下面
    [root@chust01 lib]# cp hive-hbase-handler-1.1.0-cdh5.14.2.jar /opt/software/hadoop/hbase120/lib/
    #hbase的lib下面的jar包拷到hive的lib下面
    [root@chust01 lib]# cp /opt/software/hadoop/hbase120/lib/hbase-client-1.2.0-cdh5.14.2.jar ./
    [root@chust01 lib]# cp /opt/software/hadoop/hbase120/lib/hbase-hadoop2-compat-1.2.0-cdh5.14.2.jar ./
    [root@chust01 lib]# cp /opt/software/hadoop/hbase120/lib/hbase-hadoop-compat-1.2.0-cdh5.14.2.jar ./
    [root@chust01 lib]# cp /opt/software/hadoop/hbase120/lib/hbase-it-1.2.0-cdh5.14.2.jar ./
    [root@chust01 lib]# cp /opt/software/hadoop/hbase120/lib/hbase-server-1.2.0-cdh5.14.2.jar ./

    这个操作不需要连接mysql数据库,直接在hive里创建表结构,然后把hbase的数据映射过去即可

    create external table hbase_student(sid int,student_id int,course_id int,score int)
    stored by  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties("hbase.columns.mapping" = ":key,scores:student_id,scores:course_id,scores:score")
    tblproperties("hbase.table.name" = "kb10:mysql_stu")
    

    6、mysql->hdfs,根据指定的字段条件增量导出

    –incremental append只支持新增不支持更新
    –table TABLE_NAME 或 --query SELECT_COMMAND
    –split-by 和 -m结合实现numberReduceTasks并行
    –check-column 和 --last-value 结合实现–check-column :where sid>5

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --query "select * from staff where \$CONDITIONS" \
    --target-dir /kb10/test111904 \
    --split-by id \
    --num-mappers 2 \
    --check-column id \
    --incremental append \
    --last-value 0
    

    在mysql里插入增量,接着上一条的value值(可在上一步执行完后看到value值)
    insert into TABLE_NAME(…) values(…),(),(),();
    insert into staff(name,sex) values(‘x2’,‘男’),(‘x4’,‘男’),(‘x6’,‘男’),(‘x7’,‘男’);

    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --query "select * from staff where \$CONDITIONS" \
    --target-dir /kb10/test111904 \
    --split-by id \
    --num-mappers 2 \
    --check-column id \
    --incremental append \
    --last-value 2
    

    7、mysql->hdfs,根据时间戳增量导出

    –incremental lastmodified修改和增加
    –check 必须为timstamp类型

    create table lmtest(id int auto_increment primary key,name varchar(20),time timestamp);
    insert into lmtest(name) value('henry'),('pola'),('mike'),('ariel'),('jerry');
    insert into lmtest(name) value('liudehua'),('zhangxueyou'),('liming');
    
    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password kb10 \
    --query "select id,name,time from lmtest where \$CONDITIONS" \
    --target-dir '/kb10/lm1118' \
    --split-by id \
    -m 1 \
    --check-column time \
    --incremental lastmodified \
    --merge-key id \
    --last-value "2020-11-19 20:25:20.0"
    

    插入下面的数据后,修改上面的last-value,再次执行,即可把增量数据导入进去

    insert into lmtest(name) value('xxxx'),('ssssss'),('xxxxxxaaa');
    

    三、创建sqoop job

    1、设置免密登陆

    sqoop job一般用于定时任务等情况,如果每次都输入密码的话,会有些麻烦,因此可以设置免密登陆。方法如下:
    a).编辑sqoop的conf/目录下的sqoop-site.xml,在下方插入:

      		<property>
      		<name>sqoop.metastore.client.record.password</name>
      		<value>true</value>
      		<description>If true, allow saved passwords in the metastore.</description>
      		</property>
    

    b).然后设置一个路径,以echo的方式将数据库的密码写入隐藏文件.mysql.password,路径可以自由设置,我这边直接设置在了conf/下了

    echo -n “kb10” >/opt/software/hadoop/sqoop146/conf/.mysql.password

    c).最后在使用时,直接用--password file:///opt/software/hadoop/sqoop146/conf/.mysql.password \替代--password kb10,file:///后面的是密码文件的绝对路径。

    2、创建sqoop job

    sqoop job --create torderincrementjob -- import \
    --connect jdbc:mysql://192.168.221.140:3306/company \
    --username root \
    --password-file file:///opt/software/hadoop/sqoop146/conf/.mysql.password \
    --table staff \
    --target-dir /kb10/score1118 \
    -m 1 \
    --fields-terminated-by "," \
    --lines-terminated-by "\n" \
    --null-string '\\N' \
    --null-non-string '\\N' \
    --incremental append \
    --check-column id \
    --last-value 10
    

    然后在数据库里添加数据insert into score(student_id,course_id,score) values(4,1,85),(4,3,83),(4,5,85)
    执行:sqoop job -exec torderincrementjob
    数据库里每次更新数据后,直接输入上面的exec执行代码即可,当然也可以设置定时执行,目前还没操作过,暂时就不写这段了。
    附上sqoop job的操作指令:
    #查看sqoop job:sqoop job --listor:sqoop job --show JOB_NAME
    #删除sqoop job:sqoop job --delete JOB_NAME
    #执行sqoop job:sqoop job --exec JOB_NAME

    四、常用指令解释

    –input-fields-terminated-by 字段间的分隔符
    –input-lines-terminated-by 行之间的分隔符

    –append 将数据追加到 HDFS 中已经存在的 DataSet 中
    –m 或–num-mappers 启动 N 个 map 来并行导入数据,默认 4 个。
    –query 将查询结果作为数据导入,使用时必须伴随–target-dir,–hive-table,如果查询中有where 条件,则条件后必须加上$CONDITIONS 关键字
    –split-by 按照某一列来切分表的工作单元,不能与–autoreset-to-one-mapper 连用
    –target-dir 后面跟指定 HDFS 路径
    –where 从关系数据库导入数据时的查询条件
    –null-string string 类型的列如果 null,替换为指定字符串
    –null-non-string 非 string 类型的列如果 null,替换为指定字符串
    –check-column 作为增量导入判断的列名
    –incremental 和append 或 lastmodified配合
    –last-value 指定某一个值,用于标记增量导入的位置

    sqoop create-hive-table 生成与关系数据库表结构对应的 hive 表结构
    –hive-overwrite覆盖掉hive表中已经存在的数据
    –hive-table TABLE_NAME要创建的hive表,默认使用mysql表名

    五、定时任务完整流程

    现有一份数据表,在mysql中不断更新有数据,现在要求定时把mysql里新增的数据追加到HBase库里,然后映射为Hive表,方案如下:
    1、新建数据库exam(HBase、Hive里都建同名数据库),创建一张mysql表,id为主键自增列,表格主题内容为name,增加一列时间戳(作为增量导入的根据)。插入几行数据

    use exam;
    
    create table cron_test(
    id int auto_increment primary key,
    name varchar(20),
    regTime timestamp
    );
    
    insert into cron_test(name) values('xiaoming'),('angela'),('huagbo'),('xuzhheng');
    insert into cron_test(name) values('mas'),('safasf'),('weqw'),('fdsf');
    insert into cron_test(name) values('asdas'),('adad2'),('aaa'),('ddddd');
    

    2、新建HBase表,列簇为info,并映射到Hive里

    #新建hbase表 hb_cron_test
    create 'exam:hb_cron_test','info'
    
    #新建hive外部表 映射hbase 中的 exam:hb_cron_test
    create external table hv_cron_test(
    id int,
    name string,
    regTime timestamp
    )
    stored by  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties("hbase.columns.mapping" = ":key,info:name,info:regTime")
    tblproperties("hbase.table.name" = "exam:hb_cron_test")
    

    3、新建导入任务,把MySQL数据导入到HBase

    #导到hbase
    sqoop import \
    --connect jdbc:mysql://192.168.221.140:3306/exam \
    --username root \
    --password kb10 \
    --table cron_test \
    --columns id,name,regTime \
    --hbase-table exam:hb_cron_test \
    --hbase-row-key id \
    --column-family info \
    --incremental append \
    --check-column id \
    --last-value 0
    

    4、创建job,名字为myjob(仅第一行和上面的导入数据不一样,import前必须要有一个空格)

    #创建job
    sqoop job --create myjob -- import \
    --connect jdbc:mysql://192.168.221.140:3306/exam \
    --username root \
    --password kb10 \
    --table cron_test \
    --columns id,name,regTime \
    --hbase-table exam:hb_cron_test \
    --hbase-row-key id \
    --column-family info \
    --incremental append \
    --check-column id \
    --last-value 0
    

    5、创建shell脚本myjob.sh

    	#!/bin/bash
    	source /etc/profile
    	function now(){
    		echo $1" myjob at "`date +"%Y-%m-%D %H:%M:%S"`
    	}
    	now start
    	sqoop job -exec myjob
    	now end
    

    6、参考本文(3.1设置保存密码),先手动执行一次脚本,记住密码:./mhhjob.sh
    cd /opt/software/hadoop/sqoop146/conf/
    vi sqoop-site.xml,新加入下面的配置(或找到本来就有的配置然后把注释取消掉也行,都一样)

      		<property>
      		<name>sqoop.metastore.client.record.password</name>
      		<value>true</value>
      		<description>If true, allow saved passwords in the metastore.</description>
      		</property>
    

    7、创建配置文件,通过文件去执行

    • 设置为1分钟执行一次
    	vi mhh.log
    	*/1 * * * * bash /root/crontab/myjob.sh
    

    8、查看任务,可以看到,当前并没有任何定时任务

    crontab -l
    no crontab for root
    

    9、挂载任务

    	crontab mmh.log 
    

    10、执行任务

    	tail -f /var/log/cron
    

    在这里插入图片描述

    11、在mysql里增加下面的数据,然后在HBase、Hive里重新查询,就可以看到数据已经增量导入进去了!

    insert into cron_test(name) values('xiaoming1'),('angela1'),('huagbo1'),('xuzhheng1');
    insert into cron_test(name) values('12312'),('123123'),('qweqwe'),('aaadsad');
    
    展开全文
  • CDH 安装 sqoop1

    千次阅读 2019-03-07 22:22:41
    总帖:CDH 6系列(CDH 6.0、CHD6.1等)安装和使用 命令测试 1.sqoop help 2.sqoop import \ --connect jdbc:mysql://192.168.20.41:3306/adm \ --username root \ --password admin \ --table ...

    日萌社

    人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新)


     

    总帖:CDH 6系列(CDH 6.0、CHD6.1等)安装和使用

    命令测试
    
    1.sqoop help
    2.sqoop import \
    --connect jdbc:mysql://192.168.20.41:3306/adm \
    --username root \
    --password admin \
    --table userPortraitComplete \
    --hive-table adm.userPortraitComplete \
    --hive-import \
    --m 1


    展开全文
  • Sqoop配置使用

    千次阅读 2016-12-03 17:08:22
    SQOOP:底层是Mapreduce,利用Mapreduce加快数据传输速度,批处理方式进行数据传输,并且只有Map ...解压:tar -zxf sqoop-1.4.5-cdh5.3.6.tar.gz -C /opt/cdh-5.3.6/ sqoop-env-template.sh –》sqoop-env.shexport
  • cdh版本的sqoop安装以及配置

    千次阅读 2018-05-10 23:08:51
    sqoop安装需要提前安装好sqoop依赖:hadoop 、hive、hbase、zookeeperhadoop安装步骤请访问:http://www.cnblogs.com/xningge/articles/8433297.htmlhive安装步骤请访问:...zookeeper安装步骤请...
  • Sqoop配置安装

    2013-11-29 09:37:54
    Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导入到关系型数据库中。...
  • sqoop安装配置

    2020-09-26 10:46:48
    提取码Sqoop安装配置下载安装配置环境变量修改sqoop-env.sh验证安装 下载安装 下载sqoop拖入linux并解压 tar -zxvf sqoop-1.4.6-cdh5.14.2.tar.gz 重命名 mv sqoop-1.4.6-cdh5.14.2 sqoop 配置环境变量 vi /etc/...
  • Sqoop简介 Sqoop是一个用于Hadoop和关系型数据库或主机之间的数据传输工具。它可以将数据从关系型数据库import到HDFS,也可以从HDFS export到关系型数据库,通过Hadoop的MapReduce实现。 我们现在的需求就是需要...
  • Sqoop安装配置

    2020-09-26 11:00:41
    Sqoop安装配置Sqoop 安装配置下载并解压修改配置文件拷贝 JDBC 驱动验证 Sqoop测试 Sqoop 是否能够成功连接数据库 Sqoop 安装配置 下载并解压 安装 Sqoop 的前提是已经具备 Java 和 Hadoop、Hive、ZooKeeper、HBase ...
  • Hadoop CDH4.5 Sqoop部署

    2014-05-16 19:09:00
    CDH4中还提供了一个Sqoop 2, Sqoop 2是一个服务端和客户端模式在HDFS和关系数据库之间传输数据的。 1  安装 Sqoop 2     Sqoop 2服务被分成了两个包:客户端sqoop2-client和服务端sqoop2-server...
  • sqoop 配置及导入导出操作

    千次阅读 2011-11-16 21:06:42
    下面是CDH3和SQOOP 1.2.0的下载地址 http://archive.cloudera.com/cdh/3/hadoop-0.20.2-CDH3B4.tar.gz http://archive.cloudera.com/cdh/3/sqoop-1.2.0-CDH3B4.tar.gz 其中sqoop-1.2.0-CDH3B4依赖hadoop-core-...
  • 文章目录一、Sqoop简介二、Sqoop原理三、Sqoop安装3.1 下载并解压3.2 修改配置文件3.3 拷贝JDBC驱动3.4 配置环境变量3.5 验证是否安装成功 一、Sqoop简介 Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的...
  • 1.下载并解压 wget https://archive.cloudera.com/cdh5/cdh/5/sqoop-1.4.6-cdh5.7.0.tar.gz tar -zxvf sqoop-1.4.6-cdh5.7.0.tar.gz -C...Sqoop配置文件与大多数大数据框架类似,在sqoop根目录下的conf目录中。 ...
  • cdh集群安装sqoop

    千次阅读 2018-03-07 18:45:53
    sqoop-cdh安装:1、上传sqoop-1.4.6-cdh5.10.1.tar.gz包到/opt/sqoop目录2、解压缩文件: tar -zxf sqoop-1.4.6-cdh5.10.1.tar.gz3、设置环境变量:vim /etc/profile#添加一下内容export JAVA_HOME=/usr/java/jdk...
  • sqoop配置与使用

    2013-12-26 16:32:00
    复制3里hadoop-core-0.20.2-CDH3B4.jar到sqoop的lib下 5.在某处复制mysql-connector-java-5.1.10.jar到sqoop的lib下 6.修改configure-sqoop 注释掉hbase zookeeper检查: #if [ ! -d "${HBASE_...
  • 上一章说了怎么在CDH中安装LZO,安装完成后现在继续配置HDFS和Hive下的LZO,我的CDH版本为6.2.1,如果你的不一致,请查阅官方文档。链接: 点这里。 操作步骤 1、修改HDFS配置 在HDFS配置项中搜索“压缩编码解码器...
  • CDH中安装使用sqoop2

    千次阅读 2018-05-09 10:45:15
    由于我们的Hadoop、Hive等集群都是通过CDH安装部署的,而且CDH本身支持sqoop安装,因此直接就在这里安装测试了。CDH版本:5.14.x 安装过程如下:点击首页的“Cluster 1”进入Cluster 1界面后,点击Clusters-&gt...
  • sqoop-1.4.6-cdh5.13.2.tar

    2018-12-19 15:10:34
    mv /usr/local/sqoop-1.4.6-cdh5.13.2/conf/sqoop-env.template.sh /usr/local/sqoop-1.4.6-cdh5.13.2/conf/sqoop-env.sh vi /usr/local/sqoop-1.4.6-cdh5.13.2/conf/sqoop-env.sh export HADOOP_COMMON_HOME=/usr/...
  • CDH5.7中sqoop2使用

    千次阅读 2016-04-29 17:23:24
    ---------------------------------------sqoop2版本不支持直接导成hive表的形式,只能导入到hdfs中-------------------- 在官网下载对应版本的额包...设置配置文件/home/dba/sqoop2-1.99.5-cdh5.7.0
  • Sqoop安装配置介绍

    2019-08-22 14:05:33
    Sqoop安装配置介绍一、下载并安装二、修改配置文件三、拷贝jdbc驱动四、验证Sqoop五、测试Sqoop是否能够成功连接数据库 一、下载并安装 wget ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,483
精华内容 1,393
关键字:

cdhsqoop配置