spark 数据分析实例

2017-01-07 16:22:43 miaote 阅读数 3866

上次学习Spark还是两个月前的事情,期中好多事情耽搁了,现在开始正式地学习Spark的使用。前面所学习的Scala基本知识也能用上了,终于可以从简单了解过渡到应用和实现的阶段。
这次学习的内容跟进的是《Spark高技术据分析》一章,里面的内容感觉很灵活,不仅是从最简单的Spark对数据的处理开始,而且其中稍带的Scala知识提及,可以加深对Scala的印象,从而运用起来就更加能了解其中的意义。
1.用Spark建立弹性分布式数据集(RDD)
弹性分布式数据集(Resilient Distributed Dataset):Spark所提供的基本抽象,代表分布在集群中多台机器上的对象集合。
SparkContext:负责协调集群上Spark作业的执行
有两种方式创建RDD:

  1. 基于外部数据源创建RDD
  2. 在一个过或多个RDD上执行转换操作来创建RDD,例如:过滤、汇总、关联等
    例如可以调用SparkContext如下方法创建一个自定义的RDD:
scala> val rdds=sc.parallelize(Array(1,2,3,4),4)

第一个参数代表并行化的对象集合,第二个参数代表分区的个数。分区就是数据集中的子集,Spark的并行单位就是分区。
也可以调用textFile方法来获取本地文件或者集群上的文件,来得到数据源:

scala> val rdd=sc.textFile("/home/coder-z/linkage")

如果路径是一个目录的话,会将目录中所有的文件数据作为RDD的输入。
假设我们是从文件读入数据的,那么得到的RDD中的数据基本单位是String,现在的问题就是如何将数据转换为我们想要的数据类型。
2.RDD数据处理
RDD动作:RDD的操作并不会导致集群的分布式计算,只有调用了action时分布式计算才会执行。
当我们得到一个RDD时,为了检验获得的数据集是否是指定文件中的,可以调用RDD的first方法:

scala> rdd.first

这里的first是一个方法,而不是字段。因为scala中,如果方法没有设置参数,那么可以直接省略括号。
现在执行如下动作:

val head=rdd.take(10)               //从rdd中抽取前10个数据记录
head.fliter(!isHead(_)).foreach(println)    //过滤掉头信息,并输出

head是一个Array[String]类型的数组,调用filter方法用来过滤数组中的信息。这行代码体现了Scala作为函数式编程的简易性,尽可能使用函数作为一种“变量”来使用。其中的isHead方法定义如下:

def isHead(line:String)=line.contains("id_1")

是一个返回Boolean类型的函数,Scala申明函数时可以不申明返回类型,Scala会自行推断,但是如果函数情况比较复杂,Scala不一定能够推断出函数的返回值类型。
foreach方法接收一个函数println作为参数对数据集上的每个元素进行println定义的处理。而每个元素就作为参数,传入了println函数中。
同样地,数据集中的每个元素作为参数也传入isHead中,可以用占位符”_”来表示传入的元素。当然,如下代码也做相同处理:

head.filter(x=>!isHead(x)).foreach(println)

x就代表正在处理的元素,交给isHead进行处理。

3.使用元组和case class进行数据结构化*
现在我们要将获得的字符串解析成所需要的格式,先来看一下我们需要的数据格式。

607,53170,1,?,1,?,1,1,1,1,1,TRUE

这是数据集中其中的一条记录,根据数据集的标题信息,我们得到格式:

(Int,Int,Double,Double,Double,Double,Double,Double,Double,Double,Double,Boolean)

其中记录里会有”?”数值,这是不确定的信息,应该在格式转化的时候过滤为”NaN”,这样的数据过滤在之后对数据集的分析中也是需要的。
先简单定义一个能够把第3-11个数据转换成Double的函数:

scala> def toDoubles(s:String)={
     if("?".equals(s)) Double.NaN else s.toDouble
    }

再利用Scala中的case class 语法,我们可以自定义一条数据记录的格式:

case class MatchData(id_1:Int,id_2:Int,scores:Array[Double],matched:Boolean)

运用MatchData作为一条记录,可以直接使用传入的参数名作为字段名调用。
例如:matchData.id_1,而不是matchData(0)这样用序号使用了。
接着,我们就指定定义一个结构化一整条记录的函数:

def parse(line:String)={
    val pieces=line.split(",")
    val scores=pieces.slice(2,11).map(toDoubles)
    val id_1=pieces(0).toInt
    val id_2=pieces(1).toInt
    val matched=pieces(11).toBoolean
    MatchData(id_1,id_2,scores,matched)
}

如下代码运用parse进行对数据集的结构化:

 val mas=head.filter(!isHead(_)).map(line=>parse(line))

map函数是对RDD数据集合上的所有数据进行处理。

RDD缓存:
对于parse的调用,parse只会在RDD执行某种输出调用时才会真正调用,所以每一次的这类调用parse都会执行一遍。因此需要有cache方法持久化数据。
Spark提供了缓存RDD的机制,RDD的cache方法调用后,会指示在下一次的RDD计算后对RDD进行存储。例如下面的示例:

cached.cache()
cached.count()
cached.take(10)

RDD的第一次计算出现在count()方法中,所以当count()方法调用完后,RDD会被储存起来,之后的take方法获得的就是已经经过储存的RDD,而不是从本地取出重新计算一次的RDD。

4.概要信息
关于前面获取的数据,我们已经做到可以将它们结构化为我们想要的格式了,但是里面还是稍微有一点点的瑕疵就是数据记录并不是完善的,其中的”?”符号代表数据记录项的缺失,那么我们如何统计我们的数据集的概要信息?
这里我们就要用到Spark提供的StatCounter对象生成统计概要信息,我们自定义一个NAStatCounter如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

class NAStatCounter extends Serializable {
  val stats:StatCounter=new StatCounter()    //StatCount对象,用来生成概要信息
  var missing:Long=0                         //统计NaN的个数
  def add(x:Double): NAStatCounter = {       
    if(java.lang.Double.isNaN(x)) {          
      missing=missing+1                    
    }else{
      stats.merge(x)                         //向当前实例stats加入统计信息
    }
    this
  }

  def merge(others:NAStatCounter):NAStatCounter= { //加入一组的概要信息
      stats.merge(others.stats)
      missing+=others.missing
      this
  }

  override def toString: String = {
    "stats:"+stats.toString()+"NaN:"+missing
  }


object NAStatCounter extends Serializable {
  def apply(x:Double)=new NAStatCounter().add(x)
}
val arr=Array(1.0,Double.NaN,17.29)     // 自定义的新数组
val nas=arr.map(d=>NAStatCounter(d))        //对数组每个元素进行概要信息统计

这个时候我们也可以做到将多个Array[NAStatCount]h合并到一起,使用zip方法就可以将两个相同长度的数组进行合并。如下所示:

val nas1=Array(1.0,Double.NaN,20.0).map(d=>NAStatCounter(d))
val nas2=Array(Double.NaN,2.0,20.7).map(d=>NAStatCounter(d))
val merged=nas1.zip(nas2).map{case(a,b)=>a.merge(b)}

如果要在Scala集合上的所有记录执行merge操作,使用reduce方法就可以代替。
reduce 使用关联函数,把集合中两两为T类型的元素映射为一个T类型的元素。
由此,我们对上面的代码段进行修改,使用reduce方法代替:

val nas=List(nas1,nas2)
val merge=nas.reduce((n1,n2)=>{
    n1.zip(n2).map{(a,b)=>a.merge(b)}
})

现在用一段精炼的函数,把对RDD数据集上所有数据记录进行概要信息统表示出来:

def statWithMissing(rdd:RDD[Array[Double]]):Array[NAStatCounter]={
    val natats=rdd.mapPartitions((iter:Iterator[Array[Double]])=>{
    val nas=iter.next().map(d=>NAStatCounter(d))
    iter.foreach(arr=>{                             //对每一个Array[Double]操作
        nas.zip(arr).foreach{(n,d)=>n.add(d)}   //对Array[NAStatCounter]操作
    })
    Iterator(nas)                              //最终返回一个Array[NAStatCounter]
}) 
natats.reduce((n1,n2)=>{
        n1.zip(n2).map{case(a,b)=>a.merge(b)}  //将所有NAStatCounter做聚合处理
        })
}

我觉得以上这段代码应该多多分析分析透,因为它的风格是很多Spark程序都具有的,并且能够表达出函数式编程的风格所在。


终于等到寒假的第一天开始继续研究研究Spark了,前面事情太多,没有空闲把重心放在Spark上。这也是今年的第一篇吧,曾经看到别人将一礼拜更新一篇博客为目标,那个时候还觉得好像很轻松吧?现在看来根本不是啊,上个月劈头盖脸瞎忙,也没能做到一个礼拜写一次(两个礼拜一次都没有!),接下来要边学着用Spark一边把KMeans写出来,然后还要配合上Hadoop集群,感觉任务也不小呢,加油吧!

2019-01-08 11:00:05 linweidong 阅读数 755

本实例来源于《Spark高级数据分析》,这是一个很好的spark数据挖掘的实例。从经验上讲,推荐引擎属于大规模机器学习,在日常购物中大家或许深有体会,比如:你在淘宝上浏览了一些商品,或者购买了一些商品,那么淘宝就会根据你的偏好给你推荐一些其他类似的商品。然而,相比较其他机器学习算法,推荐引擎的输出更加的直观,有时候的推荐效果让人吃惊。作为机器学习开篇文章,本篇文章会系统的介绍基于Audioscrobbler数据集的音乐推荐。

数据集介绍

Audioscrobbler数据集是一个公开发布的数据集,读者可以在(https://github.com/libaoquan95/aasPractice/tree/master/c3/profiledata_06-May-2005)网站获取。数据集主要有三部分组成,user_artist_data.txt文件是主要的数据集文件记录了约2420条用户id、艺术家id以及用户收听艺术家歌曲的次数数据,包含141000个用户和160万个艺术家;artist_data.txt文件记录了艺术家id和对应的名字;artist_alias.txt记录了艺术家id和对应的别称id。

 

推荐算法介绍

由于所选取的数据集只记录了用户和歌曲之间的交互情况,除了艺术家名字之外没有其他信息。因此要找的学习算法不需要用户和艺术家的属性信息,这类算法通常被称为协同过滤。如果根据两个用户的年龄相同来判断他们可能具有相似的偏好,这不叫协同过滤。相反,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这是协调过滤。

本篇所用的算法在数学上称为迭代最小二乘,把用户播放数据当成矩阵A,矩阵低i行第j列上的元素的值,代表用户i播放艺术家j的音乐。矩阵A是稀疏的,绝大多数元素是0,算法将A分解成两个小矩阵X和Y,既A=XYT,X代表用户特征矩阵,Y代表特征艺术家矩阵。两个矩阵的乘积当做用户-艺术家关系矩阵的估计。可以通过下边一组图直观的反映:

现在假如有5个听众,音乐有5首,那么A是一个5*5的矩阵,假如评分如下:

图2.1 用户订阅矩阵

假如d是三个属性,那么X的矩阵如下:

 

图2.2 用户-特征矩阵

Y的矩阵如下:

图2.3 特征-电影矩阵

实际的求解过程中通常先随机的固定矩阵Y,则,为提高计算效率,通常采用并行计算X的每一行,既。得到X之后,再反求出Y,不断的交替迭代,最终使得XYT与A的平方误差小于指定阈值,停止迭代,得到最终的X(代表用户特征矩阵)和Y矩阵(代表特征艺术家矩阵)。在根据最终X和Y矩阵结果,向用户进行推荐。

 

数据准备

首先将样例数据上传到HDFS,如果想要在本地测试这些功能的话,需要内存数量至少 6g, 当然可以通过减少数据量来达到通用的测试。

object RunRecommender {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Optional, but may help avoid errors due to long lineage
   // spark.sparkContext.setCheckpointDir("hdfs:///tmp/")
    spark.sparkContext.setCheckpointDir("d:///tmp/")

    //val base = "hdfs:///user/ds/"
    val base =  "E:/newcode/spark/aas/data/";
    val rawUserArtistData = spark.read.textFile(base + "user_artist_data.txt")
    val rawArtistData = spark.read.textFile(base + "artist_data.txt")
    val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")

    val runRecommender = new RunRecommender(spark)
    runRecommender.preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.model(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.evaluate(rawUserArtistData, rawArtistAlias)
    runRecommender.recommend(rawUserArtistData, rawArtistData, rawArtistAlias)
  }

}


def preparation(
    rawUserArtistData: Dataset[String],
    rawArtistData: Dataset[String],
    rawArtistAlias: Dataset[String]): Unit = {

  rawUserArtistData.take(5).foreach(println)

  val userArtistDF = rawUserArtistData.map { line =>
    val Array(user, artist, _*) = line.split(' ')
    (user.toInt, artist.toInt)
  }.toDF("user", "artist")

  userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()

  val artistByID = buildArtistByID(rawArtistData)
  val artistAlias = buildArtistAlias(rawArtistAlias)

  val (badID, goodID) = artistAlias.head
  artistByID.filter($"id" isin (badID, goodID)).show()
}

/**
  * 过滤无效的用户艺术家ID和名字行,将格式不正确的数据行剔除掉。
  * @param rawArtistData
  * @return
  */
def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
  rawArtistData.flatMap { line =>
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
      None
    } else {
      try {
        Some((id.toInt, name.trim))
      } catch {
        case _: NumberFormatException => None
      }
    }
  }.toDF("id", "name")
}

/**
  * 过滤艺术家id和对应的别名id,将格式拼写错误的行剔除掉。
  * @param rawArtistAlias
  * @return
  */
def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
  rawArtistAlias.flatMap { line =>
    val Array(artist, alias) = line.split('\t')
    if (artist.isEmpty) {
      None
    } else {
      Some((artist.toInt, alias.toInt))
    }
  }.collect().toMap
}

代码中模型训练好之后,预测了用户 2093760 的推荐结果,我测试结果如下,由于里面代码使用了随机生成初始矩阵,每个人的结果都有可能不一样。

Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))

代码中也给出了该用户以前听过的艺术家的名字如下:

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))

模型评价

auc评价方法

def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add new distinct IDs
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the set with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the rest:
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}

完整代码下载:RunRecommender.scala
2018-03-11 11:01:27 u011596455 阅读数 7135

美团是数据驱动的互联网服务,用户每天在美团上的点击、浏览、下单支付行为都会产生海量的日志,这些日志数据将被汇总处理、分析、挖掘与学习,为美团的各种推荐、搜索系统甚至公司战略目标制定提供数据支持。大数据处理渗透到了美团各业务线的各种应用场景,选择合适、高效的数据处理引擎能够大大提高数据生产的效率,进而间接或直接提升相关团队的工作效率。

美团最初的数据处理以Hive SQL为主,底层计算引擎为MapReduce,部分相对复杂的业务会由工程师编写MapReduce程序实现。随着业务的发展,单纯的Hive SQL查询或者MapReduce程序已经越来越难以满足数据处理和分析的需求。

一方面,MapReduce计算模型对多轮迭代的DAG作业支持不给力,每轮迭代都需要将数据落盘,极大地影响了作业执行效率,另外只提供Map和Reduce这两种计算因子,使得用户在实现迭代式计算(比如:机器学习算法)时成本高且效率低。

另一方面,在数据仓库的按天生产中,由于某些原始日志是半结构化或者非结构化数据,因此,对其进行清洗和转换操作时,需要结合SQL查询以及复杂的过程式逻辑处理,这部分工作之前是由Hive SQL结合Python脚本来完成。这种方式存在效率问题,当数据量比较大的时候,流程的运行时间较长,这些ETL流程通常处于比较上游的位置,会直接影响到一系列下游的完成时间以及各种重要数据报表的生成。

基于以上原因,美团在2014年的时候引入了Spark。为了充分利用现有Hadoop集群的资源,我们采用了Spark on Yarn模式,所有的Spark app以及MapReduce作业会通过Yarn统一调度执行。Spark在美团数据平台架构中的位置如图所示:

v2-33cfa78abd8b2df4e30f3bcf08a116cd_r

 

经过近两年的推广和发展,从最开始只有少数团队尝试用Spark解决数据处理、机器学习等问题,到现在已经覆盖了美团各大业务线的各种应用场景。从上游的ETL生产,到下游的SQL查询分析以及机器学习等,Spark正在逐步替代MapReduce作业,成为美团大数据处理的主流计算引擎。目前美团Hadoop集群用户每天提交的Spark作业数和MapReduce作业数比例为4:1,对于一些上游的Hive ETL流程,迁移到Spark之后,在相同的资源使用情况下,作业执行速度提升了十倍,极大地提升了业务方的生产效率。

下面我们将介绍Spark在美团的实践,包括我们基于Spark所做的平台化工作以及Spark在生产环境下的应用案例。其中包含Zeppelin结合的交互式开发平台,也有使用Spark任务完成的ETL数据转换工具,数据挖掘组基于Spark开发了特征平台和数据挖掘平台,另外还有基于Spark的交互式用户行为分析系统以及在SEM投放服务中的应用,以下是详细介绍。

Spark交互式开发平台

 

在推广如何使用Spark的过程中,我们总结了用户开发应用的主要需求:

  1. 数据调研:在正式开发程序之前,首先需要认识待处理的业务数据,包括:数据格式,类型(若以表结构存储则对应到字段类型)、存储方式、有无脏数据,甚至分析根据业务逻辑实现是否可能存在数据倾斜等等。这个需求十分基础且重要,只有对数据有充分的掌控,才能写出高效的Spark代码;
  2. 代码调试:业务的编码实现很难保证一蹴而就,可能需要不断地调试;如果每次少量的修改,测试代码都需要经过编译、打包、提交线上,会对用户的开发效率影响是非常大的;
  3. 联合开发:对于一整个业务的实现,一般会有多方的协作,这时候需要能有一个方便的代码和执行结果共享的途径,用于分享各自的想法和试验结论。

 

基于这些需求,我们调研了现有的开源系统,最终选择了Apache的孵化项目Zeppelin,将其作为基于Spark的交互式开发平台。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了数据分析和可视化等功能。

 

v2-42d5869627f8f5741c00b16e811d978c_b

 

我们在原生的Zeppelin上增加了用户登陆认证、用户行为日志审计、权限管理以及执行Spark作业资源隔离,打造了一个美团的Spark的交互式开发平台,不同的用户可以在该平台上调研数据、调试程序、共享代码和结论。

 

集成在Zeppelin的Spark提供了三种解释器:Spark、Pyspark、SQL,分别适用于编写Scala、Python、SQL代码。对于上述的数据调研需求,无论是程序设计之初,还是编码实现过程中,当需要检索数据信息时,通过Zeppelin提供的SQL接口可以很便利的获取到分析结果;另外,Zeppelin中Scala和Python解释器自身的交互式特性满足了用户对Spark和Pyspark分步调试的需求,同时由于Zeppelin可以直接连接线上集群,因此可以满足用户对线上数据的读写处理请求;最后,Zeppelin使用Web Socket通信,用户只需要简单地发送要分享内容所在的http链接,所有接受者就可以同步感知代码修改,运行结果等,实现多个开发者协同工作。

 

Spark作业ETL模板

 

除了提供平台化的工具以外,我们也会从其他方面来提高用户的开发效率,比如将类似的需求进行封装,提供一个统一的ETL模板,让用户可以很方便的使用Spark实现业务需求。

 

美团目前的数据生产主体是通过ETL将原始的日志通过清洗、转换等步骤后加载到Hive表中。而很多线上业务需要将Hive表里面的数据以一定的规则组成键值对,导入到Tair中,用于上层应用快速访问。其中大部分的需求逻辑相同,即把Hive表中几个指定字段的值按一定的规则拼接成key值,另外几个字段的值以json字符串的形式作为value值,最后将得到的对写入Tair。

 

v2-0a119b77acc868edbd353589e6333093_b

 

由于Hive表中的数据量一般较大,使用单机程序读取数据和写入Tair效率比较低,因此部分业务方决定使用Spark来实现这套逻辑。最初由业务方的工程师各自用Spark程序实现从Hive读数据,写入到Tair中(以下简称hive2Tair流程),这种情况下存在如下问题:
每个业务方都要自己实现一套逻辑类似的流程,产生大量重复的开发工作;
由于Spark是分布式的计算引擎,因此代码实现和参数设置不当很容易对Tair集群造成巨大压力,影响Tair的正常服务。
基于以上原因,我们开发了Spark版的hive2Tair流程,并将其封装成一个标准的ETL模板,其格式和内容如下所示:
v2-864ee6ac385a1f1bf43c4410ea7c74bf_b

source用于指定Hive表源数据,target指定目标Tair的库和表,这两个参数可以用于调度系统解析该ETL的上下游依赖关系,从而很方便地加入到现有的ETL生产体系中。

 

有了这个模板,用户只需要填写一些基本的信息(包括Hive表来源,组成key的字段列表,组成value的字段列表,目标Tair集群)即可生成一个hive2Tair的ETL流程。整个流程生成过程不需要任何Spark基础,也不需要做任何的代码开发,极大地降低了用户的使用门槛,避免了重复开发,提高了开发效率。该流程执行时会自动生成一个Spark作业,以相对保守的参数运行:默认开启动态资源分配,每个Executor核数为2,内存2GB,最大Executor数设置为100。如果对于性能有很高的要求,并且申请的Tair集群比较大,那么可以使用一些调优参数来提升写入的性能。目前我们仅对用户暴露了设置Executor数量以及每个Executor内存的接口,并且设置了一个相对安全的最大值规定,避免由于参数设置不合理给Hadoop集群以及Tair集群造成异常压力。

 

基于Spark的用户特征平台

 

在没有特征平台之前,各个数据挖掘人员按照各自项目的需求提取用户特征数据,主要是通过美团的ETL调度平台按月/天来完成数据的提取。

 

但从用户特征来看,其实会有很多的重复工作,不同的项目需要的用户特征其实有很多是一样的,为了减少冗余的提取工作,也为了节省计算资源,建立特征平台的需求随之诞生,特征平台只需要聚合各个开发人员已经提取的特征数据,并提供给其他人使用。特征平台主要使用Spark的批处理功能来完成数据的提取和聚合。
开发人员提取特征主要还是通过ETL来完成,有些数据使用Spark来处理,比如用户搜索关键词的统计。
开发人员提供的特征数据,需要按照平台提供的配置文件格式添加到特征库,比如在图团购的配置文件中,团购业务中有一个用户24小时时段支付的次数特征,输入就是一个生成好的特征表,开发人员通过测试验证无误之后,即完成了数据上线;另外对于有些特征,只需要从现有的表中提取部分特征数据,开发人员也只需要简单的配置即可完成。

 

v2-ad7464e5c9a581568f310ff56d442945_b

 

在图中,我们可以看到特征聚合分两层,第一层是各个业务数据内部聚合,比如团购的数据配置文件中会有很多的团购特征、购买、浏览等分散在不同的表中,每个业务都会有独立的Spark任务来完成聚合,构成一个用户团购特征表;特征聚合是一个典型的join任务,对比MapReduce性能提升了10倍左右。第二层是把各个业务表数据再进行一次聚合,生成最终的用户特征数据表。
特征库中的特征是可视化的,我们在聚合特征时就会统计特征覆盖的人数,特征的最大最小数值等,然后同步到RDB,这样管理人员和开发者都能通过可视化来直观地了解特征。 另外,我们还提供特征监测和告警,使用最近7天的特征统计数据,对比各个特征昨天和今天的覆盖人数,是增多了还是减少了,比如性别为女这个特征的覆盖人数,如果发现今天的覆盖人数比昨天低了1%(比如昨天6亿用户,女性2亿,那么人数降低了1%*2亿=2百万)突然减少2万女性用户说明数据出现了极大的异常,何况网站的用户数每天都是增长的。这些异常都会通过邮件发送到平台和特征提取的相关人。


数据挖掘平台是完全依赖于用户特征库的,通过特征库提供用户特征,数据挖掘平台对特征进行转换并统一格式输出,就此开发人员可以快速完成模型的开发和迭代,之前需要两周开发一个模型,现在短则需要几个小时,多则几天就能完成。特征的转换包括特征名称的编码,也包括特征值的平滑和归一化,平台也提供特征离散化和特征选择的功能,这些都是使用Spark离线完成。

开发人员拿到训练样本之后,可以使用Spark mllib或者Python sklearn等完成模型训练,得到最优化模型之后,将模型保存为平台定义好的模型存储格式,并提供相关配置参数,通过平台即可完成模型上线,模型可以按天或者按周进行调度。当然如果模型需要重新训练或者其它调整,那么开发者还可以把模型下线。不只如此,平台还提供了一个模型准确率告警的功能,每次模型在预测完成之后,会计算用户提供的样本中预测的准确率,并比较开发者提供的准确率告警阈值,如果低于阈值则发邮件通知开发者,是否需要对模型重新训练。

在开发挖掘平台的模型预测功时能我们走了点弯路,平台的模型预测功能开始是兼容Spark接口的,也就是使用Spark保存和加载模型文件并预测,使用过的人知道Spark mllib的很多API都是私有的开发人员无法直接使用,所以我们这些接口进行封装然后再提供给开发者使用,但也只解决了Spark开发人员的问题,平台还需要兼容其他平台的模型输出和加载以及预测的功能,这让我们面临必需维护一个模型多个接口的问题,开发和维护成本都较高,最后还是放弃了兼容Spark接口的实现方式,我们自己定义了模型的保存格式,以及模型加载和模型预测的功能。

v2-8c85cfdef52604509efed464e4fdd62a_b

以上内容介绍了美团基于Spark所做的平台化工作,这些平台和工具是面向全公司所有业务线服务的,旨在避免各团队做无意义的重复性工作,以及提高公司整体的数据生产效率。目前看来效果是比较好的,这些平台和工具在公司内部得到了广泛的认可和应用,当然也有不少的建议,推动我们持续地优化。
随着Spark的发展和推广,从上游的ETL到下游的日常数据统计分析、推荐和搜索系统,越来越多的业务线开始尝试使用Spark进行各种复杂的数据处理和分析工作。下面将以Spark在交互式用户行为分析系统以及SEM投放服务为例,介绍Spark在美团实际业务生产环境下的应用。

Spark在交互式用户行为分析系统中的实践

美团的交互式用户行为分析系统,用于提供对海量的流量数据进行交互式分析的功能,系统的主要用户为公司内部的PM和运营人员。普通的BI类报表系统,只能够提供对聚合后的指标进行查询,比如PV、UV等相关指标。但是PM以及运营人员除了查看一些聚合指标以外,还需要根据自己的需求去分析某一类用户的流量数据,进而了解各种用户群体在App上的行为轨迹。根据这些数据,PM可以优化产品设计,运营人员可以为自己的运营工作提供数据支持,用户核心的几个诉求包括:

  1. 自助查询,不同的PM或运营人员可能随时需要执行各种各样的分析功能,因此系统需要支持用户自助使用。
  2. 响应速度,大部分分析功能都必须在几分钟内完成。
  3. 可视化,可以通过可视化的方式查看分析结果。

要解决上面的几个问题,技术人员需要解决以下两个核心问题:

  1. 海量数据的处理,用户的流量数据全部存储在Hive中,数据量非常庞大,每天的数据量都在数十亿的规模。
  2. 快速计算结果,系统需要能够随时接收用户提交的分析任务,并在几分钟之内计算出他们想要的结果。

要解决上面两个问题,目前可供选择的技术主要有两种:MapReduce和Spark。在初期架构中选择了使用MapReduce这种较为成熟的技术,但是通过测试发现,基于MapReduce开发的复杂分析任务需要数小时才能完成,这会造成极差的用户体验,用户无法接受。

因此我们尝试使用Spark这种内存式的快速大数据计算引擎作为系统架构中的核心部分,主要使用了Spark Core以及Spark SQL两个组件,来实现各种复杂的业务逻辑。实践中发现,虽然Spark的性能非常优秀,但是在目前的发展阶段中,还是或多或少会有一些性能以及OOM方面的问题。因此在项目的开发过程中,对大量Spark作业进行了各种各样的性能调优,包括算子调优、参数调优、shuffle调优以及数据倾斜调优等,最终实现了所有Spark作业的执行时间都在数分钟左右。并且在实践中解决了一些shuffle以及数据倾斜导致的OOM问题,保证了系统的稳定性。

结合上述分析,最终的系统架构与工作流程如下所示:

  1. 用户在系统界面中选择某个分析功能对应的菜单,并进入对应的任务创建界面,然后选择筛选条件和任务参数,并提交任务。
  2. 由于系统需要满足不同类别的用户行为分析功能(目前系统中已经提供了十个以上分析功能),因此需要为每一种分析功能都开发一个Spark作业。
  3. 采用J2EE技术开发了Web服务作为后台系统,在接收到用户提交的任务之后,根据任务类型选择其对应的Spark作业,启动一条子线程来执行Spark-submit命令以提交Spark作业。
  4. Spark作业运行在Yarn集群上,并针对Hive中的海量数据进行计算,最终将计算结果写入数据库中。
  5. 用户通过系统界面查看任务分析结果,J2EE系统负责将数据库中的计算结果返回给界面进行展现。

v2-3175f11d79e2d5f840776336c071d198_b

该系统上线后效果良好:90%的Spark作业运行时间都在5分钟以内,剩下10%的Spark作业运行时间在30分钟左右,该速度足以快速响应用户的分析需求。通过反馈来看,用户体验非常良好。目前每个月该系统都要执行数百个用户行为分析任务,有效并且快速地支持了PM和运营人员的各种分析需求。

Spark在SEM投放服务中的应用

流量技术组负责着美团站外广告的投放技术,目前在SEM、SEO、DSP等多种业务中大量使用了Spark平台,包括离线挖掘、模型训练、流数据处理等。美团SEM(搜索引擎营销)投放着上亿的关键词,一个关键词从被挖掘策略发现开始,就踏上了精彩的SEM之旅。它经过预估模型的筛选,投放到各大搜索引擎,可能因为市场竞争频繁调价,也可能因为效果不佳被迫下线。而这样的旅行,在美团每分钟都在发生。如此大规模的随机“迁徙”能够顺利进行,Spark功不可没。

v2-aa5367b691a88ec57d71e9a222edbf12_b

Spark不止用于美团SEM的关键词挖掘、预估模型训练、投放效果统计等大家能想到的场景,还罕见地用于关键词的投放服务,这也是本段介绍的重点。一个快速稳定的投放系统是精准营销的基础。

美团早期的SEM投放服务采用的是单机版架构,随着关键词数量的极速增长,旧有服务存在的问题逐渐暴露。受限于各大搜索引擎API的配额(请求频次)、账户结构等规则,投放服务只负责处理API请求是远远不够的,还需要处理大量业务逻辑。单机程序在小数据量的情况下还能通过多进程勉强应对,但对于如此大规模的投放需求,就很难做到“兼顾全局”了。

新版SEM投放服务在15年Q2上线,内部开发代号为Medusa。在Spark平台上搭建的Medusa,全面发挥了Spark大数据处理的优势,提供了高性能高可用的分布式SEM投放服务,具有以下几个特性:

  1. 低门槛,Medusa整体架构的设计思路是提供数据库一样的服务。在接口层,让RD可以像操作本地数据库一样,通过SQL来“增删改查”线上关键词表,并且只需要关心自己的策略标签,不需要关注关键词的物理存储位置。Medusa利用Spark SQL作为服务的接口,提高了服务的易用性,也规范了数据存储,可同时对其他服务提供数据支持。基于Spark开发分布式投放系统,还可以让RD从系统层细节中解放出来,全部代码只有400行。
  2. 高性能、可伸缩,为了达到投放的“时间”、“空间”最优化,Medusa利用Spark预计算出每一个关键词在远程账户中的最佳存储位置,每一次API请求的最佳时间内容。在配额和账号容量有限的情况下,轻松掌控着亿级的在线关键词投放。通过控制Executor数量实现了投放性能的可扩展,并在实战中做到了全渠道4小时全量回滚。
  3. 高可用,有的同学或许会有疑问:API请求适合放到Spark中做吗?因为函数式编程要求函数是没有副作用的纯函数(输入是确定的,输出就是确定的)。这确实是一个问题,Medusa的思路是把请求API封装成独立的模块,让模块尽量做到“纯函数”的无副作用特性,并参考面向轨道编程的思路,将全部请求log重新返回给Spark继续处理,最终落到Hive,以此保证投放的成功率。为了更精准的控制配额消耗,Medusa没有引入单次请求重试机制,并制定了服务降级方案,以极低的数据丢失率,完整地记录了每一个关键词的旅行。

结论和展望

本文我们介绍了美团引入Spark的起源,基于Spark所做的一些平台化工作,以及Spark在美团具体应用场景下的实践。总体而言,Spark由于其灵活的编程接口、高效的内存计算,能够适用于大部分数据处理场景。在推广和使用Spark的过程中,我们踩过不少坑,也遇到过很多问题,但填坑和解决问题的过程,让我们对Spark有了更深入的理解,我们也期待着Spark在更多的应用场景中发挥重要的作用。


2019-07-16 23:21:48 JavaDestiny 阅读数 744

Spark实例

tomcat日志
110.52.250.126 - - [30/May/2018:17:38:20 +0800] "GET /source/plugin/wsh_wx/img/wsh_zk.css HTTP/1.1" 200 1482
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90
110.52.250.126 - - [30/May/2018:17:38:20 +0800] "GET /source/plugin/wsh_wx/img/wx_jqr.gif HTTP/1.1" 200 1770
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/recommend_1.gif HTTP/1.1" 200 1030
110.52.250.126 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/wsh_zk.css HTTP/1.1" 200 4542
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1" 200 582
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/pn.png HTTP/1.1" 200 592
27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/editor/editor.gif HTTP/1.1" 200 13648
8.35.201.165 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/05/94/42_avatar_middle.jpg HTTP/1.1" 200 6153
8.35.201.164 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/03/13/42_avatar_middle.jpg HTTP/1.1" 200 5087
8.35.201.163 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/04/87/42_avatar_middle.jpg HTTP/1.1" 200 5117
8.35.201.165 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/01/01/42_avatar_middle.jpg HTTP/1.1" 200 5844
8.35.201.160 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/04/12/42_avatar_middle.jpg HTTP/1.1" 200 3174
8.35.201.163 - - [30/May/2018:17:38:21 +0800] "GET /static/image/common/arw_r.gif HTTP/1.1" 200 65
8.35.201.166 - - [30/May/2018:17:38:21 +0800] "GET /static/image/common/search.png HTTP/1.1" 200 210
8.35.201.144 - - [30/May/2018:17:38:21 +0800] "GET /static/image/common/pmto.gif HTTP/1.1" 200 152
8.35.201.161 - - [30/May/2018:17:38:21 +0800] "GET /static/image/common/search.png HTTP/1.1" 200 3047
8.35.201.164 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/05/83/35_avatar_middle.jpg HTTP/1.1" 200 7171
8.35.201.160 - - [30/May/2018:17:38:21 +0800] "GET /uc_server/data/avatar/000/01/54/35_avatar_middle.jpg HTTP/1.1" 200 5396

实例一:求图片的访问量

scala代码
package Spark

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.matching.Regex

/*
* 解析tomcat日志
*
* @author Jabin
* @version 0.0.1
* @data 2019/07/16
* */
object LogCount {
  def main(args: Array[String]): Unit = {
    //创建Spark配置
    val conf = new SparkConf().setAppName("Log.Count").setMaster("local")
    //加载Spark配置
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\日志\\tomcat.log")
    .map(
      line => {
      /*
      * 27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
      * 通过正则表达式匹配
      * */
      val pattern = "(/(\\w)+)+\\.[a-z]{3}".r
      //得到/static/image/common/faq.gif
      val photoDir = pattern.findAllIn(line).mkString(",")
      val regex = new Regex("\\w+\\.[a-z]{3}")
      //得到faq.gif
      val photoName = regex.findAllIn(photoDir).mkString(",")

      (photoName, 1)
    }
    )

    val rdd1 = rdd.reduceByKey(_+_)

    rdd1.foreach(println)

    val rdd2 = rdd1.sortBy(_._2,false)

    rdd2.foreach(println)

    rdd2.take(2).foreach(println)

    //关闭
    sc.stop()
  }
}

结果

//reduceByKey结果
(editor.gif,1)
(common.gif,1)
(35_avatar_middle.jpg,2)
(pn.png,1)
(wx_jqr.gif,1)
(pmto.gif,1)
(wsh_zk.css,2)
(42_avatar_middle.jpg,5)
(arw_r.gif,1)
(common_2_verify_icon.png,1)
(hot_1.gif,2)
(search.png,2)
(recommend_1.gif,1)
(faq.gif,1)
//sortBy结果
(42_avatar_middle.jpg,5)
(35_avatar_middle.jpg,2)
(wsh_zk.css,2)
(hot_1.gif,2)
(search.png,2)
(editor.gif,1)
(common.gif,1)
(pn.png,1)
(wx_jqr.gif,1)
(pmto.gif,1)
(arw_r.gif,1)
(common_2_verify_icon.png,1)
(recommend_1.gif,1)
(faq.gif,1)
//最终结果
(42_avatar_middle.jpg,5)
(35_avatar_middle.jpg,2)

实例二:创建自定义分区

scala代码

package Spark

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
import scala.util.matching.Regex

/*
* 解析tomcat日志,自定义分区
*
* @author Jabin
* @version 0.0.1
* @data 2019/07/16
* */
object PartitionCount {
  def main(args: Array[String]): Unit = {
    //创建Spark配置
    val conf = new SparkConf().setAppName("Partition.Count").setMaster("local")
    //加载Spark配置
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\作业\\tomcat.log")
      .map(
        line => {
          /*
          * 27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
          * 通过正则表达式匹配
          * */
          val pattern = "(/(\\w)+)+\\.[a-z]{3}".r
          //得到/static/image/common/faq.gif
          val photoDir = pattern.findAllIn(line).mkString(",")
          val regex = new Regex("\\w+\\.[a-z]{3}")
          //得到faq.gif
          val photoName = regex.findAllIn(photoDir).mkString(",")

          (photoName, line)
        }
      )

    //获取不重复的photoName
    val rdd1 = rdd.map(_._1).distinct().collect

    //创建分区规则
    val partition = new PartitionCount(rdd1)
    val rdd2 = rdd.partitionBy(partition)

    rdd2.saveAsTextFile("C:\\Users\\Administrator\\Desktop\\日志\\partition")

	//关闭
    sc.stop()
  }
}

class PartitionCount(array: Array[String]) extends Partitioner{
  //创建map存储photoName
  val map = new mutable.HashMap[String, Int]()
  //初始化分区
  var id = 0

  for (arr <- array){
    map.put(arr,id)
    id += 1
  }

  //返回分区的数目
  override def numPartitions: Int = map.size

  //根据photoName,返回对应的分区
  override def getPartition(key: Any): Int = map.getOrElse(key.toString,0)
}

结果

在这里插入图片描述

实例三:访问数据库(查询)

SQL代码
CREATE database IF NOT EXISTS DATA;
USE DATA;
CREATE TABLE EMPLOYEE(ID INT NOT NULL AUTO_INCREMENT,NAME VARCHAR(20),SALARY INT,PRIMARY KEY(ID));
INSERT INTO EMPLOYEE(NAME,SALARY) VALUES('Destiny',1000);
INSERT INTO EMPLOYEE(NAME,SALARY) VALUES('Freedom',4500);
INSERT INTO EMPLOYEE(NAME,SALARY) VALUES('Fate',3000);
SELECT * FROM EMPLOYEE;

在这里插入图片描述
scala代码

package Spark

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/*
* 解析tomcat日志,自定义分区
*
* @author Jabin
* @version 0.0.1
* @data 2019/07/17
* */
object JDBC {
  //创建连接
  private val connection = () => {
    Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/data?serverTimezone=GMT%2B8","root","root")
  }

  def main(args: Array[String]): Unit = {
    //创建Spark配置
    val conf = new SparkConf().setAppName("JDBC.Count").setMaster("local")
    //加载Spark配置
    val sc = new SparkContext(conf)

    val rdd = new JdbcRDD(sc,connection,"SELECT * FROM EMPLOYEE WHERE SALARY >= ? AND SALARY < ?",3000,6000,2,r => {
      val name = r.getString(2)
      val salary = r.getInt(3)

      (name,salary)
    })

    val result = rdd.collect()

    println(result.toBuffer)
//    result.foreach(println)

    //关闭
    sc.stop()
  }
}

结果

ArrayBuffer((FATE,3000), (FREEDOM,4500))

实例三:访问数据库(插入)

SQL代码
CREATE database IF NOT EXISTS DATA;
USE DATA;
CREATE TABLE LOG(PhotoName VARCHAR(50),Num INT)

在这里插入图片描述
scala代码

package Spark

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.matching.Regex

/*
* 将数据导入到MySQL
*
* @author Jabin
* @version 0.0.1
* @data 2019/07/18
* */
object MyMySQL {
  var connection : Connection = _
  var pst : PreparedStatement = _
  def main(args: Array[String]): Unit = {
    //创建Spark配置
    val conf = new SparkConf().setAppName("MySQL.Count").setMaster("local")
    //加载Spark配置
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\作业\\tomcat.log")
      .map(
        line => {
          /*
          * 27.19.74.143 - - [30/May/2018:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
          * 通过正则表达式匹配
          * */
          val pattern = "(/(\\w)+)+\\.[a-z]{3}".r
          //得到/static/image/common/faq.gif
          val photoDir = pattern.findAllIn(line).mkString(",")
          val regex = new Regex("\\w+\\.[a-z]{3}")
          //得到faq.gif
          val photoName = regex.findAllIn(photoDir).mkString(",")

          (photoName, 1)
        }
      )

    val rdd1 = rdd.reduceByKey(_+_)

    rdd1.foreachPartition(insertData)

    sc.stop()
  }

  def insertData(iter: Iterator[(String, Int)]) = {
    try{
      connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/data?serverTimezone=GMT%2B8","root","root")
      pst = connection.prepareStatement("INSERT INTO LOG VALUES(?,?)")
      iter.foreach(f =>{
        pst.setString(1,f._1)
        pst.setInt(2,f._2)

        pst.executeUpdate()
      })
    }catch{
      case t: Throwable => t.printStackTrace()
    }finally {
      if (connection != null) connection.close()
      if (pst != null) pst.close()
    }
  }
}

结果

在这里插入图片描述

2019-10-15 17:28:32 sdafhkjas 阅读数 991

Python3实战Spark大数据分析及调度 学习资源

一、实例分析
1.1 数据 student.txt
在这里插入图片描述
1.2 代码在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

二、代码解析
2.1函数解析
2.1.1 collect()
RDD的特性
在这里插入图片描述在进行基本RDD“转换”运算时不会立即执行,结果不会显示在显示屏中,collect()是一个“动作”运算,会立刻执行,显示结果。

2.1.2 reduce()
说明
reduce()函数会对参数序列中的元素进行累积。

语法
reduce(function, iterable[, initializer])

参数
function – 函数,有两个参数
iterable – 可迭代对象
initializer – 可选,初始参数
实例
说明:Python3的内建函数移除了reduce函数,reduce函数放在functools模块
在这里插入图片描述
2.1.3 type()
语法
class type(name, bases, dict)

参数
name – 类的名称。
bases – 基类的元组。
dict – 字典,类内定义的命名空间变量。
返回值
一个参数返回对象类型, 三个参数,返回新的类型对象。

实例
在这里插入图片描述
在这里插入图片描述
三、问题分析在这里插入图片描述
解析
1、检查拼写是否有误
2、检查缩进是否合规
3、检查()是否一一配对

四、实例 小练
4.1 数据 user_small
在这里插入图片描述
4.2 用户上网记录统计(一行为一条记录).(用户:第3列)
在这里插入图片描述
4.2用户流量统计。分别统计上行流量及下行流量并将结果各列以空格键隔开输出到文件。(用户:第3列;上行流量:第25列;下行流量:第26列)

在这里插入图片描述
4.3 统计用户总流量
在这里插入图片描述
4.4、微信APP流量统计。(微信APP特征MicroMessenger,位于第20列,统计对应的下行流量值——第26列的数值。) 在这里插入图片描述