精华内容
下载资源
问答
  • 【笔记】注意分配的一些模型

    千次阅读 2017-09-03 14:39:31
    伊利诺斯大学航空学院实验室 ...假设从一个仪表转移注意力到另一个仪表的可能性与每个仪表上的注视点相对数量一致,基于信息带宽和信息源所要求的读数的精确性,提出注视点量化的理论模型,计

    伊利诺斯大学航空学院实验室





    20世纪40年代:Fitts 和 Jones 就对军用飞机飞行员的眼动行为和仪表扫视进行了一系列研究,研究成果奠定了T型仪表布局。

    20世纪60年代:Senders引入信息带宽的概念。假设从一个仪表转移注意力到另一个仪表的可能性与每个仪表上的注视点相对数量一致,基于信息带宽和信息源所要求的读数的精确性,提出注视点量化的理论模型,计算注意力需求[14]。

    1968 年:Carbonell 在此基础上,基于排队论(queueing theory)提出一种飞行员视觉采样行为计算模型,经过试验验证,该模型与执行任务时的飞行员眼动行为非常吻合[15],Carbonell 模型

    1985 年,Koch 和 Ullman 提出一种视觉注意模型,该模型针对给定场景,综合不同类型特征的显著图得到一个特征综合性的显著图,通过提出并采用了赢者全取算法来选
    择突显性最大的区域。[16]

    1998 年,Itti 等人在 Koch 和 Ullman 研究工作的基础上提出一种自底向上视觉注意计算模型,该模型针对给定场景需设置不同的参数值。该模型主要分为突显性映射图的计算阶段和注视点的选择阶段,突显性映射图的计算又分为三个步骤:视觉特征提取、中心——周边差异和归一化计算、特征重组。而将突显性映射图中幅度最强的区域设置为注视点。[17]

    2000 年,Itti&Koch 在该模型的基础上,针对注意力捕捉中的突显性提出一种更详细的计算模型,关注隐性注意力的转移而忽略眼动细节,重点在于整合模态、方向、强度和颜色信息。[17]

    Wickens 等人将情境意识注意力分配相结合,提出一种情境意识——注意力分配(A-SA)模型,其中注意力分配模块 SEEV 模型受到四个因素的影响:引起注意的事件突显性努力预期价值,通过对四个影响因素线性加权建立起注意力分配的关系式。[19]

    2011 年 Kelly S. Steelman 等以 Carbonell 和 Senders 的视觉扫视采样模型为框架,结2011 年 Kelly S. Steelman 等以 Carbonell 和 Senders 的视觉扫视采样模型为框架,结合 Wickens 的 SEEV 模型,提出了成熟的 NSEEV 模型。该模型既可预测注意力分配,又可预测飞行员检测离散事件发生所需时间和速率等绩效。通过输入显示器信息通道或者兴趣域(AOI)的图像文件和其他一些参数文件,模型首先产生表示通道优先性(预期和价值)、静态突显性(利用 Itti & Koch 的突显计算模型)、动态突显性(引入欧几里得距离)和特征优先性的草图,飞行员的任务设置决定了各草图的相关性值,然后将各草图根据相关性权重合成一副主图,最后利用概率选择模型,基于不同兴趣域(AOI)的相对激活水平选择出下一个注视点。[21]




    参考文献:


    [14]Senders J W. The human operator as a monitor and controller of multidegree of freedom systems[J]. IEEE Transactions on Human Factors in Electronics, 1964 (1): 2-5.

    【15】Carbonell J R, Ward J L, Senders J W. A queueing model of visual sampling experimental validation[J]. IEEE transactions on man-machine systems, 1968, 9(3): 82-87.

    【16】Koch C, Ullman S. Shifts in selective visual attention: towards the underlying neural circuitry[M]//Matters of intelligence. Springer Netherlands, 1987: 115-141.

    【17】Itti L, Koch C. A saliency-based search mechanism for overt and covert shifts of visual attention[J]. Vision research, 2000, 40(10): 1489-1506.

    【19】


    【52】Wickens C D, Goh J, Helleberg J, et al. Attentional models of multitask pilot performance using advanced display technology[J]. Human factors, 2003, 45(3): 360-380.

    [51]Itti L, Koch C, Niebur E. A model of saliency-based visual attention for rapid scene analysis[J]. IEEE Transactions on pattern analysis and machine intelligence, 1998, 20(11): 1254-1259.

    展开全文
  • 更长的列会小号更多的内存,因为MySQL通常会分配固定大小的内存块来保存内部值。尤其是在使用内存临时表 进行排序或操作时会特别糟糕。在利用磁盘临时表进行排序时也同样糟糕。  所以最好的策略是只分配真正...

    使用varchar(5)和varchar(200)存储‘hello’的空间开销是一样的,那么使用更短的列有什么优势吗?

           事实证明有很大的优势。更长的列会小号更多的内存,因为MySQL通常会分配固定大小的内存块来保存内部值。尤其是在使用内存临时表

    进行排序或操作时会特别糟糕。在利用磁盘临时表进行排序时也同样糟糕。

            所以最好的策略是只分配真正需要的空间。

    展开全文
  • 本篇主要分享业务事物层面对科目确定的影响。这部分内容,将分多篇进行讲解,本篇主要讲解“事物/事务代码”对科目的影响。 一、业务事物中自动科目确定的逻辑分析 如开篇的第一张图所示,自动科目确定的逻辑...

    导读

    本篇主要分享业务事物层面对科目确定的影响。这部分内容,将分多篇进行讲解,本篇主要讲解“事物/事务代码”对科目的影响。

    一、业务事物中自动科目确定的逻辑分析

    如开篇的第一张图所示,自动科目确定的逻辑如下:当货物移动/发票校验进行时,系统将能确定具体的价值串,当价值串被确定后,系统将能够确定具体的“事物/事务代码”,一旦“事物/事务代码”确定了,根据我们多次提到的,如下图所示的科目决定关系,就能决定出具体的科目。

    注意1:“事物/事务代码”和“科目分组代码”,在业务事物层面,对科目的确定均有影响作用。为了方便观看本篇内容的朋友理解,本篇先抛开“科目分组代码”的影响,仅讲“事物/事务代码”的作用,后续一篇中会专门讲“科目分组代码”的影响。

    二、“事物/事务代码”和“价值串”

    在本篇的第一部分中,我们了解到:自动确定科目的前提是,自动确定“事物/事务代码”;而自动确定“事物/事务代码”的前提是,自动确定“价值串”;“价值串”的自动确定,是根据货物移动,或者发票校验的执行而发生的。

    那么,这里我们就不得不提出一个问题:什么是“事物/事务代码”?什么是“价值串”?

    事物/事务代码:实际上就是科目确定的代码,不同的“事物/事务代码”分别对应着不同的科目。比如BSX代表着库存科目,WRX代表着GR/IR清算科目,PRD代表着差异科目等等。

    系统层面上的进一步解释,就是:OBYC中维护科目的时候,双击BSX,进入到BSX的维护界面,一定维护库存科目代码。下图为:BSX的科目配置界面。

    价值串:实际上是多个“事物/事务代码”在业务逻辑上的集合。多个不同的“事物/事务代码”,根据合理的业务逻辑,有机地集合在一起,用一个编号代表这个集合,而这个编号就是“价值串”。所以,“价值串”本身,就是一套业务层面的过账规则。

    比如,当我们针对采购订单进行收货时,库存科目要更新,GR/IR清算科目也要更新,如果是在标准价的情况下,差异科目可能也会更新,所以,针对采购订单收货的价值串,至少包括事物/事务代码:BSX、WRX、PRD。当然,SAP系统中,关于采购订单收货的价值串,包含了不止这三个的价值串。

    三、“货物移动/发票校验”、“价值串”及“事物/事务代码”的逻辑详解

    1.  移动类型与价值串的分配关系

    2.价值串与“事物/事务代码”的分配关系

    更详尽的内容,请在关注我的公众号,在我的公众号查看相关文章。

    https://mp.weixin.qq.com/s/uwkTZQP5ZgsWLQCcwtL4IQ

    才疏学浅,能力有限,如果大家发现问题,还请批评指正。

    愿大家的学习,轻松且愉快

     

    展开全文
  • datax(22):任务分配规则

    万次阅读 热门讨论 2021-01-26 23:18:25
    然后,为避免顺序给读写端带来长尾影响, * 将整合的结果shuffler掉 * 1 动态调整并获取channel数量 * 2 根据1的channel数量 切割reader 得到reader的cfg列表 * 3 根据2的 cfg数量切割writer,得到writer cfg 列表 *...

    前面学习了一些源码和datax的执行,其中有一个重要的流程任务切分。今天梳理下;


    一、概述

    Datax根首先据配置文件,确定好channel的并发数目。然后将整个job分成一个个小的task,然后划分成组。从JobContainer的start()方法开始,进入split()方法,split方法里执行后续所有的切分;


    二、总体流程

    1. 切分任务
    2. channel数目的确定
    3. reader的切分
    4. Writer的切分
    5. 合并配置 分
    6. 配任务

    三、切分任务

    JobContainer 的split负责将整个job切分成多个task,生成task配置的列表。

      /**
       * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果, 达到切分后数目相等,
       * 才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起。然后,为避免顺序给读写端带来长尾影响,
       * 将整合的结果shuffler掉 <br/>
       * 1 动态调整并获取channel数量 <br/>
       * 2 根据1的channel数量 切割reader 得到reader的cfg列表 <br/>
       * 3 根据2的 cfg数量切割writer,得到writer cfg 列表 <br/>
       * 4 获取transform的cfg 列表 <br/>
       * 5 合并234的cfg <br/>
       */
      private int split() {
        this.adjustChannelNumber();
        needChannelNumber = needChannelNumber <= 0 ? 1 : needChannelNumber;
    
        List<Configuration> readerTaskCfgs = this.doReaderSplit(this.needChannelNumber);
        int taskNumber = readerTaskCfgs.size();
        List<Configuration> writerTaskCfs = this.doWriterSplit(taskNumber);
    
        List<Configuration> trsfms = configuration.getListConfiguration(DATAX_JOB_CONTENT_TRANSFORMER);
    
        LOG.debug("transformer configuration: " + JSON.toJSONString(trsfms));
        //输入是reader和writer的parameter list,输出是content下面元素的list
        List<Configuration> contentCfgs = mergeReaderAndWriterTaskConfigs(readerTaskCfgs, writerTaskCfs,
            trsfms);
    
        LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentCfgs));
        this.configuration.set(DATAX_JOB_CONTENT, contentCfgs);
        return contentCfgs.size();
      }
    

    执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型.


    四、channel数目的确定

    datax先从core.json 和 job.json 里获取用户指定的channel,然后再内部根据实际情况进行调整channel数量;

    1、用户指定channel数:

    {
        "core": {
           "transport" : {
              "channel": {
                 "speed": {
                    "record": 100,
                    "byte": 100
                 }
              }
           }
        },
        "job": {
          "setting": {
            "speed": {
              "record": 500,
              "byte": 1000,
              "channel" : 1
            }
          }
        }
    }
    

    job里的是全局配置, core里的channel是单个channel的限制。首先计算按照字节数限速,channel的数目应该为 500 / 100 = 5,然后按照记录数限速, channel的数目应该为 1000 / 100 = 10, 最后返回两者的最小值 5。虽然指定了channel为1, 但只有在没有限速的条件下,才会使用。

    2、程序根据实际情况调整channel数

    所有调整channel的代码在JobContainer的adjustChannelNumber方法

      /**
       * 根据byteNum和RecordNum调整channel数量 <br>
       * 1 是否有全局(job) byte限制,如果有,则必须要有channel的byte设置,最后计算出 需要的channelByByte数量  <br>
       * 2 是否有全局(job) record限制,如果有,则必须要有channel的record设置,最后计算出 需要的channelByRecord数量 <br>
       * 3 取1和2的最小值设置到job的channelNumber,如果可以设置,则该方法任务完成,退出 <br>
       * 4 如果3 未能设置,则从cfg中判断用户是否自己设置了channelNum,如果用户设置了,将用户设置的给本job channel <br>
       */
      private void adjustChannelNumber() {
        int needChannelNumByByte = Integer.MAX_VALUE;
    
        boolean hasByteLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
        if (hasByteLimit) {
          long jobByteSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
          // 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
          Long channelByteSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
          if (channelByteSpeed == null || channelByteSpeed <= 0) {
            throw DataXException.asDataXException(
                CONFIG_ERROR, "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
          }
          needChannelNumByByte = (int) (jobByteSpeed / channelByteSpeed);
          needChannelNumByByte = needChannelNumByByte > 0 ? needChannelNumByByte : 1;
          LOG.info("Job set Max-Byte-Speed to " + jobByteSpeed + " bytes.");
        }
    
        int needChannelNumByRecord = Integer.MAX_VALUE;
        boolean hasRecordLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
        if (hasRecordLimit) {
          long jobRecordSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 100000);
          Long channelRecordSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
          if (channelRecordSpeed == null || channelRecordSpeed <= 0) {
            throw DataXException.asDataXException(CONFIG_ERROR,
                "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
          }
          needChannelNumByRecord = (int) (jobRecordSpeed / channelRecordSpeed);
          needChannelNumByRecord = needChannelNumByRecord > 0 ? needChannelNumByRecord : 1;
          LOG.info("Job set Max-Record-Speed to " + jobRecordSpeed + " records.");
        }
    
        // 全局的 needChannelNumber 按照needChannelNumByByte 和needChannelNumByRecord  取较小值
        needChannelNumber = Math.min(needChannelNumByByte, needChannelNumByRecord);
    
        // 如果从byte或record上设置了needChannelNumber则退出
        if (this.needChannelNumber < Integer.MAX_VALUE) {
          return;
        }
    
        boolean hasChannelLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
        if (hasChannelLimit) {
          needChannelNumber = this.configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL);
          LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
          return;
        }
        throw DataXException.asDataXException(CONFIG_ERROR, "Job运行速度必须设置");
      }
    

    五、reader的切分

    doReaderSplit方法, 调用Reader.Job的split方法,返回Reader.Task的Configuration列表

    /**
    * adviceNumber, 建议的数目
    */
    private List<Configuration> doReaderSplit(int adviceNumber) {
        // 切换ClassLoader
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.readerPluginName));
        // 调用Job.Reader的split切分
        List<Configuration> readerSlicesConfigs =
                this.jobReader.split(adviceNumber);
        if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    "reader切分的task数目不能小于等于0");
        }
        LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
                this.readerPluginName, readerSlicesConfigs.size());
        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return readerSlicesConfigs;
    }
     public List<Configuration> split(int adviceNumber) {
    
                LOG.info("split() begin...");
                List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
                // warn:每个slice拖且仅拖一个文件,
                // int splitNumber = adviceNumber;
                int splitNumber = this.sourceFiles.size();
                if (0 == splitNumber) {
    //                throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
    //                        String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH)));
                    String message = String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH));
                    LOG.info(message);
                    return new ArrayList<Configuration>();
                }
    
                List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
                for (List<String> files : splitedSourceFiles) {
                    Configuration splitedConfig = this.readerOriginConfig.clone();
                    splitedConfig.set(Constant.SOURCE_FILES, files);
                    readerSplitConfigs.add(splitedConfig);
                }
    
                return readerSplitConfigs;
            }
    
    private <T> List<List<T>> splitSourceFiles(final List<T> sourceList, int adviceNumber) {
                List<List<T>> splitedList = new ArrayList<List<T>>();
                int averageLength = sourceList.size() / adviceNumber;
                averageLength = averageLength == 0 ? 1 : averageLength;
    
                for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) {
                    end = begin + averageLength;
                    if (end > sourceList.size()) {
                        end = sourceList.size();
                    }
                    splitedList.add(sourceList.subList(begin, end));
                }
                return splitedList;
            }
    

    这里的reader是hdfs reader 原文件数,reader task数等于文件数。


    六、Writer的切分数

    doWriterSplit方法, 调用Writer.JOb的split方法,返回Writer.Task的Configuration列表

    private List<Configuration> doWriterSplit(int readerTaskNumber) {
        // 切换ClassLoader
        classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
                PluginType.WRITER, this.writerPluginName));
        // 调用Job.Reader的split切分
        List<Configuration> writerSlicesConfigs = this.jobWriter
                .split(readerTaskNumber);
        if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    "writer切分的task不能小于等于0");
        }
        LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
                this.writerPluginName, writerSlicesConfigs.size());
        classLoaderSwapper.restoreCurrentThreadClassLoader();
    
        return writerSlicesConfigs;
    }
    

    为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
    比如rediswriter

     public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(getPluginJobConf());
                }
                return configurations;
            }
    

    七、合并配置

    合并reader,writer,transformer配置列表。并将任务列表,保存在配置job.content的值里。

    private List<Configuration> mergeReaderAndWriterTaskConfigs(
            List<Configuration> readerTasksConfigs,
            List<Configuration> writerTasksConfigs,
            List<Configuration> transformerConfigs) {
        // reader切分的任务数目必须等于writer切分的任务数目
        if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                    String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
                            readerTasksConfigs.size(), writerTasksConfigs.size())
            );
        }
    
        List<Configuration> contentConfigs = new ArrayList<Configuration>();
        for (int i = 0; i < readerTasksConfigs.size(); i++) {
            Configuration taskConfig = Configuration.newDefault();
            // 保存reader相关配置
            taskConfig.set(CoreConstant.JOB_READER_NAME,
                    this.readerPluginName);
            taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
                    readerTasksConfigs.get(i));
            // 保存writer相关配置
            taskConfig.set(CoreConstant.JOB_WRITER_NAME,
                    this.writerPluginName);
            taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
                    writerTasksConfigs.get(i));
            // 保存transformer相关配置
            if(transformerConfigs!=null && transformerConfigs.size()>0){
                taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
            }
    
            taskConfig.set(CoreConstant.TASK_ID, i);
            contentConfigs.add(taskConfig);
        }
    
        return contentConfigs;
    }
    

    分配任务
    分配算法

    首先根据指定的channel数目和每个Taskgroup的拥有channel数目,计算出Taskgroup的数目
    根据每个任务的reader.parameter.loadBalanceResourceMark将任务分组
    根据每个任务writer.parameter.loadBalanceResourceMark来讲任务分组
    根据上面两个任务分组的组数,挑选出大的那个组
    轮询上面步骤的任务组,依次轮询的向各个TaskGroup添加一个,直到所有任务都被分配完
    这里举个实例:
    目前有7个task,channel有20个,每个Taskgroup拥有5个channel。
    首先计算出Taskgroup的数目, 20 / 5 = 4 。(实际不会有这种情况,channel数不会超过task数)

    根据reader.parameter.loadBalanceResourceMark,将任务分组如下:

    {
        "database_a" : [task_id_1, task_id_2],
        "database_b" : [task_id_3, task_id_4, task_id_5],
        "database_c" : [task_id_6, task_id_7]
    }
    

    根据writer.parameter.loadBalanceResourceMark,将任务分组如下:

    {
        "database_dst_d" : [task_id_1, task_id_2],
        "database_dst_e" : [task_id_3, task_id_4, task_id_5, task_id_6, task_id_7]
    }
    

    因为readerResourceMarkAndTaskIdMap有三个组,而writerResourceMarkAndTaskIdMap只有两个组。从中选出组数最多的,所以这里按照readerResourceMarkAndTaskIdMap将任务分配。

    执行过程是,轮询database_a, database_b, database_c,取出第一个。循环上一步

    1. 取出task_id_1 放入 taskGroup_1
    2. 取出task_id_3 放入 taskGroup_2
    3. 取出task_id_6 放入 taskGroup_3
    4. 取出task_id_2 放入 taskGroup_4
    5. 取出task_id_4 放入 taskGroup_1
    6. ………
    

    最后返回的结果为

    {
        "taskGroup_1": [task_id_1, task_id_4],
        "taskGroup_2": [task_id_3, task_id_7],
        "taskGroup_3": [task_id_6, task_id_5],
        "taskGroup_4": [task_id_2]
    }
    

    代码解释
    任务的分配是由JobAssignUtil类负责。使用者调用assignFairly方法,传入参数,返回TaskGroup配置列表

    public final class JobAssignUtil {
        /**
        * configuration 配置
        * channelNumber, channel总数
        * channelsPerTaskGroup, 每个TaskGroup拥有的channel数目
        */
        public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
            
            List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
            // 计算TaskGroup的数目
            int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
    
            ......
            // 任务分组
            LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
            // 调用doAssign方法,分配任务
            List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
    
            // 调整 每个 taskGroup 对应的 Channel 个数(属于优化范畴)
            adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
            return taskGroupConfig;
        }
    }
    

    任务分组
    按照task配置的reader.parameter.loadBalanceResourceMark和writer.parameter.loadBalanceResourceMark,分别对任务进行分组,选择分组数最高的那组,作为任务分组的源。

    /**
    * contentConfig参数,task的配置列表
    */
    private static LinkedHashMap<String, List<Integer>> parseAndGetResourceMarkAndTaskIdMap(List<Configuration> contentConfig) {
        // reader的任务分组,key为分组的名称,value是taskId的列表
        LinkedHashMap<String, List<Integer>> readerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
        // writer的任务分组,key为分组的名称,value是taskId的列表
        LinkedHashMap<String, List<Integer>> 
        writerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
    
        for (Configuration aTaskConfig : contentConfig) {
            int taskId = aTaskConfig.getInt(CoreConstant.TASK_ID);
            
            // 取出reader.parameter.loadBalanceResourceMark的值,作为分组名
            String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
            if (readerResourceMarkAndTaskIdMap.get(readerResourceMark) == null) {
                readerResourceMarkAndTaskIdMap.put(readerResourceMark, new LinkedList<Integer>());
            }
            // 把 readerResourceMark 加到 readerResourceMarkAndTaskIdMap 中
            readerResourceMarkAndTaskIdMap.get(readerResourceMark).add(taskId);
    
            // 取出writer.parameter.loadBalanceResourceMark的值,作为分组名
            String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
            if (writerResourceMarkAndTaskIdMap.get(writerResourceMark) == null) {
                writerResourceMarkAndTaskIdMap.put(writerResourceMark, new LinkedList<Integer>());
            }
            // 把 writerResourceMark 加到 writerResourceMarkAndTaskIdMap 中
            writerResourceMarkAndTaskIdMap.get(writerResourceMark).add(taskId);
        }
    
        // 选出reader和writer其中最大的
        if (readerResourceMarkAndTaskIdMap.size() >= writerResourceMarkAndTaskIdMap.size()) {
            // 采用 reader 对资源做的标记进行 shuffle
            return readerResourceMarkAndTaskIdMap;
        } else {
            // 采用 writer 对资源做的标记进行 shuffle
            return writerResourceMarkAndTaskIdMap;
        }
    }
    

    分配任务
    将上一部任务的分组,划分到每个taskGroup里

    private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {
        List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
    
        Configuration taskGroupTemplate = jobConfiguration.clone();
        taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);
    
        List<Configuration> result = new LinkedList<Configuration>();
    
        // 初始化taskGroupConfigList
        List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
        for (int i = 0; i < taskGroupNumber; i++) {
            taskGroupConfigList.add(new LinkedList<Configuration>());
        }
    
        // 取得resourceMarkAndTaskIdMap的值的最大个数
        int mapValueMaxLength = -1;
    
        List<String> resourceMarks = new ArrayList<String>();
        for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
            resourceMarks.add(entry.getKey());
            if (entry.getValue().size() > mapValueMaxLength) {
                mapValueMaxLength = entry.getValue().size();
            }
        }
    
        
        int taskGroupIndex = 0;
        // 执行mapValueMaxLength次数,每一次轮询一遍resourceMarkAndTaskIdMap
        for (int i = 0; i < mapValueMaxLength; i++) {
            // 轮询resourceMarkAndTaskIdMap
            for (String resourceMark : resourceMarks) {
                if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
                    // 取出第一个
                    int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
                    // 轮询的向taskGroupConfigList插入值
                    taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
                    // taskGroupIndex自增
                    taskGroupIndex++;
                    // 删除第一个
                    resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
                }
            }
        }
    
        Configuration tempTaskGroupConfig;
        for (int i = 0; i < taskGroupNumber; i++) {
            tempTaskGroupConfig = taskGroupTemplate.clone();
            // 设置TaskGroup的配置
            tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
            tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
    
            result.add(tempTaskGroupConfig);
        }
        // 返回结果
        return result;
    }
    

    为组分配channel
    上面已经把任务划分成多个组,为了每个组能够均匀的分配channel,还需要调整。算法原理是,当channel总的数目,不能整除TaskGroup的数目时。多的余数个channel,从中挑选出余数个TaskGroup,每个多分配一个。

    比如现在有13个channel,然后taskgroup确有5个。那么首先每个组先分 13 / 5 = 2 个。那么还剩下多的3个chanel,分配给前面个taskgroup。

    private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) {
        int taskGroupNumber = taskGroupConfig.size();
        int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
        int remainderChannelCount = channelNumber % taskGroupNumber;
        // 表示有 remainderChannelCount 个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup + 1;
        // (taskGroupNumber - remainderChannelCount)个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup
    
        int i = 0;
        for (; i < remainderChannelCount; i++) {
            taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
        }
    
        for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) {
            taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
        }
    }
    


    注:

    1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

    2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

    展开全文
  • FPGA管脚分配时需注意的一些事项(以xilinx xc4vsx55为例) 平台:XC4VSX55 ISE10.1     设计过FPGA的原理图,看FPGA的手册,说管脚的分配问题,如时钟管脚要用GC类管脚,而且单端时钟输入时要用P类型的管脚...
  • JVM对象引用与内存分配策略

    千次阅读 2016-05-31 20:46:41
    》,今天针对垃圾回收和内存分配做个深入了解。 关于引用 在《JVM理解其实并不难! 》一文中提到,JVM是通过可达性分析来判断对象是否需要被回收,这可以弥补引用计数法的不足。即就算两个对象相互引用,只要这两...
  • Java垃圾回收器与内存分配策略

    千次阅读 2017-08-26 15:38:52
    当需要排查各种内存泄漏、内存溢出问题时,当来及收集成为系统达到更高并发量的瓶颈时,我们需要对JVM的GC机制和内存分配又更多的了解,这边文章是在上一篇文章的基础之上讲述了Java垃圾回收器与内存分配策略。
  • Elasticsearch 6.6 官方文档 之「索引分片分配

    千次阅读 多人点赞 2019-05-14 15:18:12
    在本模块中,提供每个索引的设置,以控制分片到节点的分配: - [分片分配过滤](https://www.elastic.co/guide/en/elasticsearch/reference/current/shard-allocation-filtering.html):`Shard allocation filtering...
  • windows内存分配总结

    千次阅读 2013-09-11 13:07:13
    MSDN上的解释为:HeapALloc是从堆上分配一块内存,且分配的内存是不可移动的(即如果没有连续的空间能满足分配的大小,程序不能将其他零散的空间利用起来,从而导致分配失败),该分配方法是从一指定地址开始分配,...
  • 注意力机制最新综述解读

    万次阅读 多人点赞 2019-04-16 18:02:24
    注意力模型(Attention Model,AM)已经成为神经网络中的一个重要概念,并在不同...最后,我们讨论了在实际应用中,注意力机制取得的重要影响。我们希望这项调查能够为注意力模型提供一个简明的介绍,并在开发应用方法...
  • 1.【Kafka分区分配策略(1)——RangeAssignor】 2.【Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor】 自定义分区分配策略 读者不仅可以任意选用Kafka所提供的3种分配策略,还可以...
  • 首先,在使用动态分配内存技术前,必须明白自己在做什么,这样做与其它的方法有什么不同,特别是会产生哪些负面影响,天下没有免费的午餐。动态分配内存与静态分配内存的区别: 1) 静态内存分配是在编译时完成的,不...
  • 一般分配模型 ...注意,这个规划问题是整数线性规划(ILP)问题,也就是说,两个约束方程,保证每个任务被分配一次。决策变量仅允许取离散值0/1  二、实例分析---穷举法 在讲将匈牙利算法解决任务...
  • JVM内存分配与管理详解

    万次阅读 2018-01-23 16:17:58
    了解C++的程序员都知道,在内存管理领域,都是由程序员维护与管理,程序员用于最高的管理权限,但对于java程序员来说,在内存管理领域,程序员不必去关心内存的分配以及回收,在jvm自动内存管理机制的帮助下,不需要...
  • STL——分配

    千次阅读 2020-06-30 15:42:24
    malloc的大致内存操作如右上图所示,分配为size大小的内存,实际上还有额外的开销,即除蓝色以外的部分,这会造成资源的浪费,分配的字节越小浪费的比例越大,分配的子节越大,浪费的比例越小。当然这是不可避免的,...
  • 数据挖掘与工资分配

    千次阅读 2020-09-01 15:01:51
    企业的工资总额对一个企业的未来发展至关重要,本文以2018年26个省市分公司年运营的统计数据作为研究对象,在合理假设的基础上,综合考虑国企对各省市分公司工资分配影响因素,通过建立合理的模型;对公司工资分配...
  • Java核心-内存分配原理详解

    千次阅读 2013-11-22 17:54:59
    无意中看到的一篇不错的文章,转载记录一下,Java核心-内存分配原理详解 概述 栈、堆、常量池虽同属java内存分配时操作的区域,但其适应范围和功用却大不相同。java内存分配与管理是java的核心技术之一,一般...
  • 内存分配

    千次阅读 2011-12-21 14:44:43
    内存分配器来自于底层软件工程的案例研究.我从1987年开始编写一个内存分配器,并且(在许多志愿者的帮助下)一直维护和完善它.这个分配器实现了标准C例程malloc(),free()和realloc(),以及一些辅助工具例程.这个
  • 浅谈Java内存分配策略

    千次阅读 2016-09-11 20:43:51
    Java 程序运行时的内存分配策略有三种,分别是静态分配,栈式分配,和堆式分配,对应的,三种存储策略使用的内存空间主要分别是静态存储区(也称方法区)、栈区和堆区。 静态存储区(方法区):主要存放静态数据、...
  • 关于slub分配器的一些细节

    千次阅读 2010-02-09 21:22:00
    很早以前写过一篇《linux的最新slab分配器---slub分配器》,那篇文章从源代码的角度分析了slub 分配器的大致流程,但是一些细节没有注意到,因此本文就谈一下slub的一些细节,其实还是要从轮廓上考虑细节,真正的...
  • 一文了解JVM内存分配

    千次阅读 2019-01-05 12:11:55
    关于JVM内存分配一直有想法想自己整理一篇文档,之前总是查询别的博客,对于概念的理解和系统的知识梳理一直没有仔细整理过。所以整理这样一篇文章,夯实基础,后续会查漏补缺,也希望多多指正。 二、概述 ...
  • C++内存分配理解一

    千次阅读 2018-06-02 11:39:54
    内存分配 一般我们在使用C++来创建一个内置对象或者一个类的时候...在不考虑OS API的影响下,一切都是基于C语言当中的malloc/free来进行内存的分配和回收的。 现在再来进一步看一下各个函数: 接下来在看一下具...
  • Java内存分配机制

    千次阅读 2018-08-14 16:49:43
    2 虚拟机就会为其分配内存来存放对象自己的实例变量及其从父类继承过来的实例变量(即使这些从超类继承过来的实例变量有可能被隐藏也会被分配空间)。在为这些实例变量分配内存的同时,这些实例变量也会被赋予默认值...
  • iOS 性能优化 - Allocations分析内存分配

    千次阅读 多人点赞 2018-10-21 14:45:24
    Allocations用来分析静态内存分配。 Demo项目 Demo App Demo是一个简单的图片应用:首页只有一个简单的入口;次级页面会读取本地页面,加滤镜,然后按照瀑布流的方式显示出来;第三个页面提供大图显示; 运行代码:...
  • Java 内存分配全面浅析

    千次阅读 2016-10-20 17:00:11
    本文将由浅入深详细介绍Java内存分配的原理,以帮助新手更轻松的学习Java。这类文章网上有很多,但大多比较零碎。本文从认知过程角度出发,将带给读者一个系统的介绍。 进入正题前首先要知道的是Java程序运行在...
  • kernel内存分配中的vmalloc

    千次阅读 2016-12-23 17:02:56
    在内核初始化完成之后, 内存管理的责任就由伙伴...伙伴系统是一个结合了2的方幂个分配器和空闲缓冲区合并计技术的内存分配方案, 其基本思想很简单. 内存被分成含有很多页面的大块, 每一块都是2个页面大小的方幂. 如果找
  • 深度学习中的注意力机制(2017版)

    万次阅读 多人点赞 2017-12-10 21:57:17
    每个英文单词的概率代表了翻译当前单词“杰瑞”时,注意分配模型分配给不同英文单词的注意力大小。这对于正确翻译目标语单词肯定是有帮助的,因为引入了新的信息。 同理,目标句子中的每个单词都应该学会其对应...
  • 注意1与5点; 1、Linux内核管理内存空间的分配,所有程序对内存空间的申请和其他操作,最终都会交给内核来管理。 2、linux实现的是“虚拟内存系统”,对用户而言,所有内存都是虚拟的,也就是说程序并不是直接运行...
  • 什么是分配率?

    千次阅读 2020-05-26 16:35:39
    诸如“不可持续的分配率”和“您需要保持较低的分配率”等短语似乎仅属于Java Champions的词汇表。 复杂,恐怖并被魔术光环包围。 经常发生的情况是,当您更仔细地查看概念时,魔术会随着抽烟消失。 这篇文章试图...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 472,336
精华内容 188,934
关键字:

影响注意分配