精华内容
下载资源
问答
  • spark 笔记

    2018-07-01 23:52:01
    spark 笔记
  • spark笔记

    2021-01-21 14:54:28
    目录SPARKSPARK笔记spark版本下载 SPARK SPARK笔记 spark版本下载 https://archive.apache.org/dist/spark/ 参考链接

    SPARK

    SPARK笔记

    spark版本下载

    https://archive.apache.org/dist/spark/
    官网
    参考链接

    spark连接HDFS报错

    spark连接HDFS报错:8020 failed on connection exception: java.net.ConnectException: 拒绝连接 解决方法
    

    解决方法

    刚开始学习spark,结果连接HDFS报错了。查了一些资料,原因找到了。
    因为我配置的HDFS集群端口是9000,而spark写的是8020,结果肯定会报错啊。把8020改为9000即可。错误很低级,但写出可能给新手有帮助吧。
    

    pyspark设置python的版本

    参考链接

    pyspark报错

    Caused by: MetaException(message:Hive Schema version 2.3.0 does not match metastore's schema version 1.2.0 Metastore is not upgraded or corrupt)
    

    解决办法

    原因:
    spark应用创建表时,指定的schema版本为2.3.0,而hive的schema版本为1.2.0,版本不兼容导致
    方法1
    spark安装:spark-3.0.0-bin-hadoop2.7-hive1.2.tgz
    方法2
    参考如下链接
    

    参考链接
    参考链接

    Failed to get database global_temp, returning NoSuchObjectException

    mv hive/conf/hive-site.xml  spark/conf/hive-site.xml
    mv hive/lib/mysql-connector-java-5.1.44-bin.jar  spark/jars/mysql-connector-java-5.1.44-bin.jar
    

    参考链接

    展开全文
  • spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory
  • spark笔记整理文档

    2018-01-18 18:16:10
    spark笔记整理文档spark笔记整理文档spark笔记整理文档
  • 火花笔记本演示 演示如何使用Spark笔记本
  • Spark-SourceCode分析 Apache Spark笔记本作者:祁传宏军 Spark简介 Spark起源自科研院所,加州大学伯克利分校UC Berkeley的AMP实验室。该校在edx上开设了系列课程:)“使用Spark XSeries进行数据科学与工程”。 08...
  • spark笔记.zip

    2020-04-23 12:42:30
    spark 学习笔记
  • spark笔记csdn

    2018-06-13 13:03:00
    spark笔记csdn https://blog.csdn.net/qq_27926875 转载于:https://www.cnblogs.com/ycx95/p/9177232.html

    spark笔记csdn

    转载于:https://www.cnblogs.com/ycx95/p/9177232.html

    展开全文
  • 王家林Spark笔记

    千次阅读 2016-08-25 02:26:07
    王家林Spark笔记 第一讲:Scala光速入门 本期内容 1、Scala的重大价值 2、Scala基础语法入门实战 3、Scala函数入门实战 4、Scala中Array、Map、Tuple实战 5、综合案例及Spark源码解析 kafka 消息中间件 val name:...
    王家林Spark笔记
    
    第一讲:Scala光速入门
    本期内容
    1、Scala的重大价值
    2、Scala基础语法入门实战
    3、Scala函数入门实战
    4、Scala中Array、Map、Tuple实战
    5、综合案例及Spark源码解析


    kafka 消息中间件
    val name:String = null
    import scala.math._
    min(20,4)
    Array(1,2,3,4)
    val array = Array(1,2,3,4)
    array
    val array = Array.apply(1,2,3,4)
    array


    val age:Int = 0
    val name:String = null
    val age1,age2,age3 = 0


    1+1
    1.+(1)


    import scala.math._
    min(20,4)


    if(age >= 18) "adult"


    val:不可变变量
    var:可变变量
    一般情况下用val


    val result = if(age>=18){}
    val age = 19
    res9.toInt
    if(age >= 18) "adult" else "child"
    val result = if(age >=18) "adult" else "child"


    val result = if(age >=18){
        "adult"
        buffered = 10
        buffered
    }
    println("Spark")
    print("\nSpark")
    printf(" %s is the future of Big Data Computation Framework.\n","Spark") 格式占位符


    -----------输入--读取内容:---------------------
    readLine(" Please enter your password : ")
    readInt
    ----------------------------------------------
    -------循环----------------
    while(element > 10){
    println(element)
    element -= 1
    }


    0 to element
    for(i <- 0 to element) println(i)
    for(i<-0 to element if i%2 ==0){println(i)}
    for(i <- 0 to element if i%2 ==0){println(i)}
    import scala.util.control.Breaks._
    for(i <- 1 to 10){
    if(i == 4) break
    }


    val n = 10
    def f1:Int = {
    for(i <- 1 to 20)
    {
    if(i == n) return 9
    println(i)
    }
    }


    val n = 10
    def f1:Any = {
      for(i<- 1 to 10){
        if(i==0) return i
        println(i)
      }
    }


    println _
    def f2 = println _


    import scala.io.Source._
    import scala.io._
    try{
      val content = fromFile("/root/1.scala").mkString
    }catch{
      case _:FileNotFoundException => println("Oops!!!File not found")
    }finally{
      println("Byebye world!")
    }


    val arr = new Array[Int](5)
    val arr1 = Array("scala","spark")


    import scala.collection.mutable.ArrayBuffer
    val arrBuffer = ArrayBuffer[Int]()
    arrBuffer += 10
    arrBuffer
    arrBuffer += (12,13,14,15,16,17)
    arrBuffer ++= Array(1,2,3,4)


    arrBuffer.trimEnd
    arrBuffer.trimStart
    arrBuffer.insert(5,100) 从指定位置插入数据
    arrBuffer.insert(7,1100,200,300,400,500,600)
    arrBuffer.remove(10) 指定位置进行移除
    arrBuffer.toArray


    val arr2 = arrBuffer.toArray
    arr2.toBuffer


    for (elem <- arr2) println(elem)
    arr2


    for(i<-0 until(arr2.length,1))println(arr2(i))
    for(i<-0 until(arr2.length,2))println(arr2(i))
    for(i<-(0 until arr2.length).reverse) println(arr2(i))
    arr2.sum
    arr2.max
    scala.until.Sorting.quickSort(arr2)
    scala.util.Sorting.quickSort(arr2)
    arr2.mkString
    arr2.mkString(", ")
    val arr3 = for(i <- arr2) yield i*i
    val arr3 = for(i <- arr2 if i % 3 == 0) yield i*i
    arr2.filter(_%3 ==0).map(i => i*i)
    arr2.filter{_%3 == 0}.map{i => i*i}




    def f3(param1:String,param2:Int =30) = param1 + param2
    f3("Spark")
    f3(param2=100,param1="Scala")
    def sum(numbers: Int*) = {var result = 0;for(element <- numbers) result += element;result}
    sum(1,2,3,4,5,6,7,8,9,10)
    sum(1 to 100: _*)
    def morning(content:String) = "Good" + content
    def morning(content:String):Unit = "Good" + content






    import scala.io.
    val arr3 = for(i <- arr2) yield i * i
    val arr3 = for(i <- arr2 if i % 3 == 0) yield i*i
    val person = scala.collection.immutable.SortedMap("Spark" -> 6,"Hadoop" -> 11)
    val persons = Map("Spark" ->6,"Hadoop" -> 11)


    for(elem <- arr2) println(elem)


    作业一:移除一个数组中第一个负数后的所有负数
    val persons = scala.collection.mutable.Map("Spark"->6,"Hadoop"->11)
    persons += ("Flink" -> 5)
    persons -= "Flink"
    val sparkValue = if(persons.contains("Spark")) persons("Spark") else 1000
    val sparkValue = persons.getOrElse("Spark",1000)
    for((key,value) <- persons) println(key + " : " + value)
    for(key <- persons.keySet) println(key + ":")


    val persons = scala.collection.immutable.SortedMap("Spark"->6,"Hadoop"->11)
    val tuple = ("Spark",6,99.0)
    tuple._1
    tuple._2
    --------------------------------------------
    第二节
    class HiScala{
      private var name = "Spark"
      def sayName(){println(name)}
      def getName = name
    }
    val scal  = new Hiscala
    scal.sayName
    scal.getName




    ------------------------------------
    第三节函数式编程
    def fun1(name:String){println(name)}
    val fun1_v =fun1 _
    fun1("Spark")
    fun1_v("Spark")
    fun1_v("Spark")
    val fun2 = (content:String)=>println(content)




    val xm="西门大官人"
    val jl="金莲"
    def makelove(status:Int){
      if (status == 1) println ("雄风再起")
      else println("偃旗息鼓") 
      println("金莲鄙视地说:还以为你真的那么强呢,不嗑药和大郎也差不多少啊!")
      println("西门大官人骂道:你个贱货!我不是早和你说过,老爷我今天有些感冒吗……")
    }
    makelove(0)






    高阶函数
    val xm="西门大官人"
    val jl="金莲"


    val say = (content:String) => println(content)


    def makelove(func:(String)=>Unit,status:Int){
      if (status == 1) println ("雄风再起")
      else println("偃旗息鼓") 
      func(jl+"鄙视地说:还以为你真的那么强呢,不嗑药和大郎也差不多少啊\n"+xm+"骂道:你个贱货!我不是早和你说过,老爷我今天有些感冒吗……")
    }
    makelove(say,0)




    abstract class Love(val man:String,val woman:String){
        def make;
    }


    class goodLove(man:String,woman:String) extends Love(man,woman){
        def make={
           println(s"$man:Let me drink something,....")
           println(s"$woman:  Great!")
           println(" .....wa,wa,....,(about 30000 words are omitted).....");
           println(s"$woman: Your knife is sharp as many years ago....");


        }
    }


    class badLove (man:String,woman:String) extends Love(man,woman){
        def make={
           println(s" $woman:Let me drink something,....")
           println(s" $man: That's ok,but I have forgotten something......")
           println(" .....wa,wa,....,(about 30 words are omitted).....");


           println(s"$woman: why you stop?!");
           println(s"$man: 你个贱人,爷今儿个感冒了,你不知道啊?!!");


        }
    }


    def main(status:Int){
      if(status == 1)
        new goodLove("西门庆","潘金莲").make
      else
        new badLove("西门庆","潘金莲").make


    }
    main(0)


    ------------------------
    val hiScala = (content:String) => println(content)


    def bigData(func:(String)=>Unit,content:String){
      func(content)
    }
    bigData(hiScala,"Spark")
    --------------------------------
    array.map(item =>2 * item)
    array.map(item => 2 * item)


    def func_Returned(content:String)=(message:String)=>println(message)
    func_Returned("Spark")


    def func_Returned(content:String) =(message:String)=>println(content + " " + message)


    val returned = func_Returned("Spark")


    returned("Scala")


    如果在函数的函数体中只使用了一次函数的输入参数的值此时我们可以将函数的输入参数的名称省略掉用下划线来代替。
    def spark(func:(String)=>Unit,name:String){func(name)}
    spark((name:String) => println(name),"Scala")
    spark(name => println(name),"Scala")
    spark(println,"Scala")
    spark(println(_),"Scala")
    array.map(2*_).filter(_ > 10).foreach(println)


    闭包:函数的变量超出他的有效作用域的时候,还能够对函数的内部变量进行访问。
    def scala(content:String)=(message:String) => println(content + ":" + message)
    val funcResult = scala("Spark")
    funcResult("Flink")


    sum_Curring_Better(1)(3)
    (1 to 100).reduceLeft(_+_)
    val list = List("Scala","Spark","Fink")
    list.map("The content is :" + _)
    list.map(println)
    val cal = list.map("The content is :" + _)
    list.flatMap(_.split)
    cal
    cal.flatMap(_.split(" "))
    cal.flatMap(_.split(" ")).foreach(print)
    list.zip(List(10,6,5))


    第四节:Scala模式匹配、类型系统
    def bigData(data:String){
      data match{
        case "Spark" => println("WoW!!!")
        case "Hadoop" => println("Ok")
        
        case _ if data == "Flink" => println("Cool")
        case _ => println("Something others")
      }
    }


    bigData("Hadoop")
    bigData("Flink")


    import java.io._
    def exception(e:Exception){
    e match{
      case fileException: FileNotFoundException => println("File not found:" + fileException)
      case _:Exception => println("Exception getting thread dump from executor SexecutorId",e)
    }
    }


    exception(new FileNotFoundException("oop"))


    def data(array:Array[String]){
      array match{
      case Array("Scala") => println("Scala")
      case Array(spark,hadoop,flink)=>println(spark + ":" + hadoop + ":" + flink)
      case Array("Spark",_*) => println("Spark ...")
      case _ => println("Unknow")
      }
    }


    data(Array("Scala"))
    data(Array("Scala","Spark","Kafka"))


    case class Person(name:String)


    class Compare[T : Ordering](val n1:T,val n2:T){
      def bigger(implicit ordered:Ordering[T]) = if(ordered.compare(n1,n2) > 0) n1 else n2
    }
    作业:阅读Spark源码RDD hadoopRDD SparkContext Master Worker的源码,并分析里面使用的所有模式匹配和类型参数的内容。


    -嵌套类------------------------
    class A{class B}
    val a1 = new A
    val a2 = new A
    val b1 = new a1.B
    val b2 = new a2.B


    A$B
    b1.getClass
    b1.getClass == b2.getClass


    typeOf[a1.B] == typeOf[a2.B]


    class Person[T](val content : T)


    val p = new Person[String]("Spark")
    p.getContent("Scala")
    p.getContent(100)
    val p = new Person[String(2.3)]


    ViewBounds 语法 <%


    第五节:Scala隐式转换和并发编程


    val result = 3 * Fraction(4,5)




    import scala.math.abs


    class Fraction(n: Int, d: Int) {
      private val num: Int = if (d == 0) 1 else n * sign(d) / gcd(n, d);
      private val den: Int = if (d == 0) 0 else d * sign(d) / gcd(n, d);


      override def toString = num + "/" + den


      def sign(a: Int) = if (a > 0) 1 else if (a < 0) -1 else 0


      def gcd(a: Int, b: Int): Int = if (b == 0) abs(a) else gcd(b, a % b)


      def +(other:Fraction):Fraction={
        newFrac((this.num * other.den) + (other.num * this.den),this.den * other.den)
      }


      def -(other:Fraction):Fraction={
        newFrac((this.num * other.den) - (other.num * this.den),this.den * other.den)
      }


      def *(other:Fraction):Fraction={
        newFrac(this.num * other.num,this.den * other.den)
      }


      def /(other:Fraction):Fraction={
        newFrac(this.num * other.den,this.den * other.num)
      }


      private def newFrac(a:Int,b:Int):Fraction={
        val x:Int = if (b == 0) 1 else a * sign(b) / gcd(a, b);
        val y:Int = if (b == 0) 0 else b * sign(b) / gcd(a, b);
        new Fraction(x,y)
      }
    }


    object Test extends App{
      val f = new Fraction(15,-6)
      val p = new Fraction(20,60)
      println(f)
      println(p)
      println(f + p)
      println(f - p)
      println(f * p)
      println(f / p)
    }




    class Level(val level:Int)
    def toWorker(name:String)(implicit level : Level){
      println(name + ":" + level)
    }
    implicit val level = new Level(8)
    toWorker("Spark")




    implicit val level = new Level(8)
    class Level(val level:Int)
    def toWorker(name:String)(implicit l:Level) = println(name  + ":" + l.level)


    import scala.actors.Actor
    class HiActor extends Actor{
      def act(){
        while(true){
          receive{
            case name:String => println(name)
          }
        } 
      }
    }
    val actor = new HiActor
    actor.start()
    actor ! "Spark"


    case class Basic(name: String,age: Int)
    case class Worker(name: String,age: Int)


    class basicActor extends Actor{
      def act(){
      while(true){
        receive{
          case Basic(name,age) => println("Basic Information:" + name + " : " + age)
          case Worker(name,age) => println("Basic Information:" + name + " : " + age)
        }
      }
      }
    }


    val b = new basicActor
    b.start
    b ! Basic("Scala",13)
    b ! Worker("Spark",7)
    val result = b !? Worker("Spark",7)


    sc.textFile("hdfs://192.168.1.30:9000/spark/input/access_2013_05_30.log").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1)).saveAsTextFile("hdfs://192.168.1.30:9000/spark/output4/")


    ---------------------------------------
    第十二课Spark集群工作原理
    Spark高可用HA实战
    -----------------------------------------------------------------------------------------------------
                   |---------------------------------------|
                   |                zookeeper              |               
          |---------------------------------------|
    ^                            ^
    |                            |
    |                            |
     Driver     <---->Master(active)               Master(standby)
    ^                    ^
    |     |
    |                    |
    |                    V
    | worker           worker
    |   ^                 ^
    |   |                 |
    |   |                 |
    |   v                 V
    v executor         executor
    -------------------------------------------------------------------------------------------
    2014年6月以前都是 两台active  standby集群资源分配
    以后都是三台:一台active 两台standby 通过zookeeper选出leader
    zookeeper:包含的元数据有Worker、Driver、Application。


    切换Master:程序在运行之前已经向Master申请过资源 Driver和executor进行通信 这种情况下不需要Master参与。除非executor出现故障。
    弊端:
    粗粒度:
     优点:一次性资源的分配后,不用关心资源的分配。而让Drive和executor进行交互完成作业。
     弊端:Job 一百万个任务 有一个没有完成就等待在哪里。资源不会释放,闲置在那里。
    细粒度:
     优点:你有这个计算资源就分配给计算任务。
     弊端:任务启动慢、没有办法复用。
    一般都是使用粗粒度。


    spark-shell --master spark://master:7077,slave1:7077,slave2:7077
    -------------------------------------------------------------------------------------------------------------------------------------
    第十三课:Spark内核架构
    Spark Runtime




    Driver  <------------->  Worker(RAM、Input Data)
            results tasks
            <------------->
    --------------------------------------------------------------------------------------------------------------------------
    Driver部分的代码:SparkConf + SparkContext
    Drive运行Application的main函数,创建的SparkContext是整个程序运行调度的核心,SparkContext要有高层调度器DagScheduler、底层调度器TestScheduler,也有SchedulerBackend 
    向Master注册程序,注册成功后,Master会分配资源,根据action触发的job job里有一系列的RDD
    从后向前推发现如果是宽依赖:发放给不同的stage,stage发放给底层调度器TestScheduler
    一般表示standalone模式


    应用程序有两个层面:
    Application = driver + executor
    Driver部分的代码:SparkConf + SparkContext
    Worker管理当前节点的计算资源并接受Master指令来分配具体的计算资源Executor(在新的进程中分配)
    ExecutorRunner


    spark优势:
    1、基于内存计算
    2、调度和容错


    窄依赖:一对一的,固定个数的依赖


    stage:计算逻辑完全一样只是计算的数据不同。
    问题:一个partition是否精准的等于一个block大小?不是


    一个Application里可以有多个job。
    checkpoint也可以导致Job


    专门用来提交Spark程序,这台机器一般一定和Spark Cluster在同样的网络环境中(Driver频繁和Executor通信),且其配置和普通的worker一致
    Application(各种依赖的外部资源,例如:*.soFile),使用sparkSubmit去运行程序(可以配置运行时候的各种参数,例如memory cores。。。)实际生产环境下写Shell脚本自动化配置
    和提交程序,当然当前的机器一定要安装了Spark,只不过是这里安装的Spark不属于集群。


    Driver(核心是SparkContext)
    --supervise  当Driver挂掉后,集群可重新启动Driver。
    SparkContent:创建DAGScheduler、TaskScheduler、SchedulerBackend
    在实例化过程中Register注册当前程序的Master,Master接受注册,如果没有问题,Master合为当前程序分配Appid并分配计算资源。
    一般情况下通过action触发job时SparkConext会通过DAGScheduler来把Job中的RDD构成的DAG划分成不同的Stage,每个Stage内部是一系列业务逻辑完全相同但是处理数据不同的Task,构成了
    TaskSet
    TaskScheduler和SchedulerBackend负责具体Task的运行(遵循数据本地性)


    --------------------------------------------------------------------------------------------------------------------
    Spark的程序的运行有两种模式:Client Cluster
    Spark Cluster
    Master:接受用户提交的作业并发送指令给Worker为当前程序分配计算资源。每个Worker所在节点默认为当前程序分配一个Executor,在Executor中通过线程池并发执行
    Master通知Worker接受要求启动Executor
    Worker Node 
    Worker进程,通过一个Prox为ExecutorRunner的对象实例来远程启动ExecutorBackend进行
    ExecutorBackend进程里面有Executor
    实际在工作的时候通过TaskRunner来封装Task,然后从ThreadPool中获取一条线程执行Task,执行完后线程被回收复用。


    ThreadPool
    最后一个Stage中Task称为ResultTask,产生Job的结果,其他前面的Stage中的Task都是ShuffleMapTask,为下一阶段的Stage做数据准备,相当于MapReduce中的Mapper


    整个Spark程序的运行,就是DAGScheduler把Job划分成不同的Stage,提交TaskSet的TaskScheduler,进而提交给Executor执行(符合数据本地性),每个Task会计算RDD中的一个
    Partition,基于该Partition来具体执行我们定义的一系列同一个Stage内部函数,依次类推直到整个程序运行完成。
    1、spark-env.sh spark-default.sh
    2、spark-submit提供的参数
    3、程序中SparkConf配置的参数
    ----------------------------------------------------------------------------------------------------------------
    第十四课 RDD解密
    1、RDD:基于工作集的应用抽象
    2、RDD内幕
    3、RDD思考
    基于数据流不适合的场景
    1、不适合大量的迭代
    2、交互式查询
    重点是:基于数据流的方式,不能够复用曾经的结果或者中间计算结果
    RDD是基于工作集的
    RDD:Resillient Distributed Dataset
    弹性之一:自动的进行内存和磁盘数据存储的切换
    弹性之二:基于Lineage的高效容错性
    弹性之三:Task如果失败会自动进行特定次数的重试
    弹性之四:Stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
    弹性之五:checkpoint和persist
    弹性之六:数据调度弹性:DAG、TASK和资源、管理无关
    弹性之七:数据分片的高度弹性


    RDD:是分布式函数式编程的抽象
    RDD通过记录数据更新的方式为何很高效。
    1、RDD不可变的 + lazy
    2、RDD是粗粒度的


    RDD写是粗粒度的
    但是RDD的操作可以使粗粒度也可以是细粒度的。
    Spark要统一数据计算领域,除了实时事务性处理


    RDD不支持细粒度的写操作以及增量迭代计算
    --------------------------------------------------
    第十五课 RDD创建内幕
    第一个RDD:代表了Spark应用程序输入数据的来源
    通过Tranformation来对RDD进行各种算子的转换
    1、使用程序中的集合创建RDD
    2、使用本地文件系统创建RDD
    3、使用HDFS创建RDD
    4、基于DB创建RDD
    5、基于NoSql,例如Hbase
    6、基于S3创建RDD
    7、基于数据流创建RDD


    1、通过集合创建RDD的实际意思:测试
    2、使用本地文件系统创建RDD的作用,
    3、使用HDFS来创建RDD 生产环境最常用的RDD创建方式


    实例:基于集合来创建RDD
    object RDDBasedOnCollections{
      def main(args: Array[String]){
        val conf = new SparkConf() //创建SparkConf对象
            conf.setAppName("RDDBasedOnCollections")
            conf.setMaster("local")
        val sc = new SparkContext(conf)
        //val number = 1 to 100
        //val rdd = sc.parallelize(number)
        //val sum = rdd.reduce(_ + _)
        //println("1+2+3...+99" + sum)


        val rdd = sc.textFile("D://data//SogouQ//")
        val linelength = rdd.map(line =>line.length)
        val sum = linelength.reduce(_ + _)
        println("the total=" + sum)
      }
    }
    Local模式 默认情况下如果失败了就是失败了
    实际上Spark的并行度到底应该设置多少呢?
    每个Core可以承载2-4个partition 64-128之间
    ----------------------------------------
    第十六课 RDD实战
    1、RDD实战
    2、RDD的Transformation与Action
    3、RDD执行手动绘图


    action触发job shuffle触发stage


    Transformations and Actions
    |---------------------------------------------------------------------------------------------
    |Transformations  map(f:T=>U)            : RDD[T] => RDD[U]
    |                 filter(f:T=>Bool)      : RDD[T] => RDD[T]
    |                 flatMap(f:T => Seq[U]) : RDD[T] => RDD[U]
    |                 sample(raction: Float) : RDD[T] = > RDD[T](Deterministic sampling)
    | groupByKey()           : RDD(K,V) => RDD[(K,Seq[V])]
    | reduceByKey(f:(V,V)=>V) : RDD(K,V)=>RDD[(K,V)]
    | union()                : (RDD[T],RDD[T])=>RDD[T]
    | join()                 :(RDD|(K,V),RDD(K,W)) => RDD[(K,(V,W))]
    | cogroup()              : (RDD[K,V],RDD[K,W]) => RDD[K,(Seq[V],Seq[W])]
    | crossProduct()         : (RDD[T],RDD[U]) => RDD[(T,U)]
    | mapValues(f:V => W)    : RDD[(K,V)](Preserves partitioning)
    | sort(c:Comparator[K])  : RDD[(K,V)]=>RDD[(K,V)]
    | partitionBy(p:Partitioner[K]): RDD[(K,V)]=>RDD[(K,V)]
    |Actions          count()                : RDD[T] => Long
    |                 collect()              : RDD[T] =>Seq[T]
    | reduce(f:(T:T)=>T)     : RDD[T] => T
    | lookup(k:K)            : RDD[(K,V)]=>Seq[V](On hash/range partitioned RDDs)
    | save(path : String)    : Output RDD to a storage system.e.g.HDFS
    |-----------------------------------------------------------------------------------------------


    -----------------------------------------------------
    第十七课
    1、map、filter、flatMap操作回顾
    2、reduceByKey、groupByKey
    3、join、cogroup


    object Tranformations{
      def main(args: Array[String]){
        val sc = new SparkContext("Tranformation Operation")
        mapTranformation(sc)
        filterTranformation(sc)
        flatMapTranformation(sc)


        groupByKeyTranformations(sc)
        reduceByKeyTranformation(sc)


        sc.stop()
      }


      def sparkContext(name:String)={
        val conf = new SparkConf().setAppName("Tranformation").setMaster("local")
        val sc = new SparkContext(conf)


      }


      def mapTranformation(sc:SparkContext){
        val nums = sc.parallelize(1 to 10)
        val mapped = nums.map(item => 2*item)
        mapped.collect.foreach(println) //收集计算结果并循环打印
      }


      def filterTransformation(sc:SparkContext){
        val nums = sc.parallelize(1 to 20)
        val filtered = nums.filter(item => item%2 ==0)
        filtered.collect.foreach(println)
      }


      def flatMapTranformation(sc: SparkContext){
        val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
        val bigDataString = sc.parallelized(bigData) //创建以字符串为元素类型的ParallelCollectionRDD
        val words = bigDataString.flatMap(line => line.split(" "))
        words.collect.foreach(println)
      }


      def groupByKeyTranformation(sc:SparkContext){
        val lines = sc.textFile("")
        val words = lines.flatMap{line => line.split(" ")}
        val pairs = words.map{word =>(word,1)}
        val wordCountsOdered = pairs.reduceByKey(_+_)
        wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
      }


      def joinTranformation(sc: SparkContext){
        val studentNames = Array(
          Tuple2(1,"Spark"),
          Tuple2(2,"Spark"),
          Tuple2(3,"Spark"),
        )
        val studentScores = Array(
          Tuple2(1,100),
          Tuple2(2,95),
          Tuple2(3,65),
        )
        val name = sc.parallelize(studentNames)
        val scores = sc.parallelize(studentScores)


        val studentNameAndScore = name.join(scores)
        studentNameAndScore.collect.foreach(println)
      }
    }


    def join[W](other:RDD)[(K,W)],partitioner:Partitioner):RDD[(K,(V,W))] = self.withScope{
      this.cogroup(other,partitioner).flatMapValues(pair => for(v <- pair._1.iterator;
      w <- pair._2.iterator) yield(v,w)
      )
    }


    -java-实现
    JavaSparkContext sc = new JavaSparkContext(conf)
    List<Tuple2<Integer,String>> namesList = Arrays.asList(
      new Tuple2<Integer,String>(1,"Spark"),
      new Tuple2<Integer,String>(2,"Tachyon"),
      new Tuple2<Integer,String>(3,"Hadoop")
    );


    List<Tuple2<Integer,String>> scoresList = Arrays.asList(
      new Tuple2<Integer,Integer>(1,100),
      new Tuple2<Integer,Integer>(2,90),
      new Tuple2<Integer,Integer>(3,70),
      new Tuple2<Integer,Integer>(1,110),
      new Tuple2<Integer,Integer>(2,95),
      new Tuple2<Integer,Integer>(2,60),
    );
    JavaRDD<Tuple2<Integer,String>> names = sc.parallelizePairs(namesList);
    JavaRDD<Tuple2<Integer,Integer>> scores = sc.parallelizePairs(scoresList);
    names.cogroup(scores);


    JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> nameScores = names.cogroup(scores);
    nameScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>(){
      private static final long seriaVersionUID = 1L;
      public void call(Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> t) throws Exception{
        System.out.println("Student ID:" + t._1)
        System.out.println("Name:" + t._2._1)
        System.out.println("Score:" + t._2._1)
        System.out.println("==============================")
      }
    })




    join和cogroup是所有Spark学习者必须掌握的内容,没有任何商量的余地
    ---------------------------
    第十八课RDD持久化、广播、累加器
    val numbers = src.parllelize(1 to 100)
    numbers.reduce(_+_)
    val result = numbers.map(2*_)
    val data = result.collect
    如果想在命令终端中看到结果,就必须collect
    persist 
    1、某步骤计算特别好使
    2、计算链条特别长的情况
    3、checkpoint要在RDD也一定要持久化数据
    4、shuffle之后
    5、shuffle之前(框架默认帮助我们把数据持久化到本地磁盘)


    cache之后不能有其他算子
    persist是lazy级别的
    unpersist是eager级别的


    广播是由Driver发给前Application分配的所有Executor内存级别的只读变量。
    executor中的线程池中线程共享该全局变量,极大的减少了网络传输(否则的话每个Task都要传输一次该变量)并极大的节省了内存,
    当然也隐形的提高的CPU的有效工作。




    累加器:Accumulator:对于Executor只能修改但不可读,只对Driver可读。
    val sum = sc.accumulator(0)
    val data = sc.parallelize(1 to 100)
    data.foreach(item => sum += item)
    val result = data.foreach(item => sum += item)
    println(sum)
    -------------------------------------------------------------
    第十九课 Spark高级排序
    public class SecondarySortApp{
      public static void main(String[] args){
        SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("")
        JavaPairRDD<SecondarySortKey,String> pairs = lines.mapToPair(new PairFunction<String,SecondarySortKey,String>){
          private static final long serialVersionUID = 1L;
          public Tuple2<SecondarySortKey,String> call(String line) throws Exception{
            String[] splited = line.split(" ")
    SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]),Integer.valueOf(splited[1]));
    return new Tuple2<SecondarySortKey,String>(key,line);
          }
        }


        JavaPairRDD<SecondarySortKey,String> sorted = pairs.sortByKey(); //完成二次排序
        //过滤掉排序后自定的Key,保留排序的结果
        JavaRDD<String>
        sorted.map(new Function<Tuple2<SecondarySortKey,String>,String>(){
          public String call(Tuple2<SecondSortKey,String> sortedContent) throws Exception{
            return sortedContent._2;
          }
        })
      }


    }


    --scala实现--
    class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable{
      def compare(other:SecondarySortKey):Int={
        if(this.first - other.first != 0){
          this.first -other.first
        }else{
          this.second - other.second
        }
      }
    }


    object SecondarySortApp{
      def main(args:Array[String]){
        val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("")
        val pairWithSortKey = lines.map(line =>(
          new SecondarySortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
        ))


        val sorted = pairWithSortKey.sortByKey(false)
        val sortedResult = sorted.map(sortedLine => sortedLine._2)
        sortedResult.collect().foreach(println)
      }
    }
    -----------------------------------------
    第二十一课 从Spark架构中透视Job


    worker:负责当前节点cpu和内存资源的使用
    spark-shell默认情况下没有任何的Job
    默认的资源分配方式在每个worker上为当前程序分配一个ExecutorBackend进行,且默认情况下会最大化的使用Core和Memory
    Executor会并发线程池来运行Task


    CoarseGrainedExecutorBackend:里面有executor,executor会通过并发线程池线和复用的方式来执行我们的Task
    在一个Executor中一次性最多能够运行多少并发的task取决于当前Executor能够使用的Cores的数量
    由于线程不关心具体Task中运行什么代码,所以Task和Thread是解耦合的,所以Thread是可以被复用的。


    当Spark集群启动的时候,首先启动Master进程负责整个集群资源的管理和分配并接受作业的提交且为作业分配计算计算资源,每个工作节点默认情况下都会启动一个
    Worker Process来管理当前节点的Memory,CPU等计算资源并且向Master汇报Worker还能够正常工作。
    Worker还能够正常工作,当用户提交作业给Master的时候,Master会为程序分配ID并且分配计算资源,默认情况下会为当前的应用程序在每个WorkerProcess下面分配一个
    CoarseGranedExceptionBackend进程,该进程默认情况下会最大化的使用当前节点上的内存和CPU


    我们说Worker Process管理当前节点的内存和CPU的计算资源,实质上是通过Master管理每台机器上的计算资源的。
    WorkerProcess会接受Master的指令为当前要运行的应用程序来分配CoarseGranedExceptionBackend进程


    Stage0是Stage1的Mapper
    Stage1是Stage2的Mapper
    Stage1是Stage0的reduce
    Stage2是Stage1的reduce
    Spark是一个更加精致和高效的MapReduce思想的具体实现
    最后一个Stage里面的Task是Result Task类型
    前面所有的Stage中Task的类型都是ShuffleMap Task类型


    Stage里面的内容一定是在Executor中执行的!
    而且Stage必须从前往后执行


    Spark的一个应用程序中可以因为不同的Action产生众多的job,每个Job至少有一个Stage
    --------------------------------------------------------------------------------
    第二十二课 RDD的依赖关系
    1、窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如:map、filter等都会产生窄依赖
    2、宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖


    总结:如果父RDD的一个Partition被一个RDD的Partition所使用就是窄依赖,否则的话是宽依赖。如果子RDD中的Partition对父RDD的Partition依赖的数量不会随着RDD数量规模
    的改变而改变的话,就是窄依赖,否则的话就是宽依赖。


    特别说明:对join操作有两种情况,如果说join操作的使用每个partition仅仅和已知的Partition进行join,这次是join操作就是窄依赖,其他情况的join操作就是宽依赖
    因为是确定的partition数据的依赖关系,所有就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的partition
    的数量不会随着RDD数量规模的改变而改变)


    遇到Shuffle级别的依赖关系必须计算依赖的RDD的所有Partition 并且都发生在一个Task中计算
    上面两种假设的核心问题都是在遇到shuffle依赖的时候无法进行pipeline




    注意:
    1、从后往前推理遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到该Stage中;
    2、每个Stage里面的Task数量是由该Stage中最后一个RDD的Partition的数量所决定。
    3、最后一个Stage里面的任务的类型是ResulTask,前面其他所有的Stage里面的任务的类型就是ShuffleMapTask
    补充:Hadoop中的MapReduce操作中的Mapper和Reducer在Spark中基本等量算子是:map、reduceByKey;


    表面上是数据流动,实质上算子在流动
    1、数据不动代码动
    2、在一个Stage内幕算子为何会流动(Pipline)?首先是算子合并,也就是所谓的函数式编程的执行的时候最终进行函数的展开从而把一个Stage内部的多个算子合并成为一个大算子
    (其内部包含了当前Stage中所有算子对数据的计算逻辑);其次是由于Tranformation操作的Lazy特性!在具体算子交给集群的Executor计算之前首先会通过Spark Framework(DAGScheduler)
    进行算子的优化(基于数据本地性的pipeline)


    -----------------------------------------------------------------------
    第二十三课 从物理执行角度透视Spark
    一、再次思考pipeline
    即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式:
    1、f(record),f作用于集合的每一条记录,每次只作用于一条记录;
    2、f(record),f一次性作用于集合的全部数据
    Spark采用是第一种方式,原因:
    1、无需等待,可以最大化的使用集群的计算资源
    2、减少OOM的发生;
    3、最大化的有利于并发;
    4、可以精准的控制每一Partition本身(Dependency)及其内部的计算(compute);
    5、基于lineage的算子流动式函数式编程,节省了中间结果的产生,并且可以最快的恢复;
    疑问:会不会增加网络通信?当然不会!因为在pipeline


    二、思考Spark Job 具体的物理执行
      Spark Application里面可以产生1个或者多个Job,例如spark-shell默认启动的时候内部就没有Job,只是作为资源的分配程序,可以在spark-shell里面写代码产生若干个Job,
    普通程序中一般而言可以有不同的Action,每个Action一般也会触发一个Job
      Spark是MapReduce思想的一种更加精致和高效的实现,MapReduce有很多具体不同的实现,例如Hadoop的MapReduce基本的计算流程如下:首先是以JVM为对象的并发的Mapper,
    Mapper中map的执行会产生输出数据,输出数据会经过Pariitioner指定的规则放到LocalFileSystem中,然后在经由Shuffle、Sort、Aggregate变成Reducer中的reduce的输入,
    执行reduce产生最终的执行结果。Hadoop MapReduce执行的流程虽然简单,但是过于死板,尤其是在构造复杂算法(迭代)时候非常不利于算法的实现,且执行效率极为低下!


      Spark算法构造和物理执行时最基本的核心:最大化pipeline
    基于Pipeline的思想,数据被使用的时候才开始计算,从数据流动的视角来说,是数据流到计算的位置!实质上从逻辑的角度来看,是算子在数据上流动!
    从算法构建的角度而言:肯定是算子作用于数据,所以是算子在数据上流动;方便算法的构建!
      
      从物理执行的角度而言:是数据流动到计算的位置;方便系统最为高效的运行!
    对于pipeline而言,数据计算的位置就是每个Stage中最后的RDD,一个震撼人心的内幕真相就是:每个Stage中除了最后一个RDD算子是真实的以外,前面的算子都是假的!
    由于计算的Lazy特性,导致计算从后往前回溯,形成Computing Chain,导致的结果就是需要首先计算出具体一个Stage内部左侧的RDD中本次计算依赖的Partition


    三、窄依赖的物理执行内幕
      一个Stage内部的RDD都是窄依赖,窄依赖计算本身是逻辑上看是从Stage内部最左侧的RDD开始立即计算的,根据Computing Chain,数据从一个计算步骤流动到下一个结算步骤,
    以此类推,直到计算到Stage内部的最后一个RDD来产生计算结果。
      Computing Chain的构建是从后往前回溯构建而成,而实际的物理计算则是让数据从前往后在算子上流动,直到流动到不能再流动位置才开始计算下一个Record。


    四、宽依赖物理执行内幕
      必须等到依赖的父Stage中的最后一个RDD全部数据彻底计算完毕,才能能够经过shuffle来计算当前Stage。
    -----------------
    第二十四课 Spark Hash Shuffle内幕彻底解密
    一:到底什么是Shuffle?
      Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。




































    展开全文
  • databricks-notebooks:示例Databricks Spark笔记本的集合(主要用于Azure Databricks)
  • Spark笔记1.docx

    2020-04-03 09:17:15
    什么是Spark? Spark特点?Hadoop与Spark的对比?Spark运行模式?Spark安装部署 standalone集群模式启动
  • 【Spark】spark笔记

    2019-04-12 21:13:22
    Spark粗略总结,后面会排版更新 1.spark简介 spark是一种专门为大规模数据处理而设计的 快速通用的计算引擎。 2.与mapreduce的对比 spark在计算过程中产生的中间输出结果是保存在内存中的 spark一般情况下比...

    Spark粗略总结,后面会排版更新

     

    1.spark简介

    spark是一种专门为大规模数据处理而设计的 快速通用的计算引擎。

    2.与mapreduce的对比

    spark在计算过程中产生的中间输出结果是保存在内存中的

    spark一般情况下比mapreduce快十倍,在迭代计算(机器学习中的逻辑回归)的时候可以快100倍

    3.spark速度快的原因

    (1)基于内存计算,也就是中间输出结果保存在内存中

    (2)DAG有向无环图可以切分任务执行的先后顺序

    4.运行模式

    (1)local(2)standalone(3)yarn(4)mesos

    5.RDD(弹性分布式数据集)的五大特性

    (1)RDD是由一系列patition组成的

    (2)函数是作用在patition上的

    (3)RDD之间具有依赖关系

    (4)分区器是作用在具有K,V格式的RDD上的

    (5)RDD提供一系列的最佳计算位置,移动计算不移动数据

    6.什么是K,V格式的RDD

    RDD中存储的都是二元组对象

    7.RDD的弹性如何体现

    patition的个数和大小可以改变,没有限制,而RDD又是由一系列的patition组成的

    8.RDD的容错如何体现

    RDD之间具有依赖关系,可以通过上一个RDD重新计算出RDD

    9.RDD的分布式的体现

    patition是分布式的在不同的节点上的,RDD由一系列的patition组成

    10.RDD的API代码流程

    (1)创建一个SparkConf对象

    (2)创建一个SparkContext对象,并传入参数conf

    (3)基于Spark创建一个RDD对象

    (4)应用程序中要有action行动算子来触发Transformations转换算子的执行

    (5)关闭Spark上下文对象

    展开全文
  • spark笔记之Spark SQL

    千次阅读 2018-09-03 14:18:35
    1. Spark SQL概述1.1. Spark SQL的前世今生  Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。Shark建立在Hive的代码基础上,并通过将Hive的部分物理执行计划交换出来。这个方法使得Shark的用户...
  • 2.1 Spark Streaming原理 Spark Streaming 是基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。 2.2 Spark Streaming计算...
  • 1.1 Spark Streaming概述1.1.1什么是Spark Streaming Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据...
  • 大数据spark笔记.zip

    2019-12-22 22:41:59
    spark项目:数组也是一种复杂数据类型,表示一组有序的值的列表,可以通过数值索引来访问其中的值。数组的值也可以是任意类型——简单值、对象或数组 JSON数组也没有变量和分号,把数组和对象结合起来,可以构成...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,849
精华内容 8,739
关键字:

spark笔记