如何从套接字读取流数据集?

Posted

技术标签:

【中文标题】如何从套接字读取流数据集?【英文标题】:How to read streaming datasets from socket? 【发布时间】:2017-06-30 13:52:56 【问题描述】:

以下代码从套接字读取,但我没有看到任何输入进入作业。我有 nc -l 1111 正在运行,并且正在转储数据,但不确定为什么我的 Spark 作业无法从 10.176.110.112:1111 读取数据。

Dataset<Row> d = sparkSession.readStream().format("socket") 
                                    .option("host", "10.176.110.112")
                                    .option("port", 1111).load();

【问题讨论】:

【参考方案1】:

以下代码从套接字读取,但我没有看到任何输入进入作业。

好吧,老实说,您确实从任何地方阅读任何内容。您只描述了您在启动流式传输管道时将要做什么

由于您使用结构化流从套接字读取数据集,因此您应该使用start 运算符来触发数据获取(这仅在您定义接收器之后)。

start(): StreamingQuery 开始执行流式查询,当新数据到达时,它将不断地将结果输出到给定的路径。返回的 StreamingQuery 对象可用于与流交互。

start 之前,您应该定义流式传输数据的位置。它可以是 Kafka、文件、自定义流式接收器(可能使用 foreach 运算符)或控制台。

我在以下示例中使用console sink(又名格式)。我还使用 Scala,并将其重写为 Java 作为您的家庭练习。

d.writeStream.  // <-- this is the most important part
  trigger(Trigger.ProcessingTime("10 seconds")).
  format("console").
  option("truncate", false).
  start         // <-- and this

【讨论】:

以上是关于如何从套接字读取流数据集?的主要内容,如果未能解决你的问题,请参考以下文章

用VLC读取摄像头产生RTSP流,DSS主动取流转发

用VLC读取摄像头产生RTSP流,DSS侦听并转发

合并数据+统计流数

从套接字流读取时是不是需要线程睡眠?

C# Windows 应用程序从流套接字读取数据不可理解

nodejs fs 读取文件流一次读取多少数据