使用Iteratees使用Play Scala将文件直接上传到S3 chunk-by-chunk

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Iteratees使用Play Scala将文件直接上传到S3 chunk-by-chunk相关的知识,希望对你有一定的参考价值。

我试图使用Iteratees将文件直接上传到s3。我仍然是函数式编程的新手,并且很难将一些工作代码拼凑在一起。

我编写了一个iteratee,它处理上传文件的块并将它们发送到S3。上传失败,最后出错。

请帮我解决这个问题。

以下是我提出的代码

控制器处理程序

  def uploadFile = Action.async(BodyParser(rh => S3UploadHelper("bucket-name").s3Iteratee() ))  { implicit request =>
    Future {
      if(uploadLogger.isInfoEnabled) uploadLogger.info(s"Contents of Uploaded file :: 
 " + request.body)
      Ok(views.html.index("File uploaded"))
    }
  }

助手班

case class S3UploadHelper(bucket: String, key: String = UUID.generate()) {

  private val AWS_ACCESS_KEY = ""
  private val AWS_SECRET_KEY = ""
  private val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
  val amazonS3Client = new AmazonS3Client(yourAWSCredentials)


  private val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  private val initResponse = amazonS3Client.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val uploadLogger = Logger("upload")

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      if(uploadLogger.isDebugEnabled) uploadLogger.debug(">> " + String.valueOf(in.e))
      val etag = Future { amazonS3Client.uploadPart(uploadRequest).getPartETag }
      etag.map(etags :+ _)
      Await.result(etag, 1.seconds)
      s3Iteratee(etags)
    case in @ Empty => s3Iteratee(etags)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      val result = amazonS3Client.completeMultipartUpload(compRequest)
      Done(Right(result), in)
    case in => s3Iteratee(etags)
  }

}

虽然Iteratee似乎工作,我能够按块处理文件块,但上传失败并出现奇怪的错误。这是日志

[debug] upload - >> [B@1df9048d
[debug] upload - >> [B@152dcf59
[debug] upload - >> [B@7cfeb0d8
[debug] upload - >> [B@136844c5
[debug] upload - >> [B@16f41590
[debug] upload - >> [B@6dd85710
[debug] upload - >> [B@64294203
[debug] upload - >> [B@35366c2f
[debug] upload - >> [B@358a78c
[debug] upload - >> [B@2c171020
[debug] upload - >> [B@20076fb
[debug] upload - >> [B@4d13580
[debug] upload - >> [B@42738651
[debug] upload - >> [B@5671082f
[debug] upload - >> [B@57c70bb4
[debug] upload - >> [B@4154394f
[debug] upload - >> [B@4f93cf15
[debug] upload - >> [B@4bac523f
[debug] upload - >> [B@eaec52e
[debug] upload - >> [B@6ed00bf5
[debug] upload - >> [B@3f6a8a5d
[debug] upload - >> [B@16fe1a25
[debug] upload - >> [B@6e813a61
[debug] upload - >> [B@e01be7
[debug] upload - >> [B@6bb351c4
[debug] upload - >> [B@dfa51a5
[debug] upload - >> [B@6acf2049
[debug] upload - >> [B@6a7021d4
[debug] upload - >> [B@1b3c602f
[debug] upload - >> [B@44146d94
[debug] upload - >> [B@574ac037
[debug] upload - >> [B@3cdf258b
[debug] upload - >> [B@441a0727
[debug] upload - >> [B@2385aafd
[debug] upload - >> [B@224f9dc2
[debug] upload - >> [B@6779077d
[debug] upload - >> [B@734e178a
[debug] upload - >> [B@7d92895c
[debug] upload - >> [B@23edaaa1
[debug] upload - >> [B@c00134e
[debug] upload - >> [B@ff1a703
[error] play - Cannot invoke the action, eventually got an error: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 98h72s0EBA7653AD, AWS Error Code: MalformedXML, AWS Error Message: The XML you provided was not well-formed or did not validate against our published schema, S3 Extended Request ID: R7e44g8oRy5b4xd7MU++atibwrBSRFezeMxNCXE38gyzcwci5Zf
[error] application - 

! @6k2maob49 - Internal server error, for (POST) [/v1/file_upload] ->

play.api.Application$$anon$1: Execution exception[[AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema]]
        at play.api.Application$class.handleError(Application.scala:296) ~[play_2.10-2.3.2.jar:2.3.2]
        at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at scala.Option.map(Option.scala:145) [scala-library.jar:na]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964) ~[aws-java-sdk-1.3.11.jar:na]
答案

我过去做过这个,亚马逊s3需要5Mb块,我最后还是返回元组,你可以根据你的要求改变。

val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))

def my_parser = BodyParser { 

val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

multipartFormData(Multipart.handleFilePart({

  case Multipart.FileInfo(partName, file_name, content_type) => {

    val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
    val object_id_key = if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id else object_id + file_name.substring(file_name.lastIndexOf('.'))
    var position = 0
    val etags = new java.util.ArrayList[PartETag]()

    val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
    val initResponse = client.initiateMultipartUpload(initRequest)
    println("fileName = " + file_name)
    println("contentType = " + content_type)

    (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
      Future {
        println("got a chunk!  :" + c + " size in KB: " + (bytes.length / 1024));
        val is = new java.io.ByteArrayInputStream(bytes)

        val uploadRequest = new UploadPartRequest()
          .withBucketName(bucket).withKey(object_id_key)
          .withUploadId(initResponse.getUploadId())
          .withPartNumber(c)
          .withFileOffset(position)
          .withInputStream(is)
          .withPartSize(bytes.length)

        etags.add(client.uploadPart(uploadRequest).getPartETag)
        position = position + bytes.length

        c + 1
      }
    }).map { v =>
      try {
        val compRequest = new CompleteMultipartUploadRequest(
          bucket,
          object_id_key,
          initResponse.getUploadId(),
          etags)
        client.completeMultipartUpload(compRequest)
        println("Finished uploading " + file_name)   
        client.setObjectAcl(bucket, object_id_key, com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
        (object_id_key, file_name, content_type.getOrElse("application/octet-stream")) 
      } catch {
        case e: Exception => {
          println("S3 upload Ex " + e.getMessage())
          val abortMPURequest = new AbortMultipartUploadRequest("xxxxxxx", object_id, initResponse.getUploadId())
          client.abortMultipartUpload(abortMPURequest);
         ("error", file_name, content_type.getOrElse("application/octet-stream"))
        }
      }
    }
  }
}))

}

以上是关于使用Iteratees使用Play Scala将文件直接上传到S3 chunk-by-chunk的主要内容,如果未能解决你的问题,请参考以下文章

Play 2.x:使用 Iteratees 进行响应式文件上传

Play 2.2:使用 Play Caching (Scala) 对代码进行单元测试时出现问题

Scala,使用 play 2.5 将 joda.LocalDateTime 转换为 json

使用ScalaTest和Selenium测试Scala.JS + Play

Play framework + Scala:使用动作组合注入依赖

如何使用 Scala 登录 Play Framework?