python + pyspark:在pyspark中进行多列比较的内部连接错误
Posted
技术标签:
【中文标题】python + pyspark:在pyspark中进行多列比较的内部连接错误【英文标题】:python+pyspark: error on inner join with multiple column comparison in pyspark 【发布时间】:2016-09-22 06:35:07 【问题描述】:您好,我有 2 个数据框要加入
#df1
name genre count
satya drama 1
satya action 3
abc drame 2
abc comedy 2
def romance 1
#df2
name max_count
satya 3
abc 2
def 1
现在我想在名称和计数上加入 2 个以上的 dfs==max_count,但我遇到了一个错误
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
from pyspark.sql.functions import struct
df = spark.read.csv('file',sep = '###', header=True)
df1 = df.groupBy("name", "genre").count()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))
final_df.show() ###Error
#py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
#Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: count(1)
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
但“左”加入成功
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left")
final_df.show() ###Success but i don't want left join , i want inner join
我的问题是为什么上面的失败了,我在那里做错了吗???
我将此链接称为“Find maximum row per group in Spark DataFrame”。使用了第一个答案(2 groupby 方法)。但同样的错误。
我正在使用 spark-2.0.0-bin-hadoop2.7 和 python 2.7。
请建议。谢谢。
编辑:
上述场景适用于spark 1.6(令人惊讶的是spark 2.0有什么问题(或者我的安装,我将在这里重新安装、检查和更新)。
有没有人在 spark 2.0 上尝试过这个并获得了成功,按照下面 Yaron 的回答???
【问题讨论】:
只是猜测.....列名会与数据框方法冲突吗?例如。count
。不知道为什么这只会影响内部连接。您可以尝试将 count
重命名为 cnt
或其他内容以排除这种可能性。
@RedBaron-Alredy 试过了。同样的错误。
【参考方案1】:
当我尝试加入两个 DataFrame,其中一个是 GroupedData 时,我遇到了同样的问题。当我在内部连接之前缓存 GroupedData DataFrame 时,它对我有用。对于您的代码,请尝试:
df1 = df.groupBy("name", "genre").count().cache() # added cache()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count")).cache() # added cache()
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count)) # no change
【讨论】:
@Johann-是的,它有效,但无法理解为什么!!!。您能否解释一下,为什么它有效以及为什么 Not-cached 版本无效。 @Satya 我的理解是,考虑到 Spark 的惰性求值机制,如果我们在加入 df1 和 df2 之前不缓存它们,Spark 会在发出加入命令。鉴于错误代码“无法评估表达式:count(1)”,Spark 似乎陷入了多次查找 count 值的循环中。【参考方案2】:更新:您的代码似乎也由于使用“count”作为列名而失败。 count 似乎是 DataFrame API 中的受保护关键字。 将 count 重命名为“mycount”解决了这个问题。以下工作代码经过修改以支持我用来测试您的问题的 spark 版本 1.5.2。
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/fac_cal.csv")
df1 = df.groupBy("name", "genre").count()
df1 = df1.select(col("name"),col("genre"),col("count").alias("mycount"))
df2 = df1.groupby('name').agg(F.max("mycount").alias("max_count"))
df2 = df2.select(col('name').alias('name2'),col("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2,[df1.name == df2.name2 , df1.mycount == df2.max_count])
final_df.show()
+-----+---------+-------+-----+---------+
| name| genre|mycount|name2|max_count|
+-----+---------+-------+-----+---------+
|brata| comedy| 2|brata| 2|
|brata| drama| 2|brata| 2|
|panda|adventure| 1|panda| 1|
|panda| romance| 1|panda| 1|
|satya| action| 3|satya| 3|
+-----+---------+-------+-----+---------+
https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html中的复杂条件示例
cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
你可以试试:
final_df = df1.join(df2, [df1.name == df2.name , df1.mycount == df2.max_count])
另请注意,根据规范,“left”不是有效连接类型的一部分: 如何 - str,默认为“内部”。内部、外部、left_outer、right_outer、leftsemi 之一。
【讨论】:
嗨 Yaron-仍然无法正常工作。 'left' 在我的系统上运行良好。我尝试将两个 dfs 中的列重命名为 (df1.columns==name,genre,cnt),(df2.columns==name,cnt) 并尝试 f = df1.join(df2,['name','cnt' ]) #得到错误和 f = df1.join(df2,['name','cnt'],'left') ##success... 在您的第一个示例中,您是否尝试过内部连接?如果它适用于您的系统,那么我的 spark 版本有些可疑。 @Satya 您能否与我们分享重现您看到的问题的输入文件? (例如,您在 spark.read.csv('file'... ) 中读取的“文件”) @Yaron-对不起,我做不到(该文件实际上是一个 >10Gb 的大文件)。但是我通过使用示例输入手动创建数据框尝试了同样的事情。 data = spark.createDataFrame([('satya', 'action'), ('satya', 'action'), ('satya', 'drama'), ('satya', 'action'), ('brata ', 'comedy'), ('brata', 'comedy'), ('panda', 'romance'), ('panda', 'adventure'), ('brata', 'drama'), ('brata ', '戏剧')], ['名称', '类型']) 在尝试使用多个相等条件进行“INNER”连接时,我在任何数据帧/s(fromfile(csv, json..etc) 或手动 i/ps)都面临同样的问题。但不是在执行“任何外部连接”时。【参考方案3】:我在 spark 2.0 中的解决方法
我从各个 dfs 中的连接比较('name','mycount')中的列创建了一个列('combined'),所以现在我有一个列要比较,这在我比较时不会产生任何问题只有一列。
def combine_func(*args):
data = '_'.join([str(x) for x in args]) ###converting nonstring to str tehn concatenation
return data
combine_func = udf(combine_func, StringType()) ##register the func as udf
df1 = df1.withColumn('combined_new_1', combine_new(df1['name'],df1['mycount'])) ###a col having concatenated value from name and mycount columns eg: 'satya_3'
df2 = df2.withColumn('combined_new_2', combine_new(df2['name2'],df2['max_count']))
#df1.columns == 'name','genre', 'mycount', 'combined_new_1'
#df2.columns == 'name2', 'max_count', 'combined_new_2'
#Now join
final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner')
#final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner').select('the columns you want')
final_df.show() ####It is showing the result, Trust me.
除非您赶时间,否则请不要关注,最好寻找可靠的解决方案。
【讨论】:
以上是关于python + pyspark:在pyspark中进行多列比较的内部连接错误的主要内容,如果未能解决你的问题,请参考以下文章
python, pyspark : 获取 pyspark 数据框列值的总和