How to create a PySpark pandas-on-Spark DataFrame from a Snowflake SQL query?

Posted: 2021-10-25 17:03:24

Note: Distributed processing is required, which is why I am using the Pandas API on Spark.

To create a pandas-on-Spark DataFrame, I have tried two different approaches (outlined below as "OPTION 1" and "OPTION 2").

Are these options viable? If so, how do I resolve the errors described (see "ISSUE(S)" in the code below and the error log for "OPTION 2")?

Or should I instead start the query with PySpark SQL / Pandas UDFs and then convert the result to a pandas-on-Spark DataFrame?

# (Spark 3.2.0, Scala 2.12, DBR 10.0)

##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## I. Import libraries & dependencies
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
import numpy as np
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import Column
from pyspark.sql.functions import *

##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## II. Load data + create Spark DataFrames
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
df_1 = spark.read.format("snowflake").options(**options).option("query","SELECT PROPERTY_ID,AVGRENT_MARKET FROM schema_1").load()
df_2 = spark.read.format("snowflake").options(**options).option("query","SELECT PROPERTY_ID,PROPERTY_ZIPCODE FROM schema_2").load()

##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## III. OPTION 1: Union Spark DataFrames
##      ISSUE(S): Results in 'None' values in PROPERTY_ZIPCODE column
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##

## Create merged dataframe from two Spark Dataframes
# df_3 = df_1.unionByName(df_2, allowMissingColumns=True)

##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## III. OPTION 2: Create Spark SQL DataFrame from SQL tables 
##      ISSUE(S): "AnalysisException: Reference 'PROPERTY_ID' is ambiguous, could be: table_1.PROPERTY_ID, table_2.PROPERTY_ID."
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##

## Create tables from two Spark DataFrames
df_1.createOrReplaceTempView("Table_1")
df_2.createOrReplaceTempView("Table_2")

## Specify SQL Snowflake query to merge tables
merge_tables = '''
  SELECT Table_1.PROPERTY_ID,
  Table_1.AVGRENT_MARKET,
  Table_2.PROPERTY_ID,
  Table_2.PROPERTY_ZIPCODE
  FROM Table_2 INNER JOIN Table_1 
  ON Table_2.PROPERTY_ID=Table_1.PROPERTY_ID 
  LIMIT 25
'''

## Create merged Spark SQL dataframe based on query
df_3 = spark.sql(merge_tables)

##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## Create a pandas-on-Spark DataFrame
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
df_3 = ps.DataFrame(df_3)
# df_3 = df_3.to_pandas_on_spark() # Alternative conversion option

Error log for "OPTION 2":

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-2142959205032388> in <module>
     52 ##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
     53 # df_3 = ps.DataFrame(df_3)
---> 54 df_3 = df_3.to_pandas_on_spark() # Alternative conversion option

/databricks/spark/python/pyspark/sql/dataframe.py in to_pandas_on_spark(self, index_col)
   2777 
   2778         index_spark_columns, index_names = _get_index_map(self, index_col)
-> 2779         internal = InternalFrame(
   2780             spark_frame=self, index_spark_columns=index_spark_columns, index_names=index_names
   2781         )

/databricks/spark/python/pyspark/pandas/internal.py in __init__(self, spark_frame, index_spark_columns, index_names, index_fields, column_labels, data_spark_columns, data_fields, column_label_names)
    633 
    634             # Create default index.
--> 635             spark_frame = InternalFrame.attach_default_index(spark_frame)
    636             index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)]
    637 

/databricks/spark/python/pyspark/pandas/internal.py in attach_default_index(sdf, default_index_type)
    865 
    866         if default_index_type == "sequence":
--> 867             return InternalFrame.attach_sequence_column(sdf, column_name=index_column)
    868         elif default_index_type == "distributed-sequence":
    869             return InternalFrame.attach_distributed_sequence_column(sdf, column_name=index_column)

/databricks/spark/python/pyspark/pandas/internal.py in attach_sequence_column(sdf, column_name)
    878     @staticmethod
    879     def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
--> 880         scols = [scol_for(sdf, column) for column in sdf.columns]
    881         sequential_index = (
    882             F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1

/databricks/spark/python/pyspark/pandas/internal.py in <listcomp>(.0)
    878     @staticmethod
    879     def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
--> 880         scols = [scol_for(sdf, column) for column in sdf.columns]
    881         sequential_index = (
    882             F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1

/databricks/spark/python/pyspark/pandas/utils.py in scol_for(sdf, column_name)
    590 def scol_for(sdf: SparkDataFrame, column_name: str) -> Column:
    591     """Return Spark Column for the given column name."""
--> 592     return sdf["`{}`".format(column_name)]
    593 
    594 

/databricks/spark/python/pyspark/sql/dataframe.py in __getitem__(self, item)
   1657         """
   1658         if isinstance(item, str):
-> 1659             jc = self._jdf.apply(item)
   1660             return Column(jc)
   1661         elif isinstance(item, Column):

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Reference 'PROPERTY_ID' is ambiguous, could be: table_1.PROPERTY_ID, table_2.PROPERTY_ID.

Question comments:

Answer 1:

If all you want is a join, use the Spark join function instead. It is cleaner and more maintainable.

df_1 = spark.read...load()
df_2 = spark.read...load()

df_3 = df_1.join(df_2, on=['PROPERTY_ID'], how='inner')
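
If the end goal is a pandas-on-Spark DataFrame, the joined result can then be converted directly. A minimal sketch, assuming the same Snowflake options and queries as in the question (psdf is just an illustrative variable name):

import pyspark.pandas as ps

## Join on the shared key so PROPERTY_ID appears only once in the result
df_3 = df_1.join(df_2, on=['PROPERTY_ID'], how='inner')

## Convert the joined Spark DataFrame to pandas-on-Spark
psdf = df_3.to_pandas_on_spark()  # or equivalently: psdf = ps.DataFrame(df_3)

This also points at the cause of the OPTION 2 error: the SQL selects PROPERTY_ID from both Table_1 and Table_2, so the result has two columns with the same name. Selecting the key from only one table (or aliasing one of them) should avoid the "Reference 'PROPERTY_ID' is ambiguous" exception when converting.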

Comments:

Thanks! This works great and is nice and minimal.
You're welcome. If you found it helpful, please upvote and accept my answer :)
Will do as soon as I have at least 15 reputation to upvote!
