log4j源码分析-2 (AysncLoggerAsyncAppender)

Posted 一只小笨龟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了log4j源码分析-2 (AysncLoggerAsyncAppender)相关的知识,希望对你有一定的参考价值。

Logger和Appender介绍

我们知道Logger和Appender其实都是日志框架中的两个组件,最终都是为了将日志输送到某个目的地,如果把开发着当做日志生产者,磁盘等存储介质当做日志消费者,那么Logger和Appender就是日志的搬运工。Logger主要对接"生产者",将待打印的日志信息分类然后转发至Appender,而Appender主要对接"消费者",决定日志信息怎么去目的地、去哪个目的地。

从Logger与Appender的对应关系看,应该属于多对多的关系,即一个logger可以引用多个Appender,同时,多个Logger可以同时引用一个Appender。

Logger和Appender都有同步和异步两种模式,通过组合可以分为以下几种日志输出方式。

1.同步Logger+同步Appender

所有流程串行。

log4j源码分析-2 (AysncLogger、AsyncAppender)

2.同步Logger+异步Appender

当执行完Appender的append方法后,logEvent放入阻塞队列中,由新线程从阻塞队列中取logEvent进行消费。


log4j源码分析-2 (AysncLogger、AsyncAppender)

3.异步Logger+同步Appender

log4j源码分析-2 (AysncLogger、AsyncAppender)

4.异步Logger+异步Appender

log4j源码分析-2 (AysncLogger、AsyncAppender)

AsyncLogger分析

日志调用链路的前半部分同步和异步都是一样的,从LoggerConfig类的log()方法开始出现分歧。

public void log(final String loggerName, final String fqcn, final StackTraceElement location, final Marker marker, final Level level, final Message data, final Throwable t) { List<Property> props = null; if (!propertiesRequireLookup) { props = properties; } else { if (properties != null) { props = new ArrayList<>(properties.size()); final LogEvent event = Log4jLogEvent.newBuilder() .setMessage(data) .setMarker(marker) .setLevel(level) .setLoggerName(loggerName) .setLoggerFqcn(fqcn) .setThrown(t) .build(); for (int i = 0; i < properties.size(); i++) { final Property prop = properties.get(i); final String value = prop.isValueNeedsLookup() // since LOG4J2-1575 ? config.getStrSubstitutor().replace(event, prop.getValue()) // : prop.getValue(); props.add(Property.createProperty(prop.getName(), value)); } } } final LogEvent logEvent = logEventFactory instanceof LocationAwareLogEventFactory ? ((LocationAwareLogEventFactory) logEventFactory).createEvent(loggerName, marker, fqcn, location, level, data, props, t) : logEventFactory.createEvent(loggerName, marker, fqcn, level, data, props, t); try {            //抽象方法,具体指向AsyncLoggerConfig或者LoggerConfig log(logEvent, LoggerConfigPredicate.ALL); } finally { // LOG4J2-1583 prevent scrambled logs when logging calls are nested (logging in toString()) ReusableLogEventFactory.release(logEvent); } }

该方法调用log()方法,是一个抽象方法,AsyncLogger和Logger都是其实现类,分别对应的是异步和同步Logger,本文关注AsyncLogger。

log4j源码分析-2 (AysncLogger、AsyncAppender)

执行AsyncLoggerConfig.log方法

protected void log(final LogEvent event, final LoggerConfigPredicate predicate) { // See LOG4J2-2301 if (predicate == LoggerConfigPredicate.ALL && ASYNC_LOGGER_ENTERED.get() == Boolean.FALSE && // Optimization: AsyncLoggerConfig is identical to LoggerConfig // when no appenders are present. Avoid splitting for synchronous // and asynchronous execution paths until encountering an // AsyncLoggerConfig with appenders. hasAppenders()) { // This is the first AsnycLoggerConfig encountered by this LogEvent ASYNC_LOGGER_ENTERED.set(Boolean.TRUE); try { // Detect the first time we encounter an AsyncLoggerConfig. We must log // to all non-async loggers first. super.log(event, LoggerConfigPredicate.SYNCHRONOUS_ONLY); // Then pass the event to the background thread where // all async logging is executed. It is important this // happens at most once and after all synchronous loggers // have been invoked, because we lose parameter references // from reusable messages. logToAsyncDelegate(event); } finally { ASYNC_LOGGER_ENTERED.set(Boolean.FALSE); } } else { super.log(event, predicate); } }

执行logToAsyncDelegate()方法,执行enqueue方法,调用Disruptor,将event事件放入环形队列。

//AsyncLoggerConfig.java private void logToAsyncDelegate(LogEvent event) { if (!isFiltered(event)) { // Passes on the event to a separate thread that will call // asyncCallAppenders(LogEvent). populateLazilyInitializedFields(event); if (!delegate.tryEnqueue(event, this)) { //如果获取Disruptor队列需要等待则执行等待策略 handleQueueFull(event); } } }
private void handleQueueFull(final LogEvent event) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logToAsyncLoggerConfigsOnCurrentThread(event); } else { // otherwise, we leave it to the user preference final EventRoute eventRoute = delegate.getEventRoute(event.getLevel()); // 1、DefaultAsyncQueueFullPolicy---等待队列,转为同步操作策略 // 2、DiscardingAsyncQueueFullPolicy---按照日志等级抛弃日志策略 eventRoute.logMessage(this, event); } }

执行tryEnqueue方法,调用disruptor环形队列并将event放入环形队列中,关于disruptor参考:https://www.cnblogs.com/lewis09/p/9974617.html

//AsyncLoggerConfigDisruptor.java @Override public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) { final LogEvent logEvent = prepareEvent(event); return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig); }

环形队列的消费:定义RingBufferLogEventHandler类实现Disruptor的SequenceReportingEventHandler的onEvent方法,从ringbuffer读取事件进行处理。

 @Override public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch) throws Exception { try { //消费事件 event.execute(endOfBatch); } finally { event.clear(); // notify the BatchEventProcessor that the sequence has progressed. // Without this callback the sequence would not be progressed // until the batch has completely finished. notifyCallback(sequence); } }

event.execute内部逻辑:调用了ReliabilityStrategy接口,日志事件传递到适当的appender的对象的接口,然后使用appender的能力将日志输出。

public void actualAsyncLog(final RingBufferLogEvent event) { final LoggerConfig privateConfigLoggerConfig = privateConfig.loggerConfig; final List<Property> properties = privateConfigLoggerConfig.getPropertyList();
if (properties != null) { onPropertiesPresent(event, properties); }
privateConfigLoggerConfig.getReliabilityStrategy().log(this, event); }

AsyncAppender分析

接上文分析,当调用了ReliabilityStrategy接口后,代码执行到LoggerConfig类的processLogEvent()方法。

private void processLogEvent(final LogEvent event, final LoggerConfigPredicate predicate) { event.setIncludeLocation(isIncludeLocation()); if (predicate.allow(this)) { callAppenders(event); } logParent(event, predicate); }

执行AppenderControl的callAppender()方法。

public void callAppender(final LogEvent event) { if (shouldSkip(event)) { return; } callAppenderPreventRecursion(event); }
private void callAppenderPreventRecursion(final LogEvent event) { try { recursive.set(this); callAppender0(event); } finally { recursive.set(null); } }
private void callAppender0(final LogEvent event) { ensureAppenderStarted(); if (!isFilteredByAppender(event)) { tryCallAppender(event); } }
private void tryCallAppender(final LogEvent event) { try { //最终执行的Appender方法 appender.append(event); } catch (final RuntimeException ex) { handleAppenderError(event, ex); } catch (final Exception ex) { handleAppenderError(event, new AppenderLoggingException(ex)); } }

appender.append()是多态的,因此在异步场景下,选用的是AsyncAppender。

log4j源码分析-2 (AysncLogger、AsyncAppender)

Async类的append方法

如流程图所示,首先会是否配置blocking选项,默认是true,如果设置为false,则Appender直接会ToErrorAppender,反之,会按照一定的策略来处理这些消息。

策略有如下两种:

1、DefaultAsyncQueueFullPolicy—等待队列,转为同步操作策略

public class DefaultAsyncQueueFullPolicy implements AsyncQueueFullPolicy { @Override public EventRoute getRoute(final long backgroundThreadId, final Level level) {
// LOG4J2-471: prevent deadlock when RingBuffer is full and object // being logged calls Logger.log() from its toString() method if (Thread.currentThread().getId() == backgroundThreadId) { return EventRoute.SYNCHRONOUS; } return EventRoute.ENQUEUE; }}

2、DiscardingAsyncQueueFullPolicy—按照日志等级抛弃日志策略

public EventRoute getRoute(final long backgroundThreadId, final Level level) { if (level.isLessSpecificThan(thresholdLevel)) { if (discardCount.getAndIncrement() == 0) { LOGGER.warn("Async queue is full, discarding event with level {}. " + "This message will only appear once; future events from {} " + "are silently discarded until queue capacity becomes available.", level, thresholdLevel); } return EventRoute.DISCARD; } return super.getRoute(backgroundThreadId, level); }

配置如下:log4j2.component.properties

//如果使用了Disruptor队列,强烈建议配置该项,避免由于日志过多导致内存泄露//56 * 1024 = 57344,单位为bytesasyncLoggerRingBufferSize=57344
//以下如果两者都配置了,则第二种方式将会覆盖第一种方式的配置log4j2.AsyncQueueFullPolicy=Discard log4j2.AsyncQueueFullPolicy=Discard;log4j2.DiscardThreshold=INFO

append方法。

public void append(final LogEvent logEvent) { if (!isStarted()) { throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); } final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); //放入队列 if (!transfer(memento)) { //如果配置blocking=true,当队列满时,执行对应的策略 if (blocking) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logMessageInCurrentThread(logEvent); } else { // delegate to the event router (which may discard, enqueue and block, or log in current thread) final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); route.logMessage(this, memento); } } else { error("Appender " + getName() + " is unable to write primary appenders. queue is full"); logToErrorAppenderIfNecessary(false, memento); } } }
public void append(final LogEvent logEvent) { if (!isStarted()) { throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); } final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); //放入队列 if (!transfer(memento)) { //如果配置blocking=true,当队列满时,执行对应的策略 if (blocking) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logMessageInCurrentThread(logEvent); } else { // delegate to the event router (which may discard, enqueue and block, or log in current thread) final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); route.logMessage(this, memento); } } else { error("Appender " + getName() + " is unable to write primary appenders. queue is full"); logToErrorAppenderIfNecessary(false, memento); } } }

从阻塞队列中取出日志进行消费是在后台线程AsyncThread进行的,该线程会一直尝试从阻塞队列中获取LogEvent,如果获取成功,调用AppenderRef所引用Appender的append方法。

private class AsyncThread extends Log4jThread {
private volatile boolean shutdown = false; //此处的appenders代表的是在配置文件中配置的appender-ref,指向实际执行的appender private final List<AppenderControl> appenders; private final BlockingQueue<LogEvent> queue;
public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) { super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement()); this.appenders = appenders; this.queue = queue; setDaemon(true); }
@Override public void run() { while (!shutdown) { LogEvent event; try { event = queue.take(); if (event == SHUTDOWN_LOG_EVENT) { shutdown = true; continue; } } catch (final InterruptedException ex) { break; // LOG4J2-830 } event.setEndOfBatch(queue.isEmpty()); final boolean success = callAppenders(event); if (!success && errorAppender != null) { try { errorAppender.callAppender(event); } catch (final Exception ex) { // Silently accept the error. } } } // Process any remaining items in the queue. LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", queue.size()); int count = 0; int ignored = 0; while (!queue.isEmpty()) { try { final LogEvent event = queue.take(); if (event instanceof Log4jLogEvent) { final Log4jLogEvent logEvent = (Log4jLogEvent) event; logEvent.setEndOfBatch(queue.isEmpty()); callAppenders(logEvent); count++; } else { ignored++; LOGGER.trace("Ignoring event of class {}", event.getClass().getName()); } } catch (final InterruptedException ex) { // May have been interrupted to shut down. // Here we ignore interrupts and try to process all remaining events. } } LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); }
/** * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl} * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any * exceptions are silently ignored. * * @param event the event to forward to the registered appenders * @return {@code true} if at least one appender call succeeded, {@code false} otherwise */ boolean callAppenders(final LogEvent event) { boolean success = false; for (final AppenderControl control : appenders) { try { control.callAppender(event); success = true; } catch (final Exception ex) { // If no appender is successful the error appender will get it. } } return success; }
public void shutdown() { shutdown = true; if (queue.isEmpty()) { queue.offer(SHUTDOWN_LOG_EVENT); } if (getState() == State.TIMED_WAITING || getState() == State.WAITING) { this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call } } }

吞吐量对比

参考文档

https://www.cnblogs.com/lewis09/p/10003462.html

以上是关于log4j源码分析-2 (AysncLoggerAsyncAppender)的主要内容,如果未能解决你的问题,请参考以下文章

mybatis 源码分析--日志分析

细读源码之Log4j打印异常堆栈导致线程Block问题分析

log4j<=1.2.17反序列化漏洞(CVE-2019-17571)分析

log4j源码解析-文件解析

日志框架Log4j源码解析

Flume-NG源码分析-整体结构及配置载入分析