仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?
Posted
技术标签:
【中文标题】仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?【英文标题】:How to receive an input in Spark Streaming only after all partitions in my RDD have been processed? 【发布时间】:2016-06-11 18:18:31 【问题描述】:假设我有一个 JavaDStreamReceiver,它每秒从 Spark Streaming 中的 TCP/IP 套接字连接接收一个整数。 然后我将它存储在一个列表中,直到我有 100 个整数。 之后,我想将该 RDD 划分为 4 个分区,在我的 pc 中每个核心一个,并以并行方式映射这些分区。所以是这样的:
public final class sparkstreaminggetjson
private static final Pattern SPACE = Pattern.compile(" ");
private static Integer N=100;
private static List<Integer> allInputValues= new List<Integer>();
public static void main(String[] args) throws Exception
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<List<Integer>> storeValuesInList=receivedStream.map( // if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value );
JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4);
JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations)
JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations)
...
finalStream.print();
这是我的问题。我想实现一个 FILO 模型,在该模型中我收到一个新输入,将它放在我的 RDD 的第一个分区中,并从 RDD 的最后一个分区中删除最后一个元素。所以基本上我从我的列表中输入和轮询整数,保持原始大小。之后我像往常一样并行处理每个分区。
这是我的问题:每当我的分区完成处理时,应用程序就会返回到receivedStream
,而不是partitionedList
。也就是说,我每个分区都有一个新的输入,这不是我想要的。我希望处理每个分区,然后才返回receivedStream
以获取新的输入。
我该怎么做?我应该在receivedStream
之后用其他方法替换map()
来分隔阶段吗?
非常感谢。
【问题讨论】:
我试图给出答案,但我意识到我可能误解了您的问题:您能否指定不同的条件应该开始新的计算? @Vale 谢谢你的回答,但你是对的,这不是我想知道的。现在,我有一个包含 4 个分区的 RDD,每个分区 25 个整数。我想要的是处理每个分区,然后才收到新的输入。正如我的代码现在,每次处理一个分区时,我都会收到一个新的输入(它进入接收器,我的 map() 方法的开始 理想情况下,我想在不同的内核中同时并行处理每个分区,然后才接收输入。希望这不会令人困惑 现在我明白了。但这是一个问题:Dstream 仅按时间间隔切割,在程序开始时给出。我不认为你可以停止接收新数据,除非你以某种方式向数据生产者发出信号,或者你创建一个个性化的接收器包装类,一旦标志设置为真,它就开始接收。但即使这样也是不可行的,对于分布式应用程序,我的意思是 【参考方案1】:据我了解,您可以使用窗口:每秒 1 个整数意味着您可以使用
JavaDstream integers = your stream;
JavaDstream hundredInt = integers.window(Seconds(100));
这样每个 RDD 将有 100 个整数。
根据缓冲:newInt ->[1...25][26...50][51...75][76...100] ->lastInt
这就是我所理解的,所以如果你想保留最后一次计算,你可以rdd.cache()
你的新 100 个整数并从中进行详细说明。或者rdd.checkpoint
。
【讨论】:
以上是关于仅在处理完我的 RDD 中的所有分区后,如何在 Spark Streaming 中接收输入?的主要内容,如果未能解决你的问题,请参考以下文章