ResourceManager Dispatcher Handler Slow Because of RMStateStore Synchronized Methods

Recently I noticed that the async dispatcher in our ResourceManager sometimes falls behind and events pile up.
Here are some logs:

2016-05-24 00:46:20,398 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 24000
2016-05-24 00:46:21,008 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 25000
2016-05-24 00:46:21,632 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 26000
2016-05-24 00:46:22,251 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 27000
2016-05-24 00:46:22,873 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 28000
2016-05-24 00:46:23,501 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 29000
2016-05-24 00:46:24,109 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 30000  

Normally the async dispatcher in the ResourceManager handles events quickly enough, but the log shows a serious backlog.
So we investigated the problem and took a jstack of the ResourceManager process while it was backed up. Here is the jstack output:

"AsyncDispatcher event handler" prio=10 tid=0x00007f4d6db10000 nid=0x5bca waiting for monitor entry [0x00007f4d3aa8c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.storeNewApplication(RMStateStore.java:375)
        - waiting to lock <0x00000003bae88af0> (a org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$RMAppNewlySavingTransition.transition(RMAppImpl.java:881)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$RMAppNewlySavingTransition.transition(RMAppImpl.java:872)
        at org.apache.hadoop.yarn.state.StateMachineFactory$SingleInternalArc.doTransition(StateMachineFactory.java:362)
        at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
        at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46)
        at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448)
        - locked <0x0000000394cbae40> (a org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.handle(RMAppImpl.java:645)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.handle(RMAppImpl.java:82)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationEventDispatcher.handle(ResourceManager.java:690)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationEventDispatcher.handle(ResourceManager.java:674)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:662)

"AsyncDispatcher event handler" daemon prio=10 tid=0x00007f4d6d8f6000 nid=0x5c32 in Object.wait() [0x00007f4d3a183000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2031)
        - locked <0x000000032bc7bd58> (a java.util.LinkedList)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2015)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2113)
        - locked <0x000000032bc7ba80> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:70)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:103)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore.writeFile(FileSystemRMStateStore.java:528)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore.storeApplicationStateInternal(FileSystemRMStateStore.java:329)
        - locked <0x00000003bae88af0> (a org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.handleStoreEvent(RMStateStore.java:625)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:770)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:765)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:662)

It seems the async dispatcher in the ResourceManager is blocked in the storeNewApplication method of RMStateStore.
From the code we know there are two async dispatchers in the ResourceManager process. One is the main dispatcher for the whole ResourceManager, which handles application submission, scheduling, and other work. The other is the dispatcher inside the RMStateStore (the role of the RMStateStore is explained in this blog). Because the RMStateStore persists to HDFS or ZooKeeper, its writes are slow, so it gets its own dispatcher to avoid stalling the ResourceManager's main dispatcher.
Unfortunately we use HDFS as our RMStateStore backend. Deep inside the code:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

When the main dispatcher handles the event to store a new application's context, this method only forwards the event to the RMStateStore's dispatcher and returns immediately. But the method is synchronized. And in the RMStateStore subclass FileSystemRMStateStore, the code that stores the application to HDFS is as follows:

  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String appIdStr = appId.toString();
    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
    fs.mkdirs(appDirPath);
    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);

    LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      writeFile(nodeCreatePath, appStateData);
    } catch (Exception e) {
      LOG.info("Error storing info for app: " + appId, e);
      throw e;
    }
  }

This method is also synchronized, on the same object. So while the RMStateStore's dispatcher holds the lock during a slow HDFS write, the main dispatcher blocks waiting for it. The lock is meaningless for the main dispatcher, so we can simply get rid of it.
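Since storeNewApplication only forwards an event to the store's internal dispatcher, dropping the method-level lock is safe on this path; a minimal sketch of the idea (the actual patch may differ):

  // Sketch: no synchronized keyword; enqueueing onto the store's own
  // dispatcher is already thread-safe, so the main dispatcher never
  // blocks on the FileSystemRMStateStore monitor here.
  public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }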
There is a related JIRA for this problem: YARN-4398.

YARN-4493 ResourceManager moveQueue bug

When moving a running application to a different queue in the ResourceManager, the current implementation does not check whether the app can run in the destination queue before removing it from its current queue. So if the destination queue is full, the move throws an exception and the app no longer belongs to any queue.
Here is the code:

   private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
       FSLeafQueue oldQueue, FSLeafQueue newQueue) {
     boolean wasRunnable = oldQueue.removeApp(attempt);
     // if app was not runnable before, it may be runnable now
     boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
         attempt.getUser());
     if (wasRunnable && !nowRunnable) {
       throw new IllegalStateException("Should have already verified that app "
           + attempt.getApplicationId() + " would be runnable in new queue");

After that the app is orphaned and can never be scheduled again. If you then kill the app, the removeApp method in FSLeafQueue throws an IllegalStateException: "Given app to remove app does not exist in queue ...".
So I think we should check whether the destination queue can run the app before removing it from the current queue.
The patch is below:

   private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
       FSLeafQueue oldQueue, FSLeafQueue newQueue) {
-    boolean wasRunnable = oldQueue.removeApp(attempt);
     // if app was not runnable before, it may be runnable now
     boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
         attempt.getUser());
+    boolean wasRunnable = false;
+    if (nowRunnable) {
+        wasRunnable = oldQueue.removeApp(attempt);
+    }
     if (wasRunnable && !nowRunnable) {
       throw new IllegalStateException("Should have already verified that app "
           + attempt.getApplicationId() + " would be runnable in new queue");

Change LogLevel For MRAppMaster

Sometimes we want to see the DEBUG logs of the MRAppMaster. There are two ways to do it. The first is to edit mapred-site.xml on the gateway machine where you submit the job and add this property:

<property>
<name>yarn.app.mapreduce.am.log.level</name>
<value>DEBUG</value>
</property>

The second is to pass the setting on the submit command line, like this:

hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.0.jar terasort -Dmapred.reduce.tasks=50 -Dmapreduce.map.speculative=false -Dmapreduce.reduce.speculative=false -Dyarn.app.mapreduce.am.log.level=DEBUG /test/1001 /test/1001_SORT14

YARN MRAppMaster Startup Flow (Part 1)

Starting with this post, I will cover how the MRAppMaster is launched through its interaction with the NodeManager, how it requests resources, and related topics.

1. Following on from the YARN MRAppMaster and Scheduler flow posts, the ApplicationMasterLauncher starts an AMLauncher thread to launch the MRAppMaster. It first opens an RPC connection using the ContainerManagementProtocol interface, which handles communication with the NodeManager on behalf of the ApplicationMaster; it then packs the entire runtime environment, parameters, and so on into a context, and finally sends a startContainer command to the NodeManager over RPC:

    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);

At this point the ResourceManager's scheduling work is done; the rest is handed over to the NodeManager.
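For reference, assembling that request through the public YARN API looks roughly like this (a sketch; localResources, env, commands, tokens, and containerToken stand in for the values AMLauncher actually packs):

    // Sketch (Hadoop 2.x API): pack the launch context and send it to the NM.
    ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        localResources, env, commands, null /* serviceData */, tokens, null /* acls */);
    StartContainerRequest req = StartContainerRequest.newInstance(ctx, containerToken);
    StartContainersResponse response = containerMgrProxy.startContainers(
        StartContainersRequest.newInstance(Collections.singletonList(req)));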

2. ContainerManagerImpl.startContainers
The NodeManager handles the ApplicationMasterLauncher's request in its ContainerManagerImpl module. It first validates the UGI and tokens, then starts the container: it extracts the launching user, all commands to execute, and the conf, and from this information builds the Application object used for execution:

    Container container =
        new ContainerImpl(getConfig(), this.dispatcher, launchContext,
          credentials, metrics, containerTokenIdentifier);
    ApplicationId applicationID =
        containerId.getApplicationAttemptId().getApplicationId();
 
    Application application =
            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);

It then sends an ApplicationEventType.INIT_APPLICATION event through the NodeManager's dispatcher.

3. ApplicationEventType.INIT_APPLICATION
ApplicationImpl handles the ApplicationEventType.INIT_APPLICATION event and notifies the log handler:

      // Inform the logAggregator
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppStartedEvent(app.appId, app.user,
              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
              app.applicationACLs));

4. LogHandlerAppStartedEvent
Our cluster has log aggregation enabled (configured by setting yarn.log-aggregation-enable to true in yarn-site.xml; see the snippet after the next code block). The event is then handled by the LogAggregationService class, which mainly initializes the aggregator:

  private void initApp(final ApplicationId appId, String user,
      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
      Map<ApplicationAccessType, String> appAcls) {
    ApplicationEvent eventResponse;
    try {
      verifyAndCreateRemoteLogDir(getConfig());
      initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
    } catch (YarnRuntimeException e) {
      LOG.warn("Application failed to init aggregation", e);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
    }
    this.dispatcher.getEventHandler().handle(eventResponse);
  }
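As an aside, enabling log aggregation is just one property in yarn-site.xml (shown here for reference):

<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>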

It then sends an ApplicationEventType.APPLICATION_LOG_HANDLING_INITED event to the dispatcher.
5. ApplicationEventType.APPLICATION_LOG_HANDLING_INITED
Handled by ApplicationImpl:

  static class AppLogInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      app.dispatcher.getEventHandler().handle(
          new ApplicationLocalizationEvent(
              LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
    }
  }

Finally it sends an ApplicationLocalizationEvent through the dispatcher.
6. LocalizationEventType.INIT_APPLICATION_RESOURCES
ContainerManagerImpl registers ResourceLocalizationService as the handler for localization events:

dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);

This starts the MRAppMaster localization process. Localization means downloading every resource and piece of environment the job needs from the cluster into the right directories on the NodeManager that will run the MRAppMaster. For the details I recommend this Hortonworks article.
Finally an APPLICATION_INITED event is sent to ApplicationImpl. At this point application initialization is complete: the application is now known to run on this machine and the corresponding in-memory structures exist. The next step is container initialization.
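For orientation, the localized files land in a per-visibility layout under each NodeManager local dir, roughly like this (typical 2.x layout; the roots come from yarn.nodemanager.local-dirs):

    <local-dir>/filecache/                           PUBLIC resources, shared by all users
    <local-dir>/usercache/<user>/filecache/          PRIVATE resources, shared by one user's apps
    <local-dir>/usercache/<user>/appcache/<appId>/   APPLICATION resources, removed when the app finishes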
7. ApplicationEventType.APPLICATION_INITED
This simply sends ContainerEventType.INIT_CONTAINER to the dispatcher.
8. ContainerEventType.INIT_CONTAINER
ContainerImpl handles the container events and begins the localization process, calling RequestResourcesTransition. All resources are grouped by visibility (PUBLIC, PRIVATE, APPLICATION) into a Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req, and a LocalizationEventType.INIT_CONTAINER_RESOURCES event is sent.
9. LocalizationEventType.INIT_CONTAINER_RESOURCES
Each resource type from step 8 is forwarded to the corresponding tracker, which processes it as a ResourceEventType.REQUEST event.
10. ResourceEventType.REQUEST
LocalizedResource handles this event and the download begins:

  private static class FetchResourceTransition extends ResourceTransition {
    @Override
    public void transition(LocalizedResource rsrc, ResourceEvent event) {
      ResourceRequestEvent req = (ResourceRequestEvent) event;
      LocalizerContext ctxt = req.getContext();
      ContainerId container = ctxt.getContainerId();
      rsrc.ref.add(container);
      rsrc.dispatcher.getEventHandler().handle(
          new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt, 
              req.getLocalResourceRequest().getPattern()));
    }
  }

11. LocalizerResourceRequestEvent
Handled by ResourceLocalizationService: PUBLIC resources are downloaded by PublicLocalizer, while PRIVATE and APPLICATION resources are handled by a LocalizerRunner.
PublicLocalizer keeps a synchronized pending queue and pulls data from the filesystem on its own thread using the FSDownload class:

              pending.put(queue.submit(new FSDownload(lfs, null, conf,
                  publicDirDestPath, resource, request.getContext().getStatCache())),
                  request);

PRIVATE and APPLICATION resources are instead pulled by the LocalizerRunner thread. Ultimately one of two executors does the work: DefaultContainerExecutor or LinuxContainerExecutor (which uses cgroups for resource isolation). Here is the default one:

//    ContainerLocalizer localizer =
//        new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
//            RecordFactoryProvider.getRecordFactory(getConf()));
 
    createUserLocalDirs(localDirs, user);
    createUserCacheDirs(localDirs, user);
    createAppDirs(localDirs, user, appId);
    createAppLogDirs(appId, logDirs);
 
 
    //Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
    // randomly choose the local directory
    Path appStorageDir = getWorkingDir(localDirs, user, appId);
 
    String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
    Path tokenDst = new Path(appStorageDir, tokenFn);
    lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
    LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
//    lfs.setWorkingDirectory(appStorageDir);
//    LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
    // TODO: DO it over RPC for maintaining similarity?
    FileContext localizerFc = FileContext.getFileContext(
        lfs.getDefaultFileSystem(), getConf());
    localizerFc.setUMask(lfs.getUMask());
    localizerFc.setWorkingDirectory(appStorageDir);
    LOG.info("Localizer CWD set to " + appStorageDir + " = " 
        + localizerFc.getWorkingDirectory());
    ContainerLocalizer localizer =
        new ContainerLocalizer(localizerFc, user, appId, locId, 
            getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
 
    localizer.runLocalization(nmAddr);

The important piece is ContainerLocalizer; in runLocalization:

     exec = createDownloadThreadPool();
     CompletionService<Path> ecs = createCompletionService(exec);
     localizeFiles(nodeManager, ecs, ugi);

A thread pool downloads the resources from the cluster into the corresponding local directories and sets the proper permissions, while localizeFiles periodically reports status back to ResourceLocalizationService:

         try {
            nodemanager.heartbeat(status);
          } catch (YarnException e) { }
          return;
        }
        cs.poll(1000, TimeUnit.MILLISECONDS);

The corresponding code in ResourceLocalizationService:

  @Override
  public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
    return localizerTracker.processHeartbeat(status);
  }

Once localization has fully completed, processHeartbeat drives a state machine change:

     case FETCH_SUCCESS:
            // notify resource
            try {
            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
              .handle(
                new ResourceLocalizedEvent(req, ConverterUtils
                  .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));

At this point resource localization is done and the container is waiting to run; the current event is ResourceLocalizedEvent.

YARN MRAppMaster and Scheduler Flow (Part 3)

Continuing from the previous post, we follow the state machine transitions of an app submission inside the ResourceManager.
9. FairScheduler.allocate
The allocate method serves both to request the container for the MRAppMaster and, once the MRAppMaster is running, to request containers from the ResourceManager on its behalf. It first checks that the app is registered with the scheduler, then checks the state of the resource requests, releases any resources due for release, and records the requests in the AppSchedulingInfo structure; this will be covered in more detail in the MRAppMaster posts. Next it pulls the containers that have been allocated to run the AppMaster:

  public synchronized ContainersAndNMTokensAllocation
      pullNewlyAllocatedContainersAndNMTokens() {
    List<Container> returnContainerList =
        new ArrayList<Container>(newlyAllocatedContainers.size());
    List<NMToken> nmTokens = new ArrayList<NMToken>();
    for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
      .hasNext();) {
      RMContainer rmContainer = i.next();
      Container container = rmContainer.getContainer();
      try {
        // create container token and NMToken altogether.
        container.setContainerToken(rmContext.getContainerTokenSecretManager()
          .createContainerToken(container.getId(), container.getNodeId(),
            getUser(), container.getResource()));
        NMToken nmToken =
            rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
              getApplicationAttemptId(), container);
        if (nmToken != null) {
          nmTokens.add(nmToken);
        }
      } catch (IllegalArgumentException e) {
        // DNS might be down, skip returning this container.
        LOG.error("Error trying to assign container token and NM token to" +
            " an allocated container " + container.getId(), e);
        continue;
      }
      returnContainerList.add(container);
      i.remove();
      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
        RMContainerEventType.ACQUIRED));
    }
    return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
  }

Note that most operations in YARN are asynchronous, driven through the Dispatcher. When a NodeManager heartbeats to the ResourceManager, this eventually triggers the FairScheduler's NODE_UPDATE event: the NodeManager reports newly launched and newly completed containers, which the scheduler records in the corresponding structures. Assignment then walks from the root queue down to a leaf queue (the assignment rules will get their own post on the scheduler). After picking a suitable queue, an app within that queue is chosen again by the fair-share rules; within an app, allocation goes by priority (appmaster > reduce > map) and then by locality to pick a suitable container. On success an RMContainer object is created, representing a running container, and added to the newlyAllocatedContainers structure:

    // Create RMContainer
    RMContainer rmContainer = new RMContainerImpl(container, 
        getApplicationAttemptId(), node.getNodeID(),
        appSchedulingInfo.getUser(), rmContext);
 
    // Add it to allContainers list.
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);  
        // Update consumption and track allocations
    appSchedulingInfo.allocate(type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());
 
    // Inform the container
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));

An RMContainerEventType.START event is then sent to the dispatcher.
Back in allocate, the newlyAllocatedContainers entries just mentioned are pulled out and wrapped in a ContainersAndNMTokensAllocation to return; at this point scheduling has succeeded.

10. RMContainerEventType.START
The state machine entry and its handler:

    // Transitions from NEW state
    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())
 
 
 
   private static final class ContainerStartedTransition extends
      BaseTransition {
 
    @Override
    public void transition(RMContainerImpl container, RMContainerEvent event) {
      container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
          container.appAttemptId));
    }
  }

This fires RMAppAttemptEventType.CONTAINER_ALLOCATED to the dispatcher, which routes it to RMAppAttemptImpl for handling.

11. RMAppAttemptEventType.CONTAINER_ALLOCATED
Many code paths fire RMAppAttemptEventType.CONTAINER_ALLOCATED, but at this point the attempt's state is RMAppAttemptState.SCHEDULED, so given the previous state it is easy to find the state machine entry:

       // Transitions from SCHEDULED State
      .addTransition(RMAppAttemptState.SCHEDULED,
          EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
            RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
          new AMContainerAllocatedTransition())

The handler is straightforward: it pulls the container obtained in step 9 and returns it; if that fails, it reschedules. Otherwise it stores the attempt, firing RMStateStoreEventType.STORE_APP_ATTEMPT:

  public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
    Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
 
    ApplicationAttemptState attemptState =
        new ApplicationAttemptState(appAttempt.getAppAttemptId(),
          appAttempt.getMasterContainer(), credentials,
          appAttempt.getStartTime());
 
    dispatcher.getEventHandler().handle(
      new RMStateStoreAppAttemptEvent(attemptState));
  }

12. RMStateStoreEventType.STORE_APP_ATTEMPT
This event is simple: depending on configuration it persists the attempt (or not, and to whichever store is configured), and finally fires RMAppAttemptEventType.ATTEMPT_NEW_SAVED into the dispatcher.
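By analogy with the STORE_APP branch shown in part 2, the handling is roughly (a condensed sketch, not verbatim source):

    // Condensed sketch: persist the attempt, then tell it the state is saved.
    if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
      storeApplicationAttemptStateInternal(attemptId, attemptStateData);
      rmDispatcher.getEventHandler().handle(
          new RMAppAttemptNewSavedEvent(attemptId, storedException));
    }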

13. RMAppAttemptEventType.ATTEMPT_NEW_SAVED
This event goes back to RMAppAttemptImpl for handling and finally fires AMLauncherEventType.LAUNCH.
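The corresponding registration in RMAppAttemptImpl looks roughly like this (a sketch following the pattern of the transitions above; the exact hook name may differ):

    // Sketch: ATTEMPT_NEW_SAVED moves ALLOCATED_SAVING -> ALLOCATED, and the
    // hook then fires AMLauncherEventType.LAUNCH for the new attempt.
    .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED,
        RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())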

14. AMLauncherEventType.LAUNCH
This event is handled by ApplicationMasterLauncher, a service inside the ResourceManager responsible for asking NodeManagers to run AppMasters.
The handler for AMLauncherEventType.LAUNCH is:

  private void launch(RMAppAttempt application) {
    Runnable launcher = createRunnableLauncher(application, 
        AMLauncherEventType.LAUNCH);
    masterEvents.add(launcher);
  }

masterEvents is a BlockingQueue; ApplicationMasterLauncher has a LauncherThread that consumes it and runs the generated Runnables. The interaction with the NodeManager is covered in the next post.
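The consuming side is a simple take-and-execute loop, roughly (a sketch of the LauncherThread body):

    // Sketch: block on the queue, then run each AMLauncher Runnable on the pool.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Runnable toLaunch = masterEvents.take(); // blocks until launch() enqueues one
        launcherPool.execute(toLaunch);          // executes AMLauncher.run()
      } catch (InterruptedException e) {
        break;
      }
    }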

YARN MRAppMaster and Scheduler Flow (Part 2)

The previous post introduced YARN's basic concepts and the client submission flow; this one starts from the ResourceManager and walks through the submission flow in detail.

The ResourceManager Accepts the Job

Through its ClientRMService, the ResourceManager first parses the ApplicationSubmissionContext object, obtaining the submission id along with the user, the submitting group, and other related information. It then creates an RMApp and registers it in the ResourceManager; RMApp is the abstraction of a Job, and every Job has a corresponding RMApp. If all checks pass, the job is formally submitted to the ResourceManager.
From here we enter the state machine transitions, explained stage by stage:
1. RMAppEventType.START

      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));

The event type is RMAppEvent; looking in the ResourceManager code for the handler class registered with the async dispatcher:

      // Register event handler for RmAppEvents
      rmDispatcher.register(RMAppEventType.class,
          new ApplicationEventDispatcher(rmContext));

Inside ApplicationEventDispatcher the code is:

  @Private
  public static final class ApplicationEventDispatcher implements
      EventHandler<RMAppEvent> {
 
    private final RMContext rmContext;
 
    public ApplicationEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }
 
    @Override
    public void handle(RMAppEvent event) {
      ApplicationId appID = event.getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(appID);
      if (rmApp != null) {
        try {
          rmApp.handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for application " + appID, t);
        }
      }
    }
  }

So we know to look at RMApp's implementation class, RMAppImpl, for the state machine transitions and hook functions. A simpler shortcut is to grep the hadoop-yarn code for RMAppEventType.START, which usually lands you in the right place; still, understanding the mechanism comes first.
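For example:

    grep -rn "RMAppEventType.START" hadoop-yarn-project/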
Now look at the state machine part of RMAppImpl:

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

This says that RMAppEventType.START triggers the RMAppNewlySavingTransition hook and moves the RMAppState from NEW to NEW_SAVING.
RMAppNewlySavingTransition's transition function calls storeNewApplication:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

At this point a new event has been sent back into the dispatcher for processing.

2. RMStateStoreEventType.STORE_APP
RMStateStoreEventType events are handled by the RMStateStore class; the relevant setup code:

      if(isRecoveryEnabled) {
        recoveryEnabled = true;
        rmStore =  RMStateStoreFactory.getStore(conf);
      } else {
        recoveryEnabled = false;
        rmStore = new NullRMStateStore();
      }
 
      try {
        rmStore.init(conf);
        rmStore.setRMDispatcher(rmDispatcher);
        rmStore.setResourceManager(rm);
      } catch (Exception e) {
        // the Exception from stateStore.init() needs to be handled for
        // HA and we need to give up master status if we got fenced
        LOG.error("Failed to init state store", e);
        throw e;
      }

By default we run without recovery, so after a restart jobs have to be rerun; recovery reads a large number of directories and is slow. You can see the RMStateStore being registered with the dispatcher.
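For reference, turning recovery on with the filesystem store looks roughly like this in yarn-site.xml (standard Hadoop 2.x property names):

<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
</property>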
RMStateStore's handle function pushes the event into the handleStoreEvent method:

      LOG.info("Storing info for app: " + appId);
      try {
        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
          storeApplicationStateInternal(appId, appStateData);
          notifyDoneStoringApplication(appId, storedException);
        } else {
          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
          updateApplicationStateInternal(appId, appStateData);
          notifyDoneUpdatingApplication(appId, storedException);
        }
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        notifyStoreOperationFailed(e);
      }

After the store completes, the notifyDoneStoringApplication method runs:

  private void notifyDoneStoringApplication(ApplicationId appId,
                                                  Exception storedException) {
    rmDispatcher.getEventHandler().handle(
        new RMAppNewSavedEvent(appId, storedException));
  }

which pushes an RMAppNewSavedEvent back into the dispatcher.

3. RMAppEventType.APP_NEW_SAVED
As in step 1, the state machine routes the event to RMAppImpl:

   .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

The handler, AddApplicationToSchedulerTransition, does:

      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user));

The ResourceManager registers a handler for SchedulerEventType, the parent type of AppAddedSchedulerEvent:

      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

The scheduler has its own dedicated thread for processing, so it does not block the ResourceManager's dispatcher thread. Since there are several schedulers, I only describe the FairScheduler flow that we use; reading the other implementations is much the same.

4. SchedulerEventType.APP_ADDED
FairScheduler's handle method dispatches on several event types; APP_ADDED is handled as follows:

    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser());
      break;

In the addApplication method, the scheduler picks up the RMApp created at submission time, the submitting user, and the queue name, and places the job into a queue for later scheduling; this goes through assignToQueue, which also validates the request. It then creates a SchedulerApplication object in the FairScheduler, meaning the job has been accepted by the scheduler and is waiting to be scheduled, and puts it into the Map<ApplicationId, SchedulerApplication> applications structure. Finally the event is forwarded onward through the dispatcher:

    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));

5. RMAppEventType.APP_ACCEPTED
The APP_ACCEPTED event is again handled by RMAppImpl:

  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

This step creates an attempt. Every execution of an app creates a new attempt, kept in RMApp's in-memory structures; an attempt can fail for all sorts of reasons, in which case the app starts a new one, and once the limit is exceeded (4 in our setup; the cap is yarn.resourcemanager.am.max-attempts) the app fails. Finally the handler sends out the next event.

6. RMAppAttemptEventType.START
RMAppAttemptEventType.START is handled by the newly created RMAppAttemptImpl, as in the following code:

      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())

RMAppAttemptEventType.START moves the state from NEW to SUBMITTED and runs the AttemptStartedTransition method. The calls worth noting inside AttemptStartedTransition are:

      // Register with the ApplicationMasterService
      appAttempt.masterService
          .registerAppAttempt(appAttempt.applicationAttemptId);
 
      // Add the applicationAttempt to the scheduler and inform the scheduler
      // whether to transfer the state from previous attempt.
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }

This mainly registers the attempt with the ApplicationMasterService and fires the event that asks the Scheduler to schedule it.

7. SchedulerEventType.APP_ATTEMPT_ADDED
FairScheduler handles the APP_ATTEMPT_ADDED event:

    SchedulerApplication application =
        applications.get(applicationAttemptId.getApplicationId());
    String user = application.getUser();
    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
 
    FSSchedulerApp attempt =
        new FSSchedulerApp(applicationAttemptId, user,
            queue, new ActiveUsersManager(getRootQueueMetrics()),
            rmContext);
    if (transferStateFromPreviousAttempt) {
      attempt.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
    }
    application.setCurrentAppAttempt(attempt);
 
    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
 
    RMApp rmApp = rmContext.getRMApps().get(attempt.getApplicationId());
    queue.addApp(attempt, runnable,rmApp.getApplicationSubmissionContext().getPriority());
    if (runnable) {
      maxRunningEnforcer.trackRunnableApp(attempt);
    } else {
      maxRunningEnforcer.trackNonRunnableApp(attempt);
    }
 
    queue.getMetrics().submitAppAttempt(user);
    ClusterMetrics.getMetrics().addPendingApp(attempt.getApplicationId());
 
    LOG.info("Added Application Attempt " + applicationAttemptId
        + " to scheduler from user: " + user + " is isAttemptRecovering : " + isAttemptRecovering);
 
    if (isAttemptRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
    }

An FSSchedulerApp is created, corresponding to the RMAppAttempt created inside RMApp. Then, following the fair-share rules, the scheduler checks whether the attempt may run: if the queue has already hit its limit of concurrently running apps the attempt is marked not runnable, otherwise runnable. The app is now pending; depending on the queue's situation it will either be scheduled shortly or wait until it becomes runnable. Finally the dispatcher sends RMAppAttemptEventType.ATTEMPT_ADDED, handled by the RMAppAttempt.
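The per-queue cap that maxRunningEnforcer checks against comes from the fair scheduler allocation file; for example (illustrative queue name and value):

    <allocations>
      <queue name="root.adhoc">
        <maxRunningApps>50</maxRunningApps>
      </queue>
    </allocations>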

8. RMAppAttemptEventType.ATTEMPT_ADDED
RMAppAttemptImpl handles the ATTEMPT_ADDED event through the ScheduleTransition code:

    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      //Build blackList for AM
      Set<NodeId> failNodes = appAttempt.getFailMasterNodes();
      List<String> blackList = new ArrayList<String>(failNodes.size());
      for(NodeId node : failNodes) {
        blackList.add(node.toString());
      }
      if (!appAttempt.submissionContext.getUnmanagedAM()) {
        // Request a container for the AM.
        ResourceRequest request =
            BuilderUtils.newResourceRequest(
                AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
                    .getSubmissionContext().getResource(), 1);
 
        // SchedulerUtils.validateResourceRequests is not necessary because
        // AM resource has been checked when submission
        Allocation amContainerAllocation = appAttempt.scheduler.allocate(
            appAttempt.applicationAttemptId,
            Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, blackList, null);
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        return RMAppAttemptState.SCHEDULED;
      } else {
        // save state and then go to LAUNCHED state
        appAttempt.storeAttempt();
        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
      }
    }

Here the attempt asks the Scheduler for a single container in which to run the AppMaster: its priority is 0, it may run on any machine (ResourceRequest.ANY), it carries the memory requested at submission time, and the container count is 1.
As a preview: the AppMaster's priority is 0, reduce is 10, and map is 20; the lower the number, the earlier it is scheduled. Next comes FairScheduler's allocate method, the most important one, covering both the MRAppMaster container request and the MRAppMaster's own resource requests to the scheduler; it is continued in the next post.

YARN MRAppMaster and Scheduler Flow (Part 1)

Overview

Hadoop can roughly be divided into a compute layer and a storage layer. This post explains the basic flows and concepts of the compute layer, YARN, including job submission, scheduling, and container execution.
The diagram below, from the Apache community, shows YARN's basic flow:
[Figure: the basic YARN workflow]
The basic process can be summarized as follows:
1. The client submits a job.
2. The ResourceManager accepts the job and creates the corresponding structures.
3. The RM's state machines advance and the Scheduler finds a NodeManager capable of running the MRAppMaster.
4. The ApplicationMasterLauncher sends an RPC to that NodeManager to launch the MRAppMaster.
5. NodeManager heartbeats report running and completed containers to the RM; the Scheduler assigns containers to be run on each node; the MRAppMaster heartbeats to the RM, and the FairScheduler returns containers it may use according to the resources requested.
6. The MRAppMaster sends RPCs to NodeManagers to launch containers.
7. Each NodeManager localizes the needed resources and runs its container.

State Machines and Asynchrony

To explain YARN's flows in any detail, we first need to cover a couple of its design ideas: state machine transitions and asynchronous execution.
Anyone familiar with the JobTracker code knows that a major factor limiting JobTracker's scalability is that it tracked everything: every Job and the state of every MapTask and ReduceTask, with all execution serialized, which badly limited scheduling efficiency. YARN therefore splits the Job's responsibilities in two: the MRAppMaster manages tasks, while the ResourceManager handles scheduling and node management. YARN also leans heavily on an event-driven model, decoupling modules through the underlying async dispatcher and minimizing serialization bottlenecks.
Beyond that, the biggest difference between YARN and MRv1 is the use of state machines. State machines plus event-driven execution greatly improve YARN's internal scheduling efficiency, but they also make the code much harder to read. Here is how to read the relevant parts, with some code:

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          addTransition(STATE preState, STATE postState,
                        EVENTTYPE eventType,
                        SingleArcTransition<OPERAND, EVENT> hook){
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
        (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
           (preState, eventType, new SingleInternalArc(postState, hook)));
  }

Essentially every state machine in YARN goes through this interface. It means: preState is the starting state, postState is the state after the transition, eventType is the event that triggers the transition, and hook is the function executed when the transition fires. Usually we only need to follow the eventType to find the hook to be executed, noting the resulting postState; and every eventType is registered with a handler. For example, in the RMAppImpl code:

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

This transition is triggered by the RMAppEventType.START event; it moves the state from RMAppState.NEW to RMAppState.NEW_SAVING and runs the RMAppNewlySavingTransition function:

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
 
      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

which calls the storeNewApplication(app) method:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

You can see that this step sends an event to the dispatcher, which routes it to the corresponding handler; from here the flow keeps moving.
Next, we walk through job execution step by step.

The Client Submits the Job

YARN supports more than one kind of job, not just the MapReduce framework: Spark, and even frameworks you write yourself (for example the OLS real-time system we developed and have long run in production). Each framework has to write its own client. To keep things simple we analyze the MR framework here; given time I may share our OLS framework.
The MR framework submits jobs through JobClient. The flow is simple: first all jars, dependencies, split information, conf, and so on are copied to HDFS, then submit is sent over RPC. Before submitting, the client wraps the job through YARNRunner's createApplicationSubmissionContext method, whose main purpose is to record everything about the job: its name, the target queue, the MRAppMaster launch parameters, and so on, preparing for the MRAppMaster's launch on a NodeManager.
With the ApplicationSubmissionContext built, the client sends the RPC request to the ResourceManager's ClientRMService to submit the job.
At this point the client's work is done. In short, the client is responsible for input splitting, all the conf settings, generating the MRAppMaster launch command, and so on.
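For completeness, everything above is set in motion by the standard client-side Job API; a minimal sketch:

    // Minimal MR submission sketch: waitForCompletion() drives JobClient/YARNRunner
    // to upload artifacts to HDFS and submit to ClientRMService over RPC.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SubmitExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "submit-example");
        job.setJarByClass(SubmitExample.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }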