c++中异步线程的实现

Posted

技术标签:

【中文标题】c++中异步线程的实现【英文标题】:implementation of asynchronous thread in c++ 【发布时间】:2016-12-03 06:22:25 【问题描述】:

我无法在 c++11 中找到异步线程的正确用法。我想做的是我想创建线程,并且每个线程将同时运行,而无需像 thread.join() 那样相互等待,这使得其他线程要等到当前线程完成。所以,C++ 中是否有任何库可以使线程并行运行同时完成它们的工作而不必等待另一个完成。实际上我想要的是我想要的同时运行每个线程,这样它们就不会等待另一个完成,它的功能会同时执行,而不必等待其他线程完成。 谢谢, 库沙尔

编辑: 编辑:: 我在下面发布代码

#include <signal.h>
#include <thread>
#include <algorithm>
#include <cstring>
#include <csignal>
#include "paho_client.h"
using namespace std;
vector<string>    topic_container"rpi2/temp","sense                          /bannana","sense/util","mqtt/temp","sense/temp","sense/pine","sense/fortis/udap";
 vector<paho_client> publisher;
 vector<paho_client> subscriber;
 int finish_thread=1;
 void Onfinish(int signum)
 finish_thread=0;
 exit(EXIT_FAILURE);
 

 int main(int argc, char** argv) 
 signal(SIGINT, Onfinish);
 int topic_index;
 if(argc<3)
    cout<<"the format of starting commandline argument is"<<endl;              
    exit(1);
    

    while(finish_thread!=0)
    //paho_client::get_library_handle();
    if(strcmp(argv[1],"create_publisher"))
        for(topic_index=0;topic_index<atoi(argv[2]);topic_index++)
            thread pub_th;
            pub_th = thread([ = ]() 
                paho_client client("publisher", "192.168.0.102", "9876",
                                   topic_container[topic_index].c_str());
                client.paho_connect_withpub();
              publisher.push_back(client);
            );
         pub_th.join();
        
        vector<paho_client>::iterator it;
        int publisher_traverse=0;
        for(it=publisher.begin();it<publisher.end();publisher_traverse++)
           publisher[publisher_traverse].increment_count();
          publisher[publisher_traverse].get_count();
       

   
  
 return 0;

在使用 async with future 后,我得到了与上面相同的行为,请指出我哪里出错了

 #include <signal.h>
 #include <thread>
 #include <algorithm>
 #include <cstring>
#include <csignal>
#include <future>
#include "paho_client.h"
using namespace std;
vector<string> topic_container"rpi2/temp","sense/apple","sense/bannana","sense/util","mqtt/temp","sense/temp","sense/pine","sense/fortis/udap";
vector<paho_client> publisher;
vector<paho_client> subscriber;
int finish_thread=1;
void Onfinish(int signum)
finish_thread=0;
exit(EXIT_FAILURE);

int accumulate_block_worker_ret(int topic_index) 
//int topic_index=0;
paho_client client("publisher", "192.168.0.102", "9876",
                   topic_container[topic_index].c_str());
client.paho_connect_withpub();
publisher.push_back(client);
client.increment_count();
return client.get_count();
 


    int main(int argc, char** argv) 
    signal(SIGINT, Onfinish);

    if(argc<3)
    cout<<"the format of starting commandline argument is . /paho_client_emulate <create_publisher><count of publisher client to spawn>"  <<endl;
    exit(1);
   

     while(finish_thread!=0)
//   paho_client::get_library_handle();
     int topic_index;
      if(strcmp(argv[1],"create_publisher"))
     for(topic_index=0;topic_index<atoi(argv[2]);topic_index++)
    //  thread pub_th;
    // pub_th = thread([ = ]() 
       future<int> f =  async(std::launch::async,accumulate_block_worker_ret,topic_index);
    //      );
    //  pub_th.join();
        cout<<"the returned value from future is"<<f.get()<<endl;
       

    vector<paho_client>::iterator it;
    int publisher_traverse=0;
    for(it=publisher.begin();it<=publisher.end();publisher_traverse++)
        cout<<"came here"<<endl;
        publisher[publisher_traverse].increment_count();
        publisher[publisher_traverse].get_count();
     

     
     
     return 0;
    

【问题讨论】:

线程按定义并行运行。调用 join 不会改变这一点(调用者除外)。 join 用于等待线程完成。但它并不能以某种方式阻止其他线程并行运行。你要么没有清楚地表达你的问题,要么你对线程的工作方式有一些基本的误解。 对不起,我误解了我的问题线程确实并行运行,但我想要的是我希望线程不要等待另一个完成。它们应该同时运行它们的功能而不必等待。 那就不要打电话给join。就是这么简单。您无需做任何事情即可不等待。如果您确实想等待,您只需要做一些事情(致电join)。所以你的想法都是倒退的。 如果我不调用 join 。它会给出活动异常。我的意思是如果我在线程上调用 detach 将不能保证我会得到所有线程的输出。换句话说,我想要所有线程运行它的内部代码并输出它的数据 显示。您的。代码。正确的做法是不要在子线程中调用join,而是在主线程中调用join。如果您认为您已经这样做了,但仍然无法正常工作,那么除非您显示代码,否则我们将无法帮助您。 【参考方案1】:

我想首先启动所有发布者客户端(作为线程)并且 稍后从每个线程发布消息

pub_th.join() 被错误地放置在启动线程的循环内,因此在启动下一个线程之前等待每个线程的终止。要让线程并行运行,只需将.join() 移出该循环即可。当然要在循环体之后访问线程,它们必须存储在某个地方,例如。 G。在 vector - 为此,将第一个 for 循环更改为

        vector <thread> pub_threads;
        for (topic_index=0; topic_index<atoi(argv[2]); topic_index++)
        
            pub_threads.push_back(thread([ = ]()  /* whatever */ ));
        

完成后:

        for (auto &th: pub_threads) th.join();

实际上,我在每个实例中都在无限运行 paho_client 所以第一个线程没有完成…… 该线程连续运行

当然,如果从未完成,.join() 毫无意义。

【讨论】:

以上是关于c++中异步线程的实现的主要内容,如果未能解决你的问题,请参考以下文章

标准 C++ 中的异步线程

C++线程池ThreadPool实现解析

为啥 C++ 标准库中没有线程池?

C++ 的非线程异步 IO 简介?

C++ 中的异步线程安全日志记录(无互斥体)

c++线程异步同时运行