强制任务在当前线程上继续?
Posted
技术标签:
【中文标题】强制任务在当前线程上继续?【英文标题】:Force a Task to continue on the current thread? 【发布时间】:2014-01-19 00:40:29 【问题描述】:我正在为 .NET 移植 AKKA 框架(现在不要太认真,现在是周末对 Actor 部分的破解)
我在其中的“未来”支持方面遇到了一些问题。 在 Java/Scala Akka 中,Future 将与 Await 调用同步等待。 很像 .NET Task.Wait()
我的目标是为此支持真正的异步等待。 它现在可以工作,但是在我当前的解决方案中,继续在错误的线程上执行。
这是将消息传递给我的一个演员时的结果,其中包含一个未来的等待块。 如您所见,actor 始终在同一个线程上执行,而 await 块在随机线程池线程上执行。
actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
actor 使用 DataFlow BufferBlock<Message>
获取消息
或者更确切地说,我在缓冲区块上使用 RX 来订阅消息。
它是这样配置的:
var messages = new BufferBlock<Message>()
BoundedCapacity = 100,
TaskScheduler = TaskScheduler.Default,
;
messages.AsObservable().Subscribe(this);
到目前为止一切顺利。
但是,当我等待未来的结果时。 像这样:
protected override void OnReceive(IMessage message)
....
var result = await Ask(logger, m);
// This is not executed on the same thread as the above code
result.Match()
.With<SomeMessage>(t =>
Console.WriteLine("await thread 0",
System.Threading.Thread.CurrentThread.GetHashCode());
)
.Default(_ => Console.WriteLine("Unknown message"));
...
我知道这是异步等待的正常行为,但我确实必须确保只有一个线程可以访问我的演员。
我不希望 future 同步运行,我希望像往常一样运行异步,但我希望继续运行在与消息处理器/actor 相同的线程上。
我的未来支持代码如下所示:
public Task<IMessage> Ask(ActorRef actor, IMessage message)
TaskCompletionSource<IMessage> result =
new TaskCompletionSource<IMessage>();
var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());
// once this object gets a response,
// we set the result for the task completion source
var futureActorRef = new FutureActorRef(result);
future.Tell(new SetRespondTo(), futureActorRef);
actor.Tell(message, future);
return result.Task;
有什么想法可以强制继续在启动上述代码的同一线程上运行吗?
【问题讨论】:
该线程上的某些东西必须合作。你不能劫持一个线程。该线程必须以某种方式调用您的继续。也许使用自定义 SynchronizationContext。 不能以某种方式为活动线程安排继续吗? 线程池线程不获取特定的工作项,它们从队列中获取未指定的项。您将无法定位任何特定线程。不过,您可以使用使用您控制的线程的自定义 TaskScheduler。现在这变得很棘手。也许更容易删除结束在同一个线程上的要求?! 我想说的是 SetResult 不能保证被阻止。如果您在文档中没有找到该声明,则无法保证并且您不能依赖它。您也无法对此进行测试。 我在 MSDN 上发现了这一点:msdn.microsoft.com/en-us/library/… "ExecuteSynchronously 指定应同步执行延续任务。指定此选项后,延续将在导致先前任务的同一线程上运行过渡到它的最终状态。如果在创建延续时前件已经完成,则延续将在创建延续的线程上运行。只有非常短运行的延续应该同步执行“。它是可靠的。 【参考方案1】:我正在为 .NET 移植 AKKA 框架
甜。尽管从未接触过 Java/Scala/Akka,但我在 CodeMash '13 上参加了 Akka 演讲。我看到了 .NET 库/框架的巨大潜力。微软is working on something similar,我希望它最终会普遍可用(它是currently in a limited preview)。
我怀疑尽可能多地呆在 Dataflow/Rx 世界中是更简单的方法; async
最适用于异步操作(每个操作只有一个开始和单个结果),而 Dataflow 和 Rx 更适用于流和订阅(单个开始和多个结果)。所以我的第一个直觉反应是将缓冲区块链接到具有特定调度程序的ActionBlock
,或者使用ObserveOn
将Rx 通知移动到特定调度程序,而不是尝试在async
端进行。当然,我对 Akka API 的设计并不十分熟悉,所以对它持保留态度。
无论如何,我的async
intro 描述了唯一两个用于安排await
延续的可靠选项:SynchronizationContext.Current
和TaskScheduler.Current
。如果您的 Akka 端口更像是一个 框架(您的代码在其中进行托管,并且最终用户代码始终由您的代码执行),那么 SynchronizationContext
可能有意义。如果您的端口更像是一个库(最终用户代码在其中进行托管并在必要时调用您的代码),那么TaskScheduler
会更有意义。
自定义SynchronizationContext
的示例并不多,因为这种情况非常少见。我的AsyncEx library 中确实有一个AsyncContextThread
type,它为该线程定义了SynchronizationContext
和TaskScheduler
。有几个自定义TaskScheduler
s 的示例,例如Parallel Extensions Extras,它有一个STA scheduler 和一个"current thread" scheduler。
【讨论】:
请注意,Microsoft 正在公开开发 ActorFx 库。我发现奥尔良和 ActorFx 是否应该在未来合并或者是什么交易有点不清楚。 F# 用户组也有一些关于 Akka 移植到 F# 的讨论,以及一些实现代码的指针。我想主要观点在Fakka - F# Akka and the role it could play for f# broader appeal。【参考方案2】:任务调度程序决定是在新线程还是在当前线程上运行任务。 有一个选项可以强制在新线程上运行它,但没有强制它在当前线程上运行。 但是有一个方法 Task.RunSynchronously() 在当前 TaskScheduler 上同步运行任务。 此外,如果您使用的是 async/await,那么已经有类似的问题了。
【讨论】:
但是这个问题不是关于运行一个任务,而是关于在等待之后继续执行的地方。以上是关于强制任务在当前线程上继续?的主要内容,如果未能解决你的问题,请参考以下文章