package com.inspur.netty.handler; import com.inspur.device.domain.SmsbDeviceHeartRecord; import com.inspur.device.domain.vo.SmsbDeviceVo; import com.inspur.device.mapper.SmsbDeviceHeartRecordMapper; import com.inspur.device.mapper.SmsbDeviceMapper; import com.inspur.device.service.ISmsbDeviceService; import com.inspur.netty.domain.es.EsSmsbDeviceHeartRecord; import com.inspur.netty.esmapper.EsSmsbDeviceHeartRecordMapper; import com.inspur.netty.message.push.PushMessageType; import com.inspur.netty.message.receive.ReceiveMessageType; import com.inspur.netty.util.NettyConstants; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.utils.DateUtils; import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.easyes.common.property.EasyEsProperties; import java.nio.charset.Charset; import java.util.Map; /** * netty heart handler * * @author lihao16 */ @Slf4j public class HeartServerHandler extends ChannelInboundHandlerAdapter { private static final SmsbDeviceHeartRecordMapper smsbDeviceHeartRecordMapper = SpringUtils.getBean(SmsbDeviceHeartRecordMapper.class); private static final ISmsbDeviceService smsbDeviceService = SpringUtils.getBean(ISmsbDeviceService.class); private static final EasyEsProperties easyEsProperties = SpringUtils.containsBean("easyEsProperties") ? SpringUtils.getBean(EasyEsProperties.class) : null; private static final EsSmsbDeviceHeartRecordMapper esSmsbDeviceHeartRecordMapper = SpringUtils.containsBean("esSmsbDeviceHeartRecordMapper") ? SpringUtils.getBean(EsSmsbDeviceHeartRecordMapper.class) : null; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { try { String key = NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(); // Redis 原子递增 Long lossHeartCount = RedisUtils.incrAtomicValue(key); if (lossHeartCount >= NettyConstants.HEART_LOSS_COUNT) { log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", close channel" + ctx.channel().id()); // 清理 Redis 计数 RedisUtils.deleteObject(key); ctx.channel().close(); } else { log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", channel id = " + ctx.channel().id()); } } catch (Exception e) { log.error("Redis 操作失败: " + e.getMessage(), e); } } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = (String) msg; // 处理heartbeat消息 String messageType = message.split("/")[1]; if (messageType.equals(ReceiveMessageType.HEARTBEAT.getValue())) { String identifier = message.split("/")[0]; if (!StringUtils.isEmpty(identifier)) { Map snChannelMap = ConnectServerHandler.getSnChannelMap(); // 判断snChannelMap是否包含macAddr这个key值 if (!snChannelMap.containsKey(identifier)) { log.info("心跳监测:snChannelMap 无 identifier = " + identifier + "信息,重新保存"); snChannelMap.put(identifier, ctx.channel()); ConnectServerHandler.updateDeviceOnlineStatue(identifier, NettyConstants.DEVICE_ONLINE_STATUS); } } String heartbeatReplay = identifier + PushMessageType.HEARTBEAT_REPLAY.getValue(); ByteBuf byteBuf = Unpooled.copiedBuffer(heartbeatReplay + NettyConstants.DATA_PACK_SEPARATOR, Charset.forName("utf-8")); ctx.channel().writeAndFlush(byteBuf); log.debug("HeartServerHandler: 心跳消息已返回 : " + ctx.channel().id() + "identifier = " + identifier); Long lastNettyHeartTime = RedisUtils.getCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier); // redis cache device last heart time Long currentNettyHeartTime = System.currentTimeMillis(); RedisUtils.setCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier, currentNettyHeartTime); buildAndInsertHeart(identifier, lastNettyHeartTime, currentNettyHeartTime); }else { ctx.fireChannelRead(message); } } private void buildAndInsertHeart(String identifier, Long lastNettyHeartTime, Long currentNettyHeartTime) { SmsbDeviceHeartRecord smsbDeviceHeartRecord = new SmsbDeviceHeartRecord(); EsSmsbDeviceHeartRecord esSmsbDeviceHeartRecord = new EsSmsbDeviceHeartRecord(); smsbDeviceHeartRecord.setHeartType(2); esSmsbDeviceHeartRecord.setHeartType("2"); if (lastNettyHeartTime == null) { smsbDeviceHeartRecord.setTimeInterval(0L); esSmsbDeviceHeartRecord.setTimeInterval("0"); }else { smsbDeviceHeartRecord.setTimeInterval(currentNettyHeartTime - lastNettyHeartTime); esSmsbDeviceHeartRecord.setTimeInterval(currentNettyHeartTime - lastNettyHeartTime + ""); } smsbDeviceHeartRecord.setIdentifier(identifier); esSmsbDeviceHeartRecord.setIdentifier(identifier); // 获取设备信息 缓存中 SmsbDeviceVo smsbDeviceVo = smsbDeviceService.getDeviceByIdentifier(identifier); if (null != smsbDeviceVo) { smsbDeviceHeartRecord.setTenantId(smsbDeviceVo.getTenantId()); esSmsbDeviceHeartRecord.setTenantId(smsbDeviceVo.getTenantId()); } esSmsbDeviceHeartRecord.setCreateTime(DateUtils.getTime()); smsbDeviceHeartRecordMapper.insert(smsbDeviceHeartRecord); if (easyEsProperties != null && easyEsProperties.isEnable()) { esSmsbDeviceHeartRecordMapper.insert(esSmsbDeviceHeartRecord); } } /*@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("HeartServerHandler: " + ctx.channel().id() + " : 已经一分钟未收到客户端的消息!"); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state() == IdleState.READER_IDLE){ String lossHeartCount = RedisUtils.getCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id()); if (StringUtils.isNotEmpty(lossHeartCount) && Integer.parseInt(lossHeartCount) > NettyConstants.HEART_LOSS_COUNT){ log.info("HeartServerHandler : lossHeartCount = " + lossHeartCount + ",channel close"); ctx.channel().close(); return; } if (StringUtils.isEmpty(lossHeartCount)) { RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), "1"); }else { RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), String.valueOf(Integer.parseInt(lossHeartCount) + 1)); } } }else { super.userEventTriggered(ctx,evt); } }*/ }