如何在 TPL/Dataflow 中发出笛卡尔积?

Posted

技术标签:

【中文标题】如何在 TPL/Dataflow 中发出笛卡尔积?【英文标题】:How to emit a cartesian product in TPL/Dataflow? 【发布时间】:2018-08-10 02:46:54 【问题描述】:

我正在尝试实现以下行为:

// Posts two values to each input of a CartesianProductBlock and expects the linked
// target to receive all four (left, right) pairs, in any order.
[TestMethod]
public async Task ProducesCartesianProductOfInputs()

    var block = new CartesianProductBlock<int, string>();
    var target = new BufferBlock<Tuple<int, string>>();

    var left = block.Left;
    var right = block.Right;

    // Single-argument LinkTo: no DataflowLinkOptions are supplied by this test.
    block.LinkTo(target);

    var actual = new List<Tuple<int, string>>();

    // Interleave posts to both inputs; each Post must be accepted.
    Assert.IsTrue(left.Post(1));
    Assert.IsTrue(right.Post("a"));
    Assert.IsTrue(left.Post(2));
    Assert.IsTrue(right.Post("b"));

    // PROBLEM?: These can run before messages have been processed and appear to abort further processing.
    left.Complete();
    right.Complete();

    // Drain the target until it reports that no further output is available.
    // NOTE(review): this loop presumably only ends once `target` itself has been completed;
    // confirm that completion actually reaches `target` through the link created above.
    while (await target.OutputAvailableAsync())
    
        actual.Add(target.Receive());
    

    var expected = new List<Tuple<int, string>>()
    
        Tuple.Create(1, "a"),
        Tuple.Create(2, "a"),
        Tuple.Create(1, "b"),
        Tuple.Create(2, "b"),
    ;

    // Order-insensitive comparison of the received pairs against the full product.
    CollectionAssert.AreEquivalent(expected, actual.ToList());

我当前的(部分)实现不起作用,我不知道为什么:

// A block that remembers every message it receives on two channels, and pairs every message on a channel to every message on the other channel
// (This is the question's partial implementation, which the author reports as not working.)
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>

    // Inner blocks: each one turns a single input into the pairs it forms with the other side.
    private TransformManyBlock<T1, Tuple<T1, T2>> left;
    private TransformManyBlock<T2, Tuple<T1, T2>> right;

    // Every value seen so far on each input, plus every target passed to LinkTo.
    private List<T1> leftReceived = new List<T1>();
    private List<T2> rightReceived = new List<T2>();
    private List<ITargetBlock<Tuple<T1, T2>>> targets = new List<ITargetBlock<Tuple<T1, T2>>>();

    // Guards leftReceived and rightReceived inside the two transform delegates.
    private object lockObject = new object();

    // The two inputs of the block, exposed as plain targets.
    public ITargetBlock<T1> Left  get  return left;  
    public ITargetBlock<T2> Right  get  return right;  

    public CartesianProductBlock()
    
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        
            lock (lockObject)
            
                leftReceived.Add(l);
                // Pair this input up with all received alternatives
                // NOTE(review): Select is deferred and nothing materializes it here, so rightReceived
                // is presumably enumerated after the lock has been released - confirm.
                return rightReceived.Select(r => Tuple.Create(l, r));
            
        );
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        
            lock(lockObject)
            
                rightReceived.Add(r);
                // Pair this input up with all received alternatives
                // NOTE(review): same deferred-enumeration concern as the left delegate.
                return leftReceived.Select(l => Tuple.Create(l, r));
            
        );
        // Once both inputs have completed, complete every target registered through LinkTo.
        Task.WhenAll(Left.Completion, Right.Completion).ContinueWith(_ => 
            // TODO: Respect propagate completion linkOptions. Defaulting to propagation for now.
            foreach (var target in targets)
            
                target.Complete();
            
        );
    

    // Backs the Completion property; nothing in this class ever completes it.
    private TaskCompletionSource<int> completion = new TaskCompletionSource<int>();

    public Task Completion => completion.Task;

    // Not implemented in this version: both members always throw.
    public void Complete()  throw new NotImplementedException(); 
    public void Fault(Exception exception)  throw new NotImplementedException(); 

    // Links both inner blocks to the target and records it for completion propagation.
    // linkOptions is not consulted, and the returned value is null, so the link cannot be disposed.
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    
        left.LinkTo(target);
        right.LinkTo(target);
        targets.Add(target);
        return null; // TODO: Return something proper to allow unlinking
    

    // The three reservation/consumption members below are not implemented and always throw.
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    
        throw new NotImplementedException();
    

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    
        throw new NotImplementedException();
    

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    
        throw new NotImplementedException();
    

我遇到了以下(可能相关的)问题:

结果是不确定的:测试每次以不同的方式失败。 从添加的日志以及输出消息数量在 3 到 6 条之间来看,对两个输入调用 Complete 似乎导致部分消息没有被处理,尽管按我的理解,它应该先让所有队列排空。(如果不是这样,那我就不知道该如何正确编写这个测试了。) 我的加锁方案很可能是错误的或不够理想的,不过我的目标是先做出一个粗糙的大致版本,然后再尝试修正它。 我单独对 TransformManyBlock 所做的实验没有出现任何出乎意料的结果,所以我不知道这种情况下有什么不同。

【问题讨论】:

有一件小事需要检查:如果您使用 Post() 而不是 SendAsync(),请确保检查返回值并正确处理 false。我相信,除非您附加缓冲区,否则块会拒绝超出单个未处理消息的消息,这意味着 Post() 将返回 false,并且您发布的值将丢失。 啊,说得好。在这种情况下,它们总是返回 true,但我添加了断言以便将来验证! 我开始怀疑没有传播完成状态(completion)是导致我的问题的原因,但我还不能清楚地表达出来。当我传播完成状态时,似乎得到了更可靠的结果。 我尝试处理完成状态的最新版本代码似乎效果更好:github.com/xaviershay/ori-tracker/blob/master/MapStitcher/…(保留原始帖子,因为不确定这是否只是一个无关的干扰因素,即 red herring) 【参考方案1】:

正如所怀疑的那样,这与完成状态(completion)的传播有关。下面是一个可以工作的版本,其中 LinkTo 返回了可正确释放(IDisposable)的链接对象,并且遵循了链接选项(link options):

/// <summary>
/// A source block that remembers every message it receives on its two inputs
/// (<see cref="Left"/> and <see cref="Right"/>) and pairs each new message with every
/// message already received on the other input, so that linked targets are offered
/// the cartesian product of both inputs.
/// </summary>
/// <remarks>
/// Targets are linked directly to the two inner blocks, so the reservation members of
/// <see cref="ISourceBlock{TOutput}"/> on this wrapper are intentionally left unimplemented.
/// </remarks>
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;

    // Everything seen so far on each input; both lists are guarded by lockObject.
    private readonly List<T1> leftReceived = new List<T1>();
    private readonly List<T2> rightReceived = new List<T2>();

    private readonly object lockObject = new object();

    // Run continuations asynchronously so awaiting code never executes inline on the
    // thread that finishes the block.
    private readonly TaskCompletionSource<VoidResult> completion =
        new TaskCompletionSource<VoidResult>(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>Input for the first component of each produced pair.</summary>
    public ITargetBlock<T1> Left
    {
        get { return left; }
    }

    /// <summary>Input for the second component of each produced pair.</summary>
    public ITargetBlock<T2> Right
    {
        get { return right; }
    }

    /// <summary>Creates the two inner blocks and wires up <see cref="Completion"/>.</summary>
    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // ToList() materializes the pairs while the lock is held, so the result is a
                // snapshot that cannot observe later additions to rightReceived.
                return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
            }
        });
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
            }
        });

        // Mirror the combined outcome of both inputs instead of always reporting success.
        Task.WhenAll(left.Completion, right.Completion).ContinueWith(done =>
        {
            if (done.IsFaulted)
            {
                completion.TrySetException(done.Exception.InnerExceptions);
            }
            else if (done.IsCanceled)
            {
                completion.TrySetCanceled();
            }
            else
            {
                completion.TrySetResult(VoidResult.Instance);
            }
        }, TaskScheduler.Default);
    }

    /// <summary>
    /// Finishes once both inputs have completed; faulted or cancelled if either input
    /// ended that way.
    /// </summary>
    public Task Completion => completion.Task;

    /// <summary>Signals that no more messages will be posted to either input.</summary>
    public void Complete()
    {
        Left.Complete();
        Right.Complete();
    }

    /// <summary>Faults both inputs with <paramref name="exception"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
    public void Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        // TransformManyBlock implements Fault explicitly, hence the interface casts.
        ((IDataflowBlock)left).Fault(exception);
        ((IDataflowBlock)right).Fault(exception);
    }

    /// <summary>
    /// Links both inner blocks to <paramref name="target"/>.
    /// </summary>
    /// <param name="target">The block that receives the produced pairs.</param>
    /// <param name="linkOptions">
    /// <c>Append</c> is forwarded to both inner links. <c>PropagateCompletion</c> is handled
    /// here, because the target may only finish after BOTH inputs are done.
    /// <c>MaxMessages</c> is not honored.
    /// </param>
    /// <returns>A disposable that removes both inner links; disposing it more than once is safe.</returns>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        // Validate before linking so a bad call cannot leave stray inner links behind.
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }
        if (linkOptions == null)
        {
            throw new ArgumentNullException(nameof(linkOptions));
        }

        // PropagateCompletion is deliberately NOT forwarded: an inner link would finish the
        // target as soon as one side completed.
        var innerOptions = new DataflowLinkOptions { Append = linkOptions.Append };
        var link = new Link(left.LinkTo(target, innerOptions), right.LinkTo(target, innerOptions));

        if (linkOptions.PropagateCompletion)
        {
            Task sourcesDone = Completion;
            Task.WhenAny(sourcesDone, link.Completion).ContinueWith(_ =>
            {
                // If the link has been disposed of, we should no longer propagate completion.
                if (link.Completion.IsCompleted)
                {
                    return;
                }

                if (sourcesDone.IsFaulted)
                {
                    target.Fault(sourcesDone.Exception);
                }
                else
                {
                    target.Complete();
                }
            }, TaskScheduler.Default);
        }

        return link;
    }

    /// <summary>Not implemented: targets are linked to the inner blocks, not to this wrapper.</summary>
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented: targets are linked to the inner blocks, not to this wrapper.</summary>
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented: targets are linked to the inner blocks, not to this wrapper.</summary>
    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }

    /// <summary>Combines the two inner links into one disposable and exposes when it was disposed.</summary>
    private sealed class Link : IDisposable
    {
        private readonly IDisposable leftLink;
        private readonly IDisposable rightLink;

        private readonly TaskCompletionSource<VoidResult> completionSource =
            new TaskCompletionSource<VoidResult>(TaskCreationOptions.RunContinuationsAsynchronously);

        public Link(IDisposable leftLink, IDisposable rightLink)
        {
            this.leftLink = leftLink;
            this.rightLink = rightLink;
        }

        /// <summary>Finishes when the link has been disposed.</summary>
        public Task Completion
        {
            get { return completionSource.Task; }
        }

        public void Dispose()
        {
            // TrySetResult succeeds only for the first caller, which makes Dispose idempotent;
            // SetResult would throw InvalidOperationException on a second call.
            if (!completionSource.TrySetResult(VoidResult.Instance))
            {
                return;
            }

            leftLink.Dispose();
            rightLink.Dispose();
        }
    }

    /// <summary>Placeholder result type for task completion sources that carry no value.</summary>
    private sealed class VoidResult
    {
        public static readonly VoidResult Instance = new VoidResult();

        private VoidResult()
        {
        }
    }
}

【讨论】:

以上是关于如何在 TPL/Dataflow 中发出笛卡尔积?的主要内容,如果未能解决你的问题,请参考以下文章

TPL Dataflow 如何与“全局”数据同步

TPL Dataflow 管道中的图像刷新问题

IO读写操作的TPL Dataflow实现中的内存问题

TPL DataFlow 一对一处理

TPL Dataflow,数据块收到第一项时的通知

如何在 PostgreSQL 中获得随机笛卡尔积?