فهرست منبع

fix:修复因消息粘包、拆包问题导致消息不完整关闭连接的问题

lihao16 2 ماه پیش
والد
کامیت
dceed7c743

+ 30 - 24
smsb-modules/smsb-netty/src/main/java/com/inspur/netty/handler/AuthServerHandler.java

@@ -33,35 +33,41 @@ public class AuthServerHandler extends ChannelInboundHandlerAdapter {
         log.info("AuthServerHandler: channelId = " + ctx.channel().id() + ",login channelGroup");
     }
 
-
+    /**
+     * 此方法现在只会在接收到一条完整的消息时被调用(已被解码器处理过)。
+     * 框架会自动释放 msg 这个 ByteBuf。
+     *
+     * @param ctx 上下文
+     * @param msg 一条完整的、去除了分隔符的消息
+     */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        ByteBuf buf = (ByteBuf) msg;
-        byte[] buffer = new byte[buf.readableBytes()];
-        buf.readBytes(buffer);
-        String message = new String(buffer, "utf-8");
+        // 解码器已经将 "####" 分隔符去除,这里得到的是完整的消息体
+        String message = ((ByteBuf) msg).toString(Charset.forName("utf-8"));
+
         if (StringUtils.isEmpty(message)) {
-            log.info("AuthServerHandler: 客户端发来的消息是空");
+            log.warn("AuthServerHandler: 从 channelId = {} 收到解码后的空消息", ctx.channel().id());
+            return;
+        }
+        log.info("AuthServerHandler: 接收到客户端的完整消息: {}", message);
+
+        // 获取消息中的SN 256数据
+        String identifier = message.split("/")[0];
+        if (StringUtils.isEmpty(identifier)) {
+            log.warn("AuthServerHandler: 无法从消息 {} 中解析出设备标识", message);
+            // 消息格式不正确,关闭连接
+            ctx.close();
             return;
         }
-        String[] listMsg = message.split(NettyConstants.DATA_PACK_SEPARATOR);
-        for (String oneMsg : listMsg) {
-            if (StringUtils.isEmpty(oneMsg)) {
-                continue;
-            }
-            log.info("AuthServerHandler: 客户端发来的消息:" + oneMsg);
-            // 获取消息中的SN 256数据
-            String identifier = oneMsg.split("/")[0];
-            if (validateDevice(identifier)) {
-                ctx.fireChannelRead(oneMsg);
-            } else {
-                // 发送消息鉴权失败
-                String replayMsg = identifier + PushMessageType.INIT_REPLAY.getValue() + "/fail:auth fail";
-                ByteBuf byteBuf = Unpooled.copiedBuffer(replayMsg + NettyConstants.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
-                Channel channel = ctx.channel();
-                channel.writeAndFlush(byteBuf);
-                ctx.close();
-            }
+        if (validateDevice(identifier)) {
+            ctx.fireChannelRead(message);
+        } else {
+            // 发送消息鉴权失败
+            String replayMsg = identifier + PushMessageType.INIT_REPLAY.getValue() + "/fail:auth fail";
+            ByteBuf byteBuf = Unpooled.copiedBuffer(replayMsg + NettyConstants.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
+            Channel channel = ctx.channel();
+            channel.writeAndFlush(byteBuf);
+            ctx.close();
         }
     }
 

+ 30 - 0
smsb-modules/smsb-netty/src/main/java/com/inspur/netty/handler/MyFinalBusinessHandler.java

@@ -0,0 +1,30 @@
+package com.inspur.netty.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 自定义业务处理类 最终处理类
+ * 避免内存泄漏 自动释放 msg 的资源
+ *
+ * @author lihao16
+ */
+@Slf4j
+public class MyFinalBusinessHandler extends SimpleChannelInboundHandler {
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        // 因为 SimpleChannelInboundHandler 会自动释放 msg,
+        // 所以不能在这里(或者之后)再调用 ctx.fireChannelRead(msg) 将 msg 传递下去,
+        // 否则会收到一个“非法引用计数”的错误,因为消息可能已经被释放了。
+        log.info("MyFinalBusinessHandler auto release msg : " + (String) msg);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+}

+ 13 - 0
smsb-modules/smsb-netty/src/main/java/com/inspur/netty/server/NettyServer.java

@@ -1,7 +1,10 @@
 package com.inspur.netty.server;
 
 import com.inspur.netty.handler.*;
+import com.inspur.netty.util.NettyConstants;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -9,6 +12,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.timeout.IdleStateHandler;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
@@ -16,6 +20,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.stereotype.Component;
 
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -28,6 +33,12 @@ public class NettyServer {
 
     private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
 
+
+    /**
+     * 将字符串分隔符 "####" 转换为 ByteBuf
+     */
+    private static final ByteBuf delimiter = Unpooled.copiedBuffer(NettyConstants.DATA_PACK_SEPARATOR, StandardCharsets.UTF_8);
+
     @Bean
     public void startNettyServer(){
         new Thread(new Runnable() {
@@ -54,6 +65,7 @@ public class NettyServer {
                         @Override
                         protected void initChannel(SocketChannel channel) throws Exception {
                             channel.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
+                            channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                             channel.pipeline().addLast(new AuthServerHandler());
                             channel.pipeline().addLast(new ConnectServerHandler());
                             channel.pipeline().addLast(new HeartServerHandler());
@@ -69,6 +81,7 @@ public class NettyServer {
                             channel.pipeline().addLast(new TaskPlayControlHandler());
                             channel.pipeline().addLast(new TaskOtaHandler());
                             channel.pipeline().addLast(new TaskLogPushStartHandler());
+                            channel.pipeline().addLast(new MyFinalBusinessHandler());
                         }
                     });
             ChannelFuture f = b.bind(18081).sync();