Efficient ways to optimize Scala code for reading large files that do not fit in memory


Problem statement

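We have a large log file that stores users' interactions with an application. Entries in the log file follow the pattern {userId,timestamp,actionType}, where actionType is one of two possible values: [open,close].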

Constraints:

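  1. The log file is too big to fit in the memory of one machine. Also assume that the aggregated data does not fit in memory.
  2. The code must be able to run on a single machine.
  3. Do not use an out-of-the-box implementation of MapReduce or a third-party database; do not assume we have Hadoop, Spark, or any other distributed computing framework.
  4. There can be multiple entries for each actionType for each user, and entries may be missing from the log file. So a user may be missing a close record between two open records, or vice versa.
  5. Timestamps will be in strictly ascending order.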
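Constraint 1 also rules out a single in-memory map with one entry per user when there are very many users. One common single-machine workaround is to hash-partition the log by userId into several smaller files. This technique is not part of the original question. Because all records of a user land in the same file, each file can then be aggregated on its own. The sketch below makes several assumptions: Scala 2.13+ (for scala.util.Using), userId as the first comma-separated field, and a numPartitions large enough that each partition's per-user state fits in memory. LogPartitioner, partition and readLines are hypothetical names.

```scala
import java.io.BufferedWriter
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source
import scala.util.Using

// Splits the log into numPartitions files so that all records of one userId land in
// the same file; each file can then be aggregated on its own with far less memory.
object LogPartitioner {
  def partition(input: Path, outDir: Path, numPartitions: Int): IndexedSeq[Path] = {
    val paths = (0 until numPartitions).map(i => outDir.resolve(s"part-$i.csv"))
    val writers: IndexedSeq[BufferedWriter] =
      paths.map(p => Files.newBufferedWriter(p, StandardCharsets.UTF_8))
    try {
      Using.resource(Source.fromFile(input.toFile, "UTF-8")) { src =>
        for (line <- src.getLines() if line.trim.nonEmpty) {
          val userId = line.takeWhile(_ != ',').trim
          // floorMod keeps the index non-negative even for negative hash codes
          val idx = Math.floorMod(userId.hashCode, numPartitions)
          writers(idx).write(line.trim)
          writers(idx).newLine()
        }
      }
    } finally writers.foreach(_.close())
    paths
  }

  // Reads one partition back; lines stay in their original (ascending timestamp) order.
  def readLines(p: Path): List[String] =
    Using.resource(Source.fromFile(p.toFile, "UTF-8"))(_.getLines().toList)
}
```

All partition writers stay open during the single pass, so numPartitions is also bounded by the operating system's open-file limit. Because lines are appended in file order, each user's records remain in ascending timestamp order inside their partition.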

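For this problem, we need to implement a class (or classes) that computes the average time each user spends between open and close. Keep in mind that some users have missing entries, so we must choose how to handle those entries when doing the calculation. The code should follow a consistent policy for how we make that choice.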

The desired output of the solution is [{userId,timeSpent},....] for all users in the log file.

Sample log file (comma-separated text file)

1,1435456566,open
2,1435457643,open
3,1435458912,open
1,1435459567,close
4,1435460345,open
1,1435461234,open
2,1435462567,close
1,1435463456,open
3,1435464398,close
4,1435465122,close
1,1435466775,close
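Here is one possible consistent policy applied to the sample, to make the missing-record question concrete. It is only an illustration and not the policy used by either implementation below. Under this policy, an open followed by another open replaces the pending open, a close with no pending open is ignored, and only matched open/close pairs count as sessions. This is a minimal sketch: SamplePolicy, State and averages are hypothetical names, and it keeps one small state per user in memory.

```scala
// One possible missing-record policy, applied line by line:
//   open  -> the pending open is (re)set to this timestamp (latest open wins)
//   close -> with a pending open: one session of (close - open); without one: ignored
object SamplePolicy {
  final case class State(pendingOpen: Option[Long], total: Long, sessions: Int)

  def averages(lines: Iterator[String]): Map[Int, Long] = {
    val states = lines
      .map(_.split(",").map(_.trim))
      .filter(_.length == 3) // skip blank or malformed lines
      .foldLeft(Map.empty[Int, State]) { (acc, f) =>
        val userId = f(0).toInt
        val t = f(1).toLong
        val s = acc.getOrElse(userId, State(None, 0L, 0))
        val next = (f(2), s.pendingOpen) match {
          case ("open", _)           => s.copy(pendingOpen = Some(t))
          case ("close", Some(open)) => State(None, s.total + (t - open), s.sessions + 1)
          case _                     => s // close with no pending open
        }
        acc.updated(userId, next)
      }
    // average per user; users without a complete session report 0
    states.map { case (id, s) => id -> (if (s.sessions > 0) s.total / s.sessions else 0L) }
  }
}
```

For the sample, this gives user 1: 3160 ((3001 + 3319) / 2), user 2: 4924, user 3: 5486 and user 4: 4777. The values are in the timestamps' units (seconds, if these are Unix timestamps).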

Approach

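Below is the code I wrote in Scala and Python. It does not seem efficient or up to the expectations of the given scenario, so I would like feedback from the developer community on this forum on how we can better optimize it for the given scenario.

Scala implementation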

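import java.io.FileInputStream
import java.util.{Scanner, LinkedList}
import java.lang.Long // shadows scala.Long, so the timestamps below are boxed java.lang.Long values
import scala.collection.mutable

object UserMetrics extends App {
  if (args.length == 0) {
    println("Please provide input data file name for processing")
    sys.exit(1) // without this, args(0) below throws ArrayIndexOutOfBoundsException
  }
  val userMetrics = new UserMetrics()
  // optional second argument: timeout used to cap sessions with a missing open/close record
  userMetrics.readInputFile(args(0), if (args.length == 1) 600000 else args(1).toInt)
}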

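// Per-user state: timestamp and status of the user's last record, accumulated time, number of counted sessions
case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)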

class UserMetrics {

  // userId -> a list that only ever holds one UserInfo: the user's latest state
  val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

  def readInputFile(stArr: String, timeOut: Int): Unit = {
    var inputStream: FileInputStream = null
    var sc: Scanner = null
    try {
      inputStream = new FileInputStream(stArr);
      sc = new Scanner(inputStream, "UTF-8");
      while (sc.hasNextLine()) {
        val line: String = sc.nextLine();
        processInput(line, timeOut)
      }

      for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap) {
        val userInfo:UserInfo = userLs.get(0)
        val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
        println("{" + key +","+timespent + "}")
      }

      if (sc.ioException() != null) {
        throw sc.ioException();
      }
    } finally {
      if (inputStream != null) {
        inputStream.close();
      }
      if (sc != null) {
        sc.close();
      }
    }
  }

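  // Folds one log line into the user's state. Missing-record policy:
  //   open  -> close : add the real gap and count one session
  //   open  -> open  : close record missing; add min(gap, timeOut) and count one session
  //   close -> close : open record missing; add min(gap, timeOut) and count one session
  //   close -> open  : a new session starts; nothing is added
  def processInput(line: String, timeOut: Int): Unit = {
    val strSp = line.split(",").map(_.trim) // tolerate stray whitespace around fields

    val userId: Integer = Integer.parseInt(strSp(0))
    val curTimeStamp = Long.parseLong(strSp(1))
    val status = strSp(2)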
    val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
    val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

    val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

    if (lsUserInfo != null && lsUserInfo.size() > 0) {
      val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
      val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
      val prevStatus: String = lastUserInfo.prevStatus

      if (prevStatus.equals("open")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          val timeDiff = lastUserInfo.timeSpent + timeSelector
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        } else if(!status.equals(lastUserInfo.prevStatus)){
          val timeDiff = lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        }
      } else if(prevStatus.equals("close")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          lsUserInfo.remove()
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence+1))
        } else if (!status.equals(lastUserInfo.prevStatus)) {
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
        }
      }
    } else if (lsUserInfo.size() == 0) {
      lsUserInfo.add(uInfo)
    }
    usermap.put(userId, lsUserInfo)
  }

}
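In processInput above, each user's LinkedList only ever holds one UserInfo, and the java.lang.Long import makes every timestamp a boxed object. The sketch below implements the same policy for well-formed open/close records. It keeps a single immutable state value per user and folds over the line iterator. It assumes Scala 2.13+, and UserMetricsCompact, State, step, averages and averagesFromFile are hypothetical names. Like the original, it still keeps one entry per user in memory.

```scala
import scala.io.Source
import scala.util.Using

// Same policy as processInput for open/close records, but one immutable State per user
// (no one-element LinkedList, no boxed java.lang.Long).
object UserMetricsCompact {
  final case class State(lastTs: Long, lastStatus: String, timeSpent: Long, sessions: Int)

  def step(acc: Map[Int, State], line: String, timeOut: Long): Map[Int, State] = {
    val f = line.split(",").map(_.trim)
    if (f.length != 3) acc // skip blank or malformed lines
    else {
      val userId = f(0).toInt
      val ts = f(1).toLong
      val status = f(2)
      val next = acc.get(userId) match {
        case None => State(ts, status, 0L, 0) // first record: nothing to measure yet
        case Some(s) =>
          val gap = ts - s.lastTs
          (s.lastStatus, status) match {
            case ("open", "close") => State(ts, status, s.timeSpent + gap, s.sessions + 1)
            case ("open", "open") | ("close", "close") => // missing record: cap the gap
              State(ts, status, s.timeSpent + math.min(gap, timeOut), s.sessions + 1)
            case _ => s.copy(lastTs = ts, lastStatus = status) // close -> open: new session starts
          }
      }
      acc.updated(userId, next)
    }
  }

  def averages(lines: Iterator[String], timeOut: Long): Map[Int, Long] =
    lines.foldLeft(Map.empty[Int, State])((acc, line) => step(acc, line, timeOut)).map {
      case (id, s) => id -> (if (s.sessions > 0) s.timeSpent / s.sessions else 0L)
    }

  def averagesFromFile(path: String, timeOut: Long): Map[Int, Long] =
    Using.resource(Source.fromFile(path, "UTF-8"))(src => averages(src.getLines(), timeOut))
}
```

Under this policy, the open -> open gap counts as a session. On the sample with a timeout of 600000, user 1 therefore averages (3001 + 2222 + 3319) / 3 = 2847 with integer division.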

Python implementation

import sys

def fileBlockStream(fp, number_of_blocks, block):
    #A generator that splits a file into blocks and iterates over the lines of one of the blocks.

    assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
    assert 0 < number_of_blocks

    fp.seek(0,2) #seek to end of file to compute block size
    file_size = fp.tell() 

    ini = file_size * block // number_of_blocks #compute start & end point of file block (integer offsets)
    end = file_size * (1 + block) // number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini-1)
        fp.readline()

    while fp.tell() < end:
        yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk,avgTimeSpentDict,defaultTimeOut):
    #Value per userID: [No. of pair events, total time spent (Close tm-Open tm), Last Open Tm, Last Close Tm, Next expected Event]
    #Next expected event: 0 - Open; 1 - Close
    countPos,totTmPos,openTmPos,closeTmPos,nextEventPos = 0,1,2,3,4
    for rows in chunk.splitlines():
        fields = rows.split(",") #split once per line instead of once per field access
        if len(fields) != 3:
            continue
        userKeyID = fields[0]
        try:
            curTimeStamp = int(fields[1])
        except ValueError:
            print("Invalid Timestamp for ID:" + str(userKeyID))
            continue
        curEvent = fields[2].strip() #tolerate trailing whitespace
        state = avgTimeSpentDict.get(userKeyID) #O(1) lookup; 'in dict.keys()' scans a list on Python 2
        if state is not None and state[nextEventPos]==1 and curEvent == "close":
            #Expected Close event: add the real Open -> Close time
            state[totTmPos] += curTimeStamp - state[openTmPos]
            state[countPos] += 1
            state[closeTmPos] = curTimeStamp
            state[nextEventPos] = 0 #Change next expected event to Open

        elif state is not None and state[nextEventPos]==0 and curEvent == "open":
            state[openTmPos] = curTimeStamp
            state[nextEventPos] = 1 #Change next expected event to Close

        elif state is not None and state[nextEventPos]==1 and curEvent == "open":
            #Close event missing: add the gap since the last Open, capped at defaultTimeOut
            curTotalTime,closeTime = missingHandler(defaultTimeOut,state[openTmPos],curTimeStamp)
            state[totTmPos] += curTotalTime
            state[closeTmPos] = closeTime
            state[openTmPos] = curTimeStamp
            state[countPos] += 1

        elif state is not None and state[nextEventPos]==0 and curEvent == "close":
            #Open event missing: add the gap since the last Close, capped at defaultTimeOut
            curTotalTime,openTime = missingHandler(defaultTimeOut,state[closeTmPos],curTimeStamp)
            state[totTmPos] += curTotalTime
            state[openTmPos] = openTime
            state[closeTmPos] = curTimeStamp #measure the next missing Open from this Close
            state[countPos] += 1

        elif curEvent == "open":
            #Initialize userid with Open event
            avgTimeSpentDict[userKeyID] = [0,0,curTimeStamp,0,1]

        elif curEvent == "close":
            #Initialize userid with missing handler function since there is no Open event for this User
            totaltime,OpenTime = missingHandler(defaultTimeOut,0,curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1,totaltime,OpenTime,curTimeStamp,0]

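def missingHandler(defaultTimeOut,prevTimeVal,curTimeVal):
    #Time charged for a pair with a missing Open/Close record: the real gap, capped at defaultTimeOut.
    #The earlier timestamp is returned unchanged as the second value.
    return min(curTimeVal - prevTimeVal, defaultTimeOut), prevTimeVal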

def computeAvg(avgTimeSpentDict,defaultTimeOut):
    resDict = {}
    for k,v in avgTimeSpentDict.items(): #items() works on both Python 2 and 3
        if v[0] == 0:
            resDict[k] = 0
        else:
            resDict[k] = v[1] // v[0] #integer average, as in the Scala version
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)

    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    with open(sys.argv[1]) as fileObj: #the file is closed on exit, even if processing fails
        #Blocks are processed one after another, so this is still a single sequential pass
        for chunk_number in range(number_of_chunks):
            for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
                computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print(computeAvg(avgTimeSpentDict,defaultTimeOut))
    avgTimeSpentDict.clear() #release the per-user state

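Both programs above produce the required output, but efficiency is critical for this particular scenario. Please let me know if you have better suggestions or any recommendations on the existing implementations.

Thanks in advance!!

Answer

What you are after is the use of iterators. I won't rewrite your code, but the trick here is probably to use an Iterator. Fortunately, Scala provides out-of-the-box tools for this job.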

import scala.io.Source
object ReadBigFiles {
  def read(fileName: String): Unit = {
    val source = Source.fromFile(fileName)
    try {
      val lines: Iterator[String] = source.getLines()
      // now you get iterator semantics for the file line traversal
      // that means you can only go through the lines once, but you don't incur a penalty on heap usage
    } finally {
      source.close() // release the file handle once the lines have been consumed
    }
  }
}

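For your use case, it seems you need lastUser, so you are processing entries in groups of two. I think you have two options: either go with iterator.sliding(2), which yields an iterator over each pair, or simply add recursion with an Option into the mix.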

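import scala.annotation.tailrec

case class User(id: Int, timestamp: Long, action: String) // hypothetical: one parsed log line
type ResultType = Map[Int, Int]                           // hypothetical: e.g. occurrences per userId

def parse(line: String): User = {
  val f = line.split(",").map(_.trim)
  User(f(0).toInt, f(1).toLong, f(2))
}

@tailrec
def navigate(source: Iterator[String], last: Option[User], acc: ResultType = Map.empty): ResultType = {
  if (source.hasNext) {
    val current = parse(source.next())
    last match {
      case Some(existing) => navigate(source, Some(current), acc) // compare current with existing etc., update acc
      case None => navigate(source, Some(current), acc)
    }
  } else {
    acc // exit recursion, return result
  }
}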
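On the sliding(2) option: users are interleaved in the log, so two consecutive lines of the raw file usually belong to different users. Pairing therefore only makes sense over a single user's events, for example that user's lines from a per-user partition. This is a minimal sketch, assuming events holds one user's (timestamp, action) records in ascending order. SlidingPairs and sessionDurations are hypothetical names.

```scala
// Pairs consecutive events of ONE user with sliding(2).
object SlidingPairs {
  def sessionDurations(events: Iterator[(Long, String)]): Iterator[Long] =
    events.sliding(2).collect {
      // only an open immediately followed by a close counts as a session;
      // open -> open and close -> close pairs (missing records) are skipped
      case Seq((openTs, "open"), (closeTs, "close")) => closeTs - openTs
    }
}
```

With this pairing, an open followed by another open is simply skipped. For user 1 in the sample, the durations are therefore 3001 and 3319.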

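You can drop all the code you wrote for reading the file and so on. If you need to count occurrences, just build a Map in the recursion and increment the count at each step according to your business logic.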
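The same "build a Map and increment it at each step" idea can also be written as a foldLeft over the getLines iterator, which avoids hand-written recursion. This is a minimal sketch, assuming Scala 2.13+ (for scala.util.Using) and that userId is the first comma-separated field. OccurrenceCount, countPerUser and countPerUserInFile are hypothetical names, and the resulting map still holds one entry per user.

```scala
import scala.io.Source
import scala.util.Using

object OccurrenceCount {
  // Counts records per userId in a single pass over the line iterator.
  def countPerUser(lines: Iterator[String]): Map[String, Int] =
    lines.foldLeft(Map.empty[String, Int]) { (acc, line) =>
      val userId = line.takeWhile(_ != ',').trim
      if (userId.isEmpty) acc // skip blank lines
      else acc.updated(userId, acc.getOrElse(userId, 0) + 1)
    }

  // Using.resource closes the file after the fold has consumed the iterator.
  def countPerUserInFile(path: String): Map[String, Int] =
    Using.resource(Source.fromFile(path, "UTF-8"))(src => countPerUser(src.getLines()))
}
```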
