精华内容
下载资源
问答
  • JAVA 上传本地文件到HDFS

    千次阅读 2020-08-28 15:57:53
    上传文件 上传的时候加个一个文件夹路径ypp 读取文件 步入正题(代码) 本地安装的hadoop版本为3.1.3 pom.xml <properties> <java.version>1.8</java.version> <spark.version>...

    步入正题

    本地安装的hadoop版本为3.1.3

    pom.xml

    <properties>
            <java.version>1.8</java.version>
            <spark.version>3.0.0</spark.version>
    
            <scala.version>2.12.10</scala.version>
            <scala.binary.version>2.12</scala.binary.version>
            <hive.version>2.3.7</hive.version>
            <hadoop.version>3.1.1</hadoop.version>
            <kafka.version>2.4.1</kafka.version>
            <zookeeper.version>3.4.14</zookeeper.version>
            <guava.version>14.0.1</guava.version>
        </properties>
    <!--spark框架开始-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--
            Add joda time to ensure that anything downstream which doesn't pull in spark-hive
            gets the correct joda time artifact, so doesn't have auth failures on later Java 8 JVMs
            -->
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--spark框架结束-->
            <!--hadoop开始-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!--hadoop结束-->


    HDFSController

    package com.example.controller.hdfs;
    
    import com.example.utils.hdfs.HDFSUtils;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.multipart.MultipartFile;
    
    /**
     * @ClassName HDFSController
     * @Date 2020/8/26 11:41
     */
    @RestController
    @RequestMapping("/spark/hdfs")
    public class HDFSController {
    
        @GetMapping("/read")
        public String readFile(String fileName) throws Exception {
            return HDFSUtils.readFile(fileName);
        }
    
        @PostMapping("/upload")
        public void upload(MultipartFile file) throws Exception {
            HDFSUtils.createFile("ypp",file);
        }
    }
    

    工具类HDFSUtils

    package com.example.utils.hdfs;
    
    import com.alibaba.fastjson.JSONObject;
    import com.example.utils.properties.HDFSPropertiesUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.springframework.web.multipart.MultipartFile;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @ClassName HDFSUtils
     * @Date 2020/8/26 15:25
     */
    public class HDFSUtils {
    
        private static String hdfsPath;
        private static String hdfsName;
        private static final int bufferSize = 1024 * 1024 * 64;
    
        static {
            //设置成自己的
            hdfsPath= HDFSPropertiesUtils.getPath();
            hdfsName=HDFSPropertiesUtils.getUserName();
        }
    
        /**
         * 获取HDFS配置信息
         * @return
         */
        private static Configuration getConfiguration() {
            Configuration configuration = new Configuration();
            configuration.set("fs.defaultFS", hdfsPath);
            configuration.set("HADOOP_USER_NAME",hdfsName);
            return configuration;
        }
    
        /**
         * 获取HDFS文件系统对象
         * @return
         * @throws Exception
         */
        public static FileSystem getFileSystem() throws Exception {
            /*
            //通过这种方式设置java客户端访问hdfs的身份:会以 ypp 的身份访问 hdfs文件系统目录下的路径:/user/ypp 的目录
            System.setProperty("HADOOP_USER_NAME","ypp");
            Configuration configuration = new Configuration();
            configuration.set("fs.defauleFS","hdfs://ypp:9090");
            FileSystem fileSystem = FileSystem.get(configuration);
             */
    
            /*
            客户端去操作hdfs时是有一个用户身份的,默认情况下hdfs客户端api会从jvm中获取一个参数作为自己的用户身份
            也可以在构造客户端fs对象时,通过参数传递进去
            FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
            */
            FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration());
            return fileSystem;
        }
    
        /**
         * 在HDFS创建文件夹
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean mkdir(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            if (existFile(path)) {
                return true;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            boolean isOk = fs.mkdirs(srcPath);
            fs.close();
            return isOk;
        }
    
        /**
         * 判断HDFS文件是否存在
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean existFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            Path srcPath = new Path(path);
            boolean isExists = fs.exists(srcPath);
            return isExists;
        }
    
        /**
         * 读取HDFS目录信息
         * @param path
         * @return
         * @throws Exception
         */
        public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path newPath = new Path(path);
            FileStatus[] statusList = fs.listStatus(newPath);
            List<Map<String, Object>> list = new ArrayList<>();
            if (null != statusList && statusList.length > 0) {
                for (FileStatus fileStatus : statusList) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("filePath", fileStatus.getPath());
                    map.put("fileStatus", fileStatus.toString());
                    list.add(map);
                }
                return list;
            } else {
                return null;
            }
        }
    
        /**
         * HDFS创建文件
         * @param path
         * @param file
         * @throws Exception
         */
        public static void createFile(String path, MultipartFile file) throws Exception {
            if (StringUtils.isEmpty(path) || null == file.getBytes()) {
                return;
            }
            String fileName = file.getOriginalFilename();
            FileSystem fs = getFileSystem();
            // 上传时默认当前目录,后面自动拼接文件的目录
            Path newPath = new Path(path + "/" + fileName);
            // 打开一个输出流
            FSDataOutputStream outputStream = fs.create(newPath);
            outputStream.write(file.getBytes());
            outputStream.close();
            fs.close();
        }
    
        /**
         * 读取HDFS文件内容
         * @param path
         * @return
         * @throws Exception
         */
        public static String readFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            FSDataInputStream inputStream = null;
            try {
                inputStream = fs.open(srcPath);
                // 防止中文乱码
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                String lineTxt = "";
                StringBuffer sb = new StringBuffer();
                while ((lineTxt = reader.readLine()) != null) {
                    sb.append(lineTxt);
                }
                return sb.toString();
            } finally {
                inputStream.close();
                fs.close();
            }
        }
    
        /**
         * 读取HDFS文件列表
         * @param path
         * @return
         * @throws Exception
         */
        public static List<Map<String, String>> listFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
    
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            // 递归找到所有文件
            RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
            List<Map<String, String>> returnList = new ArrayList<>();
            while (filesList.hasNext()) {
                LocatedFileStatus next = filesList.next();
                String fileName = next.getPath().getName();
                Path filePath = next.getPath();
                Map<String, String> map = new HashMap<>();
                map.put("fileName", fileName);
                map.put("filePath", filePath.toString());
                returnList.add(map);
            }
            fs.close();
            return returnList;
        }
    
        /**
         * HDFS重命名文件
         * @param oldName
         * @param newName
         * @return
         * @throws Exception
         */
        public static boolean renameFile(String oldName, String newName) throws Exception {
            if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            // 原文件目标路径
            Path oldPath = new Path(oldName);
            // 重命名目标路径
            Path newPath = new Path(newName);
            boolean isOk = fs.rename(oldPath, newPath);
            fs.close();
            return isOk;
        }
    
        /**
         * 删除HDFS文件
         * @param path
         * @return
         * @throws Exception
         */
        public static boolean deleteFile(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return false;
            }
            if (!existFile(path)) {
                return false;
            }
            FileSystem fs = getFileSystem();
            Path srcPath = new Path(path);
            boolean isOk = fs.deleteOnExit(srcPath);
            fs.close();
            return isOk;
        }
    
        /**
         * 上传HDFS文件
         * @param path
         * @param uploadPath
         * @throws Exception
         */
        public static void uploadFile(String path, String uploadPath) throws Exception {
            if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // 上传路径
            Path clientPath = new Path(path);
            // 目标路径
            Path serverPath = new Path(uploadPath);
    
            // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
            fs.copyFromLocalFile(false, clientPath, serverPath);
            fs.close();
        }
    
        /**
         * 下载HDFS文件
         * @param path
         * @param downloadPath
         * @throws Exception
         */
        public static void downloadFile(String path, String downloadPath) throws Exception {
            if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // 上传路径
            Path clientPath = new Path(path);
            // 目标路径
            Path serverPath = new Path(downloadPath);
    
            // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
            fs.copyToLocalFile(false, clientPath, serverPath);
            fs.close();
        }
    
        /**
         * HDFS文件复制
         * @param sourcePath
         * @param targetPath
         * @throws Exception
         */
        public static void copyFile(String sourcePath, String targetPath) throws Exception {
            if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
                return;
            }
            FileSystem fs = getFileSystem();
            // 原始文件路径
            Path oldPath = new Path(sourcePath);
            // 目标路径
            Path newPath = new Path(targetPath);
    
            FSDataInputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            try {
                inputStream = fs.open(oldPath);
                outputStream = fs.create(newPath);
    
                IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
            } finally {
                inputStream.close();
                outputStream.close();
                fs.close();
            }
        }
    
        /**
         * 打开HDFS上的文件并返回byte数组
         * @param path
         * @return
         * @throws Exception
         */
        public static byte[] openFileToBytes(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            try {
                FSDataInputStream inputStream = fs.open(srcPath);
                return IOUtils.readFullyToByteArray(inputStream);
            } finally {
                fs.close();
            }
        }
    
        /**
         * 打开HDFS上的文件并返回java对象
         * @param path
         * @return
         * @throws Exception
         */
        public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            String jsonStr = readFile(path);
            return JSONObject.parseObject(jsonStr, clazz);
        }
    
        /**
         * 获取某个文件在HDFS的集群位置
         * @param path
         * @return
         * @throws Exception
         */
        public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
            if (StringUtils.isEmpty(path)) {
                return null;
            }
            if (!existFile(path)) {
                return null;
            }
            FileSystem fs = getFileSystem();
            // 目标路径
            Path srcPath = new Path(path);
            FileStatus fileStatus = fs.getFileStatus(srcPath);
            return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        }
    
    }
    

    注意容易出现权限问题,由于开发阶段我都是用的自己windows安装的,HADOOP_USER_NAME直接用的administrator

     

    文件内容

    上传文件

    上传文件 hadoop namenode日志

    查看HDFS文件

    hadoop fs -ls /user/ypp/ypp #查看指定目录下的文件和文件夹。/user/ypp/ypp 是HDFS上的目录,不是本地目录,命令不分操作系统

    查看文件内容

    hadoop fs -cat /user/ypp/ypp/111.txt #查看文件内容,这里涉及到中文乱码了(后面补充解决),同样是HDFS上的目录,不是本地目录,命令不分操作系统

    读取文件内容

    展开全文
  • java上传本地文件到HDFS简单demo

    万次阅读 2018-01-15 15:10:52
    本文整理了上传文件到hdfs的三种java代码,均已测试通过 1、创建maven项目  2、pom依赖 junit junit 3.8.1 test org.apache.hadoop hadoop-client 2.7.3

    本文整理了上传文件到hdfs的三种java代码,均已测试通过

    1、创建maven项目 


    2、pom依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.96.1-hadoop2</version>
        </dependency>
    </dependencies>
    
    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <filtering>true</filtering>
                <excludes>
                    <exclude>**/*.java</exclude>
                </excludes>
            </resource>
        </resources>
    
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${mainClass}</mainClass>
                                </transformer>
                                <transformer                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.tooling</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


    3、java代码

    代码一:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    public class CopyFile {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        String localDir = "/home/hdfs/files/test.txt";
        String hdfsDir = "hdfs://server:8020/bbbb";
        try{
                Path localPath = new Path(localDir);
                Path hdfsPath = new Path(hdfsDir);
                FileSystem hdfs = FileSystem.get(conf);
                if(!hdfs.exists(hdfsPath)){
                     hdfs.mkdirs(hdfsPath);
                 }
                 hdfs.copyFromLocalFile(localPath, hdfsPath);
             }catch(Exception e){
             e.printStackTrace();
             }
        }
    }
    代码二:

    package my.test;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    public class UploadSingle {
     public static void main(String[] args) throws URISyntaxException, IOException{
         Configuration conf = new Configuration();
         URI uri = new URI("hdfs://server:8020");
         FileSystem fs = FileSystem.get(uri,conf);
         Path resP = new Path("/home/hdfs/files/test.txt");
         Path destP = new Path("/aaaaaa");
         if(!fs.exists(destP)){
              fs.mkdirs(destP);
         }
        fs.copyFromLocalFile(resP, destP);
        fs.close();
      }
    }

    代码三:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    //上传本地文件到HDFS
    public class CopyFile {
      public static void main(String[] args) throws IOException {
          //获取配置文件信息
          Configuration conf = new Configuration();
          conf.addResource(new Path("conf/core-site.xml"));
          //获取文件系统
          FileSystem hdfs = FileSystem.get(conf);
          //文件名称
          Path src = new Path("/home/hdfs/files/test.txt");
          Path dst = new Path("hdfs://server:8020/cccc");
          if(!hdfs.exists(dst)){
              hdfs.mkdirs(dst);
          }
          hdfs.copyFromLocalFile(src, dst);
          System.out.println("Upload to " + conf.get("fs.default.name"));
          FileStatus files[] = hdfs.listStatus(dst);
          for(FileStatus file:files){
              System.out.println(file.getPath());
          }
      }
    }
    以上各段代码,其中,

    if(!hdfs.exists(dst)){
    hdfs.mkdirs(dst);
    }

    这段代码必须存在,否则通过 hdfs dfs -ls /cccc命令查看cccc等指定路径下的文件列表时显示不出text.txt文件,

    而是将txt中的文字内容直接存入了此路径下,通过 hdfs dfs -cat /cccc命令可以查看文字内容。


    4、在eclipse项目上右击,选择run as–》maven install,生成jar包,将jar包上传到服务器上。

    5、运行jar包 

    unix代码: 

    su hdfs 
    $ hadoop jar path/name.jar classname 
    其中,path是jar包的路径,name是项目名称,classname是类名。

    展开全文
  • java生成文件并上传本地文件hdfs

    万次阅读 2020-11-17 15:06:41
    1.创建一个maven项目,导入jar包(将下面代码放入pom.xml文件中,然后导入相应的jar包) <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=...

    项目整体结构说明:
    在这里插入图片描述
    1.创建一个maven项目,导入jar包(将下面代码放入pom.xml文件中,然后导入相应的jar包)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
    
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.5</version>
            </dependency>
        </dependencies>
        <groupId>org.example</groupId>
        <artifactId>sc</artifactId>
        <version>1.0-SNAPSHOT</version>
    
    
    </project>
    

    2.生成本地文件代码(将下面代码放入createFile文件中)

    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    
    public class createFile {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            File f1 = new File("D:/test.txt");//本地文件地址
            try {
                FileOutputStream fos = new FileOutputStream(f1);
                if (!f1.exists()) {
                    try {
                        f1.createNewFile();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                byte[] buffer = new byte[1024 * 640];
                long start = System.currentTimeMillis();
                for (int i = 0; i < 1024; i++) {
                    fos.write(buffer);
                    fos.flush();
    
                }
                long end = System.currentTimeMillis();
                long time = end - start;
                System.out.println("写入时间:" + time + "ms");
                fos.close();
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }
    
    }
    

    3.上传文件代码(将下面代码放入HdfsTest文件中)

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    public class HdfsTest {
            public static void main (String[]args) throws Exception {
                //1 创建连接
                Configuration conf = new Configuration();
                //2 连接端口
                conf.set("fs.defaultFS", "hdfs://192.168.43.190:9870");//主节点的ip地址
                //3 获取连接对象
                FileSystem fs = FileSystem.get(conf);
                //本地文件上传到 hdfs
                System.out.println("开始上传");
                fs.copyFromLocalFile(new Path("D://test.txt"), new Path("usr/local/hadoop/input"));//第一个路径是本地文件路径,第二个路径是hdfs下路径
                fs.close();
                System.out.println("上传完毕");
            }
        }
    

    4、使用流上传文件

    //流上传文件
            FileInputStream in=new FileInputStream("D://test.txt");//读取本地文件
            FSDataOutputStream out = fs.create(new Path("usr/local/hadoop/input"));//在hdfs上创建路径
            byte[] b = new byte[1024*1024];
            int read = 0;
            while((read = in.read(b)) > 0){
                out.write(b, 0, read);
            }
    

    5.将hdfs文件下载到本地

    //hdfs文件复制到本地(流)
    System.out.println("------开始下载------");
    FSDataInputStream in = fs.open(new Path("usr/local/hadoop/input1"));
    FileOutputStream out = new FileOutputStream("D:/xx.txt");
    IOUtils.copyBytes(in, out, conf);
    

    注:所有任务做完,记得关闭流

    ---------------------------------------------------------------------提示---------------------------------------------------------------------

    整个项目打包:链接:https://pan.baidu.com/s/1Mpz74-qBOIzZkSFJUg9Rug
    提取码:mnay (百度网盘)

    展开全文
  • 019-07-01 16:45:24,933 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 58.211.111.42:63048 Call#3 Retry#0java.io.IOEx...

    019-07-01 16:45:24,933 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 58.211.111.42:63048 Call#3 Retry#0

    java.io.IOException: File /a1.txt could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation.

    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1620)

    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3350)

    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:678)

    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:213)

    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:491)

    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)

    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)

    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2141)

    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2137)

    at java.security.AccessController.doPrivileged(Native Method)

    at javax.security.auth.Subject.doAs(Subject.java:422)

    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1835)

    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2135)

    学些hadoop。遇到这个问题,查找网上好多资料,一般都是说namenode和datanode不同步导致的,或者防火墙没开50010端口,或者nameNode和datanode无法通信导致的。

    其实通过命令行都是可以正常操作的,远程调用的时候可以创建目录和文件,但是像文件写内容的时候,就写不进去,报如上错误。

    本地host需要配置好,然后加上下面这句代码

    configuration = new Configuration();

    configuration.set("dfs.client.use.datanode.hostname", "true");

    意思大概就是伪分布式hdfs,datanode注册到namenode的ip是本机的127.0.0.1,当远程客户端连接到namenode得到datanode的ip的时候,得到的是127.0.0.1,这自然是连接不上的。这里的意思大概就是强制本地java客户端使用hostname去连接datanode,可以连接成功

    防火墙端口50010也是必须打开的,因为数据节点需要使用这个端口

    展开全文
  • 在做Hadoop数据挖掘项目的时候,我们第一步是源数据的获取,即把相应的数据放置到HDFS中,以便Hadoop进行计算,手动将文件上传到HDFS中,未免太费时费力,所以我们可以采取像Flume一样的框架,或者采用Shell脚本进行...
  • 这是我的代码和在网上看到的差不多//..................................................................importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;i...这是我的代码 和在网上看到的...
  • 通过JAVA API上传文件到HDFS

    千次阅读 2017-12-29 14:43:41
     注意关闭虚拟机防火墙设置如果开启了翻墙代理,注意调整到本地模式在core-site.xml中配置时,不要使用localhost,而应该使用虚拟机IP地址,否则无法连接虚拟机hadoop如下权限不够: 修改用户权限 上传成功 ...
  • Linux上传本地文件到HDFS

    万次阅读 多人点赞 2018-01-15 16:02:37
    上一篇文章记录了如何使用java代码将本地文件上传到HDFS中,之后我又尝试了直接使用Linux命令上传文件,过程如下。 su hdfs //切换到HDFS用户 cd /home/hdfs/files //进入服务器中文件所在目录 ls //查看此目录...
  • java上传文件到HDFS

    千次阅读 2016-11-09 19:13:59
    java上传本地文件到HDFS。 1、pom依赖 <groupId>org.apache.hadoop <artifactId>hadoop-common <version>2.6.0-cdh5.5.1 </dependency><dependency> <groupId>or
  • JAVA实现批量上传本地文件HDFS

    千次阅读 2017-07-09 17:34:32
    JAVA实现批量上传本地文件HDFS版权声明:本文为博主原创文章,未经博主允许不得转载。前言 小白一枚,本文简单实现了批量上传遥感影像至HDFS,所以没有实现窗体简单的关闭、缩小、取消等功能。重申这只是简单demo...
  • 原因:文件夹和文件名都是存放在 NameNode 上的,本地可以通过公网访问 NameNode,所以创建文件夹和文件都可以,但是当写数据的时候,NameNode 和DataNode 是通过内网通信的,NameNode 会返回给 DataNode 的内网 IP...
  • 首先在网上找了好久没有找到从本地文件系统上传整个目录到hdfs文件系统的程序,权威指南上也没有,都是单个文件上传,所以这里自己编写了一个程序,封装成jar包执行能够复制。先说明一下代码:须要手动输入两个路径...
  • 上传本地文件到HDFS

    千次阅读 2016-09-21 10:09:44
    本文程序参考《实战Hadoop》【刘鹏 主编】,如有侵权立即删除,仅作记录以备后查。 开发环境: 选项 参数 虚拟操作系统 Centos 6.8 集成开发环境 Eclipse Mars ...其中src文件夹下的Hdoop包存放java
  • 上传本地文件到hdfs

    千次阅读 2018-03-23 20:43:29
    package com.yc.hadoop.hdfs;import java.net.URI;import java.util.Scanner;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import...
  • 在上一篇博文中,我们讲了如何编写、运行、测试一个MR,但是hdfs上的文件是手动执行命令从本地linux上传hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs...
  • 这里使用的是CDH 5.13.0分布式系统在网上搜集了很久,找到一套能上传文件java代码package com.data.sysatem.crawler;import java.io.BufferedInputStream;import java.io.File;import java.io.FileInputStream;...
  • 我正在尝试将文件写入HDFS,文件已创建,但它在群集上是空的,但是当我在本地运行代码时,它就像一个魅力.这是我的代码:FSDataOutputStream recOutputWriter = null;FileSystem fs = null;try {//OutputWriter = new ...
  • I've searched for some time now and none of the solutions seem to work for me.Pretty straightforward - I want to upload data from my local file system to HDFS using the Java API. The Java program wil....
  • 引言:通过Java本地把windows里的文件资源上传到centOs环境下的 hdfs文件系统里,中间遇到了很多问题,不过最终还是把文件上传到hdfs里了 环境:centos-7.0,hadoop.2.8.5.gz,jdk1.8,eclipse 1、下载hadoop.2.8.5...
  • [Hadoop]Hadoop上传本地文件到HDFS

    千次阅读 2015-04-13 23:35:33
    代码如下,需要注意标红处,要不然会出现FileNotFound(permission deny)得错误。...import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem;
  • hadoop idea 本地上传文件到hdfs

    千次阅读 2019-04-02 10:09:16
    最近在学习hadoop编程,其中需要将数据上传到hafs仓库,但是要先复制centos下再上传上去,比较麻烦,所以直接写了一个上传的程序,这个代码不能实现循环上传多个文件,有相应的函数,你可以试一下,也不是很难。...
  • 利用java API实现本地文件上传hdfs

    千次阅读 2014-08-29 16:49:36
    public static void main(String args[]) throws Exception  { Configuration conf = new Configuration();...FileSystem hdfs=FileSystem.get(conf); Path src_dir =new Path("D:\\C++\\helloworld");
  • import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import ...
  • java操作Hadoop源码之HDFS Java API操作-创建目录,附带所需全部jar包,欢迎下载学习。
  • Java文件HDFS失败

    2021-03-08 18:34:30
    各位大牛:本人在虚拟机上搭建了Hadoop环境,...但在我本地电脑,往虚拟机上的HDFS文件时,文件创建成功,但往文件里写入内容一直失败,感觉是找不可写的datanode, 不知道怎么配置,望各位大牛指点。错误代码如...
  • Hadoop——上传本地文件到hdfs

    千次阅读 2015-04-05 21:19:59
    import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public cla
  • 代码如下:package ...import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import org.apache.hadoop.conf.Configuration;import org.apache.h...
  • 当使用 JAVA API 时提示: put: Permission denied: user=root, access=MrBlackWhite, inode=”/user/shijin”:hdfs:supergroup:drwxr-xr-x 解决办法: 另外关于使用命令时遇到权限的问题,可以参考这篇...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 23,489
精华内容 9,395
关键字:

java上传本地文件到hdfs

java 订阅