Druid OLAP系统架构简单介绍

一、基本说明

    Druid是由metamarkets的技术人员开发,用于向买家、卖家、广告主进行广告展示的底层实时分析平台。

    Druid主要解决的是对实时数据以及较近时间的历史数据的多维查询提供高并发(多用户),低延时,高可靠性的问题。对比Druid与其他解决方案,Kylin对数据按照分区每天构建前一天的cube数据提供给用户查询,用户查询的是历史数据。而Druid不断的从ingest去拉取数据,持续构建cube,提供实时查询。

    OLAP中数据由dimensions+metrics组成,所有查询都是通过metrics计算加上dimensions的filter完成的。

 

二、角色与架构

    Druid中分以下几个角色:

    1、Realtime-node

    负责从数据源(ingest)拉取数据,对数据进行index,根据时间窗口将数据按照segment persist到磁盘,最终上传到deep storage(HDFS)中,同时开放http查询服务,提供用户热数据的查询。

    2、Historical-node

    负责从deep storage中拉取segment数据,并重建index,提供给用户查询历史数据。

    3、Coordinator-node

    负责管理Historical-node,以及segment数据的放置策略。

    4、Broker-node

    负责将查询route到Realtime-node或者Historical-node中,并将结果缓存

 

    Druid用到的依赖:

    1、Ingestion输入源

    常用的为Kafka,rabbitmq, twitter数据等。

    2、Zookeeper

    负责协调各个角色之间的任务启动停止等。

    3、Mysql

    负责保存segment的源信息以及config,rules等信息。

    4、Deep Storage

    持久存储,负责保存segment数据,常用的是Hdfs, Amazon S3等分布式存储。

 

 

Druid架构:

Druid01

三、流程说明

(一)Realtime-node

    Druid接收的数据都带有时间戳,时间戳将segment之间进行分离。同时每个Realtime-node启动的时候都需要指定spec,比如下面的一个示意: 

    spec需要提供给Realtime-node以下参数

Druid02

    spec需要提供给Realtime-node以下参数:

    1、dataSource:唯一标识

    2、dimensions:维度名称,如何从原始数据解析

    3、metrics:metrics类型

    4、granularitySpec:segment的persist时间粒度

    5、firehose:输入源,设置为kafka,需要提供给kafka zk cluster地址,以及feed

    6、tuningConfig:windwsPeriod, 日志时间戳如果不在windowsPeriod之内就会被直接丢弃。intermediatePersistPeriod,保存数据到     realtime-node本地磁盘时间间隔。

 

 

    流程:

Druid03

    1、原始数据通过ingest拉取后需要根据之前定义的dimensions与metrics进行解析,解析后的数据为以后要处理的数据InputRow

    2、随后Input row根据timestamp来检查时间范围,如果超出了之前定义的time window就直接丢弃,如果在segment时间范围内首先看一下segment是否已经生成,如果没有生成就生成该segment,并通过zk cluster写入相应的数据,通知其它角色,随后进行下一步index

    3、对于实时的数据都是通过IncrementalIndex的子类OnHeapIncrementalIndex来进行index,首先会对row封装为TimeAndDims类,这个类包含了时间和dimensions的key和value(名称和取值),将TimeAndDims put到factTable的hash中,同时计算metrics,就是更新当前的metrics值,比如对于MAX的metrics和SUM的metircs更新为:

Druid04

    4、随后查看一下当前的FireHydrant(index 和segment的封装)是否需要persist到磁盘中,规则是根据当前接收的条数与时间来确定的,如果符合条件就persist到磁盘中。

    persist分为两步,第一步是kafka client将offset写入到zk cluster中。

    第二部是本地persist,过程为

    (1)生成临时目录写入数据

    (2)生成index.drd文件,包含了版本,dimensions名称,metrics类型,该segment起始与结束时间以及bitmap类型(用于建倒排索引)

    (3)对dimension进行处理,这一步对dimension求出各个dimension基数,用于建字典,同时建立bitmap,这步将所有基数按照维度写入到dim_开头的文件中

    (4)对metircs进行处理,将metrics写入到metrics文件中

    (5)建立倒排索引,根据bitmap,按照维度以及维度中的基数排序后建立倒排索引,写入inverted.drd文件中

    (6)将上述文件按照index.drd.invert.drd,spatial.drd,各种dim,各种metrics按照顺序concat

    (7)生成上述文件在concat文件中的索引位置的索引文件

    (8)修改上述文件位置,persist结束

    可见,persist的文件是按列存储的。

 

 

    同时,Realtime-node内部还有个线程根据保存的时间粒度来判断是否将segment上传到Deep storage中(HDFS).

 

(二)Historical-node

    Historical-node主要的任务就是不断的从coordinator中获取需要下载的Segment文件,下载Segment文件并且根据索引建立QuerableIndex结构,用于查询。

    QueryableIndex结构对应了persist到本地后的索引文件,包含了查询需要的信息,尤其是bitmap。

    Historical-node中使用了apache Curator框架,简化了与Zk cluster的交互,Historical-node基本上就是封装了所有与zk的交互以及http服务。

 

(三)Coordinator

    Coordinator的实现类是Druid Coordinator,主要负责是于Mysql metadata交互,线程专门处理segment是否失效,是否删除以及move segment从某个Historical-node到其他Historical-node中。

 

 

(四)Query过程

    Query通过HTTP REST接口使用json来获取数据,Query可以作用于Broker,Historical-node,Realtime-node节点上,但是一般用于Broker上。

    目前Druid提供的查询有Aggregation Queries:Timeseries,TopN,GroupBy。 Metadata Queries:Time Boundary,Segment metadata以及Data Source metadata。 最后是search。

    针对Realtime-node和Historical-node的数据,query采用了不同的方式,对于实时数据,抽象类是IncrementalIndex,数据没有sort,同时数据的存储是行存储,而对于persist以后的数据抽象类是QueryabledIndex类,数据经过sort,存储是列存储。所以两种数据的检索方式不一样。

    首先通过解析json数据,获得需要检索的demison以及metric,然后根据不同的检索类型生成相应的QueryEngine,如TimeserieseQueryEngine,GroupbyQueryEngine等。

    QueryEngine内部的process方法去获取最终结果,首先生成cursor,根据是IncrementalIndex数据还是QueryabledIndex数据生成不同类型的cursor,每次advance one或者特定offset获取指定行数据,cursor advanced的同事通过aggregator对metrics进行运算,更新结果,最后cursor结束,将结果返回。

    由于用户query可能是由多个segment组成的,所以query是分发到不同的角色中处理,每个segment以一个任务形式放到线程池中执行,最后将所有数据汇总,以json的形式返回给用户。如下

Druid05