仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?

Posted

技术标签:

【中文标题】仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?【英文标题】:How to receive an input in Spark Streaming only after all partitions in my RDD have been processed? 【发布时间】:2016-06-11 18:18:31 【问题描述】:

假设我有一个 JavaDStreamReceiver,它每秒从 Spark Streaming 中的 TCP/IP 套接字连接接收一个整数。 然后我将它存储在一个列表中,直到我有 100 个整数。 之后,我想将该 RDD 划分为 4 个分区,在我的 pc 中每个核心一个,并以并行方式映射这些分区。所以是这样的:

 public final class sparkstreaminggetjson 
 private static final Pattern SPACE = Pattern.compile(" ");
 private static Integer N=100;
 private static List<Integer> allInputValues= new List<Integer>();

 public static void main(String[] args) throws Exception 

  SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");


  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
        args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

  JavaDStream<List<Integer>> storeValuesInList=receivedStream.map( // if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value );

  JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4);


  JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations)

  JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations)

...

finalStream.print();

这是我的问题。我想实现一个 FILO 模型,在该模型中我收到一个新输入,将它放在我的 RDD 的第一个分区中,并从 RDD 的最后一个分区中删除最后一个元素。所以基本上我从我的列表中输入和轮询整数,保持原始大小。之后我像往常一样并行处理每个分区。

这是我的问题:每当我的分区完成处理时,应用程序就会返回到receivedStream,而不是partitionedList。也就是说,我每个分区都有一个新的输入,这不是我想要的。我希望处理每个分区,然后才返回receivedStream 以获取新的输入。

我该怎么做?我应该在receivedStream 之后用其他方法替换map() 来分隔阶段吗?

非常感谢。

【问题讨论】:

我试图给出答案,但我意识到我可能误解了您的问题:您能否指定不​​同的条件应该开始新的计算? @Vale 谢谢你的回答,但你是对的,这不是我想知道的。现在,我有一个包含 4 个分区的 RDD,每个分区 25 个整数。我想要的是处理每个分区,然后才收到新的输入。正如我的代码现在,每次处理一个分区时,我都会收到一个新的输入(它进入接收器,我的 map() 方法的开始 理想情况下,我想在不同的内核中同时并行处理每个分区,然后才接收输入。希望这不会令人困惑 现在我明白了。但这是一个问题:Dstream 仅按时间间隔切割,在程序开始时给出。我不认为你可以停止接收新数据,除非你以某种方式向数据生产者发出信号,或者你创建一个个性化的接收器包装类,一旦标志设置为真,它就开始接收。但即使这样也是不可行的,对于分布式应用程序,我的意思是 【参考方案1】:

据我了解,您可以使用窗口:每秒 1 个整数意味着您可以使用

JavaDstream integers = your stream;
JavaDstream hundredInt = integers.window(Seconds(100));

这样每个 RDD 将有 100 个整数。

根据缓冲:newInt -&gt;[1...25][26...50][51...75][76...100] -&gt;lastInt 这就是我所理解的,所以如果你想保留最后一次计算,你可以rdd.cache()你的新 100 个整数并从中进行详细说明。或者rdd.checkpoint

【讨论】:

以上是关于仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?的主要内容,如果未能解决你的问题,请参考以下文章

Spark如何仅在分区内加入

Spark RDD - 分区总是在RAM中吗?

RDD 中的分区数和 Spark 中的性能

RDD 算子补充

将RDD划分为每个分区中元素数量固定的分区

如何从 pyspark rdd 或分区中确定原始 s3 输入文件名