rabbitmq消费者生产者实践
Posted 卡特的编程生活
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq消费者生产者实践相关的知识,希望对你有一定的参考价值。
本文内容:1,完成消费者的模板化处理,并且可以配置;2,自定义事件的bug修复;3,讲解清楚,整理需要使用的文档 |
1,完成消费者的模板化处理,并且可以配置
核心配置代码:
/** System name read from the {@code rabbitmq.system.name} property; used to build exchange and queue names. */
@Value("${rabbitmq.system.name}")
private String systemName;

/** Source of the broker credentials, host, virtual host and channel settings used below. */
@Autowired
private RabbitMqCfg rabbitMqCfg;

/**
 * Builds the caching connection factory from the values held by {@code rabbitMqCfg}.
 * The port is fixed at 5672 in this method.
 *
 * @return the configured connection factory
 */
@Bean
@Qualifier("connectionFactoryProducer")
public ConnectionFactory cachingConnectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost(rabbitMqCfg.getHost());
    factory.setPort(5672);
    factory.setVirtualHost(rabbitMqCfg.getVirtualHost());
    factory.setUsername(rabbitMqCfg.getUsername());
    factory.setPassword(rabbitMqCfg.getPassword());
    factory.setChannelCacheSize(rabbitMqCfg.getChannelCacheSize());
    factory.setPublisherConfirms(rabbitMqCfg.isPublisherConfirm());
    return factory;
}

/**
 * Creates the admin object used by the other bean methods to declare the exchange,
 * queue and binding.
 *
 * @return a {@code RabbitAdmin} built on {@link #cachingConnectionFactory()}
 */
@Bean
public AmqpAdmin amqpAdmin() {
    ConnectionFactory factory = cachingConnectionFactory();
    return new RabbitAdmin(factory);
}

/**
 * Declares the direct exchange named {@code <systemName>Exchange}.
 *
 * @return the declared exchange
 */
@Bean
public DirectExchange systemExchange() {
    String exchangeName = systemName + "Exchange";
    DirectExchange exchange = new DirectExchange(exchangeName, true, false, null);
    amqpAdmin().declareExchange(exchange);
    return exchange;
}

/**
 * Declares the queue {@code q_<systemName>_user} and binds it to the system exchange,
 * using the queue name as the routing key.
 * Further bindings can be declared by following this template; the queue may be
 * created inside the binding method as done here.
 *
 * @return the declared binding
 */
@Bean
public Binding bindingUser() {
    String queueName = "q_" + systemName + "_user";
    Queue queue = new Queue(queueName, true, false, false, null);
    amqpAdmin().declareQueue(queue);

    Binding userBinding = BindingBuilder
            .bind(queue)
            .to(systemExchange())
            .with(queue.getName());
    amqpAdmin().declareBinding(userBinding);
    return userBinding;
}
|
@Autowired private CommonMessageListener channelAwareMessageListener; @Bean @Qualifier("SimpleMessageListenerContainer1") public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); simpleMessageListenerContainer.setConnectionFactory(connectionFactory); /** * 这里配置所有的需要监听的队列名; */ simpleMessageListenerContainer.setQueueNames("q_test_user"); simpleMessageListenerContainer.setMessageListener(channelAwareMessageListener); simpleMessageListenerContainer.setConcurrentConsumers(1); simpleMessageListenerContainer.setMessageConverter(new Jackson2JsonMessageConverter()); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动设置消息确认 return simpleMessageListenerContainer; } |
package com.stosz.web.test.rabbitmq.core.consumer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.stosz.web.test.rabbitmq.starter.MailSender;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Generic queue listener: parses each message body as a JSON object, looks up the
 * {@link RabbitMessageHandler} registered under the message's {@code tableName} and
 * delegates {@code methodName} / {@code dataItem} to it.
 *
 * <p>Acknowledgement is done explicitly on the channel: ack when the handler returns
 * {@code true}, nack with requeue when it returns {@code false}, and ack (after logging
 * a notification) when the message is malformed so it is not redelivered forever.
 */
@Component
public class CommonMessageListener implements ChannelAwareMessageListener {

    private static final Logger log = Logger.getLogger(CommonMessageListener.class);

    /** Shared mapper; avoids building a new ObjectMapper for every delivery. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Target type of the message envelope: a JSON object with string keys. */
    private static final TypeReference<Map<String, Object>> MESSAGE_TYPE =
            new TypeReference<Map<String, Object>>() { };

    @Autowired
    private Map<String,RabbitMessageHandler> rabbitMessageHandlerMap;

    // NOTE(review): not used yet; doNotify is presumably meant to send mail through it (see its todo).
    @Autowired
    private MailSender mailSender;

    /**
     * Callback for processing a received Rabbit message.
     *
     * @param message the received AMQP message
     * @param channel the underlying Rabbit channel, used for ack / nack
     * @throws Exception anything thrown by the selected handler is propagated unchanged
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        if (message == null) {
            // Without a message there is no delivery tag, so nothing can be acknowledged.
            log.warn("Received a null message; nothing to acknowledge");
            return;
        }

        byte[] body = message.getBody();
        if (body == null || body.length < 1) {
            doNotify("队列中有空消息", message, channel);
            return;
        }

        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String rabbitMessage = new String(body, StandardCharsets.UTF_8);
        log.info("message : " + rabbitMessage);
        traceDelivery(message, channel);
        log.trace("接受到的消息:" + rabbitMessage);

        try {
            Map<String, Object> messageMap = OBJECT_MAPPER.readValue(rabbitMessage, MESSAGE_TYPE);
            if (messageMap == null) {
                // The JSON literal null parses successfully but carries no envelope.
                doNotify("消息内容为null,无法处理", message, channel);
                return;
            }

            String tableName = stringField(messageMap, "tableName");
            if (!rabbitMessageHandlerMap.containsKey(tableName)) {
                doNotify("系统中暂时没有定义对应的" + tableName + "的消息处理器,请联系消息消费系统管理员进行添加", message, channel);
                return;
            }

            Object rawDataItem = messageMap.get("dataItem");
            if (rawDataItem != null && !(rawDataItem instanceof Map)) {
                doNotify("消息中的dataItem不是JSON对象,无法处理", message, channel);
                return;
            }
            // Safe: the mapper produces String-keyed maps for JSON objects.
            @SuppressWarnings("unchecked")
            Map<String, Object> dataItem = rawDataItem == null
                    ? new HashMap<String, Object>()
                    : (Map<String, Object>) rawDataItem;

            String methodName = stringField(messageMap, "methodName");
            RabbitMessageHandler rabbitMessageHandler = rabbitMessageHandlerMap.get(tableName);
            if (rabbitMessageHandler.handleMessage(methodName, dataItem)) {
                doSuccess(message, channel);
            } else {
                doRetry(message, channel);
            }
        } catch (IOException e) {
            log.error("Failed to parse message body as JSON", e);
            doNotify(e.getMessage(), message, channel);
        }
    }

    /** Returns the string form of {@code map[key]}, or the empty string when the key is absent or null. */
    private static String stringField(Map<String, Object> map, String key) {
        return String.valueOf(Optional.ofNullable(map.get(key)).orElse(""));
    }

    /** Emits the per-delivery diagnostics; skipped entirely unless trace logging is enabled. */
    private void traceDelivery(Message message, Channel channel) {
        if (!log.isTraceEnabled()) {
            return;
        }
        log.trace("message.getMessageProperties() : " + message.getMessageProperties());
        log.trace("message.getClass() : " + message.getClass());
        log.trace("=======================================================");
        log.trace("channel.getChannelNumber() : " + channel.getChannelNumber());
        log.trace("channel.getDefaultConsumer() : " + channel.getDefaultConsumer());
        log.trace("channel.getNextPublishSeqNo() : " + channel.getNextPublishSeqNo());
        log.trace("channel.getCloseReason() : " + channel.getCloseReason());
        log.trace("channel.getClass() : " + channel.getClass());
        log.trace("channel.getConnection() : " + channel.getConnection());
    }

    /**
     * Handling failed: nack the delivery with requeue so that it is delivered again.
     */
    private void doRetry(Message message, Channel channel) {
        log.warn("消息处理失败,返回非确认标志到队列");
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e) {
            log.error("basicNack failed", e);
        }
    }

    /**
     * Handling succeeded: ack the delivery.
     */
    private void doSuccess(Message message, Channel channel) {
        log.info("消息处理成功,返回确认标志到队列");
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("basicAck failed", e);
        }
    }

    /**
     * The message is abnormal: log the reason and the message, then ack it so that it
     * is not redelivered endlessly. todo: notify the system admin (e.g. by mail).
     */
    private void doNotify(String notifyMessage, Message message, Channel channel) {
        log.trace("消息不正常,发送邮件通知消费系统管理员,下面是消息内容。");
        log.warn(String.valueOf(notifyMessage));
        if (log.isTraceEnabled()) {
            byte[] body = message.getBody();
            log.trace("message.getMessageProperties() : " + message.getMessageProperties());
            log.trace("message : " + (body == null ? "" : new String(body, StandardCharsets.UTF_8)));
            log.trace("message.getClass() : " + message.getClass());
        }
        // Ack to stop the broker from resending the bad message; inspect the notification instead.
        doSuccess(message, channel);
    }
}
|
处理器
package com.stosz.web.test.rabbitmq.core.consumer;

import java.util.Map;

/**
 * Handler for the messages of one logical table.
 *
 * <p>{@code CommonMessageListener} selects a handler by the message's {@code tableName}
 * field and passes it the message's {@code methodName} and {@code dataItem} fields.
 */
public interface RabbitMessageHandler {
    /**
     * Processes one message.
     *
     * @param method   value of the message's {@code methodName} field
     * @param dataItem value of the message's {@code dataItem} field
     * @return {@code true} if the message was handled (the listener then acks it),
     *         {@code false} otherwise (the listener then nacks it with requeue)
     */
    boolean handleMessage(String method, Map<String,Object> dataItem);
}
|
package com.stosz.web.test.rabbitmq.core.consumer;

import org.apache.log4j.Logger;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sample handler for {@code user} messages. Each supported operation only prints a
 * line that simulates the real processing.
 */
public class UserMessageHandler implements RabbitMessageHandler {

    private static final Logger log = Logger.getLogger(UserMessageHandler.class);

    /** Operations this handler supports. */
    private final static List<String> userMethod = Arrays.asList("add","update","delete");

    /**
     * Handles one user message.
     *
     * @param method   operation name; must be one of {@code add}, {@code update}, {@code delete}
     * @param dataItem message payload (not used by this simulated implementation)
     * @return {@code true} if the operation is supported and was processed, {@code false}
     *         for an unsupported or {@code null} operation
     */
    @Override
    public boolean handleMessage(String method, Map<String, Object> dataItem) {
        // Reject unsupported operations; string concatenation keeps a null method from throwing.
        if (!userMethod.contains(method)) {
            log.trace("不支持的method:" + method);
            return false;
        }

        switch (method) {
            case "add":
                System.out.println("======模拟处理消息队列中的 user 的 add 操作 =============");
                return true;
            case "update":
                System.out.println("======模拟处理消息队列中的 user 的 update 操作 =============");
                return true;
            case "delete":
                System.out.println("======模拟处理消息队列中的 user 的 delete 操作 =============");
                return true;
            default:
                // Unreachable while userMethod and the cases above list the same operations.
                return false;
        }
    }
}
|
配置处理工厂
/**
 * Registry of message handlers keyed by table name.
 *
 * @return a map containing the {@code user} and {@code zone} handlers
 */
@Bean
public Map<String,RabbitMessageHandler> rabbitMessageHandlerMap() {
    ConcurrentHashMap<String,RabbitMessageHandler> handlers = new ConcurrentHashMap<>();
    // Register message handlers here.
    RabbitMessageHandler userHandler = new UserMessageHandler();
    handlers.putIfAbsent("user", userHandler);
    RabbitMessageHandler zoneHandler = new ZoneMessageHandler();
    handlers.putIfAbsent("zone", zoneHandler);
    return handlers;
}
|
2,自定义事件的bug修复
采用的是 Spring 的事件体系;
package com.stosz.web.test.rabbitmq.core.publisher;

import com.stosz.web.test.rabbitmq.core.message.MQMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ApplicationContextEvent;

/**
 * Application-context event that carries the {@link MQMessage} describing a data change.
 */
public class DataChangeEvent extends ApplicationContextEvent {

    /** Message attached to this event. */
    private MQMessage message;

    /**
     * @param source  the application context the event is raised for
     * @param message the message to carry
     */
    public DataChangeEvent(ApplicationContext source, MQMessage message) {
        super(source);
        this.message = message;
    }

    /**
     * @return the message passed to the constructor
     */
    public MQMessage getMessage() {
        return this.message;
    }
}
|
package com.stosz.web.test.rabbitmq.core.publisher; import com.stosz.web.test.rabbitmq.core.message.MQMessage; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Component @Scope("singleton") public class DataChannelListener implements ApplicationListener<DataChangeEvent> { @Autowired private RabbitMessagingTemplate rabbitMessagingTemplate; @Value("${rabbitmq.system.name}") private String systemName; @Override public void onApplicationEvent(DataChangeEvent event) { System.out.println("事件内容:"+event+" 容器对象:" + event.getApplicationContext().getDisplayName()); if(event instanceof DataChangeEvent) { DataChangeEvent dataChangeEvent = event; /** * 获取消息,插入队列中; */ MQMessage mqMessage = dataChangeEvent.getMessage(); String tableName = mqMessage.getTableName(); String exchangeName = systemName+"Exchange"; String qName ="q_"+systemName+"_"+tableName; // Object recieveObject = rabbitMessagingTemplate.convertSendAndReceive(exchangeName, qName, mqMessage, new Object().getClass()); rabbitMessagingTemplate.convertAndSend(exchangeName, qName, mqMessage); // System.out.println("触发rabbitmq消息插入..."+ recieveObject); System.out.println("触发rabbitmq消息插入..."); } } } |
package com.stosz.web.test.rabbitmq.dao;

import com.stosz.web.test.rabbitmq.core.message.MQMessage;
import com.stosz.web.test.rabbitmq.core.publisher.DataChangeEvent;
import com.stosz.web.test.rabbitmq.entity.User;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * User DAO backed by a static in-memory map. Every add / update / delete publishes a
 * {@link DataChangeEvent} carrying an {@link MQMessage} for table {@code user}.
 */
@Repository
@Qualifier("userDao")
public class UserDaoImpl implements UserDao,ApplicationContextAware {

    /** Id generator; the next user gets the incremented value. */
    public static AtomicInteger id= new AtomicInteger(0);

    /** The in-memory "table", keyed by user id. */
    private static ConcurrentHashMap<Integer,User> userDB = new ConcurrentHashMap<>(500);

    /** Application context, used both as event publisher and as event source. */
    protected ApplicationContext applicationEventPublisher;

    /**
     * Stores a new user under a freshly generated id and publishes an {@code add} event.
     *
     * @return the id assigned to the new user
     */
    @Override
    @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRES_NEW)
    public int add(String username, int age) {
        System.out.println("添加用户成功! username= "+username + " age= " + age);
        int userId = id.incrementAndGet();
        User user = new User();
        user.setId((long) userId);
        user.setUsername(username);
        user.setAge(age);
        userDB.put(userId, user);
        publishChange("add", user);
        return userId;
    }

    /**
     * Removes the user with the given id and publishes a {@code delete} event.
     *
     * @return {@code true} if no user with this id is stored after the call
     */
    @Override
    public boolean delete(int id) {
        System.out.println("删除用户success , id = "+id);
        userDB.remove(id);
        User user = new User();
        user.setId((long) id);
        publishChange("delete", user);
        // containsKey, not contains: ConcurrentHashMap.contains(Object) tests VALUES.
        return !userDB.containsKey(id);
    }

    /**
     * Replaces the stored user for the given id and publishes an {@code update} event.
     *
     * @return always {@code true}
     */
    @Override
    public boolean update(int id, String username, int age) {
        // Printed before the put so that the previous record is shown.
        System.out.println("更新用户success , id = "+id +"原来的信息:"+ userDB.get(id) + "跟新信息是:"+username+"|" +age);
        User user = new User();
        user.setId((long) id);
        user.setUsername(username);
        user.setAge(age);
        userDB.put(id, user);
        publishChange("update", user);
        return true;
    }

    /**
     * @return all stored users, ordered by ascending id
     */
    @Override
    public List<User> list() {
        // Long.compare avoids the overflow/truncation of casting an id difference to int.
        return userDB.values().stream()
                .sorted((o1, o2) -> Long.compare(o1.getId(), o2.getId()))
                .collect(Collectors.toList());
    }

    /**
     * @return the stored user, or a new empty {@code User} when the id is unknown
     */
    @Override
    public User getById(int id) {
        return Optional.ofNullable(userDB.get(id)).orElseGet(User::new);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationEventPublisher = applicationContext;
    }

    /** Publishes a DataChangeEvent for table {@code user} with the given method name and user. */
    private void publishChange(String method, User user) {
        applicationEventPublisher.publishEvent(
                new DataChangeEvent(applicationEventPublisher, new MQMessage("user", method, user)));
    }
}
|
发生 bug 的原因是重复注册了监听器,精确控制组件扫描的包路径即可。 |
3,讲解清楚,整理需要使用的文档
问题:强类型化的好处; |
以上是关于rabbitmq消费者生产者实践的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ:如何在 Python 生产者和消费者之间发送 Python 字典?