Scala priority queue not ordering properly?

Posted: 2011-10-16 03:41:27

Question:

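I'm seeing some strange behavior with Scala's collection.mutable.PriorityQueue. I'm performing an external sort and testing it with 1M records. Each time I run the test and verify the results, between 10 and 20 records are not sorted properly. When I replace the Scala PriorityQueue implementation with java.util.PriorityQueue, it works 100% of the time. Any ideas?

Here's the code (sorry, it's a bit long...). I test it using the tools gensort -a 1000000 and valsort from http://sortbenchmark.org/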

def externalSort(inFileName: String, outFileName: String)
    (implicit ord: Ordering[String]): Int = {

  val MaxTempFiles = 1024
  val TempBufferSize = 4096

  val inFile = new java.io.File(inFileName)

  /** Partitions input file and sorts each partition */
  def partitionAndSort()(implicit ord: Ordering[String]):
      List[java.io.File] = {

    /** Gets block size to use */
    def getBlockSize: Long = {
      var blockSize = inFile.length / MaxTempFiles
      val freeMem = Runtime.getRuntime().freeMemory()
      if (blockSize < freeMem / 2)
        blockSize = freeMem / 2
      else if (blockSize >= freeMem)
        System.err.println("Not enough free memory to use external sort.")
      blockSize
    }

    /** Sorts and writes data to temp files */
    def writeSorted(buf: List[String]): java.io.File = {
      // Create new temp buffer
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()

      // Sort buffer and write it out to tmp file
      val out = new java.io.PrintWriter(tmp)
      try {
        for (l <- buf.sorted) {
          out.println(l)
        }
      } finally {
        out.close()
      }

      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    var currentSize = 0

    // Read input and divide into blocks
    for (line <- io.Source.fromFile(inFile).getLines()) {
      if (currentSize > blockSize) {
        tmpFiles ::= writeSorted(buf)
        buf = List[String]()
        currentSize = 0
      }
      buf ::= line
      currentSize += line.length() * 2 // 2 bytes per char
    }
    if (currentSize > 0) tmpFiles ::= writeSorted(buf)

    tmpFiles
  }

  /** Merges results of sorted partitions into one output file */
  def mergeSortedFiles(fs: List[java.io.File])
      (implicit ord: Ordering[String]): Int = {

    /** Temp file buffer for reading lines */
    class TempFileBuffer(val file: java.io.File) {

      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      private var curLine: String = ""

      readNextLine() // prep first value

      def currentLine = curLine

      def isEmpty = curLine == null

      def readNextLine() {
        if (curLine == null) return

        try {
          curLine = in.readLine()
        } catch {
          case _: java.io.EOFException => curLine = null
        }

        if (curLine == null) in.close()
      }

      override protected def finalize() {
        try {
          in.close()
        } finally {
          super.finalize()
        }
      }
    }

    val wrappedOrd = new Ordering[TempFileBuffer] {
      def compare(o1: TempFileBuffer, o2: TempFileBuffer): Int = {
        ord.compare(o1.currentLine, o2.currentLine)
      }
    }

    val pq = new collection.mutable.PriorityQueue[TempFileBuffer](
      )(wrappedOrd)

    // Init queue with item from each file
    for (tmp <- fs) {
      val buf = new TempFileBuffer(tmp)
      if (!buf.isEmpty) pq += buf
    }

    var count = 0

    val out = new java.io.PrintWriter(new java.io.File(outFileName))
    try {
      // Read each value off of queue
      while (pq.size > 0) {
        val buf = pq.dequeue()
        out.println(buf.currentLine)
        count += 1
        buf.readNextLine()
        if (buf.isEmpty) {
          buf.file.delete() // don't need anymore
        } else {
          // re-add to priority queue so we can process next line
          pq += buf
        }
      }
    } finally {
      out.close()
    }

    count
  }

  mergeSortedFiles(partitionAndSort())
}
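
For reference, the merge step can be sketched on its own, separate from the file handling. This is only a minimal illustration and not a diagnosis of the code above: `KWayMergeSketch` and `mergeSorted` are hypothetical names, the inputs are in-memory iterators instead of temp files, and it assumes the smallest head element must be emitted first.

```scala
import scala.collection.mutable

object KWayMergeSketch {
  /** Merges already-sorted inputs into one ascending list.
    * Hypothetical helper for illustration; not the question's code. */
  def mergeSorted[A](inputs: Seq[Iterator[A]])(implicit ord: Ordering[A]): List[A] = {
    // Each heap entry is (current head, remainder of that input).
    // mutable.PriorityQueue.dequeue() returns the greatest element under its
    // Ordering, so the ordering is reversed to get the smallest head first.
    val byHead: Ordering[(A, Iterator[A])] =
      Ordering.by[(A, Iterator[A]), A](_._1)(ord.reverse)
    val pq = mutable.PriorityQueue.empty[(A, Iterator[A])](byHead)

    // Seed the heap with the first element of every non-empty input.
    for (it <- inputs if it.hasNext) pq.enqueue((it.next(), it))

    val out = List.newBuilder[A]
    while (pq.nonEmpty) {
      val (head, rest) = pq.dequeue()
      out += head
      // Re-enqueue a fresh entry holding the next element of the same input.
      if (rest.hasNext) pq.enqueue((rest.next(), rest))
    }
    out.result()
  }
}
```

Each heap entry here is an immutable pair, so an entry's key never changes while it sits in the queue. That is a design choice of this sketch, made to keep the example easy to reason about.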

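Comments:

The Scala version is 2.9.0.1.

I've been writing code to answer a Codility test; it works with Java's PriorityQueue but not with Scala's on large data sets. They claim to be using Scala 2.12. I'm not sure how long these links will stay up... Java version: app.codility.com/demo/results/demoXCWYCR-6PZ Scala version: app.codility.com/demo/results/demoKABUXG-7FV

I made a mistake in my code. I thought I could get the elements out of the PriorityQueue in order, but they only come out in sorted order through the dequeue function. The code that works is this: app.codility.com/demo/results/demoMU2HYZ-S3H

Answer 1: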

My tests don't show any errors in PriorityQueue.

import org.scalacheck._
import Prop._
import scala.collection.mutable.PriorityQueue

object PriorityQueueProperties extends Properties("PriorityQueue") {
  // Builds a queue by enqueuing every element of the list
  def listToPQ(l: List[String]): PriorityQueue[String] = {
    val pq = new PriorityQueue[String]
    l foreach (pq += _)
    pq
  }

  // Drains the queue by repeated dequeue, in dequeue order
  def pqToList(pq: PriorityQueue[String]): List[String] =
    if (pq.isEmpty) Nil
    else { val h = pq.dequeue; h :: pqToList(pq) }

  property("Enqueued elements are dequeued in reverse order") =
    forAll { (l: List[String]) => l.sorted == pqToList(listToPQ(l)).reverse }

  property("Adding/removing elements doesn't break sorting") =
    forAll { (l: List[String], s: String) =>
      (l.size > 0) ==>
      ((s :: l.sorted.init).sorted == {
        val pq = listToPQ(l)
        pq.dequeue
        pq += s
        pqToList(pq).reverse
      })
    }
}

scala> PriorityQueueProperties.check
+ PriorityQueue.Enqueued elements are dequeued in reverse order: OK, passed
   100 tests.
+ PriorityQueue.Adding/removing elements doesn't break sorting: OK, passed 
  100 tests.
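
The first property relies on the fact that dequeue on scala.collection.mutable.PriorityQueue returns the greatest element under the queue's Ordering, which is why the test reverses the drained list before comparing it with `l.sorted`. A small sketch of that behavior follows; `DequeueOrderSketch` and its methods are hypothetical names used only for illustration. As a comment on the question also points out, only dequeuing yields priority order; iterating over the queue does not promise sorted output.

```scala
import scala.collection.mutable

object DequeueOrderSketch {
  /** Drains a queue by repeated dequeue(), returning elements in dequeue order. */
  def drain[A](pq: mutable.PriorityQueue[A]): List[A] = {
    val out = List.newBuilder[A]
    while (pq.nonEmpty) out += pq.dequeue()
    out.result()
  }

  /** Default Ordering: the greatest element is dequeued first. */
  def largestFirst(xs: Seq[String]): List[String] = {
    val pq = mutable.PriorityQueue.empty[String]
    pq ++= xs
    drain(pq)
  }

  /** Reversed Ordering: the smallest element is dequeued first. */
  def smallestFirst(xs: Seq[String]): List[String] = {
    val pq = mutable.PriorityQueue.empty[String](Ordering[String].reverse)
    pq ++= xs
    drain(pq)
  }
}
```

With these definitions, `DequeueOrderSketch.largestFirst(Seq("b", "a", "c"))` drains in descending order and `smallestFirst` in ascending order.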

It would help if you could somehow reduce the input enough to make a test case.

Comments:

That's the problem: small inputs don't show this. The first out-of-order record can appear around 600K records into the test.

Answer 2:

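I ran it with 5 million inputs several times, and the output was always as expected. From looking at your code, my guess is that your Ordering is the problem (i.e. it is giving inconsistent answers).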
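
One way to test that guess is to check the Ordering by brute force on a small sample of the data. The sketch below is a hedged illustration, not something from the original answer: `OrderingCheckSketch` and `firstInconsistency` are hypothetical names, and the check is cubic in the sample size, so it only suits small samples.

```scala
object OrderingCheckSketch {
  /** Returns a description of the first inconsistency found in `ord`
    * over `sample`, or None if the sample reveals none. */
  def firstInconsistency[A](sample: Seq[A])(implicit ord: Ordering[A]): Option[String] = {
    // Only the sign of compare() matters for consistency.
    def sgn(a: A, b: A): Int = math.signum(ord.compare(a, b))

    // compare(a, b) and compare(b, a) must have opposite signs.
    val asymmetric = for {
      a <- sample.iterator
      b <- sample.iterator
      if sgn(a, b) != -sgn(b, a)
    } yield s"compare($a, $b) and compare($b, $a) disagree"

    // a <= b and b <= c must imply a <= c.
    val intransitive = for {
      a <- sample.iterator
      b <- sample.iterator
      c <- sample.iterator
      if sgn(a, b) <= 0 && sgn(b, c) <= 0 && sgn(a, c) > 0
    } yield s"$a <= $b and $b <= $c, but $a > $c"

    val problems = asymmetric ++ intransitive
    if (problems.hasNext) Some(problems.next()) else None
  }
}
```

Calling `OrderingCheckSketch.firstInconsistency(lines.take(200))` with the same implicit Ordering that is passed to `externalSort` would report a violation if the sample exposes one (`lines` here stands for any sample of input records). An empty result does not prove the Ordering is consistent on the full input.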
