spark学习使用什么软件_spark学习使用 - CSDN
精华内容
参与话题
  • Spark学习笔记二之Spark Core核心概念一网打尽 简介 定义 ​ RDD(Resilient Distributed Datasets) ,直译成中文就是:弹性分布式数据集。 ​ 本质上,RDD 其实就是一系列元数据构成的,包含着数据分区列表,计算...

    RDD可以说是Spark Core最核心的内容,这一篇开始我将根据自己的理解针对RDD进行深入的剖析。

    注意一些关于Spark Core的简单的概念理解,请参考一下我的这篇博客:
    Spark学习笔记二之Spark Core核心概念一网打尽

    简介

    定义

    ​ RDD(Resilient Distributed Datasets) ,直译成中文就是:弹性分布式数据集。

    ​ 本质上,RDD 其实就是一系列元数据构成的,包含着数据分区列表,计算逻辑, RDD 之间的依赖转换关系等。

    RDD和Spark的关系:

    ​ Spark建立在抽象的RDD上,使得它可以用一致的方式处理大数据不同的应用场景,把所有需要处理的数据转化成RDD,然后对RDD进行一系列的算子运算,从而得到结果。

    Spark统一平台的根基就是RDD,RDD提供了统一的方式支持下列软件库:结构化查询语言Spark SQL,流处理框架Spark Streaming,结构化流处理框架Structured Streaming,机器学习库MLLib,图处理库GraphX等,也就是说上面说的软件库底层的数据抽象都是RDD。

    五大特性

    分区列表/partitions

    ​ RDD 里面存储着分区列表信息,分区Partition是一个逻辑概念,本质上就是为了分布式计算,分区数决定了并行计算的力度,一个分区就对应了一个 Task 任务。RDD 的并行度默认会从父 RDD 传给 子 RDD。

    ​ 可以在创建 RDD 时指定分区数,默认分区数,当从集合创建时,为分配到的资源 CPU 核数,当从 HDFS 创建时,为文件的 Block 数。

    Partition 和 Block 的关系:

    这里的 Block 不是 HDFS 里面的概念,而是 Spark 独有的概念。每个 RDD 的数据都是以 Block 的形式存储在多台机器上面的,可以说,Partition 是数据的逻辑形式,Block 就是数据的物理形式。Partition 和 Block 并不是严格的一一对应关系,Block 是和真实存储的数据强相关,而 Partition 为了高效的分布式计算,肯定会做一些优化的工作,比如某些 Block 过小,会合并起来对应一个 PartitIon,某个Block 过大,也会拆分开来对应多个 Partition。

    计算函数/compute

    每个分区都有计算函数,计算函数其实就是分区里面的数据要按照怎样的计算逻辑来处理,函数处理是以 Task 任务的形式进行的。

    依赖于其他 RDD 的列表/dependencies

    RDD每次转换都会生成新的 RDD,所以 RDD 会形成类似于流水线的前后依赖关系。这里之所以存储这些依赖信息,是为了系统的容错性考虑的,某个 RDD 出了问题,其他的 RDD 可以通过这些依赖关系转换成出问题的 RDD。RDD 的依赖关系分为窄依赖和宽依赖。

    窄依赖 VS 宽依赖

    1. 窄依赖: 上游的每一个RDD里面的 Partition 只会被下游的一个 RDD 的Partition 使用。窄依赖分为 OneToOneDependency 以及 RangeDenpendency。OneToOneDependency意味着上下游一一对应,RangeDenpendency意味着上:下=N:1,具体的可以参考下面的示意图。
      在这里插入图片描述
    1. 宽依赖:上游的每一个 RDD 里面的 Partition 都会被下游的多个 RDD 的 Partition 使用。宽依赖同时意味着 Shuffle,这里的 Shuffle 和 MapReduce 里面的 Shuffle 是一脉相承的关系。参考下面的示意图。
      在这里插入图片描述
    1. 之所以搞出来窄依赖和宽依赖的划分,是出于 Pipeline/流水线 的角度考虑,通俗点讲,就是我同一个流水线里面的数据可以在同一个进程里面连续进行处理,不需要跨进程,要知道跨进程往往意味着网络传输,不仅仅慢的一批而且还容易丢数据。

    Key-value 数据类型的 RDD分区器/partitioner

    这个控制着分区策略与分区数,每个 key-value 形式的 RDD 都有 Partitioner 属性,它决定了 RDD 如何分区。只针对 kv 形式的 RDD,用户可以自定义分区策略。

    每个分区都有一个优先位置列表

    RDD 存储了每个 Partition 的优先位置,对于 HDFS 来讲,就是每个 Partition对应的 Block 块的位置。Spark 在进行计算之前,就是通过这个信息找到具体的节点,将计算逻辑发送到这个节点,在存储数据的服务器本地进行计算。

    移动计算比移动数据更划算

    对于大数据来讲,参与计算的数据往往是 GB/TB 级别的,而计算程序本身则很小(往往KB级别),将大数据通过网络传输到计算程序所在节点上太不划算,网络开销能耗死人。所以将计算程序移动到存储数据的服务器节点上面才是正道。

    这个思想实际上我们很早就见过,大家电脑上面都装过杀毒软件,杀毒软件杀毒的时候会从远程下载病毒库到本地,这就是典型的移动计算思想。

    弹性

    弹性在RDD里面是一个很牛逼的概念,很多刚学Spark的人都不太理解弹性究竟是啥意思。实际上弹性有七层含义:

    数据存储内存和磁盘自动切换

    这个是最基本也是最好理解的,内存和磁盘的区别在哪呢——内存又贵又小但是速度比磁盘快好几个数量级!

    Spark出于的考虑肯定支持内存处理数据,但是内存的容量是有限的,Spark支持内存放不下的情况下,自动地放到磁盘里面。当然,这一步还会考虑到放置策略和优化算法,比如尽量连续存储等。

    基于Lineage的高效容错机制

    RDD是一个抽象的或者说是一个逻辑上的概念,它根本不存储实际数据(指的需要经过集群分布式计算的那些数据),它存储了RDD的由来:包括父RDD是谁,父RDD是怎样转化成当前RDD的即所谓的Lineage(血统)。

    这样做有什么好处呢?

    当发生错误的时候,比如某个RDD计算过程中发生了节点故障造成了数据丢失,那么我们根据父RDD以及转化关系就很容易找回丢失的那份数据。

    注意:

    1. 这里的数据不是存储在RDD里面的,而是RDD下面的分区所对应的物理存储,在HDFS集群上面就是block块。
    2. 这里要理解对于分布式集群来说,单节点故障,不会影响到其他节点上面存储的备份冗余数据。

    Task失败自动重试

    Task是一整个计算任务(Application)的最底层逻辑,对应的就是一个线程,同时一个分区(partition)就对应一个Task。

    Task的生命周期由TaskScheduler的实现类TaskSchedulerImpl掌控,TaskSchedulerImpl来从DAGScheduler获取到TaskSet(Task的集合),会运行这些Task,运行失败自动重试,默认重试次数为4次。

    Stage失败自动重试

    Stage本意就是阶段的意思,对应Spark来讲,如果能一直在一个节点上计算无疑效率最高,但是很多时候没有那么理想化,故Spark以Shuffle为分割点(Shuffle意味着跨节点),切分成了一个个阶段,在每个Stage中,计算任务以流水线的形式进行(Pipeline)。

    Stage的生命周期由DAGScheduler来掌控,默认重试次数也是4次。

    checkpoint/cache/persist

    ​ checkpoint是对RDD进行的标记,会产生一系列的文件,且所有父依赖都会被删除,是整个依赖的终点。

    ​ checkpoint 也是 lazy 级别的,可以通过主动或者被动的触发。cache/persist 后 RDD 工作时每个工作节点都会把计算的分片结果保存在内存或者磁盘中,下次如果对相同的 RDD 进行其他的 Action 计算,就可以重用。

    checkpoint对比 cache/persist

    • checkpoint是一个job来完成的,是执行完一个job之后,新建一个新的 job 来完成的,并不像 cache,是 job 执行过程中进行。

    • Spark 自动管理(包括创建和回收)cache 和 persist 持久化的数据,而 checkpoint 持久化的数据需要由用户自己管理

    • checkpoint 会清除 RDD 的 Lineage,避免 Lineage 过长导致序列化开销增大,而 cache 和 persist 不会清除 RDD 的 Lineage

    • checkpoint针对整个 RDD 计算链条中特别需要数据持久化的环节,进行基于 HDFS 等的数据持久化复用策略,而 cache/persist 只是存储在本地内存或者磁盘上,checkpoint 明显可靠性和容错性更高。

    对比 checkpoint cache/persist
    自动管理
    清除 Lineage
    存储 HDFS 本地内存/磁盘
    可靠性
    同一个 Job

    数据调度弹性

    Spark 将执行模型抽象为通用的有向无环图(DAG),DAG 的生成与具体的资源管理无关,当发生节点运行故障的时候,可有其他的可用节点代替故障节点运行(例如 HDFS 上的副本机制)。

    在 DAG 中,每个点就是一个任务,每一条边就是对应了 Spark 中的依赖关系。

    在 Spark 中,DAG 生成的流程关键在于回溯。在程序提交后,高层调度器(DAGScheduler)会将所有的 RDD 看成一个 Stage,然后对这个 Stage 进行从后往前的回溯,遇到 Shuffle 就断开,遇到窄依赖就归并到同一个 Stage 里面,等到所有的步骤回溯完成,便生成了一个 DAG 图。

    数据分片的高度弹性

    在计算过程中,可能会产生很多的数据碎片,这时产生一个 Partition 可能会非常小,一个 Partition 要一个 Task 去处理,一个 Task 对应一个线程,这时候就会降低系统的处理效率,所以,Spark 会讲许多小的 Partition 合并成一个较大的 Partition 去处理,同时,如果一个 Block 很大,也会考虑把 Partition 拆分成更小的数据分片,这样 Spark 就能够处理更多的批次,不容易 OOM。

    参考:

    《Spark 大数据商业实战三部曲:内核解密|商业案例|性能调优》 -王家林/段智华/夏阳 著

    个人博客:
    https://www.shockang.com

    展开全文
  • ml和mllib都是Spark中的机器学习库,目前常用的机器学习功能2个库都能满足需求。 spark官方推荐使用ml, 因为ml功能更全面更灵活,未来会主要支持ml,mllib很有可能会被废弃(据说可能是在spark3.0中deprecated)。 ...

    Spark中ml和mllib的区别

    来源:
    Spark中ml和mllib的主要区别和联系如下:

    • ml和mllib都是Spark中的机器学习库,目前常用的机器学习功能2个库都能满足需求。
    • spark官方推荐使用ml, 因为ml功能更全面更灵活,未来会主要支持ml,mllib很有可能会被废弃(据说可能是在spark3.0中deprecated)。
    • ml主要操作的是DataFrame, 而mllib操作的是RDD,也就是说二者面向的数据集不一样。相比于mllib在RDD提供的基础操作,ml在DataFrame上的抽象级别更高,数据和操作耦合度更低。
    • DataFrame和RDD什么关系?DataFrame是Dataset的子集,也就是Dataset[Row], 而DataSet是对RDD的封装,对SQL之类的操作做了很多优化。
    • 相比于mllib在RDD提供的基础操作,ml在DataFrame上的抽象级别更高,数据和操作耦合度更低。
    • ml中的操作可以使用pipeline, 跟sklearn一样,可以把很多操作(算法/特征提取/特征转换)以管道的形式串起来,然后让数据在这个管道中流动。大家可以脑补一下Linux管道在做任务组合时有多么方便。
    • ml中无论是什么模型,都提供了统一的算法操作接口,比如模型训练都是fit;不像mllib中不同模型会有各种各样的trainXXX。
    • mllib在spark2.0之后进入维护状态, 这个状态通常只修复BUG不增加新功能。

    以上就是ml和mllib的主要异同点。下面是ml和mllib逻辑回归的例子,可以对比看一下, 虽然都是模型训练和预测,但是画风很不一样。

    Spark ML中的机器学习

    PySpark.ML模块介绍

    来源:2018.01.13 13:59
    参考地址:spark.apache.org/docs/latest/api/python/pyspark.ml.html
    pyspark.ml package

    ML模块包括机器学习三个核心功能:

    (1)数据准备: 特征提取、变换、选择、分类特征的散列和自然语言处理等等;
    (2)机器学习方法: 实现了一些流行和高级的回归,分类和聚类算法;
    (3)实用程序: 统计方法,如描述性统计、卡方检验、线性代数(稀疏稠密矩阵和向量)和模型评估方法。

    目前ML模块还处于不断发展中,但是已经可以满足我们的基础数据科学任务。
    注意:官网上,标注“E”为测试阶段,不稳定,可能会产生错误失败

    ML模块三个抽象类:

    转换器(Transformer)、评估器(Estimator)和管道(Pipeline)

    数据准备

    数据查看

    df.describe().show()
    数据的查看和转换操作很多都可以利用pyspark.sql下的api接口来实现,比如一般数据预处理时候,我们会用pyspark.ml.feature module中的接口来实现,但如果不能满足,其实pyspark.sql.functions接口可能实现更多的变换。我们只要先对某一列变换,然后再加回到df.就好比在sklearn中我们会优先使用sklearn.preprocessing来完成预处理,但如果不能够满足自己的要求,我个人会使用map函数来做变换。

    数据类型的转换

    首先是在读入数据时,可以根据实际情形,设置inferschema=True,如

    df = spark.read.csv(path='/tmp/test/hour.csv', header=True, sep=',',inferSchema=True)
    

    这样会自动判断类型,否则会部为string类型。
    但是自动识别类型,有时还是不能满足需求,就需要人为转换,目前还没找到方法,不过应该是可以的。

    缺失值处理

    缺失值处理方法

    或者参考
    查看各列有无缺失值

    pyspark中查看缺失值似乎并没有太好的方法,但可以反复用df.desribe().show()或df.summary().show()来查看,因为对于’’,null是当成字符来看,与数字混在一起,要么最小,最么最最大,然后统计的时候是被排除在外的;NaN被当成最大,统计出来的均值、方差也是NaN。如下

    df=spark.sql('select * from pankoo.test')
    df.show()
    """
    +-----+-----+-----+
    |name1|name2|name3|
    +-----+-----+-----+
    |    1|     |    2|
    |    3|    4|  NaN|
    | Null|    5|    6|
    +-----+-----+-----+
    """
    df.describe().show()
    """
    +-------+------------------+------------------+-----+
    |summary|             name1|             name2|name3|
    +-------+------------------+------------------+-----+
    |  count|                 3|                 3|    3|
    |   mean|               2.0|               4.5|  NaN|
    | stddev|1.4142135623730951|0.7071067811865476|  NaN|
    |    min|                 1|                  |    2|
    |    max|              Null|                 5|  NaN|
    +-------+------------------+------------------+-----+
    """
    

    对于离散型变量,可以用select distinct xx from 。
    如果是标准的用nan表示缺失值,也可像下面那样用df.filter来查看。或者不用查看,直接进行填充。

    for i in range(len(fieldIndex)):
        df.filter('%s is null'%(fieldIndex[i])).select(fieldIndex[i]).limit(10).show()
    

    整理来讲在分布式环境下查看数据代价还是挺大的。
    利用pyspark.sql.DataFrame的接口
    利用类似下面的语句进行缺少值填充

    df.na.fill({'age': 50, 'name': 'unknown'})
    

    具体填充的值,可用类似下面的语句获取。

    #mean方法只接受数值类型的参数,Int, Long等,如果是String, Date, Timestamp 类型的话要用agg(mean(“b”))
    meanTemp=df.agg({'temp': 'avg'}).collect()[0][0]
    print(meanTemp)
    
    for i in range(len(fieldIndex)):
        meanTemp = df.agg({fieldIndex[i]: 'avg'}).collect()[0][0]
        df.na.fill({fieldIndex[i]:meanTemp})
    

    也可以用pyspark.ml.feature.Imputer(*args, **kwargs)来实现缺失值的填充
    类似于

    imputer=Imputer(strategy='mean',inputCols=['atemp'],outputCols=['out_atemp'])
    model=imputer.fit(df)
    df=model.transform(df)
    

    转换器

    pyspark.ml.Transformer
    通常通过将一个新列附加到DataFrame来转换数据。
    当从转换器的抽象类派生时,每个新的转换器类需要实现.transform()方法,该方法要求传递一个要被转换的DataFrame,该参数通常是第一个也是唯一的一个强制性参数。

    transform(dataset, params=None)
              Transforms the input dataset with optional parameters(使用可选参数转换输入数据集。).      
    Parameters: 
    **dataset** – input dataset, which is an instance of pyspark.sql.DataFrame
    **params** – an optional param map that overrides embedded params.
    Returns:    transformed dataset
    

    pyspark.ml.feature模块提供了许多的转换器:

    • pyspark.ml.feature.Binarizer(self, threshold=0.0, inputCol=None, outputCol=None)
      根据指定的阈值将连续变量转换为对应的二进制
    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import Binarizer
    
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.enableHiveSupport().getOrCreate()
        df=spark.createDataFrame([(0.5,)],["values"])#注意0.5后面有个逗号,不加会出错,不知道为什么要这样;
        """
        df.show()
        +------+
        | values |
        +------+
        | 0.5 |
        +------+
        """
        binarizer=Binarizer(threshold=1.0,inputCol="values",outputCol="features")
        bd=binarizer.transform(df)
        """
        bd.show()
        +------+-------+
        | values | features |
        +------+-------+
        | 0.5 | 0.0 |
        +------+-------+
        print(bd.head().features)
        0.0
        """
        binarizer.setParams(outputCol="freqs")
        bdf=binarizer.transform(df)
        """
        bdf.show()
        +------+-----+
        | values | freqs |
        +------+-----+
        | 0.5 | 0.0 |
        +------+-----+
        """
        params={binarizer.threshold:-0.5,binarizer.outputCol:"vectors"}
        bdf=binarizer.transform(df,params=params)
        """
        bdf.show()
        +------+-------+
        | values | vectors |
        +------+-------+
        | 0.5 | 1.0 |
        +------+-------+
        """
        temp_path="/tmp/test/"
        binarizerPath ="%sbinarizer"%(temp_path)
        binarizer.save(binarizerPath)
        loadBinarizer=Binarizer.load(binarizerPath)
        # print(loadBinarizer.getThreshold()==binarizer.getThreshold())
        # True
    
    • pyspark.ml.feature.Bucketizer(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
      与Binarizer类似,该方法根据阈值列表(分割的参数),将连续变量转换为多项值(连续变量离散化到指定的范围区间)
    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import Bucketizer
    import numpy as np
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        values=[(0.1,),(0.4,),(1.2,),(1.5,),(np.nan,),(np.nan,)]
        # values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
        df=spark.createDataFrame(values,["values"])
        """
        df.show()
        +------+
        |values|
        +------+
        |   0.1|
        |   0.4|
        |   1.2|
        |   1.5|
        |   NaN|
        |   NaN|
        +------+
        """
        bucketizer=Bucketizer(splits=[-float("inf"),0.5,1.4,float("inf")],inputCol="values",outputCol="buckets",handleInvalid="keep")
        bdf=bucketizer.transform(df)
        """
        bdf.show()
        +------+-------+
        | values | buckets |
        +------+-------+
        | 0.1 | 0.0 |
        | 0.4 | 0.0 |
        | 1.2 | 1.0 |
        | 1.5 | 2.0 |
        | NaN | 3.0 |
        | NaN | 3.0 |
        print(bdf.head().buckets)
        0.0
        print(bdf.collect()[0].buckets)
        0.0
        print(bdf.collect()[3].buckets)
        2.0
        """
        bucketizer.setParams(outputCol="b")
        bdf=bucketizer.transform(df)
        """
        bdf.show()
        +------+---+
        | values | b |
        +------+---+
        | 0.1 | 0.0 |
        | 0.4 | 0.0 |
        | 1.2 | 1.0 |
        | 1.5 | 2.0 |
        | NaN | 3.0 |
        | NaN | 3.0 |
        +------+---+
        """
        temp_path = "/tmp/test/"
        bucketizerPath ="%sbucketizer"%(temp_path)
        bucketizer.save(bucketizerPath)
        loadBucketizer=Bucketizer.load(bucketizerPath)
        # print(loadBucketizer.getSplits()==bucketizer.getSplits() )
        # True
    
    • pyspark.ml.feature.ChiSqSelector(self, numTopFeatures=50, featuresCol="features", outputCol=None, labelCol="label", selectorType="numTopFeatures", percentile=0.1, fpr=0.05, fdr=0.05, fwe=0.05)
      对于分类目标变量(思考分类模型),此功能允许你选择预定义数量的特征(由numTopFeatures参数进行参数化),以便最好地说明目标的变化。该方法需要两部:需要.fit()——可以计算卡方检验,调用.fit()方法,将DataFrame作为参数传入返回一个ChiSqSelectorModel对象,然后可以使用该对象的.transform()方法来转换DataFrame。默认情况下,选择方法是numTopFeatures,默认顶级要素数设置为50。
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        df=spark.createDataFrame([(Vectors.dense([0.0, 0.0, 18.0, 1.0]),1.0),(Vectors.dense([0.0, 1.0, 12.0, 0.0]),0.0),(Vectors.dense([1.0, 0.0, 15.0, 0.1]),0.0)],["features","label"])
        """
        df.show()
        +------------------+-----+
        | features | label |
        +------------------+-----+
        | [0.0, 0.0, 18.0, 1.0] | 1.0 |
        | [0.0, 1.0, 12.0, 0.0] | 0.0 |
        | [1.0, 0.0, 15.0, 0.1] | 0.0 |
        +------------------+-----+
        """
        chiSqSelector=ChiSqSelector(numTopFeatures=1,outputCol="selectedFeatures")
        model=chiSqSelector.fit(df)
        # print(model.selectedFeatures)
        # [2]
        cdf=model.transform(df)
        """
        cdf.show()
        +------------------+-----+----------------+
        | features | label | selectedFeatures |
        +------------------+-----+----------------+
        | [0.0, 0.0, 18.0, 1.0] | 1.0 | [18.0] |
        | [0.0, 1.0, 12.0, 0.0] | 0.0 | [12.0] |
        | [1.0, 0.0, 15.0, 0.1] | 0.0 | [15.0] |
        +------------------+-----+----------------+
        """
    
    • pyspark.ml.feature.CountVectorizer(self, minTF=1.0, minDF=1.0, vocabSize=1 << 18, binary=False, inputCol=None, outputCol=None)
      用于标记文本
    >>> df = spark.createDataFrame(
    ...    [(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])],
    ...    ["label", "raw"])
    >>> cv = CountVectorizer(inputCol="raw", outputCol="vectors")
    >>> model = cv.fit(df)
    >>> model.transform(df).show(truncate=False)
    +-----+---------------+-------------------------+
    |label|raw            |vectors                  |
    +-----+---------------+-------------------------+
    |0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
    |1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
    +-----+---------------+-------------------------+
    ...
    >>> sorted(model.vocabulary) == ['a', 'b', 'c']
    True
    >>> countVectorizerPath = temp_path + "/count-vectorizer"
    >>> cv.save(countVectorizerPath)
    >>> loadedCv = CountVectorizer.load(countVectorizerPath)
    >>> loadedCv.getMinDF() == cv.getMinDF()
    True
    >>> loadedCv.getMinTF() == cv.getMinTF()
    True
    >>> loadedCv.getVocabSize() == cv.getVocabSize()
    True
    >>> modelPath = temp_path + "/count-vectorizer-model"
    >>> model.save(modelPath)
    >>> loadedModel = CountVectorizerModel.load(modelPath)
    >>> loadedModel.vocabulary == model.vocabulary
    True
    
    • pyspark.ml.feature.Imputer(*args, **kwargs)
      用于完成缺失值的插补估计器,使用缺失值所在列的平均值或中值。 输入列应该是DoubleType或FloatType。 目前的Imputer不支持分类特征,可能会为分类特征创建不正确的值。
      请注意,平均值/中值是在过滤出缺失值之后计算的。 输入列中的所有Null值都被视为缺失,所以也被归类。 为了计算中位数,使用pyspark.sql.DataFrame.approxQuantile(),相对误差为0.001。
    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import Imputer
    from pyspark.ml.feature import ImputerModel
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        df=spark.createDataFrame([(1.0,float("nan")),(2.0,float("nan")),(float("nan"),3.0),(4.0,4.0),(5.0,5.0)],["a","b"])
        """
        df.show()
        +---+---+
        | a | b |
        +---+---+
        | 1.0 | NaN |
        | 2.0 | NaN |
        | NaN | 3.0 |
        | 4.0 | 4.0 |
        | 5.0 | 5.0 |
        +---+---+
        """
        imputer=Imputer(inputCols=["a","b"],outputCols=["out_a","out_b"])
        model=imputer.fit(df)
        """
        model.surrogateDF.show()
        #默认情况下,就是取出了ab两列的均值
        +---+---+
        | a | b |
        +---+---+
        | 3.0 | 4.0 |
        +---+---+
        """
        idf=model.transform(df)
        """
        idf.show()
        +---+---+-----+-----+
        | a | b | out_a | out_b |
        +---+---+-----+-----+
        | 1.0 | NaN | 1.0 | 4.0 |
        | 2.0 | NaN | 2.0 | 4.0 |
        | NaN | 3.0 | 3.0 | 3.0 |
        | 4.0 | 4.0 | 4.0 | 4.0 |
        | 5.0 | 5.0 | 5.0 | 5.0 |
        +---+---+-----+-----+
        """
        imputer.setStrategy("median").setMissingValue(1.0)
        model=imputer.fit(df)
        idf=model.transform(df)
        """
        idf.show()
        +---+---+-----+-----+
        | a | b | out_a | out_b |
        +---+---+-----+-----+
        | 1.0 | NaN | 4.0 | NaN |
        | 2.0 | NaN | 2.0 | NaN |
        | NaN | 3.0 | NaN | 3.0 |
        | 4.0 | 4.0 | 4.0 | 4.0 |
        | 5.0 | 5.0 | 5.0 | 5.0 |
        +---+---+-----+-----+
        """
        temp_path="/tmp/test/"
        imputer_path="%simputer"%(temp_path)
        imputer.save(imputer_path)
        loadImputer=Imputer.load(imputer_path)
        # print(loadImputer.getStrategy()==imputer.getStrategy())
        # True
        model_path="%simputermodel"%(temp_path)
        model.save(model_path)
        loadModel=ImputerModel.load(model_path)
        # print(loadModel.transform(df).head().out_a==model.transform(df).head().out_a)
        # True
    
    • pyspark.ml.feature.MaxAbsScaler(self, inputCol=None, outputCol=None)
      通过分割每个特征中的最大绝对值来单独重新缩放每个特征以范围[-1,1]。 它不会移动/居中数据,因此不会破坏任何稀疏性。
    >>> from pyspark.ml.linalg import Vectors
    >>> df = spark.createDataFrame([(Vectors.dense([1.0]),), (Vectors.dense([2.0]),)], ["a"])
    >>> maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
    >>> model = maScaler.fit(df)
    >>> model.transform(df).show()
    +-----+------+
    |    a|scaled|
    +-----+------+
    |[1.0]| [0.5]|
    |[2.0]| [1.0]|
    +-----+------+
    ...
    >>> scalerPath = temp_path + "/max-abs-scaler"
    >>> maScaler.save(scalerPath)
    >>> loadedMAScaler = MaxAbsScaler.load(scalerPath)
    >>> loadedMAScaler.getInputCol() == maScaler.getInputCol()
    True
    >>> loadedMAScaler.getOutputCol() == maScaler.getOutputCol()
    True
    >>> modelPath = temp_path + "/max-abs-scaler-model"
    >>> model.save(modelPath)
    >>> loadedModel = MaxAbsScalerModel.load(modelPath)
    >>> loadedModel.maxAbs == model.maxAbs
    True
    
    • pyspark.ml.feature.MinMaxScaler(self, min=0.0, max=1.0, inputCol=None, outputCol=None)
      使用列汇总统计信息,将每个特征单独重新标定为一个常用范围[min,max],这也称为最小 - 最大标准化或重新标定(注意由于零值可能会被转换为非零值,因此即使对于稀疏输入,转换器的输出也将是DenseVector)。 特征E的重新缩放的值被计算为,数据将被缩放到[0.0,1.0]范围内。
      For the case E_max == E_min, Rescaled(e_i) = 0.5 * (max + min)```
      
    >>> from pyspark.ml.linalg import Vectors
    >>> df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])
    >>> mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
    >>> model = mmScaler.fit(df)
    >>> model.originalMin
    DenseVector([0.0])
    >>> model.originalMax
    DenseVector([2.0])
    >>> model.transform(df).show()
    +-----+------+
    |    a|scaled|
    +-----+------+
    |[0.0]| [0.0]|
    |[2.0]| [1.0]|
    +-----+------+
    ...
    >>> minMaxScalerPath = temp_path + "/min-max-scaler"
    >>> mmScaler.save(minMaxScalerPath)
    >>> loadedMMScaler = MinMaxScaler.load(minMaxScalerPath)
    >>> loadedMMScaler.getMin() == mmScaler.getMin()
    True
    >>> loadedMMScaler.getMax() == mmScaler.getMax()
    True
    >>> modelPath = temp_path + "/min-max-scaler-model"
    >>> model.save(modelPath)
    >>> loadedModel = MinMaxScalerModel.load(modelPath)
    >>> loadedModel.originalMin == model.originalMin
    True
    >>> loadedModel.originalMax == model.originalMax
    True
    
    • pyspark.ml.feature.Normalizer(self, p=2.0, inputCol=None, outputCol=None)
      使用给定的p范数标准化矢量以得到单位范数(默认为L2)。
    >>> from pyspark.ml.linalg import Vectors
    >>> svec = Vectors.sparse(4, {1: 4.0, 3: 3.0})
    >>> df = spark.createDataFrame([(Vectors.dense([3.0, -4.0]), svec)], ["dense", "sparse"])
    >>> normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
    >>> normalizer.transform(df).head().features
    DenseVector([0.6, -0.8])
    >>> normalizer.setParams(inputCol="sparse", outputCol="freqs").transform(df).head().freqs
    SparseVector(4, {1: 0.8, 3: 0.6})
    >>> params = {normalizer.p: 1.0, normalizer.inputCol: "dense", normalizer.outputCol: "vector"}
    >>> normalizer.transform(df, params).head().vector
    DenseVector([0.4286, -0.5714])
    >>> normalizerPath = temp_path + "/normalizer"
    >>> normalizer.save(normalizerPath)
    >>> loadedNormalizer = Normalizer.load(normalizerPath)
    >>> loadedNormalizer.getP() == normalizer.getP()
    True
    
    • pyspark.ml.feature.OneHotEncoder(self, dropLast=True, inputCol=None, outputCol=None)
      (分类列编码为二进制向量列)
      一个热门的编码器,将一列类别索引映射到一列二进制向量,每行至多有一个单值,表示输入类别索引。 例如,对于5个类别,输入值2.0将映射到[0.0,0.0,1.0,0.0]的输出向量。 最后一个类别默认不包含(可通过dropLast进行配置),因为它使向量条目总和为1,因此线性相关。 所以一个4.0的输入值映射到[0.0,0.0,0.0,0.0]。这与scikit-learn的OneHotEncoder不同,后者保留所有类别。 输出向量是稀疏的。
      用于将分类值转换为分类索引的StringIndexer.
    >>> stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
    >>> model = stringIndexer.fit(stringIndDf)
    >>> td = model.transform(stringIndDf)
    >>> encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
    >>> encoder.transform(td).head().features
    SparseVector(2, {0: 1.0})
    >>> encoder.setParams(outputCol="freqs").transform(td).head().freqs
    SparseVector(2, {0: 1.0})
    >>> params = {encoder.dropLast: False, encoder.outputCol: "test"}
    >>> encoder.transform(td, params).head().test
    SparseVector(3, {0: 1.0})
    >>> onehotEncoderPath = temp_path + "/onehot-encoder"
    >>> encoder.save(onehotEncoderPath)
    >>> loadedEncoder = OneHotEncoder.load(onehotEncoderPath)
    >>> loadedEncoder.getDropLast() == encoder.getDropLast()
    True
    

    OneHotEncoder可以实现对变量的哑变量化
    参考

    • pyspark.ml.feature.PCA(self, k=None, inputCol=None, outputCol=None)
      PCA训练一个模型将向量投影到前k个主成分的较低维空间。
    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import PCA
    from pyspark.ml.feature import PCAModel
    from pyspark.ml.linalg import Vectors
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        data=[(Vectors.sparse(5,[(1,1.0),(3,7.0)]),),
              (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
              (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
        df=spark.createDataFrame(data,["features"])
        """
        df.show()
        +--------------------+
        | features |
        +--------------------+
        | (5, [1, 3], [1.0, 7.0]) |
        | [2.0, 0.0, 3.0, 4.0, ... |
        | [4.0, 0.0, 0.0, 6.0, ... |
        +--------------------+
        """
        pca=PCA(k=2,inputCol="features",outputCol="pcaFeatures")
        model=pca.fit(df)
        # print(model.explainedVariance)
        # [0.794393253223, 0.205606746777]
        pcadf=model.transform(df)
        """
        pcadf.show()
        +--------------------+--------------------+
        | features | pcaFeatures |
        +--------------------+--------------------+
        | (5, [1, 3], [1.0, 7.0]) | [1.64857282308838... |
        | [2.0, 0.0, 3.0, 4.0, ... | [-4.6451043317815... |
        | [4.0, 0.0, 0.0, 6.0, ... | [-6.4288805356764... |
        +--------------------+--------------------+
        """
        temp_path="/tmp/test/"
        pca_path="%spca"%(temp_path)
        pca.save(pca_path)
        pcaload=PCA.load(pca_path)
        # print(pcaload.getK()==pca.getK())
        # True
        pcaModel_path="%spcaModel"%(temp_path)
        model.save(pcaModel_path)
        modelLoad=PCAModel.load(pcaModel_path)
        # print(modelLoad.pc==model.pc)
        # True
        # print(modelLoad.explainedVariance==model.explainedVariance)
        # True
    
    • pyspark.ml.feature.QuantileDiscretizer(self, numBuckets=2, inputCol=None, outputCol=None, relativeError=0.001, handleInvalid="error")
      与Bucketizer方法类似,但不是传递分隔参数,而是传递一个numBuckets参数,然后该方法通过计算数据的近似分位数来决定分隔应该是什么。
    >>> values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
    >>> df = spark.createDataFrame(values, ["values"])
    >>> qds = QuantileDiscretizer(numBuckets=2,
    ...     inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
    >>> qds.getRelativeError()
    0.01
    >>> bucketizer = qds.fit(df)
    >>> qds.setHandleInvalid("keep").fit(df).transform(df).count()
    6
    >>> qds.setHandleInvalid("skip").fit(df).transform(df).count()
    4
    >>> splits = bucketizer.getSplits()
    >>> splits[0]
    -inf
    >>> print("%2.1f" % round(splits[1], 1))
    0.4
    >>> bucketed = bucketizer.transform(df).head()
    >>> bucketed.buckets
    0.0
    >>> quantileDiscretizerPath = temp_path + "/quantile-discretizer"
    >>> qds.save(quantileDiscretizerPath)
    >>> loadedQds = QuantileDiscretizer.load(quantileDiscretizerPath)
    >>> loadedQds.getNumBuckets() == qds.getNumBuckets()
    True
    
    • pyspark.ml.feature.StandardScaler(self, withMean=False, withStd=True, inputCol=None, outputCol=None)
      (标准化列,使其拥有零均值和等于1的标准差)
      通过使用训练集中样本的列汇总统计消除平均值和缩放到单位方差来标准化特征。使用校正后的样本标准偏差计算“单位标准差”,该标准偏差计算为无偏样本方差的平方根。
    >>> from pyspark.ml.linalg import Vectors
    >>> df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])
    >>> standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
    >>> model = standardScaler.fit(df)
    >>> model.mean
    DenseVector([1.0])
    >>> model.std
    DenseVector([1.4142])
    >>> model.transform(df).collect()[1].scaled
    DenseVector([1.4142])
    >>> standardScalerPath = temp_path + "/standard-scaler"
    >>> standardScaler.save(standardScalerPath)
    >>> loadedStandardScaler = StandardScaler.load(standardScalerPath)
    >>> loadedStandardScaler.getWithMean() == standardScaler.getWithMean()
    True
    >>> loadedStandardScaler.getWithStd() == standardScaler.getWithStd()
    True
    >>> modelPath = temp_path + "/standard-scaler-model"
    >>> model.save(modelPath)
    >>> loadedModel = StandardScalerModel.load(modelPath)
    >>> loadedModel.std == model.std
    True
    >>> loadedModel.mean == model.mean
    True
    

    数据规范化、归一化、Z-Score

    • pyspark.ml.feature.VectorAssembler(self, inputCols=None, outputCol=None)
      非常有用,将多个数字(包括向量)列合并为一列向量
    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import VectorAssembler
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        df=spark.createDataFrame([(12, 10, 3), (1, 4, 2)],["a","b","c"])
        """
        df.show()
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 12 | 10 | 3 |
        | 1 | 4 | 2 |
        +---+---+---+
        """
        vectorAssembler=VectorAssembler(inputCols=["a","b","c"],outputCol="features")
        vdf=vectorAssembler.transform(df)
        """
        vdf.show()
        +---+---+---+---------------+
        | a | b | c | features |
        +---+---+---+---------------+
        | 12 | 10 | 3 | [12.0, 10.0, 3.0] |
        | 1 | 4 | 2 | [1.0, 4.0, 2.0] |
        +---+---+---+---------------+
        """
        temp_path="/tmp/test/"
        vectorAssembler_path="%svectorAssembler"
        vectorAssembler.save(vectorAssembler_path)
        vectorAssemblerLoad=VectorAssembler.load(vectorAssembler_path)
        # print(vectorAssemblerLoad.transform(df).head().features==vectorAssembler.transform(df).head().features)
        # True
    
    • pyspark.ml.feature.VectorIndexer(self, maxCategories=20, inputCol=None, outputCol=None)类别列生成索引向量

    • pyspark.ml.feature.VectorSlicer(self, inputCol=None, outputCol=None, indices=None, names=None)作用于特征向量,给定一个索引列表,从特征向量中提取值。

    特征变换–标签和索引的转化

    来源:阮榕城 2017年12月18日
    在机器学习处理过程中,为了方便相关算法的实现,经常需要把标签数据(一般是字符串)转化成整数索引,或是在计算结束后将整数索引还原为相应的标签。

    Spark ML包中提供了几个相关的转换器,例如:StringIndexer,IndexToString,OneHotEncoder,VectorIndexer,它们提供了十分方便的特征转换功能,这些转换器类都位于pyspark.ml.feature包下。
    值得注意的是,用于特征转换的转换器和其他的机器学习算法一样,也属于ML Pipeline模型的一部分,可以用来构成机器学习流水线,以StringIndexer为例,其存储着进行标签数值化过程的相关 超参数,是一个Estimator,对其调用fit(…)方法即可生成相应的模型StringIndexerModel类,很显然,它存储了用于DataFrame进行相关处理的 参数,是一个Transformer(其他转换器也是同一原理)
    下面对几个常用的转换器依次进行介绍。

    StringIndexer

    ​StringIndexer转换器可以把一列类别型的特征(或标签)进行编码,使其数值化,索引的范围从0开始,该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。

    索引构建的顺序为标签的频率,优先编码频率较大的标签,所以出现频率最高的标签为0号。
    如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码。

    首先,引入必要的包,并创建一个简单的DataFrame,它只包含一个id列和一个标签列category:

    from pyspark.ml.feature import StringIndexer
    
    df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],["id", "category"])
    

    随后,我们创建一个StringIndexer对象,设定输入输出列名,其余参数采用默认值,并对这个DataFrame进行训练,产生StringIndexerModel对象:

    indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
    model = indexer.fit(df)
    

    随后即可利用该对象对DataFrame进行转换操作,可以看到,StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,即出现最多的“a”被编号成0,“c”为1,出现最少的“b”为0。

    indexed = model.transform(df)
    indexed.show()
     
    +---+--------+-------------+
    | id|category|categoryIndex|
    +---+--------+-------------+
    |  0|       a|          0.0|
    |  1|       b|          2.0|
    |  2|       c|          1.0|
    |  3|       a|          0.0|
    |  4|       a|          0.0|
    |  5|       c|          1.0|
    +---+--------+-------------+
    

    IndexToString

    与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。

    其主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。

    首先,和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,然后输出到“categoryIndex”列上,构建出新的DataFrame。

    from pyspark.ml.feature import IndexToString, StringIndexer
    
    df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],["id", "category"])
    
    indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
    model = indexer.fit(df)
    indexed = model.transform(df)
    

    随后,创建IndexToString对象,读取“categoryIndex”上的标签索引,获得原有数据集的字符型标签,然后再输出到“originalCategory”列上。最后,通过输出“originalCategory”列,可以看到数据集中原有的字符标签。

    converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
    converted = converter.transform(indexed)
    converted.show()
    """
    +---+--------+-------------+----------------+
    | id|category|categoryIndex|originalCategory|
    +---+--------+-------------+----------------+
    |  0|       a|          0.0|               a|
    |  1|       b|          2.0|               b|
    |  2|       c|          1.0|               c|
    |  3|       a|          0.0|               a|
    |  4|       a|          0.0|               a|
    |  5|       c|          1.0|               c|
    +---+--------+-------------+----------------+
    """
    converted.select("id", "categoryIndex", "originalCategory").show()
    +---+-------------+----------------+
    | id|categoryIndex|originalCategory|
    +---+-------------+----------------+
    |  0|          0.0|               a|
    |  1|          2.0|               b|
    |  2|          1.0|               c|
    |  3|          0.0|               a|
    |  4|          0.0|               a|
    |  5|          1.0|               c|
    +---+-------------+----------------+
    

    可以看到,并没有用到原来的model也能变换回来,说明传入的indexed里面保留了原来的映射关系,这样才能映射回去。
    在sklearn中是通过le = LabelEncoder(),le.fit(),然后le就可以去做映射le.transform(),最后再通过le.inverse_transform(yPred)映射回来。

    OneHotEncoder

    独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。

    One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑斯蒂回归等。

    首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:

    from pyspark.ml.feature import OneHotEncoder, StringIndexer
     
    df = spark.createDataFrame([
        (0, "a"),
        (1, "b"),
        (2, "c"),
        (3, "a"),
        (4, "a"),
        (5, "c")
    ], ["id", "category"])
     
    stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
    model = stringIndexer.fit(df)
    indexed = model.transform(df)
    

    随后,我们创建OneHotEncoder对象对处理后的DataFrame进行编码,可以看见,编码后的二进制特征呈稀疏向量形式,与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。

    encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
    encoded = encoder.transform(indexed)
    encoded.show()
     
    +---+--------+-------------+-------------+
    | id|category|categoryIndex|  categoryVec|
    +---+--------+-------------+-------------+
    |  0|       a|          0.0|(2,[0],[1.0])|
    |  1|       b|          2.0|    (2,[],[])|
    |  2|       c|          1.0|(2,[1],[1.0])|
    |  3|       a|          0.0|(2,[0],[1.0])|
    |  4|       a|          0.0|(2,[0],[1.0])|
    |  5|       c|          1.0|(2,[1],[1.0])|
    +---+--------+-------------+-------------+
    

    VectorIndexer

    之前介绍的StringIndexer是针对单个类别型特征进行转换,倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了VectorIndexer类来解决向量数据集中的类别性特征转换。

    通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。

    在下面的例子中,我们读入一个数据集,然后使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征,将类别特征转换为索引,这里设置maxCategories为10,即只有种类小10的特征才被认为是类别型特征,否则被认为是连续型特征:

    from pyspark.ml.feature import VectorIndexer
    data = spark.read.format('libsvm').load('file:///usr/local/spark/data/mllib/sample_libsvm_data.txt')
    indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
    indexerModel = indexer.fit(data)
    categoricalFeatures = indexerModel.categoryMaps
    indexedData = indexerModel.transform(data)
    indexedData.show()
    
    
    +-----+--------------------+--------------------+
    |label|            features|             indexed|
    +-----+--------------------+--------------------+
    |  0.0|(692,[127,128,129...|(692,[127,128,129...|
    |  1.0|(692,[158,159,160...|(692,[158,159,160...|
    |  1.0|(692,[124,125,126...|(692,[124,125,126...|
    |  1.0|(692,[152,153,154...|(692,[152,153,154...|
    |  1.0|(692,[151,152,153...|(692,[151,152,153...|
    |  0.0|(692,[129,130,131...|(692,[129,130,131...|
    |  1.0|(692,[158,159,160...|(692,[158,159,160...|
    |  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
    |  0.0|(692,[154,155,156...|(692,[154,155,156...|
    |  0.0|(692,[127,128,129...|(692,[127,128,129...|
    |  1.0|(692,[154,155,156...|(692,[154,155,156...|
    |  0.0|(692,[153,154,155...|(692,[153,154,155...|
    |  0.0|(692,[151,152,153...|(692,[151,152,153...|
    |  1.0|(692,[129,130,131...|(692,[129,130,131...|
    |  0.0|(692,[154,155,156...|(692,[154,155,156...|
    |  1.0|(692,[150,151,152...|(692,[150,151,152...|
    |  0.0|(692,[124,125,126...|(692,[124,125,126...|
    |  0.0|(692,[152,153,154...|(692,[152,153,154...|
    |  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
    |  1.0|(692,[124,125,126...|(692,[124,125,126...|
    +-----+--------------------+--------------------+
    

    Spark ML 几种 归一化(规范化)方法总结

    4种不同的归一化方法:

    • Normalizer
    • StandardScaler
    • MinMaxScaler
    • MaxAbsScaler

    Normalizer

    Normalizer的作用范围是每一行,使每一个行向量的范数变换为一个单位范数,下面的示例代码都来自spark官方文档加上少量改写和注释。

    // L^1 norm将每一行的规整为1阶范数为1的向量,1阶范数即所有值绝对值之和。
    +---+--------------+------------------+
    | id| features| normFeatures|
    +---+--------------+------------------+
    | 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]|
    | 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]|
    | 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
    +---+--------------+------------------+
    
    // L^inf norm向量的无穷阶范数即向量中所有值中的最大值
    +---+--------------+--------------+
    | id| features| normFeatures|
    +---+--------------+--------------+
    | 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
    | 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
    | 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
    +---+--------------+--------------+
    

    StandardScaler

    StandardScaler处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或是0均值,或是0均值单位标准差。
    主要有两个参数可以设置:

    • withStd: 默认为真。将数据标准化到单位标准差。
    • withMean: 默认为假。是否变换为0均值。 (此种方法将产出一个稠密输出,所以不适用于稀疏输入。)

    StandardScaler需要fit数据,获取每一维的均值和标准差,来缩放每一维特征。

    StandardScaler是一个Estimator,它可以fit数据集产生一个StandardScalerModel,用来计算汇总统计。
    然后产生的模可以用来转换向量至统一的标准差以及(或者)零均值特征。
    注意如果特征的标准差为零,则该特征在向量中返回的默认值为0.0。

    // 将每一列的标准差缩放到1。
    +---+--------------+------------------------------------------------------------+
    |id |features |scaledFeatures |
    +---+--------------+------------------------------------------------------------+
    |0 |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
    |1 |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771] |
    |2 |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542] |
    +---+--------------+------------------------------------------------------------+
    

    MinMaxScaler

    MinMaxScaler作用同样是每一列,即每一维特征。将每一维特征线性地映射到指定的区间,通常是[0, 1]。

    MinMaxScaler计算数据集的汇总统计量,并产生一个MinMaxScalerModel。

    注意因为零值转换后可能变为非零值,所以即便为稀疏输入,输出也可能为稠密向量。

    该模型可以将独立的特征的值转换到指定的范围内。
    它也有两个参数可以设置:

    • min: 默认为0。指定区间的下限。
    • max: 默认为1。指定区间的上限。
    // 每维特征线性地映射,最小值映射到0,最大值映射到1。
    +--------------+-----------------------------------------------------------+
    |features |scaledFeatures |
    +--------------+-----------------------------------------------------------+
    |[1.0,0.5,-1.0]|[0.0,0.0,0.0] |
    |[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
    |[4.0,10.0,2.0]|[1.0,1.0,1.0] |
    +--------------+-----------------------------------------------------------+
    

    MaxAbsScaler

    MaxAbsScaler将每一维的特征变换到[-1, 1]闭区间上,通过除以每一维特征上的最大的绝对值,它不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。

    因为它不会转移/集中数据,所以不会破坏数据的稀疏性。

    // 每一维的绝对值的最大值为[4, 10, 2]
    +--------------+----------------+
    | features| scaledFeatures|
    +--------------+----------------+
    |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
    | [2.0,1.0,1.0]| [0.5,0.1,0.5]|
    |[4.0,10.0,2.0]| [1.0,1.0,1.0]|
    +--------------+----------------+
    

    ml中的数据统计特性

    描述性统计

    mllib中有count()、max()、min()、mean()、 normL1()、normL2() 、numNonzeros() 、 variance()等基本的统计功能,不知道为什么在ml中没有了。
    但是可以利用pysparl.sql.DataFrame来处理相关的统计

    • df.agg(*exprs)计算均值、最大值等
    • df.approxQuantile(col, probabilities, relativeError)计算分位数
    • df.corr(col1, col2, method=None)计算相关系数
    • df.describe(*cols)计算统计特性
    >>> df.describe().show()
    +-------+------------------+-----+
    |summary|               age| name|
    +-------+------------------+-----+
    |  count|                 2|    2|
    |   mean|               3.5| null|
    | stddev|2.1213203435596424| null|
    |    min|                 2|Alice|
    |    max|                 5|  Bob|
    +-------+------------------+-----+
    
    • df.summary(*statistics)
    >>> df.summary().show()
    +-------+------------------+-----+
    |summary|               age| name|
    +-------+------------------+-----+
    |  count|                 2|    2|
    |   mean|               3.5| null|
    | stddev|2.1213203435596424| null|
    |    min|                 2|Alice|
    |    25%|                 2| null|
    |    50%|                 2| null|
    |    75%|                 5| null|
    |    max|                 5|  Bob|
    +-------+------------------+-----+
    >>> df.summary("count", "min", "25%", "75%", "max").show()
    +-------+---+-----+
    |summary|age| name|
    +-------+---+-----+
    |  count|  2|    2|
    |    min|  2|Alice|
    |    25%|  2| null|
    |    75%|  5| null|
    |    max|  5|  Bob|
    +-------+---+-----+
    
    • df.dropna(how='any', thresh=None, subset=None)
    • df.fillna(value, subset=None)
    • df.avg(*cols)
    • count()
    • max(*cols)
    • mean(*cols)
    • min(*cols)
    • pivot(pivot_col, values=None)
    • sum(*cols)

    相关性

    class pyspark.ml.stat.Correlation

    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.stat import Correlation
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        dataset=[
            [Vectors.dense([1, 0, 0, -2])],
            [Vectors.dense([4, 5, 0, 3])],
            [Vectors.dense([6, 7, 0, 8])],
            [Vectors.dense([9, 0, 0, 1])]
        ]
        df=spark.createDataFrame(dataset,["features"])
        dfCorr=Correlation.corr(dataset=df,column="features",method="pearson")
        """
        dfCorr.show()
        +--------------------+
        |   pearson(features)|
        +--------------------+
        |1.0              ...|
        +--------------------+
        print(dfCorr.collect())
        [Row(pearson(features) = DenseMatrix(4, 4, [1.0, 0.0556, nan, 0.4005, 0.0556, 1.0, nan, 0.9136, nan, nan, 1.0, nan,
                                                    0.4005, 0.9136, nan, 1.0], False))]
        """
        dfCorrelation=dfCorr.collect()[0][0]
        """
        print(print(str(dfCorrelation).replace('nan', 'NaN')))
        DenseMatrix([[ 1.        ,  0.05564149,         NaN,  0.40047142],
                     [ 0.05564149,  1.        ,         NaN,  0.91359586],
                     [        NaN,         NaN,  1.        ,         NaN],
                     [ 0.40047142,  0.91359586,         NaN,  1.        ]])
        """
    

    统计检验

    class pyspark.ml.stat.ChiSquareTest

    #-*-coding:utf-8-*-
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.stat import ChiSquareTest
    
    if __name__=="__main__":
        sc=SparkContext(appName="myApp")
        spark=SparkSession.builder.getOrCreate()
        dataset=[
            [0,Vectors.dense([0, 0, 1])],
            [0,Vectors.dense([1, 0, 1])],
            [1,Vectors.dense([2, 1, 1])],
            [1,Vectors.dense([3, 1, 1])]
        ]
        df=spark.createDataFrame(dataset,["label","features"])
        dfChiSq=ChiSquareTest.test(df,featuresCol="features",labelCol="label")
        """
        dfChiSq.show()
        +--------------------+----------------+-------------+                           
        |             pValues|degreesOfFreedom|   statistics|
        +--------------------+----------------+-------------+
        |[0.26146412994911...|       [3, 1, 0]|[4.0,4.0,0.0]|
        +--------------------+----------------+-------------+
        """
    
    展开全文
  • spark入门介绍(菜鸟必看)

    万次阅读 2015-11-25 22:13:04
    什么Spark Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。 与Hadoop和Storm等其他大数据和...

    什么是Spark

    Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。

    与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势。

    首先,Spark为我们提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。

    Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍。

    Spark让开发者可以快速的用Java、Scala或Python编写程序。它本身自带了一个超过80个高阶操作符集合。而且还可以用它在shell中以交互式地查询数据。

    除了Map和Reduce操作之外,它还支持SQL查询,流数据,机器学习和图表数据处理。开发者可以在一个数据管道用例中单独使用某一能力或者将这些能力结合在一起使用。

    在这个Apache Spark文章系列的第一部分中,我们将了解到什么是Spark,它与典型的MapReduce解决方案的比较以及它如何为大数据处理提供了一套完整的工具。

    Hadoop和Spark

    Hadoop这项大数据处理技术大概已有十年历史,而且被看做是首选的大数据集合处理的解决方案。MapReduce是一路计算的优秀解决方案,不过对于需要多路计算和算法的用例来说,并非十分高效。数据处理流程中的每一步都需要一个Map阶段和一个Reduce阶段,而且如果要利用这一解决方案,需要将所有用例都转换成MapReduce模式。

    在下一步开始之前,上一步的作业输出数据必须要存储到分布式文件系统中。因此,复制和磁盘存储会导致这种方式速度变慢。另外Hadoop解决方案中通常会包含难以安装和管理的集群。而且为了处理不同的大数据用例,还需要集成多种不同的工具(如用于机器学习的Mahout和流数据处理的Storm)。

    如果想要完成比较复杂的工作,就必须将一系列的MapReduce作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始启动。

    而Spark则允许程序开发者使用有向无环图(DAG)开发复杂的多步数据管道。而且还支持跨有向无环图的内存数据共享,以便不同的作业可以共同处理同一个数据。

    Spark运行在现有的Hadoop分布式文件系统基础之上(HDFS)提供额外的增强功能。它支持将Spark应用部署到现存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。

    我们应该将Spark看作是Hadoop MapReduce的一个替代品而不是Hadoop的替代品。其意图并非是替代Hadoop,而是为了提供一个管理不同的大数据用例和需求的全面且统一的解决方案。

    Spark特性

    Spark通过在数据处理过程中成本更低的洗牌(Shuffle)方式,将MapReduce提升到一个更高的层次。利用内存数据存储和接近实时的处理能力,Spark比其他的大数据处理技术的性能要快很多倍。

    Spark还支持大数据查询的延迟计算,这可以帮助优化大数据处理流程中的处理步骤。Spark还提供高级的API以提升开发者的生产力,除此之外还为大数据解决方案提供一致的体系架构模型。

    Spark将中间结果保存在内存中而不是将其写入磁盘,当需要多次处理同一数据集时,这一点特别实用。Spark的设计初衷就是既可以在内存中又可以在磁盘上工作的执行引擎。当内存中的数据不适用时,Spark操作符就会执行外部操作。Spark可以用于处理大于集群内存容量总和的数据集。

    Spark会尝试在内存中存储尽可能多的数据然后将其写入磁盘。它可以将某个数据集的一部分存入内存而剩余部分存入磁盘。开发者需要根据数据和用例评估对内存的需求。Spark的性能优势得益于这种内存中的数据存储。

    Spark的其他特性包括:

    • 支持比Map和Reduce更多的函数。
    • 优化任意操作算子图(operator graphs)。
    • 可以帮助优化整体数据处理流程的大数据查询的延迟计算。
    • 提供简明、一致的Scala,Java和Python API。
    • 提供交互式Scala和Python Shell。目前暂不支持Java。

    Spark是用Scala程序设计语言编写而成,运行于Java虚拟机(JVM)环境之上。目前支持如下程序设计语言编写Spark应用:

    • Scala
    • Java
    • Python
    • Clojure
    • R

    Spark生态系统

    除了Spark核心API之外,Spark生态系统中还包括其他附加库,可以在大数据分析和机器学习领域提供更多的能力。

    这些库包括:

    • Spark Streaming:
      • Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。
    • Spark SQL:
      • Spark SQL可以通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。
    • Spark MLlib:
      • MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。
    • Spark GraphX:
      • GraphX是用于图计算和并行图计算的新的(alpha)Spark API。通过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,GraphX暴露了一个基础操作符集合(如subgraph,joinVertices和aggregateMessages)和一个经过优化的Pregel API变体。此外,GraphX还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。

    除了这些库以外,还有一些其他的库,如BlinkDB和Tachyon。

    BlinkDB是一个近似查询引擎,用于在海量数据上执行交互式SQL查询。BlinkDB可以通过牺牲数据精度来提升查询响应时间。通过在数据样本上执行查询并展示包含有意义的错误线注解的结果,操作大数据集合。

    Tachyon是一个以内存为中心的分布式文件系统,能够提供内存级别速度的跨集群框架(如Spark和MapReduce)的可信文件共享。它将工作集文件缓存在内存中,从而避免到磁盘中加载需要经常读取的数据集。通过这一机制,不同的作业/查询和框架可以以内存级的速度访问缓存的文件。
    此外,还有一些用于与其他产品集成的适配器,如Cassandra(Spark Cassandra 连接器)和R(SparkR)。Cassandra Connector可用于访问存储在Cassandra数据库中的数据并在这些数据上执行数据分析。

    下图展示了在Spark生态系统中,这些不同的库之间的相互关联。

    图1. Spark框架中的库

    我们将在这一系列文章中逐步探索这些Spark库

    Spark体系架构

    Spark体系架构包括如下三个主要组件:

    • 数据存储
    • API
    • 管理框架

    接下来让我们详细了解一下这些组件。

    数据存储:

    Spark用HDFS文件系统存储数据。它可用于存储任何兼容于Hadoop的数据源,包括HDFS,HBase,Cassandra等。

    API:

    利用API,应用开发者可以用标准的API接口创建基于Spark的应用。Spark提供Scala,Java和Python三种程序设计语言的API。

    下面是三种语言Spark API的网站链接。

    资源管理:

    Spark既可以部署在一个单独的服务器也可以部署在像Mesos或YARN这样的分布式计算框架之上。

    下图2展示了Spark体系架构模型中的各个组件。

    图2 Spark体系架构

    弹性分布式数据集

    弹性分布式数据集(基于Matei的研究论文)或RDD是Spark框架中的核心概念。可以将RDD视作数据库中的一张表。其中可以保存任何类型的数据。Spark将数据存储在不同分区上的RDD之中。

    RDD可以帮助重新安排计算并优化数据处理过程。

    此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。

    RDD是不可变的。你可以用变换(Transformation)修改RDD,但是这个变换所返回的是一个全新的RDD,而原有的RDD仍然保持不变。

    RDD支持两种类型的操作:

    • 变换(Transformation)
    • 行动(Action)

    变换:变换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD作为参数,然后返回一个新的RDD。

    变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。

    行动:行动操作计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。

    行动操作包括:reduce,collect,count,first,take,countByKey以及foreach。

    如何安装Spark

    安装和使用Spark有几种不同方式。你可以在自己的电脑上将Spark作为一个独立的框架安装或者从诸如Cloudera,HortonWorks或MapR之类的供应商处获取一个Spark虚拟机镜像直接使用。或者你也可以使用在云端环境(如Databricks Cloud)安装并配置好的Spark。

    在本文中,我们将把Spark作为一个独立的框架安装并在本地启动它。最近Spark刚刚发布了1.2.0版本。我们将用这一版本完成示例应用的代码展示。

    如何运行Spark

    当你在本地机器安装了Spark或使用了基于云端的Spark后,有几种不同的方式可以连接到Spark引擎。

    下表展示了不同的Spark运行模式所需的Master URL参数。

    如何与Spark交互

    Spark启动并运行后,可以用Spark shell连接到Spark引擎进行交互式数据分析。Spark shell支持Scala和Python两种语言。Java不支持交互式的Shell,因此这一功能暂未在Java语言中实现。

    可以用spark-shell.cmd和pyspark.cmd命令分别运行Scala版本和Python版本的Spark Shell。

    Spark网页控制台

    不论Spark运行在哪一种模式下,都可以通过访问Spark网页控制台查看Spark的作业结果和其他的统计数据,控制台的URL地址如下:

    http://localhost:4040

    Spark控制台如下图3所示,包括Stages,Storage,Environment和Executors四个标签页

    (点击查看大图)

    图3. Spark网页控制台

    共享变量

    Spark提供两种类型的共享变量可以提升集群环境中的Spark程序运行效率。分别是广播变量和累加器。

    广播变量:广播变量可以在每台机器上缓存只读变量而不需要为各个任务发送该变量的拷贝。他们可以让大的输入数据集的集群拷贝中的节点更加高效。

    下面的代码片段展示了如何使用广播变量。

    //
    // Broadcast Variables
    //
    val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar.value

    累加器:只有在使用相关操作时才会添加累加器,因此它可以很好地支持并行。累加器可用于实现计数(就像在MapReduce中那样)或求和。可以用add方法将运行在集群上的任务添加到一个累加器变量中。不过这些任务无法读取变量的值。只有驱动程序才能够读取累加器的值。

    下面的代码片段展示了如何使用累加器共享变量:

    //
    // Accumulators
    //
    
    val accum = sc.accumulator(0, "My Accumulator")
    
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    
    accum.value

    Spark应用示例

    本篇文章中所涉及的示例应用是一个简单的字数统计应用。这与学习用Hadoop进行大数据处理时的示例应用相同。我们将在一个文本文件上执行一些数据分析查询。本示例中的文本文件和数据集都很小,不过无须修改任何代码,示例中所用到的Spark查询同样可以用到大容量数据集之上。

    为了让讨论尽量简单,我们将使用Spark Scala Shell。

    首先让我们看一下如何在你自己的电脑上安装Spark。

    前提条件:

    • 为了让Spark能够在本机正常工作,你需要安装Java开发工具包(JDK)。这将包含在下面的第一步中。
    • 同样还需要在电脑上安装Spark软件。下面的第二步将介绍如何完成这项工作。

    注:下面这些指令都是以Windows环境为例。如果你使用不同的操作系统环境,需要相应的修改系统变量和目录路径已匹配你的环境。

    I. 安装JDK

    1)从Oracle网站上下载JDK。推荐使用JDK 1.7版本

    将JDK安装到一个没有空格的目录下。对于Windows用户,需要将JDK安装到像c:\dev这样的文件夹下,而不能安装到“c:\Program Files”文件夹下。“c:\Program Files”文件夹的名字中包含空格,如果软件安装到这个文件夹下会导致一些问题。

    注:不要在“c:\Program Files”文件夹中安装JDK或(第二步中所描述的)Spark软件。

    2)完成JDK安装后,切换至JDK 1.7目录下的”bin“文件夹,然后键入如下命令,验证JDK是否正确安装:

    java -version

    如果JDK安装正确,上述命令将显示Java版本。

    II. 安装Spark软件:

    Spark网站上下载最新版本的Spark。在本文发表时,最新的Spark版本是1.2。你可以根据Hadoop的版本选择一个特定的Spark版本安装。我下载了与Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz。

    将安装文件解压到本地文件夹中(如:c:\dev)。

    为了验证Spark安装的正确性,切换至Spark文件夹然后用如下命令启动Spark Shell。这是Windows环境下的命令。如果使用Linux或Mac OS,请相应地编辑命令以便能够在相应的平台上正确运行。

    c:
    cd c:\dev\spark-1.2.0-bin-hadoop2.4
    bin\spark-shell

    如果Spark安装正确,就能够在控制台的输出中看到如下信息。

    ….
    15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
    15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
          /_/
    
    Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
    Type in expressions to have them evaluated.
    Type :help for more information.
    ….
    15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
    15/01/17 23:17:53 INFO SparkILoop: Created spark context..
    Spark context available as sc.

    可以键入如下命令检查Spark Shell是否工作正常。

    sc.version

    (或)

    sc.appName

    完成上述步骤之后,可以键入如下命令退出Spark Shell窗口:

    :quit

    如果想启动Spark Python Shell,需要先在电脑上安装Python。你可以下载并安装Anaconda,这是一个免费的Python发行版本,其中包括了一些比较流行的科学、数学、工程和数据分析方面的Python包。

    然后可以运行如下命令启动Spark Python Shell:

    c:
    cd c:\dev\spark-1.2.0-bin-hadoop2.4
    bin\pyspark

    Spark示例应用

    完成Spark安装并启动后,就可以用Spark API执行数据分析查询了。

    这些从文本文件中读取并处理数据的命令都很简单。我们将在这一系列文章的后续文章中向大家介绍更高级的Spark框架使用的用例。

    首先让我们用Spark API运行流行的Word Count示例。如果还没有运行Spark Scala Shell,首先打开一个Scala Shell窗口。这个示例的相关命令如下所示:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
     
    val txtFile = "README.md"
    val txtData = sc.textFile(txtFile)
    txtData.cache()

    我们可以调用cache函数将上一步生成的RDD对象保存到缓存中,在此之后Spark就不需要在每次数据查询时都重新计算。需要注意的是,cache()是一个延迟操作。在我们调用cache时,Spark并不会马上将数据存储到内存中。只有当在某个RDD上调用一个行动时,才会真正执行这个操作。

    现在,我们可以调用count函数,看一下在文本文件中有多少行数据。

    txtData.count()

    然后,我们可以执行如下命令进行字数统计。在文本文件中统计数据会显示在每个单词的后面。

    val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    
    wcData.collect().foreach(println)

    如果想查看更多关于如何使用Spark核心API的代码示例,请参考网站上的Spark文档

    后续计划

    在后续的系列文章中,我们将从Spark SQL开始,学习更多关于Spark生态系统的其他部分。之后,我们将继续了解Spark Streaming,Spark MLlib和Spark GraphX。我们也会有机会学习像Tachyon和BlinkDB等框架。

    小结

    在本文中,我们了解了Apache Spark框架如何通过其标准API帮助完成大数据处理和分析工作。我们还对Spark和传统的MapReduce实现(如Apache Hadoop)进行了比较。Spark与Hadoop基于相同的HDFS文件存储系统,因此如果你已经在Hadoop上进行了大量投资和基础设施建设,可以一起使用Spark和MapReduce。

    此外,也可以将Spark处理与Spark SQL、机器学习以及Spark Streaming结合在一起。关于这方面的内容我们将在后续的文章中介绍。

    利用Spark的一些集成功能和适配器,我们可以将其他技术与Spark结合在一起。其中一个案例就是将Spark、Kafka和Apache Cassandra结合在一起,其中Kafka负责输入的流式数据,Spark完成计算,最后Cassandra NoSQL数据库用于保存计算结果数据。

    不过需要牢记的是,Spark生态系统仍不成熟,在安全和与BI工具集成等领域仍然需要进一步的改进。

    参考文献

    关于作者

    Srini Penchikala目前是一家金融服务机构的软件架构师,这个机构位于德克萨斯州的奥斯汀。他在软件系统架构、设计和开发方面有超过20年的经验。Srini目前正在撰写一本关于NoSQL数据库模式的书。他还是曼宁出版社出版的《Spring Roo in Action》一书的合著者(http://www.manning.com/SpringRooinAction)。他还曾经出席各种会议,如JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini还在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等网站上发表过很多关于软件系统架构、安全和风险管理以及NoSQL数据库等方面的文章。他还是InfoQ NoSQL数据库社区的责任编辑

     

    查看英文原文:Big Data Processing with Apache Spark – Part 1: Introduction

    展开全文
  • Lean Apache Spark 2 Apache Spark 2.x Cookbook,第2版 Learning Spark Streaming Apache Spark 2.x for Java Developers Scala and Spark for Big Data Analytics High Performance Spark完整版 Machine Learni...

    目录

    Lean Apache Spark 2

    本书于2017-03由Packt Publishing出版,作者Muhammad Asif Abbasi,全书356页。

    通过本书你将学到以下知识:

    • 概述大数据分析及其对组织和数据专业人员的重要性
    • 深入了解Spark,了解它与现有处理平台的区别
    • 了解各种文件格式的复杂性,以及如何使用Apache Spark处理它们。
    • 实现如何使用YARN,MESOS或独立集群管理器部署Spark。
    • 了解Spark SQL,SchemaRDD,缓存以及使用Hive和Parquet文件格式的概念
    • 了解Spark MLLib的架构,同时讨论Spark附带的一些现成算法。
    • 介绍一下SparkR的部署和使用情况。
    • 了解图形计算和市场上可用的图形处理系统的重要性
    • 通过使用ALS使用Spark构建推荐引擎来检查Spark的真实示例。
    • 使用Telco数据集,使用随机森林预测客户流失。

    Apache Spark 2.x Cookbook,第2版

    本书适合数据工程师,数据科学家以及那些想使用Spark的读者。阅读本书之前最好有Scala的编程基础。通过本书你将学到以下知识:

    • 在AWS上使用各种集群管理器安装和配置Apache Spark
    • 为Apache Spark设置开发环境,包括Databricks Cloud笔记本
    • 了解如何使用模式在Spark中操作数据
    • 使用Spark Streaming和Structured Streaming掌握实时流分析
    • 使用MLlib掌握监督学习和无监督学习
    • 使用MLlib构建推荐引擎
    • 使用GraphX和GraphFrames库进行图形处理
    • 开发一组通用应用程序或项目类型,以及解决复杂大数据问题的解决方案

    Learning Spark Streaming

    通过本书你将学到以下知识

    • 了解Spark流媒体是如何适应全局的
    • 学习核心概念,如Spark RDDs、Spark流集群和DStream的基础知识
    • 了解如何创建健壮的部署
    • 深入流算法
    • 学习如何调优,测量和监测火花流

    Apache Spark 2.x for Java Developers

    通过本书你将学到以下知识

    • 使用不同的文件格式处理数据,例如XML、JSON、CSV和纯文本,使用Spark core库。
    • 使用Spark流媒体库对来自各种数据源的数据进行分析,例如Kafka和Flume
    • 学习使用各种SQL函数(包括Spark SQL库中的窗口函数)创建SQL模式和分析结构化数据
    • 在实现机器学习技术以解决实际问题的同时,探索Spark Mlib api
    • 了解Spark GraphX,这样您就可以了解使用Spark执行的各种基于图形的分析

    Scala and Spark for Big Data Analytics

    通过本书你将学到以下知识

    • 了解Scala面向对象和函数式编程的概念
    • 深入了解Scala集合api
    • 使用RDD和DataFrame学习Spark的核心抽象
    • 使用SparkSQL和GraphX分析结构化和非结构化数据
    • 使用Spark结构化流进行可伸缩的容错流应用程序开发
    • 学习机器学习的最佳实践,分类,回归,降维,和推荐系统,以建立预测模型与广泛使用的算法在Spark MLlib & ML
      构建集群模型来集群大量数据
    • 了解Spark应用程序的调优、调试和监视
      在独立集群、Mesos和YARN上部署Spark应用程序

    High Performance Spark完整版

    本书适合软件工程师、数据工程师、开发者以及Spark系统管理员的使用。通过本数你可以学到:

    • 了解如何使Spark作业运行速度更快;
    • 使用Spark探索数据;
    • 使用Spark处理更大的数据集;
    • 减少管道运行时间以获得更快的洞察力。

    Machine Learning with Spark Second Edition

    • 接触最新版本的Spark ML
    • 用Scala和Python创建您的第一个Spark程序
    • 在您自己的计算机上以及在Amazon EC2上为Spark设置和配置开发环境
    • 访问公共机器学习数据集并使用Spark加载、处理、清理和转换数据
    • 使用Spark机器学习库通过使用众所周知的机器学习模型来实现程序
    • 处理大规模的文本数据,包括特征提取和使用文本数据作为机器学习模型的输入
    • 编写Spark函数来评估机器学习模型的性能

    展开全文
  • spark的四大核心组件

    万次阅读 2018-07-13 13:13:15
    spark的4大结构未完待续。。。
  • 高效搭建Spark完全分布式集群

    千次阅读 2014-07-18 00:08:35
    本文详细总结Spark分布式集群的安装步骤,帮助想要学习Spark的技术爱好者快速搭建Spark学习研究环境。
  • 解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。...
  • 子雨大数据之Spark入门教程

    万次阅读 2017-07-31 17:53:50
    林子雨老师与其团队做的技术分享,值得去好好研究下 林子雨老师 2016年10月30日 ...Spark最初诞生于美国加州大学伯克利分校(UC Berkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。2013年,
  • Spark MLlib 机器学习算法与源码解析》spark是一个开源集群运算框架,最初是由加州大学柏克利分校AMPLab所开发。Spark使用了内存内运算技术,在内存上的运算速度比Hadoop MapReduce的运算速度快上100倍,即便是在...
  • Spark1.0.0 学习路线

    千次阅读 多人点赞 2014-07-23 16:22:30
    Spark1.0.0系列博客之引导篇,从预览篇、原理篇、运维篇、生态环境、源码篇、开发篇六个角度来介绍spark1.0.0。
  • 适合小白入门Spark的全面教程

    万次阅读 2018-08-09 00:20:20
    问题导读1.spark有哪些使用场景?2.spark有包含哪些组件?3.spark在哪些厂商已经应用?4.spark如何实现地震检测?Apache Spark是一个用于实...
  • 史上最简单的spark系列教程 | 完结

    万次阅读 多人点赞 2019-05-10 11:07:43
    教程参考自:《spark官方文档》,《spark快速分析》,《图解Spark核心技术与案例实战》,结合个人开发和记录中的坑,整合而成,教程通俗易懂,摒弃大量难懂的理论知识,整合为代码+精简理论的文章,涵盖Kafka,zookeeper,spark...
  • Spark初级入门(4):Scala 类和对象

    千人学习 2019-06-24 13:19:07
    能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器学习及图计算的原理。 讲师介绍:周志湖,电子科技大学计算机...
  • Spark能做什么

    万次阅读 2017-08-22 16:14:45
    在已经有了一定基础后,继续学习新东西的时候,最首先的就是弄清楚要学的东西能做什么,跟自己已有的知识有什么挂钩的地方。现在打算学习Spark,所以接下来会介绍Spark的最基础的入门。一、​Spark有些什么?​1....
  • 大数据学习路线 java(Java se,javaweb) Linux(shell,高并发架构,lucene,solr) Hadoop(Hadoop,HDFS,Mapreduce,yarn,hive,hbase,sqoop,zookeeper,flume) 机器学习(R,mahout) ...Spark(scala,spark,spark core,s...
  • 大数据技术学习路线

    万次阅读 多人点赞 2018-07-26 10:13:53
    如果你看完有信心能坚持学习的话,那就当下开始行动吧! 一、大数据技术基础 1、linux操作基础 linux系统简介与安装 linux常用命令–文件操作 linux常用命令–用户管理与权限 linux常用命令–系统管理 linux...
  • 大数据学习路线(完整详细版)

    万次阅读 2019-04-08 23:05:16
    java(Java se,javaweb)Linux(shell,高并发架构,lucene,solr)Hadoop(Hadoop,HDFS,Mapreduce,yarn,hive,hbase,sqoop,zookeeper,flume)机器学习(R,mahout)Storm(Storm,kafka,redis)Spark(scala,spark,spark core,spark ...
  • Spark Streaming:大规模流式数据处理

    万次阅读 2014-07-08 10:45:01
    转自:http://www.csdn.net/article/2014-01-27/2818282-Spark-Streaming-big-data
  • spark和flink的异同。

    万次阅读 2018-04-24 13:15:44
    今天在StackOverflow上看到一个问题解答,很好的解释了spark和flink的区别。转到这里并做整理存以笔记以备日后学而时习之。原文地址:...
  • Spark 生态系统组件

    万次阅读 2017-01-04 13:43:48
    引言:随着大数据技术的发展,实时流计算、机器学习、图计算等领域成为较热的研究方向,而Spark作为大数据处理的“利器”有着较为成熟的生态圈,能够一站式解决类似场景的问题。那么Spark生态系统中有哪些组件你知道...
1 2 3 4 5 ... 20
收藏数 23,409
精华内容 9,363
关键字:

spark学习使用什么软件