精华内容
下载资源
问答
  • 动态资源分配
    千次阅读
    2019-06-01 22:39:01

    同反压机制一样,Spark Streaming动态资源分配(即DRA,Dynamic Resource Allocation)也可以用来应对流处理中批次流量过载的场景。

    Spark Streaming动态资源分配,允许为应用动态分配资源。当任务积压时,申请更多资源;当任务空闲时,使用最少资源。

    在生产中,可将动态资源分配和背压机制一起使用,通过背压机制来细粒度确保系统稳定;通过动态资源分配机制来粗粒度根据应用负载,动态增减Executors。共同保证Spark Streaming流处理应用的稳定高效。

    动态资源分配的原理

    入口类是org.apache.spark.streaming.scheduler.ExecutorAllocationManagerExecutorAllocationManager中的定时器,每隔spark.streaming.dynamicAllocation.scalingInterval时间,调用一次manageAllocation方法来管理ExecutormanageAllocation方法计算规则如下:

    1. 必须完成至少一个Batch处理,即batchProcTimeCount > 0

    2. 计算Batch平均处理时间(Batch平均处理时间=Batch总处理时间/Batch总处理次数)。

    3. Batch平均处理时间大于阈值spark.streaming.dynamicAllocation.scalingUpRatio,则请求新的Executor。

    4. Batch平均处理时间小于阈值spark.streaming.dynamicAllocation.scalingDownRatio,则移除没有任务的Executor。

    动态资源分配重要参数

    1. spark.dynamicAllocation.enabled: 默认false,是否启用Spark批处理动态资源分配。

    2. spark.streaming.dynamicAllocation.enabled: 默认false,是否启用Spark Streaming流处理动态资源分配。

    3. spark.streaming.dynamicAllocation.scalingInterval: 默认60秒,多久检查一次。

    4. spark.streaming.dynamicAllocation.scalingUpRatio: 默认0.9,增加Executor的阈值。

    5. spark.streaming.dynamicAllocation.scalingDownRatio: 默认0.3,减少Executor的阈值。

    6. spark.streaming.dynamicAllocation.minExecutors: 默认无,最小Executor个数

    7. spark.streaming.dynamicAllocation.maxExecutors: 默认无,最大Executor个数。

    Spark Streaming动态资源分配注意事项

    1. Spark Streaming动态资源分配和Spark Core动态资源分配互斥

    Spark Core动态资源分配适合于批处理,如Spark Sql Cli,可以根据Task数量动态分配Executor数量;如Spark ThriftServer On Yarn,空闲时不占用资源,只有在用户提交Sql任务时才会根据Task数动态分配Executor数。

    当开启Spark Streaming动态资源分配时,需要关闭Spark Core动态资源分配。

    1. Spark Streaming动态资源分配起作用前,需要至少完成一个Batch处理

    由于Spark Streaming动态资源分配需要根据Batch总处理时间和Batch总处理次数来计算Batch平均处理时间,因此需要至少完成一个Batch处理。这就需要我们保证在Spark Streaming动态资源分配起作用前,应用程序不会崩溃。

    1. Spark Streaming动态资源分配应当和Spark Streaming背压机制同时使用

    启用动态资源分配

    
    sparkCommLib=/data/apps/sparkCommLib
    
    /usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --queue default \
    --name spark_streaming_dra \
    --driver-cores 1 \
    --driver-memory 1G \
    --executor-memory 1G \
    --conf spark.dynamicAllocation.enabled=false \
    --conf spark.streaming.dynamicAllocation.enabled=true \
    --conf spark.streaming.dynamicAllocation.minExecutors=1 \
    --conf spark.streaming.dynamicAllocation.maxExecutors=15 \
    --jars ${sparkCommLib}/kafka_2.11-0.10.1.0.jar,${sparkCommLib}/kafka-clients-0.10.1.0.jar,${sparkCommLib}/spark-streaming-kafka-0-10_2.11-2.1.1.jar,${sparkCommLib}/fastjson-1.2.5.jar \
    --class com.bigData.spark.SparkStreamingDRA \
    spark-1.0-SNAPSHOT.jar
    

    在Yarn上可以看到,随着Spark Streaming任务队列中Queued的Batch越来越多,Executors数量在逐渐增加。

    更多相关内容
  • 基于分解的多目标进化算法(具有动态资源分配 (DRA) 的 MOEA/D) Decomposition Based Multi Objective Evolutionary Algorithm 代码是基于基于分解的多目标进化算法(MOEA/D)的思想和资源分配策略开发的。资源分配...
  • 各种资源分配算法实现,关于神经网络控制,用MATLAB实现动态聚类或迭代自组织数据分析,搭建OFDM通信系统的框架,PLS部分最小二乘工具箱,使用高阶累积量对MPSK信号进行调制识别,解耦,恢复原信号,虚拟力的无线传感...
  • 传统的互联网构架模型已难以满足...仿真结果表明,本文提出的基于混合博弈的虚拟网络动态资源分配方案相对于传统资源分配方案而言,充分利用了基础设施提供商提供的物理资源,同时有效预防链路拥塞,增加了用户满意度。
  • 论文研究-云计算环境中SBS应用动态资源分配方法.pdf, 建立了一个SBS(基于服务的系统,service based system)应用端到端性能评价模型,并在该模型的基础上提出了SBS...
  • 本文讲述了3G+LTE的动态资源分配机制。
  • 这篇文章会详细介绍Spark 动态资源分配原理。 前言 最近在使用Spark Streaming程序时,发现如下几个问题: 1.高峰和低峰Spark Streaming每个周期要处理的数据量相差三倍以上,预分配资源会导致低峰的时候资源的大量...

    Spark 默认采用的是资源预分配的方式。这其实也和按需做资源分配的理念是有冲突的。这篇文章会详细介绍Spark 动态资源分配原理。

    前言
    最近在使用Spark Streaming程序时,发现如下几个问题:

    1.高峰和低峰Spark Streaming每个周期要处理的数据量相差三倍以上,预分配资源会导致低峰的时候资源的大量浪费。

    2.Spark Streaming 跑的数量多了后,资源占用相当可观。

    所以便有了要开发一套针对Spark Streaming 动态资源调整的想法。我在文章最后一个章节给出了一个可能的设计方案。不过要做这件事情,首先我们需要了解现有的Spark 已经实现的 Dynamic Resource Allocation 机制,以及为什么它无法满足现有的需求。

    入口
    在SparkContext 中可以看到这一行:

    _executorAllocationManager =
          if (dynamicAllocationEnabled) {
            Some(new ExecutorAllocationManager(this, listenerBus, _conf))
          } else {
            None
          }
    

    通过spark.dynamicAllocation.enabled参数开启后就会启动ExecutorAllocationManager。

    这里有我第一个吐槽的点,这么直接new出来,好歹也做个配置,方便第三方开发个新的组件可以集成进去。但是Spark很多地方都是这么搞的,完全没有原来Java社区的风格。

    动态调整资源面临的问题

    我们先看看,动态资源调整需要解决哪几个问题:

    1. Cache问题。如果需要移除的Executor含有RDD cache该如何办?
    2. Shuffle问题。如果需要移除的Executor包含了Shuffle Write相关数据该怎么办?
    3. 添加和删除之后都需要告知DAGSchedule进行相关信息更新。

    Cache去掉了重算即可。为了防止数据抖动,默认包含有Cache的Executor是不会被删除的,因为默认的Idle时间设置的非常大:

    private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.cachedExecutorIdleTimeout",
    s"${Integer.MAX_VALUE}s")
    

    你可以自己设置从而去掉这个限制。

    而对于Shuffle,则需要和Yarn集成,需要配置yarn.nodemanager.aux-services。具体配置方式,大家可以Google。这样Spark Executor就不用保存Shuffle状态了。

    触发条件
    添加Worker的触发条件是:

    1. 有Stage正在运行,并且预估需要的Executors > 现有的

    删除Woker的触发条件是:

    1. 一定时间内(默认60s)没有task运行的Executor

    我们看到触发条件还是比较简单的。这种简单就意味着用户需要根据实际场景,调整各个时间参数,比如到底多久没有运行task的Executor才需要删除。
    默认检测时间是100ms:

    private val intervalMillis: Long = 100
    

    如何实现Container的添加和释放

    只有ApplicationMaster才能够向Yarn发布这些动作。而真正的中控是org.apache.spark.ExecutorAllocationManager,所以他们之间需要建立一个通讯机制。对应的方式是在ApplicationMaster有一个private class AMEndpoint(类,比如删除释放容器的动作在里就有:

    case KillExecutors(executorIds) =>
            logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
            Option(allocator) match {
              case Some(a) => executorIds.foreach(a.killExecutor)
              case None => logWarning("Container allocator is not ready to kill executors yet.")
            }
            context.reply(true)
    

    而ExecutorAllocationManager则是引用YarnSchedulerBackend实例,该实例持有ApplicationMaster的 RPC引用

    private var amEndpoint: Option[RpcEndpointRef]
    

    如何获取调度信息

    要触发上面描述的操作,就需要任务的调度信息。这个是通过ExecutorAllocationListener extends SparkListener来完成的。具体是在 ExecutorAllocationMaster的start函数里,会将该Listener实例添加到SparkContext里的listenerBus里,从而实现对DAGSchecude等模块的监听。

    根据上面的分析,我们至少要知道如下三个信息:

    1. Executor上是否为空,如果为空,就可以标记为Idle.只要超过一定的时间,就可以删除掉这个Executor.
    2. 正在跑的Task有多少
    3. 等待调度的Task有多少

    这里是以Stage为区分的。分别以三个变量来表示:

    private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
    private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
    private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
    

    名字已经很清楚了。值得说的是stageIdToTaskIndices,其实就是stageId 对应的正在运行的task id 集合。

    那么怎么计算出等待调度的task数量呢?计算方法如下:

    stageIdToNumTasks(stageId) - stageIdToTaskIndices(stageId).size
    

    这些都是动态更新变化的,因为有了监听器,所以任务那边有啥变化,这边都会得到通知。

    定时扫描器

    有了上面的铺垫,我们现在进入核心方法:

    private def schedule(): Unit = synchronized {
        val now = clock.getTimeMillis
    
        updateAndSyncNumExecutorsTarget(now)
    
        removeTimes.retain { case (executorId, expireTime) =>
          val expired = now >= expireTime
          if (expired) {
            initializing = false
            removeExecutor(executorId)
          }
          !expired
        }
      }
    

    该方法会每隔100ms被调度一次。你可以理解为一个监控线程。

    Executor判定为空闲的机制
    只要有一个task结束,就会判定有哪些Executor已经没有任务了。然后会被加入待移除列表。在放到removeTimes的时候,会把当前时间now + executorIdleTimeoutS * 1000 作为时间戳存储起来。当调度进程扫描这个到Executor时,会判定时间是不是到了,到了的话就执行实际的remove动作。在这个期间,一旦有task再启动,并且正好运行在这个Executor上,则又会从removeTimes列表中被移除。那么这个Executor就不会被真实的删除了。

    Executor 需要增加的情况
    首先,系统会根据下面的公式计算出实际需要的Executors数目:

    private def maxNumExecutorsNeeded(): Int = {
        val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
        (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
      }
    

    接着每个计算周期到了之后,会和当前已经有的Executors数:numExecutorsTarget 进行比较。

    1. 如果发现 maxNumExecutorsNeeded < numExecutorsTarget 则会发出取消还有没有执行的Container申请。并且重置每次申请的容器数为1,也就是numExecutorsToAdd=1
    2. 否则如果发现当前时间now >= addTime(addTime 每次会增加一个sustainedSchedulerBacklogTimeoutS ,避免申请容器过于频繁),则会进行新容器的申请,如果是第一次,则增加一个(numExecutorsToAdd),如果是第二次则增加2个以此按倍数类推。直到maxNumExecutorsNeeded <= numExecutorsTarget ,然后就会重置numExecutorsToAdd。

    所以我们会发现,我们并不是一次性就申请足够的资源,而是每隔sustainedSchedulerBacklogTimeoutS次时间,按[1,2,4,8]这种节奏去申请资源的。因为在某个sustainedSchedulerBacklogTimeoutS期间,可能已经有很多任务完成了,其实不需要那么多资源了。而按倍数上升的原因是,防止为了申请到足够的资源时间花费过长。这是一种权衡。

    DRA评价
    我们发现,DRA(Dynamic Resource Allocation)涉及到的点还是很多的,虽然逻辑比较简单,但是和任务调度密切相关,是一个非常动态的过程。这个设计本身也是面向一个通用的调度方式。

    我个人建议如果采用了DRA,可以注意如下几点:

    1. 设置一个合理的minExecutors-maxExecutors值
    2. 将Executor对应的cpuCore 最好设置为<=3 ,避免Executor数目下降时,等不及新申请到资源,已有的Executor就因为任务过重而导致集群挂掉。
    3. 如果程序中有shuffle,例如(reduce,groupBy),建议设置一个合理的并行数,避免杀掉过多的Executors。
    4. 对于每个Stage持续时间很短的应用,其实不适合这套机制。这样会频繁增加和杀掉Executors,造成系统颠簸。而Yarn对资源的申请处理速度并不快。

    Spark Streaming该使用什么机制动态调整资源
    现有的DRA机制其实适合长时的批处理过程中,每个Stage需要的资源量不一样,并且耗时都比较长。Spark Streaming 可以理解为循环的微批处理。而DRA是在每次微批处理起作用,可能还没等DRA反应过来,这个周期就已经过了。

    Spark Streaming需要一个从全局一天24小时来考虑。每个调度周期的processing time可能更适合作为增减Executors的标准。同时如果发生delay的话,则可以扩大资源申请的速度。并且,因为是周期性的,释放和新增动作只会发生在一个新的周期的开始,所以他并不会面临现有 DRA的问题,譬如需要通过额外的方式保存Shuffle 状态等。所以实现起来更加容易。我们可能需要同时监听StreamingContext的一些信息。

    具体而言:

    每个周期检查上个周期的处理时间 ,设为 preProcessingTime,周期为duration, 一般而言,我们的Spark Streaming程序都会让preProcessingTime < duration。否则会发生delay。

    如果 preProcessingTime > 0.8 * duration,则一次性将资源申请到maxExecutors。

    如果preProcessingTime < duration,则应该删除的Worker为:

    removeExecutorNum =  currentExecutors * ((duration -preProcessingTime)/duration - 0.2)
    

    其中0.2 为预留的worker数。如果removeExecutorNum如果<=0 则不进行任何操作。

    假设duration =10s, preProcessingTime= 5s, currentExecutors=100,则我们理论上认为 只要保留50%的资源即可。
    但是为了防止延时,我们其实额外保留一些20%资源。也就意味着我们删除30个Executor。 我们并不会一次性将资源都释放掉。假设我们增加一个新的参数spark.streaming.release.num.duration=5,这个参数意味着我们需要花费5个周期释放掉这30个Executor的资源。也就是当前这个周期,我们要释放掉 6个Executor。
    接着到下一个周期,重复上面的计算。 直到计算结果 <=0 为止。

    如有错误之处,欢迎评论指出。

    展开全文
  • 资源是影响 Spark 应用执行效率的一个重要因素。Spark 应用中真正执行 task 的组件是 Executor,可以通过spark.executor.instances 指定 Spark 应用的 Executor 的数量。在运行过程中,无论 Executor上是否有 task ...

    前言

    资源是影响 Spark 应用执行效率的一个重要因素。Spark 应用中真正执行 task 的组件是 Executor,可以通过spark.executor.instances 指定 Spark 应用的 Executor 的数量。在运行过程中,无论 Executor上是否有 task 在执行,都会被一直占有直到此 Spark 应用结束。


    上篇我们从动态优化的角度讲述了 Spark 3.0 版本中的自适应查询特性,它主要是在一条 SQL 执行过程中不断优化执行逻辑,选择更好的执行策略,从而达到提升性能的目的。本篇我们将从整个 Spark 集群资源的角度讨论一个常见痛点:资源不足。

    在 Spark 集群中的一个常见场景是,随着业务的不断发展,需要运行的 Spark 应用数和数据量越来越大,靠资源堆砌的优化方式也越来越显得捉襟见肘。当一个长期运行的 Spark 应用,若分配给它多个 Executor,可是却没有任何 task 分配到这些 Executor 上,而此时有其他的 Spark 应用却资源紧张,这就造成了资源浪费和调度不合理。

    1.png

    要是每个 Spark 应用的 Executor 数也能动态调整那就太好了。

    动态资源分配(Dynamic Resource Allocation)就是为了解决这种场景而产生。Spark 2.4 版本中 on Kubernetes 的动态资源并不完善,在 Spark 3.0 版本完善了 Spark on Kubernetes 的功能,其中就包括更灵敏的动态分配。我们 Erda 的 FDP 平台(Fast Data Platform)从 Spark 2.4 升级到 Spark 3.0,也尝试了动态资源分配的相关优化。本文将针对介绍 Spark 3.0 中 Spark on Kubernetes 的动态资源使用。

    原理

    一个 Spark 应用中如果有些 Stage 稍微数据倾斜,那就有大量的 Executor 是空闲状态,造成集群资源的极大浪费。通过动态资源分配策略,已经空闲的 Executor 如果超过了一定时间,就会被集群回收,并在之后的 Stage 需要时可再次请求 Executor。

    如下图所示,固定 Executor 个数情况,Job1 End 和 Job2 Start 之间,Executor 处于空闲状态,此时就造成集群资源的浪费。

    2.png

    开启动态资源分配后,在 Job1 结束后,Executor1 空闲一段时间便被回收;在 Job2 需要资源时再申Executor2,实现集群资源的动态管理。

    3.png

    动态分配的原理很容易理解:“按需使用”。当然,一些细节还是需要考虑到:

    • 何时新增/移除 Executor
    • Executor 数量的动态调整范围
    • Executor 的增减频率
    • Spark on Kubernetes 场景下,Executor 的 Pod 销毁后,它存储的中间计算数据如何访问

    这些注意点在下面的参数列表中都有相应的说明。

    参数一览

    spark.dynamicAllocation.enabled=true #总开关,是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少executor,默认false
    
    spark.dynamicAllocation.shuffleTracking.enabled=true #spark3新增,之前没有官方支持的on k8s的Dynamic Resouce Allocation。启用shuffle文件跟踪,此配置不会回收保存了shuffle数据的executor
    
    spark.dynamicAllocation.shuffleTracking.timeout #启用shuffleTracking时控制保存shuffle数据的executor超时时间,默认使用GC垃圾回收控制释放。如果有时候GC不及时,配置此参数后,即使executor上存在shuffle数据,也会被回收。暂未配置
    
    spark.dynamicAllocation.minExecutors=1 #动态分配最小executor个数,在启动时就申请好的,默认0
    
    spark.dynamicAllocation.maxExecutors=10 #动态分配最大executor个数,默认infinity
    
    spark.dynamicAllocation.initialExecutors=2 #动态分配初始executor个数默认值=spark.dynamicAllocation.minExecutors
    
    spark.dynamicAllocation.executorIdleTimeout=60s #当某个executor空闲超过这个设定值,就会被kill,默认60s
    
    spark.dynamicAllocation.cachedExecutorIdleTimeout=240s #当某个缓存数据的executor空闲时间超过这个设定值,就会被kill,默认infinity
    
    spark.dynamicAllocation.schedulerBacklogTimeout=3s #任务队列非空,资源不够,申请executor的时间间隔,默认1s(第一次申请)
    
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout #同schedulerBacklogTimeout,是申请了新executor之后继续申请的间隔,默认=schedulerBacklogTimeout(第二次及之后)
    
    spark.specution=true #开启推测执行,对长尾task,会在其他executor上启动相同task,先运行结束的作为结果
    
    

    实战演示

    无图无真相,下面我们将动态资源分配进行简单演示。

    1.配置参数

    动态资源分配相关参数配置如下图所示:

    4.png

    如下图所示,Spark 应用启动时的 Executor 个数为 2。因为配置了

    spark.dynamicAllocation.initialExecutors=2
    
    ![5.png](https://img-blog.csdnimg.cn/img_convert/0660f9c2fc015c0c3e527fc2cb97dda8.png)

    运行一段时间后效果如下,executorNum 会递增,因为空闲的 Executor 被不断回收,新的 Executor 不断申请。

    6.png

    2. 验证快慢 SQL 执行

    使用 SparkThrfitServer 会遇到的问题是一个数据量很大的 SQL 把所有的资源全占了,导致后面的 SQL 都等待,即使后面的 SQL 只需要几秒就能完成。我们开启动态分配策略,再来看 SQL 执行顺序。

    先提交慢 SQL:

    7.png

    再提交快 SQL:

    8.png

    如下图所示,开启动态资源分配后,因为 SparkThrfitServer 可以申请新的 Executor,后面的 SQL 无需等待便可执行。Job7(慢 SQL)还在运行中,后提交的 Job8(快 SQL)已完成。这在一定程度上缓解了资源分配不合理的情况。

    9.png

    3. 详情查看

    我们在 SparkWebUI 上可以看到动态分配的整个流程。

    登陆 SparkWebUI 页面,Jobs -> Event Timeline,可以看到 Driver 对整个应用的 Executor 调度。如下图所示,显示了每个 Executor 的创建和回收。

    10.png

    同时也能看到此 Executor 的具体创建和回收时间。

    11.png

    在 Executors 标签页,我们可以看到所有历史 Executor 的当前状态。如下图所示,之前的 Executor 都已被回收,只有 Executor-31 状态为 Active。

    12.png

    总结

    动态资源分配策略在空闲时释放 Executor,繁忙时申请 Executor,虽然逻辑比较简单,但是和任务调度密切相关。它可以防止小数据申请大资源,Executor 空转的情况。在集群资源紧张,有多个 Spark 应用的场景下,可以开启动态分配达到资源按需使用的效果。

    以上是我们在 Spark 相关优化的一点经验,希望能够对大家有所帮助😄。

    注:文中部分图片源自于网络,侵删。

    更多技术干货请关注【尔达 Erda】公众号,与众多开源爱好者共同成长~

    展开全文
  • 注水算法对功率,比特,速率的动态资源分配MATLAB仿真
  • 为了解决用户选择协作集的协作多点传输(CoMP)系统中本地资源调度时的冲突问题,设计了一种冲突避免的动态资源分配方法.通过对协作资源的细分和随机指派,实现了协作节点间资源分配的解耦,简化了协商过程,有效避免...
  • Spark如何进行动态资源分配

    千次阅读 2020-10-16 06:50:00
    一、操作场景对于Spark应用来说,资源是影响Spark应用执行效率的一个重要因素。当一个长期运行的服务,若分配给它多个Executor,可是却没有任何任务分配给它,而此时有其他的应用却...

    一、操作场景

    对于Spark应用来说,资源是影响Spark应用执行效率的一个重要因素。当一个长期运行的服务,若分配给它多个Executor,可是却没有任何任务分配给它,而此时有其他的应用却资源紧张,这就造成了很大的资源浪费和资源不合理的调度。

    动态资源调度就是为了解决这种场景,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。

    二、动态资源策略

    1、资源分配策略

    开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)`时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。
    之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,而该方式可以在很少次数的申请之后得到满足。

    2、资源回收策略

    当application的executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。

    三、操作步骤

    1、yarn的配置

    首先需要对YARN进行配置,使其支持Spark的Shuffle Service。

    修改每台集群上的yarn-site.xml:

     - 修改
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    
     - 增加
    <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
    <name>spark.shuffle.service.port</name>
    <value>7337</value>
    </property>
    

    将$SPARKHOME/lib/spark-X.X.X-yarn-shuffle.jar拷贝到每台NodeManager的${HADOOPHOME}/share/hadoop/yarn/lib/下, 重启所有修改配置的节点。

    2、Spark的配置

    配置$SPARK_HOME/conf/spark-defaults.conf,增加以下参数:

    spark.shuffle.service.enabled true   //启用External shuffle Service服务
    spark.shuffle.service.port 7337 //Shuffle Service默认服务端口,必须和yarn-site中的一致
    spark.dynamicAllocation.enabled true  //开启动态资源分配
    spark.dynamicAllocation.minExecutors 1  //每个Application最小分配的executor数
    spark.dynamicAllocation.maxExecutors 30  //每个Application最大并发分配的executor数
    spark.dynamicAllocation.schedulerBacklogTimeout 1s 
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
    

    四、启动

    使用spark-sql On Yarn执行SQL,动态分配资源。以yarn-client模式启动ThriftServer:

    cd $SPARK_HOME/sbin/
    ./start-thriftserver.sh \
    --master yarn-client \
    --conf spark.driver.memory=10G \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=300 \
    --conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s
    

    启动后,ThriftServer会在Yarn上作为一个长服务来运行。

    点击【阅读全文】可以查看官方文档最新版本详细介绍~

    历史好文推荐

    1. 从0到1搭建大数据平台之计算存储系统

    2. 从0到1搭建大数据平台之调度系统

    3. 从0到1搭建大数据平台之数据采集系统

    4. 如何从0到1搭建大数据平台

    展开全文
  • 基于动态资源分配的WDM-PON结构研究基于动态资源分配的WDM-PON结构研究,任丹萍,寿国础,文章简要分析了WDM-PON的系统结构,提出了逻辑组的概念,通过选择最佳的逻辑组数目来降低系统成本,提高系统资源利用率。...
  • spark动态资源分配参数一览

    千次阅读 2020-07-14 16:03:07
    是否使用动态资源分配,该资源分配将根据工作负载上下扩展在此应用程序中注册的执行程序的数量。 这需要设置spark.shuffle.service.enabled或spark.dynamicAllocation.shuffleTracking.enabled。 以下配置也...
  • 粒子群优化技术在云计算环境中动态资源分配中的应用。 粒子群智能用于有效地将物理机资源分配给虚拟机请求。 “偏度因子”或“负载平衡因子”的概念用于测量每个物理机器中资源的不均匀性。 创建所有可能分配的...
  • 高速铁路网中的时延感知动态资源分配
  • 动态资源分配算法

    2013-07-11 17:44:37
    利用银行家算法进行资源分配的报告,包括程序和其他的一套说明都在我这里可以找到匹配的。
  • Spark动态资源分配

    千次阅读 2018-10-10 14:54:48
    cloudera manager默认是开启了spark动态资源分配的,即spark.dynamicAllocation,enable=true cloudera manager默认的动态配置参数似乎不是很合理,比如spark.dynamicAllocation.schedulerBacklogTimeout这个参数的...
  • 转自:http://zhangxiong0301.iteye.com/blog/2192641从spark1.2开始,可以根据application的负载动态地增加和减少分配给application的资源。也就是说,你的application在不需要资源的时候会把资源退还给集群,而在...
  • 此次课程设计的主要内容是实现算法模拟银行家算法,模拟实现动态资源分配,编写和调试一个系统动态资源的简单模拟银行家算法程序程序,观察死锁产生的条件,并使用适当的算法,有效的防止和避免死锁的发生。...
  • 全能制造系统动态资源分配及冲突消解,刘海英,田新诚,摘要:全能制造系统的协调机制包括动态资源分配和冲突消解两个部分。资源的动态分配和冲突的消解是全能系统制造领域中不同研究方
  • OFDMA系统中利用信道延时信息进行动态资源分配,吴燕嬿,杨绿溪,本文研究了在多用户正交频分复用系统(OFDMA)中通过有延时的信道信息进行的自适应子载波、功率分配。在OFDMA系统中的动态资源分配��
  • 3G系统动态资源分配下的QoS性能分析pdf,3G系统动态资源分配下的QoS性能分析
  • 文章目录使能Spark资源动态分配使能External Shuffle ServiceExternal Shuffle Service配置 使能Spark资源动态分配 进入${SPARK_HOME}/conf/目录,在spark-defaults.conf文件中新增如下配置 如果spark-defaults.conf...
  • spark动态资源分配

    千次阅读 2017-05-25 14:09:40
    前段时间仓库间推广spark-sql时,生产环境已经应用了spark dynamic resource allocation特性,即可动态资源分配,这里的动态资源分配是指executor级的,我们知道spark的资源分配是比较coarse-grained的,一个...
  • Spark在Yarn上的动态资源分配

    千次阅读 2016-09-20 11:18:35
    参考地址:http://spark.apache.org/docs/1.5.2/job-scheduling.html#configuration-and-setup 1.配置hadoop/etc/yarn-site.xml  yarn.nodemanager.aux-services  mapreduce_shuffle,spark_shuffle  yar
  • 采用静态固定资源分配等策略不能适应资源和用户请求的动态变化,容易产生资源碎片,造成网格资源利用率低等问题。提出了一种基于分类挖掘的资源动态分配模型和算法,通过资源管理服务器中的守护进程,对集群中的任务动作...
  • 动态规划求解资源分配 动态规划求解资源分配 实验目标 实验目标 1 1 掌握用动态规划方法求解实际问题的基本思路 掌握用动态规划方法求解实际问题的基本思路 2 2 进一步理解动态规划方法的实质巩固设计动态规划算法的...
  • 1。本次调试查看源代码采用 spark学习-57-Spark下Scala版HBase下的根据权重获取最真实数据...只是修改了sparkSession的创建,代码如下,这里启动了采用standlone模式进行调试,否则无法进行动态资源分配 var _sparkSe
  • 3 3 动态规划求解资源分配 实验目标 掌握用动态规划方法求解实际问题的基本思路 进一步理解动态规划方法的实质巩固设计动态规划算法的基本步骤 实验任务 设计动态规划算法求解资源分配问题给出算法的非形式描述 在 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 422,696
精华内容 169,078
关键字:

动态资源分配