HeartServerHandler.java 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package com.inspur.netty.handler;
  2. import com.inspur.device.domain.SmsbDeviceHeartRecord;
  3. import com.inspur.device.domain.vo.SmsbDeviceVo;
  4. import com.inspur.device.mapper.SmsbDeviceHeartRecordMapper;
  5. import com.inspur.device.mapper.SmsbDeviceMapper;
  6. import com.inspur.device.service.ISmsbDeviceService;
  7. import com.inspur.netty.message.push.PushMessageType;
  8. import com.inspur.netty.message.receive.ReceiveMessageType;
  9. import com.inspur.netty.util.NettyConstants;
  10. import io.netty.buffer.ByteBuf;
  11. import io.netty.buffer.Unpooled;
  12. import io.netty.channel.Channel;
  13. import io.netty.channel.ChannelHandlerContext;
  14. import io.netty.channel.ChannelInboundHandlerAdapter;
  15. import io.netty.handler.timeout.IdleState;
  16. import io.netty.handler.timeout.IdleStateEvent;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.dromara.common.core.utils.SpringUtils;
  19. import org.dromara.common.core.utils.StringUtils;
  20. import org.dromara.common.redis.utils.RedisUtils;
  21. import java.nio.charset.Charset;
  22. import java.util.Map;
  23. /**
  24. * netty heart handler
  25. *
  26. * @author lihao16
  27. */
  28. @Slf4j
  29. public class HeartServerHandler extends ChannelInboundHandlerAdapter {
  30. private static final SmsbDeviceHeartRecordMapper smsbDeviceHeartRecordMapper = SpringUtils.getBean(SmsbDeviceHeartRecordMapper.class);
  31. private static final ISmsbDeviceService smsbDeviceService = SpringUtils.getBean(ISmsbDeviceService.class);
  32. @Override
  33. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  34. if (evt instanceof IdleStateEvent) {
  35. IdleStateEvent event = (IdleStateEvent) evt;
  36. if (event.state() == IdleState.READER_IDLE) {
  37. try {
  38. String key = NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id();
  39. // Redis 原子递增
  40. Long lossHeartCount = RedisUtils.incrAtomicValue(key);
  41. if (lossHeartCount >= NettyConstants.HEART_LOSS_COUNT) {
  42. log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", close channel" + ctx.channel().id());
  43. // 清理 Redis 计数
  44. RedisUtils.deleteObject(key);
  45. ctx.channel().close();
  46. } else {
  47. log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", channel id = " + ctx.channel().id());
  48. }
  49. } catch (Exception e) {
  50. log.error("Redis 操作失败: " + e.getMessage(), e);
  51. }
  52. }
  53. } else {
  54. super.userEventTriggered(ctx, evt);
  55. }
  56. }
  57. @Override
  58. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  59. String message = (String) msg;
  60. // 处理heartbeat消息
  61. String messageType = message.split("/")[1];
  62. if (messageType.equals(ReceiveMessageType.HEARTBEAT.getValue())) {
  63. String identifier = message.split("/")[0];
  64. if (!StringUtils.isEmpty(identifier)) {
  65. Map<String, Channel> snChannelMap = ConnectServerHandler.getSnChannelMap();
  66. // 判断snChannelMap是否包含macAddr这个key值
  67. if (!snChannelMap.containsKey(identifier)) {
  68. log.info("心跳监测:snChannelMap 无 identifier = " + identifier + "信息,重新保存");
  69. snChannelMap.put(identifier, ctx.channel());
  70. ConnectServerHandler.updateDeviceOnlineStatue(identifier, NettyConstants.DEVICE_ONLINE_STATUS);
  71. }
  72. }
  73. String heartbeatReplay = identifier + PushMessageType.HEARTBEAT_REPLAY.getValue();
  74. ByteBuf byteBuf = Unpooled.copiedBuffer(heartbeatReplay + NettyConstants.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
  75. ctx.channel().writeAndFlush(byteBuf);
  76. log.debug("HeartServerHandler: 心跳消息已返回 : " + ctx.channel().id() + "identifier = " + identifier);
  77. Long lastNettyHeartTime = RedisUtils.getCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier);
  78. // redis cache device last heart time
  79. Long currentNettyHeartTime = System.currentTimeMillis();
  80. RedisUtils.setCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier, currentNettyHeartTime);
  81. buildAndInsertHeart(identifier, lastNettyHeartTime, currentNettyHeartTime);
  82. }else {
  83. ctx.fireChannelRead(message);
  84. }
  85. }
  86. private void buildAndInsertHeart(String identifier, Long lastNettyHeartTime, Long currentNettyHeartTime) {
  87. SmsbDeviceHeartRecord smsbDeviceHeartRecord = new SmsbDeviceHeartRecord();
  88. smsbDeviceHeartRecord.setHeartType(2);
  89. if (lastNettyHeartTime == null) {
  90. smsbDeviceHeartRecord.setTimeInterval(0L);
  91. }else {
  92. smsbDeviceHeartRecord.setTimeInterval(currentNettyHeartTime - lastNettyHeartTime);
  93. }
  94. smsbDeviceHeartRecord.setIdentifier(identifier);
  95. // 获取设备信息 缓存中
  96. SmsbDeviceVo smsbDeviceVo = smsbDeviceService.getDeviceByIdentifier(identifier);
  97. if (null != smsbDeviceVo) {
  98. smsbDeviceHeartRecord.setTenantId(smsbDeviceVo.getTenantId());
  99. }
  100. smsbDeviceHeartRecordMapper.insert(smsbDeviceHeartRecord);
  101. }
  102. /*@Override
  103. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  104. log.info("HeartServerHandler: " + ctx.channel().id() + " : 已经一分钟未收到客户端的消息!");
  105. if (evt instanceof IdleStateEvent){
  106. IdleStateEvent event = (IdleStateEvent)evt;
  107. if (event.state() == IdleState.READER_IDLE){
  108. String lossHeartCount = RedisUtils.getCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id());
  109. if (StringUtils.isNotEmpty(lossHeartCount) && Integer.parseInt(lossHeartCount) > NettyConstants.HEART_LOSS_COUNT){
  110. log.info("HeartServerHandler : lossHeartCount = " + lossHeartCount + ",channel close");
  111. ctx.channel().close();
  112. return;
  113. }
  114. if (StringUtils.isEmpty(lossHeartCount)) {
  115. RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), "1");
  116. }else {
  117. RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), String.valueOf(Integer.parseInt(lossHeartCount) + 1));
  118. }
  119. }
  120. }else {
  121. super.userEventTriggered(ctx,evt);
  122. }
  123. }*/
  124. }