行为类似于线程的 C# 可重用或持久任务

Posted

技术标签:

【中文标题】行为类似于线程的 C# 可重用或持久任务【英文标题】:C# Reusable or Persistent Tasks that behave like Threads 【发布时间】:2019-10-10 10:21:18 【问题描述】:

使用线程,您可以创建持久的、可重用的局部变量,这些变量对于客户端连接等非常有用。但是,对于 System.Threading.Tasks.Dataflow 中的 ActionBlock 之类的任务,似乎没有任何类型的操作块的持久性或可重用性。因此,对于涉及与客户端交互的 ActionBlock,我的理解是,您要么需要从头开始初始化客户端连接,要么在更高范围内重用一个客户端连接(使用锁定?)。

用例:我正在使用一个反转控制的 .NET 库。大部分逻辑(除了启动和关闭)必须位于名为 ProcessEventsAsync 的单个 Task 方法中,由库调用,该方法接收 IEnumerable 数据。 ProcessEventsAsync 必须对所有数据进行一些处理,然后将其发送给一些下游消费者。为了提高性能,我尝试使用 Tasks 并行化 ProcessEventsAsync 中的逻辑。我还想从这个任务中收集一些性能指标。

让我举一个详细的例子来说明我在做什么:

internal class MyClass


  private String firstDownStreamConnectionString;
  private String secondDownStreamConnectionString;
  private SomeClient firstClient;
  private SomeClient secondClient;
  private ReportingClient reportingClient;
  private int totalUnhandledDataCount;

  public MyClass(String firstDownStreamConnectionString, String secondDownStreamConnectionString, String reportingClientKey)
  
      this.firstDownStreamConnectionString = firstDownStreamConnectionString;
      this.secondDownStreamConnectionString = secondDownStreamConnectionString;
      this.DegreeOfParallelism = Math.Max(Environment.ProcessorCount - 1, 1);
      this.reportingClient = new ReportingClient (reportingClientKey, DegreeOfParallelism);
      this.totalUnhandledDataCount = 0;
  
  // called once when the framework signals that processing is about to be ready
  public override async Task OpenAsync(CancellationToken cancellationToken, PartitionContext context)
  
    this.firstClient = SomeClient.CreateFromConnectionString(this.firstDownStreamConnectionString);
    this.secondClient = SomeClient.CreateFromConnectionString(this.secondDownStreamConnectionString );
    await Task.Yield();
  

  // this is called repeatedly by the framework
  // outside of startup and shutdown, it is the only entrypoint to my logic
  public override async Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<Data> inputData)
  
    ActionBlock<List<Data>> processorActionBlock = new ActionBlock<List<Data>>(
      inputData =>
      
        SomeData firstDataset = new SomeData();
        SomeData secondDataset = new SomeData();
        int unhandledDataCount = 0;
        foreach (Data data in inputData)
        
          // if data fits one set of criteria, put it in firstDataSet
          // if data fits other set of criteria, put it in secondDataSet
          // otherwise increment unhandledDataCount
        
        Interlocked.Add(ref this.totalUnhandledDataCount, unhandledDataCount);
        lock (this.firstClient)
        
          try
          
            firstDataset.SendData(this.firstClient);
           catch (Exception e)
          
            lock(this.reportingClient)
            
              this.reportingClient.LogTrace(e);
            
          
        
        lock (this.secondClient)
        
          try
          
            secondDataset.SendData(this.secondClient);
           catch (Exception e)
          
            lock(this.reportingClient)
            
              this.reportingClient.LogTrace(e);
            
          
        
      ,
      new ExecutionDataflowBlockOptions
      
        MaxDegreeOfParallelism = this.DegreeOfParallelism
      );
    // construct as many List<Data> from inputData as there is DegreeOfParallelism
    // put that in a variable called batches
    for(int i = 0; i < DegreeOfParallelism; i++)
    
      processorActionBlock.Post(batches[i]);
    
    processorActionBlock.Complete();
    processorActionBlock.Completion.Wait();
    await context.CheckpointAsync();
  

我试图只保留相关代码,我省略了处理逻辑、大多数指标收集、数据如何发送、关闭逻辑等。

我想利用一些允许重用性的Task。我不想为这种类型的所有正在运行的任务重用单个客户端连接,我也不希望每个任务在每次调用时都创建一个新的客户端连接。我确实希望每个类似线程的任务都有一组持久的客户端连接。理想情况下,我也不想在 System.Threading.Tasks.Dataflow 中创建一个包装 Task 或扩展抽象类/接口的新类。

【问题讨论】:

您是否希望将操作放入队列中?客户提出一个事件并继续前进。事件进入队列或导致一些其他操作被放置在队列中。现在您已经与客户端断开连接,可以以最有效的方式处理该队列,可能包括并行处理。 只需将ActionBlockConcurrentDictionary 用于客户端,是的,任何并行都有线程安全开销 /i> 方法,它只是野兽的本性 此外,Dataflow 非常棒,并且专为处理数据管道时的这种情况而构建 @ScottHannen 我刚刚更新了我的代码,所以也许它会更清楚我想要做什么。这种方法的主要问题是 MyClass 的每个实例都需要对其接收的数据按顺序调用context.CheckpointAsync();。所以对 ProcessEventsAsync 的调用需要按顺序完成,并且要完成一个调用,我必须必须能够在上下文中调用 checkPoint @TheGeneral 我目前实际上正在使用 ActionBlocks。使用 ConcurrentDictionary 实际上并不是一个坏主意。有没有办法让 ActionBlock 实例知道它的 instanceid,或者我需要为处理结帐的客户端的 ConcurrentDictionary 实现一个包装器? "有没有办法让 ActionBlock 实例知道它的 instanceid" 在这些情况下,我要么为块创建一个元组或一个结构,即 ActionBlock&lt;(int id, Payload data)&gt; 或 @987654326 @ 或类似的,然后在处理时您天真地拥有有关该对象的信息,就此而言,您无论如何都可以将您的客户端传递给操作块 【参考方案1】:

您所描述的内容听起来像是异步委托或 Func。

例如:

Func<Task> TestFunc = async () =>

    Console.WriteLine("Begin");
    await Task.Delay(100);
    Console.WriteLine("Delay");
    await Task.Delay(100);
    Console.WriteLine("End");
;

如果函数在范围内,您只需:

await TestFunc();

您可以根据需要多次重复使用它。您还可以更改函数以接受参数。


编辑

您也可以尝试 AsyncLocal。根据文档:

因为基于任务的异步编程模型倾向于抽象线程的使用,所以可以使用 AsyncLocal 实例来跨线程持久化数据。

当与当前线程关联的值发生更改时,AsyncLocal 类还提供可选通知,因为它是通过设置 Value 属性显式更改的,或者当线程遇到等待或其他上下文转换时隐式更改。

【讨论】:

你如何通过这个实现(真正的)并行化和局部变量的持久化?在这种情况下,我真的看不出这与 Task.Run 有何不同。 我已编辑答案以包含 AsyncLocal。我希望这对你有用。有空我会尝试添加一个示例。【参考方案2】:

听起来你只需要一个存储依赖项的类?

void Main()

    var doer1 = new ThingDoer();
    var doer2 = new ThingDoer();

    // A & B use one pair of clients, and C & D use another pair
    var taskA = doer1.DoTheThing();
    var taskB = doer1.DoTheThing();

    var taskC = doer2.DoTheThing();
    var taskD = doer2.DoTheThing();


public class ThingDoer

    private SomeClient _someClient;
    private SomeErrorReportingClient _someErrorReportingClient;

    public ThingDoer(SomeClient someClient, SomeErrorReportingClient someErrorReportingClient)
    
        _someClient = someClient;
        _someErrorReportingClient = someErrorReportingClient;
    

    public ThingDoer()
        : this(new SomeClient, new SomeErrorReportingClient)
    
    

    public async Task DoTheThing()
    
        // Implementation here
    

“可重用性”的概念与任务并不真正兼容。

【讨论】:

我并不完全反对这种类型的解决方案,但我认为它不适用于我试图实现的并行性。我在原帖中提到 ActionBlock 的原因是它允许您显式控制并行度。也许我用错了,但让我更新原始帖子以更清楚我是如何使用它的

以上是关于行为类似于线程的 C# 可重用或持久任务的主要内容,如果未能解决你的问题,请参考以下文章

是否有类似于 Blender 节点编辑器的 Python 可重用组件? [关闭]

多线程通讯之共享内存二(可重用锁)

在 Django 项目中为可重用应用程序创建基于类的 Celery 任务

java 优雅的实现多线程等待,可重用的同步屏障Phaser | Java工具类

Angular 可重用模块和组件

可重用的 HttpClient 实例与静态变量(在多个线程中大量使用)?