SQLAlchemy,Psycopg2和Postgresql COPY

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SQLAlchemy,Psycopg2和Postgresql COPY相关的知识,希望对你有一定的参考价值。

看起来Psycopg有一个用于执行COPY的自定义命令:

psycopg2 COPY using cursor.copy_from() freezes with large inputs

有没有办法从SQLAlchemy访问此功能?

答案

It doesn't look like it

您可能只需使用psycopg2来公开此功能并放弃ORM功能。我想我在这样的操作中并没有真正看到ORM的好处,因为它是一个直接的批量插入并处理单个对象,而ORM实际上并没有太多意义。

另一答案

接受的答案是正确的,但如果你想要的不只是EoghanM的评论继续下面的工作,请将表格复制到CSV ...

from sqlalchemy import sessionmaker, create_engine

eng = create_engine("postgresql://user:pwd@host:5432/db")
ses = sessionmaker(bind=engine)

dbcopy_f = open('/tmp/some_table_copy.csv','wb')

copy_sql = 'COPY some_table TO STDOUT WITH CSV HEADER'

fake_conn = eng.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.copy_expert(copy_sql, dbcopy_f)

sessionmaker不是必需的但是如果你习惯于创建引擎和会话同时使用raw_connection你需要将它们分开(除非有某种方法通过会话对象访问引擎我不知道)。提供给copy_expert的sql字符串也不是唯一的方法,有一个基本的copy_to函数,你可以使用参数的子集,你可以通过正常的COPY TO查询。对我来说命令的整体性能似乎很快,复制出约20000行的表。

http://initd.org/psycopg/docs/cursor.html#cursor.copy_to http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.Engine.raw_connection

另一答案

如果您的引擎配置了psycopg2连接字符串(这是默认设置,所以"postgresql://...""postgresql+psycopg2://..."),您可以使用SQL Alchemy会话创建一个psycopg2游标

cursor = session.connection().connection.cursor()

你可以用它来执行

cursor.copy_from(...)

光标将在与您当前会话相同的事务中处于活动状态。如果commitrollback发生,任何进一步使用光标抛出psycopg2.InterfaceError,你将不得不创建一个新的。

另一答案

您可以使用:

def to_sql(engine, df, table, if_exists='fail', sep='	', encoding='utf8'):
    # Create Table
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')
    connection.commit()
    cursor.close()

我在5秒而不是4分钟内插入200000行

另一答案

您不需要下拉到psycopg2,使用raw_connection也不需要光标。

像往常一样执行sql,你甚至可以使用text()绑定参数:

engine.execute(text('''copy some_table from :csv
                       delimiter ',' csv'''
                   ).execution_options(autocommit=True),
               csv='/tmp/a.csv')

如果接受execution_options(autocommit=True),你可以放弃this PR

另一答案

如果你可以到达引擎,你就拥有了所需的一切:

engine = create_engine('postgresql+psycopg2://myuser:password@localhost/mydb')
# or 
engine = session.engine
# or any other way you know to get to the engine

现在你可以工作了。

# isolate a connection
connection = engine.connect().connection

# get the cursor
cursor = connection.cursor()

以下是与cursor.copy_expert()一起使用的COPY语句的一些模板,这是一个比copy_from()copy_to()更完整和更灵活的选项,如下所示:http://initd.org/psycopg/docs/cursor.html#cursor.copy_expert

# to dump to a file
dump_to = """
COPY mytable 
TO STDOUT
WITH (
    FORMAT CSV,
    DELIMITER ',',
    HEADER
);
"""

# to copy from a file:
copy_from = """
COPY mytable 
FROM STDIN
WITH (
    FORMAT CSV,
    DELIMITER ',',
    HEADER
);
"""

查看上述选项的含义以及您的具体情况https://www.postgresql.org/docs/current/static/sql-copy.html可能感兴趣的其他选项。

重要说明:cursor.copy_expert()文档的链接指示使用STDOUT写入文件,使用STDIN从文件复制。但是如果你看一下PostgreSQL手册上的语法,你会注意到你也可以直接在COPY语句中指定要写入或写入的文件。不要这样做,如果你不是以root身份运行(在开发过程中以root身份运行Python,那么你可能只是浪费时间?)只需执行psycopg2文档中指示的内容并使用cursor.copy_expert()在语句中指定STDIN或STDOUT ,应该没问题。

# running the copy statement
with open('/path/to/your/data/file.csv') as f:
     cursor.copy_expert(copy_from, file=f)

# don't forget to commit the changes.
connection.commit()

以上是关于SQLAlchemy,Psycopg2和Postgresql COPY的主要内容,如果未能解决你的问题,请参考以下文章

SQLAlchemy,Psycopg2和Postgresql COPY

如何使用 pandas sqlalchemy 和 psycopg2 处理 NaT

SQLAlchemy(psycopg2.ProgrammingError)无法适应类型'dict'

SQLAlchemy 还是 psycopg2?

如何在与 SQLAlchemy 和 psycopg2 的 PostgreSQL 连接上设置“lock_timeout”?

SQLAlchemy/psycopg2 到 PostgreSQL 数据库的连接是不是加密