2018-12-04 16:37:22 qwezhaohaihong 阅读数 232
  • Python基础

    Python 以其简洁、优雅、高效的特点,成为目前流行的4大主流开发语言之一,其应用广泛,易学易用,让很多人爱不释手。本套课程为初学者量身打造,是你入门 Python 的必修课程。这一部分内容涵盖了Python基础的知识点,包括Python的基础语法,比如变量,字符编码等,数据类型以及if else,for循环等流程控制语句,内容简单却十分关键,无论是数据类型还是控制语句都是在以后的Python学习无论方向都会反复大量应用的。 此课程特别适合之前完全无任何开发经验的小白白学习! Python除了是人工智能领域第一语言之外,其实还广泛的应用于WEB开发、云计算、金融分析、自动化运维、自动化测试、爬虫、大数据分析等领域,生态圈极为丰富和活跃。它强大的模块库大大的提高了开发者的开发效率,简洁明朗的语法使写代码如写诗一样优雅顺畅,极大降低了编程的学习门槛。可以不谦虚的说,Python是唯一一门无论是开发老司机还是小白白都一致称赞的编程语言。 不论你是什么背景出身,如果对编程感兴趣,从Python入手吧,Python是检测你是否适合做程序员的好的语言,如果练Python都学不会,哈哈,那你可以放弃这个职业啦!

    14587 人正在学习 去看看 李杰

首先,python提交spark的命令 比较复杂的版本如下:

spark-submit --master yarn-client --driver-memory 4g --executor-cores 4 --executor-memory 8g --conf spark.dynamicAllocation.minExecutors=10 --queue queue_cem  /home/test/test_pyspark.py(测试文件目录)

此时是配置了spark-submit命令的

非路径方法有:/bin/spark-submit xxxx.py直接就可以提交python文件(最简单的方法)

 

其次,有时候集群里的python版本不适合更新或者无法联网进行导包的时候,我们可以采用如下方法进行解决:

我对python提交sparkd的初步理解是,spark平台和一个python环境进行绑定,利用pyspark进行交互,因此我们导入的本地包需要导入只pyspark的lib目录下面,并且得是.zip的格式!pyspark的包目录怎么找呢,首先进入pyspark编译器,执行如下代码

import os
import sys
spark_name = os.environ.get('SPARK_HOME',None)
if not spark_name:
    raise ValueErrorError('spark环境没有配置好')
print spark_name

 

spark_name会给出一个路径,我们只需要在路径后添加上 /lib,这就是我们要存放包的路径了,在终端下将本地的.zip格式的第三方包(从github下载最合适)利用mv命令移动至pyspark环境的包目录,此时包已经进入pyspark环境中了,接下来执行一串代码进行配置即可(在pyspark编译环境下)

import os
import sys
spark_name = os.environ.get('SPARK_HOME',None)
if not spark_name:
    raise ValueErrorError('spark环境没有配置好')
sys.path.insert(0,os.path.join(spark_name,'python'))
sys.path.insert(0,os.path.join(spark_name,'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_name,'python/pyspark/shell.py')).read())

最后一行的代码有可能报错,因为我是学习的别人的方法,不过没关系,这个时候py4j包已经可以使用,并且通过spark-submit提交的py文件也可以import

 

以上内容为个人理解,如有不对请指正

2015-11-21 15:10:20 power0405hf 阅读数 9948
  • Python基础

    Python 以其简洁、优雅、高效的特点,成为目前流行的4大主流开发语言之一,其应用广泛,易学易用,让很多人爱不释手。本套课程为初学者量身打造,是你入门 Python 的必修课程。这一部分内容涵盖了Python基础的知识点,包括Python的基础语法,比如变量,字符编码等,数据类型以及if else,for循环等流程控制语句,内容简单却十分关键,无论是数据类型还是控制语句都是在以后的Python学习无论方向都会反复大量应用的。 此课程特别适合之前完全无任何开发经验的小白白学习! Python除了是人工智能领域第一语言之外,其实还广泛的应用于WEB开发、云计算、金融分析、自动化运维、自动化测试、爬虫、大数据分析等领域,生态圈极为丰富和活跃。它强大的模块库大大的提高了开发者的开发效率,简洁明朗的语法使写代码如写诗一样优雅顺畅,极大降低了编程的学习门槛。可以不谦虚的说,Python是唯一一门无论是开发老司机还是小白白都一致称赞的编程语言。 不论你是什么背景出身,如果对编程感兴趣,从Python入手吧,Python是检测你是否适合做程序员的好的语言,如果练Python都学不会,哈哈,那你可以放弃这个职业啦!

    14587 人正在学习 去看看 李杰

没有设置spark环境变量的话:

cd /spark路径/bin
./spark-submit /usr/qy/test_pyspark.py

test_pyspark.py:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Wicle Qian
# 2015.11.19
# test the python in Spark without pyspark

from pyspark import *

def test():
    sc = SparkContext('local','qy_test')
    print sc
    textFile = sc.textFile("file:///usr/local/cluster/spark/README.md")
    print textFile.count()
    print textFile.first()

if __name__ == '__main__':
    test()
2019-09-19 11:09:24 weixin_40548136 阅读数 47
  • Python基础

    Python 以其简洁、优雅、高效的特点,成为目前流行的4大主流开发语言之一,其应用广泛,易学易用,让很多人爱不释手。本套课程为初学者量身打造,是你入门 Python 的必修课程。这一部分内容涵盖了Python基础的知识点,包括Python的基础语法,比如变量,字符编码等,数据类型以及if else,for循环等流程控制语句,内容简单却十分关键,无论是数据类型还是控制语句都是在以后的Python学习无论方向都会反复大量应用的。 此课程特别适合之前完全无任何开发经验的小白白学习! Python除了是人工智能领域第一语言之外,其实还广泛的应用于WEB开发、云计算、金融分析、自动化运维、自动化测试、爬虫、大数据分析等领域,生态圈极为丰富和活跃。它强大的模块库大大的提高了开发者的开发效率,简洁明朗的语法使写代码如写诗一样优雅顺畅,极大降低了编程的学习门槛。可以不谦虚的说,Python是唯一一门无论是开发老司机还是小白白都一致称赞的编程语言。 不论你是什么背景出身,如果对编程感兴趣,从Python入手吧,Python是检测你是否适合做程序员的好的语言,如果练Python都学不会,哈哈,那你可以放弃这个职业啦!

    14587 人正在学习 去看看 李杰


接上篇文章Spark 基础、实践,我们开始正式学习python实战spark.

1.简单使用

1.1 代码提交

  • ./bin/run-example SparkPi 10 运行一个样例代码,实际调用spark-submit提交样例脚本
  • ./bin/spark-shell --master local[2] 启动交互式的spark scala shell,在master local(也可以选择分布式的集群master的url)运行,分配两个线程。spark-shell --help查看选项。
  • ./bin/spark-submit examples/src/main/python/pi.py 10通过spark-submit提交py样例,十分常用的方法。
  • ./bin/sparkR --master local[2]交互式的r语言接口
    以下在命令行调用pyspark运行
>>> textFile = spark.read.text("README.md")
>>> textFile.count() # Dataframe的行数
105                                                                             
>>> textFile.first() # Dataframe的第一行
Row(value='# Apache Spark')
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) # 过滤获取包含“Spark”的行
>>> textFile.filter(textFile.value.contains("Spark")).count()
20
>>> from pyspark.sql.functions import *
#按空格切分计算word个数命名为"numWords",然后获取最大的"numWords"
#`select`和`agg`的参数都是`Column`,我们可以通过`df.colName`获取指定列
>>> textFile.select(size(split(textFile.value,"\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=22)]
>>> textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
DataFrame[word: string, count: bigint]
#`explode`函数将行Dataset转换为words的数据集,执行`groupBy`和`count`计算每个word的计数
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
##获取计数通过`collect()`
>>> wordCounts.collect()
[Row(word='online', count=1), Row(word='graphs', count=1), ...]
#Spark支持将数据集拖放到集群范围的内存缓存中,有利于重复访问数据,比如查询一个小的"hot"数据集或者运行类似PageRank迭代算法,以下将标记我们的linesWithSpark数据集缓存
>>> linesWithSpark.cache()
DataFrame[value: string]
>>> linesWithSpark.count()
20
>>> linesWithSpark.count()
20

1.2读取文件并计数

以下是读取一个文件夹,对包含指定字符的行计数。

from pyspark.sql import SparkSession
logfile = "file:///home/hadoop/software/spark/spark-2.4.4-bin-hadoop2.7/README.md"
spark = SparkSession.builder.appName("SimpleApp").getOrCreate() # SparkSession创建数据集
logData = spark.read.text(logfile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Line with a:{}  Line with b:{}".format(numAs,numBs))
spark.stop()

有的应用使用自定义的类或引用第三方文件,我们可以在spark-submit时通过--py-files添加打包的zip文件。

#Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

我们可以安装pyspark通过python解释器来运行

#Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

2.RDD编程指引

从高层次来看,每个Spark应用程序都由一个驱动程序组成,该驱动程序运行用户的主要功能,并在集群上执行各种并行操作。Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDDs的创建是从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件开始的,或者是驱动程序中已有的Scala集合,并对其进行转换。用户还可以要求Spark将RDD持久存储在内存中,以便在并行操作之间有效地重用它。最后,RDDs自动从节点故障中恢复。

Spark中的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当Spark作为不同节点上的一组任务并行运行一个函数时,它将函数中使用的每个变量的副本发送给每个任务。有时,需要在任务之间或任务与驱动程序之间共享一个变量。Spark支持两种类型的共享变量:广播变量(可用于在所有节点的内存中缓存一个值)和累加器(仅“添加”到其中的变量,如计数器加和)。

2.1连接spark

Spark 2.4.4适用于Python 2.7+或Python 3.4+。它可以使用标准的CPython解释器,因此可以使用像NumPy这样的C库。它也适用于PyPy 2.3+。
如果要构建打包的PySpark应用程序或库,可以将其添加到setup.py文件中:

install_requires=[
	'pyspark=={site.SPARK_VERSION}'
]

要在Python中运行Spark应用程序而不需要pip安装PySpark,请使用位于Spark目录中的bin/ Spark -submit脚本。这个脚本将加载Spark的Java/Scala库,并允许您向集群提交应用程序。还可以使用bin/pyspark启动交互式Python shell。
如果要访问HDFS数据,需要使用PySpark构建来链接到您的HDFS版本。Spark主页上还提供了一些预构建包,用于常见的HDFS版本。
最后,您需要将一些Spark类导入程序。添加以下行:from pyspark import SparkContext, SparkConf
PySpark在driver节点和worker节点中都需要相同的Python版本。它在PATH中使用默认的python版本,你可以指定PYSPARK_PYTHON想使用的python版本,例如:

$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

2.2 初始化

  • 首先必须是创建一个SparkContext对象,它告诉Spark如何访问集群
  • 要创建SparkContext,则需要构建一个包含应用程序信息的SparkConf对象。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName参数是application要在集群UI上显示的名称。master支持Spark、Mesos、YARN集群URL或者"local"(本地模式)。实际上,在集群上运行时,我们不希望在程序中硬编码master,而是使用spark-submit启动应用程序提交代码。而对于本地测试和单元测试,可以通过“local”运行Spark in-process过程调试。

2.3 使用shell

在PySpark shell中,名为sc的变量中创建了一个解释器可以识别的SparkContext

  • —master参数设置SparkContext连接到哪个master
  • --py-file传递逗号分隔的列表,将python .zip、.egg或.py文件添加到运行时路径。
  • --packages参数提供以逗号分隔的Maven坐标列表,将依赖项(Spark Packages)添加到shell会话中。
  • --repository传递任何存在依赖项的附加存储库(例如Sonatype)。Spark包所具有的任何python依赖项(在该包的requirements.txt中列出)必须在必要时使用pip手动安装。

在四个核心上运行bin/pyspark,可以使用:$ ./bin/pyspark --master local[4]
添加要引用的code使用$ ./bin/pyspark --master local[4] --py-files code.py
要获得完整的选项列表运行pyspark-help

spark支持增强型python解释器ipython中启动pyspark shell。pyspark与ipython 1.0.0及更高版本兼容。要使用ipython,请在运行bin/pyspark时将PYSPARK_DRIVER_PYTHON变量设置为ipython$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
如果要使用Jupyter notebook(著名的ipyhton解释器)$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark,可以输入命令%pylab inline,然后再尝试从Jupyter笔记本中使用Spark。

2.4 弹性分布式数据集(Resilient Distributed Datasets)

Spark围绕弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的容错元素集合。创建RDDs有两种方法:

  • 并行化(parallelizing)driver中的现有集合。
  • 引用外部存储系统中的数据集,如共享文件系统、HDFS、HBase或任何支持Hadoop InputFormat的数据源.

2.4.1 并行化集合

并行化集合是通过driver中存在的迭代器或集合上调用SparkContextparallelize方法创建的。复制集合的元素形成可并行操作的分布式数据集。例如以下是创建一个并行的集合容纳数字1到5:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

创建分布式数据集(distData),就可以并行的操作它。可以调用distData.reduce(lambda a, b: a + b)将列表中的元素相加。稍后我们将描述对分布式数据集的操作。
并行集合的一个重要参数是要将数据集分割的partition数。Spark为每个分区运行一个任务。通常,集群中的每个CPU需要2-4个分区。通常,Spark会根据集群自动设置分区的数量,可以通过将它作为第二个参数传递来手动设置它sc.parallelize(data, 10)。注意:代码中的一些地方使用术语片(分区的同义词)来维护向后兼容性。

2.4.2 外部数据集

PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件、sequencefile和任何其他Hadoop InputFormat。

可以使用SparkContext的textFile方法创建文本文件RDDs。此方法接受文件的URI(本地路径或hdfs://、s3a://等URI),并将其作为行集合读取。下面是一个示例调用:
distFile = sc.textFile("data.txt")
创建后,可以对dataset操作,以下通过map和reduce获取行的sizes然后相加distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
Spark读取文件的一些注意事项:

  • 如果使用本地文件系统上的路径,则必须在工作节点上的相同路径上访问该文件。要么将文件复制到所有workers,要么使用挂载于网络的共享文件系统。
  • Spark的所有基于文件的输入方法(包括textFile)都支持在目录、压缩文件和通配符上运行。可以使用textFile(“/my/directory”)、textFile(“/my/directory/*.txt”)、textFile(“/my/directory/*.gz”)
  • textFile方法还接受一个可选的第二个参数,用于控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(在HDFS中,块的默认大小为128MB),但是也可以通过传递更大的值来请求更多的分区。注意,分区不能少于块

除了文本文件,Spark的Python API还支持其他几种数据格式:

  • SparkContext.wholeTextFiles允许读取包含多个小文本文件的目录,并以(filename, content)对的形式返回每个小文本文件。这与textFile相反,textFile在每个文件中每行返回一条记录。

  • RDD.saveAsPickleFile SparkContext.pickleFile。pickleFile支持以包含pickled Python object的简单格式保存RDD。batch用于pickle序列化,默认batch size为10。

  • SequenceFile和Hadoop输入/输出格式

注意,这些特性目前标记为实验性的,并且面向高级用户。将来,它可能会被基于Spark SQL的读/写支持所替代,在这种情况下,Spark SQL是首选的方法。

写入支持
PySpark SequenceFile支持加载Java中键值对的RDD,将可写对象转换为基本Java类型,并使用Pyrolite pickles生成的Java对象。当将键值对的RDD保存到SequenceFile时,PySpark执行相反的操作。它将Python对象unpickle为Java对象,然后将它们转换为可写对象。以下可写项会自动转换:
在这里插入图片描述
数组不是开箱即用的。在读取或写入时,用户需要指定自定义ArrayWritable的子类。在写入时,用户还需要指定将数组转换为自定义ArrayWritable子类的自定义转换器。读取时,默认转换器将自定义ArrayWritable子类型转换为Java Object[],然后将其pickle为Python元组。为了获取Python数组。对于基元类型数组,用户需要指定自定义转换器。

保存和加载SequenceFiles
与文本文件类似,可以通过指定路径保存和加载sequencefile。可以指定键类和值类,但标准可写对象则不需要指定。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

保存和加载其他Hadoop输入/输出格式
PySpark还可以读取任何Hadoop InputFormat或编写任何Hadoop OutputFormat,用于“新的”和“旧的”Hadoop MapReduce api。如果需要,Hadoop配置可以作为Python dict传入。下面是一个使用Elasticsearch ESInputFormat的例子:

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

注意如果InputFormat仅仅依赖于Hadoop配置和/或输入路径,并且键和值类可以根据上表轻松地转换,那么这种方法应该适用于这种情况。

如果您有自定义的序列化二进制数据(例如从Cassandra / HBase加载数据),那么首先需要将Scala/Java端上的数据转换为Pyrolite的pickler可以处理的数据。为此提供了一个转换器特性。只需扩展此特性并在convert方法中实现转换代码。记住,要确保这个类以及访问InputFormat所需的任何依赖项都打包到Spark作业jar中,并包含在PySpark类路径中。

2.4.3 RDD操作

RDDs支持两种类型的操作:transformations(从现有数据集创建新数据集)和actions(在数据集上运行计算后向驱动程序返回一个值)。例如,map是一个转换,它通过一个函数传递每个dataset元素,并返回一个表示结果的新RDD。另一方面,reduce是一个使用某个函数聚合RDD的所有元素并将最终结果返回给驱动程序的操作还有一个返回分布式数据集的并行的reduceByKey

Spark中的所有transformation都是惰性的,因为它们不会立即计算结果。相反,它们只记住应用于某些基本数据集(例如文件)的转换。只有当操作需要将结果返回给driver,才会计算转换。这种设计使Spark能够更有效地运行。例如,通过map创建的数据集将在reduce中使用,并且只将reduce的结果返回给driver,而不是更大的map数据集。

默认情况下,每次对每个转换后的RDD运行操作时,都可以重新计算它。也可以使用persist(或cache)方法将RDD持久化到内存中,在这种情况下,Spark会将元素保存在集群中,以便下次查询时更快地访问它。还支持在磁盘上持久存储RDDs,或跨多个节点复制RDDs。

基础

以下是一个简单的例子:

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行定义来自外部文件的RDD。这个数据集没有加载到内存中,也没有以其他方式执行:行只是指向文件的指针。
第二行定义了lineLengths作为转换map的结果。同样,transformation是惰性的,lineLengths不会立即计算出来。最后,运行reduce,这是一个action。此时Spark将计算分解为在单机上运行的任务,每台机器都运行一部分mapreduction,只向driver返回它的答案。
如果我们稍后还想再次使用lineLengths,我们可以在reduce前添加下列代码,可以将第一次的计算结果存储到内存中。

lineLengths.persist()

传递函数到spark

Spark的API严重依赖于传递到driver`程序中的函数:

  • Lambda表达式,用于可以写成表达式的简单函数。(Lambdas不支持多语句函数或不返回值的语句。)
  • 调用Spark本地defs内的函数,为较长的代码。
  • 模块中的顶级函数。

例如,要传递一个比lambda更长的函数,考虑下面的代码:

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)
    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

注意:虽然也可以在类实例中传递对方法的引用(与单例对象相反),但这需要同时传递包含该类的对象和方法。例如:

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

在这里,如果我们创建一个新的MyClass并在其上调用doStuff,其中的映射引用那个MyClass实例的func方法,因此需要将整个对象发送到集群。
同样的,访问外部对象的字段也会引用整个对象:

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

为了避免这个问题,最简单的方法是将field复制到本地变量中,而不是从外部访问它:

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

理解闭包

Spark的难点之一是理解跨集群执行代码时变量和方法的范围和生命周期。在变量范围之外修改变量的RDD操作可能经常引起混淆。在下面的示例中,我们将查看使用foreach()递增计数器的代码,但是其他操作也可能出现类似的问题。
考虑简单的RDD元素sum,它的行为可能会有所不同,这取决于是否在相同的JVM中执行。一个常见的例子是在本地模式下运行Spark(–master = local[n]),而不是将Spark应用程序部署到集群(通过Spark -submit to YARN):

counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

本地模式与集群模式

上述代码的行为是未定义的,可能无法按预期工作。为了执行jobs,Spark将RDD操作的处理分解为tasks,每个task都由executor执行。在执行之前,Spark计算任务的闭包。闭包是那些在RDD上执行计算时必须可见的变量和方法(在本例中是foreach())。这个闭包被序列化并发送到每个执行器。

闭包中发送给每个执行器的变量现在都是副本,因此,当在foreach函数中引用counter时,它不再是driver节点上的计数器。在driver节点的内存中仍然有一个计数器,但是对于执行器并不可见。执行器只看到序列化闭包的副本。因此,counter的最终值仍然为零,因为counter上的所有操作都引用了序列化闭包中的值。

在本地模式的某些情况下,foreach函数将与driver程序在相同的JVM中实际执行,并引用相同的原始计数器,并可能实际更新counter

为了确保在这类场景中定义良好的行为,应该使用累加器。Spark中的累加器专门用于提供一种机制,以便在集群中的工作节点之间执行操作时安全地更新变量。本指南的累加器部分将更详细地讨论这些。

一般来说,这样的构造(循环或局部定义的方法)不应该用来改变某些全局状态。Spark不定义或保证从闭包外部引用的对象的突变行为。一些这样做的代码可能在本地模式下正常工作,但这只是偶然的,而且这些代码在分布式模式下不会像预期的那样工作。如果需要全局聚合,需要使用累加器

打印RDD的元素

另一个常见的习惯用法是尝试使用rdd.foreach(println)rdd.map(println)打印出RDD的元素。在单机上,这将生成预期的输出并打印所有RDD的元素。但是,在集群模式下,执行器调用的输出stdout是写入executor的stdout,而不是driver上的stdout,所以driver上的stdout不会显示这些!
打印驱动程序上的所有元素,可以使用collect()方法首先将RDD带到driver程序节点:rdd.collect().foreach(println)。但是,这可能导致驱动程序耗尽内存,因为collect()将整个RDD提取到一台机器上;如果只需要打印RDD的几个元素,那么更安全的方法是使用take(): rdd.take(100).foreach(println)

处理键值对

大多数Spark操作在包含任何类型对象的RDDs上工作,有少数特殊操作是在键值对的RDDs上可用。最常见的是分布式“shuffle”操作,如按keygroupaggregating
在Python中,这些操作在包含内置的tuple如(1,2)的RDDs。

例子,下面的代码使用对键-值对的reduceByKey操作来计算文件中每行文本出现的次数:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们还可以使用counts.sortByKey()来按字母顺序排序对,最后使用counts.collect()将它们作为对象列表返回到driver
下一篇把常用的Transformationsactions整理下。
Spark官方文档

2017-09-08 14:00:16 u012882134 阅读数 2846
  • Python基础

    Python 以其简洁、优雅、高效的特点,成为目前流行的4大主流开发语言之一,其应用广泛,易学易用,让很多人爱不释手。本套课程为初学者量身打造,是你入门 Python 的必修课程。这一部分内容涵盖了Python基础的知识点,包括Python的基础语法,比如变量,字符编码等,数据类型以及if else,for循环等流程控制语句,内容简单却十分关键,无论是数据类型还是控制语句都是在以后的Python学习无论方向都会反复大量应用的。 此课程特别适合之前完全无任何开发经验的小白白学习! Python除了是人工智能领域第一语言之外,其实还广泛的应用于WEB开发、云计算、金融分析、自动化运维、自动化测试、爬虫、大数据分析等领域,生态圈极为丰富和活跃。它强大的模块库大大的提高了开发者的开发效率,简洁明朗的语法使写代码如写诗一样优雅顺畅,极大降低了编程的学习门槛。可以不谦虚的说,Python是唯一一门无论是开发老司机还是小白白都一致称赞的编程语言。 不论你是什么背景出身,如果对编程感兴趣,从Python入手吧,Python是检测你是否适合做程序员的好的语言,如果练Python都学不会,哈哈,那你可以放弃这个职业啦!

    14587 人正在学习 去看看 李杰

笔者最近项目有点杂,什么都做,最近有涉及到spark的mllib上了。
本地没有spark环境,但需要调用spark的api。费了一番周折,记录下配置方法。

安装py4j和pyspark

笔者安装的是Anaconda2,带有了全套的python环境。本地开发spark项目,还需要安装py4j和pyspark的lib.

pip install py4j
pip install pyspark

下载spark

注意,只是下载spark而已,不需要安装。
地址是:https://spark.apache.org/downloads.html
下载 spark-2.2.0-bin-hadoop2.6.tgz
解压到 D:\software\spark-2.2.0-bin-hadoop2.6

配置环境变量

这一步比较重要。
主要的三个配置如下:

变量名:SPARK_HOME
变量值:D:\software\spark-2.2.0-bin-hadoop2.6

变量名:PYTHONPATH
变量值:%SPARK_HOME%\python;%SPARK_HOME%\python\lib\py4j-0.10.4-src.zip

变量名:Path
变量值:%SPARK_HOME%\bin

添加hadoop和java的依赖

下载winutils.exe.将其放在文件夹D:\software\spark-2.2.0-bin-hadoop2.6\HADOOP_HOME\bin下。
下载并安装java。

测试代码

# encoding: utf-8
from pyspark import SparkContext
import os
import sys
os.environ['SPARK_HOME'] = "D:\\software\\spark-2.2.0-bin-hadoop2.6"
os.environ['JAVA_HOME'] = "D:\\java\\jdk1.8"
sys.path.append("D:\\software\\spark-2.2.0-bin-hadoop2.6\\python")
os.environ['HADOOP_HOME'] = "D:\\software\\spark-2.2.0-bin-hadoop2.6\\HADOOP_HOME"

sc = SparkContext("local", "Simple App")
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
bskm = BisectingKMeans()
model = bskm.train(sc.parallelize(data, 2), k=4)
p = array([0.0, 0.0])
print model.predict(p)
print model.k
print model.computeCost(p)

注意,在代码中添加了四个环境变量。
HADOOP_HOME指定winutils.exe所在的文件夹的上一级目录(不含bin),否则会提示找不到winutils.exe。
上述代码能正常运行,说明配置无误。

运行结果可能提示:
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
表示无法使用原生hadoop环境,使用编译好的java类,对测试无影响。

scala本地发开spark的配置环境和python相似。

贴一下其他的参考文档:
http://www.jianshu.com/p/5fc3470165b8
http://blog.csdn.net/hjxinkkl/article/details/57083549?winzoom=1

2016-04-16 21:50:28 wireless_com 阅读数 13841
  • Python基础

    Python 以其简洁、优雅、高效的特点,成为目前流行的4大主流开发语言之一,其应用广泛,易学易用,让很多人爱不释手。本套课程为初学者量身打造,是你入门 Python 的必修课程。这一部分内容涵盖了Python基础的知识点,包括Python的基础语法,比如变量,字符编码等,数据类型以及if else,for循环等流程控制语句,内容简单却十分关键,无论是数据类型还是控制语句都是在以后的Python学习无论方向都会反复大量应用的。 此课程特别适合之前完全无任何开发经验的小白白学习! Python除了是人工智能领域第一语言之外,其实还广泛的应用于WEB开发、云计算、金融分析、自动化运维、自动化测试、爬虫、大数据分析等领域,生态圈极为丰富和活跃。它强大的模块库大大的提高了开发者的开发效率,简洁明朗的语法使写代码如写诗一样优雅顺畅,极大降低了编程的学习门槛。可以不谦虚的说,Python是唯一一门无论是开发老司机还是小白白都一致称赞的编程语言。 不论你是什么背景出身,如果对编程感兴趣,从Python入手吧,Python是检测你是否适合做程序员的好的语言,如果练Python都学不会,哈哈,那你可以放弃这个职业啦!

    14587 人正在学习 去看看 李杰

能在本地Mac环境用python提交spark 任务会方便很多,但是在安装了 spark-1.6-bin-without-hadoop  (spark.apache.org/download) 之后,在python 中  “import pyspark” 会报“no module named pyspark” 错误。 没错,这种错误都是 路径问题。


为了本地使用spark,需要在~/.bash_profile 中增加两个环境变量:SPARK_HOME 以及必知的PYTHONPATH

export SPARK_HOME=/Users/abc/Documents/spark-1.6.0-bin-without-hadoop #这是spark 的安装路径

export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH

注:Py4J 有点像 Python 版的 JNI,通过它, Python 程序可以利用 Python 解释器直接调用Java虚拟机中的 Java 对象,也可以让 Java 调用 Python 对象。

然后,别忘了,source ~/.bash_profile 让它生效。 运行 python shell,

from pyspark import SparkContext 

都可以了么, 但是 当你单独执行pyspark 或者 在python 中初始化SparkConf 等其它类的时候,又报错了

"Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream"

spark 访问FS 时库文件丢失,看来spark 和hadoop 的结合还需要指明更多的纽带,简单的换一下 spark distribution吧。将 spark-1.6.0-bin-without-hadoop 换成 spark-1.6.0-bin-hadoop2.6,然后更新 .bash_profile 中SPARK_HOME 的路径。

直接运行pyspark:

$ pyspark

Python 2.7.11 (default, Mar  1 2016, 18:40:10) 

[GCC 4.2.1 Compatible Apple LLVM 7.0.2 (clang-700.1.81)] on darwin

Type "help", "copyright", "credits" or "license" for more information.

16/04/16 21:41:02 INFO spark.SparkContext: Running Spark version 1.6.0

16/04/16 21:41:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

16/04/16 21:41:05 INFO spark.SecurityManager: Changing view acls to: abel,hdfs

16/04/16 21:41:05 INFO spark.SecurityManager: Changing modify acls to: abel,hdfs

16/04/16 21:41:05 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(abel, hdfs); users with modify permissions: Set(abel, hdfs)

16/04/16 21:41:06 INFO util.Utils: Successfully started service 'sparkDriver' on port 55162.

16/04/16 21:41:06 INFO slf4j.Slf4jLogger: Slf4jLogger started

16/04/16 21:41:06 INFO Remoting: Starting remoting

16/04/16 21:41:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.106:55165]

16/04/16 21:41:07 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 55165.

16/04/16 21:41:07 INFO spark.SparkEnv: Registering MapOutputTracker

16/04/16 21:41:07 INFO spark.SparkEnv: Registering BlockManagerMaster

16/04/16 21:41:07 INFO storage.DiskBlockManager: Created local directory at /private/var/folders/wk/fxn2zdyd7rz8rm66rst4h15w0000gn/T/blockmgr-6de54d08-31c9-430e-ac3c-9f3e0635e486

16/04/16 21:41:07 INFO storage.MemoryStore: MemoryStore started with capacity 511.5 MB

16/04/16 21:41:07 INFO spark.SparkEnv: Registering OutputCommitCoordinator

16/04/16 21:41:07 INFO server.Server: jetty-8.y.z-SNAPSHOT

16/04/16 21:41:07 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

16/04/16 21:41:07 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.

16/04/16 21:41:07 INFO ui.SparkUI: Started SparkUI at http://192.168.1.106:4040

16/04/16 21:41:07 INFO executor.Executor: Starting executor ID driver on host localhost

16/04/16 21:41:07 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55167.

16/04/16 21:41:07 INFO netty.NettyBlockTransferService: Server created on 55167

16/04/16 21:41:07 INFO storage.BlockManagerMaster: Trying to register BlockManager

16/04/16 21:41:07 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:55167 with 511.5 MB RAM, BlockManagerId(driver, localhost, 55167)

16/04/16 21:41:07 INFO storage.BlockManagerMaster: Registered BlockManager

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0

      /_/


Using Python version 2.7.11 (default, Mar  1 2016 18:40:10)

SparkContext available as sc, HiveContext available as sqlContext.

>>>

OK, 至此,pyspark 算是在本机的MAC 环境中可以基本上正常工作了。

No module named pyspark

阅读数 3140

没有更多推荐了,返回首页