使用替换字段值查询后将系列转储回 InfluxDB

Posted

技术标签:

【中文标题】使用替换字段值查询后将系列转储回 InfluxDB【英文标题】:Dump series back into InfluxDB after querying with replaced field value 【发布时间】:2019-02-10 19:03:16 【问题描述】:

场景

我想通过从 InfluxDB 查询测量值来将数据发送到 MQTT 代理(云)。

我在名为status 的架构中有一个字段。它可以是10status=0 表示该系列尚未发送到云端。如果我得到 MQTT 代理的确认,那么我希望使用 status=1 将查询重写回数据库。

如FAQs for InfluxDB regarding Duplicate data 中所述,如果信息与之前的查询具有相同的时间戳但具有不同的字段值 => 则将显示更新字段。

为了测试这一点,我创建了以下内容:

CREATE DATABASE dummy
USE dummy
INSERT meas_1, type=t1, status=0,value=123 1536157064275338300

查询:

SELECT * FROM meas_1

提供

time                status type value         
1536157064275338300 0      t1   234      

现在,如果我想覆盖该系列,我会执行以下操作:

INSERT meas_1, type=t1, status=1,value=123 1536157064275338300                                                                       

这将覆盖系列

 time                status type value         
 1536157064275338300 1      t1   234     

(注意:目前 InfluxDB 中的 标签 无法做到这一点)

用法

    使用"status"=0客户端查询一些信息。 重组 JSON 以发送到云端 将信息发送到云端 如果成功,则将步骤 1. 的输出写回 DB,但使用 status=1

我正在使用InfluxDBClient Python3 创建应用程序(MQTT + InfluxDB)

write_points API 中有一个参数提到batch_size,需要int 作为输入。

我不确定如何将它与我想要的应用程序一起使用。有人可以指导我使用这个或数据库的架构,以便我可以将实际和非冗余信息上传到云端吗?

【问题讨论】:

【参考方案1】:

batch_size 实际上是需要传递给write_points 的测量列表的长度。

步骤

    创建客户端并从测量中查询(这里我们查询gps信息)

    client = InfluxDBClient(database='dummy')
    
    op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
    

    ResultSet 放入列表中:

     batch = list(op.get_points('gps'))
    

    为更新创建一个空列表

     updated_batch = []
    

    解析每个测量并将status 标志更改为1。注意,InfluxDB 中的默认值是浮点数

       for each in batch:
    new_mes = 
    'measurement': 'gps',
    'tags': 
    'type': 'gps'
    ,
    'time': each['time'],
    'fields': 
      'lat': float(each['lat']),
      'lon': float(each['lon']),
      'alt': float(each['alt']),
      'status': float(1)
    
    
    updated_batch.append(new_mes)
    

    最后通过客户端以batch_size作为updated_batch的长度转储积分

    client.write_points(updated_batch, batch_size=len(updated_batch))
    

这会覆盖系列,因为它包含相同的时间戳,status 字段设置为 1

【讨论】:

以上是关于使用替换字段值查询后将系列转储回 InfluxDB的主要内容,如果未能解决你的问题,请参考以下文章

InfluxDB Flux - 过滤字段匹配值的位置

从 InfluxDB 测量中删除具有不需要的字段值的点

influxDB:如何在 influxDB v2.0 中将字段转换为标签

InfluxDB - 使用 Flux 按系列数限制查询结果

如何获取InfluxDB系列的基本名称列表?

在 influxdb 中为聚合数据添加标签