python + pyspark:在pyspark中进行多列比较的内部连接错误

Posted

技术标签:

【中文标题】python + pyspark:在pyspark中进行多列比较的内部连接错误【英文标题】:python+pyspark: error on inner join with multiple column comparison in pyspark 【发布时间】:2016-09-22 06:35:07 【问题描述】:

您好,我有 2 个数据框要加入

#df1
 name    genre  count
 satya   drama    1
 satya   action   3
 abc     drame    2
 abc     comedy   2
 def     romance  1

#df2
 name  max_count
 satya  3
 abc    2
 def    1

现在我想在名称和计数上加入 2 个以上的 dfs==max_count,但我遇到了一个错误

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
from pyspark.sql.functions import struct
df = spark.read.csv('file',sep = '###', header=True)
df1 = df.groupBy("name", "genre").count()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))
final_df.show() ###Error
#py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
#Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: count(1)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)

但“左”加入成功

final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left")
final_df.show()  ###Success but i don't want left join , i want inner join

我的问题是为什么上面的失败了,我在那里做错了吗???

我将此链接称为“Find maximum row per group in Spark DataFrame”。使用了第一个答案(2 groupby 方法)。但同样的错误。

我正在使用 spark-2.0.0-bin-hadoop2.7 和 python 2.7。

请建议。谢谢。

编辑:

上述场景适用于spark 1.6(令人惊讶的是spark 2.0有什么问题(或者我的安装,我将在这里重新安装、检查和更新)。

有没有人在 spark 2.0 上尝试过这个并获得了成功,按照下面 Yaron 的回答???

【问题讨论】:

只是猜测.....列名会与数据框方法冲突吗?例如。 count。不知道为什么这只会影响内部连接。您可以尝试将 count 重命名为 cnt 或其他内容以排除这种可能性。 @RedBaron-Alredy 试过了。同样的错误。 【参考方案1】:

当我尝试加入两个 DataFrame,其中一个是 GroupedData 时,我遇到了同样的问题。当我在内部连接之前缓存 GroupedData DataFrame 时,它​​对我有用。对于您的代码,请尝试:

df1 = df.groupBy("name", "genre").count().cache()    # added cache()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count")).cache()   # added cache()
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))    # no change

【讨论】:

@Johann-是的,它有效,但无法理解为什么!!!。您能否解释一下,为什么它有效以及为什么 Not-cached 版本无效。 @Satya 我的理解是,考虑到 Spark 的惰性求值机制,如果我们在加入 df1 和 df2 之前不缓存它们,Spark 会在发出加入命令。鉴于错误代码“无法评估表达式:count(1)”,Spark 似乎陷入了多次查找 count 值的循环中。【参考方案2】:

更新:您的代码似乎也由于使用“count”作为列名而失败。 count 似乎是 DataFrame API 中的受保护关键字。 将 count 重命名为“mycount”解决了这个问题。以下工作代码经过修改以支持我用来测试您的问题的 spark 版本 1.5.2。

df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/fac_cal.csv")
df1 = df.groupBy("name", "genre").count()
df1 = df1.select(col("name"),col("genre"),col("count").alias("mycount"))
df2 = df1.groupby('name').agg(F.max("mycount").alias("max_count"))
df2 = df2.select(col('name').alias('name2'),col("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2,[df1.name == df2.name2 , df1.mycount == df2.max_count])
final_df.show()

+-----+---------+-------+-----+---------+
| name|    genre|mycount|name2|max_count|
+-----+---------+-------+-----+---------+
|brata|   comedy|      2|brata|        2|
|brata|    drama|      2|brata|        2|
|panda|adventure|      1|panda|        1|
|panda|  romance|      1|panda|        1|
|satya|   action|      3|satya|        3|
+-----+---------+-------+-----+---------+

https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html中的复杂条件示例

cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

你可以试试:

final_df = df1.join(df2, [df1.name == df2.name , df1.mycount == df2.max_count])

另请注意,根据规范,“left”不是有效连接类型的一部分: 如何 - str,默认为“内部”。内部、外部、left_outer、right_outer、leftsemi 之一。

【讨论】:

嗨 Yaron-仍然无法正常工作。 'left' 在我的系统上运行良好。我尝试将两个 dfs 中的列重命名为 (df1.columns==name,genre,cnt),(df2.columns==name,cnt) 并尝试 f = df1.join(df2,['name','cnt' ]) #得到错误和 f = df1.join(df2,['name','cnt'],'left') ##success... 在您的第一个示例中,您是否尝试过内部连接?如果它适用于您的系统,那么我的 spark 版本有些可疑。 @Satya 您能否与我们分享重现您看到的问题的输入文件? (例如,您在 spark.read.csv('file'... ) 中读取的“文件”) @Yaron-对不起,我做不到(该文件实际上是一个 >10Gb 的大文件)。但是我通过使用示例输入手动创建数据框尝试了同样的事情。 data = spark.createDataFrame([('satya', 'action'), ('satya', 'action'), ('satya', 'drama'), ('satya', 'action'), ('brata ', 'comedy'), ('brata', 'comedy'), ('panda', 'romance'), ('panda', 'adventure'), ('brata', 'drama'), ('brata ', '戏剧')], ['名称', '类型']) 在尝试使用多个相等条件进行“INNER”连接时,我在任何数据帧/s(fromfile(csv, json..etc) 或手动 i/ps)都面临同样的问题。但不是在执行“任何外部连接”时。【参考方案3】:

我在 spark 2.0 中的解决方法

我从各个 dfs 中的连接比较('name','mycount')中的列创建了一个列('combined'),所以现在我有一个列要比较,这在我比较时不会产生任何问题只有一列。

def combine_func(*args):
  data = '_'.join([str(x) for x in args]) ###converting nonstring to str tehn concatenation
  return data
combine_func = udf(combine_func, StringType())  ##register the func as udf
df1 = df1.withColumn('combined_new_1', combine_new(df1['name'],df1['mycount']))  ###a col having concatenated value from name and mycount columns eg: 'satya_3'
df2 = df2.withColumn('combined_new_2', combine_new(df2['name2'],df2['max_count']))
#df1.columns == 'name','genre', 'mycount', 'combined_new_1'
#df2.columns == 'name2', 'max_count', 'combined_new_2'
#Now join 
final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner')
#final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner').select('the columns you want')
final_df.show()  ####It is showing the result, Trust me.

除非您赶时间,否则请不要关注,最好寻找可靠的解决方案。

【讨论】:

以上是关于python + pyspark:在pyspark中进行多列比较的内部连接错误的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pyspark 中运行 Python 脚本

python, pyspark : 获取 pyspark 数据框列值的总和

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

在PySpark / Python RDD中过滤

ipython怎么安装pyspark

在 Python/PySpark 中 Spark 复制数据框列的最佳实践?