Scala 中的 Apache Spark 日志记录
Posted
技术标签:
【中文标题】Scala 中的 Apache Spark 日志记录【英文标题】:Apache Spark logging within Scala 【发布时间】:2015-05-26 08:38:32 【问题描述】:我正在寻找一种能够在 Apache Spark 节点上执行代码时记录额外数据的解决方案,以帮助稍后调查执行期间可能出现的一些问题。尝试使用传统解决方案(例如 com.typesafe.scalalogging.LazyLogging
)会失败,因为无法在 Apache Spark 等分布式环境上序列化日志实例。
我已经调查过这个问题,目前我发现的解决方案是使用 org.apache.spark.Logging
特征,如下所示:
class SparkExample with Logging
val someRDD = ...
someRDD.map
rddElement => logInfo(s"$rddElement will be processed.")
doSomething(rddElement)
但是,Logging trait 似乎不是 Apache Spark 的永久解决方案,因为它被标记为 @DeveloperApi
并且类文档提到:
这可能会在未来的版本中更改或删除。
我想知道 - 它们是否是我可以使用的任何已知日志记录解决方案,并且允许我在 Apache Spark 节点上执行 RDD 时记录数据?
@Later Edit :下面的一些 cmets 建议使用 Log4J。我尝试过使用 Log4J,但在使用 Scala 类(而不是 Scala 对象)中的记录器时仍然遇到问题。 这是我的完整代码:
import org.apache.log4j.Logger
import org.apache.spark._
object Main
def main(args: Array[String])
new LoggingTestWithRDD().doTest()
class LoggingTestWithRDD extends Serializable
val log = Logger.getLogger(getClass.getName)
def doTest(): Unit =
val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
val spark = new SparkContext(conf)
val someRdd = spark.parallelize(List(1, 2, 3))
someRdd.map
element =>
log.info(s"$element will be processed")
element + 1
spark.stop()
我看到的例外是:
线程 "main" org.apache.spark.SparkException 中的异常:任务不可序列化 -> 原因:java.io.NotSerializableException: org.apache.log4j.Logger
【问题讨论】:
嗯...配置您的 log4j 并完成您的日志记录。 所以基本上 Apache Spark 强制你只使用 log4j ? Fwiw,log4j的正确拼写是“slf4j”。 除了 / 作为日志记录的替代方案之外,指标可能会为您提供您想要的:spark.apache.org/docs/latest/monitoring.html @michael_n 这不正确。 log4j 和 slf4j 是不同的东西。 【参考方案1】:您可以使用 Akhil 在https://www.mail-archive.com/user@spark.apache.org/msg29010.html 中提出的解决方案。 我自己用过,效果很好。
Akhil Das 星期一,2015 年 5 月 25 日 08:20:40 -0700 试试这个方法:
object Holder extends Serializable @transient lazy val log = Logger.getLogger(getClass.getName) val someRdd = spark.parallelize(List(1, 2, 3)).foreach element => Holder.log.info(element)
【讨论】:
我有一个火花流自定义接收器,将 NullPointering 自己弄死,花了一天时间拉我的胡子。这就是解决方案。谢谢。 这似乎可以解决。假设我想启用来自特定模块的日志。该怎么做? 我在输出日志中看不到任何日志。你能建议我可能出错的地方吗? 我仍然无法在 spark-jobserver 中使用这种确切的方法。有人尝试过这种配置吗?一切看起来都应该可以工作,但我在闭包中看不到任何日志记录。 你能告诉我getLogger方法是从哪里导入的【参考方案2】:使用 Log4j 2.x。核心记录器已被序列化。问题解决了。
Jira 讨论:https://issues.apache.org/jira/browse/LOG4J2-801
"org.apache.logging.log4j" % "log4j-api" % "2.x.x"
"org.apache.logging.log4j" % "log4j-core" % "2.x.x"
"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
【讨论】:
您能否给出此日志记录的完整实现。比如你如何创建 log4j2.properties 以及如何在代码中实现。【参考方案3】:如果您需要在map
、filter
或其他RDD
函数之前和之后执行一些代码,请尝试使用mapPartition
,其中底层迭代器被显式传递。
例子:
val log = ??? // this gets captured and produced serialization error
rdd.map x =>
log.info(x)
x+1
变成:
rdd.mapPartition it =>
val log = ??? // this is freshly initialized in worker nodes
it.map x =>
log.info(x)
x + 1
每个基本的RDD
函数始终使用mapPartition
实现。
确保明确处理分区器并且不要松动它:参见 Scaladoc,preservesPartitioning
参数,这对性能至关重要。
【讨论】:
【参考方案4】:这是一篇旧帖子,但我想提供我的工作解决方案,这是我经过很多努力后才得到的,仍然对其他人有用:
我想在 rdd.map 函数中打印 rdd 内容但得到Task Not Serializalable Error
。这是我使用扩展 java.io.Serializable
的 scala 静态对象解决此问题的方法:
import org.apache.log4j.Level
object MyClass extends Serializable
val log = org.apache.log4j.LogManager.getLogger("name of my spark log")
log.setLevel(Level.INFO)
def main(args:Array[String])
rdd.map(t=>
//Using object's logger here
val log =MyClass.log
log.INFO("count"+rdd.count)
)
【讨论】:
【参考方案5】:val log = Logger.getLogger(getClass.getName),
您可以使用“日志”来写日志。此外,如果您需要更改记录器属性,您需要在 /conf 文件夹中有 log4j.properties。默认情况下,我们将在该位置有一个模板。
【讨论】:
我尝试使用 log4j,但在从类(而不是对象)调用 logger 变量时仍然遇到序列化问题:Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger
简单的解决方案:在本地方法范围内声明日志变量。
如果你制作“日志”@transient 会怎样?
地图分区来拯救这样的事情。您可以在 mappartitions 函数中创建记录器并使用它。该技术用于 JDBC 连接/mq/Kafka 生产者。【参考方案6】:
使记录器瞬态和懒惰可以解决问题
@transient lazy val log = Logger.getLogger(getClass.getName)
@transient
将告诉 spark 不要为所有执行程序序列化它,而惰性将导致在第一次使用时创建实例。换句话说,每个执行者都有自己的记录器实例。即使可以,序列化记录器也不是一个好主意。
当然,您放入 map() 闭包中的任何内容都将在执行程序上运行,因此将在执行程序日志中找到,而不是在驱动程序日志中找到。 对于执行器上的自定义 log4j 属性,您需要将 log4j.properties 添加到执行器类路径并将您的 log4j.properties 发送到执行器。
这可以通过将以下参数添加到您的 spark-submit 命令--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties " --files ./log4j.properties
来完成
还有其他方法可以设置这些配置,但这是最常见的。
【讨论】:
【参考方案7】:这是我的解决方案:
我正在使用 SLF4j(带有 Log4j 绑定), 在我的每一个 spark 工作的基类中,我都有这样的东西:
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass)
就在我在分布式功能代码中使用LOG
的地方之前,我将记录器引用复制到本地常量。
val LOG = this.LOG
它对我有用!
【讨论】:
以上是关于Scala 中的 Apache Spark 日志记录的主要内容,如果未能解决你的问题,请参考以下文章
Scala 错误:线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化
forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员
如何使用scala将特定函数转换为apache spark中的udf函数? [复制]
如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?
intellij 中 spark scala 应用程序中的线程“main”java.lang.NoClassDefFoundError:org/apache/spark/sql/catalyst/St