How to Avoid Druid Write /Tmp Directory Full

Recently, i have noticed when i started some of the realtime node, it is easy for druid to write /tmp directory full. The file is all start with filePeon.
After i investigate the code and the configuration of druid, i found druid write the index file in druid.indexer.task.baseDir, and the default value is System.getProperty(“java.io.tmpdir”).
So we can set java.io.tmpdir to another directory when we start the realtime node as below:

java -Djava.io.tmpdir=/data0/druid/tmp -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=25g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=config/realtime/wb_ad_interest_druid.spec -classpath config/_common:config/realtime:/usr/local/hadoop-2.4.0/etc/hadoop:lib/* io.druid.cli.Main server realtime

Druid 系统架构说明(二)

一.说明

续前一篇Druid 系统架构说明 ,主要介绍了druid的基本架构以及使用说明。本篇更新内容,主要介绍的是使用Realtime Index service 替代之前介绍的realtime node来完成实时ingest,index build,hand off等任务。
首先要说明一下realtime node与index server的一些区别:
Alt text
可以看出当druid集群规模增大时,使用Realtime Index Service是必须的。

二.架构与流程

相比于之前博客缩写的架构,使用Realtime Index Service的Druid系统增加了几个组件,现在的系统架构图如下:
Druid推模式
上一篇博客主要介绍的是druid的拉模式,数据通过不同的Realtime Node通过kafka等拉取数据,建立索引,handoff到Historical Node。随着Druid业务增多,规模扩大,对Realtime Node的管理变成了非常繁琐的事情,所以Druid开发了推模式,解决这一问题。相信这也是很多分布式系统应用最后都需要解决的问题,就是使部署运维简单化,自动化。
这一篇主要介绍的是推模式,推模式增加了一些角色,分别是Overlord Node, MiddleManager Node, peon以及客户端的Tranquility. 下面一一介绍各个模块的功能以及流程。

(一)角色

1.Tranquility
客户端发送工具,用户通过Tranquility将数据实时的发送到Druid中。Tranquility负责与Zk通信,与Overlord交互,根据timestamp将有效数据发送到Peon中。
2.Overlord
负责分配任务到不同的Middle Manager中,类似于ResourceManager。
3.Middle Manager
负责根据不同的任务启动Peon,并且负责Peon启动后运行的状态,类似于NodeManager。
4.Peon
Peon代替了Realtime Node的大部分功能,通过Middle Manager启动,以独立进程的形式启动。

(二)流程说明

1.用户的spec文件在Tranquility中定义,首先Tranquility通过spec初始化,获得zk中Overlord的地址,与Overlord通信。
2.Overlord得到新写入任务后,查询zk节点信息,选择一个Middle Manager节点启动来启动peon,并将信息写入到zk中。
3.Middle Manager一直监控zk,发现有新的任务分配后,启动一个Peon进程,并监控Peon进程的状态。
4.Peon与Realtime Node流程基本一致,所不同的是Peon使用的是HTTP接口来接收数据,RealTime Node更多的是内部的线程不断的拉取Kafka的数据。
5.Tranquility随后通过zk获取Peon机器地址和端口,将数据不断的发送到Peon中。
6.Peon根据spec规则,定时或者定量将数据build index,handoff到deep storage(HDFS)中。
7.随后就是Coordinator根据Peon在zk中信息,将数据写入到sql中,并分配Historical Node去deep storage拉取index数据。
8.Historical Node到deep storage拉取index数据到本地,重建index到内存中,至此数据流入完成。

三.总结

通过realtime index service的推模式,Druid的部署运维管理更加简单,易用度更高。后面一些blog会对Druid代码进行分析。

OLAP实践(一) CrushJoin系统介绍

一、系统背景

      最近一段时间,我与同事liangjun(http://blog.csdn.net/hit_hlj_sgy)一直在对OLAP系统进行一些调研、开发以及将相应的系统在Sina weibo数据中的使用进行推广。在这一过程中,我们有一些经验,开发了一些实用的组件或系统,近期会进行一些分享。

      这篇blog主要介绍我们上个月完成的CrushJoin系统,工程名称揭示了它产生的意义,就是尽量减少join操作对集群数据,尤其是某些表进行反复、重复的扫描。它的背景如下:对于weibo中用户数据,每个用户由一个uid对应,同时用户有很多属性,包括他的年龄、省份、注册时间、性别、兴趣、用户活跃度、用户质量等等,经过整理以后发现总共有多达100+的属性,这些属性分布在10几张hive表中。非常重要的一种场景是数据分析师需要对用户的属性进行ad-hoc查询,比如我们想要知道在北京的用户中他的年龄构成如何,或者说我们要分析上海用户中年龄是40-50岁的用户是长期登陆的用户占上海用户中的总量多少。目前为止,现有的做法是根据需要的属性将多个表进行join,join的键为刚才提到的uid。这种方法的问题非常多,首先是ad-hoc查询需要尽快的查询时间,而通过hive转化为mr查询的方式在时间上根本无法做到实时,用户从提交查询到最后获得结果一般都需要十几甚至几十分钟,join的较多时候甚至是以小时为单位才能够查询出来。第二,这种查询都会join用户基础表,每次查询都需要扫描用户基础表将会十分浪费。

      为此,我们希望设计一套系统,能够提供给用户秒级的查询速度,满足用户对任意维度的属性进行join和in操作。这套系统准确的说不能算严格意义上的OLAP系统,因为传统意义上的OLAP分为维度(dimensions)+度量(metrics),要对多种维度进行join,groupby操作,获得相应的度量。这套系统借鉴了OLAP系统中的一些结构,更准确的叫法应该是多维filter系统。


二、相关知识

      下面我们来看一下用于这套系统的基本知识:

      1、bitmap

      bitmap也可称作bitset,是由0或者1构成的bit数组,如下图:

         Crush1

 

      其中每一位都是一个位移(offset),对bitmap的操作有AND,OR,XOR等等,因为所有操作都是按位运算,所以运算速度异常迅速。

 

      如果我们将用户的uid映射为bitmap中的一位,这样我们的count计数就转换为bitmap对应位为1的个数。就比如上图,我们可以计算出具有某种属性的用户为4。如果我们需要获得具有两个及以上的属性和,那么就将对应的bitmap进行按位与(AND)操作即可,如下图

        Crush2

      两个对应属性AND后,可以计算出满足这两个属性的用户数为2,以此类推。

      2、全排序

      我们知道,mapreduce的特性是在reduce端按key排序,但是,这种排序不是全局性的,而使用bitmap需要对全局uid进行排序,所以我们需要进行一次全排序。为此我们估算总共的用户,以及每个reduce需要的用户。这需要两步,首先是分析阶段,需要根据reduce个数,将总共的用户进行采样,确定每个reduce需要处理的uid下限和上限。第二步就是根据第一步的结果,使用TotalOrderPartitioner以及提供采样文件,就可以做到reducer之间有序,reducer内部也是有序的全排序。具体细节可以查询TotalOrderPartitioner,当然这步我们是做了一些修改。


三、系统设计与流程

      我们将所有的bitmap存储在Hbase中,Hbase表的设计为,在全排序获得的UID范围为key, columnFamily为1,column是排序好的属性,其中属性名称与int做了一个映射,所以column是1、2、3….等,value对应了该UID范围reduce计算好的各个属性的bitmap进行序列化的结果。

      Hbase建表的时候我们就根据reduce计算生成的文件设置好region数,这里的region数设置为200个,主要是方便我们查询时候使用Coprocessor,提升查询的并发性和速度。同时,这里需要特别说的是,bitmap的实现有多种,比如java原生的实现,但是如果数据量过大,即使是按位运算数据量也不能忽视,尤其是我每个reduce根据属性个数大概是150+的属性,每个属性生成一个bitmap,并存储在hbase中,数据量非常大。这里我们使用了Roaringbitmap,他的地址是https://github.com/lemire/RoaringBitmap,目前很多开源的OLAP项目都使用它,比如Druid。我们目前测试的效果,根据你的bitmap稀疏程度,RoaringBitmap基本上都能压缩几百甚至上千倍,并且计算更快,非常方便。

系统的流程如下:

                   Crush3

      1、系统每天更新一次,首先使用mr任务,将所有表对应的属性按照全排序,分布到对应的reduce端,reduce端根据属性值对每个属性生成一个bitmap,同时写入HFile中。

      2、HFile生成完成后,将HFile distcp到我们的Hbase集群中,通过bulkload将数据导入

      3、后端是个Jetty Server,用户查询通过Jetty Server,转换为对Hbase的查询,查询使用Coprocessor

      4、Hbase将Coprocessor结果返回给Jetty,Jetty立即将用户数量返回给用户,同时异步的将对应的UIDs写入到一个文件,方便用户查询

 

     经过我们的测试,查询10个属性总共需要600ms,将原先离线的join,转化为实时查询,完成了我们的目的。

Druid OLAP系统架构简单介绍

一、基本说明

    Druid是由metamarkets的技术人员开发,用于向买家、卖家、广告主进行广告展示的底层实时分析平台。

    Druid主要解决的是对实时数据以及较近时间的历史数据的多维查询提供高并发(多用户),低延时,高可靠性的问题。对比Druid与其他解决方案,Kylin对数据按照分区每天构建前一天的cube数据提供给用户查询,用户查询的是历史数据。而Druid不断的从ingest去拉取数据,持续构建cube,提供实时查询。

    OLAP中数据由dimensions+metrics组成,所有查询都是通过metrics计算加上dimensions的filter完成的。

 

二、角色与架构

    Druid中分以下几个角色:

    1、Realtime-node

    负责从数据源(ingest)拉取数据,对数据进行index,根据时间窗口将数据按照segment persist到磁盘,最终上传到deep storage(HDFS)中,同时开放http查询服务,提供用户热数据的查询。

    2、Historical-node

    负责从deep storage中拉取segment数据,并重建index,提供给用户查询历史数据。

    3、Coordinator-node

    负责管理Historical-node,以及segment数据的放置策略。

    4、Broker-node

    负责将查询route到Realtime-node或者Historical-node中,并将结果缓存

 

    Druid用到的依赖:

    1、Ingestion输入源

    常用的为Kafka,rabbitmq, twitter数据等。

    2、Zookeeper

    负责协调各个角色之间的任务启动停止等。

    3、Mysql

    负责保存segment的源信息以及config,rules等信息。

    4、Deep Storage

    持久存储,负责保存segment数据,常用的是Hdfs, Amazon S3等分布式存储。

 

 

Druid架构:

Druid01

三、流程说明

(一)Realtime-node

    Druid接收的数据都带有时间戳,时间戳将segment之间进行分离。同时每个Realtime-node启动的时候都需要指定spec,比如下面的一个示意: 

    spec需要提供给Realtime-node以下参数

Druid02

    spec需要提供给Realtime-node以下参数:

    1、dataSource:唯一标识

    2、dimensions:维度名称,如何从原始数据解析

    3、metrics:metrics类型

    4、granularitySpec:segment的persist时间粒度

    5、firehose:输入源,设置为kafka,需要提供给kafka zk cluster地址,以及feed

    6、tuningConfig:windwsPeriod, 日志时间戳如果不在windowsPeriod之内就会被直接丢弃。intermediatePersistPeriod,保存数据到     realtime-node本地磁盘时间间隔。

 

 

    流程:

Druid03

    1、原始数据通过ingest拉取后需要根据之前定义的dimensions与metrics进行解析,解析后的数据为以后要处理的数据InputRow

    2、随后Input row根据timestamp来检查时间范围,如果超出了之前定义的time window就直接丢弃,如果在segment时间范围内首先看一下segment是否已经生成,如果没有生成就生成该segment,并通过zk cluster写入相应的数据,通知其它角色,随后进行下一步index

    3、对于实时的数据都是通过IncrementalIndex的子类OnHeapIncrementalIndex来进行index,首先会对row封装为TimeAndDims类,这个类包含了时间和dimensions的key和value(名称和取值),将TimeAndDims put到factTable的hash中,同时计算metrics,就是更新当前的metrics值,比如对于MAX的metrics和SUM的metircs更新为:

Druid04

    4、随后查看一下当前的FireHydrant(index 和segment的封装)是否需要persist到磁盘中,规则是根据当前接收的条数与时间来确定的,如果符合条件就persist到磁盘中。

    persist分为两步,第一步是kafka client将offset写入到zk cluster中。

    第二部是本地persist,过程为

    (1)生成临时目录写入数据

    (2)生成index.drd文件,包含了版本,dimensions名称,metrics类型,该segment起始与结束时间以及bitmap类型(用于建倒排索引)

    (3)对dimension进行处理,这一步对dimension求出各个dimension基数,用于建字典,同时建立bitmap,这步将所有基数按照维度写入到dim_开头的文件中

    (4)对metircs进行处理,将metrics写入到metrics文件中

    (5)建立倒排索引,根据bitmap,按照维度以及维度中的基数排序后建立倒排索引,写入inverted.drd文件中

    (6)将上述文件按照index.drd.invert.drd,spatial.drd,各种dim,各种metrics按照顺序concat

    (7)生成上述文件在concat文件中的索引位置的索引文件

    (8)修改上述文件位置,persist结束

    可见,persist的文件是按列存储的。

 

 

    同时,Realtime-node内部还有个线程根据保存的时间粒度来判断是否将segment上传到Deep storage中(HDFS).

 

(二)Historical-node

    Historical-node主要的任务就是不断的从coordinator中获取需要下载的Segment文件,下载Segment文件并且根据索引建立QuerableIndex结构,用于查询。

    QueryableIndex结构对应了persist到本地后的索引文件,包含了查询需要的信息,尤其是bitmap。

    Historical-node中使用了apache Curator框架,简化了与Zk cluster的交互,Historical-node基本上就是封装了所有与zk的交互以及http服务。

 

(三)Coordinator

    Coordinator的实现类是Druid Coordinator,主要负责是于Mysql metadata交互,线程专门处理segment是否失效,是否删除以及move segment从某个Historical-node到其他Historical-node中。

 

 

(四)Query过程

    Query通过HTTP REST接口使用json来获取数据,Query可以作用于Broker,Historical-node,Realtime-node节点上,但是一般用于Broker上。

    目前Druid提供的查询有Aggregation Queries:Timeseries,TopN,GroupBy。 Metadata Queries:Time Boundary,Segment metadata以及Data Source metadata。 最后是search。

    针对Realtime-node和Historical-node的数据,query采用了不同的方式,对于实时数据,抽象类是IncrementalIndex,数据没有sort,同时数据的存储是行存储,而对于persist以后的数据抽象类是QueryabledIndex类,数据经过sort,存储是列存储。所以两种数据的检索方式不一样。

    首先通过解析json数据,获得需要检索的demison以及metric,然后根据不同的检索类型生成相应的QueryEngine,如TimeserieseQueryEngine,GroupbyQueryEngine等。

    QueryEngine内部的process方法去获取最终结果,首先生成cursor,根据是IncrementalIndex数据还是QueryabledIndex数据生成不同类型的cursor,每次advance one或者特定offset获取指定行数据,cursor advanced的同事通过aggregator对metrics进行运算,更新结果,最后cursor结束,将结果返回。

    由于用户query可能是由多个segment组成的,所以query是分发到不同的角色中处理,每个segment以一个任务形式放到线程池中执行,最后将所有数据汇总,以json的形式返回给用户。如下

Druid05