C++ Boost UDP接收器在放入线程时失败

Posted

技术标签:

【中文标题】C++ Boost UDP接收器在放入线程时失败【英文标题】:C++ Boost UDP receiver fails when put into thread 【发布时间】:2018-03-15 01:30:09 【问题描述】:

我有一个可以工作的 UDP 接收器。代码在这里:

#include <array>
#include <iostream>
#include <string>
#include <boost/asio.hpp>

std::string getMyIp()

    std::string result;
    try
    
        boost::asio::io_service netService;
        boost::asio::ip::udp::resolver   resolver(netService);
        boost::asio::ip::udp::udp::resolver::query query(boost::asio::ip::udp::v4(), "google.com", "");
        boost::asio::ip::udp::udp::resolver::iterator endpoints = resolver.resolve(query);
        boost::asio::ip::udp::udp::endpoint ep = *endpoints;
        boost::asio::ip::udp::udp::socket socket(netService);
        socket.connect(ep);
        boost::asio::ip::address addr = socket.local_endpoint().address();
        result = addr.to_string();
        //std::cout << "My IP according to google is: " << results << std::endl;

    
    catch (std::exception& e)
    
        std::cerr << "Could not deal with socket. Exception: " << e.what() << std::endl;
    
    return result;


class receiver

private:
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint sender_endpoint_;
    std::array<char, 1024> data_;

public:
    receiver(boost::asio::io_service& io_service,
        const boost::asio::ip::address& listen_address,
        const boost::asio::ip::address& multicast_address,
        unsigned short multicast_port = 13000)
        : socket_(io_service)
    
        // Create the socket so that multiple may be bound to the same address.
        boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
        socket_.open(listen_endpoint.protocol());
        socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
        socket_.bind(listen_endpoint);

        // Join the multicast group.
        socket_.set_option(boost::asio::ip::multicast::join_group(multicast_address));
        do_receive();
    

private:
    void do_receive()
    
        socket_.async_receive_from(boost::asio::buffer(data_), sender_endpoint_, [this](boost::system::error_code ec, std::size_t length)
        
            if (!ec)
            
                std::cout.write(data_.data(), length);
                std::cout << std::endl;
                do_receive();
            
        );
    
;

int main(int argc, char* argv[])

    try
    
        boost::asio::io_service io_service;
        receiver r(io_service, boost::asio::ip::make_address(getMyIp()), boost::asio::ip::make_address("224.0.0.0"), 13000);
        io_service.run();
    
    catch (std::exception& e)
    
        std::cerr << "Exception: " << e.what() << "\n";
    

    return 0;
    

我想将接收器代码放入一个类中的线程中,这样我就可以在它旁边做其他事情:

#define _CRT_SECURE_NO_WARNINGS
#include <ctime>
#include <iostream>
#include <string>
#include <queue>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/thread.hpp> 
#include <boost/chrono.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

using boost::asio::ip::udp;
using std::cout;
using std::cin;
using std::endl;
using std::string;
using namespace std;

std::string getMyIp()

    std::string result;
    try
    
        boost::asio::io_service netService;
        boost::asio::ip::udp::resolver   resolver(netService);
        boost::asio::ip::udp::udp::resolver::query query(boost::asio::ip::udp::v4(), "google.com", "");
        boost::asio::ip::udp::udp::resolver::iterator endpoints = resolver.resolve(query);
        boost::asio::ip::udp::udp::endpoint ep = *endpoints;
        boost::asio::ip::udp::udp::socket socket(netService);
        socket.connect(ep);
        boost::asio::ip::address addr = socket.local_endpoint().address();
        result = addr.to_string();
        //std::cout << "My IP according to google is: " << results << std::endl;
    
    catch (std::exception& e)
    
        std::cerr << "Could not deal with socket. Exception: " << e.what() << std::endl;
    
    return result;


class UdpReceiver

private:
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint sender_endpoint_;
    std::array<char, 1024> data_;

    string address_send, address_recv;
    unsigned short port_send, port_recv;
    boost::thread_group threads;            // thread group
    boost::thread* thread_main;             // main thread
    boost::thread* thread_receive;          // receive thread
    boost::thread* thread_send;             // get/send thread
    boost::mutex stopMutex;
    bool initialize = false;
    bool stop, showBroadcast;
    int i_send, i_recv, i_operator,
        interval_send, interval_recv, interval_operator,
        mode;
    string message_send, message_recv;
    string message_STOP = "STOP";

public:
    // constructor
    UdpReceiver(boost::asio::io_service& io_service, std::string address, unsigned short port, int interval, int mode, bool show = false)
        : socket_(io_service),
        showBroadcast(show)
    
        initialize = false;
        Initialize(io_service, show);
    

    UdpReceiver(boost::asio::io_service& io_service, bool show = false)
        : socket_(io_service),
        showBroadcast(show)
    
        Initialize(io_service, show);
    

    // destructor
    ~UdpReceiver()
    
        // show exit message
        cout << "Exiting UDP Core." << endl;
    

    // initialize
    void Initialize(boost::asio::io_service& io_service, bool show = false)
    
        if (initialize == false)
        
            GetMode(true);
            GetInfo(true);
        

        CreateEndpoint(io_service);
        CreateThreads();
        stop = false;
        showBroadcast = show;
        i_send = 0;
        i_recv = 0;
        i_operator = 0;
        message_send.clear();
        message_recv.clear();
        initialize = true;          // clear flag
    

    void GetMode(bool default_value = false)
    
        std::string input;
        if (default_value)
        
            mode = 0;           
        
        else
        
            string prompt = "Set mode:\n0/other - Listen\n1 - Send\nEnter your choice: ";
            cout << prompt;
            getline(cin, input);

            try
            
                mode = stoi(input);

                // set default mode to Listen
                if (mode > 1)
                    mode = 0;
            
            catch (exception ec)
            
                cout << "Error converting mode: " << ec.what() << endl;
                Stop();
            
        
    

    void GetInfo(bool default_value = false)
    
        // always called after GetMode()
        string address;
        unsigned short port;
        int interval;

        if (default_value)
        
            address = getMyIp();
            port = 13000;
            interval = 500;
        

        switch (mode)
        
        case 0:
            address_recv = address;
            port_recv = port;
            interval_recv = interval;
            break;

        case 1:
            address_send = address;
            port_send = port;
            interval_send = interval;
            break;

        default:
            // already set to 0 in GetMode()
            break;
        
    

    void CreateEndpoint(boost::asio::io_service& io_service)
    
        // Create the socket so that multiple may be bound to the same address.
        boost::asio::ip::udp::endpoint listen_endpoint(boost::asio::ip::address::from_string(address_recv), port_recv);
        socket_.open(listen_endpoint.protocol());
        socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
        socket_.bind(listen_endpoint);

        // Join the multicast group.
        socket_.set_option(boost::asio::ip::multicast::join_group(boost::asio::ip::address::from_string("224.0.0.0")));     
    

    void CreateThreads()
    
        thread_main = new boost::thread(boost::ref(*this));
        interval_operator = 500;    // default value

        switch (mode)
        
        case 0:
            thread_receive = new boost::thread(&UdpReceiver::Callable_Receive, this);
            threads.add_thread(thread_receive);
            break;

        default:
            // already set to 0 in GetMode()
            break;
        
    

    // start the threads
    void Start()
    
        // Wait till they are finished
        threads.join_all();
    

    // stop the threads
    void Stop()
    
        // warning message
        cout << "Stopping all threads." << endl;

        // signal the threads to stop (thread-safe)
        stopMutex.lock();
        stop = true;
        stopMutex.unlock();

        // wait for the threads to finish
        thread_main->interrupt();   // in case not interrupted by operator()
        threads.interrupt_all();
        threads.join_all();

        // close socket after everything closes
        //socketPtr->close();
        socket_.close();
    

    void Callable_Receive()
    
        while (!stop)
        
            stopMutex.lock();
            socket_.async_receive_from(boost::asio::buffer(data_), sender_endpoint_, [this](boost::system::error_code ec, std::size_t length)
            
                if (!ec)
                
                    //cout << message_recv << endl;
                    std::cout.write(data_.data(), length);
                    std::cout << std::endl;
                    Callable_Receive();
                
            );
            stopMutex.unlock();
            //cout << i_recv << endl;
            ++i_recv;
        
    

    // Thread function
    void operator () ()
    
        while (!stop)
        
            if (message_send == message_STOP)
            
                try
                
                    this->Stop();
                
                catch (exception e)
                
                    cout << e.what() << endl;
                
            

            boost::this_thread::sleep(boost::posix_time::millisec(interval_operator));
            boost::this_thread::interruption_point();
        
    
;

int main()

    try
    
        boost::asio::io_service io_service;
        UdpReceiver mt(io_service, false);
        mt.Start();
    
    catch (std::exception& e)
    
        std::cerr << "Exception: " << e.what() << "\n";
    

异步接收在 Callable_Receive() 内部,在 thread_receive 内部。当计数器打印在屏幕上时,我可以看到该线程正在运行(我将其注释掉)。但是, async_receive_from() 永远不会收到任何东西。谁能告诉我为什么会这样?

【问题讨论】:

【参考方案1】:

Callable_Receive 中可能出现死锁。在以 Callable_Receive 作为线程主体的线程中,您在调用 async_receive_from 函数之前先调用了 stopMutex.lock()。async_receive_from 会立即返回,但我们不知道作为第三个参数传递给它的 lambda 对象何时会被调用。lambda 对象的主体执行时会再次调用 Callable_Receive 函数;如果此时 stopMutex 已被锁定(以 Callable_Receive 为主体的线程仍在运行,并且正在执行 while 循环的下一次迭代),而您又尝试再次锁定它,就会发生死锁——对于 boost::mutex,当互斥锁已被同一个线程锁定时,不能再次调用 lock 方法。

如果你想解决这个问题,可以阅读 boost::recursive_mutex 的相关文档。

【讨论】:

用 boost::recursive_mutex 替换 boost::mutex。

以上是关于C++ Boost UDP接收器在放入线程时失败的主要内容,如果未能解决你的问题,请参考以下文章

使用 boost::asio::thread_pool 的 C++ 线程池,为啥我不能重用我的线程?

Boost UDP 多播接收器:set_option:请求的地址在其上下文中无效

C++ boost线程在实例化两次时导致分段错误

使用互斥锁锁定向量 - Boost

如何拆分接收到的 boost asio udp 套接字联合数据报

如何将 IDispatch::Invoke 放入 MFC C++ 线程?