boost::asio::socket 线程安全
Posted
技术标签:
【中文标题】boost::asio::socket 线程安全【英文标题】:boost::asio::socket thread safety 【发布时间】:2011-11-13 20:12:47 【问题描述】:(这是我原来问题的简化版)
我有几个线程写入 boost asio 套接字。这似乎工作得很好,没有问题。
文档说共享套接字不是线程安全的(here,在底部)所以我想知道是否应该用互斥锁或其他东西保护套接字。
question 坚持认为保护是必要的,但没有给出如何保护的建议。
我最初问题的所有答案也坚持认为我所做的事情很危险,并且大多数人都敦促我用 async_writes 或更复杂的东西替换我的写入。但是,我不愿意这样做,因为这会使已经运行的代码复杂化,并且没有一个回答者让我相信他们知道他们在说什么 - 他们似乎已经阅读了与我相同的文档并且正在猜测,就像我一样是。
所以,我编写了一个简单的程序来对从两个线程写入共享套接字进行压力测试。
这里是服务器,它只是简单地写出它从客户端接收到的任何内容
int main()
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));
tcp::socket socket(io_service);
acceptor.accept(socket);
for (;;)
char mybuffer[1256];
int len = socket.read_some(boost::asio::buffer(mybuffer,1256));
mybuffer[len] = '\0';
std::cout << mybuffer;
std::cout.flush();
return 0;
这里是客户端,它创建两个线程以尽可能快地写入共享套接字
boost::asio::ip::tcp::socket * psocket;
void speaker1()
string msg("speaker1: hello, server, how are you running?\n");
for( int k = 0; k < 1000; k++ )
boost::asio::write(
*psocket,boost::asio::buffer(msg,msg.length()));
void speaker2()
string msg("speaker2: hello, server, how are you running?\n");
for( int k = 0; k < 1000; k++ )
boost::asio::write(
*psocket,boost::asio::buffer(msg,msg.length()));
int main(int argc, char* argv[])
boost::asio::io_service io_service;
// connect to server
tcp::resolver resolver(io_service);
tcp::resolver::query query("localhost", "3001");
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
psocket = new tcp::socket(io_service);
boost::system::error_code error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
psocket->close();
psocket->connect(*endpoint_iterator++, error);
boost::thread t1( speaker1 );
boost::thread t2( speaker2 );
Sleep(50000);
这行得通!完美,据我所知。客户端不会崩溃。消息到达服务器时不会出现乱码。它们通常交替到达,每个线程一个。有时一个线程在另一个线程之前收到两三条消息,但我认为这不是问题,只要没有乱码并且所有消息都到达。
我的结论:从理论上讲,套接字可能不是线程安全的,但是很难让它失败,所以我不会担心它。
【问题讨论】:
将 io_service::post() 与调用 write() 的处理程序一起使用几乎没有价值。你太复杂了,使用 async_write()。 从 ASIO 的角度来看,您的实施没有意义。这种代码风格不需要 asio。 为什么你认为你需要多个线程? 【参考方案1】:在重新研究 async_write 的代码后,我现在确信任何写操作都是线程安全的当且仅当数据包大小小于
default_max_transfer_size = 65536;
一旦调用 async_write,就会在同一个线程中调用 async_write_some。池中调用某种形式的 io_service::run 的任何线程将继续为该写入操作调用 async_write_some 直到它完成。
如果必须多次调用这些 async_write_some 调用(数据包大于 65536),这些调用可以并且将会交织。
ASIO 不会像您期望的那样对套接字的写入进行排队,一个接一个地完成。为了确保 thread 和 interleave 安全写入,请考虑以下代码:
void my_connection::async_serialized_write(
boost::shared_ptr<transmission> outpacket)
m_tx_mutex.lock();
bool in_progress = !m_pending_transmissions.empty();
m_pending_transmissions.push(outpacket);
if (!in_progress)
if (m_pending_transmissions.front()->scatter_buffers.size() > 0)
boost::asio::async_write(m_socket,
m_pending_transmissions.front()->scatter_buffers,
boost::asio::transfer_all(),
boost::bind(&my_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
else // Send single buffer
boost::asio::async_write(m_socket,
boost::asio::buffer(
m_pending_transmissions.front()->buffer_references.front(), m_pending_transmissions.front()->num_bytes_left),
boost::asio::transfer_all(),
boost::bind(
&my_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
m_tx_mutex.unlock();
void my_connection::handle_async_serialized_write(
const boost::system::error_code& e, size_t bytes_transferred)
if (!e)
boost::shared_ptr<transmission> transmission;
m_tx_mutex.lock();
transmission = m_pending_transmissions.front();
m_pending_transmissions.pop();
if (!m_pending_transmissions.empty())
if (m_pending_transmissions.front()->scatter_buffers.size() > 0)
boost::asio::async_write(m_socket,
m_pending_transmissions.front()->scatter_buffers,
boost::asio::transfer_exactly(
m_pending_transmissions.front()->num_bytes_left),
boost::bind(
&chreosis_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
else // Send single buffer
boost::asio::async_write(m_socket,
boost::asio::buffer(
m_pending_transmissions.front()->buffer_references.front(),
m_pending_transmissions.front()->num_bytes_left),
boost::asio::transfer_all(),
boost::bind(
&my_connection::handle_async_serialized_write,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
m_tx_mutex.unlock();
transmission->handler(e, bytes_transferred, transmission);
else
MYLOG_ERROR(
m_connection_oid.toString() << " " << "handle_async_serialized_write: " << e.message());
stop(connection_stop_reasons::stop_async_handler_error);
这基本上构成了一次发送一个数据包的队列。 async_write 仅在第一次写入成功后调用,然后调用原始处理程序进行第一次写入。
如果 asio 使每个套接字/流的写入队列自动进行,那会更容易。
【讨论】:
有趣!既然我从来没想过要发送这么大的数据包,看来我会没事的。 当套接字的内部缓冲区已满(呃)时也可能发生这种情况 我一直怀疑与此相关的部分。 对交错的观察很重要。同样重要的是人们要认识到一个链(非并发调用)不能做任何事情来防止异步方法(async_read 和 async_write)对套接字的并发访问。 asio 设计要求在读取和写入的处理程序中调用读取和写入。这导致 (1) 异步方法的非并发调用,(2) 它们启动的异步代码的非并发执行,以及 (3) 避免交错。不幸的是,这并不能满足真正并发的需求。 如果我不直接从工作线程调用 async_write ( some_huge_data_packet ),而是将写入操作“发布”到我的单个 io_service::run 线程,我能否保证我的巨大数据包将不交错?【参考方案2】:对非线程安全的异步处理程序使用boost::asio::io_service::strand
。
链被定义为事件的严格顺序调用 处理程序(即没有并发调用)。股线的使用允许 在多线程程序中执行代码而不需要 显式锁定(例如使用互斥锁)。
timer tutorial 可能是最简单的方法来缠绕你的头。
【讨论】:
股线看起来像一个非常有趣和有用的小动物。但是,乍一看,它在我的特定情况下似乎没有帮助。对套接字的写入不受同一个 io_service 控制,而是作为完全独立进程的结果发生。 (在其他情况下,对有用性 +1 投票。( @ravenspoint 单个套接字由单个 I/O 服务提供服务,它需要 reference in the constructor。如果您使用多个io_service
对象,那将是一个完全不同的问题。
我不明白你之前的评论。我已经编辑了我的问题,希望能澄清我需要做什么。
@ravenspoint 您正在将异步方法与同步方法混合使用。不要那样做。链仅在异步上下文中有用。
如果 strand 调用的方法不是同步的,则不成立。只有调用将是同步的,这不会阻止异步方法启动的代码的并发执行。【参考方案3】:
听起来这个问题归结为:
当
async_write_some()
从两个不同的线程在一个套接字上同时调用时会发生什么
我相信这正是不是线程安全的操作。这些缓冲区在线路上发出的顺序是未定义的,它们甚至可能是交错的。特别是如果您使用便利函数async_write()
,因为它是作为对下面的async_write_some()
的一系列调用实现的,直到整个缓冲区被发送。在这种情况下,从两个线程发送的每个片段可能会随机交错。
保护你免受这种情况的唯一方法是构建你的程序来避免这种情况。
实现这一点的一种方法是编写一个应用层发送缓冲区,由单个线程负责将其推送到套接字上。这样你就可以只保护发送缓冲区本身。请记住,尽管简单的std::vector
不起作用,因为在末尾添加字节可能最终会重新分配它,可能有一个出色的async_write_some()
引用它。相反,使用缓冲区的链表可能是个好主意,并利用 asio 的 scatter/gather 功能。
【讨论】:
我没有使用 aysnc_write_some 或 async_write。我正在使用写(boost.org/doc/libs/1_47_0/doc/html/boost_asio/reference/…)【参考方案4】:理解 ASIO 的关键是要认识到完成处理程序只在调用了io_service.run()
的线程的上下文中运行,而不管哪个线程调用了异步方法。如果您只在一个线程中调用了io_service.run()
,那么所有完成处理程序都将在该线程的上下文中连续执行。如果您在多个线程中调用了io_service.run()
,则完成处理程序将在其中一个线程的上下文中执行。你可以把它想象成一个线程池,池中的线程是那些在同一个io_service
对象上调用了io_service.run()
的线程。
如果您有多个线程调用io_service.run()
,那么您可以通过将完成处理程序放入strand
来强制对其进行序列化。
要回答您问题的最后一部分,您应该致电boost::async_write()
。这会将写入操作分派到调用io_service.run()
的线程上,并在写入完成时调用完成处理程序。如果您需要序列化此操作,那么它会稍微复杂一些,您应该阅读 strands here 上的文档。
【讨论】:
我的一大困难是我不知道什么是“完成处理程序”,什么不是。 boost::asio::write() 是完成处理程序吗?它是否会“神奇地”将写入操作排队到(不同的)线程中,该线程调用传递给套接字构造函数的 io_service 的 run() 方法?如果两者的答案都是肯定的,那么我的原始代码是正确的,我可以放松了! @ravenspoint: boost::asio::write() 是一个同步调用,所以只要你在调用发生的线程上下文中调用它,它就会立即执行。 boost::asio::async_write() 将函数对象作为其参数之一。当您调用 boost::asio::async_write() 时,写入操作“神奇地”排队到调用 run() 的线程上。 @ravenspoint:如果你在 linux 上运行,它的工作原理是这样的:当你调用 io_service::run() 时,底层代码调用 epoll(),它会阻塞当前线程,直到一个线程有活动它的文件描述符。当您调用 async_write() 时,它会将套接字文件描述符添加到 epoll 集中,并将完成处理程序与该文件描述符相关联。当 epoll 返回时,底层代码写入数据并调用处理程序。这一切都发生在调用 run() 的线程中。 我准备接受 async_write() 如您所描述的那样工作。问题是现在我必须阻止写入线程在第一个完成之前执行另一个 async_write() 调用。叹。也许发布函子更容易?至少我已经编写并测试了代码。 @ravenspoint:区别不在于一个阻止而另一个不阻止。不同的是,一个是同步的,另一个是异步的。为了执行异步操作,操作必须排队。您可以执行同步操作而无需排队,这就是 ASIO 所做的。【参考方案5】:首先考虑套接字是一个流,并且在内部没有防止并发读取和/或写入。有三个不同的考虑。
-
访问同一套接字的函数的并发执行。
同时执行包含相同套接字的委托。
写入同一套接字的委托交错执行。
chat example 是异步的,但不是并发的。 io_service 从单个线程运行,使所有聊天客户端操作非并发。换句话说,它避免了所有这些问题。甚至 async_write 也必须在内部完成发送消息的所有部分,然后才能进行任何其他工作,避免交错问题。
处理程序仅由当前正在为 io_service 调用 run()、run_one()、poll() 或 poll_one() 的任何重载的线程调用。
通过将工作发布到单线程 io_service,其他线程可以通过在 io_service 中排队工作来安全地避免并发和阻塞。但是,如果您的方案阻止您缓冲给定套接字的所有工作,那么事情会变得更加复杂。您可能需要阻止套接字通信(但不是线程),而不是无限期地排队工作。此外,工作队列可能非常难以管理,因为它完全不透明。
如果您的 io_service 运行多个线程,您仍然可以轻松避免上述问题,但您只能从其他读取或写入的处理程序(以及在启动时)调用读取或写入。这对所有对套接字的访问进行排序,同时保持非阻塞。安全性源于该模式在任何给定时间仅使用一个线程这一事实。但是从独立线程发布工作是有问题的——即使你不介意缓冲它。
strand 是一个 asio 类,它以确保非并发调用的方式将工作发布到 io_service。然而,使用 strand 调用 async_read 和/或 async_write 只能解决三个问题中的第一个。这些函数在内部将工作发布到套接字的 io_service。如果该服务正在运行多个线程,则可以同时执行工作。
那么,对于给定的套接字,您如何安全地同时调用 async_read 和/或 async_write?
对于并发调用者,第一个问题可以通过互斥锁或链来解决,如果您不想缓冲工作,则使用前者,如果您愿意,则使用后者。这在函数调用期间保护了套接字,但对其他问题没有任何作用。
第二个问题似乎最难,因为很难看出从两个函数异步执行的代码内部发生了什么。异步函数都将工作发布到套接字的 io_service。
来自升压插座来源:
/**
* This constructor creates a stream socket without opening it. The socket
* needs to be opened and then connected or accepted before data can be sent
* or received on it.
*
* @param io_service The io_service object that the stream socket will use to
* dispatch handlers for any asynchronous operations performed on the socket.
*/
explicit basic_stream_socket(boost::asio::io_service& io_service)
: basic_socket<Protocol, StreamSocketService>(io_service)
并且来自 io_service::run()
/**
* The run() function blocks until all work has finished and there are no
* more handlers to be dispatched, or until the io_service has been stopped.
*
* Multiple threads may call the run() function to set up a pool of threads
* from which the io_service may execute handlers. All threads that are
* waiting in the pool are equivalent and the io_service may choose any one
* of them to invoke a handler.
*
* ...
*/
BOOST_ASIO_DECL std::size_t run();
因此,如果您为套接字提供多个线程,则它别无选择,只能使用多个线程 - 尽管不是线程安全的。避免这个问题的唯一方法(除了替换套接字实现)是只给套接字一个线程来使用。对于单个套接字,这就是你想要的(所以不要费心去写一个替换)。
-
第三个问题可以通过使用在 async_write 之前锁定、传递到完成处理程序并在该点解锁的(不同的)互斥体来解决。这将阻止任何调用方开始写入,直到前面写入的所有部分都完成。
请注意,async_write 帖子在队列中工作 - 这就是它几乎可以立即返回的方式。如果你投入太多工作,你可能不得不处理一些后果。尽管对套接字使用单个 io_service 线程,但您可能有任意数量的线程通过对 async_write 的并发或非并发调用发布工作。
另一方面,async_read 很简单。没有交错问题,您只需从前一个调用的处理程序中循环回来。您可能希望也可能不希望将生成的工作分派到另一个线程或队列,但如果您在完成处理程序线程上执行它,您只是阻塞了单线程套接字上的所有读取和写入。
更新
我对套接字流的底层实现进行了更多挖掘(针对一个平台)。似乎是套接字始终在调用线程上执行平台套接字调用,而不是发布到 io_service 的委托。换句话说,尽管 async_read 和 async_write 看起来立即返回,但它们实际上在返回之前执行了所有套接字操作。只有处理程序发布到 io_service。我审查过的示例代码既没有记录也没有公开,但假设它是有保证的行为,它会显着影响上面的第二个问题。
假设发布到 io_service 的工作不包含套接字操作,则无需将 io_service 限制为单个线程。然而,它确实加强了防止异步函数并发执行的重要性。因此,例如,如果一个人遵循聊天示例,而是将另一个线程添加到 io_service,就会出现问题。通过在函数处理程序中执行异步函数调用,您可以执行并发函数。这将需要一个互斥锁或所有异步函数调用都被重新发布以在链上执行。
更新 2
关于第三个问题(交错),如果数据大小超过 65536 字节,则工作在 async_write 内部被分解并分批发送。但重要的是要理解,如果 io_service 中有多个线程,则第一个以外的工作块将发布到不同的线程。在调用完成处理程序之前,这一切都发生在 async_write 函数内部。该实现创建自己的中间完成处理程序,并使用它们来执行除第一个套接字操作之外的所有操作。
这意味着如果有多个 io_service 线程并且要发布超过 64kb 的数据,则围绕 async_write 调用(互斥或链)的任何保护都将不保护套接字(默认情况下,这可能会有所不同)。因此,在这种情况下,interleave guard 不仅是为了交错安全,也是为了套接字的线程安全。我在调试器中验证了所有这些。
互斥选项
async_read 和 async_write 函数在内部使用 io_service 来获取在其上发布完成处理程序的线程,在线程可用之前一直阻塞。这使得使用互斥锁保护它们很危险。当使用互斥锁来保护这些函数时,当线程备份锁时会发生死锁,从而使 io_service 饿死。鉴于在使用多线程 io_service 发送 > 64k 时没有其他方法可以保护 async_write,它在这种情况下有效地将我们锁定在一个线程中 - 这当然解决了并发问题。
【讨论】:
【参考方案6】:根据 2008 年 11 月的 boost 1.37 asio 更新,包括写入在内的某些同步操作“现在是线程安全的”,允许“在单个套接字上进行并发同步操作,如果操作系统支持的话”boost 1.37.0 history。这似乎支持您所看到的,但过度简化的“共享对象:不安全”子句仍保留在 ip::tcp::socket 的 boost 文档中。
【讨论】:
感谢您的检查。遗憾的是,没有人来给出明确的答案——只有文档的其他读者。顺便说一句,你的 XML 编辑器,狐狸,石头!【参考方案7】:对旧帖子的另一条评论...
我认为asio::async_write()
重载的asio文档中的关键语句是the following:
此操作通过对流的 async_write_some 函数的零次或多次调用来实现,称为组合操作。在此操作完成之前,程序必须确保流不执行其他写入操作(例如 async_write、流的 async_write_some 函数或任何其他执行写入的组合操作)。
据我了解,这记录了上述许多答案中的假设:
如果多个线程执行io_context.run()
,来自对asio::async_write
的调用的数据可能会交错。
也许这对某人有帮助;-)
【讨论】:
【参考方案8】:这取决于您是否从多个线程访问同一个套接字对象。假设您有两个线程运行相同的io_service::run()
函数。
例如,如果您同时进行读写操作,或者可能正在执行取消操作 从其他线程。那么就不安全了。
但是,如果您的协议一次只执行一项操作。
-
如果只有一个线程运行 io_service run 则没有问题。如果您想从其他线程在套接字上执行某些操作,您可以调用 io_service::post()
在套接字上执行此操作的处理程序,因此它将在同一线程中执行。
如果您有多个线程在执行
io_service::run
并且您尝试同时执行操作 - 比如说取消和读取操作,那么您应该使用链。 Boost.Asio 文档中有一个教程。
【讨论】:
从多个线程访问套接字。并非所有线程都使用 io_service。【参考方案9】:我一直在进行大量测试,但无法破解 asio。即使没有锁定任何互斥体。
不过,我还是建议您在每个调用周围使用 async_read
和 async_write
和互斥锁。
我认为唯一的缺点是,如果您有多个线程调用 io_service::run
,则可能会同时调用您的完成处理程序。
就我而言,这不是问题。这是我的测试代码:
#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/asio.hpp>
#include <vector>
using namespace std;
char databuffer[256];
vector<boost::asio::const_buffer> scatter_buffer;
boost::mutex my_test_mutex;
void my_test_func(boost::asio::ip::tcp::socket* socket, boost::asio::io_service *io)
while(1)
boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));
//my_test_mutex.lock(); // It would be safer
socket->async_send(scatter_buffer, boost::bind(&mycallback));
//my_test_mutex.unlock(); // It would be safer
int main(int argc, char **argv)
for(int i = 0; i < 256; ++i)
databuffer[i] = i;
for(int i = 0; i < 4*90; ++i)
scatter_buffer.push_back(boost::asio::buffer(databuffer));
boost::asio::io_service my_test_ioservice;
boost::asio::ip::tcp::socket my_test_socket(my_test_ioservice);
boost::asio::ip::tcp::resolver my_test_tcp_resolver(my_test_ioservice);
boost::asio::ip::tcp::resolver::query my_test_tcp_query("192.168.1.10", "40000");
boost::asio::ip::tcp::resolver::iterator my_test_tcp_iterator = my_test_tcp_resolver.resolve(my_test_tcp_query);
boost::asio::connect(my_test_socket, my_test_tcp_iterator);
for (size_t i = 0; i < 8; ++i)
boost::shared_ptr<boost::thread> thread(
new boost::thread(my_test_func, &my_test_socket, &my_test_ioservice));
while(1)
my_test_ioservice.run_one();
boost::this_thread::sleep(boost::posix_time::microsec(rand()%1000));
return 0;
这是我在 python 中的临时服务器:
import socket
def main():
mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
mysocket.bind((socket.gethostname(), 40000))
mysocket.listen(1)
while 1:
(clientsocket, address) = mysocket.accept()
print("Connection from: " + str(address))
i = 0
count = 0
while i == ord(clientsocket.recv(1)):
i += 1
i %= 256
count+=1
if count % 1000 == 0:
print(count/1000)
print("Error!")
return 0
if __name__ == '__main__':
main()
请注意,运行此代码可能会导致您的计算机崩溃。
【讨论】:
这段代码并没有真正正确地测试它。它应该发送远大于 65536 的数据包,以说明 asio 对于交错 async_write 调用本身并不是线程安全的。 互斥锁不保护从临界区异步调用的代码。 但我同意在 async_read 和 async_write 的并发执行周围使用互斥锁是个好主意,因为套接字不是线程安全的,否则它会被并发访问。以上是关于boost::asio::socket 线程安全的主要内容,如果未能解决你的问题,请参考以下文章