精华内容
下载资源
问答
  • python 运行 hadoop 2.0 mapreduce 程序
    要点:#!/usr/bin/python  因为要发送到各个节点,所以py文件必须是可执行的。 
    1)  统计(所有日志)独立ip数目,即不同ip的总数 
    ####################本地测试############################
    cat /home/hadoop/Sep-2013/*/* | python ipmappper.py | sort | python ipreducer.py
    本地部分测试结果:
    99.67.46.254    13
    99.95.174.29    47
    sum of single ip    13349
    #####################hadoop集群运行############################
    bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/jobs_python/job_logstat/ipmapper.py -reducer /data/hadoop/jobs_python/job_logstat/ipreducer.py -input /log_original/* -output /log_ipnum -file /data/hadoop/jobs_python/job_logstat/ipmapper.py -file /data/hadoop/jobs_python/job_logstat/ipreducer.py 
    集群部分测试结果:
    99.67.46.254    13
    99.95.174.29    47
    sum of single ip    13349
    
    ipmapper.py:
    ##########################mapper代码#######################################
    #!/usr/bin/python
    # --*-- coding:utf-8 --*--
    import re
    import sys
    
    pat = re.compile('(?P<ip>\d+.\d+.\d+.\d+).*?"\w+ (?P<subdir>.*?) ')
    for line in sys.stdin:
        match = pat.search(line)
        if match:
            print '%s\t%s' % (match.group('ip'), 1)
    
    ipreducer.py
    ##########################reducer代码#####################################
    #!/usr/bin/python
    from operator import itemgetter
    import sys
    
    dict_ip_count = {}
    
    for line in sys.stdin:
        line = line.strip()
        ip, num = line.split('\t')
        try:
            num = int(num)
            dict_ip_count[ip] = dict_ip_count.get(ip, 0) + num
    
        except ValueError:
            pass
    
    sorted_dict_ip_count = sorted(dict_ip_count.items(), key=itemgetter(0))
    for ip, count in sorted_dict_ip_count:
        print '%s\t%s' % (ip, count)
    
    
    
    2)  统计(所有日志)每个子目录访问次数
    ########################本地测试######################################
    cat /home/hadoop/Sep-2013/*/* | python subdirmapper.py | sort | python subdirreducer.py
    部分结果:
    http://dongxicheng.org/recommend/    2
    http://dongxicheng.org/search-engine/scribe-intro/trackback/    1
    http://dongxicheng.org/structure/permutation-combination/    1
    http://dongxicheng.org/structure/sort/trackback/    1
    http://dongxicheng.org/wp-comments-post.php    5
    http://dongxicheng.org/wp-login.php/    3535
    http://hadoop123.org/administrator/index.php    4
    
    #######################hadoop集群运行########################################
    bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/jobs_python/job_logstat/subdirmapper.py -reducer /data/hadoop/jobs_python/job_logstat/subdirreducer.py -input /log_original/* -output /log_subdirnum -file /data/hadoop/jobs_python/job_logstat/subdirmapper.py -file /data/hadoop/jobs_python/job_logstat/subdirreducer.py
    部分结果:
    http://dongxicheng.org/search-engine/scribe-intro/trackback/    1
    http://dongxicheng.org/structure/permutation-combination/    1
    http://dongxicheng.org/structure/sort/trackback/    1
    http://dongxicheng.org/wp-comments-post.php    5
    http://dongxicheng.org/wp-login.php/    3535
    http://hadoop123.org/administrator/index.php    4
    
    #######################################mapper代码###########################################
    #!/usr/bin/python
    # --*-- coding:utf-8 --*--
    import re
    import sys
    
    pat = re.compile('(?P<ip>\d+.\d+.\d+.\d+).*?"\w+ (?P<subdir>.*?) ')
    for line in sys.stdin:
        match = pat.search(line)
        if match:
            print '%s\t%s' % (match.group('subdir'), 1)
    #######################################reducer代码###########################################
    #!/usr/bin/python
    from operator import itemgetter
    import sys
    
    dict_subdir_count = {}
    
    for line in sys.stdin:
        line = line.strip()
        subdir, num = line.split('\t')
        try:
            num = int(num)
            dict_subdir_count[subdir] = dict_subdir_count.get(subdir, 0) + num
        except ValueError:
            pass
    
    sorted_dict_ip_count = sorted(dict_subdir_count.items(), key=itemgetter(0))
    for subdir, count in sorted_dict_ip_count:
        print '%s\t%s' % (subdir, count)
    复制代码

     

    
    
    
    
    
    【还是用java写mr程序吧】
    参考网址:
    http://asfr.blogbus.com/logs/44208067.html
    
    bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/mapper.py -reducer /data/hadoop/reducer.py -input /in/* -output /py_out -file /data/hadoop/mapper.py -file /data/hadoop/reducer.py 
    
    
    python开发mapreduce的原理:
    》与linux管道机制一致
    》通过标准输入输出实现进程间通信
    》标准输入输出是任何语言都支持的。
    举几个例子:
    cat 1.txt | grep 'dong' | sort
    cat 1.txt | python grep.py | java sort.jar
    
    以标准输入流作为输入:
    c++: cin
    c: scanf
    以标准输出流作为输出:
    c++:count
    c:printf
    
    局限性:可以实现Mapper Reducer,其他组件需要用java实现。
    
    hadoop-streaming 进行测试很简单的哦。
    编译程序,生成可执行文件
    g++ -o mapper mapper.cpp
    g++ -o reducer reduer.cpp
    测试程序:
    cat test.txt | ./mappper | sort | ./reducer
    
    
    展开全文
  • Hadoop-2.4.1学习之Mapper和Reducer

    万次阅读 2014-11-05 11:30:45
    Hadoop-2.4.1中MapReduce作业的Mapper和Reducer综述

           MapReduce允许程序员能够容易地编写并行运行在大规模集群上处理大量数据的程序,确保程序的运行稳定可靠和具有容错处理能力。程序员编写的运行在MapReduce上的应用程序称为作业(job),Hadoop既支持用Java编写的job,也支持其它语言编写的作业,比如Hadoop Streaming(shell、python)和Hadoop Pipes(c++)。Hadoop-2.X不再保留Hadoop-1.X版本中的JobTracker和TaskTracker组件,但这并不意味着Hadoop-2.X不再支持MapReduce作业,相反Hadoop-2.X通过唯一的主ResourceManager、每个节点一个的从NodeManager和每个应用程序一个的MRAppMaster保留了对MapReduce作业的向后兼容。在新版本中MapReduce作业依然由Map和Reduce任务组成,Map依然接收由MapReduce框架将输入数据分割为数据块,然后Map任务以完全并行的方式处理这些数据块,接着MapReduce框架对Map任务的输出进行排序,并将结果做为Reduce任务的输入,最后由Reduce任务输出最终的结果,在整个执行过程中MapReduce框架负责任务的调度,监控和重新执行失败的任务等。

          通常计算节点和存储节点是相同的,MapReduce框架会有效地将任务安排在存储数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过实现或者继承合适的接口或类提供了map和reduce函数,这两个函数负责Map任务和Reduce任务。作业客户端将编写好的作业提交给ResourceManager,而不再是JobTracker,ResourceManager负责将作业分布到从节点上,调度和监控作业,为作业客户端提供状态和诊断信息。</

    展开全文
  • Hadoop自定义组件Combiner Combiner组件介绍 Combiner是一个特殊的Reduce组件 ,它处于Mapper和Reduce中间的一种组件,Combiner组件的父类就是Reducer. Combiner和Reducer之间的区别在于运行的位置 ,Reducer是每一个...

    Hadoop自定义组件Combiner

    Combiner组件介绍

    Combiner是一个特殊的Reduce组件 ,它处于Mapper和Reduce中间的一种组件,Combiner组件的父类就是Reducer.
    CombinerReducer之间的区别在于运行的位置 ,Reducer是每一个接收全局的Map Task 所输出的结果,Combiner一般是在MapTask的节点中运行.

    combiner

    每一个map都会产生大量的本地输出,Combiner的作用就是对map输出的结果先做一次合并,以较少的map和reduce节点中的数据传输量,Combiner的存在就是提高当前网络IO传输的性能,是MapReduce的一种优化方法。

    ​ 并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景不适用于Combiner。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。

    combiner特点

    1. Combiner是MR程序中MapperReduce之外的一种组件,它到父类是Reducer,它们到的区别在于运行的位置

    2. Reduce阶段的Reducer是每一个接收全局的Map Task 所输出的结果

    3. Combiner是在合并排序后运行的。因此map端和reduce端都可以调用此函数。

    4. Combiner的存在就是提高当前网络IO传输的性能,是MapReduce的一种优化手段。

    5. Combiner在驱动类中的设置:

      job.setCombinerClass(MyCombiner.class);
      

    实现思路

    • 继承Reducer
    • 重写reduce方法,根据业务需求处理数据,处理完后调用context.write()方法写出数据即可。

    案例

    需求分析

    ​ 收集用户的电影评分数据,统计每个用户评分最高的10部电影

    电影评分数据

    电影名 评分 日期 用户名
    move14 01 2012-12-38 user1
    move14 01 2012-12-38 user3
    move14 01 2012-12-38 user5
    move13 02 2012-12-37 user1
    move13 02 2012-12-37 user4
    move13 02 2012-12-37 user6
    move12 03 2012-12-36 user1
    move12 03 2012-12-36 user5
    move12 03 2012-12-36 user7
    move11 04 2012-12-35 user1
    move11 04 2012-12-35 user6
    move11 04 2012-12-35 user8
    move10 05 2012-12-34 user1
    move10 05 2012-12-34 user7
    move10 05 2012-12-34 user9
    move09 06 2012-12-33 user1
    move09 06 2012-12-33 user8
    move09 06 2012-12-33 user10
    move08 07 2012-12-32 user1
    move08 07 2012-12-32 user9
    move08 07 2012-12-32 user11
    move07 08 2012-12-31 user1
    move07 08 2012-12-31 user10
    move07 08 2012-12-31 user12
    move06 09 2012-12-30 user1
    move06 09 2012-12-30 user11
    move06 09 2012-12-30 user13
    move05 10 2012-12-29 user1
    move05 10 2012-12-29 user12
    move05 10 2012-12-29 user14
    move04 11 2012-12-28 user1
    move04 11 2012-12-28 user13
    move04 11 2012-12-28 user15
    move03 12 2012-12-27 user1
    move03 12 2012-12-27 user14
    move03 12 2012-12-27 user16
    move02 13 2012-12-26 user1
    move02 13 2012-12-26 user15
    move02 13 2012-12-26 user17
    move01 14 2012-12-25 user1
    move01 14 2012-12-25 user16
    move01 14 2012-12-25 user18
    move01 14 2012-12-24 user2
    move01 14 2012-12-24 user17
    move01 14 2012-12-24 user19
    move02 13 2012-12-23 user2
    move02 13 2012-12-23 user18
    move02 13 2012-12-23 user20
    move03 12 2012-12-22 user2
    move03 12 2012-12-22 user19
    move03 12 2012-12-22 user21
    move04 11 2012-12-21 user2
    move04 11 2012-12-21 user20
    move04 11 2012-12-21 user22
    move05 10 2012-12-20 user2
    move05 10 2012-12-20 user21
    move05 10 2012-12-20 user23
    move06 09 2012-12-19 user2
    move06 09 2012-12-19 user22
    move06 09 2012-12-19 user24
    move07 08 2012-12-18 user2
    move07 08 2012-12-18 user23
    move07 08 2012-12-18 user25
    move08 07 2012-12-17 user2
    move08 07 2012-12-17 user24
    move08 07 2012-12-17 user26
    move09 06 2012-12-16 user2
    move09 06 2012-12-16 user25
    move09 06 2012-12-16 user27
    move10 05 2012-12-15 user2
    move10 05 2012-12-15 user26
    move10 05 2012-12-15 user28
    move11 04 2012-12-14 user2
    move11 04 2012-12-14 user27
    move11 04 2012-12-14 user29
    move12 03 2012-12-13 user2
    move12 03 2012-12-13 user28
    move12 03 2012-12-13 user30
    move13 02 2012-12-12 user2
    move13 02 2012-12-12 user29
    move13 02 2012-12-12 user31
    move14 01 2012-12-11 user2
    move14 01 2012-12-11 user30
    move14 01 2012-12-11 user32
    

    代码实现

    电影Bean

    package hadoop.mr.custom.combiner;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class MovieBean implements WritableComparable<MovieBean> {
    
        private String moviename;
    
        private Integer score;
    
        private String date;
    
        private String username;
    
        public String getMoviename() {
            return moviename;
        }
    
        public void setMoviename(String moviename) {
            this.moviename = moviename;
        }
    
        public Integer getScore() {
            return score;
        }
    
        public void setScore(Integer score) {
            this.score = score;
        }
    
        public String getDate() {
            return date;
        }
    
        public void setDate(String date) {
            this.date = date;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        /**
         * 输出结果的排序方法
         * @param o
         * @return
         */
        @Override
        public int compareTo(MovieBean o) {
            return this.username.compareTo(o.username);
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(moviename);
            out.writeInt(score);
            out.writeUTF(date);
            out.writeUTF(username);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            moviename=in.readUTF();
            score=in.readInt();
            date=in.readUTF();
            username=in.readUTF();
        }
    
        @Override
        public String toString() {
            return "MovieBean{" +
                    "moviename='" + moviename + '\'' +
                    ", score=" + score +
                    ", date='" + date + '\'' +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
    

    自定义组件combiner

    package hadoop.mr.custom.combiner;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;
    
    /**
     * MapReducer自定义组件 Combiner
     * Combiner 是一个特殊的Reduce组件 ,它处于Mapper和Reduce中间的一种组件,Combiner组件的父类就Reducer
     * ​ Combiner和Reducer之间的区别在于运行的位置 ,Reducer是每一个接收全局的Map Task 所输出的结果,Combiner一般是在MapTask的节点中运行.
     *
     * MovieCombiner 类实现
     */
    public class MovieCombiner extends Reducer<Text, MovieBean, Text,MovieBean> {
        //排序集合  降序排序
        TreeMap<Integer, MovieBean> treeMap = new TreeMap<Integer, MovieBean>(Comparator.reverseOrder());
        /**
         * 实现排名功能
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
            //迭代遍历
            Iterator<MovieBean> iterator = values.iterator();
            while (iterator.hasNext()) {
                MovieBean movieBean = iterator.next();
                MovieBean bean = new MovieBean();
                bean.setMoviename(movieBean.getMoviename());
                bean.setDate(movieBean.getDate());
                bean.setScore(movieBean.getScore());
                bean.setUsername(movieBean.getUsername());
                if (bean!=null){
                    treeMap.put(bean.getScore(),bean);
                }
            }
            //TODO 只保留前十条数据,清除其他数据
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = 0; i < 9; i++) {
                for (Map.Entry<Integer, MovieBean> entry:treeMap.entrySet()) {
                    MovieBean movieBean = entry.getValue();
                    MovieBean bean = new MovieBean();
                    bean.setMoviename(movieBean.getMoviename());
                    bean.setDate(movieBean.getDate());
                    bean.setScore(movieBean.getScore());
                    bean.setUsername(movieBean.getUsername());
                    context.write(new Text(entry.getValue().getUsername()),bean);
                }
            }
            super.cleanup(context);
        }
    }
    
    

    Mapper程序

    package hadoop.mr.custom.combiner;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class MovieMapper extends Mapper<LongWritable,Text,Text,MovieBean> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (key.get()==0){
                return;
            }
            String[] values = value.toString().split("\\s");
            MovieBean bean = new MovieBean();
            bean.setMoviename(values[0]);
            bean.setScore(Integer.parseInt(values[1]));
            bean.setDate(values[2]);
            bean.setUsername(values[3]);
            context.write(new Text(bean.getUsername()),bean);
        }
    }
    
    

    Reducer程序

    package hadoop.mr.custom.combiner;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    public class MovieReducer extends Reducer<Text,MovieBean, Text,MovieBean> {
    
        @Override
        protected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
            Iterator<MovieBean> iterator = values.iterator();
            while (iterator.hasNext()) {
                MovieBean bean = iterator.next();
                context.write(key, bean);
            }
        }
    }
    
    

    mapreducer主入口

    package hadoop.mr.custom.combiner;
    
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import util.FolderUtil;
    
    public class MovieDriver extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setJobName(MovieDriver.class.getName());
            job.setJarByClass(MovieDriver.class);
            job.setMapperClass(MovieMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(MovieBean.class);
            job.setReducerClass(MovieReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MovieBean.class);
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            //设置自定义组件 Combiner
            job.setCombinerClass(MovieCombiner.class);
            System.out.println(job.waitForCompletion(true) ? "执行成功" : "执行失败");
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            args=new String[]{"D:\\BigData\\hadoop\\mr\\custom\\combiner\\input","D:\\BigData\\hadoop\\mr\\custom\\combiner\\output"};
            MovieDriver movieDriver = new MovieDriver();
            FolderUtil.delFolder(args[1]);
            movieDriver.run(args);
        }
    }
    
    
    展开全文
  • hadoop组件

    2019-10-11 20:34:24
    文章目录2019/9/161.hive的分隔符2.hive的文件存储格式3.hive的运行方式4.属性设置5.hive示图6.hive的jdbc7.hive优化2019/9/171.安装LZO压缩2.测试LZO压缩3.hive的存储过程4.tez安装过程5.TEZ的执行案例2019/9/181....

    2019/9/16

    1.hive的分隔符

    hive默认的列与列之间的分隔符是:\001,注意不是tab
    通常分隔符:
    tab
    ,
    " "
    |
    \n
    \001	^A (\u0001,注意不是\0001也不是\01)
    \002	^B
    \003	^C
    

    2.hive的文件存储格式

    hive默认的数据文件存储格式为:textfile
    textfile:普通的文本文件存储,不压缩。
    sequencefile:hive为用户提供的二进制存储,本身就压缩。不能用load方式加载数据
    rcfile:hive提供行列混合存储,hive在该格式下,将会尽量把附近的行和列的块尽量存储到一起。仍然压缩,查询效率较高。
    orc:优化后的rcfile。
    parquet:典型列式存储,自带压缩,查询较快。
    
    textFile:可以配合压缩配置属性进行压缩。
    map端压缩:
    mapreduce.map.output.compress = false
    mapreduce.map.output.codec = DefaultCodec
    reduce端压缩:
    mapreduce.output.fileoutputformat.compress = false
    mapreduce.output.fileoutputformat.compress.type=NONE/RECORD/BLOCK(默认RECORD)
    hive压缩配置:
    set hive.exec.compress.output=false;
    set hive.exec.compress.intermediate=false;
    set hive.intermediate.compression.codec=
    set hive.intermediate.compression.type=
    
    
    CREATE TABLE `u4`(
      `id` int,
      `name` string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    stored as textfile;
    
    set mapreduce.output.fileoutputformat.compress=true;
    set hive.exec.compress.output=true;
    insert into table u4
    select * from u2;
    
    2、sequence :
    CREATE TABLE `u4`(
      `id` int,
      `name` string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    stored as sequencefile;
    
    3、rcfile : 
    CREATE TABLE `u5`(
      `id` int,
      `name` string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    stored as rcfile;
    
    
    4、orc : 
    CREATE TABLE `u6`(
      `id` int,
      `name` string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    stored as orc;
    
    5、parquet
    CREATE TABLE `u7`(
      `id` int,
      `name` string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    stored as PARQUET;
    insert into table u7
    select * from u2;
    

    3.hive的运行方式

    1、cli:命令行(hive/beeline)如果启动beeline连接需要启动hiveserver2 
    hive --service hiveserver2 &
    hiveserver2 &
    
    beeline
    !connect jdbc:hive2://localhost:10000
    
    
    
    通过命令行 hive -e 'sql query';
    通过命令行 hive -f /hql文件   (生产线)
    
    
    hive -S --hivevar mapoutputdir=/home/hivedata/out/05 --hivevar textoutdir=/home/hivedata/out/06 --hivevar limit=1 -f ./hql
    
    注意:
    1、一个--hivevar 或者 --hiveconf 只能带一个参数
    2、--hiveconf 或者 --hivevar 可以混合使用
    3、--hiveconf 或 --hivevar 定义参数不能取消
    

    4.属性设置

    1、hive-site.xml (全局,配置公共和启动前必须配置元数据库的配置、日志配置等)
    2、hive通过命令行参数设置 hive --hiveconf a=10 -e ''
    3、set
    
    
    区别:
    属性优先级别从上往下一次升高。
    hive-site.xml是全局和永久的,其它两是临时和局部。
    hive-site.xml适合所有属性配置,而后两个对于系统级别的属性不能配置。
    比如启动所需的元数据库url、log配置等。
    

    5.hive示图

    hive的视图简单理解为逻辑上的表
    hive现目前只支持逻辑视图,不支持物化视图。
    
    hive的视图意义:
    1、对数据进行局部暴露(涉及隐私数据不暴露)。
    2、简化复杂查询。
    
    创建视图(cvas):
    
    create view if not exists tab_v1 
    as 
    select id from u2;
    
    查看视图:
    
    show tables;
    show create table tab_v1;
    desc tab_v1;
    
    删除视图:
    drop view if exists tab_v2;
    drop table if exists tab_v1;   (不支持)
    
    注意:
    1、不建议先删除视图对应的表后再查询视图。
    2、视图是不能用insert into 或者load 方式来加载数据。
    3、视图是只读,不能修改其结构、表相关属性。
    

    6.hive的jdbc

    1、conn、ps\rs的关闭顺序需要rs\ps\conn。
    2、连接的用户名和密码需要填写。
    

    7.hive优化

    1、explain 和 explain extended :
    explain : 只有对hql语句的解释。
    explain extended:对hql语句的解释,以及抽象表达式树的生成。
    
    stage 相当于一个job,一个stage可以是limit、也可以是一个子查询、也可以是group by等。
    hive默认一次只执行一个stage,但是如果stage之间没有相互依赖,将可以并行执行。
    任务越复杂,hql代码越复杂,stage越多,运行的时间一般越长。
    查看stage之间的依赖关系,stage的个数,也能查看执行顺序,可以改变hql语句调整执行顺序。
    stage原则是越少越好,依赖越简单越好,一个stage是一个mr或者mr的一部分。
    
    2、join
    hive的查询永远是小表(结果集)驱动大表(结果集)
    hive中的on的条件只能是等值 and连接 
    注意hive是否配置普通join转换成map端join、以及mapjoin小表文件大小的阀值
    注意hive的倾斜join:
    hive.optimize.skewjoin=false
    hive.skewjoin.key=100000
    hive.skewjoin.mapjoin.map.tasks=10000
    
    
    数据倾斜:
    数据倾斜:由于key分布不均匀造成的数据向一个方向偏离的现象。
    本身数据就倾斜
    join语句容易造成
    count(distinct col) 很容易造成倾斜
    group by 也可能会造成
    
    倾斜现象:
    卡在某一个reduce任务的某个进度。
    
    
    解决方法:
      2.1 找到造成数据倾斜的key,然后再通过hql语句避免(查看日志是哪个task失败--->找该task中关联字段、		  group by\count(distrinct col) ---> 抽样字段个数 ---> 判断是否是倾斜的key )。单独拿出来处	   理,然后在和正常的结果进行union all。
    
      2.2 造成倾斜的key加随机数(加的随机不能造成二次倾斜、保证加随机不能影响原有的业务)。
    
     	  select 
     	  t2.*
      	  from t_user2 t2
      	  join t_user2 t1
     	  on t2.id = t1.id
     	  ;
    
      2.3 设置相关倾斜的属性
    	  hive.map.aggr=true;
    	  hive.groupby.skewindata=false;  (建议开启)
     	  hive.optimize.skewjoin=false;
    	  skewjoin 先关属性查看:
    	  skew 相关的属性:
    
      2.4 如上都不行,则需要从新查看业务,优化语句流程。
    
    3、limit的优化:
    hive.limit.row.max.size=100000
    hive.limit.optimize.limit.file=10
    hive.limit.optimize.enable=false  (如果limit较多时建议开启)
    hive.limit.optimize.fetch.max=50000
    
    4、本地模式
    hive.exec.mode.local.auto=false (建议打开)
    hive.exec.mode.local.auto.inputbytes.max=134217728  (128M)
    hive.exec.mode.local.auto.input.files.max=4
    
    5、并行执行:
    hive.exec.parallel=false   (建议开启)
    hive.exec.parallel.thread.number=8
    
    6、严格模式
    hive.mapred.mode=nonstrict
    
    7、mapper和reducer的个数:
    不是mapper和redcuer个数越多越好,也不是越少越好。适合就好。
    
    将小文件合并处理(将输入类设置为:CombineTextInputFormat)
    通过配置将小文件合并:
    mapred.max.split.size=256000000   
    mapred.min.split.size.per.node=1
    mapred.min.split.size.per.rack=1
    hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
    
    手动设置:
    set mapred.map.tasks=2;
    
    reducer的个数(自动决定和手动设置):
    mapred.reduce.tasks=-1
    hive.exec.reducers.max=1009
    
    8、配置jvm重用:
    mapreduce.job.jvm.numtasks=1   ###
    
    mapred.job.reuse.jvm.num.tasks=1;
    
    
    
    10、索引是一种hive的优化:(索引并不好)
    
    11、分区本身就是hive的一种优化:
    
    12、job的数量:
    一般是一个查询产生一个job,然后通常情况一个job,可以是一个子查询、一个join、一个group by 、一个limit等一些操作。
    
    1个job:
    select
    t1.*
    from t_user1 t1
    left join t_user2 t2
    on t1.id = t2.id
    where t2.id is null
    ;
    
    如下3个job:
    select
    t1.*
    from t_user1 t1
    where id in (
    select
    t2.id
    from t_user2 t2
    limit 1
    )
    ;
    
    13、analyze:
    
    Analyze,分析表(也称为计算统计信息)是一种内置的Hive操作,可以执行该操作来收集表上的元数据信息。这可以极大的改善表上的查询时间,因为它收集构成表中数据的行计数,文件计数和文件大小(字节),并在执行之前将其提供给查询计划程序。
    
    已经存在表的Analyze语法:
    ANALYZE TABLE [db_name.]tablename [PARTITION(partcol1[=val1], partcol2[=val2], ...)]  -- (Note: Fully support qualified table name since Hive 1.2.0, see HIVE-10007.)
      COMPUTE STATISTICS 
      [FOR COLUMNS]          -- (Note: Hive 0.10.0 and later.)
      [CACHE METADATA]       -- (Note: Hive 2.1.0 and later.)
      [NOSCAN];
    
    例1(指定分区)、
    ANALYZE table dw_employee_hive partition(bdp_day=20190701) COMPUTE STATISTICS;
    收集表的bdp_day=20190701的这个分区下的所有列列相关信息。它是一个细粒度的分析语句。它收集指定的分区上的元数据,并将该信息存储在Hive Metastore中已进行查询优化。该信息包括每列,不同值的数量,NULL值的数量,列的平均大小,平均值或列中所有值的总和(如果类型为数字)和值的百分数。
    
    例2(指定所有列)、
    ANALYZE table dw_employee_hive partition(bdp_day=20190701) COMPUTE STATISTICS FOR COLUMNS;
    收集表的bdp_day=20190701的这个分区下的所有列相关信息。
    
    例3(指定某列)、
    ANALYZE table dw_employee_hive partition(bdp_day=20190701) COMPUTE STATISTICS FOR COLUMNS snum,dept;
    
    例4、
    ANALYZE TABLE dw_employee_hive partition(bdp_day=20190701) COMPUTE STATISTICS NOSCAN;
    收集指定分区相关信息,然后不进行扫描。
    
    测试分析后的结果。
    例1、
    DESCRIBE EXTENDED dw_employee_hive partition(bdp_day=20190701);
    
    描述结果:
    ...parameters:{totalSize=10202043, numRows=33102, rawDataSize=430326, ...
    
    例2、
    desc formatted dw_employee_hive partition(bdp_day=20190701) name;
    
    结果如下:
    # col_name  data_type   min max num_nulls   distinct_count  avg_col_len max_col_len num_trues   num_falses  comment
    name string 0 37199 4.0 4 from deserializer
    
    
    注意:
    对分区表的分析,一般都要指定分区,如对全表分析,则可以这样使用partition(bdp_day).
    

    2019/9/17

    1.安装LZO压缩

    1. 安装编译环境: yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool

    2. 下载源码::http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz

    3. 解压并编译:./configure -prefix=/home/hadoop/apps/lzo/ make&&make install

    4. 编译hadoop-lzo:下载源码,修改pom.xml文件,使用maven安装编译。

      mvn package -Dmaven.test.skip=true

    5. 进入target,将hadoop-lzo-0.4.21-SNAPSHOT.jar放到hadoop的classpath下。

    6. scp命令分发到其他服务器。

    7. core.xml配置如下:

      <property> 
          <name>io.compression.codecs</name> 		
          <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.Def aultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.Lz oCodec,com.hadoop.compression.lzo.LzopCodec
          </value> 
      </property>
      
      <property> 
          <name>io.compression.codec.lzo.class</name> 
          <value>com.hadoop.compression.lzo.LzoCodec</value>
      </property>
      

    2.测试LZO压缩

    CREATE TABLE lzo_test(
    id bigint,
    firstname string,
    lastname string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    STORED AS  
    INPUTFORMAT "com.hadoop.mapred.DeprecatedLzoTextInputFormat"
    OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
    指定输入格式和输出格式为lzo和普通。
    
    准备数据:
    19630001	john	lennon
    19630002	paul	mccartney
    19630003	george	harrison
    19630004	ringo	starr
    
    lzop ./lzodata.txt 压缩成lzo格式。
    
    在hive中加载数据
    
    load data local inpath '/home/hadoop/hivedata/lzodata.txt.lzo' into table lzo_test;
    
    select * from lzo_test
    结果:
    OK
    19630001        john    lennon
    19630002        paul    mccartney
    19630003        george  harrison
    19630004        ringo   starr
    
    索引lzo文件:
    hadoop jar /home/hadoop/apps/hadoop/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.DistributedLzoIndexer /user/hive2/warehouse/lzo_test/
    
    

    3.hive的存储过程

    1. 开启hiveserver2服务
    
    2. 创建一个文件,内容如下(demp.hpl):
    create function hello(text string)
    returnS string
    BEGIN
    RETRUEN 'Hello,' || text || '!';
    END;
    
    create procedure select_u53()
    begin
    FOR item IN(
    SELECT id,name FROM u5 limit 3
    )
    loop
            println item.id || '|' || item.name || '|' || hello(item.name);
    end loop;
    end;
    
    ||表示将输出连接到一起。
    
    3.执行命令
    hplsql -f demp.hpl -main select_u53()
    
    第二种方式:
    通过include调用:将下列内容写入文件(temp.sh),直接调用。
    include /home/hadoop/hivedata/temp.hql
    call select_u53();
    执行命令:
    hplsql -f demp.sh
    

    4.tez安装过程

    1. 下载tez的二进制文件。

    2. tar -zxvf /home/hadoop/apps/apache-tez-0.8.5-bin.tar.gz -C /home/hadoop/apps/tez-0.8.5

    3. 将tez中的target下的文件上传到hdfs上面。

    4. 配置环境变量,将tez中tez/lib下面的包加入环境变量。

    5. 将lzo的包加入hive的环境变量

    6. 配置tez-site.xml如下:

      <?xml version="1.0" encoding="UTF-8"?>
      <configuration>
      	<property>     
      		<name>tez.lib.uris</name>     
      		<value>${fs.defaultFS}/tez-0.9.0/tez.tar.gz</value>   
      	</property>
        	<property>
          	<name>tez.container.max.java.heap.fraction</name>
              <value>0.2</value>   
          </property>
      </configuration>
      
    7. 将tez-lib下的hadoop包换成相应的版本。

    5.TEZ的执行案例

    1. 更换引擎: set hive.execution.engine=tez;

    2. 执行查询:select count(*) from u5;看到如下画面:

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zWoWGlt5-1570797238633)(C:\Users\eRRRchou\Desktop\tez.JPG)]

    2019/9/18

    1.SQOOP

    1. 列出数据库:
    sqoop list-databases \
    --connect jdbc:mysql://mini01:3306 \
    --username root \
    --password root
    
    2.列出所有表:
    sqoop list-tables \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root
    
    3.导入数据到hdfs:
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu \
    -m 1 \
    --fields-terminated-by '\t' \
    --compress \
    --target-dir /sqoop/out/stu/01
    使用lzo压缩之后,是.lzo_default格式。
    
    4.query导入(不能和table公用)
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --query 'select name from stu where id = 1000 and $CONDITIONS' \
    -m 1 \
    --fields-terminated-by '\t' \
    --compress \
    --target-dir /sqoop/out/stu/02
    query使用=>""需要在$CONDITIONS前面加一个\
    
    5.导入到Hive
    5.1创建表
    CREATE TABLE sq1(
    id bigint,  
    firstname string,
    age string
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
    ;
    
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root --password root \
    --table stu \
    --null-string '\\N' \
    --null-non-string 0 \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-overwrite \
    --hive-table sq1 \
    -m 1
    
    6.增量导入:append
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu \
    -m 1 \
    --incremental append \
    --check-column id \
    --last-value 0 \
    --target-dir /sqoop/out/stu/03
    -----------------------------------------------------
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu \
    -m 1 \
    --incremental append \
    --check-column id \
    --last-value 1000 \
    --target-dir /sqoop/out/03
    
    不包括last-value
    
    6.增量导入:lastmodified
    CREATE TABLE sq2(
    id bigint,  
    firstname string,
    dt string
    ) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
    ;
    
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu1 \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-overwrite \
    --hive-table sq2 \
    -m 1 \
    --incremental lastmodified \
    --merge-key id \
    --check-column dt \
    --last-value "2019-09-14 08:20:44"
    ===================================================
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu1 \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-table sq2 \
    -m 1 \
    --incremental lastmodified \
    --merge-key id \
    --check-column dt \
    --last-value "2019-09-14 08:30:04"
    
    7.分区增量导入:
    CREATE TABLE sq3(
    id int,  
    name string
    )
    partitioned by(dt string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t' 
    ;
    
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu1 \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-table sq3 \
    --hive-partition-key DT \
    --hive-partition-value "2019-9-14" \
    -m 1 \
    --incremental lastmodified \
    --merge-key id \
    --check-column dt \
    --last-value "2019-09-14 08:30:00"
    ==================================================
    sqoop import \
    --connect jdbc:mysql://mini01:3306/test \
    --username root \
    --password root \
    --table stu1 \
    --fields-terminated-by '\t' \
    --hive-import \
    --hive-table sq3 \
    --hive-partition-key DT \
    --hive-partition-value "2019-9-14" \
    -m 1 \
    --incremental lastmodified \
    --check-column dt \
    --last-value "2019-09-14 08:54:19"
    

    2.DATAX

    2.1 hdfsToMysql

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "hdfsreader",
                        "parameter": {
                            "path": "/dataxout/mysqlToHdfs__ae2af2d8_7ec4_4d31_a50e_3e3ac2d3f2df",
                            "defaultFS": "hdfs://mini02:9000",
                            "column": ["*"],
                            "fileType": "text",
                            "encoding": "UTF-8",
                            "fieldDelimiter": " "
                        }
    
                    },
                    "writer": {
                        "name": "mysqlwriter",
                        "parameter": {
                            "writeMode": "insert",
                            "username": "root",
                            "password": "root",
                            "column": [
                                "name",
                                "age"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://mini01:3306/test?useUnicode=true&characterEncoding=utf8",
                                    "table": [
                                        "person1"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    
    

    2.2 MysqlToHdfs

      {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": { 
                        "name": "mysqlreader",
                        "parameter": { 
                            "username": "root", 
                            "password": "root",  
                            "column": ["user_id","user_name","trade_time"], 
                            "connection": [ 
                                { 
                                    "table": [ 
                                        "usertable" 
                                    ], 
                                    "jdbcUrl": [ 
                                        "jdbc:mysql://mini01/test?characterEncoding=utf8"
                                    ] 
                                } 
                            ] 
                        } 
                    }, 
              "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "defaultFS": "hdfs://mini02:9000/",
                            "fileType": "orc",
                            "path": "/dataxout",
                            "fileName": "mysqlToHdfs2",
                             "column":
                             [
                                {
                                  "name": "user_id",
                                  "type": "bigint"
                                },
                                {
                                  "name": "user_name",
                                  "type": "string"
                                },
    			    {
                                  "name": "trade_time",
                                  "type": "string"
                                }
                             ],
                            "writeMode": "append",
                            "fieldDelimiter": " "
                        }
                    }
                }
            ]
        }
    }
    
    

    2019/9/19

    1.FLUME的组件:

    source : 数据源组件,专门读取相对应的数据,并将数据传到channel中.
    channel : 管道,用于连接source和sink
    sink : 数据下沉组件,用于将channel中的数据持久化到对应的文件系统中或者流中。
    agent : flume的运行单元,里面必须包含一个或者多个source、channel、sink,运行在单个jvm中。
    event : 事件,是数据的描述。
    interceptor : 拦截器,作用于source阶段,用于过滤数据。
    selectorer : 选择器,作用于source阶段,默认是replicating,也就是复用功能。mutiplxing
    groupsinks : sink组,用于将多个sink选择sink。
    常见的组件:
    source常用组件:exec、avro source 、 spooling dirctory 、kafka 、netcat 、http、自定义
    channel常用:memory 、 file 、kafka 、 jdbc
    sinks常用 : logger 、avro 、 hdfs 、hive 、hbase、kafka等
    

    2.FLUME案例

    2.1 exec+memory+logger

    #定义source|channel|sink组件
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #配置r1的属性
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/hadoop/log.txt
    #配置sinks的属性
    a1.sinks.k1.type = logger
    #配置channel的属性
    a1.channels.c1.type = memory
    #通道中存储的最大事件数
    a1.channels.c1.capacity = 10000
    #每次交易通道从源或汇给接收器的最大事件数
    a1.channels.c1.transactionCapacity = 10000
    
    #绑定source与sink与channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    flume-ng agent -c conf/  -f conf/execToLogger.conf -n a1 -Dflume.root.logger=INFO,console
    

    2.2 exec+memory+hdfs

    #定义source|channel|sink组件
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    #定义来源
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/f_log
    
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/dt=%y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = events-
    #文件夹滚动的时间 roundValue*roundUnit
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 24
    a1.sinks.k1.hdfs.roundUnit = hour
    #本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.fileType = DataStream
    #文件滚动的大小 默认1024K
    #a1.sinks.k1.hdfs.rollSize = 100
    #配置channel的属性
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    
    #配置channel的属性
    a1.channels.c1.type = memory
    #通道中存储的最大事件数
    a1.channels.c1.capacity = 10000
    #每次交易通道从源或汇给接收器的最大事件数
    a1.channels.c1.transactionCapacity = 10000
    
    #绑定source与sink与channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    
    #hdfs.rollInterval      滚动当前文件之前要等待的秒数(0 =根据时间间隔从不滚动)
    #hdfs.rollSize	        触发滚动的文件大小,以字节为单位(0:从不基于文件大小滚动)
    #hdfs.rollCount	    	滚动之前写入文件的事件数(0 =基于事件数从不滚动)
    
    
    flume-ng agent -c conf/  -f conf/execToHdfs.conf -n a1 -Dflume.root.logger=INFO,console
    

    2.3 spooldir+memory+logger

    #定义source|channel|sink组件
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #配置r1的属性
    a1.sources.r1.type = spoolDir
    a1.sources.r1.spoolDir =  /home/hadoop/log
    #是否以绝对路径名称开头
    a1.sources.r1.fileHeader = true
    a1.sources.r1.fileHeaderKey = file
    
    #配置sinks的属性
    a1.sinks.k1.type = logger
    #配置channel的属性
    a1.channels.c1.type = memory
    #通道中存储的最大事件数
    a1.channels.c1.capacity = 10000
    #每次交易通道从源或汇给接收器的最大事件数
    a1.channels.c1.transactionCapacity = 10000
    
    #绑定source与sink与channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    flume-ng agent -c conf/  -f conf/spoodirToLogger.conf -n a1 -Dflume.root.logger=INFO,console
    往 /home/hadoop/log 丢文件。
    

    2.4 syslogtcp+memory+logger

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    #监听主机和端口
    a1.sources.r1.type=syslogtcp
    a1.sources.r1.port=6666
    a1.sources.r1.host=mini01
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    #添加或删除事件的超时时间
    a1.channels.c1.keep-alive=3
    #定义byteCapacity和通道中所有事件的估计总大小之间的缓冲区百分比
    a1.channels.c1.byteCapacityBufferPercentage = 20
    #该通道中所有事件的总和所允许的最大内存总字节数。
    a1.channels.c1.byteCapacity = 800000
    
    a1.sinks.s1.type = logger
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    flume-ng agent -c conf/  -f conf/tcpToLogger.conf -n a1 -Dflume.root.logger=INFO,console
    
    在本机上面使用nc命令发送信息:echo "hello world" | nc mini01 6666
    

    2.5 http+memory+logger

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=org.apache.flume.source.http.HTTPSource
    a1.sources.r1.port=6666
    a1.sources.r1.bind=mini01
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    
    a1.sinks.s1.type = logger
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
    flume-ng agent -c conf/  -f conf/httpToLogger.conf -n a1 -Dflume.root.logger=INFO,console
    
    使用命令: curl -X POST -d '[{"headers":{"time":"2017-06-13"},"body":"this is http"}]' http://mini01:6666 
    

    2.6 exec+file+hdfs

    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=exec
    a1.sources.r1.command= tail -f /home/hadoop/log
    
    a1.channels.c1.type=file
    #存储检查点文件的目录
    a1.channels.c1.checkpointDir=/home/hadoop/checkpoint
    #用于存储日志文件
    a1.channels.c1.dataDirs=/home/hadoop/data
    
    a1.sinks.s1.type = hdfs
    #文件路径
    a1.sinks.s1.hdfs.path = hdfs://mini01/flume/nshop/%y-%m-%d/%H%M/%S
    #开头
    a1.sinks.s1.hdfs.filePrefix = event-
    #结尾
    a1.sinks.s1.hdfs.fileSuffix=.json
    #正在写入
    a1.sinks.s1.hdfs.inUseSuffix=.tmp
    a1.sinks.s1.hdfs.rollInterval=2
    #滚动大小
    a1.sinks.s1.hdfs.rollSize=1024
    a1.sinks.s1.hdfs.fileType=DataStream
    a1.sinks.s1.hdfs.writeFormat=Text
    a1.sinks.s1.hdfs.round = true
    #滚动文件夹
    a1.sinks.s1.hdfs.roundValue = 1
    a1.sinks.s1.hdfs.roundUnit = second
    a1.sinks.s1.hdfs.useLocalTimeStamp=ture
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
    flume-ng agent -c conf/  -f conf/execAndFileToHdfs.conf -n a1 -Dflume.root.logger=INFO,console
    flume-ng agent -c conf/  -f conf/colony.conf -n a1 -Dflume.root.logger=INFO,console
    

    2.7 集群配置

    主节点:mini01:
    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    
    a1.sources.r1.type=avro
    a1.sources.r1.port=6666
    a1.sources.r1.bind=mini01
    
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    
    a1.sinks.s1.type =logger
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
    从节点:mini02:
    #配置sources,channels,sinks
    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    #配置监听nc
    a1.sources.r1.type=syslogtcp
    a1.sources.r1.port=6666
    a1.sources.r1.host=mini02
    #配置管道
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    #配置下沉
    a1.sinks.s1.type = avro
    a1.sinks.s1.hostname=mini01
    a1.sinks.s1.port=6666
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    从节点:mini03:
    
    #配置下沉
    a1.sinks.s1.type = logger
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    [hadoop@mini03 flume]$ ^C
    [hadoop@mini03 flume]$ cat conf/colony.conf 
    #配置sources,channels,sinks
    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    #配置监听nc
    a1.sources.r1.type=syslogtcp
    a1.sources.r1.port=6666
    a1.sources.r1.host=mini03
    #配置管道
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    #配置下沉
    a1.sinks.s1.type = avro
    a1.sinks.s1.hostname=mini01
    a1.sinks.s1.port=6666
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    

    2.8 作业

    主节点:
    mini02:
    #配置sources,channels,sinks
    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    #配置监听avro
    a1.sources.r1.type=avro
    a1.sources.r1.port=6666
    a1.sources.r1.bind=mini02
    #配置管道
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    #配置下沉
    a1.sinks.s1.type = logger
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
    mini03:
    #配置sources,channels,sinks
    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
    #配置监听nc
    a1.sources.r1.type=avro
    a1.sources.r1.port=6666
    a1.sources.r1.bind=mini03
    #配置管道
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    #配置下沉
    a1.sinks.s1.type = logger
    
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1
    
    从节点:mini01
    #配置三个组件
    a1.sources=r1
    a1.channels=c1 c2
    a1.sinks=s1 s2
    #配置监听net端口
    a1.sources.r1.type=syslogtcp
    a1.sources.r1.port=6666
    a1.sources.r1.bind=mini01
    #配置管道1
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
    a1.channels.c1.keep-alive=3
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000
    #配置管道2
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=1000
    a1.channels.c2.transactionCapacity=100
    a1.channels.c2.keep-alive=3
    a1.channels.c2.byteCapacityBufferPercentage = 20
    a1.channels.c2.byteCapacity = 800000
    #配置下沉1
    a1.sinks.s1.type = avro
    a1.sinks.s1.hostname = mini02
    a1.sinks.s1.port = 6666
    #配置下沉2
    a1.sinks.s2.type = avro
    a1.sinks.s2.hostname = mini03
    a1.sinks.s2.port = 6666
    
    a1.sources.r1.channels=c1 c2
    a1.sinks.s1.channel=c1
    a1.sinks.s2.channel=c2
    
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = s1 s2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.s1 = 5
    a1.sinkgroups.g1.processor.priority.s2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    

    2.9 拦截器

    a1.sources.r1.interceptors = i1 i2 i3
    #时间戳拦截器
    #该拦截器将事件处理的时间(以毫秒为单位)插入到事件标头中。
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.interceptors.i1.preserveExisting=true
    
    #主机拦截器
    #该拦截器将插入正在运行该代理的主机的主机名或IP地址。它会host根据配置插入带有密钥或已配置密钥的标头,其值##是主机的主机名或IP地址。
    a1.sources.r1.interceptors.i2.type = host
    a1.sources.r1.interceptors.i2.hostHeader = hostname
    a1.sources.r1.interceptors.i2.preserveExisting=true
    
    #静态拦截器
    #静态拦截器允许用户向所有事件添加带有静态值的静态标头。
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = city
    a1.sources.r1.interceptors.i3.value = NEW_YORK
    
    
    #正则表达式	“*”	用于与事件匹配的正则表达式
    #excludeEvents	假	如果为true,则正则表达式确定要排除的事件,否则正则表达式确定要包括的事件。
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_filter
    a1.sources.r1.interceptors.i1.regex=^[0-9].*$
    a1.sources.r1.interceptors.i1.excludeEvents=false
    
    展开全文
  • Hadoop的三大核心组件分别是: HDFS(Hadoop Distribute File System):hadoop的数据存储工具。 YARN(Yet Another Resource Negotiator,另一种资源协调者):Hadoop 的资源管理器。 Hadoop MapReduce 分布式计算...
  • Hadoop Combiner组件

    2016-05-12 10:30:20
    Combiner组件是针对每一个map进行局部处理。。。。 什么时候运行Combiner: 1、当job设置了Combiner,并且spill( 达到溢写阈值后就会写入到磁盘中)的个数达到min.num.spill.for.combine(默认是3)的时候...
  • hadoop重要组件

    2020-02-02 11:45:56
    1.hdfs重要组件: Configration FileSystem FileStatus BlockLocation Path IOUtils 2.mapreduce重要组件 ...Mapper Reducer LineRecordReader:默认进行切片读取的组件(initialize方法) MapperT...
  • Hadoop学习(二)Hadoop三大核心组件

    万次阅读 多人点赞 2018-08-25 00:16:30
    Hadoop的三大核心组件分别是: HDFS(Hadoop Distribute File System):hadoop的数据存储工具。 YARN(Yet Another Resource Negotiator,另一种资源协调者):Hadoop 的资源管理器。 Hadoop MapReduce:分布式...
  • Hadoop学习——MapReduce的组件及简单API   上一篇参考Hadoop学习——MapReduce的简单介绍及执行步骤 ...一、Mapper组件 介绍   可以读取文件,默认是一行一行读取,把输入 key和value通过map()传给程序员,输...
  • 本文将会浅析Hadoop生态的发展历程及其中部分组件的技术原理,最终就Hadoop是否会被Spark取代给出结论。 一、Hadoop的核心组件 在对Hadoop核心组件进行介绍之前,我们需要先了解Hadoop解决了什么问题。Hadoop...
  • 1.1hadoop的概念 Hadoop是 Apache基金会下一个开源的分布式计算平台,它以分布式文件系统HDFS和MapReduce算法为核心,为用户提供了系统底层细节透明的分布式基础架构。用户可以在不了解分布式底层细节的情况下,...
  • Hadoop(17) MR 决定Mapper数量因素

    千次阅读 2016-10-03 19:49:57
    进入 org.apache.hadoop.mapreduce.Job下的waitForCompletion() ————————————————————————————————————————————————————————————> submit();Job中的...
  • MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。 大数据量计算 利用了hdfs做了存储,计算任务是不是要分散计算 (1)任务的分配 (2)...
  • Hadoop的三大核心组件

    2021-07-19 16:01:40
    Hadoop的三大核心组件分别是: HDFS(Hadoop Distribute File System):hadoop的数据存储工具。 YARN(Yet Another Resource Negotiator,另一种资源协调者):Hadoop 的资源管理器。 Hadoop MapReduce:分布式计算...
  • hadoop、spark各大数据组件介绍

    千次阅读 2019-10-09 15:20:53
    hadoop相关组件 hadoop体系结构,如图: hadoop核心设计,如图 Hadoop Common Hadoop体系最底层的一个模块,为Hadoop各子项目提供各种工具,如:配置文件和日志操作等。 HDFS 是Hadoop应用程序中主要的...
  • hadoop组件mr

    2019-09-20 16:12:17
    1.Combiner是MR程序中Mapper和Reduce之外的一种组件 2.Combiner组件的父类就是Reducer 3.Combiner和Reducer之间的区别在于运行的位置 4.Reducer是每一个接收全局的Map Task 所输出的结果 5.Combiner是在Map...
  • Hadoop生态圈重要组件整理

    千次阅读 2019-02-12 18:56:13
    Hadoop生态圈重要组件的概论与简述 初学hadoop的朋友们一定知道,Hadoop是由Apache开发的分布式系统基础架构,类似于自然界中的生态系统。这个系统中包含多个组件,共同完成分布式框架处理的任务。这里为大家整理了...
  • 完整的分布式运算程序,并发运行在一个Hadoop集群 上。 二、MapReduer的优缺点 2.1 MapReduce的优点 1. MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可 以分布到大量廉价...
  • Hadoop核心组件之MapReduce详解
  • 一、Hadoop核心组件 首先要介绍一下Hadoop,现在Hadoop分为3部分,分别是HDFS,Yarn和Mrv2 Hadoop Common Hadoop Common是在Hadoop0.2版本之后分离出来的HDFS和MapReduce独立子项目的内容,是Hadoop的核心部分,...
  • hadoop

    2021-01-02 15:34:08
    目录hadoop简介及搭建HDFSHDFS常用命令HDFS结构及特点Blocknamenodedatanodefsimage和editsdfs目录HDFS文件传输过程java连接HDFS搭建分布式计算mapreduce示例代码编写map组件编写reduce组件编写driver打包运行案例...
  • Hadoop组件/流程/配置

    2019-04-22 21:50:55
    一.Hadoop 1.Hadoop是一个能够对大量数据进行分布式处理的软件框架。Hadoop 以一种可靠、高效、可伸缩的方式进行数据处理。 Hadoop 是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保...
  • 文章目录一、安装1.1 准备...hadoop-3.1.4.tar.gz jdk-8u271-linux-x64.tar.gz 虚拟机规划 主机名 IP NN DN RM NM bigdata01 192.168.1.101 Y Y Y bigdata02 192.168.1.102 Y Y Y bigdata03 192.168.1
  • Hadoop组件之MapReduce框架

    千次阅读 2016-06-04 18:34:48
    Hadoop组件之MapReduce框架此处的介绍不是书上的抄写,只作理解为主,所以请谅解语言用词的精确性。而且不可能用一篇文章就学会MapReduce 框架,仅作为引导。认识MapReduceMapReduce是一种编程模型,用于大规模数据...
  • Hadoop组件之Combiner

    2017-07-27 07:15:12
    combiner组件
  • Hadoop生态圈组件介绍 Hadoop生态图,家族产品,通俗地说,就是Hadoop核心模块和衍生的子项目。常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,992
精华内容 2,396
关键字:

组件运行hadoop的mapper