TPL数据流处理N条最新消息
Posted
技术标签:
【中文标题】TPL数据流处理N条最新消息【英文标题】:TPL dataflow process N latest messages 【发布时间】:2021-10-21 12:39:16 【问题描述】:我正在尝试创建某种队列来处理收到的 N 条最新消息。现在我有这个:
private static void SetupMessaging()
_messagingBroadcastBlock = new BroadcastBlock<string>(msg => msg, new ExecutionDataflowBlockOptions
//BoundedCapacity = 1,
EnsureOrdered = true,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1
);
_messagingActionBlock = new ActionBlock<string>(msg =>
Console.WriteLine(msg);
Thread.Sleep(5000);
, new ExecutionDataflowBlockOptions
BoundedCapacity = 2,
EnsureOrdered = true,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1
);
_messagingBroadcastBlock.LinkTo(_messagingActionBlock, new DataflowLinkOptions PropagateCompletion = true );
_messagingBroadcastBlock.LinkTo(DataflowBlock.NullTarget<string>());
问题是如果我将 1,2,3,4,5 发布到它,我会得到 1,2,5,但我希望它是 1,4,5。欢迎提出任何建议。UPD 1 我能够使以下解决方案起作用
class FixedCapacityActionBlock<T>
private readonly ActionBlock<CancellableMessage<T>> _actionBlock;
private readonly ConcurrentQueue<CancellableMessage<T>> _inputCollection = new ConcurrentQueue<CancellableMessage<T>>();
private readonly int _maxQueueSize;
private readonly object _syncRoot = new object();
public FixedCapacityActionBlock(Action<T> act, ExecutionDataflowBlockOptions opt)
var options = new ExecutionDataflowBlockOptions
EnsureOrdered = opt.EnsureOrdered,
CancellationToken = opt.CancellationToken,
MaxDegreeOfParallelism = opt.MaxDegreeOfParallelism,
MaxMessagesPerTask = opt.MaxMessagesPerTask,
NameFormat = opt.NameFormat,
SingleProducerConstrained = opt.SingleProducerConstrained,
TaskScheduler = opt.TaskScheduler,
//we intentionally ignore this value
//BoundedCapacity = opt.BoundedCapacity
;
_actionBlock = new ActionBlock<CancellableMessage<T>>(cmsg =>
if (cmsg.CancellationTokenSource.IsCancellationRequested)
return;
act(cmsg.Message);
, options);
_maxQueueSize = opt.BoundedCapacity;
public bool Post(T msg)
var fullMsg = new CancellableMessage<T>(msg);
//what if next task starts here?
lock (_syncRoot)
_inputCollection.Enqueue(fullMsg);
var itemsToDrop = _inputCollection.Skip(1).Except(_inputCollection.Skip(_inputCollection.Count - _maxQueueSize + 1));
foreach (var item in itemsToDrop)
item.CancellationTokenSource.Cancel();
CancellableMessage<T> temp;
_inputCollection.TryDequeue(out temp);
return _actionBlock.Post(fullMsg);
和
class CancellableMessage<T> : IDisposable
public CancellationTokenSource CancellationTokenSource get; set;
public T Message get; set;
public CancellableMessage(T msg)
CancellationTokenSource = new CancellationTokenSource();
Message = msg;
public void Dispose()
CancellationTokenSource?.Dispose();
虽然这可行并且实际上完成了这项工作,但这个实现看起来很脏,也可能不是线程安全的。
【问题讨论】:
您能否更详细地解释一下这段代码的用途? 【参考方案1】:这是一个 TransformBlock
和 ActionBlock
实现,只要收到较新的消息并达到 BoundedCapacity
限制,就会丢弃其队列中最旧的消息。它的行为与配置了BoundedChannelFullMode.DropOldest
的Channel
非常相似。
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockDropOldest<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
if (transform == null) throw new ArgumentNullException(nameof(transform));
dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
var cancellationToken = dataflowBlockOptions.CancellationToken;
var queue = new Queue<TInput>(Math.Max(0, boundedCapacity));
var outputBlock = new BufferBlock<TOutput>(new DataflowBlockOptions()
BoundedCapacity = boundedCapacity,
CancellationToken = cancellationToken
);
if (boundedCapacity != DataflowBlockOptions.Unbounded)
dataflowBlockOptions.BoundedCapacity = checked(boundedCapacity * 2);
// After testing, at least boundedCapacity + 1 is required.
// Make it double to be sure that all non-dropped messages will be processed.
var transformBlock = new ActionBlock<object>(async _ =>
TInput item;
lock (queue)
if (queue.Count == 0) return;
item = queue.Dequeue();
var result = await transform(item).ConfigureAwait(false);
await outputBlock.SendAsync(result, cancellationToken).ConfigureAwait(false);
, dataflowBlockOptions);
dataflowBlockOptions.BoundedCapacity = boundedCapacity; // Restore initial value
var inputBlock = new ActionBlock<TInput>(item =>
var droppedEntry = (Exists: false, Item: (TInput)default);
lock (queue)
transformBlock.Post(null);
if (queue.Count == boundedCapacity) droppedEntry = (true, queue.Dequeue());
queue.Enqueue(item);
if (droppedEntry.Exists) droppedMessages?.Report(droppedEntry.Item);
, new ExecutionDataflowBlockOptions()
CancellationToken = cancellationToken
);
PropagateCompletion(inputBlock, transformBlock);
PropagateFailure(transformBlock, inputBlock);
PropagateCompletion(transformBlock, outputBlock);
_ = transformBlock.Completion.ContinueWith(_ => lock (queue) queue.Clear(); ,
TaskScheduler.Default);
return DataflowBlock.Encapsulate(inputBlock, outputBlock);
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
try await source.Completion.ConfigureAwait(false); catch
var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (exception != null) target.Fault(exception); else target.Complete();
async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
try await source.Completion.ConfigureAwait(false); catch
if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockDropOldest<TInput, TOutput>(
Func<TInput, TOutput> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
return CreateTransformBlockDropOldest(item => Task.FromResult(transform(item)),
dataflowBlockOptions, droppedMessages);
// ActionBlock equivalent
public static ITargetBlock<TInput>
CreateActionBlockDropOldest<TInput>(
Func<TInput, Task> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
if (action == null) throw new ArgumentNullException(nameof(action));
var block = CreateTransformBlockDropOldest<TInput, object>(
async item => await action(item).ConfigureAwait(false); return null; ,
dataflowBlockOptions, droppedMessages);
block.LinkTo(DataflowBlock.NullTarget<object>());
return block;
// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
CreateActionBlockDropOldest<TInput>(
Action<TInput> action,
ExecutionDataflowBlockOptions dataflowBlockOptions = null,
IProgress<TInput> droppedMessages = null)
return CreateActionBlockDropOldest(
item => action(item); return Task.CompletedTask; ,
dataflowBlockOptions, droppedMessages);
想法是将排队的项目存储在辅助Queue
中,并将虚拟(空)值传递给内部ActionBlock<object>
。该块忽略作为参数传递的项目,并从队列中取一个项目,如果有的话。 Αlock
用于确保队列中所有未丢弃的项目最终都会被处理(当然除非发生异常)。
还有一个额外的功能。可选的IProgress<TInput>
droppedMessages
参数允许在每次删除消息时接收通知。
使用示例:
_messagingActionBlock = CreateActionBlockDropOldest<string>(msg =>
Console.WriteLine($"Processing: msg");
Thread.Sleep(5000);
, new ExecutionDataflowBlockOptions
BoundedCapacity = 2,
, new Progress<string>(msg =>
Console.WriteLine($"Message dropped: msg");
));
【讨论】:
【参考方案2】:TPL Dataflow
不适合 Last N messages
,因为它是队列或管道 (FIFO),而不是堆栈 (LIFO)。您真的需要使用数据流库来执行此操作吗?
使用ConcurrentStack<T>
更容易,您只需引入一个生产者任务,它会发布到堆栈,以及一个消费者任务,它从堆栈获取消息,而处理的数量少于N
(More about Producer-Consumer) .
如果您需要TPL Dataflow
,您可以在消费者任务中使用它来开始处理最后的消息,但不能在生产者中使用,因为它确实不是它的本意使用方式。此外,还有其他一些基于事件架构的库,它们可能更适合您的问题。
【讨论】:
以上是关于TPL数据流处理N条最新消息的主要内容,如果未能解决你的问题,请参考以下文章