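Starting with this post, I will cover how the MRAppMaster is launched through its interaction with the NodeManager, how it requests resources, and related topics.
1. Picking up from the earlier post on the YARN MRAppMaster and Scheduler flow: ApplicationMasterLauncher starts an AMLauncher thread whose job is to launch the MRAppMaster. It first opens an RPC connection. The interface is ContainerManagementProtocol, the protocol an ApplicationMaster uses to talk to a NodeManager. It then packs the full runtime environment, arguments and so on into a context, and finally sends the startContainers command to the NodeManager over RPC.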
StartContainersResponse response =
    containerMgrProxy.startContainers(allRequests);
At this point the ResourceManager has finished its scheduling work, and everything that follows is handed over to the NodeManager.
2.ContainerManagerImpl.startContainers
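The NodeManager handles the ApplicationMasterLauncher request in its ContainerManagerImpl module. It first validates the UGI and the token, then begins starting the container: it obtains the launching user, all commands to execute and the conf, and from this information creates the Application object used for execution.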
Container container =
    new ContainerImpl(getConfig(), this.dispatcher, launchContext,
        credentials, metrics, containerTokenIdentifier);
ApplicationId applicationID =
    containerId.getApplicationAttemptId().getApplicationId();
Application application =
    new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
It then sends an ApplicationEventType.INIT_APPLICATION event through the NodeManager's dispatcher.
3.ApplicationEventType.INIT_APPLICATION
ApplicationImpl handles the ApplicationEventType.INIT_APPLICATION event and notifies the logHandler:
// Inform the logAggregator
app.dispatcher.getEventHandler().handle(
    new LogHandlerAppStartedEvent(app.appId, app.user,
        app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
        app.applicationACLs));
4.LogHandlerAppStartedEvent
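Our cluster has log aggregation enabled, which is configured by setting yarn.log-aggregation-enable to true in yarn-site.xml. The event is therefore handled by the LogAggregationService class, whose main job here is to initialize the aggregator. The code is: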
private void initApp(final ApplicationId appId, String user,
    Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
    Map<ApplicationAccessType, String> appAcls) {
  ApplicationEvent eventResponse;
  try {
    verifyAndCreateRemoteLogDir(getConfig());
    initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
  } catch (YarnRuntimeException e) {
    LOG.warn("Application failed to init aggregation", e);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
  }
  this.dispatcher.getEventHandler().handle(eventResponse);
}
It then sends an ApplicationEventType.APPLICATION_LOG_HANDLING_INITED event to the dispatcher.
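For reference, the log-aggregation switch mentioned in this step would look roughly like this in yarn-site.xml. This is only a fragment; the property name and value are the ones given above, and the rest of the file is omitted.

```xml
<!-- yarn-site.xml fragment: turn on log aggregation on the NodeManagers -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```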
5.ApplicationEventType.APPLICATION_LOG_HANDLING_INITED
This event is handled by ApplicationImpl. The code is:
static class AppLogInitDoneTransition implements
    SingleArcTransition<ApplicationImpl, ApplicationEvent> {
  @Override
  public void transition(ApplicationImpl app, ApplicationEvent event) {
    app.dispatcher.getEventHandler().handle(
        new ApplicationLocalizationEvent(
            LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
  }
}
Finally, it sends an ApplicationLocalizationEvent through the dispatcher.
6. LocalizationEventType.INIT_APPLICATION_RESOURCES
ContainerManagerImpl registers ResourceLocalizationService as the handler for ApplicationLocalizationEvent. The code is:
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
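To make the registration above more concrete, here is a minimal, self-contained sketch of a dispatcher that routes events by the enum class of their type. It is not Hadoop's dispatcher: `SimpleDispatcher`, `Event`, `EventHandler` and `LocalizationEvent` are simplified stand-ins written for illustration, and the sketch delivers events synchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DispatcherSketch {
    /** Every event exposes an enum constant that identifies its type. */
    interface Event {
        Enum<?> getType();
    }

    interface EventHandler {
        void handle(Event event);
    }

    // Only the two localization event types named in this post.
    enum LocalizationEventType { INIT_APPLICATION_RESOURCES, INIT_CONTAINER_RESOURCES }

    static final class LocalizationEvent implements Event {
        private final LocalizationEventType type;

        LocalizationEvent(LocalizationEventType type) {
            this.type = type;
        }

        @Override
        public LocalizationEventType getType() {
            return type;
        }
    }

    /** Hypothetical stand-in: one handler per enum class, delivered synchronously. */
    static final class SimpleDispatcher {
        private final Map<Class<?>, EventHandler> handlers = new HashMap<>();

        void register(Class<? extends Enum<?>> eventTypeClass, EventHandler handler) {
            handlers.put(eventTypeClass, handler);
        }

        void dispatch(Event event) {
            // The enum class of the event type selects the handler.
            Class<?> key = event.getType().getDeclaringClass();
            EventHandler handler = handlers.get(key);
            if (handler == null) {
                throw new IllegalStateException("No handler registered for " + key);
            }
            handler.handle(event);
        }
    }

    /** Registers one handler, dispatches one event, and returns what the handler saw. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleDispatcher dispatcher = new SimpleDispatcher();
        dispatcher.register(LocalizationEventType.class,
            event -> log.add("ResourceLocalizationService handled " + event.getType()));
        dispatcher.dispatch(
            new LocalizationEvent(LocalizationEventType.INIT_APPLICATION_RESOURCES));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Keying the handler table on the enum class is what lets a single service receive every event of one family, which is how one registration covers both the application-level and container-level localization events in this walkthrough.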
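This is where localization of the MRAppMaster begins. Localization means downloading every resource and environment file needed for execution from the cluster into the appropriate directories on the NodeManager that will run the MRAppMaster. For a detailed treatment of this part, I recommend reading this Hortonworks article.
Finally, an ApplicationInitedEvent is sent to ApplicationImpl. At this step the application's initialization is complete: the application is now considered to be running on this machine and its in-memory structures have been set up. The next step is to initialize the container.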
7.ApplicationEventType.APPLICATION_INITED
ApplicationImpl handles the ApplicationInitedEvent by sending ContainerEventType.INIT_CONTAINER straight to the dispatcher.
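Steps 3 to 7 can be summarized as a small transition table. The sketch below is a simplified model, not the ApplicationImpl code: the state names NEW, INITING and RUNNING are my assumption about how the application states are labelled, and the emitted events are recorded as plain strings taken from the steps above.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class AppStateMachineSketch {
    // Assumed, simplified state names.
    enum AppState { NEW, INITING, RUNNING }

    enum AppEventType { INIT_APPLICATION, APPLICATION_LOG_HANDLING_INITED, APPLICATION_INITED }

    /** One arc of the table: the state reached and the event emitted as a side effect. */
    static final class Arc {
        final AppState next;
        final String emits;

        Arc(AppState next, String emits) {
            this.next = next;
            this.emits = emits;
        }
    }

    private final Map<AppState, Map<AppEventType, Arc>> table = new EnumMap<>(AppState.class);
    private final List<String> emitted = new ArrayList<>();
    private AppState state = AppState.NEW;

    AppStateMachineSketch() {
        // Step 3: INIT_APPLICATION notifies the log handler.
        addArc(AppState.NEW, AppEventType.INIT_APPLICATION,
            AppState.INITING, "LogHandlerAppStartedEvent");
        // Step 5: log handling is ready, so application resources get localized.
        addArc(AppState.INITING, AppEventType.APPLICATION_LOG_HANDLING_INITED,
            AppState.INITING, "LocalizationEventType.INIT_APPLICATION_RESOURCES");
        // Step 7: the application is initialized, so the container is initialized next.
        addArc(AppState.INITING, AppEventType.APPLICATION_INITED,
            AppState.RUNNING, "ContainerEventType.INIT_CONTAINER");
    }

    private void addArc(AppState from, AppEventType on, AppState to, String emits) {
        table.computeIfAbsent(from, k -> new EnumMap<>(AppEventType.class))
            .put(on, new Arc(to, emits));
    }

    /** Applies one event, or fails if the current state has no arc for it. */
    void handle(AppEventType event) {
        Map<AppEventType, Arc> arcs = table.get(state);
        Arc arc = (arcs == null) ? null : arcs.get(event);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + state);
        }
        emitted.add(arc.emits);
        state = arc.next;
    }

    AppState state() {
        return state;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        AppStateMachineSketch app = new AppStateMachineSketch();
        for (AppEventType event : AppEventType.values()) {
            app.handle(event);
            System.out.println(event + " -> " + app.state());
        }
        System.out.println("emitted: " + app.emitted());
    }
}
```

Each handled event produces exactly one follow-up event, which is why the flow in this post reads as a chain: every step's output is the next step's heading.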
8.ContainerEventType.INIT_CONTAINER
ContainerImpl handles ContainerEventType events and starts the container's localization by running the RequestResourcesTransition transition. It sorts all resources by visibility (PUBLIC, PRIVATE, APPLICATION) into Map<LocalResourceVisibility, Collection> req, then sends a LocalizationEventType.INIT_CONTAINER_RESOURCES event.
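The grouping step described above can be sketched as follows. This is an illustration under assumptions, not the RequestResourcesTransition code: `Visibility` and `Resource` are simplified stand-ins for the YARN types, and the paths in `main` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class VisibilityGroupingSketch {
    // Stand-in for LocalResourceVisibility.
    enum Visibility { PUBLIC, PRIVATE, APPLICATION }

    /** Stand-in for a resource request: a remote path plus its visibility. */
    static final class Resource {
        final String path;
        final Visibility visibility;

        Resource(String path, Visibility visibility) {
            this.path = path;
            this.visibility = visibility;
        }
    }

    /** Buckets resources by visibility so each bucket can go to its own tracker. */
    static Map<Visibility, Collection<Resource>> groupByVisibility(List<Resource> resources) {
        Map<Visibility, Collection<Resource>> req = new EnumMap<>(Visibility.class);
        for (Resource resource : resources) {
            req.computeIfAbsent(resource.visibility, v -> new ArrayList<>()).add(resource);
        }
        return req;
    }

    public static void main(String[] args) {
        // Hypothetical resources, for illustration only.
        List<Resource> resources = Arrays.asList(
            new Resource("hdfs:///shared/lib.jar", Visibility.PUBLIC),
            new Resource("hdfs:///user/alice/udf.jar", Visibility.PRIVATE),
            new Resource("hdfs:///staging/job.jar", Visibility.APPLICATION),
            new Resource("hdfs:///staging/job.xml", Visibility.APPLICATION));
        groupByVisibility(resources).forEach((visibility, group) ->
            System.out.println(visibility + ": " + group.size() + " resource(s)"));
    }
}
```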
9.LocalizationEventType.INIT_CONTAINER_RESOURCES
The resources grouped by type in step 8 are forwarded to the corresponding tracker for each type. The event used is ResourceEventType.REQUEST.
10.ResourceEventType.REQUEST
LocalizedResource handles this event and starts the download. The code is:
private static class FetchResourceTransition extends ResourceTransition {
  @Override
  public void transition(LocalizedResource rsrc, ResourceEvent event) {
    ResourceRequestEvent req = (ResourceRequestEvent) event;
    LocalizerContext ctxt = req.getContext();
    ContainerId container = ctxt.getContainerId();
    rsrc.ref.add(container);
    rsrc.dispatcher.getEventHandler().handle(
        new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt,
            req.getLocalResourceRequest().getPattern()));
  }
}
11.LocalizerResourceRequestEvent
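This event is handled by ResourceLocalizationService. PUBLIC resources are downloaded by PublicLocalizer, while PRIVATE and APPLICATION resources are handled by LocalizerRunner.
PublicLocalizer keeps an internal, synchronized collection named pending, and a thread keeps pulling data from the file system through the FSDownload class. The code is: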
pending.put(queue.submit(new FSDownload(lfs, null, conf,
    publicDirDestPath, resource, request.getContext().getStatCache())),
    request);
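The `pending.put(queue.submit(...), request)` idiom above pairs each in-flight download with the request that triggered it. Here is a minimal sketch of that pattern using only java.util.concurrent. It is not the PublicLocalizer code: the download is faked with a string, the `/local/public/` prefix is hypothetical, and the sketch submits and drains on one thread, whereas the real class needs synchronization because other threads touch pending.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PendingDownloadsSketch {
    /** Submits one fake download per request and maps each request to its local path. */
    static Map<String, String> downloadAll(List<String> requests)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> queue = new ExecutorCompletionService<>(pool);
        // Future -> originating request, so a finished download can be matched back.
        Map<Future<String>, String> pending = new ConcurrentHashMap<>();
        Map<String, String> localized = new TreeMap<>();
        try {
            for (String request : requests) {
                // Stands in for FSDownload: "downloads" to a hypothetical local path.
                Callable<String> download = () -> "/local/public/" + request;
                pending.put(queue.submit(download), request);
            }
            while (!pending.isEmpty()) {
                Future<String> done = queue.take();     // blocks until any download finishes
                String request = pending.remove(done);  // look up which request it served
                localized.put(request, done.get());
            }
        } finally {
            pool.shutdown();
        }
        return localized;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(downloadAll(Arrays.asList("lib.jar", "dict.zip")));
    }
}
```

The completion service returns futures in the order they finish rather than the order they were submitted, so the map is what ties a result back to its request.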
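PRIVATE and APPLICATION resources are instead pulled continuously by the LocalizerRunner thread. In the end the work is done by one of two executors: DefaultContainerExecutor, or LinuxContainerExecutor (which uses cgroups for resource isolation). Let's look at the default one: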
// ContainerLocalizer localizer =
//     new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
//         RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
createAppLogDirs(appId, logDirs);
//Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
// randomly choose the local directory
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
// lfs.setWorkingDirectory(appStorageDir);
// LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
FileContext localizerFc = FileContext.getFileContext(
    lfs.getDefaultFileSystem(), getConf());
localizerFc.setUMask(lfs.getUMask());
localizerFc.setWorkingDirectory(appStorageDir);
LOG.info("Localizer CWD set to " + appStorageDir + " = "
    + localizerFc.getWorkingDirectory());
ContainerLocalizer localizer =
    new ContainerLocalizer(localizerFc, user, appId, locId,
        getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
localizer.runLocalization(nmAddr);
The important piece is ContainerLocalizer. Inside runLocalization the code is:
exec = createDownloadThreadPool();
CompletionService<Path> ecs = createCompletionService(exec);
localizeFiles(nodeManager, ecs, ugi);
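A thread pool downloads the resources from the cluster into the matching local directories and sets the appropriate permissions. Meanwhile, localizeFiles periodically reports its status to ResourceLocalizationService: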
    try {
      nodemanager.heartbeat(status);
    } catch (YarnException e) { }
    return;
  }
  cs.poll(1000, TimeUnit.MILLISECONDS);
The corresponding code in ResourceLocalizationService is:
@Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
  return localizerTracker.processHeartbeat(status);
}
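The heartbeat exchange between the localizer and the NodeManager can be pictured with the toy model below. It is a sketch under loud assumptions, not the YARN protocol: `Tracker`, `Response` and `Action` are hypothetical simplified types, the tracker hands out one resource per heartbeat, a download is treated as finishing instantly, and the rule "answer DIE once nothing is pending" is my simplification of when the localizer is told to stop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

class HeartbeatSketch {
    // Assumed actions: keep working, or stop.
    enum Action { LIVE, DIE }

    static final class Response {
        final Action action;
        final List<String> toFetch;

        Response(Action action, List<String> toFetch) {
            this.action = action;
            this.toFetch = toFetch;
        }
    }

    /** NodeManager side: records finished resources and hands out pending ones. */
    static final class Tracker {
        private final Deque<String> pending;
        final List<String> localized = new ArrayList<>();

        Tracker(List<String> resources) {
            this.pending = new ArrayDeque<>(resources);
        }

        Response heartbeat(List<String> fetched) {
            localized.addAll(fetched); // loosely analogous to handling a fetch success
            if (pending.isEmpty()) {
                return new Response(Action.DIE, Collections.<String>emptyList());
            }
            return new Response(Action.LIVE, Collections.singletonList(pending.poll()));
        }
    }

    /** Localizer side: report, fetch what it is told to, repeat. Returns the heartbeat count. */
    static int runLocalizer(Tracker tracker) {
        List<String> fetched = new ArrayList<>();
        int heartbeats = 0;
        while (true) {
            Response response = tracker.heartbeat(fetched);
            heartbeats++;
            if (response.action == Action.DIE) {
                return heartbeats;
            }
            // Pretend each download completes before the next heartbeat.
            fetched = new ArrayList<>(response.toFetch);
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(Arrays.asList("job.jar", "job.xml"));
        int heartbeats = runLocalizer(tracker);
        System.out.println(heartbeats + " heartbeats, localized " + tracker.localized);
    }
}
```

In this model the localizer never decides on its own what to download; every piece of work arrives as a heartbeat response, which keeps the bookkeeping on the NodeManager side.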
Once a resource has been fetched successfully, processHeartbeat drives a state machine transition:
case FETCH_SUCCESS:
  // notify resource
  try {
    getLocalResourcesTracker(req.getVisibility(), user, applicationId)
        .handle(
            new ResourceLocalizedEvent(req, ConverterUtils
                .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
At this point resource localization is complete and the container is waiting to be executed. The event at this stage is ResourceLocalizedEvent.