Python InfluxDB2 - write_api.write(...) 如何检查是不是成功?

Posted

技术标签:

【中文标题】Python InfluxDB2 - write_api.write(...) 如何检查是不是成功?【英文标题】:Python InfluxDB2 - write_api.write(...) How to check for success?Python InfluxDB2 - write_api.write(...) 如何检查是否成功? 【发布时间】:2021-12-08 08:32:54 【问题描述】:

我需要将历史数据写入 InfluxDB(我使用的是 Python,在这种情况下这不是必须的,所以我可能愿意接受非 Python 解决方案)。我像这样设置了写API

write_api = client.write_api(write_options=ASYNCHRONOUS)

数据来自一个以时间戳为键的DataFrame,所以我像这样将它写入数据库

result = write_api.write(bucket=bucket, data_frame_measurement_name=field_key, record=a_data_frame)

即使 InfluxDB 服务器已关闭,此调用也不会引发异常。 result 有一个受保护的属性 _success,它是调试中的布尔值,但我无法从代码中访问它。

如何检查写入是否成功?

【问题讨论】:

咆哮:我多么讨厌这个基于事件的勇敢的新世界,一切都是“最终的一致性,或者永远不会”。 【参考方案1】:

如果您想立即将数据写入数据库,请使用同步版本的write_api - https://github.com/influxdata/influxdb-client-python/blob/58343322678dd20c642fdf9d0a9b68bc2c09add9/examples/example.py#L12

应该通过调用.get() - https://github.com/influxdata/influxdb-client-python#asynchronous-client“触发”异步写入

问候

【讨论】:

您的回答并不完全正确,但它引导我找到了正确的解决方案。【参考方案2】:

如果您使用后台批处理,您可以添加自定义成功、错误和重试回调。

from influxdb_client import InfluxDBClient

def success_cb(details, data):
    url, token, org = details
    print(url, token, org)
    data = data.decode('utf-8').split('\n')
    print('Total Rows Inserted:', len(data))  

def error_cb(details, data, exception):
    print(exc)

def retry_cb(details, data, exception):
    print('Retrying because of an exception:', exc)    


with InfluxDBClient(url, token, org) as client:
    with client.write_api(success_callback=success_cb,
                          error_callback=error_cb,
                          retry_callback=retry_cb) as write_api:

        write_api.write(...)

如果您急于测试所有回调并且不想等到所有重试完成,您可以覆盖间隔和重试次数。

from influxdb_client import InfluxDBClient, WriteOptions

with InfluxDBClient(url, token, org) as client:
    with client.write_api(success_callback=success_cb,
                          error_callback=error_cb,
                          retry_callback=retry_cb,
                          write_options=WriteOptions(retry_interval=60,
                                                     max_retries=2),
                          ) as write_api:
        ...

【讨论】:

【参考方案3】:

write_api.write() 返回 multiprocessing.pool.AsyncResultmultiprocessing.pool.AsyncResult(两者相同)。

使用此返回对象,您可以通过多种方式检查异步请求。见这里:https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult

如果可以使用阻塞请求,则可以使用write_api = client.write_api(write_options=SYNCRONOUS)

【讨论】:

以上是关于Python InfluxDB2 - write_api.write(...) 如何检查是不是成功?的主要内容,如果未能解决你的问题,请参考以下文章

influxdb2安装

influxdb2安装

Windows安装InfluxDB2

python_操作excel

Python_异常:TypeError: write() argument must be str, not list

使用入口点脚本初始化 influxdb2 存储桶