精华内容
下载资源
问答
  • HADOOP与HDFS数据压缩格式

    千次阅读 2018-10-17 18:20:11
    HADOOP与HDFS数据压缩格式 1、cloudera 数据压缩的一般准则 一般准则 是否压缩数据以及使用何种压缩格式对性能具有重要的影响。在数据压缩上,需要考虑的最重要的两个方面是 MapReduce 作业和存储在 HBase 中的...

    HADOOP与HDFS数据压缩格式

    1、cloudera 数据压缩的一般准则

    一般准则

    • 是否压缩数据以及使用何种压缩格式对性能具有重要的影响。在数据压缩上,需要考虑的最重要的两个方面是 MapReduce 作业和存储在 HBase 中的数据。在大多数情况下,每个的原则都类似。
    • 您需要平衡压缩和解压缩数据所需的能力、读写数据所需的磁盘 IO,以及在网络中发送数据所需的网络带宽。正确平衡这些因素有赖于集群和数据的特征,以及您的
    • 使用模式。
    • 如果数据已压缩(例如 JPEG 格式的图像),则不建议进行压缩。事实上,结果文件实际上可能大于原文件。
    • GZIP 压缩使用的 CPU 资源比 Snappy 或 LZO 更多,但可提供更高的压缩比。GZIP 通常是不常访问的冷数据的不错选择。而 Snappy 或 LZO 则更加适合经常访问的热数据。
    • BZip2 还可以为某些文件类型生成比 GZip 更多的压缩,但是压缩和解压缩时会在一定程度上影响速度。HBase 不支持 BZip2 压缩。
    • Snappy 的表现通常比 LZO 好。应该运行测试以查看您是否检测到明显区别。
    • 对于 MapReduce,如果您需要已压缩数据可拆分,BZip2、LZO 和 Snappy 格式都可拆分,但是 GZip 不可以。可拆分性与 HBase 数据无关。
    • 对于 MapReduce,您可压缩中间数据、输出或二者。相应地调整您为 MapReduce 作业提供的参数。以下示例压缩中间数据和输出。MR2 先显示,然后显示 MR1。
    MR2
    hadoop jar hadoop-examples-.jar sort "-Dmapreduce.compress.map.output=true"
          "-Dmapreduce.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
          "-Dmapreduce.output.compress=true"
          "-Dmapreduce.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" -outKey
          org.apache.hadoop.io.Text -outValue org.apache.hadoop.io.Text input output
    MR1
    hadoop jar hadoop-examples-.jar sort "-Dmapred.compress.map.output=true"
          "-Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec"
          "-Dmapred.output.compress=true"
          "-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" -outKey
          org.apache.hadoop.io.Text -outValue org.apache.hadoop.io.Text input output
    

    2、Hadoop 压缩实现分析

    压缩简介

    Hadoop 作为一个较通用的海量数据处理平台,每次运算都会需要处理大量数据,我们会在 Hadoop 系统中对数据进行压缩处理来优化磁盘使用率,提高数据在磁盘和网络中的传输速度,从而提高系统处理数据的效率。在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。综合所述,使用压缩的优点如下:

    1. 节省数据占用的磁盘空间;
    2. 加快数据在磁盘和网络中的传输速度,从而提高系统的处理速度。

    压缩格式

    1. Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。
    2. Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。
    3. Hadoop 对每个压缩格式的支持, 详细见下表:
    压缩格式工具算法扩展名多文件可分割性
    DEFLATEDEFLATE.deflate
    GZIPgzipDEFLATE.gzp
    ZIPzipDEFLATE.zip是,在文件范围内
    BZIP2bzip2BZIP2.bz2
    LZOlzopLZO.lzo
    1. 如果压缩的文件没有扩展名,则需要在执行 MapReduce 任务的时候指定输入格式。
    hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/
      hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper /
      usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/
      reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 -
      jobconf mapred.reduce.tasks=1*-inputformatorg.apache.hadoop.mapred.LzoTextInputFormat*
    

    性能对比

    1. Hadoop 下各种压缩算法的压缩比,压缩时间,解压时间见下表:
    压缩算法原始文件大小压缩文件大小压缩速度解压速度
    gzip8.3GB1.8GB17.5MB/s58MB/s
    bzip28.3GB1.1GB2.4MB/s9.5MB/s
    LZO-bset8.3GB2GB4MB/s60.6MB/s
    LZO8.3GB2.9GB49.3MB/s74.6MB/s

    因此我们可以得出:

    1. Bzip2 压缩效果明显是最好的,但是 bzip2 压缩速度慢,可分割。
    2. Gzip 压缩效果不如 Bzip2,但是压缩解压速度快,不支持分割。
    3. LZO 压缩效果不如 Bzip2 和 Gzip,但是压缩解压速度最快!并且支持分割!

    这里提一下,文件的可分割性在 Hadoop 中是很非常重要的,它会影响到在执行作业时 Map 启动的个数,从而会影响到作业的执行效率!

    所有的压缩算法都显示出一种时间空间的权衡,更快的压缩和解压速度通常会耗费更多的空间。在选择使用哪种压缩格式时,我们应该根据自身的业务需求来选择。

    3、4种常用压缩格式在Hadoop中的应用

    目前在Hadoop中用得比较多的有lzo,gzip,snappy,bzip2这4种压缩格式,笔者根据实践经验介绍一下这4种压缩格式的优缺点和应用场景,以便大家在实践中根据实际情况选择不同的压缩格式。

    1.gzip压缩

    • 优点:
      1. 压缩率比较高,而且压缩/解压速度也比较快;
      2. hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;
      3. 有hadoop native库;
      4. 大部分linux系统都自带gzip命令,使用方便。
    • 缺点:不支持split。
    • 应用场景:
      1. 当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用gzip压缩格式。譬如说一天或者一个小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发。
      2. hive程序,streaming程序,和java写的mapreduce程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。

    2.lzo压缩

    • 优点:
      1. 压缩/解压速度也比较快,合理的压缩率;
      2. 支持split,是hadoop中最流行的压缩格式;
      3. 支持hadoop native库;
      4. 可以在linux系统下安装lzop命令,使用方便。
    • 缺点:
      1. 压缩率比gzip要低一些;
      2. hadoop本身不支持,需要安装;
      3. 在应用中对lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
    • 应用场景:
      一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点越明显。

    3.snappy压缩

    • 优点:
      1. 高速压缩速度和合理的压缩率;
      2. 支持hadoop native库。
    • 缺点:
      1. 不支持split;
      2. 压缩率比gzip要低;
      3. hadoop本身不支持,需要安装;
      4. linux系统下没有对应的命令。
    • 应用场景:
      1. 当mapreduce作业的map输出的数据比较大的时候,作为map到reduce的中间数据的压缩格式;
      2. 或者作为一个mapreduce作业的输出和另外一个mapreduce作业的输入。

    4.bzip2压缩

    • 优点:
      1. 支持split;
      2. 具有很高的压缩率,比gzip压缩率都高;
      3. hadoop本身支持,但不支持native;
      4. 在linux系统下自带bzip2命令,使用方便。
    • 缺点:
      1. 压缩/解压速度慢;
      2. 不支持native。
    • 应用场景:
      1. 适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式;
      2. 或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;
      3. 或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。

    5.4种压缩格式的特征的比较

    压缩格式splitnative压缩率速度是否hadoop自带linux命令换成压缩格式后,原来的应用程序是否要修改
    gzip很高比较快是,直接使用和文本处理一样,不需要修改
    lzo比较高很快否,需要安装需要建索引,还需要指定输入格式
    snappy比较高很快否,需要安装没有和文本处理一样,不需要修改
    bzip2最高是,直接使用和文本处理一样,不需要修改

    目前CDH集群一般都可选安装 Hadoop_Lzo,ucloud集群目前是集成了lzo的



    链接:https://www.jianshu.com/p/b50bc3f8819c
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    展开全文
  • 工具文档 ...核心方法调用 将textfile文件类型 压缩成 gz文件类型 hadoop jar /data/soft/HsunTzu/HsunTzuPro-beat-2.0.jar "${COMPRESS_HDFS_PATH}" "/" "1" "/data/soft/HsunTz...

    工具文档

    https://github.com/mullerhai/HsunTzu/blob/master/README.md

    核心方法调用

    将textfile文件类型 压缩成 gz文件类型

    hadoop jar /data/soft/HsunTzu/HsunTzuPro-beat-2.0.jar "${COMPRESS_HDFS_PATH}" "/" "1" "/data/soft/HsunTzu/info.properties" "2" "0"
    

    使用效果

    压缩前

    在这里插入图片描述

    压缩中

    在这里插入图片描述

    压缩后

    在这里插入图片描述

    参照Github Very Fast Hdfs Origin File To Compress Decompress Untar Tarball

    展开全文
  • 我们的项目需求是spark任务处理完的数据发送至kafka中然后使用flume将数据写入hdfs中并且要求写入的文件格式是压缩格式。 flume是有自定义Sink这一说的,所以我们这里需要用到flume指定Sink这一技术,这里我们就...

    我们的项目需求是spark任务处理完的数据发送至kafka中然后使用flume将数据根据表名写入hdfs中并且要求写入的文件格式是压缩格式。

    flume是有自定义Sink这一说的,所以我们这里需要用到flume指定Sink这一技术,这里我们就需要用到的flume是1.7版本的

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>

    这个是我的maven依赖

    以下是我的flume properties.properties配置

    server.sources = kafka_test
    server.channels = channel_test
    server.sinks = hdfs_test

    server.sources.kafka_test.type = org.apache.flume.source.kafka.KafkaSource
    server.sources.kafka_test.kafka.topics = flume_test
    server.sources.kafka_test.montime = 
    server.sources.kafka_test.nodatatime = 0
    server.sources.kafka_test.kafka.topics.regex = 
    server.sources.kafka_test.kafka.consumer.group.id = test1
    server.sources.kafka_test.kafka.bootstrap.servers = kafkaip:端口
    server.sources.kafka_test.kafka.security.protocol  = PLAINTEXT
    server.sources.kafka_test.batchDurationMillis = 1000
    server.sources.kafka_test.batchSize = 1000
    server.sources.kafka_test.channels = channel_test

    server.channels.channel_test.type = memory
    server.channels.channel_test.capacity = 10000
    server.channels.channel_test.transactionCapacity = 10000
    server.channels.channel_test.channelfullcount = 10
    server.channels.channel_test.keep-alive = 3
    server.channels.channel_test.byteCapacity = 
    server.channels.channel_test.byteCapacityBufferPercentage = 20

    server.sinks.hdfs_test.type = com.kdriving.dataprocessor.flume.MySink
    server.sinks.hdfs_test.channel = channel_test 

    为什么要自定义呢?

    因为kafka的一个topic中包含了多样的数据并不只是一种,而且这些数据我们是根据不同的表存储到hdfs不同的目录当中方便以后hive建表的时候关联。

    接下来看一下代码:

    package com.kdriving.dataprocessor.flume;
    
    import java.io.*;
    import java.net.URI;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.base.Throwables;
    import com.kd.common.application.AppInfo;
    import com.kd.common.log.Logger;
    import com.kd.common.log.LoggerFactory;
    
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.instrumentation.SinkCounter;
    import org.apache.flume.sink.AbstractSink;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.kafka.clients.producer.Producer;
    
    
    public class MySink extends AbstractSink implements Configurable {
        private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);
        private static String nameservices = AppInfo.instance.getConf().getProperty("datacenter.base.hdfs.nameservices");
        private static String namenodes = AppInfo.instance.getConf().getProperty("datacenter.base.hdfs.namenodes");
        private static String namenodesAddr = AppInfo.instance.getConf().getProperty("datacenter.base.hdfs.namenodesAddr");
        private String hdfsURI;
    
        private static final long defaultRollInterval = 30;
        private String username;
    
        private String dataDir;
    
        private String dateFormat;
    
        private URI uri;
    
    
        private static Configuration conf = new Configuration();
    
        private FSDataOutputStream out = null;
        private Properties parameters;
        private Producer<String, String> producer;
        // private Context context;
        private int batchSize = 1000;// 一次事务的event数量,整体提交
     
        private SinkCounter sinkCounter;
    
        //初始化hdfs
        static {
            String[] namenodesArray = namenodes.split(",");
            String[] namenodesAddrArray = namenodesAddr.split(",");
            conf.set("fs.defaultFS", "hdfs://" + nameservices);
    
            conf.set("dfs.nameservices", nameservices);
    
            conf.set("dfs.ha.namenodes." + nameservices, namenodesArray[0] + "," + namenodesArray[1]);
    
            conf.set("dfs.namenode.rpc-address.hacluster." + namenodesArray[0], namenodesAddrArray[0]);
    
            conf.set("dfs.namenode.rpc-address.hacluster." + namenodesArray[1], namenodesAddrArray[1]);
    
            conf.set("dfs.client.failover.proxy.provider." + nameservices, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    
            conf.setBoolean("dfs.support.append", true);
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
            conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
        }
    
        //数据处理的逻辑都在process方法中实现
        @Override
        public Status process() throws EventDeliveryException {
    
            LOGGER.info("开始执行mysink");
            // TODO Auto-generated method stub
            Status result = Status.READY;
    
            Channel channel = getChannel();
            Transaction transaction = null;
            Event event = null;
            String[] tablearrays = new String[]{"RADAR_RADAR_VALID", "RADAR_COMPREHENSIVE_VALID"};
    
            try {
                //要选择hdfs的压缩格式,可以从hdfs core-site.xml的配置项io.compression.codecs中查看hdfs支持的压缩格式
                String CodecClass = "org.apache.hadoop.io.compress.BZip2Codec";
                Class<?> codecClass = Class.forName(CodecClass);
                CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
                long processedEvent = 0;
                transaction = channel.getTransaction();
                transaction.begin();// 事务开始
                List<String> radarList = new ArrayList<>();
                List<String> COMPREHENSIVEList = new ArrayList<>();
                Boolean b = true;
                Map<String, List<String>> tableMap = new HashMap<>();
                while (b) {
                    event = channel.take();// 从channel取出一个事件
                    if (event == null) {
                        //LOGGER.info("event为空");
                        result = Status.BACKOFF;
                        break;
                    } else {
                        byte[] eventBody = event.getBody();
                        String eventData = new String(eventBody, "UTF-8");
                        //  LOGGER.info("event有数据-》" + eventData);
    
                    }
                    sinkCounter.incrementEventDrainAttemptCount();
                    // Map<String, String> headers = event.getHeaders();
    
                    byte[] eventBody = event.getBody();
                    String eventData = new String(eventBody, "UTF-8");
                    JSONObject jsonObject = JSONObject.parseObject(eventData);
                    String table = jsonObject.getString("TABLE");
    
                    if (table.equals(tablearrays[0])) {
                        radarList.add(eventData);
                    } else if (table.equals(tablearrays[1])) {
                        COMPREHENSIVEList.add(eventData);
                    }
                    if (radarList.size() > 1000) {
                        b = false;
                    } else if (COMPREHENSIVEList.size() > 1000) {
                        b = false;
                    }
                    processedEvent++;
                }
                tableMap.put(tablearrays[0], radarList);
                tableMap.put(tablearrays[1], COMPREHENSIVEList);
                LOGGER.info("messageList大小" + radarList.size());
                if (processedEvent == 0) {
                    LOGGER.info("processedEvent=0");
                    sinkCounter.incrementBatchEmptyCount();
                    result = Status.BACKOFF;
                } else {
                    if (processedEvent < batchSize) {
                        sinkCounter.incrementBatchUnderflowCount();
                    } else {
                        sinkCounter.incrementBatchCompleteCount();
                    }
                    sinkCounter.addToEventDrainAttemptCount(processedEvent);
    
    
                    for (Map.Entry<String, List<String>> entry : tableMap.entrySet()) {
    
                        FSDataOutputStream outputStream = creatoutputStream(entry.getKey());
    
                        CompressionOutputStream cout = codec.createOutputStream(outputStream);
                        for (String body : entry.getValue()) {
                            String clo = "id|row|RSPID|LONGITUDE|CreateTime|TimeMin1|flightId|GroudSpeed|TRACKID|MsgType|REGID|TimeStamp|DataSource|fusion|DepAP|CallSign|TimeMin5|VerticalRate|SOURCE|ArrAP|DataType|Height|Vector|LATITUDE|Altitude|";
                            String[] split = clo.split("\\|");
                            String data = new String(body.getBytes("UTF-8"));
                            JSONObject jsonObject = JSONObject.parseObject(data);
                            jsonObject.put("row", jsonObject.getString("id"));
                            StringBuilder stringBuilder = new StringBuilder();
                            for (String s : split) {
                                String string = jsonObject.getString(s);
                                if (string == null || string.equals("")) {
                                    string = "null";
                                }
                                stringBuilder.append(string).append("|");
                            }
                            LOGGER.info("HDFS正在写入新数据->" + stringBuilder);
                            String string = stringBuilder.toString();
    
                            cout.write(string.getBytes("UTF-8"));
                            cout.write("\r\n".getBytes("UTF-8"));
                            cout.flush();
                        }
                        cout.close();
                        outputStream.close();
                    }
    
    
                }
                transaction.commit();// batchSize个事件处理完成,一次事务提交
                sinkCounter.addToEventDrainSuccessCount(processedEvent);
                result = Status.READY;
            } catch (Exception e) {
                String errorMsg = "Failed to publish events !";
                LOGGER.error(errorMsg, e);
                e.printStackTrace();
                result = Status.BACKOFF;
                if (transaction != null) {
                    try {
                        transaction.rollback();
                        LOGGER.debug("transaction rollback success !");
                    } catch (Exception ex) {
                        LOGGER.error(errorMsg, ex);
                        throw Throwables.propagate(ex);
                    }
                }
                // throw new EventDeliveryException(errorMsg, e);
            } finally {
                if (transaction != null) {
                    transaction.close();
                }
            }
            return result;
    
    
        }
    
    
        public FSDataOutputStream creatoutputStream(String table) throws Exception {
            String radar_radar_validFile = "";
    
            String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
            Calendar calendar = Calendar.getInstance();
            int year = calendar.get(Calendar.YEAR);
            int moth = calendar.get(Calendar.MONTH) + 1;
            int day = calendar.get(Calendar.DATE);
            String yearPath = "/data/" + table + "/" + year;
            String monthPath = "/data/" + table + "/" + year + "/" + moth;
            String dayPath = "/data/" + table + "/" + year + "/" + moth + "/" + day;
            Path yearfilePath = new Path(yearPath);
            Path monthfilePath = new Path(monthPath);
            Path dayfilePath = new Path(dayPath);
            radar_radar_validFile = "/data/" + table + "/" + year + "/" + moth + "/" + day + "/" + date + "-" + table + ".bz2";
    
            Path filePath = new Path(radar_radar_validFile);
            FileSystem hdfs = filePath.getFileSystem(conf);
            if (!hdfs.exists(yearfilePath)) {
                hdfs.mkdirs(yearfilePath);
                LOGGER.info(yearfilePath.toString() + "路径不存在已创建成功");
            }
            if (!hdfs.exists(monthfilePath)) {
                hdfs.mkdirs(monthfilePath);
                LOGGER.info(monthfilePath.toString() + "路径不存在已创建成功");
            }
            if (!hdfs.exists(dayfilePath)) {
                hdfs.mkdirs(dayfilePath);
                LOGGER.info(dayfilePath.toString() + "路径不存在已创建成功");
            }
            if (!hdfs.exists(filePath)) {
                hdfs.createNewFile(filePath);
                LOGGER.info(filePath.toString() + "文件不存在已创建成功");
            }
    
    
            return hdfs.append(new Path(radar_radar_validFile));
    
    
        }
    
        //该方法用于读取Flume中Sink的配置,在Sink初始化时调用
        @Override
        public void configure(Context context) {
    
            if (sinkCounter == null) {
                sinkCounter = new SinkCounter(getName());
            }
    
            // customelog.sinks.sink1.type=death.flume.FlumeSinkDemo
            // customelog.sinks.sink1.channel=channel1
            // customelog.sinks.sink1.hdfsURI=hdfs://hostname:port
            // customelog.sinks.sink1.username=hdfs
            // customelog.sinks.sink1.dataDir=/death/data_sampling
            // customelog.sinks.sink1.dateFormat=YYYY-MM-dd
        }
    
        //该方法用于Sink启动时调用
        @Override
        public synchronized void start() {
    
            sinkCounter.start();
            sinkCounter.incrementConnectionCreatedCount();
            super.start();
    
    //        try {
    //            uri = new URI(hdfsURI);
    //            conf = new Configuration();
    //        }catch (Exception e){
    //            e.printStackTrace();
    //        }
        }
    
        //该方法用于Sink停止使用调用
        @Override
        public synchronized void stop() {
            sinkCounter.stop();
            sinkCounter.incrementConnectionClosedCount();
            super.stop();
        }
    }
    

     

    将项目jar包存放至flume的lib目录下面!!!然后执行properties.properties配置文件,我们使用的是FusionInsight Manager来启动fume任务,

    选择我们要执行flume任务的节点

    然后上传properties.properties文件,然后点击保存配置

     

    然后通过hue检查文件是否存在hdfs上面

    文件成功存储到该路径下

    然后再使用hive外部表关联至目录下

    CREATE EXTERNAL TABLE IF NOT EXISTS `RADAR_RADARAIRLINE_VALID`( `id` STRING ,
    `row` STRING ,
    `RSPID` STRING ,
    `LONGITUDE` STRING ,
    `CreateTime` STRING ,
    `TimeMin1` STRING ,
    `flightId` STRING ,
    `GroudSpeed` STRING ,
    `TRACKID` STRING ,
    `MsgType` STRING ,
    `REGID` STRING ,
    `TimeStamp` STRING ,
    `DataSource` STRING ,
    `fusion` STRING ,
    `DepAP` STRING ,
    `CallSign` STRING ,
    `TimeMin5` STRING ,
    `VerticalRate` STRING ,
    `SOURCE` STRING ,
    `ArrAP` STRING ,
    `DataType` STRING ,
    `Height` STRING ,
    `Vector` STRING ,
    `LATITUDE` STRING ,
    `Altitude` STRING  
    
    )
    PARTITIONED BY (`dt` string)
                        ROW FORMAT DELIMITED 
                        FIELDS TERMINATED BY '|'
                       STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
     OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
                         LOCATION 'hdfs://namenodeip:hdfs端口/data/radar_radar_valid/2020/07/23'

    关联成功后查询数据

    查询成功!!!

    当然这个中间有很多波折,,其实所谓的经验就是在一点点的学习中慢慢积累起来的!!!借此博客记录一下这次经典的flume结合hdfs,hive的使用

    展开全文
  • HDFS的文件压缩格式

    千次阅读 2018-08-01 12:43:59
    Hadoop默认支持Gzip和BZip2的解压缩方式,可直接读取(hadoop fs -text命令),但hive只能用TEXTFILE格式的表加载,然后再insertoverwrite 到其他格式的表(比如SEQUENCEFILE表),如果hive其他格式的表想要直接加载...

    Hadoop默认支持Gzip和BZip2的解压缩方式,可直接读取(hadoop fs -text命令),但hive只能用TEXTFILE格式的表加载,然后再insertoverwrite 到其他格式的表(比如SEQUENCEFILE表),如果hive其他格式的表想要直接加载压缩格式数据,需要重写INPUTFORMAT和OUTPUTFORMAT文件类。

    BZip2和LZO(提供block级的压缩)支持文件切分,Gzip和Snappy则不支持。 不支持则hadoop不能并行的进行map操作。

    hadoop中支持的压缩格式
    DEFLATEorg.apache.hadoop.io.compress.DefaultCodec
    gzip org.apache.hadoop.io.compress.GzipCodec
    bzip org.apache.hadoop.io.compress.BZip2Codec
    Snappy org.apache.hadoop.io.compress.SnappyCodec
    LZO:
    org.apache.hadoop.io.compress.LzopCodec或者com.hadoop.compression.lzo.LzopCodec;
    org.apache.hadoop.io.compress.LzoCodec或者com.hadoop.compression.lzo.LzoCodec;

    可以查看目前hive已加载的所以编解码器
    set io.compression.codecs;

    中间结果压缩
    set hive.exec.compress.intermediate=true;
    set hive.intermediate.compression.codec=org.apache.Hadoop.io.compress.LzoCodec;
    map结果压缩最好使用snappy的,因为压缩的前提是map输出非常大,影响io,如果中间结果数据集比较小反而会拖慢速度。

    设置map输出结果压缩
    Set mapred.map.output.compression.codec=org.apache.Hadoop.io.compress.SnappyCodec;

    最终输出结果压缩
    hive.exec.compress.output和mapred.output.compression.codec是hive-site.xml中的配置参数,
    而mapred.output.compress 和mapred.output.compression.codec 是hdfs-site.xml的配置参数。
    都可以配置实现。

    Hive格式各种格式下不同压缩算法的比较

    Hive 压缩全解读(hive表存储格式以及外部表直接加载压缩格式数据);HADOOP存储数据压缩方案对比(LZO,gz,ORC)

    Hive各种文件格式与压缩方式的结合测试

    Hive支持的文件格式与压缩算法

    展开全文
  • HDFS的数据压缩格式

    2019-09-18 16:46:13
    目前在Hadoop中用得比较多的有lzo,gzip,snappy,bzip2这4种压缩格式,笔者根据实践经验介绍一下这4种压缩格式的优缺点和应用场景,以便大家在实践中根据实际情况选择不同的压缩格式。 1.gzip压缩 优点: 压缩...
  • ➢块级压缩: 这里的块不同于HDFS中的块的概念.这种方式会将达到指定块大小的二进制数据压缩为一个块。 (2) Avro 将数据定义和数据- -起存储在一条消息中, 其中数据定义以JSON格式存储,数据以二 进制格式存储。Avro...
  • 使用Flume消费kafka中数据,sink到hdfs中出现数据压缩格式支持错误; 报错如下: error during configuration java.lang.IllegalArgumentException: Unsupported compression codec Lzop. Please choose from: [None...
  • HDFS文件压缩

    千次阅读 2018-01-26 13:52:53
    减少储存文件所需空间,还可以降低其在网络上传输的时间。 hadoop下各种压缩算法的压缩压缩算法 原始文件大小 压缩后的文件大小 压缩速度 解压速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s
  • 我们知道,gzip默认可以分片,而lzo默认不可以分片,但可以通过创建索引的方式来支持分片,所以,我们创建该文件的索引 [hadoop@hadoop000 hadoop$ hadoop jar \ share/hadoop/mapreduce/lib/hadoop-lzo-0.4.21-...
  • hdfs文件压缩

    千次阅读 2016-12-10 00:42:39
    1.1、压缩格式总结   压缩格式 工具 算法 文件扩展名 是否可切分 DEFLATE 无 DEFLATE .deflate 否 Gzip gzip DEFLATE .gz 否 bzip2 bzip...
  • 问题描述:采用flume 上传 到HDFS 通过原生的sink 一直会报如下警告错误:问题会直接...flume 支持lzo 压缩前提条件:1、flume机器节点上安装有 lzo 库 hadoop 库 。2、 flume 启动的时候配置过 hadoop环境变量 。...
  • hdfs存储压缩方式对比

    千次阅读 2019-09-23 11:50:57
    前言: 随着数据规模的增大,集群存储的成本也随着增加,数十 PB 到百 PB 级别大集群...目前hdfs集群有多种存储压缩方式:gzip、bzip2、lzo、lz4、snappy等,下面介绍具体的压缩方式的对比 压缩方式对比 整体对...
  • hdfs文件存储格式

    千次阅读 2018-11-13 10:36:00
    hdfs 文件存储格式 hdfs 文件存储格式分为两大类 行存储和列存储 行存储,将一整行存储在一起,是一种连续的存储方式,例如SequenceFile,MapFile,缺点是如果只需要行中的某一列也必须把整行都读入内存当中 列存储 列...
  • ➢ 块级压缩:这里的块不同于 hdfs 中的块的概念.这种方式会将达到指定 块大小的二进制数据压缩为一个块。 Avro 将数据定义和数据一起存储在一条消息中,其中数据定义以 JSON 格式 存储,数据以二进制格式存储。Avro...
  • 本文主要介绍mapreduce过程中对hdfs文件压缩的使用。...Use Compressd Map Input:从HDFS中读取文件进行Mapreuce作业,如果数据很大,可以使用压缩并且选择支持分片的压缩方式(Bzip2,LZO),可以实现并行处理,提高...
  • 1.文件压缩 文件压缩好处: 减少数据所占用的磁盘空间 ...压缩格式 对应的编码/解码器 DEFLATE org.apache.hadoop.io.compress.DefaultCodec gzip org.apache.hadoop.io.compr...
  • HDFS压缩方式

    2021-03-03 14:41:48
    hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便。 缺点:不支持split。 应用场景:当每个文件压缩之后在130M以内的(1个块大小内)...
  • 这篇文章主要介绍一下Flink使用StreamingFileSink写入HDFS怎么用snappy压缩,之前的文章介绍过了写入parquet格式的数据,当时也有星球里面的朋友问这种写法怎么压缩,我只是简单的回复了说可以用AvroParquetWriter,...
  • 一. MapReduce on Yarn流程 1. 什么是MapReduce MapReduce是一个计算框架,核心思想是"分而治之",表现形式是有个输入(input),mapreduce操作这...有哪些压缩格式 gzip bzip2 LZO LZ4 snappy ...
  • HDFS文件的压缩

    千次阅读 2020-01-04 18:06:11
    前言 新搭建的集群,需要进行各种测试,于是让一个同事导入一些测试数据,大约1.4T左右。我进行了一个简单的hive测试 ...原来同事从ftp拉取过来的数据是gzip格式的,他直接如到Hive表中…,而gzip格式的...
  • 在Flink中我们可以很容易的使用内置的API来读取HDFS上的压缩文件,内置支持压缩格式包括.deflate,.gz, .gzip,.bz2以及.xz等。 但是如果我们想使用Flink内置sink API将数据以压缩的格式写入到HDFS上,好像并没有...
  • HDFS中的压缩与解压缩机制

    万次阅读 2013-08-14 16:24:46
    我们可以把数据文件压缩后再存入HDFS,以节省存储空间。...目前,Hadoop支持以下几种压缩格式 压缩格式 UNIX工具 算 法 文件扩展名 支持多文件 可分割 DEFLATE 无 DEFLATE .deflate No No gzip gzip DEFL
  • hadoop HDFS杂记

    2014-03-23 00:31:17
    1. HDFS使用ChecksumFileSystem或它的子类...2.HDFS支持压缩格式:deflate gzip bzip2 lzo snappy格式: java 访问: public static void main(String[] args) throws Exception { String codecClassname = args[0];
  • (1) spark textFile加载多个目录:  其实很简单,将多个目录(对应多个字符串),用,作为... val inputPath = List("hdfs://localhost:9000/test/hiveTest", "hdfs://localhost:9000/test/hiveTest2")  .mkString(",

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,617
精华内容 6,646
关键字:

hdfs支持的压缩格式