How to create a PySpark pandas-on-Spark DataFrame from a Snowflake SQL query?
Posted: 2021-10-25 17:03:24

Note: distributed processing is required, which is why I am using the Pandas API on Spark.
To create the pandas-on-Spark DataFrame, I have tried two different approaches (outlined below as "OPTION 1" and "OPTION 2").
Are either of these options feasible? If so, how do I resolve the errors (described under "ISSUE(S)" in the code and in the error log for "OPTION 2" below)?
Alternatively, should I run the query through PySpark SQL Pandas UDFs first and then convert the result to a pandas-on-Spark DataFrame?
# (Spark 3.2.0, Scala 2.12, DBR 10.0)
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## I. Import libraries & dependencies
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
import numpy as np
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import Column
from pyspark.sql.functions import *
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## II. Load data + create Spark DataFrames
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
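## NOTE: `options` is assumed to be a dict of Snowflake connector settings defined
## elsewhere (e.g. sfUrl, sfUser, sfPassword, sfDatabase, sfSchema, sfWarehouse).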
df_1 = spark.read.format("snowflake").options(**options).option("query","SELECT PROPERTY_ID,AVGRENT_MARKET FROM schema_1").load()
df_2 = spark.read.format("snowflake").options(**options).option("query","SELECT PROPERTY_ID,PROPERTY_ZIPCODE FROM schema_2").load()
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## III. OPTION 1: Union Spark DataFrames
## ISSUE(S): Results in 'None' values in PROPERTY_ZIPCODE column
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## Create merged dataframe from two Spark Dataframes
# df_3 = df_1.unionByName(df_2, allowMissingColumns=True)
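## Why OPTION 1 produces nulls: unionByName stacks the two DataFrames row-wise
## instead of matching rows by key, so rows coming from df_1 carry no
## PROPERTY_ZIPCODE and allowMissingColumns=True fills that column with null
## (likewise AVGRENT_MARKET is null for rows coming from df_2). Combining the
## two column sets per PROPERTY_ID requires a join, not a union.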
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## III. OPTION 2: Create Spark SQL DataFrame from SQL tables
## ISSUE(S): "AnalysisException: Reference 'PROPERTY_ID' is ambiguous, could be: table_1.PROPERTY_ID, table_2.PROPERTY_ID."
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## Create tables from two Spark DataFrames
df_1.createOrReplaceTempView("Table_1")
df_2.createOrReplaceTempView("Table_2")
## Spark SQL query to merge the two temp views
merge_tables = '''
SELECT Table_1.PROPERTY_ID,
Table_1.AVGRENT_MARKET,
Table_2.PROPERTY_ID,
Table_2.PROPERTY_ZIPCODE
FROM Table_2 INNER JOIN Table_1
ON Table_2.PROPERTY_ID=Table_1.PROPERTY_ID
LIMIT 25
'''
## Create merged Spark SQL dataframe based on query
df_3 = spark.sql(merge_tables)
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
## Create a pandas-on-Spark DataFrame
##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
df_3 = ps.DataFrame(df_3)
# df_3 = df_3.to_pandas_on_spark() # Alternative conversion option
Error log for "OPTION 2":
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<command-2142959205032388> in <module>
52 ##~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~##
53 # df_3 = ps.DataFrame(df_3)
---> 54 df_3 = df_3.to_pandas_on_spark() # Alternative conversion option
/databricks/spark/python/pyspark/sql/dataframe.py in to_pandas_on_spark(self, index_col)
2777
2778 index_spark_columns, index_names = _get_index_map(self, index_col)
-> 2779 internal = InternalFrame(
2780 spark_frame=self, index_spark_columns=index_spark_columns, index_names=index_names
2781 )
/databricks/spark/python/pyspark/pandas/internal.py in __init__(self, spark_frame, index_spark_columns, index_names, index_fields, column_labels, data_spark_columns, data_fields, column_label_names)
633
634 # Create default index.
--> 635 spark_frame = InternalFrame.attach_default_index(spark_frame)
636 index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)]
637
/databricks/spark/python/pyspark/pandas/internal.py in attach_default_index(sdf, default_index_type)
865
866 if default_index_type == "sequence":
--> 867 return InternalFrame.attach_sequence_column(sdf, column_name=index_column)
868 elif default_index_type == "distributed-sequence":
869 return InternalFrame.attach_distributed_sequence_column(sdf, column_name=index_column)
/databricks/spark/python/pyspark/pandas/internal.py in attach_sequence_column(sdf, column_name)
878 @staticmethod
879 def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
--> 880 scols = [scol_for(sdf, column) for column in sdf.columns]
881 sequential_index = (
882 F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
/databricks/spark/python/pyspark/pandas/internal.py in <listcomp>(.0)
878 @staticmethod
879 def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
--> 880 scols = [scol_for(sdf, column) for column in sdf.columns]
881 sequential_index = (
882 F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
/databricks/spark/python/pyspark/pandas/utils.py in scol_for(sdf, column_name)
590 def scol_for(sdf: SparkDataFrame, column_name: str) -> Column:
591 """Return Spark Column for the given column name."""
--> 592 return sdf["`{}`".format(column_name)]
593
594
/databricks/spark/python/pyspark/sql/dataframe.py in __getitem__(self, item)
1657 """
1658 if isinstance(item, str):
-> 1659 jc = self._jdf.apply(item)
1660 return Column(jc)
1661 elif isinstance(item, Column):
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121 # Hide where the exception came from that shows a non-Pythonic
122 # JVM exception message.
--> 123 raise converted from None
124 else:
125 raise
AnalysisException: Reference 'PROPERTY_ID' is ambiguous, could be: table_1.PROPERTY_ID, table_2.PROPERTY_ID.
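The traceback shows the failure happens while pandas-on-Spark attaches its default index: attach_sequence_column looks up every column of the Spark DataFrame by name, and the query above returns two columns that are both named PROPERTY_ID (one from each table), so the lookup is ambiguous. A minimal sketch of a query that avoids the duplicate by selecting the join key only once (everything else kept as in the question):

merge_tables = '''
SELECT Table_1.PROPERTY_ID,
       Table_1.AVGRENT_MARKET,
       Table_2.PROPERTY_ZIPCODE
FROM Table_2 INNER JOIN Table_1
ON Table_2.PROPERTY_ID = Table_1.PROPERTY_ID
LIMIT 25
'''
df_3 = spark.sql(merge_tables).to_pandas_on_spark()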
Answer 1: If all you want is a join, use the Spark join function instead. It is much cleaner and more maintainable.
df_1 = spark.read...load()
df_2 = spark.read...load()
df_3 = df_1.join(df_2, on=['PROPERTY_ID'], how='inner')
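Since the goal in the question is a pandas-on-Spark DataFrame, the joined result can then be converted; a minimal sketch on Spark 3.2 (either conversion path from the question works here, because the join leaves only one PROPERTY_ID column):

import pyspark.pandas as ps

psdf_3 = df_3.to_pandas_on_spark()
# or, equivalently:
# psdf_3 = ps.DataFrame(df_3)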
Comments:
Thanks! This works great and is nicely minimal.
You're welcome; if you found it helpful, please upvote and accept my answer :)
I'll vote as soon as I have at least 15 reputation!