精华内容
下载资源
问答
  • pyspark使用
    2021-10-28 11:32:57

    udf使用:

    问题:

    不能传入一个df,udf中使用另一个udf,这将抛出一个错误TypeError: cannot pickle '_thread.RLockobject

    解决:

    使用join的大于小于过滤条件

    df1.join(df2, on=[(df2.timestamp > df1.start) & (df2.timestamp < df1.end)]) \ .groupby('start', 'end', 'event_name') \ .agg(F.mean('measurement').alias('avg')) \ .show()

    问题:

    pysaprk struct类型作为key的map不能转成dict的key

    python对key进行哈希函数运算,根据计算的结果决定value的存储地址,所以字典是无序存储的,且key必须是可哈希的。可哈希表示key必须是不可变类型,如:数字、字符串、元组。

    字典(dictionary)是除列表意外python之中最灵活的内置数据结构类型。列表是有序的对象结合,字典是无序的对象集合。两者之间的区别在于:字典当中的元素是通过键来存取的,而不是通过偏移存取。

    PickleException: expected zero arguments for construction of ClassDict  for pyspark.sql.dtypes._create_row

    貌似是由于返回值中包含了numpy.dtype,但是我没有使用,不知道为啥报错。好像是因为类型是map?

    解决:

    转换成str类型在进行处理或者explore展开处理

    from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    b|
    +---+-----+

    处理未成功:将map的key转换成str作为新字典的key,总是报字典的key_error。更改了生成的map_key。好像是没有获取到相应的key的问题。。。

    UDF实在是太难调试了,跑一遍要好久,看不出错在哪。。。

    更多相关内容
  • Pyspark使用

    2022-07-02 08:57:43
    第一步:PySpark 应用程序从初始化开始,这是 PySpark 的入口点,如下所示。如果通过 pyspark 可执行文件在 PySpark shell 中运行它,则 shell 会自动在变量 spark 中为用户创建会话。 第二步:数据帧创建,有多种...

    第一步:PySpark 应用程序从初始化开始,这是 PySpark 的入口点,如下所示。如果通过 pyspark 可执行文件在 PySpark shell 中运行它,则 shell 会自动在变量 spark 中为用户创建会话。SparkSession

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()

    第二步:数据帧创建,有多种方式,本文使用Pandas

    pandas_df = pd.DataFrame({
        'a': [1, 2, 3],
        'b': [2., 3., 4.],
        'c': ['string1', 'string2', 'string3'],
        'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
        'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
    })
    df = spark.createDataFrame(pandas_df)
    df

    第三步:查看数据,使用DataFrame.show()来查看数据,下边代码为查看一行

    df.show(1)

    也可以垂直显示,当行太长

    df.show(1, vertical=True)

    可以看到 DataFrame 的架构和列名称

    df.columns
    df.printSchema()

    显示数据帧的摘要

    df.select("a", "b", "c").describe().show()

    DataFrame.collect()将分发的数据作为 Python 中的本地数据收集到驱动程序端。请注意,当数据集太大而无法放入驱动程序端时,这可能会引发内存不足错误,因为它收集了从执行程序到驱动程序端的所有数据。

    df.collect()

    为避免引发内存不足异常,可以使用 DataFrame.take()或DataFrame.tail()

    df.take(1)

    PySpark DataFrame还提供toPandas方法转换回熊猫DataFrame以利用熊猫API。请注意,还会将所有数据收集到驱动程序端,当数据太大而无法放入驱动程序端时,这些数据很容易导致内存不足错误。

    df.toPandas()

    展开全文
  • pyspark使用

    2021-08-26 10:27:03
    saprk本身是Scala语言编写的,使用pyspark可以使用python语言处理RDD。 RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了...

    pyspark背景

    • saprk本身是Scala语言编写的,使用pyspark可以使用python语言处理RDD。
    • RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些RDD应用多个操作来完成某项任务。
    • RDD执行包括Transformation(不立即执行)和Action(立即执行)两种计算方式。

    常用函数

    1.RDD操作,立即执行
    在这里插入图片描述
    2.不立即执行
    在这里插入图片描述
    3.Pair RDD操作
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    头文件激活等

    在这里插入图片描述

    头文件

    spark = SparkSession.builder.appName('tmp').config('')...
    
    sc = spark.sparkContext
    

    读取与写入

    1. sql形式读取
    order_sql_str = """
    select
        name
        ,age
    from
        custome_order
    where
        concat(year, month, day) between {0} and {1}
    """.strip()
    begin_str, end_str = '20210826', '20210826'
    df_order = spark.sql(order_sql_str.format(begin_str, end_str))
    
    • 得到的是df类型
    • df_order.rdd转换为RDD类型(字段调取方式为字典型和pd型)
    • df_order.rdd.keyBy(lambda x:x.name)转换为RDD类型之后取值用df类型方式
    • df_order.rdd.map(lambda x: x.asDict())转换为RDD类型之后转为健值对类型
    • df_order.rdd.map(lambda x: x.asDict()).keyBy(lambda x:x['name']) 转换健值对类型之后字段读取处理方式举例
    1. sc.textFile形式读取
    order_rdd = sc.textFile('/home/custome')
    
    • 读入的形式是RDD类型,后续可以转json,可以map函数处理,可以take一条看内容加以灵活处理,例如:
    order_rdd.map(json.loads)
    
    def _format(line):
    	return {'name':line['name'],
    	'age':line['age']}
    order_rdd.map(_format)
    
    1. 写入文件
    • 重新分区为1part写入output_path
    rdd1.map(json.dumps).repartition(1).saveAsTextFile(outout_path)
    
    • 将list类型的arr转换为rdd类型写入output_path
    sc.parallelize([arr]).map(json.dumps).repartition(1).saveAsTextFile(outout_path)
    
    • parquet形式写入与读取
    df.repartition(100).write.parquet(path=output,mode='overwrite')
    df = spark.read.parquet(output)
    

    常用命令

    1. join之后铺平操作
    rdd1.keyBy(lambda x: x['name']).join(rdd2.keyBy(lambda x: x['name'])).map(lambda x: dict(x[1][0], **x[1][1]))
    
    1. 去重操作(保存最后一个),并恢复到原来的形式
    rdd1.keyBy(lambda x:x['name']).reduceByKey(lambda x,y: y).map(lambda x:x[1])
    
    1. 使用json形式查看
    json.dumps(sc.textFile(input_path).map(json.loads).collect())
    
    1. 将rdd转为spark中的DataFrame格式
    df1 = rdd1.toDF()
    rdd1 = df1.map(lambda x:x.asDict())
    
    1. 将spark中的DataFrame格式转为pandas中的DataFrame,使用toPandas()
    2. withColumn()#在df中新增数据列,会返回一个新的DataFrame,需要两个参数,第二个参数是对原来的列作何操作,比如原来的列整体加一,不可以从别的列进行取数。
    3. 两种主键设置方式
    rdd1.keyBy(lambda x:x['name'])
    
    rdd1.map(lambda x:(x['name'],x))
    
    1. DataFrame去重
    df.dropDuplicates()
    

    上述实现方式可以在SQL语句中直接实现。

    1. spark.sql中使用自定义函数
    def func1(x,y):
    	return x+y
    func = spark.udf.register("func", func1)
    
    1. sql中groupby多个字段表示多个字段联合唯一
    2. sql中使用filter函数
    df = spark.sql(order_sql_str)
    df.filter("name='张三'")
    
    展开全文
  • 出于种种原因,spark集群节点中的环境会不一致,会出现一些莫名其妙的错误,在没有root权限的情况下,可以使用自己配置好的python包分发到集群节点上使用;比方说,本机是python2.7,节点是python2.6的错误:...

    出于种种原因,spark集群节点中的环境会不一致,会出现一些莫名其妙的错误,在没有root权限的情况下,可以使用自己配置好的python包分发到集群节点上使用;

    比方说,本机是python2.7,节点是python2.6的错误:Exception: Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)

    修改后的 /spark2.3/python/pyspark/shell.py,默认的脚本会用/spark2.3/conf/spark-env.sh覆盖自己设置的环境变量

    export PYTHONSTARTUP="/data/src/shell.py"

    本机调用的python环境

    export PYSPARK_PYTHON=/data/python27_ml/bin/python2.7

    不用python的shell,用jupyter做为交互环境

    export PYSPARK_DRIVER_PYTHON=/data/python27_ml/bin/jupyter

    export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0 --port 8888 --notebook-dir /data/notebook --config /data/src/notebook_config.json"

    运行, /data/python27_ml.tgz是本地路径,要分发到节点上的python包exec "${SPARK_HOME}"/bin/spark-submit \

    pyspark-shell-main \

    --name "PySparkShell" \

    --conf "spark.yarn.dist.archives=/data/python27_ml.tgz" \

    "$@"

    /data/src/shell.py

    需要在SparkContext初始化之前设置环境变量PYSPARK_PYTHON

    os.environ['PYSPARK_PYTHON'] = './python27_ml.tgz/python27_ml/bin/python'

    分发的python包找不到对应python可执行文件的错误:19/02/15 14:55:43 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.160.108.154, executor 9): java.io.IOException: Cannot run program "./python27/bin/python": error=2, No such file or directory

    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)

    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:168)

    展开全文
  • sample_spark3 如何使用findspark和pyspark使用spark3
  • pyspark 大表和小表join,使用广播变量,并对广播变量更新 from pyspark import SparkConf, SparkContext, SQLContext # import org.apache.spark.sql.functions.broadcast # 给定节点,根据其信息进行扩展,找到...
  • pyspark使用graphframes报错 本文主要介绍如何处理pyspark在使用graphframes时碰到的报错。通常,在pyspark下使用graphframe不成功的原因是由于没有在pyspark/jars中导入graphframe所依赖的的jar文件。这里不再对此...
  •  使用Idea开发pyspark程序,在这之前我们需要做一些准备工作, 如同PyCharm开发pyspark程序一样,我们需要先对python项目进行配置: 1. 使用anaconda3里面的python  打开File -&gt; Project Structure -&...
  • pyspark使用hbase详解

    2021-07-27 16:30:40
    pyspark使用hbase详解 一、测试代码: def write2hbase(): from pyspark.sql import SparkSession from pyspark import SparkContext, SparkConf spark = SparkSession.builder.appName("nlp").getOrCreate() # ...
  • pyspark 使用Many different factors come into play as to why a particular user may or may not churn. In this project I use PySpark to analyse and predict churn using data similar to those of companies ...
  • pyspark 使用xgboost模型做训练
  • pyspark使用分布式xgboost

    千次阅读 热门讨论 2020-12-18 18:16:53
    亲测跑通 环境: Python 3.6.5 Pyspark:2.4.5 Spark: 2.4.3 步骤: ... 第二步:下载相关文件(下载地址) ...关键点1:将xgboost4j-0.72.jar和Xgboost4j-spark-0.72.jar添加到job中(使用--ja...
  • zeppelin pyspark java sparkudf

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 14,587
精华内容 5,834
关键字:

pyspark使用