使用ActiveMQ实现简易聊天功能

Posted flyuz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用ActiveMQ实现简易聊天功能相关的知识,希望对你有一定的参考价值。

一 什么是消息队列

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ

 

二 为什么要用消息队列

使用消息队列主要有两点好处:

1.通过异步处理提高系统性能(削峰、减少响应所需时间);

2.降低系统耦合性。如果在面试的时候你被面试官问到这个问题的话,一般情况是你在你的简历上涉及到消息队列这方面的内容,这个时候推荐你结合你自己的项目来回答。

 

三 ActiveMQ

ActiveMQ 是基于 JMS 规范实现的。

JMS消息列队有两种消息模式,一种是点对点的消息模式,还有一种是订阅的模式。

 

四 实现

ActiveMQ下载地址:http://activemq.apache.org/components/classic/download/

解压缩apache-activemq-5.xxx-bin.zip到一个目录

启动ActiveMQ:运行C: apache-activemq-5.xxxinactivemq.bat

浏览器中输入:http://localhost:8161/admin/ 测试启动情况

 

使用点对点方式实现聊天功能

编写消息发送类和接收类。发送类中需要连接ActiveMQ 服务器,创建队列,发送消息;接收类中需要ActiveMQ 服务器,读取发送者发送消息所用的队列。接收类实现为一个单独的线程,使用监听器模式,每隔一段时间侦听是否有消息到来,若有消息到来,将消息添加到辅助类消息列表中。

使用2个队列,即对于每一个用户来说,发送消息为一个队列,接受消息为一个队列。

效果如下:

 

技术图片

技术图片
 1 import org.apache.activemq.ActiveMQConnectionFactory;
 2 
 3 import javax.jms.*;
 4 
 5 import static java.lang.Thread.sleep;
 6 
 7 public class MessageReceiver implements Runnable{
 8     private String url;
 9     private String user;
10     private String password;
11     private final String QUEUE;
12     private Boolean stop;
13     Connection connection;
14 
15     public MessageReceiver(String queue, String url, String user, String password) {
16         this.url = url;
17         this.user = user;
18         this.password = password;
19         this.QUEUE = queue;
20         stop = false;
21     }
22 
23     public void run() {
24         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
25         try {
26             connection = connectionFactory.createConnection();
27             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
28             Destination receiveQueue = session.createQueue(QUEUE);
29             MessageConsumer consumer = session.createConsumer(receiveQueue);
30             connection.start();
31             while(!stop) {
32                 consumer.setMessageListener(new MessageListener() {
33                     @Override
34                     public void onMessage(Message message) {
35                         try {
36                             //获取到接收的数据
37                             String text = ((TextMessage) message).getText();
38                             MessageText.setMsg(text);
39                         } catch (JMSException e) {
40                             e.printStackTrace();
41                         }
42                     }
43                 });
44                 sleep(500);
45             }
46         } catch (JMSException e) {
47             e.printStackTrace();
48         }catch (InterruptedException e) {
49             //Thread.currentThread().interrupt();
50             e.printStackTrace();
51         }
52     }
53 
54     public void setStop(Boolean stop) {
55         this.stop = stop;
56     }
57 
58     public void closeConnection(){
59         try {
60             connection.close();
61         } catch (JMSException e) {
62             e.printStackTrace();
63         }
64     }
65 
66     public String getUrl() {
67         return url;
68     }
69 
70     public void setUrl(String url) {
71         this.url = url;
72     }
73 
74     public String getUser() {
75         return user;
76     }
77 
78     public void setUser(String user) {
79         this.user = user;
80     }
81 
82     public String getPassword() {
83         return password;
84     }
85 
86     public void setPassword(String password) {
87         this.password = password;
88     }
89 }
MessageReceiver
技术图片
 1 import org.apache.activemq.ActiveMQConnectionFactory;
 2 
 3 import javax.jms.*;
 4 import java.text.DateFormat;
 5 import java.text.SimpleDateFormat;
 6 import java.util.Date;
 7 
 8 public class MessageSender {
 9     private String url;
10     private String user;
11     private String password;
12     private final String QUEUE;
13     private Connection connection;
14     private Session session;
15     private Destination sendQueue;
16     private MessageProducer sender;
17     private TextMessage outMessage;
18     private DateFormat df;
19 
20     public MessageSender(String queue, String url, String user, String password) {
21         this.url = url;
22         this.user = user;
23         this.password = password;
24         this.QUEUE = queue;
25     }
26 
27     public void init() {
28         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
29         df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
30         try {
31             connection = connectionFactory.createConnection();
32             connection.start();
33             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
34             sendQueue = session.createQueue(QUEUE);
35             sender = session.createProducer(sendQueue);
36             outMessage = session.createTextMessage();
37         } catch (JMSException e) {
38             e.printStackTrace();
39         }
40     }
41 
42     public void sendMessage(String messageStr) {
43         try {
44             outMessage = session.createTextMessage();
45             String sendStr = df.format(new Date()) + "
" + QUEUE + ": " + messageStr;
46             outMessage.setText(sendStr);
47             sender.send(outMessage);
48             session.commit();
49             MessageText.setMsg(sendStr);
50         } catch (JMSException e) {
51             e.printStackTrace();
52         }
53     }
54 
55     public void closeConnection() {
56         try {
57             sender.close();
58             connection.close();
59         } catch (JMSException e) {
60             e.printStackTrace();
61         }
62     }
63 
64     public String getUrl() {
65         return url;
66     }
67 
68     public void setUrl(String url) {
69         this.url = url;
70     }
71 
72     public String getUser() {
73         return user;
74     }
75 
76     public void setUser(String user) {
77         this.user = user;
78     }
79 
80     public String getPassword() {
81         return password;
82     }
83 
84     public void setPassword(String password) {
85         this.password = password;
86     }
87 }
MessageSender

 

以上是关于使用ActiveMQ实现简易聊天功能的主要内容,如果未能解决你的问题,请参考以下文章

Java实现简易聊天室

Socket编程一实现简易的聊天功能以及文件传输

Ajax PHP JavaScript MySQL实现简易的无刷新在线聊天室

Java Dome(实现一个简易版QQ聊天室)

vue + socket.io实现一个简易聊天室

nio 代码实现简易多人聊天