如何将 Source[ByteString, Any] 转换为 InputStream

Posted

技术标签:

【中文标题】如何将 Source[ByteString, Any] 转换为 InputStream【英文标题】:How to convert Source[ByteString, Any] to InputStream 【发布时间】:2015-08-10 17:48:32 【问题描述】:

akka-http 表示使用 multipart/form-data 编码上传的文件为Source[ByteString, Any]。我需要使用需要 InputStream 的 Java 库来解组它。

如何将Source[ByteString, Any] 变成InputStream

【问题讨论】:

我没有投反对票,但我猜这个问题背后似乎没有任何工作。你发现了一个障碍。你试图绕过它做什么?很高兴看到有人先投入一些时间进行研究,在来这里之前用尽这些选择并告诉我们您已经尝试过什么。 仅供参考,在 groups.google.com/forum/#!topic/akka-user/4WvOrFtewQY 的 akka 用户列表中似乎对此进行了一些讨论,还有一个关于同一 github.com/akka/akka/issues/17338 的未决案例 我也对否决票感到好奇。我认为这个问题是有效的,因为该解决方案既不是由库开箱即用提供的,也不是在文档中描述的。答案也很有帮助,希望能对其他人有所帮助。 【参考方案1】:

从 2.x 版开始,您可以使用以下代码实现此目的:

import akka.stream.scaladsl.StreamConverters
...
val inputStream: InputStream = entity.dataBytes
        .runWith(
           StreamConverters.asInputStream(FiniteDuration(3, TimeUnit.SECONDS))
        )

见:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.1/scala/migration-guide-1.0-2.x-scala.html

注意:在 2.0.2 版本中被破坏并在 2.4.2 中修复

【讨论】:

它已损坏,但是从 2.0.2 开始:github.com/akka/akka/issues/19392 并且已接受答案的解决方案可能由于融合而死锁(发生了不太可能的情况!;) @kostya 感谢您指出此问题,但修复后语法似乎保持不变:github.com/akka/akka/pull/19575 我同意,一旦错误得到修复,这将是首选解决方案。特别是考虑到当前接受的答案中的方法并不总是适用于 2.0 @lisak 在这种情况下,您可以执行 data.runFold(ByteString.empty)(_ ++ _).toArray ,其中数据类型为 Source[ByteString, Any] 它会在物化时将整个数据集加载到内存中吗?【参考方案2】:

您可以尝试使用写入PipedOutputStreamOutputStreamSink 并将其馈送到您的其他代码用作其输入流的PipedInputStream。这是一个有点粗略的想法,但它可以工作。代码如下所示:

import akka.util.ByteString
import akka.stream.scaladsl.Source
import java.io.PipedInputStream
import java.io.PipedOutputStream
import akka.stream.io.OutputStreamSink
import java.io.BufferedReader
import java.io.InputStreamReader
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer

object PipedStream extends App
  implicit val system = ActorSystem("flowtest")
  implicit val mater = ActorFlowMaterializer()

  val lines = for(i <- 1 to 100) yield ByteString(s"This is line $i\n")
  val source = Source(lines)

  val pipedIn = new PipedInputStream()
  val pipedOut = new PipedOutputStream(pipedIn)      
  val flow = source.to(OutputStreamSink(() => pipedOut))
  flow.run()

  val reader = new BufferedReader(new InputStreamReader(pipedIn))
  var line:String = reader.readLine
  while(line != null)
    println(s"Reader received line: $line")
    line = reader.readLine
             

【讨论】:

顺便说一句,PipedOutputStream javadocs 提到不建议尝试在单个线程中同时使用PipedOutputStreamPipedInputStream,因为它可能会使线程死锁。 akka 中有什么方法可以确保写入和读取发生在不同的线程中? 我认为您不太可能最终出现在同一个线程上。 HTTP 系统使用ExecutionContext 来服务具有您要阅读的多部分表单的请求。该请求由该 EC 的一个线程提供服务。如果您使用相同的 EC 启动另一个 RunnableFlow,您将获得另一个线程,因此那里没有死锁。如果您真的很担心,那么您可以让读取表单数据的流程使用完全不同的ExecutionContext 谢谢你的解释,虽然我有点担心你回答中的“不太可能”这个词;)它可能会发生(并且将会)或不会发生。 我说“不太可能”,因为它设置正确它不会发生。但是例如,假设您正在测试,并且您对所有事情都使用 CallingThreadDispatcher,那么它将陷入僵局。【参考方案3】:

您可以从 ByteString 中提取一个插入器,然后获取 InputStream。像这样的东西(伪代码):

source.map  data: ByteString =>
  data.iterator.asInputStream

更新

以 Multipart.FormData 开头的更详细的示例

def isSourceFromFormData(formData: Multipart.FormData): Source[InputStream, Any] = 
 formData.parts.map  part => 
   part.entity.dataBytes
   .map(_.iterator.asInputStream)
.flatten(FlattenStrategy.concat)

【讨论】:

现在的问题是如何将多个输入流转换成单个输入流 您需要在此处使用源代码。正如您在问题中提到的,您将一些二进制文件作为多部分形式的部分上传。因此,您所要做的就是将源的 ByteString 源(每个文件 1 个源)转换为唯一的 Source[ByteString]。 akka 流中的此操作应执行扁平化操作。像这样的东西(伪代码):source of sources.flatten(FlattenStrategy.concat)。希望这会有所帮助。 没那么简单。每个文件由 Source[ByteString] 表示,每个 Source 可能有多个 ByteString。 也许您需要多次展平?无论如何,我认为你已经掌握了正确完成这项任务的所有信息。 鉴于您的情况,我认为您应该使用 source.runWith(StreamConverters.asInputStream) 并将该输入流提供给您的 Java API。

以上是关于如何将 Source[ByteString, Any] 转换为 InputStream的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Akka ByteString 转换为字符串?

将 Data.ByteString.Lazy 转换为 CStringLen 的最有效方法

如何通过流式ByteString跟踪进度?

如何在现有的 Haskell 代码中从 String 转到 Data.ByteString.Lazy?

PlayWS发布多部分表单数据

java protobuf如何从int创建ByteString