基于RabbitMQ实现异步消息通知处理
Posted javatiange
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于RabbitMQ实现异步消息通知处理相关的知识,希望对你有一定的参考价值。
业务场景
一个简单的业务场景如:某web页面存在多个相互关联的异步获取数据展示的区域,如何优雅的实现一个区域的数据更新时异步通知其它区域进行数据刷新?
图1 业务场景
当列表数据操作状态变更时,让上面统计区域自动更新。
常见实现方式是,在“操作”方法中调用“刷新统计数据”的方法,但这种方式不可取,原因是当业务逻辑复杂时这将变得难以维护(十有八九会出Bug)。
解决的思路
当“操作”状态变更后发送一个通知(生产者),由关心这个操作的业务(消息者)订阅消息并处理。实现业务解藕,适合分布式部署。
异步消息通知方式
- Ajax 短轮询
Ajax 轮询主要通过页面端的JS定时异步刷新任务来实现数据的加载,但这种方式实时效果较差,而且对服务端的压力也较大。 - 长轮询
长轮询主要也是通过 Ajax机制,但区别于传统的Ajax应用,长轮询的服务器端会在没有数据时阻塞请求直到有新的数据产生或者请求超时才返回,之后客户端再重新建立连接获取数据。这种会长时间地占用资源,如果消息频繁发送的话会给服务端带来较大的压力。3. WebSocket 双向通信 - WebSocket 是 html5 中一种新的通信协议,能够实现浏览器与服务器之间全双工通信。如果浏览器和服务端都支持WebSocket协议的话,该方式实现的消息推送无疑是最高效、简洁的。并且最新版本的 IE、Firefox、Chrome等浏览器都已经支持 WebSocket 协议,Apache Tomcat 7.0.27 以后的版本也开始支持 WebSocket。
STOMP
即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。许多公司都提供了基于STOMP的服务器与客户端,如RabbitMQ服务端,基于浏览器的stomp.js客户端等。
RabbitMQ
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列和路由,可靠且安全。RabbitMQ 是一个开源的 AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、php、ActionScript、XMPP、STOMP 等,支持 Ajax。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ Web STOMP
该插件可以理解为 HTML5 WebSocket 与 STOMP 协议间的桥接,目的也是为了让浏览器能够使用 RabbitMQ。当 RabbitMQ 消息服务器开启了 STOMP 和 Web STOMP 插件后,浏览器端就可以轻松地使用 WebSocket 或者 SockerJS 客户端实现与 RabbitMQ 服务器进行通信。
RabbitMQ Web STOMP 是对 STOMP 协议的桥接,因此其语法也完全遵循 STOMP 协议。STOMP 是基于 frame 的协议,与 HTTP 的 frame 相似。一个 Frame 包含一个 command,一系列可选的 headers 和一个 body。STOMP client 的用户代理可以充当两个角色,当然也可能同时充当:作为生产者,通过 SEND frame 发送消息到服务器;作为消费者,发送 SUBCRIBE frame 到目的地并且通过 MESSAGE frame 从服务器获取消息。
在Web页面中利用WebSocket使用STOMP协议只需要下载stomp.js即可,考虑到老版本的浏览器不支持 WebSocket,SockJS 则提供了 WebSocket 的模拟支持。
RabbitMQ安装
见:RabbitMQ在CentOS7安装
解决问题
引入js插件
<script src="jquery/jquery-1.9.1.min.js"></script>
<script src="rabbitmq/sockjs-0.3.js"></script>
<script src="rabbitmq/stomp.js"></script>
<link rel="stylesheet" href="bootstrap/css/bootstrap.min.css">
<script src="bootstrap/js/bootstrap.min.js"></script>
测试页面html
<body>
<div style="padding: 50px 100px">
<button class="btn btn-success" type="button">
待出票 <span class="badge" id="notout">0</span>
</button>
<button class="btn btn-warning" type="button">
已出票<span class="badge" id="out">0</span>
</button>
<button class="btn btn-danger" type="button">
已退票 <span class="badge" id="break">0</span>
</button>
<button class="btn btn-primary" type="button">
已取消 <span class="badge" id="cancel">0</span>
</button>
<table class="table">
<thead>
<tr>
<th>产品</th>
<th>日期</th>
<th>状态</th>
<th>操作</th>
</tr>
</thead>
<tbody>
<tr>
<td>PEK-CKG</td>
<td>23/11/2017</td>
<td class="status">待出票</td>
<td><button class="btn btn-primary exStatus" type="button">出票</button></td>
</tr>
<tr>
<td>CAN-CKG</td>
<td>20/10/2017</td>
<td class="status">待出票</td>
<td><button class="btn btn-primary exStatus" type="button">出票</button></td>
</tr>
<tr>
<td>CAN-CKG</td>
<td>20/10/2017</td>
<td class="status">待出票</td>
<td><button class="btn btn-primary exStatus" type="button">出票</button></td>
</tr>
<tr>
<td>CUT-PEK</td>
<td>10/11/2017</td>
<td class="status">已出票</td>
<td><button class="btn btn-warning exStatus" type="button">退票</button></td>
</tr>
<tr>
<td>PEK-CKG</td>
<td>23/11/2017</td>
<td class="status">待支付</td>
<td><button class="btn btn-primary exStatus" type="button">取消</button></td>
</tr>
<tr>
<td>PEK-CKG</td>
<td>23/11/2017</td>
<td class="status">待支付</td>
<td><button class="btn btn-primary exStatus" type="button">取消</button></td>
</tr>
<tr>
<td>SHA-PEK</td>
<td>20/10/2017</td>
<td class="status">已退票</td>
<td></td>
</tr>
</tbody>
</table>
</div>
</body>
页面事件绑定和数据初始化
<script type="text/javascript">
var countNotout=function(data){
//真实业务可能是ajax向后台请求数据
$('#notout').html($("td:contains('待出票')").length);
};
var countOut=function(data){
//真实业务可能是ajax向后台请求数据
$('#out').html($("td:contains('已出票')").length);
};
var countBreak=function(data){
//真实业务可能是ajax向后台请求数据
$('#break').html($("td:contains('已退票')").length);
};
var countCancel = function(data) {
//真实业务可能是ajax向后台请求数据
$('#cancel').html($("td:contains('已取消')").length);
};
$(document).ready(function(){
countNotout();
countOut();
countBreak();
countCancel();
});
//事件绑定
$(".exStatus").click(function() {
if ('出票' === $(this).html()) {
$(this).removeClass().addClass('btn btn-warning exStatus');
$(this).parents('tr').find(".status").html('已出票');
$(this).html('退票');
//发送通知
sendMQ("doOut");
return;
}
if ('退票' === $(this).html()) {
$(this).parents('tr').find(".status").html('已退票');
$(this).remove();
//发送通知
sendMQ("doBreak");
return;
}
if ('取消' === $(this).html()) {
$(this).parents('tr').find(".status").html('已取消');
$(this).remove();
//发送通知
sendKeyMQ("doCancel");
return;
}
});
</script>
连接到RabbitMQ服务器,订阅消息通知
<script type="text/javascript">
//connection rabbitmq
var username = 'kaven';
var password = 'kaven123';
// Stomp.js boilerplate
if (location.search == '?ws') {
var ws = new WebSocket('ws://master:15674/ws');
console.log('Using WebSocket...');
} else {
var ws = new SockJS('http://master:15674/stomp');
console.log('Using SockJS...');
}
// Init Client
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
// Declare on_connect
var on_connect = function(x) {
//订阅模式
client.subscribe("/exchange/amq.fanout/rabbitmq_routingkey",
function(d) {
countOut(d.body);
});
//订阅模式
client.subscribe("/exchange/amq.fanout/rabbitmq_routingkey",
function(d) {
countBreak(d.body);
});
//订阅模式
client.subscribe("/exchange/amq.fanout/rabbitmq_routingkey",
function(d) {
countNotout(d.body);
});
//Direct模式
client.subscribe("/exchange/amq.direct/rabbitmq_orderCancel",
function(d) {
countCancel(d.body);
});
};
// 定义连接失败回调函数
var on_error = function(error) {
console.log(error.headers.message);
};
// Conect to RabbitMQ
client.connect(username, password, on_connect, on_error, '/');
var sendMQ = function(data) {
client.send('/exchange/amq.fanout/rabbitmq_routingkey', {"content-type" : "text/plain"}, data);
};
var sendKeyMQ = function(data) {
client.send('/exchange/amq.direct/rabbitmq_orderCancel', {"content-type" : "text/plain"}, data);
};
</script>
适用范围
此框架模型至少可以适用以下情况:
- 不同业务模块之间
- 不同开发语言之间
- 不同应用服务器之间
- 不同应用设备之间
写在最后
一篇神文档就把java多线程,锁,JMM,JUC和高并发设计模式讲明白了
最后给大家分享一篇一线开发大牛整理的java高并发核心编程神仙文档,里面主要包含的知识点有:多线程、线程池、内置锁、JMM、CAS、JUC、高并发设计模式、Java异步回调、CompletableFuture类等。
首先,咱们先来看目录
下面是详细的目录
其次咱们来看每个小节都有哪些内容
多线程原理与实战;
Java内置锁的核心原理;
CAS原理与JUC原子类;
可见性与有序性的原理;
JUC显式锁的原理与实战;
AQS抽象同步器的核心原理;
JUC容器类;
高并发设计模式;
高并发核心模式之异步回调模式;
CompletableFuture异步回调;
因为文章内容实在是太多了,不能够给大家一一体现出来,每个章节都有更加细化的内容。大家需要完整版文档的小伙伴,可以一键三连,下方获取免费领取方式!
以上是关于基于RabbitMQ实现异步消息通知处理的主要内容,如果未能解决你的问题,请参考以下文章