Druid Ingest Format问题

 

Druid使用过程中需要对历史数据进行索引,由于历史数据都是hive表形式,分隔符为\001,所以需要druid对ingest的format 任意delimiter进行支持,以下是支持的形式:

"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "dt",
"format" : "posix"
},
"dimensionsSpec" : {
"dimensions": ["grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov"]
},
"delimiter":"\u0001",
"listDelimiter":"\u0002",
"columns": ["dt","uid","grade","src_flag","school_name","gender_desc","prov_name","city_name","school_prov","flag"]
}
},

指定解析格式是tsv,delimiter为\u0001,listDelimiter是multi时候使用,目前没有使用,定义为\u0002,启动任务即可。

Hadoop 2运行distCpV1

记录一下,有些情况,需要使用distcpV1,命令为

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-extras-2.4.0.jar org.apache.hadoop.tools.DistCpV1 $source $dest

Gen8 安装 Dsm说明

gen8安装DSM流程如下,首先需要安装ESXI 5.5,这个google一下一大把,然后就是安装dsm。下载的地址为 xpenology.
我选取的版本是DSM 5.1-5022。需要下载的是DSM 5.1-5022 PAT文件,XPEnoboot DS3615xs 5.1-5022.3 ISO和VMDK三个文件。过程如下:
1、建立虚拟机,使用光驱挂在IOS文件,随后根据提示找到安装的ip,随后登陆到该ip,上传PAT文件,格式化并重启
2、重启后关闭虚拟机,增加磁盘,把VMDK磁盘加入到引导磁盘,同时设置为该磁盘模式属性为独立非持久,这部很重要
3、随后重新启动dsm,用vmdk引导,这时候可以使用dsm 安装助手,进入后进行配置即可
最重要的是第1步会将磁盘格式化,第2步会重新配置引导磁盘。

Apache Kylin 安装说明

Kylin运行依赖于Hive,MR,Hbase等,所以Kylin的安装需要先配置好Hive,Hadoop,Hbase环境才行。
首先要注意的是,Kylin运行提交任务默认会认为MR任务所在的文件夹jar包,在集群其它机器上相同的位置也有相应的jar包。比如hadoop-mapreduce-client-core-2.4.0.jar位置在kylin机器上为 /usr/local/hadoop, 那么默认运行时候在集群中的机器上jar包的位置也需要在同样的位置。这样的话,我们就需要根据Hadoop集群位置来命名Kylin机器上hadoop安装包路径。
其次就是安装hive,hbase,并且注意的是我们采用的是跑kylin构建任务的Hadoop集群与提供查询服务的Hbase底层Hadoop集群分离的架构,需要跑任务的集群能够有权限访问Hbase集群中Hadoop的各个节点。
最后是关于Kylin的配置,Kylin的配置都在$KYLIN_HOME/conf/kylin.properties
需要注意的几个选项:
kylin.metadata.url=kylin_metadata@hbase kylin元数据存储
kylin.hdfs.working.dir=/user/jiangyu2/kylin Kylin构建过程结果存储路径
kylin.hbase.cluster.fs=hdfs://ns-test 如果是两个hdfs,需要设置hbase的hdfs
kylin.hbase.cluster.hdfs.config.file=hbase.hdfs.xml 如果是两个集群,需要设置hbase的conf ,位置在$KYLIN_HOME/conf下面的hbase.hdfs.xml
最后需要修改Hbase的一个配置

  <property>
      <name>hbase.use.dynamic.jar</name>
      <value>false</value>
  </property>

另外,Kylin的权限可以通过LDAP来管理的,也可以通过硬编码,如果采用硬编码,修改位置在$KYLIN_HOME/tomcat/webapps/kylin/WEB-INF/classes/kylinSecurity.xml

        <beans profile="testing">
                <!-- user auth -->
                <bean id="passwordEncoder" class="org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder" />

                <scr:authentication-manager alias="testingAuthenticationManager">
                        <scr:authentication-provider>
                                <scr:user-service>
                                        <scr:user name="MODELER" password="$2a$10$Le5ernTeGNIARwMJsY0WaOLioNQdb0QD11DwjeyNqqNRp5NaDo2FG" authorities="ROLE_MODELER" />
                                        <scr:user name="ANALYST" password="$2a$10$s4INO3XHjPP5Vm2xH027Ce9QeXWdrfq5pvzuGr9z/lQmHqi0rsbNi" authorities="ROLE_ANALYST" />
                                        <scr:user name="ADMIN" password="$2a$10$o3ktIWsGYxXNuUWQiYlZXOW5hWcqyNAFQsSSCSEWoC/BRVMAUjL32" authorities="ROLE_MODELER, ROLE_ANALYST, ROLE_ADMIN" />
                                </scr:user-service>
                                <scr:password-encoder ref="passwordEncoder" />
                        </scr:authentication-provider>
                </scr:authentication-manager>
        </beans>

可以新增加用户,新增加的用户密码由下面的代码来实现

package search;

import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;

/**
 * Created by jiangyu2 on 12/7/16.
 */
public class F {
  public static void main(String[] args) {
    String password = "suda_test";
    PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
    String encodedPassword = passwordEncoder.encode(password);
    System.out.print(encodedPassword);
  }
}

Druid Batch Job 问题汇总

最近在整理使用Druid Batch Job所遇到的问题,下面一一记录,我使用的Druid版本是0.8.2,所以以下方法适用于0.8.2。
一、依赖包问题
因为很多机器都无法连接外网,所以有必要修改druid的源文件,使他能够从我们的本地中心库下载文件,所需要修改的文件为ExtensionsConfig.java,修改内容如下

   private List<String> remoteRepositories = ImmutableList.of(
-      "https://repo1.maven.org/maven2/",
-      "https://metamx.artifactoryonline.com/metamx/pub-libs-releases-local"
+      "http://10.39.0.110:8081/nexus/content/groups/public",
+      "http://10.39.0.110:8081/nexus/content/repositories/thirdparty"
   );

这样将会从我们的中心库下载所需要的包。
修改完后我们可以把所需要的包拉取的本地用于提交,修改$DRUID_HOME/config/_common/common.runtime.properties

druid.extensions.localRepository=/usr/local/druid-0.8.2/localRepo

指定本地仓库的位置。 随后在$DRUID_HOME目录下执行

java -cp  config/_common:config/broker:/usr/local/hadoop-2.4.0/etc/hadoop:lib/*  io.druid.cli.Main tools pull-deps

这样会将依赖包下载。
如果需要增加自己的的第三方依赖包,也修改$DRUID_HOME/config/_common/common.runtime.properties

druid.extensions.coordinates=["com.sina.hivexec:hive-exec:0.13.0"]

这样提交的时候会将依赖包加入到classpath中。

二、counter计数问题
我们的问题是由于一个第三方包导致获取counter计数抛异常,随后导致任务失败,目前解决方法是通过修改源码,catch住异常,查看是否是由于counter计数引起的,并且查看任务是否成功,成功的话继续下一步任务,而不是直接抛出异常结束。

三、有关reduce个数问题
简单说一下druid的流程,只讲一下partitionsSpec type为hash的情况。
1、如果不指定numShards 那么会分两个任务,第一个任务通过hyperloglog对每一个分区去查找基数大小,reduce会将每个分区的基数大小输出。
随后job会根据targetPartitionSize决定由几个shards来跑这个第二个任务,第二个任务就是生成index,基本流程跟realtime一样,根据日志,生成index。但是如果shards为1,相当于只有一个reduce去跑,会比较慢。这样如果基数是20000, “targetPartitionSize” : 10000,那么每个时间分区就只有20000/10000=2个reduce去跑。
2、如果指定numShards,那么就只有index一个任务,每个时间分区启动numShards个reduce,如果知道大概的数据量以及基数,可以直接指定numShareds.

四、时区问题
由于提交的时候指定的是UTC时区,所以需要在map 以及reduce阶段也制定时区,指定方法为,修改提交机器的mapred-site.xml

  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx1280M -Duser.timezone=UTC</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx1536M -Duser.timezone=UTC</value>
  </property>

How to solve “.Git Directory Could Not Be Found! Please Specify a Valid [dotGitDirectory] in Your pom.xml” Problem

When i modify the source code of presto, and execute mvn package, something wrong happened, the stack trace is as follow:

.Git Directory Could Not Be Found! Please Specify a Valid [dotGitDirectory] in Your pom.xml

After google the problem, i find a simple way to solve it. Add this plugin to your pom.xml and everything is ok.

            <plugin>
              <groupId>pl.project13.maven</groupId>
              <artifactId>git-commit-id-plugin</artifactId>
              <version>2.1.15</version>
              <configuration>
                <failOnNoGitDirectory>false</failOnNoGitDirectory>
              </configuration>
            </plugin>

Hadoop Web Page Information

Sometimes, we want make sure which branch we compile the hadoop code. Normally, we can find this information through hadoop web page, for example the namenode web page:


If we want to change the information by self, we should modify the file in
$HADOOP_HOME/hadoop-common-project/hadoop-common/src/main/resources/common-version-info.properties

version=${pom.version}
revision=${version-info.scm.commit}
branch=${version-info.scm.branch}
user=${user.name}
date=${version-info.build.time}
url=${version-info.scm.uri}
srcChecksum=${version-info.source.md5}
protocVersion=${protobuf.version}

Just modify the value , you can change the web page of namenode. For example we can change the branch to branch=tag-20160817 as the image above.

关于Druid时区的问题

Druid使用过程中,一开始比较烦恼的是关于时区的问题。Druid内部采用joda作为时间函数lib,并且内部默认使用的都是UTC时间,而中国实用的是Asia/Shanghai时间,为东八区,差了八个小时,导致默认的数据的ingest与query都存在八个小时的时差。
为了解决这一问题,我们可以从ingest与query两方面进行处理。首先是ingest,默认的格式为

{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

这种格式符合ISO 8601,使用的是UTC时间,如果非要用这种格式,需要前端架一个实时系统,把时间戳改为Asia/Shanghai,通常来说这么做非常费。
所以推荐第二张方法,就是在定义timestamp的解析时候才用以下格式

                    "timestampSpec": {
                        "column": "timestamp",
                        "format": "posix"
                    },

使用posix时间而不是默认的string。这样的话posix时间全球都是统一的,规避了时间转化的问题。
对于query的问题,我们可以指定查询的时区,如下

    "granularity": {
        "type": "period",
        "period": "PT1H",
        "timeZone": "Asia/Shanghai"
    },

这样查询的过程指定了时区,也可以自动转化,防止了时差问题。

Hadoop Balancer问题

最近关注了一下集群Balancer的运转,由于集群非常大,并且有一些长时任务一只运行,磁盘不均衡的问题非常严重,所以快速的均衡数据是非常必要的。
但是从日志上看,我们的Balancer经常hang住。之前的做法是通过一个cron脚本进行检查,hang住一段时间后,自动的重启balancer。
这两天特意看了一下日志和代码,发现hang住的原因是由于npe导致的。具体的代码如下:

             // update locations
             for (String datanodeUuid : blk.getDatanodeUuids()) {
               final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (datanode != null) { // not an unknown datanode
+              if (d != null) { // not an unknown datanode
                 block.addLocation(d);
               }
             }

进行判断的对象错误了,将datanode改为d就好了。原先的做法会导致Balncer提交的copy任务抛出npe错误。

    for (Source source : sources) {
      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
    }

    // wait for all dispatcher threads to finish
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        LOG.warn("Dispatcher thread failed", e.getCause());
      }
    }

在这部分,由于提交的BlockMoveDispatcher任务抛出了npe,同时在future.get没有设置超时,就会一只hang住。
解决方法:除了修改npe以为,还可以在future.get()设置超时时间,确保不会一直hang住。