rabbitmq消费者生产者实践

Posted 卡特的编程生活

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq消费者生产者实践相关的知识,希望对你有一定的参考价值。

完成消费者的模板化处理,并且可以配置

rabbitmq消费者生产者实践自定义事件的bug修复

rabbitmq消费者生产者实践讲解清楚,整理需要使用的文档

1,完成消费者的模板化处理,并且可以配置

rabbitmq消费者生产者实践

核心配置代码:

@Value("${rabbitmq.system.name}")

private String systemName;

@Autowired

private RabbitMqCfg rabbitMqCfg;

@Bean

@Qualifier("connectionFactoryProducer")

public ConnectionFactory cachingConnectionFactory()

{

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

connectionFactory.setPort(5672);

connectionFactory.setUsername(rabbitMqCfg.getUsername());

connectionFactory.setPassword(rabbitMqCfg.getPassword());

connectionFactory.setHost(rabbitMqCfg.getHost());

connectionFactory.setVirtualHost(rabbitMqCfg.getVirtualHost());

connectionFactory.setChannelCacheSize(rabbitMqCfg.getChannelCacheSize());

connectionFactory.setPublisherConfirms(rabbitMqCfg.isPublisherConfirm());

return connectionFactory;

}

@Bean

public AmqpAdmin amqpAdmin()

{

return new RabbitAdmin(cachingConnectionFactory());

}

@Bean

public DirectExchange systemExchange()

{

DirectExchange userExchange = new DirectExchange(systemName+"Exchange", true, false, null);

amqpAdmin().declareExchange(userExchange);

return userExchange;

}

/**

* 可以按照这个模板申明多个绑定关系,在绑定关系里可以建立队列;

* @return

*/

@Bean

public Binding bindingUser()

{

Queue userQueue = new Queue("q_"+systemName+"_user",true,false,false,null);

amqpAdmin().declareQueue(userQueue);

Binding binding = BindingBuilder.bind(userQueue).to(systemExchange()).with(userQueue.getName());

amqpAdmin().declareBinding(binding);

return binding;

}

@Autowired

private CommonMessageListener channelAwareMessageListener;

@Bean

@Qualifier("SimpleMessageListenerContainer1")

public SimpleMessageListenerContainer simpleMessageListenerContainer()

{

SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();

simpleMessageListenerContainer.setConnectionFactory(connectionFactory);

/**

* 这里配置所有的需要监听的队列名;

*/

simpleMessageListenerContainer.setQueueNames("q_test_user");

simpleMessageListenerContainer.setMessageListener(channelAwareMessageListener);

simpleMessageListenerContainer.setConcurrentConsumers(1);

simpleMessageListenerContainer.setMessageConverter(new Jackson2JsonMessageConverter());

simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动设置消息确认

return simpleMessageListenerContainer;

}

package com.stosz.web.test.rabbitmq.core.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.rabbitmq.client.Channel;

import com.stosz.web.test.rabbitmq.starter.MailSender;

import org.apache.log4j.Logger;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import java.util.Optional;

@Component

public class CommonMessageListener implements ChannelAwareMessageListener {

private static final Logger log = Logger.getLogger(CommonMessageListener.class);

@Autowired

private Map<String,RabbitMessageHandler> rabbitMessageHandlerMap;

@Autowired

private MailSender mailSender;

/**

* Callback for processing a received Rabbit message.

* <p>Implementors are supposed to process the given Message,

* typically sending reply messages through the given Session.

*

* @param message the received AMQP message (never <code>null</code>)

* @param channel the underlying Rabbit Channel (never <code>null</code>)

* @throws Exception Any.

*/

@Override

public void onMessage(Message message, Channel channel) throws Exception {

log.trace("message.getMessageProperties() : "+message.getMessageProperties());

log.trace("message.getBody() : "+message.getBody());

log.info("message : "+ new String(message.getBody()));

log.trace("message.getClass() : "+message.getClass());

log.trace("=======================================================");

log.trace("channel.getChannelNumber() : "+channel.getChannelNumber());

log.trace("channel.getDefaultConsumer() : "+channel.getDefaultConsumer());

log.trace("channel.getNextPublishSeqNo() : "+channel.getNextPublishSeqNo());

log.trace("channel.getCloseReason() : "+channel.getCloseReason());

log.trace("channel.getClass() : "+channel.getClass());

log.trace("channel.getConnection() : "+channel.getConnection());

if(null == message || message.getBody().length < 1)

{

doNotify("队列中有空消息",message,channel);

return;

}

String rabbitMessage = new String(message.getBody());

log.trace("接受到的消息:" + rabbitMessage);

try {

Map<String,Object> messageMap = new ObjectMapper().readValue(rabbitMessage, new HashMap<>().getClass());

String tableName = String.valueOf(Optional.ofNullable(messageMap.get("tableName")).orElse(""));

if(!rabbitMessageHandlerMap.containsKey(tableName))

{

doNotify("系统中暂时没有定义对应的"+tableName+"的消息处理器,请联系消息消费系统管理员进行添加",message,channel);

return;

}

String methodName= String.valueOf(Optional.ofNullable(messageMap.get("methodName")).orElse(""));

Map<String,Object> dataItem =(Map<String,Object>) Optional.ofNullable(messageMap.get("dataItem")).orElse(new HashMap<>());

RabbitMessageHandler rabbitMessageHandler = rabbitMessageHandlerMap.get(tableName);

boolean messageHandleSuccess = rabbitMessageHandler.handleMessage(methodName,dataItem);

if(messageHandleSuccess){

doSuccess(message,channel);

}else

{

doRetry(message,channel);

}

} catch (IOException e) {

e.printStackTrace();

doNotify(e.getMessage(),message,channel);

}finally {

// channel.close(200,"消息处理完毕,关闭channel!");

}

}

/**

* do not response the queue ack , need retry

*/

private void doRetry(Message message, Channel channel)

{

System.out.println("消息处理失败,返回非确认标志到队列");

try {

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

} catch (IOException e) {

e.printStackTrace();

}

}

/**

* response the queue ack , the message handle ok

*/

private void doSuccess(Message message, Channel channel)

{

System.out.println("消息处理成功,返回确认标志到队列");

try {

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

} catch (IOException e) {

e.printStackTrace();

}

}

/**

* the data is abnormal , should notify the system admin todo

*/

private void doNotify(String notifyMessage,Message message,Channel channel)

{

log.trace("消息不正常,发送邮件通知消费系统管理员,下面是消息内容。");

System.out.println(notifyMessage);

log.trace("message.getMessageProperties() : "+message.getMessageProperties());

log.trace("message.getBody() : "+message.getBody());

log.trace("message : "+ new String(message.getBody()));

log.trace("message.getClass() : "+message.getClass());

/**

* 防止消息不断重发,到邮件取查看消息即可;

*/

doSuccess(message,channel);

}

}

处理器

package com.stosz.web.test.rabbitmq.core.consumer;

import java.util.Map;

public interface RabbitMessageHandler {

boolean handleMessage(String method, Map<String,Object> dataItem);

}

package com.stosz.web.test.rabbitmq.core.consumer;

import org.apache.log4j.Logger;

import java.util.Arrays;

import java.util.List;

import java.util.Map;

import java.util.concurrent.atomic.AtomicBoolean;

public class UserMessageHandler implements RabbitMessageHandler {

private static final Logger log = Logger.getLogger(UserMessageHandler.class);

private final static List<String> userMethod = Arrays.asList("add","update","delete");

@Override

public boolean handleMessage(String method, Map<String, Object> dataItem) {

AtomicBoolean handleSuccess = new AtomicBoolean(false);

/**

* 判断操作是否支持;

*/

if(!userMethod.contains(method)) {

log.trace("不支持的method:".concat(method));

handleSuccess.set(false);

return handleSuccess.get();

}

switch (method)

{

case "add":

System.out.println("======模拟处理消息队列中的 user 的 add 操作 =============");

handleSuccess.set(true);

break;

case "update":

System.out.println("======模拟处理消息队列中的 user 的 update 操作 =============");

handleSuccess.set(true);

break;

case "delete":

System.out.println("======模拟处理消息队列中的 user 的 delete 操作 =============");

handleSuccess.set(true);

break;

}

return handleSuccess.get();

}

}

配置处理工厂

@Bean

public Map<String,RabbitMessageHandler> rabbitMessageHandlerMap()

{

ConcurrentHashMap<String,RabbitMessageHandler> rabbitMessageHandlerConcurrentHashMap = new ConcurrentHashMap<>();

//这里注册消息处理器

rabbitMessageHandlerConcurrentHashMap.putIfAbsent("user",new UserMessageHandler());

rabbitMessageHandlerConcurrentHashMap.putIfAbsent("zone",new ZoneMessageHandler());

return rabbitMessageHandlerConcurrentHashMap;

}

2,自定义事件的bug修复

采用的送spring的事件体系;

package com.stosz.web.test.rabbitmq.core.publisher;

import com.stosz.web.test.rabbitmq.core.message.MQMessage;

import org.springframework.context.ApplicationContext;

import org.springframework.context.event.ApplicationContextEvent;

public class DataChangeEvent extends ApplicationContextEvent {

private MQMessage message;

public MQMessage getMessage() {

return message;

}

public DataChangeEvent(ApplicationContext source, MQMessage message) {

super(source);

this.message = message;

}

}

package com.stosz.web.test.rabbitmq.core.publisher;

import com.stosz.web.test.rabbitmq.core.message.MQMessage;

import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.ApplicationListener;

import org.springframework.context.annotation.Scope;

import org.springframework.stereotype.Component;

@Component

@Scope("singleton")

public class DataChannelListener implements ApplicationListener<DataChangeEvent> {

@Autowired

private RabbitMessagingTemplate rabbitMessagingTemplate;

@Value("${rabbitmq.system.name}")

private String systemName;

@Override

public void onApplicationEvent(DataChangeEvent event) {

System.out.println("事件内容:"+event+" 容器对象:" + event.getApplicationContext().getDisplayName());

if(event instanceof DataChangeEvent) {

DataChangeEvent dataChangeEvent = event;

/**

* 获取消息,插入队列中;

*/

MQMessage mqMessage = dataChangeEvent.getMessage();

String tableName = mqMessage.getTableName();

String exchangeName = systemName+"Exchange";

String qName ="q_"+systemName+"_"+tableName;

// Object recieveObject = rabbitMessagingTemplate.convertSendAndReceive(exchangeName, qName, mqMessage, new Object().getClass());

rabbitMessagingTemplate.convertAndSend(exchangeName, qName, mqMessage);

// System.out.println("触发rabbitmq消息插入..."+ recieveObject);

System.out.println("触发rabbitmq消息插入...");

}

}

}

package com.stosz.web.test.rabbitmq.dao;

import com.stosz.web.test.rabbitmq.core.message.MQMessage;

import com.stosz.web.test.rabbitmq.core.publisher.DataChangeEvent;

import com.stosz.web.test.rabbitmq.entity.User;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import org.springframework.stereotype.Repository;

import org.springframework.transaction.annotation.Propagation;

import org.springframework.transaction.annotation.Transactional;

import java.util.List;

import java.util.Optional;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.stream.Collectors;

@Repository

@Qualifier("userDao")

public class UserDaoImpl implements UserDao,ApplicationContextAware {

public static AtomicInteger id= new AtomicInteger(0);

private static ConcurrentHashMap<Integer,User> userDB = new ConcurrentHashMap<>(500);

protected ApplicationContext applicationEventPublisher;

@Override

@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRES_NEW)

public int add(String username, int age) {

System.out.println("添加用户成功! username= "+username + " age= " + age);

Integer userId = id.addAndGet(1);

User user = new User();

user.setId( userId.longValue());

user.setUsername(username);

user.setAge(age);

userDB.put(userId,user);

applicationEventPublisher.publishEvent(new DataChangeEvent(applicationEventPublisher,new MQMessage("user","add", user)));

return userId;

}

@Override

public boolean delete(int id) {

System.out.println("删除用户success , id = "+id);

userDB.remove(new Integer(id));

User user = new User();

user.setId((long) id);

applicationEventPublisher.publishEvent(new DataChangeEvent(applicationEventPublisher,new MQMessage("user","delete", user)));

return !userDB.contains(id);

}

@Override

public boolean update(int id, String username, int age) {

System.out.println("更新用户success , id = "+id +"原来的信息:"+ userDB.get(id) + "跟新信息是:"+username+"|" +age);

User user = new User();

user.setId((long)id);

user.setUsername(username);

user.setAge(age);

userDB.put(new Integer(id),user);

applicationEventPublisher.publishEvent(new DataChangeEvent(applicationEventPublisher,new MQMessage("user","update", user)));

return true;

}

@Override

public List<User> list() {

return userDB.values().stream().sorted((o1, o2) -> (int)(o1.getId()-o2.getId())).collect(Collectors.toList());

}

@Override

public User getById(int id) {

return Optional.ofNullable(userDB.get(id)).orElse(new User());

}

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationEventPublisher = applicationContext;

}

}

发生bug的原因是因为重复注册了监听器,扫描的包的位置精确控制即可。

3,讲解清楚,整理需要使用的文档

问题:强类型化的好处;


以上是关于rabbitmq消费者生产者实践的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:如何在 Python 生产者和消费者之间发送 Python 字典?

rabbitmq的简单介绍

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。)

RabbitMQ工作模式

Go RabbitMQ

RabbitMQ简单Java示例——生产者和消费者