broadcast-service一个轻量级Python发布订阅者框架

Posted 帅气的黑桃J

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了broadcast-service一个轻量级Python发布订阅者框架相关的知识,希望对你有一定的参考价值。

本文节选至本人博客:https://www.blog.zeeland.cn/archives/broadcast-service-description

Introduction

前两天在Python最佳实践-构建自己的第三方库文章中介绍了自己构建的一个轻量级的Python发布订阅者框架,今天来简单介绍一下。

项目地址:

broadcast-service

broadcast-service是一个轻量级Python发布订阅者框架,事实上,我更愿意称其为广播模式,因为该框架更加的轻量级,通过broadcast-service,只需要引入一个单例类,你就可以十分轻松地构建起一个发布订阅者模式,几乎没有代码侵入性。

Install

执行以下语句即可安装该库。

pip install broadcast-service

Usage

下面是一个简单的发布订阅者实现:

from broadcast_service import broadcast_service

def handle_msg():
    print('handle_msg callback')

if __name__ == '__main__':
    # listen topic
    broadcast_service.listen('Test', handle_msg)

    # publish broadcast
    broadcast_service.broadcast('Test')

如果你想要在发布topic的时候传入对应的参数,你只需要将参数传入broadcast(topic_name, params) params中。

from broadcast_service import broadcast_service

def handle_msg(params):
    print(params)

if __name__ == '__main__':
    info = 'This is very important msg'

    # listen topic
    broadcast_service.listen('Test', handle_msg)

    # publish broadcast
    broadcast_service.broadcast('Test', info)

from broadcast_service import broadcast_service

def handle_msg(info, info2):
    print(info)
    print(info2)

if __name__ == '__main__':
    info = 'This is very important msg'
    info2 = 'This is also a very important msg.'

    # listen topic
    broadcast_service.listen('Test', handle_msg)

    # publish broadcast
    broadcast_service.broadcast('Test', info, info2)

此外,broadcast-service默认支持异步回调的方式,避免了现成堵塞的发生,下面是一个形象的demo演示。

    One day, leader Tom arrive the company but find not one staff in company
  because all staff are playing outside. Therefor, Tom send a message
  everyone must must return to company now. Staff Jack, Jasmine, Jane go back
  now when they receive it. Actually, they need different time to go back
  in different places. When they return, they need to declare that they are back.
  ---
  有一天,领导Tom来到公司,却发现公司里没有一个员工,
  因为所有的员工都在外面玩耍。于是,Tom发了一条信息,
  现在每个人都必须回到公司。Jack,Jasmine,Jane必须现在回去
  事实上,当他们收到信息时,他们正处在不同的地方,每个人回
  去需要花费的时间不同,并且回来后他们需要声明他们回来了。
from broadcast_service import broadcast_service
import time
import random

class Application:
    """
    This demo aim to show how to use broadcast-service.
    Scene:
        One day, leader Tom arrive the company but find not one staff in company
        because all staff are playing outside. Therefor, leader Tom send a message
        everyone must must return to company now. Staff Jack, Jasmine, Jane go back
        now when they receive it. Actually, they need different time to go back
        in different places. When they return, they need to declare that they are back.
    Attention:
        broadcast-service is asynchronous by defalut. If you want to close the async
        state. You can use:
            broadcast_service.enable_async = False
        to close the async statement.

    """
    def __init__(self):
        self.leader = Leader('Tom')
        self.staff1 = Staff('Jack')
        self.staff2 = Staff('Jasmine')
        self.staff3 = Staff('Jane')
        self.current_time = None

    def run(self):
        self.current_time = time.time()
        self.leader.notice_go_back()

def print_time():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

class Leader:
    def __init__(self, name):
        self.name = name

    def notice_go_back(self):
        print('[1] 0(leader) notice meeting now.'.format(self.name, print_time()))
        meeting_info = 'You guys must go back now!'
        broadcast_service.broadcast('meeting', meeting_info)

class Staff:
    def __init__(self, name):
        self.name = name
        self.rec_msg()

    def rec_msg(self):
        broadcast_service.listen('meeting', self.go_back)

    def go_back(self, info):
        print("[2] 0(staff) receive msg '1' and go back now.".format(self.name, info, print_time()))
        time.sleep(random.randint(1, 5))
        print('[1] 0(staff) is back now.'.format(self.name, print_time()))

if __name__ == '__main__':
    app = Application()
    app.run()

事实上,从如下结果可以看到,该主体发布时,订阅者可以同时响应订阅的主题,而不会产生线程堵塞的问题。

以上是关于broadcast-service一个轻量级Python发布订阅者框架的主要内容,如果未能解决你的问题,请参考以下文章

JSON 获取属性值的方法

Python实现MQTT接收订阅数据

一篇长文带你在python里玩转Json数据

saltstack安装部署与入门使用

Python 协程学习有点难度?这篇文字值得你去收藏

Python 协程学习有点难度?这篇文字值得你去收藏