Play 2.x:使用 Iteratees 进行响应式文件上传
Posted
技术标签:
【中文标题】Play 2.x:使用 Iteratees 进行响应式文件上传【英文标题】:Play 2.x : Reactive file upload with Iteratees 【发布时间】:2012-08-08 15:44:14 【问题描述】:我将从问题开始:如何使用 Scala API 的 Iteratee
将文件上传到云存储(在我的情况下是 Azure Blob 存储,但我认为这不是最重要的现在)
背景:
我需要将输入分成大约 1 MB 的块,以将大型媒体文件 (300 MB+) 存储为 Azure 的 BlockBlobs
。不幸的是,我的 Scala 知识仍然很差(我的项目是基于 Java 的,Scala 的唯一用途是上传控制器)。
我尝试使用此代码:Why makes calling error or done in a BodyParser's Iteratee the request hang in Play Framework 2.0?(作为Input
Iteratee
) - 它工作得很好,但我可以使用的每个Element
的大小为 8192 字节,因此对于发送数百个字节来说太小了兆字节的文件到云端。
我必须说这对我来说是一种全新的方法,而且很可能我误解了一些东西(不想说我误解了一切;>)
我将不胜感激任何提示或链接,这将有助于我解决该主题。如果有任何类似用法的示例,那将是我了解这个想法的最佳选择。
【问题讨论】:
您是否正在寻找将输入重新分块成更大的块? 【参考方案1】:基本上,您首先需要将输入重新分块为更大的块,1024 * 1024 字节。
首先让我们有一个Iteratee
,它将消耗最多 1m 字节(可以让最后一个块更小)
val consumeAMB =
Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
使用它,我们可以构造一个 Enumeratee
(适配器),使用一个名为 grouped 的 API 重新组合块:
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
Enumeratee.grouped(consumeAMB)
此处分组使用Iteratee
来确定要放入每个块中的数量。它为此使用了我们的consumeAMB。这意味着结果是 Enumeratee
将输入重新分块到 1MB 的 Array[Byte]
。
现在我们需要编写BodyParser
,它将使用Iteratee.foldM
方法发送每个字节块:
val writeToStore: Iteratee[Array[Byte],_] =
Iteratee.foldM[Array[Byte],_](connectionHandle) (c,bytes) =>
// write bytes and return next handle, probable in a Future
foldM 传递一个状态并在其传递的函数(S,Input[Array[Byte]]) => Future[S]
中使用它来返回一个新的未来状态。 foldM 不会再次调用该函数,直到 Future
完成并且有可用的输入块。
并且正文解析器将重新分块输入并将其推送到存储中:
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
返回一个 Right 表示您在正文解析结束时返回一个正文(恰好是这里的处理程序)。
【讨论】:
很好的解释。两个问题:(1)Iteratee.foldM
是做什么的?在这里的 API 文档中找不到:playframework.org/documentation/api/2.0.2/scala/… (2) 为什么需要map(Right(_))
?如果你能在你的帖子中添加一些关于这些的东西会很棒。
谢谢萨德克,我需要一些时间来测试一下。
@Sadache: Iteratee.foldM[E,A] 似乎放在master
中,但不是放在2.0.3
中,这是真的吗?我们将为此生产使用稳定版本。您是否计划很快发布新版本?
可以,不过你也可以暂时复制 foldM 方法的代码。
仅供参考,在***.com/questions/12609451/… 有更多关于这个答案的讨论。【参考方案2】:
如果您的目标是流式传输到 S3,这里有一个我已经实现并测试过的助手:
def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
(implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] =
import scala.collection.JavaConversions._
val initRequest = new InitiateMultipartUploadRequest(bucket, key)
val initResponse = s3.initiateMultipartUpload(initRequest)
val uploadId = initResponse.getUploadId
val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped
Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) case (etags, bytes) =>
val uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withPartNumber(etags.length + 1)
.withUploadId(uploadId)
.withInputStream(new ByteArrayInputStream(bytes))
.withPartSize(bytes.length)
val etag = Future s3.uploadPart(uploadRequest).getPartETag
etag.map(etags :+ _)
val futETags = enum &> rechunker |>>> uploader
futETags.map etags =>
val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
s3.completeMultipartUpload(compRequest)
.recoverWith case e: Exception =>
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
Future.failed(e)
【讨论】:
你如何在控制器上使用这个方法?【参考方案3】:将以下内容添加到您的配置文件中
play.http.parser.maxMemoryBuffer=256K
【讨论】:
【参考方案4】:对于那些也试图找出这个流问题的解决方案的人,你也可以使用parse.multipartFormData 中已经实现的东西,而不是编写一个全新的 BodyParser。 您可以实现类似下面的内容来覆盖默认处理程序 handleFilePartAsTemporaryFile。
def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] =
handleFilePart
case FileInfo(partName, filename, contentType) =>
(rechunkAdapter &>> writeToS3).map
_ =>
val compRequest = new CompleteMultipartUploadRequest(...)
amazonS3Client.completeMultipartUpload(compRequest)
...
def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)
我能够完成这项工作,但我仍然不确定整个上传过程是否是流式传输的。我尝试了一些大文件,似乎只有在客户端发送了整个文件后才开始 S3 上传。
我查看了上面的解析器实现,我认为一切都是使用 Iteratee 连接的,因此文件应该被流式传输。 如果有人对此有所了解,那将非常有帮助。
【讨论】:
以上是关于Play 2.x:使用 Iteratees 进行响应式文件上传的主要内容,如果未能解决你的问题,请参考以下文章
使用Iteratees使用Play Scala将文件直接上传到S3 chunk-by-chunk
如何在 Play 2.2.x 的 dist 任务中禁用 ScalaDoc 生成(使用 project/build.scala)?
如何在构建定义中更改 Play 2.3.x 中的默认端口 9000?
为啥在 Play 2.3 项目中 sbt 编译后 sbt 失败并显示 NoClassDefFoundError: play/Play$ in Play 2.2.x 项目?