使用线程的并行 TCP 连接

Posted

技术标签:

【中文标题】使用线程的并行 TCP 连接【英文标题】:Parallel TCP connection using threads 【发布时间】:2021-12-19 09:38:41 【问题描述】:

我正在尝试构建一个使用线程打开并行 TCP 套接字的系统。 我的线程是使用消息队列 IPC 触发的,因此每次数据包到达消息队列时,一个线程“唤醒”,打开与远程服务器的 TCP 连接并发送数据包。 我的问题是,在 Wireshark 中,我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变。 我的问题是:

    如何验证我的线程并行工作? 如何改进此代码? 3.如何用一个线程打开多个socket?

我正在使用虚拟机来运行多线程客户端。 我使用的 IDE 是 Clion ,语言是 C。 我的代码:

#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h>    // for ethernet header
#include<netinet/ip.h>      // for ip header
#include<netinet/udp.h>     // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE+10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;

typedef struct frag

    int packet_number;
    int seq;
    uint8_t data[4096];
    bool lastfrag;
 fragma;

void * middlemanThread(void *arg)

    ///========================================///
    ///**** Waiting for message queue trigger!:///
    ///=======================================///
    long id = (long)arg;
    id+=1;
    mqd_t qd; //queue descriptor
    //open the queue for reading//
    qd= mq_open(QUEUE_NAME,O_RDONLY);
    assert(qd != -1);
    struct mq_attr attr;
    assert(mq_getattr(qd,&attr) != -1);
    uint8_t *income_buf = calloc(attr.mq_msgsize,1);
    uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
    assert(income_buf);
    fragma frag;
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME,&timeout);
    timeout.tv_sec+=50;
    //bool closesoc =false;
    printf("Waiting for messages ..... \n\n");
    while(1)
        ///========================================///
        ///**** Open message queue fo receive:///
        ///=======================================///

        if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0)
            printf("Failed to receive message for 50 sec \n");
            //closesoc =true;
            pthread_exit(NULL);
        
        else
            cast_buf = income_buf;
            printf("Received successfully , your msg :\n");
            frag.packet_number = *cast_buf;
            cast_buf = (cast_buf + sizeof(int));
            frag.seq = *cast_buf;
            cast_buf = (cast_buf + sizeof(int));
            memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
            cast_buf = cast_buf + Nbytes;
            frag.lastfrag = *cast_buf;
            uint8_t * data = frag.data;
        
        pthread_mutex_lock(&lock);

        ///========================================///
        ///**** Connecting to Server and send Frament:///
        ///=======================================///

        int size = sizeof(( fragma *)income_buf)->packet_number + sizeof(( fragma *)income_buf)->seq + sizeof(( fragma *)income_buf)->data + sizeof(( fragma *)income_buf)->lastfrag;
        printf("In thread\n");
        int clientSocket;
        struct sockaddr_in serverAddr;
        socklen_t addr_size;

        // Create the socket.
        clientSocket = socket(PF_INET, SOCK_STREAM, 0);

        //Configure settings of the server address
        // Address family is Internet
        serverAddr.sin_family = AF_INET;

        //Set port number, using htons function
        serverAddr.sin_port = htons(8081);

        //Set IP address to localhost
        serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
        memset(serverAddr.sin_zero, '\0', sizeof serverAddr.sin_zero);

        //Connect the socket to the server using the address
        addr_size = sizeof serverAddr;
        connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
        if(send(clientSocket , income_buf , size,0) < 0)
        
            printf("Send failed\n");
        
        printf("Trhead Id : %ld \n" , id);
        printf("Packet number : %d \n Seq = %d  \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
        pthread_mutex_unlock(&lock);
        //if(closesoc)
        close(clientSocket);
        usleep(20000);
    

int main()
    int i = 0;
    pthread_t tid[5];

    while(i< 5)
    
        if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
            printf("Failed to create thread\n");
        i++;
    
    sleep(2);
    i = 0;
    while(i< 5)
    
        pthread_join(tid[i++],NULL);
        printf("Thread ID : %d:\n",i);
    
    return 0;

【问题讨论】:

我不知道从哪里开始。在“线程化”时,代码既没有利用线程化(即,与多进程实现相比,它不会增加吞吐量/减少延迟),甚至没有共享 any 线程之间的状态(我看不到它使用互斥锁来做什么——“关键路径”代码都没有使用全局状态,所以为什么要锁定?)。其次,每个请求创建/使用/销毁套接字通常是一个坏主意(为了速度)。相反,可以管理一个连接池,并让现有连接至少停留一段时间。 更重要的是:TCP是一个stream协议。没有数据包 所以你建议尝试多处理? thoughput 没有增加是什么意思?如果时间更小,文件相同,不是自动变大吗? 'usleep(20000);'......为什么? 【参考方案1】:

因此,每次数据包到达消息队列时,都会有一个线程“唤醒”,打开与远程服务器的 TCP 连接并发送数据包

如果您完全关心速度或效率,请不要这样做。使用 TCP 套接字可以做的最昂贵的事情就是初始连接。您进行 3 次握手只是为了发送一条消息!

然后,您在执行整个操作时持有一个全局互斥锁 - 这也是您程序中最慢的操作。

当前的设计是有效地单线程的,但采用的是最复杂和最昂贵的方式。

我可以看到使用线程而不是一个连接发送文件所需的时间更短,但吞吐量没有改变

我不知道你实际测量的是什么,你也不清楚。什么是文件?一个片段?多个片段?与您的 MTU 相比,它有多大?您是否检查过片段实际上是按正确的顺序接收的,因为在我看来,唯一可能的并行性就是可能中断的地方。

如何才能降低单个文件的延迟和不变的吞吐量?

如何验证我的线程并行工作?

如果您在wireshark 中看到具有不同源端口的多个TCP 连接,并且它们的数据包是交错的,那么您就有了有效的并行性。这不太可能,因为您使用全局互斥锁明确禁止它!

在wireshark中检查吞吐量的最佳方法是什么?

不要。使用wireshark检查数据包,使用服务器确定吞吐量。这才是结果真正重要的地方。

3.并行TCP的概念是否可以提高吞吐量?

如果您不知道它的用途,为什么要实现所有这些复杂性?

单个线程(正确编码且没有虚假互斥抖动)很有可能会使您的网络饱和,所以:不会。拥有多个 I/O 线程通常是为了方便地划分逻辑和状态(即,每个线程有一个客户端,或者不同线程中有不同的不相关 I/O 子系统),而不是性能。


如果你想从消息队列中提取数据包并将它们发送到 TCP,那么高效的方法是:

    只使用一个线程执行此操作(您的程序可能有其他线程执行其他操作 - 如果可能,请避免与它们同步) 打开一个到服务器的持久 TCP 连接,不要为每个片段都连接/关闭它 就是这样。它比您拥有的要简单得多,而且性能会更好。

您实际上可以让一个线程处理多个不同的连接,但我看不出这对您的情况有什么用处,所以请保持简单。

【讨论】:

感谢您的评论。我实现了它,因为我阅读了一些文章,它可以增加吞吐量,并使用更多的连接带宽。我尝试模拟通过卫星环境(具有较大的带宽和较大的延迟)加速 tcp 连接的系统,这就是我尝试使用并行机制的原因 如果你有一个非常高延迟的网络,你仍然可以使用非阻塞 I/O 代替线程(并且重复的 TCP 连接变得更加惩罚) 它是大带宽和大延迟(又名大带宽延迟产品),那么多个连接实际上可以加快速度。但是,特别是如果它是linux,你不需要在单独的线程中分离连接,你只需要有几个连接并在它们之间均匀分布数据(在一个线程中)。然后您还需要特殊的逻辑来重建接收器上的数据,因此您应该考虑是否真的需要这样做。首先,检查吞吐量是否太低,您实际上需要加快速度。 嗨,艾菲。我已经使用 linux tc 使用卫星设置检查了吞吐量,是的,吞吐量正在下降并且发生更多的重传。你能给我一些使用一个线程的几个 TCP 连接的例子吗? 或者你的意思是使用多处理?【参考方案2】:

以下是部分答案:

3.并行TCP的概念是否假设增加吞吐量?

有点。这真的取决于瓶颈是什么。

第一个可能的瓶颈是拥塞控制。 TCP 发送方对一次可以发送多少数据包有一个限制(在收到第一个分组的 ACK 之前),称为拥塞窗口。这个数字应该从小开始并随着时间的推移而增长。此外,如果一个数据包丢失,这个数字会减半,然后缓慢增长,直到发生下一次丢包。然而,限制是每个 TCP 连接,因此如果您将数据分布在多个并行连接上,则总体拥塞窗口(所有流的所有窗口的总和)将增长得更快,并且下降的量更少。 (这是一个总结,细节你需要知道拥塞控制是如何工作的,这是一个很大的话题)。无论您是否使用线程,这都应该发生。您可以在一个线程中打开多个连接,并且达到相同的效果。

第二个可能的瓶颈是操作系统中的网络处理。 AFAIK 这是从 10Gb 连接开始的问题。也许是 1Gb,但可能不是。 TCP 处理发生在操作系统中,而不是在您的应用程序中。如果操作系统在处理器之间分散处理,您可能会获得更好的性能(应该有参数来启用它),并且可能会因为缓存而获得更好的性能。

如果你从磁盘读取文件,你的磁盘 IO 也很可能成为瓶颈。在这种情况下,我认为在不同线程之间传播发送数据实际上并没有帮助。

【讨论】:

感谢您的评论

以上是关于使用线程的并行 TCP 连接的主要内容,如果未能解决你的问题,请参考以下文章

Qt 多线程 QThreads 保持 TCP 连接并被重用

TCP连接队列

多个并行tcp连接

网络基础之 Http

java网络编程——多线程数据收发并行

HTTP 2.0 - 1 个 TCP/IP 连接与 6 个并行