如何在 TPL/Dataflow 中发出笛卡尔积?
Posted
技术标签:
【中文标题】如何在 TPL/Dataflow 中发出笛卡尔积?【英文标题】:How to emit a cartesian product in TPL/Dataflow? 【发布时间】:2018-08-10 02:46:54 【问题描述】:我正在尝试实现以下行为:
/// <summary>
/// Posts two values to each input of a <c>CartesianProductBlock</c> and expects
/// every (left, right) pairing to arrive at the linked target, in any order.
/// </summary>
[TestMethod]
public async Task ProducesCartesianProductOfInputs()
{
    var block = new CartesianProductBlock<int, string>();
    var target = new BufferBlock<Tuple<int, string>>();
    var left = block.Left;
    var right = block.Right;
    block.LinkTo(target);
    var actual = new List<Tuple<int, string>>();

    // Post returns false when a block declines the message, so assert on it
    // rather than silently losing an input.
    Assert.IsTrue(left.Post(1));
    Assert.IsTrue(right.Post("a"));
    Assert.IsTrue(left.Post(2));
    Assert.IsTrue(right.Post("b"));

    // PROBLEM?: These can run before messages have been processed and appear to abort further processing.
    left.Complete();
    right.Complete();

    // Drain the target until it reports that no further output will arrive.
    while (await target.OutputAvailableAsync())
    {
        actual.Add(target.Receive());
    }

    var expected = new List<Tuple<int, string>>()
    {
        Tuple.Create(1, "a"),
        Tuple.Create(2, "a"),
        Tuple.Create(1, "b"),
        Tuple.Create(2, "b"),
    };

    // Order is not part of the contract; only the set of pairs is compared.
    CollectionAssert.AreEquivalent(expected, actual.ToList());
}
我当前的(部分)实现不起作用,我不知道为什么:
// A block that remembers every message it receives on two channels, and pairs every message on a channel to every message on the other channel
//
// NOTE(review): this is the original, non-working draft from the question. Only its
// syntax (braces, property accessors) has been restored; the logic is intentionally
// left as posted so that the discussion that follows still applies.
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private TransformManyBlock<T1, Tuple<T1, T2>> left;
    private TransformManyBlock<T2, Tuple<T1, T2>> right;

    private List<T1> leftReceived = new List<T1>();
    private List<T2> rightReceived = new List<T2>();

    private List<ITargetBlock<Tuple<T1, T2>>> targets = new List<ITargetBlock<Tuple<T1, T2>>>();

    private object lockObject = new object();

    // Input side for T1 messages.
    public ITargetBlock<T1> Left { get { return left; } }

    // Input side for T2 messages.
    public ITargetBlock<T2> Right { get { return right; } }

    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // Pair this input up with all received alternatives
                // NOTE(review): Select is deferred, so the returned sequence is
                // enumerated after the lock has been released.
                return rightReceived.Select(r => Tuple.Create(l, r));
            }
        });

        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                // Pair this input up with all received alternatives
                // NOTE(review): deferred enumeration, same caveat as the left side.
                return leftReceived.Select(l => Tuple.Create(l, r));
            }
        });

        Task.WhenAll(Left.Completion, Right.Completion).ContinueWith(_ =>
        {
            // TODO: Respect propagate completion linkOptions. Defaulting to propagation for now.
            foreach (var target in targets)
            {
                target.Complete();
            }
        });
    }

    // Never completed in this draft: nothing sets a result on the source.
    private TaskCompletionSource<int> completion = new TaskCompletionSource<int>();

    public Task Completion => completion.Task;

    public void Complete() { throw new NotImplementedException(); }

    public void Fault(Exception exception) { throw new NotImplementedException(); }

    // Links both inner blocks to the target; linkOptions is ignored in this draft.
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        left.LinkTo(target);
        right.LinkTo(target);
        targets.Add(target);
        return null; // TODO: Return something proper to allow unlinking
    }

    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }
}
我遇到了以下(可能相关的)问题:
它是不确定的:测试每次以不同的方式失败。
看来(通过添加日志记录,并且因为我得到了 3 到 6 条输出消息)对两个输入调用 Complete 会导致部分消息未被处理,尽管我的理解是它应该先让所有队列排空。(如果不是这样,那么我不知道该如何正确编写这个测试。)
很可能我的锁定方案是错误的/次优的,尽管我的目标是在尝试修复之前有一些大而粗糙的东西。
我单独对 TransformManyBlock 做的实验没有出现任何意外的结果,我不知道在这种情况下有什么不同。
【问题讨论】:
要检查一件小事:如果您使用 Post() 而不是 SendAsync(),请确保检查返回值并正确处理 false。我相信,除非您附加缓冲区,否则块会拒绝超出单个未处理消息的消息,这意味着 Post() 将返回 false 并且您发布的值将丢失。
啊,很好。在这种情况下,它们总是返回 true,但我添加了断言以供将来验证!
我开始怀疑是没有传播完成状态(completion)导致了我的问题,但我还不能清楚地表达出来。当我传播完成状态时,我似乎得到了更可靠的结果。
我尝试处理完成的最新版本的代码似乎效果更好:github.com/xaviershay/ori-tracker/blob/master/MapStitcher/…(留下原始帖子,因为不确定这是否是红鲱鱼)
【参考方案1】:
正如怀疑的那样,这与完成状态(completion)的传播有关。这是一个可以工作的版本,LinkTo 返回可正确释放的链接对象(IDisposable),并遵循链接选项:
// A block that remembers every message it receives on two channels, and pairs every message on a channel to every message on the other channel
/// <summary>
/// Source block that emits the cartesian product of everything posted to
/// <see cref="Left"/> and <see cref="Right"/>. Each pair is produced exactly once,
/// by whichever of its two members is processed later.
/// </summary>
/// <typeparam name="T1">Type of the messages accepted by <see cref="Left"/>.</typeparam>
/// <typeparam name="T2">Type of the messages accepted by <see cref="Right"/>.</typeparam>
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;

    private readonly List<T1> leftReceived = new List<T1>();
    private readonly List<T2> rightReceived = new List<T2>();

    // Guards leftReceived and rightReceived, which are touched by both inner blocks.
    private readonly object lockObject = new object();

    private readonly TaskCompletionSource<VoidResult> completion = new TaskCompletionSource<VoidResult>();

    /// <summary>Input side for <typeparamref name="T1"/> messages.</summary>
    public ITargetBlock<T1> Left { get { return left; } }

    /// <summary>Input side for <typeparamref name="T2"/> messages.</summary>
    public ITargetBlock<T2> Right { get { return right; } }

    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // ToList() materializes the pairs while the lock is still held, so the
                // result cannot observe later mutations of rightReceived.
                return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
            }
        });

        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
            }
        });

        // This block is done once both sides are done. Mirror the outcome of the inner
        // blocks instead of always reporting success, so faults are not swallowed.
        Task.WhenAll(left.Completion, right.Completion).ContinueWith(done =>
        {
            if (done.IsFaulted)
            {
                completion.TrySetException(done.Exception.InnerExceptions);
            }
            else if (done.IsCanceled)
            {
                completion.TrySetCanceled();
            }
            else
            {
                completion.TrySetResult(VoidResult.Instance);
            }
        });
    }

    /// <summary>Completes when both inner blocks have completed.</summary>
    public Task Completion => completion.Task;

    /// <summary>Signals both inputs that no more messages will be posted.</summary>
    public void Complete()
    {
        Left.Complete();
        Right.Complete();
    }

    /// <summary>Faults both inner blocks with <paramref name="exception"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
    public void Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        Left.Fault(exception);
        Right.Fault(exception);
    }

    /// <summary>
    /// Links both inner blocks to <paramref name="target"/>. Of the link options only
    /// <c>PropagateCompletion</c> is honored; it is applied once both sides are done.
    /// </summary>
    /// <returns>A handle that unlinks both inner blocks when disposed.</returns>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }
        if (linkOptions == null)
        {
            throw new ArgumentNullException(nameof(linkOptions));
        }

        // The inner links never propagate completion on their own: the target must
        // only be completed after BOTH sides have finished.
        var link = new Link(left.LinkTo(target), right.LinkTo(target));

        if (linkOptions.PropagateCompletion)
        {
            var sourcesDone = Task.WhenAll(left.Completion, right.Completion);
            Task.WhenAny(sourcesDone, link.Completion).ContinueWith(_ =>
            {
                // If the link has been disposed of, we should no longer propagate completeness.
                if (link.Completion.IsCompleted)
                {
                    return;
                }

                if (sourcesDone.IsFaulted)
                {
                    target.Fault(sourcesDone.Exception);
                }
                else
                {
                    target.Complete();
                }
            });
        }

        return link;
    }

    // NOTE(review): targets are linked to the inner blocks directly, so these members
    // are presumably never invoked through a link created by LinkTo — confirm before relying on it.
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }

    // Combines the two inner links into one handle and exposes a task that completes on disposal.
    private sealed class Link : IDisposable
    {
        private readonly IDisposable leftLink;
        private readonly IDisposable rightLink;
        private readonly TaskCompletionSource<VoidResult> completionSource = new TaskCompletionSource<VoidResult>();

        public Link(IDisposable leftLink, IDisposable rightLink)
        {
            this.leftLink = leftLink;
            this.rightLink = rightLink;
        }

        public Task Completion { get { return completionSource.Task; } }

        public void Dispose()
        {
            leftLink.Dispose();
            rightLink.Dispose();
            // TrySetResult keeps a repeated Dispose from throwing InvalidOperationException.
            completionSource.TrySetResult(VoidResult.Instance);
        }
    }

    // Placeholder result type for TaskCompletionSource when no value is carried.
    private sealed class VoidResult
    {
        public static readonly VoidResult Instance = new VoidResult();

        private VoidResult()
        {
        }
    }
}
【讨论】:
以上是关于如何在 TPL/Dataflow 中发出笛卡尔积?的主要内容,如果未能解决你的问题,请参考以下文章