Dask 加载 JSON(用于实时绘图)
Posted
技术标签:
【中文标题】Dask 加载 JSON(用于实时绘图)【英文标题】:Dask load JSON (for realtime plots) 【发布时间】:2022-01-09 00:10:29 【问题描述】:我正在尝试使用 dask 从 http 地址加载 JSON,然后将其放入数据框中,以便用破折号绘制一些实验数据。目标是实时获取数据并显示机器的实时图(示例数据可以在这里找到:http://aav.rz-berlin.mpg.de:17668/retrieval/data/getData.json?pv=FHIMP%3AHeDrop%3AForepressure_Droplet_Src)
这是我尝试过的:
import json
import dask.bag as db
mybag = db.read_text("http://aav.rz-berlin.mpg.de:17668/retrieval/data/getData.json?pv=FHIMP%3AHeDrop%3AForepressure_Droplet_Src").map(json.loads)
mybag.to_dataframe()
但是 mybag.to_dataframe() 冻结了我的代码。
我也试过了:
import dask.dataframe as dd
dd.read_json('url')
返回“ValueError:预期的对象或值”。所以根据错误信息,根本没有JSON。问题是否源于由元数据和数据字段组成的 JSON?
附带问题:如果我想提供一个 Web 应用程序进行监控,我的系统是否还有这样的意义?这是我第一次与 Dash 和 Dask 合作。如果我理解正确,Dask 基本上会在这里完成后端的工作,如果我有一个向我发送 JSON 数据的 API,则无需让它独立存在。
【问题讨论】:
【参考方案1】:关于问题中第二个sn-p报错的小说明,字符串传递给dd.read_json
,触发报错:
import dask.dataframe as dd
dd.read_json('url') # note that a string is passed here
【讨论】:
【参考方案2】:Dask 通常不是实时/流式分析引擎。大多数情况下,事情应该是功能性的,以相同的参数运行相同的任务可以保证产生相同的输出 - 显然不是这里的情况。
client.submit
API 可以生成实时分析,它会在调用时创建任意任务。但是,它仍然要求任务是有限的,以便其他任务随后获取结果并对其进行操作。从给定的 URL 读取永远不会结束。
如果您想将 dask 与流数据结合使用,或者通常希望在 python 中处理流数据,您可能想尝试streamz
。列出的源主要是轮询(在计时器上重复某些操作以检查新事件)或由入站事件驱动(如服务器等待连接)。不过,您可以轻松地为 HTTP 端点创建一个源:
from streamz import Source, Stream
import aiohttp
@Stream.register_api(staticmethod)
class from_http(Source):
def __init__(self, url, chunk_size=1024, **kwargs):
self.url = url
self.chunk_size = chunk_size
super().__init__(**kwargs)
async def run(self):
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as resp:
async for chunk in resp.content.iter_chunked(self.chunk_size):
await self.emit(chunk, asynchronous=True)
此流节点的输出是二进制数据块 - 由您编写可以将其解析为 JSON 的下游节点(因为块边界不会尊重 JSON 记录终止符)。
【讨论】:
我不确定它是否只是我的系统,但 json 流确实会在一段时间后结束...... 这让生活更轻松! 嗯,但是对于实时监控来说,确定正确的时间不是很棘手吗? (在当前请求过期之前发起新请求)或者 streamz 可以按时对齐系列... streamz 的这个实现是读取单个请求,直到数据用完。但是,是的,你可以做任何你喜欢的后期处理。以上是关于Dask 加载 JSON(用于实时绘图)的主要内容,如果未能解决你的问题,请参考以下文章