MapReduce在Hadoop1.x流程 —— JobClient提交过程

      MapReduce从提交任务到执行的整理流程大概如下图所示: client1 简单的说MapReduce提交任务有大概几个过程:

      1、首先JobClient首先上传必要的文件,如执行的jar包,split信息,DistributedCache等,随后将任务提交给JobTracker

      2、JobTracker接受任务后,在后台不加锁实例化任务,并等待TaskTracker心跳接受任务

      3、TaskTracker根据JobTracker得调度器获得需要执行的任务

      4、TaskTracker本地化任务并执行MapReduce任务

      下面详细的看一下提交过程: 比如最简单的wordcount,客户端代码是:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

      提交的代码是job.waitForCompletion(true),实际上底层调用的是JobClient的submitJobInternal,步骤如下:

      1、得到stagingDirectory暂存目录,将用户jar包,libjar,distributed cache等拷入到该目录下

      2、将split信息写入到staging目录只能中,split是根据InputFormat来进行切分的

 private 
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<!--?,?--> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
 
    List splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
 
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

3、将所有conf写入到xml文件中,将任务提交给JobTracker

jobCopy.writeXml(out);

java.lang.IncompatibleClassChangeError 错误分析

    最近帮助同事把之前给他搭建的Hadoop2.2集群升级到2.4,并将之前我们做的实时应用程序迁移部署到Hadoop2.4上。

       在迁移的过程中,他发现之前通过Scribe客户端向Scribe中继发送日志的程序总是报错,异常日志是:

java.lang.IncompatibleClassChangeError: com/sina/mis/calligraphus/thrift/CalligraphusService$Client
	at com.sina.mis.client.ClientMulti.connect(ClientMulti.java:176)
	at com.sina.mis.client.ClientMulti.access$100(ClientMulti.java:37)
	at com.sina.mis.client.ClientMulti$1.run(ClientMulti.java:156)
	at java.util.TimerThread.mainLoop(Timer.java:512)
	at java.util.TimerThread.run(Timer.java:462)

      看一场应该是接口或者类的实现改变,导致的IncopatibleClassChangeError。 仔细检查了一下出错的代码为  inco1       分析一下Client一直是个抽象类,没有问题,Iface一直是接口也没有问题,出问题的就在TServiceClient接口上,联想到之前也是遇到过这个问题,Thrift在0.7以后这个接口就变成了抽象类,导致了implements报错,仔细的查看了一下集群上面使用这个累的jar包,确定是hive-exec.jar,反编译以后得到了印证:

col1

      最后解决方法,目前想到的是修改程序,明天查看一下是否可以通过选项将用户提交的jar包覆盖父类继承的CLASSPATH。

Jvm 参数选取顺序

      Jvm会选取最右侧的参数作为有效参数,当参数重复的时候,比如在Hadoop中修改启动的Jvm内存大小,需要将-Xmx或者-Xms放在最右侧,才能生效。

Spark SQL部署指南

      最近一直在看Spark以及Hive相关的东西,Spark SQL就是使用Hive SQL Parser将hive语句转化为RDD,方便Hive用户平滑迁移。

      首先就是如何打包:

       1、从github上下载源码,地址是https://github.com/apache/spark,我选取的是1.0.2(1.1.0在提交时候出现Permission denied问题,我放弃了)

      2、安装maven,scala等,Hadoop集群,并设定home目录

      3、通过Maven打包前修改Maven的jvm参数,export MAVEN_OPTS=”-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512”

      4、根据HADOOP版本通过MAVEN打包,比如对2.4版本 bash make-distribution.sh –name spark-1.0.2 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 打包出来是一个tgz的包,上传到部署的机器上 部署问题:

      5、解压tgz的包,将hive需要的mysql-jdbc的jar包拷贝到lib下,同时将hive配置文件hive-site.xml配置好拷贝到conf下

      6、修改配置文件,修改spark-default.conf

spark.yarn.historyServer.address 10.39.5.23:18080 // spark history server position
spark.eventLog.enabled true
spark.eventLog.dir   hdfs://ns1/user/jiangyu2/logs

修改spark-env.sh

export SPARK_JAR=hdfs://ns1/user/jiangyu2/spark-assembly-1.1.0-SNAPSHOT-hadoop2.4.0.jar
export HADOOP_CONF_DIR=/usr/local/hadoop-2.4.0/etc/hadoop
export YARN_CONF_DIR=/usr/local/hadoop-2.4.0/etc/hadoop
export SPARK_YARN_USER_ENV="CLASSPATH=/usr/local/hadoop-2.4.0/etc/hadoop/"
export SPARK_SUBMIT_LIBRARY_PATH=/usr/local/hadoop-2.4.0/lib/native/
export SPARK_CLASSPATH=$SPARK_CLASSPATH

7、启动spark shell

$ ./bin/spark-shell --master yarn-client
 
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
14/09/17 17:57:07 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@1f3b0168
 
scala> import hiveContext._
import hiveContext._
 
scala> hql("SELECT t1.fans_uid,t3.user_type FROM   (SELECT fans_uid,atten_uid,time FROM   ods_user_fanslist WHERE  dt = '20140909') t1  JOIN (SELECT uid,user_type_id AS user_type,user_status,reg_time FROM   mds_user_info WHERE  dt = '20140909') t3 ON t1.atten_uid = t3.uid limit 10”).collect().foreach(println)

8、写java程序提交同样的hql

package nimei;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;
 
public class NiDaYe {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("caocaocao");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 
    JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(ctx);
 
    // Queries are expressed in HiveQL.
    hiveCtx.hql("SELECT count(fans_uid),user_status from(SELECT fans_uid,atten_uid,time FROM   ods_user_fanslist WHERE  dt = '20140909') " +
    		"t1  JOIN (SELECT uid,user_type_id AS user_type,user_status,reg_time FROM  " +
    		" mds_user_info WHERE  dt = '20140909') t3 ON t1.atten_uid = t3.uid group by user_status").collect();
  }
}

  最后就是applicationMaster页面了 spark

Java ThreadPool

      今天需要在Map Reduce中实现一个线程池,原因是对方提供的API接口,需要对每行日志通过HTTP请求获取数据,如果单线程去做耗时占资源,所以想法是通过线程池,在每次setup时候启动线程池,在map的每一行提交任务,如果线程池满了就Block住,最后cleanup时候销毁线程池就好了。

      于是采用了较为简单的Doug Lea的Executors框架,这套框架已经对大部分并发操作进行了封装改写,方便用户使用。在setup时候代码为:

 

    @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      int threadsNumber = context.getConfiguration().getInt("Thread.Number", 100);
      ExecutorService pool = Executors.newFixedThreadPool(threadsNumber);
    }

      在map方法内,每一行通过HTTP获取相应的数据:

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      map reduce 任务提交后,发现无论怎么修改初始线程数,很快程序就报GC异常了,无法跑完。经过提醒后发现原来是Executors的问题,Executors是Doug Lea为了方便用户,加入了许多封装,出问题的就在这段代码:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * <tt>nThreads</tt> threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if <tt>nThreads &lt;= 0</tt>
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

      这是我调用的方法,虽然最多启动nThreads线程,但是底层调用的ThreadPoolExecutor它的队列是LinkedBlockingQueue,是个无界队列,这就是解释了为什么很容易发生GC的问题了,每一行都new 一个Runnable放到这个Queue中,很快内存就撑爆了。找到问题后就好解决了,不使用Executors,而是直接使用ThreadPoolExecutor,根据参数加入一个有界的BlockingQueue,代码如下:

   @Override
    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
      super.setup(context);
      conf = context.getConfiguration();
      final Path out = FileOutputFormat.getOutputPath(context);
      final String outPath = ((FileSplit)context.getInputSplit()).getPath().getName() +
          ((FileSplit)context.getInputSplit()).getStart();
      threadPoolNumber = conf.getInt("ThreadPoolNumber", 500);
      pool = new ThreadPoolExecutor(threadPoolNumber, threadPoolNumber, 0, TimeUnit.SECONDS,
          new ArrayBlockingQueue(threadPoolNumber));
      semaphore = new Semaphore(threadPoolNumber);
      }

      需要注意的是如果提交任务过快,ThreadPoolExecutor会根据策略选择处理方法,默认的是抛出RejectedExecutionException,也可以直接将超出的抛弃或者最老的抛弃,或者自己去写RejectedExecutionHandler。我觉得都不是很方便,就采用的是Semaphore,这样在Map端代码就是

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      semaphore.acquire();
      FetchFromUid one = new FetchFromUid(value.toString());
      pool.submit(one);
    }

      首先获取信号量,如果没获取到就一直等待。当线程任务完成时,不要忘记在finally中release Semaphore。

      @Override
      public void run() {
        long start = System.currentTimeMillis();
        String result = DataParse.parse(message);
        if(result != null &amp;&amp; result != "") {
          try {
            queue.put(result);
          } catch (InterruptedException e) {
          } finally {
            semaphore.release();
          }
          httpRate.addAndGet(System.currentTimeMillis() - start);
        }
      }

MAVEN org.eclipse.m2e:lifecycle-mapping问题

      最近在编译Presto时候遇到了一个MAVEN的问题,MAVEN报以下错误:

       [WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available

       [WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failed to read artifact descriptor for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0

      google以下,发现了以下的解决方法:

      1、checkout https://github.com/mfriedenhagen/dummy-lifecycle-mapping-plugin

      2、解压,mvn clean install

      3、再去编译自己的工程,okay,解决

Hadoop问题:Premature EOF: no length prefix available

      最近Hadoop 2.4集群发现一个奇怪的问题,问题的表现是一个hive任务报错,报无法找到文件,我到集群上cat相关文件也无法找到三个副本中的任何一个,但是通过fsck找到相关block的位置后,到机器上却能够找到这个block,并且block没有任何损坏。

      报错的错误日志是:

      add to deadNodes and continue. java.io.EOFException: Premature EOF: no length prefix available

      找到相关代码,发现时PB发送请求后没有返回数据。突然想到这个可能是DataNode没有响应DFSClient,第一是有可能DataNode的机器打开文件数过少,这个首先排除了,我们集群安装都是没有问题的,打开文件数都是65535。第二是DataNode自身的设置,查看后发现没有设置打开文件数,默认的设置是4096,应该能够满足要求。最后我对DataNode进行了jstack,后来发现有大量的Short circuit 本地读线程hang住,总共4096个,导致了这台机器无法响应其它的请求。当三台具有相同副本的机器都hang住后,就会出现这个问题。

      找到hadoop out的日志之后,发现了这个异常:

Exception in thread “Thread-19″ java.util.ConcurrentModificationException

        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1100)

        at java.util.TreeMap$ValueIterator.next(TreeMap.java:1145)

        at org.apache.hadoop.net.unix.DomainSocketWatcher$1.run(DomainSocketWatcher.java:465)

        at java.lang.Thread.run(Thread.java:662)

      查询后发现,原来是DomainSocketWatcher这个类内部的线程在finally时候没有将treemap加锁导致了ConcurrentModificationException。

      查询jira,发现社区已经找到了这个问题,并提供了patch,地址是:

https://issues.apache.org/jira/browse/HADOOP-10404

      这个问题还真是比较无语。

HDFS BlockManager chooseExcessReplicates 问题

      近日在做HDFS RAID时发现一个现象,RAID结束后需要调整Block位置,设置副本数为2,使Block尽可能的分散,但是从Raid界面发现,经过很长时间后,Block位置仍然无法满足分散到不同机器上的要求。查看NameNode日志后发现

NewImage

      Block从某个节点copy到另一个节点后,从第二个节点汇报给NameNode,并请求NameNode删除原始节点,但是NameNode并没与删除原始节点,而是直接要求汇报的节点删除该Block。

      通过DEBUG发现,问题出自BlockManager的chooseExcessReplicates方法。

NewImage

      我测试集群只有17台机器,分布在两个机架上,源节点在一个机架上,另一个副本在第二个机架上,拷贝到的也是在第二个机架上。那么上面的判断:moreThanOne指的是大于等于两个副本的机架机器,exactlyOne指的是只有一个副本的机架机器。那么moreThanOne.contains(delNodeHint)肯定是false,因为源节点属于只有一个副本的机架机器。||后面的语句addedNode!=null为true,!moreThanOne.contains(addedNode)为false,由于新添加的节点机架之前已经有一个副本了,所以为非。最后if语句结果为false,这样你即使提供了delNodeHint,指定了一个需要删除的机器,但是根据放置策略,依然不能删除,毕竟Hint只是个提示。

     当然,我的测试集群较小,较大的集群这种情况不太可能发生,另外raid主节点定期去copy,在显示集群中除了Balancer,这种情况也很少发生。

JVMTI Agent MANNIFEST.MF问题

      使用JVMTI对线上环境注射一个Agent用于对某些异常现象进行捕捉,这个过程后面会专门写一篇blog。对于Agent需要手动指定MANNIFEST.MF。今天在打包的时候一直出错,明明指定了Agent-Class: com.sina.cao.ProbeScheduler。但是打包后就消失了,解压后这行就丢了。经过检查发现,MANNIFEST.MF需要最后是一个空行。

line1:Manifest-Version: 1.0

line2:Agent-Class: com.sina.cao.ProbeScheduler

line3:

    这样打包以后就不会出问题了。