monitor.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. import { ThirdPartyDevice } from '@/constant'
  2. import {
  3. WillTopic,
  4. createClient,
  5. decodePayload
  6. } from '@/utils/mqtt'
  7. import { transformPercentVolumeToLevelVolume } from '@/utils/control/volume'
  8. import { getStatusReport } from '@/api/device'
  9. import { getThirdPartyDevicesByThirdPartyDevice } from '@/api/mesh'
  10. import {
  11. Status,
  12. Power
  13. } from './constant'
  14. import {
  15. GET_POWER_STATUS,
  16. GET_RECEIVER_INFO,
  17. parseCachePower,
  18. getPowerStatusByMessage,
  19. getReceivingCardStatusByMessage
  20. } from './nova'
  21. const TIMEOUT_MILLISECOND = 120000
  22. const EXPIRED_MILLISECOND = 300000
  23. const deviceCache = new Map()
  24. const cbMap = new Map()
  25. const injectMap = new Map()
  26. let client = null
  27. export function startMonitor () {
  28. if (client) {
  29. return
  30. }
  31. client = createClient({
  32. will: {
  33. topic: WillTopic,
  34. payload: 'Thirdparty device monitor has died.',
  35. qos: 0,
  36. retain: false
  37. }
  38. })
  39. client.on('error', err => {
  40. console.log('Monitor connection error: ', err)
  41. })
  42. client.on('connect', () => {
  43. console.log('Monitor connected')
  44. })
  45. client.on('disconnect', () => {
  46. console.log('Monitor disconnect')
  47. })
  48. client.on('reconnect', () => {
  49. console.log('Monitor reconnecting...')
  50. })
  51. client.on('offline', () => {
  52. console.log('Monitor offline')
  53. })
  54. client.on('close', () => {
  55. console.log('Monitor closed')
  56. })
  57. client.on('message', (topic, payload) => {
  58. console.log('Monitor topic', topic)
  59. const result = /^(\d+)\/(\d+)\/(screen|multifunctionCard\/invoke\/reply)$/.exec(topic)
  60. if (!result) {
  61. return
  62. }
  63. const id = result[2]
  64. console.log('monitor cache', id)
  65. const message = JSON.parse(decodePayload(topic, payload))
  66. const timestamp = Number(message.timestamp) || Date.now()
  67. if (result[3] === 'screen') {
  68. const { versionName, versionCode, externalUsage, ramUsage, volume } = message
  69. emit(id, 'screen', {
  70. timestamp,
  71. versionName,
  72. versionCode,
  73. externalUsage,
  74. ramUsage,
  75. volume: transformPercentVolumeToLevelVolume(volume)
  76. })
  77. return
  78. }
  79. if (message.function === GET_POWER_STATUS) {
  80. const data = getPowerStatusByMessage(message)
  81. if (data.success) {
  82. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, {
  83. timestamp,
  84. ...data.data
  85. })
  86. }
  87. } else if (message.function === GET_RECEIVER_INFO) {
  88. const data = getReceivingCardStatusByMessage(message)
  89. if (data.success) {
  90. emit(id, ThirdPartyDevice.RECEIVING_CARD, {
  91. timestamp,
  92. ...data.data
  93. })
  94. }
  95. } else {
  96. injectMap.get(id)?.forEach(cb => {
  97. cb(message)
  98. })
  99. }
  100. })
  101. }
  102. function emit (id, key, value) {
  103. const cache = getCacheById(id)
  104. cache[key] = value
  105. cbMap.get(id)?.forEach(cb => {
  106. cb(cache, key)
  107. })
  108. }
  109. function getCacheById (id) {
  110. let cache = deviceCache.get(id)
  111. if (!cache) {
  112. deviceCache.set(id, cache = {
  113. [ThirdPartyDevice.MULTI_FUNCTION_CARD]: {
  114. status: Status.LOADING,
  115. switchStatus: Power.LOADING,
  116. powers: []
  117. },
  118. [ThirdPartyDevice.RECEIVING_CARD]: {
  119. status: Status.LOADING,
  120. receivers: []
  121. },
  122. screen: null
  123. })
  124. }
  125. return cache
  126. }
  127. function getFreshCacheById (id) {
  128. const cache = deviceCache.get(id)
  129. if (cache) {
  130. let update = false
  131. if (cache[ThirdPartyDevice.MULTI_FUNCTION_CARD].status > Status.LOADING && (Date.now() - cache[ThirdPartyDevice.MULTI_FUNCTION_CARD].timestamp > EXPIRED_MILLISECOND)) {
  132. cache[ThirdPartyDevice.MULTI_FUNCTION_CARD] = {
  133. status: Status.LOADING,
  134. switchStatus: Power.LOADING,
  135. powers: []
  136. }
  137. update = true
  138. }
  139. if (cache.screen && (Date.now() - cache.screen.timestamp > EXPIRED_MILLISECOND)) {
  140. cache.screen = null
  141. update = true
  142. }
  143. if (update) {
  144. cbMap.get(id)?.forEach(cb => {
  145. cb(cache)
  146. })
  147. }
  148. return cache
  149. }
  150. return getCacheById(id)
  151. }
  152. export function resetCacheById (id) {
  153. console.log('monitor', id, '->', 'reset cache')
  154. const cache = deviceCache.get(id)
  155. if (cache) {
  156. deviceCache.delete(id)
  157. }
  158. }
  159. function checkMultiCard (id, multiCard) {
  160. switch (multiCard.status) {
  161. case Status.LOADING:
  162. console.log('monitor', id, '->', 'add task')
  163. addTask(id)
  164. break
  165. case Status.OK:
  166. if (Date.now() - multiCard.timestamp > TIMEOUT_MILLISECOND) {
  167. multiCard.status = Status.WARNING
  168. }
  169. break
  170. default:
  171. break
  172. }
  173. }
  174. const subscribePromise = Promise.resolve()
  175. let subscribeIds = new Set()
  176. let subscribeWaiting = false
  177. let unsubscribeIds = new Set()
  178. let unsubscribeWaiting = false
  179. function doSubscribe () {
  180. if (!subscribeWaiting) {
  181. subscribeWaiting = true
  182. subscribePromise.then(() => {
  183. if (subscribeIds.size) {
  184. const topics = []
  185. subscribeIds.forEach(id => {
  186. topics.push(
  187. `+/${id}/screen`,
  188. `+/${id}/multifunctionCard/invoke/reply`
  189. )
  190. })
  191. client.subscribe(topics)
  192. subscribeIds = new Set()
  193. console.log('monitor subscribe', topics)
  194. }
  195. subscribeWaiting = false
  196. })
  197. }
  198. }
  199. export function addListener (id, cb) {
  200. let cbSet = cbMap.get(id)
  201. if (cbSet) {
  202. if (unsubscribeIds.has(id)) {
  203. unsubscribeIds.delete(id)
  204. }
  205. } else {
  206. cbMap.set(id, cbSet = new Set())
  207. subscribeIds.add(id)
  208. doSubscribe()
  209. }
  210. if (!cb) {
  211. return
  212. }
  213. if (cbSet.has(cb)) {
  214. console.log('monitor', id, '->', 'cb exsit')
  215. return
  216. }
  217. console.log('monitor add', id, '->', 'success')
  218. const cache = getFreshCacheById(id)
  219. cbSet.add(cb)
  220. checkMultiCard(id, cache[ThirdPartyDevice.MULTI_FUNCTION_CARD])
  221. cb(cache)
  222. }
  223. function doUnsubscribe () {
  224. if (!unsubscribeWaiting) {
  225. unsubscribeWaiting = true
  226. subscribePromise.then(() => {
  227. if (unsubscribeIds.size) {
  228. const topics = []
  229. unsubscribeIds.forEach(id => {
  230. topics.push(
  231. `+/${id}/screen`,
  232. `+/${id}/multifunctionCard/invoke/reply`
  233. )
  234. cbMap.delete(id)
  235. injectMap.delete(id)
  236. })
  237. client.unsubscribe(topics)
  238. unsubscribeIds = new Set()
  239. console.log('monitor unsubscribe', topics)
  240. }
  241. unsubscribeWaiting = false
  242. })
  243. }
  244. }
  245. function checkCb (id) {
  246. const cbSet = cbMap.get(id)
  247. const injectSet = injectMap.get(id)
  248. if ((!cbSet || !cbSet.size) && (!injectSet || !injectSet.size)) {
  249. unsubscribeIds.add(id)
  250. doUnsubscribe()
  251. }
  252. }
  253. export function removeListener (id, cb) {
  254. const cbSet = cbMap.get(id)
  255. if (!cbSet || !cbSet.has(cb)) {
  256. console.log('monitor', id, '->', 'not exsit')
  257. return
  258. }
  259. if (subscribeIds.has(id)) {
  260. subscribeIds.delete(id)
  261. }
  262. console.log('monitor remove', id, '->', 'success')
  263. cbSet.delete(cb)
  264. checkCb(id)
  265. }
  266. export function addInjectListener (id, cb) {
  267. let cbSet = injectMap.get(id)
  268. if (!cbSet) {
  269. injectMap.set(id, cbSet = new Set())
  270. addListener(id)
  271. }
  272. if (cbSet.has(cb)) {
  273. console.log('inject', id, '->', 'exsit')
  274. return
  275. }
  276. console.log('inject add', id, '->', 'success')
  277. cbSet.add(cb)
  278. }
  279. export function removeInjectListener (id, cb) {
  280. const cbSet = injectMap.get(id)
  281. if (!cbSet || !cbSet.has(cb)) {
  282. console.log('inject', id, '->', 'not exsit')
  283. return
  284. }
  285. console.log('inject remove', id, '->', 'success')
  286. cbSet.delete(cb)
  287. checkCb(id)
  288. }
  289. let waiting = new Set()
  290. let queue = new Set()
  291. let fetching = false
  292. let started = false
  293. function addTask (id) {
  294. if (fetching) {
  295. if (queue.has(id)) {
  296. console.log('task exsit', id)
  297. } else {
  298. waiting.add(id)
  299. }
  300. } else {
  301. queue.add(id)
  302. flush()
  303. }
  304. }
  305. function flush () {
  306. if (!started && (queue.size || waiting.size)) {
  307. queue = new Set([...queue, ...waiting])
  308. waiting = new Set()
  309. started = true
  310. Promise.resolve().then(startTask)
  311. }
  312. }
  313. function startTask () {
  314. fetching = true
  315. console.log('monitor task', queue.size)
  316. const ids = [...queue]
  317. getStatusReport(ids, { background: true, custom: true }).then(
  318. async ({ data }) => {
  319. const map = {}
  320. data.forEach(item => {
  321. map[item.deviceId] = item
  322. })
  323. const now = Date.now()
  324. const errorSet = new Set()
  325. for (let i = 0; i < ids.length; i++) {
  326. const id = ids[i]
  327. if (getCacheById(id)[ThirdPartyDevice.MULTI_FUNCTION_CARD].status !== Status.LOADING) {
  328. continue
  329. }
  330. const multiCard = map[id]
  331. // 未上报过数据需判断是否绑定了多功能卡
  332. if (!multiCard) {
  333. try {
  334. const { data } = await getThirdPartyDevicesByThirdPartyDevice(id, [
  335. ThirdPartyDevice.MULTI_FUNCTION_CARD
  336. ], { background: true, custom: true })
  337. if (data?.[0]?.instance) {
  338. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, {
  339. status: Status.WARNING,
  340. timestamp: Date.now(),
  341. switchStatus: Power.LOADING,
  342. powers: []
  343. })
  344. } else {
  345. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.NONE })
  346. }
  347. } catch (e) {
  348. errorSet.add(id)
  349. }
  350. continue
  351. }
  352. const timestamp = Number(multiCard.reportTimestamp)
  353. const powerInfo = parseCachePower(multiCard)
  354. // 超时未上报的需判断是否解绑了多功能卡
  355. if (now - timestamp > TIMEOUT_MILLISECOND) {
  356. try {
  357. const { data } = await getThirdPartyDevicesByThirdPartyDevice(id, [
  358. ThirdPartyDevice.MULTI_FUNCTION_CARD
  359. ], { background: true, custom: true })
  360. if (data?.[0]?.instance) {
  361. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, {
  362. status: Status.WARNING,
  363. timestamp,
  364. ...powerInfo
  365. })
  366. } else {
  367. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, { status: Status.NONE })
  368. }
  369. } catch (e) {
  370. errorSet.add(id)
  371. }
  372. continue
  373. }
  374. if (multiCard.switchStatus === Power.LOADING) {
  375. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, {
  376. status: Status.WARNING,
  377. timestamp,
  378. ...powerInfo
  379. })
  380. continue
  381. }
  382. emit(id, ThirdPartyDevice.MULTI_FUNCTION_CARD, {
  383. status: Status.OK,
  384. timestamp,
  385. ...powerInfo
  386. })
  387. }
  388. queue = errorSet
  389. }
  390. ).then(() => {
  391. fetching = false
  392. started = false
  393. setTimeout(flush, 500)
  394. })
  395. }