精华内容
下载资源
问答
  • GEPIA (Gene Expression Profiling Interactive Analysis) web服务器是2017年推出的,是基于TCGA和GTEx数据库中肿瘤和正常样本进行基因表达分析的一个资源。今天向大家介绍一下更新和增强的GEPIA2版本,提供了更高的...

    GEPIA (Gene Expression Profiling Interactive Analysis) web服务器是2017年推出的,是基于TCGA和GTEx数据库中肿瘤和正常样本进行基因表达分析的一个资源。今天向大家介绍一下更新和增强的GEPIA2版本,提供了更高的resolution和更多的功能。

     

     

     

    数据库介绍

     

    GEPIA2具有198 619种isoforms功能上相似的蛋白质,具有相似但不完全相同的氨基酸序列,由不同基因编码,或由去除不同外显子的相同基因的RNA转录本编码)和84种癌症亚型,从基因水平扩展到转录本水平将基因表达量化,支持对特定癌症亚型的分析和亚型之间的比较。此外,GEPIA2采用了受单细胞测序研究启发的新的基因特征量化分析技术提供定制分析,用户可以上传自己的RNA-seq数据,并与TCGA和GTEx样本进行比较。还提供了一个用于批量处理的API,以及容易检索分析结果。更新后的web服务器可以通过(见文末)公开访问。

     

    使用方法

     

    文章剩余内容<<<<

     

     

    展开全文
  • 当代自然语言处理都是基于统计的,统计自然需要很多样本,因此语料和词汇资源是必不可少的,本节介绍语料和词汇资源的重要性和获取方式 请尊重原创,转载请注明来源网站www.shareditor.com以及原始链接地址 ...

    当代自然语言处理都是基于统计的,统计自然需要很多样本,因此语料和词汇资源是必不可少的,本节介绍语料和词汇资源的重要性和获取方式

    请尊重原创,转载请注明来源网站www.shareditor.com以及原始链接地址

    NLTK语料库

    NLTK包含多种语料库,举一个例子:Gutenberg语料库,执行:

    nltk.corpus.gutenberg.fileids()

    返回Gutenberg语料库的文件标识符

    [u'austen-emma.txt', u'austen-persuasion.txt', u'austen-sense.txt', u'bible-kjv.txt', u'blake-poems.txt', u'bryant-stories.txt', u'burgess-busterbrown.txt', u'carroll-alice.txt', u'chesterton-ball.txt', u'chesterton-brown.txt', u'chesterton-thursday.txt', u'edgeworth-parents.txt', u'melville-moby_dick.txt', u'milton-paradise.txt', u'shakespeare-caesar.txt', u'shakespeare-hamlet.txt', u'shakespeare-macbeth.txt', u'whitman-leaves.txt']

     

    nltk.corpus.gutenberg就是gutenberg语料库的阅读器,它有很多实用的方法,比如:

    nltk.corpus.gutenberg.raw('chesterton-brown.txt'):输出chesterton-brown.txt文章的原始内容

    nltk.corpus.gutenberg.words('chesterton-brown.txt'):输出chesterton-brown.txt文章的单词列表

    nltk.corpus.gutenberg.sents('chesterton-brown.txt'):输出chesterton-brown.txt文章的句子列表

     

    类似的语料库还有:

    from nltk.corpus import webtext:网络文本语料库,网络和聊天文本

    from nltk.corpus import brown:布朗语料库,按照文本分类好的500个不同来源的文本

    from nltk.corpus import reuters:路透社语料库,1万多个新闻文档

    from nltk.corpus import inaugural:就职演说语料库,55个总统的演说

    语料库的一般结构

    以上各种语料库都是分别建立的,因此会稍有一些区别,但是不外乎以下几种组织结构:散养式(孤立的多篇文章)、分类式(按照类别组织,相互之间没有交集)、交叉式(一篇文章可能属于多个类)、渐变式(语法随着时间发生变化)

    语料库的通用接口

    fileids():返回语料库中的文件

    categories():返回语料库中的分类

    raw():返回语料库的原始内容

    words():返回语料库中的词汇

    sents():返回语料库句子

    abspath():指定文件在磁盘上的位置

    open():打开语料库的文件流

    加载自己的语料库

    收集自己的语料文件(文本文件)到某路径下(比如/tmp),然后执行:

    >>> from nltk.corpus import PlaintextCorpusReader
    >>> corpus_root = '/tmp'
    >>> wordlists = PlaintextCorpusReader(corpus_root, '.*')
    >>> wordlists.fileids()


    就可以列出自己语料库的各个文件了,也可以使用如wordlists.sents('a.txt')和wordlists.words('a.txt')等方法来获取句子和词信息

    条件频率分布

    条件分布大家都比较熟悉了,就是在一定条件下某个事件的概率分布。自然语言的条件频率分布就是指定条件下某个事件的频率分布。

    比如要输出在布朗语料库中每个类别条件下每个词的概率:

    # coding:utf-8
    
    import sys
    reload(sys)
    sys.setdefaultencoding( "utf-8" )
    
    import nltk
    from nltk.corpus import brown
    
    # 链表推导式,genre是brown语料库里的所有类别列表,word是这个类别中的词汇列表
    # (genre, word)就是类别加词汇对
    genre_word = [(genre, word)
            for genre in brown.categories()
            for word in brown.words(categories=genre)
            ]
    
    # 创建条件频率分布
    cfd = nltk.ConditionalFreqDist(genre_word)
    
    # 指定条件和样本作图
    cfd.plot(conditions=['news','adventure'], samples=[u'stock', u'sunbonnet', u'Elevated', u'narcotic', u'four', u'woods', u'railing', u'Until', u'aggression', u'marching', u'looking', u'eligible', u'electricity', u'$25-a-plate', u'consulate', u'Casey', u'all-county', u'Belgians', u'Western', u'1959-60', u'Duhagon', u'sinking', u'1,119', u'co-operation', u'Famed', u'regional', u'Charitable', u'appropriation', u'yellow', u'uncertain', u'Heights', u'bringing', u'prize', u'Loen', u'Publique', u'wooden', u'Loeb', u'963', u'specialties', u'Sands', u'succession', u'Paul', u'Phyfe'])

    注意:这里如果把plot直接换成tabulate ,那么就是输出表格形式,和图像表达的意思相同

    请尊重原创,转载请注明来源网站www.shareditor.com以及原始链接地址

    我们还可以利用条件频率分布,按照最大条件概率生成双连词,最终生成一个随机文本

    这可以直接使用bigrams()函数,它的功能是生成词对链表。

    创建python文件如下:

    # coding:utf-8
    
    import sys
    reload(sys)
    sys.setdefaultencoding( "utf-8" )
    
    import nltk
    
    # 循环10次,从cfdist中取当前单词最大概率的连词,并打印出来
    def generate_model(cfdist, word, num=10):
        for i in range(num):
            print word,
            word = cfdist[word].max()
    
    # 加载语料库
    text = nltk.corpus.genesis.words('english-kjv.txt')
    
    # 生成双连词
    bigrams = nltk.bigrams(text)
    
    # 生成条件频率分布
    cfd = nltk.ConditionalFreqDist(bigrams)
    
    # 以the开头,生成随机串
    generate_model(cfd, 'the')

    执行效果如下:

    the land of the land of the land of the

    the的最大概率的双连词是land,land最大概率双连词是of,of最大概率双连词是the,所以后面就循环了

     

    其他词典资源

    有一些仅是词或短语以及一些相关信息的集合,叫做词典资源。

    词汇列表语料库:nltk.corpus.words.words(),所有英文单词,这个可以用来识别语法错误

    停用词语料库:nltk.corpus.stopwords.words,用来识别那些最频繁出现的没有意义的词

    发音词典:nltk.corpus.cmudict.dict(),用来输出每个英文单词的发音

    比较词表:nltk.corpus.swadesh,多种语言核心200多个词的对照,可以作为语言翻译的基础

    同义词集:WordNet,面向语义的英语词典,由同义词集组成,并组织成一个网络

    展开全文
  • 从逻辑思维角度提升自己表达技巧

    千次阅读 多人点赞 2018-07-12 07:40:00
    结合头脑风暴法找到主要问题,然后在不考虑现有资源的限制基础上,考虑解决该问题的所有可能方法,在这个过程中,要特别注意多种方法的结合可能是个新的解决方法,然后再往下分析,每种解决方法所需要的各种资源,...

    640?wx_fmt=gif

    作者:Sunface(孙飞)

    转自:https://blog.csdn.net/erlib/article/details/80989921

    逻辑性

    从事软件开发行业的同学们或多或少都具有相当不错的逻辑性,毕竟编程开发本身就是逻辑性较强的任务。但是大家是否考虑过这种逻辑性应该怎么应用到社交技巧上?下面就跟着笔者一起来详细分析下吧。

    逻辑思维

    当逻辑性上升到软技能(社交技巧)层面,就成了逻辑思维。

    逻辑思维一直是职场社交和个人职业发展中最重要的软技能之一。它的本质就是在遇到问题时,给你提供一种梳理问题、分析问题及解决问题的方法论。例如,当你需要向领导陈述问题时,你可以运用逻辑思维梳理即将表达的内容,把你的问题背景、论点及结论划分清晰,从而将你的结论层次清晰的传递过对方。

    因此,你的逻辑思维能力越强,相对应的解决问题的能力及沟通技巧就越强。下面的内容,我们将从问题描述、思考过程和沟通表达三个角度对逻辑思维进行展开,问题描述不清楚甚至错误,那么思考过程就失去了意义;同时没有思考过程就没有合理的结论,没有合理的结论也就无法更好的进行沟通表达,因此这两点是逻辑思维的重中之重。

    问题描述

    描述问题的技巧主要为了解决以下的难点:

    · 不能准确的界定问题。

    · 对于问题不能进行较好的拆分

    · 看待问题不全面

    这里,首先我们引入5W2H的方法,该方法在管理学中会经常被提及,但是实际上,我们可以运用到个人决策方面,用来提升自身的问题分析和描述能力。

    提出疑问对于发现问题和解决问题是极其重要的。创造力高的人,都具有擅于提问题的能力,众所周知,提出一个好的问题,就意味着问题解决了一半。提问题的技巧高,可以发挥人的想象力。相反,有些问题提出来,反而挫伤我们的想象力。

    发明者在设计新产品时,常常提出:为什么(Why);做什么(What);何人做(Who);何时(When);何地(Where);如何(How );多少(How much)。这就构成了 5W2H 法的总框架。如果提问题中常有”假如……”、“如果……”、“是否……”这样的虚构,就是一种设问,设问需要更高的想象力。

    640?wx_fmt=jpeg

    一个简单的例子

    其实缺乏 5W2H 导致了问题的场景在生活中是比比皆是的

    A: 老同学,今天遇到你很高兴啊,下次咱们一起去吃饭,聊一聊 

    B: 好啊 

    A: 88 

    B: (黑人问号)心理状态:什么时候去啊?去哪里?AA? 

    B: 得出结论: 看来是随便说说的,并不是真的想去

    因此回答这一小节开头的难点,我们来简单分析下。

    不能准确的界定问题

    如果是这一条的话,以后就用这个方法来思考问题。人的首要思维都是靠大脑在动,其实殊不知,很多事情用手和脑同时动,智商会被拉高的,而且思维会很活跃。所以请记下这个方法,在以后思考问题界定问题的时候,就用笔画下这个 5W2H,然后一一的去对入。时间久后,自然而然的锻炼除了大脑和手共同 “ 思考 ”,从而拉升智商的提高。

    640?wx_fmt=jpeg

    看待问题不全面

    640?wx_fmt=jpeg

    思考过程

    有了良好的问题描述,我们就可以开始更加深入的思考了,其实在描述问题的过程中,本身就进行了初步的思考了,不是吗?(回答不是的同学,请继续看上一节 ;( )

    对问题进行更进一步的明确

    明确问题主要有两个方法:

    1. 设定想要的状态,即设定目标或设定参照物。当问题很明确,那么设定“理所当然”的目标即可,如:公司连续两年赤字,那么设定目标为公司盈利即可;当问题不明确,那么需要设定理想目标,如:公司连续 10 年保持全国第四,那么设定目标可考虑 5 年内成为全国第一。

    2. 把问题具体化到能够思考原因的大小,列举具体事例,从事例中归纳问题。如,年轻员工没有朝气,那么我们可以通过列举具体事例:打招呼有气无力、写资料错误率高、辞职率高等,通过这样的细化,我们就知道具体的问题是什么了。

    深挖原因

    深挖原因的关键在于,不停地询问为什么,逐步深入挖掘。如何有逻辑地深入挖掘原因?这里建议使用MECE 分析法。下面会介绍 MECE,但是大家要注意: 不是所有问题都是严格的按照 MECE 的方法来执行,大家可以在学习后,自己总结一套简化版本,适用于日常情况的,生搬硬套总是会落入下乘的。

    MECE 分析法

    该方法是麦肯锡的黄金法则:四步看透问题的本质,精准解决问题!

    MECE,是 Mutually Exclusive Collectively Exhaustive 缩写,中文意思是”相互独立,完全穷尽”。 也就是对于一个重大的议题,能够做到不重叠、不遗漏的分类,而且能够借此有效把握问题的核心,并解决问题的方法。

    所谓的不遗漏、不重叠指在将某个整体(不论是客观存在的还是概念性的整体)划分为不同的部分时,必须保证划分后的各部分符合以下要求:

    · 完整性(无遗漏),指分解工作的过程中不要漏掉某项,意味着问题的细分是在同一维度上并有明确区分、不可重迭的

    · 独立性(无重复),强调每项工作之间要独立,无交叉重叠,意味着问题的分析要全面、周密

    一个例子

    工厂里搞 5S 管理时候培训师经常使用的一个例子,“怎么整理三个抽屉里的杂乱东西”:

    方法一

    1. 抽屉一的东西分类,整理

    2. 抽屉二的东西分类,整理

    3. 抽屉三的东西分类,整理

    方法二

    1. 定义三个类别,三个抽屉分别规定一个类别

    2. 整理抽屉一到一二三

    3. 整理抽屉二到一二三

    4. 整理抽屉三到一二三

    方法三:

    1. 所有东西都拿出来

    2. 规定三个抽屉内物品的用途

    3. 所有的物品按照类别放进去

    我们来评价一下三种方法: 

    方法一,程序最复杂,需要至少九个步骤,且结果最差,整理后东西仍然是混乱的,按照 5S 的定义只涉及到整理没涉及到整顿; 

    方法二,程序复杂,九个步骤,结果是清晰的; 方法三,步骤简单,结果清晰

    MECE 的四个步骤

    确定范围。

    也就是要明确当下讨论的问题到底是什么,以及我们想要达到的目的是什么。这个范围决定了问题的边界。这也让”完全穷尽“成为一种可能。换句话说,MECE 中的”完全穷尽“是指有边界的穷尽。

    寻找符合 MECE 的切入点。

    所谓的切入点是指,你准备按什么来分,或者说大家共同的属性是什么。比如,是按颜色分、按大小分、按时间序列分还是按重要性分?这一步是最难的,但也是最关键的。在找切入点的时候,一定要记得以终为始!这个时候一定要反复思考,你当初要解决的【问题】或当初分析的【目的】是什么。换句话说,你希望分类后解决什么问题,得出什么结论。

    找出大的分类后考虑是否可以用 MECE 继续细分。

    当你觉得这些内容已经确定以后,仔细琢磨它们。是不是每一项内容都是独立的、可以清楚区分的事情?如果是,那么你的内容清单就是”相互独立的”。如果不是,对它们进行分类和归纳。

    确认有没有遗漏或重复。

    分完类之后必须重新检视一遍,看看有没有明显的遗漏或重复。建议画出一个金字塔结构图,用可视化的方式比较容易发现是否有重叠项。

    注意事项

    1. 在确立问题的时候,通过类似鱼刺图的方法,在确立主要问题的基础上,再逐个往下层层分解,直至所有的疑问都找到,通过问题的层层分解,可以分析出关键问题和初步的解决问题的思路;

    2. 结合头脑风暴法找到主要问题,然后在不考虑现有资源的限制基础上,考虑解决该问题的所有可能方法,在这个过程中,要特别注意多种方法的结合有可能是个新的解决方法,然后再往下分析,每种解决方法所需要的各种资源,并通过分析比较,从上述多种方案中找到目前状况下最现实最令人满意的答案。

    一个案例: 如何组织一场培训

    • 确立核心问题:组织培

    • 列出关键点,并且完全穷尽

    640?wx_fmt=jpeg

    • 检查每一项是否完全独立,如果不是,对它们进行分类和归纳

      640?wx_fmt=jpeg

    • 再检查是否每一层是否完全独立,而且穷尽

    640?wx_fmt=jpeg

    MECE 总结

    我们会发现这种呈现的结构变成了金字塔样式,每一层都是下一层内容的总结概括,而第一层是要阐述的核心问题(或观点),这就是麦肯锡推崇的金字塔思维结构。使用金字塔结构图可以比较容易地发现是否有重叠项。

    MECE 原则最大好处就在于,对于影响问题产生的所有因素进行层层分解,通过分解得出关键问题所在,以及解决问题的初步思路。无论绩效问题还是业绩问题,都可以通过 MECE 不断归纳总结,梳理思路寻找达到目标的关键点。

    沟通表达

    在问题被描述、思考清楚后,就该沟通表达了,可以遵循论点 -> 结论 -> 理由 -> 行动的框架。

    论点/背景

    论点,一般指接下来谈话的中心内容。论点阐述时经常包含背景介绍,它们往往不可分割,阐述论点,应尽量从对方了解的信息开始阐述。

    我们可以使用上本提到的 5W2H 法来细化论点,同时将来各事件元素(时间、地点、人物、事件、原因、如何进展、进展如何等)梳理清楚。

    举一个简单例子,你想找领导聊聊加班的事情,你不能说 “Boss,关于加班我想找你聊一聊“ 。这样显然没有将事件元素陈述清晰,Boss 会无法判断你谈话的内容,他只能去猜测你将要表达内容是关于加班的哪个方面。正确的论点阐述应该是 “Boss,最近,年轻员工加班时间增加过多了,导致了大家怨言较多,人心浮动,我们是不是应该做一些调整?”, 这样论点就描述清楚了。

    结论

    1.不要答非所问 

    结论和论点要紧密相关,这样易于被听众快速理解接。如果问题是“是/否”类型,那就以是否作为结论开始;如果问题是怎么做,那就回答该怎么做,尽量避免答案过于弯绕。

    例如,领导问你最近你的工作完成的如何?你回答说“还在进行中”,其实就不太合理,领导既然这么问,也许是抱着以下目的的:

    · 阶段性的结果是什么

    · 是否存在一些困难

    · 也许你平时跟领导汇报太少了,所以他对你的状态是不清楚的

    · 对你的关心

    结果,你仅仅是一句“还在进行中”,基本没有给领导提供任何可用的信息。

    2.遵循金字塔原则:结论先行 
    否则倾听者容易产生疲倦。当希望给倾听者以准备时间或者倾听者自行得出结论的时候,我们才将结论放后面。

    3.理由 
    理由的陈述关键点在于前文提到的
    MECE 分析法, 筛选“合格”理由作为结论的支撑,避免将一个相似的理由分成多个理由来说。那么如何做到符合 MECE 原则?如何筛选“正确合格”理由?

    · 换位思考 
    从对方角度设想,选择能让对方信服的理由,所以面对不同的听众,要有不同的侧重

    · 理由的理由 
    对于认同感不强的理由,采用“理由的理由”去支持它。“理由的理由”可以是:数据证明、一般常识/规律、事例的累积、已决断的策略、公司规定/制度等

    · 理由推导结论 
    有大量事实,但没有结论的场景,我们可以先陈列大量理由,从理由中推导出结论

    · 理由整理分类 
    已经有结论,但理由支撑没有思路的时候,我们同样可以先陈列大量理由,再整理分类理由

    在目前互联网行业,有一个很好的结论阐述法:基于数据的结论。因此大家在给予结论时,首先给出基于理由推导出的结论,其次应该尽量用数据来佐证你的结论,提高说服力。

    最后我们再介绍一种科学的由理由推导结论的方法: 

    1. 归纳并举:列举理由,通过理由的共通点推测结论,这种推理方式的缺点是显得主观,所以要格外注意考虑例外的情况 
    2. 演绎推理型:三段式,大前提 + 小前提 -> 结论。如果大前提、小前提是错误的,那么演绎推理的结论也是无意义的,所以要注意确保大、小前提是正确的

    最后

    给大家推荐一本书<<零秒思考>>,推荐一篇文章<<使用 A4 纸笔记法 100 天增强逻辑思维>>

    理论懂得再多,如果不实践,那就是纸上谈兵,希望大家最终都能通过实践形成强大的逻辑思维,为未来的成功之路打下坚实的基础。

    640?wx_fmt=png

    展开全文
  • Yarn资源请求处理和资源分配原理解析

    万次阅读 热门讨论 2017-12-14 09:57:31
    FairScheduler的资源调度原理和代码 FairScheduler的调度概览 两种调度时机-心跳调度和持续调度 开始进行资源调度 判断这个application是否适合在这个节点上分配资源运行 YARN请求资源时的locality和relaxility限定 ...

    目录

    概述

    在我的上一篇《Yarn FairScheduler 的资源预留机制导致集群停止服务事故分析》中介绍了我们由12台服务器、每台资源容量(100G,32 vCores)组成的yarn集群由于资源预留导致宕机的一次事故。资源预留是Yarn进行资源分配过程中为了让大的应用不至于被小的饿死而进行资源预定的分配方式。本文就从原理和代码层面详细介绍Yarn的资源分配流程。

    下图是我们向yarn提交任务到最后任务以一个一个container的形式运行起来的大致过程。我们通过客户端向Yarn提交计算任务,Yarn会首先为我们的任务生成一个ApplicationMaster,AM负责对整个应用的资源进行管理,包括资源的申请、释放、任务中container的调度和运行。 关于ApplicationMaster与ResourceManager之间进行通信的详细机制,大家可以参考我的博客《YARN ApplicationMaster与ResourceManager之间基于applicationmaster_protocol.proto协议的allocate()接口源码解析》 ,本文不再重复讲解。本文将讲解的资源调度,就是ApplicationMaster向ResourceManager请求资源以后,Yarn的ResourceManager委托我们所配置的调度器(FairScheduler/CapacityScheduler)决定是否分配资源、分配多少资源的过程。当然,ApplicationMaster本身也是运行在一个container中的,也是进行调度的。下文将进行讲解。

    RM分配Container的流程

    FairScheduler的资源调度原理和代码

    FairScheduler的调度概览

    我们所配置的调度器FairScheduler是运行在ResourceManager内的调度算法。Hadoop的官方文档详细介绍了FairScheduler的使用方法。

    FairScheduler的资源队列之间存在层级关系,树的根节点的代表了整个集群的资源,根节点的名字叫做root。我们定义Yarn的资源队列,就是在root下面定义其子节点,比如,root.queue1、root.queue2等等,即,实际上,资源队列之间形成了资源队列树。FSParentQueue对象代表了树中的非叶子节点,FSLeafQueue代表了树的叶子节点。由于我们提交的任何一个应用都需要运行在某个队列中,因此,叶子节点下面还挂载了正在该队列上运行的应用,Yarn使用FSAppAttempt来作为一个运行时的应用在ResourceManager端的抽象,因此,这些FSAppAttempt对象都挂载在对应的叶子节点下面。

    使用树恰当地表达对集群资源进行划分时所需要的隔离关系和层级关系。比如,我们一个集群资源给公司的所有部门提供计算服务,部门与部门之间有可能是有平级部门的兄弟关系,平级部门之间的资源需要相互隔离,也有可能是上下级部门的关系,上级部门有权利使用下级任何部门的资源。因此,使用树来抽象资源,既可以实现平级部门之间的资源隔离,也能够表达上下级部门之间资源的包含关系。

    资源队列树

    上图就是这个资源树的示意图。FairScheduler通过这样一棵资源树,维护了整个集群的资源使用情况。资源树中的每一个叶子节点都挂载了此时正在这个队列上运行、或者正在申请运行的应用。FairScheduler只需要对这个树进行适当遍历,就可以知道任何一个资源队列当前有哪些应用在运行、队列当前还剩下多少资源、已经使用多少资源等。

    通过这样一个资源树,资源调度的过程就变成了这样一个不断进行的过程:从树中取出一个资源请求,如果这个资源请求不违背队列的最大资源量等限制,也能够在某个服务器上运行(这个服务器剩余资源可供运行这个请求的资源),那么,就让这个请求运行在对应的服务器上,即创建对应的container。实际上,分配了这个container以后,会在ApplicationMaster某一次心跳响应中返回这个分配结果,ApplicationMaster知道资源分配成功,就与对应的NodeManager通信,请求该NodeManager将对应的Container(比如运行Mapper或者Reducer的Container或者运行Spark executor 的Container)在其节点上启动,NodeManager收到请求,如果验证通过,则启动对应的Container。

    那么,一次调度的发生是如何被触发的呢?这就涉及到两种调度时机,即心跳调度和持续调度。

    两种调度时机-心跳调度和持续调度

    心跳调度是最早的yarn的调度方式,在2.3版本以前的yarn只支持心跳调度。Yarn的NodeManager会通过心跳的方式定期向ResourceManager汇报自身状态,当NodeManager向ResourceManager汇报了自身资源情况(比如,当前可用资源,正在使用的资源,已经释放的资源),这个RPC会触发ResourceManager调用nodeUpdate()方法,这个方法为这个节点进行一次资源调度,即,从维护的Queue中取出合适的应用的资源请求(合适 ,指的是这个资源请求既不违背队列的最大资源使用限制,也不违背这个NodeManager的剩余资源量限制)放到这个NodeManager上运行。这种调度方式一个主要缺点就是调度缓慢,当一个NodeManager即使已经有了剩余资源,调度也只能在心跳发送以后才会进行,不够及时

    YARN-1010中引入了连续资源调度机制,不用等待NodeManager向ResourceManager发送心跳才进行任务调度,而是由一个独立的线程进行实时的资源分配等调度,与NodeManager的心跳出发的调度相互异步并行进行。当心跳到来,只需要把调度结果通过心跳响应告诉对应的NodeManager即可。

    我们通过yarn.scheduler.fair.continuous-scheduling-enabled来配置是否打开连续调度功能。默认情况下该功能关闭。

      /**
        ContinuousSchedulingThread负责进行持续的资源调度,与NodeManager的心跳产生的调度同时进行
       */
      private class ContinuousSchedulingThread extends Thread {
        @Override
        public void run() {
          while (!Thread.currentThread().isInterrupted()) {
            try {
              continuousSchedulingAttempt();
              Thread.sleep(getContinuousSchedulingSleepMs());//睡眠很短的一段时间进行下一轮调度
            } catch (InterruptedException e) {
              //略
            }
          }
        }
      }

    FairScheduler.initScheduler()方法中构造了ContinuousSchedulingThread线程对象:

          if (continuousSchedulingEnabled) { 
            // start continuous scheduling thread
            schedulingThread = new ContinuousSchedulingThread();
            schedulingThread.setName("FairSchedulerContinuousScheduling");
            schedulingThread.setDaemon(true);
          }

    FairScheduler.serviceStart()方法中启动了ContinuousSchedulingThread线程。该线程每一轮执行完毕,会通过我们配置的yarn.scheduler.fair.continuous-scheduling-sleep-ms睡眠一段时间,然后进行下一轮调度,默认情况下,这个时间是5ms,这个时间间隔接近实时。而对于心跳调度方式,心跳时间间隔通过yarn.nodemanager.heartbeat.interval-ms配置,默认值 1000ms。我把连续调度的每一次执行叫做一轮,在下文中我们可以看到,每一轮,会遍历当前集群的所有节点,挨个对这个节点进行一次调度,即,取出合适的请求,分配到这个节点上运行。

    这就是持续调度线程的初始化和启动机制 ,其初始化和启动时伴随着我们的FairScheduler调度器的初始化和启动同时进行的,是FairScheduler的一种调度优化机制。

    无论是NodeManager心跳时触发调度,还是通过ContinuousSchedulingThread进行实时、持续触发,他们对某个节点进行一次调度的算法和原理是公用的,都是通过synchronized void attemptScheduling(FSSchedulerNode node)来在某个节点上进行一次调度,方法的参数代表了准备进行资源分配的节点。两种触发机制不同的地方只有两个:

    1. 调度时机:心跳调度仅仅发生在收到了某个NodeManager的心跳信息的情况下,持续调度则不依赖与NodeManager的心跳通信,是连续发生的,当心跳到来,会将调度结果直接返回给NodeManager;
    2. 调度范围:心跳调度机制下,当收到某个节点的心跳,就对这个节点且仅仅对这个节点进行一次调度,即谁的心跳到来就触发对谁的调度,而持续调度的每一轮,是会遍历当前集群的所有节点,每个节点依次进行一次调度,保证一轮下来每一个节点都被公平的调度一次

    开始进行资源调度

    这是连续调度方式的一轮调度开始的入口:

    
    void continuousSchedulingAttempt() throws InterruptedException {
      long start = getClock().getTime();
      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
      //进行调度以前,先对节点根据剩余资源的多少进行排序,从而让资源更充裕的节点先得到调度
      //这样我们更容易让所有节点的资源能够被均匀分配,而不会因为某些节点总是先被调度所以总是比
      //后调度的节点的资源使用率更高
      synchronized (this) {
        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
      }
      // 遍历所有节点,依次对每一个节点进行一次调度
      for (NodeId nodeId : nodeIdList) {
        //FSSchedulerNode是FairScheduler视角下的一个节点
        FSSchedulerNode node = getFSSchedulerNode(nodeId);
        try {
          //判断
          if (node != null && Resources.fitsIn(minimumAllocation,
              node.getAvailableResource())) {
            attemptScheduling(node);
          }
        } catch (Throwable ex) { //这每次调度过程中如果发生异常,这个异常将被捕获,因此不会影响在其它节点上进行调度
          //异常处理,略
          }
        }
      }
    
      long duration = getClock().getTime() - start;
      fsOpDurations.addContinuousSchedulingRunDuration(duration);
    }

    continuousSchedulingAttempt()会遍历所有节点,依次进行资源调度,这里的调度,就是试图找出一个container请求放到这个服务器上运行。为了让整个集群的资源分配在服务器节点之间能够更均匀,调度以前通过资源比较器对节点按照资源余量从多到少排序,从而让资源更充裕的先被调度,这样做更有利于让所有节点的资源使用量达到均衡,而不至于由于某些节点的序号排在前面而总是被先调度,造成资源调度倾斜

    关于比较器,使用的是NodeAvailableResourceComparator比较器,我们跟踪比较器的代码看到,实际上比较资源的时候只考虑了内存,没有考虑vCores等其它资源。

    对于每一个节点,如果节点的资源剩余量大于yarn.scheduler.minimum-allocation-mb所配置的最小调度量才会对这个节点进行调度。如果满足要求,attemptScheduling(node)就开始针对这个节服务器进行调度了,即,把选出合适的资源请求,分配到这个节点上。上面说过,如果是心跳调度模式,也是通过这个方法对发送心跳的节点进行资源调度的。

    synchronized void attemptScheduling(FSSchedulerNode node) {
      //略
      // Assign new containers...
      // 1. Check for reserved applications
      // 2. Schedule if there are no reservations
      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
      if (reservedAppSchedulable != null) { //如果这个节点上已经有reservation
        Priority reservedPriority = node.getReservedContainer().getReservedPriority();
        FSQueue queue = reservedAppSchedulable.getQueue();
        //如果这个节点被这个应用预定,这里就去判断这个应用是不是有能够分配到这个node上到请求,如果有这样到请求,并且,没有超过队列到剩余资源,那么,就可以把这个预定的资源尝试进行分配(有可能分配失败)
          //而如果发现这个应用没有任何一个请求适合在这个节点运行,或者,当前队列的剩余资源已经不够运行这个预留的、还没来得及执行的container,那么这个container就没有再预留的必要了
        if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
            || !fitsInMaxShare(queue,
            node.getReservedContainer().getReservedResource())) {
          //如果这个被预留的container已经不符合运行条件,就没有必要保持预留了,直接取消预留,让出资源
          reservedAppSchedulable.unreserve(reservedPriority, node);
          reservedAppSchedulable = null;
        } else {
          //对这个已经进行了reservation对节点进行节点分配,当然,有可能资源还是不足,因此还将处于预定状态
          node.getReservedAppSchedulable().assignReservedContainer(node);
        }
      }
      if (reservedAppSchedulable == null) {这个节点还没有进行reservation,则尝试进行assignment
        // No reservation, schedule at queue which is farthest below fair share
        int assignedContainers = 0;
        while (node.getReservedContainer() == null) { //如果这个节点没有进行reservation,那么,就尝试
          boolean assignedContainer = false;
          if (!queueMgr.getRootQueue().assignContainer(node).equals(
              Resources.none())) { //尝试进行container的分配,并判断是否完全没有分配到并且也没有reserve成功
            assignedContainers++; //如果分配到了资源,或者预留到了资源,总之不是none
            assignedContainer = true;
          }
          if (!assignedContainer) { break; }
          if (!assignMultiple) { break; }
          if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
        }
      }
      updateRootQueueMetrics();
    }

    在对这个节点进行调度的时候,会先判断这个节点是不是一个被预留的节点,预留和非预留的节点的处理方式是不同的,具体流程是:

    • 如果这个服务器是一个被某个container预留的服务器,那么,需要对这种状态进行处理,以选择(1)剥夺预留、(2)保持预留状态或者(3)将预留状态转换成分配状态:
      • 剥夺预留: 如果发现这个被预留的应用没有任何一个请求适合在这个节点上运行,另外,由于每一个应用都有所在的资源队列,因此如果我们发现资源队列的剩余资源已经小于这个应用的预留资源,那么,这个预留已经没有存在的必要了,需要取消预留权;从这里我们可以看到,如果一个container在这个节点上是预留状态,说明队列的剩余资源肯定满足container的资源需求,只是服务器剩余资源无法运行这个container;
      • 尝试分配资源:如果我们发现这个在这个节点预留资源的应用的确可以在这个节点运行,并且,预留的资源也的确在队列剩余资源的允许范围内,即,还有预留的必要性,那么,就可以尝试进行一次资源分配了,当然,这个资源分配有可能还是失败,如果失败就已然保持预留状态,这个为预留的container进行 资源分配尝试 的过程是在FSAppAttempt.assignReservedContainer()中进行的;
    • 如果这个节点是一个正常未预留的节点,那么就可以进行正常的资源分配了。

    判断这个application是否适合在这个节点上分配资源运行

    在判断是进行预留权剥夺还是资源分配尝试的时候,需要判断这个预留资源的应用是否有任何一个请求适合在这个节点上运行,以及队列剩余资源是否允许这个预留的container的存在。在这里有必要详细解释什么叫做适合在这个节点上运行,这是通过

    这是通过hasContainerForNode()方法进行判断的:

      /**
       * Whether this app has containers requests that could be satisfied on the
       * given node, if the node had full space.
       * 关于这个方法的判断条件,为什么如果anyRequest==null就直接返回false,这是因为applicationMaster在
       * 为应用申请资源的时候,如果是NODE_LOCAL,顺便也会创建这个节点对应的RACK的RACK_LOCAL的请求和offswitch的请求
       * 这个可以看MRAppMaster发送请求的时候所使用的RMContainerRequestor.addContainerReq()和ApplicationMaster通过
       * 调用AMRMClientImpl.addContainerRequest()申请资源的过程
       * 或者查看董西成的博客http://dongxicheng.org/mapreduce-nextgen/yarnmrv2-mrappmaster-containerallocator/
       */
      public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
        //查找这个优先级下面目前三种请求,一种是没有任何本地化限制的请求,一种是限制为本地机架的请求,一种是限制为本节点内的请求
        ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY); //所有请求,对机架和节点没有要求
        ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());//在这个节点所在机架上的请求
        ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());  //在这个节点上的请求
    
        return
            // There must be outstanding requests at the given priority:
            anyRequest != null && anyRequest.getNumContainers() > 0 &&
                // If locality relaxation is turned off at *-level, there must be a
                // non-zero request for the node's rack:
                (anyRequest.getRelaxLocality() ||
                    (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
                // If locality relaxation is turned off at rack-level, there must be a
                // non-zero request at the node:
                (rackRequest == null || rackRequest.getRelaxLocality() ||
                    (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
                // The requested container must be able to fit on the node:
                Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
                    anyRequest.getCapability(), node.getRMNode().getTotalCapability());
      }

    YARN请求资源时的locality和relaxility限定

    ApplicationMaster客户端向ResourceManager资源申请,会把自己的资源需求构造为一个ResourceRequest对象发送给RM。一个ResourceRequest包含了若干个相同Capability的container的请求的集合,包含以下元素:

    • Capability: 单个Container的资源量,由<mem,vCore>组合构成,是一个Resource类的实现;

    • Containers Number : Container的数量,整个请求的资源量为 container number * capability

    • Resource Name:资源名称(某个节点的ip、某个机架的ip或者是*代表任意),这里叫做名称有些让人难以理解,其实是限制这个ResourceRequest对象资源运行的locality,这里有必要非常具体的讲解YARN的locality。ApplicationMaster客户端在提交应用的时候,有时候会对container运行的位置提出限制,比如,由于某些数据在服务器node1上,因此ApplicationMaster客户端希望用来处理这些数据的container就运行在这个服务器上,这个要求运行在某个特定节点的本地化叫做NODE_LOCAL,也可以,要求运行在某个固定的机架rack1上,这种机架的本地化叫做RACK_LOCAL,或者,也许没有要求(OFF_SWITCH)。因此,Resource Name可以是某个节点的ip(NODE_LOCAL),或者某个机架的ip(RACK_LOCAL),或者是通配符*(OFF_SWITCH);

      本地化分为三种,定义在NodeType中:

      public enum NodeType {
      NODE_LOCAL(0), //请求规定了必须运行在某个的服务器节点
      RACK_LOCAL(1), //请求规定了必须运行在某个机架上
      OFF_SWITCH(2); //这个请求对本地化没有要求
      public int index;
      
      private NodeType(int index) {
        this.index = index;
      }
      }

    • relaxLocality:是否允许某种本地化限制松弛到更低的要求,比如,当某个ResourceRequest要求其container运行在node1,但是node1的资源剩余量始终无法满足要求,那么需要进行本地化松弛,可以放弃必须运行在服务器node1的要求,改为只要求运行在这个服务器node1所在的机架rack1上就行。

      我们用一个例子来讲解请求一个NODE_LOCAL或者RACK_LOCAL是怎样进行的。

      如果某一个Container的运行有本地化需求,比如,这个Container要求只在node1上运行,那么,它其实会为这个Container创建多个不同的locality的请求:

      资源名称         内存         cpu     松弛度
      <“node1”,      “memory:1G”, 1,      true>  //发送resourcename为node1的请求,relaxLocality=true
      <“rack1”,      “memory:1G”, 1,      false/true> //同时发送node1所在的rack的请求,是否松弛relaxLocality的值
      <“*”,          “memory:1G”, 1,      false/true>  //同时发送off-switch请求,是否松弛relaxLocality的值

      如果这个请求值允许在node1上运行,那么relaxLocality==false,即rack1和off-switch的请求中的松弛变量都是false,那么,当node1请求无法满足,RM试图将请求降级到rack1或者off-switch的时候,检查他们的relaxLocality,发现是false,降级不被允许,只能继续等待直到node1满足条件,或者node1始终无法分配资源,资源分配失败。

    总之,ApplicationMaster 在请求container的时候,如果本地化需求是NODE_LOCAL或者RACK_LOCAL,都会为一个container同时发送多个不同的resource name的请求,RM最终只会选择一个请求进行执行,并且,尽量满足NODE_LOCAL,如果不满足就只能RACK_LOCAL,最后只能运行OFF_SWITCH,总之根据可用资源的情况和请求中设置的允许的松弛程度决定是否分配资源。

    在这里,我们可以深刻体会到ApplicationMaster的api是多么复杂

    有了对资源本地化(locality)和本地化松弛(relaxility)的理解,我们就可以看懂hasContainerForNode() 方法用来判断这个应用的资源请求当前是否可以运行在这台服务器上了:

    1. 如果连OFF_SWITCH的请求都没有,那么这个应用肯定不需要运行了。我们在上面说过,无论是NODE_LOCAL、RACK_LOCAL抑或本来就是OFF_SWITCH的请求,都会发送OFF_SWITCH的请求,因此,如果发现一个应用连OFF_SWITCH的请求都没有,就没有必要再考虑其他的了。

    2. 如果OFF_SWITCH的relaxLocality是打开的(允许松弛到OFF_SWITCH的级别),或者虽然是关闭的(不允许松弛到OFF_SWITCH)但是有RACK_LOCAL的请求存在

    3. 如果RACK_LOCAL级别的relaxLocality是打开的(允许松弛到RACK_LOCAL级别),或者,虽然RACK_LOCAL级别的relaxLocality是关闭的(不允许松弛到RACK_LOCAL),但是却有NODE_LOCAL级别的请求

    4. 这个资源请求能够在这个节点上运行,即节点剩余资源足够运行这个请求

    如果以上条件都满足,hasContainerForNode()返回true,则说明这个在这个节点上进行预订的app或许可以从预定状态变成分配状态了,因此,尝试对这个预定进行分配。而如果hasContainerForNode()返回false,说明这个预定的container实际上不可以在这个服务器上运行,因此没有必要继续空占资源,防止资源被长期无效占用。尝试将app在节点上预定的资源进行allocate的过程,是调用的FSAppAttempt.assignReservedContainer()方法,其实最终也是调用private Resource assignContainer(FSSchedulerNode node, boolean reserved)方法,尝试将这个container在这个节点上进行分配,后面一个参数标记了这个节点是否被预定了。

    可见,资源分配的关键方法,就是assignContainer()方法。在进行container分配的过程中,会发生分配成功、或者将资源请求转变为预留状态的过程。我们将具体讲解assignContainer()方法。

    资源分配:assignContainer()

    我在资源调度实现机制概览介绍了Yarn 对队列配置的树形结构。树的每一个节点(注意这里的节点指的是树的节点,不是服务器节点)都是一个资源的抽象,比如,这个节点代表的最大资源、已使用资源、空闲未使用资源。

    树的非叶子节点(FSParentQueue)和叶子节点(FSLeafQueue)都是资源集合,代表了一定的资源,不同的是,非叶子节点(FSParentQueue)只是用来代表他所管理的叶子节点的集合,叶子节点是应用具体运行的队列,因此叶子节点上挂载了正在它上面运行的应用的集合。FSParentQueue和FSLeafQueue都实现了assignContainer()方法,用来在这个队列上进行资源分配。每一次调度,都是从root节点开始,通过递归方式,调用这个节点的assignContainer()方法,尝试为挂载的请求进行一次分配,一旦分配成功则退出递归。我们分别来看FSParentQueue和FSLeafQueue对assignContainer()方法的实现。

    下图是进行一轮资源分配(只有连续调度的一轮分配会对所有服务器挨个进行一次分配,心跳调度只对心跳节点进行一次分配,但是原理相同)的概图。从图中可以看到,为某一个节点进行资源分配,就是对资源树进行递归搜索,依次从资源树最上层的root节点、到普通的非叶子节点、再到叶子节点、然后是应用、最后是应用的某个资源请求,然后将选出的请求分配到对应的服务器上的过程。

    Yarn资源分配概图

    下面,我们就依次从代码角度讲解这个资源分配过程。

    Parent节点调用FSParentQueue.assignContainer()方法进行资源分配

    我在介绍连续调度和心跳调度 的时候,讲到Yarn是遍历所有的服务器,然后对于每一个服务器,对资源树进行一次递归搜索,选出请求在这个服务器上执行。

    这是FSParentQueue.assignContainer()方法:

      public Resource assignContainer(FSSchedulerNode node) {
        Resource assigned = Resources.none();
    
        // If this queue is over its limit, reject
        if (!assignContainerPreCheck(node)) {
          return assigned;
        }
    
        Collections.sort(childQueues, policy.getComparator());
        for (FSQueue child : childQueues) { //从这个for循环可以看出来这是广度优先遍历
          assigned = child.assignContainer(node); //childQueus有可能是FSParentQueue或者FSLeafQueue
          if (!Resources.equals(assigned, Resources.none())) { //如果成功分配到了资源,那么,没有必要再去兄弟节点上进行资源分配了
            break;
          }
        }
        return assigned;
      }

    我们从代码中可以看出,FSParentQueue其实并没有进行实际的资源分配,只是不断遍历子节点直到遇到叶子节点才会尝试进行分配,因此资源分配代码实际上是在FSChildQueue.assignContainer()中执行的。

    同时,我们必须看到FSParentQueue.assignContainer()中对FSParentQueue的子节点的遍历并不是随机排序的,而是通过对应的policy定义了排序规则:

     Collections.sort(childQueues, policy.getComparator());

    这里的Policy就是FairScheduler的官方文档上介绍的不同的Policy,对于FSParentQueue.assignContainer()中使用Policy进行子队列的排序,决定了对某个父节点的多个子节点进行资源分配的顺序。我在我的另外一篇文章将会介绍不同的Policy,在这里我们忽略策略问题。

    同时,我们从for循环的退出规则可以看出,递归遍历过程中,一旦分配成功,for循环即退出,这轮分配结束,代码退出到continuousSchedulingAttempt()的for循环处,开始对下一个节点的剩余资源进行尝试分配。即,一轮分配中,对某个服务器,只会进行一个请求的分配。这样也是为了达到平均分配的结果,避免多个请求都堆积分配到某一个服务器而其它服务器却空闲的情况。

    Leaf节点通过调用FSLeafQueue.assignContainer()方法进行资源分配

    通过从root(FSParentQueue)节点递归调用assignContainer(),最终将到达最终叶子节点的assignContainer()方法,才真正开始进行分配:

    /**
     * 尽量将自己的app分配到某个节点上,FSLeafQueue会遍历自己目前所有的runnableApps,然后逐个尝试进行分配,只要有一个分配成功就退出
     * @param node 等待进行container分配的节点
     * @return
     */
    @Override
    public Resource assignContainer(FSSchedulerNode node) {
      Resource assigned = Resources.none();
      if (!assignContainerPreCheck(node)) {
        return assigned;
      }
      Comparator<Schedulable> comparator = policy.getComparator();//根据对应的policy提供的排序器对apps进行排序
      writeLock.lock();
      try {
        Collections.sort(runnableApps, comparator);
      } finally {
        writeLock.unlock();
      }
      // Release write lock here for better performance and avoiding deadlocks.
      // runnableApps can be in unsorted state because of this section,
      // but we can accept it in practice since the probability is low.
      readLock.lock();
      try {
        for (FSAppAttempt sched : runnableApps) { //对排序完成的资源一次进行调度,即对他们进行资源分配尝试
          if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
            continue;
          }
    
          assigned = sched.assignContainer(node); //这里的sched应该是FSAppAttempt
          if (!assigned.equals(Resources.none())) { //如果发现进行了资源分配,即,不管是进行了预留,还是进行了实际的分配,都跳出循环
            break;
          }
        }
      } finally {
        readLock.unlock();
      }
      return assigned;
    }

    在进行分配以前,通过调用assignContainerPreCheck(node)判断能否在当前这个leaf queue上往这个节点进行资源分配,即,如果这个队列的已使用资源小于这个队列的最大可使用资源(还有剩余资源)并且这个节点没有被预定,那么才可以继续往下走进行分配:

      /**
       * 判断当前的这个Queue是否能够进行一次资源分配,即,
       * 如果这个队列已经使用的资源小于最大资源并且这不是一个被预定的节点才能进行分配,否则,不可以再分配container了
       */
      protected boolean assignContainerPreCheck(FSSchedulerNode node) {
        if (!Resources.fitsIn(getResourceUsage(),
            scheduler.getAllocationConfiguration().getMaxResources(getName()))
            || node.getReservedContainer() != null) {
          return false;
        }
        return true;
      }
    
     public static boolean fitsIn(Resource smaller, Resource bigger) {
        return smaller.getMemory() <= bigger.getMemory() &&
            smaller.getVirtualCores() <= bigger.getVirtualCores();
      }

    同样,我们看到,和FSParentQueue.assignContainer()中对挂载的子节点进行排序一样,使用Policy对挂载的FSAppAttempt进行了排序。显然,根据不同的Policy定义的比较器,决定了对Application进行资源分配的先后顺序。

    在完成了队列资源检查并对这个叶子节点下面挂载的应用进行了排序,就开始遍历这些应用并尝试将某个应用的请求分配到当前的这个yarn服务器节点。同样,我们从for循环可以看到,一旦有成功分配,循环立刻退出。这种限制一轮分配最多只进行一次成功分配,是为了请求被均匀分配到服务器节点,以及每个队列都得到及时的请求分配,而不至于在某一轮分配中将请求全部分配到某个节点,或者,某个资源队列进行了多次调度,其它队列却一次都没有被调度。

    这样,FSLeafQueue.assignContainer()最终通过调用具体的Application的assignContainer()方法实现调度。在RM端,每个Application使用FSAppAttemt对象来表示。

    应用通过调用FSAppAttemt.assignContainer()方法进行资源分配

    这是FSAppAttemt.assignContainer()方法:

    private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    
     //返回一个基于Priority进行排序的Priority集合
      Collection<Priority> prioritiesToTry = (reserved) ?
          Arrays.asList(node.getReservedContainer().getReservedPriority()) :
          getPriorities();
    
      // For each priority, see if we can schedule a node local, rack local
      // or off-switch request. Rack of off-switch requests may be delayed
      // (not scheduled) in order to promote better locality.
      synchronized (this) {
        for (Priority priority : prioritiesToTry) {
          if (getTotalRequiredResources(priority) <= 0 ||
              !hasContainerForNode(priority, node)) {
            continue;
          }
    
          addSchedulingOpportunity(priority);
    
          // Check the AM resource usage for the leaf queue
          if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
            if (!getQueue().canRunAppAM(getAMResource())) {
              return Resources.none();
            }
          }
    
          ResourceRequest rackLocalRequest = getResourceRequest(priority,
              node.getRackName());
          ResourceRequest localRequest = getResourceRequest(priority,
              node.getNodeName());
    
          if (localRequest != null && !localRequest.getRelaxLocality()) {
            LOG.warn("Relax locality off is not supported on local request: "
                + localRequest);
          }
    
          NodeType allowedLocality;
          if (scheduler.isContinuousSchedulingEnabled()) { //如果使用的是持续调度,那么需要根据当前的时间确认当前对这个priority可以采取的本地化水平
            allowedLocality = getAllowedLocalityLevelByTime(priority,
                scheduler.getNodeLocalityDelayMs(),
                scheduler.getRackLocalityDelayMs(),
                scheduler.getClock().getTime()); //根据时间去决定允许的本地策略是NODE_LOCAL/RACK_LOCAL/OFF_SWITCH
          } else {
            allowedLocality = getAllowedLocalityLevel(priority,
                scheduler.getNumClusterNodes(),
                scheduler.getNodeLocalityThreshold(), //yarn.scheduler.fair.locality.threshold.node ,这是一个从0到1之间的小数,代表,我必须经过多少次失败的调度,才能允许将本地化策略降级到RACK_LOCAL
                scheduler.getRackLocalityThreshold());//yarn.scheduler.fair.locality.threshold.node,这是一个从0到1之间的小数,代表,我必须经过多少次失败的调度,才能允许将本地化策略降级到OFF_SWITCH
          }
    
          if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
              && localRequest != null && localRequest.getNumContainers() != 0) { //如果NODE_LOCAL/RACK_LOCAL都不是空的,那么进行NODE_LOCAL级别的调度
            return assignContainer(node, localRequest,
                NodeType.NODE_LOCAL, reserved);
          }
    
          if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
            continue;
          }
    
          if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
              && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
              allowedLocality.equals(NodeType.OFF_SWITCH))) { //如果RACK_LOCAL的请求不是空的并且允许的本地化策略是RACK_LOCAL/OFF_SWITCH,则进行RACK_LOCAL调度
            return assignContainer(node, rackLocalRequest,
                NodeType.RACK_LOCAL, reserved);
          }
    
          ResourceRequest offSwitchRequest =
              getResourceRequest(priority, ResourceRequest.ANY); //否则,进行OFF_SWITCH调度
          if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
            continue;
          }
    
          if (offSwitchRequest != null &&
              offSwitchRequest.getNumContainers() != 0) {
            if (!hasNodeOrRackLocalRequests(priority) ||
                allowedLocality.equals(NodeType.OFF_SWITCH)) {
              return assignContainer(
                  node, offSwitchRequest, NodeType.OFF_SWITCH, reserved); //进行OFF_SWITCH调度
            }
          }
        }
      }
      return Resources.none();
    }

    根据Priority优先级排序以及资源请求的Priority简介

    每一个FSAppAttempt保存了自己当前所有container请求的Priority的list,通过调用getPriorities()获取了按照Priority(优先级)排序的container请求的list,我们来看这个对Priority进行排序的比较器:

      //一个排序的HashSet,用来保存当前这个applicaiton的所有请求的优先级
      final Set<Priority> priorities = new TreeSet<Priority>(
          new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
    
    public static class Comparator 
      implements java.util.Comparator<org.apache.hadoop.yarn.api.records.Priority> {
        @Override
        public int compare(org.apache.hadoop.yarn.api.records.Priority o1, org.apache.hadoop.yarn.api.records.Priority o2) {
          return o1.getPriority() - o2.getPriority(); //从比较器来看,值越小优先级越高
        }
      }

    这些container请求的Priority是不同类型的应用的ApplicationMaster自己决定的。以MapReduce为例,MapReduce的任务的ApplicationMaster实现类是MRAppMaster,它委托RMContainerAllocator向远程的ApplicationMasterService请求资源,MR的任务类型包括Map任务、Reduce任务和Fail Map任务(失败需要重试的map),我们看在RMContainerAllocator中对这三种类型的任务的优先级定义:

    static final Priority PRIORITY_FAST_FAIL_MAP;
    static final Priority PRIORITY_REDUCE;
    static final Priority PRIORITY_MAP;
    //略
    static {
      PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
      PRIORITY_FAST_FAIL_MAP.setPriority(5);
      PRIORITY_REDUCE = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
      PRIORITY_REDUCE.setPriority(10);
      PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
      PRIORITY_MAP.setPriority(20);
    }

    可以看到,FAST_FAIL_MAP任务、REDUCE任务和MAP任务的Priority分别是5,10,15,数字越小,优先级越高,因此在FSAppAttempt.assignContainer()中这个请求就会优先被考虑,即,如果有失败的Map任务,这个失败的Map任务优先执行,其次是Reduce任务,最后是正常的Map任务。实际运行情况下,MR任务总是先产生map任务,因此先只提交map任务运行,然后如果有reduce任务,reduce任务将优先于现有的map任务运行,而失败的map任务由于需要重试,其它reduce任务可能在等待这个失败的map任务执行完才能进入下一个阶段,因此它的优先级最高。

    获得排序后的Priority,就可以按照优先级,遍历这个Priority的list,取出对应Priority的container请求,尝试进行资源分配。

    分配前的校验

    对于某一个priority的请求,在进行资源分配之前,会进行以下检查:

    • 确认这个priority的确有请求存在,因为FSAppAttemtp除了保存所有请求的Priority的list,还保存了每一个Priority到请求的对应关系,即Priority到ResourceRequest的对应关系。我在 YARN请求资源时的locality和reality限定 这一节讲过RequestRequest对象的结构;

    • 再次确认这个priority对应的请求的确适合在这个节点上运行,同样是调用hasContainerForNode()方法进行判断了,我在判断这个application是否适合在这个节点上分配资源运行 这一节专门讲过请求的本地化特性以及hasContainerForNode()的判断机制;

    • 如果当前准备分配的container是ApplicationMaster的container,那么这个container的分配需要判断是否满足maxAMShare的配置限制;maxAMShare是FairScheduler用来限制一个队列的资源最多可用来运行ApplicationMaster而不是具体job的比例。我们具体来看看这一部分代码:

      //如果liveContainer==0,并且这个Application的AM是一个managedAM,那么在分配container的时候必须考虑
      //是否超过了当前队列的maAMShare配置的最大AM比例值
      if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
      if (!getQueue().canRunAppAM(getAMResource())) {
        return Resources.none();
      }
      }
      
       /*
       * 判断当前队列是否能够运行一个ApplicationMaster应用
       * @param amResource 需要运行的am的资源量
       * @return true if this queue can run
       */
      public boolean canRunAppAM(Resource amResource) {
        float maxAMShare = //获取队列的maxAMShare参数,即队列允许用来运行AM的资源总量
            scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
        if (Math.abs(maxAMShare - -1.0f) < 0.0001) { //如果配置的值为-1.0f,则说明没有限制
          return true;
        }
        //计算队列中可以用来运行AM的最大资源量,即,用队列的FairShare * maxAMShare,这里的fair share指的是instaneous fair share值
        Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
        Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); //计算如果运行了这个am以后这个队列所有的am的资源使用量
        return !policy
            .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);  //对于默认的FairSharePolicy,判断如果运行了这个am,是否超过了maxAMShare的限制
      }

      了解一下什么叫做 unmanaged AM:yarn中ApplicationMaster分为两种,一种是unmanaged am,这种ApplicationMaster独立运行在yarn之外,虽然需要与Yarn RM进行通信和进行资源申请,但是其本身运行所需要的资源不是yarn分配的;另外一种叫做managed AM,即客户端在向yarn提交应用,会先申请ApplicationMaster的启动请求,yarn会分配一个container来运行ApplicationMaster,然后ApplicationMaster会向yarn进行Map/Reduce job的资源申请以及job的管理;

      对于managed am,yarn通过maxAMShare对资源队列中用来运行ApplicationMaster的资源进行了限制,这是为了防止发生资源死锁:如果一个队列中太多的资源都用来运行ApplicationMaster,即队列被很多小的应用充斥,应用都在运行,此时队列资源耗尽,所有ApplicationMaster都申请不到新的container,因此不断等待,新的应用又无法申请新的资源。这段代码就是判断当前的这个container是不是AppliationMaster的container,如果是,则必须满足maxAMShare的资源限制。

    • 在完成了以上校验,即确认可以进行container的分配,就开始进行资源分配。

    判断资源分配的locality并进行资源分配

    yarn需要确认当前这一个分配是进行的NODE_LOCAL/RACK_LOCAL/OFF_SWITCH中的哪种分配。显然,从优先级角度,NODE_LOCAL>RACK_LOCAL>OFF_SWITCH,因为Yarn是最希望能够满足container的NODE_LOCAL请求的,当然,如果不能满足,则只能进行本地化降级,这种降级不是第一次发现无法满足NODE_LOCAL就立刻进行的,而是稍作延迟,如果还是无法按照原本地化标准执行,则对本地化进行降级,因为,NODE_LOCAL现在满足不了,但是也许过一会儿就可以满足了,我们来看:

            ResourceRequest rackLocalRequest = getResourceRequest(priority,
                node.getRackName()); //获取这个应用请求这个节点所在机架的RACK_LOCAL的请求
            ResourceRequest localRequest = getResourceRequest(priority,
                node.getNodeName()); //获取这个应用请求这个节点的NODE_LOCAL的请求
            NodeType allowedLocality;
            if (scheduler.isContinuousSchedulingEnabled()) { //如果使用的是持续调度,那么需要根据当前的时间确认当前对这个priority可以采取的本地化水平
              allowedLocality = getAllowedLocalityLevelByTime(priority,
                  scheduler.getNodeLocalityDelayMs(),
                  scheduler.getRackLocalityDelayMs(),
                  scheduler.getClock().getTime()); //根据时间去决定允许的本地策略是NODE_LOCAL/RACK_LOCAL/OFF_SWITCH
            } else {
              allowedLocality = getAllowedLocalityLevel(priority,
                  scheduler.getNumClusterNodes(),
                  scheduler.getNodeLocalityThreshold(), //yarn.scheduler.fair.locality.threshold.node ,这是一个从01之间的小数,代表,我必须经过多少次失败的调度,才能允许将本地化策略降级到RACK_LOCAL
                  scheduler.getRackLocalityThreshold());//yarn.scheduler.fair.locality.threshold.node,这是一个从0到1之间的小数,代表,我必须经过多少次失败的调度,才能允许将本地化策略降级到OFF_SWITCH
            }

    Yarn会通过调用getAllowedLocalityLevelByTime()或者getAllowedLocalityLevel()判断当前允许的本地化策略,他们分别从时间层面和重试次数层面决定是不是已经经历了足够长时间的等待或者足够多次的等待,如果等待时间够长,或者失败次数哦够多,就只能尝试进行本地化降级了。

    getAllowedLocalityLevelByTime()是用在连续性调度方式的,是从时间层面确定当前运行的本地化,思想是,如果距离上一次Container的分配时间已经超过了阈值,说明这个NODE_LOCAL的调度已经失败了一段时间,因此需要立刻降级:

      /**
       * 返回允许的进行container调度的本地化水平,参数nodeLocalityDelayMs、rackLocalityDelayMs分别代表对NODE_LOCAL和RACK_LOCAL的
       * 进行降级前需要延迟的时间阈值
       */
      public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
              long nodeLocalityDelayMs, long rackLocalityDelayMs,
              long currentTimeMs) {
    
        //默认NODE_LOCAL和RACK_LOCAL的延迟调度时间是-1,代表我们的FairSchedulerd调度器
        // if not being used, can schedule anywhere
        if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { //如果NODE_LOCAL或者RACK_LOCAL允许off-swtich本地级别的调度,则直接返回OFF_SWITCH的本地级别
          return NodeType.OFF_SWITCH;
        }
    
        //默认的本地级别是NODE_LOCAL
        if (! allowedLocalityLevel.containsKey(priority)) { //
          allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
          return NodeType.NODE_LOCAL;
        }
    
        NodeType allowed = allowedLocalityLevel.get(priority); //获取这个优先级运行的LOCAL级别
    
        // if level is already most liberal, we're done
        if (allowed.equals(NodeType.OFF_SWITCH)) { //如果这个LOCAL级别直接允许OFF_SWITCH,那就直接OFF_SWITCH
          return NodeType.OFF_SWITCH;
        }
    
        //如果这个LOCAL级别不允许OFF_SWITCH调度,就需要根据RACK_LOCAL或者NODE_LOCAL以及超时时间进行判断最终是进行NODE_LOCAL、RACK_LOCAL还是OFF_SWITCH的本地级别
        // check waiting time
        long waitTime = currentTimeMs;
        if (lastScheduledContainer.containsKey(priority)) { //如果上一次调度的container还有这个优先级的 , 则用当前时间减去上一个container的调度时间从而获得等待时间
          waitTime -= lastScheduledContainer.get(priority);
        } else {
          waitTime -= getStartTime();//否则,等待时间就是从这个application启动的时间到现在的时间
        }
    
        long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
                nodeLocalityDelayMs : rackLocalityDelayMs; //RACK_LOCAL或者NODE_LOCAL等待的阈值
    
        //如果目前的等待时间已经超过了thresholdTime
        if (waitTime > thresholdTime) { //如果等待时间超过了阈值
          if (allowed.equals(NodeType.NODE_LOCAL)) { //
            allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);//将NODE_LOCAL降级为RACK_LOCAL
            resetSchedulingOpportunities(priority, currentTimeMs);
          } else if (allowed.equals(NodeType.RACK_LOCAL)) {//将RACK_LOCAL降级为OFF_SWITCH
            allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
            resetSchedulingOpportunities(priority, currentTimeMs);
          }
        }
        //如果等待时间还没有超时,那就不会对locality进行降级,原来是什么,现在还是什么
        return allowedLocalityLevel.get(priority);
      }

    getAllowedLocalityLevel()是从重试次数角度去考虑的,用在心跳调度方式,即,如果发现针对这个请求的重试次数已经超过了我们的FairScheduler规定的次数,则没有必要再等待了,直接降级处理:

    /**
     * 给出当前集群的节点数量以及在进行本地化降级前失败的调度的次数阈值(这个阈值是一个比例值,即失败的次数占集群节点规模的比例),
     * 返回允许调度对当前这个Priority的container请求进行调度的locality,
     */
    public synchronized NodeType getAllowedLocalityLevel(Priority priority,
        int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
      // upper limit on threshold
      if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
      if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
    
      // If delay scheduling is not being used, can schedule anywhere
      if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { //如果我们没有打开延迟调度策略,那么,直接就用OFF_SWITCH
        return NodeType.OFF_SWITCH;
      }
      //如果已经配置了延迟调度,则根据
    
      // 默认的本地化策略是NODE_LOCAL
      if (!allowedLocalityLevel.containsKey(priority)) { //如果这个优先级目前还没有保存在allowedLocalityLevel,则使用默认的NODE_LOCAL的本地化策略,因为我们总是希望进行NodeLocal的调度
        allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
        return NodeType.NODE_LOCAL;
      }
    
      NodeType allowed = allowedLocalityLevel.get(priority); //如果当前优先级已经有了对应的允许的本地策略略,则根据允许的本地策略
    
      // If level is already most liberal, we're done
      if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; //乳沟直接就允许OFF_SWITCH,就OFF_SWITCH
    
      double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
        rackLocalityThreshold; //获取允许的本地级别对应的阈值
    
      // Relax locality constraints once we've surpassed threshold.
      if (getSchedulingOpportunities(priority) > (numNodes * threshold)) { //如果超过了阈值,则需要进行本地化降级
        if (allowed.equals(NodeType.NODE_LOCAL)) {
          allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); //将本地化策略从NODE_LOCAL降级为RACK_LOCAL
          resetSchedulingOpportunities(priority);
        }
        else if (allowed.equals(NodeType.RACK_LOCAL)) {
          allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); //将本地化策略从RACK_LOCAL降级为OFF_SWITCH
          resetSchedulingOpportunities(priority);
        }
      }
      //如果还没有达到阈值,则该是什么本地化就还是什么本地化
      return allowedLocalityLevel.get(priority);
    }

    由此可以看到,连续调度和心跳调度对于本地化降级的处理思想其实是相同的,都是在降级之前进行适当的延迟。不同之处只是降级时机的判断标准不同

    连续调度的降级时机是在时间层面判断距离上一次成功调度的时间是否已经超过了FairScheduler所规定的阈值,如果是,则可以立刻进行降级了,我们通过这两个配置项控制时间阈值:

    而心跳调度的降级时机是在失败调度的次数层面,即,连续失败的次数是否已经超过了指定的阈值(这个阈值其实是一个比例值,乘以集群节点数,就是允许的失败次数),如果超过了,则需要降级,我们通过这两个配置项控制这个比例阈值:

    在完成了本地化判断以后,就可以开始为container分配资源了,即调用private Resource assignContainer( FSSchedulerNode node, ResourceRequest request, NodeType type, boolean reserved),用来在确认了本地化策略后进行container的资源分配。assignContainer(...)的响应代码虽然比较繁杂,但是难度都不大,因此不做列出。

    下图描述了 assignContainer()方法分配这个container的流程:

    assignContainer()的具体流程

    最后一个参数reserved标记当前是否是在给一个处于reserved状态的container分配资源,如果是一个reserved container,则不需要新创建container,而是直接把这个预留态的container从RMContainerState.RESERVED状态变成RMContainerState.ALLOCATED,而如果不是,则需要新创建container,并直接从RMContainerState.NEW状态变为RMContainerState.ALLOCATED状态,这个我们可以从RMContainerImpl中定义的container的状态机看到:

    
      private static final StateMachineFactory<RMContainerImpl, RMContainerState, 
                                               RMContainerEventType, RMContainerEvent> 
       stateMachineFactory = new StateMachineFactory<RMContainerImpl, 
           RMContainerState, RMContainerEventType, RMContainerEvent>(
          RMContainerState.NEW)
    
        // Transitions from NEW state
        .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
            RMContainerEventType.START, new ContainerStartedTransition())
        //略
        .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
            RMContainerEventType.START, new ContainerStartedTransition())
    

    可以看到,如果发生了RMContainerEventType.START事件,处于RMContainerState.NEW或者RMContainerState.RESERVED状态的container都会变成RMContainerState.ALLOCATED状态,同时,这个状态机的hook类是ContainerStartedTransition,会被调用。关于RM端container的状态定义和状态转换关系,大家可以参考这篇博客:《RMContainer状态机分析》

    ContainerStartedTransition判断当前的container是否是AM container,如果是,则需要开始ApplicationMaster的启动过程,并更新整个Appliation Attempt的状态(Appliation Attempt是整个Application的运行时的实例),因为,如果这个container是整个ApplicationMaster的container,那么整个Appliation Attempt的状态就发生变化,比如,如果这个continer是AM的container并且分配成功,那么整个Appliation Attempt的状态将从SCHEDULED变为 ALLOCATED_SAVING状态。关于Application Attempt的状态转换,大家可以参考《RMAppAttempt状态机分析》

    因此,ApplicationMaster Container和普通的执行具体Map/Reduce任务的Container分配以后的处理流程是不同的:

    • 如果是ApplicationMaster的Container被成功分配,这个分配事件最终会注册给ResourceManager的ApplicationMasterLaucher,ApplicationMasterLaucher维护着一个独立的线程,不断检测是否有新的ApplicationMaster的启动事件,如果有,就会通过ContainerManagementProtocol协议与container所在的服务器通信,直接在对应的节点上把这个ApplicationMaster进程运行起来,AppAttempt的状态因此从ALLOCATED变为LAUCHED;ApplicationMaster运行起来以后,开始进行普通的container的请求;

    • 如果是普通的container的请求,是ApplicationMaster向ResourceManager请求资源,ResourceManager会分配对应的Container并将分配结果告知ApplicationMaster,ApplicationMaster会和对应的NodeManager通信,启动这个container,这个通信也是基于ContainerManagementProtocol协议进行的。

    RM分配Container的流程

    如果这个启动的container是普通的container,那么只是会更新RM端每个应用的container信息等等。下一次收到了ApplicationMaster的心跳,就会把这些新分配的container信息返回给ApplicationMaster,ApplicationMaster接着会在对应的NodeManager上启动这些container。

    从上图中还可以看到,如果这个container请求满足队列剩余资源,即队列剩余可用资源大于container需要的资源,但是,这个服务器节点的剩余资源却不足以运行这个container,就需要进行资源预留。关于资源预留,大家可以参考我专门讲解资源预留机制的博客:《Yarn FairScheduler 的资源预留机制导致集群停止服务事故分析》

    总结

    Yarn通过资源树的形式来对整个集群的资源进行横向划分和纵向层级划分,很好地表达了企业应用中需要表达的部门之间的资源隔离以及上下级部门之间的资源分层逻辑。

    Yarn的资源调度不是阻塞式的,即,不是ApplicationMaster发起资源请求、Yarn处理请求然后进行资源分配然后将分配结果通过本次响应返回给ApplicationMaster。每次ApplicationMaster发起请求,Yarn会把这些请求挂载到上文提到的资源队列树中。然后,通过连续调度方式,以一个独立的线程,每隔一段时间,对集群中所有的服务器进行一轮调度,或者,某个NodeManager的心跳信息到来以后触发对这个心跳服务器的资源分配。然后,当收到某个AppliationMaster的资源请求以后,就将当前已经进行了成功分配的分配结果通过这次请求的Response返回给AM,显然,这次返回的分配成功的container完全可能是上一次所请求的资源的container。

    由此可见,AM和RM之前的资源请求通信,是一种心跳式的通信,AM的心跳定时发出,如果有新的请求,心跳就携带这些请求,否则不携带任何请求,如果有新的分配结果,心跳的响应就会携带回这些结果,否则,不返回任何新的container的分配结果。

    Yarn将资源请求的本地化层级分为节点内、机架内和集群内三个本地化级别,满足某些应用希望数据计算和数据本身在同一节点,或者在同一机架的需求。对于节点内或者机架内的请求无法满足,Yarn采用延迟调度的方式,即过一段时间再尝试满足这种本地化需求。如果失败时间或者失败次数超过限制,就进行本地化降级,比如,如果重试N次或者N分钟发现仍然无法将请求分配到应用所指定的node,则尝试分配到这个node相同的机架,或者,如果重试N次或者N分钟发现仍然无法将请求分配到应用所指定的机架,则只好将应用分配到集群内任何可用的节点上。

    展开全文
  • Android应用程序资源的编译和打包过程分析

    万次阅读 多人点赞 2013-04-15 00:57:14
    我们知道,在一个APK文件中,除了代码文件之外,还有很多资源文件。这些资源文件是通过Android资源打包工具aapt(Android Asset Package Tool)打包到APK文件里面的。在打包之前,大部分文本格式的XML资源文件还会...
  • 最近很多朋友问我刷题、面试没有什么好的资源。今天就给大家找了三个棒的开源资源,内容非常硬核,很多人靠着它进了大厂。 不绕弯子,三个分别是谷歌师兄《谷歌大佬的刷题笔记》,东哥《labuladong的算法小抄》...
  • 第1章 基带无线资源概述 1.1 无线资源的作用 所谓无线资源是指能够承载用户二进制数据的无线信号。 有点类似飞机以及飞机上的座位。 不同的频率类似不同航班的飞机。 不同子载波波类似与同一个飞机上的不同...
  • RDF-资源描述框架

    千次阅读 2015-09-26 16:55:19
    资源描述框架(Resource Description Framework) RDF--Web数据集成的元数据解决方案 罗威(中国国防科技信息中心 北京 100036) 2001-6-8 一. 引言  在现今的社会中,信息无处不在,从这些信息中获取对自己有用的...
  • 博客是记录学习历程、分享经验的最佳平台,多年以来,各路技术大牛在ITeye网站上产生了大量优质的技术文章,并将系列文章集结成专栏,以便读者能够更便捷、更系统地浏览学习,这些可称之为“编程精华资源”。...
  • 机器学习资源汇总(持续更新)

    千次阅读 2016-10-23 18:37:25
    因为最近忙着看论文,自己的基础又不是很好,所以需要参考大量资料来让自己理解论文的内容。看了很多博文大多数都讲解的比较粗略,或是直接搬运了其他文章,导致逻辑混乱,并不能很好地让我理解一些概念,不过还是...
  • 上一篇博客中,客户端已连接到ed2k网络及客户端与服务器交互的eMule源码梳理,这里将开始搜索资源并下载及客户端与客户端交互的eMule源码梳理 搜索资源并下载,这是一个即包含和和服务器交互还包含与另一些客户端...
  • yarn资源调度参数配置

    千次阅读 2018-07-12 22:11:27
    在Hadoop2.0中, YARN负责管理MapReduce中的资源(内存, CPU等)并且将其打包成Container. 这样可以精简MapReduce, 使之专注于其擅长的数据处理任务, 将无需考虑资源调度. 如下图所示 YARN会管理集群中所有机器的可用...
  • ArcGIS制图表达Representation-符号制作

    千次阅读 2016-10-24 10:11:53
    标准符号几乎被绝大部分ArcGIS用户所认知并长期使用,但制图表达符号几乎不人为所知,哪怕是那些使用制图表达的人员,几乎只关注制图表达的规则和几何效果,在制图表达符号制作方面面临着无从下手的境地。...
  • 让你的思考和表达逻辑性 金字塔原理 金字塔原理是一种重点突出、逻辑清晰、主次分明的逻辑思路、表达方式和规范动作,该原理可应用于商务写作、商务演示、表达与演说。 金字塔原理的基本结构是:中心思想...
  • 资源下载 下面是《Android Studio开发实战 从零基础到App上线》(第一版)一书用到的工具和代码资源: 1、本书使用的Android Studio版本为2.2.3,因为Android官网现在不提供该版本的下载,所以博主把该版本的64位...
  • RBAC新解 - 基于资源的权限管理

    千次阅读 2017-01-25 23:07:20
    如果你好奇现实世界没有被多个系统使用的基于资源的权限控制框架,你可以了解一下Apache Shiro。它是一个java平台的现代权限管理框架。通过它的权限(Permission)概念,Shiro很好地支持基于资源的权限访问控制。 ...
  • 企业资源规划 ERP

    万次阅读 2007-09-29 16:30:00
    换言之,ERP将企业内部所有资源整合在一起,对采购、生产、成本、库存、分销、运输、财务、人力资源进行规划,从而达到最佳资源组合,取得最佳效益。 企业资源规划 ERP, Enterprise Resource Planning)的合理运用...
  • 程序员与HR博弈之:城府的表达你的兴趣爱好

    万次阅读 多人点赞 2014-05-06 15:57:24
     从广义上来讲,很多公司尤其是大中型公司的招聘,并不是因为急缺某岗位而招人,更多的是人才储备或人力资源建设方面的因素。这时“面试“这个过程就充满了一些技巧性、博弈性。这里我们要讲的是博弈性。说到底,...
  • ArcGIS10.4 全部资源以及安装指南

    万次阅读 2018-01-25 00:00:30
    ArcGIS10.4 全部资源以及安装指南 ...整理一些我遇到的问题,分享这些资源,希望对学习GIS的同学帮助。 主要的步骤分为两步。 资源包:链接:https://pan.baidu.com/s/1dGh3bYL 密码:aoax Ste
  • 基于深度学习和稀疏表达的人脸识别算法 1 利用VGGFace提取人脸特征 2 PCA对人脸特征进行降维 3 稀疏表达的人脸匹配 Code1 介绍本文将介绍一种基于深度学习和稀疏表达的人脸识别算法。首先,利用深度学习框架(VGGFace...
  • 生物信息学数据库资源 {#database}

    万次阅读 2018-11-06 19:54:20
    做数据分析常常会需要用到参考基因组和注释文件,还会需要分析公共数据,了解常见的生物信息学数据库资源也是非常必要的!故本章首先介绍常用的参考基因组和注释文件,然后介绍生物信息常用的数据库资源如:...
  • RESTful API设计系列三:资源

    千次阅读 2017-03-08 22:04:24
    本文在HTTP协议的背景下,介绍了RESTful中的资源包含那些类型的数据;资源与JSON、XML、YAML等格式间的映射规则。作者支持将资源映射称JSON格式。 阅读本文还需要了解HTTP协议,否则很多属于很难理解。
  • 全球蛋白资源数据库UniProt

    万次阅读 2010-03-15 13:21:00
    UniProt 是一个集中收录蛋白质资源并能与其它资源相 互联系的数据库 , 也是目前为止收录蛋白质序列目录最广泛 、 功能注释最全面的一个数据库 。 UniProt 是由欧洲生物信息学研究所(European Bioinformatics ...
  • github上总结的python资源列表

    万次阅读 2016-07-05 13:42:12
    Python 资源大全中文版我想很多程序员应该记得 GitHub 上一个 Awesome - XXX 系列的资源整理。awesome-python 是 vinta 发起维护的 Python 资源列表,内容包括:Web框架、网络爬虫、网络内容提取、模板引擎、...
  • HR必读的五大畅销人力资源管理书籍

    千次阅读 2015-05-10 16:12:42
    想要成为一名专业的HR,除了在靠前辈的指引外,还得多读一些人力资源管理书籍,提升自己的各方面专业技能。下面,小编收集了HR必读的人力资源五大畅销书籍。 书名:《人力资源管理概论》 作者:彭剑锋 推荐...
  • 人力资源主管的素质要求

    千次阅读 2006-10-23 23:38:00
    人力资源主管是战斗在第一线的基层管理人员,是人力资源决策信息的提供者,人力资源管理的这种变化与角色的扮演对人力资源主管的基本素质提出了很高的要求,它要求人力资源主管必须过硬的人格品质、合理的知识结构...
  • Hadoop YARN同时支持内存和CPU两种资源的调度(默认只支持内存,如果想进一步调度CPU,需要自己进行一些配置),本文将介绍YARN是如何对这些资源进行调度和隔离的。 在YARN中,资源管理由ResourceManager和...
  • STM32F7 架构和资源分析

    千次阅读 2019-05-23 10:38:16
    一、STM32F7 资源 当第一眼看到STM32F7的数据手册Datasheet和参考手册Reference manual时!可以说是抑制不住的兴奋!尽管ST所研发的这款基于Cortex-M7的芯片与当初ARM公司发表声明Cortex-M7内核时所声明的无论是在...
  • 这一点非常重要,在经典的两军问题中,消息是由信使传递的,而信使是人,人是军队作战的最重要资源也是最不可靠的资源,比如会叛变…因此每条消息或者确认相互只能派遣一个信使去递送消息,在通信上讲,就是消息不能...
  • 资源下载 下面是《Android Studio开发实战 从零基础到App上线(第2版)》一书用到的工具和代码资源: 1、本书使用的Android Studio版本为3.2,最新的安装包可前往Android官网页面下载。 2、本书使用的Android NDK版本...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 152,739
精华内容 61,095
关键字:

怎么表达自己有资源