import { ThirdPartyDevice } from '@/constant' import { WillTopic, createClient, decodePayload } from '@/utils/mqtt' import { transformPercentVolumeToLevelVolume } from '@/utils/control/volume' import { getStatusReport } from '@/api/device' import { getThirdPartyDevicesByThirdPartyDevice } from '@/api/mesh' import { Status, Power } from './constant' import { GET_POWER_STATUS, GET_RECEIVER_INFO, parseCachePower, getPowerStatusByMessage, getReceivingCardStatusByMessage } from './nova' const TIMEOUT_MILLISECOND = 120000 const EXPIRED_MILLISECOND = 300000 const deviceCache = new Map() const cbMap = new Map() const injectMap = new Map() let client = null export function startMonitor () { if (client) { return } client = createClient({ will: { topic: WillTopic, payload: 'Thirdparty device monitor has died.', qos: 0, retain: false } }) client.on('error', err => { console.log('Monitor connection error: ', err) }) client.on('connect', () => { console.log('Monitor connected') }) client.on('disconnect', () => { console.log('Monitor disconnect') }) client.on('reconnect', () => { console.log('Monitor reconnecting...') }) client.on('offline', () => { console.log('Monitor offline') }) client.on('close', () => { console.log('Monitor closed') }) client.on('message', (topic, payload) => { console.log('Monitor topic', topic) const result = /^(\d+)\/(\d+)\/(screen|multifunctionCard\/invoke\/reply)$/.exec(topic) if (!result) { return } const id = result[2] console.log('monitor cache', id) const message = JSON.parse(decodePayload(topic, payload)) const timestamp = Number(message.timestamp) || Date.now() if (result[3] === 'screen') { const { versionName, versionCode, externalUsage, ramUsage, volume } = message emit(id, 'screen', { timestamp, versionName, versionCode, externalUsage, ramUsage, volume: transformPercentVolumeToLevelVolume(volume) }) return } if (message.function === GET_POWER_STATUS) { const data = getPowerStatusByMessage(message) if (data.success) { emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { timestamp, ...data.data }) } } else if (message.function === GET_RECEIVER_INFO) { const data = getReceivingCardStatusByMessage(message) if (data.success) { emit(id, ThirdPartyDevice.RECEIVING_CARD, { timestamp, ...data.data }) } } else { injectMap.get(id)?.forEach(cb => { cb(message) }) } }) } function emit (id, key, value) { const cache = getCacheById(id) cache[key] = value cbMap.get(id)?.forEach(cb => { cb(cache, key) }) } function getCacheById (id) { let cache = deviceCache.get(id) if (!cache) { deviceCache.set(id, cache = { [ThirdPartyDevice.MULTI_FUNCTION_CARD]: { status: Status.LOADING, switchStatus: Power.LOADING, powers: [] }, [ThirdPartyDevice.RECEIVING_CARD]: { status: Status.LOADING, receivers: [] }, screen: null }) } return cache } function getFreshCacheById (id) { const cache = deviceCache.get(id) if (cache) { let update = false if (cache[ThirdPartyDevice.MULTI_FUNCTION_CARD].status > Status.LOADING && (Date.now() - cache[ThirdPartyDevice.MULTI_FUNCTION_CARD].timestamp > EXPIRED_MILLISECOND)) { cache[ThirdPartyDevice.MULTI_FUNCTION_CARD] = { status: Status.LOADING, switchStatus: Power.LOADING, powers: [] } update = true } if (cache.screen && (Date.now() - cache.screen.timestamp > EXPIRED_MILLISECOND)) { cache.screen = null update = true } if (update) { cbMap.get(id)?.forEach(cb => { cb(cache) }) } return cache } return getCacheById(id) } export function resetCacheById (id) { console.log('monitor', id, '->', 'reset cache') const cache = deviceCache.get(id) if (cache) { deviceCache.delete(id) } } function checkMultiCard (id, multiCard) { switch (multiCard.status) { case Status.LOADING: console.log('monitor', id, '->', 'add task') addTask(id) break case Status.OK: if (Date.now() - multiCard.timestamp > TIMEOUT_MILLISECOND) { multiCard.status = Status.WARNING } break default: break } } const subscribePromise = Promise.resolve() let subscribeIds = new Set() let subscribeWaiting = false let unsubscribeIds = new Set() let unsubscribeWaiting = false function doSubscribe () { if (!subscribeWaiting) { subscribeWaiting = true subscribePromise.then(() => { if (subscribeIds.size) { const topics = [] subscribeIds.forEach(id => { topics.push( `+/${id}/screen`, `+/${id}/multifunctionCard/invoke/reply` ) }) client.subscribe(topics) subscribeIds = new Set() console.log('monitor subscribe', topics) } subscribeWaiting = false }) } } export function addListener (id, cb) { let cbSet = cbMap.get(id) if (cbSet) { if (unsubscribeIds.has(id)) { unsubscribeIds.delete(id) } } else { cbMap.set(id, cbSet = new Set()) subscribeIds.add(id) doSubscribe() } if (!cb) { return } if (cbSet.has(cb)) { console.log('monitor', id, '->', 'cb exsit') return } console.log('monitor add', id, '->', 'success') const cache = getFreshCacheById(id) cbSet.add(cb) checkMultiCard(id, cache[ThirdPartyDevice.MULTI_FUNCTION_CARD]) cb(cache) } function doUnsubscribe () { if (!unsubscribeWaiting) { unsubscribeWaiting = true subscribePromise.then(() => { if (unsubscribeIds.size) { const topics = [] unsubscribeIds.forEach(id => { topics.push( `+/${id}/screen`, `+/${id}/multifunctionCard/invoke/reply` ) cbMap.delete(id) injectMap.delete(id) }) client.unsubscribe(topics) unsubscribeIds = new Set() console.log('monitor unsubscribe', topics) } unsubscribeWaiting = false }) } } function checkCb (id) { const cbSet = cbMap.get(id) const injectSet = injectMap.get(id) if ((!cbSet || !cbSet.size) && (!injectSet || !injectSet.size)) { unsubscribeIds.add(id) doUnsubscribe() } } export function removeListener (id, cb) { const cbSet = cbMap.get(id) if (!cbSet || !cbSet.has(cb)) { console.log('monitor', id, '->', 'not exsit') return } if (subscribeIds.has(id)) { subscribeIds.delete(id) } console.log('monitor remove', id, '->', 'success') cbSet.delete(cb) checkCb(id) } export function addInjectListener (id, cb) { let cbSet = injectMap.get(id) if (!cbSet) { injectMap.set(id, cbSet = new Set()) addListener(id) } if (cbSet.has(cb)) { console.log('inject', id, '->', 'exsit') return } console.log('inject add', id, '->', 'success') cbSet.add(cb) } export function removeInjectListener (id, cb) { const cbSet = injectMap.get(id) if (!cbSet || !cbSet.has(cb)) { console.log('inject', id, '->', 'not exsit') return } console.log('inject remove', id, '->', 'success') cbSet.delete(cb) checkCb(id) } let waiting = new Set() let queue = new Set() let fetching = false let started = false function addTask (id) { if (fetching) { if (queue.has(id)) { console.log('task exsit', id) } else { waiting.add(id) } } else { queue.add(id) flush() } } function flush () { if (!started && (queue.size || waiting.size)) { queue = new Set([...queue, ...waiting]) waiting = new Set() started = true Promise.resolve().then(startTask) } } function startTask () { fetching = true console.log('monitor task', queue.size) const ids = [...queue] getStatusReport(ids, { background: true, custom: true }).then( async ({ data }) => { const map = {} data.forEach(item => { map[item.deviceId] = item }) const now = Date.now() const errorSet = new Set() for (let i = 0; i < ids.length; i++) { const id = ids[i] if (getCacheById(id)[ThirdPartyDevice.MULTI_FUNCTION_CARD].status !== Status.LOADING) { continue } const multiCard = map[id] // 未上报过数据需判断是否绑定了多功能卡 if (!multiCard) { try { const { data } = await getThirdPartyDevicesByThirdPartyDevice(id, [ ThirdPartyDevice.MULTI_FUNCTION_CARD ], { background: true, custom: true }) if (data?.[0]?.instance) { emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.WARNING, timestamp: Date.now(), switchStatus: Power.LOADING, powers: [] }) } else { emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.NONE }) } } catch (e) { errorSet.add(id) } continue } const timestamp = Number(multiCard.reportTimestamp) const powerInfo = parseCachePower(multiCard) // 超时未上报的需判断是否解绑了多功能卡 if (now - timestamp > TIMEOUT_MILLISECOND) { try { const { data } = await getThirdPartyDevicesByThirdPartyDevice(id, [ ThirdPartyDevice.MULTI_FUNCTION_CARD ], { background: true, custom: true }) if (data?.[0]?.instance) { emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.WARNING, timestamp, ...powerInfo }) } else { emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.NONE }) } } catch (e) { errorSet.add(id) } continue } if (multiCard.switchStatus === Power.LOADING) { emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.WARNING, timestamp, ...powerInfo }) continue } emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.OK, timestamp, ...powerInfo }) } queue = errorSet } ).then(() => { fetching = false started = false setTimeout(flush, 500) }) }