将 Spark SQL 批处理源转换为结构化流接收器
Posted
技术标签:
【中文标题】将 Spark SQL 批处理源转换为结构化流接收器【英文标题】:Convert a Spark SQL batch source to structured streaming sink 【发布时间】:2018-08-06 21:07:41 【问题描述】:试图通过简单地实现调用createRelation(...)
的addBatch(...)
将org.apache.spark.sql.sources.CreatableRelationProvider
转换为org.apache.spark.sql.execution.streaming.Sink
,但createRelation(...)
中有一个df.rdd
,这会导致以下错误:
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
试图研究如何org.apache.spark.sql.execution.streaming.FileStreamSink
也需要在流作业中从数据帧中获取 Rdd,它似乎在玩使用 df.queryExecution.executedPlan.execute()
生成 RDD 而不是调用 .rdd
的伎俩。
然而事情似乎没有那么简单:
似乎需要注意输出顺序 - https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L159
可能是一些急切的执行问题? (没有把握) https://issues.apache.org/jira/browse/SPARK-20865
可以找到我遇到的问题的更多详细信息here
想知道进行这种转换的惯用方式是什么?
【问题讨论】:
【参考方案1】:Dataset.rdd()
创建一个新的计划,只是打破了增量计划。因为 StreamExecution 使用现有计划来收集指标和更新水印,所以我们永远不应该创建新计划。否则,metrics 和 watermark 会在新计划中更新,StreamExecution 无法检索它们。
以下是 Scala 中用于在结构化流中转换列值的代码示例:
val convertedRows: RDD[Row] = df.queryExecution.toRdd.mapPartitions iter: Iterator[InternalRow] =>
iter.map row =>
val convertedValues: Array[Any] = new Array(conversionFunctions.length)
var i = 0
while (i < conversionFunctions.length)
convertedValues(i) = conversionFunctions(i)(row, i)
i += 1
Row.fromSeq(convertedValues)
【讨论】:
以上是关于将 Spark SQL 批处理源转换为结构化流接收器的主要内容,如果未能解决你的问题,请参考以下文章
将 SQL 查询转换为 Spark Dataframe 结构化数据处理