
    What are we building?

    From the user-action data that passes the filter conditions, find the categories ranked top 10 by click, order, and payment counts. The Top 10 ranking sorts by click count first, then order count, then payment count.

    Requirement analysis

    From the filtered actions we want the ten most popular categories, ranked by click count, then order count, then payment count. Deriving the pipeline backwards:
    top-10 categories --> (id, (clickCount=83|orderCount=67|payCount=63)) --> aggregate (id, clickCount=...), (id, orderCount=...) separately --> which needs the filtered raw data

    Steps

    1. Get the raw data that passes the requirement-one filters, via the join operator
        val actionRdd = serverOne.getOriActionRDD(session, task)
        val sessionId2ActionRDD = actionRdd.map(item => (item.session_id, item))
        val sessionId2FilterActionRDD = sessionId2ActionRDD.join(FilterInfo).map {
          case (sessionId, (action, info)) => (sessionId, action)
        }
    
    2. Collect every categoryId that has click, order, or payment activity
     var cid2CidRdd = sessionId2FilterActionRDD.flatMap{
          case (sessionId, action: UserVisitAction) => {
            val categoryBuffer = new ArrayBuffer[(Long, Long)]()
            // click action (each action is exactly one of click / order / pay, so else-if is safe)
            if(action.click_category_id != -1){
              categoryBuffer += ((action.click_category_id, action.click_category_id))
            }else if(action.order_category_ids != null){
              for(orderCid <- action.order_category_ids.split(","))
                categoryBuffer += ((orderCid.toLong, orderCid.toLong))
            }else if(action.pay_category_ids != null){
              for(payCid <- action.pay_category_ids.split(","))
                categoryBuffer += ((payCid.toLong, payCid.toLong))
            }
            categoryBuffer
          }
        }
        cid2CidRdd=cid2CidRdd.distinct();
    
    3. Count clicks, orders, and payments per category:
     // step 2: count the per-category click, order, and payment totals
        val cid2ClickCountRDD = getClickCount(sessionId2FilterActionRDD)
    
        val cid2OrderCountRDD = getOrderCount(sessionId2FilterActionRDD)
    
        val cid2PayCountRDD = getPayCount(sessionId2FilterActionRDD)
    
    def getClickCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
      val clickFilterRDD = sessionId2FilterActionRDD.filter {
        case (sessionId, action: UserVisitAction) => action.click_category_id != -1L
      }
      val clickNumRDD = clickFilterRDD.map {
        case (sessionId, action) => (action.click_category_id, 1L)
      }
    
      clickNumRDD.reduceByKey(_ + _)
    }
    
    def getOrderCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
      val orderFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.order_category_ids != null)
      val orderNumRDD = orderFilterRDD.flatMap {
        case (sessionId, action) =>
          action.order_category_ids.split(",").map(item => (item.toLong, 1L))
      }
      orderNumRDD.reduceByKey(_ + _)
    }
    
    def getPayCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
      val payFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.pay_category_ids != null)
    
      val payNumRDD = payFilterRDD.flatMap {
        case (sid, action) =>
          action.pay_category_ids.split(",").map(item => (item.toLong, 1L))
      }
    
      payNumRDD.reduceByKey(_ + _)
    }
    
    4. Use leftOuterJoin to assemble, per category, a string of all three counts; the final format is (categoryId, str), where str looks like clickCount=83|orderCount=67|payCount=63
    def getFullCount(cid2CidRDD: RDD[(Long, Long)], cid2ClickCountRDD: RDD[(Long, Long)], cid2OrderCountRDD: RDD[(Long, Long)], cid2PayCountRDD: RDD[(Long, Long)]) = {
        val cid2ClickInfoRDD=cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map{
          case (cId,(categoryId,option))=>{
        val clickCount = option.getOrElse(0L) // a dangling `if (option.isDefined)` without an else would type this as Any
            val aggrCount = Constants.FIELD_CATEGORY_ID + "=" + cId + "|" +
              Constants.FIELD_CLICK_COUNT + "=" + clickCount
    
            (cId, aggrCount)
          }
        }
        val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map{
          case (cid, (clickInfo, option)) =>
            val orderCount = if(option.isDefined) option.get else 0
            val aggrInfo = clickInfo + "|" +
              Constants.FIELD_ORDER_COUNT + "=" + orderCount
    
            (cid, aggrInfo)
        }
    
        val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map{
          case (cid, (orderInfo, option)) =>
            val payCount = if(option.isDefined) option.get else 0
            val aggrInfo = orderInfo + "|" +
              Constants.FIELD_PAY_COUNT + "=" + payCount
            (cid, aggrInfo)
        }
        cid2PayInfoRDD;
    
      }
    
    5. Define a custom sort key, map the data into the (sortKey, info) shape, then sort descending with sortByKey
      The custom ordering:
    package server
    
    case class SortKey(clickCount:Long, orderCount:Long, payCount:Long) extends Ordered[SortKey]{
      // this compare that: a result > 0 means this > that, < 0 means this < that.
      // Compare the Longs directly instead of subtracting and truncating to Int, which can overflow for large counts.
      override def compare(that: SortKey): Int = {
        if(this.clickCount != that.clickCount){
          this.clickCount.compareTo(that.clickCount)
        }else if(this.orderCount != that.orderCount){
          this.orderCount.compareTo(that.orderCount)
        }else{
          this.payCount.compareTo(that.payCount)
        }
      }
    }
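
    A quick sanity check of the ordering with made-up counts (clicks dominate, then orders, then payments):

    val a = SortKey(clickCount = 10, orderCount = 1, payCount = 9)
    val b = SortKey(clickCount = 10, orderCount = 5, payCount = 0)
    assert(a < b)                  // clicks tie, so b wins on orders despite fewer payments
    assert(SortKey(11, 0, 0) > b)  // more clicks wins regardless of the rest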
    
    
    val sortRDD=cid2FullCountRDD.map{
          case (cId,info)=>{
            val clickCount = StringUtil.getFieldFromConcatString(info, "\\|", Constants.FIELD_CLICK_COUNT).toLong
            val orderCount = StringUtil.getFieldFromConcatString(info, "\\|", Constants.FIELD_ORDER_COUNT).toLong
            val payCount = StringUtil.getFieldFromConcatString(info, "\\|", Constants.FIELD_PAY_COUNT).toLong
    
            val sortKey = SortKey(clickCount, orderCount, payCount)
            (sortKey, info)
          }
        }
        //5. sort descending and take the top 10
        val top10=sortRDD.sortByKey(false).take(10);
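
    The helper StringUtil.getFieldFromConcatString is never shown in the excerpt; a minimal sketch of what it is assumed to do (split on the delimiter, then on '=', and return the value of the requested field):

    object StringUtil {
      // e.g. getFieldFromConcatString("categoryid=12|clickCount=83", "\\|", "clickCount") == "83"
      def getFieldFromConcatString(str: String, delimiter: String, field: String): String = {
        str.split(delimiter)
          .map(_.split("="))
          .collectFirst { case Array(k, v) if k == field => v }
          .orNull
      }
    }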
    
    6. Wrap the results and write them to the database
    //6. package the rows and write them to the database
        val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10).map{
          case (sortKey, countInfo) =>
            val cid = StringUtil.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
            val clickCount = sortKey.clickCount
            val orderCount = sortKey.orderCount
            val payCount = sortKey.payCount
            Top10Category(taskUUID, cid, clickCount, orderCount, payCount)
        }
    
    //save to the database
        /* import sparkSession.implicits._
         top10CategoryRDD.toDF().write
           .format("jdbc")
           .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
           .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
           .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
           .option("dbtable", "top10_category_0308")
           .mode(SaveMode.Append)
           .save*/
    

    Full code:

    package server
    
    import commons.constant.Constants
    import commons.model.{Top10Category, UserVisitAction}
    import commons.utils.StringUtil
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable.ArrayBuffer
    
    class serverThree  extends Serializable {
    
    
    
    
      def top10PopularCategories(sparkSession: SparkSession,
                                 taskUUID: String,
                                 sessionId2FilterActionRDD: RDD[(String, UserVisitAction)])={
    //1. turn every action into a (cId, cId) pair
        var cid2CidRdd=sessionId2FilterActionRDD.flatMap{
          case(sessionId,action: UserVisitAction)=>{
            val categoryBuffer=new ArrayBuffer[(Long,Long)]();
        // click action (each action is exactly one of click / order / pay)
            if(action.click_category_id != -1){
              categoryBuffer += ((action.click_category_id, action.click_category_id))
            }else if(action.order_category_ids != null){
              for(orderCid <- action.order_category_ids.split(","))
                categoryBuffer += ((orderCid.toLong, orderCid.toLong))
            }else if(action.pay_category_ids != null){
              for(payCid <- action.pay_category_ids.split(","))
                categoryBuffer += ((payCid.toLong, payCid.toLong))
            }
            categoryBuffer
          }
        }
        cid2CidRdd=cid2CidRdd.distinct();
    // step 2: count the per-category click, order, and payment totals
        val cid2ClickCountRDD = getClickCount(sessionId2FilterActionRDD)
    
        val cid2OrderCountRDD = getOrderCount(sessionId2FilterActionRDD)
    
        val cid2PayCountRDD = getPayCount(sessionId2FilterActionRDD)
    
    //3. leftOuterJoin the full category set cid2CidRdd with each of the step-2 count RDDs to build (cid, str)
    //   where str looks like clickCount=32|orderCount=15|...
        val cid2FullCountRDD =  getFullCount(cid2CidRdd,cid2ClickCountRDD,cid2OrderCountRDD,cid2PayCountRDD);
    
    //4. map the data into (sortKey, info) using the custom sort key
        val sortRDD=cid2FullCountRDD.map{
          case (cId,info)=>{
            val clickCount = StringUtil.getFieldFromConcatString(info, "\\|", Constants.FIELD_CLICK_COUNT).toLong
            val orderCount = StringUtil.getFieldFromConcatString(info, "\\|", Constants.FIELD_ORDER_COUNT).toLong
            val payCount = StringUtil.getFieldFromConcatString(info, "\\|", Constants.FIELD_PAY_COUNT).toLong
    
            val sortKey = SortKey(clickCount, orderCount, payCount)
            (sortKey, info)
          }
        }
    //5. sort descending and take the top 10
        val top10=sortRDD.sortByKey(false).take(10);
        top10.foreach(println);
    //6. package the rows and write them to the database
        val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10).map{
          case (sortKey, countInfo) =>
            val cid = StringUtil.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
            val clickCount = sortKey.clickCount
            val orderCount = sortKey.orderCount
            val payCount = sortKey.payCount
            Top10Category(taskUUID, cid, clickCount, orderCount, payCount)
        }
    
    //save to the database
        /* import sparkSession.implicits._
         top10CategoryRDD.toDF().write
           .format("jdbc")
           .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
           .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
           .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
           .option("dbtable", "top10_category_0308")
           .mode(SaveMode.Append)
           .save*/
        top10
    
      }
    
      def getFullCount(cid2CidRDD: RDD[(Long, Long)], cid2ClickCountRDD: RDD[(Long, Long)], cid2OrderCountRDD: RDD[(Long, Long)], cid2PayCountRDD: RDD[(Long, Long)]) = {
        val cid2ClickInfoRDD=cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map{
          case (cId,(categoryId,option))=>{
        val clickCount = option.getOrElse(0L) // a dangling `if (option.isDefined)` without an else would type this as Any
            val aggrCount = Constants.FIELD_CATEGORY_ID + "=" + cId + "|" +
              Constants.FIELD_CLICK_COUNT + "=" + clickCount
    
            (cId, aggrCount)
          }
        }
        val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map{
          case (cid, (clickInfo, option)) =>
            val orderCount = if(option.isDefined) option.get else 0
            val aggrInfo = clickInfo + "|" +
              Constants.FIELD_ORDER_COUNT + "=" + orderCount
    
            (cid, aggrInfo)
        }
    
        val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map{
          case (cid, (orderInfo, option)) =>
            val payCount = if(option.isDefined) option.get else 0
            val aggrInfo = orderInfo + "|" +
              Constants.FIELD_PAY_COUNT + "=" + payCount
            (cid, aggrInfo)
        }
        cid2PayInfoRDD;
    
      }
    
    
      def getClickCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)])={
         val clickFilterRDD=sessionId2FilterActionRDD.filter{
           case (sessionId,action: UserVisitAction)=>{
              action.click_category_id != -1L;
           }
         }
        val clickNumRDD = clickFilterRDD.map{
          case (sessionId, action) => (action.click_category_id, 1L)
        }
    
        clickNumRDD.reduceByKey(_+_)
      }
      def getOrderCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)])={
         val orderFilterRDD=sessionId2FilterActionRDD.filter(item=>item._2.order_category_ids!=null)
     val orderNumRDD = orderFilterRDD.flatMap{
       case (sessionId, action) =>
          action.order_category_ids.split(",").map(item => (item.toLong, 1L))
     }
        orderNumRDD.reduceByKey(_+_);
      }
      def getPayCount(sessionId2FilterActionRDD: RDD[(String, UserVisitAction)]) = {
        val payFilterRDD = sessionId2FilterActionRDD.filter(item => item._2.pay_category_ids != null)
    
        val payNumRDD = payFilterRDD.flatMap{
          case (sid, action) =>
            action.pay_category_ids.split(",").map(item => (item.toLong, 1L))
        }
    
        payNumRDD.reduceByKey(_+_)
      }
    
    }
    
    

    The sorter:

    package server
    
    case class SortKey(clickCount:Long, orderCount:Long, payCount:Long) extends Ordered[SortKey]{
      // this compare that: a result > 0 means this > that, < 0 means this < that.
      // Compare the Longs directly instead of subtracting and truncating to Int, which can overflow for large counts.
      override def compare(that: SortKey): Int = {
        if(this.clickCount != that.clickCount){
          this.clickCount.compareTo(that.clickCount)
        }else if(this.orderCount != that.orderCount){
          this.orderCount.compareTo(that.orderCount)
        }else{
          this.payCount.compareTo(that.payCount)
        }
      }
    }
    
    

    With the earlier groundwork and project setup done, what everyone probably cares about most is whether we can actually run a demo and do some analysis against common requirement scenarios; that is what keeps the learning going. So I won't start by digging into the implementation internals; as I said before, diving head-first into the source at this stage is a hole you won't climb out of.

    Data:
    site click-stream (tracking) data.
    Requirement: operations wants to know which products currently get the most traffic, so they can run campaigns around the hot products.
    Analysis: from the tracking data, find the records that contain a product visit and parse each into a <K,V> pair, with K the product id and V a count; then sort and take the Top 10 (or Top 20, and so on).
    Storage:
    the data currently lives in MySQL (it will be migrated to Hive later).
    Environment:
    the code builds on the project structure set up in the earlier posts.

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
     *
     * Sample code only: reading differs per data source, but the approach is the same.
     *
    */ 
    object SparkSqlGetMysqlTest {
      def main(args: Array[String]): Unit = {
        val sc = new SparkConf()
          .setMaster("local")
          .setAppName("partitioned MySQL read")
        val spark = SparkSession.builder().config(sc).getOrCreate()
        val dataFromMysql: DataFrame = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "MYSQL_DB")
          .option("user", "xxxx")
          .option("password", "xxxx")
          .option("partitionColumn", "ID")
          .option("lowerBound", "0")
          .option("upperBound", "500000")
          .option("numPartitions", "20")
          .load()
        dataFromMysql.createOrReplaceTempView("MYSQL_DB_TEMP")
        val goods = spark.sql("select the product-detail-page visit records here, just like writing ordinary sql")
        val allGoods: RDD[(String, Int)] = goods.rdd
          // filter predicate
          .filter(s => s.getString(0).contains("/p/"))
          // normalize the data (fix casing and other format inconsistencies)
          .map(record => record.getString(0).replace("v", "V").replace("HTML", "html"))
          // finally shape into <K,V> (same idea as Spark word count)
          .map(record => (record.substring(1, 7), 1))
        // reduceByKey: total visits per sku, as <sku, count>
        val result = allGoods.reduceByKey(_ + _)
        // sort descending and take the top 10
        val top10 = result.sortBy(_._2, false).take(10)
        top10.foreach(println)
      }
    }
    

    partitionColumn (the column used to split the read);
    lowerBound (lower bound);
    upperBound (upper bound);
    numPartitions (number of partitions);
    This is Spark's partitioned read from MySQL: it fixes how many partitions there are and which rows each partition reads.

    For example, my table currently holds about 560k rows. With upperBound = 500000, lowerBound = 0, numPartitions = 20, and partitionColumn = Id (the primary key), the partitions read:
    [1]. Id < 25000
    [2]. 25000 <= Id < 50000
    [3]. 50000 <= Id < 75000
    [4]. ...



    [20]. 475000 <= Id
    So each partition gets a roughly even share. Note the last partition is unbounded above, so the roughly 60k rows beyond upperBound all land in it; the bounds don't filter rows, they only shape the split. (I originally wondered whether the split adapts as the data grows; it does not, the stride is computed once from these options.)
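
    A tiny sketch of the arithmetic behind those ranges: the stride is (upperBound - lowerBound) / numPartitions, the first partition is unbounded below and the last unbounded above. The predicate strings below mirror what Spark's JDBC reader generates, give or take rounding details:

    // reproduce the partition predicates for lowerBound = 0, upperBound = 500000, 20 partitions
    val lower = 0L
    val upper = 500000L
    val numPartitions = 20
    val stride = (upper - lower) / numPartitions // 25000
    val predicates = (0 until numPartitions).map { i =>
      val lo = lower + i * stride
      val hi = lo + stride
      if (i == 0) s"ID < $hi"                       // first partition also catches everything below lowerBound
      else if (i == numPartitions - 1) s"ID >= $lo" // last partition catches everything above upperBound
      else s"ID >= $lo AND ID < $hi"
    }
    predicates.foreach(println) // ID < 25000 ... ID >= 475000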

    Output format

    (100001,10000)
    (100002,6000)
    (300001,4000)
    (400001,3900)
    (104001,1000)
    

    Each line is a product id with its total visit count, sorted in descending order.

    To see how the partitioned read actually behaves, here is a small example that just dumps what each partition reads:

      def main(args: Array[String]): Unit = {
        val sc = new SparkConf()
          .setMaster("spark://192.168.2.142:7077")
    //      .setMaster("local")
          .setAppName("partitioned MySQL read")
        val spark = SparkSession.builder().config(sc).getOrCreate()
        val dataFromMysql: DataFrame = spark.read
          .format("jdbc")
          .option("url", "jdbc:mysql")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "MYSQL_DB")
          .option("user", "xxxx")
          .option("password", "xxxx")
          .option("partitionColumn", "ID")
          .option("lowerBound", "0")
          .option("upperBound", "500000")
          .option("numPartitions", "20")
          .load()
        dataFromMysql.rdd.saveAsTextFile("file:///usr/local/bigdata/output.txt")
      }
    
  • Environment: Ubuntu 14, Flink 1.7.2, ... aggregate per window and emit the top-N products by click count within each window. Implementation: create a Maven project named UserBehaviorAnalysis; its pom contains: 4.0.0 com.ustc UserBehaviorAnalysis pom 1.0-SNAPSH
  • JD (京东) lipstick top-30 analysis

    2017-11-02 22:17:00

    1. Scraping the product ids

    Inspecting the page source shows that every product id sits inside a tag with class "gl-item", so bs4's select method can find those tags and pull out the ids.

    With an id, every product page URL differs only by that id, so the URLs can be constructed directly. The ids and constructed URLs are stored in lists:

    def get_product_url(url):
        global pid
        global links
        req = urllib.request.Request(url)
        req.add_header("User-Agent",
                       'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36')
        req.add_header("GET", url)
        content = urllib.request.urlopen(req).read()
        soup = bs4.BeautifulSoup(content, "lxml")
        product_id = soup.select('.gl-item')
        for i in range(len(product_id)):
            lin = "https://item.jd.com/" + str(product_id[i].get('data-sku')) + ".html"
            # store the product url
            links.append(lin)
            # store the product id
            pid.append(product_id[i].get('data-sku'))

     2. Getting product information

    From each product page, grab the basics (product name, shop name, price, and so on):

        product_url = links[i]
        req = urllib.request.Request(product_url)
        req.add_header("User-Agent",
                       'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0')
        req.add_header("GET", product_url)
        content = urllib.request.urlopen(req).read()
        # fetch the product page source
        soup = bs4.BeautifulSoup(content, "lxml")
        # product name
        sku_name = soup.select_one('.sku-name').getText().strip()
        # shop name
        try:
            shop_name = soup.find(clstag="shangpin|keycount|product|dianpuname1").get('title')
        except:
            shop_name = soup.find(clstag="shangpin|keycount|product|zcdpmc_oversea").get('title')
        # product id
        sku_id = str(pid[i]).ljust(20)
        # product price (continued in the full routine below)

    The hot comments, good-rating percentage, and reviews come from the comment JSON endpoint.

    Hot comments:

    def get_product_comment(product_id):
        comment_url = 'https://club.jd.com/comment/productPageComments.action?' \
                      'callback=fetchJSON_comment98vv16496&' \
                      'productId={}&' \
                      'score=0&' \
                      'sortType=6&' \
                      'page=0&' \
                      'pageSize=10' \
                      '&isShadowSku=0'.format(str(product_id))
        response = urllib.request.urlopen(comment_url).read().decode('gbk', 'ignore')
        response = re.search(r'(?<=fetchJSON_comment98vv16496\().*(?=\);)', response).group(0)
        response_json = json.loads(response)
        # collect the hot comment tags
        hot_comments = []
        hot_comment = response_json['hotCommentTagStatistics']
        for h_comment in hot_comment:
            hot = str(h_comment['name'])
            count = str(h_comment['count'])
            hot_comments.append(hot + '(' + count + ')')
        return ','.join(hot_comments)

     Good-rating percentage:

    def get_good_percent(product_id):
        comment_url = 'https://club.jd.com/comment/productPageComments.action?' \
                      'callback=fetchJSON_comment98vv16496&' \
                      'productId={}&' \
                      'score=0&' \
                      'sortType=6&' \
                      'page=0&' \
                      'pageSize=10' \
                      '&isShadowSku=0'.format(str(product_id))
        response = requests.get(comment_url).text
        response = re.search(r'(?<=fetchJSON_comment98vv16496\().*(?=\);)', response).group(0)
        response_json = json.loads(response)
        # good-rating percentage
        percent = response_json['productCommentSummary']['goodRateShow']
        percent = str(percent) + '%'
        return percent

     Fetching the reviews:

    def get_comment(product_id, page):
        global word
        comment_url = 'https://club.jd.com/comment/productPageComments.action?' \
                      'callback=fetchJSON_comment98vv16496&' \
                      'productId={}&' \
                      'score=0&' \
                      'sortType=6&' \
                      'page={}&' \
                      'pageSize=10' \
                      '&isShadowSku=0'.format(str(product_id), str(page))
        response = urllib.request.urlopen(comment_url).read().decode('gbk', 'ignore')
        response = re.search(r'(?<=fetchJSON_comment98vv16496\().*(?=\);)', response).group(0)
        response_json = json.loads(response)
        # append to 评论.csv
        comment_file = open('{0}\\评论.csv'.format(path), 'a', newline='', encoding='utf-8', errors='ignore')
        write = csv.writer(comment_file)
        # walk the review list
        comment_summary = response_json['comments']
        for content in comment_summary:
            # review time
            creation_time = str(content['creationTime'])
            # product color
            product_color = str(content['productColor'])
            # product name
            reference_name = str(content['referenceName'])
            # customer rating
            score = str(content['score'])
            # review text
            content = str(content['content']).strip()
            # keep the text for the word cloud
            word.append(content)
            write.writerow([product_id, reference_name, product_color, creation_time, score, content])
        comment_file.close()

     Putting it together, the full product-info routine:

    def get_product_info():
        global pid
        global links
        global word
        # create 评论.csv
        comment_file = open('{0}\\评论.csv'.format(path), 'w', newline='')
        write = csv.writer(comment_file)
        write.writerow(['商品id', '商品', '颜色', '评论时间', '客户评分', '客户评论'])
        comment_file.close()
        # create 商品.csv
        product_file = open('{0}\\商品.csv'.format(path), 'w', newline='')
        product_write = csv.writer(product_file)
        product_write.writerow(['商品id', '所属商店', '商品', '价格', '商品好评率', '商品评价'])
        product_file.close()

        for i in range(len(pid)):
            print('[*] collecting data...')
            product_url = links[i]
            req = urllib.request.Request(product_url)
            req.add_header("User-Agent",
                           'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0')
            req.add_header("GET", product_url)
            content = urllib.request.urlopen(req).read()
            # fetch the product page source
            soup = bs4.BeautifulSoup(content, "lxml")
            # product name
            sku_name = soup.select_one('.sku-name').getText().strip()
            # shop name
            try:
                shop_name = soup.find(clstag="shangpin|keycount|product|dianpuname1").get('title')
            except:
                shop_name = soup.find(clstag="shangpin|keycount|product|zcdpmc_oversea").get('title')
            # product id
            sku_id = str(pid[i]).ljust(20)
            # product price
            price_url = 'https://p.3.cn/prices/mgets?pduid=1580197051&skuIds=J_{}'.format(pid[i])
            response = requests.get(price_url).content
            price = json.loads(response)
            price = price[0]['p']
            # append to 商品.csv
            product_file = open('{0}\\商品.csv'.format(path), 'a', newline='', encoding='utf-8', errors='ignore')
            product_write = csv.writer(product_file)
            product_write.writerow(
                [sku_id, shop_name, sku_name, price, get_good_percent(pid[i]), get_product_comment(pid[i])])
            product_file.close()
            pages = int(get_comment_count(pid[i]))
            word = []
            try:
                for j in range(pages):
                    get_comment(pid[i], j)
            except Exception as e:
                print("[!!!] failed to load the reviews for {}!".format(pid[i]))
                print("[!!!] Error: {}".format(e))

            print('[*] item {} ({}) done!'.format(i + 1, pid[i]))
            # generate the word cloud
            word = " ".join(word)
            my_wordcloud = WordCloud(font_path=r'C:\Windows\Fonts\STZHONGS.TTF', background_color='white').generate(word)
            my_wordcloud.to_file("{}.jpg".format(pid[i]))

     Write the product info and reviews into the CSV files and generate a word cloud from the reviews.

    3. Summary

            The most frequent problem during the crawl was encoding: everything requested comes back as bytes and has to be decoded with "gbk", and even then some encoding errors remained. Some articles pointed out that passing "ignore" as the errors argument fixes it. Because the crawl volume is large there is some data loss, but the dataset is big enough that this doesn't affect the product analysis.

    Reposted from: https://www.cnblogs.com/feijiii/p/7774865.html


    Scala/Spark e-commerce platform offline analysis project, requirement six: top-3 products per region

    AreaTop3Stat.scala

    import java.util.UUID
    
    import commons.conf.ConfigurationManager
    import commons.constant.Constants
    import commons.utils.ParamUtils
    import net.sf.json.JSONObject
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    /**
     *
     * Requirement six: top-3 products per region.
     * Uses the user visit action table
     * and the product info table,
     * plus city information.
     *
     */
    object AreaTop3Stat {
    
    
    
      def main(args: Array[String]): Unit = {
        // read the task constraints
        val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
        val taskParam = JSONObject.fromObject(jsonStr)
    
        // create a globally unique key
        val taskUUID = UUID.randomUUID().toString
    
        val sparkConf = new SparkConf().setAppName("area").setMaster("local[*]")
        val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    
    
        // 1
        // todo step 1: get the click actions within the user-specified date range
        // RDD[(cityId,pid)]
        val cityId2PidRDD = getCityAndProductInfo(sparkSession,taskParam)
            // print to sanity-check the sql
            // cityId2PidRDD.foreach(println(_))
            /*(cityid,productid)
            (5,47)
            (3,75)
            */
    
        // 2
        // todo step 2: get the city info
        // RDD[(cityId,CityAreaInfo)]
        val cityId2AreaInfoRDD = getCityAreaInfo(sparkSession)
    
        // 3
        // todo step 3: (I) the basic-info temp table
        // tmp_area_basic_info: each row represents one product-click action
        getAreaPidBasicInfoTable(sparkSession,cityId2PidRDD,cityId2AreaInfoRDD)
            // print the temp table to check
            // sparkSession.sql("select * from tmp_area_basic_info").show()
            /*+-------+---------+----+---+
             |city_id|city_name|area|pid|
              +-------+---------+----+---+
              |      0|       北京|  华北| 58|*/
    
    
        // todo step 5: register the custom functions
        // UDF: concatenate city_name and city_id from tmp_area_basic_info
        sparkSession.udf.register("concat_long_string",(v1:Long,v2:String,split:String)=>{
          v1 + split + v2
        })
        // UDAF: user-defined aggregate function
        sparkSession.udf.register("group_concat_distinct",new GroupConcatDistinct)
    
        // 4
        // todo step 4: (II) the per-region product click-count temp table, with full city info
        // tmp_area_click_count
        getAreaProductClickCountTable(sparkSession) // the data sits in the previous temp table, so no parameters are needed
            // print the temp table to check
            // sparkSession.sql("select * from tmp_area_click_count").show()
            /*+----+---+--------+ this version is incomplete
            |area|pid|click_count|
            +----+---+--------+
            |  西南| 70|      17|
            |  西北| 43|      12|
              but I don't want to lose each area's city data,
              and that column can't simply be added after group by,
              so I implemented a UDF and a UDAF and, once registered, changed the sql in the anchor-4 method;
              that is why step 5 exists.

              The complete tmp_area_click_count, per-region click counts with full city info:
              +----+---+-----------+----------+
              |area|pid|click_count|city_infos|
              +----+---+-----------+----------+
              |  西南| 62|         15|      8:成都|
              |  华东| 73|         30| 1:上海,2:南京|
             */
    
        // 5
        // todo step 6: join the two tables
        // first register another UDF: product_info.extend_info is a json string and needs special handling,
        // pulling the value of a given key out of the json
        sparkSession.udf.register("get_json_field",(json:String,field:String)=>{
          val jsonObject = JSONObject.fromObject(json)
          jsonObject.getString(field)
        })
        // tmp_area_click_count temp table + product_info table
        getAreaPidBasicInfoInfo(sparkSession)
            // print to check
            // sparkSession.sql("select * from tmp_area_count_product_info").show()
            /* tmp_area_count_product_info, not quite readable yet
            +----+----------+---+------------+--------------+-----------+
            |area|city_infos|pid|product_name|product_status|click_count|
            +----+----------+---+------------+--------------+-----------+
            |  西北|      7:西安| 43|   product43|             1|         15|
            |  西南|      8:成都| 70|   product70|             1|         30|
            next, map the status from 0/1 to Self/Third Party:
             +----+----------+---+------------+--------------+-----------+
            |area|city_infos|pid|product_name|product_status|click_count|
            +----+----------+---+------------+--------------+-----------+
            |  西南|      8:成都| 62|   product62|          Self|         15|
            |  华东| 1:上海,2:南京| 73|   product73|   Third Party|         30|
             */
    
        // 6
        // todo step 7: compute the per-region top-3 products and write them to the database
        getTop3Product(sparkSession,taskUUID)
            // check
            // sparkSession.sql("select * from temp_test").show()
            /* the numbers check out: top-3 products per region
            +----+----------+----------+---+------------+--------------+-----------+
            |area|area_level|city_infos|pid|product_name|product_status|click_count|
            +----+----------+----------+---+------------+--------------+-----------+
            |  华东|   A_Level| 1:上海,2:南京| 27|   product27|          Self|         40|
            |  华东|   A_Level| 1:上海,2:南京| 36|   product36|   Third Party|         39|
            |  华东|   A_Level| 1:上海,2:南京| 33|   product33|   Third Party|         38|
            |  西北|   C_Level|      7:西安| 28|   product28|          Self|         24|
            |  西北|   C_Level|      7:西安| 47|   product47|   Third Party|         24|
            |  西北|   C_Level|      7:西安| 32|   product32|          Self|         23|
             */
    
    
      }
    
    
      /**
       * Method for anchor 6:
       * compute the per-region top-3 products and write them to the database.
       *
       * Uses the sql window function (for ranking):
       * row_number() over(PARTITION BY area ORDER BY click_count DESC) rank
       *
       * and a case when statement (for the area level).
       *
       * @param sparkSession
       * @param taskUUID
       */
      def getTop3Product(sparkSession: SparkSession, taskUUID: String) = {
        /*val sql = "select area,city_infos,pid,product_name,product_status,click_count," +
          "row_number() over(PARTITION BY area ORDER BY click_count DESC) rank " +
          "from tmp_area_count_product_info"
        sparkSession.sql(sql).createOrReplaceTempView("temp_test")*/
        /* the test output looks like
        |area|city_infos|pid|product_name|product_status|click_count|rank|
        +----+----------+---+------------+--------------+-----------+----+
        |  华东| 1:上海,2:南京| 27|   product27|          Self|         40|   1|
        |  华东| 1:上海,2:南京| 36|   product36|   Third Party|         39|   2|
        |  华东| 1:上海,2:南京| 33|   product33|   Third Party|         38|   3|
        |  华东| 1:上海,2:南京|  6|    product6|   Third Party|         37|   4|
        |  华东| 1:上海,2:南京| 18|   product18|          Self|         36|   5|
        |  华东| 1:上海,2:南京|  9|    product9|   Third Party|         36|   6|
         */
    
        val sql = "select area, " +
          "CASE " +
          "WHEN area='华北' OR area='华东' THEN 'A_Level' " +
          "WHEN area='华中' OR area='华南' THEN 'B_Level' " +
          "WHEN area='西南' OR area='西北' THEN 'C_Level' " +
          "ELSE 'D_Level' " +
          "END area_level, " +
          "city_infos,pid,product_name,product_status,click_count from " +
          "(select area,city_infos,pid,product_name,product_status,click_count," +
          "row_number() over(PARTITION BY area ORDER BY click_count DESC) rank " +
          "from tmp_area_count_product_info) t " +
          "where rank<=3"
    
        // test
        // sparkSession.sql(sql).createOrReplaceTempView("temp_test")
        /*
        +----+----------+----------+---+------------+--------------+-----------+
        |area|area_level|city_infos|pid|product_name|product_status|click_count|
        +----+----------+----------+---+------------+--------------+-----------+
        |  华东|   A_Level| 1:上海,2:南京| 27|   product27|          Self|         40|
        |  华东|   A_Level| 1:上海,2:南京| 36|   product36|   Third Party|         39|
        |  华东|   A_Level| 1:上海,2:南京| 33|   product33|   Third Party|         38|
        |  西北|   C_Level|      7:西安| 28|   product28|          Self|         24|
        |  西北|   C_Level|      7:西安| 47|   product47|   Third Party|         24|
        |  西北|   C_Level|      7:西安| 32|   product32|          Self|         23|
         */
    
        // todo write to the database
        // going through spark sql requires this conversion step
         val top3ProductRDD = sparkSession.sql(sql).rdd.map{
          case row => {
            AreaTopProduct(taskUUID,
              row.getAs[String]("area"),
              row.getAs[String]("area_level"),
              row.getAs[Long]("pid"),
              row.getAs[String]("city_infos"),
              row.getAs[Long]("click_count"),
              row.getAs[String]("product_name"),
              row.getAs[String]("product_status"))
          }
        }
    
        import sparkSession.implicits._
        top3ProductRDD.toDF().write
          .format("jdbc")
          .option("url",ConfigurationManager.config.getString(Constants.JDBC_URL))
          .option("user",ConfigurationManager.config.getString(Constants.JDBC_USER))
          .option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
          .option("dbtable","area_top3_product")
          .mode(SaveMode.Append)
          .save()
    
      }
    
    
    
      /**
       * Method for anchor 5: join the two tables.
       * tmp_area_click_count temp table: area,city_infos,pid,click_count  ---tacc
       * product_info table: product_id,product_name,extend_info  ---pi
       *
       * the "extend_info" field says whether a product is self-operated or third-party
       *
       * @param sparkSession
       */
      def getAreaPidBasicInfoInfo(sparkSession: SparkSession)={
        val sql = "select tacc.area,tacc.city_infos,tacc.pid,pi.product_name," +
          "if(get_json_field(pi.extend_info,'product_status')=1,'Self','Third Party') product_status," +  // 0/1改成self/third_party 用了sql的if(条件,true结果,false结果)语句
          "tacc.click_count " +
          "from tmp_area_click_count tacc join product_info pi on tacc.pid = pi.product_id"
    
        sparkSession.sql(sql).createOrReplaceTempView("tmp_area_count_product_info")
      }
    
    
    
      /**
       * Method for anchor 4:
       * the per-region product click-count temp table with complete city info,
       * tmp_area_click_count.
       *
       * A single sql statement between the tables does it;
       * count(*) works because each row of the source table represents one click.
       */
      def getAreaProductClickCountTable(sparkSession: SparkSession)={
        val sql = "select area,pid,count(*) click_count," +
          "group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos " +  //这句是自定义UDAF后加的
          "from tmp_area_basic_info group by area,pid"
        sparkSession.sql(sql).createOrReplaceTempView("tmp_area_click_count")
    
      }
    
    
    
      /**
       * Method for anchor 3:
       * join into the basic-info temp table tmp_area_basic_info.
       *
       * @param sparkSession
       * @param cityId2PidRDD
       * @param cityId2AreaInfoRDD
       */
      def getAreaPidBasicInfoTable(sparkSession: SparkSession,
                                   cityId2PidRDD: RDD[(Long,Long)],
                                   cityId2AreaInfoRDD: RDD[(Long, CityAreaInfo)])={
        val areaPidInfoRDD = cityId2PidRDD.join(cityId2AreaInfoRDD).map{
          case (cityId,(pid,areaInfo)) =>
            (cityId,areaInfo.city_name,areaInfo.area,pid) // four fields
        }
    
        import sparkSession.implicits._
        areaPidInfoRDD
          .toDF("city_id","city_name","area","pid") //需要指定字段名
          .createOrReplaceTempView("tmp_area_basic_info")
    
    
    
      }
    
    
    
      /**
       * Method for anchor 2:
       * build the city/area info as an RDD.
       * @param sparkSession
       */
      def getCityAreaInfo(sparkSession: SparkSession)={
        val cityAreaInfoArray = Array((0L, "北京", "华北"), (1L, "上海", "华东"),
          (2L, "南京", "华东"), (3L, "广州", "华南"), (4L, "三亚", "华南"),
          (5L, "武汉", "华中"), (6L, "长沙", "华中"), (7L, "西安", "西北"),
          (8L, "成都", "西南"), (9L, "哈尔滨", "东北"))
    
        sparkSession.sparkContext.makeRDD(cityAreaInfoArray).map{
          case(cityId,cityName,area) => {
            (cityId,CityAreaInfo(cityId,cityName,area))
          }
        }
      }
    
    
    
      /**
       *  Method for anchor 1:
       *  get the click actions within the user-specified date range.
       *
       * @param sparkSession
       * @param taskParam
       * @return
       */
      def getCityAndProductInfo(sparkSession: SparkSession, taskParam: JSONObject) = {
        val startDate = ParamUtils.getParam(taskParam,Constants.PARAM_START_DATE)
        val endDate = ParamUtils.getParam(taskParam,Constants.PARAM_END_DATE)
    
        // keep only actions that contain a click: and click_product_id != -1
        // each action row fetched represents one click
        val sql = "select city_id,click_product_id from user_visit_action where date >='"+startDate+"' and date <='"+endDate+"' and click_product_id != -1"
        import sparkSession.implicits._
        sparkSession.sql(sql).as[CityClickProduct].rdd.map{
          case cityPid => (cityPid.city_id,cityPid.click_product_id)
    
        }
    
      }
    
    
    
    }
    
    /*
    tmp_area_basic_info: the basic-info temp table
        +-------+---------+----+---+
        |city_id|city_name|area|pid|
        +-------+---------+----+---+
        |      0|       北京|  华北| 58|
        |      0|       北京|  华北| 84|
    
    
    the complete tmp_area_click_count: per-region click counts with full city info
        +----+---+-----------+----------+
        |area|pid|click_count|city_infos|
        +----+---+-----------+----------+
        |  西南| 62|         15|      8:成都|
        |  华东| 73|         30| 1:上海,2:南京|
    
    
    the refined tmp_area_count_product_info
        +----+----------+---+------------+--------------+-----------+
        |area|city_infos|pid|product_name|product_status|click_count|
        +----+----------+---+------------+--------------+-----------+
        |  西南|      8:成都| 62|   product62|          Self|         15|
        |  华东| 1:上海,2:南京| 73|   product73|   Third Party|         30|
        
    
    the numbers check out: top-3 products per region
        +----+----------+----------+---+------------+--------------+-----------+
        |area|area_level|city_infos|pid|product_name|product_status|click_count|
        +----+----------+----------+---+------------+--------------+-----------+
        |  华东|   A_Level| 1:上海,2:南京| 27|   product27|          Self|         40|
        |  华东|   A_Level| 1:上海,2:南京| 36|   product36|   Third Party|         39|
        |  华东|   A_Level| 1:上海,2:南京| 33|   product33|   Third Party|         38|
        |  西北|   C_Level|      7:西安| 28|   product28|          Self|         24|
        |  西北|   C_Level|      7:西安| 47|   product47|   Third Party|         24|
        |  西北|   C_Level|      7:西安| 32|   product32|          Self|         23|
    
    
    
    
     */
    
    
    
    /* some things have no single right answer */
    /* make it work first, then optimize */
    

    The custom UDAF

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
    
    /**
     * Custom UDFs and UDAFs here exist to parse and debug composite fields.
     * AreaTopStat.scala (requirement six) uses the custom UDF.
     *
     * This is the custom UDAF (user-defined aggregate function) for requirement six:
     * it aggregates cityId:cityName pairs, e.g.
     * bufferCityInfo1: cityId1:cityName1,cityId2:cityName2
     *
     *
     * It has to be registered in AreaTopStat.scala before use:
     *
     * // step 5: concatenate city_name and city_id
     * // register the UDF for tmp_area_basic_info
     *     sparkSession.udf.register("concat_long_string",(v1:Long,v2:String,split:String)=>{
     *        v1 + split + v2
     *     })
     * // register the UDAF
     *     sparkSession.udf.register("group_concat_distinct",new GroupConcatDistinct)
     *
     *
     * // step 4: (II) the per-region product click-count temp table
     * // tmp_area_click_count
     * getAreaProductClickCountTable(sparkSession) // data sits in the previous temp table, so no parameters needed
     * // print the temp table to check
     *          sparkSession.sql("select * from tmp_area_click_count").show()
     *            /*+----+---+--------+ this version is incomplete
     *            |area|pid|click_count|
     *            +----+---+--------+
     *            |  西南| 70|      17|
     *            |  西北| 43|      12|
     *
     *            but we don't want to lose each area's city data,
     *            and that column can't simply be added after group by,
     *            hence the UDF + UDAF and the updated sql in the anchor-4 method,
     *            which is why step 5 exists.
     *
     *            the final, complete tmp_area_click_count:
     *            +----+---+-----------+----------+
     *            |area|pid|click_count|city_infos|
     *            +----+---+-----------+----------+
     *            |  西南| 62|         15|      8:成都|
     *            |  华东| 73|         30| 1:上海,2:南京|
     *            */
     *
     *
     * def getAreaProductClickCountTable(sparkSession: SparkSession)={
     *      val sql = "select area,pid,count(*) click_count," +
     *      "group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos " +  // added once the UDAF existed
     *      "from tmp_area_basic_info group by area,pid"
     *
     *     sparkSession.sql(sql).createOrReplaceTempView("tmp_area_click_count")
     * }
     *
     */
    class GroupConcatDistinct extends UserDefinedAggregateFunction{
    
      // UDAF input type: String
      override def inputSchema: StructType = StructType(StructField("cityInfo",StringType)::Nil) // StructField(...) :: Nil builds a one-element List[StructField]
    
      // buffer type
      override def bufferSchema: StructType = StructType(StructField("cityInfo",StringType)::Nil)
    
      // output type
      override def dataType: DataType = StringType
    
      // same input always yields the same output
      override def deterministic: Boolean = true
    
      // initialization
      override def initialize(buffer: MutableAggregationBuffer): Unit ={
        buffer(0)=""
      }
    
      // update: the interesting part
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        var bufferCityInfo= buffer.getString(0) // the aggregate so far, at slot 0, as a string
        val cityInfo = input.getString(0) // the incoming value
    
        // deduplicating string concatenation
        if(!bufferCityInfo.contains(cityInfo)) { // append only if this cityInfo isn't in the buffer yet
          if ("".equals(bufferCityInfo)) {  // a fresh buffer is still the empty string
            bufferCityInfo += cityInfo
          } else {
            bufferCityInfo += "," + cityInfo
          }
          // write back to slot 0 of the buffer
          buffer.update(0,bufferCityInfo)
        }
      }
    
      // merge partial buffers
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        // bufferCityInfo1: cityId1:cityName1,cityId2:cityName2
        var bufferCityInfo1 = buffer1.getString(0)
        // bufferCityInfo2: cityId1:cityName1,cityId2:cityName2,cityId3:cityName3
        var bufferCityInfo2 = buffer2.getString(0)
    
        // deduplicating merge
        for(cityInfo <- bufferCityInfo2.split(",")){
          if(!bufferCityInfo1.contains(cityInfo)){
            if("".equals(bufferCityInfo1)){
              bufferCityInfo1 += cityInfo
            }else{
              bufferCityInfo1 += "," +cityInfo
            }
          }
        }
    
        buffer1.update(0,bufferCityInfo1)
    
      }
    
      // final result
      override def evaluate(buffer: Row): Any = {
        buffer.getString(0)
      }
    }
    
    
    /**
     This sql statement is the special one, so it's worth calling out on its own:
     val sql = "select area,pid,count(*) click_count," +
    "group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos " +  // added once the custom UDAF existed
    "from tmp_area_basic_info group by area,pid"
     
     */
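
    A minimal, self-contained way to exercise the UDAF; the demo object and its rows are made up and not part of the project:

    import org.apache.spark.sql.SparkSession

    object GroupConcatDistinctDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("udaf-demo").getOrCreate()
        import spark.implicits._

        // same registrations as in AreaTop3Stat
        spark.udf.register("concat_long_string", (v1: Long, v2: String, split: String) => v1 + split + v2)
        spark.udf.register("group_concat_distinct", new GroupConcatDistinct)

        Seq((1L, "上海", "华东", 73L), (2L, "南京", "华东", 73L), (1L, "上海", "华东", 73L))
          .toDF("city_id", "city_name", "area", "pid")
          .createOrReplaceTempView("tmp_area_basic_info")

        spark.sql(
          "select area, pid, count(*) click_count, " +
          "group_concat_distinct(concat_long_string(city_id, city_name, ':')) city_infos " +
          "from tmp_area_basic_info group by area, pid").show()
        // expected: 华东 | 73 | 3 | 1:上海,2:南京  (the duplicate 1:上海 is deduplicated)
        spark.stop()
      }
    }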
    

    Case classes

    
    /**
     * Case class for requirement six,
     * used by the anchor-1 method.
     * @param city_id
     * @param click_product_id
     */
    case class CityClickProduct(city_id:Long,click_product_id:Long)
    
    
    /**
     * Case class for requirement six.
     * @param city_id
     * @param city_name
     * @param area
     */
    case class CityAreaInfo(city_id:Long,city_name:String,area:String)
    
    
    /**
     * Case class for requirement six:
     * the shape of the per-region top-3 rows written to the database.
     *
     * @param taskid
     * @param area
     * @param areaLevel
     * @param productid
     * @param cityInfos
     * @param clickCount
     * @param productName
     * @param productStatus
     */
    case class AreaTopProduct(
                             taskid:String,
                             area:String,
                             areaLevel:String,
                             productid:Long,
                             cityInfos:String,
                             clickCount:Long,
                             productName:String,
                             productStatus:String
                             )
    
    

    Dependencies (the same as for the earlier requirements):

     <dependencies>
    
    
    
        <!-- Spark dependencies -->
            <dependency>
                <groupId>com.atguigu</groupId>
                <artifactId>commons</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
            </dependency>
        <!-- Scala -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
            </dependency>
    
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                <!-- scala-maven-plugin compiles/tests/runs/documents the Scala code in any maven project -->
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.page.PageOneStepConvertRate</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    
  • TopN requirements are common in both batch and streaming jobs: hot-selling products in e-commerce, the top-N ads by clicks in advertising, the top-N query terms in search. TopN further splits into global topN and grouped (per-key) topN; hot-selling products, for example, can ...
  • JD bra top-150 review analysis

    2021-02-08 16:04:47
    Scraped the top 150 bras by composite ranking on JD, collected close to 25,000 rows of data, and ran a simple analysis of the color and size fields.
  • Implementing the per-region top-3 hot-product statistics in a relational database would mean writing sql queries. An earlier post implemented top-n by sorting an RDD and then taking the head; here it is redone as a temp-table sql top query. The flow: (1) create SparkConf and build the Spark context (2)...
  • Flink in practice: global TopN analysis and implementation

    2019-11-18 00:06:39
    Group by region areaId + product gdsId and compute each group's cumulative sales; then group the resulting areaId+gdsId sales by areaId and take the TopN per region. The two-stage logic is sketched below.
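
    Before touching Flink's windowing APIs, that two-stage logic (sum per areaId+gdsId, then top-N within each areaId) can be pinned down with plain Scala collections; a sketch over made-up sales records:

    // (areaId, gdsId, amount) sales records -- hypothetical data
    val sales = Seq(("a1", "g1", 10.0), ("a1", "g2", 30.0), ("a1", "g1", 5.0), ("a2", "g3", 7.0))

    val topN = 1
    val result = sales
      .groupBy { case (area, gds, _) => (area, gds) }                       // stage 1: area + product
      .map { case ((area, gds), rows) => (area, gds, rows.map(_._3).sum) }  // cumulative sales per group
      .groupBy(_._1)                                                        // stage 2: regroup by area
      .map { case (area, rows) => area -> rows.toSeq.sortBy(t => -t._3).take(topN) }

    result.foreach(println) // (a1,List((a1,g2,30.0))), (a2,List((a2,g3,7.0)))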
