精华内容
下载资源
问答
  • 修改JobTracker源码,把JobTracker启动时间写入文件/private/jobtracker/starttime (hdfs文件系统); 只需替换hadoop-core-1.0.4.jar文件,然后重启系统集群即可,启动后就可以在hdfs文件系统看到/private/...
  • hadoop 启动时间写入文件

    千次阅读 2013-12-20 14:31:36
    hadoop1.0.4; 昨天看了源码,说到可以使用log信息来打印出来JobTracker的启动时间,然后来解析这个Log文件,但是这里有个问题,就是...只要在JobTracker启动之后,然后再写入即可。 还是按源码来说(首先把原来修改的

    hadoop1.0.4;

    昨天看了源码,说到可以使用log信息来打印出来JobTracker的启动时间,然后来解析这个Log文件,但是这里有个问题,就是log文件是在hadoop的主节点namenode的linux机器中的,不是在客户端,读取会有一定的问题,然后就又想到了还是写文件(hdfs文件系统)的方式。只要在JobTracker启动之后,然后再写入即可。

    还是按源码来说(首先把原来修改的还原回来),看代码的302、303行:

    result = new JobTracker(conf, identifier);
            result.taskScheduler.setTaskTrackerManager(result);
    这里就是新建JobTracker的代码了,可以在这两行代码后面加上这样的代码:

    result = new JobTracker(conf, identifier);
            result.taskScheduler.setTaskTrackerManager(result);
            /**
             * add by fansy at 2013/12/20
             */
            writeString(JOBTRACKER_STARTTIME,conf);
            LOG.info("***---JOBTRACKER_STARTTIME:"+JOBTRACKER_STARTTIME);
    这样就可以写入了,同时保证不会是在safemode状态写入,这样就不会报错了。

    额,还需要修改的代码为:

    新增一个static 变量:

    private static String JOBTRACKER_STARTTIME;
    修改generateNewIdentifier代码:

     private static String generateNewIdentifier() {
    	JOBTRACKER_STARTTIME=getDateFormat().format(new Date());
        return JOBTRACKER_STARTTIME;
      }
    这样就可以了。

    编译、替换、重新启动集群,可以看到hdfs文件系统上面的新文件:


    集群启动的图片为:



    额,同时可以下载hadoop-core-1.0.4.jar 文件,在http://download.csdn.net/detail/fansy1990/6745283可以看到;

    需要读取这个文件,可以使用下面的方法:

    public static String readString(Path path, Configuration conf) throws IOException {
    		    FileSystem fs = FileSystem.get(path.toUri(), conf);
    		    FSDataInputStream in = fs.open(path);
    		    try {
    		      return in.readUTF();
    		    } finally {
    		      Closeables.closeQuietly(in);
    		    }
    		  }

    writeString 的代码:

    private static void writeString(String value,JobConf conf) {
    	  LOG.info("***********************************prepare to wirte to file with value:"+value);
    	  Path path=new Path("/private/jobtracker/starttime");
    	   FileSystem fs;
    	   FSDataOutputStream out=null;
    	    try {
    	    	fs = FileSystem.get(path.toUri(),conf);
    		    out = fs.create(path);
    		    out.writeUTF(value);
    	    } catch(Exception e){
    	    	LOG.info("********************************:"+e.getMessage());
    	    }finally {
    	      Closeables.closeQuietly(out);
    	    } 
      }
    


    如果您觉得lz的blog或者资源还ok的话,可以选择给lz投一票,多谢。(投票地址:http://vote.blog.csdn.net/blogstaritem/blogstar2013/fansy1990 )



    分享,成长,快乐

    转载请注明blog地址:http://blog.csdn.net/fansy1990




    展开全文
  • Hadoop的磁盘写入策略引发的问题

    千次阅读 2017-10-14 13:52:18
    DataNode挂载的磁盘或者DataNode节点挂载多个磁盘,如果存在一些磁盘大小一样,数据在落盘时就可能会出现磁盘使用率均匀的情况,容量较小的盘容易被写满,而容量大的盘还剩很多空间。磁盘写满后,影响Hadoop集群...

    DataNode挂载的磁盘或者DataNode节点挂载多个磁盘,如果存在一些磁盘大小不一样,数据在落盘时就可能会出现磁盘使用率不均匀的情况,容量较小的盘容易被写满,而容量大的盘还剩很多空间。磁盘写满后,影响Hadoop集群的正常工作。国庆第一天,线上集群就报出了JournalNode挂掉的异常情况,经查是由于2T的磁盘被写满,JournalNode无法再写入数据。当时采取了临时的措施,删掉HBase和Hive中不用,占大量空间的表。磁盘使用率下降一部分后,重新启动JournalNode。

    集群中每个DataNode都挂载了两个硬盘,分别为2T和4T的,2T基本都被写满,而4T的才50%多。是什么造成了这种数据落盘时的不均匀情况?本主要文调研了Hadoop的数据两种写入磁盘的策略,并分析了两种策略的主要源码实现,最后总结解决此次异常的经验。

    两种写入策略

    循环选取

    循环选取策略是在hfds1.0中实现的,hdfs2.x默认沿用hfds1.x的方式
    hdfs2.0默认沿用hfds1.0的方式,按照循环的策略,数据会均匀的落在不同大小的盘上,大磁盘和小磁盘存储的块是一样的,导致小的磁盘最先被写满。

    可用空间策略

    hdfs2.0也提供了另一种策略,将数据优先写入具有最大可用空间。通过一个概率计算出选择写入的磁盘,磁盘剩余空间大的将会获得更大的写入概率,这样磁盘的使用率就会相对均匀。

    两种方案的对比图如下,图来源于链接,能够很清楚的看出两种策略的不同。
    这里写图片描述
    hdfs3.0提供了一个在线磁盘均衡器diskbalancer ,能在不停机的情况下,对数据进行均衡操作。但是hadoop3.0仍是一个测试版本,因此不可能进行升级。

    源码分析

    循环选取策略

    循环选取的策略很简单,循环扫描整个Volumes,如果availableVolumeSize大于blockSize ,即返回该volume。为了保证每次选择的起点都不是从头开始,导致数据写满一个盘后再写另一个盘,使用了一个curVolumes定位器来防止这个问题。

    int curVolume = curVolumes[curVolumeIndex] < volumes.size()
                ? curVolumes[curVolumeIndex] : 0;
    int startVolume = curVolume;
    long maxAvailable = 0;
    while (true) {
          final V volume = volumes.get(curVolume);
          curVolume = (curVolume + 1) % volumes.size();
          long availableVolumeSize = volume.getAvailable();
          if (availableVolumeSize > blockSize) {
            curVolumes[curVolumeIndex] = curVolume;
            return volume;
          }
    
          if (availableVolumeSize > maxAvailable) {
            maxAvailable = availableVolumeSize;
          }
    
          if (curVolume == startVolume) {
            throw new DiskOutOfSpaceException("Out of space: "
                + "The volume with the most available space (=" + maxAvailable
                + " B) is less than the block size (=" + blockSize + " B).");
          }
        }

    可用空间策略

    1、通过计算最大剩余空间与最小剩余空间的差值,然后与阈值dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold进行对比,默认为10G,如果小于该值,将使用循环写入策略,如果不小于该值,则使用最大可用空间策略。

    public boolean areAllVolumesWithinFreeSpaceThreshold() {
          long leastAvailable = Long.MAX_VALUE;
          long mostAvailable = 0;
          for (AvailableSpaceVolumePair volume : volumes) {
            leastAvailable = Math.min(leastAvailable, volume.getAvailable());
            mostAvailable = Math.max(mostAvailable, volume.getAvailable());
          }
          return (mostAvailable - leastAvailable) < balancedSpaceThreshold;
        }

    2、通过与leastAvailable + balancedSpaceThreshold比较,将volume划分为两类集合。一类lowAvailableVolumes相对最小,一类highAvailableVolumes相对最大。

    public List<AvailableSpaceVolumePair> getVolumesWithHighAvailableSpace() {
          long leastAvailable = getLeastAvailableSpace();
          List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
          for (AvailableSpaceVolumePair volume : volumes) {
                //leastAvailable为所有Volume中容量最小的
            if (volume.getAvailable() > leastAvailable + balancedSpaceThreshold) {
              ret.add(volume);
            }
          }
          return ret;
        }

    3、根据dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction的大小(默认为0.75f)和lowAvailableVolumes,highAvailableVolumes的大小计算出一个两类Volumes选取的概率。这里没有直接使用0.75f,而是考虑到了两类Volume的数量的影响,如果highAvailableVolumes的数量大于lowAvailableVolumes,则计算出的Volume选取概率将大于0.75。反之则小。

            // 获得相对最大和相对最小的磁盘集合,将volume划分为两类
          List<V> highAvailableVolumes = extractVolumesFromPairs(
              volumesWithSpaces.getVolumesWithHighAvailableSpace());
          List<V> lowAvailableVolumes = extractVolumesFromPairs(
              volumesWithSpaces.getVolumesWithLowAvailableSpace());
    
          // 算出一个相对概率
          float preferencePercentScaler =
              (highAvailableVolumes.size() * balancedPreferencePercent) +
              (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
          float scaledPreferencePercent =
              (highAvailableVolumes.size() * balancedPreferencePercent) /
              preferencePercentScaler;

    4、最后随机生成一个概率与scaledPreferencePercent对比,从而决定从highAvailableVolumes,还是lowAvailableVolumes中选择Volume。这里同样使用了循环顺序选择策略。

           if (mostAvailableAmongLowVolumes < replicaSize ||
              random.nextFloat() < scaledPreferencePercent) {
            // 在high volume中循环选择一个
            volume = roundRobinPolicyHighAvailable.chooseVolume(
                highAvailableVolumes, replicaSize);
          } else {
            // 在low volume中循环选择一个
            volume = roundRobinPolicyLowAvailable.chooseVolume(
                lowAvailableVolumes, replicaSize);
          }

    汇总:程序根据设定的阈值判断使用循环顺序策略还是最大可用空间策略,如果使用最大可用空间策略,将所有的Volume分为两类,根据设置的选取概率和每一类的数量计算出每一类的选取概率,然后在选取到的集合中再使用循环顺序策略。

    解决方法

    由于循环策略造成磁盘不均的解决方法如下:
    1、数据清理:此方法属于紧急措施:清理掉hdfs中不用的数据
    2、数据压缩:手动压缩部分数据,对于HBase可使用GZ压缩方式,能快速有效的降低磁盘使用率
    3、数据移盘:手动进行数据的移动,将部分数据由写满的盘移动到其它盘中

    主要有三步操作:
    1、关闭DataNode节点
    2、使用mv命令移动数据,要绝对保证移动后的数据相对目录与移动前一致,如移动前data/1/dfs/dn/current/BP-1788246909-172.23.1.202-1412278461680/current/finalized/subdir0/subdir1/,移动后为data/5/dfs/dn/current/BP-1788246909-172.23.1.202-1412278461680/current/finalized/subdir0/subdir1/
    3、重启DataNode
    可以参考 https://wiki.apache.org/hadoop/FAQ#On_an_individual_data_node.2C_how_do_you_balance_the_blocks_on_the_disk.3F
    4、通过上述步骤后,可以选择切换到可用空间策略上。

    总结

    经过此次异常情况,我重新梳理了问题的过程,分析薄弱的环节,加强的对Hadoop磁盘的监控,增加对异常的处理手段。同时也对Hadoop的磁盘写入策略进行了调研,了解问题产生的原因,才能更好的解决问题。

    展开全文
  • 功能描述:从文件中读取数据写入mysql. 问题:为什么程序执行map100%,combine100%,reduce0%.程序没有运行到reduce中 package b508.demo; import java.io.IOException; import java.io.DataInput; import java.io....
  • Hadoop SequenceFile 是一个由二进制形式key/value的字节流组成的存储文件,SequenceFile可压缩可切分,非常适合hadoop文件存储特性,SequenceFile的写入由SequenceFile.Writer来实现, 根据压缩类型SequenceFile....

    Hadoop SequenceFile 是一个由二进制形式key/value的字节流组成的存储文件,SequenceFile可压缩可切分,非常适合hadoop文件存储特性,SequenceFile的写入由SequenceFile.Writer来实现, 根据压缩类型SequenceFile.Writer又派生出两个子类SequenceFile.BlockCompressWriter和SequenceFile.RecordCompressWriter, 压缩方式由SequenceFile类的内部枚举类CompressionType来表示,定义了三种方式

    不采用压缩:

    CompressionType.NONE

    记录级别的压缩:

    CompressionType.RECORD

    块级别的压缩:

    CompressionType.BLOCK

    使用时可以通过参数: io.seqfile.compression.type=[NONE|RECORD|BLOCK] 来指定具体的压缩方式.

    写入SequenceFile时通过创建一个SequenceFile.Writer来实现SequenceFile.Writer writer = SequenceFile.createWriter然后调用writer.append(key, value);方法进行数据写入, 根据指定的压缩方式不同,写入时SequenceFile组织内部结构也有所不同.

    SequenceFile Header在三种压缩方式都是相同的,在创建SequenceFile.Writer对象时在构造函数中依次调用

    initializeFileHeader(); writeFileHeader(); finalizeFileHeader();

    来完成文件头的写入.

    SequenceFile文件头格式如下:

    SequenceFile 内容,根据指定的压缩方式不同,组织结构也有所不同,当压缩方式指定为CompressionType.NONE,CompressionType.RECORD时,文件内容由 同步标记+RECODE 组成,当压缩方式指定为CompressionType.BLOCK时,文件内容由 同步标记+BLOCK 组成

    同步标记+RECODE:

    输出流会维护一个上次插入同步点时的记录位置(lastSyncPos,初始为0),每次append(key, value)时都会检查当前输出流pos与上次同步点之间的距离是否大于等于SYNC_INTERVAL, 如果是, 就会插入一个同步点(sync)

    CompressionType.NONE 时 记录信息不压缩

    CompressionType.RECORD 时 记录信息压缩(单条记录压缩)

    同步标记+BLOCK:

    BlockCompressWriter内部维护keyBuffer,valBuffer,每次append(key, value)时会把key和value对象序列化到keyBuffer和valBuffer, 并判断keyBuffer和valBuffer相加后的size是否大于等于compressionBlockSize, 如果是则插入一个同步点,并刷出数据流成一个block.

    每个block与block之间都会有一个同步点(sync)

    一个block内会有多条记录组成,压缩是作用在block之上的,比RECODE方式能获得更好的压缩比

    compressionBlockSize可以通过io.seqfile.compress.blocksize=size参数指定,默认值是1000000

    展开全文
  • Hadoop中HDFS写入文件的原理剖析

    千次阅读 2015-06-16 20:27:04
    要为即将到来的大数据时代最准备不是,下面的大白话简单记录了Hadoop中HDFS在存储文件时都做了哪些个事情,位将来集群问题的排查提供一些参考依据。 步入正题 创建一个新文件的过程: 第一步:客户端通过...

    要为即将到来的大数据时代最准备不是,下面的大白话简单记录了Hadoop中HDFS在存储文件时都做了哪些个事情,位将来集群问题的排查提供一些参考依据。

    步入正题

    创建一个新文件的过程:

    第一步:客户端通过DistributedFilesystem 对象中的creat()方法来创建文件,此时,RPC会 通过一个RPC链接协议来调用namenode,并在命名空间中创建一个新文件,namenode执行各种权限以及文件isexist 的检查,dfs返回一个输出流,否则抛出 IOEXCEPTION。输出流控制一个DFSoutPutstream,负责处理数据节点和名称节点之间的通信

    第二步:客户端开始通过输出流写入数据,DFSoutPutstream将客户端写入的数据分成一个个的数据包包,然后写入到dfs中的一个queue,这些queue中的数据包被dfs中的数据流管理,数据流通过一定的分发机制,将这些数据包形成副本并存放在datanode上,当前例如我们设置的dfs.replication=3,则需要将副本放在三个datanode上,这三个datanode会通过一个管线连接,数据流将包分流给管线中第一个的datanode,这个节点会存储包并且发送给管线中的第二个datanode。同样地,第二个数据节点存储包并且传给管线中第三个datanode

    (我就不画流程图了,大家肯定能想明白咯  )

    第三步:其实第三步应该归属到第二步里面,上一步中所提到的DFSoutPutstream有一个内部等待确认queue,专门用来存放datanode收到的数据包,只有管线中所有的datanode收到副本并且存储成功返回成功标识后等待确认queue才会移除所有的数据包。大家此时可能要问了,如果在复制过程中管线中的某一个datanode 发生了故障,hadoop是如何处理的呢?这就是hadoop的容错的强大之处了;

    首先、管线会关闭,等待确认队列中的所有数据包都会被添加回到数据队列,由此可以保证数据包的完整性和顺序性

    其次、当前块中取一个正常的数据节点,使其联系namenode,将故障节点告知namenode、由此下次故障节点恢复后能将里面残留的不完整的副本文件清空。

    第三、故障节点被删除,余下的数据包继续写入到剩下的节点中。namenode注意到当前的副本不足(dfs.replication=3),则会在另外一个datanode上安排创建新的副本。

    此时问题就来了(挖掘技术哪家强??  ),如果在写入期间,datanode大规模的发生故障怎么办眤??

    其实这种情况很少发生但林子大了什么鸟都有是不是,我们在部署hadoop 有一个配置选项:dfs.replication.min  一般默认是1 ,意思就是说只要有一个节点成功,则hdfs就认为本次写入时成功的,后续的它本身会自己去意识到副本数的不足而去进行复制冗余。

    最后、书接上文,客户端写入完成后就会通过DistributedFilesystem 调用close()方法,该方法有一个神奇的作用,它会将数据队列剩下的所有包包都放在等待确认queue中,并等待确认,namenode中已经记录下来了所有副本存放的datanode。

    看完理论知识和大家用简单地大白话分享下,可有意思了

    Hadoop中HDFS读取文件的原理剖析 http://www.linuxidc.com/Linux/2015-02/113639.htm

    Hadoop中HDFS读取和写入的工作原理 http://www.linuxidc.com/Linux/2015-02/112775.htm

    将本地文件拷到HDFS中 http://www.linuxidc.com/Linux/2013-05/83866.htm

    从HDFS下载文件到本地 http://www.linuxidc.com/Linux/2012-11/74214.htm

    将本地文件上传至HDFS http://www.linuxidc.com/Linux/2012-11/74213.htm

    HDFS基本文件常用命令 http://www.linuxidc.com/Linux/2013-09/89658.htm

    Hadoop中HDFS和MapReduce节点基本简介 http://www.linuxidc.com/Linux/2013-09/89653.htm

    更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

    本文永久更新链接地址http://www.linuxidc.com/Linux/2015-02/113638.htm

    展开全文
  • hadoop文件写入

    2018-02-11 15:30:00
    由上图可知;写入文件分为三个角色,分别是clientnode namenode 和datanodecliennode本质为java虚拟机.namenode 和datanode则是Hadoop数据集群存储块第一步:create实际是客户端创建Distribute...
  • Hadoop向HDFS写入、删除、读取文件

    千次阅读 2015-05-10 22:59:07
    先说说遇到的一些问题: 1、比如Configuration 这个如果点Eclipse的红色小叉,。...3、byte[]数组开得比较大然后写入的话,会存在问题是写入很多0,我的方法比较笨,对string判断,为0的之后的就写。。
  • hadoop SequenceFile 写入例程注释

    千次阅读 2011-08-18 21:24:21
    import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; im
  • Hadoop】HDFS文件写入流程详解

    千次阅读 2018-08-06 09:48:45
    HDFS文件写入流程详解 HDFS文件写入流程   题目:Client 端上传文件的时候下列哪项正确? A. 数据经过 NameNode 传递给 DataNode B. Client 端将文件切分为 Block,依次上传 C. Client 只上传数据到...
  • 浅析hadoop写入数据api

    2014-01-07 15:35:06
    hadoop中的读api很简单用FSDataInputStream类就可以满足一般要求,而hadoop中的写操作却是和普通java操作一样。 hadoop对于写操作提供了一个类:FSDataOutputStream,这个类重载了很多write方法,用于...
  • luncen写入hadoop实例

    2016-07-07 09:56:13
    采用jar luncen 5.4 版本 hadoop 2.6 windows 版本 下载前请自行安装hadoop 2.6 windows 版本网上有教程
  • Hadoop 创建文件并写入数据

    千次阅读 2015-04-05 19:09:25
    import java.io.IOException;...import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; pu
  • Hadoop入门——初识Hadoop

    万次阅读 多人点赞 2018-07-18 13:55:59
    Hadoop被公认是一套行业大数据标准开源软件,在分布式环境下提供了海量数据的处理能力。几乎所有主流厂商都围绕Hadoop开发工具、开源软件、商业化工具和技术服务。今年大型IT公司,如EMC、Microsoft、Intel、...
  • show creat table 表名 找到建表的位置 location ...hadoop fs -text viewfs://cluster11/user/mbadp/hive/warehouse/t_monitor_user_profile/source=news/dt=20180628/* > ~/data/mbadp/profile_news.txt * 通配...
  • 1,在介绍hadoop写文件的时候我们经常会说首先分割文件为多个块;那么是怎么分割的呢?这里其实不要有过的纠结,这里的块是block,是hdfs中切块的大小,属于物理划分,默认64M,在hadoop-default.xml配置中有体现:&...
  • hadoop API 写入HDFS简单注释

    千次阅读 2011-08-18 11:14:42
    import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream;...import org.apache.hadoop.conf.Configuration;...//通过管道进行写入  } }
  • Hadoop文件写入(详细剖析)

    千次阅读 2015-09-03 21:01:20
    文件是如何写入 HDFS 的。尽管比较详细,但对于理解数据流还是很有用的,因为它清楚地说明了 HDFS 的一致模型。要考虑的情况是如何创建一个新文件,并把数据写入该文件,最后关闭该文件。参见上图客户端通过对 ...
  • 问题: 普通用户echo写入文件,提示权限不够。 解决方式: sudo tee test.txt <<< "要插入内容" 转载于:https://www.cnblogs.com/iCheny/p/9766910.html
  • hadoop中的读api很简单用FSDataInputStream类就可以满足一般要求,而hadoop中的写操作却是和普通java操作一样。 Hadoop对于写操作提供了一个类:FSDataOutputStream,这个类重载了很多write方法,用于写入很多...
  • Hadoop-写入数据的几种方式

    千次阅读 2016-03-24 15:36:58
    create(新建) import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream;...import org.apache.hadoop.conf.Configurati
  • Hadoop

    万次阅读 多人点赞 2019-09-16 22:44:08
    Hadoop简介 Hadoop 的思想之源:Google 第一遇到大数据计算问题的公司 Openstack :NASA 面对的数据和计算难题 - 大量的网页怎么存储 - 搜索算法 带给我们的关键技术和思想(Google三大理论) - GFS 文件存储 - Map-...
  • hadoop原理学习——hdfs写入数据

    千次阅读 2018-04-11 17:19:41
    Hadoop 解决的是哪些问题?简单来讲,大型企业和政府都可能会包含有大量数据, (我们可以看做是一块巨大的豆腐)例如马路卡口监控视频拍摄的机动车号牌,我们如果要对如此海量的数据进行复杂的分析,还要非常快速的...
  • 请管理员删除 有敏感信息。请管理员删除 有敏感信息。
  • kettle使用Hadoop写入数据 ,第二次会把第一次写入的数据覆盖掉怎么解决,请各位大神指教谢谢
  • hadoop小程序---写入hdfs

    千次阅读 2014-02-07 17:54:46
    在eclipse中引入hadoop的核心包,我用的是。 写一个测试类 import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs....
  • 本节和大家继续学习一下有关Hadoop集群性能优化中机架感知配置方面的内容,主要包括写入数据和读出数据两大部分,希望通过本节的介绍大家对Hadoop集群中机架感知配置有一定的认识。 Hadoop集群功能测试 以下是分别...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 95,496
精华内容 38,198
关键字:

hadoop不能写入