无法使用火花结构化流计算文档
Posted
技术标签:
【中文标题】无法使用火花结构化流计算文档【英文标题】:Unable to Count the documents using spark structured streaming 【发布时间】:2020-04-14 11:11:18 【问题描述】:我正在尝试使用 couchbase 作为使用 spark 连接器的 spark 结构化流的流源。
val records = spark.readStream
.format(“com.couchbase.spark.sql”).schema(schema)
.load()
我有这个问题
records
.groupBy(“type”)
.count()
.writeStream
.outputMode(“complete”)
.format(“console”)
.start()
.awaitTermination()
对于这个查询,我没有得到正确的输出。我的查询输出表是这样的
Batch: 0
20/04/14 14:28:00 INFO CodeGenerator: Code generated in 10.538654 ms
20/04/14 14:28:00 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@17fe0ec7 committed.
±-------±----+
|type | count|
±-------±----+
±-------±----+
但是,如果我使用沙发底座将文档作为非流式获取。喜欢
val cdr = spark.read.couchbase(EqualTo(“type”, “cdr”))
cdr.count()
已正确推断此非流式操作的架构,并为结构化流式传输使用相同的架构。
INFO N1QLRelation: Inferred schema is StructType(StructField(META_ID,StringType,true), StructField(_class,StringType,true), StructField(accountId,StringType,true),
给出正确的输出。 (计数 = 28)。
请告诉我为什么这不适用于结构化流媒体。
【问题讨论】:
我不熟悉 couchbase 连接器,但也许默认行为是只查找新记录?当您提交流时,您正在尝试将新记录添加到沙发库或记录已经存在? 【参考方案1】:这可能是因为您只流式传输从现在开始发生的变化,而不是过去的事件。 如果您想“从头开始”流式传输所有内容,则需要指定。
此博客文章中显示了示例:https://blog.couchbase.com/couchbase-spark-connector-2-0-0-released/
基本上在您的流中,您需要指定以下行
.couchbaseStream(from = FromBeginning, to = ToInfinity)
【讨论】:
我正在使用结构化流。这些在那个中不可用以上是关于无法使用火花结构化流计算文档的主要内容,如果未能解决你的问题,请参考以下文章
在火花结构化流中反序列化 kafka avro 主题的 int 编码无效