Play 2.x:使用 Iteratees 进行响应式文件上传

Posted

技术标签:

【中文标题】Play 2.x:使用 Iteratees 进行响应式文件上传【英文标题】:Play 2.x : Reactive file upload with Iteratees 【发布时间】:2012-08-08 15:44:14 【问题描述】:

我将从问题开始:如何使用 Scala API 的 Iteratee 将文件上传到云存储(在我的情况下是 Azure Blob 存储,但我认为这不是最重要的现在)

背景:

我需要将输入分成大约 1 MB 的块,以将大型媒体文件 (300 MB+) 存储为 Azure 的 BlockBlobs。不幸的是,我的 Scala 知识仍然很差(我的项目是基于 Java 的,Scala 的唯一用途是上传控制器)。

我尝试使用此代码:Why makes calling error or done in a BodyParser's Iteratee the request hang in Play Framework 2.0?(作为InputIteratee) - 它工作得很好,但我可以使用的每个Element 的大小为 8192 字节,因此对于发送数百个字节来说太小了兆字节的文件到云端。

我必须说这对我来说是一种全新的方法,而且很可能我误解了一些东西(不想说我误解了一切;>)

我将不胜感激任何提示或链接,这将有助于我解决该主题。如果有任何类似用法的示例,那将是我了解这个想法的最佳选择。

【问题讨论】:

您是否正在寻找将输入重新分块成更大的块? 【参考方案1】:

基本上,您首先需要将输入重新分块为更大的块,1024 * 1024 字节。

首先让我们有一个Iteratee,它将消耗最多 1m 字节(可以让最后一个块更小)

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

使用它,我们可以构造一个 Enumeratee(适配器),使用一个名为 grouped 的 API 重新组合块:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

此处分组使用Iteratee 来确定要放入每个块中的数量。它为此使用了我们的consumeAMB。这意味着结果是 Enumeratee 将输入重新分块到 1MB 的 Array[Byte]

现在我们需要编写BodyParser,它将使用Iteratee.foldM方法发送每个字节块:

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle) (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  

foldM 传递一个状态并在其传递的函数(S,Input[Array[Byte]]) => Future[S] 中使用它来返回一个新的未来状态。 foldM 不会再次调用该函数,直到 Future 完成并且有可用的输入块。

并且正文解析器将重新分块输入并将其推送到存储中:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

返回一个 Right 表示您在正文解析结束时返回一个正文(恰好是这里的处理程序)。

【讨论】:

很好的解释。两个问题:(1)Iteratee.foldM是做什么的?在这里的 API 文档中找不到:playframework.org/documentation/api/2.0.2/scala/… (2) 为什么需要map(Right(_))?如果你能在你的帖子中添加一些关于这些的东西会很棒。 谢谢萨德克,我需要一些时间来测试一下。 @Sadache: Iteratee.foldM[E,A] 似乎放在master 中,但不是放在2.0.3 中,这是真的吗?我们将为此生产使用稳定版本。您是否计划很快发布新版本? 可以,不过你也可以暂时复制 foldM 方法的代码。 仅供参考,在***.com/questions/12609451/… 有更多关于这个答案的讨论。【参考方案2】:

如果您的目标是流式传输到 S3,这里有一个我已经实现并测试过的助手:

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = 
  import scala.collection.JavaConversions._

  val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  val initResponse = s3.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped 
    Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
  

  val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty)  case (etags, bytes) =>
    val uploadRequest = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withPartNumber(etags.length + 1)
      .withUploadId(uploadId)
      .withInputStream(new ByteArrayInputStream(bytes))
      .withPartSize(bytes.length)

    val etag = Future  s3.uploadPart(uploadRequest).getPartETag 
    etag.map(etags :+ _)
  

  val futETags = enum &> rechunker |>>> uploader

  futETags.map  etags =>
    val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
    s3.completeMultipartUpload(compRequest)
  .recoverWith  case e: Exception =>
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
    Future.failed(e)
  


【讨论】:

你如何在控制器上使用这个方法?【参考方案3】:

将以下内容添加到您的配置文件中

play.http.parser.maxMemoryBuffer=256K

【讨论】:

【参考方案4】:

对于那些也试图找出这个流问题的解决方案的人,你也可以使用parse.multipartFormData 中已经实现的东西,而不是编写一个全新的 BodyParser。 您可以实现类似下面的内容来覆盖默认处理程序 handleFilePartAsTemporaryFile

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = 
  handleFilePart 
    case FileInfo(partName, filename, contentType) =>

      (rechunkAdapter &>> writeToS3).map 
        _ =>
          val compRequest = new CompleteMultipartUploadRequest(...)
          amazonS3Client.completeMultipartUpload(compRequest)
          ...
      
  


def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

我能够完成这项工作,但我仍然不确定整个上传过程是否是流式传输的。我尝试了一些大文件,似乎只有在客户端发送了整个文件后才开始 S3 上传。

我查看了上面的解析器实现,我认为一切都是使用 Iteratee 连接的,因此文件应该被流式传输。 如果有人对此有所了解,那将非常有帮助。

【讨论】:

以上是关于Play 2.x:使用 Iteratees 进行响应式文件上传的主要内容,如果未能解决你的问题,请参考以下文章

使用Iteratees使用Play Scala将文件直接上传到S3 chunk-by-chunk

Iteratees和FRP之间有啥联系?

如何在 Play 2.2.x 的 dist 任务中禁用 ScalaDoc 生成(使用 project/build.scala)?

Play 2.x:如何使用普通按钮发出 AJAX 请求

如何在构建定义中更改 Play 2.3.x 中的默认端口 9000?

为啥在 Play 2.3 项目中 sbt 编译后 sbt 失败并显示 NoClassDefFoundError: play/Play$ in Play 2.2.x 项目?