YARN MRAppMaster与Scheduler流程说明(一)

基本说明

Hadoop基本上可以分为计算层与存储层, 这篇文章详细说明一下计算层YARN的一些基本流程,概念。其中包括任务提交的过程,调度过程,container运行过程等方面。
下图是Apache社区中YARN的一个基本流程图:
YARN流程
基本过程可以概述为以下部分:
1.Client提交任务
2.ResourceManager接受任务,生成相应结构
3.RM状态机转换,通过Scheduler找到能够运行MRAppmaster的NodeManager
4.ApplicationMasterLauncher向NM发送RPC运行MRAppmaster
5.Node Manager update向RM汇报运行情况,Scheduler调度需要在该Node上运行的container,MRAppmaster通过heartbeat向RM汇报以及fairscheduler根据请求资源情况,返回给MRAppmaster可以运行的container信息
6.MRAppmaster向NM发送RPC运行container
7.NM获得运行时,经过本地化,运行该container

状态机与异步

要想详细说明YARN中的流程,就必须要对其中的一些设计思想进行说明,那就必须要了解YARN中状态机的转变以及异步执行。
了解JobTracker代码的人都应该知道,至于JobTracker扩展的重要因素是JobTracker中记录了全量信息,包括所有Job,Job中的每个MapTask以及ReduceTask的状态,并且所有的执行都是串行执行,大大降低了调度效率。为此YARN首先把Job的职责分为两部分,MRAppmaster负责了task管理,ResourceManager负责调度与节点管理。同时,YARN中大量使用了事件驱动模型,都是通过底层的异步dispatcher来将各个module进行解耦,最大降低了由于串行带来的瓶颈。
此外,YARN与MRV1最大的不同之处就在于状态机的引用,状态机+事件驱动使得YARN的内部调度效率大大提高,但是也带来了代码阅读的极大不方便。下面结合一些代码说明一下如何阅读相应的部分:

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          addTransition(STATE preState, STATE postState,
                        EVENTTYPE eventType,
                        SingleArcTransition<OPERAND, EVENT> hook){
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
        (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
           (preState, eventType, new SingleInternalArc(postState, hook)));
  }

基本上YARN中的状态机都是调用这一接口,他的意思是preState是原始状态,postState是变更后状态,eventType是触发状态转变的事件,hook是状态转变后需要执行的钩子。基本上我们只要关注eventType,就能找到需要执行的hook,然后记录下postState即可。而每一种eventType都注册到一个handler进行处理。比如下面看一下RMAppImpl代码:

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

上面这一过程就是由RMAppEventType.START事件触发,状态由RMAppState.NEW转变为RMAppState.NEW_SAVING,同时执行RMAppNewlySavingTransition函数。

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
 
      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

调用的是storeNewApplication(app)方法:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

可以看出在这一步向disptacher发送了一个event,dispatcher将相应event发送到handler处理,至此流程流动了起来。
下面从作业执行的流程一步步详解。

Client提交任务

因为YARN支持多种任务提交,不仅局限与Map Reduce框架,同时支持Spark,甚至自己写的框架(例如我们开发并在线上一直运行的OLS实时系统),每个框架都需要自己写自己的Client端。这里从简单出发,分析MR framework,有时间会分享一下我们所写的OLS框架。
MR Framework使用JobClient来提交任务。流程很简单,首先是将所有jar包,依赖包,split信息,conf等拷贝到hdfs上,随后通过RPC发送submit。submit之前Client会调用YARNRunner对job进行一个封装,方法是createApplicationSubmissionContext,主要的用途就是把任务所有信息,包括任务名称,提交队列,以及MRAppmaster的启动参数进行设置,为下面MRAppmaster在NodeManager的启动进行准备。
封装好ApplicationSubmissionContext后,Client就向Resoucemanager的服务ClientRMService发送RPC请求,提交任务。
至此,Client的工作完成了。总的来说,Client负责的是任务切分,conf各种设置,MRAppmaster启动脚本生成等任务。