精华内容
下载资源
问答
  • spark机器学习模型选择以及超参数调优相关的网页资料的搜集整理
  • SPARK模型实例,基于HiveSQL,实现随机森林模型的训练和预测

    本篇紧接上一篇官方实例
    http://blog.csdn.net/dahunbi/article/details/72821915
    官方实例有个缺点,用于训练的数据直接就load进来了,不做任何处理,有些投机取巧。

        // Load and parse the data file.
        val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

    实践中,我们的spark都是架构在hadoop系统上的,表都是存放在HDFS上,那么正常的提取方式是用hiveSQL,要调用HiveContext。
    上一篇提到过,有两个machine learning的库,一个是ML,一个是MLlib

    ML的实例,用到pipeline:

    import java.io.{ObjectInputStream, ObjectOutputStream}
    
    import org.apache.spark.ml.util.MLWritable
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataInputStream, Path, FileSystem}
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
    
    
        val hc = new HiveContext(sc)
        import hc.implicits._
        // 调用HiveContext
    
        // 取样本,样本的第一列为label(0或者1),其他列可能是姓名,手机号,以及真正要参与训练的特征columns
        val data = hc.sql(s"""select  *  from database1.traindata_userprofile""".stripMargin)
        //提取schema,也就是表的column name,drop(2)删掉1,2列,只保留特征列
    
        val schema = data.schema.map(f=>s"${f.name}").drop(2)
    
        //ML的VectorAssembler是一个transformer,要求数据类型不能是string,将多列数据转化为单列的向量列,比如把age、income等等字段列合并成一个 userFea 向量列,方便后续训练
        val assembler = new VectorAssembler().setInputCols(schema.toArray).setOutputCol("userFea")
        val userProfile = assembler.transform(data.na.fill(-1e9)).select("label","userFea")
        val data_train = userProfile.na.fill(-1e9)
        // 取训练样本
        val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(userProfile)
        val featureIndexer = new VectorIndexer().setInputCol("userFea").setOutputCol("indexedFeatures").setMaxCategories(4).fit(userProfile)
    
        // Split the data into training and test sets (30% held out for testing).
        val Array(trainingData, testData) = userProfile.randomSplit(Array(0.7, 0.3))
        // Train a RandomForest model.
        val rf = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
        rf.setMaxBins(32).setMaxDepth(6).setNumTrees(90).setMinInstancesPerNode(4).setImpurity("gini")
        // Convert indexed labels back to original labels.
        val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
    
        val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
    
        // Train model. This also runs the indexers.
        val model = pipeline.fit(trainingData)
        println("training finished!!!!")
        // Make predictions.
        val predictions = model.transform(testData)
    
        // Select example rows to display.
        predictions.select("predictedLabel", "indexedLabel", "indexedFeatures").show(5)
    
        val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
        val accuracy = evaluator.evaluate(predictions)
        println("Test Error = " + (1.0 - accuracy))
    }

    MLlib的例子,基于RDD,请注意从ML的vector转换成MLlib的vector的过程

    import java.io.{ObjectInputStream, ObjectOutputStream}
    
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataInputStream, Path, FileSystem}
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.tree.RandomForest
    import org.apache.spark.mllib.tree.model.RandomForestModel
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}
    //import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.mllib.util.MLUtils
    
      var modelRF: RandomForestModel = null
    
      val hc = new HiveContext(sc)
      import hc.implicits._
      // 广告画像构建完毕
    
      // 取样本,样本的第一列为label(0或者1),其他列可能是姓名,手机号,以及真正要参与训练的特征columns
      val data = hc.sql(s"""select  *  from database1.traindata_userprofile""".stripMargin)
      提取schema,也就是表的column name,drop(2)删掉1,2列,只保留特征列
      val schema = data.schema.map(f=>s"${f.name}").drop(1)
      //ML的VectorAssembler是一个transformer,要求数据类型不能是string,将多列数据转化为单列的向量列,比如把age、income等等字段列合并成一个 userFea 向量列,方便后续训练
      val assembler = new VectorAssembler().setInputCols(schema.toArray).setOutputCol("userFea")
      val data2 = data.na.fill(-1e9)
      val userProfile = assembler.transform(data2).select("label","userFea")
    
      //重点在这:用ML的VectorAssembler构建的vector,必须要有这个格式的转换,从ML的vector转成 MLlib的vector,才能给MLlib里面的分类器使用(这两种vector还真是个坑,要注意)
      val userProfile2 = MLUtils.convertVectorColumnsFromML(userProfile, "userFea")
      // 取训练样本
      val rdd_Data : RDD[LabeledPoint]= userProfile2.rdd.map {
        x => val label = x.getAs[Double]("label")
          val userFea = x.getAs[Vector]("userFea")
          LabeledPoint(label,userFea)
      }
      // 构建好了训练数据就可以进行训练了, RF的参数如下
      val impurity = "gini"
      val featureSubsetStrategy = "auto"
      // Let The Algorithm Choose
      val categoricalFeaturesInfo = Map[Int, Int]()
      val iteration = 50
      val maxDepth = 9
      val numClasses = 2
      val maxBins = 32
      val numTrees = 70
      modelRF = RandomForest.trainClassifier(rdd_Data, numClasses, categoricalFeaturesInfo,
        numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
      println("training finished!!!!")
      // Evaluate model on test instances and compute test error
      val labelAndPreds = userProfile2.rdd.map { x=>
        val label = x.getAs[Double]("label")
        val userFea = x.getAs[Vector]("userFea")
        val prediction = modelRF.predict(userFea)
        (label, prediction)
      }
      labelAndPreds.take(10).foreach(println)
      modelRF.save(sc, "/home/user/victorhuang/RFCModel_mllib")
      spark.stop()
    展开全文
  • Spark - ML Tuning这一章节主要讲述如何通过使用MLlib的工具来调试模型算法和pipeline,内置的交叉验证和其他工具允许用户优化模型和pipeline中的超参数;目录:模型选择,也就是调参;交叉验证;训练集、验证集划分...

    Spark - ML Tuning

    这一章节主要讲述如何通过使用MLlib的工具来调试模型算法和pipeline,内置的交叉验证和其他工具允许用户优化模型和pipeline中的超参数;

    目录:

    模型选择,也就是调参;

    交叉验证;

    训练集、验证集划分;

    模型选择(调参)

    机器学习的一个重要工作就是模型选择,或者说根据给定任务使用数据来发现最优的模型和参数,也叫做调试,既可以针对单个模型进行调试,也可以针对整个pipeline的各个环节进行调试,使用者可以一次对整个pipeline进行调试而不是每次一个pipeline中的部分;

    MLlib支持CrossValidator和TrainValidationSplit等模型选择工具,这些工具需要下列参数:

    Estimator:待调试的算法或者Pipeline;

    参数Map列表:用于搜索的参数空间;

    Evaluator:衡量模型在集外测试集上表现的方法;

    这些工具工作方式如下:

    分割数据到训练集和测试集;

    对每一组训练&测试数据,应用所有参数空间中的可选参数组合:

    对每一组参数组合,使用其设置到算法上,得到对应的model,并验证该model的性能;

    选择得到最好性能的模型使用的参数组合;

    Evaluator针对回归问题可以是RegressionEvaluator,针对二分数据可以是BinaryClassificationEvaluator,针对多分类问题的MulticlassClassificationEvaluator,默认的验证方法可以通过setMetricName来修改;

    交叉验证

    CrossValidator首先将数据分到一个个的fold中,使用这些fold集合作为训练集和测试集,如果k=3,那么CrossValidator将生成3个(训练,测试)组合,也就是通过3个fold排列组合得到的,每一组使用2个fold作为训练集,另一个fold作为测试集,为了验证一个指定的参数组合,CrossValidator需要计算3个模型的平均性能,每个模型都是通过之前的一组训练&测试集训练得到;

    确认了最佳参数后,CrossValidator最终会使用全部数据和最佳参数组合来重新训练预测;

    例子:通过交叉验证进行模型选择;

    注意:交叉验证在整个参数网格上是十分耗时的,下面的例子中,参数网格中numFeatures有3个可取值,regParam有2个可取值,CrossValidator使用2个fold,这将会训练3*2*2个不同的模型,在实际工作中,通常会设置更多的参数、更多的参数取值以及更多的fold,换句话说,CrossValidator本身就是十分奢侈的,无论如何,与手工调试相比,它依然是一种更加合理和自动化的调参手段;

    from pyspark.ml import Pipeline

    from pyspark.ml.classification import LogisticRegression

    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    from pyspark.ml.feature import HashingTF, Tokenizer

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    # Prepare training documents, which are labeled.

    training = spark.createDataFrame([

    (0, "a b c d e spark", 1.0),

    (1, "b d", 0.0),

    (2, "spark f g h", 1.0),

    (3, "hadoop mapreduce", 0.0),

    (4, "b spark who", 1.0),

    (5, "g d a y", 0.0),

    (6, "spark fly", 1.0),

    (7, "was mapreduce", 0.0),

    (8, "e spark program", 1.0),

    (9, "a e c l", 0.0),

    (10, "spark compile", 1.0),

    (11, "hadoop software", 0.0)

    ], ["id", "text", "label"])

    # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.

    tokenizer = Tokenizer(inputCol="text", outputCol="words")

    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

    lr = LogisticRegression(maxIter=10)

    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

    # We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.

    # This will allow us to jointly choose parameters for all Pipeline stages.

    # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

    # We use a ParamGridBuilder to construct a grid of parameters to search over.

    # With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,

    # this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.

    paramGrid = ParamGridBuilder() \

    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \

    .addGrid(lr.regParam, [0.1, 0.01]) \

    .build()

    crossval = CrossValidator(estimator=pipeline,

    estimatorParamMaps=paramGrid,

    evaluator=BinaryClassificationEvaluator(),

    numFolds=2) # use 3+ folds in practice

    # Run cross-validation, and choose the best set of parameters.

    cvModel = crossval.fit(training)

    # Prepare test documents, which are unlabeled.

    test = spark.createDataFrame([

    (4, "spark i j k"),

    (5, "l m n"),

    (6, "mapreduce spark"),

    (7, "apache hadoop")

    ], ["id", "text"])

    # Make predictions on test documents. cvModel uses the best model found (lrModel).

    prediction = cvModel.transform(test)

    selected = prediction.select("id", "text", "probability", "prediction")

    for row in selected.collect():

    print(row)

    划分训练、验证集

    对于超参数调试,Spark还支持TrainValidationSplit,它一次只能验证一组参数,这与CrossValidator一次进行k次截然不同,因此它更加快速,但是如果训练集不够大的化就无法得到一个真实的结果;

    不像是CrossValidator,TrainValidationSplit创建一个训练、测试组合,它根据trainRatio将数据分为两部分,假设trainRatio=0.75,那么数据集的75%作为训练集,25%用于验证;

    与CrossValidator类似的是,TrainValidationSplit最终也会使用最佳参数和全部数据来训练一个预测器;

    from pyspark.ml.evaluation import RegressionEvaluator

    from pyspark.ml.regression import LinearRegression

    from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

    # Prepare training and test data.

    data = spark.read.format("libsvm")\

    .load("data/mllib/sample_linear_regression_data.txt")

    train, test = data.randomSplit([0.9, 0.1], seed=12345)

    lr = LinearRegression(maxIter=10)

    # We use a ParamGridBuilder to construct a grid of parameters to search over.

    # TrainValidationSplit will try all combinations of values and determine best model using

    # the evaluator.

    paramGrid = ParamGridBuilder()\

    .addGrid(lr.regParam, [0.1, 0.01]) \

    .addGrid(lr.fitIntercept, [False, True])\

    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\

    .build()

    # In this case the estimator is simply the linear regression.

    # A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

    tvs = TrainValidationSplit(estimator=lr,

    estimatorParamMaps=paramGrid,

    evaluator=RegressionEvaluator(),

    # 80% of the data will be used for training, 20% for validation.

    trainRatio=0.8)

    # Run TrainValidationSplit, and choose the best set of parameters.

    model = tvs.fit(train)

    # Make predictions on test data. model is the model with combination of parameters

    # that performed best.

    model.transform(test)\

    .select("features", "label", "prediction")\

    .show()

    展开全文
  • Spark 模型总结

    2015-07-18 14:55:50
    当然,在Spark,还提供了更多的transformation 函数,比方说 filter 和 join。以及其他很多很多的操作,极大提升了灵活性。 2. Spark Driver and Workers 一张图表示Spark整体架构: RDDs就是...

    注明:图片以及相关资料均来自Scalable Machine Learning from BerkelyX,只是个人总结使用,侵权即删

    整体框架

    1. Map-Reduce

    MR的价值体现在对大数据集的分布式处理上。
    如下面的图例:(图片来自Scalable Machine Learning from BerkelyX)
    MR1
    MR2
    将大规模的文档先分开成不同的partitions到不同的worker;再通过map,对每一个worker的文档进行映射处理;最后一步通过Reduce操作,分而治之。

    • 不仅仅是Map-Reduce
      当然,在Spark,还提供了更多的transformation 函数,比方说 filter 和 join。以及其他很多很多的操作,极大提升了灵活性。
      MR3

    2. Spark Driver and Workers

    一张图表示Spark整体架构:
    architect
    RDDs就是分布在workers上的
    Spark Context一开始就要定义

    内设对象

    1. RDDs

    Resilient Distributed Datasets

    存在样式:
    RDDs
    一旦创建不可更改!

    • 可以通过如下方式创建:
      parallelize 一个数据集
      transform 另一个RDDs
      从HDFS或者其他存储系统中读取

    • 操作:

      1. Transformations:
        比如map, flatmap, filter等
        属性:lazy,非立刻执行,而是等到actions发生才会被执行
      2. Actions:
        比如:collect,count, reduce
    • 总结
      RDDs的整体流程就是
      RDDs



    对RDDs的操作实例可参见我的另一篇文章Spark+Python lab2

    2. Key-Value RDDs

    3. Closures

    4.Shared Variables

    <未完待续>

    展开全文
  • 1 Spark编程模型 用户使用SparkContext 提供的API编写Driver段程序(常用的textFile sequenceFile runjob 等)Spark

    1 Spark编程模型

    在这里插入图片描述

    1. 用户使用SparkContext 提供的API编写Driver段程序(常用的textFile sequenceFile runjob 等)Spark

    2.4Spark基本架构

    从集群部署的角度来看,Spark集群由集群管理器Cluster Manager 工作节点Worker 执行器 Executor 驱动器Driver 应用程序Application 等部分组成
    1Cluster Manager 并不负责对Executor 的资源分配,分配的资源属于一级分配,将各个Worker上的内存、CPU等资源分配给Application ,不负责对Executor的资源分配
    2、Worker
    Worker阶段主要负责:

    将自己内存、CPU等资源通过注册机制告知Cluster Manager;
    创建Executor;
    将资源和任务进一步分配给Executor;
    同步资源信息,Executor状态信息给Cluster Manager等

    3、Executor
    执行计算任务的一线组件,主要负责任务执行与Worker、Driver信息同步
    4、Driver
    Application的驱动程序,Application通过Driver与Cluster Manager、Executor进行通信。
    5、Application
    用户使用Spark提供的API编写的应用程序,Application通过Spark API进行RDD的转换和DAG的构建
    并通过Driver将Application注册到Cluster Manager ,Cluser Manager根据Application的资源需求,通过一级分配将内存、CPU等资源分配给Application。Driver通过二级分配将Executor等资源分配给每一个任务 Application 最终通过Driver告诉Executor运行任务
    关系图如下
    在这里插入图片描述

    展开全文
  • 作者 | Anne Holler、Michael Mui 译者 | 平川 编辑 | Natalie ...该平台被设计成了一个端到端的工作流,目前支持经典的机器学习、时间序列预测和深度学习模型,可以涵盖大量的用例,从生成市场预测...
  • failure: Lost task 18.3 in stage 17.0 (TID 59784,XXXXX, executor 19): java.lang.IllegalArgumentException: Row length is 0 场景是写入hbase: val put = new Put(Bytes.toBytes(line._1)) ...
  • REST Web服务,用于对PMML模型进行评分。 目录 特征 完全支持PMML规范版本3.0到4.4。 评估由库处理。 简单而强大的REST API: 模型部署和取消部署。 在单个预测,批量预测和CSV预测模式下进行模型评估。 高性能...
  • error:org.apache.spark.shuffle.FetchFailedException: Failed to connect to xxxx/xx.xx.xx.xx:xxxx 定位来定位去与防火墙等无关。反复查看日志: 2019-09-30 11:00:46,521 | WARN | [dispatcher-event-loop-...
  • 我正在玩Spark,以便学习如何使用它.我按照安装说明在Homestead下进行了新的安装.从路由页面生成的css链接是没有生成css.因此,主页被顶起.任何人都可以帮我追查问题吗?更新以下是运行npm install时收到的警告:npm ...
  • Spark编程模型

    千次阅读 2015-07-17 15:13:22
    Spark编程模型
  • Spark内存模型

    2020-02-04 16:02:18
    Spark内存模型 Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优 在执行 Spark 的应用...
  • Spark计算模型

    千次阅读 2018-08-15 10:59:45
    1. Spark计算模型 1.1 Spark程序模型 首先通过一个简单的实例了解Spark的程序模型。 1)SparkContext中的textFile函数从HDFS读取日志文件,输出变量file。 valfile=sc.textFile("hdfs://xxx") 2)RDD...
  • Spark 存储模型

    2018-12-14 17:53:21
    spark 存储模型是主从模型,其中Driver是Master,Executor是Slave。Driver负责数据的元信息管理,Slave 负责存储数据,执行Driver传递过来的数据操作命令。 Driver 应用启动时,SparkContext会在Driver端创建SparkEnv...
  • spark计算模型

    千次阅读 2016-02-07 11:15:34
    spark计算模型 与Hadoop 不同,Spark 一开始就瞄准性能,将数据(包括部分中间数据)放在内存,在内存中计算。用户将重复利用的数据缓存到内存,提高下次的计算效率,因此Spark 尤其适合迭代型和交互型任务。Spark ...
  • Spark 内存模型

    2021-03-17 23:45:28
    spark 内存模型中会涉及到多个配置,这些配置由一些环境参数及其配置值有关,为防止后面理解混乱,现在这里列举出来,如果忘记了,可以返回来看看: spark.executor.memory :JVM On-Heap 内存(堆内内存),在使用...
  • 服务端使用训练出来的模型,spark模型计算第一步是实现spark模型加载。  线上服务对用户体验影响极大,故需要对模型使用进行优化。  1、多线程并发进行计算,线上两个服务。优化cpu 2、在扩召回集,io是性能...
  • Spark调度模型

    2016-07-06 10:33:00
    调度模型的好坏,是由底层的抽象模型所决定的,spark的底层抽象是RDD spark调度模型系统,分为底层调度模型(TASKscheduler)和高层调度模型(DAGscheduler) 调度过程 1. application driver发出请求,这个...
  • Spark sparkcore 内存模型

    2019-01-09 16:56:20
    Spark学习之路 (十一)SparkCore的调优之Spark内存模型 目录一、概述二、堆内和堆外内存规划2.1 堆内内存2.2 堆外内存2.3 内存管理接口三、内存空间分配3.1 静态内存管理3.2 统一内存管理四、...
  • SPARK2.1.0模型设计与基本架构(下) ... 阅读提示:读者如果对Spark的背景知识不是很了解的话,建议首先阅读《SPARK2.1.0...SPARK模型设计 1. Spark编程模型 正如Hadoop在介绍MapReduce编程模型时选择word count...
  • Spark模型设计 1. Spark编程模型 正如Hadoop在介绍MapReduce编程模型时选择word count的例子,并且使用图形来说明一样,笔者对于Spark编程模型也选择用图形展现。 Spark 应用程序从编写到提交、执行、输出的整个...
  • RDD是Spark核心数据结构,它是逻辑集的实体,在集群中多台机器之间进行数据分区,通过对多台机器上RDD分区的控制,能够减少数据的重排(data Shuffling)。Spark通过partitionBy运算符对原始RDD进行数据再分配从而创建...
  • SparkSpark 编程模型及快速入门

    千次阅读 2016-11-25 10:25:08
    http://blog.csdn.net/pipisorry/article/details/52366356Spark编程模型SparkContext类和SparkConf类我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象。val sc = new ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,853
精华内容 2,341
关键字:

spark模型