精华内容
下载资源
问答
  • Java程序调用Hadoop接口入门

    万次阅读 2017-04-24 20:29:34
    一、安装Hadoop集群环境 参考http://blog.itpub.net/29485627/viewspace-2137702/   二、程序编写 1 整个程序的目录为   2 HdfsUtil.java中的代码为 package hadoop.hdfs; import java.io....

    一、安装Hadoop集群环境

    参考http://blog.itpub.net/29485627/viewspace-2137702/

     

    二、程序编写

    1 整个程序的目录为

     

    2 HdfsUtil.java中的代码为

    package hadoop.hdfs;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author 作者 : yangyang
     * @version 创建时间:2016年4月21日 类说明 :hdfs文件系统操作类
     */
    public class HdfsUtil {
    
    	private static final Logger log = LoggerFactory.getLogger(HdfsUtil.class);
    	// 初始化
    	static Configuration conf = new Configuration();
    	static FileSystem hdfs;
    	static {
    		try {
    			hdfs = FileSystem.get(conf);
    		} catch (IOException e) {
    			if (log.isDebugEnabled())
    				log.debug("初始化hadoop配置失败", e);
    		}
    	}
    
    	/**
    	 * 创建文件夹
    	 * 
    	 * @param dir
    	 * @throws IOException
    	 */
    	public static boolean mkDirs(String dir) {
    		try {
    			Path path = new Path(dir);
    			return hdfs.mkdirs(path);
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		return false;
    	}
    
    	/**
    	 * 本地文件上传到hdfs
    	 * 
    	 * @param localSrc
    	 * @param hdfsDst
    	 * @throws IOException
    	 */
    	public static void uploadFile(String localSrc, String hdfsDst) throws IOException {
    		Path src = new Path(localSrc);
    		Path dst = new Path(hdfsDst);
    		hdfs.copyFromLocalFile(src, dst);
    
    		FileStatus files[] = hdfs.listStatus(dst);
    		System.out.println("Upload to \t" + conf.get("fs.default.name")
    				+ hdfsDst);
    		for (FileStatus file : files) {
    			System.out.println(file.getPath());
    		}
    	}
    	
    	/**
    	 * 下载文件到本地
    	 * @param remotePath hdfs文件目录
    	 * @param localPath 本地文件目录
    	 */
    	public static void downLoadFile(String remotePath, String localPath) {
    
    		Path _remotePath = new Path(remotePath);
    		Path _localPath = new Path(localPath);
    		
    		try {
    			hdfs.copyToLocalFile(false,_remotePath, _localPath,true);
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		} finally {
    
    			try {
    				hdfs.close();
    			} catch (IOException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    
    		}
    
    	}
    
    	/**
    	 * 创建文件
    	 * 
    	 * @param fileName
    	 * @param fileContent
    	 * @throws IOException
    	 */
    	public static void createFile(String fileName, String fileContent) {
    		Path dst = new Path(fileName);
    		byte[] bytes = fileContent.getBytes();
    		FSDataOutputStream output;
    		
    		try {
    			output = hdfs.create(dst);
    			output.write(bytes);
    		} catch (IOException e) {
    			if (log.isDebugEnabled())
    				log.debug("创建文件异常:" + fileName, e);
    		}
    		System.out.println("new file \t" + conf.get("fs.default.name")
    				+ fileName);
    	}
    	
    	/**
    	 * 获取文件内容
    	 * @param fileName 文件名
    	 * @return
    	 */
    	public static String readFileContent(String fileName){
    		Path p = new Path(conf.get("fs.default.name")+fileName);
    		FSDataInputStream in = null;
    		String content = "";
    		
    		try {
    			in = hdfs.open(p);
    			BufferedReader buff = new BufferedReader(new InputStreamReader(in));
    			content = buff.readLine();
    			buff.close();
    			in.close();
    			hdfs.close();
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			if(log.isDebugEnabled())
    				log.debug("读取文件:"+fileName+"失败", e);
    		}
    		
    		return content;
    	}
    
    	public void listFiles(String dirName) throws IOException {
    		Path f = new Path(dirName);
    		FileStatus[] status = hdfs.listStatus(f);
    		System.out.println(dirName + " has all files:");
    		for (int i = 0; i < status.length; i++) {
    			System.out.println(status[i].getPath().toString());
    		}
    	}
    
    	/**
    	 * 删除文件
    	 * 
    	 * @param fileName
    	 *            文件路径
    	 * @throws IOException
    	 */
    	public static boolean deleteFile(String fileName) throws IOException {
    		Path f = new Path(fileName);
    		boolean isExists = hdfs.exists(f);
    		if (isExists) {
    			boolean isDel = hdfs.delete(f, true);
    			return isDel;
    		} else {
    			return false;
    		}
    	}
    
    	/**
    	 * 获取集群上的所有节点名称
    	 * 
    	 * @throws IOException
    	 */
    	public static List<DatanodeInfo> getDateNodeHost() throws IOException {
    		DistributedFileSystem _hdfs = (DistributedFileSystem) hdfs;
    		DatanodeInfo[] dataNodeStats = _hdfs.getDataNodeStats();
    		List<DatanodeInfo> dataNodeLst = Arrays.asList(dataNodeStats);
    		return dataNodeLst;
    	}
    
    	/**
    	 * 文件重命名
    	 * 
    	 * @param fileName
    	 *            文件名
    	 * @param newFileName
    	 *            新文件名
    	 * @throws IOException
    	 */
    	public static boolean renameFile(String fileName, String newFileName) {
    
    		Path path = new Path(fileName);
    		Path newPath = new Path(newFileName);
    		boolean b = false;
    		try {
    			b = hdfs.rename(path, newPath);
    		} catch (Exception e) {
    			if (log.isDebugEnabled())
    				log.debug("文件:[" + fileName + "]修改为:[" + newFileName + "]失败", e);
    		}
    		return b;
    	}
    
    	public static void main(String[] args) throws IOException {
    /*		System.err.println(mkDirs("/test"));
    		createFile("/test/my.txt", "I Love Beijing!");
    		System.out.println(readFileContent("/test/my.txt"));*/
    //		deleteFile("/test/my.txt");
    //		uploadFile("F:\\a1.txt", "/test");
    		downLoadFile("/test/a1.txt", "F:\\a2.txt");
    	}
    }

     

    3 src/main/resouces中的四个配置文件从hadoop环境中拷贝,具体位于/usr/hadoop/etc/hadoop中

     

    三、验证

    1 运行HdfsUtil中的main方法的上三行(下三行先注释起来),结果为

    true

    new file    hdfs://192.168.121.201:9000/test/my.txt

     

    在hadoop环境中验证

    # hadoop fs -ls /

    # hadoop fs -ls /test

     

    2 将main方法中的第四行代码打开,其他行代码都注释,重新跑一次。

    进hadoop环境中验证,发现my.txt文件已成功删除


    3 建立F:\a1.txt文件,其中的内容随意输入,比如“abcdefg”。

    将main方法中的第五行代码打开,其他行代码都注释,重新跑一次。运行结果为:

    Uploadto   hdfs://192.168.121.201:9000/test

    hdfs://192.168.121.201:9000/test/a1.txt


    进hadoop环境中验证,发现a1.txt已经被传到hadoop中



    4 将main方法中的第六行代码打开,其他行代码都注释,重新跑一次。发现hadoop中的/test/a1.txt已被下载到F:\a2.txt,其中的内容为“abcdefg”

     

    展开全文
  • 摘要 总结自己springboot项目使用hbase、hadoop中出现的莫名奇怪的依赖使用报错 ...java.lang.NoClassDefFoundError: org/apache/hadoop/tracing/SpanReceiverHost at org.apache.hadoop.hdfs.DFSClient.<.

    在这里插入图片描述

    摘要

    总结自己springboot项目使用hbase、hadoop中出现的莫名奇怪的依赖使用报错

    hbase   2.0.2版本
    hadoop  3.1.1版本
    

    注意:本地依赖版本最好和服务器上版本一致,避免发生莫名奇怪错误

    情况1:NoClassDefFoundError

    java.lang.NoClassDefFoundError: org/apache/hadoop/tracing/SpanReceiverHost
    	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:643) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem$1.run(FileSystem.java:216) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem$1.run(FileSystem.java:213) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_231]
    	at javax.security.auth.Subject.doAs(Subject.java:422) ~[na:1.8.0_231]
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:213) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at com.geespace.microservices.directory.assets.service.impl.DataAssetsScreenServiceImpl.updateAssetsSize(DataAssetsScreenServiceImpl.java:149) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at com.geespace.microservices.directory.assets.service.impl.DataAssetsScreenServiceImpl$$FastClassBySpringCGLIB$$cc83a803.invoke(<generated>) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[api-gateway-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_231]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_231]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_231]
    	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_231]
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.tracing.SpanReceiverHost
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_231]
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_231]
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_231]
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_231]
    	... 27 common frames omitted	
    

    原因:碰到这种xxxxxClass未找到,就说明服务器上使用jar包版本不对,导致找不到类,原因就是jar出现多版本导致的,这个类出现在hadoop-common 2.7.7中,而我要使用的是3.1.1版本,不应该出现这个对象
    在这里插入图片描述
    解决思路:pom依赖使用标签进行依赖冲突排除,确保最后只有一个版本即可(也就是把乱七八糟jar内部引入的版本全部排除调)

    情况2:使用hbase报错:NullPointerException

    下面这句使用报错 ↓

     FileSystem fs = FileSystem.get(new URI(hdfsPath), configuration, hdfsName);
    

    原因:报空指针就是没有进行初始化,而直接调用方法导致的
    解决思路:写到一起或者初始化后再使用即可

    情况3:summary.typeQuotaInfos.typeQuotaInfo[3].type

    查看这篇文章即可 -> : https://blog.csdn.net/a924382407/article/details/117441247?spm=1001.2014.3001.5501

    展开全文
  • Java代码实现hadoop命令

    千次阅读 2017-07-20 22:19:31
    读取文件最简单的方法是使用java.net.URL对象打开数据流,从中读取数据,但让java程序能识别hadoop的hdfs url需要通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。...

    一. 获取文件系统实例
    通过FileSystem的get()或newInstance()方法获取文件系统的实例。

    get()和newInstance()方法分别有3个重载方法:

    //返回默认文件系统,core-site.xml中指定的,如果没有指定,则默认本地文件系统
    public static FileSystem get(Configuration conf) throws IOException
    public static FileSystem newInstance(Configuration conf) throws IOException
    
    //通过给定URI方案和权限来确定要使用的文件系统,若URI中未指定方案,返回默认文件系统
    public static FileSystem get(URI uri, Configuration conf) throws IOException
    public static FileSystem newInstance(URI uri, Configuration conf) throws IOException
    
    //作为给定用户来访问文件系统,对安全来说很重要
    public static FileSystem get(final URI uri, final Configuration conf, final String user)  throws IOException, InterruptedException
    public static FileSystem newInstance(final URI uri, final Configuration conf, final String user) throws IOException, InterruptedException
    

    另外可以通过getLocal()或newInstanceLocal()获取本地文件系统:

    public static LocalFileSystem getLocal(Configuration conf) throws IOException
    public static LocalFileSystem newInstanceLocal(Configuration conf) throws IOException
    

    二. 读取数据
    1. 从hadoop url读取数据

    读取文件最简单的方法是使用java.net.URL对象打开数据流,从中读取数据,但让java程序能识别hadoop的hdfs url需要通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。
    java测试类代码:

    public class ReadFromHadoopURL {
    
      static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
      }
    
      public static void main(String[] args) throws Exception{
          String uri = "hdfs://localhost:9000/input/input1.txt";
          InputStream in = null;
    
          try{
              in = new URL(uri).openStream();
              IOUtils.copyBytes(in, System.out, 4096, false);
          }finally{
              IOUtils.closeStream(in);
          }
      }
    }
    

    注:
    这种文件读取的方法具有一定的限制性。因为Java.net.URL的setURLStreamHandlerFactory方法每个java虚拟机最多调用一次,如果程序中有不受自己控制的第三方组件调用了这个方法,将无法使用这种方法从hadoop中读取数据。
    附setURLStreamHandlerFactory源码:

    public static void setURLStreamHandlerFactory(URLStreamHandlerFactory fac) {
            synchronized (streamHandlerLock) {
                if (factory != null) {
                    throw new Error("factory already defined");
                }
    
                SecurityManager security = System.getSecurityManager();
                if (security != null) {
                    security.checkSetFactory();
                }
                handlers.clear();
                factory = fac;
            }
    
    }
    

    2.通过FileSystem API读取数据
    hadoop文件系统中通过org.apache.hadoop.fs.Path对象来代表文件。
    获取到FileSystem实例后通过open()方法获取文件的输入流

    //缓冲区默认大小4KB,bufferSize指定缓冲区大小
    public FSDataInputStream open(Path f) throws IOException
    public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

    例:
    java测试类代码:

    public class ReadFromFileSystemAPI {
        public static void main(String[] args) throws Exception{
            String uri = "hdfs://localhost:9000/input/input1.txt";
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            //第二种获取文件系统的方法
            //FileSystem fs = FileSystem.newInstance(URI.create(uri), conf);
            InputStream in = null;
            try{
                in = fs.open(new Path(uri));
                IOUtils.copyBytes(in, System.out, 4096, false);
            }finally{
                IOUtils.closeStream(in);
            }
        }
    
    }

    输入流FSDataInputStream对象介绍
    FileSystem对象中的open()方法返回的是org.apache.hadoop.fs.FSDataInputStream对象,这个对象继承了java.io.DataInputStream,并支持随机访问,从流的任意位置读取数据。

        public class FSDataInputStream extends DataInputStream
        implements Seekable, PositionedReadable,
          ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
          HasEnhancedByteBufferAccess{
    
    //implementation
    
    }

    Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量的方法。注:seek()方法开销相对高,需要慎用。

    public interface Seekable {
        //定位到从文件起始位置开始指定的偏移量的位置,若偏移量超出文件位置会报异常
      void seek(long pos) throws IOException;
    
         //返回当前位置相对于文件起始位置的偏移量
      long getPos() throws IOException;
    
        //查找数据的其他副本,若找到一个新副本则返回true,否则返回false
      boolean seekToNewSource(long targetPos) throws IOException;
    }
    
    PositionedReadable接口从一个指定偏移量处读取文件的一部分。
    public interface PositionedReadable {
      //从文件指定position处读取至多length字节的数据,并存入缓冲区buffer的指定偏移量offset处
     //返回值是督导的字节数,可能比length的长度小
      public int read(long position, byte[] buffer, int offset, int length) throws IOException;
    
       //从文件指定position处读取指定length的字节,并存入缓冲区buffer指定偏移量offset处
      //若读到文件末尾仍不足length字节,则抛出异常
      public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
    
      //从文件指定position处读取缓冲区buffer大小的字节,并存入buffer
      //若读到文件末尾仍不足length字节,则抛出异常
      public void readFully(long position, byte[] buffer) throws IOException;
    }

    例:
    测试代码:

    
        public class TestFSDataInputStream {
        private FileSystem fs = null;
        private FSDataInputStream in = null;
        private String uri = "hdfs://localhost:9000/input/input1.txt";
    
        private Logger log = Logger.getLogger(TestFSDataInputStream.class);
        static{
            PropertyConfigurator.configure("conf/log4j.properties");
        }
    
        @Before
        public void setUp() throws Exception {
            Configuration conf = new Configuration();
            fs = FileSystem.get(URI.create(uri), conf);
        }
    
        @Test
        public void test() throws Exception{
            try{
                in = fs.open(new Path(uri));
    
                log.info("文件内容:");
                IOUtils.copyBytes(in, System.out, 4096, false);
    
                in.seek(6);
                Long pos = in.getPos();
                log.info("当前偏移量:"+pos);
                log.info("读取内容:");
                IOUtils.copyBytes(in, System.out, 4096, false);
    
                byte[] bytes = new byte[10];
                int num = in.read(7, bytes, 0, 10);
                log.info("从偏移量7读取10个字节到bytes,共读取"+num+"字节");
                log.info("读取内容:"+(new String(bytes)));
    
                //以下代码会抛出EOFException
    //          in.readFully(6, bytes);
    //          in.readFully(6, bytes, 0, 10);
            }finally{
                IOUtils.closeStream(in);
            }
        } 
    }

    三. 写入数据
    1.新建文件
    给准备建的文件指定一个Path对象,然后通过FileSystem的create()方法返回一个用于写入数据的输出流。
    Create()方法有多个重载版本,允许指定是否需要强制覆盖现有文件、文件备份数量、写入文件时缓冲区大小、文件块大小及文件权限。还可指定Progressable回调接口,这样可以把数据写入datanode的进度通知给应用。
    Create()方法能为需要写入且当前不存在的文件创建父目录,若不希望这样,则应先调用exists()方法检查父目录是否存在。
    create()方法的所有重载方法:

    //创建一个输出流,默认覆盖现有文件  
    public FSDataOutputStream create(Path f) throws IOException 
    
      //创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
    public FSDataOutputStream create(Path f, boolean overwrite) throws IOException
    
     //创建一个输出流,默认覆盖现有文件,progress用来报告进度
    public FSDataOutputStream create(Path f, Progressable progress)  throws IOException
    
      //创建一个输出流,默认覆盖现有文件,replication指定文件备份数
    public FSDataOutputStream create(Path f, short replication) throws IOException 
    
      //创建一个输出流,默认覆盖现有文件,replication指定文件备份数,progress用来报告进度
    public FSDataOutputStream create(Path f, short replication, Progressable progress)  throws IOException
    
      //创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
     //bufferSize指定写入时缓冲区大小
    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException
    
      //创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
     // bufferSize指定写入时缓冲区大小,replication指定文件备份数,blockSize指定文件块大小
    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize ) throws IOException 
    
      //创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
     // bufferSize指定写入时缓冲区大小,replication指定文件备份数,blockSize指定文件块大小
     // progress用来报告进度
    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,short replication, long blockSize,  Progressable progress ) throws IOException
    
      //创建一个输出流,文件存在时,overwrite为true则覆盖现有文件,为false则抛出异常
     // bufferSize指定写入时缓冲区大小,replication指定文件备份数,blockSize指定文件块大小
     // progress用来报告进度,permission指定文件权限
    public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)  throws IOException;
    
      //创建一个输出流,permission指定文件权限, bufferSize指定写入时缓冲区大小
     // replication指定文件备份数,progress用来报告进度
     // flags指定创建标志,标志如下:
     //          CREATE - 如果文件不存在则创建文件,否则抛出异常
     //          APPEND - 如果文件存在则向文件追加内容,否则抛出异常
     //          OVERWRITE - 文件存在时,覆盖现有文件,否则抛出异常
     //          CREATE|APPEND - 文件不存在时创建文件,文件已存在时向文件追加内容
     //          CREATE|OVERWRITE - 文件不存在时创建文件,否则覆盖已有文件
     //          SYNC_BLOCK - 强制关闭文件块,如果需要同步操作,每次写入后还需调用Syncable.hsync()方法
    public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException 
    
      //创建一个输出流,permission指定文件权限, bufferSize指定写入时缓冲区大小
     // replication指定文件备份数,progress用来报告进度,blockSize指定文件块大小
     // checksumOpt指定校验和选项,若为空,则使用配置文件中的值
     // flags指定创建标志,标志如下:
     //          CREATE - 如果文件不存在则创建文件,否则抛出异常
     //          APPEND - 如果文件存在则向文件追加内容,否则抛出异常
     //          OVERWRITE - 文件存在时,覆盖现有文件,否则抛出异常
     //          CREATE|APPEND - 文件不存在时创建文件,文件已存在时向文件追加内容
     //          CREATE|OVERWRITE - 文件不存在时创建文件,否则覆盖已有文件
     //          SYNC_BLOCK - 强制关闭文件块,如果需要同步操作,每次写入后还需调用Syncable.hsync()方法
    public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt)  throws IOException

    例:
    写入前HDFS中目录结构:
    测试代码:

    public class WriteByCreate {
        static{
            PropertyConfigurator.configure("conf/log4j.properties");
        }
    
        @Test
        public void createTest() throws Exception {
            String localSrc = "/home/hadoop/merge.txt";
            String dst = "hdfs://localhost:9000/input/merge.txt";
    
            InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();
    
            FileSystem fs = FileSystem.get(URI.create(dst), conf);
            OutputStream out = null;
            try{
                out = fs.create(new Path(dst), 
                        new Progressable() {
                            public void progress() {
                                System.out.print(".");
                            }
                        });
                Log.info("write start!");
                IOUtils.copyBytes(in, out, 4096, true);
                System.out.println();
                Log.info("write end!");
            }finally{
                IOUtils.closeStream(in);
                IOUtils.closeStream(out);
            }
        }
    }

    2.向已存在文件末尾追加数据
    FileSystem的append()方法允许在一个已存在文件的最后偏移量处追加数据。追加操作是可选的,并不是所有hadoop文件系统都实现了该操作。
    Append()的重载方法

    //向指定文件中追加数据,默认缓冲区大小4096,文件不存在时抛出异常
    public FSDataOutputStream append(Path f) throws IOException
    
    //向指定文件中追加数据,bufferSize指定缓冲区大小,文件不存在时抛出异常
    public FSDataOutputStream append(Path f, int bufferSize) throws IOException
    
    //向指定文件中追加数据,bufferSize指定缓冲区大小,文件不存在时抛出异常,progress报告进度
    public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException;

    例:

    测试代码

    public class WriteByAppend{
        static{
            PropertyConfigurator.configure("conf/log4j.properties");
        }
    
        @Test
        public void appendTest() throws Exception {
            String localSrc = "/home/hadoop/merge.txt";
            String dst = "hdfs://localhost:9000/input/merge.txt";
    
            InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();
    
            FileSystem fs = FileSystem.get(URI.create(dst), conf);
            OutputStream out = null;
            try{
                out = fs.append(new Path(dst),4096, 
                                 new Progressable() {
                                    public void progress() {
                                        System.out.print(".");
                                    }
                                });
                Log.info("write start!");
                IOUtils.copyBytes(in, out, 4096, true);
                System.out.println();
                Log.info("write end!");
            }finally{
                IOUtils.closeStream(in);
                IOUtils.closeStream(out);
            }
        }
    }

    输出流FSDataOutputStream对象
    FileSystem的create()方法及append()方法返回的是FSDataOutputStream对象,它也有一个查询文件当前位置的方法getPos()。与FSDataInputStream不同,FSDataOutputStream不允许在文件中定位,因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件末尾追加数据,不支持在除文件末尾外的其他位置进行写入,因此写入时定位没有意义。

    • 四. 创建目录
    FileSystem提供了创建目录的方法。可以一次性创建所有必要但还没有的父目录。

     public boolean mkdirs(Path f) throws IOException
     public abstract boolean mkdirs(Path f, FsPermission permission ) throws IOException;

    通常不需要显示创建一个目录,因为调用create()方法写入文件时会自动创建父目录。

    五. 查询文件系统
    文件元数据FileStatus
    FileStatus类封装了文件系统中文件和目录的元数据,FileStatus源码中可以看到如下属性

    public class FileStatus implements Writable, Comparable {
    
      private Path path;//文件或目录的path
      private long length;//文件字节数
      private boolean isdir;//是否是目录
      private short block_replication;//文件块备份数
      private long blocksize;//文件块大小
      private long modification_time;//修改时间
      private long access_time;//访问时间
      private FsPermission permission;//权限
      private String owner;//所属用户
      private String group;//所属用户组
      private Path symlink; //软连接
    
        //method
    }
    

    FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象
    例:

    public class ShowFileStatus {
        private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
        private FileSystem fs;
    
        @Before
        public void setUp() throws IOException {
            Configuration conf = new Configuration();
            if (System.getProperty("test.build.data") == null) {
                System.setProperty("test.build.data", "/tmp");
            }
            cluster = new MiniDFSCluster(conf, 1, true, null);
            fs = cluster.getFileSystem();
            OutputStream out = fs.create(new Path("/dir/file"));
            out.write("content".getBytes("UTF-8"));
            out.close();
        }
    
        @After
        public void tearDown() throws IOException {
            if (fs != null) { 
                fs.close(); 
            }
            if (cluster != null) { 
                cluster.shutdown(); 
            }
        }
    
        @Test(expected = FileNotFoundException.class)
        public void throwsFileNotFoundForNonExistentFile() throws IOException {
            fs.getFileStatus(new Path("no-such-file"));
        }
    
        @Test
        public void fileStatusForFile() throws IOException {
            Path file = new Path("/dir/file");
            Log.info("文件filestatus:");
            FileStatus stat = fs.getFileStatus(file);
            Log.info("path:"+stat.getPath().toUri().getPath());
            Log.info("isdir:"+String.valueOf(stat.isDir()));
            Log.info("length:"+String.valueOf(stat.getLen()));
            Log.info("modification:"+String.valueOf(stat.getModificationTime()));
            Log.info("replication:"+String.valueOf(stat.getReplication()));
            Log.info("blicksize:"+String.valueOf(stat.getBlockSize()));
            Log.info("owner:"+stat.getOwner());
            Log.info("group:"+stat.getGroup());
            Log.info("permission:"+stat.getPermission().toString());
        }
    
        @Test
        public void fileStatusForDirectory() throws IOException {
            Path dir = new Path("/dir");
            Log.info("目录filestatus:");
            FileStatus stat = fs.getFileStatus(dir);
            Log.info("path:"+stat.getPath().toUri().getPath());
            Log.info("isdir:"+String.valueOf(stat.isDir()));
            Log.info("length:"+String.valueOf(stat.getLen()));
            Log.info("modification:"+String.valueOf(stat.getModificationTime()));
            Log.info("replication:"+String.valueOf(stat.getReplication()));
            Log.info("blicksize:"+String.valueOf(stat.getBlockSize()));
            Log.info("owner:"+stat.getOwner());
            Log.info("group:"+stat.getGroup());
            Log.info("permission:"+stat.getPermission().toString());
        }
    }

    列出文件
    列出目录中内容,可以使用FileSystem的listStatus()方法。方法接收一个或一组路径,如果路径是文件,以数组方法返回长度为1的FileStatus对象,如果路径是目录,返回0个或多个FileStatus对象表示目录中包含的文件或目录;如果是一组路径,依次轮流对每个路径调用listStatus方法,将结果累积到一个数组
    方法如下

    //列出给定路径下的文件或目录的status  public abstract FileStatus[] listStatus(Path f)  throws FileNotFoundException, IOException;
    //列出给定路径下符合用户提供的filter限制的文件或目录的status
    public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException 
    //列出给定的一组路径下文件或目录的status
    public FileStatus[] listStatus(Path[] files) throws FileNotFoundException, IOException 
    //列出给定的一组路径下符合用户提供的filter限制的文件或目录的status
    public FileStatus[] listStatus(Path[] files, PathFilter filter)  throws FileNotFoundException, IOException

    例:

    public class ListFileStatus {
        private FileSystem fs = null;
        private String uri = "hdfs://localhost:9000/input/input1.txt";
        private Path[] paths = new Path[]{new Path("/input.zip"),new Path("/input/"),new Path("/output/")};
    
        private Logger log = Logger.getLogger(TestFSDataInputStream.class);
        static{
            PropertyConfigurator.configure("conf/log4j.properties");
        }
    
        @Before
        public void setUp() throws Exception {
            Configuration conf = new Configuration();
            fs = FileSystem.get(URI.create(uri), conf);
        }
    
        @Test
        public void listStatusTest() throws Exception {
            log.info("--------------------------------");
            log.info("列出文件 ["+paths[0]+"] 的status:");
            FileStatus[] status = fs.listStatus(paths[0]);
            printFileStatus(status);
            log.info("--------------------------------");
    
            log.info("--------------------------------");
            log.info("列出目录 ["+paths[1]+"] 的status:");
            status = fs.listStatus(paths[1]);
            printFileStatus(status);
            log.info("--------------------------------");
    
            log.info("--------------------------------");
            log.info("列出一组path "+Arrays.toString(paths)+" 的status:");
            status = fs.listStatus(paths);
            printFileStatus(status);
            log.info("--------------------------------");
        }
    
        protected void printFileStatus(FileStatus[] status){
            for (FileStatus s : status) {
                log.info(s.getPath()+" status:");
                log.info("isdir:"+String.valueOf(s.isDir()));
                log.info("length:"+String.valueOf(s.getLen()));
                log.info("modification:"+String.valueOf(s.getModificationTime()));
                log.info("replication:"+String.valueOf(s.getReplication()));
                log.info("blicksize:"+String.valueOf(s.getBlockSize()));
                log.info("owner:"+s.getOwner());
                log.info("group:"+s.getGroup());
                log.info("permission:"+s.getPermission().toString());
                log.info("\n");
            }
        }
    }

    另外,需要在一次操作中处理一批文件时,hadoop提供了通配符来匹配多个文件。
    通配符 名称 匹配
    * 星号 匹配0或多个字符
    ? 问号 匹配单衣字符
    [ab] 字符类 匹配{a,b}集合里的一个字符
    [^ab] 非字符类 匹配非{a,b}集合里的一个字符
    [a-b] 字符范围 匹配一个{a,b}范围内的字符,包括ab,a的字典顺序要小于等于b
    [^a-b] 非字符范围 匹配一个不在{a,b}范围内的字符,包括ab,a的字典顺序要小于等于b
    {a,b} 或选择 匹配包含a或b中一个的
    \c 转义字符 匹配原字符c

    hadoop的FileSystem为通配提供了2个globStatus()方法,方法返回所有文件路径与给定的通配符相匹配的文件的FileStatus,filter可进一步对匹配进行限制:

    public FileStatus[] globStatus(Path pathPattern) throws IOException
    public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

    例:
    目录结构
    /
    ├── 2007/
    │ └── 12/
    │ ├── 30/
    │ └── 31/
    └── 2008/
    └── 01/
    ├── 01/
    └── 02/

    通配符示例:

    通配符 Expansion
    /* /2007 /2008
    // /2007/12 /2008/01
    //12/ /2007/12/30 /2007/12/31
    /200? /2007 /2008
    /200[78] /2007 /2008
    /200[7-8] /2007 /2008
    /200[^01234569] /2007 /2008
    ///{31,01} /2007/12/31 /2008/01/01
    ///3{0,1} /2007/12/30 /2007/12/31
    /*/{12/31,01/01} /2007/12/31 /2008/01/01

    测试代码:

    public class ListFileStatus {
        private FileSystem fs = null;
        private String uri = "hdfs://localhost:9000/input/input1.txt";
        private Path[] globPaths = new Path[]{new Path("/*"),new Path("/*/*"),new Path("/*/12/*"),new Path("/200?")
                                                ,new Path("/200[78]"),new Path("/200[7-8]"),new Path("/200[^01234569]")
                                                ,new Path("/*/*/{31,01}"),new Path("/*/*/3{0,1}"),new Path("/*/{12/31,01/01}")};
    
        private Logger log = Logger.getLogger(TestFSDataInputStream.class);
        static{
            PropertyConfigurator.configure("conf/log4j.properties");
        }
    
        @Before
        public void setUp() throws Exception {
            Configuration conf = new Configuration();
            fs = FileSystem.get(URI.create(uri), conf);
        }
    
        @Test
        public void globStatusTest() throws Exception {
            for(Path p:globPaths){
                log.info("glob ["+p+"]: ");
                FileStatus[] status = fs.globStatus(p);
                printFilePath(status);
            }
        }
    
        protected void printFilePath(FileStatus[] status){
            Path[] listedPaths = FileUtil.stat2Paths(status);
            for (Path p : listedPaths) {
                log.info(p);
            }
            log.info("");
        }
    }

    通配符并不总能精确的描述的描述想要访问的文件集,如使用通配符排除一个特定的文件就不太可能。FileSystem的listStatus()方法和globStatus()方法提供可选的PathFilter对象,以编程方式控制通配符。过滤器只能作用于文件名,不能针对文件属性进行过滤
    PathFilter接口:

    public interface PathFilter {
        boolean accept(Path path);
    
    }

    例:
    测试代码

    public class ListFileStatus {
        private FileSystem fs = null;
        private String uri = "hdfs://localhost:9000/input/input1.txt";
    
        private Logger log = Logger.getLogger(TestFSDataInputStream.class);
        static{
            PropertyConfigurator.configure("conf/log4j.properties");
        }
    
        @Before
        public void setUp() throws Exception {
            Configuration conf = new Configuration();
            fs = FileSystem.get(URI.create(uri), conf);
        }
    
        @Test
        public void pathFilterTest() throws Exception {
            log.info("glob [/2007/*/*]: ");
            FileStatus[] status = fs.globStatus(new Path("/2007/*/*"));
            printFilePath(status);
    
            log.info("glob [/2007/*/*] except [/2007/12/31]: ");
            status = fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"));
            printFilePath(status);
        }
    
        protected void printFilePath(FileStatus[] status){
            Path[] listedPaths = FileUtil.stat2Paths(status);
            for (Path p : listedPaths) {
                log.info(p);
            }
            log.info("");
        }
    
        class RegexExcludePathFilter implements PathFilter {
            private final String regex;
    
            public RegexExcludePathFilter(String regex) {
                this.regex = regex;
            }
            public boolean accept(Path path) {
                return !path.toString().matches(regex);
            }
        }
    }

    六. 删除数据
    FileSystem的delete()方法可以永久删除文件或目录。

    public boolean delete(Path f) throws IOException     
    
    //recursive为true时,非空目录及其内容才会被删除,否则抛出异常
    public abstract boolean delete(Path f, boolean recursive) throws IOException
    
    //标记当文件系统关闭时将删除的文件。当JVM关闭时,被标记的文件将被删除
    public boolean deleteOnExit(Path f) throws IOException
    
    展开全文
  • Java 抽象类org.apache.hadoop.fs.FileSystem 定义了Hadoop 中的一个文件系统接口:与Hadoop 的某一文件系统进行交互的API 。虽然我们主要关注的是HDFS的实例,即DistributedFileSystem,但总体来说,还是应该继承...

    Java 接口

    Hadoop 有一个抽象的文件系统概念, HDFS 只是其中的一个实现。Java 抽象类org.apache.hadoop.fs.FileSystem 定义了Hadoop 中的一个文件系统接口:与Hadoop 的某一文件系统进行交互的API 。虽然我们主要关注的是HDFS的实例,即DistributedFileSystem,但总体来说,还是应该继承FileSystem抽象类,并编写代码,以保持其在不同文件系统中的可移植性。

    从Hadoop URL 中读取数据

    要从Hadoop文件系统中读取文件,最简单的方陆是使用java.net.URL 对象打开数据流,进而从中读取数据。具体格式如下:

    InputStream in = null;
    try {
      in = new URL("hdfs://host/path").openStream();
      // process in
    } finally {
      IOUtils.closeStream(in);
    }

    让Java 程序能够识别Hadoop的hdfs URL 方案还需要一些额外的工作。这里采用的方能是通过FsUrlStreamHandlerFactory 实例调用URL中的setURLStreamHandlerFactory 方法。由于Java 虚拟机只能调用一次上述方法,

    因此通常在静态方法中调用上述方法。这个限制意味着如果程序的其他组件(不受你控制的第三方组件)已经声明了一个URLStreamHandlerFactory 实例,

    你将无法再使用上述方法从Hadoop 中读取数据。下一节将讨论另一备选方法。

    通过URLStreamHandler 实例以标准输出方式显示Hadoop 文件系统的文件

    public class URLCat {
    static {
    URL.setURLStreamHandlerFactorγ(new FsUrlStreamHandler、Factory());
    }
    public static void main(String[] args) throws Exception {
    InputStream in = null;
    try {
    in = new URL(args[0]).openStream();
    IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
    IOutils.closeStream(in);
    }
    }
    }
    


    我们可以调用Hadoop 中简洁的IOUtils 类,并在finally 子句中关闭数据流,

    同时也可以在输入流和输出流之间复制数据(本例中为System.out)。copyBytes

    方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用于设置复制结束后是否关闭数据流。这里我们选择自行关闭输入流,因而System.out 不关

    闭输入流。

    下面是一个运行示例:

    % hadoop URLCat hdfs://localhost/user/tom/quangle.txt

    On the top of the Crumpetty Tree

    The Quangle Wangle sat,

    But his face you could not see,

    On account of his Beave Hat.

    通过FileSystem API 读取数据

    正如前一小节所解释的,有时无法在应用中设置URLStreamHandlerFactory实例。这种情况下,需要使用FileSystem API 来打开一个文件的输入流。

    Hadoop 文件系统中通过HadoopPath 对象来代表文件(而非java.io.File 对象,因为它的语义与本地文件系统联系太紧密)。你可以将一条路径视为一个Hadoop 文件系统URI ,如hdfs://localhost/userltom/quangle. txt 。

    FileSystem 是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS 。获取FileSystem 实例有两种静态工广方法:

    public staticFileSystem get(Configuration conf) throws IOException

    Public staticFileSystem get(URI uri, Configuration conf) throws IOException

    Configuration 对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-site. xml) 。第一个方法返回的是默认文件系统(在conf/core-site.xml 中指定的,如果没有指定,则使用默认的本地文件系统)。第二个方法通过给定的URI 方案和权限来确定要使用的文件系统,如果给定URI 中没有指定方案,则返回默认文件系统。

    有了FileSystem 实例之后,我们调用open()函数来获取文件的输入流:

    Public FSDatalnputStream open(Path f) throws IOException

    Public abstract FSDatalnputStream open(Path f , int bufferSize)throws IOException

    第一个方法使用默认的缓冲区大小4 KB。

    将上述方法结合起来,我们重写上一个例子:

    直接使用FileSystem 以标准输出格式显示Hadoop 文件系统中的文件

    public class FileSystemCat {
    public static void main(String[] args)throws Exception {
    String uri = args[0] ;
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
    in = fs.open(new Path(uri));
    IOUtils.copyBytes(in , System.out, 4096 ,false);
    } finally {
    IOUtils.closeStream(in);
    }
    }
    }

    FSDatalnputStream

    实际上, FileSystem 对象中的open() 方住返回的是FSDatalnputStream对象,而不是标准的java.io 类对象。这个类是继承了java.io.DatalnputStream 接口的一个特殊类,井支持随机访问,由此可以从流的任意位置读取数据。

    package org.apache.hadoop.fs;

    public class FSDatalnputStream extendsDatalnputStream

    implements Seekable, PositionedReadable {

    // implementation elided

    }

    Seekable 接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos ())的查询方法:

    public interface Seekable {

    void seek(long pos) throws IOException;

    long getPos() throws IOException;

    boolean seekToNewSource (long tagetPos) throwsIOException;

    }

    务必牢记, seek() 方也是一个相对高开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式(如使用MapReduce) ,而非执行大量的seek() 方法。

    写入数据

    FileSystem 类有一系列创建文件的方法。最简单的方法是给准备创建的文件指定

    一个Path 对象,然后返回一个用于写入数据的输出流:

    public FSDataOutputStream create(Path f)throws IOException

    上述方法有多个重载版本,允许我们指定是否需要强制覆盖已有的文件、文件备份

    数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。

    还有一个重载方法progressable ,用于传递回调接口,如此一来,可以把数据写人数据节点的进度通知到你的应用:

    package org.apache.hadoop.util;

    public interface Progressable {

    public void progress();

    }

    另一种新建文件的方法,是使用append()方法在一个已有文件末尾追加数据(还存

    在一些其他重载版本) :

    public FSDataOutputStream append(Path f)throws IOException

    该追加操作允许一个writer 打开文件后在访问该文件的最后偏移量处追加数据。有了这个API ,某些应用可以创建无边界文件,例如,日志文件可以在机器重启后在已有文件后面继续追加数据。该追加操作是可选的,井非所有Hadoop 文件系统都实现了该操作。例如, HDFS 支持追加,但S3 文件系统就不支持。

    展开全文
  • 使用HadoopJava API操作HDFS

    千次阅读 2017-08-13 11:14:49
    本文介绍Java API访问HDFS,实现文件的读写,文件系统的操作等。开发环境为eclipse,开发时所依赖的jar包,可在Hadoop安装目录下找到。Demopackage com.test.hdfs;import org.apache.hadoop.conf.Configuration; ...
  • 搭建hadoop集群设置hadoop-env.sh中JAVA堆内存参数导致以下问题 hadoop Invalid maximum heap size: -Xmx4096Mm Error: Could not create the Java Virtual Machine. Error 问题分析: 出现这个问题是因为设置堆内存...
  • 果然网上总是有大佬手把手教我们走向人生巅峰 ...     ...1.spring for hadoop 是为hadoop开发弄的框架,跟spring for web 和hadoop集成没关系 2.Hadoop各种相关jar包里的FSDataInputStream中
  • java抽象类import org.apache.hadoop.fs.FileSystem 定义了hadoop中的一个文件系统接口。 一、读取数据 1、从Hadoop URL读取数据 这个方法是通过FsURLStreamHandlerFactory实例调用java.net.URL对象的...
  • hadoop

    2018-11-07 11:59:33
    • # hadoop 概述 • 实战1:部署Hadoop高性能集群 Hadoop是什么 Hadoop是Lucene创始人Doug Cutting,根据Google的相关内容山寨出来的分布式文件系统和对海量数据进行分析计算的基础框架系统,其中包含MapReduce程序...
  • eclipse远程hadoop

    2015-06-10 02:22:48
    远程连接hadoop分布式环境 1、确保分布式环境版本与eclipse插件版本要一致(0.20.205.0),否则连接是提示: [img]http://dl2.iteye.com/upload/attachment/0109/3746/545fb9e5-e871-36b1-9e4f-26ea77aa1dd0.png...
  • 在搭建hadoop环境之前,我们要先配置java的jdk环境。 1.首先安装rz命令,rz命令可以从本地上传文件到虚拟机中 [root@node04 ~]# yum -y install lrzsz 当然,大家也可以使用别的方法将安装包传进虚拟机中 2.创建一...
  • hadoop深入研究:(二)——java访问hdfs

    万次阅读 2013-06-03 21:28:09
    转载请注明出处,http://blog.csdn.net/lastsweetop/article/details/9001467所有源码在github上,https://github.com/lastsweetop/styhadoop读数据使用hadoop url读取比较简单的读取hdfs数据的方法就是通过java.net...
  • Hadoop

    千次阅读 2020-01-08 16:32:15
    Hadoop大数据生态系统 实验教程 第1章 欢迎来到大数据的世界 1.1 拥抱大数据 当今的社会,是一个信息大爆炸的社会,社会在高速发展,科技发达,信息流通,人们之间的交流越来越密切,生活也越来越方便,大量的数据在...
  • java.lang.OutOfMemoryError: Java heap space ...at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.(MapTask.java:781) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.(MapTask.java:524) at org.a
  • 平时的开发中线程是个少不了的东西,比如tomcat里的servlet就是线程,没有线程我们如何提供多用户访问呢?不过很多刚开始接触线程的开发攻城师却在这个上面吃了...JAVA线程池管理及分布式HADOOP调度框架搭建
  • Hadoop系列文章 Hadoop部署Apache Hadoop 3.2.1 单节点部署Java安装下载安装包在服务器中解压到指定目录配置环境变量HDFS Shell命令一览测试Hadoop安装成果Apache Hadoop 3.2.1 伪分布式部署hadoop环境配置文件配置...
  • Hadoop HDFS (3) JAVA访问HDFS

    千次阅读 2014-09-28 23:34:19
    现在我们来深入了解一下Hadoop的FileSystem类。这个类是用来跟Hadoop的文件系统进行交互的。虽然我们这里主要是针对HDFS,但是我们还是应该让我们的代码只使用抽象类FileSystem,这样我们的代码就可以跟任何一个...
  • 通过nodetool status从命令行用来验证Cassandra是否正在运行。 配置文件的默认位置是/etc/cassandra。 日志和数据目录的默认位置是/var/log/cassandra/和/var/lib/cassandra。 可以配置启动选项(堆...
  • 我们想要使用Java 来操作HDFS,就要先连接到HDFS文件系统,好在Hadoop 已经有了官方的jar包可以直接使用里面的类和方法。使用下面的定义的方法要首先创建一个maven项目,导入hadoop的依赖和junit的依赖。在pom.xml...
  • Hadoop HDFS (3) JAVA訪问HDFS

    千次阅读 2019-11-28 10:07:40
    如今我们来深入了解一下Hadoop的FileSystem类。这个类是用来跟Hadoop的文件系统进行交互的。尽管我们这里主要是针对HDFS。可是我们还是应该让我们的代码仅仅使用抽象类FileSystem。这样我们的代码就能够跟不论什么一...
  • 设置set mapreduce.job.reduces=68;...Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#3 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,023
精华内容 7,209
关键字:

java调hadoop

java 订阅