将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中
Posted
技术标签:
【中文标题】将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中【英文标题】:Send multiple columns in Spark Dataframe to an external API and store the result in a separate column 【发布时间】:2019-03-11 22:05:39 【问题描述】:我有一个包含 40 多列的 spark 数据框。和数百万行。 我想创建另一列,它从上述数据框中获取 5 列,将 5 列中的每一行传递给单独的 Api(它采用这 5 个值并返回一些数据)并将结果存储在列中。
为简单起见,我使用以下示例: 假设我有以下数据框。我想将每一行“食物”和“价格”发送到一个 API,该 API 返回一个结果,并将其存储在一个名为“组合”的单独列中
输入:
+----+------+-----+
|name|food |price|
+----+------+-----+
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco |2.59 |
+----+------+-----+
输出:
+----+------+-----+----------+
|name|food |price|combined |
+----+------+-----+----------+
|john|tomato|1.99 |abcd |
|john|carrot|0.45 |fdg |
|bill|apple |0.99 |123fgfg |
|john|banana|1.29 |fgfg4wf |
|bill|taco |2.59 |gfg45gn |
+----+------+-----+----------+
我创建了一个 UDF 来查看每一行:
val zip = udf
(food: String, price: Double) =>
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", food)
nvIn.put("Price", price)
val nvOut = new NameValue
val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
nvOut.get("CombineData") //this is stored the result column
def test(sc: SparkContext, sqlContext: SQLContext): Unit =
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val result = df.withColumn("combined", zip($"food", $"price"))
result.show(false)
这种方法有效,但是我很担心,因为我正在查看数据帧的每一行,并且我有数百万这样的行,它在集群上的性能不会那么好
有没有其他方法可以做到(比如使用 spark-sql),可能不使用 udf ?
【问题讨论】:
【参考方案1】:我强烈建议使用安全类型 spark
Dataset
api 将您的数据行发送到 api。
这涉及使用 as
函数将 Dataframe
行解析为 scala
case
class
,然后在 Dataset\Dataframe
上执行 map
函数以将其发送到 api 并返回另一个case class
代表你的Output
。
虽然严格来说不是spark sql
,但使用Dataset
api 仍然可以让您受益于spark sql
中提供的大部分优化
case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
df.as[Input].map(input =>
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", input.food)
nvIn.put("Price", input.price)
val nvOut = new NameValue
getTunnelsClient().execute("CombineData", nvIn, nvOut)
Output(input.name, input.food, input.price, nvOut.get("CombineData"))
).show(false)
【讨论】:
以上是关于将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark Dataframe scala 将多个不同的列转换为 Map 列
Spark Dataframe API 选择多个列,将它们映射到一个固定的集合,然后联合所有
如何从 Spark 数据帧中的 When 子句将多个列发送到 udf?