如何减少来自 kafka 的 spark 数据帧并收集结果?

Posted

技术标签:

【中文标题】如何减少来自 kafka 的 spark 数据帧并收集结果?【英文标题】:How do I reduce a spark dataframe from kafka and collect the result? 【发布时间】:2019-11-29 01:21:19 【问题描述】:

我有一个从 kafka 流创建的数据框。我想将其减少为单个值,然后在我的程序中使用该单个值。

```scala
import sparkSession.implicits._
val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "theTopic")
  .load()

val result = df
  .selectExpr("CAST(value AS STRING) as json")
  .map(json => getAnInt(json))
  .reduce  (x, y) =>
    if (x > y) x else y
  



 someOtherFunction(result)
 ```

我希望将流减少到一个值,然后我可以在我的程序的其余部分中使用它。相反,它失败了:

org.apache.spark.sql.AnalysisException: 带有流源的查询必须使用 writeStream.start();; 卡夫卡 在 org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389) 在 org.apache.spark.sql.catalyst.analysis.U...

【问题讨论】:

【参考方案1】:

您只能在流式数据帧上使用writeStream。我不确定您是否打算拥有此流数据帧。如果您删除readStream 并改用read,您可能会解决此问题!

【讨论】:

就是这样。谢谢!

以上是关于如何减少来自 kafka 的 spark 数据帧并收集结果?的主要内容,如果未能解决你的问题,请参考以下文章

spark中将每个组作为新数据帧并在循环中传递另一个函数的最佳方法是啥?

spark如何在cassandra表之间复制数据?

来自 Kafka 的 Spark Streaming 有错误 numRecords 不能为负

如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?

如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源

Spark中的拆分,操作和联合数据框