C++ Boost UDP接收器在放入线程时失败
Posted
技术标签:
【中文标题】C++ Boost UDP接收器在放入线程时失败【英文标题】:C++ Boost UDP receiver fails when put into thread 【发布时间】:2018-03-15 01:30:09 【问题描述】:我有一个可以工作的 UDP 接收器。代码在这里:
#include <array>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
std::string getMyIp()
std::string result;
try
boost::asio::io_service netService;
boost::asio::ip::udp::resolver resolver(netService);
boost::asio::ip::udp::udp::resolver::query query(boost::asio::ip::udp::v4(), "google.com", "");
boost::asio::ip::udp::udp::resolver::iterator endpoints = resolver.resolve(query);
boost::asio::ip::udp::udp::endpoint ep = *endpoints;
boost::asio::ip::udp::udp::socket socket(netService);
socket.connect(ep);
boost::asio::ip::address addr = socket.local_endpoint().address();
result = addr.to_string();
//std::cout << "My IP according to google is: " << results << std::endl;
catch (std::exception& e)
std::cerr << "Could not deal with socket. Exception: " << e.what() << std::endl;
return result;
class receiver
private:
boost::asio::ip::udp::socket socket_;
boost::asio::ip::udp::endpoint sender_endpoint_;
std::array<char, 1024> data_;
public:
receiver(boost::asio::io_service& io_service,
const boost::asio::ip::address& listen_address,
const boost::asio::ip::address& multicast_address,
unsigned short multicast_port = 13000)
: socket_(io_service)
// Create the socket so that multiple may be bound to the same address.
boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
socket_.open(listen_endpoint.protocol());
socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
socket_.bind(listen_endpoint);
// Join the multicast group.
socket_.set_option(boost::asio::ip::multicast::join_group(multicast_address));
do_receive();
private:
void do_receive()
socket_.async_receive_from(boost::asio::buffer(data_), sender_endpoint_, [this](boost::system::error_code ec, std::size_t length)
if (!ec)
std::cout.write(data_.data(), length);
std::cout << std::endl;
do_receive();
);
;
int main(int argc, char* argv[])
try
boost::asio::io_service io_service;
receiver r(io_service, boost::asio::ip::make_address(getMyIp()), boost::asio::ip::make_address("224.0.0.0"), 13000);
io_service.run();
catch (std::exception& e)
std::cerr << "Exception: " << e.what() << "\n";
return 0;
我想将接收器代码放入一个类中的线程中,这样我就可以在它旁边做其他事情:
#define _CRT_SECURE_NO_WARNINGS
#include <ctime>
#include <iostream>
#include <string>
#include <queue>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/thread.hpp>
#include <boost/chrono.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using boost::asio::ip::udp;
using std::cout;
using std::cin;
using std::endl;
using std::string;
using namespace std;
std::string getMyIp()
std::string result;
try
boost::asio::io_service netService;
boost::asio::ip::udp::resolver resolver(netService);
boost::asio::ip::udp::udp::resolver::query query(boost::asio::ip::udp::v4(), "google.com", "");
boost::asio::ip::udp::udp::resolver::iterator endpoints = resolver.resolve(query);
boost::asio::ip::udp::udp::endpoint ep = *endpoints;
boost::asio::ip::udp::udp::socket socket(netService);
socket.connect(ep);
boost::asio::ip::address addr = socket.local_endpoint().address();
result = addr.to_string();
//std::cout << "My IP according to google is: " << results << std::endl;
catch (std::exception& e)
std::cerr << "Could not deal with socket. Exception: " << e.what() << std::endl;
return result;
class UdpReceiver
private:
boost::asio::ip::udp::socket socket_;
boost::asio::ip::udp::endpoint sender_endpoint_;
std::array<char, 1024> data_;
string address_send, address_recv;
unsigned short port_send, port_recv;
boost::thread_group threads; // thread group
boost::thread* thread_main; // main thread
boost::thread* thread_receive; // receive thread
boost::thread* thread_send; // get/send thread
boost::mutex stopMutex;
bool initialize = false;
bool stop, showBroadcast;
int i_send, i_recv, i_operator,
interval_send, interval_recv, interval_operator,
mode;
string message_send, message_recv;
string message_STOP = "STOP";
public:
// constructor
UdpReceiver(boost::asio::io_service& io_service, std::string address, unsigned short port, int interval, int mode, bool show = false)
: socket_(io_service),
showBroadcast(show)
initialize = false;
Initialize(io_service, show);
UdpReceiver(boost::asio::io_service& io_service, bool show = false)
: socket_(io_service),
showBroadcast(show)
Initialize(io_service, show);
// destructor
~UdpReceiver()
// show exit message
cout << "Exiting UDP Core." << endl;
// initialize
void Initialize(boost::asio::io_service& io_service, bool show = false)
if (initialize == false)
GetMode(true);
GetInfo(true);
CreateEndpoint(io_service);
CreateThreads();
stop = false;
showBroadcast = show;
i_send = 0;
i_recv = 0;
i_operator = 0;
message_send.clear();
message_recv.clear();
initialize = true; // clear flag
void GetMode(bool default_value = false)
std::string input;
if (default_value)
mode = 0;
else
string prompt = "Set mode:\n0/other - Listen\n1 - Send\nEnter your choice: ";
cout << prompt;
getline(cin, input);
try
mode = stoi(input);
// set default mode to Listen
if (mode > 1)
mode = 0;
catch (exception ec)
cout << "Error converting mode: " << ec.what() << endl;
Stop();
void GetInfo(bool default_value = false)
// always called after GetMode()
string address;
unsigned short port;
int interval;
if (default_value)
address = getMyIp();
port = 13000;
interval = 500;
switch (mode)
case 0:
address_recv = address;
port_recv = port;
interval_recv = interval;
break;
case 1:
address_send = address;
port_send = port;
interval_send = interval;
break;
default:
// already set to 0 in GetMode()
break;
void CreateEndpoint(boost::asio::io_service& io_service)
// Create the socket so that multiple may be bound to the same address.
boost::asio::ip::udp::endpoint listen_endpoint(boost::asio::ip::address::from_string(address_recv), port_recv);
socket_.open(listen_endpoint.protocol());
socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
socket_.bind(listen_endpoint);
// Join the multicast group.
socket_.set_option(boost::asio::ip::multicast::join_group(boost::asio::ip::address::from_string("224.0.0.0")));
void CreateThreads()
thread_main = new boost::thread(boost::ref(*this));
interval_operator = 500; // default value
switch (mode)
case 0:
thread_receive = new boost::thread(&UdpReceiver::Callable_Receive, this);
threads.add_thread(thread_receive);
break;
default:
// already set to 0 in GetMode()
break;
// start the threads
void Start()
// Wait till they are finished
threads.join_all();
// stop the threads
void Stop()
// warning message
cout << "Stopping all threads." << endl;
// signal the threads to stop (thread-safe)
stopMutex.lock();
stop = true;
stopMutex.unlock();
// wait for the threads to finish
thread_main->interrupt(); // in case not interrupted by operator()
threads.interrupt_all();
threads.join_all();
// close socket after everything closes
//socketPtr->close();
socket_.close();
void Callable_Receive()
while (!stop)
stopMutex.lock();
socket_.async_receive_from(boost::asio::buffer(data_), sender_endpoint_, [this](boost::system::error_code ec, std::size_t length)
if (!ec)
//cout << message_recv << endl;
std::cout.write(data_.data(), length);
std::cout << std::endl;
Callable_Receive();
);
stopMutex.unlock();
//cout << i_recv << endl;
++i_recv;
// Thread function
void operator () ()
while (!stop)
if (message_send == message_STOP)
try
this->Stop();
catch (exception e)
cout << e.what() << endl;
boost::this_thread::sleep(boost::posix_time::millisec(interval_operator));
boost::this_thread::interruption_point();
;
int main()
try
boost::asio::io_service io_service;
UdpReceiver mt(io_service, false);
mt.Start();
catch (std::exception& e)
std::cerr << "Exception: " << e.what() << "\n";
异步接收在 Callable_Receive() 内部,在 thread_receive 内部。当计数器打印在屏幕上时,我可以看到该线程正在运行(我将其注释掉)。但是, async_receive_from() 永远不会收到任何东西。谁能告诉我为什么会这样?
【问题讨论】:
【参考方案1】:Callable_Receive 中可能出现死锁。在以Callable_Receive
作为线程主体的线程中,您在调用async_receive_from
函数之前调用stopMutex.lock
。 async_receive_from
立即返回,但我们不知道何时将调用作为第三个参数传递给 async_receive_from
的 lambda 对象。当执行 lambda 对象的主体时,您正在调用 Callable_Receive
函数,如果 stopMutex
被锁定(带有 Callable_Receive
的线程仍在运行并且正在完成 while 循环中的下一次迭代)并且您尝试再次锁定它,您会死锁 - 在boost::mutex
上,当互斥锁已被同一个线程锁定时,您无法调用 lock 方法。
如果你想解决这个问题,你应该阅读boost::recursive_mutex
。
【讨论】:
用boost::resursive_mutex
替换boost::mutex
。以上是关于C++ Boost UDP接收器在放入线程时失败的主要内容,如果未能解决你的问题,请参考以下文章
使用 boost::asio::thread_pool 的 C++ 线程池,为啥我不能重用我的线程?
Boost UDP 多播接收器:set_option:请求的地址在其上下文中无效