YARN MRAppMaster and Scheduler Flow Explained (Part 3)

Continuing from the previous part, we follow how a submitted app moves through the state machines in the ResourceManager.
9.FairScheduler.allocate
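The allocate method serves two purposes. It requests the container for the MRAppMaster, and once the MRAppMaster is running, it requests containers from the ResourceManager. It first checks that the app has been registered with the scheduler. It then checks the resource requests, releases the containers the caller asked to release, and adds the new requests to the AppSchedulingInfo structure; these details are covered more concretely in the MRAppMaster part of this series. Next comes the method that fetches the containers that can run the AppMaster: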

  public synchronized ContainersAndNMTokensAllocation
      pullNewlyAllocatedContainersAndNMTokens() {
    List<Container> returnContainerList =
        new ArrayList<Container>(newlyAllocatedContainers.size());
    List<NMToken> nmTokens = new ArrayList<NMToken>();
    for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
      .hasNext();) {
      RMContainer rmContainer = i.next();
      Container container = rmContainer.getContainer();
      try {
        // create container token and NMToken altogether.
        container.setContainerToken(rmContext.getContainerTokenSecretManager()
          .createContainerToken(container.getId(), container.getNodeId(),
            getUser(), container.getResource()));
        NMToken nmToken =
            rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
              getApplicationAttemptId(), container);
        if (nmToken != null) {
          nmTokens.add(nmToken);
        }
      } catch (IllegalArgumentException e) {
        // DNS might be down, skip returning this container.
        LOG.error("Error trying to assign container token and NM token to" +
            " an allocated container " + container.getId(), e);
        continue;
      }
      returnContainerList.add(container);
      i.remove();
      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
        RMContainerEventType.ACQUIRED));
    }
    return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
  }

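Note that most YARN operations run asynchronously through the Dispatcher, so the containers returned here were allocated earlier. When a NodeManager sends its heartbeat to the ResourceManager, the heartbeat ultimately triggers the FairScheduler's NODE_UPDATE event. The NodeManager reports its newly launched and completed containers, and the Scheduler records them in the corresponding structures. Allocation then starts at the root queue and works down to the leaf queues; the allocation rules will get their own write-up in the scheduler part. Once a suitable queue is selected, an app within it is chosen by fair-sharing rules and checked for pending requests. Within an app, requests are served by priority (AppMaster > reduce > map), and then by locality to find a suitable container. A successful allocation creates an RMContainer object, which represents a running container, and adds it to the newlyAllocatedContainers structure: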

    // Create RMContainer
    RMContainer rmContainer = new RMContainerImpl(container, 
        getApplicationAttemptId(), node.getNodeID(),
        appSchedulingInfo.getUser(), rmContext);
 
    // Add it to allContainers list.
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);
    // Update consumption and track allocations
    appSchedulingInfo.allocate(type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());
 
    // Inform the container
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));

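Finally, the scheduler passes RMContainerEventType.START to the new RMContainer's state machine.
Back in allocate, the newlyAllocatedContainers described above are pulled out and wrapped in a ContainersAndNMTokensAllocation, which is returned to the caller. At this point, scheduling has succeeded.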
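The handoff between the node-heartbeat path and allocate works like a producer/consumer list guarded by a single lock. The Java sketch below models it that way. It is not FairScheduler code: `AllocationHandoffSketch` and `SimpleContainer` are hypothetical names, and the `tokenOk` flag stands in for the container/NM token creation that can fail when DNS is down. As in pullNewlyAllocatedContainersAndNMTokens, a container whose token cannot be created is skipped but left in the list, so a later allocate call can still return it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for RMContainer: only what this sketch needs.
class SimpleContainer {
    final String id;
    final boolean tokenOk;      // false models the "DNS might be down" failure
    String state = "ALLOCATED";

    SimpleContainer(String id, boolean tokenOk) {
        this.id = id;
        this.tokenOk = tokenOk;
    }
}

class AllocationHandoffSketch {
    private final List<SimpleContainer> newlyAllocated = new ArrayList<>();

    // Producer: called while the scheduler handles a NODE_UPDATE heartbeat.
    synchronized void onNodeUpdateAllocated(SimpleContainer c) {
        newlyAllocated.add(c);
    }

    // Consumer: called from the AM's allocate(); drains what can be returned.
    synchronized List<String> pullNewlyAllocated() {
        List<String> returned = new ArrayList<>();
        for (Iterator<SimpleContainer> it = newlyAllocated.iterator(); it.hasNext();) {
            SimpleContainer c = it.next();
            if (!c.tokenOk) {
                continue; // stays in the list; a later allocate() retries it
            }
            returned.add(c.id);
            it.remove();
            c.state = "ACQUIRED"; // stands in for the ACQUIRED event
        }
        return returned;
    }

    synchronized int pendingCount() {
        return newlyAllocated.size();
    }
}
```

Both methods synchronize on the same object, so the heartbeat thread and the AM's RPC thread never see the list half-updated.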

10.RMContainerEventType.START
The state machine transition and its handler are:

    // Transitions from NEW state
    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())
 
 
 
   private static final class ContainerStartedTransition extends
      BaseTransition {
 
    @Override
    public void transition(RMContainerImpl container, RMContainerEvent event) {
      container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
          container.appAttemptId));
    }
  }

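The transition simply fires RMAppAttemptEventType.CONTAINER_ALLOCATED (an RMAppAttemptContainerAllocatedEvent). The dispatcher then routes it to RMAppAttemptImpl for processing.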

11.RMAppAttemptEventType.CONTAINER_ALLOCATED
Many code paths fire RMAppAttemptEventType.CONTAINER_ALLOCATED. At this point, however, RMAppAttemptImpl is in RMAppAttemptState.SCHEDULED, so the states traced so far lead straight to the state machine's handler:

       // Transitions from SCHEDULED State
      .addTransition(RMAppAttemptState.SCHEDULED,
          EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
            RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
          new AMContainerAllocatedTransition())

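The handler is simple. Through the scheduler's allocate call, it fetches the container allocated in step 9. If the container cannot be fetched, the attempt stays in SCHEDULED and the fetch is retried. Otherwise the handler stores this attempt by firing an RMStateStoreEventType.STORE_APP_ATTEMPT event: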

  public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
    Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
 
    ApplicationAttemptState attemptState =
        new ApplicationAttemptState(appAttempt.getAppAttemptId(),
          appAttempt.getMasterContainer(), credentials,
          appAttempt.getStartTime());
 
    dispatcher.getEventHandler().handle(
      new RMStateStoreAppAttemptEvent(attemptState));
  }
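AMContainerAllocatedTransition is registered with a set of two possible target states, so the transition itself decides where the attempt goes. Below is a minimal sketch of that multiple-arc idea. `AttemptSketch`, `AttemptState`, and the `Supplier` that stands in for the scheduler's allocate call are all hypothetical. Only the two states named in the registration above are modeled.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

enum AttemptState { SCHEDULED, ALLOCATED_SAVING }

class AttemptSketch {
    // Legal targets declared up front, like EnumSet.of(...) in addTransition.
    static final Set<AttemptState> TARGETS =
        EnumSet.of(AttemptState.ALLOCATED_SAVING, AttemptState.SCHEDULED);

    AttemptState state = AttemptState.SCHEDULED;
    String masterContainer;
    boolean storeRequested;

    // Hypothetical hook: the supplier returns null when the AM container
    // cannot be fetched yet (e.g. its token could not be created).
    AttemptState onContainerAllocated(Supplier<String> fetchAmContainer) {
        String container = fetchAmContainer.get();
        AttemptState next;
        if (container == null) {
            next = AttemptState.SCHEDULED;        // stay put and retry later
        } else {
            masterContainer = container;
            storeRequested = true;                // stands in for STORE_APP_ATTEMPT
            next = AttemptState.ALLOCATED_SAVING;
        }
        if (!TARGETS.contains(next)) {
            throw new IllegalStateException("Invalid target state " + next);
        }
        state = next;
        return next;
    }
}
```

When the hook returns SCHEDULED, the attempt stays where it was, so a retry runs the same hook again.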

12.RMStateStoreEventType.STORE_APP_ATTEMPT
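This step is simple. Based on the configuration, the store decides whether to persist the attempt and which storage medium to use. It then sends an RMAppAttemptEventType.ATTEMPT_NEW_SAVED event to the Dispatcher.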

13.RMAppAttemptEventType.ATTEMPT_NEW_SAVED
This event is handled back in RMAppAttemptImpl, which ultimately fires an AMLauncherEventType.LAUNCH event.

14.AMLauncherEventType.LAUNCH
This event is handled by ApplicationMasterLauncher, a ResourceManager service that asks a NodeManager to start the AppMaster.
The handler for AMLauncherEventType.LAUNCH is:

  private void launch(RMAppAttempt application) {
    Runnable launcher = createRunnableLauncher(application, 
        AMLauncherEventType.LAUNCH);
    masterEvents.add(launcher);
  }

masterEvents is a BlockingQueue, and ApplicationMasterLauncher has a LauncherThread that takes the generated Runnables from it and runs them. The interaction with the NodeManager is covered in the next part.
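The launch path is a plain producer/consumer queue. The event handler only enqueues a Runnable, and a dedicated thread executes it, so the dispatcher never waits on a NodeManager RPC. The sketch below assumes that structure. `LauncherSketch`, `launch(String)`, and `stopAndJoin()` are hypothetical names, and each Runnable just records the attempt id instead of contacting a NodeManager. To keep the execution order easy to check, the sketch runs each Runnable inline on the launcher thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LauncherSketch {
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    final List<String> launched = Collections.synchronizedList(new ArrayList<>());
    private final Thread launcherThread;

    LauncherSketch() {
        launcherThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    masterEvents.take().run(); // blocks until work arrives
                }
            } catch (InterruptedException e) {
                // interrupted while waiting: exit the loop
            }
        }, "LauncherThread");
        launcherThread.setDaemon(true);
        launcherThread.start();
    }

    // Like launch(RMAppAttempt): only enqueues, so the caller returns at once.
    void launch(String attemptId) {
        masterEvents.add(() -> launched.add(attemptId));
    }

    // Enqueues a "poison pill" that interrupts the launcher thread itself;
    // because the queue is FIFO, every earlier launch runs first.
    void stopAndJoin() {
        masterEvents.add(() -> Thread.currentThread().interrupt());
        try {
            launcherThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```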

YARN MRAppMaster and Scheduler Flow Explained (Part 2)

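The previous part introduced YARN's basic concepts and the client-side submission flow. This part starts from the ResourceManager and walks through the submission process in detail.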

The ResourceManager Receives the Job

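The ResourceManager receives the submission through ClientRMService. It first parses the ApplicationSubmissionContext to get the application id, the submitting user and group, and related information. It then creates an RMApp and registers it with the ResourceManager. RMApp is the abstraction of a job, and each job has exactly one corresponding RMApp. If all checks pass, the app is formally submitted to the ResourceManager.
From here on, the flow is driven by state machine transitions, which we go through stage by stage: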
1.RMAppEventType.START

      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));

The event type is RMAppEvent. Next, find the handler class that the ResourceManager registers with the async dispatcher:

      // Register event handler for RmAppEvents
      rmDispatcher.register(RMAppEventType.class,
          new ApplicationEventDispatcher(rmContext));

ApplicationEventDispatcher looks like this:

  @Private
  public static final class ApplicationEventDispatcher implements
      EventHandler<RMAppEvent> {
 
    private final RMContext rmContext;
 
    public ApplicationEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }
 
    @Override
    public void handle(RMAppEvent event) {
      ApplicationId appID = event.getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(appID);
      if (rmApp != null) {
        try {
          rmApp.handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for application " + appID, t);
        }
      }
    }
  }

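So the next place to look for the state transitions and hook functions is RMAppImpl, the implementation of RMApp. A quicker shortcut is to grep the Hadoop YARN source for classes containing RMAppEventType.START, which almost always lands in the right place. Of course, you need to understand the underlying mechanism first.
Here is the state machine part of RMAppImpl: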
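The routing done by ApplicationEventDispatcher comes down to two things: a lookup by application id, and a catch-all around the app's own handler. The simplified Java sketch below uses hypothetical `AppEventRouterSketch` and `AppEventSink` types, with string events in place of RMAppEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-app handler, standing in for RMApp.handle(RMAppEvent).
interface AppEventSink {
    void handle(String eventType);
}

class AppEventRouterSketch {
    private final Map<String, AppEventSink> rmApps = new ConcurrentHashMap<>();
    final List<String> errors = new ArrayList<>();

    void register(String appId, AppEventSink app) {
        rmApps.put(appId, app);
    }

    // Look up the app by id, silently drop events for unknown apps, and record
    // (instead of rethrowing) any failure from the app's own handler.
    void handle(String appId, String eventType) {
        AppEventSink app = rmApps.get(appId);
        if (app == null) {
            return;
        }
        try {
            app.handle(eventType);
        } catch (Throwable t) {
            errors.add("Error in handling event type " + eventType
                + " for application " + appId);
        }
    }
}
```

Catching Throwable matters here because the handler runs on the shared dispatcher thread. An exception from one app's state machine should not stop event delivery for every other app.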

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

This means the event RMAppEventType.START triggers the RMAppNewlySavingTransition hook and moves RMAppState from NEW to NEW_SAVING.
The transition method of RMAppNewlySavingTransition calls storeNewApplication:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

At this point, a new event has been sent back to the dispatcher for processing.

2.RMStateStoreEventType.STORE_APP
RMStateStoreEventType events are handled by the RMStateStore class, which is set up as follows:

      if(isRecoveryEnabled) {
        recoveryEnabled = true;
        rmStore =  RMStateStoreFactory.getStore(conf);
      } else {
        recoveryEnabled = false;
        rmStore = new NullRMStateStore();
      }
 
      try {
        rmStore.init(conf);
        rmStore.setRMDispatcher(rmDispatcher);
        rmStore.setResourceManager(rm);
      } catch (Exception e) {
        // the Exception from stateStore.init() needs to be handled for
        // HA and we need to give up master status if we got fenced
        LOG.error("Failed to init state store", e);
        throw e;
      }

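By default we run without recovery, so jobs must be rerun after a restart; recovery reads a large number of directories and is slow. The code also shows the RMStateStore being wired to the dispatcher through setRMDispatcher.
RMStateStore's handle method passes the event to handleStoreEvent, which does the following: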

      LOG.info("Storing info for app: " + appId);
      try {
        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
          storeApplicationStateInternal(appId, appStateData);
          notifyDoneStoringApplication(appId, storedException);
        } else {
          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
          updateApplicationStateInternal(appId, appStateData);
          notifyDoneUpdatingApplication(appId, storedException);
        }
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        notifyStoreOperationFailed(e);
      }

When storing completes, notifyDoneStoringApplication is called:

  private void notifyDoneStoringApplication(ApplicationId appId,
                                                  Exception storedException) {
    rmDispatcher.getEventHandler().handle(
        new RMAppNewSavedEvent(appId, storedException));
  }

This pushes yet another event, RMAppNewSavedEvent, to the dispatcher.
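Choosing NullRMStateStore when recovery is off follows the null-object pattern. The store step still runs and still notifies the dispatcher; it just persists nothing, so the app's state machine proceeds the same way in both modes. The sketch below follows that reading. Its classes (`StateStoreSketch`, `SketchMemoryStore`, `SketchNullStore`) are hypothetical, and string events stand in for RMAppNewSavedEvent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

abstract class StateStoreSketch {
    private final Consumer<String> dispatcher; // receives follow-up events

    StateStoreSketch(Consumer<String> dispatcher) {
        this.dispatcher = dispatcher;
    }

    abstract void storeApplicationStateInternal(String appId, String data) throws Exception;

    // Shared by every store: persist (or not), then always notify.
    final void handleStoreApp(String appId, String data) {
        try {
            storeApplicationStateInternal(appId, data);
            dispatcher.accept("APP_NEW_SAVED:" + appId);
        } catch (Exception e) {
            dispatcher.accept("STORE_FAILED:" + appId);
        }
    }

    // Mirrors the isRecoveryEnabled branch in the setup code.
    static StateStoreSketch create(boolean recoveryEnabled, Consumer<String> dispatcher) {
        return recoveryEnabled ? new SketchMemoryStore(dispatcher) : new SketchNullStore(dispatcher);
    }
}

class SketchMemoryStore extends StateStoreSketch {
    final Map<String, String> saved = new HashMap<>();

    SketchMemoryStore(Consumer<String> dispatcher) { super(dispatcher); }

    @Override
    void storeApplicationStateInternal(String appId, String data) {
        saved.put(appId, data);
    }
}

class SketchNullStore extends StateStoreSketch {
    SketchNullStore(Consumer<String> dispatcher) { super(dispatcher); }

    @Override
    void storeApplicationStateInternal(String appId, String data) {
        // recovery disabled: nothing is persisted
    }
}
```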

3.RMAppEventType.APP_NEW_SAVED
As in step 1, the event ends up in RMAppImpl. The transition is:

   .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

The handling is done by AddApplicationToSchedulerTransition:

      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user));

AppAddedSchedulerEvent carries a SchedulerEventType, and the ResourceManager registers a dispatcher for SchedulerEventType:

      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

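The Scheduler processes its events on its own thread, so it does not block the ResourceManager's event-handling thread. There are several schedulers; I only cover the FairScheduler flow that we use, and the code for the others is broadly similar.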

4.SchedulerEventType.APP_ADDED
FairScheduler's handle method processes many event types. APP_ADDED is handled as follows:

    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser());
      break;

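addApplication takes the RMApp created at submission time, the submitting user, and the queue name, and places the job in the queue for later scheduling. It does this through assignToQueue, which also validates the input. It then creates a SchedulerApplication object in FairScheduler, meaning the scheduler has accepted the job and it is waiting to be scheduled, and puts it into the Map<ApplicationId, SchedulerApplication> applications structure. Finally, the event is forwarded through the dispatcher once again: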
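The core of addApplication can be sketched as validate, record, then notify. In this hypothetical model (`FairAddAppSketch`, `SchedulerApp`, and a set of known leaf-queue names), queue validation is reduced to a set lookup. A failed validation emits a rejection event; that rejection path is an assumption of the sketch, not something described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

class FairAddAppSketch {
    // Hypothetical scheduler-side record: accepted, waiting to be scheduled.
    static final class SchedulerApp {
        final String user;
        final String queue;

        SchedulerApp(String user, String queue) {
            this.user = user;
            this.queue = queue;
        }
    }

    private final Set<String> leafQueues;
    private final Consumer<String> dispatcher;
    final Map<String, SchedulerApp> applications = new HashMap<>();

    FairAddAppSketch(Set<String> leafQueues, Consumer<String> dispatcher) {
        this.leafQueues = leafQueues;
        this.dispatcher = dispatcher;
    }

    // Hypothetical validity check standing in for assignToQueue.
    private String assignToQueue(String queueName) {
        return leafQueues.contains(queueName) ? queueName : null;
    }

    void addApplication(String appId, String queueName, String user) {
        String queue = assignToQueue(queueName);
        if (queue == null) {
            dispatcher.accept("APP_REJECTED:" + appId);
            return;
        }
        applications.put(appId, new SchedulerApp(user, queue));
        dispatcher.accept("APP_ACCEPTED:" + appId);
    }
}
```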

    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));

5.RMAppEventType.APP_ACCEPTED
RMAppImpl again handles the APP_ACCEPTED event, as follows:

  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

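This step creates an attempt. Every run of an app creates a new attempt, which is kept in the RMApp's in-memory structures. An attempt can fail for various reasons, and the app then starts a new one. Once the number of attempts exceeds the limit set by yarn.resourcemanager.am.max-attempts (2 by default), the app fails. Finally, the event is sent out through the handler.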
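The attempt/retry behavior amounts to a counter that is checked on each failure. The minimal sketch below assumes a fixed `maxAttempts` limit. `AppAttemptsSketch` and its methods are hypothetical and model only the counting, not recovery or state transfer between attempts.

```java
import java.util.ArrayList;
import java.util.List;

class AppAttemptsSketch {
    private final int maxAttempts;          // stands in for the configured limit
    final List<String> attempts = new ArrayList<>();
    String appState = "ACCEPTED";

    AppAttemptsSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Each run of the app gets a fresh, numbered attempt.
    String createAndStartNewAttempt() {
        String id = "attempt_" + (attempts.size() + 1);
        attempts.add(id);
        appState = "RUNNING";
        return id;
    }

    // Called when the current attempt fails: retry or give up.
    void onAttemptFailed() {
        if (attempts.size() < maxAttempts) {
            createAndStartNewAttempt();
        } else {
            appState = "FAILED";
        }
    }
}
```

With a limit of 2, the first failure starts attempt_2 and the second failure fails the app.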

6.RMAppAttemptEventType.START
RMAppAttemptEventType.START is handled by the newly created RMAppAttemptImpl:

      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())

RMAppAttemptEventType.START moves the state from NEW to SUBMITTED and runs AttemptStartedTransition. The calls worth noting inside AttemptStartedTransition are:

      // Register with the ApplicationMasterService
      appAttempt.masterService
          .registerAppAttempt(appAttempt.applicationAttemptId);
 
      // Add the applicationAttempt to the scheduler and inform the scheduler
      // whether to transfer the state from previous attempt.
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }

In short, it registers the attempt with the ApplicationMasterService and then fires a scheduling event so that the Scheduler can schedule the attempt.

7.SchedulerEventType.APP_ATTEMPT_ADDED
FairScheduler handles the APP_ATTEMPT_ADDED event:

    SchedulerApplication application =
        applications.get(applicationAttemptId.getApplicationId());
    String user = application.getUser();
    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
 
    FSSchedulerApp attempt =
        new FSSchedulerApp(applicationAttemptId, user,
            queue, new ActiveUsersManager(getRootQueueMetrics()),
            rmContext);
    if (transferStateFromPreviousAttempt) {
      attempt.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
    }
    application.setCurrentAppAttempt(attempt);
 
    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
 
    RMApp rmApp = rmContext.getRMApps().get(attempt.getApplicationId());
    queue.addApp(attempt, runnable,rmApp.getApplicationSubmissionContext().getPriority());
    if (runnable) {
      maxRunningEnforcer.trackRunnableApp(attempt);
    } else {
      maxRunningEnforcer.trackNonRunnableApp(attempt);
    }
 
    queue.getMetrics().submitAppAttempt(user);
    ClusterMetrics.getMetrics().addPendingApp(attempt.getApplicationId());
 
    LOG.info("Added Application Attempt " + applicationAttemptId
        + " to scheduler from user: " + user + " is isAttemptRecovering : " + isAttemptRecovering);
 
    if (isAttemptRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
    }

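This creates an FSSchedulerApp, the scheduler-side counterpart of the RMAppAttempt created in RMApp. Following fair-scheduling rules, the scheduler then checks whether the job may run. If running it would exceed the queue's (or user's) limit on concurrently running apps, the attempt is marked non-runnable for now; otherwise it is runnable. At this point the app is pending: depending on the queue's state, it is either scheduled soon or waits until it becomes runnable. Finally, an RMAppAttemptEventType.ATTEMPT_ADDED event is sent through the dispatcher and handled by RMAppAttempt.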
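The runnable check can be sketched for a single leaf queue with a cap on concurrently running apps. This is a simplification. `MaxRunningSketch` is hypothetical, it ignores the per-user limit that canAppBeRunnable also receives and any parent-queue limits, and it promotes waiting attempts in arrival order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class MaxRunningSketch {
    private final int queueMaxRunningApps;
    final List<String> runnable = new ArrayList<>();
    final Deque<String> nonRunnable = new ArrayDeque<>();

    MaxRunningSketch(int queueMaxRunningApps) {
        this.queueMaxRunningApps = queueMaxRunningApps;
    }

    boolean canAppBeRunnable() {
        return runnable.size() < queueMaxRunningApps;
    }

    // Like queue.addApp(attempt, runnable) plus track(Non)RunnableApp.
    void addAttempt(String attemptId) {
        if (canAppBeRunnable()) {
            runnable.add(attemptId);
        } else {
            nonRunnable.addLast(attemptId); // pending until a slot frees up
        }
    }

    // When a runnable attempt finishes, the oldest waiting one is promoted.
    void onAttemptFinished(String attemptId) {
        if (runnable.remove(attemptId) && !nonRunnable.isEmpty() && canAppBeRunnable()) {
            runnable.add(nonRunnable.pollFirst());
        }
    }
}
```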

8.RMAppAttemptEventType.ATTEMPT_ADDED
RMAppAttemptImpl handles the ATTEMPT_ADDED event by running ScheduleTransition:

    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      //Build blackList for AM
      Set<NodeId> failNodes = appAttempt.getFailMasterNodes();
      List<String> blackList = new ArrayList<String>(failNodes.size());
      for(NodeId node : failNodes) {
        blackList.add(node.toString());
      }
      if (!appAttempt.submissionContext.getUnmanagedAM()) {
        // Request a container for the AM.
        ResourceRequest request =
            BuilderUtils.newResourceRequest(
                AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
                    .getSubmissionContext().getResource(), 1);
 
        // SchedulerUtils.validateResourceRequests is not necessary because
        // AM resource has been checked when submission
        Allocation amContainerAllocation = appAttempt.scheduler.allocate(
            appAttempt.applicationAttemptId,
            Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, blackList, null);
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        return RMAppAttemptState.SCHEDULED;
      } else {
        // save state and then go to LAUNCHED state
        appAttempt.storeAttempt();
        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
      }
    }

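This requests one container from the Scheduler to run the AppMaster. The container's priority is 0, it may be placed on any host (ResourceRequest.ANY), its resource size is the amount requested at submission time, and the container count is 1.
Looking ahead: the AppMaster's priority is 0, reduce is 10, and map is 20; the smaller the number, the earlier it is scheduled. Next comes FairScheduler's allocate method, the most important method in this flow. It covers both the MRAppMaster's own container request and the MRAppMaster's later resource requests to the scheduler, and it is covered in the next part.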
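The AM request and the priority rule above can be sketched together. `PrioritySketch` and its `Request` class are hypothetical stand-ins for ResourceRequest. The priority values come from the text, `"*"` is the value of ResourceRequest.ANY, and the memory figures in the usage are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PrioritySketch {
    // Values from the text: a smaller number is served first.
    static final int AM_PRIORITY = 0;
    static final int REDUCE_PRIORITY = 10;
    static final int MAP_PRIORITY = 20;
    // Same string as ResourceRequest.ANY: any host or rack.
    static final String ANY = "*";

    // Hypothetical, trimmed-down resource request.
    static final class Request {
        final int priority;
        final String resourceName;
        final int memoryMb;
        final int numContainers;

        Request(int priority, String resourceName, int memoryMb, int numContainers) {
            this.priority = priority;
            this.resourceName = resourceName;
            this.memoryMb = memoryMb;
            this.numContainers = numContainers;
        }
    }

    // The AM request described above: priority 0, any host, one container.
    static Request amRequest(int amMemoryMb) {
        return new Request(AM_PRIORITY, ANY, amMemoryMb, 1);
    }

    // Order in which an app's outstanding requests are considered.
    static List<Request> servingOrder(List<Request> pending) {
        List<Request> sorted = new ArrayList<>(pending);
        sorted.sort(Comparator.comparingInt(r -> r.priority));
        return sorted;
    }
}
```

Given pending map, AM, and reduce requests, servingOrder puts the AM request first, then reduce, then map.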

YARN MRAppMaster and Scheduler Flow Explained (Part 1)

Basics

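Hadoop can broadly be divided into a compute layer and a storage layer. This article explains the basic flows and concepts of YARN, the compute layer: job submission, scheduling, how containers run, and so on.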
The figure below is a basic YARN flow diagram from the Apache community:
[Figure: YARN flow]
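The basic process can be summarized as follows:
1. The Client submits the job.
2. The ResourceManager accepts the job and builds the corresponding structures.
3. The RM's state machines transition, and the Scheduler finds a NodeManager that can run the MRAppMaster.
4. ApplicationMasterLauncher sends an RPC to the NM to run the MRAppMaster.
5. NodeManagers report their status to the RM through node updates, and the Scheduler schedules the containers to run on each node. The MRAppMaster reports to the RM through heartbeats, and the FairScheduler, based on the requested resources, returns the containers the MRAppMaster may run.
6. The MRAppMaster sends an RPC to the NM to run each container.
7. The NM receives the launch request, localizes the required resources, and runs the container.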

State Machines and Asynchrony

To explain YARN's flows in detail, we first need to cover some of its design ideas, namely its state machine transitions and asynchronous execution.
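Anyone familiar with the JobTracker code knows that a key factor limiting its scalability is that it holds all of the state: every job, and the status of every MapTask and ReduceTask in each job. On top of that, everything executes serially, which greatly reduces scheduling efficiency. YARN therefore splits the job's responsibilities into two parts: the MRAppMaster manages tasks, while the ResourceManager handles scheduling and node management. YARN also makes heavy use of an event-driven model in which modules are decoupled through an underlying asynchronous dispatcher, minimizing the bottleneck caused by serial execution.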
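The asynchronous dispatcher idea fits in a few dozen lines. Producers enqueue events, and a single thread takes them off the queue and hands each one to the handler registered for its event-type enum class. The sketch below is a simplified model of that design, not Hadoop's AsyncDispatcher. `AsyncDispatcherSketch`, `SimpleEvent`, and the demo enums are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo event types, named after the RM ones.
enum DemoAppEventType { START, APP_NEW_SAVED }
enum DemoStoreEventType { STORE_APP }

// Hypothetical event: an enum type plus a payload such as an app id.
class SimpleEvent {
    final Enum<?> type;
    final String payload;

    SimpleEvent(Enum<?> type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

class AsyncDispatcherSketch {
    private final Map<Class<?>, Consumer<SimpleEvent>> handlers = new HashMap<>();
    private final BlockingQueue<SimpleEvent> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::loop, "AsyncDispatcher");
    private volatile boolean stopped;

    // One handler per event-type enum class, e.g. (RMAppEventType.class, handler).
    synchronized void register(Class<? extends Enum<?>> typeClass, Consumer<SimpleEvent> handler) {
        handlers.put(typeClass, handler);
    }

    // Callers only enqueue and return immediately.
    void dispatch(SimpleEvent event) {
        queue.add(event);
    }

    void start() {
        worker.start();
    }

    // Single consumer thread: events are handled one at a time, in order.
    private void loop() {
        try {
            while (!stopped || !queue.isEmpty()) {
                SimpleEvent event = queue.poll(10, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }
                Consumer<SimpleEvent> handler;
                synchronized (this) {
                    handler = handlers.get(event.type.getDeclaringClass());
                }
                if (handler != null) {
                    handler.accept(event);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Signals shutdown, then waits until the worker has drained the queue,
    // including events that handlers enqueue while draining.
    void stopAndDrain() {
        stopped = true;
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Handlers run on the dispatcher thread and may enqueue follow-up events. A chain such as START, then STORE_APP, then APP_NEW_SAVED therefore unfolds without any module calling another directly.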
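In addition, the biggest difference between YARN and MRv1 is the introduction of state machines. State machines combined with event-driven processing greatly improve YARN's internal scheduling efficiency, but they also make the code much harder to read. The following code shows how to read the relevant parts: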

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          addTransition(STATE preState, STATE postState,
                        EVENTTYPE eventType,
                        SingleArcTransition<OPERAND, EVENT> hook){
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
        (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
           (preState, eventType, new SingleInternalArc(postState, hook)));
  }

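Practically every state machine in YARN is built through this interface. preState is the original state, postState is the state after the transition, eventType is the event that triggers the transition, and hook is the callback executed as part of the transition. In practice, we only need to follow eventType to find the hook that runs, and note the postState. Each eventType is also registered with a handler that processes it. For example, here is some code from RMAppImpl: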

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

Here the RMAppEventType.START event moves the state from RMAppState.NEW to RMAppState.NEW_SAVING and runs RMAppNewlySavingTransition.

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
 
      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

It calls the storeNewApplication(app) method:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

This step sends an event to the dispatcher, which forwards it to the corresponding handler, and from there the flow keeps moving.
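The way addTransition is used above can be captured in a small table-driven state machine: look up the pair of current state and event type, run the hook, then move to postState. This is a hypothetical sketch, not YARN's StateMachineFactory. The demo enums `SmAppState` and `SmAppEvent` mirror the two RMAppImpl transitions discussed in this series.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo states/events mirroring NEW -> NEW_SAVING -> SUBMITTED.
enum SmAppState { NEW, NEW_SAVING, SUBMITTED }
enum SmAppEvent { START, APP_NEW_SAVED }

class StateMachineSketch<S extends Enum<S>, E extends Enum<E>> {
    // One arc: the state to move to and the hook to run on the way.
    private static final class Arc<T> {
        final T postState;
        final Runnable hook;

        Arc(T postState, Runnable hook) {
            this.postState = postState;
            this.hook = hook;
        }
    }

    private final Map<S, Map<E, Arc<S>>> table = new HashMap<>();
    private S current;

    StateMachineSketch(S initialState) {
        this.current = initialState;
    }

    // Same argument order as addTransition(preState, postState, eventType, hook).
    StateMachineSketch<S, E> addTransition(S preState, S postState, E eventType, Runnable hook) {
        table.computeIfAbsent(preState, k -> new HashMap<>())
             .put(eventType, new Arc<>(postState, hook));
        return this;
    }

    // Look up (current state, event type), run the hook, then move to postState.
    S doTransition(E eventType) {
        Map<E, Arc<S>> arcs = table.get(current);
        Arc<S> arc = (arcs == null) ? null : arcs.get(eventType);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + eventType + " at " + current);
        }
        arc.hook.run();
        current = arc.postState;
        return current;
    }

    S getCurrentState() {
        return current;
    }
}
```

Reading YARN code then follows the same lookup: find the row for the current state, find the column for the event type, and that arc names both the hook and the next state.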
The rest of this article walks through job execution step by step.

Client Job Submission

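YARN supports many kinds of jobs: not just the MapReduce framework, but also Spark and even custom frameworks (for example, the OLS real-time system we developed and have been running in production). Each framework therefore needs its own client. To keep things simple, we analyze the MR framework here; I may share our OLS framework when time permits.
The MR framework submits jobs through JobClient, and the flow is simple. First, all jars, dependencies, split information, conf, and so on are copied to HDFS; then the submission is sent via RPC. Before submitting, the client calls YARNRunner's createApplicationSubmissionContext to wrap the job. Its main purpose is to set up all of the job's information, including the job name, the target queue, and the MRAppMaster launch parameters, in preparation for starting the MRAppMaster on a NodeManager.
With the ApplicationSubmissionContext ready, the client sends an RPC request to the ResourceManager's ClientRMService to submit the job.
That completes the client's work. In summary, the client is responsible for splitting the input, setting the various conf options, and generating the MRAppMaster launch script.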