HDFS Study Notes (2): RPC Server-Side Implementation

The previous post covered the core implementation of the RPC client; this one turns to the server. On the server side, the NetServer role is implemented by the Server class and its subclasses, while the ServerStub role is implemented by the RpcInvoker interface and its implementing classes.

@startuml
    'skinparam linetype ortho
    abstract "Server" as ServerTop #Yellow
    class RPC
    abstract Server {

    }
    ServerTop <|-d- Server
    RPC +-- Server
    interface RpcInvoker #Yellow {
        call(Server server, String protocol, Writable rpcRequest, long receiveTime)
    }
    RPC +-- RpcInvoker
    class WritableRpcEngine
    class ProtobufRpcEngine


    class "Server" as Server1 
    class "Server" as Server2

    WritableRpcEngine +-d- Server1
    ProtobufRpcEngine +-d- Server2

    Server1 .u.|> Server
    Server2 .u.|> Server
    Server1 +-d- WritableRpcInvoker
    Server2 +-d- ProtobufRpcInvoker
    WritableRpcInvoker .u.|> RpcInvoker
    ProtobufRpcInvoker .u.|> RpcInvoker

    class WritableRpcInvoker
    class ProtobufRpcInvoker
@enduml

1. NetServer implementation

The abstract Server class contains the bulk of HDFS's NetServer implementation; its subclasses RPC.Server and RpcEngine.Server only add the engine-to-protocol registration, the RpcInvoker implementations, and the RpcInvoker-to-protocol registration.

1.1 Reactor model

To keep the server performant, Server does not create one thread per connection the way the client does; instead it uses the Reactor pattern.

@startuml
    'skinparam linetype ortho
    'skinparam linetype polyline
    rectangle Clients {
        cloud Client1        
        cloud Client2        
        cloud ClientN
        Client1 -[hidden]d-> Client2        
        Client2 -[hidden]d-> ClientN        
    }
    Clients .[hidden]r.> Server

    rectangle Server {
        agent Listener
        collections Readers 
        collections Handlers
        agent Responder
        queue CallQueue
        Listener -d-> Readers: 2. register the new SocketChannel with a readSelector
        Readers -r-> CallQueue: 4. enqueue the request into CallQueue
        CallQueue -d-> Handlers: 5. consume and handle requests
        Handlers -l-> Responder: 6. connections whose responses the Handler could not fully send
    }

    Responder .[norank]u.> Client1
    Responder .[norank]u.> Client2
    Responder .[norank]l.> ClientN

    Client1 .u.> Listener: 1. acceptSelector.select()
    ClientN .u.> Listener
    Client2 .u.> Listener
    
@enduml

As shown above:

  • The Listener accepts newly established connections and binds each one to one of a pool of Readers
  • When a Reader's readSelector reports a read event, the Reader reads from the channel; once a complete request has been read, it is added to the call queue
  • Handlers continuously consume requests from the queue, process them, and write the result to the OutputStream; responses that cannot be fully written are registered with the Responder
  • The Responder listens for writable events and continues writing the data the Handlers could not finish (a typical trigger: network jitter)

This part is built on Java's standard NIO; see the JDK documentation for details.
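The cooperation between the acceptSelector and a readSelector can be condensed into a single-threaded sketch. This is hypothetical demo code, not Hadoop's (Hadoop splits the roles across Listener, Reader, Handler, and Responder threads); it only shows the core NIO mechanics: register OP_ACCEPT, and on accept register the new channel for OP_READ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Single-threaded condensation of the reactor roles: one selector handles
// both accept events (the Listener's job) and read events (a Reader's job).
public class MiniReactor {

    // Accept one connection, read one request from it, and return the bytes
    // as a string. A helper client thread sends "hello" to the server.
    public static String receiveOne() throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        server.configureBlocking(false);
        Selector selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);   // Listener role
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        // Client: connect, send one request, close.
        new Thread(() -> {
            try (SocketChannel ch = SocketChannel.open(
                    new InetSocketAddress("127.0.0.1", port))) {
                ch.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            } catch (IOException ignored) { }
        }).start();

        ByteBuffer buf = ByteBuffer.allocate(64);
        while (true) {
            selector.select();                               // block for events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                    // new connection:
                    SocketChannel conn = server.accept();    // hand it off for
                    if (conn == null) continue;              // reading
                    conn.configureBlocking(false);
                    conn.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {               // Reader role
                    SocketChannel conn = (SocketChannel) key.channel();
                    int n = conn.read(buf);
                    if (n < 0) { key.cancel(); conn.close(); continue; }
                    if (n == 0) continue;
                    buf.flip();
                    byte[] data = new byte[buf.remaining()];
                    buf.get(data);
                    conn.close(); server.close(); selector.close();
                    return new String(data, StandardCharsets.UTF_8);
                }
            }
        }
    }
}
```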

1.2 Server creation flow

Taking the Namenode as an example, the Namenode creates the underlying NetServer at startup. The creation flow is as follows:

@startuml

NameNode -> NameNode: createRpcServer()
NameNode ->NameNodeRpcServer: new()
activate NameNodeRpcServer
NameNodeRpcServer -> RPC: setProtocolEngine()
Note over RPC: 1. create a BlockingService for each protocol
NameNodeRpcServer -> BlockingService: new(ClientNamenodeProtocolServerSideTranslatorPB)
Note over RPC, BlockingService: 2. create clientRpcServer and serviceRpcServer
NameNodeRpcServer ->RPC.Builder: build()
RPC.Builder ->ProtobufRpcEngine: getServer()

ProtobufRpcEngine ->Server: new()
Server ->Listener: new()
Listener ->Reader: new()
Reader -->Listener: Reader
Listener -> Reader: start()
loop doRunLoop() reads requests on a new thread
    Reader -> Reader: doRead()
    Reader ->Selector: select() & selectedKeys()
    Selector -->Reader: Iterator<SelectionKey>
end
Listener -->Server: Listener
Server ->Handler: new()
Handler -->Server: Handler 
Server ->Responder: new()
Responder -->Server: Responder
Server -->ProtobufRpcEngine: Server
ProtobufRpcEngine -->RPC.Builder: Server
RPC.Builder -->NameNodeRpcServer: Server
Note over RPC: 3. register the remaining supported protocols and their BlockingServices
NameNodeRpcServer -> RPC: setProtocolEngine()
NameNodeRpcServer -> Server: addProtocol()
NameNodeRpcServer -->NameNode: NameNodeRpcServer
deactivate NameNodeRpcServer

@enduml

As the diagram shows, NameNodeRpcServer creates two Servers, clientRpcServer and serviceRpcServer, which handle client requests and Datanode requests respectively.

1.3 Server startup

The startup flow is simple: most of the components are thread classes, so starting the Server is just a matter of starting their threads:

@startuml

NameNode -> NameNode: startCommonServices()
NameNode -> NameNodeRpcServer: start()
activate NameNodeRpcServer
Note over Server: 4. start clientRpcServer and serviceRpcServer
NameNodeRpcServer -> Server: start()
activate Server
Server -> Responder: Thread.start()
activate Responder
Responder --> Server: void
deactivate Responder
Server -> Listener: Thread.start()
activate Listener
Listener --> Server: void
deactivate Listener
Server -> Handler: Thread.start()
activate Handler
Handler --> Server: void
deactivate Handler
Server --> NameNodeRpcServer: void
deactivate Server
NameNodeRpcServer --> NameNode
deactivate NameNodeRpcServer
@enduml

1.4 Request handling

When a Listener instance is created, it registers the ServerSocketChannel with its acceptSelector and listens for accept events on that channel in order to hand new connections off to a Reader:

@startuml

    loop new thread polls acceptSelector for new I/O events
        Listener -> Selector: select() && selectedKeys()
        activate Selector
        Selector --> Listener: SelectionKey
        deactivate Selector
        Listener -> Listener: doAccept()
        Listener -> Reader: pick a Reader round-robin; addConnection() adds the channel to its pendingConnections
    end
@enduml

A channel handed to a Reader is added to that Reader's pendingConnections. The Reader continuously drains this queue, registering each pending channel with its readSelector; when an OP_READ event fires it reads data from the channel, and once a complete request has been read it is put on the call queue:

@startuml

    loop new thread polls readSelector for new I/O events
        Reader -> Channel: register() the connections in pendingConnections with readSelector
        activate Channel
        Channel --> Reader: return
        deactivate Channel
        Reader -> Selector: select() && selectedKeys()
        activate Selector
        Selector --> Reader: SelectionKey
        deactivate Selector
        Reader -> Reader: doRead()
        Reader -> Call: new(RpcRequestHeader, RpcRequestWrapper)
        activate Call
        Call --> Reader: Call
        deactivate Call
        Reader -> CallQueueManager: put(Call)
        activate CallQueueManager
        CallQueueManager --> Reader: return
        deactivate CallQueueManager
    end
@enduml
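The addConnection()/doRunLoop() interplay (Listener enqueues and wakes the selector; Reader drains and registers before each select()) can be sketched with a pending-connections queue. This is hypothetical demo code: a Pipe stands in for an accepted SocketChannel, and names such as pending and readViaPendingQueue are invented for the illustration.

```java
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.*;

// Sketch of the Reader loop: drain pendingConnections and register each
// channel for OP_READ, then select() and read whatever arrives.
public class ReaderQueueSketch {
    public static String readViaPendingQueue() throws Exception {
        BlockingQueue<SelectableChannel> pending = new LinkedBlockingQueue<>();
        Selector readSelector = Selector.open();
        CompletableFuture<String> result = new CompletableFuture<>();

        Thread reader = new Thread(() -> {
            ByteBuffer buf = ByteBuffer.allocate(64);
            try {
                while (!result.isDone()) {
                    // Drain pendingConnections: register each queued channel.
                    SelectableChannel ch;
                    while ((ch = pending.poll()) != null) {
                        ch.configureBlocking(false);
                        ch.register(readSelector, SelectionKey.OP_READ);
                    }
                    readSelector.select();          // wait for data or wakeup()
                    Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next(); it.remove();
                        int n = ((ReadableByteChannel) key.channel()).read(buf);
                        if (n > 0) {
                            buf.flip();
                            byte[] data = new byte[buf.remaining()];
                            buf.get(data);
                            result.complete(new String(data, StandardCharsets.UTF_8));
                        }
                    }
                }
            } catch (Exception e) { result.completeExceptionally(e); }
        });
        reader.start();

        Pipe pipe = Pipe.open();                    // stands in for a new socket
        pending.add(pipe.source());                 // Listener: addConnection()
        readSelector.wakeup();                      // let the Reader re-drain
        pipe.sink().write(ByteBuffer.wrap("request".getBytes(StandardCharsets.UTF_8)));

        String s = result.get(10, TimeUnit.SECONDS);
        reader.join();                              // loop exits once result is done
        readSelector.close(); pipe.sink().close(); pipe.source().close();
        return s;
    }
}
```

The wakeup() call matters: without it the Reader could stay blocked in select() and never see the newly queued channel, which is why the real Reader wakes its selector when a connection is added.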

The Handler takes requests off the queue and processes them, then writes the response. If the response is large or the network is congested, the write may not complete in one pass. This is where the Responder comes in: the Handler registers any channel with an unfinished write on the writeSelector, and the Responder thread uses that selector to find channels that are currently writable and finish sending the remaining response data:

@startuml
    loop new thread takes Calls from CallQueueManager
        Handler -> CallQueueManager: take()
        activate CallQueueManager
        CallQueueManager --> Handler: Call
        deactivate CallQueueManager
        Handler -> RPC.Server: call(Call)
        activate RPC.Server
        RPC.Server -> Server: getRpcInvoker()
        activate Server
        Server --> RPC.Server: ProtobufRpcInvoker
        RPC.Server -> ProtobufRpcInvoker: call()
        activate ProtobufRpcInvoker
        ProtobufRpcInvoker -> RPC.Server: getProtocolImpl()
        activate RPC.Server
        RPC.Server --> ProtobufRpcInvoker: BlockingService
        deactivate RPC.Server
        ProtobufRpcInvoker -> BlockingService: callBlockingMethod()
        activate BlockingService
        BlockingService --> ProtobufRpcInvoker: Message
        deactivate BlockingService
        ProtobufRpcInvoker --> RPC.Server: Message
        deactivate ProtobufRpcInvoker
        RPC.Server --> Handler: RpcResponseWrapper(Message)
        deactivate RPC.Server
        Handler -> Server: setUpResponse(RpcResponseWrapper)
        Server -> Responder: doRespond(Call)
        activate Responder
        Responder --> Server: return
        deactivate Responder
        Server --> Handler: return
        deactivate Server
    end
        group Responder Thread Loop
            Responder -> SocketChannel: write
            activate SocketChannel
            SocketChannel --> Responder: return
            deactivate SocketChannel 
        end 
@enduml
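The partial-write handoff can be reproduced with a Pipe whose buffer is much smaller than the response: the handler's first write() pushes only part of the data, and a responder-style loop driven by a writeSelector finishes the job as the peer drains the channel. This is an illustrative sketch under those assumptions, not Hadoop's actual Responder.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicInteger;

// Handler/Responder split in miniature: the handler writes once; whatever
// does not fit is finished by a write-selector loop (the Responder role).
public class ResponderSketch {

    // Pushes a 1 MB "response" through a Pipe (stand-in for a socket whose
    // send buffer is full) and returns the number of bytes the peer received.
    public static int drainLargeResponse() throws Exception {
        Pipe pipe = Pipe.open();
        Pipe.SinkChannel sink = pipe.sink();
        Pipe.SourceChannel source = pipe.source();
        sink.configureBlocking(false);

        final int total = 1 << 20;
        ByteBuffer response = ByteBuffer.allocate(total);

        // Handler role: one non-blocking write; typically only part fits.
        sink.write(response);

        // Peer (the RPC client) draining the channel on its own thread.
        AtomicInteger received = new AtomicInteger();
        Thread peer = new Thread(() -> {
            ByteBuffer buf = ByteBuffer.allocate(8192);
            try {
                int n;
                while ((n = source.read(buf)) >= 0) {
                    received.addAndGet(n);
                    buf.clear();
                }
            } catch (IOException ignored) { }
        });
        peer.start();

        // Responder role: wait for OP_WRITE and keep writing the remainder.
        if (response.hasRemaining()) {
            Selector writeSelector = Selector.open();
            sink.register(writeSelector, SelectionKey.OP_WRITE);
            while (response.hasRemaining()) {
                writeSelector.select();
                writeSelector.selectedKeys().clear();
                sink.write(response);
            }
            writeSelector.close();
        }
        sink.close();          // peer sees end-of-stream and stops
        peer.join();
        source.close();
        return received.get();
    }
}
```

The design point this demonstrates is why the Handler does not simply loop on write() itself: that would pin a Handler thread on one slow client, whereas parking the channel on a writeSelector lets a single Responder thread serve all slow writers.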

2. ServerStub implementation

The ServerStub's job is to parse the RPC request sent by the ClientStub and invoke the locally provided service. In HDFS this role is played by the RpcInvoker implementations: when a request arrives, the RpcInvoker looks up the implementation registered for the request's protocol (i.e. the server-side implementation class), parses the request, invokes the implementation, and returns the result of the call.
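A toy version of this lookup-and-dispatch step: the server keeps a protocol-to-implementation registry (what addProtocol() populates in the real code), and the invoker resolves the implementation by protocol name and calls the requested method reflectively. The names MiniInvoker and EchoProtocol are invented for illustration.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Toy ServerStub: a protocol registry plus a reflective call() that mirrors
// what RpcInvoker.call() does after decoding the request header.
public class MiniInvoker {
    public interface EchoProtocol { String echo(String msg); }

    private final Map<String, Class<?>> protocols = new HashMap<>();
    private final Map<String, Object> impls = new HashMap<>();

    // Counterpart of Server.addProtocol(): register protocol -> implementation.
    public void addProtocol(Class<?> protocol, Object impl) {
        protocols.put(protocol.getSimpleName(), protocol);
        impls.put(protocol.getSimpleName(), impl);
    }

    // Counterpart of RpcInvoker.call(): resolve the implementation registered
    // for the request's protocol, then invoke the requested method on it.
    public Object call(String protocol, String method, Object... args) throws Exception {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Method m = protocols.get(protocol).getMethod(method, types);
        return m.invoke(impls.get(protocol), args);
    }
}
```

In the real RPC.Server the registry key also includes the protocol version, and ProtobufRpcInvoker dispatches through the generated BlockingService's callBlockingMethod() rather than raw reflection, as the earlier sequence diagram shows.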

Also, just as on the client side, ProtobufRpcEngine on the server needs an adapter, here ClientNamenodeProtocolServerSideTranslatorPB, to translate the low-level ClientNamenodeProtocolPB calls into calls on the ClientProtocol implementation; this mirrors the client side, so it is not repeated here.

3. Summary

With the previous two posts covered, the client and server halves of the RPC communication path can now be connected end to end:

@startuml

agent DFSClient <<Caller>>

rectangle "<<ClientStub>>" as clientStub {
    agent ClientNamenodeProtocolTranslatorPB
    agent "ClientProtocol$Proxy" as proxy 

    ClientNamenodeProtocolTranslatorPB -d-> proxy: ClientNamenodeProtocolPB
}


DFSClient -d-> ClientNamenodeProtocolTranslatorPB: ClientProtocol

agent Client <<NetClient>>
proxy -d-> Client: Writable \n (ProtobufRpcEngine uses its subclass RpcRequestWrapper,\n WritableRpcEngine uses its subclass Invocation)

agent Server <<NetServer>>

Client -r-> Server: RpcRequestHeaderProto + Writable
rectangle "<<ServerStub>>" {
    agent RpcInvoker
    agent ClientNamenodeProtocolServerSideTranslatorPB
    RpcInvoker -u-> ClientNamenodeProtocolServerSideTranslatorPB: ClientNamenodeProtocolPB
}
Server -u-> RpcInvoker: Writable
agent NamenodeRpcServer <<Callee>>
ClientNamenodeProtocolServerSideTranslatorPB -u-> NamenodeRpcServer: ClientProtocol
@enduml