RabbitMQ - Work queues

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ - Work queues相关的知识,希望对你有一定的参考价值。

Producer:

    private static void newTask(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE, true, false, false, null);

            String message = getMessage(args);
            channel.basicPublish("", TASK_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println("[x] send ‘" + message + "‘");
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0)
            return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

build.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
javac -cp %CP% Sender.java -d .

run.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_producer.Sender hello,world
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_producer.Sender hello,world...

  

Consumer:

private static void taskWorker() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        try {
            channel.queueDeclare(TASK_QUEUE, true, false, false, null);
            System.out.println("[*] Waiting for message, to exit press CTRL+C");
            channel.basicQos(1);

            final Consumer consumer = new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[x] Receive message ‘" + message + "‘");
                    try {
                        doWork(message);
                    } finally {
                        System.out.print("[x] Done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }

            };
            channel.basicConsume(TASK_QUEUE, false, consumer);
        } catch (Exception e) {
            // TODO: handle exception
        }

        // System.in.read();

        // channel.close();
        // connection.close();
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == ‘.‘) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

build.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
javac -cp %CP% Receiver.java -d .

run.bat

set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_consumer.Receiver > log.txt
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_consumer.Receiver

  

以上是关于RabbitMQ - Work queues的主要内容,如果未能解决你的问题,请参考以下文章

消息队列RabbitMQ核心:简单(Hello World)模式队列(Work Queues)模式发布确认模式

RabbitMQ - Work queues

RabbitMQ的work queue

RabbitMQ--work queues

RabbitMQ学习总结(上)——RabbitMQ介绍安装 和 Work Queues模式

RabbitMQ学习之Work Queues