有效地从压缩的、分块的 HTTP 流中读取行,因为它们到达
Posted
技术标签:
【中文标题】有效地从压缩的、分块的 HTTP 流中读取行,因为它们到达【英文标题】:Efficiently reading lines from compressed, chunked HTTP stream as they arrive 【发布时间】:2014-03-14 21:35:48 【问题描述】:我编写了一个 HTTP 服务器,它可以生成由 JSON 结构事件组成的无穷无尽的 HTTP 流。类似于 Twitter 的流媒体 API。这些事件由\n
分隔(根据Server-sent events with Content-Type:text/event-stream)并且长度可能不同。
回应是
由于无限流而分块(HTTP 1.1 Transfer-Encoding:chunked) 压缩(内容编码:gzip)以节省带宽。我想在 Python 中尽快使用这些行,并尽可能节省资源,而无需重新发明***。
由于我目前正在使用 python-requests,你知道如何使它工作吗? 如果您认为 python-requests 在这里无济于事,我完全愿意接受替代框架/库。
我当前的实现基于requests 并使用iter_lines(...)
来接收行。但是chunk_size
参数很棘手。如果设置为1
,它会占用大量 CPU,因为某些事件可能会达到几千字节。如果设置为大于 1 的任何值,则某些事件会卡住,直到下一次到达并且整个缓冲区“被填满”。事件之间的时间可以是几秒钟。
我预计chunk_size
是某种“要接收的最大字节数”,就像在 unix 的recv(...)
中一样。相应的手册页说:
接收调用通常会返回任何可用数据,直到 请求的金额,而不是等待收到全额 请求。
但这显然不是它在请求库中的工作方式。他们或多或少地将其用作“要接收的确切字节数”。 在查看他们的源代码时,我无法确定哪个部分对此负责。也许是 httplib 的 Response 或者 ssl 的 SSLSocket。
作为一种解决方法,我尝试将服务器上的行填充为块大小的倍数。 但是 requests-library 中的块大小用于从 压缩 响应流中获取字节。 因此,直到我可以填充我的行以使它们的 compressed 字节序列是块大小的倍数之前,这将不起作用。但这似乎太老套了。
我读到 Twisted 可用于客户端上的 http 流的非阻塞、非缓冲处理,但我只找到了用于在服务器上创建流响应的代码。
【问题讨论】:
你知道一个很好的框架/库来完成这项任务吗?库请求是题外话,恐怕。 对图书馆请求感到抱歉。如前所述,我目前正在使用 python-requests 并希望继续使用它。所以我的问题主要是关于如何用 python-requests 做事。但是:如果没有办法,我完全可以使用另一个库。 可以找线。底层压缩 zlib 支持使用的刷新 (Z_SYNC_FLUSH),当我在 GZipped 响应流上刷新 Tornado 中的响应时。因此,http 流非常好,被分割成完美的片段,其中包含完整的压缩行。仅仅用 Python 读回它们是很困难的。 我的分析不正确;我正在下面写更多信息。这不是requests
可以解决的限制。
我曾经做过类似的事情,基本上是去掉所有的套接字处理(特别是 socket.makefile)来正确处理缓冲区并使用 select() 从套接字增量读取内容。工作得很好,但这是一个正确的主要 PITA。基本上,一旦你得到响应,从响应中提取套接字并自己处理东西。这使 CPU 使用率从 90% 下降到 2%,并大大提高了吞吐量。 (但那是在古代 Python 2.2 中,2.7+ httplib 在分块编码方面做得更好)。
【参考方案1】:
感谢Martijn Pieters answer,我停止处理 python-requests 行为并寻找完全不同的方法。
我最终使用了pyCurl。您可以将其与 select+recv 循环类似地使用,而无需反转控制流并将控制权交给 Tornado 等专用的 IO 循环。这样很容易使用生成器,一旦它们到达就产生新行- 无需在可能引入延迟或运行 IO 循环的额外线程的中间层中进行进一步缓冲。
同时,它足够高级,您无需担心分块传输编码、SSL 加密或 gzip 压缩。
这是我的旧代码,其中chunk_size
=1 导致 45% 的 CPU 负载,chunk_size
>1 引入了额外的延迟。
import requests
class RequestsHTTPStream(object):
def __init__(self, url):
self.url = url
def iter_lines(self):
headers = 'Cache-Control':'no-cache',
'Accept': 'text/event-stream',
'Accept-Encoding': 'gzip'
response = requests.get(self.url, stream=True, headers=headers)
return response.iter_lines(chunk_size=1)
这是我基于 pyCurl 的新代码:
(不幸的是 curl_easy_* 样式 perform
完全阻塞,这使得在不使用线程的情况下很难在中间产生行。因此我使用 curl_multi_* 方法)
import pycurl
import urllib2
import httplib
import StringIO
class CurlHTTPStream(object):
def __init__(self, url):
self.url = url
self.received_buffer = StringIO.StringIO()
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
self.curl.setopt(pycurl.ENCODING, 'gzip')
self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)
self.curlmulti = pycurl.CurlMulti()
self.curlmulti.add_handle(self.curl)
self.status_code = 0
SELECT_TIMEOUT = 10
def _any_data_received(self):
return self.received_buffer.tell() != 0
def _get_received_data(self):
result = self.received_buffer.getvalue()
self.received_buffer.truncate(0)
self.received_buffer.seek(0)
return result
def _check_status_code(self):
if self.status_code == 0:
self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
if self.status_code != 0 and self.status_code != httplib.OK:
raise urllib2.HTTPError(self.url, self.status_code, None, None, None)
def _perform_on_curl(self):
while True:
ret, num_handles = self.curlmulti.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
return num_handles
def _iter_chunks(self):
while True:
remaining = self._perform_on_curl()
if self._any_data_received():
self._check_status_code()
yield self._get_received_data()
if remaining == 0:
break
self.curlmulti.select(self.SELECT_TIMEOUT)
self._check_status_code()
self._check_curl_errors()
def _check_curl_errors(self):
for f in self.curlmulti.info_read()[2]:
raise pycurl.error(*f[1:])
def iter_lines(self):
chunks = self._iter_chunks()
return self._split_lines_from_chunks(chunks)
@staticmethod
def _split_lines_from_chunks(chunks):
#same behaviour as requests' Response.iter_lines(...)
pending = None
for chunk in chunks:
if pending is not None:
chunk = pending + chunk
lines = chunk.splitlines()
if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
pending = lines.pop()
else:
pending = None
for line in lines:
yield line
if pending is not None:
yield pending
此代码尝试从传入流中获取尽可能多的字节,如果只有少数,则不会不必要地阻塞。相比之下,CPU 负载在 0.2% 左右
【讨论】:
这self.received_buffer.truncate(0)
不会导致数据丢失吗?我的意思是通过回调方法(self.received_buffer.write
)写入self.received_buffer
但尚未读取self.received_buffer.getvalue()
的数据?
写入不会异步发生,WRITEFUNCTION 仅在 perform() 内部调用。当您应该调用 perform 来读取/写入数据时, select() 指示不要进行不必要的轮询。
干得好!无论如何,您可以添加一个如何使用它的示例吗?向您展示如何将它与progressbar.ProgressBar 一起使用,我们将不胜感激。【参考方案2】:
您的iter_lines()
调用被阻塞并不是requests
' 的错。
Response.iter_lines()
方法调用Response.iter_content()
,后者调用urllib3
的HTTPResponse.stream()
,后者又调用HTTPResponse.read()
。
这些调用传递一个块大小,这就是作为self._fp.read(amt)
传递到套接字的内容。这是有问题的调用,因为self._fp
是由socket.makefile()
生成的文件对象(由httplib
module 完成);这个.read()
调用将 阻塞,直到amt
(压缩)字节被读取。
这个低级套接字文件对象确实支持.readline()
调用,它会更有效地工作,但是urllib3
在处理压缩数据时不能使用这个调用;行终止符在压缩流中不可见。
不幸的是,urllib3
不会在响应未压缩时调用 self._fp.readline()
;调用的结构方式很难传递你想在行缓冲模式下而不是在块缓冲模式下读取。
我必须说,HTTP 不是用于流式事件的最佳协议;我会为此使用不同的协议。 Websockets 浮现在脑海中,或者是针对您特定用例的自定义协议。
【讨论】:
感谢您指出,“问题”在请求使用的 http 堆栈的深处! @ThomasB.:是的,据我所知,没有任何库可以以干净的方式解决这个问题。压缩在这里没有帮助。您必须使用urllib2
,然后从响应对象访问原始套接字,进行非阻塞读取并进行自己的解压缩。不漂亮。
阅读SSE vs WebSockets后我选择了SSE。我只需要下游,JS 客户端超级简单,服务器超级简单,反向代理、加密、压缩在 HTTP 上都可以正常工作。
我已经切换到 PyCurl。它透明地处理分块、压缩、ssl 验证……并且响应速度更快(关于缓冲区惰性)。我将更新我的问题以包括我的发现。
@ThomasB.:我会自己回答。以上是关于有效地从压缩的、分块的 HTTP 流中读取行,因为它们到达的主要内容,如果未能解决你的问题,请参考以下文章