Conditional explode in pyspark

Posted 2020-09-24 18:55:31

I have data like below:
+----------+-----------------------------------+---------------------------------------------------------------------+
|athl_id |Interest |branch |
+----------+-----------------------------------+---------------------------------------------------------------------+
|123 |Running |Running,Outdoor |
|856 |Running |Running |
|902 |Training,Fitness |Fitness,Training |
|9567 |Swimming,Training,Fitness |Swimming,Training,Fitness |
|477 |All |Running,All,Training,Soccer,Swimming,Fitness,Outdoor,Indoor |
|490 |Running,Indoor |Running,Indoor |
+----------+-----------------------------------+---------------------------------------------------------------------+
Now I want to explode the two fields, Interest and branch, with the following conditions:

- For each athl_id, fully explode the Interest field.
- If any comma-separated branch value equals any comma-separated Interest value, drop that value from branch entirely and explode the rest.
- If no comma-separated branch value equals any comma-separated Interest value, explode the branch field as-is.
Ex - In the table above, athl_id 902 is interested in Training,Fitness, and since the branch values are the same (Fitness,Training), the expected result is two rows where Interest is exploded and branch is empty. Similarly, athl_id 477 is interested in All, and branch is "Running,All,Training,Soccer,Swimming,Fitness,Outdoor,Indoor"; since "All" is part of Interest, the exploded branch field does not contain "All", and the remaining values are "Running,Training,Soccer,Swimming,Fitness,Outdoor,Indoor".
Expected result:
+----------+-----------------------------------+---------------------------------------------------------------------+
|athl_id |Interest |branch |
+----------+-----------------------------------+---------------------------------------------------------------------+
|123 |Running |Outdoor |
|856 |Running | |
|902 |Training | |
|902 |Fitness | |
|9567 |Swimming | |
|9567 |Training | |
|9567 |Fitness | |
|477 |All |Running |
|477 |All |Training |
|477 |All |Soccer |
|477 |All |Swimming |
|477 |All |Fitness |
|477 |All |Outdoor |
|477 |All |Indoor |
|490 |Running | |
|490 |Indoor | |
+----------+-----------------------------------+---------------------------------------------------------------------+
Now, I tried the approach below but ran into an error. Also, I don't think array_contains is matching the exact value..
spark.sql("""
select athl_id, Interest,
case when array_contains(split(branch,','),Interest) then null
else explode(split(branch,',')) end as branch
from (
select athl_id, explode(split(Interest,',')) as Interest ,branch from athl_details)a
""").show(100,False )
Traceback (most recent call last):
File "<stdin>", line 7, in <module>
File "/usr/lib/spark/python/pyspark/sql/session.py", line 767, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Generators are not supported when it's nested in expressions
Could someone suggest the right approach?

Thanks!!
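A note on the error: Spark only allows generator functions such as explode at the top level of a SELECT list, not nested inside expressions like CASE WHEN, which is what the AnalysisException is reporting. A minimal sketch of one way to lift the generator out, by moving the CASE into its argument (same athl_details table as above; this only fixes the nesting error, while the element-wise filtering is addressed in the answer below):

spark.sql("""
select athl_id, Interest,
       explode_outer(
         case when array_contains(split(branch,','), Interest) then null
              else split(branch,',') end) as branch
from (
  select athl_id, explode(split(Interest,',')) as Interest, branch
  from athl_details) a
""").show(100, False)

explode_outer is used here instead of explode so that rows whose CASE result is null still appear, with a null branch.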
【参考方案1】:使用来自 Spark 版本 >= 2.4 的 array_except
函数。
在split
ting 之后获取两列的元素差异,并在该列上使用explode_outer
。
from pyspark.sql.functions import col, explode_outer, array_except, split

# Split the comma-separated strings into arrays
split_col_df = df.withColumn('interest_array', split(col('Interest'), ',')) \
                 .withColumn('branch_array', split(col('branch'), ','))

# Get the elements in branch that are not in interest
tmp_df = split_col_df.withColumn('elem_diff', array_except(col('branch_array'), col('interest_array')))

# Explode both arrays; explode_outer keeps a row (with null) when the array is empty
res = tmp_df.withColumn('interest_expl', explode_outer(col('interest_array'))) \
            .withColumn('branch_expl', explode_outer(col('elem_diff')))

res.select('athl_id', 'interest_expl', 'branch_expl').show()
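For reference, a minimal sketch of the same element-difference idea in Spark SQL, assuming the DataFrame is registered as a temporary view named athl_details (Spark >= 2.4):

df.createOrReplaceTempView('athl_details')

spark.sql("""
select athl_id, Interest,
       explode_outer(array_except(split(branch,','), split(Interest_full,','))) as branch
from (
  select athl_id, Interest as Interest_full,
         explode(split(Interest,',')) as Interest, branch
  from athl_details) a
""").show(100, False)

array_except removes every branch value that also appears anywhere in the full Interest list, so athl_id 477 keeps all branch values except 'All', and rows like 856 come out with a null branch.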
If there can be duplicate values in the branch column and you only want to subtract the same number of common values, you may need to write a UDF to solve the problem (see the sketch below).
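A hypothetical sketch of such a UDF, treating the arrays as multisets so each common value is removed only as many times as it appears in the interest list (multiset_except is an assumed name, not a built-in):

from collections import Counter
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType

@udf(returnType=ArrayType(StringType()))
def multiset_except(branch_arr, interest_arr):
    # Count how many of each interest value may be cancelled out
    remaining = Counter(interest_arr or [])
    out = []
    for v in (branch_arr or []):
        if remaining[v] > 0:
            remaining[v] -= 1   # cancel one occurrence of a common value
        else:
            out.append(v)       # keep duplicates beyond the common count
    return out

tmp_df = split_col_df.withColumn('elem_diff',
                                 multiset_except(col('branch_array'), col('interest_array')))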