Writing Data to InfluxDB with Python Multithreading
Posted by smile-yan
Problem Description
Local data needs to be written to an influxDB instance on a cloud server. There is a lot of data to write, and each data file is fairly large. Running the script locally is slow, so I spent some time speeding up the writes to influxDB.
On the Cloud Server
To cut the time lost to network transfer, it is better to pack and compress the data, upload the archive to the cloud server, and upload the Python script as well, so everything runs server-side.
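The packing step can be done with a small helper before uploading. This is a sketch, not part of the original workflow; the directory and archive paths are hypothetical examples.

```python
import os
import tarfile


def pack_csvs(data_dir: str, archive_path: str) -> int:
    """Add every .csv under data_dir to a gzip'd tarball; return the file count.

    Compressing CSVs before upload (e.g. with scp) reduces transfer time.
    """
    count = 0
    with tarfile.open(archive_path, "w:gz") as tar:
        for name in sorted(os.listdir(data_dir)):
            if name.endswith(".csv"):
                # arcname keeps the archive flat (no leading directories)
                tar.add(os.path.join(data_dir, name), arcname=name)
                count += 1
    return count
```

On the server, `tar xzf` restores the files into the directory the script reads from.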
Writing to influxDB with Multiple Threads
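Before the full script: the core of the job is reshaping each (time, value) row into the list-of-dicts point format that `InfluxDBClient.write_points` accepts. Stripped of pandas, the mapping can be sketched as (names here are illustrative):

```python
def to_points(measurement: str, times: list, values: list) -> list:
    """Build the point dicts accepted by influxdb.InfluxDBClient.write_points."""
    return [
        {"measurement": measurement, "time": t, "fields": {"value": v}}
        for t, v in zip(times, values)
    ]
```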
"""将数据写入 influxDB
"""
import os
import threading
import pandas as pd
from influxdb import InfluxDBClient
from pandas import DataFrame
import datetime
def csv_filter(name: str):
return name.endswith(".csv")
def reformat_point(measurement: str, data: DataFrame):
"""将df处理成为写入influxDB的json格式
:param measurement: csv 文件名
:param data: dataFrame
:return: dict
"""
times = data["time"].values
values = data["value"].values
result_list = []
for i in range(len(times)):
result_dict = "measurement": measurement, "time": times[i], "fields":
"value": values[i],
# 别的字段也是如此添加
result_list.append(result_dict)
return result_list
class WriterThread(threading.Thread):
    def __init__(self, csv_files: list, data_path: str, database: str, thread_name: str,
                 host: str = "localhost", port: int = 8086, username: str = "root", password: str = "123"):
        super().__init__()
        self.csv_files = csv_files
        self.data_path = data_path
        self.database = database
        self.thread_name = thread_name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.headers = {
            'User-Agent': 'python-requests/2.24.0',
            'Accept': '*/*',
            'Connection': 'keep-alive',
            'Content-Type': 'application/json',
            'Authorization': 'Basic cm9vdDpyb290'
        }

    def run(self):
        client = InfluxDBClient(host=self.host, username=self.username, port=self.port,
                                database=self.database, password=self.password, headers=self.headers)
        for csv in self.csv_files:
            df = pd.read_csv(self.data_path + csv)
            points = reformat_point(csv, df)
            client.write_points(points, database=self.database)
            print("{0} {1} ----> {2} size: {3}"
                  .format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), self.thread_name, csv, df.shape))
        print("{0} {1} finished".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), self.thread_name))
        client.close()
if __name__ == '__main__':
    host, port = "localhost", 8086
    # folder containing the data files
    data_path = "/home/smileyan/influx_data/test_data/"
    username, password = "root", "123"
    database = "test_data"
    csv_files = list(filter(csv_filter, os.listdir(data_path)))
    threads = []
    # split the files into chunks, assigning one chunk of work per thread
    parts = 10
    # round up, so trailing files are not left unassigned when the
    # file count is not an exact multiple of `parts`
    piece_size = -(-len(csv_files) // parts)
    start = 0
    for i in range(1, parts + 1):
        end = min(start + piece_size, len(csv_files))
        thread = WriterThread(csv_files=csv_files[start:end],
                              thread_name=str(i),
                              data_path=data_path,
                              database=database)
        start += piece_size
        threads.append(thread)
    for th in threads:
        th.start()
    print("all threads started")
    # wait for every writer thread to finish before exiting
    for th in threads:
        th.join()
Summary
Process the data as your use case requires, then write it to influxDB. Since writing from a local machine to a cloud-hosted influxDB is too slow, upload the script to the cloud server and run it there when the data volume is large.
Smileyan
2022.10.27 23:30