TCP 粘包和拆包
- TCP 粘包和拆包基本介绍
- TCP 粘包和拆包解决方案
- 案例
- 要求
- 代码
TCP 粘包和拆包基本介绍
-
TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的 socket, 因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔 较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于 分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
-
由于 TCP 无消息保护边界, 需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题, 看一张图
-
示意图 TCP 粘包、拆包图解
假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以 下四种情况:
- 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
- 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
- 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
- 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部 分内容 D1_2 和完整的 D2 包
TCP 粘包和拆包解决方案
-
使用自定义协议 + 编解码器 来解决
-
关键就是要解决 服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问 题,从而避免的 TCP 粘包、拆包 。
案例
要求
-
要求客户端发送 5 个 Message 对象, 客户端每次发送一个 Message 对象
-
服务器端每次接收一个 Message, 分 5 次进行解码, 每读取到 一个 Message , 会回复一个 Message 对象 给客 户端.
代码
- 创建实体类解析包
@Datapublic class MessageProtocol {private int len;//关键 字节数组的长度private byte[] content; //信息的字节数组}
- 创建编码器
/*** 编码器,将业务数据编码成二进制文件*/public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {System.out.println(\"MyMessageEncoder encode 方法被调用\");//将实体类的数据 写入缓存区中byteBuf.writeInt(messageProtocol.getLen());byteBuf.writeBytes(messageProtocol.getContent());}}
- 创建解码器
/*** 解码器,将二进制文件解码成实体类对象*/public class MyMessageDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {System.out.println(\"MyMessageDecoder decode 被调用\");//需要将得到二进制字节码-> MessageProtocol 数据包(对象)int length = byteBuf.readInt();byte[] content = new byte[length];byteBuf.readBytes(content);//封装成 MessageProtocol 对象,放入 out, 传递下一个 handler 业务处理MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);list.add(messageProtocol);}}
- 创建服务器handler
/*** 处理业务的handler*/public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //cause.printStackTrace();ctx.close();}//对读取的数据处理@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {//接受到数据 解析数据int len = msg.getLen();byte[] content = msg.getContent();System.out.println(\"-----------\");System.out.println(\"服务器接收到信息如下\");System.out.println(\"长度=\" + len);System.out.println(\"内容=\" + new String(content, Charset.forName(\"utf-8\")));System.out.println(\"服务器接收到消息包数量=\" + (++this.count));//回复消息String str = UUID.randomUUID().toString();byte[] bytes = str.getBytes(Charset.forName(\"utf-8\"));MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(bytes.length);messageProtocol.setContent(bytes);//利用上下文读取且将刷新缓冲区ctx.writeAndFlush(messageProtocol);}}
- 创建客户端handler
/*** 客户端处理Handler*/public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;//连接请求时候触发@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客户端发送 10 条数据for (int i = 0; i < 5; i++) {String msg = \"天太热了\";byte[] bytes = msg.getBytes(Charset.forName(\"utf-8\"));int len = bytes.length;//创建协议包对象MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setContent(bytes);messageProtocol.setLen(len);ctx.writeAndFlush(messageProtocol);}}//读取信息处理@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {int len = msg.getLen();byte[] content = msg.getContent();System.out.println(\"客户端收到了信息\");System.out.println(\"消息长度\" + len);System.out.println(\"消息内容\" + new String(content, Charset.forName(\"utf-8\")));System.out.println(\"客户端接收消息数量=\" + (++this.count));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(\"异常消息=\" + cause.getMessage());ctx.close();}}
- 创建服务器
public class MyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new MyMessageDecoder());ch.pipeline().addLast(new MyMessageEncoder());ch.pipeline().addLast(new MyServerHandler());}});ChannelFuture channelFuture = bootstrap.bind(8000).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}
- 创建客户端
public class MyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new MyMessageEncoder());ch.pipeline().addLast(new MyMessageDecoder());ch.pipeline().addLast(new MyClientHandler());}});ChannelFuture channelFuture = bootstrap.connect(\"127.0.0.1\", 8000).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}}
总结
避免网络传送过程中造成的粘包和拆包的最主要手段,利用 使用自定义协议 + 编解码器 ,在本次案例中,利用在传输过程中的解析包实体类的len长度,来确定一个消息的长度,来作为解析时候解析消息的长度来判断是否粘包和拆包