RabbitMQ - Work queues
Producer:
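import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

// Methods of the Sender class. TASK_QUEUE is a String constant declared
// elsewhere in that class (not shown here).
private static void newTask(String[] args) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // try-with-resources closes the channel and the connection on exit.
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // durable = true: the queue definition survives a broker restart.
        channel.queueDeclare(TASK_QUEUE, true, false, false, null);
        String message = getMessage(args);
        // Publish to the default exchange and mark the message as persistent.
        channel.basicPublish("", TASK_QUEUE,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println("[x] Sent '" + message + "'");
    } catch (Exception e) {
        // Report connection and publish failures instead of swallowing them.
        e.printStackTrace();
    }
}

// Builds the message from the command-line arguments, with a default text.
private static String getMessage(String[] strings) {
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0)
        return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}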
build.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
javac -cp %CP% Sender.java -d .
run.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_producer.Sender hello,world
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_producer.Sender hello,world...
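How the command-line arguments turn into the message body can be tried without a broker. The following is a minimal sketch, not the code from the Sender class: the class name MessageFromArgs and the method name message are hypothetical, and the JDK's String.join (Java 8 and later) stands in for the hand-written joinStrings helper.

```java
class MessageFromArgs {
    // Same contract as getMessage in the producer: a default text when no
    // arguments are given, otherwise the arguments joined by single spaces.
    // String.join is an assumed stand-in for joinStrings.
    static String message(String[] args) {
        if (args.length < 1) {
            return "Hello World!";
        }
        return String.join(" ", args);
    }

    public static void main(String[] args) {
        System.out.println(message(new String[] {}));                    // Hello World!
        System.out.println(message(new String[] {"hello,world..."}));    // hello,world...
        System.out.println(message(new String[] {"second", "task.."}));  // second task..
    }
}
```

Because hello,world... contains no spaces, it reaches the program as a single argument and is published unchanged.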
Consumer:
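import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Methods of the Receiver class. TASK_QUEUE is a String constant declared
// elsewhere in that class (not shown here).
private static void taskWorker() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // The connection and channel stay open so the worker keeps receiving.
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    try {
        channel.queueDeclare(TASK_QUEUE, true, false, false, null);
        System.out.println("[*] Waiting for message, to exit press CTRL+C");
        // Accept at most one unacknowledged message at a time.
        channel.basicQos(1);
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[x] Receive message '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println("[x] Done");
                    // Manual ack: tells the broker the task has been handled.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // autoAck = false, so messages are acknowledged in handleDelivery.
        channel.basicConsume(TASK_QUEUE, false, consumer);
    } catch (Exception e) {
        // Report failures instead of swallowing them.
        e.printStackTrace();
    }
    // System.in.read();
    // channel.close();
    // connection.close();
}

// Simulates work: sleeps one second for every '.' in the task text.
private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
}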
build.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
javac -cp %CP% Receiver.java -d .
run.bat
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_consumer.Receiver > log.txt
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
java -cp %CP% rm_consumer.Receiver
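In doWork every '.' in a message costs the worker one second of sleep, so the two messages sent by the producer's run.bat keep a worker busy for different lengths of time. The sketch below only counts those seconds, without sleeping or connecting to RabbitMQ. The class name SimulatedWork and the method name secondsFor are hypothetical and do not appear in the original code.

```java
class SimulatedWork {
    // Counts the '.' characters in a task, mirroring the loop in doWork
    // but returning the number of seconds instead of sleeping.
    static int secondsFor(String task) {
        int seconds = 0;
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(secondsFor("hello,world"));     // 0
        System.out.println(secondsFor("hello,world..."));  // 3
    }
}
```

With basicQos(1) in place, a worker that is still sleeping on a long task holds only that one unacknowledged message, which leaves further tasks available to other workers.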