Spark Streaming“回声”读取和写入套接字

Posted

技术标签:

【中文标题】Spark Streaming“回声”读取和写入套接字【英文标题】:Spark Streaming "echo" read and write to a socket 【发布时间】:2018-05-25 06:25:27 【问题描述】:

我正在查看这个火花样本:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example

我想实现一个“echo”应用程序,一个火花流软件,它从一个套接字读取一些字符并在同一个套接字上输出相同的字符(这是我真正问题的简化,当然我有一些处理在输入上做,但我们现在不关心这个)。

我尝试按照本指南实施 CustomReceiver: https://spark.apache.org/docs/2.3.0/streaming-custom-receivers.html

我添加了一个方法getSocket

def getSocket() : Socket = 
   socket

我试着像这样调用它:

val receiver = new SocketReceiver2("localhost", 9999, StorageLevel.MEMORY_AND_DISK_2)
val lines = ssc.receiverStream(receiver)
lines.foreachRDD  
    rdd => rdd.foreachPartition  partitionOfRecords =>
       val os = receiver.getSocket().getOutputStream();
       partitionOfRecords.foreach(record => os.write(record.getBytes()))
    

但我收到 Task not serializable 错误。 (正如 T.Gaweda 指出的那样,这是意料之中的)。所以下一步就是开始使用累加器...

有没有更简单的方法可以在 Spark Streaming 中执行我的“回声”应用程序?

(我真的需要使用 Kafka(hdfs、hive...)从一个简单的 java 应用程序来回发送数据吗?)

【问题讨论】:

【参考方案1】:

接收者被发送给工作人员,在那里他们被执行。您可以在 Receiver 类中看到,它实现了 Serializable。

在您的代码中,您有 socket 字段,可能是 Socket 类型,它是不可序列化的

【讨论】:

是的,谢谢,我几乎意识到 :) :向工作人员发送打开的套接字是没有意义的。这就是为什么我说我应该开始使用累加器。我仍然对这个 echo 应用程序变得如此复杂感到惊讶。如果我有时间完成它,我会发布更新。

以上是关于Spark Streaming“回声”读取和写入套接字的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming:读取和写入状态信息到外部数据库,如 cassandra

Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题

使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?

混合 Spark Structured Streaming API 和 DStream 写入 Kafka

spark streaming 读取kafka两种方式的区别

数据湖(十六):Structured Streaming实时写入Iceberg