2017-07-18 19:58:15 silveryhand 阅读数 7310
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35107 人正在学习 去看看 张长志

org.apache.spark.ml.feature包中包含了4种不同的归一化方法:

  • Normalizer
  • StandardScaler
  • MinMaxScaler
  • MaxAbsScaler

有时感觉会容易混淆,借助官方文档和实际数据的变换,在这里做一次总结。

原文地址:http://www.neilron.xyz/spark-ml-feature-scaler/

0 数据准备

import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

dataFrame.show

// 原始数据
+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]|
|  2|[4.0,10.0,2.0]|
+---+--------------+

1 Normalizer

Normalizer的作用范围是每一行,使每一个行向量的范数变换为一个单位范数,下面的示例代码都来自spark官方文档加上少量改写和注释。

import org.apache.spark.ml.feature.Normalizer

// 正则化每个向量到1阶范数
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// 将每一行的规整为1阶范数为1的向量,1阶范数即所有值绝对值之和。
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

// 正则化每个向量到无穷阶范数
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()

// 向量的无穷阶范数即向量中所有值中的最大值
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+

2 StandardScaler

StandardScaler处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或是0均值,或是0均值单位标准差。
主要有两个参数可以设置:
- withStd: 默认为真。将数据标准化到单位标准差。
- withMean: 默认为假。是否变换为0均值。

StandardScaler需要fit数据,获取每一维的均值和标准差,来缩放每一维特征。

import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show

// 将每一列的标准差缩放到1。
+---+--------------+------------------------------------------------------------+
|id |features      |scaledFeatures                                              |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
|1  |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771]  |
|2  |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542]    |
+---+--------------+------------------------------------------------------------+

3 MinMaxScaler

MinMaxScaler作用同样是每一列,即每一维特征。将每一维特征线性地映射到指定的区间,通常是[0, 1]。
它也有两个参数可以设置:
- min: 默认为0。指定区间的下限。
- max: 默认为1。指定区间的上限。

import org.apache.spark.ml.feature.MinMaxScaler

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show

// 每维特征线性地映射,最小值映射到0,最大值映射到1。
+--------------+-----------------------------------------------------------+
|features      |scaledFeatures                                             |
+--------------+-----------------------------------------------------------+
|[1.0,0.5,-1.0]|[0.0,0.0,0.0]                                              |
|[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
|[4.0,10.0,2.0]|[1.0,1.0,1.0]                                              |
+--------------+-----------------------------------------------------------+

4 MaxAbsScaler

MaxAbsScaler将每一维的特征变换到[-1, 1]闭区间上,通过除以每一维特征上的最大的绝对值,它不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。

import org.apache.spark.ml.feature.MaxAbsScaler

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()

// 每一维的绝对值的最大值为[4, 10, 2]
+--------------+----------------+                                               
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
| [2.0,1.0,1.0]|   [0.5,0.1,0.5]|
|[4.0,10.0,2.0]|   [1.0,1.0,1.0]|
+--------------+----------------+

总结

所有4种归一化方法都是线性的变换,当某一维特征上具有非线性的分布时,还需要配合其它的特征预处理方法。

2019-03-04 17:17:54 Java_Man_China 阅读数 5604
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35107 人正在学习 去看看 张长志
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.feature.MaxAbsScaler
/**
  * @author XiaoTangBao
  * @date 2019/3/4 16:21
  * @version 1.0
  */
object Normalized {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val sparkSession = SparkSession.builder().master("local[4]").appName("NOrmalize").getOrCreate()
    val df =  sparkSession.createDataFrame(Seq((1, Vectors.dense(1.0, 12.5, -108.0)),
      (2, Vectors.dense(2.5, 36.0, 198.0)),(3, Vectors.dense(6.8, 24.0, 459.0))))
      .toDF("id","features")
    //Normalizer的作用范围是每一行,使每一个行向量的范数变换为一个单位范数
    val normalizer1 = new Normalizer()
      .setInputCol("features")
      .setOutputCol("normalfeatures")
      .setP(1.0)
    val L1 = normalizer1.transform(df)
    L1.show(false)

	+---+-----------------+------------------------------------------------------------+
	|id |features         |normalfeatures                                              |
	+---+-----------------+------------------------------------------------------------+
	|1  |[1.0,12.5,-108.0]|[0.00823045267489712,0.102880658436214,-0.8888888888888888] |
	|2  |[2.5,36.0,198.0] |[0.010570824524312896,0.1522198731501057,0.8372093023255814]|
	|3  |[6.8,24.0,459.0] |[0.013883217639853,0.04899959167006941,0.9371171906900776]  |
	+---+-----------------+------------------------------------------------------------+

    //StandardScaler处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或是0均值,或是0均值单位标准差。
    val scaler_1 = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
      .setWithStd(true)
      .setWithMean(false)
    val scalerMode_l = scaler_1.fit(df)
    val scalaerdData_1 = scalerMode_l.transform(df)
    scalaerdData_1.show(false)

	+---+-----------------+------------------------------------------------------------+
	|id |features         |scaledFeatures                                              |
	+---+-----------------+------------------------------------------------------------+
	|1  |[1.0,12.5,-108.0]|[0.3321666477362439,1.0637495315070804,-0.38055308480157485]|
	|2  |[2.5,36.0,198.0] |[0.8304166193406097,3.063598650740391,0.6976806554695538]   |
	|3  |[6.8,24.0,459.0] |[2.2587332046064583,2.0423991004935944,1.617350610406693]   |
	+---+-----------------+------------------------------------------------------------+

    //MinMaxScaler作用同样是每一列,即每一维特征。将每一维特征线性地映射到指定的区间,通常是[0, 1]
    val scaler_2 = new MinMaxScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
    val scalerModel_2 = scaler_2.fit(df)
    val scalaerdData_2 = scalerModel_2.transform(df)
    scalaerdData_2.show(false)

	+---+-----------------+--------------------------------------------+
	|id |features         |scaledFeatures                              |
	+---+-----------------+--------------------------------------------+
	|1  |[1.0,12.5,-108.0]|[0.0,0.0,0.0]                               |
	|2  |[2.5,36.0,198.0] |[0.25862068965517243,1.0,0.5396825396825397]|
	|3  |[6.8,24.0,459.0] |[1.0,0.48936170212765956,1.0]               |
	+---+-----------------+--------------------------------------------+

    //MaxAbsScaler将每一维的特征变换到[-1, 1]闭区间上,通过除以每一维特征上的最大的绝对值,它不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。
    val scaler_3 = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
    val scalerModel_3 = scaler_3.fit(df)
    val scalaerdData_3 = scalerModel_3.transform(df)
    scalaerdData_3.show(false)
	+---+-----------------+-------------------------------------------------------------+
	|id |features         |scaledFeatures                                               |
	+---+-----------------+-------------------------------------------------------------+
	|1  |[1.0,12.5,-108.0]|[0.14705882352941177,0.3472222222222222,-0.23529411764705882]|
	|2  |[2.5,36.0,198.0] |[0.36764705882352944,1.0,0.43137254901960786]                |
	|3  |[6.8,24.0,459.0] |[1.0,0.6666666666666666,1.0]                                 |
	+---+-----------------+-------------------------------------------------------------+

  }
}
2018-12-06 13:25:41 k_wzzc 阅读数 112
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35107 人正在学习 去看看 张长志

Spark – 数据的特征缩放(Feature scaling)

特征缩放:有的叫数据归一化,有的叫数据标准化,其实两者有着一些差别,但是大多数时候都是表达的一个意思,它的目的就是使数据缩小范围。具体的介绍请参照维基百科。

spark中就提供了常用的这几种特征缩放方法

  • Normalizer
  • StandardScaler
  • MinMaxScaler
  • MaxAbsScaler
 // 使用 StandardScaler 标准化:  计算公式  X'=(Xi-u)/δ
  val scaler = new StandardScaler()
      .setInputCol("features")  .setOutputCol("scaledfts")
      .setWithStd(true)  .setWithMean(true)

      
// 创建一个 dataframe
    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.5, -1.0)),
      (1, Vectors.dense(2.0, 1.0, 1.0)),
      (2, Vectors.dense(4.0, 10.0, 2.0))
    )).toDF("id", "features")

    dataFrame.printSchema()

  val scalerModel = scaler.fit(dataFrame)
    val scaledData = scalerModel.transform(dataFrame)
    scaledData.show(truncate = false)

spark提供的方法要求输入的数据类型是 Vector格式

root
 |-- id: integer (nullable = false)
 |-- features: vector (nullable = true)
结果查看
+---+--------------+--------------------------------------------------------------+
|id |features      |scaledfts                                                     |
+---+--------------+--------------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|[-0.8728715609439696,-0.6234796863885498,-1.0910894511799618] |
|1  |[2.0,1.0,1.0] |[-0.21821789023599245,-0.5299577334302673,0.21821789023599242]|
|2  |[4.0,10.0,2.0]|[1.0910894511799618,1.1534374198188169,0.8728715609439697]    |
+---+--------------+--------------------------------------------------------------+

很多时候我们拿到的数据的特征不是向量形式。因此在做标准化之前需要将各个特征合并转化成向量。可以有两种方式解决 (本案例以鸢尾花数据集为例)

方法一 spark提供的API

  val iris = spark.read.option("header", true)
      .option("inferSchema", true)
      .csv("F:/DataSource/iris.csv")

  val fts = Array("sepalLength", "sepalWidth", "petalLength", "petalWidth")
    
    // 将多个列合并成向量列的特性转换器
  val amountVectorAssembler: VectorAssembler = new VectorAssembler()
      .setInputCols(fts)
      .setOutputCol("features")
      

    val df1 = amountVectorAssembler.transform(iris)
      .select($"class", $"features")
      
    scaler.fit(df1).transform(df1).show(3 )

方法二 自定义函数

  // 自定义函数合并列并转化为向量
   val vectorUdf = udf((fts: Seq[Double]) => {
     Vectors.dense(fts.toArray)
   })

   val df2 = iris.withColumn("features",
     vectorUdf(array("sepalLength", "sepalWidth", "petalLength", "petalWidth")))
     .select($"class",$"features")

   scaler.fit(df2).transform(df2).show(3)
   
两个方法的结果是一致的
+-----------+-----------------+--------------------+
|      class|         features|           scaledfts|
+-----------+-----------------+--------------------+
|Iris-setosa|[5.1,3.5,1.4,0.2]|[-0.8976738791967...|
|Iris-setosa|[4.9,3.0,1.4,0.2]|[-1.1392004834649...|
|Iris-setosa|[4.7,3.2,1.3,0.2]|[-1.3807270877331...|
+-----------+-----------------+--------------------+

剩下的几种方式就不再一一介绍,用法基本一致,具体的使用方法,适用范围以及计算方法可以参照spark官方提供的文档以及代码;也可以查看相关资料了解更详细的信息。

2019-01-01 16:33:19 vv545454 阅读数 1447
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35107 人正在学习 去看看 张长志

Spark版本:2.4.0
语言:Scala
任务:分类

这里对数据的处理步骤如下:

  1. 载入数据
  2. 归一化
  3. PCA降维
  4. 划分训练/测试集
  5. 线性SVM分类
  6. 验证精度
  7. 输出cvs格式的结果

前言

从Spark 2.0开始,Spark机器学习API是基于DataFrame的spark.ml。而之前的基于RDD的API spark.mllib已进入维护模式。
也就是说,Spark ML是Spark MLlib的一种新的API,它主要有以下几个优点:

  • 面向DataFrame,在RDD基础上进一步封装,提供更强大更方便的API
  • Pipeline功能,便于实现复杂的机器学习模型
  • 性能提升

基于Pipeline的Spark ML中的几个概念:

  • DataFrame:从Spark SQL 的引用的概念,表示一个数据集,它可以容纳多种数据类型。例如可以存储文本,特征向量,标签和预测值等
  • Transformer:是可以将一个DataFrame变换成另一个DataFrame的算法。例如,一个训练好的模型是一个Transformer,通过transform方法,将原始DataFrame转化为一个包含预测值的DataFrame
  • Estimator:是一个算法,接受一个DataFrame,产生一个Transformer。例如,一个学习算法(如PCA,SVM)是一个Estimator,通过fit方法,训练DataFrame并产生模型Transformer
  • Pipeline: Pipeline将多个Transformers和Estimators连接起来组合成一个机器学习工作流程
  • Parameter:用于对Transformers和Estimators指定参数的统一接口

本次实验使用的是Spark ML的API

首先要创建SparkSession

// 创建SparkSession
val spark = SparkSession
  .builder
  .appName("LinearSVCExample")
  .master("local")
  .getOrCreate()

数据处理步骤

1 载入数据

数据载入的方式有多种,这里使用libsvm格式的数据作为数据源,libsvm格式常被用来存储稀疏的矩阵数据,它每一行的格式如下:

label index1:value1 index2:value2 ...

第一个值是标签,后面是由“列号:值”组成键值对,只需要记录非0项即可。

数据加载使用load方法完成:

// 加载训练数据,生成DataFrame
val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

2 归一化

作为数据预处理的第一步,需要对原始数据做归一化处理,即把原始数据的每一维减去其平均值,再除以其标准差,使得数据总体分布为以0为中心,且标准差为1。

// 归一化
val scaler = new StandardScaler()
   .setInputCol("features")
   .setOutputCol("scaledFeatures")
   .setWithMean(true)
   .setWithStd(true)
   .fit(data)

val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")

3 PCA降维

有时数据的维数可能很大,直接进行分类不仅计算量很大,而且对数据量的要求也很高,常常会出现过拟合。因此需要进行降维,常用的是主成分分析(PCA)算法。

// 创建PCA模型,生成Transformer
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(5)
  .fit(scaleddata)

//  transform数据,生成主成分特征
val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")

4 划分训练/测试集

经过降维的数据就可以拿来训练分类器了,但是在此之前要将数据划分为训练集和测试集,分类器只能在训练集上进行训练,在测试集上验证其分类精度。Spark提供了很方便的接口,按给定的比例随机划分训练/测试集。

// 将经过主成分分析的数据,按比例划分为训练数据和测试数据
val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)

5 线性SVM分类

这一步构建线性SVM模型,设置最大迭代次数和正则化项的系数,使用训练集进行训练。

// 创建SVC分类器(Estimator)
val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)

// 训练分类器,生成模型(Transformer)
val lsvcModel = lsvc.fit(trainingData)

6 验证精度

将训练好的分类器作用于测试集上,获得分类结果。

分类结果的好坏有很多种衡量的方法,如查准率、查全率等,这里我们使用最简单的一种衡量标准——精度,即正确分类的样本数占总样本数的比值。

// 用训练好的模型,验证测试数据
val res = lsvcModel.transform(testData).select("prediction","label")

// 计算精度
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(res)

println(s"Accuracy = ${accuracy}")

7 输出cvs格式的结果

Spark的DataFrame类型支持导出多种格式,这里以常用的csv格式为例。

这里输出的目的是为了使用Python进行可视化,在降维后进行,可以直观的看出降维后的数据是否明显可分。

使用VectorAssembler,将标签与特征合并为一列,再进行输出。

(这里是将合并后的列转换为String再输出的,因此输出的csv文件是带有引号和括号的,至于为什么要这样输出,请看第二部分)

// 将标签与主成分合成为一列
val assembler = new VectorAssembler()
  .setInputCols(Array("label","features"))
  .setOutputCol("assemble")
val output = assembler.transform(pcaResult)

// 输出csv格式的标签和主成分,便于可视化
val ass = output.select(output("assemble").cast("string"))
ass.write.mode("overwrite").csv("output.csv")

当然也可以用同样的方法输出训练/预测的结果,这里就不再详细介绍。

遇到的问题

完成这个简单的分类实验,花了我两天多的时间,从配置环境到熟悉API,再到遇见各种奇怪的问题……这里我都把他们记录下来,供以后参考。

1 配置环境

起初,我想通过在本机编写代码,然后访问安装在虚拟机中的Spark节点(单节点)这种方式进行实验的(不是提交jar包然后执行spark-submit),也就在是创建SparkSession时,指定虚拟机中的Spark:

val spark = SparkSession
  .builder
  .appName("LinearSVCExample")
  .master("spark://192.168.1.128:7077") // 虚拟机IP
  .getOrCreate()

然而,这样并没有成功。遇到的问题有:

  • 拒绝连接
  • Spark的worker里可以查看到提交的任务,但是一直处于等待状态,没有响应。并且提示:
    WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources(实际上,内存和CPU是够的)
  • 报错RuntimeException: java.io.EOFException......

在尝试过各种方案都没有解决问题之后,我放弃了,最后还是在本机中安装Spark,在local模式下运行。(如果有同学成功实现上面的访问方法,欢迎留言告诉我~

至于如何在本机(Windows)安装Spark,百度搜索即可

2 导出CSV格式的数据

将DataFrame导出为cvs格式的时候,遇到了这个问题:
java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

而我要导出的DataFrame只是一个多行数组而已啊:

image.png

根据StackOverflow上面的提问,Spark的csv导出不支持复杂结构,array都不行。

然后有人给了一种办法,把数组转化为String,就可以导出了。

但是导出的结果是这样的:

image.png

需要进一步处理。

所以还不如手动实现导出csv文件,或者你有更好的办法,欢迎留言告诉我,非常感谢~

3 PCA维数限制

当我想跑一个10万维度的数据时,程序运行到PCA报错:
java.lang.IllegalArgumentException: Argument with more than 65535 cols: 109600

原来,Spark ML的PCA不支持超过65535维的数据。参见源码

4 SVM核

翻阅了Spark ML文档,只找到Linear Support Vector Machine,即线性核的支持向量机。对于高斯核和其他非线性的核,Spark ML貌似还没有实现。

image.png

5 withColumn操作

起初我认为对数据进行降维前,需要把DataFrame中的标签label与特征feature分开,然后对feature进行降维,再使用withColumn方法,把label与降维后的feature组合成新的DataFrame。

发现这样既不可行也没有必要。

首先,withColumn只能添加当前DataFrame的数据(对DataFrame某一列进行一些操作,再添加到这个DataFrame本身),不能把来自于不同DataFrame的Column添加到当前DataFrame中。

其次,PCA降维时,只需指定InputCoulum作为特征列,指定OutputColumn作为输出列,其他列的存在并不影响PCA的执行,PCA也不会改变它们,在新生成的DataFrame中依然会保留原来所有Column,并且添加上降维后的数据Column,后面再使用select方法选择出所需的Column即可。

完整代码(Pipeline版)

import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.8.3")
    //  屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 创建sparkSession
    val spark = SparkSession
      .builder
      .appName("LinearSVCExample")
      .master("local")
      .getOrCreate()

    // 加载训练数据,生成DataFrame
    val data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

    println(data.count())

    // 归一化
    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
      .setWithMean(true)
      .setWithStd(true)
      .fit(data)

    val scaleddata = scaler.transform(data).select("label", "scaledFeatures").toDF("label","features")

    // 创建PCA模型,生成Transformer
    val pca = new PCA()
      .setInputCol("features")
      .setOutputCol("pcaFeatures")
      .setK(5)
      .fit(scaleddata)

    //  transform 数据,生成主成分特征
    val pcaResult = pca.transform(scaleddata).select("label","pcaFeatures").toDF("label","features")

    //  pcaResult.show(truncate=false)

    // 将标签与主成分合成为一列
    val assembler = new VectorAssembler()
      .setInputCols(Array("label","features"))
      .setOutputCol("assemble")
    val output = assembler.transform(pcaResult)

    // 输出csv格式的标签和主成分,便于可视化
    val ass = output.select(output("assemble").cast("string"))
    ass.write.mode("overwrite").csv("output.csv")

    // 将经过主成分分析的数据,按比例划分为训练数据和测试数据
    val Array(trainingData, testData) = pcaResult.randomSplit(Array(0.7, 0.3), seed = 20)

    // 创建SVC分类器(Estimator)
    val lsvc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)

    // 创建pipeline, 将上述步骤连接起来
    val pipeline = new Pipeline()
      .setStages(Array(scaler, pca, lsvc))
    
    // 使用串联好的模型在训练集上训练
    val model = pipeline.fit(trainingData)
    
    // 在测试集上测试
    val predictions = model.transform(testData).select("prediction","label")

    // 计算精度
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)

    println(s"Accuracy = ${accuracy}")

    spark.stop()
  }
}

最后的精度为1.0,这里使用的测试数据比较好分,从PCA后对前两维的可视化结果可以看出:

image.png

参考资料

Spark ML文档
DataFrame API
PCA列数限制-源码
导出cvs文件方法-stackoverflow
无法导出csv文件-stackoverflow
示例数据

2019-10-21 15:02:00 silentwolfyh 阅读数 239
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35107 人正在学习 去看看 张长志

目录:

一、描述

二、数据准备

三、 Normalizer

四、 StandardScaler

五、 MinMaxScaler

六、MaxAbsScaler

七、总结



一、描述

org.apache.spark.ml.feature包中包含了4种不同的归一化方法:

  • Normalizer
  • StandardScaler
  • MinMaxScaler
  • MaxAbsScaler

       有时感觉会容易混淆,借助官方文档和实际数据的变换,在这里做一次总结。

二、数据准备

import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

dataFrame.show

// 原始数据
+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]|
|  2|[4.0,10.0,2.0]|
+---+--------------+

三、 Normalizer

       Normalizer的作用范围是每一行,使每一个行向量的范数变换为一个单位范数,下面的示例代码都来自spark官方文档加上少量改写和注释。
在这里插入图片描述

import org.apache.spark.ml.feature.Normalizer

// 正则化每个向量到1阶范数
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// 将每一行的规整为1阶范数为1的向量,1阶范数即所有值绝对值之和。
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

// 正则化每个向量到无穷阶范数
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()

// 向量的无穷阶范数即向量中所有值中的最大值
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+

四、 StandardScaler

       StandardScaler处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或是0均值,或是0均值单位标准差。

主要有两个参数可以设置:

  • withStd: 默认为真。将数据标准化到单位标准差。
  • withMean: 默认为假。是否变换为0均值。

StandardScaler需要fit数据,获取每一维的均值和标准差,来缩放每一维特征。

标准差公式
在这里插入图片描述

import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show

// 将每一列的标准差缩放到1。
+---+--------------+------------------------------------------------------------+
|id |features      |scaledFeatures                                              |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
|1  |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771]  |
|2  |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542]    |
+---+--------------+------------------------------------------------------------+

五、 MinMaxScaler

       MinMaxScaler作用同样是每一列,即每一维特征。将每一维特征线性地映射到指定的区间,通常是[0, 1]。

它也有两个参数可以设置:

  • min: 默认为0。指定区间的下限。
  • max: 默认为1。指定区间的上限。

在这里插入图片描述

import org.apache.spark.ml.feature.MinMaxScaler

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show

// 每维特征线性地映射,最小值映射到0,最大值映射到1。
+--------------+-----------------------------------------------------------+
|features      |scaledFeatures                                             |
+--------------+-----------------------------------------------------------+
|[1.0,0.5,-1.0]|[0.0,0.0,0.0]                                              |
|[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
|[4.0,10.0,2.0]|[1.0,1.0,1.0]                                              |
+--------------+-----------------------------------------------------------+

六、MaxAbsScaler

       MaxAbsScaler将每一维的特征变换到[-1, 1]闭区间上,通过除以每一维特征上的最大的绝对值,它不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。

import org.apache.spark.ml.feature.MaxAbsScaler

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()

// 每一维的绝对值的最大值为[4, 10, 2]
+--------------+----------------+                                               
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
| [2.0,1.0,1.0]|   [0.5,0.1,0.5]|
|[4.0,10.0,2.0]|   [1.0,1.0,1.0]|
+--------------+----------------+

备注:

1、第一列为1.0,2.0,4.0,所以最大绝对值为4.0作为分母
2、1.0/4 为 0.25
3、2.0/4 为 0.5
4、4.0/4 为 1.0

七、总结

       所有4种归一化方法都是线性的变换,当某一维特征上具有非线性的分布时,还需要配合其它的特征预处理方法。

没有更多推荐了,返回首页