使用 Python 将数据写入雪花

Posted

技术标签:

【中文标题】使用 Python 将数据写入雪花【英文标题】:Writing data into snowflake using Python 【发布时间】:2020-08-27 12:50:19 【问题描述】:

我们可以直接将数据写入雪花表而不使用Python使用雪花内部阶段吗????

先在stage中写入,然后转换,然后加载到表中,这似乎是辅助任务。是否可以像RDBMS中的JDBC连接一样一步完成。

【问题讨论】:

【参考方案1】:

将数据加载到 Snowflake 中绝对最快的方法是从内部或外部阶段的文件中加载。时期。所有连接器都能够使用标准插入命令插入数据,但这不会很好。也就是说,许多 Snowflake 驱动程序现在透明地使用 PUT/COPY 命令通过内部阶段将大数据加载到 Snowflake。如果这是您所追求的,那么您可以利用 pandas write_pandas 命令在单个命令中将数据从 pandas 数据帧加载到 Snowflake。在幕后,它将为您执行 PUT 和 COPY INTO。

https://docs.snowflake.com/en/user-guide/python-connector-api.html#label-python-connector-api-write-pandas

我强烈推荐这种模式,而不是任何驱动程序中的 INSERT 命令。而且我还建议在加载到雪花之后进行转换,而不是之前。

【讨论】:

另外,查看write_pandasparallelchunk_size 参数。调整这些可能会对总加载时间产生巨大影响。【参考方案2】:

如果有人在处理大型数据集时遇到问题。尝试使用 dask 并生成划分为块的数据帧。然后您可以将 dask.delayed 与 sqlalchemy 一起使用。在这里,我们使用雪花的本机连接器方法,即 pd_writer,它在后台使用 write_pandas 并最终使用 PUT COPY 和压缩的 parquet 文件。最后,老实说,它归结为您的 I/O 带宽。您拥有的吞吐量越多,它在雪花表中加载的速度就越快。但是这个 sn-p 总体上提供了相当多的并行度。

import functools
from dask.diagnostics import ProgressBar
from snowflake.connector.pandas_tools import pd_writer
import dask.dataframe as dd
df = dd.read_csv(csv_file_path, blocksize='64MB')
ddf_delayed = df.to_sql(
        table_name.lower(),
        uri=str(engine.url),
        schema=schema_name,
        if_exists=if_exists,
        index=False,
        method=functools.partial(
        pd_writer,quote_identifiers=False),
        compute=False,
        parallel=True
    )
with ProgressBar():
    dask.compute(ddf_delayed, scheduler='threads', retries=3)

【讨论】:

感谢您的回复 ....但它再次使用舞台仅加载数据 是的,但这就是雪花推荐的方法,因为这是将数据放入雪花表的最快方法。但是当然可以像 pandas 一样使用 sqlalchemy 和 dask to_sql 方法。【参考方案3】:

Java:

加载驱动类:

Class.forName("net.snowflake.client.jdbc.SnowflakeDriver")

Maven:

添加以下代码块作为依赖项

<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-jdbc</artifactId>
  <version>version</version>

春天:

application.yml:

    spring:
      datasource
        hikari:
          maximumPoolSize: 4 # Specify maximum pool size
          minimumIdle: 1 # Specify minimum pool size
          driver-class-name: com.snowflake.client.jdbc.SnowflakeDriver

Python:

    import pyodbc

    # pyodbc connection string
    conn = pyodbc.connect("Driver=SnowflakeDSIIDriver; Server=XXX.us-east-2.snowflakecomputing.com; Database=VAQUARKHAN_DB; schema=public; UID=username; PWD=password")

    #  Cursor
    cus=conn.cursor()

    # Execute SQL statement to get current datetime and store result in cursor
    cus.execute("select current_date;")

    # Display the content of cursor
    row = cus.fetchone()

    print(row)
How to insert json response data in snowflake database more efficiently?

Apache Spark:

    <dependency>
         <groupId>net.snowflake</groupId>
         <artifactId>spark-snowflake_2.11</artifactId>
         <version>2.5.9-spark_2.4</version>
    </dependency>

代码

   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.DataFrame


        / Use secrets DBUtil to get Snowflake credentials.
        val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
        val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")

        val options = Map(
          "sfUrl" -> "<snowflake-url>",
          "sfUser" -> user,
          "sfPassword" -> password,
          "sfDatabase" -> "<snowflake-database>",
          "sfSchema" -> "<snowflake-schema>",
          "sfWarehouse" -> "<snowflake-cluster>"
        )


        // Generate a simple dataset containing five values and write the dataset to Snowflake.
        spark.range(5).write
          .format("snowflake")
          .options(options)
          .option("dbtable", "<snowflake-database>")
          .save()


        // Read the data written by the previous cell back.
        val df: DataFrame = spark.read
          .format("snowflake")
          .options(options)
          .option("dbtable", "<snowflake-database>")
          .load()

        display(df)

将数据加载到 Snowflake 中的最快方法是从文件中

https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques https://bryteflow.com/how-to-load-terabytes-of-data-to-snowflake-fast/
https://www.snowflake.com/blog/ability-to-connect-to-snowflake-with-jdbc/ https://docs.snowflake.com/en/user-guide/jdbc-using.html https://www.persistent.com/blogs/json-processing-in-spark-snowflake-a-comparison/

【讨论】:

以上是关于使用 Python 将数据写入雪花的主要内容,如果未能解决你的问题,请参考以下文章

什么应用程序可以读取和写入数据到雪花?

将数据从雪花卸载到 s3 时,如何将日期时间戳添加到 zip 文件?

有没有办法将 csv 数据加载到雪花表中并报告每条记录是不是已成功加载? (使用 Python)

使用 Python/Scala 的 Databricks 雪花表

连接到雪花连接器时在笔记本中运行报告的最佳方法是什么?

Kafka JDBC Sink Connector 在雪花中找不到表