将 Spark SQL 批处理源转换为结构化流接收器

Posted

技术标签:

【中文标题】将 Spark SQL 批处理源转换为结构化流接收器【英文标题】:Convert a Spark SQL batch source to structured streaming sink 【发布时间】:2018-08-06 21:07:41 【问题描述】:

试图通过简单地实现调用createRelation(...)addBatch(...)org.apache.spark.sql.sources.CreatableRelationProvider转换为org.apache.spark.sql.execution.streaming.Sink,但createRelation(...)中有一个df.rdd,这会导致以下错误:

org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)

试图研究如何org.apache.spark.sql.execution.streaming.FileStreamSink 也需要在流作业中从数据帧中获取 Rdd,它似乎在玩使用 df.queryExecution.executedPlan.execute() 生成 RDD 而不是调用 .rdd 的伎俩。

然而事情似乎没有那么简单:

    似乎需要注意输出顺序 - https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L159

    可能是一些急切的执行问题? (没有把握) https://issues.apache.org/jira/browse/SPARK-20865

可以找到我遇到的问题的更多详细信息here

想知道进行这种转换的惯用方式是什么?

【问题讨论】:

【参考方案1】:

Dataset.rdd() 创建一个新的计划,只是打破了增量计划。因为 StreamExecution 使用现有计划来收集指标和更新水印,所以我们永远不应该创建新计划。否则,metrics 和 watermark 会在新计划中更新,StreamExecution 无法检索它们。

以下是 Scala 中用于在结构化流中转换列值的代码示例:

val convertedRows: RDD[Row] = df.queryExecution.toRdd.mapPartitions  iter: Iterator[InternalRow] =>
  iter.map  row =>
    val convertedValues: Array[Any] = new Array(conversionFunctions.length)
    var i = 0
    while (i < conversionFunctions.length) 
      convertedValues(i) = conversionFunctions(i)(row, i)
      i += 1
    
    Row.fromSeq(convertedValues)
  

【讨论】:

以上是关于将 Spark SQL 批处理源转换为结构化流接收器的主要内容,如果未能解决你的问题,请参考以下文章

将 SQL 查询转换为 Spark Dataframe 结构化数据处理

将spark结构化流数据帧转换为JSON

如何将 Spark 结构化流数据写入 REST API?

在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务

Spark 结构化流/Spark SQL 中的条件爆炸

Spark SQL