使用 Python 将数据写入雪花
Posted
技术标签:
【中文标题】使用 Python 将数据写入雪花【英文标题】:Writing data into snowflake using Python 【发布时间】:2020-08-27 12:50:19 【问题描述】:我们可以直接将数据写入雪花表而不使用Python使用雪花内部阶段吗????
先在stage中写入,然后转换,然后加载到表中,这似乎是辅助任务。是否可以像RDBMS中的JDBC连接一样一步完成。
【问题讨论】:
【参考方案1】:将数据加载到 Snowflake 中绝对最快的方法是从内部或外部阶段的文件中加载。时期。所有连接器都能够使用标准插入命令插入数据,但这不会很好。也就是说,许多 Snowflake 驱动程序现在透明地使用 PUT/COPY 命令通过内部阶段将大数据加载到 Snowflake。如果这是您所追求的,那么您可以利用 pandas write_pandas
命令在单个命令中将数据从 pandas 数据帧加载到 Snowflake。在幕后,它将为您执行 PUT 和 COPY INTO。
https://docs.snowflake.com/en/user-guide/python-connector-api.html#label-python-connector-api-write-pandas
我强烈推荐这种模式,而不是任何驱动程序中的 INSERT 命令。而且我还建议在加载到雪花之后进行转换,而不是之前。
【讨论】:
另外,查看write_pandas
的parallel
和chunk_size
参数。调整这些可能会对总加载时间产生巨大影响。【参考方案2】:
如果有人在处理大型数据集时遇到问题。尝试使用 dask 并生成划分为块的数据帧。然后您可以将 dask.delayed 与 sqlalchemy 一起使用。在这里,我们使用雪花的本机连接器方法,即 pd_writer,它在后台使用 write_pandas 并最终使用 PUT COPY 和压缩的 parquet 文件。最后,老实说,它归结为您的 I/O 带宽。您拥有的吞吐量越多,它在雪花表中加载的速度就越快。但是这个 sn-p 总体上提供了相当多的并行度。
import functools
from dask.diagnostics import ProgressBar
from snowflake.connector.pandas_tools import pd_writer
import dask.dataframe as dd
df = dd.read_csv(csv_file_path, blocksize='64MB')
ddf_delayed = df.to_sql(
table_name.lower(),
uri=str(engine.url),
schema=schema_name,
if_exists=if_exists,
index=False,
method=functools.partial(
pd_writer,quote_identifiers=False),
compute=False,
parallel=True
)
with ProgressBar():
dask.compute(ddf_delayed, scheduler='threads', retries=3)
【讨论】:
感谢您的回复 ....但它再次使用舞台仅加载数据 是的,但这就是雪花推荐的方法,因为这是将数据放入雪花表的最快方法。但是当然可以像 pandas 一样使用 sqlalchemy 和 dask to_sql 方法。【参考方案3】:Java:
加载驱动类:
Class.forName("net.snowflake.client.jdbc.SnowflakeDriver")
Maven:
添加以下代码块作为依赖项
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>version</version>
春天:
application.yml:
spring:
datasource
hikari:
maximumPoolSize: 4 # Specify maximum pool size
minimumIdle: 1 # Specify minimum pool size
driver-class-name: com.snowflake.client.jdbc.SnowflakeDriver
Python:
import pyodbc
# pyodbc connection string
conn = pyodbc.connect("Driver=SnowflakeDSIIDriver; Server=XXX.us-east-2.snowflakecomputing.com; Database=VAQUARKHAN_DB; schema=public; UID=username; PWD=password")
# Cursor
cus=conn.cursor()
# Execute SQL statement to get current datetime and store result in cursor
cus.execute("select current_date;")
# Display the content of cursor
row = cus.fetchone()
print(row)
How to insert json response data in snowflake database more efficiently?
Apache Spark:
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>spark-snowflake_2.11</artifactId>
<version>2.5.9-spark_2.4</version>
</dependency>
代码
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
/ Use secrets DBUtil to get Snowflake credentials.
val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")
val options = Map(
"sfUrl" -> "<snowflake-url>",
"sfUser" -> user,
"sfPassword" -> password,
"sfDatabase" -> "<snowflake-database>",
"sfSchema" -> "<snowflake-schema>",
"sfWarehouse" -> "<snowflake-cluster>"
)
// Generate a simple dataset containing five values and write the dataset to Snowflake.
spark.range(5).write
.format("snowflake")
.options(options)
.option("dbtable", "<snowflake-database>")
.save()
// Read the data written by the previous cell back.
val df: DataFrame = spark.read
.format("snowflake")
.options(options)
.option("dbtable", "<snowflake-database>")
.load()
display(df)
将数据加载到 Snowflake 中的最快方法是从文件中
https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques https://bryteflow.com/how-to-load-terabytes-of-data-to-snowflake-fast/https://www.snowflake.com/blog/ability-to-connect-to-snowflake-with-jdbc/ https://docs.snowflake.com/en/user-guide/jdbc-using.html https://www.persistent.com/blogs/json-processing-in-spark-snowflake-a-comparison/
【讨论】:
以上是关于使用 Python 将数据写入雪花的主要内容,如果未能解决你的问题,请参考以下文章
将数据从雪花卸载到 s3 时,如何将日期时间戳添加到 zip 文件?
有没有办法将 csv 数据加载到雪花表中并报告每条记录是不是已成功加载? (使用 Python)