为啥当我发送两个输入流时 Spark Streaming 停止工作?

Posted

技术标签:

【中文标题】为啥当我发送两个输入流时 Spark Streaming 停止工作?【英文标题】:Why does Spark Streaming stop working when I send two input streams?为什么当我发送两个输入流时 Spark Streaming 停止工作? 【发布时间】:2016-05-09 12:39:16 【问题描述】:

我正在开发一个 Spark Streaming 应用程序,我需要在其中使用来自 Python 中的两个服务器的输入流,每个服务器每秒向 Spark 上下文发送一条 JSON 消息。

我的问题是,如果我只对一个流执行操作,那么一切正常。但是,如果我有来自不同服务器的两个流,那么 Spark 在它可以打印任何东西之前就冻结了,并且只有在两个服务器都发送了它们必须发送的所有 JSON 消息时才重新开始工作(当它检测到 'socketTextStream 没有接收到数据。

这是我的代码:

    JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
            StorageLevels.MEMORY_AND_DISK_SER);

    JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);

    JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() 
        public Tuple2<Integer, String> call(String stream) throws Exception 


            Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);

            return streamPair;
        
    );

    JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() 
        public Tuple2<Integer, String> call(String stream) throws Exception 


            Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);

            return streamPair;
        
    );

dataStream2.print(); //for example

请注意,没有 ERROR 消息,Spark 在启动上下文后简单冻结,当我从端口获取 JSON 消息时,它没有显示任何内容。

非常感谢。

【问题讨论】:

【参考方案1】:

查看 Spark Streaming documentation 中的这些警告,看看它们是否适用:

要记住的要点

在本地运行 Spark Streaming 程序时,请勿使用“local”或“local1”作为主 URL。这些中的任何一个都意味着只有一个线程将用于在本地运行任务。如果您使用的是基于接收器(例如套接字、Kafka、Flume 等)的输入 DStream,那么将使用单个线程来运行接收器,不留任何线程来处理接收到的数据。因此,在本地运行时,始终使用“local[n]”作为主 URL,其中 n > 要运行的接收器数量(有关如何设置主服务器的信息,请参阅 Spark 属性)。 将逻辑扩展到在集群上运行,分配给 Spark Streaming 应用程序的核心数必须大于接收器数。否则系统会收到数据,但无法处理。

【讨论】:

完美,我设置了 local[2] 而不是 local[3]。非常感谢。 顺便说一句,如果您使用的是 Kafka,您可以从同一个 DStream 中的两个源中读取数据,这样您就不必为每个源绑定一个专用线程。在此处查看此答案,了解您如何在 Kafka 中执行此操作 - 在单个 DStream 中打开多个主题。

以上是关于为啥当我发送两个输入流时 Spark Streaming 停止工作?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark SQL 流时缺少 Avro 自定义标头

Spark Direct Stream 不会为每个 kafka 分区创建并行流

为啥ffmpeg在转换https m3u8流时忽略protocol_whitelist标志?

openfire+spark开发即时聊天系统,当我将两者服务配置好后,可以正常聊天。为啥不能发送文件夹?

执行任何 IO/流时 C++ JNI 崩溃/挂起

为啥我的控制器发送内容类型“application/octet-stream”?