log4j Source Code Analysis, Part 2 (AsyncLogger and AsyncAppender)
Posted by 一只小笨龟
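Introduction to Logger and Appender
Logger and Appender are two components of the logging framework, and both exist to deliver log output to some destination. If we treat the developer as the producer of logs and storage media such as disks as the consumer, then Logger and Appender are the carriers in between. The Logger faces the "producer": it classifies the log messages to be printed and forwards them to Appenders. The Appender faces the "consumer": it decides how a log message travels and which destination it goes to.
Logger and Appender have a many-to-many relationship: one Logger can reference several Appenders, and several Loggers can reference the same Appender.
Both Logger and Appender come in synchronous and asynchronous modes, and combining them gives the following ways of outputting logs.
1. Synchronous Logger + synchronous Appender
The whole flow runs serially.
2. Synchronous Logger + asynchronous Appender
The Appender's append method puts the LogEvent into a blocking queue, and a separate thread takes LogEvents from that queue and consumes them.
3. Asynchronous Logger + synchronous Appender
4. Asynchronous Logger + asynchronous Appender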
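To make combination 2 more concrete, here is a minimal sketch in plain Java. It is not log4j code: the class `QueueingAppender` and all of its members are hypothetical names chosen for illustration. The `append` call on the application thread only enqueues the event, while a background thread drains the queue and performs the actual "write" (here, adding to a list).

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous appender; not part of log4j.
public class QueueingAppender {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
    private final List<String> written = new CopyOnWriteArrayList<>();
    private final CountDownLatch remaining;

    public QueueingAppender(int expectedEvents) {
        this.remaining = new CountDownLatch(expectedEvents);
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take(); // blocks until an event arrives
                    written.add(event);          // the "real" write happens on this thread
                    remaining.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "queueing-appender");
        consumer.setDaemon(true);
        consumer.start();
    }

    // Called on the application thread: only enqueues, never writes.
    public void append(String event) {
        try {
            queue.put(event); // blocks if the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Waits until the expected number of events has been written, or the timeout elapses.
    public boolean awaitDrained(long timeoutMillis) {
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public List<String> written() {
        return written;
    }
}
```

With a single consumer thread the events are written in the order they were appended, which is also why one slow destination delays everything behind it in the queue.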
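AsyncLogger Analysis
The first half of the logging call chain is the same for synchronous and asynchronous logging; the two paths diverge starting at the log() method of the LoggerConfig class.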
public void log(final String loggerName, final String fqcn, final StackTraceElement location, final Marker marker,
        final Level level, final Message data, final Throwable t) {
    List<Property> props = null;
    if (!propertiesRequireLookup) {
        props = properties;
    } else {
        if (properties != null) {
            props = new ArrayList<>(properties.size());
            final LogEvent event = Log4jLogEvent.newBuilder()
                    .setMessage(data)
                    .setMarker(marker)
                    .setLevel(level)
                    .setLoggerName(loggerName)
                    .setLoggerFqcn(fqcn)
                    .setThrown(t)
                    .build();
            for (int i = 0; i < properties.size(); i++) {
                final Property prop = properties.get(i);
                final String value = prop.isValueNeedsLookup() // since LOG4J2-1575
                        ? config.getStrSubstitutor().replace(event, prop.getValue()) //
                        : prop.getValue();
                props.add(Property.createProperty(prop.getName(), value));
            }
        }
    }
    final LogEvent logEvent = logEventFactory instanceof LocationAwareLogEventFactory ?
            ((LocationAwareLogEventFactory) logEventFactory).createEvent(loggerName, marker, fqcn, location, level,
                    data, props, t) : logEventFactory.createEvent(loggerName, marker, fqcn, level, data, props, t);
    try {
        // Overridden method: resolves to AsyncLoggerConfig.log or LoggerConfig.log at runtime
        log(logEvent, LoggerConfigPredicate.ALL);
    } finally {
        // LOG4J2-1583 prevent scrambled logs when logging calls are nested (logging in toString())
        ReusableLogEventFactory.release(logEvent);
    }
}
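This method calls log(LogEvent, LoggerConfigPredicate), which is dispatched polymorphically: LoggerConfig provides the synchronous implementation and AsyncLoggerConfig overrides it with the asynchronous one. This article focuses on the asynchronous path.
Next, AsyncLoggerConfig.log is executed: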
protected void log(final LogEvent event, final LoggerConfigPredicate predicate) {
    // See LOG4J2-2301
    if (predicate == LoggerConfigPredicate.ALL &&
            ASYNC_LOGGER_ENTERED.get() == Boolean.FALSE &&
            // Optimization: AsyncLoggerConfig is identical to LoggerConfig
            // when no appenders are present. Avoid splitting for synchronous
            // and asynchronous execution paths until encountering an
            // AsyncLoggerConfig with appenders.
            hasAppenders()) {
        // This is the first AsyncLoggerConfig encountered by this LogEvent
        ASYNC_LOGGER_ENTERED.set(Boolean.TRUE);
        try {
            // Detect the first time we encounter an AsyncLoggerConfig. We must log
            // to all non-async loggers first.
            super.log(event, LoggerConfigPredicate.SYNCHRONOUS_ONLY);
            // Then pass the event to the background thread where
            // all async logging is executed. It is important this
            // happens at most once and after all synchronous loggers
            // have been invoked, because we lose parameter references
            // from reusable messages.
            logToAsyncDelegate(event);
        } finally {
            ASYNC_LOGGER_ENTERED.set(Boolean.FALSE);
        }
    } else {
        super.log(event, predicate);
    }
}
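The thread-local "entered" flag in the method above can be hard to follow, so here is a small model of the idea in plain Java. It is a sketch under assumptions, not log4j's implementation: `ReentryGuardDemo`, its fields and the trace strings are all hypothetical. The first config reached on a thread does the synchronous pass (which walks up to its parent) and then performs exactly one hand-off; any config reached while the flag is set behaves like a plain synchronous config.

```java
import java.util.List;

// Hypothetical model of a thread-local "entered" guard; names are illustrative only.
public class ReentryGuardDemo {
    private static final ThreadLocal<Boolean> ENTERED =
            ThreadLocal.withInitial(() -> Boolean.FALSE);

    private final String name;
    private final ReentryGuardDemo parent; // null for the root
    private final List<String> trace;      // shared list recording what happened, in order

    public ReentryGuardDemo(String name, ReentryGuardDemo parent, List<String> trace) {
        this.name = name;
        this.parent = parent;
        this.trace = trace;
    }

    public void log(String event) {
        if (!ENTERED.get()) {
            // First guarded config on this thread for this event.
            ENTERED.set(Boolean.TRUE);
            try {
                syncPass(event);              // synchronous work first, including parents
                trace.add(name + ":enqueue"); // then a single hand-off to the async side
            } finally {
                ENTERED.set(Boolean.FALSE);
            }
        } else {
            // Reached from inside another guarded call: no second hand-off.
            syncPass(event);
        }
    }

    private void syncPass(String event) {
        trace.add(name + ":sync");
        if (parent != null) {
            parent.log(event);
        }
    }
}
```

Resetting the flag in `finally` matters: without it, an exception during the synchronous pass would leave the thread permanently marked as "entered" and later events would never be handed off.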
logToAsyncDelegate() then calls tryEnqueue, which uses the Disruptor to put the event into the ring buffer.
//AsyncLoggerConfig.java
private void logToAsyncDelegate(LogEvent event) {
    if (!isFiltered(event)) {
        // Passes on the event to a separate thread that will call
        // asyncCallAppenders(LogEvent).
        populateLazilyInitializedFields(event);
        if (!delegate.tryEnqueue(event, this)) {
            // If the event cannot be published without waiting (ring buffer full), apply the queue-full policy
            handleQueueFull(event);
        }
    }
}
private void handleQueueFull(final LogEvent event) {
    if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
        // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
        AsyncQueueFullMessageUtil.logWarningToStatusLogger();
        logToAsyncLoggerConfigsOnCurrentThread(event);
    } else {
        // otherwise, we leave it to the user preference
        final EventRoute eventRoute = delegate.getEventRoute(event.getLevel());
        // 1. DefaultAsyncQueueFullPolicy: wait for queue space, or log synchronously on the background thread
        // 2. DiscardingAsyncQueueFullPolicy: discard events according to log level
        eventRoute.logMessage(this, event);
    }
}
tryEnqueue publishes the event to the Disruptor ring buffer. For background on the Disruptor, see: https://www.cnblogs.com/lewis09/p/9974617.html
//AsyncLoggerConfigDisruptor.java
public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
    final LogEvent logEvent = prepareEvent(event);
    return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
}
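The "try first, fall back only when full" shape of logToAsyncDelegate and tryEnqueue can be sketched without the Disruptor. In the plain-Java model below, an `ArrayBlockingQueue` stands in for the ring buffer and its non-blocking `offer` stands in for `tryPublishEvent`; the class `TryEnqueueDemo` and its fallback behaviour are assumptions made for illustration, not log4j code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of "try to enqueue, otherwise handle queue-full"; not log4j code.
public class TryEnqueueDemo {
    private final BlockingQueue<String> buffer;                      // stand-in for the ring buffer
    private final List<String> handledOnCaller = new ArrayList<>(); // events that took the fallback path

    public TryEnqueueDemo(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: returns false immediately when the buffer is full.
    public boolean tryEnqueue(String event) {
        return buffer.offer(event);
    }

    public void log(String event) {
        if (!tryEnqueue(event)) {
            handleQueueFull(event);
        }
    }

    // In this sketch the fallback simply handles the event on the calling thread.
    private void handleQueueFull(String event) {
        handledOnCaller.add(event);
    }

    public int queued() {
        return buffer.size();
    }

    public List<String> handledOnCaller() {
        return handledOnCaller;
    }
}
```

Keeping the common path non-blocking means the caller only pays for the queue-full decision when the buffer really is full.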
Consuming from the ring buffer: the RingBufferLogEventHandler class implements the onEvent method of the Disruptor's SequenceReportingEventHandler and reads events from the ring buffer for processing.
public void onEvent(final RingBufferLogEvent event, final long sequence,
        final boolean endOfBatch) throws Exception {
    try {
        // Consume the event
        event.execute(endOfBatch);
    }
    finally {
        event.clear();
        // notify the BatchEventProcessor that the sequence has progressed.
        // Without this callback the sequence would not be progressed
        // until the batch has completely finished.
        notifyCallback(sequence);
    }
}
Inside event.execute: the call goes through the ReliabilityStrategy interface, which is responsible for passing the log event to the appropriate appenders; the appenders then write the log output.
public void actualAsyncLog(final RingBufferLogEvent event) {
    final LoggerConfig privateConfigLoggerConfig = privateConfig.loggerConfig;
    final List<Property> properties = privateConfigLoggerConfig.getPropertyList();
    if (properties != null) {
        onPropertiesPresent(event, properties);
    }
    privateConfigLoggerConfig.getReliabilityStrategy().log(this, event);
}
AsyncAppender Analysis
Continuing from the analysis above: once the ReliabilityStrategy interface has been called, execution reaches the processLogEvent() method of the LoggerConfig class.
private void processLogEvent(final LogEvent event, final LoggerConfigPredicate predicate) {
    event.setIncludeLocation(isIncludeLocation());
    if (predicate.allow(this)) {
        callAppenders(event);
    }
    logParent(event, predicate);
}
Then the callAppender() method of AppenderControl is executed.
public void callAppender(final LogEvent event) {
    if (shouldSkip(event)) {
        return;
    }
    callAppenderPreventRecursion(event);
}
private void callAppenderPreventRecursion(final LogEvent event) {
    try {
        recursive.set(this);
        callAppender0(event);
    } finally {
        recursive.set(null);
    }
}
private void callAppender0(final LogEvent event) {
    ensureAppenderStarted();
    if (!isFilteredByAppender(event)) {
        tryCallAppender(event);
    }
}
private void tryCallAppender(final LogEvent event) {
    try {
        // The Appender method that is finally executed
        appender.append(event);
    } catch (final RuntimeException ex) {
        handleAppenderError(event, ex);
    } catch (final Exception ex) {
        handleAppenderError(event, new AppenderLoggingException(ex));
    }
}
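appender.append() is polymorphic, so in the asynchronous scenario the implementation selected is AsyncAppender.
The append method of AsyncAppender
As the flowchart shows, when the queue is full the method first checks whether the blocking option is set (the default is true). If it is set to false, the event goes straight to the error appender; otherwise the event is handled according to a queue-full policy.
There are two policies:
1. DefaultAsyncQueueFullPolicy: wait for queue space, or log synchronously on the background thread
public class DefaultAsyncQueueFullPolicy implements AsyncQueueFullPolicy {
    public EventRoute getRoute(final long backgroundThreadId, final Level level) {
        // LOG4J2-471: prevent deadlock when RingBuffer is full and object
        // being logged calls Logger.log() from its toString() method
        if (Thread.currentThread().getId() == backgroundThreadId) {
            return EventRoute.SYNCHRONOUS;
        }
        return EventRoute.ENQUEUE;
    }
}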
2. DiscardingAsyncQueueFullPolicy: discard events according to log level
public EventRoute getRoute(final long backgroundThreadId, final Level level) {
    if (level.isLessSpecificThan(thresholdLevel)) {
        if (discardCount.getAndIncrement() == 0) {
            LOGGER.warn("Async queue is full, discarding event with level {}. " +
                    "This message will only appear once; future events from {} " +
                    "are silently discarded until queue capacity becomes available.",
                    level, thresholdLevel);
        }
        return EventRoute.DISCARD;
    }
    return super.getRoute(backgroundThreadId, level);
}
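The two policies above reduce to a small decision function, sketched here in plain Java. This is a simplified model, not log4j's classes: `QueueFullPolicyDemo`, the `Route` and `Severity` enums and both method names are hypothetical. `Severity` is ordered from most to least severe so that "at or below the threshold" can be expressed with `ordinal()`, and the current thread id is passed in explicitly to keep the functions pure.

```java
// Hypothetical, simplified model of the two queue-full policies; not log4j code.
public class QueueFullPolicyDemo {
    public enum Route { ENQUEUE, SYNCHRONOUS, DISCARD }

    // Ordered from most severe to least severe (assumption made for this sketch).
    public enum Severity { ERROR, WARN, INFO, DEBUG, TRACE }

    // Default policy: wait for queue space, unless the caller *is* the background
    // thread, in which case waiting would deadlock, so log synchronously instead.
    public static Route defaultRoute(long backgroundThreadId, long currentThreadId) {
        return currentThreadId == backgroundThreadId ? Route.SYNCHRONOUS : Route.ENQUEUE;
    }

    // Discarding policy: drop events whose severity is at or below the threshold,
    // and fall back to the default policy for everything more severe.
    public static Route discardingRoute(long backgroundThreadId, long currentThreadId,
                                        Severity level, Severity threshold) {
        if (level.ordinal() >= threshold.ordinal()) {
            return Route.DISCARD;
        }
        return defaultRoute(backgroundThreadId, currentThreadId);
    }
}
```

With a threshold of INFO, this model drops INFO, DEBUG and TRACE events when the queue is full, while WARN and ERROR events still wait for space.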
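The configuration goes in log4j2.component.properties:
# If the Disruptor queue is used, configuring this option is strongly recommended so that a flood of log events cannot consume excessive memory
# 56 * 1024 = 57344; the value is the number of ring buffer slots (events), not bytes
asyncLoggerRingBufferSize=57344
# If both forms below are configured, the second overrides the first
# Form 1: discard policy only
log4j2.AsyncQueueFullPolicy=Discard
# Form 2: discard policy with an explicit threshold (one property per line in a .properties file)
log4j2.AsyncQueueFullPolicy=Discard
log4j2.DiscardThreshold=INFO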
The append method:
public void append(final LogEvent logEvent) {
    if (!isStarted()) {
        throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
    }
    final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
    InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
    // Put the event into the queue
    if (!transfer(memento)) {
        // With blocking=true, apply the configured queue-full policy when the queue is full
        if (blocking) {
            if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
                // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
                AsyncQueueFullMessageUtil.logWarningToStatusLogger();
                logMessageInCurrentThread(logEvent);
            } else {
                // delegate to the event router (which may discard, enqueue and block, or log in current thread)
                final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
                route.logMessage(this, memento);
            }
        } else {
            error("Appender " + getName() + " is unable to write primary appenders. queue is full");
            logToErrorAppenderIfNecessary(false, memento);
        }
    }
}
Log events are taken off the blocking queue and consumed on the background thread AsyncThread. The thread keeps trying to take a LogEvent from the blocking queue; whenever it gets one, it calls the append method of each Appender referenced by an AppenderRef.
private class AsyncThread extends Log4jThread {
    private volatile boolean shutdown = false;
    // appenders here are the appender-ref entries from the configuration file; they point to the appenders that do the actual work
    private final List<AppenderControl> appenders;
    private final BlockingQueue<LogEvent> queue;
    public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
        super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
        this.appenders = appenders;
        this.queue = queue;
        setDaemon(true);
    }
    public void run() {
        while (!shutdown) {
            LogEvent event;
            try {
                event = queue.take();
                if (event == SHUTDOWN_LOG_EVENT) {
                    shutdown = true;
                    continue;
                }
            } catch (final InterruptedException ex) {
                break; // LOG4J2-830
            }
            event.setEndOfBatch(queue.isEmpty());
            final boolean success = callAppenders(event);
            if (!success && errorAppender != null) {
                try {
                    errorAppender.callAppender(event);
                } catch (final Exception ex) {
                    // Silently accept the error.
                }
            }
        }
        // Process any remaining items in the queue.
        LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
                queue.size());
        int count = 0;
        int ignored = 0;
        while (!queue.isEmpty()) {
            try {
                final LogEvent event = queue.take();
                if (event instanceof Log4jLogEvent) {
                    final Log4jLogEvent logEvent = (Log4jLogEvent) event;
                    logEvent.setEndOfBatch(queue.isEmpty());
                    callAppenders(logEvent);
                    count++;
                } else {
                    ignored++;
                    LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
                }
            } catch (final InterruptedException ex) {
                // May have been interrupted to shut down.
                // Here we ignore interrupts and try to process all remaining events.
            }
        }
        LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
    }
    /**
     * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
     * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
     * exceptions are silently ignored.
     *
     * @param event the event to forward to the registered appenders
     * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
     */
    boolean callAppenders(final LogEvent event) {
        boolean success = false;
        for (final AppenderControl control : appenders) {
            try {
                control.callAppender(event);
                success = true;
            } catch (final Exception ex) {
                // If no appender is successful the error appender will get it.
            }
        }
        return success;
    }
    public void shutdown() {
        shutdown = true;
        if (queue.isEmpty()) {
            queue.offer(SHUTDOWN_LOG_EVENT);
        }
        if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
            this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
        }
    }
}
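The SHUTDOWN_LOG_EVENT check in AsyncThread is an instance of the sentinel (or "poison pill") pattern: a special object is put on the queue to wake a consumer blocked in `take()` and tell it to stop. The sketch below shows the pattern in isolation. It is a simplification under assumptions, not the AsyncThread logic: `SentinelShutdownDemo` and its members are hypothetical, and here the sentinel is always enqueued behind any pending events, whereas the code above also uses a volatile flag, an interrupt, and a separate drain loop.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of shutting down a queue consumer with a sentinel object.
public class SentinelShutdownDemo {
    // Unique marker object; the consumer compares by identity, not equals().
    private static final Object SHUTDOWN = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Object> processed = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::run, "sentinel-demo");

    public void start() {
        worker.setDaemon(true);
        worker.start();
    }

    public void append(Object event) {
        queue.offer(event); // unbounded queue, so offer always succeeds
    }

    private void run() {
        try {
            while (true) {
                Object event = queue.take(); // blocks while the queue is empty
                if (event == SHUTDOWN) {
                    break;                   // everything enqueued before the sentinel is done
                }
                processed.add(event);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    // Enqueues the sentinel, waits for the worker to finish, and returns what was processed.
    public List<Object> stop(long timeoutMillis) {
        queue.offer(SHUTDOWN);
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }
}
```

Because the queue is FIFO and there is a single consumer, every event appended before `stop` is processed before the worker sees the sentinel.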
Throughput Comparison
References
https://www.cnblogs.com/lewis09/p/10003462.html