精华内容
下载资源
问答
  • spark期末大作业

    2021-06-09 14:44:02
    spark期末大作业 spark介绍* Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之...

    spark期末大作业

    spark介绍*

    Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL、Spark Streaming、MLLib和GraphX等组件,也就是BDAS(伯克利数据分析栈),这些组件逐渐形成大数据处理一站式解决平台。从各方面报道来看Spark抱负并非池鱼,而是希望替代Hadoop在大数据中的地位,成为大数据处理的主流标准,不过Spark还没有太多大项目的检验,离这个目标还有很大路要走。
    Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集(Scala 提供一个称为 Actor 的并行模型,其中Actor通过它的收件箱来发送和接收非同步信息而不是共享数据,该方式被称为:Shared Nothing 模型)。在Spark官网上介绍,它具有运行速度快、易用性好、通用性强和随处运行等特点。

    一、需求分析

    1根据data.txt的数据分析某大学计算机系的成绩
    (1)该系总共有多少学生;
    (2)该系共开设了多少门课程;
    (3)Tom同学的总成绩平均分是多少;
    (4)求每名同学的选修的课程门数;
    (5)该系DataBase课程共有多少人选修;
    (6)各门课程的平均分是多少;
    (7)使用累加器计算共有多少人选了DataBase这门课。
    2 编写独立应用程序实现数据去重
    对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C;
    3.编写独立应用程序实现求平均值问题
    每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中

    二、操作步骤

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res = lines.map(lambda x:x.split(“,”)).map(lambda x: x[0]) //获取每行数据的第1列
    distinct_res = res.distinct() //去重操作
    distinct_res.count()//取元素总个数 //265
    在这里插入图片描述
    lines = sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res = lines.map(lambda x:x.split(“,”)).map(lambda x:x[1]) //获取每行数据的第2列
    distinct_res = res.distinct()//去重操作
    distinct_res.count()//取元素总个数 //8
    在这里插入图片描述
    Tom同学的总成绩平均分是多少;
    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res = lines.map(lambda x:x.split(“,”)).filter(lambda x:x[0]==”Tom”) //筛选Tom同学的成绩信息
    res.foreach(print)
    score = res.map(lambda x:int(x[2])) //提取Tom同学的每门成绩,并转换为int类型
    num = res.count() //Tom同学选课门数
    sum_score = score.reduce(lambda x,y:x+y) //Tom同学的总成绩
    avg = sum_score/num // 总成绩/门数=平均分
    print(avg)
    在这里插入图片描述

    在这里插入图片描述

    (8)求每名同学的选修的课程门数;
    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res = lines.map(lambda x:x.split(“,”)).map(lambda x:(x[0],1)) //学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1)
    each_res = res.reduceByKey(lambda x,y: x+y) //按学生姓名获取每个学生的选课总数
    each_res.foreach(print)
    在这里插入图片描述

    (9)该系DataBase课程共有多少人选修;
    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res=lines.map(lambdax:x.split(“,”)).filter(lambda x:x[1]==”DataBase”)
    res.count()
    在这里插入图片描述

    (10)各门课程的平均分是多少;
    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res=lines.map(lambdax:x.split(“,”)).map(lambdax:(x[1],(int(x[2]),1))) //为每门课程的分数后面新增一列1,表示1个学生选择了该课程。格式如(‘ComputerNetwork’, (44, 1))
    temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) //按课程名聚合课程总分和选课人数。格式如(‘ComputerNetwork’, (7370, 142))
    avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))//课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数
    avg.foreach(print)
    在这里插入图片描述

    (11)使用累加器计算共有多少人选了DataBase这门课。
    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/Data01.txt”)
    res=lines.map(lambdax:x.split(“,”)).filter(lambda x:x[1]==”DataBase”)//筛选出选了DataBase课程的数据
    accum = sc.accumulator(0) //定义一个从0开始的累加器accum
    res.foreach(lambda x:accum.add(1))//遍历res,每扫描一条数据,累加器加1
    accum.value //输出累加器的最终值
    在这里插入图片描述

    对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C
    from pyspark import SparkContext
    #初始化SparkContext
    sc = SparkContext(‘local’,‘remdup’)
    #加载两个文件A和B
    lines1 = sc.textFile(“file:///usr/local/spark/mycode/remdup/A”)
    lines2 = sc.textFile(“file:///usr/local/spark/mycode/remdup/B”)
    #合并两个文件的内容
    lines = lines1.union(lines2)
    #去重操作
    distinct_lines = lines.distinct()
    #排序操作
    res = distinct_lines.sortBy(lambda x:x)
    #将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件
    res.repartition(1).saveAsTextFile(“file:///usr/local/spark/mycode/remdup/result”)
    在这里插入图片描述

    查看结果:
    在这里插入图片描述

    每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。
    from pyspark import SparkContext
    #初始化SparkContext
    sc = SparkContext(‘local’,’ avgscore’)
    #加载三个文件Algorithm.txt、Database.txt和Python.txt
    lines1 = sc.textFile(“file:///usr/local/spark/mycode/avgscore/Algorithm.txt”)
    lines2 = sc.textFile(“file:///usr/local/spark/mycode/avgscore/Database.txt”)
    lines3 = sc.textFile(“file:///usr/local/spark/mycode/avgscore/Python.txt”)
    #合并三个文件的内容
    lines = lines1.union(lines2).union(lines3)
    #为每行数据新增一列1,方便后续统计每个学生选修的课程数目。data的数据格式为(‘小明’, (92, 1))
    data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
    #根据key也就是学生姓名合计每门课程的成绩,以及选修的课程数目。res的数据格式为(‘小明’, (269, 3))
    res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
    #利用总成绩除以选修的课程数来计算每个学生的每门课程的平均分,并利用round(x,2)保留两位小数
    result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
    #将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到三个文件
    result.repartition(1).saveAsTextFile(“file:///usr/local/spark/mycode/avgscore/result”)
    在这里插入图片描述

    经验总结
    通过这次实验我熟悉了Spark的RDD基本操作及键值对操作,还有如何使用RDD编程解决实际具体问题的方法,但是我在这次实验中也是遇到一些问题的,首先是环境配置的为日,在做第二个实验的时候就出现过这个问题:
    在这里插入图片描述

    经过查询发现是因为python是系统自带的,由于pyspark不在python的环境变量下,将资源的目录配置到python的环境变量下即可,如下图:
    在这里插入图片描述
    最后成功解决了这个问题;还有一个就是一开始对于RDD编程掌握得还不是很熟练,然后我看了之前做过的实验,再通过网络去查找一些相关的知识,最后也是做出来了,然后经过这一学期的大数据学习,我学习了很多spark的知识,虽然都是一些基础,但是打好基础自己的专业学习,以及以后的就业也是很有帮助的,最后感觉老师这一学期的淳淳教导,谢谢老师。

    展开全文
  • Spark期末大作业.docx

    2021-06-14 21:14:39
    Spark期末大作业.docx
  • Spark期末大作业

    2021-06-13 14:51:26
    2020年美国新冠肺炎疫情数据分析 本案例以2020年美国新冠肺炎疫情数据作为数据集,以Python为编程语言,使用Spark对数据进行分析,并对分析结果进行可视化。...本次作业使用的数据集来自数据网站Kagg.

    一、需求描述

    本案例以2020年美国新冠肺炎疫情数据作为数据集,以Python为编程语言,使用Spark对数据进行分析,并对分析结果进行可视化。

    本次实验环境为Linux:Ubuntu 18.04、Hadoop3.1.3、Python: 3.6、Spark: 2.4.0、Jupyter Notebook。

    本次作业使用的数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至今(2020-05-19)的相关数据。其中,Hadoop需要配置完全并启动,pip需要更新到22.0以上版本并利用“pip3 install pandas”命令安装pandas包,python版本要更新到3.6以上并设置成默认版本。

    二、环境介绍

    在Hadoop官网下载hadoop-3.1.3.tar.gz,创建hadoop用户,安装SSH、配置SSH无密码登陆,安装Java环境,配置Hadoop单机 (非分布式),配置Hadoop伪分布式,配置完成以后运行Hadoop伪分布式实例并创建实验所需的文件夹。在Spark官网下载Spark2.4.0,安装Hadoop(伪分布式),安装JAVA JDK,安装Spark(Local模式)。python版本为3.8、Python pip版本为22.0。

    三、数据处理

    数据集来自数据网站Kaggle的美国新冠肺炎疫情数据集,该数据集以数据表us-counties.csv组织,其中包含了美国发现首例新冠肺炎确诊病例至2020-05-19的相关数据。数据包含以下字段:

    date 日期 2020/1/21;2020/1/22;etc

    county 区县(州的下一级单位) Snohomish;

    state 州 Washington

    cases 截止该日期该区县的累计确诊人数 1,2,3…

    deaths 截止该日期该区县的累计确诊人数 1,2,3…


    数据上传及上传结果查看

    Us-counties.csv文件:

     

    格式转换后的Us-counties.txt文件:

     

     

    格式转换

    原始数据集是以.csv文件组织的,为了方便spark读取生成RDD或者DataFrame,首先将us-counties.csv转换为.txt格式文件us-counties.txt。转换操作使用python实现,代码组织在toTxt.py中,具体代码

    import pandas as pd
    
    #.csv->.txt
    data = pd.read_csv('/home/hadoop/us-counties.csv')
    with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
    for line in data.values:
    f.write((str(line[0])+'\t'+str(line[1])+'\t'
    +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

    输入命令“python3 toTxt.py”执行

    转换后的us-counties.txt文件如下:

    将文件上传至HDFS文件系统中

    然后使用如下命令把本地文件系统的“/home/hadoop/us-counties.txt”上传到HDFS文件系统中,具体路径是“/user/hadoop/us-counties.txt”。具体命令如下:

    ./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

    四、使用Spark对数据进行分析

    这里采用Python作为编程语言。

    1. 完整代码

    本部分操作的完整实验代码存放在了analyst.py中,具体如下:

    from pyspark import SparkConf,SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from datetime import datetime
    import pyspark.sql.functions as func
     
    def toDate(inputStr):
        newStr = ""
        if len(inputStr) == 8:
            s1 = inputStr[0:4]
            s2 = inputStr[5:6]
            s3 = inputStr[7]
            newStr = s1+"-"+"0"+s2+"-"+"0"+s3
        else:
            s1 = inputStr[0:4]
            s2 = inputStr[5:6]
            s3 = inputStr[7:]
            newStr = s1+"-"+"0"+s2+"-"+s3
        date = datetime.strptime(newStr, "%Y-%m-%d")
        return date
     
     
     
    #主程序:
    spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
     
    fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                        StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
    schema = StructType(fields)
     
    rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
    rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
     
     
    shemaUsInfo = spark.createDataFrame(rdd1,schema)
     
    shemaUsInfo.createOrReplaceTempView("usInfo")
     
    #1.计算每日的累计确诊病例数和死亡数
    df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())
     
    #列重命名
    df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
    df1.repartition(1).write.json("result1.json")                               #写入hdfs
     
    #注册为临时表供下一步使用
    df1.createOrReplaceTempView("ustotal")
     
    #2.计算每日较昨日的新增确诊病例数和死亡病例数
    df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
     
    df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           #写入hdfs
     
    #3.统计截止5.19日 美国各州的累计确诊人数和死亡人数
    df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
     
    df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfs
     
    df3.createOrReplaceTempView("eachStateInfo")
     
    #4.找出美国确诊最多的10个州
    df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
    df4.repartition(1).write.json("result4.json")
     
    #5.找出美国死亡最多的10个州
    df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
    df5.repartition(1).write.json("result5.json")
     
    #6.找出美国确诊最少的10个州
    df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
    df6.repartition(1).write.json("result6.json")
     
    #7.找出美国死亡最少的10个州
    df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
    df7.repartition(1).write.json("result7.json")
     
    #8.统计截止5.19全美和各州的病死率
    df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
    df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")

    执行analyst.py:

    2. 读取文件生成DataFrame

    上面已经给出了完整代码。下面我们再对代码做一些简要介绍。首先看看读取文件生成DataFrame。
    由于本实验中使用的数据为结构化数据,因此可以使用spark读取源文件生成DataFrame以方便进行后续分析实现。
    本部分代码组织在analyst.py中,读取us-counties.txt生成DataFrame的代码如下:

    1. from pyspark import SparkConf,SparkContext
      from pyspark.sql import Row
      from pyspark.sql.types import *
      from pyspark.sql import SparkSession
      from datetime import datetime
      import pyspark.sql.functions as func
       
      def toDate(inputStr):
          newStr = ""
          if len(inputStr) == 8:
              s1 = inputStr[0:4]
              s2 = inputStr[5:6]
              s3 = inputStr[7]
              newStr = s1+"-"+"0"+s2+"-"+"0"+s3
          else:
              s1 = inputStr[0:4]
              s2 = inputStr[5:6]
              s3 = inputStr[7:]
              newStr = s1+"-"+"0"+s2+"-"+s3
          date = datetime.strptime(newStr, "%Y-%m-%d")
          return date
       
       
      #主程序:
      spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
       
      fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                          StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
      schema = StructType(fields)
       
      rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
      rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))
       
      shemaUsInfo = spark.createDataFrame(rdd1,schema)
       
      shemaUsInfo.createOrReplaceTempView("usInfo")
       

       

    3.进行数据分析

    本实验主要统计以下8个指标,分别是:
    1) 统计美国截止每日的累计确诊人数和累计死亡人数。做法是以date作为分组字段,对cases和deaths字段进行汇总统计。
    2) 统计美国每日的新增确诊人数和新增死亡人数。因为新增数=今日数-昨日数,所以考虑使用自连接,连接条件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases计算该日新增。
    3) 统计截止5.19日,美国各州的累计确诊人数和死亡人数。首先筛选出5.19日的数据,然后以state作为分组字段,对cases和deaths字段进行汇总统计。
    4) 统计截止5.19日,美国确诊人数最多的十个州。对3)的结果DataFrame注册临时表,然后按确诊人数降序排列,并取前10个州。
    5) 统计截止5.19日,美国死亡人数最多的十个州。对3)的结果DataFrame注册临时表,然后按死亡人数降序排列,并取前10个州。
    6) 统计截止5.19日,美国确诊人数最少的十个州。对3)的结果DataFrame注册临时表,然后按确诊人数升序排列,并取前10个州。
    7) 统计截止5.19日,美国死亡人数最少的十个州。对3)的结果DataFrame注册临时表,然后按死亡人数升序排列,并取前10个州
    8) 统计截止5.19日,全美和各州的病死率。病死率 = 死亡数/确诊数,对3)的结果DataFrame注册临时表,然后按公式计算。
    在计算以上几个指标过程中,根据实现的简易程度,既采用了DataFrame自带的操作函数,又采用了spark sql进行操作。

    4. 结果文件

    上述Spark计算结果保存.json文件,方便后续可视化处理。由于使用Python读取HDFS文件系统不太方便,故将HDFS上结果文件转储到本地文件系统中,使用以下命令:

    ./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1

    对于result2等结果文件,使用相同命令,只需要改一下路径即可。

    五、数据可视化

    1. 可视化工具选择与代码

    选择使用python第三方库pyecharts作为可视化工具。
    在使用前,需要安装pyecharts,安装代码如下:

    pip install pyecharts

    具体可视化实现代码组织与showdata.py文件中。具体代码如下:

    from pyecharts import options as opts
    from pyecharts.charts import Bar
    from pyecharts.charts import Line
    from pyecharts.components import Table
    from pyecharts.charts import WordCloud
    from pyecharts.charts import Pie
    from pyecharts.charts import Funnel
    from pyecharts.charts import Scatter
    from pyecharts.charts import PictorialBar
    from pyecharts.options import ComponentTitleOpts
    from pyecharts.globals import SymbolType
    import json
    
    
    
    #1.画出每日的累计确诊病例数和死亡数——>双柱状图
    def drawChart_1(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    date.append(str(js['date']))
    cases.append(int(js['cases']))
    deaths.append(int(js['deaths']))
    
    d = (
    Bar()
    .add_xaxis(date)
    .add_yaxis("累计确诊人数", cases, stack="stack1")
    .add_yaxis("累计死亡人数", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
    .render("/home/hadoop/result/result1/result1.html")
    )
    
    
    #2.画出每日的新增确诊病例数和死亡数——>折线图
    def drawChart_2(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    date.append(str(js['date']))
    cases.append(int(js['caseIncrease']))
    deaths.append(int(js['deathIncrease']))
    
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
    series_name="新增确诊",
    y_axis=cases,
    markpoint_opts=opts.MarkPointOpts(
    data=[
    opts.MarkPointItem(type_="max", name="最大值")
    
    ]
    ),
    markline_opts=opts.MarkLineOpts(
    data=[opts.MarkLineItem(type_="average", name="平均值")]
    ),
    )
    .set_global_opts(
    title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
    tooltip_opts=opts.TooltipOpts(trigger="axis"),
    toolbox_opts=opts.ToolboxOpts(is_show=True),
    xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result1.html")
    )
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
    series_name="新增死亡",
    y_axis=deaths,
    markpoint_opts=opts.MarkPointOpts(
    data=[opts.MarkPointItem(type_="max", name="最大值")]
    ),
    markline_opts=opts.MarkLineOpts(
    data=[
    opts.MarkLineItem(type_="average", name="平均值"),
    opts.MarkLineItem(symbol="none", x="90%", y="max"),
    opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
    ]
    ),
    )
    .set_global_opts(
    title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
    tooltip_opts=opts.TooltipOpts(trigger="axis"),
    toolbox_opts=opts.ToolboxOpts(is_show=True),
    xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result2.html")
    )
    
    
    
    
    #3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格
    def drawChart_3(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    allState = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    row = []
    row.append(str(js['state']))
    row.append(int(js['totalCases']))
    row.append(int(js['totalDeaths']))
    row.append(float(js['deathRate']))
    allState.append(row)
    
    table = Table()
    
    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(
    title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
    )
    table.render("/home/hadoop/result/result3/result1.html")
    
    
    #4.画出美国确诊最多的10个州——>词云图
    def drawChart_4(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    row=(str(js['state']),int(js['totalCases']))
    data.append(row)
    
    c = (
    WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))
    .render("/home/hadoop/result/result4/result1.html")
    )
    
    
    
    
    #5.画出美国死亡最多的10个州——>象柱状图
    def drawChart_5(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    state = []
    totalDeath = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    state.insert(0,str(js['state']))
    totalDeath.insert(0,int(js['totalDeaths']))
    
    c = (
    PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
    "",
    totalDeath,
    label_opts=opts.LabelOpts(is_show=False),
    symbol_size=18,
    symbol_repeat="fixed",
    symbol_offset=[0, 0],
    is_symbol_clip=True,
    symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(
    title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),
    xaxis_opts=opts.AxisOpts(is_show=False),
    yaxis_opts=opts.AxisOpts(
    axistick_opts=opts.AxisTickOpts(is_show=False),
    axisline_opts=opts.AxisLineOpts(
    linestyle_opts=opts.LineStyleOpts(opacity=0)
    ),
    ),
    )
    .render("/home/hadoop/result/result5/result1.html")
    )
    
    
    
    #6.找出美国确诊最少的10个州——>词云图
    def drawChart_6(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    row=(str(js['state']),int(js['totalCases']))
    data.append(row)
    
    c = (
    WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州"))
    .render("/home/hadoop/result/result6/result1.html")
    )
    
    
    
    
    #7.找出美国死亡最少的10个州——>漏斗图
    def drawChart_7(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    data.insert(0,[str(js['state']),int(js['totalDeaths'])])
    
    c = (
    Funnel()
    .add(
    "State",
    data,
    sort_="ascending",
    label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
    .render("/home/hadoop/result/result7/result1.html")
    )
    
    
    #8.美国的病死率--->饼状图
    def drawChart_8(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    values = []
    with open(root, 'r') as f:
    while True:
    line = f.readline()
    if not line: # 到 EOF,返回空字符串,则终止循环
    break
    js = json.loads(line)
    if str(js['state'])=="USA":
    values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
    values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
    c = (
    Pie()
    .add("", values)
    .set_colors(["blcak","orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    .render("/home/hadoop/result/result8/result1.html")
    )
    
    
    #可视化主程序:
    index = 1
    while index<9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index+=1

    2.结果图标展示

    可视化结果是.html格式的,reslut1的结果展示图保存路径为“/home/hadoop/result/result1/result1.html”,reslut2的结果展示图保存路径为“/home/hadoop/result/result2/result1.html”,其余类似递推。具体截图如下:

    (1)美国每日的累计确诊病例数和死亡数——>双柱状图

    (2)美国每日的新增确诊病例数——>折线图

    (3)截止5.19,美国各州累计确诊、死亡人数和病死率—>表格

    (4)截止5.19,美国累计确诊人数前10的州—>词云图

    (5)截止5.19,美国累计死亡人数前10的州—>象柱状图

    (6)截止5.19,美国累计确诊人数最少的10个州—>词云图

    (7)截止5.19,美国累计死亡人数最少的10个州—>漏斗图

    (8)截止5.19,美国的病死率—>饼状图

    经验总结

            通过本次实验,我们把在这一学期学的内容都使用了一遍,并且进行了融会贯通,本学期中,我们主要是对之前所学的python的内容进行巩固练习,并且对于一些细节处的修改;然后学习了shark,RDD,最后学了dataframe以及彼此之间的数据转换。

            在本次实验中,先是将数据集.csv文件进行格式转换,方便spark读取生成RDD或者DataFrame;接着将数据集上次到HDFS文件系统中,在这里我们要注意上传路径是个已存在文件夹,若无该文件夹,先创建再进行上述操作,否则可能会报错;接着使用spark对数据进行分析,并将分析结果输出,注意输出路径;记得HDFS查看文件是否输出成功;最后进行数据的可视化,在此需要安装可视化工具pyecharts对分析完成的数据进行可视化即可。

            本次实验中,我遇到过许多的问题,其中困扰我最久的是环境的搭建,python、hadoop、pyspark、pyecharts和Jupyter Notebook的安装,单个实现不难,主要是有一些彼此之间不兼容,版本或高或低,导致代码无法实现,重新安装时总会出现各种各样的问题。对此,我只能百度一一解决;然后就是数据可视化,由于对这一方面不太熟悉,因此在实现的过程中比较艰难,但好在一一克服了。

            通过了本次实验收获了良多,虽然在这个过程中经历了许多坎坷,但也认识到了自己的不足,找到了接下来努力的方向,进一步努力提升自己的技术水平。

    展开全文
  • spark期末大作业.docx

    2021-06-13 19:14:46
    作业
  • RDD编程初级实践
  • Hadoop:2.7.1以上版本,JDK:1.8以上版本,Spark:2.4.0以上版本,Python:3.6以上版本。 1、根据data.txt的数据分析某大学计算机系的成绩 (1)该系总共有多少学生; (2)该系共开设了多少门课程; (3)Tom...

    1、需求描述

    本次实验需要:系统:linux unbuntu14.04,处理器:至少需要两个处器,一个内核,内存:至少4G,硬盘空间:大小需要20GB。Hadoop:2.7.1以上版本,JDK:1.8以上版本,Spark:2.4.0以上版本,Python:3.6以上版本。

    1、根据data.txt的数据分析某大学计算机系的成绩

    (1)该系总共有多少学生;

    (2)该系共开设了多少门课程;

    (3)Tom同学的总成绩平均分是多少;

    (4)求每名同学的选修的课程门数;

    (5)该系DataBase课程共有多少人选修;

    (6)各门课程的平均分是多少;

    (7)使用累加器计算共有多少人选了DataBase这门课。

    2、编写独立应用程序实现数据去重

    对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C

    3、编写独立应用程序实现求平均值问题

    每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中

    2、环境介绍

    环境准备:

    • Hadoop下载与安装
    1. https://pan.baidu.com/share/init?surl=mUR3M2U_lbdBzyV_p85eSA

    (提取码:99bg)进入该百度云盘链接后,找到Hadoop安装文件hadoop-2.7.1.tar.gz。

    2、下载完后还需要配置必备工作才能安装hadoop。

    (1)首先创建Hadoop用户,sudo useradd -m hadoop -s /bin/bash

    (2)设置Hadoop用户密码,sudo passwd Hadoop

    (3)为Hadoop用户增加管理员权限,sudo adduser hadoop sudo

    (4)使用Hadoop用户登录后需要更新aptsudo apt-get update

    (5)安装vim, sudo apt-get install vim

    (6)安装JAVA环境

    sudo apt-get install openjdk-7-jre openjdk-7-jdk

    (7) 安装好 OpenJDK 后,需要找到相应的安装路径,这个路径是用于配置 JAVA_HOME 环境变量的。

    dpkg -L openjdk-7-jdk | grep '/bin/javac'

    (8) 接着需要配置一下 JAVA_HOME 环境变量,为方便,我们在 ~/.bashrc 中进行设置, sudo  vim ~/.bashrc

    (9) 在文件最前面添加如下单独一行(注意 = 号前后不能有空格),将“JDK安装路径”改为上述命令得到的路径,并保存:

    (10)刷新环境变量,source ~/.bashrc

    (11)安装Hadoop,我们选择将 Hadoop 安装至 /usr/local/ 中

    sudo tar -zxf ~/下载/hadoop-2.6.0.tar.gz -C /usr/local

     # 解压到/usr/local中

    cd /usr/local/

    sudo mv ./hadoop-2.6.0/ ./hadoop    # 将文件夹名改为hadoop

    sudo chown -R hadoop ./hadoop       # 修改文件权限

    (12) Hadoop 解压后即可使用。输入如下命令来检查 Hadoop 是否可用,成功则会显示 Hadoop 版本信息:

    cd /usr/local/hadoop

    ./bin/hadoop version

    • Spark的下载安装

    (1)Spark官方下载地址:http://spark.apache.org/downloads.html

    (2) 这里是Local模式(单机模式)的 Spark安装。我们选择Spark 1.6.2版本,并且假设当前使用用户名hadoop登录了Linux操作系统。

    sudo tar -zxf ~/下载/spark-1.6.2-bin-without-hadoop.tgz -C /usr/local/  #解压

    cd /usr/local

    sudo mv ./spark-1.6.2-bin-without-hadoop/ ./spark #移动文件

    sudo chown -R hadoop:hadoop ./spark  #此处的hadoop 为你的用户名授权

    (3) 安装后,还需要修改Spark的配置文件spark-env.sh

    cd /usr/local/spark

    cp ./conf/spark-env.sh.template ./conf/spark-env.sh

    (4) 编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

    有了上面的配置信息以后,Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据。如果没有配置上面信息,Spark就只能读写本地数据,无法读写HDFS数据。配置完成后就可以直接使用,不需要像Hadoop运行启动命令。

    3、数据来源描述

    数据来源于期末大作业素材,分别是A.txt,Algorithm.txt,B.txt,data.txt,Database.txt,Python.txt六个文件,由于学校系统不支持双向复制粘贴(支持双向复制粘贴跳过),所以在windows系统中使用FileZilla软件传输到virtualbox虚拟机上,具体步骤如下:

    1. 在虚拟机网络设置为桥接模式,并打开虚拟机终端输入ifconfig,查看本地ip并复制,如无请刷新网络重试。
    2. 打开FileZilla,打开文件,站点管理器,新建站点,主机输入刚刚复制的ip地址,用户名输入hadoop,密码输入你虚拟机hadoop用户下的密码,点击连接。

    连接成功便可以传输文件了。

    4、数据上传及上传结果查看

     

    5、数据处理过程描述

    pyspark交互式编程

    1、该系总共有多少学生:

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”)//获取data.txt文件
    res = lines.map(lambda x:x.split(“,”)).map(lambda x: x[0]) //获取每行数据的第1列
    sum = res.distinct()// distinct去重
    sum.count()//取元素总个数265
    

    2、该系共开设了多少门课程;

    lines = sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
    res = lines.map(lambda x:x.split(“,”)).map(lambda x:x[1]) //获取每行数据的第2列
    dis_res = res.distinct()//distinct去重
    dis_res.count()//取元素总个数8
    

    3、Tom同学的总成绩平均分是多少;

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
    res = lines.map(lambda x:x.split(“,”)).filter(lambda x:x[0]==”Tom”) //筛选出Tom同学的成绩信息
    res.foreach(print)//循环输出
    

    score = res.map(lambda x:int(x[2]))//提取Tom同学的每门成绩,并转换为int类型
    num = res.count() //Tom同学选课门数
    sum_score = score.reduce(lambda x,y:x+y) //Tom同学的总成绩
    avg = sum_score/num // 总成绩/门数=平均分
    print(avg)//输出平均分
    

    4、求每名同学的选修的课程门数;

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
    res = lines.map(lambda x:x.split(“,”)).map(lambda x:(x[0],1)) //学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1)
    each_res = res.reduceByKey(lambda x,y: x+y) //按学生姓名获取每个学生的选课总数
    each_res.foreach(print)//循环输出
    

    5、该系DataBase课程共有多少人选修;

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
    res=lines.map(lambdax:x.split(“,”)).filter(lambda x:x[1]==”DataBase”)
    res.count()//使用count统计
    

    6、各门课程的平均分是多少;

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”) //获取Data.txt文件
    res=lines.map(lambdax:x.split(“,”)).map(lambdax:(x[1],(int(x[2]),1))) //为每门课程的分数后面新增一列1,表示1个学生选择了该课程。
    temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) //按课程名聚合课程总分和选课人数。格式如(‘ComputerNetwork’, (7370, 142))
    avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))//课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数
    avg.foreach(print)//循环输出
    

     

    7、使用累加器计算共有多少人选了DataBase这门课。

    lines=sc.textFile(“file:///usr/local/spark/sparksqldata/data.txt”) //获取data.txt文件
    res=lines.map(lambdax:x.split(“,”)).filter(lambda x:x[1]==”DataBase”)//筛选出选了DataBase课程的数据
    accum = sc.accumulator(0) //定义一个从0开始的累加器accum
    res.foreach(lambda x:accum.add(1))//遍历res,每扫描一条数据,累加器加1
    accum.value //输出累加器的最终值1764
    

     

    编写独立应用程序实现数据去重

    1、  导入SparkContext包

    2、  初始化SparkContext

    3、  加载两个文件A和B

    4、  使用union合并两个文件的内容

    5、  使用distinct去重操作

    6、  使用sortBy排序操作

    7、  将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件

    from pyspark import SparkContext
    sc=SparkContext('local','sparksqldata')
    lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/A.txt")
    lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/B.txt")
    lines = lines1.union(lines2)
    dis_lines=lines.distinct()
    res = dis_lines.sortBy(lambda x:x)
    res.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result")
    

    总共500行数据,截图截至第一页。

    编写独立应用程序实现求平均值问题

    from pyspark import SparkContext
    sc = SparkContext("local","sparksqldata")
    lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/Algorithm.txt")
    lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/Database.txt")
    lines3 = sc.textFile("file:///usr/local/spark/sparksqldata/Python.txt")
    lines = lines1.union(lines2).union(lines3)
    data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
    res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
    data = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
    data.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result1")
    

    6、经验总结

    Spark是一个基于内存的快速的用于大规模数据处理的统一分析引擎。Spark有容错、并行的特性。spark发展迅猛,框架比hadoop更加灵活实用。减少了延时处理,提高性能效率实用灵活性,也可以与hadoop切实相互结合。RDD(弹性分布式数据集)是Spark的核心数据模型,也是一个抽象的元素集合,包含有数据。弹性体现在RDD的数据默认情况下是存储在内存中的,如果内存中存储不下,spark会自动将RDD中的数据写入到磁盘中。

    经过这次期末大作业加深了对pyspark的印象,实验中使用编程了计算数据,首先先创建RDD,然后使用Map方法拆分每行记录,取出每行的第某个元素,然后使用方法实现结果。count方法来计算总个数,distinct方法去除重复数据,round方法保留小数等等。还有许多方法今后仍需继续学习,能够达到灵活运用的程度。

    在这次大作业中发现,对spark和RDD编程还有许多不足,对RDD的理解不够深刻,对代码实际运用还有很多不足的地方,在今后的学习中,仍需要努力学习。

     

     

     

    参考文献

    [1]Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04)_厦大数据库实验室博客[J/OL]. http://dblab.xmu.edu.cn/blog/2441-2/.
    [2] Spark安装和编程实践(Spark2.4.0)[J/OL]. http://dblab.xmu.edu.cn/blog/2501-2/

     

     

     

    展开全文
  • Spark大数据大作业.doc

    2021-07-17 23:12:57
    基于大数据下Spark快速数据分析期末论文
  • spark期末复习题总结

    2021-01-14 10:08:20
    spark期末复习题总结
  • 总结    说明:本次大作业是基于ubuntukylin14.04(16)版本即hadoop集群(hadoop2.6.0版本)和hbase伪分布式(hbase1.1.2版本),并安装好Scala2.11.8、Spark2.1.0、sbt0.13.15和Scala IDE for eclipse4.7.0,并...


       说明:本次大作业是基于ubuntukylin14.04(16)版本即hadoop集群(hadoop2.6.0版本)和hbase伪分布式(hbase1.1.2版本),并安装好Scala2.11.8、Spark2.1.0、sbt0.13.15和Scala IDE for eclipse4.7.0,并完成了sbt eclipse插件的全局安装,在实验报告1至3中均有详细步骤,在此不再描述。

    任务1:求top值程序个性化(30分)

      基于ubuntukylin14.04(16)版本,完成教材141页5.4.1节求top值程序个性化。相关代码和数据参考http://dblab.xmu.edu.cn/blog/1632-2/

     1. 创建工作项目mcf14gzxm

      (1)在eclipse工作目录(本人/home/mcf14/gongzuomulu)中创建工作项目mcf14gzxm。
    在这里插入图片描述
      (2)在gongzuomulu目录中输入mkdir -p mcf14gzxm/src/main/scala命令创建scala目录存放spark应用程序。
    在这里插入图片描述
      (3)在gongzuomulu目录中输入vim mcf14gzxm/build.sbt命令创建包含sbt打包配置信息的build.sbt文件,并在其中添加以下信息后保存退出。

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
    

    在这里插入图片描述
    在这里插入图片描述
      (4)在mcf14gzxm目录中输入mkdir project命令创建project目录并进入,然后输入vim build.properties命令创建包含程序配置信息的build.properties文件并打开,在其中添加sbt的版本信息后保存退出:

    sbt.version=0.13.15
    

    在这里插入图片描述
    在这里插入图片描述

     2. 创建eclipse应用程序

      在程序主目录即/home/mcf14/gongzuomulu/mcf14gzxm中输入sbt eclipse命令创建eclipse应用程序,如下所示即为成功。
    在这里插入图片描述

     3. 导入mcf14gzxm项目

      在终端输入eclipse命令打开eclipse,在eclipse界面右击左侧打开快捷菜单,然后点击import…,在Select an import wizard:中搜索Existing Projects into Workspace并选中,然后点击Next,点击Browse…找到刚才创建的工作项目mcf14gzxm(即/home/mcf14/gongzuomulu/mcf14gzxm)然后点击Finish就能导入mcf14gzxm项目了。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
      注:此时要注意eclipse中Scala的版本问题!如果mcf14gzxm项目有红叉,那就说明Scala版本出错,解决方法为:右击mcf14gzxm项目→Properties→Scala Compiler→选中Use Project Settings→在Scala Installation中选中与安装好的Scala一致的版本→Apply and Close,此时就不会出错了。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

     4. 创建数据样本文件mcf14TopN.txt

      打开终端在用户主目录下创建数据样本文件TopN.txt(注意里边不能有空格)。
    在这里插入图片描述

     5. 创建程序代码文件mcf14TopN.scala

      在mcf14gzxm项目中的src/main/scala目录上右击,选择New→Package→Name为mcf14TopN→Finish来创建mcf14TopN包,并右击此包选择New→Scala Object→Name为mcf14TopN.mcf14TopN→Finish来创建mcf14TopN.scala代码文件,并在其中输入代码(见文末),然后运行即可。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    任务2:文件排序程序个性化(30分)

      基于ubuntukylin14.04(16)版本,完成教材143页5.4.2节文件排序程序个性化。相关代码和数据参考http://dblab.xmu.edu.cn/blog/1632-2/

     1. 创建数据样本目录mcf14file及文件mcf14file1.txt、mcf14file2.txt、mcf14file3.txt

      在用户主目录下创建目录mcf14file并在其中创建三个输入文件mcf14file1.txt、mcf14file2.txt、mcf14file3.txt,在每个输入文件中分行输入四个数字。
    在这里插入图片描述

     2. 创建程序代码文件mcf14FileSort.scala

      在mcf14gzxm项目中创建mcf14file包和mcf14FileSort.scala代码文件(见文末)并运行,成功后就会在输出目录输出结果。
    在这里插入图片描述
    在这里插入图片描述

    任务3:二次排序程序个性化(30分)

      基于ubuntukylin14.04(16)版本,完成教材144页5.4.3节二次排序程序个性化。相关代码和数据参考http://dblab.xmu.edu.cn/blog/1632-2/

     1. 创建数据样本文件mcf14SecondarySort.txt

      在用户主目录下创建mcf14SecondarySort.txt文件,在其中的每行输入两个数字并用空格隔开,输入若干行。
    在这里插入图片描述

     2. 创建程序代码文件mcf14SecondarySortApp.scala和mcf14SecondarySortKey.scala

      在mcf14gzxm项目中创建SecondarySort包和mcf14SecondarySortApp.scala代码文件和mcf14SecondarySortKey.scala代码文件(见文末)并运行,成功后就会在控制台输出结果。
    在这里插入图片描述

    开发总结(10分)

     1. 关于eclipse导入项目后出现红叉的问题

      原因:eclipse中Scala的版本与安装的Scala版本不一致。
      解决方法:右击项目→Properties→Scala Compiler→选中Use Project Settings→在Scala Installation中选中与安装好的Scala一致的版本→Apply and Close。

     2. 关于A master URL must be set in your configuration的问题

      原因:未指定或设置Spark的运行模式。
      解决办法1:在代码中的setAppName("(代码名)")后边加上.setMaster(“local”)
      解决办法2:点击运行按钮边的三角形→Run Configurations…→Scala Application→与代码对应的应用程序→Arguments→在VM arguments中输入-Dspark.master=local

     3. 关于Input path does not exist的问题

      原因:输入目录不存在或路径设置错误。
      解决方法:查看目录是否存在,查看代码中路径是否正确。

     4. 关于Output directory file的问题

      原因:代码中设置的输出目录已存在。
      解决办法:将设置的输出目录删除或修改目录名。

     5. 关于运行代码时弹出run configurations窗口的问题

      原因:代码运行时未指定主类型。
      解决办法:run configurations窗口左侧双击Scala Application→Name为“代码名字+$”→Main class为“包名.代码名”(若有多个包则都应加上)→Apply→Run

     6. 关于错误: 找不到或无法加载主类的问题

      原因:主类设置错误或未设置。
      解决办法:参考“5. 关于运行代码时弹出run configurations窗口的问题”。

     7. 总结

      本次大作业是三个程序个性化任务,大作业其实也是程序试验的内容,在做实验的过程中,程序的运行基本上采用了local本地模式,因为条件有限,只部署了一个X1主节点和一个X2副节点,但是同样能够满足基本的实验要求。做实验的难点就在于将所需要的全部软件安装好并配置好,形成一个完整的大数据计算平台,切不可因为一时的困难而放弃。
      坚持是很有必要的,特别是在遇到运行程序时出错的问题,然而,坚持中的方向有时候更为重要。有可能为了解决这个问题,不知不觉已经过去了好几个小时,甚至有时候两三天都无法解决这个问题,那么此时就应该去请教老师了,或许在老师的指导下会“柳暗花明又一村”。
      在动手实践的过程中,要充分的认识到自己实践的重要性。别人动手做实验是学不到自己身上的,自己动手做实验学会的东西,那是永远也丢不掉的。Spark作为大数据计算平台的后起之秀,相比于Hadoop在许多方面都有了很大的进步,Hadoop是数据向计算靠拢,而Spark是计算向数据靠拢,因此Spark仅在数据处理效率上就有了很大的提升,其他的像多种数据集操作、内存计算、基于DAG的任务调度执行机制、多种高层次又简洁的API等比Hadoop更为强大。相比于单个使用,将Hadoop和Spark相结合统一部署更能在实际的生产环境中发挥最大的价值。因此,深刻地把握Hadoop和Spark的运行机制、工作原理更能够解决问题,取得收获。
      做实验,出现问题解决问题,有时候真的比顺顺利利的做完更能学会知识。因为在遇到程序出错的时候,就得上网上搜索查看很多相关的内容。在出现问题→搜索问题→探究问题→解决问题这个过程中,无疑让我学会了很多。相比于纯书本理论学习,没有实际动手进行实验操作是比较难学会真正的知识的,特别是在现在的这个互联网大数据时代,各个方面都离不开互联网和数据。个人认为,先整体把握课本的内容架构,然后再亲自动手实践,将理论与实际相结合,用理论指导实践,用实践去检验理论,这样,就能不断的提高自己的认知水平,进一步提高自己的专业本领。


    mcf14TopN.scala代码:

    import org.apache.spark.{SparkConf, SparkContext}
    object mcf14TopN {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("mcf14TopN").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        val lines = sc.textFile("file:///home/mcf14/mcf14TopN.txt",2)//本地测试数据文件路径
        var num = 0;
        val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
          .map(_.split(",")(2))
          .map(x => (x.toInt,""))
          .sortByKey(false)
          .map(x => x._1).take(5)
          .foreach(x => {
            num = num + 1
            println(num + "\t" + x)
          })
      }
    }
    

    mcf14FileSort.scala代码:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    object mcf14FileSort {
        def main(args: Array[String]) {
            val conf = new SparkConf().setAppName("mcf14FileSort").setMaster("local")
            val sc = new SparkContext(conf)
            val dataFile = "file:///home/mcf14/mcf14file"//测试文件输入目录
            val lines = sc.textFile(dataFile,3)
            var index = 0
            val result = lines.filter(_.trim().length>0).map(n=>(n.trim.toInt,"")).partitionBy(new HashPartitioner(1)).sortByKey().map(t => {
             index += 1
                (index,t._1)
            })
            result.saveAsTextFile("file:///home/mcf14/fileoutput")//本地结果输出目录
        }
    }
    

    mcf14SecondarySortApp.scala代码:

    package cn.edu.xmu.spark
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    object mcf14SecondarySortApp {
      def main(args:Array[String]){
         val conf = new SparkConf().setAppName("mcf14SecondarySortApp").setMaster("local")
           val sc = new SparkContext(conf)
           val lines = sc.textFile("file:///home/mcf14/mcf14SecondarySort.txt", 1)//本地文件输入目录
           val pairWithSortKey = lines.map(line=>(new mcf14SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt),line))
           val sorted = pairWithSortKey.sortByKey(false)
           val sortedResult = sorted.map(sortedLine =>sortedLine._2)
           sortedResult.collect().foreach (println)
      }
    }
    

    mcf14SecondarySortKey.scala代码:

    package cn.edu.xmu.spark
    class mcf14SecondarySortKey(val first:Int,val second:Int) extends Ordered[mcf14SecondarySortKey] with Serializable {
    def compare(other:mcf14SecondarySortKey):Int = {
        if (this.first - other.first !=0) {
             this.first - other.first
        } else {
          this.second - other.second
        }
      }
    }
    
    展开全文
  • 大数据导论期末大作业

    千次阅读 2020-06-19 21:05:13
    利用spark对hive中的数据进行分析,提取疫情前十的洲 将分析结果可视化 一.对美国的疫情数据的爬取 这里利用新浪网的数据,网页链接是http://t.cn/A67OCJyZ 首先创造一个data.txt文件用来存储获取的数据 touch data...
  • RDD编程初级实践一、实践目的二、实践设备三、实践内容四、操作步骤1.实践文件准备2.pyspark交互式编程3.编写独立应用程序实现数据去重4....本作业提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数
  • 文章目录1.pyspark交互式编程(1)该系总共有多少学生;(2)该系共开设了多少门课程;...假设当前目录为/usr/local/spark/mycode/remdup,在当前目录下新建一个remdup.py文件,复制下面代码;3.在目录/usr/local/

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 136
精华内容 54
关键字:

spark期末大作业