package com.inspur.netty.handler;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.inspur.device.domain.SmsbDeviceErrorRecord;
import com.inspur.device.domain.constants.DeviceConstants;
import com.inspur.device.domain.vo.SmsbDeviceErrorRecordVo;
import com.inspur.device.domain.vo.SmsbDeviceVo;
import com.inspur.device.mapper.SmsbDeviceErrorRecordMapper;
import com.inspur.device.mapper.SmsbDeviceMapper;
import com.inspur.device.service.ISmsbDeviceService;
import com.inspur.device.service.impl.SmsbDeviceServiceImpl;
import com.inspur.netty.message.push.PushMessageType;
import com.inspur.netty.message.receive.ReceiveMessageType;
import com.inspur.netty.util.NettyConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.sm4.MessageHandlerPool;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Netty inbound handler that registers a device connection when it receives an
 * {@code INIT} message and unregisters it when the channel closes or fails.
 *
 * <p>Incoming text messages are split on {@code "/"}: the first segment is used as the
 * device identifier and the second as the message type. {@code INIT} messages are
 * handled here (channel registration, device status update, encrypted reply); every
 * other message is passed on to the next handler in the pipeline.
 *
 * <p>The identifier/channel registries are static and therefore shared by every
 * instance of this handler.
 *
 * @author lihao16
 */
@Slf4j
public class ConnectServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Device identifier -> channel currently registered for that identifier.
     */
    private static final Map<String, Channel> snChannelMap = new ConcurrentHashMap<>();

    /**
     * Channel id -> device identifier registered on that channel.
     */
    private static final Map<String, String> channelIdSnMap = new ConcurrentHashMap<>();

    /**
     * Device mapper.
     */
    private static final SmsbDeviceMapper smsbDeviceMapper = SpringUtils.getBean(SmsbDeviceMapper.class);

    /**
     * Device error record mapper.
     */
    private static final SmsbDeviceErrorRecordMapper smsbDeviceErrorRecordMapper =
        SpringUtils.getBean(SmsbDeviceErrorRecordMapper.class);

    /**
     * Device service.
     */
    private static final ISmsbDeviceService smsbDeviceService = SpringUtils.getBean(SmsbDeviceServiceImpl.class);

    /**
     * Pool used to encrypt outgoing messages.
     */
    private static final MessageHandlerPool decryptAndEncryptHandlerPool =
        SpringUtils.getBean(MessageHandlerPool.class);

    /**
     * Handles an inbound message.
     *
     * <p>Empty messages and messages without a {@code "/"}-separated type segment are
     * logged and dropped. {@code INIT} messages register the channel, mark the device
     * online and answer with an encrypted init reply. All other messages are forwarded
     * to the next handler.
     *
     * @param ctx the handler context of the channel the message arrived on
     * @param msg the inbound message; expected to be a {@link String}
     * @throws Exception if message handling fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Anything that is not text cannot be an init message; let later handlers decide.
        if (msg != null && !(msg instanceof String)) {
            ctx.fireChannelRead(msg);
            return;
        }
        String message = (String) msg;
        if (StringUtils.isEmpty(message)) {
            log.info("ConnectServerHandler: 客户端发来的消息是空");
            return;
        }
        // Expected layout: <identifier>/<messageType>[/...]. Split once and validate, so a
        // message without a type segment no longer raises ArrayIndexOutOfBoundsException
        // (which previously ended in exceptionCaught and closed the connection).
        String[] parts = message.split("/");
        if (parts.length < 2) {
            log.warn("ConnectServerHandler: ignore malformed message :{}", message);
            return;
        }
        String identifier = parts[0];
        String messageType = parts[1];

        if (!messageType.equals(ReceiveMessageType.INIT.getValue())) {
            ctx.fireChannelRead(message);
            return;
        }

        // Handle the init message.
        log.info("ConnectServerHandler: init message :{}", message);
        String channelId = ctx.channel().id().toString();
        log.info("ConnectServerHandler: channelId = {},identifier ={}", channelId, identifier);

        // Bugfix: the same identifier may show up on several channels. The newest channel
        // wins, and the previous channel's reverse mapping is dropped so that its later
        // close does not unregister the new channel.
        Channel oldChannel = snChannelMap.put(identifier, ctx.channel());
        if (null != oldChannel) {
            channelIdSnMap.remove(oldChannel.id().toString());
        }
        channelIdSnMap.putIfAbsent(channelId, identifier);

        // Update the device online status.
        updateDeviceOnlineStatue(identifier, NettyConstants.DEVICE_ONLINE_STATUS);

        // Send the init-success reply.
        // NOTE(review): no "/" is inserted between the identifier and the reply type here;
        // presumably INIT_REPLAY's value already carries it - confirm.
        String initSuccessMsg = identifier + PushMessageType.INIT_REPLAY.getValue() + "/success";
        // Encrypt the reply before writing it out.
        String encryptMessage = decryptAndEncryptHandlerPool.encryptMessage(initSuccessMsg);
        ByteBuf byteBuf = Unpooled.copiedBuffer(
            encryptMessage + NettyConstants.DATA_PACK_SEPARATOR, StandardCharsets.UTF_8);
        ctx.channel().writeAndFlush(byteBuf);
    }

    /**
     * Updates the stored online/offline status of a device and writes the matching
     * online/offline record.
     *
     * <p>When the device goes online, an online record is written only if the device was
     * not already stored as online. When it goes offline, an offline record carrying the
     * online duration is always written. Failures are logged and not propagated.
     *
     * @param identifier   the device identifier
     * @param onlineStatus the new status; compared against
     *                     {@code NettyConstants.DEVICE_ONLINE_STATUS}, any other value is
     *                     treated as offline
     */
    public static void updateDeviceOnlineStatue(String identifier, Integer onlineStatus) {
        log.info("ConnectServerHandler: update device identifier : {} online status to : {}",
            identifier, onlineStatus);
        try {
            SmsbDeviceVo smsbDeviceVo = smsbDeviceService.getDeviceByIdentifier(identifier);
            if (smsbDeviceVo == null) {
                // Previously this surfaced as a NullPointerException logged with a null message.
                log.warn("ConnectServerHandler: device not found for identifier : {}", identifier);
                return;
            }
            SmsbDeviceErrorRecord errorRecord = new SmsbDeviceErrorRecord();
            if (onlineStatus.equals(NettyConstants.DEVICE_ONLINE_STATUS)) {
                // Device goes online. Remember the stored status BEFORE overwriting it;
                // checking afterwards can never detect "already online".
                long onlineValue = DeviceConstants.DEVICE_STATUS_ONLINE.longValue();
                boolean alreadyOnline = Long.valueOf(onlineValue).equals(smsbDeviceVo.getOnlineStatus());

                smsbDeviceVo.setOnlineStatus(onlineValue);
                smsbDeviceVo.setLastOnline(new Date());
                smsbDeviceService.updateDeviceStatus(smsbDeviceVo);

                // Write an online record only when the device was not already online, and
                // never insert a record that has not been populated.
                if (!alreadyOnline) {
                    log.info("device :{} is offline, create device online record", identifier);
                    buildErrorRecord(errorRecord, smsbDeviceVo,
                        DeviceConstants.DEVICE_ERROR_LEVEL_GENERAL, DeviceConstants.DEVICE_ERROR_ONLINE);
                    smsbDeviceErrorRecordMapper.insert(errorRecord);
                }
            } else {
                // Device goes offline.
                smsbDeviceVo.setOnlineStatus(DeviceConstants.DEVICE_STATUS_OFFLINE.longValue());
                smsbDeviceVo.setOfflineTime(new Date());
                smsbDeviceService.updateDeviceStatus(smsbDeviceVo);
                buildErrorRecord(errorRecord, smsbDeviceVo,
                    DeviceConstants.DEVICE_ERROR_LEVEL_URGENT, DeviceConstants.DEVICE_ERROR_OFFLINE);
                smsbDeviceErrorRecordMapper.insert(errorRecord);
            }
        } catch (Exception e) {
            // Best effort: a failed status update must not break the channel pipeline.
            // The exception is passed as the last argument so the stack trace is kept.
            log.error("ConnectServerHandler: update remote device status error {}", e.getMessage(), e);
        }
    }

    /**
     * Populates an error record from the given device.
     *
     * <p>For offline records the online duration is the number of seconds between the
     * creation time of the device's most recent online record and now; for every other
     * error type, or when no online record exists, the duration is {@code 0}.
     *
     * @param errorRecord  the record to populate
     * @param smsbDeviceVo the device the record belongs to
     * @param errorLevel   the error level to store
     * @param errorType    the error type to store
     */
    private static void buildErrorRecord(SmsbDeviceErrorRecord errorRecord, SmsbDeviceVo smsbDeviceVo,
                                         Integer errorLevel, Integer errorType) {
        errorRecord.setDeviceId(smsbDeviceVo.getId());
        errorRecord.setDeviceName(smsbDeviceVo.getName());
        errorRecord.setErrorLevel(errorLevel);
        errorRecord.setErrorType(errorType);
        errorRecord.setTenantId(smsbDeviceVo.getTenantId());

        // Anything other than an offline event has no online duration.
        if (!errorType.equals(DeviceConstants.DEVICE_ERROR_OFFLINE)) {
            errorRecord.setOnlineDuration(0);
            return;
        }

        // Look up the most recent online record of this device.
        SmsbDeviceErrorRecordVo lastOnlineRecord = smsbDeviceErrorRecordMapper.selectVoOne(
            new LambdaQueryWrapper<SmsbDeviceErrorRecord>()
                .eq(SmsbDeviceErrorRecord::getDeviceId, smsbDeviceVo.getId())
                .eq(SmsbDeviceErrorRecord::getErrorType, DeviceConstants.DEVICE_ERROR_ONLINE)
                .orderByDesc(SmsbDeviceErrorRecord::getCreateTime)
                .last(DeviceConstants.LIMIT_ONE));
        if (lastOnlineRecord == null) {
            errorRecord.setOnlineDuration(0);
            return;
        }

        // Divide in long arithmetic and only then narrow. The previous
        // "(int) (millis) / 1000" cast the millisecond difference to int first, which
        // overflows after roughly 24.8 days online. Clamp so the narrowing is lossless.
        long elapsedMillis = System.currentTimeMillis() - lastOnlineRecord.getCreateTime().getTime();
        long elapsedSeconds = Math.max(0L, elapsedMillis / 1000L);
        errorRecord.setOnlineDuration((int) Math.min(Integer.MAX_VALUE, elapsedSeconds));
    }

    /**
     * Logs the failure, unregisters the channel and closes it.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log through SLF4J (with the stack trace) rather than printStackTrace().
        log.error("ConnectServerHandler:Server,exceptionCaught.ctx.close", cause);
        removeChannel(ctx);
        ctx.close();
    }

    /**
     * Unregisters the channel when it becomes inactive and closes it.
     *
     * @param ctx the handler context of the inactive channel
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("ConnectServerHandler:Server,channelInactive channel id = {}", ctx.channel().id().toString());
        removeChannel(ctx);
        ctx.close();
    }

    /**
     * Removes the channel from both registries and, if it was still the channel
     * registered for its device identifier, marks that device offline.
     *
     * @param ctx the handler context of the channel to remove
     */
    private void removeChannel(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().toString();
        log.info("ConnectServerHandler: Server,removeChannel ctx.channel().id() = {}", channelId);

        // Resolve and drop the identifier for this channel id in a single atomic step.
        String macAddr = channelIdSnMap.remove(channelId);
        if (macAddr == null) {
            return;
        }
        // Remove the identifier mapping only if it still points at THIS channel, so a
        // stale channel closing cannot evict a newer channel registered for the same device.
        if (snChannelMap.remove(macAddr, ctx.channel())) {
            updateDeviceOnlineStatue(macAddr, NettyConstants.DEVICE_OFFLINE_STATUS);
        }
    }

    /*public static Channel getChannelByMac(String macAddr){
        try {
            byte[] channelBytes = RedisUtils.getCacheObject(NettyConstants.MAC_CHANNEL_SERIALIZE_KEY + macAddr);
            if (channelBytes != null) {
                return ChannelSerializationUtils.deserializeChannel(channelBytes);
            }
            return null;
        }catch (Exception e){
            log.error("ConnectServerHandler: getChannelByMac error {}", e.getMessage());
            return null;
        }
    }*/

    /**
     * Returns the live identifier-to-channel registry.
     *
     * @return the shared map itself, not a copy
     */
    public static Map<String, Channel> getSnChannelMap() {
        return snChannelMap;
    }

    /**
     * Returns the live channel-id-to-identifier registry.
     *
     * @return the shared map itself, not a copy
     */
    public static Map<String, String> getChannelIdMacMap() {
        return channelIdSnMap;
    }
}