在结构化流中将数据帧传递给 UDF 时出错

Posted

技术标签:

【中文标题】在结构化流中将数据帧传递给 UDF 时出错【英文标题】:Error while passing dataframe to UDF in Structured Streaming 【发布时间】:2020-07-13 07:28:40 【问题描述】:

我在 Spark Structured 流中从 Kafka 读取事件,需要一一处理事件并写入 redis。我为此编写了一个 UDF,但它给了我火花上下文错误。

conf = SparkConf()\
.setAppName(spark_app_name)\
.setMaster(spark_master_url)\
.set("spark.redis.host", "redis")\
.set("spark.redis.port", "6379")\
.set("spark.redis.auth", "abc")

spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()

def func(element, event, timestamp):
    #redis i/o
    pass

schema = ArrayType(StructType(
[
    StructField("element_id", StringType()),
    StructField("event_name", StringType()),
    StructField("event_time", StringType())
]
))

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", topic) \
    .load()
    #.option("includeTimestamp", value = True)\

ds = df.selectExpr(("CAST(value AS STRING)"))\
     .withColumn("value", explode(from_json("value", schema)))

filter_func = udf(func, ArrayType(StringType()))

ds = ds.withColumn("column_name", filter_func(
    ds['value']['element_id'], 
    ds['value']['event_name'], 
    ds['value']['event_time']
))

query = ds.writeStream \
        .format("console") \
        .start()

query.awaitTermination()

错误消息:_pickle.PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。

感谢任何帮助。

【问题讨论】:

你也可以添加这个 - update_nfa ?? 您遇到的错误是什么? 已更新错误信息 你需要一个数组吗? 是的,udf 函数返回一个字符串列表,但我找不到 ListType。 【参考方案1】:

我试图从不允许的用户定义函数中访问 spark 上下文。 在 udf 中,我尝试使用 spark 上下文写入 spark-redis。

【讨论】:

以上是关于在结构化流中将数据帧传递给 UDF 时出错的主要内容,如果未能解决你的问题,请参考以下文章

在 Pig 中将关系传递给 Python UDF 时出错

pyspark 在 udf 中获取结构数据类型的字段名称

使用 PHP PDO 在 SQL 中将多行传递给 UDF

将多行结构化流式传输到 pandas udf

使用引用将结构传递给函数时出错[重复]

在 Spark 结构化流中将数据内部连接到左连接 DataFrame 时丢失条目