pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?

Posted

技术标签:

【中文标题】pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?【英文标题】:pySpark Best alternative for using Spark SQL/DF withing a UDF? 【发布时间】:2019-11-30 18:52:51 【问题描述】:

我被困在一个过程中,我需要对 Dataframe 中的每个列值执行一些操作,这需要再次遍历 DF。以下是数据样本:

Row(user_id='KxGeqg5ccByhaZfQRI4Nnw', gender='male', business_id='S75Lf-Q3bCCckQ3w7mSN2g', friends='my4q3Sy6Ei45V58N2l8VGw, R3zl9VKw63rPxSfBxbasWw, c-c64rURhBR8V8scSbwo7Q, tn6qogrDbb9hEKfRBGUUpw, pu_AQig2fw40PshvtgONPQ, IDrgtQccPN9c4rBn7yyk4Q, OIIx11vTeLN8EBcZrYXHKQ')

friends 在这里只是其他user_id 的列表。我想要做的是为这个特定用户的每个friends 获取一些值。现在,由于这是user_id,我需要为此查询我的DF,这在UDF 中是不允许的。我既不能执行 spark.sql 也不能引用 Dataframe 并执行过滤器,因为两者都是 sparkSession 对象。

我可以在这里尝试什么不同的方法?

尝试创建一个 DF 然后过滤:

tempDF=sparkSession.sql("SELECT review_sentiment,user_id,business_id FROM events")

def getfriendsSentiment(friendsList, b_id):

  listOfSentiments=[]
  for friend_id in friendsList.split(','):
    try:
      listOfSentiments.append(tempDF.filter("user_id='"+friend_id+"' AND business_id='"+b_id+"'").rdd.flatMap(lambda x:x).collect()[0])
    except:
      pass

friendsSentiment = udf(getfriendsSentiment, StringType())
businessReviewUserDfWithFriends=businessReviewUserDfWithFriends.withColumn('friendsSentimentToBusiness', friendsSentiment('friends','business_id'))

错误:

py4j.Py4JException: Method __getstate__([]) does not exist

尝试创建一个表并查询它:

sparkSession.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")

def getfriendsSentiment(friendsList, b_id):

  listOfSentiments=[]
  for friend_id in friendsList.split(','):
    try:
       listOfSentiments.append(spark.sql("SELECT review_sentiment FROM events WHERE user_id='"+friend_id+"' AND business_id='"+b_id+"' GROUP BY review_sentiment ORDER BY COUNT(review_sentiment) DESC LIMIT 1").rdd.flatMap(lambda x:x).collect()[0])
    except:
      pass

错误:

PicklingError: Could not serialize object: Exception: It appears that you are attempting........

我能做些什么来解决这个问题?

【问题讨论】:

【参考方案1】:

您不能在 UDFS 中使用 SparkSession/DataFrame 对象。

我认为在这里可行的解决方案是爆炸每一行 通过朋友然后加入(friend.id==user.id&&friend.business_id==user.business_id)

第二个解决方案是可能的(如果事件表适合您的记忆), 就是一开始就收集你的事件表,然后广播给所有的执行者。然后,您可以在 UDF 中使用您的数据。只有当事件是一张小桌子并且适合您的记忆时才能完成。

【讨论】:

以上是关于pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?的主要内容,如果未能解决你的问题,请参考以下文章

pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?

pyspark如何使用两列编写UDF

使用 udf 的 pyspark 出错:您必须使用 Hive 构建 Spark。导出 'SPARK_HIVE=true' 并运行 build/sbt 程序集

在pyspark中使用pandas udf/apache Arrow

在 PySpark 中重新加载 UDF

在 pyspark 中使用 UDF 和简单数据帧