  • Uploading files to HDFS from Java

    2018-01-08 16:37:26

    package com.hqgf.testhdfs;
    
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class Hdfs {
    	static Configuration conf = new Configuration();
    
        static {
            conf.set("fs.defaultFS", "hdfs://cluster");
            conf.set("dfs.nameservices", "cluster");
            conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.cluster.nn1", "xxx.xx.x.xxx:8020");
            conf.set("dfs.namenode.rpc-address.cluster.nn2", "xxx.xx.x.xxx:8020");
            conf.set("dfs.client.failover.proxy.provider.cluster"
                    ,"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        }
        
        public static void  getDirectoryFromHdfs(String direPath){
            try {
                FileSystem fs = FileSystem.get(URI.create(direPath),conf);
                FileStatus[] filelist = fs.listStatus(new Path(direPath));
                System.out.println("_________ all files under " + direPath + " _________");
                for (FileStatus fileStatus : filelist) {
                    System.out.println("Name:" + fileStatus.getPath().getName());
                    System.out.println("Size:" + fileStatus.getLen());
                    System.out.println("Path:" + fileStatus.getPath());
                }
                fs.close();
            } catch (Exception e){
                e.printStackTrace(); // report the error instead of silently swallowing it
            }
        }
    
        public static void uploadFile(String src,String dst) throws IOException{
            
            FileSystem fs = FileSystem.get(conf);
            Path srcPath = new Path(src); // local source path
            Path dstPath = new Path(dst); // HDFS destination path
            // copy the local file to HDFS; the first argument says whether to delete the source (false keeps it)
            fs.copyFromLocalFile(false, srcPath, dstPath);

            // print the target file system and list the uploaded files
            System.out.println("Upload to "+conf.get("fs.defaultFS"));
            System.out.println("------------list files------------"+"\n");
            FileStatus [] fileStatus = fs.listStatus(dstPath);
            for (FileStatus file : fileStatus)
            {
                System.out.println(file.getPath());
            }
            fs.close();
        }
        
        public static void main(String[] args) throws IOException {
    
    //        String localFilePath = "D:\\Project\\eclipse\\workspace\\DataCenterChanger\\test\\20180108.txt";
    //        String hdfsFilePath = "/tmp/";
    //        System.out.println(localFilePath);
    //        System.out.println(hdfsFilePath);
    //        uploadFile(localFilePath,hdfsFilePath);
            getDirectoryFromHdfs("/tmp/20180108.txt");
    
        }
    }
    

     

     

  • Uploading files to HDFS from Java

    1,000+ reads · 2016-11-09 19:13:59

    Upload a local file to HDFS from Java.
    1. POM dependencies

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.5.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.5.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-maven-plugins</artifactId>
        <version>2.6.0-cdh5.5.1</version>
    </dependency>

    2. Java code

    /**
     * Created by xxx on 2016/11/9.
     */
    
    import java.io.*;
    import java.net.URI;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class HdfsTest {
    
        static Configuration conf = new Configuration();
    
        static {
            conf.set("fs.defaultFS", "hdfs://nameservice1");
            conf.set("dfs.nameservices", "nameservice1");
            conf.set("dfs.ha.namenodes.nameservice1", "nn1,nn2");
            conf.set("dfs.namenode.rpc-address.nameservice1.nn1", "xxx:8020");
            conf.set("dfs.namenode.rpc-address.nameservice1.nn2", "xxx:8020");
            conf.set("dfs.client.failover.proxy.provider.nameservice1"
                    ,"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    
    
    //        conf.addResource("classpath:/hadoop/core-site.xml");
    //        conf.addResource("classpath:/hadoop/hdfs-site.xml");
    //        conf.addResource("classpath:/hadoop/mapred-site.xml");
        }
    
        // create a new file on HDFS and write the given bytes to it
        public static void createFile(String dst , byte[] contents) throws IOException{
            FileSystem fs = FileSystem.get(conf);
            Path dstPath = new Path(dst); // destination path
            // open an output stream on the new file
            FSDataOutputStream outputStream = fs.create(dstPath);
            outputStream.write(contents);
            outputStream.close();
            fs.close();
            System.out.println("File created successfully!");
        }
    
        // upload a local file to HDFS
        public static void uploadFile(String src,String dst) throws IOException{
            //Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path srcPath = new Path(src); // local source path
            Path dstPath = new Path(dst); // HDFS destination path
            // copy the local file to HDFS; the first argument says whether to delete the source (false keeps it)
            fs.copyFromLocalFile(false, srcPath, dstPath);

            // print the target file system and list the uploaded files
            System.out.println("Upload to "+conf.get("fs.defaultFS"));
            System.out.println("------------list files------------"+"\n");
            FileStatus [] fileStatus = fs.listStatus(dstPath);
            for (FileStatus file : fileStatus)
            {
                System.out.println(file.getPath());
            }
            fs.close();
        }
    
        // rename a file
        public static void rename(String oldName,String newName) throws IOException{
            //Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path oldPath = new Path(oldName);
            Path newPath = new Path(newName);
            boolean isok = fs.rename(oldPath, newPath);
            if(isok){
                System.out.println("rename ok!");
            }else{
                System.out.println("rename failure");
            }
            fs.close();
        }
        // delete a file
        public static void delete(String filePath) throws IOException{
            //Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(filePath);
            boolean isok = fs.deleteOnExit(path); // removed when fs.close() runs below; fs.delete(path, false) deletes immediately
            if(isok){
                System.out.println("delete ok!");
            }else{
                System.out.println("delete failure");
            }
            fs.close();
        }
    
        // create a directory
        public static void mkdir(String path) throws IOException{
            //Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path srcPath = new Path(path);
            boolean isok = fs.mkdirs(srcPath);
            if(isok){
                System.out.println("create " + path + " dir ok!");
            }else{
                System.out.println("create " + path + " dir failure");
            }
            fs.close();
        }
    
        // read a file's content and print it to stdout
        public static void readFile(String filePath) throws IOException{
            //Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path srcPath = new Path(filePath);
            InputStream in = null;
            try {
                in = fs.open(srcPath);
                IOUtils.copyBytes(in, System.out, 4096, false); // copy to standard output
            } finally {
                IOUtils.closeStream(in);
            }
        }
    
        /**
         * List all files under the given directory (direPath)
         */
        public static void  getDirectoryFromHdfs(String direPath){
            try {
                FileSystem fs = FileSystem.get(URI.create(direPath),conf);
                FileStatus[] filelist = fs.listStatus(new Path(direPath));
                System.out.println("_________ all files under " + direPath + " _________");
                for (FileStatus fileStatus : filelist) {
                    System.out.println("Name:" + fileStatus.getPath().getName());
                    System.out.println("Size:" + fileStatus.getLen());
                    System.out.println("Path:" + fileStatus.getPath());
                }
                fs.close();
            } catch (Exception e){
                e.printStackTrace(); // report the error instead of silently swallowing it
            }
        }
    
    
        public static void main(String[] args) throws IOException {
            String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    
            String localFilePath = "F:\\datafortag\\maimaimai\\quan-" + today;
            String hdfsFilePath = "/user/rec/maimaimai/upload_month=" + today.substring(0,7) + "/upload_date=" + today + "/";
            System.out.println(localFilePath);
            System.out.println(hdfsFilePath);
    
            //"/user/rec/maimaimai/upload_month=2016-11/upload_date=2016-11-09/"
            // 1. list all files under the given directory (direPath)
            //getDirectoryFromHdfs("/user/rec/maimaimai");

            // 2. create a directory
            //mkdir(hdfsFilePath);

            // 3. upload a file
            //uploadFile(localFilePath, hdfsFilePath);
            //getDirectoryFromHdfs(hdfsFilePath);

            // 4. read a file
            //readFile("/user/rec/maimaimai/quan-2016-11-09");

            // 5. rename a file
            //rename("/user/rec/maimaimai/2016-11/2016-11-09/quan-2016-11-09", "/user/rec/maimaimai/2016-11/2016-11-09/quan-2016-11-08.txt");
            //getDirectoryFromHdfs("/user/rec/maimaimai/2016-11/2016-11-09");

            // 6. create a file and write content into it
            //byte[] contents =  "hello world 世界你好\n".getBytes();
            //createFile("/user/rec/maimaimai/2016-11/2016-11-09/test.txt",contents);
            //readFile("/user/rec/maimaimai/2016-11/2016-11-09/test.txt");

            // 7. delete a file
            //delete("/user/rec/maimaimai/quan-2016-11-08.txt");
            //delete("test1");    // relative path; deletes a directory
        }
    
    }
    
  • Upload a file to HDFS

    2015-10-30 19:43:35
    Java code for uploading a file to HDFS. Already tested; it can be run directly.
  • A simple demo of uploading a file to HDFS from Java

    1,000+ reads · 2016-10-09 21:03:17
    package com.lijie.uploadsingle;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class UploadSingle {
        public static void main(String[] args) throws URISyntaxException, IOException {
            Configuration conf = new Configuration();
            URI uri = new URI("hdfs://lijie:9000");
            FileSystem fs = FileSystem.get(uri,conf);
        Path resP = new Path("F://a.txt");   // local source file
        Path destP = new Path("/aaaaaa");    // HDFS destination directory
        if(!fs.exists(destP)){
            fs.mkdirs(destP);                // create the destination directory if it does not exist
        }
        // extract the file name from the local path (only printed below, not used for the copy)
        String name = "F://a.txt".substring("F://a.txt".lastIndexOf("/")+1, "F://a.txt".length());
        fs.copyFromLocalFile(resP, destP);   // copy the local file into the HDFS directory, keeping its name
        System.out.println("name is " + name);
            fs.close();
        }
    }
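
    The demo gives copyFromLocalFile only the directory /aaaaaa, so the file keeps its original name a.txt on HDFS. To choose the target name explicitly, pass a full destination path instead. A minimal sketch reusing the same cluster URI and paths (the class name UploadWithName is hypothetical):

    package com.lijie.uploadsingle;

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadWithName {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://lijie:9000"), new Configuration());
            // a full destination path (directory + file name) controls exactly
            // what the uploaded file is called on HDFS
            fs.copyFromLocalFile(new Path("F://a.txt"), new Path("/aaaaaa/b.txt"));
            fs.close();
        }
    }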
    
  • Uploading a local file to HDFS from Java

    1,000+ reads · 2020-08-28 15:57:53

    Getting to the point.

    The locally installed Hadoop version is 3.1.3.

    pom.xml

    <properties>
            <java.version>1.8</java.version>
            <spark.version>3.0.0</spark.version>
    
            <scala.version>2.12.10</scala.version>
            <scala.binary.version>2.12</scala.binary.version>
            <hive.version>2.3.7</hive.version>
            <hadoop.version>3.1.1</hadoop.version>
            <kafka.version>2.4.1</kafka.version>
            <zookeeper.version>3.4.14</zookeeper.version>
            <guava.version>14.0.1</guava.version>
        </properties>
    <!-- spark framework: start -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--
            Add joda time to ensure that anything downstream which doesn't pull in spark-hive
            gets the correct joda time artifact, so doesn't have auth failures on later Java 8 JVMs
            -->
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark framework: end -->
            <!-- hadoop: start -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- hadoop: end -->


    HDFSController

    package com.example.controller.hdfs;
    
    import com.example.utils.hdfs.HDFSUtils;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.multipart.MultipartFile;
    
    /**
     * @ClassName HDFSController
     * @Date 2020/8/26 11:41
     */
    @RestController
    @RequestMapping("/spark/hdfs")
    public class HDFSController {
    
        @GetMapping("/read")
        public String readFile(String fileName) throws Exception {
            return HDFSUtils.readFile(fileName);
        }
    
        @PostMapping("/upload")
        public void upload(MultipartFile file) throws Exception {
            HDFSUtils.createFile("ypp",file);
        }
    }
    

    Utility class HDFSUtils

    package com.example.utils.hdfs;
    
    import com.alibaba.fastjson.JSONObject;
    import com.example.utils.properties.HDFSPropertiesUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.springframework.web.multipart.MultipartFile;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @ClassName HDFSUtils
     * @Date 2020/8/26 15:25
     */
    public class HDFSUtils {
    
        private static String hdfsPath;
        private static String hdfsName;
        private static final int bufferSize = 1024 * 1024 * 64;
    
        static {
            // set these to your own values
            hdfsPath= HDFSPropertiesUtils.getPath();
            hdfsName=HDFSPropertiesUtils.getUserName();
        }
    
        /**
         * Get the HDFS client configuration
         * @return
         */
        private static Configuration getConfiguration() {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", hdfsPath);
            configuration.set("HADOOP_USER_NAME",hdfsName);
            return configuration;
        }
    
        /**
         * Get the HDFS FileSystem object
         * @return
         * @throws Exception
         */
        public static FileSystem getFileSystem() throws Exception {
            /*
            // One way to set the identity the Java client uses for HDFS: it will then access
            // paths such as /user/ypp on the HDFS file system as the user "ypp".
            System.setProperty("HADOOP_USER_NAME","ypp");
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS","hdfs://ypp:9090");
            FileSystem fileSystem = FileSystem.get(configuration);
             */

            /*
            The client operates on HDFS under a user identity. By default the HDFS client API
            reads that identity from the JVM; it can also be passed in explicitly when the
            FileSystem object is constructed:
            FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
            */
            FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration());
            return fileSystem;
        }
    
        /**
         * Create a directory on HDFS
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean mkdir(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            if (existFile(path)) {
                return true;
            }
            FileSystem fs = getFileSystem();
            // target path
            Path srcPath = new Path(path);
            boolean isOk = fs.mkdirs(srcPath);
            fs.close();
            return isOk;
        }
    
        /**
         * Check whether a file or directory exists on HDFS
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean existFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            Path srcPath = new Path(path);
            boolean isExists = fs.exists(srcPath);
            return isExists;
        }
    
        /**
         * Read directory information from HDFS
         * @param path
         * @return
         * @throws Exception
         */
        public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // target path
            Path newPath = new Path(path);
            FileStatus[] statusList = fs.listStatus(newPath);
            List<Map<String, Object>> list = new ArrayList<>();
            if (null != statusList && statusList.length > 0) {
                for (FileStatus fileStatus : statusList) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("filePath", fileStatus.getPath());
                    map.put("fileStatus", fileStatus.toString());
                    list.add(map);
                }
                return list;
            } else {
                return null;
            }
        }
    
        /**
         * Create a file on HDFS from an uploaded MultipartFile
         * @param path
         * @param file
         * @throws Exception
         */
        public static void createFile(String path, MultipartFile file) throws Exception {
            if (StringUtils.isEmpty(path) || null == file.getBytes()) {
                return;
            }
            String fileName = file.getOriginalFilename();
            FileSystem fs = getFileSystem();
            // upload under the given directory; the original file name is appended to the path
            Path newPath = new Path(path + "/" + fileName);
            // open an output stream for the new file
            FSDataOutputStream outputStream = fs.create(newPath);
            outputStream.write(file.getBytes());
            outputStream.close();
            fs.close();
        }
    
        /**
         * Read the content of an HDFS file
         * @param path
         * @return
         * @throws Exception
         */
        public static String readFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // target path
            Path srcPath = new Path(path);
            FSDataInputStream inputStream = null;
            try {
                inputStream = fs.open(srcPath);
                // decode as UTF-8 explicitly to avoid garbled Chinese text
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
                String lineTxt = "";
                StringBuffer sb = new StringBuffer();
                while ((lineTxt = reader.readLine()) != null) {
                    sb.append(lineTxt);
                }
                return sb.toString();
            } finally {
                IOUtils.closeStream(inputStream); // null-safe close
                fs.close();
            }
        }
    
        /**
         * List the files under an HDFS path
         * @param path
         * @return
         * @throws Exception
         */
        public static List<Map<String, String>> listFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
    
            FileSystem fs = getFileSystem();
            // target path
            Path srcPath = new Path(path);
            // recursively find all files
            RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
            List<Map<String, String>> returnList = new ArrayList<>();
            while (filesList.hasNext()) {
                LocatedFileStatus next = filesList.next();
                String fileName = next.getPath().getName();
                Path filePath = next.getPath();
                Map<String, String> map = new HashMap<>();
                map.put("fileName", fileName);
                map.put("filePath", filePath.toString());
                returnList.add(map);
            }
            fs.close();
            return returnList;
        }
    
        /**
         * Rename a file on HDFS
         * @param oldName
         * @param newName
         * @return
         * @throws Exception
         */
        public static boolean renameFile(String oldName, String newName) throws Exception {
            if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            // original path
            Path oldPath = new Path(oldName);
            // new path after renaming
            Path newPath = new Path(newName);
            boolean isOk = fs.rename(oldPath, newPath);
            fs.close();
            return isOk;
        }
    
        /**
         * Delete a file on HDFS
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean deleteFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            if (!existFile(path)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            Path srcPath = new Path(path);
            boolean isOk = fs.deleteOnExit(srcPath); // removed when fs.close() runs below; fs.delete(srcPath, false) deletes immediately
            fs.close();
            return isOk;
        }
    
        /**
         * Upload a local file to HDFS
         * @param path
         * @param uploadPath
         * @throws Exception
         */
        public static void uploadFile(String path, String uploadPath) throws Exception {
            if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // local source path
            Path clientPath = new Path(path);
            // HDFS target path
            Path serverPath = new Path(uploadPath);
    
            // copy the local file to HDFS; the first argument says whether to delete the source (false keeps it)
            fs.copyFromLocalFile(false, clientPath, serverPath);
            fs.close();
        }
    
        /**
         * Download an HDFS file to the local file system
         * @param path
         * @param downloadPath
         * @throws Exception
         */
        public static void downloadFile(String path, String downloadPath) throws Exception {
            if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // HDFS source path
            Path clientPath = new Path(path);
            // local destination path
            Path serverPath = new Path(downloadPath);
    
            // copy the HDFS file to the local file system; the first argument says whether to delete the source (false keeps it)
            fs.copyToLocalFile(false, clientPath, serverPath);
            fs.close();
        }
    
        /**
         * Copy a file within HDFS
         * @param sourcePath
         * @param targetPath
         * @throws Exception
         */
        public static void copyFile(String sourcePath, String targetPath) throws Exception {
            if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // source path
            Path oldPath = new Path(sourcePath);
            // target path
            Path newPath = new Path(targetPath);
    
            FSDataInputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            try {
                inputStream = fs.open(oldPath);
                outputStream = fs.create(newPath);
    
                IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
            } finally {
                IOUtils.closeStream(inputStream);  // null-safe close
                IOUtils.closeStream(outputStream);
                fs.close();
            }
        }
    
        /**
         * Open a file on HDFS and return its content as a byte array
         * @param path
         * @return
         * @throws Exception
         */
        public static byte[] openFileToBytes(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // target path
            Path srcPath = new Path(path);
            try {
                FSDataInputStream inputStream = fs.open(srcPath);
                return IOUtils.readFullyToByteArray(inputStream);
            } finally {
                fs.close();
            }
        }
    
        /**
         * Open a file on HDFS and deserialize its JSON content into a Java object
         * @param path
         * @return
         * @throws Exception
         */
        public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            String jsonStr = readFile(path);
            return JSONObject.parseObject(jsonStr, clazz);
        }
    
        /**
         * Get the block locations of a file in the HDFS cluster
         * @param path
         * @return
         * @throws Exception
         */
        public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // target path
            Path srcPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(srcPath);
            return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        }
    
    }
    

    Watch out for permission problems. During development I ran everything against my own Windows installation, so HADOOP_USER_NAME was simply set to administrator; the sketch below shows the usual ways to set the client user.
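
    If the client user does not match the owner of the target HDFS directory, writes fail with an AccessControlException. A minimal sketch, assuming a user named hadoop and a hypothetical NameNode address, of the two usual ways to set the client identity:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsAsUser {
        public static void main(String[] args) throws Exception {
            // Option 1: set the user before any FileSystem is created
            System.setProperty("HADOOP_USER_NAME", "hadoop");

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://namenode-host:8020"); // hypothetical address

            // Option 2: pass the user explicitly when building the FileSystem
            FileSystem fs = FileSystem.get(new URI("hdfs://namenode-host:8020"), conf, "hadoop");
            System.out.println(fs.getFileStatus(new Path("/")).getOwner());
            fs.close();
        }
    }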

     

    [Screenshots: the file content, the upload request, and the NameNode log during the upload]

    View the file on HDFS

    hadoop fs -ls /user/ypp/ypp  # list the files and directories under the given path; /user/ypp/ypp is a directory on HDFS, not a local one, and the command is the same on every OS

    View the file content

    hadoop fs -cat /user/ypp/ypp/111.txt  # print the file content; Chinese text comes out garbled here (see the sketch below). Again, this is an HDFS path, not a local one, and the command is the same on every OS

    Reading the file content
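
    The garbled characters are usually just an encoding mismatch: the file bytes are UTF-8 while the Windows console decodes them as GBK. When reading from Java, decoding the stream explicitly as UTF-8 avoids the problem. A minimal sketch, with a hypothetical NameNode address and the file path from above:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadUtf8FromHdfs {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://namenode-host:8020"), new Configuration());
            // decode explicitly as UTF-8 instead of relying on the platform default charset
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path("/user/ypp/ypp/111.txt")), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } finally {
                fs.close();
            }
        }
    }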

  • A simple demo of uploading a local file to HDFS from Java

    10,000+ reads · 2018-01-15 15:10:52
    This post collects three pieces of Java code for uploading files to HDFS, all of them tested. 1. Create a Maven project  2. POM dependencies: junit junit 3.8.1 test; org.apache.hadoop hadoop-client 2.7.3
  • Cause: directory and file names are stored on the NameNode, and the NameNode is reachable from the local machine over the public network, so creating directories and files works. When data is actually written, though, the NameNode and the DataNodes communicate over the internal network, so the NameNode returns the DataNodes' internal IP... (see the sketch after this list)
  • Upload files to HDFS through the Java API

    1,000+ reads · 2017-12-29 14:43:41
    Notes on working with HDFS: remember to turn off the VM's firewall; if a proxy is enabled, adjust it accordingly. When configuring local mode in core-site.xml, do not use localhost; use the VM's IP address, otherwise you cannot connect to Hadoop on the VM. If permissions are insufficient as below: modify...
  • Today, connecting from Windows to the VM's HDFS and uploading a file through IDEA hit an insufficient-permission error, because the upload ran as the Windows user. The original code was: package cn.edu.lingnan.hdfsclient; import org.apache.hadoop....
  • Hadoop source for HDFS Java API operations – uploading files, with all required jar packages attached; feel free to download and study.
  • Upload a local file to HDFS

    2018-04-14 23:55:30
    Working with files through the Hadoop HDFS FileSystem API: upload the prepared files to the user/hadoop/music path on HDFS
  • Problem: java.lang.NoSuchMethodError: org.apache.hadoop.fs.FSOutputSummer.&lt;init&gt;(Ljava/util/zip/Checksum;II)V. Solution: add the following dependency to the pom file: <dependency>  <groupId>org....
  • import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.C...
  • Using the Java API to upload files to HDFS

    1,000+ reads · 2019-08-22 01:12:03
    /** * Upload a file, written at a fairly low level * @throws IOException */ @Test public void shanchaung() throws IOException { System.setProperty("HADOOP_USER_NAME","hadoop"); Configuration co...
  • Creating, uploading and downloading HDFS files from Java

    1,000+ reads · 2019-05-27 08:44:51
    1. Steps, installing on Windows: 1. Install the JDK → 2.... ① Download the Java development kit installer jdk-8u201-windows-x64.exe from the official site: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html ② Run...
  • This post mainly follows "Hadoop HDFS file upload and download via the Java FileSystem API" and verifies it in practice. The code differs little from the referenced article; it is listed here as a memo. import java.io.*; import java.net.URI; import org.apache....
  • package com.kfk.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.F....
  • In the previous post we covered... So we can use the Java API provided by HDFS to upload files to HDFS, or upload straight from FTP to HDFS. One thing to note: in the previous post, running MR meant manually executing yarn jar every time; in a real environment
  • Uploading and downloading HDFS files from Java

    1,000+ reads · 2019-02-21 15:08:24
    Download from HDFS to the local machine /** * src: HDFS path, e.g. hdfs://192.168.0.168:9000/data/a.json * dst: local path */ public static void Hdfs2Local(String src, String dst) { try { Configuration conf = new Con...
  • For someone like me who is just starting to learn Hadoop, files have to be uploaded, downloaded and deleted between the local machine and HDFS all the time, and doing it all in code is cumbersome. For Windows there are tools that connect to HDFS for document management, such as HDFS+Explorer. But mine is...
  • Implementing HDFS file upload in Java

    1,000+ reads · 2018-05-24 15:57:57
    Uploading files to HDFS roughly comes in three flavours: 1) calling the API directly, 2) using WebHDFS, 3) HttpFS. The first two are briefly described below. 1) Calling the API directly (straight to the code): public void hdfsUpload(String srcPath) throws IOException,URISyntaxException { ...
  • import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException;...import java.io.InputStream;...import java.io.InputStreamReader;...import java
  • // 1. file upload @Service public class HDFSClient<jsonObjectS> { public Boolean upload1(InputStream in, String remoteDst) { Configuration conf = new Configuration(); FileSystem ...
  • Working with an HDFS cluster from Java on Windows
  • hadoop + IDEA: uploading local files to HDFS

    1,000+ reads · 2019-04-02 10:09:16
    I have recently been learning Hadoop programming and needed to upload data to the HDFS store, but copying it onto CentOS first and then uploading was a hassle, so I wrote an upload program directly. This code does not loop over multiple files; there is a corresponding function for that which you can try, and it is not hard. ...
  • Generate a file in Java and upload a local file to HDFS

    10,000+ reads · 2020-11-17 15:06:41
    1. Create a Maven project and import the jar packages (put the code below into the pom.xml file, then import the corresponding jars) <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=...
  • After setting up a Hadoop big-data cluster (CDH 5.16.1), how do you access the data on the HDFS file system? By building a Maven project and using the Java API, of course. To that end, this is a write-up of accessing the HDFS file system through the Java API.
  • Fixing permission problems when uploading from the local file system to HDFS
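
For the internal-IP issue described in the "Cause:" item above (the NameNode hands the client DataNode addresses that are only routable inside the cluster network), a common client-side workaround is to ask for DataNode hostnames instead of IPs and resolve those hostnames locally (for example via hosts-file entries pointing at the public addresses). A minimal sketch with hypothetical host names and paths; this is a general technique, not code taken from the articles listed here:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadViaHostnames {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://namenode-host:8020"); // hypothetical address
            // ask the NameNode to return DataNode hostnames instead of (internal) IPs;
            // the client then resolves those hostnames itself
            conf.set("dfs.client.use.datanode.hostname", "true");

            FileSystem fs = FileSystem.get(new URI("hdfs://namenode-host:8020"), conf);
            fs.copyFromLocalFile(new Path("D:/data/test.txt"), new Path("/tmp/"));
            fs.close();
        }
    }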
