kmeans spark_spark-kmeans - CSDN
  • import sys import numpy as np from pyspark.sql import SparkSession #该函数主要是将文件的string类型转换成float类型 def parseVector(line): return np.array([float(x) for x in line.split(' ')]) ...
    import sys
    
    import numpy as np
    from pyspark.sql import SparkSession
    #该函数主要是将文件的string类型转换成float类型
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    #该函数将点分配到点集中,返回的是点集的index
    #其中传入的参数p是需分配的点的值(可以看成矢量,假设是m维),centers是目前的中心点的值(可以看成n*m维的矩阵,其中n指的是中心点的个数)
    def closestPoint(p, centers):
        bestIndex = 0
        closest = float("+inf")
        for i in range(len(centers)):
            tempDist = np.sum((p - centers[i]) ** 2)#计算需分配的点与第i个中心点的距离
            if tempDist < closest:
                closest = tempDist
                bestIndex = i
        return bestIndex
    
    
    if __name__ == "__main__":
    
        if len(sys.argv) != 4:
            print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
            exit(-1)
    
        print("""WARN: This is a naive implementation of KMeans Clustering and is given
           as an example! Please refer to examples/src/main/python/ml/kmeans_example.py for an
           example on how to use ML's KMeans implementation.""", file=sys.stderr)
    
        spark = SparkSession\
            .builder\
            .appName("PythonKMeans")\
            .getOrCreate()
    
        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
        data = lines.map(parseVector).cache()
        K = int(sys.argv[2])  #设置的中心点的个数
        convergeDist = float(sys.argv[3])  #设置的阈值
    
        kPoints = data.takeSample(False, K, 1)  #在data点集中随机选取K个点作为中心点
        tempDist = 1.0
    
        while tempDist > convergeDist:
            closest = data.map(
                lambda p: (closestPoint(p, kPoints), (p, 1)))#返回的结果是(p点所分配的集合的index值,(p点的值,1))
            pointStats = closest.reduceByKey(   #reduceByKey针对具有相同键(在这里指的是p点被分配到相同的点集)的二元组
    #则p1_c1和p2_c2指的都是二元组的value值,也就是(p,1)。所以,该句子表示将在同一个点集上的点的(p,1)分别求和,其中别忘了p是一个矢量
    lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1])) #得到的是(点集的index,(点值的求和,该点集的点的个数)) newPoints = pointStats.map( # 返回(点集的index,点值求和/点的个数(矢量除法)),作为新的中心点 lambda st: (st[0], st[1][0] / st[1][1])).collect()        #计算新旧中心点的距离差 tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints) for (iK, p) in newPoints: kPoints[iK] = p #设置新的中心点的值 print("Final centers: " + str(kPoints)) spark.stop()

    有可能表述不是很准确,但是能看懂就行。(随机可能学多了,喜欢用矩阵看问题)

    展开全文
  • 1.1 KMeans聚类算法 1.1.1 基础理论 KMeans算法的基本思想是初始随机给定K个簇中心,按照最邻近原则把待分类样本点分到各个簇。然后按平均法重新计算各个簇的质心,从而确定新的簇心。一直迭代,直到簇心的移动距离...

    1.1 KMeans聚类算法

    1.1.1 基础理论

    KMeans算法的基本思想是初始随机给定K个簇中心,按照最邻近原则把待分类样本点分到各个簇。然后按平均法重新计算各个簇的质心,从而确定新的簇心。一直迭代,直到簇心的移动距离小于某个给定的值。

    K-Means聚类算法主要分为三个步骤:

    (1)第一步是为待聚类的点寻找聚类中心;

    (2)第二步是计算每个点到聚类中心的距离,将每个点聚类到离该点最近的聚类中去;

    (3)第三步是计算每个聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心;

    反复执行(2)、(3),直到聚类中心不再进行大范围移动或者聚类次数达到要求为止。

    1.1.2过程演示

    下图展示了对n个样本点进行K-means聚类的效果,这里k取2:

    (a)未聚类的初始点集;

    (b)随机选取两个点作为聚类中心;

    (c)计算每个点到聚类中心的距离,并聚类到离该点最近的聚类中去;

    (d)计算每个聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心;

    (e)重复(c),计算每个点到聚类中心的距离,并聚类到离该点最近的聚类中去;

    (f)重复(d),计算每个聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心。

    参照以下文档:

    http://blog.sina.com.cn/s/blog_62186b46010145ne.html

    1.2 Spark Mllib KMeans源码分析

    class KMeansprivate (

        privatevar k: Int,

        privatevar maxIterations: Int,

        privatevar runs: Int,

        privatevar initializationMode: String,

        privatevar initializationSteps: Int,

        privatevar epsilon: Double,

        privatevar seed: Long)extends Serializablewith Logging {

    // KMeans类参数:

    k:聚类个数,默认2maxIterations:迭代次数,默认20runs:并行度,默认1

    initializationMode:初始中心算法,默认"k-means||"initializationSteps:初始步长,默认5epsilon:中心距离阈值,默认1e-4seed:随机种子。

      /**

       * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1,

       * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}.

       */

      defthis() =this(2,20, 1, KMeans.K_MEANS_PARALLEL,5, 1e-4, Utils.random.nextLong())

    // 参数设置

    /** Set the number of clusters to create (k). Default: 2. */

      def setK(k: Int):this.type = {

        this.k = k

        this

      }

    **省略各个参数设置代码**

    // run方法,KMeans主入口函数

      /**

       * Train a K-means model on the given set of points; `data` should be cached for high

       * performance, because this is an iterative algorithm.

       */

      def run(data: RDD[Vector]): KMeansModel = {

     

        if (data.getStorageLevel == StorageLevel.NONE) {

          logWarning("The input data is not directly cached, which may hurt performance if its"

            + " parent RDDs are also uncached.")

        }

     

    // Compute squared norms and cache them.

    // 计算每行数据的L2范数,数据转换:data[Vector]=> data[(Vector, norms)],其中norms是Vector的L2范数,norms就是

        val norms = data.map(Vectors.norm(_,2.0))

        norms.persist()

        val zippedData = data.zip(norms).map {case (v, norm) =>

          new VectorWithNorm(v, norm)

        }

        val model = runAlgorithm(zippedData)

        norms.unpersist()

     

        // Warn at the end of the run as well, for increased visibility.

        if (data.getStorageLevel == StorageLevel.NONE) {

          logWarning("The input data was not directly cached, which may hurt performance if its"

            + " parent RDDs are also uncached.")

        }

        model

      }

    // runAlgorithm方法,KMeans实现方法。

      /**

       * Implementation of K-Means algorithm.

       */

      privatedef runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {

     

        val sc = data.sparkContext

     

        val initStartTime = System.nanoTime()

     

        val centers =if (initializationMode == KMeans.RANDOM) {

          initRandom(data)

        } else {

          initKMeansParallel(data)

        }

     

        val initTimeInSeconds = (System.nanoTime() - initStartTime) /1e9

        logInfo(s"Initialization with $initializationMode took " +"%.3f".format(initTimeInSeconds) +

          " seconds.")

     

        val active = Array.fill(runs)(true)

        val costs = Array.fill(runs)(0.0)

     

        var activeRuns =new ArrayBuffer[Int] ++ (0 until runs)

        var iteration =0

     

        val iterationStartTime = System.nanoTime()

    //KMeans迭代执行,计算每个样本属于哪个中心点,中心点累加样本的值及计数,然后根据中心点的所有的样本数据进行中心点的更新,并比较更新前的数值,判断是否完成。其中runs代表并行度。

        // Execute iterations of Lloyd's algorithm until all runs have converged

        while (iteration < maxIterations && !activeRuns.isEmpty) {

          type WeightedPoint = (Vector, Long)

          def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {

            axpy(1.0, x._1, y._1)

            (y._1, x._2 + y._2)

          }

     

          val activeCenters = activeRuns.map(r => centers(r)).toArray

          val costAccums = activeRuns.map(_ => sc.accumulator(0.0))

     

          val bcActiveCenters = sc.broadcast(activeCenters)

     

          // Find the sum and count of points mapping to each center

    //计算属于每个中心点的样本,对每个中心点的样本进行累加和计算;

    runs代表并行度,k中心点个数,sums代表中心点样本累加值,counts代表中心点样本计数;

    contribs代表((并行度I,中心J),(中心J样本之和,中心J样本计数和));

    findClosest方法:找到点与所有聚类中心最近的一个中心

          val totalContribs = data.mapPartitions { points =>

            val thisActiveCenters = bcActiveCenters.value

            val runs = thisActiveCenters.length

            val k = thisActiveCenters(0).length

            val dims = thisActiveCenters(0)(0).vector.size

     

            val sums = Array.fill(runs, k)(Vectors.zeros(dims))

            val counts = Array.fill(runs, k)(0L)

     

            points.foreach { point =>

              (0 until runs).foreach { i =>

               val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)

               costAccums(i) += cost

               val sum = sums(i)(bestCenter)

               axpy(1.0, point.vector, sum)

               counts(i)(bestCenter) += 1

              }

            }

     

            val contribs =for (i <-0 until runs; j <-0 until k) yield {

              ((i, j), (sums(i)(j), counts(i)(j)))

            }

            contribs.iterator

          }.reduceByKey(mergeContribs).collectAsMap()

    //更新中心点,更新中心点= sum/count

    判断newCentercenters之间的距离是否 > epsilon * epsilon;

          // Update the cluster centers and costs for each active run

          for ((run, i) <- activeRuns.zipWithIndex) {

            var changed =false

            var j =0

            while (j < k) {

              val (sum, count) = totalContribs((i, j))

              if (count !=0) {

               scal(1.0 / count, sum)

               val newCenter =new VectorWithNorm(sum)

               if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {

                 changed = true

               }

               centers(run)(j) = newCenter

              }

              j += 1

            }

            if (!changed) {

              active(run) = false

              logInfo("Run " + run +" finished in " + (iteration +1) + " iterations")

            }

            costs(run) = costAccums(i).value

          }

     

          activeRuns = activeRuns.filter(active(_))

          iteration += 1

        }

     

        val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) /1e9

        logInfo(s"Iterations took " +"%.3f".format(iterationTimeInSeconds) +" seconds.")

     

        if (iteration == maxIterations) {

          logInfo(s"KMeans reached the max number of iterations: $maxIterations.")

        } else {

          logInfo(s"KMeans converged in $iteration iterations.")

        }

     

        val (minCost, bestRun) = costs.zipWithIndex.min

     

        logInfo(s"The cost for the best run is $minCost.")

     

        new KMeansModel(centers(bestRun).map(_.vector))

      }

    //findClosest方法:找到点与所有聚类中心最近的一个中心

    /**

       * Returns the index of the closest center to the given point, as well as the squared distance.

       */

      private[mllib]def findClosest(

          centers: TraversableOnce[VectorWithNorm],

          point: VectorWithNorm): (Int, Double) = {

        var bestDistance = Double.PositiveInfinity

        var bestIndex =0

        var i =0

        centers.foreach { center =>

          // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary

          // distance computation.

          var lowerBoundOfSqDist = center.norm - point.norm

          lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist

          if (lowerBoundOfSqDist < bestDistance) {

            val distance: Double = fastSquaredDistance(center, point)

            if (distance < bestDistance) {

              bestDistance = distance

              bestIndex = i

            }

          }

          i += 1

        }

        (bestIndex, bestDistance)

      }

    findClosest方法中:var lowerBoundOfSqDist = center.norm - point.norm

    lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist

    如果中心点center是(a1,b1),需要计算的点point是(a2,b2),那么lowerBoundOfSqDist是:

    如下是展开式,第二个是真正计算欧式距离时的除去开平方的公式。(在查找最短距离的时候无需计算开方,因为只需要计算出开方里面的式子就可以进行比较了,mllib也是这样做的)

    可轻易证明上面两式的第一式将会小于等于第二式,因此在进行距离比较的时候,先计算很容易计算的lowerBoundOfSqDist,如果lowerBoundOfSqDist都不小于之前计算得到的最小距离bestDistance,那真正的欧式距离也不可能小于bestDistance了,因此这种情况下就不需要去计算欧式距离,省去很多计算工作。

    如果lowerBoundOfSqDist小于了bestDistance,则进行距离的计算,调用fastSquaredDistance,这个方法将调用MLUtils.scala里面的fastSquaredDistance方法,计算真正的欧式距离,代码如下:

    /**

       * Returns the squared Euclidean distance between two vectors. The following formula will be used

       * if it does not introduce too much numerical error:

       * <pre>

       *   \|a - b\|_2^2 = \|a\|_2^2 + \|b\|_2^2 - 2 a^T b.

       * </pre>

       * When both vector norms are given, this is faster than computing the squared distance directly,

       * especially when one of the vectors is a sparse vector.

       *

       * @param v1 the first vector

       * @param norm1 the norm of the first vector, non-negative

       * @param v2 the second vector

       * @param norm2 the norm of the second vector, non-negative

       * @param precision desired relative precision for the squared distance

       * @return squared distance between v1 and v2 within the specified precision

       */

      private[mllib]def fastSquaredDistance(

          v1: Vector,

          norm1: Double,

          v2: Vector,

          norm2: Double,

          precision: Double = 1e-6): Double = {

        val n = v1.size

        require(v2.size == n)

        require(norm1 >= 0.0 && norm2 >=0.0)

        val sumSquaredNorm = norm1 * norm1 + norm2 * norm2

        val normDiff = norm1 - norm2

        var sqDist =0.0

        /*

         * The relative error is

         * <pre>

         * EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),

         * </pre>

         * which is bounded by

         * <pre>

         * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).

         * </pre>

         * The bound doesn't need the inner product, so we can use it as a sufficient condition to

         * check quickly whether the inner product approach is accurate.

         */

        val precisionBound1 =2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)

        if (precisionBound1 < precision) {

          sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)

        } elseif (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {

          val dotValue = dot(v1, v2)

          sqDist = math.max(sumSquaredNorm - 2.0 * dotValue,0.0)

          val precisionBound2 = EPSILON * (sumSquaredNorm +2.0 * math.abs(dotValue)) /

            (sqDist + EPSILON)

          if (precisionBound2 > precision) {

            sqDist = Vectors.sqdist(v1, v2)

          }

        } else {

          sqDist = Vectors.sqdist(v1, v2)

        }

        sqDist

      }

    fastSquaredDistance方法会先计算一个精度,有关精度的计算val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON),如果在精度满足条件的情况下,欧式距离sqDist = sumSquaredNorm - 2.0 * v1.dot(v2),sumSquaredNorm即为,2.0 * v1.dot(v2)即为。这也是之前将norm计算出来的好处。如果精度不满足要求,则进行原始的距离计算公式了,即调用Vectors.sqdist(v1, v2)。

    1.3 Mllib KMeans实例

    1、数据

    数据格式为:特征1 特征2 特征3

    0.0 0.0 0.0

    0.1 0.1 0.1

    0.2 0.2 0.2

    9.0 9.0 9.0

    9.1 9.1 9.1

    9.2 9.2 9.2

    2、代码

      //1读取样本数据

      valdata_path ="/home/jb-huangmeiling/kmeans_data.txt"

      valdata =sc.textFile(data_path)

      valexamples =data.map { line =>

        Vectors.dense(line.split(' ').map(_.toDouble))

      }.cache()

      valnumExamples =examples.count()

      println(s"numExamples = $numExamples.")

      //2建立模型

      valk =2

      valmaxIterations =20

      valruns =2

      valinitializationMode ="k-means||"

      valmodel = KMeans.train(examples,k, maxIterations,runs, initializationMode)

      //3计算测试误差

      valcost =model.computeCost(examples)

      println(s"Total cost = $cost.")

    展开全文
  • 1.标准kmeans算法kmeans算法是实际中最常用的聚类算法,没有之一。kmeans算法的原理简单,实现起来不是很复杂,实际中使用的效果一般也不错,所以深受广大人民群众的喜爱。 kmeans算法的原理介绍方面的paper...

    1.标准kmeans算法

    kmeans算法是实际中最常用的聚类算法,没有之一。kmeans算法的原理简单,实现起来不是很复杂,实际中使用的效果一般也不错,所以深受广大人民群众的喜爱。
    kmeans算法的原理介绍方面的paper多如牛毛,而且理解起来确实也不是很复杂,这里使用wiki上的版本:
    已知观测集(x1,x2,,xn),其中每个观测都是一个d维实矢量,kmeans聚类要把这n个观测值划分到k个集合中(kn),使得组内平方和(WCSS within-cluster sum of squares)最小。换句话说,它的目标是找到使得下式满足的聚类Si

    argminSi=1kxSixμi2

    其中μiSi中所有点的均值。

    标准kmeans算法的步骤一般如下:
    1.先随机挑选k个初始聚类中心。
    2.计算数据集中每个点到每个聚类中心的距离,然后将这个点分配到离该点最近的聚类中心。
    3.重新计算每个类中所有点的坐标的平均值,并将得到的这个新的点作为新的聚类中心。
    重复上面第2、3步,知道聚类中心点不再大范围移动(精度自己定义)或者迭代的总次数达到最大。

    2.标准kmeans算法的优缺点

    标准的kmeans算法的优缺点都很突出。这里挑几个最重要的点总结一下。

    主要优点:

    1.原理简单,易于理解。
    2.实现简单
    3.计算速度较快
    4.聚类效果还不错。

    主要缺点:

    1.需要确定k值。
    2.对初始中心点的选择敏感。
    3.对异常值敏感,因为异常值很很大程度影响聚类中心的位置。
    4.无法增量计算。这点在数据量大的时候尤为突出。

    3.spark中对kmeans的优化

    作为经典的聚类算法,一般的机器学习框架里都实现由kmeans,spark自然也不例外。前面我们已经讲了标准kmeans的流程以及优缺点,那么针对标准kmeans中的不足,spark里主要做了如下的优化:

    1.选择合适的K值。

    k的选择是kmeans算法的关键。Spark MLlib在KMeansModel里实现了computeCost方法,这个方法通过计算数据集中所有的点到最近中心点的平方和来衡量聚类的效果。一般来说,同样的迭代次数,这个cost值越小,说明聚类的效果越好。但在实际使用过程中,必须还要考虑聚类结果的可解释性,不能一味地选择cost值最小的那个k。比如我们如果考虑极限情况,如果数据集有n个点,如果令k=n,每个点都是聚类中心,每个类都只有一个点,此时cost值最小为0。但是这样的聚类结果显然是没有实际意义的。

    2.选择合适的初始中心点

    大部分迭代算法都对初始值很敏感,kmeans也是如此。spark MLlib在初始中心点的选择上,使用了k-means++的算法。想要详细了解k-means++的同学们,可以参考k-means++在wiki上的介绍:https://en.wikipedia.org/wiki/K-means%2B%2B
    kmeans++的基本思想是是初始中心店的相互距离尽可能远。为了实现这个初衷,采取如下步骤:
    1.从初始数据集中随机选择一个点作为第一个聚类中心点。
    2.计算数据集中所有点到最近一个中心点的距离D(x)并存在一个数组里,然后将所有这些距离加起来得到Sum(D(x))。
    3.然后再取一个随机值,用权重的方式计算下一个中心点。具体的实现方法:先取一个在Sum(D(x))范围内的随机值,然后领Random -= D(x),直至Random <= 0,此时这个D(x)对应的点为下一个中心点。
    4.重复2、3步直到k个聚类中心点被找出。
    5.利用找出的k个聚类中心点,执行标准的kmeans算法。

    算法的关键是在第三步。有两个小点需要说明:
    1.不能直接取距离最大的那个点当中心店。因为这个点很可能是离群点。
    2.这种取随机值的方法能保证距离最大的那个点被选中的概率最大。给大家举个很简单的例子:假设有四个点A、B、C、D,分别离最近中心的距离D(x)为1、2、3、4,那么Sum(D(x))=10。然后在[0,10]之间取一随机数假设为random,然后用random与D(x)依次相减,直至random<0为止。应该不难发现,D被选中的概率最大。

    4.spark实战kmeans算法

    前面讲了这么多理论,照例咱们需要实践一把。talk is cheap,show me the code!

    1.准备数据

    首先准备数据集。这里采用的数据集是UCI的一个数据集。数据地址http://archive.ics.uci.edu/ml/datasets/Wholesale+customers?cm_mc_uid=70889544450214522232748&cm_mc_sid_50200000=1469871598。UCI是一个常用的标准测试数据集,是搞ML与DM同学经常使用的数据集。关于该数据集的介绍,同学们可以去网页上查看。

    将数据下载下来以后查看一把,第一行相当于是表头,是对数据的相关说明。将此行去掉,还剩440行。将前400行作为训练集,后40行作为测试集。

    2.将代码run起来

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
    import org.apache.spark.mllib.linalg.Vectors
    
    object KmeansTest {
      def main(args: Array[String]) {
    
        val conf = new
            SparkConf().setAppName("K-Means Clustering").setMaster("spark://your host:7077").setJars(List("your jar file"))
        val sc = new SparkContext(conf)
    
        val rawTrainingData = sc.textFile("file:///Users/lei.wang/data/data_training")
        val parsedTrainingData =
          rawTrainingData.filter(!isColumnNameLine(_)).map(line => {
            Vectors.dense(line.split(",").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
          }).cache()
    
        // Cluster the data into two classes using KMeans
    
        val numClusters = 8
        val numIterations = 30
        val runTimes = 3
        var clusterIndex: Int = 0
        val clusters: KMeansModel =
          KMeans.train(parsedTrainingData, numClusters, numIterations, runTimes)
    
        println("Cluster Number:" + clusters.clusterCenters.length)
    
        println("Cluster Centers Information Overview:")
        clusters.clusterCenters.foreach(
          x => {
            println("Center Point of Cluster " + clusterIndex + ":")
            println(x)
            clusterIndex += 1
          })
    
        //begin to check which cluster each test data belongs to based on the clustering result
    
        val rawTestData = sc.textFile("file:///Users/lei.wang/data/data_test")
        val parsedTestData = rawTestData.map(line => {
          Vectors.dense(line.split(",").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
    
        })
        parsedTestData.collect().foreach(testDataLine => {
          val predictedClusterIndex:
          Int = clusters.predict(testDataLine)
          println("The data " + testDataLine.toString + " belongs to cluster " +
            predictedClusterIndex)
        })
    
        println("Spark MLlib K-means clustering test finished.")
      }
    
      private def isColumnNameLine(line: String): Boolean = {
        if (line != null && line.contains("Channel")) true
        else false
      }
    }

    在本地将代码跑起来以后,输出如下:

    ...
    Cluster Number:8
    Cluster Centers Information Overview:
    Center Point of Cluster 0:
    [1.103448275862069,2.5517241379310343,39491.1724137931,4220.6551724137935,5250.172413793103,4478.103448275862,870.9655172413793,2152.8275862068967]
    Center Point of Cluster 1:
    [2.0,2.4210526315789473,7905.894736842105,20288.052631578947,30969.263157894737,2002.0526315789473,14125.105263157893,3273.4736842105262]
    Center Point of Cluster 2:
    [1.0,2.5,34782.0,30367.0,16898.0,48701.5,755.5,26776.0]
    Center Point of Cluster 3:
    [1.2190476190476192,2.5142857142857147,17898.97142857143,3221.7904761904765,4525.866666666667,3639.419047619048,1061.152380952381,1609.9047619047622]
    Center Point of Cluster 4:
    [1.8987341772151898,2.481012658227848,4380.5822784810125,9389.151898734177,14524.556962025315,1508.4556962025317,6457.683544303797,1481.1772151898733]
    Center Point of Cluster 5:
    [1.0817610062893082,2.4716981132075473,5098.270440251573,2804.295597484277,3309.0943396226417,2416.37106918239,901.1886792452831,803.0062893081762]
    Center Point of Cluster 6:
    [1.0,3.0,85779.66666666666,12503.666666666666,12619.666666666666,13991.666666666666,2159.0,3958.0]
    Center Point of Cluster 7:
    [2.0,3.0,29862.5,53080.75,60015.75,3262.25,27942.25,3082.25]
    ...

    此部分内容为聚类中心点相关信息,我们将k设为8,所以一共有8个中心点。

    ...
    The data [1.0,3.0,4446.0,906.0,1238.0,3576.0,153.0,1014.0] belongs to cluster 5
    The data [1.0,3.0,27167.0,2801.0,2128.0,13223.0,92.0,1902.0] belongs to cluster 3
    The data [1.0,3.0,26539.0,4753.0,5091.0,220.0,10.0,340.0] belongs to cluster 3
    The data [1.0,3.0,25606.0,11006.0,4604.0,127.0,632.0,288.0] belongs to cluster 3
    The data [1.0,3.0,18073.0,4613.0,3444.0,4324.0,914.0,715.0] belongs to cluster 3
    The data [1.0,3.0,6884.0,1046.0,1167.0,2069.0,593.0,378.0] belongs to cluster 5
    The data [1.0,3.0,25066.0,5010.0,5026.0,9806.0,1092.0,960.0] belongs to cluster 3
    The data [2.0,3.0,7362.0,12844.0,18683.0,2854.0,7883.0,553.0] belongs to cluster 4
    The data [2.0,3.0,8257.0,3880.0,6407.0,1646.0,2730.0,344.0] belongs to cluster 5
    The data [1.0,3.0,8708.0,3634.0,6100.0,2349.0,2123.0,5137.0] belongs to cluster 5
    The data [1.0,3.0,6633.0,2096.0,4563.0,1389.0,1860.0,1892.0] belongs to cluster 5
    The data [1.0,3.0,2126.0,3289.0,3281.0,1535.0,235.0,4365.0] belongs to cluster 5
    The data [1.0,3.0,97.0,3605.0,12400.0,98.0,2970.0,62.0] belongs to cluster 4
    The data [1.0,3.0,4983.0,4859.0,6633.0,17866.0,912.0,2435.0] belongs to cluster 5
    The data [1.0,3.0,5969.0,1990.0,3417.0,5679.0,1135.0,290.0] belongs to cluster 5
    The data [2.0,3.0,7842.0,6046.0,8552.0,1691.0,3540.0,1874.0] belongs to cluster 5
    The data [2.0,3.0,4389.0,10940.0,10908.0,848.0,6728.0,993.0] belongs to cluster 4
    The data [1.0,3.0,5065.0,5499.0,11055.0,364.0,3485.0,1063.0] belongs to cluster 4
    The data [2.0,3.0,660.0,8494.0,18622.0,133.0,6740.0,776.0] belongs to cluster 4
    The data [1.0,3.0,8861.0,3783.0,2223.0,633.0,1580.0,1521.0] belongs to cluster 5
    The data [1.0,3.0,4456.0,5266.0,13227.0,25.0,6818.0,1393.0] belongs to cluster 4
    The data [2.0,3.0,17063.0,4847.0,9053.0,1031.0,3415.0,1784.0] belongs to cluster 3
    The data [1.0,3.0,26400.0,1377.0,4172.0,830.0,948.0,1218.0] belongs to cluster 3
    The data [2.0,3.0,17565.0,3686.0,4657.0,1059.0,1803.0,668.0] belongs to cluster 3
    The data [2.0,3.0,16980.0,2884.0,12232.0,874.0,3213.0,249.0] belongs to cluster 3
    The data [1.0,3.0,11243.0,2408.0,2593.0,15348.0,108.0,1886.0] belongs to cluster 3
    The data [1.0,3.0,13134.0,9347.0,14316.0,3141.0,5079.0,1894.0] belongs to cluster 4
    The data [1.0,3.0,31012.0,16687.0,5429.0,15082.0,439.0,1163.0] belongs to cluster 0
    The data [1.0,3.0,3047.0,5970.0,4910.0,2198.0,850.0,317.0] belongs to cluster 5
    The data [1.0,3.0,8607.0,1750.0,3580.0,47.0,84.0,2501.0] belongs to cluster 5
    The data [1.0,3.0,3097.0,4230.0,16483.0,575.0,241.0,2080.0] belongs to cluster 4
    The data [1.0,3.0,8533.0,5506.0,5160.0,13486.0,1377.0,1498.0] belongs to cluster 5
    The data [1.0,3.0,21117.0,1162.0,4754.0,269.0,1328.0,395.0] belongs to cluster 3
    The data [1.0,3.0,1982.0,3218.0,1493.0,1541.0,356.0,1449.0] belongs to cluster 5
    The data [1.0,3.0,16731.0,3922.0,7994.0,688.0,2371.0,838.0] belongs to cluster 3
    The data [1.0,3.0,29703.0,12051.0,16027.0,13135.0,182.0,2204.0] belongs to cluster 0
    The data [1.0,3.0,39228.0,1431.0,764.0,4510.0,93.0,2346.0] belongs to cluster 0
    The data [2.0,3.0,14531.0,15488.0,30243.0,437.0,14841.0,1867.0] belongs to cluster 1
    The data [1.0,3.0,10290.0,1981.0,2232.0,1038.0,168.0,2125.0] belongs to cluster 5
    The data [1.0,3.0,2787.0,1698.0,2510.0,65.0,477.0,52.0] belongs to cluster 5
    ...

    此部分内容为测试集的聚类结果。因为我们选了40个样本作为测试集,所以此部分输出的内容一共有40行。

    5.后续工作

    本次测试是在单机上做的demo测试,数据集比较小,运算过程也比较快。其实当数据量增大以后,基本过程跟这是类似的,只需要将input改为集群的数据路径,然后再写个简单的shell脚本,调用spark-submit,将任务提交到集群即可。

    展开全文
  • Spark-KMeans聚类分析

    2018-09-14 15:56:45
    Spark机器学习库简介 K-means聚类算法原理 K-means 实现 运行示例 K值的选择 Spark机器学习库简介 MLlib是Spark的机器学习(ML)库。其目标是使实用的机器学习可扩展且简单。从较高的层面来说,它提供了以下...

    目录

    Spark机器学习库简介

    K-means聚类算法原理

    K-means 实现

    运行示例

    K值的选择


    Spark机器学习库简介

    MLlib是Spark的机器学习(ML)库。其目标是使实用的机器学习可扩展且简单。从较高的层面来说,它提供了以下工具:

    •     ML算法:常见的学习算法,如分类,回归,聚类和协同过滤
    •     特征化:特征提取,转换,降维和选择
    •     管道:用于构建,评估和调整ML管道的工具
    •     持久性:保存和加载算法,模型和管道
    •     实用程序:线性代数,统计,数据处理等

    使用 Spark 机器学习库来做机器学习工作,可以说是非常的简单,通常只需要在对原始数据进行处理后,然后直接调用相应的 API 就可以实现。但是要想选择合适的算法,高效准确地对数据进行分析,您可能还需要深入了解下算法原理,以及相应 Spark MLlib API 实现的参数的意义。

    K-means聚类算法原理

    何谓聚类,聚类指的是将数据分类到不同的类或者簇这样的一个过程,所以同一个簇中的对象有很大的相似性,而不同簇间的对象有很大的相异性,聚类与分类的不同在于,聚类所要求划分的类是未知的。

    聚类分析是一个无监督学习 (Unsupervised Learning) 过程, 一般是用来对数据对象按照其特征属性进行分组,经常被应用在客户分群,欺诈检测,图像分析等领域。K-means 应该是最有名并且最经常使用的聚类算法了,其原理比较容易理解,并且聚类效果良好,有着广泛的使用。

    和诸多机器学习算法一样,K-means 算法也是一个迭代式的算法,其主要步骤如下:

    • 第一步,选择 K 个点作为初始聚类中心。
    • 第二步,计算其余所有点到聚类中心的距离,并把每个点划分到离它最近的聚类中心所在的聚类中去。在这里,衡量距离一般有多个函数可以选择,最常用的是欧几里得距离 (Euclidean Distance), 也叫欧式距离。
    • 第三步,重新计算每个聚类中所有点的平均值,并将其作为新的聚类中心点。
    • 第四步,重复 (二),(三) 步的过程,直至聚类中心不再发生改变,或者算法达到预定的迭代次数,又或聚类中心的改变小于预先设定的阀值。

    在实际应用中,K-means 算法有两个不得不面对并且克服的问题。

    • 聚类个数 K 的选择。K 的选择是一个比较有学问和讲究的步骤,我们会在后文专门描述如何使用 Spark 提供的工具选择 K。
    • 初始聚类中心点的选择。选择不同的聚类中心可能导致聚类结果的差异。

    Spark MLlib K-means 算法的实现在初始聚类点的选择上,借鉴了一个叫 K-means||的类 K-means++ 实现。K-means++ 算法在初始点选择上遵循一个基本原则: 初始聚类中心点相互之间的距离应该尽可能的远。基本步骤如下:

    • 第一步,从数据集 X 中随机选择一个点作为第一个初始点。
    • 第二步,计算数据集中所有点与最新选择的中心点的距离 D(x)。
    • 第三步,选择下一个中心点,使得最大。
    • 第四部,重复 (二),(三) 步过程,直到 K 个初始点选择完成。

    K-means 实现

    Spark MLlib 中 K-means 算法的实现类 (KMeans.scala) 具有以下参数,具体如下:
    class KMeans private (
        private var k: Int,
        private var maxIterations: Int,
        private var runs: Int,
        private var initializationMode: String,
        private var initializationSteps: Int,
        private var epsilon: Double,
        private var seed: Long) extends Serializable with Logging

    • k 表示期望的聚类的个数。
    • maxInterations 表示方法单次运行最大的迭代次数。
    • runs 表示算法被运行的次数。K-means 算法不保证能返回全局最优的聚类结果,所以在目标数据集上多次跑 K-means 算法,有助于返回最佳聚类结果。
    • initializationMode 表示初始聚类中心点的选择方式, 目前支持随机选择或者 K-means||方式。默认是 K-means||。
    • initializationSteps表示 K-means||方法中的部数。
    • epsilon 表示 K-means 算法迭代收敛的阀值。
    • seed 表示集群初始化时的随机种子。

    运行示例

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.rdd.RDD
    
    object KmeansSpark {
    
      def main(args: Array[String]): Unit = {
    
        //在本地启动Spark
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KmeansSpark")
        val sc = new SparkContext(sparkConf)
    
        //加载本地文件数据形成RDD
        val data = sc.textFile("file:///root/test.txt")
        val parsedData: RDD[linalg.Vector] = data.map(s=>{
          val values: Array[Double] = s.split(" ").map(x => x.toDouble)
          Vectors.dense(values)
        })
    
        //聚类中心个数
        val numClusters = 8
        //算法迭代次数
        val numIterations = 20
        //算法运行次数
        val runs = 10
        //KMeans训练
        val kmeansModel = KMeans.train(parsedData, numClusters, numIterations, runs)
    
        //打印聚类中心ID
        kmeansModel.clusterCenters.foreach(x=>{
         println(x)
        })
        //打印数据归属哪个聚类中心ID
        parsedData.map(v => v.toString + " belong to cluster: " +kmeansModel.predict(v))
        ss.foreach(x=>
          println(x)
        )
        sc.stop()
      
      }
    }

    K值的选择

    前面提到 K 的选择是 K-means 算法的关键,Spark MLlib 在 KMeansModel 类里提供了 computeCost 方法,该方法通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果。一般来说,同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好。但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使 computeCost 结果值最小的那个 K。

    val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)
    ks.foreach(cluster => {
     val model:KMeansModel = KMeans.train(parsedData, cluster,30,1)
     val ssd = model.computeCost(parsedData)
     println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
    })

     

     

    展开全文
  • Spark-Kmeans实战

    2016-03-22 00:44:24
    Kmeans实战算法总结 K-means均值聚类算法: 算法核心思想: 1. 选择K个类中心;(类中心范围为数据min,max之间) 2. 计算各样本到类中心的距离,把样本添加到离他最近的那个类中心的dataset中。ps:常用...
  • Spark实现K-Means的简单示例 注: 不是调用ML包,而是直接实现一个简易版的K-Means 不难,直接看代码吧(●’◡’●) //所有代码直接在main中执行即可 import scala.math.pow // 计算两个的点距离的平方 def ...
  • SparkMLlib实现K-means

    2018-11-15 22:29:15
    之前写过一篇关于kmeans的博客,里面详细的介绍了关于***K-means***的的详细描述,用python是实现的,并且在最后附带数据,了解更改关于K-means的内容详看https://blog.csdn.net/jklcl/article/details/76153430 ...
  • 实验 Spark ML Bisecting k-means聚类算法使用,实验文档
  • Kmeans原理介绍 聚类介绍 聚类kmeans 算法是一个无监督学习过程。一般是用来对数据对象按照其特征属性进行分组。经常被应用在客户分群、欺诈检测、图像分析领域。K-means是最有名并且最经常使用的聚类算法 算法介绍...
  • 不到一百行的代码教你在spark平台中使用scala实现kmeans算法。简单易懂,大量注释。适合初学者参考理解。本程序在intelliJ IDEA2016.1.1 中编程,运行在spark1.6.1 scala2.10.4本地模式下运行成功。 数据集:(其实...
  • #设置应用名称,显示在spark监控页面上 sc = SparkContext(appName="MySparkApplication")#读取数据,data文件夹下有6个数据文件,这样写能全部读取,需要注意的是,在其他worker的相同路径下也需要有这些文件 lines ...
  • KMeans on Spark

    2014-02-27 11:33:56
    思路: 1.随机生成数据 2.随机生成K个聚类中心 3.计算每个点所属的类别 4.计算新的聚类中心 5.比较聚类中心的变化情况,大于阈值...import org.apache.spark.SparkContext import SparkContext._ import org.ap
  • spark 实现K-means算法

    2019-05-20 14:35:20
    spark 实现K-means算法 package kmeans; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java....
  • 1 什么是KMeans算法 K-Means算法是一种cluster analysis的算法,其主要是来计算数据聚集的算法,主要通过不断地取离种子点最近均值的算法。 具体来说,通过输入聚类个数k,以及包含 n个数据对象的数据库,输出满足...
  • 面向函数式编程,用spark实现Kmeans
  • https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala// scalastyle:off println package org.apache.spark.examplesimport breeze.linalg.{Vector, ...
  • 引言 提起机器学习 (Machine Learning),相信很多计算机从业者都会对这个技术方向感到兴奋。然而学习并使用机器学习算法来处理数据却是一项复杂的工作,需要充足的知识储备,如概率论,数理统计,数值逼近,最优化...
  • Spark实现 – Kmeans聚类算法 Kmeans简介 Kmeans是最常用的聚类算法,也是十大经典的数据挖掘算法之一。聚类的思想用一句话概括就是“物以类聚,人以群分”。kmeans算法作为最基础的算法之一,基本上每本数据挖掘的...
  • Spark ml pipline交叉验证之Kmeans聚类 1.1 模型训练 1.1.1 输入参数 { "modelName ": "KMeans聚类 ", "numIterations ": " ", "numClasses ": " ", "runs ": " ", "numFolds ": "5 ", "maxIters ": [ ...
  • 压缩前:981 KB 压缩后:111 KB 思路: 取得图片每一点的像素,组成向量Vector如下:(w,h,R,G,B); 设置目的K值,训练所有点,获得KMeansModel; 此遍历所有的点,利用模型预测每个点属于哪个 中心点,...
1 2 3 4 5 ... 20
收藏数 2,493
精华内容 997
关键字:

kmeans spark