为啥当我发送两个输入流时 Spark Streaming 停止工作?
Posted
技术标签:
【中文标题】为啥当我发送两个输入流时 Spark Streaming 停止工作?【英文标题】:Why does Spark Streaming stop working when I send two input streams?为什么当我发送两个输入流时 Spark Streaming 停止工作? 【发布时间】:2016-05-09 12:39:16 【问题描述】:我正在开发一个 Spark Streaming 应用程序,我需要在其中使用来自 Python 中的两个服务器的输入流,每个服务器每秒向 Spark 上下文发送一条 JSON 消息。
我的问题是,如果我只对一个流执行操作,那么一切正常。但是,如果我有来自不同服务器的两个流,那么 Spark 在它可以打印任何东西之前就冻结了,并且只有在两个服务器都发送了它们必须发送的所有 JSON 消息时才重新开始工作(当它检测到 'socketTextStream
没有接收到数据。
这是我的代码:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>()
public Tuple2<Integer, String> call(String stream) throws Exception
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
);
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>()
public Tuple2<Integer, String> call(String stream) throws Exception
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
);
dataStream2.print(); //for example
请注意,没有 ERROR 消息,Spark 在启动上下文后简单冻结,当我从端口获取 JSON 消息时,它没有显示任何内容。
非常感谢。
【问题讨论】:
【参考方案1】:查看 Spark Streaming documentation 中的这些警告,看看它们是否适用:
要记住的要点
在本地运行 Spark Streaming 程序时,请勿使用“local”或“local1”作为主 URL。这些中的任何一个都意味着只有一个线程将用于在本地运行任务。如果您使用的是基于接收器(例如套接字、Kafka、Flume 等)的输入 DStream,那么将使用单个线程来运行接收器,不留任何线程来处理接收到的数据。因此,在本地运行时,始终使用“local[n]”作为主 URL,其中 n > 要运行的接收器数量(有关如何设置主服务器的信息,请参阅 Spark 属性)。 将逻辑扩展到在集群上运行,分配给 Spark Streaming 应用程序的核心数必须大于接收器数。否则系统会收到数据,但无法处理。
【讨论】:
完美,我设置了 local[2] 而不是 local[3]。非常感谢。 顺便说一句,如果您使用的是 Kafka,您可以从同一个DStream
中的两个源中读取数据,这样您就不必为每个源绑定一个专用线程。在此处查看此答案,了解您如何在 Kafka 中执行此操作 - 在单个 DStream
中打开多个主题。以上是关于为啥当我发送两个输入流时 Spark Streaming 停止工作?的主要内容,如果未能解决你的问题,请参考以下文章
Spark Direct Stream 不会为每个 kafka 分区创建并行流
为啥ffmpeg在转换https m3u8流时忽略protocol_whitelist标志?