利用spark做文本聚类分析_spark 文本聚类 - CSDN
  • 本文将以聚类分析这个典型的机器学习问题为基础,向读者介绍如何使用 MLlib 提供的 K-means 算法对数据做聚类分析,我们还将通过分析源码,进一步加深读者对 MLlib K-means 算法的实现原理和使用方法的理解。

    引言

    提起机器学习 (Machine Learning),相信很多计算机从业者都会对这个技术方向感到兴奋。然而学习并使用机器学习算法来处理数据却是一项复杂的工作,需要充足的知识储备,如概率论,数理统计,数值逼近,最优化理论等。机器学习旨在使计算机具有人类一样的学习能力和模仿能力,这也是实现人工智能的核心思想和方法。传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用,随着 HDFS(Hadoop Distributed File System) 等分布式文件系统出现,存储海量数据已经成为可能。然而由于 MapReduce 自身的限制,使得使用 MapReduce 来实现分布式机器学习算法非常耗时和消耗磁盘容量。因为通常情况下机器学习算法参数学习的过程都是迭代计算的,即本次计算的结果要作为下一次迭代的输入,这个过程中,如果使用 MapReduce,我们只能把中间结果存储磁盘,然后在下一次计算的时候从新读取,这对于迭代 频发的算法显然是致命的性能瓶颈。Spark 立足于内存计算,天然的适应于迭代式计算,相信对于这点,读者通过前面几篇文章已经有了较为深入的了解。然而即便这样,对于普通开发者来说,实现一个分布式机器学习算法仍然是一件极具挑战的事情。MLlib 正是为了让基于海量数据的机器学习变得更加简单,它提供了常用机器学习算法的分布式实现,开发者只需要有 Spark 基础并且了解机器学习算法的原理,以及方法相关参数的含义,就可以轻松的通过调用相应的 API 来实现基于海量数据的机器学习过程。当然,原始数据 ETL,特征指标提取,调节参数并优化学习过程,这依然需要有足够的行业知识和数据敏感度,这往往也是经验的体现。本文的重点在于向读者介绍如何使用 MLlib 机器学习库提供的 K-means 算法做聚类分析,这是一个有意义的过程,相信会对读者特别是初学者有启发意义。

    Spark 机器学习库简介

    Spark 机器学习库提供了常用机器学习算法的实现,包括聚类,分类,回归,协同过滤,维度缩减等。使用 Spark 机器学习库来做机器学习工作,可以说是非常的简单,通常只需要在对原始数据进行处理后,然后直接调用相应的 API 就可以实现。但是要想选择合适的算法,高效准确地对数据进行分析,您可能还需要深入了解下算法原理,以及相应 Spark MLlib API 实现的参数的意义。

    需要提及的是,Spark 机器学习库从 1.2 版本以后被分为两个包,分别是:

    • spark.mllib

    Spark MLlib 历史比较长了,1.0 以前的版本中已经包含了,提供的算法实现都是基于原始的 RDD,从学习角度上来讲,其实比较容易上手。如果您已经有机器学习方面的经验,那么您只需要熟悉下 MLlib 的 API 就可以开始数据分析工作了。想要基于这个包提供的工具构建完整并且复杂的机器学习流水线是比较困难的。

    • spark.ml

    Spark ML Pipeline 从 Spark1.2 版本开始,目前已经从 Alpha 阶段毕业,成为可用并且较为稳定的新的机器学习库。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件,使用 ML Pipeline API,我们可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。显然,这种新的方式给我们提供了更灵活的方法,而且这也更符合机器学习过程的特点。

    从官方文档来看,Spark ML Pipeline 虽然是被推荐的机器学习方式,但是并不会在短期内替代原始的 MLlib 库,因为 MLlib 已经包含了丰富稳定的算法实现,并且部分 ML Pipeline 实现基于 MLlib。而且就笔者看来,并不是所有的机器学习过程都需要被构建成一个流水线,有时候原始数据格式整齐且完整,而且使用单一的算法就能实现目标,我们就没有必要把事情复杂化,采用最简单且容易理解的方式才是正确的选择。

    本文基于 Spark 1.5,向读者展示使用 MLlib API 进行聚类分析的过程。读者将会发现,使用 MLlib API 开发机器学习应用方式是比较简单的,相信本文可以使读者建立起信心并掌握基本方法,以便在后续的学习和工作中事半功倍。

    K-means 聚类算法原理

    聚类分析是一个无监督学习 (Unsupervised Learning) 过程, 一般是用来对数据对象按照其特征属性进行分组,经常被应用在客户分群,欺诈检测,图像分析等领域。K-means 应该是最有名并且最经常使用的聚类算法了,其原理比较容易理解,并且聚类效果良好,有着广泛的使用。

    和诸多机器学习算法一样,K-means 算法也是一个迭代式的算法,其主要步骤如下:

    • 第一步,选择 K 个点作为初始聚类中心。
    • 第二步,计算其余所有点到聚类中心的距离,并把每个点划分到离它最近的聚类中心所在的聚类中去。在这里,衡量距离一般有多个函数可以选择,最常用的是欧几里得距离 (Euclidean Distance), 也叫欧式距离。公式如下:
    Figure xxx. Requires a heading

    其中 C 代表中心点,X 代表任意一个非中心点。

    • 第三步,重新计算每个聚类中所有点的平均值,并将其作为新的聚类中心点。
    • 最后,重复 (二),(三) 步的过程,直至聚类中心不再发生改变,或者算法达到预定的迭代次数,又或聚类中心的改变小于预先设定的阀值。

    在实际应用中,K-means 算法有两个不得不面对并且克服的问题。

    1. 聚类个数 K 的选择。K 的选择是一个比较有学问和讲究的步骤,我们会在后文专门描述如何使用 Spark 提供的工具选择 K。
    2. 初始聚类中心点的选择。选择不同的聚类中心可能导致聚类结果的差异。

    Spark MLlib K-means 算法的实现在初始聚类点的选择上,借鉴了一个叫 K-means||的类 K-means++ 实现。K-means++ 算法在初始点选择上遵循一个基本原则: 初始聚类中心点相互之间的距离应该尽可能的远。基本步骤如下:

    • 第一步,从数据集 X 中随机选择一个点作为第一个初始点。
    • 第二步,计算数据集中所有点与最新选择的中心点的距离 D(x)。
    • 第三步,选择下一个中心点,使得最大。
    • 第四部,重复 (二),(三) 步过程,直到 K 个初始点选择完成。

    MLlib 的 K-means 实现

    Spark MLlib 中 K-means 算法的实现类 (KMeans.scala) 具有以下参数,具体如下。

    图 1. MLlib K-means 算法实现类预览
    图 1. MLlib K-means 算法实现类预览

    通过下面默认构造函数,我们可以看到这些可调参数具有以下初始值。

    图 2. MLlib K-means 算法参数初始值
    图 2. MLlib K-means 算法参数初始值

    参数的含义解释如下:

    • k 表示期望的聚类的个数。
    • maxInterations 表示方法单次运行最大的迭代次数。
    • runs 表示算法被运行的次数。K-means 算法不保证能返回全局最优的聚类结果,所以在目标数据集上多次跑 K-means 算法,有助于返回最佳聚类结果。
    • initializationMode 表示初始聚类中心点的选择方式, 目前支持随机选择或者 K-means||方式。默认是 K-means||。
    • initializationSteps表示 K-means||方法中的部数。
    • epsilon 表示 K-means 算法迭代收敛的阀值。
    • seed 表示集群初始化时的随机种子。

    通常应用时,我们都会先调用 KMeans.train 方法对数据集进行聚类训练,这个方法会返回 KMeansModel 类实例,然后我们也可以使用 KMeansModel.predict 方法对新的数据点进行所属聚类的预测,这是非常实用的功能。

    KMeans.train 方法有很多重载方法,这里我们选择参数最全的一个展示。

    图 3. KMeans.train 方法预览
    图 3. KMeans.train 方法预览

    KMeansModel.predict 方法接受不同的参数,可以是向量,或者 RDD,返回是入参所属的聚类的索引号。

    图 4. KMeansModel.predict 方法预览
    图 4. KMeansModel.predict 方法预览

    聚类测试数据集简介

    在本文中,我们所用到目标数据集是来自 UCI Machine Learning Repository 的 Wholesale customer Data Set。UCI 是一个关于机器学习测试数据的下载中心站点,里面包含了适用于做聚类,分群,回归等各种机器学习问题的数据集。

    Wholesale customer Data Set 是引用某批发经销商的客户在各种类别产品上的年消费数。为了方便处理,本文把原始的 CSV 格式转化成了两个文本文件,分别是训练用数据和测试用数据。

    图 5. 客户消费数据格式预览
    图 5. 客户消费数据格式预览

    读者可以从标题清楚的看到每一列代表的含义,当然读者也可以到 UCI 网站上去找到关于该数据集的更多信息。虽然 UCI 的数据可以自由获取并使用,但是我们还是在此声明,该数据集的版权属 UCI 以及其原始提供组织或公司所有。

    案例分析和编码实现

    本例中,我们将根据目标客户的消费数据,将每一列视为一个特征指标,对数据集进行聚类分析。代码实现步骤如下

    清单 1. 聚类分析实现类源码
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
    import org.apache.spark.mllib.linalg.Vectors
    object KMeansClustering {
     def main (args: Array[String]) {
     if (args.length < 5) {
    
        println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters
        numIterations runTimes")
     sys.exit(1)
     }
    
     val conf = new
        SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering")
     val sc = new SparkContext(conf)
    
       /**
     *Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen
     * 2 3
         12669 9656 7561 214 2674 1338
     * 2 3 7057 9810 9568 1762 3293 1776
     * 2 3 6353 8808
         7684 2405 3516 7844
     */
    
        val rawTrainingData = sc.textFile(args(0))
     val parsedTrainingData =
        rawTrainingData.filter(!isColumnNameLine(_)).map(line => {
    
        Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
     }).cache()
    
        // Cluster the data into two classes using KMeans
    
        val numClusters = args(2).toInt
     val numIterations = args(3).toInt
     val runTimes =
        args(4).toInt
     var clusterIndex:Int = 0
     val clusters:KMeansModel =
        KMeans.train(parsedTrainingData, numClusters, numIterations,runTimes)
    
        println("Cluster Number:" + clusters.clusterCenters.length)
    
        println("Cluster Centers Information Overview:")
     clusters.clusterCenters.foreach(
        x => {
    
        println("Center Point of Cluster " + clusterIndex + ":")
    
        println(x)
     clusterIndex += 1
     })
    
        //begin to check which cluster each test data belongs to based on the clustering result
    
        val rawTestData = sc.textFile(args(1))
     val parsedTestData = rawTestData.map(line =>
        {
    
        Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))
    
        })
     parsedTestData.collect().foreach(testDataLine => {
     val predictedClusterIndex:
        Int = clusters.predict(testDataLine)
    
        println("The data " + testDataLine.toString + " belongs to cluster " +
        predictedClusterIndex)
     })
    
        println("Spark MLlib K-means clustering test finished.")
     }
    
     private def
        isColumnNameLine(line:String):Boolean = {
     if (line != null &&
        line.contains("Channel")) true
     else false
     }

    该示例程序接受五个入参,分别是

    • 训练数据集文件路径
    • 测试数据集文件路径
    • 聚类的个数
    • K-means 算法的迭代次数
    • K-means 算法 run 的次数

    运行示例程序

    和本系列其他文章一样,我们依然选择使用 HDFS 存储数据文件。运行程序之前,我们需要将前文提到的训练和测试数据集上传到 HDFS。

    图 6. 测试数据的 HDFS 目录
    图 6. 测试数据的 HDFS 目录
    清单 2. 示例程序运行命令
    ./spark-submit --class com.ibm.spark.exercise.mllib.KMeansClustering \
     --master spark://<spark_master_node_ip>:7077 \
     --num-executors 6 \
    --driver-memory 3g \
    --executor-memory 512m \
    --total-executor-cores 6 \
     /home/fams/spark_exercise-1.0.jar \
     hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_training.txt \
     hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_test.txt \
     8 30 3
    图 7. K-means 聚类示例程序运行结果
    图 7. K-means 聚类示例程序运行结果

    如何选择 K

    前面提到 K 的选择是 K-means 算法的关键,Spark MLlib 在 KMeansModel 类里提供了 computeCost 方法,该方法通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果。一般来说,同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好。但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使 computeCost 结果值最小的那个 K。

    清单 3. K 选择示例代码片段
    val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)
    ks.foreach(cluster => {
     val model:KMeansModel = KMeans.train(parsedTrainingData, cluster,30,1)
     val ssd = model.computeCost(parsedTrainingData)
     println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
    })
    图 8. K 选择示例程序运行结果
    图 8. K 选择示例程序运行结果

    从上图的运行结果可以看到,当 K=9 时,cost 值有波动,但是后面又逐渐减小了,所以我们选择 8 这个临界点作为 K 的个数。当然可以多跑几次,找一个稳定的 K 值。理论上 K 的值越大,聚类的 cost 越小,极限情况下,每个点都是一个聚类,这时候 cost 是 0,但是显然这不是一个具有实际意义的聚类结果。

    结束语

    通过本文的学习,读者已经初步了解了 Spark 的机器学习库,并且掌握了 K-means 算法的基本原理,以及如何基于 Spark MLlib 构建自己的机器学习应用。机器学习应用的构建是一个复杂的过程,我们通常还需要对数据进行预处理,然后特征提取以及数据清洗等,然后才能利用算法来分析数据。Spark MLlib 区别于传统的机器学习工具,不仅是因为它提供了简单易用的 API,更重要的是 Spark 在处理大数据上的高效以及在迭代计算时的独特优势。虽然本文所采用的测试数据集很小,并不能反映大数据的应用场景,但是对于掌握基本原理已经足够,并且如果读者拥有更大的数据集就可以轻松的将本文的测试程序推广到大数据聚类的场景下,因为 Spark MLlib 的编程模型都是一致的,无非是数据读取和处理的方式略有不同。希望读者可以在本文中找到自己感兴趣的知识,相信这对读者今后深入学习是有帮助的。另外,读者在阅读本文的过程中,如果遇到问题或者发现不足之处,请不吝赐教,在文末留言,共同交流学习,谢谢。

    参考资料

    学习

    展开全文
  • 引言 提起机器学习 (Machine Learning),相信很多计算机从业者都会对这个技术方向感到兴奋。然而学习并使用机器学习算法来处理数据却是一项复杂的工作,需要充足的知识储备,如概率论,数理统计,数值逼近,最优化...

    引言

    提起机器学习 (Machine Learning),相信很多计算机从业者都会对这个技术方向感到兴奋。然而学习并使用机器学习算法来处理数据却是一项复杂的工作,需要充足的知识储备,如概率论,数理统计,数值逼近,最优化理论等。机器学习旨在使计算机具有人类一样的学习能力和模仿能力,这也是实现人工智能的核心思想和方法。传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用,随着 HDFS(Hadoop Distributed File System) 等分布式文件系统出现,存储海量数据已经成为可能。然而由于 MapReduce 自身的限制,使得使用 MapReduce 来实现分布式机器学习算法非常耗时和消耗磁盘容量。因为通常情况下机器学习算法参数学习的过程都是迭代计算的,即本次计算的结果要作为下一次迭代的输入,这个过程中,如果使用 MapReduce,我们只能把中间结果存储磁盘,然后在下一次计算的时候从新读取,这对于迭代 频发的算法显然是致命的性能瓶颈。Spark 立足于内存计算,天然的适应于迭代式计算,相信对于这点,读者通过前面几篇文章已经有了较为深入的了解。然而即便这样,对于普通开发者来说,实现一个分布式机器学习算法仍然是一件极具挑战的事情。MLlib 正是为了让基于海量数据的机器学习变得更加简单,它提供了常用机器学习算法的分布式实现,开发者只需要有 Spark 基础并且了解机器学习算法的原理,以及方法相关参数的含义,就可以轻松的通过调用相应的 API 来实现基于海量数据的机器学习过程。当然,原始数据 ETL,特征指标提取,调节参数并优化学习过程,这依然需要有足够的行业知识和数据敏感度,这往往也是经验的体现。本文的重点在于向读者介绍如何使用 MLlib 机器学习库提供的 K-means 算法做聚类分析,这是一个有意义的过程,相信会对读者特别是初学者有启发意义。

    Spark 机器学习库简介

    Spark 机器学习库提供了常用机器学习算法的实现,包括聚类,分类,回归,协同过滤,维度缩减等。使用 Spark 机器学习库来做机器学习工作,可以说是非常的简单,通常只需要在对原始数据进行处理后,然后直接调用相应的 API 就可以实现。但是要想选择合适的算法,高效准确地对数据进行分析,您可能还需要深入了解下算法原理,以及相应 Spark MLlib API 实现的参数的意义。

    需要提及的是,Spark 机器学习库从 1.2 版本以后被分为两个包,分别是:

    • spark.mllib

    Spark MLlib 历史比较长了,1.0 以前的版本中已经包含了,提供的算法实现都是基于原始的 RDD,从学习角度上来讲,其实比较容易上手。如果您已经有机器学习方面的经验,那么您只需要熟悉下 MLlib 的 API 就可以开始数据分析工作了。想要基于这个包提供的工具构建完整并且复杂的机器学习流水线是比较困难的。

    • spark.ml

    Spark ML Pipeline 从 Spark1.2 版本开始,目前已经从 Alpha 阶段毕业,成为可用并且较为稳定的新的机器学习库。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件,使用 ML Pipeline API,我们可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。显然,这种新的方式给我们提供了更灵活的方法,而且这也更符合机器学习过程的特点。

    从官方文档来看,Spark ML Pipeline 虽然是被推荐的机器学习方式,但是并不会在短期内替代原始的 MLlib 库,因为 MLlib 已经包含了丰富稳定的算法实现,并且部分 ML Pipeline 实现基于 MLlib。而且就笔者看来,并不是所有的机器学习过程都需要被构建成一个流水线,有时候原始数据格式整齐且完整,而且使用单一的算法就能实现目标,我们就没有必要把事情复杂化,采用最简单且容易理解的方式才是正确的选择。

    本文基于 Spark 1.5,向读者展示使用 MLlib API 进行聚类分析的过程。读者将会发现,使用 MLlib API 开发机器学习应用方式是比较简单的,相信本文可以使读者建立起信心并掌握基本方法,以便在后续的学习和工作中事半功倍。

    K-means 聚类算法原理

    聚类分析是一个无监督学习 (Unsupervised Learning) 过程, 一般是用来对数据对象按照其特征属性进行分组,经常被应用在客户分群,欺诈检测,图像分析等领域。K-means 应该是最有名并且最经常使用的聚类算法了,其原理比较容易理解,并且聚类效果良好,有着广泛的使用。

    和诸多机器学习算法一样,K-means 算法也是一个迭代式的算法,其主要步骤如下:

    • 第一步,选择 K 个点作为初始聚类中心。
    • 第二步,计算其余所有点到聚类中心的距离,并把每个点划分到离它最近的聚类中心所在的聚类中去。在这里,衡量距离一般有多个函数可以选择,最常用的是欧几里得距离 (Euclidean Distance), 也叫欧式距离。公式如下:

    Figure xxx. Requires a heading

    其中 C 代表中心点,X 代表任意一个非中心点。

    • 第三步,重新计算每个聚类中所有点的平均值,并将其作为新的聚类中心点。
    • 最后,重复 (二),(三) 步的过程,直至聚类中心不再发生改变,或者算法达到预定的迭代次数,又或聚类中心的改变小于预先设定的阀值。

    在实际应用中,K-means 算法有两个不得不面对并且克服的问题。

    1. 聚类个数 K 的选择。K 的选择是一个比较有学问和讲究的步骤,我们会在后文专门描述如何使用 Spark 提供的工具选择 K。
    2. 初始聚类中心点的选择。选择不同的聚类中心可能导致聚类结果的差异。

    Spark MLlib K-means 算法的实现在初始聚类点的选择上,借鉴了一个叫 K-means||的类 K-means++ 实现。K-means++ 算法在初始点选择上遵循一个基本原则: 初始聚类中心点相互之间的距离应该尽可能的远。基本步骤如下:

    • 第一步,从数据集 X 中随机选择一个点作为第一个初始点。
    • 第二步,计算数据集中所有点与最新选择的中心点的距离 D(x)。
    • 第三步,选择下一个中心点,使得最大。
    • 第四部,重复 (二),(三) 步过程,直到 K 个初始点选择完成。

    MLlib 的 K-means 实现

    Spark MLlib 中 K-means 算法的实现类 (KMeans.scala) 具有以下参数,具体如下。

    图 1. MLlib K-means 算法实现类预览

    图 1. MLlib K-means 算法实现类预览

    通过下面默认构造函数,我们可以看到这些可调参数具有以下初始值。

    图 2. MLlib K-means 算法参数初始值

    图 2. MLlib K-means 算法参数初始值

    参数的含义解释如下:

    • k 表示期望的聚类的个数。
    • maxInterations 表示方法单次运行最大的迭代次数。
    • runs 表示算法被运行的次数。K-means 算法不保证能返回全局最优的聚类结果,所以在目标数据集上多次跑 K-means 算法,有助于返回最佳聚类结果。
    • initializationMode 表示初始聚类中心点的选择方式, 目前支持随机选择或者 K-means||方式。默认是 K-means||。
    • initializationSteps表示 K-means||方法中的部数。
    • epsilon 表示 K-means 算法迭代收敛的阀值。
    • seed 表示集群初始化时的随机种子。

    通常应用时,我们都会先调用 KMeans.train 方法对数据集进行聚类训练,这个方法会返回 KMeansModel 类实例,然后我们也可以使用 KMeansModel.predict 方法对新的数据点进行所属聚类的预测,这是非常实用的功能。

    KMeans.train 方法有很多重载方法,这里我们选择参数最全的一个展示。

    图 3. KMeans.train 方法预览

    图 3. KMeans.train 方法预览

    KMeansModel.predict 方法接受不同的参数,可以是向量,或者 RDD,返回是入参所属的聚类的索引号。

    图 4. KMeansModel.predict 方法预览

    图 4. KMeansModel.predict 方法预览

    聚类测试数据集简介

    在本文中,我们所用到目标数据集是来自 UCI Machine Learning Repository 的 Wholesale customer Data Set。UCI 是一个关于机器学习测试数据的下载中心站点,里面包含了适用于做聚类,分群,回归等各种机器学习问题的数据集。

    Wholesale customer Data Set 是引用某批发经销商的客户在各种类别产品上的年消费数。为了方便处理,本文把原始的 CSV 格式转化成了两个文本文件,分别是训练用数据和测试用数据。

    图 5. 客户消费数据格式预览

    图 5. 客户消费数据格式预览

    读者可以从标题清楚的看到每一列代表的含义,当然读者也可以到 UCI 网站上去找到关于该数据集的更多信息。虽然 UCI 的数据可以自由获取并使用,但是我们还是在此声明,该数据集的版权属 UCI 以及其原始提供组织或公司所有。

    案例分析和编码实现

    本例中,我们将根据目标客户的消费数据,将每一列视为一个特征指标,对数据集进行聚类分析。代码实现步骤如下

    清单 1. 聚类分析实现类源码

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

    import org.apache.spark.mllib.linalg.Vectors

    object KMeansClustering {<br> def main (args: Array[String]) {<br> if (args.length < 5) {<br>

        println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters

        numIterations runTimes")<br> sys.exit(1)<br> }<br><br> val conf = new

        SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering")<br> val sc = new SparkContext(conf)<br>

       /**<br> *Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen<br> * 2 3

         12669 9656 7561 214 2674 1338<br> * 2 3 7057 9810 9568 1762 3293 1776<br> * 2 3 6353 8808

         7684 2405 3516 7844<br> */<br>

        val rawTrainingData = sc.textFile(args(0))<br> val parsedTrainingData =

        rawTrainingData.filter(!isColumnNameLine(_)).map(line => {<br>

        Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))<br> }).cache()<br>

        // Cluster the data into two classes using KMeans<br>

        val numClusters = args(2).toInt<br> val numIterations = args(3).toInt<br> val runTimes =

        args(4).toInt<br> var clusterIndex:Int = 0<br> val clusters:KMeansModel =

        KMeans.train(parsedTrainingData, numClusters, numIterations,runTimes)<br>

        println("Cluster Number:" + clusters.clusterCenters.length)<br>

        println("Cluster Centers Information Overview:")<br> clusters.clusterCenters.foreach(

        x => {<br>

        println("Center Point of Cluster " + clusterIndex + ":")<br>

        println(x)<br> clusterIndex += 1<br> })<br>

        //begin to check which cluster each test data belongs to based on the clustering result<br>

        val rawTestData = sc.textFile(args(1))<br> val parsedTestData = rawTestData.map(line =>

        {<br>

        Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble))<br>

        })<br> parsedTestData.collect().foreach(testDataLine => {<br> val predictedClusterIndex:

        Int = clusters.predict(testDataLine)<br>

        println("The data " + testDataLine.toString + " belongs to cluster " +

        predictedClusterIndex)<br> })<br>

        println("Spark MLlib K-means clustering test finished.")<br> }<br><br> private def

        isColumnNameLine(line:String):Boolean = {<br> if (line != null &&

        line.contains("Channel")) true<br> else false<br> }

    该示例程序接受五个入参,分别是

    • 训练数据集文件路径
    • 测试数据集文件路径
    • 聚类的个数
    • K-means 算法的迭代次数
    • K-means 算法 run 的次数

    运行示例程序

    和本系列其他文章一样,我们依然选择使用 HDFS 存储数据文件。运行程序之前,我们需要将前文提到的训练和测试数据集上传到 HDFS。

    图 6. 测试数据的 HDFS 目录

    图 6. 测试数据的 HDFS 目录

    清单 2. 示例程序运行命令

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    ./spark-submit --class com.ibm.spark.exercise.mllib.KMeansClustering \

     --master spark://<spark_master_node_ip>:7077 \

     --num-executors 6 \

    --driver-memory 3g \

    --executor-memory 512m \

    --total-executor-cores 6 \

     /home/fams/spark_exercise-1.0.jar \

     hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_training.txt \

     hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_test.txt \

     8 30 3

    图 7. K-means 聚类示例程序运行结果

    图 7. K-means 聚类示例程序运行结果

    如何选择 K

    前面提到 K 的选择是 K-means 算法的关键,Spark MLlib 在 KMeansModel 类里提供了 computeCost 方法,该方法通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果。一般来说,同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好。但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使 computeCost 结果值最小的那个 K。

    清单 3. K 选择示例代码片段

    1

    2

    3

    4

    5

    6

    val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)

    ks.foreach(cluster => {

     val model:KMeansModel = KMeans.train(parsedTrainingData, cluster,30,1)

     val ssd = model.computeCost(parsedTrainingData)

     println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)

    })

    图 8. K 选择示例程序运行结果

    图 8. K 选择示例程序运行结果

    从上图的运行结果可以看到,当 K=9 时,cost 值有波动,但是后面又逐渐减小了,所以我们选择 8 这个临界点作为 K 的个数。当然可以多跑几次,找一个稳定的 K 值。理论上 K 的值越大,聚类的 cost 越小,极限情况下,每个点都是一个聚类,这时候 cost 是 0,但是显然这不是一个具有实际意义的聚类结果。

    结束语

    通过本文的学习,读者已经初步了解了 Spark 的机器学习库,并且掌握了 K-means 算法的基本原理,以及如何基于 Spark MLlib 构建自己的机器学习应用。机器学习应用的构建是一个复杂的过程,我们通常还需要对数据进行预处理,然后特征提取以及数据清洗等,然后才能利用算法来分析数据。Spark MLlib 区别于传统的机器学习工具,不仅是因为它提供了简单易用的 API,更重要的是 Spark 在处理大数据上的高效以及在迭代计算时的独特优势。虽然本文所采用的测试数据集很小,并不能反映大数据的应用场景,但是对于掌握基本原理已经足够,并且如果读者拥有更大的数据集就可以轻松的将本文的测试程序推广到大数据聚类的场景下,因为 Spark MLlib 的编程模型都是一致的,无非是数据读取和处理的方式略有不同。希望读者可以在本文中找到自己感兴趣的知识,相信这对读者今后深入学习是有帮助的。另外,读者在阅读本文的过程中,如果遇到问题或者发现不足之处,请不吝赐教,在文末留言,共同交流学习,谢谢。

    转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice4/index.html

    展开全文
  • 利用spark做文本聚类分析

    千次阅读 2017-02-08 23:03:51
    import java.util.Arrays; import java.util.List...import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.ap

    聚类分析

    什么是聚类分析?《数据挖掘导论》是给出了这样的定义:聚类分析仅根据在数据中发现的描述对象及其关系的信息,将数据对象分组。其目标是,组内的对象相互之间是相似的(相关的),而不同组中的对象是不同(不相关的)。组内的相似性(同质性)越大,组间差别越大,聚类就越好。

    想像有这样的一个情景:用户每天都会通过搜索引擎去查询他/她所感兴趣的信息,而我们希望能够根据用户的搜索词去细分目标用户群体,从而分析不同用户群体对哪些信息比较感兴趣。这时,聚类分析就是我们常常采用的手段。

    高斯混合分布聚类模型

    除了常见的基于距离的聚类模型,如k-means聚类,聚类中也有基于概率模型,例如高斯混合分布聚类模型(GMM)。基于概率模型的好处在于,它并没有像k-means那样让每一个数据点只能归属于一个簇当中,而是通过概率来反映每个数据点可能分布到每一个簇的概率值,即属于软聚类。在某些场景中,软聚类能够解释数据点的多元性,好比如人的兴趣点不唯一,用户行为的多样性等等。
    高斯混合分布模型主要是利用EM算法做参数估计,关于高斯混合分布聚类模型的详细讲述,我将其放到另一份博客当中:
    http://blog.csdn.net/qq_30843221/article/details/54894640

    聚类模型的详细过程

    1.样本数据
    我们模拟用户搜索行为,一组是搜索关于电影内容,而另一组是关于机器学习,具体数据如下:

    好看 电影 惊悚 悬疑 不错 推荐
    机器学习 自然语言处理 信息 检索
    机器学习 数据挖掘 人工智能 检索
    电影 动画 精彩 好看 不错 加油 推荐

    2.数据的加载,基本的分词(我使用的java版的spark,分词工具为hanlp)

    //加载数据
    String filename = "/home/quincy1994/test.txt";
    JavaRDD<String> sentences = sc.textFile(filename);
    JavaRDD<String> segRDD = sentences.map(new Seg());
    JavaRDD<Row> jrdd = segRDD.map(new StringtoRow());
    segRDD.cache();
    
    //数据转换为矩阵
    StructType schema = new StructType(new StructField[]{
        new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
    });
    DataFrame sentenceData = sqlContext.createDataFrame(jrdd, schema);
    Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");  //tokenizer以简单的空白分割词语
    DataFrame wordsData = tokenizer.transform(sentenceData); // 将句子分割词语
    
    //分词类
        static class Seg implements Function<String, String>{
    
            public String call(String sentence) throws Exception{
                String segStr = "";
                List<Term> termList = segment.seg(sentence); //分词
                StringBuilder sb = new StringBuilder();
                for(Term term: termList){
                    String word = term.word;
                    sb.append(word+ " ");
                }
                segStr = sb.toString().trim();
                return segStr;
            }
        }
    
        //将String的sentence转变为mllib中row数据类型
        static class StringtoRow implements Function<String, Row>{
    
            public Row call(String sentence) throws Exception {
                return RowFactory.create(sentence);
            }
        }

    3.特征选择(我主要采用的tfidf模型,刚开始使用word2vec不太理想,可能数据太稀疏)

    //tfidf模型
    int numFeatures = 20;  //选定抽取前k个特征
    HashingTF hashingTF  = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numFeatures);
    DataFrame featurizedData = hashingTF.transform(wordsData);
    IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
    IDFModel idfModel = idf.fit(featurizedData);
    DataFrame result = idfModel.transform(featurizedData);

    4.数据的归一化处理(之前忘了做这步,也导致数据聚类效果不理想)

    //归一化处理
    Normalizer normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0);
    DataFrame l1NormData = normalizer.transform(result.select("features"));
    JavaRDD<Vector> normRDD = l1NormData.rdd().toJavaRDD().map(new RowToVector()); //将row转变成为vector
    normRDD.cache();
    
    //将row转变为Vector,机器学习模型基本采用vector类型
        static class RowToVector implements Function<Row, Vector>{
    
            public Vector call(Row r) throws Exception {
                // TODO Auto-generated method stub
                Vector features = r.getAs(0);    //将row转变成为vector 
                return features;
            }
        }

    5.使用高斯混合模型聚类(代码超简单)

    static int k = 2; //设定有多少个高斯混合模型
    GaussianMixtureModel gmm = new GaussianMixture().setK(k).run(normRDD.rdd());
    normRDD.cache();

    6.为每个节点标记它归属的簇

    //为每个节点标记归属的簇
            RDD<Vector> points = normRDD.rdd();
            JavaRDD<double[]> predictRDD = new JavaRDD(gmm.predictSoft(points), null);
            JavaRDD<Integer> resultRDD = predictRDD.map(new Group());
            resultRDD.cache();
    static class Group implements Function<double[], Integer>{
    
            //我设定归属概率大于0.5的簇,否则当其为噪声
            public Integer call(double[] probabilities) throws Exception {
                double max = 0.5;
                int index = -1;
                for(int i = 0; i < probabilities.length; i++){
                    if(max <= probabilities[i]){
                        index = i;
                        break;
                    }
                }
                return index;
            }
        }

    7.从每个簇中提取主要标签词

    //在每个簇中提取主标签
            Object[] output= resultRDD.collect().toArray();  //得到每个数据点属于的簇
            Object[]  seg = segRDD.collect().toArray();    //得到每个数据点原来的标签词
            //集合不同簇各自的标签词
            List<Tuple2<Integer, String>> list = new ArrayList<Tuple2<Integer, String>>();
            for(int i = 0; i<output.length; i++){
                int group = (Integer) output[i];
                String tags = (String) seg[i];
                Tuple2<Integer, String> one = new Tuple2<Integer, String>(group, tags);
                list.add(one);
            }
            JavaPairRDD<Integer, String>  rddValue = sc.parallelizePairs(list);
            JavaPairRDD<Integer, Iterable<String>> groupRDD = rddValue.groupByKey();  //按簇归类
            JavaRDD<Tuple2<Integer, String>> tagsRDD = groupRDD.map(new ReduceString()); //将不同的标签混合在一块
            JavaRDD<Tuple2<Integer,String>> topKRDD = tagsRDD.map(new TopTag()); //找出前k个具有代表性的标签
    
    static class ReduceString implements Function<Tuple2<Integer, Iterable<String>>, Tuple2<Integer, String>>{
            //合并标签词
            public Tuple2<Integer, String> call(Tuple2<Integer, Iterable<String>> clusterString){
                int key = clusterString._1();
                StringBuffer sb = new StringBuffer();
                Iterable<String> iter = clusterString._2();
                for( String string: iter){
                    sb.append(string + " ");
                }
                return new Tuple2(key, sb.toString().trim());
            }
        }
    
    static class TopTag implements Function<Tuple2<Integer, String>, Tuple2<Integer, String>>{
            //将所有的标签收集,排序,找出频率最高的前k个标签词 
            int topK = 3; 
    
            public Tuple2<Integer, String> call(Tuple2<Integer, String> cluster){
                int key = cluster._1();
                String[] taglist = cluster._2().split(" ");
                Map<String, Integer> map = new HashMap<String, Integer>();
                for(String tag: taglist){
                    if(!map.containsKey(tag)){
                        map.put(tag, 1);
                    }
                    else{
                        int count = map.get(tag);
                        map.put(tag, count + 1);
                    }
                }
    
                List<Map.Entry<String, Integer>> infolds = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
                Collections.sort(infolds, new Comparator<Map.Entry<String, Integer>>(){
                    public int compare(Map.Entry<String, Integer>o1, Map.Entry<String, Integer>o2){
                        return (o2.getValue() - o1.getValue());
                    }
                });
                String str = "";
                int num = 0;
                for(Map.Entry<String, Integer> one: infolds){
                    str += one.getKey() + " ";
                    if(num == topK){
                        break;
                    }
                    num += 1;
                }
                return new Tuple2<Integer, String>(key, str.trim());
            }
        }

    8.输出结果

    //输出结果
            List<Tuple2<Integer, String>> reducelist = topKRDD.collect();
            for(Tuple2<Integer, String> tags: reducelist){
                System.out.println(tags._1() + ":" + tags._2());
            }

    结果如下:

    0:机器学习 检索 信息
    1:电影 推荐 好看

    我将具体的代码放置我的github中:
    https://www.github.com/Quincy1994/SparkStudy/tree/master/cluster

    展开全文
  • Spark-KMeans文本聚类

    千次阅读 2016-01-07 11:37:05
    1 实验环境部署 1.1 主机环境  处理器 Intel(R) Core(TM)2 Duo CPU 2.80GHz 内存 8.00GB 操作系统 WIN7SP1 64bit ...VMware® Workstation 10.0.2 build-1744117 ...操作系统 Ubuntu12.04 LTS Desktop...

    实验环境部署

    1.1 主机环境 

    处理器 Intel(R) Core(TM)2 Duo CPU  2.80GHz

    内存 8.00GB

    操作系统 WIN7SP1 64bit

    1.2虚拟机环境

    VMware® Workstation  10.0.2 build-1744117

    处理器 2Core

    内存 4GB

    操作系统 Ubuntu12.04 LTS Desktop 32bit

     

    HadoopSpark环境在之前的练习中已经搭好。

    2 方法介绍

    2.1 文本聚类

    文本聚类(Text clustering)主要是依据著名的聚类假设:同类的文档相似度较大,而不同类的文档相似度较小。作为一种无监督的机器学习方法,聚类由于不需要训练过程,以及不需要预先对文档手工标注类别,因此具有一定的灵活性和较高的自动化处理能力,已经成为对文本信息进行有效地组织、摘要和导航的重要手段

    文本聚类可以用于生成一篇简明扼要的摘要文档;对搜索引擎返回的结果进行聚类,使用户迅速定位到所需要的信息;对用户感兴趣的文档(如用户浏览器cache中的网页)聚类,从而发现用户的兴趣模式并用于信息过滤和信息主动推荐等服务;数字图书馆服务;文档集合的自动整理等等。

    2.2 主要的聚类方法

    (1基于划分的方法

    基于划分的聚类算法(Partitioning Method)是文本聚类应用中最为普遍的算法。方法将数据集合分成若干个子集,它根据设定的划分数目k选出k个初始聚类中心,得到一个初始划分,然后采用迭代重定位技术,反复在k个簇之间重新计算每个簇的聚类中心,并重新分配每个簇中的对象,以改进划分的质量。使得到的划分满足“簇内相似度高,簇间相似度小”的聚类原则。典型的划分聚类方法有K-means算法和K-medoids算法,两者的区别在于簇代表点的计算方法不同。前者使用所有点的均值来代表簇,后者则采用类中某个数据对象来代表簇。为了对大规模的数据集进行聚类,以及处理复杂形状的聚类,各类改进的划分算法逐渐增多。

    基于划分方法的优点是运行速度快,但该方法必须事先确定k的取值。算法容易局部收敛,且不同的初始聚类中心选取对聚类结果影响较大。为此,应用最广泛的k-means算法有很多变种,他们可能在初始k个聚类中心的选择、相似度的计算和计算聚类中心等策略上有所不同,最终实现聚类结果改进的目标。

    (2基于层次的方法

    基于层次的聚类算法(Hierarchical Method)又叫“分级聚类算法”或“树聚类”,它通过分解给定的数据对象集来创建一个层次。这种聚类方法有两种基本的技术途径:一是先把每个对象看作一个簇,然后逐步对簇进行合并,直到所有对象合为一个簇,或满足一定条件为止;二是把所有对象看成一类,根据一些规则不断选择一个簇进行分解,直到满足一些预定的条件,如类的数目达到了预定值,或两个最近簇的距离达到阈值等。前者称为自下而上的凝聚式聚类,后者称为自上而下的分裂式聚类。

    (3基于密度的方法

    绝大多数划分算法都是基于对象之间的距离进行聚类,这类方法只能发现圆形或球状的簇,较难发现任意形状的簇。为此,提出了基于密度的聚类算法(Density-Based Clustering Method),其主要思想是:只要邻近区域的对象或数据点的数目超过某个阈值,就继续聚类。即对给定类中的每个数据点,在一个给定范围的区域中至少包含某个数目的点,这样就能很好的过滤掉“噪声”数据,发现任意形状的簇。其基本出发点是,寻找低密度区域分离的高密度区域。

    (4基于网格的方法

    基于网格的算法(Grid-Based Clustering Method)把对象空间量化为有限数目的单元,形成了一个网络结构。所用的聚类操作都在整个网络结构即量化的空间上进行。这种方法的一个突出的优点就是处理速度很快,其处理时间独立于数据对象的数目,只与量化空间中的每一维的单元数目有关。

    (5基于模型的方法

    基于模型的算法(Model-Based Clustering Method)试图优化给定的数据和某些数学模型之间的适应性。这样的算法经常是基于这样的假设,数据是根据潜在的概率分布生成的。它通过为每个聚类假设一个模型来发现符合相应模型的数据对象。根据标准统计方法并综合考虑“噪声”或异常数据,该方法可以自动确定聚类个数,从而得到鲁棒性较好的聚类方法。基于模型的算法主要有两类,分别为统计学方法和神经网络方法。

    2.3 K-means算法

    K-means算法接受数据集和参数k,经过若干次迭代,将输入的n个数据对象(以m维向量形式表示)划分为k个聚类,使得所获得的聚类满足:

    1、 同一聚类中的数据对象的相似度较高(或距离最近);

    2、 不同聚类中的数据对象的相似度较低(或距离最远)。

    算法流程:

    1、 适当选择k个初始中心;

    2、 在第i次迭代中,对任意一个样本数据,求其到k个中心的距离,然后将该样本数据归到距离最短的中心所在的类;

    3、 对于每个类,利用求均值等方法更新该类的中心值;

    4、 对于所有的k个聚类中心,如果经过23的某次迭代法更新后,值保持不变,则迭代结束,否则继续迭代。

    2.4 Hadoop实现

    本次作业的实验数据为大量的中文短文本,包含未分词和已分词两种模式,而对于大规模的中文网站聚类,其流程见下图:

     

    中文聚类分析

    根据这一完整的流程,我们先在hadoop下实现聚类,共有七个源文件:

    (1WordFrequenceInDocument.java

    提取中文、分词、去停用词、统计词频


     

    (2WordCountsInDocuments.java

    统计每个网页的单词数目


        (3WordsInCorpusTFIDF.java

    统计单词在多少个网页出现,计算TFIDF,建立词表


        (4DocumentVetorBuid.java

    建立网页向量,随机选取K个网页作为中心点


        (5Kmeans.java

    判断网页属于哪一类,更新中心点,最后输出网页所属中心标号


        (6KmeansDriver.java

    控制 MapreducJob顺序,以及K-means迭代流程,设置参数


        (7DocTool.java

    根据网页向量以及所有中心点向量输出网页所属的中心编号

     

     

    在处理中文文本的过程中,三个主要的mapreduce过程如下:

     

    1 WordFrequenceInDocument

     

    2 WordCountsInDocuments

     

    3 WordsInCorpusTFIDF

    网页向量以及初始中心点的选取在Mapreduce 中的过程为:

     

    4 DocumentVectorBuild

    DocTool简化了Kmeans过程中的代码,将计算网页向量与中心点向量之间的余弦距离,并根据最大的余弦距离判断网页属于哪一类的方法抽象出来, Kmeans 的迭代过程中可以直接在调用,简化了 Kmeans 主类的代码复杂度。

     Kmeans主类由两个Mapreduce 组成,一个是在迭代过程中更新中心点,一个是生成最后的结果,这两个 Mapreduce 的 Mapper 和 Reducer 如下:

     

    5 Kmeans聚类

    2.5 Spark实现

    Spark平台中的实现不需要分别编写Map方法和Reduce方法,而是按照串行程序的正常逻辑顺序来编写。具体实现过程如下:

    (1输入的文件使用Hadoop实现中预处理之后的文件,即样本文本特征向量集合,可以直接进行KMeans聚类的处理

    (2随机生成k个(k=10)向量(维度和取值范围都与样本文本的特征向量一致)作为初始的k个中心;

    (3使用map操作将每个样本文本特征向量与其所属的类别映射到一起(需要调用方法来计算每个样本文本特征向量到每个中心的距离,返回最近的中心ID);

    (4使用reduce操作按照映射到的中心ID来汇合,汇合的过程中,特征向量每个维度上数值都将累加,计数器也随之增长。然后再使用map操作,计算平均值,即可得到新的k个中心;

    (5计算新的k个中心与旧的k个中心的距离和,若为0(或小于某一阈值)则停止迭代,输出聚类结果,程序结束,否则继续迭代,重复345

    代码见附录。

    3 实验结果统计

    经统计,一共12142条样本文本,聚类到k=10,其中分类是0~9的分别为542217216522698302386303506976877条。

    Hadoop的结果:

     

    图2 Hadoop结果

    Spark的结果:

     

    图3 Spark结果

    4 对两个平台上实现方法的对比

    (1Hadoop各项时间的统计信息如下(单位ms):

    reading files and creating dict time: 73441

    generating vectors time: 7133

    total iteration: 10

    first iter time: 832246

    average iter time: 540034

    total time: 5204791

    (2Spark的各项时间的统计信息如下(单位ms):

    reading files time: 321

    total iteration: 8

    first iter time: 30428

    average iter time: 15000

    total time: 120132

    列表如下:

    单位:ms

      读文件时间

    迭代次数

    首次迭代时间

    平均迭代时间

    总时间

       Hadoop

    73441

    10

    832246

    540034

    5204791

       Spark

    321

    8

    30428

    15000

    120132

    6 HadoopSpark对比

    不难看出Spark平台的效率和速度都要比Hadoop平台高出很多。

    收获与建议

    通过本学期“网络大数据管理理论和应用”的学习,我了解到关于大数据、分布式平台、语义计算等比较前沿的知识。比如在学习过程中,我了解到通过大数据分析处理用户行为模式,向用户推荐广告,然后通过销量的变化来判断广告推荐系统的好坏这样一种算法,对于这种算法我就比较感兴趣,所以通过课程的学习我拓展了自身的视野。

    通过老师和助教的指导,我们还搭建了hadoopspark平台,进行了基本的mapreduce编程,为以后深入学习打下了基础。在学习过程中,我们还锻炼了做报告的能力,分享了自学的心得。

    至于建议方面,我觉得做报告的环节,是否可以限定在相关领域的核心期刊或会议论文上,这样学术性会更好,更能锻炼大家的学习和报告能力。


    附录

    object SparkKMeans {
      //String转Vector
      def parseVector(line: String): Vector = {
        return new Vector(line.split(' ').map(_.toDouble))
      }
    //计算最近中心点
      def closestCenter(p: Vector, centers: Array[Vector]): Int = {
        var bestIndex = 0
        var bestDist = p.squaredDist(centers(0)) //差平方之和
        for (i <- 1 until centers.length) {
          val dist = p.squaredDist(centers(i))
          if (dist < bestDist) {
            bestDist = dist
            bestIndex = i
          }
        }
        return bestIndex
      }
      //主方法
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SparkKMeans")
        System.out.println("reading files start...")
        val t1 = System.currentTimeMillis()
        val lines = sc.textFile("/home/fabkxd/Documents/kmeans_data/inputfile.txt")
        val points = lines.map(parseVector(_)).cache() //文本中每行代表一个样本,转换为Vector
        val t2 = System.currentTimeMillis()
        System.out.println("reading files done...\n")
        val dimensions = 5100 //节点的维度
        val k = 10 //聚类个数
        System.out.println("initial centers start...")
        // 随机初始化k个中心节点
        val rand = new Random(42)
        var centers = new Array[Vector](k)
        for (i <- 0 until k)
          centers(i) = Vector(dimensions, _ => rand.nextDouble)
        System.out.println("initial centers done...\n")
        System.out.println("kmeans iterations start...")
        var first: Long = 0
        val to1 = System.currentTimeMillis()
        var iter: Int = 1
        var over: Int = 1
    for (i <- 1 to iterations) {
          if (over == 1) {
            System.out.println("\titeration " + i + " start...")
            var ti1 = System.currentTimeMillis()
            //对每个点计算距离其最近的中心点
            val mappedPoints = points.map { p => (closestCenter(p, centers), (p, 1)) }
            val newCenters = mappedPoints.reduceByKey {
              case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) //(向量相加, 计数器相加)
            }.map {
              case (id, (sum, count)) => (id, sum / count) //根据前面的聚类,用平均值算法重新计算中心节点的位置
            }.collect
            var cdist: Double = 0
            // 计算收敛距离并更新中心节点
            for ((id, value) <- newCenters) {
              cdist += value.squaredDist(centers(id))
              centers(id) = value
            }
            System.out.println("\t\tdistance: " + cdist)
     
            var ti2 = System.currentTimeMillis()
            if (first == 0)
              first = ti2 - ti1
            System.out.println("\titeration " + i + " done...")
            if (cdist < 0.3) {
              mappedPoints.saveAsTextFile("output")
              iter = i
              over = 0
            }
          }
        }
        val to2 = System.currentTimeMillis()
        System.out.println("kmeans iterations done...")
        System.out.println("reading files time: " + (t2 - t1))
        System.out.println("total iteration: " + iter);
        System.out.println("first iter time: " + first);
        System.out.println("average iter time: " + (to2 - to1) / iter);
        System.out.println("total time: " + (to2 - to1));
     
      }
    }
     


    展开全文
  • 本文转自... ... 本文章纯属个人学习笔记,持续不断的增加中...    本章主要的学习是中文分词 和两种统计词频(传统词频和TF-IDF算法 ) 的方法. ... 学习目的:通过N多的新闻标题 or 新闻摘...
  • 文本话题聚类(Kmeans/LDA)

    千次阅读 2019-04-06 20:33:33
    K-means 1 聚类是一种无监督的学习方法。聚类区别于分类,即事先不知道要寻找的内容,没有...3 K-means聚类算法,是一种广泛使用的聚类算法,其中k是需要指定的参数,即需要创建的簇的数目,K-means算法中的k个簇的...
  • Spark MLlib聚类KMeans

    2019-01-06 00:05:03
    算法说明  聚类(Cluster analysis...聚类算法是机器学习(或者说是数据挖掘更合适)中重要的一部分,除了最为简单的K-Means聚类算法外,比较常见的还有层次法(CURE、CHAMELEON等)、网格算法(STING、WaveClus...
  • 本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。...
  • http://www.aboutyun.com/thread-22235-1-1.html1.概述首先,笔者要先申明,我也是初学机器学习领域的内容,虽然我是从事大数据平台开发的工作,但是工作中确实没有跟spark MLlib打过交道,所以文中如果有描述错误的...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 基于Spark Mllib的文本分类 文本分类是一个典型的机器学习问题,其主要目标是通过对已有语料库文本数据训练得到分类模型,进而对新文本进行类别标签的预测。这在很多领域都有现实的应用场景,如新闻网站的新闻...
  • 利用word2vec对关键词进行聚类
  • 基于自适应参数调整的密度聚类算法的新闻热点发现实现步骤如下: 从es获取目标数据(新闻标题、摘要等信息):根据过滤条件获取目标数据; 利用bert将新闻标题和新闻摘要生成新闻特征向量: 利用bert-servin...
  • Spark快速大数据分析(一)

    千次阅读 2018-11-16 10:06:42
    Spark快速大数据分析 前3章内容,仅作为学习,有断章取义的嫌疑。如有问题参考原书 Spark快速大数据分析 以下为了打字方便,可能不是在注意大小写 1 Spark数据分析导论 1.1 Spark是什么 Spark是一个用来实现快速...
  • 文本相似度-NLP

    千次阅读 2019-03-16 17:04:14
    而有了文本之间相似性的度量方式,我们便可以利用划分法的K-means、基于密度的DBSCAN或者是基于模型的概率方法进行文本之间的聚类分析;另一方面,我们也可以利用文本之间的相似性对大规模语料进行去重预处理,或者...
  • 本文介绍使用Spark MLlib提供的朴素贝叶斯(Naive Bayes)算法,完成对中文文本的分类过程。主要包括中文分词、文本表示(TF-IDF)、模型训练、分类预测等。 中文分词 对于中文文本分类而言,需要先对文章进行
1 2 3 4 5 ... 20
收藏数 2,690
精华内容 1,076
关键字:

利用spark做文本聚类分析