无法使用火花结构化流计算文档

Posted

技术标签:

【中文标题】无法使用火花结构化流计算文档【英文标题】:Unable to Count the documents using spark structured streaming 【发布时间】:2020-04-14 11:11:18 【问题描述】:

我正在尝试使用 couchbase 作为使用 spark 连接器的 spark 结构化流的流源。

val records = spark.readStream
.format(“com.couchbase.spark.sql”).schema(schema)
.load()

我有这个问题

records
.groupBy(“type”)
.count()
.writeStream
.outputMode(“complete”)
.format(“console”)
.start()
.awaitTermination()

对于这个查询,我没有得到正确的输出。我的查询输出表是这样的

Batch: 0
20/04/14 14:28:00 INFO CodeGenerator: Code generated in 10.538654 ms
20/04/14 14:28:00 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@17fe0ec7 committed.
±-------±----+
|type | count|
±-------±----+
±-------±----+

但是,如果我使用沙发底座将文档作为非流式获取。喜欢

val cdr = spark.read.couchbase(EqualTo(“type”, “cdr”))
cdr.count()

已正确推断此非流式操作的架构,并为结构化流式传输使用相同的架构。

INFO N1QLRelation: Inferred schema is StructType(StructField(META_ID,StringType,true), StructField(_class,StringType,true), StructField(accountId,StringType,true), 

给出正确的输出。 (计数 = 28)。

请告诉我为什么这不适用于结构化流媒体。

【问题讨论】:

我不熟悉 couchbase 连接器,但也许默认行为是只查找新记录?当您提交流时,您正在尝试将新记录添加到沙发库或记录已经存在? 【参考方案1】:

这可能是因为您只流式传输从现在开始发生的变化,而不是过去的事件。 如果您想“从头开始”流式传输所有内容,则需要指定。

此博客文章中显示了示例:https://blog.couchbase.com/couchbase-spark-connector-2-0-0-released/

基本上在您的流中,您需要指定以下行

  .couchbaseStream(from = FromBeginning, to = ToInfinity)

【讨论】:

我正在使用结构化流。这些在那个中不可用

以上是关于无法使用火花结构化流计算文档的主要内容,如果未能解决你的问题,请参考以下文章

如何使用火花流计算流中的新元素

在火花结构化流中反序列化 kafka avro 主题的 int 编码无效

如何在火花结构化流式读取流中倒带 Kafka 偏移

火花流foreach多个作家

如何直接在 Azure Blob 存储上存储火花作业(结构化流)的检查点?

火花中的Java 8流开销