如何使用 pandas 读取并推送到 SQL 数据库中的文件不断获取数据

Posted

技术标签:

【中文标题】如何使用 pandas 读取并推送到 SQL 数据库中的文件不断获取数据【英文标题】:How to read and push to an SQL database using pandas a file that is constantly getting data 【发布时间】:2021-03-28 01:14:48 【问题描述】:

我有一个 CSV 文件,它不断地让另一个程序插入新数据(在末尾附加新行)。

我的目标是在 python 中读取这个文件并将传入的数据在线推送到 SQL 数据库中。在推送数据之前,我不想等待(或者更确切地说我负担不起)文件完成。一旦文件打开,程序应该不断地等待新的行被推送到数据库中。

Pandas 嵌入了一个非常有用的方法to_sql,我目前正在使用该方法将数据推送到数据库中,这就是我喜欢继续使用 pandas 的原因。

Pandas 的 read_csv 方法接受我尝试使用的 chunk_size 参数。行为很有趣,但是每次查询块时文件重新加载,因此无法解决我的问题。在这个例子中

df = pd.read_csv('filename.csv', chunksize=1)
time.sleep(10)
df.get_chunk(5)

如果文件中的数据在sleep期间被修改,查询chunk时不会被抓到。

有人知道我该怎么做吗?

提前致谢

【问题讨论】:

我认为您需要保持与 pandas 的开放连接,以便它知道它写入的最后一个块,或者您可以使用 skiprows 参数并在每次从 csv 读取时从该行读取文件。如果读取后移动 csv 会发生什么? @Manakin 加载 csv 文件后,Windows 会阻止您移动文件,因为它检测到它正在程序中打开。所以你认为只需要read_csv() 持续访问最新数据?没有办法保持对传入数据的读取开放 【参考方案1】:

一般来说:不要。如果您正在读取由另一个进程打开的文件,则无法确定您正在读取有效的 CSV(因为写入过程可能已经完成了一半)。

如果你坚持,有几种方法可以做到。第一个是使用watchdog。这个 Python 库使“监听”文件更改成为可能。一旦文件发生变化(因为其他进程写了新行),您可以再次调用pandas.read_csv

另一种方法是定期(例如每 10 秒)检查文件的内容。这是此类程序的示例:

import time
import pandas


def check_updates(n_rows_read: int) -> int:
    """ Check for updates in a file, starting from a specific row number.

    Args:
        n_rows_read (int): The number of rows in the file that are already read

    Returns:
        int: The total number of lines that are now processed in the file
    """

    # Read the file, starting from where we left off earlier
    df = pandas.read_csv("./test.csv", skiprows=n_rows_read)

    # TODO: Implement your logic here, and push to your SQL database
    # df.to_sql(...)

    return df.shape[0] + n_rows_read


if __name__ == "__main__":

    # In the begining we haven't read any lines yet
    n_rows_read = 0

    # Start an infinite loop
    while True:

        # Get updates, and keep track of how many rows of the file are processed
        n_rows_read = check_updates(n_rows_read)

        # Wait for a bit
        time.sleep(10)

注意:您需要根据自己的目的修改此代码

【讨论】:

谢谢!我不知道看门狗库。我开发了与您的代码 sn-p 类似的东西。我想知道使用我不知道的库是否可以提供更好的解决方案。无论如何,谢谢你的回答,它实际上回答了我所有的问题,所以我接受了这个答案。

以上是关于如何使用 pandas 读取并推送到 SQL 数据库中的文件不断获取数据的主要内容,如果未能解决你的问题,请参考以下文章

Oracle PL/SQL 程序在源表中拆分逗号分隔的数据并推送到目标表中

EventMachine 和 em-websocket - 从队列中读取并推送到通道

如何设置状态并推送到数组?

如何使用 GitHub API v3 创建提交并推送到 repo?

如何捕获屏幕和音频输入并推送到 rtmp 服务器?

如何在 Vue 中求和并推送到匹配的对象