typescript RxJS文件上传器演示

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了typescript RxJS文件上传器演示相关的知识,希望对你有一定的参考价值。

import { Fetch, Utils } from 'teambition-sdk'
import { Observable } from 'rxjs/Observable'
import { Subject } from 'rxjs/Subject'
import { ReplaySubject } from 'rxjs/ReplaySubject'
import { AjaxResponse } from 'rxjs/observable/dom/AjaxObservable'
import { Subscriber } from 'rxjs/Subscriber'
import * as config from 'config'

export interface ChunkMeta {
  fileType: string
  fileName: string
  mimeType: string
  fileCategory: string
  fileSize: number
  chunkSize: number
  chunks: number
  created: string
  fileMD5: string
  lastUpdated: string
  uploadedChunks: number[]
  storage: string
  token: {
    userId: string
    exp: number
    storage: string
  }
  fileKey: string
  downloadUrl: string
  thumbnailUrl: string
  previewUrl: string
}

export class ChunkUploader {
  static resume$ = new Subject<void>()
  static pause$ = new Subject<void>()

  private fetch = new Fetch()
  private progress = new Subject<number>()

  chunkUploader$: Observable<ChunkMeta> = this.getChunkMeta()
    .concatMap((chunkMeta) => {
      const progress$ = this.progress
      const blobs = this.slice(this.file, chunkMeta.chunks, chunkMeta.chunkSize)
      const uploaded: number[] = []
      const dists = blobs.map((blob, index) => {
        let currentLoaded = 0
        const result = this.uploadChunk(chunkMeta, index, blob)
        return result.progress$
          .catch(() => Promise.resolve({ loaded: uploaded[index] * this.file.size }))
          .do((r) => {
            currentLoaded = r.loaded / this.file.size
            uploaded[index] = currentLoaded
            const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
            progress$.next(percent)
            if (percent >= 0.99) {
              progress$.complete()
            }
          })
      })

      const uploadStream = Observable.from(dists)
        .mergeAll(this.concurrency)

      return Observable.forkJoin(uploadStream)
        .mapTo(chunkMeta)
    })

  progress$ = this.progress
    .distinctUntilChanged((x, y) => x - y >= 0)

  settleFile$ = this.chunkUploader$
    .concatMap((meta) => {
      return this.fetch.post(`/upload/chunk/${meta.fileKey}`)
        .retryWhen(() => ChunkUploader.resume$)
        .catch((e) => Promise.resolve(e))
    })

  constructor(
    private file: File,
    private strikerToken: string,
    private chunkRetryCount = 2,
    private concurrency = 3
  ) { }

  slice(file: File, n: number, chunkSize: number): Blob[] {
    const result: Blob[] = []
    for (let i = 0; i < n; i ++) {
      const startSize = i * chunkSize
      const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize)
      result.push(slice)
    }
    return result
  }

  private getChunkMeta() {
    this.fetch.setAPIHost(config.FILE_HOST)
    this.fetch.setHeaders({
      Authorization: this.strikerToken
    })
    return this.fetch
      .post<ChunkMeta>(`/upload/chunk`, {
        fileSize: this.file.size,
        fileMD5: Utils.uuid(),
        lastUpdated: this.file.lastModifiedDate,
        fileName: this.file.name
      })
  }

  private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) {
    const host = `${config.FILE_HOST}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
    const complete$ = new ReplaySubject<void>(1)
    const progress$: Observable<ProgressEvent | AjaxResponse> = Observable
      .create((subscriber: Subscriber<ProgressEvent | AjaxResponse>) => {
        const ajax$ = Observable.ajax({
          url: host,
          body: blob,
          method: 'post',
          crossDomain: true,
          headers: {
            Authorization: this.strikerToken,
            'Content-Type': 'application/octet-stream'
          },
          progressSubscriber: subscriber
        })
          .retry(this.chunkRetryCount)
          .takeUntil(ChunkUploader.pause$)
          .catch((e) => Promise.resolve(e))
          .repeatWhen(() => {
            return ChunkUploader.resume$
              .takeUntil(complete$)
          })

      const subscription = ajax$.subscribe()
      return () => subscription.unsubscribe()
    })
      .retryWhen(() => ChunkUploader.resume$)
      .publish()
      .refCount()

    return { progress$, meta }
  }
}

以上是关于typescript RxJS文件上传器演示的主要内容,如果未能解决你的问题,请参考以下文章

如何解决 TypeScript 2.4 和 RxJS 5.x 中的“主题不正确地扩展 Observable”错误?

使用 RxJS 在 TypeScript 中创建 BaseObserver

Angular,转换 JSON,Rxjs 运算符,Typescript

text Angular / Typescript / RxJs - 最佳实践

typescript rxjs订阅

typescript mergeMap Rxjs嵌套请求