pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?
Posted
技术标签:
【中文标题】pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?【英文标题】:pySpark Best alternative for using Spark SQL/DF withing a UDF? 【发布时间】:2019-11-30 18:52:51 【问题描述】:我被困在一个过程中,我需要对 Dataframe 中的每个列值执行一些操作,这需要再次遍历 DF。以下是数据样本:
Row(user_id='KxGeqg5ccByhaZfQRI4Nnw', gender='male', business_id='S75Lf-Q3bCCckQ3w7mSN2g', friends='my4q3Sy6Ei45V58N2l8VGw, R3zl9VKw63rPxSfBxbasWw, c-c64rURhBR8V8scSbwo7Q, tn6qogrDbb9hEKfRBGUUpw, pu_AQig2fw40PshvtgONPQ, IDrgtQccPN9c4rBn7yyk4Q, OIIx11vTeLN8EBcZrYXHKQ')
friends
在这里只是其他user_id
的列表。我想要做的是为这个特定用户的每个friends
获取一些值。现在,由于这是user_id
,我需要为此查询我的DF,这在UDF 中是不允许的。我既不能执行 spark.sql
也不能引用 Dataframe
并执行过滤器,因为两者都是 sparkSession
对象。
我可以在这里尝试什么不同的方法?
尝试创建一个 DF 然后过滤:
tempDF=sparkSession.sql("SELECT review_sentiment,user_id,business_id FROM events")
def getfriendsSentiment(friendsList, b_id):
listOfSentiments=[]
for friend_id in friendsList.split(','):
try:
listOfSentiments.append(tempDF.filter("user_id='"+friend_id+"' AND business_id='"+b_id+"'").rdd.flatMap(lambda x:x).collect()[0])
except:
pass
friendsSentiment = udf(getfriendsSentiment, StringType())
businessReviewUserDfWithFriends=businessReviewUserDfWithFriends.withColumn('friendsSentimentToBusiness', friendsSentiment('friends','business_id'))
错误:
py4j.Py4JException: Method __getstate__([]) does not exist
尝试创建一个表并查询它:
sparkSession.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")
def getfriendsSentiment(friendsList, b_id):
listOfSentiments=[]
for friend_id in friendsList.split(','):
try:
listOfSentiments.append(spark.sql("SELECT review_sentiment FROM events WHERE user_id='"+friend_id+"' AND business_id='"+b_id+"' GROUP BY review_sentiment ORDER BY COUNT(review_sentiment) DESC LIMIT 1").rdd.flatMap(lambda x:x).collect()[0])
except:
pass
错误:
PicklingError: Could not serialize object: Exception: It appears that you are attempting........
我能做些什么来解决这个问题?
【问题讨论】:
【参考方案1】:您不能在 UDFS 中使用 SparkSession/DataFrame 对象。
我认为在这里可行的解决方案是爆炸每一行
通过朋友然后加入(friend.id==user.id&&friend.business_id==user.business_id)
。
第二个解决方案是可能的(如果事件表适合您的记忆), 就是一开始就收集你的事件表,然后广播给所有的执行者。然后,您可以在 UDF 中使用您的数据。只有当事件是一张小桌子并且适合您的记忆时才能完成。
【讨论】:
以上是关于pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?的主要内容,如果未能解决你的问题,请参考以下文章
pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?
使用 udf 的 pyspark 出错:您必须使用 Hive 构建 Spark。导出 'SPARK_HIVE=true' 并运行 build/sbt 程序集