精华内容
下载资源
问答
  • Pyspark实战

    2019-09-25 19:59:03
    本篇文章介绍pyspark的使用,参考书籍《pyspark实战》。 Apache Spark最初由Matei Zaharia开发,于2013年创建了Databricks公司并担任CTO。 1.spark介绍 spark提供MapReduce的灵活性和可扩展性,但是明显速度更快...

    本篇文章介绍pyspark的使用,参考书籍《pyspark实战》。

    Apache Spark最初由Matei Zaharia开发,于2013年创建了Databricks公司并担任CTO。

    1.spark介绍

    spark提供MapReduce的灵活性和可扩展性,但是明显速度更快。可以使用java,scala,sql,R和Python访问Spark API。

    1.1 spark作业和API

    spark作业与一系列对象依赖相关联,这些依赖关系以DAG图的方式组织。

    弹性分布式数据集(RDD)是JVM对象的分布式集合,Spark是围绕RDD构建的。RDD有两种操作:转换动作

    DataFrame与RDD类似,区别主要是DataFrame是以命名列的方式组织的。与关系型数据库中的表类似。与java、scala相比,python中的RDD速度很慢,但是DataFrame在各种语言中性能相似。

    Spark SQL的核心是Catalyst优化器。

    Spark 2.0发布的三个主要主题包括:性能增强;引入结构化流;统一Dataset和DataFrame。

     

     

    2.RDD

    2.1 创建RDD

    下边使用两种方式生成RDD,

    from pyspark import SparkContext
    
     data = SparkContext.parallelize([('alibaba', 790),('tencent',780),('jd', 50)])
     data1 = SparkContext.textFile()
    View Code

    RDD是无Schema的数据结构,如果对数据集使用方法.collect(),collect方法执行把数据集送回驱动的操作,驱动器将其序列化成一个列表。

     

    转载于:https://www.cnblogs.com/natty-sky/p/11289636.html

    展开全文
  • PySpark-Learning PySpark实战指南(Leaning PySpark)代码
  • Python大数据处理库 PySpark实战大数据时代分析工具Spark核心组件重要概念部署模式基本操作 这是《Python大数据处理库 PySpark实战》一书的总结归纳 大数据时代 大数据的特点:大量、高速、多样、低价值密度、真实...


    这是《Python大数据处理库 PySpark实战》一书的总结归纳

    大数据时代

    • 大数据的特点:大量、高速、多样、低价值密度、真实性
    • 谷歌三篇论文是大数据的基石:Google File System 、Google MapReduce、Google Bi 个Table,解决存储、计算、查询的问题

    分析工具

    • Hadoop:HDFS存储、MapReduce计算、YARN资源调度,离线,不适合随机读写的在线事务处理模型
    • Hive把结构化的文件映射为数据表,可以用来查询数据,把SQL翻译成MR,不适合用于联机事务处理和实时查询,适用于大量不可变数据的批处理
    • HBase:分布式、面向列的开源数据库,适合存储非结构化数据,适合写入量大读数据小的应用,查询简单的应用,对性能和可靠性要求较高的应用,内部使用LSTM树模型,读取任意数量的记录不会引发额外的寻道开销
    • Spark:任务中间输出结果保存在内存中,适用于机器学习等需要迭代的算法,用Scala开发和可和Java一起使用,计算速度快,提供80多个高级运算符(转换、聚合),有很多对应组件(Spark SQL、Spark Streaming等),支持多种资源管理器(YARN、Standalone)
    • Flink:计算框架和分布式处理引擎,对无界和有界数据流进行有状态计算,基于流执行引擎,DataSet API:对静态数据进行批处理操作;DataStream API:对数据流进行流处理操作;Table API:对结构化数据进行操作
    • Druid:分布式支持实时多维OLAP分析的数据处理系统,支持数据实时处理和多维度数据分析查询,支持根据时间戳对数据进行预聚合摄入和聚合分析,对时序数据处理,支持较高并发,实时导入,导入即可被查询,采用分布式shared-nothing架构,可扩展到PB级,数据查询支持SQL。
    • Kafka:为处理实时数据提供一个统一、高通量、低等待的平台。用于:发布和订阅(消息系统,读写流式数据),流数据处理,数据存储。可以作为连接各个子系统的数据管道,从而构建实时的数据管道和流式应用
    • Elasticsearch:开源的分布式的,提供Restful API的搜索和数据分析引擎,底层是Apache Lucene,用Java开发,用Lucence做索引和搜索,全文索引,适用于实时文档存储和分析搜索引擎,能横向扩展支持PB级别的数据,可以用Kibana实现数据可视化分析

    Spark

    • 用户提交的任务为Application,一个App对应一个SparkContext,一个APP中存在多个Job,每触发一次Action操作就会产生一个Job(可串行也可并行),每个Job中有多个Stage,每个Stage有多个Task,由TaskScheduler发到各个Executor中执行,Executor和Application一样没有Job运行也存在的。每个Job可包含多个RDD转换算子,调度时可以生成多个Stage

    核心组件

    • Spark Core:包含任务调度、内存管路和容错机制,内部定义RDD(弹性分布式数据集)
    • Spark SQL:处理结构化数据的查询分析
    • Spark Streaming:实时数据流处理组件,需要配合消息队列Kafka
    • Spark Mllib:包含通用机器学习功能的包
    • Spark GraphX:处理图的库,如社交网络图的计算

    重要概念

    • Application:提交一个作业就是一个Application,一个App对应一个SparkContext

    • Driver程序:Spark作业运行时会启动Driver进程,负责解析、生成Stage和调度Task到Executor执行,运行main函数并创建SparkContext

    • Cluster Manager:资源管理器 Standalone模式或者额yarn

    • Executor:真正执行作业Task的地方,接受Driver命令家在和运行Task,一个Executor执行一个到多个Task,多个Task之间可以相互通信

    • SparkContext:程序运行调度的核心,是Spark程序的入口, 由调度器DAGScheduler划分程序各个阶段,划分Stage,生成程序运行的有向无环图;调度器TaskSchedyker划分每个阶段的具体任务,Task的调度和容错;SchedulerBanked管理正在运行的程序分配计算资源的Exector

    • Job:工作单元,每触发一次Action操作就会产生一个Job(可串行也可并行),每个Job中有多个Stage,每个Stage有多个Task,

    • Stage:用来计算中间结果的Tasksets(一组相关联的任务集),在Shuffle的时候产生,如果下一个Stage要用到上一个Stage的全部数据要等上一个全部执行完,有两种Stage:ShuffleMapStage和ResultStage(最后一个Stage),其他的都是前者。ShuffleMapStage会产生中间结果,Stage经常被不同Job共享,前提是Job重用了同一个RDD

    • Task:任务执行的工作单位,每个Task会被发送到一个Worker节点上,每个Task对应RDD的一个Partition

    • RDD:弹性分布式数据集,不可变的、Lazy级别的、粗粒度的数据集合,包含一个或多个数据分片

    • DAG:有向无环图,将一个计算任务按照计算规则分解为若干子任务,子任务根据逻辑关系构建成为有向无环图

    • 算子:Transformation由DAGSchedular划分到pipline,Lazy级别;Action算子触发Job执行运算

    • 窄依赖:父RDD的分区只对应一个子RDD的分区,如果子RDD只有部分分区数据损坏,只要从对应的父RDD重新计算即可

    • 宽依赖:子RDD分区依赖父RDD的所有分区。损坏需要从所有父RDD重新计算,数据处理成本高,尽量避免

    • Lineage:每个RDD记录自己依赖的父RDD信息

    部署模式

    • Local模式

      • 本地采用多线程

      • ./bin/spark-submit \
        --class com.myspark.Job.WordCount \
        --master local[*] \
        /root/sparkjar/spark-demo-1.0.jar
        
    • Spark on YARN

      • yarn-cluster模式下,Driver运行在Application Master中,节点的选在由YARN调度,可关闭Client,计算结果不在Client上显示,不适合交互类型的作业

      • yarn-client模式下,Driver运行在Client端,和请求的YARN Container通信来调度工作,不可关闭Client,适合交互和调试

      • # yarn-cluster
        ./bin/spark-submit \
        --class com.myspark.Job.WordCount \
        --master yarn \
        --deploy-mode cluster \
        /root/sparkjar/spark-demo-1.0.jar
        
        # yarn-client
        ./bin/spark-submit \
        --class com.myspark.Job.WordCount \
        --master yarn \
        --deploy-mode client \
        /root/sparkjar/spark-demo-1.0.jar
        
    • Standalone模式

    • 和Local类似,Standalone的分布式调度器是Spark提供的,集群是Standalone的话,每个机器上都要有Spark

    • ./bin/spark-submit \
      --class com.myspark.Job.WordCount \
      -master spark://192.168.1.71:7077 \
      -executor-memory 16G \
      -total-executor-cores 16\
      /root/sparkjar/spark-demo-1.0.jar
      

    基本操作

    • 常用的Transformation操作
      • map: 接收一个处理函数并行处理源RDD中的每个元素,返回与源RDD元素一一对应的新RDD。
      • fiter: 并行处理源RDD中的每个元素,接受一个函数,并根据定义的规则对RDO
        中的每个元素进行过滤处理,返回处理结果为true的元素重新组成新的RDD
      • flatMap: flatMap是 map和flatten的组合操作,与map函数相似,不过map函数返回的新RDD包含的元素可能是嵌套类型。
      • mapPartitions: 应用于RDD中的每个分区。mapPartitions函数接受的参数为一个函数,该函数的参数为每个分区的送代器,返回值为每个分区元素处理之后组成的新的迭代器,该函数会作用于分区中的每一个元素。
      • mapPartitionsWithIndex:作用与mapPartitions函数相同,只是接受的参数(一个函数),需要传入两个参数,分区的索引作为第一一个参数传入,按照分区的索引对分区中元素进行处理。
      • Union:将两个RDD进行合并,返回结果为RDD中元素(不去重)。
      • Intersection:对两个RDD进行取交集运算,返回结果为RDD无重复元素。
      • Distinct:对RDD中元素去重。
      • groupByKey:在健值对(K-V)类型的RDD中按Key分组,将相同Key的无素聚集到同一个分区内,此函数不能接受函数作为参数,只接受一个可选参数,即任务数。
      • reduceByKey:对K-V类型的RDD按Key分组,它接受两个参数,第一个参数为处理函数,第二个参数为可选参数,用于设置reduce的任务数。reduceByKey函数能够在RDD分区本地提前进行聚合运算,这有效减少了shuffle过程传输的数量groupByKey函数更简洁高效。
      • aggregateByKey:对K-V 类型的RDD按Key分组进行reduce计算,可接受三个参数,第一个是初始化值,第二个是分区内处理函数,第三个是分区间处理函数
      • sortByKey: 对K-V类型的RDD内部元素按照Key进行排序,排序过程会涉及Shuffle操作。
      • join: 对K-V类型的RDD进行关联操作,它只能处理两个RDD之间的关联,超过两个RDD关联需要多次使用join函数。另外,join操作只会关联出具有相同Key的元素,相
        当于SQL语句中的inner join。
      • cogroup:对K-V类型的RDD进行关联,cogroup在 处理多个RDD的关联上比join更加优雅,它可以同时传入多个RDD作为参数进行关联。
      • coalesce:对RDD重新分区,将RDD中的分区数减小到参数numPartitions个,不会产生shufle。在较大的数据集中使用filer等过滤操作后可能会产生多个大小不等的中间结果数据文件,重新分区并减小分区可以提高作业的执行效率,是Spark中常用一种优化手段。
      • repartition:对RDD重新分区,接受一个参数,即numPartitions分区数,它是coalesce函数设置shuffle为true的一种实现形式。
    • 常用的Action操作
      • reduce:处理RDD两两之间元素的聚集操作。
      • collct:返回RDD中所有数据元素。
      • Count:返回RDD中元素个数。
      • First:返回RDD中的第一个元素。
      • Take:返回RDD中的前N个元素。
      • saveAsTextFile:将RDD写入文本文件,保存至本地文件系统或者HDFS中。
      • saveAsSequnceFile:将KV类型的RDD写入Sequence File文件, 保存至本地文件系统或者HDFS中。
      • countByKey: 返回K.V类型的RDD.这个RDD中数据为每个Key包含的元素个数。
      • Foreach:遍历RDD中所有元素, 接受参数为一个函教,常用操作是传入println函教打印所有元素。
    • 可以使用persist() 或者 cache() 方法缓存RDD ,unpersist()消除缓存
      • MEMORY_ONLY: RDD仅缓仔一份到内仔,此为默认致。
      • MEMORY_ONLY_2:将RDD分别缓存在集群的两个节点上,RDD在集群内存中保存
        两份。
      • MEMORY_ONLY_SER:将RDD以Java序列化对象的方式缓存到内存中,有效减少了
        RDD在内存中占用的空间,不过读取时会消耗更多的CPU资源。
      • DISK_ONLY: RDD仅缓存一份到磁盘。
      • MEMORY_ AND_DISK: RDD仅缓存一份到内存,当内存中空间不足时,会将部分
        RDD分区缓存到磁盘。
      • MEMORY_AN_ DISK 2:将RDD分别缓存在集群的两个节点上,当内存中空间不足
        时,会将部分RDD分区缓存到磁盘,RDD在集群内存中保存两份。
      • MEMORY_AND_DISK_SER:将RDD2Java序列化对象的方式缓存到内存中,当内存
        中空间不足时,会将部分RDD分区缓存到磁盘,有效减少了RDD在内存中占用6信
        间,不过读取时会消耗更多的CPU资源。
      • OFF_HEAP:将RDD以序列化的方式缓存到JVM之外的存储空间中,
        与其他缓存模式相比,减少了JVM垃圾回收开销。
    展开全文
  • Python大数据处理库 PySpark实战二Pyspark建立Spark RDDpyspark shellVScodeJupyter notebook动作算子变换算子 Pyspark建立Spark RDD 每个RDD可以分成多个分区,每个分区可以看作是一个数据集片段,可以保存到Spark...

    Pyspark建立Spark RDD

    • 每个RDD可以分成多个分区,每个分区可以看作是一个数据集片段,可以保存到Spark集群中的不同节点上
    • RDD自身具有容错机制,且是一种只读的数据结构,只能通过转换生成新的RDD;一个RDD通过分区可以多台机器上并行处理;可将部分数据缓存在内存中,可多次重用;当内存不足时,可把数据落到磁盘上
    • 创建RDD的方法
      • parallelize(集合,分区数)
      • range sc.range(1,10,2) 开始结束步长
      • 使用HDFS建立RDD

    pyspark shell

     #pyspark shell
     rdd = sc.parallelize(["hello world","hello spark"]);
     rdd2 = rdd.flatMap(lambda line:line.split(" "));
     rdd3 = rdd2.map(lambda word:(word,1));
     rdd5 = rdd3.reduceByKey(lambda a, b : a + b);
     rdd5.collect();
     quit();
    

    VScode

     # vscode
     #pip install findspark
     #fix:ModuleNotFoundError: No module named 'pyspark'
     import findspark
     findspark.init()
     
     #############################
     from pyspark import SparkConf, SparkContext
     
     # 创建SparkContext
     conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
     sc = SparkContext(conf=conf)
      
     rdd = sc.parallelize(["hello world","hello spark"]);
     rdd2 = rdd.flatMap(lambda line:line.split(" "));
     rdd3 = rdd2.map(lambda word:(word,1));
     rdd5 = rdd3.reduceByKey(lambda a, b : a + b);
     #print,否则无法显示结果
     #[('spark', 1), ('hello', 2), ('world', 1)]
     print(rdd5.collect());
     #防止多次创建SparkContexts
     sc.stop()
    

    Jupyter notebook

     #jupyter
     from pyspark.sql import SparkSession
     spark = SparkSession.builder.master("local[*]").appName("WordCount").getOrCreate();
     sc = spark.sparkContext
     rdd = sc.parallelize(["hello world","hello spark"]);
     rdd2 = rdd.flatMap(lambda line:line.split(" "));
     rdd3 = rdd2.map(lambda word:(word,1));
     rdd5 = rdd3.reduceByKey(lambda a, b : a + b);
     #print,否则无法显示结果
     #[('spark', 1), ('hello', 2), ('world', 1)]
     print(rdd5.collect());
     #防止多次创建SparkContexts
     sc.stop()
    

    动作算子

    collect 把RDD类型数据转化为数组 同时从集群中拉取数据dirver端

    stats 返回RDD元素的计数、均值、方差、最大值和最小值

    countByKey 统计RDD[K,V]中每个K的数量 每个相同的K 结果加一 不是把V的值相加

    • first: 返回RDD中一个元素

    • max: 返回最大的一个元素

    • sum: 返回和

    • take: 返回前n个元素

    • top: 返回排序后的前n个元素 降序 top(10,key=str):按照字典序排序 前10个

    • count: 返回个数

    • collect :把RDD类型数据转化为数组 同时从集群中拉取数据dirver端

    • collectAsMap: 把键值RDD转换成Map映射保留其键值结构

    • countByKey: 统计RDD[K,V]中每个K的数量 每个相同的K 结果加一 不是把V的值相加

    • countByValue :统计一个RDD中各个Value出现的次数,返回字典,key是元素的值,value是出现的次数/

      sc.parallelize(range(2,100)) 等价于 sc.range(2,100)
      
      rdd3 = sc.parallelize([("a",1),("a",1),("b",2),("a",1)])
      print(rdd3.countByKey())
      #defaultdict(<class 'int'>, {'a': 3, 'b': 1})
      print(rdd3.countByValue())
      #defaultdict(<class 'int'>, {('a', 1): 3, ('b', 2): 1})
      
    • stats:返回RDD元素的计数、均值、方差、最大值和最小值

      rdd = sc.parallelize(range(100))
      print(rdd.stats())
      #(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99, min: 0)
      
    • aggregate : aggregate(zeroValue,seqOp,combOp) 使用seqOP函数和给定的zeroValue聚合每个分区上的元素,然后用CombOp和zeroValue聚合所有分区结果

      data=[1,3,5,7,9,11,13,15,17]
      rdd=sc.parallelize(data,2)
      print(rdd.glom().collect()) 
      # [[1, 3, 5, 7], [9, 11, 13, 15, 17]]
      seqOp = (lambda x, y: (x[0] + y, x[1] + 1))  #求和 和 个数
      combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
      a=rdd.aggregate((0,0),seqOp,combOp)
      #(81, 9)=(0,0)+(16,4)+(65,5)
      

    变换算子

    coalesce 重新分区
    filter 过滤
    map 每个元素转换
    flatmap 每个元素转换并扁平化
    mapPartitions 按分区转换
    mapValues KV格式 保留k 对v操作
    reduce 减少个数 ; reducebykey KV格式 对v操作 减少元素个数
    join 内链接; fullOuterJoin 全外部连接
    groupBy 函数的返回作为k 分组 ;groupByKey KV中的K分组
    keys 、values获取 对应序列
    zip 元素相同 一一对应
    union 合并; substract 减法 ; intersection 交集; certesian交集
    cache、persist 缓存
    glom 查看分区状态
    sortBy:对RDD元素进行排序

    • coalesce:rdd.coalesce(numPartitions,[isShuffle=False]) 将RDD进行重新分区,分区过程中是否进行混洗操作

      rdd=sc.parallelize([1, 2, 3, 4, 5], 3).glom()
      #[[1], [2, 3], [4, 5]]
      rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6], 3).coalesce(1,False)
      #[1, 2, 3, 4, 5, 6]
      
    • repartition: 和coalesce(1,True) 一样 重新分区并混洗

    • distinct :去重

    • filter:返回满足过滤函数为True的元素构成 filter(lambda x: x%2 == 0)

      #filter
      rdd5 = sc.parallelize([1,2,3,4,5]).filter(lambda x: x%2 == 0)
      print(rdd5.collect())
      [2,4]
      
    • map:对RDD每个元素按照func定义的逻辑处理,在统计单词个数中常用rdd.map(func,preservesPartitioning=Flase)

      rdd = sc.parallelize(["b", "a", "c", "d"])
      rdd2 = rdd.map(lambda x: (x, 1))
      #[('b', 1), ('a', 1), ('c', 1), ('d', 1)]
      
    • flatMap:对RDD中每一个元素按照func的处理逻辑操作,并将结果扁平化处理

      #faltMap
      rdd5 = sc.parallelize([1,2,3,4,5]).flatMap(lambda x:[(x,1)])
      print(rdd5.collect())
      [(1, 2), (2, 4), (3, 6), (4, 8), (5, 10)]
      
    • flatMapValues:对RDD元素格式为KV对中的Value进行func定义的逻辑处理,形成新的KV,并把结果扁平化处理

      #flatMapValues
      rdd = sc.parallelize([("a", [1, 2, 3]), ("c", ["w", "m"])])
      ret = rdd.flatMapValues(lambda x: x)
      #[('a', 1), ('a', 2), ('a', 3), ('c', 'w'), ('c', 'm')]
      
    • mapPartitions:RDD每个分区中元素按照定义的逻辑返回处理,并分别返回值

      rdd = sc.parallelize([1, 2, 3, 4 , 5], 2)
      def f(iter): 
          yield sum(iter) #yield的作用是把函数变成generator,返回的是iterable对象
      
      rdd2 = rdd.mapPartitions(f)
      print(rdd2.collect())
      #[3,12]
      
    • mapValues:对KV格式的RDD中的每个元素应用函数,K值不变且保留原始分区, 对Value操作

      rdd = sc.parallelize([("a", ["hello", "spark", "!"]), ("b", ["cumt"])])
      rdd2 = rdd.mapValues(lambda x:len(x))
      #[('a', 3), ('b', 1)]
      
    • mapPartitionsWithIndex:RDD每个分区中元素按照定义的逻辑返回处理,跟踪原始分区的索引

      rdd = sc.parallelize([1, 2, 3, 4 ,5 ,6], 3)
      def f(index, iter): 
        #分区索引 0,1,2
        print(index)
        for x in iter:
          #1,2;3,4;5,6
          print(x)
          yield index
      ret = rdd.mapPartitionsWithIndex(f).sum()
      #3=0+1+2
      print(ret)	
      
    • reduce : 按照func对RDD元素计算,减少元素个数

      rdd = sc.parallelize([1, 2, 3, 4, 5])
      ret = rdd.reduce(lambda x,y : x+y)
      15
      
    • reduceByKey : 对KV的数据进行运算,减少元素个数

      rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b", 3)])
      rdd2 = rdd.reduceByKey(lambda x,y:x+y)
      #[('a', 3), ('b', 4)]
      
    • join: 包含自身和另一个匹配键的所有成对元素,每对元素以(k,(v1,v2))元组返回,其中(k,v1)在自身,(k,v2)在另一个中

      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2), ("a", 3)])
      ret = x.join(y).collect()
      #[('a', (1, 2)), ('a', (1, 3))]
      
    • fullOuterJoin : 全外部连接 没有匹配到就是None

      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2), ("c", 8)])
      rdd = x.fullOuterJoin(y)
      # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
      
    • leftOuterJoin 和 rightOuterJoin : 左外连接 和 右外连接

      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2), ("c", 8)])
      rdd = x.leftOuterJoin(y)
      #[('b', (4, None)), ('a', (1, 2))]
      rdd = x.rightOuterJoin(y)
      #[('c', (None, 8)), ('a', (1, 2))]
      
    • groupBy :groupBy(func,numPartitions=None,partitionFunc=<function portable_hash) 函数的返回作为key,通过key对其元素进行分组,返回新的RDD

    • rdd = sc.parallelize([1, 2, 3, 4, 5, 10])
      rdd = rdd.groupBy(lambda x:x%2)
      result = rdd.collect()
      #[(0, <pyspark.resultiterable.ResultIterable object at 0x110ef9c50>), (1, <pyspark.resultiterable.ResultIterable object at 0x110ef94d0>)]
      ret = sorted([(x, sorted(y)) for (x, y) in result])
      #[(0, [2, 4, 10]), (1, [1, 3, 5])]
      
    • groupByKey : 将RDD中每个键的值分组为单个序列,用numsPartitions分区对生成的RDD进行哈希分区 如果求和或平均值 建议使用reduceByKey 或 AggregateByKey

      rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      rdd2 = rdd.groupByKey().mapValues(lambda x: sum(x))
      rdd3 = rdd.reduceByKey(lambda x,y: x+y) #和rdd2一样
      # [('a', 2), ('b', 1)]
      print(sorted(rdd2.collect()))
      
    • keyBy: 将原有RDD中的元素作为Key,Key通过func返回值作为value创建一个元组

      rdd = sc.parallelize(range(0,3))
      rdd = rdd.keyBy(lambda x: x*x)
      #[(0, 0), (1, 1), (4, 2)]
      
    • keys:获取KV格式中的Key序列,返回新的RDD

      rdd1 = sc.parallelize([("a",1),("b",2),("a",3)])
      print(rdd1.keys().collect())
      #['a', 'b', 'a']
      
    • values:获取KV格式中的Value序列,返回新的RDD

      rdd1 = sc.parallelize([("a",1),("b",2),("a",3)])
      print(rdd1.keys().collect())
      #[1, 2, 3]
      
    • zip:rdd.zip(otherRDD)将第一个RDD中的元素作为Key,第二个RDD中的作为Value组成新的RDD,两个RDD的元素个数相同

      x = sc.parallelize(range(1,6))
      y = sc.parallelize(range(801, 806))
      print(x.zip(y).collect())
      #[(1, 801), (2, 802), (3, 803), (4, 804), (5, 805)]
      #x,y长度必须相等
      
    • zipWithIndex:RDD元素作为key,索引作为Value

      rdd = sc.parallelize(["a", "b", "c", "d"], 3)
      print(rdd.zipWithIndex().collect())
      #[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
      
    • union:第一个RDD元素和第二个的合并

      dd =sc.parallelize(range(1,10))
      rdd2 =sc.parallelize(range(11,20))
      rdd3 = rdd.union(rdd2)
      #[1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19]
      
    • subtract:第一个中排出第二个中的元素

      x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
      y = sc.parallelize([("a", 1), ("b", 5)])
      z = x.subtract(y)
      #[('b', 4), ('a', 3)]
      
    • subtractByKey :从元素为KV格式的RDD中除掉另一个,只要Key一样就删除

      x = sc.parallelize([("a", 1), ("b", 4), ("c", 5), ("a", 3)])
      y = sc.parallelize([("a", 7), ("b", 0)])
      z = x.subtractByKey(y)
      #[('c', 5)]
      
    • intersection:返回交集并去重

      rdd1 = sc.parallelize([("a", 2), ("b", 1), ("a", 2),("b", 3)])
      rdd2 = sc.parallelize([("a", 2), ("b", 1), ("e", 5)])
      ret = rdd1.intersection(rdd2).collect()
      #('a', 2), ('b', 1)]
      
    • certesian: 返回两个RDD的笛卡尔积 元素较多可能出现内存不足情况

      rdd = sc.parallelize([1, 2])
      rdd2 = sc.parallelize([3, 7])
      rdd3 = sorted(rdd.cartesian(rdd2).collect())
      #[(1, 3), (1, 7), (2, 3), (2, 7)]
      print(rdd3)
      
    • sortBy:对RDD元素进行排序,sortBy(keyfuc,ascending=True,numPartitions=None),默认升序

      rdd = [('a', 6), ('f', 11), ('c', 7), ('d', 4), ('e', 5)]
      rdd2 = sc.parallelize(rdd).sortBy(lambda x: x[0])
      #[('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
      rdd3 = sc.parallelize(rdd).sortBy(lambda x: x[1])
      #[('f', 2), ('d', 4), ('e', 5), ('a', 6), ('c', 7)]
      rdd3 = sc.parallelize(rdd).sortBy(lambda x: x[1],False)
      #[('c', 7), ('a', 6), ('e', 5), ('d', 4), ('f', 2)]
      
    • sortByKey : 按照Key排序 sortByKey(ascending=True,numPartitions=None,keyfunc=)

      x = [('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)]
      rdd = sc.parallelize(x).sortByKey(True, 1)
      #[('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
      print(rdd.collect())
      
    • takeOrdered:RDD中获取排序后的前num个元素构成RDD,默认升序,可支持可选函数

      rdd =sc.parallelize(range(2,100))
      print(rdd.takeOrdered(10))
      #[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
      print(rdd.takeOrdered(10, key=lambda x: -x))
      #[99, 98, 97, 96, 95, 94, 93, 92, 91, 90]
      
    • takeSample:takeSample(withReplacement,num,seed=None) 抽样出固定大小的子数据集合,第一个参数布尔值表示是否可以多次抽样,第二个抽样的个数,第三个随机数生成器种子

      dd =sc.parallelize(range(2,10))
      print(rdd.takeSample(True, 20, 1))
      #True代表一个元素可以出现多次
      #[5, 9, 5, 3, 2, 2, 7, 7, 5, 7, 9, 9, 5, 3, 2, 4, 5, 5, 6, 8]
      print(rdd.takeSample(False, 20, 1))
      #False代表一个元素只能出现1次
      #[5, 8, 3, 7, 9, 2, 6, 4]
      
    • sample : sample(withReplacement,fraction,seed) 第二个参数 抽样比例[0,1]

      rdd = sc.parallelize(range(100), 1)
      ret = rdd.sample(False, 2, 1)
      #可能输出[9, 11, 13, 39, 49, 55, 61, 65, 90, 91, 93, 94]
      
    • randomSplit:按照权重对RDD随机切分,返回多个RDD构成的列表

      rdd = sc.parallelize(range(100), 1)
      rdd1, rdd2 = rdd.randomSplit([2, 3], 10)
      print(len(rdd1.collect())) #40
      print(len(rdd2.collect())) #60
      
    • loopup: 根据key值从RDD中找到相关的元素,返回KV中的V

      rdd = sc.parallelize([('a', 'b'), ('c', 'd')])
      print(rdd.lookup('a')) #['b']
      
    • fold:对RDD每个元素按照func的逻辑进行处理fold(value,func) func有两个参数a,b a的初始值为value,后续为累加值,b代表当前元素值 可以用来累加 累乘

      #fold
      ret=sc.parallelize([1, 2, 3, 4, 5]).fold(0, lambda x,y:x+y)
      #15
      ret=sc.parallelize([1, 2, 3, 4, 5]).fold(1, lambda x,y:x*y)
      #120
      
    • foldByKey:对RDD元素格式为KV对中的Key进行func定义的逻辑处理,可以用来分组累加累乘

      #foldByKey
      rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3),("b", 5)])
      rdd2=rdd.foldByKey(0, lambda x,y:x+y)
      # [('a', 4), ('b', 7)]
      rdd3=rdd.foldByKey(1, lambda x,y:x*y)
      # [('a', 3), ('b', 10)]
      
    • foreach:对RDD每个元素按照func定义的逻辑处理

    • foreachPartion:对RDD每个分区中的元素按照func定义逻辑处理,一般来说foreachPartion效率比foreach高,是一次性处理一个partition数据,在写数据库的时候,性能比map高很多

      rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3),("b", 5)])
      def f(x):
          print(x)
          return (x[0],x[1]*2)
          
      def f2(iter):
          for x in iter:
              print(x)
              
      ret = rdd.foreach(f)
      ret2 = sc.parallelize([1,2,3,4,5,6,7,8],2).foreachPartition(f2)
      
    • aggregateByKey:aggregate(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=) 使用seqFunc函数和给定的zeroValue聚合每个分区上的元素,然后用CombFunc和zeroValue聚合所有分区结果

      data=[("a",1),("b",2),("a",3),("b",4),("a",5),("b",6),("a",7),("b",8),("a",9),("b",10)]
      rdd=sc.parallelize(data,2)
      print(rdd.glom().collect())
      #[[('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)], [('b', 6), ('a', 7), ('b', 8), ('a', 9), ('b', 10)]]
      def seqFunc(x,y):
      	return x + y
      def combFunc(x,y):
      	return x + y
      a=rdd.aggregateByKey(0,seqFunc,combFunc)
      # [('b', 30), ('a', 25)]
      print(a.collect())
      
    • combineByKey:

      • createCombiner: V => C 这个函数把当前的值作为参数 可以对其做一些操作并返回
      • mergeValue :(C,V) => C 把元素V合并到之前的元素C上 (这个操作在每个分区内进行)
      • mergeCombiners:(C,C) => C 把2个元素合并 (这个操作在不同分区间进行)
      a = [1,2]
      b = [10,11]
      a.extend(b) #[1, 2, 10, 11]
      a.append(b) #[1, 2, [10, 11]]
      
      #combineByKey
      rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2),("b", 4)],2)
      def to_list(a):
          return [a]
      def append(a, b): #分区合并
          a.append(b)
          return a
      def extend(a, b):#不同分区合并
          a.extend(b)
          return a
      print(rdd.glom().collect())
      ret = sorted(rdd.combineByKey(to_list, append, extend).collect())
      #[[('a', 1), ('b', 3)], [('a', 2), ('b', 4)]]
      #[('a', [1, 2]), ('b', [3, 4])]  
      
    • glom:把RDD中每一个分区的元素T转换成Array[T],每个分区只有一个数组元素

      #glom
      rdd2 = sc.parallelize([1,2,3,4,5],3)
      print(rdd2.collect())
      #[1, 2, 3, 4, 5]
      print(rdd2.glom().collect())
      #[[1], [2, 3], [4, 5]]
      print(rdd2.coalesce(1).glom().collect())
      #[[1, 2, 3, 4, 5]]
      
    • cache : 缓存 默认存储级别(MEMORY_ONLY)

    • persist : 缓存 可以定制存储级别 storageLevel

    • saveAsTextFile: 保存RDD文件作为一个对象,

    展开全文
  • Python大数据处理库 PySpark实战四ETL 实战实验数据来源数据加载观察资料选择、筛选与聚合机器学习实战实验数据来源数据加载统计描述清洗与变形Pipeline逻辑回归预测决策树预测 ETL 实战 实验数据来源 ...

    ETL 实战

    实验数据来源

    • https://groupllens.org/datasets/movielens/
    • 下载一个精简数据集。rating.csv 电影评分记录 :userId给电影评价的用户ID movieId 电影的ID rating 打分5分满分,timestamp时间戳

    数据加载

    from pyspark.sql import SparkSession
    from pyspark.sql.context import SQLContext
    from pyspark.sql.functions import to_utc_timestamp
    
    spark = SparkSession.builder.master("local[*]").appName("PySpark ETL").getOrCreate()
    sc = spark.sparkContext
    #############################################
    #相对路径,文件包含标题行
    df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True)
    #打印默认的字段类型信息
    df.printSchema()
    #打印前20条数据
    df.show()
    #打印总行数
    print(df.count())
    
    root
     |-- userId: string (nullable = true)
     |-- movieId: string (nullable = true)
     |-- rating: string (nullable = true)
     |-- timestamp: string (nullable = true)
    
    +------+-------+------+---------+
    |userId|movieId|rating|timestamp|
    +------+-------+------+---------+
    |     1|      1|   4.0|964982703|
    |     1|      3|   4.0|964981247|
    |     1|      6|   4.0|964982224|
    |     1|     47|   5.0|964983815|
    |     1|     50|   5.0|964982931|
    |     1|     70|   3.0|964982400|
    |     1|    101|   5.0|964980868|
    |     1|    110|   4.0|964982176|
    |     1|    151|   5.0|964984041|
    |     1|    157|   5.0|964984100|
    |     1|    163|   5.0|964983650|
    |     1|    216|   5.0|964981208|
    |     1|    223|   3.0|964980985|
    |     1|    231|   5.0|964981179|
    |     1|    235|   4.0|964980908|
    |     1|    260|   5.0|964981680|
    |     1|    296|   3.0|964982967|
    |     1|    316|   3.0|964982310|
    |     1|    333|   5.0|964981179|
    |     1|    349|   4.0|964982563|
    +------+-------+------+---------+
    only showing top 20 rows
    
    100836
    

    但是我们发现读取文件后,用printSchema打印各个字段类型都是string,不满足业务的实际情况

    df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True,inferSchema=True)
    df.printSchema()
    root
     |-- userId: integer (nullable = true)
     |-- movieId: integer (nullable = true)
     |-- rating: double (nullable = true)
     |-- timestamp: integer (nullable = true) |-- timestamp: integer (nullable = true)
    

    观察资料

    # Matplotlib 绘制折线图
    import matplotlib
    import matplotlib.pyplot as plt
    import numpy as np
    
    #支持中文,否则乱码
    plt.rcParams['font.family'] = ['sans-serif']
    plt.rcParams['font.sans-serif'] = ['SimHei']
    #准备数据
    t = np.arange(0.0, 2.0, 0.01)
    s = 1 + np.sin(2 * np.pi * t)
    fig, ax = plt.subplots()
    #设置窗口标题
    fig.canvas.set_window_title('折线图示例')
    #绘图,折线图
    ax.plot(t, s)
    #坐标轴设置
    ax.set(xlabel='时间 (s)', ylabel='电压 (mV)',title='折线图')
    ax.grid()
    plt.show()
    

    在这里插入图片描述

    对rating.csv数据进行探究

    df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True,inferSchema=True)
    df.show(10)
    df.printSchema()
    # 对rating列进行数据统计分析
    df.select("rating").describe().show()
    #打印资料的行数量和列数量
    print(df.count(),len(df.columns))
    
    #删除所有列的空值和NaN
    dfNotNull=df.na.drop()
    print(dfNotNull.count(),len(dfNotNull.columns))
    
    #创建视图movie
    df.createOrReplaceTempView("movie")
    #spark sql
    df2 = spark.sql("select count(*) as counter, rating from movie \
    						group by rating order by rating asc")
    df2.show()
    ##############################################
    from matplotlib import pyplot as plt 
    pdf = df2.toPandas()
    x = pdf["rating"]
    y = pdf["counter"]
    plt.xlabel("rating")
    plt.ylabel("counter")
    plt.title("movie rating")
    plt.bar(x,y)
    #显示数值标签
    for x1,y1 in zip(x,y):
      plt.text(x1, y1+0.05, '%.0f' %y1, ha='center', va= 'bottom',fontsize=11)
    plt.show()
    
    
    +------+-------+------+---------+
    |userId|movieId|rating|timestamp|
    +------+-------+------+---------+
    |     1|      1|   4.0|964982703|
    |     1|      3|   4.0|964981247|
    |     1|      6|   4.0|964982224|
    |     1|     47|   5.0|964983815|
    |     1|     50|   5.0|964982931|
    |     1|     70|   3.0|964982400|
    |     1|    101|   5.0|964980868|
    |     1|    110|   4.0|964982176|
    |     1|    151|   5.0|964984041|
    |     1|    157|   5.0|964984100|
    +------+-------+------+---------+
    only showing top 10 rows
    
    root
     |-- userId: integer (nullable = true)
     |-- movieId: integer (nullable = true)
     |-- rating: double (nullable = true)
     |-- timestamp: integer (nullable = true)
    
    +-------+------------------+
    |summary|            rating|
    +-------+------------------+
    |  count|            100836|
    |   mean| 3.501556983616962|
    | stddev|1.0425292390606342|
    |    min|               0.5|
    |    max|               5.0|
    +-------+------------------+
    
    100836 4
    100836 4
    +-------+------+
    |counter|rating|
    +-------+------+
    |   1370|   0.5|
    |   2811|   1.0|
    |   1791|   1.5|
    |   7551|   2.0|
    |   5550|   2.5|
    |  20047|   3.0|
    |  13136|   3.5|
    |  26818|   4.0|
    |   8551|   4.5|
    |  13211|   5.0|
    +-------+------+
    

    在这里插入图片描述

    选择、筛选与聚合

    • 转换时间
      • UNIX_TIMESTAMP:是把时间字段转化为整型,需要注意的是有些数据库需要指明时间字段类型,比如MySQL里是可以直接UNIX_TIMESTAMP(‘20200223’),而某些大数据平台需要UNIX_TIMESTAMP(‘20200223’,‘yyyyMMdd’)
      • FROM_UNIXTIME:从整型里把时间整型进行破解成想要的时间格式,使用时可指定格式
    from pyspark.sql.functions import from_unixtime
    df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True,inferSchema=True)
    df = df.withColumn("rating", df.rating.cast("double"))
    #新增一列date
    df = df.withColumn("date",from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
    df = df.withColumn("date",df.date.cast("date"))
    #删除timestamp列
    df = df.drop("timestamp")
    df.show(10)
    
    +------+-------+------+----------+
    |userId|movieId|rating|      date|
    +------+-------+------+----------+
    |     1|      1|   4.0|2000-07-31|
    |     1|      3|   4.0|2000-07-31|
    |     1|      6|   4.0|2000-07-31|
    |     1|     47|   5.0|2000-07-31|
    |     1|     50|   5.0|2000-07-31|
    |     1|     70|   3.0|2000-07-31|
    |     1|    101|   5.0|2000-07-31|
    |     1|    110|   4.0|2000-07-31|
    |     1|    151|   5.0|2000-07-31|
    |     1|    157|   5.0|2000-07-31|
    +------+-------+------+----------+
    
    • Inner Join 电影表
    from pyspark.sql.types import BooleanType
    from pyspark.sql.functions import udf
    
    df2 = spark.read.csv('hdfs://localhost:9000/ml-latest-small/movies.csv',header=True)
    df3 = df.join(df2, df.movieId == df2.movieId,"inner").select("userId",df.movieId,"title","date","rating")
    #定义普通的python函数
    def isLike(v):
        if v > 4:
            return True
        else:
            return False
    #创建udf函数
    udf_isLike=udf(isLike,BooleanType())
    df3 = df3.withColumn("isLike",udf_isLike(df3["rating"]))
    df3.show()
    
    +------+-------+--------------------+----------+------+------+
    |userId|movieId|               title|      date|rating|isLike|
    +------+-------+--------------------+----------+------+------+
    |     1|      1|    Toy Story (1995)|2000-07-31|   4.0| false|
    |     1|      3|Grumpier Old Men ...|2000-07-31|   4.0| false|
    |     1|      6|         Heat (1995)|2000-07-31|   4.0| false|
    |     1|     47|Seven (a.k.a. Se7...|2000-07-31|   5.0|  true|
    |     1|     50|Usual Suspects, T...|2000-07-31|   5.0|  true|
    |     1|     70|From Dusk Till Da...|2000-07-31|   3.0| false|
    |     1|    101|Bottle Rocket (1996)|2000-07-31|   5.0|  true|
    |     1|    110|   Braveheart (1995)|2000-07-31|   4.0| false|
    |     1|    151|      Rob Roy (1995)|2000-07-31|   5.0|  true|
    |     1|    157|Canadian Bacon (1...|2000-07-31|   5.0|  true|
    +------+-------+--------------------+----------+------+------+
    
    • 定义Pandas UDF函数,对象聚合 要求Pyspark2.3 以上
    from pyspark.sql.functions import pandas_udf, udf
    
    #定义pandas udf函数,用于GroupedData
    @pandas_udf("string", PandasUDFType.GROUPED_AGG) 
    def fmerge(v):
        return ','.join(v)
    
    df5 = spark.read.csv('hdfs://localhost:9000/ml-latest-small/tags.csv',header=True)
    df5 = df5.drop("timestamp")
    #groupBy聚合
    df7 = df5.groupBy(["userId","movieId"]).agg(fmerge(df5["tag"]))
    df7 = df7.withColumnRenamed("fmerge(tag)","tags")
    #select选择
    df6 = df3.join(df7,(df3.movieId == df7.movieId) & (df3.userId == df7.userId))\
            .select(df3.userId,df3.movieId,"title","date","tags","rating","isLike") \
            .orderBy(["date"], ascending=[0])
    #filter筛选
    df6 = df6.filter(df.date>'2015-10-25')
    df6.show(20)
    
    • 存储数据
    #存储数据text格式
    f6.rdd.coalesce(1).saveAsTextFile("movie-out")
    #存储数据CSV格式 
    df6.coalesce(1).write.format("csv").option("header","true").save("movie-out-csv")
    #parquet格式
    df6.write.format('parquet').save("movie-out-parquet")
    #json格式
    df6.coalesce(1).write.format("json").save("movie-out-json")
    #存储到数据库
    #需要把对应的数据库驱动安装到目录jars下
    #save to file
    db_url = "jdbc:sqlserver://localhost:1433;databaseName=bg_data;user=root;password=root"
    db_table = "movie"
    #overwrite会重新生成表 append
    df6.write.mode("overwrite").jdbc(db_url, db_table)
    
    

    机器学习实战

    实验数据来源

    • https://www.kaggle.com/c/titanic
    • 通过训练数据集分析哪些乘客可能幸存

    数据加载

    from pyspark.sql import SparkSession
    from pyspark.sql.context import SQLContext
    from pyspark.sql.functions import to_utc_timestamp
    
    spark = SparkSession.builder.master("local[*]").appName("PySpark AI").getOrCreate()
    sc = spark.sparkContext
    
    print("Titanic train.csv Info")
    df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
    df_train.printSchema()
    print(df_train.count(),len(df_train.columns))
    df_train.show()
    print("#############################################")
    print("Titanic test.csv Info")
    df_test = spark.read.csv('hdfs://localhost:9000/titanic/test.csv',header=True,inferSchema=True).cache()
    df_test.printSchema()
    print(df_test.count(),len(df_test.columns))
    df_test.show()
    
    Titanic train.csv Info
    root
     |-- PassengerId: integer (nullable = true)
     |-- Survived: integer (nullable = true)
     |-- Pclass: integer (nullable = true)
     |-- Name: string (nullable = true)
     |-- Sex: string (nullable = true)
     |-- Age: double (nullable = true)
     |-- SibSp: integer (nullable = true)
     |-- Parch: integer (nullable = true)
     |-- Ticket: string (nullable = true)
     |-- Fare: double (nullable = true)
     |-- Cabin: string (nullable = true)
     |-- Embarked: string (nullable = true)
    
    891 12
    +-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
    |PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
    +-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
    |          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
    |          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
    |          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
    |          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
    |          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
    |          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877| 8.4583| null|       Q|
    |          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|           17463|51.8625|  E46|       S|
    |          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1|          349909| 21.075| null|       S|
    |          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| null|       S|
    |         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736|30.0708| null|       C|
    |         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|         PP 9549|   16.7|   G6|       S|
    |         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|          113783|  26.55| C103|       S|
    |         13|       0|     3|Saundercock, Mr. ...|  male|20.0|    0|    0|       A/5. 2151|   8.05| null|       S|
    |         14|       0|     3|Andersson, Mr. An...|  male|39.0|    1|    5|          347082| 31.275| null|       S|
    |         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0|          350406| 7.8542| null|       S|
    |         16|       1|     2|Hewlett, Mrs. (Ma...|female|55.0|    0|    0|          248706|   16.0| null|       S|
    |         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1|          382652| 29.125| null|       Q|
    |         18|       1|     2|Williams, Mr. Cha...|  male|null|    0|    0|          244373|   13.0| null|       S|
    |         19|       0|     3|Vander Planke, Mr...|female|31.0|    1|    0|          345763|   18.0| null|       S|
    |         20|       1|     3|Masselmani, Mrs. ...|female|null|    0|    0|            2649|  7.225| null|       C|
    +-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
    only showing top 20 rows
    
    #############################################
    Titanic test.csv Info
    root
     |-- PassengerId: integer (nullable = true)
     |-- Pclass: integer (nullable = true)
     |-- Name: string (nullable = true)
     |-- Sex: string (nullable = true)
     |-- Age: double (nullable = true)
     |-- SibSp: integer (nullable = true)
     |-- Parch: integer (nullable = true)
     |-- Ticket: string (nullable = true)
     |-- Fare: double (nullable = true)
     |-- Cabin: string (nullable = true)
     |-- Embarked: string (nullable = true)
    
    418 11
    +-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
    |PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
    +-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
    |        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0|          330911| 7.8292| null|       Q|
    |        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0|          363272|    7.0| null|       S|
    |        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0|          240276| 9.6875| null|       Q|
    |        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0|          315154| 8.6625| null|       S|
    |        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|         3101298|12.2875| null|       S|
    |        897|     3|Svensson, Mr. Joh...|  male|14.0|    0|    0|            7538|  9.225| null|       S|
    |        898|     3|Connolly, Miss. Kate|female|30.0|    0|    0|          330972| 7.6292| null|       Q|
    |        899|     2|Caldwell, Mr. Alb...|  male|26.0|    1|    1|          248738|   29.0| null|       S|
    |        900|     3|Abrahim, Mrs. Jos...|female|18.0|    0|    0|            2657| 7.2292| null|       C|
    |        901|     3|Davies, Mr. John ...|  male|21.0|    2|    0|       A/4 48871|  24.15| null|       S|
    |        902|     3|    Ilieff, Mr. Ylio|  male|null|    0|    0|          349220| 7.8958| null|       S|
    |        903|     1|Jones, Mr. Charle...|  male|46.0|    0|    0|             694|   26.0| null|       S|
    |        904|     1|Snyder, Mrs. John...|female|23.0|    1|    0|           21228|82.2667|  B45|       S|
    |        905|     2|Howard, Mr. Benjamin|  male|63.0|    1|    0|           24065|   26.0| null|       S|
    |        906|     1|Chaffee, Mrs. Her...|female|47.0|    1|    0|     W.E.P. 5734| 61.175|  E31|       S|
    |        907|     2|del Carlo, Mrs. S...|female|24.0|    1|    0|   SC/PARIS 2167|27.7208| null|       C|
    |        908|     2|   Keane, Mr. Daniel|  male|35.0|    0|    0|          233734|  12.35| null|       Q|
    |        909|     3|   Assaf, Mr. Gerios|  male|21.0|    0|    0|            2692|  7.225| null|       C|
    |        910|     3|Ilmakangas, Miss....|female|27.0|    1|    0|STON/O2. 3101270|  7.925| null|       S|
    |        911|     3|"Assaf Khalil, Mr...|female|45.0|    0|    0|            2696|  7.225| null|       C|
    +-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
    only showing top 20 rows
    

    统计描述

    df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
    #计算基本的统计描述信息
    df_train.describe("Age","Pclass","SibSp","Parch").show()
    df_train.describe("Sex","Cabin","Embarked","Fare","Survived").show()
    
    pdf = df_train.groupBy('sex','Survived') \
         .agg({'PassengerId': 'count'}) \
         .withColumnRenamed("count(PassengerId)","count") \
         .orderBy("sex") \
         .toPandas()
    print(pdf)
    
    pdf = df_train.groupBy('Pclass','Survived') \
         .agg({'PassengerId': 'count'}) \
         .withColumnRenamed("count(PassengerId)","count") \
         .orderBy("Pclass") \
         .toPandas()
    print(pdf)
    
    
    pdf = df_train.groupBy('Parch','Survived') \
         .agg({'PassengerId': 'count'}) \
         .withColumnRenamed("count(PassengerId)","count") \
         .orderBy("Parch") \
         .toPandas()
    print(pdf)
    
    pdf = df_train.groupBy('SibSp','Survived') \
         .agg({'PassengerId': 'count'}) \
         .withColumnRenamed("count(PassengerId)","count") \
         .orderBy("SibSp") \
         .toPandas()
    print(pdf)
    
    +-------+------------------+------------------+------------------+-------------------+
    |summary|               Age|            Pclass|             SibSp|              Parch|
    +-------+------------------+------------------+------------------+-------------------+
    |  count|               714|               891|               891|                891|
    |   mean| 29.69911764705882| 2.308641975308642|0.5230078563411896|0.38159371492704824|
    | stddev|14.526497332334035|0.8360712409770491|1.1027434322934315| 0.8060572211299488|
    |    min|              0.42|                 1|                 0|                  0|
    |    max|              80.0|                 3|                 8|                  6|
    +-------+------------------+------------------+------------------+-------------------+
    
    +-------+------+-----+--------+-----------------+-------------------+
    |summary|   Sex|Cabin|Embarked|             Fare|           Survived|
    +-------+------+-----+--------+-----------------+-------------------+
    |  count|   891|  204|     889|              891|                891|
    |   mean|  null| null|    null| 32.2042079685746| 0.3838383838383838|
    | stddev|  null| null|    null|49.69342859718089|0.48659245426485753|
    |    min|female|  A10|       C|              0.0|                  0|
    |    max|  male|    T|       S|         512.3292|                  1|
    +-------+------+-----+--------+-----------------+-------------------+
    
          sex  Survived  count
    0  female         1    233
    1  female         0     81
    2    male         0    468
    3    male         1    109
    
       Pclass  Survived  count
    0       1         0     80
    1       1         1    136
    2       2         1     87
    3       2         0     97
    4       3         1    119
    5       3         0    372
        Parch  Survived  count
    0       0         0    445
    1       0         1    233
    2       1         0     53
    3       1         1     65
    4       2         1     40
    5       2         0     40
    6       3         1      3
    7       3         0      2
    8       4         0      4
    9       5         0      4
    10      5         1      1
    11      6         0      1
        SibSp  Survived  count
    0       0         0    398
    1       0         1    210
    2       1         0     97
    3       1         1    112
    4       2         1     13
    5       2         0     15
    6       3         1      4
    7       3         0     12
    8       4         0     15
    9       4         1      3
    10      5         0      5
    11      8         0      7
    

    清洗与变形

    from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
    
    df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
    #用平均值29.699替换缺失值
    df_train = df_train.fillna({'Age': round(29.699,2)})
    #用登录最多的港口'S'替换缺失值
    df_train = df_train.fillna({'Embarked': 'S'})
    #df_train = df_train.fillna({'Cabin': 'unknown'})
    #删除列
    df_train = df_train.drop("Cabin")
    df_train = df_train.drop("Ticket")
    
    #StringIndexer转换器把一列标签类型的特征数值化编码
    labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
    model = labelIndexer.fit(df_train)
    df_train = model.transform(df_train)
    
    labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
    model = labelIndexer.fit(df_train)
    df_train = model.transform(df_train)
    
    df_train.show()
    # 特征选择
    features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
    train_features = df_train[features]
    train_features.show()
    # train_labels = df_train['Survived']
    # train_labels.show()
    
    # VectorAssembler将多个列转换成向量
    df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'], outputCol="features")
    df = df_assembler.transform(train_features)
    
    #df["features"].show()-> TypeError: 'Column' object is not callable
    df["features",].show()
    df["Survived",].show()
    
    
    +-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
    |PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|iEmbarked|iSex|
    +-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
    |          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|      0.0| 0.0|
    |          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|      1.0| 1.0|
    |          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|      0.0| 1.0|
    |          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|      0.0| 1.0|
    |          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|      0.0| 0.0|
    |          6|       0|     3|    Moran, Mr. James|  male|30.0|    0|    0| 8.4583|       Q|      2.0| 0.0|
    |          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|51.8625|       S|      0.0| 0.0|
    |          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.075|       S|      0.0| 0.0|
    |          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|11.1333|       S|      0.0| 1.0|
    |         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|30.0708|       C|      1.0| 1.0|
    |         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|   16.7|       S|      0.0| 1.0|
    |         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  26.55|       S|      0.0| 1.0|
    |         13|       0|     3|Saundercock, Mr. ...|  male|20.0|    0|    0|   8.05|       S|      0.0| 0.0|
    |         14|       0|     3|Andersson, Mr. An...|  male|39.0|    1|    5| 31.275|       S|      0.0| 0.0|
    |         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0| 7.8542|       S|      0.0| 1.0|
    |         16|       1|     2|Hewlett, Mrs. (Ma...|female|55.0|    0|    0|   16.0|       S|      0.0| 1.0|
    |         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1| 29.125|       Q|      2.0| 0.0|
    |         18|       1|     2|Williams, Mr. Cha...|  male|30.0|    0|    0|   13.0|       S|      0.0| 0.0|
    |         19|       0|     3|Vander Planke, Mr...|female|31.0|    1|    0|   18.0|       S|      0.0| 1.0|
    |         20|       1|     3|Masselmani, Mrs. ...|female|30.0|    0|    0|  7.225|       C|      1.0| 1.0|
    +-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
    only showing top 20 rows
    
    +------+----+----+-----+-----+-------+---------+--------+
    |Pclass|iSex| Age|SibSp|Parch|   Fare|iEmbarked|Survived|
    +------+----+----+-----+-----+-------+---------+--------+
    |     3| 0.0|22.0|    1|    0|   7.25|      0.0|       0|
    |     1| 1.0|38.0|    1|    0|71.2833|      1.0|       1|
    |     3| 1.0|26.0|    0|    0|  7.925|      0.0|       1|
    |     1| 1.0|35.0|    1|    0|   53.1|      0.0|       1|
    |     3| 0.0|35.0|    0|    0|   8.05|      0.0|       0|
    |     3| 0.0|30.0|    0|    0| 8.4583|      2.0|       0|
    |     1| 0.0|54.0|    0|    0|51.8625|      0.0|       0|
    |     3| 0.0| 2.0|    3|    1| 21.075|      0.0|       0|
    |     3| 1.0|27.0|    0|    2|11.1333|      0.0|       1|
    |     2| 1.0|14.0|    1|    0|30.0708|      1.0|       1|
    |     3| 1.0| 4.0|    1|    1|   16.7|      0.0|       1|
    |     1| 1.0|58.0|    0|    0|  26.55|      0.0|       1|
    |     3| 0.0|20.0|    0|    0|   8.05|      0.0|       0|
    |     3| 0.0|39.0|    1|    5| 31.275|      0.0|       0|
    |     3| 1.0|14.0|    0|    0| 7.8542|      0.0|       0|
    |     2| 1.0|55.0|    0|    0|   16.0|      0.0|       1|
    |     3| 0.0| 2.0|    4|    1| 29.125|      2.0|       0|
    |     2| 0.0|30.0|    0|    0|   13.0|      0.0|       1|
    |     3| 1.0|31.0|    1|    0|   18.0|      0.0|       0|
    |     3| 1.0|30.0|    0|    0|  7.225|      1.0|       1|
    +------+----+----+-----+-----+-------+---------+--------+
    only showing top 20 rows
    
    +--------------------+
    |            features|
    +--------------------+
    |[3.0,0.0,22.0,1.0...|
    |[1.0,1.0,38.0,1.0...|
    |[3.0,1.0,26.0,0.0...|
    |[1.0,1.0,35.0,1.0...|
    |(7,[0,2,5],[3.0,3...|
    |[3.0,0.0,30.0,0.0...|
    |(7,[0,2,5],[1.0,5...|
    |[3.0,0.0,2.0,3.0,...|
    |[3.0,1.0,27.0,0.0...|
    |[2.0,1.0,14.0,1.0...|
    |[3.0,1.0,4.0,1.0,...|
    |[1.0,1.0,58.0,0.0...|
    |(7,[0,2,5],[3.0,2...|
    |[3.0,0.0,39.0,1.0...|
    |[3.0,1.0,14.0,0.0...|
    |[2.0,1.0,55.0,0.0...|
    |[3.0,0.0,2.0,4.0,...|
    |(7,[0,2,5],[2.0,3...|
    |[3.0,1.0,31.0,1.0...|
    |[3.0,1.0,30.0,0.0...|
    +--------------------+
    only showing top 20 rows
    
    +--------+
    |Survived|
    +--------+
    |       0|
    |       1|
    |       1|
    |       1|
    |       0|
    |       0|
    |       0|
    |       0|
    |       1|
    |       1|
    |       1|
    |       1|
    |       0|
    |       0|
    |       0|
    |       1|
    |       0|
    |       1|
    |       0|
    |       1|
    +--------+
    

    Pipeline

    • 一个pipeline被指定成为一个阶段序列,
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    training = spark.createDataFrame([
        (0, "Hello PySpark", 1.0),
        (1, "Using Flink", 0.0),
        (2, "PySpark 3.0", 1.0),
        (3, "Test MySQL", 0.0)
    ], ["id", "text", "label"])
    # pipeline 三个阶段: tokenizer -> hashingTF -> logR.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    logR = LogisticRegression(maxIter=10, regParam=0.001)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, logR])
    #训练数据上进行pipeline fit操作,产生一个model
    model = pipeline.fit(training)
    #############################################
    #测试集
    test = spark.createDataFrame([
        (4, "PySpark Pipeline"),
        (5, "pipeline"),
        (6, "PySpark python"),
        (7, "julia c#")
    ], ["id", "text"])
    
    #model执行transform
    prediction = model.transform(test)
    #预测
    selected = prediction.select("id", "text", "probability", "prediction")
    for row in selected.collect():
        tid, text, prob, prediction = row
        print("(%d, %s) --> prediction=%f,prob=%s" \
        % (tid, text,  prediction,str(prob)))
        
     
    (4, PySpark Pipeline) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
    (5, pipeline) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
    (6, PySpark python) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
    (7, julia c#) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
    

    逻辑回归预测

    • 训练
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.classification import LogisticRegression
    import matplotlib.pyplot as plt
    
    spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
    sc = spark.sparkContext
    #############################################
    
    df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
    
    df_train = df_train.fillna({'Age': round(29.699,0)})
    df_train = df_train.fillna({'Fare': 36.0})
    df_train = df_train.fillna({'Embarked': 'S'})
    df_train = df_train.drop("Cabin")
    df_train = df_train.drop("Ticket")
    labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
    model = labelIndexer.fit(df_train)
    df_train = model.transform(df_train)
    labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
    model = labelIndexer.fit(df_train)
    df_train = model.transform(df_train)
    features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
    train_features = df_train[features]
    df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp','Parch', 'Fare', 'iEmbarked'], outputCol="features")
    train = df_assembler.transform(train_features)
    
    
    #LogisticRegression模型
    lg = LogisticRegression(labelCol='Survived')
    lgModel = lg.fit(train)
    #保存模型
    lgModel.write().overwrite().save("hdfs://localhost:9000/model/logistic-titanic")
    print("save model to hdfs://localhost:9000/model/logistic-titanic")
    
    
    print("areaUnderROC: " + str(lgModel.summary.areaUnderROC))
    #ROC curve is a plot of FPR against TPR
    
    plt.figure(figsize=(5,5))
    plt.plot([0, 1], [0, 1], 'r--')
    plt.plot(lgModel.summary.roc.select('FPR').collect(),
             lgModel.summary.roc.select('TPR').collect())
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.show()
    
    save model to hdfs://localhost:9000/model/logistic-titanic
    areaUnderROC: 0.8569355233864868
    
    image-20210619211443109
    • 预测
    from pyspark.ml.classification import LogisticRegressionModel
    
    df_test = spark.read.csv('hdfs://localhost:9000/titanic/test.csv',header=True,inferSchema=True).cache()
    df_test = df_test.fillna({'Age': round(29.699,0)})
    df_test = df_test.fillna({'Embarked': 'S'})
    #有一个null
    df_test = df_test.fillna({'Fare': 36.0})
    df_test = df_test.drop("Cabin")
    df_test = df_test.drop("Ticket")
    #新增Survived列,默认值为0
    df_test = df_test.withColumn("Survived",0 * df_test["Age"])
    
    labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
    model = labelIndexer.fit(df_test)
    df_test = model.transform(df_test)
    labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
    model = labelIndexer.fit(df_test)
    df_test = model.transform(df_test)
    features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
    test_features = df_test[features]
    df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch','Fare', 'iEmbarked'], outputCol="features")
    test = df_assembler.transform(test_features)
    
    lgModel = LogisticRegressionModel.load("hdfs://localhost:9000/model/logistic-titanic")
    testSummary =lgModel.evaluate(test)
    
    results=testSummary.predictions
    results["features","rawPrediction","probability","prediction"].show()
    
    +--------------------+--------------------+--------------------+----------+
    |            features|       rawPrediction|         probability|prediction|
    +--------------------+--------------------+--------------------+----------+
    |[3.0,0.0,34.5,0.0...|[1.99328605097899...|[0.88009035220072...|       0.0|
    |[3.0,1.0,47.0,1.0...|[0.63374031844971...|[0.65333708360849...|       0.0|
    |[2.0,0.0,62.0,0.0...|[1.97058477648159...|[0.87767391006101...|       0.0|
    |(7,[0,2,5],[3.0,2...|[2.21170839644084...|[0.90129601257823...|       0.0|
    |[3.0,1.0,22.0,1.0...|[-0.2919725495559...|[0.42752102300610...|       1.0|
    |(7,[0,2,5],[3.0,1...|[1.68822917787023...|[0.84399113755992...|       0.0|
    |[3.0,1.0,30.0,0.0...|[-0.9032166903750...|[0.28838991532794...|       1.0|
    |[2.0,0.0,26.0,1.0...|[1.42490075002778...|[0.80610554993708...|       0.0|
    |[3.0,1.0,18.0,0.0...|[-1.1236436862496...|[0.24533604281752...|       1.0|
    |[3.0,0.0,21.0,2.0...|[2.59895227540995...|[0.93079411943702...|       0.0|
    |(7,[0,2,5],[3.0,3...|[2.33390585204715...|[0.91164644844255...|       0.0|
    |(7,[0,2,5],[1.0,4...|[0.69025711721974...|[0.66602412131662...|       0.0|
    |[1.0,1.0,23.0,1.0...|[-2.7419887292668...|[0.06054069440361...|       1.0|
    |[2.0,0.0,63.0,1.0...|[2.82767950026722...|[0.94415337330052...|       0.0|
    |[1.0,1.0,47.0,1.0...|[-1.7316563679495...|[0.15037583472736...|       1.0|
    |[2.0,1.0,24.0,1.0...|[-1.7197655536498...|[0.15190136429145...|       1.0|
    |[2.0,0.0,35.0,0.0...|[0.88008689342827...|[0.70684022722317...|       0.0|
    |[3.0,0.0,21.0,0.0...|[1.71304684487762...|[0.84723105652294...|       0.0|
    |[3.0,1.0,27.0,1.0...|[-0.1717428611873...|[0.45716950894284...|       1.0|
    |[3.0,1.0,45.0,0.0...|[-0.0389664987514...|[0.49025960775551...|       1.0|
    +--------------------+--------------------+--------------------+----------+
    

    决策树预测

    from pyspark.ml.classification import DecisionTreeClassifier
    #DecisionTree模型
    dtree = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
    treeModel = dtree.fit(train)
    #打印treeModel
    print(treeModel.toDebugString)
    #对训练数据进行预测
    dt_predictions = treeModel.transform(train)
    dt_predictions.select("prediction", "Survived", "features").show()
    
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'Survived', metricName = 'accuracy')
    print('Decision Tree Accu:', multi_evaluator.evaluate(dt_predictions))
    
    
    #查看树信息
    DecisionTreeClassificationModel (uid=DecisionTreeClassifier_43ab8b3b232f4c305402) of depth 5 with 49 nodes
      If (feature 1 in {0.0})
       If (feature 2 <= 3.0)
        If (feature 3 <= 2.0)
         Predict: 1.0
        Else (feature 3 > 2.0)
         If (feature 4 <= 1.0)
          Predict: 0.0
         Else (feature 4 > 1.0)
          If (feature 3 <= 4.0)
           Predict: 1.0
          Else (feature 3 > 4.0)
           Predict: 0.0
           
          ...
          
    +----------+--------+--------------------+
    |prediction|Survived|            features|
    +----------+--------+--------------------+
    |       0.0|       0|[3.0,0.0,22.0,1.0...|
    |       1.0|       1|[1.0,1.0,38.0,1.0...|
    |       1.0|       1|[3.0,1.0,26.0,0.0...|
    |       1.0|       1|[1.0,1.0,35.0,1.0...|
    |       0.0|       0|(7,[0,2,5],[3.0,3...|
    |       0.0|       0|[3.0,0.0,30.0,0.0...|
    |       0.0|       0|(7,[0,2,5],[1.0,5...|
    |       0.0|       0|[3.0,0.0,2.0,3.0,...|
    |       1.0|       1|[3.0,1.0,27.0,0.0...|
    |       1.0|       1|[2.0,1.0,14.0,1.0...|
    |       1.0|       1|[3.0,1.0,4.0,1.0,...|
    |       1.0|       1|[1.0,1.0,58.0,0.0...|
    |       0.0|       0|(7,[0,2,5],[3.0,2...|
    |       0.0|       0|[3.0,0.0,39.0,1.0...|
    |       1.0|       0|[3.0,1.0,14.0,0.0...|
    |       1.0|       1|[2.0,1.0,55.0,0.0...|
    |       0.0|       0|[3.0,0.0,2.0,4.0,...|
    |       0.0|       1|(7,[0,2,5],[2.0,3...|
    |       1.0|       0|[3.0,1.0,31.0,1.0...|
    |       1.0|       1|[3.0,1.0,30.0,0.0...|
    +----------+--------+--------------------+
    only showing top 20 rows
    
    Decision Tree Accu: 0.8417508417508418
    
    展开全文
  • Python大数据处理库 PySpark实战 总结三共享变量DataFrames 与 Spark SQL创建DataFramesSpark SQL基本用法编写Spark程序并提交 共享变量 广播变量 broadcast 广播变量允许程序缓存一个只读变量在集群的每台机器...
  • 本文通过使用SparkMachineLearningLibrary和PySpark来解决一个文本多分类问题,内容包括:数据提取、ModelPipeline、训练/测试数据集划分、模型训练和评价等,具体细节可以参考下面全文。ApacheSpark受到越来越多的...
  • 以一个处理结构化数据的入门程序,带大家进入pyspark的大门。本例子以深圳股市的股息率分析为例,讲解spark的输入、分析计算和输出。
  • PySpark实战指南:准备数据建模

    千次阅读 2019-06-04 21:42:07
    from pyspark.context import SparkContext from pyspark.sql.session import SparkSession sc = SparkContext('local') spark = SparkSession(sc) df = spark.createDataFrame([ (1, 144.5, 5....
  • Pyspark实战(四)pyspark操作hbase

    千次阅读 2019-07-01 23:09:49
    from pyspark.sql import SparkSession def hbasetest():  spark = SparkSession.builder.appName('SparkHBaseRDD').getOrCreate()  sc=spark.sparkContext  tablename='test'  conf = {"hbase.mapreduce....
  • Hbase环境参考上一章节 安装happybase Pip install happybase ...from pyspark.sql import SparkSession import happybase def hpbase(): spark = SparkSession.builder.appName('SparkHBaseRDD').mast...
  • pyspark和happyhase操作hbase需要提前部署和安装pyspark和happyhbase的python包,具体的安装过程可看靠前面章节,这里不再赘述。 1、引入相关包 from pyspark import SparkContext,SparkConf #pyspark包,v2.2.0 ...
  • PySpark实战之Spark优化

    2020-09-13 19:41:32
    1、优化之HistoryServer配置及使用 Monitoring and Instrumentation There are several ways to monitor Spark applications: web UIs, metrics, and external instrumentation. 译 监测与仪器 ...
  • Pyspark实战(一)环境部署

    千次阅读 2019-06-27 22:59:59
    这里假设Python环境已经部署完成,相关版本如下: spark2.2.0,部署过程参考https://blog.csdn.net/luoye4321/article/details/90552674。 python3.7,部署过程参考... JavaJDK1.8以上版本 下载pyspark包 使用...
  • E盘根目录创建test.txt输入测试内容如下: this is a test this very good you is very good ...from pyspark import SparkContext,SparkConf def wordcount(): txtfile=r'E:\test.txt' c...
  • Pyspark的本质还是调用scala的jar包,我们以上篇文章wordcount为例,其中一段代码为: rdd.flatMap(lambda x:x.split( )).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).foreach(lambda x:print(x)) 其中:...
  • 比较的时候采用二分法查找,找到对应的经度和纬度 4,对相同的经度和纬度做累计求和 代码 from pyspark.sql import SparkSession # 255.255.255.255 0~255 256 2^8 8位2进制数 32位2进制数 #将ip转换为特殊的数字...
  • 用户画像-ID_MAPPING pyspark实战

    千次阅读 热门讨论 2019-08-11 12:08:07
    # 高版本中有方法map_from_arrays可以直接用,利用函数的时候,传进去的数据类型一定要是能够识别的数据类型,公司集群老版本pyspark,没有这个方法 # b = df_idmapping_need.select(F.map_from_arrays(F.array(F....
  • https://cloud.tencent.com/developer/article/1096712 Spark的安装和使用(Python版) http://dblab.xmu.edu.cn/blog/1689-2/ https://blog.csdn.net/qq_14959801/article/details/79586786 Spark大数据分析...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,236
精华内容 494
关键字:

pyspark实战