火花中的Java 8流开销

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了火花中的Java 8流开销相关的知识,希望对你有一定的参考价值。

我不是Spark的专家,我正在使用Spark进行一些计算。

    // [userId, lastPurchaseLevel]
    JavaPairRDD<String, Integer> lastPurchaseLevels = 
            levels.groupByKey()
            .join(purchases.groupByKey())
            .mapValues(t -> getLastPurchaseLevel(t));

在getLastPurchaseLevel()函数中,我有这样的代码:

private static Integer getLastPurchaseLevel(Tuple2<Iterable<SourceLevelRecord>, Iterable<PurchaseRecord>> t) {

....
final Comparator<PurchaseRecord> comp = (a, b) -> Long.compare(a.dateMsec, b.dateMsec);
PurchaseRecord latestPurchase = purchaseList.stream().max(comp).get();

但我的老板告诉我不要使用流(),他说:

我们更好地采用经典方式,因为没有CPU核心可以进行流式传输 - 所有CPU都已经被Spark工作者使用。

我知道经典的方法是迭代并找到最大值,因此流将导致比经典方式更多的CPU消耗或开销?或者只是在这种Spark环境中?

答案

我们更好地采用经典方式,因为没有CPU核心可以进行流式传输 - 所有CPU都已经被Spark工作者使用。

老板的想法:Spark已经将任务安排到线程(或cpu内核),不需要在单个任务中同时执行任务。

...所以流将导致比经典方式更多的CPU消耗或开销?或者只是在这种Spark环境中?

除非另有说明(通过调用Stream.parallel()方法),Java流是单线程的。所以只要你没有并行化流,你的老板就不会抱怨。

以上是关于火花中的Java 8流开销的主要内容,如果未能解决你的问题,请参考以下文章

如何使用火花流计算流中的新元素

java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException

火花流中的广播变量空指针异常

火花流到pyspark json文件中的数据帧

如何更新火花流中的广播变量?

使用火花流解析事件中心消息上的 JSON