精华内容
下载资源
问答
  • yarn作业提交过程

    2018-01-02 13:09:00
    模拟word count yarn提交过程 转载于:https://www.cnblogs.com/flyingcr/p/10326988.html

    模拟word count  yarn提交过程




    转载于:https://www.cnblogs.com/flyingcr/p/10326988.html

    展开全文
  • 根据上一篇学习的MapReduce整体工作机制,然后练习一下写个WorldCount, 惯例写代码之前还是先梳理一下逻辑: 1、先写一个提供给map task用的类, 2、reduce task需要一个类来处理key相同的数据用, public ...

    根据上一篇学习的MapReduce整体工作机制,然后练习一下写个WorldCount,

    惯例写代码之前还是先梳理一下逻辑:

    1、先写一个提供给map task用的类,

    2、reduce task需要一个类来处理key相同的数据用,

    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    	
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    
    		// 切单词
    		String line = value.toString();
    		String[] words = line.split(" ");
    		for(String word:words){
    			context.write(new Text(word), new IntWritable(1));
    		}
    	}
    }

    然后解析一下这个父类Mapper的参数:

    * KEYIN :是map task读取到的数据的key的类型,是一行的起始偏移量Long
    * VALUEIN:是map task读取到的数据的value的类型,是一行的内容String
    * 
    * KEYOUT:是用户的自定义map方法要返回的结果kv数据的key的类型,在wordcount逻辑中,我们需要返回的是单词String
    * VALUEOUT:是用户的自定义map方法要返回的结果kv数据的value的类型,在wordcount逻辑中,我们需要返回的是整数Integer
    * 
    * 但是,在mapreduce中,map产生的数据需要传输给reduce,需要进行序列化和反序列化,而jdk中的原生序列化机制产生的数据量比较冗余,就会导致数据在mapreduce运行过程中传输效率低下
    * 所以,hadoop专门设计了自己的序列化机制,那么,mapreduce中传输的数据类型就必须实现hadoop自己的序列化接口
    * 
    * hadoop为jdk中的常用基本类型Long String Integer Float等数据类型封住了自己的实现了hadoop序列化接口的类型:LongWritable,Text,IntWritable,FloatWritable(自己对应吧)
    
    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    	
    	
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
    		int count = 0;
    		
    		Iterator<IntWritable> iterator = values.iterator();
    		while(iterator.hasNext()){
    			
    			IntWritable value = iterator.next();
    			count += value.get();
    		}
    		context.write(key, new IntWritable(count));
    	}
    }

    在运行这些程序之前先搭一个yarn集群,之所以要搭这个集群我这里解释一下:

    mapreduce程序应该是在很多机器上并行启动,而且先执行map task,当众多的maptask都处理完自己的数据后,还需要启动众多的reduce task,这个过程如果用用户自己手动调度不太现实,需要一个自动化的调度平台——hadoop中就为运行mapreduce之类的分布式运算程序开发了一个自动化调度平台——YARN

    首先,为你的mapreduce程序开发一个提交job到yarn的客户端类(模板代码):

    1. 描述你的mapreduce程序运行时所需要的一些信息(比如用哪个mapper、reducer、map和reduce输出的kv类型、jar包所在路径、reduce task的数量、输入输出数据的路径)
    2. 将信息和整个工程的jar包一起交给yarn

    然后,将整个工程(yarn客户端类+ mapreduce所有jar和自定义类)打成jar包

    然后,将jar包上传到hadoop集群中的任意一台机器上

    最后,运行jar包中的(YARN客户端类)

    安装yarn集群

    yarn集群中有两个角色:

    主节点:Resource Manager  1台

    从节点:Node Manager   N台

    Resource Manager一般安装在一台专门的机器上

    Node Manager应该与HDFS中的data node重叠在一起

    yarn的安装包已经有了,在hadoop包里就有,现在只需要配置:

    yarn-site.xml

    <property>

    <name>yarn.resourcemanager.hostname</name>

    <value>hdp-04</value>

    </property>

    <property>

    <name>yarn.nodemanager.aux-services</name>

    <value>mapreduce_shuffle</value>

    </property>

    然后复制到每一台机器上

    然后在hdp-04上,修改hadoop的slaves文件,列入要启动nodemanager的机器

    然后将hdp-04到所有机器的免密登陆配置好

    然后,就可以用脚本启动yarn集群:

    不用脚本每起一台:{yarn-daemon.sh start resourcemanager}

    查看剩余内存:free -m

    sbin/start-yarn.sh

    停止:

    sbin/stop-yarn.sh

    启动完成后,可以在windows上用浏览器访问resourcemanager的web端口:

    http://hdp-04:8088

    看resource mananger是否认出了所有的node manager节点

    配置好yarn-site.xml

    <property>

    <name>yarn.nodemanager.resource.memory-mb</name>

    <value>2048</value>               --------->最好在2G以上,因为在maptask和reducetask程序启动之前会启动一个主管程序(yarn app mapreduce am resource mb)

    </property>

    <property>

    <name>yarn.nodemanager.resource.cpu-vcores</name>

    <value>2</value>

    </property>

    以后再看:

    yarn集群部署完就可以把MarReduce程序拿来运行了,先启动job客户端,job客户端会把MapReduce程序jar包发给yarn,job客户端会和yarn交互,写代码:

    * 用于提交mapreduce job的客户端程序
    * 功能:
    *   1、封装本次job运行时所需要的必要参数
    *   2、跟yarn进行交互,将mapreduce程序成功的启动、运行
    public static void main(String[] args) throws Exception {
    		
    		// 在代码中设置JVM系统参数,用于给job对象来获取访问HDFS的用户身份
    		System.setProperty("HADOOP_USER_NAME", "root");
    		
    		
    		Configuration conf = new Configuration();
    		// 1、设置job运行时要访问的默认文件系统
    		conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
    		// 2、设置job提交到哪去运行
    		conf.set("mapreduce.framework.name", "yarn");
    		conf.set("yarn.resourcemanager.hostname", "hdp-01");
    		// 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数
    		conf.set("mapreduce.app-submission.cross-platform","true");
    		
    		Job job = Job.getInstance(conf);
    		
    		// 1、封装参数:jar包所在的位置
    		job.setJar("d:/wc.jar");
    		//job.setJarByClass(JobSubmitter.class);
    		
    		// 2、封装参数: 本次job所要调用的Mapper实现类、Reducer实现类
    		job.setMapperClass(WordcountMapper.class);
    		job.setReducerClass(WordcountReducer.class);
    		
    		// 3、封装参数:本次job的Mapper实现类、Reducer实现类产生的结果数据的key、value类型
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		
    		
    		Path output = new Path("/wordcount/output");
    		FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
    		if(fs.exists(output)){
    			fs.delete(output, true);
    		}
    		
    		// 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径
    		FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
    		FileOutputFormat.setOutputPath(job, output);  // 注意:输出路径必须不存在
    		
    		
    		// 5、封装参数:想要启动的reduce task的数量
    		job.setNumReduceTasks(2);
    		
    		// 6、提交job给yarn
    		boolean res = job.waitForCompletion(true);
    		
    		System.exit(res?0:-1);
    		
    	}

     

    展开全文
  • worldcount yarn-cluster集群作业运行 上面写的是一个windows本地的worldcount的代码,当然这种功能简单 代码量少的 也可以直接在spark-shell中直接输scala指令。 但是在项目开发 企业运用中,因为本地的资源有限 ...

    worldcount yarn-cluster集群作业运行

    之前写的是一个windows本地的worldcount的代码,当然这种功能简单 代码量少的 也可以直接在spark-shell中直接输scala指令。

    但是在项目开发 企业运用中,因为本地的资源有限 使得无法发挥出spark的真正优势。因此 在这里 我就spark代码在集群中运行 做一些补充讲述。

    我使用的环境是: idea编译器 jdk1.7 scala 2.10 spark 1.6.0(因为公司服务器普遍搭建的还是cdh5.15集群,上面的spark版本是旧时的1.6.0版本 2.x上面的一些功能不能使用 例如SparkSession Spark.ml包 这里还需要注意一点的是在maven打包时 如果编译打包的环境是jdk1.8有可能会出现打包不成功 这是因为jdk与scala二者版本不兼容导致 建议读者将jdk换成1.7 或者提高scala版本)

    1. 首先搭建idea maven环境 添加相应依赖

    关于idea中配置maven环境之类的,还有什么jdk之类的搭建,南国在这里就不做篇幅说明了。 这些属于基本操作,不熟悉操作的网上有许多资料,比较简单。

    这里我主要给出项目所需要的pom.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>Huawei</groupId>
        <artifactId>Spark</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <spark.version>1.6.0-cdh5.15.0</spark.version>
            <scala.version>2.10</scala.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <pluginManagement>
                <plugins>
                    <!--  scala-maven-plugin:编译scala程序的Maven插件 -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                    </plugin>
                    <!--  maven-compiler-plugin:编译java程序的Maven插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.5.1</version>
                    </plugin>
                </plugins>
            </pluginManagement>
            <plugins>
                <!--  编译scala程序的Maven插件的一些配置参数 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>scala-test-compile</id>
                            <phase>process-test-resources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!--  编译java程序的Maven插件的一些配置参数 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!--  maven-shade-plugin:打jar包用的Mavne插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

    这里 我强调在pom.xml中比较重要的地方:
    properities:重要的属性 我们可以将所添加依赖所属的版本添加进去
    dependency: 依赖 也就是项目具体所要调用的库文件 包等
    plugins: 插件 这里我主要配置的是打包scala java程序所需要的Maven插件

    2. scala程序编写并打包到集群中运行

    源代码:

    package com.xjh
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by xjh on 2019/7/15.
      */
    object wc {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("wc")
        val sc=new SparkContext(conf)
        val wc=sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    //    wc.foreach(print)
        wc.saveAsTextFile(args(1))
        sc.stop() //释放资源
      }
    }
    

    注意:写提交到集群中的程序代码最好在放在指定的包中,idea和eclipse不同 程序默认编写在Main下面,程序代码前是没有包名的,这种情况下 当你打包提交到spark-submit中运行时,程序难以正确都找到你想要运行代码的类名,而且 当你项目中的代码越写越多时,就越难以分清。 所以,何不将你的代码放在某个特定包名下了。

    Maven打包:
    如果你代码所在的项目工程之前 已经打包过,首先进行clean
    在这里插入图片描述
    然后点击package进行打包,网上对于在idea中打包的方法有几种,但其他方法过程太过繁琐 啰嗦,直接双击package进行打包
    在这里插入图片描述
    在console中显示BUILD SUCCESS表示打包成功,我们在项目工程的路径中target即可找到你刚才打包成功的jar。 如果显示的BUILDFAILURE则说明打包中出现了报错,努力找原因去把他解决,你一定没问题的!!
    在这里插入图片描述
    在这里插入图片描述
    上传到yarn集群运行
    首先我们查看一下集=集群上的Spark版本及其配置的环境,如下图所示:
    在这里插入图片描述
    我在上图标红的几处 读者细看,关于集群中的配置 可借鉴。

    yarn-client模式主要用于本地测试,client在本地 运行日志可直接在本地看到;
    yarn-cluster真正的集群模式,这也是我们在日常应用中最常用的集群模式。聪明的你可能说 spark不是还有standalone集群模式,是的,但是standalone是spark自身的集群模式,他和其他mapreduce hive zookeeper等常用的大数据服务是分开的。在应用中,我们一般使用yarn作为大数据集群的资源管理,毕竟yarn模式是现在最常用的。

    具体的操作:

    1. 首先将jar传输到linux系统上,这个根据你自己所用的shell远程登录的工具相关搜索一下 就行。传输的原理 就是ftp协议。这里我使用的MobaXterm, 他直接将所传输的文件 直接拖到linux中
      在这里插入图片描述
    2. 确保你的集群正常启动,提前设置好输入 输出路径 在spark-submit中提交作业
      这里 确保你的hadoop saprk服务正常启动
      然后在spark-submit中提交作业
    spark-submit --master yarn --deploy-mode cluster --driver-memory 500m --executor-memory 500m --executor-cores 1 --class com.huawei.xjh.wc /home/x50005784/Spark-1.0-SNAPSHOT.jar hdfs://cscloud-rs-hadoop293.com:8020/user/x50005784/test.txt hdfs://cscloud-rs-hadoop293.com:8020/user/x50005784/output
    

    这里简要说明指令的含义:
    –master 运行模式
    –driver-memory 内存大小
    –executor-memory –executor-cores 执行器设置
    –class 代码所在的类
    后面两个为代码中的两个输入路径
    在这里插入图片描述
    系统运行直到提示SUCCEEDED
    我们还可以在yarn管理资源的8088看到我们在运行的spark作业
    在这里插入图片描述
    3. 登录HDFS查看结果文件
    在这里插入图片描述

    Java程序编写并打包到集群中运行

    scala代码在集群中成功运行之后,实现Java代码的集群运行运行 就变得简单多了。相比较于scala代码而言,Java代码看上去就显得有些累赘 但是现在的企业运用中 前端 后台使用Java的居多,在日常应用中 Java也是必须要掌握的。

    我们在同一项目中新建Java类
    在这里插入图片描述
    编写Java代码:

    package com.huawei.xjh;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    
    
    /**
     * Created by xjh on 2019/7/15.
     */
    public class JavaWC {
        public static void main(String[] args) {
            SparkConf conf=new SparkConf().setAppName("JavaWC");
            JavaSparkContext jsc=new JavaSparkContext(conf);
            final JavaRDD<String> lines=jsc.textFile(args[0]);
    
            JavaRDD<String> wc=lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });
            JavaPairRDD<String,Integer> ones=wc.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s,1);
                }
            });
            JavaPairRDD<String,Integer> counts=ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
            });
            counts.saveAsTextFile(args[1]);
            jsc.stop(); //释放资源
        }
    }
    

    Java代码的逻辑 和上述的scala意义一样,只是代码实现有些不同。所以 我在这里没有写什么注释
    后续的打包过程和上述scala一样,只不过 在spark-submit中记得修改成为Java所在的类和包名。

    展开全文
  • YARN详解

    2018-01-16 17:48:19
    1. YARN架构 1.1 简介 1.1.1 架构 YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。 YARN总体上仍然是Master/Slave结构,在整个资源管理框架中,...

    1. YARN架构

    1.1 简介

    1.1.1 架构

    yarn

    YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。

    YARN总体上仍然是Master/Slave结构,在整个资源管理框架中,ResourceManager为Master,NodeManager为Slave,ResourceManager负责对各个NodeManager上的资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。

    1.1.2 Job提交流程

    流程

    1. 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
    2. ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
    3. ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
    4. ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
    5. 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
    6. NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
    7. 各个任务通过RPC协议向ApplicationMaster汇报自己的状态和进度,以便让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
    8. 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。

    1.2 组件介绍

    1.2.1 ResourceManager(RM)

    RM 是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager)。

    调度器(Scheduler)

    调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。 
    需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器。

    在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler。

    1. FIFO Scheduler

      FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。

    2. Capacity Scheduler

      Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略。

      在正常的操作中,Capacity调度器不会强制释放Container,当一个队列资源不够用时,这个队列只能获得其它队列释放后的Container资源。当然,我们可以为队列设置一个最大资源使用量,以免这个队列过多的占用空闲资源,导致其它队列无法使用这些空闲资源,这就是”弹性队列”需要权衡的地方。

      配置方法:

      Capacity调度器的配置文件,文件名为capacity-scheduler.xml

      
      # 例如以下队列
      
      root
      ├── prod 40%
      └── dev 60% ~ 75%
        ├── eng 50%
        └── science 50%
         
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

      上面队列配置如下:

      <?xml version="1.0"?>
      
      <configuration>
          <!-- 定义了两个子队列prod和dev -->
          <property>
              <name>yarn.scheduler.capacity.root.queues</name>
              <value>prod, dev</value>
          </property>
      
          <!-- dev队列又被分成了eng和science -->
          <property>
              <name>yarn.scheduler.capacity.root.dev.queues</name>
              <value>eng, science</value>
          </property>
      
          <!-- 队列prod占40%的容量 -->
          <property>
              <name>yarn.scheduler.capacity.root.prod.capacity</name>
              <value>40</value>
          </property>
      
          <!-- 队列dev占60%的容量 -->
          <property>
              <name>yarn.scheduler.capacity.root.dev.capacity</name>
              <value>60</value>
          </property>
      
          <!-- 限制dev的最大资源伸缩比重为75%,所以即使prod队列完全空闲dev也不会占用全部集群资源 -->
          <property>
              <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
              <value>75</value>
          </property>
      
          <!-- 队列eng占50%的容量,由于没有设置最大值,所以可能占用整个父队列的资源 -->
          <property>
              <name>yarn.scheduler.capacity.root.dev.eng.capacity</name>
              <value>50</value>
          </property>
      
          <!-- 队列science占50%的容量,由于没有设置最大值,所以可能占用整个父队列的资源 -->
          <property>
              <name>yarn.scheduler.capacity.root.dev.science.capacity</name>
              <value>50</value>
          </property>
      </configuration>
         
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45

      Capacity容器除了可以配置队列及其容量外,我们还可以配置一个用户或应用可以分配的最大资源数量、可以同时运行多少应用、队列的ACL认证等。

      在MapReduce中,我们可以通过mapreduce.job.queuename属性指定要用的队列。如果队列不存在,我们在提交任务时就会收到错误。如果我们没有定义任何队列,所有的应用将会放在一个default队列中。

      注意:对于Capacity调度器,我们的队列名必须是队列树中的最后一部分,如果我们使用队列树则不会被识别。即不能写成dev.eng,应该写为eng。

    3. Fair Scheduler

      Fair调度器的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享。

      
      # 启用Fair调度器
      
      
      # yarn-site.xml中配置
      
      
      <property>
          <name>yarn.resourcemanager.scheduler.class</name>
          <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
      </property>
      
      
      # 队列的配置
      
      
      # 配置文件为fair-scheduler.xml
      
      
      # 可以通过下面配置修改配置文件路径(yarn-site.xml中)
      
      <property>
          <name>yarn.scheduler.fair.allocation.file</name>
          <value>xxxxx</value>
      </property>
      
      
      # fair-scheduler.xml配置例
      
      <?xml version="1.0"?>
      
      <allocations>
          <!-- 默认调度策略,如果没有配置这项,默认fair -->
          <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
      
          <queue name="prod">
              <!-- 权重,如果没有配置默认为1 -->
              <weight>40</weight>
              <schedulingPolicy>fifo</schedulingPolicy>
          </queue>
      
          <queue name="dev">
              <weight>60</weight>
              <queue name="eng"/>
              <queue name="science"/>
          </queue>
      
          <!--
          queuePlacementPolicy元素定义规则列表,会逐个尝试直到匹配成功。
          第一个规则specified,则会把应用放到它指定的队列中,若这个应用没有指定队列或队列名不存在,则不匹配这个规则;
          primaryGroup规则会尝试把应用以用户所在的Unix组名命名的队列中,如果没有这个队列,不创建队列转而尝试下一个;
          当前面所有规则不满足时,则触发default规则,把应用放在dev.eng队列中
          -->
          <queuePlacementPolicy>
              <rule name="specified" create="false"/>
              <rule name="primaryGroup" create="false"/>
              <rule name="default" create="dev.eng"/>
          </queuePlacementPolicy>
      </allocations>
         
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59

      抢占(Preemption)

      当一个job提交到一个繁忙集群中的空队列时,job并不会马上执行,而是阻塞直到正在运行的job释放系统资源。为了使提交job的执行时间更具预测性(可以设置等待的超时时间),Fair调度器支持抢占。

      yarn.scheduler.fair.preemption=true启动抢占,如果队列在minimum share preemption timeout指定的时间内未获得最小的资源保障,调度器就会抢占containers。顶级元素为所有队列配置这个超时时间;还可以在元素内配置元素来为某个队列指定超时时间。

      与之类似,如果队列在fair share preemption timeout指定时间内未获得平等的资源的一半(这个比例可以配置),调度器则会进行抢占containers。这个超时时间可以通过顶级元素和元素级元素分别配置所有队列和某个队列的超时时间。上面提到的比例可以通过(配置所有队列)和(配置某个队列)进行配置,默认是0.5。

    应用程序管理器(Applications Manager)

    应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

    1.2.2 NodeManager(NM)

    NodeManager替代了Hadoop v1版本中的TaskTracker,每个节点都会有一个NM,主要功能有:

    1. 为应用程序启动容器,同时确保申请的容器使用的资源不会超过节点上的总资源。
    2. 为task构建容器环境,包括二进制可执行文件,jars等。
    3. 为所在的节点提供了一个管理本地存储资源的简单服务,应用程序可以继续使用本地存储资源即使他没有从RM那申请。比如:MapReduce可以使用该服务程序存储map task的中间输出结果。

    一个NodeManager上面可以运行多个Container,Container之间的资源互相隔离,类似于虚拟机的多个系统一样,各自使用自己分配的资源。NodeManager会启动一个监控进行用来对运行在它上面的Container进行监控,当某个Container占用的资源超过约定的阈值后,NodeManager就会将其杀死。

    1.2.3 ApplicationMaster(AM)

    ApplicationMaster 负责管理应用程序的整个生命周期,每个应用程序都对应一个AM,主要功能有:

    1. 与RM的调度器通讯,协商管理资源分配(用Container表示)。
    2. 与NM合作,在合适的容器中运行对应的task,并监控这些task执行。
    3. 如果container出现故障,AM会重新向调度器申请资源。
    4. 计算应用程序所需的资源量,并转化成调度器可识别的协议。
    5. AM出现故障后,ASM会重启它,而由AM自己从之前保存的应用程序执行状态中恢复应用程序。

    1.2.4 Container

    Container可以说是一个对Application使用资源描述的集合(或容器),可以看做一个可序列化的java对象,封装了一些描述信息。

    Container的一些基本概念和工作流程如下:

    1. Container是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源)。它跟Linux Container没有任何关系,仅仅是YARN提出的一个概念(从实现上看,可看做一个可序列化/反序列化的Java类)。
    2. Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster;
    3. Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令(可以使用任何命令,比如java、Python、C++进程启动命令均可)以及该命令执行所需的环境变量和外部资源(比如词典文件、可执行文件、 jar包等)。

    另外,一个应用程序所需的Container分为两大类,如下:

    1. 运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源;
    2. 运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。

    以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。

    1.3 其他服务介绍

    1.3.1 YARN Timeline Server

    以一种通用的方式向YARN Timeline Server上注册应用程序的当前和历史状态,便于存储和检索。它主要有两大职责:

    1. 持久化应用程序的具体信息

      收集并检索应用程序或者框架的具体信息。例如,hadoop MR框架里面的与分片线关系的信息,诸如map tasks、reduce tasks、counters等。应用程序开发者可以在App Master端或者应用程序需的containers中通过TimelineClient将这些信息发送给Timeline Server。

      这些信息都可以通过REST APIs在具体App或者执行框架的UI界面查询到。

    2. 持久化已完成的应用程序的Generic information

      在Application history server中,显然只支持MR框架的job。而在Timeline Server中,Application History只是其中的一个功能。

      Generic information包括:

      • queue-name
      • 用户信息,还有设置在ApplicationSubmissionContext中的信息
      • 运行应用程序的application-attempts列表
      • 关于每个application-attempt的信息
      • 运行在每个application-attempt下的container列表
      • 每个container的信息

    1.3.2 Shared Cache

    1.3.2.1 介绍

    Shared Cache机制的作用在于为Yarn上的应用(application)提供了一种安全可扩展的上传和管理的资源的方式。不同应用之间允许复用上传的资源文件,从而减少集群网络开销以及应用启动开销。

    在SharedCache机制之前,hadoop在Yarn和Mapreduce的层面已有cache资源的机制:

    1. Yarn层面:NodeManager上有localizationService维护一系列cache资源,在该NodeManager上启动的Container允许从cache中读取相关文件。对于cache资源的可见性包括,public(允许被所有用户的应用访问)、private(只允许被文件所有者的应用访问)、application specific(只允许被指定应用访问)。

    2. Mapreduce层面:MR作业通过DistributedCache接口,上传cache资源到HDFS上,mr任务启动时会将cache文件分发到每个任务节点,在mr任务中可以通过DistributedCache接口从节点本地读取到cache文件。

    SharedCache机制

    是在现有机制的基础上实现的更通用的cache资源机制,主要特性有:

    1. 可扩展性: cache资源需要能扩展到所有节点上而不是集中到master节点
    2. 安全性: 需要对cache资源的访问权限进行控制
    3. 容错性: SharedCache服务可能会失败或者起停,但并不能影响服务正常运行
    4. 透明性: SharedCache服务只是服务端的优化,不应该影响现有MR作业或者其他Yarn的应用的代码。

    1.3.2.2 设计原理

    主要有两部分组成:SharedCacheManager(SCM),本地化服务

    1. Shared Cache Manager

      SCM是ShareCache服务的核心节点。他负责维护所有cache资源信息以及与客户端之间的通信。SCM本身是一个独立的进程,可以运行在任意节点上。管理员可以起停SCM服务,并且不会对现有服务产生影响。客户端与SCM的通信只需要通过两个接口

      Path use(String checksum, String appId):客户端向SCM注册应用对某个cache资源的使用。其中checksum是cache资源生成的唯一标识。如果该cache资源已经存在则返回其路径,否则返回null。

      boolean release(String checksum, String appId):客户端向SCM释放应用对某个cache资源的使用。成功则返回true,否则返回false。

      客户端与SCM交互流程如下:

      1. 客户端计算cache资源的checksum
      2. 客户端向SCM调用use(checksum, appId)接口

        • 如果资源已经存在,SCM返回其路径。客户端使用cache资源的路径,并做任务提交。随后进入步骤5
        • 如果资源尚未存在,SCM返回null。继续步骤3。
      3. 上传cache文件到HDFS上的某个路径(路径由应用自身决定)。

      4. Cache资源本地化到各个节点。
      5. 提交应用并使用之前获取到的cache资源路径。
      6. 应用结束后,调用release(checksum, appId)接口注销应用对cache资源的使用。

      SCM Store

      SCM Store在SCM中负责维护cache资源的元数据信息。对于每个cache资源,元数据包括:HDFS文件路径、Checksum值、对该cache资源的引用列表(application列表,每个app包括其id和user名)、该cache资源最近访问时间。

      目前情况下,SCM store中的信息只维护在内存中,后续会持久化到本地磁盘以及zookeeper上。

      SCM Cleaner service

      这个服务负责扫描并清理cache资源数据。这是个后台线程定期扫描,一段时间未被使用的cache资源,会被这个线程清理掉。

    2. 本地化服务

      这里SharedCache本地化服务基于先前的NodeManager的本地化服务。一旦cache资源被本地化,NodeManager会将其添加到shared cache中。默认情况下,appMaster会提交这个资源。

      本地化服务的流程

      1. 资源在NodeManager上进行本地化
      2. 如果该资源需要加入到sharedCache,执行3,否则退出
      3. 计算资源文件的checksum
      4. 上传资源文件到HDFS的某个目录,文件作为一个临时文件
      5. 临时文件重命名为一个特定文件。
      6. 通知SCM该资源文件已经加入到了SharedCache,如果SCM该文件先前已经被上传过(通过checksum),则删除该文件。

    2. 配置详解

    默认配置文件是yarn-default.xml,需要修改的话,在yarn-site.xml添加对应的属性。


    本节的缩写:

    RM: ResourceManager
    AM: ApplicationMaster
    NM: NodeManager 

    2.1 ResourceManager

    选项 默认值 说明
    yarn.resourcemanager.address ${yarn.resourcemanager.hostname}:8032 RM 对客户端暴露的地址。客户端通过该地址向 RM 提交应用程序,杀死应用程序等。
    yarn.resourcemanager.scheduler.address ${yarn.resourcemanager.hostname}:8030 RM 对 AM 暴露的访问地址。AM 通过该地址向RM申请资源、释放资源等。
    yarn.resourcemanager.resource-tracker.address ${yarn.resourcemanager.hostname}:8031 RM 对 NM 暴露的地址。NM 通过该地址向 RM 汇报心跳,领取任务等
    yarn.resourcemanager.admin.address ${yarn.resourcemanager.hostname}:8033 RM 对管理员暴露的访问地址。管理员通过该地址向 RM 发送管理命令等。
    yarn.resourcemanager.webapp.address 默认值:${yarn.resourcemanager.hostname}:8088 RM 对外web ui地址。用户可通过该地址在浏览器中查看集群各类信息。
    yarn.resourcemanager.webapp.https.address ${yarn.resourcemanager.hostname}:8090 同上的https地址
    yarn.resourcemanager.scheduler.class org.apache.hadoop.yarn.server.resourcemanager
    .scheduler.capacity.CapacityScheduler
    启用的资源调度器主类。目前可用的有FIFO、Capacity Scheduler和Fair Scheduler。
    yarn.resourcemanager.resource-tracker.client.thread-count 50 处理来自NodeManager的RPC请求的Handler数目。
    yarn.resourcemanager.scheduler.client.thread-count 50 处理来自ApplicationMaster的RPC请求的Handler数目。
    yarn.scheduler.minimum-allocation-mb 1024 单个容器可申请的最小内存资源量(MB)
    yarn.scheduler.maximum-allocation-mb 8192 单个容器可申请的最大内存资源量(MB)
    yarn.scheduler.minimum-allocation-vcores 1 单个容器可申请的最小虚拟CPU个数
    yarn.scheduler.maximum-allocation-vcores 32 单个容器可申请的最大虚拟CPU个数
    yarn.resourcemanager.nodes.include-path (空) NodeManager黑白名单。如果发现若干个NodeManager存在问题,比如故障率很高,任务运行失败率高,则可以将之加入黑名单中。注意,这两个配置参数可以动态生效。(调用一个refresh命令即可)
    yarn.resourcemanager.nodes.exclude-path (空) 见上
    yarn.resourcemanager.nodemanagers.heartbeat-interval-ms 1000(毫秒) NodeManager心跳间隔

    2.2 NodeManager

    选项 默认值 说明
    yarn.nodemanager.resource.memory-mb 8192 NodeManager总的可用物理内存
    yarn.nodemanager.vmem-pmem-ratio 2.1 每使用1MB物理内存,最多可用的虚拟内存数
    yarn.nodemanager.resource.cpu-vcores 8 NodeManager总的可用虚拟CPU个数
    yarn.nodemanager.local-dirs ${hadoop.tmp.dir}/nm-local-dir 中间结果存放位置,注意,这个参数通常会配置多个目录,已分摊磁盘IO负载
    yarn.nodemanager.log-dirs ${yarn.log.dir}/userlogs 日志存放地址(可配置多个目录)
    yarn.nodemanager.log.retain-seconds 10800(3小时) NodeManager上日志最多存放时间(不启用日志聚集功能时有效)
    yarn.nodemanager.aux-services (空) NodeManager上运行的附属服务。需配置成mapreduce_shuffle,才可运行MapReduce程序
    yarn.nodemanager.webapp.address ${yarn.nodemanager.hostname}:8042  
    yarn.nodemanager.localizer.address ${yarn.nodemanager.hostname}:8040  

    2.3 ResourceManager HA

    选项 默认值 说明
    yarn.resourcemanager.ha.enabled false 是否开启HA模式
    yarn.resourcemanager.cluster-id (空) 集群的Id,elector使用该值确保RM不会作为其它集群的active。
    yarn.resourcemanager.ha.id (空) 节点逻辑id
    yarn.resourcemanager.ha.rm-ids (空) RM逻辑id列表,用逗号分隔
    yarn.resourcemanager.ha.automatic-failover.enabled true 是否启用自动故障转移。默认情况下,在启用HA时,启用自动故障转移。
    yarn.resourcemanager.ha.automatic-failover.embedded true 启用内置的自动故障转移。默认情况下,在启用HA时,启用内置的自动故障转移
    yarn.resourcemanager.hostname.xxx (空) 各节点hostname(xxx为逻辑id,需分别配置,见rm-ids)
    yarn.resourcemanager.recovery.enabled false 是否开启自动恢复
    yarn.resourcemanager.zk-address (空) HA时,ZooKeeper节点列表
    yarn.resourcemanager.store.class org.apache.hadoop.yarn.server
    .resourcemanager.recovery.ZKRMStateStore
    配置RM状态信息存储方式,有MemStore和ZKStore
    yarn.resourcemanager.ha.automatic-failover.zk-base-path /yarn-leader-election ZooKeeper中的路径
    yarn.client.failover-proxy-provider org.apache.hadoop.yarn.client
    .ConfiguredRMFailoverProxyProvider
    Clients, AMs和NMs使用该类故障转移到active RM
    yarn.client.failover-max-attempts (yarn.resourcemanager
    .connect.max-wait.ms)
    FailoverProxyProvider尝试故障转移的最大次数
    yarn.client.failover-sleep-max-ms (yarn.resourcemanager
    .connect.retry-interval.ms)
    故障转移间的最大休眠时间(单位:毫秒)
    yarn.client.failover-retries 0 每个尝试连接到RM的重试次数。
    yarn.client.failover-retries-on-socket-timeouts 0 在socket超时时,每个尝试连接到RM的重试次数。

    2.4 Timeline Server

    选项 默认值 说明
    yarn.timeline-service.enabled false 是否启用timeline service
    yarn.resourcemanager.system-metrics-publisher.enabled false 控制YARN的系统指标是否发布到Timeline Server
    yarn.timeline-service.generic-application-history.enabled false 客户端是否能从timeline history-service检索generic application data。如果为false,则只能通过RM检索
    yarn.timeline-service.hostname 0.0.0.0 timeline service的hostname
    yarn.timeline-service.address ${yarn.timeline-service.hostname}:10200 RPC地址
    yarn.timeline-service.webapp.address ${yarn.timeline-service.hostname}:8188 web地址
    yarn.timeline-service.webapp.https.address ${yarn.timeline-service.hostname}:8190 https web地址
    yarn.timeline-service.leveldb-timeline-store.path ${hadoop.tmp.dir}/yarn/timeline leveldb timeline store存储路径

    2.5 sharedcache

    选项 默认值 说明
    yarn.sharedcache.enabled false 是否启动sharedcache
    yarn.sharedcache.root-dir /sharedcache sharedcache的根目录
    yarn.sharedcache.admin.address 0.0.0.0:8047 管理接口地址
    yarn.sharedcache.webapp.address 0.0.0.0:8788 web ui地址
    yarn.sharedcache.uploader.server.address 0.0.0.0:8046 NM接口地址
    yarn.sharedcache.client-server.address 0.0.0.0:8045 客户端接口地址

    2.6 动态设置

    可以使用如下命令在提交任务时动态设置:

    hadoop jar <jarName> -D mapreduce.map.memory.mb=5120
    或者
    hadoop jar <jarName> -D mapreduce.reduce.memory.mb=5120
     
    • 1
    • 2
    • 3

    3. 应用示例

    3.1 MapReduce On Yarn

    vim mapred-site.xml
    
    <configuration>
      <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
      </property>
    </configuration>
    
    vim yarn-site.xml
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
     
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.2 Spark On Yarn

    在YARN上启动Spark应用有两种模式。在cluster模式下,Spark驱动器(driver)在YARN Application Master中运行(运行于集群中),因此客户端可以在Spark应用启动之后关闭退出。而client模式下,Spark驱动器在客户端进程中,这时的YARN Application Master只用于向YARN申请资源。

    3.2.1 cluster运行

    运行命令

    $ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
    
    # 示例
    $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
        --master yarn \
        --deploy-mode cluster \
        --driver-memory 4g \
        --executor-memory 2g \
        --executor-cores 1 \
        --queue thequeue \
        --jars my-other-jar.jar,my-other-other-jar.jar \
        lib/spark-examples*.jar \
        app_arg1 app_arg2
     
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    执行步骤

    Spark On Yarn

    3.2.2 client运行

    执行命令

    $ ./bin/spark-shell --master yarn --deploy-mode client
     
    • 1

    4. 优化

    基于两方面优化:调度器和内存配置。

    调度器

    根据业务需要选择fair或capacity调度器。同时根据节点物理资源(性能)的高低,可以打标签,例如高配置节点、低配置节点和一般节点。

    内存优化

    依照以下属性计算推荐的配置

    • RAM(Amount of memory)总内存数
    • CORES(Number of CPU cores)CPU内核数
    • DISKS(Number of disks)硬盘数
    每个节点的总内存 系统内存 HBase内存
    4G 1G 1G
    8G 2G 1G
    16G 2G 2G
    24G 4G 4G
    48G 6G 8G
    64G 8G 8G
    72G 8G 8G
    96G 12G 16G
    128G 24G 24G
    256G 32G 32G
    512G 64G 64G

    Container的最大数计算方式:

    min (2*CORES, 1.8*DISKS, (Total available RAM) / MIN_CONTAINER_SIZE)
     
    • 1

    其中MIN_CONTAINER_SIZE是容器的最小内存,可以根据下表获得

    每个节点的总内存 容器最小内存的推荐值
    小于4G 256M
    4~8G 512M
    8~24G 1024M
    大于24G 2048M

    最终容器的内存由下式计算获得:

    RAM-per-Container = max (MIN_CONTAINER_SIZE, (Total Available RAM) / Containers))
     
    • 1

    最后YARN和MR的配置为:

    配置文件 属性
    yarn-site.xml yarn.nodemanager.resource.memory-mb Containers * RAM-per-Container
    yarn-site.xml yarn.scheduler.minimum-allocation-mb RAM-per-Container
    yarn-site.xml yarn.scheduler.maximum-allocation-mb containers * RAM-per-Container
    mapred-site.xml mapreduce.map.memory.mb RAM-per-Container
    mapred-site.xml mapreduce.reduce.memory.mb 2 * RAM-per-Container
    mapred-site.xml mapreduce.map.java.opts 0.8 * RAM-per-Container
    mapred-site.xml mapreduce.reduce.java.opts 0.8 * 2 * RAM-per-Container
    yarn-site.xml (check) yarn.app.mapreduce.am.resource.mb 2 * RAM-per-Container
    yarn-site.xml (check) yarn.app.mapreduce.am.command-opts 0.8 * 2 * RAM-per-Container

    例如:

    集群节点是12核CPU、48G和12块硬盘

    保留内存 = 6 GB 系统使用 + (如果有HBase) 8 GB HBase使用 
    容器最小内存 = 2 GB

    无HBase

    容器数 = min (2 * 12, 1.8 * 12, (48-6)/2) = min (24, 21.6, 21) = 21 
    每个容器的内存 = max (2, (48-6)/21) = max (2, 2) = 2

    属性
    yarn.nodemanager.resource.memory-mb = 21 * 2 = 42 * 1024 MB
    yarn.scheduler.minimum-allocation-mb = 2 * 1024 MB
    yarn.scheduler.maximum-allocation-mb = 21 * 2 = 42 * 1024 MB
    mapreduce.map.memory.mb = 2 * 1024 MB
    mapreduce.reduce.memory.mb = 2 * 2 = 4 * 1024 MB
    mapreduce.map.java.opts = 0.8 * 2 = 1.6 * 1024 MB
    mapreduce.reduce.java.opts = 0.8 * 2 * 2 = 3.2 * 1024 MB
    yarn.app.mapreduce.am.resource.mb = 2 * 2 = 4 * 1024 MB
    yarn.app.mapreduce.am.command-opts = 0.8 * 2 * 2 = 3.2 * 1024 MB

    有HBase

    容器数 = min (2 * 12, 1.8 * 12, (48-6-8)/2) = min (24, 21.6, 17) = 17 
    每个容器的内存 = max (2, (48-6-8)/17) = max (2, 2) = 2

    属性
    yarn.nodemanager.resource.memory-mb = 17 * 2 = 34 * 1024 MB
    yarn.scheduler.minimum-allocation-mb = 2 * 1024 MB
    yarn.scheduler.maximum-allocation-mb = 17 * 2 = 34 * 1024 MB
    mapreduce.map.memory.mb = 2 * 1024 MB
    mapreduce.reduce.memory.mb = 2 * 2 = 4 * 1024 MB
    mapreduce.map.java.opts = 0.8 * 2 = 1.6 * 1024 MB
    mapreduce.reduce.java.opts = 0.8 * 2 * 2 = 3.2 * 1024 MB
    yarn.app.mapreduce.am.resource.mb = 2 * 2 = 4 * 1024 MB
    yarn.app.mapreduce.am.command-opts = 0.8 * 2 * 2 = 3.2 * 1024 MB

    5. Memsos

    5.1 简介

    Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核。Mesos最初是由加州大学伯克利分校的AMPLab开发的,后在Twitter得到广泛使用。

    5.1.1 架构

    Mesos Architecture

    Mesos实现了两级调度架构,它可以管理多种类型的应用程序。第一级调度是Master的守护进程,管理Mesos集群中所有节点上运行的Slave守护进程。集群由物理服务器或虚拟服务器组成,用于运行应用程序的任务,比如Hadoop和MPI作业。第二级调度由被称作Framework的“组件”组成。Framework包括调度器(Scheduler)和执行器(Executor)进程,其中每个节点上都会运行执行器。Mesos能和不同类型的Framework通信,每种Framework由相应的应用集群管理。上图中只展示了Hadoop和MPI两种类型,其它类型的应用程序也有相应的Framework。

    主要组件以及概念:

    • Zookeeper 主要用来实现Master的选举,支持Master的高可用。
    • Master Mesos的主节点,接收Slave和Framework scheduler的注册,分配资源。
    • Slave 从节点,接收master发来的task,调度executor去执行。
    • Framework 比如上图中的Hadoop,MPI就是Framework,包括scheduler,executor两部分。scheduler独立运行,启动后注册到master,接收master发送的Resource Offer消息,来决定是否接受。Executor是给slave调用的,执行framework的task。Mesos内置了CommandExecutor(直接调用shell)和DockerExecutor两种executor,其他的自定义Executor需要提供uri,供slave下载。
    • Task Mesos最主要的工作其实就是分配资源,然后询问scheduler是否可以利用该资源执行Task,scheduler将资源和Task绑定后交由Master发送给指定的Slave执行。Task可以是长生命周期的,也可以使用批量的短生命周期的。

    5.1.2 Mesos流程

    Mesos流程

    1. Slave1 向 Master 报告,有4个CPU和4 GB内存可用
    2. Master 发送一个 Resource Offer 给 Framework1 来描述 Slave1 有多少可用资源
    3. FrameWork1 中的 FW Scheduler会答复 Master,我有两个 Task 需要运行在 Slave1,一个 Task 需要<2个CPU,1 GB内存=””>,另外一个Task需要<1个CPU,2 GB内存=””>
    4. 最后,Master 发送这些 Tasks 给 Slave1。然后,Slave1还有1个CPU和1 GB内存没有使用,所以分配模块可以把这些资源提供给 Framework2

    5.2 Yarn与Memsos对比

    1. 最大的不同点在于他们所采用的scheduler:mesos让framework决定mesos提供的这个资源是否适合该job,从而接受或者拒绝这个资源。而对于yarn来说,决定权在于yarn,是yarn本身(自行替应用程序作主)决定这个资源是否适合该job,对于各种各样的应用程序来说或许这就是个错误的决定(这就是现代人为什么拒绝父母之命媒妁之言而选择自由婚姻的缘故吧)。所以从scaling的角度来说,mesos更scalable。

    2. 其次,yarn是MapReduce进化的产物,yarn从诞生之日起就是为hadoop jobs管理资源的(yarn也开始朝着mesos涉及的领域进军),yarn只为hadoop jobs提供了一个static partitioning。而mesos的设计目标是为各个框架(hadoop、spark、web services等)提供dynamical partitioning,让各个集群框架共用数据中心机器。

    3. myriad项目将让yarn运行在mesos上面。

    6. 参考

    初步掌握Yarn的架构及原理

    Hadoop YARN架构设计要点

    Hadoop Yarn架构解析

    Hadoop Yarn 框架原理及运作机制

    Hadoop On Yarn Mapreduce运行原理与常用数据压缩格式

    yarn-site.xml相关配置参数

    Yarn 调度器Scheduler详解

    Hadoop Yarn内存使用优化配置

    Yarn资源分配性能调优

    Hadoop YARN

    YARN Timeline Server(hadoop2.7.1)——科普篇

    Hadoop中的Shared Cache机制

    Yarn、MR、HBase推荐内存配置

    Mesos 架构以及源码浅析

    深入浅出Mesos(二):Mesos的体系结构和工作流

    Mesos高可用解决方案剖析

    展开全文
  • 启动hive --service metastore 启动 dfs yarn [root@bigdatastorm bin]# ./spark-sql --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --total-executor-cores...
  • Yarn详解

    千次阅读 2016-12-31 17:04:32
    1. Yarn架构1.1 简介1.1.1 架构YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。YARN总体上仍然是Master/Slave结构,在整个资源管理框架中,ResourceManager为Master,...
  • yarn 命令学习:yarn application

    千次阅读 2016-12-05 11:47:04
    yarn application 说明:打印 application(s) 报告,或者 kill 掉 application,共7个可选项 1、-list 列出所有 application 信息  示例:yarn application -list  打印信息: ...
  • Yarn的详细使用

    2021-08-06 10:10:50
    文章目录Yarn1、常用命令查看任务(yarn application)查看日志(yarn logs)查看尝试运行的任务(yarn applicationattempt)查看容器(yarn container)查看节点状态(yarn node)查看队列(yarn queue)2、生产环境核心参数3、...
  • hadoop yarn jobhistoryserver 配置

    千次阅读 2016-09-08 13:48:40
    hadoop yarn jobhistoryserver 配置
  • yarn核心配置详解

    2021-04-09 23:12:53
    官方配置 yarn-default.xml ResourceManager配置 //配置调度器类型--默认容量调度器 yarn.resourcemanager.scheduler.class ...yarn.resourcemanager.client.thread-count //默认是50,也就是同时可接受50个job NodeMan
  • Yarn on Docker集群方案

    千次阅读 2017-11-15 19:57:34
    数据中心中的应用一般独立部署,为了保证环境隔离与方便管理,保证应用最大资源 数据中心中普遍存在如下问题: ...为了合理利用Hadoop yarn的资源,队列间会互相抢占计算资源,造成重要任务阻塞 根据部门申请的机器数
  • hadoop 1.0中RPC的序列化机制是WritableRpcEngine,Yarn RPC采用ProtocolBuffer。(1) 类型结构 (2) 定义RPC协议package com.jackniu.yarnrpc.pb.api;public interface Calculate { int add(int num1, int num2); ...
  • 1. Yarn架构 1.1 简介 1.1.1 架构 YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。 YARN总体上仍然是Master/Slave结构,在整个资源管理框架中,...
  • Yarn任务调度

    2018-04-23 20:10:48
    圈重点Spark on Yarn :仅仅是将spark作为一个客户端而已Application = diver(进程) + excutor(进程)是一个独立的进程集合diver用于运行main方法,会创建一个,new 一个sparkContext或者sparkSessionexcutor运行...
  • Hadoop - MapReduce on Yarn

    2019-07-16 00:55:21
    1.word count # 启动hdfs和yarn start-dfs.sh start-yarn.sh # 提交任务 hadoop jar /hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar wordcount /logs /example...
  • Yarn 资源管理

    千次阅读 2017-11-07 21:20:24
    一、Yarn 资源管理简述:yarn默认提供了两种调度规则,capacity scheduler和fair scheduler。 现在使用比较多的是capacity scheduler。具体的实现原理和调度源码可以google一下capacity scheduler。 Capacity调度器...
  • Yarn..

    2021-07-25 16:37:22
    第 1 章 Yarn 资源调度器 思考: 1)如何管理集群资源? 2)如何给任务合理分配资源? Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式 的操作系统平台,而 MapReduce 等运算程序...
  • Flink on Yarn部署

    千次阅读 2016-09-22 22:54:42
    环境信息:Hadoop版本:2.6.0Flink版本:1.1.2 快速部署Flink on Yarn环境:比如启动一个有4个TaskManager(每个节点都有4GB堆内存)的Yarn会话:1. 下载Flink的软件包,如flink-1.1.2-bin-hadoop26-scala_2.11....
  • YARN的配置项

    2021-08-04 09:43:00
    这里以Hadoop 2.7为例,如果新版有变更以新版为主。 ResourceManager相关配置参数: yarn.resourcemanager.address:...yarn.resourcemanager.scheduler.address:ResourceManager 对ApplicationMaster暴露的访问地
  • 一个Applciation运行在YARN上的流程为,从YARN Client向ResourceManager提交任务,将Applciation所需资源提交到HDFS中,然后Resource...
  • Yarn_基础库

    2017-12-14 12:42:58
    概述YARN 基础库是其他一切模块的基础,它的设计直接决定了YARN 的稳定性和扩展性,概括起来,YARN 的基础库主要有以下几个。 ❑ Protocol Buffers :Protocol Buffers 是 Google 开源的序列化库,具有平台无关、高...
  • YARN重点知识

    2021-03-18 15:23:18
    Yarn 资源调度器 Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式 的操作系统平台,而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序。 YARN 主要由 ResourceManager...
  • Hadoop Yarn组件介绍: ResourceManager(RM) NodeManager(NM) ApplicationMaster(AM) Container Yarn-Cluster模式 Spark On Yarn 一、 SparkSubmit 分析 二、转到 Client 三、ApplicationMaster 四、...
  • 大数据技术之YARN

    2021-07-29 11:45:09
    目录大数据技术之Yarn第 1 章 Yarn资源调度器1.1 Yarn 基础架构1.2 Yarn 工作机制1.3 作业提交全过程1.4 Yarn 调度器和调度算法1.4.1 先进先出调度器(FIFO)1.4.2 容量调度器(Capacity Scheduler )1.4.3 公平调度...
  • Yarn常用命令

    2021-08-09 16:09:50
    1、yarn application查看任务 yarn application -list 2021-08-09 16:08:29,196 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032 Total number of applications (application...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,107
精华内容 6,442
关键字:

countyarn