将元素从队列中出列时写入文件:Scala fs2 Stream
Posted
技术标签:
【中文标题】将元素从队列中出列时写入文件:Scala fs2 Stream【英文标题】:Writing elements to a file as they are dequeued from the queue : Scala fs2 Stream 【发布时间】:2020-10-08 08:58:45 【问题描述】:我对 fs2 流、进程元素进行了小测试,等待然后将它们写入文件。 我收到一个类型错误,但我无法弄清楚它的含义:
错误:required: fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit],
found : [F[_]]fs2.Pipe[F,Byte,Unit]
导入 java.nio.file.Paths
import cats.effect.Blocker, ExitCode, IO, IOApp, Timer
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO])
import core.Processing._
val blocker: Blocker =
Blocker.liftExecutionContext(
scala.concurrent.ExecutionContext.Implicits.global
)
def storeInQueue: Stream[IO, Unit] =
Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
.metered(Random.between(1, 20).seconds)
.through(q.enqueue)
def getFromQueue: Stream[IO, Unit] =
q.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
.through(
io.file
.writeAll(Paths.get("file.txt"), blocker)
)
object Five extends IOApp
override def run(args: List[String]): IO[ExitCode] =
val program = for
q <- Queue.bounded[IO, Int](10)
b = new StreamTypeIntToDouble(q)
_ <- b.storeInQueue.compile.drain.start
_ <- b.getFromQueue.compile.drain
yield ()
program.as(ExitCode.Success)
【问题讨论】:
【参考方案1】:这里有几个问题,第一个是最令人困惑的。 writeAll
在其上下文F[_]
中是多态的,但它需要F
(以及Sync
)的ContextShift
实例。您目前在范围内没有ContextShift[IO]
,因此编译器不会推断writeAll
的F
应该是IO
。如果你添加这样的东西:
implicit val ioContextShift: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
...那么编译器将按照您的预期推断IO
。
我对这种情况的建议是跳过类型推断。用类型参数写出来只是稍微详细一点:
.through(
io.file
.writeAll[IO](Paths.get("file.txt"), blocker)
)
...这意味着您将收到有用的错误消息,例如缺少类型类实例。
一旦你解决了这个问题,就会有其他几个问题。接下来是在这种情况下使用evalMap
意味着您将拥有()
值的流。如果将其更改为 evalTap
,日志记录的副作用仍会适当地发生,但不会丢失调用它的流的实际值。
最后一个问题是writeAll
需要一个字节流,而您已经给它一个Int
s 流。你想如何处理这种差异取决于预期的语义,但为了举例,.map(_.toByte)
之类的东西会让它编译。
【讨论】:
感谢您的澄清。关于最后一个问题,如果我有一个 String 而不是 Int 或者我创建的类型,例如 Event.. 我认为 toByte 转换会起作用,会吗? 对于一个字符串,您需要getBytes
,它来自Java API。您可能希望显式提供一个字符集,因为如果您不带参数调用 s.getBytes()
,您将获得平台的默认字符集。
我的创作的显式类型,例如事件、操作或简单的人呢?
对于您自己定义的其他类型,您需要指定如何将其序列化为字节。有很多库可以帮助解决这个问题(比如 Scodec,或者 Circe,如果你可以使用 JSON)。您还可以使用 Java 序列化,它可以让您避免额外的依赖,但有许多不愉快的方面 IMO,并且与 fs2 等 Scala 库的理念不符。以上是关于将元素从队列中出列时写入文件:Scala fs2 Stream的主要内容,如果未能解决你的问题,请参考以下文章
.N个人站成一排,从左到右编号为1-N,从左到右报数"1,2,3,……",其中报数为1和2的人出列
Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing