Some Notes About Ops in Druid OLAP System

I have talked about Druid a lot on this blog, including the architecture of Druid and how to change the time zone in Druid. This post focuses on the basic operations in Druid that we perform every day.

First, how to write the Hadoop MapReduce index spec file:


{
  "dataSchema" : {
    "dataSource" : "ingestion_test",
    "parser" : {
      "type" : "hadoopyString",
      "parseSpec" : {
        "format" : "tsv",
        "timestampSpec" : {
          "column" : "dt",
          "format" : "posix"
        },
        "dimensionsSpec" : {
          "dimensions": ["grade","src_flag","school_id","gender_desc","prov_name","city_name"]
        },
        "delimiter":"\u0001",
        "listDelimiter":"\u0002",
        "columns":  ["dt","uid","grade","src_flag","school_id","gender_desc","prov_name","city_name","flag"]
      }
    },
    "metricsSpec" : [
              {
                    "type": "count",
                    "name": "count_druid"
              },
              {
                    "type": "hyperUnique",
                    "name": "uv",
                    "fieldName" : "uid"
              },
              {
                    "type": "longSum",
                    "name": "count",
                    "fieldName" : "flag"
              }
    ],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "HOUR",
      "queryGranularity" : "NONE",
      "intervals" : [ "2017-3-13/2017-3-14" ]
    }
  },
  "ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : {
      "type" : "static",
      "paths" : "/data//000000_0"
    },
    "metadataUpdateSpec" : {
                "type":"mysql",
                "connectURI":"jdbc:mysql://ip:3306/druid",
                "password" : "password",
                "user" : "user",
                "segmentTable" : "druid_segments"
    },
    "segmentOutputPath" : "hdfs://ns1/user/druid/localStorage"
  },
  "tuningConfig" : {
    "type" : "hadoop",
    "workingPath": "hdfs://ns1/user/druid/localStorage/workdir",
    "partitionsSpec" : {
      "type" : "hashed",
      "numShards" : 3
    },
    "shardSpecs" : { },
    "leaveIntermediate" : false,
    "cleanupOnFailure" : true,
    "overwriteFiles" : false,
    "ignoreInvalidRows" : false,
    "jobProperties" : { },
    "combineText" : false,
    "persistInHeap" : false,
    "ingestOffheap" : false,
    "bufferSize" : 134217728,
    "aggregationBufferRatio" : 0.5,
    "rowFlushBoundary" : 300000,
    "useCombiner" : true,
    "buildV9Directly" : true
  }
}

In the spec file, you can control the number of reducers with the numShards parameter.
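
To actually run the ingestion I submit the spec with the standalone Hadoop indexer CLI. This is only a sketch: the classpath mirrors the commands used elsewhere in this post, and the spec file name is just an example.

java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -classpath config/_common:/usr/local/hadoop-2.4.0/etc/hadoop:lib/* \
  io.druid.cli.Main index hadoop hadoop_index_spec.json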

Second, an example spec file that writes directly to Druid using Tranquility:


{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "main_static_log_tranq1",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "timestamp",
                "format": "posix"
              },
              "dimensionsSpec": {
                "dimensions": ["typeSignII",  "typeSignI", "typeSignIII", "typeSignIV",  "responseCode",  "processTotalTime", "serverIp", "terminal", "type", "service"],
                "dimensionExclusions": [],
                "spatialDimensions": []
              }
            }
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            },{
              "type": "doubleSum",
              "name": "mProcessTotalTime",
              "fieldName" : "mProcessTotalTime"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "SIX_HOUR",
            "queryGranularity": "MINUTE"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": 100000,
          "intermediatePersistPeriod": "PT10m",
          "windowPeriod": "PT60m"
        }
      },
      "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "2"
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "10.39.2.161:2181",
    "druid.selectors.indexing.serviceName": "overlord",
    "druid.discovery.curator.path": "/druid/discovery",
    "druidBeam.overlordPollPeriod": "PT20S"
  }
}

You can set the number of partitions and the replication factor with the task.partitions and task.replicants parameters.
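
If you run this through the standalone Tranquility distribution (rather than embedding the Tranquility library in your own application), a config file like the one above can usually be started like this; the file name is just an example:

bin/tranquility server -configFile main_static_log.json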

How to Submit a Merge Job to Index Service in Druid

Normally, we use the indexing service instead of realtime nodes in Druid to ingest realtime data. If there are multiple partitions in the same time interval and each of them is small, you should merge them into one bigger segment to improve query efficiency.
For example, we have two segments in the same time interval, as shown below:

two segments on the same time interval
What we want to do is merge them together. Here is how to write the merge.json file and submit it:


{
    "type": "merge",
    "dataSource": "main_static_log_tt",
    "aggregations": [
                {
                    "type": "count",
                    "name": "count"
                },{
                    "type": "doubleSum",
                    "name": "mProcessTotalTime",
                    "fieldName" : "mProcessTotalTime"
                }
    ],
    "rollup": "false",
    "segments": [
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/0/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":129991,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z"},
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/1/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":190243,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z_1"}
    ]
}

Remember to change the shardSpec type to none, because the merge task only merges segments of that type; it ignores hashed or linear shard specs. Simply changing the type to none works around the restriction, but that workaround has some problems of its own; in a later post I will talk about how to change the code to make it work properly.
After editing the JSON file, you can submit it to your overlord node as below:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @merge.json
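
The overlord responds with the task id; you can then poll the task status on the overlord (replace <taskId> with the returned id):

curl http://host:port/druid/indexer/v1/task/<taskId>/status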

Kill the Job

Sometimes you just want to run a test, and later on you can kill the task to free the slot in the indexing service. Here is how to write the kill.json file and submit it:


{
    "type": "kill",
    "id": "sbsina",
    "dataSource": "main_static_log_tt",
    "interval": "2017-03-22T07:47:00.000Z/2017-03-28T07:48:00.000Z"
}

Submit it:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @kill.json
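
To confirm the task was accepted, or to find the id of a task you want to get rid of, the overlord can also list tasks; these endpoints work on the versions I have used, adjust if yours differs:

curl http://host:port/druid/indexer/v1/runningTasks
curl http://host:port/druid/indexer/v1/completeTasks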

Disable a Middle Manager Before an Update

Submit a POST request to the middle manager's HTTP port:

curl -X POST http://ip:port/druid/worker/v1/disable
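
After the update is done, the worker can be enabled again, and you can check its current state; these are the counterpart endpoints on the same middle manager port (as far as I know, they behave the same across recent versions):

curl -X POST http://ip:port/druid/worker/v1/enable
curl http://ip:port/druid/worker/v1/enabled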

Druid Ingest Format Issue

 

When using Druid we need to index historical data. The historical data lives in Hive tables whose field delimiter is \001, so Druid's ingest format has to support arbitrary delimiters. Here is a parser spec that works:

"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "dt",
"format" : "posix"
},
"dimensionsSpec" : {
"dimensions": ["grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov"]
},
"delimiter":"\u0001",
"listDelimiter":"\u0002",
"columns": ["dt","uid","grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov","flag"]
}
},

We specify the parse format as tsv and set delimiter to \u0001. listDelimiter is only used for multi-value dimensions; we do not use it at the moment, so it is simply defined as \u0002. After that, just start the task.
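
To double-check that the exported Hive file really uses \001 as the delimiter, something like the following helps; cat -v prints \001 as ^A (the field values shown here are made up):

head -n 1 /data/000000_0 | cat -v
# 1489363200^A123456^A3^A1^A...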

A Summary of Druid Batch Job Issues

Recently I have been writing down the problems we ran into with Druid batch jobs; I record them one by one below. The Druid version I use is 0.8.2, so the following applies to 0.8.2.
1. Dependency issues
Many of our machines cannot reach the public Internet, so we have to modify the Druid source so that it downloads dependencies from our local repository. The file to modify is ExtensionsConfig.java, and the change is as follows:

   private List<String> remoteRepositories = ImmutableList.of(
-      "https://repo1.maven.org/maven2/",
-      "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local"
+      "http://10.39.0.110:8081/nexus/content/groups/public",
+      "http://10.39.0.110:8081/nexus/content/repositories/thirdparty"
   );

With this change, the required packages are downloaded from our internal repository.
After the change we can pull the required packages into a local repository for job submission. Modify $DRUID_HOME/config/_common/common.runtime.properties:

druid.extensions.localRepository=/usr/local/druid-0.8.2/localRepo

to point at the local repository location. Then run the following under $DRUID_HOME:

java -cp  config/_common:config/broker:/usr/local/hadoop-2.4.0/etc/hadoop:lib/*  io.druid.cli.Main tools pull-deps

This downloads the dependencies into the local repository.
If you need to add your own third-party dependencies, also modify $DRUID_HOME/config/_common/common.runtime.properties:

druid.extensions.coordinates=["com.sina.hivexec:hive-exec:0.13.0"]

so that the dependency is added to the classpath when the job is submitted.

2. Counter issues
Our problem was that a third-party jar caused an exception while fetching the MapReduce counters, which then failed the whole task. The current fix is to modify the source to catch the exception, check whether it really came from reading the counters, and check whether the job itself succeeded; if it did, continue with the next stage instead of throwing the exception and aborting.
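
A minimal Java sketch of that workaround (this is not the actual Druid patch; `job` is the org.apache.hadoop.mapreduce.Job that has just finished):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CounterWorkaround {
    // Tolerate a counter-fetch failure as long as the job itself succeeded.
    public static Counters getCountersIfPossible(Job job) throws Exception {
        try {
            return job.getCounters();
        } catch (Exception e) {
            // A broken third-party jar can make getCounters() throw even
            // though the job finished fine; only fail if the job failed.
            if (!job.isSuccessful()) {
                throw e;
            }
            return null; // counters unavailable, but the job succeeded
        }
    }
}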

3. Number of reducers
A quick note on Druid's batch indexing flow, covering only the case where the partitionsSpec type is hashed.
1. If numShards is not specified, there are two jobs. The first job uses HyperLogLog to estimate the cardinality of each time partition, and its reducers output the cardinality per partition.
The job then decides, based on targetPartitionSize (see the snippet after this list), how many shards will run the second job. The second job builds the index; the basic flow is the same as realtime ingestion: generate the index from the log lines. If there is only one shard, only one reducer runs, which is slow. For example, if the cardinality is 20000 and "targetPartitionSize" : 10000, each time partition gets 20000/10000 = 2 reducers.
2. If numShards is specified, there is only the index job, and each time partition starts numShards reducers. If you roughly know the data volume and cardinality, you can specify numShards directly.
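
For reference, the targetPartitionSize variant of the hashed partitionsSpec mentioned in case 1 looks like this (10000 is just the value from the example above):

"partitionsSpec" : {
  "type" : "hashed",
  "targetPartitionSize" : 10000
}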

4. Time zone issues
Since the job is submitted with the UTC time zone, the map and reduce phases must also run in UTC. To do that, modify mapred-site.xml on the machine that submits the job:

  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx1280M -Duser.timezone=UTC</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx1536M -Duser.timezone=UTC</value>
  </property>
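
Alternatively, since tuningConfig has a jobProperties field (shown empty in the spec at the top of this post), the same JVM options can be passed per job instead of editing mapred-site.xml; I have mostly used the mapred-site.xml approach, so treat this as an untested sketch:

"tuningConfig" : {
  "type" : "hadoop",
  "jobProperties" : {
    "mapreduce.map.java.opts" : "-Xmx1280M -Duser.timezone=UTC",
    "mapreduce.reduce.java.opts" : "-Xmx1536M -Duser.timezone=UTC"
  }
}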

About the Time Zone Issue in Druid

When getting started with Druid, the time zone is one of the more annoying issues. Druid uses Joda-Time internally and defaults to UTC everywhere, while China uses Asia/Shanghai (UTC+8), eight hours ahead, so by default both ingestion and queries are off by eight hours.
To solve this we can work on both the ingest side and the query side. First, ingest. The default format is:

{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

This format follows ISO 8601 and uses UTC. If you insist on this format, you need a realtime system in front of Druid to rewrite the timestamps into Asia/Shanghai, which is usually very expensive.
So the second approach is recommended: when defining how the timestamp is parsed, use the following format:

                    "timestampSpec": {
                        "column": "timestamp",
                        "format": "posix"
                    },

That is, use POSIX (epoch) time instead of the default string format. POSIX time is the same everywhere in the world, which avoids the conversion problem entirely.
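
If your upstream data has string timestamps, converting them to POSIX time is straightforward; for example with GNU date:

TZ=Asia/Shanghai date -d '2017-03-13 00:00:00' +%s
# 1489334400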
For queries, we can specify the time zone of the query, as follows:

    "granularity": {
        "type": "period",
        "period": "PT1H",
        "timeZone": "Asia/Shanghai"
    },

With the time zone specified in the query, the results are converted automatically and the eight-hour offset problem is avoided.
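
Putting it together, a hypothetical timeseries query against the Tranquility datasource above would look like this (POST it to the broker's /druid/v2 endpoint); the interval is just an example:

{
  "queryType": "timeseries",
  "dataSource": "main_static_log_tranq1",
  "granularity": {
    "type": "period",
    "period": "PT1H",
    "timeZone": "Asia/Shanghai"
  },
  "aggregations": [
    { "type": "longSum", "name": "count", "fieldName": "count" }
  ],
  "intervals": [ "2017-03-13T00:00:00+08:00/2017-03-14T00:00:00+08:00" ]
}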

How to Avoid Druid Filling Up the /tmp Directory

Recently I have noticed that when I start some of the realtime nodes, Druid easily fills up the /tmp directory with files that all start with filePeon.
After investigating the code and the configuration of Druid, I found that Druid writes the index files under druid.indexer.task.baseDir, whose default value is System.getProperty("java.io.tmpdir").
So we can set java.io.tmpdir to another directory when we start the realtime node, as below:

java -Djava.io.tmpdir=/data0/druid/tmp -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=25g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/wb_ad_interest_druid.spec -classpath config/_common:config/realtime:/usr/local/hadoop-2.4.0/etc/hadoop:lib/* io.druid.cli.Main server realtime
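
An alternative I have not fully tested: since the files go under druid.indexer.task.baseDir as noted above, it should also be possible to point that property at a bigger disk in the node's runtime.properties instead of overriding java.io.tmpdir:

druid.indexer.task.baseDir=/data0/druid/tmp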