使用线程的并行 TCP 连接
Posted
技术标签:
【中文标题】使用线程的并行 TCP 连接【英文标题】:Parallel TCP connection using threads 【发布时间】:2021-12-19 09:38:41 【问题描述】:我正在尝试构建一个使用线程打开并行 TCP 套接字的系统。 我的线程是使用消息队列 IPC 触发的,因此每次数据包到达消息队列时,一个线程“唤醒”,打开与远程服务器的 TCP 连接并发送数据包。 我的问题是,在 Wireshark 中,我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变。 我的问题是:
-
如何验证我的线程并行工作?
如何改进此代码?
3.如何用一个线程打开多个socket?
我正在使用虚拟机来运行多线程客户端。 我使用的 IDE 是 Clion ,语言是 C。 我的代码:
#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h> // for ethernet header
#include<netinet/ip.h> // for ip header
#include<netinet/udp.h> // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE+10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;
typedef struct frag
int packet_number;
int seq;
uint8_t data[4096];
bool lastfrag;
fragma;
void * middlemanThread(void *arg)
///========================================///
///**** Waiting for message queue trigger!:///
///=======================================///
long id = (long)arg;
id+=1;
mqd_t qd; //queue descriptor
//open the queue for reading//
qd= mq_open(QUEUE_NAME,O_RDONLY);
assert(qd != -1);
struct mq_attr attr;
assert(mq_getattr(qd,&attr) != -1);
uint8_t *income_buf = calloc(attr.mq_msgsize,1);
uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
assert(income_buf);
fragma frag;
struct timespec timeout;
clock_gettime(CLOCK_REALTIME,&timeout);
timeout.tv_sec+=50;
//bool closesoc =false;
printf("Waiting for messages ..... \n\n");
while(1)
///========================================///
///**** Open message queue fo receive:///
///=======================================///
if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0)
printf("Failed to receive message for 50 sec \n");
//closesoc =true;
pthread_exit(NULL);
else
cast_buf = income_buf;
printf("Received successfully , your msg :\n");
frag.packet_number = *cast_buf;
cast_buf = (cast_buf + sizeof(int));
frag.seq = *cast_buf;
cast_buf = (cast_buf + sizeof(int));
memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
cast_buf = cast_buf + Nbytes;
frag.lastfrag = *cast_buf;
uint8_t * data = frag.data;
pthread_mutex_lock(&lock);
///========================================///
///**** Connecting to Server and send Frament:///
///=======================================///
int size = sizeof(( fragma *)income_buf)->packet_number + sizeof(( fragma *)income_buf)->seq + sizeof(( fragma *)income_buf)->data + sizeof(( fragma *)income_buf)->lastfrag;
printf("In thread\n");
int clientSocket;
struct sockaddr_in serverAddr;
socklen_t addr_size;
// Create the socket.
clientSocket = socket(PF_INET, SOCK_STREAM, 0);
//Configure settings of the server address
// Address family is Internet
serverAddr.sin_family = AF_INET;
//Set port number, using htons function
serverAddr.sin_port = htons(8081);
//Set IP address to localhost
serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
memset(serverAddr.sin_zero, '\0', sizeof serverAddr.sin_zero);
//Connect the socket to the server using the address
addr_size = sizeof serverAddr;
connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
if(send(clientSocket , income_buf , size,0) < 0)
printf("Send failed\n");
printf("Trhead Id : %ld \n" , id);
printf("Packet number : %d \n Seq = %d \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
pthread_mutex_unlock(&lock);
//if(closesoc)
close(clientSocket);
usleep(20000);
int main()
int i = 0;
pthread_t tid[5];
while(i< 5)
if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
printf("Failed to create thread\n");
i++;
sleep(2);
i = 0;
while(i< 5)
pthread_join(tid[i++],NULL);
printf("Thread ID : %d:\n",i);
return 0;
【问题讨论】:
我不知道从哪里开始。在“线程化”时,代码既没有利用线程化(即,与多进程实现相比,它不会增加吞吐量/减少延迟),甚至没有共享 any 线程之间的状态(我看不到它使用互斥锁来做什么——“关键路径”代码都没有使用全局状态,所以为什么要锁定?)。其次,每个请求创建/使用/销毁套接字通常是一个坏主意(为了速度)。相反,可以管理一个连接池,并让现有连接至少停留一段时间。 更重要的是:TCP是一个stream协议。没有数据包。 所以你建议尝试多处理? thoughput 没有增加是什么意思?如果时间更小,文件相同,不是自动变大吗? 'usleep(20000);'......为什么? 【参考方案1】:因此,每次数据包到达消息队列时,都会有一个线程“唤醒”,打开与远程服务器的 TCP 连接并发送数据包
如果您完全关心速度或效率,请不要这样做。使用 TCP 套接字可以做的最昂贵的事情就是初始连接。您进行 3 次握手只是为了发送一条消息!
然后,您在执行整个操作时持有一个全局互斥锁 - 这也是您程序中最慢的操作。
当前的设计是有效地单线程的,但采用的是最复杂和最昂贵的方式。
我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变
我不知道你实际测量的是什么,你也不清楚。什么是文件?一个片段?多个片段?与您的 MTU 相比,它有多大?您是否检查过片段实际上是按正确的顺序接收的,因为在我看来,唯一可能的并行性就是可能中断的地方。
如何才能降低单个文件的延迟和不变的吞吐量?
如何验证我的线程并行工作?
如果您在wireshark 中看到具有不同源端口的多个TCP 连接,并且它们的数据包是交错的,那么您就有了有效的并行性。这不太可能,因为您使用全局互斥锁明确禁止它!
在wireshark中检查吞吐量的最佳方法是什么?
不要。使用wireshark检查数据包,使用服务器确定吞吐量。这才是结果真正重要的地方。
3.并行TCP的概念是否可以提高吞吐量?
如果您不知道它的用途,为什么要实现所有这些复杂性?
单个线程(正确编码且没有虚假互斥抖动)很有可能会使您的网络饱和,所以:不会。拥有多个 I/O 线程通常是为了方便地划分逻辑和状态(即,每个线程有一个客户端,或者不同线程中有不同的不相关 I/O 子系统),而不是性能。
如果你想从消息队列中提取数据包并将它们发送到 TCP,那么高效的方法是:
-
只使用一个线程执行此操作(您的程序可能有其他线程执行其他操作 - 如果可能,请避免与它们同步)
打开一个到服务器的持久 TCP 连接,不要为每个片段都连接/关闭它
就是这样。它比您拥有的要简单得多,而且性能会更好。
您实际上可以让一个线程处理多个不同的连接,但我看不出这对您的情况有什么用处,所以请保持简单。
【讨论】:
感谢您的评论。我实现了它,因为我阅读了一些文章,它可以增加吞吐量,并使用更多的连接带宽。我尝试模拟通过卫星环境(具有较大的带宽和较大的延迟)加速 tcp 连接的系统,这就是我尝试使用并行机制的原因 如果你有一个非常高延迟的网络,你仍然可以使用非阻塞 I/O 代替线程(并且重复的 TCP 连接变得更加惩罚) 它是大带宽和大延迟(又名大带宽延迟产品),那么多个连接实际上可以加快速度。但是,特别是如果它是linux,你不需要在单独的线程中分离连接,你只需要有几个连接并在它们之间均匀分布数据(在一个线程中)。然后您还需要特殊的逻辑来重建接收器上的数据,因此您应该考虑是否真的需要这样做。首先,检查吞吐量是否太低,您实际上需要加快速度。 嗨,艾菲。我已经使用 linux tc 使用卫星设置检查了吞吐量,是的,吞吐量正在下降并且发生更多的重传。你能给我一些使用一个线程的几个 TCP 连接的例子吗? 或者你的意思是使用多处理?【参考方案2】:以下是部分答案:
3.并行TCP的概念是否假设增加吞吐量?
有点。这真的取决于瓶颈是什么。
第一个可能的瓶颈是拥塞控制。 TCP 发送方对一次可以发送多少数据包有一个限制(在收到第一个分组的 ACK 之前),称为拥塞窗口。这个数字应该从小开始并随着时间的推移而增长。此外,如果一个数据包丢失,这个数字会减半,然后缓慢增长,直到发生下一次丢包。然而,限制是每个 TCP 连接,因此如果您将数据分布在多个并行连接上,则总体拥塞窗口(所有流的所有窗口的总和)将增长得更快,并且下降的量更少。 (这是一个总结,细节你需要知道拥塞控制是如何工作的,这是一个很大的话题)。无论您是否使用线程,这都应该发生。您可以在一个线程中打开多个连接,并且达到相同的效果。
第二个可能的瓶颈是操作系统中的网络处理。 AFAIK 这是从 10Gb 连接开始的问题。也许是 1Gb,但可能不是。 TCP 处理发生在操作系统中,而不是在您的应用程序中。如果操作系统在处理器之间分散处理,您可能会获得更好的性能(应该有参数来启用它),并且可能会因为缓存而获得更好的性能。
如果你从磁盘读取文件,你的磁盘 IO 也很可能成为瓶颈。在这种情况下,我认为在不同线程之间传播发送数据实际上并没有帮助。
【讨论】:
感谢您的评论以上是关于使用线程的并行 TCP 连接的主要内容,如果未能解决你的问题,请参考以下文章