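The previous post introduced the basic concepts of YARN and the client-side submission flow. This post starts from the ResourceManager and walks through in detail how a submission is processed.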
ResourceManager receives the job
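Through ClientRMService, the ResourceManager first parses the ApplicationSubmissionContext object to obtain the application id, along with the submitting user, group, and related information. It then creates an RMApp and registers it in the ResourceManager. RMApp is the abstraction of a job: every job has one corresponding RMApp. Once all validation passes, the application is formally submitted to the ResourceManager.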
From here on the flow is driven by state machine transitions, described stage by stage below:
1.RMAppEventType.START
this.rmContext.getDispatcher().getEventHandler()
    .handle(new RMAppEvent(applicationId, RMAppEventType.START));
The event is an RMAppEvent. Next, look in ResourceManager to see which handler class is registered with the asynchronous dispatcher for this event type:
// Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class,
    new ApplicationEventDispatcher(rmContext));
The code of ApplicationEventDispatcher is:
@Private
public static final class ApplicationEventDispatcher implements
    EventHandler<RMAppEvent> {
  private final RMContext rmContext;
  public ApplicationEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }
  @Override
  public void handle(RMAppEvent event) {
    ApplicationId appID = event.getApplicationId();
    RMApp rmApp = this.rmContext.getRMApps().get(appID);
    if (rmApp != null) {
      try {
        rmApp.handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for application " + appID, t);
      }
    }
  }
}
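So the place to look for the state transitions and their hook functions is RMAppImpl, the implementation class of RMApp. A simpler approach is to grep the Hadoop YARN source for classes that contain RMAppEventType.START, which gets you to the right place nearly every time. That said, understanding the underlying mechanism should come first.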
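The registration and routing described above can be sketched as a toy, synchronous dispatcher that keys handlers by the event type's enum class. Every type here (ToyDispatcher, AppEvent, AppEventType) is hypothetical and written only for illustration; the real YARN dispatcher is asynchronous and queue-backed, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "register a handler per event-type enum, then route by type".
// None of these types are the real YARN classes; names are illustrative only.
class ToyDispatcherDemo {
    interface Event { Enum<?> getType(); }
    interface Handler { void handle(Event event); }

    enum AppEventType { START, APP_ACCEPTED }

    static final class AppEvent implements Event {
        final String appId;
        final AppEventType type;
        AppEvent(String appId, AppEventType type) {
            this.appId = appId;
            this.type = type;
        }
        @Override public Enum<?> getType() { return type; }
    }

    static final class ToyDispatcher {
        private final Map<Class<?>, Handler> handlers = new HashMap<>();

        // One handler per enum class, mirroring register(SomeEventType.class, handler).
        void register(Class<? extends Enum<?>> eventTypeClass, Handler handler) {
            handlers.put(eventTypeClass, handler);
        }

        // Synchronous for simplicity: look up the handler by the enum's class and call it.
        void dispatch(Event event) {
            Handler handler = handlers.get(event.getType().getDeclaringClass());
            if (handler == null) {
                throw new IllegalStateException("No handler for " + event.getType());
            }
            handler.handle(event);
        }
    }

    // Registers one handler, dispatches a START event, returns what the handler saw.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.register(AppEventType.class,
            e -> seen.add(((AppEvent) e).appId + ":" + e.getType()));
        dispatcher.dispatch(new AppEvent("app_1", AppEventType.START));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keying the handler table by enum class is what makes "find the register call for the enum" a reliable way to locate the code that handles a given event.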
Next, the state machine section of RMAppImpl:
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
    RMAppEventType.START, new RMAppNewlySavingTransition())
This means that the RMAppEventType.START event triggers the RMAppNewlySavingTransition hook and moves the RMAppState from NEW to NEW_SAVING.
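To make the addTransition idea concrete, here is a minimal sketch of a table-driven state machine in which each (state, event) pair maps to a target state and a hook. ToyStateMachine and its enums are assumptions made up for this example; they only mimic the shape of the calls shown above and are not the YARN state machine classes.

```java
import java.util.HashMap;
import java.util.Map;

// Toy table-driven state machine: (current state, event) -> (hook, next state).
// All names are illustrative; this is not the YARN implementation.
class ToyStateMachineDemo {
    enum State { NEW, NEW_SAVING, SUBMITTED }
    enum EventType { START, APP_NEW_SAVED }

    interface Hook { void run(); }

    static final class Transition {
        final State target;
        final Hook hook;
        Transition(State target, Hook hook) {
            this.target = target;
            this.hook = hook;
        }
    }

    static final class ToyStateMachine {
        private final Map<State, Map<EventType, Transition>> table = new HashMap<>();
        private State current = State.NEW;

        ToyStateMachine addTransition(State from, State to, EventType on, Hook hook) {
            table.computeIfAbsent(from, k -> new HashMap<>()).put(on, new Transition(to, hook));
            return this;
        }

        // Runs the hook registered for (current, event), then moves to the target state.
        void handle(EventType event) {
            Map<EventType, Transition> row = table.get(current);
            Transition t = (row == null) ? null : row.get(event);
            if (t == null) {
                throw new IllegalStateException("Invalid event " + event + " at " + current);
            }
            t.hook.run();
            current = t.target;
        }

        State getState() { return current; }
    }

    // Drives NEW -> NEW_SAVING -> SUBMITTED and returns the hook log plus final state.
    static String run() {
        StringBuilder log = new StringBuilder();
        ToyStateMachine sm = new ToyStateMachine()
            .addTransition(State.NEW, State.NEW_SAVING, EventType.START,
                () -> log.append("store;"))
            .addTransition(State.NEW_SAVING, State.SUBMITTED, EventType.APP_NEW_SAVED,
                () -> log.append("addToScheduler;"));
        sm.handle(EventType.START);
        sm.handle(EventType.APP_NEW_SAVED);
        return log + sm.getState().toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```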
The transition function of RMAppNewlySavingTransition calls storeNewApplication:
public synchronized void storeNewApplication(RMApp app) {
  ApplicationSubmissionContext context = app
      .getApplicationSubmissionContext();
  assert context instanceof ApplicationSubmissionContextPBImpl;
  ApplicationState appState =
      new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
  dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
At this point a new event has been posted back to the dispatcher for processing.
2.RMStateStoreEventType.STORE_APP
RMStateStoreEventType events are handled by the RMStateStore class. The relevant code is:
if (isRecoveryEnabled) {
  recoveryEnabled = true;
  rmStore = RMStateStoreFactory.getStore(conf);
} else {
  recoveryEnabled = false;
  rmStore = new NullRMStateStore();
}
try {
  rmStore.init(conf);
  rmStore.setRMDispatcher(rmDispatcher);
  rmStore.setResourceManager(rm);
} catch (Exception e) {
  // the Exception from stateStore.init() needs to be handled for
  // HA and we need to give up master status if we got fenced
  LOG.error("Failed to init state store", e);
  throw e;
}
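By default we run without recovery, so jobs have to be rerun after a restart; recovery reads a large number of directories and is slow. The code above also shows the RMStateStore being wired to the dispatcher through setRMDispatcher.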
The handle function of RMStateStore passes the event to the handleStoreEvent method, shown below:
LOG.info("Storing info for app: " + appId);
try {
  if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
    storeApplicationStateInternal(appId, appStateData);
    notifyDoneStoringApplication(appId, storedException);
  } else {
    assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
    updateApplicationStateInternal(appId, appStateData);
    notifyDoneUpdatingApplication(appId, storedException);
  }
} catch (Exception e) {
  LOG.error("Error storing app: " + appId, e);
  notifyStoreOperationFailed(e);
}
Once the store completes, notifyDoneStoringApplication is called:
private void notifyDoneStoringApplication(ApplicationId appId,
    Exception storedException) {
  rmDispatcher.getEventHandler().handle(
      new RMAppNewSavedEvent(appId, storedException));
}
This posts yet another event, an RMAppNewSavedEvent, to the dispatcher.
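The pattern so far, where each handler finishes by posting the next event back to the dispatcher, can be sketched as a small event loop. The queue, the "next event" table, and the string event names are simplifications assumed for illustration; the real handlers do actual work (persisting state, for instance) before emitting the follow-up event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy event loop: handling one event enqueues the follow-up event, if any.
// Events are plain strings here; the chain below mirrors steps 1 to 3 of the text.
class ToyEventChainDemo {
    static List<String> run() {
        // Hypothetical "handler result" table: event handled -> event emitted next.
        Map<String, String> next = new HashMap<>();
        next.put("RMAppEventType.START", "RMStateStoreEventType.STORE_APP");
        next.put("RMStateStoreEventType.STORE_APP", "RMAppEventType.APP_NEW_SAVED");

        Deque<String> queue = new ArrayDeque<>();
        List<String> processed = new ArrayList<>();
        queue.add("RMAppEventType.START");

        while (!queue.isEmpty()) {
            String event = queue.poll();
            processed.add(event);            // stands in for the handler's real work
            String followUp = next.get(event);
            if (followUp != null) {
                queue.add(followUp);         // handler posts the next event back
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```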
3.RMAppEventType.APP_NEW_SAVED
This is handled the same way as step 1: the event is ultimately delivered to RMAppImpl, whose state machine defines:
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
    RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
The hook is AddApplicationToSchedulerTransition, which does the following:
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
    app.submissionContext.getQueue(), app.user));
ResourceManager registers a handler for SchedulerEventType, the event type enum used by AppAddedSchedulerEvent (whose parent class is SchedulerEvent):
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
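The Scheduler processes events on its own dedicated thread, so it does not block the ResourceManager's event-handling thread. Since there are several Scheduler implementations, I only cover the flow of FairScheduler, which is the one we use; the code of the others is largely similar.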
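A minimal sketch of why a dedicated thread keeps the caller from blocking: the caller only enqueues, and a separate worker thread drains the queue. ToySchedulerDispatcher, its thread name, and the stop sentinel are all assumptions for this example and do not reproduce the real scheduler dispatcher.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Toy dispatcher with its own worker thread: handle() only enqueues and returns,
// so a slow scheduler would not stall the thread that posts events.
class ToySchedulerDispatcherDemo {
    static final class ToySchedulerDispatcher {
        private static final String STOP = "__STOP__"; // hypothetical shutdown sentinel
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final List<String> handled = new CopyOnWriteArrayList<>();
        private final Thread worker;

        ToySchedulerDispatcher() {
            worker = new Thread(() -> {
                try {
                    while (true) {
                        String event = queue.take();   // blocks the worker only
                        if (STOP.equals(event)) {
                            return;
                        }
                        handled.add(Thread.currentThread().getName() + " handled " + event);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "toy-scheduler-thread");
        }

        void start() { worker.start(); }

        // Called from the "RM" side: returns immediately after enqueueing.
        void handle(String event) { queue.add(event); }

        // Drains remaining events, stops the worker, and returns the handling log.
        List<String> stopAndGet() {
            queue.add(STOP);
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while stopping", e);
            }
            return handled;
        }
    }

    static List<String> run() {
        ToySchedulerDispatcher dispatcher = new ToySchedulerDispatcher();
        dispatcher.start();
        dispatcher.handle("APP_ADDED");
        dispatcher.handle("APP_ATTEMPT_ADDED");
        return dispatcher.stopAndGet();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```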
4.SchedulerEventType.APP_ADDED
FairScheduler's handle method processes several event types. APP_ADDED is handled as follows:
case APP_ADDED:
  if (!(event instanceof AppAddedSchedulerEvent)) {
    throw new RuntimeException("Unexpected event type: " + event);
  }
  AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
  addApplication(appAddedEvent.getApplicationId(),
      appAddedEvent.getQueue(), appAddedEvent.getUser());
  break;
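addApplication takes the RMApp created at submission time, the submitting user, and the queue name, and places the job in the queue for later scheduling. The call that does this is assignToQueue, which also validates the request. FairScheduler then creates a SchedulerApplication object, which signifies that the scheduler has accepted the job and that it is waiting to be scheduled, and stores it in the Map<ApplicationId, SchedulerApplication> applications structure. Finally, the event is again forwarded through the dispatcher: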
rmContext.getDispatcher().getEventHandler()
    .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
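The addApplication steps (validate the queue, record the application, emit APP_ACCEPTED) can be sketched as below. ToyScheduler, the "known queues" check, and the APP_REJECTED outcome for an unknown queue are assumptions made for this example; the actual validation rules live in assignToQueue and are not shown in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy version of "assign to queue, remember the app, then emit APP_ACCEPTED".
// Queue validation here is a simple membership check, chosen only for illustration.
class ToyAddApplicationDemo {
    static final class ToyScheduler {
        private final Set<String> knownQueues;
        private final Map<String, String> applications = new HashMap<>(); // appId -> queue
        private final List<String> emitted = new ArrayList<>();

        ToyScheduler(String... queues) {
            this.knownQueues = new HashSet<>(Arrays.asList(queues));
        }

        void addApplication(String appId, String queueName, String user) {
            if (!knownQueues.contains(queueName)) {
                // Assumed outcome for an invalid queue in this sketch.
                emitted.add(appId + ":APP_REJECTED");
                return;
            }
            applications.put(appId, queueName);   // accepted, waiting to be scheduled
            emitted.add(appId + ":APP_ACCEPTED"); // forwarded via the dispatcher in YARN
        }

        List<String> getEmitted() { return emitted; }
        int acceptedCount() { return applications.size(); }
    }

    static List<String> run() {
        ToyScheduler scheduler = new ToyScheduler("root.default");
        scheduler.addApplication("app_1", "root.default", "alice");
        scheduler.addApplication("app_2", "root.missing", "bob");
        return scheduler.getEmitted();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```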
5.RMAppEventType.APP_ACCEPTED
APP_ACCEPTED is again handled by RMAppImpl. The handler is:
private static final class StartAppAttemptTransition extends RMAppTransition {
  @Override
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.createAndStartNewAttempt(false);
  };
}
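This step creates an attempt. Each run of an app creates a new attempt, which is kept in the RMApp's in-memory structures. An attempt can fail for a variety of reasons, in which case the app starts a new one; by default, once more than 4 attempts have been made, the app is failed. Finally, the event is sent out through the handler for processing.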
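The retry rule can be sketched as a bounded loop: keep starting attempts until one succeeds or the limit is reached. The method name, the way success is simulated, and the exact boundary of the limit are assumptions for this example, not behaviour taken from RMAppImpl.

```java
// Toy model of "start a new attempt on failure, give up at the limit".
// Success is simulated: the first failuresBeforeSuccess attempts fail.
class ToyAttemptRetryDemo {
    static String runApp(int maxAttempts, int failuresBeforeSuccess) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;                                   // createAndStartNewAttempt
            boolean succeeded = attempts > failuresBeforeSuccess;
            if (succeeded) {
                return "FINISHED after " + attempts + " attempt(s)";
            }
        }
        return "FAILED after " + attempts + " attempt(s)";
    }

    public static void main(String[] args) {
        System.out.println(runApp(4, 2));
        System.out.println(runApp(4, 4));
    }
}
```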
6.RMAppAttemptEventType.START
RMAppAttemptEventType.START is handled by the newly created RMAppAttemptImpl, via this transition:
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
    RMAppAttemptEventType.START, new AttemptStartedTransition())
RMAppAttemptEventType.START moves the state from NEW to SUBMITTED and runs the AttemptStartedTransition hook. The calls worth noting inside AttemptStartedTransition are:
  // Register with the ApplicationMasterService
  appAttempt.masterService
      .registerAppAttempt(appAttempt.applicationAttemptId);
  // Add the applicationAttempt to the scheduler and inform the scheduler
  // whether to transfer the state from previous attempt.
  appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
      appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
}
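In short, it registers the attempt with the ApplicationMasterService and fires a scheduling event so that the attempt can be picked up by the Scheduler.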
7.SchedulerEventType.APP_ATTEMPT_ADDED
FairScheduler handles the APP_ATTEMPT_ADDED event:
SchedulerApplication application =
    applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
FSSchedulerApp attempt =
    new FSSchedulerApp(applicationAttemptId, user,
        queue, new ActiveUsersManager(getRootQueueMetrics()),
        rmContext);
if (transferStateFromPreviousAttempt) {
  attempt.transferStateFromPreviousAttempt(application
      .getCurrentAppAttempt());
}
application.setCurrentAppAttempt(attempt);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
RMApp rmApp = rmContext.getRMApps().get(attempt.getApplicationId());
queue.addApp(attempt, runnable,
    rmApp.getApplicationSubmissionContext().getPriority());
if (runnable) {
  maxRunningEnforcer.trackRunnableApp(attempt);
} else {
  maxRunningEnforcer.trackNonRunnableApp(attempt);
}
queue.getMetrics().submitAppAttempt(user);
ClusterMetrics.getMetrics().addPendingApp(attempt.getApplicationId());
LOG.info("Added Application Attempt " + applicationAttemptId
    + " to scheduler from user: " + user
    + " is isAttemptRecovering : " + isAttemptRecovering);
if (isAttemptRecovering) {
  if (LOG.isDebugEnabled()) {
    LOG.debug(applicationAttemptId
        + " is recovering. Skipping notifying ATTEMPT_ADDED");
  }
} else {
  rmContext.getDispatcher().getEventHandler().handle(
      new RMAppAttemptEvent(applicationAttemptId,
          RMAppAttemptEventType.ATTEMPT_ADDED));
}
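This creates an FSSchedulerApp, which corresponds to the RMAppAttempt created in RMApp. Then, following the fair-scheduling rules, the scheduler checks whether the job may run: if the number of jobs running concurrently in the queue would be exceeded, the job is marked as not currently runnable; otherwise it is runnable. At this point the app is pending and, depending on the state of the queue, it can either be scheduled shortly or has to wait until it becomes runnable. Finally, an RMAppAttemptEventType.ATTEMPT_ADDED event is sent through the dispatcher and handled by RMAppAttempt.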
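The runnable check can be sketched as a per-queue counter compared against a limit. ToyEnforcer is hypothetical and deliberately simpler than the maxRunningEnforcer in the excerpt: it takes only a queue name and a single per-queue limit, and ignores the user argument that canAppBeRunnable also receives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy max-running-apps check: an app is runnable only while the queue is under its limit.
// The single per-queue limit is an assumption made for this sketch.
class ToyMaxRunningDemo {
    static final class ToyEnforcer {
        private final int maxRunningPerQueue;
        private final Map<String, Integer> running = new HashMap<>();
        private final List<String> waiting = new ArrayList<>();

        ToyEnforcer(int maxRunningPerQueue) {
            this.maxRunningPerQueue = maxRunningPerQueue;
        }

        boolean canAppBeRunnable(String queue) {
            return running.getOrDefault(queue, 0) < maxRunningPerQueue;
        }

        void trackRunnableApp(String queue) {
            running.merge(queue, 1, Integer::sum);
        }

        void trackNonRunnableApp(String appId) {
            waiting.add(appId);
        }
    }

    // Submits the apps in order and reports which were runnable at submission time.
    static List<String> submit(int maxRunningPerQueue, String queue, String... appIds) {
        ToyEnforcer enforcer = new ToyEnforcer(maxRunningPerQueue);
        List<String> result = new ArrayList<>();
        for (String appId : appIds) {
            boolean runnable = enforcer.canAppBeRunnable(queue);
            if (runnable) {
                enforcer.trackRunnableApp(queue);
            } else {
                enforcer.trackNonRunnableApp(appId);
            }
            result.add(appId + "=" + (runnable ? "runnable" : "non-runnable"));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(submit(2, "root.q", "a", "b", "c"));
    }
}
```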
8.RMAppAttemptEventType.ATTEMPT_ADDED
RMAppAttemptImpl handles the ATTEMPT_ADDED event by invoking ScheduleTransition, shown below:
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
    RMAppAttemptEvent event) {
  //Build blackList for AM
  Set<NodeId> failNodes = appAttempt.getFailMasterNodes();
  List<String> blackList = new ArrayList<String>(failNodes.size());
  for (NodeId node : failNodes) {
    blackList.add(node.toString());
  }
  if (!appAttempt.submissionContext.getUnmanagedAM()) {
    // Request a container for the AM.
    ResourceRequest request =
        BuilderUtils.newResourceRequest(
            AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
                .getSubmissionContext().getResource(), 1);
    // SchedulerUtils.validateResourceRequests is not necessary because
    // AM resource has been checked when submission
    Allocation amContainerAllocation = appAttempt.scheduler.allocate(
        appAttempt.applicationAttemptId,
        Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST,
        blackList, null);
    if (amContainerAllocation != null
        && amContainerAllocation.getContainers() != null) {
      assert (amContainerAllocation.getContainers().size() == 0);
    }
    return RMAppAttemptState.SCHEDULED;
  } else {
    // save state and then go to LAUNCHED state
    appAttempt.storeAttempt();
    return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
  }
}
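Here a container is requested from the Scheduler to run the appmaster. The container's priority is 0, it may be placed on any machine (ResourceRequest.ANY), the request carries the memory amount specified at submission time, and the number of containers is 1.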
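As a preview: the appmaster's priority is 0, reduce is 10, and map is 20; the smaller the number, the earlier the request is scheduled. Next comes the call to FairScheduler's allocate method. This is the most important method of all, covering both the request for the MRAppMaster container and the MRAppMaster's own resource requests to the scheduler, and it is covered in the next post.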
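The "smaller number first" rule can be illustrated with a plain java.util.PriorityQueue ordered by the priority value. The Request class is hypothetical; only the three priority values (0, 10, 20) come from the text, and the real scheduler's ordering logic is more involved than a single queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy ordering of container requests: lower priority value is served first.
class ToyPriorityDemo {
    static final class Request {
        final String name;
        final int priority;
        Request(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    static List<String> order() {
        PriorityQueue<Request> pending =
            new PriorityQueue<>(Comparator.comparingInt((Request r) -> r.priority));
        pending.add(new Request("map", 20));
        pending.add(new Request("appmaster", 0));
        pending.add(new Request("reduce", 10));

        List<String> served = new ArrayList<>();
        while (!pending.isEmpty()) {
            served.add(pending.poll().name);   // always the smallest priority value
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println(order());
    }
}
```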