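Starting with this post, I cover how MRAppMaster is launched through its interaction with NodeManager, how it requests resources, and related topics.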
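1. Picking up from the earlier post on the YARN MRAppMaster and Scheduler flow: ApplicationMasterLauncher starts an AMLauncher thread, which is used to launch MRAppMaster. It first sets up an RPC connection through the ContainerManagementProtocol interface, the protocol for ApplicationMaster-to-NodeManager communication. It then packs the full runtime environment, parameters, and so on into the context, and finally sends the startContainers command to the NodeManager over RPC.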
    StartContainersResponse response = containerMgrProxy.startContainers(allRequests);
At this point the ResourceManager's scheduling work is done, and the rest is handed over to the NodeManager.
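2. ContainerManagerImpl.startContainers
The NodeManager handles the request from ApplicationMasterLauncher in its ContainerManagerImpl module. It first validates the UGI and the token, then starts the container: it obtains the launching user, all commands to execute, and the conf, and from this information builds the Application object used for execution.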
    Container container =
        new ContainerImpl(getConfig(), this.dispatcher, launchContext,
            credentials, metrics, containerTokenIdentifier);
    ApplicationId applicationID =
        containerId.getApplicationAttemptId().getApplicationId();
    Application application =
        new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
It then sends an ApplicationEventType.INIT_APPLICATION event through the NodeManager's dispatcher.
3. ApplicationEventType.INIT_APPLICATION
ApplicationImpl handles the ApplicationEventType.INIT_APPLICATION event and notifies the logHandler:
    // Inform the logAggregator
    app.dispatcher.getEventHandler().handle(
        new LogHandlerAppStartedEvent(app.appId, app.user,
            app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
            app.applicationACLs));
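4. LogHandlerAppStartedEvent
Our cluster has log aggregation enabled, which is configured by setting yarn.log-aggregation-enable to true in yarn-site.xml. The event is then handled by the LogAggregationService class, whose main job here is to initialize the aggregator. The code is: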
    private void initApp(final ApplicationId appId, String user,
        Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
        Map<ApplicationAccessType, String> appAcls) {
      ApplicationEvent eventResponse;
      try {
        verifyAndCreateRemoteLogDir(getConfig());
        initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
        eventResponse = new ApplicationEvent(appId,
            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
      } catch (YarnRuntimeException e) {
        LOG.warn("Application failed to init aggregation", e);
        eventResponse = new ApplicationEvent(appId,
            ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
      }
      this.dispatcher.getEventHandler().handle(eventResponse);
    }
It then sends an ApplicationEventType.APPLICATION_LOG_HANDLING_INITED event to the dispatcher.
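For reference, the log-aggregation switch mentioned in this step would look roughly like this in yarn-site.xml. Only the property name and value come from the text above; the surrounding `<configuration>` element is the usual Hadoop config wrapper, and other aggregation-related properties are left out.

```xml
<configuration>
  <!-- Enable log aggregation so LogAggregationService handles the app's logs -->
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
</configuration>
```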
5. ApplicationEventType.APPLICATION_LOG_HANDLING_INITED
This event is handled by ApplicationImpl. The code is:
    static class AppLogInitDoneTransition implements
        SingleArcTransition<ApplicationImpl, ApplicationEvent> {
      @Override
      public void transition(ApplicationImpl app, ApplicationEvent event) {
        app.dispatcher.getEventHandler().handle(
            new ApplicationLocalizationEvent(
                LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      }
    }
Finally, it sends an ApplicationLocalizationEvent through the dispatcher.
6. LocalizationEventType.INIT_APPLICATION_RESOURCES
ContainerManagerImpl registers ResourceLocalizationService as the handler for ApplicationLocalizationEvent. The code is:
    dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
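To make the registration idea concrete, here is a minimal, self-contained sketch of routing events to a handler keyed by the event-type enum class. It is a simplified model, not Hadoop's dispatcher: `MiniDispatcher` and its methods are hypothetical names, and the sketch dispatches synchronously on the caller's thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified model of type-based event routing; NOT Hadoop's dispatcher.
class MiniDispatcher {
    // Stand-in for the event-type enum used as the registration key.
    enum LocalizationEventType { INIT_APPLICATION_RESOURCES, INIT_CONTAINER_RESOURCES }

    // One handler per event-type enum class.
    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Enum<?>> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Look up the handler by the enum class that the event type belongs to.
    void dispatch(Enum<?> eventType) {
        Consumer<Enum<?>> handler = handlers.get(eventType.getDeclaringClass());
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + eventType);
        }
        handler.accept(eventType);
    }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        List<String> handled = new ArrayList<>();
        // Hypothetical handler playing the role of the localization service.
        dispatcher.register(LocalizationEventType.class, e -> handled.add(e.name()));
        dispatcher.dispatch(LocalizationEventType.INIT_APPLICATION_RESOURCES);
        System.out.println(handled);
    }
}
```

Keying on the enum class means every value of that enum reaches the same handler, which then decides what to do per value.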
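This is where localization for MRAppMaster begins. Localization means downloading every resource and environment file needed for execution from the cluster into the appropriate directories on the NodeManager that will run MRAppMaster. For a detailed treatment of this part, I recommend reading this Hortonworks article.
Finally, an ApplicationInitedEvent is sent to ApplicationImpl for handling. At this step the application is fully initialized, which means the application is running on this machine and the corresponding in-memory structures have been set up. The next step is to initialize the container.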
7. ApplicationInitedEvent.APPLICATION_INITED
This step directly sends ContainerEventType.INIT_CONTAINER to the dispatcher.
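The chain of events in steps 3 through 7 can be summarized with a small sketch. This is only an ordering model: the real events live in several different enums (ApplicationEventType, LocalizationEventType, ContainerEventType) and are driven by state machines, whereas the hypothetical `AppInitChain` below flattens them into one enum and a lookup table.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Ordering model of the initialization events described above; not Hadoop code.
class AppInitChain {
    // Flattened for illustration; in Hadoop these belong to different enums.
    enum Step {
        INIT_APPLICATION,
        APPLICATION_LOG_HANDLING_INITED,
        INIT_APPLICATION_RESOURCES,
        APPLICATION_INITED,
        INIT_CONTAINER
    }

    // Each step maps to the event that the text says is sent next.
    private static final Map<Step, Step> NEXT = new EnumMap<>(Step.class);
    static {
        NEXT.put(Step.INIT_APPLICATION, Step.APPLICATION_LOG_HANDLING_INITED);
        NEXT.put(Step.APPLICATION_LOG_HANDLING_INITED, Step.INIT_APPLICATION_RESOURCES);
        NEXT.put(Step.INIT_APPLICATION_RESOURCES, Step.APPLICATION_INITED);
        NEXT.put(Step.APPLICATION_INITED, Step.INIT_CONTAINER);
    }

    // Follow the chain from a starting event until no successor is defined.
    static List<Step> trace(Step start) {
        List<Step> order = new ArrayList<>();
        for (Step s = start; s != null; s = NEXT.get(s)) {
            order.add(s);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(trace(Step.INIT_APPLICATION));
    }
}
```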
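8. ContainerEventType.INIT_CONTAINER
ContainerImpl handles the ContainerEventType event and starts the localization process by invoking RequestResourcesTransition. Here, all resources are handled separately according to their visibility (PUBLIC, PRIVATE, or APPLICATION) and placed into a Map<LocalResourceVisibility, Collection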
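As an illustration of grouping resources by visibility, here is a minimal sketch. The source text is cut off before the map's element type, so the `String` resource names used here are purely an assumption for the example, as are the `VisibilityGrouping` class, its `Visibility` enum, and the sample file names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative grouping by visibility; element types are assumptions, not Hadoop's.
class VisibilityGrouping {
    enum Visibility { PUBLIC, PRIVATE, APPLICATION }

    // Bucket each resource name under its visibility.
    static Map<Visibility, Collection<String>> group(Map<String, Visibility> resources) {
        Map<Visibility, Collection<String>> byVisibility = new EnumMap<>(Visibility.class);
        for (Map.Entry<String, Visibility> e : resources.entrySet()) {
            byVisibility.computeIfAbsent(e.getValue(), v -> new ArrayList<>()).add(e.getKey());
        }
        return byVisibility;
    }

    public static void main(String[] args) {
        Map<String, Visibility> resources = new LinkedHashMap<>();
        resources.put("job.jar", Visibility.APPLICATION);      // hypothetical names
        resources.put("job.xml", Visibility.APPLICATION);
        resources.put("shared-lib.jar", Visibility.PUBLIC);
        System.out.println(group(resources));
    }
}
```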
9. LocalizationEventType.INIT_CONTAINER_RESOURCES
The different types of resources from step 8 are forwarded to their respective trackers for handling. The event used is ResourceEventType.REQUEST.
10. ResourceEventType.REQUEST
LocalizedResource handles this event, and the download process begins. The code is:
    private static class FetchResourceTransition extends ResourceTransition {
      @Override
      public void transition(LocalizedResource rsrc, ResourceEvent event) {
        ResourceRequestEvent req = (ResourceRequestEvent) event;
        LocalizerContext ctxt = req.getContext();
        ContainerId container = ctxt.getContainerId();
        rsrc.ref.add(container);
        rsrc.dispatcher.getEventHandler().handle(
            new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt,
                req.getLocalResourceRequest().getPattern()));
      }
    }
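11. LocalizerResourceRequestEvent
This event is handled by ResourceLocalizationService. PUBLIC resources are downloaded by PublicLocalizer, while PRIVATE and APPLICATION resources are handled by LocalizerRunner.
PublicLocalizer keeps an internal, synchronized structure named pending, and a thread keeps pulling data from the file system, with the actual download done by the FSDownload class. The code is: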
    pending.put(queue.submit(new FSDownload(
        lfs, null, conf, publicDirDestPath, resource,
        request.getContext().getStatCache())), request);
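The pattern in that line, submitting a download task and remembering which request each returned Future belongs to, can be sketched with the JDK's own concurrency classes. This is a simplified model under stated assumptions: the `Callable` stands in for FSDownload, request names are plain strings, and the "/local/" destination prefix is made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of tracking submitted downloads by their Future; not Hadoop code.
class PendingDownloads {
    static Map<String, String> downloadAll(List<String> requests) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> queue = new ExecutorCompletionService<>(pool);
        // Maps each in-flight download to the request that triggered it.
        Map<Future<String>, String> pending = new ConcurrentHashMap<>();
        Map<String, String> localized = new HashMap<>();
        try {
            for (String request : requests) {
                // The Callable stands in for FSDownload; "/local/" is a made-up destination.
                pending.put(queue.submit(() -> "/local/" + request), request);
            }
            for (int i = 0; i < requests.size(); i++) {
                Future<String> finished = queue.take();        // next completed download
                String request = pending.remove(finished);     // recover its request
                localized.put(request, finished.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return localized;
    }

    public static void main(String[] args) {
        System.out.println(downloadAll(Arrays.asList("a.jar", "b.xml")));
    }
}
```

Because all submissions finish before the first `take()` in this sketch, the lookup in `pending` cannot miss; code that submits and consumes on different threads needs extra synchronization around the `put`.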
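PRIVATE and APPLICATION resources are instead pulled continuously by the LocalizerRunner thread. In the end there are two ways to do the work: DefaultContainerExecutor, or LinuxContainerExecutor (which uses cgroups for resource isolation). Let's look at the default one, DefaultContainerExecutor: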
    // ContainerLocalizer localizer =
    //     new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
    //         RecordFactoryProvider.getRecordFactory(getConf()));
    createUserLocalDirs(localDirs, user);
    createUserCacheDirs(localDirs, user);
    createAppDirs(localDirs, user, appId);
    createAppLogDirs(appId, logDirs);
    //Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
    // randomly choose the local directory
    Path appStorageDir = getWorkingDir(localDirs, user, appId);
    String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
    Path tokenDst = new Path(appStorageDir, tokenFn);
    lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
    LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
    // lfs.setWorkingDirectory(appStorageDir);
    // LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
    // TODO: DO it over RPC for maintaining similarity?
    FileContext localizerFc = FileContext.getFileContext(
        lfs.getDefaultFileSystem(), getConf());
    localizerFc.setUMask(lfs.getUMask());
    localizerFc.setWorkingDirectory(appStorageDir);
    LOG.info("Localizer CWD set to " + appStorageDir + " = "
        + localizerFc.getWorkingDirectory());
    ContainerLocalizer localizer =
        new ContainerLocalizer(localizerFc, user, appId, locId,
            getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
    localizer.runLocalization(nmAddr);
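To give a feel for what the directory-creation calls at the top of that snippet set up, here is a rough sketch using java.nio. The layout shown (usercache/<user>/filecache and usercache/<user>/appcache/<appId> under each local dir) reflects my understanding of the NodeManager's conventions and should be treated as an assumption; permissions, log dirs, and multiple local dirs are left out, and `LocalDirLayout` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Rough model of the per-user and per-application local directories; not Hadoop code.
class LocalDirLayout {
    static List<Path> create(Path localDir, String user, String appId) {
        // Assumed layout: <localDir>/usercache/<user>/{filecache, appcache/<appId>}
        Path userDir = localDir.resolve("usercache").resolve(user);
        Path fileCache = userDir.resolve("filecache");
        Path appDir = userDir.resolve("appcache").resolve(appId);
        try {
            Files.createDirectories(fileCache);   // also creates userDir
            Files.createDirectories(appDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Arrays.asList(userDir, fileCache, appDir);
    }

    public static void main(String[] args) {
        // Hypothetical local dir, user, and application id.
        Path localDir = Paths.get(System.getProperty("java.io.tmpdir"), "nm-local-dir-sketch");
        for (Path p : create(localDir, "alice", "application_0000000000000_0001")) {
            System.out.println(p);
        }
    }
}
```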
The important piece is ContainerLocalizer. The code in runLocalization is:
    exec = createDownloadThreadPool();
    CompletionService<Path> ecs = createCompletionService(exec);
    localizeFiles(nodeManager, ecs, ugi);
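A thread pool downloads the cluster resources into the corresponding local directories and sets the appropriate permissions. Meanwhile, localizeFiles periodically reports its status to ResourceLocalizationService: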
      try {
        nodemanager.heartbeat(status);
      } catch (YarnException e) { }
      return;
    }
    cs.poll(1000, TimeUnit.MILLISECONDS);
The corresponding code in ResourceLocalizationService is:
    @Override
    public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
      return localizerTracker.processHeartbeat(status);
    }
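The report-then-poll rhythm of the localizer can be sketched as follows. This is a simplified model, not the real localizeFiles: the heartbeat is represented by recording how many downloads have finished so far, the tasks are trivial stand-ins for downloads, and the response from the NodeManager (which in the real protocol can carry instructions back) is ignored. `HeartbeatLoop` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Model of "report status, then wait briefly for a download to finish"; not Hadoop code.
class HeartbeatLoop {
    // Returns the completed-download count reported at each heartbeat.
    static List<Integer> run(int tasks, long pollMillis) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(exec);
        List<Integer> heartbeats = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                cs.submit(() -> "file-" + id);     // stand-in for one download
            }
            int done = 0;
            while (done < tasks) {
                heartbeats.add(done);              // stand-in for nodemanager.heartbeat(status)
                // Wait up to pollMillis for any download to complete.
                Future<String> finished = cs.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (finished != null) {
                    finished.get();
                    done++;
                }
            }
            heartbeats.add(done);                  // final report: everything fetched
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdownNow();
        }
        return heartbeats;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 1000));
    }
}
```

Polling with a timeout rather than blocking indefinitely keeps the heartbeat going even while a slow download is still in progress.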
Once localization has fully completed, the state machine changes after processHeartbeat:
    case FETCH_SUCCESS:
      // notify resource
      try {
        getLocalResourcesTracker(req.getVisibility(), user, applicationId)
            .handle(
                new ResourceLocalizedEvent(req, ConverterUtils
                    .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
At this point resource localization is complete and the container waits to be executed. The event at this stage is ResourceLocalizedEvent.
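As a closing illustration, the per-resource status handling hinted at by the FETCH_SUCCESS case can be modeled like this. It is a minimal sketch under assumptions: `ResourceStat` and `HeartbeatStatuses` are hypothetical types, the three status names mirror what I understand the localizer protocol to report, and where the real code notifies a resource tracker this sketch only collects the localized paths.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Model of handling per-resource statuses carried by a heartbeat; not Hadoop code.
class HeartbeatStatuses {
    enum ResourceStatusType { FETCH_PENDING, FETCH_SUCCESS, FETCH_FAILURE }

    // Hypothetical status record: which resource, its state, and where it landed.
    static final class ResourceStat {
        final String resource;
        final ResourceStatusType status;
        final String localPath;

        ResourceStat(String resource, ResourceStatusType status, String localPath) {
            this.resource = resource;
            this.status = status;
            this.localPath = localPath;
        }
    }

    // Collect resource -> local path for every successfully fetched resource.
    static Map<String, String> process(List<ResourceStat> stats) {
        Map<String, String> localized = new LinkedHashMap<>();
        for (ResourceStat stat : stats) {
            switch (stat.status) {
                case FETCH_SUCCESS:
                    // The real code notifies the resource tracker here.
                    localized.put(stat.resource, stat.localPath);
                    break;
                case FETCH_PENDING:
                    break;  // still downloading; nothing to record yet
                case FETCH_FAILURE:
                    break;  // this sketch simply skips failures
            }
        }
        return localized;
    }

    public static void main(String[] args) {
        List<ResourceStat> stats = Arrays.asList(
            new ResourceStat("job.jar", ResourceStatusType.FETCH_SUCCESS, "/local/job.jar"),
            new ResourceStat("job.xml", ResourceStatusType.FETCH_PENDING, null));
        System.out.println(process(stats));
    }
}
```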