Java Runnable convert to Callable Test

package com.jiangyu;
 
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
public class TestConvertThread {
  public static final Random r = new Random();
 
  class Call implements Callable<Object> {
 
    @Override
    public Object call() throws Exception {
      Thread.sleep(r.nextInt(5)*1000);
      System.out.println("finished callable");
      return true;
    }
 
  }
 
  class Run implements Runnable {
 
    @Override
    public void run() {
      try {
        Thread.sleep(r.nextInt(5)*1000);
        System.out.println("finished runnable");
      } catch (InterruptedException e) {
      }
    }
 
  }
 
  public void startJob() throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService executor = Executors.newCachedThreadPool();
    Future<Object> result = null;
    // this is for callable
    result = executor.submit(new Call());
    result.get(10, TimeUnit.SECONDS);
 
    // this is for runnable convert to callable
    result = executor.submit(Executors.callable(new Run()));
    result.get(10, TimeUnit.SECONDS);
  }
 
  public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    TestConvertThread tt = new TestConvertThread();
    tt.startJob();
  }
}

测试了一下Runnable convert 到 Callable,需要Future返回类型为Object。JDK中可以使用Executors的Callable方法将Runnalbe直接转化为Callable。

Hadoop 2.0 Datanode

Hadoop 2.0 Datanode 更加细分,将结构分为两个大部分。

DataNode相当于一个包装转发器,将请求发送给相应的处理单元。

DataNode 分为以下两部分:

1、与NameNode汇报的逻辑,block report,heartbeat

2、底层与文件系统交互的datanode 存储逻辑

 

其中第一部分的调用如下:

DataNode —>  BlockProolManager  —>  BPOfferService  —>   BPServiceActor

第二部分的调用如下:

DataNode  —>  FSDataSetImpl     —>     FSVolumeList(FSVolume)     —>     BLockPoolSlice

 

DataNode启动流程:

1、Main

2、StartDatanode

3、初始化BlockPoolManager

4、doRefreshNamenodes

     从配置里面读出相应的namespace id,namenode id(HA)

     初始化BPOfferService,每个namespace对应一个BPOfferService。

     初始化BPActor,每个namenode对应一个BPActor,即Active NameNode和StandbyNameNode各对应一个BPActor。

5、启动所有的BPActor。

     BPActor启动过程如下:

     (1)与NameNode进行handshake,首先建立BPActor对NameNode的RPC连接

     (2)获得namespace info,比较一下与现有的version,目前不会由于svn version不一致直接报错了

     (3)verifyAndSetNamespaceInfo,BPOfferService搞定后,调用DataNode的initBlockPool

     首先向BlockPoolManager注册该BPOfferService。在这个阶段,初始化FSDataSet,初始化存储层,初始化FSVolume,初始化FSVolumeList和ReplicaMap。启动DataBlockScanner和DirectoryScanner。向底层的文件交互FSVolume注册该BlockPool

     (4)handshake第二部,向NameNode注册,NameNode向其内部的DataNode注册该DataNode

     (5)设置BlockReport的细节

VIP配置方法

    通过系统管理员找到一个VIP地址,添加虚拟网卡命令:ifconfig eth0:0 10.210.225.31 up

    关闭虚拟网卡命令 ifconfig eth0:0 down 

SOCAT使用说明

一、SOCAT说明

 

    SOCAT类似于NC,它内部命令比较复杂,需要的时候可以man看一下,主要用于两个流之间的连接。我们主要用它的TCP转发。

 

二、使用说明 

    测试环境为虚拟机,ns1的nn1为10.210.225.30,将10.210.225.35的50070端口全部转发到10.210.225.30:50070。

    nohup socat TCP-LISTEN:50070,fork TCP:10.210.225.30:50070  &Socat1

    这样访问10.210.225.30:50070就可以通过10.210.225.35:50070来进行。

    这样的意义是对hadoop get结果的操作不用落地一次,直接下载到客户前端机。

提车记

  昨天提车,颤颤悠悠的开回家,停入地下车库,总感觉要撞,慢,一定要慢,提醒自己。

使用NFS服务提高hadoop 可靠性

    这是早期写的一篇文章,记录在blogger,自己转过来。

    众所周知 hadoop是一个单点系统,即所谓的spof,所以元数据的保护就是重中之重。目前元数据写两份,分别是本地的磁盘,远程的磁盘。还有一个secondary namenode做冷备。写到远程的磁盘就需要的是NFS服务了。

     NFS(Network File System, 网络文件系统)可以通过网络将分享不同主机(不同的OS)的目录——可以通过NFS挂载远程主机的目录, 访问该目录就像访问本地目录一样。

    首先下载NFS,网址是http://sourceforge.net/projects/nfs/files/ 选择合适的版本下载。到server和client机器安装。

    编辑server主机的文件/etc/exports /etc/hosts.allow /etc/hosts.deny三个文件。

    对/etc/exports  加入例如:/data0 10.39.2.121(rw,no_root_squash)

Continue reading…

Hadoop RPC

一、说明

    Hadoop无论是1.x还是2.x机器不同角色之间的通信全部是通过RPC完成的,RPC底层都是通过Dynamic Proxy完成,无外乎就是TCP连接,验证协议,提取字段等等。所不同的是1.x采用的是plain java自己完成序列化与反序列化,而2.x底层采用的是PB序列化与反序列化,造成了使用的不同。


二、Dynamic Proxy基本概念

    Dynamic Proxy是动态代理,采用的是反射技术,在运行时动态生成对象,要求代理类与实际执行类具有相同的接口,同时实现InvocationHandler接口,最后通过Proxy类生成代理类:

Proxy.newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)

参数第一个为ClassLoader,第二个为共同的interface,第三个是实现的handler方法。这样调用代理的相应方法会调用handler方法。

 

三、RPC in Hadoop 1.x

    Hadoop底层RPC通信采用Dynamic Proxy,这样隔离了不同身份的机器之间底层连接检验等细节,而专注于业务逻辑。

    下面是Hadoop 1.x RPC的一个简单流程,比如DFSClient想要与NameNode通信,建立RPC连接。

(一)Client端

集群错误记录

     今早被人发邮件说集群任务有个reduce一直无法完成,查看了一下日志,发现有大量的创建块操作失败,根据以往的经验,当年曾经被桌面级磁盘坑过,这种情况无外乎是压力过大,导致pipeline长时间无法建立。然后通过ganglia检查后发现并没有太大的压力。

     检查DataNode日志发现有大量的too many open files错误,很低级的错误,装机后没有改好配置。
     ulimit -a 后印证了想法,nofile为1024。
     同时记录一下:
     18内核修改/etc/security/limits.conf文件,32内核除了修改上面的文件,还需要修改/etc/security/limits.d/90-nproc.conf文件。
     *     soft     nofile     65535
     *     hard    nofile     65535 
      

yum downloadonly

     有些时候集群中的机器无法联网,通过yum进行安装,这样就需要找一个相同内核的机器将rpm下载后再到相应的机器上进行安装,以解决这一问题。

     yum 有相应的工具完成这一任务。
     1、首先安装yum-downloadonly。
          yum install yum-downloadonly
     2、如果是新安装,使用命令
          yum install httpd –downloadonly —downloaddir=./httpd
          如果是升级,则使用
          yum update httpd -y –downloadonly —downloaddir=./httpd
     3、然后将rpm拷贝到指定机器
          rpm -ivh           安装即可