在 pyspark UDF 中使用广播数据帧
Posted
技术标签:
【中文标题】在 pyspark UDF 中使用广播数据帧【英文标题】:Using broadcasted dataframe in pyspark UDF 【发布时间】:2018-10-29 08:55:54 【问题描述】:是否可以在 pyspark SQl 应用程序的 UDF 中使用广播数据帧。
我的代码在 pyspark 数据帧中调用广播的数据帧,如下所示。
fact_ent_df_data =
sparkSession.sparkContext.broadcast(fact_ent_df.collect())
def generate_lookup_code(col1,col2,col3):
fact_ent_df_count=fact_ent_df_data.
select(fact_ent_df_br.TheDate.between(col1,col2),
fact_ent_df_br.Ent.isin('col3')).count()
return fact_ent_df_count
sparkSession.udf.register("generate_lookup_code" , generate_lookup_code )
sparkSession.sql('select sample4,generate_lookup_code(sample1,sample2,sample 3) as count_hol from table_t')
当我使用广播的 df_bc 时,我在分配错误之前使用了局部变量。任何帮助表示赞赏 我得到的错误是
Traceback (most recent call last):
File "C:/Users/Vignesh/PycharmProjects/gettingstarted/aramex_transit/spark_driver.py", line 46, in <module>
sparkSession.udf.register("generate_lookup_code" , generate_lookup_code )
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 323, in register
self.sparkSession._jsparkSession.udf().registerPython(name, register_udf._judf)
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 148, in _judf
self._judf_placeholder = self._create_judf()
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 157, in _create_judf
wrapped_func = _wrap_function(sc, self.func, self.returnType)
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 33, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\rdd.py", line 2391, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\serializers.py", line 575, in dumps
return cloudpickle.dumps(obj, 2)
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o24.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
【问题讨论】:
【参考方案1】:将 Spark Broadcast 变量视为 Python 简单数据类型,如 list
,那么问题是如何将变量传递给 UDF
函数。这是一个例子:
假设我们有年龄列表d
和一个包含name
和age
列的数据框。所以我们要检查每个人的年龄是否在年龄列表中。
from pyspark.sql.functions import udf, col
l = [13, 21, 34] # ages list
d = [('Alice', 10), ('bob', 21)] # data frame rows
rdd = sc.parallelize(l)
b_rdd = sc.broadcast(rdd.collect()) # define broadcast variable
df = spark.createDataFrame(d , ["name", "age"])
def check_age (age, age_list):
if age in l:
return "true"
return "false"
def udf_check_age(age_list):
return udf(lambda x : check_age(x, age_list))
df.withColumn("is_age_in_list", udf_check_age(b_rdd.value)(col("age"))).show()
输出:
+-----+---+--------------+
| name|age|is_age_in_list|
+-----+---+--------------+
|Alice| 10| false|
| bob| 21| true|
+-----+---+--------------+
【讨论】:
谢谢。我想从 UDF 内部对广泛的数据帧执行多列过滤器。每行值都需要用于过滤和获取计数,并在 udf 执行后返回。在您的示例中,我可以知道这里发生了什么 udf_check_age(b_rdd.value)(col("age"))。 col(age) 是否作为年龄列表传递?我需要从 3 列传递值,那么在这种情况下它必须是 df.withColumn(count_age,udf_cheack_age(b_rdd.value)(col('age'),col(age2)))。你能建议一下吗? 那么,您想将带有广播变量的 3 列中的值传递给一个函数并获得一个输出?喜欢:def func(col1, col2, col3, b_rdd)
?
这个错误主要是因为在 UDF 中使用了数据框对象。 pickle.PicklingError:无法序列化对象:Py4JError:调用 o24.__getnewargs__ 时出错。跟踪:py4j.Py4JException:方法 __getnewargs__([]) 不存在
请将代码添加到问题中,在评论中它很混乱!
我的问题是我没有清单。我有一个数据框。当我通过数据帧时,我有一些酸洗问题。【参考方案2】:
只是尝试根据 Soheil 的回答提供一个更简单的示例。
from pyspark.sql.functions import udf, col
def check_age (_age):
return _age > 18
dict_source = "alice": 10, "bob": 21
broadcast_dict = sc.broadcast(dict_source) # define broadcast variable
rdd = sc.parallelize(list(dict_source.keys()))
result = rdd.map(
lambda _name: check_age(broadcast_dict.value.get(_name)) # Here you specify the broadcasted var `.value`
)
print(result.collect())
【讨论】:
以上是关于在 pyspark UDF 中使用广播数据帧的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?