精华内容
下载资源
问答
  • 使用maven+springmvc+spring来创建的关于在web端对HDFS的上传,下载,删除以及查看各级目录,部署后需要改ip地址,以及一些细节问题没有处理好。后期改善后会继续上传
  • 一 、hdfs的权限 hdfs是一个文件系统,类似于unix和linux。 1、有用户的概念 hdfs没有提供相关命令和接口去创建用户。它所采取的办法是,信任客户端,默认情况下使用的操作系统提供的用户。 当然hdfs支持扩展继承第...

    一 、hdfs的权限

    hdfs是一个文件系统,类似于unix和linux。

    1、有用户的概念

    hdfs没有提供相关命令和接口去创建用户。它所采取的办法是,信任客户端,默认情况下使用的操作系统提供的用户。
    当然hdfs支持扩展继承第三方用户认证系统,例如kerberos 、LDAP等。
    在hdfs中有超级用户的概念,hdfs系统中的超级用户是namenode进程的启动用户

    linux的超级用户是root

    2、hdfs还有权限的概念

    hdfs的权限是自己控制的,来自于hdfs的超级用户

    3、实操

    切换我们用root搭建的HDFS,使用god用户来启动

    步骤:

    • node1~node3添加god用户
    useradd god 
    passwd god
    
    • 将资源与用户绑定 (1、安装部署程序;2、数据存放的目录)
      使用下面的命令递归的将某个目录的拥有者权限设置为god用户。
    chown -R god src
    

    我们如果想要让god用户启动并管理hdfs集群,就需要给/opt/bigdata下的hadoop安装目录下的所有文件夹赋予god用户访问权限

        [root@node1 bigdata]# chown -R god hadoop-2.6.5/
        [root@node1 bigdata]# ll
        total 460
        drwxrwxr-x 10 god  haizhang    161 May  9 21:18 hadoop-2.6.5
        drwxr-xr-x 14 2002     2002   4096 Apr 28 16:28 zookeeper-3.4.14
        -rw-r--r--  1 root root     463805 May 14 21:25 zookeeper.out
        [root@node1 bigdata]# cd hadoop-2.6.5/
        [root@node1 hadoop-2.6.5]# ll
        total 112
        drwxrwxr-x 2 god haizhang   194 Oct  3  2016 bin
        drwxrwxr-x 4 god haizhang    40 May 10 21:15 etc
        drwxrwxr-x 2 god haizhang   106 Oct  3  2016 include
        drwxrwxr-x 3 god haizhang    20 Oct  3  2016 lib
        drwxrwxr-x 2 god haizhang   239 Oct  3  2016 libexec
        -rw-rw-r-- 1 god haizhang 84853 Oct  3  2016 LICENSE.txt
        drwxr-xr-x 2 god root      4096 May 17 18:09 logs
        -rw-rw-r-- 1 god haizhang 14978 Oct  3  2016 NOTICE.txt
        -rw-rw-r-- 1 god haizhang  1366 Oct  3  2016 README.txt
        drwxrwxr-x 2 god haizhang  4096 Oct  3  2016 sbin
        drwxrwxr-x 4 god haizhang    31 Oct  3  2016 share
    
    

    此时可以看到hadoop下的目录及其子目录的所有者,变成了god用户。当然这里root用户仍然是可以操作这些目录的。

    除此之外,god用户也要能够有权限访问hdfs中存放数据的目录,也就是我们定义的namenode,datanode,jn这些目录。

    [root@node1 bigdata]# cd /var/bigdata
    [root@node1 bigdata]# chown -R god hadoop/
    [root@node1 bigdata]# ll
    total 0
    drwxr-xr-x 6 god root 51 May 14 21:54 hadoop
    

    此时god用户就获得了hadoop目录下的所有读写执行权限了
    同样的在node2,node3机器上也执行上面的赋予god用户操作hadoop的权限。
    全部搞定之后,god用户就相当于hadoop的管理者了,可以调用hadoop的脚本命令以及进行读写操作等。

    • 切换到god去启动,其中启动start-dfs.sh的时候,需要god用户进行免密操作。

    首先输入su命令切换root用户到god用户

    su god
    

    然后cd到.ssh/目录下,可以发现一开始目录下面是空的。我们首先使用ssh命令登陆localhost

        [god@node1 .ssh]$ ssh localhost
    

    发现是需要输入密码的,原因是我们没有进行ssh免密,没生成ssh密钥。我们需要为god用户生成ssh密钥

        [god@node1 .ssh]$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
    

    然后将id_dsa文件内容加到authorized_keys文件中。

        [god@node1 .ssh]$ cat id_dsa.pub > authorized_keys
    

    此时如果你使用ssh localhost命令发现还需要输入密码,就说明你的authorized_keys文件权限不对,ssh对authorized_keys权限要求很苛刻,必须要为600.

        [god@node1 .ssh]$ chmod 600 authorized_keys
    

    此时使用ssh就可以免密登陆了。

    如果希望node1上使用god用户免密登陆其他node机器上,就需要将id_dsa分发。我们使用另一种方式进行分发ssh-copy-id -i id_dsa 主机名

    [god@node1 .ssh]$ ssh-copy-id -i id_dsa node2
    /usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "id_dsa.pub"
    The authenticity of host 'node2 (192.168.199.12)' can't be established.
    ECDSA key fingerprint is SHA256:nMKJNE3w5g86k6FNTWHanxEHqCElITTKmoHO1o9Aky4.
    ECDSA key fingerprint is MD5:44:3f:05:9c:45:e9:3a:2d:10:69:63:a9:38:d1:6c:7e.
    Are you sure you want to continue connecting (yes/no)? yes
    /usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
    /usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
    god@node2's password: 
    Permission denied, please try again.
    god@node2's password: 
    
    Number of key(s) added: 1
    
    Now try logging into the machine, with:   "ssh 'node2'"
    and check to make sure that only the key(s) you wanted were added.
    
    [god@node1 .ssh]$ ssh-copy-id -i id_dsa node3
    /usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "id_dsa.pub"
    The authenticity of host 'node3 (192.168.199.14)' can't be established.
    ECDSA key fingerprint is SHA256:nMKJNE3w5g86k6FNTWHanxEHqCElITTKmoHO1o9Aky4.
    ECDSA key fingerprint is MD5:44:3f:05:9c:45:e9:3a:2d:10:69:63:a9:38:d1:6c:7e.
    Are you sure you want to continue connecting (yes/no)? yes
    /usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
    /usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
    god@node3's password: 
    
    Number of key(s) added: 1
    
    Now try logging into the machine, with:   "ssh 'node3'"
    
    

    此时node2、node3上就有了node1的公钥了。

    可以登陆node3中查看authorized_keys文件

    [god@node3 .ssh]$ cat authorized_keys 
    ssh-dss AAAAB3NzaC1kc3MAAACBALsCYXAIHUmZge6ewfEhN6AU2cJIop0w+dyjxRlUBZkkjfsvPCrJ6NUo11reF3rNX7jbEC0Kz2lHZJ2khR2sPKLmm9HVVkCnbcHgG8EbNkRkIy3gK7tR2CeEB8XfIlmB8YVWxazHKnoi90WOeYEMwWT7xiYcuclSqY4MFgRRSZuZAAAAFQDcNM3gTFRZCdZS4vvXKox43S9TrQAAAIAWAYm+mmTn7oIJj8CfiL7yonuhsI06HBF6cZV87azO5VbxvQ24HtKbGprDN8iFdUAJoz4zyk7gi7Gj83kCZriulmDAPkVdDDJjZPDbIA66vsV/jXGiwfHQDLEUYNc3XHJJ9FMJJfKKEoYwH0BANGEzv0VqwY5dwLSP+xTR7g43PQAAAIAtz1J0W3osn8FvKcHO7TB2kxNi/xKYDJLOpeo3MRpXHlcSQB7a8i0q1vQtMr70sInDnkn28WCqj1bVdszHAgtTzZ/X5324sgtdlmw0R7yMigEg6nYWPvpfp7Ejewdu752T6lKb/prI6dF1cxSotHJ1t35I82mvinazeOxinszr3w== god@node1
    

    发现确实有了node1传递过来的id_dsa文件的内容。

    同理,node2也需要免密登陆node1.在node2中登陆god用户,并且为它生成ssh公钥

    [god@node2 .ssh]$ ssh-keygen -t dsa -P '' -f ./id_dsa  
    

    然后操作如下

        [god@node2 .ssh]$ ssh-copy-id -i id_dsa node2
        [god@node2 .ssh]$ ssh-copy-id -i id_dsa node1
    

    此时node2就可以免密用ssh登陆node2和node1了。

    • 修改hdfs-site.xml

    当我们做完上面的操作之后,登陆node1,需要修改hdfs-site.xml文件中的dfs.ha.fencing.ssh.private-key-files对应的value

      <property>
          <name>dfs.ha.fencing.ssh.private-key-files</name>
          <value>/home/god/.ssh/id_dsa</value>
        </property>
    

    将上面的修改分发给node2、node3

        [god@node1 .ssh]$ scp hdfs-site.xml node2:`pwd`
        [god@node1 .ssh]$ scp hdfs-site.xml node3:`pwd`
    
    • 使用god用户启动hdfs
        [god@node1 .ssh]$ start-dfs.sh
        Starting namenodes on [node1 node2]
        node2: starting namenode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-namenode-node2.out
        node1: starting namenode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-namenode-node1.out
        node2: starting datanode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-datanode-node2.out
        node3: starting datanode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-datanode-node3.out
        Starting journal nodes [node1 node2 node3]
        node2: starting journalnode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-journalnode-node2.out
        node1: starting journalnode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-journalnode-node1.out
        node3: starting journalnode, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-journalnode-node3.out
        Starting ZK Failover Controllers on NN hosts [node1 node2]
        node2: starting zkfc, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-zkfc-node2.out
        node1: starting zkfc, logging to /opt/bigdata/hadoop-2.6.5/logs/hadoop-god-zkfc-node1.out
    

    需要注意的是,node2、node3也要切换到god用户喔。

    • 使用god用户创建一个/user/god目录在hdfs上
        [god@node1 .ssh]$ hdfs dfs -mkdir -p /user/god
    

    在这里插入图片描述
    发现god用户可以在root用户创建的目录下新建子目录

    那么我们如果使用root用户能否在/user/god目录下创建个abc目录呢?

    [root@node1 ~]# hdfs dfs -mkdir -p /user/god/abc 
    mkdir: Permission denied: user=root, access=WRITE, inode="/user/god":god:supergroup:drwxr-xr-x
    

    发现创建失败,原因是root用户虽然属于supergroup组,但是supergroup组并没有对god用户创建的目录中具备写权限。

    谁启动hdfs的,谁就是老大

    4、 hdfs中的组

    hdfs中提供了创建组和修改组及组权限的接口

    • 改变用户目录的持有者和所属组:
    [god@node1 ~]$ hdfs dfs -mkdir -p /temp
    [god@node1 ~]$ hdfs dfs -chown god:ooxx /temp
    
    

    在这里插入图片描述
    此时目录/temp下的组就变成了ooxx了

    • 改变用户创建的目录对应的权限
        [god@node1 ~]$ hdfs dfs -chmod 770 /temp
    

    在这里插入图片描述
    此时ooxx组的用户都对这个/temp目录有读写执行操作权限了。

    • 查看当前用户在hdfs中的组
    [god@node1 ~]$ hdfs groups
    god : god
    

    使用hdfs dfsadmin

        [god@node1 ~]$ hdfs dfsadmin
        Usage: hdfs dfsadmin
        Note: Administrative commands can only be run as the HDFS superuser.
                [-report [-live] [-dead] [-decommissioning]]
                [-safemode <enter | leave | get | wait>]
                [-saveNamespace]
                [-rollEdits]
                [-restoreFailedStorage true|false|check]
                [-refreshNodes]
                [-setQuota <quota> <dirname>...<dirname>]
                [-clrQuota <dirname>...<dirname>]
                [-setSpaceQuota <quota> <dirname>...<dirname>]
                [-clrSpaceQuota <dirname>...<dirname>]
                [-finalizeUpgrade]
                [-rollingUpgrade [<query|prepare|finalize>]]
                [-refreshServiceAcl]
                [-refreshUserToGroupsMappings]
                [-refreshSuperUserGroupsConfiguration]
                [-refreshCallQueue]
                [-refresh <host:ipc_port> <key> [arg1..argn]
                [-reconfig <datanode|...> <host:ipc_port> <start|status>]
                [-printTopology]
                [-refreshNamenodes datanode_host:ipc_port]
                [-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]
                [-setBalancerBandwidth <bandwidth in bytes per second>]
                [-fetchImage <local directory>]
                [-allowSnapshot <snapshotDir>]
                [-disallowSnapshot <snapshotDir>]
                [-shutdownDatanode <datanode_host:ipc_port> [upgrade]]
                [-getDatanodeInfo <datanode_host:ipc_port>]
                [-metasave filename]
                [-setStoragePolicy path policyName]
                [-getStoragePolicy path]
                [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
                [-help [cmd]]
        
        Generic options supported are
        -conf <configuration file>     specify an application configuration file
        -D <property=value>            use value for given property
        -fs <local|namenode:port>      specify a namenode
        -jt <local|resourcemanager:port>    specify a ResourceManager
        -files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
        -libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
        -archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
        
        The general command line syntax is
        bin/hadoop command [genericOptions] [commandOptions]
        
    
    

    上面给出的是hdfs dfsadmin可用的选项,注意,这里必须要是hdfs的超级用户才可以调用上面的选项,也就是启动hdfs的用户。

    我们注意到有个参数refreshUserToGroupsMappings,这个参数的含义是将当前namenode所在的机器上的用户所属组,刷新同步到hdfs中。例子如下:

    [root@node1 ~]# useradd good
    [root@node1 ~]# groupadd ooxx
    [root@node1 ~]# passwd good
    [root@node1 ~]# usermod -a -G ooxx good
    [root@node1 ~]# id good
    uid=1002(good) gid=1002(good) groups=1002(good),1003(ooxx)
    

    这里我在node1上创建了good用户,并且创建ooxx组,且让good用户修改其组为ooxx。不过就算你在本机上指定good用户为ooxx组的成员,hdfs中实际上还是没同步的(可以使用hdfs groups命令查)

    故此使用下面的命令同步:

    [god@node1 ~]$ hdfs dfsadmin -refreshUserToGroupsMappings
    Refresh user to groups mapping successful for node1/192.168.199.11:8020
    Refresh user to groups mapping successful for node2/192.168.199.12:8020
    [good@node1 ~]$ hdfs groups
    good : good ooxx
    

    此时good用户就可以在god用户创建的/temp目录下新建目录了

    [good@node3 root]$ hdfs dfs -mkdir -p /temp/abv
    

    在这里插入图片描述
    结论:默认hdfs依赖操作系统上的用户和用户组。

    二、集成开发环境

    我们需要在IDEA中集成hdfs的clien来调用hdfs的接口执行一些操作。
    客户端向hdfs中汇报它自己是谁的时候,可以使用下面三种方式
    1、配置hdfs中client向集群提供的用户名

    • 参考系统登陆用户名

    • 参考环境变量
      我们可以配置环境变量如下:
      在这里插入图片描述
      这一步必须要优先启动下idea
      jdk版本:集群和开发环境jdk版本需要一致

    • 代码中指明
      使用FileSystem对象获取
        //手动的指定连接fs的时候使用的用户名
        fs = FileSystem.get(URI.create("hdfs://mycluster"),conf,"god")
    
    2、配置hdfs的pom文件

    其中hadoop包括(common,hdfs,yarn,mapreduce)

    导入下面的pom依赖

     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.6.5</version>
            </dependency>
    

    将node1节点上的hadoop目录下的core-site.xml 和 hdfs-site.xml引入到idea项目中的resources路径下;
    在这里插入图片描述

    3、使用代码连接hdfs并创建一个目录

    代码如下:

        public class TestHDFS {
        
            public Configuration conf = null;
            //这才是和hdfs连接的客户端
            public FileSystem fs = null;
        
            @Before
            public void conn() throws IOException {
                //这里Configuration构造函数传入loadDefaults为true表示启动之后会加载编译好的core-site.xml和hdfs-site.xml里的配置
                conf = new Configuration(true);
                /**
                 *创建fs对象,注意这里会根据core-site.xml配置的fs.defaultFS对应的值判断返回fs对象的具体类型。比如我们配置的这个值
                 * 有个hdfs://前缀,那就会返回一个分布式的文件系统fs对象回来。
                 * 并且这里的fs会去读取我们之前定义的HADOOP_USER_NAME的环境变量,来获取向hdfs提供的客户端角色(定义为god用户)
                 */
                fs = FileSystem.get(conf);
            }
        
            /**
             * 模拟向hdfs中创建目录
             */
            @Test
            public void mkdir() throws IOException {
                Path path = new Path("/luohaizhang");
                //检查目录是否存在在hdfs中
                if(fs.exists(path)){
                    //删除已存在的目录
                    fs.delete(path,true);
                }
                //创建目录
                boolean mkdirs = fs.mkdirs(path);
            }
        
            @After
            public void close() throws IOException {
                fs.close();
            }
        }
    
    

    上面代码的注解已经标注很清晰了,注意一点的是fs = FileSystem.get(conf);这种构造Filesystem对象的方式,默认是去你主机配置的环境变量获取用户信息的。
    效果:
    在这里插入图片描述

    4、使用代码上传文件到hdfs

        /**
             * 上传文件
             * @throws IOException
             */
            @Test
            public void uploadFile() throws IOException {
                BufferedInputStream input = new BufferedInputStream(new FileInputStream("D:\\redislog.txt"));
                Path path = new Path("/luohaizhang/redisLog");
                FSDataOutputStream fsDataOutputStream = fs.create(path);
                IOUtils.copyBytes(input,fsDataOutputStream,conf,true);
            }
    
    

    效果:

    在这里插入图片描述

    5、从hdfs读取下载文件

    下面给出的是直接读取文件,文件块对用户来讲是隐藏的。用户感知不到:

         /**
             * 读取下载hdfs文件
             * @throws IOException
             */
            @Test
            public void readFile() throws IOException {
                BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream("./data/redisLogHdfs.txt"));
                Path path = new Path("/luohaizhang/redisLog");
                FSDataInputStream open = fs.open(path);
                IOUtils.copyBytes(open,outputStream,conf,true);
            }
    

    当然如果你想要知道文件块都在hdfs中的哪些节点上存储,也是很简单的。这里现在hdfs中上传一个比较大的文件,并定义文件块划分的大小

        [god@node1 ~]$ for i in `seq 100000`;do echo "hello luohaizhang god $i" >> data.txt; done;
        [god@node1 ~]$ ll -h
        -rw-rw-r-- 1 god god 2.7M May 19 11:08 data.txt
        [god@node1 ~]$ hdfs dfs -D dfs.blocksize=1048576 -put data.txt
    

    在这里插入图片描述

    此时data.txt默认上传到hdfs中的/user/god目录下。并且备拆分成3个block,每个block存在2个副本。
    现在我们可以使用代码来获取这个文件的元信息

        /**
         * 读取下载hdfs文件
         * @throws IOException
         */
        @Test
        public void readFileMetaInfo() throws IOException {
            Path path = new Path("/user/god/data.txt");
            FSDataInputStream open = fs.open(path);
            //去fs中获取path指定的文件的元数据
            FileStatus fss = fs.getFileStatus(path);
            //这里会去fs中获取文件元数据指定长度的文件块存放的位置信息。
            BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fss, 0, fss.getLen());
            //遍历文件块位置信息
            for(BlockLocation blockLocation : fileBlockLocations)
                System.out.println(blockLocation);
        }
    

    对应的结果:
    在这里插入图片描述

    对上面结果进行分析:

        0,1048576,node3,node2
        块的起始location,块的末尾location,块的副本存放在node3上,块的副本存放在node2上。
    

    这也是hdfs的强大之处,我们可以获取文件块的存放位置,按块拉取文件。也使得计算向数据移动(通过hdfs提供的文件块location)。提高处理效率。

    我们可以进入到hdfs机器中确认下,上面生成的3个文件块里面存放的内容

    [root@node2 subdir0]# pwd
    /var/bigdata/hadoop/ha/dfs/data/current/BP-1181962499-192.168.199.11-1589603211493/current/finalized/subdir0/subdir0
    [root@node2 subdir0]# ll -h
    total 2.7M
    -rw-rw-r-- 1 god god 1.1K May 19 10:45 blk_1073741825
    -rw-rw-r-- 1 god god   19 May 19 10:45 blk_1073741825_1001.meta
    -rw-rw-r-- 1 god god 1.0M May 19 11:09 blk_1073741826
    -rw-rw-r-- 1 god god 8.1K May 19 11:09 blk_1073741826_1002.meta
    -rw-rw-r-- 1 god god 1.0M May 19 11:09 blk_1073741827
    -rw-rw-r-- 1 god god 8.1K May 19 11:09 blk_1073741827_1003.meta
    -rw-rw-r-- 1 god god 676K May 19 11:09 blk_1073741828
    -rw-rw-r-- 1 god god 5.3K May 19 11:09 blk_1073741828_1004.meta
    

    可以看到node2中datanode存放了3个文件块以及校验和(后缀826-828的)。正是我们上面传递的文件。 首先可以访问下第一个块 blk_1073741826的文件末尾内容

    [root@node2 subdir0]# cat blk_1073741826 | tail -10 
    hello luohaizhang god 37837
    hello luohaizhang god 37838
    hello luohaizhang god 37839
    hello luohaizhang god 37840
    hello luohaizhang god 37841
    hello luohaizhang god 37842
    hello luohaizhang god 37843
    hello luohaizhang god 37844
    hello luohaizhang god 37845
    hello luohaizhang god 
    

    发现因为按块划分,部门溢出块大小的字节数将被移到块 blk_1073741827中,我们看下块 blk_1073741827的文件内容

        [root@node2 subdir0]# cat blk_1073741827 |head -10  
        37846
        hello luohaizhang god 37847
        hello luohaizhang god 37848
        hello luohaizhang god 37849
        hello luohaizhang god 37850
        hello luohaizhang god 37851
        hello luohaizhang god 37852
        hello luohaizhang god 37853
        hello luohaizhang god 37854
        hello luohaizhang god 37855
    

    发现块 blk_1073741827的文件首行就是块 blk_1073741826缺少的部分。这个时候,如果程序A和程序B都想要使用HDFS提供的客户端FileSystem去读取不同块的内容,怎么实现呢?观察下面的代码

    
        @Test
        public void readFile3() throws IOException {
            Path path = new Path("/user/god/data.txt");
            FSDataInputStream in = fs.open(path);
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
        }
    
    

    结果:
    在这里插入图片描述
    上面结果显示,如果直接调用fs提供的输出流的readByte方法,实际上只会从文件块的第一行开始读,那么同理,不管有多少个程序调用readByte方法,都是从第一个文件块的第一行读下去。这样就不能做到并行计算的目的。幸好FSDataInputStream提供了seek方法,让客户端提供他想要读取的文件块location,然后从该location往下读即可

      @Test
        public void readFile3() throws IOException {
            Path path = new Path("/user/god/data.txt");
            FSDataInputStream in = fs.open(path);
            //假设想要从第二个文件块的首行开始读,这里一个文件块大小是1M = 1048576
            in.seek(1048576);
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
            System.out.println((char)in.readByte());
    
        }
    

    结果:

    在这里插入图片描述
    此时确实是第二个文件块blk_1073741827中首行的内容!这样就可以根据不同的location,定位想要获取的数据,并行计算即可!并且默认的是读取离自己最近的datanode,这是hdfs框架的默认机制。

    展开全文
  • 1.下载apache-maven-3.5.0-bin.tar,并设置MAVEN_HOME. 2. 下载hadoop-2.6.0.tar,并设置HADOOP_HOME. 3.POM配置。 < dependency >   <!-- Spark dependency -->   < groupId > org.apache.spark ...

    1.下载apache-maven-3.5.0-bin.tar,并设置MAVEN_HOME.

    2. 下载hadoop-2.6.0.tar,并设置HADOOP_HOME.

    3.POM配置。

    <dependency> <!-- Spark dependency -->

          <groupId>org.apache.spark</groupId>

          <artifactId>spark-sql_2.11</artifactId>

          <version>2.2.0</version>

        </dependency>

        

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-core_2.11</artifactId>

        <version>2.2.0</version>

        <scope>provided</scope>

    </dependency>

        <dependency> <!-- Spark dependency -->

          <groupId>org.apache.hadoop</groupId>

          <artifactId>hadoop-client</artifactId>

          <version>2.6.0</version>

        </dependency>

        <dependency>

        <groupId>com.google.collections</groupId>

        <artifactId>google-collections</artifactId>

        <version>1.0</version>

    </dependency>


    4. 创建SimpleApp类。

    import java.util.Arrays;
    import java.util.List;


    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;


    public class SimpleApp {
    public static void main(String[] args) {

              //local代表运行本地集群
       SparkConf conf = new SparkConf().setAppName("app demo").setMaster("local");
       JavaSparkContext sc = new JavaSparkContext(conf);

               //文件放在工程根目录下面
       JavaRDD<String> lines = sc.textFile("test.txt");
       JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
       int totalLength = lineLengths.reduce((a, b) -> a + b);
       
       System.out.println("length:"+totalLength);
       
     }
    }


    5.运行MAVEN命令eclipse:clean eclipse:eclipse构建好MAVEN项目。

    6.运行SimpleApp

    展开全文
  • SpringBoot集成Hadoop系列一 ---- 对HDFS的文件操作

    万次阅读 热门讨论 2019-05-23 19:32:43
    HDFS操作设计以下几个主要的类: Configuration:封装了客户端或者服务器的配置信息 FileSystem:此类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作通过FileSystem的静态方法get获得该对象...

    一.对HDFS操作设计以下几个主要的类:

    Configuration:封装了客户端或者服务器的配置信息

    FileSystem:此类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作通过FileSystem的静态方法get获得该对象,例:FileSystem hdfs = FileSystem.get(conf);

    FSDataInputStream:这是HDFS中的输入流,通过由FileSystem的open方法获取

    FSDataOutputStream:这是HDFS中的输出流,通过由FileSystem的create方法获取

    二.依赖配置

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.hdfs</groupId>
    	<artifactId>HadoopTest</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>HadoopTest</name>
    	<url>http://maven.apache.org</url>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.0.0.RELEASE</version>
    		<relativePath />
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-common</artifactId>
    			<version>3.1.1</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-hdfs</artifactId>
    			<version>3.1.1</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-client</artifactId>
    			<version>3.1.1</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-mapreduce-client-core</artifactId>
    			<version>3.1.1</version>
    		</dependency>
    		<dependency>
    			<groupId>cn.bestwu</groupId>
    			<artifactId>ik-analyzers</artifactId>
    			<version>5.1.0</version>
    		</dependency>
    		<dependency>
    			<groupId>jdk.tools</groupId>
    			<artifactId>jdk.tools</artifactId>
    			<version>1.8</version>
    			<scope>system</scope>
    			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    		</dependency>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<scope>test</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<configuration>
    					<source>1.8</source>
    					<target>1.8</target>
    				</configuration>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    
    # tomcat thread = 200
    server.tomcat.max-threads=1000
    
    <!--edit tomcat port-->
    server.port=8900
    # session time 30
    server.session-timeout=60
    
    spring.application.name=hadoop
    spring.servlet.multipart.max-file-size=50MB
    spring.servlet.multipart.max-request-size=50MB
    
    hdfs.path=hdfs://localhost:9000
    hdfs.username=linhaiy
    logging.config=classpath:logback.xml
    

    三.HDFS文件操作接口开发

    package com.hadoop.config;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * HDFS配置类
     * @author linhaiy
     * @date 2019.05.18
     */
    @Configuration
    public class HdfsConfig {
        @Value("${hdfs.path}")
        private String path;
    
        public String getPath() {
            return path;
        }
    
        public void setPath(String path) {
            this.path = path;
        }
    }
    
    
    package com.hadoop.hdfs.entity;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * 用户实体类
     * @author linhaiy
     * @date 2019.05.18
     */
    public class User implements Writable {
    
    	private String username;
    	private Integer age;
    	private String address;
    
    	public User() {
    		super();
    		// TODO Auto-generated constructor stub
    	}
    
    	public User(String username, Integer age, String address) {
    		super();
    		this.username = username;
    		this.age = age;
    		this.address = address;
    	}
    
    	@Override
    	public void write(DataOutput output) throws IOException {
    		// 把对象序列化
    		output.writeChars(username);
    		output.writeInt(age);
    		output.writeChars(address);
    	}
    
    	@Override
    	public void readFields(DataInput input) throws IOException {
    		// 把序列化的对象读取到内存中
    		username = input.readUTF();
    		age = input.readInt();
    		address = input.readUTF();
    	}
    
    	public String getUsername() {
    		return username;
    	}
    
    	public void setUsername(String username) {
    		this.username = username;
    	}
    
    	public Integer getAge() {
    		return age;
    	}
    
    	public void setAge(Integer age) {
    		this.age = age;
    	}
    
    	public String getAddress() {
    		return address;
    	}
    
    	public void setAddress(String address) {
    		this.address = address;
    	}
    
    	@Override
    	public String toString() {
    		return "User [username=" + username + ", age=" + age + ", address=" + address + "]";
    	}
    
    }
    
    package com.hadoop.hdfs.service;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import javax.annotation.PostConstruct;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.io.IOUtils;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.web.multipart.MultipartFile;
    
    import com.hadoop.util.JsonUtil;
    
    @Component
    public class HdfsService {
    
    	@Value("${hdfs.path}")
    	private String path;
    	@Value("${hdfs.username}")
    	private String username;
    
    	private static String hdfsPath;
    	private static String hdfsName;
    	private static final int bufferSize = 1024 * 1024 * 64;
    
    	/**
    	 * 获取HDFS配置信息
    	 * @return
    	 */
    	private static Configuration getConfiguration() {
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", hdfsPath);
    		return configuration;
    	}
    
    	/**
    	 * 获取HDFS文件系统对象
    	 * @return
    	 * @throws Exception
    	 */
    	public static FileSystem getFileSystem() throws Exception {
    		// 客户端去操作hdfs时是有一个用户身份的,默认情况下hdfs客户端api会从jvm中获取一个参数作为自己的用户身份
    		// DHADOOP_USER_NAME=hadoop
    		// 也可以在构造客户端fs对象时,通过参数传递进去
    		FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
    		return fileSystem;
    	}
    
    	/**
    	 * 在HDFS创建文件夹
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static boolean mkdir(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return false;
    		}
    		if (existFile(path)) {
    			return true;
    		}
    		FileSystem fs = getFileSystem();
    		// 目标路径
    		Path srcPath = new Path(path);
    		boolean isOk = fs.mkdirs(srcPath);
    		fs.close();
    		return isOk;
    	}
    
    	/**
    	 * 判断HDFS文件是否存在
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static boolean existFile(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return false;
    		}
    		FileSystem fs = getFileSystem();
    		Path srcPath = new Path(path);
    		boolean isExists = fs.exists(srcPath);
    		return isExists;
    	}
    
    	/**
    	 * 读取HDFS目录信息
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return null;
    		}
    		if (!existFile(path)) {
    			return null;
    		}
    		FileSystem fs = getFileSystem();
    		// 目标路径
    		Path newPath = new Path(path);
    		FileStatus[] statusList = fs.listStatus(newPath);
    		List<Map<String, Object>> list = new ArrayList<>();
    		if (null != statusList && statusList.length > 0) {
    			for (FileStatus fileStatus : statusList) {
    				Map<String, Object> map = new HashMap<>();
    				map.put("filePath", fileStatus.getPath());
    				map.put("fileStatus", fileStatus.toString());
    				list.add(map);
    			}
    			return list;
    		} else {
    			return null;
    		}
    	}
    
    	/**
    	 * HDFS创建文件
    	 * @param path
    	 * @param file
    	 * @throws Exception
    	 */
    	public static void createFile(String path, MultipartFile file) throws Exception {
    		if (StringUtils.isEmpty(path) || null == file.getBytes()) {
    			return;
    		}
    		String fileName = file.getOriginalFilename();
    		FileSystem fs = getFileSystem();
    		// 上传时默认当前目录,后面自动拼接文件的目录
    		Path newPath = new Path(path + "/" + fileName);
    		// 打开一个输出流
    		FSDataOutputStream outputStream = fs.create(newPath);
    		outputStream.write(file.getBytes());
    		outputStream.close();
    		fs.close();
    	}
    
    	/**
    	 * 读取HDFS文件内容
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static String readFile(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return null;
    		}
    		if (!existFile(path)) {
    			return null;
    		}
    		FileSystem fs = getFileSystem();
    		// 目标路径
    		Path srcPath = new Path(path);
    		FSDataInputStream inputStream = null;
    		try {
    			inputStream = fs.open(srcPath);
    			// 防止中文乱码
    			BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
    			String lineTxt = "";
    			StringBuffer sb = new StringBuffer();
    			while ((lineTxt = reader.readLine()) != null) {
    				sb.append(lineTxt);
    			}
    			return sb.toString();
    		} finally {
    			inputStream.close();
    			fs.close();
    		}
    	}
    
    	/**
    	 * 读取HDFS文件列表
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static List<Map<String, String>> listFile(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return null;
    		}
    		if (!existFile(path)) {
    			return null;
    		}
    
    		FileSystem fs = getFileSystem();
    		// 目标路径
    		Path srcPath = new Path(path);
    		// 递归找到所有文件
    		RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
    		List<Map<String, String>> returnList = new ArrayList<>();
    		while (filesList.hasNext()) {
    			LocatedFileStatus next = filesList.next();
    			String fileName = next.getPath().getName();
    			Path filePath = next.getPath();
    			Map<String, String> map = new HashMap<>();
    			map.put("fileName", fileName);
    			map.put("filePath", filePath.toString());
    			returnList.add(map);
    		}
    		fs.close();
    		return returnList;
    	}
    
    	/**
    	 * HDFS重命名文件
    	 * @param oldName
    	 * @param newName
    	 * @return
    	 * @throws Exception
    	 */
    	public static boolean renameFile(String oldName, String newName) throws Exception {
    		if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
    			return false;
    		}
    		FileSystem fs = getFileSystem();
    		// 原文件目标路径
    		Path oldPath = new Path(oldName);
    		// 重命名目标路径
    		Path newPath = new Path(newName);
    		boolean isOk = fs.rename(oldPath, newPath);
    		fs.close();
    		return isOk;
    	}
    
    	/**
    	 * 删除HDFS文件
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static boolean deleteFile(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return false;
    		}
    		if (!existFile(path)) {
    			return false;
    		}
    		FileSystem fs = getFileSystem();
    		Path srcPath = new Path(path);
    		boolean isOk = fs.deleteOnExit(srcPath);
    		fs.close();
    		return isOk;
    	}
    
    	/**
    	 * 上传HDFS文件
    	 * @param path
    	 * @param uploadPath
    	 * @throws Exception
    	 */
    	public static void uploadFile(String path, String uploadPath) throws Exception {
    		if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
    			return;
    		}
    		FileSystem fs = getFileSystem();
    		// 上传路径
    		Path clientPath = new Path(path);
    		// 目标路径
    		Path serverPath = new Path(uploadPath);
    
    		// 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
    		fs.copyFromLocalFile(false, clientPath, serverPath);
    		fs.close();
    	}
    
    	/**
    	 * 下载HDFS文件
    	 * @param path
    	 * @param downloadPath
    	 * @throws Exception
    	 */
    	public static void downloadFile(String path, String downloadPath) throws Exception {
    		if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
    			return;
    		}
    		FileSystem fs = getFileSystem();
    		// 上传路径
    		Path clientPath = new Path(path);
    		// 目标路径
    		Path serverPath = new Path(downloadPath);
    
    		// 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
    		fs.copyToLocalFile(false, clientPath, serverPath);
    		fs.close();
    	}
    
    	/**
    	 * HDFS文件复制 
    	 * @param sourcePath
    	 * @param targetPath
    	 * @throws Exception
    	 */
    	public static void copyFile(String sourcePath, String targetPath) throws Exception {
    		if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
    			return;
    		}
    		FileSystem fs = getFileSystem();
    		// 原始文件路径
    		Path oldPath = new Path(sourcePath);
    		// 目标路径
    		Path newPath = new Path(targetPath);
    
    		FSDataInputStream inputStream = null;
    		FSDataOutputStream outputStream = null;
    		try {
    			inputStream = fs.open(oldPath);
    			outputStream = fs.create(newPath);
    
    			IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
    		} finally {
    			inputStream.close();
    			outputStream.close();
    			fs.close();
    		}
    	}
    
    	/**
    	 * 打开HDFS上的文件并返回byte数组
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static byte[] openFileToBytes(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return null;
    		}
    		if (!existFile(path)) {
    			return null;
    		}
    		FileSystem fs = getFileSystem();
    		// 目标路径
    		Path srcPath = new Path(path);
    		try {
    			FSDataInputStream inputStream = fs.open(srcPath);
    			return IOUtils.readFullyToByteArray(inputStream);
    		} finally {
    			fs.close();
    		}
    	}
    
    	/**
    	 * 打开HDFS上的文件并返回java对象
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return null;
    		}
    		if (!existFile(path)) {
    			return null;
    		}
    		String jsonStr = readFile(path);
    		return JsonUtil.fromObject(jsonStr, clazz);
    	}
    
    	/**
    	 * 获取某个文件在HDFS的集群位置
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return null;
    		}
    		if (!existFile(path)) {
    			return null;
    		}
    		FileSystem fs = getFileSystem();
    		// 目标路径
    		Path srcPath = new Path(path);
    		FileStatus fileStatus = fs.getFileStatus(srcPath);
    		return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    	}
    
    	@PostConstruct
    	public void getPath() {
    		hdfsPath = this.path;
    	}
    
    	@PostConstruct
    	public void getName() {
    		hdfsName = this.username;
    	}
    
    	public static String getHdfsPath() {
    		return hdfsPath;
    	}
    
    	public String getUsername() {
    		return username;
    	}
    }
    
    package com.hadoop.hdfs.controller;
    
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.fs.BlockLocation;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.multipart.MultipartFile;
    
    import com.hadoop.hdfs.entity.User;
    import com.hadoop.hdfs.service.HdfsService;
    import com.hadoop.util.Result;
    
    @RestController
    @RequestMapping("/hadoop/hdfs")
    public class HdfsAction {
    
    	private static Logger LOGGER = LoggerFactory.getLogger(HdfsAction.class);
    
    	/**
    	 * 创建文件夹
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@RequestMapping(value = "mkdir", method = RequestMethod.POST)
    	@ResponseBody
    	public Result mkdir(@RequestParam("path") String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			LOGGER.debug("请求参数为空");
    			return new Result(Result.FAILURE, "请求参数为空");
    		}
    		// 创建空文件夹
    		boolean isOk = HdfsService.mkdir(path);
    		if (isOk) {
    			LOGGER.debug("文件夹创建成功");
    			return new Result(Result.SUCCESS, "文件夹创建成功");
    		} else {
    			LOGGER.debug("文件夹创建失败");
    			return new Result(Result.FAILURE, "文件夹创建失败");
    		}
    	}
    
    	/**
    	 * 读取HDFS目录信息
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/readPathInfo")
    	public Result readPathInfo(@RequestParam("path") String path) throws Exception {
    		List<Map<String, Object>> list = HdfsService.readPathInfo(path);
    		return new Result(Result.SUCCESS, "读取HDFS目录信息成功", list);
    	}
    
    	/**
    	 * 获取HDFS文件在集群中的位置
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/getFileBlockLocations")
    	public Result getFileBlockLocations(@RequestParam("path") String path) throws Exception {
    		BlockLocation[] blockLocations = HdfsService.getFileBlockLocations(path);
    		return new Result(Result.SUCCESS, "获取HDFS文件在集群中的位置", blockLocations);
    	}
    
    	/**
    	 * 创建文件
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/createFile")
    	public Result createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file)
    			throws Exception {
    		if (StringUtils.isEmpty(path) || null == file.getBytes()) {
    			return new Result(Result.FAILURE, "请求参数为空");
    		}
    		HdfsService.createFile(path, file);
    		return new Result(Result.SUCCESS, "创建文件成功");
    	}
    
    	/**
    	 * 读取HDFS文件内容
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/readFile")
    	public Result readFile(@RequestParam("path") String path) throws Exception {
    		String targetPath = HdfsService.readFile(path);
    		return new Result(Result.SUCCESS, "读取HDFS文件内容", targetPath);
    	}
    
    	/**
    	 * 读取HDFS文件转换成Byte类型
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/openFileToBytes")
    	public Result openFileToBytes(@RequestParam("path") String path) throws Exception {
    		byte[] files = HdfsService.openFileToBytes(path);
    		return new Result(Result.SUCCESS, "读取HDFS文件转换成Byte类型", files);
    	}
    
    	/**
    	 * 读取HDFS文件装换成User对象
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/openFileToUser")
    	public Result openFileToUser(@RequestParam("path") String path) throws Exception {
    		User user = HdfsService.openFileToObject(path, User.class);
    		return new Result(Result.SUCCESS, "读取HDFS文件装换成User对象", user);
    	}
    
    	/**
    	 * 读取文件列表
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/listFile")
    	public Result listFile(@RequestParam("path") String path) throws Exception {
    		if (StringUtils.isEmpty(path)) {
    			return new Result(Result.FAILURE, "请求参数为空");
    		}
    		List<Map<String, String>> returnList = HdfsService.listFile(path);
    		return new Result(Result.SUCCESS, "读取文件列表成功", returnList);
    	}
    
    	/**
    	 * 重命名文件
    	 * @param oldName
    	 * @param newName
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/renameFile")
    	public Result renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName)
    			throws Exception {
    		if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
    			return new Result(Result.FAILURE, "请求参数为空");
    		}
    		boolean isOk = HdfsService.renameFile(oldName, newName);
    		if (isOk) {
    			return new Result(Result.SUCCESS, "文件重命名成功");
    		} else {
    			return new Result(Result.FAILURE, "文件重命名失败");
    		}
    	}
    
    	/**
    	 * 删除文件
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/deleteFile")
    	public Result deleteFile(@RequestParam("path") String path) throws Exception {
    		boolean isOk = HdfsService.deleteFile(path);
    		if (isOk) {
    			return new Result(Result.SUCCESS, "delete file success");
    		} else {
    			return new Result(Result.FAILURE, "delete file fail");
    		}
    	}
    
    	/**
    	 * 上传文件
    	 * @param path
    	 * @param uploadPath
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/uploadFile")
    	public Result uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath)
    			throws Exception {
    		HdfsService.uploadFile(path, uploadPath);
    		return new Result(Result.SUCCESS, "upload file success");
    	}
    
    	/**
    	 * 下载文件
    	 * @param path
    	 * @param downloadPath
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/downloadFile")
    	public Result downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath)
    			throws Exception {
    		HdfsService.downloadFile(path, downloadPath);
    		return new Result(Result.SUCCESS, "download file success");
    	}
    
    	/**
    	 * HDFS文件复制
    	 * @param sourcePath
    	 * @param targetPath
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/copyFile")
    	public Result copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath)
    			throws Exception {
    		HdfsService.copyFile(sourcePath, targetPath);
    		return new Result(Result.SUCCESS, "copy file success");
    	}
    
    	/**
    	 * 查看文件是否已存在
    	 * @param path
    	 * @return
    	 * @throws Exception
    	 */
    	@PostMapping("/existFile")
    	public Result existFile(@RequestParam("path") String path) throws Exception {
    		boolean isExist = HdfsService.existFile(path);
    		return new Result(Result.SUCCESS, "file isExist: " + isExist);
    	}
    }
    

    四.一些测试结果截图

     

     

     

    展开全文
  • 构建基于SpringCloud分布式,使用hadoop HDFS实现文件的上传下载功能 最近做springcloud项目开发,需要使用文件服务器HDFS进行文件的上传下载,所以将实现的方法步骤做个记录,分享出来,如有疑问,欢迎随时交流: ...

    构建基于SpringCloud分布式,使用hadoop HDFS实现文件的上传下载功能

    最近做springcloud项目开发,需要使用文件服务器HDFS进行文件的上传下载,所以将实现的方法步骤做个记录,分享出来,如有疑问,欢迎随时交流:

    项目结构如下

    在这里插入图片描述
    项目使用到Eureka作为注册中心,通过Feign客户端进行服务调用,废话少说,直接上代码
    代码片.

    实现层(file_service_impl)

    // application.yml配置文件
    server:
      port: 8020
      undertow:
        direct-buffers: true
    ### hadoop配置
    hdfs:
     
    展开全文
  • storm整合hdfs—将数据写到hdfs

    千次阅读 2018-11-06 11:09:21
    最近由于业务需求,需要将数据经过storm实时处理加工之后,要转存到HDFS。小厨在实现业务之前首先写了一个测试用例,话不多说,直接上干货。。。 首先介绍一下用例业务中使用的软件版本:storm 1.1.0、 hadoop ...
  • 一、Storm集成HDFS 1.1 项目结构 本用例源码下载地址:storm-hdfs-integration 1.2 项目主要依赖 项目主要依赖如下,有两个地方需要注意: 这里由于我服务器上安装的是 CDH 版本的 Hadoop,在导入依赖时...
  • 大数据平台配置可执行的jar包,需求是jar包能够获取hive表数据,并将数据上传到hdfs。 组件 jdk8 + hive + hdfs 源码 https://gitee.com/acelee723/acelee-hive-hdfs-main-jar 代码 1.hive操作类 import org...
  • Spring boot项目整合Hadoop的HDFS

    千次阅读 2018-11-16 19:38:36
    由于最近需要使用Spring boot整合Hadoop的HDFS,但是在整合的过程遇到了很多问题,网上也没有现成教程,都是自己摸索出来的,配置了很久都没能把项目搭建出来,希望对大家有帮助。 使用Spring boot整合HDFS主要是...
  • 在实施方案前,假设读者已经基本熟悉以下技术 (不细说) Java,maven hdfs,kerberos 方案实施 最后目录大概如下 新建maven工程,pom.xml配置, 添加以下 org.apache.hadoop hadoop-client 2.6.5 org.apache.hadoop ...
  • //跟HDFS数据同步的时间,当tuple中结果达到了1K,就与HDFS进行同步 bolt.withSyncPolicy(new CountSyncPolicy(100)); return bolt; } // 返回一个Redis的Bolt将结果插入到Redis private static IRichBolt ...
  • springBoo整合HDFS

    2019-11-19 08:55:28
    1、HDFS环境搭建:https://blog.csdn.net/BlaineLi/article/details/103135082 2、SpringBoot 2.1.1版本; 2、代码实现 1、pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns=...
  • 一、Storm集成HDFS 1.1 项目结构 本用例源码下载地址:storm-hdfs-integration 1.2 项目主要依赖 项目主要依赖如下,有两个地方需要注意: 这里由于我服务器上安装的是CDH版本的Hadoop,在导入依赖时...
  • storm的组件可以和HDFS系统进行交互。 使用: 以下例子用竖线 | 作为分隔符,把文件写到HDFS的路径上去,一次同步每一千个tuple.每5MB滚动一次文件。 先在HDFS上创建路径 hadoop fs -mkdir /storm_write_hdfs ...
  • 一、Storm集成HDFS 1.1 项目结构 本用例源码下载地址:storm-hdfs-integration 1.2 项目主要依赖 项目主要依赖如下,有两个地方需要注意: 这里由于我服务器上安装的是 CDH 版本的 Hadoop,在导入依赖时引入的...
  • Hadoop+idea+maven开发配置

    千次阅读 2016-10-26 20:24:09
    idea是java开发的利器之一,其自身集成maven,十分适用于大型项目的开发。大多数的Hadoop程序都是在idea下开发的,本文介绍了Hadoop+idea+maven的环境配置与开发测试 准备 在ubuntu下安装Hadoop,具体细节可见...
  • Flume集成logback将日志写入HDFS

    万次阅读 2021-06-03 15:13:33
    Flume集成logback将日志写入HDFSFlume 配置文件(Kafka代替Flume Channel)Spring Boot + logback集成Flume测试 Flume 配置文件(Kafka代替Flume Channel) flume-test-conf.properties # 组件命名 a1.sources = r1 a1...
  • idea 本地运行hdfs程序

    2019-08-25 18:51:34
    本地开发IDEA 开发hdfs api 1.先下载window环境编译好的Hadoop安装包,到安装目录下 注意一定得要有(winutils.exe)这个文件 下载地址:https://pan.baidu.com/s/1tcwFOjbQJlFKR-t5S_wxxw 2.跟配置jdk环境变量一样...
  • 上面的例子是使用main方法测试的,如果整合到springMVC,通过页面发ajax请求执行呢?主要还是pom.xml,很容易就因为包冲突报错。经过删减,只留下了相关的jar包,...http://maven.apache.org/POM/4.0.0" xmln...
  • springboot集成hadoop实战

    2020-12-01 19:41:41
    springboot集成hadoop实现hdfs增删改查 maven坐标 <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop....
  • 一、配置windows下的Hadoop包 下载Hadoop安装包(可将之前在linux下安装的压缩包直接解压用) ...三、使用Idea创建Maven工程 创建好后,导入依赖: <dependencies> <!-- https://mvnrepository.co
  • 1. Hue概述及版本下载 1)概述  Hue是一个开源的Apache Hadoop UI系统,...通过使用Hue我们可以在浏览器端的Web控制台上与Hadoop集群进行交互来分析处理数据,例如操作HDFS上的数据,运行MapReduce Job等等。 2)...
  • 这里写自定义目录标题...无法下载org.apache.sqoop:sqoop:1.4.7问题4:log4j的依赖冲突问题5:执行本地MapReduce时遇到NoClassDefFoundError问题6: 程序包org.apache.hadoop.io不存在问题7:无权访问HDFS目录问题8:...
  • 本人去年的时候一直对maven项目很头疼,由于在构建hadoop项目时涉及到很多版本冲突方面的问题,但是在今年的开发中将很多问题得以解决。这一次,将本人的经验得以总结,为大家讲解一下用maven构建hadoop项目的具体...
  • HDFS JavaAPI

    2018-09-17 20:34:51
    利用java api实现HDFS增删查改文件和文件目录,运行代码首先eclipse集成hadoop环境 maven项目pom.xml导入相关依赖
  • Springboot整合操作HDFS的HA集群

    千次阅读 2019-05-13 16:38:59
    Springboot整合操作HDFS的HA集群 一、pom依赖及踩坑 新建一个springboot项目,添加以下依赖 <dependencies> <dependency> <groupId>org.springframework.data</groupId> <...
  • 面试

    千次阅读 2018-08-14 14:28:10
    底层的数据存取传统上是使用关系型数据库,可以是MySQL、Oracle、SQLServer、DB2等,随着大数据时代的来临,也可以采用NoSQL(如MongoDB、MemBase、BigTable等)和其他大数据存取方案(如GFS、HDFS等);项目的开发...
  • eclipse集成maven

    2017-11-06 22:08:30
    1、安装maven [tom@blue01 modules]$ tar zxvf /opt/softwares/apache-maven-3.0.5-bin.tar.gz 配置MAVEN_HOME:# vi /etc/profile (root用户) MAVEN_HOME=/opt/modules/apache-maven-3.0.5 export PATH=$...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,318
精华内容 1,327
关键字:

maven集成hdfs