Spark源码笔记之-RPC框架

1. 核心抽象

Spark是一个分布式计算框架,job的执行过程一定会涉跨节点的方法调用(心跳,任务分发,block管理等),Spark RPC的实现可大致归纳为以下3个接口(抽象类):

  • RpcEnv:接收来自RpcEndpointRef发送的数据,并将数据转发至对应的RpcEndpoint,是整个RPC框架的协调中心
  • RpcEndpoint:定义了从连接建立到收到消息时所需要触发功能的接口。RpcEndpoint启动时需要注册到RpcEnv,用于对外暴露服务
  • RpcEndpointRef:对一个远程RpcEndpoint的引用,用于调用远程服务

2.具体实现

Spark通过继承和实现上述核心抽象,并采用Netty作为数据传输框架,实现了其内部的RPC通信。

2.1 NettyRpcEndpointRef

NettyRpcEndpointRef实现了一系列RpcEndpointRef中定义的调用远程RpcEndpoint的方法(例如:ask, send等),其具体实现较为简单,都是调用NettyRpcEnv的对应方法。
另外NettyRpcEndpointRef在RpcEndpoint所在节点和在其他节点的作用也不同:

  • 在RpcEndpoint所在节点NettyRpcEndpointRef只是对RpcEndpointAddress的简单封装。
  • 在其他节点上,NettyRpcEndpointRef则还保存了底层TransportClient的引用,避免重新创建连接。(是谁将NettyRpcEndpointRef对象序列化后发送到客户端的?

2.2 NettyRpcEnv

为了实现RpcEnv中所定义的方法,NettyRpcEnv引入了几个新的组件:Inbox、Outbox、Dispatcher、RequestMessage,(当然还有其他的一些类,由于涉及底层和Netty的交互,所以这里不再赘述)下面来进行逐一分析。

RequestMessage

private[netty] class RequestMessage(
    val senderAddress: RpcAddress,
    val receiver: NettyRpcEndpointRef,
    val content: Any) {
	...
}

RequestMessage是一个消息封装的类,将发送者的地址、接收者的引用、消息内容进行了封装,从NettyRpcEndpointRef发送至RpcEnv的消息都会被封装成此类

Outbox

Outbox的作用可以按其字面意思理解,即:类似于发件箱,NettyRpcEnv为每一个地址创建一个Outbox。NettyRpcEndpointRef调用send/ask发送消息,消息经过NettyRpcEnv放入到Outbox中,Outbox内部维护了一个链表用来存储需要发送的消息,同时维护了一个TransportClient来负责最终的数据发送,当client不存在时会自动创建。

多个线程同时post消息到Outbox时其中一个线程负责将队列中所有消息使用TransportClient进行发送。

Inbox

Inbox和Outbox内部结构类似,但作用刚好相反,本地的每个RpcEndpoint都有一个Inbox,在注册到RpcEnv时创建。Inbox负责将发送给RpcEndpoint的数据暂存到队列,最终线程安全的发送给RpcEndpoint。

Dispatcher

NettyRpcEnv的核心功能就是将消息转发给对应Endpoint的Inbox,这个功能就是Dispatcher来实现的。

Dispatcher内部维护了两个map,分别存储RpcEndpoint和RpcEndpointRef相关信息,RpcEndpoint向RpcEnv注册时会更新这两个map,这样就可以通过RpcEndpoint的name来找到对应的RpcEndpoint。

另外Dispatcher内置了线程池和一个消息队列,新消息在放入Inbox时还会将消息对应的Endpoint信息放入队列中,线程池中的线程不断的消费队列,并调用Inbox的process方法将消息发送到RpcEndopint。

2.3 RpcEndpoint

Spark中定义了很多RpcEndpoint用来实现各种功能的集群内部通信,其中较为简单的一个是:HeartbeatReceiver,其作用是:运行在Driver端,用于接收集群中Executor的心跳信息,并不断检测和移除心跳超时的Executor。

下面通过一张图来介绍HeartbeatReceiver的通信流程:

2.4 补充内容

spark1.6之前基于akka的通信模型中,rpc的客户端和服务端都需要进行端口监听,消息必须发送到该端口,而很多情况下客户端是主动连接服务端的,客户端没有必要监听一个固定端口单独接收服务端发送的数据。所以在接收方如果reveiver(NettyRpcEndpointRef)中的TransportClient不为空则直接由TransportClient进行发送,而不是交给Outbox,实现了连接的复用。
https://issues.apache.org/jira/browse/SPARK-10997