获取文件的最后n行,类似于tail
Posted
技术标签:
【中文标题】获取文件的最后n行,类似于tail【英文标题】:Get last n lines of a file, similar to tail 【发布时间】:2010-09-13 06:21:05 【问题描述】:我正在为 Web 应用程序编写日志文件查看器,为此我想通过日志文件的行进行分页。文件中的项目是基于行的,最新的项目位于底部。
所以我需要一个tail()
方法,它可以从底部读取n
行并支持偏移量。这是我想出的帽子:
def tail(f, n, offset=0):
"""Reads a n lines from f with an offset of offset lines."""
avg_line_length = 74
to_read = n + offset
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None]
avg_line_length *= 1.3
这是一个合理的方法吗?使用偏移量尾随日志文件的推荐方法是什么?
【问题讨论】:
在我的系统 (linux SLES 10) 上,相对于末端进行搜索会引发 IOError “不能进行非零末端相对搜索”。我喜欢这个解决方案,但已对其进行了修改以获取文件长度(seek(0,2)
然后tell()
),并使用该值相对于开头进行搜索。
恭喜 - 这个问题进入了 Kippo 源代码
应指定用于生成f
文件对象的open
命令的参数,因为根据f=open(..., 'rb')
或f=open(..., 'rt')
必须以不同方式处理f
我决定为此编写一个 100% 通用的解决方案,因此现在您可以访问一个巨大的文本文件,例如具有任意正或负切片的列表,例如:[-2000:-1900] 等等@ 987654321@
【参考方案1】:
这可能比你的更快。不对行长做任何假设。一次一个块地返回文件,直到找到正确数量的 '\n' 字符。
def tail( f, lines=20 ):
total_lines_wanted = lines
BLOCK_SIZE = 1024
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
# from the end of the file
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
# read the last block we haven't yet read
f.seek(block_number*BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count('\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = ''.join(reversed(blocks))
return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
我不喜欢关于行长的棘手假设——实际上——你永远无法知道这样的事情。
通常,这将在循环的第一次或第二次通过时定位最后 20 行。如果您的 74 个字符实际上是准确的,那么您将块大小设置为 2048 并且您将几乎立即拖尾 20 行。
此外,我不会消耗大量大脑卡路里来尝试与物理操作系统块对齐。使用这些高级 I/O 包,我怀疑您会看到尝试在 OS 块边界上对齐的任何性能后果。如果您使用较低级别的 I/O,那么您可能会看到加速。
更新
对于 Python 3.2 及更高版本,按照在文本文件中(那些在模式字符串中没有 "b" 的情况下打开的字节的过程),仅相对于开头查找文件是允许的(例外是用 seek(0, 2) 搜索到文件末尾)。:
例如:f = open('C:/.../../apache_logs.txt', 'rb')
def tail(f, lines=20):
total_lines_wanted = lines
BLOCK_SIZE = 1024
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = []
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
f.seek(block_number*BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
f.seek(0,0)
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count(b'\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = b''.join(reversed(blocks))
return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
【讨论】:
这在小日志文件上失败 -- IOError: invalid argument -- f.seek(block*1024, 2) 确实是非常好的方法。我使用了上面代码的略微修改版本,并提出了这个配方:code.activestate.com/recipes/577968-log-watcher-tail-f-log 不再适用于 python 3.2。我得到io.UnsupportedOperation: can't do nonzero end-relative seeks
我可以将偏移量更改为0,但这违背了函数的目的。
@DavidEnglund 原因是here。简而言之:在文本模式下不允许相对于文件末尾进行查找,大概是因为必须对文件内容进行解码,并且通常,在编码字节序列中查找任意位置可能会产生未定义的结果尝试从该位置开始解码为 Unicode。链接提供的建议是尝试以二进制模式打开文件并自己进行解码,捕获 DecodeError 异常。
不要使用此代码。它会破坏 python 2.7 中某些边界情况下的线条。下面@papercrane 的回答解决了这个问题。【参考方案2】:
假设您可以在 Python 2 上使用类似 unix 的系统:
import os
def tail(f, n, offset=0):
stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
stdin.close()
lines = stdout.readlines(); stdout.close()
return lines[:,-offset]
对于 python 3,你可以这样做:
import subprocess
def tail(f, n, offset=0):
proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
lines = proc.stdout.readlines()
return lines[:, -offset]
【讨论】:
应该独立于平台。此外,如果您阅读该问题,您会发现 f 是一个类似对象的文件。 问题并不是说平台依赖性是不可接受的。我不明白为什么当它提供了一种非常统一的(可能是你正在寻找的......当然是对我来说)的方式来做问题所要求的时,为什么值得两次反对。 谢谢,我想我必须在纯 Python 中解决这个问题,但没有理由不使用手头的 UNIX 实用程序,所以我选择了这个。现代 Python 中的 FWIW,subprocess.check_output 可能比 os.popen2 更可取;它简化了一些事情,因为它只是将输出作为字符串返回,并引发非零退出代码。 虽然这取决于平台,但它是一种非常有效的方式来完成所要求的工作,并且是一种非常快速的方式(你不需要必须将整个文件加载到内存中)。 @Shabbyrobe 您可能希望像 :offset_total = str(n+offset)
这样预先计算偏移量并替换此行 stdin,stdout = os.popen2("tail -n "+offset_total+" "+f)
以避免 TypeErrors (cannot concatenate int+str)
【参考方案3】:
这是我的答案。纯蟒蛇。使用 timeit 似乎很快。拖尾具有 100,000 行的日志文件的 100 行:
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165
代码如下:
import os
def tail(f, lines=1, _buffer=4098):
"""Tail a file and get X lines from the end"""
# place holder for the lines found
lines_found = []
# block counter will be multiplied by buffer
# to get the block size from the end
block_counter = -1
# loop until we find X lines
while len(lines_found) < lines:
try:
f.seek(block_counter * _buffer, os.SEEK_END)
except IOError: # either file is too small, or too many lines requested
f.seek(0)
lines_found = f.readlines()
break
lines_found = f.readlines()
# we found enough lines, get out
# Removed this line because it was redundant the while will catch
# it, I left it for history
# if len(lines_found) > lines:
# break
# decrement the block counter to get the
# next X bytes
block_counter -= 1
return lines_found[-lines:]
【讨论】:
优雅的解决方案!if len(lines_found) > lines:
真的有必要吗? loop
条件不会也捕获它吗?
我理解的一个问题:os.SEEK_END
是否只是为了清楚起见?据我发现,它的值是恒定的(= 2)。我想知道是否可以省略import os
。感谢您的出色解决方案!
@MaximilianPeters 是的。这不是必需的。我把它注释掉了。
我投了赞成票,但有一点点不足。搜索后,第一行读取可能不完整,因此为了获得 N _complete_lines,我将副本中的 while len(lines_found) < lines
更改为 while len(lines_found) <= lines
。谢谢!
总是从结尾处寻找是错误的,因为它假定每次循环迭代的结尾都是相同的。想想这段代码运行时写入的日志文件。【参考方案4】:
如果可以接受读取整个文件,则使用双端队列。
from collections import deque
deque(f, maxlen=n)
在 2.6 之前,deques 没有 maxlen 选项,但它很容易实现。
import itertools
def maxque(items, size):
items = iter(items)
q = deque(itertools.islice(items, size))
for item in items:
del q[0]
q.append(item)
return q
如果需要从末尾读取文件,则使用疾驰(也称为指数)搜索。
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
while len(lines) <= n:
try:
f.seek(-pos, 2)
except IOError:
f.seek(0)
break
finally:
lines = list(f)
pos *= 2
return lines[-n:]
【讨论】:
为什么那个底部函数有效?pos *= 2
似乎完全是武断的。它有什么意义?
@2mac Exponential Search。它从文件末尾迭代读取,每次读取量翻倍,直到找到足够的行。
我认为从末尾读取的解决方案将不支持使用 UTF-8 编码的文件,因为字符长度是可变的,并且您可能(可能会)落在一些无法实现的奇怪偏移量正确解释。
不幸的是,您的 galloping 搜索解决方案不适用于 python 3。因为 f.seek() 没有负偏移。我已经更新了您的代码,使其适用于 python 3 link
这里是来自文档的 deque 做什么:一旦有界长度的双端队列已满,当添加新项目时,相应数量的项目将从另一端丢弃。如果 n=1,它从文件中读取最后(或唯一的一行)。为什么你在 deque 做同样的事情时提供 tail 方法?【参考方案5】:
上面 S.Lott 的回答几乎对我有用,但最终给了我部分内容。事实证明,它破坏了块边界上的数据,因为数据以相反的顺序保存读取的块。调用 ''.join(data) 时,块的顺序错误。这解决了这个问题。
def tail(f, window=20):
"""
Returns the last `window` lines of file `f` as a list.
f - a byte file-like object
"""
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and bytes > 0:
if bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
data.insert(0, f.read(BUFSIZ))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
data.insert(0, f.read(bytes))
linesFound = data[0].count('\n')
size -= linesFound
bytes -= BUFSIZ
block -= 1
return ''.join(data).splitlines()[-window:]
【讨论】:
在列表的开头插入是个坏主意。为什么不使用双端队列结构? 遗憾的是不兼容 Python 3...试图找出原因。【参考方案6】:我最终使用的代码。我认为这是迄今为止最好的:
def tail(f, n, offset=None):
"""Reads a n lines from f with an offset of offset lines. The return
value is a tuple in the form ``(lines, has_more)`` where `has_more` is
an indicator that is `True` if there are more lines in the file.
"""
avg_line_length = 74
to_read = n + (offset or 0)
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None], \
len(lines) > to_read or pos > 0
avg_line_length *= 1.3
【讨论】:
没有完全回答这个问题。【参考方案7】:使用 mmap 的简单快速解决方案:
import mmap
import os
def tail(filename, n):
"""Returns last n lines from the filename. No exception handling"""
size = os.path.getsize(filename)
with open(filename, "rb") as f:
# for Windows the mmap parameters are different
fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
try:
for i in xrange(size - 1, -1, -1):
if fm[i] == '\n':
n -= 1
if n == -1:
break
return fm[i + 1 if i else 0:].splitlines()
finally:
fm.close()
【讨论】:
当输入可能很大时,这可能是最快的答案(或者如果它使用.rfind
方法向后扫描换行符,而不是在Python 级别;在 CPython 中,用 C 内置调用替换 Python 级别的代码通常会胜出很多)。对于较小的输入,带有maxlen
的deque
更简单,并且可能同样快速。【参考方案8】:
最简单的方法是使用deque
:
from collections import deque
def tail(filename, n=10):
with open(filename) as f:
return deque(f, n)
【讨论】:
这将遍历整个文件。如果您正在处理大文件,请记住这一点。【参考方案9】:将@papercrane 解决方案更新为 python3。
使用open(filename, 'rb')
打开文件并:
def tail(f, window=20):
"""Returns the last `window` lines of file `f` as a list.
"""
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
remaining_bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and remaining_bytes > 0:
if remaining_bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
bunch = f.read(BUFSIZ)
else:
# file too small, start from beginning
f.seek(0, 0)
# only read what was not read
bunch = f.read(remaining_bytes)
bunch = bunch.decode('utf-8')
data.insert(0, bunch)
size -= bunch.count('\n')
remaining_bytes -= BUFSIZ
block -= 1
return ''.join(data).splitlines()[-window:]
【讨论】:
您可能需要添加:assert "b" in file.mode, "File mode must be bytes!"
以检查文件模式是否实际上是字节。【参考方案10】:
应评论者的要求在my answer to a similar question 上发布答案,其中使用相同的技术来改变文件的最后一行,而不仅仅是获取它。
对于较大的文件,mmap
是执行此操作的最佳方法。为了改进现有的mmap
答案,这个版本可以在 Windows 和 Linux 之间移植,并且应该运行得更快(尽管它不会在 32 位 Python 上使用 GB 范围内的文件进行一些修改,请参阅other answer for hints on handling this, and for modifying to work on Python 2) .
import io # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap
def skip_back_lines(mm, numlines, startidx):
'''Factored out to simplify handling of n and offset'''
for _ in itertools.repeat(None, numlines):
startidx = mm.rfind(b'\n', 0, startidx)
if startidx < 0:
break
return startidx
def tail(f, n, offset=0):
# Reopen file in binary mode
with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# len(mm) - 1 handles files ending w/newline by getting the prior line
startofline = skip_back_lines(mm, offset, len(mm) - 1)
if startofline < 0:
return [] # Offset lines consumed whole file, nothing to return
# If using a generator function (yield-ing, see below),
# this should be a plain return, no empty list
endoflines = startofline + 1 # Slice end to omit offset lines
# Find start of lines to capture (add 1 to move from newline to beginning of following line)
startofline = skip_back_lines(mm, n, startofline) + 1
# Passing True to splitlines makes it return the list of lines without
# removing the trailing newline (if any), so list mimics f.readlines()
return mm[startofline:endoflines].splitlines(True)
# If Windows style \r\n newlines need to be normalized to \n, and input
# is ASCII compatible, can normalize newlines with:
# return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)
这假设拖尾的行数足够小,您可以一次安全地将它们全部读入内存;您还可以将其设为生成器函数,并通过将最后一行替换为以下内容手动一次读取一行:
mm.seek(startofline)
# Call mm.readline n times, or until EOF, whichever comes first
# Python 3.2 and earlier:
for line in itertools.islice(iter(mm.readline, b''), n):
yield line
# 3.3+:
yield from itertools.islice(iter(mm.readline, b''), n)
最后,以二进制模式读取(必须使用mmap
),因此它给出str
行(Py2)和bytes
行(Py3);如果您想要 unicode
(Py2) 或 str
(Py3),可以调整迭代方法以为您解码和/或修复换行符:
lines = itertools.islice(iter(mm.readline, b''), n)
if f.encoding: # Decode if the passed file was opened with a specific encoding
lines = (line.decode(f.encoding) for line in lines)
if 'b' not in f.mode: # Fix line breaks if passed file opened in text mode
lines = (line.replace(os.linesep, '\n') for line in lines)
# Python 3.2 and earlier:
for line in lines:
yield line
# 3.3+:
yield from lines
注意:我在无法访问 Python 进行测试的机器上输入了这一切。如果我输入任何错误,请告诉我;这与my other answer 非常相似,我认为它应该可以工作,但是调整(例如处理offset
)可能会导致细微的错误。如果有任何错误,请在 cmets 中告诉我。
【讨论】:
【参考方案11】:一个更干净的python3兼容版本,不插入而是追加和反转:
def tail(f, window=1):
"""
Returns the last `window` lines of file `f` as a list of bytes.
"""
if window == 0:
return b''
BUFSIZE = 1024
f.seek(0, 2)
end = f.tell()
nlines = window + 1
data = []
while nlines > 0 and end > 0:
i = max(0, end - BUFSIZE)
nread = min(end, BUFSIZE)
f.seek(i)
chunk = f.read(nread)
data.append(chunk)
nlines -= chunk.count(b'\n')
end -= nread
return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])
像这样使用它:
with open(path, 'rb') as f:
last_lines = tail(f, 3).decode('utf-8')
【讨论】:
不算太破旧——但我一般建议不要为一个已有 10 年历史的问题添加答案,但答案很多。但请帮帮我:您的代码中有哪些特定于 Python 3 的内容? 其他答案效果不佳:-) py3:见***.com/questions/136168/…【参考方案12】:基于 S.Lott 的最高投票答案(2008 年 9 月 25 日,21:43),但已针对小文件修复。
def tail(the_file, lines_2find=20):
the_file.seek(0, 2) #go to end of file
bytes_in_file = the_file.tell()
lines_found, total_bytes_scanned = 0, 0
while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned:
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
the_file.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += the_file.read(1024).count('\n')
the_file.seek(-total_bytes_scanned, 2)
line_list = list(the_file.readlines())
return line_list[-lines_2find:]
#we read at least 21 line breaks from the bottom, block by block for speed
#21 to ensure we don't get a half line
希望这是有用的。
【讨论】:
【参考方案13】:pypi 上有一些现有的 tail 实现,您可以使用 pip 安装它们:
mtFileUtil 多尾 log4tailer ...根据您的情况,使用这些现有工具之一可能会有优势。
【讨论】:
您知道任何可在 Windows 上运行的模块吗?我试过tailhead
、tailer
,但它们没有用。也试过mtFileUtil
。它最初抛出错误是因为 print
语句没有括号(我在 Python 3.6 上)。我在reverse.py
中添加了这些,错误消息消失了,但是当我的脚本调用模块(mtFileUtil.tail(open(logfile_path), 5)
)时,它不会打印任何内容。【参考方案14】:
简单:
with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
print(''.join(tail)
【讨论】:
这是一个完全糟糕的实现。考虑处理大文件,其中 n 也很大,操作成本太高【参考方案15】:我发现上面的 Popen 是最好的解决方案。它又快又脏,而且有效 对于 Unix 机器上的 python 2.6,我使用了以下
def GetLastNLines(self, n, fileName):
"""
Name: Get LastNLines
Description: Gets last n lines using Unix tail
Output: returns last n lines of a file
Keyword argument:
n -- number of last lines to return
filename -- Name of the file you need to tail into
"""
p = subprocess.Popen(['tail','-n',str(n),self.__fileName], stdout=subprocess.PIPE)
soutput, sinput = p.communicate()
return soutput
s 输出将包含代码的最后 n 行。逐行遍历 soutput:
for line in GetLastNLines(50,'myfile.log').split('\n'):
print line
【讨论】:
【参考方案16】:为了高效处理非常大的文件(在您可能想要使用 tail 的日志文件情况下很常见),您通常希望避免读取整个文件(即使您这样做而不是一次将整个文件读入内存)但是,您确实需要以某种方式计算行而不是字符的偏移量。一种可能性是用 seek() 逐个字符向后读取,但这非常慢。相反,最好在更大的块中处理。
我前段时间写了一个实用函数,用于向后读取文件,可以在这里使用。
import os, itertools
def rblocks(f, blocksize=4096):
"""Read file as series of blocks from end of file to start.
The data itself is in normal order, only the order of the blocks is reversed.
ie. "hello world" -> ["ld","wor", "lo ", "hel"]
Note that the file must be opened in binary mode.
"""
if 'b' not in f.mode.lower():
raise Exception("File must be opened using binary mode.")
size = os.stat(f.name).st_size
fullblocks, lastblock = divmod(size, blocksize)
# The first(end of file) block will be short, since this leaves
# the rest aligned on a blocksize boundary. This may be more
# efficient than having the last (first in file) block be short
f.seek(-lastblock,2)
yield f.read(lastblock)
for i in range(fullblocks-1,-1, -1):
f.seek(i * blocksize)
yield f.read(blocksize)
def tail(f, nlines):
buf = ''
result = []
for block in rblocks(f):
buf = block + buf
lines = buf.splitlines()
# Return all lines except the first (since may be partial)
if lines:
result.extend(lines[1:]) # First line may not be complete
if(len(result) >= nlines):
return result[-nlines:]
buf = lines[0]
return ([buf]+result)[-nlines:]
f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
print line
[编辑]添加了更具体的版本(避免需要反转两次)
【讨论】:
快速测试表明,它的性能比我上面的版本差很多。可能是因为你的缓冲。 我怀疑这是因为我正在向后进行多次搜索,所以没有充分利用预读缓冲区。但是,我认为当您对行长的猜测不准确(例如非常长的行)时,它可能会做得更好,因为它避免了在这种情况下重新读取数据。【参考方案17】:您可以使用 f.seek(0, 2) 转到文件末尾,然后使用以下替换 readline() 逐行读取:
def readline_backwards(self, f):
backline = ''
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
backline = last
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
f.seek(1, 1)
return backline
【讨论】:
【参考方案18】:基于 Eyecue 的回答(2010 年 6 月 10 日 21:28):此类将 head() 和 tail() 方法添加到文件对象。
class File(file):
def head(self, lines_2find=1):
self.seek(0) #Rewind file
return [self.next() for x in xrange(lines_2find)]
def tail(self, lines_2find=1):
self.seek(0, 2) #go to end of file
bytes_in_file = self.tell()
lines_found, total_bytes_scanned = 0, 0
while (lines_2find+1 > lines_found and
bytes_in_file > total_bytes_scanned):
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
self.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += self.read(1024).count('\n')
self.seek(-total_bytes_scanned, 2)
line_list = list(self.readlines())
return line_list[-lines_2find:]
用法:
f = File('path/to/file', 'r')
f.head(3)
f.tail(3)
【讨论】:
【参考方案19】:如果文件不以 \n 结尾或确保读取完整的第一行,则其中一些解决方案会出现问题。
def tail(file, n=1, bs=1024):
f = open(file)
f.seek(-1,2)
l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
B = f.tell()
while n >= l and B > 0:
block = min(bs, B)
B -= block
f.seek(B, 0)
l += f.read(block).count('\n')
f.seek(B, 0)
l = min(l,n) # discard first (incomplete) line if l > n
lines = f.readlines()[-l:]
f.close()
return lines
【讨论】:
【参考方案20】:这是一个非常简单的实现:
with open('/etc/passwd', 'r') as f:
try:
f.seek(0,2)
s = ''
while s.count('\n') < 11:
cur = f.tell()
f.seek((cur - 10))
s = f.read(10) + s
f.seek((cur - 10))
print s
except Exception as e:
f.readlines()
【讨论】:
很好的例子!你能解释一下f.seek
之前try的用法吗?为什么不在with open
之前?另外,为什么在except
你做f.readlines()
??
老实说,try 可能应该先进行。我不记得除了在健康的标准 Linux 系统上之外没有捕获 open() 的原因,/etc/passwd 应该始终是可读的. try,然后 with 是更常见的顺序。【参考方案21】:
有一个非常有用的module可以做到这一点:
from file_read_backwards import FileReadBackwards
with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:
# getting lines by lines starting from the last line up
for l in frb:
print(l)
【讨论】:
【参考方案22】:更新A.Coady给出的答案
适用于 python 3。
这使用Exponential Search,并且只会从后面缓冲N
行,非常有效。
import time
import os
import sys
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
# set file pointer to end
f.seek(0, os.SEEK_END)
isFileSmall = False
while len(lines) <= n:
try:
f.seek(f.tell() - pos, os.SEEK_SET)
except ValueError as e:
# lines greater than file seeking size
# seek to start
f.seek(0,os.SEEK_SET)
isFileSmall = True
except IOError:
print("Some problem reading/seeking the file")
sys.exit(-1)
finally:
lines = f.readlines()
if isFileSmall:
break
pos *= 2
print(lines)
return lines[-n:]
with open("stream_logs.txt") as f:
while(True):
time.sleep(0.5)
print(tail(f,2))
【讨论】:
【参考方案23】:我不得不从文件的最后一行读取一个特定的值,然后偶然发现了这个线程。我没有在 Python 中重新发明***,而是得到了一个很小的 shell 脚本,保存为 /usr/local/bin/get_last_netp:
#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk 'print $14'
在 Python 程序中:
from subprocess import check_output
last_netp = int(check_output("/usr/local/bin/get_last_netp"))
【讨论】:
【参考方案24】:不是第一个使用双端队列的例子,而是一个更简单的例子。这个是通用的:它适用于任何可迭代的对象,而不仅仅是文件。
#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
deq = collections.deque()
for thing in iterable:
if len(deq) >= N:
deq.popleft()
deq.append(thing)
for thing in deq:
yield thing
if __name__ == '__main__':
for line in tail(sys.stdin,10):
sys.stdout.write(line)
【讨论】:
【参考方案25】:This is my version of tailf
import sys, time, os
filename = 'path to file'
try:
with open(filename) as f:
size = os.path.getsize(filename)
if size < 1024:
s = size
else:
s = 999
f.seek(-s, 2)
l = f.read()
print l
while True:
line = f.readline()
if not line:
time.sleep(1)
continue
print line
except IOError:
pass
【讨论】:
【参考方案26】:import time
attemps = 600
wait_sec = 5
fname = "YOUR_PATH"
with open(fname, "r") as f:
where = f.tell()
for i in range(attemps):
line = f.readline()
if not line:
time.sleep(wait_sec)
f.seek(where)
else:
print line, # already has newline
【讨论】:
【参考方案27】:import itertools
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))
【讨论】:
【参考方案28】:abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
for num, line in enumerate(myFile, 1):
if abc in line:
lastline = num
print "last occurance of work at file is in "+str(lastline)
【讨论】:
【参考方案29】:另一种解决方案
如果您的 txt 文件如下所示: 老鼠 蛇 猫 蜥蜴 狼 狗
你可以通过简单地在 python 中使用数组索引来反转这个文件 '''
contents=[]
def tail(contents,n):
with open('file.txt') as file:
for i in file.readlines():
contents.append(i)
for i in contents[:n:-1]:
print(i)
tail(contents,-5)
结果: 狗 狼 蜥蜴 猫
【讨论】:
【参考方案30】:好吧!我遇到了类似的问题,虽然我只需要LAST LINE ONLY, 所以我想出了自己的解决方案
def get_last_line(filepath):
try:
with open(filepath,'rb') as f:
f.seek(-1,os.SEEK_END)
text = [f.read(1)]
while text[-1] != '\n'.encode('utf-8') or len(text)==1:
f.seek(-2, os.SEEK_CUR)
text.append(f.read(1))
except Exception as e:
pass
return ''.join([t.decode('utf-8') for t in text[::-1]]).strip()
此函数返回文件中的最后一个字符串 我有一个 1.27gb 的日志文件,找到最后一行所需的时间非常少(甚至不到半秒)
【讨论】:
以上是关于获取文件的最后n行,类似于tail的主要内容,如果未能解决你的问题,请参考以下文章