Some Notes About Ops in Druid OLAP System

I have written about Druid a lot on this blog, including the architecture of Druid and how to change the time zone in Druid. This post focuses on the basic operations in Druid that we perform every day.

First, how to write a Hadoop MapReduce index spec file:


{
  "dataSchema" : {
    "dataSource" : "ingestion_test",
    "parser" : {
      "type" : "hadoopyString",
      "parseSpec" : {
        "format" : "tsv",
        "timestampSpec" : {
          "column" : "dt",
          "format" : "posix"
        },
        "dimensionsSpec" : {
          "dimensions": ["grade","src_flag","school_id","gender_desc","prov_name","city_name"]
        },
        "delimiter":"\u0001",
        "listDelimiter":"\u0002",
        "columns":  ["dt","uid","grade","src_flag","school_id","gender_desc","prov_name","city_name","flag"]
      }
    },
    "metricsSpec" : [
              {
                    "type": "count",
                    "name": "count_druid"
              },
              {
                    "type": "hyperUnique",
                    "name": "uv",
                    "fieldName" : "uid"
              },
              {
                    "type": "longSum",
                    "name": "count",
                    "fieldName" : "flag"
              }
    ],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "HOUR",
      "queryGranularity" : "NONE",
      "intervals" : [ "2017-3-13/2017-3-14" ]
    }
  },
  "ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : {
      "type" : "static",
      "paths" : "/data//000000_0"
    },
    "metadataUpdateSpec" : {
                "type":"mysql",
                "connectURI":"jdbc:mysql://ip:3306/druid",
                "password" : "password",
                "user" : "user",
                "segmentTable" : "druid_segments"
    },
    "segmentOutputPath" : "hdfs://ns1/user/druid/localStorage"
  },
  "tuningConfig" : {
    "type" : "hadoop",
    "workingPath": "hdfs://ns1/user/druid/localStorage/workdir",
    "partitionsSpec" : {
      "type" : "hashed",
      "numShards" : 3
    },
    "shardSpecs" : { },
    "leaveIntermediate" : false,
    "cleanupOnFailure" : true,
    "overwriteFiles" : false,
    "ignoreInvalidRows" : false,
    "jobProperties" : { },
    "combineText" : false,
    "persistInHeap" : false,
    "ingestOffheap" : false,
    "bufferSize" : 134217728,
    "aggregationBufferRatio" : 0.5,
    "rowFlushBoundary" : 300000,
    "useCombiner" : true,
    "buildV9Directly" : true
  }
}

In the spec file, you can set the number of reducers with the numShards parameter.
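
With the spec file written, one way to launch the job is the standalone command-line Hadoop indexer that ships with Druid of this vintage (a sketch; the heap size, Hadoop config path, and spec file name are placeholders for your own environment):

java -Xmx512m -Dfile.encoding=UTF-8 -Duser.timezone=UTC \
  -classpath lib/*:/etc/hadoop/conf \
  io.druid.cli.Main index hadoop ingestion_test_spec.json

As far as I know, the metadataUpdateSpec and segmentOutputPath fields above are what this CLI indexer uses to publish the finished segments.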

Second, an example spec file for writing directly to Druid using Tranquility:


{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "main_static_log_tranq1",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "timestamp",
                "format": "posix"
              },
              "dimensionsSpec": {
                "dimensions": ["typeSignII",  "typeSignI", "typeSignIII", "typeSignIV",  "responseCode",  "processTotalTime", "serverIp", "terminal", "type", "service"],
                "dimensionExclusions": [],
                "spatialDimensions": []
              }
            }
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            },{
              "type": "doubleSum",
              "name": "mProcessTotalTime",
              "fieldName" : "mProcessTotalTime"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "SIX_HOUR",
            "queryGranularity": "MINUTE"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": 100000,
          "intermediatePersistPeriod": "PT10m",
          "windowPeriod": "PT60m"
        }
      },
      "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "2"
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "10.39.2.161:2181",
    "druid.selectors.indexing.serviceName": "overlord",
    "druid.discovery.curator.path": "/druid/discovery",
    "druidBeam.overlordPollPeriod": "PT20S"
  }
}

You can set the number of partitions and replicas with the task.partitions and task.replicants parameters.
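
Assuming the JSON above is saved as conf/server.json (the file name is just an assumption), you would start Tranquility Server with its bundled script and then send events to its HTTP endpoint:

bin/tranquility server -configFile conf/server.json

Tranquility then creates and manages the indexing service tasks for main_static_log_tranq1 on its own.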

How to Submit a Merge Job to the Index Service in Druid

Normally, we use the indexing service instead of realtime nodes in Druid to ingest realtime data. If you end up with multiple partitions in one time interval, and each of them is small, you should merge them into one bigger segment to improve query efficiency.
For example, we have two segments in the same time interval, as shown below.

two segments on the same time interval
What we want is to merge them into one. Here is how to write the merge.json file and submit it:


{
    "type": "merge",
    "dataSource": "main_static_log_tt",
    "aggregations": [
                {
                    "type": "count",
                    "name": "count"
                },{
                    "type": "doubleSum",
                    "name": "mProcessTotalTime",
                    "fieldName" : "mProcessTotalTime"
                }
    ],
    "rollup": "false",
    "segments": [
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/0/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":129991,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z"},
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/1/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":190243,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z_1"}
    ]
}

Remember to change the shardSpec type to none, because the merge task only merges segments of that type; it ignores the hashed and linear types. We can work around this by simply changing the type to none, but that approach has some problems; in a later post I will talk about how to change the code to make it work properly.
After editing the JSON file, you can submit it to your overlord node as below:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @merge.json
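
The submit call returns a task ID, and you can poll the overlord for the task's status (a hedged example; <taskId> stands for the returned ID):

curl http://host:port/druid/indexer/v1/task/<taskId>/status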

Kill the Job

Sometimes you just want to run a test, and later on you can kill the task to free up the slot in the indexing service. Here is how to write the kill.json file and submit it:


{
    "type": "kill",
    "id": "sbsina",
    "dataSource": "main_static_log_tt",
    "interval": "2017-03-22T07:47:00.000Z/2017-03-28T07:48:00.000Z"
}

Submit it:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @kill.json
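
Note that a kill task of this form removes the segments for the given datasource and interval; if what you actually want is to stop a task that is still running and free its slot, the overlord also exposes a shutdown endpoint (a sketch; <taskId> stands for the ID returned when the task was submitted):

curl -X POST http://host:port/druid/indexer/v1/task/<taskId>/shutdown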

Disable a Middle Manager Before Updating

Submit a POST request to the middle manager's HTTP port:

curl -X POST http://ip:port/druid/worker/v1/disable
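
Once the worker has drained and the update is done, you can check its state and re-enable it through the same worker API (a sketch; these are the middle manager endpoints as I understand them):

# list the tasks still running on this worker
curl http://ip:port/druid/worker/v1/tasks

# check whether the worker is currently enabled
curl http://ip:port/druid/worker/v1/enabled

# re-enable the worker after the update
curl -X POST http://ip:port/druid/worker/v1/enable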

A NIC Configuration Scare

This is a cluster problem I ran into recently: a software-level symptom that turned out to be caused by hardware configuration. I first noticed it because some machines in our YARN-based realtime streaming system had very low TPS. A hardware problem did not come to mind at first, so I spent a long time debugging the streaming framework itself. Only when I found that even the ping latency was abnormal did I realize it was an OS configuration issue.
Here is the ping latency:

64 bytes from 10.39.****: icmp_seq=34 ttl=63 time=27.4 ms
64 bytes from 10.39.****: icmp_seq=35 ttl=63 time=21.2 ms
64 bytes from 10.39.****: icmp_seq=36 ttl=63 time=5.95 ms
64 bytes from 10.39.****: icmp_seq=37 ttl=63 time=16.5 ms
64 bytes from 10.39.****: icmp_seq=38 ttl=63 time=12.3 ms

And here is normal ping latency for comparison:

64 bytes from 10.39.****: icmp_seq=13 ttl=63 time=0.230 ms
64 bytes from 10.39.****: icmp_seq=14 ttl=63 time=0.203 ms
64 bytes from 10.39.****: icmp_seq=15 ttl=63 time=0.255 ms
64 bytes from 10.39.****: icmp_seq=16 ttl=63 time=0.229 ms

So we suspected the NIC. These machines bond two gigabit NICs, and /proc/interrupts showed that each NIC's interrupts were bound to a single core, which was running flat out and dropping packets. A proper configuration uses NIC multi-queue, pinning different queues to different cores to raise TPS. For background, see this article: 网卡多队列简介 (an introduction to NIC multi-queue).
Because the realtime system mostly sends small packets, the impact was especially severe. The fix is rather painful: after enabling multi-queue, the server has to be rebooted.
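
For reference, a minimal sketch of how to inspect and adjust interrupt affinity by hand (assuming the bonded slave is named eth0; the IRQ number 98 is just a placeholder taken from your own /proc/interrupts output):

# show the NIC's queues and which CPU has been handling each interrupt
grep eth0 /proc/interrupts

# check the current CPU affinity mask of one queue's IRQ (98 is a placeholder)
cat /proc/irq/98/smp_affinity

# pin that IRQ to CPU 2 (bitmask 0x4); repeat with a different core for each queue
echo 4 > /proc/irq/98/smp_affinity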