使用 Pandas .to_sql 将 JSON 列写入 Postgres

Posted

技术标签:

【中文标题】使用 Pandas .to_sql 将 JSON 列写入 Postgres【英文标题】:Writing JSON column to Postgres using Pandas .to_sql 【发布时间】:2017-05-19 01:48:21 【问题描述】:

在ETL 过程中,我需要从一个 Postgres 数据库中提取 JSON 列并将其加载到另一个。我们为此使用 Pandas,因为它有很多方法可以从不同的源/目标读取和写入数据,并且所有转换都可以使用 Python 和 Pandas 编写。老实说,我们对这种方法感到非常满意。但我们遇到了问题。

通常读取和写入数据非常容易。您只需使用pandas.read_sql_table 从源读取数据并使用pandas.to_sql 将其写入目标。但是,由于其中一个源表具有 JSON 类型的列(来自 Postgres),to_sql 函数崩溃并显示以下错误消息。

    df.to_sql(table_name, analytics_db)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/core/generic.py", line 1201, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 470, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 1147, in to_sql
    table.insert(chunksize)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 663, in insert
    self._execute_insert(conn, keys, chunk_iter)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 638, in _execute_insert
    conn.execute(self.insert_statement(), data)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
    context)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 459, in do_executemany
    cursor.executemany(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'

【问题讨论】:

【参考方案1】:

我一直在网上搜索解决方案,但找不到任何解决方案,所以这就是我们想出的(可能有更好的方法,但如果其他人遇到这种情况,至少这是一个开始)。

to_sql中指定dtype参数。

我们从:df.to_sql(table_name, analytics_db) 转到 df.to_sql(table_name, analytics_db, dtype='name_of_json_column_in_source_table': sqlalchemy.types.JSON),它确实有效。

【讨论】:

dtype='name_of_json_column_in_source_table': sqlalchemy.types.JSON 如果源数据的类型 = jsonp,这也有效 设置 dtype='name_of_json_column_in_source_table': sqlalchemy.types.JSON 即使在将 pandas 数据框中的 dicts 列写入 mysql 8.0.19 中的 JSON 列时也有效。谢谢!【参考方案2】:

如果您(重新)使用 json.dumps() 创建 JSON 列,则一切就绪。 这样可以使用 pandas 的 .to_sql() 方法写入数据,也可以使用 PostgreSQL 的更快的 COPY 方法(通过 psycopg2 的 copy_expert() 或 sqlalchemy 的 raw_connection())。

为了简单起见,假设我们有一列字典应该写入 JSON(B) 列:

import json
import pandas as pd

df = pd.DataFrame([['row1','a':1, 'b':2],
                   ['row2','a':3,'b':4,'c':'some text']],
                  columns=['r','kv'])

# conversion function:
def dict2json(dictionary):
    return json.dumps(dictionary, ensure_ascii=False)

# overwrite the dict column with json-strings
df['kv'] = df.kv.map(dict2json)

【讨论】:

【参考方案3】:

我无法评论 peralmq's answer,但如果是 postgresql JSONB,您可以使用

from sqlalchemy import dialects
dataframe.to_sql(..., dtype="json_column":dialects.postgresql.JSONB)

【讨论】:

以上是关于使用 Pandas .to_sql 将 JSON 列写入 Postgres的主要内容,如果未能解决你的问题,请参考以下文章

为啥 dask 的“to_sql”比 pandas 花费更多时间?

使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()

pandas中的to_sql的使用

使用 pyODBC 的 fast_executemany 加速 pandas.DataFrame.to_sql

pandas.DataFrame.to_sql - 源 csv 文件和目标表的列顺序

Pandas 与 to_sql 的 ODBC 连接