Java GZIPInputStream OutOfMemoryError

       A record of a user's job failing on our cluster. The user's job had always run fine, but after one day's logs arrived it failed on the cluster with an OutOfMemory error from GZIPInputStream. At first I assumed the data was simply too large and that raising the memory would fix it, but increasing the memory had no effect, and the job still failed when I tested against the same logs on a single machine. The stack trace was:

2014-11-10 09:52:54,743 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError

        at java.util.zip.Inflater.inflateBytes(Native Method)

        at java.util.zip.Inflater.inflate(Inflater.java:238)

        at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:135)

        at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:90)

        at com.sina.suda.mr.util.BinaryDecoder.unGZip(BinaryDecoder.java:21)

        at com.sina.suda.mr.util.BinaryDecoder.decodePostStr(BinaryDecoder.java:67)

       Following the stack trace into the JDK source code, the failure turns out to be in native code. Googling the problem shows it is a JDK 6 issue: the gzip file was corrupted during log transfer, and that corruption causes the OutOfMemoryError. The fix is rather brute-force: catch OutOfMemoryError in the outer layer and return an empty result.
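The workaround can be sketched roughly as follows (the class and method names are mine for illustration; the original BinaryDecoder is not shown here):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class SafeGunzip {
  // Decompresses gzip bytes, returning an empty array on a corrupt stream.
  // Catching OutOfMemoryError here is the blunt workaround described above:
  // on the affected JDK 6 builds Inflater throws it for corrupt input, not
  // because the heap is actually exhausted.
  public static byte[] unGzip(byte[] data) {
    try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(data));
         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = gis.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return out.toByteArray();
    } catch (IOException | OutOfMemoryError e) {
      return new byte[0]; // corrupt gzip: give up and return an empty result
    }
  }
}
```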

Java ThreadLocal Introduction

       Some notes on ThreadLocal usage, and a Hadoop bug it leads to. First, ThreadLocal itself: in multithreaded programming we usually reach for Lock, synchronized, and so on, while ThreadLocal sees far less use. (When I joined Taobao, one of the written interview questions was about ThreadLocal.) Per the JDK documentation, ThreadLocal provides thread-local variables. Ordinary variables can be read and written by every thread and need synchronization to coordinate access; a ThreadLocal is a private, per-thread variable that each thread holds an implicit reference to, so threads do not interfere with each other. In short, ThreadLocal trades space for time: each thread gets its own private copy that only it uses, so there is no cross-thread interference and no locking. That pays off handsomely under high concurrency, since synchronized is quite costly.
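To make the idea concrete, here is a minimal sketch of the classic use case: SimpleDateFormat is not thread-safe, so giving every thread its own instance via ThreadLocal avoids both shared state and locking (the class name is mine):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class ThreadLocalDateFormat {
  // Each thread lazily gets its own formatter; no synchronization needed.
  private static final ThreadLocal<SimpleDateFormat> FMT =
      new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
          SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
          f.setTimeZone(TimeZone.getTimeZone("UTC")); // deterministic output
          return f;
        }
      };

  public static String format(Date d) {
    return FMT.get().format(d);
  }

  public static void main(String[] args) {
    System.out.println(format(new Date(0L))); // 1970-01-01
  }
}
```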

       However, keep in mind that a ThreadLocal variable belongs to a single thread: if you use it to track state changes, make sure a thread's state from a previous operation does not leak into the current one, or you get wrong results. For example, in the code below I have 10 threads, each with a Meta ThreadLocal variable. The intent is for the 10 threads to set state, each performing 1000 operations, exactly one of which is an EXCEPTION operation while the rest are normal. The code:

       The Meta class:

package com.dj;
 
 
public class Meta {
 
  private int number;
  private STATE state;
 
  public Meta() {
    number = 0;
    state = STATE.NORMAL;
  }
 
  private Meta(int number, STATE state) {
    this.number = number;
    this.state = state;
  }
 
  public int getNumber() {
    return number;
  }
 
  public void setNumber(int number) {
    this.number = number;
  }
 
  public STATE getState() {
    return state;
  }
 
  public void setState(STATE state) {
    this.state = state;
  }
 
  public static Meta spawn(Meta meta) {
    return new Meta(meta.number,meta.state);
  }
 
}

       The STATE class:

package com.dj;
 
public enum STATE {
  NORMAL, EXCEPTION
};

       The Process class:

package com.dj;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
 
public class Process {
  public static final AtomicInteger GENERATOR = new AtomicInteger(0);
  public static final int TIMES =  1000;
  public static final int THREADS = 10;
  public static final int EXCEPTION_NUM = 10;
  public ArrayList<Meta> list = new ArrayList<Meta>();
 
 
  ThreadLocal<Meta> t = new ThreadLocal<Meta>() {
    protected Meta initialValue() {
      return new Meta();
    };
  };
 
  synchronized void process(STATE state) {
    t.get().setNumber(GENERATOR.getAndAdd(1));
    if(state==STATE.EXCEPTION) {
      t.get().setState(state);
    }
    list.add(Meta.spawn(t.get()));
  }
 
 
  public int stat() {
    Collections.sort(list, new Comparator<Meta>() {
 
      @Override
      public int compare(Meta o1, Meta o2) {
        return Integer.compare(o1.getNumber(), o2.getNumber());
      }
 
    });
 
    int exception = 0;
    for(Meta meta: list) {
      if(meta.getState() == STATE.EXCEPTION)
        exception++;
    }
 
 
    System.out.println("Exception number should be " +
    EXCEPTION_NUM+" but the real number is "+exception);
    return exception;
  }
 
  public static void main(String[] args) throws InterruptedException {
    Thread[] threads = new Thread[THREADS];
    final Process process = new Process();
    final AtomicInteger exSize = new AtomicInteger(0);
    for (int n = 0; n < THREADS; n++) {
      threads[n] = new Thread(new Runnable() {

        @Override
        public void run() {
          Random r = new Random();
          int setException = r.nextInt(TIMES);
          exSize.addAndGet(TIMES - setException);
          for (int i = 0; i < TIMES; i++) {
            if (i == setException)
              process.process(STATE.EXCEPTION);
            else
              process.process(STATE.NORMAL);
          }
        }
      });
      threads[n].start();
    }
    // Wait for every worker to finish before collecting statistics
    for (Thread t : threads) {
      t.join();
    }
    int realNumber = process.stat();
    System.out.println(realNumber + "  " + exSize);
  }
}

       The output when run:

Exception number should be 10 but the real number is 4943
4943  4943

       This is easy to explain: because we never reset the state after the EXCEPTION operation, the exceptional state persists across thousands of subsequent operations. This is in fact a small reproduction of a fairly serious Hadoop bug I found recently; the next post will discuss that Hadoop issue.
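The fix here is simply to reset the per-thread state once the operation has been recorded. A condensed, self-contained sketch of the corrected pattern (class and field names are mine, simplified from the Meta/Process demo above):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalReset {
  enum State { NORMAL, EXCEPTION }

  static final ThreadLocal<State> CUR = new ThreadLocal<State>() {
    @Override
    protected State initialValue() {
      return State.NORMAL;
    }
  };
  static final AtomicInteger exceptionCount = new AtomicInteger(0);

  // One operation: optionally raise the EXCEPTION flag, record the state,
  // then reset it so the flag cannot leak into this thread's later calls.
  static void process(boolean exceptional) {
    if (exceptional) {
      CUR.set(State.EXCEPTION);
    }
    if (CUR.get() == State.EXCEPTION) {
      exceptionCount.incrementAndGet();
    }
    CUR.set(State.NORMAL); // the fix: reset after every operation
  }

  public static void main(String[] args) throws InterruptedException {
    Thread[] ts = new Thread[10];
    for (int n = 0; n < ts.length; n++) {
      ts[n] = new Thread(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < 1000; i++) {
            process(i == 0); // exactly one exceptional call per thread
          }
        }
      });
      ts[n].start();
    }
    for (Thread t : ts) {
      t.join();
    }
    System.out.println(exceptionCount.get()); // prints 10, as intended
  }
}
```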

Java ThreadPool

      Today I needed a thread pool inside a Map Reduce job. The API the other team provides requires an HTTP request per log line to fetch data, and doing that single-threaded is slow and wasteful. The plan: start a thread pool in setup, submit a task per line in map, block when the pool is full, and shut the pool down in cleanup.

      So I went with the simpler route, Doug Lea's Executors framework, which wraps most common concurrency plumbing for the user's convenience. The setup code:

 

    private ExecutorService pool;

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      int threadsNumber = context.getConfiguration().getInt("Thread.Number", 100);
      // Keep the pool in a field so map() can submit to it
      pool = Executors.newFixedThreadPool(threadsNumber);
    }

      Inside the map method, each line fetches its data over HTTP:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      After submitting the map reduce job, no matter how I tuned the initial thread count, the program quickly died with a GC exception and never finished. After a tip-off I realized the problem was Executors itself: Doug Lea added a lot of convenience wrappers for users, and the culprit is this code:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

      This is the method I was calling. Although it starts at most nThreads threads, the ThreadPoolExecutor underneath uses a LinkedBlockingQueue, which is unbounded. That explains why the GC problem appeared so easily: every line news up a Runnable and puts it on this queue, so memory blows up in no time. Once found, the problem is easy to fix: skip Executors and construct a ThreadPoolExecutor directly, passing a bounded BlockingQueue sized by a parameter. The code:

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      conf = context.getConfiguration();
      final Path out = FileOutputFormat.getOutputPath(context);
      final String outPath = ((FileSplit) context.getInputSplit()).getPath().getName() +
          ((FileSplit) context.getInputSplit()).getStart();
      threadPoolNumber = conf.getInt("ThreadPoolNumber", 500);
      pool = new ThreadPoolExecutor(threadPoolNumber, threadPoolNumber, 0, TimeUnit.SECONDS,
          new ArrayBlockingQueue<Runnable>(threadPoolNumber));
      semaphore = new Semaphore(threadPoolNumber);
    }

      Note that if tasks are submitted too fast, ThreadPoolExecutor handles the overflow according to a policy. The default throws RejectedExecutionException; you can also discard the new task or the oldest one, or write your own RejectedExecutionHandler. None of those felt convenient to me, so I used a Semaphore instead. The map-side code then becomes:
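As an aside, the JDK also ships a rejection policy that throttles without an explicit Semaphore: ThreadPoolExecutor.CallerRunsPolicy runs the rejected task in the submitting thread, which naturally slows the producer down. A standalone sketch (class and method names are mine):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {
  // Pushes `tasks` no-op increments through a small bounded pool and returns
  // how many actually ran; CallerRunsPolicy guarantees none are dropped.
  static int runAll(int tasks) throws InterruptedException {
    int n = 4;
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        n, n, 0L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(n),
        new ThreadPoolExecutor.CallerRunsPolicy());
    final AtomicInteger done = new AtomicInteger();
    for (int i = 0; i < tasks; i++) {
      // When the queue is full, execute() runs this in the caller's thread
      pool.execute(done::incrementAndGet);
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return done.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAll(100)); // 100: every task ran, none rejected
  }
}
```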

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      semaphore.acquire();
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      First acquire the semaphore, waiting as long as necessary. When the task finishes, do not forget to release the Semaphore in a finally block:

      @Override
      public void run() {
        long start = System.currentTimeMillis();
        try {
          String result = DataParse.parse(message);
          if (result != null && !result.isEmpty()) {
            queue.put(result);
            httpRate.addAndGet(System.currentTimeMillis() - start);
          }
        } catch (InterruptedException e) {
        } finally {
          // Always release, even when the parse fails, or map() blocks forever
          semaphore.release();
        }
      }

jvisualvm Usage Notes

   When debugging a program you need to find its hot spots. Especially in multithreaded programs, thread wait times and the call latencies between coupled systems all need careful monitoring to get a rough picture of system performance.

 jvisualvm is quite convenient to use, supports remote debugging, and covers the most basic needs.

   Remote setup: the debug options must be configured on the server side:

   java -Djava.rmi.server.hostname=10.73.20.72 \
        -Dcom.sun.management.jmxremote=true \
        -Dcom.sun.management.jmxremote.port=9999 \
        -Dcom.sun.management.jmxremote.ssl=false \
        -Dcom.sun.management.jmxremote.authenticate=false \
        com.sina.mis.calligraphus.core.Calligraphus

Note: you generally need to set the java.rmi.server.hostname property to the server's IP address; the remaining options set the port and disable SSL and authentication, which is convenient for debugging.

   Locally, open jvisualvm, choose remote debugging, add a JMX connection, and enter the address and port (the port configured above). Then it is ready to use; the hostname setting is the key part.

 

Java classpath loading order

    The Java classpath is searched in order: simply put, if multiple classpath entries contain the same name, the earlier entry is loaded and the later ones are ignored.
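A quick way to see this in action without building jars: a URLClassLoader searches its URLs in order, just like the classpath, so when the same resource name exists in two directories the first one wins (the file and class names below are made up for the demo):

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClasspathOrderDemo {
  // Creates two temp dirs that both contain app.conf, puts dirA ahead of
  // dirB on a class loader's search path, and returns the winning content.
  static String winningEntry() throws IOException {
    Path dirA = Files.createTempDirectory("cpA");
    Path dirB = Files.createTempDirectory("cpB");
    Files.write(dirA.resolve("app.conf"), "from-A".getBytes());
    Files.write(dirB.resolve("app.conf"), "from-B".getBytes());
    // File.toURI() appends a trailing slash for directories, which
    // URLClassLoader requires to treat the URL as a directory entry
    URLClassLoader loader = new URLClassLoader(
        new URL[]{dirA.toFile().toURI().toURL(), dirB.toFile().toURI().toURL()},
        null);
    try (InputStream in = loader.getResourceAsStream("app.conf")) {
      byte[] buf = new byte[16];
      int n = in.read(buf);
      return new String(buf, 0, n);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(winningEntry()); // from-A: the earlier entry wins
  }
}
```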

Java Runnable convert to Callable Test

package com.jiangyu;
 
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
public class TestConvertThread {
  public static final Random r = new Random();
 
  class Call implements Callable<Object> {
 
    @Override
    public Object call() throws Exception {
      Thread.sleep(r.nextInt(5)*1000);
      System.out.println("finished callable");
      return true;
    }
 
  }
 
  class Run implements Runnable {
 
    @Override
    public void run() {
      try {
        Thread.sleep(r.nextInt(5)*1000);
        System.out.println("finished runnable");
      } catch (InterruptedException e) {
      }
    }
 
  }
 
  public void startJob() throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService executor = Executors.newCachedThreadPool();
    Future<Object> result = null;
    // this is for callable
    result = executor.submit(new Call());
    result.get(10, TimeUnit.SECONDS);
 
    // this is for runnable convert to callable
    result = executor.submit(Executors.callable(new Run()));
    result.get(10, TimeUnit.SECONDS);
  }
 
  public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    TestConvertThread tt = new TestConvertThread();
    tt.startJob();
  }
}

This tests converting a Runnable to a Callable; the Future's type parameter needs to be Object. In the JDK, the Executors.callable method converts a Runnable directly into a Callable.