为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]

Posted

技术标签:

【中文标题】为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]【英文标题】:Why dataframe cannot be accessed inside UDF ? [Apache Spark Scala] [duplicate]为什么不能在 UDF 中访问数据框? [Apache Spark Scala] [重复] 【发布时间】:2019-03-10 06:15:16 【问题描述】:

我目前正在使用 Apache Spark 进行流式传输项目。我有 2 个数据源,第一个从 Kafka 获取新闻数据。这些数据每次都在更新。第二个,我得到 masterWord 字典。该变量包含单词的数据框和单词的唯一键。

我想处理新闻数据,然后通过将数据与 masterWord 字典匹配,将其从单词 Seq 转换为 words_id 的 Seq。但是,我在访问我的 UDF 中的 masterWord 数据框时遇到了问题。当我尝试访问 UDF 中的数据帧时,Spark 返回此错误

原因:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 4.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 i n stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException

为什么不能在 UDF 中访问数据帧?

从另一个数据框获取价值的最佳做法是什么?

这是我的代码

// read data stream from Kafka
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
  .option("subscribe", PropertiesLoader.kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "100")
  .load()

// Transform data stream to Dataframe
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[(String)]
  .select(from_json($"value", ColsArtifact.rawSchema).as("data"))
  .select("data.*")
  .withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // add column aggregate title and text

// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
var masterWord = MongoSpark.load(spark, readConfig)

// call UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))

// UDF
val aggregateMongo = udf((content: Seq[String]) => 
  masterWord.show()
  ...
  // code for query masterWord whether var content exist or not in masterWord dictionary
)

【问题讨论】:

广播数据帧,但广播值大小,默认不能超过10MB。首先确保执行器端的数据框不为空,然后检查大小。 @deo 我在本地运行这个程序,我假设如果我显示我的代码之类的数据框,我会在执行程序中运行它吗?我加载后显示了它,没有null。但是当我在 UDF 中访问时,有 null。 @user10465355 对不起,我不是在创建数据框,而是从数据源加载数据,有什么参考资料可以让我的数据框可以加载到 UDF 中吗? 【参考方案1】:

数据帧存在于 spark 上下文中,并且仅在驱动程序中可用 每个任务都会看到数据的一部分(分区)并且可以使用它。如果您想让数据帧中的数据在 udf 中可用,您必须将其序列化到主服务器,然后您可以将其广播(或将其作为参数传递,这将基本上做同样的事情)到 udf,在这种情况下 Spark会将整个内容发送到正在运行的 udf 的每个实例

【讨论】:

感谢您的回答。我正在使用火花结构的流媒体。有没有我可以尝试的示例代码?我有点混淆广播这个变量或将它作为 UDF 中的参数传递。 实际上可以在 UDF 中使用数据帧,但前提是您要从中创建一个广播变量(类型为 Broadcast[DataFrame])。 @raphael 广播它会将数据帧序列化到驱动程序,然后按照我上面写的那样发送它【参考方案2】:

如果你想在 UDF 中使用 DataFrames,你必须创建一个Broadcast

import spark.implicits._

val df_name =Seq("Raphael").toDF("name")

val bc_df_name: Broadcast[DataFrame] = spark.sparkContext.broadcast(df_name)

// use df_name inside udf
val udf_doSomething = udf(() => bc_df_name.value.as[String].first())

Seq(1,2,3)
  .toDF("i")
  .withColumn("test",udf_doSomething())
  .show()

给了

+---+-------+
|  i|   test|
+---+-------+
|  1|Raphael|
|  2|Raphael|
|  3|Raphael|
+---+-------+

这至少适用于local 模式,确定这是否也适用于集群。无论如何,我不推荐这种方法,最好在驱动程序上的 scala 数据结构中转换(collect)数据帧的内容(例如Map)并广播此变量,或者改用连接。

【讨论】:

谢谢,这个例子很清楚。

以上是关于为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]的主要内容,如果未能解决你的问题,请参考以下文章

在 spark 数据框中运行 UDF 时,不支持获取 org.apache.spark.sql.Column 类型的架构

火花在UDF中创建数据框

Apache Spark - 将 UDF 的结果分配给多个数据框列

为啥不能在 Redshift 的 CTE 的某些子句中调用不可变的 UDF?

Pyspark:访问 UDF 中行内的列

Apache Spark - 注册 UDF - 返回数据帧