log4j2 异步日志 -- AsyncAppender
Posted 小脑斧科技博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了log4j2 异步日志 -- AsyncAppender相关的知识,希望对你有一定的参考价值。
1. 引言
在我们的工程项目中,日志记录是必不可少的,在 java 项目中,我们通常会使用 log4j、logback、log4j2 等等组件中的一个来实现日志的记录。
首先,日志记录需要进行磁盘 IO 操作,这无疑是非常耗时的,虽然很多组件提供了写入内存 buffer 区的功能,但这可能造成宕机前部分日志的丢失,另一方面,为了保证日志记录的原子性,这些组件在写入日志时通常需要加锁,在高并发场景下,很容易出现锁等待造成性能的严重下降,这常常是十分令人抓狂的一件事。
log4j2 之所以能够在众多日志组件中脱颖而出,其异步日志的实现,无疑是一个重要的特性。
本文,我们就来详细了解一下,log4j2 的异步日志是如何实现的。
我们知道,Appender 是 log4j2 的核心组件之一,他用来接收 LogEvent 并按照预先的配置打印到指定的位置。而 AsyncAppender 则是 log4j2 提供用来实现异步日志的收集和打印的。
下图就是官方提供的各个日志组件异步 Appender 的执行耗时:
可见 log4j2 的 AsyncAppender 优势是非常明显的。
2. AsyncAppender 的配置
2.1 配置参数
首先我们来看看 AsyncAppender 如何配置和使用,可以参看官方文档:
https://logging.apache.org/log4j/2.x/manual/appenders.html#AsyncAppender
AsyncAppender 拥有以下配置参数:
参数名 | 参数类型 | 描述 |
---|---|---|
AppenderRef | String | 最终执行日志写入的 appender,可配置多个 |
blocking | boolean | 如果为 true 或默认值,当队列满时,阻塞等待,否则将 logEvent 放入 ErrorAppender |
shutdownTimeout | integer | LogEvent 的最长等待时间(毫秒数),为 0 或默认值表示永远等待 |
bufferSize | integer | 队列长度,默认值为 1024 |
errorRef | String | ErrorAppender,如果不配置,则丢弃所有 ErrorLogEvent |
filter | Filter | 用来过滤 LogEvent,只有符合条件的事件才会被处理 |
name | String | Appender 名称 |
ignoreExceptions | boolean | 需要被忽略的异常 |
includeLocation | boolean | 是否打印线程本地信息,默认为 false,如果为 true,会有一定的性能损耗 |
BlockingQueueFactory | BlockingQueueFactory | BlockingQueue 的生产工厂类 |
2.2 配置示例
下面是一个典型配置:
<Configuration name="LinkedTransferQueueExample">
<Appenders>
<List name="List"/>
<Async name="Async" bufferSize="262144">
<AppenderRef ref="List"/>
<LinkedTransferQueue/>
</Async>
</Appenders>
<Loggers>
<Root>
<AppenderRef ref="Async"/>
</Root>
</Loggers>
</Configuration>
3. AsyncAppender 的实现
接下来我们就深入源码,看看 AsyncAppender 是如何实现的。
3.1 类关系图
从类关系图我们可以看到,AsyncAppender 继承自 AbstractAppender。
AbstractAppender 实现了 Appender 接口,并且提供了其中 LifeCycle 接口中生命周期治理的一系列方法的默认实现,同时 AbstractAppender 继承自 AbstractFilterable,提供了 Filterable 接口的默认实现。
3.2 append 方法的实现
public void append(LogEvent logEvent) {
if (!this.isStarted()) {
throw new IllegalStateException("AsyncAppender " + this.getName() + " is not active");
} else {
// 构建 Log4jLogEvent 对象
Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, this.includeLocation);
InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
// 将 event 放入队列
if (!this.transfer(memento)) { // transfer 返回 false 表示队列已满
// blocking 参数决定当队列满时是否放入 ErrorAppender
if (this.blocking) {
if (Logger.getRecursionDepth() > 1) {
Message message = AsyncQueueFullMessageUtil.transform(logEvent.getMessage());
this.logMessageInCurrentThread((new org.apache.logging.log4j.core.impl.Log4jLogEvent.Builder(logEvent)).setMessage(message).build());
} else {
EventRoute route = this.asyncQueueFullPolicy.getRoute(this.thread.getId(), memento.getLevel());
route.logMessage(this, memento);
}
} else {
this.error("Appender " + this.getName() + " is unable to write primary appenders. queue is full");
this.logToErrorAppenderIfNecessary(false, memento);
}
}
}
}
结合上面介绍的配置参数来看,这些代码非常容易理解。
3.3 AsyncQueueFullPolicy
上面流程图中提到了队列 full 策略,他指的是在队列已满且 blocking 参数为 true 时,log4j2 要执行的操作,主要有以下两种:
3.3.1 DefaultAsyncQueueFullPolicy -- 等待队列,转为同步操作策略
public class DefaultAsyncQueueFullPolicy implements AsyncQueueFullPolicy {
@Override
public EventRoute getRoute(final long backgroundThreadId, final Level level) {
// LOG4J2-471: prevent deadlock when RingBuffer is full and object
// being logged calls Logger.log() from its toString() method
if (Thread.currentThread().getId() == backgroundThreadId) {
return EventRoute.SYNCHRONOUS;
}
return EventRoute.ENQUEUE;
}
}
3.3.2 DiscardingAsyncQueueFullPolicy -- 按照日志等级抛弃日志策略
public class DiscardingAsyncQueueFullPolicy implements AsyncQueueFullPolicy {
@Override
public EventRoute getRoute(final long backgroundThreadId, final Level level) {
if (level.isLessSpecificThan(thresholdLevel)) {
if (discardCount.getAndIncrement() == 0) {
LOGGER.warn("Async queue is full, discarding event with level {}. " +
"This message will only appear once; future events from {} " +
"are silently discarded until queue capacity becomes available.",
level, thresholdLevel);
}
return EventRoute.DISCARD;
}
return super.getRoute(backgroundThreadId, level);
}
}
4. log4j2 的队列工厂 -- BlockingQueueFactory
通过上述的源码和讲解,我们已经窥知 log4j2 异步日志提升性能的一些端倪了。
没错,log4j2 是通过将 LogEvent 放入队列,异步消费来实现的。
这里提到的队列,就是我们在配置文件中配置的 BlockingQueueFactory 所生产的队列对象,Log4j2 支持生成以下四种队列:
ArrayBlockingQueue -- 默认的队列,通过 java 原生的 ArrayBlockingQueue 实现。
DisruptorBlockingQueue -- disruptor 包实现的高性能队列。
JCToolsBlockingQueue -- JCTools 实现的无锁队列。
LinkedTransferQueue -- 通过 java7 以上原生支持的 LinkedTransferQueue 实现。
在上述四中中,默认的是 ArrayBlockingQueue,最为推荐的是 disruptor 包实现的高性能队列 DisruptorBlockingQueue。
那么,究竟 DisruptorBlockingQueue 的高性能队列是如何实现的呢?后续我会写专题文章来进行讲解,敬请期待。
5. 队列消费者
有了上面讲述的 AsyncAppender 的 append 方法作为队列的生产者,自然还有对了的消费者,他就是 AsyncThread,也包含在 AsyncAppender.java 中。
他的主要工作是消费队列中的 LogEvent,并将日志按照已配置的 AppenderRef 以指定的方式输出到指定的位置。
AsyncThread 的源码非常容易理解:
private class AsyncThread extends Log4jThread {
private volatile boolean shutdown = false;
private final List<AppenderControl> appenders;
private final BlockingQueue<LogEvent> queue;
public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
this.appenders = appenders;
this.queue = queue;
setDaemon(true);
}
@Override
public void run() {
while (!shutdown) {
LogEvent event;
try {
event = queue.take();
if (event == SHUTDOWN_LOG_EVENT) {
shutdown = true;
continue;
}
} catch (final InterruptedException ex) {
break; // LOG4J2-830
}
event.setEndOfBatch(queue.isEmpty());
final boolean success = callAppenders(event);
if (!success && errorAppender != null) {
try {
errorAppender.callAppender(event);
} catch (final Exception ex) {
// Silently accept the error.
}
}
}
// Process any remaining items in the queue.
LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
queue.size());
int count = 0;
int ignored = 0;
while (!queue.isEmpty()) {
try {
final LogEvent event = queue.take();
if (event instanceof Log4jLogEvent) {
final Log4jLogEvent logEvent = (Log4jLogEvent) event;
logEvent.setEndOfBatch(queue.isEmpty());
callAppenders(logEvent);
count++;
} else {
ignored++;
LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
}
} catch (final InterruptedException ex) {
// May have been interrupted to shut down.
// Here we ignore interrupts and try to process all remaining events.
}
}
LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
+ "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
}
...
}
6. 吞吐量对比
6.1 记录峰值吞吐量
6.2 与其他日志记录包的异步吞吐量比较
微信公众号
以上是关于log4j2 异步日志 -- AsyncAppender的主要内容,如果未能解决你的问题,请参考以下文章