MapReduce in Hadoop 1.x: the JobClient submission process

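The overall MapReduce flow, from job submission to execution, is roughly as shown in the figure below:

[Figure: client1]

In short, submitting a MapReduce job involves roughly the following steps: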

1. The JobClient first uploads the necessary files, such as the job jar, the split information, and the DistributedCache files, and then submits the job to the JobTracker.

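2. After accepting the job, the JobTracker initializes it in the background without holding a lock, and waits for TaskTracker heartbeats so that tasks can be handed out.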

3. The TaskTracker obtains the tasks it should run from the JobTracker's scheduler.

4. The TaskTracker localizes the task and runs the MapReduce task.
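The four steps above can be pictured with a small toy model. This is only a sketch under simplifying assumptions: `ToyJobTracker`, `submitJob`, and `heartbeat` are hypothetical names invented for illustration, not Hadoop classes or methods, and the job is expanded into tasks synchronously rather than in the background as the real JobTracker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class SubmitFlowSketch {
    // Hypothetical stand-in for the JobTracker: holds tasks until a tracker asks.
    static final class ToyJobTracker {
        private final Queue<String> pending = new ArrayDeque<String>();

        // Steps 1 and 2: the client submits a job and it is expanded into tasks.
        // (Simplification: done inline here, not in a background thread.)
        synchronized void submitJob(String jobId, int numTasks) {
            for (int i = 0; i < numTasks; i++) {
                pending.add(jobId + "_task_" + i);
            }
        }

        // Step 3: a heartbeat reports free slots and receives at most that many tasks.
        synchronized List<String> heartbeat(int freeSlots) {
            List<String> assigned = new ArrayList<String>();
            while (assigned.size() < freeSlots && !pending.isEmpty()) {
                assigned.add(pending.poll());
            }
            return assigned;
        }
    }

    public static void main(String[] args) {
        ToyJobTracker tracker = new ToyJobTracker();
        tracker.submitJob("job_1", 3);
        // Step 4 would localize and run each assigned task; here we only print them.
        System.out.println(tracker.heartbeat(2)); // [job_1_task_0, job_1_task_1]
        System.out.println(tracker.heartbeat(2)); // [job_1_task_2]
    }
}
```

The point of the model is the pull direction: the tracker side never pushes work; tasks leave the queue only when a heartbeat arrives asking for them.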

Let's look at the submission process in detail. Take the simplest example, wordcount, where the client code is:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

The call that submits the job is job.waitForCompletion(true); under the hood it ends up in JobClient's submitJobInternal, which proceeds as follows:

1. Obtain the staging directory and copy the user's jar, the libjars, the distributed cache files, and so on into it.

2. Write the split information into the staging directory. The splits are computed by the InputFormat.

 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
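To make the "biggest go first" ordering concrete, here is a minimal, self-contained sketch. It does not use Hadoop: `FakeSplit` and `BIGGEST_FIRST` are hypothetical stand-ins for InputSplit and SplitComparator, and the only assumption is that the comparator orders splits by descending length, as the comment in writeNewSplits says.

```java
import java.util.Arrays;
import java.util.Comparator;

class SplitSortSketch {
    // Hypothetical stand-in for InputSplit: only the length matters here.
    static final class FakeSplit {
        final String name;
        final long length;

        FakeSplit(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    // Orders splits from largest to smallest, mirroring the intent of the
    // "biggest go first" comment in writeNewSplits.
    static final Comparator<FakeSplit> BIGGEST_FIRST = new Comparator<FakeSplit>() {
        @Override
        public int compare(FakeSplit a, FakeSplit b) {
            return Long.compare(b.length, a.length);
        }
    };

    // Returns the split names in the order they would be written out.
    static String[] sortedNames(FakeSplit[] splits) {
        FakeSplit[] copy = splits.clone();
        Arrays.sort(copy, BIGGEST_FIRST);
        String[] names = new String[copy.length];
        for (int i = 0; i < copy.length; i++) {
            names[i] = copy[i].name;
        }
        return names;
    }

    public static void main(String[] args) {
        FakeSplit[] splits = {
            new FakeSplit("part-a", 64L),
            new FakeSplit("part-b", 128L),
            new FakeSplit("part-c", 32L)
        };
        System.out.println(Arrays.toString(sortedNames(splits))); // [part-b, part-a, part-c]
    }
}
```

One plausible reason for this ordering is that the largest splits are likely to take longest, so placing them first lets them be scheduled early.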

3. Write the entire conf to an XML file, then submit the job to the JobTracker.

jobCopy.writeXml(out);
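Steps 1 and 3 (copying files into the staging directory and serializing the configuration as XML) can be sketched with the JDK alone. This is an illustration, not Hadoop's implementation: `StagingSketch` and `stage` are hypothetical names, a local temporary directory stands in for the staging directory on the job's file system, and `java.util.Properties.storeToXML` stands in for `writeXml`, so the XML layout differs from what Hadoop actually writes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

class StagingSketch {
    // Copies the job jar into the staging directory and writes the
    // configuration next to it as XML. Returns the path of the XML file.
    static Path stage(Path jobJar, Properties conf, Path stagingDir) throws IOException {
        Files.createDirectories(stagingDir);
        Files.copy(jobJar, stagingDir.resolve("job.jar"), StandardCopyOption.REPLACE_EXISTING);
        Path confFile = stagingDir.resolve("job.xml");
        try (OutputStream out = Files.newOutputStream(confFile)) {
            conf.storeToXML(out, "job configuration");
        }
        return confFile;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("staging-sketch");
        // A three-byte placeholder file plays the role of the user's jar.
        Path jar = Files.write(tmp.resolve("wordcount.jar"), new byte[] {1, 2, 3});
        Properties conf = new Properties();
        conf.setProperty("mapred.job.name", "word count");

        Path confFile = stage(jar, conf, tmp.resolve("job_0001"));

        // Read the XML back to confirm the round trip.
        Properties loaded = new Properties();
        try (InputStream in = Files.newInputStream(confFile)) {
            loaded.loadFromXML(in);
        }
        System.out.println(loaded.getProperty("mapred.job.name")); // word count
    }
}
```

Writing the configuration last matches the order of the steps above: by the time the XML file exists, everything else the job needs is already in the staging directory.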