如何解决 Pyspark 中的分析异常错误
Posted
技术标签:
【中文标题】如何解决 Pyspark 中的分析异常错误【英文标题】:How to solve analysis exception error in Pyspark 【发布时间】:2019-03-15 08:31:08 【问题描述】:我在 Pyspark 中遇到错误:
AnalysisException: u'Resolved attribute(s) week#5230 missing from
longitude#4976,address#4982,minute#4986,azimuth#4977,province#4979,
action_type#4972,user_id#4969,week#2548,month#4989,postcode#4983,location#4981
in operator !Aggregate [user_id#4969, week#5230], [user_id#4969,
week#5230, count(distinct day#4987) AS days_per_week#3605L].
Attribute(s) with the same name appear in the operation: week.
Please check if the right attribute(s) are used
这似乎来自使用agg
函数的sn-p代码:
df_rs = df_n.groupBy('user_id', 'week')
.agg(countDistinct('day').alias('days_per_week'))
.where('days_per_week >= 1')
.groupBy('user_id')
.agg(count('week').alias('weeks_per_user'))
.where('weeks_per_user >= 5').cache()
但是我在这里没有看到问题。而且我之前已经多次在相同的数据上使用过这行代码。
编辑:我一直在查看代码,错误的类型似乎来自这种连接:
df = df1.join(df2, 'user_id', 'inner')
df3 = df4.join(df1, 'user_id', 'left_anti).
但还没有解决问题。
EDIT2:不幸的是,建议的问题与我的不相似,因为这不是列名歧义的问题,而是缺少属性的问题,在检查实际数据帧时似乎没有丢失。
【问题讨论】:
Spark Dataframe distinguish columns with duplicated name的可能重复 你能提供你的数据框架构吗? 【参考方案1】:我遇到了同样的问题,并通过在加入之前将 Resolved attributes missing columns 重命名为一些临时名称来解决它,这对我来说是一种解决方法,希望它也对你有所帮助。不知道这个问题背后的真正原因,自从 spark 1.6 SPARK-10925987654321@
【讨论】:
【参考方案2】:我也多次遇到这个问题,遇到了this 这里提到这是火花相关的错误。 根据这篇文章,我想出了下面的代码来解决我的问题。
代码可以处理 LEFT、RIGHT、INNER 和 OUTER 连接,但 OUTER 连接在这里作为 FULL OUTER 工作。
def join_spark_dfs_sqlbased(sparkSession,left_table_sdf,right_table_sdf,common_join_cols_list=[],join_type="LEFT"):
temp_join_afix="_tempjoincolrenames"
join_type=join_type.upper()
left=left_table_sdf.select(left_table_sdf.columns)
right=right_table_sdf.select(right_table_sdf.columns)
if len(common_join_cols_list)>0:
common_join_cols_list=[col+temp_join_afix for col in common_join_cols_list]
else:
common_join_cols_list = list(set(left.columns).intersection(right.columns))
common_join_cols_list=[col+temp_join_afix for col in common_join_cols_list]
for col in left.columns:
left = left.withColumnRenamed(col, col + temp_join_afix)
left.createOrReplaceTempView('left')
for col in right.columns:
right = right.withColumnRenamed(col, col + temp_join_afix)
right.createOrReplaceTempView('right')
non_common_cols_left_list=list(set(left.columns)-set(common_join_cols_list))
non_common_cols_right_list=list(set(right.columns)-set(common_join_cols_list))
unidentified_common_cols=list(set(non_common_cols_left_list)-set(non_common_cols_right_list))
if join_type in ['LEFT','INNER','OUTER']:
non_common_cols_right_list=list(set(non_common_cols_right_list)-set(unidentified_common_cols))
common_join_cols_list_with_table=['a.'+col +' as '+col for col in common_join_cols_list]
else:
non_common_cols_left_list=list(set(non_common_cols_left_list)-set(unidentified_common_cols))
common_join_cols_list_with_table=['b.'+col +' as '+col for col in common_join_cols_list]
non_common_cols_left_list_with_table=['a.'+col +' as '+col for col in non_common_cols_left_list]
non_common_cols_right_list_with_table=['b.'+col +' as '+col for col in non_common_cols_right_list]
non_common_cols_list_with_table=non_common_cols_left_list_with_table + non_common_cols_right_list_with_table
if join_type=="OUTER":
join_type="FULL OUTER"
join_type=join_type+" JOIN"
select_cols=common_join_cols_list_with_table+non_common_cols_list_with_table
common_join_cols_list_with_table_join_query=['a.'+col+ '='+'b.'+col for col in common_join_cols_list]
query= "SELECT "+ ",".join(select_cols) + " FROM " + "left" + " a " + join_type + " " + "right" + " b" +" ON "+ " AND ".join(common_join_cols_list_with_table_join_query)
print("query:",query)
joined_sdf= sparkSession.sql(query)
for col in joined_sdf.columns:
if temp_join_afix in col:
joined_sdf = joined_sdf.withColumnRenamed(col, col.replace(temp_join_afix,''))
return joined_sdf
【讨论】:
以上是关于如何解决 Pyspark 中的分析异常错误的主要内容,如果未能解决你的问题,请参考以下文章
如何解决空指针异常和 ThreadException 中的错误android
如何解决 java 中的错误 org.omg.CORBA.BAD_OPERATION 异常?
如何解决空指针异常和 ThreadException 中的错误android