瀏覽代碼

大华摄像头设备视频,支持订阅主题接受消息和下发指令

lihao16 4 月之前
父節點
當前提交
a10b9815e9

+ 1 - 1
elevator-admin/pom.xml

@@ -323,4 +323,4 @@
         </resources>
     </build>
 
-</project>
+</project>

+ 41 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/bean/DhErrorBean.java

@@ -0,0 +1,41 @@
+package com.inspur.elevator.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 大华设备故障上报
+ * @author lihao16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class DhErrorBean {
+
+    /**
+     * 主键,跟uuid保持一致
+     */
+    private String keyId;
+
+    /**
+     * 故障类型
+     */
+    private Integer faultType;
+
+    /**
+     * 故障开始时间
+     */
+    private String faultBeginTime;
+
+    /**
+     * 故障结束时间
+     */
+    private String faultEndTime;
+
+    /**
+     * 故障视频地址
+     */
+    private String faultVideoUrl;
+
+}

+ 51 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/bean/DhHeartbeat.java

@@ -0,0 +1,51 @@
+package com.inspur.elevator.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 大华心跳消息
+ * @author lihao16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class DhHeartbeat extends DhMsgBase{
+
+    /**
+     * 内存,单位G
+     */
+    private String memory;
+
+    /**
+     * 磁盘使用率,百分比
+     */
+    private String diskUsage;
+
+    /**
+     * cpu,百分比
+     */
+    private String cpu;
+
+    /**
+     * 磁盘剩余空间,单位G
+     */
+    private String diskLeftSpace;
+
+    /**
+     * 磁盘总空间,单位G
+     */
+    private String diskTotalSpace;
+
+    /**
+     * 当前的系统时间,时间戳
+     */
+    private Long timeStamp;
+
+    /**
+     * 设备IP地址
+     */
+    private String ipAddr;
+
+}

+ 54 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/bean/DhLiftFault.java

@@ -0,0 +1,54 @@
+package com.inspur.elevator.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 电梯故障上报
+ * @author lihao16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class DhLiftFault {
+
+    /**
+     * 故障编号:
+     * 1	关人
+     * 2	开门走车
+     * 3    非门区停车
+     * 4    冲顶
+     * 5    蹲底
+     * 6	电梯超速运行
+     * 7	电梯进入检修模式
+     * 11	反复开关门
+     * 12	关门异常
+     */
+    public static final Integer FAULTCODE_TRAP = 1;
+    public static final Integer FAULTCODE_OPENDOOR_RUN = 2;
+    public static final Integer FAULTCODE_NODOOR_STOP = 3;
+    public static final Integer FAULTCODE_TOP = 4;
+    public static final Integer FAULTCODE_BOTTOM = 5;
+    public static final Integer FAULTCODE_OVERSPEED = 6;
+    public static final Integer FAULTCODE_TAKECARE = 7;
+    public static final Integer FAULTCODE_POWER_OFF = 9;
+    public static final Integer FAULTCODE_REPEAT_OPENCLOSE = 11;
+    public static final Integer FAULTCODE_CLOSE = 12;
+
+
+
+
+    /**
+     * 数据库id,设备自定义
+     */
+    private String uuid;
+
+    /**
+     * 故障类集合
+     */
+    private List<DhErrorBean> ErrorListBean;
+
+}

+ 48 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/bean/DhLiftState.java

@@ -0,0 +1,48 @@
+package com.inspur.elevator.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 电梯实时运行状态
+ * @author lihao16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class DhLiftState extends DhMsgBase{
+
+    /**
+     * 故障类型
+     */
+    private Integer faultType;
+
+    /**
+     * 故障时间
+     */
+    private String faultTime;
+
+
+    /**
+     * 轿厢内光线,变化幅度
+     * 强:2;中:1;弱:0
+     */
+    private Integer lightVariationAmplitude;
+
+    /**
+     * 加速度
+     */
+    private Double acceleration;
+
+    /**
+     * 抖动频率
+     */
+    private Double jitterFrequency;
+
+    /**
+     * 抖动加速度
+     */
+    private Double jitterAcceleration;
+
+}

+ 83 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/bean/DhMsgBase.java

@@ -0,0 +1,83 @@
+package com.inspur.elevator.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 大华上报的数据
+ *
+ * @author: Hao Li
+ * @date: 2020/7/23 16:05
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class DhMsgBase {
+
+    /**
+     * 报警信息
+     */
+    public static final String CMD_HEARTBEAT = "HeartBeat";
+
+    /**
+     * 报警信息
+     */
+    public static final String CMD_ALARM_REPORT = "LiftFault";
+    /**
+     * 电梯实时数据上报
+     */
+    public static final String CMD_ELEINFO_REPORT = "LiftState";
+
+    /**
+     * 单次运行记录数据上报
+     */
+    public static final String CMD_STATISTICS_SINGLE_DATA_REPORT = "LiftRunInfo";
+
+    /**
+     * 操作类型
+     */
+    private String type;
+
+    /**
+     * 设备mac
+     */
+    private String macAddr;
+
+    /**
+     * 电梯编号
+     */
+    private String elevatorNo;
+
+    /**
+     * 当前楼层
+     */
+    private Integer currentFloor;
+
+    /**
+     * 当前速度
+     */
+    private Integer currentSpeed;
+
+    /**
+     * 运行方向(1向上,2向下)
+     */
+    private Integer runningDirection;
+
+    /**
+     * 当前门状态(0 开门,1 关门)
+     */
+    private Integer doorStatus;
+
+    /**
+     * 人数
+     */
+    private Integer personInLift;
+
+    /**
+     * 温度
+     */
+    private Double temperature;
+
+
+}

+ 89 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/bean/DhRunInfo.java

@@ -0,0 +1,89 @@
+package com.inspur.elevator.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * 大华MQTT上报单次运行数据
+ * @author lihao16
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class DhRunInfo extends DhMsgBase{
+
+    /**
+     * 运行总里程 单位米
+     */
+    private Float runningMileageTotal;
+
+    /**
+     * 单次运行时长,单位秒
+     */
+    private Float runningTimeTotal;
+
+    /**
+     * 载人数量
+     */
+    private Integer mannedNum;
+
+    /**
+     * 进入人数量
+     */
+    private Integer inNum;
+
+    /**
+     * 离开人数量
+     */
+    private Integer outNum;
+
+    /**
+     * 电梯运行速度集合
+     */
+    private List<Float> runSpeed;
+
+    /**
+     * 抖动频率数组
+     */
+    private List<Float> jitterFrequency;
+
+    /**
+     * 抖动加速度数组
+     */
+    private List<Float> jitterAcceleration;
+
+    /**
+     * 加速度数组
+     */
+    private List<Float> acceleration;
+
+    /**
+     * 运行开始时间,时间戳
+     */
+    private Long runBeginTimeStamp;
+
+    /**
+     * 运行结束时间,时间戳
+     */
+    private Long runEndTimeStamp;
+
+    /**
+     * 轿厢内光线,变化幅度
+     * 强2;中:1;弱:0
+     */
+    private Integer lightVariationAmplitude;
+
+    /**
+     * 运行开始楼层
+     */
+    private Integer runBeginFloor;
+
+    /**
+     * 运行结束楼层
+     */
+    private Integer runEndFloor;
+
+}

+ 16 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/config/Constant.java

@@ -1,5 +1,6 @@
 package com.inspur.elevator.config;
 
+import com.inspur.elevator.bean.DhLiftFault;
 import com.inspur.elevator.bean.HnAlarmMsg;
 import org.springframework.stereotype.Component;
 
@@ -13,6 +14,10 @@ public class Constant {
      * 海纳云错误码,和电梯安全平台错误码的对应关系
      */
     public static final ConcurrentHashMap<String, String> FAULTCODE_MAP = new ConcurrentHashMap<>();
+    /**
+     * 大华错误码,和电梯安全平台错误码的对应关系
+     */
+    public static final ConcurrentHashMap<String, String> FAULTCODE_MAP_DAHUA = new ConcurrentHashMap<>();
 
     static {
         FAULTCODE_MAP.put(Integer.toString(HnAlarmMsg.FAULTCODE_TRAP), "48");
@@ -22,6 +27,17 @@ public class Constant {
         FAULTCODE_MAP.put(Integer.toString(HnAlarmMsg.FAULTCODE_BLOCK), "02");
         FAULTCODE_MAP.put(Integer.toString(HnAlarmMsg.FAULTCODE_REPEAT_OPENCLOSE), "28");
         FAULTCODE_MAP.put(Integer.toString(HnAlarmMsg.FAULTCODE_OVERSPEED), "33");
+
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_TRAP), "48");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_OPENDOOR_RUN), "35");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_NODOOR_STOP), "32");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_TOP), "36");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_BOTTOM), "37");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_OVERSPEED), "33");
+        // FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_TAKECARE), "33");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_POWER_OFF), "31");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_REPEAT_OPENCLOSE), "28");
+        FAULTCODE_MAP_DAHUA.put(Integer.toString(DhLiftFault.FAULTCODE_CLOSE), "02");
     }
 
     public static final String REDIS_KEY_FAULTCODE = "fault_code_";

+ 26 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/config/MqttPublishConfig.java

@@ -24,6 +24,12 @@ public class MqttPublishConfig {
     @Value("${mqtt.client-id:client-01}")
     private String clientId;
 
+    @Value("${mqtt.qos-dahua:1}")
+    private Integer qosDahua;
+
+    @Value("${mqtt.client-id-dahua:client-01}")
+    private String clientIdDahua;
+
     /**
      * 创建推送消息通道
      *
@@ -34,6 +40,11 @@ public class MqttPublishConfig {
         return new DirectChannel();
     }
 
+    @Bean(value = "publishChannelDahua")
+    public MessageChannel publishChannelDahua() {
+        return new DirectChannel();
+    }
+
     /**
      * mqtt推送配置
      * @param factory mqtt客户端
@@ -47,4 +58,19 @@ public class MqttPublishConfig {
         handler.setDefaultQos(qos);
         return handler;
     }
+
+    /**
+     * mqtt推送配置
+     * @param factory mqtt客户端
+     * @return
+     */
+    @Bean
+    @ServiceActivator(inputChannel = "publishChannelDahua")
+    public MessageHandler mqttOutbound_dahua(MqttPahoClientFactory factory) {
+        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientIdDahua + System.currentTimeMillis(), factory);
+        handler.setAsync(true);
+        handler.setDefaultQos(qosDahua);
+
+        return handler;
+    }
 }

+ 26 - 1
elevator-hn-adapter/src/main/java/com/inspur/elevator/config/MqttSubscribeConfig.java

@@ -24,12 +24,21 @@ public class MqttSubscribeConfig {
     @Value("${mqtt.receive-topic}")
     private String topic;
 
+    @Value("${mqtt.receive-topic-dahua}")
+    private String topicDahua;
+
     @Value("${mqtt.qos:0}")
     private Integer qos;
 
+    @Value("${mqtt.qos-dahua:1}")
+    private Integer qosDahua;
+
     @Value("${mqtt.client-id:client-01}")
     private String clientId;
 
+    @Value("${mqtt.client-id-dahua:client-01}")
+    private String clientIdDahua;
+
     private final MqttMsgSubscribe subscriber;
 
     public MqttSubscribeConfig(MqttMsgSubscribe mqttMsgSubscribe) {
@@ -47,7 +56,23 @@ public class MqttSubscribeConfig {
     }
 
     /**
-     * 使用客户端从订阅消息通道获取消息,配置qos等信息
+     * 大华 mqtt - heartbeat
+     * @param mqttSubscribeChannel 订阅消息通道
+     * @param factory mqtt客户端
+     * @return
+     */
+    @Bean
+    public MessageProducer channelInbound_dahua(MessageChannel mqttSubscribeChannel, MqttPahoClientFactory factory) {
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientIdDahua + System.currentTimeMillis(), factory, topicDahua);
+        adapter.setCompletionTimeout(5000);
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(qosDahua);
+        adapter.setOutputChannel(mqttSubscribeChannel);
+        return adapter;
+    }
+
+    /**
+     * 使用客户端从订阅消息通道获取消息,配置qos等信息 海纳云
      * @param mqttSubscribeChannel 订阅消息通道
      * @param factory mqtt客户端
      * @return

+ 24 - 0
elevator-hn-adapter/src/main/java/com/inspur/elevator/mq/MqttMsgPublisherDahua.java

@@ -0,0 +1,24 @@
+package com.inspur.elevator.mq;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * MQTT消息发布
+ * @author lihao16
+ */
+@Component
+@ConditionalOnProperty(value = "mqtt.enabled", havingValue = "true")
+@MessagingGateway(defaultRequestChannel = "publishChannelDahua")
+public interface MqttMsgPublisherDahua {
+
+    /**
+     * 推送消息
+     * @param topic 主题
+     * @param payload 消息内容
+     */
+    void publishMsg(@Header(MqttHeaders.TOPIC) String topic, String payload);
+}

+ 51 - 2
elevator-hn-adapter/src/main/java/com/inspur/elevator/mq/MqttMsgSubscribe.java

@@ -1,7 +1,7 @@
 package com.inspur.elevator.mq;
 
 import com.alibaba.fastjson2.JSON;
-import com.inspur.elevator.bean.HnMsg;
+import com.inspur.elevator.bean.*;
 import com.inspur.elevator.service.AsyncTaskService;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -23,6 +23,10 @@ public class MqttMsgSubscribe implements MessageHandler {
 
     private Logger logger = LoggerFactory.getLogger(MqttMsgSubscribe.class);
 
+    private static final String HN_CAMERA_SRV_TOPIC = "CAMERA_SRV_TOPIC/";
+
+    private static final String DH_CAMERA_SRV_TOPIC = "/API/V1/Up";
+
     @Autowired
     private AsyncTaskService asyncTaskService;
 
@@ -31,7 +35,19 @@ public class MqttMsgSubscribe implements MessageHandler {
         String id = String.valueOf(message.getHeaders().get(MqttHeaders.ID));
         String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
         String payload = String.valueOf(message.getPayload());
+        System.out.println("Received MQTT message: ID=" + id + ", Topic=" + topic + ", Payload=" + payload);
         logger.info("Mqtt服务器ID-->{},订阅主题-->{},收到消息-->{}", id, topic, payload);
+        if (topic.startsWith(HN_CAMERA_SRV_TOPIC)) {
+            // 海纳云处理逻辑
+            handleHnMessage(payload);
+        } else if (topic.startsWith(DH_CAMERA_SRV_TOPIC)) {
+            // 大华处理逻辑
+            handleDhMessage(payload, topic);
+        }
+    }
+
+    private void handleHnMessage(String payload) {
+        logger.info("海纳云消息处理开始:{}", payload);
         try {
             HnMsg hnMsg = JSON.parseObject(payload, HnMsg.class);
             if (StringUtils.isNotBlank(hnMsg.getCmd())) {
@@ -55,8 +71,41 @@ public class MqttMsgSubscribe implements MessageHandler {
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
-            logger.error("消息处理失败:{}", payload);
+            logger.error("海纳云消息处理失败:{}", payload);
         }
+    }
 
+    private void handleDhMessage(String payload, String topic) {
+        logger.info("大华消息处理开始:{}", payload);
+        try {
+            String msgType = topic.split("/")[5];
+            String deviceSn = topic.split("/")[4];
+            if (StringUtils.isNotBlank(msgType)) {
+                switch (msgType) {
+                    // 根据CMD类型,分别处理消息
+                    case DhMsgBase.CMD_HEARTBEAT:
+                        DhHeartbeat heartbeat = JSON.parseObject(payload, DhHeartbeat.class);
+                        asyncTaskService.handleDhHeartbeatMsg(heartbeat, deviceSn);
+                        break;
+                    case DhMsgBase.CMD_ALARM_REPORT:
+                        DhLiftFault faultMsg = JSON.parseObject(payload, DhLiftFault.class);
+                        asyncTaskService.handleDhAlarmMsg(faultMsg, deviceSn);
+                        break;
+                    case DhMsgBase.CMD_ELEINFO_REPORT:
+                        DhLiftState stateMsg = JSON.parseObject(payload, DhLiftState.class);
+                        asyncTaskService.handleDhEleInfoMsg(stateMsg, deviceSn);
+                        break;
+                    case DhMsgBase.CMD_STATISTICS_SINGLE_DATA_REPORT:
+                        DhRunInfo runInfoMsg = JSON.parseObject(payload, DhRunInfo.class);
+                        asyncTaskService.handleDhSingleDataMsg(runInfoMsg, deviceSn);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            logger.error("大华消息处理失败:{}", payload);
+        }
     }
 }

+ 162 - 10
elevator-hn-adapter/src/main/java/com/inspur/elevator/service/AsyncTaskService.java

@@ -1,18 +1,17 @@
 package com.inspur.elevator.service;
 
 import com.alibaba.fastjson2.JSON;
-import com.inspur.elevator.bean.HnAlarmMsg;
-import com.inspur.elevator.bean.HnEleInfoMsg;
-import com.inspur.elevator.bean.HnMsg;
-import com.inspur.elevator.bean.HnStatisticSingleMsg;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson2.util.DateUtils;
+import com.inspur.elevator.bean.*;
 import com.inspur.elevator.bean.elevator.DeviceAlarm;
 import com.inspur.elevator.bean.elevator.DeviceEvent;
 import com.inspur.elevator.bean.elevator.DeviceStatistics;
 import com.inspur.elevator.config.Constant;
 import com.inspur.elevator.mq.MqttMsgPublisher;
+import com.inspur.elevator.mq.MqttMsgPublisherDahua;
 import com.inspur.elevator.utils.RedisUtil;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.tomcat.util.bcel.Const;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -21,7 +20,6 @@ import org.springframework.stereotype.Service;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
-import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,12 +40,15 @@ public class AsyncTaskService {
 
     @Autowired
     private MqttMsgPublisher publisher;
+    @Autowired
+    private MqttMsgPublisherDahua publisherDahua;
 
     @Autowired
     private RedisUtil redisUtil;
 
     /**
      * 设备报警消息
+     *
      * @param hnMsg
      */
     @Async
@@ -69,12 +70,59 @@ public class AsyncTaskService {
             String topic = "ele/" + sn + "/event/device_alarm_info/post";
             String payload = JSON.toJSONString(deviceAlarm);
             this.publisher.publishMsg(topic, payload);
-            logger.info("推送主题-->{},推送消息-->{}", topic, payload);
+            logger.info("推送海纳云设备告警主题-->{},推送消息-->{}", topic, payload);
+        }
+    }
+
+    /**
+     * 电梯报警信息上报 - 大华
+     *
+     * @param faultMsg
+     * @param deviceSn
+     */
+    @Async
+    public void handleDhAlarmMsg(DhLiftFault faultMsg, String deviceSn) {
+        logger.info("handleDhAlarmMsg:{}", faultMsg);
+        // 获取报警信息
+        DhErrorBean errorBean = faultMsg.getErrorListBean().get(0);
+        Integer faultType = errorBean.getFaultType();
+        String faultBeginTime = errorBean.getFaultBeginTime();
+        String faultEndTime = errorBean.getFaultEndTime();
+        // 如果是进入检修的错误码,更新维保标识
+        if (DhLiftFault.FAULTCODE_TAKECARE.equals(faultType)) {
+            // 结束时间为空  说明未结束
+            if (StringUtils.isEmpty(faultEndTime)) {
+                redisUtil.set(Constant.REDIS_KEY_MT + deviceSn, "1");
+            } else {
+                redisUtil.set(Constant.REDIS_KEY_MT + deviceSn, "0");
+            }
+        } else {
+            // 添加故障码
+            if (Constant.FAULTCODE_MAP_DAHUA.containsKey(Integer.toString(faultType)) && StringUtils.isEmpty(faultEndTime)) {
+                // 对应的电梯安全平台的错误码
+                String faultCodeElevator = Constant.FAULTCODE_MAP_DAHUA.get(Integer.toString(faultType));
+                redisUtil.addSetData(Constant.REDIS_KEY_FAULTCODE + deviceSn, faultCodeElevator);
+            }
+            // 删除故障码
+            if (Constant.FAULTCODE_MAP_DAHUA.containsKey(Integer.toString(faultType)) && !StringUtils.isEmpty(faultEndTime)) {
+                // 对应的电梯安全平台的错误码
+                String faultCodeElevator = Constant.FAULTCODE_MAP_DAHUA.get(Integer.toString(faultType));
+                redisUtil.delSetData(Constant.REDIS_KEY_FAULTCODE + deviceSn, faultCodeElevator);
+            }
+            List<String> alarmCodeList = new ArrayList<>(getDeviceFaultCode(deviceSn));
+            // 构造电梯安全应急平台的消息报文,并发送
+            DeviceAlarm deviceAlarm = new DeviceAlarm(deviceSn, Long.toString(System.currentTimeMillis()),
+                    alarmCodeList.stream().reduce("", (a, b) -> a + (StringUtils.isNotEmpty(a) ? a + "," : "") + b));
+            String topic = "ele/" + deviceSn + "/event/device_alarm_info/post";
+            String payload = JSON.toJSONString(deviceAlarm);
+            this.publisher.publishMsg(topic, payload);
+            logger.info("推送大华设备告警主题-->{},推送消息-->{}", topic, payload);
         }
     }
 
     /**
      * 更新设备当前的错误码
+     *
      * @param hnAlarmMsg
      * @return
      */
@@ -132,7 +180,7 @@ public class AsyncTaskService {
         String topic = "ele/" + sn + "/event/property/post";
         String payload = JSON.toJSONString(deviceEvent);
         this.publisher.publishMsg(topic, payload);
-        logger.info("推送主题-->{},推送消息-->{}", topic, payload);
+        logger.info("推送海纳云实时数据主题-->{},推送消息-->{}", topic, payload);
     }
 
     /**
@@ -172,8 +220,43 @@ public class AsyncTaskService {
         return deviceEvent;
     }
 
+    private DeviceEvent syncDhDeviceEventData(String deviceSn, DhLiftState stateMsg) {
+        DeviceEvent deviceEvent = new DeviceEvent();
+        deviceEvent.setId(deviceSn);
+        //  时间 2025-06-25 14:30:28 转换成时间戳
+        Long ts = DateUtils.parseMillis(stateMsg.getFaultTime());
+        deviceEvent.setTs(String.valueOf(ts));
+        deviceEvent.setFl(stateMsg.getCurrentFloor());
+        // 两边的开关门状态定义是相反的
+        deviceEvent.setOd(stateMsg.getDoorStatus().equals(0) ? 1 : 0);
+        deviceEvent.setDi(stateMsg.getRunningDirection());
+        // 速度cm/s -> m/s
+        deviceEvent.setV(Double.valueOf(stateMsg.getCurrentSpeed()));
+        // 加速度单位 大华没有
+        deviceEvent.setAx(0.0);
+        deviceEvent.setAy(0.0);
+        deviceEvent.setAz(0.0);
+        // 偏移角度,大华没有
+        deviceEvent.setDfb(0.0);
+        deviceEvent.setDlr(0.0);
+        deviceEvent.setPc(stateMsg.getPersonInLift());
+        // 抖动频率
+        deviceEvent.setFj(stateMsg.getJitterFrequency());
+        // 抖动加速度
+        deviceEvent.setFa(stateMsg.getJitterAcceleration());
+        deviceEvent.setTe(stateMsg.getTemperature());
+        // 湿度,大华没有
+        deviceEvent.setHu(0.0);
+        // 亮度
+        deviceEvent.setBr(stateMsg.getLightVariationAmplitude());
+        deviceEvent.setMt(redisUtil.hasKey(Constant.REDIS_KEY_MT + deviceSn) ? Integer.parseInt(redisUtil.getString(Constant.REDIS_KEY_MT + deviceSn)) : 0);
+        deviceEvent.setBpv(0.0);
+        return deviceEvent;
+    }
+
     /**
      * 加速度单位转换,m/s2 -> mg
+     *
      * @param ms2
      * @return
      */
@@ -223,7 +306,76 @@ public class AsyncTaskService {
             String topic = "ele/" + sn + "/event/device_statistics_info/post";
             String payload = JSON.toJSONString(deviceStatistics);
             this.publisher.publishMsg(topic, payload);
-            logger.info("推送主题-->{},推送消息-->{}", topic, payload);
+            logger.info("推送海纳云单次运行主题-->{},推送消息-->{}", topic, payload);
         }
     }
-}
+
+
+    /**
+     * 电梯实时运行状态上报 - 大华
+     *
+     * @param stateMsg
+     * @param deviceSn
+     */
+    @Async
+    public void handleDhEleInfoMsg(DhLiftState stateMsg, String deviceSn) {
+        logger.info("handleDhEleInfoMsg:{}", stateMsg);
+        // 构造电梯安全应急平台的消息报文,并发送
+        DeviceEvent deviceEvent = syncDhDeviceEventData(deviceSn, stateMsg);
+        String topic = "ele/" + deviceSn + "/event/property/post";
+        String payload = JSON.toJSONString(deviceEvent);
+        this.publisher.publishMsg(topic, payload);
+        logger.info("推送大华实时数据主题-->{},推送消息-->{}", topic, payload);
+    }
+
+    /**
+     * 电梯单次运行状态上报 - 大华
+     *
+     * @param runInfoMsg
+     * @param deviceSn
+     */
+    @Async
+    public void handleDhSingleDataMsg(DhRunInfo runInfoMsg, String deviceSn) {
+        logger.info("handleDhSingleDataMsg:{}", runInfoMsg);
+        if (runInfoMsg != null) {
+            // 构造电梯安全应急平台的消息报文,并发送
+            DeviceStatistics deviceStatistics = new DeviceStatistics();
+            deviceStatistics.setId(deviceSn);
+            deviceStatistics.setRc(1);
+            deviceStatistics.setFrom(runInfoMsg.getRunBeginFloor());
+            deviceStatistics.setTo(runInfoMsg.getRunEndFloor());
+            // 运行距离
+            deviceStatistics.setRd(Double.valueOf(runInfoMsg.getRunningMileageTotal()));
+            deviceStatistics.setRt(Double.valueOf(runInfoMsg.getRunningTimeTotal()));
+            deviceStatistics.setEbc(0);
+            deviceStatistics.setPc(runInfoMsg.getMannedNum());
+            deviceStatistics.setFt(String.valueOf(runInfoMsg.getRunBeginTimeStamp()));
+            deviceStatistics.setLt(String.valueOf(runInfoMsg.getRunEndTimeStamp()));
+            deviceStatistics.setTe(runInfoMsg.getTemperature());
+            deviceStatistics.setHu(0);
+            deviceStatistics.setBr(runInfoMsg.getLightVariationAmplitude());
+            String topic = "ele/" + deviceSn + "/event/device_statistics_info/post";
+            String payload = JSON.toJSONString(deviceStatistics);
+            this.publisher.publishMsg(topic, payload);
+            logger.info("推送大华单次运行主题-->{},推送消息-->{}", topic, payload);
+        }
+    }
+
+    /**
+     * 设备心跳信息 - 大华
+     *
+     * @param heartbeat
+     * @param deviceSn
+     */
+    @Async
+    public void handleDhHeartbeatMsg(DhHeartbeat heartbeat, String deviceSn) {
+        // 收到心跳消息后 下发上报实时运行状态指令
+        logger.info("收到大华设备{}心跳消息,发送实时上状态指令", deviceSn);
+        JSONObject command = new JSONObject();
+        command.put("type", "Sendstate");
+        String topic = "/API/V1/Down/" + deviceSn + "/Command";
+        String payload = JSON.toJSONString(command);
+        this.publisherDahua.publishMsg(topic, payload);
+        logger.info("MQTT下发指令消息已成功发布至主题 {}", topic);
+    }
+}