如何使用 Snowflake sql 查询的结果填充 pandas DataFrame?

Posted

技术标签:

【中文标题】如何使用 Snowflake sql 查询的结果填充 pandas DataFrame?【英文标题】:How can I populate a pandas DataFrame with the result of a Snowflake sql query? 【发布时间】:2019-04-06 11:15:12 【问题描述】:

使用Python Connector我可以查询雪花:

import snowflake.connector

# Gets the version
ctx = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    authenticator='https://XXXX.okta.com',
    )
ctx.cursor().execute('USE warehouse MY_WH')
ctx.cursor().execute('USE MYDB.MYSCHEMA')


query = '''
select * from MYDB.MYSCHEMA.MYTABLE
LIMIT 10;
'''

cur = ctx.cursor().execute(query)

结果是snowflake.connector.cursor.SnowflakeCursor。如何将其转换为 pandas DataFrame?

【问题讨论】:

【参考方案1】:

您可以将DataFrame.from_records()pandas.read_sql() 与snowflake-sqlalchemy 一起使用。 snowflake-alchemy 选项的 API 更简单

pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])

将返回一个带有从 SQL 结果中获取的正确列名的 DataFrame。 iter(cur) 将光标转换为迭代器,cur.description 给出列的名称和类型。

所以完整的代码将是

import snowflake.connector
import pandas as pd

# Gets the version
ctx = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    authenticator='https://XXXX.okta.com',
    )
ctx.cursor().execute('USE warehouse MY_WH')
ctx.cursor().execute('USE MYDB.MYSCHEMA')


query = '''
select * from MYDB.MYSCHEMA.MYTABLE
LIMIT 10;
'''

cur = ctx.cursor().execute(query)
df = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])

如果你更喜欢使用pandas.read_sql,那么你可以

import pandas as pd

from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL


url = URL(
    account = 'xxxx',
    user = 'xxxx',
    password = 'xxxx',
    database = 'xxx',
    schema = 'xxxx',
    warehouse = 'xxx',
    role='xxxxx',
    authenticator='https://xxxxx.okta.com',
)
engine = create_engine(url)


connection = engine.connect()

query = '''
select * from MYDB.MYSCHEMA.MYTABLE
LIMIT 10;
'''

df = pd.read_sql(query, connection)

【讨论】:

谢谢!为我工作。 @ecerulm 我正在使用 anaconda,当我使用你的第一个版本时无法从“snowflake.sqlalchemy 导入 URL”安装。还有其他想法吗? @Maths12 你需要安装 snowflake python connector 。我不使用 anaconda,但我猜是 conda install -c conda-forge snowflake-connector-pythonpip install --upgrade snowflake-connector-python @ecerulm 我已经安装了它,但我仍然遇到同样的错误 您还需要snowflake-sqlalchemy,如Using the Snowflake SQLAlchemy Toolkit with the Python Connector中所述【参考方案2】:

现在有一个方法.fetch_pandas.all() 用于此,不再需要 SQL Alchemy。

请注意,您需要通过这样做为 pandas 安装 snowflake.connector

pip install snowflake-connector-python[pandas]

完整文档here

import pandas as pd
import snowflake.connector

conn = snowflake.connector.connect(
            user="xxx",
            password="xxx",
            account="xxx",
            warehouse="xxx",
            database="MYDB",
            schema="MYSCHEMA"
            )

cur = conn.cursor()

# Execute a statement that will generate a result set.
sql = "select * from MYTABLE limit 10"
cur.execute(sql)
# Fetch the result set from the cursor and deliver it as the Pandas DataFrame.
df = cur.fetch_pandas_all()

【讨论】:

不幸的是我不能让它工作;关于 PyArrow 的错误。你遇到过这个问题吗?【参考方案3】:

我只想在此处对代码进行一些小改动,以确保列具有正确的名称(在我的例子中,fetch 调用返回的长列名称包含名称本身之外的信息)。我把它留在这里,以防有人需要它:

import snowflake.connector
import pandas as pd

def fetch_pandas(cur, sql):
    cur.execute(sql)
    rows = 0
    while True:
        dat = cur.fetchmany(n)
        if not dat:
            break
        a = [cursor.description[i][0] for i in range(len(cursor.description))]
        df = pd.DataFrame(dat, columns=a)
        rows += df.shape[0]
    return df

n = 100000
conn = snowflake.connector.connect(
    user='xxxxx',
    password='yyyyyy',
    account='zzzzz',
    warehouse = 'wwwww',
    database = 'mmmmmm',
    schema = 'nnnnn'
    )

cursor = conn.cursor()

fetch_pandas(cursor, 'select * from "mmmmmm"."wwwww"."table"')

【讨论】:

以上是关于如何使用 Snowflake sql 查询的结果填充 pandas DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Snowflake 中编写等效的 IF ELSE adhoc sql 查询

如何根据 Snowflake 中先前查询的结果提交/回滚事务?

如何从 Snowflake SQL 查询创建 PySpark pandas-on-Spark DataFrame?

如何克服 Snowflake SQL UDTF 相关子查询错误?

如何使用 Python(SSO 身份验证)在 Snowflake 中进行查询?

如何替换 snowflake.execute 语句中的循环结果?