TaskLogPushStartHandler.java 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.inspur.netty.handler;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  4. import com.inspur.device.domain.SmsbDeviceLogPush;
  5. import com.inspur.device.domain.SmsbDeviceTask;
  6. import com.inspur.device.domain.SmsbDeviceTaskDetail;
  7. import com.inspur.device.domain.constants.DeviceTaskConstants;
  8. import com.inspur.device.domain.vo.SmsbDeviceTaskVo;
  9. import com.inspur.device.domain.vo.SmsbDeviceVo;
  10. import com.inspur.device.mapper.SmsbDeviceLogPushMapper;
  11. import com.inspur.device.mapper.SmsbDeviceTaskDetailMapper;
  12. import com.inspur.device.mapper.SmsbDeviceTaskMapper;
  13. import com.inspur.device.service.ISmsbDeviceService;
  14. import com.inspur.device.service.impl.SmsbDeviceServiceImpl;
  15. import com.inspur.netty.message.receive.ReceiveMessageType;
  16. import io.netty.channel.ChannelHandlerContext;
  17. import io.netty.channel.ChannelInboundHandlerAdapter;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.dromara.common.core.utils.SpringUtils;
  20. /**
  21. * 日志开始抓取回复
  22. *
  23. * @author lihao16
  24. */
  25. @Slf4j
  26. public class TaskLogPushStartHandler extends ChannelInboundHandlerAdapter {
  27. private static final ISmsbDeviceService smsbDeviceService = SpringUtils.getBean(SmsbDeviceServiceImpl.class);
  28. private static final SmsbDeviceTaskMapper smsbDeviceTaskMapper = SpringUtils.getBean(SmsbDeviceTaskMapper.class);
  29. private static final SmsbDeviceTaskDetailMapper smsbDeviceTaskDetailMapper = SpringUtils.getBean(SmsbDeviceTaskDetailMapper.class);
  30. private static final SmsbDeviceLogPushMapper smsbDeviceLogPushMapper = SpringUtils.getBean(SmsbDeviceLogPushMapper.class);
  31. @Override
  32. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  33. String message = (String) msg;
  34. // 处理结束推流回复reply消息
  35. if (message.contains(ReceiveMessageType.CONTROL_START_LOG_PUSH_REPLAY.getValue())) {
  36. log.info("收到设备日志开始抓取回复消息:{}", message);
  37. String identifier = message.split("/")[0];
  38. // 1、 查询当前设备最近一条日志抓取任务
  39. SmsbDeviceVo smsbDeviceVo = smsbDeviceService.getDeviceByIdentifier(identifier);
  40. if (null == smsbDeviceVo) {
  41. log.error("未查询到设备信息:{}", identifier);
  42. return;
  43. }
  44. LambdaQueryWrapper<SmsbDeviceTask> lqw = Wrappers.lambdaQuery();
  45. lqw.eq(SmsbDeviceTask::getIdentifier, identifier);
  46. lqw.eq(SmsbDeviceTask::getTaskType, DeviceTaskConstants.DEVICE_LOG_PUSH_START);
  47. lqw.orderByDesc(SmsbDeviceTask::getCreateTime);
  48. lqw.last("limit 1");
  49. SmsbDeviceTaskVo smsbDeviceTask = smsbDeviceTaskMapper.selectVoOne(lqw);
  50. if (null == smsbDeviceTask) {
  51. log.error("未查询到设备日志开始抓取任务:{}", identifier);
  52. return;
  53. }
  54. // 2、收到回复 认定接收任务
  55. SmsbDeviceTaskDetail taskDetail = new SmsbDeviceTaskDetail();
  56. taskDetail.setTaskId(smsbDeviceTask.getId());
  57. taskDetail.setTaskStatus(DeviceTaskConstants.DEVICE_TASK_STATUS_INIT);
  58. taskDetail.setTenantId(smsbDeviceVo.getTenantId());
  59. smsbDeviceTaskDetailMapper.insert(taskDetail);
  60. // 3、更新抓取状态
  61. SmsbDeviceLogPush lastLogPush = smsbDeviceLogPushMapper.selectOne(new LambdaQueryWrapper<SmsbDeviceLogPush>()
  62. .eq(SmsbDeviceLogPush::getDeviceId, smsbDeviceVo.getId()).orderByDesc(SmsbDeviceLogPush::getId).last("limit 1"));
  63. lastLogPush.setLogStatus(1);
  64. smsbDeviceLogPushMapper.updateById(lastLogPush);
  65. } else {
  66. ctx.fireChannelRead(message);
  67. }
  68. }
  69. }