精华内容
下载资源
问答
  • 2020-09-15 15:10:26

      Apache Spark在实际应用中迅速获得发展。加州大学伯克利分校的AMPLab于2009年开发了Spark,并于2010年将其开源。从那时起,它已发展成为大数据领域最大的开源社区之一,拥有来自50多个组织的200多位贡献者。这个开放源代码分析引擎以比MapReduce更快的速度处理大量数据而出类拔萃,因为数据被持久存储在Spark自己的处理框架中。

      在考虑Hadoop生态系统中的各种引擎时,重要的是要了解每个引擎在某些用例下效果最佳,并且企业可能需要使用多种工具组合才能满足每个所需的用例。话虽如此,这里是对Apache Spark的一些顶级用例的回顾。

      一、流数据

      Apache Spark的关键用例是其处理流数据的能力。由于每天要处理大量数据,因此对于公司而言,实时流传输和分析数据变得至关重要。Spark Streaming具有处理这种额外工作负载的能力。一些专家甚至认为,无论哪种类型,Spark都可以成为流计算应用程序的首选平台。提出此要求的原因是,Spark Streaming统一了不同的数据处理功能,从而使开发人员可以使用单个框架来满足其所有处理需求。

      当今企业使用Spark Streaming的一般方式包括:

      1、流式ETL –在数据仓库环境中用于批处理的传统ETL(提取,转换,加载)工具必须读取数据,将其转换为数据库兼容格式,然后再将其写入目标数据库。使用Streaming ETL,在将数据推送到数据存储之前,将对其进行连续的清理和聚合。

      2、数据充实 –这种Spark Streaming功能通过将实时数据与静态数据相结合来充实实时数据,从而使组织能够进行更完整的实时数据分析。在线广告商使用数据充实功能将历史客户数据与实时客户行为数据结合起来,并根据客户的行为实时提供更多个性化和针对性的广告。

      3、触发事件检测 – Spark Streaming使组织可以检测到可能对系统内部潜在严重问题的罕见或异常行为(“触发事件”)并做出快速响应。金融机构使用触发器来检测欺诈性交易并阻止其欺诈行为。医院还使用触发器来检测潜在的危险健康变化,同时监视患者的生命体征-向正确的护理人员发送自动警报,然后他们可以立即采取适当的措施。

      4、复杂的会话分析 –使用Spark Streaming,与实时会话有关的事件(例如登录网站或应用程序后的用户活动)可以组合在一起并进行快速分析。会话信息还可以用于不断更新机器学习模型。诸如Netflix之类的公司使用此功能可立即了解用户在其网站上的参与方式,并提供更多实时电影推荐。

      

    大数据分析

     

      二、机器学习

      许多Apache Spark用例中的另一个是它的机器学习功能。

      Spark带有用于执行高级分析的集成框架,该框架可帮助用户对数据集进行重复查询,这从本质上讲就是处理机器学习算法。在此框架中找到的组件包括Spark的可扩展机器学习库(MLlib)。MLlib可以在诸如聚类,分类和降维等领域中工作。所有这些使Spark可以用于一些非常常见的大数据功能,例如预测智能,用于营销目的的客户细分以及情感分析。使用推荐引擎的公司将发现Spark可以快速完成工作。

      网络安全是Spark 机器学习功能的一个很好的商业案例。通过使用Spark堆栈的各种组件,安全提供程序可以对数据包进行实时检查,以发现恶意活动的痕迹。在前端,Spark Streaming允许安全分析人员在将数据包传递到存储平台之前检查已知威胁。到达存储区后,数据包将通过其他堆栈组件(例如MLlib)进行进一步分析。因此,安全提供商可以在不断发展的过程中了解新的威胁-始终领先于黑客,同时实时保护其客户。

      三、互动分析

      Spark最显着的功能之一就是其交互式分析功能。MapReduce是为处理批处理而构建的,而Hive或Pig等SQL-on-Hadoop引擎通常太慢,无法进行交互式分析。但是,Apache Spark足够快,可以执行探索性查询而无需采样。Spark还与包括SQL,R和Python在内的多种开发语言接口。通过将Spark与可视化工具结合使用,可以交互地处理和可视化复杂的数据集。

      下一版本的Apache Spark(Spark 2.0)将于今年的4月或5月首次亮相,它将具有一项新功能- 结构化流 -使用户能够对实时数据执行交互式查询。通过将实时流与其他类型的数据分析相结合,预计结构化流将通过允许用户针对Web访问者当前会话运行交互式查询来促进Web分析。它也可以用于将机器学习算法应用于实时数据。在这种情况下,将对旧数据进行算法训练,然后将其重定向以合并新的数据,并在其进入​​内存时从中学习。

      四、雾计算

      尽管大数据分析可能会引起广泛关注,但真正激发技术界想象力的概念是物联网(IoT)。物联网通过微型传感器将对象和设备嵌入在一起,这些微型传感器彼此之间以及与用户进行通信,从而创建了一个完全互连的世界。这个世界收集了大量数据,对其进行处理,并提供革命性的新功能和应用程序供人们在日常生活中使用。但是,随着物联网的扩展,对大量,种类繁多的机器和传感器数据进行大规模并行处理的需求也随之增加。但是,利用云中的当前分析功能很难管理所有这些处理。

      那就是雾计算和Apache Spark出现的地方。

      雾计算将数据处理和存储分散化,而不是在网络边缘执行这些功能。但是,雾计算为处理分散数据带来了新的复杂性,因为它越来越需要低延迟,机器学习的大规模并行处理以及极其复杂的图形分析算法。幸运的是,有了Spark Streaming等关键堆栈组件,交互式实时查询工具(Shark),机器学习库(MLib)和图形分析引擎(GraphX),Spark不仅具有雾计算解决方案的资格。实际上,随着物联网行业逐渐不可避免地融合,许多行业专家预测,与其他开源平台相比,Spark有可能成为事实上的雾基础设施。

      现实世界中的火花

      如前所述,在线广告商和诸如Netflix之类的公司正在利用Spark获得见识和竞争优势。其他也从Spark受益的著名企业是:

      Uber –这家跨国在线出租车调度公司每天都从其移动用户那里收集TB级的事件数据。通过使用Kafka,Spark Streaming和HDFS构建连续的ETL管道,Uber可以在收集原始非结构化事件数据时将其转换为结构化数据,然后将其用于进一步和更复杂的分析。

      Pinterest –通过类似的ETL管道,Pinterest可以利用Spark Streaming即时了解世界各地的用户如何与Pins互动。因此,当人们浏览站点并查看相关的图钉时,Pinterest可以提出更相关的建议,以帮助他们选择食谱,确定要购买的产品或计划前往各个目的地的行程。

      Conviva –这家流媒体视频公司每月平均约有400万个视频供稿,仅次于YouTube。Conviva使用Spark通过优化视频流和管理实时视频流量来减少客户流失,从而保持一致的流畅,高质量的观看体验。

      何时不使用Spark

      尽管它具有通用性,但这并不一定意味着Apache Spark的内存中功能最适合所有用例。更具体地说,Spark并非设计为多用户环境。Spark用户需要知道他们有权访问的内存对于数据集是否足够。添加更多的用户使此操作变得更加复杂,因为用户必须协调内存使用量才能同时运行项目。由于无法处理这种类型的并发,用户将需要为大型批处理项目考虑使用备用引擎,例如Apache Hive。

      随着时间的流逝,Apache Spark将继续发展自己的生态系统,变得比以前更加通用。在大数据已成为规范的世界中,组织将需要找到最佳方式来利用它。从这些Apache Spark用例可以看出,未来几年将有很多机会来了解Spark的真正功能。

      随着越来越多的组织认识到从批处理过渡到实时数据分析的好处,Apache Spark的定位是可以在众多行业中获得广泛而快速的采用。

     

    摘自:https://www.aaa-cg.com.cn/data/2143.html

    更多相关内容
  • 近400多万条数据,可以做大数据分析案例和练习,可以作为推荐系统的学习 字段(id, userid,age,gender,item_id, behavior_type, item_category, date, province)(序号,用户ID,性别,商品ID,用户行为,商品...
  • Spark SQL上海摩拜共享单车数据分析完整源码,使用Maven导包
  • spark高级数据分析 一书中所有的示例代码,每章都有 本书中会有附带源代码实例的字样,但是京东买的书没有附带源代码。 都在这儿了,拿走不谢。
  • 首先,实验将本地数据集上传到数据仓库Hive,然后在Hive数据仓库下进行数据分析,接着本实验将数据从Hive导入到MySQL,利用Spark预测回头客行为,最后本实验利用ECharts在eclipse IDE 下进行数据可视化分析,得出了...
  • spark音乐专辑数据分析项目代码及其数据,使用spark的scala API编写数据分析代码,使用flask编写数据可视化应用程序
  • Spark数据分析及处理(实战分析)

    千次阅读 多人点赞 2020-11-18 19:23:34
    项目需求:使用Spark完成下列日志分析项目需求: 1.日志数据清洗 2.用户留存分析 1.数据清洗 读入日志文件并转化为RDD[Row]...代码分析如下: 日志如下图片,分析日志,处理需求 //spark操作外部数据 val spark = Sp

    项目需求:使用Spark完成下列日志分析项目需求:
    1.日志数据清洗
    2.用户留存分析

    1.数据清洗

    读入日志文件并转化为RDD[Row]类型

    • 按照Tab切割数据
    • 过滤掉字段数量少于8个的

    对数据进行清洗

    • 按照第一列和第二列对数据进行去重
    • 过滤掉状态码非200
    • 过滤掉event_time为空的数据
    • 将url按照”&”以及”=”切割

    保存数据

    • 将数据写入mysql表中

    代码分析如下:
    日志如下图片,分析日志,处理需求
    在这里插入图片描述

       //spark操作外部数据
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("clear").getOrCreate()
        val sc = spark.sparkContext
        import spark.implicits._
        //将工作中的要分析的日志导入到spark中
        val linesRdd = sc.textFile("D:\\360Downloads\\scala\\test.log")
    //    println(linesRdd.count())
        //需求1:按空格切分
        val line1 = linesRdd.map(x=>x.split("\t"))
        //需求2:过滤掉字段数量少于8个的
        val rdd = line1.filter(x => x.length == 8).map(x =>Row (x(0).trim, x(1).trim //trim去除两边空格的作用
          , x(2).trim, x(3).trim,
          x(4).trim, x(5).trim,
          x(6).trim, x(7).trim))
    //    rdd.collect().foreach(println)
       //处理成数据表的形式
        val schema = StructType(Array(
          StructField("event_time", StringType),
          StructField("url", StringType),
          StructField("method", StringType),
          StructField("status", StringType),
          StructField("sip", StringType),
          StructField("user_uip", StringType),
          StructField("action_prepend", StringType),
          StructField("action_client", StringType)))
       val df1 = spark.createDataFrame(rdd,schema)
        df1.show(10,true)
    

    结果如下:
    在这里插入图片描述

    数据清洗需求:第一列和第二列对数据去重,过滤掉非200,过滤掉event_time为空的数据

     
    println("-------对数据清洗开始-----------")
      //按照第一列和第二列对数据去重,过滤掉非200,过滤掉event_time为空的数据
        val ds1 = df1.dropDuplicates("event_time", "url")
          .filter(x => x(3) == "200")
    //等同上一步骤  .filter(x => StringUtils.isNotEmpty(x(0).toString))
          .filter(x=>x(0).equals("")==false)
        ds1.show(10,true)
    

    在这里插入图片描述

    继续细化分,对url里的数据进行划分,重新生成一张新的表
    需求:将url按照”&”以及”=”切割

        val dfDetail = ds1.map(row => {
          val urlArray = row.getAs[String]("url").split("\\?")
          //      val ur2 = row(1).toString.split("\\?")
          
          var map = Map("params" -> "null")
          if (urlArray.length == 2) {
            map = urlArray(1)
              .split("&").map(x => x.split("="))
              .filter(_.length == 2).map(x => (x(0), x(1)))
              .toMap
          }
    
          (row.getAs[String]("event_time"),
            map.getOrElse("actionBegin", ""),
            map.getOrElse("actionClient", ""),
            map.getOrElse("actionEnd", ""),
            map.getOrElse("actionName", ""),
            map.getOrElse("actionTest", ""),
            map.getOrElse("actionType", ""),
            map.getOrElse("actionValue", ""),
            map.getOrElse("clientType",""),
            map.getOrElse("examType",""),
            map.getOrElse("ifEquipment",""),
            map.getOrElse("questionId", ""),
            map.getOrElse("skillIdCount", ""),
            map.getOrElse("userSID", ""),
            map.getOrElse("userUID", ""),
            map.getOrElse("userUIP", ""),
            row.getAs[String]("method"),
            row.getAs[String]("status"),
            row.getAs[String]("sip"),
            row.getAs[String]("user_uip"),
            row.getAs[String]("action_prepend"),
            row.getAs[String]("action_client")
          )
        }).toDF()
       //一定要对应上面的get后的数据
    //    dfDetail.show(1)
        val detailRdd = dfDetail.rdd
        val detailschema = StructType(Array(
          StructField("event_time", StringType),
          StructField("actionBegin", StringType),
          StructField("actionClient", StringType),
          StructField("actionEnd",  StringType),
          StructField("actionName", StringType),
          StructField("actionTest",  StringType),
          StructField("actionType",  StringType),
          StructField("actionValue",  StringType),
          StructField("clientType", StringType),
          StructField("examType",  StringType),
          StructField("ifEquipment", StringType),
          StructField("questionId",  StringType),
          StructField("skillIdCount",  StringType),
          StructField("userSID", StringType),
          StructField("userUID",  StringType),
          StructField("userUIP", StringType),
          StructField("method", StringType),
          StructField("status", StringType),
          StructField("sip", StringType),
          StructField("user_uip", StringType),
          StructField("action_prepend", StringType),
          StructField("action_client", StringType)))
      
    
        val dforgDF = spark.createDataFrame(detailRdd,detailschema)
        dforgDF.show(3,false)
    

    结果如下:
    在这里插入图片描述

    需求:将数据写入mysql表中
    连接mysql数据库,写入数据

      val url="jdbc:mysql://192.168.195.20:3306/kb09db"
        val prop=new Properties()
        prop.setProperty("user","root")
        prop.setProperty("password","ok")
        prop.setProperty("driver","com.mysql.jdbc.Driver")
        prop.setProperty("url",url)
         println("开始写入MySQL")
        dforgDF.write.mode("overwrite").jdbc(url,"logDetail",prop)
        println("写入MySQL结束")
    

    运行结束,登录MySQL,查看数据库的表是否已写入

    需求分析:求当天新增用户总数n
    在这里插入图片描述

    注:
    日志分析内容如下:(截取其中一篇日志)
    event_time: 2018-09-04T20:27:31+08:00
    url:http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106
    method: GET
    status: 200
    sip: 192.168.168.63
    user_uip : -
    action_prepend : -
    Apache-HttpClient/4.1.2
    action_client: (java 1.5)

    url:
    http://datacenter.bdqn.cn/logs/user?
    actionBegin=1536150451617
    actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36
    actionEnd=1536150451705
    actionName=viewQuestionAnalysis
    actionTest=0
    actionType=3
    actionValue=272878
    clientType=001_bdqn
    examType=001
    ifEquipment=web
    questionId=32415
    skillIdCount=0
    userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3
    userUID=272878
    userUIP=117.152.82.106

    actionBegin
    actionClient
    actionEnd
    actionName
    actionTest
    actionType
    actionValue
    clientType
    examType
    ifEquipment
    questionId
    skillIdCount
    userSID
    userUID
    userUIP

    展开全文
  • Spark数据分析案例之平均心率检测[2021]

    千次阅读 多人点赞 2021-06-17 08:21:19
    本文是基于林子雨老师的博客完成的一... 整体流程是模拟平均心率信息,然后发送给Kafka,接下来Spark Streaming再接收进行处理,将其写入MySQL数据库。Web通过间隔若干时间查询某个时间段内的心跳,并对其进行可视化。

           本文是基于林子雨老师的博客完成的一次课程设计,侧重于关注在实际操作中的一些问题的解决和各种系统软件的使用安装
           整体流程是模拟平均心率信息,然后发送给Kafka,接下来Spark Streaming再接收进行处理,将其写入MySQL数据库。Web通过间隔若干时间查询某个时间段内的心跳,并对其进行可视化。
    使用的版本如下图:
    在这里插入图片描述

    参考博客链接

    摘要

           平均心率检测以《Spark+Kafka构建实时分析Dashboard案例介绍》为基础,在linux下,模拟数据生成,数据预处理、消息队列发送和接收消息、数据实时处理、数据实时推送和实时展示等数据处理全流程,所涉及的各种典型操作涵盖Linux、Spark、Kafka、JAVA、MySQL、Maven、sbt等系统和软件的安装和使用方法。

    关键词:Spark,Kafka,JAVA,MySQL,;

    1 系统与软件简介

    1.1 Linux系统

           linux系统是一个开源、免费的操作系统,其稳定性、安全性、处理多并发已经得到 业界的认可,目前很多企业级的项目都会部署到Linux/unix系统上,是一个基于POSIX的多用户、多任务、支持多线程和多CPU的操作系统。它能运行主要的Unix工具软件、应用程序和网络协议。它支持32位和64位硬件。Linux继承了Unix以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统。

    1.2 Spark

           Spark是一种快速、通用、可扩展的大数据分析引擎。Spark是基于内存计算的大数据并行计算框架。除了扩展了广泛使用的 MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。Spark 适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。通过在一个统一的框架下支持这些不同的计算,Spark 使我们可以简单而低耗地把各种处理流程整合在一起。

    1.3 Kafka

           Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写。

    1.4 MySQL

           MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS (Relational Database Management System,关系数据库管理系统) 应用软件之一。MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL所使用的 SQL 语言是用于访问数据库的最常用标准化语言。

    1.5 Maven

            Maven是一个项目管理和综合工具。Maven提供了开发人员构建一个完整的生命周期框架。开发团队可以自动完成项目的基础工具建设,Maven使用标准的目录结构和默认构建生命周期。它是一个Apache的开源项目,主要服务于基于Java平台的项目构建、依赖管理和项目信息管理。

    2 平均心率问题

    2.1 题目要求

           基于linux系统,运用Spark、Kafka、JAVA、MySQL、Maven、sbt等软件,首先我们构建了一个生产者,声明了主题和值及其他相关信息,运行producer应用程序随机产生代表正常人的心跳速率数据,并实时监测平均心率,发送给Kafka,接下来Spark Streaming再接收进行处理,将其写入MySQL数据库。Web通过间隔若干时间查询某个时间段内的心跳,并对其平均心率进行可视化。

    2.2 题目流程

    1、安装Ubuntu,Hadoop,JDK,Maven,Spark,MySql。其版本信息如下:

    在这里插入图片描述

    2、对代码进行编译:
    在这里插入图片描述

    3、开启Kafka,得到如图:

    在这里插入图片描述

    4、开启Spark streaming服务并且它会从Kafka主题中处理数据到MySQL,得到如图:
    在这里插入图片描述

    5、开启kafka producer,并且它会将事件写入kafka主题中,得到如图:
    在这里插入图片描述

    6、开启web服务器,如此可以观察dashboard,得到如图:

    在这里插入图片描述

    7、得到结果如图:

    在这里插入图片描述

    2.3 问题解决

    2.3.1 通过streaming操作kafka获取数据后,将数据写入MySQL数据库。我们可以使用如下代码创建数据库和表:
    在这里插入图片描述
    运行后发生错误如图所示:
    在这里插入图片描述
    解决方法:对代码进行修改可解决错误,代码如下:

    ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLECT=utf8_bin;
    

    其中

    1. ENGINE=InnoDB :表示使用innodb引擎,
    2. DEFAULT CHARSET=utf8:表示数据库默认编码为utf-8
    3. COLLECT=utf8_bin:表示数据库校对规则,以二进制比较,也就是区分大小写.

    2.3.2 我们通过maven进行Spark项目编译执行,进行如下操作:
    在这里插入图片描述
    运行后发生错误如图所示:
    在这里插入图片描述
    解决方法:编辑当前用户.bashrc文件,加上如下指令:
    alias sudo=“sudo env PATH=$PATH”
    或者去掉sudo,只保留/usr/local/maven/bin/mvn package -DskipTests
    在这里插入图片描述
    运行后发生错误如图所示:
    在这里插入图片描述
    解决方法:通过报错信息可知这是因为JVM申请的memory不够导致无法启动sparkcontext。进行相关信息的查询可知如下表:
    在这里插入图片描述
    由此可知修改命令如下:
    java -Xms256m -Xmx1024m -Dconfig=./config/common.conf -jar streaming/target/spark-streaming-0.1.jar

    2.3.4 在打开网页可视化的时候,只显示坐标轴,无显示数据,其报错如下:
    在这里插入图片描述
    解决方法 :从错误提示中可以知道是数据库不存在,最终发现在common.conf配置文件中数据库名称与我们创建的数据库名称不相同,如图所示:
    在这里插入图片描述
    改变数据库名称与配置文件中名称一致即可。

    总结

    如有错误,欢迎指出。

    在这里插入图片描述

    展开全文
  • Spark数据项目实例

    千次阅读 2020-11-19 17:23:42
    使用Spark完成下列日志分析项目需求: 日志数据清洗 用户留存分析 活跃用户分析 活跃用户地域信息分析 用户浏览深度分析 数据清洗 1)读入日志文件并转化为RDD[Row]类型 按照Tab其人格数据 过滤掉字段数量少于8个的 ...

    源数据:https://pan.baidu.com/s/1rzEwBfR1m_lpZHekuEFnCg
    提取码:tgpf
    源码:https://pan.baidu.com/s/1mKiImFn3OePf5Jm8PIUi3Q
    提取码:vxb7

    使用Spark完成下列日志分析项目需求:

    日志数据清洗
    用户留存分析
    活跃用户分析
    活跃用户地域信息分析
    用户浏览深度分析

    数据清洗
    1)读入日志文件并转化为RDD[Row]类型
    按照Tab其人格数据
    过滤掉字段数量少于8个的

    val line1 = linesRDD.map(x => x.split("\t"))
    val rdd = line1.filter(x => x.length == 8).map(x => Row(x(0).trim, x(1).trim, x(2).trim, x(3).trim, x(4).trim, x(5).trim, x(6).trim, x(7).trim))
    

    2)对数据进行清洗
    按照第一列和第二列对数据进行去重
    过滤掉状态码非200
    过滤掉event_time为空的数据
    将url按照"&“以及”="切割

     val schema = StructType(
          Array(
            StructField("event_time", StringType),
            StructField("url", StringType),
            StructField("method", StringType),
            StructField("status", StringType),
            StructField("sip", StringType),
            StructField("user_uip", StringType),
            StructField("action_prepend", StringType),
            StructField("action_client", StringType)
          )
        )
        val orgDF = spark.createDataFrame(rdd, schema)
    //    df1.printSchema()
    //    df1.show(10,false)
    
        //按照第一列和第二列对数据数据去重,过滤掉状态码非200,过滤掉event_time为空的数据
        val ds1 = orgDF.dropDuplicates("event_time", "url")
          .filter(x => x(3) == "200")
          .filter(x => StringUtils.isNotEmpty(x(0).toString))
    
    //    ds1.show(10,false)
        //将url按照"&"以及"="切割,即按照userUID
    	//userSID
    	//userUIP
    	//actionClient
    	//actionBegin
    	//actionEnd
    	//actionType
    	//actionPrepend
    	//actionTest
    	//ifEquipment
    	//actionName
    	//id
    	//progress进行切割
        val dfDetail = ds1.map(row => {
          val urlArray = row.getAs[String]("url").split("\\?")
          //      val urlArray2 = row(1).toString.split("\\?")
          var map = Map("params" -> "null")
          if (urlArray.length == 2) {
            map = urlArray(1)
              .split("&").map(x => x.split("="))
              .filter(_.length == 2)
              .map(x => (x(0), x(1)))
              .toMap
          }
          (row.getAs[String]("event_time"),
            row.getAs[String]("user_uip"),
            row.getAs[String]("method"),
            row.getAs[String]("status"),
            row.getAs[String]("sip"),
            map.getOrElse("actionBegin", ""),
            map.getOrElse("actionEnd", ""),
            map.getOrElse("userUID", ""),
            map.getOrElse("userSID", ""),
            map.getOrElse("userUIP", ""),
            map.getOrElse("actionClient", ""),
            map.getOrElse("actionType", ""),
            map.getOrElse("actionPrepend", ""),
            map.getOrElse("actionTest", ""),
            map.getOrElse("ifEquipment", ""),
            map.getOrElse("actionName", ""),
            map.getOrElse("progress", ""),
            map.getOrElse("id", ""))
        }).toDF()
    //    dfDetail.show(10,false)
    
        val detailRDD = dfDetail.rdd
        val detailSchema = StructType(
          Array(
            StructField("event_time", StringType),
            StructField("user_uip", StringType),
            StructField("method", StringType),
            StructField("status", StringType),
            StructField("sip", StringType),
            StructField("actionBegin", StringType),
            StructField("actionEnd", StringType),
            StructField("userUID", StringType),
            StructField("userSID", StringType),
            StructField("userUIP", StringType),
            StructField("actionClient", StringType),
            StructField("actionType", StringType),
            StructField("actionPrepend", StringType),
            StructField("actionTest", StringType),
            StructField("ifEquipment", StringType),
            StructField("actionName", StringType),
            StructField("progress", StringType),
            StructField("id", StringType)
          )
        )
    
        val detailDF = spark.createDataFrame(detailRDD, detailSchema)
        detailDF.printSchema()
        detailDF.show(3,false)
    

    3)保存数据
    将数据写入mysql表中

     val url = "jdbc:mysql://hadoop004:3306/kb09db"
        val prop = new Properties()
        prop.setProperty("user","root")
        prop.setProperty("password","ok")
        prop.setProperty("driver","com.mysql.jdbc.Driver")
    
        println("开始写入mysql")
        detailDF.write.mode(saveMode = "overwrite").jdbc(url,"logDetail",prop)
    //    orgDF.write.mode("overwrite").jdbc(url,"",prop)
        println("写入mysql结束")
    

    在这里插入图片描述

    用户留存分析
    计算用户的次日留存率
    1)求当天新增用户总数n
    2)求当天新增的用户ID与次日登录的用户ID的交集,得出新增用户次日登录总数m(次日留存数)
    3)m/n*100%

    package nj.zb.kb09.project
    import java.text.SimpleDateFormat
    import java.util.Properties
    import org.apache.spark.sql.SparkSession
    
    object UserAnalysice {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("UserAnalysice").getOrCreate()
        import spark.implicits._
        val sc = spark.sparkContext
    
        val url = "jdbc:mysql://hadoop004:3306/kb09db"
        val prop = new Properties()
        prop.setProperty("user","root")
        prop.setProperty("password","ok")
        prop.setProperty("driver","com.mysql.jdbc.Driver")
    
        val detailDF = spark.read.jdbc(url, "logDetail", prop)
    //    detailDF.show(1,false)
    
    
        val changeTimeFun = spark.udf.register("changeTime", (x: String) => {
          val time = new SimpleDateFormat("yyyy-MM-dd")
            .parse(x.substring(1, 10)).getTime
          time
        })
    
        //所有的注册用户信息(userID,register_time,注册行为)
        val registDF = detailDF.filter(detailDF("actionName") === ("Registered"))
          .select("userUID","event_time", "actionName")
            .withColumnRenamed("event_time","register_time")
          .withColumnRenamed("userUID","regUID")
    
        val registDF2 = registDF.select($"regUID", changeTimeFun($"register_time")
          .as("register_date"), $"actionName").distinct()
    //    var num1 = registDF2.count()
    //    registDF2.show(3,false)
    
        //所有的用户登录信息DF(userUID,signin_time,登录行为)
        val signinDF = detailDF.filter(detailDF("actionName") === ("Signin"))
          .select("userUID","event_time", "actionName")
          .withColumnRenamed("event_time","signin_time")
          .withColumnRenamed("userUID","signUID")
          .distinct()
    
        val signinDF2 = signinDF.select($"signUID", changeTimeFun($"signin_time")
          .as("signin_date"), $"actionName").distinct()
    
    
        val joinDF = registDF2.join(signinDF2
          ,signinDF("signUID") === registDF("regUID")
          ,joinType = "inner")
    
    
        val frame = joinDF.filter(
          joinDF("register_date") === joinDF("signin_date") - 86400000)
            .groupBy($"register_date").count()
          .withColumnRenamed("count","signcount")
    //    frame.show()
    
        val frame1 = registDF2.groupBy($"register_date").count()
            .withColumnRenamed("count","regcount")
        frame1.show()
    
        val frame2 = frame.join(frame1
          ,"register_date")
        frame2.show()
    
        //计算次日留存率
        frame2.map(x => (
          x.getAs[Long]("register_date"),
          x.getAs[Long]("signcount"),
          x.getAs[Long]("regcount"),
          x.getAs[Long]("signcount").toDouble / x.getAs[Long]("regcount")
          )
        ).show()
    
    
    //    val dt = registDF.map(x => (x(0).toString.substring(0, 10), x(1).toString))
    //    dt.show(4,false)
    //    detailDF.createOrReplaceTempView("detailDF")
    //    spark.sql("select count(1) from detailDF where actionName = 'Registered'").show()
      }
    }
    

    在这里插入图片描述

    展开全文
  • 这篇文章主分析了红酒的通用数据集,这个数据集一共有1600个样本,11个红酒的理化性质,以及红酒的品质(评分从0到10)。这里主要用python进行分析,主要内容分为:单变量,双变量,和多变量分析。 注意:我们在分析...
  • Apache Spark是目前最主流和常用的分布式开源处理系统,支持跨多个工作负载重用代码—批处理、交互式查询、实时分析、机器学习和图形处理等。本节ShowMeAI给大家讲解它的相关知识。
  • python数据分析 tmdb电影数据分析 源码及数据
  • 使用Spark SQL读取文件数据集来生成Data Frame对象,再利用Spark SQL函数对Data Frame对象进行数据分析,并将结果存入MySQL数据库,再以Web网页的形式对分析结果进行可视化,其中使用Spring Boot读取数据库把数据以...
  • Spark Day07:Spark SQL 主要讲解2个方面内容:Spark 调度内核和SparkSQL 快速体验。 1、Spark 内核调度 讲解Spark框架如何对1个Job作业进行调度执行,将1个Job如何拆分为Task任务,放到Executor上执行。 【以...
  • 如何用Spark进行数据分析

    千次阅读 2020-02-05 15:36:23
    如何用Spark进行数据分析,对大数据感兴趣的小伙伴就随着小编一起来了解一下吧。 如何用Spark进行数据分析 什么是Apache Spark? Apache Spark是一个为速度和通用目标设计的集群计算平台。 从速度的角度看,...
  • 第一步、数据的读取(输入) ...第二步、数据处理(分析)   调用集合RDD中函数(高阶函数)进行处理分析   RDD –>函数 ->另外一个RDD :transformation函数   val outpuRDD = inputRDD.#(#:...
  • Spark综合练习——电影评分数据分析

    千次阅读 多人点赞 2021-05-31 23:12:31
    今天给大家带来一个Spark综合练习案例–电影评分 老师给定需求:统计评分次数>200的电影平均分Top10,并写入Mysql数据库中 我:所有字我都认识,怎么连在一起我就不认识了 不管了先new个实例对象,总没错吧 val ...
  • 大数据之spark_spark案例分析

    千次阅读 2020-09-18 21:50:47
    在给定的订单数据,根据订单的分类ID进行聚合,然后按照订单分类名称,统计出某一天商品各个分类的成交金额 数据样例 {“cid”: 1, “money”: 600.0, “longitude”:116.397128,“latitude”:39.916527,“oid”:“o...
  • 大数据Spark电影评分数据分析

    千次阅读 2021-08-23 11:02:20
    使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:对电影评分数据进行统分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。...
  • 基于Python语言的Spark数据处理分析——2020年美国新冠肺炎疫情数据分析 目录基于Python语言的Spark数据处理分析——2020年美国新冠肺炎疫情数据分析一、实验环境二、数据集1.数据集下载来源2.转换文件格式3.上传...
  • 目录Spark本地运行的几个实例代码(Java实现)实例一:词频数统计问题描述过程分析代码运行结果实例二:统计平均年龄问题描述过程分析代码运行结果案例三:统计身高最值问题描述过程分析代码运行结果案例四:统计...
  • Spark RDD实现单词计数
  • 基于Idea的Spark数据分析--scala

    千次阅读 2021-06-17 16:45:51
    • 基于Eclipse或IDEA完成Spark数据分析Spark1.x或2.x版本均可 • 能够读取给定的数据文件 • 出租车GPS数据文件(taxi_gps.txt) • 北京区域中心坐标及半径数据文件(district.txt) • 能够输出以下统计信息 ...
  • ① ApacheSpark 创始公司,也是 Spark 的最大代码贡献者,Spark 技术生态背后的商业公司。 在2013年,由加州大学伯克利分校 AMPLab 的创始团队 ApacheSpark 的创建者所成立。 ② 核心产品和技术,主导和推进 Spark...
  • 数据处理常用到NumPy、SciPy和Pandas,数据分析常用到Pandas和Scikit-Learn,数据可视化常用到Matplotlib,而对大规模数据进行分布式挖掘时则可以使用Pyspark来调用Spark集群的资源。 从一定程度上来说,学习Python...
  • 利用pyspark对于武汉租房数据进行分析,可以爬取不同地区套用本代码代码如下: from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType import pandas as pd from pyspark.ml.stat ...
  • RDD(弹性分布式数据集合)是Spark的基本数据结构,Spark中的所有数据都是通过RDD的形式进行组织。本文讲解RDD的属性、创建方式、广播与累加器等重要知识点,并图解RDD高频算子。
  • 打开 shell 或命令行窗口,进入示例代码解压后的目录,敲入命令 sbt test,该命令会 下载所有的依赖项,包括 Scala 编译器及第三方库。下载完毕后,sbt 会编译代码并运行单元测试。该命令最后会输出 success 信息。...
  • Spark 高级数据分析(第2版)

    千次阅读 2018-11-06 11:55:07
    作为计算框架,Spark 速度快,开发简单,能同时兼顾批处理和实时数据分析,因此很快被广大企业级用户所采纳,并随着近年人工智能的崛起而成为分析和挖掘大数据的重要得力工具。 本书由业内知名数据科学家执笔,通过...
  • 基于Spark的音乐专辑数据分析(scala)

    千次阅读 多人点赞 2020-08-15 20:07:52
    本项目采用scala编写数据分析代码,若采用python编写数据分析代码,可参考 基于Spark的音乐专辑数据分析展示。 数据来源 数据集albums.csv,包含了10万条音乐专辑的数据。主要字段说明如下: album_title:音乐专辑...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 22,241
精华内容 8,896
关键字:

spark数据分析案例代码