精华内容
下载资源
问答
  • 针对windows系统,有一些连接HDFS进行文档管理的工具,比如说HDFS+Explorer。 但是我的是mac系统,我找了很久也没找到比较好用的工具,因此自己利用java开发了一个小工具,可以兼容windows平台和mac、Linux平台使用...
  • import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException;...import java.io.InputStream;...import java.io.InputStreamReader;...import java
    import net.sf.json.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.*;
    
    public class HDFSOperating {
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * @param webhdfs
         * @param stream       the InputStream of file to upload
         * @param hdfsFilePath
         * @param op
         * @param parameters
         * @param method
         * @throws IOException
         */
        public void uploadFile(String webhdfs, InputStream stream, String hdfsFilePath, String op, Map<String, String> parameters, String method) throws IOException {
            HttpURLConnection con;
            try {
                con = getConnection(webhdfs, hdfsFilePath, op, parameters, method);
    
                byte[] bytes = new byte[1024];
                int rc = 0;
                while ((rc = stream.read(bytes, 0, bytes.length)) > 0)
                    con.getOutputStream().write(bytes, 0, rc);
                con.getInputStream();
                con.disconnect();
            } catch (IOException e) {
                logger.info(e.getMessage());
                e.printStackTrace();
            }
            stream.close();
        }
    
    
        /**
         * @param webhdfs
         * @param hdfsFilePath
         * @param op
         * @param parameters
         * @param method
         * @throws IOException
         */
        public Map<String, Object> getFileStatus(String [] webhdfs, String hdfsFilePath, String op, Map<String, String> parameters, String method) {
            Map<String, Object> fileStatus = new HashMap<String, Object>();
            HttpURLConnection connection  =null;
    
            for (String url:webhdfs){
                try {
                    HttpURLConnection conn = getConnection(url,hdfsFilePath,op,parameters,method);
                    if (conn.getInputStream() != null){
                        connection = conn;
                        break;
                    }
                }catch (IOException e){
                    logger.error("");
                }
            }
    
    
            StringBuffer sb = new StringBuffer();
            try {
                InputStream is = connection.getInputStream();
                BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                String line = null;
                while ((line = reader.readLine()) != null) {
                    sb.append(line);
                }
                reader.close();
                System.out.println(sb.toString());
                JSONObject root = JSONObject.fromObject(sb.toString());
                JSONObject status = root.getJSONObject("FileStatus");
                Iterator keys = status.keys();
    
                while (keys.hasNext()) {
                    String key = keys.next().toString();
                    String value = status.get(key).toString();
                    fileStatus.put(key, value);
                }
    //            is.close();
            }catch (IOException e){
                logger.error(Constants.EXCEPTION_WAS_CAUGHT,e);
            }catch (NullPointerException e){
                logger.error(Constants.EXCEPTION_WAS_CAUGHT,e);
            }
    
            return fileStatus;
        }
    
        /**
         * @param strurl     webhdfs like http://ip:port/webhdfs/v1 ,port usually 50070 or 14000
         * @param path       hdfs path + hdfs filename  eg:/user/razor/readme.txt
         * @param op         the operation for hdfsFile eg:GETFILESTATUS,OPEN,MKDIRS,CREATE etc.
         * @param parameters other parameter if you need
         * @param method     method eg: GET POST PUT etc.
         * @return
         */
        public HttpURLConnection getConnection(String strurl, String path, String op, Map<String, String> parameters, String method) {
            URL url = null;
            HttpURLConnection con = null;
            StringBuffer sb = new StringBuffer();
            try {
                sb.append(strurl);
                sb.append(path);
                sb.append("?op=");
                sb.append(op);
                if (parameters != null) {
                    for (String key : parameters.keySet())
                        sb.append("&").append(key + "=" + parameters.get(key));
                }
                url = new URL(sb.toString());
                con = (HttpURLConnection) url.openConnection();
                con.setRequestMethod(method);
                con.setRequestProperty("accept", "*/*");
                con.setRequestProperty("connection", "Keep-Alive");
                String s = "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)";
                String s1 = "ozilla/4.0 (compatible; MSIE 5.0; Windows NT; DigExt)";
                con.setRequestProperty("User-Agent", s1);
    //            con.setRequestProperty("Accept-Encoding", "gzip");
    //            con.setDoInput(true);
                con.setDoOutput(true);
                con.setUseCaches(false);
            } catch (IOException e) {
                logger.error(Constants.EXCEPTION_WAS_CAUGHT, e);
            }
            return con;
        }
    }
    
    

    参考链接:https://blog.csdn.net/hochoy/article/details/80435659

    展开全文
  • 本文主要参考了Hadoop HDFS文件系统通过java FileSystem 实现上传下载等,并实际的做了一下验证。代码与引用的文章差别不大,现列出来作为备忘。 import java.io.*; import java.net.URI; import org.apache....

    本文主要参考了Hadoop HDFS文件系统通过java FileSystem 实现上传下载等,并实际的做了一下验证。代码与引用的文章差别不大,现列出来作为备忘。

    
    import java.io.*;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;
    import org.junit.Test;
    
    public class HdfsDemo {
    
        private static String SOURCE_PATH = "C:\\settings.jar";
        private static String DEST_PATH = "/hbase/upload/settings.jar";
        private static String MASTER_URI = "hdfs://192.168.209.129:9000";
    
        /**
         * 测试上传文件
         * 如果下述代码抛出异常:
         * org.apache.hadoop.security.AccessControlException: Permission denied: user=xxx, access=WRITE。
         * 则表明上传文件的用户没有权限。解决方式如下:
         * 1.配置环境变量HADOOP_USER_NAME, 并设置值为有访问hadoop集群有权限的用户,比如:hadoop。源码调试发现hadoop会首先从环境变量中去查找上传文件的用户名。查找不到会议pc的名字作为用户名。
         * 2.赋予hdfs上的文件夹/hbase/upload/读写权限。 命令是:hadoop fs -chmod 777  /hbase/upload。这样,可以保证上传文件到该目录成功,但上传到其他目录依然无权限。
         *
         * @throws Exception
         * @author lihong10 2017年12月9日 上午10:42:30
         */
        @Test
        public void testUpload() throws Exception {
    
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI(MASTER_URI), conf);
            InputStream in = new FileInputStream(SOURCE_PATH);
            OutputStream out = fs.create(new Path(DEST_PATH), new Progressable() {
                @Override
                public void progress() {
                    System.out.println("上传完一个设定缓存区大小容量的文件!");
                }
            });
            IOUtils.copyBytes(in, out, conf);
    
          /*byte[] buffer = new byte[1024];
            int len = 0;
    		while((len=in.read(buffer))>0){
    			out.write(buffer, 0, len);
    		} 
    		out.flush();
    		in.close();
    		out.close();*/
        }
    
    
        /**
         * 测试下载文件
         * @throws Exception
         */
        @Test
        public void testDownload() throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI(MASTER_URI), conf);
    //		InputStream in = new FileInputStream("e:\\hadoop.avi");
    //		OutputStream out = fs.create(new Path("/demo/b.avi"));
            InputStream in = fs.open(new Path(DEST_PATH));
            OutputStream out = new FileOutputStream("E:\\XXXX.jar");
            IOUtils.copyBytes(in, out, conf);
    
          /*byte[] buffer = new byte[1024];
    		int len = 0;
    		while((len=in.read(buffer))>0){
    			out.write(buffer, 0, len);
    		} 
    		out.flush();
    		in.close();
    		out.close();*/
        }
    
        /**
         * 复制文件到远程文件系统,也可以看做是上传
         * @param conf
         * @param uri
         * @param local
         * @param remote
         * @throws IOException
         */
        public static void copyFile(Configuration conf, String uri, String local, String remote) throws IOException {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            fs.copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("copy from: " + local + " to " + remote);
            fs.close();
        }
    
        @Test
        public void testCopyFile() throws IOException {
            // 使用命令:hadoop fs -ls  /hbase/upload, 可以查看复制到/hbase/upload目录下的文件
            copyFile(new Configuration(), MASTER_URI, SOURCE_PATH, "/hbase/upload/settings01.jar");
        }
    
        /**
         * 在hdfs文件系统下创建目录
         * @param conf
         * @param uri
         * @param remoteFile
         * @throws IOException
         */
        public static void makeDir(Configuration conf, String uri, String remoteFile) throws IOException {
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path = new Path(remoteFile);
    
            fs.mkdirs(path);
            System.out.println("创建文件夹" + remoteFile);
        }
    
        @Test
        public void testMakeDir() throws IOException {
            //可以通过:hadoop fs -ls  /hbase/upload。   查看创建的目录subdir01
            makeDir(new Configuration(), MASTER_URI, "/hbase/upload/subdir01");
        }
    
    
        /**
         * 删除文件
         * @param conf
         * @param uri
         * @param filePath
         * @throws IOException
         */
        public static void delete(Configuration conf, String uri, String filePath) throws IOException {
            Path path = new Path(filePath);
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            fs.deleteOnExit(path);
            System.out.println("Delete: " + filePath);
            fs.close();
        }
    
        @Test
        public void testDelete() throws IOException {
            delete(new Configuration(), MASTER_URI, "/hbase/upload/settings01.jar");
        }
    
    
        /**
         * 查看文件内容
         * @param conf
         * @param uri
         * @param remoteFile
         * @throws IOException
         */
        public static void cat(Configuration conf, String uri, String remoteFile) throws IOException {
            Path path = new Path(remoteFile);
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            FSDataInputStream fsdis = null;
            System.out.println("cat: " + remoteFile);
            try {
                fsdis = fs.open(path);
                IOUtils.copyBytes(fsdis, System.out, 4096, false);
            } finally {
                IOUtils.closeStream(fsdis);
                fs.close();
            }
        }
    
        @Test
        public void testCat() throws IOException {
            //hbase/hbase.version 这个文件已经存在
            cat(new Configuration(), MASTER_URI, "/hbase/hbase.version");
        }
    
        /**
         * 查看目录下面的文件
         * @param conf
         * @param uri
         * @param folder
         * @throws IOException
         */
        public static void ls(Configuration conf, String uri, String folder) throws IOException {
            Path path = new Path(folder);
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            FileStatus[] list = fs.listStatus(path);
            System.out.println("ls: " + folder);
            System.out.println("==========================================================");
            for (FileStatus f : list) {
                System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());
            }
            System.out.println("==========================================================");
            fs.close();
        }
    
        @Test
        public void testLs() throws IOException {
            ls(new Configuration(), MASTER_URI, "/hbase/upload");
        }
    
    }
    


    展开全文
  • JAVA 上传本地文件HDFS

    千次阅读 2020-08-28 15:57:53
    上传文件 上传的时候加个一个文件夹路径ypp 读取文件 步入正题(代码) 本地安装的hadoop版本为3.1.3 pom.xml <properties> <java.version>1.8</java.version> <spark.version>...

    步入正题

    本地安装的hadoop版本为3.1.3

    pom.xml

    <properties>
            <java.version>1.8</java.version>
            <spark.version>3.0.0</spark.version>
    
            <scala.version>2.12.10</scala.version>
            <scala.binary.version>2.12</scala.binary.version>
            <hive.version>2.3.7</hive.version>
            <hadoop.version>3.1.1</hadoop.version>
            <kafka.version>2.4.1</kafka.version>
            <zookeeper.version>3.4.14</zookeeper.version>
            <guava.version>14.0.1</guava.version>
        </properties>
    <!--spark框架开始-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--
            Add joda time to ensure that anything downstream which doesn't pull in spark-hive
            gets the correct joda time artifact, so doesn't have auth failures on later Java 8 JVMs
            -->
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--spark框架结束-->
            <!--hadoop开始-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!--hadoop结束-->


    HDFSController

    package com.example.controller.hdfs;
    
    import com.example.utils.hdfs.HDFSUtils;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.multipart.MultipartFile;
    
    /**
     * @ClassName HDFSController
     * @Date 2020/8/26 11:41
     */
    @RestController
    @RequestMapping("/spark/hdfs")
    public class HDFSController {
    
        @GetMapping("/read")
        public String readFile(String fileName) throws Exception {
            return HDFSUtils.readFile(fileName);
        }
    
        @PostMapping("/upload")
        public void upload(MultipartFile file) throws Exception {
            HDFSUtils.createFile("ypp",file);
        }
    }
    

    工具类HDFSUtils

    package com.example.utils.hdfs;
    
    import com.alibaba.fastjson.JSONObject;
    import com.example.utils.properties.HDFSPropertiesUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.springframework.web.multipart.MultipartFile;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @ClassName HDFSUtils
     * @Date 2020/8/26 15:25
     */
    public class HDFSUtils {
    
        private static String hdfsPath;
        private static String hdfsName;
        private static final int bufferSize = 1024 * 1024 * 64;
    
        static {
            //设置成自己的
            hdfsPath= HDFSPropertiesUtils.getPath();
            hdfsName=HDFSPropertiesUtils.getUserName();
        }
    
        /**
         * 获取HDFS配置信息
         * @return
         */
        private static Configuration getConfiguration() {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", hdfsPath);
            configuration.set("HADOOP_USER_NAME",hdfsName);
            return configuration;
        }
    
        /**
         * 获取HDFS文件系统对象
         * @return
         * @throws Exception
         */
        public static FileSystem getFileSystem() throws Exception {
            /*
            //通过这种方式设置java客户端访问hdfs的身份:会以 ypp 的身份访问 hdfs文件系统目录下的路径:/user/ypp 的目录
            System.setProperty("HADOOP_USER_NAME","ypp");
            Configuration configuration = new Configuration();
            configuration.set("fs.defauleFS","hdfs://ypp:9090");
            FileSystem fileSystem = FileSystem.get(configuration);
             */
    
            /*
            客户端去操作hdfs时是有一个用户身份的,默认情况下hdfs客户端api会从jvm中获取一个参数作为自己的用户身份
            也可以在构造客户端fs对象时,通过参数传递进去
            FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
            */
            FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration());
            return fileSystem;
        }
    
        /**
         * 在HDFS创建文件夹
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean mkdir(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            if (existFile(path)) {
                return true;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            boolean isOk = fs.mkdirs(srcPath);
            fs.close();
            return isOk;
        }
    
        /**
         * 判断HDFS文件是否存在
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean existFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            Path srcPath = new Path(path);
            boolean isExists = fs.exists(srcPath);
            return isExists;
        }
    
        /**
         * 读取HDFS目录信息
         * @param path
         * @return
         * @throws Exception
         */
        public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path newPath = new Path(path);
            FileStatus[] statusList = fs.listStatus(newPath);
            List<Map<String, Object>> list = new ArrayList<>();
            if (null != statusList && statusList.length > 0) {
                for (FileStatus fileStatus : statusList) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("filePath", fileStatus.getPath());
                    map.put("fileStatus", fileStatus.toString());
                    list.add(map);
                }
                return list;
            } else {
                return null;
            }
        }
    
        /**
         * HDFS创建文件
         * @param path
         * @param file
         * @throws Exception
         */
        public static void createFile(String path, MultipartFile file) throws Exception {
            if (StringUtils.isEmpty(path) || null == file.getBytes()) {
                return;
            }
            String fileName = file.getOriginalFilename();
            FileSystem fs = getFileSystem();
            // 上传时默认当前目录,后面自动拼接文件的目录
            Path newPath = new Path(path + "/" + fileName);
            // 打开一个输出流
            FSDataOutputStream outputStream = fs.create(newPath);
            outputStream.write(file.getBytes());
            outputStream.close();
            fs.close();
        }
    
        /**
         * 读取HDFS文件内容
         * @param path
         * @return
         * @throws Exception
         */
        public static String readFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            FSDataInputStream inputStream = null;
            try {
                inputStream = fs.open(srcPath);
                // 防止中文乱码
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                String lineTxt = "";
                StringBuffer sb = new StringBuffer();
                while ((lineTxt = reader.readLine()) != null) {
                    sb.append(lineTxt);
                }
                return sb.toString();
            } finally {
                inputStream.close();
                fs.close();
            }
        }
    
        /**
         * 读取HDFS文件列表
         * @param path
         * @return
         * @throws Exception
         */
        public static List<Map<String, String>> listFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
    
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            // 递归找到所有文件
            RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
            List<Map<String, String>> returnList = new ArrayList<>();
            while (filesList.hasNext()) {
                LocatedFileStatus next = filesList.next();
                String fileName = next.getPath().getName();
                Path filePath = next.getPath();
                Map<String, String> map = new HashMap<>();
                map.put("fileName", fileName);
                map.put("filePath", filePath.toString());
                returnList.add(map);
            }
            fs.close();
            return returnList;
        }
    
        /**
         * HDFS重命名文件
         * @param oldName
         * @param newName
         * @return
         * @throws Exception
         */
        public static boolean renameFile(String oldName, String newName) throws Exception {
            if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            // 原文件目标路径
            Path oldPath = new Path(oldName);
            // 重命名目标路径
            Path newPath = new Path(newName);
            boolean isOk = fs.rename(oldPath, newPath);
            fs.close();
            return isOk;
        }
    
        /**
         * 删除HDFS文件
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean deleteFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            if (!existFile(path)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            Path srcPath = new Path(path);
            boolean isOk = fs.deleteOnExit(srcPath);
            fs.close();
            return isOk;
        }
    
        /**
         * 上传HDFS文件
         * @param path
         * @param uploadPath
         * @throws Exception
         */
        public static void uploadFile(String path, String uploadPath) throws Exception {
            if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // 上传路径
            Path clientPath = new Path(path);
            // 目标路径
            Path serverPath = new Path(uploadPath);
    
            // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
            fs.copyFromLocalFile(false, clientPath, serverPath);
            fs.close();
        }
    
        /**
         * 下载HDFS文件
         * @param path
         * @param downloadPath
         * @throws Exception
         */
        public static void downloadFile(String path, String downloadPath) throws Exception {
            if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // 上传路径
            Path clientPath = new Path(path);
            // 目标路径
            Path serverPath = new Path(downloadPath);
    
            // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
            fs.copyToLocalFile(false, clientPath, serverPath);
            fs.close();
        }
    
        /**
         * HDFS文件复制
         * @param sourcePath
         * @param targetPath
         * @throws Exception
         */
        public static void copyFile(String sourcePath, String targetPath) throws Exception {
            if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // 原始文件路径
            Path oldPath = new Path(sourcePath);
            // 目标路径
            Path newPath = new Path(targetPath);
    
            FSDataInputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            try {
                inputStream = fs.open(oldPath);
                outputStream = fs.create(newPath);
    
                IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
            } finally {
                inputStream.close();
                outputStream.close();
                fs.close();
            }
        }
    
        /**
         * 打开HDFS上的文件并返回byte数组
         * @param path
         * @return
         * @throws Exception
         */
        public static byte[] openFileToBytes(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            try {
                FSDataInputStream inputStream = fs.open(srcPath);
                return IOUtils.readFullyToByteArray(inputStream);
            } finally {
                fs.close();
            }
        }
    
        /**
         * 打开HDFS上的文件并返回java对象
         * @param path
         * @return
         * @throws Exception
         */
        public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            String jsonStr = readFile(path);
            return JSONObject.parseObject(jsonStr, clazz);
        }
    
        /**
         * 获取某个文件在HDFS的集群位置
         * @param path
         * @return
         * @throws Exception
         */
        public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(srcPath);
            return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        }
    
    }
    

    注意容易出现权限问题,由于开发阶段我都是用的自己windows安装的,HADOOP_USER_NAME直接用的administrator

     

    文件内容

    上传文件

    上传文件 hadoop namenode日志

    查看HDFS文件

    hadoop fs -ls /user/ypp/ypp #查看指定目录下的文件和文件夹。/user/ypp/ypp 是HDFS上的目录,不是本地目录,命令不分操作系统

    查看文件内容

    hadoop fs -cat /user/ypp/ypp/111.txt #查看文件内容,这里涉及到中文乱码了(后面补充解决),同样是HDFS上的目录,不是本地目录,命令不分操作系统

    读取文件内容

    展开全文
  • 通过java编程实现了远程HDFS文件创建,上传,下载,删除等。Hadoop类库中最终面向用户提供的接口类是FileSystem,该类封装了几乎所有的文件操作,例如CopyToLocalFile、CopyFromLocalFile、mkdir及delete等。

      HDFS(Hadoop Distributed File System)是Hadoop项目的核心子项目,是分布式计算中数据存储管理的基础篇,为了实现本地与HDFS的文件传输,主要借助Eclipse开发环境,通过java编程实现了远程HDFS的文件创建,上传,下载,删除等。

       其实对HDSF的文件操作主要有两种方式:命令行的方式JavaAPI的方式。命令行的方式简单直接,但是必须要求本地机器也是在Linux系统中已经安装了hadoop,这对习惯用windows系统的用户来说不得不安装虚拟机,然后再在虚拟机上安装Linux系统,这是一种挑。同时windows系统与虚拟机上安装的Linux系统进行文件传输也是要借助一些工具才可以实现。

       为了实现以上所遇到诸如系统不一致,手动输入命令的困扰,我们选择Java API的方式,专门的API函数,可以在非Hadoop机器上实现访问,同时与系统无关(windows、Linux甚至XP系统也可以)。Hadoop中关于文件操作类基本上全部是在"org.apache.hadoop.fs"包中,Hadoop类库中最终面向用户提供的接口类是FileSystem,该类封装了几乎所有的文件操作,例如CopyToLocalFile、CopyFromLocalFile、mkdir及delete等。综上基本上可以得出操作文件的程序库框架:

    operator( ) {

          得到Configuration对象 

         得到FileSystem对象 

         进行文件操作 }

    具体的HDFS的文件创建,上传,下载,删除等程序设计如下:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;


    public class HDFSTest01 {

    /**
    * @author dcx by 2015.11.19
    * 新建文件 
    * @param dsta
    * @param conf
    * @return
    */
    public static boolean CreatDir(String dst , Configuration conf){
    Path dstPath = new Path(dst) ;
    try{
    FileSystem dhfs = FileSystem.get(conf);
           dhfs.mkdirs(dstPath);
    }
    catch(IOException ie){
    ie.printStackTrace() ;
    return false ;
    }
    return true ;
    }
     

    /**
    * @author dcx by 2015.11.19
    * 文件上传
    * @param src 
    * @param dst
    * @param conf
    * @return
    */
    public static boolean putToHDFS(String src , String dst , Configuration conf){
    Path dstPath = new Path(dst) ;
    try{
    FileSystem hdfs = dstPath.getFileSystem(conf) ;
    hdfs.copyFromLocalFile(false, new Path(src), dstPath) ;
    }
    catch(IOException ie){
    ie.printStackTrace() ;
    return false ;
    }
    return true ;
    }

    /**
    *  @author dcx by 2015.11.19
    * 文件下载
    * @param src
    * @param dst
    * @param conf
    * @return
    */
    public static boolean getFromHDFS(String src , String dst , Configuration conf){
    Path dstPath = new Path(dst) ;
    try{
    FileSystem dhfs = dstPath.getFileSystem(conf) ;
    dhfs.copyToLocalFile(false, new Path(src), dstPath) ;
    }catch(IOException ie){
    ie.printStackTrace() ;
    return false ;
    }
    return true ;
    }

     
    /**
    * @author dcx by 2015.11.19
    * 文件删除
    * @param path
    * @param conf
    * @return
    */
    public static boolean checkAndDel(final String path , Configuration conf){
    Path dstPath = new Path(path) ;
    try{
    FileSystem dhfs = dstPath.getFileSystem(conf) ;
    if(dhfs.exists(dstPath)){
    dhfs.delete(dstPath, true) ;
    }else{
    return false ;
    }
    }catch(IOException ie ){
    ie.printStackTrace() ;
    return false ;
    }
    return true ;
    }




    /**
    * @param 主函数测试
    */
    public static void main(String[] args) {

    boolean status = false ;
    String dst1 = "hdfs://192.168.1.225:9000/EBLearn_data/new" ;
    Configuration conf = new Configuration() ;
     
    //java.lang.IllegalArgumentException: Wrong FS:            hdfs://192.168.1.225:9000/EBLearn_data/hello.txt, expected: file:///
        //解决这个错误的两个方案:
    //方案1:下面这条命令必须加上,否则出现上面这个错误
    conf.set("fs.default.name", "hdfs://192.168.1.225:9000"); // "hdfs://master:9000"  
        //方案2: 将core-site.xml 和hdfs-site.xml放入当前工程中
       status = CreatDir( dst1 ,  conf) ;
       System.out.println("status="+status) ;

       String dst = "hdfs://192.168.1.225:9000/EBLearn_data" ;
    String src = "I:/hello.txt" ;

       status = putToHDFS( src ,  dst ,  conf) ;
    System.out.println("status="+status) ;
        
    src = "hdfs://192.168.1.225:9000/EBLearn_data/hello.txt" ;
    dst = "I:/hadoop_need/" ;
    status = getFromHDFS( src ,  dst ,  conf) ;
    System.out.println("status="+status) ;
     
    dst = "hdfs://192.168.1.225:9000/EBLearn_data/hello.txt" ;
    status = checkAndDel( dst ,  conf) ;
    System.out.println("status="+status) ;
    }




    }

    展开全文
  • java实现对hdfs文件上传与下载

    千次阅读 2019-02-21 15:08:24
    hdfs上下载到本地 /** *src:hdfs路径 例:hdfs://192.168.0.168:9000/data/a.json *dst:本地路径 */ public static void Hdfs2Local(String src, String dst) { try { Configuration conf = new Con...
  • 2、windows上的权限系统和linux上的权限系统,测试期间为了简单起见可以关闭权限检查 在namenode的hdfs-site.xml上,添加配置: property> name>dfs.permissions.enabledname> value>falsevalue> property> 3...
  • 019-07-01 16:45:24,933 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 58.211.111.42:63048 Call#3 Retry#0java.io.IOEx...
  • 搭建了一个Hadoop的环境,Hadoop集群环境部署在几个Linux服务器上,现在想使用...(如果想看最终解决问题的方法拉到最后,如果想看我的问题解决思路请从上向下看)问题描述上传文件的代码:privatestaticvoiduploadT...
  • JAVA操作HDFS文件系统 前言:上篇文章介绍了如何利用Shell去操作HDFS中的文件,本文介绍使用Java代码去操作HDFS中的文件,它的操作内容和shell的操作内容和方法基本一致,开发集成工具选择IDEA。 一、新建Maven...
  • Java操作hdfs文件系统

    千次阅读 2021-12-26 16:24:11
    在上一篇,我们通过命令行的使用掌握了如何基于hdfs的命令对hdfs文件系统的常用操作,本篇将分享如何基于JavaAPI 操作hdfs文件系统 前置准备 默认服务器上的hadoop服务已经启动 本地如果是windows环境,需要本地...
  • 利用JavaAPI访问HDFS文件1、重读配置文件core-site.xml要利用Java客户端来存取HDFS上的文件,不得不说的是配置文件hadoop-0.20.2/conf/core-site.xml了,最初我就是在这里吃了大亏,所以我死活连不上HDFS文件...
  • java读写hdfs文件

    2021-03-03 12:52:03
    前言最近接触了分布式文件存储系统FastDFS,但FastDFS是底层是用C语言写的,因此安装的时候还...经过反复实践,基本能够远程连接hdfs进行文件的操作了,直接上代码。本示例的前提是先启动hdfs文件系统,关于如何搭建...
  • Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错...
  • Java远程操作HDFS文件系统

    千次阅读 2019-05-30 23:08:30
    目标:通过Java编程实现远程HDFS文件系统的增、删、改、查,并且解释原理。 一、准备工作 集群:搭建Hadoop集群,并且启动HDFS。详细过程可以参考:Hadoop-2.5.1安装步骤及异常处理 二、创建Maven工程 选择创建工程...
  • try { //输出流对象,将本地要上传文件读取到内存中 FileInputStream fileInputStream = new FileInputStream(fileName); try { byte[] buffer = new byte[2048]; int count = fileInputStream.read(buffer, 0, ...
  • 使用java代码连接HDFS

    2021-03-29 08:14:31
    前提是输入localhost:50070可以正常进入网页 Configuration conf=new Configuration(); try { FileSystem fs= FileSystem.get(new URI("hdfs://192.168.175.100:9000"),conf,"root");...//上传文件 .
  • HDFSJAVA上传下载简单实现

    千次阅读 2019-04-01 19:30:38
    1.HDFSJava基本操作非常重要 2.Haoop的常用shell操作 1.hadoop fs -ls hdfs://haoop1:8020/目录 2.hadoop fs -copyFromLocal|put 文件地址 目的目录 3.hadoop fs -copyToLocal|get 文件地址 目的目录 ...
  • 要利用Java客户端来存取HDFS上的文件,不得不说的是配置文件hadoop-0.20.2/conf/core-site.xml了,最初我就是在这里吃了大亏,所以我死活连不上HDFS文件无法创建、读取。hadoop.tmp.dir/home/zhangzk/hadoopA base...
  • Hadoop hdfs上传文件报错解决

    千次阅读 2020-10-06 09:10:48
    无论是启动,还是以后会经常用到的MapReduce中的每一个job,以及HDFS等相关信息,Hadoop均存有日志文件以供分析。 报错命令: 该错误产生的原因: NameNode和DataNode的namespaceID不一致,这个错误是很多人在安装...
  • java操作hdfs上的文件及文件夹

    千次阅读 2019-02-13 15:38:26
    Java抽象类org.apache.hadoop.fs.FileSystem定义了hadoop的一个文件系统接口。该类是一个抽象类,通过以下两种静态工厂方法可以过去FileSystem实例:  public static FileSystem.get(Configuratio...
  • 记录一下Java API 连接hadoop操作hdfs的实现流程(使用连接池管理)。以前做过这方面的开发,本来以为不会有什么问题,但是做的还是坑坑巴巴,内心有些懊恼,记录下这烦人的过程,警示自己切莫眼高手低!一:引入相关...
  • 通过JAVA API上传文件HDFS

    千次阅读 2017-12-29 14:43:41
    hdfs操作注意事项  注意关闭虚拟机防火墙设置如果开启了翻墙代理,注意调整到本地模式在core-site.xml中配置时,不要使用localhost,而应该使用虚拟机IP地址,否则无法连接到虚拟机hadoop如下权限不够: 修改...
  • 我想用JAVA实现HDFS文件读写,所以在maven项目中写了如下的测试方法: @Test public void testCopyFromLocalFile() throws URISyntaxException, IOException, InterruptedException { //1、创建配置文件 ...
  • 只有光头才能变强!...springboot上传下载文件(3)--java api 操作HDFS集群+集群配置 springboot上传下载文件(4)--上传下载工具类(已封装) 前一篇文章讲了nginx+ftp搭建独立的文件服务器 ...
  • 使用java上传文件HDFS,需要读取哪些配置文件呢,一直拒接连接,请指教.一月 14, 2016 3:41:28 下午 org.apache.hadoop.util.NativeCodeLoader 警告: Unable to load native-hadoop library for your platform......
  • java连接hadoop的hdfs

    2021-02-12 18:02:52
    import java.io.BufferedInputStream;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,405
精华内容 7,362
关键字:

java连接hdfs上传文件

java 订阅