Google 微软 Apple 无人驾驶 Java 人工智能 大数据 阿里巴巴 特斯拉 Facebook VR/AR 安全 手机 亚马逊 机器人 云计算

netty4粘包/拆包/断包 解决方案

粘包、拆包表现形式

现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不在本文的讨论范围内。normal

第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。one

第三种情况,这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。half_one

one_half

粘包问题的解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决。业界的主流协议的解决方案,可以归纳如下: 
1. 消息定长,报文大小固定长度,例如每个报文的长度固定为200字节,如果不够空位补空格; 
2. 包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分; 
3. 将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段; 
4. 更复杂的自定义应用层协议。

Netty粘包和拆包解决方案

Netty提供了多个解码器,可以进行分包的操作,分别是: 
* LineBasedFrameDecoder (换行)
   LineBasedFrameDecoder是回车换行解码器,如果用户发送的消息以回车换行符作为消息结束的标识,则可以直接使用Netty的LineBasedFrameDecoder对消息进行解码,只需要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中即可,不需要自己重新实现一套换行解码器。
   LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。防止由于数据报没有携带换行符导致接收到ByteBuf无限制积压,引起系统内存溢出。

* DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包) 
   DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。
   回车换行解码器实际上是一种特殊的DelimiterBasedFrameDecoder解码器。

* FixedLengthFrameDecoder(使用定长的报文来分包) 
    FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包等问题,非常实用。
    对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度上导致了空间和资源的浪费。但是它的优点也是非常明显的,编解码比较简单,因此在实际项目中仍然有一定的应用场景。

* LengthFieldBasedFrameDecoder (自定义解码器跟编码器)

   本文介绍的重点LengthFieldBasedFrameDecoder,一般包含了消息头(head)、消息体(body):消息头是固定的长度,一般有有以下信息 -> 是否压缩(zip)、消息类型(type or cmdid)、消息体长度(body length);消息体长度不是固定的,其大小由消息头记载,一般记载业务交互信息。

  netty对应来说就是编码器(Encoder)跟解码器(Decoder),一般其中会有一个基本消息类对外输出,egg:

  1. /**
  2. * @describe 消息缓存区
  3. * @author zhikai.chen
  4. * @date 2018年4月28日 上午10:13:35
  5. */
  6. public class Message {
  7. /**
  8. * 要发送的数据
  9. */
  10. private String data;
  11. /**
  12. * 业务编号
  13. */
  14. private short cmdId;
  15. /**
  16. * 消息类型 0xAF 表示心跳包 0xBF 表示超时包 0xCF 业务信息包
  17. */
  18. private byte type;
  19. /**
  20. * 是否压缩,1是,0不是
  21. */
  22. private byte zip = 0 ;
  23. /**
  24.      * 封装要发送的数据包
  25.      * @param data 业务数据
  26.      * @param cmdId 业务标识号
  27.      * @param type 消息类型
  28.      */
  29. public Message(String data,short cmdId,byte type){
  30. this.data=data;
  31. this.cmdId=cmdId;
  32. this.type=type;
  33. }
  34. public String getData() {
  35. return data;
  36. }
  37. public void setData(String data) {
  38. this.data = data;
  39. }
  40. public short getCmdId() {
  41. return cmdId;
  42. }
  43. public void setCmdId(short cmdId) {
  44. this.cmdId = cmdId;
  45. }
  46. public byte getType() {
  47. return type;
  48. }
  49. public void setType(byte type) {
  50. this.type = type;
  51. }
  52. public byte getZip() {
  53. return zip;
  54. }
  55. public void setZip(byte zip) {
  56. this.zip = zip;
  57. }
Encoder:
  1. /**
  2. * @describe 消息编码器,封装
  3. * @author zhikai.chen
  4. * @date 2018年4月28日 上午10:17:52
  5. */
  6. public class MessageEncoder extends MessageToByteEncoder<Message> {
  7. // 编码格式
  8. private final Charset charset = Charset.forName(“UTF-8”);
  9. // 需要压缩的长度
  10. private final int compressLength=1024;
  11. @Override
  12. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
  13. String source=msg.getData();
  14. byte[] body=source.getBytes(charset);
  15. if(body.length > compressLength){
  16. msg.setZip((byte)1);
  17. // 加压
  18. body=ZipTool.compress(body);
  19. }
  20. //cmdId(2)+type(1)+zip(1)+body(4)=8
  21. //out = Unpooled.directBuffer(8+body.length);
  22. //cmdId
  23. out.writeShort(msg.getCmdId());
  24. //type
  25. out.writeByte(msg.getType());
  26. //是否加压
  27. out.writeByte(msg.getZip());
  28. //长度
  29. out.writeInt(body.length);
  30. //内容
  31. out.writeBytes(body);
  32. }
  33. }

NioSocketServerInitializer(服务端跟客户端都一致):

  1. //TODO 参考Message
  2. //body(4)+zip(1)+cmdId(2)+type(1)=8
  3. //最大长度
  4. private static final int MAX_FRAME_LENGTH = 1024 * 1024;
  5. //这个值就是MessageEncoder body.length(4)
  6. private static final int LENGTH_FIELD_LENGTH = 4;
  7. //这个值就是MessageEncoder zip(1)+cmdId(2)+type(1)
  8. private static final int LENGTH_FIELD_OFFSET = 4;
  9. private static final int LENGTH_ADJUSTMENT = 0;
  10. private static final int INITIAL_BYTES_TO_STRIP = 0;
  1. //TODO 粘包处理
  2. //解码器
  3. ch.pipeline().addLast(new MessageDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP));
  4. //编码器
  5. ch.pipeline().addLast(new MessageEncoder());

稍微解释一下:
1)LENGTH_FIELD_LENGTH指的就是我们这边CustomMsg中length这个属性的大小,我们这边是int型,所以是4
2)LENGTH_FIELD_OFFSET指的就是我们这边length字段的起始位置,因为前面有zip(1)+cmdId(2)+type(1)=4,所以是4
3)LENGTH_ADJUSTMENT指的是length这个属性的值,假如我们的body长度是40,有时候,有些人喜欢将length写成44,因为length本身还占有4个字节,

    这样就需要调整一下,那么就需要-4,我们这边没有这样做,所以写0就可以了   

Decoder
  1. /**
  2. * @describe 消息解码器
  3. * @author zhikai.chen
  4. * @date 2018年4月28日 上午11:09:15
  5. */
  6. public class MessageDecoder extends LengthFieldBasedFrameDecoder {
  7. //body(4)+zip(1)+cmdId(2)+type(1)=8
  8. private static final int HEADER_SIZE = 8;
  9. // 编码格式
  10. private final Charset charset = Charset.forName(“UTF-8”);
  11. public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
  12. super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
  13. }
  14. public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {
  15. super(maxFrameLength, lengthFieldOffset, lengthFieldLength,lengthAdjustment,initialBytesToStrip);
  16. }
  17. @Override
  18. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  19. if (in == null) {
  20. return null;
  21. }
  22. // 消息头读取不完整,不做解析返回null,直到读完整为止
  23. if (in.readableBytes() <= HEADER_SIZE) {
  24. return null;
  25. }
  26. in.markReaderIndex();
  27. short cmdId = in.readShort();
  28. byte type = in.readByte();
  29. byte zip = in.readByte();
  30. int dataLength = in.readInt();
  31. // TODO 网络信号不好,没有接收到完整数据
  32. if (in.readableBytes() < dataLength) {
  33. //保存当前读到的数据,下一次继续读取
  34. //断包处理:查看ByteToMessageDecoder的channelRead方法,ByteBuf cumulation属性
  35. in.resetReaderIndex();
  36. return null;
  37. }
  38. byte[] data = new byte[dataLength];
  39. in.readBytes(data);
  40. // TODO 手动释放内存
  41. //in.release(); // or ReferenceCountUtil.release(in);
  42. //判断是否压缩
  43. if(zip==1){
  44. data=ZipTool.uncompress(data);
  45. }
  46. String body = new String(data, charset);
  47. Message msg = new Message(body, cmdId, type);
  48. return msg;
  49. }
  50. }

Decoder的解码顺序是跟Encoder一致的。

对应的消息接收handler read处理:

  1. NioSocketServerHandler server=new NioSocketServerHandler();
  2. pipeline.addLast(server);
  1. public class NioSocketServerHandler extends SimpleChannelInboundHandler<Message> {
  2. private final Logger log=LoggerFactory.getLogger(this.getClass());
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
  5. log.info(“server read msg:{}”, JSON.toJSONString(msg));
  6. }}

write处理:

  1. Message msg=new Message(“ok”,ModuleID.HEART_BEAT,(byte)0xAF);
  2. ctx.channel().writeAndFlush(msg);

断包处理

眼尖的童靴其实已经发现(Decoder):

  1. //TODO 消息头都读取不完整,不做解析返回null,直到读完整为之
  2.         if (in.readableBytes() <= HEADER_SIZE) {
  3.             return null;
  4.         }
  5. in.markReaderIndex();
  6. short cmdId = in.readShort();
  7. byte type = in.readByte();
  8. byte zip = in.readByte();
  9. int dataLength = in.readInt();
  10. // TODO 网络信号不好,没有接收到完整数据
  11. if (in.readableBytes() < dataLength) {
  12. //保存当前读到的数据,下一次继续读取
  13. //断包处理:查看ByteToMessageDecoder的channelRead方法,ByteBuf cumulation属性
  14. in.resetReaderIndex();
  15. return null;
  16. }

   说明注释已经讲解了,我们来看看消息体一次读不完(断包)了,netty底层是怎么处理的:解码器继承LengthFieldBasedFrameDecoder,而LengthFieldBasedFrameDecoder继承ByteToMessageDecoder,ByteToMessageDecoder中有一个channelRead方法:

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. if (msg instanceof ByteBuf) {
  4. CodecOutputList out = CodecOutputList.newInstance();
  5. try {
  6. ByteBuf data = (ByteBuf) msg;
  7. first = cumulation == null;
  8. if (first) {
  9. cumulation = data;
  10. } else {
  11. cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
  12. }
  13. callDecode(ctx, cumulation, out);
  14. } catch (DecoderException e) {
  15. throw e;
  16. } catch (Throwable t) {
  17. throw new DecoderException(t);
  18. } finally {
  19. if (cumulation != null && !cumulation.isReadable()) {
  20. numReads = 0;
  21. cumulation.release();
  22. cumulation = null;
  23. } else if (++ numReads >= discardAfterReads) {
  24. // We did enough reads already try to discard some bytes so we not risk to see a OOME.
  25. // See https://github.com/netty/netty/issues/4275
  26. numReads = 0;
  27. discardSomeReadBytes();
  28. }
  29. int size = out.size();
  30. decodeWasNull = !out.insertSinceRecycled();
  31. fireChannelRead(ctx, out, size);
  32. out.recycle();
  33. }
  34. } else {
  35. ctx.fireChannelRead(msg);
  36. }
  37. }

我们来看看cumulation的定义,

  1. ByteBuf cumulation;
  2. private Cumulator cumulator = MERGE_CUMULATOR;
  3. private boolean singleDecode;
  4. private boolean decodeWasNull;
  5. private boolean first;
  6. private int discardAfterReads = 16;
  7. private int numReads;

cumulation为ByteBuf对象,是一个缓冲区,即如果消息体一次读不完,下一次继续读取,直到读完整消息头给定的长度为止。

点赞 0 打赏

我要评论