c#数据流或任务,消费消息并行处理

Posted

技术标签:

【中文标题】c#数据流或任务,消费消息并行处理【英文标题】:c# Dataflow or Tasks, consuming messages to process in parallel 【发布时间】:2013-09-23 04:30:18 【问题描述】:

如果我想从外部队列中获取消息,请使用 Redis 或类似名称。让一个线程不断检查队列并将消息发送到相关的BroadcastBlock 进行处理是否更好(例如)

if (message.type == "person")
    personBroadcast.post(message);
else
    monsterBroadcast.post(message);

然后哪个会广播到管道进行处理,或者说 4 Tasks 更好,它们都从队列中取出消息并自己处理它们?

在第一种情况下,TPL DataFlow 块实际上是并行处理的,还是它们需要在单独的任务中?我正在尝试决定哪种方法可以充分利用资源。建议表示赞赏。

【问题讨论】:

【参考方案1】:

使用filtered linking between blocks 可以更轻松地完成:

// this can be a BroadcastBlock too
var broadCastBlock = new BufferBlock();

// filter only persons
broadCastBlock.LinkTo(personBroadcast, m => m.type == "person");

// send all other messages as monsters
broadCastBlock.LinkTo(monsterBroadcast);

每个块都有一个对应的任务,在线程池中执行。默认情况下,任务不会重新启动,但您可以使用 DataflowBlockOptions.MaxMessagesPerTask 属性更改此行为。这将不会影响Completion 任务。

因此您不需要为TPL Dataflow 执行任何额外任务,因为它会自行创建所需的任务。

【讨论】:

【参考方案2】:

您的用例听起来像是一个破坏者框架可以派上用场的用例。简而言之,disruptor 使用单个线程来监听事件,然后将它们分派给处理程序(定义中的广播块)。你可以在https://code.google.com/p/disruptor-net/找到一个.net版本的disruptor

【讨论】:

以上是关于c#数据流或任务,消费消息并行处理的主要内容,如果未能解决你的问题,请参考以下文章

C# 并行开发总结

真正的并行处理[关闭]

如何在基于 C# 的 Windows 服务中处理以不同时间间隔并行运行的多个任务?

RabbitMQ入门教程——工作队列

C#并发编程之初识并行编程

flink调优之压测任务的合理并行度