使用ActiveMQ实现简易聊天功能
Posted flyuz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用ActiveMQ实现简易聊天功能相关的知识,希望对你有一定的参考价值。
一 什么是消息队列
我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ
二 为什么要用消息队列
使用消息队列主要有两点好处:
1.通过异步处理提高系统性能(削峰、减少响应所需时间);
2.降低系统耦合性。如果在面试的时候你被面试官问到这个问题的话,一般情况是你在你的简历上涉及到消息队列这方面的内容,这个时候推荐你结合你自己的项目来回答。
三 ActiveMQ
ActiveMQ 是基于 JMS 规范实现的。
JMS消息列队有两种消息模式,一种是点对点的消息模式,还有一种是订阅的模式。
四 实现
ActiveMQ下载地址:http://activemq.apache.org/components/classic/download/
解压缩apache-activemq-5.xxx-bin.zip到一个目录
启动ActiveMQ:运行C: apache-activemq-5.xxxinactivemq.bat
浏览器中输入:http://localhost:8161/admin/ 测试启动情况
使用点对点方式实现聊天功能
编写消息发送类和接收类。发送类中需要连接ActiveMQ 服务器,创建队列,发送消息;接收类中需要ActiveMQ 服务器,读取发送者发送消息所用的队列。接收类实现为一个单独的线程,使用监听器模式,每隔一段时间侦听是否有消息到来,若有消息到来,将消息添加到辅助类消息列表中。
使用2个队列,即对于每一个用户来说,发送消息为一个队列,接受消息为一个队列。
效果如下:
1 import org.apache.activemq.ActiveMQConnectionFactory; 2 3 import javax.jms.*; 4 5 import static java.lang.Thread.sleep; 6 7 public class MessageReceiver implements Runnable{ 8 private String url; 9 private String user; 10 private String password; 11 private final String QUEUE; 12 private Boolean stop; 13 Connection connection; 14 15 public MessageReceiver(String queue, String url, String user, String password) { 16 this.url = url; 17 this.user = user; 18 this.password = password; 19 this.QUEUE = queue; 20 stop = false; 21 } 22 23 public void run() { 24 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 25 try { 26 connection = connectionFactory.createConnection(); 27 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 28 Destination receiveQueue = session.createQueue(QUEUE); 29 MessageConsumer consumer = session.createConsumer(receiveQueue); 30 connection.start(); 31 while(!stop) { 32 consumer.setMessageListener(new MessageListener() { 33 @Override 34 public void onMessage(Message message) { 35 try { 36 //获取到接收的数据 37 String text = ((TextMessage) message).getText(); 38 MessageText.setMsg(text); 39 } catch (JMSException e) { 40 e.printStackTrace(); 41 } 42 } 43 }); 44 sleep(500); 45 } 46 } catch (JMSException e) { 47 e.printStackTrace(); 48 }catch (InterruptedException e) { 49 //Thread.currentThread().interrupt(); 50 e.printStackTrace(); 51 } 52 } 53 54 public void setStop(Boolean stop) { 55 this.stop = stop; 56 } 57 58 public void closeConnection(){ 59 try { 60 connection.close(); 61 } catch (JMSException e) { 62 e.printStackTrace(); 63 } 64 } 65 66 public String getUrl() { 67 return url; 68 } 69 70 public void setUrl(String url) { 71 this.url = url; 72 } 73 74 public String getUser() { 75 return user; 76 } 77 78 public void setUser(String user) { 79 this.user = user; 80 } 81 82 public String getPassword() { 83 return password; 84 } 85 86 public void setPassword(String password) { 87 this.password = password; 88 } 89 }
1 import org.apache.activemq.ActiveMQConnectionFactory; 2 3 import javax.jms.*; 4 import java.text.DateFormat; 5 import java.text.SimpleDateFormat; 6 import java.util.Date; 7 8 public class MessageSender { 9 private String url; 10 private String user; 11 private String password; 12 private final String QUEUE; 13 private Connection connection; 14 private Session session; 15 private Destination sendQueue; 16 private MessageProducer sender; 17 private TextMessage outMessage; 18 private DateFormat df; 19 20 public MessageSender(String queue, String url, String user, String password) { 21 this.url = url; 22 this.user = user; 23 this.password = password; 24 this.QUEUE = queue; 25 } 26 27 public void init() { 28 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 29 df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 30 try { 31 connection = connectionFactory.createConnection(); 32 connection.start(); 33 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 34 sendQueue = session.createQueue(QUEUE); 35 sender = session.createProducer(sendQueue); 36 outMessage = session.createTextMessage(); 37 } catch (JMSException e) { 38 e.printStackTrace(); 39 } 40 } 41 42 public void sendMessage(String messageStr) { 43 try { 44 outMessage = session.createTextMessage(); 45 String sendStr = df.format(new Date()) + " " + QUEUE + ": " + messageStr; 46 outMessage.setText(sendStr); 47 sender.send(outMessage); 48 session.commit(); 49 MessageText.setMsg(sendStr); 50 } catch (JMSException e) { 51 e.printStackTrace(); 52 } 53 } 54 55 public void closeConnection() { 56 try { 57 sender.close(); 58 connection.close(); 59 } catch (JMSException e) { 60 e.printStackTrace(); 61 } 62 } 63 64 public String getUrl() { 65 return url; 66 } 67 68 public void setUrl(String url) { 69 this.url = url; 70 } 71 72 public String getUser() { 73 return user; 74 } 75 76 public void setUser(String user) { 77 this.user = user; 78 } 79 80 public String getPassword() { 81 return password; 82 } 83 84 public void setPassword(String password) { 85 this.password = password; 86 } 87 }
以上是关于使用ActiveMQ实现简易聊天功能的主要内容,如果未能解决你的问题,请参考以下文章