Java 异步处理 RabbitMQ

Posted 程序员泥瓦匠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 异步处理 RabbitMQ相关的知识,希望对你有一定的参考价值。


点击左上角蓝字,关注“锅外的大佬”

专注分享国外最新技术内容


很多开发人员说,将应用程序切换到异步处理很复杂。因为他们有一个天然需要同步通信的Web应用程序。在这篇文章中,我想介绍一种方法来达到异步通信的目的:使用一些众所周知的库和工具来设计他们的系统。 下面的例子是用Java编写的,但我相信它更多的是基本原理,同一个应用程序可以用任何语言来重新写。

所需的工具和库:

  • Spring Boot

  • RabbitMQ

1.Web应用程序

一个用Spring MVC编写的Web应用程序并运行在Tomcat上。 它所做的只是将一个字符串发送到一个队列中 (异步通信的开始) 并等待另一个队列中的消息作为HTTP响应发送回来。

首先,我们需要定义几个依赖项,然后等待Spring Boot执行所有必要的自动配置。

 
   
   
 
  1. <dependencies>

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-web</artifactId>

  5. </dependency>

  6. <dependency>

  7. <groupId>org.springframework.boot</groupId>

  8. <artifactId>spring-boot-starter-amqp</artifactId>

  9. </dependency>

  10. <dependency>

  11. <groupId>com.thedeanda</groupId>

  12. <artifactId>lorem</artifactId>

  13. </dependency>

  14. </dependencies>

 
   
   
 
  1. @SpringBootApplication

  2. public class BlockingApplication {

  3. public static void main(String[] args) {

  4. SpringApplication.run(BlockingApplication.class, args);

  5. }

  6. @RestController

  7. public static class MessageController {

  8. private final RabbitTemplate rabbitTemplate;

  9. public MessageController(CachingConnectionFactory connectionFactory) {

  10. this.rabbitTemplate = new RabbitTemplate(connectionFactory);

  11. }

  12. @GetMapping("invoke")

  13. public String sendMessage() {

  14. Message response = rabbitTemplate.sendAndReceive("uppercase", null, request());

  15. return new String(response.getBody());

  16. }

  17. private static Message request() {

  18. Lorem LOREM = LoremIpsum.getInstance();

  19. String name = LOREM.getFirstName() + " " + LOREM.getLastName();

  20. return new Message(name.getBytes(), new MessageProperties());

  21. }

  22. }

  23. @Bean

  24. public CachingConnectionFactory connectionFactory() {

  25. CachingConnectionFactory factory = new CachingConnectionFactory();

  26. factory.setAddresses("localhost:5672");

  27. factory.setUsername("admin");

  28. factory.setPassword("admin");

  29. return factory;

  30. }

  31. }

2.消费端应用程序

第二个应用程序仅仅是一个等待消息的RabbitMQ的消费端,将拿到的字符串转换为大写,然后将此结果发送到输出队列中。

 
   
   
 
  1. <dependencies>

  2. <dependency>

  3. <groupId>org.springframework.boot</groupId>

  4. <artifactId>spring-boot-starter-amqp</artifactId>

  5. </dependency>

  6. </dependencies>

 
   
   
 
  1. @SpringBootApplication

  2. public class ServiceApplication {

  3. public static void main(String[] args) {

  4. SpringApplication.run(ServiceApplication.class, args);

  5. }

  6. public static class MessageListener {

  7. public String handleMessage(byte[] message) {

  8. Random rand = new Random();

  9. // Obtain a number between [0 - 49] + 50 = [50 - 99]

  10. int n = rand.nextInt(50) + 50;

  11. String content = new String(message);

  12. try {

  13. Thread.sleep(n);

  14. } catch (InterruptedException e) {

  15. e.printStackTrace();

  16. }

  17. return content.toUpperCase();

  18. }

  19. }

  20. @Bean

  21. public CachingConnectionFactory connectionFactory() {

  22. CachingConnectionFactory factory = new CachingConnectionFactory();

  23. factory.setAddresses("localhost:5672");

  24. factory.setUsername("admin");

  25. factory.setPassword("admin");

  26. return factory;

  27. }

  28. @Bean

  29. public SimpleMessageListenerContainer serviceListenerContainer() {

  30. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

  31. container.setConnectionFactory(connectionFactory());

  32. container.setConcurrentConsumers(20);

  33. container.setMaxConcurrentConsumers(40);

  34. container.setQueueNames("uppercase_messages");

  35. container.setMessageListener(new MessageListenerAdapter(new MessageListener()));

  36. return container;

  37. }

  38. }

3.底层如何执行的?

程序启动并首次调用sendMessage()方法后,我们可以看到Spring AMQP支持自动创建了一个新的回复队列并等待来自我们的服务应用程序的响应。

 
   
   
 
  1. 2019-05-12 17:23:21.451 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]

  2. 2019-05-12 17:23:21.457 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started

如果我们在消费端应用程序中查看消息,我们可以看到Spring自动传播有关回复队列的信息以及相关ID,用于将其传递回Web应用程序以便能够将请求和响应配对在一起。

这就是发生魔术的地方。 当然,如果您想使其更复杂,您可以在协作中包含更多服务,然后将Web应用程序的最终响应放入与自动生成的队列不同的队列中, 该队列只具有正确的关联ID。 另外,不要忘记设置合理的超时。

这个解决方案还有一个很大的缺点 - 应用程序吞吐量。 我故意这样做,以便我可以跟进这篇文章,进一步深入调查 AsyncProfiler! 但是目前,我们使用Tomcat作为主HTTP服务器,默认为200个线程,这意味着我们的应用程序无法同时处理200多条消息,因为我们的服务器线程正在等待RabbitMQ 回复队列的响应,直到有消息进入或发生超时。

感谢您阅读本文,敬请关注后续内容! 如果您想自己尝试一下,请查看我的GitHub存储库。

原文链接:https://dzone.com/articles/how-to-split-up-a-synchronous-and-asynchronous-of

译者:KeepGoingPawn 

--END--


下方二维码关注我

Java 异步处理 RabbitMQ

因为坚持分享可落地的技术架构文章

以上是关于Java 异步处理 RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

Java面试题--RabbitMQ

Java面试-RabbitMQ

Java教程之RabbitMQ介绍

Java教程:RabbitMq如何开启发布手动确认模式,采用及时或异步方式确定消息是否发送到队列

好程序员Java学习资源分享RabbitMQ介绍

喵星之旅-狂奔的兔子-rabbitmq的java客户端使用入门