精华内容
下载资源
问答
  • 基于ApacheSpark软件栈的实时大数据
  • 文章目录实习五 Spark软件栈体验Spark安装与启动1.Spark RDD-WordCount2.Spark SQL3.Spark MLlib之Titanic4.GraphX再现PageRank 实习五 Spark软件栈体验 Spark安装与启动 本次实习采用spark为3.0.0版本。 根据教程...

    实习五 Spark软件栈体验

    Spark安装与启动

    本次实习采用spark为3.0.0版本。

    在根据教程安装后输入

    ./bin/spark-shell

    进入交互模式,界面生成如下结果:

    在这里插入图片描述

    输入如下代码进行RDD简单操作:

    val textFile = sc.textFile("file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/README.md")
    textFile.count()  
    
    textFile.first() 
    
    textFile.filter(line => line.contains("Spark")).count()
    
    textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) 

    在这里插入图片描述

    1.Spark RDD-WordCount

    1. 清除txt中的标点符号,得到纯单词与空格组成的Shakespeare.txt

      在这里插入图片描述

    2. 导入Shakespeare.txt并计数

    val s = sc.textFile("file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/Shakespeare.txt")
    s.count

    在这里插入图片描述

    1. 设定输出的文件个数并执行统计逻辑

      val numOutputFiles = 1
      val counts = s.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _, numOutputFiles)
    2. 保存计算结果到本地

      counts.saveAsTextFile("file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/Shakespeare_output.txt")

      在这里插入图片描述

    3. 在本地查看结果

      在这里插入图片描述

      在这里插入图片描述

    2.Spark SQL

    1. 数据:

      采用教学网上的tmdb-5000-movie-dataset.zip中的数据集。

    2. 目标:

      在该数据集上实现的两个实用的查询功能。

    3. 思路:

      分为预处理和查询两个步骤;从本地导入数据集到python程序后,对于公司为空的电影进行过滤,且对于某个对应多个电影公司的电影,将其按照“production_companies”一栏中拆分,将电影公司按照逗号拆分成多行;随后我们进行两项查询:

      1. 查询平均评分大于6.5的电影

      2. 按照公司将高分电影的收入加起来,然后列出前20名收入高的公司

    4. 代码:

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import explode
      from pyspark.sql.functions import split
      from pyspark.sql.functions import col
      from pyspark.sql.functions import explode_outer
      from pyspark.sql import functions as F
      
      import sys
      sys.path.append("/usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Movie")
      import pyspark
      spark = pyspark.sql.SparkSession.builder.appName("SimpleApp").getOrCreate()
      sc = spark.sparkContext
      
      df=spark.read.csv('file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Movie/tmdb_5000_movies.csv',inferSchema=True,header=True)
      df.printSchema()
      
      #过滤掉出品公司为空的电影
      df_filter=df.filter(df['production_companies']!='[]')
      
      #将电影公司按逗号拆分成多行
      df_whith=df_filter.withColumn('production_companies_tmp',explode(split("production_companies", ",")))
      df_whith.select('production_companies_tmp').show(10)
      
      #查询评分>6.5的前5部电影
      df_where = df_whith.where(F.col("vote_average")>'6.5')
      df_where.show(5)
      
      #查询收入为前20名的公司
      df_res=df_where.groupBy('original_title','production_companies_tmp').agg({"revenue": "sum"}).withColumnRenamed("sum(revenue)", "sum_revenue").orderBy(F.desc('sum_revenue'))
      df_res.show(20)
    5. 运行结果:

      在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    在shell中结果显示如下:

    lz@lz-virtual-machine:/usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Movie$ python3 movie-analysis.py
    20/08/11 22:35:58 WARN util.Utils: Your hostname, lz-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.152.128 instead (on interface ens33)
    20/08/11 22:35:58 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    20/08/11 22:35:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    root                                                                            
     |-- budget: integer (nullable = true)
     |-- genres: string (nullable = true)
     |-- homepage: string (nullable = true)
     |-- id: string (nullable = true)
     |-- keywords: string (nullable = true)
     |-- original_language: string (nullable = true)
     |-- original_title: string (nullable = true)
     |-- overview: string (nullable = true)
     |-- popularity: string (nullable = true)
     |-- production_companies: string (nullable = true)
     |-- production_countries: string (nullable = true)
     |-- release_date: string (nullable = true)
     |-- revenue: string (nullable = true)
     |-- runtime: string (nullable = true)
     |-- spoken_languages: string (nullable = true)
     |-- status: string (nullable = true)
     |-- tagline: string (nullable = true)
     |-- title: string (nullable = true)
     |-- vote_average: string (nullable = true)
     |-- vote_count: string (nullable = true)
    
    +------------------------+
    |production_companies_tmp|
    +------------------------+
    |    http://www.avatar...|
    |          "[{""id"": 270|
    |          "[{""id"": 470|
    |    http://www.thedar...|
    |          "[{""id"": 818|
    |          "[{""id"": 851|
    |           {""id"": 2343|
    |         "[{""id"": 8828|
    |          "[{""id"": 616|
    |          "[{""id"": 849|
    +------------------------+
    only showing top 10 rows
    
    +---------+-------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+--------------------+-----+--------------------+--------------------+------------------------+
    |   budget|       genres|            homepage|             id|            keywords|   original_language|      original_title|            overview|          popularity|production_companies|production_countries|        release_date|             revenue|          runtime|    spoken_languages|          status|             tagline|title|        vote_average|          vote_count|production_companies_tmp|
    +---------+-------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+--------------------+-----+--------------------+--------------------+------------------------+
    |105000000|"[{""id"": 18| ""name"": ""Dram...| {""id"": 10749| ""name"": ""Roma...|                null|               64682|      "[{""id"": 818| ""name"": ""base...|       {""id"": 1326| ""name"": ""infi...|       {""id"": 1523| ""name"": ""obse...|    {""id"": 3929| ""name"": ""hope""}| {""id"": 209714| ""name"": ""3d""}]"|   en|    The Great Gatsby|An adaptation of ...|           {""id"": 1326|
    |185000000|"[{""id"": 28| ""name"": ""Acti...|    {""id"": 12| ""name"": ""Adve...|        {""id"": 878| ""name"": ""Scie...|http://www.startr...|              188927|     "[{""id"": 9663| ""name"": ""sequ...|       {""id"": 9743| ""name"": ""stra...|  {""id"": 158449| ""name"": ""hatr...| {""id"": 161176| ""name"": ""spac...|   en|    Star Trek Beyond|The USS Enterpris...|         "[{""id"": 9663|
    |180000000|"[{""id"": 28| ""name"": ""Acti...|    {""id"": 12| ""name"": ""Adve...|http://legendofta...|              258489|      "[{""id"": 409| ""name"": ""afri...|       {""id"": 5650| ""name"": ""fera...|       {""id"": 7347| ""name"": ""tarz...|   {""id"": 10787| ""name"": ""jung...| {""id"": 158130| ""name"": ""anim...|   en|The Legend of Tarzan|Tarzan, having ac...|           {""id"": 5650|
    |175000000|"[{""id"": 16| ""name"": ""Anim...| {""id"": 10751| ""name"": ""Fami...|         {""id"": 12| ""name"": ""Adve...|        {""id"": 878| ""name"": ""Scie...|http://www.monste...|               15512|     "[{""id"": 9951| ""name"": ""alie...|   {""id"": 10891| ""name"": ""gian...| {""id"": 179431| ""name"": ""duri...|   en|  Monsters vs Aliens|When Susan Murphy...|    http://www.monste...|
    |165000000|"[{""id"": 35| ""name"": ""Come...|    {""id"": 12| ""name"": ""Adve...|         {""id"": 14| ""name"": ""Fant...|         {""id"": 16| ""name"": ""Anim...|      {""id"": 10751| ""name"": ""Fami...|http://www.shrekf...|               10192|"[{""id"": 189111| ""name"": ""ogre""}| {""id"": 209714| ""name"": ""3d""}]"|   en| Shrek Forever After|A bored and domes...|          {""id"": 10751|
    +---------+-------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+--------------------+-----+--------------------+--------------------+------------------------+
    only showing top 5 rows
    
    
    +--------------------+------------------------+-----------+                     
    |      original_title|production_companies_tmp|sum_revenue|
    +--------------------+------------------------+-----------+
    | ""name"": ""Anim...|             {""id"": 14|   417859.0|
    | ""name"": ""Dram...|             {""id"": 53|   316152.0|
    | ""name"": ""Acti...|             {""id"": 35|   173931.0|
    | ""name"": ""Roma...|             {""id"": 53|    51955.0|
    | ""name"": ""Fant...|           {""id"": 9648|    45649.0|
    | ""name"": ""Thri...|          {""id"": 10749|    42586.0|
    | ""name"": ""Come...|          {""id"": 10769|    33106.0|
    | ""name"": ""Thri...|           {""id"": 9648|    26268.0|
    | ""name"": ""Come...|          {""id"": 10402|    20391.0|
    | ""name"": ""Adve...|             {""id"": 35|    19585.0|
    | ""name"": ""Myst...|          {""id"": 10749|    14283.0|
    | ""name"": ""Fant...|          {""id"": 10751|    10192.0|
    | ""name"": ""Thri...|             {""id"": 14|     9707.0|
    | ""name"": ""Adve...|             {""id"": 18|     9588.0|
    | ""name"": ""Come...|           {""id"": 9648|     7364.0|
    | ""name"": ""Myst...|             {""id"": 53|     2779.0|
    |               21301|                      en|   6.501815|
    |               96399|                      en|   5.759545|
    |               20761|                      en|   5.566203|
    |               25461|                      en|   3.643662|
    +--------------------+------------------------+-----------+
    only showing top 20 rows

    3.Spark MLlib之Titanic

    1. 数据:

      采用教学网上的TitanicTrainTest.zip中的数据作为训练集和测试集。

    2. 目标:

      采用教学网上的TitanicTrainTest.zip中给出的训练集数据训练分类模型,并且给出在测试集中的正确率

    3. 思路:

      对数据进行预处理,例如:对缺省数据可以采取用平均值补充等方式来进行填补;随后用pyspark中 的mllib中的 模型,基于已知数据进行训练,再从测试集上给出模型的正确率。

    4. data_preload.py

      首先对数据进行预加载,载入“trainwithlabels.csv”与“testwithlabels.csv”中;初步处理一下,Age缺省者用平均值补充,将文字类标签都转化为数字/字母,简化标签;存储到 “processtestwithlabels.csv”与"processtrainwithlabels.csv"中。

      import pandas as pd
      
      def fixing(path,doc):
          try:
              titanic = pd.read_csv(path + doc)
          except:
              print("can't find {doc} in {path}! Please check you path.")
              raise(ValueError)
          titanic['Age']=titanic['Age'].fillna(titanic['Age'].median())   #fillna()表示补充,median()表示求平均值
          titanic.loc[titanic['Sex']=='male','Sex']=0
          titanic.loc[titanic['Sex']=='female','Sex']=1
          titanic['Embarked']=titanic['Embarked'].fillna('S')
          titanic.loc[titanic['Embarked']=='S','Embarked']=0
          titanic.loc[titanic['Embarked']=='C','Embarked']=1
          titanic.loc[titanic['Embarked']=='Q','Embarked']=2
          titanic = titanic.drop(['Name','Ticket','Cabin'],axis=1)
          titanic.to_csv("./" + "process" + doc,index=False)
      
      if __name__ == '__main__':
          fixing("./","trainwithlabels.csv")
          fixing("./","testwithlabels.csv")
          print("finish successfully!")
    5. mllib_data_process.py

      对数据进行一系列处理,使得数据更好/作为mllib训练数据输入时符合mllib所要求的格式。导入的数据为“processtestwithlabels.csv”与"processtrainwithlabels.csv",输出到“train_mllib.data”与 “test_mllib.data”中。

      from pyspark.mllib.linalg import SparseVector
      from pyspark.mllib.regression import LabeledPoint 
      import numpy as np
      import pandas as pd
      
      
      def feature_normalize(data):
          mu = np.mean(data,axis=0) 
          std = np.std(data,axis=0) 
          return (data - mu)/std
      
      
      def save_mllib(path,name,type):
          file = pd.read_csv(path+name)
          label = file['Survived'].values
          temp_data = file[["Pclass","Sex","Age","SibSp","Parch","Fare","Embarked"]].values
          temp_data = feature_normalize(temp_data)
          output = ''
          for i in range(len(label)):
              output += str(label[i]) + " 1:"
              for j in range(7):
                  output += str(temp_data[i][j]) 
                  if j == 6:
                      output += '\n'
                  else:
                      output += " " + str(j + 2) + ":"
          if type == "train":
              newfile = open(path+"train_mllib.data",'w')
          else:
              newfile = open(path+"test_mllib.data",'w')
          newfile.write(output)
          newfile.close()
      
      if __name__ == "__main__":
          save_mllib("./","processtrainwithlabels.csv","train")
          save_mllib("./","processtestwithlabels.csv","test")
          
    6. mllib.py

      训练模型。调用SVMwithSGD中的训练方式,基于“train_mllib.data”与 “test_mllib.data”进行训练;随后将该模型应用于测试集test_data上进行预测,并且与真正的label进行比对,算出预测成功的数量,从而计算得出预算成功的准确率。

      from pyspark.mllib.util import MLUtils
      from pyspark.mllib.classification import SVMWithSGD
      
      import sys
      sys.path.append("/usr/local/spark/spark-3.0.0-bin-hadoop2.7/python")
      import pyspark
      spark = pyspark.sql.SparkSession.builder.appName("SimpleApp").getOrCreate()
      sc = spark.sparkContext
      
      train_data = MLUtils.loadLibSVMFile(sc=sc,path='file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Titanic/train_mllib.data')
      test_data = MLUtils.loadLibSVMFile(sc=sc,path='file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Titanic/test_mllib.data')
      
      model = SVMWithSGD.train(train_data, iterations=100, step=1, miniBatchFraction=1.0, regParam=0.01, regType="l2")
      prediction = model.predict(test_data.map(lambda x :x.features)).collect()
      true_label = test_data.map(lambda x :x.label).collect()
      account = 0
      for index in range(len(true_label)):
      	if true_label[index] == prediction[index]:
      		account += 1
      print("accuracy: {}".format(account/len(true_label)))
      print("done")
    7. 在shell中操作:

      cd /usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Titanic
      pip3 install pandas//事先没有安装pandas
      python3 data_preload.py
      python3 mllib_data_process.py
      python3 mllib.py

      在这里插入图片描述

      在这里插入图片描述

      最后一步结果为

      20/08/11 21:20:16 WARN util.Utils: Your hostname, lz-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.152.128 instead (on interface ens33)
      20/08/11 21:20:16 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
      20/08/11 21:20:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      accuracy: 0.81  

      可以看出训练出的该模型在测试集上准确率为0.81。

    8. 一些错误类型总结:

      1. 在运行时需要开启hadoop

        start-dfs.sh
        start-yarn.sh
        mr-jobhistory-daemon.sh start historyserver

        否则会报错fail connection

      2. 在运行mllib.py时遇到了hdfs空间文件上传出问题的情况,屡次格式化等操作后都不成功后,在py文件中将

        path='./train_mllib.data'
        

        改成了本地绝对路径

        path='file:///usr/local/spark/spark-3.0.0-bin-hadoop2.7/python/pyspark/mllib/Titanic/train_mllib.data'

        这样,在运行mllib.py时便不会产生报错说非要去找hadoop空间中的文件:

        Input path does not exist: hdfs://localhost:9000/user/lz/train_mllib.data

        而是直接从本地文件中找到了。

    4.GraphX再现PageRank

    1. 数据:

      采用教学网上的network.jpg,写成vertex和edge两个txt文档,edge.txt中每一行为顶点i指向顶点j的形式,手动将无向图的一条边改写为有向图的两条边。

      在这里插入图片描述

    2. 程序

      代码如下:(只整理了scala中需要输入的命令)

      scala> import org.apache.spark.graphx.GraphLoader
      
      
      scala> val graph = GraphLoader.edgeListFile(sc,"file:///home/lz/Desktop/homework/edge.txt")
      //导入边数据
      
      scala> val ranks = graph.pageRank(0.0001).vertices
      //运行PageRank
      
      scala> val users= sc.textFile("file:///home/lz/Desktop/homework/vertex.txt").map { line =>
           |   val fields = line.split(",")
           |   (fields(0).toLong, fields(1))
           |   }
      //导入顶点对应关系
      
      scala> val ranksByUsername = users.join(ranks).map {
           |   case (id, (username, rank)) => (username, rank)
           | }
      //整理
      
      //输出
      scala> println(ranksByUsername.collect().mkString("\n"))
      (D,1.4715087621628664)
      (A,1.0192604591304733)
      (F,1.2890402583321614)
      (C,0.7942640133346549)
      (G,1.2890402583321614)
      (I,0.8567906329897478)
      (H,0.9524103919494304)
      (J,0.5141607513033776)
      (E,0.7942640133346549)
      (B,1.0192604591304733)

      运行结果如图:

    在这里插入图片描述

    在这里插入图片描述

    最终迭代多次稳定后结果为:

    (D,1.4715087621628664)
    (A,1.0192604591304733)
    (F,1.2890402583321614)
    (C,0.7942640133346549)
    (G,1.2890402583321614)
    (I,0.8567906329897478)
    (H,0.9524103919494304)
    (J,0.5141607513033776)
    (E,0.7942640133346549)
    (B,1.0192604591304733)
    
    展开全文
  • Spark 软件栈架构概述

    千次阅读 2019-03-24 12:28:16
    速度方面,Spark拓展了MapReduce计算框架,并且高效的支持更多计算模式,包括交互式查询和流处理。处理大规模数据集时,Spark的一个核心优势是内存计算,因而速度更快。 Spark适用于各种各样的不同的分布式平台...

    Spark是一个用于实现快速通用的集群计算的平台

    在速度方面,Spark拓展了MapReduce计算框架,并且高效的支持更多计算模式,包括交互式查询和流处理。在处理大规模数据集时,Spark的一个核心优势是内存计算,因而速度更快。

    Spark适用于各种各样的不同的分布式平台的场景,包括批处理、迭代计算、交互式查询、流处理,通过在一个统一的框架下支持这些计算,减轻了对各种平台分别管理调控的负担

    Spark提供丰富的接口,包括Python Java Scala SQL的调用API以及内建的程序库,Spark可以运行在Hadoop集群上

    Spark的核心是一个对由很多计算任务组成、运行在多个计算节点的应用调度、分发及监控的计算引擎,由于Spark引擎速度快,通用的特点,Spark还支持各种组件

    ——Spark Core

    Spark Core实现了Spark基本功能,包括任务调度、内存管理、错误恢复、存储系统的交互。其核心是RDD(resilient distributed dataset)弹性分布式数据集 RDD表示分布在多个计算节点可以并行操作的元素集合、是Spark编程的抽象

    ——Spark SQL

    Spark SQL是Spark用于操作结构化数据的程序包,可以使用SQL 或Apache Hive 版本的HQL来查询数据,同时SQL可以与传统RDD编程结合,程序设计人员可以在应用中使用SQL来进行数据分析

    ——Spark Streaming

    Spark提供的对实时数据的流式计算组件,如应用于生产环境网页服务器日志分析,消息队列分析,都是数据流

    ——集群管理器

    Spark支持在各种集群管理器上运行,包括Hadoop YARN Apache Mesos 以及Spark自带的简易的调度管理器 

    展开全文
  • 戴金权:基于Spark软件栈的下一代大数据分析

    千次下载 热门讨论 2014-12-18 10:38:06
    该文档来自于英特尔大数据首席架构师戴金权,2014中国大数据技术大会大数据技术分论坛的演讲“基于Spark软件栈的下一代大数据分析”。
  • 1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。 2)SparkStreaming是一个对实时数据流进行高通量、容错处理的...

    1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。

    2)SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。

    3)Spark sql:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。

    4)BlinkDB :是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。

    5)MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。

    6)GraphX是Spark中用于图和图并行计算

    展开全文
  • spark

    2019-11-29 09:30:18
    spark软件栈 spark Core:实现了spark的基本功能,包括任务调度、内存管理、错误恢复与存储系统交互等模块以及对RDD(弹性分布式数据集)的定义及相关的API操作。 spark Sql:是spark用来操作结构化数据的程序包,...

    spark数据分析导论

    spark是什么?

    spark是一个用来实现快速而通用的集群计算平台。主要的特点就是在内存中进行计算。

    spark的软件栈

    在这里插入图片描述

    spark Core:实现了spark的基本功能,包括任务调度、内存管理、错误恢复与存储系统交互等模块以及对RDD(弹性分布式数据集)的定义及相关的API操作。

    spark Sql:是spark用来操作结构化数据的程序包,支持Apache Hive 版本的HQL来进行数据查询。

    **spark streaming:**是spark提供的对实时数据进行流式计算的组件。

    **MLlib:**提供了常见的机器学习功能的程序库,提供了一些算法的实现:分类、回归、据类以及模型评估等。

    **GraphX:**提供了操作图的程序库,可以进行并行的图计算。

    RDD编程

    RDD概述

    **RDD:**弹性分布式数据集,其实就是分布式的元素集合。spark自动将RDD的数据分发到集群上,并将操作并行化执行。

    RDD操作分为转化操作和行动操作,转化操作会由一个RDD生成一个新的RDD,行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中。转化操作是惰性的,需要行动进行触发,举一个例子:

    lines = sc.textFile("README.md") #创建RDD,但是不会加载数据到内存中
    python_lines = lines.filter(lambda line: "python" in line) # 转化成新的RDD
    python_lines.first() # 并发执行计算
    

    执行first()行动算子时,spark会只将含有python单词的第一行数据拉取到内存中,而不是全部拉取,默认情况下,Spark的RDD会在每次对他们进行行动操作时重新计算,如果想要在多个行动中使用同一个RDD的话,需要对RDD进行持久化操作。

    创建RDD

    两种方式:

    • 取读外部数据
    • 在驱动器程序中对一个集合进行并行化
    lines = sc.parallelize(["pandas","I like python"]) # 开发时用的不多,因为需要将所有集合数据拉到一个节点上
    

    RDD操作

    两种操作:

    • 转化操作:惰性操作,返回的一定是新的RDD。
    • 行动操作:非惰性操作,返回的一定是其他类型的数据类型,这也是区别是否为行动操作的重要指标
    errorsRDD = inputRDD.filter(lambda x: "error" in x)
    warningsRDD = inputRDD.filter(lambda x: "warning" in x)
    badlinesRDD =  errorsRDD.union(warningsRDD) # 连接操作
    

    上述代码中,spark会使用谱系图来记录这些不同RDD之间的依赖关系,当持久化的RDD缺失数据时,spark也是使用谱系图来进行RDD的恢复的。

    在这里插入图片描述

    转化操作时惰性的,所以在进行转化操作时,操作程序不会立即执行,所有我们最好不要把RDD看作存储某些特定数据的数据集合,我们可以把它看成存储记录如何计算数据的指令列表。

    转化操作算子

    map():接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中的元素。

    nums = sc.parallelize([1,2,3,4])
    squared = nums.map(lambda x: x*x) # squared: 1,4,9,16
    

    filter():接收一个函数,将RDD中满足该函数的元素放入新的RDD中返回。

    words = sc.parallelize(["python","java","C","C++"])
    squared = words.map(lambda x: "C" in x) # squared: "C","C++"
    

    flatMap():接收一个函数,应用于RDD中的每个元素并将返回的所有元素构成新的RDD.

    lines = sc.parallelize(["python R beautiful","java is good","C","C++ well"])
    words = lines.flatMap(lambda x: x.split(" ")) # 'python', 'R', 'beautiful', 'java', 'is', 'good', 'C', 'C++', 'well'
    

    distinct():去重

    sample():对RDD进行采样

    union():生成一个包含两个RDD中所有元素的RDD

    intersection():求两个RDD共同的元素的RDD

    subtract():移除一个RDD中的内容

    cartesian():与另一个RDD的笛卡尔积

    num1 = sc.parallelize([1,2,3])
    num2 = sc.parallelize([3,4,5])
    num_all = num1.union(num2) # 1,2,3,3,4,5
    num_dis = num_all.distinct() # 1,2,3,4,5
    num_samp = num_all.sample(False,0.6) # 不确定的2个数
    num_inter = num1.intersection(num2) # 3
    num_sub = num1.subtract(num2) # 1,2
    num_car = num1.cartesian(num2) # (1,3),(1,4)...(3,5)
    

    行动操作算子(常用)

    reduce():接收一个函数,操作两个相同类型的元素,并返回一个同样类型的元素。

    collect():返回RDD中所有元素,需要将集群上的数据拉取到单节点上。

    count():返回RDD中元素的个数

    countByValue():各个元素在RDD中出现的次数

    take(num):从RDD中返回num个元素

    top(num):从RDD中返回最前面的num个元素

    aggregate(zeroValue)(seq0p,comb0p):与reduce类似,不同之处在于返回不同类型的数据。理解起来很难,可以参考此链接文章

    RDD持久化

    RDD是惰性求值的,当重复的使用同一个RDD时,每次都会重算RDD以及它的所有依赖,消耗很大,需要对RDD进行持久化

    RDD.persist()
    RDD.cache()
    

    在这里插入图片描述

    参数末尾加上“_2”表示持久化数据分为两份

    当缓存的数据过多,内存存不下时,spark会采用最近最少使用的缓存策略从内存中清除数据,若仅是内存缓存级别,会直接清除,下次使用,重新计算,若是磁盘级别,会将移除的数据存入磁盘中。

    键值对RDD操作

    spark中包括键值对类型的RDD叫做键值对RDD

    以键值对集合{(1,2),(3,4),(3,6)}为例

    在这里插入图片描述

    在这里插入图片描述

    数据读取与保存

    文本文件: 每一行都是RDD的一个元素

    input = sc.textFile("file:///home/README.md")
    
    result.saveAsTextFile(outputFile)
    

    json:读取json数据时,常常需要将其所在文件读入,然后使用json解析器来对json进行解析。

    此外还支持csv、sequenceFile、对象文件、hdsf、hbase、hive等数据的读取与写入。

    集群环境下的spark

    在分布式环境下,spark集群采用的是主/从结构,有一个节点负责中央协调(driver节点),调度各个分布式的工作节点(执行器)。
    在这里插入图片描述

    驱动器节点:spark驱动器就是执行程序中的main()方法的进程。两大职责:

    • 驱动器程序负责把用户程序转化为多个物理执行的单元(task任务),其实相当与由操作指令和逻辑组成了一个有向无环图(DAG),然后按照这个DAG执行物理计划。
    • 调度执行器节点,根据DAG,驱动器程序会在各个执行器进程间协调任务的调度。驱动器会根据当前的执行点集合,尝试把所有任务基于数据所在位置分配给合适的执行器进程,从而减少网络消耗。

    执行器节点:spark执行器是一种工作进程,负责在spark作业中运行任务,任务间相互独立。两大职责:

    • 负责运行组成spark应用的任务,并将结果返回给驱动器进程。
    • 他们通过自身的块管理器为用户程序中要求缓存的RDD提供内存式存储。

    小结:

    1. 用户通过spark2-submit脚本提交应用。
    2. spark2-submit脚本启动驱动器程序,调用用户定义的main()方法。
    3. 驱动器程序与集群管理器通信,申请资源以启动执行器节点。
    4. 集群管理器为驱动器程序启动执行器节点。
    5. 驱动器进程执行用户应用中的操作。根据程序中所定义的对RDD的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程。
    6. 任务在执行器程序中进行计算并保存结果。
    7. 如果驱动器程序的main()方法退出,或者调用了sparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。

    spark2-submit部署应用

    bin/spark2-submit [optinos] <app jar | python file> [app options]
    

    在这里插入图片描述

    spark调优与调试

    每个sparkContext对象都需要一个sparkConf的实例,sparkConf的三个创建方式。

    • spark默认配置项。
    • 代码层次创建的对象conf = new SparkConf() conf.set("spark.app.name", "my spark app")
    • 使用spark-submit传递的参数。

    实际配置生效优先级:2 > 3 > 1

    并行度优化

    RDD会被分为一系列的分区,每个分区都是整个数据的子集,当spark调度并运行任务时,spark会为每个分区中的数据创建出一个任务,因此调整分区的数量提升并行度来提高程序的效率。两种调整分区的方法:

    • 在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度。
    • 对已有的RDD进行重新分区来获得更多或者更少的分区数RDD.repartition(num)

    序列化格式

    当Spark需要通过网络传输数据,或是将数据溢写到磁盘上时,spark需要把数据序列化为二进制格式,默认情况,spark会使用java内建的序列化库,在这一点上,我们可以使用其他的更好的序列化工具库,如Kryo

    内存管理

    spark中内存主要有以下几种用途:

    • RDD存储:当进行RDD持久化时,RDD的分区会被缓存到内存中。
    • 数据清洗与聚合的缓存区:当进行数据混洗操作时,spark会创建一些缓冲区来存储输出数据或聚合操作的中间结果,可以通过spark.shuffle.memoryFraction参数来限定缓存区内存占总内存的比例。
    • 用户代码:用户代码中定义的函数,数组及对象等会占用大部分内存。

    默认情况下,spark的内存分配方案:RDD存储:60%;缓存区:20%;用户代码:20%。

    内存优化的三种方案:

    • 根据需要调整各个内存占比。
    • 进行RDD持久化时,选择最优的缓存等级,若内存占用较大,可以采用MEMORY_AND_DISK(溢出后缓存到磁盘上)的等级。
    • 当内存占用较大时,缓存序列化后的对象,而不是要直接缓存,虽然先序列化会浪费一部分时间,但是也比需要时从新根据谱系图重新计算要快。

    Spark SQL介绍

    spark SQL 是用来操作结构化和半结构化数据的接口,提供以下三大功能:

    • 可以从各种结构化数据源中读取数据。
    • 不仅支持在spark程序内使用sql语句进行数据查询,也支持外部工具中通过标准数据库连接器(JDBC/ODBC)进行查询。
    • spark SQL 支持SQL与常规的python/java/scala代码高度整合。
    ### 下面介绍一下写pyspark SQL脚本的一些语法
    from pyspark.sql import SparkSession # 引入pyspark sql 包
    import pyspark.sql.functions as F # 导入pyspark sql 中相关函数
    from pyspark.sql.window import Window # 导入开窗函数,数据清洗时需要用到
    import argparse # 导入获取执行python脚本运行参数的包
    from pyspark.sql.types import * # 导入使用自定义函数udf时,用的返回类型
    
    # 获取脚本执行时需要传递的参数
    paraser = argparse.ArgumentParser()
    paraser.add_argument('-date','--stat_dt') # 添加参数命--stat_dt 或 -date的参数
    args = paraser.parse_parse_args()
    stat_dt = args.stat_dt # 得到stat_dt参数值
    
    # 定义一个函数,作为udf函数参数使用
    def get_test_data(name):
        if name == "python":
            return 1
        else:
            return 0
     
    #定义udf函数
    udf_get_test_data = F.udf(get_test_data,IntegerType()) #注意函数名不要(),后面参数为函数返回值													   # 的类型
    # 获取一个sparkSession对象
    spark = SparkSession.builder.appName("my test app")
    		.config("hive.exec.dynamic.partition","true") \ # 设置开启动态分区
        	.config("hive.exec.dynamic.partition.mode","nonstrict") \ # 全部动态分区
            .enableHiveSupport() \ 
            .getOrCreate()
            
    # 获取hive中数据
    data = spark.sql("select cust_id,name,price,stat_dt from test_table") # 获取hive上相关字段数据
    data = data.filter(data.stat_dt == stat_dt) # 过滤其他数据
    data = data.withColumn("class", udf_get_test_data(F.col("name"))) # 添加一列
    data = data.withColunm("price_sum", F.sum("price")).over(Window.partition("cust_id")) # 算出每个客户的总价值,使用groupBy会丢失数据
    data = data.withColumn("index", F.row_number().over(Window.partitionBy("cust_id").orderBy("price"))) # 未每个客户按照价格排序
    data = data.filter(data.index = F.lit("1")) # 获取每个客户价格最低的数据
    data = data.drop("stat_dt") # 删除分区列
    data.createOrReplaceTempView("data") # 注册为临时表
    spark.sql("alter table standard.test_table_result drop if exists partition(stat_dt='%s')" %(stat_dt)) # 删除原有分区数据,支持重跑
    spark.sql("insert into standard.test_table_result partition(stat_dt='%s')" %(stat_dt)) # 插入结果数据
    spark.stop() # 停止程序
    
    
    展开全文
  • 大数据软件栈

    千次阅读 2013-12-10 22:50:42
    Hadoop软件栈 建议使用CDH5( http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH-Version-and-Packaging-Information/cdhvd_topic_3.html ) 必需组件Hadoop 2.0、...
  • 2014 Spark

    2015-05-14 15:06:09
    spark软件栈,是一个结构上统一,功能多元化的软件栈。大数据技术说明life is short,your need spark!
  • Spark

    2016-11-12 15:16:25
    Spark
  • Spark(一)Spark概述

    2019-01-03 15:02:03
    1.3、Spark软件栈 1.4、Spark与Hadoop 1.5、运行流程及特点 1.6、常用术语 1.7、Spark的适用场景 1、Spark概述 Spark官网关于Spark2.2.0需要以下条件: maven3.3.9+ Java8+ Spark2.2.0 1.1、Spark是什么? ...
  • 前言:spark 软件栈图 一,Spark Core 二,Spark SQL 三,Spark Streaming 四,MLib 前言:spark 软件栈图 一,Spark Core Spark Core 实现了 Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,430
精华内容 2,172
关键字:

在spark的软件栈中