一、心跳
什么是心跳
在 TPC 中,客户端和服务端建立连接之后,需要定期发送数据包,来通知对方自己还在线,以确保 TPC 连接的有效性。如果一个连接长时间没有心跳,需要及时断开,否则服务端会维护很多无用连接,浪费服务端的资源。
IdleStateHandler
Netty 已经为我们提供了心跳的 Handler:
IdleStateHandler
。当连接的空闲时间(读或者写)太长时,
IdleStateHandler
将会触发一个
IdleStateEvent
事件,传递的下一个 Handler。我们可以通过在 Pipeline Handler 中重写
userEventTrigged
方法来处理该事件,注意我们自己的 Handler 需要在
IdleStateHandler
后面。
下面我们来看看 IdleStateHandler 的源码。
1. 构造函数
最完整的构造函数如下:
public IdleStateHandler(boolean observeOutput,long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {}
参数解析:
-
observeOutput
:是否考虑出站时较慢的情况。如果 true:当出站时间太长,超过空闲时间,那么将不触发此次事件。如果 false,超过空闲时间就会触发事件。默认 false。
-
readerIdleTime
:读空闲的时间,0 表示禁用读空闲事件。
-
writerIdleTime
:写空闲的时间,0 表示禁用写空闲事件。
-
allIdleTime
:读或写空闲的时间,0 表示禁用事件。
-
unit
:前面三个时间的单位。
2. 事件处理
IdleStateHandler
继承
ChannelDuplexHandler
,重写了出站和入站的事件,我们来看看代码。当 channel 添加、注册、活跃的时候,会初始化
initialize(ctx)
,删除、不活跃的时候销毁
destroy()
,读写的时候设置
lastReadTime
和
lastWriteTime
字段。
public class IdleStateHandler extends ChannelDuplexHandler {@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isActive() && ctx.channel().isRegistered()) {initialize(ctx);}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {destroy();}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isActive()) {initialize(ctx);}super.channelRegistered(ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {initialize(ctx);super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {destroy();super.channelInactive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 判断是否开启 读空闲 或者 读写空闲 监控if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {// 设置 reading 标志位reading = true;firstReaderIdleEvent = firstAllIdleEvent = true;}ctx.fireChannelRead(msg);}// 读完成之后@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 判断是否开启 读空闲 或者 读写空闲 监控,检查 reading 标志位if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {// 设置 lastReadTime,后面判断读超时有用lastReadTime = ticksInNanos();reading = false;}ctx.fireChannelReadComplete();}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {// 判断是否开启 写空闲 或者 读写空闲 监控if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {// writeListener 的方法在下面,主要是设置 lastWriteTimectx.write(msg, promise.unvoid()).addListener(writeListener);} else {ctx.write(msg, promise);}}private final ChannelFutureListener writeListener = new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {lastWriteTime = ticksInNanos();firstWriterIdleEvent = firstAllIdleEvent = true;}};}
3. 初始化
当 channel 添加、注册、活跃的时候,会初始化
initialize(ctx)
,下面我们就来看看初始化的代码:
private void initialize(ChannelHandlerContext ctx) {// Avoid the case where destroy() is called before scheduling timeouts.// See: https://github.com/netty/netty/issues/143switch (state) {case 1:case 2:return;}state = 1;initOutputChanged(ctx);lastReadTime = lastWriteTime = ticksInNanos();if (readerIdleTimeNanos > 0) {readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),readerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (writerIdleTimeNanos > 0) {writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),writerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (allIdleTimeNanos > 0) {allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),allIdleTimeNanos, TimeUnit.NANOSECONDS);}}
其实初始化很简单,就是根据构造函数给的 读写空闲时间 去决定初始化哪些定时任务,分别是:
ReaderIdleTimeoutTask
(读空闲超时任务)、
WriterIdleTimeoutTask
(写空闲超时任务)、
AllIdleTimeoutTask
(读写空闲超时任务)。
4. 定时任务
我们来看看
ReaderIdleTimeoutTask
,剩下两个的原理跟
ReaderIdleTimeoutTask
差不多,感兴趣的同学自行阅读源码吧。
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {super(ctx);}@Overrideprotected void run(ChannelHandlerContext ctx) {// 查看是否超时long nextDelay = readerIdleTimeNanos;if (!reading) {nextDelay -= ticksInNanos() - lastReadTime;}if (nextDelay <= 0) {// 超时了,重新启动一个新的定时器,然后触发事件// Reader is idle - set a new timeout and notify the callback.readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);boolean first = firstReaderIdleEvent;firstReaderIdleEvent = false;try {// 构造事件IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);// 触发事件channelIdle(ctx, event);} catch (Throwable t) {ctx.fireExceptionCaught(t);}} else {// 没有超时,设置新的定时器,不过这次的时间是更短的时间// Read occurred before the timeout - set a new timeout with shorter delay.readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);}}}
从上面的代码可以看出:① 如果读空闲超时了,则重新起一个定时器,然后触发事件② 如果读空闲未超时,则新起一个时间更短(
readerIdleTimeNanos - ticksInNanos() - lastReadTime
)的定时器
5. 触发事件
上面的触发事件方法是:
channelIdle
,经过重重代码拨开,其实最终就是调用到了下面的代码:
private void invokeUserEventTriggered(Object event) {if (invokeHandler()) {try {// 触发事件,说白了,就是直接调用 userEventTriggered 方法而已((ChannelInboundHandler) handler()).userEventTriggered(this, event);} catch (Throwable t) {notifyHandlerException(t);}} else {fireUserEventTriggered(event);}}
其实触发事件,就是把事件传给下一个 Handler (
next
),就是调用
userEventTriggered
方法而已。所以我们处理心跳的 Handler 一定要写到
IdleStateHandler
。
ccx-rpc 心跳实现
1. 客户端
IdleStateHandler
放到启动类的
PipleLine
注册上,业务处理器
NettyClientHandler
在其后面。
public class NettyClient {// ... 忽略其他代码private NettyClient() {bootstrap = new Bootstrap()// ... 省略其他代码.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();// 设定 IdleStateHandler 心跳检测每 5 秒进行一次写检测// write()方法超过 5 秒没调用,就调用 userEventTriggerp.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));// 编码器p.addLast(new RpcMessageEncoder());// 解码器p.addLast(new RpcMessageDecoder());// 业务处理器p.addLast(new NettyClientHandler());}});}}
接下来我们来看看
NettyClientHandler
是如何处理心跳事件的:
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {// ... 忽略其他代码@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {// 根据上面的配置,超过 5 秒没有写请求,会触发 WRITER_IDLE 事件IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {log.info("write idle happen [{}]", ctx.channel().remoteAddress());Channel channel = ctx.channel();// 触发写空闲事件后,就应该发心跳了。// 组装消息RpcMessage rpcMessage = new RpcMessage();rpcMessage.setSerializeType(SerializeType.PROTOSTUFF.getValue());rpcMessage.setCompressTye(CompressType.DUMMY.getValue());rpcMessage.setMessageType(MessageType.HEARTBEAT.getValue());// 发心跳消息channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}}
2. 服务端
同样,服务端的
IdleStateHandler
放到启动类的
PipleLine
注册上,业务处理器
NettyServerHandler
在其后面。
public class NettyServerBootstrap {public void start() {ServerBootstrap bootstrap = new ServerBootstrap()// ... 忽略其他代码.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();// 30 秒之内没有收到客户端请求的话就关闭连接p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));// 编解码器p.addLast(new RpcMessageEncoder());p.addLast(new RpcMessageDecoder());// RPC 消息处理器p.addLast(serviceHandlerGroup, new NettyServerHandler());}});// ... 忽略其他代码}}
服务端收到超过 30 秒没有读请求的事件后,调用
ctx.close
将连接关闭。同时,如果收到了客户端发来的心跳消息,直接忽略即可。如果每个心跳都要去响应,会加重服务器的负担的。
NettyServerHandler
的代码如下
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcMessage requestMsg) {// 不处理心跳消息if (requestMsg.getMessageType() != MessageType.REQUEST.getValue()) {return;}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 处理空闲状态的if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.READER_IDLE) {log.info("idle check happen, so close the connection");ctx.close();}} else {super.userEventTriggered(ctx, evt);}}}
二、重连机制
很多时候服务端和客户端连接断开,仅仅是因为网络问题或者处理程序慢,并不是程序挂了。那么客户端想再发起请求,就发不出去了。此时需要一个功能:当发现连接断了之后,如果想往连接写数据,就自动重新连接上,这个就是重连机制。
客户端想请求服务端的接口,先从注册中心中,获得服务端的地址,然后跟服务端连接,然后写数据。简单代码如下:
protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {// ... 忽略其他代码// 服务端地址InetSocketAddress socketAddress = new InetSocketAddress(selected.getHost(), selected.getPort());// 获取连接(Channel)Channel channel = nettyClient.getChannel(socketAddress);// 构建消息RpcMessage rpcMessage = buildRpcMessage(request);// 写消息(发请求)channel.writeAndFlush(rpcMessage);}
这个
nettyClient.getChannel(socketAddress)
是重连机制的秘密:
/*** 获取和指定地址连接的 channel,如果获取不到,则连接** @param address 指定要连接的地址* @return channel*/public Channel getChannel(SocketAddress address) {// 根据地址从缓存中获取 ChannelChannel channel = CHANNEL_MAP.get(address);// 如果获取不到,或者 channel 已经断开,则重连,然后放到 CHANNEL_MAP 缓存起来if (channel == null || !channel.isActive()) {// 连接channel = connect(address);CHANNEL_MAP.put(address, channel);}return channel;}
代码一目了然,就是使用了
CHANNEL_MAP
作为缓存,发现找不到或者已断开,就重新连接,然后放到
CHANNEL_MAP
中,以便下次获取。
总结
心跳是用于服务端和客户端保持有效连接的一种手段,客户端每隔一小段时间发一个心跳包,服务端收到之后不用响应,但是会记下客户端最后一次读的时间。服务器起定时器,定时检测客户端上次读请求的时间超过配置的值,超过就会触发事件,断开连接。重连机制是连接断开之后,要使用的时候自动重连的机制。
心跳和重连机制,结合起来让服务端和客户端的连接使用更加合理,该断开的断开节省服务端资源,该重连的重连提高可用性。
ccx-rpc 代码已经开源Github:https://github.com/chenchuxin/ccx-rpcGitee:https://gitee.com/imccx/ccx-rpc