Python 观察者模式:示例、提示? [关闭]

Posted

技术标签:

【中文标题】Python 观察者模式:示例、提示? [关闭]【英文标题】:Python Observer Pattern: Examples, Tips? [closed] 【发布时间】:2010-12-26 15:16:38 【问题描述】:

是否有任何用 Python 实现的 GoF Observer 示例?我有一个位代码,目前有一些调试代码通过键类绑定(如果设置了魔法环境,则当前向 stderr 生成消息)。此外,该类有一个接口用于增量返回结果以及将它们存储(在内存中)以进行后期处理。 (该类本身是一个作业管理器,用于通过 ssh 在远程机器上同时执行命令。

目前该类的用法如下:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

另一种使用模式是:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

这一切都适用于当前实用程序。但是,它确实缺乏灵活性。比如我目前支持一个简短的输出格式或者一个进度条作为增量结果,我也支持 post_process() 函数的简短、完整和“合并消息”输出。

但是,我想支持多个结果/输出流(终端的进度条、日志文件的调试和警告、成功作业的输出到一个文件/目录、错误消息和其他不成功的结果工作到另一个人,等等)。

这听起来像是需要观察者的情况......让我的类的实例接受来自其他对象的注册,并在它们发生时用特定类型的事件回调它们。

我正在查看PyPubSub,因为我在 SO 相关问题中看到了几个对它的引用。我不确定我是否准备好将外部依赖项添加到我的实用程序中,但我可以看到使用他们的接口作为我的模型的价值,如果这会让其他人更容易使用的话。 (该项目既是一个独立的命令行实用程序,也是一个用于编写其他脚本/实用程序的类)。

简而言之,我知道如何做我想做的事......但是有很多方法可以完成它。我想就从长远来看最有可能对代码的其他用户起作用的建议提出建议。

代码本身位于:classh。

【问题讨论】:

【参考方案1】:

但它确实缺乏灵活性。

嗯...实际上,如果您想要异步 API,这对我来说是一个不错的设计。通常是这样。也许你需要的只是从 stderr 切换到 Python 的 logging 模块,它有自己的发布/订阅模型,Logger.addHandler() 等等。

如果您确实想支持观察者,我的建议是保持简单。你真的只需要几行代码。

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

您的 Job 类可以继承 Observable。当感兴趣的事情发生时,请拨打self.fire(type="progress", percent=50) 或类似的电话。

【讨论】:

Heh - 我打算删除关于 logging 的评论,因为我已经添加了另一个带有示例代码的答案。但是现在这个答案有复选标记,也许我会不理会它。 :) 如果我想观察一个对象的绑定方法怎么办?我真的很喜欢这个实现,但我看不出如何将它应用于不太具体的用例。我在下面发布了我的实现。它更复杂,但它通过一个非常简单的接口涵盖了观察函数和绑定方法。 @DanielSank 你总是可以将它包装在一个装饰器中作为一个方法,它应该可以正常工作【参考方案2】:

我认为其他答案中的人做得过火了。只需不到 15 行代码,即可在 Python 中轻松实现事件。

你有两个类:EventObserver。任何想要监听事件的类,都需要继承 Observer 并设置为监听(观察)特定事件。当 Event 被实例化并触发时,所有监听该事件的观察者都将运行指定的回调函数。

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observables = 
    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event():
    def __init__(self, name, data, autofire = True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()
    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)

示例

class Room(Observer):

    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # Observer's init needs to be called
    def someone_arrived(self, who):
        print(who + " has arrived!")

room = Room()
room.observe('someone arrived',  room.someone_arrived)

Event('someone arrived', 'Lenard')

输出:

Room is ready.
Lenard has arrived!

【讨论】:

【参考方案3】:

更多方法...

示例:日志记录模块

也许您只需要从 stderr 切换到 Python 的 logging 模块,它具有强大的发布/订阅模型。

很容易开始生成日志记录。

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")

在消费者方面还有一些工作要做。不幸的是,配置记录器输出需要 7 整行代码。 ;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

另一方面,日志记录包中有大量的东西。如果您需要将日志数据发送到一组轮换文件、一个电子邮件地址和 Windows 事件日志,那么您将得到满足。

示例:最简单的观察者

但是您根本不需要使用任何库。支持观察者的一个极其简单的方法是调用一个什么都不做的方法。

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

有时你可以直接说job.on_progress = progressBar.update,而不是写一个lambda,这很好。

这很简单。一个缺点是它自然不支持订阅相同事件的多个侦听器。

示例:类 C# 事件

通过一些支持代码,您可以在 Python 中获取类似 C# 的事件。代码如下:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

生产者使用装饰器声明事件:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

这与上面的“简单观察者”代码完全相同,但您可以使用+= 添加任意数量的侦听器。 (与 C# 不同,没有事件处理程序类型,订阅事件时不必new EventHandler(foo.bar),也不必在触发事件之前检查 null。与 C# 一样,事件不会抑制异常。 )

如何选择

如果logging 满足您的所有需求,请使用它。否则,做最适合你的最简单的事情。需要注意的关键是您不需要承担很大的外部依赖。

【讨论】:

在Python代码中,为什么@event是一个描述符?看起来它也可以是一个简单的装饰器,不是吗? @miracle2k 我不知道怎么做。描述符的重点是支持 C# += 语法。 我喜欢类似 c# 的事件语法 + 一个不错的装饰器示例【参考方案4】:

如果对象不会仅仅因为它们正在观察某些东西而保持活动状态,那又如何呢?下面请找到具有以下特性的观察者模式的实现:

    用法是pythonic。要将观察者添加到实例 .bar 的绑定方法 foo,只需执行 foo.bar.addObserver(observer)。 观察者不能因为是观察者而保持活力。换句话说,观察者代码不使用强引用。 不需要子分类(描述符 ftw)。 可用于不可散列的类型。 可以在一个类中多次使用。 (奖励)截至今天,代码存在于正确的可下载、可安装的package on github 中。

这是代码(github package 或 PyPI package 具有最新的实现):

import weakref
import functools

class ObservableMethod(object):
    """
    A proxy for a bound method which can be observed.

    I behave like a bound method, but other bound methods can subscribe to be
    called whenever I am called.
    """

    def __init__(self, obj, func):
        self.func = func
        functools.update_wrapper(self, func)
        self.objectWeakRef = weakref.ref(obj)
        self.callbacks =   #observing object ID -> weak ref, methodNames

    def addObserver(self, boundMethod):
        """
        Register a bound method to observe this ObservableMethod.

        The observing method will be called whenever this ObservableMethod is
        called, and with the same arguments and keyword arguments. If a
        boundMethod has already been registered to as a callback, trying to add
        it again does nothing. In other words, there is no way to sign up an
        observer to be called back multiple times.
        """
        obj = boundMethod.__self__
        ID = id(obj)
        if ID in self.callbacks:
            s = self.callbacks[ID][1]
        else:
            wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
            s = set()
            self.callbacks[ID] = (wr, s)
        s.add(boundMethod.__name__)

    def discardObserver(self, boundMethod):
        """
        Un-register a bound method.
        """
        obj = boundMethod.__self__
        if id(obj) in self.callbacks:
            self.callbacks[id(obj)][1].discard(boundMethod.__name__)

    def __call__(self, *arg, **kw):
        """
        Invoke the method which I proxy, and all of it's callbacks.

        The callbacks are called with the same *args and **kw as the main
        method.
        """
        result = self.func(self.objectWeakRef(), *arg, **kw)
        for ID in self.callbacks:
            wr, methodNames = self.callbacks[ID]
            obj = wr()
            for methodName in methodNames:
                getattr(obj, methodName)(*arg, **kw)
        return result

    @property
    def __self__(self):
        """
        Get a strong reference to the object owning this ObservableMethod

        This is needed so that ObservableMethod instances can observe other
        ObservableMethod instances.
        """
        return self.objectWeakRef()


class ObservableMethodDescriptor(object):

    def __init__(self, func):
        """
        To each instance of the class using this descriptor, I associate an
        ObservableMethod.
        """
        self.instances =   # Instance id -> (weak ref, Observablemethod)
        self._func = func

    def __get__(self, inst, cls):
        if inst is None:
            return self
        ID = id(inst)
        if ID in self.instances:
            wr, om = self.instances[ID]
            if not wr():
                msg = "Object id %d should have been cleaned up"%(ID,)
                raise RuntimeError(msg)
        else:
            wr = weakref.ref(inst, Cleanup(ID, self.instances))
            om = ObservableMethod(inst, self._func)
            self.instances[ID] = (wr, om)
        return om

    def __set__(self, inst, val):
        raise RuntimeError("Assigning to ObservableMethod not supported")


def event(func):
    return ObservableMethodDescriptor(func)


class Cleanup(object):
    """
    I manage remove elements from a dict whenever I'm called.

    Use me as a weakref.ref callback to remove an object's id from a dict
    when that object is garbage collected.
    """
    def __init__(self, key, d):
        self.key = key
        self.d = d

    def __call__(self, wr):
        del self.d[self.key]

要使用它,我们只需使用@event 装饰我们想要使其可观察的方法。这是一个例子

class Foo(object):
    def __init__(self, name):
        self.name = name

    @event
    def bar(self):
        print("%s called bar"%(self.name,))

    def baz(self):
        print("%s called baz"%(self.name,))

a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()

【讨论】:

【参考方案5】:

来自wikipedia:

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())

Observable 就是这样使用的。

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns: (8, 3.0, 4, 12)

【讨论】:

对于defaultdict.__init__(self, default)lambda before_emit=object(): before_emit 可能是比object 更好的默认值。因此,第一次调用 emit() 之前的所有值都是相同的,并且它们与订阅者的任何可能响应不同。 @JFS 我相信他们会让你改进***的例子。 我在发布之前阅读了 Wikipedia 示例。它似乎不适用于我的需求,也似乎没有描述 Python 社区的一些常见或最佳实践。此外,我认为 defaultdict 和 self[subscriber] 的使用是难以理解的,并且由于我选择与 Python 2.4.x 兼容(通常随 Linux 发行版一起提供——我的主要目标受众)而被排除在外。 @Jim Denis:请用此评论更新您的问题。这是改变答案的信息。【参考方案6】:

根据 Jason 的回答,我将类似 C# 的事件示例实现为一个成熟的 python 模块,包括文档和测试。我喜欢花哨的pythonic东西:)

所以,如果您想要一些现成的解决方案,您可以使用code on github。

【讨论】:

【参考方案7】:

示例:twisted log observers

注册一个观察者yourCallable()(一个接受字典的可调用对象)来接收所有日志事件(除了任何其他观察者):

twisted.python.log.addObserver(yourCallable)

示例:complete producer/consumer example

来自 Twisted-Python 邮件列表:

#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""

import random

from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

class Producer:
    """Send back the requested number of random integers to the client."""
    implements(interfaces.IPushProducer)
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        if self._produced == self._goal:
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()
    def stopProducing(self):
        pass

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()

【讨论】:

我喜欢他们如何记录对观察者的限制(以及他们如何从订阅者列表中删除任何引发可调用的异常。我不确定“addObservable()”是我的首选名称;但如果我听到呼吁,我会遵从社区的意见。 @Jim Dennis:ObserverObservable 是不同的野兽。前者观察后者。【参考方案8】:

OP 询问“是否有任何用 Python 实现的 GoF Observer 示例?” 这是 Python 3.7 中的一个示例。这个 Observable 类满足在一个 observable许多观察者 之间创建关系同时保持独立它们的结构的要求。

from functools import partial
from dataclasses import dataclass, field
import sys
from typing import List, Callable


@dataclass
class Observable:
    observers: List[Callable] = field(default_factory=list)

    def register(self, observer: Callable):
        self.observers.append(observer)

    def deregister(self, observer: Callable):
        self.observers.remove(observer)

    def notify(self, *args, **kwargs):
        for observer in self.observers:
            observer(*args, **kwargs)


def usage_demo():
    observable = Observable()

    # Register two anonymous observers using lambda.
    observable.register(
        lambda *args, **kwargs: print(f'Observer 1 called with args=args, kwargs=kwargs'))
    observable.register(
        lambda *args, **kwargs: print(f'Observer 2 called with args=args, kwargs=kwargs'))

    # Create an observer function, register it, then deregister it.
    def callable_3():
        print('Observer 3 NOT called.')

    observable.register(callable_3)
    observable.deregister(callable_3)

    # Create a general purpose observer function and register four observers.
    def callable_x(*args, **kwargs):
        print(f'args[0] observer called with args=args, kwargs=kwargs')

    for gui_field in ['Form field 4', 'Form field 5', 'Form field 6', 'Form field 7']:
        observable.register(partial(callable_x, gui_field))

    observable.notify('test')


if __name__ == '__main__':
    sys.exit(usage_demo())

【讨论】:

【参考方案9】:

观察者设计的功能性方法:

def add_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    # If this is the first listener, then set up the method wrapper
    if not listeners:

        listeners = [listener]
        setattr(obj, listener_attr, listeners)

        # Get the object's method
        method = getattr(obj, method_name)

        @wraps(method)
        def method_wrapper(*args, **kwags):
            method(*args, **kwags)
            for l in listeners:
                l(obj, *args, **kwags) # Listener also has object argument

        # Replace the original method with the wrapper
        setattr(obj, method_name, method_wrapper)

    else:
        # Event is already set up, so just add another listener
        listeners.append(listener)


def remove_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    if listeners:
        # Remove the listener
        next((listeners.pop(i)
              for i, l in enumerate(listeners)
              if l == listener),
             None)

        # If this was the last listener, then remove the method wrapper
        if not listeners:
            method = getattr(obj, method_name)
            delattr(obj, listener_attr)
            setattr(obj, method_name, method.__wrapped__)

然后可以使用这些方法将侦听器添加到任何类方法。例如:

class MyClass(object):

    def __init__(self, prop):
        self.prop = prop

    def some_method(self, num, string):
        print('method:', num, string)

def listener_method(obj, num, string):
    print('listener:', num, string, obj.prop)

my = MyClass('my_prop')

add_listener(my, 'some_method', listener_method)
my.some_method(42, 'with listener')

remove_listener(my, 'some_method', listener_method)
my.some_method(42, 'without listener')

输出是:

method: 42 with listener
listener: 42 with listener my_prop
method: 42 without listener

【讨论】:

以上是关于Python 观察者模式:示例、提示? [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

RxJava系列2(基本概念及使用介绍)

Python观察者设计模式[关闭]

观察者模式

观察者模式

观察者模式

观察者模式