精华内容
下载资源
问答
  • Python开发Spark应用之Wordcount词频统计

    千次阅读 2016-10-23 01:38:11
    但是发现Spark这玩意还是蛮有意思的。下面给大家介绍一下如何用python跑一遍Wordcount的词频统计的示例程序。#在pyspark模块中引入SparkContext和SparkConf类 #在operator模块中导入add类 from pyspark import ...

    待我学有所成,结发与蕊可好。@夏瑾墨


    一个早上只做了一点微小的工作,很忏愧。但是发现Spark这玩意还是蛮有意思的。下面给大家介绍一下如何用python跑一遍Wordcount的词频统计的示例程序。

    #在pyspark模块中引入SparkContext和SparkConf类
    #在operator模块中导入add类
    from pyspark import SparkContext, SparkConf 
    from operator import add
    
    #应用程序名
    #初始化一个SparkContext,现在sc就是一个SparkContext的实例化对象,然后方可创建RDD。
    appName = "WordCount"
    conf = SparkConf().setAppName(appName).setMaster("local")
    sc = SparkContext(conf=conf)
    
    # inputFiles表示输入文件路径
    # stopWordFile表示停词文件路径
    # outputFile表示输出文件路径
    inputFiles = "/home/hadoop/software/spark-2.0.0-bin-hadoop2.6/examples/src/main/resources/wordcount/*"
    stopWordFile = "/home/hadoop/software/spark-2.0.0-bin-hadoop2.6/examples/src/main/resources/wordcount/stopword.txt"
    outputFile = "/tmp/result"
    
    #处理非单词符号
    targetList = list('\t\().,?[]!;|') + ['--']
    #用空格替换这些标点符号,同时将替换后的行拆分成单词.在flatMap中使用replaceAndSplit函数
    def replaceAndSplit(s):
        for c in targetList:
            s = s.replace(c, " ")
        return s.split()
    
    inputRDD = sc.textFile(inputFiles)
    stopRDD = sc.textFile(stopWordFile)
    stopList = stopRDD.map(lambda x: x.strip()).collect()
    
    inputRDDv1 = inputRDD.flatMap(replaceAndSplit)
    inputRDDv2 = inputRDDv1.filter(lambda x: x not in stopList)
    inputRDDv3 = inputRDDv2.map(lambda x: (x,1))
    inputRDDv4 = inputRDDv3.reduceByKey(add)
    inputRDDv5 = inputRDDv4.map(lambda x: (x[1], x[0]))
    inputRDDv6 = inputRDDv5.sortByKey(ascending=False)
    inputRDDv7 = inputRDDv6.map(lambda x: (x[1], x[0])).keys()
    top100 = inputRDDv7.take(100)
    result = sc.parallelize(top100)
    result.saveAsTextFile(outputFile)

    背景知识

    1.任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的,SparkContext的初始化需要一个SparkConf对象,Sparkconf包括了Spark集群配置的各种参数(比如主节点的URL)。初始化后,就可以用SparkContext对象所包含的各种方法来创建,操作分布式数据集和共享变量。

    2.涉及的函数

    • Python split()方法:通过指定分隔符对字符串进行切片,如果参数num 有指定值,则仅分隔 num 个子字符串。
    • Python strip() 方法:用于移除字符串头尾指定的字符(默认为空格)。
    • Python lambda()方法:用来创建匿名函数,lambda的主体是一个表达式,用来封转有限的逻辑进去。
    • Python内建的filter()函数 : 用于过滤序列,filter()也接收一个函数和一个序列.
    • map( )方法:接收一个函数,应用到RDD中的每个元素,然后为每一条输入返回一个对象。根据提供的函数对指定序列做映射。
    • flatMap( )方法:接收一个函数replaceAndSplit,应用到RDD中的每个元素,返回一个包含可迭代的类型(如list等)的RDD,可以理解为先Map(),后flat(). -

    map函数会对每一条输入进行指定的操作,然后为每一条输入返回一个对象;而flatMap函数则是两个操作的集合——正是“先映射后扁平化”:
    操作1:同map函数一样:对每一条输入进行指定的操作,然后为每一条输入返回一个对象
    操作2:最后将所有对象合并为一个对象

    • Spark sortByKey函数 : 作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的.
    • take(): Spark的RDD的action操作take()用于提取数据
    • parallelize() : 创建一个并行集合,例如sc.parallelize(0 until numMappers, numMappers) 创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份.
    • Spark主要提供了两种函数:parallelize和makeRDD:
      1)parallelize的声明:
    def parallelize[T: ClassTag](    
     seq: Seq[T],    
    numSlices: Int = defaultParallelism): RDD[T]   

    2)makeRDD的声明:

    def makeRDD[T: ClassTag](    
    
    seq: Seq[T],    
     numSlices: Int = defaultParallelism): RDD[T]    
    def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]   

    3)区别:

    A)makeRDD函数比parallelize函数多提供了数据的位置信息。
    B)两者的返回值都是ParallelCollectionRDD,但parallelize函数可以自己指定分区的数量,而makeRDD函数固定为seq参数的size大小。

    ![这里写图片描述](https://img-blog.csdn.net/20161023140214492)
    

    使用spark-submit执行python文件,我们选择使用local模式

    以下是词频统计结果:
    这里写图片描述
    这里写图片描述
    这里写图片描述

    参考资料

    1.Lambda 表达式有何用处?如何使用?
    https://www.zhihu.com/question/20125256
    2.python中的map、filter、reduce函数用法
    http://blog.sina.com.cn/s/blog_45ac0d0a010191rb.html
    3.Python 特殊语法:filter、map、reduce、lambda
    http://www.cnblogs.com/fangshenghui/p/3445469.html
    4.Spark RDD操作(Python)总结
    http://blog.csdn.net/sinat_29581293/article/details/51487283
    5.SparkContext 简单分析
    http://www.cnblogs.com/softlin/p/5792126.html


    待我学有所成,结发与蕊可好。@夏瑾墨

    展开全文
  • TensorFlowOnSpark TensorFlowOnSpark为Apache Hadoop和Apache Spark集群带来了可扩展的深度学习。 通过将TensorFlow深度学习框架中的突出功能与Apache Spark和Apache Hadoop相结合,TensorFlowOnSpark TensorFlowOn...
  • 这些是Java和Python示例代码,用于在我的博客教程中显示Warehouse-Scale Computing中编程模型的HOWTO。 下面有五个示例,主要目的是让您亲身体验运行MapReduce并获得对MapReduce范例的更深入的了解,熟悉Apache ...
  • SparkFlow - 在Apache Spark引入Tensorflow易于使用的库
  • 运行python版本的Spark程序

    万次阅读 2016-03-18 17:59:42
    使用 spark-submit 解释执行python脚本 使用 python 解释执行python脚本 引入pyspark和py4j这两个模块

    两种方法:

    1. 使用 spark-submit 解释执行python脚本
    2. 使用 python 解释执行python脚本

    1. 使用Spark-submit解释执行python脚本

    python脚本中需要在开头导入spark相关模块,调用时使用spark-submit提交,示例代码如下:

    ===========================================================

    """odflow.py"""
    from pyspark import SparkContext
    
    fileDir = "/TripChain3_Demo.txt"
    # sc = SparkContext("local", "ODFlow")
    sc = SparkContext("spark://ITS-Hadoop10:7077", "ODFlow")
    lines = sc.textFile(fileDir)
    
    # python不能直接写多行的lambda表达式,所以要封装在函数中
    def toKV(line):
        arr = line.split(",")
        t = arr[5].split(" ")[1].split(":")
        return (t[0]+t[1]+","+arr[11]+","+arr[18],1)
    
    r1 = lines.map( lambda line : toKV(line) ).reduceByKey(lambda a,b: a+b)
    # 排序并且存入一个(repartition)文件中
    r1.sortByKey(False).saveAsTextFile("/pythontest/output")
    

    ===========================================================

    发布命令为:

    spark-submit \
      --master spark://ITS-Hadoop10:7077 \
      odflow.py
    

    2. 使用 python 解释执行python脚本

    直接用python执行会出现错误:

    ImportError: No module named pyspark
    ImportError: No module named py4j.java_gateway
    

    缺少pyspark和py4j这两个模块,这两个包在Spark的安装目录里,需要在环境变量里定义PYTHONPATH,编辑~/.bashrc或者/etc/profile文件均可

    vi ~/.bashrc # 或者 sudo vi /etc/profile
    # 添加下面这一行
    export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
    # 使其生效
    source ~/.bashrc # 或者 sudo source /etc/profile
    

    然后关闭终端,重新打开,用python执行即可

    python odflow.py
    
    展开全文
  • Linux下运行带有Spark依赖的Python脚本

    千次阅读 2018-07-25 16:37:50
    spark-submit脚本会帮我们引入Python程序的Spark依赖。这个脚本为SparkPythonAPI配置好了运行环境。 首先找到spark的安装目录,一般在Linux下,会将第三方软件安装到/opt目录下面。 然后运行下述指令: /opt/...

    在Python中,你可以把应用写成Python脚本,但是需要使用Spark自带的bin/spark-submit脚本来运行。spark-submit脚本会帮我们引入Python程序的Spark依赖。这个脚本为Spark的PythonAPI配置好了运行环境。

    首先找到spark的安装目录,一般在Linux下,会将第三方软件安装到/opt目录下面。

    然后运行下述指令:

    /opt/spark/bin/spark-submit my_script.py

    或者进入到spark的安装目录下面:

    bin/spark-submit my_script.py

    测试脚本:

    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("My App")
    sc = SparkContext(conf = conf)
    
    lines = sc.parallelize(["pandas", "cat", "i like pandas"])
    word = lines.filter(lambda s: "pandas" in s)
    print(word.collect())

     

    展开全文
  • 单词统计程序Scala实现---idea 安装scala插件创建maven项目,引入scala sdkpom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="...

    单词统计程序

    Scala实现

    ---

    1. idea 安装scala插件
    2. 创建maven项目,引入scala sdk

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>spark-learn</groupId>
        <artifactId>cn.spark.learn</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <scala.version>2.10.6</scala.version>
            <scala.compat.version>2.10</scala.compat.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.2</version>
            </dependency>
        </dependencies>
    
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-make:transitive</arg>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>cn.itcast.spark.WordCount</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

         3. 代码实现

    object WordCount {
    
      def main(args: Array[String]): Unit = {
        // 创建conf,设置应用程序的名字和运行的方式,local[2]表示本地模式运行两个线程,产生两个文件结果
        val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
        // 创建sparkcontext
        val sc = new SparkContext(conf)
        // 开始计算代码
        // textfile从hdfs中读取代码
        val file: RDD[String] = sc.textFile("hdfs://mini1:9000/words.txt")
        // 压平,分割每一行数据为每个单词
        val words: RDD[String] = file.flatMap(_.split(" "))
        val tuple: RDD[(String, Int)] = words.map((_, 1))
        val result: RDD[(String, Int)] = tuple.reduceByKey(_ + _)
        val resultBy: RDD[(String, Int)] = result.sortBy(_._2, false)
    
        // 打印结果
        resultBy.foreach(println)
      }
    
    }

    以上程序的输出结果 从控制台打印上来看可能没有排序,原因是local[2]启动了两个线程执行,产生了两个结果文件,local[1]相当于全局排序。

    4. 提交到集群执行

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
        // 创建conf,设置应用程序的名字和运行的方式,local[2]表示本地模式运行两个线程,产生两个文件结果
        //    val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
        // 提交到集群执行
        val conf = new SparkConf().setAppName("wordcount")
        // 创建sparkcontext
        val sc = new SparkContext(conf)
        // 开始计算代码
        // textfile从hdfs中读取代码
        val file: RDD[String] = sc.textFile(args(0))
        // 压平,分割每一行数据为每个单词
        val words: RDD[String] = file.flatMap(_.split(" "))
        val tuple: RDD[(String, Int)] = words.map((_, 1))
        val result: RDD[(String, Int)] = tuple.reduceByKey(_ + _)
        val resultBy: RDD[(String, Int)] = result.sortBy(_._2, false)
    
        // 打印结果
        //    resultBy.foreach(println)
        resultBy.saveAsTextFile(args(1))
      }
    
    }

    使用idea打包,上传至集群中的任意台机器。

    提交任务

    spark-submit --master spark://mini1:7077 --class cn.itcast.spark.WordCount  original-spark-learn-1.0-SNAPSHOT.jar hdfs://mini1:9000/words.txt hdfs://mini1:9000/ceshi-scala/


    Python实现

    ---

    #!/usr/bin/python
    
    from pyspark import SparkContext, SparkConf
    
    conf = SparkConf().setAppName("pywordCount").setMaster("spark://mini1:7077")
    sc = SparkContext(conf = conf)
    
    sc.textFile("hdfs://mini1:9000/words.txt").flatMap(lambda a:a.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).saveAsTextFile("hdfs://mini1:9000/wordcount/ceshi/")

    spark-submit wordcount.py



    展开全文
  • 解决pyspark问题:在集群中使用cluster模式,引入python复杂第三方库 问题:在工作中,使用spark-summit部署python第三方库保存的xgboost模型(单机库模型,非xgboost-4j),运行出错,ImportError: No module name...
  • 第1章 Spark的设计与运行原理 简介 hadoop中计算框架MapReduce的缺点: 表能能力有限,计算都必须要转化成Map和...于是,引入SPARK: 更多操作算子,计算模式虽然也属于MapReduce,但具备更多算子 内存计算,...
  • 注意:实验前先引入包from pyspark.context import SparkContext ,还需配置 sc = SparkContext('local', 'test') ,然后才能用sc做操作。一、常见的转换操作1、map() : 将函数应用于RDD中的每个元素,将返回值构成...
  • /etc/alternatives/spark-submit \ ...--conf "spark.pyspark.driver.python=/home/uther/miniconda2/envs/uther/bin/python3" \ --conf "spark.pyspark.python=/home/uther/miniconda2/envs/uther/bin/pyth.
  • 网上提交 scala spark 任务的攻略非常多,官方文档其实也非常详细仔细的介绍了 spark-submit 的用法。但是对于 python 的提交提及得非常少,能查阅到的资料非常少导致是有非常多的坑需要踩。 官方文档对于任务提交...
  • spark学习

    2020-12-26 17:05:03
    针对播放日志引入spark,提升问题分析效率。 spark概述 Apache Spark是用于大规模数据处理的统一分析引擎。它提供Java,Scala,Python和R的高级API,以及支持常规执行图的优化引擎。它还支持一组丰富的更高级别的...
  • 引入了 Apache Arrow 作为交换数据格式,这意味着我们可以避免 Java/Scala 和 Python 之间的 ser/der,这确实可以比传统方式加快通信效率。 当你在Java/Scala端调用python代码时,PyJava会自动启动一些python worker...
  • spark sql

    2018-07-05 22:20:19
    Spark SQL简介Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。2 Spark SQL的特性集成无缝地将SQL查询与Spark程序混合。 Spark...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 讲习班实践 在本研讨会中,练习集中于使用和 API,以及数据处理中的 。 我的github帐户(此处为scala)中的和Scala均提供了练习。... 如果要使用交互式spark-shell(仅限scala / python),则需要下载。
  • 1、spark的特性 (1)运行速度快,速度是hadoop ...(2)易用性好,spark不仅支持scala编程,还支持java、R语言和python编写。 (3)通用性好,spack on yarn、spark on mesos和standalone(spark自身带的资源框架) (...
  • //whereis python3 sudo -u hdfs pyspark --conf "spark.pyspark.driver.python=/usr/bin/python3" --conf "spark.pyspark.python=/usr/bin/...参考:Pyspark spark-submit 集群提交任务以及引入虚拟环境依赖包攻略
  • spark的特点,spark和mapreduce的比较

    千次阅读 2018-08-22 22:24:07
    1.spark的特点 (1)运行速度快,如果数据由磁盘读取,速度是hadoop mapreduce的10倍以上,...(2)易用性好,spark不仅支持scala编程呢个,还支持java和python编写。 (3)通用性好 (4)随处运行 2.spark和mapreduc...
  • pytz:支持跨平台时区计算,并将 tz database 引入 Python。 文本处理 用于解析和操作文本的库。 通用 chardet:字符编码检测器,兼容 Python2 和 Python3。 difflib:(Python 标准库)帮助我们进行差异化比较...
  • python 创建egg

    千次阅读 2016-11-14 15:37:23
    最近需要使用spark 运算数据,我是个Pythoner,自然用的是...查文档说是要把python 的package打包成egg,再提交上去才能正确引入。 上网查询了下python 打包,现在都是用setuptools,直接在package下穿件一个
  • Spark Streaming 快速入门

    2019-04-08 20:46:00
     Spark Streaming将Apache Spark的语言集成API引入流处理,使您可以像编写批处理作业一样编写流式作业。它支持Java,Scala和Python。  2.容错  Spark Streaming可以开箱即用,恢复丢失的工作和操作状态【例如...
  • Spark1.3.0以Spark SQL原有的SchemaRDD为蓝本,引入Spark DataFrameAPI,不仅为Scala、Python、Java三种语言环境提供了形如R和Pandas的API,而且自然而然地继承了SparkSQL的分布式处理能力。此外,S...
  • Spark-SQL note

    2020-01-30 23:12:00
    python 操作 RDD 慢:要转换可执行代码,在 JVM 中运行,涉及不同语言引擎间的切换,进行进程间的通信很耗费时间 DataFrame 以RDD为基础的分布式数据集,类似于关系型数据库的二维表 DataFrame引入 schema 和 ...
  • 之前的做法是用python调用pyspark或者Scala开发spark。这次想尝试不同的方法,用python脚本的方式来实现。 知识 首先要了解python脚本操作Linux系统的一些模块和方法--subprocess模块。Python2.4版本引入了...
  • 1 Spark 了解 开发语言:scala 目前支持的应用编程语言如下: ...(2)在Spark引入了RDD (Resilient Distributed Dataset)的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部.
  • 二、话说,python也要引入pyspark,还有下载spark的lib, 也挺麻烦的。只有scala可以直接调用 spark-shell, 还算好用。废话不多说,直接上代码 三、完整的pom,不多不少刚刚好。 <?xml version="1.0" encoding=...

空空如也

空空如也

1 2 3 4
收藏数 66
精华内容 26
关键字:

python引入spark

python 订阅