如何将 MDC 与线程池一起使用?
Posted
技术标签:
【中文标题】如何将 MDC 与线程池一起使用?【英文标题】:How to use MDC with thread pools? 【发布时间】:2011-08-29 16:53:25 【问题描述】:在我们的软件中,我们广泛使用 MDC 来跟踪 Web 请求的会话 ID 和用户名等内容。这在原始线程中运行时工作正常。
但是,有很多事情需要在后台处理。为此,我们使用java.concurrent.ThreadPoolExecutor
和java.util.Timer
类以及一些自滚动的async 执行服务。所有这些服务都管理自己的线程池。
这就是Logback's manual 在这样的环境中使用 MDC 不得不说的:
工作线程不能总是从启动线程继承映射的诊断上下文的副本。当 java.util.concurrent.Executors 用于线程管理时就是这种情况。例如,newCachedThreadPool 方法创建一个 ThreadPoolExecutor 和其他线程池代码一样,它具有复杂的线程创建逻辑。
在这种情况下,建议在将任务提交给执行程序之前在原始(主)线程上调用 MDC.getCopyOfContextMap()。当任务运行时,作为它的第一个动作,它应该调用 MDC.setContextMapValues() 将原始 MDC 值的存储副本与新的 Executor 托管线程相关联。
这很好,但是很容易忘记添加这些调用,并且没有简单的方法可以识别问题,直到为时已晚。 Log4j 的唯一标志是您在日志中缺少 MDC 信息,而使用 Logback 您会获得陈旧的 MDC 信息(因为线程池中的线程从在其上运行的第一个任务继承其 MDC)。两者都是生产系统中的严重问题。
我认为我们的情况并没有什么特别之处,但我在网上找不到太多关于这个问题的信息。显然,这不是很多人遇到的事情,所以必须有办法避免它。我们在这里做错了什么?
【问题讨论】:
如果您的应用程序部署在 JEE 环境下,您可以使用 java 拦截器在 EJB 调用之前设置 MDC 上下文。 从 logback 版本 1.1.5 开始,MDC 值不再由子线程继承。 jira.qos.ch/browse/LOGBACK-422 已解决 @Ceki 需要更新文档:“子线程自动继承其父线程的映射诊断上下文的副本。” logback.qos.ch/manual/mdc.html 我向 slf4j 创建了一个拉取请求,它解决了跨线程使用 MDC 的问题(链接 github.com/qos-ch/slf4j/pull/150)。可能,如果人们发表评论并提出要求,他们会将更改纳入 SLF4J :) 【参考方案1】:是的,这也是我遇到的一个常见问题。有一些解决方法(如手动设置,如上所述),但理想情况下,您需要一个解决方案,
一致地设置 MDC; 避免MDC不正确但您不知道的默认错误;和 尽量减少对线程池使用方式的更改(例如,将Callable
与MyCallable
进行子类化,或者类似的丑陋)。
这是我使用的满足这三个需求的解决方案。代码应该是不言自明的。
(作为旁注,可以创建此执行程序并将其提供给 Guava 的 MoreExecutors.listeningDecorator()
,如果
你使用 Guava 的ListanableFuture
。)
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
* A SLF4J MDC-compatible @link ThreadPoolExecutor.
* <p/>
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
* thread pool. This is a drop-in replacement for @link ThreadPoolExecutor sets MDC data before each task appropriately.
* <p/>
* Created by jlevy.
* Date: 6/14/13
*/
public class MdcThreadPoolExecutor extends ThreadPoolExecutor
final private boolean useFixedContext;
final private Map<String, Object> fixedContext;
/**
* Pool where task threads take MDC from the submitting thread.
*/
public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue)
return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
/**
* Pool where task threads take fixed MDC from the thread that creates the pool.
*/
@SuppressWarnings("unchecked")
public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue)
return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue);
/**
* Pool where task threads always have a specified, fixed MDC.
*/
public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue)
return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.fixedContext = fixedContext;
useFixedContext = (fixedContext != null);
@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask()
return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
/**
* All executions will have MDC injected. @code ThreadPoolExecutor's submission methods (@code submit() etc.)
* all delegate to this.
*/
@Override
public void execute(Runnable command)
super.execute(wrap(command, getContextForTask()));
public static Runnable wrap(final Runnable runnable, final Map<String, Object> context)
return new Runnable()
@Override
public void run()
Map previous = MDC.getCopyOfContextMap();
if (context == null)
MDC.clear();
else
MDC.setContextMap(context);
try
runnable.run();
finally
if (previous == null)
MDC.clear();
else
MDC.setContextMap(previous);
;
【讨论】:
万一之前的context不为空,是不是一直都是垃圾?为什么要随身携带? 对;它不应该被设置。看起来卫生很好,例如如果 wrap() 方法被公开并被其他人使用。 您能否提供有关此 MdcThreadPoolExecutor 如何被 Log4J2 附加或引用的参考?有没有什么地方我们需要专门引用这个类,或者它是“自动”完成的?我没有使用番石榴。我可以,但我想知道在使用它之前是否有其他方法。 如果我正确理解您的问题,答案是肯定的,它是 SLF4J 中的“神奇”线程局部变量——请参阅 MDC.setContextMap() 等的实现。另外,顺便说一下,这使用SLF4J,而不是 Log4J,后者更可取,因为它适用于 Log4j、Logback 和其他日志记录设置。 为了完整性:如果你使用 Spring 的ThreadPoolTaskExecutor
而不是普通的 Java ThreadPoolExecutor
,你可以使用 moelholm.com/2017/07/24/… 中描述的 MdcTaskDecorator
【参考方案2】:
我们遇到了类似的问题。您可能希望扩展 ThreadPoolExecutor 并覆盖 before/afterExecute 方法,以便在启动/停止新线程之前进行所需的 MDC 调用。
【讨论】:
beforeExecute(Thread, Runnable)
和 afterExecute(Runnable, Throwable)
方法在其他情况下可能会有所帮助,但我不确定这将如何用于设置 MDC。它们都在生成的线程下执行。这意味着您需要能够在beforeExecute
之前从主线程获取更新的地图。
最好在过滤器中设置MDC,这意味着当请求正在被业务逻辑处理时,上下文不会被更新。我认为我们不应该在整个应用程序的任何地方更新 MDC【参考方案3】:
恕我直言,最好的解决方案是:
使用ThreadPoolTaskExecutor
实现你自己的TaskDecorator
使用它:executor.setTaskDecorator(new LoggingTaskDecorator());
装饰器可以是这样的:
private final class LoggingTaskDecorator implements TaskDecorator
@Override
public Runnable decorate(Runnable task)
// web thread
Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
return () ->
// work thread
try
// TODO: is this thread safe?
MDC.setContextMap(webThreadContext);
task.run();
finally
MDC.clear();
;
【讨论】:
抱歉,不太清楚您的意思。更新:我想我现在看到了,会改进我的答案。 只是为了让其他人知道:ThreadPoolTaskExecutor
和 TaskDecorator
都是 Spring 类。
关于你关于线程安全的问题:MDC.getCopyOfContextMap() 可能不是线程安全的,如果与过时的 SLF4J 一起使用,它只会返回一个可变的 ThreadContext.getContext()。对于较新的 SLF4J 版本,返回 new HashMap(ThreadContext.getContext())。【参考方案4】:
这就是我使用固定线程池和执行程序的方式:
ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
在线程部分:
executor.submit(() ->
MDC.setContextMap(mdcContextMap);
// my stuff
);
【讨论】:
【参考方案5】:如果您在使用 @Async
注释运行任务的 Spring 框架相关环境中遇到此问题,您可以使用 TaskDecorator 方法来装饰任务。
这里提供了一个示例:
Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads我遇到了这个问题,上面的文章帮助我解决了这个问题,所以我在这里分享它。
【讨论】:
【参考方案6】:与之前发布的解决方案类似,Runnable
和 Callable
的 newTaskFor
方法可以被覆盖,以便在创建 RunnableFuture
时包装参数(请参阅接受的解决方案)。
注意:因此,必须调用executorService
的submit
方法而不是execute
方法。
对于ScheduledThreadPoolExecutor
,decorateTask
方法将被覆盖。
【讨论】:
【参考方案7】:与此处现有答案类似的另一个变体是实现 ExecutorService
并允许将委托传递给它。然后使用泛型,它仍然可以公开实际的委托,以防万一想要获取一些统计信息(只要不使用其他修改方法)。
参考代码:
https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCThreadPoolExecutor.java https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCWrappers.javapublic class MDCExecutorService<D extends ExecutorService> implements ExecutorService
private final D delegate;
public MDCExecutorService(D delegate)
this.delegate = delegate;
@Override
public void shutdown()
delegate.shutdown();
@Override
public List<Runnable> shutdownNow()
return delegate.shutdownNow();
@Override
public boolean isShutdown()
return delegate.isShutdown();
@Override
public boolean isTerminated()
return delegate.isTerminated();
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
return delegate.awaitTermination(timeout, unit);
@Override
public <T> Future<T> submit(Callable<T> task)
return delegate.submit(wrap(task));
@Override
public <T> Future<T> submit(Runnable task, T result)
return delegate.submit(wrap(task), result);
@Override
public Future<?> submit(Runnable task)
return delegate.submit(wrap(task));
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
return delegate.invokeAll(wrapCollection(tasks));
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
return delegate.invokeAny(wrapCollection(tasks));
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
@Override
public void execute(Runnable command)
delegate.execute(wrap(command));
public D getDelegate()
return delegate;
/* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
/concurrent/MDCWrappers.java */
private static Runnable wrap(final Runnable runnable)
final Map<String, String> context = MDC.getCopyOfContextMap();
return () ->
Map previous = MDC.getCopyOfContextMap();
if (context == null)
MDC.clear();
else
MDC.setContextMap(context);
try
runnable.run();
finally
if (previous == null)
MDC.clear();
else
MDC.setContextMap(previous);
;
private static <T> Callable<T> wrap(final Callable<T> callable)
final Map<String, String> context = MDC.getCopyOfContextMap();
return () ->
Map previous = MDC.getCopyOfContextMap();
if (context == null)
MDC.clear();
else
MDC.setContextMap(context);
try
return callable.call();
finally
if (previous == null)
MDC.clear();
else
MDC.setContextMap(previous);
;
private static <T> Consumer<T> wrap(final Consumer<T> consumer)
final Map<String, String> context = MDC.getCopyOfContextMap();
return (t) ->
Map previous = MDC.getCopyOfContextMap();
if (context == null)
MDC.clear();
else
MDC.setContextMap(context);
try
consumer.accept(t);
finally
if (previous == null)
MDC.clear();
else
MDC.setContextMap(previous);
;
private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks)
Collection<Callable<T>> wrapped = new ArrayList<>();
for (Callable<T> task : tasks)
wrapped.add(wrap(task));
return wrapped;
【讨论】:
为什么在 callable 运行后要再次设置 MDC?我找到了这个例子,但它似乎在我的调用线程上清除了 MDC:chintanradia.com/blog/… 我认为 MDC 是线程本地的,所以你在 runnable/callable 上所做的任何事情都是它自己的唯一副本? @devo,如果线程最初有一个 MDC 集,那么previous
是非空的,所以在 finally
中我们只是将它恢复回来。在大多数情况下,previous
是 null
,因此 MDC.clear
的执行方式与您找到的其他示例类似。是的,MDC 是线程本地的。【参考方案8】:
我可以使用以下方法解决这个问题
在主线程中(Application.java,我的应用程序的入口点)
static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
在被Executer调用的类的run方法中
MDC.setContextMap(Application.mdcContextMap);
【讨论】:
以上是关于如何将 MDC 与线程池一起使用?的主要内容,如果未能解决你的问题,请参考以下文章