AI智能
改变未来

从零开始实现简单 RPC 框架 8:网络通信之 Request-Response 模型

Netty 在服务端与客户端的网络通信中,使用的是异步双向通信(双工)的方式,即客户端和服务端可以相互主动发请求给对方,发消息后不会同步等响应。这样就会有一下问题:

  1. 如何识别消息是请求还是响应?
  2. 请求如何正确对应到响应?

1. 如何识别消息是请求还是响应

为了识别消息类型是请求或者响应,我们在消息中加入了

messageType

的属性,在上文我们也提到,这个消息类型在自定义协议的头部,他有几种类型:请求、响应、心跳,我们先来说说请求、响应。

public enum MessageType {/*** 普通请求*/REQUEST((byte) 1),/*** 普通响应*/RESPONSE((byte) 2),/*** 心跳*/HEARTBEAT((byte) 3),;private final byte value;}

请求(Request)的核心字段如下:

public class RpcRequest {/*** 接口名*/private String interfaceName;/*** 方法名*/private String methodName;/*** 参数列表*/private Object[] params;/*** 参数类型列表*/private Class<?>[] paramTypes;/*** 接口版本*/private String version;}

响应(Response)的核心字段如下:

public class RpcResponse<T> {/*** 请求id*/private long requestId;/*** 响应码*/private Integer code;/*** 提示消息*/private String message;/*** 响应数据*/private T data;}

发送消息的时候,按照消息类型和结构体,将数据组装好,写到 channel 即可。接收消息则要先解码,从消息头拿到消息类型,根据消息类型来反序列化到对应的结构体。

2. 请求如何正确对应到响应

流程图如下:有几个关键点:

  1. 客户端请求之后拿到 Future
  2. 有一个
    Map

    存放未响应的请求,

    Key: RequestId,Value: Future
  3. 服务端响应的数据中,包含了客户端的 RequestId,这是对应的关键
  4. 响应的结果会被
    NettyClientHandler.channelRead0

    监听到,根据响应的 RequestId 取出对应的 Future

  5. 将结果写到对应的 Future 中
  6. 客户端通过
    future.get()

    获取到数据

1) 客户端发请求

代码如下:

public class NettyInvoker extends AbstractInvoker {private final NettyClient nettyClient = NettyClient.getInstance();@Overrideprotected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {// 获取 ChannelChannel channel = nettyClient.getChannel(socketAddress);// 构造一个空 FutureCompletableFuture<RpcResponse<?>> resultFuture = new CompletableFuture<>();// 构建 RPC 消息,此处会构建 requestIdRpcMessage rpcMessage = buildRpcMessage(request);// 将 request 和 Future 对应放到 Map 中UnprocessedRequests.put(rpcMessage.getRequestId(), resultFuture);// 发出请求channel.writeAndFlush(rpcMessage);// 返回结果return new AsyncResult(resultFuture);}// ...}

返回的

AsyncResult

只是

future

的包装。

public class AsyncResult implements RpcResult {private final CompletableFuture<?> future;public AsyncResult(CompletableFuture<?> future) {this.future = future;}}

2) 请求暂存

这个存储未响应的请求在

ccx-rpc

中是

UnprocessedRequests

类在管理:

public class UnprocessedRequests {private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();public static void put(long requestId, CompletableFuture<RpcResponse<?>> future) {FUTURE_MAP.put(requestId, future);}}

3) 服务端响应数据监听

使用 Netty 的 Handler 监听服务端响应的数据,当有数据响应,则调用

UnprocessedRequests.complete

写入。

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext context, RpcMessage requestMsg) {RpcResponse<?> response = (RpcResponse<?>) requestMsg.getData();UnprocessedRequests.complete(response);}}

UnprocessedRequests.complete

实际上就是找出并删除对应的请求,然后将数据写入:

future.complete(rpcResponse)

public class UnprocessedRequests {private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();/*** 完成响应** @param rpcResponse 响应内容*/public static void complete(RpcResponse<?> rpcResponse) {CompletableFuture<RpcResponse<?>> future = FUTURE_MAP.remove(rpcResponse.getRequestId());if (future != null) {future.complete(rpcResponse);} else {throw new IllegalStateException("future is null. rpcResponse=" + JSONUtil.toJsonStr(rpcResponse));}}}

最后通过

AsyncResult.getData

可以获取到数据。

public class AsyncResult implements RpcResult {private final CompletableFuture<?> future;public AsyncResult(CompletableFuture<?> future) {this.future = future;}@Overridepublic Object getData() {try {return future.get();} catch (InterruptedException | ExecutionException e) {log.error("getData error.", e);}return null;}}

总结

Netty 网络通信是异步双工的,我们需要用正确 Request-Response 模型让客户端和服务端正确交互。

  1. 如何区分请求或响应?在消息中,可以加入 messageType 字段用来区分是请求或者响应。
  2. 如何把请求和响应对应?发出的请求需要用 RequestId 标记并用 Map 存起来。服务端收到请求之后,将 RequestId 原封不动写到响应结果中。客户端收到响应结果后,拿出 RequestId 找到对应的 Future 并写入结果。

ccx-rpc 代码已经开源Github:https://github.com/chenchuxin/ccx-rpcGitee:https://gitee.com/imccx/ccx-rpc

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » 从零开始实现简单 RPC 框架 8:网络通信之 Request-Response 模型