YARN MRAppMaster and Scheduler Flow Explained (Part 2)

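The previous post introduced YARN's basic concepts and the client-side submission flow. This post starts from the ResourceManager and walks through the rest of the submission flow in detail.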

The ResourceManager Receives the Application

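Through ClientRMService, the ResourceManager first parses the ApplicationSubmissionContext object to obtain the application ID, along with the submitting user, group, and related information. It then creates an RMApp and registers it with the ResourceManager. RMApp is the abstraction of a job: every job has exactly one corresponding RMApp. If all checks pass, the application is formally submitted to the ResourceManager.
From here on, processing is driven by state-machine transitions, explained step by step below: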
1.RMAppEventType.START

      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));

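The event is an RMAppEvent whose type is RMAppEventType.START. Next, look in ResourceManager for the code that registers which handler the asynchronous dispatcher uses for this event type: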

      // Register event handler for RmAppEvents
      rmDispatcher.register(RMAppEventType.class,
          new ApplicationEventDispatcher(rmContext));

The ApplicationEventDispatcher code is:

  @Private
  public static final class ApplicationEventDispatcher implements
      EventHandler<RMAppEvent> {
 
    private final RMContext rmContext;
 
    public ApplicationEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }
 
    @Override
    public void handle(RMAppEvent event) {
      ApplicationId appID = event.getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(appID);
      if (rmApp != null) {
        try {
          rmApp.handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for application " + appID, t);
        }
      }
    }
  }

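So the next place to look is RMApp's implementation class, RMAppImpl, for the state-machine transitions and hook functions. A quicker shortcut is to grep the Hadoop YARN source for classes containing RMAppEventType.START, which almost always leads to the right place. Still, understanding the underlying mechanism should come first.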
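To make the register-and-route idea concrete, here is a minimal sketch of a dispatcher that looks up a handler by the event type's enum class, which is what `rmDispatcher.register(RMAppEventType.class, ...)` sets up. All names here (`SimpleDispatcher`, `Event`, `Handler`, the two enums) are hypothetical. YARN's real AsyncDispatcher drains its queue on a separate thread; this sketch drains on the caller's thread in `drain()` to keep it deterministic.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical event-type enums; only the constant names mirror YARN's.
enum AppEventType { START, APP_NEW_SAVED }
enum StoreEventType { STORE_APP }

interface Handler { void handle(Event e); }

class Event {
    final Enum<?> type;
    final String appId;
    Event(Enum<?> type, String appId) { this.type = type; this.appId = appId; }
}

// Routes each event to the handler registered for its enum class.
class SimpleDispatcher {
    private final Map<Class<?>, Handler> handlers = new HashMap<>();
    private final Queue<Event> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> typeClass, Handler h) {
        handlers.put(typeClass, h);
    }

    // Only enqueues; handlers run later, in drain().
    void dispatch(Event e) { queue.add(e); }

    // Single-threaded stand-in for AsyncDispatcher's processing thread.
    void drain() {
        Event e;
        while ((e = queue.poll()) != null) {
            Handler h = handlers.get(e.type.getDeclaringClass());
            if (h == null) {
                throw new IllegalStateException("No handler for " + e.type);
            }
            h.handle(e);
        }
    }
}
```

For example, an app handler that answers START by dispatching STORE_APP, plus a store handler that answers with APP_NEW_SAVED, reproduces the event chain of steps 1 to 3 below.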
Here is the relevant part of the state machine in RMAppImpl:

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

This means the RMAppEventType.START event triggers the RMAppNewlySavingTransition hook and moves the RMAppState from NEW to NEW_SAVING.
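The `.addTransition(from, to, event, hook)` calls declare a table keyed by (current state, event). The sketch below builds a tiny version of such a table for the three RMApp transitions covered in this post. `TransitionTable`, `AppState`, and `AppEvent` are hypothetical simplifications, not YARN's StateMachineFactory, and hooks are plain `Runnable`s.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified mirror of a few RMAppImpl states and events.
enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }
enum AppEvent { START, APP_NEW_SAVED, APP_ACCEPTED }

class TransitionTable {
    // One arc per (state, event): the target state and the hook to run.
    private static final class Arc {
        final AppState target;
        final Runnable hook;
        Arc(AppState target, Runnable hook) { this.target = target; this.hook = hook; }
    }

    private final Map<AppState, Map<AppEvent, Arc>> arcs = new HashMap<>();
    private AppState current = AppState.NEW;

    // Chainable, in the style of the .addTransition(...) calls above.
    TransitionTable addTransition(AppState from, AppState to, AppEvent ev, Runnable hook) {
        arcs.computeIfAbsent(from, k -> new HashMap<>()).put(ev, new Arc(to, hook));
        return this;
    }

    AppState handle(AppEvent ev) {
        Map<AppEvent, Arc> out = arcs.get(current);
        Arc arc = (out == null) ? null : out.get(ev);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + ev + " at " + current);
        }
        arc.hook.run();        // run the hook first
        current = arc.target;  // then move to the target state
        return current;
    }

    AppState getState() { return current; }
}
```

An event with no arc from the current state is rejected, which corresponds to the "invalid event" errors the RM logs for unexpected transitions.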
The transition method of RMAppNewlySavingTransition calls storeNewApplication:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

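At this point, a new event (an RMStateStoreAppEvent of type RMStateStoreEventType.STORE_APP) is sent to the dispatcher for processing.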

2.RMStateStoreEventType.STORE_APP
Events of type RMStateStoreEventType are handled by the RMStateStore class, which the ResourceManager sets up as follows:

      if(isRecoveryEnabled) {
        recoveryEnabled = true;
        rmStore =  RMStateStoreFactory.getStore(conf);
      } else {
        recoveryEnabled = false;
        rmStore = new NullRMStateStore();
      }
 
      try {
        rmStore.init(conf);
        rmStore.setRMDispatcher(rmDispatcher);
        rmStore.setResourceManager(rm);
      } catch (Exception e) {
        // the Exception from stateStore.init() needs to be handled for
        // HA and we need to give up master status if we got fenced
        LOG.error("Failed to init state store", e);
        throw e;
      }

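Recovery is disabled by default, and we keep it that way, so applications must be rerun after an RM restart; we leave it off because recovery reads a large number of directories and is slow. The code also shows the store being wired to the RM dispatcher via setRMDispatcher, which it later uses to send events back.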
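The effect of the `isRecoveryEnabled` branch can be sketched with two interchangeable stores: one that keeps application state and a null store that discards it, so nothing survives a restart. All names below are hypothetical, and the map-backed store merely stands in for a durable store such as the ZooKeeper- or filesystem-based ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for RMStateStore and NullRMStateStore.
interface AppStateStore {
    void storeApp(String appId, String state);
    String loadApp(String appId); // null if nothing was persisted
}

// Recovery off: writes are accepted but dropped, so a restarted RM
// has nothing to recover and applications must be resubmitted.
class NullAppStateStore implements AppStateStore {
    public void storeApp(String appId, String state) { /* intentionally a no-op */ }
    public String loadApp(String appId) { return null; }
}

// Recovery on: state is kept. A real store would write to durable
// storage; a map is used here only for illustration.
class InMemoryAppStateStore implements AppStateStore {
    private final Map<String, String> data = new HashMap<>();
    public void storeApp(String appId, String state) { data.put(appId, state); }
    public String loadApp(String appId) { return data.get(appId); }
}

class StateStores {
    // Mirrors the if (isRecoveryEnabled) ... else ... branch above.
    static AppStateStore create(boolean recoveryEnabled) {
        return recoveryEnabled ? new InMemoryAppStateStore() : new NullAppStateStore();
    }
}
```

Because callers only see the interface, the rest of the flow (STORE_APP, then APP_NEW_SAVED) runs identically whichever store is chosen; only what survives a restart differs.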
RMStateStore's handle method passes the event on to handleStoreEvent, which does the following:

      LOG.info("Storing info for app: " + appId);
      try {
        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
          storeApplicationStateInternal(appId, appStateData);
          notifyDoneStoringApplication(appId, storedException);
        } else {
          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
          updateApplicationStateInternal(appId, appStateData);
          notifyDoneUpdatingApplication(appId, storedException);
        }
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        notifyStoreOperationFailed(e);
      }

Once the application is stored, notifyDoneStoringApplication is called:

  private void notifyDoneStoringApplication(ApplicationId appId,
                                                  Exception storedException) {
    rmDispatcher.getEventHandler().handle(
        new RMAppNewSavedEvent(appId, storedException));
  }

This pushes an RMAppNewSavedEvent (type RMAppEventType.APP_NEW_SAVED) back onto the dispatcher.
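handleStoreEvent follows a write-then-notify pattern: the outcome of the write is reported by emitting another event rather than by returning a value. A simplified sketch; the emitted labels (`NEW_SAVED`, `UPDATE_SAVED`, `STORE_FAILED`) and the `failWrites` switch are hypothetical, and the real failure path (notifyStoreOperationFailed) does more than emit one event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

enum StoreEventKind { STORE_APP, UPDATE_APP }

// Hypothetical sketch of the store-then-notify pattern in handleStoreEvent.
class StoreEventHandler {
    private final List<String> persisted = new ArrayList<>();
    private final Consumer<String> emit; // stands in for rmDispatcher.getEventHandler()
    private final boolean failWrites;    // hypothetical switch to exercise the error path

    StoreEventHandler(Consumer<String> emit, boolean failWrites) {
        this.emit = emit;
        this.failWrites = failWrites;
    }

    void handleStoreEvent(StoreEventKind kind, String appId) {
        try {
            if (failWrites) throw new IllegalStateException("store unavailable");
            persisted.add(kind + ":" + appId);
            if (kind == StoreEventKind.STORE_APP) {
                emit.accept("NEW_SAVED:" + appId);    // loosely like notifyDoneStoringApplication
            } else {
                emit.accept("UPDATE_SAVED:" + appId); // loosely like notifyDoneUpdatingApplication
            }
        } catch (Exception e) {
            emit.accept("STORE_FAILED:" + appId);     // loosely like notifyStoreOperationFailed
        }
    }

    List<String> persisted() { return persisted; }
}
```

Reporting through an event keeps the store decoupled from RMAppImpl: the store never calls the app directly, it only hands the dispatcher a new event.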

3.RMAppEventType.APP_NEW_SAVED
As in step 1, the event is routed to RMAppImpl, whose state machine handles it:

   .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

The hook is AddApplicationToSchedulerTransition, which sends an AppAddedSchedulerEvent:

      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user));

ResourceManager registers a handler for SchedulerEventType, the event-type enum used by AppAddedSchedulerEvent:

      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

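The scheduler processes its events on its own thread, so it does not block the ResourceManager's event-processing thread. Since YARN has several schedulers, I only cover the FairScheduler that we use; the others follow a similar flow.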
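The point of a separate scheduler dispatcher is that the RM's main dispatcher thread only enqueues scheduler events and never waits for them. The sketch below models this with a single-threaded `ExecutorService` standing in for the scheduler's own queue and processing thread; `SchedulerEventForwarder` is a hypothetical name, and events are plain strings.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: hand scheduler events to a dedicated thread.
class SchedulerEventForwarder {
    private final ExecutorService schedulerThread = Executors.newSingleThreadExecutor();
    private final List<String> handled = new CopyOnWriteArrayList<>();

    // Called on the RM dispatcher thread; returns without waiting.
    void handle(String schedulerEvent) {
        schedulerThread.execute(() ->
            // One thread processes events one at a time, in submission order.
            handled.add(schedulerEvent + "@" + Thread.currentThread().getName()));
    }

    // Stops accepting events and waits for queued ones to finish.
    boolean stop(long timeoutMs) {
        schedulerThread.shutdown();
        try {
            return schedulerThread.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> handled() { return handled; }
}
```

A single worker thread keeps scheduler events strictly ordered, which matters because APP_ADDED must be processed before the corresponding APP_ATTEMPT_ADDED.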

4.SchedulerEventType.APP_ADDED
FairScheduler's handle method dispatches on many event types; APP_ADDED is handled like this:

    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser());
      break;

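The addApplication method looks up the RMApp created at submission time, together with the submitting user and the queue name, and places the application in a queue for later scheduling. This is done by assignToQueue, which also validates the request. FairScheduler then creates a SchedulerApplication object, which represents an application accepted by the scheduler and waiting to be scheduled, and stores it in the Map<ApplicationId, SchedulerApplication> applications field. Finally, an event is again sent out through the dispatcher: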

    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
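The addApplication steps (validate and place the app in a queue, record it, then emit APP_ACCEPTED) can be sketched as follows. `MiniFairScheduler` is hypothetical: its queues are a fixed set of names, whereas the real FairScheduler's queue placement is configurable and can create queues, and the `APP_REJECTED` label for an unknown queue is an illustrative assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the addApplication steps described above.
class MiniFairScheduler {
    static final class SchedulerApp {
        final String user;
        final String queue;
        SchedulerApp(String user, String queue) { this.user = user; this.queue = queue; }
    }

    private final Set<String> leafQueues; // queues allowed to accept apps
    private final Map<String, SchedulerApp> applications = new HashMap<>();
    private final Consumer<String> emit;  // stands in for the RM dispatcher

    MiniFairScheduler(Set<String> leafQueues, Consumer<String> emit) {
        this.leafQueues = leafQueues;
        this.emit = emit;
    }

    // Placement plus validity check, loosely like assignToQueue.
    private String assignToQueue(String requested) {
        if (!leafQueues.contains(requested)) {
            throw new IllegalArgumentException("Unknown queue: " + requested);
        }
        return requested;
    }

    void addApplication(String appId, String queueName, String user) {
        String queue;
        try {
            queue = assignToQueue(queueName);
        } catch (IllegalArgumentException e) {
            emit.accept("APP_REJECTED:" + appId); // invalid queue: reject the app
            return;
        }
        applications.put(appId, new SchedulerApp(user, queue));
        emit.accept("APP_ACCEPTED:" + appId);     // hand control back to RMAppImpl
    }

    Map<String, SchedulerApp> applications() { return applications; }
}
```

Note that the scheduler never changes RMApp's state itself; it only reports the outcome as an event, and RMAppImpl's state machine decides what happens next.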

5.RMAppEventType.APP_ACCEPTED
The APP_ACCEPTED event is again handled by RMAppImpl, through StartAppAttemptTransition:

  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

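This step creates an attempt. Every run of an app gets a new attempt, which is kept in RMApp's in-memory state. An attempt can fail for many reasons, in which case the app starts another attempt; once the maximum number of attempts is exceeded (4 by default in the setup described here), the app fails. Finally, an RMAppAttemptEventType.START event is sent through the handler.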
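The attempt-and-retry behavior can be summarized as a loop: each run gets a fresh attempt ID that is remembered, and the app fails once the limit is reached. `AppAttemptLoop`, the `IntPredicate` deciding whether attempt n succeeds, and the simplified attempt-ID format are hypothetical; in YARN, attempts are driven by events rather than by a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical sketch of "one new attempt per run, fail after too many".
class AppAttemptLoop {
    private final int maxAttempts;
    private final List<String> attempts = new ArrayList<>();

    AppAttemptLoop(int maxAttempts) { this.maxAttempts = maxAttempts; }

    // Returns "FINISHED" if some attempt succeeds, "FAILED" once the limit is hit.
    String run(String appId, IntPredicate attemptSucceeds) {
        for (int n = 1; n <= maxAttempts; n++) {
            // Simplified ID: app ID plus a zero-padded attempt number.
            String attemptId = appId + "_" + String.format("%06d", n);
            attempts.add(attemptId); // remembered, like RMApp's in-memory attempts
            if (attemptSucceeds.test(n)) {
                return "FINISHED";
            }
        }
        return "FAILED";
    }

    List<String> attempts() { return attempts; }
}
```

Keeping every attempt in memory, not just the latest, is what lets a new attempt optionally take over state from the previous one (the transferStateFromPreviousAttempt flag seen in steps 6 and 7).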

6.RMAppAttemptEventType.START
RMAppAttemptEventType.START is handled by the newly created RMAppAttemptImpl:

      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())

RMAppAttemptEventType.START moves the attempt from NEW to SUBMITTED and runs AttemptStartedTransition. The calls worth noting inside AttemptStartedTransition are:

      // Register with the ApplicationMasterService
      appAttempt.masterService
          .registerAppAttempt(appAttempt.applicationAttemptId);
 
      // Add the applicationAttempt to the scheduler and inform the scheduler
      // whether to transfer the state from previous attempt.
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));

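In short, the attempt is registered with the ApplicationMasterService, and an AppAttemptAddedSchedulerEvent is sent so that the attempt can be picked up by the Scheduler.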

7.SchedulerEventType.APP_ATTEMPT_ADDED
FairScheduler handles the APP_ATTEMPT_ADDED event as follows:

    SchedulerApplication application =
        applications.get(applicationAttemptId.getApplicationId());
    String user = application.getUser();
    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
 
    FSSchedulerApp attempt =
        new FSSchedulerApp(applicationAttemptId, user,
            queue, new ActiveUsersManager(getRootQueueMetrics()),
            rmContext);
    if (transferStateFromPreviousAttempt) {
      attempt.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
    }
    application.setCurrentAppAttempt(attempt);
 
    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
 
    RMApp rmApp = rmContext.getRMApps().get(attempt.getApplicationId());
    queue.addApp(attempt, runnable,rmApp.getApplicationSubmissionContext().getPriority());
    if (runnable) {
      maxRunningEnforcer.trackRunnableApp(attempt);
    } else {
      maxRunningEnforcer.trackNonRunnableApp(attempt);
    }
 
    queue.getMetrics().submitAppAttempt(user);
    ClusterMetrics.getMetrics().addPendingApp(attempt.getApplicationId());
 
    LOG.info("Added Application Attempt " + applicationAttemptId
        + " to scheduler from user: " + user + " is isAttemptRecovering : " + isAttemptRecovering);
 
    if (isAttemptRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
    }

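This creates an FSSchedulerApp, the scheduler-side counterpart of the RMAppAttempt created by RMApp. Next, following the fair-scheduling rules, it checks whether the app may run: if running it would exceed the limit on concurrently running apps (for the queue or the user), the app is marked as not runnable for now; otherwise it is runnable. At this point the app is pending: depending on the state of the queue, it is either scheduled later or waits until it becomes runnable. Finally, unless the attempt is being recovered, an RMAppAttemptEventType.ATTEMPT_ADDED event is sent through the dispatcher and handled by RMAppAttempt.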
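The runnable check can be sketched as counters compared against limits: an app may run only if both its queue and its user are below their maximum number of running apps. `RunnableChecker` is a hypothetical simplification of MaxRunningAppsEnforcer, with a single per-user limit and optional per-queue limits.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of the max-running-apps check.
class RunnableChecker {
    private final Map<String, Integer> queueMax = new HashMap<>();
    private final Map<String, Integer> runningPerQueue = new HashMap<>();
    private final Map<String, Integer> runningPerUser = new HashMap<>();
    private final int userMax;

    RunnableChecker(int userMax) { this.userMax = userMax; }

    void setQueueMax(String queue, int max) { queueMax.put(queue, max); }

    // Runnable only if both the queue and the user are under their limits.
    boolean canAppBeRunnable(String queue, String user) {
        int qRunning = runningPerQueue.getOrDefault(queue, 0);
        int uRunning = runningPerUser.getOrDefault(user, 0);
        return qRunning < queueMax.getOrDefault(queue, Integer.MAX_VALUE)
            && uRunning < userMax;
    }

    // Counts the app as running if allowed (like trackRunnableApp);
    // otherwise returns false and the caller keeps it pending
    // (like trackNonRunnableApp).
    boolean addApp(String queue, String user) {
        if (!canAppBeRunnable(queue, user)) {
            return false;
        }
        runningPerQueue.merge(queue, 1, Integer::sum);
        runningPerUser.merge(user, 1, Integer::sum);
        return true;
    }
}
```

An app rejected by this check is not failed; it simply stays non-runnable until a running app in the same queue or of the same user finishes.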

8.RMAppAttemptEventType.ATTEMPT_ADDED
RMAppAttemptImpl handles the ATTEMPT_ADDED event in ScheduleTransition:

    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      //Build blackList for AM
      Set<NodeId> failNodes = appAttempt.getFailMasterNodes();
      List<String> blackList = new ArrayList<String>(failNodes.size());
      for(NodeId node : failNodes) {
        blackList.add(node.toString());
      }
      if (!appAttempt.submissionContext.getUnmanagedAM()) {
        // Request a container for the AM.
        ResourceRequest request =
            BuilderUtils.newResourceRequest(
                AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
                    .getSubmissionContext().getResource(), 1);
 
        // SchedulerUtils.validateResourceRequests is not necessary because
        // AM resource has been checked when submission
        Allocation amContainerAllocation = appAttempt.scheduler.allocate(
            appAttempt.applicationAttemptId,
            Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, blackList, null);
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        return RMAppAttemptState.SCHEDULED;
      } else {
        // save state and then go to LAUNCHED state
        appAttempt.storeAttempt();
        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
      }
    }

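This requests a container from the Scheduler to run the ApplicationMaster. The request has priority 0, may be placed on any node (ResourceRequest.ANY), carries the resource capability (memory) specified at submission, and asks for exactly 1 container. For an unmanaged AM, no container is requested; the attempt is stored and moves to LAUNCHED_UNMANAGED_SAVING.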
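The AM request built in the transition above boils down to a handful of values. The sketch captures them in a hypothetical `AmResourceRequest` class (only `"*"`, the value of ResourceRequest.ANY, and priority 0 come from YARN) and mirrors the managed/unmanaged branch; it does not call any Hadoop API.

```java
import java.util.List;

// Hypothetical value class mirroring the arguments of the AM request.
class AmResourceRequest {
    static final String ANY = "*";          // same string as ResourceRequest.ANY
    static final int AM_CONTAINER_PRIORITY = 0;

    final int priority;
    final String resourceName;
    final int memoryMb;
    final int vcores;
    final int numContainers;

    AmResourceRequest(int priority, String resourceName, int memoryMb, int vcores, int numContainers) {
        this.priority = priority;
        this.resourceName = resourceName;
        this.memoryMb = memoryMb;
        this.vcores = vcores;
        this.numContainers = numContainers;
    }
}

class AmScheduling {
    // Managed AM: ask for one container anywhere and move to SCHEDULED.
    // Unmanaged AM: nothing to allocate; the attempt is saved instead.
    static String onAttemptAdded(boolean unmanagedAm, int amMemoryMb, int amVcores,
                                 List<AmResourceRequest> asks) {
        if (unmanagedAm) {
            return "LAUNCHED_UNMANAGED_SAVING";
        }
        asks.add(new AmResourceRequest(AmResourceRequest.AM_CONTAINER_PRIORITY,
                AmResourceRequest.ANY, amMemoryMb, amVcores, 1));
        return "SCHEDULED";
    }
}
```

The request is only queued at this point; as the assert in the original code suggests, allocate returns no containers yet, and the AM container is handed out later when a node heartbeats.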
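A note in advance: the AM's priority is 0, reduce is 10, and map is 20; the smaller the number, the higher the scheduling priority. Next, FairScheduler's allocate method is called. It is the most important method in this flow, covering both the request for the MRAppMaster container and the MRAppMaster's own resource requests to the scheduler, and it will be covered in the next post.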
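The "smaller number first" rule can be illustrated with a `PriorityQueue` ordered by priority value, using the AM/reduce/map values listed above. `PriorityOrderDemo` is hypothetical and only shows the ordering rule, not how FairScheduler actually walks an application's requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo: serve requests in ascending priority value.
class PriorityOrderDemo {
    static final int PRIORITY_AM = 0, PRIORITY_REDUCE = 10, PRIORITY_MAP = 20;

    static final class Ask {
        final String kind;
        final int priority;
        Ask(String kind, int priority) { this.kind = kind; this.priority = priority; }
    }

    // Returns request kinds in the order they would be served.
    static List<String> serviceOrder(List<Ask> asks) {
        PriorityQueue<Ask> pq =
            new PriorityQueue<>((a, b) -> Integer.compare(a.priority, b.priority));
        pq.addAll(asks);
        List<String> order = new ArrayList<>();
        while (!pq.isEmpty()) {
            order.add(pq.poll().kind);
        }
        return order;
    }
}
```

In practice MapReduce only issues reduce requests after a fraction of the maps have completed (`mapreduce.job.reduce.slowstart.completedmaps`), so this ordering does not mean reduces start before maps.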