Pandas 加载 CSV 比加载 SQL 更快

Posted

技术标签:

【中文标题】Pandas 加载 CSV 比加载 SQL 更快【英文标题】:Pandas is faster to load CSV than SQL 【发布时间】:2017-10-08 01:04:19 【问题描述】:

似乎从 CSV 加载数据比使用 Pandas 从 SQL (Postgre SQL) 加载数据更快。 (我有 SSD)

这是我的测试代码:

import pandas as pd
import numpy as np

start = time.time()
df = pd.read_csv('foo.csv')
df *= 3
duration = time.time() - start
print('0s'.format(duration))

engine = create_engine('postgresql://user:password@host:port/schema')
start = time.time()
df = pd.read_sql_query("select * from mytable", engine)
df *= 3
duration = time.time() - start
print('0s'.format(duration))

foo.csv 和数据库是相同的(两者中的数据和列的数量相同,4 列,100 000 行充满随机 int)。

CSV 需要 0.05 秒

SQL 耗时 0.5 秒

你认为 CSV 比 SQL 快 10 倍是正常的吗?我想知道我是否在这里遗漏了什么......

【问题讨论】:

是的。正常.... 好的,谢谢,我真的认为 SQL 更快 【参考方案1】:

虽然 Steven G 对该过程的解释从根本上回答了您的问题,而 Simon G 的 COPY 解决方案是我能找到的最有效的解决方案,但我决定更深入地研究您的问题,并且实际衡量与其相关的不同方面。

https://github.com/mikaelhg/pandas-pg-csv-speed-poc 有一个项目,其中包含各种替代解决方案的 pytest 基准测试。

此测试的 CSV 比问题中的大一个数量级,形状为 (3742616, 6)。只是为了确保各种缓冲区的大小恰到好处而导致结果偏斜的可能性较小。

感谢Finnish Traffic Safety Bureau Trafi's open data initiative提供测试数据。

至于 PostgreSQL 安装,它在规范的 Docker 容器内,并以向上的 shared_bufferswork_mem 值启动,数据文件存储在主机的 /dev/shm 挂载点下,以否定实际磁盘 I/O。它的 UNIX 套接字连接点也同样暴露出来。

version: '3'

services:

  db:
    image: 'postgres:10-alpine'
    command: "postgres -c 'shared_buffers=512MB' -c 'temp_buffers=80MB' -c 'work_mem=256MB'"
    ports:
      - '5432:5432'
    volumes:
      - '/dev/shm/pgtest/data:/var/lib/postgresql/data'
      - '/dev/shm/pgtest/run:/var/run/postgresql'
    environment:
      POSTGRES_USER: test
      POSTGRES_PASSWORD: test
      POSTGRES_DB: test

  test:
    image: pandas_speed_poc:temp
    build:
      context: .
      dockerfile: Dockerfile.test-runner
    volumes:
      - '.:/app'
      - '/dev/shm/pgtest/run:/var/run/postgresql'
    working_dir: '/app'
    user: '1000'

测试运行器是一个简单的 Ubuntu 18.04 容器:

FROM ubuntu:18.04
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get -qq update && \
    apt-get -y -qq install python3-dev python3-pip python3-psycopg2 \
                    build-essential \
                    bash less nano wait-for-it
RUN pip3 install sqlalchemy numpy pandas \
                pytest pytest-benchmark
WORKDIR /app
CMD wait-for-it db:5432 -- /bin/bash -c "trap : TERM INT; sleep infinity & wait"

实际的基准测试是为pytest-benchmark 编写的 Python 3 unittest

#!/usr/bin/python3

from sqlalchemy import create_engine
import psycopg2
import psycopg2.extensions

import pandas as pd
import numpy as np

import io
import time
import gzip

import unittest
import pytest

DATA_FILE = 'data/licenses.csv.gz'

DROP_TABLE = "DROP TABLE IF EXISTS licenses"

CREATE_TABLE = """
    CREATE TABLE licenses (
        a VARCHAR(16),
        b CHAR(3),
        c CHAR(6),
        d INTEGER,
        e INTEGER,
        f INTEGER
    )
"""

COPY_FROM = """
    COPY licenses (a, b, c, d, e, f) FROM STDIN
    WITH (FORMAT CSV, DELIMITER ';', HEADER)
"""

COPY_TO = "COPY licenses TO STDOUT WITH (FORMAT CSV, HEADER)"

SELECT_FROM = 'SELECT * FROM licenses'

VACUUM = "VACUUM FULL ANALYZE"

DB_UNIX_SOCKET_URL = 'postgresql://test:test@/test'

DB_TCP_URL = 'postgresql://test:test@db/test'

def my_cursor_factory(*args, **kwargs):
    cursor = psycopg2.extensions.cursor(*args, **kwargs)
    cursor.itersize = 10240
    return cursor

class TestImportDataSpeed(unittest.TestCase):

    @pytest.fixture(autouse=True)
    def setupBenchmark(self, benchmark):
        self.benchmark = benchmark

    @classmethod
    def setUpClass(cls):
        cls.engine = create_engine(DB_TCP_URL, connect_args='cursor_factory': my_cursor_factory)
        connection = cls.engine.connect().connection
        cursor = connection.cursor()

        cursor.execute(DROP_TABLE)
        cursor.execute(CREATE_TABLE)

        with gzip.open(DATA_FILE, 'rb') as f:
            cursor.copy_expert(COPY_FROM, file=f, size=1048576)

        connection.commit()

        connection.set_session(autocommit=True)
        cursor.execute(VACUUM)

        cursor.close()
        connection.close()

    def test_pd_csv(self):

        def result():
            return pd.read_csv(DATA_FILE, delimiter=';', low_memory=False)

        df = self.benchmark(result)
        assert df.shape == (3742616, 6)

    def test_psycopg2_cursor(self):

        def result():
            connection = self.engine.connect().connection
            cursor = connection.cursor()
            cursor.itersize = 102400
            cursor.arraysize = 102400
            cursor.execute(SELECT_FROM)
            rows = cursor.fetchall()
            cursor.close()
            connection.close()
            return pd.DataFrame(rows)

        df = self.benchmark(result)
        assert df.shape == (3742616, 6)

    def test_pd_sqla_naive(self):

        def result():
            return pd.read_sql_query(SELECT_FROM, self.engine)

        df = self.benchmark(result)
        assert df.shape == (3742616, 6)

    def test_pd_sqla_chunked(self):

        def result():
            gen = (x for x in pd.read_sql(SELECT_FROM, self.engine, chunksize=10240))
            return pd.concat(gen, ignore_index=True)

        df = self.benchmark(result)
        assert df.shape == (3742616, 6)

    def test_pg_copy(self):
        connection = self.engine.connect().connection
        cursor = connection.cursor()

        def result(cursor):
            f = io.StringIO()
            cursor.copy_expert(COPY_TO, file=f, size=1048576)
            f.seek(0)
            return pd.read_csv(f, low_memory=False)

        df = self.benchmark(result, cursor)
        assert df.shape == (3742616, 6)

最终结果:

speed_test.py .....


-------------------------------------------------------------------------------- benchmark: 5 tests -------------------------------------------------------------------------------
Name (time in s)            Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_pd_csv              1.4623 (1.0)      1.4903 (1.0)      1.4776 (1.0)      0.0110 (1.21)     1.4786 (1.0)      0.0171 (1.15)          2;0  0.6768 (1.0)           5           1
test_pg_copy             3.0631 (2.09)     3.0842 (2.07)     3.0732 (2.08)     0.0091 (1.0)      3.0769 (2.08)     0.0149 (1.0)           2;0  0.3254 (0.48)          5           1
test_psycopg2_cursor     4.5325 (3.10)     4.5724 (3.07)     4.5531 (3.08)     0.0161 (1.77)     4.5481 (3.08)     0.0249 (1.68)          2;0  0.2196 (0.32)          5           1
test_pd_sqla_naive       6.0177 (4.12)     6.0523 (4.06)     6.0369 (4.09)     0.0147 (1.62)     6.0332 (4.08)     0.0242 (1.63)          2;0  0.1656 (0.24)          5           1
test_pd_sqla_chunked     6.0247 (4.12)     6.1454 (4.12)     6.0889 (4.12)     0.0442 (4.86)     6.0963 (4.12)     0.0524 (3.52)          2;0  0.1642 (0.24)          5           1
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

您的结果会有所不同!在您自己的架构上运行测试以获得相关结果。

【讨论】:

综合设置;执行良好并记录在案。【参考方案2】:

使用 PostgreSQL 数据库时,您可以结合使用 SQL 和 CSV 来充分利用这两种方法。 SQL 准确选择您需要的数据,并通过 CSV 输出快速将其加载到 pandas DataFrame 中。

conn = psycopg2.connect(**conn_params)
with conn.cursor() as cur:
    sql = 'SELECT * FROM large_table'
    buf = io.StringIO()
    cur.copy_expert(f'COPY (sql) TO STDOUT WITH CSV HEADER', buf)
    buf.seek(0)
    df = pd.read_csv(buf, header=0, low_memory=False,
                     true_values='t', false_values='f')
conn.close()

这使用 PostgreSQL 的快速 COPY 命令和 psycopg2 的 copy_expert() 函数将查询结果读入 CSV 格式的字符串缓冲区。然后,您可以在该字符串缓冲区上使用 pandas read_csv()

缺点是您可能必须在之后转换数据类型(例如时间戳将是字符串)。 read_csv() 函数有一些参数可以帮助处理这个问题(例如 parse_datestrue_valuesfalse_values、...)。

在我的用例(3000 万行,15 列)中,与 pandas read_sql() 函数相比,这使我的性能提高了大约 2-3 倍。

【讨论】:

【参考方案3】:

CSV 比 SQL 快得多是完全正常的,但它们并不意味着相同的事情,即使您可以将它们用于相同的事情:

    CSV 用于顺序访问,即从文件的开头开始,逐行读取每一行,根据需要处理。

    SQL 用于索引访问,即,您查看索引,然后转到您要查找的行。您还可以执行全表扫描,即不使用任何索引,这使得表本质上是一个臃肿的 CSV。

您的查询是全表扫描,它不查看索引,因为它针对所有数据,所以是的,这很正常。

另一方面,如果你尝试像

这样的查询
select * from mytable where myindex = "myvalue";

与在 csv 中搜索相同的行相比,您将获得巨大的提升。那是因为 SQL 中的索引

【讨论】:

【参考方案4】:

这是正常行为,读取 csv 文件始终是简单加载数据的最快方法之一

CSV 非常天真和简单。直接从它加载会非常快。对于结构复杂的海量数据库,CSV 不是一种选择。 SQL 非常快地从表中选择数据并将该数据返回给您。自然,如果您可以选择、修改和操作数据,则会增加通话的间接时间成本。

假设您在 csv 中有一个从 1920 年到 2017 年的时间序列,但您只需要从 2010 年到今天的数据。

csv 方法是加载整个 csv,然后选择 2010 年到 2017 年。

SQL 方法是通过 SQL 选择功能预先选择年份

在这种情况下,SQL 会快得多。

【讨论】:

您能详细说明一下吗?为什么加载 CSV 比从关系数据库中获取数据更快? @Vame 一个 CSV 非常天真和简单。直接从它加载非常快。当你加载它时,你不能真的只选择 csv 的某些行。对于具有复杂结构的大型数据库,CSV 不是一种选择。 SQL 非常快地从表中选择数据并将该数据返回给您。自然,如果您可以选择、修改和操作数据,这将为您的通话增加额外的时间成本。答案示例 @StevenG Haelle 正在使用 Pandas,它可以用这种类型的查询做很多事情。调用将是 df[(df.year>=2010) & (df.year

以上是关于Pandas 加载 CSV 比加载 SQL 更快的主要内容,如果未能解决你的问题,请参考以下文章

Numpy Genfromtxt 比 pandas read_csv 慢

使用数据框在 Pandas 中加载 csv 文件

H2数据库加载csv数据更快

使用scala在sql表中加载csv文件

pandas.DataFrame.to_sql - 源 csv 文件和目标表的列顺序

从 csv.reader 之后的列(Python Pandas)中获取最早的日期