为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]
Posted
技术标签:
【中文标题】为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]【英文标题】:Why dataframe cannot be accessed inside UDF ? [Apache Spark Scala] [duplicate]为什么不能在 UDF 中访问数据框? [Apache Spark Scala] [重复] 【发布时间】:2019-03-10 06:15:16 【问题描述】:我目前正在使用 Apache Spark 进行流式传输项目。我有 2 个数据源,第一个从 Kafka 获取新闻数据。这些数据每次都在更新。第二个,我得到 masterWord 字典。该变量包含单词的数据框和单词的唯一键。
我想处理新闻数据,然后通过将数据与 masterWord 字典匹配,将其从单词 Seq 转换为 words_id 的 Seq。但是,我在访问我的 UDF 中的 masterWord 数据框时遇到了问题。当我尝试访问 UDF 中的数据帧时,Spark 返回此错误
原因:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 4.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 i n stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
为什么不能在 UDF 中访问数据帧?
从另一个数据框获取价值的最佳做法是什么?
这是我的代码
// read data stream from Kafka
val kafka = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
.option("subscribe", PropertiesLoader.kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", "100")
.load()
// Transform data stream to Dataframe
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[(String)]
.select(from_json($"value", ColsArtifact.rawSchema).as("data"))
.select("data.*")
.withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // add column aggregate title and text
// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
var masterWord = MongoSpark.load(spark, readConfig)
// call UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))
// UDF
val aggregateMongo = udf((content: Seq[String]) =>
masterWord.show()
...
// code for query masterWord whether var content exist or not in masterWord dictionary
)
【问题讨论】:
广播数据帧,但广播值大小,默认不能超过10MB。首先确保执行器端的数据框不为空,然后检查大小。 @deo 我在本地运行这个程序,我假设如果我显示我的代码之类的数据框,我会在执行程序中运行它吗?我加载后显示了它,没有null。但是当我在 UDF 中访问时,有 null。 @user10465355 对不起,我不是在创建数据框,而是从数据源加载数据,有什么参考资料可以让我的数据框可以加载到 UDF 中吗? 【参考方案1】:数据帧存在于 spark 上下文中,并且仅在驱动程序中可用 每个任务都会看到数据的一部分(分区)并且可以使用它。如果您想让数据帧中的数据在 udf 中可用,您必须将其序列化到主服务器,然后您可以将其广播(或将其作为参数传递,这将基本上做同样的事情)到 udf,在这种情况下 Spark会将整个内容发送到正在运行的 udf 的每个实例
【讨论】:
感谢您的回答。我正在使用火花结构的流媒体。有没有我可以尝试的示例代码?我有点混淆广播这个变量或将它作为 UDF 中的参数传递。 实际上可以在 UDF 中使用数据帧,但前提是您要从中创建一个广播变量(类型为Broadcast[DataFrame]
)。
@raphael 广播它会将数据帧序列化到驱动程序,然后按照我上面写的那样发送它【参考方案2】:
如果你想在 UDF 中使用 DataFrames,你必须创建一个Broadcast
:
import spark.implicits._
val df_name =Seq("Raphael").toDF("name")
val bc_df_name: Broadcast[DataFrame] = spark.sparkContext.broadcast(df_name)
// use df_name inside udf
val udf_doSomething = udf(() => bc_df_name.value.as[String].first())
Seq(1,2,3)
.toDF("i")
.withColumn("test",udf_doSomething())
.show()
给了
+---+-------+
| i| test|
+---+-------+
| 1|Raphael|
| 2|Raphael|
| 3|Raphael|
+---+-------+
这至少适用于local
模式,确定这是否也适用于集群。无论如何,我不推荐这种方法,最好在驱动程序上的 scala 数据结构中转换(collect
)数据帧的内容(例如Map
)并广播此变量,或者改用连接。
【讨论】:
谢谢,这个例子很清楚。以上是关于为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]的主要内容,如果未能解决你的问题,请参考以下文章
在 spark 数据框中运行 UDF 时,不支持获取 org.apache.spark.sql.Column 类型的架构
Apache Spark - 将 UDF 的结果分配给多个数据框列