将 Spark 流输出写入套接字
Posted
技术标签:
【中文标题】将 Spark 流输出写入套接字【英文标题】:Writing Spark Streaming Output to a Socket 【发布时间】:2015-01-01 14:33:33 【问题描述】:我有一个 DStream“Crowd”,我想将“Crowd”中的每个元素写入一个套接字。当我尝试从该套接字读取时,它不会打印任何内容。我正在使用以下代码行:
val server = new ServerSocket(4000,200);
val conn = server.accept()
val out = new PrintStream(conn.getOutputStream());
crowd.foreachRDD(rdd => rdd.foreach(record=>out.println(record)))
但如果使用(虽然这不是我想要的):
crowd.foreachRDD(rdd => out.println(rdd))
它确实向套接字写入了一些东西。
我怀疑使用 rdd.foreach() 存在问题。虽然它应该工作。我不确定我错过了什么。
【问题讨论】:
【参考方案1】:DStream闭包外的代码在驱动中执行,而rdd.foreach(...)
会在RDD的每个分布式分区上执行。
因此,在驱动程序的机器上创建了一个套接字,并且作业尝试在另一台机器上写入它 - 由于显而易见的原因,这将无法正常工作。
DStream.foreachRDD
在驱动程序上执行,因此在这种情况下,套接字和计算在同一主机上执行。因此它有效。
由于 RDD 计算的分布式特性,这种服务器套接字方法将很难工作,因为动态服务发现成为一个挑战,即“我的服务器套接字在哪里打开?”。研究一些允许您集中访问分布式数据的系统。 Kafka 是这种流式处理的不错选择。
【讨论】:
谢谢。这有助于理解。因此,为了克服这个服务发现挑战,它将语句修改为:crowd.foreachRDD(rdd => rdd.collect.foreach(record=>out.println(record)))。这将起作用(它起作用),因为它将从工作人员那里收集 rdd 分区并将其发送到将写入套接字的驱动程序。我希望这是正确的方法。你怎么看这个..collect.foreach.......这件事?【参考方案2】:Here in the official documentation你有答案!
您必须在foreachRDD
函数内创建连接,如果您想以最佳方式完成此操作,您需要创建一个连接“池”,然后将您想要的连接带入foreachPartition
函数内, 并调用 foreach
函数通过该连接发送元素。这是以最佳方式执行此操作的示例代码:
dstream.foreachRDD rdd =>
rdd.foreachPartition partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
无论如何,请检查其他 cmets,因为它们提供了有关问题上下文的良好知识。
【讨论】:
【参考方案3】:crowd.foreachRDD(rdd => rdd.collect.foreach(record=>out.println(record)))
您在 cmets 中建议的代码可以正常工作,但在这种情况下,您必须在驱动程序中收集所有 RDD 记录。如果记录的数量很少,那将是可以的,但如果记录的数量大于驱动程序的内存,则会成为瓶颈。您的第一次尝试应始终在客户端处理数据。请记住,RDD 分布在工作机器上,这意味着首先您需要将 RDD 中的所有记录带到驱动程序中,从而增加通信,这在分布式计算中是一种扼杀。因此,如前所述,只有当 RDD 中的记录有限时,您的代码才可以使用。
我正在解决类似的问题,我一直在寻找如何将连接池化并将它们序列化到客户端计算机。如果有人对此有任何答案,那就太好了。
【讨论】:
以上是关于将 Spark 流输出写入套接字的主要内容,如果未能解决你的问题,请参考以下文章
在apache tomcat中将字节写入Web套接字时写入超时