Pārlūkot izejas kodu

feat: support specified resource playback

daigang 3 gadi atpakaļ
vecāks
revīzija
6e76a6fbd8

+ 1 - 1
package.json

@@ -21,7 +21,7 @@
     "keycloak-js": "^15.0.2",
     "mediainfo.js": "^0.1.7",
     "mpegts.js": "^1.6.10",
-    "mqtt": "^4.2.8",
+    "mqtt": "^4.3.7",
     "nprogress": "^0.2.0",
     "path-to-regexp": "^6.2.0",
     "spark-md5": "^3.0.2",

+ 3 - 0
src/store/getters.js

@@ -45,6 +45,9 @@ const getters = {
   },
   isValid (state, getters) {
     return getters.token && (getters.tenant || getters.isSuperAdmin)
+  },
+  account (state) {
+    return state.user.account
   }
 }
 

+ 6 - 1
src/store/modules/user.js

@@ -15,7 +15,8 @@ const state = {
   name: '',
   avatar: '',
   roles: new Set(),
-  accesses: new Set()
+  accesses: new Set(),
+  account: ''
 }
 
 const mutations = {
@@ -50,6 +51,9 @@ const mutations = {
   SET_ACCESSES (state, accesses) {
     console.log('accesses', accesses)
     state.accesses = accesses
+  },
+  SET_ACCOUNT (state, account) {
+    state.account = account
   }
 }
 
@@ -107,6 +111,7 @@ const actions = {
       commit('SET_AVATAR', avatar)
       commit('SET_ROLES', roleSet)
       commit('SET_ACCESSES', accessSet)
+      commit('SET_ACCOUNT', preferred_username)
     }
   },
   logout () {

+ 6 - 5
src/utils/mqtt.js

@@ -1,5 +1,6 @@
 import mqtt from 'mqtt'
 import { GATEWAY_WS } from '@/constant'
+import store from '@/store'
 import SM4 from '@/utils/sm4'
 
 const willTopic = 'web/offline'
@@ -47,6 +48,9 @@ function isWhite (item, topic, message) {
 
 export function createClient (options) {
   return mqtt.connect(host, {
+    username,
+    password,
+    clientId: `mqtt_${store.getters.account}_${Math.random().toString(16).slice(2)}`,
     protocolId: 'MQTT',
     protocolVersion: 4,
     clean: true,
@@ -67,9 +71,6 @@ function changeState (state) {
 function start () {
   console.log('Connecting mqtt client...')
   const client = createClient({
-    username,
-    password,
-    clientId: `mqttjs_${Math.random().toString(16).slice(2)}`,
     will: {
       topic: willTopic,
       payload: 'Connection Closed abnormally..!',
@@ -142,12 +143,12 @@ export function decodePayload (topic, payload) {
 }
 
 const decoder = new TextDecoder('utf-8')
-export function utf8ArrayBufferToString (buffer) {
+function utf8ArrayBufferToString (buffer) {
   return decoder.decode(buffer)
 }
 
 function decodeMessage (topic, arrayBuffer, message) {
-  if (whiteList.some(item => isWhite(item, topic, message))) {
+  if (topic && whiteList.some(item => isWhite(item, topic, message))) {
     console.log('white list')
   } else {
     const decodeStr = sm4.decrypt(arrayBuffer)

+ 138 - 0
src/views/device/detail/components/DeviceTakeOver/components/Asset.vue

@@ -0,0 +1,138 @@
+<template>
+  <div
+    class="o-preload-asset"
+  >
+    <div
+      class="o-preload-asset__img"
+      :class="{ 'u-pointer': status === 2 }"
+      :style="styles"
+      @click="onClick"
+    >
+      <svg-icon
+        v-if="isVideo"
+        class="o-preload-asset__thumb"
+        icon-class="video-thumb"
+      />
+      <i
+        v-if="status === -1"
+        class="o-preload-asset__refresh o-icon has-active el-icon-refresh u-pointer"
+      />
+    </div>
+    <div class="l-flex--row o-preload-asset__footer">
+      <auto-text
+        class="l-flex__fill c-sibling-item"
+        :text="asset.name"
+      />
+      <i
+        class="l-flex__none c-slibing-item el-icon-delete has-active u-pointer"
+        @click="onDel"
+      />
+    </div>
+  </div>
+</template>
+
+<script>
+import { getThumbnailUrl } from '@/api/asset'
+
+export default {
+  name: 'PreloadAsset',
+  props: {
+    asset: {
+      type: Object,
+      required: true
+    }
+  },
+  data () {
+    return {
+      showDetail: false,
+      isMultipleFiles: false,
+      total: 0,
+      active: 0,
+      source: null
+    }
+  },
+  computed: {
+    type () {
+      return this.asset.type
+    },
+    isImage () {
+      return this.type === 1
+    },
+    isVideo () {
+      return this.type === 2
+    },
+    isWeb () {
+      return this.type === 3
+    },
+    status () {
+      return this.asset.status
+    },
+    keyName () {
+      return this.asset.keyName
+    },
+    styles () {
+      return this.isImage
+        ? {
+          'background-image': `url("${getThumbnailUrl(this.keyName)}")`
+        }
+        : null
+    }
+  },
+  methods: {
+    onClick () {
+      switch (this.status) {
+        case -1:
+          this.$emit('reload', this.asset)
+          break
+        case 2:
+          this.$emit('play', this.asset)
+          break
+        default:
+          break
+      }
+    },
+    onDel () {
+      this.$emit('del', this.asset)
+    }
+  }
+}
+</script>
+
+<style lang="scss" scoped>
+.o-preload-asset {
+  position: relative;
+
+  &__img {
+    position: relative;
+    padding-top: 56.25%;
+    border-radius: $radius;
+    background-color: #000;
+    background-position: center center;
+    background-size: contain;
+    background-repeat: no-repeat;
+  }
+
+  &__thumb {
+    position: absolute;
+    top: 10%;
+    left: 10%;
+    width: 80%;
+    height: 80%;
+  }
+
+  &__refresh {
+    position: absolute;
+    top: 50%;
+    left: 50%;
+    color: #fff;
+    border-radius: 50%;
+    background-color: rgba(#000, 0.5);
+    transform: translate(-50%, -50%);
+  }
+
+  &__footer {
+    margin-top: 4px;
+    font-size: 14px;
+  }
+}
+</style>

+ 209 - 0
src/views/device/detail/components/DeviceTakeOver/index.vue

@@ -0,0 +1,209 @@
+<template>
+  <div class="l-flex">
+    <div class="l-flex__none c-debug u-overflow-y--auto">
+      <div
+        v-for="message in messages"
+        :key="message.id"
+      >
+        {{ message.info }}
+      </div>
+    </div>
+    <div
+      v-loading="loading"
+      class="l-flex__fill l-flex--col"
+    >
+      <div
+        v-if="!loading && !valid"
+        class="l-flex--col center"
+      >
+        <button
+          class="o-button"
+          @click="createClient"
+        >
+          连接设备
+        </button>
+      </div>
+      <el-tabs
+        v-model="active"
+        class="c-tabs has-bottom-padding"
+      >
+        <el-tab-pane
+          label="已下载"
+          name="loaded"
+        />
+        <el-tab-pane
+          label="下载中"
+          name="loading"
+        />
+        <el-tab-pane
+          label="异常"
+          name="error"
+        />
+      </el-tabs>
+      <div class="l-flex__fill l-grid--info mini u-overflow-y--auto">
+        <asset
+          v-for="asset in list"
+          :key="asset.keyName"
+          :asset="asset"
+          @play="onPlay"
+          @reload="onReload"
+          @del="onDel"
+        />
+      </div>
+    </div>
+  </div>
+</template>
+
+<script>
+import {
+  Topic,
+  takeOver
+} from './takeover'
+import Asset from './components/Asset'
+
+export default {
+  name: 'DeviceTakeOver',
+  components: {
+    Asset
+  },
+  props: {
+    device: {
+      type: Object,
+      required: true
+    },
+    online: {
+      type: [Boolean, String],
+      default: false
+    }
+  },
+  data () {
+    return {
+      loading: true,
+      valid: false,
+      messages: [],
+      active: 'loaded',
+      assets: []
+    }
+  },
+  computed: {
+    list () {
+      switch (this.active) {
+        case 'loaded':
+          return this.assets.filter(({ status }) => status === 2)
+        case 'loading':
+          return this.assets.filter(({ status }) => status === 0 || status === 1)
+        case 'error':
+          return this.assets.filter(({ status }) => status === -1)
+        default:
+          return this.assets
+      }
+    }
+  },
+  created () {
+    this.createClient()
+  },
+  beforeDestroy () {
+    if (this.valid) {
+      this.$clientProxy.close()
+    } else {
+      this.$client?.end(true)
+    }
+  },
+  methods: {
+    onMessage (topic, payload, requestPayload) {
+      switch (topic) {
+        case Topic.PRELOAD:
+          this.onReloadReply(requestPayload.assets)
+          break
+        case Topic.DEL:
+          this.onDelReply(requestPayload.assets)
+          break
+        case Topic.PULL:
+          this.onRefresh(payload.assets)
+          break
+        case Topic.DOWNLOAD:
+          this.onDownload(payload)
+          break
+        default:
+          break
+      }
+    },
+    onCreated (proxy) {
+      this.valid = true
+      this.$client = null
+      this.$clientProxy = proxy
+      this.$clientProxy.pull()
+    },
+    onClose () {
+      this.valid = false
+      this.loading = false
+      this.$client = null
+      this.$clientProxy = null
+    },
+    onDebug (message) {
+      this.messages.unshift({
+        id: `${Date.now()}_${Math.random().toString().slice(2)}`,
+        info: message
+      })
+    },
+    createClient () {
+      this.loading = true
+      this.$client = takeOver(this.device, {
+        onMessage: this.onMessage,
+        onCreated: this.onCreated,
+        onClose: this.onClose,
+        debug: this.onDebug
+      })
+    },
+    onRefresh (assets) {
+      this.loading = false
+      this.assets = assets
+    },
+    onPlay ({ type, name, keyName }) {
+      this.$clientProxy.play({ type, name, keyName })
+    },
+    onReload (asset) {
+      const { status, ...data } = asset
+      this.$clientProxy.preload([data])
+    },
+    onReloadReply (assets) {
+      assets.forEach(({ keyName }) => {
+        const index = this.assets.findIndex(asset => asset.keyName === keyName)
+        if (~index) {
+          this.assets[index].status = 0
+        }
+      })
+    },
+    onDel ({ name, keyName }) {
+      this.onDebug(`删除${name}`)
+      this.$clientProxy.del([keyName])
+    },
+    onDelReply (assets) {
+      assets.forEach(keyName => {
+        const index = this.assets.findIndex(asset => asset.keyName === keyName)
+        if (~index) {
+          this.assets.splice(index, 1)
+        }
+      })
+    },
+    onDownload ({ success, asset: { keyName } }) {
+      const index = this.assets.findIndex(asset => asset.keyName === keyName)
+      if (~index) {
+        this.assets[index].status = success ? 2 : -1
+      }
+    }
+  }
+}
+</script>
+
+<style lang="scss" scoped>
+.c-debug {
+  width: 200px;
+  padding-right: $spacing;
+  margin-right: $spacing;
+  color: $black;
+  font-size: 14px;
+  line-height: 24px;
+  border-right: 1px solid $gray--light;
+}
+</style>

+ 210 - 0
src/views/device/detail/components/DeviceTakeOver/takeover.js

@@ -0,0 +1,210 @@
+import { getAssets } from '@/api/asset'
+import store from '@/store'
+import {
+  createClient,
+  decodePayload,
+  sm4
+} from '@/utils/mqtt'
+
+const NAMESPACE = 'asset'
+
+export const Topic = {
+  PRELOAD: 'preload',
+  DEL: 'delete',
+  PULL: 'pull',
+  PLAY: 'play',
+  STOP: 'stop',
+  DOWNLOAD: 'download'
+}
+
+export function takeOver (device, { onMessage, onCreated, onClose, debug }) {
+  const { id, productId } = device
+
+  debug('正在连接...')
+
+  let force = false
+
+  const prefix = `${productId}/${id}/${NAMESPACE}`
+
+  let client = createClient({
+    clientId: `${store.getters.account}_takeover_${id}`,
+    reconnectPeriod: 0,
+    will: {
+      topic: `${prefix}/${Topic.STOP}`,
+      payload: createPayload(createMessageId()),
+      qos: 2,
+      retain: false
+    }
+  })
+
+  const messageMap = new Map()
+
+  const onMqttMessage = (event, paylaod) => {
+    const needMessageId = event !== Topic.DOWNLOAD
+    const messageId = paylaod.messageId
+    if (needMessageId && !messageMap.has(messageId)) {
+      return
+    }
+    console.log('Take Over MQTT', event, paylaod)
+    const requestPayload = needMessageId ? messageMap.get(messageId) : null
+    needMessageId && messageMap.delete(messageId)
+    switch (event) {
+      case Topic.PRELOAD:
+        debug('预加载资源接收成功')
+        break
+      case Topic.DEL:
+        debug('删除资源成功')
+        break
+      case Topic.PULL:
+        debug('资源拉取成功')
+        break
+      case Topic.PLAY:
+        debug(`播放${requestPayload.asset.name}成功`)
+        break
+      case Topic.STOP:
+        debug('结束')
+        break
+      case Topic.DOWNLOAD:
+        if (!paylaod.complete) {
+          return
+        }
+        debug('有资源状态更新')
+        break
+      default:
+        return
+    }
+    onMessage(event, paylaod, requestPayload)
+  }
+
+  const mock = {
+    do (event, messageId, payload) {
+      if (!__DEV__) {
+        return
+      }
+      this[event](messageId, payload)
+    },
+    preload (messageId) {
+      onMqttMessage(Topic.PRELOAD, { messageId })
+    },
+    delete (messageId) {
+      onMqttMessage(Topic.DEL, { messageId, failures: [] })
+    },
+    pull (messageId) {
+      getAssets({ pageNum: 1, pageSize: 10, type: 1, status: 2 }).then(({ data }) => {
+        onMqttMessage(Topic.PULL, {
+          messageId,
+          assets: data.map(({ type, originalName, keyName, size, md5 }) => {
+            return {
+              type,
+              name: originalName,
+              keyName,
+              size,
+              md5,
+              status: [-1, 1, 2, 0][Math.random() * 3 | 0]
+            }
+          })
+        }, true, true)
+      })
+    },
+    play (messageId) {
+      onMqttMessage(Topic.PLAY, { messageId })
+    },
+    stop (messageId) {
+      onMqttMessage(Topic.STOP, { messageId })
+    }
+  }
+
+  const send = (event, paylaod) => {
+    const messageId = createMessageId()
+    messageMap.set(messageId, paylaod)
+    client.publish(`${prefix}/${event}`, createPayload(messageId, paylaod))
+    mock.do(event, messageId, paylaod)
+  }
+
+  client.on('error', e => {
+    console.log('Take Over MQTT error: ', e)
+  })
+
+  client.on('connect', () => {
+    console.log('Take Over MQTT connected')
+    debug('连接成功')
+    client.subscribe(`${prefix}/+/#`)
+    onCreated(createProxy(device, send, () => {
+      force = true
+      client.end(true)
+    }, debug))
+  })
+
+  client.on('close', () => {
+    console.log('Take Over MQTT closed')
+    if (!force) {
+      debug('连接发生异常,已断开')
+      client.end(true)
+    }
+  })
+
+  client.on('end', () => {
+    console.log('Take Over MQTT end')
+    client = null
+    messageMap.clear()
+    force && debug('连接已断开')
+    onClose()
+  })
+
+  client.on('message', (topic, payload) => {
+    const result = /^\d+\/\d+\/asset\/(.+)\/reply$/.exec(topic)
+    if (result) {
+      onMqttMessage(result[1], JSON.parse(decodePayload(null, payload)))
+    }
+  })
+
+  return client
+}
+
+function createMessageId () {
+  return `${store.getters.account}_${Math.random().toString(16).slice(2)}`
+}
+
+function createPayload (messageId, payload) {
+  return sm4.encrypt(JSON.stringify({
+    messageId,
+    timestamp: `${Date.now()}`,
+    ...payload
+  }))
+}
+
+function createProxy (device, send, close, debug) {
+  const { wide, high } = device
+
+  return {
+    close,
+    send,
+    preload (assets) {
+      debug('预加载资源...')
+      send(Topic.PRELOAD, { assets })
+    },
+    del (assets) {
+      debug('删除资源...')
+      send(Topic.DEL, { assets })
+    },
+    pull () {
+      debug('资源拉取...')
+      send(Topic.PULL)
+    },
+    play (asset, rect) {
+      debug(`播放${asset.name}...`)
+      send(Topic.PLAY, {
+        asset,
+        top: 0,
+        left: 0,
+        width: wide,
+        height: high,
+        ...rect
+      })
+    },
+    stop () {
+      debug('结束播放...')
+      send(Topic.STOP)
+    }
+  }
+}

+ 5 - 2
src/views/device/detail/index.vue

@@ -77,6 +77,7 @@ import DeviceAlarm from './components/DeviceAlarm'
 import DeviceInvoke from './components/DeviceInvoke'
 import DeviceExternal from './components/DeviceExternal'
 import Sensors from './components/external/Sensors'
+import DeviceTakeOver from './components/DeviceTakeOver'
 
 export default {
   name: 'DeviceDetail',
@@ -86,7 +87,8 @@ export default {
     DeviceAlarm,
     DeviceInvoke,
     DeviceExternal,
-    Sensors
+    Sensors,
+    DeviceTakeOver
   },
   data () {
     return {
@@ -102,7 +104,8 @@ export default {
         { key: 'DeviceAlarm', label: '设备告警' },
         this.accessSet.has(Access.MANAGE_DEVICE) ? { key: 'DeviceInvoke', label: '设备操控' } : null,
         __SENSOR__ ? { key: 'Sensors', label: '传感器' } : null,
-        { key: 'DeviceExternal', label: '全链路监测' }
+        { key: 'DeviceExternal', label: '全链路监测' },
+        { key: 'DeviceTakeOver', label: '接管' }
       ].filter(Boolean)
     }
   },

+ 36 - 44
src/views/realm/debug/simulator/index.vue

@@ -60,71 +60,63 @@
 </template>
 
 <script>
-import {
-  subscribe,
-  unsubscribe
-} from '@/utils/mqtt'
 import { createProxy } from './simulate'
 
 export default {
   name: 'Simulate',
   data () {
     return {
-      sn: '',
+      sn: '0YK0Y42101000003',
       valid: false,
       loading: false,
       messages: []
     }
   },
   beforeDestroy () {
-    this.unsubscribe()
+    this.onClose(true)
   },
   methods: {
     createProxy () {
-      this.valid = false
-      this.unsubscribe()
-      if (this.sn) {
-        this.loading = true
-        this.$proxyPromise = createProxy(this.sn)
-        this.$proxyPromise.then(
-          val => {
-            this.$proxyPromise = null
-            this.$proxy = val
-            this.valid = true
-            subscribe(`${this.$proxy.productId}/${this.$proxy.deviceId}/+/#`, this.onMessage)
-            this.$message({
-              type: 'success',
-              message: '代理成功'
-            })
-          },
-          () => {
-            this.$message({
-              type: 'error',
-              message: '代理失败'
-            })
-          }
-        ).finally(() => {
-          this.loading = false
-        })
+      if (!this.sn) {
+        return
       }
+      this.onClose(true)
+      this.loading = true
+      this.$proxyPromise = createProxy(this.sn, this.onMessage, this.onClose)
+      this.$proxyPromise.then(
+        val => {
+          this.$proxyPromise = null
+          this.$proxy = val
+          this.valid = true
+          this.$message({
+            type: 'success',
+            message: '代理成功'
+          })
+        },
+        () => {
+          this.$message({
+            type: 'error',
+            message: '代理失败'
+          })
+        }
+      ).finally(() => {
+        this.loading = false
+      })
     },
-    publish (topic) {
-      if (!this.$proxy) {
+    publish (topic, message, encode, useRoot) {
+      return this.$proxy.publish(topic, message, encode, useRoot)
+    },
+    onClose (force) {
+      this.$proxyPromise?.reject()
+      this.$proxy?.stop()
+      this.valid = false
+      this.messages = []
+      if (!force) {
         this.$message({
           type: 'warning',
-          message: '请先代理设备'
+          message: '代理已断开'
         })
       }
-      return this.$proxy.publish(topic)
-    },
-    unsubscribe () {
-      if (this.$proxy) {
-        this.$proxy.cancel()
-        unsubscribe(`${this.$proxy.productId}/${this.$proxy.deviceId}/+/#`, this.onMessage)
-        this.$proxy = null
-      }
-      this.$proxyPromise?.reject()
-      this.messages = []
     },
     onMessage (topic, payload) {
       this.messages.unshift({

+ 51 - 52
src/views/realm/debug/simulator/simulate.js

@@ -3,125 +3,124 @@ import {
   subscribe,
   unsubscribe,
   publish,
-  sm4,
-  createClient
+  createClient,
+  decodePayload,
+  sm4
 } from '@/utils/mqtt'
 
-export function createProxy (sn) {
+export function createProxy (sn, onMessage, onClose) {
   const snHash = sha256(sn).toString().toUpperCase()
 
   let timer = -1
   let fn
 
+  const targetTopic = `smsb/${snHash}/init/reply`
+
   const promise = new Promise((resolve, reject) => {
     fn = (topic, payload) => {
-      unsubscribe(`smsb/${snHash}/init/reply`, fn)
-      try {
-        createProxyClient(JSON.parse(payload), resolve, reject)
-      } catch (e) {
-        reject()
+      if (topic === targetTopic) {
+        unsubscribe(targetTopic, fn)
+        try {
+          onMessage(topic, payload)
+          createProxyClient(JSON.parse(payload), { onMessage, onClose, resolve, reject })
+        } catch (e) {
+          reject()
+        }
       }
     }
 
     timer = setTimeout(() => {
-      publish(`smsb/${snHash}/init`, JSON.stringify({ messageId: Date.now().toString(), timestamp: Date.now().toString() }), true).catch(reject)
+      publish(`smsb/${snHash}/init`, getPayload(), true).catch(reject)
     }, 500)
 
-    subscribe(`smsb/${snHash}/init/reply`, fn)
+    subscribe(targetTopic, fn)
   })
 
   promise.reject = () => {
     clearTimeout(timer)
-    unsubscribe(`smsb/${snHash}/init/reply`, fn)
+    unsubscribe(targetTopic, fn)
   }
 
   return promise
 }
 
-function createProxyClient ({ productId, deviceId, username, password }, resolve, reject) {
-  if (!deviceId) {
-    reject()
-    return
-  }
-
+function createProxyClient ({ productId, deviceId, username, password }, { onMessage, onClose, resolve, reject }) {
   console.log('Simulate Connecting mqtt client...')
-  let reconnect = 0
-  let timer = -1
 
   let client = createClient({
     username,
     password,
-    clientId: deviceId
+    clientId: deviceId,
+    reconnectPeriod: 0
   })
 
+  let onEnd = reject
+  let force = false
+
   client.on('error', e => {
     console.log('Simulate MQTT error: ', e)
-    client?.end(true)
   })
 
   client.on('connect', () => {
     console.log('Simulate MQTT connected')
-    if (!client) {
-      console.log('Simulate MQTT client released')
-      return
-    }
-    clearTimeout(timer)
-    timer = setTimeout(() => {
-      reconnect = 0
-    }, 5000)
     client.subscribe([
       `${productId}/${deviceId}/oss/reply`
     ])
     resolve({
       productId,
       deviceId,
-      cancel () {
+      stop () {
         if (!client) {
           return
         }
+        force = true
         client.unsubscribe([
           `${productId}/${deviceId}/oss/reply`
         ])
         client.end(true)
       },
-      publish (topic, message, encode = true) {
+      publish (topic, message, encode = true, useRoot = false) {
         if (!client) {
           return
         }
-        message = JSON.stringify({
-          messageId: Date.now().toString(),
-          timestamp: Date.now().toString(),
-          ...message
-        })
-        client.publish(`${productId}/${deviceId}${topic}`, message && encode ? sm4.encrypt(message) : message)
+        topic = `${productId}/${deviceId}${topic}`
+        const payload = getPayload(message, deviceId)
+        onMessage(topic, payload)
+        if (useRoot) {
+          publish(topic, payload, encode).catch(console.log)
+        } else {
+          client.publish(topic, encode ? sm4.encrypt(payload) : payload)
+        }
       }
     })
+    onEnd = onClose
   })
 
-  client.on('reconnect', () => {
-    console.log('Simulate MQTT reconnecting...')
-    reconnect += 1
-    if (reconnect > 3) {
-      client?.end(true)
-    }
+  client.on('offline', () => {
+    console.log('Simulate MQTT offline')
   })
 
   client.on('close', () => {
     console.log('Simulate MQTT closed')
+    !force && client.end(true)
   })
 
-  client.on('disconnect', () => {
-    console.log('Simulate MQTT disconnect')
+  client.on('end', () => {
+    console.log('Simulate MQTT end')
+    client = null
+    !force && onEnd()
   })
 
-  client.on('offline', () => {
-    console.log('Simulate MQTT offline')
+  client.on('message', (topic, payload) => {
+    console.log('Simulate MQTT topic', topic)
+    onMessage(topic, decodePayload(topic, payload))
   })
+}
 
-  client.on('end', () => {
-    console.log('Simulate MQTT end')
-    clearTimeout(timer)
-    client = null
-    reject()
+function getPayload (payload, deviceId = 'init') {
+  return JSON.stringify({
+    messageId: `simulate_${deviceId}_${Math.random().toString(16).slice(2)}`,
+    timestamp: `${Date.now()}`,
+    ...payload
   })
 }

+ 32 - 20
yarn.lock

@@ -2488,7 +2488,7 @@ bindings@^1.5.0:
 
 bl@^4.0.2:
   version "4.1.0"
-  resolved "https://registry.yarnpkg.com/bl/-/bl-4.1.0.tgz#451535264182bec2fbbc83a62ab98cf11d9f7b3a"
+  resolved "https://registry.npmmirror.com/bl/-/bl-4.1.0.tgz#451535264182bec2fbbc83a62ab98cf11d9f7b3a"
   integrity sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==
   dependencies:
     buffer "^5.5.0"
@@ -3199,7 +3199,7 @@ commander@~2.19.0:
 
 commist@^1.0.0:
   version "1.1.0"
-  resolved "https://registry.yarnpkg.com/commist/-/commist-1.1.0.tgz#17811ec6978f6c15ee4de80c45c9beb77cee35d5"
+  resolved "https://registry.npmmirror.com/commist/-/commist-1.1.0.tgz#17811ec6978f6c15ee4de80c45c9beb77cee35d5"
   integrity sha512-rraC8NXWOEjhADbZe9QBNzLAN5Q3fsTPQtBV+fEVj6xKIgDgNiEVE6ZNfHpZOqfQ21YUzfVNUXLOEZquYvQPPg==
   dependencies:
     leven "^2.1.0"
@@ -3280,7 +3280,7 @@ concat-stream@^1.5.0:
 
 concat-stream@^2.0.0:
   version "2.0.0"
-  resolved "https://registry.yarnpkg.com/concat-stream/-/concat-stream-2.0.0.tgz#414cf5af790a48c60ab9be4527d56d5e41133cb1"
+  resolved "https://registry.npmmirror.com/concat-stream/-/concat-stream-2.0.0.tgz#414cf5af790a48c60ab9be4527d56d5e41133cb1"
   integrity sha512-MWufYdFw53ccGjCA+Ol7XJYpAlW6/prSMzuPOTRnJGcGzuhLn4Scrz7qf6o8bROZ514ltazcIFJZevcfbo0x7A==
   dependencies:
     buffer-from "^1.0.0"
@@ -4271,7 +4271,7 @@ duplexify@^3.4.2, duplexify@^3.6.0:
 
 duplexify@^4.1.1:
   version "4.1.2"
-  resolved "https://registry.yarnpkg.com/duplexify/-/duplexify-4.1.2.tgz#18b4f8d28289132fa0b9573c898d9f903f81c7b0"
+  resolved "https://registry.npmmirror.com/duplexify/-/duplexify-4.1.2.tgz#18b4f8d28289132fa0b9573c898d9f903f81c7b0"
   integrity sha512-fz3OjcNCHmRP12MJoZMPglx8m4rrFP8rovnk4vT8Fs+aonZoCwGg10dSsQsfP/E62eZcPTMSMP6686fu9Qlqtw==
   dependencies:
     end-of-stream "^1.4.1"
@@ -5336,7 +5336,7 @@ glob@7.1.4:
     once "^1.3.0"
     path-is-absolute "^1.0.0"
 
-glob@^7.0.3, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4, glob@^7.1.6:
+glob@^7.0.3, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4:
   version "7.2.0"
   resolved "https://registry.yarnpkg.com/glob/-/glob-7.2.0.tgz#d15535af7732e02e948f4c41628bd910293f6023"
   integrity sha512-lmLf6gtyrPq8tTjSmrO94wBeQbFR3HbLHbuyD69wuyQkImp2hWqMGB47OX65FBkPffO641IP9jWa1z4ivqG26Q==
@@ -5348,6 +5348,18 @@ glob@^7.0.3, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4, glob@^7.1.6:
     once "^1.3.0"
     path-is-absolute "^1.0.0"
 
+glob@^7.1.6:
+  version "7.2.3"
+  resolved "https://registry.npmmirror.com/glob/-/glob-7.2.3.tgz#b8df0fb802bbfa8e89bd1d938b4e16578ed44f2b"
+  integrity sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==
+  dependencies:
+    fs.realpath "^1.0.0"
+    inflight "^1.0.4"
+    inherits "2"
+    minimatch "^3.1.1"
+    once "^1.3.0"
+    path-is-absolute "^1.0.0"
+
 global-dirs@^0.1.1:
   version "0.1.1"
   resolved "https://registry.yarnpkg.com/global-dirs/-/global-dirs-0.1.1.tgz#b319c0dd4607f353f3be9cca4c72fc148c49f445"
@@ -5604,7 +5616,7 @@ he@1.2.x, he@^1.1.0, he@^1.1.1:
 
 help-me@^3.0.0:
   version "3.0.0"
-  resolved "https://registry.yarnpkg.com/help-me/-/help-me-3.0.0.tgz#9803c81b5f346ad2bce2c6a0ba01b82257d319e8"
+  resolved "https://registry.npmmirror.com/help-me/-/help-me-3.0.0.tgz#9803c81b5f346ad2bce2c6a0ba01b82257d319e8"
   integrity sha512-hx73jClhyk910sidBB7ERlnhMlFsJJIBqSVMFDwPN8o2v9nmp5KgLq1Xz1Bf1fCMMZ6mPrX159iG0VLy/fPMtQ==
   dependencies:
     glob "^7.1.6"
@@ -6450,7 +6462,7 @@ js-message@1.0.7:
 
 js-sdsl@^2.1.2:
   version "2.1.4"
-  resolved "https://registry.yarnpkg.com/js-sdsl/-/js-sdsl-2.1.4.tgz#16f31a56cc09ec57723e0c477fdc07e1d2522627"
+  resolved "https://registry.npmmirror.com/js-sdsl/-/js-sdsl-2.1.4.tgz#16f31a56cc09ec57723e0c477fdc07e1d2522627"
   integrity sha512-/Ew+CJWHNddr7sjwgxaVeIORIH4AMVC9dy0hPf540ZGMVgS9d3ajwuVdyhDt6/QUvT8ATjR3yuYBKsS79F+H4A==
 
 js-sha256@0.9.0:
@@ -6625,8 +6637,8 @@ launch-editor@^2.2.1, launch-editor@^2.3.0:
 
 leven@^2.1.0:
   version "2.1.0"
-  resolved "https://registry.yarnpkg.com/leven/-/leven-2.1.0.tgz#c2e7a9f772094dee9d34202ae8acce4687875580"
-  integrity sha1-wuep93IJTe6dNCAq6KzORoeHVYA=
+  resolved "https://registry.npmmirror.com/leven/-/leven-2.1.0.tgz#c2e7a9f772094dee9d34202ae8acce4687875580"
+  integrity sha512-nvVPLpIHUxCUoRLrFqTgSxXJ614d8AgQoWl7zPe/2VadE8+1dpU3LBhowRuBAcuwruWtOdD8oYC9jDNJjXDPyA==
 
 levn@^0.4.1:
   version "0.4.1"
@@ -7161,7 +7173,7 @@ minimalistic-crypto-utils@^1.0.1:
   resolved "https://registry.yarnpkg.com/minimalistic-crypto-utils/-/minimalistic-crypto-utils-1.0.1.tgz#f6c00c1c0b082246e5c4d99dfb8c7c083b2b582a"
   integrity sha1-9sAMHAsIIkblxNmd+4x8CDsrWCo=
 
-minimatch@^3.0.4, minimatch@^3.1.2:
+minimatch@^3.0.4, minimatch@^3.1.1, minimatch@^3.1.2:
   version "3.1.2"
   resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-3.1.2.tgz#19cd194bfd3e428f049a70817c038d89ab4be35b"
   integrity sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==
@@ -7264,16 +7276,16 @@ mpegts.js@^1.6.10:
 
 mqtt-packet@^6.8.0:
   version "6.10.0"
-  resolved "https://registry.yarnpkg.com/mqtt-packet/-/mqtt-packet-6.10.0.tgz#c8b507832c4152e3e511c0efa104ae4a64cd418f"
+  resolved "https://registry.npmmirror.com/mqtt-packet/-/mqtt-packet-6.10.0.tgz#c8b507832c4152e3e511c0efa104ae4a64cd418f"
   integrity sha512-ja8+mFKIHdB1Tpl6vac+sktqy3gA8t9Mduom1BA75cI+R9AHnZOiaBQwpGiWnaVJLDGRdNhQmFaAqd7tkKSMGA==
   dependencies:
     bl "^4.0.2"
     debug "^4.1.1"
     process-nextick-args "^2.0.1"
 
-mqtt@^4.2.8:
+mqtt@^4.3.7:
   version "4.3.7"
-  resolved "https://registry.yarnpkg.com/mqtt/-/mqtt-4.3.7.tgz#42985ca490ea25d2c12c119d83c632db6dc9d589"
+  resolved "https://registry.npmmirror.com/mqtt/-/mqtt-4.3.7.tgz#42985ca490ea25d2c12c119d83c632db6dc9d589"
   integrity sha512-ew3qwG/TJRorTz47eW46vZ5oBw5MEYbQZVaEji44j5lAUSQSqIEoul7Kua/BatBW0H0kKQcC9kwUHa1qzaWHSw==
   dependencies:
     commist "^1.0.0"
@@ -7533,7 +7545,7 @@ num2fraction@^1.2.2:
 
 number-allocator@^1.0.9:
   version "1.0.10"
-  resolved "https://registry.yarnpkg.com/number-allocator/-/number-allocator-1.0.10.tgz#efc4c665e45bf60f0ad172aca1540e093b5292e8"
+  resolved "https://registry.npmmirror.com/number-allocator/-/number-allocator-1.0.10.tgz#efc4c665e45bf60f0ad172aca1540e093b5292e8"
   integrity sha512-K4AvNGKo9lP6HqsZyfSr9KDaqnwFzW203inhQEOwFrmFaYevpdX4VNwdOLk197aHujzbT//z6pCBrCOUYSM5iw==
   dependencies:
     debug "^4.3.1"
@@ -8815,8 +8827,8 @@ regjsparser@^0.8.2:
 
 reinterval@^1.1.0:
   version "1.1.0"
-  resolved "https://registry.yarnpkg.com/reinterval/-/reinterval-1.1.0.tgz#3361ecfa3ca6c18283380dd0bb9546f390f5ece7"
-  integrity sha1-M2Hs+jymwYKDOA3Qu5VG85D17Oc=
+  resolved "https://registry.npmmirror.com/reinterval/-/reinterval-1.1.0.tgz#3361ecfa3ca6c18283380dd0bb9546f390f5ece7"
+  integrity sha512-QIRet3SYrGp0HUHO88jVskiG6seqUGC5iAG7AwI/BV4ypGcuqk9Du6YQBUOUqm9c8pw1eyLoIaONifRua1lsEQ==
 
 relateurl@0.2.x:
   version "0.2.7"
@@ -8971,7 +8983,7 @@ reusify@^1.0.4:
 
 rfdc@^1.3.0:
   version "1.3.0"
-  resolved "https://registry.yarnpkg.com/rfdc/-/rfdc-1.3.0.tgz#d0b7c441ab2720d05dc4cf26e01c89631d9da08b"
+  resolved "https://registry.npmmirror.com/rfdc/-/rfdc-1.3.0.tgz#d0b7c441ab2720d05dc4cf26e01c89631d9da08b"
   integrity sha512-V2hovdzFbOi77/WajaSMXk2OLm+xNIeQdMMuB7icj7bk6zi2F8GGAxigcnDFpJHbNyNcgyJDiP+8nOrY5cZGrA==
 
 rgb-regex@^1.0.1:
@@ -10800,9 +10812,9 @@ ws@^6.0.0, ws@^6.2.1:
     async-limiter "~1.0.0"
 
 ws@^7.5.5:
-  version "7.5.7"
-  resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.7.tgz#9e0ac77ee50af70d58326ecff7e85eb3fa375e67"
-  integrity sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A==
+  version "7.5.9"
+  resolved "https://registry.npmmirror.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
+  integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
 
 xmlbuilder@^9.0.7:
   version "9.0.7"