了解 Python HTTP 流

Posted

技术标签:

【中文标题】了解 Python HTTP 流【英文标题】:Understanding Python HTTP streaming 【发布时间】:2013-07-23 05:45:52 【问题描述】:

我正在努力使用 Python 和请求访问流式 API。

API 说明:“我们启用了流式端点,以使用持久的 HTTP 套接字连接来请求报价和交易数据。来自 API 的流式数据包括发出经过身份验证的 HTTP 请求并让 HTTP 套接字对不断接收数据。”

我是如何尝试访问数据的:

s = requests.Session()
def streaming(symbols):
    url = 'https://stream.tradeking.com/v1/market/quotes.json'
    payload = 'symbols': ','.join(symbols)
    return s.get(url, params=payload, stream=True)  
r = streaming(['AAPL', 'GOOG'])

Requests 文档here 展示了两件有趣的事情:使用生成器/迭代器来处理分块数据,并在数据字段中传递。对于流式数据,建议使用如下代码:

for line in r.iter_lines():
    print(line)

似乎都不起作用,虽然我不知道在生成器函数中放什么,因为这个例子不清楚。使用 r.iter_lines(),我得到输出:"b'"status":"connected""status":disconnected"'"

我可以访问标头,响应为 HTTP 200,但无法获取有效数据,或者找到有关如何在 python 中访问流式 HTTP 数据的明确示例。任何帮助,将不胜感激。 API 建议使用 Jetty for Java 来保持流打开,但我不确定如何在 Python 中执行此操作。

标头:'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'

【问题讨论】:

将您的网址放入我的浏览器会返回“无效的授权标头”消息。您需要进行身份验证吗?和/或,您是否正确读取 JSON 格式的结果? @verbsintransit 如果 OP 看到 200 OK,那么显然他已经完成了身份验证。 API 确实需要身份验证。为简单起见,我省略了 auth 行。我会发布密钥,但它们与我的交易账户相关联。对于非流式请求,我使用 Request 的 .json()。我可能也需要在这里这样做 - 不确定。 最初错过了 200 响应,我的错。只是还想说api键的一个好习惯是通过单独的函数从文件中读取它们。这样你就可以不用担心复制/粘贴代码 sn-ps,如果你使用 git,很容易将该文件包含在 .gitignore 中。 【参考方案1】:

正如verbsintransit 所说,您需要解决您的身份验证问题,但是您可以使用以下示例解决您的流式传输问题:

s = requests.Session()

def streaming(symbols):
    payload = 'symbols': ','.join(symbols)
    headers = 'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'
    req = requests.Request("GET",'https://stream.tradeking.com/v1/market/quotes.json',
                           headers=headers,
                           params=payload).prepare()

    resp = s.send(req, stream=True)

    for line in resp.iter_lines():
        if line:
            yield line


def read_stream():

    for line in streaming(['AAPL', 'GOOG']):
        print line


read_stream()

if line: 条件是检查line 是实际消息还是只是保持连接。

【讨论】:

非常感谢!我想我可能需要等到 tmw 股市开盘时才能对其进行全面测试。同时我会学习/尝试理解它,并将发布发生的事情。 这个例子的关键位是send()中的stream=True。如果您没有设置,Requests 会尝试下载整个正文。文档确实显示了该关键字正在使用中。 我得到的错误如下: ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response',)) 我真的很想让这种方法发挥作用。我遇到错误并且不清楚如何进行调试:urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))【参考方案2】:

不确定您是否发现了这一点,但 TradeKing 不会在其 JSON blob 之间添加换行符。因此,您必须使用 iter_content 逐字节获取它,将该字节附加到缓冲区,尝试解码缓冲区,成功清除缓冲区并生成结果对象。 :(

【讨论】:

很高兴为您解决了问题!我开始做的是使用 NodeJS 来侦听流并通过 ZeroMQ 接口向 python 侦听器生成对象,然后他们使用它来做魔术。 Python 在处理大量符号时变得迟缓。 krillr,您能否发布一个 URL,指向与您使用 Node 和 ZeroMQ 的解决方案相关的任何有用信息。非常感谢【参考方案3】:
import requests
from requests_oauthlib import OAuth1


def streaming(symbols):
    consumer_key     = '***'
    consumer_secret  = '***'
    access_token     = '***'
    access_secret    = '***'

    auth = OAuth1(consumer_key,
        client_secret = consumer_secret,
        resource_owner_key = access_token,
        resource_owner_secret = access_secret)
        
    payload = 'symbols': ','.join(symbols)
    resp = requests.Session().request("GET",'https://stream.tradeking.com/v1/market/quotes.json',stream=True,auth=auth,params=payload)
    # resp.raise_for_status()
    
    for chunk in resp.iter_content(chunk_size=1):
        if chunk:
            yield chunk.decode('utf8')

#try this
for line in streaming(['AAPL', 'GOOG']):
    print(line)

【讨论】:

欢迎来到 ***。虽然这段代码可以解决问题,including an explanation 解决问题的方式和原因确实有助于提高帖子的质量,并可能导致更多的赞成票。请记住,您正在为将来的读者回答问题,而不仅仅是现在提出问题的人。请edit您的回答添加解释并说明适用的限制和假设。

以上是关于了解 Python HTTP 流的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python 读取 HTTP 服务器推送流

python 2 控制流

Python 3 从互联网广播流中获取歌曲名称

Python 检查互联网广播流是不是可用/直播的方法

Python 控制流代码混淆简介,加大别人分析你代码逻辑和流程难度

使用 Requests HTTP 库了解 python 中的内存消耗增加