Druid Batch Job Problem Roundup

I have recently been collecting the problems we hit while running Druid batch jobs, and I record them one by one below. The Druid version I use is 0.8.2, so the following fixes apply to 0.8.2.
1. Dependency packages
Many of our machines cannot reach the public internet, so we have to modify the Druid source so that it downloads artifacts from our internal repository. The file to change is ExtensionsConfig.java, as follows:

   private List<String> remoteRepositories = ImmutableList.of(
-      "https://repo1.maven.org/maven2/",
-      "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local"
+      "http://10.39.0.110:8081/nexus/content/groups/public",
+      "http://10.39.0.110:8081/nexus/content/repositories/thirdparty"
   );

With this change, the required packages are downloaded from our internal repository.
After that we can pull the required packages to a local repository used at submit time. Edit $DRUID_HOME/config/_common/common.runtime.properties:

druid.extensions.localRepository=/usr/local/druid-0.8.2/localRepo

This points Druid at the local repository. Then, in the $DRUID_HOME directory, run:

java -cp  config/_common:config/broker:/usr/local/hadoop-2.4.0/etc/hadoop:lib/*  io.druid.cli.Main tools pull-deps

This downloads all of the dependencies.
If you need to add your own third-party dependencies, also edit $DRUID_HOME/config/_common/common.runtime.properties:

druid.extensions.coordinates=["com.sina.hivexec:hive-exec:0.13.0"]

The listed artifacts are then added to the classpath when the job is submitted.

2. Counter problems
Our problem was that a third-party jar caused an exception while fetching the MapReduce counters, which then failed the whole task. Our current fix is to patch the source: catch the exception, check whether it really came from reading the counters and whether the job itself succeeded, and if so continue to the next step instead of propagating the exception and aborting (see the sketch below).
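A minimal sketch of the idea, assuming a standard org.apache.hadoop.mapreduce.Job handle; the variable names are illustrative, not the exact Druid 0.8.2 source:

    try {
      // Reading counters can blow up because of the bad third-party jar on the classpath.
      Counters counters = job.getCounters();
      // ... report row counts / progress from the counters ...
    } catch (Exception e) {
      if (job.isSuccessful()) {
        // The MapReduce job itself finished fine; losing the counters is not fatal.
        log.warn("Failed to fetch counters, but the job succeeded; continuing", e);
      } else {
        throw e;
      }
    }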

3. Number of reducers
A quick overview of Druid's batch flow, covering only the case where the partitionsSpec type is hashed.
1. If numShards is not specified, two jobs are run. The first job uses HyperLogLog to estimate the cardinality of each time partition, and its reducers output the cardinality per partition. Based on targetPartitionSize, Druid then decides how many shards the second job will use. The second job builds the index; the basic flow is the same as a realtime node: read the rows and build the index. If the shard count is 1, effectively only one reducer runs, which is slow. For example, with a cardinality of 20000 and "targetPartitionSize": 10000, each time partition gets 20000/10000 = 2 reducers.
2. If numShards is specified, only the index job runs, and each time partition starts numShards reducers. If you roughly know the data volume and cardinality, you can set numShards directly (see the sketch below).
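For reference, a minimal sketch of the two hashed partitionsSpec variants described above (the values are illustrative): the first lets Druid derive the shard count from the estimated cardinality, the second skips the cardinality job entirely.

    "partitionsSpec": { "type": "hashed", "targetPartitionSize": 10000 }

    "partitionsSpec": { "type": "hashed", "numShards": 2 }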

4. Timezone
Because the job is submitted with the UTC timezone, the map and reduce stages must also run in UTC. To do so, edit mapred-site.xml on the submitting machine (an alternative through the ingestion spec is sketched after the XML):

  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx1280M -Duser.timezone=UTC</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx1536M -Duser.timezone=UTC</value>
  </property>
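If editing mapred-site.xml on every submitting machine is inconvenient, the same JVM options can, as far as I know, also be passed per job through the jobProperties block of the Hadoop ingestion spec's tuningConfig; a sketch, to be verified against your Druid version:

    "tuningConfig": {
        "type": "hadoop",
        "jobProperties": {
            "mapreduce.map.java.opts": "-Xmx1280M -Duser.timezone=UTC",
            "mapreduce.reduce.java.opts": "-Xmx1536M -Duser.timezone=UTC"
        }
    }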

How to solve “.Git Directory Could Not Be Found! Please Specify a Valid [dotGitDirectory] in Your pom.xml” Problem

When I modified the source code of Presto and ran mvn package, the build failed with the following error:

.Git Directory Could Not Be Found! Please Specify a Valid [dotGitDirectory] in Your pom.xml

After googling the problem, I found a simple fix: add this plugin to your pom.xml and the build goes through.

            <plugin>
              <groupId>pl.project13.maven</groupId>
              <artifactId>git-commit-id-plugin</artifactId>
              <version>2.1.15</version>
              <configuration>
                <failOnNoGitDirectory>false</failOnNoGitDirectory>
              </configuration>
            </plugin>

Hadoop Web Page Information

Sometimes we want to confirm which branch the Hadoop binaries were compiled from. Normally we can find this information on the Hadoop web UI, for example the NameNode web page.


If we want to change this information ourselves, we should modify the file
$HADOOP_HOME/hadoop-common-project/hadoop-common/src/main/resources/common-version-info.properties

version=${pom.version}
revision=${version-info.scm.commit}
branch=${version-info.scm.branch}
user=${user.name}
date=${version-info.build.time}
url=${version-info.scm.uri}
srcChecksum=${version-info.source.md5}
protocVersion=${protobuf.version}

Just modify the values to change what the NameNode web page shows. For example, we can set branch=tag-20160817.

Druid Timezone Issues

When using Druid, one of the first annoyances is the timezone. Druid uses joda-time internally and defaults to UTC everywhere, while China uses Asia/Shanghai (UTC+8), eight hours ahead, so by default both ingestion and queries are off by eight hours.
To solve this we can act on both the ingest side and the query side. For ingest, the default format is:

{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

This format follows ISO 8601 and uses UTC. If you insist on keeping it, you need a realtime system in front to rewrite the timestamps to Asia/Shanghai, which is usually very costly.
So I recommend the second approach: when defining how the timestamp is parsed, use the following format:

                    "timestampSpec": {
                        "column": "timestamp",
                        "format": "posix"
                    },

That is, use POSIX (epoch) time instead of the default string format. POSIX time is the same everywhere in the world, which sidesteps the conversion problem entirely.
On the query side, we can specify the timezone of the query, as follows:

    "granularity": {
        "type": "period",
        "period": "PT1H",
        "timeZone": "Asia/Shanghai"
    },

With the timezone specified in the query, results are converted automatically and the eight-hour offset is avoided.

Hadoop Balancer Problems

Recently I have been watching how the cluster Balancer runs. The cluster is very large and some long-running jobs never stop, so disk usage is badly unbalanced and rebalancing data quickly is essential.
But the logs show that our Balancer often hangs. Our previous workaround was a cron script that checks for this and automatically restarts the Balancer once it has hung for a while.
Over the last couple of days I went through the logs and the code and found that the hang is caused by an NPE. The relevant code is as follows:

             // update locations
             for (String datanodeUuid : blk.getDatanodeUuids()) {
               final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (datanode != null) { // not an unknown datanode
+              if (d != null) { // not an unknown datanode
                 block.addLocation(d);
               }
             }

The wrong object is being null-checked; changing datanode to d fixes it. The original code makes the copy tasks submitted by the Balancer throw an NPE.

    for (Source source : sources) {
      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
    }

    // wait for all dispatcher threads to finish
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        LOG.warn("Dispatcher thread failed", e.getCause());
      }
    }

In this part, the submitted BlockMoveDispatcher task throws the NPE, and since future.get() has no timeout, the Balancer hangs forever.
Solution: besides fixing the NPE, we can also give future.get() a timeout so the Balancer can never hang indefinitely. A sketch follows.
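A minimal sketch of the timeout idea; the timeout value and the handling of a timed-out dispatcher are illustrative, not the upstream code:

    // Bound the wait so a dispatcher thread that died from an unchecked
    // exception (such as the NPE above) cannot hang the Balancer forever.
    for (Future<?> future : futures) {
      try {
        future.get(30, TimeUnit.MINUTES);   // illustrative timeout
      } catch (ExecutionException e) {
        LOG.warn("Dispatcher thread failed", e.getCause());
      } catch (TimeoutException e) {
        LOG.warn("Dispatcher thread timed out, giving up on it", e);
        future.cancel(true);
      }
    }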

How to Stop Druid From Filling the /tmp Directory

Recently I noticed that after starting some of the realtime nodes, Druid easily fills up the /tmp directory. The files all start with filePeon.
After investigating the Druid code and configuration, I found that Druid writes its intermediate index files under druid.indexer.task.baseDir, whose default value is System.getProperty("java.io.tmpdir").
So we can point java.io.tmpdir at another directory when starting the realtime node, as below:

java -Djava.io.tmpdir=/data0/druid/tmp -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=25g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/wb_ad_interest_druid.spec -classpath config/_common:config/realtime:/usr/local/hadoop-2.4.0/etc/hadoop:lib/* io.druid.cli.Main server realtime

ResourceManager Dispatcher Slowed Down by a Synchronized RMStateStore Method

Recently I noticed that the async dispatcher in our ResourceManager backs up from time to time.
Here are some logs:

2016-05-24 00:46:20,398 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 24000
2016-05-24 00:46:21,008 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 25000
2016-05-24 00:46:21,632 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 26000
2016-05-24 00:46:22,251 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 27000
2016-05-24 00:46:22,873 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 28000
2016-05-24 00:46:23,501 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 29000
2016-05-24 00:46:24,109 INFO org.apache.hadoop.yarn.event.AsyncDispatcher: Size of event-queue is 30000  

Normally the async dispatcher in the ResourceManager handles events quickly enough, but the log shows a serious backlog.
So we investigated the problem and ran jstack against the ResourceManager process while the queue was backed up. Here is the jstack output:

"AsyncDispatcher event handler" prio=10 tid=0x00007f4d6db10000 nid=0x5bca waiting for monitor entry [0x00007f4d3aa8c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.storeNewApplication(RMStateStore.java:375)
        - waiting to lock <0x00000003bae88af0> (a org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$RMAppNewlySavingTransition.transition(RMAppImpl.java:881)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$RMAppNewlySavingTransition.transition(RMAppImpl.java:872)
        at org.apache.hadoop.yarn.state.StateMachineFactory$SingleInternalArc.doTransition(StateMachineFactory.java:362)
        at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
        at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46)
        at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448)
        - locked <0x0000000394cbae40> (a org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.handle(RMAppImpl.java:645)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.handle(RMAppImpl.java:82)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationEventDispatcher.handle(ResourceManager.java:690)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationEventDispatcher.handle(ResourceManager.java:674)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:662)

"AsyncDispatcher event handler" daemon prio=10 tid=0x00007f4d6d8f6000 nid=0x5c32 in Object.wait() [0x00007f4d3a183000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2031)
        - locked <0x000000032bc7bd58> (a java.util.LinkedList)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2015)
        at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2113)
        - locked <0x000000032bc7ba80> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:70)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:103)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore.writeFile(FileSystemRMStateStore.java:528)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore.storeApplicationStateInternal(FileSystemRMStateStore.java:329)
        - locked <0x00000003bae88af0> (a org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.handleStoreEvent(RMStateStore.java:625)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:770)
        at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:765)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:173)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:106)
        at java.lang.Thread.run(Thread.java:662)

It seems the ResourceManager's async dispatcher is blocked in RMStateStore.storeNewApplication.
From the code we know there are two async dispatchers in the ResourceManager process. One is the main dispatcher for the whole ResourceManager, handling application submission, scheduling and other work. The other is the dispatcher inside the RMStateStore. Because the state store persists to HDFS or ZooKeeper, which is slow, it has its own dispatcher so that it does not stall the ResourceManager's main dispatcher.
Unfortunately, we use HDFS as the state-store backend. Deep inside the code:

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

The main dispatcher handles the event to store the new application's submission context. The method only forwards the event to the state-store dispatcher and returns immediately, yet it is synchronized. And in the subclass FileSystemRMStateStore, the code that writes the application state to HDFS is as follows:

  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String appIdStr = appId.toString();
    Path appDirPath = getAppDir(rmAppRoot, appIdStr);
    fs.mkdirs(appDirPath);
    Path nodeCreatePath = getNodePath(appDirPath, appIdStr);

    LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      writeFile(nodeCreatePath, appStateData);
    } catch (Exception e) {
      LOG.info("Error storing info for app: " + appId, e);
      throw e;
    }
  }

This method is also synchronized. So when the state-store dispatcher holds the lock and the HDFS write is slow, the main dispatcher ends up blocked waiting for that same lock. The lock is pointless for the main dispatcher's path, so we can simply drop it there.
There is a related JIRA for this problem: YARN-4398.
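A minimal sketch of the idea, not the actual upstream patch: the forwarding method only builds an ApplicationState and enqueues an event for the state store's own dispatcher, so it does not need the store-wide lock.

      // "synchronized" removed: nothing here touches state guarded by the
      // store-wide lock, so the RM main dispatcher no longer has to wait for
      // a slow HDFS write to finish.
      public void storeNewApplication(RMApp app) {
        ApplicationSubmissionContext context = app
                                                .getApplicationSubmissionContext();
        assert context instanceof ApplicationSubmissionContextPBImpl;
        ApplicationState appState =
            new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
              app.getUser());
        dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
      }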

Druid Architecture Overview (Part 2)

1. Introduction

This continues the previous post on Druid's architecture, which covered the basic architecture and how to use it. This post describes using the Realtime Index Service (the indexing service) in place of the realtime nodes introduced earlier to handle realtime ingest, index building, hand-off, and related tasks.
First, some differences between the realtime node and the indexing service:
(figure: comparison of realtime nodes and the indexing service)
As the comparison shows, once the Druid cluster grows large, using the Realtime Index Service becomes a necessity.

2. Architecture and Workflow

Compared with the architecture described in the previous post, a Druid deployment using the Realtime Index Service adds several components. The current architecture is shown below:
(figure: Druid push-mode architecture)
The previous post mainly covered Druid's pull mode: each realtime node pulls data from Kafka or a similar source, builds the index, and hands it off to the historical nodes. As our Druid workloads and cluster grew, managing the realtime nodes became very tedious, so Druid developed a push mode to solve this. This is a problem most distributed systems eventually face: making deployment and operations simple and automated.
This post focuses on the push mode, which adds several roles: the Overlord node, the MiddleManager node, the Peon, and the client-side Tranquility. Each module's function and the overall workflow are described below.

(1) Roles

1. Tranquility
The client-side sender. Users push data into Druid in real time through Tranquility. Tranquility talks to ZooKeeper and to the Overlord, and sends the rows whose timestamps fall in the valid window on to the Peons.
2. Overlord
Assigns tasks to the MiddleManagers, similar to the YARN ResourceManager.
3. MiddleManager
Starts a Peon for each task assigned to it and tracks the Peon's state after launch, similar to the YARN NodeManager.
4. Peon
The Peon takes over most of the realtime node's work. It is started by the MiddleManager and runs as a separate process.

(2) Workflow

1. The user's spec is defined in Tranquility. Tranquility initializes from the spec, looks up the Overlord's address in ZooKeeper, and talks to the Overlord.
2. When the Overlord receives a new ingestion task, it reads the node information in ZooKeeper, picks a MiddleManager on which to start a Peon, and writes the assignment back to ZooKeeper.
3. The MiddleManager watches ZooKeeper; when it sees a new task assigned to it, it starts a Peon process and monitors the Peon's state.
4. The Peon's workflow is basically the same as the realtime node's, except that the Peon receives data over an HTTP interface, whereas the realtime node mostly uses internal threads that keep pulling data from Kafka.
5. Tranquility then obtains the Peon's host and port from ZooKeeper and keeps sending data to the Peon.
6. Following the spec, the Peon builds the index at fixed intervals (or once enough data accumulates) and hands it off to deep storage (HDFS).
7. The Coordinator then uses the Peon's information in ZooKeeper to write the segment metadata into the SQL metadata store and assigns a historical node to pull the index from deep storage.
8. The historical node pulls the index from deep storage to local disk and rebuilds it in memory, which completes the data flow.

3. Summary

With the push mode of the Realtime Index Service, deploying, operating, and managing Druid becomes much simpler and easier. Later posts will dig into the Druid code.

Recursive Tree Rendering in JavaScript

var treeData = {
        name: 'root',
        children: [{
            name: 'child1',
            children: [{
                name: 'child1_1',
                children: [{
                    name: 'child1_1_1'
                }]
            }]
        }, {
            name: 'child2',
            children: [{
                name: 'child2_1'
            }]
        }, {
            name: 'child3'
        }]
    };
    var strArr = [treeData.name];
    // Recursively render the tree. The key is abstracting the recursion parameters: node (the current node), rootOrder (tracks the hierarchy level), fn (renders each node).
    function goThroughTree(node, rootOrder, fn) {
        var children = node.children || [];
        if (children.length) {
            for (var i = 0; i < children.length; i++) {
                var item = children[i];
                var index = i + 1;
                var order = rootOrder ? rootOrder + '.' + index : index;
                fn(item, order);
                goThroughTree(item, order, fn);
            }

        }
    }
    goThroughTree(treeData, 0, function (item, order) {
        strArr.push('<div>')
        strArr.push(order);
        strArr.push(item.name);
        strArr.push('</div>');
    });
    document.write(strArr.join(''))

The result:
root
1child1
1.1child1_1
1.1.1child1_1_1
2child2
2.1child2_1
3child3