接上一篇说明,继续app提交在ResouceManager中的状态机流转。
9.FairScheduler.allocate
allocate方法基本上是申请mrappmaster和mrappmaster启动后向ResourceManager申请container资源使用的。它首先检查app是否在已经注册到scheduler中,然后检查申请资源状态,释放掉相应资源,并加入到AppSchedulingInfo结构中,这些内容会在MRAppmaster的流程中更具体的介绍。随后是获取能够运行appmaster的container资源的方法,代码为:
public synchronized ContainersAndNMTokensAllocation pullNewlyAllocatedContainersAndNMTokens() { List<Container> returnContainerList = new ArrayList<Container>(newlyAllocatedContainers.size()); List<NMToken> nmTokens = new ArrayList<NMToken>(); for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i .hasNext();) { RMContainer rmContainer = i.next(); Container container = rmContainer.getContainer(); try { // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() .createContainerToken(container.getId(), container.getNodeId(), getUser(), container.getResource())); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); if (nmToken != null) { nmTokens.add(nmToken); } } catch (IllegalArgumentException e) { // DNS might be down, skip returning this container. LOG.error("Error trying to assign container token and NM token to" + " an allocated container " + container.getId(), e); continue; } returnContainerList.add(container); i.remove(); rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); } return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); } |
这里需要说明的是由于YARN大部分操作都是通过Distpatcher进行异步操作,NodeManager向ResouceManager汇报心跳的时候会最后触发FairScheduler的NODE_UPDATE事件,NodeManager会汇报最新执行的,执行结束的Container,Scheduler会放到相应的结构中。然后会从父节点开始分配,直到叶子节点,分配的规则会在调度器部门专门写。最后选出合适的队列,并且从队列里面再根据fair规则分配,检查注册的App是否有合适的需要分配,App中的分配是根据优先级进行的,appmaster>reduce>map,然后再根据本地性找合适的container进行分配。分配成功后会将生成一个RMContainer对象,代表一个运行的container,并且加入到newlyAllocatedContainers结构中。
// Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations appSchedulingInfo.allocate(type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); // Inform the container rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.START)); |
并发送RMContainerEventType.START到dispatcher进行处理。
回到allocate方法,将刚才说到的newlyAllcatedContainers取出,并封装ContainersAndNMTokensAllocation返回,相当于调度成功。
10.RMContainerEventType.START
状态机及处理代码为:
// Transitions from NEW state .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED, RMContainerEventType.START, new ContainerStartedTransition()) private static final class ContainerStartedTransition extends BaseTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( container.appAttemptId)); } } |
就是出发RMAppAttemptEventType.CONTAINER_ALLOCATED给dispatcher调用RMAppimpl进行处理。
11.RMAppAttemptEventType.CONTAINER_ALLOCATED
触发RMAppAttemptEventType.CONTAINER_ALLOCATED事件的方法很多,但是目前为止RMAppimpl状态为RMAppAttemptState.SCHEDULED,通过之前的状态就很容易找到状态机处理函数的入口为:
// Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) |
处理函数其实很简单,就是从get第9步获取的container,并返回。如果获取失败重新调度。否则store这个attempt,触发RMStateStoreEventType.STORE_APP_ATTEMPT事件,代码如下:
public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) { Credentials credentials = getCredentialsFromAppAttempt(appAttempt); ApplicationAttemptState attemptState = new ApplicationAttemptState(appAttempt.getAppAttemptId(), appAttempt.getMasterContainer(), credentials, appAttempt.getStartTime()); dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); } |
12.RMStateStoreEventType.STORE_APP_ATTEMPT
这个事件很简单,就是根据配置是否保存和保存在哪种介质中,最后触发了RMAppAttemptEventType.ATTEMPT_NEW_SAVED事件,并发送到Dispatcher中。
13.RMAppAttemptEventType.ATTEMPT_NEW_SAVED
这个事件回到RMAppattemptImpl中处理,最后触发AMLauncherEventType.LAUNCH事件。
14.AMLauncherEventType.LAUNCH
这个事件由ApplicationMasterLauncher来处理,ApplicationMasterLauncher是ResouceManger中的一个服务,负责向NodeManager发送请求执行AppMaster。
AMLauncherEventType.LAUNCH的处理函数为
private void launch(RMAppAttempt application) { Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH); masterEvents.add(launcher); } |
masterEvents是一个BlockingQueue,ApplicationMasterLauncher有个线程LauncherThread负责调用生成的Runnable。至于与NodeManager交互放到下一篇中介绍。