Akka SourceQueue发送列表元素

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Akka SourceQueue发送列表元素相关的知识,希望对你有一定的参考价值。

我有一个List[String]和一个Source.queue。我想在一段时间后提供这个队列字符串元素。像这样的东西:

val data : List[String] = ""
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(data(??))

有人可以帮我吗?

编辑:我找到了一种方法,但寻找更优雅的方式

val tick = Source.tick(0 second, 2 second, "tick").zipWithIndex.limit(data.length)

tick.runForeach(t => {
  queue.offer(data(t._2.toInt))
}) 
答案

要在每个元素的特定时间间隔内将List[String]中的元素发送到队列,请按以下方式使用Source#delay

val data: List[String] = ???

Source(data)
  .delay(2.seconds, DelayOverflowStrategy.backpressure)
  .withAttributes(Attributes.inputBuffer(1, 1))
  .mapAsync(1)(x => queue.offer(x))
  .runWith(Sink.ignore)

使用withAttributes将输入缓冲区大小设置为1,因为默认值为16,并使用DelayOverflowStrategy.backpressure。此外,使用mapAsync,因为offer方法返回Future

或者,使用Source#throttle

Source(data)
  .throttle(1, 2.seconds, 1, ThrottleMode.Shaping)
  .mapAsync(1)(x => queue.offer(x))
  .runWith(Sink.ignore)

以上是关于Akka SourceQueue发送列表元素的主要内容,如果未能解决你的问题,请参考以下文章

如何在非对称系统中将对象发送到远程 akka 演员

Akka 经典 - 发送自定义类消息类型

Akka 在生成期间键入发送消息

Akka,发送Udp失败,“无法分配请求的地址”

akka-http 发送连续的分块 http 响应(流)

Akka源码分析-Persistence-AtLeastOnceDelivery