如何设计:具有多线程信号的单个阻塞工作者

Posted

技术标签:

【中文标题】如何设计:具有多线程信号的单个阻塞工作者【英文标题】:How to design: Single blocking worker with multiple threads signalling for work 【发布时间】:2021-11-07 13:20:19 【问题描述】:

我有一个工人,它在运行时会收集所有可用的工作并对其进行批处理。因为它不应该轮询可用的工作,所以它必须在某个地方阻塞,然后运行并返回阻塞。

同时,我有一些向系统添加工作的操作,有时比工作人员的处理速度要快得多。工作不会保存在内存中,因此不可能有工作队列。因此,每个操作都应该向工作人员发出至少有一些工作可用的信号。

1 Worker                N Producers
  ___                       ____
 /   \                     /    \
|    WAIT <------------ SIGNAL   |
|     |                   |      |
^     V                   ^      V
|     |                   |      |
|    WORK <- DATABASE <- ADD     |
 \___/                     \____/

我还没有找到用标准工具来表达这一点的方法,我需要它至少可以在 C#、Swift 和 Typescript 中实现。我检查的所有并发 API 都必须平衡,因此不允许有比等待更多的信号。类似地,我不想使用队列,因为它可能会变得太大并且不能很好地解决问题。

问题

在不深入代码的情况下,我应该使用什么类型、类、方法或算法来实现这个场景?

有没有什么方法可以结合现有的并发 api,如锁、信号量、原子类型来实现我的目标?

为什么没有一个信号量可以发出比等待更多的信号(比如等待会清除所有额外的信号)?

【问题讨论】:

关于 C# 可用的工具,这看起来像是一个可以用BlockingCollection&lt;T&gt;(同步)或Channel&lt;T&gt;(异步)解决的问题。这两个组件都支持背压(有限容量)。 任何挂起线程的方法都不是最理想的。虽然存在涉及这些“消费者 - 提供者”模式(也在 .NET 中建议)的遗留线程技术,这可能会导致挂起线程的数量不受限制,但这对于现代设计(恕我直言)来说是不行的 - 无论它是否正在运行在设备、台式机或服务器上。因此,更好的解决方案是(非阻塞)事件驱动,可能使用 FRP,即来自 Rx 系列的一些工具、Swift Combine 等。FRP 在所有提到的平台上都可用。在任何情况下:async ;) 你应该阅读How to Ask。你的解释我一点都不清楚。 @Enigmativity 我阅读了 HowToAsk,但我不知道如何才能更合规。由于这不是一个特定的代码问题,我无法给出代码示例,并且我试图使主题尽可能定义,而不会使其太长。 Zoulias:我说明了不使用队列的原因。 CouchDeveloper:您可能错过了我只需要一个有时被暂停的工人。轮询绝对不是 ios 框架的选项。 @AndreasPardeike - 一个好问题可以在几分钟内得到答案。我真的不明白你对你想要做什么的解释。也许您可以尝试改写? 【参考方案1】:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class consumeprodue<BlockingQue> 
    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
    public static void main(String[] args) 
        Thread p = new Thread() 
            public void run() 
                for (int i = 0; i < 10; i++) 
                    try 
                        System.out.println(getName() + " produced :" + i);
                        queue.put(i);
                        Thread.sleep(200);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                

            

        ;
        Thread c = new Thread() 

            public void run() 

                try 
                    while (true) 
                        System.out.println(getName() + "consume :" + queue.take());
                    

                 catch (InterruptedException e) 

                    e.printStackTrace();
                
            

        ;
        Thread d = new Thread() 

            public void run() 

                try 
                    while (true) 
                        System.out.println(getName() + "consume------- :" + queue.take());
                    

                 catch (InterruptedException e) 

                    e.printStackTrace();
                
            

        ;
        d.start();
        p.start();
        c.start();

    


【讨论】:

虽然此代码可以解决问题,including an explanation 说明如何以及为什么解决问题将真正有助于提高您的帖子质量,并可能导致更多的赞成票。请记住,您正在为将来的读者回答问题,而不仅仅是现在提问的人。请edit您的回答添加解释并说明适用的限制和假设。 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center。 感谢您的努力,但我明确表示我不想要详细的代码。我必须用多种语言来实现它,并且希望有人向我解释要使用哪些(软件)工具。

以上是关于如何设计:具有多线程信号的单个阻塞工作者的主要内容,如果未能解决你的问题,请参考以下文章

linux 多线程信号处理总结

java并发

python多线程-Semaphore(信号对象)

linux 多线程信号处理总结

java 多线程 28 : 多线程组件之 Semaphore 信号量

Dart 中的多线程 与 Future