即使有 0 条消息,火花流中的转换也需要更多时间

Posted

技术标签:

【中文标题】即使有 0 条消息,火花流中的转换也需要更多时间【英文标题】:Transformations in spark streaming taking more time even though there are 0 messages 【发布时间】:2019-02-13 10:22:43 【问题描述】:

我在使用火花流时遇到严重的性能问题。对于 10 秒的批处理间隔,程序大约需要 2 分钟。我尝试在没有来自 kafka 主题的 0 条消息的情况下进行调试。即使没有要使用/处理的消息,大多数转换也需要超过 30 秒的时间。即使 decodeMessagesDF 中没有消息,以下命令也需要大约 40 秒。

val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(customer), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))

此外,以下发布代码也需要大约 30 秒才能发布 0 条消息

  message.foreachPartition part =>
  val producer = new KafkaProducer[String, String](props)
  part.foreach msg =>
    val message = new ProducerRecord[String, String](topic, msg._1, msg._2)
    producer.send(message)
  
  producer.close()

如果代码中有任何错误,请告诉我。谢谢

【问题讨论】:

您在日志中找到任何线索吗? AFAIK,代码看起来不错。 不,我在日志中找不到任何内容。谢谢 【参考方案1】:

如果“客户”有大量数据,广播是昂贵的。也许你应该像这样将广播(客户)从加入操作中退出:

    val consumerBroadcast = sc.broadcast(customer)
    val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(consumerBroadcast), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))

此代码将只广播一次客户。但您的代码将每批次广播消费者。

【讨论】:

以上是关于即使有 0 条消息,火花流中的转换也需要更多时间的主要内容,如果未能解决你的问题,请参考以下文章

每个微批次火花流中处理的总记录

火花流中的广播变量空指针异常

Spark:火花流中的接收器是瓶颈吗?

如何更新火花流中的广播变量?

如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?

如何在火花流中添加 2 行具有相同键(列值)的行?