RabbitMQ 演示代码

Posted zhangblearn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq演示代码相关的知识,希望对你有一定的参考价值。

简单使用:

package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer side of the basic example: publishes one text message to the
 * queue {@code MyQueue} on a broker running on localhost.
 *
 * Project: demotest, created 2018/8/23 15:02.
 *
 * @author cxsz-hp16
 */
public class Sender {
    // Name of the queue the message is published to.
    private final static String QUEUE_NAME = "MyQueue";

    public static void main(String[] args) {
        send();
    }

    /**
     * Connects to the broker on localhost, declares the queue, publishes a
     * single message and then closes the connection.
     *
     * Any failure is printed to standard error; nothing is rethrown.
     */
    public static void send() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // queueDeclare(queue, durable, exclusive, autoDelete, arguments):
            // a non-durable, non-exclusive queue that is not auto-deleted and
            // has no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "my first message";
            // basicPublish(exchange, routingKey, props, body): the empty
            // exchange name selects the default exchange, which routes by
            // queue name.
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("已发消息:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the client's connection threads keep the JVM alive
            // after the message is sent. Closing the connection also closes
            // the channels opened on it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer side of the basic example: subscribes to the queue
 * {@code MyQueue} on a broker running on localhost and prints every
 * message it receives.
 *
 * Project: demotest, created 2018/8/23 15:31.
 *
 * @author cxsz-hp16
 */
public class Receiver {
    private final static String QUEUE_NAME = "MyQueue";

    public static void main(String[] args) {
        receiver();
    }

    /**
     * Opens a connection and channel, declares the queue and registers a
     * consumer with automatic acknowledgement.
     *
     * The connection is left open so that deliveries keep arriving. Any
     * failure during setup is printed to standard error.
     */
    public static void receiver() {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            Connection conn = connectionFactory.newConnection();
            Channel consumerChannel = conn.createChannel();
            consumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Callback invoked by the client library for each delivery.
            Consumer printingConsumer = new DefaultConsumer(consumerChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String text = new String(body, "UTF-8");
                    System.out.println(" [x] Received ‘" + text + "‘");
                }
            };

            // Second argument true = automatic acknowledgement.
            consumerChannel.basicConsume(QUEUE_NAME, true, printingConsumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * Producer for the work-queue example: publishes one persistent task
 * message to the durable queue {@code newTask} on a broker running on
 * localhost.
 *
 * Project: demotest, created 2018/8/23 16:30.
 *
 * @author cxsz-hp16
 */
public class NewTask {
    // Name of the queue the task is published to.
    private final static String QUEUE_NAME = "newTask";

    public static void main(String[] args) {
        send();
    }

    /**
     * Connects to the broker on localhost, declares the durable queue,
     * publishes a single persistent message and then closes the connection.
     *
     * Any failure is printed to standard error; nothing is rethrown.
     */
    public static void send() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            boolean durable = true;
            // exclusive must be false: an exclusive queue is usable only by
            // the connection that declared it, so the worker (which declares
            // this queue as non-exclusive) could never consume from it.
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
            // Each '.' in the message makes the worker sleep for one second.
            String message = "2.";
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent ‘" + message + "‘");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the client's connection threads keep the JVM alive
            // after the message is sent. Closing the connection also closes
            // the channels opened on it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;

/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker {
private final static String QUEUE_NAME = "newTask";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//回调消费消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//设置一个任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}

/**
* 任务
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//将字符串转换为字符数组
for (char ch: task.toCharArray()) {
//当值为.时,阻塞线程来达到耗时的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}


}



package com.imooc.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * Producer for the fair-dispatch example: publishes one persistent task
 * message to the durable queue {@code task_queue} on a broker running on
 * localhost.
 *
 * Project: demotest, created 2018/8/23 16:30.
 *
 * @author cxsz-hp16
 */
public class NewTask23 {
    // Name of the queue the task is published to.
    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) {
        send();
    }

    /**
     * Connects to the broker on localhost, declares the durable queue,
     * publishes a single persistent message and then closes the connection.
     *
     * Any failure is printed to standard error; nothing is rethrown.
     */
    public static void send() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            boolean durable = true;
            // exclusive must be false: an exclusive queue is usable only by
            // the connection that declared it, so the worker (which declares
            // this queue as non-exclusive) could never consume from it.
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
            // Each '.' in the message makes the worker sleep for one second.
            String message = "2.";
            channel.basicPublish("", QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent ‘" + message + "‘");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the client's connection threads keep the JVM alive
            // after the message is sent. Closing the connection also closes
            // the channels opened on it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

package com.imooc.cusumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker3 {
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,true,false,false,null);

channel.basicQos(1);
//回调消费消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//设置一个任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}

/**
* 任务
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//将字符串转换为字符数组
for (char ch: task.toCharArray()) {
//当值为.时,阻塞线程来达到耗时的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}


}





















































































































































































































































































































































































































以上是关于 RabbitMQ 演示代码的主要内容,如果未能解决你的问题,请参考以下文章:

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

html PHP代码片段: - AJAX基本示例:此代码演示了使用PHP和JavaScript实现的基本AJAX功能。

sql 这些代码片段将演示如何逐步使用PolyBase。你应该有一个blob存储和存储秘密方便

RabbitMQ python 演示 route

如何使用 Android 片段?

Android课程---Android Studio使用小技巧:提取方法代码片段