spark实战_spark 实战 - CSDN
  • 实战概览一、实战内容二、大数据实时流处理分析系统简介1.需求2.背景及架构三、实战所用到的架构和涉及的知识1.后端架构2.前端框架四、项目实战1.后端开发实战1.构建项目2.引入依赖3.创建工程包结构4.编写代码5.编写...

    一、实战内容

    • 编写python脚本,源源不断产生学习网站的用户行为日志。
    • 启动 Flume 收集产生的日志。
    • 启动 Kfka 接收 Flume 接收的日志。
    • 使用 Spark Streaming 消费 Kafka 的用户日志。
    • Spark Streaming将数据清洗过滤非法数据,然后分析日志中用户的访问课程,统计课程数,得到最受欢迎课程TOP5,第二个业务是统计各个搜索引擎的搜索量。(通过分析日志还可以实现很多其他的业务,基于篇幅,本项目只实现其中两个)。
    • 将 Spark Streaming 处理的结果写入 HBase 中。
    • 前端使用 Spring MVC、 Spring、 MyBatis 整合作为数据展示平台。
    • 使用Ajax异步传输数据到jsp页面,并使用 Echarts 框架展示数据。
    • 本实战使用IDEA2018作为开发工具,JDK版本为1.8,Scala版本为2.11。

    二、大数据实时流处理分析系统简介

    • 1.需求

      如今大数据技术已经遍布生产的各个角落,其中又主要分为离线处理实时流处理。本实战项目则是使用了实时流处理,而大数据的实时流式处理的特点:
      1.数据会不断的产生,且数量巨大。
      2.需要对产生额数据实时进行处理。
      3.处理完的结果需要实时读写进数据库或用作其他分析。
      针对以上的特点,传统的数据处理结构已经无力胜任,因而产生的大数据实时流处理的架构思想。
    • 2.背景及架构

      数据的处理一般涉及数据的聚合,数据的处理和展现能够在秒级或者毫秒级得到响应。针对这些问题目前形成了Flume + kafka + Storm / Spark /Flink + Hbase / Redis 的技架构。本实战采用Flume + kafka + Spark + Hbase的架构。

    三、实战所用到的架构和涉及的知识

    • 1.后端架构

      1.Hadoop-2.6.0-cdh5.7.0
      2.Spark-2.2.0-bin-2.6.0-cdh5.7.0
      3.Apache-flume-1.9.0-bin
      4.Kafka_2.11-1.0.0
      5.Hbase-1.2.0-cdh5.7.0
    • 2.前端框架

      1.Spring MVC-4.3.3.RELEASE
      2.Spring-4.3.3.RELEASE
      3.MyBatis-3.2.1
      4.Echarts

    四、项目实战

    • 1.后端开发实战

      1.构建项目

      构建Scala工程项目,并添加Maven支持。

      2.引入依赖
        <properties>
            <scala.version>2.11.8</scala.version>
            <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
            <spark.version>2.2.0</spark.version>
            <hbase.version>1.2.0-cdh5.7.0</hbase.version>
        </properties>
      
        <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            </repository>
        </repositories>
      
       <dependencies>
            <dependency>
              <groupId>org.scala-lang</groupId>
              <artifactId>scala-library</artifactId>
              <version>${scala.version}</version>
            </dependency>
        
            <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-client</artifactId>
                  <version>${hadoop.version}</version>
            </dependency>
        
            <dependency>
                  <groupId>org.apache.hbase</groupId>
                  <artifactId>hbase-client</artifactId>
                  <version>${hbase.version}</version>
            </dependency>
        
            <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId> spark-streaming-kafka-0-8_2.11</artifactId>
              <version>${spark.version}</version>
            </dependency>
        
            <dependency>
              <groupId>com.fasterxml.jackson.module</groupId>
              <artifactId>jackson-module-scala_2.11</artifactId>
              <version>2.6.5</version>
            </dependency>
        
            <dependency>
              <groupId>net.jpountz.lz4</groupId>
              <artifactId>lz4</artifactId>
              <version>1.3.0</version>
            </dependency>
            
            <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-streaming_2.11</artifactId>
                  <version>${spark.version}</version>
            </dependency>
        
            <dependency>
              <groupId>org.apache.commons</groupId>
              <artifactId>commons-lang3</artifactId>
              <version>3.5</version>
            </dependency>
        
            <dependency>
                  <groupId>com.ggstar</groupId>
                  <artifactId>ipdatabase</artifactId>
                  <version>1.0-SNAPSHOT</version>
            </dependency>
       </dependencies>
      
      3.创建工程包结构

      在scala目录下创建如下包结构:
      创建工程包结构

      4.编写代码

      (1). 在util包下创健Java类HBaseUtils,用于连接HBase,存储处理的结果:
      util包下创健Java类HBaseUtils
      HBaseUtils完整代码如下:

       /**
       * Hbase工具类,用于:
       * 连接HBase,获得表实例
       */
      public class HBaseUtils {
      
          private Configuration configuration = null;
          private Connection connection = null;
          private static HBaseUtils instance = null;
      
          /**
           * 在私有构造方法中初始化属性
           */
          private HBaseUtils(){
              try {
                  configuration = new Configuration();
                  //指定要访问的zk服务器
                  configuration.set("hbase.zookeeper.quorum", "hadoop01:2181");
                  //得到Hbase连接
                  connection = ConnectionFactory.createConnection(configuration);
              }catch(Exception e){
                  e.printStackTrace();
              }
          }
      	
      	/**
      	* 获得HBase连接实例
      	*/
      	
         public static synchronized HBaseUtils getInstance(){
              if(instance == null){
                  instance = new HBaseUtils();
              }
              return instance;
          }
      
          /**
           *由表名得到一个表的实例
           * @param tableName
           * @return
           */
          public  HTable getTable(String tableName) {
              HTable hTable = null;
              try {
                  hTable = (HTable)connection.getTable(TableName.valueOf(tableName));
              }catch (Exception e){
                  e.printStackTrace();
              }
              return hTable;
          }
      }
      

      在util包下创Scala object类DateUtils,用于格式化日志的日期,代码如下:

      /**
      * 格式化日期工具类
      */
      object DateUtils {
      
      //指定输入的日期格式
      val YYYYMMDDHMMSS_FORMAT= FastDateFormat.getInstance("yyyy-MM-dd hh:mm:ss")
      //指定输出格式
      val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddhhmmss")
      
      //输入String返回该格式转为log的结果
      def getTime(time:String) = {
        YYYYMMDDHMMSS_FORMAT.parse(time).getTime
      }
      
      def parseToMinute(time:String) = {
        //调用getTime
        TARGET_FORMAT.format(getTime(time))
       }
      }
      

      (2). 在domain包下创建以下几个Scala样例类:
      ClickLog:用于封装清洗后的日志信息:
      在domain包下创建ClickLog
      然后将该类声明为样例类,在class关键字前增加case关键字:
      在这里插入图片描述
      ClickLog 类完整代码如下:

       /**
        * 封装清洗后的数据
        * @param ip 日志访问的ip地址
        * @param time 日志访问的时间
        * @param courseId 日志访问的实战课程编号
        * @param statusCode 日志访问的状态码
        * @param referer 日志访问的referer信息
        */
      case class ClickLog (ip:String,time:String,courseId:Int,statusCode:Int,referer:String)
      

      再创建样例类 CourseClickCount 用于封装课程统计的结果,样例类 CourseSearchClickCount 用于封装搜索引擎的统计结果,因为创建过程与上面的ClickLog 类一样,这里不再展示,直接给出完整代码:
      CourseClickCount 类完整代码如下:

         /**
          * 封装实战课程的总点击数结果
           * @param day_course 对应于Hbase中的RowKey
           * @param click_count 总点击数
           */
         case class CourseClickCount(day_course:String,click_count:Int)
      

      CourseSearchClickCount 类完整代码如下:

          /**
        * 封装统计通过搜索引擎多来的实战课程的点击量
        * @param day_serach_course 当天通过某搜索引擎过来的实战课程
        * @param click_count 点击数
        */
      case class CourseSearchClickCount(day_serach_course:String,click_count:Int)
      

      (3). 在dao包下创建以下Scala的object类:
      CourseClickCountDao :用于交互HBase,把课程点击数的统计结果写入HBase:
       在dao包下创建CourseClickCountDao
      CourseClickCountDao 类的完整代码如下:

      	/**
        * 实战课程点击数统计访问层
        */
      object CourseClickCountDao {
      
          val tableName = "ns1:courses_clickcount"  //表名
          val cf = "info"   //列族
          val qualifer = "click_count"   //列
      
        /**
          * 保存数据到Hbase
          * @param list (day_course:String,click_count:Int) //统计后当天每门课程的总点击数
          */
        def save(list:ListBuffer[CourseClickCount]): Unit = {
        	//调用HBaseUtils的方法,获得HBase表实例
          val table = HBaseUtils.getInstance().getTable(tableName)
          for(item <- list){
            //调用Hbase的一个自增加方法
            table.incrementColumnValue(Bytes.toBytes(item.day_course),
              Bytes.toBytes(cf),
              Bytes.toBytes(qualifer),
              item.click_count)  //赋值为Long,自动转换
          }
        }
      }
      

      CourseClickCountDao :用于交互HBase,把搜引擎搜索数量的统计结果写入HBase,创建过程与CourseClickCountDao 类一致故不再展示,完整代码如下:

      object CourseSearchClickCountDao {
        val tableName = "ns1:courses_search_clickcount"
        val cf = "info"
        val qualifer = "click_count"
      
        /**
          * 保存数据到Hbase
          * @param list (day_course:String,click_count:Int) //统计后当天每门课程的总点击数
          */
        def save(list:ListBuffer[CourseSearchClickCount]): Unit = {
          val table = HBaseUtils.getInstance().getTable(tableName)
          for(item <- list){
            table.incrementColumnValue(Bytes.toBytes(item.day_serach_course),
              Bytes.toBytes(cf),
              Bytes.toBytes(qualifer),
              item.click_count)  //赋值为Long,自动转换
          }
        }
      }
      

      注意: 代码中的HBase表需要提前创建好,详情请看本节的 8.在HBase中创建项目需要的表
      (4). 在application包下创建Scala的object类 CountByStreaming,用于处理数据,是本项目的程序入口,最为核心的类:
      在这里插入图片描述
      CountByStreaming 类的完整代码如下:

      object CountByStreaming {
      
        def main(args: Array[String]): Unit = {
      
          /**
            * 最终该程序将打包在集群上运行,
            * 需要接收几个参数:zookeeper服务器的ip,kafka消费组,
            * 主题,以及线程数
            */
          if(args.length != 4){
              System.err.println("Error:you need to input:<zookeeper> <group> <toplics> <threadNum>")
              System.exit(1)
            }
      
            //接收main函数的参数,外面的传参
            val Array(zkAdderss,group,toplics,threadNum) = args
      
          /**
            * 创建Spark上下文,下本地运行需要设置AppName
            * Master等属性,打包上集群前需要删除
            */
          val sparkConf = new SparkConf()
              .setAppName("CountByStreaming")
              .setMaster("local[4]")
            
            //创建Spark离散流,每隔60秒接收数据
            val ssc = new StreamingContext(sparkConf,Seconds(60))
            //使用kafka作为数据源
            val topicsMap = toplics.split(",").map((_,threadNum.toInt)).toMap
            //创建kafka离散流,每隔60秒消费一次kafka集群的数据
            val kafkaInputDS = KafkaUtils.createStream(ssc,zkAdderss,group,topicsMap)
      
             //得到原始的日志数据
            val logResourcesDS = kafkaInputDS.map(_._2)
          /**
            * (1)清洗数据,把它封装到ClickLog中
            * (2)过滤掉非法的数据
            */
            val cleanDataRDD = logResourcesDS.map(line => {
              val splits = line.split("\t")
              if(splits.length != 5) {      //不合法的数据直接封装默认赋予错误值,filter会将其过滤
                ClickLog("", "", 0, 0, "")
              }
              else {
              val ip = splits(0)   //获得日志中用户的ip
              val time = DateUtils.parseToMinute(splits(1)) //获得日志中用户的访问时间,并调用DateUtils格式化时间
              val status = splits(3).toInt  //获得访问状态码
              val referer = splits(4)
              val url = splits(2).split(" ")(1)  //获得搜索url
              var courseId = 0
              if(url.startsWith("/class")){
                val courseIdHtml = url.split("/")(2)
                courseId = courseIdHtml.substring(0,courseIdHtml.lastIndexOf(".")).toInt
              }
              ClickLog(ip,time,courseId,status,referer)  //将清洗后的日志封装到ClickLog中
              }
            }).filter(x => x.courseId != 0 )   //过滤掉非实战课程
      
          /**
            * (1)统计数据
            * (2)把计算结果写进HBase
            */
            cleanDataRDD .map(line => {
              //这里相当于定义HBase表"ns1:courses_clickcount"的RowKey,
              // 将‘日期_课程’作为RowKey,意义为某天某门课的访问数
              (line.time.substring(0,8) + "_" + line.courseId,1)   //映射为元组
            }).reduceByKey(_ + _)   //聚合
              .foreachRDD(rdd =>{    //一个DStream里有多个RDD
              rdd.foreachPartition(partition => {   //一个RDD里有多个Partition
                val list = new ListBuffer[CourseClickCount]
                partition.foreach(item => {   //一个Partition里有多条记录
                  list.append(CourseClickCount(item._1,item._2))
                })
                CourseClickCountDao.save(list)   //保存至HBase
              })
            })
      
          /**
            * 统计至今为止通过各个搜索引擎而来的实战课程的总点击数
            * (1)统计数据
            * (2)把统计结果写进HBase中去
            */
          cleanDataRDD.map(line => {
            val referer = line.referer
            val time = line.time.substring(0,8)
            var url = ""
            if(referer == "-"){     //过滤非法url
              (url,time)
            }else {
              //取出搜索引擎的名字
              url = referer.replaceAll("//","/").split("/")(1)
              (url,time)
            }
          }).filter(x => x._1 != "").map(line => {
            //这里相当于定义HBase表"ns1:courses_search_clickcount"的RowKey,
            // 将'日期_搜索引擎名'作为RowKey,意义为某天通过某搜搜引擎访问课程的次数
            (line._2 + "_" + line._1,1)   //映射为元组
          }).reduceByKey(_ + _)   //聚合
            .foreachRDD(rdd => {
              rdd.foreachPartition(partition => {
                val list = new ListBuffer[CourseSearchClickCount]
                partition.foreach(item => {
                  list.append(CourseSearchClickCount(item._1,item._2))
                })
                CourseSearchClickCountDao.save(list)
              })
            })
      
          ssc.start()
          ssc.awaitTermination()
        }
      }
      
      5.编写python脚本产生数据

      编写一个python脚本,命名为 generate_log.py,用于产生用户日志:

       import random
       import time
       
       //创建url访问数组class/112,数字代表的是实战课程id
       url_paths = [
       	"class/112.html",
       	"class/128.html",
       	"class/145.html",
       	"class/146.html",
       	"class/500.html",
       	"class/250.html",
       	"class/131.html",
       	"class/130.html",
       	"class/271.html",
       	"class/127.html",
       	"learn/821",
       	"learn/823",
       	"learn/987",
       	"learn/500",
       	"course/list"
       ]
       
       //创建ip数组,随机选择4个数字作为ip如132.168.30.87
       ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168,192,134,111,54,64,110,43]
       
       //搜索引擎访问数组{query}代表搜索关键字
       http_referers = [
       	"http://www.baidu.com/s?wd={query}",
       	"https://www.sogou.com/web?query={query}",
       	"http://cn.bing.com/search?q={query}",
       	"https://search.yahoo.com/search?p={query}",
       ]
       
       //搜索关键字数组
       search_keyword = [
       	"Spark SQL实战",
       	"Hadoop基础",
       	"Storm实战",
       	"Spark Streaming实战",
       	"10小时入门大数据",
       	"SpringBoot实战",
       	"Linux进阶",
       	"Vue.js"
       ]
       
       //状态码数组
       status_codes = ["200","404","500","403"]
       
       //随机选择一个url
       def sample_url():
       	return random.sample(url_paths, 1)[0]
       	
       //随机组合一个ip
       def sample_ip():
       	slice = random.sample(ip_slices , 4)
       	return ".".join([str(item) for item in slice])
       
       //随机产生一个搜索url
       def sample_referer():
       	//一半的概率会产生非法url,用于模拟非法的用户日志
       	if random.uniform(0, 1) > 0.5:
       		return "-"
       
       	refer_str = random.sample(http_referers, 1)
       	query_str = random.sample(search_keyword, 1)
       	return refer_str[0].format(query=query_str[0])
       
       //随机产生一个数组
       def sample_status_code():
       	return random.sample(status_codes, 1)[0]
       
       //组合以上的内容,产生一条简单的用户访问日志
       def generate_log(count = 10):
       
       	//获取本机时间并将其作为访问时间写进访问日志中
       	time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
       	
       	//存储日志的目标文件(换成你自己的)
       	f = open("/home/hadoop/data/click.log","w+")
       	
       	//组合用户日志
       	while count >= 1:
       		query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(url=sample_url(), ip=sample_ip(), referer=sample_referer(), status_code=sample_status_code(),local_time=time_str)
       
       		f.write(query_log + "\n")
       
       		count = count - 1 
       
       //执行main,每次产生100条用户日志
       if __name__ == '__main__':
       	generate_log(100)
      
      6.创建日志存放目录并编写Flume的配置文件

      (1). 创建日志存储的目录用于存放日志,位置自定义,但是一定要与python脚本及即将要编写的Flume配置文件中监控的目录一致,否则将无法收集。在home目录下创建data文件夹,并创建click.log文件,即最后日志存放在~/data/click.log中。
      (2). 在Flume的配置文件目录下新建一个文件,即在$FLUME_HOME/conf下新建一个文件 streaming_project.conf
      streaming_project.conf的完整配置如下:

        exec-memory-kafka.sources = exec-source  #exec的源,用于监控某个文件是否有     数据追加
       exec-memory-kafka.sinks = kafka-sink
       exec-memory-kafka.channels = memory-channel
       
       exec-memory-kafka.sources.exec-source.type = exec
       exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/data/click.log  #被监控的文件,目录必须正确
       exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
       
       exec-memory-kafka.channels.memory-channel.type = memory 
       
       exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink 
       exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop01:9092   #Kafka集群的某个活动节点的ip
       exec-memory-kafka.sinks.kafka-sink.topic = streamtopic  #Kafka的主题
       exec-memory-kafka.sinks.kafka-sink.batchSize = 10
       exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
       
       exec-memory-kafka.sources.exec-source.channels = memory-channel   #关联三大组件
       xec-memory-kafka.sinks.kafka-sink.channel = memory-channel
      
      7.创建Kafka主题

      在Linux终端执行以下命令:
      $KAFKA_HOME/bin/./kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 3 --partitions 3 --topic streamtopic
      主题的名字自定义,副本的个数小于等于Kafka集群中存活节点的个数,zookeeper写任意一台存活的zookeeper的主机ip。

      8.在HBase中创建项目需要的表

      在Linux终端执行以下命令:
      (1). 进入Hbase shell:$HBASE_HOME/bin/./hbase shell
      在Hbase shell中执行以下命令:
      (2). 创建名字空间 ns1:create_namespace 'ns1'
      (3). 创建课程访问统计表,列族为info:create 'ns1:courses_clickcoun', 'info'
      (4). 创建搜索引擎统计表,列族为info:create 'ns1:courses_search_clickcount', 'info'

      9.测试后端代码

      (1). 启动Flume,监控存储日志的文件。
      在Linux终端下执行以下命令:
      $FLUME_HOME/bin/./flume-ng agent -n exec-memory-kafka -f FLUME_HOME/conf/streaming_project.conf
      (2). 调用Linux的crontab命令,周期性调用python脚本,源源不断产生数据,有关于crontab命令的使用可自行查找相关资料,这里不详细展,在LInux终端下执行命令:crontab -e,进入任务设置,键盘输入 i ,进入insert模式,编写脚本如下:
      在这里插入图片描述
      编写完后在键盘下敲esc健退出insert模式,输入 :wq保存退出。
      注意: 如果要停止产生日志,可以执行命令:crontab -e,进入任务设置,在那一行脚本前输入 #,代表注释那一行脚本。
      (3). 启动一个kafak控制台消费者,检测Flume是否成功收集到日志:
      在Linux终端输入命令:$KAFKA_HOME/bin/./kafka-console-consumer.sh --topic streamtopic --zookeeper hadoop01:2181,如果成功则终端控制台会有如下显示:
      在这里插入图片描述
      (4). 运行本项目的核心类 CountByStreaming ,因为我们设置main方法要接收许多参数,因此在运行前需要配置一下该类:
      在这里插入图片描述
      在这里插入图片描述
      配置完成后运行程序即可。
      注意: 有的小伙伴的电脑可能运行内存不足报如下异常:
      在这里插入图片描述
      只需要在代码中增加如下配置就可以解决:
      在这里插入图片描述
      (5). 进入hbase shell,查看结果:
      在hbase shell下输入以下命令:
      scan 'ns1:courses_clickcount' :扫描课程点击统计表,得到如下结果:
      在这里插入图片描述
      scan 'ns1:courses_search_clickcount' :扫描搜索引擎搜索统计表,得到如下结果:
      在这里插入图片描述
      至此,后端的代码及测试已经完成。

    • 2.前端开发实战

      1.构建工程

      构建Java Web工程项目,并添加Maven支持。

      2.引入依赖
      <properties>
         <hbase.version>1.2.0-cdh5.7.0</hbase.version>
         <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
         <spring.version>4.3.3.RELEASE</spring.version>
         <spring_mvc.vserion>4.3.3.RELEASE</spring_mvc.vserion>
         <mybatis.version>3.2.1</mybatis.version>
         <mysql.version>5.1.17</mysql.version>
      </properties>
      
      <repositories>
         <repository>
             <id>cloudera</id>
             <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
         </repository>
      </repositories>
      
      <dependencies>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <version>${mysql.version}</version>
         </dependency>
      
         <dependency>
             <groupId>c3p0</groupId>
             <artifactId>c3p0</artifactId>
             <version>0.9.1.2</version>
         </dependency>
      
         <dependency>
             <groupId>org.mybatis</groupId>
             <artifactId>mybatis</artifactId>
             <version>${mybatis.version}</version>
         </dependency>
      
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.11</version>
         </dependency>
      
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-context-support</artifactId>
             <version>${spring.version}</version>
         </dependency>
         
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-jdbc</artifactId>
             <version>${spring.version}</version>
         </dependency>
      
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-webmvc</artifactId>
             <version>${spring_mvc.vserion}</version>
         </dependency>
      
         <dependency>
             <groupId>org.mybatis</groupId>
             <artifactId>mybatis-spring</artifactId>
             <version>1.3.0</version>
         </dependency>
      
         <dependency>
             <groupId>org.aspectj</groupId>
             <artifactId>aspectjweaver</artifactId>
             <version>1.8.10</version>
         </dependency>
      
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>jstl</artifactId>
             <version>1.2</version>
         </dependency>
      
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>${hbase.version}</version>
         </dependency>
      
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
      
         <dependency>
             <groupId>net.sf.json-lib</groupId>
             <artifactId>json-lib</artifactId>
             <version>2.4</version>
             <classifier>jdk15</classifier>
         </dependency>
      
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
             <version>2.9.3</version>
         </dependency>
      
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <version>2.9.3</version>
         </dependency>
      
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
             <version>2.9.3</version>
         </dependency>
      </dependencies>
      
      3.创建MySQL表

      创建course用于根据课程id(HBase存储的数据)查询课程名:
      create table course(id int,name varchar(30))
      插入数据后得到如下表格:
      在这里插入图片描述
      创建course用于根据搜索引擎id(HBase存储的数据)查询搜索引擎名:
      create table search_engine(id varchar(10),name varchar(30))
      插入数据后得到如下表格:
      在这里插入图片描述

      4.配置SSM框架

      (1). 在resources目录下创建如下xml文件:
      在这里插入图片描述
      (2). 在Web/WEB-INF 下创建dispatcher-servlet.xml
      在这里插入图片描述
      (3). 以上xml文件的完整内容:
      beans.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
            xmlns:aop="http://www.springframework.org/schema/aop"
            xsi:schemaLocation="http://www.springframework.org/schema/beans
         							 http://www.springframework.org/schema/beans/spring-beans.xsd
         							   http://www.springframework.org/schema/context
         							   http://www.springframework.org/schema/context/spring-context-4.3.xsd
         							   http://www.springframework.org/schema/tx
         							   http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
         							   http://www.springframework.org/schema/aop
         							   http://www.springframework.org/schema/aop/spring-aop-4.3.xsd"
            default-autowire="byType">
         <!-- 配置数据源 换成你的数据库url-->
         <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
             <property name="driverClass" value="com.mysql.jdbc.Driver"/>
             <property name="jdbcUrl" value="jdbc:mysql://hadoop01:3306/jdbc?user=root&amp;password=root"/>
             <property name="user" value="root"/>
             <property name="password" value="root"/>
             <property name="maxPoolSize" value="20"/>
             <property name="minPoolSize" value="2"/>
             <property name="initialPoolSize" value="3"/>
             <property name="acquireIncrement" value="2"/>
         </bean>
      
         <!-- 配置SessionFactory -->
         <bean id="SessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
             <property name="dataSource" ref="dataSource"/>
             <property name="configLocation" value="classpath:mybatis-config.xml"/>
         </bean>
         <!-- 包扫描 -->
         <context:component-scan base-package="com.ssm.dao,com.ssm.service,com.ssm.dao" />
      
         <!-- 事务管理器 -->
         <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
             <property name="dataSource" ref="dataSource"/>
         </bean>
      
         <!-- 配置事务 -->
         <tx:advice id="txAdvice" transaction-manager="txManager">
             <tx:attributes>
                 <tx:method name="*" propagation="REQUIRED" isolation="DEFAULT"/>
             </tx:attributes>
         </tx:advice>
      
         <!-- 配置事务切面 -->
         <aop:config>
             <aop:advisor advice-ref="txAdvice" pointcut="execution(* *..*Service.*(..))" />
         </aop:config>
      

      mybatis-config.xml

      <?xml version="1.0" encoding="UTF-8" ?>
      <!DOCTYPE configuration
              PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
              "http://mybatis.org/dtd/mybatis-3-config.dtd">
      <configuration>
      
          <!-- 创建别名关系 -->
          <typeAliases>
              <typeAlias type="com.ssm.domain.Course" alias="_Course"/>
              <typeAlias type="com.ssm.domain.SearchEngine" alias="_SearchEngine"/>
          </typeAliases>
      
          <!--   创建映射关系   -->
          <mappers>
              <mapper resource="CourseMapper.xml"/>
              <mapper resource="SearchEngineMapper.xml"/>
          </mappers>
      </configuration>
      

      CourseMapper.xml

      <?xml version="1.0" encoding="UTF-8" ?>
      <!DOCTYPE mapper
              PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
              "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
      <!-- 定义名字空间 -->
      <mapper namespace="course">
          <select id="findCourseName" parameterType="int" resultType="_Course">
              select id,`name`
              from course
              where id = #{id}
          </select>
      </mapper>
      

      SearchEngineMapper.xml

      <?xml version="1.0" encoding="UTF-8" ?>
      <!DOCTYPE mapper
              PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
              "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
      <!-- 定义名字空间 -->
      <mapper namespace="searchEngine">
          <select id="findEngineName" parameterType="String" resultType="_SearchEngine">
              select `name`
              from search_engine
              where id = #{name}
          </select>
      </mapper>
      

      dispatcher-servlet.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:mvc="http://www.springframework.org/schema/mvc"
             xmlns:context="http://www.springframework.org/schema/context"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
      								http://www.springframework.org/schema/beans/spring-beans.xsd
      								http://www.springframework.org/schema/mvc
      								http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
      								http://www.springframework.org/schema/context
      								http://www.springframework.org/schema/context/spring-context-4.3.xsd">
          <!-- 配置包扫描路径 -->
          <context:component-scan base-package="com.ssm.controller"/>
          <!-- 配置使用注解驱动 -->
          <mvc:annotation-driven />
          <mvc:default-servlet-handler/>
          <!-- 配置视图解析器 -->
          <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
              <property name="prefix" value="/"/>
              <property name="suffix" value=".jsp"/>
          </bean>
      </beans>
      

      (4). 配置Web/WEB-INF/web.xml:

      <?xml version="1.0" encoding="UTF-8"?>
      <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
               version="4.0">
      
          <!-- 指定spring的配置文件beans.xml -->
          <context-param>
              <param-name>contextConfigLocation</param-name>
              <param-value>classpath:beans.xml</param-value>
          </context-param>
      
          <!-- 确保web服务器启动时,完成spring的容器初始化 -->
          <listener>
              <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
          </listener>
      
          <!-- 配置分发器 -->
          <servlet>
              <servlet-name>dispatcher</servlet-name>
              <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
          </servlet>
      
          <servlet-mapping>
              <servlet-name>dispatcher</servlet-name>
              <url-pattern>/</url-pattern>
          </servlet-mapping>
      </web-app>
      

      至此,SSM整合配置已经完成。

      5.创建工程包结构

      在java目录下创建以下包结构:
      在这里插入图片描述

      6.编写代码

      (1). 在util包下新健Java类 HBaseUtils用于访问HBase,完整代码如下:

          public class HBaseUtils {
        
            private Configuration configuration = null;
            private Connection connection = null;
            private static HBaseUtils hBaseUtil = null;
        
            private HBaseUtils(){
                try {
                    configuration = new Configuration();
                    //zookeeper服务器的地址
                    configuration.set("hbase.zookeeper.quorum","hadoop01:2181");
                    connection =  ConnectionFactory.createConnection(configuration);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        
            /**
             * 获得HBaseUtil实例
             * @return
             */
            public static synchronized HBaseUtils getInstance(){
                if(hBaseUtil == null){
                    hBaseUtil = new HBaseUtils();
                }
                return hBaseUtil;
            }
        
            /**
             * 根据表名获得表对象
             */
            public HTable getTable(String tableName){
                try {
                    HTable table = null;
                    table = (HTable)connection.getTable(TableName.valueOf(tableName));
                    return table;
                }catch (Exception e){
                    e.printStackTrace();
                }
                return null;
            }
        
            /**
             * 根据日期查询统计结果
             */
            public Map<String,Long> getClickCount(String tableName,String date){
                Map<String,Long> map = new HashMap<String, Long>();
                try {
                    //得到表实例
                    HTable table = getInstance().getTable(tableName);
                    //列族
                    String cf = "info";
                    //列
                    String qualifier = "click_count";
                    //定义扫描器前缀过滤器,只扫描给定日期的row
                    Filter filter =  new PrefixFilter(Bytes.toBytes(date));
                    //定义扫描器
                    Scan scan = new Scan();
                    scan.setFilter(filter);
                    ResultScanner results =  table.getScanner(scan);
                    for(Result result:results){
                        //取出rowKey
                        String rowKey = Bytes.toString(result.getRow());
                        //取出点击次数
                        Long clickCount = Bytes.toLong(result.getValue(cf.getBytes(),qualifier.getBytes()));
                        map.put(rowKey,clickCount);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                    return null;
                }
                return map;
            }
         }
      

      (2). 在domain包下新建以下JavaBean用于封装信息:
      Course

         /**
        *课程类,实现Comparable接口用于排序
        */
       public class Course implements Comparable<Course>{
       
           private int id;   //课程编号
           private String name;    //课程名
           private Long count;     //点击次数
       
           public Long getCount() {
               return count;
           }
       
           public void setCount(Long count) {
               this.count = count;
           }
       
           public int getId() {
               return id;
           }
       
           public void setId(int id) {
               this.id = id;
           }
       
           public String getName() {
               return name;
           }
       
           public void setName(String name) {
               this.name = name;
           }
       
           /**
            * 降序排序
            */
           public int compareTo(Course course) {
               return course.count.intValue() - this.count.intValue();
           }
       }
      

      SearchEngine

        /**
       * 搜索引擎类
       */
      public class SearchEngine {
      
          private String name;
          private Long count;
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public Long getCount() {
              return count;
          }
      
          public void setCount(Long count) {
              this.count = count;
          }
      }
      

      (3). 在dao包下新建以下接口:
      CourseClickCountDao:

          /**
       * 课程统计结果Dao
       */
      public interface CourseClickCountDao {
      
          public Set<Course> findCourseClickCount(String tableName, String date);
      }
      

      SearchEngineCountDao

      /**
      * 索索引擎统计Dao
      */
      public interface SearchEngineCountDao {
      
         public List<SearchEngine> findSearchEngineCount(String tableName, String date);
      }
      

      (4). 在dao包下的impl包新建以下Java类实现dao下的接口:
      CourseClickCountDaoImpl:

      /**
      * 课程点击统计Dao实现类
      */
      @Repository("CourseClickCountDao")
      public class CourseClickCountDaoImpl extends SqlSessionDaoSupport implements CourseClickCountDao {
      
         public Set<Course> findCourseClickCount(String tableName, String date) {
             Map<String,Long> map = new HashMap<String, Long>();
             Course course = null;
             Set<Course> set = new TreeSet<Course>();
             map = HBaseUtils.getInstance().getClickCount(tableName,date);
             for (Map.Entry<String, Long> entry : map.entrySet()) {
                 //Rowkey的结构:20190330_112,112为课程的id
               int id = Integer.parseInt(entry.getKey().substring(9));
                 //查询课程名并封装进bean
               course = getSqlSession().selectOne("course.findCourseName",id);
                 //将课程名封装进bean
               course.setCount(entry.getValue());
               set.add(course);
             }
             return set;
         }
      }
      

      CourseClickCountDaoImpl

      /**
      * 搜索引擎统计Dao实现类
      */
      @Repository("SearchEngineCountDao")
      public class SearchEngineCountDaoImpl extends SqlSessionDaoSupport implements SearchEngineCountDao {
      
         public List<SearchEngine> findSearchEngineCount(String tableName, String date) {
             Map<String,Long> map = new HashMap<String, Long>();
             List<SearchEngine> list = new ArrayList<SearchEngine>();
             SearchEngine searchEngine = null;
             //调用HBaseUtil查询结果
             map = HBaseUtils.getInstance().getClickCount(tableName,date);
             System.out.println(map);
             for ( Map.Entry<String, Long> entry : map.entrySet()){
                 //取出搜索引擎的名字
                 String name = entry.getKey().split("\\.")[1];
                 //查询引擎名字,封装进bean
                 searchEngine = getSqlSession().selectOne("searchEngine.findEngineName",name);
                 //把搜索引擎的点击次数封装进bean
                 searchEngine.setCount(entry.getValue());
                 list.add(searchEngine);
             }
             return list;
         }
      }
      

      (5). 在service包新建如下接口,用于为controller层提供服务:
      CourseClickService

      /**
      * 课程点击统计Service类
      */
      public interface CourseClickService {
      
         public List<Course> findCourseClickCount(String tableName, String date);
      }
      

      SearchEngineService

      /**
      * 搜索引擎点击Service实现类
      */
      public interface SearchEngineService {
      
         public List<SearchEngine> findSearchEngineCount(String tableName, String date);
      }
      

      (6). 在service包下的impl包新建如Java类,实现service包下的接口:
      CourseClickServiceImpl

      /**
       * 课程点击统计Service实现类
       */
      @Service("CourseClickService")
      public class CourseClickServiceImpl implements CourseClickService {
      
          //注入CourseClickCountDaoImpl类
          @Resource(name = "CourseClickCountDao")
          private CourseClickCountDao  courseClickCountDao;
      
          /**
           * 将点击率TOP5的课程封装进list
           */
          public List<Course> findCourseClickCount(String tableName, String date) {
              List<Course> list = new ArrayList<Course>();
              //Set集合里的bean根据点击数进行的降序排序
              Set<Course> set = courseClickCountDao.findCourseClickCount(tableName,date);
              Iterator<Course> iterator = set.iterator();
              int i = 0;
              //将TOP5课程封装进list
              while (iterator.hasNext() && i++ < 5){
                  list.add(iterator.next());
              }
              return list;
          }
      }
      

      SearchEngineServiceImpl

      /**
      * 搜索引擎点击统计Service实现类
      */
         @Service("SearchEngineService")
         public class SearchEngineServiceImpl implements SearchEngineService {
         
             //注入SearchEngineCountDao
             @Resource(name = "SearchEngineCountDao")
             private SearchEngineCountDao  searchEngineDao;
         
             public List<SearchEngine> findSearchEngineCount(String tableName, String date) {
                 return searchEngineDao.findSearchEngineCount(tableName,date);
             }
         }
      

      (6). 在controller包中新建以下Java类,用于响应Web的请求:
      CourseClickController

      /**
      * 课程点击Controller
      */
      @Controller
      public class CourseClickController {
      
         //注入CourseClickService
         @Resource(name = "CourseClickService")
         private CourseClickService service;
      
         //页面跳转
         @RequestMapping("/courseclick")
         public String toGetCourceClick(){
             return "courseclick";
         }
      
         /**
          *  sponseBody注解的作用是将controller的方法返回的对象通过适当的转换器转
          *  换为指定的格式之后,写入到response对象的body区,通常用来返回JSON数据或者是XML
          */
         @ResponseBody
         @RequestMapping(value = "/getCourseClickCount",method = RequestMethod.GET)
         public JSONArray courseClickCount(String date){
             //如果url没有传值,传入一个默认值
             if(date == null || date.equals("")){
                 date = "20190330";
             }
             List<Course> list = null;
             list = service.findCourseClickCount("ns1:courses_clickcount",date);
             //封装JSON数据
             return JSONArray.fromObject(list);
         }
      }
      

      SearchEngineController

       /**
       * 搜索引擎点击Controller
       */
      @Controller
      public class SearchEngineController {
      
          //注入SearchEngineService
          @Resource(name = "SearchEngineService")
          private SearchEngineService searchEngineService;
      
          //页面跳转
          @RequestMapping("/searchclick")
          public String toGetCourceClick(){
              return "searchclick";
          }
      
          /**
           *  sponseBody注解的作用是将controller的方法返回的对象通过适当的转换器转
           *  换为指定的格式之后,写入到response对象的body区,通常用来返回JSON数据或者是XML
           */
          @ResponseBody
          @RequestMapping(value = "/getSearchClickCount",method = RequestMethod.GET)
          public JSONArray searchClickCount(String date){
              //如果url没有传值,传入一个默认值
              if(date == null || date.equals("")){
                  date = "20190330";
              }
              List<SearchEngine> list = null;
              list = searchEngineService.findSearchEngineCount("ns1:courses_search_clickcount",date);
              //封装JSON数据
              return JSONArray.fromObject(list);
          }
      }
      

      (6). 在Web目录下新建js目录,引入echarts和jquery(自行去官网下载),在Web目录下新建两个jsp文件courseclick.jspsearchclick.jsp用于展示数据,得到如下目录:
      在这里插入图片描述
      courseclick.jsp

       <%@ page contentType="text/html;charset=UTF-8" language="java" %>
       <head>
           <meta charset="UTF-8"/>
           <!-- 设置每隔60秒刷新一次页面-->
           <meta http-equiv="refresh" content="60">
           <title>学习网实战课程实时访问统计</title>
           <script src="js/echarts.min.js"></script>
           <script src="js/jquery-1.11.3.min.js"></script>
           <style type="text/css">
               div{
                   display: inline;
               }
           </style>
       </head>
       <body>
       <div id="main" style="width: 600px;height:400px;float: left;margin-top:50px"></div>
       <div id="main2" style="width: 700px;height:400px;float: right;margin-top:50px"></div>
       <script type="text/javascript">
           var scources = [];
           var scources2 = [];
           var scources3 = [];
           var scources4 = [];
            //获得url上参数date的值
           function GetQueryString(name)
           {
               var reg = new RegExp("(^|&)"+ name +"=([^&]*)(&|$)");
               var r = window.location.search.substr(1).match(reg);//search,查询?后面的参数,并匹配正则
               if(r!=null)return  unescape(r[2]); return null;
           }
           var date = GetQueryString("date");
           $.ajax({
               type:"GET",
               url:"/getCourseClickCount?date="+date,
               dataType:"json",
               async:false,
               error: function (data) {
                   alert("失败啦");
               },
               success:function (result) {
                   if(scources.length != 0){
                       scources.clean();
                       scources2.clean();
                       scources3.clean();
                       scources4.clean();
                   }
                   for(var i = 0; i < result.length; i++){
                       scources.push(result[i].name);
                       scources2.push(result[i].count);
                       scources3.push({"value":result[i].count,"name":result[i].name});
       
                   }
                   for(var i = 0; i < 3; i++){
                       scources4.push({"value":result[i].count,"name":result[i].name});
                   }
               }
           })
       
           // 基于准备好的dom,初始化echarts实例
           var myChart = echarts.init(document.getElementById('main'));
       
           // 指定图表的配置项和数据
           var option = {
                   title: {
                       text: '学习网实时实战课程访问量',
                       subtext: '课程点击数',
                       x:'center'
                   },
               tooltip: {},
              /* legend: {
                   data:['点击数']
               },*/
               xAxis: {
                   data: scources
               },
               yAxis: {},
               series: [{
                   name: '点击数',
                   type: 'bar',
                   data: scources2
               }]
           };
       <!--------------------------------------------------------------------------- -->
           // 使用刚指定的配置项和数据显示图表。
           myChart.setOption(option);
       
           var myChart = echarts.init(document.getElementById('main2'));
       
           // 指定图表的配置项和数据
           var option = {
               title: {
                   text: '学习网实时实战课程访问量',
                   subtext: '课程点击数',
                   x:'center'
               },
               tooltip: {
                   trigger: 'item',
                   formatter: "{a} <br/>{b}: {c} ({d}%)"
               },
               legend: {
                   orient: 'vertical',
                   x: 'left'/*,
                   data:scources*/
               },
               series: [
                   {
                       name:'课程点击数',
                       type:'pie',
                       selectedMode: 'single',
                       radius: [0, '30%'],
       
                       label: {
                           normal: {
                               position: 'inner'
                           }
                       },
                       labelLine: {
                           normal: {
                               show: false
                           }
                       },
                       data:scources4
                   },
                   {
                       name:'课程点击数',
                       type:'pie',
                       radius: ['40%', '55%'],
                       label: {
                           normal: {
                               formatter: '{a|{a}}{abg|}\n{hr|}\n  {b|{b}:}{c}  {per|{d}%}  ',
                               backgroundColor: '#eee',
                               borderColor: '#aaa',
                               borderWidth: 1,
                               borderRadius: 4,
       
                               rich: {
                                   a: {
                                       color: '#999',
                                       lineHeight: 22,
                                       align: 'center'
                                   },
                                   hr: {
                                       borderColor: '#aaa',
                                       width: '100%',
                                       borderWidth: 0.5,
                                       height: 0
                                   },
                                   b: {
                                       fontSize: 16,
                                       lineHeight: 33
                                   },
                                   per: {
                                       color: '#eee',
                                       backgroundColor: '#334455',
                                       padding: [2, 4],
                                       borderRadius: 2
                                   }
                               }
                           }
                       },
                       data:scources3
                   }
               ]
           };
       
           // 使用刚指定的配置项和数据显示图表。
           myChart.setOption(option);
       
         </script>
        </body>
       </html>
      

      courseclick.jsp

      <%@ page contentType="text/html;charset=UTF-8" language="java" %>
      <!DOCTYPE html>
      <html lang="en">
      <head>
          <meta charset="UTF-8"/>
          <!-- 设置每隔60秒刷新一次页面-->
          <meta http-equiv="refresh" content="60">
          <title>学习网课程搜索引擎访问统计</title>
          <script src="js/echarts.min.js"></script>
          <script src="js/jquery-1.11.3.min.js"></script>
          <style type="text/css">
              div{
                  display: inline;
              }
          </style>
      </head>
      <body>
      <div id="main" style="width: 600px;height:400px;float: left;margin-top:50px"></div>
      <div id="main2" style="width: 700px;height:400px;float: right;margin-top:50px"></div>
      <script type="text/javascript">
          var scources = [];
          var scources2 = [];
          var scources3 = [];
          //获得url上参数date的值
          function GetQueryString(name)
          {
              var reg = new RegExp("(^|&)"+ name +"=([^&]*)(&|$)");
              var r = window.location.search.substr(1).match(reg);//search,查询?后面的参数,并匹配正则
              if(r!=null)return  unescape(r[2]); return null;
          }
          var date = GetQueryString("date");
          $.ajax({
              type:"GET",
              url:"/getSearchClickCount?date="+date,
              dataType:"json",
              async:false,
              success:function (result) {
                  if(scources.length != 0){
                      scources.clean();
                      scources2.clean();
                      scources3.clean();
                  }
                  for(var i = 0; i < result.length; i++){
                      scources.push(result[i].name);
                      scources2.push(result[i].count);
                      scources3.push({"value":result[i].count,"name":result[i].name});
                  }
              }
          })
      
          // 基于准备好的dom,初始化echarts实例
          var myChart = echarts.init(document.getElementById('main'));
      
          // 指定图表的配置项和数据
          var option = {
              title: {
                  text: '学习网实时课程搜索引擎访问量',
                  subtext: '搜索引擎搜索数',
                  x:'center'
              },
              color: ['#3398DB'],
              tooltip : {
                  trigger: 'axis',
                  axisPointer : {            // 坐标轴指示器,坐标轴触发有效
                      type : 'shadow'        // 默认为直线,可选为:'line' | 'shadow'
                  }
              },
              grid: {
                  left: '3%',
                  right: '4%',
                  bottom: '3%',
                  containLabel: true
              },
              xAxis : [
                  {
                      type : 'category',
                      data : scources,
                      axisTick: {
                          alignWithLabel: true
                      }
                  }
              ],
              yAxis : [
                  {
                      type : 'value'
                  }
              ],
              series : [
                  {
                      name:'直接访问',
                      type:'bar',
                      barWidth: '60%',
                      data:scources2
                  }
              ]
          };
      
          <!--------------------------------------------------------------------------- -->
          // 使用刚指定的配置项和数据显示图表。
          myChart.setOption(option);
      
          var myChart = echarts.init(document.getElementById('main2'));
      
          // 指定图表的配置项和数据
          var option = {
              title : {
                  text: '学习网搜索引擎搜索图',
                  subtext: '搜索引擎使用比例',
                  x:'center'
              },
              tooltip : {
                  trigger: 'item',
                  formatter: "{a} <br/>{b} : {c} ({d}%)"
              },
              legend: {
                  x : 'center',
                  y : 'bottom',
                  data:scources
              },
              toolbox: {
                  show : true,
                  feature : {
                      mark : {show: true},
                      dataView : {show: true, readOnly: false},
                      magicType : {
                          show: true,
                          type: ['pie', 'funnel']
                      },
                      restore : {show: true},
                      saveAsImage : {show: true}
                  }
              },
              calculable : true,
              series : [
                  {
                      name:'搜索数',
                      type:'pie',
                      radius : [20, 110],
                      center : ['25%', '50%'],
                      roseType : 'radius',
                      label: {
                          normal: {
                              show: false
                          },
                          emphasis: {
                              show: true
                          }
                      },
                      lableLine: {
                          normal: {
                              show: false
                          },
                          emphasis: {
                              show: true
                          }
                      },
                      data:scources3
                  },
                  {
                      name:'搜索数',
                      type:'pie',
                      radius : [30, 110],
                      center : ['75%', '50%'],
                      roseType : 'area',
                      data:scources3
                  }
              ]
          };
      
          // 使用刚指定的配置项和数据显示图表。
          myChart.setOption(option);
      
      </script>
      </body>
      </html>
      

    五、项目结果展示

    1.启动前端Web项目的TomCat服务器,在url地址栏中输入查询的日期:
    在这里插入图片描述
    2.得到如下漂亮的展示结果:
    在这里插入图片描述
    在这里插入图片描述
    配合页面每隔60秒刷新一次数据,后台数据每隔60秒产生一批,Spark Streaming每隔60秒处理一批数据,形成了数据实时产生实时处理实时展示的。

    六、总结

    • 1.在本项目中,我们从后端到前端,由里到外系统地学习了大数据实时流处理的流程,总结一下过程如下后台每隔60s执行一次python脚本产生一批用户日志存储在目标文件中 -> Flume监控目标文件并收集 -> 收集后的日志传给Kafka -> Spark Streaming消费Kafka的数据处理完将结果写入HBase,前端SSM整合读取HBase的数据,并且响应jsp页面的Ajax请求,将数据以JSON格式发给前端页面,最后前端页面使用Echarts展示数据。
      2.本项目只是实现了其中的两个功能,但是还可以挖掘更多的功能,比如统计IP
      地址来获取每个省份最受欢迎课程TOP3,及每门课程在哪个省份最受欢迎等等,能够分析中很多具有价值的信息,这也是大数据魅力值所在!
    • 更多内容请阅读 萧邦主的技术博客导航
    展开全文
  • 在公众号(big_data_community)内回复:“Spark138讲”获取网盘下载地址(限时分享7天,之后有需要的请到公众号留言)本资源来源于网络,分享到网络。


    在公众号(big_data_community)内回复:“Spark138讲”获取网盘下载地址(应大家请求,地址已更新


    本资源来源于网络,分享到网络。

    展开全文
  • 转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/ 搭建开发环境 安装 Scala IDE 搭建 Scala 语言开发环境很容易,Scala IDE 官网 下载合适的版本并解压就可以完成安装,本文使用...

    转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/

    搭建开发环境

    1. 安装 Scala IDE

      搭建 Scala 语言开发环境很容易,Scala IDE 官网 下载合适的版本并解压就可以完成安装,本文使用的版本是 4.1.0。

    2. 安装 Scala 语言包

      如果下载的 Scala IDE 自带的 Scala 语言包与 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不一致,那么就需要下载和本文所使用的 Spark 所匹配的版本,以确保实现的 Scala 程序不会因为版本问题而运行失败。

      请下载并安装 Scala 2.10.5 版本

    3. 安装 JDK

      如果您的机器上没有安装 JDK,请下载并安装 1.6 版本以上的 JDK。

    4. 创建并配置 Spark 工程

      打开 Scala IDE,创建一个名称为 spark-exercise 的 Scala 工程。

    图 1. 创建 scala 工程

    图 1. 创建 scala 工程

    在工程目录下创建一个 lib 文件夹,并且把您的 Spark 安装包下的 spark-assembly jar 包拷贝到 lib 目录下。

    图 2. Spark 开发 jar 包

    图 2. Spark 开发 jar 包

    并且添加该 jar 包到工程的 classpath 并配置工程使用刚刚安装的 Scala 2.10.5 版本.,工程目录结构如下。

    图 3. 添加 jar 包到 classpath

    图 3. 添加 jar 包到 classpath

    回页首

    运行环境介绍

    为了避免读者对本文案例运行环境产生困惑,本节会对本文用到的集群环境的基本情况做个简单介绍。

    • 本文所有实例数据存储的环境是一个 8 个机器的 Hadoop 集群,文件系统总容量是 1.12T,NameNode 叫 hadoop036166, 服务端口是 9000。读者可以不关心具体的节点分布,因为这个不会影响到您阅读后面的文章。
    • 本文运行实例程序使用的 Spark 集群是一个包含四个节点的 Standalone 模式的集群, 其中包含一个 Master 节点 (监听端口 7077) 和三个 Worker 节点,具体分布如下:
    Server Name Role
    hadoop036166 Master
    hadoop036187 Worker
    hadoop036188 Worker
    hadoop036227 Worker
    • Spark 提供一个 Web UI 去查看集群信息并且监控执行结果,默认地址是:http://<spark_master_ip>:8080 ,对于该实例提交后我们也可以到 web 页面上去查看执行结果,当然也可以通过查看日志去找到执行结果。

    图 4. Spark 的 web console

    图 4. Spark 的 web console

    回页首

    案例分析与编程实现

    案例一

    a. 案例描述

    提起 Word Count(词频数统计),相信大家都不陌生,就是统计一个或者多个文件中单词出现的次数。本文将此作为一个入门级案例,由浅入深的开启使用 Scala 编写 Spark 大数据处理程序的大门。

    b.案例分析

    对于词频数统计,用 Spark 提供的算子来实现,我们首先需要将文本文件中的每一行转化成一个个的单词, 其次是对每一个出现的单词进行记一次数,最后就是把所有相同单词的计数相加得到最终的结果。

    对于第一步我们自然的想到使用 flatMap 算子把一行文本 split 成多个单词,然后对于第二步我们需要使用 map 算子把单个的单词转化成一个有计数的 Key-Value 对,即 word -> (word,1). 对于最后一步统计相同单词的出现次数,我们需要使用 reduceByKey 算子把相同单词的计数相加得到最终结果。
    c. 编程实现

    清单 1.SparkWordCount 类源码

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    
    object SparkWordCount {
     def FILE_NAME:String = "word_count_results_";
     def main(args:Array[String]) {
     if (args.length < 1) {
     println("Usage:SparkWordCount FileName");
     System.exit(1);
     }
     val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
     val sc = new SparkContext(conf);
     val textFile = sc.textFile(args(0));
     val wordCounts = textFile.flatMap(line => line.split(" ")).map(
                                            word => (word, 1)).reduceByKey((a, b) => a + b)
     //print the results,for debug use.
     //println("Word Count program running results:");
     //wordCounts.collect().foreach(e => {
     //val (k,v) = e
     //println(k+"="+v)
     //});
     wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
     println("Word Count program running results are successfully saved.");
     }
    }

    d. 提交到集群执行

    本实例中, 我们将统计 HDFS 文件系统中/user/fams 目录下所有 txt 文件中词频数。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,这个 jar 包执行时会被上传到目标服务器的/home/fams 目录下。运行此实例的具体命令如下:

    清单 2.SparkWordCount 类执行命令

     ./spark-submit \
    --class com.ibm.spark.exercise.basic.SparkWordCount \
    --master spark://hadoop036166:7077 \
    --num-executors 3 \
    --driver-memory 6g --executor-memory 2g \
    --executor-cores 2 \
    /home/fams/sparkexercise.jar \
    hdfs://hadoop036166:9000/user/fams/*.txt

    e. 监控执行状态

    该实例把最终的结果存储在了 HDFS 上,那么如果程序运行正常我们可以在 HDFS 上找到生成的文件信息

    图 5. 案例一输出结果

    图 5. 案例一输出结果

    打开 Spark 集群的 Web UI, 可以看到刚才提交的 job 的执行结果。

    图 6. 案例一完成状态

    图 6. 案例一完成状态

    如果程序还没运行完成,那么我们可以在 Running Applications 列表里找到它。

    案例二

    a. 案例描述

    该案例中,我们将假设我们需要统计一个 1000 万人口的所有人的平均年龄,当然如果您想测试 Spark 对于大数据的处理能力,您可以把人口数放的更大,比如 1 亿人口,当然这个取决于测试所用集群的存储容量。假设这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是 ID,第二列是年龄。

    图 7. 案例二测试数据格式预览

    图 7. 案例二测试数据格式预览

    现在我们需要用 Scala 写一个生成 1000 万人口年龄数据的文件,源程序如下:

    清单 3. 年龄信息文件生成类源码

     import java.io.FileWriter
     import java.io.File
     import scala.util.Random
    
     object SampleDataFileGenerator {
     
     def main(args:Array[String]) {
     val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false)
     val rand = new Random()
     for ( i <- 1 to 10000000) {
     writer.write( i + " " + rand.nextInt(100))
     writer.write(System.getProperty("line.separator"))
     }
     writer.flush()
     writer.close()
     }
     }

    b. 案例分析

    要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD,其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。

    对于第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,很显然需要对在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。

    由于本例输出结果很简单,所以只打印在控制台即可。

    c. 编程实现

    清单 4.AvgAgeCalculator 类源码

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    object AvgAgeCalculator {
     def main(args:Array[String]) {
     if (args.length < 1){
     println("Usage:AvgAgeCalculator datafile")
     System.exit(1)
     }
     val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
     val sc = new SparkContext(conf)
     val dataFile = sc.textFile(args(0), 5);
     val count = dataFile.count()
     val ageData = dataFile.map(line => line.split(" ")(1))
     val totalAge = ageData.map(age => Integer.parseInt(
                                    String.valueOf(age))).collect().reduce((a,b) => a+b)
     println("Total Age:" + totalAge + ";Number of People:" + count )
     val avgAge : Double = totalAge.toDouble / count.toDouble
     println("Average Age is " + avgAge)
     }
    }

    d. 提交到集群执行

    要执行本实例的程序,需要将刚刚生成的年龄信息文件上传到 HDFS 上,假设您刚才已经在目标机器上执行生成年龄信息文件的 Scala 类,并且文件被生成到了/home/fams 目录下。

    那么您需要运行一下 HDFS 命令把文件拷贝到 HDFS 的/user/fams 目录。

    清单 5. 年龄信息文件拷贝到 HDFS 目录的命令

    hdfs dfs –copyFromLocal /home/fams /user/fams

    清单 6.AvgAgeCalculator 类的执行命令

     ./spark-submit \
     --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
     --master spark://hadoop036166:7077 \
     --num-executors 3 \
     --driver-memory 6g \
     --executor-memory 2g \
     --executor-cores 2 \
     /home/fams/sparkexercise.jar \
     hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt

    e. 监控执行状态

    在控制台您可以看到如下所示信息:

    图 8. 案例二输出结果

    图 8. 案例二输出结果

    我们也可以到 Spark Web Console 去查看 Job 的执行状态

    图 9. 案例二完成状态

    图 9. 案例二完成状态

    案例三

    a. 案例描述

    本案例假设我们需要对某个省的人口 (1 亿) 性别还有身高进行统计,需要计算出男女人数,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的源文件有以下格式, 三列分别是 ID,性别,身高 (cm)。

    图 10. 案例三测试数据格式预览

    图 10. 案例三测试数据格式预览

    我们将用以下 Scala 程序生成这个文件,源码如下:

    清单 7. 人口信息生成类源码

    import java.io.FileWriter
    import java.io.File
    import scala.util.Random
    
    object PeopleInfoFileGenerator {
     def main(args:Array[String]) {
     val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false)
     val rand = new Random()
     for ( i <- 1 to 100000000) {
     var height = rand.nextInt(220)
     if (height < 50) {
     height = height + 50
     }
     var gender = getRandomGender
     if (height < 100 && gender == "M")
     height = height + 100
     if (height < 100 && gender == "F")
     height = height + 50
     writer.write( i + " " + getRandomGender + " " + height)
     writer.write(System.getProperty("line.separator"))
     }
     writer.flush()
     writer.close()
     println("People Information File generated successfully.")
     }
     
     def getRandomGender() :String = {
     val rand = new Random()
     val randNum = rand.nextInt(2) + 1
     if (randNum % 2 == 0) {
     "M"
     } else {
     "F"
     }
     }
    }

    b. 案例分析

    对于这个案例,我们要分别统计男女的信息,那么很自然的想到首先需要对于男女信息从源文件的对应的 RDD 中进行分离,这样会产生两个新的 RDD,分别包含男女信息;其次是分别对男女信息对应的 RDD 的数据进行进一步映射,使其只包含身高数据,这样我们又得到两个 RDD,分别对应男性身高和女性身高;最后需要对这两个 RDD 进行排序,进而得到最高和最低的男性或女性身高。

    对于第一步,也就是分离男女信息,我们需要使用 filter 算子,过滤条件就是包含”M” 的行是男性,包含”F”的行是女性;第二步我们需要使用 map 算子把男女各自的身高数据从 RDD 中分离出来;第三步我们需要使用 sortBy 算子对男女身高数据进行排序。

    c. 编程实现

    在实现上,有一个需要注意的点是在 RDD 转化的过程中需要把身高数据转换成整数,否则 sortBy 算子会把它视为字符串,那么排序结果就会受到影响,例如 身高数据如果是:123,110,84,72,100,那么升序排序结果将会是 100,110,123,72,84,显然这是不对的。

    清单 8.PeopleInfoCalculator 类源码

    object PeopleInfoCalculator {
     def main(args:Array[String]) {
     if (args.length < 1){
     println("Usage:PeopleInfoCalculator datafile")
     System.exit(1)
     }
     val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
     val sc = new SparkContext(conf)
     val dataFile = sc.textFile(args(0), 5);
     val maleData = dataFile.filter(line => line.contains("M")).map(
                                  line => (line.split(" ")(1) + " " + line.split(" ")(2)))
     val femaleData = dataFile.filter(line => line.contains("F")).map(
                                  line => (line.split(" ")(1) + " " + line.split(" ")(2)))
     //for debug use
     //maleData.collect().foreach { x => println(x)}
     //femaleData.collect().foreach { x => println(x)}
     val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
     val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
     //for debug use
     //maleHeightData.collect().foreach { x => println(x)}
     //femaleHeightData.collect().foreach { x => println(x)}
     val lowestMale = maleHeightData.sortBy(x => x,true).first()
     val lowestFemale = femaleHeightData.sortBy(x => x,true).first()
     //for debug use
     //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
     //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
     val highestMale = maleHeightData.sortBy(x => x, false).first()
     val highestFemale = femaleHeightData.sortBy(x => x, false).first()
     println("Number of Male Peole:" + maleData.count())
     println("Number of Female Peole:" + femaleData.count())
     println("Lowest Male:" + lowestMale)
     println("Lowest Female:" + lowestFemale)
     println("Highest Male:" + highestMale)
     println("Highest Female:" + highestFemale)
     }
    }

    d. 提交到集群执行

    在提交该程序到集群执行之前,我们需要将刚才生成的人口信息数据文件上传到 HDFS 集群,具体命令可以参照上文。

    清单 9.PeopleInfoCalculator 类的执行命令

     ./spark-submit \
     --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
     --master spark://hadoop036166:7077 \
     --num-executors 3 \
     --driver-memory 6g \
     --executor-memory 3g \
     --executor-cores 2 \
     /home/fams/sparkexercise.jar \
     hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt

    e. 监控执行状态

    对于该实例,如程序中打印的一样,会在控制台显示如下信息:

    图 11. 案例三输出结果

    图 11. 案例三输出结果

    在 Spark Web Console 里可以看到具体的执行状态信息

    图 12. 案例三完成状态

    图 12. 案例三完成状态

    案例四

    a. 案例描述

    该案例中我们假设某搜索引擎公司要统计过去一年搜索频率最高的 K 个科技关键词或词组,为了简化问题,我们假设关键词组已经被整理到一个或者多个文本文件中,并且文档具有以下格式。

    图 13. 案例四测试数据格式预览

    图 13. 案例四测试数据格式预览

    我们可以看到一个关键词或者词组可能出现多次,并且大小写格式可能不一致。

    b. 案例分析

    要解决这个问题,首先我们需要对每个关键词出现的次数进行计算,在这个过程中需要识别不同大小写的相同单词或者词组,如”Spark”和“spark” 需要被认定为一个单词。对于出现次数统计的过程和 word count 案例类似;其次我们需要对关键词或者词组按照出现的次数进行降序排序,在排序前需要把 RDD 数据元素从 (k,v) 转化成 (v,k);最后取排在最前面的 K 个单词或者词组。

    对于第一步,我们需要使用 map 算子对源数据对应的 RDD 数据进行全小写转化并且给词组记一次数,然后调用 reduceByKey 算子计算相同词组的出现次数;第二步我们需要对第一步产生的 RDD 的数据元素用 sortByKey 算子进行降序排序;第三步再对排好序的 RDD 数据使用 take 算子获取前 K 个数据元素。

    c. 编程实现

    清单 10.TopKSearchKeyWords 类源码

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object TopKSearchKeyWords {
     def main(args:Array[String]){
     if (args.length < 2) {
     println("Usage:TopKSearchKeyWords KeyWordsFile K");
     System.exit(1)
     }
     val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
     val sc = new SparkContext(conf)
     val srcData = sc.textFile(args(0))
     val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)
     //for debug use
     //countedData.foreach(x => println(x))
     val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)
     val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }
     topKData.foreach(println)
     }
    }

    d. 提交到集群执行

    清单 11.TopKSearchKeyWords 类的执行命令

     ./spark-submit \
     --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
     --master spark://hadoop036166:7077 \
     --num-executors 3 \
     --driver-memory 6g \
     --executor-memory 2g \
     --executor-cores 2 \
     /home/fams/sparkexercise.jar \
     hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt

    e. 监控执行状态

    如果程序成功执行,我们将在控制台看到以下信息。当然读者也可以仿照案例二和案例三那样,自己尝试使用 Scala 写一段小程序生成此案例需要的源数据文件,可以根据您的 HDFS 集群的容量,生成尽可能大的文件,用来测试本案例提供的程序。

    图 14. 案例四输出结果

    图 14. 案例四输出结果

    图 15. 案例四完成状态

    图 15. 案例四完成状态

    回页首

    Spark job 的执行流程简介

    我们可以发现,Spark 应用程序在提交执行后,控制台会打印很多日志信息,这些信息看起来是杂乱无章的,但是却在一定程度上体现了一个被提交的 Spark job 在集群中是如何被调度执行的,那么在这一节,将会向大家介绍一个典型的 Spark job 是如何被调度执行的。

    我们先来了解以下几个概念:

    DAG: 即 Directed Acyclic Graph,有向无环图,这是一个图论中的概念。如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。

    Job:我们知道,Spark 的计算操作是 lazy 执行的,只有当碰到一个动作 (Action) 算子时才会触发真正的计算。一个 Job 就是由动作算子而产生包含一个或多个 Stage 的计算作业。

    Stage:Job 被确定后,Spark 的调度器 (DAGScheduler) 会根据该计算作业的计算步骤把作业划分成一个或者多个 Stage。Stage 又分为 ShuffleMapStage 和 ResultStage,前者以 shuffle 为输出边界,后者会直接输出结果,其边界可以是获取外部数据,也可以是以一个 ShuffleMapStage 的输出为边界。每一个 Stage 将包含一个 TaskSet。

    TaskSet: 代表一组相关联的没有 shuffle 依赖关系的任务组成任务集。一组任务会被一起提交到更加底层的 TaskScheduler。

    Task:代表单个数据分区上的最小处理单元。分为 ShuffleMapTask 和 ResultTask。ShuffleMapTask 执行任务并把任务的输出划分到 (基于 task 的对应的数据分区) 多个 bucket(ArrayBuffer) 中,ResultTask 执行任务并把任务的输出发送给驱动程序。

    Spark 的作业任务调度是复杂的,需要结合源码来进行较为详尽的分析,但是这已经超过本文的范围,所以这一节我们只是对大致的流程进行分析。

    Spark 应用程序被提交后,当某个动作算子触发了计算操作时,SparkContext 会向 DAGScheduler 提交一个作业,接着 DAGScheduler 会根据 RDD 生成的依赖关系划分 Stage,并决定各个 Stage 之间的依赖关系,Stage 之间的依赖关系就形成了 DAG。Stage 的划分是以 ShuffleDependency 为依据的,也就是说当某个 RDD 的运算需要将数据进行 Shuffle 时,这个包含了 Shuffle 依赖关系的 RDD 将被用来作为输入信息,进而构建一个新的 Stage。我们可以看到用这样的方式划分 Stage,能够保证有依赖关系的数据可以以正确的顺序执行。根据每个 Stage 所依赖的 RDD 数据的 partition 的分布,会产生出与 partition 数量相等的 Task,这些 Task 根据 partition 的位置进行分布。其次对于 finalStage 或是 mapStage 会产生不同的 Task,最后所有的 Task 会封装到 TaskSet 内提交到 TaskScheduler 去执行。有兴趣的读者可以通过阅读 DAGScheduler 和 TaskScheduler 的源码获取更详细的执行流程。

    回页首

    结束语

    通过本文,相信读者对如何使用 Scala 编写 Spark 应用程序处理大数据已经有了较为深入的了解。当然在处理实际问题时,情况可能比本文举得例子复杂很多,但是解决问题的基本思想是一致的。在碰到实际问题的时候,首先要对源数据结构格式等进行分析,然后确定如何去使用 Spark 提供的算子对数据进行转化,最终根据实际需求选择合适的算子操作数据并计算结果。本文并未介绍其它 Spark 模块的知识,显然这不是一篇文章所能完成的,希望以后会有机会总结更多的 Spark 应用程序开发以及性能调优方面的知识,写成文章与更多的 Spark 技术爱好者分享,一起进步。由于时间仓促并且本人知识水平有限,文章难免有未考虑周全的地方甚至是错误,希望各位朋友不吝赐教。有任何问题,都可以在文末留下您的评论,我会及时回复。

    展开全文
  • 一、flume安装 (一)概述    Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等...

       由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现

    1、集群测试实例

       代码如下:
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        spark = SparkSession\
                .builder\
                .appName("PythonWordCount")\
                .master("spark://mini1:7077") \
                .getOrCreate()
        spark.conf.set("spark.executor.memory", "500M")
        sc = spark.sparkContext
        a = sc.parallelize([1, 2, 3])
        b = a.flatMap(lambda x: (x,x ** 2))
        print(a.collect())
        print(b.collect())
    

       运行结果:
    在这里插入图片描述

    2、从文件中读取

       为了方便调试,这里采用本地模式进行测试

    from py4j.compat import long
    from pyspark.sql import SparkSession
    def formatData(arr):
        # arr = arr.split(",")
        mb = (arr[0], arr[2])
        flag = arr[3]
        time = long(arr[1])
        # time = arr[1]
        if flag == "1":
              time = -time
        return (mb,time)
    
    
    if __name__ == "__main__":
        spark = SparkSession\
                .builder\
                .appName("PythonWordCount")\
                .master("local")\
                .getOrCreate()
    
        sc = spark.sparkContext
        # sc = spark.sparkContext
        line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))
        count = line.map(lambda x: formatData(x))
        rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)
        # print(count.collect())
        line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))
    
        rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))
        rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))
    
        rdd3 = rdd.join(rdd1)
        rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))
            # .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)
        rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))
        print(rdd5.collect())
    

       原文件数据:
    在这里插入图片描述

    在这里插入图片描述

       结果如下:

    [[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]
    

    3、读取文件并将结果保存至文件

    from pyspark.sql import SparkSession
    from py4j.compat import long
    
    
    def formatData(arr):
        # arr = arr.split(",")
        mb = (arr[0], arr[2])
        flag = arr[3]
        time = long(arr[1])
        # time = arr[1]
        if flag == "1":
              time = -time
        return (mb,time)
    
    
    if __name__ == "__main__":
        spark = SparkSession\
                .builder\
                .appName("PythonWordCount")\
                .master("local")\
                .getOrCreate()
        sc = spark.sparkContext
        line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))
        rdd0 = line.map(lambda x: formatData(x))
        rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))
        line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))
        rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))
        rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))
    
        rdd4 = rdd3.groupBy(lambda x: x[0])
        rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])
        print(rdd1.join(rdd2).collect())
        print(rdd5.collect())
        rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")
        sc.stop()
    
       结果如下:

    在这里插入图片描述

    4、根据自定义规则匹配

    import urllib
    from pyspark.sql import SparkSession
    def getUrls(urls):
        url = urls[0]
        parsed = urllib.parse.urlparse(url)
        return (parsed.netloc, url, urls[1])
    
    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("PythonWordCount") \
            .master("local") \
            .getOrCreate()
        sc = spark.sparkContext
        line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day02\\itcast.log").map(lambda x: x.split('\t'))
        //从数据库中加载规则
        arr = ["java.itcast.cn", "php.itcast.cn", "net.itcast.cn"]
        rdd1 = line.map(lambda x: (x[1], 1))
        rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)
        rdd3 = rdd2.map(lambda x: getUrls(x))
    
        for ins in arr:
            rdd = rdd3.filter(lambda x:x[0] == ins)
            result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)
            print(result)
        spark.stop()
    

       结果如下:
    在这里插入图片描述

    5、自定义类排序

    from operator import gt
    from pyspark.sql import SparkSession
    
    
    class Girl:
        def __init__(self, faceValue, age):
            self.faceValue = faceValue
            self.age = age
    
        def __gt__(self, other):
            if other.faceValue == self.faceValue:
                return gt(self.age, other.age)
            else:
                return gt(self.faceValue, other.faceValue)
    
    
    if __name__ == "__main__":
        spark = SparkSession\
                .builder\
                .appName("PythonWordCount")\
                .master("local")\
                .getOrCreate()
        sc = spark.sparkContext
        rdd1 = sc.parallelize([("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)])
        rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)
        print(rdd2.collect())
        sc.stop()
    

       结果如下:

    在这里插入图片描述

    6、JDBC

    from pyspark import SQLContext
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession\
                .builder\
                .appName("PythonWordCount")\
                .master("local")\
                .getOrCreate()
        sc = spark.sparkContext
        sqlContext = SQLContext(sc)
        df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/hellospark",driver="com.mysql.jdbc.Driver",dbtable="(select * from actor) tmp",user="root",password="123456").load()
        print(df.select('description','age').show(2))
        # print(df.printSchema)
    
        sc.stop()
    

       结果如下:
    在这里插入图片描述

    展开全文
  • 写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置;在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源...
  • 背景 业务上有一份行车轨迹的数据 carRecord.csv 如下: id;carNum;orgId;capTime 1;粤A321;0002;20200512 102010 2;云A321;0001;20200512 102010 3;粤A321;0001;20200512 103010 4;云A321;...2020051...
  • Spark版本 cdh5.9.0集成的spark的版本1.6.0,集成的hadoop版本2.6.0。查看的网址:http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.9.0/如果用cdh5.9.0 parcels离线安装自带的spark(on yarn),启动时提示...
  • 大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室...
  • Spark实战高手之路

    2020-07-21 10:00:02
    Spark实战
  • 自己按照视频讲的整理即学即用Spark实战44讲范东来模块一,详细内容参考链接https://kaiwu.lagou.com/course/courseInfo.htm?courseId=71#/detail/pc?id=1971
  • hadoop组件—spark实战----spark on k8s模式k8s原生方式安装spark2.4.4 cluster mode hadoop组件—spark实战----spark on k8s模式k8s原生方式—cluster mode调用运行spark程序 本篇文章尝试安装client mode。 两种...
  • hadoop组件—spark实战----spark on k8s模式k8s原生方式安装spark2.4.4 cluster mode 本章记录 cluster mode这种方式的spark集群使用方法。 首先注意 提交spark命令需要借助 spark的安装包中的spark-submit命...
  • 大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室...
  • hadoop组件—spark实战----spark on k8s模式k8s原生方式安装spark2.4.4 client mode和使用 本篇文章记录运行pyspark和提交python程序。 制作有python和java环境和包含有spark2.4.4客户端的镜像 ...
  • Spark实战教程 大强老师 大华软件学院 技术总监 / 高级讲师 曾就...
  •  使用ide开发spark 实战 使用ide 开发spark的Local和Cluster  一: 配置开发环境 1. 要在本地安装好java和scala。 由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要...
  • 第34课:在IDEA中开发Spark实战
  • Spark实战开发

    2020-07-22 23:31:26
    Apache Spark是大型数据处理的快速和通用引擎,此为Spark实战开发教程 ,适合初学者 ,简单易学。
  • Spark实战项目--电商

    2020-07-17 10:14:27
    Spark实战项目——电商指标统计 一、引言 1.1 框架设计原理 1.2 框架搭建 1.2.1 Util 1.2.1.1 EnvUtils 1.2.1.2 PropertiesUtil 1.2.2 core 1.2.2.1 TApplication 1.2.2.2 TController 1.2.2.3 TService 1.2.2.4 ...
  • Spark实战高手之路-从零开始
1 2 3 4 5 ... 20
收藏数 18,653
精华内容 7,461
关键字:

spark实战