为啥使用套接字源和多个接收器的流式查询不起作用?

Posted

技术标签:

【中文标题】为啥使用套接字源和多个接收器的流式查询不起作用?【英文标题】:Why does streaming query with socket source and multiple sinks not work?为什么使用套接字源和多个接收器的流式查询不起作用? 【发布时间】:2019-01-02 20:21:02 【问题描述】:

我正在尝试使用多个查询来写入 spark 中的不同接收器。第一个查询有效,输出写入接收器,但第二个查询无效。

谁能指出我的错误。

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
val source = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .map e =>
    println(e)
    e
  

// With Multiple Queries
val q1 = source.writeStream.outputMode("append").format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
println(q1)

val q2 = source.writeStream.outputMode("append")
  .format("csv")
  .option("path", "output.csv")
  .option("checkpointLocation", "/tmp/checkpoint/test")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
println(q2)

spark.streams.awaitAnyTermination()

控制台接收器正在工作,但 CSV 接收器没有写入输出。如果我更改订单,则 csv sink 可以工作,但不能控制台。

【问题讨论】:

【参考方案1】:

我假设您正在使用 netcat 或类似的实用程序来生成数据。此类实用程序并非设计为可重放且不提供持久层,因此数据一旦被使用就会被不可逆转地破坏。

因此,第二个流将监听更改,但不会有任何数据到达它。

【讨论】:

可以使用 kafka 源和多个写入流进行流式传输?

以上是关于为啥使用套接字源和多个接收器的流式查询不起作用?的主要内容,如果未能解决你的问题,请参考以下文章

用于发送和接收用户定义对象的套接字程序不起作用

为啥 subscribe() 不起作用但模板中的异步可以?

PHP/Java 套接字通信不起作用

在接收器应用程序中,为啥音频标签事件不起作用?

为啥java更新查询不起作用?

为啥当我解构属性时接收道具的组件不起作用,但是当我使用 props.key 时它起作用了?