ThreadLocal造成Hadoop权限混乱问题

一、问题现象

       10月底进行了一次中心机迁移,迁移完成后发现大量文件的权限不正确,导致了用户任务大量出问题。为暂时解决问题,我们对所有的目录增加读和执行权限(a+rx),对所有文件增加了读(a+r)权限,以保证用户能够运行任务。

P1

         如上图所示,用户文件被加入了Acl功能,而管理员并没有为该用户开通任何Acl功能。

P2

        查询Acl后,发现用户加入了一个完全无关的Acl。可以确定是Hadoop的元数据出了问题,导致了权限的混乱。

二、bug 查找

       第二天取消了大部分目录的ACL,同时又设置了一些ACL。在查找问题的过程中发现从新中心机拿出FSImage并重新加载,取消的ACL也有部分设置了新的ACL,可是从新的中心机内存中通过hdfs dfs -ls 查询后权限是完全正常的。初步怀疑是SNN merge Edit和FSImage出问题或者FSImage EditLog写入的时候出问题,最复杂的就是JournalNode内部出问题(最不可能的一种情况)。

       首先查看了一下SNN的merge代码路径,与ANN的savenamespace 路径是完全一致的。写了一个简单的测试程序,从ANN大量随机的建立目录(包含ACL和不含ACL)并记录其EditLog日志,同时从SNN不断的读取journalnode中的Edits,并记录,对比两边Edit日志,最后发现两边的Edit日志是完全一样的,这就排除了是SNN后者Journalnode出的问题。进而去查看记录的EditLog日志,发现在ANN中的Editlog是存在问题的,一些本没有设置ACL的目录被加入了ACL记录,说明这是在ANN log Editlog的时候出现的问题。查看mkdir的代码路径,代码段一直是在sync中的,所以肯定不是由竞争导致的数据不一致,继续查找,在logMkdir部分发现了这段代码,

  public void logMkDir(String path, INode newNode) {

    PermissionStatus permissions = newNode.getPermissionStatus();

    MkdirOp op = MkdirOp.getInstance(cache.get())

      .setInodeId(newNode.getId())

      .setPath(path)

      .setTimestamp(newNode.getModificationTime())

      .setPermissionStatus(permissions);

 

    AclFeature f = newNode.getAclFeature();

    if (f != null) {

      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));

    }

    logEdit(op);

  }

           其中cache是一个ThreadLocal的变量,属于线程私有变量,这就存在了一个非常严重的问题,假设之前这个线程mkdir继承了一个ACL,那么就会走到op.setAclEntries,op是从cache中获得的,这次mkdir没有任何问题。下面这个线程又logMkdir,这次newNode是一个普通的INode,不带ACL属性,那么就不会设置任何AclEntries,但是cache中的MkdirOp是存在AclEntries属性的,就是上次设置的属性,这样logEdit以后这个dir的permission就混乱了,不是自己的属性了。导致了重启后属性出现的问题。

       这个bug我已经反馈给了社区,jira号是https://issues.apache.org/jira/browse/HDFS-7385 

 

三、解决方案

          patch非常简单,对于没有AclEntries的op,设置null即可。对于已经存在问题的NameNode,从ANN saveNamespace,获得一个正确的FSImage,SNN先升级代码,读取正确的Image,启动,切换为ANN,再把原来的ANN升级。

MapReduce在Hadoop1.x流程 —— JobClient提交过程

      MapReduce从提交任务到执行的整理流程大概如下图所示: client1 简单的说MapReduce提交任务有大概几个过程:

      1、首先JobClient首先上传必要的文件,如执行的jar包,split信息,DistributedCache等,随后将任务提交给JobTracker

      2、JobTracker接受任务后,在后台不加锁实例化任务,并等待TaskTracker心跳接受任务

      3、TaskTracker根据JobTracker得调度器获得需要执行的任务

      4、TaskTracker本地化任务并执行MapReduce任务

      下面详细的看一下提交过程: 比如最简单的wordcount,客户端代码是:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

      提交的代码是job.waitForCompletion(true),实际上底层调用的是JobClient的submitJobInternal,步骤如下:

      1、得到stagingDirectory暂存目录,将用户jar包,libjar,distributed cache等拷入到该目录下

      2、将split信息写入到staging目录只能中,split是根据InputFormat来进行切分的

 private 
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<!--?,?--> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
 
    List splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

3、将所有conf写入到xml文件中,将任务提交给JobTracker

jobCopy.writeXml(out);

Java ThreadPool

      今天需要在Map Reduce中实现一个线程池,原因是对方提供的API接口,需要对每行日志通过HTTP请求获取数据,如果单线程去做耗时占资源,所以想法是通过线程池,在每次setup时候启动线程池,在map的每一行提交任务,如果线程池满了就Block住,最后cleanup时候销毁线程池就好了。

      于是采用了较为简单的Doug Lea的Executors框架,这套框架已经对大部分并发操作进行了封装改写,方便用户使用。在setup时候代码为:

 

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      int threadsNumber = context.getConfiguration().getInt("Thread.Number", 100);
      ExecutorService pool = Executors.newFixedThreadPool(threadsNumber);
    }

      在map方法内,每一行通过HTTP获取相应的数据:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      map reduce 任务提交后,发现无论怎么修改初始线程数,很快程序就报GC异常了,无法跑完。经过提醒后发现原来是Executors的问题,Executors是Doug Lea为了方便用户,加入了许多封装,出问题的就在这段代码:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

      这是我调用的方法,虽然最多启动nThreads线程,但是底层调用的ThreadPoolExecutor它的队列是LinkedBlockingQueue,是个无界队列,这就是解释了为什么很容易发生GC的问题了,每一行都new 一个Runnable放到这个Queue中,很快内存就撑爆了。找到问题后就好解决了,不使用Executors,而是直接使用ThreadPoolExecutor,根据参数加入一个有界的BlockingQueue,代码如下:

   @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      conf = context.getConfiguration();
      final Path out = FileOutputFormat.getOutputPath(context);
      final String outPath = ((FileSplit)context.getInputSplit()).getPath().getName() +
          ((FileSplit)context.getInputSplit()).getStart();
      threadPoolNumber = conf.getInt("ThreadPoolNumber", 500);
      pool = new ThreadPoolExecutor(threadPoolNumber, threadPoolNumber, 0, TimeUnit.SECONDS,
          new ArrayBlockingQueue(threadPoolNumber));
      semaphore = new Semaphore(threadPoolNumber);
      }

      需要注意的是如果提交任务过快,ThreadPoolExecutor会根据策略选择处理方法,默认的是抛出RejectedExecutionException,也可以直接将超出的抛弃或者最老的抛弃,或者自己去写RejectedExecutionHandler。我觉得都不是很方便,就采用的是Semaphore,这样在Map端代码就是

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      semaphore.acquire();
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      首先获取信号量,如果没获取到就一直等待。当线程任务完成时,不要忘记在finally中release Semaphore。

      @Override
      public void run() {
        long start