HeartServerHandler.java 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package com.inspur.netty.handler;
  2. import com.inspur.device.domain.SmsbDeviceHeartRecord;
  3. import com.inspur.device.domain.vo.SmsbDeviceVo;
  4. import com.inspur.device.mapper.SmsbDeviceHeartRecordMapper;
  5. import com.inspur.device.mapper.SmsbDeviceMapper;
  6. import com.inspur.device.service.ISmsbDeviceService;
  7. import com.inspur.netty.domain.es.EsSmsbDeviceHeartRecord;
  8. import com.inspur.netty.esmapper.EsSmsbDeviceHeartRecordMapper;
  9. import com.inspur.netty.message.push.PushMessageType;
  10. import com.inspur.netty.message.receive.ReceiveMessageType;
  11. import com.inspur.netty.util.NettyConstants;
  12. import io.netty.buffer.ByteBuf;
  13. import io.netty.buffer.Unpooled;
  14. import io.netty.channel.Channel;
  15. import io.netty.channel.ChannelHandlerContext;
  16. import io.netty.channel.ChannelInboundHandlerAdapter;
  17. import io.netty.handler.timeout.IdleState;
  18. import io.netty.handler.timeout.IdleStateEvent;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.dromara.common.core.utils.DateUtils;
  21. import org.dromara.common.core.utils.SpringUtils;
  22. import org.dromara.common.core.utils.StringUtils;
  23. import org.dromara.common.redis.utils.RedisUtils;
  24. import org.dromara.easyes.common.property.EasyEsProperties;
  25. import java.nio.charset.Charset;
  26. import java.util.Map;
  27. /**
  28. * netty heart handler
  29. *
  30. * @author lihao16
  31. */
  32. @Slf4j
  33. public class HeartServerHandler extends ChannelInboundHandlerAdapter {
  34. private static final SmsbDeviceHeartRecordMapper smsbDeviceHeartRecordMapper = SpringUtils.getBean(SmsbDeviceHeartRecordMapper.class);
  35. private static final ISmsbDeviceService smsbDeviceService = SpringUtils.getBean(ISmsbDeviceService.class);
  36. private static final EasyEsProperties easyEsProperties = SpringUtils.containsBean("easyEsProperties")
  37. ? SpringUtils.getBean(EasyEsProperties.class) : null;
  38. private static final EsSmsbDeviceHeartRecordMapper esSmsbDeviceHeartRecordMapper = SpringUtils.containsBean("esSmsbDeviceHeartRecordMapper")
  39. ? SpringUtils.getBean(EsSmsbDeviceHeartRecordMapper.class) : null;
  40. @Override
  41. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  42. if (evt instanceof IdleStateEvent) {
  43. IdleStateEvent event = (IdleStateEvent) evt;
  44. if (event.state() == IdleState.READER_IDLE) {
  45. try {
  46. String key = NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id();
  47. // Redis 原子递增
  48. Long lossHeartCount = RedisUtils.incrAtomicValue(key);
  49. if (lossHeartCount >= NettyConstants.HEART_LOSS_COUNT) {
  50. log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", close channel" + ctx.channel().id());
  51. // 清理 Redis 计数
  52. RedisUtils.deleteObject(key);
  53. ctx.channel().close();
  54. } else {
  55. log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", channel id = " + ctx.channel().id());
  56. }
  57. } catch (Exception e) {
  58. log.error("Redis 操作失败: " + e.getMessage(), e);
  59. }
  60. }
  61. } else {
  62. super.userEventTriggered(ctx, evt);
  63. }
  64. }
  65. @Override
  66. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  67. String message = (String) msg;
  68. // 处理heartbeat消息
  69. String messageType = message.split("/")[1];
  70. if (messageType.equals(ReceiveMessageType.HEARTBEAT.getValue())) {
  71. String identifier = message.split("/")[0];
  72. if (!StringUtils.isEmpty(identifier)) {
  73. Map<String, Channel> snChannelMap = ConnectServerHandler.getSnChannelMap();
  74. // 判断snChannelMap是否包含macAddr这个key值
  75. if (!snChannelMap.containsKey(identifier)) {
  76. log.info("心跳监测:snChannelMap 无 identifier = " + identifier + "信息,重新保存");
  77. snChannelMap.put(identifier, ctx.channel());
  78. ConnectServerHandler.updateDeviceOnlineStatue(identifier, NettyConstants.DEVICE_ONLINE_STATUS);
  79. }
  80. }
  81. String heartbeatReplay = identifier + PushMessageType.HEARTBEAT_REPLAY.getValue();
  82. ByteBuf byteBuf = Unpooled.copiedBuffer(heartbeatReplay + NettyConstants.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
  83. ctx.channel().writeAndFlush(byteBuf);
  84. log.debug("HeartServerHandler: 心跳消息已返回 : " + ctx.channel().id() + "identifier = " + identifier);
  85. Long lastNettyHeartTime = RedisUtils.getCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier);
  86. // redis cache device last heart time
  87. Long currentNettyHeartTime = System.currentTimeMillis();
  88. RedisUtils.setCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier, currentNettyHeartTime);
  89. buildAndInsertHeart(identifier, lastNettyHeartTime, currentNettyHeartTime);
  90. }else {
  91. ctx.fireChannelRead(message);
  92. }
  93. }
  94. private void buildAndInsertHeart(String identifier, Long lastNettyHeartTime, Long currentNettyHeartTime) {
  95. SmsbDeviceHeartRecord smsbDeviceHeartRecord = new SmsbDeviceHeartRecord();
  96. EsSmsbDeviceHeartRecord esSmsbDeviceHeartRecord = new EsSmsbDeviceHeartRecord();
  97. smsbDeviceHeartRecord.setHeartType(2);
  98. esSmsbDeviceHeartRecord.setHeartType("2");
  99. if (lastNettyHeartTime == null) {
  100. smsbDeviceHeartRecord.setTimeInterval(0L);
  101. esSmsbDeviceHeartRecord.setTimeInterval("0");
  102. }else {
  103. smsbDeviceHeartRecord.setTimeInterval(currentNettyHeartTime - lastNettyHeartTime);
  104. esSmsbDeviceHeartRecord.setTimeInterval(currentNettyHeartTime - lastNettyHeartTime + "");
  105. }
  106. smsbDeviceHeartRecord.setIdentifier(identifier);
  107. esSmsbDeviceHeartRecord.setIdentifier(identifier);
  108. // 获取设备信息 缓存中
  109. SmsbDeviceVo smsbDeviceVo = smsbDeviceService.getDeviceByIdentifier(identifier);
  110. if (null != smsbDeviceVo) {
  111. smsbDeviceHeartRecord.setTenantId(smsbDeviceVo.getTenantId());
  112. esSmsbDeviceHeartRecord.setTenantId(smsbDeviceVo.getTenantId());
  113. }
  114. esSmsbDeviceHeartRecord.setCreateTime(DateUtils.getTime());
  115. smsbDeviceHeartRecordMapper.insert(smsbDeviceHeartRecord);
  116. if (easyEsProperties != null && easyEsProperties.isEnable()) {
  117. esSmsbDeviceHeartRecordMapper.insert(esSmsbDeviceHeartRecord);
  118. }
  119. }
  120. /*@Override
  121. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  122. log.info("HeartServerHandler: " + ctx.channel().id() + " : 已经一分钟未收到客户端的消息!");
  123. if (evt instanceof IdleStateEvent){
  124. IdleStateEvent event = (IdleStateEvent)evt;
  125. if (event.state() == IdleState.READER_IDLE){
  126. String lossHeartCount = RedisUtils.getCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id());
  127. if (StringUtils.isNotEmpty(lossHeartCount) && Integer.parseInt(lossHeartCount) > NettyConstants.HEART_LOSS_COUNT){
  128. log.info("HeartServerHandler : lossHeartCount = " + lossHeartCount + ",channel close");
  129. ctx.channel().close();
  130. return;
  131. }
  132. if (StringUtils.isEmpty(lossHeartCount)) {
  133. RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), "1");
  134. }else {
  135. RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), String.valueOf(Integer.parseInt(lossHeartCount) + 1));
  136. }
  137. }
  138. }else {
  139. super.userEventTriggered(ctx,evt);
  140. }
  141. }*/
  142. }