上一篇介绍了RPC客户端的核心实现,这一篇继续来看服务端的实现。服务端的NetServer由Server类及其子类实现,而ServerStub则由RpcInvoker接口及其实现类来实现。
@startuml
'skinparam linetype ortho
abstract "Server" as ServerTop #Yellow
class RPC
abstract Server {
}
ServerTop <|-d- Server
RPC +-- Server
interface RpcInvoker #Yellow {
call(Server server, String protocol, Writable rpcRequest, long receiveTime)
}
RPC +-- RpcInvoker
class WritableRpcEngine
class ProtobufRpcEngine
class "Server" as Server1
class "Server" as Server2
WritableRpcEngine +-d- Server1
ProtobufRpcEngine +-d- Server2
Server1 .u.|> Server
Server2 .u.|> Server
Server1 +-d- WritableRpcInvoker
Server2 +-d- ProtobufRpcInvoker
WritableRpcInvoker .u.|> RpcInvoker
ProtobufRpcInvoker .u|> RpcInvoker
class WritableRpcInvoker
class ProtobufRpcInvoker
@enduml
1. NetServer实现
Server抽象类包含了HDFS对于NetServer实现的主要内容,其子类RPC.Server以及RpcEngine.Server只是增加了:引擎与协议映射注册,RpcInvoker的实现,RpcInvoker与协议映射注册。
1.1 Reactor模型实现
为了保证服务端性能Server并没有像Client端一样为每一个连接创建一个线程,而是使用Reactor模型来优化性能。
@startuml
'skinparam linetype ortho
'skinparam linetype polyline
rectangle Clients {
cloud Client1
cloud Client2
cloud ClientN
Client1 -[hidden]d-> Client2
Client2 -[hidden]d-> ClientN
}
Clients .[hidden]r.> Server
rectangle Server {
agent Listener
collections Readers
collections Handlers
agent Responder
queue CallQueue
Listener -d-> Readers: 2. 注册readSelector到新SocketChannel
Readers -r-> CallQueue: 4. 请求加入CallQueue
CallQueue -d-> Handlers: 5.消费并处理请求
Handlers -l-> Responder: 6. Handler线程未能发送完响应的连接
}
Responder .[norank]u.> Client1
Responder .[norank]u.> Client2
Responder .[norank]l.> ClientN
Client1 .u.> Listener: 1. acceptSelector.select()
ClientN .u.> Listener
Client2 .u.> Listener
@enduml
如上图所示:
- Listener负责监听新建立的连接,并将连接绑定到一组Reader中的某一个
- 每个Reader通过readSelector监听到读取事件时会进行数据读取,读取到一个完整请求后将请求加入队列中
- Handlers负责从队列中不断消费和处理请求,并将最终结果写入OutputStream。没有写入完成的会注册到Responder。
- Responder负责将监听可写入事件,将Handlers没有写入完成的数据继续写入(典型场景:网络抖动)
这部分的实现是使用Java自带的NIO实现,大家可自行了解。
1.2 Server创建流程
以Namenode为例,Namenode在启动时会创建底层NetServer,启动流程如下:
@startuml
NameNode -> NameNode: createRpcServer()
NameNode ->NameNodeRpcServer: new()
activate NameNodeRpcServer
NameNodeRpcServer -> RPC: setProtocolEngine()
Note over RPC: 1.创建各协议BlockingService
NameNodeRpcServer -> BlockingService: new(ClientNamenodeProtocolServersideTranslatorPB)
Note over RPC, BlockingService: 2.创建clientRpcServer和serviceRpcServer
NameNodeRpcServer ->RPC.Builder: build()
RPC.Builder ->ProtobufRpcEngine: getServer()
ProtobufRpcEngine ->Server: new()
Server ->Listener: new()
Listener ->Reader: new()
Reader -->Listener: Reader
Listener -> Reader: start()
loop doRunLoop()新线程读取请求
Reader -> Reader: doRead()
Reader ->Selector: select() & selectedKeys()
Selector -->Reader: Iterator<SelectionKey>
end
Listener -->Server: Listener
Server ->Handler: new()
Handler -->Server: Handler
Server ->Responder: new()
Responder -->Server: Responder
Server -->ProtobufRpcEngine: Server
ProtobufRpcEngine -->RPC.Builder: Server
RPC.Builder -->NameNodeRpcServer: Server
Note over RPC: 3.注册剩余支持的协议及BlockingService
NameNodeRpcServer -> RPC: setProtocolEngine()
NameNodeRpcServer -> Server: addProtocl()
NameNodeRpcServer -->NameNode: NameNodeRpcServer
deactivate NameNodeRpcServer
@enduml
可以看到NamNodeRpcServer启动了两个Server:clientRpcServer和serviceRpcServer,分别用于处理客户端请求和Datanode请求。
1.3 Server启动:
Server启动流程较为简单,因为大部分实现都是线程类,直接启动线程即可:
@startuml
NameNode -> NameNode: startCommonServices()
NameNode -> NameNodeRpcServer: start()
activate NameNodeRpcServer
Note over Server: 4.启动clientRpcServer serviceRpcServer
NameNodeRpcServer -> Server: start()
activate Server
Server -> Responder: Thread.start()
activate Responder
Responder --> Server: void
deactivate Responder
Server -> Listener: Thread.start()
activate Listener
Listener --> Server: void
deactivate Listener
Server -> Handler: Thread.start()
activate Handler
Handler --> Server: void
deactivate Handler
Server --> NameNodeRpcServer: void
deactivate Server
NameNodeRpcServer --> NameNode
deactivate NameNodeRpcServer
@enduml
1.4 Server请求处理
Listener实例创建时会注册一个acceptSelector到ServerSocketChannel,通过监听该channel的accept事件来分配连接到Reader:
@startuml
loop 新线程通过acceptSelector获取新io事件
Listener -> Selector: select() && selectedKeys()
activate Selector
Selector --> Listener: SelectionKey
deactivate Selector
Listener -> Listener: doAccept()
Listener -> Reader: 轮询获取一个Reader,addConnection()加入其pendingConnections
end
@enduml
分配到Reader的channel会加入pendingConnections,reader会不断读取该队列并注册readSelector到该队列,监听到OP_READ事件后读取数据,读取到一个完整请求后将其加入队列
@startuml
loop 新线程通过readSelector获取新io事件
Reader -> Channel: register()将readSelector注册到pendingConnections中的连接
activate Channel
Channel --> Reader: return
deactivate Channel
Reader -> Selector: select() && selectedKeys()
activate Selector
Selector --> Reader: SelectionKey
deactivate Reader
deactivate Selector
Reader -> Reader: doRead()
Reader -> Call: new(RpcRequestHeader, RpcRequestWrapper)
activate Call
Call --> Reader: Call
deactivate Call
Reader -> CallQueueManager: put(Call)
activate CallQueueManager
CallQueueManager --> Reader: return
deactivate CallQueueManager
end
@enduml
Handler从队列中取出请求进行处理,请求处理完后写入响应,但如果响应数据较大或者当前网络状况不佳则可能并不能完全发送完成,所以这里引入了Responder,Handler会将writeSelector注册到未能完全写入的channel,Responder线程则会根据writeSelector筛选出当前可写的channel继续写入剩余的响应数据
@startuml
loop 新线程通过CallQueueManager获取新Call
Handler -> CallQueueManager: take()
activate CallQueueManager
CallQueueManager --> Handler: Call
deactivate CallQueueManager
Handler -> RPC.Server: call(Call)
activate RPC.Server
RPC.Server -> Server: getRpcInvoker()
activate Server
Server --> RPC.Server: ProtobufRpcInvoker
RPC.Server -> ProtobufRpcInvoker: call()
activate ProtobufRpcInvoker
ProtobufRpcInvoker -> RPC.Server: getProtocolImpl()
activate RPC.Server
RPC.Server --> ProtobufRpcInvoker: BlockingService
deactivate RPC.Server
ProtobufRpcInvoker -> BlockingService: callBlockingMethod()
activate BlockingService
BlockingService --> ProtobufRpcInvoker: Message
deactivate BlockingService
ProtobufRpcInvoker --> RPC.Server: Message
deactivate ProtobufRpcInvoker
RPC.Server --> Handler: RpcResponseWrapper(Message)
deactivate RPC.Server
Handler -> Server: setUpResponse(RpcResponseWrapper)
Server -> Responder: doRespond(Call)
activate Responder
Responder --> Server: return
deactivate Responder
Server --> Handler: return
deactivate Server
end
group Responder Thread Loop
Responder -> SocketChannel: write
activate SocketChannel
SocketChannel --> Responder: return
deactivate SocketChannel
end
@enduml
2. ServerStub实现
ServerStub主要作用在于解析ClientStub发送的RPC请求并调用本地提供的服务。HDFS中ServerStub的角色由RpcInvoker的实现类来实现,RpcInvoker会在收到请求时按照protocol查找对应的实现类(即Server实现),然后将请求进行解析并调用实现类,并返回调用结果。
另外ProtobufRpcEngine在服务端和在客户端一样需要一个ClientNamenodeProtocolTranslatorPB作为适配器将底层ClientNamenodeProtocolPB转换成对ClientProtocol实现类的调用,这里就不再赘述。
3. 总结
了解了前两篇的内容后,客户端和服务端RPC通信链路可以完整的串联起来了:
@startuml
agent DFSClient <<Caller>>
rectangle "<<ClientStub>>" as clientStub {
agent ClientNamenodeProtocolTranslatorPB
agent "ClientProtocol$Proxy" as proxy
ClientNamenodeProtocolTranslatorPB -d-> proxy: ClientNamenodeProtocolPB
}
DFSClient -d-> ClientNamenodeProtocolTranslatorPB: ClientProtocol
agent Client <<NetClient>>
proxy -d-> Client: Writable \n (ProtonbufRpcEngine使用其子类RpcRequestWrapper,\n WritableRpcEngine使用其子类Invocation)
agent Server <<NetServer>>
Client -r-> Server: RpcRequestHeaderProto + Writable
rectangle "<<ServerStub>>" {
agent RpcInvoker
agent ClientNamenodeProtocolServerSideTranslatorPB
RpcInvoker -u-> ClientNamenodeProtocolServerSideTranslatorPB: ClientNamenodeProtocolPB
}
Server -u-> RpcInvoker: Writable
agent NamenodeRpcServer <<Callee>>
ClientNamenodeProtocolServerSideTranslatorPB -u-> NamenodeRpcServer: ClientProtocol
@enduml