将 ThreadLocal 传播到从 ExecutorService 获取的新线程
Posted
技术标签:
【中文标题】将 ThreadLocal 传播到从 ExecutorService 获取的新线程【英文标题】:Propagating ThreadLocal to a new Thread fetched from a ExecutorService 【发布时间】:2011-11-07 18:18:48 【问题描述】:我正在使用 ExecutorService 和 Future(示例代码 here)在一个超时的单独线程中运行一个进程(线程“生成”发生在 AOP 方面)。
现在,主线程是一个Resteasy 请求。 Resteasy 使用一个或多个 ThreadLocal 变量来存储一些我需要在我的 Rest 方法调用中检索的上下文信息。问题是,由于 Resteasy 线程在新线程中运行,ThreadLocal 变量丢失了。
将 Resteasy 使用的任何 ThreadLocal 变量“传播”到新线程的最佳方法是什么?似乎 Resteasy 使用多个 ThreadLocal 变量来跟踪上下文信息,我想“盲目地”将所有信息转移到新线程。
我查看了ThreadPoolExecutor
的子类化并使用beforeExecute 方法将当前线程传递给池,但我找不到将 ThreadLocal 变量传递给池的方法。
有什么建议吗?
谢谢
【问题讨论】:
第二段可以稍微改写一下吗?这让我很困惑。另外, beforeExecute 有什么问题?您无法使其正常工作,或者您意识到它无法满足您的需求? 【参考方案1】:与线程关联的ThreadLocal
实例集保存在每个Thread
的私有成员中。您列举这些的唯一机会是对Thread
进行一些反思;这样,您可以覆盖线程字段的访问限制。
一旦您获得了ThreadLocal
的集合,您就可以使用ThreadPoolExecutor
的beforeExecute()
和afterExecute()
钩子在后台线程中复制,或者为您的任务创建一个Runnable
包装器来拦截run()
调用以取消设置必要的 ThreadLocal
实例。实际上,后一种技术可能效果更好,因为它可以方便地在任务排队时存储ThreadLocal
值。
更新:下面是第二种方法的更具体的说明。与我最初的描述相反,包装器中存储的只是调用线程,在执行任务时会询问该线程。
static Runnable wrap(Runnable task)
Thread caller = Thread.currentThread();
return () ->
Iterable<ThreadLocal<?>> vars = copy(caller);
try
task.run();
finally
for (ThreadLocal<?> var : vars)
var.remove();
;
/**
* For each @code ThreadLocal in the specified thread, copy the thread's
* value to the current thread.
*
* @param caller the calling thread
* @return all of the @code ThreadLocal instances that are set on current thread
*/
private static Collection<ThreadLocal<?>> copy(Thread caller)
/* Use a nasty bunch of reflection to do this. */
throw new UnsupportedOperationException();
【讨论】:
你能举例说明后一种技术是如何工作的吗? @erickson 是你的“令人讨厌的反思”,类似于***.com/a/32231177/131929? @MarcelStör 我没有尝试过,但它看起来是正确的信息。因此,除了读取value
字段之外,您还需要在条目上调用get()
以获取该条目引用的ThreadLocal
实例。一旦你有了这两个对象,你就完成了反射。为找到的每个条目调用 local.set(value)
,并将本地线程添加到从 copy()
返回的集合中。
@MarcelStör 当我再看一点时,线程本地映射的并发访问实际上可能存在可见性问题,因为它是为单线程访问设计的。这意味着您需要复制当前线程中的值,然后在工作线程中设置它们,以安全的方式在线程之间传送它们。我将不得不再考虑一下并修改我的答案。
@MarcelStör 经过更多思考,我认为在这种特殊情况下,一切都很好。将任务提交给Executor
将解决可见性问题,因为那里存在内存屏障。并且由于主线程在后台线程运行时处于等待状态,因此主线程的ThreadLocals
在此期间不会改变。但是,如果这些条件发生变化,则可能需要进行一些额外的工作。【参考方案2】:
根据@erickson 的回答,我编写了这段代码。它适用于可继承的ThreadLocals。它使用与 Thread 构造函数中使用的相同方法构建可继承的ThreadLocals 列表。当然,我使用反射来做到这一点。我也重写了执行器类。
public class MyThreadPoolExecutor extends ThreadPoolExecutor
@Override
public void execute(Runnable command)
super.execute(new Wrapped(command, Thread.currentThread()));
包装器:
private class Wrapped implements Runnable
private final Runnable task;
private final Thread caller;
public Wrapped(Runnable task, Thread caller)
this.task = task;
this.caller = caller;
public void run()
Iterable<ThreadLocal<?>> vars = null;
try
vars = copy(caller);
catch (Exception e)
throw new RuntimeException("error when coping Threads", e);
try
task.run();
finally
for (ThreadLocal<?> var : vars)
var.remove();
复制方法:
public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
List<ThreadLocal<?>> threadLocals = new ArrayList<>();
Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
field.setAccessible(true);
Object map = field.get(caller);
Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
table.setAccessible(true);
Method method = ThreadLocal.class
.getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
method.setAccessible(true);
Object o = method.invoke(null, map);
Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
field2.setAccessible(true);
field2.set(Thread.currentThread(), o);
Object tbl = table.get(o);
int length = Array.getLength(tbl);
for (int i = 0; i < length; i++)
Object entry = Array.get(tbl, i);
Object value = null;
if (entry != null)
Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
"get");
referentField.setAccessible(true);
value = referentField.invoke(entry);
threadLocals.add((ThreadLocal<?>) value);
return threadLocals;
【讨论】:
【参考方案3】:据我了解您的问题,您可以查看 InheritableThreadLocal,它旨在将 ThreadLocal
变量从父线程上下文传递到子线程上下文
【讨论】:
行不通,首先,OP 无法控制第三方库中的ThreadLocal
创建。其次,ExecutorService
重用线程,而InheritableThreadLocal
仅在您直接生成新线程时才有效。【参考方案4】:
我不喜欢反射方法。另一种解决方案是实现执行程序包装器并将对象作为ThreadLocal
上下文直接传递给传播父上下文的所有子线程。
public class PropagatedObject
private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();
//put, set, merge methods, etc
==>
public class ObjectAwareExecutor extends AbstractExecutorService
private final ExecutorService delegate;
private final PropagatedObject objectAbsorber;
public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber)
this.delegate = delegate;
this.objectAbsorber = objectAbsorber;
@Override
public void execute(final Runnable command)
final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
delegate.execute(() ->
try
objectAbsorber.set(parentContext);
command.run();
finally
parentContext.putAll(objectAbsorber.get());
objectAbsorber.clean();
);
objectAbsorber.merge(parentContext);
【讨论】:
【参考方案5】:这是一个将父线程中的当前 LocaleContext 传递给 CompletableFuture [默认情况下它使用 ForkJoinPool] 跨越的子线程的示例。
只需在 Runnable 块内的子线程中定义您想要做的所有事情。因此,当 CompletableFuture 执行 Runnable 块时,它是受控制的子线程,瞧,您在 Child 的 ThreadLocal 中设置了父线程的 ThreadLocal 内容。
这里的问题不是整个 ThreadLocal 都被复制过来了。仅复制 LocaleContext。由于 ThreadLocal 只能对它所属的 Thread 进行私有访问,因此使用 Reflection 并尝试在 Child 中获取和设置太多古怪的东西,这可能会导致内存泄漏或性能下降。
因此,如果您从 ThreadLocal 中知道您感兴趣的参数,那么此解决方案的工作方式会更简洁。
public void parentClassMethod(Request request)
LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
executeInChildThread(() ->
LocaleContextHolder.setLocaleContext(currentLocale);
//Do whatever else you wanna do
));
//Continue stuff you want to do with parent thread
private void executeInChildThread(Runnable runnable)
try
CompletableFuture.runAsync(runnable)
.get();
catch (Exception e)
LOGGER.error("something is wrong");
【讨论】:
【参考方案6】:如果您查看 ThreadLocal 代码,您可以看到:
public T get()
Thread t = Thread.currentThread();
...
当前线程不能被覆盖。
可能的解决方案:
看看 java 7 fork/join 机制(但我认为这是一种不好的方式)
查看 endorsed 机制以覆盖 JVM 中的 ThreadLocal
类。
尝试重写 RESTEasy(你可以在你的 IDE 中使用 Refactor 工具来替换所有 ThreadLocal 的使用,看起来很简单)
【讨论】:
以上是关于将 ThreadLocal 传播到从 ExecutorService 获取的新线程的主要内容,如果未能解决你的问题,请参考以下文章