如何使用 Snowflake sql 查询的结果填充 pandas DataFrame?
Posted
技术标签:
【中文标题】如何使用 Snowflake sql 查询的结果填充 pandas DataFrame?【英文标题】:How can I populate a pandas DataFrame with the result of a Snowflake sql query? 【发布时间】:2019-04-06 11:15:12 【问题描述】:使用Python Connector我可以查询雪花:
import snowflake.connector
# Gets the version
ctx = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
authenticator='https://XXXX.okta.com',
)
ctx.cursor().execute('USE warehouse MY_WH')
ctx.cursor().execute('USE MYDB.MYSCHEMA')
query = '''
select * from MYDB.MYSCHEMA.MYTABLE
LIMIT 10;
'''
cur = ctx.cursor().execute(query)
结果是snowflake.connector.cursor.SnowflakeCursor
。如何将其转换为 pandas DataFrame?
【问题讨论】:
【参考方案1】:您可以将DataFrame.from_records()
或pandas.read_sql()
与snowflake-sqlalchemy 一起使用。 snowflake-alchemy 选项的 API 更简单
pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
将返回一个带有从 SQL 结果中获取的正确列名的 DataFrame。 iter(cur)
将光标转换为迭代器,cur.description
给出列的名称和类型。
所以完整的代码将是
import snowflake.connector
import pandas as pd
# Gets the version
ctx = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
authenticator='https://XXXX.okta.com',
)
ctx.cursor().execute('USE warehouse MY_WH')
ctx.cursor().execute('USE MYDB.MYSCHEMA')
query = '''
select * from MYDB.MYSCHEMA.MYTABLE
LIMIT 10;
'''
cur = ctx.cursor().execute(query)
df = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
如果你更喜欢使用pandas.read_sql
,那么你可以
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
url = URL(
account = 'xxxx',
user = 'xxxx',
password = 'xxxx',
database = 'xxx',
schema = 'xxxx',
warehouse = 'xxx',
role='xxxxx',
authenticator='https://xxxxx.okta.com',
)
engine = create_engine(url)
connection = engine.connect()
query = '''
select * from MYDB.MYSCHEMA.MYTABLE
LIMIT 10;
'''
df = pd.read_sql(query, connection)
【讨论】:
谢谢!为我工作。 @ecerulm 我正在使用 anaconda,当我使用你的第一个版本时无法从“snowflake.sqlalchemy 导入 URL”安装。还有其他想法吗? @Maths12 你需要安装 snowflake python connector 。我不使用 anaconda,但我猜是conda install -c conda-forge snowflake-connector-python
或 pip install --upgrade snowflake-connector-python
@ecerulm 我已经安装了它,但我仍然遇到同样的错误
您还需要snowflake-sqlalchemy
,如Using the Snowflake SQLAlchemy Toolkit with the Python Connector中所述【参考方案2】:
现在有一个方法.fetch_pandas.all()
用于此,不再需要 SQL Alchemy。
请注意,您需要通过这样做为 pandas 安装 snowflake.connector
pip install snowflake-connector-python[pandas]
完整文档here
import pandas as pd
import snowflake.connector
conn = snowflake.connector.connect(
user="xxx",
password="xxx",
account="xxx",
warehouse="xxx",
database="MYDB",
schema="MYSCHEMA"
)
cur = conn.cursor()
# Execute a statement that will generate a result set.
sql = "select * from MYTABLE limit 10"
cur.execute(sql)
# Fetch the result set from the cursor and deliver it as the Pandas DataFrame.
df = cur.fetch_pandas_all()
【讨论】:
不幸的是我不能让它工作;关于 PyArrow 的错误。你遇到过这个问题吗?【参考方案3】:我只想在此处对代码进行一些小改动,以确保列具有正确的名称(在我的例子中,fetch 调用返回的长列名称包含名称本身之外的信息)。我把它留在这里,以防有人需要它:
import snowflake.connector
import pandas as pd
def fetch_pandas(cur, sql):
cur.execute(sql)
rows = 0
while True:
dat = cur.fetchmany(n)
if not dat:
break
a = [cursor.description[i][0] for i in range(len(cursor.description))]
df = pd.DataFrame(dat, columns=a)
rows += df.shape[0]
return df
n = 100000
conn = snowflake.connector.connect(
user='xxxxx',
password='yyyyyy',
account='zzzzz',
warehouse = 'wwwww',
database = 'mmmmmm',
schema = 'nnnnn'
)
cursor = conn.cursor()
fetch_pandas(cursor, 'select * from "mmmmmm"."wwwww"."table"')
【讨论】:
以上是关于如何使用 Snowflake sql 查询的结果填充 pandas DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Snowflake 中编写等效的 IF ELSE adhoc sql 查询
如何根据 Snowflake 中先前查询的结果提交/回滚事务?
如何从 Snowflake SQL 查询创建 PySpark pandas-on-Spark DataFrame?
如何克服 Snowflake SQL UDTF 相关子查询错误?