LuaTask教程之消息订阅,发布

Posted 合宙Luat

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LuaTask教程之消息订阅,发布相关的知识,希望对你有一定的参考价值。

本教程讲解LuaTask中的消息的发布和订阅的历程和原理。

先看一个例子:

小红是小明的姐姐。

小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。

后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。

书架就是一个消息队列,小红是发布者,小明是订阅者。

当函数完成一个操作后,可以发布一个消息,其他函数可以订阅该消息并做对应的操作。举个例子,当socket发送完数据后发布“SEND_FINISH”。这时开发者想socket发布完成后通过串口发送数据或者改变某个IO口的状态。就可以订阅该消息subscribe("SEND_FINISH",callback)。callback为接收到SEND_FINISH消息后需要做的事,可为函数,可以线程。

先来看一个程序

--testMsgPub.lua
module(...,package.seeall)

require"sys"
local  a = 2
local function pub()
   print("pub")
   sys.publish("TEST",a)       --可以发布多个变量sys.publish("TEST",1,2,3)
end
pub()

--testMsgSub.lua
module(...,package.seeall)

require"sys"

local function subCallBack(...)
   print("rev",arg[1])
end

sys.subscribe("TEST",subCallBack)从trace输出可以看到发布成功,并且执行了订阅消息的回调函数。

如果要在任务函数中订阅消息并做相应的处理,怎么办?

odule(...,package.seeall)

require"sys"
local  a = 2
local function pub()
   print("pub")
   sys.publish("TEST",a)
end
pub()
sys.taskInit(function()
   while true do
       result, data = sys.waitUntil("TEST", 10000)
       if result == true then
           print("rev")
           print(data)
       end
       sys.wait(2000)
   end
end)

调用sys.waitUntil()函数即可。

接下来分析实现的源码

为了更好的理解源码,需要以下的预备知识:

1、回调函数的实现

local function callBackTest(...)
   print("callBack",arg[1])
end

local function test( a,callBackTest )
   if a > 1 then
       callBackTest(1)
   end
end
test(2,callBackTest)
--输出
--callBack    1

2、不定参数

function g (a, b, ...) end
g(3)              -- a=3, b=nil, arg={n=0}   -- n为不定参数的个数
g(3, 4)           -- a=3, b=4, arg={n=0}
g(3, 4, 5, 8)     -- a=3, b=4, arg={5, 8; n=2}  

进入正题

------------------------------------------ LUA应用消息订阅/发布接口 ------------------------------------------
-- 订阅者列表
local subscribers = {}
--内部消息队列
local messageQueue = {}

--- 订阅消息
-- @param id 消息id
-- @param callback 消息回调处理
-- @usage subscribe("NET_STATUS_IND", callback)
function subscribe(id, callback)
   if type(id) ~="{s}{t}{r}\in{g}"{\quad\text{or}\quad}{\left({t}{y}{p}{e}{\left({c}{a}{l}{l}{b}{a}{c}{k}\right)}\right.}~= "function" and type(callback) ~= "thread") then
       log.warn("warning: sys.subscribe invalid parameter", id, callback)
       return
   end
   if not subscribers[id] then subscribers[id] = {} end    -- 如果没有重复消息
   subscribers[id][callback] = true        --标记id和callback关系
end

--- 取消订阅消息
-- @param id 消息id
-- @param callback 消息回调处理
-- @usage unsubscribe("NET_STATUS_IND", callback)
function unsubscribe(id, callback)
   if type(id) ~="{s}{t}{r}\in{g}"{\quad\text{or}\quad}{\left({t}{y}{p}{e}{\left({c}{a}{l}{l}{b}{a}{c}{k}\right)}\right.}~= "function" and type(callback) ~= "thread") then
       log.warn("warning: sys.unsubscribe invalid parameter", id, callback)
       return
   end
   if subscribers[id] then subscribers[id][callback] = nil end  --删除id和callback关系
end

--- 发布内部消息,存储在内部消息队列中
-- @param ... 可变参数,用户自定义
-- @return 无
-- @usage publish("NET_STATUS_IND")
function publish(...)
   table.insert(messageQueue, arg)     -- 将不定参数插入队列中
end

-- 分发消息
local function dispatch()
   while true do
       if #messageQueue == 0 then      --如果队列长度为  跳出循环
           break
       end
       local message = table.remove(messageQueue, 1)   --获取队列的第一个
       if subscribers[message[1]] then                     --如果订消息存在
           for callback, _ in pairs(subscribers[message[1]]) do
               if type(callback) == "function" then
                   print("unpack",unpack(message, 2, #message))
                   callback(unpack(message, 2, #message))   -- 返回第二个到最后一个
               elseif type(callback) == "thread" then
                   coroutine.resume(callback, unpack(message))
               end
           end
       end
   end
end

以sys.publish("TEST",a)和sys.subscribe("TEST",subCallBack),订阅者列表为local subscribers = {}。内部消息队列为local messageQueue = {}为例:

1、在publish函数中,将"TEST"消息和参数插入messageQueue列表中

此时messageQueue中为{{"TEST",2;n=1}}

2、在subscribe函数中判断消息和callback类型是否正确,如果正确则在subscribers中建立消息与回调函数之间的关系。

此时subscribers["TEST"][subCallBack] = true。表明TEST消息对应的回掉函数为subCallBack

3、在dispatch()函数中,获得表头列表。

local message = table.remove(messageQueue, 1)

此时message为{"TEST",2;n=1}

找到该消息对应的回调函数或消息。将message中的参数传给回调函数。

通过pairs遍历得到消息对应的回调函数或者任务。

如果callback是函数,那么将publish时候的参数传给回调函数。

如果callback是线程,那么唤醒该线程。

以上只是单个消息举例,多个消息同理,因为每次循环都会将messageQueue的头部出队列,满足FIFO原则。

在有上基础下容易的理解waitUntil()的实现

--- Task任务的条件等待函数(包括事件消息和定时器消息等条件),只能用于任务函数中。
-- @param id 消息ID
-- @number ms 等待超时时间,单位ms,最大等待126322567毫秒
-- @return result 接收到消息返回true,超时返回false
-- @return data 接收到消息返回消息参数
-- @usage result, data = sys.waitUntil("SIM_IND", 120000)
function waitUntil(id, ms)
   subscribe(id, coroutine.running())        
   local message = ms and {wait(ms)} or {coroutine.yield()}
   unsubscribe(id, coroutine.running())
   return message[1] ~= nil, unpack(message, 2, #message)
end

1、订阅id,并传入线程号

2、阻塞线程,如果接收到了消息,那么返回message

3、取消订阅该id

4、返回结果

更多Luat教程,尽在Luat博客:http://blog.openluat.com



以上是关于LuaTask教程之消息订阅,发布的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq系列三 之发布/订阅

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

小程序消息订阅发送功能教程

redis学习教程三《发送订阅事务连接》

Redis实现消息队列之发布订阅模式

“一切都是消息”--MSF(消息服务框架)之发布-订阅模式