精华内容
下载资源
问答
  • DT大数据梦工厂系列场景什么并行度、如何调节并行度并行度对性能怎样的影响以及并行度调节成多大合适?分析 并行度 Snail理解的并行度是指spark集群能同时并发处理的task数量,在数值上等于集群的总core的数量...

    参考

    中华石杉
    DT大数据梦工厂系列

    场景

    什么是并行度、如何调节并行度、并行度对性能有怎样的影响以及并行度调节成多大合适?

    分析

    • 并行度

    Snail理解的并行度是指spark集群能同时并发处理的task数量,在数值上等于集群的总core的数量,其值可以在编写应用程序的时候指定:

    val conf = new SparkConf()  
    conf.setAppName("my first spark app ").set("spark.default.parallelism", "1")  //指定并行度为1
    • 对性能的影响

    [3.0.0]详细说明了最大化资源的重要性。好的,现在假设集群资源为:
    12个executors, 每个executor分配 4 cores 8G,并且已通过spark-submit将资源设置为最大了。处理相同复杂度的spark任务,考虑如下三种并行度设置方式:

    spark.default.parallelism = 24
    spark.default.parallelism = 12*4
    spark.default.parallelism = 12*4*5

    先比较第1、2种情况:集群能并行处理 48个task,而第1中情况并行度只设置成24 - 这意味着平均分给每个executor的task 为两个(24/12),意味着每个executor都浪费了2个core,意味着对处理相同复杂度的spark任务而言,每个task要处理的数据量比第二种情况要多一倍:比如总共有2400G的数据要处理,第一种情况下每个task要处理的数据量是 2400G/24 等于100G,而第二种情况是 50G 。处理的数据量越大,出现磁盘I/O、GC的可能性就增加了!

    • 并行度调节成多大合适
      经验证明(snail也只是听王教主说的):每个 core分配5个task最佳
      官方是推荐将task数量设置成spark application总cpu core数量的2~3倍,比如150个cpu core,基本要设置task数量为300~500,原因如下:
      有些task会运行的快一点,比如50s就完了,有些task可能会慢一点,要1分半才运行完。所以如果你的task数量,刚好设置的跟cpu core数量相同,还是会导致资源的浪费-因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。

    实验

    这里限于资源问题只能在本地验证一点:通过set(“spark.default.parallelism”, “1”)设置的并行度与实际并行运行的task数量一至。

    1、代码 set(“spark.default.parallelism”, “1”)

    package com.dt.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object WordCount {
      def main(args: Array[String]): Unit = {  
        val conf = new SparkConf()  
        conf.setAppName("my first spark app ").set("spark.default.parallelism", "1") // 本地模式不设置的话,spark会自行将并行度设置
        //成本地最大的cores
        val sc = new SparkContext(conf)    
    
        val lines2 = sc.textFile("file:///home/pengyucheng/resource/hellospark.txt")  
    
        lines2.flatMap(line => line.split(" ") ).map( word => (word,1) ).reduceByKey(_+_).
        map(wordNumberPair=>(wordNumberPair._2,wordNumberPair._1)).sortByKey(false).
        collect.foreach(wordNumberPair => println(wordNumberPair._1 +":" + wordNumberPair._2)) 
    
         sc.stop  
      }  
    }

    执行脚本

    --class com.dt.spark.WordCount \
    --num-executors 1 \
    --driver-memory 1000m \
    --executor-memory 1000m \
    --executor-cores 1 \
    /home/pengyucheng/resource/wordcount.jar \
    

    结果 web UI

    这里写图片描述

    这里写图片描述

    共一个job,分为3个stage,每个stage一个task

    2、代码(略)set(“spark.default.parallelism”, “2”)

    web UI

    这里写图片描述

    这里写图片描述

    这里写图片描述

    划分成2个job,每个job分为2个stage,每个stage2个任务

    总结

    • 并行度的设置需要与资源相匹配,才能最大化利用集群资源
    • 并行度设置方式set(“spark.default.parallelism”, “parallelismNum”)

    疑问:并行度设置的不同,怎么会导致同一任务划分的job数量不同呢?

    展开全文
  • flink中的slot和并行度

    千次阅读 2020-03-19 14:48:55
    什么并行度? 能够相互独立,互不干扰的执行同一种算子任务的所有线程集合,称为并行度。就好比三个人一起去搬砖,事先咱们先把这些砖分为三份,一人一份,这样我们做的事都是搬砖,但是三个人互不影响,你搬你...

    一.什么是并行度?
    能够相互独立,互不干扰的执行同一种算子任务的所有线程集合,称为并行度。就好比有三个人一起去搬砖,事先咱们先把这些砖分为三份,一人一份,这样我们做的事都是搬砖,但是三个人互不影响,你搬你的,我搬我的。
    二.什么是slot?
    flink的每一个application都会根据前后的算子操作划分为一个个Task(就好比spark的stage),每一个Task里面封装了一个个 sub-task (就好比spark的 Taskset里面封装了一个个task),这些sub-task就是一个个并行的实例(线程),那么线程运行的资源在哪呢?flink集群为每个TaskManager提供了solt(资源槽)。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例。一般情况下你的slot数是你每个节点的cpu的核数。我们的sub-task就是运行在这个slot资源槽内的,他们的组成如下图。
    在这里插入图片描述
    TaskManager里面包含了许多Task slot ,一个Task slot 里面 又包含了许多 sub-task(这里要注意是sub-task,要知道sub-task 和 task的区别) 。
    三.flink是如何划分task的?
    flink会根据sub-task的传输方式来划分这个application中有多少个task以及sub-task,如下图所示:
    这个图中有5种不同的算子,每个算子都有自己的并行度,那么按道理说这个application的task数为5,sub-task数为10。但是答案确错了,flink会根据sub-task是否会发生Operator Chain来划分task,如图的红色方框,上下两个算子的并行度相同,并且处于同一个TaskManager里面,因此这里面不会发生shuffle和网络传输,所以flink将这两个算子给划分成了一个task,所以这个application中有4个task(8个sub-task)。
    在这里插入图片描述
    这里还需要注意的是同一个application,多个 task的 subTask,可以运行在同一个 slot 资源槽中。 但是同一个 task 中的多个 sub-task不能运行在一个 slot 资源槽中,他们会被分散到不同的slot中,这就限制了我们的任何一个算子的并行度不能设置的超过集群总slot的数量,否则集群不能正常的运行。
    下面有两个例子:

    public class ParallelismTest2 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> source = env.socketTextStream("node01" , 8888)..setParallelism(1);
            SingleOutputStreamOperator<Tuple2<String, String>> gsp = source.map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) throws Exception {
                    return Tuple2.of(value, "1");
                }
            }).setParallelism(6);
    
            gsp.print().setParallelism(1);
    
            env.execute();
        }
    }
    

    很明显这里面我们总共有8个sub-task,其中map算子的并行度为6,将我们的程序打包到集群运行,可以看到:我们集群的slot为6,map的并行度也是6,这样能够很好的运行,三个算子都显示running。
    在这里插入图片描述
    在这里插入图片描述
    但是当我们把程序的map并行度修改为7时,程序就出现问题了,如图:
    在这里插入图片描述
    此时可以看到由于map的并行度为7,而我们的slot数为6,也就是说flink认为我的资源不够这个任务使用了,因此状态一直是created,最终将会变成失败。

    展开全文
  • ==思考问题1== 向集群提交一个拓扑的时候,Storm是如何计算Task数以及Executor数的?...构建拓扑的时候,有3个地方会影响并行度,这3个地方之间有什么关系? builder.setSpout("spout", new RandomSentenceSpo...

    ==思考问题1==

    向集群提交一个拓扑的时候,Storm是如何计算Task数以及Executor数的?

    具体有多少个worker,多少个executor,每个executor负责多少个task?

     

    ==思考问题2:==

    构建拓扑的时候,有3个地方会影响并行度,这3个地方之间有什么关系?

    builder.setSpout("spout", new RandomSentenceSpout(), 5); //parallelism-hint
    builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);
    builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);

     

    ==3个参数的信息==

    1、parallelism-hint:

    构建拓扑时,可以通过setSpout或setBolt的函数参数中指定。为初始executor数

    如:builder.setSpout("spout", new RandomSentenceSpout(), 5);

     

    2、 TOPOLOGY-TASKS:

    构建拓扑时,通过Spout/Bolt的setNumTasks()方法来指定。为component的task数(Spout或Bolt)。

    如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);

     

    3、TOPOLOGY-MAX-TASK-PARALLELISM:

    构建拓扑时,通过Spout/Bolt的setMaxTaskParallelism()方法来指定。为component的最大并行度通常用于测试,在本地模式时使用。

    如:builder.setSpout("spout", new RandomSentenceSpout(), 5).setMaxTaskParallelism(1);

     

    ==结论1:Executor数是多少?==

    对应topology代码中, 为每个component指定的parallelism-hint数(通过setBolt和setSpout的参数)

     

    ==结论2:Task数是多少?==

    版本号:apache-storm-1.0.1

    代码路径:org/apache/storm/daemon/nimbus.clj

     

     

    这里有一个函数非常重要,看了之后上面的3个关系多少会清晰很多。

    该函数返回计算之后的真实的Task数

    (defn- component-parallelism [storm-conf component]
      (let [storm-conf (merge storm-conf (component-conf component))
            num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
            max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
            ]
        (if max-parallelism
          (min max-parallelism num-tasks)
          num-tasks)))

     

    这个代码是用clojure语言编写的,没有用过的人估计会非常蛋疼,

    为了方便理解,用伪代码(方便理解)翻译之后,大概思路是这个样子的:

    num-tasks = (TOPOLOGY-TASKS != null ? TOPOLOGY-TASKS : parallelism-hint);
    max-parallelism = TOPOLOGY-MAX-TASK-PARALLELISM;
        
    if (max-parallelism != null) {
        //取两者较小
        return min(num-tasks, max-parallelism);
    } else {    
        return num-tasks;
    }

     

    如果将3个参数进行排列组合之后,获得结果如下:

     

    简单理解来说:

    1、暂时不考虑TOPOLOGY-MAX-TASK-PARALLELIS。(测试用的玩意儿,弄出来影响思路)

    2、TOPOLOGY-TASKS优先于parallelism-hint。

     

    ==Executor与Task是如何匹配的?==

    下面的代码是分配的代码

    (defn- compute-executors [nimbus storm-id]
      (let [conf (:conf nimbus)
            blob-store (:blob-store nimbus)
            storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
            component->executors (:component->executors storm-base)
            storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
            topology (read-storm-topology-as-nimbus storm-id blob-store)
            task->component (storm-task-info topology storm-conf)]
        (->> (storm-task-info topology storm-conf)
             reverse-map
             (map-val sort)
             (join-maps component->executors)
             (map-val (partial apply partition-fixed))
             (mapcat second)
             (map to-executor-id)
             )))

     

    理解这个代码之前,我们首先把注意力放在storm-task-info这个函数上,看看它都干了些什么。

    代码位置:org/apache/storm/daemon/common.clj

    (defn storm-task-info
      "Returns map from task -> component id"
      [^StormTopology user-topology storm-conf]
      (->> (system-topology! storm-conf user-topology)
           all-components
           (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
           (sort-by first)
           (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
           (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
           (into {})
           ))

     

    来看看广大网友的解读版。参考博客:https://www.cnblogs.com/ierbar0604/p/4386480.html

    这个函数, 首先读出所有components ,对每个component, 读出TOPOLOGY-TASKS(已经过标准化之后的TASK数,具体参照前面的内容),

    最后用递增序列产生taskid, 并最终生成component和task的对应关系。

    (如果不设置TOPOLOGY-TASKS,task数等于executor数,后面分配就很容易,否则就涉及task分配问题)

     

    storm-task-info函数的输出,是这个样子的:

    {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}

     

    然后,我们把注意力返回到compute-executors函数(调用storm-task-info函数的调用处)。

    还是用上面博客中,网友解读的版本来帮助我们理解。(注意:需要对照源码,确认当前版本代码是否有变化)

     

    ==我的笔记==

     

    最后,从程序与StormUI界面对比来看看并行度的分配结果。

    (拓扑程序)

     

     (UI界面)

     

    ==简单总结==

    1、有3个地方可以影响Task数,根据3个参数的结果决定Task数。

    2、executor数 = 所有组件的parallelism-hint总数。

    3、task数在生命周期内不变,executor数可能改变。

     

    ==rebalance命令==

    storm运行过程中,而已使用rebalance命令动态调整拓扑的worker数及并发度。

    命令模板:storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*  (*表示可以设置多个)

    ## 重新配置拓扑 "mytopology",使得该拓扑拥有 5 个 worker processes,
    ## 另外,配置名为 "blue-spout" 的 spout 使用 3 个 executor,
    ## 配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。
    
    $ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=1

    -w:标记覆盖Storm在禁用与关闭期间等待的时间长度。

     

    ==其他疑问==

    1、网上总是能看到,“不推荐使用setNumTasks”的方式来提高并发度。至于原因确实是一直没有搞明白。

    答:如果只单纯的使用setNumTasks,不调整parallelism-hint,会造成多个Task运行在1个executor的结果。并不一定能够提高性能。

     

    2、如果task数比executor数多,是否会有闲置executor?(需要用代码验证)

    答:不会有闲置executor。

     

    -------------

    参考博客:

    https://www.cnblogs.com/ierbar0604/p/4386480.html

    http://lib.csdn.net/article/60/42875

    转载于:https://www.cnblogs.com/quchunhui/p/8271349.html

    展开全文
  • why ? 为什么要编写并行程序与构建并行系统 The answer:单处理器的性能具有瓶颈,内部...在这里,用一个简单的例子来说明任务并行与数据并行之间的区别:现在一个教授P,然后他的手下4个助教(A,B,C,D),期...

    why ?

    为什么要编写并行程序与构建并行系统
    The answer:单处理器的性能具有瓶颈,内部原因是因为晶体管的密度不可能无限制的增大,外部原因是因为密度增大,处理器的散热就是一个很大的问题,过于高的温度会影响性能。

    how ?

    Two methods:任务并行与数据并行
    在这里,用一个简单的例子来说明任务并行与数据并行之间的区别:现在有一个教授P,然后他的手下有4个助教(A,B,C,D),期末考试参加的学生有100个,然后改卷子,卷子一共有5道题目。
    首先,我们可以将教授以及他手下的主教5人看作是5个核,然后,如果每一个核负责改卷子上的某一道题目,那么这样就属于任务并行(总共有5个任务,一个核一个)。另外一个想法就是一个核负责20个学生的卷子,则这就是数据并行(总共100个数据,每一个核20个)。
    并行程序的设计过程中,要注意的问题:核之间的通信,负载平衡,同步。
    功能最强大的并行程序就是通过显式的并行结构来编写,即用扩展C和扩展C++编写。

    what should we do ?

    三种并行编程的方法:
    One : 消息传递接口(Message-Passing Interface MPI)
    Two : POSIX线程(POSIX threads Pthreads)
    Three : OpenMP
    为什么会有三种并行编程的实现方法:因为总的而言,并行系统可以分为2类:其中一类是共享内存系统,另外一种是分布式内存系统。Pthreads以及OpenMP是为了共享内存系统的编程而设计的,而MPI是为了分布式内存系统而设计的。
    关于共享内存系统中两种方法的主要区别是:Pthreads是比较底层的然而OpenMP是对C语言相对更高层次的扩展。

    展开全文
  • 1.耦合比较低。不会影响其他模块的开发。 2.减轻团队的成本,可以并行开发,不用关注其他人怎么开发,先关注自己的开发。 3.配置比较简单,基本用注解就能实现,不用使用过多的配置文件。 4.微服务跨平台的,可以...
  • MATLAB Parfor

    2021-02-20 14:28:24
    Matlab parfor 并行计算 ...main里parfor循环中的function中添加parfor有什么影响呢,查了下娘是废物,谷歌了下,发现MATLAB帮助文档里就有, 简单的就是,只有外层parfor有用,内层没有并行
  • SPH光滑粒子流体动力学中英文都,中文版本以及英文版的都,拿去参考吧。光滑粒子流体动力学-一种无网格粒子法 第1章 绪论 1.1 数值模拟 1.1.1 数值模拟的作用 1.1.2 一般数值模拟的求解过程 1.2 基于网格的方法 ...
  • Checkpoint机制原理前言一、如何理解flink中state(状态)Ⅰ、state理解Ⅱ、案例理解stateⅢ、为什么需要state管理Ⅳ、理想中的state管理二、如何理解flink中checkpoint(检查点)Ⅰ、执行流程Ⅱ、ck保存了什么Ⅲ、单...
  • 这种默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,有什么缺点呢?在什么情况下,会出现性能上的恶劣的影响呢?map,本身是不小,存放数据的一个单位是Entry,还有可能会用链表的...
  • 数据库隔离级别总结

    2014-03-05 15:10:22
    在数据库系统中,隔离是定义一个操作对数据所做的改变如何/何时对其它的并行操作可见。 隔离并不改变锁本身的行为,而是通过实行不同的锁... 维护一个最高的隔离级别虽然会防止数据的出错,但是却导致了并行度的...
  • 这种默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,有什么缺点呢?在什么情况下,会出现性能上的恶劣的影响呢? map,本身是不小,存放数据的一个单位是Entry,还有可能会用链表的...
  • 这种默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,有什么缺点呢?在什么情况下,会出现性能上的恶略的影响呢?  map本身是不小的,map中存放数据的单位是entry,还有可能会用...
  • §5.1.3 过度请求的影响 83 §5.1.4 调整以解决问题 83 §5.2 优化的执行者 84 §5.3 设置性能目标 84 第7章 系统优化方法 85 §6.1 何时优化效率最高 85 §6.1.1 系统设计阶段和开发阶段的优化 85 §6.1.2 改善产品...
  • CruiseYoung提供的带详细书签的电子书籍目录 ... Oracle Database 9i/10g/11g编程艺术:深入数据库体系结构:第...例如11g引入dbms_parallel_execute包来帮助自动实现原来需要人工实现的并行化,以及引入PSQ来控制并行度,...
  • 近年来,这种趋势略变化,部分原因受到量子隧穿效应影响。然而,并行化计算的进步以及半导体技术和量子计算潜在的革命性变化,可能意味着摩尔定律在未来几十年内继续保持正确。 观点 摩尔定律 (以及为什么人们...
  • 8.6.2 并行度的设定 211 8.7 直接加载 213 8.7.1 直接加载和REDO 216 8.7.2 直接加载和索引 219 8.7.3 直接加载和并行 221 8.7.4 直接加载和SQL*LOADER 226 第9章 变量绑定 232 9.1 什么是变量绑定,为什么要...
  • 1.2.8 对大数据平台中的元数据管理是怎么理解的,元数据收集管理体系是怎么样的,会对大数据应用有什么样的影响 1.2.9 你理解常见如阿里,和友商大数据平台的技术体系差异以及发展趋势和技术瓶颈,在存储和计算两...
  • 影响煤炭需求的因素很多,它们之间存在着复杂的关系,而不是线性或简单的非线性关系。因此不能用线性或简单非线性函数来描述。这也是为什么目前煤炭需求预测的精度较低的问题所在。BP神经网络是一种神经网络学习...
  • 影响煤炭需求的因素很多,它们之间存在着复杂的关系,而不是线性或简单的非线性关系。因此不能用线性或简单非线性函数来描述。这也是为什么目前煤炭需求预测的精度较低的问题所在。BP神经网络是一种神经网络学习...
  • 影响煤炭需求的因素很多,它们之间存在着复杂的关系,而不是线性或简单的非线性关系。因此不能用线性或简单非线性函数来描述。这也是为什么目前煤炭需求预测的精度较低的问题所在。BP神经网络是一种神经网络学习...
  • 26.模拟中偺接口与模拟用户接口有什么区别?完成哪些功能? 答:应用范围不一样: 模拟用户接口是社控交挟设备连接模拟话杌的接口电路,实际上是樸拟电和数宁电路 间的接口ε而模拟中繼口是用于连接模拟中緒线的接口,常...
  • 在介绍 Java 怎么学之前我给大家介绍一下学完了能干什么,因为目标的学习才是最高效的。 Java 这门语言,在公司里根据分工不同衍生出了众多的岗位或者技术方向。 我在 boss 直聘上搜索了 BAT 等大厂的岗位,目前...
  • 不能否认,以上这些情况正是我们大多数企业目前所面临的一个严峻的管理问题,然而,针对这一现象,我们又能有什么有效的办法来解决它呢?――事实是,在中国的企业还没有完全意识到这一问题的严重性的,国外的ERP/...
  • JavaScript王者归来

    2013-01-10 11:30:48
    多年来致力于 JavaScript技术和Web标准的推广,活跃于国内极有影响力的JavaScript专业网站——无忧脚本(www.51js.com),并任 JavaScript版的版主。平时热爱文学、写作和围棋。 目录: 第一部分 概论 第1章...
  • 3.7.1 对流程的影响 60 3.7.2 集中式持续集成 61 3.7.3 技术问题 61 3.7.4 替代方法 62 3.8 分布式版本控制系统 63 3.9 小结 66 第4章 测试策略的实现 67 4.1 引言 67 4.2 测试的分类 68 4.2.1 ...
  • 起振时间可由电路参数整定稳定受振荡器类型温度和电压等参数影响复位电 路的可靠性。 复位电路的基本功能是:系统上电时提供复位信号,直至系统电源稳定 后,撤销复位信号。为可靠起见,电源稳定后还要经一定的...
  • CruiseYoung提供的带详细书签的电子书籍目录 http://blog.csdn.net/fksec/article/details/7888251 Oracle Database 11g数据库管理艺术(涵盖DBA必知必会的所有数据库管理知识) 基本信息 原书名: Expert Oracle ...
  • 在数据库、主机、存储等领域丰富经验,主导了主机性能评估模型、数据库水平拆分、基于数据库日志解析的数据同步、基于ISCSI 的廉价存储等项目,目前专注于大规模数据的并行计算和存储、用户行为研究与风险控制领域...
  • <div><p>因为现在的<code>edp-package在使用过程中还是不少细节问题的,这些细节很难说清楚,所以我整理了一个我认为合理的流程,看看是否能采纳到现在的功能中 第一次写完发现自己想的不对&#...
  • 13.1.4 糟糕的内聚 201 13.1.5 表值函数 202 13.1.6 同一数据元素多个名称 202 13.1.7 数据库中的格式 202 13.1.8 将日期保存到字符串中 203 13.1.9 BIT标记、BOOLEAN及其他计算列 203 13.1.10 跨...

空空如也

空空如也

1 2
收藏数 38
精华内容 15
关键字:

并行度有什么影响