精华内容
下载资源
问答
  • Hadoop项目实战

    2017-03-29 17:57:00
    这个项目是流量经营项目,通过Hadoop的离线数据项目。 运营商通过HTTP日志,分析用户的上网行为数据,进行行为轨迹的增强。 HTTP数据格式为: 流程: 系统架构: 技术选型: 这里只针对其中的一...

    这个项目是流量经营项目,通过Hadoop的离线数据项目。

    运营商通过HTTP日志,分析用户的上网行为数据,进行行为轨迹的增强。

     

    HTTP数据格式为:

    流程:

    系统架构:

    技术选型:

    这里只针对其中的一个功能进行说明:

    其中规则库是人工填充的,实例库是采用机器学习自动生成的,形式都是<url,info>。

    (一)统计流量排名前80%的URL,只有少数的URL流量比特别高,绝大多数的URL流量极低,没有参考价值,应当舍弃。

    FlowBean.java:

    package cn.itcast.hadoop.mr.flowsum;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    public class FlowBean implements WritableComparable<FlowBean>{
        
        
        private String phoneNB;
        private long up_flow;
        private long d_flow;
        private long s_flow;
        
        //在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数
        public FlowBean(){}
        
        //为了对象数据的初始化方便,加入一个带参的构造函数
        public FlowBean(String phoneNB, long up_flow, long d_flow) {
            this.phoneNB = phoneNB;
            this.up_flow = up_flow;
            this.d_flow = d_flow;
            this.s_flow = up_flow + d_flow;
        }
    
        public void set(String phoneNB, long up_flow, long d_flow) {
            this.phoneNB = phoneNB;
            this.up_flow = up_flow;
            this.d_flow = d_flow;
            this.s_flow = up_flow + d_flow;
        }
        
        
        
        public String getPhoneNB() {
            return phoneNB;
        }
    
        public void setPhoneNB(String phoneNB) {
            this.phoneNB = phoneNB;
        }
    
        public long getUp_flow() {
            return up_flow;
        }
    
        public void setUp_flow(long up_flow) {
            this.up_flow = up_flow;
        }
    
        public long getD_flow() {
            return d_flow;
        }
    
        public void setD_flow(long d_flow) {
            this.d_flow = d_flow;
        }
    
        public long getS_flow() {
            return s_flow;
        }
    
        public void setS_flow(long s_flow) {
            this.s_flow = s_flow;
        }
    
        
        
        //将对象数据序列化到流中
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeUTF(phoneNB);
            out.writeLong(up_flow);
            out.writeLong(d_flow);
            out.writeLong(s_flow);
            
        }
    
        
        //从数据流中反序列出对象的数据
        //从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
        @Override
        public void readFields(DataInput in) throws IOException {
    
            phoneNB = in.readUTF();
            up_flow = in.readLong();
            d_flow = in.readLong();
            s_flow = in.readLong();
            
        }
        
        
        @Override
        public String toString() {
    
            return "" + up_flow + "\t" +d_flow + "\t" + s_flow;
        }
    
        @Override
        public int compareTo(FlowBean o) {
            return s_flow>o.getS_flow()?-1:1;
        }
        
    
    }

     

    TopkURLMapper.java:

    package cn.itcast.hadoop.mr.llyy.topkurl;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import cn.itcast.hadoop.mr.flowsum.FlowBean;
    
    public class TopkURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    
        private FlowBean bean = new FlowBean();
        private Text k = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
    
            String line = value.toString();
    
            String[] fields = StringUtils.split(line, "\t");
            try {
                if (fields.length > 32 && StringUtils.isNotEmpty(fields[26])
                        && fields[26].startsWith("http")) {
                    String url = fields[26];
    
                    long up_flow = Long.parseLong(fields[30]);
                    long d_flow = Long.parseLong(fields[31]);
    
                    k.set(url);
                    bean.set("", up_flow, d_flow);
    
                    context.write(k, bean);
                }
            } catch (Exception e) {
    
                System.out.println();
    
            }
        }
    
    }

    TopkURLReducer.java:

    package cn.itcast.hadoop.mr.llyy.topkurl;
    
    import java.io.IOException;
    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.TreeMap;
    
    import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import cn.itcast.hadoop.mr.flowsum.FlowBean;
    
    public class TopkURLReducer extends Reducer<Text, FlowBean, Text, LongWritable>{
        private TreeMap<FlowBean,Text> treeMap = new TreeMap<>();
        private double globalCount = 0;
        
        
        // <url,{bean,bean,bean,.......}>
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,Context context)
                throws IOException, InterruptedException {
            Text url = new Text(key.toString());
            long up_sum = 0;
            long d_sum = 0;
            for(FlowBean bean : values){
                
                up_sum += bean.getUp_flow();
                d_sum += bean.getD_flow();
            }
            
            FlowBean bean = new FlowBean("", up_sum, d_sum);
            //每求得一条url的总流量,就累加到全局流量计数器中,等所有的记录处理完成后,globalCount中的值就是全局的流量总和
            globalCount += bean.getS_flow();
            treeMap.put(bean,url);
    
        }
        
        
        //cleanup方法是在reduer任务即将退出时被调用一次
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
    
            Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            double tempCount = 0;
            
            for(Entry<FlowBean, Text> ent: entrySet){
                
                if(tempCount / globalCount < 0.8){
                    
                    context.write(ent.getValue(), new LongWritable(ent.getKey().getS_flow()));
                    tempCount += ent.getKey().getS_flow();
                    
                }else{
                    return;
                }
                
                
            }
            
            
            
        }
        
    }

    TopkURLRunner.java:

    package cn.itcast.hadoop.mr.llyy.topkurl;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import cn.itcast.hadoop.mr.flowsum.FlowBean;
    
    public class TopkURLRunner extends Configured implements Tool{
    
        @Override
        public int run(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(TopkURLRunner.class);
            
            job.setMapperClass(TopkURLMapper.class);
            job.setReducerClass(TopkURLReducer.class);
            
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            return job.waitForCompletion(true)?0:1;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new TopkURLRunner(), args);
            System.exit(res);
            
        }
        
    }

    (二)将统计的URL导入到数据库中,这是URL规则库,一共就两个字段,URL和info说明,info是人工来实现,贴上标签。

    将上面的运行结果通过sqoop导入到数据库中,然后通过数据库读取再跑mapreduce程序。

    DBLoader.java:数据库的工具类。

    package cn.itcast.hadoop.mr.llyy.enhance;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.HashMap;
    
    public class DBLoader {
    
        public static void dbLoader(HashMap<String, String> ruleMap) {
    
            Connection conn = null;
            Statement st = null;
            ResultSet res = null;
            
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://weekend01:3306/urlcontentanalyse", "root", "root");
                st = conn.createStatement();
                res = st.executeQuery("select url,info from urlrule");
                while (res.next()) {
                    ruleMap.put(res.getString(1), res.getString(2));
                }
    
            } catch (Exception e) {
                e.printStackTrace();
                
            } finally {
                try{
                    if(res!=null){
                        res.close();
                    }
                    if(st!=null){
                        st.close();
                    }
                    if(conn!=null){
                        conn.close();
                    }
    
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
    
        }
        
        
        public static void main(String[] args) {
            DBLoader db = new DBLoader();
            HashMap<String, String> map = new HashMap<String,String>();
            db.dbLoader(map);
            System.out.println(map.size());
        }
    
    }

    LogEnhanceOutputFormat.java:默认是TextOutputFormat,这里我需要实现将不同的结果输到不同的文件中,而不是_SUCCESS中,所以我需要重写一个format。

    package cn.itcast.hadoop.mr.llyy.enhance;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class LogEnhanceOutputFormat<K, V> extends FileOutputFormat<K, V> {
    
        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataOutputStream enhancedOs = fs.create(new Path("/liuliang/output/enhancedLog"));
            FSDataOutputStream tocrawlOs = fs.create(new Path("/liuliang/output/tocrawl"));
            
            return new LogEnhanceRecordWriter<K, V>(enhancedOs,tocrawlOs);
        }
    
        
        public static class LogEnhanceRecordWriter<K, V> extends RecordWriter<K, V>{
            private FSDataOutputStream enhancedOs =null;
            private FSDataOutputStream tocrawlOs =null;
    
            public LogEnhanceRecordWriter(FSDataOutputStream enhancedOs,FSDataOutputStream tocrawlOs){
                
                this.enhancedOs = enhancedOs;
                this.tocrawlOs = tocrawlOs;
                
            }
            
            
            @Override
            public void write(K key, V value) throws IOException,
                    InterruptedException {
    
                if(key.toString().contains("tocrawl")){
                    tocrawlOs.write(key.toString().getBytes());
                }else{
                    enhancedOs.write(key.toString().getBytes());
                }
                
                
            }
    
            @Override
            public void close(TaskAttemptContext context) throws IOException,
                    InterruptedException {
    
                if(enhancedOs != null){
                    enhancedOs.close();
                }
                if(tocrawlOs != null){
                    tocrawlOs.close();
                }
            
                
                
            }
            
            
            
        }
        
        
    }

     

    然后再从所有原始日志中抽取URL,查询规则库,如果由info标签,则追加在原始日志后面。否则,这个URL就是带爬取URL,后面追加tocrawl,这两种不同情况要输出到不同文件中。

    LogEnhanceMapper.java:

    package cn.itcast.hadoop.mr.llyy.enhance;
    
    import java.io.IOException;
    import java.util.HashMap;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 
     * 
     * 读入原始日志数据,抽取其中的url,查询规则库,获得该url指向的网页内容的分析结果,追加到原始日志后
     * 
     * @author duanhaitao@itcast.cn
     * 
     */
    
    // 读入原始数据 (47个字段) 时间戳 ..... destip srcip ... url .. . get 200 ...
    // 抽取其中的url查询规则库得到众多的内容识别信息 网站类别,频道类别,主题词,关键词,影片名,主演,导演。。。。
    // 将分析结果追加到原始日志后面
    // context.write( 时间戳 ..... destip srcip ... url .. . get 200 ...
    // 网站类别,频道类别,主题词,关键词,影片名,主演,导演。。。。)
    // 如果某条url在规则库中查不到结果,则输出到带爬清单
    // context.write( url tocrawl)
    public class LogEnhanceMapper extends
            Mapper<LongWritable, Text, Text, NullWritable> {
    
        private HashMap<String, String> ruleMap = new HashMap<>();
    
        // setup方法是在mapper task 初始化时被调用一次
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            DBLoader.dbLoader(ruleMap);
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
    
            String line = value.toString();
    
            String[] fields = StringUtils.split(line, "\t");
            try {
                if (fields.length > 27 && StringUtils.isNotEmpty(fields[26])
                        && fields[26].startsWith("http")) {
                    String url = fields[26];
                    String info = ruleMap.get(url);
                    String result = "";
                    if (info != null) {
                        result = line + "\t" + info + "\n\r";
                        context.write(new Text(result), NullWritable.get());
                    } else {
                        result = url + "\t" + "tocrawl" + "\n\r";
                        context.write(new Text(result), NullWritable.get());
                    }
    
                } else {
                    return;
                }
            } catch (Exception e) {
                System.out.println("exception occured in mapper.....");
            }
        }
    
    }

    LogEnhanceRunner.java:

    package cn.itcast.hadoop.mr.llyy.enhance;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class LogEnhanceRunner extends Configured implements Tool{
    
        @Override
        public int run(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(LogEnhanceRunner.class);
            
            job.setMapperClass(LogEnhanceMapper.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            job.setOutputFormatClass(LogEnhanceOutputFormat.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            return job.waitForCompletion(true)?0:1;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new LogEnhanceRunner(),args);
            System.exit(res);
        }
        
        
    }

    这里不写reduce也行。

     

    MapReduce实现Top K问题:https://blog.csdn.net/u011750989/article/details/11482805?locationNum=5

    转载于:https://www.cnblogs.com/DarrenChan/p/6640983.html

    展开全文
  • Hadoop项目实战:新闻离线项目分析 大数据高级架构师,多年大数据项目架构...

    扫码下载「CSDN程序员学院APP」,1000+技术好课免费看

    APP订阅课程,领取优惠,最少立减5元 ↓↓↓

    订阅后:请点击此处观看视频课程

     

    视频教程-Hadoop项目实战:新闻离线项目分析-Hadoop

    学习有效期:永久观看

    学习时长:1544分钟

    学习计划:26天

    难度:

     

    口碑讲师带队学习,让你的问题不过夜」

    讲师姓名:易水

    架构师

    讲师介绍:大数据高级架构师,多年大数据项目架构及研发经验,同时有丰富的授课经验。致力于打造大数据精品课程,让每位学习大数据的同学,学的起、学得会、学的好。

    ☛点击立即跟老师学习☚

     

    「你将学到什么?」

    本课程为项目实战课,项目各个环节既深入讲解理论知识,又结合项目业务进行实操,从而达到一站式掌握大数据离线项目。

     

    「课程学习目录」

    第1章:大数据平台集群节点准备
    1.虚拟机介绍
    2.VMware虚拟机安装
    3.Linux操作系统介绍
    4.虚拟机安装1
    5.虚拟机安装2
    6.Linux网络配置
    7.Linux静态IP配置
    8.Linux虚拟机克隆
    9.X-Shell集群连接工具使用
    10.Linux创建用户和用户组
    11.Linux主机名配置
    12.Linux防火墙关闭
    13.openssh-clients服务安装
    14.配置主机名与IP地址映射
    15.SSH免密码登录
    16.FileZilla集群上传与下载工具的安装与使用
    17.集群节点2完整配置
    18.集群节点3完整配置
    第2章:项目需求分析
    1.项目需求与分析
    2.项目架构设计
    3.项目离线计算与实时计算数据流程设计
    4.大数据集群平台角色规划
    第3章:Zookeeper分布式集群安装部署
    1.Zookeeper生态圈
    2.Zookeeper系统架构原理
    3.Zookeeper如何提供服务
    4.Zookeeper集群规划
    5.Zookeeper集群时钟同步
    6.Zookeeper集群Hosts文件配置
    7.Zookeeper集群SSH免密码登录
    8.Zookeeper 集群配置文件编写
    9.集群分发脚本deploy.sh编写
    10.集群分发脚本deploy.sh调试
    11.集群远程命令执行脚本runRemoteCmd.sh编写
    12.Zookeeper集群JDK安装
    13.Zookeeper集群安装部署
    14.Zookeeper Shell测试运行
    第4章:Hadoop(CDH5)分布式集群安装部署
    1.HDFS概述
    2.HDFS体系结构
    3.HDFS高可用原理
    4.YARN概述
    5.YARN系统架构
    6.MapReduce ON YARN
    7.YARN高可用
    8.HDFS安装配置
    9.HDFS服务启动
    10.HDFS测试运行
    11.YARN安装配置
    12.YARN服务启动bug调试
    13.YARN测试运行
    14.Hadoop官网文档使用详解
    第5章:Eclipse与MapReduce集成开发
    1.MapReduce概述
    2.MapReduce编程模型
    3.Wordcount完整流程分析
    4.本地JDK安装配置
    5.Eclipse下载安装1
    6.Eclipse下载安装2
    7.本地maven安装配置
    8.Eclipse配置maven
    9.Eclipse构建maven项目
    10.Eclipse开发MapReduce程序1
    11.Eclipse开发MapReduce程序2
    12.Eclipse本地调试运行MapReduce1
    13.Eclipse本地调试运行MapReduce2
    14.Eclipse项目打包的两种方式
    15.MapReduce提交YARN集群运行
    16.maven管理多个MapReduce程序1
    17.maven管理多个MapReduce程序2
    18.MapReduce调试、打包及部署运行
    第6章:HBase分布式集群安装部署
    1.HBase数据模型
    2.HBase物理模型
    3.HBase系统架构组成
    4.HBase实际应用案例
    5.HBase安装前须知
    6.HBase集群规划
    7.HBase版本选择与下载解压
    8.HBase配置文件修改
    9.HBase启动运行
    10.HBase master切换访问
    11.HBase 数据库shell测试运行
    12.HBase业务建模
    第7章:Kafka1.x分布式集群安装部署
    1.Kafka定义及应用场景
    2.Kafka设计目标与特点
    3.Kafka系统架构组成
    4.Kafka拓扑结构
    5.Kafka版本兼容性与选择
    6.Kafka集群安装配置
    7.Kafka在Zookeeper元数据解读
    8.Kafka 新api测试运行
    9.Kafka旧api测试运行
    10.Kafka集群监控与kafkamonitor.sh脚本编写
    11.Kafka 监控可视化指标分析
    12.Kafka监控注意事项
    第8章:大数据项目数据采集1
    1.Flume概述
    2.Flume系统架构
    3.Flume安装部署1
    4.Flume安装部署2
    5.Flume集群构建-编写集群配置文件
    6.Flume集群构建-数据聚合测试运行
    7.大数据项目数据格式分析
    8.项目数据预处理
    9.KafkaSink配置详解
    10.编写Flume与Kafka集成配置文件
    11.Flume单节点与Kafka集成测试运行
    12.Flume集群与Kafka集成测试运行
    13.HBaseSink配置详解
    14.Flume与HBase最简集成1
    15.Flume与HBase最简集成2
    16.HBase模型建立及与Flume集成配置文件编写
    17.Eclipse导入Flume源码
    18.Eclipse导入Flume源码错误调试
    19.Flume HBaseSink源码分析
    20.Flume HBaseSink源码修改
    21.Flume HBaseSink增加调试代码
    22.Flume HBaseSink源码打包及配置文件修改
    23.Flume HBaseSink二次开发源码部署
    24.Flume与HBase集成项目测试运行
    25.大数据项目离线和实时数据流程详解
    第9章:大数据项目数据采集2
    1.Flume Channel选择器原理
    2.Flume Channel选择器配置详解
    3.Flume单节点集成Kafka与HBase配置编写
    4.Flume单节点集成Kafka与HBase测试运行
    5.Flume集群集成Kafka与HBase配置文件编写
    6.Flume集群集成Kafka与HBase测试运行
    7.应用服务器模拟程序开发
    8.模拟程序两种打包方式及脚本相关目录创建
    9.模拟程序服务器测试运行
    10.模拟程序shell脚本开发
    11.Flume聚合启动脚本开发
    12.Flume采集脚本开发
    13.Flume关闭脚本开发
    14.Kafka消费者shell脚本开发
    15.大数据集群服务启动1
    16.大数据集群启动2及测试准备工作
    17.数据产生、采集、存储联调1
    18.数据产生、采集、存储联调2
    19.数据产生、采集、存储联调3
    第10章:大数据项目离线分析
    1.Hive概述
    2.Hive体系结构及原理
    3.Hive Metastore安装方式及部署架构
    4.Mysql元数据库在线安装
    5.Hive安装配置
    6.Hive服务启动与测试
    7.Hive图像界面安装配置
    8.Hive Web服务启动运行
    9.HiveServer与Hive Server2区别与联系
    10.JDBC连接HiveServer2测试1
    11.JDBC连接HiveServer2测试2
    12.beeline连接HiveServer2测试运行
    13.编写Hive与HBase集成配置文件
    14.Hive创建外部业务表1
    15.Hive创建外部业务表2
    16.大数据项目数据分析测试
    17.大数据项目离线分析1
    18.大数据项目离线分析2
    第11章:大数据项目数据可视化
    1.Hue安装配置1
    2.Hue安装配置2
    3.Hue安装配置3
    4.Hue与HDFS集成开发
    5.Hue与YARN集成开发
    6.Hue与Hive集成开发
    7.Hue与HBase集成开发
    8.Hue与MySQL集成开发
    9.大数据项目可视化分析1
    10.大数据项目可视化分析2

     

    7项超值权益,保障学习质量」

    • 大咖讲解

    技术专家系统讲解传授编程思路与实战。

    • 答疑服务

    专属社群随时沟通与讲师答疑,扫清学习障碍,自学编程不再难。

    • 课程资料+课件

    超实用资料,覆盖核心知识,关键编程技能,方便练习巩固。(部分讲师考虑到版权问题,暂未上传附件,敬请谅解)

    • 常用开发实战

    企业常见开发实战案例,带你掌握Python在工作中的不同运用场景。

    • 大牛技术大会视频

    2019Python开发者大会视频免费观看,送你一个近距离感受互联网大佬的机会。

    • APP+PC随时随地学习

    满足不同场景,开发编程语言系统学习需求,不受空间、地域限制。

     

    「什么样的技术人适合学习?」

    • 想进入互联网技术行业,但是面对多门编程语言不知如何选择,0基础的你
    • 掌握开发、编程技术单一、冷门,迫切希望能够转型的你
    • 想进入大厂,但是编程经验不够丰富,没有竞争力,程序员找工作难。

     

    「悉心打造精品好课,26天学到大牛3年项目经验」

    【完善的技术体系】

    技术成长循序渐进,帮助用户轻松掌握

    掌握Hadoop知识,扎实编码能力

    【清晰的课程脉络】

    浓缩大牛多年经验,全方位构建出系统化的技术知识脉络,同时注重实战操作。

    【仿佛在大厂实习般的课程设计】

    课程内容全面提升技术能力,系统学习大厂技术方法论,可复用在日后工作中。

     

    「你可以收获什么?」

    掌握大数据离线项目常用技术组件

    掌握大数据离线项目开发流程

     

    展开全文
  • Hadoop项目实战:新闻离线项目分析

    千人学习 2019-01-13 21:50:13
    本课程为项目实战课,项目各个环节既深入讲解理论知识,又结合项目业务进行实操,从而达到一站式掌握大数据离线项目。
  • Hadoop项目实战之多类型输入 1.背景 在日常的需求中,可能遇到的问题是:如果MapReduce job的任务需要的数据源在不同的文件中怎么办?例如:需要从文本文件user.txt中得到一批数据;从另外一个文本文件transactions....

    Hadoop项目实战之多类型输入

    1.背景

    在日常的需求中,可能遇到的问题是:如果MapReduce job的任务需要的数据源在不同的文件中怎么办?例如:需要从文本文件user.txt中得到一批数据;从另外一个文本文件transactions.txt(可以不是文本,也可以是数据库等数据源)中得到另一批数据。分别将这两批数据作为mapper的数据源,那么该怎么实现呢?
    Hadoop也是一个成熟的项目了(和如下的FCB一样,也应该学会自己合并Mapper的输出了)。
    在这里插入图片描述

    很高兴,Hadoop 已经学会了自己合并mapper输出了。它有一个类MultipleInputs,表示的是:多类型输入,可以详见我的博客:MultipleInputs详解
    下面给出一个示例,用于演示如何使用Mulitple

    2.示例

    2.1 需求

    现在有两个文本文件,分别是user.txttransactions.txt,虽然都是文本文件,但是它们的内容格式并不相同。

    • users.txt中存储的信息格式是<user_id, location_id>
    • transactions.txt中存储的信息格式是<timestamp product user_id price number>
      先需要提取出 user_id 对应的location_idproduct_id。 例如,针对如下的数据:
    [root@server4 hadoop]# hdfs dfs -cat /input/users.txt
    u1 UT
    u1 GA
    u3 CA
    u3 CA
    u5 GA
    [root@server4 hadoop]# hdfs dfs -cat /input/transactions.txt
    t1 p3 u1 3 330
    t2 p1 u2 1 400
    t3 p1 u1 3 600
    t4 p2 u2 10 1000
    t5 p4 u4 9 90
    t6 p1 u1 4 120
    t7 p4 u1 8 160
    t8 p4 u5 2 40
    

    我们想要的结果是:

    [root@server4 hadoop]# hdfs dfs -cat /output/leftJoin/part-r-00000
    u1	UT,p4,p1,p1,p3,
    u2	GA,p2,p1,
    u3	CA,
    u4	CA,p4,
    u5	GA,p4,
    
    • 如果两者的文件类型,及格式都一样的话,那么我们就用不着使用MultipleInputs了,因为setMapInput()就可以解决。但是如果 文件类型和格式并不完全相同,那么就需要使用MultipleInputs类。

    3.代码

    3.1 LeftJoinDriver
    package data_algorithm.chapter_4;
    
    import data_algorithm.utils.HdfsUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    /**
     * 1.MapReduce 1的实现目标是:将所有的user_id相同的 user 信息,已经卖出的product信息放在一起,并且是先放user的信息,再放product的信息
     * 例如,对于数据记录:
     * user_id ,location
     * 1       ,shanghai
     *
     * user_id ,product_id
     * 1       ,disk
     * 1       ,mouse
     * 那么期望得到的结果是:
     * user_id,location, list[product_id]
     * 1      ,shanghai, [disk,mouse]
     *
     */
    public class LeftJoinDriver extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            if (args.length != 3) {
                System.exit(1);
            }
            HdfsUtils.deletePath(args[2]);
            int returnStatus = ToolRunner.run(new LeftJoinDriver(), args);
            System.exit(returnStatus);
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(LeftJoinDriver.class);
    
            Path users = new Path(args[0]);
            Path transactions = new Path(args[1]);
    
            //利用MultipleInputs类可以使用多个Mapper 类;但是如果只有单个Mapper类,那么只需要使用Job.setMapperClass()即可
            MultipleInputs.addInputPath(job,
                    users,
                    TextInputFormat.class,
                    UserMapper.class);
    
            MultipleInputs.addInputPath(job,
                    transactions,
                    TextInputFormat.class,
                    TransactionMapper.class);
    
            //set output file path
            FileOutputFormat.setOutputPath(job,new Path(args[2]));
    
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(Text.class);
            job.setPartitionerClass(UserPartitioner.class);
    
            job.setGroupingComparatorClass(SecondarySortGroupComparator.class);
            job.setReducerClass(UserReducer.class);
            //job.setNumReduceTasks(3);
            boolean status = job.waitForCompletion(true);
            return status? 0: 1;
        }
    }
    
    3.2 UserMapper
    package data_algorithm.chapter_4;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * 1.用于对用户信息的Mapper
     */
    public class UserMapper extends Mapper<LongWritable,Text,User,Text>{
    
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line [] = value.toString().split(" ");
            String user_id = line[0];
    
            String user_loc = line[1];
            //写入一个user对象 -> 其中product_name 字段为空
            User user = new User(user_id,1,user_loc,"empty");
    
            //输出的格式是:user user_loc
            context.write(user,new Text(user_loc));
            //System.out.println("UserMapper End...");
        }
    }
    
    
    3.3 TransactionMapper
    package data_algorithm.chapter_4;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * 1.用于对用户信息的Mapper
     */
    public class TransactionMapper extends Mapper<LongWritable,Text,User,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line [] = value.toString().split(" ");
            String user_prd = line[1];
            String user_id = line[2];
    
            //构建一个user对象,其中location为空
            User user = new User(user_id,2,"empty",user_prd);
            //输出的格式是 user user_prd
            context.write(user, new Text(user_prd));
        }
    }
    
    
    3.4 UserPartitioner
    package data_algorithm.chapter_4;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class UserPartitioner extends Partitioner<User,Text> {
            //这里的numPartitions 是什么值?在哪里传参?
            public int getPartition(User user, Text text, int numPartitions) {
                return Math.abs(user.getUser_id().hashCode() % numPartitions);
            }
    }
    
    3.5 SecondarySortGroupComparator
    package data_algorithm.chapter_4;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class SecondarySortGroupComparator extends WritableComparator {
    
        //这个构造参数一定要实现,否则会报错
        public SecondarySortGroupComparator() {
            super(User.class,true);
        }   
        @Override
        public int compare(WritableComparable wc1, WritableComparable wc2) {
            User user1 = (User)wc1;
            User user2 = (User)wc2;
    
            return user1.getUser_id().compareTo(user2.getUser_id());
        }
    }
    
    
    3.6 UserReducer
    package data_algorithm.chapter_4;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class UserReducer extends Reducer<User,Text,Text,Text> {
        @Override
        protected void reduce(User key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text tx : values) {
                sb.append(tx.toString()) .append( ",");
            }
            System.out.println("key:"+key+",value: "+sb.toString());
            context.write(new Text(key.getUser_id()),new Text(sb.toString()));
        }
    }
    
    
    3.7 User
    package data_algorithm.chapter_4;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class User implements Writable, WritableComparable<User>{
        private String user_id;
        private int level;//the level of 1 or 2
        private String location;//user location
        private String product_name;//the product name of user buy
    
        //为什么需要添加无参的构造函数
        public User() {
        }
    
        public User(String user_id, int level, String location, String product_name) {
            this.user_id = user_id;
            this.level = level;
            this.location = location;
            this.product_name = product_name;
        }
    
        public String getUser_id() {
            return user_id;
        }
    
        public void setUser_id(String user_id) {
            this.user_id = user_id;
        }
    
        public String getLocation() {
            return location;
        }
    
        public void setLocation(String location) {
            this.location = location;
        }
    
        public String getProduct_name() {
            return product_name;
        }
    
        public void setProduct_name(String product_name) {
            this.product_name = product_name;
        }
    
        public int getLevel() {
            return level;
        }
    
        public void setLevel(int level) {
            this.level = level;
        }
    
        public int compareTo(User user) {
            int compareValue = this.getUser_id().compareTo(user.getUser_id());
            if (compareValue == 0) {
                return this.getLevel() - user.getLevel();  //先出现 location, 再出现 product
                //return user.getLevel() - this.getLevel(); //先出现product , 再出现 location
            }
            return compareValue;
        }
    
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.user_id);
            out.writeInt(this.level);
            out.writeUTF(this.location);
            out.writeUTF(this.product_name);
        }
    
        public void readFields(DataInput in) throws IOException {
            this.user_id = in.readUTF();
            this.level = in.readInt();
            this.location = in.readUTF();
            this.product_name = in.readUTF();
        }
    
        @Override
        public int hashCode() {
            int result = this.getUser_id() != null ? this.getUser_id().hashCode() : 0;
            //result = 31 * result + level ;
            //不能使用这个+ level 值的,否则会将相同user_id 的分到不同的区?
            if (this.getUser_id().equals("u1")) {
                System.out.println("u1");
                return 1;
            }
            if (this.getUser_id().equals("u2")) {
                System.out.println("u2");
                return 2;
            }
            return result;
        }
    
        @Override
        public String toString() {
            return this.getUser_id();
        }
    }
    

    4.运行结果

    [root@server4 hadoop]# hdfs dfs -cat /output/leftJoin/part-r-00000
    u1	UT,p4,p1,p1,p3,
    u2	GA,p2,p1,
    u3	CA,
    u4	CA,p4,
    u5	GA,p4,
    

    5.注意事项

    有如下几点需要注意的:

    • 如果是自定义的Key,Value,必须手动实现Writable接口,这个类是Hadoop中十分关键的类,用于序列化;例如上述的User类,就实现了Writable
    • 如果想让自定义的Key,Value用排序的功能,则必须实现WritableComparable接口,因为这个接口中有compareTo()方法,用于排序。

    6. 常见报错

    开发这个项目的过程中,遇见了如下几个问题:

    6.1 hadoop java.lang.RuntimeException: java.lang.NoSuchMethodException
    • 报错
    hadoop java.lang.RuntimeException: java.lang.NoSuchMethodException
    
    • 报错原因
      报这个错,是因为实现Writable的类缺少无参构造器。【实现 Writable 的类必须要有自己的实现类】
    6.2
    • 报错
    java.lang.RuntimeException: java.io.EOFException
    	at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:164) ~[hadoop-common-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1265) ~[hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:74) ~[hadoop-common-2.6.4.jar:na]
    	at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63) ~[hadoop-common-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1593) ~[hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482) ~[hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720) ~[hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2012) [hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:794) [hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) [hadoop-mapreduce-client-core-2.6.4.jar:na]
    	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) [hadoop-mapreduce-client-common-2.6.5.jar:na]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_77]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
    	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
    Caused by: java.io.EOFException: null
    	at java.io.DataInputStream.readFully(DataInputStream.java:197) ~[na:1.8.0_77]
    	at java.io.DataInputStream.readUTF(DataInputStream.java:609) ~[na:1.8.0_77]
    	at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[na:1.8.0_77]
    	at data_algorithm.chapter_4.User.readFields(User.java:79) ~[classes/:na]
    	at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:158) ~[hadoop-common-2.6.4.jar:na]
    	... 15 common frames omitted
    
    • 报错原因
      在实现WritableComparator接口的类中,丢失了一个构造器,导致无法继续运行。
    public class SecondarySortGroupComparator extends WritableComparator {
        
        //这个构造参数一定要实现,否则会报错
        public SecondarySortGroupComparator() {
            super(User.class,true);
        }
    	····
    }
    

    7.

    在整个开发过程中,得到的结果也并非是一帆风顺的。如果将SecondarySortGroupComparator类中的compare()方法改成如下的样子:

        @Override
        public int compare(WritableComparable wc1, WritableComparable wc2) {        
            return 1;
        }
    

    那么得到的结果就是:

    [root@server4 hadoop]# hdfs dfs -cat hdfs://server4:9000/output/leftJoin/part-r-00000
    u1	UT,
    u2	GA,
    u3	CA,
    u4	CA,
    u5	GA,
    u2	p2,
    u2	p1,
    u4	p4,
    u1	p4,
    u1	p1,
    u1	p1,
    u1	p3,
    u5	p4,
    

    这是因为每个 user id 都作为了一组。所以即使是 user_id 相同的数据也没能放在一起;同理如果将其方法修改成如下的样子:

        @Override
        public int compare(WritableComparable wc1, WritableComparable wc2) {        
            return 0;
        }
    

    得到的输出结果如下:

    [root@server4 mapreduce]# hdfs dfs -cat  /output/leftJoin/part-r-00000
    u5	UT,GA,CA,CA,GA,p2,p1,p4,p4,p1,p1,p3,p4,
    [root@server4 mapreduce]# 
    

    可以看到这里的结果是将所有的 user_id 都分在了一组,导致输出的时候只有一个user_id,然后一堆value
    compareTo()方法正确的形式应该是:

    public int compare(WritableComparable wc1, WritableComparable wc2) {
            User user1 = (User)wc1;
            User user2 = (User)wc2;
            return user1.getUser_id().compareTo(user2.getUser_id());
        }
    

    这个代表的意思是:使用 user_id 作为比较项,如果 user_id相同,则分成一组,否则是不同组。如果是同组的,那么在最后reduce中处理的时候,就能够得到<key,list<value>>这种形式。

    展开全文
  • Hadoop项目实战-用户行为分析之编码实践 1.概述  本课程的视频教程地址:《用户行为分析之编码实践》  本课程以用户行为分析案例为基础,带着大家去完成对各个KPI的编码工作,以及应用调度工作,让...

    1.概述

      本课程的视频教程地址:《用户行为分析之编码实践

      本课程以用户行为分析案例为基础,带着大家去完成对各个KPI的编码工作,以及应用调度工作,让大家通过本课程掌握Hadoop项目的编码、调度流程。下面我们来看看本课程有哪些课时,如下图所示:

      首先,我们来学习第一课时:《Hadoop项目基础代码》。

    2.内容

    2.1 Hadoop项目基础代码

      本课时介绍编写Hadoop基础代码及脚本,在前面搭建好的Hadoop项目工程上, 完成项目的基本代码的编写,以及一些注意事项,为编写核心代码做准备,让大家掌握Hadoop项目的基础代码开发。

      本课时主要包含以下知识点,如下图所示:

      下面,我为大家介绍Hadoop项目工程的基本信息配置,由于本课程属于编码实践,所以设计到编码的相关流程。 所以大家在学习的时候,请事先将集群启动,IDE打开。下面我给大家演示如下内容,他们分别是:

    • 项目工程的相关配置信息(pom文件的依赖配置,log日志信息的配置)
    • 集群信息的相关配置(连接集群节点路径信息) 

      基础代码实现包含的内容,如下图所示:

      具体演示细节,大家可以观看视频,这里我就不多做赘述了。《观看地址

    2.2 Hadoop项目核心地址实现

      本课时介绍如何去实现Hadoop的核心代码模块, 在基础代码模块上,完成核心代码的实现,让大家掌握项目相关指标的统计开发。

      下面我们来看看本课时有那些知识点,如下图所示:

      下面我们来看看离线结果统计的处理方式有哪些,这里,我用一个图来说明,在离线统计中的统计方式,如下图所示: 

      这里,从图中我们可以看出,我们可以使用编写Hive脚本或Hive应用程序来统计, 也可以编写MapReduce程序来完成统计,也可以组合使用,这里,本课程的案例, 我使用的是组合使用,用Hive和MapReduce组合来完成。

      接着来看核心代码实现的内容,如下图所示:

      脚本如下所示:

    #创建分区
    CREATE EXTERNAL TABLE ubas(ip string, timespan string, url string,hour string)PARTITIONED BY (logdate string) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/home/hdfs/ubas/out/meta'

      统计的KPI脚本,如下所示:

    复制代码
    # clean hdfs data and output 
    /home/hadoop/hadoop-2.6.0/bin/hadoop jar ubas-1.0.0-jar-with-dependencies.jar $1
    
    # use hive to stats
    
    ## 1.location data to partition
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "ALTER TABLE ubas ADD PARTITION(logdate='$1') LOCATION '/home/hdfs/ubas/out/meta/$1';"
    
    ## 2.stats pv
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "CREATE TABLE pv_$1 AS SELECT COUNT(1) AS PV FROM ubas WHERE logdate='$1';"
    
    ## 3.stats ip
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "CREATE TABLE ip_$1 AS SELECT COUNT(DISTINCT ip) AS IP FROM ubas WHERE logdate='$1';"
    
    ## 4.stats amount hour
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "CREATE TABLE amount_$1 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' AS SELECT '$1',hour AS HOUR_TAG, COUNT(hour) AS HOUR,'' AS UPDATE_DATE FROM ubas WHERE logdate='$1' GROUP BY hour;"
    
    ## 5.stats jr
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "CREATE TABLE jr_$1 AS SELECT COUNT(1) AS JR FROM (SELECT COUNT(ip) AS times FROM ubas WHERE logdate='$1' GROUP BY ip HAVING times=1) e;"
    
    ## 6.combine pv,ip,jr and tr to ubas table
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "CREATE TABLE ubas_$1 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' AS SELECT '$1', a.pv, b.ip, c.jr, ROUND(COALESCE(CAST(b.ip AS DOUBLE), 0)/a.pv, 2),'' AS UPDATE_DATE FROM pv_$1 a JOIN ip_$1 b ON 1=1 JOIN jr_$1 c ON 1=1 ;"
    
    # sqoop data to mysql
    
    ## 1.sqoop t_kpi_day
    /home/hadoop/sqoop-1.4.5/bin/sqoop export -D sqoop.export.records.per.statement=100 --connect jdbc:mysql://10.211.55.26:3306/hadoop_ubas --username root --password root --table t_kpi_day --fields-terminated-by ',' --export-dir "/home/hive/warehouse/ubas_$1" --batch --update-key createdate --update-mode allowinsert;
    
    ## 2.sqoop t_kpi_hour
    /home/hadoop/sqoop-1.4.5/bin/sqoop export -D sqoop.export.records.per.statement=100 --connect jdbc:mysql://10.211.55.26:3306/hadoop_ubas --username root --password root --table t_kpi_hour --fields-terminated-by ',' --export-dir "/home/hive/warehouse/amount_$1" --batch --update-key createdate,kpi_code --update-mode allowinsert;
    
    # drop tmp table to hive warehouse
    /home/hadoop/hive-0.14.0-bin/bin/hive -e "drop table amount_$1;drop table ip_$1;drop table jr_$1;drop table pv_$1;drop table ubas_$1;"
    复制代码

    2.3 统计结果处理

      本课时介绍将统计好的数据导出到关系型数据库,以及对外提供数据共享接口,让大家掌握导出数据的流程及共享接口程序的编写。

       本课时主要有一下知识点,如下图所示:

      下面我们来看看使用 Sqoop 如何将 HDFS 上的统计结果导出到 MySQL 数据库, 接下来,我们来看看 Sqoop 的导出流程,如下图所示:

      首先,我们是将统计结果存放在 HDFS 集群上,然后我们使用 Sqoop 工具去将 HDFS 的数据导出到关系型数据库,如 MySQL 整个基本流程就是这样。下面我们来使用 Sqoop 工具对HDFS 上的数据进行导出,同样,在使用导出功能时,这样大家需要 安装 Sqoop 工具,Sqoop 的安装较为简单,大家可以下去补充学习下,这里就不多做赘述了。

      接下来,我们来看看数据共享流程,如下图所示:

      从图中我们可以看出,我们将统计后的结果存放在mysql数据库中,这时我们需要编写一个rpc将数据共享出去,这里我采用的共享方式是, 编写一个thrift的服务接口,将数据通过这个接口共享出去,然后,前端同学获取数据后,可以将数据结果以图表的方式进行展示。

      Thrift接口代码,如下所示:

    • Thrift接口文件
    namespace java cn.jikexueyuan.ubas.service
    
    service UBASService {
        map<string, double> queryDayKPI(1:string beginDate,2:string endDate),
        map<double, double> queryHourKPI(1:string beginDate,2:string endDate)
    }
    • Server模块代码
    复制代码
    package cn.jikexueyuan.ubas.main;
    
    import org.apache.thrift.TProcessorFactory;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.server.THsHaServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import cn.jikexueyuan.ubas.service.UBASService;
    import cn.jikexueyuan.ubas.service.impl.UBASServiceImpl;
    
    /**
     * @Date Mar 23, 2015
     *
     * @Author dengjie
     */
    public class StatsServer {
    
        private static Logger logger = LoggerFactory.getLogger(StatsServer.class);
    
        private final int PORT = 9090;
    
        @SuppressWarnings({ "rawtypes", "unchecked" })
        private void start() {
            try {
                TNonblockingServerSocket socket = new TNonblockingServerSocket(PORT);
                final UBASService.Processor processor = new UBASService.Processor(new UBASServiceImpl());
                THsHaServer.Args arg = new THsHaServer.Args(socket);
                /*
                 * Binary coded format efficient, intensive data transmission, The
                 * use of non blocking mode of transmission, according to the size
                 * of the block, similar to the Java of NIO
                 */
                arg.protocolFactory(new TCompactProtocol.Factory());
                arg.transportFactory(new TFramedTransport.Factory());
                arg.processorFactory(new TProcessorFactory(processor));
                TServer server = new THsHaServer(arg);
                server.serve();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            try {
                logger.info("start thrift server...");
                StatsServer stats = new StatsServer();
                stats.start();
            } catch (Exception ex) {
                ex.printStackTrace();
                logger.error(String.format("run thrift server has error,msg is %s", ex.getMessage()));
            }
        }
    
    }
    复制代码

    2.4 应用调度

      本课时介绍将开发的Hadoop应用打包部署到服务器,配置并完成应用调度, 让大家掌握Hadoop项目的打包和部署及调度流程。

      本课时主要包含一下知识点,如下图所示:

      下面,我们来看看项目打包插件的使用,首先打包的内容,下面我们来看一张图,如下图所示:

      关于使用Crontab进行定时调度,详情大家可以观看视频教程,这里我就不多做赘述了。《观看地址

      本课程我们对项目的指标进行了编码实践,并指导大家去编码实现相应的模块功能,以及帮助大家去调度我们开发的应用等知识,应该掌握一下知识,如下图所示:

    3.总结

      我们在有了这些知识作为基础,会使得我们在今后的工作中,开发类似的Hadoop项目变得游刃有余,更加的得心应手。

    4.结束语

      这就是本课程的主要内容,主要就对Hadoop项目做相应的编码实践,完成各个KPI的开发模块。

      如果本教程能帮助到您,希望您能点击进去观看一下,谢谢您的支持!

    posted on 2017-05-09 15:29 赵大海 阅读(...) 评论(...) 编辑 收藏

    转载于:https://www.cnblogs.com/zhaodahai/p/6830797.html

    展开全文
  • Hadoop项目实战-用户行为分析之分析与设计 http://www.cnblogs.com/smartloli/p/4569882.html 1.概述  本课程的视频教程地址:《用户行为分析之分析与设计》  下面开始本教程的学习,本教程以用户行为分析案例...
  • Hadoop项目实战-用户行为分析之分析与设计 http://www.cnblogs.com/smartloli/p/4569882.html 1.概述  本课程的视频教程地址:《用户行为分析之分析与设计》  下面开始本教程的学习,本教程以用户行为分析...
  • 大数据日志分析Hadoop项目实战

    千次阅读 2019-03-27 15:53:59
    教程目录0x00 教程内容0x01 大数据日志分析系统简介1. 需求2. 背景及架构0x02 UserAgentParser1.... 安装对应的jar包到本地Maven仓库0x03 项目实战1. 构建项目2. 引入依赖3. 编写测试代码4. 编写实战代码0x03 ...
  • Hadoop大数据解决方案进阶应用 Hadoop 讲师迪伦北风网版权所有 YARN支持的计算框架(3) Storm On YARN服务 Apache Tez介绍 Tez实现原理 课程目标 YARN-Storm ApplicationMaster Storm ApplicationMaster初始化时将在...
  • Hadoop大数据解决方案进阶应用 Hadoop 讲师迪伦北风网版权所有 Mahout数据挖掘工具 (1) Mahout简介 数据挖掘基础 课程目标 Mahout 数据金字塔 数据统计分析 数据挖掘 数据挖掘是从大量数据中提取或挖掘知识 数据挖掘...
  • CDH4.1实战系列 讲师Cloudy北风网版权所有) 1Cloudera Manager安装和环境熟悉 软件版本介绍 CentOS 版本要求 v5.7 - v6.2 之间 64位系统 本课用CentOS-6.0-x86_64 版本 Cloudera Manager 4.1 JDK 1.6 Cloudera ...
  • 【快速入门大数据】Hadoop项目实战-用户行为日志

    多人点赞 热门讨论 2021-01-18 22:32:04
    文章目录 用户日志 用处 日志生成渠道 日志内容 意义 离线数据处理架构 分析日志 引入解析UserAgent 单体实现 hadoop-MapReduce实现 效果图 总结 用户日志 用处 分析行为 推荐 日志生成渠道 服务端Ngnix统计 前端...
  • Hadoop之MapReduce详解【待更新】 一个MapReduce作业的运行周期是: (1)先在client端被提交到JobTracker上 (2)然后由JobTracker将作业分解成若干个Task,并将这些Task进行调度和监控,以保障这些程序...
  • * *

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,011
精华内容 404
关键字:

hadoop项目实战