Notes on Hadoop Service Level Authorization

Years ago I patched UGI in an older Hadoop version so that access to Hadoop services could be restricted by a UGI-to-IP mapping. Recently, while going over a customer requirement, I found that Hadoop already ships this capability, and in considerable detail; Hadoop really does try to cover everything.
The requirement was that the customer wanted tight control over their cluster: keeping unrelated requests from pulling data and guarding against malicious access.
The Hadoop Common module provides Service Level Authorization for exactly this purpose: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/ServiceLevelAuth.html
First, set hadoop.security.authorization to true in $HADOOP_CONF_DIR/core-site.xml to turn on Hadoop service authorization, then configure $HADOOP_CONF_DIR/hadoop-policy.xml.
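For reference, the switch in core-site.xml looks like this:

  <property>
    <name>hadoop.security.authorization</name>
    <value>true</value>
  </property>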

hadoop-policy.xml supports ACLs for a long list of services, each corresponding to a Hadoop protocol. For example, to restrict client access to HDFS, set:

 <property>
    <name>security.client.protocol.acl</name>
    <value>test_user hadoop</value>
    <description>ACL for ClientProtocol, which is used by user code
    via the DistributedFileSystem.
    The ACL is a comma-separated list of user and group names. The user and
    group list is separated by a blank. For e.g. "alice,bob users,wheel".
    A special value of "*" means all users are allowed.</description>
  </property>

  <property>
    <name>security.client.protocol.hosts</name>
    <value>192.168.0.0/24</value>
  </property>

  <property>
    <name>security.client.protocol.hosts.blocked</name>
    <value>192.168.0.10</value>
  </property>

With this configuration, only the user test_user and members of the hadoop group may access HDFS through the DFSClient; in addition, access is limited to the 192.168.0.0/24 subnet, with 192.168.0.10 explicitly blocked.
Other service protocols are configured the same way: pick the right ACL name, then add the corresponding hosts and hosts.blocked entries to restrict access by IP.
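After editing hadoop-policy.xml, the service-level ACLs can be reloaded without restarting the daemons (refresh commands from the Service Level Authorization documentation):

  hdfs dfsadmin -refreshServiceAcl     # NameNode
  yarn rmadmin -refreshServiceAcl      # ResourceManager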

Capacity Scheduler: submissions stuck when Node Labels meet queue mapping

After enabling Node Labels for a large customer, we found that submitted jobs could no longer start: the ApplicationMaster never launched, so no application could run and the cluster effectively stopped accepting work.
The cluster runs Hadoop 2.7.2. Node Labels split it into two partitions, one labeled core and one labeled vvp. A client may specify a queue at submission time, in which case the queue is set in the submission context; applications that do not specify a queue explicitly get the default queue, i.e. default. See the YARNRunner implementation:

appContext.setApplicationId(applicationId);                // ApplicationId
appContext.setQueue(                                       // Queue name
    jobConf.get(JobContext.QUEUE_NAME,
    YarnConfiguration.DEFAULT_QUEUE_NAME));

After ClientRMService on the ResourceManager accepts the submission, the application is driven through its state machine; after several asynchronous steps an RMAppAttempt is created together with the resource request for the ApplicationMaster. The relevant code is in RMAppManager, in validateAndCreateResourceRequest. At this point the AM request still carries the default queue; it has not been resolved through queue mapping to its actual target queue, so the label information cannot be found and the AM cannot be scheduled.

    SchedulerUtils.normalizeAndValidateRequest(amReq,
        scheduler.getMaximumResourceCapability(),
        submissionContext.getQueue(), scheduler, isRecovery, rmContext);

In this code path submissionContext.getQueue() still returns default, so the label that gets looked up is the one attached to default, usually the default partition. If the user or group is mapped to another queue, and that queue must run on a specific label, the validation goes wrong.
Workaround:

    String queueName = submissionContext.getQueue();
    if(scheduler instanceof CapacityScheduler) {
      queueName = ((CapacityScheduler)scheduler).getQueueMappings(queueName, user);
    }
    SchedulerUtils.normalizeAndValidateRequest(amReq,
        scheduler.getMaximumResourceCapability(),
        queueName, scheduler, isRecovery, rmContext);

The idea is to expose a queue-mapping lookup on CapacityScheduler so that the mapped queue, and therefore the actual label, can be resolved before the AM request is validated.
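A rough sketch of what that helper could look like; this is illustrative only (it is not upstream code) and simply reuses the user/group mapping lookup that CapacityScheduler already performs internally in the 2.7.x line:

    // Hypothetical helper added to CapacityScheduler in our patch (sketch only).
    // Resolve a submission that still targets the default queue through the
    // configured user/group queue mappings; otherwise return the queue unchanged.
    public synchronized String getQueueMappings(String queueName, String user)
        throws IOException {
      if (!YarnConfiguration.DEFAULT_QUEUE_NAME.equals(queueName)) {
        return queueName;
      }
      String mappedQueue = getMappedQueue(user); // existing mapping lookup inside CapacityScheduler
      return mappedQueue != null ? mappedQueue : queueName;
    }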

This problem is fixed in Hadoop 2.8.5, where the fix looks like this:

  if (rmContext.getQueuePlacementManager() != null) {
    // We only do queue mapping when it's a new application
    rmContext.getQueuePlacementManager().placeApplication(
        submissionContext, user);
  }

That is, before the AM resource request is built, the queue name in the submission context is rewritten by the QueuePlacementManager according to the queue mappings, so the mapped queue, and with it the right label, is used.
Because the 2.8.5 change is far more invasive than what our 2.7-based version could easily absorb, we went with our own fix for this problem.

An ARP cache problem triggered by a scale-out

While scaling out an EMR customer's cluster to nearly a thousand machines, network communication broke down; some machines could not even ping themselves. The root cause is that the default CentOS ARP cache settings are unsuitable for clusters of this size, and the relevant kernel parameters must be raised.
The symptom: as the cluster approached a thousand nodes, the active NameNode suddenly slowed down, RPC requests piled up, and ZKFC eventually failed it over; the other NameNode then also failed over intermittently.
dmesg was full of messages like:

[76391312.109413] net_ratelimit: 97 callbacks suppressed
[76391319.885189] net_ratelimit: 37 callbacks suppressed
[76391325.104167] net_ratelimit: 62 callbacks suppressed
[76391330.508496] net_ratelimit: 60 callbacks suppressed
[76391335.694525] net_ratelimit: 50 callbacks suppressed
[76391343.815606] net_ratelimit: 108 callbacks suppressed

In addition, the gmond metrics collection on the NameNode kept failing; it could not send metrics at all. Sending gmond metrics puts very little load on the network, so if even that fails, the network is in serious trouble.
Working with other teams we found the culprit: an ARP cache overflow, which severely degrades network performance.
The ARP cache stores the mapping from IP addresses to MAC addresses and is governed by these kernel parameters:
net.ipv4.neigh.default.gc_thresh1 — below this many entries, no garbage collection is done
net.ipv4.neigh.default.gc_thresh2 — above this many entries, garbage collection runs within 5 seconds
net.ipv4.neigh.default.gc_thresh3 — hard upper limit on the number of entries

The defaults on our systems were:
net.ipv4.neigh.default.gc_thresh1 = 128
net.ipv4.neigh.default.gc_thresh2 = 512
net.ipv4.neigh.default.gc_thresh3 = 1024

These defaults are far too small; once the cluster grew past 1000 machines we saw packet loss and general instability, so the settings had to be raised.

Append to /etc/sysctl.conf:
net.ipv4.neigh.default.gc_thresh1 = 512
net.ipv4.neigh.default.gc_thresh2 = 2048
net.ipv4.neigh.default.gc_thresh3 = 10240
net.nf_conntrack_max = 524288

Then apply the new settings with sysctl -p.
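To sanity-check the new limits, it helps to compare the live neighbor-table size with the thresholds (standard iproute2/sysctl commands):

# Current number of ARP/neighbor entries on this host
ip -4 neigh show | wc -l
# Effective thresholds after sysctl -p
sysctl net.ipv4.neigh.default.gc_thresh1 net.ipv4.neigh.default.gc_thresh2 net.ipv4.neigh.default.gc_thresh3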

TensorFlow .so file conflict with a newer PyArrow version

Recently I tried training with TensorFlow on an ECS machine. Everything was fine for the first few days, then suddenly TensorFlow stopped working: even something as simple as 'import tensorflow' could not be executed.
Python crashed with a segmentation fault, so I enabled faulthandler.
The error is as below:

Fatal Python error: Segmentation fault

Current thread 0x00007f5f2acd5740 (most recent call first):
  File "<frozen importlib._bootstrap>", line 219 in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 922 in create_module
  File "<frozen importlib._bootstrap>", line 571 in module_from_spec
  File "<frozen importlib._bootstrap>", line 658 in _load_unlocked
  File "<frozen importlib._bootstrap>", line 684 in _load
  File "/usr/lib/conda/envs/python3.6/lib/python3.6/imp.py", line 343 in load_dynamic
  File "/usr/lib/conda/envs/python3.6/lib/python3.6/imp.py", line 243 in load_module
  File "/usr/lib/conda/envs/python3.6/lib/python3.6/site-packages/tensorflow/python/pywrap_tensorflow_internal.py", line 28 in swig_import_helper
  File "/usr/lib/conda/envs/python3.6/lib/python3.6/site-packages/tensorflow/python/pywrap_tensorflow_internal.py", line 32 in <module>

The code is simple:

_mod = imp.load_module('_pywrap_tensorflow_internal', fp, pathname, description)

It just loads a shared object (.so) file.

It confused me for two days; I had to go back over everything I had changed before TensorFlow stopped working. Finally I remembered that I had just upgraded pyarrow from 0.10.0 to 0.14.0.
I downgraded from 0.14.0 back to 0.10.0 and everything worked again.
It is very hard to tell why Python throws a segmentation fault. Had I not remembered the pyarrow upgrade, I would never have suspected a conflict between shared libraries.
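For the record, the two steps that mattered, roughly (the version numbers are the ones from this incident):

# Get a Python-level traceback instead of a bare segfault
python -X faulthandler -c "import tensorflow"
# Work around the clash by pinning pyarrow back to the old version
pip install pyarrow==0.10.0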

Mitigating HDFS block report storms in Hadoop

For a large Hadoop cluster, the biggest challenge after planned downtime is the block report storm. When the NameNode restarts, a huge number of DataNodes report their blocks to rebuild the NameNode's in-memory structures: the block-to-DataNode map, the set of blocks held by each DataNode, and so on. This accounts for a sizeable share of NameNode memory, and the flood of reports tends to go straight into the old generation, triggering GC pauses and block report handling so slow that it exceeds the 30s socket timeout. The NameNode may even drop the connection halfway through processing a report, wasting the time it held the write lock, while the DataNode backs off and later resends the full report; this repeats and snowballs into an avalanche. That is the HDFS block report storm.
To address this, the community introduced dfs.blockreport.split.threshold, which defaults to 1,000,000. If a DataNode holds fewer blocks than the threshold, the whole block report is packed into a single RPC, i.e. only one RPC is sent; above the threshold the DataNode splits the report and sends one RPC per storage (per disk), and if one of the per-disk reports times out, the DataNode redoes the block report for all disks on the next attempt. In principle this lowers the risk of a block report storm and speeds up the restart. I ran a simple test to verify the effect.
First I created a cluster on Alibaba Cloud EMR with the following configuration:
30 D1 instances:
NameNode: 16 cores, 128 GB
DataNode: 32 cores, 128 GB, 12 x 5.5 TB disks
Then I loaded data until the namespace reported 232290855 files and directories, 215077946 blocks = 447368801 total filesystem object(s).
Each DataNode held roughly 20 million blocks.
After loading, I restarted the NameNode; loading the image took about 23 minutes.
By my estimate a single full block report takes around 30 seconds, so a one-RPC block report is very likely to hit the socket timeout.
1. With dfs.blockreport.split.threshold set to 100,000,000:
After the NameNode restart, each DataNode sends its entire report in one shot:

2019-11-11 10:51:49,910 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x17b00cec3b356,  containing 12 storage report(s), of which we sent 0. The reports had 21507382 total blocks and used 0 RPC(s). This took 4263 msec to generate and 435410 msecs for RPC and NN processing. Got back no commands.
2019-11-11 10:55:02,672 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x17b8514c85fdf,  containing 12 storage report(s), of which we sent 0. The reports had 21507382 total blocks and used 0 RPC(s). This took 4175 msec to generate and 60061 msecs for RPC and NN processing. Got back no commands.
2019-11-11 11:52:57,836 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x17ebc082c478e,  containing 12 storage report(s), of which we sent 0. The reports had 21507382 total blocks and used 0 RPC(s). This took 5277 msec to generate and 678 msecs for RPC and NN processing. Got back no commands.
2019-11-11 13:26:17,754 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x183dfe776d467,  containing 12 storage report(s), of which we sent 0. The reports had 21426179 total blocks and used 0 RPC(s). This took 5238 msec to generate and 542 msecs for RPC and NN processing. Got back no commands.
2019-11-11 13:26:23,126 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x183e1265a5171,  containing 12 storage report(s), of which we sent 0. The reports had 21426179 total blocks and used 0 RPC(s). This took 5367 msec to generate and 563 msecs for RPC and NN processing. Got back no commands.
2019-11-11 13:26:28,440 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x183e26411ce59,  containing 12 storage report(s), of which we sent 0. The reports had 21426179 total blocks and used 0 RPC(s). This took 5347 msec to generate and 547 msecs for RPC and NN processing. Got back no commands.
2019-11-11 13:26:33,719 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x183e3a2783dc1,  containing 12 storage report(s), of which we sent 0. The reports had 21426179 total blocks and used 0 RPC(s). This took 5359 msec to generate and 484 msecs for RPC and NN processing. Got back no commands.
2019-11-11 13:26:38,967 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x183e4db86a462,  containing 12 storage report(s), of which we sent 0. The reports had 21426179 total blocks and used 0 RPC(s). This took 5269 msec to generate and 480 msecs for RPC and NN processing. Got back no commands.

Even after three hours, the reports had still not succeeded.

2. With dfs.blockreport.split.threshold set to 1,000,000:

2019-11-11 14:18:15,588 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x18695e8fff624,  containing 12 storage report(s), of which we sent 2. The reports had 21426179 total blocks and used 2 RPC(s). This took 10670 msec to generate and 137642 msecs for RPC and NN processing. Got back no commands.
2019-11-11 14:20:04,027 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x186c1389c74b2,  containing 12 storage report(s), of which we sent 0. The reports had 21426179 total blocks and used 0 RPC(s). This took 5248 msec to generate and 60062 msecs for RPC and NN processing. Got back no commands.
2019-11-11 14:20:13,913 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x186bf29b0ccc7,  containing 12 storage report(s), of which we sent 2. The reports had 21426179 total blocks and used 2 RPC(s). This took 5339 msec to generate and 78788 msecs for RPC and NN processing. Got back no commands.
2019-11-11 14:23:29,457 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x186e125f12a27,  containing 12 storage report(s), of which we sent 4. The reports had 21426179 total blocks and used 4 RPC(s). This took 5219 msec to generate and 128366 msecs for RPC and NN processing. Got back no commands.
2019-11-11 14:32:14,183 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x18745b6b058ae,  containing 12 storage report(s), of which we sent 6. The reports had 21426179 total blocks and used 6 RPC(s). This took 5107 msec to generate and 221167 msecs for RPC and NN processing. Got back no commands.
2019-11-11 14:36:13,401 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Unsuccessfully sent block report 0x18778e2f96d16,  containing 12 storage report(s), of which we sent 6. The reports had 21426179 total blocks and used 6 RPC(s). This took 5229 msec to generate and 240599 msecs for RPC and NN processing. Got back no commands.
2019-11-11 14:36:43,543 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Successfully sent block report 0x187825e7cb772,  containing 12 storage report(s), of which we sent 12. The reports had 21426179 total blocks and used 12 RPC(s). This took 5510 msec to generate and 230014 msecs for RPC and NN processing. Got back no commands.

It took about 20 minutes for the block reports to succeed; roughly 10 minutes later heartbeats were back to normal and the cluster could serve traffic again. Total restart time was about 20 + 10 + 20 minutes, i.e. around an hour, which is a big improvement.
So, based on this test, tuning dfs.blockreport.split.threshold is an effective way to reduce the impact of block report storms and speed up reporting.
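For reference, the parameter goes in the DataNodes' hdfs-site.xml; the second test above effectively keeps the 2.7.x default:

  <property>
    <name>dfs.blockreport.split.threshold</name>
    <value>1000000</value>
    <!-- Above this many blocks, the DataNode sends one block report RPC per storage (disk). -->
  </property>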

Troubleshooting high disk IO on a Hadoop DataNode

Recently a customer reported that data written into HDFS through Flume frequently timed out or failed outright. Their cluster uses Alibaba Cloud D1 instances, i.e. local disks that normally sustain 100 MB/s+ sequential writes per disk, so raw disk performance was unlikely to be the problem.
After logging in and checking basic IO and CPU metrics, I found many disks pinned at 100% IO utilization. iotop showed the Hadoop DataNode process as the main consumer, and drilling further down, a du -sk launched by the DataNode was generating most of the IO.
By default the DataNode scans its disk usage every 10 minutes using the du command. If a directory holds a huge number of frequently changing files, du performs a large amount of random reads and drives IO utilization through the roof.
In this customer's case, Flume had written an enormous number of small files, about 1.5 million per 6 TB disk, and a single du run took more than 5 minutes. Most of the disk time went into this pointless scanning, delaying real writes and sometimes dropping connections.
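The quick way to see this on a node (the data directory below is only an example path):

# Show only the processes/threads that are actually doing IO
iotop -o
# Measure what the DataNode's usage scan costs on one data directory
time du -sk /mnt/disk1/hdfs/data/current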
There are two ways to address this:
1. https://issues.apache.org/jira/browse/HADOOP-9884
This JIRA essentially replaces du with df.
2. Increase fs.du.interval
so that the disk-usage check runs, say, once an hour instead of every 10 minutes.

The customer went with the second option, raising the check interval from 10 minutes to 1 hour, which basically solved the write problem.
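For reference, a minimal core-site.xml override for the one-hour interval (the value is in milliseconds; the default is 600000, i.e. 10 minutes):

  <property>
    <name>fs.du.interval</name>
    <value>3600000</value>
  </property>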

TensorFlow Session restore checkpoint

I wrote a few small TensorFlow programs recently and hit a case where the session would not restore from a checkpoint. The code was as simple as:

with tf.train.MonitoredTrainingSession(master = server.target,
                                           is_chief = task_index == 0,
                                           checkpoint_dir= checkpoint_dir,
                                           save_checkpoint_secs=20) as sess:

In theory tf.train.MonitoredTrainingSession can both save and restore checkpoints. In my tests saving worked fine, but every run started from a brand-new model instead of continuing from the previous training.
It took a lot of digging to find a working approach.
According to the TF documentation, saving and restoring should only require tf.train.Saver(). Following the docs as written gave me a series of errors; what finally worked was this: first obtain the latest checkpoint file, i.e. the most recent checkpoint containing the model parameters, then restore the model from it, remembering to reset the training graph beforehand.
The code is as follows:

checkpoint_dir = "hdfs://emr-header-1:9000/movie"
saver = tf.train.Saver()
epoch = 0

with tf.train.MonitoredTrainingSession(master = server.target,
                                           is_chief = task_index == 0,
                                           checkpoint_dir= checkpoint_dir,
                                           save_checkpoint_secs=20) as sess:
     tf.reset_default_graph()
     sess.run(init)
     latest_path = tf.train.latest_checkpoint(checkpoint_dir=checkpoint_dir)
     saver.restore(sess, latest_path)

Hadoop CGroup setup

Setting up CGroups on Hadoop 2.8: on CentOS 7 the default CGroup controllers live under /sys/fs/cgroup/cpu and friends. If Hadoop is not configured to mount cgroups itself but relies on the default mount location, it will use /sys/fs/cgroup/cpu/hadoop-yarn, so on Hadoop 2.8 do the following:

Assuming YARN is started as the hadoop user:
mkdir -p /sys/fs/cgroup/cpu/hadoop-yarn
chown -R hadoop:hadoop /sys/fs/cgroup/cpu/hadoop-yarn

Set the following in yarn-site.xml:

  <property>
    <name>yarn.nodemanager.container-executor.class</name>
    <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
  </property>
  <property>
    <name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
    <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value>
  </property>
  <property>
    <name>yarn.nodemanager.linux-container-executor.group</name>
    <value>hadoop</value>
  </property>
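If you ever need something other than the default mount behavior described above, the relevant knobs are these (shown with their 2.8 defaults; treat this as a pointer rather than required configuration):

  <property>
    <name>yarn.nodemanager.linux-container-executor.cgroups.hierarchy</name>
    <value>/hadoop-yarn</value>
    <!-- Path under each controller, i.e. /sys/fs/cgroup/cpu/hadoop-yarn here -->
  </property>
  <property>
    <name>yarn.nodemanager.linux-container-executor.cgroups.mount</name>
    <value>false</value>
    <!-- false: use the pre-mounted cgroup location instead of letting YARN mount it -->
  </property>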

Then restart the NodeManager and you are done.

Fixing missing MPI logs in Docker containers

Recently, while running TensorFlow in Docker, I often could not retrieve TensorFlow's logs through the docker-java API. The log-collection code is as follows:

  private class DockerLogReader extends LogContainerResultCallback {
    @Override
    public void onStart(Closeable stream) {
      System.out.println("start");
    }

    @Override
    public void onNext(Frame item) {
      System.out.print(new String(item.getPayload()));
    }

    @Override
    public void onError(Throwable throwable) {
      System.out.print(throwable.getMessage());
    }

    @Override
    public void onComplete() {
      super.onComplete();
      System.out.println("Docker exit.");
    }
  }
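For context, the callback above is attached to a container roughly like this (a sketch; the docker-java client setup and containerId come from the surrounding code and are assumed here):

  // Stream the container's stdout/stderr into the DockerLogReader callback
  dockerClient.logContainerCmd(containerId)
      .withStdOut(true)
      .withStdErr(true)
      .withFollowStream(true)
      .exec(new DockerLogReader())
      .awaitCompletion();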

Normally execution reaches onNext, which collects the logs of the TensorFlow job running inside Docker. Since we use Horovod, the program is actually launched through MPI. During the run, only the Python program's own prints appeared; all of the TensorFlow logs were missing.
To investigate, I started two containers separately and launched MPI by hand:

mpirun  --allow-run-as-root -np 2 ***** >stdout 2>stderr

With the output redirected, stdout and stderr contained exactly (and only) what docker-java had been retrieving, even though a plain console run showed the full logs. The MPI documentation turned up the -tag-output option; combining it with redirecting stderr into stdout:

mpirun -tag-output --allow-run-as-root -np 2 ***** 2>&1

After this change all of the logs are printed, and the problem is solved.

PySpark with Jupyter

First, generate the Jupyter config file:

jupyter-notebook --generate-config

Then edit the config file:

c.NotebookApp.port = 18888
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.allow_root = True

Spark itself must of course be configured correctly; in the EMR environment it already is. Next, point PySpark's driver at Jupyter:

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

Finally, start pyspark:

pyspark --master yarn