如何使用NODEJS+REDIS开发一个消息队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用NODEJS+REDIS开发一个消息队列相关的知识,希望对你有一定的参考价值。
MQ全称为MessageQueue,
消息队列(MQ)是一种应用程
序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们>。消
息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过
队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM
WEBSPHERE
MQ。
以上介绍仍旧来自百度百科.
消息队列产品对比
目前比较流行的MQ有2种,ActiveMQ
以及
RabbitMQ
,
RabbitMQ性能号称能够达到每秒10000,而REDIS官方的压力测试值在7-8万之间,而且是去掉了网络IO操作,真实情况我估计在每秒2-3
万的并发操作,但这个数目对于一般的应用应该足够了.
Redis如何支持消息队列?
在新版本的redis
v2.6以上以及以上版本开始支持
subscribe
以及
publish
操作,
subscribe订阅一个频道,publish可以像频道广播消息.
这个机制最老的应用应该是算是聊天室了.
Sub/Pub
模式固然很好用,但是同样有一个问题,就是如果有多个人订阅了同一频道,而这个频道的数据只能被一个接收方处理,不能够重复处理,这时该怎么办?
解决方法有2种,
1.
publish
将数据写入到一个list
or
sorted
list
队列,写完成后开始给终端广播消息,告诉大家,有新的数据等待处理,这个时候,谁能pop到数据,就是谁处理,这个操作是原子性的,也就是说不会被重复处理.
2.
使用阻塞模式,
redis提供了blpop
brpop这种操作,也就是一直阻塞一个队列,直到有数据来.
这种模式保证了数据的原子性,而且使应用程序可以支持分布式多台机器部署.
Sub/Pub模式
(sub.js):
var
redis
=
require("redis");
var
client
=
redis.createClient(6379,
‘127.0.0.1‘,
connect_timeout:
1);
//订阅一个频道
var
sub
=
function(c)
var
c
=
c
||
‘roban:test:channel‘;
client.subscribe(c,function(e)
console.log(‘starting
subscribe
channel:‘+c);
);
;
//订阅一个频道
sub();
//处理错误,如果出现错误,或者服务器断开了链接,等待恢复时,继续订阅这个频道
client.on(‘error‘,
function(error)
console.log(error);
sub();
);
//订阅处理函数
client.on(‘message‘,function(err,response)
console.log(response);
);
打开redis命令行,输入以下命令:
publish
roban:test:channel
hello
发布这条信息后,sub端会输出以下信息:
Robans-Pro:node
robanlee$
node
demo.js
starting
subscribe
channel:roban:test:channel
hello 参考技术A /**
* Add task
* @author Robanlee@gmail.com
*/
//加载函数,集中加载一些LIB,这个源码请参照最后的附属文件
var loader = require('./loader');
function addTask(opts)
new loader(this);
//默认设置
this.opts =
keyIDs:'schedule:job:ids',
keyLists:'schedule:job:list',
keyJob:'schedule:job:'
//合并配置,类似JQUERY: extend
this.mergeParams(opts);
;
//Merge options
addTask.prototype.mergeParams = function ( param )
if(undefined === this.opts )
return false;
for(var x in param)
if(param[x] != undefined && '' != param[x])
this.opts[x] = param[x];
;
//添加数据方法
addTask.prototype.pushData = function ( data )
if(undefined == data )
console.log('--ERROR:data is null');
return false;
this.getIncr.call(this,function(response,obj)
var id = response;
obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response)
if(err) throw err;
);
data.id = id;
var m = obj.redisClient.multi();
for(var x in data)
m.hset( obj.opts.keyJob+id,x,data[x] );
m.exec(function(err,response)
if(err) throw err;
console.log('[info] Task: ['+data.name+'] has been set successful!');
);
);
;
//获取REDIS目前的自增ID
addTask.prototype.getIncr = function (callBack)
var obj = this;
this.redisClient.incr(this.opts.keyIDs,function(err,response)
console.log("[info] Current id is : " + response);
callBack(response, obj);
);
;本回答被提问者采纳
以上是关于如何使用NODEJS+REDIS开发一个消息队列的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 nodejs 将消息发送到 RabbitMQ 队列
面试官心理分析+面试题剖析:消息队列+Redis 缓存+分布式系统等
实时聊天,不想丢消息——用队列(redis、zeromq)?