将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中

Posted

技术标签:

【中文标题】将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中【英文标题】:Send multiple columns in Spark Dataframe to an external API and store the result in a separate column 【发布时间】:2019-03-11 22:05:39 【问题描述】:

我有一个包含 40 多列的 spark 数据框。和数百万行。 我想创建另一列,它从上述数据框中获取 5 列,将 5 列中的每一行传递给单独的 Api(它采用这 5 个值并返回一些数据)并将结果存储在列中。

为简单起见,我使用以下示例: 假设我有以下数据框。我想将每一行“食物”和“价格”发送到一个 API,该 API 返回一个结果,并将其存储在一个名为“组合”的单独列中

输入:

+----+------+-----+
|name|food  |price|
+----+------+-----+
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco  |2.59 |
+----+------+-----+

输出:

+----+------+-----+----------+
|name|food  |price|combined  |
+----+------+-----+----------+
|john|tomato|1.99 |abcd      |
|john|carrot|0.45 |fdg       |
|bill|apple |0.99 |123fgfg   |
|john|banana|1.29 |fgfg4wf   |
|bill|taco  |2.59 |gfg45gn   |
+----+------+-----+----------+

我创建了一个 UDF 来查看每一行:

val zip = udf 
(food: String, price: Double) =>
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", food)
    nvIn.put("Price", price)
    val nvOut = new NameValue

    val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
    nvOut.get("CombineData")     //this is stored the result column
  

  def test(sc: SparkContext, sqlContext: SQLContext): Unit = 
    import sqlContext.implicits._
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")


    val result = df.withColumn("combined", zip($"food", $"price"))
    result.show(false)

  

这种方法有效,但是我很担心,因为我正在查看数据帧的每一行,并且我有数百万这样的行,它在集群上的性能不会那么好

有没有其他方法可以做到(比如使用 spark-sql),可能不使用 udf ?

【问题讨论】:

【参考方案1】:

我强烈建议使用安全类型 spark Dataset api 将您的数据行发送到 api。

这涉及使用 as 函数将 Dataframe 行解析为 scala case class,然后在 Dataset\Dataframe 上执行 map 函数以将其发送到 api 并返回另一个case class 代表你的Output

虽然严格来说不是spark sql,但使用Dataset api 仍然可以让您受益于spark sql 中提供的大部分优化

case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.as[Input].map(input => 
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", input.food)
    nvIn.put("Price", input.price)
    val nvOut = new NameValue
    getTunnelsClient().execute("CombineData", nvIn, nvOut)
    Output(input.name, input.food, input.price, nvOut.get("CombineData"))
).show(false)

【讨论】:

以上是关于将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark Dataframe scala 将多个不同的列转换为 Map 列

Spark Dataframe API 选择多个列,将它们映射到一个固定的集合,然后联合所有

如何从 Spark 数据帧中的 When 子句将多个列发送到 udf?

如何在 spark DataFrame 中将多个浮点列连接到一个 ArrayType(FloatType()) 中?

如何将列除以 Spark DataFrame 中的总和

使用 PySpark 将多个数字列拟合到 spark-ml 模型中