如何将 DataFrame 写入 postgres 表?

Posted

技术标签:

【中文标题】如何将 DataFrame 写入 postgres 表?【英文标题】:How to write DataFrame to postgres table 【发布时间】:2014-05-31 00:03:34 【问题描述】:

DataFrame.to_sql 方法,但它只适用于 mysql、sqlite 和 oracle 数据库。我无法将这种方法传递给 postgres 连接或 sqlalchemy 引擎。

【问题讨论】:

【参考方案1】:

从 pandas 0.14(2014 年 5 月下旬发布)开始,支持 postgresql。 sql 模块现在使用 sqlalchemy 来支持不同的数据库风格。您可以为 postgresql 数据库传递 sqlalchemy 引擎(请参阅docs)。例如:

from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

你说得对,在 pandas 中不支持 0.13.1 版 postgresql 是正确的。如果你需要使用旧版本的 pandas,这里有一个补丁版本的pandas.io.sql:https://gist.github.com/jorisvandenbossche/10841234。 我之前写过这个,所以不能完全保证它总是有效的,但应该有基础)。如果您将该文件放在您的工作目录中并导入它,那么您应该可以这样做(其中con 是一个postgresql 连接):

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

【讨论】:

这个达到 0.14 了吗? 是的,而且 0.15 已经发布(候选发布版)。我会更新答案,谢谢你的提问。 这篇文章为我解决了这个问题:***.com/questions/24189150/… 注意:to_sql 不会在 postgres 中导出数组类型。 我可以使用使用psycopg2.connect() 创建的现有Postgres 连接而不是创建新的Sqlalchemy engine 吗?【参考方案2】:

更快的选择:

以下代码将比 df.to_sql 方法更快地将您的 Pandas DF 复制到 postgres DB,并且您不需要任何中间 csv 文件来存储 df。

根据您的数据库规范创建引擎。

在您的 postgres 数据库中创建一个与 Dataframe (df) 具有相同列数的表。

DF 中的数据将在您的 postgres 表中插入

from sqlalchemy import create_engine
import psycopg2 
import io

如果你想替换表,我们可以使用普通的 to_sql 方法使用我们的 df 的 headers 替换它,然后将整个耗时的 df 加载到 DB 中。

engine = create_engine('postgresql+psycopg2://username:password@host:port/database')

df.head(0).to_sql('table_name', engine, if_exists='replace',index=False) #drops old table and creates new empty table

conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()

【讨论】:

你为什么要output.seek(0) 这太快了,真有趣:D Load is table 对我来说失败了,因为某些字段中有换行符。我该如何处理? df.to_csv(output, sep='\t', header=False, index=False, encoding='utf-8') cur.copy_from(output, 'messages', null="") # null 值变成'' contents = output.getvalue()抛出内存错误怎么办??? 如果要使用schema,可以在to_sql部分代码中添加schema=your_schema参数。【参考方案3】:

Pandas 0.24.0+ 解决方案

在 Pandas 0.24.0 中引入了一项新功能,专为快速写入 Postgres 而设计。你可以在这里了解更多信息:https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO

from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('""'.format(k) for k in keys)
        if table.schema:
            table_name = '.'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY  () FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

【讨论】:

在大多数情况下,添加method='multi' 选项就足够快了。但是,是的,这种COPY 方法是目前最快的方法。 这仅适用于 csv 吗?它也可以与 .xlsx 一起使用吗?关于这部分内容的一些注释会有所帮助。 with 之后的第一部分是写入内存缓冲区。 with 的最后一部分是使用 SQL 语句并利用 copy_expert 的速度来批量加载数据。 columns =开头的中间部分是做什么的? 这对我来说效果很好。你能解释一下psql_insert_copy 函数中的keys 参数吗?它如何获得任何键并且键只是列名? 我尝试过使用这种方法,但是它会抛出一个错误:Table 'XYZ' already exists。据我了解,它不应该创建一个表,不是吗? @E.Epstein - 您可以将最后一行修改为 df.to_sql('table_name', engine, if_exists='replace', method=psql_insert_copy) - 这确实会在您的数据库中创建一个表。【参考方案4】:

我就是这样做的。

可能会更快,因为它使用的是execute_batch

# df is the dataframe
if len(df) > 0:
    df_columns = list(df)
    # create (col1,col2,...)
    columns = ",".join(df_columns)

    # create VALUES('%s', '%s",...) one '%s' per column
    values = "VALUES()".format(",".join(["%s" for _ in df_columns])) 

    #create INSERT INTO table (columns) VALUES('%s',...)
    insert_stmt = "INSERT INTO  () ".format(table,columns,values)

    cur = conn.cursor()
    psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
    conn.commit()
    cur.close()

【讨论】:

我得到 AttributeError:模块 'psycopg2' 没有属性 'extras'。啊,这需要显式导入。导入 psycopg2.extras 这个函数比sqlalchemy方案快很多【参考方案5】:
创建引擎(其中 dialect='postgres' 或 'mysql' 等):
from sqlalchemy import create_engine
engine = create_engine(f'dialect://user_name@host:port/db_name')
Session = sessionmaker(bind=engine) 

with Session() as session:
    df = pd.read_csv(path + f'/file') 
    df.to_sql('table_name', con=engine, if_exists='append',index=False)

【讨论】:

它适用于大多数数据库,包括 postgres。您必须在引擎中指定方言 = create_engine(dialect='postgres', etc....)【参考方案6】:

使用 psycopg2,您可以使用本机 sql 命令将数据写入 postgres 表。

import psycopg2
import pandas as pd

conn = psycopg2.connect("dbname='db' user='user' host='host' port='port' password='passwd'".format(
            user=pg_user,
            passwd=pg_pass,
            host=pg_host,
            port=pg_port,
            db=pg_db))
cur = conn.cursor()    
def insertIntoTable(df, table):
        """
        Using cursor.executemany() to insert the dataframe
        """
        # Create a list of tupples from the dataframe values
        tuples = list(set([tuple(x) for x in df.to_numpy()]))
    
        # Comma-separated dataframe columns
        cols = ','.join(list(df.columns))
        # SQL query to execute
        query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
            table, cols)
    
        try:
            cur.executemany(query, tuples)
            conn.commit()

        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1

【讨论】:

一个好的答案将始终包括解释为什么这会解决问题,以便 OP 和任何未来的读者可以从中学习。【参考方案7】:

适用于 Python 2.7 和 Pandas 0.24.2 并使用 Psycopg2

Psycopg2 连接模块

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
    # Parse in connection information
    credentials = 'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm
    conn = psycopg2.connect(**credentials)
    conn.autocommit = True  # auto-commit each entry to the database
    conn.cursor_factory = RealDictCursor
    cur = conn.cursor()
    print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
    return conn, cur

连接到数据库

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

假设数据框已经作为 df 存在

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()

【讨论】:

以上是关于如何将 DataFrame 写入 postgres 表?的主要内容,如果未能解决你的问题,请参考以下文章

将 Pyspark 数据帧加载到 postgres RDS 中的表中时出错

如何将 DataFrame 写入 MySQL 表

Spark SQL - 如何将 DataFrame 写入文本文件?

如何将 pyspark-dataframe 写入红移?

如何将大型 Pyspark DataFrame 写入 DynamoDB

Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?