精华内容
下载资源
问答
  • 小文件
    万次阅读
    2021-11-23 11:04:09

    小文件问题

    读者交流群已经开通了,有需要的可以私信进入读者交流群

    1. 小文件在HDFS中存储本身就会占用过多的内存空间,这是因为每个文件都有元数据存储在内存中,给namenode内存中fsImage的合并造成压力,如果namenode内存使用完了,这个集群将不能再存储文件了;
    2. 对于MR查询过程中过多的小文件又会造成启动过多的Mapper Task, 每个Mapper都是一个后台线程,会占用JVM的空间
    3. 即使map阶段都设置了小文件合并,org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,太多小文件导致合并时间较长,查询缓慢;

    案例

    最近发现离线任务对一个增量Hive表的查询越来越慢,这引起了我的注意,我在cmd窗口手动执行count操作查询发现,速度确实很慢,才不到五千万的数据,居然需要300s,这显然是有问题的,我推测可能是有小文件。

    我去hdfs目录查看了一下该目录:

    图片

    发现确实有很多小文件,有480个小文件,我觉得我找到了问题所在,那么合并一下小文件吧:

    insert into test select * from tabl
    更多相关内容
  • 为啥集群小文件治理那么重要,你真的懂吗?

    千次阅读 多人点赞 2021-06-01 15:43:15
    小文件是 Hadoop 的常见挑战,如果不小心处理,它们可能会导致许多并发症。Apache Hadoop 分布式文件系统 (HDFS) 旨在存储和处理 TB 和 PB 级的大型数据集。但是,HDFS 存储小文件效率低下,导致 Namenode 内存利用...

            小文件是 Hadoop 集群运维中的常见挑战,尤其对于大规模运行的集群来说可谓至关重要。如果处理不好,可能会导致许多并发症。Hadoop集群本质是为了TB,PB规模的数据存储和计算因运而生的。为啥大数据开发都说小文件的治理重要,说HDFS 存储小文件效率低下,比如增加namenode负载等,降低访问效率等?究竟本质上为什么重要?以及如何从本质上剖析小文件,治理小文件呢?今天就带你走进小文件的世界。

     1.什么是小文件?

           日常生产中HDFS上小文件产生是一个很正常的事情,有些甚至是不可避免,比如jar,xml配置文件,tmp临时文件,流式任务等都是小文件的组成部分。当然更多的是因为集群设置不合理,造成一些意料之外的小文件产生。实际公司生产中对于小文件的大小没有一个统一的定义。一般公司集群的blocksize的大小在128/256两者居多。首先小文件大小肯定是要远小于blocksize的文件。一般公司小文件的大小定义如1Mb,8Mb,甚至16Mb,32Mb更大。根据公司实际集群状态定义,因为有些情况合并小文件需要消耗额外的资源。

          既然剖析小文件,那么不可避免的要先剖析hdfs的存储原理。众多周知了,HDFS上文件的数据存储分为namenode元数据管理和实际数据文件。hdfs上的数据文件被拆分成块block,这些块block在整个集群中的datanode的本地文件系统上存储和复制,每个块也维护者自己的blockmeta信息。namenode主要维护这些文件的元数据信息,具体namenode的解析参考我的其他博客。

    如下一个某个文件的某个block在data上存储的情况。

    2.小文件的产生

    1.流式数据,如flume,kafak,sparkstreaming,storm,flink等,流式增量文件,小窗口文件,如几分钟一次等。

    2.MapReduce引擎任务:如果纯map任务,大量的map;如果mapreduce任务,大量的reduce;两者都会造成大量的文件。出现这种情况的原因很多,果分布表的过度分区,输入大量的小文件,参数设置的不合理等,输出没有文件合并等。

    3.spark任务过度并行化,Spark 分区越多,写入的文件就越多。

    4.文件的压缩与存储格式不合理;一般生产公司很少使用textfile这种低效的文件格式了。

    使用压缩,降低文件的大小,同时也会降低文件的总块数。注意文件存储格式和压缩不合理只是加剧小文件问题,不是产生小文件的本质。

    3.小文件的危害

    3.1小文件对namenod的影响

    如下图1,一个文件192Mb,默认blocksize=128Mb,副本个数为3,存储为2个block。

     如下图2,同样一个文件192Mb,默认blocksize=128Mb,副本个数为3,存储为192个block

          namenode的namespace中主要占存储对象是文件的目录个数,文件(文件名长度)以及文件block数根据namenode实际使用经验来看,一个存储对象大概占用150字节的空间。HDFS上存储文件占用的namenode内存计算公式如下:

           Memory=150bytes*(1个文件inode+(文件的块数*副本个数))

         如上图1 ,一个文件192Mb,默认blocksize=128Mb,副本个数为3,存储为2个block,需要namenode内存=150*(1+2*3)=1050  Bytes

         同理,图2 一个文件192Mb,默认blocksize=128Mb,副本个数为3,存储为192个block,需要namenode内存=150 x (192 + (192 x 3)) = 115200 Bytes

    尖叫总结:

    1 .从上面可以看出,同样的一个文件,大小不同形态的存储占用namenode的内存之比相差了109倍之多。所以如果对于单namenode的集群来说,大量的小文件的会占用大量的namenode堆内存空间,给集群的存储造成瓶颈。有些人可能会说我们联邦,多组namenode不就没有这个问题了,其实不然,且往下看

    2.当 NameNode 重新启动时(虽然生产上这种情况很少),它必须将文件系统元数据fsimage从本地磁盘加载到内存中。这意味着如果 namenode 元数据很大,重启会更慢(以我们公司3亿block,5万多个文件对象来说,重启一次1.5小时,期间应用不可用)其次,datanode 还通过网络向 NameNode 报告块更改;更多的块意味着要通过网络报告更多的变化,等待时间更长。

    3.更多的文件,更多的block,意味着更多的读取请求需要由 NameNode 提供服务,这将增加 RPC 队列和处理延迟,进而导致namenode性能和响应能力下降。官方介绍说接近 40K~50K RPCs/s 人为是极高的负载。实际使用来看比这低时对于namenode来说性能都会打很大的折扣。

    3.2 小文件对datanode影响

          文件的block存储是存储在datanode本地系统上,底层的磁盘上,甚至不同的挂载目录,不同的磁盘上。大量的小文件,意味着数据着寻址需要花费很多时间,尤其对于高负载的集群来说,磁盘使用率50%以上的集群,花费在寻址的时间比文件读取写入的时间更多。这种就违背了blocksize大小设计的初衷(实践显示最佳效果是:寻址时间仅占传输时间的1%)。这样会造成磁盘的读写会很慢,拥有大量小文件会导致更多的磁盘搜索。如下磁盘延迟:

    3.3小文件对计算的影响

            基于HDFS文件系统的计算,blokc块是最小粒度的数据处理单元。块的多少往往影响应用程序的吞吐量。更多的文件,意味着更多的块,以及更多的节点分布。

            比如以MapReduce任务为例(hive等),在 MapReduce 中,会为每个读取的块生成一个单独的 Map 任务,如果大量小文件,大量的块,意味着着更多任务调度,任务创建开销,以及更多的任务管理开销(MapReduce 作业的 application master 是一个 Java 应用,它的主类是 MRAppMaster。它通过创建一定数量的bookkeeping object跟踪作业进度来初始化作业,该对象接受任务报告的进度和完成情况)。虽然可以开启map前文件合并,但是这也需要不停地从不同节点建立连接,数据读取,网络传输,然后进行合并,同样会增加消耗资源和增加计算时间,成本也很高。

          同样,如果是spark计算引擎,executor的一次读取和处理一个分区,默认情况下,每个分区是一个 HDFS 块,如果大量的小文件,每个文件都在不同的分区中读取,这将导致大量的任务调度开销,同时每个 CPU 内核的吞吐量降低。

         简单总结一下:小文件对于计算的影响就是需要大量节点之间频繁建立联系,数据传输等,浪费资源,消耗时间长。其次小文件相关大量的任务初始化时间甚至比计算时间还长,造成计算资源的使用浪费,降低集群的吞吐量。

         后续请见小文件多维度治理....................

       大多数开发人员都弄错的Hive与MapReduce小文件合并问题

     

    展开全文
  • [Hadoop合并小文件的两种解决方案]

    万次阅读 多人点赞 2018-07-03 21:50:28
    在Hadoop的运行环境中,什么文件是小文件?在Hadoop的世界中,小文件是指文件大小远远小于HDFS块大小的文件。Hadoop2.0中,HDFS默认的块大小是128MB,所以,比如2MB,7MB或9MB的文件就认为是小文件。在Hadoop的环境中...

        在Hadoop的运行环境中,什么文件是小文件?在Hadoop的世界中,小文件是指文件大小远远小于HDFS块大小的文件。Hadoop2.0中,HDFS默认的块大小是128MB,所以,比如2MB,7MB或9MB的文件就认为是小文件。在Hadoop的环境中,块大小是可以通过参数配置的,这个参数由一个名为dfs.block.size定义。如果一个应用要处理一个超大的文件,可以通过这个参数设置更大更大得到块文件,比如256MB或512MB。

        Hadoop的应用中,Hadoop可以很好的处理大文件,不过当文件很多,并且文件很小时,Hadoop会把每一个小文件传递给map()函数,而Hadoop在调用map()函数时会创建一个映射器,这样就会创建了大量的映射器,应用的运行效率并不高。如果使用和存储小文件,通常就会创建很过的映射器。例如,如果有2000个文件,每一个文件的大小约为2-3MB,在处理这一批文件时,就需要2000个映射器,将每一个文件发送到一个映射器,效率会非常低的。所以,在Hadoop的环境环境中,要解决这个问题,就需要把多个文件合并为一个文件,然后在进行处理。如上面的例子中,可以把40-50个文件合并为衣蛾块大小的文件(接近块大小128MB),通过合并这些小文件,最后就只需要40-50个映射器,这样效率就可以有较大提升了。Hadoop主要设计批处理大量数据的大文件,不是很多小文件。解决小文件问题的主要目的就是通过合并小文件为更大的文件来加快Hadoop的程序的执行,解决小文件问题可以减少map()函数的执行次数,相应地提高hadoop作业的整体性能。

        本文中,将为小文件问题提供两个解决方法:

    1、 在客户端将小文件合并为大文件。

    2、 使用Hadoop的CombineFileInputFormat<K,V>实现小文件的合并。

    在客户端合并小文件

        将小文件提交到MapReduce/Hadoop之前,需要先把这些小文件合并到大文件中,再把合并的大文件提交给MapReduce驱动器程序。

        定义一个SmallFilesConsolidator类接受一组小文件,然后将这些小文件合并在一起,生成更大的Hadoop文件,这些文件的大小接近于HDFS块大小(dfs.block.size),最优的解决方案便是尽可能创建少的文件。

        定义一个BucketThread类,这个类把小文件合并为一个大小于或接近于HDFS块大小的大文件。BucketThread是一个实现了Runable接口的独立线程,通过提供copyMerge()方法,把小文件合并为一个大文件。由于BucketThread是一个线程,所有的BucketThread对象可以并发的合并小文件。copyMerge()是BucketThread类的核心方法,它会把一个桶中的所有小文件合并为为一个临时的HDFS文件。例如,如果一个同种包含小文件{file1,file2,file3,file4,file5},那么合并得到的文件如下图所示:


    SmallFilesConsolidator类的实现

    /**
     * 为Hadoop作业驱动程序提供通用小文件进行合并功能。
     *
     */
    public class SmallFilesConsolidator {
    
    	private static Logger logger = Logger.getLogger(SmallFilesConsolidator.class);
    
    	// 可配置的HDFS根目录
    	private static String MERGED_HDFS_ROOT_DIR = "/tmp/";
    
    	/**
    	 * 获取Buckets的数量
    	 * 
    	 * @param totalFiles:总文件数
    	 * 
    	 * @param numberOfMapSlotsAvailable:
    	 * 
    	 * @param maxFilesPerBucket:每一个Bucket的最大文件数
    	 * 
    	 */
    	public static int getNumberOfBuckets(int totalFiles, int numberOfMapSlotsAvailable, int maxFilesPerBucket) {
    		if (totalFiles <= (maxFilesPerBucket * numberOfMapSlotsAvailable)) {
    			return numberOfMapSlotsAvailable;
    		} else {
    			int numberOfBuckets = totalFiles / maxFilesPerBucket;
    			int remainder = totalFiles % maxFilesPerBucket;
    			if (remainder == 0) {
    				return numberOfBuckets;
    			} else {
    				return numberOfBuckets + 1;
    			}
    		}
    	}
    
    	/**
    	 * 为映射器创建Buckets
    	 *
    	 */
    	public static BucketThread[] createBuckets(int totalFiles, int numberOfMapSlotsAvailable, int maxFilesPerBucket) {
    		int numberOfBuckets = getNumberOfBuckets(totalFiles, numberOfMapSlotsAvailable, maxFilesPerBucket);
    		BucketThread[] buckets = new BucketThread[numberOfBuckets];
    		return buckets;
    	}
    
    	/**
    	 * 填充Bucket
    	 *
    	 * @param buckets:所有Bucket列表
    	 * 
    	 * @param smallFiles:小文件数
    	 * 
    	 * @param job:Hadoop运行的作业
    	 * 
    	 * @param maxFilesPerBucket:每一个Bucket的最大文件数
    	 */
    	public static void fillBuckets(BucketThread[] buckets, List<String> smallFiles, Job job, int maxFilesPerBucket)
    			throws Exception {
    
    		int numberOfBuckets = buckets.length;
    		// 将所有的小文件分区并填充到bucket中
    		int combinedSize = smallFiles.size();
    		int biosetsPerBucket = combinedSize / numberOfBuckets;
    		if (biosetsPerBucket < maxFilesPerBucket) {
    			int remainder = combinedSize % numberOfBuckets;
    			if (remainder != 0) {
    				biosetsPerBucket++;
    			}
    		}
    
    		String parentDir = getParentDir();
    		// 使用Bucket的序号定义Bucket的Id(范围是从0到numberOfBuckets-1)
    		int id = 0;
    		int index = 0;
    		boolean done = false;
    		while ((!done) & (id < numberOfBuckets)) {
    			// 创建一个Bucket对象
    			buckets[id] = new BucketThread(parentDir, id, job.getConfiguration());
    			// 使用小文件填充Bucket
    			for (int b = 0; b < biosetsPerBucket; b++) {
    				buckets[id].add(smallFiles.get(index));
    				index++;
    				if (index == combinedSize) {
    					done = true;
    					break;
    				}
    			}
    			id++;
    		}
    	}
    
    	/**
    	 * 对于每一个Bucket启动一个线程,并合并小文件
    	 *
    	 */
    	public static void mergeEachBucket(BucketThread[] buckets, Job job) throws Exception {
    		if (buckets == null) {
    			return;
    		}
    
    		int numberOfBuckets = buckets.length;
    		if (numberOfBuckets < 1) {
    			return;
    		}
    
    		for (int ID = 0; ID < numberOfBuckets; ID++) {
    			if (buckets[ID] != null) {
    				buckets[ID].start();
    			}
    		}
    
    		// 等待所有线程完成
    		for (int ID = 0; ID < numberOfBuckets; ID++) {
    			if (buckets[ID] != null) {
    				buckets[ID].join();
    			}
    		}
    
    		for (int ID = 0; ID < numberOfBuckets; ID++) {
    			if (buckets[ID] != null) {
    				Path biosetPath = buckets[ID].getTargetDir();
    				addInputPathWithoutCheck(job, biosetPath);
    			}
    		}
    	}
    
    	private static void addInputPathWithoutCheck(Job job, Path path) {
    		try {
    			FileInputFormat.addInputPath(job, path);
    			logger.info("added path: " + path);
    		} catch (Exception e) {
    			logger.error("could not add path: " + path, e);
    		}
    	}
    
    	private static String getParentDir() {
    		String guid = UUID.randomUUID().toString();
    		return MERGED_HDFS_ROOT_DIR + guid + "/";
    	}
    
    }

    BucketThread类的实现

    /**
     * 这个类提供了将小于块大小的文件合并为一个大于块大小的文件,这样将提交较少的map()作业,提高map的运行效率。
     *
     */
    public class BucketThread implements Runnable {
    
    	private static Logger theLogger = Logger.getLogger(BucketThread.class);
    	private static final Path NULL_PATH = new Path("/tmp/null");
    
    	private Thread runner = null;
    	private List<Path> bucket = null;
    	private Configuration conf = null;
    	private FileSystem fs = null;
    	private String parentDir = null;
    
    	private String targetDir = null;
    	private String targetFile = null;
    
    	/**
    	 * 创建一个新的Bucket线程对象
    	 *
    	 * @param parentDir:父目录
    	 * @param id:
    	 *            每一个Bucket都有一个唯一的ID
    	 *
    	 */
    	public BucketThread(String parentDir, int id, Configuration conf) throws IOException {
    		this.parentDir = parentDir;
    		// 存储目标目录
    		this.targetDir = parentDir + id;
    		// 存储目标文件
    		this.targetFile = targetDir + "/" + id;
    		this.conf = conf;
    		this.runner = new Thread(this);
    		this.fs = FileSystem.get(this.conf);
    		this.bucket = new ArrayList<Path>();
    	}
    
    	/**
    	 * 启动线程
    	 */
    	public void start() {
    		runner.start();
    	}
    
    	/**
    	 * 连接并等待其他线程
    	 */
    	public void join() throws InterruptedException {
    		runner.join();
    	}
    
    	/**
    	 * 线程的核心执行
    	 */
    	public void run() {
    		try {
    			copyMerge();
    		} catch (Exception e) {
    			theLogger.error("run(): copyMerge() failed.", e);
    		}
    	}
    
    	/**
    	 * @param path
    	 *            :添加一个文件到Bucket中
    	 */
    	public void add(String path) {
    		if (path == null) {
    			return;
    		}
    
    		Path hdfsPath = new Path(path);
    		if (pathExists(hdfsPath)) {
    			bucket.add(hdfsPath);
    		}
    	}
    
    	public List<Path> getBucket() {
    		return bucket;
    	}
    
    	public int size() {
    		return bucket.size();
    	}
    
    	public Path getTargetDir() {
    		if (size() == 0) {
    			// 没有文件的空目录
    			return NULL_PATH;
    		} else if (size() == 1) {
    			return bucket.get(0);
    		} else {
    			// bucket有两个或更多的文件,并且已经被合并
    			return new Path(targetDir);
    		}
    	}
    
    	/**
    	 * 将多个目录中的所有文件复制到一个输出文件(合并)。
    	 *
    	 * 将bucket中的所有路径合并,并返回一个新的目录(targetDir),该目录包含合并的路径。
    	 */
    	public void copyMerge() throws IOException {
    		// 如果bucket中只有一个路径/dir,则不需要合并它
    		if (size() < 2) {
    			return;
    		}
    
    		Path hdfsTargetFile = new Path(targetFile);
    		OutputStream out = fs.create(hdfsTargetFile);
    		try {
    			for (int i = 0; i < bucket.size(); i++) {
    				FileStatus contents[] = fs.listStatus(bucket.get(i));
    				for (int k = 0; k < contents.length; k++) {
    					if (!contents[k].isDir()) {
    						InputStream in = fs.open(contents[k].getPath());
    						try {
    							IOUtils.copyBytes(in, out, conf, false);
    						} finally {
    							InputOutputUtil.close(in);
    						}
    					}
    				}
    
    			}
    		} finally {
    			InputOutputUtil.close(out);
    		}
    
    	}
    
    	public String getParentDir() {
    		return parentDir;
    	}
    
    	/**
    	 * HDFS目录存在,则返回true,否则返回false
    	 */
    	public boolean pathExists(Path path) {
    		if (path == null) {
    			return false;
    		}
    
    		try {
    			return fs.exists(path);
    		} catch (Exception e) {
    			return false;
    		}
    	}
    
    	public String toString() {
    		return bucket.toString();
    	}
    
    }

    Hadoop程序的实现

    /**
     * 使用小文件合并的单词计数驱动程序
     *
     */
    public class WordCountDriverWithConsolidator extends Configured implements Tool {
    
    	private static final Logger logger = Logger.getLogger(WordCountDriverWithConsolidator.class);
    	private static int NUMBER_OF_MAP_SLOTS_AVAILABLE = 8;
    	// 每一个bucket的最大文件数
    	private static int MAX_FILES_PER_BUCKET = 5;
    
    	private String inputDir = null;
    	private String outputDir = null;
    	private Job job = null;
    
    	public WordCountDriverWithConsolidator(String inputDir, String outputDir) {
    		this.inputDir = inputDir;
    		this.outputDir = outputDir;
    	}
    
    	public Job getJob() {
    		return this.job;
    	}
    
    	/**
    	 * 启动Job
    	 */
    	public int run(String[] args) throws Exception {
    		this.job = new Job(getConf(), "WordCountDriverWithConsolidator");
    		job.setJobName("WordCountDriverWithConsolidator");
    		job.getConfiguration().setInt("word.count.ignored.length", 3);
    
    		// 将所有jar文件添加到HDFS的分布式缓存中
    		HadoopUtil.addJarsToDistributedCache(job, "/lib/");
    
    		// 获取HDFS文件系统
    		FileSystem fs = FileSystem.get(job.getConfiguration());
    		List<String> smallFiles = HadoopUtil.listDirectoryAsListOfString(inputDir, fs);
    		int size = smallFiles.size();
    		if (size <= NUMBER_OF_MAP_SLOTS_AVAILABLE) {
    			for (String file : smallFiles) {
    				logger.info("file=" + file);
    				addInputPath(fs, job, file);
    			}
    		} else {
    			// 创建文件Bucket,每一个Bucket将会添加小文件
    			BucketThread[] buckets = SmallFilesConsolidator.createBuckets(size, NUMBER_OF_MAP_SLOTS_AVAILABLE,
    					MAX_FILES_PER_BUCKET);
    			SmallFilesConsolidator.fillBuckets(buckets, smallFiles, job, MAX_FILES_PER_BUCKET);
    			SmallFilesConsolidator.mergeEachBucket(buckets, job);
    		}
    
    		// 输出路径
    		FileOutputFormat.setOutputPath(job, new Path(outputDir));
    
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		job.setMapperClass(WordCountMapper.class);
    		job.setCombinerClass(WordCountReducer.class);
    		job.setReducerClass(WordCountReducer.class);
    
    		boolean status = job.waitForCompletion(true);
    		logger.info("run(): status=" + status);
    		return status ? 0 : 1;
    	}
    
    	/**
    	 * 添加输入路径
    	 */
    	private void addInputPath(FileSystem fs, Job job, String pathAsString) {
    		try {
    			Path path = new Path(pathAsString);
    			if (HadoopUtil.pathExists(path, fs)) {
    				FileInputFormat.addInputPath(job, path);
    			} else {
    				logger.info("addInputPath(): path does not exist. ignored: " + pathAsString);
    			}
    		} catch (Exception e) {
    			logger.error("addInputPath(): could not add path: " + pathAsString, e);
    		}
    	}
    
    	/**
    	 * 提交map/reduce作业
    	 */
    	public static int submitJob(String inputDir, String outputDir) throws Exception {
    		WordCountDriverWithConsolidator driver = new WordCountDriverWithConsolidator(inputDir, outputDir);
    		int status = ToolRunner.run(driver, null);
    		logger.info("submitJob(): status=" + status);
    		return status;
    	}
    
    	/**
    	 * Wordcount的map/reduce程序的主驱动程序。调用此方法提交map/reduce作业。
    	 * 
    	 * @throws Exception:作业跟踪器通信问题时抛出异常。
    	 * 
    	 */
    	public static void main(String[] args) throws Exception {
    		// 确定有两个参数
    		if (args.length != 2) {
    			logger.warn("2 arguments. <input-dir>, <output-dir>");
    			throw new IllegalArgumentException("2 arguments. <input-dir>, <output-dir>");
    		}
    
    		logger.info("inputDir=" + args[0]);
    		logger.info("outputDir=" + args[1]);
    		long startTime = System.currentTimeMillis();
    		int returnStatus = submitJob(args[0], args[1]);
    		long elapsedTime = System.currentTimeMillis() - startTime;
    		logger.info("returnStatus=" + returnStatus);
    		logger.info("Finished in milliseconds: " + elapsedTime);
    		System.exit(returnStatus);
    	}
    }
    /**
     * WordCount Mapper
     *
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    
    	private int ignoredLength = 3;
    	private static final IntWritable one = new IntWritable(1);
    	private Text reducerKey = new Text();
    
    
    	@Override
    	protected void setup(Context context) throws IOException, InterruptedException {
    		this.ignoredLength = context.getConfiguration().getInt("word.count.ignored.length", 3);
    	}
    
    
    	@Override
    	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    		String line = value.toString().trim();
    		if ((line == null) || (line.length() < ignoredLength)) {
    			return;
    		}
    
    
    		String[] words = StringUtils.split(line);
    		if (words == null) {
    			return;
    		}
    
    
    		for (String word : words) {
    			if (word.length() < this.ignoredLength) {
    				continue;
    			}
    			if (word.matches(".*[,.;]$")) {
    				word = word.substring(0, word.length() - 1);
    			}
    			reducerKey.set(word);
    			context.write(reducerKey, one);
    		}
    	}
    
    
    }
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    		int sum = 0;
    		for (IntWritable count : values) {
    			sum += count.get();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    
    }

    用CombineFileInputFormat解决小文件问题

    使用Hadoop API(抽象类CombineFileInputFormat)来解决小文件的问题。抽象类CombineFileInputFormat的基本思想是通过使用一个定制的InputFormat允许将小文件合并到Hadoop的分片(split)或块(chunk)中。要使用抽象类CombineFileInputFormat,需要事项3个定制类。

        1、 CustomCFIF要扩展CombineFileInputFormat,创建子类来支持定制格式的输入。

        2、 PairOfStringLong是一个Writable类,会存储小文件名称及其偏移量(Long)。调用compareTo()方法:首先比较文件名,再比较便宜量。

        3、 CustomRecordReader是一个定制RecordReader。

    CustomCFIF自定义类的实现

    /**
     * 自定义文件输入格式,将较小的文件合并到控制大小为MAX_SPLIT_SIZE_128MB的文件中
     */
    public class CustomCFIF extends CombineFileInputFormat<PairOfStringLong, Text> {
    	final static long MAX_SPLIT_SIZE_128MB = 134217728; // 128 MB = 128*1024*1024
    
    
    	public CustomCFIF() {
    		super();
    		setMaxSplitSize(MAX_SPLIT_SIZE_128MB);
    	}
    
    
    	public RecordReader<PairOfStringLong, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
    			throws IOException {
    		return new CombineFileRecordReader<PairOfStringLong, Text>((CombineFileSplit) split, context,
    				CustomRecordReader.class);
    	}
    
    
    	@Override
    	protected boolean isSplitable(JobContext context, Path file) {
    		return false;
    	}
    }
    

    CustomRecordReader自定义类的实现

    /**
     * 自定义记录文件读取类
     *
     */
    public class CustomRecordReader extends RecordReader<PairOfStringLong, Text> {
    	private PairOfStringLong key;
    	private Text value;
    
    	// define pos and offsets
    	private long startOffset;
    	private long endOffset;
    	private long pos;
    
    	private FileSystem fs;
    	private Path path;
    	private FSDataInputStream fileIn;
    	private LineReader reader;
    
    	public CustomRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException {
    		path = split.getPath(index);
    		fs = path.getFileSystem(context.getConfiguration());
    		startOffset = split.getOffset(index);
    		endOffset = startOffset + split.getLength(index);
    		fileIn = fs.open(path);
    		reader = new LineReader(fileIn);
    		pos = startOffset;
    	}
    
    	@Override
    	public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
    		// This will not be called, use custom Constructor
    	}
    
    	@Override
    	public void close() throws IOException {
    	}
    
    	@Override
    	public float getProgress() throws IOException {
    		if (startOffset == endOffset) {
    			return 0;
    		}
    		return Math.min(1.0f, (pos - startOffset) / (float) (endOffset - startOffset));
    	}
    
    	@Override
    	public PairOfStringLong getCurrentKey() throws IOException, InterruptedException {
    		return key;
    	}
    
    	@Override
    	public Text getCurrentValue() throws IOException, InterruptedException {
    		return value;
    	}
    
    	@Override
    	public boolean nextKeyValue() throws IOException {
    		if (key == null) {
    			// key.filename = path.getName()
    			// key.offset = pos
    			key = new PairOfStringLong(path.getName(), pos);
    		}
    		if (value == null) {
    			value = new Text();
    		}
    		int newSize = 0;
    		if (pos < endOffset) {
    			newSize = reader.readLine(value);
    			pos += newSize;
    		}
    		if (newSize == 0) {
    			key = null;
    			value = null;
    			return false;
    		} else {
    			return true;
    		}
    	}
    }

    Hadoop程序的实现

    /**
     * 将小文件合并到大文件的单词计数驱动程序类。
     *
     */
    public class CombineSmallFilesDriver extends Configured implements Tool {
    
    	public static void main(String[] args) throws Exception {
    		long beginTime = System.currentTimeMillis();
    		System.exit(ToolRunner.run(new Configuration(), new CombineSmallFilesDriver(), args));
    		long elapsedTime = System.currentTimeMillis() - beginTime;
    		System.out.println("elapsed time(millis): " + elapsedTime);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		System.out.println("input path = " + args[0]);
    		System.out.println("output path = " + args[1]);
    
    		Configuration conf = getConf();
    		Job job = new Job(conf);
    		job.setJobName("CombineSmallFilesDriver");
    
    		// 将所有jar文件添加到HDFS的分布式缓存中
    		HadoopUtil.addJarsToDistributedCache(job, "/lib/");
    
    		// 定义文件数据格式化
    		job.setInputFormatClass(CustomCFIF.class);
    
    		// 定义Output的Key和Value类型
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 定义map和reduce的函数类
    		job.setMapperClass(WordCountMapper.class);
    		job.setReducerClass(WordCountReducer.class);
    		// job.setNumReduceTasks(13);
    
    		// 定义输入/输出路径
    		Path inputPath = new Path(args[0]);
    		Path outputPath = new Path(args[1]);
    		FileInputFormat.addInputPath(job, inputPath);
    		FileOutputFormat.setOutputPath(job, outputPath);
    
    		// 提交作业等待完成
    		job.submit();
    		job.waitForCompletion(true);
    		return 0;
    	}
    }
    
    /**
     * Wordcount Mapper
     */
    public class WordCountMapper extends Mapper<PairOfStringLong, Text, Text, IntWritable> {
    
    	final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();
    
    	public void map(PairOfStringLong key, Text value, Context context) throws IOException, InterruptedException {
    		String line = value.toString().trim();
    		String[] tokens = StringUtils.split(line, " ");
    		for (String tok : tokens) {
    			word.set(tok);
    			context.write(word, one);
    		}
    	}
    }
    
    /**
     * Wordcount Reduce
     */
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    	public void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    		int sum = 0;
    		for (IntWritable val : values) {
    			sum += val.get();
    		}
    		context.write(key, new IntWritable(sum));
    	}
    }

    总结

           在客户端合并小文件及使用CombineFileInputFormat解决小文件问题,可以快速提高Hadoop程序的效率。

     


    展开全文
  • ​我们偶尔会遇到将一个大文件分割为若干个小文件的需求。比如,FAT32格式的U盘,无法存储文件大小大于4GB的文件,需要将文件分割为小于4GB的文件才能存储。本文将介绍几个可以将大文件分割为小文件的方法。 01...

    ​我们偶尔会遇到将一个大文件分割为若干个小文件的需求。比如,FAT32格式的U盘,无法存储文件大小大于4GB的文件,需要将文件分割为小于4GB的文件才能存储。本文将介绍几个可以将大文件分割为小文件的方法。

    01

    使用TotalCommander分割、合并文件

    选中一个文件,点击文件-拆分文件菜单,打开对话框,选择单个文件的大小后,点击确定,即可分割文件。

    合并文件时,选中后缀名为001的文件,点击文件-合并文件菜单即可。

    02

    使用压缩软件分割文件

    以下步骤使用7zip压缩软件进行演示。

    在文件上点击右键,选择添加到压缩包菜单,打开对话框,设置分卷大小,点击确定即可分割文件。

    ​选中压缩文件点击鼠标右键,选择提取文件到当前位置即可恢复原文件。

    欢迎关注微信公众号,及时获取信息。

     

    展开全文
  • Spark小文件合并

    万次阅读 2019-01-29 20:09:26
    最近使用spark sql执行etl时候出现了,最终结果大小只有几百k,但是小文件一个分区有上千的情况。 危害: hdfs有最大文件数限制 浪费磁盘资源(可能存在空文件); hive中进行统计,计算的时候,会产生很多个map,影响...
  • HDFS小文件问题及解决方案

    千次阅读 2018-12-24 18:49:24
    首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G...
  • Iceberg 合并小文件并删除历史(Flink)

    万次阅读 2021-10-14 17:50:48
    Iceberg 合并小文件并删除历史(Flink) Iceberg每一次操作都会产生多个数据文件(metadata、data、snapshot),需要自行合并清理。 详细Iceberg写入时文件变化请参考官网https://iceberg.apache.org/ 建表时新增with...
  • spark写入hdfs文件小文件解决办法

    千次阅读 2019-01-26 11:18:21
    我们在使用spark写入hdfs文件时,会经常由于partition的数目问题,导致最后保存在hdfs中时产生了很多小文件,之前也用过repartition的方法,但是会出现虽然会大量减少生成文件的数目,但是会使得最后保存文件这一步...
  • HDFS的小文件合并成大文件

    千次阅读 2019-11-07 16:04:41
    在实际项目中,输入数据往往是由许多小文件组成,这里的小文件是指小于HDFS系统Block大小的文件(默认128M), 然而每一个存储在HDFS中的文件、目录和块都映射为一个对象,存储在NameNode服务器内存中,通常占用150...
  • HDFS合并小文件

    千次阅读 2019-05-10 09:00:02
    关键字:hadoop hdfs 小文件、appendToFile、getmerge 众所周知,HDFS中过多的小文件,会给NameNode造成很大的压力,所谓的小文件,是指远远小于文件块大小的文件。 在使用HDFS的过程中,应尽量避免生成过多的...
  • 测试收图demo等,会在本地保存大量图片文件,时间一长,会有多哒几十个GB的数据,占用大量磁盘空间。使用windows的右键删除耗时完全不靠谱,需要十几个小时。尝试了shift+del还是很慢。又尝试了电脑管家的文件粉碎...
  • HDFS为什么不适合存储小文件

    千次阅读 2019-03-26 21:05:00
    不能高效的对大量的小数据进行存储(大量的小文件会很快沾满nameNode的内存空间)大量的小文件,也会影响NameNode的寻址时间 1、小文件过多,会过多占用namenode的内存,并浪费block。 - 文件的元数据(包括文件被...
  • Windows10删除大量小文件速度慢

    千次阅读 2021-01-01 19:53:04
    今天在windows10上删除某数据集时发现需要几个小时(大概有10000多张图片),心态直接爆炸,最后发现 电脑管家的文件粉碎可以删除速度很快。 点击就会自动安装,安装完成后右键需要删除的文件或者文件夹,选择文件...
  • Spark SQL合并小文件的一种方法

    万次阅读 2019-05-07 17:20:19
    小文件问题原因: spark.sql.shuffle.partitions=200 sparksql默认shuffle分区是200个,如果数据量比较小时,写hdfs时会产生200个小文件。可通过如下调整,使其自适应的合并小文件(本人测试环境从原来的200个小文件...
  • linux下如何将大文件分为多个小文件

    千次阅读 2019-04-27 08:40:33
    有没有办法(命令最好)把这样的大文件拆分为多个小文件呢? 拆分文件的Linux命令 (1)将文件按照存储大小拆分 如下命令,将954M大小的文件httpd.log,按照500MB每个文件大小进行拆分。拆分后,变为xaa和xab两个...
  • Hadoop实战项目:小文件合并

    千次阅读 2018-03-30 20:40:02
    项目背景 在实际项目中,输入数据往往是由许多小文件组成,这里的小文件是指小于HDFS系统Block大小的文件(默认128M),早期的版本所定义的小文件是64M,这里的hadoop-2.2.0所定义的小文件是128M。然而每一个存储在...
  • 将大文本文件分割成多个小文件

    万次阅读 多人点赞 2017-08-23 23:00:57
    本文介绍一种将一个大的文本文件分割成多个小文件的方法思路: 1.读取文章所有的行,并存入列表中 2.定义分割成的小文本的行数 3.将原文本内容按一定行数依次写入小文件
  • spark 处理小文件问题

    千次阅读 2018-04-18 00:04:44
    coalesce与repartition 解决小文件问题repartition(numPartitions: Int)  返回numPartitions分区个数的新RDD(或DataFrame)。  可以增加或减少此RDD中的并行性级别,内部使用shuffle来重新分配数据。  如果...
  • 今天主要来说一下sparksql写入hive后小文件太多,影响查询性能的问题.在另外一篇博客里面也稍微提到了一下,但还是感觉要单独说一下,首先我们要知道hive里面文件的数量=executor-cores*num-executors*job数,所以如果...
  • 问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。问题重现:1、创建flume配置文件flume-env.sh,:flume配置文件如下(根据自身需要修改): 因为flume可以配置多种采集方式,每种...
  • 在研究图片服务器问题时,了解到现在很多大公司基本上都是用分布式文件系统来存储海量小文件,比如Facebook有haystack,淘宝有TFS,京东有JFS。最近在研究TFS,结合之前学习的linux下的inode相关知识,了解到在ext...
  • 小文件存档

    千次阅读 2020-08-17 16:07:20
    1、HDFS存储小文件弊端 每个文件均按块存储,每个块的元数据存储在NameNode的内存中,因此HDFS存储小文件会非常低效。因为大量的小文件会耗尽NameNode中的大部分内存。但注意,存储小文件所需要的磁盘容量和数据块的...
  •  为了缓解大量小文件带给namenode内存的压力,Hadoop 0.18.0引入了Hadoop Archives(HAR files),其本质就是在HDFS之上构建一个分层文件系统。通过执行hadoop archive 命令就可以创建一个HAR文件。在命令行下,用户...
  • 在做项目时,有个ETL需要处理,数据都在HIVE里面,需要对数据进行统计分析转换。开始直接用的HIVE的JDBC,效率不高。...但是随之而来的是用sparksql往hive表中插入数据时,会产生很多小文件。用hive时,可以
  • windows下如何快速删除大量小文件

    万次阅读 2018-11-14 23:07:33
    系统运行了几年,由于开来一个dump功能,而dump没有做垃圾删除,导致磁盘空间爆满,通过shift_delete居然要两个多小时 由于情况紧急,两个多小时不能等,且最好一下都删完。 经过考虑,觉得通过界面删除太慢,应该...
  • [Hadoop]大量小文件问题及解决方案

    万次阅读 2016-12-25 15:34:09
    1. HDFS上的小文件问题小文件是指文件大小明显小于HDFS上块(block)大小(默认64MB)的文件。如果存储小文件,必定会有大量这样的小文件,否则你也不会使用Hadoop(If you’re storing small files, then you ...
  • hadoop面对大量的小文件处理

    千次阅读 2018-02-08 20:29:39
    小文件在HDFS中 HDFS块大小默认是128m(hadoop2.x默认是128),若是存储了很多这种小文件每个小文件占了一个block而每个block的文件,目录在namenode里以对像(元数据)形式存储, 就会造成namenode内存占用严重,同时 ...
  • 写在前面 本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和文献引用请见1000个问题搞定...如果有100000个小文件,每
  • 在pyspark中,使用数据框的文件写出函数write.parquet经常会生成太多的小文件,例如申请了100个block,而每个block中的结果 只有几百K,这在机器学习算法的结果输出中经常出现,这是一种很大的资源浪费,那么如何...
  • HDFS小文件问题解决方案与启发实践

    千次阅读 2017-01-08 14:25:07
    前言 继上文聊聊HDFS Block...在本文的阐述过程中,我们将通过一个平时遇到的典型问题-HDFS小文件过多问题作为贯穿全文的一个核心要点。在下文中,笔者将会介绍小文件的缘由,现有解决办法,新的解决方案等等内容。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,939,347
精华内容 2,375,738
关键字:

小文件