Some Notes About Ops in the Druid OLAP System

I have written about Druid a lot on this blog, including the architecture of Druid and how to change the time zone in Druid. This post focuses on the basic operations you perform every day.

First, how to write the Hadoop MapReduce index spec file


{
  "dataSchema" : {
    "dataSource" : "ingestion_test",
    "parser" : {
      "type" : "hadoopyString",
      "parseSpec" : {
        "format" : "tsv",
        "timestampSpec" : {
          "column" : "dt",
          "format" : "posix"
        },
        "dimensionsSpec" : {
          "dimensions": ["grade","src_flag","school_id","gender_desc","prov_name","city_name"]
        },
        "delimiter":"\u0001",
        "listDelimiter":"\u0002",
        "columns":  ["dt","uid","grade","src_flag","school_id","gender_desc","prov_name","city_name","flag"]
      }
    },
    "metricsSpec" : [
              {
                    "type": "count",
                    "name": "count_druid"
              },
              {
                    "type": "hyperUnique",
                    "name": "uv",
                    "fieldName" : "uid"
              },
              {
                    "type": "longSum",
                    "name": "count",
                    "fieldName" : "flag"
              }
    ],
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "HOUR",
      "queryGranularity" : "NONE",
      "intervals" : [ "2017-3-13/2017-3-14" ]
    }
  },
  "ioConfig" : {
    "type" : "hadoop",
    "inputSpec" : {
      "type" : "static",
      "paths" : "/data//000000_0"
    },
    "metadataUpdateSpec" : {
                "type":"mysql",
                "connectURI":"jdbc:mysql://ip:3306/druid",
                "password" : "password",
                "user" : "user",
                "segmentTable" : "druid_segments"
    },
    "segmentOutputPath" : "hdfs://ns1/user/druid/localStorage"
  },
  "tuningConfig" : {
    "type" : "hadoop",
    "workingPath": "hdfs://ns1/user/druid/localStorage/workdir",
    "partitionsSpec" : {
      "type" : "hashed",
      "numShards" : 3
    },
    "shardSpecs" : { },
    "leaveIntermediate" : false,
    "cleanupOnFailure" : true,
    "overwriteFiles" : false,
    "ignoreInvalidRows" : false,
    "jobProperties" : { },
    "combineText" : false,
    "persistInHeap" : false,
    "ingestOffheap" : false,
    "bufferSize" : 134217728,
    "aggregationBufferRatio" : 0.5,
    "rowFlushBoundary" : 300000,
    "useCombiner" : true,
    "buildV9Directly" : true
  }
}

In the spec file, you can control the number of reducers with the numShards parameter.
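To actually launch a spec like this, the standalone Hadoop indexer CLI can be used. The following is only a minimal sketch: the spec file name is hypothetical, and the classpath assumes the Druid config layout used elsewhere in this post plus the Hadoop client configuration under /usr/local/hadoop-2.4.0/etc/hadoop; adjust both to your installation.

# run from $DRUID_HOME
java -Xmx2g -Duser.timezone=UTC \
  -classpath config/_common:/usr/local/hadoop-2.4.0/etc/hadoop:lib/* \
  io.druid.cli.Main index hadoop my_index_spec.json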

Second, an example spec file that writes directly to Druid using Tranquility


{
  "dataSources": [
    {
      "spec": {
        "dataSchema": {
          "dataSource": "main_static_log_tranq1",
          "parser": {
            "type": "string",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "timestamp",
                "format": "posix"
              },
              "dimensionsSpec": {
                "dimensions": ["typeSignII",  "typeSignI", "typeSignIII", "typeSignIV",  "responseCode",  "processTotalTime", "serverIp", "terminal", "type", "service"],
                "dimensionExclusions": [],
                "spatialDimensions": []
              }
            }
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            },{
              "type": "doubleSum",
              "name": "mProcessTotalTime",
              "fieldName" : "mProcessTotalTime"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "SIX_HOUR",
            "queryGranularity": "MINUTE"
          }
        },
        "tuningConfig": {
          "type": "realtime",
          "maxRowsInMemory": 100000,
          "intermediatePersistPeriod": "PT10m",
          "windowPeriod": "PT60m"
        }
      },
      "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "2"
      }
    }
  ],
  "properties": {
    "zookeeper.connect": "10.39.2.161:2181",
    "druid.selectors.indexing.serviceName": "overlord",
    "druid.discovery.curator.path": "/druid/discovery",
    "druidBeam.overlordPollPeriod": "PT20S"
  }
}

You can set the number of partitions and replicas with the task.partitions and task.replicants parameters.
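If you run this config with the standalone Tranquility distribution (an assumption; Tranquility can also be embedded as a library), the server is typically started by passing the config file path on the command line. The file name below is hypothetical.

bin/tranquility server -configFile conf/server.json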

How to Submit a Merge Job to the Index Service in Druid

Normally we use the index service instead of realtime nodes in Druid to ingest realtime data. If you end up with multiple partitions for one time interval and each of them is small, you should merge them into one bigger segment to improve query efficiency.
For example, we have two segments in the same time interval, as shown below:

two segments on the same time interval
What we want is to merge them into one. Here is how to write the merge.json file and submit it:


{
    "type": "merge",
    "dataSource": "main_static_log_tt",
    "aggregations": [
                {
                    "type": "count",
                    "name": "count"
                },{
                    "type": "doubleSum",
                    "name": "mProcessTotalTime",
                    "fieldName" : "mProcessTotalTime"
                }
    ],
    "rollup": "false",
    "segments": [
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/0/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":129991,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z"},
{"dataSource":"main_static_log_tt","interval":"2017-03-27T10:05:00.000Z/2017-03-27T10:06:00.000Z","version":"2017-03-27T10:05:00.000Z","loadSpec":{"type":"local","path":"/data0/test/file/main_static_log_tt/2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z/2017-03-27T10:05:00.000Z/1/index.zip"},"dimensions":"processTotalTime,responseCode,serverIp,typeSignI,typeSignII,typeSignIII,typeSignIV","metrics":"count,mProcessTotalTime","shardSpec":{"type":"none"},"binaryVersion":9,"size":190243,"identifier":"main_static_log_tt_2017-03-27T10:05:00.000Z_2017-03-27T10:06:00.000Z_2017-03-27T10:05:00.000Z_1"}
    ]
}

Remember to change the shardSpec type to none, because the merge task only merges segments of that type; it ignores the hashed and linear types. We can work around this by simply changing the type to none, but that causes some problems of its own; in a later post I will explain how to change the code to make it work properly.
After editing the JSON file, submit it to your overlord node as below:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @merge.json
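The overlord responds with the task id. You can then poll the task status through the same API (host, port and the task id are placeholders):

curl http://host:port/druid/indexer/v1/task/<task_id>/status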

Kill the Job

Sometimes you just want to run a test, and afterwards you can kill the task to free its slot in the index service. Here is how to write the kill.json file and submit it:


{
    "type": "kill",
    "id": "sbsina",
    "dataSource": "main_static_log_tt",
    "interval": "2017-03-22T07:47:00.000Z/2017-03-28T07:48:00.000Z"
}

Submit it:

curl http://host:port/druid/indexer/v1/task -H "Content-Type:application/json" -X POST --data @kill.json

Disable a Middle Manager Before Updating It

Submit a POST request to the middle manager's HTTP port:

curl -X POST http://ip:port/druid/worker/v1/disable
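Once the running tasks drain you can safely update the middle manager. When the update is done, re-enable it, or check its current state, through the same API (a sketch with placeholder host and port):

curl -X POST http://ip:port/druid/worker/v1/enable

curl http://ip:port/druid/worker/v1/enabled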

A Look at HftpFileSystem

Recently I was troubleshooting a distcp job that kept failing. distcp is the tool for copying data between two clusters; for a remote cluster, or clusters running different versions, we use hftp to pull the data, like this:

hadoop distcp hftp://sourceFS:50070/src hdfs://destFS:50070/dest

The hftp scheme starts with hftp:// and is implemented by the HftpFileSystem class.
So what does an hftp read look like? Taking a single file read as an example, the flow is as follows:
(flow diagram)
Step by step, the hftp flow is:
1. First, hftp builds the NameNode HTTP URL. All hftp interaction goes through the HTTP port, and the NameNode's default HTTP port is 50070. The code is:

f = f.makeQualified(getUri(), getWorkingDirectory());
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
URL u = getNamenodeURL(path, query);
return new FSDataInputStream(new RangeHeaderInputStream(connectionFactory, u));

At this point no connection to the NameNode has been made yet. The actual connection is made only when openInputStream is called:

protected InputStream openInputStream() throws IOException {
  // Use the original url if no resolved url exists, eg. if
  // it's the first time a request is made.
  final boolean resolved = resolvedURL.getURL() != null;
  final URLOpener opener = resolved ? resolvedURL : originalURL;

  final HttpURLConnection connection = opener.connect(startPos, resolved);

2. The NameNode handles all HTTP requests through the NamenodeWebHdfsMethods class. For the OPEN request sent by hftp, the NameNode picks the most suitable DataNode based on locality and returns a redirect whose URL points at that DataNode's HTTP port back to the hftp client.

final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
    fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();

This completes the interaction between the client and the NameNode; the client then talks to the DataNode.
3. The client uses the redirected URL to connect to the DataNode that holds the data. Note that this step still goes through the HTTP port; the DataNode's default HTTP port is 50075.

resolvedURL.setURL(getResolvedUrl(connection));

InputStream in = connection.getInputStream();

4. Inside the DataNode, HTTP requests are handled by the DatanodeWebHdfsMethods class. An OPEN request is handled like this:

case OPEN:
{
  final int b = bufferSize.getValue(conf);
  final DFSClient dfsclient = newDfsClient(nnId, conf);
  HdfsDataInputStream in = null;
  try {
    in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true));
    in.seek(offset.getValue());
  } catch (IOException ioe) {
    IOUtils.cleanup(LOG, in);
    IOUtils.cleanup(LOG, dfsclient);
    throw ioe;
  }

  final long n = length.getValue() != null ?
      Math.min(length.getValue(), in.getVisibleLength() - offset.getValue()) :
      in.getVisibleLength() - offset.getValue();

  /**
   * Allow the Web UI to perform an AJAX request to get the data.
   */
  return Response.ok(new OpenEntity(in, n, dfsclient))
      .type(MediaType.APPLICATION_OCTET_STREAM)
      .header("Access-Control-Allow-Methods", "GET")
      .header("Access-Control-Allow-Origin", "*")
      .build();
}

Internally, if short-circuit local reads are enabled, a DFSClient that reads locally (BlockReaderLocal) is created to read the data; otherwise a socket connection is created to read the data.
5/6. The InputStream is returned to the client for reading the data, and the hftp read flow is complete.
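You can also replay the first hop by hand to watch the redirect. This is only an illustration: the host names and file path are hypothetical, while the /data servlet path and the ugi parameter come from the client code quoted in step 1.

curl -v "http://namenode-host:50070/data/user/someuser/somefile?ugi=someuser"
# expect a redirect whose Location header points at port 50075 on the chosen DataNode;
# fetching that URL returns the actual file bytes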

The problem we ran into

When running distcp with hftp on one side, we found that every task failed while reading one particular file. The machine holding that file was not healthy: its network connections showed a large number of CLOSE_WAIT sockets on port 50075, and the DataNode process was very slow at both reads and writes, so we concluded its network connections were abnormal at the time. Because the NameNode's HTTP interface returns only a single DataNode address, rather than a list of DataNodes as the RPC interface does, there is no fallback to another DataNode when one goes bad. So this one faulty DataNode made the read fail over and over again. The fix at the time was to restart that DataNode process so reads were redirected to other DataNodes.
As for the root cause, it was related to the state of that DataNode at the time. Since we did not save the full stack traces, we will have to continue the investigation the next time it happens, but the working theory is that the DataNode's socket server was responding slowly because of its internal logic.

Druid Ingest Format Issue

When using Druid we need to index historical data. Since that data lives in Hive tables whose field delimiter is \001, Druid's ingest format must support an arbitrary delimiter. The parser spec below shows how:

"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "dt",
"format" : "posix"
},
"dimensionsSpec" : {
"dimensions": ["grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov"]
},
"delimiter":"\u0001",
"listDelimiter":"\u0002",
"columns": ["dt","uid","grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov","flag"]
}
},

This specifies tsv as the parse format and \u0001 as the delimiter. listDelimiter is only used for multi-value fields; we do not use it at the moment, so it is simply set to \u0002. Then start the task. (A sample raw record is sketched below.)
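For reference, a raw input record for this spec is just the ten columns joined by the \001 byte. A hypothetical sample line (all column values are made up) can be produced like this:

# dt, uid, grade, src_flag, school_name, gender_desc, prov_name, city_name, school_prov, flag
printf '1489363200\x011234567\x013\x011\x01some_school\x01male\x01prov_a\x01city_b\x01prov_a\x011\n' > sample_row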

Running DistCpV1 on Hadoop 2

Just a note: in some situations you need to use DistCpV1. The command is:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-extras-2.4.0.jar org.apache.hadoop.tools.DistCpV1 $source $dest

Notes on Installing DSM on a Gen8

The procedure for installing DSM on a Gen8 is as follows. First install ESXi 5.5 (there are plenty of guides for this on Google), then install DSM. The download site is xpenology.
I chose DSM 5.1-5022. You need three files: the DSM 5.1-5022 PAT file, the XPEnoboot DS3615xs 5.1-5022.3 ISO, and the corresponding VMDK. The steps are:
1. Create a virtual machine and mount the ISO in the virtual CD drive. Follow the prompts to find the installer's IP, log in to that IP, upload the PAT file, format, and reboot.
2. After the reboot, shut down the VM, add disks, add the VMDK disk as the boot disk, and set that disk's mode to independent non-persistent. This step is very important.
3. Then restart DSM, booting from the VMDK. At this point you can use the DSM installation assistant; after you get in, just finish the configuration.
The most important points are that step 1 formats the disks and step 2 reconfigures the boot disk.

Apache Kylin Installation Notes

Kylin depends on Hive, MapReduce, HBase, and so on, so before installing Kylin you need working Hive, Hadoop, and HBase environments.
The first thing to note is that when Kylin submits a job, it assumes by default that the MapReduce jars sit at the same path on every machine in the cluster as they do on the Kylin machine. For example, if hadoop-mapreduce-client-core-2.4.0.jar lives under /usr/local/hadoop on the Kylin machine, then at run time the jar is expected to be at the same path on the cluster machines. This means the Hadoop installation path on the Kylin machine must be named according to where Hadoop is installed on the cluster.
Next, install Hive and HBase. Note that we run an architecture where the Hadoop cluster that executes Kylin's build jobs is separate from the Hadoop cluster underneath the HBase cluster that serves queries, so the build cluster must have permission to access every Hadoop node of the HBase cluster.
Finally, the Kylin configuration. It all lives in $KYLIN_HOME/conf/kylin.properties.
The options to pay attention to:
kylin.metadata.url=kylin_metadata@hbase (where Kylin stores its metadata)
kylin.hdfs.working.dir=/user/jiangyu2/kylin (where Kylin stores intermediate build results)
kylin.hbase.cluster.fs=hdfs://ns-test (if there are two HDFS clusters, set this to HBase's HDFS)
kylin.hbase.cluster.hdfs.config.file=hbase.hdfs.xml (if there are two clusters, point this at HBase's Hadoop configuration, i.e. the hbase.hdfs.xml file under $KYLIN_HOME/conf)
Finally, one HBase setting needs to be changed:

  <property>
      <name>hbase.use.dynamic.jar</name>
      <value>false</value>
  </property>

In addition, Kylin's users can be managed through LDAP or hard-coded. If you go with hard-coding, the file to modify is $KYLIN_HOME/tomcat/webapps/kylin/WEB-INF/classes/kylinSecurity.xml

        <beans profile="testing">
                <!-- user auth -->
                <bean id="passwordEncoder" class="org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder" />

                <scr:authentication-manager alias="testingAuthenticationManager">
                        <scr:authentication-provider>
                                <scr:user-service>
                                        <scr:user name="MODELER" password="$2a$10$Le5ernTeGNIARwMJsY0WaOLioNQdb0QD11DwjeyNqqNRp5NaDo2FG" authorities="ROLE_MODELER" />
                                        <scr:user name="ANALYST" password="$2a$10$s4INO3XHjPP5Vm2xH027Ce9QeXWdrfq5pvzuGr9z/lQmHqi0rsbNi" authorities="ROLE_ANALYST" />
                                        <scr:user name="ADMIN" password="$2a$10$o3ktIWsGYxXNuUWQiYlZXOW5hWcqyNAFQsSSCSEWoC/BRVMAUjL32" authorities="ROLE_MODELER, ROLE_ANALYST, ROLE_ADMIN" />
                                </scr:user-service>
                                <scr:password-encoder ref="passwordEncoder" />
                        </scr:authentication-provider>
                </scr:authentication-manager>
        </beans>

You can add new users here; the password hash for a new user is generated by the code below.

package search;

import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;

/**
 * Created by jiangyu2 on 12/7/16.
 */
public class F {
  public static void main(String[] args) {
    String password = "suda_test";
    PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
    String encodedPassword = passwordEncoder.encode(password);
    System.out.print(encodedPassword);
  }
}
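One way to compile and run it is shown below. This is a sketch: the classpath assumes the Spring Security jars bundled inside Kylin's web application (which may differ in your version), and the source file is assumed to sit at search/F.java under the current directory.

javac -cp "$KYLIN_HOME/tomcat/webapps/kylin/WEB-INF/lib/*" search/F.java
java -cp "$KYLIN_HOME/tomcat/webapps/kylin/WEB-INF/lib/*:." search.F

Paste the printed hash into the password attribute of the new scr:user entry in kylinSecurity.xml.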

Druid Batch Job Problems: A Summary

Recently I have been collecting the problems we ran into with Druid batch jobs; they are recorded one by one below. The Druid version I use is 0.8.2, so the solutions below apply to 0.8.2.
1. Dependencies
Since many machines cannot reach the public internet, we had to modify the Druid source so that it downloads artifacts from our internal repository. The file to modify is ExtensionsConfig.java, with the following change:

   private List<String> remoteRepositories = ImmutableList.of(
-      "https://repo1.maven.org/maven2/",
-      "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local"
+      "http://10.39.0.110:8081/nexus/content/groups/public",
+      "http://10.39.0.110:8081/nexus/content/repositories/thirdparty"
   );

With this change, the required packages are downloaded from our internal repository.
After that we can pull the required packages to the local machine for job submission. Modify $DRUID_HOME/config/_common/common.runtime.properties:

druid.extensions.localRepository=/usr/local/druid-0.8.2/localRepo

This sets the location of the local repository. Then, in the $DRUID_HOME directory, run:

java -cp  config/_common:config/broker:/usr/local/hadoop-2.4.0/etc/hadoop:lib/*  io.druid.cli.Main tools pull-deps

This downloads the dependencies.
If you need to add your own third-party dependencies, modify $DRUID_HOME/config/_common/common.runtime.properties as well:

druid.extensions.coordinates=["com.sina.hivexec:hive-exec:0.13.0"]

The listed artifacts are then added to the classpath when the job is submitted.

2. Counter problems
Our problem was that a third-party jar caused an exception while fetching the MapReduce counters, which in turn failed the whole task. The current fix is to modify the source: catch the exception, check whether it came from the counter fetch and whether the job itself succeeded, and if so continue with the next step instead of throwing the exception and aborting. A rough sketch of the idea follows.
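The following is only a rough sketch of that idea, not the actual patch: the class and helper names are made up, and it assumes the org.apache.hadoop.mapreduce.Job handle of the finished indexing job is available.

import java.io.IOException;

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CounterFetchWorkaround {
  /**
   * Return the job counters, but tolerate a failed counter fetch
   * as long as the MapReduce job itself finished successfully.
   */
  public static Counters tryGetCounters(Job job) throws IOException {
    try {
      return job.getCounters();
    } catch (IOException | RuntimeException e) {
      if (job.isSuccessful()) {
        // Counters are unavailable (e.g. broken by a third-party jar),
        // but the job succeeded, so treat this as non-fatal.
        return null;
      }
      throw e;
    }
  }
}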

3. The number of reducers
A quick overview of Druid's flow, covering only the case where the partitionsSpec type is hashed.
1) If numShards is not specified, there are two jobs. The first job uses HyperLogLog to estimate the cardinality of each time partition, and its reducers output the cardinality per partition. Based on targetPartitionSize, the job then decides how many shards will run the second job, which builds the index following basically the same flow as a realtime node: read the rows, build the index. If the number of shards is 1, only a single reducer does the work, which is slow. For example, if the cardinality is 20000 and "targetPartitionSize" : 10000, each time partition gets 20000/10000 = 2 reducers.
2) If numShards is specified, there is only the index job, and each time partition starts numShards reducers. If you roughly know the data volume and cardinality, you can specify numShards directly; see the spec fragments below.
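For reference, the two variants differ only in the partitionsSpec of the tuningConfig; the numbers are just the examples from the text above:

"partitionsSpec" : { "type" : "hashed", "targetPartitionSize" : 10000 }

"partitionsSpec" : { "type" : "hashed", "numShards" : 3 }

The first form triggers the extra cardinality-estimation job; the second goes straight to the index job.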

4. Time zone
Since the job is submitted with the UTC time zone, the map and reduce stages must be given the same time zone. To do this, modify mapred-site.xml on the submitting machine:

  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx1280M -Duser.timezone=UTC</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx1536M -Duser.timezone=UTC</value>
  </property>

How to solve “.Git Directory Could Not Be Found! Please Specify a Valid [dotGitDirectory] in Your pom.xml” Problem

When I modified the source code of Presto and ran mvn package, something went wrong; the error was as follows:

.Git Directory Could Not Be Found! Please Specify a Valid [dotGitDirectory] in Your pom.xml

After googling the problem, I found a simple fix. Add this plugin to your pom.xml and everything works:

            <plugin>
              <groupId>pl.project13.maven</groupId>
              <artifactId>git-commit-id-plugin</artifactId>
              <version>2.1.15</version>
              <configuration>
                <failOnNoGitDirectory>false</failOnNoGitDirectory>
              </configuration>
            </plugin>

Hadoop Web Page Information

Sometimes we want to confirm which branch the Hadoop code was compiled from. Normally we can find this information on the Hadoop web UI, for example the NameNode page:


If we want to change this information ourselves, we should modify the file
$HADOOP_HOME/hadoop-common-project/hadoop-common/src/main/resources/common-version-info.properties

version=${pom.version}
revision=${version-info.scm.commit}
branch=${version-info.scm.branch}
user=${user.name}
date=${version-info.build.time}
url=${version-info.scm.uri}
srcChecksum=${version-info.source.md5}
protocVersion=${protobuf.version}

Just modify the values and you can change the NameNode web page. For example, we can change the branch to branch=tag-20160817, as in the image above.
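The same properties are read by org.apache.hadoop.util.VersionInfo, which also backs the hadoop version command, so you can double-check a rebuilt package from the shell. The output looks roughly like the following; the exact values depend on your build:

$ hadoop version
Hadoop 2.4.0
Subversion <scm uri> -r <commit>
Compiled by <user> on <build time>
From source with checksum <md5>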