精华内容
下载资源
问答
  • pso并行化算法.rar

    2019-06-27 11:16:59
    并行化实现了PSO算法,大大提高了算法的寻优效果。基于OpenMP和MPI两种方法实现。
  • Aprior并行化算法在Spark上的实现

    千次阅读 2015-08-12 08:59:45
    本文为大家分享的Spark实战案例是K-频繁项集挖掘——Apriori并行化算法的实现。关联数据挖掘、频繁项集挖掘的常用算法有Apriori,Fp-growth以及eclat算法。这里我使用Apriori算法进行频繁项集挖掘。Apriori算法于2006...

    本文为大家分享的Spark实战案例是K-频繁项集挖掘——Apriori并行化算法的实现。关联数据挖掘、频繁项集挖掘的常用算法有Apriori,Fp-growth以及eclat算法。这里我使用Apriori算法进行频繁项集挖掘。Apriori算法于2006年12月被国际权威的学术组织ICDM评为数据挖掘领域的十大经典算法。不熟悉的同学可以关注我的文章,我会详细讲解其原理及实现。

    首先给出需求说明:在Chess标准数据集上进行1到8频繁项集的挖掘,其中支持度support=0.85。每个文件的输出格式为项集:频率,如a,b,c:0.85。

    我们在写Spark程序的时候一定要注意写出的程序是并行化的,而不是只在client上运行的单机程序。否则你的算法效率将让你跌破眼镜而你还在郁闷为什么Spark这么慢甚至比不上Hadoop-MR。此外还需要对算法做相关优化。在这里主要和大家交流一下算法思路和相关优化。

    对于Apriori算法的实现见下文源码。在Spark上实现这个算法的时候主要分为两个阶段。第一阶段是一个整体的遍历求出每个项集的阶段,第二阶段主要是针对第i个项集求出第i+1项集的候选集的阶段。

    对于这个算法可以做如下优化:
    1. 观察!这点很重要,经过观察可以发现有大量重复的数据,所谓方向不对努力白费也是这个道理,首先需要压缩重复的数据。不然会做许多无用功。
    2. 设计算法的时候一定要注意是并行化的,大家可能很疑惑,Spark不就是并行化的么?可是你一不小心可能就写成只在client端运行的算法了。
    3. 因为数据量比较大,切记多使用数据持久化以及BroadCast广播变量对中间数据进行相应处理。
    4. 数据结构的优化,BitSet是一种优秀的数据结构他只需一位就可以存储以个整形数,对于所给出的数据都是整数的情况特别适用。
    下面给出算法实现源码:

    import scala.util.control.Breaks._
    import scala.collection.mutable.ArrayBuffer
    import java.util.BitSet
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark._
    
    
    object FrequentItemset {
      def main(args: Array[String]) {
        if (args.length != 2) {
          println("USage:<Datapath> <Output>")
        }
        //initial SparkContext
        val sc = new SparkContext()
        val SUPPORT_NUM = 15278611 //Transactions total is num=17974836, SUPPORT_NUM = num*0.85
        val TRANSACITON_NUM = 17974836.0
        val K = 8
    
    
        //All transactions after removing transaction ID, and here we combine the same transactions.
        val transactions = sc.textFile(args(0)).map(line =>
          line.substring(line.indexOf(" ") + 1).trim).map((_, 1)).reduceByKey(_ + _).map(line => {
          val bitSet = new BitSet()
          val ss = line._1.split(" ")
          for (i <- 0 until ss.length) {
            bitSet.set(ss(i).toInt, true)
          }
          (bitSet, line._2)
        }).cache()
    
    
        //To get 1 frequent itemset, here, fi represents frequent itemset
        var fi = transactions.flatMap { line =>
          val tmp = new ArrayBuffer[(String, Int)]
          for (i <- 0 until line._1.size()) {
            if (line._1.get(i)) tmp += ((i.toString, line._2))
          }
          tmp
        }.reduceByKey(_ + _).filter(line1 => line1._2 >= SUPPORT_NUM).cache()
        val result = fi.map(line => line._1 + ":" + line._2 / TRANSACITON_NUM)
        result.saveAsTextFile(args(1) + "/result-1")
    
    
        for (i <- 2 to K) {
          val candiateFI = getCandiateFI(fi.map(_._1).collect(), i)
          val bccFI = sc.broadcast(candiateFI)
          //To get the final frequent itemset
          fi = transactions.flatMap { line =>
            val tmp = new ArrayBuffer[(String, Int)]()
            //To check if each itemset of candiateFI in transactions
            bccFI.value.foreach { itemset =>
              val itemArray = itemset.split(",")
              var count = 0
              for (item <- itemArray) if (line._1.get(item.toInt)) count += 1
              if (count == itemArray.size) tmp += ((itemset, line._2))
            }
            tmp
          }.reduceByKey(_ + _).filter(_._2 >= SUPPORT_NUM).cache()
          val result = fi.map(line => line._1 + ":" + line._2 / TRANSACITON_NUM)
          result.saveAsTextFile(args(1) + "/result-" + i)
          bccFI.unpersist()
        }
      }
    
    
      //To get the candiate k frequent itemset from k-1 frequent itemset
      def getCandiateFI(f: Array[String], tag: Int) = {
        val separator = ","
        val arrayBuffer = ArrayBuffer[String]()
        for(i <- 0 until f.length;j <- i + 1 until f.length){
          var tmp = ""
          if(2 == tag) tmp = (f(i) + "," + f(j)).split(",").sortWith((a,b) => a.toInt <= b.toInt).reduce(_+","+_)
          else {
            if (f(i).substring(0, f(i).lastIndexOf(',')).equals(f(j).substring(0, f(j).lastIndexOf(',')))) {
              tmp = (f(i) + f(j).substring(f(j).lastIndexOf(','))).split(",").sortWith((a, b) => a.toInt <= b.toInt).reduce(_ + "," + _)
            }
          }
          var hasInfrequentSubItem = false //To filter the item which has infrequent subitem
          if (!tmp.equals("")) {
            val arrayTmp = tmp.split(separator)
            breakable {
              for (i <- 0 until arrayTmp.size) {
                var subItem = ""
                for (j <- 0 until arrayTmp.size) {
                  if (j != i) subItem += arrayTmp(j) + separator
                }
                //To remove the separator "," in the end of the item
                subItem = subItem.substring(0, subItem.lastIndexOf(separator))
                if (!f.contains(subItem)) {
                  hasInfrequentSubItem = true
                  break
                }
              }
            } //breakable
          }
          else hasInfrequentSubItem = true
          //If itemset has no sub inftequent itemset, then put it into candiateFI
          if (!hasInfrequentSubItem) arrayBuffer += (tmp)
        } //for
        arrayBuffer.toArray
      }
    }

    在这里提一下我的实验结果以便大家参考,对于2G,1800W条记录的数据,在80G内存,10个虚拟节点的集群上18秒就算完了1-8频繁项集的挖掘。应该还算不错。

    先写到这里,欢迎大家提出相关的建议或意见。
    (by希慕,转载请注明出处)

    展开全文
  • 为了将BP算法用于大规模数据分类问题,采用MapReduce思想,将大数据集切分成若干小的数据集来并行加速处理,同时引入Bagging算法的思想来综合并行结果,提高分类的准确率。通过在各个节点上根据子数据集独立地训练各个BP...
  • 大家好,下面为大家分享的实战案例是K-频繁相机挖掘并行化算法。相信从事数据挖掘相关工作的同学对频繁项集的相关算法 比较了解,这里我们用Apriori算法及其优化算法实现。 首先说一下实验结果。对于2G,1800W...

    大家好,下面为大家分享的实战案例是K-频繁相机挖掘并行化算法。相信从事数据挖掘相关工作的同学对频繁项集的相关算法

    比较了解,这里我们用Apriori算法及其优化算法实现。

    首先说一下实验结果。对于2G,1800W条记录的数据,我们用了18秒就算完了1-8频繁项集的挖掘。应该还算不错。

    给出题目:

    本题的较第四题难度更大。我们在写程序的时候一定要注意写出的程序是并行化的,而不是只在client上运行的单机程序。否

    则你的算法效率将让你跌破眼镜。此外还需要对算法做相关优化。在这里主要和大家交流一下算法思路和相关优化。

    对于Apriori算法的实现在这里不做过多赘述,百度一下大片大片。在Spark上实现这个算法的时候主要分为两个阶段第一阶段

    是一个整体的循环求出每个项集的阶段,第二阶段主要是针对第i个项集求出第i+1项集的候选集的阶段。

    对于这个算法可以做如下优化:

    1. 观察!这点很重要,经过观察可以发现有大量重复的数据,所谓方向不对努力白费也是这个道理,首先需要压缩重复的数据。不然会做许多无用功。
    2. 设计算法的时候一定要注意是并行化的,大家可能很疑惑,Spark不就是并行化的么?可是你一不小心可能就写成只在client端运行的算法了。
    3. 因为数据量比较大,切记多使用数据持久化以及BroadCast广播变量对中间数据进行相应处理。
    4. 数据结构的优化,BitSet是一种优秀的数据结构他只需一位就可以存储以个整形数,对于所给出的数据都是整数的情况特别适用。 
    下面给出算法实现源码:
    • import scala.util.control.Breaks._
    • import scala.collection.mutable.ArrayBuffer
    • import java.util.BitSet
    • import org.apache.spark.SparkContext
    • import org.apache.spark.SparkContext._
    • import org.apache.spark._


    • object FrequentItemset {
    •   def main(args: Array[String]) {
    •     if (args.length != 2) {
    •       println("USage:<Datapath> <Output>")
    •     }
    •     //initial SparkContext
    •     val sc = new SparkContext()
    •     val SUPPORT_NUM = 15278611 //Transactions total is num=17974836, SUPPORT_NUM = num*0.85
    •     val TRANSACITON_NUM = 17974836.0
    •     val K = 8


    •     //All transactions after removing transaction ID, and here we combine the same transactions.
    •     val transactions = sc.textFile(args(0)).map(line =>
    •       line.substring(line.indexOf(" ") + 1).trim).map((_, 1)).reduceByKey(_ + _).map(line => {
    •       val bitSet = new BitSet()
    •       val ss = line._1.split(" ")
    •       for (i <- 0 until ss.length) {
    •         bitSet.set(ss(i).toInt, true)
    •       }
    •       (bitSet, line._2)
    •     }).cache()


    •     //To get 1 frequent itemset, here, fi represents frequent itemset
    •     var fi = transactions.flatMap { line =>
    •       val tmp = new ArrayBuffer[(String, Int)]
    •       for (i <- 0 until line._1.size()) {
    •         if (line._1.get(i)) tmp += ((i.toString, line._2))
    •       }
    •       tmp
    •     }.reduceByKey(_ + _).filter(line1 => line1._2 >= SUPPORT_NUM).cache()
    •     val result = fi.map(line => line._1 + ":" + line._2 / TRANSACITON_NUM)
    •     result.saveAsTextFile(args(1) + "/result-1")


    •     for (i <- 2 to K) {
    •       val candiateFI = getCandiateFI(fi.map(_._1).collect(), i)
    •       val bccFI = sc.broadcast(candiateFI)
    •       //To get the final frequent itemset
    •       fi = transactions.flatMap { line =>
    •         val tmp = new ArrayBuffer[(String, Int)]()
    •         //To check if each itemset of candiateFI in transactions
    •         bccFI.value.foreach { itemset =>
    •           val itemArray = itemset.split(",")
    •           var count = 0
    •           for (item <- itemArray) if (line._1.get(item.toInt)) count += 1
    •           if (count == itemArray.size) tmp += ((itemset, line._2))
    •         }
    •         tmp
    •       }.reduceByKey(_ + _).filter(_._2 >= SUPPORT_NUM).cache()
    •       val result = fi.map(line => line._1 + ":" + line._2 / TRANSACITON_NUM)
    •       result.saveAsTextFile(args(1) + "/result-" + i)
    •       bccFI.unpersist()
    •     }
    •   }


    •   //To get the candiate k frequent itemset from k-1 frequent itemset
    •   def getCandiateFI(f: Array[String], tag: Int) = {
    •     val separator = ","
    •     val arrayBuffer = ArrayBuffer[String]()
    •     for(i <- 0 until f.length;j <- i + 1 until f.length){
    •       var tmp = ""
    •       if(2 == tag) tmp = (f(i) + "," + f(j)).split(",").sortWith((a,b) => a.toInt <= b.toInt).reduce(_+","+_)
    •       else {
    •         if (f(i).substring(0, f(i).lastIndexOf(',')).equals(f(j).substring(0, f(j).lastIndexOf(',')))) {
    •           tmp = (f(i) + f(j).substring(f(j).lastIndexOf(','))).split(",").sortWith((a, b) => a.toInt <= b.toInt).reduce(_ + "," + _)
    •         }
    •       }
    •       var hasInfrequentSubItem = false //To filter the item which has infrequent subitem
    •       if (!tmp.equals("")) {
    •         val arrayTmp = tmp.split(separator)
    •         breakable {
    •           for (i <- 0 until arrayTmp.size) {
    •             var subItem = ""
    •             for (j <- 0 until arrayTmp.size) {
    •               if (j != i) subItem += arrayTmp(j) + separator
    •             }
    •             //To remove the separator "," in the end of the item
    •             subItem = subItem.substring(0, subItem.lastIndexOf(separator))
    •             if (!f.contains(subItem)) {
    •               hasInfrequentSubItem = true
    •               break
    •             }
    •           }
    •         } //breakable
    •       }
    •       else hasInfrequentSubItem = true
    •       //If itemset has no sub inftequent itemset, then put it into candiateFI
    •       if (!hasInfrequentSubItem) arrayBuffer += (tmp)
    •     } //for
    •     arrayBuffer.toArray
    •   }
    • }
    先写到这里,欢迎大家提出相关的建议或意见。
                (by老杨,转载请注明出处)


    转载于:https://www.cnblogs.com/yangmu/p/4216103.html

    展开全文
  • 矩阵乘法的并行化算法讨论

    万次阅读 多人点赞 2015-09-26 13:28:14
    另一方面,矩阵乘法同时也是并行计算领域常常被用来作为范例的一个话题。它的特点是首先计算量可能相当大,适合利用并行实现来提高效率。其次,它所使用的各种数据之间(矩阵中的元素)没有相互依赖性,可以充分使用...
    矩阵乘法是线性代数里面会讲到的一种非常基础、也十分普遍的计算规则。另一方面,矩阵乘法同时也是并行计算领域常常被用来作为范例的一个话题。它的特点是首先计算量可能相当大,适合利用并行实现来提高效率。其次,它所使用的各种数据之间(矩阵中的元素)没有相互依赖性,可以充分使用并行处理的计算资源。


    串行实现


    根据线性代数的基本知识,m × l 的矩阵A,乘以一个大小为 l × n 的矩阵B,将得到一个 m × n 的矩阵C=A×B,其中


    下图是用图示来表示的这种计算规则:


    为了方便讨论,我们可以不是普遍性地假设有所矩阵的大小都是 n × n 的,下面就是串行实现的矩阵乘法的代码:

    int A[n][n], B[n][n], C[n][n];
    ...
    for(i=0;i<n;i++){
        for(j=0;j<n;j++){
            C[i][j]=0;
            for(k=0;k<n;k++)
                C[i][j]+=A[i][k]*B[k][j];
        }
    }
    

    易见,这个算法的计算复杂度为O(n^3)。


    基本并行实现的讨论


    正如前面所讲的,矩阵相乘过程中,结果矩阵C中的每个元素都是可以独立计算的,即彼此之间并无依赖性。所以如果采用更多的处理器,将会显著地提高矩阵相乘的计算效率。


    对于大小为n × n 的矩阵,加入我们有n个处理器,那么结果矩阵中的每一行,都可以用一个处理器来负责计算。此时,总共的并行计算步数为 O(n^2)。你可以理解为在串行实现的代码中,最外层的循环 for(i=0;i<n;i++) 被分别由n个处理器来并行的执行,而每个处理需要完成的任务仅仅是内部的两层循环。


    如果采用n^2个处理器,那么就相当于结果矩阵中的每个元素都由一个处理器来负责计算。此时,总共的并行计算步数为 O(n)。你可以理解为在串行实现的代码中,最外面的两层循环 被分解到n^2个处理器来并行的执行,而每个处理需要完成的任务仅仅是内部的一层循环,即for(k=0;k<n;k++)。


    更进一步,如果有n^3个处理器,那么即使最内层的循环for(k=0;k<n;k++)也有n个处理器在并行的负责。但是最终的求和运算,我们需要一个类似reduction的操作,因此最终的计算复杂度就是O(log n)。


    当然,你一定会想到的是,实际中,通常并不可能有像矩阵元素那么多的处理器资源。这时我们该怎么做。对于一个大小为n × n 的大矩阵A,我们其实可以把它切分成s^2个子矩阵Ap,q,每个子矩阵的大小为 m × m,其中 m = n / s,即0 <= p, q < s。对于两个大矩阵A和B,现在我们有:


    用图示表示则有:



    Cannon算法


    著名的Cannnon算法使用一个由s^2个处理器组成的二维网孔结构(mesh),而且这个mesh还是周边带环绕的(The processors are connected as a torus)。处理器Processor (i,j) (我们用它来表示位于位置(i,j)处的处理器)最开始时存有子矩阵Ai,jBi,j。随着算法的进行,这些子矩阵会向左或向上位移。如下图所示:


    这个算法的根本出发点是在处理器阵列中,要合理分布两个待乘的矩阵元素。由乘积公式可知,要在处理单元 P(i,j)中计算乘积元素C(i,j),必须在该单元中准备好矩阵元素A(i,s)和B(s,j)。但是如果我们像下图那样分布矩阵元素,我们在计算C(i,j)时所需的元素显然是不足够的,但是可以通过向上循环位移B的元素,并向左循环位移A的元素,来获取合适的成对的矩阵元素。


    Cannnon算法的具体流程:



    下面是矩阵位移的一个示例,其中s=3;


    显然,算法的复杂度 t(n)=O(n), p(n) = n^2,w(n) = O(n^3),所以是成本最佳的。


    ---------------------------------------------

    参考文献与推荐阅读材料

    【1】陈国良,并行算法的设计与分析(第3版),高等教育出版社,2009

    【2】矩阵计算并行算法(百度文库地址:http://wenku.baidu.com/view/d64ba9b4b14e852458fb57fc.html)


    (本文完)


    展开全文
  • 莎士比亚文集词频统计并行化算法

    千次阅读 2016-11-21 21:40:12
    声明:本博客内容由本人经过实验楼实验得来。...莎士比亚文集中具有多个章节,因此需要用到并行化的方法,这里使用Spark进行处理。数据下载# 莎士比亚文集: wget http://labfile.oss.aliyuncs.com/courses/456/sha

    声明:本博客内容由本人经过实验楼实验得来。

    题目描述

    在给定的莎士比亚文集上(多个文件),根据规定的停词表,统计出现频率最高的 100 个单词。所谓的停词表,即在词表中的词语并不统计他的频率。

    莎士比亚文集中具有多个章节,因此需要用到并行化的方法,这里使用Spark进行处理。

    数据下载

    # 莎士比亚文集:
    wget http://labfile.oss.aliyuncs.com/courses/456/shakespear.zip
    # 停词表:
    wget http://labfile.oss.aliyuncs.com/courses/456/stopword.txt

    编程模型

    Spark上开发的应用程序都是由一个driver programe构成,这个所谓的驱动程序在Spark集群通过跑main函数来执行各种并行操作。集群上的所有节点进行并行计算需要共同访问一个分区元素的集合,这就是RDD(RDD resilient distributed dataset)弹性分布式数据集。RDD可以存储在内存或磁盘中,具有一定的容错性,可以在节点宕机重启后恢复。RDD可以从文件系统或HDFS中的文件创建,也可以从Scala或Python集合中创建。

    主要针对RDD进行各种操作,程序中的步骤:

    提取数据到RDD中,在本实验中我们将莎士比亚文集和停词表文件转换成RDD
    1. 转换(transformations)操作:将已存在的数据集转换成新的数据集,例如map。转换是惰性的,不会立刻计算结果,仅仅记录转换操作应用的目标数据集,当动作需要一个结果时才计算。在本实验中我们需要转换文集RDD和停词表RDD。
    2. 动作(actions) :数据集计算后返回一个值给驱动程序,例如reduce。本实验中需要对统计词频map的结果进行reduce操作。

    程序实现

    读入文件:

    # 打开pyspark
    [yqtao@localhost ~]$ pyspark
    # 读入数据
    >>> inputFiles = "/home/yqtao/bigdata/shakespear/*"
    >>> stopWordFile = "/home/yqtao/bigdata/stopword.txt"
    # 创建RDD文件
    >>> inputRDD = sc.textFile(inputFiles)
    >>> stopRDD = sc.textFile(stopWordFile)

    处理特殊符号

    输入的文件内容里除了英文单词之外,还有下面的特殊符号,这些符号不应该计算到统计中。

    • 制表符
    • 反斜线
    • 句号
    • 逗号
    • 竖线 |
    • 中括号
    • 问号
    • 破折号
    • 叹号
    • 分号
    • 其他英文符号

    其策略就是用空格替换这些特出的符号,然后在将其划分为单词。

    # targetList是一个列表,包含这些特殊的符号
    # 定义一个函数实现此功能
    # python真的好强大!!!!
    >>> targetList = list('\t\().,?[]!;|') + ['--']
    >>> 
    >>> def replaceAndSplit(s):
    ...     for c in targetList:
    ...         s = s.replace(c, " ")
    ...     return s.split()
    
    # 通过flatMap调用自定义的函数
    # 即此时的RDD中没有了这些特殊的符号
    >>>inputRDDv1 = inputRDD.flatMap(replaceAndSplit)

    处理停词表

    将停词表读入列表中:

    # 去除空行,获得列表
    >>> stopList = stopRDD.map(lambda x: x.strip()).collect()

    如果输入的文件中有停词表中的词,则将其去除,采用RDD的filter方法。

    >>>inputRDDv2 = inputRDDv1.filter(lambda x: x not in stopList)

    Map操作

    所谓的map操作转换操作,这点要深刻理解。
    给每个单词组成元组(x,1),表示单词x出现了1次。

    >>>inputRDDv3 = inputRDDv2.map(lambda x: (x,1))

    Reduce操作

    即计算操作,将相同的词进行统计频数。这里是对第二个数进行统计的。

    # 导入add
    >>>from operator import add
    >>>inputRDDv4 = inputRDDv3.reduceByKey(add)
    # 保存,可查看结果
    # 可以看到他是分布式的存储的
    >>>inputRDDv4.saveAsTextFile('/tmp/v4output')

    TopK操作

    即要统计出现最高频率的100个单词。

    # 首先将单词频率与单词进行交换,这样就可以对频率进行排序
    >>>inputRDDv5 = inputRDDv4.map(lambda x: (x[1], x[0]))
    # 进行降序排列ascending=False表示降序
    >>>inputRDDv6 = inputRDDv5.sortByKey(ascending=False)

    排完序后,取单词:

    # 交换位置,去key值
    >>>inputRDDv7 = inputRDDv6.map(lambda x: (x[1], x[0])).keys()
    # 取前100个单词
    >>>top100 = inputRDDv7.take(100)
    # 存储操作
    >>>outputFile = "/tmp/result"
    >>>result = sc.parallelize(top100)
    >>>result.saveAsTextFile(outputFile)

    完整的python代码

      1 #!/usr/bin/python
      2 # coding: utf-8
      3 from pyspark import SparkContext,SparkConf
      4 from operator import add
      5 import sys
      6 appName="WordCount"
      7 conf=SparkConf().setAppName(appName).setMaster("local")
      8 sc=SparkContext(conf=conf)
      9 inputFiles = "/home/yqtao/bigdata/shakespear/*"
      1 #!/usr/bin/python
      2 # coding: utf-8
      3 from pyspark import SparkContext,SparkConf
      4 from operator import add
      5 import sys
      6 appName="WordCount"
      7 conf=SparkConf().setAppName(appName).setMaster("local")
      8 sc=SparkContext(conf=conf)
      9 inputFiles = "/home/yqtao/bigdata/shakespear/*"
     10 stopWordFile = "/home/yqtao/bigdata/stopword.txt"
     11 outputFile = "/tmp/result"
     12 
     13 targetList=list('\t\().,?;|')+['--']
     14 
     15 def replaceAndSplit(s):
     16     for c in targetList:
     17         s=s.replace(c," ")
     18     return s.split()    
     19     
     20     
     21 inputRDD=sc.textFile(inputFiles)
     22 stopRDD=sc.textFile(stopWordFile)
     23 stopList = stopRDD.map(lambda x:x.strip()).collect()
     24     
     25 inputRDD1=inputRDD.flatMap(replaceAndSplit)
     26 inputRDD2=inputRDD1.filter(lambda x: x not in stopList)
     27 inputRDD3=inputRDD2.map(lambda x: (x,1))
     28 inputRDD4=inputRDD3.reduceByKey(add)
     29 inputRDD5=inputRDD4.map(lambda x:(x[1],x[0]))
     30 inputRDD6=inputRDD5.sortByKey(ascending=False)
     31 inputRDD7=inputRDD6.map(lambda x: (x[1],x[0])).keys()
     32 top100=inputRDD7.take(100)
     33 result=sc.parallelize(top100)
     34 result.saveAsTextFile(outputFile)

    运行:

    [yqtao@localhost bigdata]$ spark-submit shake.py

    打开结果如下所示:

      1 I
      2 And
      3 him
      4 thou
      5 so
      6 The
      7 thy
      8 all
      9 To
     10 by
     11 thee
     12 That
     13 we
     14 But
     15 what
     16 good
     17 O
     18 more
     19 they
     20 What
     21 lord
     22 now
     23 love
     24 them
     25 A
     26 KING
    展开全文
  • 朴素贝叶斯分类并行化算法

    千次阅读 2016-06-20 11:20:31
    NaiveBayesMain.java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser;...
  • 背景:spark graphx并未提供极大团挖掘算法当下的极大团算法都是串行化的算法...在从各个连通图中,利用串行化的极大团算法,找出极大团 (伪并行化)对于关联性较强的图,找出来的连通图非常大,这时串行化的极大团算法,仍...
  • 利用spark graphx 找出连通图,在从各个连通图中,利用串行化的极大团算法,找出极大团 (伪并行化) 对于关联性较强的图,找出来的连通图非常大,这时串行化的极大团算法,仍然会耗时很久,这里利用剪枝的思想减少样本数据...
  • 在 Spark2.3 平台上实现 Apriori 频繁项集挖掘的并行化算法。要求程序利用 Spark 进行并行计算。 二、算法设计 2.1 设计思路 变量定义 D为数据集,设Lk是k项频繁项集,Ck是k项候选集,每一行数据定义为一笔交易...
  • 包含两种平台上运行的kmeans算法:一种是在Hadoop系统上的并行化kmeans算法,支持读文件,执行聚类算法,输出质心文件,将每个数据的聚类信息输出到控制台上;另一种是串行的聚类算法,支持读文件数据,执行kmeans...
  • 最初学习归并算法,对不会使其具体跑在不同的核上报有深深地怨念,刚好算倒重温了这个算法,利用java的thread来体验一下并行归并算法
  • 介绍了十余种代表性的基于 CPU 和 GPU 并行化碰撞检测算法,并从算法的可扩展性和存储空间消耗以及任务量均衡化等方面分析了这些算法的优缺点。最后总结了并行化碰撞检测算法研究中存在的问题和新的发展方向以及常用...
  • 针对这一问题,提出了一种基于MapReduce模型的并行化k-medoids聚类算法,首先采用基于密度的聚类思想对k-medoids算法初始点的选取策略进行优化,并利用Hadoop平台下的MapReduce编程框架实现了算法并行化处理。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,988
精华内容 795
关键字:

并行化算法