1.RPC常规实现
RPC实现通常包含下图几个部分:
@startuml
[Caller]
[ClientStub]
[NetClient]
[NetServer]
[ServerStub]
[Callee]
[Caller] --> [ClientStub]
[ClientStub] --> [NetClient]
[NetClient] -right-> [NetServer]
[NetServer] -up-> [ServerStub]
[ServerStub] -up-> [Callee]
@enduml
Caller想要调用Callee提供的方法,可以通过直接调用ClientStub的同名方法来实现。ClientStub提供了Callee在本地的代理对象,ClientStub中的方法在被调用后会将请求参数进行封装和序列化,最终通过底层NetClient发送到服务端。
服务端则由NetServer来负责监听网络请求,并将收到的请求交给ServerStub,由其进行解析和Callee的调用,最终的调用结果原路返回给客户端。
2.HDFS Client的RPC实现
HDFS的RPC实现也是类似上图中的思路:
@startuml
skinparam linetype ortho
skinparam component {
backgroundColor<<impl>> #AntiqueWhite
}
rectangle ClientImpl {
component ClientStub实现 <<impl>>{
[Protocol]
[RPC]
[RPCEngine]
[ProtocolTranslatorPB]
Protocol -[hidden]r-> RPC
RPC -[hidden]r-> RPCEngine
Protocol -[hidden]d-> ProtocolTranslatorPB
}
component NetClient实现 <<impl>>{
[Client]
[Call]
[Connection]
Client -[hidden]r-> Call
Call -[hidden]r-> Connection
}
ClientStub实现 -[hidden]d--> NetClient实现
}
rectangle Standard {
[Caller]
[ClientStub]
[NetClient]
[NetServer]
[ServerStub]
[Server]
[Caller] -d--> [ClientStub]
[ClientStub] -d--> [NetClient]
[NetClient] -r--> [NetServer]
[NetServer] -u--> [ServerStub]
[ServerStub] -u--> [Server]
ClientStub -l-> ClientStub实现
NetClient -l-> NetClient实现
NetServer -r-> NetServer实现
ServerStub -r-> ServerStub实现
}
rectangle ServerImpl {
component NetServer实现 <<impl>>{
[Server] as Server1
[RPC.Server]
[RpcEngine.Server]
[ProtocolServerSideTranslatorPB]
}
component ServerStub实现 <<impl>>{
[RpcInvoker]
}
ServerStub实现 -[hidden]d- NetServer实现
}
@enduml
下面就针对HDFS中ClientStub和NetClient的实现来进行逐一分析:
2.1 ClientStub实现
上面提到,ClientStub的主要作用是作为代理对象将客户端调用通过NetClient转发到服务端,涉及到4个(类)核心类:Protocol,RPC,RPCEngine,ProtocolTranslatorPB。
2.1.1 Protocol类:
Protocol类主要是为了定义RPC调用的接口,服务端实现这些接口,ClientStub则是根据Protocol来代理请求。HDFS定义了较多的Protcol,比较常见的有:
- ClientProtocol:用于DFSClient和Namenode交互
- ClientDatanodeProtocol:用于DFSClient和Datanode交互
- DataTransferProtocol:用于DFSClient从Datanode读写数据
- DatanodeProtocol:用于Datanode与Namenode交互
- NamenodeProtocol:ActiveNamenode与StandByNamenode交互
2.1.2 RPC类:
RPC类是上层逻辑(Caller)进行RPC调用的入口类,HDFS定义的诸多Protocol都需要通过Rpc.getProxy(Protocol, ...)
来生成ClientStub,然后才能使用ClientStub实例进行RPC调用。
2.1.3 RpcEngine:
考虑到ClientStub在对RPC请求进行封装和序列化时可能有多种实现,所以HDFS并没有将生成ClientStub和ServerStub的逻辑(getProxy(), getServer())全部放到RPC类中,而是将这一工作封装成RpcEngine接口,然后在RPC类中使用HashMap维护Protocol与RpcEngine实现的映射,下游实现可以按需要注册到HashMap中。这样当客户端调用Rpc.getProxy(Protocol, ...)
时就会由对应的RpcEngine负责生成ClientStub。
HDFS默认提供了两种RpcEngine实现:ProtobufRpcEngine
和WritableRpcEngine
,分别用于支持Protobuf和Writable两种序列化方式。下面详细介绍ProtobufRpcEngine的实现(HDFS大部分RPC调用都是通过该引擎实现)。
创建ClientStub
ClientStub的创建过程如下图::
@startuml
skinparam linetype ortho
skinparam component {
backgroundColor<<impl>> #AntiqueWhite
}
Caller->RPC: getProtocolProxy(Protocol1)
activate RPC
RPC->RPC: getProtocolEngine()
RPC->ProtobufRpcEngine: getProxy(Protocol1)
activate ProtobufRpcEngine
participant Invoker <<RpcInvocationHandler>>
ProtobufRpcEngine->Invoker: new
activate Invoker
Invoker-->ProtobufRpcEngine: Invoker
deactivate Invoker
ProtobufRpcEngine->"Protocol1$Proxy": Proxy.newProxyInstance(Protocol1, Invoker)
activate "Protocol1$Proxy"
"Protocol1$Proxy" --> ProtobufRpcEngine: Protocol1
deactivate "Protocol1$Proxy"
ProtobufRpcEngine-->RPC: Protocol1
deactivate ProtobufRpcEngine
RPC --> Caller: Protocol1
deactivate RPC
@enduml
从这里就可以看到RPC.getProtocolEngine()从本地HashMap获取到当前协议的RpcEngine为ProtobufRpcEngine,然后调用ProtobufRpcEngine.getProxy()创建了一个代理类返回给Caller,对于该代理类的所有请求都会由Invoker进行序列化并通过底层NetClient发送到Server端。
ClientStub调用
@startuml
skinparam linetype ortho
skinparam component {
backgroundColor<<impl>> #AntiqueWhite
}
Caller->"Protocol1$Proxy": Protocol1.function1()
activate "Protocol1$Proxy"
"Protocol1$Proxy"->Invoker: invoke()
activate Invoker
Invoker->RpcRequestWrapper: new()
activate RpcRequestWrapper
RpcRequestWrapper-->Invoker: RpcRequestWrapper
deactivate RpcRequestWrapper
Invoker->Client: call(Writable RpcRequestWrapper)
activate Client
Client-->Invoker: RpcResponseWrapper
deactivate Client
Invoker-->"Protocol1$Proxy": return
deactivate Invoker
"Protocol1$Proxy"-->Caller: return
deactivate "Protocol1$Proxy"
@enduml
从上图可以看出,对RPC请求的序列化和发送操作实际是由Invoker来执行的,Invoker会将调用方法和参数封装成RpcRequestWrapper发送给Server端,同时将服务端返回的RpcResponseWrapper解析成方法的返回类型返回给Caller。
WritableRpcEngine的实现也是类似的,只是将请求和响应的序列化方式切换成了Writable,将RpcRequest(Response)Wrapper换成了内部类Invocation,这里附上类图,就不再详细介绍了
@startuml
'skinparam linetype ortho
interface RpcEngine{
+getProxy()
+getServer()
}
class RPC
abstract Server
RPC +-- Server
interface RpcInvocationHandler {
+ConnectionId getConnectionId()
+Object invoke(proxy, method, args)
}
rectangle WritableRpcEngine {
RpcEngine <|.. WritableRpcEngine
WritableRpcEngine *-- Invoker
WritableRpcEngine *-- Invocation
class "Server" as Server1
WritableRpcEngine *-- Server1
Server <|-- Server1
Server1 *-- WritableRpcInvoker
RpcInvocationHandler <|.. Invoker
}
rectangle ProtobufRpcEngine {
RpcEngine <|.. ProtobufRpcEngine
class "Server" as Server2
Server <|-- Server2
ProtobufRpcEngine *-- Server2
Server2 *-- ProtobufRpcInvoker
class "Invoker" as invoker2
ProtobufRpcEngine *-- invoker2
ProtobufRpcEngine *-- RpcWrapper
RpcWrapper <|.. RpcMessageWithHeader
RpcWrapper <|.. RpcResponseWrapper
ProtobufRpcEngine *-- RpcResponseWrapper
RpcMessageWithHeader <|-- RpcRequestWrapper
ProtobufRpcEngine *-- RpcRequestWrapper
RpcInvocationHandler <|.. invoker2
}
@enduml
2.1.4 ProtocolTranslatorPB类
Hadoop 0.23.0及其之前的版本只有WritableRpcEngine,该引擎实现较为简单:客户端记录调用代理对象的方法和参数,服务端通过反射直接调用对应方法,但是这种实现意味着客户端和服务端必须使用相同的协议类,无法实现向前向后兼容,也就意味着如果API发生变化:
- 集群升级,所有客户端都要升级
- 不支持滚动更新
- 多集群代理不支持跨版本
- ...
更加详细的介绍可以参考这里
所以新增加的ProtobufRpcEngine就是来解决这个问题。每一个协议都定义一个pb文件来用于传输(例如针对ClientProtocol就定义了ClientNamenodeProtocolPB.class),获取ClientStub代理时也是获取ClientNamenodeProtocolPB的代理对象,然后使用适配器类ClientNamenodeProtocolTranslatorPB包裹代理类来将ClientProtocol调用转换成ClientNamenodeProtocolPB调用,还可以同时协调不同版本之间的兼容性问题,而非像WritableRpcEngine一样直接获取ClientProtocol的代理对象:
@startuml
interface ClientProtocol <<Target>>
Caller ..> ClientProtocol
class ClientNamenodeProtocolTranslatorPB
ClientNamenodeProtocolTranslatorPB <<Adapter>>
ClientProtocol <|-- ClientNamenodeProtocolTranslatorPB
class ClientNamenodeProtocolPB
ClientNamenodeProtocolPB <<Adaptee>>
ClientNamenodeProtocolTranslatorPB o-- ClientNamenodeProtocolPB
class "ClientNamenodeProtocolPB" as c2 <<Target>>
interface "ClientProtocol" as c1
class NameNodeRpcServer <<Adaptee>>
class ClientNamenodeProtocolServerSideTranslatorPB
ClientNamenodeProtocolServerSideTranslatorPB <<Adapter>>
c2 <|-u- ClientNamenodeProtocolServerSideTranslatorPB
ClientNamenodeProtocolServerSideTranslatorPB o-u- NameNodeRpcServer
c1 <|.d. NameNodeRpcServer
ClientNamenodeProtocolTranslatorPB .[hidden]r. ClientNamenodeProtocolServerSideTranslatorPB
@enduml
2.2 NetClient实现
NetClient由Client类实现,Client类包含两个核心的内部类:Call和Connection;Call主要是记录此次调用的(id,请求,响应,重试次数等),而Connection则是一个线程类,封装了底层的网络连接(server地址,Socket,In/OutputStream等)用于和Server间的网络通信。
@startuml
ProtobufRpcEngine.Invoker ->Client: call(RpcRequestWrapper)
activate Client
Client ->Call: createCall()
activate Call
Call -->Client: Call
deactivate Call
Client ->Connection: new()
activate Connection
Connection --> Client: Connection
Client -> Connection: addCall()
Client -> Connection: setupIOstreams()
Connection -> Connection: setupConnection()
Connection -> Connection: start() Connection线程
Connection --> Client:
Client -> Connection: sendRpcRequest()
participant sendParamsExecutor <<ExecutorService>>
Connection ->sendParamsExecutor : submit()
activate sendParamsExecutor
sendParamsExecutor -->Connection: connection.out.write()
deactivate sendParamsExecutor
Connection -->Client:
Client ->Call: wait()
activate Call
Connection ->Connection: receiveRpcResponse() Connection线程
Connection -->Call: setRpcResponse()
deactivate Connection
Call -> Call: notify()
Call --> Client: return
Client -> Call: getResponse()
Call -->Client: Writable
deactivate Call
Client -->ProtobufRpcEngine.Invoker: Message
deactivate Client
@enduml
3.总结
以上就是HDFS Client RPC实现的主要内容,对于文件传输,失败重试和Namenode切换在客户端的实现后面会单独开篇介绍。