Scala 中的 Apache Spark 日志记录

Posted

技术标签:

【中文标题】Scala 中的 Apache Spark 日志记录【英文标题】:Apache Spark logging within Scala 【发布时间】:2015-05-26 08:38:32 【问题描述】:

我正在寻找一种能够在 Apache Spark 节点上执行代码时记录额外数据的解决方案,以帮助稍后调查执行期间可能出现的一些问题。尝试使用传统解决方案(例如 com.typesafe.scalalogging.LazyLogging)会失败,因为无法在 Apache Spark 等分布式环境上序列化日志实例。

我已经调查过这个问题,目前我发现的解决方案是使用 org.apache.spark.Logging 特征,如下所示:

class SparkExample with Logging 
  val someRDD = ...
  someRDD.map 
    rddElement => logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  

但是,Logging trait 似乎不是 Apache Spark 的永久解决方案,因为它被标记为 @DeveloperApi 并且类文档提到:

这可能会在未来的版本中更改或删除。

我想知道 - 它们是否是我可以使用的任何已知日志记录解决方案,并且允许我在 Apache Spark 节点上执行 RDD 时记录数据?

@Later Edit :下面的一些 cmets 建议使用 Log4J。我尝试过使用 Log4J,但在使用 Scala 类(而不是 Scala 对象)中的记录器时仍然遇到问题。 这是我的完整代码:

import org.apache.log4j.Logger
import org.apache.spark._

object Main 
 def main(args: Array[String]) 
  new LoggingTestWithRDD().doTest()
 


class LoggingTestWithRDD extends Serializable 

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = 
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map 
     element =>
       log.info(s"$element will be processed")
       element + 1
    
   spark.stop()
 

我看到的例外是:

线程 "main" org.apache.spark.SparkException 中的异常:任务不可序列化 -> 原因:java.io.NotSerializableException: org.apache.log4j.Logger

【问题讨论】:

嗯...配置您的 log4j 并完成您的日志记录。 所以基本上 Apache Spark 强制你只使用 log4j ? Fwiw,log4j的正确拼写是“slf4j”。 除了 / 作为日志记录的替代方案之外,指标可能会为您提供您想要的:spark.apache.org/docs/latest/monitoring.html @michael_n 这不正确。 log4j 和 slf4j 是不同的东西。 【参考方案1】:

您可以使用 Akhil 在https://www.mail-archive.com/user@spark.apache.org/msg29010.html 中提出的解决方案。 我自己用过,效果很好。

Akhil Das 星期一,2015 年 5 月 25 日 08:20:40 -0700 试试这个方法:

object Holder extends Serializable       
   @transient lazy val log = Logger.getLogger(getClass.getName)    



val someRdd = spark.parallelize(List(1, 2, 3)).foreach  element =>
   Holder.log.info(element)

【讨论】:

我有一个火花流自定义接收器,将 NullPointering 自己弄死,花了一天时间拉我的胡子。这就是解决方案。谢谢。 这似乎可以解决。假设我想启用来自特定模块的日志。该怎么做? 我在输出日志中看不到任何日志。你能建议我可能出错的地方吗? 我仍然无法在 spark-jobserver 中使用这种确切的方法。有人尝试过这种配置吗?一切看起来都应该可以工作,但我在闭包中看不到任何日志记录。 你能告诉我getLogger方法是从哪里导入的【参考方案2】:

使用 Log4j 2.x。核心记录器已被序列化。问题解决了。

Jira 讨论:https://issues.apache.org/jira/browse/LOG4J2-801

"org.apache.logging.log4j" % "log4j-api" % "2.x.x"

"org.apache.logging.log4j" % "log4j-core" % "2.x.x"

"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"

【讨论】:

您能否给出此日志记录的完整实现。比如你如何创建 log4j2.properties 以及如何在代码中实现。【参考方案3】:

如果您需要在mapfilter 或其他RDD 函数之前和之后执行一些代码,请尝试使用mapPartition,其中底层迭代器被显式传递。

例子:

val log = ??? // this gets captured and produced serialization error
rdd.map  x =>
  log.info(x)
  x+1

变成:

rdd.mapPartition  it =>
  val log = ??? // this is freshly initialized in worker nodes
  it.map  x =>
    log.info(x)
    x + 1
  

每个基本的RDD 函数始终使用mapPartition 实现。

确保明确处理分区器并且不要松动它:参见 Scaladoc,preservesPartitioning 参数,这对性能至关重要。

【讨论】:

【参考方案4】:

这是一篇旧帖子,但我想提供我的工作解决方案,这是我经过很多努力后才得到的,仍然对其他人有用:

我想在 rdd.map 函数中打印 rdd 内容但得到Task Not Serializalable Error。这是我使用扩展 java.io.Serializable 的 scala 静态对象解决此问题的方法:

import org.apache.log4j.Level

object MyClass extends Serializable

val log = org.apache.log4j.LogManager.getLogger("name of my spark log")

log.setLevel(Level.INFO)

def main(args:Array[String])


rdd.map(t=>

//Using object's logger here

val log =MyClass.log

log.INFO("count"+rdd.count)
)



【讨论】:

【参考方案5】:
val log = Logger.getLogger(getClass.getName),

您可以使用“日志”来写日志。此外,如果您需要更改记录器属性,您需要在 /conf 文件夹中有 log4j.properties。默认情况下,我们将在该位置有一个模板。

【讨论】:

我尝试使用 log4j,但在从类(而不是对象)调用 logger 变量时仍然遇到序列化问题:Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger 简单的解决方案:在本地方法范围内声明日志变量。 如果你制作“日志”@transient 会怎样? 地图分区来拯救这样的事情。您可以在 mappartitions 函数中创建记录器并使用它。该技术用于 JDBC 连接/mq/Kafka 生产者。【参考方案6】:

使记录器瞬态和懒惰可以解决问题

@transient lazy val log = Logger.getLogger(getClass.getName)

@transient 将告诉 spark 不要为所有执行程序序列化它,而惰性将导致在第一次使用时创建实例。换句话说,每个执行者都有自己的记录器实例。即使可以,序列化记录器也不是一个好主意。

当然,您放入 map() 闭包中的任何内容都将在执行程序上运行,因此将在执行程序日志中找到,而不是在驱动程序日志中找到。 对于执行器上的自定义 log4j 属性,您需要将 log4j.properties 添加到执行器类路径并将您的 log4j.properties 发送到执行器。

这可以通过将以下参数添加到您的 spark-submit 命令--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties " --files ./log4j.properties 来完成 还有其他方法可以设置这些配置,但这是最常见的。

【讨论】:

【参考方案7】:

这是我的解决方案:

我正在使用 SLF4j(带有 Log4j 绑定), 在我的每一个 spark 工作的基类中,我都有这样的东西:

import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass) 

就在我在分布式功能代码中使用LOG 的地方之前,我将记录器引用复制到本地常量。

val LOG = this.LOG

它对我有用!

【讨论】:

以上是关于Scala 中的 Apache Spark 日志记录的主要内容,如果未能解决你的问题,请参考以下文章

Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化

forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员

Spark 与 Scala 中的 ETL 过程

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

intellij 中 spark scala 应用程序中的线程“main”java.lang.NoClassDefFoundError:org/apache/spark/sql/catalyst/St