2016-11-27 01:02:46 u014252563 阅读数 2931
  • Kaggle 神器:XGBoost 从基础到实战

    主讲老师冒老师为计算机博士,现在中科院从事科研教学工作,十余年机器学习教学经验,主持国家级科研项目3项,研究方向为机器学习、计算机视觉及多媒体处理。 XGBoost是"极端梯度提升"(eXtreme Gradient Boosting)的简称。XGBoost源于梯度提升框架,但是能并行计算、近似建树、对稀疏数据的有效处理以及内存使用优化,这使得XGBoost至少比现有梯度提升实现有至少10倍的速度提升。XGBoost可以处理回归、分类和排序等多种任务。由于它在预测性能上的强大且训练速度快,XGBoost已屡屡斩获Kaggle各大竞赛的冠军宝座。

    39272 人正在学习 去看看 AI100讲师

一、关于spark ml pipeline与机器学习


一个典型的机器学习构建包含若干个过程
1、源数据ETL
2、数据预处理
3、特征选取
4、模型训练与验证
以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果。因此,对以上多个步骤、进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效、易用。
受 scikit-learn 项目的启发,并且总结了MLlib在处理复杂机器学习问题的弊端(主要为工作繁杂,流程不清晰),旨在向用户提供基于DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用。一个pipeline 在结构上会包含一个或多个Stage,每一个 Stage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等,这样的Stage 在 ML 里按照处理问题类型的不同都有相应的定义和实现。两个主要的stage为Transformer和Estimator。Transformer主要是用来操作一个DataFrame 数据并生成另外一个DataFrame 数据,比如svm模型、一个特征提取工具,都可以抽象为一个Transformer。Estimator 则主要是用来做模型拟合用的,用来生成一个Transformer。可能这样说比较难以理解,下面就以一个完整的机器学习案例来说明spark ml pipeline是怎么构建机器学习工作流的。


二、使用spark ml pipeline构建机器学习工作流

在此以Kaggle数据竞赛Display Advertising Challenge的数据集(该数据集为利用用户特征进行广告点击预测)开始,利用spark ml pipeline构建一个完整的机器学习工作流程。
Display Advertising Challenge的这份数据本身就不多做介绍了,主要包括3部分,numerical型特征集、Categorical类型特征集、类标签。

首先,读入样本集,并将样本集划分为训练集与测试集:

 //使用file标记文件路径,允许spark读取本地文件
        String fileReadPath = "file:\\D:\\dac_sample\\dac_sample.txt";
        //使用textFile读入数据
        SparkContext sc = Contexts.sparkContext;
        RDD<String> file = sc.textFile(fileReadPath,1);
        JavaRDD<String> sparkContent = file.toJavaRDD();
        JavaRDD<Row> sampleRow = sparkContent.map(new Function<String, Row>() {
            public Row call(String string) {
                String tempStr = string.replace("\t",",");
                String[] features = tempStr.split(",");
                int intLable= Integer.parseInt(features[0]);
                String intFeature1  = features[1];
                String intFeature2  = features[2];                String CatFeature1 = features[14];
                String CatFeature2 = features[15];
                return RowFactory.create(intLable, intFeature1, intFeature2, CatFeature1, CatFeature2);
            }
        });


        double[] weights = {0.8, 0.2};
        Long seed = 42L;
        JavaRDD<Row>[] sampleRows = sampleRow.randomSplit(weights,seed);

得到样本集后,构建出 DataFrame格式的数据供spark ml pipeline使用:

 List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("lable", DataTypes.IntegerType, false));
        fields.add(DataTypes.createStructField("intFeature1", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("intFeature2", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("CatFeature1", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("CatFeature2", DataTypes.StringType, true));
        //and so on


        StructType schema = DataTypes.createStructType(fields);
        DataFrame dfTrain = Contexts.hiveContext.createDataFrame(sampleRows[0], schema);//训练数据
        dfTrain.registerTempTable("tmpTable1");
        DataFrame dfTest = Contexts.hiveContext.createDataFrame(sampleRows[1], schema);//测试数据
        dfTest.registerTempTable("tmpTable2");

由于在dfTrain、dfTest中所有的特征目前都为string类型,而机器学习则要求其特征为numerical类型,在此需要对特征做转换,包括类型转换和缺失值的处理。
首先,将intFeature由string转为double,cast()方法将表中指定列string类型转换为double类型,并生成新列并命名为intFeature1Temp,
之后,需要删除原来的数据列 并将新列重命名为intFeature1,这样,就将string类型的特征转换得到double类型的特征了。

  //Cast integer features from String to Double
       dfTest = dfTest.withColumn("intFeature1Temp",dfTest.col("intFeature1").cast("double"));
       dfTest = dfTest.drop("intFeature1").withColumnRenamed("intFeature1Temp","intFeature1");

如果intFeature特征是年龄或者特征等类型,则需要进行分箱操作,将一个特征按照指定范围进行划分:

 /*特征转换,部分特征需要进行分箱,比如年龄,进行分段成成年未成年等 */
        double[] splitV = {0.0,16.0,Double.MAX_VALUE};
        Bucketizer bucketizer = new Bucketizer().setInputCol("").setOutputCol("").setSplits(splitV);

再次,需要将categorical 类型的特征转换为numerical类型。主要包括两个步骤,缺失值处理和编码转换。
缺失值处理方面,可以使用全局的NA来统一标记缺失值:

 /*将categoricalb类型的变量的缺失值使用NA值填充*/
        String[] strCols = {"CatFeature1","CatFeature2"};
        dfTrain = dfTrain.na().fill("NA",strCols);
        dfTest = dfTest.na().fill("NA",strCols);

缺失值处理完成之后,就可以正式的对categorical类型的特征进行numerical转换了。在spark ml中,可以借助StringIndexer和oneHotEncoder完成
这一任务:

 // StringIndexer  oneHotEncoder 将 categorical变量转换为 numerical 变量
        // 如某列特征为星期几、天气等等特征,则转换为七个0-1特征
        StringIndexer cat1Index = new StringIndexer().setInputCol("CatFeature1").setOutputCol("indexedCat1").setHandleInvalid("skip");
        OneHotEncoder cat1Encoder = new OneHotEncoder().setInputCol(cat1Index.getOutputCol()).setOutputCol("CatVector1");
        StringIndexer cat2Index = new StringIndexer().setInputCol("CatFeature2").setOutputCol("indexedCat2");
        OneHotEncoder cat2Encoder = new OneHotEncoder().setInputCol(cat2Index.getOutputCol()).setOutputCol("CatVector2");

至此,特征预处理步骤基本完成了。由于上述特征都是处于单独的列并且列名独立,为方便后续模型进行特征输入,需要将其转换为特征向量,并统一命名,
可以使用VectorAssembler类完成这一任务:

 /*转换为特征向量*/
        String[] vectorAsCols = {"intFeature1","intFeature2","CatVector1","CatVector2"};
        VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature");

通常,预处理之后获得的特征有成千上万维,出于去除冗余特征、消除维数灾难、提高模型质量的考虑,需要进行选择。在此,使用卡方检验方法,
利用特征与类标签之间的相关性,进行特征选取:

 /*特征较多时,使用卡方检验进行特征选择,主要是考察特征与类标签的相关性*/
        ChiSqSelector chiSqSelector = new ChiSqSelector().setFeaturesCol("vectorFeature").setLabelCol("label").setNumTopFeatures(10)
                .setOutputCol("selectedFeature");

在特征预处理和特征选取完成之后,就可以定义模型及其参数了。简单期间,在此使用LogisticRegression模型,并设定最大迭代次数、正则化项:

 /* 设置最大迭代次数和正则化参数 setElasticNetParam=0.0 为L2正则化 setElasticNetParam=1.0为L1正则化*/
        /*设置特征向量的列名,标签的列名*/
        LogisticRegression logModel = new LogisticRegression().setMaxIter(100).setRegParam(0.1).setElasticNetParam(0.0)
                .setFeaturesCol("selectedFeature").setLabelCol("lable");

在上述准备步骤完成之后,就可以开始定义pipeline并进行模型的学习了:

  /*将特征转换,特征聚合,模型等组成一个管道,并调用它的fit方法拟合出模型*/
        PipelineStage[] pipelineStage = {cat1Index,cat2Index,cat1Encoder,cat2Encoder,vectorAssembler,logModel};
        Pipeline pipline = new Pipeline().setStages(pipelineStage);
        PipelineModel pModle = pipline.fit(dfTrain);

上面pipeline的fit方法得到的是一个Transformer,我们可以使它作用于训练集得到模型在训练集上的预测结果:

//拟合得到模型的transform方法进行预测
        DataFrame output = pModle.transform(dfTest).select("selectedFeature", "label", "prediction", "rawPrediction", "probability");
        DataFrame prediction = output.select("label", "prediction");
        prediction.show();

分析计算,得到模型在训练集上的准确率,看看模型的效果怎么样:

/*测试集合上的准确率*/
        long correct = prediction.filter(prediction.col("label").equalTo(prediction.col("'prediction"))).count();
        long total = prediction.count();
        double accuracy = correct / (double)total;

        System.out.println(accuracy);

最后,可以将模型保存下来,下次直接使用就可以了:

String pModlePath = ""file:\\D:\\dac_sample\\";
        pModle.save(pModlePath);

三,梳理和总结:


上述,借助代码实现了基于spark ml pipeline的机器学习,包括数据转换、特征生成、特征选取、模型定义及模型学习等多个stage,得到的pipeline
模型后,就可以在新的数据集上进行预测,总结为两部分并用流程图表示如下:
训练阶段:

预测阶段:

借助于Pepeline,在spark上进行机器学习的数据流向更加清晰,同时每一stage的任务也更加明了,因此,无论是在模型的预测使用上、还是
模型后续的改进优化上,都变得更加容易。

2016-09-19 14:16:34 Richard_More 阅读数 1559
  • Kaggle 神器:XGBoost 从基础到实战

    主讲老师冒老师为计算机博士,现在中科院从事科研教学工作,十余年机器学习教学经验,主持国家级科研项目3项,研究方向为机器学习、计算机视觉及多媒体处理。 XGBoost是"极端梯度提升"(eXtreme Gradient Boosting)的简称。XGBoost源于梯度提升框架,但是能并行计算、近似建树、对稀疏数据的有效处理以及内存使用优化,这使得XGBoost至少比现有梯度提升实现有至少10倍的速度提升。XGBoost可以处理回归、分类和排序等多种任务。由于它在预测性能上的强大且训练速度快,XGBoost已屡屡斩获Kaggle各大竞赛的冠军宝座。

    39272 人正在学习 去看看 AI100讲师


1. 下载数据,并写入hdfs中

miaofu@master:~$ hadoop fs -ls /user/miaofu/covtype
-rw-r--r--   2 miaofu supergroup   75169317 2016-09-17 23:20 /user/miaofu/covtype

2. 启动spark集群

miaofu@master:~/spark-1.6.2-bin-hadoop2.6$ jps
6649 ResourceManager
10821 Worker
2434 NameNode
2680 DataNode
2938 SecondaryNameNode
31714 SparkSubmit
10705 Master
32000 Jps
6786 NodeManager

3. 进入spark shell

miaofu@master:~/spark-1.6.2-bin-hadoop2.6$ bin/spark-shell --master spark://master:7077
16/09/19 13:19:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.7.0_95)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/09/19 13:19:30 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/09/19 13:19:30 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/09/19 13:19:37 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/09/19 13:19:37 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/09/19 13:19:40 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/09/19 13:19:40 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

4. 读入数据,并简单分析

scala> val rawData = sc.textFile("hdfs:////user/miaofu/covtype")
rawData: org.apache.spark.rdd.RDD[String] = hdfs:////user/miaofu/covtype MapPartitionsRDD[1] at textFile at <console>:27

scala> rawData.counts()
<console>:30: error: value counts is not a member of org.apache.spark.rdd.RDD[String]
              rawData.counts()
                      ^

scala> rawData.count()
res1: Long = 581012  
scala> var line = rawData.take(4)(1)
line: String = 2590,56,2,212,-6,390,220,235,151,6225,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5

scala> line
res3: String = 2590,56,2,212,-6,390,220,235,151,6225,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,5

scala> val values = line.split(",").map(_.toDouble)
values: Array[Double] = Array(2590.0, 56.0, 2.0, 212.0, -6.0, 390.0, 220.0, 235.0, 151.0, 6225.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0)

scala> values
res4: Array[Double] = Array(2590.0, 56.0, 2.0, 212.0, -6.0, 390.0, 220.0, 235.0, 151.0, 6225.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0)

scala> values.init
res5: Array[Double] = Array(2590.0, 56.0, 2.0, 212.0, -6.0, 390.0, 220.0, 235.0, 151.0, 6225.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)

scala> values.last
res6: Double = 5.0

5. 构建训练,测试,验证集

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression._

scala> val data1 = rawData.map{ line =>
     | line.split(",").map(_.toDouble)
     | }
data1: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[4] at map at <console>:37

scala> data1
res16: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[4] at map at <console>:37

scala> val data2 = data1.map{ v=>
     | LabeledPoint(v.last-1,Vectors.dense(v.init))
     | }
data2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[5] at map at <console>:39




scala> val Array(trainData,cvData,testData) = 
     | data2.randomSplit(Array(0.8,0.1,0.1))
trainData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[9] at randomSplit at <console>:63
cvData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[10] at randomSplit at <console>:63
testData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[11] at randomSplit at <console>:63

scala> trainData.cache()
res27: trainData.type = MapPartitionsRDD[9] at randomSplit at <console>:63

scala> cvData.cache()
res28: cvData.type = MapPartitionsRDD[10] at randomSplit at <console>:63

scala> testData.cache()
res29: testData.type = MapPartitionsRDD[11] at randomSplit at <console>:63


6. 写一个评估模型的函数

scala> import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.evaluation._

scala> import org.apache.spark.mllib.tree._
import org.apache.spark.mllib.tree._

scala> import org.apache.spark.mllib.tree.model._
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd._
scala> def getMetrics(model:DecisionTreeModel,data:RDD[LabeledPoint]):
     | MulticlassMetrics = {
     | val predictionsAndLabels = data.map( e =>
     | (model.predict(e.features),e.label)
     | )
     | new MulticlassMetrics(predictionsAndLabels)
     | }
getMetrics: (model: org.apache.spark.mllib.tree.model.DecisionTreeModel, data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])org.apache.spark.mllib.evaluation.MulticlassMetrics

7. 模型训练与测试

scala> val model = DecisionTree.trainClassifier(
     | trainData,7,Map[Int,Int](),"gini",4,100 )
model: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 4 with 31 nodes

scala> model
res30: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 4 with 31 nodes

scala> val metrics = getMetrics(model,cvData)
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@2c181731

scala> metrics.confusionMatrix
                                                            def confusionMatrix: org.apache.spark.mllib.linalg.Matrix   

scala> metrics.confusionMatrix
                                                            def confusionMatrix: org.apache.spark.mllib.linalg.Matrix   

scala> metrics.confusionMatrix
res31: org.apache.spark.mllib.linalg.Matrix =                                   
14260.0  6593.0   7.0     0.0    0.0   0.0  340.0  
5485.0   22277.0  483.0   20.0   3.0   0.0  38.0   
0.0      443.0    3042.0  82.0   0.0   0.0  0.0    
0.0      0.0      169.0   104.0  0.0   0.0  0.0    
0.0      864.0    27.0    0.0    14.0  0.0  0.0    
0.0      440.0    1168.0  100.0  0.0   0.0  0.0    
1101.0   26.0     0.0     0.0    0.0   0.0  927.0  

scala> metrics.precision
res32: Double = 0.7002568389843655

scala> metrics.
asInstanceOf                confusionMatrix             fMeasure                    falsePositiveRate           isInstanceOf                labels                      
precision                   recall                      toString                    truePositiveRate            weightedFMeasure            weightedFalsePositiveRate   
weightedPrecision           weightedRecall              weightedTruePositiveRate    

scala> metrics.
asInstanceOf                confusionMatrix             fMeasure                    falsePositiveRate           isInstanceOf                labels                      
precision                   recall                      toString                    truePositiveRate            weightedFMeasure            weightedFalsePositiveRate   
weightedPrecision           weightedRecall              weightedTruePositiveRate    

scala> metrics.
asInstanceOf                confusionMatrix             fMeasure                    falsePositiveRate           isInstanceOf                labels                      
precision                   recall                      toString                    truePositiveRate            weightedFMeasure            weightedFalsePositiveRate   
weightedPrecision           weightedRecall              weightedTruePositiveRate    

scala> metrics.
asInstanceOf                confusionMatrix             fMeasure                    falsePositiveRate           isInstanceOf                labels                      
precision                   recall                      toString                    truePositiveRate            weightedFMeasure            weightedFalsePositiveRate   
weightedPrecision           weightedRecall              weightedTruePositiveRate    

scala> metrics.recall
res33: Double = 0.7002568389843655



8. web UI



2016-05-11 22:40:40 LXYTSOS 阅读数 9932
  • Kaggle 神器:XGBoost 从基础到实战

    主讲老师冒老师为计算机博士,现在中科院从事科研教学工作,十余年机器学习教学经验,主持国家级科研项目3项,研究方向为机器学习、计算机视觉及多媒体处理。 XGBoost是"极端梯度提升"(eXtreme Gradient Boosting)的简称。XGBoost源于梯度提升框架,但是能并行计算、近似建树、对稀疏数据的有效处理以及内存使用优化,这使得XGBoost至少比现有梯度提升实现有至少10倍的速度提升。XGBoost可以处理回归、分类和排序等多种任务。由于它在预测性能上的强大且训练速度快,XGBoost已屡屡斩获Kaggle各大竞赛的冠军宝座。

    39272 人正在学习 去看看 AI100讲师

昨天我在Kaggle上下载了一份用于手写数字识别的数据集,想通过最近学习到的一些方法来训练一个模型进行手写数字识别。这些数据集是从28×28像素大小的手写数字灰度图像中得来,其中训练数据第一个元素是具体的手写数字,剩下的784个元素是手写数字灰度图像每个像素的灰度值,范围为[0,255],测试数据则没有训练数据中的第一个元素,只包含784个灰度值。现在我打算使用Spark MLlib中提供的朴素贝叶斯算法来训练模型。

首先来设定Spark上下文的一些参数:

val conf = new SparkConf()
    .setAppName("DigitRecgonizer")
    .setMaster("local[*]")
    .set("spark.driver.memory", "10G")
val sc = new SparkContext(conf)

这样Spark上下文已经创建完毕了,那么现在来读取训练数据吧,在这里我把原本的训练数据的header去掉了,只保留了数据部,训练数据是以csv格式保存的:

val rawData = sc.textFile("file://path/train-noheader.csv")

由于数据是csv格式,所以接下来用“,”将每行数据转换成数组:

val records = rawData.map(line => line.split(","))

下面需要将这些数据处理成朴素贝叶斯能够接受的数据类型LabeledPoint ,此类型接收两个参数,第一个参数是label(标签,在这里就是具体的手写数字),第二个参数是features (特征向量,在这里是784个灰度值):

    val records = rawData.map(line => line.split(","))
    val data = records.map{ r =>
      val label = r(0).toInt
      val features = r.slice(1, r.size).map(p => p.toDouble)
      LabeledPoint(label, Vectors.dense(features))
    }

现在已经把数据都准备好了,可以开始训练模型了,在MLlib中,只需要简单地调用train 方法就能完成模型的训练:

val nbModel = NaiveBayes.train(data)

现在已经训练出了一个模型,我们看看它在训练数据集上的准确率如何,在这里我将训练数据集的特征传给模型进行训练,将得到的结果与真实的结果进行对比,然后统计出正确的条数,以此来评估模型的准确率,这应该也算是一种交叉验证吧:

    val nbTotalCorrect = data.map { point =>
      if (nbModel.predict(point.features) == point.label) 1 else 0
    }.sum
    val numData = data.count()
    val nbAccuracy = nbTotalCorrect / numData

运行完这段代码,我得到的准确率是0.8261190476190476

下面开始对测试数据进行识别了,首先读入测试数据:

val unlabeledData = sc.textFile("file://path/test-noheader.csv")

再用与之前同样的方式进行预处理:

val unlabeledRecords = unlabeledData.map(line => line.split(","))
val features = unlabeledRecords.map{ r =>
  val f = r.map(p => p.toDouble)
  Vectors.dense(f)
}

注意,测试数据中没有标签,所以将它所有数值都作为特征features

现在开始对测试数据进行识别,并把结果保存为文件:

    val predictions = nbModel.predict(features).map(p => p.toInt)
    predictions.repartition(1).saveAsTextFile("file://path/digitRec.txt")

到这里所有工作都完成了,之后我把计算出来的结果上传到Kaggle上,发现准确率在0.83左右,与我之前在训练数据集上得到的评估结果相近。

今天就到这里吧,以后可能还会寻找其他的方式来训练模型,看看效果如何。

2015-12-23 23:01:00 smarthhl 阅读数 41
  • Kaggle 神器:XGBoost 从基础到实战

    主讲老师冒老师为计算机博士,现在中科院从事科研教学工作,十余年机器学习教学经验,主持国家级科研项目3项,研究方向为机器学习、计算机视觉及多媒体处理。 XGBoost是"极端梯度提升"(eXtreme Gradient Boosting)的简称。XGBoost源于梯度提升框架,但是能并行计算、近似建树、对稀疏数据的有效处理以及内存使用优化,这使得XGBoost至少比现有梯度提升实现有至少10倍的速度提升。XGBoost可以处理回归、分类和排序等多种任务。由于它在预测性能上的强大且训练速度快,XGBoost已屡屡斩获Kaggle各大竞赛的冠军宝座。

    39272 人正在学习 去看看 AI100讲师

作者:张天雷

摘自:InfoQ

如何利用高性能分布式计算平台来解决现实问题一直是人们所关心的话题。近期,comSysto公司的Danial Bartl就分享了该公司研发团队利用Spark平台解决Kaggle竞赛问题的经历,为Spark等平台应用于数据科学领域提供了借鉴。

Danial提到,comSysto公司经常会举行一些讨论会,来评估未来的技术和共享以往的经验。在近期,大数据分析类的众包平台Kaggle的一道数据科学的挑战赛引起了他们的注意。

该挑战赛的内容十分有趣:AXA提供了一个包含5万个匿名驾驶员线路的数据集。本次竞赛的目的是根据路线研发出一个驾驶类型的算法类签名,来表征驾驶员的特征。例如,驾驶员是否长距离驾驶?短距离驾驶?高速驾驶?回头路?是否从某些站点急剧加速?是否高速转弯?所有这些问题的答案形成了表征驾驶员特征的独特标签。

面对此挑战,comSysto公司的团队想到了涵盖批处理、流数据、机器学习、图处理、SQL查询以及交互式定制分析等多种处理模型的Spark平台。他们正好以此挑战赛为契机来增强Spark方面的经验。为了对数据集进行分析并控制投入成本,他们搭建了一个包含只三个节点的集群——每个节点包含一个八核的i7处理器和16GB的内存。集群运行了携带Spark库的MR,可以有效存储运算的中间结果。接下来,本文就从数据分析、机器学习和结果等三个方面介绍comSysto团队解决以上问题的过程。

数据分析

作为解决问题的第一个步骤,数据分析起着非常关键的作用。然而,出乎comSysto公司团队意料的是,竞赛提供的原始数据非常简单。该数据集只包含了线路的若干匿名坐标对(x,y),如(1.3,4.4)、(2.1,4.8)和(2.9,5.2)等。如下图所示,驾驶员会在每条线路中出发并返回到原点(0,0),然后从原点挑选随机方向再出发,形成多个折返的路线。

拿到数据后,comSysto公司的团队有些气馁:只看坐标很难表征一个驾驶员吧?!

信息指纹的定义

因此,在原始数据如此简单的情况,该团队面临的一个问题就是如何将坐标信息转换为有用的机器学习数据。经过认证思考,其采用了建立信息指纹库的方法,来搜集每一个驾驶员有意义和特殊的特征。为了获得信息指纹,团队首先定义了一系列特征:

  • 距离:所有相邻两个坐标欧氏距离的总和。

  • 绝对距离:起点和终点的欧氏距离。

  • 线路中停顿的总时间:驾驶员停顿的总时间。

  • 线路总时间:某个特定线路的表项个数(如果假设线路的坐标值为每秒钟记录的数值,路线中表项的个数就是线路的总秒数)。

  • 速度:某个点的速度定义为该点和前一个点之间的欧氏距离。假设坐标单位为米、坐标之间的记录时间间隔为1秒,该定义所给出的速度单位就为m/s。然而,本次分析中,速度主要用于对比不同点或者不同驾驶员。只要速度的单位相同即可,并不追求其绝对值。对于加速、减速和向心加速度,该说明同样成立。

  • 加速度:加速时,该点和前一点速度的差值

  • 减速度:减速时,该点和前一点速度的差值

  • 向心加速度


其中,v为速度、r为曲线路径所形成圆的半径。半径计算需要用到当前点、之前和之后的若干个点的坐标信息。而,向心加速度是对驾驶员高速驾驶风格的体现:该值越大表明转弯的速度越快。

一个驾驶员所有线路的上述特征组成了其简历(信息指纹)。根据经验,城市道路和高速道路上的平均速度是不同的。因此,一个驾驶员在所有线路上的平均速度并没有很多意义。ecoSysto选择了城市道路、长距离高速道路和乡村道路等不同路线类型的平均速度和最大速度作为了研究对象。

数据统计:根据统计,本次竞赛的数据集中共包含了2700个驾驶员,共54000个线路的信息。所有的线路共包含3.6亿个X/Y坐标——以每秒记录一个坐标来算,共包含10万个小时的线路数据。

机器学习

在初步的数据准备和特征提取后,ecoSysto团队开始选择和测试用于预测驾驶员行为的机器学习模型。

聚类

机器学习的第一步就是把路线进行分类——ecoSysto团队选择k-means算法来对路线类型进行自动分类。这些类别根据所有驾驶员的所有路线推导得到,并不针对单个驾驶员。在拿到聚类结果后,ecoSysto团队的第一感觉就是,提取出的特征和计算得到的分类与路线长度相关。这表明,他们能够作为路线类型的一个指针。最终,根据交叉验证结果,他们选择了8种类型——每条路线指定了一种类型的ID,用于进一步分析。

预测

对于驾驶员行为预测,ecoSysto团队选择一个随机森林(random forest)算法来训练预测模型。该模型用于计算某个特定驾驶员完成给定路线的概率。首先,团队采用下述方法建立了一个训练集:选择一个驾驶员的约200条路线(标为“1”——匹配),再加随机选择的其他驾驶员的约200条路线(标为“0”——不匹配)。然后,这些数据集放入到随机森林训练算法中,产生每个驾驶员的随机森林模型。之后,该模型进行交叉验证,并最终产生Kaggle竞赛的提交数据。根据交叉验证的结果,ecoSysto团队选择了10棵树和最大深度12作为随机森林模型的参数。有关更多Spark机器学习库(MLib)中用于预测的集成学习算法的对比可参考Databrick的博客

流水线

ecoSysto团队的工作流划分为了若干用Java应用实现的独立步骤。这些步骤可以通过“spark-submit”命令字节提交给Spark执行。流水线以Hadoop SequenceFile作为输入,以CSV文件作为输出。流水线主要包含下列步骤:

  • 转换原始输入文件:将原有的55万个小的CSV文件转换为一个单独的Hadoop Sequen ceFile。

  • 提取特征并计算统计数字:利用以上描述的定义计算特征值,并利用Spark RDD变换API计算平均值和方差等统计数字,写入到一个CSV文件中。

  • 计算聚类结果:利用以上特征和统计值以及Spark MLlib的API来对路线进行分类。

  • 随机森林训练:选取maxDepth和crossValidation等配置参数,结合每条线路的特征,开始随机森林模型的训练。对于实际Kaggle提交的数据,ecoSysto团队只是加载了串行化的模型,并预测每条线路属于驾驶员的概率,并将其以CSV格式保存在文件中。

结果

最终,ecoSysto团队的预测模型以74%的精度位列Kaggle排行榜的670位。该团队表示,对于只花2天之间就完成的模型而言,其精度尚在可接受范围内。如果再花费一定的时间,模型精度肯定可以有所改进。但是,该过程证明了高性能分布式计算平台可用于解决实际的机器学习问题。


2015-11-18 18:00:00 weixin_34417635 阅读数 13
  • Kaggle 神器:XGBoost 从基础到实战

    主讲老师冒老师为计算机博士,现在中科院从事科研教学工作,十余年机器学习教学经验,主持国家级科研项目3项,研究方向为机器学习、计算机视觉及多媒体处理。 XGBoost是"极端梯度提升"(eXtreme Gradient Boosting)的简称。XGBoost源于梯度提升框架,但是能并行计算、近似建树、对稀疏数据的有效处理以及内存使用优化,这使得XGBoost至少比现有梯度提升实现有至少10倍的速度提升。XGBoost可以处理回归、分类和排序等多种任务。由于它在预测性能上的强大且训练速度快,XGBoost已屡屡斩获Kaggle各大竞赛的冠军宝座。

    39272 人正在学习 去看看 AI100讲师

如何利用高性能分布式计算平台来解决现实问题一直是人们所关心的话题。近期,comSysto公司的Danial Bartl就分享了该公司研发团队利用Spark平台解决Kaggle竞赛问题的经历,为Spark等平台应用于数据科学领域提供了借鉴。

\\

Danial提到,comSysto公司经常会举行一些讨论会,来评估未来的技术和共享以往的经验。在近期,大数据分析类的众包平台Kaggle的一道数据科学的挑战赛引起了他们的注意。该挑战赛的内容十分有趣:AXA提供了一个包含5万个匿名驾驶员线路的数据集。本次竞赛的目的是根据路线研发出一个驾驶类型的算法类签名,来表征驾驶员的特征。例如,驾驶员是否长距离驾驶?短距离驾驶?高速驾驶?回头路?是否从某些站点急剧加速?是否高速转弯?所有这些问题的答案形成了表征驾驶员特征的独特标签。

\\

面对此挑战,comSysto公司的团队想到了涵盖批处理、流数据、机器学习、图处理、SQL查询以及交互式定制分析等多种处理模型的Spark平台。他们正好以此挑战赛为契机来增强Spark方面的经验。为了对数据集进行分析并控制投入成本,他们搭建了一个包含只三个节点的集群——每个节点包含一个八核的i7处理器和16GB的内存。集群运行了携带Spark库的MapR Hadoop,可以有效存储运算的中间结果。接下来,本文就从数据分析、机器学习和结果等三个方面介绍comSysto团队解决以上问题的过程。

\\

数据分析

\\

作为解决问题的第一个步骤,数据分析起着非常关键的作用。然而,出乎comSysto公司团队意料的是,竞赛提供的原始数据非常简单。该数据集只包含了线路的若干匿名坐标对(x,y),如(1.3,4.4)、(2.1,4.8)和(2.9,5.2)等。如下图所示,驾驶员会在每条线路中出发并返回到原点(0,0),然后从原点挑选随机方向再出发,形成多个折返的路线。

\\

1.png

\\

拿到数据后,comSysto公司的团队有些气馁:只看坐标很难表征一个驾驶员吧?!

\\

信息指纹的定义

\\

因此,在原始数据如此简单的情况,该团队面临的一个问题就是如何将坐标信息转换为有用的机器学习数据。经过认证思考,其采用了建立信息指纹库的方法,来搜集每一个驾驶员有意义和特殊的特征。为了获得信息指纹,团队首先定义了一系列特征:

\\
  • 距离:所有相邻两个坐标欧氏距离的总和。 \\
  • 绝对距离:起点和终点的欧氏距离。 \\
  • 线路中停顿的总时间:驾驶员停顿的总时间。 \\
  • 线路总时间:某个特定线路的表项个数(如果假设线路的坐标值为每秒钟记录的数值,路线中表项的个数就是线路的总秒数)。 \\
  • 速度:某个点的速度定义为该点和前一个点之间的欧氏距离。假设坐标单位为米、坐标之间的记录时间间隔为1秒,该定义所给出的速度单位就为m/s。然而,本次分析中,速度主要用于对比不同点或者不同驾驶员。只要速度的单位相同即可,并不追求其绝对值。对于加速、减速和向心加速度,该说明同样成立。 \\
  • 加速度:加速时,该点和前一点速度的差值 \\
  • 减速度:减速时,该点和前一点速度的差值 \\
  • 向心加速度:\

2.png

\\

其中,v为速度、r为曲线路径所形成圆的半径。半径计算需要用到当前点、之前和之后的若干个点的坐标信息。而,向心加速度是对驾驶员高速驾驶风格的体现:该值越大表明转弯的速度越快。

\\

一个驾驶员所有线路的上述特征组成了其简历(信息指纹)。根据经验,城市道路和高速道路上的平均速度是不同的。因此,一个驾驶员在所有线路上的平均速度并没有很多意义。ecoSysto选择了城市道路、长距离高速道路和乡村道路等不同路线类型的平均速度和最大速度作为了研究对象。

\\

数据统计:根据统计,本次竞赛的数据集中共包含了2700个驾驶员,共54000个线路的信息。所有的线路共包含3.6亿个X/Y坐标——以每秒记录一个坐标来算,共包含10万个小时的线路数据。

\\

机器学习

\\

在初步的数据准备和特征提取后,ecoSysto团队开始选择和测试用于预测驾驶员行为的机器学习模型。

\\

聚类

\\

机器学习的第一步就是把路线进行分类——ecoSysto团队选择k-means算法来对路线类型进行自动分类。这些类别根据所有驾驶员的所有路线推导得到,并不针对单个驾驶员。在拿到聚类结果后,ecoSysto团队的第一感觉就是,提取出的特征和计算得到的分类与路线长度相关。这表明,他们能够作为路线类型的一个指针。最终,根据交叉验证结果,他们选择了8种类型——每条路线指定了一种类型的ID,用于进一步分析。

\\

预测

\\

对于驾驶员行为预测,ecoSysto团队选择一个随机森林(random forest)算法来训练预测模型。该模型用于计算某个特定驾驶员完成给定路线的概率。首先,团队采用下述方法建立了一个训练集:选择一个驾驶员的约200条路线(标为“1”——匹配),再加随机选择的其他驾驶员的约200条路线(标为“0”——不匹配)。然后,这些数据集放入到随机森林训练算法中,产生每个驾驶员的随机森林模型。之后,该模型进行交叉验证,并最终产生Kaggle竞赛的提交数据。根据交叉验证的结果,ecoSysto团队选择了10棵树和最大深度12作为随机森林模型的参数。有关更多Spark机器学习库(MLib)中用于预测的集成学习算法的对比可参考Databrick的博客

\\

流水线

\\

ecoSysto团队的工作流划分为了若干用Java应用实现的独立步骤。这些步骤可以通过“spark-submit”命令字节提交给Spark执行。流水线以Hadoop SequenceFile作为输入,以CSV文件作为输出。流水线主要包含下列步骤:

\\

3.png

\\
  • 转换原始输入文件:将原有的55万个小的CSV文件转换为一个单独的Hadoop SequenceFile。 \\
  • 提取特征并计算统计数字:利用以上描述的定义计算特征值,并利用Spark RDD变换API计算平均值和方差等统计数字,写入到一个CSV文件中。 \\
  • 计算聚类结果:利用以上特征和统计值以及Spark MLlib的API来对路线进行分类。 \\
  • 随机森林训练:选取maxDepth和crossValidation等配置参数,结合每条线路的特征,开始随机森林模型的训练。对于实际Kaggle提交的数据,ecoSysto团队只是加载了串行化的模型,并预测每条线路属于驾驶员的概率,并将其以CSV格式保存在文件中。\

结果

\\

最终,ecoSysto团队的预测模型以74%的精度位列Kaggle排行榜的670位。该团队表示,对于只花2天之间就完成的模型而言,其精度尚在可接受范围内。如果再花费一定的时间,模型精度肯定可以有所改进。但是,该过程证明了高性能分布式计算平台可用于解决实际的机器学习问题。

\\

感谢杜小芳对本文的审校。

\

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ@丁晓昀),微信(微信号:InfoQChina)关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入InfoQ读者交流群06e1fec4a87eca3142d54d09844c629f.png)。

没有更多推荐了,返回首页