python 多线程将数据写入 influxDB

Posted smile-yan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 多线程将数据写入 influxDB相关的知识,希望对你有一定的参考价值。

问题描述

需要将本地的数据写入到云服务器端的 influxDB,需要写得数据比较多,每个数据文件都比较大。如果写脚本本地运行的话是比较慢的,为此特地花了一些时间加速写数据到 influxDB 中。

云服务器端

为了减少由于网络传输带来的时间消耗,建议将 数据 打包压缩起来上传到云服务器端,并且编写python 脚本 上传到云服务器上,方便在云服务器端运行。

多线程写入 influxDB

"""将数据写入 influxDB
"""
import os
import threading

import pandas as pd
from influxdb import InfluxDBClient
from pandas import DataFrame
import datetime


def csv_filter(name: str):
    return name.endswith(".csv")


def reformat_point(measurement: str, data: DataFrame):
    """将df处理成为写入influxDB的json格式
    :param measurement: csv 文件名
    :param data: dataFrame
    :return: dict
    """
    times = data["time"].values
    values = data["value"].values
    result_list = []
    for i in range(len(times)):
        result_dict = "measurement": measurement, "time": times[i], "fields": 
            "value": values[i],
            # 别的字段也是如此添加
        
        result_list.append(result_dict)
    return result_list


class WriterThread(threading.Thread):
    def __init__(self, csv_files: list, data_path: str, database: str, thread_name: str,
                 host: str = "localhost", port: int = 8086, username: str = "root", password: str = "123"):
        super().__init__()
        self.csv_files = csv_files
        self.data_path = data_path
        self.database = database
        self.thread_name = thread_name
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.headers = 
            'User-Agent': 'python-requests/2.24.0',
            'Accept': '*/*',
            'Connection': 'keep-alive',
            'Content-Type': 'application/json',
            'Authorization': 'Basic cm9vdDpyb290'
        

    def run(self):
        client = InfluxDBClient(host=self.host, username=self.username, port=self.port,
                                database=self.database, password=self.password, headers=self.headers)
        for csv in self.csv_files:
            df = pd.read_csv(self.data_path + csv)
            points = reformat_point(csv, df)
            client.write_points(points, database=self.database)
            print("[0] [1] ---->  2 size: [3]"
                  .format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), self.thread_name, csv, df.shape))
        print("[0] [1] finished".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), self.thread_name))
        client.close()


if __name__ == '__main__':
    host, port = "localhost", 8086
    # 数据的文件夹
    data_path = "/home/smileyan/influx_data/test_data/"
    username, password = "root", "123"
    database = "test_data"
    headers = 
        'User-Agent': 'python-requests/2.24.0',
        'Accept': '*/*',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'Authorization': 'Basic cm9vdDpyb290'
    

    csv_files = list(filter(csv_filter, os.listdir(data_path)))
    threads = []
    # 分块,给每个线程分配任务
    parts = 10
    piece_size = int(len(csv_files) / parts)
    start = 0
    for i in range(1, parts + 1):

        end = start + piece_size
        if end > len(csv_files):
            end = len(csv_files)
        thread = WriterThread(csv_files=csv_files[start:end],
                              thread_name=str(i),
                              data_path=data_path,
                              database=database)
        start += piece_size
        threads.append(thread)

    for th in threads:
        th.start()
    print("all threads started")

总结

要根据实际需要对数据进行处理,然后写入 influxDB,但是由于 本地写到云服务器端的 influxDB 太慢,当数据量较大的时候,建议将脚本上传到云服务器端运行。

Smileyan
2022.10.27 23:30

以上是关于python 多线程将数据写入 influxDB的主要内容,如果未能解决你的问题,请参考以下文章

python多线程应用挂起

windows环境,多线程情况下,C语言向文件写入数据。

一个Socket能否被多线程写入

Python的多线程锁跟队列

python通过邮箱进行多线程通信

python的多线程多进程代码示例