精华内容
下载资源
问答
  • pyspark 和spark区别

    2020-10-15 22:42:32
    最近在学数据分析处理相关工具,pyspark和spark区别是啥? 学pyspark是用python调用spark处理数据么, 如果是这样是不是就可以不单独学习spark了,求大佬赐教谢谢!
  • sample_spark3 如何使用findspark和pyspark使用spark3
  • vagrant-pyspark:Vagrant框,用于使用PySpark运行Spark作业单元测试
  • 带有PySparkSpark和Python用于大数据:Spark机器学习项目
  • 一个示例项目,旨在使用Apache Spark中的Pyspark和Spark SQL API演示ETL过程。 在这个项目中,我使用了Apache Sparks的Pyspark和Spark SQL API来对数据实施ETL过程,最后将转换后的数据加载到目标源。 我已经使用...
  • PySparkSpark3.0)

    2021-04-22 19:19:08
    PySpark简单来说就是Spark提供的Python编程API,包括交互式的PySpark shell非交互式的Python程序。 1.环境 Spark3.0 Hadooop3.2 Centos7 Python3.6.8 Pycharm Windos10 其中值得注意的是Python的版本必须是3.6+...

    PySpark(Spark3.0)

    PySpark简单来说就是Spark提供的Python编程API,包括交互式的PySpark shell和非交互式的Python程序。

    1.环境

    • Spark3.0
    • Hadooop3.2
    • Centos7
    • Python3.6.8
    • Pycharm
    • Windos10

    其中值得注意的是Python的版本必须是3.6+,以下是Spark官网的说明
    在这里插入图片描述

    前提

    ​ Spark3.0的集群已经搭建完毕,本文使用的是Standalone模式的集群

    ​ Hadoop3.2分布式集群搭建完毕

    2.PySpark shell

    2.1安装python3

    yum install -y python3
    

    PS:集群中的所有节点都要安装

    # 验证
    python3 -V
    

    在这里插入图片描述

    2.1配置环境变量

    PySparkShell的启动需要配置SPARK_HOMEPYSPARK_PYTHON这两个环境变量,如果不配置就会使用系统自带的Python2.7.5,由于Python2和Python3的语法是不兼容的,这样就会出现问题,导致PySPark无法使用。

    vi /etc/profile
    export SPARK_HOME=/opt/spark-3.0.2
    export PATH=$PATH:$SPARK_HOME/bin
    export PYSPARK_PYTHON=python3
    # 刷新环境变量
    source /etc/profile
    

    在这里插入图片描述
    注意:根据自己时间的路径配置

    2.2使用PySpark Shell

    # 启动Spark
    /opt/spark-3.0.2/sbin/start-all.sh
    # 在Spark的bin目录中有pySpark的脚本 直接全路径执行
    /opt/spark-3.0.2/bin/pyspark
    # 使用如下命令退出PySpark Shell
    exit()
    

    在这里插入图片描述

    如果Python的版本不是3.6+ 那就需要检查环境变量配置是否正确以及环境变量是否生效
    使用source /etc/profile 使环境变量的配置生效
    注意:这样启动只是一个Local模式的PySpark Shell

    2.3PySpark的WordCount

    WordCount单词次数计算 是一个比较经典的分布式计算样例,相当于Hello World了

    启动HDFS并将提前准备好单词文件上传到HDFS
    单词数据如下

    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    hello world
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    python java scala
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    spark flink mapreduce
    hello world
    hello world
    hello world
    hello world
    hello world
    
    # 使用vi创建一个文件并将单词数据复制到文件中
    vi 1.txt
    # 启动hdfs
    start-dfs.sh
    # 创建存放数据的文件
    hdfs dfs -mkdir -p /wc/in
    # 上传文件
    hdfs dfs -put 1.txt /wc/in/1.txt
    hdfs dfs -put 1.txt /wc/in/2.txt
    hdfs dfs -put 1.txt /wc/in/3.txt
    # 启动PySpark 指定Master 编写WordCount
    /opt/spark-3.0.2/bin/pyspark --master spark://master:7077
    # PySpark中提供了两个变量sc 和 spark
    # 其中sc 是 SparkContext对象
    # spark 是 SparkSession对象
    # 读取hdfs中的文件生成RDD Python是弱类型的语言 变量的定义比较随意
    lines = sc.textFile("hdfs://master:9000/wc/in")
    # 将每一行单词使用split 切分 分隔符为" "并压平
    # 这里会得到由一个个单独单词组成的RDD
    words = lines.flatMap(lambda x: x.split(" "))
    # 将单词和1组合在一起 (word,1)
    wordAndOne = words.map(lambda x: (x,1))
    # 对单词进行分组聚合
    reduced = wordAndOne.reduceByKey(lambda x,y: x + y)
    # 对聚合后的结果进行排序 默认为升序 False用于指定降序
    res = reduced.sortBy(lambda x: x[1],False)
    # 将结果收集到Drive 也就是shell
    res.collect()
    # 将结果保存到HDFS
    # 注意这里指定的HDFS不能存在 程序会自动生成
    res.saveAsTextFile("hdfs://master:9000/wc/res1")
    # 退出PySpark Shell
    quit()
    

    2.4在HDFS中查看结果

    hdfs dfs -cat /wc/res1/*
    hdfs dfs -ls /wc/res1/
    

    在这里插入图片描述

    ​ 结果分散在多个结果文件中,是全局有序的

    3.Pycharm中编写PySpark程序

    提前:Windows上安装好了Python3.6+

    1.解压Spark

    PySpark程序编写是在Windows10上的,首先将Spark3.0的安装包解压,解压到D:\app目录下,解压Spark安装包的原因是因为,安装包中提供了PySpark的依赖。
    在这里插入图片描述

    2.创建项目

    在创建项目时指定Python解释器的版本
    在这里插入图片描述
    配置项目依赖

    File --> Settings --> Project Structure
    在这里插入图片描述
    选择Add Content Root,在弹出的文件选择框中,选择Spark安装目录中的python文件夹下的lib目录中的py4j和pyspark的依赖文件,点击OK,将这两个依赖加入到当前项目的依赖库中。

    py4j 将Python代码转换为Java代码的库

    pyspark Python的Spark编程依赖库
    在这里插入图片描述

    3.编写WordCount程序

    新建一个demo文件夹,然后新建一个WordCount.py文件

    from pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
        # 创建SparkConf对象 配置程序名为 WordCount 运行模式为local[*]
        # * 代表当前机器有几个逻辑核就启动几个线程
        conf = SparkConf().setMaster("wordCount").setMaster("local[*]")
        sc = SparkContext(conf=conf)
        # 读取hdfs中的文件生成RDD Python是弱类型的语言 变量的定义比较随意
        lines = sc.textFile("hdfs://master:9000/wc/in")
        # 将每一行单词使用split 切分 分隔符为" "并压平
        # 这里会得到由一个个单独单词组成的RDD
        words = lines.flatMap(lambda x: x.split(" "))
        # 将单词和1组合在一起 (word,1)
        wordAndOne = words.map(lambda x: (x, 1))
        # 对单词进行分组聚合
        reduced = wordAndOne.reduceByKey(lambda x, y: x + y)
        # 对聚合后的结果进行排序 默认为升序 False用于指定降序
        res = reduced.sortBy(lambda x: x[1], False)
        # 将结果收集到Drive 也就是本地
        print(res.collect())
        # 将结果保存到HDFS
        res.saveAsTextFile("hdfs://master:9000/wc/res3")
        # 关闭SparkContext
        sc.stop()
        
    

    程序报错,Could not find valid SPARK_HOME while searching…,主要是因为没有配置SPARK_HOME以及PYSPARK_PYTHON这两环境变量
    在这里插入图片描述
    点击右上角的程序配置,配置环境变量,添加SPARK_HOME值为SPARK安装包的解压路径,PYSAPRK_PYTHON指定PySpark的Python命令,注意python版本为3.6+。
    在这里插入图片描述
    继续运行程序,报错:Permission denied: user=killer
    在这里插入图片描述
    这里主要是由于HDFS的权限问题,需要将当前程序伪装成ROOT用户,按照上面的方法配置环境变量HADOOP_USER_NAME为root
    在这里插入图片描述
    然后运行成功
    在这里插入图片描述
    可以看到打印的单词出现次数,然后在HDFS中查看结果
    在这里插入图片描述

    展开全文
  • pyspark和spark pipe性能对比 用例程序

    千次阅读 2016-03-07 10:38:35
    //构造数据 public class Main { public ... org.apache.spark.SparkConf ... org.apache.spark.SparkContext ..." /home/gt/spark/bin/pipe.py" ...结论是Spark Scala是25s,pipe是50s,pyspark是75s
    //构造数据
    public class Main {
        public static void main(String[] args) throws IOException {
            File file = new File("/home/gt/testdata.dat");
            file.delete();
            file.createNewFile();
            OutputStream out = new FileOutputStream(file);
            OutputStreamWriter osw=new OutputStreamWriter(out);
            BufferedWriter writer = new BufferedWriter(osw);
            for(int i=0;i<9999999;i++){
                writer.write("aaabbbcccdddeee");
                writer.newLine();
            }
            writer.close();
            osw.close();
            out.close();
        }
    }

    pipe相关代码:

    #!/usr/bin/python
    #coding=utf-8
    def fff(line):
            s = set()
            l = list()
            length = len(line)
            for i in range(0,length-1):
                if line[i] not in s:
                    l.append(line[i]) 
                    s.add(line[i])
            return "".join(l)
    result = ""
    #var = 1  
    #while var == 1 :
    for i in range(1,1111111):
        s = raw_input()
        if s is None or s =="" :
            break
        result += fff(s) + "\n"
    print result
    package test
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object PipeTest {
    
      def main(args: Array[String]) {
        val t0 = System.currentTimeMillis();
        val sparkConf = new SparkConf().setAppName("pipe Test")
        val sc = new SparkContext(sparkConf)
    
        val a = sc.textFile("/home/gt/testdata.dat", 9)
        val result = a.pipe(" /home/gt/spark/bin/pipe.py").saveAsTextFile("/home/gt/output.dat")
        sc.stop()
        println("!!!!!!!!!" + (System.currentTimeMillis() - t0));
      }
    }

    pyspark相关代码

    #-*- coding: utf-8 -*-
    from __future__ import print_function
    
    import sys
    import time
    from pyspark import SparkContext
    
    #去掉重复的字母
    if __name__ == "__main__":
        t0 = time.time()
        sc = SparkContext(appName="app2ap")
        lines = sc.textFile("/home/gt/testdata.dat", 9)
        def fff(line):
            s = set()
            l = list()
            length = len(line)
            for i in range(0,length-1):
                if line[i] not in s:
                    l.append(line[i]) 
                    s.add(line[i])
            return "".join(l)                       
        rdd = lines.map(fff)
        rdd.saveAsTextFile("/home/gt/output.dat")
        sc.stop()
        print("!!!!!!")
        print(time.time()-t0)

    附加原生的程序:

    package test
    
    import java.util.ArrayList
    import java.util.HashSet
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object Test {
    
      def fff(line: String): String = {
        val s = new HashSet[Char]()
        val l = new ArrayList[Char]()
        val length = line.length()
        for (i <- 0 to length - 1) {
          val c = line.charAt(i)
          if (!s.contains(c)) {
            l.add(c)
            s.add(c)
          }
        }
        return l.toArray().mkString
      }
    
      def main(args: Array[String]) {
        val t0 = System.currentTimeMillis();
        val sparkConf = new SparkConf().setAppName("pipe Test")
        val sc = new SparkContext(sparkConf)
    
        val a = sc.textFile("/home/gt/testdata.dat", 9)
        val result = a.map(fff).saveAsTextFile("/home/gt/output.dat")
        sc.stop()
        println("!!!!!!!!!" + (System.currentTimeMillis() - t0));
      }
    
    }

    结论是Spark Scala是25s,pipe是50s,pyspark是75s

    展开全文
  • Local模式: 开发 简单的集群管理,自带的 ...http://spark.apache.org/docs/latest/spark-standalone.html#installing-spark-standalone-to-a-cluster standalone hdfs : 主根NameNode 从根 DataNode yarn: ...

    #清除防火墙

    [root@hadoop000 python]# systemctl stop firewalld.service
    [root@hadoop000 python]# systemctl disable firewalld.service
    

    [root@hadoop000 conf]# vim ~/.bash_profile

    export JAVA_HOME=/root/app/jdk1.8.0_211
    export PATH=$JAVA_HOME/bin:$PATH
    
    export SCALA_HOME=/root/app/scala-2.11.8
    export PATH=$SCALA_HOME/bin:$PATH
    
    export HADOOP_HOME=/root/app/hadoop-2.6.0-cdh5.7.0
    export PATH=$HADOOP_HOME/bin:$PATH
    
    export MAVEN_HOME=/root/app/apache-maven-3.3.9
    export PATH=$MAVEN_HOME/bin:$PATH
    
    export PATH=/root/app/python3/bin:$PATH
    export PYSPARK_PYTHON=python3
    
    export SPARK_HOME=/root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0
    export PATH=$SPARK_HOME/bin:$PATH
    
    export HIVE_HOME=/root/app/hive-1.1.0-cdh5.7.0
    export PATH=$HIVE_HOME/bin:$PATH
    
    

    [root@hadoop000 conf]# source ~/.bash_profile

    一、Local模式:
    开发

    http://spark.apache.org/docs/latest/submitting-applications.html

    简单的集群管理,自带的

    –master
    –name
    –py-files

    #ni.txt

    nihao   woqu    nihao
    hello   welcome
    woqu    hahah
    
    

    #spark0406.py

    import sys
    
    from pyspark import SparkConf, SparkContext
    
    if __name__ == '__main__':
    
        if len(sys.argv) != 3:
            print("Usage: wordcount <input> <output>", file=sys.stderr)
            sys.exit(-1)
    
        conf = SparkConf()
        sc = SparkContext(conf=conf)
    
        def printResult():
            counts = sc.textFile(sys.argv[1]) \
                .flatMap(lambda line: line.split("\t")) \
                .map(lambda x: (x, 1)) \
                .reduceByKey(lambda a, b: a + b)
    
            output = counts.collect()
    
            for (word, count) in output:
                print("%s: %i" % (word, count))
    
    
        def saveFile():
            sc.textFile(sys.argv[1]) \
                .flatMap(lambda line: line.split("\t")) \
                .map(lambda x: (x, 1)) \
                .reduceByKey(lambda a, b: a + b)\
                .saveAsTextFile(sys.argv[2])
    
        printResult()
    
        sc.stop()
    
    

    模式用 --master local[2]

    name 叫什么名字 --name spark-local

    在python是执行文件的路径 /root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/spark0406.py

    需要统计 ,需要读入文件就是输入 file:///root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/ni.txt

    输出 file:///root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/wc/output

    [root@hadoop000 bin]# ./spark-submit --master local[2] --name spark-local  /root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/spark0406.py file:///root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/ni.txt  file:///root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/wc/output
    

    二、standalone模式下的pyspark

    http://spark.apache.org/docs/latest/spark-standalone.html#installing-spark-standalone-to-a-cluster

    在这里插入代码片
    

    standalone
    hdfs : 主根NameNode 从根 DataNode
    yarn: ResourceManager NodeManager

    第一步启动你集群
    在这里插入图片描述
    第二步骤 在启动一个从节点
    在这里插入图片描述

    /root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/conf
    

    在这里插入图片描述
    在这里插入图片描述

    $SPARK_HOME/conf/slaves
    假设你有5台机器,就应该进行如下slaves的配置
    hadoop000
    hadoop001
    hadoop002
    hadoop003
    hadoop004

    如果是多台机器,那么每台机器都在相同的路径下部署spark

    在这里插入图片描述
    在这里插入图片描述
    spark://hadoop000:7077

    端口是7077
    外部端口是8080 如果被占用就自动加1了

    下面都是失败了
    在这里插入图片描述
    把 jdk 写入到 下面的文件里面
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/directory"
    

    在这里插入图片描述
    在这里插入图片描述

    SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/directory"
    PYSPARK_PYTHON=/root/app/python3/bin/python3
    export SPARK_MASTER_IP=192.168.194.151
    export SPARK_WORKER_MEMORY=5120m
    export SPARK_WORKER_CORES=1
    export SPARK_MASTER_PORT=7077
    export SPARK_MASTER_WEBUI_PORT=8080
    export SPARK_WORKER_INSTANCES=1
    export SPARK_EXECUTOR_EMEORY=5120m
    

    #我的安装地址

    JAVA_HOME=/root/app/jdk1.8.0_211
    

    在 启动一下
    在这里插入图片描述
    启动成功了
    在这里插入图片描述在这里插入图片描述
    8080 是集群外部ui地址

    如何提交呢
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    测试一下

    在这里插入图片描述

    在这里插入图片描述

    worker工作原理:

    1.Master要求Worker启动Driver和Executor。(LaunchDriver和LaunchExecutor)

    2.worker内部会启动一个线程DriverRunner,它会去负责启动Driver进程并进行管理。

    3.worker内部会启动一个线程ExecutorRunner,它会去负责启动Executor进程并进行管理。

    4.Executor找到对应Driver去反向注册自己。

    SPARK_WORKER_CORES 作业可用的CPU内核数量 所有可用的CPU内核数
    SPARK_WORKER_INSTANCES 每台机器上运行的worker数量 1
    SPARK_WORKER_CORES × SPARK_WORKER_INSTANCES 每个节点使用的最大内存数
    SPARK_WORKER_INSTANCES × SPARK_WORKER_MEMORY 作业可使用的内存容量 1G
    SPARK_DAEMON_MEMORY 分配给Spark master和worker守护进程的内存空间 512M

    配置 说明

    executor-memory 在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的
    spark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例
    spark.shuffle.memoryFractio spark应用程序在所申请的内存资源中可用于shuffle的比例

    注意事项:

    对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;
    对于join操作,优先缓存较小的表;
    要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;
    要多注意Storage的监控,多思考如何才能Fraction cached的比例更多

    三、standalonem模式spark-submit运行

    #存入文件

    [root@hadoop000 python]# hadoop fs -put ni.txt
    

    #查看文件

    [root@hadoop000 python]# hadoop fs -text /ni.txt
    
    [root@hadoop000 python]# hadoop fs -put ni.txt /wc.txt
    

    如果使用standalone模式,而且你的节点个数大于1的时候,如果你使用本地文件测试,必须保证每个节点都有本地测试文件

    [root@hadoop000 sbin]# ./start-dfs.sh

    [root@hadoop000 bin]# ./spark-submit --master spark://hadoop000:7077 --name spark-standalone /root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/python/spark0406.py hdfs://hadoop000:8020/wc.txt hdfs://hadoop000:8020/wc/output
    

    四、standalonem yarn

    http://spark.apache.org/docs/latest/running-on-yarn.html

    yarn
    mapreduce yarn
    spark on yarn 70% 都在用
    spark作业客户端而已 他需要做的事情是提交作业到yarn上
    yarn vs standalon
    yarn:你只需要一个节点,然后提交作业即可 这个不是需要spark集群的 (不需要启动master和work)
    standalon: 你的spark集群上每个节点都需要部署spark,然后需要启动spark集群(需要master和work)

    问题: When running with master ‘yarn’ either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

    [root@hadoop000 conf]# pwd
    /root/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/conf
    vim spark-env.sh
    

    先看下linux python解析器在哪个路径下
    在这里插入图片描述
    在这里插入图片描述

    #需要启动它

    [root@hadoop000 sbin]# pwd
    /root/app/hadoop-2.6.0-cdh5.7.0/sbin
    [root@hadoop000 sbin]# ./start-yarn.sh 
    

    在这里插入图片描述

    from pyspark import SparkConf,SparkContext
    from pyspark.sql import SparkSession
    import sys
    
    if __name__ == '__main__':
    
    
            if len(sys.argv) !=3:
                    print("Usage: wordcount <input> <output>", file = sys.stderr)
                    sys.exit(-1)
    
            conf = SparkConf()
            sc = SparkContext(conf=conf)
    
            def printResult():
                    counts = sc.textFile(sys.argv[1])\
                            .flatMap(lambda line:line.split("\t"))\
                            .map(lambda x:(x,1))\
                            .reduceByKey(lambda a,b:a+b)
    
                    output = counts.collect()
                    for (word, count) in output :
                            print("%s: %i" % (word,count))
    
            #写入文件系统里面去
            def saveFile():
                    sc.textFile(sys.argv[1]) \
                    .flatMap(lambda line: line.split("\t")) \
                    .map(lambda x:(x,1)) \
                    .reduceByKey(lambda a,b:a+b)\
                    .saveAsTextFile(sys.argv[2])
    
            printResult()
    
            sc.stop()
    
    

    在这里插入图片描述
    在这里插入图片描述

    #运行

    [root@hadoop000 bin]# ./spark-submit --master yarn --name spark-yarnuu /root/data/spark0402.py hdfs://hadoop000:8020/ni.txt hdfs://hadoop000:8020/wc/output
    

    yarn 支持client和cluster模式: driver运行在哪里
    client是本地
    cluster是集群

    client: 提交作业的进程是不能停止的,否则作业就挂了
    cluster: 提交完作业,那么提交作业端就可以断开了,因为driver是运行在am里面的

    在这里插入图片描述

    展开全文
  • PySpark - Spark SQL基础

    2020-06-26 00:22:31
    Spark SQL 是 Apache Spark 处理结构化数据的模块。 一、初始化 ...from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config

    Spark SQL 是 Apache Spark 处理结构化数据的模块。

    一、初始化 SparkSession

    SparkSession 用于创建数据框,将数据框注册为表,执行 SQL 查询,缓存表及读取 Parquet 文件。

    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    sc = spark.sparkContext
    

    二、创建数据框

    1、从 RDD 创建

    from pyspark.sql.types import *  包括:
    __all__ = [
        "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
        "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
        "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"]
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from collections import namedtuple

    (1) 推断 Schema

    lines = sc.textFile("people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    peopledf = spark.createDataFrame(people)
    peopledf.show()
    +---------+---+
    |     name|age|
    +---------+---+
    |    Danae| 27|
    |  Claudia| 26|
    |Desdemona| 25|
    |    Chloe| 30|
    |  Felicia| 25|
    +---------+---+

    (2)指定 Schema

    people = parts.map(lambda p: Row(name=p[0], age=int(p[1].strip())))
    schemaString = "name age"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    spark.createDataFrame(people, schema)
    print(type(df))
    <class 'pyspark.sql.dataframe.DataFrame'>

    2、从 Spark 数据源创建

    (1)JSON

    df = spark.read.json("customer.json")
    df.show()
    # +-------+---+---------+--------+-----------+
    # |address|age|firstName|lastName|phoneNumber|
    # +-------+---+---------+--------+-----------+
    # |BeiJing| 29|      Tom|   Smith|13911112222|
    # |NanJing| 30|     July|     Doe|13922223333|
    # +-------+---+---------+--------+-----------+
    df2 = spark.read.load("people.json", format="json")
    df2.show()
    # +---+---------+
    # |age|     name|
    # +---+---------+
    # | 27|    Danae|
    # | 26|  Claudia|
    # | 25|Desdemona|
    # | 30|    Chloe|
    # | 25|  Felicia|
    # +---+---------+

    (2)Parquet 文件

    df3 = spark.read.load("people.parquet")
    df3.show()
    # +---------+---+
    # |     name|age|
    # +---------+---+
    # |    Danae| 27|
    # |  Claudia| 26|
    # |Desdemona| 25|
    # |    Chloe| 30|
    # |  Felicia| 25|
    # +---------+---+

    (3)文本文件

    df4 = spark.read.text("people.txt")
    df4.show()
    # +------------+
    # |       value|
    # +------------+
    # |    Danae,27|
    # |  Claudia,26|
    # |Desdemona,25|
    # |    Chloe,30|
    # |  Felicia,25|
    # +------------+

    3、查阅数据信息

    df.dtypes               # 返回 df 的列名与数据类型
    df.show()               # 显示 df 的内容
    df.head()               # 返回前 n 行数据
    df.first()              # 返回第 1 行数据
    df.take(2)              # 返回前 n 行数据
    df.schema               # 返回 df 的 Schema
    df.describe().show()    # 汇总统计数据
    df.columns              # 返回 df 的列名
    df.count()              # 返回 df 的行数
    df.distinct().count()   # 返回 df 中不重复的行数
    df.printSchema()        # 返回 df的 Schema
    df.explain()            # 返回逻辑与实体方案
    返回 df 的列名与数据类型: 
    [('address', 'string'), ('age', 'bigint'), ('firstName', 'string'), ('lastName', 'string'), ('phoneNumber', 'string')]
    显示 df 的内容: 
    +-------+---+---------+--------+-----------+
    |address|age|firstName|lastName|phoneNumber|
    +-------+---+---------+--------+-----------+
    |BeiJing| 29|      Tom|   Smith|13911112222|
    |NanJing| 30|     July|     Doe|13922223333|
    +-------+---+---------+--------+-----------+
    返回前 n 行数据: 
    [Row(address='BeiJing', age=29, firstName='Tom', lastName='Smith', phoneNumber='13911112222'), Row(address='NanJing', age=30, firstName='July', lastName='Doe', phoneNumber='13922223333')]
    返回前 1 行数据: 
    Row(address='BeiJing', age=29, firstName='Tom', lastName='Smith', phoneNumber='13911112222')
    返回前 n 行数据: 
    [Row(address='BeiJing', age=29, firstName='Tom', lastName='Smith', phoneNumber='13911112222'), Row(address='NanJing', age=30, firstName='July', lastName='Doe', phoneNumber='13922223333')]
    返回 df 的 Schema: 
    StructType(List(StructField(address,StringType,true),StructField(age,LongType,true),StructField(firstName,StringType,true),StructField(lastName,StringType,true),StructField(phoneNumber,StringType,true)))
    汇总统计数据: 
    +-------+-------+------------------+---------+--------+-----------------+
    |summary|address|               age|firstName|lastName|      phoneNumber|
    +-------+-------+------------------+---------+--------+-----------------+
    |  count|      2|                 2|        2|       2|                2|
    |   mean|   null|              29.5|     null|    null| 1.39166677775E10|
    | stddev|   null|0.7071067811865476|     null|    null|7856741.934616441|
    |    min|BeiJing|                29|     July|     Doe|      13911112222|
    |    max|NanJing|                30|      Tom|   Smith|      13922223333|
    +-------+-------+------------------+---------+--------+-----------------+
    返回 df 的列名: ['address', 'age', 'firstName', 'lastName', 'phoneNumber']
    返回 df 的行数: 2
    返回 df 的 Schema: 
    root
     |-- address: string (nullable = true)
     |-- age: long (nullable = true)
     |-- firstName: string (nullable = true)
     |-- lastName: string (nullable = true)
     |-- phoneNumber: string (nullable = true)
    
    返回逻辑与实体方案: 
    == Physical Plan ==
    *(1) FileScan json [address#6,age#7L,firstName#8,lastName#9,phoneNumber#10] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/D:/Mypython/spark/customer.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<address:string,age:bigint,firstName:string,lastName:string,phoneNumber:string>
    

    4、重复值

    df = df.dropDuplicates()

    5、查询

    1)Select

    df.select("firstName", "lastName").show()                # 显示 firstName、age 的所有条目
    +---------+--------+
    |firstName|lastName|
    +---------+--------+
    |      Tom|   Smith|
    |     July|     Doe|
    +---------+--------+
    df.select("firstName", "age", F.explode("phoneNumber").alias("contactInfo"))\
        .select("contactInfo.type", "firstName", "age")\
        .show()                                              # 显示 firstName、age 的所有条目和类型
    df.select(df["firstName"], df["age"] + 1).show()         # 显示 firstName 和 age 列的所有记录,并对 age 记录添加1
    +---------+---------+
    |firstName|(age + 1)|
    +---------+---------+
    |      Tom|       30|
    |     July|       31|
    +---------+---------+
    df.select(df['age'] > 29).show()                         # 显示所有小于24岁的记录
    +----------+
    |(age > 29)|
    +----------+
    |     false|
    |      true|
    +----------+

    (2)When

    df.select("firstName", F.when(df.age > 29, 1).otherwise(0)).show()
    +---------+--------------------------------------+
    |firstName|CASE WHEN (age > 29) THEN 1 ELSE 0 END|
    +---------+--------------------------------------+
    |      Tom|                                     0|
    |     July|                                     1|
    +---------+--------------------------------------+
    df[df.firstName.isin("Tom", "July")].show()
    +-------+---+---------+--------+-----------+
    |address|age|firstName|lastName|phoneNumber|
    +-------+---+---------+--------+-----------+
    |BeiJing| 29|      Tom|   Smith|13911112222|
    |NanJing| 30|     July|     Doe|13922223333|
    +-------+---+---------+--------+-----------+

    (3)Like

    df.select("firstName", df.lastName.like("Smith")).show()
    +---------+-------------------+
    |firstName|lastName LIKE Smith|
    +---------+-------------------+
    |      Tom|               true|
    |     July|              false|
    +---------+-------------------+

    (4)Startswith & Endswith

    df.select("firstName", df.lastName.startswith("Sm")).show()
    +---------+------------------------+
    |firstName|startswith(lastName, Sm)|
    +---------+------------------------+
    |      Tom|                    true|
    |     July|                   false|
    +---------+------------------------+
    df.select(df.lastName.endswith("th")).show()
    +----------------------+
    |endswith(lastName, th)|
    +----------------------+
    |                  true|
    |                 false|
    +----------------------+

    (5)Substring

    df.select(df.firstName.substr(1, 3).alias("name")).show()    # alias 对列进行重命名
    +----+
    |name|
    +----+
    | Tom|
    | Jul|
    +----+

    (6)Between

    df.select(df.age.between(22, 29)).show()
    +-----------------------------+
    |((age >= 22) AND (age <= 29))|
    +-----------------------------+
    |                         true|
    |                        false|
    +-----------------------------+

    6、列操作

    (1)添加列

    df = df.withColumn('telePhoneNumber', F.explode(df.phoneNumber.number))

    (2)修改列

    df = df.withColumnRenamed('phoneNumber', 'callNumber')
    df.show()
    +-------+---+---------+--------+-----------+
    |address|age|firstName|lastName| callNumber|
    +-------+---+---------+--------+-----------+
    |BeiJing| 29|      Tom|   Smith|13911112222|
    |NanJing| 30|     July|     Doe|13922223333|
    +-------+---+---------+--------+-----------+

    (3)删除列

    df = df.drop("address", "callNumber")
    df = df.drop(df.address).drop(df.callNumber)
    df.show()
    +---+---------+--------+
    |age|firstName|lastName|
    +---+---------+--------+
    | 29|      Tom|   Smith|
    | 30|     July|     Doe|
    +---+---------+--------+

    7、分组

    df.groupBy("age").count().show()  # 按 age 列分组,统计每组人数
    +---+-----+
    |age|count|
    +---+-----+
    | 29|    1|
    | 30|    1|
    +---+-----+

    8、筛选

    df.filter(df["age"] > 29).show()    # 按 age 列筛选,保留年龄大于29岁的
    +---+---------+--------+
    |age|firstName|lastName|
    +---+---------+--------+
    | 30|     July|     Doe|
    +---+---------+--------+

    9、排序

    df.sort(peopledf.age.desc()).show()
    df.sort("age", ascending=False).show()
    df.orderBy(["age", "city"], ascending=[0, 1]).show()
    +---+---------+--------+
    |age|firstName|lastName|
    +---+---------+--------+
    | 30|     July|     Doe|
    | 29|      Tom|   Smith|
    +---+---------+--------+

    10、替换缺失值

    df.na.fill(50).show()           # 用一个值替换空值
    df.na.drop().show()             # 去除 df 中为空值的行
    df.na.replace(10, 20).show()    # 用一个值替换另一个值

    11、重分区

    df.repartition(10).rdd.getNumPartitions()   # 将 df 拆分为10个分区
    df.coalesce(1).getNumPartitions()           # 将 df 合并为1个分区

    12、运行 SQL 查询

    (1)将数据框注册为视图

    df.createGlobalTempView("people")
    df.createTempView("customer")
    df.createOrReplaceTempView("customer")

    (2)查询视图

    df5 = spark.sql("SELECT * FROM customer").show()
    peopledf2 = spark.sql("SELECT * FROM global_temp.people").show()

    13、输出

    (1)数据结构

    rdd1 = df.rdd
    df.toJSON().first()
    df.toPandas()

    (2)保存至文件

    df.select("firstName", "address").write.save("nameAndAddress.parquet")
    df.select("firstName", "age").write.save("namesAndAges.json", format="json")

    14、终止 SparkSession

    spark.stop()

    摘自DataCamp
    Learn Python for Data Science Interactively

    展开全文
  • 在本地环境中 spark = SparkSession.builder \ .master('local[*]') \ .appName('cons_test') \ .config("spark.jars", "E:\postgresql-42.2.18.jar") \ .getOrCreate() ... from pyspark.sql import.
  • PyPMML-SparkPySpark的Python PMML评分库,称为SparkML Transformer,它实际上是的Python API。 先决条件 Java> = 1.8 Python 2.7或> = 3.5 依存关系 模组 PySpark PySpark> = 3.0.0 PySpark> = 2.4.0,<...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 17,947
精华内容 7,178
关键字:

pyspark和spark的区别