YARN MRAppMaster启动流程(一)

从这篇起,开始介绍MRAppmaster与NodeManager交互启动,以及申请资源等内容。

1.接YARN MRAppMaster与Scheduler流程说明,ApplicationMasterLauncher启动一个AMLaucher线程,用于启动MRAppmaster。首先是通过RPC连接,interface是ContainerManagementProtocol,负责ApplicationMaster向NodeManager通信,然后封装所有的运行环境,参数等到context中,随后就是通过RPC向NodeManager发送startContainer命令。

    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);

至此,ResourceManager调度完成,下面的工作交给了NodeManager。

2.ContainerManagerImpl.startContainers
NodeManager是通过ContainerManagerImpl模块来处理ApplicationMasterLauncher的请求的,首先会对ugi,token进行验证,随后就启动container,获得启动的用户,执行的所有命令和conf,随后根据这些信息生成Application对象用来执行。

    Container container =
        new ContainerImpl(getConfig(), this.dispatcher, launchContext,
          credentials, metrics, containerTokenIdentifier);
    ApplicationId applicationID =
        containerId.getApplicationAttemptId().getApplicationId();
 
    Application application =
            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);

随后通过NodeManager的dispatcher发送ApplicationEventType.INIT_APPLICATION事件。

3.ApplicationEventType.INIT_APPLICATION
ApplicationImpl处理ApplicationEventType.INIT_APPLICATION事件,通知logHandler处理

      // Inform the logAggregator
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppStartedEvent(app.appId, app.user,
              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
              app.applicationACLs));

4.LogHandlerAppStartedEvent
我们集群开启了logAggregation,配置方法为在yarn-site.xml配置yarn.log-aggregation-enable为true。随后由LogAggregationService类进行处理,主要是初始化aggregator,代码为

  private void initApp(final ApplicationId appId, String user,
      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
      Map<ApplicationAccessType, String> appAcls) {
    ApplicationEvent eventResponse;
    try {
      verifyAndCreateRemoteLogDir(getConfig());
      initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
    } catch (YarnRuntimeException e) {
      LOG.warn("Application failed to init aggregation", e);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
    }
    this.dispatcher.getEventHandler().handle(eventResponse);
  }

随后向Dispatcher发送ApplicationEventType.APPLICATION_LOG_HANDLING_INITED事件。
5.ApplicationEventType.APPLICATION_LOG_HANDLING_INITED
由ApplicationImpl处理,代码为

  static class AppLogInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      app.dispatcher.getEventHandler().handle(
          new ApplicationLocalizationEvent(
              LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
    }
  }

最后通过dispatcher发送ApplicationLocalizationEvent事件。
6. LocalizationEventType.INIT_APPLICATION_RESOURCES
在ContainerManagerImpl中注册了ApplicationLocalizationEvent的处理方法为ResourceLocalizationService,代码是

dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);

这时候开始了MRAppmaster本地化的过程,所谓本地化就是把执行需要的所有资源,环境从集群中下载到执行MRAppmaster的NodeManager中的相应目录中。关于这部分的内容,我推荐看hortonwokrs的这篇文章来仔细了解。
最后发送ApplicationInitedEvent事件到ApplicationImpl处理。到这一步,application初始化完毕,表示application在这台机器上运行,建立了相应的内存结构,下一步开始了初始化container。
7.ApplicationInitedEvent.APPLICATION_INITED
直接发送ContainerEventType.INIT_CONTAINER到distapcher。
8.ContainerEventType.INIT_CONTAINER
ContainerImpl来处理ContainerEventType事件,开始本地化过程。调用了RequestResourcesTransition函数来处理。这里会对所有的资源,根据PUBLIC,PRIVATE,APPLICATION分别处理放到 Map<LocalResourceVisibility, Collection> req 中,发送LocalizationEventType.INIT_CONTAINER_RESOURCES事件。
9.LocalizationEventType.INIT_CONTAINER_RESOURCES
对于7中不同类型的资源,转发给相应的tracker进行处理。处理事件为ResourceEventType.REQUEST
10.ResourceEventType.REQUEST
LocalizedResource来处理这个事件。开始下载过程。代码为

  private static class FetchResourceTransition extends ResourceTransition {
    @Override
    public void transition(LocalizedResource rsrc, ResourceEvent event) {
      ResourceRequestEvent req = (ResourceRequestEvent) event;
      LocalizerContext ctxt = req.getContext();
      ContainerId container = ctxt.getContainerId();
      rsrc.ref.add(container);
      rsrc.dispatcher.getEventHandler().handle(
          new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt, 
              req.getLocalResourceRequest().getPattern()));
    }
  }

11.LocalizerResourceRequestEvent
由ResourceLocalizationService来处理,PUBLIC的资源由PublicLocalizer下载,PRIVATE和APPLICATION的资源由LocalizerRunner来处理。
PublicLocalizer内部有个queue为pending,做了同步,通过线程不断的拉文件系统的数据,通过FSDowload类来做。代码为

              pending.put(queue.submit(new FSDownload(lfs, null, conf,
                  publicDirDestPath, resource, request.getContext().getStatCache())),
                  request);

PRIVATE和APPLICATIOn则是通过LocalizerRunner线程不断拉取,最后有两种方式,一种是DefaultContainerExecutor一种是LinuxContainerExecutor(cgroup用来做资源隔离)来做,看一下默认的Default

//    ContainerLocalizer localizer =
//        new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
//            RecordFactoryProvider.getRecordFactory(getConf()));
 
    createUserLocalDirs(localDirs, user);
    createUserCacheDirs(localDirs, user);
    createAppDirs(localDirs, user, appId);
    createAppLogDirs(appId, logDirs);
 
 
    //Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
    // randomly choose the local directory
    Path appStorageDir = getWorkingDir(localDirs, user, appId);
 
    String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
    Path tokenDst = new Path(appStorageDir, tokenFn);
    lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
    LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
//    lfs.setWorkingDirectory(appStorageDir);
//    LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
    // TODO: DO it over RPC for maintaining similarity?
    FileContext localizerFc = FileContext.getFileContext(
        lfs.getDefaultFileSystem(), getConf());
    localizerFc.setUMask(lfs.getUMask());
    localizerFc.setWorkingDirectory(appStorageDir);
    LOG.info("Localizer CWD set to " + appStorageDir + " = " 
        + localizerFc.getWorkingDirectory());
    ContainerLocalizer localizer =
        new ContainerLocalizer(localizerFc, user, appId, locId, 
            getPaths(localDirs), RecordFactoryProvider.getRecordFactory(getConf()));
 
    localizer.runLocalization(nmAddr);

重要的是ContainerLocalizer,在runLocalization代码为

     exec = createDownloadThreadPool();
     CompletionService<Path> ecs = createCompletionService(exec);
     localizeFiles(nodeManager, ecs, ugi);

用线程池来下载集群中的资源到本地对应目录,并设置相应的权限。而localizeFiles则会周期性的向ResourceLocalizationService汇报状态,

         try {
            nodemanager.heartbeat(status);
          } catch (YarnException e) { }
          return;
        }
        cs.poll(1000, TimeUnit.MILLISECONDS);

在ResourceLocalizationService对应代码为

  @Override
  public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
    return localizerTracker.processHeartbeat(status);
  }

如果本地化全部完成,在processHeartbeat后会有状态机的改变

     case FETCH_SUCCESS:
            // notify resource
            try {
            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
              .handle(
                new ResourceLocalizedEvent(req, ConverterUtils
                  .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));

至此,资源本地化完成,container等待执行,此时event为ResourceLocalizedEvent。

FSImage Parse For Summary

很多时候我们想要解析一个FSImage文件分析当时的hdfs,但是我们并不想获得所有的inode与block,而只是想知道summary,比如有多少个INodes,有多少个Blocks,多少个Directory,多少个Files。如果使用OfflineImageViewr会占用大量的内存,很多时候甚至是不现实的,所以开发一个简单的summary程序方便查看状态十分必要。


首先看一下FSImage文件的结构,FSImage文件按照Section进行存储,
1.MAGIN_HEADER (“HDFSIMG1″)
2.FILE_VERSION,LAYOUT_VERSION
3.SECTION
3.1 NS_INFO Section
包含了timestamp,lastAllocateBlockId,txid等等。
3.2 INODES Section
固化所有INode节点,INodeDirectory节点,FileUnderconstruction节点,以及Snapshot数据。
3.3 DELEGATION_TOKENS Section
3.4 CACHE_POOLS Section
4.FileSummary
每一个Section固化结束后都想summary写入相应的Section名称,长度及偏移量,方便读取。

所以读取的时候我们首先读取到summary中信息,根据summary中信息跳转到文件的offset部分即可读取,对于我上面所说的需求,我们感兴趣的是NS_INFO Section和INode Section,其它Section我们不读即可。
1.读取到NS_INFO Section我们去获得timestamp,lastBlockId和TransactionId。
2.读取到INdoes Section,我们只需记录文件个数和目录个数即可。
工程在我的github

YARN Container cleanup kill其它进程导致的NodeManager 挂起


一、现象

在我最近的升级过程中,经常发现一些NodeManager无关挂起,并且挂起前没有任何日志,查看dmesg,也没有任何异常。对于这种情况,非常难查原因,经过同事排查,最后确定是由于Yarn Container的cleanup导致的bug。


二、原因及解决方法

这个问题的jira号是YARN-3678,这个问题产生的原因是当container执行结束后会通过状态机执行cleanup的操作,实现的类是ContainersLauncher.java。cleanup的逻辑如下图:
Alt
1. 首先kill SIGTERM pid,让container能够优雅的退出
2. 随后kill SIGKILL pid,直接kill -9
3. 这时候可能会产生一些问题,如果在这250ms之内这个container已经退出,同时这个pid被分配给其它线程使用了,这时候kill掉新启动的线程,如果是同一个用户启动的话就可能kill掉该线程对应的整个进程。
说的极端一点,如果kill的是一个NodeManager新启动的线程,就会造成NodeManager挂起,这就是产生的原因。
但是这个现象产生需要一定的条件,对于Linux Container Executor,如果使用不同的用户去启动,那么即使kill掉这个pid,也不会被杀。对于Default Container Executor,则会出现这一问题。
为什么没有使用不同用户启动container的原因是你需要将所有用户的账号同步的集群中的所有机器中,这对于我们是不现实的。
为此,我们需要修改代码,修改方法也很简单,在kill -9之前ps一下这个进程的pid,查看一下是否是之前执行的containerId,就可以了,具体代码在github

Hadoop classpath问题

    近期使用一个Hadoop周边系统,druid的时候发现一个问题,在druid配置hdfs位置的时候配置如下:

# Deep storage (local filesystem for examples – don’t use this in production)
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://ns****/druid/localStorage

    启动脚本为:

java -Xmx256m -Ddruid.realtime.specFile=examples/wikipedia/wikipedia_realtime_kafka.spec -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime

    大多数开启HA的Hadoop集群,如果使用集群的话基本都是这种配置。

    启动Druid后发现,并没有写入我想要的集群,而写入了另一个集群,反复查找,以为打包过程中混入了错误的hdfs-site.xml文件等等,但是并没有找到。最后在同事的帮助下发现,原来我们曾经配置过一个vip域名,ns****指向了某台hadoop机器的中心机,翻过来看这个问题才想到不是由于打入了错误的hdfs-site.xml文件,而是环境变量没有加入Hadoop的conf文件。

    修改启动脚本为:

java -Xmx256m -Ddruid.realtime.specFile=examples/wikipedia/wikipedia_realtime_kafka.spec -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/realtime:$HADOOP_HOME/etc/hadoop:lib/* io.druid.cli.Main server realtime

    启动后一切正常。经验,Hadoop周边系统一定要加好CLASSPATH。

Yarn shuffle OOM错误分析及解决

      最近集群中一些任务经常在reduce端跑出Shuffle OOM的错误,具体错误如下:

2015-03-09 16:19:13,646 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#14
	at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1550)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:56)
	at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:46)
	at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.(InMemoryMapOutput.java:63)
	at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
	at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
	at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
	at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
	at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

vBomu

      先看一下基本流程,map端进行处理后将结果放在map端local路径中,map端不断心跳汇报给mrappmaster,在适当的阶段(另外可以写一个流程说明),reduce启动,reduce发送心跳给mrappmaster,获取已经结束的maptask对象。之后对已经结束的map进程的数据进行拉取俗称Shuffle,拉取是通过Fetcher线程进行的,随后进行sort。

      有关的几个重要参数

public static final String SHUFFLE_INPUT_BUFFER_PERCENT = “mapreduce.reduce.shuffle.input.buffer.percent”;     默认0.7

public static final String SHUFFLE_MEMORY_LIMIT_PERCENT = “mapreduce.reduce.shuffle.memory.limit.percent”;     默认0.25

public static final String SHUFFLE_MERGE_PERCENT = “mapreduce.reduce.shuffle.merge.percent”;     默认0.66

      个问题是在Fetcher过程中爆出的。首先解释一下参数,第一个参数SHUFFLE_INPUT_BUFFER_PERCENT是指在总的HeapSize中shuffle占得内存百分比我们总的HeapSize是1.5G,那大概Fetcher就是1.0G。 SHUFFLE_MEMORY_LIMIT_PERCENT是指的map copy过来的数据是放内存中还是直接写磁盘。 超过1.5G*0.7*0.25=250M的都放在磁盘中,其它开辟内存空间,放在内存中。 SHUFFLE_MERGE_PERCENT是指merge的百分比,超过这个百分比后停止fetcher,进行merge,merge到磁盘中。   

      跑出OOM后,调了下jvm参数,获取heapdump数据,根据MAT获取以下数据。数据如下:2015-03-10_11-11-212015-03-10_11-11-44

      首先发现整体的内存并没有到1.5G。其次,看了下内存对象分布,byte数组占了很大比例,这也很正常,所有内存中的buffer都是以byte数组形式出现的。在对比一下byte数组大小,大于900M,这就有一个问题了,首先整体HeapSize是1.5G,old区大概是1个G,这时候如果byte数组是900M来一个100M+的拷贝,由于是大内存开辟,不会进入Young区,直接开辟内存空间到Old区,而Old区即使fullgc也没有那么多连续空间,所以分配失败,报OOM错误。这时,只是一个假设,调整Xmn参数,减小Young区内存大小,增大Old区进行测试,成功,印证了想法。

      但是对于我们跑任务调整jvm参数毕竟不现实,那么我们根据经验调整SHUFFLE_INPUT_BUFFER_PERCENT参数就可以了,调整为0.6即可解决问题。

DataNode与NameNode通讯

这是我近期做的关于DataNode与NameNode之间通讯的一个简单培训,这也是在查询DataNode动态上下盘时候发现问题时候顺便总结的。关于动态上下盘的Bug会在稍后写一篇文章分享。

因为被扫端口导致JobTracker挂起问题

       最近一段时间我们的Hbase集群所用的JobTracker(hadoop基线版本1.0.2)天天夜里挂起,停止服务,无法回应任何TaskTracker心跳,进而无法调度任务。以下是心跳汇报报错:

                                                 Jt1

      可以看出最后出错在fairscheduler调度任务阶段,有pool name是null,导致了最后排序报NPE错误。        

      最后查找这个NULL值得来源,通过jira发现社区已经发现了这一个问题,jira地址是https://issues.apache.org/jira/browse/MAPREDUCE-4195。        简单的说一下这个问题,第一步是有人通过脚本或者工具(不是通过页面链接)调用了jobqueue_details.jsp页面,错误发生在下面代码

  String queueName = request.getParameter("queueName");
  TaskScheduler scheduler = tracker.getTaskScheduler();
  Collection jobs = scheduler.getJobs(queueName);

queueName传入的肯定是null,然后调用FairScheduler的getJobs,此时queueName是null。FairScheduler会调用PoolManager的getPool方法: 

  public synchronized Pool getPool(String name) {
    Pool pool = pools.get(name);
    if (pool == null) {
      pool = new Pool(scheduler, name);
      pool.setSchedulingMode(defaultSchedulingMode);
      pools.put(name, pool);
    }
    return pool;
  }

      通过这个方法,就产生了名称是NULL的queueName。此后就跟前文所说,调度器挂起,JT无法调度任何服务。解决方法:参见jira。

      另外,提一下我们产生这个问题的原因,是由于Sina曾经被人攻陷内网,安全组有两台服务器不停的扫内部服务器端口,hadoop bug就这样被触发了。

 

Hadoop trunk maven编译问题

 从Hadoop trunk git库中pull下来代码后,需要进行编译,但是编译过程中报以下错误:

[ERROR] Plugin org.apache.hadoop:hadoop-maven-plugins:3.0.0-SNAPSHOT or one of its dependencies could not be resolved: Could not find artifact org.apache.hadoop:hadoop-maven-plugins:jar:3.0.0-SNAPSHOT -> [Help 1]

       解决方法很简单:

       $ cd hadoop-maven-plugins

       $ mvn install

ThreadLocal造成Hadoop权限混乱问题

一、问题现象

       10月底进行了一次中心机迁移,迁移完成后发现大量文件的权限不正确,导致了用户任务大量出问题。为暂时解决问题,我们对所有的目录增加读和执行权限(a+rx),对所有文件增加了读(a+r)权限,以保证用户能够运行任务。

P1

         如上图所示,用户文件被加入了Acl功能,而管理员并没有为该用户开通任何Acl功能。

P2

        查询Acl后,发现用户加入了一个完全无关的Acl。可以确定是Hadoop的元数据出了问题,导致了权限的混乱。

二、bug 查找

       第二天取消了大部分目录的ACL,同时又设置了一些ACL。在查找问题的过程中发现从新中心机拿出FSImage并重新加载,取消的ACL也有部分设置了新的ACL,可是从新的中心机内存中通过hdfs dfs -ls 查询后权限是完全正常的。初步怀疑是SNN merge Edit和FSImage出问题或者FSImage EditLog写入的时候出问题,最复杂的就是JournalNode内部出问题(最不可能的一种情况)。

       首先查看了一下SNN的merge代码路径,与ANN的savenamespace 路径是完全一致的。写了一个简单的测试程序,从ANN大量随机的建立目录(包含ACL和不含ACL)并记录其EditLog日志,同时从SNN不断的读取journalnode中的Edits,并记录,对比两边Edit日志,最后发现两边的Edit日志是完全一样的,这就排除了是SNN后者Journalnode出的问题。进而去查看记录的EditLog日志,发现在ANN中的Editlog是存在问题的,一些本没有设置ACL的目录被加入了ACL记录,说明这是在ANN log Editlog的时候出现的问题。查看mkdir的代码路径,代码段一直是在sync中的,所以肯定不是由竞争导致的数据不一致,继续查找,在logMkdir部分发现了这段代码,

  public void logMkDir(String path, INode newNode) {

    PermissionStatus permissions = newNode.getPermissionStatus();

    MkdirOp op = MkdirOp.getInstance(cache.get())

      .setInodeId(newNode.getId())

      .setPath(path)

      .setTimestamp(newNode.getModificationTime())

      .setPermissionStatus(permissions);

 

    AclFeature f = newNode.getAclFeature();

    if (f != null) {

      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));

    }

    logEdit(op);

  }

           其中cache是一个ThreadLocal的变量,属于线程私有变量,这就存在了一个非常严重的问题,假设之前这个线程mkdir继承了一个ACL,那么就会走到op.setAclEntries,op是从cache中获得的,这次mkdir没有任何问题。下面这个线程又logMkdir,这次newNode是一个普通的INode,不带ACL属性,那么就不会设置任何AclEntries,但是cache中的MkdirOp是存在AclEntries属性的,就是上次设置的属性,这样logEdit以后这个dir的permission就混乱了,不是自己的属性了。导致了重启后属性出现的问题。

       这个bug我已经反馈给了社区,jira号是https://issues.apache.org/jira/browse/HDFS-7385 

 

三、解决方案

          patch非常简单,对于没有AclEntries的op,设置null即可。对于已经存在问题的NameNode,从ANN saveNamespace,获得一个正确的FSImage,SNN先升级代码,读取正确的Image,启动,切换为ANN,再把原来的ANN升级。