Spark Streaming Driver 和 App 工作文件清理

Posted

技术标签:

【中文标题】Spark Streaming Driver 和 App 工作文件清理【英文标题】:Spark Streaming Driver and App work files cleanup 【发布时间】:2017-02-09 14:05:00 【问题描述】:

我正在运行 spark 2.0.2 并在 spark 独立集群上以 cluster 部署模式部署流作业。流式作业工作正常,但在 SPARK_HOME 的工作目录中创建的应用程序和驱动程序的 stderr 文件存在问题。由于流始终在运行,这些文件只会变大,我不知道如何控制它。

我已经尝试了以下解决方案,即使它们与手头的问题并不完全相关,但我仍然尝试过但没有奏效:

    Apache Spark does not delete temporary directories How to log using log4j to local file system inside a Spark application that runs on YARN?

谁能帮助我如何限制正在创建的这些文件的大小?

P.S:我尝试了在conf/spark-env.sh 中添加以下行并重新启动集群的解决方案,但在运行流应用程序的情况下它不起作用。

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=60 -Dspark.worker.cleanup.appDataTtl=60"

编辑

@YuvalItzchakov 我已经尝试过你的建议,但没有奏效。驱动的stderr日志如下:

Launch Command: "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java" "-cp" "/mnt/spark2.0.2/conf/:/mnt/spark2.0.2/jars/*" "-Xmx2048M" "-Dspark.eventLog.enabled=true" "-Dspark.eventLog.dir=/mnt/spark2.0.2/JobsLogs" "-Dspark.executor.memory=2g" "-Dspark.deploy.defaultCores=2" "-Dspark.io.compression.codec=snappy" "-Dspark.submit.deployMode=cluster" "-Dspark.shuffle.consolidateFiles=true" "-Dspark.shuffle.compress=true" "-Dspark.app.name=Streamingjob" "-Dspark.kryoserializer.buffer.max=128M" "-Dspark.master=spark://172.16.0.27:7077" "-Dspark.shuffle.spill.compress=true" "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" "-Dspark.cassandra.input.fetch.size_in_rows=20000" "-Dspark.executor.extraJavaOptions=-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" "-Dspark.jars=file:/mnt/spark2.0.2/sparkjars/StreamingJob-assembly-0.1.0.jar" "-Dspark.executor.instances=10" "-Dspark.driver.extraJavaOptions=-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" "-Dspark.driver.memory=2g" "-Dspark.rpc.askTimeout=10" "-Dspark.eventLog.compress=true" "-Dspark.executor.cores=1" "-Dspark.driver.supervise=true" "-Dspark.history.fs.logDirectory=/mnt/spark2.0.2/JobsLogs" "-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@172.16.0.29:34475" "/mnt/spark2.0.2/work/driver-20170210124424-0001/StreamingJob-assembly-0.1.0.jar" "Streamingjob"
========================================

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/02/10 12:44:26 INFO SecurityManager: Changing view acls to: cassuser
17/02/10 12:44:26 INFO SecurityManager: Changing modify acls to: cassuser
17/02/10 12:44:26 INFO SecurityManager: Changing view acls groups to: 
17/02/10 12:44:26 INFO SecurityManager: Changing modify acls groups to: 

我的log4j.xml 文件如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
    <appender name="stdout" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="TRACE"/>
        <param name="File" value="stdout"/>
        <param name="maxFileSize" value="1MB"/>
        <param name="maxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%dyyyy-MM-dd HH:mm:ss %-5p %c1:%L - %m%n"/>
        </layout>
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="ALL" />
            <param name="levelMax" value="OFF" />
        </filter>
    </appender>

    <appender name="stderr" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="WARN"/>
        <param name="File" value="stderr"/>
        <param name="maxFileSize" value="1MB"/>
        <param name="maxBackupIndex" value="10"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%dyyyy-MM-dd HH:mm:ss %-5p %c1:%L - %m%n"/>
        </layout>
    </appender>
</log4j:configuration>

请注意,我已在答案中从您的 xml 中删除了此根标记,因为它给出了一些错误:

<root>
    <appender-ref ref="console"/>
</root>

【问题讨论】:

你可以为你的worker和master提供一个自定义的log4j.xml,并带有一个滚动文件appender。 @YuvalItzchakov 你能告诉我怎么做吗? 【参考方案1】:

您可以为此使用自定义的log4j xml 文件。

首先,声明您的 XML 文件:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
    <appender name="stdout" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="TRACE"/>
        <param name="File" value="stdout"/>
        <param name="maxFileSize" value="50MB"/>
        <param name="maxBackupIndex" value="100"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%dyyyy-MM-dd HH:mm:ss %-5p %c1:%L - %m%n"/>
        </layout>
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="ALL" />
            <param name="levelMax" value="OFF" />
        </filter>
    </appender>

    <appender name="stderr" class="org.apache.log4j.RollingFileAppender">
        <param name="threshold" value="WARN"/>
        <param name="File" value="stderr"/>
        <param name="maxFileSize" value="50MB"/>
        <param name="maxBackupIndex" value="100"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%dyyyy-MM-dd HH:mm:ss %-5p %c1:%L - %m%n"/>
        </layout>
    </appender>
    <root>
        <appender-ref ref="console"/>
    </root>
</log4j:configuration>

然后,当您运行流式作业时,您需要通过extraJavaOptionslog4j.xml 文件传递​​给Spark master 和worker:

spark-submit \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///path/to/log4j.xml \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///path/to/log4j.xml

请注意,主节点和工作节点上的路径可能不同,具体取决于您将 JAR 和文件部署到 Spark 的方式。您说您使用的是集群模式,所以我假设您正在手动分派 JAR 和额外文件,但是对于在客户端模式下运行它的任何人,您还需要通过 --files 标志添加 xml 文件。

【讨论】:

试试这个..!! 请查看已编辑的问题。我已经添加了驱动程序stderr 日志。顺便说一句,这没有用。如果我做错了什么,请告诉我。我正在使用以下方法向 Spark 提交作业: spark-submit --deploy-mode cluster --supervise --conf "spark.eventLog.enabled=true" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file: ///mnt/spark2.0.2/sparkjars/log4j.xml" --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///mnt/spark2.0.2/sparkjars/log4j.xml" --master spark://172.16.0.27:7077 --class Streamingjob /mnt/spark2.0.2/sparkjars/StreamingJob-assembly-0.1.0.jar 任何 cmets..? 它没有拾取文件。是否命名为log4j.xml 是的,它被命名为log4j.xml,并且拥有-rwxrwxrwx 权限

以上是关于Spark Streaming Driver 和 App 工作文件清理的主要内容,如果未能解决你的问题,请参考以下文章

第13课:Spark Streaming源码解读之Driver容错安全性

第13课:Spark Streaming源码解读之Driver容错安全性

(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考

Spark Streaming源码解读之Driver容错安全性

Spark Streaming源码解读之Driver容错安全性

第9课:Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考