pyspark 订阅
PySpark 是 Spark 为 Python 开发者提供的 API。以下是 PySpark 提供的每个模块每个类的详解及示例代码。 展开全文
PySpark 是 Spark 为 Python 开发者提供的 API。以下是 PySpark 提供的每个模块每个类的详解及示例代码。
信息
外文名
PySpark
最新版本
2.1.0
含    义
Spark为 Python开发者提供的API
对应的版本
0.10.4
PySpark子模块
PySpark 是 Spark 为 Python 开发者提供的 API [1]  ,位于 $SPARK_HOME/bin 目录,其依赖于 Py4J。随Spark 2.1.0发布的 Py4J位于 $SPARK_HOME/python/lib 目录,对应的版本是 0.10.4。pyspark.sql 模块pyspark.streaming 模块pyspark.ml 包pyspark.mllib 包
收起全文
精华内容
下载资源
问答
  • pyspark

    万次阅读 2018-03-13 21:33:12
    本文主要介绍python如何通过pyspark的API操作spark

    本文主要介绍python如何通过pysparkAPI操作spark

     

    Spark安装略,下载解压配置下就OK 我使用的是spark-2.2.0-bin-hadoop2.7

    安装完毕后需要配置一下SPARK_HOME:

    SPARK_HOME=C:\spark\spark-2.2.0-bin-hadoop2.7

    Path里也要记得添加一下:

    Path=XXXX;%SPARK_HOME%\bin;

     

     

    PythonSpark交互主要用到pyspark这个模块,所以需要准备好扩展包,详细请参考《机器学习入门前准备

    Whl安装好后,能得到一个py4j文件夹,但是还需要pyspark模块这个文件夹里的内容,pyspark的获得更简单,直接去复制spark-2.2.0-bin-hadoop2.7/python/pyspark就好了。

    PS:在某些版本的pyspark调用时会出现,自己稍微查下原因,网上都有配套的py文件可以覆盖,这里不是本文的重点,所以略过。

     

     

    我们在《Spark原理详解》中介绍过,RDD分为转化(transformation)和动作(action)两种操作。RDD是基于当前的partitions生成新的partitions;动作是基于当前的partitions生成返回对象(数值、集合、字典等)。所以在通过python调用sparkAPI时需要搞清楚返回值是什么。如果返回的是partitions,调用collect()函数可以拿到封装后的数据集,分区部分对客户端是透明的,也可以调用glom()来关心具体的分区情况。如果调用的是action那么就简单得多,API直接返回结果内容。


    MapReduce API

    最典型,也是最基本的入门API

    from pyspark import SparkContext
    
    sc = SparkContext('local')
    #第二个参数2代表的是分区数,默认为1
    old=sc.parallelize([1,2,3,4,5],2)
    newMap = old.map(lambda x:(x,x**2))
    newReduce = old.reduce(lambda a,b : a+b)
    print(newMap.glom().collect())
    print(newReduce)
    

    结果是:

    [[(1, 1), (2, 4)], [(3, 9), (4, 16), (5, 25)]]
    15
    

    SparkContext是代码的核心,初始化时需要设置spark的启动类型,分为localMesosYARNStandalone模式(详见Spark原理详解

    Mapreduce里都要设置一个function,我们这里用了lambda匿名函数来实现。从结果可以看将前两和后三个分别放在了1个分区中,reduce是个action直接返回的是keysum

    预留问题:能否reduce按第二行进行求和合并,how

     

     

    flatMapfilterdistinc API

    数据的拆分、过滤和去重

    sc = SparkContext('local') 
    old=sc.parallelize([1,2,3,4,5])
    #新的map里将原来的每个元素拆成了3个
    newFlatPartitions = old.flatMap(lambda x : (x, x+1, x*2))
    #过滤,只保留小于6的元素
    newFilterPartitions = newFlatPartitions.filter(lambda x: x<6)
    #去重
    newDiscinctPartitions = newFilterPartitions.distinct()
    print(newFlatPartitions.collect())
    print(newFilterPartitions.collect())
    print(newDiscinctPartitions.collect())
    

    结果:

    [1, 2, 2, 2, 3, 4, 3, 4, 6, 4, 5, 8, 5, 6, 10]
    [1, 2, 2, 2, 3, 4, 3, 4, 4, 5, 5]
    [1, 2, 3, 4, 5]
    


     

    SampletaskSamplesampleByKey API

    数据的抽样,在机器学习中十分实用的功能,而它们有的是传输有的是动作,需要留意这个区别。

    代码:

    sc = SparkContext('local')
    old=sc.parallelize(range(8))
    samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]
    for num, element in zip(range(len(samplePartition)), samplePartition) :
        print('sample: %s y=%s' %(str(num),str(element.collect())))
    taskSamplePartition  = [old.takeSample(withReplacement=False, num=4) for i in range(5)]
    for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition) :
        #注意因为是action,所以element是集合对象,而不是rdd的分区
        print('taskSample: %s y=%s' %(str(num),str(element)))
    mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])
    y = [mapRdd.sampleByKey(withReplacement=False,
                            fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]
    for num, element in zip(range(len(y)), y) :
        #注意因为是action,所以element是集合对象,而不是rdd的分区
        print('y: %s y=%s' %(str(num),str(element.collect())))
    

    结果:

    sample: 0 y=[2, 5]
    sample: 1 y=[0, 3, 3, 6]
    sample: 2 y=[0, 4, 7]
    sample: 3 y=[1, 3, 3, 3, 6, 7]
    sample: 4 y=[2, 4, 6]
    taskSample: 0 y=[3, 4, 1, 6]
    taskSample: 1 y=[2, 5, 3, 4]
    taskSample: 2 y=[7, 1, 2, 5]
    taskSample: 3 y=[6, 3, 1, 2]
    taskSample: 4 y=[4, 6, 5, 0]
    y: 0 y=[('B', 1)]
    y: 1 y=[('B', 1), ('D', 4), ('E', 5)]
    y: 2 y=[('B', 1), ('A', 2), ('C', 3), ('E', 5)]
    y: 3 y=[('B', 1), ('A', 2), ('D', 4), ('E', 5)]
    y: 4 y=[('B', 1), ('A', 2), ('C', 3), ('E', 5)]
    

    有几个参数需要说明下:

    withReplacement代表取值后是否重新放回元素池,也就决定了某元素能否重复出现。

    Fraction代表每个元素被取出来的概率。

    Num代表取出元素的个数。

     

     

    交集intersection、并集union、排序sortBy API

    sc = SparkContext('local')
    rdd1 = sc.parallelize(['C','A','B','B'])
    rdd2 = sc.parallelize(['A','A','D','E','B'])
    rdd3 = rdd1.union(rdd2)
    rdd4 = rdd1.intersection(rdd2)
    print(rdd3.collect())
    print(rdd4.collect())
    print(rdd3.sortBy(lambda x : x[0]).collect())
    

    结果:

    ['C', 'A', 'B', 'B', 'A', 'A', 'D', 'E', 'B']
    ['A', 'B']
    ['A', 'A', 'A', 'B', 'B', 'B', 'C', 'D', 'E']
    


    flod折叠、aggregate聚合API

    这俩都是action,虽然pyspark提供了maxminsumcountmeanstdev(标准差,反应平均值的离散程度)、sampleStdev(与stdev意义相同,stdev分母N-1sampleStdev分母N)、sampleVariance(方差,所有值平方和除N-1)、topcountByValuefirstcollectAsMap等内置的统计函数,但是在某型特殊场景下还是希望能人工订制聚合的公式,需要用到这两个动作。

    代码:

    sc = SparkContext('local')
    rdd1 = sc.parallelize([2,4,6,1])
    rdd2 = sc.parallelize([2,4,6,1],4)
    zeroValue = 0
    foldResult = rdd1.fold(zeroValue,lambda element, accumulate : accumulate+element)
    zeroValue = (1,2)
    seqOp = lambda accumulate,element  : (accumulate[0] + element, accumulate[1] * element)
    combOp = lambda accumulate,element : (accumulate[0]+element[0],  accumulate[1] * element[1])
    aggregateResult = rdd1.aggregate(zeroValue,seqOp,combOp)
    print(foldResult)
    print(aggregateResult)
    aggregateResult = rdd2.aggregate(zeroValue,seqOp,combOp)
    print(foldResult)
    print(aggregateResult)
    


    结果:

    13
    (15, 192)
    13
    (18, 1536)
    

    Fold略简单,但是agregate的理解非常难,不同的分区场景会得到不同的结果,这里用图来解释说明下:

    默认1partition的情况:

    4partition的情况:



    reduceByKey reduceByKeyLocal API

    这两个要计算的效果是一样的,但是前者是传输,后者是动作,使用时候需要注意:

    sc = SparkContext('local')
    oldRdd=sc.parallelize([('Key1',1),('Key3',2),('Key1',3),('Key2',4),('Key2',5)])
    newRdd = oldRdd.reduceByKey(lambda accumulate,ele : accumulate+ele)
    newActionResult = oldRdd.reduceByKeyLocally(lambda accumulate,ele : accumulate+ele)
    print(newRdd.collect())
    print(newActionResult)
    


    结果:

    [('Key1', 4), ('Key3', 2), ('Key2', 9)]
    {'Key1': 4, 'Key3': 2, 'Key2': 9}
    



     

    回到前面mapreduce尾巴留的那个思考题,实现的方式不止一种,我这里给出两种解题思路:

    方案A

    sc = SparkContext('local')
    #第二个参数2代表的是分区数,默认为1
    old=sc.parallelize([1,2,3,4,5])
    newMapRdd = old.map(lambda x : (str(x),x**2))
    print(newMapRdd.collect())
    mergeRdd = newMapRdd.values()
    print(mergeRdd.sum())
    

    方案B
    sc = SparkContext('local') 
    oldRdd=sc.parallelize([1,2,3,4,5])
    newListRdd = oldRdd.map(lambda x : x**2)
    newMapRdd = oldRdd.zip(newListRdd)
    print(newMapRdd.values().sum())
    

     
    之所以给出这些思路,是因为我们在使用pyspark的时候,除了要关心transformationaction之分,还需要注意你要处理的rdd里的数据是list还是map,因为对于他们实用的方法又是不同的。如果有必要,可以像这样做listmap的转换。





    展开全文
  • PySpark

    2021-03-15 23:24:22
    PySpark REFERENCE: 官方文档 先决条件 以下所说的都为在Windows环境测试 环境为Anaconda、Spark2.4.7、JDK1.8、Python3.7、PySpark2.4.7 Anaconda是一个杰出的数据分析工具,内部集成了大量的有关数据分析的...

    PySpark

    REFERENCE: 官方文档

    先决条件

    以下所说的都为在Windows环境测试

    环境为AnacondaSpark2.4.7JDK1.8Python3.7PySpark2.4.7

    Anaconda是一个杰出的数据分析工具,内部集成了大量的有关数据分析的Python包,方便使用,我们使用Anaconda来安装PySpark,Anaconda默认自带Python环境,此处默认已经配置好了Python环境

    Step1

    在本地安装JDK并配置环境变量

    Step2

    下载合适版本的Spark,并将Spark放置在合适目录,并配置环境变量

    去下载

    NOTE:Spark2除了Spark2.4.2是用Scala2.12预编译的之外其他都是用Scala2.11,Spark3.0+都是用Scala2.12预编译的

    Step3

    在本地安装PySpark,注意PySpark版本需要与Spark版本保持一致,如果不一致可能会带来意想不到的错误

    pip uninstall pyspark
    pip install pyspark==2.4.7
    

    安装与Spark版本一致的PySpark,否则运行时会报错,可能会报Scala版本不一致的错误

    即刻开始

    运气好

    如果运气好的话,打开Anaconda,从中打开Jupyter Lab或者Jupyter Notebook,使用以下代码即刻快速开始使用PySpark进行数据分析

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    
    from datetime import datetime, date
    import pandas as pd
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
        Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
    ])
    df
    
    DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
    

    运气不好

    当然也有可能,由于安装存在种种问题,在使用上述代码进行开发的时候,总是报种种错误,下面这些代码是经过总结得到的,一般绝不会出错

    import pyspark
    import os
    import findspark
    # 路径需要改为自己的
    findspark.init("E:\spark-2.4.7-bin-hadoop2.7")
    # 路径改为自己本地的SPARK_HOME路径
    os.environ ['SPARK_HOME'] = 'E:\spark-2.4.7-bin-hadoop2.7'
    
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    
    df = spark.sql("SELECT 1 AS test")
    df.show()
    
    spark.stop()
    

    正式使用

    上述代码不适合复用,我们将其封装成为函数

    def initSparkSession():
        """
        实例化一个SparkSession对象
        """
        import os
        from pyspark.sql import SparkSession
    
        os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars kudu-spark2_2.11-1.13.0.jar pyspark-shell'
        os.environ ['JAVA_HOME'] = 'C:\Program Files\Java\jdk1.8.0_201'
        os.environ ['SPARK_HOME'] = 'E:\spark-2.4.7-bin-hadoop2.7'
    
        import findspark
        findspark.init("E:\spark-2.4.7-bin-hadoop2.7")
        spark = SparkSession.builder.master("local[2]").appName("PySparkLocal").getOrCreate()
        return spark
    

    PySpark读取Kudu

    如果想要成功读取Kudu,还需要Kudu-Spark的Jar包,放在合适的位置,在代码中由os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars kudu-spark2_2.11-1.13.0.jar pyspark-shell'指定,示例中使用的是kudu-spark2_2.11-1.13.0.jar,位置为代码所在的目录,当然也可以使用绝对路径

    def initSparkSession():
        """
        实例化一个SparkSession对象
        """
        import os
        from pyspark.sql import SparkSession
    
        os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars kudu-spark2_2.11-1.13.0.jar pyspark-shell'
        os.environ ['JAVA_HOME'] = 'C:\Program Files\Java\jdk1.8.0_201'
        os.environ ['SPARK_HOME'] = 'E:\spark-2.4.7-bin-hadoop2.7'
    
        import findspark
        findspark.init("E:\spark-2.4.7-bin-hadoop2.7")
        spark = SparkSession.builder.master("local[2]").appName("PySparkLocal").getOrCreate()
        return spark
    
    def readKuduTable(spark, kuduMaster, kuduTable):
        df = spark.read.format("org.apache.kudu.spark.kudu") \
                .option("kudu.table","{}".format(kuduTable)) \
                .option("kudu.master", "{}".format(kuduMaster)) \
                .load()
        return df
    
    
    if __name__ == '__main__':
        # 实例化一个SparkSession对象
        spark = initSparkSession()
        # 将对象作为参数传入函数
        df = readKuduTable(spark, "node1:7051", "fox_tm_vehicle_series")
        
        df.show() # 将DF的内容打印
        df.createOrReplaceTempView("tab") # 创建临时表,方便后面使用sql
    	spark.sql("SELECT id, name, name_alias FROM tab").show() # 选取几个字段进行展示打印
        spark.sql("SELECT COUNT(*) FROM tab").show()
        df.toPandas() # 以pandas的表格形式展示
        spark.sql("SELECT * FROM tab LIMIT 20").toPandas() # 如果原表数据量太大,直接调用toPandas()的话,可能会导致Driver内存溢出,这里使用sql选取20行再以pandas的表格形式展示
        
        spark.stop()
    
    展开全文
  • Pyspark

    2019-10-11 16:21:22
    PySpark指南PysparkDataframe与pandas的区别常用操作功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、...

    Pyspark

    最近用到了pyspark对数据进行分析和挖掘,在这里总结一下用到的一些技术,巩固知识的同时分享知识。

    Dataframe

    与pandas的区别

    pyspark的基本数据结构是dataframe,这与pandas的dataframe不一样。
    区别:

    1. pandas的大部分操作,pyspark都不可以用

    2. Pyspark Dataframe的数据框是不可变的(因为其数据量比较大),只能通过withColumns等进行操作;

    3. pyspark的dataframe转化为pandas:
      pandas_df = spark_df.toPandas()

      反过来:
      spark_df = sqlContext.createDataFrame(pandas_df)

    常用操作

    更详细的操作可以参考这篇博客:link.
    这篇博客对于Pyspark的DataFrame有很详细的操作介绍。

    1.查看前几列:

    df.take(10) # 查看前十行
    df.show() # 如果行数超过20,默认只显示前20行
    

    2.查询总行数:

    df.count()
    

    3.查询概要:

    df.printSchema()
    

    4.选择一列或者多列

    df.select('name')
    df['name']
    df.name
    df.select(df['name'])
    

    5.新增数据列

    df.withColumn('name1', func(df.name2))
    # 此处的func可以通过下述方法进行定义,从而实现新增列的函数操作
    
    from pyspark.sql.types import StringType
    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import udf
    
    # 定义函数
    def function(x):
    	return x
    func = udf(function, StringType()) #将结果转化为能够存入DataFrame的数据格式
    

    6.频数统计

    df.groupby('name').agg{'name2': 'mean'}.show() # 统计平均数
    df.groupby('name').count().show() # 统计数量
    

    7.删除

    df.drop(df.name).collet()
    

    8.读写csv

    df.write.csv(path="",header="True",sep=",", mode="overwrite")
    

    9.重分区存入hadoop

    df.coalesce(2) 
    #可以将原本分为200个小块的数据,分为2个小块存入,减少了文件个数
    

    Pyspark介绍

    Pyspark是大数据框架spark为python提供的接口,因为python在数据分析与机器学习方面的广泛应用,pyspark顺势而来。

    展开全文
  • PySpark-源码

    2021-03-06 19:18:31
    PySpark
  • PySpark-Learning PySpark实战指南(Leaning PySpark)代码
  • Learn PySpark

    2019-09-11 16:17:27
    Learn PySpark
  • PySpark教程 PySpark是用于Spark的Python API。 PySpark教程的目的是提供使用PySpark的基本分布式算法。 PySpark具有用于基本测试和调试的交互式外壳程序( $SPARK_HOME/bin/pyspark ),不应将其用于生产环境。 ...
  • PySpark和MLlib PySpark和MLlib入门
  • learning pyspark

    2018-11-08 17:39:32
    learning pyspark 指导书籍,语言:英文 源自databricks
  • pyspark.docx

    2021-05-10 08:53:57
    pyspark.docx
  • PySpark_Test:测试项目以练习pyspark
  • pyspark cookbook

    2018-11-12 16:35:37
    Perform effective data processing, machine learning, and analytics using PySpark Overcome challenges in developing and deploying Spark solutions using Python Explore recipes for efficiently combining ...
  • pyspark-coverage-site:PySpark覆盖演示
  • Learning PySpark

    2018-04-07 21:58:00
    Learning PySpark. Tomasz Drabas【全】学习资源较少,一本不错的书
  • PySpark教程

    千次阅读 2018-12-20 09:40:13
    PySpark简介 PySpark环境设置 PySpark SparkContext PySpark RDD PySpark广播与累积器 PySpark SparkConf PySpark SparkFiles PySpark StorageLevel PySpark MLlib PySpark Serializers
    展开全文
  • pyspark-源码

    2021-03-08 17:27:25
    pyspark 该存储库专用于pyspark的代码段。 该代码已针对为Hadoop 2.7.3构建的Spark 2.4.6进行了测试。 注意:为了通过pyspark连接到Mongodb,您需要其他jar文件,具体取决于您使用的spark版本。 有用链接:
  • pyspark原理简介

    万次阅读 2014-07-30 22:08:20
    这是前段时间在看spark的python支持的时候,简单过了一下pyspark里的python代码,整理了一个大致流程。虽然几乎不会python,但基本上能看懂pyspark是怎么让不同虚拟机之间传输数据的、如何在python环境调用java类的...
  • Udacity-PySpark1:Udacity-PySpark1-
  • PySpark样板 编写PySpark作业的样板 有关详细信息,请参见随附的博客文章, 为

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,997
精华内容 4,798
关键字:

pyspark