Spark:如何加速 foreachRDD?

Posted

技术标签:

【中文标题】Spark:如何加速 foreachRDD?【英文标题】:Spark : How to speedup foreachRDD? 【发布时间】:2017-03-03 15:18:39 【问题描述】:

我们有一个 Spark 流应用程序,它摄取数据 @10,000/ 秒 ...我们在 DStream 上使用 foreachRDD 操作(因为除非在 DStream 上找到输出操作,否则 spark 不会执行)

所以我们必须像这样使用foreachRDD输出操作,最多需要3小时 ...写入单批数据(10,000),这是

代码片段 1:

requestsWithState.foreachRDD  rdd =>

     rdd.foreach 
     case (topicsTableName, hashKeyTemp, attributeValueUpdate) =>           
          val client = new AmazonDynamoDBClient()
          val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
          try client.updateItem(request)

        catch 
            case se: Exception => println("Error executing updateItem!\nTable ", se)
         
        
        case null =>
      
    
   

所以我认为 foreachRDD 中的代码可能是问题所在,所以将其注释掉以查看它需要多少时间......令我惊讶的是......即使 foreachRDD 中没有代码,它仍然运行 3 小时

代码片段 2:

requestsWithState.foreachRDD  
rdd => rdd.foreach  
// No code here still takes a lot of time ( there used to be code but removed it to see if it's any faster without code) // 
 
  

如果我们遗漏任何内容或另一种方法来执行此操作,请告诉我们,因为我理解没有 DStream 火花流应用程序上的输出操作将无法运行..此时我无法使用其他输出操作...

注意:为了隔离问题并确保发电机代码没有问题......我用空循环运行......看起来 foreachRDD 在迭代一个巨大的记录集时本身很慢在@10,000/sec ...而不是作为空foreachRDD的发电机代码和发电机代码花费了相同的时间......

ScreenShot 显示了 foreachRDD 执行的所有阶段和花费的时间,即使它只是循环且内部没有代码

foreachRDD 空循环所花费的时间

foreachRDD 空循环的 9 个工作节点之间大型运行任务的任务分配...

【问题讨论】:

不确定为什么空的会很慢,但是如果您有很多集群同时运行它,请确保您对发电机的写入吞吐量非常高。也可能有助于发布您的火花流配置。 @Derek_M 感谢您的评论 ....我们有非常庞大的读写吞吐量 ...写入 10,000 应该不是问题 ....就像我在问题..dynamo代码不应该是一个问题,并证明这不是我用空循环运行的问题......看起来foreachRDD本身很慢...... 您的流媒体配置是什么样的?在运行 foreachRDD 之前是否进行了其他操作? @Derek_M 我确实运行了地图转换,但正如您所见 ..从更新的屏幕截图中 ..他们只需要几秒钟的时间,而 foreachRDD 空循环占用的主要时间(以分钟为单位).. @zero323 是的,您假设它在上游有一个 mapWithState 是绝对正确的......我已经用任务分配更新了这个问题......请让我知道是否你需要更多信息 【参考方案1】:

我知道已经晚了,但如果你想听,我有一些猜测可能会给你一些见解。

耗时长的不是rdd.foreach里面的代码,而是rdd.foreach之前的代码,生成rdd的代码。 转换是惰性的,在您使用结果之前,spark 不会计算它。 当代码在rdd.foreach 中运行时,sp​​ark 会进行计算,并生成数据行。rdd.foreach 循环中的代码只操作结果。 您可以通过注释掉 rdd.foreach 来检查这一点

requestsWithState.foreachRDD  
  //rdd => rdd.foreach  
  // No code here still takes a lot of time ( there used to be code but removed it to //see if it's any faster without code) 
  //
 

我想它会非常快,因为没有计算发生。 或者您可以将转换更改为非常简单的转换,它也会很快。 它不能解决你的问题,但如果我是对的,它会帮助你找到你的问题。

【讨论】:

【参考方案2】:

您是否尝试过不使用循环,如下所示?

//requestsWithState.foreachRDD   
  //rdd => rdd.foreach   
  // No code here //  
  //  
//

foreachRDD 需要时间,而不是其中的代码。请注意,它是foreach 而不是for。不管里面有没有代码,它都会运行n次。

有效的测试可用于性能测试:

https://tech.ovoenergy.com/spark-streaming-in-production-testing/

【讨论】:

以上是关于Spark:如何加速 foreachRDD?的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming之妙用foreachRDD和foreachPartition

Spark Streaming之妙用foreachRDD和foreachPartition

在 apache spark 流中使用 foreachRDD 内的数据库连接

Spark Streaming DStream的output操作以及foreachRDD详解

spark官网学习

070 DStream中的transform和foreachRDD函数