一个生产者,多个消费者

Posted

技术标签:

【中文标题】一个生产者,多个消费者【英文标题】:One Producer, multiple Consumers 【发布时间】:2015-03-29 14:03:50 【问题描述】:

我一直在编写一些代码,但我需要帮助。 我创建了一个生产者和一个消费者,但是我需要创建多个消费者,他们将从生产者那里消费特定的String,例如我需要一个专门消费“左手移动”的消费者。

代码中包含的是buffer、producer、consumer和main。我不确定如何通知正确的消费者并比较需要使用的字符串。就目前而言,我只有一个消费者。

public class iRobotBuffer 
    private boolean empty = true;

    public synchronized String take() 
        // Wait until message is
        // available.
        while (empty) 
            try 
                wait();
             catch (InterruptedException e) 
        
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    

    public synchronized void put(String message) 
        // Wait until message has
        // been retrieved.
        while (!empty) 
            try 
                wait();
             catch (InterruptedException e) 
        
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    


public class iRobotConsumer implements Runnable 
    private iRobotBuffer robotBuffer;

    public iRobotConsumer(iRobotBuffer robotBuffer)
        this.robotBuffer = robotBuffer;
    

    public void run() 
        Random random = new Random();
        for (String message = robotBuffer.take();
                ! message.equals("DONE");
                message = robotBuffer.take()) 
            System.out.format("MESSAGE RECEIVED: %s%n", message);
            try 
                Thread.sleep(random.nextInt(5000));
             catch (InterruptedException e) 
        
    


public class iRobotProducer implements Runnable 
    private iRobotBuffer robotBuffer;
    private int number;

    public iRobotProducer(iRobotBuffer robotBuffer)
      
        this.robotBuffer = robotBuffer;
        //this.number = number;
    

    public void run() 
        String commandInstructions[] = 
                "Move Left Hand",
                "Move Right Hand",
                "Move Both Hands",
        ;
        int no = commandInstructions.length;
        int randomNo;
        Random random = new Random();


        for (int i = 0;
                i < commandInstructions.length;
                i++) 
            randomNo =(int)(Math.random()*no);
            System.out.println(commandInstructions[randomNo]);

            robotBuffer.put(commandInstructions[i]);
            try 
                Thread.sleep(random.nextInt(5000));
             catch (InterruptedException e) 
        
        robotBuffer.put("DONE");
    


public class iRobot

    public static void main(String[] args)
    
        iRobotBuffer robotBuffer = new iRobotBuffer();
        (new Thread(new iRobotProducer(robotBuffer))).start();
        (new Thread(new iRobotConsumer(robotBuffer))).start();

    //main
//class

【问题讨论】:

感谢编辑文本,有建设性答案的机会吗? 什么是正确的消费者 编辑只是帮助人们阅读代码的问题。这不是没有建设性的 谢谢,真的很有帮助 我所说的正确消费者的意思是消费者会消费其中一个产生的字符串,例如左手移动将被左手消费者消费。这就是我现在卡住的地方 【参考方案1】:

问题在于您的 iRobotBuffer 类。它需要是一个队列来支持多个生产者/消费者。我已经提供了这样一个队列的代码,但是 java 已经有一个实现(BlockingDeque&lt;E&gt;)。

public class BlockingQueue<T> 

    private final LinkedList<T> innerList = new LinkedList<>();
    private boolean isEmpty = true;

    public synchronized T take() throws InterruptedException 
        while (isEmpty) 
            wait();
        

        T element = innerList.removeFirst();
        isEmpty = innerList.size() == 0;
        return element;
    

    public synchronized void put(T element) 
        isEmpty = false;
        innerList.addLast(element);
        notify();
    

【讨论】:

非常感谢!那么,您将如何确保将正确的消息发送给相关消费者? 那你需要路由。您需要一个额外的类来接收消息,并将其放入正确的队列中,具体取决于标准。在消息队列系统中,它们通常按主题路由。这可能有助于您理解其背后的理论:spring.io/blog/2010/06/14/… 谢谢!您对如何按主题设置路由有任何想法吗?链接已离线。 这很复杂。您将有多个队列,每个订阅者都从其队列中轮询。每个队列都应用了一个主题通配符(翻译为:我是订阅者,我对 top1.top2.* 的消息很感兴趣)。当您发送消息时,您有一个主题(top1.top2.mytopic)。该消息将被复制并路由到所有监听该主题的订阅者。【参考方案2】:

据我了解,您需要 3 个消费者,每个 move 指令一个。 您可以使用 java.util.concurrent 包中的 ArrayBlockingQueue 代替 iRobotBuffer 类。顺便说一句,您可以看看提供的其他并发集合 - 一个可能会更好。

然后对于消费者,您可以peek() 在队列中测试它是否符合要求,然后poll()

【讨论】:

非常感谢,我之前没有使用过并发系统或多线程,所以这对我来说很新。你能在代码中告诉我你的意思吗?我对这些包一点都不熟悉。

以上是关于一个生产者,多个消费者的主要内容,如果未能解决你的问题,请参考以下文章

生产者与消费者问题

多个生产者,单个消费者

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。)

多个生产者和消费者,但一个共享资源 - 只有一个线程在运行

ActiveMQ:单个生产者,多个消费者

如何在kafka中实现多个生产者和多个消费者