2018-04-11 13:02:38 Sugar_girl 阅读数 1335
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5409 人正在学习 去看看 陆永剑

聚类 - spark.mllib

聚类是一种无监督的学习问题,我们的目标是根据一些相似的概念将实体的子集相互分组。聚类通常用于探索性分析和/或作为分层 监督学习管线(其中针对每个群集训练不同的分类器或回归模型)的组成部分。

该spark.mllib软件包支持以下模型:
K-means
Gaussian mixture
Power iteration clustering (PIC)
Latent Dirichlet allocation (LDA)
Bisecting k-means
Streaming k-means

K-means

K-means是最常用的聚类算法之一,可将数据点聚类到预定义数量的聚类中。该spark.mllib实现包含称为kmeans的k-means++方法的并行变体。
实现中有以下参数:spark.mllib
k是期望的簇的数量。
maxIterations是要运行的最大迭代次数。
initializationModel指定随机初始化或通过k-means 进行初始化。
runs是运行k-means算法的次数(k-means不能保证找到全局最优解,并且在给定数据集上运行多次时,算法返回最佳聚类结果)。
initializationSteps确定k-means算法中的步数。
epsilon决定了我们认为k-means已经收敛的距离阈值。
initialModel是用于初始化的一组可选集群中心。如果提供此参数,则只执行一次运行。

from __future__ import print_function

import sys

import numpy as np
from pyspark.sql import SparkSession


def parseVector(line):
    return np.array([float(x) for x in line.split(',')])


def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex


if __name__ == "__main__":

#    if len(sys.argv) != 4:
#        print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
#        exit(-1)

    print("""WARN: This is a naive implementation of KMeans Clustering and is given
       as an example! Please refer to examples/src/main/python/ml/kmeans_example.py for an
       example on how to use ML's KMeans implementation.""", file=sys.stderr)

    spark = SparkSession\
        .builder\
        .appName("PythonKMeans")\
        .getOrCreate()
    inputFile = "hdfs://node1:8020/mv_training/training_set.txt"

    lines = spark.read.text(inputFile).rdd
    #lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    #data = lines.map(parseVector).cache()
    data = lines.map(lambda row: row.value.split(","))
    #temp = data
    K = int(sys.argv[2])
    convergeDist = float(sys.argv[3])

    kPoints = data.takeSample(False, K, 1)
    tempDist = 1.0

    while tempDist > convergeDist:
        closest = data.map(
            lambda p: (closestPoint(p, kPoints), (p, 1)))
        pointStats = closest.reduceByKey(
            lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
        newPoints = pointStats.map(
            lambda st: (st[0], st[1][0] / st[1][1])).collect()

        tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

        for (iK, p) in newPoints:
            kPoints[iK] = p

    print("Final centers: " + str(kPoints))
    closest = data.map(lambda p: (p,closestPoint(np.array(p).astype(int) , np.array(kPoints).astype(int))))
    closest.repartition(1).saveAsTextFile("file:///root/kmeans_result")

最后得到的中心点是kPoints,closest为保存各行数据所属类别后的结果,并保存到文件中。
官网链接:Clustering - spark.mllib

2018-11-11 21:50:23 weixin_41045344 阅读数 212
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5409 人正在学习 去看看 陆永剑

spark 实现K-means算法

package kmeans;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;.
import java.util.Arrays;
import java.util.Iterator;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;


import scala.Tuple2;



public class kmeans{
  static double[][] center = new double[4][2];  //这里有4个中心点,为2维
  static int[] number = new int[4];           //记录属于当前中心点的数据的个数,方便做除法
  static double[][] new_center = new double[4][2];    //计算出来的新中心点
  public static void main(String[] args) {

    // 从文件中读出中心点,并且放入center数组中
    ArrayList<String> arrayList = new ArrayList<String>();
    try {
      File file = new File("/usr/local/hadoop-2.7.3/centers.txt");
      InputStreamReader input = new InputStreamReader(new FileInputStream(file));
      BufferedReader bf = new BufferedReader(input);
      // 按行读取字符串
      String str;
      while ((str = bf.readLine()) != null) {
        arrayList.add(str);
      }
      bf.close();
      input.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    // 对ArrayList中存储的字符串进行处理 
    for (int i = 0; i < 4; i++) {
      for (int j = 0; j < 2; j++) {
        String s = arrayList.get(i).split(",")[j];
        center[i][j] = Double.parseDouble(s);
      }
    }


    //System.out.println("center+++" + center[3][1]);
    SparkConf conf = new SparkConf().setAppName("kmeans").setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    JavaRDD<String> datas = jsc.textFile("spark/input4/k-means.dat");     //从hdfs上读取data
    
    while(true) {
      for (int i = 0; i< 4;i++)           //注意每次循环都需要将number[i]变为0
      {
        number[i]=0;
      }
      //将data分开,得到key: 属于某个中心点的序号(0/1/2/3),value: 与该中心点的距离
      JavaPairRDD<Integer, Tuple2<Double, Double>> data = datas.mapToPair(new PairFunction<String, Integer,Tuple2<Double, Double>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<Integer,Tuple2<Double, Double>> call(String str) throws Exception {
            final double[][] loc = center;  
            String[] datasplit = str.split(",");    
            double x = Double.parseDouble(datasplit[0]);
            double y = Double.parseDouble(datasplit[1]);
            double minDistance = 99999999;
                  int centerIndex = 0;  
            for(int i = 0;i < 4;i++){
              double itsDistance = (x-loc[i][0])*(x-loc[i][0])+(y-loc[i][1])*(y-loc[i][1]);
              if(itsDistance < minDistance){
                minDistance = itsDistance;
                centerIndex = i; 
              }
            }
            number[centerIndex]++;        //得到属于4个中心点的个数

            return new Tuple2<Integer,Tuple2<Double, Double>>(centerIndex, new Tuple2<Double,Double>(x,y));
             // the center's number & data
        }
      });

          //得到key: 属于某个中心点的序号, value:新中心点的坐标
          JavaPairRDD<Integer, Iterable<Tuple2<Double, Double>>> sum_center = data.groupByKey();
          //System.out.println(sum_center.collect());

           JavaPairRDD<Integer,Tuple2<Double, Double>> Ncenter = sum_center.mapToPair(new PairFunction<Tuple2<Integer, Iterable<Tuple2<Double, Double>>>,Integer,Tuple2<Double, Double>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<Integer, Tuple2<Double, Double>> call(Tuple2<Integer, Iterable<Tuple2<Double, Double>>> a)throws Exception {
          //System.out.println("i am here**********new center******");
          int sum_x = 0;
          int sum_y = 0;
          Iterable<Tuple2<Double, Double>> it = a._2;

          for(Tuple2<Double, Double> i : it) {
            sum_x += i._1;
            sum_y +=i._2;
          }

          double average_x = sum_x / number[a._1];
          double average_y = sum_y/number[a._1];
          //System.out.println("**********new center******"+a._1+" "+average_x+","+average_y);
          return new Tuple2<Integer,Tuple2<Double,Double>>(a._1,new Tuple2<Double,Double>(average_x,average_y));
        } 
      });     


        //将中心点输出  
        Ncenter.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Double,Double>>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer,Tuple2<Double,Double>> t) throws Exception {
          new_center[t._1][0] = t._2()._1;
          new_center[t._1][1] = t._2()._2;
          System.out.println("the new center: "+ t._1+"  "+t._2()._1+" , "+t._2()._2);
        }
        
      });

        //判断新的中心点和原来的中心点是否一样,一样的话退出循环得到结果,不一样的话继续循环(这里可以设置一个迭代次数)
          double distance = 0;
          for(int i=0;i<4;i++) {      
              distance += (center[i][0]-new_center[i][0])*(center[i][0]-new_center[i][0]) + (center[i][1]-new_center[i][1])*(center[i][1]-new_center[i][1]);    
          }
          
          if(distance == 0.0) {
            //finished
            for(int j = 0;j<4;j++) {
              System.out.println("the final center: "+"  "+center[j][0]+" , "+center[j][1]);
            }
            break;
          }
          else {
            for(int i = 0;i<4;i++) {
              center[i][0] = new_center[i][0];
              center[i][1] = new_center[i][1];
              new_center[i][0] = 0;
              new_center[i][1] = 0;
              System.out.println("the new center: "+"  "+center[i][0]+" , "+center[i][1]);
            }
          }
    }
    
  }
 
}

输入:

1. centers.txt :
	96,826
	606,776	
	474,866
	400,768
  1. data.dat:
    存放所有点的坐标存放所有点的坐标。

96,826
606,776
474,866
400,768
2,920
356,766
36,687
-26,824
422,744
145,124
-21,207
154,781
157,872
-66,97
112,126
538,181
2,758
361,152
103,850
5,68
218,249
206,318
604,877
32,327
531,833
559,-17
237,255
485,196
166,784
86,913
-23,220
184,313
175,161
193,999
520,878
505,162
398,108
330,830
528,700
546,877
388,709
102,891
439,863
525,171
470,947
461,757
538,84
43,672
-37,442
534,70
-59,221
196,235
469,317
176,908
62,190
591,211
78,224
623,57
312,167
164,205
451,783
461,-14
309,915
487,848
580,275
16,833
605,991
495,292
219,790
167,889
205,195
224,255
-11,233
99,209
468,932
370,96
509,900
142,83
577,-3
-63,259
584,916
136,789
227,845
100,256
505,688
454,149
486,160
426,292
18,876
534,722
-37,92
574,191
-8,794
51,845
498,107
105,907
55,47
540,668
494,329
448,201
113,191
-8,801
191,582
573,634
117,214
48,235
311,-55
463,684
38,852
74,638
698,68
591,745
553,843
32,201
199,824
491,789
547,308
135,859
128,146
391,791
15,733
449,259
450,280
142,89
128,960
500,204
469,738
500,226
100,902
527,807
535,648
151,394
87,863
373,671
413,223
499,255
443,918
435,779
77,276
121,751
495,197
485,299
428,754
509,-40
361,893
90,192
490,688
44,753
545,867
501,908
528,291
101,937
118,110
52,790
144,711
355,814
580,305
16,788
596,316
57,186
508,713
109,835
518,216
604,757
608,276
727,859
-52,232
497,266
659,748
555,965
113,293
583,611
536,732
568,663
228,807
452,204
83,159
97,296
560,174
461,722
574,787
376,877
14,687
221,715
69,979
580,137
709,637
133,689
72,849
415,214
177,870
513,173
160,179
118,769
352,706
-17,607
490,96
87,425
282,756
592,177
182,-3
-79,338
463,822
87,910
131,791
431,279
687,677
230,634
627,809
127,227
422,302
297,344
101,672
414,342
0,262
464,1027
650,909
591,797
-2,154
208,753
315,734
136,856
402,224
508,76
207,126
528,378
375,226
433,728
67,832
469,120
49,805
382,734
582,104
7,271
556,1008
145,96
623,767
466,272
139,910
446,255
436,-35
463,818
162,266
162,266
73,923
125,838
34,248
368,902
-32,672
473,22
73,189
388,316
133,883
99,228
319,320
474,876
576,252
43,733
89,197
-25,162
173,812
523,841
61,827
457,720
180,238
51,703
120,374
104,788
161,227
469,408
-29,721
105,773
556,217
492,849
411,9
654,387
166,791
470,181
49,124
476,864
148,263
593,193
152,259
130,879
-15,185
639,220
559,249
349,871
451,617
419,220
51,167
-13,193
598,189
452,252
115,1019
134,909
-62,794
109,298
19,749
92,193
469,236
254,156
221,720
528,98
708,683
653,377
47,57
482,806
349,814
547,744
94,765
495,132
101,385
22,180
-44,63
509,121
157,221
560,171
149,124
433,954
138,119
483,912
160,184
436,275
46,663
504,224
69,289
151,228
72,193
574,685
157,862
379,814
452,330
538,180
129,747
60,855
168,265
95,849
155,711
240,203
98,176
175,558
480,771
18,636
79,829
50,148
36,707
568,222
513,260
199,49
564,460
117,869
469,308
381,661
10,193
539,346
83,773
341,288
-19,887
756,178
160,887
20,116
15,777
307,748
56,870
125,240
468,1008
424,841
517,872
198,752
605,721
125,180
65,827
540,657
63,771
314,708
376,289
459,254
89,853
180,285
404,68
451,830
-94,206
545,804
501,856
102,230
574,834
480,855
668,394
548,600
-87,266
149,885
345,237
599,149
239,148
608,963
364,202
69,799
553,232
471,818
154,180



2018-10-22 18:25:15 qq_31032181 阅读数 1050
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5409 人正在学习 去看看 陆永剑

 1.取英文文章数据,训练成模型,就是特征向量,用word2Vec。

 2.然后用这个模型,去将新数据,或者老数据进行分类。

 3.效果非常好的话,会形成类似:体育、游戏、生活、艺术等类别。

 4.所用为最新的spark ml,不是mllib。

 5.中间可以自己加一些去除停用词,结果优化,格式化输出等。

    val conf = new SparkConf().setMaster("local[2]").setAppName("word2vec_test03")
    var sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val path = "D:\\part-00000"
    val rdd = sc.textFile(path)
//    import scala.collection.JavaConverters._
//    val stopWords = sc.textFile("stop_words_eng.txt").collect().toSeq.asJava
//    val filter = new StopRecognition().insertStopWords(stopWords)
//    filter.insertStopNatures("w", null, "null")

    val splitWordRdd = rdd.map(_.split(",")).filter(_.size>1).map(x => {
      (x(0), x(1))
    })
    //splitWordRdd.foreach(println)

    val df = sqlContext.createDataFrame(splitWordRdd).toDF("userID", "words")
    df.show()

    df.createOrReplaceTempView("input_data")
    //    df.show()
    //sqlContext.udf.register("strLen", (str: String) => str.toString.replaceAll("\\]_\\[",", "))
    val result_valuse = sqlContext.sql("select userID, concat_ws(' ',collect_set(cast(words as string))) as values from input_data group by userID")
    val df2 = result_valuse.toDF()
    df2.show()

    //var z:Array[String] = new Array[String](3)
    val splitWordRdd2 = df2.rdd.map(file => {
      (file(0).toString, file(1).toString.replaceAll(" ",",").split(","))
    })

    val df3 = sqlContext.createDataFrame(splitWordRdd2).toDF("userID", "words")
    df3.rdd.foreach(println)

    // Learn a mapping from words to Vectors.
    val word2Vec = new Word2Vec()
      .setInputCol("words")
      .setOutputCol("result")
      .setVectorSize(200)
      .setMinCount(2)
    val model = word2Vec.fit(df3)
//    val synonyms = model.findSynonyms("black", 10)
//
//    synonyms.show()
    val result = model.transform(df)
    val vocs = model.getVectors
    val vocs2 = vocs.withColumnRenamed( "vector" , "features" )
    //vocs.rdd.saveAsTextFile("D:\\soft\\IDEA\\data\\input\\aaa")
//
//    val kmeans = new KMeans()
//      .setK(20)
//      .setFeaturesCol("features")
//      .setPredictionCol("prediction")
//    val model2 = kmeans.fit(vocs2)
//    val predictions = model2.transform(vocs2)
//    predictions.show()


    ///predictions.rdd.saveAsTextFile("D:\\soft\\IDEA\\data\\input\\aaa")
//    predictions.createOrReplaceTempView("table_re")
//    val result1 = sqlContext.sql("select prediction,concat_ws(',',collect_set(cast(word as string))) as words from table_re group by prediction")
//    val result2 = result1.explode( "words" , "words_" ){words: String => words.split( "," )}
//    val result3 = result2.select("prediction","words_")
    //result1.rdd.foreach(println)
    //result1.rdd.repartition(1).saveAsTextFile("D:\\soft\\IDEA\\data\\input\\user_data_re")
    //result3.write.partitionBy("prediction").text("D:\\soft\\IDEA\\data\\input\\user_data_re")



//    val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)
//    ks.foreach(cluster => {
//      val kmeans = new KMeans()
//          .setK(cluster)
//          .setFeaturesCol("features")
//          .setPredictionCol("prediction")
//      val model2 = kmeans.fit(vocs2)
//      val ssd = model2.computeCost(vocs2)
//      println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd)
//    })

//    // 计算聚类的中心点
//    println("Cluster Centers: ")
//    model.clusterCenters.foreach(println)

 有兴趣可以加我的大数据、数据分析、爬虫群:
《453908562》

2017-03-08 20:47:42 u013939918 阅读数 1965
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5409 人正在学习 去看看 陆永剑
性能调优:


并行度调节


性能调优首先是增加资源,增加Application对应的executor的数量,增加executor里面的cpu core,然后
增加executor里面的内存大小!


这节课也是非常重要的,因为分配完你所能分配的最大资源了!然后对应你的资源调节你程序的并行度!


Spark并行度指的是什么?
Spark作业,Application,Jobs,action(collect)触发一个job,1个job;每个job拆成多个stage,
发生shuffle的时候,会拆分出一个stage,reduceByKey;


stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)


stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()


reduceByKey,stage0的task,在最后,执行到reduceByKey的时候,会为每个stage1的task,
都创建一份文件(也可能是合并在少量的文件里面);每个stage1的task,会去各个节点上的各个
task创建的属于自己的那一份文件里面,拉取数据;每个stage1的task,拉取到的数据,
一定是相同key对应的数据。对相同的key,对应的values,才能去执行我们自定义的function操作(_ + _)




并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段
(stage)的并行度。


如果不调节并行度,导致并行度过低,会怎么样?


假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,
每个executor有10G内存,每个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。


task没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个
cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,
可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,ok,
那么同时在运行的task,只有100个,每个executor只会并行运行2个task。每个executor剩下的一个
cpu core,就浪费掉了。


你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。
合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,
总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application的并行度,
至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;而且task增加到150个以后,
即可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,
如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,
而且每个task主要处理1G的数据就可以。


很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,
并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。


1、task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况,比如总共150个
cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)


2、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,比如150个cpu core,
基本要设置task数量为300~500;
实际情况,与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task,可能会慢一点,
要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,
因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,
就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,
另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,
提升性能。


3、如何设置一个Spark Application的并行度?
spark.default.parallelism 
SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")
2016-06-14 15:26:05 vfgbv 阅读数 3450
  • 从零开始学习机器学习视频教程

    零基础入门机器学习视频培训课程概况:机器学习数学基础、Python基础、机器学习算法(线性回归、逻辑回归、聚类算法、EM算法),机器学习项目实战(Kmeans篮球数据分析、贝叶斯算法训练)、推荐算法、项目实战。  任务作业:很多人都喜欢看NBA,也喜欢拿实力相近的球员进行比较,你能利用机器学习的方式进行分析吗?动手的机会来了!请 结合课程【项目实战】章节中的【Kmeans篮球数据分类】。从NBA网站中随机拿到30名篮球运动员的得分和助攻(尽量数据间隔较大)。用python对数据进行处理(换算成每分钟的得分和助攻)。然后用Kmeans对获取的球员进行分类。看看自己心仪的球员属于哪一类~  (温馨提示: 注意 作业需写在CSDN博客中,请把作业链接贴在评论区,老师会定期逐个批改~~)

    5409 人正在学习 去看看 陆永剑

Spark性能优化指南——高级篇

转载:http://tech.meituan.com/spark-tuning-pro.html

前言

基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。

数据倾斜调优

调优概述

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

数据倾斜发生时的现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

数据倾斜原理

如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

  • stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由educeByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

数据倾斜的解决方案

解决方案一:使用Hive ETL预处理数据

方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。

解决方案二:过滤少数导致倾斜的key

方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

解决方案三:提高shuffle操作的并行度

方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });

// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 获取当前RDD数据的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 从rdd1数据Map中,根据key获取到可以join到的数据。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。

解决方案六:采样倾斜key并分拆join操作

方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

  • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
  • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
  • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
  • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  • 而另外两个普通的RDD就照常join即可。
  • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });

// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }

        });

// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });

// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

解决方案七:使用随机前缀和扩容RDD进行join

方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join即可。

方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });

// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });

// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

shuffle调优

调优概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

HashShuffleManager运行原理

未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

SortShuffleManager运行原理

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

写在最后的话

本文分别讲解了开发过程中的优化原则、运行前的资源参数设置调优、运行中的数据倾斜的解决方案、为了精益求精的shuffle调优。希望大家能够在阅读本文之后,记住这些性能调优的原则以及方案,在Spark作业开发、测试以及运行的过程中多尝试,只有这样,我们才能开发出更优的Spark作业,不断提升其性能。

没有更多推荐了,返回首页