Scala中有FIFO流吗?
Posted
技术标签:
【中文标题】Scala中有FIFO流吗?【英文标题】:Is there a FIFO stream in Scala? 【发布时间】:2011-11-25 01:54:45 【问题描述】:我正在寻找 Scala 中的 FIFO 流,即提供以下功能的东西
immutable.Stream(可以是有限的并记住已读取的元素的流) mutable.Queue(允许将元素添加到 FIFO)流应该是可关闭的,并且应该阻止对下一个元素的访问,直到元素被添加或流被关闭。
实际上,我有点惊讶于集合库没有(似乎)包含这样的数据结构,因为它是 IMO 一个非常经典的数据结构。
我的问题:
1) 我是否忽略了什么?是否已有提供此功能的类?
2) 好的,如果它不包含在集合库中,那么它可能只是现有集合类的简单组合。然而,我试图找到这个微不足道的代码,但对于这样一个简单的问题,我的实现看起来仍然相当复杂。这样的 FifoStream 有没有更简单的解决方案?
class FifoStream[T] extends Closeable
val queue = new Queue[Option[T]]
lazy val stream = nextStreamElem
private def nextStreamElem: Stream[T] = next() match
case Some(elem) => Stream.cons(elem, nextStreamElem)
case None => Stream.empty
/** Returns next element in the queue (may wait for it to be inserted). */
private def next() =
queue.synchronized
if (queue.isEmpty) queue.wait()
queue.dequeue()
/** Adds new elements to this stream. */
def enqueue(elems: T*)
queue.synchronized
queue.enqueue(elems.mapSome(_): _*)
queue.notify()
/** Closes this stream. */
def close()
queue.synchronized
queue.enqueue(None)
queue.notify()
Paradigmatic 的解决方案(稍作修改)
感谢您的建议。我稍微修改了范例的解决方案,以便 toStream 返回一个不可变的流(允许可重复读取),以便满足我的需求。为了完整起见,这里是代码:
import collection.JavaConversions._
import java.util.concurrent.LinkedBlockingQueue, BlockingQueue
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() )
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
【问题讨论】:
如果 FIFO 是可变的,它如何表现得像一个流?你想要某种日志结构吗? 我认为 FIFO 是输入和输出流的组合(构建队列)。那么输出流就是一个 Scala 流,它无论如何都会被懒惰地评估。但也许这从来都不是 Scala 开发人员的意图。我的FifoStream
类中的输出流也有点分离,并通过 FifoStream.stream 访问。但是,我实际上更喜欢可以通过管道传输和读取内容的单个对象。
【参考方案1】:
在 Scala 中,流是“函数式迭代器”。人们期望它们是纯粹的(没有副作用)和不可变的。在您的情况下,每次迭代流时都会修改队列(所以它不是纯粹的)。这会造成很多误解,因为重复两次相同的流会产生两种不同的结果。
话虽如此,您应该使用 Java BlockingQueues,而不是滚动您自己的实现。它们被认为在安全性和性能方面得到了很好的实施。这是我能想到的最简洁的代码(使用您的方法):
import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] )
def toStream: Stream[A] = queue take match
case Some(a) => Stream cons ( a, toStream )
case None => Stream empty
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
object FIFOStream
def apply[A]() = new LinkedBlockingQueue
【讨论】:
谢谢。这就是我一直在寻找的。我不知道 JRE 中还有一个 BlockingQueue。挺有用的。它优雅地简化了代码。我稍微修改了您的解决方案,以便 toStream 返回一个稳定的流(允许可重复读取)并将其添加到我的问题的末尾。再次感谢。 这看起来很有用!但应该是def apply[A]() = new FIFOStream[A](new LinkedBlockingQueue)
吗?【参考方案2】:
我假设您正在寻找类似 @987654321@ 的东西?
Akka 有一个 BoundedBlockingQueue 这个接口的实现。 java.util.concurrent 中当然有可用的实现。
您也可以考虑使用 Akka 的 actors 来处理您正在做的任何事情。使用 Actor 来通知或推送新事件或消息,而不是拉取。
【讨论】:
BoundedBlockingQueue
很有趣,但它的功能似乎类似于scala.actors.threadpool.LinkedBlockingQueue
等现有队列。这两个队列都是不可关闭的,也没有实现Traversable
,因此如果不导入JavaConversions._
,就不能用于理解。此外,我注意到当我尝试读取之前未添加的元素时,读取操作并没有阻塞。【参考方案3】:
1) 看来您正在寻找以 Oz 等语言显示的数据流,它支持生产者-消费者模式。此类集合在集合 API 中不可用,但您始终可以自己创建一个。
2) 数据流依赖single-assignment variables的概念(这样就不必在声明点初始化,在初始化之前读取它们会导致阻塞):
val x: Int
startThread
println(x)
println("The other thread waits for the x to be assigned")
x = 1
如果语言支持单赋值(或数据流)变量,那么实现这样的流将很简单(参见link)。由于它们不是 Scala 的一部分,因此您必须像以前一样使用 wait
-synchronized
-notify
模式。
Concurrent queues from Java 也可以用来实现这一点,正如其他用户所建议的那样。
【讨论】:
这是一种非常简洁的语法有趣的方法,但我会在下一个 Scala 版本中添加它。 曾一度考虑过,但是否会这样做仍不清楚。问题在于它的主要用例是数据流并行框架——它必须为数据流并行框架提供某种钩子,而不是通常使用wait
-notify
来实现。
DataFlow 并发在 Akka 中可用,Akka 将与 Scala 标准库合并:akka.io/docs/akka/1.2/scala/dataflow.html
是的,但不是作为语言本身的语言结构 - 它可以作为库例程使用。
@axel22 我没发现问题!以上是关于Scala中有FIFO流吗?的主要内容,如果未能解决你的问题,请参考以下文章