在 TPL 数据流中动态订阅/取消订阅

Posted

技术标签:

【中文标题】在 TPL 数据流中动态订阅/取消订阅【英文标题】:Dynamically subscribe/unsubscribe in TPL Dataflow 【发布时间】:2021-08-08 17:03:47 【问题描述】:

我有一个消息流,根据一些标准,我希望每个消费者能够并行处理其中一些消息。每个消费者都应该能够动态订阅和取消订阅。

我有以下输入数据约束:

每秒大约 500 条消息 大约 15000 名消费者 大约 500 个类别 在大多数情况下,每个消费者订阅 1-3 个类别。

到目前为止,这是我所拥有的:

public class Test

    static void Main()
    
        var consumer1 = new Consumer("Consumer1");
        consumer1.SubscribeForCategory(1);
        consumer1.SubscribeForCategory(2);

        var consumer2 = new Consumer("Consumer2");
        consumer2.SubscribeForCategory(2);
        consumer2.SubscribeForCategory(3);
        consumer2.SubscribeForCategory(4);

        var consumer3 = new Consumer("Consumer3");
        consumer3.SubscribeForCategory(3);
        consumer3.SubscribeForCategory(4);

        var consumers = new[] consumer1, consumer2, consumer3;
        var publisher = new Publisher(consumers);

        var message1 = new Message(1, "message1 test");
        var message2 = new Message(2, "message2");
        var message3 = new Message(1, "message3");
        var message4 = new Message(3, "message4 test");
        var message5 = new Message(4, "message5");
        var message6 = new Message(3, "message6");

        var messages = new[] message1, message2, message3, message4, message5, message6;

        foreach (var message in messages)
        
            publisher.Publish(message);
        

        Console.ReadLine();
    


public class Message

    public Message(int categoryId, string data)
    
        CategoryId = categoryId;
        Data = data;
    

    public int CategoryId  get; 

    public string Data  get; 


public class Publisher

    private readonly IEnumerable<Consumer> _consumers;

    public Publisher(IEnumerable<Consumer> consumers)
    
        _consumers = consumers;
    

    public void Publish(Message message)
    
        IEnumerable<Consumer> consumers = _consumers.Where(c => c.CategoryIds.Contains(message.CategoryId));
        foreach (Consumer consumer in consumers)
        
            consumer.AddMessage(message);
        
    


public class Consumer

    private readonly HashSet<int> _categoryIds;
    private readonly ActionBlock<Message> _queue;

    public Consumer(string name)
    
        Name = name;
        _categoryIds = new HashSet<int>();

        _queue = new ActionBlock<Message>(async m =>  await Foo(m); , 
                                          new ExecutionDataflowBlockOptions 
                                          
                                              MaxDegreeOfParallelism = 1, 
                                              SingleProducerConstrained = true
                                          );
    

    public string Name  get; 

    public IReadOnlyCollection<int> CategoryIds => _categoryIds;

    public void AddMessage(Message message)
    
        bool accepted = _queue.Post(message);
        if (!accepted)
        
            Console.WriteLine("Message has been rejected!");
        
    

    public void SubscribeForCategory(int categoryId)
    
        _categoryIds.Add(categoryId);
    

    private async Task Foo(Message message)
    
        // process message
        await Task.Delay(10);

        if (message.Data.Contains("test"))
        
            _categoryIds.Remove(message.CategoryId);
        

        Console.WriteLine($"[Name] - category id: [message.CategoryId] data: [message.Data]");
    

很遗憾,该解决方案存在几个问题:

    在消费者处理每条消息时,有可能取消订阅已添加到 ActionBlock 输入队列中的某些消息。 在 Publisher.cs 中,我正在迭代每个帐户类别集合,稍后在 Account Foo 方法中,有机会删除某些类别,这将导致以下异常: System.InvalidOperationException: Collection was modified;枚举操作可能无法执行。 另外,我不太确定在 publisher.Publish() 中添加“调度逻辑”是否是个好主意

一种可能的解决方案是将所有消息转发给每个消费者(每个消费者都应该决定是否应该处理它),但恐怕这会慢得多。

我知道 Akka.Net 和 Microsoft Orleans 等基于 Actor 模型的框架,但我希望所有这些都在进程中发生(当然,如果可以实现的话)。

有没有人有更优雅的解决方案?您对如何改进当前方法有什么建议吗?

【问题讨论】:

分类有多少?每个消费者平均同时订阅多少个类别? 我有大约 500 个类别。这取决于。在大多数情况下,大约 1-3 个类别,在极少数情况下超过 150 个。 您是否尝试过使用synchronization primitive 来允许您访问集合而不会遇到System.InvalidOperationException @chunk1ty Dataflow 已经提供了您想要的功能,您无需编写所有代码。您所需要做的就是将块链接在一起并在链接中指定一个谓词来过滤项目。您可以在运行时断开链接。 Publisher 本身应该是 TransformBlockActionBlock 不是队列,而是实际的消费者 顺便说一句,你怎么会有 15K 消费者?他们在做什么?它们真的是不同的方法,还是具有不同参考/配置数据的相同方法?为什么不将该数据与每条消息一起传递给消费者呢?还是它们代表不同的远程设备或服务? 【参考方案1】:

TPL DataFlow 库已经提供了您想要的。它的块不是队列,它们是实际的生产者和消费者。您可以删除几乎所有您添加的代码。您甚至可以使用 LINQ 查询来创建和链接“发布者”和“消费者”:

var n=10;
var consumers=( from i in Enumerable.Range(0,n)
                let categories=new ConcurrentDictinoary<int,int>()
                select new  
                             Block=new ActionBlock(msg=>Consume(msg,categories)
                                                        ,blockOptions),
                             Categories=categories
                ).ToArray();

foreach(var pair in consumers)

    publisher.LinkTo(pair.Block,linkOption,msg=>IsAllowed(msg,pair.Category));


bool IsAllowed(Message msg,ConcurrentDictionary<int,int> categories)

    return categories.ContainsKey(msg.CategoryId);


async Task Consume(Message message,ConcurrentDictinary<int,int> categories)

    if (message.Data.Contains("test"))
    
        categories.TryRemove(message.CategoryId);
    
    ...

块与函数一起使用并非偶然。 Dataflow 库和它所基于的 CSP 范式与 OOP 有很大不同,更接近于函数式编程。

顺便说一下,TPL Dataflow 是从 Microsoft Robotics Frameworks 和 Concurrency Runtime 发展而来的。在机器人技术和自动化领域,有很多大量微处理器在交换信息。数据流 专为创建复杂的处理网格和处理大量消息而构建。

说明

数据流不是一组队列,它包含旨在链接到管道中的活动块。 ActionBlock 不是队列,它一个队列。实际上,它是一个消费者,通常位于管道的尾部。 TransformBlock 接收传入的消息,一一处理它们,然后将它们发送到任何链接的块。

块是链接的,因此您无需手动从一个块中获取消息并将它们传递给另一个块。 Link 可以包含一个谓词,用于过滤目标块接受的消息。可以通过调用Dispose 来切断链接。

假设这是“消费者”方法:

async Task Consume(Message message)

    await Task.Delay(100);
    Console.WriteLine($"Category id: [message.CategoryId] data: [message.Data]");

您可以创建一些 ActionBlock,也许在一个数组中:

var consumers=new[]
     new ActionBlock(Consume),
     new ActionBlock(Consume),
     new ActionBlock(Consume)
;

每个动作块当然可以使用不同的委托。

管道的“头”应该是一个 TransformBlock。在这种情况下,发布者除了链接到目标块之外什么都不做。至少我们可以打印一些东西:

Message PassThrough(Message message)

    Console.WriteLine("Incoming");
    return Message;


var publisher=new TransformBlock(PassThrough);

您可以使用LinkTo 将“发布者”链接到“消费者”:

var options=new DataflowLinkOptions  PropagateCompletion=true;

var link1=publisher.LinkTo(consumers[0],options, msg=>msg.CategoryId % 3==0);
var link2=publisher.LinkTo(consumers[1],options, msg=>msg.CategoryId % 3==1);
var link3=publisher.LinkTo(consumers[2],options, msg=>msg.CategoryId % 3==2);

“发布者”块产生的消息将被发送到链接谓词接受它的第一个目标。消息按照创建的顺序提供给链接。如果没有链接接受该消息,它将留在输出队列中并阻止它。

在实际场景中,应始终确保所有消息都得到处理,或者有一个块可以处理任何不匹配的内容。

public.LinkTo(theOtherBlock,options);

link1link2link3 对象只是 IDisposeables。它们可用于断开链接:

link2.Dispose();

可以随时创建和断开链接,根据需要更改管道(或更复杂设计中的网格)的形状。如果链接被破坏或修改,任何已经发布到目标块队列的消息都不会被丢弃。

为了减少不需要的消息的数量,我们可以为每个块的输入队列添加一个绑定:

var blockOptions=new DataflowBlockOptions  BoundedCapacity=1 ;

var consumers=new[]
     new ActionBlock(Consume,blockOptions),
     new ActionBlock(Consume,blockOptions),
     new ActionBlock(Consume,blockOptions)
;

要动态更改接受的消息,我们可以将值存储在例如ConcurrentDictionary 中。谓词可能会在消费者修改允许值的同时尝试检查消息:

ConcurrentDictionary[] _allowedCategories=new[] 
    new ConcurrentDictionary<int,int>(),
    new ConcurrentDictionary<int,int>(),
    new ConcurrentDictionary<int,int>(),
;

async Task Consume(Message message,ConcurrentDictinary<int,int> categories)

    if (message.Data.Contains("test"))
    
        categories.TryRemove(message.CategoryId);
    
    ...

而“消费者”变成了

var consumers=new[]
     new ActionBlock(msg=>Consume(msg,categories[0])),
     new ActionBlock(msg=>Consume(msg,categories[1])),
     new ActionBlock(msg=>Consume(msg,categories[2]))
;

最好为链接谓词创建一个单独的方法:

bool IsAllowed(Message msg,ConcurrentDictionary<int,int> categories)

    return categories.ContainsKey(msg.CategoryId);


var link1=publisher.LinkTo(consumers[0],options, msg=>IsAllowed(msg,categories[0]));
var link2=publisher.LinkTo(consumers[1],options, msg=>IsAllowed(msg,categories[1]));

可以使用 LINQ 和 `Enumerable.Range 创建所有这些。这是否是一个好主意是另一回事:

var n=10;
var consumers=( from i in Enumerable.Range(0,n)
                         let categories=new ConcurrentDictinoary<int,int>()
                         select new  
                             Block=new ActionBlock(msg=>Consume(msg,categories)
                                                        ,blockOptions),
                             Categories=categories
                         ).ToArray();

foreach(var pair in consumers)

    publisher.LinkTo(pair.Block,linkOption,msg=>IsAllowed(msg,pair.Category));

无论网格如何构建,发布到它都是一样的。在头块上使用SendAsync

for(int i=0;i<1000;i++)

    var msg=new Message(...);
    await publisher.SendAsync(msg);

【讨论】:

它看起来很有前途,也更优雅。我阅读了有关 TransformBlock 的信息,但我不确定如何在运行时更改过滤条件。当我有时间时,我必须检查一下。 我注意到您没有在 ActionBlock 中使用 SingleProducerConstrained 选项。是什么原因? @chunk1ty 除了从头开始写一个很长的答案之外别无他法 OP 有 15000 个消费者,每个消费者是/有一个数据流块。使用LinkTo+predicate 将生产者块与15,000 个消费者块链接起来,意味着每条消息将调用15,000 次谓词(IsAllowed 方法)。每秒 500 条消息乘以每秒 7,500,000 次调用。这是我认为不可接受的开销,这就是为什么我没有在回答中提出这个解决方案。 @TheodorZoulias 除非没有理由使用 15K 个消费者,每个消费者只会收到 2 条消息。 OP 正在尝试使用 ActionBlock 作为 Akka 代理。这是一个非常不同的范例。数据流类似于 shell 脚本管道。您不需要创建 15K 的程序实例来处理文件中的 15K 行。您创建一个管道,将行拆分并传递它们一个接一个地处理它们【参考方案2】:

我认为您的模型中缺少实体 Category,添加它不仅会在概念上而且会在性能方面改进您的模型。每个类别都可以保存一个订阅该类别的消费者列表,这使得仅向订阅的消费者发送消息变得微不足道。

为了解决线程安全问题,我的建议是使用immutable collections 而不是可变的HashSet&lt;T&gt;s 或List&lt;T&gt;s。不可变集合的优点是可以使用低锁定技术(ImmutableInterlocked.Update 方法)安全地自动更新它们,并且可以随时提供不受未来修改影响的内容快照。如果你问如何改变不可变集合,答案是你没有改变它,而是用不同的不可变集合替换引用。这些结构的实现方式允许其内部零碎的高度可重用性。例如,在 ImmutableHashSet&lt;T&gt; 中添加一个已经包含 1,000,000 个项目的项目,不需要分配一个包含所有旧项目和新项目的新内存块。只会分配少量的小对象(内部二叉树中的节点)。

这种便利是有代价的:不可变集合上的大多数操作至少比可变集合上的相同操作慢 10 倍。这种开销很可能在宏观计划中可以忽略不计,但您可能想自己分析和衡量它,并判断它是否有影响。

Category 类:

public class Category

    private ImmutableHashSet<Consumer> _consumers;

    public int Id  get; 
    public ImmutableHashSet<Consumer> Consumers => Volatile.Read(ref _consumers);

    public Category(int id)
    
        this.Id = id;
        _consumers = ImmutableHashSet.Create<Consumer>();
    

    public void SubscribeConsumer(Consumer consumer) =>
        ImmutableInterlocked.Update(ref _consumers, col => col.Add(consumer));

    public void UnsubscribeConsumer(Consumer consumer) =>
        ImmutableInterlocked.Update(ref _consumers, col => col.Remove(consumer));

注意Volatile.Read,它确保存储在_consumers 字段中的最新引用将立即对所有访问Consumers 属性的线程可见。

Consumer 类:

public class Consumer

    private readonly ActionBlock<Message> _block;
    private IImmutableList<Category> _categories;

    public string Name  get; 
    public IImmutableList<Category> Categories => Volatile.Read(ref _categories);

    public Consumer(string name)
    
        this.Name = name;
        _categories = ImmutableArray.Create<Category>();
        _block = new ActionBlock<Message>(async message =>
        
            if (!Categories.Any(cat => cat.Id == message.CategoryId)) return;
            // Process message...
        );
    

    public void SendMessage(Message message)
    
        bool accepted = _block.Post(message);
        Debug.Assert(accepted);
    

    public void SubscribeForCategory(Category category)
    
        ImmutableInterlocked.Update(ref _categories, col => col.Add(category));
        category.SubscribeConsumer(this);
    

    public void UnsubscribeForCategory(Category category)
    
        ImmutableInterlocked.Update(ref _categories, col => col.Remove(category));
        category.UnsubscribeConsumer(this);
    

注意SubscribeForCategory 方法还负责添加反向关系(类别-&gt; 消费者)。在上面的实现中,这两个关系不是原子添加的,这意味着观察者可以看到消费者订阅了一个类别,而该类别没有订阅消费者。根据您的描述,您的应用中似乎不存在这样的观察者,因此这种不一致可能并不重要。

Publisher 类需要保存类别列表,而不是消费者:

public class Publisher

    private readonly Dictionary<int, Category> _categories;

    public Publisher(IEnumerable<Category> categories)
    
        _categories = categories.ToDictionary(cat => cat.Id);
    

    public void Publish(Message message)
    
        var category = _categories[message.CategoryId];
        foreach (Consumer consumer in category.Consumers)
            consumer.SendMessage(message);
    

注意Publish 方法是多么简单。

【讨论】:

以上是关于在 TPL 数据流中动态订阅/取消订阅的主要内容,如果未能解决你的问题,请参考以下文章

EventBus事件通信框架 ( 订阅方法注册 | 注册 事件类型 - 订阅类 + 订阅方法 到指定集合 | 取消注册 数据准备 )

Apollo GraphQL 取消订阅似乎坏了

React Apollo - 取消订阅查询

合并 - 订阅者在第二次订阅时被静默取消

离子生命周期取消/订阅 Firebase 数据库流(使用异步管道)

访问用户之前在 iOS 上取消、升级或删除的订阅的接收数据信息