| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
package com.inspur.netty.handler;

import com.inspur.device.domain.SmsbDeviceHeartRecord;
import com.inspur.device.domain.vo.SmsbDeviceVo;
import com.inspur.device.mapper.SmsbDeviceHeartRecordMapper;
import com.inspur.device.mapper.SmsbDeviceMapper;
import com.inspur.device.service.ISmsbDeviceService;
import com.inspur.netty.message.push.PushMessageType;
import com.inspur.netty.message.receive.ReceiveMessageType;
import com.inspur.netty.util.NettyConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.redis.utils.RedisUtils;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

- /**
- * netty heart handler
- *
- * @author lihao16
- */
- @Slf4j
- public class HeartServerHandler extends ChannelInboundHandlerAdapter {
- private static final SmsbDeviceHeartRecordMapper smsbDeviceHeartRecordMapper = SpringUtils.getBean(SmsbDeviceHeartRecordMapper.class);
- private static final ISmsbDeviceService smsbDeviceService = SpringUtils.getBean(ISmsbDeviceService.class);
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- try {
- String key = NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id();
- // Redis 原子递增
- Long lossHeartCount = RedisUtils.incrAtomicValue(key);
- if (lossHeartCount >= NettyConstants.HEART_LOSS_COUNT) {
- log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", close channel" + ctx.channel().id());
- // 清理 Redis 计数
- RedisUtils.deleteObject(key);
- ctx.channel().close();
- } else {
- log.info("HeartServerHandler: lossHeartCount = " + lossHeartCount + ", channel id = " + ctx.channel().id());
- }
- } catch (Exception e) {
- log.error("Redis 操作失败: " + e.getMessage(), e);
- }
- }
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- String message = (String) msg;
- // 处理heartbeat消息
- String messageType = message.split("/")[1];
- if (messageType.equals(ReceiveMessageType.HEARTBEAT.getValue())) {
- String identifier = message.split("/")[0];
- if (!StringUtils.isEmpty(identifier)) {
- Map<String, Channel> snChannelMap = ConnectServerHandler.getSnChannelMap();
- // 判断snChannelMap是否包含macAddr这个key值
- if (!snChannelMap.containsKey(identifier)) {
- log.info("心跳监测:snChannelMap 无 identifier = " + identifier + "信息,重新保存");
- snChannelMap.put(identifier, ctx.channel());
- ConnectServerHandler.updateDeviceOnlineStatue(identifier, NettyConstants.DEVICE_ONLINE_STATUS);
- }
- }
- String heartbeatReplay = identifier + PushMessageType.HEARTBEAT_REPLAY.getValue();
- ByteBuf byteBuf = Unpooled.copiedBuffer(heartbeatReplay + NettyConstants.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
- ctx.channel().writeAndFlush(byteBuf);
- log.debug("HeartServerHandler: 心跳消息已返回 : " + ctx.channel().id() + "identifier = " + identifier);
- Long lastNettyHeartTime = RedisUtils.getCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier);
- // redis cache device last heart time
- Long currentNettyHeartTime = System.currentTimeMillis();
- RedisUtils.setCacheObject(NettyConstants.DEVICE_LAST_HEART_PREFIX + identifier, currentNettyHeartTime);
- buildAndInsertHeart(identifier, lastNettyHeartTime, currentNettyHeartTime);
- }else {
- ctx.fireChannelRead(message);
- }
- }
- private void buildAndInsertHeart(String identifier, Long lastNettyHeartTime, Long currentNettyHeartTime) {
- SmsbDeviceHeartRecord smsbDeviceHeartRecord = new SmsbDeviceHeartRecord();
- smsbDeviceHeartRecord.setHeartType(2);
- if (lastNettyHeartTime == null) {
- smsbDeviceHeartRecord.setTimeInterval(0L);
- }else {
- smsbDeviceHeartRecord.setTimeInterval(currentNettyHeartTime - lastNettyHeartTime);
- }
- smsbDeviceHeartRecord.setIdentifier(identifier);
- // 获取设备信息 缓存中
- SmsbDeviceVo smsbDeviceVo = smsbDeviceService.getDeviceByIdentifier(identifier);
- if (null != smsbDeviceVo) {
- smsbDeviceHeartRecord.setTenantId(smsbDeviceVo.getTenantId());
- }
- smsbDeviceHeartRecordMapper.insert(smsbDeviceHeartRecord);
- }
- /*@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- log.info("HeartServerHandler: " + ctx.channel().id() + " : 已经一分钟未收到客户端的消息!");
- if (evt instanceof IdleStateEvent){
- IdleStateEvent event = (IdleStateEvent)evt;
- if (event.state() == IdleState.READER_IDLE){
- String lossHeartCount = RedisUtils.getCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id());
- if (StringUtils.isNotEmpty(lossHeartCount) && Integer.parseInt(lossHeartCount) > NettyConstants.HEART_LOSS_COUNT){
- log.info("HeartServerHandler : lossHeartCount = " + lossHeartCount + ",channel close");
- ctx.channel().close();
- return;
- }
- if (StringUtils.isEmpty(lossHeartCount)) {
- RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), "1");
- }else {
- RedisUtils.setCacheObject(NettyConstants.DEVICE_HEART_LOSS_PREFIX + ctx.channel().id(), String.valueOf(Integer.parseInt(lossHeartCount) + 1));
- }
- }
- }else {
- super.userEventTriggered(ctx,evt);
- }
- }*/
- }
|