在分布式系统中组织事件的执行并避免死锁

Posted

技术标签:

【中文标题】在分布式系统中组织事件的执行并避免死锁【英文标题】:Organizing execution of events in a distributed system and avoid deadlocks 【发布时间】:2021-03-03 00:38:56 【问题描述】:

我在系统中对事件进行优先级排序时遇到问题。 我有一个简单的类,可以订阅彼此的输出

public interface INode<TIn, TOut> : IBaseNode

    event EventHandler<TOut> Output; 
    //Note: subscribe just calls node.Output += this.OnInput
    void Subscribe(IBaseNode node);
    void OnInput(object sender, TIn input)
 

使用它,我可以通过订阅它们的输出将节点链接在一起

CarDealerNode.Subscribe(NewModelNode);
LoggerNode.Subscribe(CarDealerNode);

我的问题是,当事件触发时,它会以半不确定的广度优先方式发生。我想保持这些事件的执行顺序,以便以更动态的方式优先处理事件。

我的第一印象是使用一些优先级队列来对任务进行排序,但这可能会导致问题,因为较低优先级的事情可能永远不会执行

public class SynchronizationInfo

    public SyncPriority Priority  get; set;  = SyncPriority.Normal;
    public object Sender  get; set; 
    public DateTime Created  get; set;  = DateTime.Now;
    public Task Operation  get; set; 


public class SynchronizationContext

    public PriorityQueue<SynchronizationInfo> ExecutionQueue = new PriorityQueue<SynchronizationInfo>();
    //...

但是我仍然难以掌握确保不会发生死锁的方法,如果添加高优先级的内容比执行该优先级的速度更快,则不会执行低优先级的事件.

此外,仅仅因为某些事情的优先级过低并不意味着所有优先级较高的事情都应该优先处理,时间是一个重要因素。

是否有可靠有效的推荐方式来处理任务的优先执行。以一种没有任务遇到死锁的方式,(例如,时间增加优先级以提高优先级以确保执行)?

【问题讨论】:

您可能仍然可以使用优先级队列,但是优先级应该是取决于“本机”优先级 (SyncPriority) 和自项目创建以来经过的时间的数字。所以应该在出队事件期间计算优先级(队列中的所有项目)。 @Evk 好点,我可能可以根据枚举计算优先级以及该任务需要执行的时间。我也可以想到其他几种不同的方法,比如使用Queue[] 并以不同的速率从每个队列中调用 take 并拼接提要。但我是分布式系统的新手,所以我真正希望有人能给我一个明确的广泛使用的答案。也许这个问题比我想象的更不常见,但我觉得这将是许多实时系统的优先事项。 另一种选择可能是在特定间隔重新计算优先级,足够大,不会对性能产生太大影响。然后低优先级的项目永远不会死锁,因为迟早它们的优先级将被提升到足以出列。不同之处不是每次出队都这样做(如我的第一条评论),而是间隔更大。 您可以使用动态优先级队列 - 一个允许项目动态重新计算其优先级的 pq。 (出于显而易见的原因,您一次只能对 1 个项目执行此操作。)您认为这值得尝试吗?前段时间我负责一个实时调度应用程序,它做到了这一点,它工作正常。您只需要真实小心,将 pq 保持在原始状态。 @johnny5 SortedDictionary 不适合这个恕我直言。 pq 是完美的,因为(我相信你知道)将其中的数量加倍仅意味着最多 1 个比较来平衡它。我认为您会对基本容器的简单程度感到惊讶,甚至允许即时重新平衡。但是你比我更了解你的用例:-) 【参考方案1】:

既然可以有更多队列,为什么还要设置一个队列?

这适用于任何恒定的优先级计数,尽管优先级足够低。 (据我所知,您有一个enum 给他们,所以可能只有几个优先事项)。另外,我们不会使用优先级队列,而是使用几个普通队列。

为每个优先级创建一个队列。该任务根据其优先级注册到队列中。对于每个任务,都会像 @Funk 一样存储创建时间戳。 当您希望处理下一个任务时,请检查每个队列中可用元素的时间戳。 这使您可以检测到过期的低优先级任务并提高它们的优先级。

可以通过多种方式增加优先级。例如:

当任务在队列中的时间足够长时直接开始执行。 (例如 high_creation_time > medium_creation_time + C -> 运行中优先级任务) 将任务重新调度到优先级更高的队列中,而不是直接运行。

哪种方式更适合你,有点难说。

这种方法的复杂性:

添加新任务:O(1) - 只需将其添加到相应的队列中 运行任务:O(1) - 假设我们有恒定数量的优先级,这只是检查所有队列并找到接下来应该运行的元素的问题。 重新安排任务(如果适用) - 一次推送和一次弹出,因此 O(1)

【讨论】:

这与某些 OS 调度程序(多级队列调度)所做的类似 - 尽管更简单 - 恕我直言,这是处理手头问题的最佳方法。 实际上,这是我回答这个问题的灵感。不幸的是,快速搜索一篇易于阅读的文章并没有产生任何结果。如果您知道任何事情,我很乐意提供链接。 我同意,这似乎类似于我的一个想法,即拥有一组队列并以不同的速率从所有队列中拉出。但是这种方式有点复杂,因为我只在需要时才从较低的优先级中提取【参考方案2】:

是否有可靠有效的推荐方式来处理任务的优先执行。以一种没有任务遇到死锁的方式,(例如,时间增加优先级以提高优先级以确保执行)?

“时间增加优先级”方法的问题是需要一直重新计算优先级队列。

让我们回顾一下优先级队列的正常用例。以下表示数据结构中有序项的列表:

Priority = SyncPriority.High, Created = "2021-03-05 12:34:01", ... Priority = SyncPriority.High, Created = "2021-03-05 12:34:04", ... Priority = SyncPriority.High, Created = "2021-03-05 12:34:06", ... Priority = SyncPriority.Normal, Created = "2021-03-05 12:34:02", ... Priority = SyncPriority.Normal, Created = "2021-03-05 12:34:05", ... Priority = SyncPriority.Low, Created = "2021-03-05 12:34:03", ...

我们可以判断每秒发生的事件,从HighNormalLow、...优先级开始。当添加下一个High 优先级时,我们可以将它插入到具有Normal 优先级的第一个项目之前。数据结构的效率是基于所有其他项目的顺序不变的事实。

如果我们将经过的时间(即增长的时间间隔)添加到组合中,而不是创建时间,则必须在每次请求下一个最高优先级项目时重新计算顺序。密钥基本上在时间上变为线性而不是常数。

为了避免这个难题及其固有的复杂性,您可以分而治之。允许使用窗口系统的几种简化中的一种。

最大优先级队列公开以下成员:

插入 删除最大值

第一个简化示例,限制优先级队列中的元素数量。

public class PriorityQueueWithMaxElements

    private readonly int _maxElementsInPriorityQueue;

    private readonly PriorityQueue<SynchronizationInfo> _priorityQueue;
    private readonly Queue<SynchronizationInfo> _backingQueue;

    public PriorityQueueWithMaxElements(int maxElementsInQueue)
    
        _maxElementsInPriorityQueue = maxElementsInQueue;

        _priorityQueue = new PriorityQueue<SynchronizationInfo>();
        _backingQueue = new Queue<SynchronizationInfo>();
    

    public void Insert(SynchronizationInfo info)
    
        if (_backingQueue.Any() || _priorityQueue.Count == _maxElementsInPriorityQueue)
        
            _backingQueue.Enqueue(info);
        
        else
        
            _priorityQueue.Insert(info);
        
    

    public SynchronizationInfo RemoveMax()
    
        var max = _priorityQueue.RemoveMax();

        if (max == null && _backingQueue.Count > 0)
        
            var numberOfItems = Math.Min(_maxElementsInPriorityQueue, _backingQueue.Count);
            for (var i = 0; i < numberOfItems; i++)
            
                _priorityQueue.Insert(_backingQueue.Dequeue());
            

            max = _priorityQueue.RemoveMax();
        

        return max;
    

第二个例子,对优先队列中元素之间的最大时间延迟进行限制。

public class PriorityQueueWithMaxDelay

    private readonly TimeSpan _maxDelay;
    
    private readonly PriorityQueue<SynchronizationInfo> _priorityQueue;
    private readonly Queue<SynchronizationInfo> _backingQueue;

    // DateTime of oldest item in the priority-queue.
    private DateTime? _baseDateTime;

    public PriorityQueueWithMaxDelay(TimeSpan maxDelay)
    
        _maxDelay = maxDelay;

        _priorityQueue = new PriorityQueue<SynchronizationInfo>();
        _backingQueue = new Queue<SynchronizationInfo>();
    

    public void Insert(SynchronizationInfo info)
    
        if (_baseDateTime == null)
        
            _baseDateTime = info.Created;
        

        if (_backingQueue.Any() || !IsWithinDelay(info))
        
            _backingQueue.Enqueue(info);
        
        else
        
            _priorityQueue.Insert(info);
        
    

    public SynchronizationInfo RemoveMax()
    
        var max = _priorityQueue.RemoveMax();

        if (max == null && _backingQueue.Count > 0)
        
            _baseDateTime = _backingQueue.Peek().Created;

            var numberOfItems = _backingQueue.TakeWhile(IsWithinDelay).Count();
            for (var i = 0; i < numberOfItems; i++)
            
                _priorityQueue.Insert(_backingQueue.Dequeue());
            

            max = _priorityQueue.RemoveMax();
        

        if (_priorityQueue.Count == 0)
        
            _baseDateTime = null;
        


        return max;
    

    public bool IsWithinDelay(SynchronizationInfo info)
        => info.Created - _baseDateTime < _maxDelay;

请注意,一旦超出界限,优先级队列将在再次补充之前完全清空。这是为了避免低优先级的项目无限期地留在队列中。我将多线程支持作为练习留给读者。可以说,高性能实现将更容易实现,然后您必须重新计算每个请求上所有项目的顺序。

【讨论】:

有趣,这肯定会工作 +1。我唯一的保留是枚举其中一个队列来填充优先级队列似乎是浪费的操作,我拥有的另一个是后备队列可以在顶部填充低优先级的东西,所以它不能完全控制 但是无论如何,这会起作用,所以谢谢,当赏金用完时,我会记住这一点 另外,让我们考虑添加高优先级的东西比处理它们的速度快一点的情况。然后,当任一回退机制触发时,您可能会积压大量的中优先级或低优先级任务——这些任务将同时运行。当一些最近添加的 low_priority 任务可能比队列被清空时出现的 high_priority 任务晚得多时,这可能会导致令人讨厌的优先级反转。但是,在这种情况下,无论如何您都可能注定要失败...【参考方案3】:

目前,事件按照它们注册的顺序执行。然而,这只是它现在的实现方式,我不建议任何人依赖这种行为,因为在未来的版本中可能不会如此。

.NET 语言将事件实现隐藏在语法障碍之后,尽管事件处理基于相对明确定义的委托系统。

从这里检查委托类: Delegate Class

即使在附加事件之后,您也可以通过分离所有处理程序,然后按所需顺序重新附加来更改顺序。

注意:您可以通过更改事件上的addremove 操作来指定其他一些行为来覆盖事件的默认行为。然后,您可以将事件处理程序保存在您自己管理的列表中,并根据您喜欢的任何规则处理触发顺序。

【讨论】:

感谢您的回复,但我正在寻找一种管理任务的可靠方法

以上是关于在分布式系统中组织事件的执行并避免死锁的主要内容,如果未能解决你的问题,请参考以下文章

Redis实现分布式锁

使用Redis实现分布式锁

在分布式系统中使用 DDD

分布式系统的进程组织

分布式系统的进程组织

多线程之死锁定位及故障分析,尽可能避免线上事故(十三)