2017-02-07 14:52:28 qq_30843221 阅读数 4931
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34884 人正在学习 去看看 张长志

聚类分析

什么是聚类分析?《数据挖掘导论》是给出了这样的定义:聚类分析仅根据在数据中发现的描述对象及其关系的信息,将数据对象分组。其目标是,组内的对象相互之间是相似的(相关的),而不同组中的对象是不同(不相关的)。组内的相似性(同质性)越大,组间差别越大,聚类就越好。

想像有这样的一个情景:用户每天都会通过搜索引擎去查询他/她所感兴趣的信息,而我们希望能够根据用户的搜索词去细分目标用户群体,从而分析不同用户群体对哪些信息比较感兴趣。这时,聚类分析就是我们常常采用的手段。

高斯混合分布聚类模型

除了常见的基于距离的聚类模型,如k-means聚类,聚类中也有基于概率模型,例如高斯混合分布聚类模型(GMM)。基于概率模型的好处在于,它并没有像k-means那样让每一个数据点只能归属于一个簇当中,而是通过概率来反映每个数据点可能分布到每一个簇的概率值,即属于软聚类。在某些场景中,软聚类能够解释数据点的多元性,好比如人的兴趣点不唯一,用户行为的多样性等等。
高斯混合分布模型主要是利用EM算法做参数估计,关于高斯混合分布聚类模型的详细讲述,我将其放到另一份博客当中:
http://blog.csdn.net/qq_30843221/article/details/54894640

聚类模型的详细过程

1.样本数据
我们模拟用户搜索行为,一组是搜索关于电影内容,而另一组是关于机器学习,具体数据如下:

好看 电影 惊悚 悬疑 不错 推荐
机器学习 自然语言处理 信息 检索
机器学习 数据挖掘 人工智能 检索
电影 动画 精彩 好看 不错 加油 推荐

2.数据的加载,基本的分词(我使用的java版的spark,分词工具为hanlp)

//加载数据
String filename = "/home/quincy1994/test.txt";
JavaRDD<String> sentences = sc.textFile(filename);
JavaRDD<String> segRDD = sentences.map(new Seg());
JavaRDD<Row> jrdd = segRDD.map(new StringtoRow());
segRDD.cache();

//数据转换为矩阵
StructType schema = new StructType(new StructField[]{
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
DataFrame sentenceData = sqlContext.createDataFrame(jrdd, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");  //tokenizer以简单的空白分割词语
DataFrame wordsData = tokenizer.transform(sentenceData); // 将句子分割词语

//分词类
    static class Seg implements Function<String, String>{

        public String call(String sentence) throws Exception{
            String segStr = "";
            List<Term> termList = segment.seg(sentence); //分词
            StringBuilder sb = new StringBuilder();
            for(Term term: termList){
                String word = term.word;
                sb.append(word+ " ");
            }
            segStr = sb.toString().trim();
            return segStr;
        }
    }

    //将String的sentence转变为mllib中row数据类型
    static class StringtoRow implements Function<String, Row>{

        public Row call(String sentence) throws Exception {
            return RowFactory.create(sentence);
        }
    }

3.特征选择(我主要采用的tfidf模型,刚开始使用word2vec不太理想,可能数据太稀疏)

//tfidf模型
int numFeatures = 20;  //选定抽取前k个特征
HashingTF hashingTF  = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numFeatures);
DataFrame featurizedData = hashingTF.transform(wordsData);
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
DataFrame result = idfModel.transform(featurizedData);

4.数据的归一化处理(之前忘了做这步,也导致数据聚类效果不理想)

//归一化处理
Normalizer normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0);
DataFrame l1NormData = normalizer.transform(result.select("features"));
JavaRDD<Vector> normRDD = l1NormData.rdd().toJavaRDD().map(new RowToVector()); //将row转变成为vector
normRDD.cache();

//将row转变为Vector,机器学习模型基本采用vector类型
    static class RowToVector implements Function<Row, Vector>{

        public Vector call(Row r) throws Exception {
            // TODO Auto-generated method stub
            Vector features = r.getAs(0);    //将row转变成为vector 
            return features;
        }
    }

5.使用高斯混合模型聚类(代码超简单)

static int k = 2; //设定有多少个高斯混合模型
GaussianMixtureModel gmm = new GaussianMixture().setK(k).run(normRDD.rdd());
normRDD.cache();

6.为每个节点标记它归属的簇

//为每个节点标记归属的簇
        RDD<Vector> points = normRDD.rdd();
        JavaRDD<double[]> predictRDD = new JavaRDD(gmm.predictSoft(points), null);
        JavaRDD<Integer> resultRDD = predictRDD.map(new Group());
        resultRDD.cache();
static class Group implements Function<double[], Integer>{

        //我设定归属概率大于0.5的簇,否则当其为噪声
        public Integer call(double[] probabilities) throws Exception {
            double max = 0.5;
            int index = -1;
            for(int i = 0; i < probabilities.length; i++){
                if(max <= probabilities[i]){
                    index = i;
                    break;
                }
            }
            return index;
        }
    }

7.从每个簇中提取主要标签词

//在每个簇中提取主标签
        Object[] output= resultRDD.collect().toArray();  //得到每个数据点属于的簇
        Object[]  seg = segRDD.collect().toArray();    //得到每个数据点原来的标签词
        //集合不同簇各自的标签词
        List<Tuple2<Integer, String>> list = new ArrayList<Tuple2<Integer, String>>();
        for(int i = 0; i<output.length; i++){
            int group = (Integer) output[i];
            String tags = (String) seg[i];
            Tuple2<Integer, String> one = new Tuple2<Integer, String>(group, tags);
            list.add(one);
        }
        JavaPairRDD<Integer, String>  rddValue = sc.parallelizePairs(list);
        JavaPairRDD<Integer, Iterable<String>> groupRDD = rddValue.groupByKey();  //按簇归类
        JavaRDD<Tuple2<Integer, String>> tagsRDD = groupRDD.map(new ReduceString()); //将不同的标签混合在一块
        JavaRDD<Tuple2<Integer,String>> topKRDD = tagsRDD.map(new TopTag()); //找出前k个具有代表性的标签

static class ReduceString implements Function<Tuple2<Integer, Iterable<String>>, Tuple2<Integer, String>>{
        //合并标签词
        public Tuple2<Integer, String> call(Tuple2<Integer, Iterable<String>> clusterString){
            int key = clusterString._1();
            StringBuffer sb = new StringBuffer();
            Iterable<String> iter = clusterString._2();
            for( String string: iter){
                sb.append(string + " ");
            }
            return new Tuple2(key, sb.toString().trim());
        }
    }

static class TopTag implements Function<Tuple2<Integer, String>, Tuple2<Integer, String>>{
        //将所有的标签收集,排序,找出频率最高的前k个标签词 
        int topK = 3; 

        public Tuple2<Integer, String> call(Tuple2<Integer, String> cluster){
            int key = cluster._1();
            String[] taglist = cluster._2().split(" ");
            Map<String, Integer> map = new HashMap<String, Integer>();
            for(String tag: taglist){
                if(!map.containsKey(tag)){
                    map.put(tag, 1);
                }
                else{
                    int count = map.get(tag);
                    map.put(tag, count + 1);
                }
            }

            List<Map.Entry<String, Integer>> infolds = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
            Collections.sort(infolds, new Comparator<Map.Entry<String, Integer>>(){
                public int compare(Map.Entry<String, Integer>o1, Map.Entry<String, Integer>o2){
                    return (o2.getValue() - o1.getValue());
                }
            });
            String str = "";
            int num = 0;
            for(Map.Entry<String, Integer> one: infolds){
                str += one.getKey() + " ";
                if(num == topK){
                    break;
                }
                num += 1;
            }
            return new Tuple2<Integer, String>(key, str.trim());
        }
    }

8.输出结果

//输出结果
        List<Tuple2<Integer, String>> reducelist = topKRDD.collect();
        for(Tuple2<Integer, String> tags: reducelist){
            System.out.println(tags._1() + ":" + tags._2());
        }

结果如下:

0:机器学习 检索 信息
1:电影 推荐 好看

我将具体的代码放置我的github中:
https://www.github.com/Quincy1994/SparkStudy/tree/master/cluster

2017-11-17 20:21:33 u011195431 阅读数 1354
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34884 人正在学习 去看看 张长志

Spark文本聚类

聚类是常见的无监督学习算法,用于找到相似的Item,在无标记数据的情况下经常使用。这里考虑,当我们拥有大量文本,需要找到相似的文本(粗分类)时,使用Spark进行实验。

Spark mlib简介

mlib是Spark提供的机器学习算法库,提供特征工程、分类、回归、聚类、协同过滤等算法调用接口。
(1)对于特征工程主要包括:特征提取、特征变换、特征选择等。
提供常见的特征提取方法包括:TF-IDF,Word2Vec,CountVecmtorizer;
提供常见的特征变换方法包括:分句、去停用词、n-gram语言模型、二值化、PCA、DCT变换、One-hot编码等;
提供常见的特征选择算法:VectorSlicer、 R model Formula、卡方检验等。
(2)对于分类和回归,提供逻辑回归、决策树、随机森林、GBDT、MLP、朴素贝叶斯等方法。
(3)对于聚类,提供K-Means,LDA主题模型、高斯混合模型等。
(4)另外还支持协同过滤,便于搭建推荐系统。

中文文本分词

对于python而言,常见的分词工具有jieba,为了更加精细准确的进行中文分词,此处采用PyLTP工具(支持自定义词典)。

TFIDF特征

用词频表示文本特征,将文本进行量化,变成矩阵形式。矩阵内的每个元素就是词语对应的词频信息。矩阵大小为M*N,其中M表示文本数量,N表示词典中词语数量。TF表示某词在该文本内出现的频率,IDF表示Inverse Document Frequency, 某个词在文本中出现的频率。
IDF
|D|表示文本数量,DF(t,D)表示出现t词的文本数量
TFIDF = TF *IDF

word2vec介绍

word2vec是google开发的向量化词语的工具,实现方法是CBOW和Skip-Gram算法。Spark使用如下:

from pyspark.mllib.feature import Word2Vec

word2vec = Word2Vec()
model = word2vec.fit(input_data)

文本表示

为了将文本量化,使用TFIDF词频特征,再加上word embedding共同表示文本。TFIDF= S1(M*N矩阵),word2vec = S2(N*K矩阵)。文档表示为S = S1*S2 (M*K维矩阵)

Kmeans, LDA聚类

Kmeans聚类

from numpy import array
from math import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")

LDA

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
2017-02-10 21:55:58 qq_30843221 阅读数 3076
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34884 人正在学习 去看看 张长志

朴素贝叶斯模型

朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法。对于给定的训练数据集,首先基于特征条件独立假设学习输入/输出的联合概率分布;然后基于此模型,对给定的输入x,利用贝叶斯定理求出后验概率最大的输出y。至于朴素贝叶斯模型的原理部分,这里就不讲啦,有疑惑的朋友,我推荐看李航的《统计学习方法》中的第四章。我在这里主要谈论的是基于Java版的spark贝叶斯模型。

应用场景

相对于LR,SVM这类二元分类模型,贝叶斯模型在多元分类模型中显得更有优势的。举一个场景,我们希望能通过用户搜索的关键词来判断用户的兴趣爱好。例如用户搜索的关键词是“萌宠 猫狗”,我们通过贝叶斯模型分析出用户原来对动物是感兴趣的。

实现的过程

1.确定分类类别与训练样本及其特征
假设确定有以下的类别以及部分的特征词:

健康养生:健康养生/预防疾病/健康养生专家/膳食营养/休闲养生/健康资讯/休闲与养生
军事历史:军事历史/武器/坦克/军委主席/人类精神文明/特务/突击队/八路军/四代机/
电影:电影/电影资料库/爱情片/鹰眼/刘亦菲/葛优/动作/影片推荐/惊悚/日韩电影/港台电影/
教育:教育/英语教师/华图教育/作文/公务员培训/211/挂科/雅思/地理/申论/高校广播/
旅游:旅游/旅行游记/游记/观光/爸妈游/旅行爱好者/旅行是找寻自我/国内游/
音乐:/Kugou/钢琴/音樂/网络流行/古筝/Urban/音乐人/翻唱/酷狗/虾米音乐/
摄影:摄影/时尚摄影/时装摄影/Photography/摄影师/专题摄影/摄影/相册/相机/索尼/尼康
萌宠:萌宠/萌宠物/可爱宠物/宠物用品/猫猫狗狗宠物控/猫控/食肉动物/猫咪/喵星人/宠物用品商城/

实际中的项目有25个类别,我将其置于不同的文件中,每个类别大概有500个特征词
这里写图片描述

2.获取标签特征词

List<String> vocabulary = new ArrayList<String>();
        File dir = new File("/home/quincy1994/文档/微脉圈/tags/类别库");
        File[] files = dir.listFiles(); //获取不同类别的标签文件
        StringBuilder sb = new StringBuilder();
        for(File file : files){
            BufferedReader br = new BufferedReader(new FileReader(file));
            String line = null;
            while((line = br.readLine()) != null){
                sb.append(line + "`");  //按“`"分割不同类别的标签
            }
        }
         String[] tags = sb.toString().trim().split("`");
         List<String> newTags = new ArrayList<String>();
         for(String tag: tags){
             if(tag.length() > 4){
                 newTags.add(tag);  //去除空行标签
             }
         }
         Object[] newtags =  newTags.toArray();
         List<Tuple2<Integer, String>> list = new ArrayList<Tuple2<Integer,String>>(); //记录每类中的标签
         for(int i=0; i<newtags.length;i++){
             Tuple2 <Integer, String> classWithTags = new Tuple2<Integer, String>(i, (String)newtags[i]); 
             System.out.println(classWithTags);
             list.add(classWithTags);
             String[] tokens = ((String)newtags[i]).split("/");
             for(String tag: tokens){
                 vocabulary.add(tag);
             }
         }

3.获取训练样本
在获得训练样本的过程中,由于特征维度是上万维。如果为每个样本都申请上万维的向量空间,会导致jvm内存不足。为了解决这样的问题。我将训练样本转变为libsvm文件,而mllib支持libsvm文件的操作。libsvm格式文件为:【label】(空格)【index1】:【value1】(空格)【index2】:【value2】…..
其中【label】是训练数据集的目标值,对于分类,它是标识某类的整数(支持多个类);对于回归,是任意的实数。【index】是以1开始的整数,可以是不连续的;【value】为实数,也就是我们常说的自变量。检验数据文件中的label只用于计算准确度或误差,如果它是未知的,只需用一个数填写这一栏,也可以空看不填。具体的样例如下:

+1 1:0.7 2:1 3:1 4:-0.32
-1 1:0.58 2:-1 3:0.33 4:-0.6

 //获取训练样本
     JavaPairRDD<Integer, String> trainRDD = sc.parallelizePairs(list); //将每类的标签词转化为RDD
     JavaPairRDD<Integer, String> trainSetRDD = trainRDD.mapValues(new ToTrainSet(vocabulary)); //将标签词转化为向量模型
     List<Tuple2<Integer, String>> trainSet = trainSetRDD.collect(); 
     writeTrainSet(trainSet);  //写成libsvm文件格式,以方便训练
     System.out.println("trainset is ok");

static class ToTrainSet implements Function<String, String>{
        List<String> vocabulary = null; //标签特征库
        public ToTrainSet(List<String> vocabulary){
            this.vocabulary = vocabulary;
        }
        public String call(String sentence) throws Exception {
            // TODO Auto-generated method stub
            int length = vocabulary.size();         //特征维度
            String[] tags = sentence.split("/");    
            List<Integer> tagsindex = new ArrayList<Integer>();
            for(int i =0; i<tags.length; i++){
                tagsindex.add(vocabulary.indexOf(tags[i]));
            }
            String vector = "";  //将特征向量转变为String类,节省空间
            for(int i = 0 ; i < length; i++){
                if(tagsindex.contains(i)){
                    vector += String.valueOf(1) + " ";
                }
                else{
                    vector += String.valueOf(0) + " ";
                }
            }
            return vector.trim();
        }
    }

    public static void  writeTrainSet( List<Tuple2<Integer, String>> list) throws Exception{
        File file = new File("./trainset");
        PrintWriter pr = new PrintWriter(new FileWriter(file));
        for(Tuple2<Integer, String> one : list){     //将每个训练样本以libsvm格式保存到trainset文件当中
            String label = String.valueOf(one._1);   //训练样本的类别属性
            String vector = one._2();  //训练样本的向量模型
            String[] indexes = vector.split(" ");
            pr.print(label + " ");
            String value = "";
            for(int i = 0; i<indexes.length;i++){
                value += (i+1) + ":" + indexes[i] + " ";   // i+1是因为libsvm文件的index是从1开始
            }
            pr.print(value.trim());
            pr.println();
        }
        pr.close();
    }

4.读取训练集并训练模型

 String path = "./trainset";
     JavaRDD<LabeledPoint> trainData = MLUtils.loadLibSVMFile(sc.sc(), path).toJavaRDD();
     model = NaiveBayes.train(trainData.rdd(), 1.0); 
//   model.save(sc.sc(), "./model");
     System.out.println("model is ok");

5.预测新的测试集

 String testStr = "萌宠 猫狗 ";
     double[] testArray = sentenceToArrays(vocabulary, testStr);
     writeTestSet(testArray);
     String testPath = "./testset";
     JavaRDD<LabeledPoint> testData = MLUtils.loadLibSVMFile(sc.sc(), testPath).toJavaRDD();

    public static void writeTestSet(double[] testArray) throws Exception {
        //和writeTrainSet一样
        File file = new File("./testset");
        PrintWriter pr = new PrintWriter(new FileWriter(file));
        pr.print("0" + " ");
        String value = "";
        for(int i=0; i<testArray.length; i++){
            value += (i+1) + ":" + testArray[i] + " ";
        }
        pr.print(value.trim());
        pr.close();
    }

6.多元分类预测

JavaRDD<double[]> resultData = testData.map(new GetProbabilities());
     List<double[]> result = resultData.collect(); //保存的是每个测试样本所属于不同类别的概率值
     for(double[] one: result){
         for(int i=0;i<one.length;i++){
             System.out.println("class "+ i + ":" + one[i]);
         }
     }

最终的结果如下:

class 0:0.032182006265154946
class 1:0.0336352243495811
class 2:0.03491449856708539
class 3:0.033205199987016924
class 4:0.034989082254391006
class 5:0.0331936923801072
class 6:0.03519542406951625
class 7:0.14276183106876328(萌宠类最高)
class 8:0.035138968378985495
class 9:0.0320506177571864
class 10:0.034970413943529836
class 11:0.033309038283581525
class 12:0.033930527800123976
class 13:0.03278336996884944
class 14:0.035473397978207644
class 15:0.034846339484132204
class 16:0.0355179245862518
class 17:0.03428401522003527
class 18:0.03556253508239065
class 19:0.03555615701038051
class 20:0.03377058314903299
class 21:0.035026463749860785
class 22:0.03428401522003527
class 23:0.03418761030403304
class 24:0.03456346204880003
class 25:0.0346676010929670

详细的代码,我放置到github当中:
https://www.github.com/Quincy1994/SparkStudy/tree/master/NB_model/MYNaiveBayes.java

2019-05-21 23:01:16 k_wzzc 阅读数 237
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34884 人正在学习 去看看 张长志

Spark实现Canopy聚类算法

为什么需要Canopy算法

Canopy算法一般是为其他聚类算法的一种聚类方法,常用的聚类如 K-means 等聚类算法都需要事先k值(即聚类个数),并且会随机选择k个初始聚类中心。这种实现指定的k和随机初始化的聚类中心不仅会降低聚类算法的效率,而且得到的结果也可能是局部最优的。Canopy算法就可以解决以上问题。

Canopy算法的步骤

Canopy算法的核心步骤如下:

  1. 给定一组待聚类的数据集D;
  2. 给定两个阈值 T1,T2 且T1 > T2;
  3. 随机选择D中的一个数据d作为中心,并将d从D中移除;
  4. 计算D中所有点到d的距离distance;
  5. 将所有distance < T1的点归到以d为中心的canopy1类中;
  6. 将所有distance < T2的点从D中移除;
  7. 重复步骤4-6,直到D为空,形成多个Canopy类。

从Canopy算法实现的步骤来看,Canopy算法的优点明显,同时也存在一定的缺点:就是要事先指定合适的T1 和 T2

Canopy聚类过程如图所示:
Canopy聚类过程

代码实现

本案例以鸢尾花数据集为例

 // 数据加载
    val irisData = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv(inputFile)

 ////////////////////////数据预处理//////////////////////////////
    val schema = irisData.schema
    val fts = schema.filterNot(_.name == """class""").map(_.name).toArray

    // 特征处理
    val amountVectorAssembler: VectorAssembler = new VectorAssembler()
      .setInputCols(fts)
      .setOutputCol("features")

    val ftsDF = amountVectorAssembler
      .transform(irisData)
      .select("features")


   // 计算欧式距离
    def Euclidean(p1: Seq[Double], p2: Seq[Double]) = {
      var distant = 0d
      for (i <- 0 until p1.length) {
        distant += pow(p1(i) - p2(i), 2)
      }
      sqrt(distant)
    }

   // Canopy聚类算法过程
   def ConapyRunner(df: DataFrame, t1: Double, t2: Double) = {

      var flag = true
      var cluster = 0 // ClusterId Canopy类的Id

      // 结果表的Schema信息
      val schemaOfResult: StructType = df.schema
        .add("conapyId", IntegerType) //增加一列表示类id的字段

      // 创建一个空DF 用于接收结果
      var resultDF =
        spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaOfResult)

      var dfcopy = df

      while (flag) {

        cluster += 1

        // 随机选择一个初始点p 为了方便计算 在这里选择第一个点作为初始点
        val d = dfcopy.head.getAs[DenseVector]("features")

        //定义距离计算的自定义函数(默认为欧式距离)
        val distanceUDF = udf((vec: DenseVector) => {
          val distance = Euclidean(vec.toArray, d.toArray)
          //输出结果模式匹配
          (distance < t1, distance < t2)
        })

        val distanceDF = dfcopy
          .withColumn("distance", distanceUDF($"features"))
          .withColumn("t2threshold", $"distance".getField("_2"))
          .withColumn("t1threshold", $"distance".getField("_1"))

        // 将距离小于t1的输出 同时剔除的还有初始点d
        resultDF = resultDF.union(
          distanceDF
            .where($"t1threshold")
            .select($"features", lit(cluster) as "cluster")
        )

        // 将距离小于T2的点从数据集移除
        dfcopy = distanceDF
          .where(!$"t2threshold")
          .select($"features")

        // 判断df是否已经为空 df为空时 结束循环
        flag = !dfcopy.isEmpty

      }

      // 返回聚类结果集
      resultDF

    }

  // 聚类方法调用
   val dd = ConapyRunner(ftsDF, 4, 1.5)

    dd.show()

结果查看(已省略部分展示结果)

+-----------------+--------+
|         features|conapyId|
+-----------------+--------+
|[5.1,3.5,1.4,0.2]|       1|
|[4.9,3.0,1.4,0.2]|       1|
|[5.6,2.5,3.9,1.1]|       2|
|[5.9,3.2,4.8,1.8]|       2|
+-----------------+--------+

从算法可以得知,使用Canopy聚类时,产生的各类之间可能是有交集的,数据集的Canopy划分完成后,类似于下图:
Canopy聚类结果

参考资料:

https://www.deeplearn.me/1765.html
https://www.cnblogs.com/jamesf/p/4751565.html

2018-09-14 14:51:27 a544258023 阅读数 731
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34884 人正在学习 去看看 张长志

目录

Spark机器学习库简介

K-means聚类算法原理

K-means 实现

运行示例

K值的选择


Spark机器学习库简介

MLlib是Spark的机器学习(ML)库。其目标是使实用的机器学习可扩展且简单。从较高的层面来说,它提供了以下工具:

  •     ML算法:常见的学习算法,如分类,回归,聚类和协同过滤
  •     特征化:特征提取,转换,降维和选择
  •     管道:用于构建,评估和调整ML管道的工具
  •     持久性:保存和加载算法,模型和管道
  •     实用程序:线性代数,统计,数据处理等

使用 Spark 机器学习库来做机器学习工作,可以说是非常的简单,通常只需要在对原始数据进行处理后,然后直接调用相应的 API 就可以实现。但是要想选择合适的算法,高效准确地对数据进行分析,您可能还需要深入了解下算法原理,以及相应 Spark MLlib API 实现的参数的意义。

K-means聚类算法原理

何谓聚类,聚类指的是将数据分类到不同的类或者簇这样的一个过程,所以同一个簇中的对象有很大的相似性,而不同簇间的对象有很大的相异性,聚类与分类的不同在于,聚类所要求划分的类是未知的。

聚类分析是一个无监督学习 (Unsupervised Learning) 过程, 一般是用来对数据对象按照其特征属性进行分组,经常被应用在客户分群,欺诈检测,图像分析等领域。K-means 应该是最有名并且最经常使用的聚类算法了,其原理比较容易理解,并且聚类效果良好,有着广泛的使用。

和诸多机器学习算法一样,K-means 算法也是一个迭代式的算法,其主要步骤如下:

  • 第一步,选择 K 个点作为初始聚类中心。
  • 第二步,计算其余所有点到聚类中心的距离,并把每个点划分到离它最近的聚类中心所在的聚类中去。在这里,衡量距离一般有多个函数可以选择,最常用的是欧几里得距离 (Euclidean Distance), 也叫欧式距离。
  • 第三步,重新计算每个聚类中所有点的平均值,并将其作为新的聚类中心点。
  • 第四步,重复 (二),(三) 步的过程,直至聚类中心不再发生改变,或者算法达到预定的迭代次数,又或聚类中心的改变小于预先设定的阀值。

在实际应用中,K-means 算法有两个不得不面对并且克服的问题。

  • 聚类个数 K 的选择。K 的选择是一个比较有学问和讲究的步骤,我们会在后文专门描述如何使用 Spark 提供的工具选择 K。
  • 初始聚类中心点的选择。选择不同的聚类中心可能导致聚类结果的差异。

Spark MLlib K-means 算法的实现在初始聚类点的选择上,借鉴了一个叫 K-means||的类 K-means++ 实现。K-means++ 算法在初始点选择上遵循一个基本原则: 初始聚类中心点相互之间的距离应该尽可能的远。基本步骤如下:

  • 第一步,从数据集 X 中随机选择一个点作为第一个初始点。
  • 第二步,计算数据集中所有点与最新选择的中心点的距离 D(x)。
  • 第三步,选择下一个中心点,使得最大。
  • 第四部,重复 (二),(三) 步过程,直到 K 个初始点选择完成。

K-means 实现

Spark MLlib 中 K-means 算法的实现类 (KMeans.scala) 具有以下参数,具体如下:
class KMeans private (
    private var k: Int,
    private var maxIterations: Int,
    private var runs: Int,
    private var initializationMode: String,
    private var initializationSteps: Int,
    private var epsilon: Double,
    private var seed: Long) extends Serializable with Logging

  • k 表示期望的聚类的个数。
  • maxInterations 表示方法单次运行最大的迭代次数。
  • runs 表示算法被运行的次数。K-means 算法不保证能返回全局最优的聚类结果,所以在目标数据集上多次跑 K-means 算法,有助于返回最佳聚类结果。
  • initializationMode 表示初始聚类中心点的选择方式, 目前支持随机选择或者 K-means||方式。默认是 K-means||。
  • initializationSteps表示 K-means||方法中的部数。
  • epsilon 表示 K-means 算法迭代收敛的阀值。
  • seed 表示集群初始化时的随机种子。

运行示例

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

object KmeansSpark {

  def main(args: Array[String]): Unit = {

    //在本地启动Spark
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KmeansSpark")
    val sc = new SparkContext(sparkConf)

    //加载本地文件数据形成RDD
    val data = sc.textFile("file:///root/test.txt")
    val parsedData: RDD[linalg.Vector] = data.map(s=>{
      val values: Array[Double] = s.split(" ").map(x => x.toDouble)
      Vectors.dense(values)
    })

    //聚类中心个数
    val numClusters = 8
    //算法迭代次数
    val numIterations = 20
    //算法运行次数
    val runs = 10
    //KMeans训练
    val kmeansModel = KMeans.train(parsedData, numClusters, numIterations, runs)

    //打印聚类中心ID
    kmeansModel.clusterCenters.foreach(x=>{
     println(x)
    })
    //打印数据归属哪个聚类中心ID
    parsedData.map(v => v.toString + " belong to cluster: " +kmeansModel.predict(v))
    ss.foreach(x=>
      println(x)
    )
    sc.stop()
  
  }
}

K值的选择

前面提到 K 的选择是 K-means 算法的关键,Spark MLlib 在 KMeansModel 类里提供了 computeCost 方法,该方法通过计算所有数据点到其最近的中心点的平方和来评估聚类的效果。一般来说,同样的迭代次数和算法跑的次数,这个值越小代表聚类的效果越好。但是在实际情况下,我们还要考虑到聚类结果的可解释性,不能一味的选择使 computeCost 结果值最小的那个 K。

val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)
ks.foreach(cluster => {
 val model:KMeansModel = KMeans.train(parsedData, cluster,30,1)
 val ssd = model.computeCost(parsedData)
 println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
})

 

 

在spark上做简单的文本分类

博文 来自: q1w2e3r4470

Spark:聚类算法

阅读数 3811

Spark ML - 聚类算法

阅读数 1292

没有更多推荐了,返回首页