将 Spark 流输出写入套接字

Posted

技术标签:

【中文标题】将 Spark 流输出写入套接字【英文标题】:Writing Spark Streaming Output to a Socket 【发布时间】:2015-01-01 14:33:33 【问题描述】:

我有一个 DStream“Crowd”,我想将“Crowd”中的每个元素写入一个套接字。当我尝试从该套接字读取时,它不会打印任何内容。我正在使用以下代码行:

val server = new ServerSocket(4000,200);
val conn = server.accept()
val out = new PrintStream(conn.getOutputStream());
crowd.foreachRDD(rdd => rdd.foreach(record=>out.println(record)))

但如果使用(虽然这不是我想要的):

crowd.foreachRDD(rdd => out.println(rdd)) 

它确实向套接字写入了一些东西。

我怀疑使用 rdd.foreach() 存在问题。虽然它应该工作。我不确定我错过了什么。

【问题讨论】:

【参考方案1】:

DStream闭包外的代码在驱动中执行,而rdd.foreach(...)会在RDD的每个分布式分区上执行。 因此,在驱动程序的机器上创建了一个套接字,并且作业尝试在另一台机器上写入它 - 由于显而易见的原因,这将无法正常工作。

DStream.foreachRDD 在驱动程序上执行,因此在这种情况下,套接字和计算在同一主机上执行。因此它有效。

由于 RDD 计算的分布式特性,这种服务器套接字方法将很难工作,因为动态服务发现成为一个挑战,即“我的服务器套接字在哪里打开?”。研究一些允许您集中访问分布式数据的系统。 Kafka 是这种流式处理的不错选择。

【讨论】:

谢谢。这有助于理解。因此,为了克服这个服务发现挑战,它将语句修改为:crowd.foreachRDD(rdd => rdd.collect.foreach(record=>out.println(record)))。这将起作用(它起作用),因为它将从工作人员那里收集 rdd 分区并将其发送到将写入套接字的驱动程序。我希望这是正确的方法。你怎么看这个..collect.foreach.......这件事?【参考方案2】:

Here in the official documentation你有答案!

您必须在foreachRDD 函数内创建连接,如果您想以最佳方式完成此操作,您需要创建一个连接“池”,然后将您想要的连接带入foreachPartition 函数内, 并调用 foreach 函数通过该连接发送元素。这是以最佳方式执行此操作的示例代码:

dstream.foreachRDD  rdd =>
  rdd.foreachPartition  partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  

无论如何,请检查其他 cmets,因为它们提供了有关问题上下文的良好知识。

【讨论】:

【参考方案3】:
crowd.foreachRDD(rdd => rdd.collect.foreach(record=>out.println(record)))

您在 cmets 中建议的代码可以正常工作,但在这种情况下,您必须在驱动程序中收集所有 RDD 记录。如果记录的数量很少,那将是可以的,但如果记录的数量大于驱动程序的内存,则会成为瓶颈。您的第一次尝试应始终在客户端处理数据。请记住,RDD 分布在工作机器上,这意味着首先您需要将 RDD 中的所有记录带到驱动程序中,从而增加通信,这在分布式计算中是一种扼杀。因此,如前所述,只有当 RDD 中的记录有限时,您的代码才可以使用。

我正在解决类似的问题,我一直在寻找如何将连接池化并将它们序列化到客户端计算机。如果有人对此有任何答案,那就太好了。

【讨论】:

以上是关于将 Spark 流输出写入套接字的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming“回声”读取和写入套接字

您可以同时写入套接字输入和输出流吗?

在apache tomcat中将字节写入Web套接字时写入超时

IDEA Spark Streaming 操作(套接字流)

IDEA Spark Streaming 操作(套接字流)-----make socket数据源

InputStream 没有收到 EOF