提示:阅读本文前最好先阅读:

* 《Spark2.1.0之内置RPC框架》
<https://blog.csdn.net/beliefer/article/details/80799622>
* 《spark2.1.0之源码分析——RPC配置TransportConf》
<https://blog.csdn.net/beliefer/article/details/80888076>
* 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
<https://blog.csdn.net/beliefer/article/details/80981101>
* 《spark2.1.0之源码分析——RPC服务器TransportServer》
<https://blog.csdn.net/beliefer/article/details/81062342>
* 《spark2.1.0之源码分析——RPC管道初始化》
<https://blog.csdn.net/beliefer/article/details/81197447>
* 《spark2.1.0之源码分析——RPC传输管道处理器详解
<https://blog.csdn.net/beliefer/article/details/81326016>》
* 《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
<https://blog.csdn.net/beliefer/article/details/81624875>》
* 《spark2.1.0之源码分析——RPC服务端引导程序TransportServerBootstrap
<https://blog.csdn.net/beliefer/article/details/81867045>》
         在《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
<https://blog.csdn.net/beliefer/article/details/81624875>
》一文曾介绍过服务端RpcHandler对请求消息的处理,现在来看看客户端发送RPC请求的原理。我们在分析《spark2.1.0之源码分析——RPC管道初始化》
<https://blog.csdn.net/beliefer/article/details/81197447>
中列出的代码清单2中的createChannelHandler方法时,看到调用了TransportClient的构造器(见代码清单1),其中TransportResponseHandler的引用将赋给handler属性。

代码清单1         TransportClient的构造器
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel); this.handler =
Preconditions.checkNotNull(handler); this.timedOut = false; }
TransportClient一共有五个方法用于发送请求,分别为:

* fetchChunk:从远端协商好的流中请求单个块;
* stream:使用流的ID,从远端获取流数据;
* sendRpc:向服务端发送RPC的请求,通过At least Once Delivery原则保证请求不会丢失;
* sendRpcSync:向服务端发送异步的RPC的请求,并根据指定的超时时间等待响应;
* send:向服务端发送RPC的请求,但是并不期望能获取响应,因而不能保证投递的可靠性;
本节只选择最常用的sendRpc和fetchChunk进行分析,其余实现都可以触类旁通。

发送RPC请求

         sendRpc方法的实现见代码清单2。

代码清单2         sendRpc的实现
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final long startTime = System.currentTimeMillis(); if (logger.isTraceEnabled())
{ logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } //
使用UUID生成请求主键requestId final long requestId =
Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);//
添加requestId与RpcResponseCallback的引用之间的关系 // 发送RPC请求 channel.writeAndFlush(new
RpcRequest(requestId, new NioManagedBuffer(message))).addListener( new
ChannelFutureListener() { @Override public void operationComplete(ChannelFuture
future) throws Exception { if (future.isSuccess()) { long timeTaken =
System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken); } } else { String errorMsg =
String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause()); logger.error(errorMsg,
future.cause()); handler.removeRpcRequest(requestId); channel.close(); try {
callback.onFailure(new IOException(errorMsg, future.cause())); } catch
(Exception e) { logger.error("Uncaught exception in RPC response callback
handler!", e); } } } }); return requestId; }
结合代码清单2,我们知道sendRpc方法的实现步骤如下:

* 使用UUID生成请求主键requestId;
*
调用addRpcRequest向handler(特别提醒下读者这里的handler不是RpcHandler,而是通过TransportClient构造器传入的TransportResponseHandler)添加requestId与回调类RpcResponseCallback的引用之间的关系。TransportResponseHandler的addRpcRequest方法(见代码清单3)将更新最后一次请求的时间为当前系统时间,然后将requestId与RpcResponseCallback之间的映射加入到outstandingRpcs缓存中。outstandingRpcs专门用于缓存发出的RPC请求信息。
* 调用Channel的writeAndFlush方法将RPC请求发送出去,这和在《
spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
<https://blog.csdn.net/beliefer/article/details/81624875>
》一文列出的代码清单7中服务端调用的respond方法响应客户端的一样,都是使用Channel的writeAndFlush方法。当发送成功或者失败时会回调ChannelFutureListener的operationComplete方法。如果发送成功,那么只会打印requestId、远端地址及花费时间的日志,如果发送失败,除了打印错误日志外,还要调用TransportResponseHandler的removeRpcRequest方法(见代码清单4)将此次请求从outstandingRpcs缓存中移除。
代码清单3        添加RPC请求到缓存
public void addRpcRequest(long requestId, RpcResponseCallback callback) {
updateTimeOfLastRequest(); outstandingRpcs.put(requestId, callback); }
代码清单4         从缓存中删除RPC请求
public void removeRpcRequest(long requestId) {
outstandingRpcs.remove(requestId); }
请求发送成功后,客户端将等待接收服务端的响应。根据《spark2.1.0之源码分析——RPC管道初始化》
<https://blog.csdn.net/beliefer/article/details/81197447>
一文的图1,返回的消息也会传递给TransportChannelHandler的channelRead方法(见《
spark2.1.0之源码分析——RPC传输管道处理器详解
<https://blog.csdn.net/beliefer/article/details/81326016>
》一文的代码清单1),根据之前的分析,消息的分析将最后交给TransportResponseHandler的handle方法来处理。TransportResponseHandler的handle方法分别对《
spark2.1.0之源码分析——RPC传输管道处理器详解
<https://blog.csdn.net/beliefer/article/details/81326016>》一文的图2中的六种 <>
ResponseMessage进行处理,由于服务端使用processRpcRequest方法(见《
spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
<https://blog.csdn.net/beliefer/article/details/81624875>
》一文的代码清单4)处理RpcRequest类型的消息后返回给客户端的消息为RpcResponse或RpcFailure,所以我们来看看客户端的TransportResponseHandler的handle方法是如何处理RpcResponse和RpcFailure,见代码清单5。

代码清单5         RpcResponse和RpcFailure消息的处理
} else if (message instanceof RpcResponse) { RpcResponse resp = (RpcResponse)
message; RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);//
获取RpcResponseCallback if (listener == null) { logger.warn("Ignoring response
for RPC {} from {} ({} bytes) since it is not outstanding", resp.requestId,
getRemoteAddress(channel), resp.body().size()); } else {
outstandingRpcs.remove(resp.requestId); try {
listener.onSuccess(resp.body().nioByteBuffer()); } finally {
resp.body().release(); } } } else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message; RpcResponseCallback listener =
outstandingRpcs.get(resp.requestId); // 获取RpcResponseCallback if (listener ==
null) { logger.warn("Ignoring response for RPC {} from {} ({}) since it is not
outstanding", resp.requestId, getRemoteAddress(channel), resp.errorString); }
else { outstandingRpcs.remove(resp.requestId); listener.onFailure(new
RuntimeException(resp.errorString)); }
从代码清单5看到,处理RpcResponse的逻辑为:

*
使用RpcResponse对应的RpcRequest的主键requestId,从outstandingRpcs缓存中获取注册的RpcResponseCallback,此处的RpcResponseCallback即为代码清单2中传递给sendRpc方法的RpcResponseCallback;
* 移除outstandingRpcs缓存中requestId和RpcResponseCallback的注册信息;
*
调用RpcResponseCallback的onSuccess方法,处理成功响应后的具体逻辑。这里的RpcResponseCallback需要各个使用TransportClient的sendRpc方法的场景中分别实现;
* 最后释放RpcResponse的body,回收资源。
处理RpcFailure的逻辑为:

*
使用RpcFailure对应的RpcRequest的主键requestId,从outstandingRpcs缓存中获取注册的RpcResponseCallback,此处的RpcResponseCallback即为代码清单2中传递给sendRpc方法的RpcResponseCallback;
* 移除outstandingRpcs缓存中requestId和RpcResponseCallback的注册信息;
*
调用RpcResponseCallback的onFailure方法,处理失败响应后的具体逻辑。这里的RpcResponseCallback需要在使用TransportClient的sendRpc方法时指定或实现。
发送获取块请求

         fetchChunk的实现见代码清单6。

代码清单6         fetchChunk的实现
public void fetchChunk( long streamId, final int chunkIndex, final
ChunkReceivedCallback callback) { final long startTime =
System.currentTimeMillis(); if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex,
getRemoteAddress(channel)); } final StreamChunkId streamChunkId = new
StreamChunkId(streamId, chunkIndex);// 创建StreamChunkId //
添加StreamChunkId与ChunkReceivedCallback之间的对应关系
handler.addFetchRequest(streamChunkId, callback); // 发送块请求
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener( new
ChannelFutureListener() { @Override public void operationComplete(ChannelFuture
future) throws Exception { if (future.isSuccess()) { long timeTaken =
System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken); } } else { String errorMsg =
String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause()); logger.error(errorMsg,
future.cause()); handler.removeFetchRequest(streamChunkId); channel.close();
try { callback.onFailure(chunkIndex, new IOException(errorMsg,
future.cause())); } catch (Exception e) { logger.error("Uncaught exception in
RPC response callback handler!", e); } } } }); }
结合代码清单6,我们知道fetchChunk方法的实现步骤如下:

* 使用流的标记streamId和块的索引chunkIndex创建StreamChunkId;
*
调用addFetchRequest向handler(特别提醒下读者这里的handler不是RpcHandler,而是通过TransportClient构造器传入的TransportResponseHandler)添加StreamChunkId与回调类ChunkReceivedCallback的引用之间的关系。TransportResponseHandler的addFetchRequest方法(见代码清单7)将更新最后一次请求的时间为当前系统时间,然后将StreamChunkId与ChunkReceivedCallback之间的映射加入到outstandingFetches缓存中。outstandingFetches专门用于缓存发出的块请求信息。
* 调用Channel的writeAndFlush方法将块请求发送出去,这和在《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
<https://blog.csdn.net/beliefer/article/details/81624875>
》一文列出的代码清单7中服务端调用的respond方法响应客户端的一样,都是使用Channel的writeAndFlush方法。当发送成功或者失败时会回调ChannelFutureListener的operationComplete方法。如果发送成功,那么只会打印StreamChunkId、远端地址及花费时间的日志,如果发送失败,除了打印错误日志外,还要调用TransportResponseHandler的removeFetchRequest方法(见代码清单8)将此次请求从outstandingFetches缓存中移除。
代码清单7         添加块请求到缓存
public void addFetchRequest(StreamChunkId streamChunkId,
ChunkReceivedCallback callback) { updateTimeOfLastRequest();
outstandingFetches.put(streamChunkId, callback); }
代码清单8         从缓存中删除块请求
public void removeFetchRequest(StreamChunkId streamChunkId) {
outstandingFetches.remove(streamChunkId); }
请求发送成功后,客户端将等待接收服务端的响应。根据《spark2.1.0之源码分析——RPC管道初始化》
<https://blog.csdn.net/beliefer/article/details/81197447>
一文的图1,返回的消息也会传递给TransportChannelHandler的channelRead方法(见《
spark2.1.0之源码分析——RPC传输管道处理器详解
<https://blog.csdn.net/beliefer/article/details/81326016>
》一文的代码清单1),根据之前的分析,消息的分析将最后交给TransportResponseHandler的handle方法来处理。TransportResponseHandler的handle方法分别对《
spark2.1.0之源码分析——RPC传输管道处理器详解
<https://blog.csdn.net/beliefer/article/details/81326016>
》一文的图2中的六种处理结果进行处理,由于服务端使用processFetchRequest方法(见《
spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
<https://blog.csdn.net/beliefer/article/details/81624875>
》一文的代码清单3)处理ChunkFetchRequest类型的消息后返回给客户端的消息为ChunkFetchSuccess或ChunkFetchFailure,所以我们来看看客户端的TransportResponseHandler的handle方法是如何处理ChunkFetchSuccess和ChunkFetchFailure,见代码清单9

代码清单9         ChunkFetchSuccess和ChunkFetchFailure消息的处理
if (message instanceof ChunkFetchSuccess) { ChunkFetchSuccess resp =
(ChunkFetchSuccess) message; ChunkReceivedCallback listener =
outstandingFetches.get(resp.streamChunkId); if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not
outstanding", resp.streamChunkId, getRemoteAddress(channel));
resp.body().release(); } else { outstandingFetches.remove(resp.streamChunkId);
listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
resp.body().release(); } } else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message; ChunkReceivedCallback
listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not
outstanding", resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else { outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new
ChunkFetchFailureException( "Failure while fetching " + resp.streamChunkId + ":
" + resp.errorString)); } }
从代码清单9看到,处理ChunkFetchSuccess的逻辑为:

*
使用ChunkFetchSuccess对应的StreamChunkId,从outstandingFetches缓存中获取注册的ChunkReceivedCallback,此处的ChunkReceivedCallback即为代码清单6中传递给fetchChunk方法的ChunkReceivedCallback;
* 移除outstandingFetches缓存中StreamChunkId和ChunkReceivedCallback的注册信息;
*
调用ChunkReceivedCallback的onSuccess方法,处理成功响应后的具体逻辑。这里的ChunkReceivedCallback需要各个使用TransportClient的fetchChunk方法的场景中分别实现;
* 最后释放ChunkFetchSuccess的body,回收资源。
处理ChunkFetchFailure的逻辑为:

*
使用ChunkFetchFailure对应的StreamChunkId,从outstandingFetches缓存中获取注册的ChunkReceivedCallback,此处的ChunkReceivedCallback即为代码清单6中传递给fetchChunk方法的ChunkReceivedCallback;
* 移除outstandingFetches缓存中StreamChunkId和ChunkReceivedCallback的注册信息;
*
调用ChunkReceivedCallback的onFailure方法,处理失败响应后的具体逻辑。这里的ChunkReceivedCallback需要各个使用TransportClient的fetchChunk方法的场景中分别实现。
在详细介绍了TransportClient和TransportResponseHandler之后,对于客户端我们就可以扩展
《spark2.1.0之源码分析——RPC管道初始化》
<https://blog.csdn.net/beliefer/article/details/81197447>
一文的图1,把TransportResponseHandler及TransportClient的处理流程增加进来,如下图所示。
客户端请求、响应流程图

上图中的序号①表示调用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)将更新最后一次请求的时间为当前系统时间,然后将requestId与RpcResponseCallback之间的映射加入到outstandingRpcs缓存中(或将StreamChunkId与ChunkReceivedCallback之间的映射加入到outstandingFetches缓存中)。②表示调用Channel的writeAndFlush方法将RPC请求发送出去。图中的虚线表示当TransportResponseHandler处理RpcResponse和RpcFailure时将从outstandingRpcs缓存中获取此请求对应的RpcResponseCallback(或处理ChunkFetchSuccess和ChunkFetchFailure时将从outstandingFetches缓存中获取StreamChunkId对应的ChunkReceivedCallback),并执行回调。此外,TransportClientBootstrap将可能存在于上图中任何两个组件的箭头连线中间。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:



 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html <https://item.jd.com/12302500.html>

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信