ThreadLocal造成Hadoop权限混乱问题

一、问题现象

       10月底进行了一次中心机迁移,迁移完成后发现大量文件的权限不正确,导致了用户任务大量出问题。为暂时解决问题,我们对所有的目录增加读和执行权限(a+rx),对所有文件增加了读(a+r)权限,以保证用户能够运行任务。

P1

         如上图所示,用户文件被加入了Acl功能,而管理员并没有为该用户开通任何Acl功能。

P2

        查询Acl后,发现用户加入了一个完全无关的Acl。可以确定是Hadoop的元数据出了问题,导致了权限的混乱。

二、bug 查找

       第二天取消了大部分目录的ACL,同时又设置了一些ACL。在查找问题的过程中发现从新中心机拿出FSImage并重新加载,取消的ACL也有部分设置了新的ACL,可是从新的中心机内存中通过hdfs dfs -ls 查询后权限是完全正常的。初步怀疑是SNN merge Edit和FSImage出问题或者FSImage EditLog写入的时候出问题,最复杂的就是JournalNode内部出问题(最不可能的一种情况)。

       首先查看了一下SNN的merge代码路径,与ANN的savenamespace 路径是完全一致的。写了一个简单的测试程序,从ANN大量随机的建立目录(包含ACL和不含ACL)并记录其EditLog日志,同时从SNN不断的读取journalnode中的Edits,并记录,对比两边Edit日志,最后发现两边的Edit日志是完全一样的,这就排除了是SNN后者Journalnode出的问题。进而去查看记录的EditLog日志,发现在ANN中的Editlog是存在问题的,一些本没有设置ACL的目录被加入了ACL记录,说明这是在ANN log Editlog的时候出现的问题。查看mkdir的代码路径,代码段一直是在sync中的,所以肯定不是由竞争导致的数据不一致,继续查找,在logMkdir部分发现了这段代码,

  public void logMkDir(String path, INode newNode) {

    PermissionStatus permissions = newNode.getPermissionStatus();

    MkdirOp op = MkdirOp.getInstance(cache.get())

      .setInodeId(newNode.getId())

      .setPath(path)

      .setTimestamp(newNode.getModificationTime())

      .setPermissionStatus(permissions);

 

    AclFeature f = newNode.getAclFeature();

    if (f != null) {

      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));

    }

    logEdit(op);

  }

           其中cache是一个ThreadLocal的变量,属于线程私有变量,这就存在了一个非常严重的问题,假设之前这个线程mkdir继承了一个ACL,那么就会走到op.setAclEntries,op是从cache中获得的,这次mkdir没有任何问题。下面这个线程又logMkdir,这次newNode是一个普通的INode,不带ACL属性,那么就不会设置任何AclEntries,但是cache中的MkdirOp是存在AclEntries属性的,就是上次设置的属性,这样logEdit以后这个dir的permission就混乱了,不是自己的属性了。导致了重启后属性出现的问题。

       这个bug我已经反馈给了社区,jira号是https://issues.apache.org/jira/browse/HDFS-7385 

 

三、解决方案

          patch非常简单,对于没有AclEntries的op,设置null即可。对于已经存在问题的NameNode,从ANN saveNamespace,获得一个正确的FSImage,SNN先升级代码,读取正确的Image,启动,切换为ANN,再把原来的ANN升级。

MapReduce在Hadoop1.x流程 —— JobClient提交过程

      MapReduce从提交任务到执行的整理流程大概如下图所示: client1 简单的说MapReduce提交任务有大概几个过程:

      1、首先JobClient首先上传必要的文件,如执行的jar包,split信息,DistributedCache等,随后将任务提交给JobTracker

      2、JobTracker接受任务后,在后台不加锁实例化任务,并等待TaskTracker心跳接受任务

      3、TaskTracker根据JobTracker得调度器获得需要执行的任务

      4、TaskTracker本地化任务并执行MapReduce任务

      下面详细的看一下提交过程: 比如最简单的wordcount,客户端代码是:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

      提交的代码是job.waitForCompletion(true),实际上底层调用的是JobClient的submitJobInternal,步骤如下:

      1、得到stagingDirectory暂存目录,将用户jar包,libjar,distributed cache等拷入到该目录下

      2、将split信息写入到staging目录只能中,split是根据InputFormat来进行切分的

 private 
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<!--?,?--> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
 
    List splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

3、将所有conf写入到xml文件中,将任务提交给JobTracker

jobCopy.writeXml(out);

Java ThreadPool

      今天需要在Map Reduce中实现一个线程池,原因是对方提供的API接口,需要对每行日志通过HTTP请求获取数据,如果单线程去做耗时占资源,所以想法是通过线程池,在每次setup时候启动线程池,在map的每一行提交任务,如果线程池满了就Block住,最后cleanup时候销毁线程池就好了。

      于是采用了较为简单的Doug Lea的Executors框架,这套框架已经对大部分并发操作进行了封装改写,方便用户使用。在setup时候代码为:

 

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      int threadsNumber = context.getConfiguration().getInt("Thread.Number", 100);
      ExecutorService pool = Executors.newFixedThreadPool(threadsNumber);
    }

      在map方法内,每一行通过HTTP获取相应的数据:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      map reduce 任务提交后,发现无论怎么修改初始线程数,很快程序就报GC异常了,无法跑完。经过提醒后发现原来是Executors的问题,Executors是Doug Lea为了方便用户,加入了许多封装,出问题的就在这段代码:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

      这是我调用的方法,虽然最多启动nThreads线程,但是底层调用的ThreadPoolExecutor它的队列是LinkedBlockingQueue,是个无界队列,这就是解释了为什么很容易发生GC的问题了,每一行都new 一个Runnable放到这个Queue中,很快内存就撑爆了。找到问题后就好解决了,不使用Executors,而是直接使用ThreadPoolExecutor,根据参数加入一个有界的BlockingQueue,代码如下:

   @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      conf = context.getConfiguration();
      final Path out = FileOutputFormat.getOutputPath(context);
      final String outPath = ((FileSplit)context.getInputSplit()).getPath().getName() +
          ((FileSplit)context.getInputSplit()).getStart();
      threadPoolNumber = conf.getInt("ThreadPoolNumber", 500);
      pool = new ThreadPoolExecutor(threadPoolNumber, threadPoolNumber, 0, TimeUnit.SECONDS,
          new ArrayBlockingQueue(threadPoolNumber));
      semaphore = new Semaphore(threadPoolNumber);
      }

      需要注意的是如果提交任务过快,ThreadPoolExecutor会根据策略选择处理方法,默认的是抛出RejectedExecutionException,也可以直接将超出的抛弃或者最老的抛弃,或者自己去写RejectedExecutionHandler。我觉得都不是很方便,就采用的是Semaphore,这样在Map端代码就是

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      semaphore.acquire();
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      首先获取信号量,如果没获取到就一直等待。当线程任务完成时,不要忘记在finally中release Semaphore。

      @Override
      public void run() {
        long start = System.currentTimeMillis();
        String result = DataParse.parse(message);
        if(result != null &amp;&amp; result != "") {
          try {
            queue.put(result);
          } catch (InterruptedException e) {
          } finally {
            semaphore.release();
          }
          httpRate.addAndGet(System.currentTimeMillis() - start);
        }
      }

Hadoop问题:Premature EOF: no length prefix available

      最近Hadoop 2.4集群发现一个奇怪的问题,问题的表现是一个hive任务报错,报无法找到文件,我到集群上cat相关文件也无法找到三个副本中的任何一个,但是通过fsck找到相关block的位置后,到机器上却能够找到这个block,并且block没有任何损坏。

      报错的错误日志是:

      add to deadNodes and continue. java.io.EOFException: Premature EOF: no length prefix available

      找到相关代码,发现时PB发送请求后没有返回数据。突然想到这个可能是DataNode没有响应DFSClient,第一是有可能DataNode的机器打开文件数过少,这个首先排除了,我们集群安装都是没有问题的,打开文件数都是65535。第二是DataNode自身的设置,查看后发现没有设置打开文件数,默认的设置是4096,应该能够满足要求。最后我对DataNode进行了jstack,后来发现有大量的Short circuit 本地读线程hang住,总共4096个,导致了这台机器无法响应其它的请求。当三台具有相同副本的机器都hang住后,就会出现这个问题。

      找到hadoop out的日志之后,发现了这个异常:

Exception in thread “Thread-19″ java.util.ConcurrentModificationException

        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1100)

        at java.util.TreeMap$ValueIterator.next(TreeMap.java:1145)

        at org.apache.hadoop.net.unix.DomainSocketWatcher$1.run(DomainSocketWatcher.java:465)

        at java.lang.Thread.run(Thread.java:662)

      查询后发现,原来是DomainSocketWatcher这个类内部的线程在finally时候没有将treemap加锁导致了ConcurrentModificationException。

      查询jira,发现社区已经找到了这个问题,并提供了patch,地址是:

https://issues.apache.org/jira/browse/HADOOP-10404

      这个问题还真是比较无语。

HDFS BlockManager chooseExcessReplicates 问题

      近日在做HDFS RAID时发现一个现象,RAID结束后需要调整Block位置,设置副本数为2,使Block尽可能的分散,但是从Raid界面发现,经过很长时间后,Block位置仍然无法满足分散到不同机器上的要求。查看NameNode日志后发现

NewImage

      Block从某个节点copy到另一个节点后,从第二个节点汇报给NameNode,并请求NameNode删除原始节点,但是NameNode并没与删除原始节点,而是直接要求汇报的节点删除该Block。

      通过DEBUG发现,问题出自BlockManager的chooseExcessReplicates方法。

NewImage

      我测试集群只有17台机器,分布在两个机架上,源节点在一个机架上,另一个副本在第二个机架上,拷贝到的也是在第二个机架上。那么上面的判断:moreThanOne指的是大于等于两个副本的机架机器,exactlyOne指的是只有一个副本的机架机器。那么moreThanOne.contains(delNodeHint)肯定是false,因为源节点属于只有一个副本的机架机器。||后面的语句addedNode!=null为true,!moreThanOne.contains(addedNode)为false,由于新添加的节点机架之前已经有一个副本了,所以为非。最后if语句结果为false,这样你即使提供了delNodeHint,指定了一个需要删除的机器,但是根据放置策略,依然不能删除,毕竟Hint只是个提示。

     当然,我的测试集群较小,较大的集群这种情况不太可能发生,另外raid主节点定期去copy,在显示集群中除了Balancer,这种情况也很少发生。

Hadoop NameNode端口问题

      今天被前同事问起NameNode都启动了哪些端口,分别是IPC server, Http Server, jmx Server,但是netstat -anlp发现不止有三个,是五个。经过我同事分析,多出的两个是

启动java 的profile 参数(jvisualvm的参数)

-Dcom.sun.management.jmxremote=true
-Djava.rmi.server.hostname=ip
-Dcom.sun.management.jmxremote.port=9986
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false

注意:这里这一组参数会启动3个端口,分别是因为 com.sun.management.jmxremote=true 启动一个随机
com.sun.management.jmxremote.port=9986 启动一个9986端口以及再启动一个额外的随机端口

正是因为hadoop-env.sh脚本中带有-Dcom.sun.management.jmxremote 属性,才导致hadoop本身又多起了一个端口

Hadoop Pipeline详解

一、说明
Hadoop 2.x相比较于1.x有了较大的改变,像MapReduce层面架构以及代码基本上是完全重写的,在HDFS层面加入了HA,Federation等特性,代码更加层次化和易读,同时加入的PB初期可能给阅读带来障碍,熟悉之后就没有太大问题了。
Pipeline一只是Hadoop重要的一个过程,不管是MR任务,Hive任务等等,最后都需要Pipeline将数据写入到HDFS中。所以熟悉Hadoop Pipeline过程还是很有意义的。

二、流程说明
Hadoop pipeline的建立可以由以下的流程图来说明,Pipeline建立牵扯的有NameNode,DataNode,DFSClient等结构。
pipe1
以下为简要说明
1、首先DFSClient向NameNode发送RPC Create请求,NameNode在INode中生成新的INodeFile并加入到新的LeaseManager中作为稍后写入的租约管理。
2、请求成功返回后,DFSClient生成一个OutputStream,既DFSOutputStream
3、行程pipeline之前向NameNode发送AddBlock请求,NameNode生成新的Block,选择要写入的DataNode节点,并将Block注册到INodeFile中,以及BlockManager中,准备写入
4、建立pipeline,根据NameNode返回的信息,找到一个primary Datanode作为第一个节点,准备写入
5、DataNode Socket接收后,首先将数据写入buffer中,然后写入到下一个节点,写入成功后将buffer中数据写入本地磁盘,并等待ACK信息
6、与5一样,写入下一个节点然后写入本地,最后等待ACK信息
7、如果ACK都成功返回后,发回给DFSClient,本次写入成功。

三、详细说明
Continue reading…

Hadoop FastCopy

一、背景

      Hadoop目前使用Distcp作为集群间的数据传输工具,但是Hadoop2.0 可以采用federation,每个ns公用存储节点,这样还采用Distcp作为集群间数据传输将会非常浪费,直接的想法就是能够通过硬链接将federation中共用存储的block也进行共享,就会大大的节省带宽,IO,网络以及时间。

 

 

 

二、流程说明

 



Fastcopy1

      整体流程如图,以下为简要说明:

1、Client首先向Source NS1获取需要拷贝的文件的block location

2、对每一个block,根据Source得Location向Dest NS2申请block,并尽量申请到同一个DataNode节点

3、向DataNode发送File copy的RPC请求

4、DataNode获得请求后查看源DataNode和目的DataNode是否是同一台机器上的同一个实例的DataNode,如果是通过hard link完成copy

5、如果不是,退化为常规的机器间拷贝