Notes on HftpFileSystem

I was recently debugging a distcp job that kept failing. distcp is the tool for copying data between two clusters; for a remote cluster, or clusters running different versions, we pull the data over hftp, used like this:

hadoop distcp hftp://sourceFS:50070/src hdfs://destFS:8020/dest

The Hftp scheme starts with hftp://, and the implementing class is HftpFileSystem.
So what does an hftp read actually look like? Reading a file, for example, goes through the following flow:
Flow walkthrough
Let's go through the hftp flow step by step:
1. First, Hftp builds the NameNode's HTTP URL. All hftp interaction goes through the HTTP port; the NameNode's default HTTP port is 50070. The code is:

f = f.makeQualified(getUri(), getWorkingDirectory());
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
URL u = getNamenodeURL(path, query);
return new FSDataInputStream(new RangeHeaderInputStream(connectionFactory, u));
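
To make this concrete (the hostname, path and token values below are made up), the URL built by the code above looks roughly like:

    http://sourceFS:50070/data/user/foo/part-00000?ugi=hadoop&delegation=<token>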

At this point no connection to the NameNode has been made yet. The actual connection only happens when openInputStream is called:

  protected InputStream openInputStream() throws IOException {
    // Use the original url if no resolved url exists, eg. if
    // it's the first time a request is made.
    final boolean resolved = resolvedURL.getURL() != null;
    final URLOpener opener = resolved ? resolvedURL : originalURL;

    final HttpURLConnection connection = opener.connect(startPos, resolved);
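
The startPos passed to connect is how hftp expresses a read offset over HTTP: the opener puts it into a Range header on the request. A minimal sketch of that idea (not the exact RangeHeaderInputStream/URLOpener code; variable names are assumed):

    // Sketch: ask the HTTP server to start streaming from startPos
    // by setting a Range header before connecting.
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    if (startPos != 0L) {
      conn.setRequestProperty("Range", "bytes=" + startPos + "-");
    }
    conn.connect();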

2. The NameNode handles all of these HTTP requests in NamenodeWebHdfsMethods. For the OPEN request sent by hftp, the NameNode picks the most suitable DataNode based on block locations, then replies with a redirect whose URL points at that DataNode's HTTP port:

final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();

That completes the client's interaction with the NameNode; the client then talks to the DataNode.
3. Following the redirect URL, the client connects to the DataNode it needs to read from. Note that this step still goes over HTTP; the DataNode's default HTTP port is 50075.

resolvedURL.setURL(getResolvedUrl(connection));

InputStream in = connection.getInputStream();

4. Inside the DataNode, HTTP requests are handled by DatanodeWebHdfsMethods. An OPEN request is handled as follows:

    case OPEN:
    {
      final int b = bufferSize.getValue(conf);
      final DFSClient dfsclient = newDfsClient(nnId, conf);
      HdfsDataInputStream in = null;
      try {
        in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true));
        in.seek(offset.getValue());
      } catch (IOException ioe) {
        IOUtils.cleanup(LOG, in);
        IOUtils.cleanup(LOG, dfsclient);
        throw ioe;
      }

      final long n = length.getValue() != null ?
          Math.min(length.getValue(), in.getVisibleLength() - offset.getValue()) :
          in.getVisibleLength() - offset.getValue();

      /**
       * Allow the Web UI to perform an AJAX request to get the data.
       */
      return Response.ok(new OpenEntity(in, n, dfsclient))
          .type(MediaType.APPLICATION_OCTET_STREAM)
          .header("Access-Control-Allow-Methods", "GET")
          .header("Access-Control-Allow-Origin", "*")
          .build();
    }

Internally, if short-circuit local reads are enabled, the embedded DFSClient reads the data through a local reader (BlockReaderLocal); otherwise it reads over a regular socket connection (the configuration switches involved are sketched after the flow summary below).
5, 6. The input stream is returned to the client for reading the data. That completes an hftp read.
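
Whether that embedded DFSClient can actually use BlockReaderLocal depends on the usual HDFS short-circuit read settings. A small illustration of the switches involved (the keys are the standard ones; this is only a sketch, not code from HftpFileSystem):

    import org.apache.hadoop.conf.Configuration;

    public class ShortCircuitCheck {
      public static void main(String[] args) {
        // Read the short-circuit switches from the effective configuration.
        Configuration conf = new Configuration();
        boolean shortCircuit = conf.getBoolean("dfs.client.read.shortcircuit", false);
        String domainSocketPath = conf.get("dfs.domain.socket.path", "");
        System.out.println("short-circuit read enabled: " + shortCircuit);
        System.out.println("domain socket path: " + domainSocketPath);
      }
    }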

The problem we hit

While running a distcp with hftp on one end, we found every task failing to read the same file. The machine holding that file was not in a healthy state: the network connections showed a pile of CLOSE_WAIT on port 50075, and the datanode process was extremely slow for both reads and writes, so its network service was clearly abnormal at the time. The catch is that the NameNode's HTTP interface returns only a single DataNode address; unlike the RPC interface, it does not return a list of DataNodes from which the client could read another replica when one fails. As a result, this one bad DataNode made the reads fail over and over. The workaround at the time was to restart that datanode process so the reads would land on other DataNodes.
As for the deeper cause, it is tied to the state that DN was in at the time. Since we did not keep the full stack traces, we will have to continue the investigation the next time it happens, but the basic theory is that the DN's socket server was responding slowly because of some internal logic.

Running DistCpV1 on Hadoop 2

Just for the record: in some situations you need DistCpV1. The command is

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-extras-2.4.0.jar org.apache.hadoop.tools.DistCpV1 $source $dest

Hadoop Web Page Information

Sometimes we want to confirm which branch a Hadoop build was compiled from. Normally we can find this information on the Hadoop web pages, for example the NameNode web page:


If we want to change this information ourselves, we should modify this file in the source tree:
$HADOOP_HOME/hadoop-common-project/hadoop-common/src/main/resources/common-version-info.properties

version=${pom.version}
revision=${version-info.scm.commit}
branch=${version-info.scm.branch}
user=${user.name}
date=${version-info.build.time}
url=${version-info.scm.uri}
srcChecksum=${version-info.source.md5}
protocVersion=${protobuf.version}

Just modify the values and the NameNode web page will change accordingly. For example, we can change the branch line to branch=tag-20160817 as in the image above.
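
These properties are what org.apache.hadoop.util.VersionInfo reads at runtime; the web page simply displays them. A quick way to double-check a build (illustrative snippet):

    import org.apache.hadoop.util.VersionInfo;

    public class PrintVersion {
      public static void main(String[] args) {
        // Prints the same version/branch/revision strings shown on the NameNode web page.
        System.out.println("version  = " + VersionInfo.getVersion());
        System.out.println("branch   = " + VersionInfo.getBranch());
        System.out.println("revision = " + VersionInfo.getRevision());
      }
    }

The hadoop version command prints the same fields.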

Hadoop Balancer Issues

I recently looked at how the cluster Balancer was running. The cluster is very large and has some long-running jobs that never stop, so disk imbalance is a serious problem and rebalancing data quickly is essential.
From the logs, however, our Balancer frequently hangs. The previous workaround was a cron script that detects the hang and automatically restarts the Balancer after it has been stuck for a while.
After going through the logs and the code these past couple of days, I found the hang is caused by an NPE. The relevant code:

             // update locations
             for (String datanodeUuid : blk.getDatanodeUuids()) {
               final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (datanode != null) { // not an unknown datanode
+              if (d != null) { // not an unknown datanode
                 block.addLocation(d);
               }
             }

The wrong object was being null-checked; changing datanode to d fixes it. With the original code, the copy tasks submitted by the Balancer throw an NPE.

    for (Source source : sources) {
      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
    }

    // wait for all dispatcher threads to finish
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        LOG.warn("Dispatcher thread failed", e.getCause());
      }
    }

In this part, the submitted BlockMoveDispatcher task throws the NPE, and because future.get() has no timeout, the Balancer hangs forever.
Fix: besides fixing the NPE itself, we can also give future.get() a timeout so the Balancer can never hang indefinitely.
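
A minimal sketch of that idea against the loop above (imports from java.util.concurrent assumed; the 30-minute timeout is an example value, not what the Balancer ships with):

    // Bound the wait on each dispatcher future so one stuck
    // BlockMoveDispatcher cannot hang the whole Balancer iteration.
    for (Future<?> future : futures) {
      try {
        future.get(30, TimeUnit.MINUTES);
      } catch (TimeoutException e) {
        LOG.warn("Dispatcher thread timed out, cancelling it", e);
        future.cancel(true);
      } catch (ExecutionException e) {
        LOG.warn("Dispatcher thread failed", e.getCause());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }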

ResourceManager Dispatcher Slow Because of Synchronized RMStateStore Methods

Recently I noticed that the async dispatcher in our ResourceManager backs up from time to time.
Here is some of the log:

2016-05-24 00:46:20,398 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 24000
2016-05-24 00:46:21,008 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 25000
2016-05-24 00:46:21,632 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 26000
2016-05-24 00:46:22,251 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 27000
2016-05-24 00:46:22,873 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 28000
2016-05-24 00:46:23,501 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 29000
2016-05-24 00:46:24,109 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 30000  

Normally the async dispatcher in the ResourceManager handles events quickly enough, but the log shows serious queuing.
So we investigated the problem and took a jstack of the ResourceManager process while it was backed up. Here is the jstack output:

"AsyncDispatcher event handler" prio=10 tid=0x00007f4d6db10000 nid=0x5bca waiting for monitor entry [0x00007f4d3aa8c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.storeNewApplication(RMStateStore.java:375)
        - waiting to lock <0x00000003bae88af0> (a org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$RMAppNewlySavingTransition.transition(RMAppImpl.java:881)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$RMAppNewlySavingTransition.transition(RMAppImpl.java:872)
        at org.apache.hadoop.yarn.state.StateMachineFactory$SingleInternalArc.doTransition(StateMachineFactory.java:362)
        at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
        at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46)
        at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448)
        - locked <0x0000000394cbae40> (a org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.handle(RMAppImpl.java:645)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.handle(RMAppImpl.java:82)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationEventDispatcher.handle(ResourceManager.java:690)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationEventDispatcher.handle(ResourceManager.java:674)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:662)

"AsyncDispatcher event handler" daemon prio=10 tid=0x00007f4d6d8f6000 nid=0x5c32 in Object.wait() [0x00007f4d3a183000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2031)
        - locked <0x000000032bc7bd58> (a java.util.LinkedList)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2015)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2113)
        - locked <0x000000032bc7ba80> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:70)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:103)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore.writeFile(FileSystemRMStateStore.java:528)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore.storeApplicationStateInternal(FileSystemRMStateStore.java:329)
        - locked <0x00000003bae88af0> (a org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.handleStoreEvent(RMStateStore.java:625)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:770)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:765)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:662)

It seems the async dispatcher in the ResourceManager is blocked by the storeNewApplication method in RMStateStore.
From the code we know there are two async dispatchers in the ResourceManager process. One is the main dispatcher for the whole ResourceManager, handling application submission, scheduling and other work. The other is the dispatcher inside the RMStateStore (the role of the RMStateStore is explained in this blog). Because the RMStateStore persists to HDFS or ZooKeeper, which is slow, it has its own dispatcher precisely so that it does not stall the ResourceManager's main dispatcher.
Unfortunately, we use HDFS as the backing store for our RMStateStore. Deep inside the code:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

The main dispatcher handles the event that stores the new application's context information; the method only forwards the event to the RMStateStore dispatcher and returns immediately. But the method is synchronized. And in the RMStateStore subclass FileSystemRMStateStore, the code that stores the application state to HDFS is as follows:

  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String appIdStr = appId.toString();
    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
    fs.mkdirs(appDirPath);
    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);

    LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      writeFile(nodeCreatePath, appStateData);
    } catch (Exception e) {
      LOG.info("Error storing info for app: " + appId, e);
      throw e;
    }
  }

This method is synchronized as well. So when the RMStateStore dispatcher holds the lock while a slow HDFS write is in progress, the main dispatcher easily ends up blocked waiting for that same lock. The lock does nothing useful for the main dispatcher's path, so we can simply get rid of it.
There is a related JIRA for this problem: YARN-4398.
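
A minimal sketch of the idea (not the exact YARN-4398 patch): storeNewApplication only packages an event and hands it to the state-store dispatcher, so the synchronized keyword can be dropped to avoid contending on the lock held by the slow storeApplicationStateInternal:

  // Sketch: no RMStateStore lock needed, the method just forwards an event
  // to the state-store's own dispatcher and returns.
  public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }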

Some Notes About Deploying Hadoop Clusters

1. Download the source code and use Maven to compile and package it; remember to build the native libraries on the same OS version as the target machines.

mvn package -Pdist -Pnative -Dtar -DskipTests

2. Edit the core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml files.
You should also add the JVM parameters and log locations to /etc/bashrc; an example is below:

export JAVA_HOME=/usr/local/jdk1.7.0_67
export JRE_HOME=/usr/local/jdk1.7.0_67/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export HADOOP_HOME=/usr/local/hadoop-2.4.0
export HADOOP_LOG_DIR=/data0/hadoop/log/hadoop
export HADOOP_PID_DIR=/data0/hadoop/pid/hadoop
export YARN_LOG_DIR=/data0/hadoop/log/yarn
export YARN_PID_DIR=/data0/hadoop/pid/yarn
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export HADOOP_NAMENODE_OPTS=" -Xmx20480m -Xms20480m -Xmn3072m -verbose:gc -Xloggc:/data0/hadoop/gclog/namenode.gc.log -XX:ErrorFile=/data0/hadoop/gclog/hs_err_pid.log -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=85 -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+DisableExplicitGC -Dcom.sun.management.jmxremote.port=6000 -Dcom.sun.management.jmxremote.ssl=false  -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=/usr/local/hadoop-2.4.0/etc/hadoop/jmxremote.password"
export YARN_RESOURCEMANAGER_OPTS=" -Xmx10240m -Xms10240m -Xmn3072m -verbose:gc -Xloggc:/data0/hadoop/gclog/yarn.gc.log -XX:ErrorFile=/data0/hadoop/gclog/hs_err_pid.log -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+DisableExplicitGC -Dcom.sun.management.jmxremote.port=6001 -Dcom.sun.management.jmxremote.ssl=false  -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=/usr/local/hadoop-2.4.0/etc/hadoop/jmxremote.password"
ulimit -u 65535

3. Push the packaged build to all the servers, including namenodes, resourcemanagers, nodemanagers and datanodes.
4. Start the journalnodes first (this assumes you use QJM):

hadoop-daemon.sh start journalnode

5. Format the namenode for each namespace:

hdfs namenode -format
If you use federation, make sure the cluster IDs are the same. For example, if the first namespace's cluster ID is abcdefg, format the second namespace with the same cluster ID:

hdfs namenode -format -clusterId abcdefg

6. Initialize the standby namenode for the same namespace:

hdfs namenode -bootstrapStandby

7. Start the namenodes and datanodes:

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

8. Transition a namenode to active.

For example, if the namespace is ns and the active namenode is nn1:
 hdfs haadmin -ns ns -transitionToActive nn1

9. Create HDFS home directories for the hadoop user and the mapred user (the user that starts the resourcemanager and the history server).
10. Create the directories for the history server.
For example, if mapred-site.xml sets the history directories as follows:

<property>
  <name>mapreduce.jobhistory.intermediate-done-dir</name>
  <value>hdfs://hbasens/hadoop/history/tmp</value>
  <final>true</final>
</property>
<property>
  <name>mapreduce.jobhistory.done-dir</name>
  <value>hdfs://hbasens/hadoop/history/done</value>
  <final>true</final>
</property>

then you have to create the directories like this:

hdfs dfs -mkdir -p /hadoop/history/tmp
hdfs dfs -chown -R mapred:mapred /hadoop/history
hdfs dfs -chmod -R 1777 /hadoop/history/tmp
hdfs dfs -mkdir -p /hadoop/history/done
hdfs dfs -chmod -R 1777 /hadoop/history/done

11. Start the resourcemanagers, nodemanagers and the MR JobHistory server:

yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
mr-jobhistory-daemon.sh start historyserver

YARN-4493 ResourceManager moveQueue bug

When moving a running application to a different queue in the ResourceManager, the current implementation does not check whether the app can run in the new queue before removing it from the current queue. So if the destination queue is full, the move throws an exception and the app no longer belongs to any queue.
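For context, this code path is reached when an application is moved between queues, for example with the CLI (the application id and queue name below are made up):

    yarn application -movetoqueue application_1435099124107_0001 -queue targetQueue
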
Here is the code

   private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
       FSLeafQueue oldQueue, FSLeafQueue newQueue) {
     boolean wasRunnable = oldQueue.removeApp(attempt);
     // if app was not runnable before, it may be runnable now
     boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
         attempt.getUser());
     if (wasRunnable && !nowRunnable) {
       throw new IllegalStateException("Should have already verified that app "
           + attempt.getApplicationId() + " would be runnable in new queue");

After that, the application is orphaned and can no longer be scheduled any resources. If you then kill the app, the removeApp method in FSLeafQueue throws an IllegalStateException of "Given app to remove app does not exist in queue ...".
So I think we should check whether the destination queue can run the app before removing it from the current queue.
The patch is below:

   private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
       FSLeafQueue oldQueue, FSLeafQueue newQueue) {
-    boolean wasRunnable = oldQueue.removeApp(attempt);
     // if app was not runnable before, it may be runnable now
     boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
         attempt.getUser());
+    boolean wasRunnable = false;
+    if (nowRunnable) {
+        wasRunnable = oldQueue.removeApp(attempt);
+    }
     if (wasRunnable && !nowRunnable) {
       throw new IllegalStateException("Should have already verified that app "
           + attempt.getApplicationId() + " would be runnable in new queue");

DataXceiver Local-Read Exception Bug (HDFS-11802)

Symptoms

A user reported that, while reading a file, all three DNs failed to serve the file's block. An fsck check showed no missing blocks for the file. Looking at the logs on the corresponding DNs, all three machines were already in an unreadable state, reporting:

2015-11-25 00:01:55,999 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: 10.39.5.160:50010:DataXceiverServer:
java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096
    at org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
    at java.lang.Thread.run(Thread.java:745)

Clearly the DataNode had exceeded the configured maximum of 4096 dataxceiver threads (the dfs.datanode.max.transfer.threads limit). Under normal conditions this should never happen, so the DN must have a thread-leak bug or some other problem.
Checking the logs further turned up the following:

Exception in thread "Thread-19" java.lang.IllegalStateException: failed to remove c53ce04928d1baa854f5dc1bfc8d565b
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.removeShm(ShortCircuitRegistry.java:115)
    at org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry$RegisteredShm.handle(ShortCircuitRegistry.java:102)
    at org.apache.hadoop.net.unix.DomainSocketWatcher.sendCallback(DomainSocketWatcher.java:371)
    at org.apache.hadoop.net.unix.DomainSocketWatcher.access$1000(DomainSocketWatcher.java:52)
    at org.apache.hadoop.net.unix.DomainSocketWatcher$1.run(DomainSocketWatcher.java:511)
    at java.lang.Thread.run(Thread.java:745)

This log roughly explains the cause: the DomainSocketWatcher thread exited abnormally, so the local-read paths were left with no cleanup mechanism, and they gradually used up all the dataxceiver slots.

Root cause

The DomainSocketWatcher thread takes care of handling and cleaning up the sockets created for short-circuit local reads. The problematic code is:

      try {
        while (true) {
              // ... poll the registered sockets and run their cleanup callbacks ...
        }
      } catch (InterruptedException e) {
        LOG.info(toString() + " terminating on InterruptedException");
      } catch (IOException e) {
        LOG.error(toString() + " terminating on IOException", e);
      } finally {
        lock.lock();
        try {
          kick(); // allow the handler for notificationSockets[0] to read a byte
          Iterator<Entry> iter = entries.values().iterator();
          while(iter.hasNext()) {
            sendCallback("close", iter, fdSet);
          }
          entries.clear();
          fdSet.close();
        } finally {
          lock.unlock();
        }
      }

Under normal conditions the code never reaches the finally block; it just keeps looping in the while. The exception reported above is a runtime exception thrown because removing a shared-memory object failed.
After searching JIRA and going through the logs on our machines, the cause turned out to be this: when a client asks the DataNode for a short-circuit read, the DataNode hits an error while creating the shared-memory segment and the file descriptors, so the allocation fails. The log:

2015-11-06 04:52:41,080 INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: cliID: DFSClient_attempt_1435099124107_5925361_m_000028_0_1777694543_1, src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_SHM, shmId: n/a, srvID: 01f352c6-4e63-4158-8ead-3e8146103b6f, success: false

And in DataXceiver's requestShortCircuitShm code, the connection is closed when that happens:

      if ((!success) && (peer == null)) {
        // If we failed to pass the shared memory segment to the client,
        // close the UNIX domain socket now.  This will trigger the 
        // DomainSocketWatcher callback, cleaning up the segment.
        IOUtils.cleanup(null, sock);
      }

However, all close handling is supposed to go through the DomainSocketWatcher callback. So when the DomainSocketWatcher later closes the socket again, the shared-memory object has already been released by the earlier close, and the removal raises a runtime error. The DomainSocketWatcher thread then exits abnormally, local reads are left without their cleanup thread, the slots slowly fill up, and in the end the DataNode becomes unusable.

Fix

Make DataXceiver stop closing the connection itself; it only shuts down its side of the connection to the client, so the client can notice the read failure quickly. Also add more catch blocks so that if this happens again we can find the cause.
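
A minimal sketch of that idea against the snippet above (it assumes DomainSocket exposes a shutdown() call; this is not necessarily the exact upstream patch):

      if ((!success) && (peer == null)) {
        // Sketch: only shut the socket down so the client fails fast, and
        // leave the single close/cleanup to the DomainSocketWatcher callback,
        // avoiding the double release of the shared-memory segment.
        try {
          sock.shutdown();
        } catch (IOException e) {
          LOG.warn("Failed to shut down domain socket after shm allocation failure", e);
        }
      }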