Scala中有FIFO流吗?

Posted

技术标签:

【中文标题】Scala中有FIFO流吗?【英文标题】:Is there a FIFO stream in Scala? 【发布时间】:2011-11-25 01:54:45 【问题描述】:

我正在寻找 Scala 中的 FIFO 流,即提供以下功能的东西

immutable.Stream(可以是有限的并记住已读取的元素的流) mutable.Queue(允许将元素添加到 FIFO)

流应该是可关闭的,并且应该阻止对下一个元素的访问,直到元素被添加或流被关闭。

实际上,我有点惊讶于集合库没有(似乎)包含这样的数据结构,因为它是 IMO 一个非常经典的数据结构。

我的问题:

1) 我是否忽略了什么?是否已有提供此功能的类?

2) 好的,如果它不包含在集合库中,那么它可能只是现有集合类的简单组合。然而,我试图找到这个微不足道的代码,但对于这样一个简单的问题,我的实现看起来仍然相当复杂。这样的 FifoStream 有没有更简单的解决方案?

class FifoStream[T] extends Closeable 

val queue = new Queue[Option[T]]

lazy val stream = nextStreamElem

private def nextStreamElem: Stream[T] = next() match 
    case Some(elem) => Stream.cons(elem, nextStreamElem)
    case None       => Stream.empty


/** Returns next element in the queue (may wait for it to be inserted). */
private def next() = 
    queue.synchronized 
        if (queue.isEmpty) queue.wait()
        queue.dequeue()
    


/** Adds new elements to this stream. */
def enqueue(elems: T*) 
    queue.synchronized 
        queue.enqueue(elems.mapSome(_): _*)
        queue.notify()
    


/** Closes this stream. */
def close() 
    queue.synchronized 
        queue.enqueue(None)
        queue.notify()
    


Paradigmatic 的解决方案(稍作修改)

感谢您的建议。我稍微修改了范例的解决方案,以便 toStream 返回一个不可变的流(允许可重复读取),以便满足我的需求。为了完整起见,这里是代码:

import collection.JavaConversions._
import java.util.concurrent.LinkedBlockingQueue, BlockingQueue

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) 
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match 
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )

【问题讨论】:

如果 FIFO 是可变的,它如何表现得像一个流?你想要某种日志结构吗? 我认为 FIFO 是输入和输出流的组合(构建队列)。那么输出流就是一个 Scala 流,它无论如何都会被懒惰地评估。但也许这从来都不是 Scala 开发人员的意图。我的FifoStream 类中的输出流也有点分离,并通过 FifoStream.stream 访问。但是,我实际上更喜欢可以通过管道传输和读取内容的单个对象。 【参考方案1】:

在 Scala 中,流是“函数式迭代器”。人们期望它们是纯粹的(没有副作用)和不可变的。在您的情况下,每次迭代流时都会修改队列(所以它不是纯粹的)。这会造成很多误解,因为重复两次相同的流会产生两种不同的结果。

话虽如此,您应该使用 Java BlockingQueues,而不是滚动您自己的实现。它们被认为在安全性和性能方面得到了很好的实施。这是我能想到的最简洁的代码(使用您的方法):

import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) 
  def toStream: Stream[A] = queue take match 
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )


object FIFOStream 
  def apply[A]() = new LinkedBlockingQueue

【讨论】:

谢谢。这就是我一直在寻找的。我不知道 JRE 中还有一个 BlockingQueue。挺有用的。它优雅地简化了代码。我稍微修改了您的解决方案,以便 toStream 返回一个稳定的流(允许可重复读取)并将其添加到我的问题的末尾。再次感谢。 这看起来很有用!但应该是def apply[A]() = new FIFOStream[A](new LinkedBlockingQueue) 吗?【参考方案2】:

我假设您正在寻找类似 @​​987654321@ 的东西?

Akka 有一个 BoundedBlockingQueue 这个接口的实现。 java.util.concurrent 中当然有可用的实现。

您也可以考虑使用 Akka 的 actors 来处理您正在做的任何事情。使用 Actor 来通知或推送新事件或消息,而不是拉取。

【讨论】:

BoundedBlockingQueue 很有趣,但它的功能似乎类似于scala.actors.threadpool.LinkedBlockingQueue 等现有队列。这两个队列都是不可关闭的,也没有实现Traversable,因此如果不导入JavaConversions._,就不能用于理解。此外,我注意到当我尝试读取之前未添加的元素时,读取操作并没有阻塞。【参考方案3】:

1) 看来您正在寻找以 Oz 等语言显示的数据流,它支持生产者-消费者模式。此类集合在集合 API 中不可用,但您始终可以自己创建一个。

2) 数据流依赖single-assignment variables的概念(这样就不必在声明点初始化,在初始化之前读取它们会导致阻塞):

val x: Int
startThread 
  println(x)

println("The other thread waits for the x to be assigned")
x = 1

如果语言支持单赋值(或数据流)变量,那么实现这样的流将很简单(参见link)。由于它们不是 Scala 的一部分,因此您必须像以前一样使用 wait-synchronized-notify 模式。

Concurrent queues from Java 也可以用来实现这一点,正如其他用户所建议的那样。

【讨论】:

这是一种非常简洁的语法有趣的方法,但我会在下一个 Scala 版本中添加它。 曾一度考虑过,但是否会这样做仍不清楚。问题在于它的主要用例是数据流并行框架——它必须为数据流并行框架提供某种钩子,而不是通常使用wait-notify 来实现。 DataFlow 并发在 Akka 中可用,Akka 将与 Scala 标准库合并:akka.io/docs/akka/1.2/scala/dataflow.html 是的,但不是作为语言本身的语言结构 - 它可以作为库例程使用。 @axel22 我没发现问题!

以上是关于Scala中有FIFO流吗?的主要内容,如果未能解决你的问题,请参考以下文章

Scala 风格:一个文件中有多个类?

`:_*`(冒号下划线星号)在 Scala 中有啥作用?

我们在 Scala 中有抑制异常吗?

在多模块 Java/scala 项目中有一个通用的 webapp?

Scala 模式匹配详解

Scala Control Structures