如何将 DataFrame 写入 postgres 表?
Posted
技术标签:
【中文标题】如何将 DataFrame 写入 postgres 表?【英文标题】:How to write DataFrame to postgres table 【发布时间】:2014-05-31 00:03:34 【问题描述】:有 DataFrame.to_sql 方法,但它只适用于 mysql、sqlite 和 oracle 数据库。我无法将这种方法传递给 postgres 连接或 sqlalchemy 引擎。
【问题讨论】:
【参考方案1】:从 pandas 0.14(2014 年 5 月下旬发布)开始,支持 postgresql。 sql
模块现在使用 sqlalchemy
来支持不同的数据库风格。您可以为 postgresql 数据库传递 sqlalchemy 引擎(请参阅docs)。例如:
from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)
你说得对,在 pandas 中不支持 0.13.1 版 postgresql 是正确的。如果你需要使用旧版本的 pandas,这里有一个补丁版本的pandas.io.sql
:https://gist.github.com/jorisvandenbossche/10841234。
我之前写过这个,所以不能完全保证它总是有效的,但应该有基础)。如果您将该文件放在您的工作目录中并导入它,那么您应该可以这样做(其中con
是一个postgresql 连接):
import sql # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')
【讨论】:
这个达到 0.14 了吗? 是的,而且 0.15 已经发布(候选发布版)。我会更新答案,谢谢你的提问。 这篇文章为我解决了这个问题:***.com/questions/24189150/… 注意:to_sql 不会在 postgres 中导出数组类型。 我可以使用使用psycopg2.connect()
创建的现有Postgres
连接而不是创建新的Sqlalchemy engine
吗?【参考方案2】:
更快的选择:
以下代码将比 df.to_sql 方法更快地将您的 Pandas DF 复制到 postgres DB,并且您不需要任何中间 csv 文件来存储 df。
根据您的数据库规范创建引擎。
在您的 postgres 数据库中创建一个与 Dataframe (df) 具有相同列数的表。
DF 中的数据将在您的 postgres 表中插入。
from sqlalchemy import create_engine
import psycopg2
import io
如果你想替换表,我们可以使用普通的 to_sql 方法使用我们的 df 的 headers 替换它,然后将整个耗时的 df 加载到 DB 中。
engine = create_engine('postgresql+psycopg2://username:password@host:port/database')
df.head(0).to_sql('table_name', engine, if_exists='replace',index=False) #drops old table and creates new empty table
conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()
【讨论】:
你为什么要output.seek(0)
?
这太快了,真有趣:D
Load is table 对我来说失败了,因为某些字段中有换行符。我该如何处理? df.to_csv(output, sep='\t', header=False, index=False, encoding='utf-8') cur.copy_from(output, 'messages', null="") # null 值变成''
contents = output.getvalue()抛出内存错误怎么办???
如果要使用schema,可以在to_sql
部分代码中添加schema=your_schema
参数。【参考方案3】:
Pandas 0.24.0+ 解决方案
在 Pandas 0.24.0 中引入了一项新功能,专为快速写入 Postgres 而设计。你可以在这里了解更多信息:https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
import csv
from io import StringIO
from sqlalchemy import create_engine
def psql_insert_copy(table, conn, keys, data_iter):
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('""'.format(k) for k in keys)
if table.schema:
table_name = '.'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY () FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)
【讨论】:
在大多数情况下,添加method='multi'
选项就足够快了。但是,是的,这种COPY
方法是目前最快的方法。
这仅适用于 csv 吗?它也可以与 .xlsx 一起使用吗?关于这部分内容的一些注释会有所帮助。 with
之后的第一部分是写入内存缓冲区。 with
的最后一部分是使用 SQL 语句并利用 copy_expert 的速度来批量加载数据。 columns =
开头的中间部分是做什么的?
这对我来说效果很好。你能解释一下psql_insert_copy
函数中的keys
参数吗?它如何获得任何键并且键只是列名?
我尝试过使用这种方法,但是它会抛出一个错误:Table 'XYZ' already exists
。据我了解,它不应该创建一个表,不是吗?
@E.Epstein - 您可以将最后一行修改为 df.to_sql('table_name', engine, if_exists='replace', method=psql_insert_copy)
- 这确实会在您的数据库中创建一个表。【参考方案4】:
我就是这样做的。
可能会更快,因为它使用的是execute_batch
:
# df is the dataframe
if len(df) > 0:
df_columns = list(df)
# create (col1,col2,...)
columns = ",".join(df_columns)
# create VALUES('%s', '%s",...) one '%s' per column
values = "VALUES()".format(",".join(["%s" for _ in df_columns]))
#create INSERT INTO table (columns) VALUES('%s',...)
insert_stmt = "INSERT INTO () ".format(table,columns,values)
cur = conn.cursor()
psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
conn.commit()
cur.close()
【讨论】:
我得到 AttributeError:模块 'psycopg2' 没有属性 'extras'。啊,这需要显式导入。导入 psycopg2.extras 这个函数比sqlalchemy方案快很多【参考方案5】:创建引擎(其中 dialect='postgres' 或 'mysql' 等):
from sqlalchemy import create_engine
engine = create_engine(f'dialect://user_name@host:port/db_name')
Session = sessionmaker(bind=engine)
with Session() as session:
df = pd.read_csv(path + f'/file')
df.to_sql('table_name', con=engine, if_exists='append',index=False)
【讨论】:
它适用于大多数数据库,包括 postgres。您必须在引擎中指定方言 = create_engine(dialect='postgres', etc....)【参考方案6】:使用 psycopg2,您可以使用本机 sql 命令将数据写入 postgres 表。
import psycopg2
import pandas as pd
conn = psycopg2.connect("dbname='db' user='user' host='host' port='port' password='passwd'".format(
user=pg_user,
passwd=pg_pass,
host=pg_host,
port=pg_port,
db=pg_db))
cur = conn.cursor()
def insertIntoTable(df, table):
"""
Using cursor.executemany() to insert the dataframe
"""
# Create a list of tupples from the dataframe values
tuples = list(set([tuple(x) for x in df.to_numpy()]))
# Comma-separated dataframe columns
cols = ','.join(list(df.columns))
# SQL query to execute
query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
table, cols)
try:
cur.executemany(query, tuples)
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print("Error: %s" % error)
conn.rollback()
return 1
【讨论】:
一个好的答案将始终包括解释为什么这会解决问题,以便 OP 和任何未来的读者可以从中学习。【参考方案7】:适用于 Python 2.7 和 Pandas 0.24.2 并使用 Psycopg2
Psycopg2 连接模块
def dbConnect (db_parm, username_parm, host_parm, pw_parm):
# Parse in connection information
credentials = 'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm
conn = psycopg2.connect(**credentials)
conn.autocommit = True # auto-commit each entry to the database
conn.cursor_factory = RealDictCursor
cur = conn.cursor()
print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
return conn, cur
连接到数据库
conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)
假设数据框已经作为 df 存在
output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL '' ESCAPE '\\' HEADER " # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()
【讨论】:
以上是关于如何将 DataFrame 写入 postgres 表?的主要内容,如果未能解决你的问题,请参考以下文章
将 Pyspark 数据帧加载到 postgres RDS 中的表中时出错
Spark SQL - 如何将 DataFrame 写入文本文件?