HDFS学习笔记之:(1)RPC客户端实现

1.RPC常规实现

RPC实现通常包含下图几个部分:

@startuml

[Caller]
[ClientStub]
[NetClient]
[NetServer]
[ServerStub]
[Callee]

[Caller] --> [ClientStub]
[ClientStub] --> [NetClient]
[NetClient] -right-> [NetServer]
[NetServer] -up-> [ServerStub]
[ServerStub] -up-> [Callee]

@enduml

Caller想要调用Callee提供的方法,可以通过直接调用ClientStub的同名方法来实现。ClientStub提供了Callee在本地的代理对象,ClientStub中的方法在被调用后会将请求参数进行封装和序列化,最终通过底层NetClient发送到服务端。

服务端则由NetServer来负责监听网络请求,并将收到的请求交给ServerStub,由其进行解析和Callee的调用,最终的调用结果原路返回给客户端。

2.HDFS Client的RPC实现

HDFS的RPC实现也是类似上图中的思路:

@startuml
skinparam linetype ortho

skinparam component {
  backgroundColor<<impl>> #AntiqueWhite
}

rectangle ClientImpl {
  component ClientStub实现 <<impl>>{
      [Protocol]
      [RPC]
      [RPCEngine]
      [ProtocolTranslatorPB]
      Protocol -[hidden]r-> RPC
      RPC -[hidden]r-> RPCEngine
      Protocol -[hidden]d-> ProtocolTranslatorPB
  }

  component NetClient实现 <<impl>>{
      [Client]
      [Call]
      [Connection]
      Client -[hidden]r-> Call
      Call -[hidden]r-> Connection
  }

  ClientStub实现 -[hidden]d--> NetClient实现

}


rectangle Standard {
  [Caller]
  [ClientStub]
  [NetClient]
  [NetServer]
  [ServerStub]
  [Server]
  [Caller] -d--> [ClientStub]
  [ClientStub] -d--> [NetClient]
  [NetClient] -r--> [NetServer]
  [NetServer] -u--> [ServerStub]
  [ServerStub] -u--> [Server]

  ClientStub -l-> ClientStub实现
  NetClient -l-> NetClient实现
  NetServer -r-> NetServer实现
  ServerStub -r-> ServerStub实现
}


rectangle ServerImpl {
  component NetServer实现 <<impl>>{
      [Server] as Server1
      [RPC.Server]
      [RpcEngine.Server]
      [ProtocolServerSideTranslatorPB]
  }


  component ServerStub实现 <<impl>>{
      [RpcInvoker]
  }
  ServerStub实现 -[hidden]d- NetServer实现
}

@enduml

下面就针对HDFS中ClientStub和NetClient的实现来进行逐一分析:

2.1 ClientStub实现

上面提到,ClientStub的主要作用是作为代理对象将客户端调用通过NetClient转发到服务端,涉及到4个(类)核心类:Protocol,RPC,RPCEngine,ProtocolTranslatorPB。

2.1.1 Protocol类:

Protocol类主要是为了定义RPC调用的接口,服务端实现这些接口,ClientStub则是根据Protocol来代理请求。HDFS定义了较多的Protcol,比较常见的有:

  • ClientProtocol:用于DFSClient和Namenode交互
  • ClientDatanodeProtocol:用于DFSClient和Datanode交互
  • DataTransferProtocol:用于DFSClient从Datanode读写数据
  • DatanodeProtocol:用于Datanode与Namenode交互
  • NamenodeProtocol:ActiveNamenode与StandByNamenode交互

2.1.2 RPC类:

RPC类是上层逻辑(Caller)进行RPC调用的入口类,HDFS定义的诸多Protocol都需要通过Rpc.getProxy(Protocol, ...)来生成ClientStub,然后才能使用ClientStub实例进行RPC调用。

2.1.3 RpcEngine:

考虑到ClientStub在对RPC请求进行封装和序列化时可能有多种实现,所以HDFS并没有将生成ClientStub和ServerStub的逻辑(getProxy(), getServer())全部放到RPC类中,而是将这一工作封装成RpcEngine接口,然后在RPC类中使用HashMap维护Protocol与RpcEngine实现的映射,下游实现可以按需要注册到HashMap中。这样当客户端调用Rpc.getProxy(Protocol, ...)时就会由对应的RpcEngine负责生成ClientStub。

HDFS默认提供了两种RpcEngine实现:ProtobufRpcEngineWritableRpcEngine,分别用于支持Protobuf和Writable两种序列化方式。下面详细介绍ProtobufRpcEngine的实现(HDFS大部分RPC调用都是通过该引擎实现)。

创建ClientStub

ClientStub的创建过程如下图::

@startuml
    skinparam linetype ortho

skinparam component {
    backgroundColor<<impl>> #AntiqueWhite
}

Caller->RPC: getProtocolProxy(Protocol1)
activate RPC
RPC->RPC: getProtocolEngine()
RPC->ProtobufRpcEngine: getProxy(Protocol1)
activate ProtobufRpcEngine
participant Invoker <<RpcInvocationHandler>>
ProtobufRpcEngine->Invoker: new
activate Invoker
Invoker-->ProtobufRpcEngine: Invoker
deactivate Invoker
ProtobufRpcEngine->"Protocol1$Proxy": Proxy.newProxyInstance(Protocol1, Invoker)
activate "Protocol1$Proxy"
"Protocol1$Proxy" --> ProtobufRpcEngine: Protocol1  
deactivate "Protocol1$Proxy"
ProtobufRpcEngine-->RPC: Protocol1
deactivate ProtobufRpcEngine
RPC --> Caller: Protocol1
deactivate RPC

@enduml

从这里就可以看到RPC.getProtocolEngine()从本地HashMap获取到当前协议的RpcEngine为ProtobufRpcEngine,然后调用ProtobufRpcEngine.getProxy()创建了一个代理类返回给Caller,对于该代理类的所有请求都会由Invoker进行序列化并通过底层NetClient发送到Server端。

ClientStub调用

@startuml

skinparam linetype ortho

skinparam component {
    backgroundColor<<impl>> #AntiqueWhite
}

Caller->"Protocol1$Proxy": Protocol1.function1()
activate "Protocol1$Proxy"
"Protocol1$Proxy"->Invoker: invoke()
activate Invoker
Invoker->RpcRequestWrapper: new()
activate RpcRequestWrapper
RpcRequestWrapper-->Invoker: RpcRequestWrapper
deactivate RpcRequestWrapper
Invoker->Client: call(Writable RpcRequestWrapper)
activate Client
Client-->Invoker: RpcResponseWrapper
deactivate Client
Invoker-->"Protocol1$Proxy": return
deactivate Invoker
"Protocol1$Proxy"-->Caller: return
deactivate "Protocol1$Proxy"

@enduml

从上图可以看出,对RPC请求的序列化和发送操作实际是由Invoker来执行的,Invoker会将调用方法和参数封装成RpcRequestWrapper发送给Server端,同时将服务端返回的RpcResponseWrapper解析成方法的返回类型返回给Caller。

WritableRpcEngine的实现也是类似的,只是将请求和响应的序列化方式切换成了Writable,将RpcRequest(Response)Wrapper换成了内部类Invocation,这里附上类图,就不再详细介绍了

@startuml

'skinparam linetype ortho

interface RpcEngine{
    +getProxy()
    +getServer()
}
class RPC
abstract Server
RPC +-- Server
interface RpcInvocationHandler {
    +ConnectionId getConnectionId()
    +Object invoke(proxy, method, args)
}

rectangle WritableRpcEngine { 
    RpcEngine <|.. WritableRpcEngine
    WritableRpcEngine *-- Invoker
    WritableRpcEngine *-- Invocation
    class "Server" as Server1
    WritableRpcEngine *-- Server1
    Server <|-- Server1
    Server1 *-- WritableRpcInvoker
    RpcInvocationHandler <|.. Invoker
}


rectangle ProtobufRpcEngine { 
    RpcEngine <|.. ProtobufRpcEngine
    class "Server" as Server2
    Server <|-- Server2
    ProtobufRpcEngine *-- Server2
    Server2 *-- ProtobufRpcInvoker
    class "Invoker" as invoker2
    ProtobufRpcEngine *-- invoker2
    ProtobufRpcEngine *-- RpcWrapper
    RpcWrapper <|.. RpcMessageWithHeader
    RpcWrapper <|.. RpcResponseWrapper
    ProtobufRpcEngine *-- RpcResponseWrapper
    RpcMessageWithHeader <|-- RpcRequestWrapper
    ProtobufRpcEngine *-- RpcRequestWrapper
    RpcInvocationHandler <|.. invoker2
}
@enduml

2.1.4 ProtocolTranslatorPB类

Hadoop 0.23.0及其之前的版本只有WritableRpcEngine,该引擎实现较为简单:客户端记录调用代理对象的方法和参数,服务端通过反射直接调用对应方法,但是这种实现意味着客户端和服务端必须使用相同的协议类,无法实现向前向后兼容,也就意味着如果API发生变化:

  • 集群升级,所有客户端都要升级
  • 不支持滚动更新
  • 多集群代理不支持跨版本
  • ...
    更加详细的介绍可以参考这里
    所以新增加的ProtobufRpcEngine就是来解决这个问题。每一个协议都定义一个pb文件来用于传输(例如针对ClientProtocol就定义了ClientNamenodeProtocolPB.class),获取ClientStub代理时也是获取ClientNamenodeProtocolPB的代理对象,然后使用适配器类ClientNamenodeProtocolTranslatorPB包裹代理类来将ClientProtocol调用转换成ClientNamenodeProtocolPB调用,还可以同时协调不同版本之间的兼容性问题,而非像WritableRpcEngine一样直接获取ClientProtocol的代理对象:
@startuml
 
    interface ClientProtocol <<Target>>
    Caller ..> ClientProtocol
    class ClientNamenodeProtocolTranslatorPB
    ClientNamenodeProtocolTranslatorPB <<Adapter>> 
    ClientProtocol <|-- ClientNamenodeProtocolTranslatorPB
    class ClientNamenodeProtocolPB
    ClientNamenodeProtocolPB <<Adaptee>>
    ClientNamenodeProtocolTranslatorPB o-- ClientNamenodeProtocolPB


    class "ClientNamenodeProtocolPB" as c2 <<Target>>
    interface "ClientProtocol" as c1 
    class NameNodeRpcServer <<Adaptee>>
    class ClientNamenodeProtocolServerSideTranslatorPB
    ClientNamenodeProtocolServerSideTranslatorPB <<Adapter>>
    c2 <|-u- ClientNamenodeProtocolServerSideTranslatorPB
    ClientNamenodeProtocolServerSideTranslatorPB o-u- NameNodeRpcServer
    c1 <|.d. NameNodeRpcServer

    ClientNamenodeProtocolTranslatorPB .[hidden]r. ClientNamenodeProtocolServerSideTranslatorPB
    
@enduml

2.2 NetClient实现

NetClient由Client类实现,Client类包含两个核心的内部类:Call和Connection;Call主要是记录此次调用的(id,请求,响应,重试次数等),而Connection则是一个线程类,封装了底层的网络连接(server地址,Socket,In/OutputStream等)用于和Server间的网络通信。

@startuml
    ProtobufRpcEngine.Invoker ->Client: call(RpcRequestWrapper)
    activate Client
    Client ->Call: createCall()
    activate Call
    Call -->Client: Call
    deactivate Call
    Client ->Connection: new()  
    activate Connection
    Connection --> Client: Connection
    Client -> Connection: addCall()
    Client -> Connection: setupIOstreams()
    Connection -> Connection: setupConnection()
    Connection -> Connection: start() Connection线程
    Connection --> Client: 
    Client -> Connection: sendRpcRequest()
    participant sendParamsExecutor <<ExecutorService>>
    Connection ->sendParamsExecutor : submit()
    activate sendParamsExecutor
    sendParamsExecutor -->Connection: connection.out.write()
    deactivate sendParamsExecutor
    Connection -->Client: 
    Client ->Call: wait()
    activate Call
    Connection ->Connection: receiveRpcResponse() Connection线程
    Connection -->Call: setRpcResponse()
    deactivate Connection
    Call -> Call: notify()
    Call --> Client: return
    Client -> Call: getResponse()
    Call -->Client: Writable
    deactivate Call
    Client -->ProtobufRpcEngine.Invoker: Message
    deactivate Client
@enduml

3.总结

以上就是HDFS Client RPC实现的主要内容,对于文件传输,失败重试和Namenode切换在客户端的实现后面会单独开篇介绍。