MapReduce在Hadoop1.x流程 —— JobClient提交过程

      MapReduce从提交任务到执行的整理流程大概如下图所示: client1 简单的说MapReduce提交任务有大概几个过程:

      1、首先JobClient首先上传必要的文件,如执行的jar包,split信息,DistributedCache等,随后将任务提交给JobTracker

      2、JobTracker接受任务后,在后台不加锁实例化任务,并等待TaskTracker心跳接受任务

      3、TaskTracker根据JobTracker得调度器获得需要执行的任务

      4、TaskTracker本地化任务并执行MapReduce任务

      下面详细的看一下提交过程: 比如最简单的wordcount,客户端代码是:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

      提交的代码是job.waitForCompletion(true),实际上底层调用的是JobClient的submitJobInternal,步骤如下:

      1、得到stagingDirectory暂存目录,将用户jar包,libjar,distributed cache等拷入到该目录下

      2、将split信息写入到staging目录只能中,split是根据InputFormat来进行切分的

 private 
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<!--?,?--> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
 
    List splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

3、将所有conf写入到xml文件中,将任务提交给JobTracker

jobCopy.writeXml(out);

Java ThreadPool

      今天需要在Map Reduce中实现一个线程池,原因是对方提供的API接口,需要对每行日志通过HTTP请求获取数据,如果单线程去做耗时占资源,所以想法是通过线程池,在每次setup时候启动线程池,在map的每一行提交任务,如果线程池满了就Block住,最后cleanup时候销毁线程池就好了。

      于是采用了较为简单的Doug Lea的Executors框架,这套框架已经对大部分并发操作进行了封装改写,方便用户使用。在setup时候代码为:

 

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      int threadsNumber = context.getConfiguration().getInt("Thread.Number", 100);
      ExecutorService pool = Executors.newFixedThreadPool(threadsNumber);
    }

      在map方法内,每一行通过HTTP获取相应的数据:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      map reduce 任务提交后,发现无论怎么修改初始线程数,很快程序就报GC异常了,无法跑完。经过提醒后发现原来是Executors的问题,Executors是Doug Lea为了方便用户,加入了许多封装,出问题的就在这段代码:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

      这是我调用的方法,虽然最多启动nThreads线程,但是底层调用的ThreadPoolExecutor它的队列是LinkedBlockingQueue,是个无界队列,这就是解释了为什么很容易发生GC的问题了,每一行都new 一个Runnable放到这个Queue中,很快内存就撑爆了。找到问题后就好解决了,不使用Executors,而是直接使用ThreadPoolExecutor,根据参数加入一个有界的BlockingQueue,代码如下:

   @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      conf = context.getConfiguration();
      final Path out = FileOutputFormat.getOutputPath(context);
      final String outPath = ((FileSplit)context.getInputSplit()).getPath().getName() +
          ((FileSplit)context.getInputSplit()).getStart();
      threadPoolNumber = conf.getInt("ThreadPoolNumber", 500);
      pool = new ThreadPoolExecutor(threadPoolNumber, threadPoolNumber, 0, TimeUnit.SECONDS,
          new ArrayBlockingQueue(threadPoolNumber));
      semaphore = new Semaphore(threadPoolNumber);
      }

      需要注意的是如果提交任务过快,ThreadPoolExecutor会根据策略选择处理方法,默认的是抛出RejectedExecutionException,也可以直接将超出的抛弃或者最老的抛弃,或者自己去写RejectedExecutionHandler。我觉得都不是很方便,就采用的是Semaphore,这样在Map端代码就是

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      semaphore.acquire();
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      首先获取信号量,如果没获取到就一直等待。当线程任务完成时,不要忘记在finally中release Semaphore。

      @Override
      public void run() {
        long start = System.currentTimeMillis();
        String result = DataParse.parse(message);
        if(result != null &amp;&amp; result != "") {
          try {
            queue.put(result);
          } catch (InterruptedException e) {
          } finally {
            semaphore.release();
          }
          httpRate.addAndGet(System.currentTimeMillis() - start);
        }
      }

Hadoop问题:Premature EOF: no length prefix available

      最近Hadoop 2.4集群发现一个奇怪的问题,问题的表现是一个hive任务报错,报无法找到文件,我到集群上cat相关文件也无法找到三个副本中的任何一个,但是通过fsck找到相关block的位置后,到机器上却能够找到这个block,并且block没有任何损坏。

      报错的错误日志是:

      add to deadNodes and continue. java.io.EOFException: Premature EOF: no length prefix available

      找到相关代码,发现时PB发送请求后没有返回数据。突然想到这个可能是DataNode没有响应DFSClient,第一是有可能DataNode的机器打开文件数过少,这个首先排除了,我们集群安装都是没有问题的,打开文件数都是65535。第二是DataNode自身的设置,查看后发现没有设置打开文件数,默认的设置是4096,应该能够满足要求。最后我对DataNode进行了jstack,后来发现有大量的Short circuit 本地读线程hang住,总共4096个,导致了这台机器无法响应其它的请求。当三台具有相同副本的机器都hang住后,就会出现这个问题。

      找到hadoop out的日志之后,发现了这个异常:

Exception in thread “Thread-19″ java.util.ConcurrentModificationException

        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1100)

        at java.util.TreeMap$ValueIterator.next(TreeMap.java:1145)

        at org.apache.hadoop.net.unix.DomainSocketWatcher$1.run(DomainSocketWatcher.java:465)

        at java.lang.Thread.run(Thread.java:662)

      查询后发现,原来是DomainSocketWatcher这个类内部的线程在finally时候没有将treemap加锁导致了ConcurrentModificationException。

      查询jira,发现社区已经找到了这个问题,并提供了patch,地址是:

https://issues.apache.org/jira/browse/HADOOP-10404

      这个问题还真是比较无语。

HDFS BlockManager chooseExcessReplicates 问题

      近日在做HDFS RAID时发现一个现象,RAID结束后需要调整Block位置,设置副本数为2,使Block尽可能的分散,但是从Raid界面发现,经过很长时间后,Block位置仍然无法满足分散到不同机器上的要求。查看NameNode日志后发现

NewImage

      Block从某个节点copy到另一个节点后,从第二个节点汇报给NameNode,并请求NameNode删除原始节点,但是NameNode并没与删除原始节点,而是直接要求汇报的节点删除该Block。

      通过DEBUG发现,问题出自BlockManager的chooseExcessReplicates方法。

NewImage

      我测试集群只有17台机器,分布在两个机架上,源节点在一个机架上,另一个副本在第二个机架上,拷贝到的也是在第二个机架上。那么上面的判断:moreThanOne指的是大于等于两个副本的机架机器,exactlyOne指的是只有一个副本的机架机器。那么moreThanOne.contains(delNodeHint)肯定是false,因为源节点属于只有一个副本的机架机器。||后面的语句addedNode!=null为true,!moreThanOne.contains(addedNode)为false,由于新添加的节点机架之前已经有一个副本了,所以为非。最后if语句结果为false,这样你即使提供了delNodeHint,指定了一个需要删除的机器,但是根据放置策略,依然不能删除,毕竟Hint只是个提示。

     当然,我的测试集群较小,较大的集群这种情况不太可能发生,另外raid主节点定期去copy,在显示集群中除了Balancer,这种情况也很少发生。

Hadoop NameNode端口问题

      今天被前同事问起NameNode都启动了哪些端口,分别是IPC server, Http Server, jmx Server,但是netstat -anlp发现不止有三个,是五个。经过我同事分析,多出的两个是

启动java 的profile 参数(jvisualvm的参数)

-Dcom.sun.management.jmxremote=true
-Djava.rmi.server.hostname=ip
-Dcom.sun.management.jmxremote.port=9986
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false

注意:这里这一组参数会启动3个端口,分别是因为 com.sun.management.jmxremote=true 启动一个随机
com.sun.management.jmxremote.port=9986 启动一个9986端口以及再启动一个额外的随机端口

正是因为hadoop-env.sh脚本中带有-Dcom.sun.management.jmxremote 属性,才导致hadoop本身又多起了一个端口

Hadoop Pipeline详解

一、说明
Hadoop 2.x相比较于1.x有了较大的改变,像MapReduce层面架构以及代码基本上是完全重写的,在HDFS层面加入了HA,Federation等特性,代码更加层次化和易读,同时加入的PB初期可能给阅读带来障碍,熟悉之后就没有太大问题了。
Pipeline一只是Hadoop重要的一个过程,不管是MR任务,Hive任务等等,最后都需要Pipeline将数据写入到HDFS中。所以熟悉Hadoop Pipeline过程还是很有意义的。

二、流程说明
Hadoop pipeline的建立可以由以下的流程图来说明,Pipeline建立牵扯的有NameNode,DataNode,DFSClient等结构。
pipe1
以下为简要说明
1、首先DFSClient向NameNode发送RPC Create请求,NameNode在INode中生成新的INodeFile并加入到新的LeaseManager中作为稍后写入的租约管理。
2、请求成功返回后,DFSClient生成一个OutputStream,既DFSOutputStream
3、行程pipeline之前向NameNode发送AddBlock请求,NameNode生成新的Block,选择要写入的DataNode节点,并将Block注册到INodeFile中,以及BlockManager中,准备写入
4、建立pipeline,根据NameNode返回的信息,找到一个primary Datanode作为第一个节点,准备写入
5、DataNode Socket接收后,首先将数据写入buffer中,然后写入到下一个节点,写入成功后将buffer中数据写入本地磁盘,并等待ACK信息
6、与5一样,写入下一个节点然后写入本地,最后等待ACK信息
7、如果ACK都成功返回后,发回给DFSClient,本次写入成功。

三、详细说明
Continue reading…

Hadoop FastCopy

一、背景

      Hadoop目前使用Distcp作为集群间的数据传输工具,但是Hadoop2.0 可以采用federation,每个ns公用存储节点,这样还采用Distcp作为集群间数据传输将会非常浪费,直接的想法就是能够通过硬链接将federation中共用存储的block也进行共享,就会大大的节省带宽,IO,网络以及时间。

 

 

 

二、流程说明

 



Fastcopy1

      整体流程如图,以下为简要说明:

1、Client首先向Source NS1获取需要拷贝的文件的block location

2、对每一个block,根据Source得Location向Dest NS2申请block,并尽量申请到同一个DataNode节点

3、向DataNode发送File copy的RPC请求

4、DataNode获得请求后查看源DataNode和目的DataNode是否是同一台机器上的同一个实例的DataNode,如果是通过hard link完成copy

5、如果不是,退化为常规的机器间拷贝


JournalNode性能测试

一、背景

 

    Hadoop 2 开始采用JournalNode作为共享存储,JournalNode由NameNode部分的QuorumJournalManager,IPCLoggerChannel及JournalNode Server端的JournalNodeRPCServer组成。

 

    IPCLoggerChannel由一个JournalNode对应一个线程去完成日志的发送,保证日志的有序性,同时根据Paxos算法,绝大多数JournalNode完成回应就算完成日志发送。

 

    原先Hadoop 1采用的是本地记录Editlog并在远程NFS上记录一份,使用JournalNode后,我们需要获得相应的数据来比较JournalNode性能及对NameNode的影响。

 

 

 

二、测试方法

 

    分别采用1,3,5台机器组成journalNode,测试每分钟写入的日志数,并且与大集群相比,得到比较结果。

 

    以下数据是从大集群获得的每分钟edit transaction数,一般的峰值不到10万。


Jn2

    以下是单节点JournalNode接收数量:

Jn1

    五个节点JournalNode接收数量:

Jn3

    综合上述数据,我们可以确定,即使使用journalnode,也能保证我的NameNode的并发性能。