takeover.js 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. import { getAssets } from '@/api/asset'
  2. import store from '@/store'
  3. import {
  4. State,
  5. AssetType
  6. } from '@/constant'
  7. import {
  8. createClient,
  9. decodePayload,
  10. sm4
  11. } from '@/utils/mqtt'
  12. const NAMESPACE = 'asset'
  13. export const Topic = {
  14. PRELOAD: 'preload',
  15. DEL: 'delete',
  16. PULL: 'pull',
  17. PLAY: 'play',
  18. STOP: 'stop',
  19. DOWNLOAD: 'download'
  20. }
  21. export const MediaType = {
  22. IMAGE: 1,
  23. VIDEO: 2,
  24. WEB: 3
  25. }
  26. export const MediaStatus = {
  27. ERROR: -1,
  28. WAITING: 0,
  29. LOADING: 1,
  30. LOADED: 2
  31. }
  32. export function takeOver (device, { onMessage, onCreated, onClose, debug }) {
  33. const { id, productId } = device
  34. debug('正在连接...')
  35. let force = false
  36. const prefix = `${productId}/${id}/${NAMESPACE}`
  37. let client = createClient({
  38. clientId: `${store.getters.account}_takeover_${id}`,
  39. reconnectPeriod: 0,
  40. will: {
  41. topic: `${prefix}/${Topic.STOP}`,
  42. payload: createPayload(createMessageId()),
  43. qos: 2,
  44. retain: false
  45. }
  46. })
  47. const messageMap = new Map()
  48. const onMqttMessage = (event, paylaod) => {
  49. const needMessageId = event !== Topic.DOWNLOAD
  50. const messageId = paylaod.messageId
  51. if (needMessageId && !messageMap.has(messageId)) {
  52. return
  53. }
  54. console.log('Take Over MQTT', event, paylaod)
  55. const requestPayload = needMessageId ? messageMap.get(messageId) : null
  56. needMessageId && messageMap.delete(messageId)
  57. switch (event) {
  58. case Topic.PRELOAD:
  59. debug('预加载资源接收成功')
  60. break
  61. case Topic.DEL:
  62. debug('删除资源成功')
  63. break
  64. case Topic.PULL:
  65. debug('资源拉取成功')
  66. break
  67. case Topic.PLAY:
  68. debug(`播放${requestPayload.asset.name}成功`)
  69. break
  70. case Topic.STOP:
  71. debug('结束')
  72. break
  73. case Topic.DOWNLOAD:
  74. if (!paylaod.complete) {
  75. return
  76. }
  77. debug('有资源状态更新')
  78. break
  79. default:
  80. return
  81. }
  82. onMessage(event, paylaod, requestPayload)
  83. }
  84. const mock = {
  85. do (event, messageId, payload) {
  86. this[event](messageId, payload)
  87. },
  88. preload (messageId, payload) {
  89. onMqttMessage(Topic.PRELOAD, { messageId })
  90. setTimeout(() => {
  91. payload.assets.forEach(asset => {
  92. if (asset.type !== MediaType.WEB) {
  93. onMqttMessage(Topic.DOWNLOAD, {
  94. messageId: createMessageId,
  95. asset,
  96. complete: true,
  97. success: [true, false][Math.random() * 2 | 0]
  98. })
  99. }
  100. })
  101. }, 2000)
  102. },
  103. delete (messageId) {
  104. onMqttMessage(Topic.DEL, { messageId, failures: [] })
  105. },
  106. pull (messageId) {
  107. getAssets({ pageNum: 1, pageSize: 10, type: [AssetType.IMAGE, AssetType.VIDEO][Math.random() * 2 | 0], status: State.RESOLVED }).then(({ data }) => {
  108. onMqttMessage(Topic.PULL, {
  109. messageId,
  110. assets: data.map(({ type, originalName, keyName, size, md5 }) => {
  111. return {
  112. type,
  113. name: originalName,
  114. keyName,
  115. size,
  116. md5,
  117. status: [MediaStatus.ERROR, MediaStatus.LOADING, MediaStatus.LOADED, MediaStatus.WAITING][Math.random() * 3 | 0]
  118. }
  119. })
  120. }, true, true)
  121. })
  122. },
  123. play (messageId) {
  124. onMqttMessage(Topic.PLAY, { messageId })
  125. },
  126. stop (messageId) {
  127. onMqttMessage(Topic.STOP, { messageId })
  128. }
  129. }
  130. const send = (event, paylaod) => {
  131. const messageId = createMessageId()
  132. messageMap.set(messageId, paylaod)
  133. if (__DEV__) {
  134. mock.do(event, messageId, paylaod)
  135. } else {
  136. client.publish(`${prefix}/${event}`, createPayload(messageId, paylaod))
  137. }
  138. }
  139. client.on('error', e => {
  140. console.log('Take Over MQTT error: ', e)
  141. })
  142. client.on('connect', () => {
  143. console.log('Take Over MQTT connected')
  144. debug('连接成功')
  145. client.subscribe(`${prefix}/+/#`)
  146. onCreated(createProxy(device, send, () => {
  147. force = true
  148. client.end(true)
  149. }, debug))
  150. })
  151. client.on('close', () => {
  152. console.log('Take Over MQTT closed')
  153. if (!force) {
  154. debug('连接发生异常,已断开')
  155. client.end(true)
  156. }
  157. })
  158. client.on('end', () => {
  159. console.log('Take Over MQTT end')
  160. client = null
  161. messageMap.clear()
  162. force && debug('连接已断开')
  163. onClose()
  164. })
  165. client.on('message', (topic, payload) => {
  166. const result = /^\d+\/\d+\/asset\/(.+)\/reply$/.exec(topic)
  167. if (result) {
  168. onMqttMessage(result[1], JSON.parse(decodePayload(null, payload)))
  169. }
  170. })
  171. return client
  172. }
  173. function createMessageId () {
  174. return `${store.getters.account}_${Math.random().toString(16).slice(2)}`
  175. }
  176. function createPayload (messageId, payload) {
  177. return sm4.encrypt(JSON.stringify({
  178. messageId,
  179. timestamp: `${Date.now()}`,
  180. ...payload
  181. }))
  182. }
  183. function createProxy (device, send, close, debug) {
  184. const { wide, high } = device
  185. return {
  186. close,
  187. send,
  188. preload (assets) {
  189. debug('预加载资源...')
  190. send(Topic.PRELOAD, { assets })
  191. },
  192. del (assets) {
  193. debug('删除资源...')
  194. send(Topic.DEL, { assets })
  195. },
  196. pull () {
  197. debug('资源拉取...')
  198. send(Topic.PULL)
  199. },
  200. play (asset, rect) {
  201. debug(`播放${asset.name}...`)
  202. send(Topic.PLAY, {
  203. asset,
  204. top: 0,
  205. left: 0,
  206. width: wide,
  207. height: high,
  208. ...rect
  209. })
  210. },
  211. stop () {
  212. debug('结束播放...')
  213. send(Topic.STOP)
  214. }
  215. }
  216. }