LuaTask教程之消息订阅,发布
Posted 合宙Luat
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LuaTask教程之消息订阅,发布相关的知识,希望对你有一定的参考价值。
本教程讲解LuaTask中的消息的发布和订阅的历程和原理。
先看一个例子:
小红是小明的姐姐。
小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。
后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。
书架就是一个消息队列,小红是发布者,小明是订阅者。
当函数完成一个操作后,可以发布一个消息,其他函数可以订阅该消息并做对应的操作。举个例子,当socket发送完数据后发布“SEND_FINISH”。这时开发者想socket发布完成后通过串口发送数据或者改变某个IO口的状态。就可以订阅该消息subscribe("SEND_FINISH",callback)。callback为接收到SEND_FINISH消息后需要做的事,可为函数,可以线程。
先来看一个程序
--testMsgPub.lua
module(...,package.seeall)
require"sys"
local a = 2
local function pub()
print("pub")
sys.publish("TEST",a) --可以发布多个变量sys.publish("TEST",1,2,3)
end
pub()
--testMsgSub.lua
module(...,package.seeall)
require"sys"
local function subCallBack(...)
print("rev",arg[1])
end
sys.subscribe("TEST",subCallBack)从trace输出可以看到发布成功,并且执行了订阅消息的回调函数。
如果要在任务函数中订阅消息并做相应的处理,怎么办?
odule(...,package.seeall)
require"sys"
local a = 2
local function pub()
print("pub")
sys.publish("TEST",a)
end
pub()
sys.taskInit(function()
while true do
result, data = sys.waitUntil("TEST", 10000)
if result == true then
print("rev")
print(data)
end
sys.wait(2000)
end
end)
调用sys.waitUntil()函数即可。
接下来分析实现的源码
为了更好的理解源码,需要以下的预备知识:
1、回调函数的实现
local function callBackTest(...)
print("callBack",arg[1])
end
local function test( a,callBackTest )
if a > 1 then
callBackTest(1)
end
end
test(2,callBackTest)
--输出
--callBack 1
2、不定参数
function g (a, b, ...) end
g(3) -- a=3, b=nil, arg={n=0} -- n为不定参数的个数
g(3, 4) -- a=3, b=4, arg={n=0}
g(3, 4, 5, 8) -- a=3, b=4, arg={5, 8; n=2}
进入正题
------------------------------------------ LUA应用消息订阅/发布接口 ------------------------------------------
-- 订阅者列表
local subscribers = {}
--内部消息队列
local messageQueue = {}
--- 订阅消息
-- @param id 消息id
-- @param callback 消息回调处理
-- @usage subscribe("NET_STATUS_IND", callback)
function subscribe(id, callback)
if type(id) ~="{s}{t}{r}\in{g}"{\quad\text{or}\quad}{\left({t}{y}{p}{e}{\left({c}{a}{l}{l}{b}{a}{c}{k}\right)}\right.}~= "function" and type(callback) ~= "thread") then
log.warn("warning: sys.subscribe invalid parameter", id, callback)
return
end
if not subscribers[id] then subscribers[id] = {} end -- 如果没有重复消息
subscribers[id][callback] = true --标记id和callback关系
end
--- 取消订阅消息
-- @param id 消息id
-- @param callback 消息回调处理
-- @usage unsubscribe("NET_STATUS_IND", callback)
function unsubscribe(id, callback)
if type(id) ~="{s}{t}{r}\in{g}"{\quad\text{or}\quad}{\left({t}{y}{p}{e}{\left({c}{a}{l}{l}{b}{a}{c}{k}\right)}\right.}~= "function" and type(callback) ~= "thread") then
log.warn("warning: sys.unsubscribe invalid parameter", id, callback)
return
end
if subscribers[id] then subscribers[id][callback] = nil end --删除id和callback关系
end
--- 发布内部消息,存储在内部消息队列中
-- @param ... 可变参数,用户自定义
-- @return 无
-- @usage publish("NET_STATUS_IND")
function publish(...)
table.insert(messageQueue, arg) -- 将不定参数插入队列中
end
-- 分发消息
local function dispatch()
while true do
if #messageQueue == 0 then --如果队列长度为 跳出循环
break
end
local message = table.remove(messageQueue, 1) --获取队列的第一个
if subscribers[message[1]] then --如果订消息存在
for callback, _ in pairs(subscribers[message[1]]) do
if type(callback) == "function" then
print("unpack",unpack(message, 2, #message))
callback(unpack(message, 2, #message)) -- 返回第二个到最后一个
elseif type(callback) == "thread" then
coroutine.resume(callback, unpack(message))
end
end
end
end
end
以sys.publish("TEST",a)和sys.subscribe("TEST",subCallBack),订阅者列表为local subscribers = {}。内部消息队列为local messageQueue = {}为例:
1、在publish函数中,将"TEST"消息和参数插入messageQueue列表中
此时messageQueue中为{{"TEST",2;n=1}}
2、在subscribe函数中判断消息和callback类型是否正确,如果正确则在subscribers中建立消息与回调函数之间的关系。
此时subscribers["TEST"][subCallBack] = true。表明TEST消息对应的回掉函数为subCallBack
3、在dispatch()函数中,获得表头列表。
local message = table.remove(messageQueue, 1)
此时message为{"TEST",2;n=1}
找到该消息对应的回调函数或消息。将message中的参数传给回调函数。
通过pairs遍历得到消息对应的回调函数或者任务。
如果callback是函数,那么将publish时候的参数传给回调函数。
如果callback是线程,那么唤醒该线程。
以上只是单个消息举例,多个消息同理,因为每次循环都会将messageQueue的头部出队列,满足FIFO原则。
在有上基础下容易的理解waitUntil()的实现
--- Task任务的条件等待函数(包括事件消息和定时器消息等条件),只能用于任务函数中。
-- @param id 消息ID
-- @number ms 等待超时时间,单位ms,最大等待126322567毫秒
-- @return result 接收到消息返回true,超时返回false
-- @return data 接收到消息返回消息参数
-- @usage result, data = sys.waitUntil("SIM_IND", 120000)
function waitUntil(id, ms)
subscribe(id, coroutine.running())
local message = ms and {wait(ms)} or {coroutine.yield()}
unsubscribe(id, coroutine.running())
return message[1] ~= nil, unpack(message, 2, #message)
end
1、订阅id,并传入线程号
2、阻塞线程,如果接收到了消息,那么返回message
3、取消订阅该id
4、返回结果
更多Luat教程,尽在Luat博客:http://blog.openluat.com
以上是关于LuaTask教程之消息订阅,发布的主要内容,如果未能解决你的问题,请参考以下文章