The overall flow of a MapReduce job, from submission to execution, is roughly as follows. In short, job submission goes through a few steps:
1. The JobClient first uploads the necessary files (the job jar, split information, DistributedCache files, etc.), then submits the job to the JobTracker.
2. After accepting the job, the JobTracker initializes it in the background without holding a lock, and waits for TaskTracker heartbeats to hand out tasks.
3. Each TaskTracker obtains the tasks it should run from the JobTracker's scheduler.
4. The TaskTracker localizes the task and runs the map or reduce work.
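The key property of steps 2–4 is that tasks are pulled, not pushed: the JobTracker only hands out work when a TaskTracker heartbeats in. A minimal toy model of that handoff (class and method names here are made up for illustration; the real exchange runs over Hadoop RPC heartbeats):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of pull-based task assignment: the tracker never pushes tasks;
// workers fetch them when they "heartbeat" in.
public class HeartbeatSketch {
    // Stand-in for the JobTracker's pending-task queue.
    static final Queue<String> pendingTasks = new ArrayDeque<>();

    // On job submission: enqueue one map task per input split.
    static void submitJob(int numSplits) {
        for (int i = 0; i < numSplits; i++) {
            pendingTasks.add("map_" + i);
        }
    }

    // On a TaskTracker heartbeat: hand back one task, or null if none pending.
    static String heartbeat() {
        return pendingTasks.poll();
    }

    public static void main(String[] args) {
        submitJob(3);
        String task;
        while ((task = heartbeat()) != null) {
            System.out.println("assigned " + task);
        }
    }
}
```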
Now let's look at the submission process in detail. Take the simplest example, wordcount; the client code is:
```java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```
The submission entry point is job.waitForCompletion(true), which under the hood calls JobClient's submitJobInternal. Its steps are:
1. Obtain the staging directory, and copy the user jar, libjars, DistributedCache files, and so on into it.
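As a local-filesystem sketch of this step (the real JobClient writes to a staging directory on HDFS; the class and method names below are hypothetical):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Local sketch of "copy a job resource into the staging directory".
// The real implementation copies to HDFS, not the local filesystem.
public class StagingSketch {
    public static Path stageFile(Path stagingDir, Path localFile) throws IOException {
        Files.createDirectories(stagingDir);
        Path target = stagingDir.resolve(localFile.getFileName());
        Files.copy(localFile, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("staging");
        Path jar = Files.createTempFile("job", ".jar");
        System.out.println("staged to " + stageFile(dir, jar));
    }
}
```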
2. Write the split information into the staging directory; the splits are computed by the job's InputFormat:
```java
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
```
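For FileInputFormat, getSplits above sizes each split as max(minSize, min(maxSize, blockSize)). A standalone sketch of that rule (the surrounding class is made up; only the formula mirrors Hadoop's computeSplitSize):

```java
// Sketch of FileInputFormat's split-size rule:
//   splitSize = max(minSize, min(maxSize, blockSize))
public class SplitSizeSketch {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Split count for one file, ignoring the slack tolerance the real
    // implementation allows for the last split.
    static int countSplits(long fileLength, long splitSize) {
        return (int) ((fileLength + splitSize - 1) / splitSize);
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024; // a 64 MB HDFS block
        long splitSize = computeSplitSize(blockSize, 1, Long.MAX_VALUE);
        // A 200 MB file yields 4 splits at 64 MB each.
        System.out.println(countSplits(200L * 1024 * 1024, splitSize));
    }
}
```

So with default min/max sizes, the split size collapses to the HDFS block size, which is why map-task count usually tracks block count.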
3. Write the full configuration out to an XML file, and submit the job to the JobTracker:
```java
jobCopy.writeXml(out);
```
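The idea of writeXml is simply to serialize every key/value pair so the JobTracker (and later each task) can reconstruct the same Configuration. A rough analogue using the JDK's java.util.Properties XML round trip (note: Hadoop uses its own &lt;configuration&gt;&lt;property&gt; format, not the Properties DTD):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

// Round-trip a config entry through XML, analogous in spirit to
// Configuration.writeXml followed by re-loading on the JobTracker side.
public class ConfXmlSketch {
    static String roundTrip(String key, String value) throws IOException {
        Properties conf = new Properties();
        conf.setProperty(key, value);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        conf.storeToXML(out, "job conf");           // serialize to XML

        Properties loaded = new Properties();
        loaded.loadFromXML(new ByteArrayInputStream(out.toByteArray()));
        return loaded.getProperty(key);             // survives the round trip
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("mapred.job.name", "word count"));
    }
}
```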