行为类似于线程的 C# 可重用或持久任务
Posted
技术标签:
【中文标题】行为类似于线程的 C# 可重用或持久任务【英文标题】:C# Reusable or Persistent Tasks that behave like Threads 【发布时间】:2019-10-10 10:21:18 【问题描述】:使用线程,您可以创建持久的、可重用的局部变量,这些变量对于客户端连接等非常有用。但是,对于 System.Threading.Tasks.Dataflow 中的 ActionBlock 之类的任务,似乎没有任何类型的操作块的持久性或可重用性。因此,对于涉及与客户端交互的 ActionBlock,我的理解是,您要么需要从头开始初始化客户端连接,要么在更高范围内重用一个客户端连接(使用锁定?)。
用例:我正在使用一个反转控制的 .NET 库。大部分逻辑(除了启动和关闭)必须位于名为 ProcessEventsAsync 的单个 Task 方法中,由库调用,该方法接收 IEnumerable 数据。 ProcessEventsAsync 必须对所有数据进行一些处理,然后将其发送给一些下游消费者。为了提高性能,我尝试使用 Tasks 并行化 ProcessEventsAsync 中的逻辑。我还想从这个任务中收集一些性能指标。
让我举一个详细的例子来说明我在做什么:
internal class MyClass
private String firstDownStreamConnectionString;
private String secondDownStreamConnectionString;
private SomeClient firstClient;
private SomeClient secondClient;
private ReportingClient reportingClient;
private int totalUnhandledDataCount;
public MyClass(String firstDownStreamConnectionString, String secondDownStreamConnectionString, String reportingClientKey)
this.firstDownStreamConnectionString = firstDownStreamConnectionString;
this.secondDownStreamConnectionString = secondDownStreamConnectionString;
this.DegreeOfParallelism = Math.Max(Environment.ProcessorCount - 1, 1);
this.reportingClient = new ReportingClient (reportingClientKey, DegreeOfParallelism);
this.totalUnhandledDataCount = 0;
// called once when the framework signals that processing is about to be ready
public override async Task OpenAsync(CancellationToken cancellationToken, PartitionContext context)
this.firstClient = SomeClient.CreateFromConnectionString(this.firstDownStreamConnectionString);
this.secondClient = SomeClient.CreateFromConnectionString(this.secondDownStreamConnectionString );
await Task.Yield();
// this is called repeatedly by the framework
// outside of startup and shutdown, it is the only entrypoint to my logic
public override async Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<Data> inputData)
ActionBlock<List<Data>> processorActionBlock = new ActionBlock<List<Data>>(
inputData =>
SomeData firstDataset = new SomeData();
SomeData secondDataset = new SomeData();
int unhandledDataCount = 0;
foreach (Data data in inputData)
// if data fits one set of criteria, put it in firstDataSet
// if data fits other set of criteria, put it in secondDataSet
// otherwise increment unhandledDataCount
Interlocked.Add(ref this.totalUnhandledDataCount, unhandledDataCount);
lock (this.firstClient)
try
firstDataset.SendData(this.firstClient);
catch (Exception e)
lock(this.reportingClient)
this.reportingClient.LogTrace(e);
lock (this.secondClient)
try
secondDataset.SendData(this.secondClient);
catch (Exception e)
lock(this.reportingClient)
this.reportingClient.LogTrace(e);
,
new ExecutionDataflowBlockOptions
MaxDegreeOfParallelism = this.DegreeOfParallelism
);
// construct as many List<Data> from inputData as there is DegreeOfParallelism
// put that in a variable called batches
for(int i = 0; i < DegreeOfParallelism; i++)
processorActionBlock.Post(batches[i]);
processorActionBlock.Complete();
processorActionBlock.Completion.Wait();
await context.CheckpointAsync();
我试图只保留相关代码,我省略了处理逻辑、大多数指标收集、数据如何发送、关闭逻辑等。
我想利用一些允许重用性的Task。我不想为这种类型的所有正在运行的任务重用单个客户端连接,我也不希望每个任务在每次调用时都创建一个新的客户端连接。我确实希望每个类似线程的任务都有一组持久的客户端连接。理想情况下,我也不想在 System.Threading.Tasks.Dataflow 中创建一个包装 Task 或扩展抽象类/接口的新类。
【问题讨论】:
您是否希望将操作放入队列中?客户提出一个事件并继续前进。事件进入队列或导致一些其他操作被放置在队列中。现在您已经与客户端断开连接,可以以最有效的方式处理该队列,可能包括并行处理。 只需将ActionBlock
和ConcurrentDictionary
用于客户端,是的,任何并行都有线程安全开销 /i> 方法,它只是野兽的本性 此外,Dataflow 非常棒,并且专为处理数据管道时的这种情况而构建
@ScottHannen 我刚刚更新了我的代码,所以也许它会更清楚我想要做什么。这种方法的主要问题是 MyClass 的每个实例都需要对其接收的数据按顺序调用context.CheckpointAsync();
。所以对 ProcessEventsAsync 的调用需要按顺序完成,并且要完成一个调用,我必须必须能够在上下文中调用 checkPoint
@TheGeneral 我目前实际上正在使用 ActionBlocks。使用 ConcurrentDictionary 实际上并不是一个坏主意。有没有办法让 ActionBlock 实例知道它的 instanceid,或者我需要为处理结帐的客户端的 ConcurrentDictionary 实现一个包装器?
"有没有办法让 ActionBlock 实例知道它的 instanceid" 在这些情况下,我要么为块创建一个元组或一个结构,即 ActionBlock<(int id, Payload data)>
或 @987654326 @ 或类似的,然后在处理时您天真地拥有有关该对象的信息,就此而言,您无论如何都可以将您的客户端传递给操作块
【参考方案1】:
您所描述的内容听起来像是异步委托或 Func。
例如:
Func<Task> TestFunc = async () =>
Console.WriteLine("Begin");
await Task.Delay(100);
Console.WriteLine("Delay");
await Task.Delay(100);
Console.WriteLine("End");
;
如果函数在范围内,您只需:
await TestFunc();
您可以根据需要多次重复使用它。您还可以更改函数以接受参数。
编辑
您也可以尝试 AsyncLocal
因为基于任务的异步编程模型倾向于抽象线程的使用,所以可以使用 AsyncLocal 实例来跨线程持久化数据。
当与当前线程关联的值发生更改时,AsyncLocal 类还提供可选通知,因为它是通过设置 Value 属性显式更改的,或者当线程遇到等待或其他上下文转换时隐式更改。
【讨论】:
你如何通过这个实现(真正的)并行化和局部变量的持久化?在这种情况下,我真的看不出这与 Task.Run 有何不同。 我已编辑答案以包含 AsyncLocal听起来你只需要一个存储依赖项的类?
void Main()
var doer1 = new ThingDoer();
var doer2 = new ThingDoer();
// A & B use one pair of clients, and C & D use another pair
var taskA = doer1.DoTheThing();
var taskB = doer1.DoTheThing();
var taskC = doer2.DoTheThing();
var taskD = doer2.DoTheThing();
public class ThingDoer
private SomeClient _someClient;
private SomeErrorReportingClient _someErrorReportingClient;
public ThingDoer(SomeClient someClient, SomeErrorReportingClient someErrorReportingClient)
_someClient = someClient;
_someErrorReportingClient = someErrorReportingClient;
public ThingDoer()
: this(new SomeClient, new SomeErrorReportingClient)
public async Task DoTheThing()
// Implementation here
“可重用性”的概念与任务并不真正兼容。
【讨论】:
我并不完全反对这种类型的解决方案,但我认为它不适用于我试图实现的并行性。我在原帖中提到 ActionBlock 的原因是它允许您显式控制并行度。也许我用错了,但让我更新原始帖子以更清楚我是如何使用它的以上是关于行为类似于线程的 C# 可重用或持久任务的主要内容,如果未能解决你的问题,请参考以下文章
是否有类似于 Blender 节点编辑器的 Python 可重用组件? [关闭]
在 Django 项目中为可重用应用程序创建基于类的 Celery 任务