如何从我的 Python Spark 脚本中登录

Posted

技术标签:

【中文标题】如何从我的 Python Spark 脚本中登录【英文标题】:How do I log from my Python Spark script 【发布时间】:2014-10-13 23:28:33 【问题描述】:

我有一个使用spark-submit 运行的 Python Spark 程序。我想把日志语句放进去。

logging.info("This is an informative message.")
logging.debug("This is a debug message.")

我想使用与 Spark 相同的记录器,以便日志消息以相同的格式输出,并且级别由相同的配置文件控制。我该怎么做?

我尝试将logging 语句放入代码中,并从logging.getLogger() 开始。在这两种情况下,我都看到了 Spark 的日志消息,但没有看到我的。我一直在查看Python logging documentation,但无法从那里弄清楚。

不确定这是否是提交给 Spark 的脚本所特有的,或者只是我不了解日志记录的工作原理。

【问题讨论】:

您可能看不到您的日志记录语句,因为默认的日志记录级别是 WARNING,所以当您尝试 INFO 或 DEBUG 时,您会被过滤掉。 【参考方案1】:

你可以在一个类中实现logging.Handler接口,将日志消息转发到Spark下的log4j。然后使用logging.root.addHandler()(以及可选的logging.root.removeHandler())安装该处理程序。

处理程序应该有如下方法:

def emit(self, record):
    """Forward a log message for log4j."""
    Logger = self.spark_session._jvm.org.apache.log4j.Logger
    logger = Logger.getLogger(record.name)
    if record.levelno >= logging.CRITICAL:
        # Fatal and critical seem about the same.
        logger.fatal(record.getMessage())
    elif record.levelno >= logging.ERROR:
        logger.error(record.getMessage())
    elif record.levelno >= logging.WARNING:
        logger.warn(record.getMessage())
    elif record.levelno >= logging.INFO:
        logger.info(record.getMessage())
    elif record.levelno >= logging.DEBUG:
        logger.debug(record.getMessage())
    else:
        pass

在初始化 Spark 会话后应立即安装处理程序:

spark = SparkSession.builder.appName("Logging Example").getOrCreate()
handler = CustomHandler(spark_session)
# Replace the default handlers with the log4j forwarder.
root_handlers = logging.root.handlers[:]
for h in self.root_handlers:
    logging.root.removeHandler(h)
logging.root.addHandler(handler)

# Now you can log stuff.
logging.debug("Installed log4j log handler.")

这里有一个更完整的例子:https://gist.github.com/thsutton/65f0ec3cf132495ef91dc22b9bc38aec

【讨论】:

【参考方案2】:
import logging

# Logger

logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)

从 pyspark 登录的最简单方法!

【讨论】:

仅在执行 logger.error() 时打印一些内容【参考方案3】:

您需要获取 spark 本身的记录器,默认情况下 getLogger() 将为您自己的模块返回记录器。尝试类似:

logger = logging.getLogger('py4j')
logger.info("My test info statement")

也可能是'pyspark' 而不是'py4j'

如果您在 spark 程序中使用的函数(并且会执行一些日志记录)与 main 函数在同一模块中定义,则会出现一些序列化错误。

这在here进行了解释,并给出了同一个人的示例here

我也在 spark 1.3.1 上测试过这个

编辑:

要将日志记录从 STDERR 更改为 STDOUT,您必须删除当前的 StreamHandler 并添加一个新的。

找到已有的Stream Handler(完成后可以去掉这行)

print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]

可能只有一个,但如果没有,您将不得不更新位置。

logger.removeHandler(logger.handlers[0])

sys.stdout 添加新的处理程序

import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)

【讨论】:

我是否必须将 logger 对象作为参数传递给所有使用它的组件?有什么方法可以全局设置吗? 只要您不进行线程或多处理,您应该能够将其设置在模块的顶部并在任何地方使用它。只需将logging. 更改为logger. 即可随时记录内容。 谢谢。它以这种方式工作。但是消息总是发送到 stderr。我们如何才能直接到 stdout 或 stderr? 我更新了我的答案来为你解决这个问题。可能有一种方法可以更新现有的 StreamHandler,我不确定,但以上是我知道的方法。 我很想否决这个答案,因为它对我不起作用。查看 pyspark 源代码,pyspark 从不配置 py4j 记录器,并且 py4j 使用 java.utils.logging 而不是 spark 使用的 log4j 记录器,所以我怀疑这是否可行。我认为这可能适用于主节点上的代码,但不适用于工作人员上运行的任何内容。【参考方案4】:

我们需要从执行程序记录,而不是从驱动程序节点。所以我们做了以下事情:

    我们在所有节点上创建了一个/etc/rsyslog.d/spark.conf(使用引导方法和 Amazon Elastic Map 减少so that the Core nodes forwarded sysloglocal1` 向主节点发送消息。

    在主节点上,我们启用了 UDP 和 TCP syslog 侦听器,我们对其进行了设置,以便所有 local 消息都记录到 /var/log/local1.log

    我们在 map 函数中创建了一个 Python logging 模块 Syslog 记录器。

    现在我们可以使用logging.info() 登录。 ...

我们发现的一件事是同一个分区正在多个执行程序上同时处理。显然,当 Spark 有额外资源时,它一直都在这样做。这可以处理执行器神秘延迟或失败的情况。

登录 map 函数让我们了解了 Spark 的工作原理。

【讨论】:

您是否在代码中实现了任何自定义日志记录,例如特定于作业的时间戳等。这种方法是否也适用于从所有工作节点/执行器输出火花日志。 你好。我也有兴趣使用它!我只是想问你一些问题。 1. 你是如何完成第 2 步的。 ? 2. executor日志去哪了? S3?还是它们与其他默认日志一起汇总在标准输出中? @J.Snow — 我们正在使用标准的 syslog 日志记录。例如,请参阅linux.die.net/man/5/syslog.conf。执行程序日志被发送到 local1 syslog 设施,然后通过 UDP 发送到头端。 @vy32 抱歉,我不是这方面的专家。也许我可以更具体一些。您是否需要打开 syslog TCP 和 UDP 端口?当您说“我们设置它以便所有本地消息都记录到 /var/log/local1.log”时,您的意思是您在某些配置文件中指定了一些属性以将日志输出到 /var/log/local1。日志?对于所有问题,我很抱歉,但我不是这些问题的专家。你能分享你的 EMR 引导脚本吗?【参考方案5】:

pyspark和java log4j交互的关键是jvm。 下面是python代码,conf缺少url,但这是关于日志记录的。

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local").setAppName("DB2_Test")
myconf.set("spark.jars","%s/jars/log4j-1.2.17.jar" % my_jars)
spark = SparkSession\
 .builder\
 .appName("DB2_Test")\
 .config(conf = myconf) \
 .getOrCreate()


Logger= spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(__name__)
mylogger.error("some error trace")
mylogger.info("some info trace")

【讨论】:

【参考方案6】:

就我而言,我很高兴将我的日志消息与通常的 spark 日志消息一起添加到工人标准错误中。

如果这符合您的需求,那么诀窍是将特定的 Python 记录器重定向到 stderr

例如,受this answer 启发的以下内容对我来说效果很好:

def getlogger(name, level=logging.INFO):
    import logging
    import sys

    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        # or else, as I found out, we keep adding handlers and duplicate messages
        pass
    else:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger

用法:

def tst_log():
    logger = getlogger('my-worker')
    logger.debug('a')
    logger.info('b')
    logger.warning('c')
    logger.error('d')
    logger.critical('e')
    ...

输出(加上几行上下文):

17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver

【讨论】:

在 getlogger 函数中 import loggingimport sys 是否有特殊原因? 这在工人内部不起作用...似乎只能在驱动程序中起作用..【参考方案7】:

您可以从 SparkContext 对象中获取记录器:

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")

【讨论】:

我得到了问题:logger = logging.getLogger('py4j') TypeError: 'JavaPackage' object is not callable 这绝对允许我像 Spark 一样记录(谢谢!)。除了从 SparkContext 之外,还有其他方法可以获取此记录器吗?在创建 SparkContext 之前,我需要打印一些日志 @marlieg 在创建 spark 上下文之前,您无权访问 spark 日志记录。 我在 PySpark 中尝试使用这个想法时出错。我所做的是尝试将记录器存储为全局,然后当它不起作用时尝试将上下文本身存储为全局。我的用例是能够在 foreach 函数(它没有 spark 上下文)内对我的执行程序进行记录调用。 “例外:您似乎正试图从广播变量、操作或转换中引用 SparkContext。SparkContext 只能在驱动程序上使用,而不是在它在工作人员上运行的代码中使用。有关更多信息,请参阅 SPARK-5063。” 我得到了这个工作,但无法弄清楚日志的存储位置,有人可以帮我解决这个问题

以上是关于如何从我的 Python Spark 脚本中登录的主要内容,如果未能解决你的问题,请参考以下文章

如何在使用 Python 脚本登录时维护登录会话

如何在 Elastic beanstalk 上从我的 PHP 按需运行 python 脚本?

Django/AngularJS:如何从我的 AngularJS 脚本访问我的 Python 上下文项

如何在远程 Spark 集群上运行本地 Python 脚本?

为啥我不能从我的 python 脚本创建可执行文件?

教你如何在Spark Scala/Java应用中调用Python脚本