精华内容
下载资源
问答
  • Spark中广播变量
    2022-01-22 13:54:52

    1.广播变量的意义
    广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少,从而减少传输过程中的IO,减少存放变量的内存占用.

    2.下面一张高清大图说明广播的过程,Driver将数据collect到一起,然后将完整的数据分发到executors上,进行相应的处理
    在这里插入图片描述

    3.广播变量的用法
    广播变量用法很简单,其实就是SparkContext的broadcast()方法,传入你要广播的变量,即可。

    context.broadcast(a)        // a 为需要广播出去的变量;context 为SparkContext
    

    使用广播变量的时候,直接调用广播变量(Broadcast类型)的value() / getValue() 可以获取到之前封装的广播变量。

    a.value()  //a为上面广播出去的变量。
    

    4.广播变量的例子
    需求:查询日志中每个省所拥有的资源数

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      *  广播变量的例子
      */
    object IPLocation {
      val rulesFilePath = "D:\\data\\ip.txt"
      val accessFilePath = "D:\\data\\access.log"
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        //1:读取IP规则资源库
        val ipRulesLines: RDD[String] = sc.textFile(rulesFilePath)
        //2:整理IP规则
        //117.93.244.0|117.93.255.255|1969091584|1969094655|亚洲|中国|江苏|盐城||电信|320900|China|CN|120.139998|33.377631
        val ipRules: RDD[(Long, Long, String)] = ipRulesLines.map(line => {
          val fields = line.split("[|]")
          val startNum = fields(2).toLong
          val endNum = fields(3).toLong
          val province = fields(6)
          (startNum, endNum, province)
        })
        //
        //var result = ipRules.collect()
        //println(result.toBuffer)
        //3: 将IP规则收集到Driver(collect)
        val allIpRulesInDriver: Array[(Long, Long, String)] = ipRules.collect()
        //4:将全部的ip资源库通过广播的方式发送到Executor
        //广播之后,在Driver端获取了广播变量的引用(如果没有广播完,就不往下走)
        val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(allIpRulesInDriver)
    
        //5: 读取访问日志
        val accessLogLine: RDD[String] = sc.textFile(accessFilePath)
        //6: 整理访问日志
        //20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
        val provinceAddOne: RDD[(String, Int)] = accessLogLine.map(line=>{
          val fields = line.split("[|]")
          val ip = fields(1)
          val ipNum = MyUtils.ip2Long(ip)
          //通过广播变量的引用获取Executor中的全部IP规则,然后进行匹配ip规则
          val allIpRulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value
          //根据规则进行查找,(用二分查找算法)
          var province = "未知"
          val index = MyUtils.binarySearch(allIpRulesInExecutor, ipNum)
          if(index != -1 ){
            province = allIpRulesInExecutor(index)._3
          }
          (province, 1)
        })
    
        //7: 按照省份的访问次数进行计数
        val reduceRDD: RDD[(String, Int)] = provinceAddOne.reduceByKey(_+_)
    
        //8:打印结果
        //    var result = reduceRDD.collect()
        //    println(result.toBuffer)
    
        //计算结果,将计算好的结果写入到mysql中
        //触发一个action,将数据写到mysql的逻辑函数传入
        //    reduceRDD.foreach(t =>{
        //      val conn = DriverManager.getConnection("jdbc:mysql://bigdata01:3306/bigdata", "root", "123456")
        //      val pstm = conn.prepareStatement("Insert Into .... values(?.?)")
        //      pstm.setString(1, t._1)
        //      pstm.setInt(2, t._2)
        //      pstm.executeUpdate()
        //      pstm.close()
        //      conn.close()
        //    })
        reduceRDD.foreachPartition(MyUtils.data2MySQL _)
    
        //9:释放资源
        sc.stop()
      }
    }
    

    5.上述涉及到的工具类

    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    /**
      * 两个工具类,一个转换成long,一个二分查找
      */
    object MyUtils {
    //Ip转换成Long类型
      def ip2Long(ip:String):Long ={
        val fragments = ip.split("[.]")
        var ipNum =0L
        for(i<- 0 until fragments.length){
          ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
      }
    
      def binarySearch(lines: Array[(Long,Long,String)],ip: Long):Int ={
        var low =0
        var high =lines.length-1
        while(low <=high){
          val middle =(low+high)/2
          if((ip>=lines(middle)._1) && (ip<=lines(middle)._2))
            return middle
          if(ip < lines(middle)._1)
            high=middle -1
          else{
            low =middle +1
          }
        }
        -1
      }
      def data2MySQL (it: Iterator[(String,Int)])= {
        //一个迭代器代表一个分区,分区中有多条数据
        //先获得一个JDBC连接
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "926718")
        //将数据通过Connection写入到数据库
        val pstm: PreparedStatement = conn.prepareStatement("insert into access_log values(?,?)") //将分区中的数据一条一条写入到MySQL
        it.foreach(tp => {
          pstm.setString(1, tp._1)
          pstm.setInt(2, tp._2)
          pstm.executeUpdate()
        }) //将分区中的数据全部写完之后,在关闭连接
        if (pstm != null) {
          pstm.close()
        }
        if (conn != null) {
          conn.close()
        }
      }
    
    }
    
    更多相关内容
  • 一、广播变量和累加器 通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向...
  • Spark三大数据结构 RDD 分布式数据集 广播变量:分布式只读共享变量 累加器:分布式只写共享变量 下面是累加器和广播变量的总结,不算是最好的,希望大家包涵
  • 我们知道多进程编程中,进程之间可以...基本概念Spark官方对广播变量的说明如下:广播变量可以让我们在每台计算机上保留一个只读变量,而不是为每个任务复制一份副本。例如,可以使用他们以高效的方式为每个计算节点...

    我们知道多进程编程中,进程之间可以创建共享内存,这是最快的进程通信的方式。那么,对于分布式系统,如何共享数据呢?Spark提供了两种在Spark集群中创建和使用共享变量的机制:广播变量和累加器。

    本文介绍广播变量的基本概念和实现原理。

    基本概念

    Spark官方对广播变量的说明如下:广播变量可以让我们在每台计算机上保留一个只读变量,而不是为每个任务复制一份副本。例如,可以使用他们以高效的方式为每个计算节点提供大型输入数据集的副本。Spark也尽量使用有效的广播算法来分发广播变量,以降低通信成本。

    另外,Spark action操作会被划分成一系列的stage来执行,这些stage根据是否产生shuffle操作来进行划分的。Spark会自动广播每个stage任务需要的通用数据。这些被广播的数据以序列化的形式缓存起来,然后在任务运行前进行反序列化。也就是说,在以下两种情况下显示的创建广播变量才有用:1)当任务跨多个stage并且需要同样的数据时;2)当以反序列化的形式来缓存数据时。

    从以上官方定义我们可以得出Spark广播变量的一些特性:

    1)广播变量会在每个worker节点上保留一份副本,而不是为每个Task保留一份副本。这样有什么好处?可以想象,在一个worker有时同时会运行若干的Task,若把一个包含较大数据的变量为Task都复制一份,而且还需要通过网络传输,应用的处理效率一定会受到很大影响。

    2)Spark会通过某种广播算法来进行广播变量的分发,这样可以减少通信成本。Spark使用了类似于BitTorrent协议的数据分发算法来进行广播变量的数据分发,该分发算法会在后面进行分析。

    3)广播变量有一定的适用场景:当任务跨多个stage,且需要同样的数据时,或以反序列化的形式来缓存数据时。

    本文,将围绕官方对广播变量的定义来分析其实现原理。在分析前,先来看一下广播变量的使用。

    广播变量的创建和使用

    假设你有一个变量: v。要使用该变量来创建一个广播变量时,非常简单,只需要调用SparkContext的broadcast(v)函数即可。在spark-shell下代码如下:

    scala> val v = Array(1,2,3,4,5,6)

    v: Array[Int] = Array(1, 2, 3, 4, 5, 6)

    scala> val bv = sc.broadcast(v)

    res1: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(1)

    scala> bv

    res9: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(2)

    // 获取广播变量的值 scala> bv.value

    res10: Array[Int] = Array(1, 2, 3, 4, 5, 6)

    // 销毁广播变量 scala> bv.destory

    我们把一个变量v(一个普通数组)转换成了一个广播变量bv。通过查看bv的类型,可以看出bv是一个Array[Int]类型的广播变量。我们可以通过bv.value来获取广播变量的值。这样,广播变量bv就可以用到以后的数据计算中了。

    注意,在创建广播变量时,广播变量的值必须是本地的可序列化的值,不能是RDD。

    另外,广播变量一旦创建就不应该再修改,这样可以保证所以的worker节点上的值是一致的。这是因为,现有worker将看不到更新的值,新的worker才可能会看到新的值。

    广播变量的实现原理

    我们根据广播变量的创建和使用流程来分析广播变量的实现。广播变量的实现过程如下图2所示:图2 广播变量的创建和读取过程

    注:BlockManager是Spark数据块管理模块,会在后面的文章详细分析。

    广播变量的创建

    广播变量的创建发生在Driver端,如图2所示,当调用SparkContext#broadcast来创建广播变量时,会把该变量的数据切分成多个数据块,保存到driver端的BlockManger中,使用的存储级别是:MEMORY_AND_DISK_SER。

    所以,广播变量的读取也是懒评价的,只有在Executor端需要获取广播变量时才会去获取。此时广播变量的数据只在Driver端存在。

    此时的状态如图3所示:图3 创建广播变量

    从图3可以看出,此时广播变量被保存在本地,并会把广播变量的值切分成多个数据块进行保存。广播变量数据块的默认大小是4M,数据块太大或太小都不利于数据的传输。

    广播变量的读取

    当要使用广播变量时,需要先获取广播变量的值,其实现流程如图2所示。获取广播变量调用的是bv.value。其实现逻辑如下:

    1)第1步(红色线1):首先从Executor本地的BlockManager中读取广播变量的数据,若存在就直接获取,并返回。不在执行第2步和第3步。若不存在,则执行第2步。

    2)第2步(红色线2):从Driver端获取广播变量的状态和位置信息(由于所有的BlockManager slave端都会向Master端汇报数据块状态)。

    3)第3步:优先从本地目录(数据块就在本地),或者相同主机的其他Executor中读取广播变量数据块。若在本Executor和同主机其他Executor中都不存在,则只能从远端获取数据。从远端获取数据的原则是:先从同一个机架(rack)的主机的Executor端获取。若不能从其他Executor中获取广播变量,则会直接从Driver端获取。

    从以上获取流程可以看出,在执行spark应用时,只要有一个worker节点的Executor从Driver端获取到了广播变量的数据,则其他的Executor就不需要从Driver端获取了。

    当某个Executor上的某个数据块被删除,可以从其他Executor直接获取该数据块,然后把数据块保存到自己的Executor的BlockManager中。

    这个读取的协议类似于:BitTorrent数据传输协议。其读取过程的示意图4所示:图4 广播变量读取过程

    从图4可以看出,Executor4中的任务需要使用广播变量,但它只有该变量的b4数据块。此时,它首先从同主机(worker2节点)的中获取数据,获取到数据块b3;然后分别从不同主机的Executor1和Executor2中读取数据块b1和b2。此时,Executor4就获取到变量b的全部数据块了,然后把这些数据块在自己的BlockManager中保存一份。此时,其他Executor就可以从Executor4中读取数据了。

    当完成这些操作后,各个Executor端的BlockManager(slave端)会向Driver端的BlockManager(master端)汇报数据块的状态。

    广播变量实现源码分析

    broadcast()函数

    通过SparkContext#broadcast函数可以创建一个广播变量,该函数的原型如下:

    def broadcast[T: ClassTag](value: T): Broadcast[T]

    在SparkContext中需要调用broadcast函数来创建一个广播变量,并返回一个org.apache.spark.broadcast.Broadcast对象,这样就可以在分布式函数中来读取广播变量的值。该变量会被发送到Spark集群的每个Executor节点上。

    注意:该广播变量一旦创建,就不应该再修改,因为即使修改了该变量的值,也无法让spark集群的执行节点看到改变后的新值。

    实现该函数的类实体调用过程如下图。创建广播变量的调用过程broadcast()函数的实现流程如下:

    1)判断需要广播的变量是否是RDD类型的变量,若是则终止函数,报告“不能广播RDD变量,可以通过collect()把数据聚集到driver端再广播”的错误。

    2)通过BroadcastManager的newBroadcast函数来创建广播变量,并返回一个Broadcast类的对象,Broadcast是抽象类,所以这里其实是该抽象类唯一实现类:TorrentBroadcast的对象。

    3)注册broadcast的cleanup函数,可以用来清除不再使用的broadcast变量。

    4)最后,返回新创建的TorrentBroadcast对象

    在类SparkContext中,broadcast函数的实现代码如下:

    def broadcast[T: ClassTag](value: T): Broadcast[T] = {

    assertNotStopped()

    // 不能直接广播rdd等分布式变量 require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),

    "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")

    // 通过BroadcastManager工具类来创建一个BroadcastFactory对象 val bc = env.broadcastManager.newBroadcast[T](value, isLocal)

    val callSite = getCallSite

    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)

    cleaner.foreach(_.registerBroadcastForCleanup(bc))

    // 返回Broadcast对象,这里其实是TorrentBroadcast类的对象 bc

    }

    TorrentBroadcast类

    介绍

    真正实现广播变量的操作是在TorrentBroadcast类中。当创建广播变量时,实际上是创建了一个该类的对象。也就是说,当执行以下代码时:

    val bv = sc.broadcast(v)

    实际上会执行:

    private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

    ...

    override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long):

    Broadcast[T] = {

    new TorrentBroadcast[T](value_, id)

    }

    ...

    }

    该类实现了以下的机制:

    1)驱动程序(driver)将序列化对象分成小块,并将这些块存储在驱动程序(driver)的BlockManager中。

    2)在每个executor上,executor首先尝试从其BlockManager中获取对象。 若它不存在,则远程从driver或其他executor(如果可用)中获取对象块。 一旦获得块,它就会将块放在自己的BlockManager中,为其他executor来获取数据做好准备。

    3)通过这种方式,可以防止driver成为发送多个广播数据副本的瓶颈(每个executor一个)。

    代码实现分析对象构造过程

    1)设置配置信息:setConf(SparkEnv.get.conf)。对于广播变量有几个重要的配置项需要设置。一个是切分广播变量时的数据块大小,该参数是由spark.broadcast.blockSize来设置,默认是4M。另外,若设置了spark.broadcast.compress参数,还需要创建压缩广播变量数据的对象。

    2)初始化广播变量的唯一id值:private val broadcastId = BroadcastBlockId(id)

    3)调用writeBlocks先把广播变量的值,作为单个对象写入本地BlockManger,然后把它划分成多个数据块,并保存到本地blockManager中。使用的存储级别是:MEMORY_AND_DISK_SER。实现代码截选如下:

    private def writeBlocks(value: T): Int = {

    ...

    // 保存单个对象 if (!blockManager.putSingle(broadcastId, value,

    MEMORY_AND_DISK,

    tellMaster = false)) {

    ...

    }

    // 把广播变量切分成块,然后对每个块进行序列化,并进行压缩 val blocks =

    TorrentBroadcast.blockifyObject(value, blockSize,

    SparkEnv.get.serializer,

    compressionCodec)

    ...

    blocks.zipWithIndex.foreach { case (block, i) =>

    ...

    // 把广播变量切割成块,并保存到bm中 if (!blockManager.putBytes(pieceId, bytes,

    MEMORY_AND_DISK_SER,

    tellMaster = true)) {

    ...

    }

    }

    blocks.length

    }读取广播变量:readBroadcastBlock

    通过readBroadcastBlock函数来从新构造广播对象,该函数会先从driver或其他executors中读取数据块。在driver端,若需要value值,它会直接从本地的block manager中读取数据。

    readBroadcastBlock函数的实现逻辑如下:

    1)从SparkEnv.get.broadcastManager.cachedValues从来获取对应broadcastId的数据块值:broadcastCache.get(broadcastId)

    2)从blockManager中获取对应id的广播变量的值:blockManager.getLocalValues(broadcastId)

    3)若不能从blockManager中获取值,则调用readBlocks函数来读取数据块。该函数会从driver或其他的executors中读取该变量的数据。

    BroadcastManager

    该类是一个辅助类,用来统一创建broadcast对外的接口。它提供了创建广播变量的对外接口:newBroadcast;删除广播变量的对外接口:unbroadcast;其实它都是调用了TorrentBroadcastFactory对应函数来实现的。

    BroadcastManager对象在SparkEnv中创建,这样在Driver端和Executor端都可以使用。该类的构造函数流程如下:

    1)定义了两个私有化变量,并且会为每个广播变量生成一个唯一的id,在创建broadcast变量时会通过nextBroadcastId.getAndIncrement()进行自增,并调用initialize()函数进行初始化:

    // 是否已经初始 private var initialized = false

    private var broadcastFactory: BroadcastFactory = null

    initialize()

    // 生成广播变量的id,该id是唯一的,这里先初始化,会在创建broadcast变量时进行自增操作 private val nextBroadcastId = new AtomicLong(0)

    2)initialize()函数的实现逻辑如下:

    a)初始化broadcastFactory变量,这里创建了TorrentBroadcastFactory对象

    b)调用TorrentBroadcastFactory的initialize函数来初始化。在实际的代码中,该类的initialize函数什么都不做。

    c)把initialized设置为true,同一个对象只初始化一次

    // Called by SparkContext or Executor before using Broadcast private def initialize() {

    synchronized { // 加锁 if (!initialized) {

    // 初始化broadcastFactory变量,这里创建了TorrentBroadcastFactory对象 broadcastFactory = new TorrentBroadcastFactory

    // 调用TorrentBroadcastFactory的initialize函数来初始化 broadcastFactory.initialize(isDriver, conf, securityManager)

    // 把initialized设置为true,同一个对象只初始化一次 initialized = true

    }

    }

    }

    3)从以上分析可以看到,当创建广播变量时,实际上是调用的TorrentBroadcastFactory类的newBroadcast函数来进行创建。

    TorrentBroadcastFactory工厂类

    该类提供了用来创建TorrentBroadcast对象的工厂函数newBroadcast,并提供了删除规定id的广播变量的对外接口函数:unbroadcast。

    该类的代码相对简单,主要代码如下:

    private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

    ...

    // 调用创建一个TorrentBroadcast对象 override def newBroadcast[T: ClassTag](value_ : T,

    isLocal: Boolean, id: Long): Broadcast[T] = {

    new TorrentBroadcast[T](value_, id)

    }

    ...

    // 删除给定id的广播变量 override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {

    TorrentBroadcast.unpersist(id, removeFromDriver, blocking)

    }

    }

    广播变量的销毁

    广播变量的销毁可以通过调用bv.destory来完成。从实现层面来说,最终调用TorrentBroadcast#unpersist来实现的。

    unpersist()函数

    该函数用来删除Driver端和Executor端的广播变量。其实现如下:

    def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {

    logDebug(s"Unpersisting TorrentBroadcast$id")

    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)

    }

    该函数会调用blockManagerMaster的removeBroadcast函数来删除在executor上属于该broadcast变量的所有数据块。 实现过程是:从driver端发送一个RemoveBroadcast消息。在Executor上的BlockManager服务接收该消息,就会把广播变量从BlockManager中删除。

    若removeFromDriver设置成True,还会从Driver删除该变量的数据。

    总结

    本文从实现原理和源码两个方面分析了Spark广播变量的原理。

    展开全文
  • 读懂Spark广播变量

    2021-04-17 14:42:25
    1 如何理解广播变量 对指定列表中给定的单词计数。 val dict = List(“spark”, “tune”) val words = spark.sparkContext.textFile(“~/words.csv”) val keywords = words.filter(word => dict.contains...

    目录

    1 如何理解广播变量

    2 广播分布式数据集 

    3 如何用广播变量克制 Shuffle?

    4 小 结


    1 如何理解广播变量

    对指定列表中给定的单词计数。

    val dict = List(“spark”, “tune”)
    val words = spark.sparkContext.textFile(“~/words.csv”)
    val keywords = words.filter(word => dict.contains(word))
    keywords.map((_, 1)).reduceByKey(_ + _).collect

    按照这个需求,同学小 A 实现了如上的代码,一共有 4 行,我们逐一来看。第 1 行在 Driver 端给定待查单词列表 dict;第 2 行以 textFile API 读取分布式文件,内容包含一列,存储的是常见的单词;第 3 行用列表 dict 中的单词过滤分布式文件内容,只保留 dict 中给定的单词;第 4 行调用 reduceByKey 对单词进行累加计数。

    数据结构dict随着Task一起分发到Executors

                                                                                                       数据结构dict随着Task一起分发到Executors

    学习过调度系统之后,我们知道,第一行代码定义的 dict 列表连带后面的 3 行代码会一同打包到 Task 里面去。这个时候,Task 就像是一架架小飞机,携带着这些“行李”,飞往集群中不同的 Executors。对于这些“行李”来说,代码的“负重”较轻,可以忽略不计,而数据的负重占了大头,成了最主要的负担。

    你可能会说:“也还好吧,dict 列表又不大,也没什么要紧的”。但是,我们假设这个例子中的并行度是 10000,那么,Driver 端需要通过网络分发总共 10000 份 dict 拷贝。这个时候,集群内所有的 Executors 需要消耗大量内存来存储这 10000 份的拷贝,对宝贵的网络和内存资源来说,这已经是一笔不小的浪费了。更何况,如果换做一个更大的数据结构,Task 分发所引入的网络与内存开销会更可怕。换句话说,统计计数的业务逻辑还没有开始执行,Spark 就已经消耗了大量的网络和存储资源,这简直不可理喻。因此,我们需要对示例中的代码进行优化,从而跳出这样的窘境。

    但是,在着手优化之前,我们不妨先来想一想,现有的问题是什么,我们要达到的目的是什么。结合刚刚的分析,我们不难发现,Word Count 的核心痛点在于,数据结构的分发和存储受制于并行,并且是以 Task 为粒度的,因此往往频次过高。痛点明确了,调优的目的也就清晰了,我们需要降低数据结构分发的频次。要达到这个目的,我们首先想到的就是降低并行度。不过,牵一发而动全身,并行度一旦调整,其他与 CPU、内存有关的配置项都要跟着适配,这难免把调优变复杂了。实际上,要降低数据结构的分发频次,我们还可以考虑广播变量。

    广播变量是一种分发机制,它一次性封装目标数据结构,以 Executors 为粒度去做数据分发。换句话说,在广播变量的工作机制下,数据分发的频次等同于集群中的 Executors 个数。通常来说,集群中的 Executors 数量都远远小于 Task 数量,相差两到三个数量级是常有的事。那么,对于第一版的 Word Count 实现,如果我们使用广播变量的话,会有哪些变化呢?代码的改动很简单,主要有两个改动:第一个改动是用 broadcast 封装 dict 列表,第二个改动是在访问 dict 列表的地方改用 broadcast.value 替代。

    
    val dict = List(“spark”, “tune”)
    val bc = spark.sparkContext.broadcast(dict)
    val words = spark.sparkContext.textFile(“~/words.csv”)
    val keywords = words.filter(word => bc.value.contains(word))
    keywords.map((_, 1)).reduceByKey(_ + _).collect

    你可能会说:“这个改动看上去也没什么呀!”别着急,我们先来分析一下,改动之后的代码在运行时都有哪些变化。 在广播变量的运行机制下,封装成广播变量的数据,由 Driver 端以 Executors 为粒度分发,每一个 Executors 接收到广播变量之后,将其交给 BlockManager 管理由于广播变量携带的数据已经通过专门的途径存储到 BlockManager 中,因此分发到 Executors 的 Task 不需要再携带同样的数据。

    个时候,你可以把广播变量想象成一架架专用货机,专门为 Task 这些小飞机运送“大件行李”。Driver 与每一个 Executors 之间都开通一条这样的专用货机航线,统一运载负重较大的“数据行李”。有了专用货机来帮忙,Task 小飞机只需要携带那些负重较轻的代码就好了。等这些 Task 小飞机在 Executors 着陆,它们就可以到 Executors 的公用仓库 BlockManager 里去提取它们的“大件行李”。

    总之,在广播变量的机制下,dict 列表数据需要分发和存储的次数锐减。我们假设集群中有 20 个 Executors,不过任务并行度还是 10000,那么,Driver 需要通过网络分发的 dict 列表拷贝就会由原来的 10000 份减少到 20 份。同理,集群范围内所有 Executors 需要存储的 dict 拷贝,也由原来的 10000 份,减少至 20 份。这个时候,引入广播变量后的开销只是原来 Task 分发的 1/500!

    2 广播分布式数据集 

    那在刚刚的示例代码中,广播变量封装的是 Driver 端创建的普通变量:字符串列表。除此之外,广播变量也可以封装分布式数据集。我们来看这样一个例子。在电子商务领域中,开发者往往用事实表来存储交易类数据,用维度表来存储像物品、用户这样的描述性数据。事实表的特点是规模庞大,数据体量随着业务的发展不断地快速增长。维度表的规模要比事实表小很多,数据体量的变化也相对稳定。假设用户维度数据以 Parquet 文件格式存储在 HDFS 文件系统中,业务部门需要我们读取用户数据并创建广播变量以备后用,我们该怎么做呢?很简单,几行代码就可以搞定!

    
    val userFile: String = “hdfs://ip:port/rootDir/userData”
    val df: DataFrame = spark.read.parquet(userFile)
    val bc_df: Broadcast[DataFrame] = spark.sparkContext.broadcast(df)

    首先,我们用 Parquet API 读取 HDFS 分布式数据文件生成 DataFrame,然后用 broadcast 封装 DataFrame。从代码上来看,这种实现方式和封装普通变量没有太大差别,它们都调用了 broadcast API,只是传入的参数不同。但如果不从开发的视角来看,转而去观察运行时广播变量的创建过程的话,我们就会发现,分布式数据集与普通变量之间的差异非常显著。从普通变量创建广播变量,由于数据源就在 Driver 端,因此,只需要 Driver 把数据分发到各个 Executors,再让 Executors 把数据缓存到 BlockManager 就好了。但是,从分布式数据集创建广播变量就要复杂多了,具体的过程如下图所示。

     与普通变量相比,分布式数据集的数据源不在 Driver 端,而是来自所有的 Executors。Executors 中的每个分布式任务负责生产全量数据集的一部分,也就是图中不同的数据分区。因此,步骤 1 就是 Driver 从所有的 Executors 拉取这些数据分区,然后在本地构建全量数据。步骤 2 与从普通变量创建广播变量的过程类似。 Driver 把汇总好的全量数据分发给各个 Executors,Executors 将接收到的全量数据缓存到存储系统的 BlockManager 中。不难发现,相比从普通变量创建广播变量,从分布式数据集创建广播变量的网络开销更大。原因主要有二:一是,前者比后者多了一步网络通信;二是,前者的数据体量通常比后者大很多。

    3 如何用广播变量克制 Shuffle?

    你可能会问:“Driver 从 Executors 拉取 DataFrame 的数据分片,揉成一份全量数据,然后再广播出去,抛开网络开销不说,来来回回得费这么大劲,图啥呢?”这是一个好问题,因为以广播变量的形式缓存分布式数据集,正是克制 Shuffle 杀手锏。

    Shuffle Joins为什么这么说呢?我还是拿电子商务场景举例。有了用户的数据之后,为了分析不同用户的购物习惯,业务部门要求我们对交易表和用户表进行数据关联。这样的数据关联需求在数据分析领域还是相当普遍的。

    
    val transactionsDF: DataFrame = _
    val userDF: DataFrame = _
    transactionsDF.join(userDF, Seq(“userID”), “inner”)

    因为需求非常明确,同学小 A 立即调用 Parquet 数据源 API,读取分布式文件,创建交易表和用户表的 DataFrame,然后调用 DataFrame 的 Join 方法,以 userID 作为 Join keys,用内关联(Inner Join)的方式完成了两表的数据关联。在分布式环境中,交易表和用户表想要以 userID 为 Join keys 进行关联,就必须要确保一个前提:交易记录和与之对应的用户信息在同一个 Executors 内。也就是说,如果用户黄小乙的购物信息都存储在 Executor 0,而个人属性信息缓存在 Executor 2,那么,在分布式环境中,这两种信息必须要凑到同一个进程里才能实现关联计算。在不进行任何调优的情况下,Spark 默认采用 Shuffle Join 的方式来做到这一点。Shuffle Join 的过程主要有两步。

    第一步就是对参与关联的左右表分别进行 Shuffle,Shuffle 的分区规则是先对 Join keys 计算哈希值,再把哈希值对分区数取模。由于左右表的分区数是一致的,因此 Shuffle 过后,一定能够保证 userID 相同的交易记录和用户数据坐落在同一个 Executors 内。

    Shuffle 完成之后,第二步就是在同一个 Executors 内,Reduce task 就可以对 userID 一致的记录进行关联操作。但是,由于交易表是事实表,数据体量异常庞大,对 TB 级别的数据进行 Shuffle,想想都觉得可怕!因此,上面对两个 DataFrame 直接关联的代码,还有很大的调优空间。我们该怎么做呢?话句话说,对于分布式环境中的数据关联来说,要想确保交易记录和与之对应的用户信息在同一个 Executors 中,我们有没有其他办法呢?

    克制 Shuffle 的方式 ?

    还记得之前业务部门要求我们把用户表封装为广播变量,以备后用吗?现在它终于派上用场了!

    
    import org.apache.spark.sql.functions.broadcast
     
    val transactionsDF: DataFrame = _
    val userDF: DataFrame = _
     
    val bcUserDF = broadcast(userDF)
    transactionsDF.join(bcUserDF, Seq(“userID”), “inner”)
    

     Driver 从所有 Executors 收集 userDF 所属的所有数据分片,在本地汇总用户数据,然后给每一个 Executors 都发送一份全量数据的拷贝。既然每个 Executors 都有 userDF 的全量数据,这个时候,交易表的数据分区待在原地、保持不动,就可以轻松地关联到一致的用户数据。如此一来,我们不需要对数据体量巨大的交易表进行 Shuffle,同样可以在分布式环境中,完成两张表的数据关联。

    利用广播变量,我们成功地避免了海量数据在集群内的存储、分发,节省了原本由 Shuffle 引入的磁盘和网络开销,大幅提升运行时执行性能。当然,采用广播变量优化也是有成本的,毕竟广播变量的创建和分发,也是会带来网络开销的。但是,相比大表的全网分发,小表的网络开销几乎可以忽略不计。这种小投入、大产出,用极小的成本去博取高额的性能收益,真可以说是“四两拨千斤”!

    4 小 结

    在数据关联场景中,广播变量是克制 Shuffle 的杀手锏。掌握了它,我们就能以极小的成本,获得高额的性能收益。关键是我们要掌握两种创建广播变量的方式。

    • 第一种,从普通变量创建广播变量。在广播变量的运行机制下,普通变量存储的数据封装成广播变量,由 Driver 端以 Executors 为粒度进行分发,每一个 Executors 接收到广播变量之后,将其交由 BlockManager 管理。
    • 第二种,从分布式数据集创建广播变量,这就要比第一种方式复杂一些了。第一步,Driver 需要从所有的 Executors 拉取数据分片,然后在本地构建全量数据;第二步,Driver 把汇总好的全量数据分发给各个 Executors,Executors 再将接收到的全量数据缓存到存储系统的 BlockManager 中。

    结合这两种方式,我们在做数据关联的时候,把 Shuffle Joins 转换为 Broadcast Joins,就可以用小表广播来代替大表的全网分发,真正做到克制 Shuffle。

    展开全文
  • 一、自定义排序 自定义排序  Spark对简单的数据类型可以直接排序,但是对于一些复杂的条件就需要用自定义排序来实现了 1.第一种定义方法: 用到了隐式转换  package scalaBase.day11 import org.apache.spark.rdd...
  • 本期将介绍下 Spark 编程中两种类型的共享变量:累加器和广播变量。 简单说,累加器是用来对信息进行聚合的,而广播变量则是用来高效分发较大对象的。 学习目标 闭包的概念 累加器的原理 广播变量的原理 1. ...

    前言

    本期将介绍下 Spark 编程中两种类型的共享变量:累加器和广播变量。
    简单说,累加器是用来对信息进行聚合的,而广播变量则是用来高效分发较大对象的。

    学习目标

    • 闭包的概念
    • 累加器的原理
    • 广播变量的原理

    1. 闭包的概念

    在讲共享变量之前,我们先了解下啥是闭包,代码如下。

    var n = 1
    val func = (i:Int) => i + n
    

    函数 func 中有两个变量 n 和 i ,其中 i 为该函数的形式参数,也就是入参,在 func 函数被调用时, i 会被赋予一个新的值,我们称之为绑定变量(bound variable)。而 n 则是定义在了函数 func 外面的,该函数并没有赋予其任何值,我们称之为自由变量(free variable)。

    像 func 函数这样,返回结果依赖于声明在函数外部的一个或多个变量,将这些自由变量捕获并构成的封闭函数,我们称之为“闭包”。

    先看一个累加求和的栗子,如果在集群模式下运行以下代码,会发现结果并非我们所期待的累计求和。

    var sum = 0
    val arr = Array(1,2,3,4,5)
    sc.parallelize(arr).foreach(x => sum + x)
    println(sum)
    

    sum 的结果为0,导致这个结果的原因就是存在闭包。

    在集群中 Spark 会将对 RDD 的操作处理分解为 Tasks ,每个 Task 由 Executor 执行。而在执行之前,Spark会计算 task 的闭包(也就是 foreach() )。闭包会被序列化并发送给每个 Executor,但是发送给 Executor 的是副本,所以在 Driver 上输出的依然是 sum 本身。

    在这里插入图片描述

    如果想对 sum 变量进行更新,则就要用到接下来我们要讲的累加器。

    2. 累加器的原理

    累加器是对信息进行聚合的,通常在向 Spark 传递函数时,比如使用 map() 或者 filter() 传条件时,可以使用 Driver 中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,然而,正如前面所述,更新这些副本的值,并不会影响到 Driver 中对应的变量。

    累加器则突破了这个限制,可以将工作节点中的值聚合到 Driver 中。它的一个典型用途就是对作业执行过程中的特定事件进行计数。

    举个栗子,给了一个日志记录,需要统计这个文件中有多少空行。

    val sc = new SparkContext(...)
    val logs = sc.textFile(...)
    val blanklines = sc.accumulator(0)
    val callSigns = logs.flatMap(line => {
    	if(line == ""){
    		blanklines += 1
    	}
    	line.split("")
    })
    callSigns.count()
    println("日志中的空行数为:" + blanklines.value)
    

    总结下累加器的使用,首先 Driver 调用了 SparkContext.accumulator(initialValue) 方法,创建一个名为 blanklines 且初始值为0的累加器。然后在遇到空行时,Spark 闭包里的执行器代码就会对其 +1 。执行完成之后,Driver 可以调用累加器的 value 属性来访问累加器的值。

    需要说明的是,只有在行动算子 count() 运行之后,才可以 println 出正确的值,因为我们之前讲过 flatMap() 是惰性计算的,只有遇到行动操作之后才会出发强制执行运算进行求值。

    另外,工作节点上的任务是不可以访问累加器的值,在这些任务看来,累加器是一个只写的变量。

    对于累加器的使用,不仅可以进行数据的 sum 加法,也可以跟踪数据的最大值 max、最小值 min等。

    3. 广播变量的原理

    前面说了,Spark 会自动把闭包中所有引用到的自由变量发送到工作节点上,那么每个 Task 的闭包都会持有自由变量的副本。如果自由变量的内容很大且 Task 很多的情况下,为每个 Task 分发这样的自由变量的代价将会巨大,必然会对网络 IO 造成压力。

    广播变量则突破了这个限制,不是把变量副本发给所有的 Task ,而是将其分发给所有的工作节点一次,这样节点上的 Task 可以共享一个变量副本。

    Spark 使用的是一种高效的类似 BitTorrent 的通信机制,可以降低通信成本。广播的数据只会被发动各个节点一次,除了 Driver 可以修改,其他节点都是只读,并且广播数据是以序列化形式缓存在系统中的,当 Task 需要数据时对其反序列化操作即可。

    在使用中,Spark 可以通过调用 SparkContext.broadcast(v) 创建广播变量,并通过调用 value 来访问其值,举栗代码如下:

    val broadcastVar = sc.broadcast(Array(1,2,3))
    broadcastVar.value
    

    以上是本期分享,如有帮助请 点赞+关注+收藏 支持下哦~
    下期继续讲解 RDD 内容。

    往期精彩内容回顾:

    1 - Spark 概述(入门必看)
    2 - Spark 的模块组成
    3 - Spark 的运行原理
    4 - RDD 概念以及核心结构
    5 - Spark RDD 的宽窄依赖关系
    6 - 详解 Spark RDD 的转换操作与行动操作
    7 - Spark RDD 中常用的操作算子
    可扫码关注

    在这里插入图片描述

    展开全文
  • Flink广播变量

    2022-02-10 16:47:18
    在Flink中,taskmanager中划分不同slot,计算过程中算子在多个...在一个算子中使用广播变量主要有两个步骤: 广播变量 使用 withBroadcastSet(data, "name") 这个方法即可, name变量代表了获取该广播变量的名称
  • Spark中广播变量详解

    2020-12-28 19:42:45
    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com【前言:Spark目前提供了两种有限定类型的共享变量:广播变量和累加器,今天主要介绍一下基于Spark2.4版本的广播变量。先前的版本比如Spark2.1之前的...
  • Spark广播变量

    2021-08-24 17:57:42
    Spark广播变量 1. 什么是广播变量广播变量(Boardcast)是Spark中应对shuffle造成的性能变慢的有效克制手段,它是一种分发机制,一次性封装目标数据结构,以Excutor为粒度做数据分发。数据分发数=Excutor数 1.1 ...
  • 【前言:Spark目前提供了两种有限定类型的共享变量:广播变量和累加器,今天主要介绍一下基于Spark2.4版本的广播变量。先前的版本比如Spark2.1之前的广播变量有两种实现:HttpBroadcast和TorrentBroadcast,但是鉴于...
  • 1、Spark Streaming更新广播变量的方式 在Driver端通过累加器数据来一条就判断是否需要更新广播变量,通过这种方式就可以实现定时更新广播变量的方式。 lines.foreachRDD(rdd=>{ // 这里单例模式实例化广播...
  • 按照创建与使用方式的不同,Spark 提供了两类共享变量,分别是广播变量(Broadcast variables)和累加器(Accumulators)。接下来,我们就正式进入今天的学习,去深入了解这两种共享变量的用法、以及它们各自的适用...
  • 1、广播变量 广播变量的定义: 广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用,在机器学习中非常有用。广播变量是类型为spark.broadcast.Broadcast[T]的一个对象,其中...
  • spark中的广播变量

    2020-08-01 12:32:50
    一、使用广播变量的好处 1、Driver每次分发任务的时候会把task和计算逻辑的变量发送给Executor。不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。这样会导致消耗大量的内存导致严重的...
  • } } 2、广播变量:是通过广播将广播变量分发到taskmanager中进行处理 广播变量的使用步骤: 1、初始化数据 DataSet toBroadcast = env.fromElements(1,2,3); 2、广播数据(即注册数据,那个算子用,就在那个算子...
  • 前提:在spark环境下,当我们传递一个操作(例如:map,reduce)的函数到远程多个节点上进行运行时,各个节点都需要使用到该函数中的变量。如果变量比较大,如何下发这些变量呢?如果我们使用下面的方式,进行数据下发...
  • Spark 广播变量

    2020-11-23 09:41:43
    什么是广播变量 为什么需要广播变量? /** 以下代码就会出现一个问题: list是在driver端创建的,但是因为需要在executor端使用,所以driver会把list以task的形式发送到excutor端,也就相当于在executor需要复制一份,...
  • 1: 单独的broadcast 算子 (此时没有广播流) 是下游算子的每个并行度都会处理,等于数据扩大n倍 (而且后续的每个算子都是扩大了n 倍) DataStream a= source .broadcast(); DataStream b= a....
  • 广播变量

    2020-08-15 20:53:49
    有一个大的数据集和一个小的数据集,把每一个小的数据集在每一个executor里面进行缓存,大的小的数据集放在hdfs里面,如果传统的方法,那么就是调用join ,...3.在进行广播应用的时候,只需要把数据广播到当前任务的execu.
  • Spark流处理中定时更新广播变量

    千次阅读 2022-02-07 17:24:00
    Spark流处理中定时更新广播变量值 在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,...
  • Spark—广播变量

    2021-03-07 03:38:58
    广播变量Spark有两种共享变量——累加器、广播变量广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。需求来源Spark中分布式执行的代码需要传递到各个Executor的Task上...
  • 广播变量的作用 广播变量:分布式只读变量。 如果Executor端需要访问Driver端的某个变量,spark会向Executor端每个task都发送一个此变量的副本。如果此变量很大,就会占用大量的Executor节点的内存。 利用广播变量...
  • spark 广播变量和累加器使用和原理

    千次阅读 2022-02-28 09:55:51
    这些变量被复制到每台服务器上,对远程服务器上变量的任何更新都不会传播回driver程序。通常支持跨Tasks的读写共享变量性能比较低。也就是说如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝...
  • Spark的广播变量机制

    2020-12-24 03:19:09
    Spark广播变量什么是广播变量?在同一个Execute共享同一份计算逻辑的变量广播变量使用场景我现在要在在这些内容中过滤java和object-c过滤内容使用广播变量过滤代码逻辑:packagecom.chenzhipeng.spark.examples01;...
  • Flink广播变量和分布式缓存 一:Flink广播变量 Flink支持广播变量,就是将数据广播到具体的taskmanager上,数据存储在内存中,这样可以减缓大量的shuffle操作; 比如在数据join阶段,不可避免的就是大量的shuffle操作...
  • Spark RDD、广播变量简介

    多人点赞 2020-05-18 23:24:54
    2.1 定义一个广播变量 val x = 1 val broadcast = sc.broadcast(x) 2.2 还原一个广播变量 val a = broadcast.value 2.3 注意事项 1) 能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的。可以将...
  • flink广播变量适用于解决活动配置、白名单等应用场景,根据活动配置或者白名单过滤数据后再做后续加工处理。广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。如果海量数据中,需要过滤出几百万甚至...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 86,833
精华内容 34,733
关键字:

广播变量