如何在多线程 C++ 中拆除观察者关系?

Posted

技术标签:

【中文标题】如何在多线程 C++ 中拆除观察者关系?【英文标题】:How do I tear down observer relationship in multithreaded C++? 【发布时间】:2009-02-10 20:26:43 【问题描述】:

我有一个为客户提供Subscribe(Observer*)Unsubscribe(Observer*) 的主题。主题在自己的线程中运行(它在订阅的观察者上调用Notify()),互斥体保护其内部观察者列表。

我希望客户端代码(我无法控制)能够在取消订阅后安全地删除观察者。如何实现?

持有互斥锁 - 甚至是递归的 mutex - 当我通知观察者时 不是一个选择,因为 死锁风险。 我可以将观察者标记为移除 在取消订阅电话中并将其删除 从主题线程。然后 客户可以等待特别的 “安全删除”通知。这 看起来很安全,但对于 客户。

编辑

下面是一些说明性代码。问题是如何防止在 Run 处于“Problem here”评论时发生取消订阅。然后我可以回调一个已删除的对象。或者,如果我始终持有互斥锁而不是制作副本,我可以使某些客户端死锁。

#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer

public:
    void Notify() 
;

class Subject

public:
    Subject() : t(bind(&Subject::Run, this))
    
    

    void Subscribe(Observer* o)
    
        mutex::scoped_lock l(m);
        observers.insert(o);
    

    void Unsubscribe(Observer* o)
    
        mutex::scoped_lock l(m);
        observers.erase(o);
    

    void Run()
    
        for (;;)
        
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            
                mutex::scoped_lock l(m);
                notifyList = observers;
            
            // Problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&Observer::Notify));
        
    

private:
    set<Observer*> observers;
    thread t;
    mutex m;
;

编辑

由于存在死锁风险,我无法在持有互斥锁时通知观察者。发生这种情况的最明显方式 - 客户端从 Notify 内部调用订阅或取消订阅 - 很容易通过使互斥体递归来解决。更隐蔽的是不同线程上出现间歇性死锁的风险。

我处于多线程环境中,因此在线程执行的任何时候,它通常都会持有一系列锁 L1、L2、... Ln。另一个线程将持有锁 K1, K2, ... Km。正确编写的客户端将确保不同的线程总是以相同的顺序获取锁。但是当客户端与我的主题的互斥体交互时——称之为 X——这个策略将被打破:订阅/取消订阅的调用按 L1、L2、... Ln、X 的顺序获取锁。从我的主题线程调用通知获取锁X, K1, K2, ... Km 的顺序。如果 Li 或 Kj 中的任何一个可以在任何调用路径上重合,则客户端将遭受间歇性死锁,几乎无法对其进行调试。由于我不控制客户端代码,所以我不能这样做。

【问题讨论】:

我错过了一些东西。特定观察者何时会取消订阅其他观察者?特定的观察者应该只取消订阅自己。通知列表中只能有相同的观察者一次,那么已经触发的通知怎么会变得陈旧? 客户端可以随时取消订阅观察者,而不仅仅是在通知中。正是在这种一般情况下,我们才能与主题线程竞争。 @fizzer:这里不清楚你是如何实现内存安全的。你如何保证observers中的所有Observer*都指向Observer的有效实例? @Matthieu。这只是客户的责任。假设在调用 Subscribe() 的线程上分配了一个实例,为了简单起见,同一个客户端线程想要在调用 Unsubscribe() 后销毁该实例。何时执行删除是安全的(如果有的话)是问题的重点之一。 【参考方案1】:

Unsubscribe() 应该是同步的,这样它就不会返回,直到 Observer 被保证不再在 Subject 的列表中。这是安全的唯一方法。

ETA(将我的评论移至答案):

由于时间似乎不是问题,因此在通知每个观察者之间获取并释放互斥锁。您将无法像现在这样使用 for_each,您必须检查迭代器以确保它仍然有效。

for ( ... )

    take mutex
    check iterator validity
    notify
    release mutex

这会做你想做的。

【讨论】:

由于时间似乎不是问题,在通知每个观察者之间获取并释放互斥锁。 由于死锁风险,我想避免在调用客户端代码时持有互斥锁。 您调用的客户端代码应该在您的线程上下文中执行,所以这应该不是问题。如果它尝试采用相同的互斥锁,因为它是同一个线程,它会成功。 在不知道其他线程在做什么的情况下,您无法知道迭代器是否有效。一旦你失去了对容器的独占访问权,你就必须悲观并假设所有迭代器和对其中元素的引用都是无效的。 我认为如果指向观察者的指针仍在观察者列表中,则可以相信它是可用的。因此,一旦您使用了互斥锁,唯一需要检查的是您当前的迭代器是否仍然很好,例如观察者*仍在列表中。【参考方案2】:

你能把 Subscribe() 的签名改成 Unsubscribe() 吗?将 Observer* 替换为 shared_ptr 之类的东西会让事情变得更容易。

编辑:将上面的“easy”替换为“easy”。 有关如何难以“正确”的示例,请参阅 Boost.Signals 和 adopted-but-not-yet-in-the-distribution Boost.Signals2(以前称为 Boost.ThreadSafeSignals)库的历史.

【讨论】:

我想到了这个——我什至可以在 Subject 中保存 weak_ptr,客户甚至不需要取消订阅。不幸的是,我无法更改此界面,但我以后一定会这样做。 我使用 shared_ptr 和 weak_ptr 构建了很多类似的东西,它们运行良好。太糟糕了,您无法更改 API。 虽然这解决了“谁删除谁”资源问题,但如果不解决同步问题,您仍然会遇到通知发生在已取消订阅的观察者上的情况 - 也许这不是问题. @fizzer:请注意,通过使用enable_shared_from_this 作为Observer 的基类,您可以轻松地将其改装到您的设计中(它将与客户端代码源兼容)。 【参考方案3】:

“理想”的解决方案包括使用shared_ptrweak_ptr。但是,为了通用,它还必须考虑到 Subject 在其某些 Observer 之前被删除的问题(是的,这也可能发生)。

class Subject 
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);

private:
    std::mutex mutex;
    std::set< std::weak_ptr<Observer> > observers;
;

class Observer: boost::noncopyable 
public:
    ~Observer();

    void Notify();

private:
    std::mutex;
    std::weak_ptr<Subject> subject;
;

通过这种结构,我们创建了一个循环图,但明智地使用了weak_ptr,以便ObserverSubject 可以在不协调的情况下被破坏。

注意:为简单起见,我假设 Observer 一次只观察一个 Subject,但它可以轻松观察多个主题。


现在,您似乎陷入了不安全的内存管理。这是一个相当困难的情况,你可以想象。在这种情况下,我建议做一个实验:异步Unsubscribe。或者至少,对Unsubscribe 的调用将从外部同步,但异步实现。

想法很简单:我们将使用事件队列来实现同步。那就是:

Unsubscribe 的调用在队列中发布一个事件(有效负载Observer*),然后等待 当Subject线程处理完Unsubscribe事件后,它会唤醒等待中的线程

您可以使用忙等待或条件变量,除非性能另有规定,否则我建议使用条件变量。

注意:此解决方案完全无法解释 Subject 过早死亡。

【讨论】:

我不想假设一个事件队列,或者对客户端强加特定的所有权语义,但我喜欢 Unsubscribe() 等待通知线程确认取消订阅的想法。【参考方案4】:

与其让客户端收到“SafeToDelete”通知,不如为他们提供 IsSubscribed(Observer *) 方法。客户端代码则变为:

subject.Unsubscribe( obsever );l
while( subject.IsSubscribed( observer ) ) 
   sleep_some_short_time;   // OS specific sleep stuff

delete observer;

不会太繁琐。

【讨论】:

如果 IsSubscribed() 只是在观察者集合中查找,它仍然是坏的。当主题线程在我发布的示例代码中的“这里的问题”注释处睡着时,整个事情就可以完成。【参考方案5】:

您可以在 CSubject 类型中创建一个“待删除队列”。当你移除 Observer 时,你可以调用 pSubject->QueueForDelete(pObserver)。然后当主题线程在通知之间时,它可以安全地从队列中删除观察者。

【讨论】:

【参考方案6】:

嗯...我不太明白您的问题,因为如果客户致电取消订阅,您应该能够让客户删除它(您不使用它)。但是,如果由于某种原因您无法在客户端取消订阅观察者后关闭关系,您可以添加“主题”一个新操作以安全地删除观察者,或者只是让客户端表示他们不再对观察者感兴趣.

重新思考编辑:好的,现在我想我明白你的问题是什么了。我认为您的问题的最佳解决方案是执行以下操作:

    让每个存储的观察者元素都有一个“有效”标志。当您处于通知循环中时,此标志将用于通知它与否。 您需要一个互斥锁来保护对该“有效”标志的访问。然后,取消订阅操作为“有效”标志锁定互斥锁,为选定的观察者将其设置为 false。 通知循环还必须锁定和解锁有效标志的互斥锁,并且只对“有效”的观察者起作用。

鉴于取消订阅操作将阻塞互斥体以重置有效标志(并且该特定观察者将不再在您的线程中使用),代码是线程安全的,客户端可以尽快删除任何观察者因为退订已经返回。

【讨论】:

【参考方案7】:

这样的事情会令人满意吗?不过,在收到通知时取消订阅观察者仍然不安全,因为您需要一个像您提到的那样的界面(据我所知)。

Subscribe(Observer *x)

    mutex.lock();
    // add x to the list
    mutex.unlock();


Unsubscribe(Observer *x)

    mutex.lock();
    while (!ok_to_delete)
        cond.wait(mutex);
    // remove x from list
    mutex.unlock();


NotifyLoop()

    while (true) 
        // wait for something to trigger a notify

        mutex.lock();
        ok_to_delete = false;
        // build a list of observers to notify
        mutex.unlock();

        // notify all observers from the list saved earlier

        mutex.lock();
        ok_to_delete = true;
        cond.notify_all();
        mutex.unlock();
    

如果您希望能够在 Notify() 中取消订阅() - (对客户端 IMO 的一个糟糕的设计决定...) 您可以将通知线程的线程 ID 添加到您的数据结构。在 Unsubscribe 函数中,您可以根据当前线程的 id 检查该线程 id(大多数线程库都提供了这个 - 例如 pthread_self)。如果它们相同,则无需等待条件变量即可继续。

注意:如果客户端负责删除观察者,这意味着您会遇到在通知回调内部的情况,您将取消订阅并删除观察者,但仍在使用该指针执行某些操作。这是客户必须注意的事情,并且只能在 Notify() 结束时将其删除。

【讨论】:

但是如果 Observer Unsubscribe() 自身从其 Notify 回调中退出,那不会出现死锁吗? +1 因为它解决了问题的症结所在。不过,我希望客户能够订阅回调,所以我需要更多的思考。可怕的是没有一个众所周知的习语来解决这个问题。我每天都看到这种模式,总是被打破。 fizzer - 从回调订阅很好。订阅时您不需要等待条件变量,只需等待互斥锁(在调用 Notify() 之前显式释放。只有取消订阅才是问题所在。 对不起,我在晚上的最后一件事上发表了评论。我想取消订阅从回调中工作。取消订阅以响应事件对于客户来说是一件非常合理的事情。 对延迟回复表示歉意。我认为它几乎就在那里,除了条件需要阻止客户端返回(并可能调用删除)。所以 ok_to_delete 在 Notify 循环中需要为 false,客户端需要等待 after 'remove x from list'【参考方案8】:

如果不是很优雅,我认为这可以解决问题:

class Subject 
public:
Subject() : t(bind(&Subject::Run, this)),m_key(0)        
void Subscribe(Observer* o) 
    mutex::scoped_lock l(m);
    InternalObserver io( o );
    boost::shared_ptr<InternalObserver> sp(&io);
    observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));


void Unsubscribe(Observer* o) 
    mutex::scoped_lock l(m);
    observers.find( MakeKey(o) )->second->exists = false;    

void WaitForSomethingInterestingToHappen() 
void Run()

    for (;;)
    
        WaitForSomethingInterestingToHappen();
        for( unsigned int i = 0; i < observers.size(); ++ i )
        
            mutex::scoped_lock l(m);
            if( observers[i]->exists )
            
                mem_fun(&Observer::Notify);//needs changing
            
            else
            
                observers.erase(i);
                --i;
            
        
    

private:

int MakeKey(Observer* o) 
    return ++m_key;//needs changeing, sha of the object?

class InternalObserver 
public:
    InternalObserver(Observer* o) : m_o( o ), exists( true ) 
    Observer* m_o;
    bool exists;
;

map< int, boost::shared_ptr<InternalObserver> > observers;
thread t;
mutex m;
int m_key;
;

【讨论】:

【参考方案9】:

observers 更改为map,键为Observer*,并赋值Observer 的包装器。包装器包含一个 volatile 布尔值以指示 Observer 是否有效。在subscribe 方法中,包装对象以有效 状态创建。在unsubscribe 方法中,包装器被标记为invalidNotifywrapper 而不是 actual Observer 上调用。如果有效(仍然订阅),包装器将在 actual Observer 上调用 Notify

#include <map>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer

public:
    void Notify() 
;

class ObserverWrapper : public Observer

public:
    Observer* wrappee;
    volatile bool valid;
    ObserverWrapper(Observer* o) 
    
        wrappee = o;
        valid = true;
    

    void Notify() 
    
        if (valid) wrappee->Notify();
    

class Subject

public:
    Subject() : t(bind(&Subject::Run, this))
    
    

    void Subscribe(Observer* o)
    
        mutex::scoped_lock l(m);
        boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
        observers.insert(pair<Observer*, sptr));
    

    void Unsubscribe(Observer* o)
    
        mutex::scoped_lock l(m);
        observers.find(o)->second->valid = false;
        observers.erase(o);
    

    void Run()
    
        for (;;)
        
            WaitForSomethingInterestingToHappen();
            vector<ObserverWrapper*> notifyList;
            
                mutex::scoped_lock l(m);
                boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
            
            // Should be no problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&ObserverWrapper::Notify));
        
    

private:
    map<Observer*, ObserverWrapper*> observers;
    thread t;
    mutex m;
;

【讨论】:

如果主题线程在“这里应该没问题”行被挂起,没有什么可以阻止客户端线程取消订阅。 @fizzer 是的,可以取消订阅。我相信这应该不是问题,因为取消订阅操作会将 ObserverWrapper 中的有效标志设置为 false。所以不会在 Observer 上调用 notify。 我很抱歉。该漏洞介于测试有效标志和在 wrappee 上调用 Notify 之间。 确实!我确实错过了。

以上是关于如何在多线程 C++ 中拆除观察者关系?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 volatile 在多线程 C 或 C++ 编程中没有用?

在多线程 C++ 中捕获进程的输出

在多线程 C++ 应用程序中测量时间

在多线程 C++ 程序中使用 std::vector 时应用程序崩溃

Reactor中的Thread和Scheduler

c++的queue在多线程下崩溃原因分析