2017-03-25 16:59:39 u012318074 阅读数 1365
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35084 人正在学习 去看看 张长志

本文为《Spark大型电商项目实战》 系列文章之一,主要介绍session随机抽取模块中的获取抽取session明细数据的实现过程。

获取session明细数据

在包com.erik.sparkproject.spark.session下的类UserVisitSessionAnalyzeSpark.java中,
JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);下方添加:

        JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);

然后将
randomExtractSession(task.getTaskid(),filteredSessionid2AggrInfoRDD);
改为
randomExtractSession(task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2actionRDD);

接着添加抽取session明细的方法

/**
         * 第四步:获取抽取出来的session的明细数据
         */
        JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD = 
                extractSessionidsRDD.join(sessionid2actionRDD);
        extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {


            private static final long serialVersionUID = 1L;

            public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
                //在包com.erik.sparkproject.domain中新建SessionDetail.java
                //在包com.erik.sparkproject.dao中新建ISessionDetailDAO.java接口
                //在包com.erik.sparkproject.impl中新建SessionDetailDAOImpl.java
                //在DAOFactory.java中添加
                //public static ISessionDetailDAO getSessionDetailDAO() {
                //return new SessionDetailDAOImpl();
                //}
                Row row = tuple._2._2;

                //封装sessionDetail的domain
                SessionDetail sessionDetail = new SessionDetail();
                sessionDetail.setTaskid(taskid);
                sessionDetail.setUserid(row.getLong(1));
                sessionDetail.setSessionid(row.getString(2));
                sessionDetail.setPageid(row.getLong(3));
                sessionDetail.setActionTime(row.getString(4));
                sessionDetail.setSearchKeyword(row.getString(5));
                sessionDetail.setClickCategoryId(row.getLong(6));
                sessionDetail.setClickProductId(row.getLong(7));
                sessionDetail.setOrderCategoryIds(row.getString(8));
                sessionDetail.setOrderProductIds(row.getString(9));
                sessionDetail.setPayCategoryIds(row.getString(10));
                sessionDetail.setPayProductIds(row.getString(11));

                ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
                sessionDetailDAO.insert(sessionDetail);
            }

        });

这里需要将

private static void randomExtractSession(
            final long taskid,
            JavaPairRDD<String, String> sessionid2AggrInfoRDD)

改为

private static void randomExtractSession(
            final long taskid,
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            JavaPairRDD<String, Row> sessionid2actionRDD)

创建Domin

在包com.erik.sparkproject.domain中新建SessionDetail.java

package com.erik.sparkproject.domain;

/**
 * session明细
 * @author Erik
 *
 */
public class SessionDetail {
    private long taskid;
    private long userid;
    private String sessionid;
    private long pageid;
    private String actionTime;
    private String searchKeyword;
    private long clickCategoryId;
    private long clickProductId;
    private String orderCategoryIds;
    private String orderProductIds;
    private String payCategoryIds;
    private String payProductIds;

    public long getTaskid() {
        return taskid;
    }
    public void setTaskid(long taskid) {
        this.taskid = taskid;
    }
    public long getUserid() {
        return userid;
    }
    public void setUserid(long userid) {
        this.userid = userid;
    }
    public String getSessionid() {
        return sessionid;
    }
    public void setSessionid(String sessionid) {
        this.sessionid = sessionid;
    }
    public long getPageid() {
        return pageid;
    }
    public void setPageid(long pageid) {
        this.pageid = pageid;
    }
    public String getActionTime() {
        return actionTime;
    }
    public void setActionTime(String actionTime) {
        this.actionTime = actionTime;
    }
    public String getSearchKeyword() {
        return searchKeyword;
    }
    public void setSearchKeyword(String searchKeyword) {
        this.searchKeyword = searchKeyword;
    }
    public long getClickCategoryId() {
        return clickCategoryId;
    }
    public void setClickCategoryId(long clickCategoryId) {
        this.clickCategoryId = clickCategoryId;
    }
    public long getClickProductId() {
        return clickProductId;
    }
    public void setClickProductId(long clickProductId) {
        this.clickProductId = clickProductId;
    }
    public String getOrderCategoryIds() {
        return orderCategoryIds;
    }
    public void setOrderCategoryIds(String orderCategoryIds) {
        this.orderCategoryIds = orderCategoryIds;
    }
    public String getOrderProductIds() {
        return orderProductIds;
    }
    public void setOrderProductIds(String orderProductIds) {
        this.orderProductIds = orderProductIds;
    }
    public String getPayCategoryIds() {
        return payCategoryIds;
    }
    public void setPayCategoryIds(String payCategoryIds) {
        this.payCategoryIds = payCategoryIds;
    }
    public String getPayProductIds() {
        return payProductIds;
    }
    public void setPayProductIds(String payProductIds) {
        this.payProductIds = payProductIds;
    }
}

创建DAO

在包com.erik.sparkproject.dao中新建ISessionDetailDAO.java接口

package com.erik.sparkproject.dao;

import com.erik.sparkproject.domain.SessionDetail;

/**
 * session明细接口
 * @author Erik
 *
 */
public interface ISessionDetailDAO {

    /**
     * 插入一条session明细数据
     * @param sessionDetail
     */
    void insert(SessionDetail sessionDetail);

}

创建Impl

在包com.erik.sparkproject.impl中新建SessionDetailDAOImpl.java

package com.erik.sparkproject.impl;

import com.erik.sparkproject.dao.ISessionDetailDAO;
import com.erik.sparkproject.domain.SessionDetail;
import com.erik.sparkproject.jdbc.JDBCHelper;

/**
 * session明细DAO实现类
 * @author Erik
 *
 */
public class SessionDetailDAOImpl implements ISessionDetailDAO{

    public void insert(SessionDetail sessionDetail) {
        String sql = "insert into session_detail value(?,?,?,?,?,?,?,?,?,?,?,?)";
        Object[] params = new Object[] {
                sessionDetail.getTaskid(),
                sessionDetail.getUserid(),
                sessionDetail.getSessionid(),
                sessionDetail.getPageid(),
                sessionDetail.getActionTime(),
                sessionDetail.getSearchKeyword(),
                sessionDetail.getClickCategoryId(),
                sessionDetail.getClickProductId(),
                sessionDetail.getOrderCategoryIds(),
                sessionDetail.getOrderProductIds(),
                sessionDetail.getPayCategoryIds(),
                sessionDetail.getPayProductIds()};

        JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        jdbcHelper.executeUpdate(sql, params);

    }

}

添加DAOFactory

在DAOFactory.java中添加

public static ISessionDetailDAO getSessionDetailDAO() {
        return new SessionDetailDAOImpl();
    }           

《Spark 大型电商项目实战》源码:https://github.com/Erik-ly/SprakProject

本文为《Spark大型电商项目实战》系列文章之一。
更多文章:Spark大型电商项目实战:http://blog.csdn.net/u012318074/article/category/6744423

2019-03-21 17:10:56 someby 阅读数 92
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35084 人正在学习 去看看 张长志

目录

分析

实现

完整代码


本篇文章将介绍用户访问session分析-session随机抽取之按时间比例随机抽取算法实现。

分析

1.处理的数据格式 <yyyy-MM-dd_HH,count>,需要将数据格式转化为<yyyy-MM-dd,<HH,count>>。

2.计算100个session在需要抽取的天数中平均抽取的个数n。

3.在每天的session中根据每个小时所占比例随机抽取各个时间段抽取这个n个session。

实现

1.将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>>

      //将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>>
        Map<String,Map<String,Long>> dateHourCountMap = new HashMap<String,Map<String,Long>>();
        for (Map.Entry<String, Long> countEntry : countMap.entrySet()){
            String dateHour = countEntry.getKey();
            String date = dateHour.split("_")[0];
            String hour = dateHour.split("_")[1];
            long count = countEntry.getValue();

            Map<String,Long> hourCountMap = dateHourCountMap.get(date);
            if (hourCountMap ==null){
                hourCountMap = new HashMap<String,Long>();
                dateHourCountMap.put(date,hourCountMap);
            }
            dateHourCountMap.put(date,hourCountMap);
        }

2.总共要抽取100个session,按照天数,进行平分

int extractNumberPerDay = 100 /dateHourCountMap.size();

3.在每天的session中根据每个小时所占比例随机抽取各个时间段抽取这个n个session。

        //<date,<hour,(1,3,4,2103)>>
        Map<String,Map<String, List<Integer>>> dateHourExtractMap =
                new HashMap<String,Map<String,List<Integer>>>();

        Random random = new Random();

        for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){
            String date = dateHourCountEntry.getKey();
            Map<String,Long> hourCountMap = dateHourCountEntry.getValue();

            //计算出每天的session总数
            long sessionCount = 0L;
            for (long hourCount : hourCountMap.values()){
                sessionCount += hourCount;
            }

            Map<String,List<Integer>> hourExtractMap  = dateHourExtractMap.get(date);
            if (hourExtractMap == null){
                hourExtractMap = new HashMap<String,List<Integer>>();
                dateHourExtractMap.put(date,hourExtractMap);
            }

            //遍历每一个小时
            for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){
                String hour = hourCountEntry.getKey();
                long count = hourCountEntry.getValue();

                // 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量
                // 就可以计算出,当前小时需要抽取的session数量
                int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay;
                if (hourExtractNumber > count){
                    hourExtractNumber = (int)count;
                }

                //先获取当前小时的存放随机数的list
                List<Integer> extractIndexList = hourExtractMap.get(hour);
                if (extractIndexList == null){
                    extractIndexList = new ArrayList<Integer>();
                    hourExtractMap.put(hour,extractIndexList);
                }

                //生成上面计算出来的数量的随机数
                for (int i = 0; i < hourExtractNumber;i++){
                    int extractIndex = random.nextInt((int)count);
                    while (extractIndexList.contains(extractIndex)){
                        extractIndex = random.nextInt((int)count);
                    }
                    extractIndexList.add(extractIndex);
                }

            }


        }

完整代码

UserVisitSessionAnalyzeSpark.java
   /**
     * 随机抽取session
     * @param sessionid2AggrInfoRDD
     */
    private static void randomExtractSession(
            JavaPairRDD<String, String> sessionid2AggrInfoRDD) {
        // 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,sessionid>格式的RDD
        JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<String,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, String> tuple) throws Exception {
                        String aggrInfo = tuple._2;

                        String startTime = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_START_TIME);
                        String dateHour = DateUtils.getDateHour(startTime);

                        return new Tuple2<String, String>(dateHour, aggrInfo);
                    }

                });



        /**
         * 思考一下:这里我们不要着急写大量的代码,做项目的时候,一定要用脑子多思考
         *
         * 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session
         * 首先抽取出的session的聚合数据,写入session_random_extract表
         * 所以第一个RDD的value,应该是session聚合数据
         *
         */

        // 得到每天每小时的session数量
        Map<String, Long> countMap =  time2sessionidRDD.countByKey();

        //第二步,使用按时间比例随机抽取算法,计算出每天每小时需要抽取session的索引

        //将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>>
        Map<String,Map<String,Long>> dateHourCountMap = new HashMap<String,Map<String,Long>>();
        for (Map.Entry<String, Long> countEntry : countMap.entrySet()){
            String dateHour = countEntry.getKey();
            String date = dateHour.split("_")[0];
            String hour = dateHour.split("_")[1];
            long count = countEntry.getValue();

            Map<String,Long> hourCountMap = dateHourCountMap.get(date);
            if (hourCountMap ==null){
                hourCountMap = new HashMap<String,Long>();
                dateHourCountMap.put(date,hourCountMap);
            }
            dateHourCountMap.put(date,hourCountMap);
        }



        //开始实现按时间比例随机抽取算法

        //总共要抽取100个session,按照天数,进行平分
        int extractNumberPerDay = 100 /dateHourCountMap.size();

        //<date,<hour,(1,3,4,2103)>>
        Map<String,Map<String, List<Integer>>> dateHourExtractMap =
                new HashMap<String,Map<String,List<Integer>>>();

        Random random = new Random();

        for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){
            String date = dateHourCountEntry.getKey();
            Map<String,Long> hourCountMap = dateHourCountEntry.getValue();

            //计算出每天的session总数
            long sessionCount = 0L;
            for (long hourCount : hourCountMap.values()){
                sessionCount += hourCount;
            }

            Map<String,List<Integer>> hourExtractMap  = dateHourExtractMap.get(date);
            if (hourExtractMap == null){
                hourExtractMap = new HashMap<String,List<Integer>>();
                dateHourExtractMap.put(date,hourExtractMap);
            }

            //遍历每一个小时
            for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){
                String hour = hourCountEntry.getKey();
                long count = hourCountEntry.getValue();

                // 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量
                // 就可以计算出,当前小时需要抽取的session数量
                int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay;
                if (hourExtractNumber > count){
                    hourExtractNumber = (int)count;
                }

                //先获取当前小时的存放随机数的list
                List<Integer> extractIndexList = hourExtractMap.get(hour);
                if (extractIndexList == null){
                    extractIndexList = new ArrayList<Integer>();
                    hourExtractMap.put(hour,extractIndexList);
                }

                //生成上面计算出来的数量的随机数
                for (int i = 0; i < hourExtractNumber;i++){
                    int extractIndex = random.nextInt((int)count);
                    while (extractIndexList.contains(extractIndex)){
                        extractIndex = random.nextInt((int)count);
                    }
                    extractIndexList.add(extractIndex);
                }

            }


        }

    }

 

2019-03-21 22:10:04 someby 阅读数 63
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35084 人正在学习 去看看 张长志

目录

SessionDetail表实例化

SessionDetail.java

ISessionDetailDAO.java

UserVisitSessionAnalyzeSpark.java

UserVisitSessionAnalyzeSpark.java完整代码

 


本篇文章将记录用户访问session分析-session随机抽取之获取抽取session的明细数据。

SessionDetail表实例化

domain

SessionDetail.java

package graduation.java.domain;

/**
 * FileName: SessionDetail
 * Author:   hadoop
 * Email:    3165845957@qq.com
 * Date:     19-3-21 下午9:31
 * Description:
 * session_detail表实体类
 */
public class SessionDetail {
    private long taskid;
    private long userid;
    private String sessionid;
    private long pageid;
    private String actionTime;
    private String seachKeyWord;
    private long clickCategoryId;
    private long clickProductId;
    private String orderCategoryIds;
    private String orderProductIds;
    private String payCategoryIds;
    private String payProductIds;

    public long getTaskid() {
        return taskid;
    }

    public void setTaskid(long taskid) {
        this.taskid = taskid;
    }

    public long getUserid() {
        return userid;
    }

    public void setUserid(long userid) {
        this.userid = userid;
    }

    public String getSessionid() {
        return sessionid;
    }

    public void setSessionid(String sessionid) {
        this.sessionid = sessionid;
    }

    public long getPageid() {
        return pageid;
    }

    public void setPageid(long pageid) {
        this.pageid = pageid;
    }

    public String getActionTime() {
        return actionTime;
    }

    public void setActionTime(String actionTime) {
        this.actionTime = actionTime;
    }

    public String getSeachKeyWord() {
        return seachKeyWord;
    }

    public void setSeachKeyWord(String seachKeyWord) {
        this.seachKeyWord = seachKeyWord;
    }

    public long getClickCategoryId() {
        return clickCategoryId;
    }

    public void setClickCategoryId(long clickCategoryId) {
        this.clickCategoryId = clickCategoryId;
    }

    public long getClickProductId() {
        return clickProductId;
    }

    public void setClickProductId(long clickProductId) {
        this.clickProductId = clickProductId;
    }

    public String getOrderCategoryIds() {
        return orderCategoryIds;
    }

    public void setOrderCategoryIds(String orderCategoryIds) {
        this.orderCategoryIds = orderCategoryIds;
    }

    public String getOrderProductIds() {
        return orderProductIds;
    }

    public void setOrderProductIds(String orderProductIds) {
        this.orderProductIds = orderProductIds;
    }

    public String getPayCategoryIds() {
        return payCategoryIds;
    }

    public void setPayCategoryIds(String payCategoryIds) {
        this.payCategoryIds = payCategoryIds;
    }

    public String getPayProductIds() {
        return payProductIds;
    }

    public void setPayProductIds(String payProductIds) {
        this.payProductIds = payProductIds;
    }

    @Override
    public String toString() {
        return "SessionDetail{" +
                "taskid=" + taskid +
                ", userid=" + userid +
                ", sessionid='" + sessionid + '\'' +
                ", pageid=" + pageid +
                ", actionTime='" + actionTime + '\'' +
                ", seachKeyWord='" + seachKeyWord + '\'' +
                ", clickCategoryId=" + clickCategoryId +
                ", clickProductId=" + clickProductId +
                ", orderCategoryIds='" + orderCategoryIds + '\'' +
                ", orderProductIds='" + orderProductIds + '\'' +
                ", payCategoryIds='" + payCategoryIds + '\'' +
                ", payProductIds='" + payProductIds + '\'' +
                '}';
    }
}

dao

ISessionDetailDAO.java

package graduation.java.dao;

import graduation.java.domain.SessionDetail;

/**
 * FileName: ISessionDetailDAO
 * Author:   hadoop
 * Email:    3165845957@qq.com
 * Date:     19-3-21 下午9:38
 * Description:
 * Session明细DAO接口
 */
public interface ISessionDetailDAO {
    /**
     * 出入一条session明细
     * @param sessionDetail
     */
    void insert(SessionDetail sessionDetail);

}

impl

SessionDetailImpl.java

package graduation.java.impl;

import graduation.java.dao.ISessionDetailDAO;
import graduation.java.domain.SessionDetail;
import graduation.java.jdbc.JDBCHelper;

/**
 * FileName: SessionDetailImpl
 * Author:   hadoop
 * Email:    3165845957@qq.com
 * Date:     19-3-21 下午9:39
 * Description:
 * session明细DAO实现类
 */
public class SessionDetailImpl implements ISessionDetailDAO {
    /**
     * 插入一条session明细
     * @param sessionDetail
     */
    public void insert(SessionDetail sessionDetail){
        String sql = "insert into session_detail values(?,?,?,?,?,?,?,?,?,?,?,?)";
        Object[] param = new Object[]{
                sessionDetail.getTaskid(),
                sessionDetail.getSessionid(),
                sessionDetail.getPageid(),
                sessionDetail.getActionTime(),
                sessionDetail.getSeachKeyWord(),
                sessionDetail.getClickCategoryId(),
                sessionDetail.getClickProductId(),
                sessionDetail.getOrderCategoryIds(),
                sessionDetail.getOrderProductIds(),
                sessionDetail.getPayCategoryIds(),
                sessionDetail.getPayProductIds()
        };
        JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        jdbcHelper.executeUpdate(sql,param);
    }
}

spark

UserVisitSessionAnalyzeSpark.java

(加入的内容)

   // 如果要进行session粒度的数据聚合
        // 首先要从user_visit_action表中,查询出来指定日期范围内的行为数据
        JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext,taskParam);


        JavaPairRDD<String,Row> session2actionRDD = getSessionid2ActionRDD(actionRDD);
   randomExtractSession(task.getTaskId(),filteredSessionid2AggrInfoRDD,session2actionRDD);
 /**
     * 获取sessionid到访问行为数据的映射的RDD
     * @param actionRDD
     * @return
     */

    public static JavaPairRDD<String,Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD){
        return actionRDD.mapToPair(new PairFunction<Row, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Row> call(Row row) throws Exception {
                return new Tuple2<String,Row>(row.getString(2),row);
            }
        });
    }
    /**
     * 随机抽取session
     * @param sessionid2AggrInfoRDD
     */
    private static void randomExtractSession(
            final long taskid,
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            JavaPairRDD<String,Row> sessionid2actionRDD) {
        // 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,sessionid>格式的RDD
        JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<String,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, String> tuple) throws Exception {
                        String aggrInfo = tuple._2;

                        String startTime = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_START_TIME);
                        String dateHour = DateUtils.getDateHour(startTime);

                        return new Tuple2<String, String>(dateHour, aggrInfo);
                    }

                });



        /**
         * 思考一下:这里我们不要着急写大量的代码,做项目的时候,一定要用脑子多思考
         *
         * 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session
         * 首先抽取出的session的聚合数据,写入session_random_extract表
         * 所以第一个RDD的value,应该是session聚合数据
         *
         */

        // 得到每天每小时的session数量
        Map<String, Long> countMap =  time2sessionidRDD.countByKey();

        //第二步,使用按时间比例随机抽取算法,计算出每天每小时需要抽取session的索引

        //将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>>
        Map<String,Map<String,Long>> dateHourCountMap = new HashMap<String,Map<String,Long>>();
        for (Map.Entry<String, Long> countEntry : countMap.entrySet()){
            String dateHour = countEntry.getKey();
            String date = dateHour.split("_")[0];
            String hour = dateHour.split("_")[1];
            long count = countEntry.getValue();

            Map<String,Long> hourCountMap = dateHourCountMap.get(date);
            if (hourCountMap ==null){
                hourCountMap = new HashMap<String,Long>();
                dateHourCountMap.put(date,hourCountMap);
            }
            dateHourCountMap.put(date,hourCountMap);
        }



        //开始实现按时间比例随机抽取算法

        //总共要抽取100个session,按照天数,进行平分
        int extractNumberPerDay = 100 /dateHourCountMap.size();

        //<date,<hour,(1,3,4,2103)>>
        Map<String,Map<String, List<Integer>>> dateHourExtractMap =
                new HashMap<String,Map<String,List<Integer>>>();

        Random random = new Random();

        for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){
            String date = dateHourCountEntry.getKey();
            Map<String,Long> hourCountMap = dateHourCountEntry.getValue();

            //计算出每天的session总数
            long sessionCount = 0L;
            for (long hourCount : hourCountMap.values()){
                sessionCount += hourCount;
            }

            Map<String,List<Integer>> hourExtractMap  = dateHourExtractMap.get(date);
            if (hourExtractMap == null){
                hourExtractMap = new HashMap<String,List<Integer>>();
                dateHourExtractMap.put(date,hourExtractMap);
            }

            //遍历每一个小时
            for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){
                String hour = hourCountEntry.getKey();
                long count = hourCountEntry.getValue();

                // 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量
                // 就可以计算出,当前小时需要抽取的session数量
                int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay;
                if (hourExtractNumber > count){
                    hourExtractNumber = (int)count;
                }

                //先获取当前小时的存放随机数的list
                List<Integer> extractIndexList = hourExtractMap.get(hour);
                if (extractIndexList == null){
                    extractIndexList = new ArrayList<Integer>();
                    hourExtractMap.put(hour,extractIndexList);
                }

                //生成上面计算出来的数量的随机数
                for (int i = 0; i < hourExtractNumber;i++){
                    int extractIndex = random.nextInt((int)count);
                    while (extractIndexList.contains(extractIndex)){
                        extractIndex = random.nextInt((int)count);
                    }
                    extractIndexList.add(extractIndex);
                }

            }

        }

        /**
         * 第三步:遍历每天每小时的session,然后根据随机索引进行抽取
         */

        // 执行groupByKey算子,得到<dateHour,(session aggrInfo)>
        JavaPairRDD<String,Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();

        // 我们用flatMap算子,遍历所有的<dateHour,(session aggrInfo)>格式的数据
        // 然后呢,会遍历每天每小时的session
        // 如果发现某个session恰巧在我们指定的这天这小时的随机抽取索引上
        // 那么抽取该session,直接写入MySQL的random_extract_session表
        // 将抽取出来的session id返回回来,形成一个新的JavaRDD<String>
        // 然后最后一步,是用抽取出来的sessionid,去join它们的访问行为明细数据,写入session表

        JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(

                new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<Tuple2<String, String>> call(
                            Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        List<Tuple2<String, String>> extractSessionids =
                                new ArrayList<Tuple2<String, String>>();

                        String dateHour = tuple._1;
                        String date = dateHour.split("_")[0];
                        String hour = dateHour.split("_")[1];
                        Iterator<String> iterator = tuple._2.iterator();

                        List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);

                        ISessionRandomExtractDAO sessionRandomExtractDAO =
                                DAOFactory.getSessionRandomExtractDAO();

                        int index = 0;
                        while(iterator.hasNext()) {
                            String sessionAggrInfo = iterator.next();

                            if(extractIndexList.contains(index)) {
                                String sessionid = StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                                // 将数据写入MySQL
                                SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
                                sessionRandomExtract.setTaskid(taskid);
                                sessionRandomExtract.setSessionid(sessionid);
                                sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_START_TIME));
                                sessionRandomExtract.setSerachKeyWords(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));
                                sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));

                                sessionRandomExtractDAO.insert(sessionRandomExtract);

                                // 将sessionid加入list
                                extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));
                            }

                            index++;
                        }

                        return (Iterator<Tuple2<String, String>>) extractSessionids;
                    }

                });


        /**
         * 第四步:获取抽取出来的session的明细数据
         */
        JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
                extractSessionidsRDD.join(sessionid2actionRDD);
        extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
                Row row = tuple._2._2;

                SessionDetail sessionDetail = new SessionDetail();
                sessionDetail.setTaskid(taskid);
                sessionDetail.setUserid(row.getLong(0));
                sessionDetail.setSessionid(row.getString(1));
                sessionDetail.setPageid(row.getLong(2));
                sessionDetail.setActionTime(row.getString(3));
                sessionDetail.setSeachKeyWord(row.getString(4));
                sessionDetail.setClickCategoryId(row.getLong(5));
                sessionDetail.setClickProductId(row.getLong(6));
                sessionDetail.setOrderCategoryIds(row.getString(7));
                sessionDetail.setOrderProductIds(row.getString(8));
                sessionDetail.setPayCategoryIds(row.getString(9));
                sessionDetail.setPayProductIds(row.getString(11));

                ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
                sessionDetailDAO.insert(sessionDetail);
            }
        });

    }

UserVisitSessionAnalyzeSpark.java完整代码

package graduation.java.spark;

/**
 * FileName: UserVisitSessionAnlyizSpark
 * Author:   hadoop
 * Email:    3165845957@qq.com
 * Date:     19-3-1 上午10:41
 * Description:
 * 用户访问session分析Spark作业
 *
 * 接收用户创建的分析任务,用户可能指定的条件如下:
 *
 * 1、时间范围:起始日期~结束日期
 * 2、性别:男或女
 * 3、年龄范围
 * 4、职业:多选
 * 5、城市:多选
 * 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件
 * 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件
 *
 * 我们的spark作业如何接受用户创建的任务?
 *
 * J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param
 * 字段中
 *
 * 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本
 * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数
 * 参数就封装在main函数的args数组中
 *
 * 这是spark本身提供的特性
 */

import com.alibaba.fastjson.JSONObject;
import graduation.java.conf.ConfigurationManager;
import graduation.java.constant.Constants;
import graduation.java.dao.ISessionAggrStatDAO;
import graduation.java.dao.ISessionDetailDAO;
import graduation.java.dao.ISessionRandomExtractDAO;
import graduation.java.dao.ITaskDAO;
import graduation.java.domain.SessionAggrStat;
import graduation.java.domain.SessionDetail;
import graduation.java.domain.SessionRandomExtract;
import graduation.java.domain.Task;
import graduation.java.impl.DAOFactory;
import graduation.java.test.MockData;
import graduation.java.util.*;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import scala.Tuple2;
import org.apache.spark.sql.SQLContext;


import java.util.*;


/**
 * 用户访问session分析Spark作业
 *
 *
 */
public class UserVisitSessionAnalyzeSpark {

    public static void main(String[] args) {

        Logger.getLogger("org").setLevel(Level.ERROR);
        args = new String[]{"1"};
        // 构建Spark上下文
        SparkConf conf = new SparkConf()
                .setAppName(Constants.SPARK_APP_NAME_SESSION)
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = getSQLContext(sc.sc());

        // 生成模拟测试数据
        mockData(sc, sqlContext);
        //创建需要使用的DAO组件
        ITaskDAO taskDAO = DAOFactory.getTaskDAO();

        //首先查询书来指定的任务,并获取任务查询参数
        long taskid = ParamUtils.getTaskIdFromArgs(args);
        Task task = taskDAO.findById(taskid);

        //测试
        //System.out.println("taskId: "+taskid);
        JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());

        //测试
        /*if (taskParam !=null){
            System.out.println("taskParam is "+ taskParam.values());
        }*/

        // 如果要进行session粒度的数据聚合
        // 首先要从user_visit_action表中,查询出来指定日期范围内的行为数据
        JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext,taskParam);


        JavaPairRDD<String,Row> session2actionRDD = getSessionid2ActionRDD(actionRDD);


        /*System.out.println("*****************************************");
        actionRDD.rdd().count();
        System.out.println("*****************************************");
        for (Row row : actionRDD.take(10)){
            System.out.println(row.toString());
        }*/


        // 首先,可以将行为数据,按照session_id进行groupByKey分组
        // 此时的数据的粒度就是session粒度了,然后呢,可以将session粒度的数据
        // 与用户信息数据,进行join
        // 然后就可以获取到session粒度的数据,同时呢,数据里面还包含了session对应的user的信息

        //测试
        //System.out.println("sessionid2AggrInfoRDD");



        JavaPairRDD<String,String> sessionid2AggrInfoRDD = aggregateBySession(sqlContext,actionRDD);
        //测试
        //sessionid2AggrInfoRDD.count();
       /* System.out.println("**************sessionid2AggrInfoRDD**************");
        for (Tuple2<String,String> tuple2 : sessionid2AggrInfoRDD.take(10)){
            System.out.println(tuple2.toString());
        }
        System.out.println("**************sessionid2AggrInfoRDD**************");*/

        // 接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
        // 相当于我们自己编写的算子,是要访问外面的任务参数对象的
        // 所以,大家记得我们之前说的,匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的

        // 重构,同时进行过滤和统计
        Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
                "", new SessionAggrStatAccumulator());
        JavaPairRDD<String,String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(sessionid2AggrInfoRDD,taskParam,sessionAggrStatAccumulator);
        //测试
       /* System.out.println("\n\n\n\n\filteredSessionid2InfoRDD");
        filteredSessionid2AggrInfoRDD.count();
        for(Tuple2<String,String> tuple : filteredSessionid2AggrInfoRDD.take(10)){
            System.out.println(tuple);
        }*/


        /**
         * 对于Accumulator这种分布式累加计算的变量的使用,有一个重要说明
         *
         * 从Accumulator中,获取数据,插入数据库的时候,一定要,一定要,是在有某一个action操作以后
         * 再进行。。。
         *
         * 如果没有action的话,那么整个程序根本不会运行。。。
         *
         * 是不是在calculateAndPersisitAggrStat方法之后,运行一个action操作,比如count、take
         * 不对!!!
         *
         * 必须把能够触发job执行的操作,放在最终写入MySQL方法之前
         *
         * 计算出来的结果,在J2EE中,是怎么显示的,是用两张柱状图显示
         */

        /*System.out.println(filteredSessionid2AggrInfoRDD.count());*/


        /**
         * 特别说明
         * 我们知道,要将上一个功能的session聚合统计数据获取到,就必须是在一个action操作触发job之后
         * 才能从Accumulator中获取数据,否则是获取不到数据的,因为没有job执行,Accumulator的值为空
         * 所以,我们在这里,将随机抽取的功能的实现代码,放在session聚合统计功能的最终计算和写库之前
         * 因为随机抽取功能中,有一个countByKey算子,是action操作,会触发job
         */
        randomExtractSession(task.getTaskId(),filteredSessionid2AggrInfoRDD,session2actionRDD);


        //计算出各个范围的session占比,并写入Mysql
      // calculateAndPersistAggrStat(sessionAggrStatAccumulator.value(),task.getTaskId());
        /**
         * session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例)
         *
         * 如果不进行重构,直接来实现,思路:
         * 1、actionRDD,映射成<sessionid,Row>的格式
         * 2、按sessionid聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD
         * 3、遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中的对应的值
         * 4、使用自定义Accumulator中的统计值,去计算各个区间的比例
         * 5、将最后计算出来的结果,写入MySQL对应的表中
         *
         * 普通实现思路的问题:
         * 1、为什么还要用actionRDD,去映射?其实我们之前在session聚合的时候,映射已经做过了。多此一举
         * 2、是不是一定要,为了session的聚合这个功能,单独去遍历一遍session?其实没有必要,已经有session数据
         * 		之前过滤session的时候,其实,就相当于,是在遍历session,那么这里就没有必要再过滤一遍了
         *
         * 重构实现思路:
         * 1、不要去生成任何新的RDD(处理上亿的数据)
         * 2、不要去单独遍历一遍session的数据(处理上千万的数据)
         * 3、可以在进行session聚合的时候,就直接计算出来每个session的访问时长和访问步长
         * 4、在进行过滤的时候,本来就要遍历所有的聚合session信息,此时,就可以在某个session通过筛选条件后
         * 		将其访问时长和访问步长,累加到自定义的Accumulator上面去
         * 5、就是两种截然不同的思考方式,和实现方式,在面对上亿,上千万数据的时候,甚至可以节省时间长达
         * 		半个小时,或者数个小时
         *
         * 开发Spark大型复杂项目的一些经验准则:
         * 1、尽量少生成RDD
         * 2、尽量少对RDD进行算子操作,如果有可能,尽量在一个算子里面,实现多个需要做的功能
         * 3、尽量少对RDD进行shuffle算子操作,比如groupByKey、reduceByKey、sortByKey(map、mapToPair)
         * 		shuffle操作,会导致大量的磁盘读写,严重降低性能
         * 		有shuffle的算子,和没有shuffle的算子,甚至性能,会达到几十分钟,甚至数个小时的差别
         * 		有shfufle的算子,很容易导致数据倾斜,一旦数据倾斜,简直就是性能杀手(完整的解决方案)
         * 4、无论做什么功能,性能第一
         * 		在传统的J2EE或者.NET后者PHP,软件/系统/网站开发中,我认为是架构和可维护性,可扩展性的重要
         * 		程度,远远高于了性能,大量的分布式的架构,设计模式,代码的划分,类的划分(高并发网站除外)
         *
         * 		在大数据项目中,比如MapReduce、Hive、Spark、Storm,我认为性能的重要程度,远远大于一些代码
         * 		的规范,和设计模式,代码的划分,类的划分;大数据,大数据,最重要的,就是性能
         * 		主要就是因为大数据以及大数据项目的特点,决定了,大数据的程序和项目的速度,都比较慢
         * 		如果不优先考虑性能的话,会导致一个大数据处理程序运行时间长度数个小时,甚至数十个小时
         * 		此时,对于用户体验,简直就是一场灾难
         *
         * 		所以,推荐大数据项目,在开发和代码的架构中,优先考虑性能;其次考虑功能代码的划分、解耦合
         *
         * 		我们如果采用第一种实现方案,那么其实就是代码划分(解耦合、可维护)优先,设计优先
         * 		如果采用第二种方案,那么其实就是性能优先
         *
         * 		讲了这么多,其实大家不要以为我是在岔开话题,大家不要觉得项目的课程,就是单纯的项目本身以及
         * 		代码coding最重要,其实项目,我觉得,最重要的,除了技术本身和项目经验以外;非常重要的一点,就是
         * 		积累了,处理各种问题的经验
         *
         */

        // 关闭Spark上下文
        sc.close();
    }




    /**
     * 获取SQLContext
     * 如果是在本地测试环境的话,那么就生成SQLContext对象
     * 如果是在生产环境运行的话,那么就生成HiveContext对象
     * @param sc SparkContext
     * @return SQLContext
     */
    private static SQLContext getSQLContext(SparkContext sc) {
        boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
        if(local) {
            return new SQLContext(sc);
        } else {
            return new HiveContext(sc);
        }
    }

    /**
     * 生成模拟数据(只有本地模式,才会去生成模拟数据)
     * @param sc
     * @param sqlContext
     */
    private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
        boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
        if(local) {
            MockData.mock(sc, sqlContext);
        }
    }

    /**
     * 获取指定日期范围内的用户访问行为数据
     * @param sqlContext SQLContext
     * @param taskParam 任务参数
     * @return 行为数据RDD
     */
    private static JavaRDD<Row> getActionRDDByDateRange(
            SQLContext sqlContext, JSONObject taskParam) {
        String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
        String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);

        String sql =
                "select * "
                        + "from user_visit_action "
                        + "where date>='" + startDate + "' "
                        + "and date<='" + endDate + "'";

        Dataset actionDF = sqlContext.sql(sql);
        System.out.println("actionDF");
        actionDF.show(10);

        return actionDF.javaRDD();
    }

    /**
     * 获取sessionid到访问行为数据的映射的RDD
     * @param actionRDD
     * @return
     */

    public static JavaPairRDD<String,Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD){
        return actionRDD.mapToPair(new PairFunction<Row, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Row> call(Row row) throws Exception {
                return new Tuple2<String,Row>(row.getString(2),row);
            }
        });
    }




    /**
     * 对行为数据按session粒度进行聚合
     * @param actionRDD 行为数据RDD
     * @return session粒度聚合数据
     */
    private static JavaPairRDD<String, String> aggregateBySession(
            SQLContext sqlContext, JavaRDD<Row> actionRDD) {
        // 现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
        // 我们现在需要将这个Row映射成<sessionid,Row>的格式
        JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(

                /**
                 * PairFunction
                 * 第一个参数,相当于是函数的输入
                 * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
                 */
                new PairFunction<Row, String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Row> call(Row row) throws Exception {
                        return new Tuple2<String, Row>(row.getString(2), row);
                    }

                });

        // 对行为数据按session粒度进行分组
        JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
                sessionid2ActionRDD.groupByKey();

        // 对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
        // 到此为止,获取的数据格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
        JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(

                new PairFunction<Tuple2<String,Iterable<Row>>, Long, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
                            throws Exception {
                        String sessionid = tuple._1;
                        Iterator<Row> iterator = tuple._2.iterator();

                        StringBuffer searchKeywordsBuffer = new StringBuffer("");
                        StringBuffer clickCategoryIdsBuffer = new StringBuffer("");

                        Long userid = null;

                        // session的起始和结束时间
                        Date startTime = null;
                        Date endTime = null;
                        // session的访问步长
                        int stepLength = 0;

                        // 遍历session所有的访问行为
                        while(iterator.hasNext()) {
                            // 提取每个访问行为的搜索词字段和点击品类字段
                            Row row = iterator.next();
                            if(userid == null) {
                                userid = row.getLong(1);
                            }
                            System.out.println("row.toString"+row.toString());
                            String searchKeyword = row.getString(5);
                            Long clickCategoryId = row.getLong(6);
                            //String clickCategoryId = String.valueOf(row.getLong(6));

                            // 实际上这里要对数据说明一下
                            // 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的
                            // 其实,只有搜索行为,是有searchKeyword字段的
                            // 只有点击品类的行为,是有clickCategoryId字段的
                            // 所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的

                            // 我们决定是否将搜索词或点击品类id拼接到字符串中去
                            // 首先要满足:不能是null值
                            // 其次,之前的字符串中还没有搜索词或者点击品类id

                            if(StringUtils.isNotEmpty(searchKeyword)) {
                                if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
                                    searchKeywordsBuffer.append(searchKeyword + ",");
                                }
                            }
                            if(clickCategoryId != Long.MAX_VALUE) {
                                if(!clickCategoryIdsBuffer.toString().contains(
                                        String.valueOf(clickCategoryId))) {
                                    clickCategoryIdsBuffer.append(clickCategoryId + ",");
                                }
                            }

                            // 计算session开始和结束时间
                            Date actionTime = DateUtils.parseTime(row.getString(4));

                            if(startTime == null) {
                                startTime = actionTime;
                            }
                            if(endTime == null) {
                                endTime = actionTime;
                            }

                            if(actionTime.before(startTime)) {
                                startTime = actionTime;
                            }
                            if(actionTime.after(endTime)) {
                                endTime = actionTime;
                            }

                            // 计算session访问步长
                            stepLength++;
                        }

                        String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
                        String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());

                        // 计算session访问时长(秒)
                        long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;

                        // 大家思考一下
                        // 我们返回的数据格式,即使<sessionid,partAggrInfo>
                        // 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合
                        // 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid
                        // 就应该是userid,才能够跟<userid,Row>格式的用户信息进行聚合
                        // 如果我们这里直接返回<sessionid,partAggrInfo>,还得再做一次mapToPair算子
                        // 将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举

                        // 所以,我们这里其实可以直接,返回的数据格式,就是<userid,partAggrInfo>
                        // 然后跟用户信息join的时候,将partAggrInfo关联上userInfo
                        // 然后再直接将返回的Tuple的key设置成sessionid
                        // 最后的数据格式,还是<sessionid,fullAggrInfo>

                        // 聚合数据,用什么样的格式进行拼接?
                        // 我们这里统一定义,使用key=value|key=value
                        String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
                                + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
                                + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
                                + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
                                + Constants.FIELD_STEP_LENGTH + "=" + stepLength
                                +Constants.FIELD_START_TIME + "=" +DateUtils.formateTime(startTime);
                        //测试
                        //System.out.println("partAggrInfo: "+ partAggrInfo);

                        return new Tuple2<Long, String>(userid, partAggrInfo);
                    }

                });

        // 查询所有用户数据,并映射成<userid,Row>的格式
        String sql = "select * from user_info";
        JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();

        JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(

                new PairFunction<Row, Long, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Row> call(Row row) throws Exception {
                        return new Tuple2<Long, Row>(row.getLong(0), row);
                    }

                });

        // 将session粒度聚合数据,与用户信息进行join
        JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
                userid2PartAggrInfoRDD.join(userid2InfoRDD);

        // 对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<Long, Tuple2<String, Row>> tuple)
                            throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });

        return sessionid2FullAggrInfoRDD;
    }



    /**
     * 过滤session数据
     * @param sessionid2AggrInfoRDD
     * @return
     */
    private static JavaPairRDD<String, String> filterSessionAndAggrStat(
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            final JSONObject taskParam, final Accumulator sessionAggrStatAccumulator) {
        // 为了使用我们后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
        // 此外,这里其实大家不要觉得是多此一举
        // 其实我们是给后面的性能优化埋下了一个伏笔
        String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE);
        String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
        String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
        String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
        String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
        String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
        String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);

        String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
                + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
                + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
                + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
                + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
                + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
                + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds: "");

        if(_parameter.endsWith("\\|")) {
            _parameter = _parameter.substring(0, _parameter.length() - 1);
        }

        final String parameter = _parameter;

        // 根据筛选参数进行过滤
        JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(

                new Function<Tuple2<String,String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, String> tuple) throws Exception {
                        // 首先,从tuple中,获取聚合数据
                        String aggrInfo = tuple._2;

                        // 接着,依次按照筛选条件进行过滤
                        // 按照年龄范围进行过滤(startAge、endAge)
                        if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
                                parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
                            return false;
                        }

                        // 按照职业范围进行过滤(professionals)
                        // 互联网,IT,软件
                        // 互联网
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
                                parameter, Constants.PARAM_PROFESSIONALS)) {
                            return false;
                        }

                        // 按照城市范围进行过滤(cities)
                        // 北京,上海,广州,深圳
                        // 成都
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
                                parameter, Constants.PARAM_CITIES)) {
                            return false;
                        }

                        // 按照性别进行过滤
                        // 男/女
                        // 男,女
                        if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
                                parameter, Constants.PARAM_SEX)) {
                            return false;
                        }

                        // 按照搜索词进行过滤
                        // 我们的session可能搜索了 火锅,蛋糕,烧烤
                        // 我们的筛选条件可能是 火锅,串串香,iphone手机
                        // 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中
                        // 任何一个搜索词相当,即通过
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
                                parameter, Constants.PARAM_KEYWORDS)) {
                            return false;
                        }

                        // 按照点击品类id进行过滤
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
                                parameter, Constants.PARAM_CATEGORY_IDS)) {
                            return false;
                        }

                        // 如果经过了之前的多个过滤条件之后,程序能够走到这里
                        // 那么就说明,该session是通过了用户指定的筛选条件的,也就是需要保留的session
                        // 那么就要对session的访问时长和访问步长,进行统计,根据session对应的范围
                        // 进行相应的累加计数

                        // 主要走到这一步,那么就是需要计数的session
                        sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
                        //计算出session的访问时间和步长的范围,并进行想要的累加
                        long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(aggrInfo,"\\|",Constants.FIELD_VISIT_LENGTH));
                        long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(aggrInfo,"\\|",Constants.FIELD_STEP_LENGTH));
                        calculateVisitLength(visitLength);
                        calculateStepLength(stepLength);
                        return true;
                    }

                    /**
                     * 计算访问时间范围
                     * @param visitLength
                     */
                    private void calculateVisitLength(long visitLength){
                        if (visitLength >=1 && visitLength <= 3){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
                        } else  if (visitLength >=4 && visitLength <= 6){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
                        } else  if (visitLength >=7 && visitLength <= 9){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
                        } else  if (visitLength >=10 && visitLength <= 30){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
                        } else  if (visitLength > 30 && visitLength <= 60){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
                        } else  if (visitLength > 60 && visitLength <= 180){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
                        } else  if (visitLength > 180 && visitLength <= 600){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
                        } else  if (visitLength > 600 && visitLength <= 1800){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
                        } else  if (visitLength > 1800){
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
                        }
                    }

                    /**
                     * 计算访问步长范围
                     * @param stepLength
                     */
                    private void calculateStepLength(long stepLength){
                        if (stepLength >= 1 && stepLength <= 3){
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
                        } else if(stepLength >= 4 && stepLength <= 6){
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
                        } else if (stepLength >= 7 && stepLength <= 9){
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
                        } else if(stepLength >= 10 && stepLength <= 30){
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
                        } else if(stepLength > 30 && stepLength <= 60){
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
                        } else if (stepLength > 60){
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
                        }
                    }

                });

        return filteredSessionid2AggrInfoRDD;
    }




    /**
     * 随机抽取session
     * @param sessionid2AggrInfoRDD
     */
    private static void randomExtractSession(
            final long taskid,
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            JavaPairRDD<String,Row> sessionid2actionRDD) {
        // 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,sessionid>格式的RDD
        JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<String,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, String> tuple) throws Exception {
                        String aggrInfo = tuple._2;

                        String startTime = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_START_TIME);
                        String dateHour = DateUtils.getDateHour(startTime);

                        return new Tuple2<String, String>(dateHour, aggrInfo);
                    }

                });



        /**
         * 思考一下:这里我们不要着急写大量的代码,做项目的时候,一定要用脑子多思考
         *
         * 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session
         * 首先抽取出的session的聚合数据,写入session_random_extract表
         * 所以第一个RDD的value,应该是session聚合数据
         *
         */

        // 得到每天每小时的session数量
        Map<String, Long> countMap =  time2sessionidRDD.countByKey();

        //第二步,使用按时间比例随机抽取算法,计算出每天每小时需要抽取session的索引

        //将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>>
        Map<String,Map<String,Long>> dateHourCountMap = new HashMap<String,Map<String,Long>>();
        for (Map.Entry<String, Long> countEntry : countMap.entrySet()){
            String dateHour = countEntry.getKey();
            String date = dateHour.split("_")[0];
            String hour = dateHour.split("_")[1];
            long count = countEntry.getValue();

            Map<String,Long> hourCountMap = dateHourCountMap.get(date);
            if (hourCountMap ==null){
                hourCountMap = new HashMap<String,Long>();
                dateHourCountMap.put(date,hourCountMap);
            }
            dateHourCountMap.put(date,hourCountMap);
        }



        //开始实现按时间比例随机抽取算法

        //总共要抽取100个session,按照天数,进行平分
        int extractNumberPerDay = 100 /dateHourCountMap.size();

        //<date,<hour,(1,3,4,2103)>>
        Map<String,Map<String, List<Integer>>> dateHourExtractMap =
                new HashMap<String,Map<String,List<Integer>>>();

        Random random = new Random();

        for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){
            String date = dateHourCountEntry.getKey();
            Map<String,Long> hourCountMap = dateHourCountEntry.getValue();

            //计算出每天的session总数
            long sessionCount = 0L;
            for (long hourCount : hourCountMap.values()){
                sessionCount += hourCount;
            }

            Map<String,List<Integer>> hourExtractMap  = dateHourExtractMap.get(date);
            if (hourExtractMap == null){
                hourExtractMap = new HashMap<String,List<Integer>>();
                dateHourExtractMap.put(date,hourExtractMap);
            }

            //遍历每一个小时
            for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){
                String hour = hourCountEntry.getKey();
                long count = hourCountEntry.getValue();

                // 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量
                // 就可以计算出,当前小时需要抽取的session数量
                int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay;
                if (hourExtractNumber > count){
                    hourExtractNumber = (int)count;
                }

                //先获取当前小时的存放随机数的list
                List<Integer> extractIndexList = hourExtractMap.get(hour);
                if (extractIndexList == null){
                    extractIndexList = new ArrayList<Integer>();
                    hourExtractMap.put(hour,extractIndexList);
                }

                //生成上面计算出来的数量的随机数
                for (int i = 0; i < hourExtractNumber;i++){
                    int extractIndex = random.nextInt((int)count);
                    while (extractIndexList.contains(extractIndex)){
                        extractIndex = random.nextInt((int)count);
                    }
                    extractIndexList.add(extractIndex);
                }

            }

        }

        /**
         * 第三步:遍历每天每小时的session,然后根据随机索引进行抽取
         */

        // 执行groupByKey算子,得到<dateHour,(session aggrInfo)>
        JavaPairRDD<String,Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();

        // 我们用flatMap算子,遍历所有的<dateHour,(session aggrInfo)>格式的数据
        // 然后呢,会遍历每天每小时的session
        // 如果发现某个session恰巧在我们指定的这天这小时的随机抽取索引上
        // 那么抽取该session,直接写入MySQL的random_extract_session表
        // 将抽取出来的session id返回回来,形成一个新的JavaRDD<String>
        // 然后最后一步,是用抽取出来的sessionid,去join它们的访问行为明细数据,写入session表

        JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(

                new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<Tuple2<String, String>> call(
                            Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        List<Tuple2<String, String>> extractSessionids =
                                new ArrayList<Tuple2<String, String>>();

                        String dateHour = tuple._1;
                        String date = dateHour.split("_")[0];
                        String hour = dateHour.split("_")[1];
                        Iterator<String> iterator = tuple._2.iterator();

                        List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);

                        ISessionRandomExtractDAO sessionRandomExtractDAO =
                                DAOFactory.getSessionRandomExtractDAO();

                        int index = 0;
                        while(iterator.hasNext()) {
                            String sessionAggrInfo = iterator.next();

                            if(extractIndexList.contains(index)) {
                                String sessionid = StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                                // 将数据写入MySQL
                                SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
                                sessionRandomExtract.setTaskid(taskid);
                                sessionRandomExtract.setSessionid(sessionid);
                                sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_START_TIME));
                                sessionRandomExtract.setSerachKeyWords(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));
                                sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
                                        sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));

                                sessionRandomExtractDAO.insert(sessionRandomExtract);

                                // 将sessionid加入list
                                extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));
                            }

                            index++;
                        }

                        return (Iterator<Tuple2<String, String>>) extractSessionids;
                    }

                });


        /**
         * 第四步:获取抽取出来的session的明细数据
         */
        JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
                extractSessionidsRDD.join(sessionid2actionRDD);
        extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
                Row row = tuple._2._2;

                SessionDetail sessionDetail = new SessionDetail();
                sessionDetail.setTaskid(taskid);
                sessionDetail.setUserid(row.getLong(0));
                sessionDetail.setSessionid(row.getString(1));
                sessionDetail.setPageid(row.getLong(2));
                sessionDetail.setActionTime(row.getString(3));
                sessionDetail.setSeachKeyWord(row.getString(4));
                sessionDetail.setClickCategoryId(row.getLong(5));
                sessionDetail.setClickProductId(row.getLong(6));
                sessionDetail.setOrderCategoryIds(row.getString(7));
                sessionDetail.setOrderProductIds(row.getString(8));
                sessionDetail.setPayCategoryIds(row.getString(9));
                sessionDetail.setPayProductIds(row.getString(11));

                ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
                sessionDetailDAO.insert(sessionDetail);
            }
        });

    }





    /**
     * 计算各session范围占比,并写入MySQL
     * @param value
     * @param taskId
     */
    private static void calculateAndPersistAggrStat(String value, long taskId) {
        //从Accumulate统计串中获取值
        long session_count = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT));

        long visit_length_1s_3s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s));
        long visit_length_4s_6s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s));
        long visit_length_7s_9s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s));
        long visit_length_10s_30s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s));
        long visit_length_30s_60s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s));
        long visit_length_1m_3m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m));
        long visit_length_3m_10m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m));
        long visit_length_10m_30m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m));
        long visit_length_30m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m));

        long step_length_1_3 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3));
        long step_length_4_6 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6));
        long step_length_7_9 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9));
        long step_length_10_30 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30));
        long step_length_30_60 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60));
        long step_length_60 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60));


        //计算各个访问时长和步长的占比
        double visit_length_1s_3s_ratio = NumberUtils.formatDouble((double) visit_length_1s_3s/session_count,2);
        double visit_length_4s_6s_ratio = NumberUtils.formatDouble((double)visit_length_4s_6s/session_count,2);
        double visit_length_7s_9s_ratio = NumberUtils.formatDouble((double)visit_length_7s_9s/session_count,2);
        double visit_length_10s_30s_ratio = NumberUtils.formatDouble((double)visit_length_10s_30s/session_count,2);
        double visit_length_30s_60s_ratio = NumberUtils.formatDouble((double)visit_length_30s_60s/session_count,2);
        double visit_length_1m_3m_ratio = NumberUtils.formatDouble((double)visit_length_1m_3m/session_count,2);
        double visit_length_3m_10m_ratio = NumberUtils.formatDouble((double)visit_length_3m_10m/session_count,2);
        double visit_length_10m_30m_ratio = NumberUtils.formatDouble((double)visit_length_10m_30m/session_count,2);
        double visit_length_30m_ratio = NumberUtils.formatDouble((double)
                visit_length_30m/session_count,2);

        double step_length_1_3_ratio = NumberUtils.formatDouble((double)
                step_length_1_3/session_count,2);
        double step_length_4_6_ratio = NumberUtils.formatDouble((double)
                step_length_4_6/session_count,2);
        double step_length_7_9_ratio = NumberUtils.formatDouble((double)
                step_length_7_9/session_count,2);
        double step_length_10_30_ratio = NumberUtils.formatDouble((double)step_length_10_30/session_count,2);
        double step_length_30_60_ratio = NumberUtils.formatDouble((double)step_length_30_60/session_count,2);
        double step_length_60_ratio = NumberUtils.formatDouble((double)
                step_length_60/session_count,2);

//        将统计封装为Domain对象
        SessionAggrStat sessionAggrStat = new SessionAggrStat();
        sessionAggrStat.setSession_count(session_count);
        sessionAggrStat.setTaskid(taskId);
        sessionAggrStat.setVisit_length_1s_3s_ratio(visit_length_1s_3s_ratio);
        sessionAggrStat.setVisit_length_4s_6s_ratio(visit_length_4s_6s_ratio);
        sessionAggrStat.setVisit_length_7s_9s_ratio(visit_length_7s_9s_ratio);
        sessionAggrStat.setVisit_length_10s_30s_ratio(visit_length_10s_30s_ratio);
        sessionAggrStat.setVisit_length_30s_60s_ratio(visit_length_30s_60s_ratio);
        sessionAggrStat.setVisit_length_1m_3m_ratio(visit_length_1m_3m_ratio);
        sessionAggrStat.setVisit_length_3m_10m_ratio(visit_length_3m_10m_ratio);
        sessionAggrStat.setVisit_length_10m_30m_ratio(visit_length_10m_30m_ratio);
        sessionAggrStat.setVisit_length_30m_ratio(visit_length_30m_ratio);

        sessionAggrStat.setStep_length_1_3_ratio(step_length_1_3_ratio);
        sessionAggrStat.setStep_length_4_6_ratio(step_length_4_6_ratio);
        sessionAggrStat.setStep_length_7_9_ratio(step_length_7_9_ratio);
        sessionAggrStat.setStep_length_10_30_ratio(step_length_10_30_ratio);
        sessionAggrStat.setStep_length_30_60_ratio(step_length_30_60_ratio);
        sessionAggrStat.setStep_length_60_ratio(step_length_60_ratio);

        ISessionAggrStatDAO sessionAggrStatDAO = DAOFactory.getSessionAggrStatDAO();
        sessionAggrStatDAO.insert(sessionAggrStat);
    }
}

 

2019-11-03 13:28:19 weixin_44345917 阅读数 28
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35084 人正在学习 去看看 张长志

Scala_Spark-电商平台离线分析项目-需求二Session随机抽取

样例类

/**
 * 需求二的样例类
 * session随机抽取详细表
 *
 * @param taskid    当前计算批次的id
 * @param userid    用户id
 * @param sessionid   session的id
 * @param pageid    某个页面的id
 * @param actionTime    点击行为的时间点
 * @param searchKeyword   用户搜索的关键字
 * @param clickCategoryId   某一个商品品类的id
 * @param clickProductId    某一个商品的id
 * @param orderCategoryIds    一次订单所有品类的id集合
 * @param orderProductIds   一次订单所有商品的id集合
 * @param payCategoryIds    一次支付中所有品类的id集合
 * @param payProductIds   一次支付中所有商品的id集合
 */
case class SessionDetail(
                        taskid:String,
                        userid:Long,
                        sessionid:String,
                        pageid:Long,
                        actionTime:String,
                        searchKeyword:String,
                        clickCategoryId:Long,
                        clickProductId:Long,
                        orderCategoryIds:String,
                        orderProductIds:String,
                        payCategoryIds:String,
                        payProductIds:String
                        )


/**
 * 需求二的样例类
 * Session随机抽取表
 *
 * @param taskid    当前计算批次的id
 * @param sessionid   抽取session的id
 * @param startTime   session开始的时间
 * @param searcKeywords   session查询的字段
 * @param clickCategoryIds    session点击的类别id的集合
 */
case class SessionRandomExtract(taskid:String,
                                sessionid:String,
                                startTime:String,
                                searcKeywords:String,
                                clickCategoryIds:String)


方法

 /**
   * (步骤7 需求二里的方法里的方法)
   * 根据数量随机生成下标
   *
   * @param extractNumberPerDay
   * @param dateSessionCount
   * @param hourCountMap
   * @param dateHourExtractIndexListMap
   * @return
   */
  def generateRandomIndexList(extractNumberPerDay: Int,
                              dateSessionCount: Long,
                              hourCountMap: mutable.HashMap[String, Long],
                              dateHourExtractIndexListMap: mutable.HashMap[String, ListBuffer[Int]]) = {
    for((hour,count) <- hourCountMap){
      //该hour抽样数 =(每小时的session个数除以当天session总数)*当天抽样session数
      var hourExrCount = ((count / dateSessionCount.toDouble)*extractNumberPerDay).toInt
      //避免一个小时抽取数超过这个小时的总数
      if(hourExrCount > count){
        hourExrCount = count.toInt
      }

      val random = new Random()

      dateHourExtractIndexListMap.get(hour) match{
        case None => dateHourExtractIndexListMap(hour) = new ListBuffer[Int]
          for(i <- 0 until hourExrCount){ //(0,hourExrCount)
            var index = random.nextInt(count.toInt)
            while (dateHourExtractIndexListMap(hour).contains(index)){
              index = random.nextInt(count.toInt)
            }

            dateHourExtractIndexListMap(hour).append(index) //是不是也可以用“+”
          }

        case Some(list) =>
          for(i <- 0 until hourExrCount){
            var index = random.nextInt(count.toInt)
            while(dateHourExtractIndexListMap(hour).contains(index)){
              index = random.nextInt(count.toInt)
            }
            dateHourExtractIndexListMap(hour).append(index)
          }
      }
      
    }


  }



  /**
   * 随机抽取session(步骤7 需求二里的)
 *
   * @param sparkSession
   * @param taskUUID
   * @param sessionId2FilteredRDD
   */
  def sessionRandomExtract(sparkSession: SparkSession,
                           taskUUID: String,
                           sessionId2FilteredRDD: RDD[(String, String)]): Unit = {
    // dataHourFullInfoRDD:RDD[(dataHour,fullInfo)]
    val dataHourFullInfoRDD = sessionId2FilteredRDD.map{
      case(sid,fullInfo) =>
//        println("========================fullInfo==="+fullInfo)

        val startTime = StringUtils.getFieldFromConcatString(fullInfo,"\\|",Constants.FIELD_START_TIME)
//        println("========================startTime==="+startTime)
        val dataHour = DateUtils.getDateHour(startTime) //yyyy-MM-dd HH:mm:ss)=>(yyyy-MM-dd_HH)
        (dataHour,fullInfo)
    }
    // 得到每天每小时的session数量 根据key
    val countMap: collection.Map[String, Long] = dataHourFullInfoRDD.countByKey()

    // 按时间比例随机抽取算法,计算出每天每小时要抽取的session的索引
    // 将<yyyy-MM-dd_HH,count>的map转换成<yyyy-MM-dd,<HH,count>>的格式
    val dateHourCountMap: mutable.HashMap[String, mutable.HashMap[String, Long]] = mutable.HashMap[String,mutable.HashMap[String,Long]]()
    for((dateHour,count) <- countMap){
      val date = dateHour.split("_")(0)
      val hour = dateHour.split("_")(1)
      //模式匹配 dataHourCountMap中 日期存在和日期不存在分别进行处理
      dateHourCountMap.get(date) match {
          // 不存在则新增
        case None => dateHourCountMap(date)=new mutable.HashMap[String,Long]();
          dateHourCountMap(date) += (hour-> count) //空累加一下?
          // 存在,累加
        case Some(hourCountMap) => hourCountMap += (hour -> count)
      }
    }

    // 按时间比例随机抽取算法
    // 每天抽取量=1000/总天数
    // 每小时要随机抽取的session数 =(每小时的session数/当天的session总数)*当天的抽取量
    //todo: 解决问题一:获取每一天要抽取的session数
    val extractNumberPerDay = 100 / dateHourCountMap.size
    //todo: 解决问题二:一天有多少session :dataHourCountMap(date).values.sum
    //todo: 解决问题三:一个小时有多少个session:dataHourCountMap(date)(hour)
    val dateHourExtractIndexListMap = new mutable.HashMap[String,mutable.HashMap[String,ListBuffer[Int]]]()

    //dateHourCountMap:Map[(date,Map[(hour,count)])]
    for((date,hourCountMap) <- dateHourCountMap) {
      val dateSessionCount = hourCountMap.values.sum

      dateHourExtractIndexListMap.get(date) match {
        case None => dateHourExtractIndexListMap(date) = new mutable.HashMap[String,ListBuffer[Int]]()
          generateRandomIndexList(extractNumberPerDay,dateSessionCount,hourCountMap,dateHourExtractIndexListMap(date))
        case Some(map) =>
          generateRandomIndexList(extractNumberPerDay,dateSessionCount,hourCountMap,dateHourExtractIndexListMap(date))
      }
    }
    /*到目前为止,获取到了每个小时要抽取的session的index*/


    //todo: 将map进行广播,提升任务性能
    var dateHourExtractIndexListMapBroadcast = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)

    //<yyyy-MM-dd_HH,aggrInfo> ---- <yyyy-MM-dd_HH,(session aggrInfo)>
    val dateHour2GroupRDD = dataHourFullInfoRDD.groupByKey()  //dataHourFullInfoRDD:RDD[(dataHour,fullInfo)]

    //extractSessionRDD[SessionRandomExtract]
    val extractSessionRDD = dateHour2GroupRDD.flatMap{
      case(dateHour,iterableFullInfo) =>
        val date = dateHour.split("_")(0)
        val hour = dateHour.split("_")(1)

        val extractList = dateHourExtractIndexListMapBroadcast.value.get(date).get(hour)

        val extractSessionArrayBuffer = new ArrayBuffer[SessionRandomExtract]()

        var index=0

        for(fullInfo <- iterableFullInfo){
          if(extractList.contains(index)){
            val sessionId = StringUtils.getFieldFromConcatString(fullInfo,"\\|",Constants.FIELD_SESSION_ID)
            val startTime = StringUtils.getFieldFromConcatString(fullInfo,"\\|",Constants.FIELD_START_TIME)

            val searchKeywords = StringUtils.getFieldFromConcatString(fullInfo,"\\|",Constants.FIELD_CLICK_CATEGORY_IDS)
            val clickCategories = StringUtils.getFieldFromConcatString(fullInfo,"\\|",Constants.FIELD_CLICK_CATEGORY_IDS)

            val extractSession = SessionRandomExtract(taskUUID,sessionId,startTime,searchKeywords,clickCategories) //大写S和小写s是分别两个类 取名字太难了我

            extractSessionArrayBuffer += extractSession

          }
          // index自增
          index += 1
        }
        extractSessionArrayBuffer

    }

    // todo:倒入数据库
    import sparkSession.implicits._
    extractSessionRDD.toDF().write
      .format("jdbc")
      .option("url",ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user",ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable","session_random_extract")
      .mode(SaveMode.Append)
      .save()

    /*session_random_extract 就在本地mysql里啦*/

  }


主线程

def main(args: Array[String]): Unit = {

    // 获取筛选条件
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)

    // 获取筛选条件的JsonObject
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建SparkConf
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("session")

    // 创建SparkSession (包含SparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // 获取原始动作表
    // actionRDD:RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession,taskParam)

    // 测试1打印输出 先确认下数据获取成功
//    actionRDD.foreach(println(_))

    // map-----sessionID2ActionRDD:RDD[(sessionID,UserVieitAction)]
    val sessionID2ActionRDD = actionRDD.map(item => (item.session_id, item)) //item就是平时练习的x

    // groupByKey-----sessionID2GroupActionRDD: RDD[(sessionID, Iterable[UserVisitAction])]
    val session2GroupActionRDD = sessionID2ActionRDD.groupByKey()

    session2GroupActionRDD.cache()


    //todo:聚合数据
    //测试2打印输出
//    session2GroupActionRDD.foreach(println(_))

    //测试3打印输出
//    val userId2AggrInfoRDD = getSessionFullInfo(sparkSession, session2GroupActionRDD)
//    userId2AggrInfoRDD.foreach(println(_))

    //4打印输出
    val sessionId2FullInfoRDD  = getSessionFullInfo(sparkSession, session2GroupActionRDD)
    sessionId2FullInfoRDD .foreach(println(_))

    //至此聚合完成,开始过滤操作

    //5 过滤
    //todo:过滤

        //对自定义累加器进行注册
    val sessionAccumulator =new SessionAccumulator
    sparkSession.sparkContext.register(sessionAccumulator)

        //在过滤过程中完成了累加器的更新操作
        //sessionId2FilterRDD:RDD[(sessionId,fullInfo)]是所有符合过滤条件的数组组成的RDD
        //getSessionFilteredRDD:实现根据限制条件对session数据进行过滤,并完成累加的更新
    val sessionId2FilteredRDD =getSessionFilteredRDD(taskParam,sessionId2FullInfoRDD,sessionAccumulator) //sessionAccumulator作为参数传进去
        //s输出
    sessionId2FilteredRDD.foreach(println(_)) //需要一个action操作

    //6 计算比率 存入mysql数据库
    //todo: 计算比率 存入mysql数据库
    getSessionRatio(sparkSession,taskUUID,sessionAccumulator.value)


    // 需求二:Session随机抽取
    // 7
    // sessionId2FilteredRDD:RDD[(sid,fullInfo)]
    sessionRandomExtract(sparkSession,taskUUID,sessionId2FilteredRDD)



  }

2018-06-14 21:31:51 minemine999 阅读数 2549
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35084 人正在学习 去看看 张长志

随机森林算法原理

随机森林是由多个决策树构成的森林,算法分类结果由决策树的投票结果得到,其属于集成学习中的bagging方法。算法的主要原理如下:

算法流程如下:

  1. 假设为一棵决策树,其中每个决策树的抽样方式为重抽样,独立抽样次,每次随机抽取个样本,就可组成个训练集数据集,且它们是相互独立同分布的。
  2. 单棵决策树的生长原则遵循:每次从数据集的全部特征属性中随机选取个来训练,并比较评估效果。选择内节点的分裂属性中评估效果最好的属性进行分裂,并且遵循节点不纯度的原则。决策树由根节点遍历向下分裂。那么由一系列的决策树组合得到随机森林,其中是独立同分布的随机变量。
  3.  对于分裂后的2个或以上的子节点,继续进行分裂直到数据集S能够正确评估,或者属性全部用完。每棵决策树都不进行剪枝,使其充分生长。
  4. 所生成的棵决策树构成了随机森林。随机性表现在两个方面:每棵树训练集的抽取过程与节点分裂时的特征属性的选择过程。这样各树之间的相关性很低。对于新样本输入时,根据训练好的模型,每一棵树都会对其评估一个值,根据投票机制给出样本的类别标签或者根据均值法给出回归结果。

 

并行化计算

弹性分布式缓存(ResilientDistributed Dataset,RDD),是Spark最核心的抽象概念,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD可以被加载内存中,每次对RDD数据集的操作之后的结果直接存放在内存中,当后续需要被调用时直接内存输入,省去了Map Reduce大量的磁盘IO操作。

RDD的操作分为两类:Transaction与Action。其中Transaction是惰性执行的,惰性执行表示真正需要时才被执行,这里是在需要具体的Action去触发才会开始执行,每个Action的触发都会提交一个Job。如图2.1所示,首先通过textfile操作从外部存储HDFS中读取文件,构建两个RDD实例A和实例C,然后A做flatMap和Map转换操作,对C做Map型操作和reduceByKey转换操作,最后对得到的B和E两个做联合操作,并通过saveAsSequenceFlie操作将最终的F实例持久化到外部存储系统HDFS上。

图2.1 RDD操作流程示意图

由此可见,Spark在内存中分布式的存储和迭代并行处理数据,最终将数据整合并保存起来。RDD的依赖分为窄依赖和宽依赖,窄依赖指父RDD的每个分区都只被子RDD的一个分区所使用,例如map、filter;宽依赖指父RDD的分区被多个子RDD的分区所依赖,例如groupByKey,reduceByKey等操作。如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。 这种划分有两个用处。首先,窄依赖支持在一个结点上管道化执行。例如基于一对一的关系,可以在 filter之后执行 map。其次,窄依赖支持更高效的故障还原。因为对于窄依赖,只有丢失的父 RDD的分区需要重新计算。而对于宽依赖,一个结点的故障可能导致来自所有父RDD的分区丢失,因此就需要完全重新执行。因此对于宽依赖,Spark会在持有各个父分区的结点上,将中间数据持久化来简化故障还原,就像 MapReduce会持久化 map 的输出一样。

图2.2 RDD的窄依赖和宽依赖

如图2.2所示,其中每一个方框表示一个RDD,其内部阴影表示RDD的分区。对于窄依赖,可以进行pipeline操作,即允许在单个集群节点流水线式地执行,这个节点可以计算所有父级分区。对RDD依赖可以从下面两个方面理解:

(1).  依赖本身是描述两个RDD之间的关系,但一个RDD可以与多个RDD有有依赖关系;

(2).  宽依赖和窄依赖的判断:在RDD的各个分区中对父RDD的分区的依赖关系。

Spark将数据在分布式环境下分区,然后将作业转化为DAG,并分阶段进行DAG的调用和任务的分布式并行处理。

图2.3 DAG典型示意图

 

信用卡诈骗案例分析

1) 数据集简介

本数据集采用Kaggle上欧洲信用卡使用者的消费数据creditcard.csv,共有284807条样本数据,其中492条消费记录属于信用卡诈骗,其余均为正常交易样本数据,数据属性特征序列:time,v1,v2,…,v28,amount和类别标签class,1表示欺骗样本,0表示正常,在实际情况中,由于诈骗的样本案例较少,实际正常的样本过多所以该样本存在严重的不平衡现象,采用一般的诸如:SVM,朴素贝叶斯等分类算法会造成严重过拟合导致预测结果偏向于正常样本,而随机森林是采用决策树融合的方式,其两大机制样本随机抽样和属性随机抽样能够很好的针对不平衡样本进行分类决策,同时减少过拟合。

运行环境配置如下:

表3.2 运行环境配置

应用

版本

Windows

10

Spark

2.3.0

Python

3.5

Jupyter notebook

4.3.0

pyspark

/

Java

1.8

Hadoop

2.7.6

 

2)  算法实现:

算法实现步骤:

  1. 构建RDD:从外部存储系统即磁盘上加载数据;
  2. 对加载的数据集进行格式转换,训练集和测试集划分;
  3. 模型训练;
  4. 模型预测和准确度;
  5.  模型评估;

 

3) 算法输出执行效果:

图3.1算法执行总结

表3.2 模型评估

模型精度

99.967%

训练时间

8.978s

召回率

83%

ROC

95.073

 

附录代码

from pyspark.context import SparkContext

from pyspark.sql.session import SparkSession

#通用配置

CSV_PATH = "creditcard.csv"

APP_NAME = "Random Forest Example"

SPARK_URL = "local[*]"

RANDOM_SEED = 13579

TRAINING_DATA_RATIO = 0.7

RF_NUM_TREES = 3

RF_MAX_DEPTH = 4

RF_NUM_BINS = 32


#数据集加载

spark = SparkSession.builder \

   .appName(APP_NAME) \

   .master(SPARK_URL) \

   .getOrCreate()


df = spark.read \

   .options(header = "true", inferschema = "true") \

   .csv(CSV_PATH)


print("Total number of rows: %d" %df.count())

  
#将DataFrame数据类型转换成RDD的DataFrame

from pyspark.mllib.linalg import Vectors

from pyspark.mllib.regression importLabeledPoint


transformed_df = df.rdd.map(lambdarow:LabeledPoint(row[-1],Vectors.dense(row[0:-1])))

splits =[TRAINING_DATA_RATIO,1.0-TRAINING_DATA_RATIO]

training_data,test_data =transformed_df.randomSplit(splits,RANDOM_SEED)


print("Number of training setrow:%d"%training_data.count())

print("Number oftest set rows:%d"%test_data.count())


 #随机森林算法模型训练

from pyspark.mllib.tree import RandomForest

from time import *

start_time = time()

model =RandomForest.trainClassifier(training_data,numClasses=2,categoricalFeaturesInfo={},\

                                   numTrees=RF_NUM_TREES,featureSubsetStrategy='auto',impurity="gini",\

                                    maxDepth=RF_MAX_DEPTH,maxBins=RF_NUM_BINS,seed=RANDOM_SEED)


end_time = time()

elapsed_time = end_time - start_time

print("Time to train model:%.3fseconds"% elapsed_time)


#预测和计算准确度

predictions =model.predict(test_data.map(lambda x:x.features))

labels_and_predictions = test_data.map(lambdax:x.label).zip(predictions)

acc = labels_and_predictions.filter(lambda x:x[0] == x[1]).count() / float(test_data.count())

print("Modelaccuracy:%.3f%%"%(acc*100))


#模型评估

from pyspark.mllib.evaluation import BinaryClassificationMetrics


strart_time = time()

metrics =BinaryClassificationMetrics(labels_and_predictions)

print("Area under Predicton/Recall(PR)curve:%.f"%(metrics.areaUnderPR*100))

print("Area under Receiver OperatingCharacteristic(ROC) curve:%.3f"%(metrics.areaUnderROC*100))


end_time = time()

elapsed_time = end_time - start_time

print("Time to evaluate model:%.3fseconds"%elapsed_time)

 

没有更多推荐了,返回首页