精华内容
参与话题
问答
  • 简单的Spark案例——课程学习量统计

    千次阅读 2018-10-12 00:16:40
    需求:如下图的文件中有很多访问记录,第一列表示访问站点的时间戳,第二列表示访问的站点,中间用制表符分割。这里相当于学习的不同课程,如java,ui,bigdata,android,h5等,其中每门课程又分为子课程,如h5课程...

    需求:如下图的文件中有很多访问记录,第一列表示访问站点的时间戳,第二列表示访问的站点,中间用制表符分割。这里相当于学习的不同课程,如java,ui,bigdata,android,h5等,其中每门课程又分为子课程,如h5课程分为teacher,course等。现在需要统计每门课程,学习量最高的两门子课程并降序排列。

    测试数据下载地址

    链接:https://pan.baidu.com/s/1DjeJkeBwfzLUl0iqF5yo8w 密码:hd35 文件为:access.txt

    package com.whua
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * @author: whua 
      * @create: 2018/10/11 18:54
      */
    object ProjectCount1 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ObjectCount1").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        //获取数据
        val file: RDD[String] = sc.textFile("/Users/whua/Documents/tmpTest/access.txt")
    
        //提取url并生成为一个元组
        val urlAndOne: RDD[(String, Int)] = file.map(line => {
          val fields = line.split("\t")
          val url = fields(1)
          (url, 1)
        })
    
        //把相同的url聚合
        val sumedUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_ + _)
    
        //获取学科信息
        val project: RDD[(String, String, Int)] = sumedUrl.map(x => {
          val url = x._1
          val count = x._2
          val project = new URL(url).getHost
          (project, url, count)
        })
    
        //用学科来分组,聚合后得到结果
        val ans: Array[(String, List[(String, String, Int)])] = project.groupBy(_._1)
          .mapValues(_.toList.sortBy(_._3).reverse.take(2)).collect()
        //   val ans: Array[(String, String, Int)] = project.sortBy(_._3,false).take(3)
        for (item <- ans) {
          println(item)
        }
        sc.stop()
      }
    }

    上面的代码是比较基础的代码,接下来在此基础上,引入缓存机制分区器,缓存机制主要是将常用的数据缓存起来,再次调用的时候效率较高;而分区器是为了解决数据倾斜的问题,在结果生成的文件中,我么可以看到,有的文件中产生的数据量多,有的文件中产生的数据量少,分区器是解决数据倾斜的方法之一。下面我们将调用Spark自带的哈希分区器,显而易见,是采用哈希的方式来放置数据产生的位置。

    package com.whua
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    /**
      * @author: whua 
      * @create: 2018/10/11 20:13
      */
    object ProjectCount2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ObjectCount1").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        //获取数据
        val file: RDD[String] = sc.textFile("/Users/whua/Documents/tmpTest/access.txt")
    
        //提取url并生成为一个元组
        val urlAndOne: RDD[(String, Int)] = file.map(line => {
          val fields = line.split("\t")
          val url = fields(1)
          (url, 1)
        })
    
        //把相同的url聚合
        val sumedUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_ + _)
    
        //获取学科信息并缓存
        val cachedProject: RDD[(String, (String, Int))] = sumedUrl.map(x => {
          val url = x._1
          val count = x._2
          val project = new URL(url).getHost
          (project, (url, count))
        }).cache()
    
        //调用Spark自带的分区器此时会发生哈希碰撞,需要自定义分区器
        val ans: RDD[(String, (String, Int))] = cachedProject.partitionBy(new HashPartitioner(3))
        ans.saveAsTextFile("/Users/whua/Documents/tmpTest/out")
    
        sc.stop()
      }
    }

    但是,有时候在不同的数据,不同的实际问题中,自带定分区器不一定能很好地解决数据倾斜的问题,这时候就需要自定义分区器,自定义的分区器主要是继承Partitioner抽象类,重写两个方法。这里我们根据不同的课程名来进行分区,即相同的课程名的记录写到同一个文件中。

    package com.whua
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    /**
      * @author: whua 
      * @create: 2018/10/11 20:41
      */
    object ProjectCount3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ObjectCount1").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        //获取数据
        val file: RDD[String] = sc.textFile("/Users/whua/Documents/tmpTest/access.txt")
    
        //提取url并生成为一个元组
        val urlAndOne: RDD[(String, Int)] = file.map(line => {
          val fields = line.split("\t")
          val url = fields(1)
          (url, 1)
        })
    
        //把相同的url聚合
        val sumedUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_ + _)
    
        //获取学科信息并缓存
        val cachedProject: RDD[(String, (String, Int))] = sumedUrl.map(x => {
          val url = x._1
          val count = x._2
          val project = new URL(url).getHost
          (project, (url, count))
        }).cache()
    
        //得到所有学科
        val projects: Array[String] = cachedProject.keys.distinct().collect()
        //调用自定义分区器并得到分区号
        val partitioner: ProjectPartitioner = new ProjectPartitioner(projects)
        //分区
        val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)
        //每个分区的数据进行排序并取top2
        val ans: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
          it.toList.sortBy(_._2._2).reverse.take(2).iterator
        })
    
        ans.saveAsTextFile("/Users/whua/Documents/tmpTest/out")
        sc.stop()
      }
    }
    
    class ProjectPartitioner(projects: Array[String]) extends Partitioner {
      //用来存放学科和分区号
      private val projectsAndPartNum = new mutable.HashMap[String, Int]
      //计数器,用于指定分区号
      var n = 0
    
      for (pro <- projects) {
        //HashMap插入
        projectsAndPartNum += (pro -> n)
        n += 1
      }
    
      //得到分区数
      override def numPartitions: Int = projects.length
    
      //得到分区号
      override def getPartition(key: Any): Int = {
        projectsAndPartNum.getOrElse(key.toString, 0)
      }
    }

    结果如下:

     

    展开全文
  • Spark案例实战之一

    千次阅读 2018-04-11 20:30:06
    一.计算最受欢迎的老师 1.项目需求:现有某网络上的访问日志,现需要计算某一学科下被访问次数最多的老师。 ... 3.代码如下: ...import java.net.URL ...import org.apache.spark.rdd.RDD import org...

    一.计算最受欢迎的老师

    1.项目需求:现有某网络上的访问日志,现需要计算某一学科下被访问次数最多的老师。
    2.网络的url如右:http://bigdata.xiaoniu.com/laozhaobigdata表示学科,laozhao表示教师。
    3.代码如下:

    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /*
    1.分析最受欢迎的老师
     */
    object PopularTeacher{
      def main(args:Array[String]): Unit = {
        val words = Array("http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://java.xiaoniu.com/laozhang",
          "http://java.xiaoniu.com/laozhang",
          "http://python.xiaoniu.com/laoqian",
          "http://java.xiaoniu.com/laoli",
          "http://python.xiaoniu.com/laoli",
          "http://python.xiaoniu.com/laoli")
    
        val conf = new SparkConf().setAppName("Popular").setMaster("local")
        val sc = new SparkContext(conf)
    
        //读取数据
        //val result1 :RDD [String]= sc.textFile(args(0))
        val result1 = sc.parallelize(words)
        val subjectAndTeacher:RDD[(String,String)]  = result1.map(lines =>{
          val url = new URL(lines)
          println("url = "+url)
          val host = new URL(lines).getHost
          println("host = "+host)
    
          val subject = host.substring(0,host.indexOf("."))//切分字符串
          val teacher = url.getPath.substring(1)//获得老师的名字
          (subject,teacher)//这是一个直接返回的
        })//整理数据
    
        //总的排序
        val result2 = subjectAndTeacher.map(x => (x,1))  //形成  ((键值对),1) 这种map
        val result22 = result2.reduceByKey(_+_)//根据键将相同的合并
        //print("result22's content are:") //并行的程序,你永远都不知道是不是按照程序的顺序输出
        result22.foreach(println)
    
        val result3: Array[((String, String), Int)] = result22.collect()
        //println(result3.toBuffer)
    
        //每个学科里面做排序   局部排序   按照学科的名字排序
        //val result4  = result22.groupBy(_._1._1)
        val result4: RDD[(String, Iterable[((String, String), Int)])] = result22.groupBy(x => x._1._1)
    
        //二次排序
        //将keys和values转换成List类型,然后按照values排序,然后倒叙输出,然后取前三
        val result5: RDD[(String, List[((String, String), Int)])] = result4.mapValues(_.toList.sortBy(_._2).reverse.take(3))
        val result = result5.collect()
        result5.foreach(println)
      }
    }
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    /**
      *1.自定义分区器
      *2.继承自Partitioner
      *3.subjects是一个字符串数组
      *
       * @param subjects
      */
    class SelfPartition (subjects :Array[String]) extends Partitioner{
     /*当课程和分区之间没有定义规则时,需要自定义规则
     val rules = new mutable.HashMap[String ,Int]()
     var i = 0
      for (sub <- subjects){
        rules += (sub -> i)
        i+=1
      }
      */
      //直接固定map
      val rules = Map("bigdata"-> 1,"java"->2,"python"->3)//不用new 直接写Map
    
      //定义分区数    是个方法,而不是定义变量
      override def numPartitions: Int = {
        subjects.length
      }
    
      //获取具体分区
      override def getPartition(key: Any): Int ={
        val k = key.toString
        rules.getOrElse(k,0)
      }
    }
    
    /**
      * 1.访问记录存储是一个URL,暂时用一个records = Array[String]来存储
      * 2.将records转换成text(一个rdd)
      * 3.对text进行操作,如:mapPartitions,map
      * 4.将操作后的结果收集并写出到控制台
      */
    
    
    object FavoriteTeacher{
      def main (args:Array[String]): Unit ={
        val conf = new SparkConf().setAppName("FavoriteTeacher").setMaster("local")
        val sc = new SparkContext(conf)
    
        //存储文本
        val records: Array[String] = Array("http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://java.xiaoniu.com/laozhang",
          "http://java.xiaoniu.com/laozhang",
          "http://python.xiaoniu.com/laoqian",
          "http://java.xiaoniu.com/laoli",
          "http://python.xiaoniu.com/laoli",
          "http://python.xiaoniu.com/laoli")
        val text: RDD[String] = sc.parallelize(records)//转换成rdd
        print("First disposition:")
        text.collect().foreach(println)
        //打印结果如下:http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://java.xiaoniu.com/laozhang
    //    http://java.xiaoniu.com/laozhang
    //    http://python.xiaoniu.com/laoqian
    //    http://java.xiaoniu.com/laoli
    //    http://python.xiaoniu.com/laoli
    //    http://python.xiaoniu.com/laoli
    
        /*
          1.处理lines,并返回一个(String,String)元组
         */
        def fun1(lines :String ): (String, String) = {
          val url = new URL(lines)//将lines转换成URL
          val hostName = url.getHost//获取host
          val path = url.getPath//获取path
          val courseName = hostName.substring(0,hostName.indexOf("."))//获取课程名
          val teacherName = path.substring(1)//获取教师的姓名
          (courseName,teacherName)
        }
        val res1: RDD[(String, String)] = text.map(fun1)
        print("Second disposition:")
        res1.foreach(print)
        //打印结果如下:(bigdata,laozhao)(bigdata,laozhao)(bigdata,laozhao)
        // (bigdata,laozhao)(bigdata,laozhao)(java,laozhang)(java,laozhang)(python,laoqian)
        // (java,laoli)(python,laoli)(python,laoli)
    
    
        val res2: RDD[((String, String), Int)] = res1.map(x => (x,1))//形成一个map 组合
        val res3: RDD[((String, String), Int)] = res2.reduceByKey(_+_)//根据Key将每个map合并
        print("Third disposition:")
        res3.foreach(print)
        val res4: RDD[(String, Iterable[((String, String), Int)])] = res3.groupBy(_._1._1)//根据学科来分组
        res4.foreach(println)
        val finRes  = res4.mapValues(x => x.toList.sortBy(_._2).reverse.take(2))//对value操作!很重要
        finRes.foreach(print)
    
    //    val selfPartition = new SelfPartition(records)//new 一个分区对象
    //    val res4 = res2.reduceByKey(selfPartition,_+_)
      }
    }
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    /**
      *1.自定义分区器
      *2.继承自Partitioner
      *3.subjects是一个字符串数组
      *
       * @param subjects
      */
    class SelfPartition (subjects :Array[String]) extends Partitioner{
     //当课程和分区之间没有定义规则时,需要自定义规则
     val rules = new mutable.HashMap[String ,Int]()
     var i = 0
      for (sub <- subjects){
        rules += (sub -> i)  //将rules逐渐添加完
        i+=1
      }
    
      //直接固定map
      //val rules = Map("bigdata"-> 1,"java"->2,"python"->3)//不用new 直接写Map
    
      //定义分区数    是个方法,而不是定义变量
      override def numPartitions: Int = {
        subjects.length+   1
      }
    
      //获取具体分区
      override def getPartition(key: Any): Int ={
        val k = key.toString
        rules.getOrElse(k,0)
      }
    }
    
    /**
      * 1.访问记录存储是一个URL,暂时用一个records = Array[String]来存储
      * 2.将records转换成text(一个rdd)
      * 3.对text进行操作,如:mapPartitions,map
      * 4.将操作后的结果收集并写出到控制台
      * 5.让每个学科分到各自的分区
      */
    
    
    object FavoriteTeacher{
      def main (args:Array[String]): Unit ={
        val conf = new SparkConf().setAppName("FavoriteTeacher").setMaster("local")
        val sc = new SparkContext(conf)
    
        //存储文本
        val records: Array[String] = Array("http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://bigdata.xiaoniu.com/laozhao",
          "http://java.xiaoniu.com/laozhang",
          "http://java.xiaoniu.com/laozhang",
          "http://python.xiaoniu.com/laoqian",
          "http://java.xiaoniu.com/laoli",
          "http://python.xiaoniu.com/laoli",
          "http://python.xiaoniu.com/laoli")
        val text: RDD[String] = sc.parallelize(records)//转换成rdd
        print("First disposition:")
        text.collect().foreach(println)
        //打印结果如下:http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://bigdata.xiaoniu.com/laozhao
    //    http://java.xiaoniu.com/laozhang
    //    http://java.xiaoniu.com/laozhang
    //    http://python.xiaoniu.com/laoqian
    //    http://java.xiaoniu.com/laoli
    //    http://python.xiaoniu.com/laoli
    //    http://python.xiaoniu.com/laoli
    
        /*
          1.处理lines,并返回一个(String,String)元组
         */
        def fun1(lines :String ): (String, String) = {
          val url = new URL(lines)//将lines转换成URL
          val hostName = url.getHost//获取host
          val path = url.getPath//获取path
          val courseName = hostName.substring(0,hostName.indexOf("."))//获取课程名
          val teacherName = path.substring(1)//获取教师的姓名
          (courseName,teacherName)
        }
        val res1: RDD[(String, String)] = text.map(fun1)
        print("Second disposition:")
        res1.foreach(print)
        //打印结果如下:(bigdata,laozhao)(bigdata,laozhao)(bigdata,laozhao)
        // (bigdata,laozhao)(bigdata,laozhao)(java,laozhang)(java,laozhang)(python,laoqian)
        // (java,laoli)(python,laoli)(python,laoli)
    
    
        val res2: RDD[((String, String), Int)] = res1.map(x => (x,1))//形成一个map 组合
        val subjects: Array[String] = res2.map(_._1._1).distinct().collect()
    
        print("subjects = "+subjects)
    
    
        val res3: RDD[((String, String), Int)] = res2.reduceByKey(_+_)//根据Key将每个map合并
        print("Third disposition:")
        res3.foreach(print)
    
        val selfPartition = new SelfPartition(subjects)
    
        //按照自定义的规则分区shuffle
         val res4: RDD[(String, (String, Int))] = res3.map(t => (t._1._1, (t._1._2,t._2))).partitionBy(selfPartition)
    
        /*
          * 1.分区中本来就是Iterator,所以在toList之后,需要再转换成iterator
          */
        val result: RDD[(String, (String, Int))] = res4.mapPartitions(_.toList.sortBy(_._2._2).reverse.take(2).iterator)
        result.foreach(print)
      }
    }
    展开全文
  • spark案例难>

    2020-08-13 23:26:16
    1:需求 2:代码实现 ...import java.text.SimpleDateFormat ...import org.apache.spark.{Partitioner, SparkConf, SparkContext} import scala.collection.mutable object XiangMu { def main(a..

    1:需求

     

     

    2:代码实现

    package Day07
    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    import scala.collection.mutable
    
     
    
    object XiangMu {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("JuHe").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        val lines = sc.textFile("F:\\spark\\spark-day05\\资料\\data.csv")
        //用mapPartition,比map好,这样只需要new一次SimpleDateFormat
        val mapLines: RDD[((String, Long, Long, Long), Null)] = lines.mapPartitions(it => {
          val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          it.map(i => {
            val files = i.split(",")
            val uid = files(0)
            val str = files(1)
            val startTime = dateFormat.parse(str).getTime
            val str1 = files(2)
            val endTime = dateFormat.parse(str1).getTime
            val flow = files(3).toLong
            ((uid, startTime, endTime, flow), null)
          })
        })
        //将uid去重
        val disUid: Array[(String)] = mapLines.map(_._1._1).distinct().collect()
        //传入自定义分区
        val partition: MyIdPartition = new MyIdPartition(disUid)
    
        //按什么排序
          implicit val value = Ordering[Long].on[(String, Long, Long, Long)](i => i._2)
        //repartitionAndSortWithinPartitions重新排序,然后在分区内排序
        val partitionLines: RDD[((String, Long, Long, Long), Null)] = mapLines.repartitionAndSortWithinPartitions(partition)
        //我们将如果大于了十分钟为0,第二个大于十分钟为1,依次累加.得到一个变量
        
        val value1: RDD[((String, Long), (Long, Long, Long))] = partitionLines.mapPartitions(it => {
          var tempp = 0L
          var folt = 0
    
          it.map(i => {
            val starTime: Long = i._1._2
            val endTime: Long = i._1._3
            if (tempp != 0) {
              if ((starTime - tempp) / 1000 / 60 > 10) {
                folt += 1
              } else {
                folt += 0
              }
            }
            tempp = endTime
            ((i._1._1, folt), (starTime, endTime, i._1._4))
          })
    
    
        })
        //聚合每个key的起始时间和结束实际谁最小和大相加flow,将时间戳转为正常时间
        val value2: RDD[((String, Long), (Long, Long, Long))] = value1.reduceByKey((x, y) => (Math.min(x._1, y._1), Math.max(x._2, y._2), (x._3 + y._3)))
    
        val value3: RDD[((String, Long), String, String, Long)] = value2.mapPartitions(it => {
          val sdf = new SimpleDateFormat("yyyy:MM:dd HH:mm:ss")
          it.map(i => {
    
            val start = sdf.format(new Date(i._2._1))
            val end = sdf.format(new Date(i._2._2))
            ((i._1._1, i._1._2), start, end, i._2._3)
    
          })
        })
        println(value3.collect().toBuffer)
    
    
     //ArrayBuffer(((2,2),2020:02:18 16:01:23,2020:02:18 17:40:52,90), ((1,0),2020:02:18 14:20:30,2020:02:18 15:20:30,50), ((2,1),2020:02:18 15:20:49,2020:02:18 15:30:24,30), ((3,0),2020:02:18 14:39:58,2020:02:18 15:35:53,50),
        //         ((1,1),2020:02:18 15:37:23,2020:02:18 18:03:27,150), ((2,0),2020:02:18 14:18:24,2020:02:18 15:01:40,20))
      }
    
    }
    //定义分区
    class MyIdPartition(disUid: Array[String]) extends Partitioner{
      val uidPartition = new mutable.HashMap[String, Int]()
      var partitionID=0//区ID
      for(sid<-disUid){
        uidPartition(sid)=partitionID  //遍历每个uid
        partitionID+=1        //将分区+1由0号分区开始
      }
    
      override def numPartitions: Int =  disUid.length
    
      override def getPartition(key: Any): Int =  {
        val tuple = key.asInstanceOf[(String, Long, Long, Long)] //获取全局聚合的key
        val value = tuple._1
        uidPartition(value)      //传给hashmap按什么分区
      }
    }
    展开全文
  • Spark 之 经典案例

    2017-09-01 17:06:30
    转自:... Hadoop经典案例Spark实现(二)——数据去重问题 1、原始数据 1)file1: [plain] view plain copy 2012-3-1 a  2012-3-2 b  2012-3

    转自:http://blog.csdn.net/kwu_ganymede/article/details/50474763


    Hadoop经典案例Spark实现(二)——数据去重问题


    1、原始数据
    1)file1:

    [plain] view plain copy
    1. 2012-3-1 a  
    2. 2012-3-2 b  
    3. 2012-3-3 c  
    4. 2012-3-4 d  
    5. 2012-3-5 a  
    6. 2012-3-6 b  
    7. 2012-3-7 c  
    8. 2012-3-3 c   

    2)file2:
    [plain] view plain copy
    1. 2012-3-1 b  
    2. 2012-3-2 a  
    3. 2012-3-3 b  
    4. 2012-3-4 d  
    5. 2012-3-5 a  
    6. 2012-3-6 c  
    7. 2012-3-7 d  
    8. 2012-3-3 c   


    数据输出:
    [plain] view plain copy
    1. 2012-3-1 a  
    2. 2012-3-1 b  
    3. 2012-3-2 a  
    4. 2012-3-2 b  
    5. 2012-3-3 b  
    6. 2012-3-3 c  
    7. 2012-3-4 d  
    8. 2012-3-5 a  
    9. 2012-3-6 b  
    10. 2012-3-6 c  
    11. 2012-3-7 c  
    12. 2012-3-7 d  


    3)、说明
    数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,
    无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,

    而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值。


    2、MapReduce实现

    代码编写

    [java] view plain copy
    1. import java.io.IOException;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.fs.Path;  
    5. import org.apache.hadoop.io.IntWritable;  
    6. import org.apache.hadoop.io.Text;  
    7. import org.apache.hadoop.mapreduce.Job;  
    8. import org.apache.hadoop.mapreduce.Mapper;  
    9. import org.apache.hadoop.mapreduce.Reducer;  
    10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    12. import org.apache.hadoop.util.GenericOptionsParser;  
    13.    
    14.   
    15. public class Dedup {  
    16.   
    17.     //map将输入中的value复制到输出数据的key上,并直接输出  
    18.   
    19.     public static class Map extends Mapper<Object,Text,Text,Text>{  
    20.   
    21.     private static Text line=new Text();//每行数据  
    22.   
    23.     //实现map函数  
    24.   
    25.     public void map(Object key,Text value,Context context)  
    26.   
    27.         throws IOException,InterruptedException{  
    28.   
    29.         line=value;  
    30.   
    31.         context.write(line, new Text(""));  
    32.   
    33.     }  
    34.    
    35.   
    36.     }  
    37.   
    38.      
    39.   
    40.     //reduce将输入中的key复制到输出数据的key上,并直接输出  
    41.   
    42.     public static class Reduce extends Reducer<Text,Text,Text,Text>{  
    43.   
    44.     //实现reduce函数  
    45.   
    46.     public void reduce(Text key,Iterable<Text> values,Context context)  
    47.   
    48.         throws IOException,InterruptedException{  
    49.   
    50.         context.write(key, new Text(""));  
    51.   
    52.     }  
    53.         
    54.     }  
    55.   
    56.      
    57.   
    58.     public static void main(String[] args) throws Exception{  
    59.   
    60.     Configuration conf = new Configuration();  
    61.   
    62.     //这句话很关键  
    63.   
    64.     conf.set("mapred.job.tracker""192.168.1.2:9001");  
    65.   
    66.          
    67.   
    68.     String[] ioArgs=new String[]{"dedup_in","dedup_out"};  
    69.   
    70.         String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();  
    71.   
    72.         if (otherArgs.length != 2) {  
    73.   
    74.           System.err.println("Usage: Data Deduplication <in> <out>");  
    75.   
    76.           System.exit(2);  
    77.   
    78.         }  
    79.   
    80.        
    81.   
    82.      Job job = new Job(conf, "Data Deduplication");  
    83.   
    84.      job.setJarByClass(Dedup.class);  
    85.   
    86.        
    87.   
    88.      //设置Map、Combine和Reduce处理类  
    89.   
    90.      job.setMapperClass(Map.class);  
    91.   
    92.      job.setCombinerClass(Reduce.class);  
    93.   
    94.      job.setReducerClass(Reduce.class);  
    95.   
    96.        
    97.   
    98.      //设置输出类型  
    99.   
    100.      job.setOutputKeyClass(Text.class);  
    101.   
    102.      job.setOutputValueClass(Text.class);  
    103.   
    104.        
    105.   
    106.      //设置输入和输出目录  
    107.   
    108.      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
    109.   
    110.      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
    111.   
    112.      System.exit(job.waitForCompletion(true) ? 0 : 1);  
    113.   
    114.      }  
    115.   
    116. }   


    3、Spark实现Scala版本

    [java] view plain copy
    1. val two = sc.textFile("/tmp/spark/two")  
    2.   
    3. two.filter(_.trim().length>0).map(line=>(line.trim,"")).groupByKey().sortByKey().keys.collect.foreach(println _)  

    上面通过groupByKey来去重,并sortByKey排序,因为hadoop的结果也是排序过的,验证结果是一样的,代码精简不少



    展开全文
  • Spark详细案例实操

    2020-07-24 08:30:32
    Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。 编写WordCount程序 创建一...
  • Spark 常用案例

    千次阅读 2018-07-18 16:43:12
    数据过滤清洗数据 .../spark/seven.txt") //filter 过滤长度小于0, 过滤不包含GET与POST的URL val filtered = data.filter(_.length() &gt; 0).filter(line =&gt; (line.indexOf...
  • Spark 入门实战之最好的实例

    万次阅读 多人点赞 2018-08-16 16:34:28
    转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/ 搭建开发环境 安装 Scala IDE 搭建 Scala 语言开发环境很容易,Scala IDE 官网 下载合适的版本并解压就可以完成安装,本文使用...
  • spark大数据案例

    2018-11-12 09:21:20
    包含了Spark的一系列的小案例,包含core,sql,stream等案例
  • Spark开发实例(编程实践)

    千次阅读 2019-06-29 17:45:37
    本节将介绍如何实际动手进行 RDD 的转换与操作,以及如何编写、编译、打包和运行 Spark 应用程序。 启动 SparkShell Spark 的交互式脚本是一种学习 API 的简单途径,也是分析数据集交互的有力工具。Spark 包含多种...
  • 一、flume安装 (一)概述    Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等...
  • 大数据Spark实战视频教程

    万人学习 2016-11-10 14:26:54
    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室...
  • Spark实战(5)_Spark Core核心编程

    千次阅读 2018-05-10 18:25:39
    Spark版本 cdh5.9.0集成的spark的版本1.6.0,集成的hadoop版本2.6.0。查看的网址:http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.9.0/如果用cdh5.9.0 parcels离线安装自带的spark(on yarn),启动时提示...
  • Spark实战项目之电影推荐

    千次阅读 2019-06-17 12:56:57
    一、Spark知识点 二、项目数据 三、项目代码 import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark....
  • Spark实战

    万次阅读 2015-06-24 16:07:05
    01.Spark简介(Spark VS MapReduce) 02.Spark生态系统 03.Scala集合简介 04.spark的关键组件 05.核心概念:弹性分布式数据集 06.RDD的操作(转换(transformation)动作(actions)) 07.RDD依赖 08.Wordcount例子 09...
  • 大数据Spark实战视频教程 张长志技术全才、擅长领域:区块链、大数据、Ja...
  • Spark实战教程 大强老师 大华软件学院 技术总监 / 高级讲师 曾就...
  • Spark 入门实战之实例

    千次阅读 2018-01-03 23:15:38
    转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/ http://blog.csdn.net/gongpulin/article/details/51534754 搭建开发环境 安装 Scala IDE 搭建 Scala 语言开发环境...
  • Spark案例实战之四

    2018-07-27 22:04:17
    Spark案例实战之四 一.微博专栏分析 1.需求:有一个微博网站,下面有很多栏目,每个栏目下面都有几千万用户,每个用户会有很多的粉丝,要求取出各栏目粉丝量最多的用户TopN。【可用TreeMap实现,专栏:feature, ...
  • SparkSQL实战
  • 自己按照视频讲的整理即学即用Spark实战44讲范东来模块一,详细内容参考链接https://kaiwu.lagou.com/course/courseInfo.htm?courseId=71#/detail/pc?id=1971
  • 使用java开发spark 实战

    千次阅读 2017-01-18 12:50:53
    一:环境搭建 安装jdk 和maven。 1. 安装jdk并配置环境变量 系统变量→新建 JAVA_HOME 变量 。 变量值填写jdk的安装目录(本人是 E:\Java\jdk1.7.0) 系统变量→寻找 Path 变量→编辑 ...
  • IntelliJ IDEA开发Spark案例之WordCount

    千次阅读 2019-03-28 15:37:41
    教程目录0x00 教程内容0x01 新建Maven项目1. 新建Maven项目2. 项目配置3. 引入项目的依赖0x02 编写WordCount代码1. 新建包2. 编写scala代码0x03 校验结果1. 统计文件准备2. 统计结果0xFF 总结 0x00 教程内容 ...a....b....
  • Spark实战高手之路

    2015-09-22 07:30:07
    Spark实战
  • 图解Spark 核心技术与案例实战 清晰版,包含了书中设计到的所有项目代码已经项目测试数据
  • SparkSQL引入了一种新的RDD——SchemaRDD,SchemaRDD由行对象(Row)以及描述行对象中每列数据类型的Schema组成;SchemaRDD很象传统数据库中的表。SchemaRDD可以通过RDD、Parquet文件、JSON文件、或者通过使用hiveql...
  • Spark实战.docx

    2018-06-01 09:26:00
    1. Spark是特性  高可伸缩性  高容错  于内存计算 2. Spark的生态体系(BDAS,中文:伯利克分析栈)  MapReduce属于Hadoop生态体系之一,Spark则属于BDAS生态体系之一  Hadoop包含了MapReduce、HDFS、HBase...
  • 启动并查看集群的状况 第一步:启动Hadoop集群,这个在第二讲中讲解的非常细致,在此不再赘述: 启动之后在Master这台机器上使用jps命令... 在Hadoop集群成功启动的基础上,启动Spark集群需要使用Spark的sbin
  • HadoopSparkExampler,Hadoop+Spark大数据巨量分析演示代码
  • Spark简单案例实战

    千次阅读 2018-04-09 10:13:16
    Spark简单案例实战 一.给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6)键值对的key表示图书名称,value表示每天图书销量,请计算出每个键对应的平均值,也就是每种图书每天的平均销量...
  • 使用java开发spark实战

    千次阅读 2016-04-26 16:42:24
     使用java开发spark 实战     一:环境搭建 安装jdk 和maven。 1. 安装jdk并配置环境变量 系统变量→新建 JAVA_HOME 变量 。 变量值填写jdk的安装目录(本人是 E:\Java\jdk1.7.0) 系统变量→寻找 Path ...

空空如也

1 2 3 4 5 ... 20
收藏数 38,439
精华内容 15,375
关键字:

spark案例