import { getSensorRecords } from '@/api/external'
import { publish, subscribe, unsubscribe } from '@/utils/mqtt'
import { parseTime, parseByte } from '@/utils'

// Identifiers of the device currently being watched; both are null while stopped.
let productId = null
let deviceId = null

/**
 * Kinds of channel ("type instance") managed by this module.
 * - LOAD:  gets a `loading` flag that `send` raises and the next update clears.
 * - LOOP:  gets a `running` flag that controls a polling loop.
 * - CACHE: only stores the latest value.
 * All three kinds store their latest value and replay it to new listeners.
 * A channel created without a type stores nothing and replays nothing.
 */
export const Type = {
  LOAD: 1,
  LOOP: 2,
  CACHE: 3
}

// Sentinel passed to parsers in place of a real payload to request a reset.
const RESET_MESSAGE = '__reset__'

// Delay (ms) before the next sensor poll after a successful / failed request.
const SENSOR_POLL_DELAY = 5000
const SENSOR_RETRY_DELAY = 2000

// Channel name -> channel instance ({ cbs, type, parser, value, ... }).
const types = new Map()

/**
 * Build the list of MQTT topics for the current device.
 * Shared by `start` and `stop` so both always use the same list.
 *
 * @returns {string[]} topics of the form `<productId>/<deviceId>/<suffix>`
 */
function getTopics () {
  return [
    `${productId}/${deviceId}/online`,
    `${productId}/${deviceId}/offline`,
    `${productId}/${deviceId}/status/reply`,
    `${productId}/${deviceId}/resource/progress`,
    `${productId}/${deviceId}/screen`
  ]
}

/**
 * Start watching a device: subscribe to its topics and create the built-in
 * channels. Any device that is already being watched is stopped first.
 *
 * @param {{ productId: *, id: * }} device - device to watch
 */
export function start (device) {
  if (productId) {
    stop()
  }
  productId = device.productId
  deviceId = device.id
  subscribe(getTopics(), onMessage)
  createType('online', { parser: onlineParser })
  createType('status', { type: Type.LOAD, parser: statusParser, value: '未知', reset: true })
  createType('download', { type: Type.CACHE, parser: downloadParser, value: [], reset: true, mark: {} })
  createType('topology', { type: Type.CACHE, parser: topologyParser, reset: true })
}

/**
 * Stop watching the current device: unsubscribe from its topics, halt every
 * LOOP channel and drop all channels (and therefore all listeners).
 * Does nothing when no device is being watched.
 */
export function stop () {
  if (!productId) {
    return
  }
  // The topic list depends on the ids, so unsubscribe before clearing them.
  unsubscribe(getTopics(), onMessage)
  productId = null
  deviceId = null
  types.forEach(inst => {
    if (inst.type === Type.LOOP) {
      inst.running = false
    }
  })
  types.clear()
}

/**
 * Register a callback on a channel. If the channel does not exist and
 * `options` is given, the channel is created from those options first.
 * A newly added callback is invoked at once with the stored value when the
 * channel has one of the `Type` kinds.
 *
 * @param {string} type - channel name
 * @param {Function} cb - called as `cb(value, inst)`
 * @param {Object} [options] - options used to create a missing channel
 */
export function addListener (type, cb, options) {
  let inst = types.get(type)
  if (!inst && options) {
    inst = createType(type, options)
  }
  if (inst && !inst.cbs.has(cb)) {
    inst.cbs.add(cb)
    invoke(inst, cb)
  }
}

/**
 * Remove a callback previously registered with `addListener`.
 *
 * @param {string} type - channel name
 * @param {Function} cb - callback to remove
 */
export function removeListener (type, cb) {
  const inst = types.get(type)
  if (inst) {
    inst.cbs.delete(cb)
  }
}

/**
 * Map an outgoing topic suffix to the channel that will receive its reply.
 *
 * @param {string} topic - topic suffix passed to `send`
 * @returns {?string} channel name, or null when there is none
 */
function getTypeBySend (topic) {
  switch (topic) {
    case '/status/ask':
      return 'status'
    default:
      return null
  }
}

/**
 * Publish a message to `<productId>/<deviceId><topic>`. When the topic maps
 * to a LOAD channel, the channel is marked as loading and its listeners are
 * notified once the publish has succeeded.
 *
 * @param {string} topic - topic suffix, including the leading slash
 * @param {Object} [message] - payload; defaults to `{ timestamp: Date.now() }`
 * @returns {Promise} promise chained on the one returned by `publish`
 */
export function send (topic, message) {
  const payload = JSON.stringify(message || { timestamp: Date.now() })
  return publish(`${productId}/${deviceId}${topic}`, payload).then(() => {
    const inst = types.get(getTypeBySend(topic))
    if (inst && inst.type === Type.LOAD) {
      inst.loading = true
      inst.cbs.forEach(cb => {
        cb(inst.value, inst)
      })
    }
  })
}

/**
 * Reset a channel by feeding the reset sentinel through its parser.
 * Only channels created with `reset: true` are affected.
 *
 * @param {string} type - channel name
 */
export function reset (type) {
  const inst = types.get(type)
  if (inst?.reset) {
    onUpdate(inst, RESET_MESSAGE)
  }
}

/**
 * Map an incoming topic suffix to a channel name.
 *
 * @param {string} topic - topic with the `<productId>/<deviceId>/` prefix removed
 * @returns {?string} channel name, or null for unknown topics
 */
function getType (topic) {
  switch (topic) {
    case 'online':
    case 'offline':
      return 'online'
    case 'status/reply':
      return 'status'
    case 'resource/progress':
      return 'download'
    case 'screen':
      return 'topology'
    default:
      return null
  }
}

/**
 * MQTT message handler: dispatches messages addressed to the current device
 * to the matching channel.
 *
 * @param {string} topic - full topic, `<productId>/<deviceId>/<suffix>`
 * @param {*} message - raw payload, handed to the channel parser
 */
function onMessage (topic, message) {
  const result = topic.match(/^(\d+)\/(\d+)\/(.+)$/)
  if (!result) {
    return
  }
  const [, topicProductId, topicDeviceId, suffix] = result
  // Regex captures are always strings while the stored ids keep whatever type
  // the device object had, so compare string forms to avoid dropping every
  // message when the ids are numbers.
  if (String(productId) === topicProductId && String(deviceId) === topicDeviceId) {
    onUpdate(types.get(getType(suffix)), message, suffix)
  }
}

/**
 * Parse a message for a channel, store the result and notify listeners.
 * Errors thrown by the parser or by a listener are logged, not rethrown.
 *
 * @param {Object} [inst] - channel instance; nothing happens when missing
 * @param {*} message - raw message (or the reset sentinel)
 * @param {string} [topic] - topic suffix, forwarded to the parser
 */
function onUpdate (inst, message, topic) {
  if (!inst) {
    return
  }
  try {
    const value = inst.parser ? inst.parser(inst, message, topic) : message
    updateType(inst, value)
    inst.cbs.forEach(cb => {
      cb(value, inst)
    })
  } catch (e) {
    console.warn(e)
  }
}

/**
 * Store a freshly parsed value on a channel. LOAD channels also leave the
 * loading state. Channels without a known type do not store the value.
 *
 * @param {Object} inst - channel instance
 * @param {*} value - parsed value
 */
function updateType (inst, value) {
  switch (inst.type) {
    case Type.LOAD:
      inst.loading = false
      break
    case Type.LOOP:
    case Type.CACHE:
      break
    default:
      return
  }
  inst.value = value
}

/**
 * Call a listener with the value currently stored on a channel. Channels
 * without a known type store nothing, so the listener is not called.
 *
 * @param {Object} inst - channel instance
 * @param {Function} cb - listener to call as `cb(value, inst)`
 */
function invoke (inst, cb) {
  switch (inst.type) {
    case Type.LOAD:
    case Type.LOOP:
    case Type.CACHE:
      cb(inst.value, inst)
      break
    default:
      break
  }
}

/**
 * Create a channel and register it under `unique`, replacing any channel
 * already registered under that name.
 *
 * @param {string} unique - channel name
 * @param {Object} [options] - channel options (type, parser, value, reset, ...)
 * @returns {Object} the new channel instance
 */
function createType (unique, options = {}) {
  let defaultOptions = null
  switch (options.type) {
    case Type.LOAD:
      defaultOptions = getLoadTypeOptions()
      break
    case Type.LOOP:
      defaultOptions = getLoopTypeOptions()
      break
    default:
      break
  }
  // Spreading null adds nothing, so untyped and CACHE channels get no defaults.
  const inst = {
    cbs: new Set(),
    ...defaultOptions,
    ...options
  }
  types.set(unique, inst)
  return inst
}

/**
 * @returns {{ loading: boolean }} default fields of a LOAD channel
 */
function getLoadTypeOptions () {
  return { loading: false }
}

/**
 * @returns {{ running: boolean }} default fields of a LOOP channel
 */
function getLoopTypeOptions () {
  return { running: true }
}

/**
 * Parser of the `online` channel. Any topic other than `online` is treated
 * as the device going offline, which also resets the `status` and `download`
 * channels.
 *
 * @param {Object} inst - channel instance (unused)
 * @param {*} message - raw payload (unused)
 * @param {string} topic - topic suffix
 * @returns {boolean} true when the topic is `online`, false otherwise
 */
function onlineParser (inst, message, topic) {
  if (topic === 'online') {
    return true
  }
  reset('status')
  reset('download')
  return false
}

/**
 * Parser of the `status` channel.
 *
 * @param {Object} inst - channel instance (unused)
 * @param {string} message - JSON payload or the reset sentinel
 * @returns {*} the payload's `status` field, undefined on reset, or -1 when
 *   the payload cannot be parsed
 */
function statusParser (inst, message) {
  if (message === RESET_MESSAGE) {
    return undefined
  }
  try {
    return JSON.parse(message).status
  } catch (e) {
    console.warn(e)
    return -1
  }
}

/**
 * Parser of the `download` channel. Keeps one entry per `ossUrl` in
 * `inst.value`, updating the entry in place on later progress messages.
 *
 * @param {Object} inst - channel instance; `inst.mark` and `inst.value` are mutated
 * @param {string} message - JSON payload or the reset sentinel
 * @returns {Object[]} the channel's file list (`inst.value`)
 */
function downloadParser (inst, message) {
  if (message === RESET_MESSAGE) {
    inst.mark = {}
    inst.value = []
    return inst.value
  }
  const { ossUrl, complete, errorReason, progress, rate, success } = JSON.parse(message)
  let status = 'is-processing'
  let tip = `${parseByte(rate)}/s`
  if (complete) {
    status = success ? 'is-success' : 'is-exception'
    tip = success ? '下载成功' : errorReason || '下载失败'
  }
  const file = {
    id: ossUrl,
    name: ossUrl,
    complete,
    success,
    status,
    tip,
    percentage: complete ? 100 : progress
  }
  // `mark` holds the 1-based position of each file in `value`, so that a
  // missing entry and position 0 cannot be confused.
  const position = inst.mark[file.id]
  if (position) {
    Object.assign(inst.value[position - 1], file)
  } else {
    inst.value.push(file)
    inst.mark[file.id] = inst.value.length
  }
  return inst.value
}

/**
 * Format a sensor reading for display.
 *
 * @param {string} type - sensor type
 * @param {*} value - raw reading
 * @param {string} unit - unit appended for types without special formatting
 * @returns {string} formatted reading
 */
function transformValue (type, value, unit) {
  switch (type) {
    case 'temperature':
      // Truncate (not round) to one decimal place.
      return `${((value * 10) | 0) / 10}℃`
    case 'flooding':
      return value ? '是' : '否'
    default:
      return `${value}${unit}`
  }
}

/**
 * Convert raw sensor records of one type into display rows.
 *
 * @param {string} type - sensor type
 * @param {Object[]} arr - records with sensorAddr, sensorValue, sensorValueUnit, logDate
 * @returns {Object[]} rows with key, name, sensorValue, value and time
 */
function transform (type, arr) {
  return arr.map(({ sensorAddr, sensorValue, sensorValueUnit, logDate }) => ({
    key: sensorAddr,
    name: `传感器${sensorAddr}`,
    sensorValue,
    value: transformValue(type, sensorValue, sensorValueUnit),
    // logDate is multiplied by 1000 before formatting (presumably seconds -> ms).
    time: parseTime(logDate * 1000, '{y}.{m}.{d} {h}:{i}:{s}')
  }))
}

/**
 * Start polling sensor records. Does nothing when polling is already running.
 */
export function startSensor () {
  let sensor = types.get('sensor')
  if (sensor) {
    if (sensor.running) {
      return
    }
    sensor.running = true
  } else {
    sensor = createType('sensor', { type: Type.LOOP, parser: sensorParser })
  }
  // Each start gets a fresh loop id. A request or timer left over from an
  // earlier start/stop cycle sees a stale id and ends, instead of running as
  // a second polling loop alongside the new one.
  sensor.loopId = (sensor.loopId || 0) + 1
  updateSensors(sensor, sensor.loopId)
}

/**
 * Stop polling sensor records. Listeners stay registered.
 */
export function stopSensor () {
  const sensor = types.get('sensor')
  if (sensor) {
    sensor.running = false
  }
}

/**
 * One iteration of the sensor polling loop: fetch the records, publish them
 * through the channel and schedule the next iteration.
 *
 * @param {Object} inst - the `sensor` channel instance
 * @param {number} loopId - id of the loop this iteration belongs to
 */
function updateSensors (inst, loopId) {
  const isCurrent = () => inst.running && inst.loopId === loopId
  if (!isCurrent()) {
    return
  }
  getSensorRecords({ deviceId }, { custom: true })
    .then(({ data }) => {
      if (isCurrent()) {
        onUpdate(inst, data)
      }
      return SENSOR_POLL_DELAY
    }, () => SENSOR_RETRY_DELAY)
    .then(delay => {
      if (isCurrent()) {
        setTimeout(updateSensors, delay, inst, loopId)
      }
    })
}

/**
 * Parser of the `sensor` channel. Fans the response out into one CACHE
 * channel per sensor type, creating missing channels on the fly.
 *
 * @param {Object} inst - channel instance (unused)
 * @param {Object} data - records keyed by sensor type
 * @returns {undefined} the `sensor` channel itself carries no value
 */
function sensorParser (inst, data) {
  Object.keys(data).forEach(type => {
    const target = types.get(type) || createType(type, { type: Type.CACHE, value: [] })
    onUpdate(target, transform(type, data[type]))
  })
}

/**
 * Parser of the `topology` channel.
 *
 * @param {Object} inst - channel instance (unused)
 * @param {string} message - JSON payload or the reset sentinel
 * @returns {*} the parsed payload, or null on reset
 */
function topologyParser (inst, message) {
  if (message === RESET_MESSAGE) {
    return null
  }
  return JSON.parse(message)
}