import Vue from 'vue' import axios from 'axios' import sparkMd5 from 'spark-md5' import { MessageBox, Message } from 'element-ui' import { addOrg } from '@/api/base' import request, { tenantRequest } from '@/utils/request' import { AssetType } from '@/constant' const CancelToken = axios.CancelToken const EventBus = new Vue() export const State = { INIT: 0, CALC: 4, CHECK: 8, READY: 10, UPLOADING: 50, MERGING: 90, SUCCESS: 100, ABORT: 999 } // 计算线程数的阀值,大于1时本地校验同文件将可能失效 const CALC_MAX = 1 // 上传线程数的阀值 const IDLE_MAX = 5 // 单片的大小 const CHUNK_SIZE = 10 * 1024 * 1024 // 完成度优先 // 开启后,空闲线程优先满足单个文件 // 关闭时,多线程数为同时上传的文件数最大数,当一个块上传完时会启用空闲线程协助上传 const COMPLETE_FIRST = false const ANALYSIS_FILE = true const files = [] const analyzeQueue = [] const hashQueue = [] const pendingQueue = [] let idle = IDLE_MAX let calc = CALC_MAX const noop = () => {} export function appendFile (file) { const fileType = getType(file) if (!fileType) { return } const { name, size } = file const obj = { id: Math.random().toString(16).slice(2), name, file, type: fileType, totalSize: size, uploaded: 0, valid: 1, state: State.INIT, url: fileType === AssetType.IMAGE ? URL.createObjectURL(file) : null } console.log(`添加文件${obj.name}, ${obj.totalSize}, ${file.type}`) files.push(obj) emitChange() if (ANALYSIS_FILE) { analyzeQueue.push(obj) analyzeMediaInfo() } else { startCalculate(obj) } } let analyzing = false function analyzeMediaInfo () { if (analyzing) { return } const obj = analyzeQueue.shift() if (!obj) { return } analyzing = true analyzeByWorker(obj).then(() => { startCalculate(obj) }, () => { remove(obj) }).finally(() => { analyzing = false analyzeMediaInfo() }) } function analyzeByWorker (obj) { return new Promise((resolve, reject) => { console.log(`开始解析文件${obj.name}`) const worker = new Worker('/mediainfo.js') worker.onmessage = e => { let isNeedConfirm = false const { error, media } = e.data if (error) { console.log(`解析文件${obj.name}失败`, error) } else { console.log(media) if (obj.type === AssetType.IMAGE) { const imageTrack = media.track.find(track => track['@type'] === 'Image') if (imageTrack) { obj.resolutionRatio = `${imageTrack.Width}x${imageTrack.Height}` } } else { const generalTrack = media.track.find(track => track['@type'] === 'General') obj.duration = generalTrack ? parseInt(generalTrack.Duration) | 0 : 0 if (obj.type === AssetType.VIDEO) { const videoTrack = media.track.find(track => track['@type'] === 'Video') if (videoTrack && videoTrack.Format !== 'AVC') { isNeedConfirm = true } } } } if (isNeedConfirm) { MessageBox.confirm( `视频 ${obj.name} 非H264编码将无法预览`, '继续上传', { type: 'warning' } ).then(resolve, reject) } else { resolve() } worker.terminate() } worker.onmessageerror = worker.onerror = e => { console.log(`解析视频${obj.name}失败`, e) resolve() worker.terminate() } worker.postMessage(obj) }) } export function removeFile (id) { const index = files.findIndex(obj => obj.id === id) if (~index) { const obj = files[index] if (isResolved(obj)) { files.splice(index, 1) emitChange() } else { MessageBox.confirm( `取消上传文件 ${obj.name} ?`, { type: 'warning' } ).then(() => { console.log(`${obj.name}已取消`) files.splice(index, 1) obj.source?.cancel('abort') setState(obj, State.ABORT) emitChange() }) } } } export function retry (id) { const obj = files.find(obj => obj.id === id) if (obj && !obj.valid) { obj.valid = true switch (obj.state) { case State.CALC: startCalculate(obj) break case State.CHECK: startCheck(obj) break case State.UPLOADING: startUpload(obj) break case State.MERGING: startMerge(obj) break default: break } emitChange() } } export function addListener (eventName, fn) { EventBus.$on(eventName, fn) if (eventName === 'change') { fn(getFiles()) } } export function removeListener (...args) { EventBus.$off(...args) } function emit (...args) { EventBus.$emit(...args) } function getType ({ name, type }) { switch (true) { case /png|jpg|jpeg|gif/i.test(type): return AssetType.IMAGE case /mp4/i.test(type): return AssetType.VIDEO case /audio\/mpeg/.test(type): return AssetType.AUDIO case /application\/(vnd.ms-powerpoint|vnd.openxmlformats-officedocument.presentationml.presentation)/.test(type): return AssetType.PPT case /application\/pdf/.test(type): return AssetType.PDF case /application\/(msword|vnd.openxmlformats-officedocument.wordprocessingml.document)/i.test(type): return AssetType.DOC default: Message({ type: 'warning', message: `暂不支持${name}该类型文件` }) return null } } let pending = false function emitChange () { if (!pending) { pending = true Vue.nextTick(() => { pending = false emit('change', getFiles()) }) } } let count = 0 function setState (obj, state) { if (state >= State.SUCCESS) { count -= 1 } else if (obj.state === State.INIT || state === obj.state) { count += 1 } obj.state = state emitChange() } function setInvalid (obj, message, stack) { count -= 1 console.warn(message, stack || '') Message({ type: 'warning', message }) obj.valid = false emitChange() } export function isUploading () { return count > 0 } function transformFile ({ id, type, name, state, valid, uploaded, totalChunks, url }) { return { id, type, name, state, valid, url, percentage: state === State.UPLOADING ? 10 + uploaded * 80 / totalChunks | 0 : state } } export function getFiles () { return files.map(transformFile) } function remove (obj, message) { if (message) { console.log(message) Message({ type: 'warning', message }) } setState(obj, State.ABORT) removeFile(obj.id) } function isResolved ({ state, valid }) { return state === State.SUCCESS || state === State.ABORT || !valid } function startCalculate (obj) { hashQueue.push(obj) shuntCalculate() } function shuntCalculate () { if (!calc) { return } const obj = hashQueue.shift() if (!obj) { return } if (isResolved(obj)) { shuntCalculate() return } console.log(`正在计算${obj.name}的HASH`) calc -= 1 calculate(obj) .finally(() => { obj.source = null calc += 1 shuntCalculate() }) .then(({ hash, chunks, totalChunks }) => { console.log(`${obj.name}计算HASH结束`, hash) obj.hash = hash obj.chunks = chunks obj.totalChunks = totalChunks startCheck(obj) }, e => { if (!isResolved(obj)) { setInvalid(obj, `${obj.name}计算HASH异常`, e) } }) } function calculate (obj) { setState(obj, State.CALC) if (typeof Worker !== 'undefined') { return calculateHashByWorker(obj) } if (window.requestIdleCallback) { return calculateHashByIdle(obj) } return calculateSimple(obj) } function calculateSimple (obj) { return new Promise((resolve, reject) => { obj.source = { cancel: reason => reject(reason) } const { file, totalSize } = obj file.arrayBuffer().then(arrayBuffer => { const spark = new sparkMd5.ArrayBuffer() spark.append(arrayBuffer) resolve({ hash: spark.end(), chunks: [{ index: 0, raw: file, size: totalSize }], totalChunks: 1 }) }, reject) }) } function createWorkerCancel (reject) { let worker return { cancel (reason) { if (worker !== null) { worker.terminate() worker = null reject(reason) } }, token (val) { worker = val } } } function calculateHashByWorker (obj, options = {}) { return new Promise((resolve, reject) => { obj.source = createWorkerCancel(reject) const { progress = noop } = options const { token } = obj.source const chunks = createChunks(obj.file) const worker = new Worker('/hash.js') token(worker) worker.onmessage = e => { const { hash, index, total } = e.data if (hash) { token(null) worker.terminate() resolve({ hash, chunks, totalChunks: chunks.length }) } else { progress(index, total) } } worker.onmessageerror = worker.onerror = e => { obj.source?.cancel(e) } worker.postMessage({ chunks }) }) } function createIdleCancel (reject) { let id = null return { cancel (reason) { if (id !== null) { window.cancelIdleCallback(id) reject(reason) } }, token (val) { id = val } } } function calculateHashByIdle (obj, options = {}) { return new Promise((resolve, reject) => { obj.source = createIdleCancel(reject) const { progress = noop } = options const { token } = obj.source const chunks = createChunks(obj.file) const total = chunks.length const spark = new sparkMd5.ArrayBuffer() const appendToSpark = blob => blob.arrayBuffer().then(arrayBuffer => spark.append(arrayBuffer)) let count = 0 const workLoop = async deadline => { while (count < total && deadline.timeRemaining() > 1) { await appendToSpark(chunks[count].raw) count++ progress(count, total) if (count === total) { token(null) resolve({ hash: spark.end(), chunks, totalChunks: chunks.length }) } } count < total && token(window.requestIdleCallback(workLoop)) } progress(count, total) token(window.requestIdleCallback(workLoop)) }) } function createChunks (blob, chunkSize = CHUNK_SIZE) { const chunks = [] try { const { size } = blob let cur = 0 let index = 0 while (cur < size) { chunks.push({ index, raw: blob.slice(cur, cur + chunkSize), size: Math.min(chunkSize, size - cur) }) cur += chunkSize index += 1 } } catch (e) { console.warn(e) return [] } return chunks } function isExists (obj) { const { id, hash } = obj const file = files.find(file => file.id !== id && file.hash === hash) if (file) { return { result: true, message: obj.name === file.name ? `${obj.name}已在上传列表中` : `${obj.name}在上传列表中存在同内容文件${file.name}` } } console.log(`${obj.name}服务器校验`) obj.source = CancelToken.source() return request({ url: '/minio-data/integral/check', method: 'post', data: { identifier: hash }, cancelToken: obj.source.token, custom: true, background: true }).finally(() => { obj.source = null }).then( ({ data }) => { return { result: true, message: `${obj.name}已上传过,文件名为${data.originalName}` } }, ({ errCode, data }) => { if (errCode === '666') { return { result: false, message: data } } return Promise.reject() } ) } async function startCheck (obj) { try { setState(obj, State.CHECK) const { result, message } = await isExists(obj) if (result) { remove(obj, message) return } message.forEach(chunkNumber => { const index = obj.chunks.findIndex(({ index }) => index === chunkNumber) if (~index) { obj.chunks.splice(index, 1) obj.uploaded += 1 } }) } catch (e) { if (!isResolved(obj)) { setInvalid(obj, `${obj.name}校验失败`, e) } return } if (obj.chunks.length) { setState(obj, State.READY) startUpload(obj) } else { startMerge(obj) } } function startUpload (obj) { console.log(`准备分流${obj.name}`) pendingQueue.push(obj) shuntTask() } function shuntTask () { console.log(`分流,当前剩余文件${pendingQueue.length}、线程${idle}`) if (!idle || !pendingQueue.length) { return } const obj = pendingQueue.shift() if (isResolved(obj)) { shuntTask() return } obj.source = CancelToken.source() setState(obj, State.UPLOADING) startChunkTask(obj) } function idleTask () { idle += 1 shuntTask() } function finish (obj) { const { id, hash, name, type } = obj Message({ type: 'success', message: `${name}上传成功` }) setState(obj, State.SUCCESS) emit('uploaded', { id, hash, name, type }) } function transformName (name) { return name.replace(/\.[^.]+$/, '') } function startChunkTask (obj) { idle -= 1 const { name, hash, source, chunks, totalChunks } = obj console.log(`开始上传${name},共${totalChunks}个切片`) start() // 抢占 if (COMPLETE_FIRST) { checkIdle() } function checkIdle () { while (idle && chunks.length) { idle -= 1 start(true) } } async function start (sub) { const chunk = chunks.shift() const { index, raw, size } = chunk try { console.log(`上传${name}第${index + 1}个切片, ${size}`) const formData = new FormData() formData.append('identifier', hash) formData.append('chunkNumber', index) formData.append('file', raw) await request({ url: '/minio-data/chunk/upload', method: 'post', data: formData, timeout: 0, cancelToken: source.token, custom: true, background: true }) console.log(`${name}第${index + 1}个切片上传成功`) obj.uploaded += 1 emitChange() if (obj.uploaded === totalChunks) { idleTask() startMerge(obj) } else if (!chunks.length || (sub && !COMPLETE_FIRST && pendingQueue.length)) { idleTask() } else { start(sub) // 利用空闲线程 checkIdle() } } catch (e) { chunks.unshift(chunk) if (isResolved(obj)) { chunk.error = 0 idleTask() return } chunk.error = (chunk.error || 0) + 1 console.log(`${name}第${index + 1}个切片上传失败第${chunk.error}次`, e) if (chunk.error === 3) { chunk.error = 0 obj.source?.cancel(e) obj.source = null setInvalid(obj, `${name}上传失败`) idleTask() } else { start(sub) } } } } function startMerge (obj) { const { hash, name, type, totalSize, totalChunks, resolutionRatio, duration } = obj console.log(`开始合并${name}`) setState(obj, State.MERGING) tenantRequest({ url: '/minio-data/chunk/reduce', method: 'post', data: addOrg({ identifier: hash, filename: transformName(name), type, totalSize, totalChunks, resolutionRatio, duration }), timeout: 0, custom: true, background: true }).then(({ data, errMessage }) => { if (data) { finish(obj, State.SUCCESS) } else { setInvalid(obj, `${name}合并文件失败`, errMessage) } }, e => { if (!isResolved(obj)) { setInvalid(obj, `${name}合并文件失败`, e) } }) }