跨平台事件处理 - std::condition_variable wait_for 似乎忽略了超时

Posted

技术标签:

【中文标题】跨平台事件处理 - std::condition_variable wait_for 似乎忽略了超时【英文标题】:Cross platform Event handling - std::condition_variable wait_for seems ignores timeout 【发布时间】:2015-03-26 12:59:21 【问题描述】:

我正在移植一些使用本机 MS API 的代码,并且我已经实现了一些尝试使用 CreateEventSetEventWaitForSingleObjectWaitForMultipleObjects 等模拟事件处理的东西...

我使用了std::condition_variable,因为在我的情况下,它只需要在同一个应用程序中工作即可进行线程同步。

在我的测试应用程序中,一切似乎都在正常工作,但是当我在生产代码中尝试相同的代码时,调用 wait_for 会破坏代码,尽管没有发生异常。当我逐步完成它时,它似乎还在继续,然后它永远不会超时。我在调试配置中打开了所有异常,没有任何信号。我不知道是什么原因造成的......

我在 VS2013 的 Win7 机器上测试这个

代码分为HandlerEvent两个类,其中Handler保存了Event的固定数组。 Event::WaitForSingleObject 是我实际在 condition_variable 上等待的地方,它可以直接调用,也可以从 Handler::WaitForMultipleObjects 调用,超时时间最短,以便验证列表中的所有事件。

我有多个线程访问 Handler 对象,但不能同时访问数组元素(这是设计使然),因此对数组的访问不受保护。

这是实现尝试处理事件处理的代码的头文件 (concurrency2.h):

#ifndef AAAAAA
#define AAAAAA

#include<thread>
#include<future>
#include<mutex>
#include<array>

static const unsigned k_INFINITE = 0xFFFFFFFF;

namespace cross

	class Event;

	using EventId = int;	//prefer alias declaration to typedefs
	typedef Event* Event_handle;
	
	class Event
	
		std::string m_name;
		
		//notification agent for event notification (this is what other threads will use to notify the event)
		std::mutex m_cvmutex;
		std::condition_variable m_cv;
		bool m_signaled;
		bool m_waiting;

		//notification agents for event termination (used internally to force an event to be signaled so we can kill the event)
		std::mutex m_tcvmutex;
		std::condition_variable m_tcv;
		bool m_tcv_signaled;
		bool m_terminating;

		Event(void) 

		int SignalTermination()
		
			m_tcv.notify_all();
			m_tcv_signaled = true;
			return -2;
		
		void WaitForTermination()
		
			if (!m_tcv_signaled)
			
				std::unique_lock<std::mutex> lk(m_tcvmutex);
				m_tcv.wait(lk);	//wait for ever
				lk.unlock();
			
		

	public:
		Event(std::string _name) : 
			m_name(_name), 
			m_signaled(false),
			m_waiting(false),
			m_terminating(false),
			m_tcv_signaled(false)
		
		
		void SetEvent(bool kill=false)
		
			m_cv.notify_all();
			m_signaled = true;
			m_terminating = kill;

			std::cout << name() << " notify... " << std::endl;

			if (m_terminating)
			
				WaitForTermination();
			
		

		void ResetEvent()
		
			m_signaled = false;
		
		
		bool IsWaiting()
		
			return m_waiting;
		

		const char* name()
		
			return m_name.c_str();
		

		/*
			returns:
				 0 if event was triggered
				-1 if timeout has occured
				-2 if I'm trying to get rid of this

		*/
		int WaitForSingleObject(unsigned timeout = k_INFINITE)
		
			if (m_terminating)
			
				return SignalTermination();
			

			int ret = -1;

			if (m_signaled)
			
				ret = 0;
				//m_set_before = false;
			
			else
			
				m_waiting = true;

				std::unique_lock<std::mutex> lk(m_cvmutex, std::try_to_lock);

				//std::cout << "wait... " ;
				if (timeout == k_INFINITE)
				
					m_cv.wait(lk);
					ret = 0;
				
				else
				
					auto wait = m_cv.wait_for(lk, std::chrono::milliseconds(timeout));

					if (wait == std::cv_status::timeout)
					
						ret = -1;
					
					else if (wait == std::cv_status::no_timeout)
					
						ret = 0;
					
				

				lk.unlock();

				m_waiting = false;
			

			if (m_terminating)
			
				ret = SignalTermination();
			

			return ret;
		

		
	;

	class Handler
	
	public:
		Handler() : 
			count(0),
			wait_for_multiple_objects_timeout(100)
		
			m_events.fill(nullptr);
		
		~Handler() 
		
			//for (const auto& ev : m_events)
			for (auto ev : m_events)
			
				delete ev; 
				//ev = NULL; this doesn't work inside a range based loop
			
		

		EventId CreateEvent(char* name)
		
			m_events[count] = new Event(name);
			
			return count++;	//the event id will be the index in the events vector. TODO This needs to be smarter!
		
		void SetEvent(EventId eid)
		
			if (eid < count && m_events[eid] != nullptr)
			
				m_events[eid]->SetEvent();
			
		
		void ResetEvent(EventId eid)
		
			if (eid < count && m_events[eid] != nullptr)
			
				m_events[eid]->ResetEvent();
			
		
		void ResetEvents()
		
			for (auto ev : m_events)
			
				if (ev != nullptr)
					ev->ResetEvent();
			
		
		bool CloseHandle(EventId eid)
		
			if (eid < count && m_events[eid] != nullptr)
			
				if (m_events[eid]->IsWaiting())
				
					//if it's waiting, set the event and signal it to be killed.
					//because we are trying to kill the even, SetEvent will block until the event is dead
					m_events[eid]->SetEvent(true);
				

				delete m_events[eid];
				m_events[eid] = nullptr;
				return true;
			
			return false;
		
		void CloseAllHandles()
		
			for (int i = 0; i < 100; ++i)
			
				if (m_events[i] != nullptr)
				
					CloseHandle(i);
				
			
		
		int WaitForSingleObject(EventId eid, unsigned timeout = k_INFINITE)
		
			if (eid < count && m_events[eid] != nullptr)
			
				return m_events[eid]->WaitForSingleObject(timeout);
			

			return -1;
		
		EventId WaitForMultipleObjects(unsigned count, EventId events[], bool wait_for_all=false, unsigned timeout = k_INFINITE)
		
			/*
				timeout is the value that we must wait until at least one (or all if wait_for_all is true), of the events is triggered.
				Each event will wait for 100 miliseconds in order to allow all events to be verified.
				timeout is used for each of the events in the list and is not a global value. 
			*/

			std::vector<unsigned> timeouts;
			std::vector<bool> signaled;
			for (unsigned i = 0; i < count; ++i)
			
				timeouts.push_back(0);
				signaled.push_back(false);
			

			bool wait_for_ever = timeout == k_INFINITE;

			do
			
				for (unsigned i = 0; i < count; ++i)
				
					EventId result = m_events[events[i]]->WaitForSingleObject(wait_for_multiple_objects_timeout);

					if (result == -2)
					
						return -2;
					
					else if (result == -1)
					
						//timeout waiting for events[i]
						if (!wait_for_ever)
						
							timeouts[i] += wait_for_multiple_objects_timeout;
							if (timeouts[i] >= timeout)
							
								//as soon as one of the events timeout, they are all timedout. This can be different if we need it to...
								return -1;
							
						
					
					else
					
						signaled[i] = true;

						if (!wait_for_all)
							return events[i];					

						bool all_signaled = true;
						for (auto sig : signaled)
						
							if (sig == false)
							
								all_signaled = false;
								break;
							
						

						if (all_signaled)
						
							return events[i];	//return last signaled event when all events are signaled
						
					
				
			 while (1);

			return -1;
		
		const char* GetName(EventId eid)
		
			if (eid < count && m_events[eid] != nullptr)
			
				return m_events[eid]->name();
			

			return "";
		

	private:
		int count;
		std::array<Event_handle, 100> m_events;
		unsigned wait_for_multiple_objects_timeout;
	;


 //end namespace cross

#endif

这里是 main.cpp

// BackToTheFuture.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"


#include<thread>
#include<iostream>
#include<mutex>
#include <string>
#include <sstream>

#include "concurrency2.h"

using namespace std;

cross::Handler handler;
cross::EventId events[6];

void mythread()

	
	int roll_index = 0;

	do
	
		try
		
			cross::EventId signaled = handler.WaitForMultipleObjects(3, events, true);// false, 5000);
			if (signaled >= 0)
			
				cout << handler.GetName(signaled) << " signaled@1" << endl;
				break;
			
			else if (signaled == -1)
			
				cout << handler.GetName(roll_index) << " time out@1" << endl;
				break;
			
			else if (signaled == -2)
				break;
		
		catch (...)	//need to handle this properly, for now will do
		
			cout << "exception..." << endl;
		

		if (++roll_index > 2)
			roll_index = 0;

	 while (1);

	cout << "EXITED thread 1\n";


void mythread2()


	int roll_index = 0;

	do
	
		try
		
			//WaitForMultipleObjects test - pass an array of EventIds
			cross::EventId signaled = handler.WaitForMultipleObjects(3, events+3, true);// , false, 5000);
			if (signaled >= 0)
			
				cout << handler.GetName(signaled) << " signaled@2" << endl;
				break;
			
			else if (signaled == -1)
			
				cout << handler.GetName(roll_index) << " time out@2" << endl;
				break;
			
			else if (signaled == -2)
				break;

		
		catch (...)	//need to handle this properly, for now will do
		
			cout << "exception..." << endl;
		

		if (++roll_index > 2)
			roll_index = 0;

	 while (1);

	cout << "EXITED thread 2\n";



int _tmain(int argc, _TCHAR* argv[])

	events[0] = handler.CreateEvent("event 0");
	events[1] = handler.CreateEvent("event 1");
	events[2] = handler.CreateEvent("event 2");
	events[3] = handler.CreateEvent("event a");
	events[4] = handler.CreateEvent("event b");
	events[5] = handler.CreateEvent("event c");


	std::thread t1(mythread);
	std::thread t2(mythread2);

	string input = "";
	int myNumber = 0;
	bool _exit = false;
	do
	
		getline(cin, input);

		// This code converts from string to number safely.
		stringstream myStream(input);
		if (myStream >> myNumber)
		
			if (myNumber > 5)
			
				handler.CloseAllHandles();
				_exit = true;

				handler.SetEvent(0);
			
			else
			 
				handler.SetEvent(myNumber);
			
		

	 while (!_exit);


	t1.join();
	t2.join();

	cout << "Terminated... press any key to continue..." << endl;
	getline(cin, input);

	return 0;

任何帮助将不胜感激!

干杯,安德烈

【问题讨论】:

【参考方案1】:

我不确定这是否是原因,但我认为以下行:

std::unique_lock<std::mutex> lk(m_cvmutex, std::try_to_lock);

不应使用 std::try_to_lock 参数,因为它不会阻止获取锁——尽管我看不出在你的示例中这可能会失败......例如,如果有多个线程正在调用 @ 987654323@ 在同一个 Event 实例上,lk 可能无法获取互斥锁,并且以下 std::wait_for 可能表现出“未定义的行为”。

【讨论】:

以上是关于跨平台事件处理 - std::condition_variable wait_for 似乎忽略了超时的主要内容,如果未能解决你的问题,请参考以下文章

在等待std :: condition_variable时如何处理系统时钟更改?

使用C ++ 11的chrono和_USE_32BIT_TIME_T

C++并发编程之三 并发操作的同步

在等待通知 std::condition_variable 期间执行“等待回调”

std::condition_variable 条件变量类型

std::condition_variable wait() 和 notify_one() 同步