Some Notes About Ops in Druid OLAP System

I have talked about Druid a lot in blog, including the architecture of druid and how to change time zone in druid. This post will focus on the basic operation in druid which operate everyday.

First, how to write the Hadoop map reduce index spec file


{
  "dataSchema" : {
    "dataSource" : "ingestion_test",
    "parser" : {
      "type" : "hadoopyString",
      "parseSpec" : {
        "format" : "tsv",
        "timestampSpec" : {
          "column" : "dt",
          "format" : "posix"
        },
        "dimensionsSpec" : {
          "dimensions": ["grade","src_flag","school_id","gender_desc","prov_name","city_name"]
        },
        "delimiter":"\u0001",
        "listDelimiter":"\u0002",
        "columns":  ["dt","uid","grade","src_flag","school_id","gender_desc","prov_name","city_name","flag"]
      }
    },
    "metricsSpec" : [
              {
                    "type": "count",
                    "name": "count_druid"
              },
              {
                    "type": "hyperUnique",
                    "name": "uv",
                    "fieldName" : "uid"
              },
              {
                    "type": "longSum",
                    "name": "count",
                    "fieldName" : "flag"
              }
    ],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "HOUR",
      "queryGranularity" : "NONE",
      "intervals" : [ "2017-3-13/2017-3-14" ]
    }
  },
  "ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : {
      "type" : "static",
      "paths" : "/data//000000_0"
    },
    "metadataUpdateSpec" : {
                "type":"mysql",
                "connectURI":"jdbc:mysql://ip:3306/druid",
                "password" : "password",
                "user" : "user",
                "segmentTable" : "druid_segments"
    },
    "segmentOutputPath" : "hdfs://ns1/user/druid/localStorage"
  },
  "tuningConfig" : {
    "type" : "hadoop",
    "workingPath": "hdfs://ns1/user/druid/localStorage/workdir",
    "partitionsSpec" : {
      "type" : "hashed",
      "numShards" : 3
    },
    "shardSpecs" : { },
    "leaveIntermediate" : false,
    "cleanupOnFailure" : true,
    "overwriteFiles" : false,
    "ignoreInvalidRows" : false,
    "jobProperties" : { },
    "combineText" : false,
    "persistInHeap" : false,
    "ingestOffheap" : false,
    "bufferSize" : 134217728,
    "aggregationBufferRatio" : 0.5,
    "rowFlushBoundary" : 300000,
    "useCombiner" : true,
    "buildV9Directly" : true
  }
}

In the spec file ,you can assign reduce number in numShareds parameters.

Second, the example spec file which directly write to druid using tranquility


{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "main_static_log_tranq1",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "timestamp",
                "format": "posix"
              },
              "dimensionsSpec": {
                "dimensions": ["typeSignII",  "typeSignI", "typeSignIII", "typeSignIV",  "responseCode",  "processTotalTime", "serverIp", "terminal", "type", "service"],
                "dimensionExclusions": [],
                "spatialDimensions": []
              }
            }
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            },{
              "type": "doubleSum",
              "name": "mProcessTotalTime",
              "fieldName" : "mProcessTotalTime"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "SIX_HOUR",
            "queryGranularity": "MINUTE"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": 100000,
          "intermediatePersistPeriod": "PT10m",
          "windowPeriod": "PT60m"
        }
      },
      "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "2"
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "10.39.2.161:2181",
    "druid.selectors.indexing.serviceName": "overlord",
    "druid.discovery.curator.path": "/druid/discovery",
    "druidBeam.overlordPollPeriod": "PT20S"
  }
}

You can assign partitions and replications using task.partitions and task.replicants parameters.

How to Submit a Merge Job to Index Service in Druid

Normally, we use index service instead of realtime nodes in Druid to ingest realtime data. If you have multiple partitions in one time, and each of them is small, you have to merge them together to form a big segments to boost query effeciency.
For example we have two segments in the same time interval, just as below

two segments on the same time interval
What we care is to merge them together. Here is how to write the merge.json file and submit:


{
    "type": "merge",
    "dataSource": "main_static_log_tt",
    "aggregations": [
                {
                    "type": "count",
                    "name": "count"
                },{
                    "type": "doubleSum",
                    "name": "mProcessTotalTime",
                    "fieldName" : "mProcessTotalTime"
                }
    ],
    "rollup": "false",
    "segments": [
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/0/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":129991,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z"},
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/1/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":190243,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z_1"}
    ]
}

Remember to change the shardSpec type to none, because the merge function only merge that type, it ignore hash or linear type. But we can avoid it, we just change the type to none, it has some problem, later post i will talk about how to change the code and make it work.
After edit the json file, you can submit to your overlord node as below:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @merge.json

Kill the Job

Some time, you just want to take a test and later on you can kill the task to free the slot in index service. Here is how to write kill.json file and submit it:


{
    "type": "kill",
    "id": "sbsina",
    "dataSource": "main_static_log_tt",
    "interval": "2017-03-22T07:47:00.000Z/2017-03-28T07:48:00.000Z"
}

Submit it:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @kill.json

Disable Middle Manager to Update

submit a post to middle manager http port:

curl -X POST http://ip:port/druid/worker/v1/disable

Druid Ingest Format问题

 

Druid使用过程中需要对历史数据进行索引,由于历史数据都是hive表形式,分隔符为\001,所以需要druid对ingest的format 任意delimiter进行支持,以下是支持的形式:

"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "dt",
"format" : "posix"
},
"dimensionsSpec" : {
"dimensions": ["grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov"]
},
"delimiter":"\u0001",
"listDelimiter":"\u0002",
"columns": ["dt","uid","grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov","flag"]
}
},

指定解析格式是tsv,delimiter为\u0001,listDelimiter是multi时候使用,目前没有使用,定义为\u0002,启动任务即可。

Apache Kylin 安装说明

Kylin运行依赖于Hive,MR,Hbase等,所以Kylin的安装需要先配置好Hive,Hadoop,Hbase环境才行。
首先要注意的是,Kylin运行提交任务默认会认为MR任务所在的文件夹jar包,在集群其它机器上相同的位置也有相应的jar包。比如hadoop-mapreduce-client-core-2.4.0.jar位置在kylin机器上为 /usr/local/hadoop, 那么默认运行时候在集群中的机器上jar包的位置也需要在同样的位置。这样的话,我们就需要根据Hadoop集群位置来命名Kylin机器上hadoop安装包路径。
其次就是安装hive,hbase,并且注意的是我们采用的是跑kylin构建任务的Hadoop集群与提供查询服务的Hbase底层Hadoop集群分离的架构,需要跑任务的集群能够有权限访问Hbase集群中Hadoop的各个节点。
最后是关于Kylin的配置,Kylin的配置都在$KYLIN_HOME/conf/kylin.properties
需要注意的几个选项:
kylin.metadata.url=kylin_metadata@hbase kylin元数据存储
kylin.hdfs.working.dir=/user/jiangyu2/kylin Kylin构建过程结果存储路径
kylin.hbase.cluster.fs=hdfs://ns-test 如果是两个hdfs,需要设置hbase的hdfs
kylin.hbase.cluster.hdfs.config.file=hbase.hdfs.xml 如果是两个集群,需要设置hbase的conf ,位置在$KYLIN_HOME/conf下面的hbase.hdfs.xml
最后需要修改Hbase的一个配置

  <property>
      <name>hbase.use.dynamic.jar</name>
      <value>false</value>
  </property>

另外,Kylin的权限可以通过LDAP来管理的,也可以通过硬编码,如果采用硬编码,修改位置在$KYLIN_HOME/tomcat/webapps/kylin/WEB-INF/classes/kylinSecurity.xml

        <beans profile="testing">
                <!-- user auth -->
                <bean id="passwordEncoder" class="org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder" />

                <scr:authentication-manager alias="testingAuthenticationManager">
                        <scr:authentication-provider>
                                <scr:user-service>
                                        <scr:user name="MODELER" password="$2a$10$Le5ernTeGNIARwMJsY0WaOLioNQdb0QD11DwjeyNqqNRp5NaDo2FG" authorities="ROLE_MODELER" />
                                        <scr:user name="ANALYST" password="$2a$10$s4INO3XHjPP5Vm2xH027Ce9QeXWdrfq5pvzuGr9z/lQmHqi0rsbNi" authorities="ROLE_ANALYST" />
                                        <scr:user name="ADMIN" password="$2a$10$o3ktIWsGYxXNuUWQiYlZXOW5hWcqyNAFQsSSCSEWoC/BRVMAUjL32" authorities="ROLE_MODELER, ROLE_ANALYST, ROLE_ADMIN" />
                                </scr:user-service>
                                <scr:password-encoder ref="passwordEncoder" />
                        </scr:authentication-provider>
                </scr:authentication-manager>
        </beans>

可以新增加用户,新增加的用户密码由下面的代码来实现

package search;

import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;

/**
 * Created by jiangyu2 on 12/7/16.
 */
public class F {
  public static void main(String[] args) {
    String password = "suda_test";
    PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
    String encodedPassword = passwordEncoder.encode(password);
    System.out.print(encodedPassword);
  }
}

关于Druid时区的问题

Druid使用过程中,一开始比较烦恼的是关于时区的问题。Druid内部采用joda作为时间函数lib,并且内部默认使用的都是UTC时间,而中国实用的是Asia/Shanghai时间,为东八区,差了八个小时,导致默认的数据的ingest与query都存在八个小时的时差。
为了解决这一问题,我们可以从ingest与query两方面进行处理。首先是ingest,默认的格式为

{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

这种格式符合ISO 8601,使用的是UTC时间,如果非要用这种格式,需要前端架一个实时系统,把时间戳改为Asia/Shanghai,通常来说这么做非常费。
所以推荐第二张方法,就是在定义timestamp的解析时候才用以下格式

                    "timestampSpec": {
                        "column": "timestamp",
                        "format": "posix"
                    },

使用posix时间而不是默认的string。这样的话posix时间全球都是统一的,规避了时间转化的问题。
对于query的问题,我们可以指定查询的时区,如下

    "granularity": {
        "type": "period",
        "period": "PT1H",
        "timeZone": "Asia/Shanghai"
    },

这样查询的过程指定了时区,也可以自动转化,防止了时差问题。

How to Avoid Druid Write /Tmp Directory Full

Recently, i have noticed when i started some of the realtime node, it is easy for druid to write /tmp directory full. The file is all start with filePeon.
After i investigate the code and the configuration of druid, i found druid write the index file in druid.indexer.task.baseDir, and the default value is System.getProperty(“java.io.tmpdir”).
So we can set java.io.tmpdir to another directory when we start the realtime node as below:

java -Djava.io.tmpdir=/data0/druid/tmp -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=25g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/wb_ad_interest_druid.spec -classpath config/_common:config/realtime:/usr/local/hadoop-2.4.0/etc/hadoop:lib/* io.druid.cli.Main server realtime

Druid 系统架构说明(二)

一.说明

续前一篇Druid 系统架构说明 ,主要介绍了druid的基本架构以及使用说明。本篇更新内容,主要介绍的是使用Realtime Index service 替代之前介绍的realtime node来完成实时ingest,index build,hand off等任务。
首先要说明一下realtime node与index server的一些区别:
Alt text
可以看出当druid集群规模增大时,使用Realtime Index Service是必须的。

二.架构与流程

相比于之前博客缩写的架构,使用Realtime Index Service的Druid系统增加了几个组件,现在的系统架构图如下:
Druid推模式
上一篇博客主要介绍的是druid的拉模式,数据通过不同的Realtime Node通过kafka等拉取数据,建立索引,handoff到Historical Node。随着Druid业务增多,规模扩大,对Realtime Node的管理变成了非常繁琐的事情,所以Druid开发了推模式,解决这一问题。相信这也是很多分布式系统应用最后都需要解决的问题,就是使部署运维简单化,自动化。
这一篇主要介绍的是推模式,推模式增加了一些角色,分别是Overlord Node, MiddleManager Node, peon以及客户端的Tranquility. 下面一一介绍各个模块的功能以及流程。

(一)角色

1.Tranquility
客户端发送工具,用户通过Tranquility将数据实时的发送到Druid中。Tranquility负责与Zk通信,与Overlord交互,根据timestamp将有效数据发送到Peon中。
2.Overlord
负责分配任务到不同的Middle Manager中,类似于ResourceManager。
3.Middle Manager
负责根据不同的任务启动Peon,并且负责Peon启动后运行的状态,类似于NodeManager。
4.Peon
Peon代替了Realtime Node的大部分功能,通过Middle Manager启动,以独立进程的形式启动。

(二)流程说明

1.用户的spec文件在Tranquility中定义,首先Tranquility通过spec初始化,获得zk中Overlord的地址,与Overlord通信。
2.Overlord得到新写入任务后,查询zk节点信息,选择一个Middle Manager节点启动来启动peon,并将信息写入到zk中。
3.Middle Manager一直监控zk,发现有新的任务分配后,启动一个Peon进程,并监控Peon进程的状态。
4.Peon与Realtime Node流程基本一致,所不同的是Peon使用的是HTTP接口来接收数据,RealTime Node更多的是内部的线程不断的拉取Kafka的数据。
5.Tranquility随后通过zk获取Peon机器地址和端口,将数据不断的发送到Peon中。
6.Peon根据spec规则,定时或者定量将数据build index,handoff到deep storage(HDFS)中。
7.随后就是Coordinator根据Peon在zk中信息,将数据写入到sql中,并分配Historical Node去deep storage拉取index数据。
8.Historical Node到deep storage拉取index数据到本地,重建index到内存中,至此数据流入完成。

三.总结

通过realtime index service的推模式,Druid的部署运维管理更加简单,易用度更高。后面一些blog会对Druid代码进行分析。

Druid OLAP系统架构简单介绍

一、基本说明

    Druid是由metamarkets的技术人员开发,用于向买家、卖家、广告主进行广告展示的底层实时分析平台。

    Druid主要解决的是对实时数据以及较近时间的历史数据的多维查询提供高并发(多用户),低延时,高可靠性的问题。对比Druid与其他解决方案,Kylin对数据按照分区每天构建前一天的cube数据提供给用户查询,用户查询的是历史数据。而Druid不断的从ingest去拉取数据,持续构建cube,提供实时查询。

    OLAP中数据由dimensions+metrics组成,所有查询都是通过metrics计算加上dimensions的filter完成的。

 

二、角色与架构

    Druid中分以下几个角色:

    1、Realtime-node

    负责从数据源(ingest)拉取数据,对数据进行index,根据时间窗口将数据按照segment persist到磁盘,最终上传到deep storage(HDFS)中,同时开放http查询服务,提供用户热数据的查询。

    2、Historical-node

    负责从deep storage中拉取segment数据,并重建index,提供给用户查询历史数据。

    3、Coordinator-node

    负责管理Historical-node,以及segment数据的放置策略。

    4、Broker-node

    负责将查询route到Realtime-node或者Historical-node中,并将结果缓存

 

    Druid用到的依赖:

    1、Ingestion输入源

    常用的为Kafka,rabbitmq, twitter数据等。

    2、Zookeeper

    负责协调各个角色之间的任务启动停止等。

    3、Mysql

    负责保存segment的源信息以及config,rules等信息。

    4、Deep Storage

    持久存储,负责保存segment数据,常用的是Hdfs, Amazon S3等分布式存储。

 

 

Druid架构:

Druid01

三、流程说明

(一)Realtime-node

    Druid接收的数据都带有时间戳,时间戳将segment之间进行分离。同时每个Realtime-node启动的时候都需要指定spec,比如下面的一个示意: 

    spec需要提供给Realtime-node以下参数

Druid02

    spec需要提供给Realtime-node以下参数:

    1、dataSource:唯一标识

    2、dimensions:维度名称,如何从原始数据解析

    3、metrics:metrics类型

    4、granularitySpec:segment的persist时间粒度

    5、firehose:输入源,设置为kafka,需要提供给kafka zk cluster地址,以及feed

    6、tuningConfig:windwsPeriod, 日志时间戳如果不在windowsPeriod之内就会被直接丢弃。intermediatePersistPeriod,保存数据到     realtime-node本地磁盘时间间隔。

 

 

    流程:

Druid03

    1、原始数据通过ingest拉取后需要根据之前定义的dimensions与metrics进行解析,解析后的数据为以后要处理的数据InputRow

    2、随后Input row根据timestamp来检查时间范围,如果超出了之前定义的time window就直接丢弃,如果在segment时间范围内首先看一下segment是否已经生成,如果没有生成就生成该segment,并通过zk cluster写入相应的数据,通知其它角色,随后进行下一步index

    3、对于实时的数据都是通过IncrementalIndex的子类OnHeapIncrementalIndex来进行index,首先会对row封装为TimeAndDims类,这个类包含了时间和dimensions的key和value(名称和取值),将TimeAndDims put到factTable的hash中,同时计算metrics,就是更新当前的metrics值,比如对于MAX的metrics和SUM的metircs更新为:

Druid04

    4、随后查看一下当前的FireHydrant(index 和segment的封装)是否需要persist到磁盘中,规则是根据当前接收的条数与时间来确定的,如果符合条件就persist到磁盘中。

    persist分为两步,第一步是kafka client将offset写入到zk cluster中。

    第二部是本地persist,过程为

    (1)生成临时目录写入数据

    (2)生成index.drd文件,包含了版本,dimensions名称,metrics类型,该segment起始与结束时间以及bitmap类型(用于建倒排索引)

    (3)对dimension进行处理,这一步对dimension求出各个dimension基数,用于建字典,同时建立bitmap,这步将所有基数按照维度写入到dim_开头的文件中

    (4)对metircs进行处理,将metrics写入到metrics文件中

    (5)建立倒排索引,根据bitmap,按照维度以及维度中的基数排序后建立倒排索引,写入inverted.drd文件中

    (6)将上述文件按照index.drd.invert.drd,spatial.drd,各种dim,各种metrics按照顺序concat

    (7)生成上述文件在concat文件中的索引位置的索引文件

    (8)修改上述文件位置,persist结束

    可见,persist的文件是按列存储的。

 

 

    同时,Realtime-node内部还有个线程根据保存的时间粒度来判断是否将segment上传到Deep storage中(HDFS).

 

(二)Historical-node

    Historical-node主要的任务就是不断的从coordinator中获取需要下载的Segment文件,下载Segment文件并且根据索引建立QuerableIndex结构,用于查询。

    QueryableIndex结构对应了persist到本地后的索引文件,包含了查询需要的信息,尤其是bitmap。

    Historical-node中使用了apache Curator框架,简化了与Zk cluster的交互,Historical-node基本上就是封装了所有与zk的交互以及http服务。

 

(三)Coordinator

    Coordinator的实现类是Druid Coordinator,主要负责是于Mysql metadata交互,线程专门处理segment是否失效,是否删除以及move segment从某个Historical-node到其他Historical-node中。

 

 

(四)Query过程

    Query通过HTTP REST接口使用json来获取数据,Query可以作用于Broker,Historical-node,Realtime-node节点上,但是一般用于Broker上。

    目前Druid提供的查询有Aggregation Queries:Timeseries,TopN,GroupBy。 Metadata Queries:Time Boundary,Segment metadata以及Data Source metadata。 最后是search。

    针对Realtime-node和Historical-node的数据,query采用了不同的方式,对于实时数据,抽象类是IncrementalIndex,数据没有sort,同时数据的存储是行存储,而对于persist以后的数据抽象类是QueryabledIndex类,数据经过sort,存储是列存储。所以两种数据的检索方式不一样。

    首先通过解析json数据,获得需要检索的demison以及metric,然后根据不同的检索类型生成相应的QueryEngine,如TimeserieseQueryEngine,GroupbyQueryEngine等。

    QueryEngine内部的process方法去获取最终结果,首先生成cursor,根据是IncrementalIndex数据还是QueryabledIndex数据生成不同类型的cursor,每次advance one或者特定offset获取指定行数据,cursor advanced的同事通过aggregator对metrics进行运算,更新结果,最后cursor结束,将结果返回。

    由于用户query可能是由多个segment组成的,所以query是分发到不同的角色中处理,每个segment以一个任务形式放到线程池中执行,最后将所有数据汇总,以json的形式返回给用户。如下

Druid05