2017-03-22 17:46:04 liuzhoulong 阅读数 1134
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5436 人正在学习 去看看 陆永剑

Spark MLlib之KMeans实例:


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class SparkMLlibKMeans {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setAppName("K-means Example");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// Load and parse data
		String path = "file:///data/hadoop/spark-2.0.0-bin-hadoop2.7/data/mllib/kmeans_data.txt";
		JavaRDD<String> data = sc.textFile(path);
		JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
			public Vector call(String s) {
				String[] sarray = s.split(" ");
				double[] values = new double[sarray.length];
				for (int i = 0; i < sarray.length; i++)
					values[i] = Double.parseDouble(sarray[i]);
				return Vectors.dense(values);
			}
		});
		parsedData.cache();
		
		// Cluster the data into two classes using KMeans
		int numIterations = 20;
		for (int numClusters = 1; numClusters <= 6; numClusters++) {//k值
		  KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters,
				numIterations);
		// Evaluate clustering by computing Within Set Sum of Squared Errors
		  double WSSSE = clusters.computeCost(parsedData.rdd());
		  System.out.println("k=" + numClusters+ " Within Set Sum of Squared Errors = " + WSSSE);
		  Vector[] vs = clusters.clusterCenters();
		  int clusteridx = clusters.predict(Vectors.dense(0.2,0.2,0.2));
		  System.out.println("(0.2,0.2,0.2) is cluster " + clusteridx);
		  for (Vector v : vs) {
		     System.out.println("cluser center=" + v);
		  }
		}
		sc.close();
		
	}

}

       K选择是 K-means 算法的关键,Spark MLlib 在 KMeansModel 类里提供了 computeCost 方法,该方法通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果。一般来说,同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好。但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使 computeCost 结果值最小的那个 K。

上面例子中迭代算出k取值1到6个中心点的情况,并计算输出computeCost 和聚合后的中心点,预测点0.2 0.2 0.2 属于聚合后的那个中心点。

原始数据:

     

运行结果:

     

2016-12-22 19:33:45 u014135021 阅读数 3330
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5436 人正在学习 去看看 陆永剑

相比于本人上篇博客中scala实现的串行kmeans而已,这次的优点体现在并行的计算,并同时运行多组kmeans算法(选取不同的初值),选择其中效果最好的作为结果输出

作为一个初学者,这次的编程让我初步的体会到了函数式编程的魅力,让我见识到了并行计算,学习的道路还有很长啊

package zzl

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import breeze.linalg.norm
import org.apache.spark.mllib.linalg.Vectors

//该case类保存了每个向量本身与该向量的L2范数
case class VectorInform(var Point:Vector,var norm2:Double)
//该case类保存一个向量所对应的中心向量的id与该向量与中心向量之间的花费
case class CenterInform(val center_id:Int,val cost:Double)
class Kmeans(val data:RDD[Vector],val numClusters:Int,val MaxIterations:Int,val runs:Int = 1,
    val threshold:Double=1e-4,val savepath:String="/home/hadoop/haha")extends Serializable{
  def output(data:Array[Array[VectorInform]])
  {
    data.foreach {_.foreach {x=>x.Point.foreachActive((index,value)=>print(index+" "+value+"  "));println}}
  }
  //返回两向量的和向量
  def add(p1:Vector,p2:Vector):Vector=
  {
    var p3=new Array[Double](p1.size)
    for(i<- 0 until p1.size)
    {
      p3(i)=p1(i)+p2(i)   
    }
    Vectors.dense(p3)
  }
  //获取初始中心点
  def InitCenterPoint(data:RDD[VectorInform]):Array[Array[VectorInform]]={
    var sample=data.takeSample(false, numClusters*runs)
    Array.tabulate(runs)(r=>sample.slice(numClusters*r, numClusters*(r+1)))
  }
  //查找该点属于第k个并行模块的哪个类
  def FindClostCenter(center:Array[VectorInform],point:VectorInform):CenterInform=
  {
    var bestdistance=Double.PositiveInfinity
    var id=0
    for(i <- 0 until center.length)
    {
      var dist=Vectors.sqdist(center(i).Point, point.Point)
      if(dist<bestdistance)
      {
        bestdistance=dist
        id=i
      }
    }
    CenterInform(id,bestdistance)
  }
  def plus(a:(Vector,Int),b:(Vector,Int)):(Vector,Int)={
    (add(a._1, b._1),a._2+b._2)
  }
  def divide(sum:Vector,n:Int):Vector={
    val m=new Array[Double](sum.size)
    for(i<- 0 until sum.size)
      m(i)=sum(i)/n.toDouble
    Vectors.dense(m)
  }
  //算法核心,返回最优的中心向量及开销
  def runAlgorithm(data:RDD[VectorInform]):(Array[VectorInform],Double)=
  {
    var sc=data.sparkContext
    var center=InitCenterPoint(data)
    var runactive=Array.fill(runs)(true)
    var cost=Array.fill(runs)(0.0)
    var activeRuns=new ArrayBuffer[Int]++(0 until runs)//记录还在活跃的计算,因为有的计算可能已经收敛停止了
    var k=0
    while(k<MaxIterations&&(!activeRuns.isEmpty))
    {
      k+=1
      var cost2=Array.fill(runs)(0).map {_=>sc.accumulator(0.0)}//累加器
      var activecenter=activeRuns.map { x => center(x)}//这步很重要,每次迭代的时候去除那些已经收敛的计算
      var bestcenter=sc.broadcast(center)//广播,把中心的数据传输到每个分区,接下来就马上体现了并行计算的思想
      //每个分区计算计算,体会下并行计算
      var result=data.mapPartitions{points=> 
        /*
         * 获取必要的参数
         */
       val thiscenter=bestcenter.value //每个分区中获取中心点
       val runs= thiscenter.length//并行计算数量
       val n=thiscenter(0).length//n个中心点
       val dims=thiscenter(0)(0).Point.size//中心点的维度
       /*
        * 获取该分区类,每个并行计算中的每个类的向量的和与向量的个数
        */
       var sum=Array.fill(runs,n)(Vectors.zeros(dims))//保存每个并行度下每个类的向量的和
       var count=Array.fill(runs, n)(0)//保存每个并行度下每个类中向量的个数
       points.foreach { point => 
       //并行runs计算
        for(i<- 0 until runs)
          {
          val vp=FindClostCenter(thiscenter(i), point)
          count(i)(vp.center_id)+=1
          sum(i)(vp.center_id)=add(sum(i)(vp.center_id), point.Point)
          cost2(i)+=vp.cost
          }  
       }
      val result=for(i<-0 until runs; j<-0 until n)
        yield{
        ((i,j),(sum(i)(j),count(i)(j)))
      }
     result.iterator
    }.reduceByKey((a,b)=>plus(a, b)).collectAsMap()
     
      /*
       *更新中心点并判断是否满足停止的条件 
       */
      for((run,i)<-activeRuns.zipWithIndex)//注意理解这里,有的并行已经停止,run的值与i不一定相等
      {
        var change=false
        for(j<- 0 until numClusters)
        {
          val (sum,n)=result(i,j)//第i个并行计算中第j个类的向量和与向量总数    
          var newc=divide(sum, n)
          if(Vectors.sqdist(newc, center(run)(j).Point)>threshold)
            change=true
          center(run)(j).Point=newc
          }
        if(!change)
        {
          runactive(run)=false
          cost(run)=cost2(run).value
          println("Run "+run+" has stopped")
        }
      }
      activeRuns=activeRuns.filter {runactive(_)}
    }
    /*
     * 选择runs个并行中cost最小的中心点
     */
    var (mincost,bestrun)=cost.zipWithIndex.min
    (center(bestrun),mincost)
  }
  def run()
  {
    var norm2=data.map {Vectors.norm(_, 2)}
    var zipdata=data.zip(norm2).map(f=>new VectorInform(f._1,f._2))
    var center=InitCenterPoint(zipdata)
    var (endcenter,cost)=runAlgorithm(zipdata)
    println("-------------------------------")
    endcenter.foreach {x=>x.Point.foreachActive((a,b)=>print(b+" "));println}
    println("最小花费为:"+cost)
  }
}

package zzl

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Vectors,Vector}
object Main {
 
   def main(args: Array[String]): Unit = {
    var sc=new SparkContext(new SparkConf().setAppName("zzl").setMaster("local"))
    var data=sc.textFile("/home/hadoop/xixi", 2).map { s =>Vectors.dense(s.split(" ").map {_.toDouble})}
    
    var k=new Kmeans(data,2,40,20)
    k.run()
    
    
   }
}
本人是小白初学者,如果有哪里编写错误还望各位指正!

2017-03-29 16:40:06 RiverCode 阅读数 3150
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5436 人正在学习 去看看 陆永剑

1.KMeans概念

       KMeans基于划分的聚类方法。给定数据样本集Sample和应该划分的类书K,对样本数据Sample进行聚类,最终形成K个聚类,其相似的度量是某条数据与中心点的“距离”(距离可分为绝对距离、欧氏距离、闵可夫斯基距离。这里说的距离是欧式距离,欧氏距离也称欧几里得距离,它是在m维空间中两个点之间的真实距离)。

2.KMeans算法实例操作

2.1 数据准备

      从官网下载源码时在data文件夹下有mllib文件夹,里面有kmeans_data.txt,内容为:
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2

2.2实现思路

1.设置运行环境;

2.装载kmeans_data.txt数据集;

3.将数据集聚类(聚成2个类),进行20次迭代计算,形成数据模型;
4.在控制台打印数据模型的两个中心点;
5.使用误差平方和评估数据模型;
6.交叉评估1,只返回结果;
7.交叉评估2,返回数据集和结果。

3.用代码说话

     1.设置运行环境

val conf = new SparkConf().setAppName("Kmeans").setMaster("local")
val sc = new SparkContext(conf)

    2.装载数据集

val data = sc.textFile("E:\\spark-2.1.0\\spark-2.1.0\\data\\mllib\\kmeans_data.txt", 1)

val parseData = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))

    3.将数据集聚类,分成2个类,20次迭代,形成数据模型

val numClusters = 2
val numIterations = 20
val model = KMeans.train(parseData, numClusters, numIterations)

     4.数据模型的中心点

println("Cluster centers:")
    for (c <- model.clusterCenters) {
      println(" " + c.toString)
    }

    5.使用误差平方和评估数据模型

val cost = model.computeCost(parseData)

    6.使用模型测试单点数据

println("Vectors 0.2 0.2 0.2 is belongs to clusters:" + model.predict(Vectors.dense("0.2 0.2 0.2".split(" ").map(_.toDouble))))

 println("Vectors 0.25 0.25 0.25 is belongs to clusters:" + model.predict(Vectors.dense("0.25 0.25 0.25".split(" ").map(_.toDouble))))

 println("Vectors 8 8 8 is belongs to clusters:" + model.predict(Vectors.dense("8 8 8".split(" ").map(_.toDouble))))

     7.交叉评估1,只返回结果

    val testdata = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))
    val result = model.predict(testdata)
    result.saveAsTextFile("F:\\machine-learning\\result1")

    8.交叉评估2,返回数据集和结果

val result2 = data.map {
      line =>
        val linevector = Vectors.dense(line.split(" ").map(_.toDouble))
        val prediction = model.predict(linevector)
        line + " " + prediction
    }.saveAsTextFile("F:\\machine-learning\\result2")

4.结果

     中心点结果:


    单点测试数据结果:


    交叉评估1,只返回结果:


     交叉评估2,返回数据集和结果:


5.相关资源链接

以上其实已经把完整代码都写完了,如果想下载完整代码,(豪们,会员们给点鼓励,多下载,打赏一二。不是会员者,不建议下载,因为基本已是完整代码),链接如下:

Spark中机器学期之KMeans算法完整代码实战讲解

http://download.csdn.net/detail/rivercode/9797842
















2017-09-21 17:58:01 stevekangpei 阅读数 525
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5436 人正在学习 去看看 陆永剑

Spark MLlib源代码解读之KMeans(下)


之前看过Kmeans的源代码,但是对于Spark KMeans生成初始中心点的方法没有理解到位,
最近又看了一下,再次补充一下。Spark生成初始中心点有一个方法叫做initKMeansParallel。
整个代码包含有 Kmeans类和localKmeans类,localkmeans类主要用于实现KMeans++方法来实现得到中心点。


initKMeansParallel大致思路

分布式实现KMeans中心点的方式大致如下:(有点绕)。

首先就像之前博客所讲的,Spark 的KMeans真正执行的有runs个任务,这个runs需要我们自己去指定,默认为1.表示的是会去进行runs个KMeans任务,然后选择其中的最小的cost作为最合适的聚类结果。所以这个initKMeansParallel方法最后返回的结果也是Array[Array[VectorWithNorm]]。是一个二维数组,表示的是有runs组中心点,每组中心点有k个向量。


在真正进行迭代的时候,每次迭代都会筛选到2*k个点,这个筛选的过程是初始化一个rand用于生成随机数,
这个sumCosts表示的是之前通过aggregate聚合好的每个runs参数下的总的cost值。
可以看到如果 (2 · c(r) · k)/sumCosts(r) > rand随机生成的数的话,则将这个point选中。

 val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>

        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        pointsWithCosts.flatMap { case (p, c) =>
          val rs = (0 until runs).filter { r =>  //这个其实是在算概率, 2* c(r)*k/sumCosts(r)> rand.nextDouble
            //表示的是满足这个条件的点呗过滤下来
            rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
          }
          if (rs.length > 0) Some(p, rs) else None //大于0的话,用some封装,否则返回none。
        }
      }.collect()

选中了之后,需要更新这个centers的值。

def mergeNewCenters(): Unit = { //合并新的中心点到中心
      var r = 0
      while (r < runs) {
        centers(r) ++= newCenters(r)
        newCenters(r).clear()
        r += 1
      }
    }

注意这个mergeNewCenters,其中的centers是一个二维数组,而每一行其实是一个ArrayBuffer。merge的时候,每一行会不停的往里面添加向量。

 val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
  mergeNewCenters()  // 合并一下新的中心点。
      chosen.foreach { case (p, rs) =>
        rs.foreach(newCenters(_) += p.toDense)
      }

通过多次迭代之后,对于每一个runs参数我们会生成一系列的可能不少于K个的待选中心点。我们需要对每一个runs下面的中心点的参数加以权重。加权重的方式是如果在(i, j)第i个runs下的第j个待选中心点如果是最近的点的话,则((i, j), 1.0)。即这个对应的待选中心点加1,这样子的点在以后被筛选到的可能性会大。

val bcCenters = data.context.broadcast(centers)
    val weightMap = data.flatMap { p =>
      Iterator.tabulate(runs) { r =>
        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
      }
    }.reduceByKey(_ + _).collectAsMap() //聚合之后返回的是一个map类型。

最后调用LocalKMeans. LocalKMeans.kMeansPlusPlus方法来求得finalCenters。

 val finalCenters = (0 until runs).par.map { r =>
      val myCenters = centers(r).toArray //表示的是第r个并行度下的中心点数组。

      val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray //获取weights的方式,刚才我们得到了一个Map类型,如果有的haul,返回值,没有的话返回0.
      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
    }

LocalKMeans.kMeansPlusPlus这个方法大致是通过距离初始点尽可能远的点找到合适的中心点。当找到k个中心点之后,又再一次使用了一个local 版本的Kmeans算法来继续对这些生成的中心点,进行迭代,知道这些点不改变了之后,返回需要的结果。每次调用一次LocalKMeans.kMeansPlusPlus,返回k个中心点。总共调用runs次,则返回有[runs][k]个我们需要的中心点。


好接下来看看完整的代码吧。

 private def initKMeansParallel(data: RDD[VectorWithNorm])
  : Array[Array[VectorWithNorm]] = {

    //初始化中心及costs,tabluate方法返回一个数组,长度为runs。
    val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

    // Initialize each run's first center to a random point.
    val seed = new XORShiftRandom(this.seed).nextInt()
    //初始化第一个中心点
    val sample = data.takeSample(true, runs, seed).toSeq //随机筛选出一些中心点。 
    val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) //获取一个长度为为runs的中心点数组 

    /** Merges new centers to centers. */
    def mergeNewCenters(): Unit = { //合并新的中心点到中心
      var r = 0
      while (r < runs) {
        centers(r) ++= newCenters(r)
        newCenters(r).clear()
        r += 1
      }
    }

    // On each step, sample 2 * k points on average for each run with probability proportional
    // to their squared distance from that run's centers. Note that only distances between points
    // and new centers are computed in each iteration.
    //每次迭代的过程,抽取2*k个样本,每次迭代计算样本点与中心店的距离
    var step = 0
    while (step < initializationSteps) {
      val bcNewCenters = data.context.broadcast(newCenters) //新的中心点
      val preCosts = costs

     //j将数据点和cost通过拉链操作连接在一起,返回一个(point,cost)
     //这个会在下一步的时候通过调用math。min方法,找出cost(r) 和通过kmeans的pointcost方法返回的最小的cost值
     //并将这个值更新到costs数组。同时将这个costs数组cache到缓存。
      costs = data.zip(preCosts).map { case (point, cost) =>
          Array.tabulate(runs) { r =>
            math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
          }
        }.persist(StorageLevel.MEMORY_AND_DISK)

      //接下来聚合costs,聚合后的返回值,为一个Array,其内部的元素为Double类型。
      //注意接下来这个aggregate算子,他接收三个参数,第一个参数接收一个初始值,这个初始值,首先会作用到每个分区,
      //应用于每个分区的函数是接下来定义的第一个SeqOp函数。
      //这个参数会在每个分区发挥作用。
      //接下来有一个combOp函数,这个函数会在对每一个聚合后的结果发挥作用。相当于前面函数作用后的结果,会在后面继续发挥作用.
      //需要注意的是,aggregate算子,的初始参数在第二个函数页发挥作用。而aggregateByKey算子不会发挥作用。

      val sumCosts = costs
        .aggregate(new Array[Double](runs))(
          //接下来计算的方式如下所示,由于有一个并行度,每一个并行度会有一个自己的costs数组,所以计算costs数组的时候会
          // 分开对每一个costs数组进行计算。第一个函数的意思是:合并第二个参数v(也是一个costs值)到第一个s里面。
         //然后返回一个s数组。相当于每一个分区都会有这个s数组
          seqOp = (s, v) => {
            // s += v
            var r = 0
            while (r < runs) {
              s(r) += v(r)
              r += 1
            }
            s
          },
          //接下来对于不同的分区,计算s数组的值的和。和刚才一样,因为有一个并行度,所以默认的是对每一个并行度下的cost数组进行计算。
          //然后返回这个s0。s0是一个数组。从0~runs的一个double类型的数组。每一个对应的元素包含的是对应的在第r次并行度下的cost值

          combOp = (s0, s1) => {
            // s0 += s1
            var r = 0
            while (r < runs) {
              s0(r) += s1(r)
              r += 1
            }
            s0
          }
        )

      bcNewCenters.unpersist(blocking = false) // 去掉在内存中的缓存
      preCosts.unpersist(blocking = false)

      val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>

        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        pointsWithCosts.flatMap { case (p, c) =>
          val rs = (0 until runs).filter { r =>  //这个其实是在算概率, 2* c(r)*k/sumCosts(r)> rand.nextDouble
            //表示的是满足这个条件的点呗过滤下来
            rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
          }
          if (rs.length > 0) Some(p, rs) else None //大于0的话,用some封装,否则返回none。
        }
      }.collect()

      mergeNewCenters()  // 合并一下新的中心点。
      chosen.foreach { case (p, rs) =>
        rs.foreach(newCenters(_) += p.toDense)
      }
      step += 1
    }

    mergeNewCenters()
    costs.unpersist(blocking = false)

    val bcCenters = data.context.broadcast(centers)
    val weightMap = data.flatMap { p =>
      Iterator.tabulate(runs) { r =>
        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
      }
    }.reduceByKey(_ + _).collectAsMap()

    bcCenters.unpersist(blocking = false)

    val finalCenters = (0 until runs).par.map { r =>
      val myCenters = centers(r).toArray //表示的是第r个并行度下的中心点数组。
      val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
    }

    finalCenters.toArray
  }
}
rivate[mllib] object LocalKMeans extends Logging {


  def kMeansPlusPlus(
      seed: Int, //种子点
      points: Array[VectorWithNorm], //中心点
      weights: Array[Double], //权重
      k: Int, //中心点个数
      maxIterations: Int  //最大的迭代次数
  ): Array[VectorWithNorm] = { //最终这个方法返回的是一个中心点和范数数组,表示的是第r个并行度下的中心点向量和范数
    val rand = new Random(seed)  // 初始化随机数,最大值为seed,而seed是每个对应的runs参数,即第一个kmeans任务。
    val dimensions = points(0).vector.size  //中心点的维度数
    val centers = new Array[VectorWithNorm](k) //存储中心点向量,共有k个。

    // Initialize centers by sampling using the k-means++ procedure.
    //通过kmeans++的方式来生成初始化的第一个中心点。
    centers(0) = pickWeighted(rand, points, weights).toDense //第一个中心点。

    for (i <- 1 until k) {
      // Pick the next center with a probability proportional to cost under current centers
      //接下来通过概率的方式找到剩下的中心点,距离当前中心点距离越远,可能性越大。
      val curCenters = centers.view.take(i) //取出第一个,由于刚开始初始化,所以这个值为0.

      val sum = points.view.zip(weights).map { case (p, w) => // (p, w)表示的是p指的是VectorWithNorm,w指的是权重。
        w * KMeans.pointCost(curCenters, p) //将权重值乘以p距离当前的中心点的距离,注意当前的p也是中心点。
      }.sum
      val r = rand.nextDouble() * sum // 将这个值最为累计score的最大值,
      var cumulativeScore = 0.0
      var j = 0
      while (j < points.length && cumulativeScore < r) {
        //找到一个尽可能接近r的累计score的值,然后记录下j。
        cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
        j += 1
      }

      if (j == 0) {
        logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
          s" Using duplicate point for center k = $i.")
        centers(i) = points(0).toDense
      } else {
        centers(i) = points(j - 1).toDense //返回points(j-1).
      }
    }

      // 通过上述的方式找到了k的聚类中心点。这k个聚类中心点同属于第runs个并行度下。

    // Run up to maxIterations iterations of Lloyd's algorithm
    //接下来有迭代maxIterations的执行Lloyd算法。    
   //此时计算的时候数据为我们之前的中心点数据,而上一步生成的centers数组作为中心点,然后不断的迭代,知道找到了合适的中心点。
    val oldClosest = Array.fill(points.length)(-1)
    var iteration = 0
    var moved = true
    while (moved && iteration < maxIterations) { //  如果移动了的话,
      moved = false
      val counts = Array.fill(k)(0.0) //这个表示属于这个点的数目,
      val sums = Array.fill(k)(Vectors.zeros(dimensions)) // 这个用来累加第i个向量,
      var i = 0

      while (i < points.length) {
        val p = points(i) // 之前的数据中心点作为数据点。
        val index = KMeans.findClosest(centers, p)._1  // 表示计算的中心点的index,

        axpy(weights(i), p.vector, sums(index))   // 这一步用来更新这个sum,sum(index) += p.vector * weights.

        counts(index) += weights(i)   /// 更新count。

        if (index != oldClosest(i)) {
          moved = true
          oldClosest(i) = index //  如果移动了的话,更新原来存在的index。
        }
        i += 1
      }

      //每一轮计算完成之后需要重新对中心点进行更新。
      // Update centers
      var j = 0
      while (j < k) {
        if (counts(j) == 0.0) { //如果第j个中心点周边的point数目是0的话,
          // Assign center to a random point
          centers(j) = points(rand.nextInt(points.length)).toDense //随机安排一个点。
        } else {
          scal(1.0 / counts(j), sums(j))  //否则的话,更新这个中心点。
          centers(j) = new VectorWithNorm(sums(j))
        }
        j += 1
      }
      iteration += 1
    }

    if (iteration == maxIterations) {
      logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"Local KMeans++ converged in $iteration iterations.")
    }

    centers  //最后返回中心点。
  }

  private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = {
    val r = rand.nextDouble() * weights.sum  //weights存储的是这个runs并行度下的所有的权重,这个对其进行求和。
    var i = 0
    var curWeight = 0.0
    while (i < data.length && curWeight < r) { //找到一个累加权重尽可能接近r的i值,返回这个data
      curWeight += weights(i)
      i += 1
    }
    data(i - 1) //当这个i值尽可能的接近r的时候,返回这个i对应的元素,将其作为第一个初始化点。
  }
}
2014-02-27 11:33:53 li385805776 阅读数 5577
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5436 人正在学习 去看看 陆永剑

思路:

1.随机生成数据

2.随机生成K个聚类中心

3.计算每个点所属的类别

4.计算新的聚类中心

5.比较聚类中心的变化情况,大于阈值跳转至3;小于阈值停止。

package myclass

import java.util.Random
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.spark.util.Vector
/**
 * Created by jack on 2/26/14.
 */
object MyKMeans {
	val N = 1000
	val R = 1000     //随机数范围  0-1  *  R
	val D = 10       //点空间纬度
	val K = 10       //聚类中心个数
	val rand = new Random(42) //随机种子
	val convergeDist = 0.01   //迭代收敛条件

	/**
	 * 将p分配到当前所有聚类中心的最短距离的类中
	 * */
	def closestPoint(p:Vector,centers: Array[Vector]): Int = {
		var bestIndex = 0
		var closest = Double.PositiveInfinity

		for (i <- 0 until centers.length) {
			val tempDist = p.squaredDist(centers(i))
			if(tempDist < closest) {
				closest = tempDist
				bestIndex = i
			}
		}

		bestIndex
	}

/**
 * 产生N个D维(每一维取值0-1000)随机的点
 * */
	def generateData = {
		def generatePoint(i: Int) = {
			Vector(D,_ => rand.nextDouble * R)
		}
		Array.tabulate(N)(generatePoint)
	}

	def main(args: Array[String]) {
		val sc = new SparkContext("local","My KMeans",System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
		val data = sc.parallelize(generateData).cache()

		//随机初始化K个聚类中心
		val kPoints = data.takeSample(false,K,42).toArray
		var tempDist = 1.0

		while(tempDist > convergeDist) {
			//closest为(类别,(点,1)),1是用来后续统计各个类中点的数量count
			val closest = data.map(p => (closestPoint(p,kPoints),(p,1)))
			//按类别,计算点的坐标和,以及该类别中节点总数 (类别,(点向量和,点数))
			val pointStats = closest.reduceByKey{
				case ((x1,y1),(x2,y2)) => (x1+x2,y1+y2)
			}
			//生成新的聚类中心的Map(类别,新聚类中心)
			val newPoints = pointStats.map{
				pair => (pair._1, pair._2._1 / pair._2._2)
			}.collectAsMap()

			tempDist = 0.0
			for (i <- 0 until K) {
				tempDist += kPoints(i).squaredDist(newPoints(i))
			}
			//更新聚类中心到kPoint
			for (newP <- newPoints) {
				kPoints(newP._1) = newP._2
			}
			println("Finished iteration(delta = "+ tempDist + ")")
		}
		println("Final centers:")
		kPoints.foreach(println)
		System.exit(0)
	}
}


没有更多推荐了,返回首页