精华内容
下载资源
问答
  • Hadoop执行任务过程

    千次阅读 2018-01-16 09:48:18
    jre外依赖的jar包,待处理的数据文件以及输出文件夹的位置和名称client端首先检查此任务输出的文件夹是否存在,然后向JobTracker为此任务申请一个id,然后在hdfs中创建一个对应此任务的文件夹,将这个任务依赖的...
    1. 用户从client端提交一个任务,此任务指定了运行的jar包,除java jre外依赖的jar包,待处理的数据文件以及输出文件夹的位置和名称
    2. client端首先检查此任务输出的文件夹是否存在,然后向JobTracker为此任务申请一个id,然后在hdfs中创建一个对应此任务的文件夹,将这个任务依赖的外部jar包以及任务要运行的jar包放入此任务的文件夹中;然后调用任务的InputFormat中的getSplit方法来计算inputSplit,其中每一个inputSplit是由一个mapper来处理的,inputSplit对象在Hadoop的定义中仅仅有两个属性Length和Locations,用户可以定义自己的inputSplit对象来包含更多的属性。计算完毕的inputSplit文件放到Hdfs中job所对应的folder里面
    3. client端调用Submit方法来提交任务,通过RPC调用JobTrackerSubmit方法,JobTrackerSubmit方法首先生成Hadoop任务,然后将此任务根据此任务的配置来初始化生成对应的MapReduce Task,其中Map Task的数量跟InputSplit的数量相同,而Reduce Task的数量是由用户在配置文件中指定的,如果用户不指定,则默认值为1
    4. 初始化后的MapReduce Task会放到队列中,直到JobTracker收到TaskTracker的心跳,如果JobTracker发现TaskTracker的心跳中声明目前TaskTracker是空闲并且能够执行任务的,那么,JobTracker会将MapperReducer Task分配给TaskTracker执行,TaskTracker收到心跳的回应中如果携带了要执行的Mapper或者Reducer的信息,则TaskTracker会开始执行分配到的Task
    5. TaskTracker执行Mapper的时候,会首先去HDFS中取出相应的Split描述信息和任务依赖的jar包以及任务执行的jar包,然后根据Split信息读取相应的数据文件中的部分。首先调用RecordReader中的getCurrentKey和getCurrentValue读取下一个Key,Value然后将此Key, Value传递给Mapper的Map函数,在Map函数中将其转换成为MapOutputKey,MapOutputValue。
    6. Mapper的输出,如果没有Reducer,会直接排序输出到硬盘上,如果有Reducer,Mapper的输出会首先经过Partitioner的计算,计算输出的Key,Value是要分到哪一个Reducer,然后会首先存储在内存中,如果内存中放不下后,会对这一部分结果进行spill out,在将内存中的数据spill out到硬盘的时候,会调用SortAndSpill对输出结果进行排序,以便保证一个文件中结果是有序的。每次spill out会产生一个文件,这个文件中包含多个partition的数据,当产生多个文件后,会对这多个有序文件进行归并排序,即合并操作。此过程会重复直到Mapper所有的值输出完毕为止,Mapper会一个文件,此文件中会先按partition对元组进行排序,然后相同partition中的元祖,会按照key来进行排序。
    7. Reducer启动之后,会不断从JobTracker返回的heartbeat response中获取每个Mapper是否完成的信息。会启动默认的几个copy线程,默认值是5,从每个Mapper的输出结果中拷贝数据,拷贝数据使用的是Http协议,当从每个Mapper中获取到应该处理的数据之后,会对输入的值进行一个归并排序,然后对每一个输入的Key,Set<Value>应用Reducer中的Reduce函数,输出后的结果会直接写到硬盘上,作为输出结果。

    Comment

    1. JobTracker选择哪一个任务进行初始化是由Scheduler的逻辑实现来决定的,用户可以提供自定义的Scheduler
    2. TaskTracker会启动一个Jvm来执行Mapper或者Reducer,除非用户指定需要重用之前启动的Jvm
    3. 用户可以提供Combiner来对Mapper的输出进行初步的合并,Combiner会在Partitioner之后执行,主要是在Mapper的结果Spillout到文件的时候进行combine操作。
    4. 用户可以提供Partitioner来确定Mapper输出的Key,Value 被分到哪一个Reducer,可以用来防止数据倾斜。
    5. 用户可提供自定义InputFormat来定义如何产生Split
    6. 用户可提供自定义RecordReader来定义如何从输入中产生Key,Value对
    7. 用户可提供自定义OutputFormat和RecordWriter来定义如何输出。
    展开全文
  • hadoop执行jar流程分析

    千次阅读 2015-12-15 09:49:38
    项目要结束了,最近在整理项目的相关文档,之前项目中在用hadoop jar **.jar提交作业时,...在hadoop-env.sh中和mapreduce.application.classpath、yarn.application.classpath将jar都设置进去了,这样在本地执行hadoo
    
    项目要结束了,最近在整理项目的相关文档,之前项目中在用hadoop jar **.jar提交作业时,设置了些公共依赖jar包到CLASSPATH中,这样算子在打包时就不需要把很多jar包再打进去离开 。
    
    在hadoop-env.sh中和mapreduce.application.classpath、yarn.application.classpath将jar都设置进去了,这样在本地执行hadoop jar命令时就就不会报缺少依赖错误,但关于他们具体的工作原理不太清楚了,就着这个机会,就准备好好分析一下hadoop运行原理,这篇先分析hadoop jar提交任务。

    一、命令行hadoop jar *** 命令的hadoop脚本为/usr/local/hadoop
    #!/bin/bash
      # Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in
      SOURCE="${BASH_SOURCE[0]}"
      BIN_DIR="$( dirname "$SOURCE" )"
      while [ -h "$SOURCE" ]
      do
        SOURCE="$(readlink "$SOURCE")"
        [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"
        BIN_DIR="$( cd -P "$( dirname "$SOURCE"  )" && pwd )"
      done
      BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
      LIB_DIR=$BIN_DIR/../lib
    # Autodetect JAVA_HOME if not defined
    . $LIB_DIR/bigtop-utils/bigtop-detect-javahome
    export HADOOP_LIBEXEC_DIR=//$LIB_DIR/hadoop/libexec
    exec $LIB_DIR/hadoop/bin/hadoop "$@"
    监测JAVAHOMR,设定HADOOP_LIBEXEC_DIR变量,执行实际上的hadoop脚本,该脚本位置在/opt/cloudera/parcels/CDH/lib/hadoop/bin/下($LIB_DIR的值为/opt/cloudera/parcels/CDH/lib)

    二、/opt/cloudera/parcels/CDH/lib/hadoop/bin/hadoop脚本执行流程
       1、执行hadoop-config.sh脚本进行相关配置(hadoop-config.sh详细解析请参考另外一篇博客: http://blog.csdn.net/a822631129/article/details/50038883
    (1)、设置hadoop、hdfs、yarn、mapred目录的一些变量
    (2)、设定配置文件目录
    (3)、执行hadoop-env.sh文件,项目中就在这个脚本里将依赖的jar添加进去了(hadoop-env.sh脚本的作用就是设置在执行用户写的mapreduce程序中使用的变量,其中设定CLASSPATH使得在提交节点执行用户程序时能够找到依赖,至于container中使用的默认依赖就要通过其他配置搞定了)
    (4)、设置JAVA_HOME,JAVA_HEAP_SIZE等变量。
    (5)、设置需要加载执行的类CLASS(即CLASSPATH变量,将COMMON、HDFS、YANR、MAPREDUCE、HADOOP_CLASSPATH中的jar都添加上了)和HADOOP_OPTS参数(hadoop.log.dir、hadoop.log.file、hadoop.home.dir、hadoop.root.logger、java.library.path、hadoop.policy.file),
       2、获得用户命令COMMAND,给命令分类,本例是jar。
          确定要执行的CLASS是org.apache.hadoop.util.RunJar
       3、 export CLASSPATH=$CLASSPATH
           exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
           设定CLASSPATH,获取参数,调用java执行类(RunJar)
      /opt/cloudera/parcels/CDH/lib/hadoop/bin/hadoop脚本: 
    # This script runs the hadoop core commands. 
    
    bin=`which $0`
    bin=`dirname ${bin}`
    bin=`cd "$bin"; pwd`
    
    DEFAULT_LIBEXEC_DIR="$bin"/../libexec
    HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
    . $HADOOP_LIBEXEC_DIR/hadoop-config.sh
    
    function print_usage(){
      echo "Usage: hadoop [--config confdir] COMMAND"
      echo "       where COMMAND is one of:"
      echo "  fs                   run a generic filesystem user client"
      echo "  version              print the version"
      echo "  jar <jar>            run a jar file"
      echo "  checknative [-a|-h]  check native hadoop and compression libraries availability"
      echo "  distcp <srcurl> <desturl> copy file or directories recursively"
      echo "  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
      echo "  classpath            prints the class path needed to get the"
      echo "  credential           interact with credential providers"
      echo "                       Hadoop jar and the required libraries"
      echo "  daemonlog            get/set the log level for each daemon"
      echo "  trace                view and modify Hadoop tracing settings"
      echo " or"
      echo "  CLASSNAME            run the class named CLASSNAME"
      echo ""
      echo "Most commands print help when invoked w/o parameters."
    }
    
    if [ $# = 0 ]; then
      print_usage
      exit
    fi
    
    COMMAND=$1
    case $COMMAND in
      # usage flags
      --help|-help|-h)
        print_usage
        exit
        ;;
    
      #hdfs commands
      namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|fetchdt|oiv|dfsgroups|portmap|nfs3)
        echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." 1>&2
        echo "Instead use the hdfs command for it." 1>&2
        echo "" 1>&2
        #try to locate hdfs and if present, delegate to it.  
        shift
        if [ -f "${HADOOP_HDFS_HOME}"/bin/hdfs ]; then
          exec "${HADOOP_HDFS_HOME}"/bin/hdfs ${COMMAND/dfsgroups/groups}  "$@"
        elif [ -f "${HADOOP_PREFIX}"/bin/hdfs ]; then
          exec "${HADOOP_PREFIX}"/bin/hdfs ${COMMAND/dfsgroups/groups} "$@"
        else
          echo "HADOOP_HDFS_HOME not found!"
          exit 1
        fi
        ;;
    
      #mapred commands for backwards compatibility
      pipes|job|queue|mrgroups|mradmin|jobtracker|tasktracker|mrhaadmin|mrzkfc|jobtrackerha)
        echo "DEPRECATED: Use of this script to execute mapred command is deprecated." 1>&2
        echo "Instead use the mapred command for it." 1>&2
        echo "" 1>&2
        #try to locate mapred and if present, delegate to it.
        shift
        if [ -f "${HADOOP_MAPRED_HOME}"/bin/mapred ]; then
          exec "${HADOOP_MAPRED_HOME}"/bin/mapred ${COMMAND/mrgroups/groups} "$@"
        elif [ -f "${HADOOP_PREFIX}"/bin/mapred ]; then
          exec "${HADOOP_PREFIX}"/bin/mapred ${COMMAND/mrgroups/groups} "$@"
        else
          echo "HADOOP_MAPRED_HOME not found!"
          exit 1
        fi
        ;;
    
      #core commands  
      *)
        # the core commands
        if [ "$COMMAND" = "fs" ] ; then
          CLASS=org.apache.hadoop.fs.FsShell
        elif [ "$COMMAND" = "version" ] ; then
          CLASS=org.apache.hadoop.util.VersionInfo
        elif [ "$COMMAND" = "jar" ] ; then
          CLASS=org.apache.hadoop.util.RunJar
        elif [ "$COMMAND" = "key" ] ; then
          CLASS=org.apache.hadoop.crypto.key.KeyShell
        elif [ "$COMMAND" = "checknative" ] ; then
          CLASS=org.apache.hadoop.util.NativeLibraryChecker
        elif [ "$COMMAND" = "distcp" ] ; then
          CLASS=org.apache.hadoop.tools.DistCp
          CLASSPATH=${CLASSPATH}:${TOOL_PATH}
        elif [ "$COMMAND" = "daemonlog" ] ; then
          CLASS=org.apache.hadoop.log.LogLevel
        elif [ "$COMMAND" = "archive" ] ; then
          CLASS=org.apache.hadoop.tools.HadoopArchives
          CLASSPATH=${CLASSPATH}:${TOOL_PATH}
        elif [ "$COMMAND" = "credential" ] ; then
          CLASS=org.apache.hadoop.security.alias.CredentialShell
        elif [ "$COMMAND" = "trace" ] ; then
          CLASS=org.apache.hadoop.tracing.TraceAdmin
        elif [ "$COMMAND" = "classpath" ] ; then
          if [ "$#" -eq 1 ]; then
            # No need to bother starting up a JVM for this simple case.
            echo $CLASSPATH
            exit
          else
            CLASS=org.apache.hadoop.util.Classpath
          fi
        elif [[ "$COMMAND" = -*  ]] ; then
            # class and package names cannot begin with a -
            echo "Error: No command named \`$COMMAND' was found. Perhaps you meant \`hadoop ${COMMAND#-}'"
            exit 1
        else
          CLASS=$COMMAND
        fi
        shift
    
        # Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
        HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
    
        #make sure security appender is turned off
        HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
    
        export CLASSPATH=$CLASSPATH

    三、org.apache.hadoop.util.RunJar执行流程:
    1.通过命令行参数获取程序jar包名,再通过jar包名称获得jar包中主类名称,若是jar包中没有manifest文件,读第二个参数为主类名称
    2.准备运行环境,在hadoop.tmp.dir下创建hadoop-unjar*目录,作为作业执行的工作目录;然后调用unjar方法把jar文件解压到该目录下
    3.调用ClassLoader加载CLASSPATH,把算子jar解压后的内容class、lib等加载到CLASSPATH。
    4.根据java反射机制执行jar包主类的main方法。

     /** Run a Hadoop job jar.  If the main class is not in the jar's manifest,
       * then it must be provided on the command line. */
      public static void main(String[] args) throws Throwable {
    //获取jar包名称,获得jar包中主类名称,若是jar包中没有manifest文件,读第二个参数为主类名称
     String usage = "RunJar jarFile [mainClass] args...";
    
        if (args.length < 1) {
          System.err.println(usage);
          System.exit(-1);
        }
    
        int firstArg = 0;
        String fileName = args[firstArg++];
        File file = new File(fileName);
        if (!file.exists() || !file.isFile()) {
          System.err.println("Not a valid JAR: " + file.getCanonicalPath());
          System.exit(-1);
        }
        String mainClassName = null;
    
        JarFile jarFile;
        try {
          jarFile = new JarFile(fileName);
        } catch(IOException io) {
          throw new IOException("Error opening job jar: " + fileName)
            .initCause(io);
        }
    
        Manifest manifest = jarFile.getManifest();
        if (manifest != null) {
          mainClassName = manifest.getMainAttributes().getValue("Main-Class");
        }
        jarFile.close();
    
        if (mainClassName == null) {
          if (args.length < 2) {
            System.err.println(usage);
            System.exit(-1);
          }
          mainClassName = args[firstArg++];
        }
        mainClassName = mainClassName.replaceAll("/", ".");
    
        File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
        ensureDirectory(tmpDir);
        final File workDir;
        try { 
          workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
        } catch (IOException ioe) {
          // If user has insufficient perms to write to tmpDir, default  
          // "Permission denied" message doesn't specify a filename. 
          System.err.println("Error creating temp dir in hadoop.tmp.dir " + tmpDir + " due to " + ioe.getMessage());
          System.exit(-1);
          return;
        }
    
        if (!workDir.delete()) {
          System.err.println("Delete failed for " + workDir);
          System.exit(-1);
        }
        ensureDirectory(workDir);
    
        ShutdownHookManager.get().addShutdownHook(
          new Runnable() {
            @Override
            public void run() {
              FileUtil.fullyDelete(workDir);
            }
          }, SHUTDOWN_HOOK_PRIORITY);
    
        unJar(file, workDir);
    
        ArrayList<URL> classPath = new ArrayList<URL>();
        classPath.add(new File(workDir+"/").toURI().toURL());
        classPath.add(file.toURI().toURL());
        classPath.add(new File(workDir, "classes/").toURI().toURL());
        File[] libs = new File(workDir, "lib").listFiles();
        if (libs != null) {
          for (int i = 0; i < libs.length; i++) {
            classPath.add(libs[i].toURI().toURL());
          }
        }
        
        ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));
    
        Thread.currentThread().setContextClassLoader(loader);
        Class<?> mainClass = Class.forName(mainClassName, true, loader);
        Method main = mainClass.getMethod("main", new Class[] {Array.newInstance(String.class, 0).getClass()});
        String[] newArgs = Arrays.asList(args).subList(firstArg, args.length).toArray(new String[0]);
        try {
          main.invoke(null, new Object[] { newArgs });
        } catch (InvocationTargetException e) {
          throw e.getTargetException();
        }
      }

    四、由以上分析可知,在执行hadoop-config.sh脚本时,执行了hadoop-env.sh,就可将hadoop-env.sh中设置的CLASSPATH加载到了执行jar时的环境变量里,而像mapreduce.application.classpath、yarn.application.classpath这两个属性设置的东西在此时却是没有加载的,这个应该是hadoop的container任务会用到,这个问题以后再具体分析。$HADOOP_HOME/bin/hadoop脚本实现了很多hadoop命令,但是还有很多命令是通过$HADOOP_HOME/bin/mapred或$HADOOP_HOME/bin/hdfs来执行的,有兴趣的可以看看这些脚本,这里暂不做分析。本文主要分析了hadoop执行jar文件的流程:通过解析jar文件的到主类,利用java反射机制执行主类的main方法,进而执行相关程序。进而hadoop如何提交job在下文再做介绍。
    展开全文
  • Hadoop 提交任务执行流程总结

    千次阅读 2016-08-21 21:13:52
    用流水线可表示任务执行流程如下: input(k1,v1) -> map -> (k2,v2) -> combine -> shuffle(partitioner) -> sort -> (k2,v2) -> reduce -> (k3,v3) ->output 接着来段大白话,聊一聊: 一,...

    用流水线可表示任务执行流程如下:

    input(k1,v1) -> map -> (k2,v2) -> combine -> shuffle(partitioner) -> sort -> (k2,v2) -> reduce -> (k3,v3) ->output

    接着来段大白话,聊一聊:

    一,input

    (通过FileInputFormat设置),此步主要工作:验证输入形式,调用InputSplit决定map个数,并通过RecordReader用于输入记录;

    二,map

    (把输入的(k1,v1)分割成n个(k2,v2)),此步主要工作: setup初始化map工作例如打开文件,map例如把一行分割成(word,1)的(k,v)形式,用于后面reduce词频统计,cleanup收尾map工作例如关闭文件;

    三,combine

    此步主要工作:对map操作过后的(k2,v2),按键值进行归并,也就是把key值相同的的value归并成一个values_list,此步是在map端进行,但是仅仅做归并操作,并没有多余的操作,目的是不让数据太分散,减少数据传输的网络开销;

    四,shuffle

    (这个词,记了好几次...也怪我记性差~_~!!)此处用partitioner就好记喽,此步主要工作:对combine后的结果进行分区操作,使具有相关性的数据发送到相同reduce节点上,避免具有相关性数据分散在不同reduce节点上,导致reduce计算过程中还得去访问其他reduce节点,降低运算效率;

    五,sort

    此步主要操作:map处理过后的数据在进行reduce操作之前,进行排序操作,其过程为map过后,对分散存储的数据进行快速排序,然后通过归并排序把分散的数据存入一个大文件;

    六,reduce

    对(k2,v2)进行操作,其中v2指value_list,例如词频统计对(value_list数据进行累加),同map包括(setup, reduce, cleanup)

    七,output

    (通过FileOutputFormat设置),此步主要把任务数据存储起来,其中包括recordWriter对输出进行记录
    这一过程中系统参数设置如下:

    点击(此处)折叠或打开

    1. job.setJarByClass(Unique.class);//设置任务执行类
    2. job.setMapperClass(UniMapper.class);//设置map类
    3. job.setCombinerClass(UniReduce.class);//设置reduce类
    4. job.setReducerClass(UniReduce.class);//设置combine类
    5. job.setOutputKeyClass(Text.class);//设置程序输出k类型
    6. job.setOutputValueClass(Text.class);//设置任务输出v类型
    7. FileInputFormat.addInputPath(job, new Path(args[0]));//输入路径
    8. FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径

    参考链接:

    http://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

    展开全文
  • 目录 1 引言  1.1 目的  1.2 读者范围 2 综述 3 代码详细分析 ... 3.1 启动Hadoop集群  3.2 JobTracker启动以及Job的初始化  3.3 TaskTracker启动以及发送Heartbeat  3.4 JobTracker接

    目录

    1 引言

     1.1 目的

     1.2 读者范围

    2 综述

    3 代码详细分析

     3.1 启动Hadoop集群

     3.2 JobTracker启动以及Job的初始化

     3.3 TaskTracker启动以及发送Heartbeat

     3.4 JobTracker接收Heartbeat并向TaskTracker分配任务

     3.5 TaskTracker接收HeartbeatResponse

     3.6 MapReduce任务的运行

     3.6.1 MapTask的运行

     3.6.2 ReduceTask的运行

    4 致谢

     



    1 引言

    1.1 目的

    该文档从源代码的级别剖析了Hadoop0.20.2版本的MapReduce模块的运行原理和流程,对JobTracker、TaskTracker的内部结构和交互流程做了详细介绍。系统地分析了Map程序和Reduce程序运行的原理。读者在阅读之后会对Hadoop MapReduce0.20.2版本源代码有一个大致的认识。

    1.2 读者范围

    如果读者想只是想从原理上更加深入了解Hadoop MapReduce运行机制的话,只需要阅读第2章综述即可,该章节要求读者对HadoopMapReduce模型有系统的了解。

    如果读者想深入了解HadoopMapReduce的源代码,则需阅读该文档第2、3节。阅读第3节需要读者熟练掌握Java语言的基本语法,并且对反射机制、动态代理有一定的了解。同时,还要求读者对于Hadoop HDFS和Hadoop RPC的基本用法有一定的了解。

    另外,属性Hadoop源代码的最好方法是远程调试,有关远程调试的方法请读者去网上自行查阅资料。

     

    2 综述

    Hadoop源代码分为三大模块:MapReduce、HDFS和Hadoop Common。其中MapReduce模块主要实现了MapReduce模型的相关功能;HDFS模块主要实现了HDFS的相关功能;而Hadoop Common主要实现了一些基础功能,比如说RPC、网络通信等。

    在用户使用HadoopMapReduce模型进行并行计算时,用户只需要写好Map函数、Reduce函数,之后调用JobClient将Job提交即可。在JobTracker收到提交的Job之后,便会对Job进行一系列的配置,然后交给TaskTracker进行执行。执行完毕之后,JobTracker会通知JobClient任务完成,并将结果存入HDFS中

     

    如图所示,用户提交Job是通过JobClient类的submitJob()函数实现的。在Hadoop源代码中,一个被提交了的Job由JobInProgress类的一个实例表示。该类封装了表示Job的各种信息,以及Job所需要执行的各种动作。在调用submitJob()函数之后,JobTracker会将作业加入到一个队列中去,这个队列的名字叫做jobInitQueue。然后,在JobTracker中,有一个名为JobQueueTaskScheduler的对象,会不断轮询jobInitQueue队列,一旦发现有新的Job加入,便将其取出,然后将其初始化。

    在Hadoop代码中,一个Task由一个TaskInProgress类的实例表示。该类封装了描述Task所需的各种信息以及Task执行的各种动作。

    TaskTracker自从启动以后,会每隔一段时间向JobTracker发送消息,消息的名称为“Heartbeat”。Heartbeat中包含了该TaskTracker当前的状态以及对Task的请求。JobTracker在收到Heartbeat之后,会检查该heartbeat的里所包含的各种信息,如果发现错误会启动相应的错误处理程序。如果TaskTracker在Heartbeat中添加了对Task的请求,则JobTracker会添加相应的指令在对Heartbeat的回复中。在Hadoop源代码中,JobTracker对TaskTracker的指令称为action,JobTracker对TaskTracker所发送来的Heartbeat的回复消息称为HeartbeatResponse。

    在TaskTracker内部,有一个队列叫做TaskQueue。该中包含了所有新加入的Task。每当TaskTracker收到HeartbeatResponse后,会对其进行检查,如果其中包含了新的Task,便将其加入到TaskQueue中。在TaskTracker内部,有两个线程不断轮询TaskQueue,一个是MapLauncher,另一个是ReduceLauncher。如果发现有新加入的Map任务,MapLauncher便将其取出并且执行。如果是Reduce任务,ReduceLauncher便将其取出执行。

    不论是Map Task还是Reduce Task,当他们被取出之后,都要进行本地化。本地化的意思就是将所有需要的信息,比如需要运行的jar文件、配置文件、输入数据等等,一起拷贝到本地的文件系统。这样做的目的是为了方便任务在某台机器上独立执行。本地化之后,TaskTracker会为每一个task单独创建一个jvm,然后单独运行。等Task运行完之后,TaskTracker会通知JobTracker任务完成,以进行下一步的动作。

    等到所有的Task都完成之后,Job也就完成了,此时JobTracker会通知JobClient工作完成。

    3 代码详细分析

    下面从用户使用Hadoop进行MapReduce计算的过程为线索,详细介绍Task执行的细节,并对Hadoop MapReduce的主要代码进行分析。

    3.1 启动Hadoop集群

    Hadoop集群的启动是通过在Master上运行start-all.sh脚本进行的。运行该脚本之后,Hadoop会配置一系列的环境变量以及其他Hadoop运行所需要的参数,然后在本机运行JobTracker和NameNode。然后通过SSH登录到所有slave机器上,启动TaskTracker和DataNode。

    因为本文只介绍HadoopMapReduce模块,所以NameNode和DataNode的相关知识不再介绍。

    3.2 JobTracker启动以及Job的初始化

    org.apache.hadoop.mapred.JobTracker类实现了Hadoop MapReduce模型的JobTracker的功能,主要负责任务的接受,初始化,调度以及对TaskTracker的监控。

    JobTracker单独作为一个JVM运行,main函数就是启动JobTracker的入口函数。在main函数中,有以下两行非常重要的代码:

    startTracker(new JobConf());

    JobTracker.offerService();

    startTracker函数是一个静态函数,它调用JobTracker的构造函数生成一个JobTracker类的实例,名为result。然后,进行了一系列初始化活动,包括启动RPC server,启动内置的jetty服务器,检查是否需要重启JobTracker等。

    在JobTracker.offerService()中,调用了taskScheduler对象的start()方法。该对象是JobTracker的一个数据成员,类型为TaskScheduler。该类型的提供了一系列接口,使得JobTracker可以对所有提交的job进行初始化以及调度。但是该类型实际上是一个抽象类型,其真正的实现类型为JobQueueTaskScheduler类,所以,taskScheduler.start()方法执行的是JobQueueTaskScheduler类的start方法。

    该方法的详细代码如下:

    public synchronized void start() throwsIOException {

    //调用TaskScheduler.start()方法,实际上没有做任何事情

    super.start();

    //注册一个JobInProgressListerner监听器

    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener

    );

    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);

    eagerTaskInitializationListener.start();

    taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener)

    }

    JobQueueTaskScheduler类的start方法主要注册了两个非常重要的监听器:jobQueueJobInProgressListener和eagerTaskInitializationListener。前者是JobQueueJobInProgressListener类的一个实例,该类以先进先出的方式维持一个JobInProgress的队列,并且监听各个JobInProgress实例在生命周期中的变化;后者是EagerTaskInitializationListener类的一个实例,该类不断监听jobInitQueue,一旦发现有新的job被提交(即有新的JobInProgress实例被加入),则立即调用该实例的initTasks方法,对job进行初始化。

    JobInProgress类的initTasks方法的主要代码如下:

    public synchronized void initTasks() throwsIOException {

    ……

    //从HDFS中读取job.split文件从而生成input splits

    String jobFile = profile.getJobFile();

    Path sysDir = newPath(this.jobtracker.getSystemDir());

    FileSystem fs = sysDir.getFileSystem(conf);

    DataInputStream splitFile =

    fs.open(newPath(conf.get("mapred.job.split.file")));

    JobClient.RawSplit[] splits;

    try {

    splits = JobClient.readSplitFile(splitFile);

    } finally {

     splitFile.close();

    }

    //map task的个数就是input split的个数

     numMapTasks = splits.length;

     //为每个map tasks生成一个TaskInProgress来处理一个input split

     maps = newTaskInProgress[numMapTasks];

     for(inti=0; i < numMapTasks; ++i) {

     inputLength += splits[i].getDataLength();

     maps[i] =new TaskInProgress(jobId, jobFile,

     splits[i],

     jobtracker, conf, this, i);

     }

     /*

    对于map task,将其放入nonRunningMapCache,是一个Map<Node,

    List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input

    split所在的Node上。在此,Node代表一个datanode或者机架或者数据中 

    心。nonRunningMapCache将在JobTracker向TaskTracker分配map task的

    时候使用。

    */

     if(numMapTasks > 0) {

     nonRunningMapCache = createCache(splits,maxLevel);

    }

    //创建reduce task

    this.reduces = new TaskInProgress[numReduceTasks];

     for (int i= 0; i < numReduceTasks; i++) {

     reduces[i]= new TaskInProgress(jobId, jobFile,

     numMapTasks, i,

     jobtracker, conf, this);

     /*reducetask放入nonRunningReduces,其将在JobTracker向TaskTracker

    分配reduce task的时候使用。*/

     nonRunningReduces.add(reduces[i]);

     }

     

     //创建两个cleanup task,一个用来清理map,一个用来清理reduce.

     cleanup =new TaskInProgress[2];

     cleanup[0]= new TaskInProgress(jobId, jobFile, splits[0],

     jobtracker, conf, this, numMapTasks);

     cleanup[0].setJobCleanupTask();

     cleanup[1]= new TaskInProgress(jobId, jobFile, numMapTasks,

     numReduceTasks, jobtracker, conf, this);

     cleanup[1].setJobCleanupTask();

     //创建两个初始化 task,一个初始化map,一个初始化reduce.

     setup =new TaskInProgress[2];

     setup[0] =new TaskInProgress(jobId, jobFile, splits[0],

     jobtracker,conf, this, numMapTasks + 1 );

     setup[0].setJobSetupTask();

     setup[1] =new TaskInProgress(jobId, jobFile, numMapTasks,

     numReduceTasks + 1, jobtracker, conf, this);

     setup[1].setJobSetupTask();

     tasksInited.set(true);//初始化完毕

     ……

    }

    3.3 TaskTracker启动以及发送Heartbeat

    org.apache.hadoop.mapred.TaskTracker类实现了MapReduce模型中TaskTracker的功能。

    TaskTracker也是作为一个单独的JVM来运行的,其main函数就是TaskTracker的入口函数,当运行start-all.sh时,脚本就是通过SSH运行该函数来启动TaskTracker的。

    Main函数中最重要的语句是:

    new TaskTracker(conf).run();

    其中run函数主要调用了offerService函数:

    State offerService() throws Exception {

      longlastHeartbeat = 0;

     //TaskTracker进行是一直存在的

      while(running && !shuttingDown) {

          ……

          longnow = System.currentTimeMillis();

          //每隔一段时间就向JobTracker发送heartbeat

          longwaitTime = heartbeatInterval - (now - lastHeartbeat);

          if(waitTime > 0) {

           synchronized(finishedCount) {

             if (finishedCount[0] == 0) {

               finishedCount.wait(waitTime);

              }

             finishedCount[0] = 0;

            }

          }

          ……

          //发送Heartbeat到JobTracker,得到response

         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

          ……

         //从Response中得到此TaskTracker需要做的事情

         TaskTrackerAction[] actions = heartbeatResponse.getActions();

          ……

          if(actions != null){

            for(TaskTrackerAction action: actions) {

             if (action instanceof LaunchTaskAction) {

               //如果是运行一个新的Task,则将Action添加到任务队列中

               addToTaskQueue((LaunchTaskAction)action);

              }else if (action instanceof CommitTaskAction) {

              CommitTaskAction commitAction = (CommitTaskAction)action;

               if (!commitResponses.contains(commitAction.getTaskID())) {

                 commitResponses.add(commitAction.getTaskID());

               }

              }else {

               tasksToCleanup.put(action);

              }

            }

          }

      }

      returnState.NORMAL;

    }

    其中transmitHeartBeat函数的作用就是第2章中提到的向JobTracker发送Heartbeat。其主要逻辑如下:

    private HeartbeatResponse transmitHeartBeat(longnow) throws IOException {

      //每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息

      booleansendCounters;

      if (now> (previousUpdate + COUNTER_UPDATE_INTERVAL)) {

       sendCounters = true;

       previousUpdate = now;

      }

      else {

       sendCounters = false;

      }

      ……

      //报告给JobTracker,此TaskTracker的当前状态

      if(status == null) {

       synchronized (this) {

         status = new TaskTrackerStatus(taskTrackerName, localHostname,

                                         httpPort,

    cloneAndResetRunningTaskStatuses(

           sendCounters),

           failures,

           maxCurrentMapTasks,

            maxCurrentReduceTasks);

        }

      }

      ……

      //当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:

      //当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数

    //当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数

      booleanaskForNewTask;

      longlocalMinSpaceStart;

     synchronized (this) {

       askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||

                       status.countReduceTasks() <maxCurrentReduceTasks)

    && acceptNewTasks;

       localMinSpaceStart = minSpaceStart;

      }

      ……

      //向JobTracker发送heartbeat,这是一个RPC调用

     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,

    justStarted, askForNewTask,

    heartbeatResponseId);

      ……

      returnheartbeatResponse;

    }

    3.4 JobTracker接收Heartbeat并向TaskTracker分配任务

    当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, booleanacceptNewTasks, short responseId)函数被调用:

    public synchronized HeartbeatResponseheartbeat(TaskTrackerStatus status,

    boolean initialContact, boolean acceptNewTasks,short responseId)

      throws IOException{

      ……

      StringtrackerName = status.getTrackerName();

      ……

      shortnewResponseId = (short)(responseId + 1);

      ……

     HeartbeatResponse response = newHeartbeatResponse(newResponseId, null);

     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

      //如果TaskTracker向JobTracker请求一个task运行

      if(acceptNewTasks) {

       TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);

        if(taskTrackerStatus == null) {

         LOG.warn("Unknown task tracker polling; ignoring: " +trackerName);

        } else{

         //setup和cleanup的task优先级最高

         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);

          if(tasks == null ) {

            //任务调度器分配任务

           tasks = taskScheduler.assignTasks(taskTrackerStatus);

          }

          if(tasks != null) {

            for(Task task : tasks) {

             //将任务放入actions列表,返回给TaskTracker

             expireLaunchingTasks.addNewTask(task.getTaskID());

             actions.add(new LaunchTaskAction(task));

            }

          }

        }

      }

      ……

      intnextInterval = getNextHeartbeatInterval();

     response.setHeartbeatInterval(nextInterval);

     response.setActions(

    actions.toArray(newTaskTrackerAction[actions.size()]));

      ……

      returnresponse;

    }

    默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:

    public synchronized List<Task>assignTasks(TaskTrackerStatus taskTracker)

        throwsIOException {

     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();

      intnumTaskTrackers = clusterStatus.getTaskTrackers();

    Collection<JobInProgress> jobQueue

    = jobQueueJobInProgressListener.getJobQueue();

      intmaxCurrentMapTasks = taskTracker.getMaxMapTasks();

      intmaxCurrentReduceTasks = taskTracker.getMaxReduceTasks();

      intnumMaps = taskTracker.countMapTasks();

      intnumReduces = taskTracker.countReduceTasks();

      //计算剩余的map和reduce的工作量:remaining

      intremainingReduceLoad = 0;

      intremainingMapLoad = 0;

     synchronized (jobQueue) {

        for(JobInProgress job : jobQueue) {

          if(job.getStatus().getRunState() == JobStatus.RUNNING) {

            inttotalMapTasks = job.desiredMaps();

            inttotalReduceTasks = job.desiredReduces();

           remainingMapLoad += (totalMapTasks - job.finishedMaps());

           remainingReduceLoad += (totalReduceTasks -job.finishedReduces());

          }

        }

      }

      //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。

      intmaxMapLoad = 0;

      intmaxReduceLoad = 0;

      if(numTaskTrackers > 0) {

       maxMapLoad = Math.min(maxCurrentMapTasks,

      (int)Math.ceil((double) remainingMapLoad numTaskTrackers));

       maxReduceLoad = Math.min(maxCurrentReduceTasks,

        (int)Math.ceil((double) remainingReduceLoad

        numTaskTrackers));

      }

      ……

     

      //map优先于reduce,当TaskTracker上运行的map task数目小于平均的工作量,则向其分配map task

      if(numMaps < maxMapLoad) {

        inttotalNeededMaps = 0;

       synchronized (jobQueue) {

          for(JobInProgress job : jobQueue) {

            if(job.getStatus().getRunState() != JobStatus.RUNNING) {

             continue;

            }

            Task t = job.obtainNewMapTask(taskTracker,numTaskTrackers,

               taskTrackerManager.getNumberOfUniqueHosts());

            if(t != null) {

             return Collections.singletonList(t);

            }

            ……

          }

        }

      }

      //分配完map task,再分配reduce task

      if(numReduces < maxReduceLoad) {

        inttotalNeededReduces = 0;

       synchronized (jobQueue) {

          for(JobInProgress job : jobQueue) {

            if(job.getStatus().getRunState() != JobStatus.RUNNING ||

               job.numReduceTasks == 0) {

              continue;

            }

           Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,

               taskTrackerManager.getNumberOfUniqueHosts());

            if(t != null) {

             return Collections.singletonList(t);

            }

            ……

          }

        }

      }

      returnnull;

    }

    从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。

    3.5 TaskTracker接收HeartbeatResponse

    在向JobTracker发送heartbeat后,如果返回的reponse中含有分配好的任务LaunchTaskAction,TaskTracker则调用addToTaskQueue方法,将其加入TaskTracker类中MapLauncher或者ReduceLauncher对象的taskToLaunch队列。在此,MapLauncher和ReduceLauncher对象均为TaskLauncher类的实例。该类是TaskTracker类的一个内部类,具有一个数据成员,是TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和MapRed包中的TaskInProgress类区分,后者我们直接用TaskInProgress表示。如果应答包中包含的任务是map task则放入mapLancher的taskToLaunch队列,如果是reduce task则放入reduceLancher的taskToLaunch队列:

    private void addToTaskQueue(LaunchTaskActionaction) {

      if(action.getTask().isMapTask()) {

       mapLauncher.addToTaskQueue(action);

      } else {

       reduceLauncher.addToTaskQueue(action);

      }

    }

    TaskLauncher类的addToTaskQueue方法代码如下:

    private TaskInProgress registerTask(LaunchTaskAction action,

          TaskLauncher launcher) {

             //从action中获取Task对象

        Task t = action.getTask();  

        LOG.info("LaunchTaskAction(registerTask): " + t.getTaskID() +

                 " task's state:" + t.getState());

        //生成TaskTracker.TaskInProgress对象

        TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);

        synchronized(this){

          /*在相应的数据结构中增加所生成的TaskTracker.TaskInProgress对

            象,以通知程序其他部分该任务的建立*/

          tasks.put(t.getTaskID(),tip);

          runningTasks.put(t.getTaskID(),tip);

          boolean isMap =t.isMapTask();

          if (isMap) {

            mapTotal++;

          } else {

            reduceTotal++;

          }

        }

        return tip;

      }

    同时,TaskLauncher类继承了Thread类,所以在程序运行过程中,它们各自都以一个线程独立运行。它们的启动在TaskTracker初始化过程中已经完成。该类的run函数就是不断监测taskToLaunch队列中是否有新的TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgresstip),该函数的工作就是第二节中提到的本地化。该函数代码如下:

    private void localizeJob(TaskInProgress tip)throws IOException {

      //首先要做的一件事情是有关Task的文件从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar

      PathlocalJarFile = null;

      Task t =tip.getTask();

      JobIDjobId = t.getJobID();

      PathjobFile = new Path(t.getJobFile());

      ……

      PathlocalJobFile = lDirAlloc.getLocalPathForWrite(

                      getLocalJobDir(jobId.toString())

                     + Path.SEPARATOR + "job.xml",

                      jobFileSize, fConf);

     RunningJob rjob = addTaskToJob(jobId, tip);

     synchronized (rjob) {

        if(!rjob.localized) {

         FileSystem localFs = FileSystem.getLocal(fConf);

          PathjobDir = localJobFile.getParent();

          ……

          //将job.split拷贝到本地

         systemFS.copyToLocalFile(jobFile, localJobFile);

         JobConf localJobConf = new JobConf(localJobFile);

          PathworkDir = lDirAlloc.getLocalPathForWrite(

                          (getLocalJobDir(jobId.toString())

                           + Path.SEPARATOR +"work"), fConf);

          if(!localFs.mkdirs(workDir)) {

           throw new IOException("Mkdirs failed to create "

                        + workDir.toString());

          }

         System.setProperty("job.local.dir", workDir.toString());

         localJobConf.set("job.local.dir", workDir.toString());

          //copy Jar file to the local FS and unjar it.

         String jarFile = localJobConf.getJar();

          longjarFileSize = -1;

          if(jarFile != null) {

           Path jarFilePath = new Path(jarFile);

           localJarFile = new Path(lDirAlloc.getLocalPathForWrite(

                                      getLocalJobDir(jobId.toString())

                                       +Path.SEPARATOR + "jars",

                                       5 *jarFileSize, fConf), "job.jar");

            if(!localFs.mkdirs(localJarFile.getParent())) {

             throw new IOException("Mkdirs failed to create jars directory");

            }

            //将job.jar拷贝到本地

           systemFS.copyToLocalFile(jarFilePath, localJarFile);

           localJobConf.setJar(localJarFile.toString());

           //将job得configuration写成job.xml

           OutputStream out = localFs.create(localJobFile);

            try{

             localJobConf.writeXml(out);

            }finally {

             out.close();

            }

            // 解压缩job.jar

           RunJar.unJar(new File(localJarFile.toString()),

                         newFile(localJarFile.getParent().toString()));

          }

         rjob.localized = true;

         rjob.jobConf = localJobConf;

        }

      }

      //真正的启动此Task

     launchTaskForJob(tip, new JobConf(rjob.jobConf));

    }

    当所有的task运行所需要的资源都拷贝到本地后,则调用TaskTracker的launchTaskForJob方法,其又调用TaskTracker.TaskInProgress的launchTask函数:

    public synchronized void launchTask() throwsIOException {

        ……

        //创建task运行目录

       localizeTask(task);

        if(this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {

         this.taskStatus.setRunState(TaskStatus.State.RUNNING);

        }

        //创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner

       this.runner = task.createRunner(TaskTracker.this, this);

       this.runner.start();

       this.taskStatus.setStartTime(System.currentTimeMillis());

    }

    TaskRunner是抽象类,是Thread类的子类,其run函数如下:

    public final void run() {

        ……

       TaskAttemptID taskid = t.getTaskID();

       LocalDirAllocator lDirAlloc = newLocalDirAllocator("mapred.local.dir");

        FilejobCacheDir = null;

        if(conf.getJar() != null) {

         jobCacheDir = new File(

                            newPath(conf.getJar()).getParent().toString());

        }

        File workDir = newFile(lDirAlloc.getLocalPathToRead(

                                 TaskTracker.getLocalTaskDir(

                                   t.getJobID().toString(),

                                   t.getTaskID().toString(),

                                    t.isTaskCleanupTask())

               + Path.SEPARATOR + MRConstants.WORKDIR,

                                  conf).toString());

       FileSystem fileSystem;

        PathlocalPath;

        ……

        //拼写classpath

        StringbaseDir;

        Stringsep = System.getProperty("path.separator");

       StringBuffer classPath = new StringBuffer();

        //start with same classpath as parent process

       classPath.append(System.getProperty("java.class.path"));

       classPath.append(sep);

        if(!workDir.mkdirs()) {

          if(!workDir.isDirectory()) {

           LOG.fatal("Mkdirs failed to create " + workDir.toString());

          }

        }

        Stringjar = conf.getJar();

        if (jar!= null) {     

          // ifjar exists, it into workDir

         File[] libs = new File(jobCacheDir, "lib").listFiles();

          if(libs != null) {

            for(int i = 0; i < libs.length; i++) {

             classPath.append(sep);         //add libs from jar to classpath

             classPath.append(libs[i]);

            }

          }

         classPath.append(sep);

         classPath.append(new File(jobCacheDir, "classes"));

         classPath.append(sep);

         classPath.append(jobCacheDir);

        }

        ……

       classPath.append(sep);

       classPath.append(workDir);

        //拼写命令行java及其参数

       Vector<String> vargs = new Vector<String>(8);

        Filejvm =

          newFile(new File(System.getProperty("java.home"), "bin"),"java");

       vargs.add(jvm.toString());

        StringjavaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");

       javaOpts = javaOpts.replace("@taskid@", taskid.toString());

        String[] javaOptsSplit = javaOpts.split(" ");

        StringlibraryPath = System.getProperty("java.library.path");

        if(libraryPath == null) {

         libraryPath = workDir.getAbsolutePath();

        } else{

         libraryPath += sep + workDir;

        }

        booleanhasUserLDPath = false;

        for(inti=0; i<javaOptsSplit.length ;i++) {

         if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {

           javaOptsSplit[i] += sep + libraryPath;

            hasUserLDPath = true;

           break;

          }

        }

       if(!hasUserLDPath) {

         vargs.add("-Djava.library.path=" + libraryPath);

        }

        for(int i = 0; i < javaOptsSplit.length; i++) {

         vargs.add(javaOptsSplit[i]);

        }

        //添加Child进程的临时文件夹

        Stringtmp = conf.get("mapred.child.tmp", "./tmp");

        PathtmpDir = new Path(tmp);

        if(!tmpDir.isAbsolute()) {

         tmpDir = new Path(workDir.toString(), tmp);

        }

       FileSystem localFs = FileSystem.getLocal(conf);

        if(!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {

          thrownew IOException("Mkdirs failed to create " + tmpDir.toString());

        }

       vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());

        // Addclasspath.

       vargs.add("-classpath");

       vargs.add(classPath.toString());

        //log文件夹

        longlogSize = TaskLog.getTaskLogLength(conf);

       vargs.add("-Dhadoop.log.dir=" +

            newFile(System.getProperty("hadoop.log.dir")

           ).getAbsolutePath());

       vargs.add("-Dhadoop.root.logger=INFO,TLA");

       vargs.add("-Dhadoop.tasklog.taskid=" + taskid);

       vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);

        // 运行map task和reduce task的子进程的main class是Child

       vargs.add(Child.class.getName()); // main of Child

        ……

        //运行子进程

       jvmManager.launchJvm(this,

           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,

               workDir, env, pidFile, conf));

    }

    在程序运行过程中,实际运行的TaskRunner实例应该是MapTaskRunner或者是ReduceTaskRunner。这两个子类只对TaskRunner进行了简单修改,在此不做赘述。

    在jvmManager.launchJvm()方法中,程序将创建一个新的jvm,来执行新的程序。

    3.6 MapReduce任务的运行

    真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:

    while (true) {

      //从TaskTracker通过网络通信得到JvmTask对象

      JvmTaskmyTask = umbilical.getTask(jvmId);

      ……

     idleLoopCount = 0;

      task =myTask.getTask();

      taskid =task.getTaskID();

      isCleanup= task.isTaskCleanupTask();

      JobConfjob = new JobConf(task.getJobFile());

     TaskRunner.setupWorkDir(job);

     numTasksToExecute = job.getNumTasksToExecutePerJvm();

     task.setConf(job);

      defaultConf.addResource(newPath(task.getJobFile()));

      ……

      //运行task

     task.run(job, umbilical);            // run the task

      if(numTasksToExecute > 0 && ++numTasksExecuted ==

      numTasksToExecute){

        break;

      }

    }

    3.6.1 MapTask的运行

    3.6.1.1 MapTask.run()方法

    如果task是MapTask,则其run函数如下:

    public void run(final JobConf job, finalTaskUmbilicalProtocol umbilical)

        throws IOException,ClassNotFoundException, InterruptedException {

       //负责与TaskTracker的通信,通过该对象可以获得必要的对象

       this.umbilical = umbilical;  

        // 启动Reporter线程,用来和TaskTracker交互目前运行的状态

       TaskReporter reporter = new TaskReporter(getProgress(), umbilical);

       reporter.startCommunicationThread();

        boolean useNewApi =job.getUseNewMapper();

        /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创

          建commiter,设置工作目录等*/

        initialize(job, getJobID(),reporter, useNewApi);

       /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方

       法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/

        if(jobCleanup) {

          runJobCleanupTask(umbilical,reporter);

          return;

        }

        if(jobSetup) {

          //主要是创建工作目录的FileSystem对象

          runJobSetupTask(umbilical,reporter);

          return;

        }

        if(taskCleanup) {

          //设置任务目前所处的阶段为结束阶段,并且删除工作目录

          runTaskCleanupTask(umbilical,reporter);

          return;

        }

        //如果不是上述四种类型,则真正运行任务

        if (useNewApi) {

          runNewMapper(job, split, umbilical,reporter);

        } else {

          runOldMapper(job, split, umbilical, reporter);

        }

        done(umbilical, reporter);

      }

    3.6.1.2 MapTask.runNewMapper()方法

    其中,我们只研究运用新API编写程序的情况,所以runOldMapper函数我们将不做考虑。runNewMapper的代码如下:

    private   <INKEY,INVALUE,OUTKEY,OUTVALUE>

        voidrunNewMapper(

                    final JobConf job,

                    final BytesWritable rawSplit,

                    final TaskUmbilicalProtocol umbilical,

                    TaskReporter reporter

      ) throws IOException, ClassNotFoundException, InterruptedException{

      /*TaskAttemptContext类继承于JobContext类,相对于JobContext类增加

    了一些有关task的信息。通过taskContext对象可以获得很多与任务执行相

    关的类,比如用户定义的Mapper类,InputFormat类等等 */

       org.apache.hadoop.mapreduce.TaskAttemptContexttaskContext =

       new org.apache.hadoop.mapreduce.TaskAttemptContext(job,getTaskID());

        //创建用户自定义的Mapper类的实例

       org.apache.hadoop.mapreduce.Mapper

        <INKEY,INVALUE,OUTKEY,OUTVALUE>  mapper=

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(),job);

        // 创建用户指定的InputFormat类的实例

    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat= (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)

     ReflectionUtils.newInstance(taskContext.getInputFormatClass(),job);

        // 重新生成InputSplit

        org.apache.hadoop.mapreduce.InputSplit split =null;

        DataInputBuffer splitBuffer =new DataInputBuffer();

       splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());

        SerializationFactory factory =new SerializationFactory(job);

        Deserializer<? extendsorg.apache.hadoop.mapreduce.InputSplit>

          deserializer =

            (Deserializer<? extendsorg.apache.hadoop.mapreduce.InputSplit>)

            factory.getDeserializer(job.getClassByName(splitClass));

        deserializer.open(splitBuffer);

        split =deserializer.deserialize(null);

      //根据InputFormat对象创建RecordReader对象,默认是LineRecordReader

       org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =

          new NewTrackingRecordReader<INKEY,INVALUE>

             (inputFormat.createRecordReader(split, taskContext), reporter);

       

       job.setBoolean("mapred.skip.on", isSkipping());

    //生成RecordWriter对象

    org.apache.hadoop.mapreduce.RecordWriter output = null;

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context  mapperContext = null;

        try {

         Constructor<org.apache.hadoop.mapreduce.Mapper.Context>

            contextConstructor =

           org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor

            (newClass[]{org.apache.hadoop.mapreduce.Mapper.class,

                        Configuration.class,

                        org.apache.hadoop.mapreduce.TaskAttemptID.class,

                        org.apache.hadoop.mapreduce.RecordReader.class,

                         org.apache.hadoop.mapreduce.RecordWriter.class,

                     org.apache.hadoop.mapreduce.OutputCommitter.class,

                        org.apache.hadoop.mapreduce.StatusReporter.class,

                        org.apache.hadoop.mapreduce.InputSplit.class});

     

          //get an output object

          if(job.getNumReduceTasks() == 0) {

             output = newNewDirectOutputCollector(taskContext, job,

                   umbilical, reporter);

          } else{

           output = new NewOutputCollector(taskContext, job, umbilical,

                  reporter);

          }

     

         mapperContext = contextConstructor.newInstance(mapper, job,

                   getTaskID(), input, output, committer, reporter, split);

          /*初始化,在默认情况下调用的是LineRecordReader的initialize方

           法,主要是打开输入文件并且将文件指针指向文件头*/

         input.initialize(split, mapperContext);

         mapper.run(mapperContext);    //运行真正的Mapper类

         input.close();

         output.close(mapperContext);

        } catch(NoSuchMethodException e) {

          thrownew IOException("Can't find Context constructor", e);

        } catch(InstantiationException e) {

          thrownew IOException("Can't create Context", e);

        } catch(InvocationTargetException e) {

          thrownew IOException("Can't invoke Context constructor", e);

        } catch(IllegalAccessException e) {

          thrownew IOException("Can't invoke Context constructor", e);

        }

      }

    3.6.1.3 Mapper.run()方法

    其中mapper.run方法调用的是Mapper类的run方法。这也是用户要实现map方法所需要继承的类。该类的run方法代码如下:

    public void run(Context context) throws IOException, InterruptedException{

        setup(context);

        while (context.nextKeyValue()){

          map(context.getCurrentKey(),context.getCurrentValue(), context);

        }

        cleanup(context);

      }

    该方法首先调用了setup方法,这个方法在Mapper当中实际上是什么也没有做。用户可重写此方法让程序在执行map函数之前进行一些其他操作。然后,程序将不断获取键值对交给map函数处理,也就是用户所希望进行的操作。之后,程序调用cleanup函数。这个方法和setup一样,也是Mapper类的一个方法,但是实际上什么也没有做。用户可以重写此方法进行一些收尾工作。

    3.6.1.4 Map任务执行序列图


    图 Map任务执行序列图

    3.6.2 ReduceTask的运行

    3.6.2.1 ReduceTask.run()方法

    如果运行的任务是ReduceTask,则其run函数如下:

    public void run(JobConfjob, final TaskUmbilicalProtocol umbilical)

        throws IOException,InterruptedException, ClassNotFoundException {

        this.umbilical = umbilical;

       job.setBoolean("mapred.skip.on", isSkipping());

     

        /*添加reduce过程需要经过的几个阶段。以便通知TaskTracker目前运

         行的情况*/

        if (isMapOrReduce()) {

          copyPhase =getProgress().addPhase("copy");

          sortPhase  = getProgress().addPhase("sort");

          reducePhase =getProgress().addPhase("reduce");

        }

        // 设置并启动reporter进程以便和TaskTracker进行交流

        TaskReporter reporter = newTaskReporter(getProgress(), umbilical);

       reporter.startCommunicationThread();

        boolean useNewApi =job.getUseNewReducer();

         /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创

          建commiter,设置工作目录等*/

        initialize(job, getJobID(), reporter,useNewApi);

    /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方

       法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/

        if(jobCleanup) {

         runJobCleanupTask(umbilical, reporter);

         return;

        }

        if(jobSetup) {

          //主要是创建工作目录的FileSystem对象

         runJobSetupTask(umbilical, reporter);

         return;

        }

        if(taskCleanup) {

          //设置任务目前所处的阶段为结束阶段,并且删除工作目录

         runTaskCleanupTask(umbilical, reporter);

         return;

        }

       

        //Initialize the codec

        codec =initCodec();

     

        boolean isLocal ="local".equals(job.get("mapred.job.tracker","local"));

        if (!isLocal) {

         //ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器

          reduceCopier = newReduceCopier(umbilical, job, reporter);

          //fetchOutputs函数负责拷贝各个Map函数的输出

          if (!reduceCopier.fetchOutputs()){

           if(reduceCopier.mergeThrowable instanceof FSError) {

              throw(FSError)reduceCopier.mergeThrowable;

            }

            throw newIOException("Task: " + getTaskID() +

                " - The reducecopier failed", reduceCopier.mergeThrowable);

          }

        }

        copyPhase.complete();                // copy is already complete

       setPhase(TaskStatus.Phase.SORT);

        statusUpdate(umbilical);

     

        final FileSystem rfs =FileSystem.getLocal(job).getRaw();

        //根据JobTracker是否在本地来决定调用哪种排序方式

        RawKeyValueIterator rIter =isLocal

          ? Merger.merge(job, rfs,job.getMapOutputKeyClass(),

             job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),

             !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor",100),

              newPath(getTaskID().toString()), job.getOutputKeyComparator(),

              reporter,spilledRecordsCounter, null)

          :reduceCopier.createKVIterator(job, rfs, reporter);

           

        // free up the data structures

        mapOutputFilesOnDisk.clear();

       

        sortPhase.complete();                         // sort is complete

       setPhase(TaskStatus.Phase.REDUCE);

        statusUpdate(umbilical);

        Class keyClass =job.getMapOutputKeyClass();

        Class valueClass =job.getMapOutputValueClass();

        RawComparator comparator =job.getOutputValueGroupingComparator();

     

        if (useNewApi) {

          runNewReducer(job, umbilical,reporter, rIter, comparator,

                        keyClass,valueClass);

       } else {

          runOldReducer(job, umbilical,reporter, rIter, comparator,

                        keyClass,valueClass);

        }

        done(umbilical, reporter);

      }

    3.6.2.2 ReduceTask.runNewReducer()方法

    同样,在此我们只考虑当用户用新的API编写程序时的情况。所以我们只关注runNewReducer方法,其代码如下:

    private <INKEY,INVALUE,OUTKEY,OUTVALUE>

      void runNewReducer(JobConfjob,

                         finalTaskUmbilicalProtocol umbilical,

                         final TaskReporterreporter,

                         RawKeyValueIterator rIter,

                         RawComparator<INKEY>comparator,

                         Class<INKEY>keyClass,

                         Class<INVALUE>valueClass

                         ) throwsIOException,InterruptedException,

                                 ClassNotFoundException {

        // wrapvalue iterator to report progress.

        finalRawKeyValueIterator rawIter = rIter;

        rIter =new RawKeyValueIterator() {

         public void close() throws IOException {

           rawIter.close();

          }

         public DataInputBuffer getKey() throws IOException {

           return rawIter.getKey();

          }

         public Progress getProgress() {

           return rawIter.getProgress();

          }

         public DataInputBuffer getValue() throws IOException {

           return rawIter.getValue();

          }

         public boolean next() throws IOException {

           boolean ret = rawIter.next();

           reducePhase.set(rawIter.getProgress().get());

           reporter.progress();

           return ret;

          }

        };

      /*TaskAttemptContext类继承于JobContext类,相对于JobContext类增加

    了一些有关task的信息。通过taskContext对象可以获得很多与任务执行相

    关的类,比如用户定义的Mapper类,InputFormat类等等 */

       org.apache.hadoop.mapreduce.TaskAttemptContexttaskContext =

       neworg.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());

        //创建用户定义的Reduce类的实例

       org.apache.hadoop.mapreduce.Reducer

        <INKEY,INVALUE,OUTKEY,OUTVALUE>  reducer =

    (org.apache.hadoop.mapreduce.Reducer

    <INKEY,INVALUE,OUTKEY,OUTVALUE>)

           ReflectionUtils.newInstance(taskContext.getReducerClass(), job);

        //创建用户指定的RecordWriter

       org.apache.hadoop.mapreduce.RecordWriter

    <OUTKEY,OUTVALUE> output =

    (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)

           outputFormat.getRecordWriter(taskContext);

    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>

    trackedRW =

    new NewTrackingRecordWriter<OUTKEY,OUTVALUE>

    (output, reduceOutputCounter);

       job.setBoolean("mapred.skip.on", isSkipping());

       org.apache.hadoop.mapreduce.Reducer.Context

            reducerContext = createReduceContext(reducer, job, getTaskID(),

             rIter,reduceInputKeyCounter,

             reduceInputValueCounter,

             trackedRW, committer,

             reporter, comparator, keyClass,

           valueClass);

       reducer.run(reducerContext);

       output.close(reducerContext);

      }

    3.6.2.3 reducer.run()方法

    其中,reducer的run函数如下:

    public void run(Context context) throws IOException, InterruptedException{

        setup(context);

        while (context.nextKey()) {

         reduce(context.getCurrentKey(), context.getValues(), context);

        }

        cleanup(context);

      }

    该函数先调用setup函数,该函数默认是什么都不做,但是用户可以通过重写此函数来在运行reduce函数之前做一些初始化工作。然后程序会不断读取输入数据,交给reduce函数处理。这里的reduce函数就是用户所写的reduce函数。最后调用cleanup函数。默认的cleanup函数是没有做任何事情,但是用户可以通过重写此函数来进行一些收尾工作。

    3.6.2.4 Reduce任务执行序列图


    图 Reduce任务执行序列图

     

     

    4 致谢

    作者是在读了“觉先”的博客《Hadoop学习总结之四:Map-Reduce的过程解析》之后才从宏观上了解Hadoop MapReduce模块的工作原理,并且以此为蓝本,写出了本文。所以,在此向“觉先”表示敬意。另外本文当中可能有很多地方直接引用前述博文,在此特别声明,文中就不一一标注了

    展开全文
  • Hadoop之MapReduce任务执行流程 图中名词的解析 1.job 表示一个MapReduce作业,负责监控作业的运行状态,它维护了一个作业的状态机,以实现异步执行各种作业相关操作 2.Task 表示一个MapReduce作业的某个任务,...
  • hadoop mapreduce执行流程

    2012-09-17 16:22:50
    100台hadoop机器(准确地说应该是tasktracker机),默认block大小为64M,这样每台执行map的文件刚好是一个64M的block文件(假设这个分发过程已经完成,同时忽略备份数之类的细节),并且我们使用10个reduce任务来...
  • hadoop作业执行流程及代码简略解读

    千次阅读 2013-10-22 16:00:09
    hadoop作业执行流程及代码简略解读 本文:参考了网上的博文。出处也不知是哪里,不好意思。最近整理磁盘文档发现的好资料所以整理补充了一下供大家学习参考一下吧。 1.主要组成部分:  Hadoop包括hdfs与...
  • hadoop的mapreduce任务执行流程

    千次阅读 2017-03-23 20:25:19
    hadoop2.x的三大核心:mapreduce 、hdfs以及yarn ,其中核心之一mapreduce,利用了分而治之的思想,Map(映射)和 Reduce(归约),分布式多处理然后进行汇总的思想,比如:清点扑克牌把里面的花色都分开,一个人...
  • 上节介绍了Task节点向Master节点发送心跳信号,从而接收任务,然后部署和启动任务,本节介绍Task节点具体的执行任务的过程。首先,Task节点根据任务的类型,执行MapTask.run()或者ReduceTask.run()这两个方法,下面...
  • 5.JobTracker  JobTracker是在网络... JobTracker类中有一个main()函数,hadoop启动的时候执行此main()函数启动JobTracker进程,main()中生成一个JobTracker的对象,然后通过tracker.offerService()语句启动服务,
  • 1.客户端client向ResourceManager提交Application,ResourceManager接受Application并根据集群资源使用情况选择一台node启动Application任务调度器driver 2.ResourceManager找到那个node,令该node上的nodeManager...
  • hadoop中mapreduce部分执行流程

    千次阅读 2012-03-27 17:24:25
    Hadoop包括hdfs与mapreduce两部分,在试用期期间我主要看了mapreduce部分,即hadoop执行作业的部分。 mapreduce中几个主要的概念  mapreduce整体上可以分为这么几条执行的线索,jobclient,JobTracker与...
  • Hadoop中Reduce任务执行框架

    千次阅读 2011-12-20 19:53:15
    在前面的一系列文章中我主要围绕Hadoop对Map任务执行框架的设计与实现展开了详细的讨论,记得在博文Hadoop中Map任务执行框架中说过还要为大家详细地描述Hadoop对Reduce任务执行框架的设计,那么在本文,我将兑现这...
  • 本文参考自: 原文地址 1、 导入数据对需分析的数据进行分片,片的大小默认与 datanode 块大小相同。 2、 每个数据片由一个 mapper 进行分析,mapper 按照需求将数据拆分为一个个 keyvalue 格式的数据。...
  • hadoop任务执行过程

    千次阅读 2016-02-11 12:01:47
    3.1、任务提交 JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。 向JobTracker请求一个新的job ID检测此job的output配置计算此job的input splits将Job运行所需的资源拷贝到JobTracker的文件...
  • 请求资源到位以后,协商NodeManager启动对应的container,并调度任务执行,监控任务运行状态,通过心跳定时向RM报告任务信息。 3、NodeManager 每台机器对应一个NodeManager,对 container 负责,监控所在机器上...
  • Hadoop安装配置流程

    千次阅读 2011-03-23 23:33:00
    Apache的hadoop安装部署配置详细过程
  • Hadoop执行过程

    2013-02-28 10:32:58
    根据Hadoop++论文的描述,Hadoop执行过程分为Load、Map、Shuffle、Reduce这四个阶段,可以看成是一个由split、itemize、map、reduce等10个函数或算子组成的DAG。其中每一个函数或算子,都可以提供自定义的
  • Hadoop Yarn工作流程

    2017-07-31 18:50:21
    之前在上课的时候对于Yarn的工作流程听的比较模糊,回去后自己在网上查了查资料,加上自己的理解,整理出该笔记。
  • hadoop简单运行流程

    千次阅读 2015-07-31 00:02:42
    Hadoop集群中分主节点master节点和slave节点,master节点监控slave节点。master和slave之间通过ssh协议进行通信。  master节点上部署有JobTracker和NameNode,当然也可以部署TaskTracker和DataNode。slave节点上...
  • hadoop hdfs 读写流程

    千次阅读 2018-08-31 21:55:39
    开始之前先看看其基本属性,HDFS(Hadoop Distributed File System)是GFS的开源实现。 特点如下: 能够运行在廉价机器上,硬件出错常态,需要具备高容错性 流式数据访问,而不是随机读写 面向大规模数据集,...
  • 源码流程分析3-Task节点管理启动任务 1. 代码执行流程1) TaskTracker的启动的时候会加载所有信息,包括利用RPC获得JobTracker 的RPC变量定义为jobClient;TaskTracker.run()方法会去循环向JobTracke
  • 1、Hadoop MapReduce作业调度 早期的Hadoop使用的FIFO调度器来调度用户提交的 作业。现在主要使用的调度器包括Yahoo公司提出...2、MapReduce执行框架的组件和执行流程  每个TaskTracker节点将从HDFS分布式文件中读取所
  • Hadoop详细的流程

    2015-05-19 21:03:14
    MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的...
  • Hadoop Map/Reduce执行流程详解

    千次阅读 2017-07-12 11:24:56
    Hadoop Map/Reduce执行流程详解 转载Map/Reduce 一个Map/Reduce 作业(job) 通常会把输入的数据(input file)切分为若干独立的数据块(splits),然后由 map任务(task)以完全并行的方式处理它们。Map/...
  • 刚刚看到一篇文章对 hadoop1 和 hadoop 2 做了一个...从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路: 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,613
精华内容 13,845
关键字:

hadoop执行任务流程