异步接收来自服务器的消息

Posted

技术标签:

【中文标题】异步接收来自服务器的消息【英文标题】:Recieve a message from server asynchronously 【发布时间】:2021-07-06 09:18:54 【问题描述】:

我有一个客户端程序和一个服务器程序。可能有多个服务器和多个 可以连接到那里选择的多个服务器的客户端 客户端程序列出一个菜单

    connect 4000 // 连接到服务器的 4000 端口 bid 1000 4000 // 向4000端口的服务器发送出价1000

现在服务器可能会收到来自与其连接的多个客户端的出价并跟踪最高的 出价到现在。每当提出新的出价时,服务器都会向每个连接的客户端发送广播 一一喜欢-write(users[i].sock_fd, msg, size)。 如何在客户端收听此消息? 这里有两件事

    客户端需要监听服务器发送的消息。 客户端还从用户的命令行读取文本或菜单项(连接和出价)。

我已经对第 2) 部分进行了编码,但对如何将 1) 编码到客户端并同时使 2) 也能正常工作感到困惑

客户端代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

#define BUF_SIZE 128

#define MAX_AUCTIONS 5
#ifndef VERBOSE
#define VERBOSE 0
#endif

#define ADD 0
#define SHOW 1
#define BID 2
#define QUIT 3

/* Auction struct - this is different than the struct in the server program
 */
typedef struct auction_data

    int sock_fd;
    char item[BUF_SIZE];
    int current_bid;
 auction_data;
auction_data *auction_data_ptr;

/* Displays the command options available for the user.
 * The user will type these commands on stdin.
 */

void print_menu()

    printf("The following operations are available:\n");
    printf("    show\n");
    printf("    add <server address> <port number>\n");
    printf("    bid <item index> <bid value>\n");
    printf("    quit\n");


/* Prompt the user for the next command 
 */
void print_prompt()

    printf("Enter new command: ");
    fflush(stdout);


/* Unpack buf which contains the input entered by the user.
 * Return the command that is found as the first word in the line, or -1
 * for an invalid command.
 * If the command has arguments (add and bid), then copy these values to
 * arg1 and arg2.
 */
int parse_command(char *buf, int size, char *arg1, char *arg2)

    int result = -1;
    char *ptr = NULL;
    if (strncmp(buf, "show", strlen("show")) == 0)
    
        return SHOW;
    
    else if (strncmp(buf, "quit", strlen("quit")) == 0)
    
        return QUIT;
    
    else if (strncmp(buf, "add", strlen("add")) == 0)
    
        result = ADD;
    
    else if (strncmp(buf, "bid", strlen("bid")) == 0)
    
        result = BID;
    

    ptr = strtok(buf, " ");  // first word in buf
    ptr = strtok(NULL, " "); // second word in buf

    if (ptr != NULL)
    
        strncpy(arg1, ptr, BUF_SIZE);
    
    else
    
        return -1;
    
    ptr = strtok(NULL, " "); // third word in buf

    if (ptr != NULL)
    
        strncpy(arg2, ptr, BUF_SIZE);
        return result;
    
    else
    
        return -1;
    
    return -1;


/* Connect to a server given a hostname and port number.
 * Return the socket for this server
 */
int add_server(char *hostname, int port)

    // Create the socket FD.
    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd < 0)
    
        perror("client: socket");
        exit(1);
    

    // Set the IP and port of the server to connect to.
    struct sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    struct addrinfo *ai;

    /* this call declares memory and populates ailist */
    if (getaddrinfo(hostname, NULL, NULL, &ai) != 0)
    
        close(sock_fd);
        return -1;
    
    /* we only make use of the first element in the list */
    server.sin_addr = ((struct sockaddr_in *)ai->ai_addr)->sin_addr;

    // free the memory that was allocated by getaddrinfo for this list
    freeaddrinfo(ai);

    // Connect to the server.
    if (connect(sock_fd, (struct sockaddr *)&server, sizeof(server)) == -1)
    
        perror("client: connect");
        close(sock_fd);
        return -1;
    
    if (VERBOSE)
    
        fprintf(stderr, "\nDebug: New server connected on socket %d. Awaiting item\n", sock_fd);
    
    return sock_fd;

/* ========================= Add helper functions below ========================
 * Please add helper functions below to make it easier for the TAs to find the 
 * work that you have done.  Helper functions that you need to complete are also
 * given below.
 */

/* Print to standard output information about the auction
 */
void print_auctions(struct auction_data *a, int size)

    printf("Current Auctions:\n");

    for (int i = 0; i < size; i++)
    
        struct auction_data auction_data = a[i];
        printf("(%d) %s bid = %d\n", i, auction_data.item, auction_data.current_bid);
    

    /* TODO Print the auction data for each currently connected 
     * server.  Use the follosing format string:
     *     "(%d) %s bid = %d\n", index, item, current bid
     * The array may have some elements where the auction has closed and
     * should not be printed.
     */


/* Process the input that was sent from the auction server at a[index].
 * If it is the first message from the server, then copy the item name
 * to the item field.  (Note that an item cannot have a space character in it.)
 */
void update_auction(char *buf, int size, struct auction_data *a, int index)


    // TODO: Complete this function

    // fprintf(stderr, "ERROR malformed bid: %s", buf);
    // printf("\nNew bid for %s [%d] is %d (%d seconds left)\n",           );


int main(void)


    char name[BUF_SIZE];
    int size = 0;
    // Declare and initialize necessary variables
    // TODO

    // Get the user to provide a name.
    printf("Please enter a username: ");
    fflush(stdout);
    int num_read = read(STDIN_FILENO, name, BUF_SIZE);
    printf("%s-name\n", name);
    if (num_read <= 0)
    
        fprintf(stderr, "ERROR: read from stdin failed\n");
        exit(1);
    
    print_menu();
    // TODO
    char server_reply[2000];

    while (1)
    

        print_prompt();
        char *command;
        scanf("%m[^\n]s", &command);
        getchar();
        char arg1[100];
        char arg2[100];
        int commandNumber = parse_command(command, 1000, arg1, arg2);
        char dest[100] = "";
        strcpy(dest, name);
        dest[strlen(dest) - 1] = '\0';
        if (commandNumber == ADD)
        
            printf("%s-name4\n", dest);

            int port = atoi(arg2);
            int sock_fd = add_server(arg1, port);
            printf("%s-server\n", server_reply);
            write(sock_fd, dest, strlen(dest));
            auction_data_ptr = (auction_data *)realloc(auction_data_ptr, (size + 1) * sizeof(auction_data_ptr));
            auction_data_ptr[size].sock_fd = sock_fd;
            size++;
        
        else if (commandNumber == SHOW)
        
            print_auctions(auction_data_ptr, size);
        
        else if (commandNumber == BID)
        
            int itemIndex = atoi(arg1);
            int bidValue = atoi(arg2);
            printf("%d-test\n", auction_data_ptr[itemIndex].sock_fd);
            send(auction_data_ptr[itemIndex].sock_fd, arg2, strlen(arg2), 0);
        
        else if (commandNumber == QUIT)
        
        

        // TODO
    
    return 0; // Shoud never get here

服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifndef PORT
#define PORT 30000
#endif
#define MAX_BACKLOG 5
#define MAX_CONNECTIONS 20
#define BUF_SIZE 128
#define MAX_NAME 56

int verbose = 0;

struct user

    int sock_fd;
    char name[MAX_NAME];
    int bid;
;

typedef struct

    char *item;
    int highest_bid; // value of the highest bid so far
    int client;      // index into the users array of the top bidder
 Auction;

/*
 * Accept a connection. Note that a new file descriptor is created for
 * communication with the client. The initial socket descriptor is used
 * to accept connections, but the new socket is used to communicate.
 * Return the new client's file descriptor or -1 on error.
 */
int accept_connection(int fd, struct user *users)

    int user_index = 0;
    while (user_index < MAX_CONNECTIONS && users[user_index].sock_fd != -1)
    
        user_index++;
    

    if (user_index == MAX_CONNECTIONS)
    
        fprintf(stderr, "server: max concurrent connections\n");
        return -1;
    

    int client_fd = accept(fd, NULL, NULL);
    if (client_fd < 0)
    
        perror("server: accept");
        close(fd);
        exit(1);
    

    users[user_index].sock_fd = client_fd;
    users[user_index].name[0] = '\0';
    return client_fd;


/* Remove \r\n from str if the characters are at the end of the string.
 * Defensively assuming that \r could be the last or second last character.
 */
void strip_newline(char *str)

    if (str[strlen(str) - 1] == '\n' || str[strlen(str) - 1] == '\r')
    
        if (str[strlen(str) - 2] == '\r')
        
            str[strlen(str) - 2] = '\0';
        
        else
        
            str[strlen(str) - 1] = '\0';
        
    


/*
 * Read a name from a client and store in users.
 * Return the fd if it has been closed or 0 otherwise.
 */
int read_name(int client_index, struct user *users)

    int fd = users[client_index].sock_fd;

    /* Note: This is not the best way to do this.  We are counting
     * on the client not to send more than BUF_SIZE bytes for the
     * name.
     */
    int num_read = read(fd, users[client_index].name, MAX_NAME);
    if (num_read == 0)
    
        users[client_index].sock_fd = -1;
        return fd;
    
    users[client_index].name[num_read] = '\0';
    strip_newline(users[client_index].name);

    if (verbose)
    
        fprintf(stderr, "[%d] Name: %s\n", fd, users[client_index].name);
    

    /*
    if (num_read == 0 || write(fd, buf, strlen(buf)) != strlen(buf)) 
        users[client_index].sock_fd = -1;
        return fd;
    
*/

    return 0;


/* Read a bid from a client and store it in bid.
 * If the client does not send a number, bid will be set to -1
 * Return fd if the socket is closed, or 0 otherwise.
 */
int read_bid(int client_index, struct user *users, int *bid)

    printf("inside bid\n");
    int fd = users[client_index].sock_fd;
    char buf[BUF_SIZE];
    char *endptr;
    int num_read = read(fd, buf, BUF_SIZE);
    if (num_read == 0)
    
        return fd;
    
    buf[num_read] = '\0';

    if (verbose)
    
        fprintf(stderr, "[%d] bid: %s", fd, buf);
    

    // Check if the client sent a valid number
    // (We are not checking for a good bid here.)
    errno = 0;
    *bid = strtol(buf, &endptr, 10);
    if (errno != 0 || endptr == buf)
    
        *bid = -1;
    

    return 0;


void broadcast(struct user *users, char *msg, int size)

    for (int i = 0; i < MAX_CONNECTIONS; i++)
    
        if (users[i].sock_fd != -1)
        
            if (write(users[i].sock_fd, msg, size) == -1)
            
                // Design flaw: can't remove this socket from select set
                close(users[i].sock_fd);
                users[i].sock_fd = -1;
            
        
    


int prep_bid(char *buf, Auction *a, struct timeval *t)

    // send item, current bid, time left in seconds
    printf("robin2-%s-%d\n", a->item, a->highest_bid);

    printf("robin-%ld\n", t->tv_sec);
    sprintf(buf, "%s %d %ld", a->item, a->highest_bid, t->tv_sec);
    printf("robin-bid2\n");

    return 0;


/* Update auction if new_bid is higher than current bid.  
 * Write to the client who made the bid if it is lower
 * Broadcast to all clients if the bid is higher
 */
int update_bids(int client_index, struct user *users,
                int new_bid, Auction *auction, struct timeval *t)

    char buf[BUF_SIZE];

    if (new_bid > auction->highest_bid)
    
        auction->highest_bid = new_bid;
        auction->client = client_index;

        prep_bid(buf, auction, t);
        if (verbose)
        
            fprintf(stderr, "[%d] Sending to %d:\n    %s\n",
                    getpid(), users[client_index].sock_fd, buf);
        

        broadcast(users, buf, strlen(buf) + 1);
    
    else
    
        fprintf(stderr, "Client %d sent bid that was too low.  Ignored\n",
                client_index);
    
    return 0;


int main(int argc, char **argv)

    argc = 7;
    argv[1] = "-v";
    argv[2] = "-t";
    argv[3] = "5";
    argv[4] = "-p";
    argv[5] = "4000";
    argv[6] = "robin";
    Auction auction;
    int opt;
    int port = PORT;
    struct timeval timeout;
    struct timeval *time_ptr = NULL;
    int minutes = 0;
    while ((opt = getopt(argc, argv, "vt:p:")) != -1)
    
        switch (opt)
        
        case 'v':
            verbose = 1;
            break;
        case 't':
            minutes = atoi(optarg);
            timeout.tv_sec = minutes * 60;
            timeout.tv_usec = 0;
            time_ptr = &timeout;
            break;
        case 'p':
            port = atoi(optarg);
            break;
        default:
            fprintf(stderr, "Usage: auction_server [-v] [-t timeout] [-p port] item\n");
            exit(1);
        
    
    if (optind >= argc)
    
        fprintf(stderr, "Expected argument after options\n");
        exit(1);
    

    auction.item = argv[optind];
    auction.client = -1;
    auction.highest_bid = -1;

    struct user users[MAX_CONNECTIONS];
    for (int index = 0; index < MAX_CONNECTIONS; index++)
    
        users[index].sock_fd = -1;
        users[index].name[0] = '\0';
    

    // Create the socket FD.
    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd < 0)
    
        perror("server: socket");
        exit(1);
    

    // Set information about the port (and IP) we want to be connected to.
    struct sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    server.sin_addr.s_addr = INADDR_ANY;

    // This sets an option on the socket so that its port can be reused right
    // away. Since you are likely to run, stop, edit, compile and rerun your
    // server fairly quickly, this will mean you can reuse the same port.
    int on = 1;
    int status = setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR,
                            (const char *)&on, sizeof(on));
    if (status == -1)
    
        perror("setsockopt -- REUSEADDR");
    

    // This should always be zero. On some systems, it won't error if you
    // forget, but on others, you'll get mysterious errors. So zero it.
    memset(&server.sin_zero, 0, 8);

    // Bind the selected port to the socket.
    if (bind(sock_fd, (struct sockaddr *)&server, sizeof(server)) < 0)
    
        perror("server: bind");
        close(sock_fd);
        exit(1);
    

    // Announce willingness to accept connections on this socket.
    if (listen(sock_fd, MAX_BACKLOG) < 0)
    
        perror("server: listen");
        close(sock_fd);
        exit(1);
    

    if (verbose)
    
        fprintf(stderr, "[%d] Ready to accept connections on %d\n",
                getpid(), port);
    

    // The client accept - message accept loop. First, we prepare to listen
    // to multiple file descriptors by initializing a set of file descriptors.
    int max_fd = sock_fd;
    fd_set all_fds;
    FD_ZERO(&all_fds);
    FD_SET(sock_fd, &all_fds);

    while (1)
    
        // select updates the fd_set it receives, so we always use a copy
        // and retain the original.
        fd_set listen_fds = all_fds;
        int nready;
        if ((nready = select(max_fd + 1, &listen_fds, NULL, NULL, time_ptr)) == -1)
        
            perror("server: select");
            exit(1);
        
        if (nready == 0)
        
            char buf[BUF_SIZE];
            sprintf(buf, "Auction closed: %s wins with a bid of %d\r\n",
                    users[auction.client].name, auction.highest_bid);
            printf("%s", buf);
            broadcast(users, buf, BUF_SIZE);
            exit(0);
        
        // Is it the original socket? Create a new connection ...
        if (FD_ISSET(sock_fd, &listen_fds))
        

            int client_fd = accept_connection(sock_fd, users);
            if (client_fd != -1)
            
                if (client_fd > max_fd)
                
                    max_fd = client_fd;
                
                FD_SET(client_fd, &all_fds);
                if (verbose)
                
                    fprintf(stderr, "[%d] Accepted connection on %d\n",
                            getpid(), client_fd);
                
            
        

        // Next, check the clients.
        for (int index = 0; index < MAX_CONNECTIONS; index++)
        
            if (users[index].sock_fd > -1 && FD_ISSET(users[index].sock_fd, &listen_fds))
            
                int client_closed = 0;
                int new_bid = 0;

                if (users[index].name[0] == '\0')
                
                    client_closed = read_name(index, users);
                    if (client_closed == 0)
                    
                        char buf[BUF_SIZE];
                        prep_bid(buf, &auction, time_ptr);
                        if (verbose)
                        
                            fprintf(stderr, "[%d] Sending to %d:\n    %s\n",
                                    getpid(), users[index].sock_fd, buf);
                        
                        if (write(users[index].sock_fd, buf, strlen(buf) + 1) == -1)
                        
                            fprintf(stderr, "Write to %d failed\n", sock_fd);
                            close(sock_fd);
                        
                    
                
                else
                 // read a bid
                    client_closed = read_bid(index, users, &new_bid);
                    if (client_closed == 0)
                    
                        update_bids(index, users, new_bid, &auction, time_ptr);
                    
                

                if (client_closed > 0)
                
                    FD_CLR(client_closed, &all_fds);
                    printf("Client %d disconnected\n", client_closed);
                
            
        
    

    // Should never get here.
    return 1;

【问题讨论】:

编辑您的问题,并在此处将您的服务器和客户端作为代码块中的文本发布。如果没有看到您已经拥有的东西,我们就无法真正明智地发表评论。 添加了客户端和服务器代码。这不是完整的代码,因为它太大了。如果这没有帮助,我可以包括在内。服务器代码中的void broadcast(struct user *users, char *msg, int size) 写入连接到它的每个客户端。我在客户端做什么才能阅读此消息 简而言之,我必须做类似recv(sock_fd, server_reply, 2000, 0 的事情,但我需要听服务器/等待它接收消息。我认为我们可以忽略整个代码和描述,只需将其设为一行 - How does client wait for the server to recieve a message in a separate thread 【参考方案1】:

警告:因为您只发布了服务器和客户端的部分代码,这将是一些建议。

您的客户可以同时附加/连接到多个个投标服务器。因此,它必须能够以类似于服务器的方式跟踪多个连接。

您的主要 [已说明] 问题是您在用户提示下阻止了客户端(例如,来自 stdin 通过 scanf 等)。目前,这意味着客户端“卡”在用户输入提示符下,并且不能从它所连接的服务器发送消息。下面详细了解如何解决此问题。

因此,您将有一堆来自服务器的代码需要在客户端中,但有一些细微差别。您可能希望稍微概括一些服务器代码,以便它可以在服务器客户端中运行(例如,您可能希望将其移动到common.c)。

您已经在服务器中有代码来处理多个连接。服务器需要一个select 掩码,它是监听 fd 和所有活动客户端 fd 的 OR。

同样,您的客户端需要一个 select 掩码,它是用户输入(例如 0)和所有活动服务器连接的 fd 的 OR。

在 fd 0 上执行select 使用stdio.h 流不会很好地工作。因此,将访问stdin 替换为(例如)read(0,line_buffer,sizeof(line_buffer))。如果 fd 0 是 select 掩码中的 set,则执行此操作。该角色与您的服务器在sock_fd 上为accept 所做的非常相似。

您需要允许部分读取并追加到缓冲区,直到看到换行符。因此,您必须完成fgets 通常在组装整条生产线时所做的工作。然后,您可以拨打parse_command

因为read 不理解换行符,用户可以输入多行,然后才能进行读取。

所以,对于用户输入:

connect 4000\n
bid 100 4000\n
connect 5000\n

您可能会读到以下内容:

conn
ect
 4000\nbid 100 4000
\nconnect
 5000\n

您可能还需要在 fd 0 上使用FIONREAD ioctl 来防止阻塞。而且,您可能需要通过termios 调用将内核 TTY 层设置为原始模式。

客户端现在变得与您的服务器代码非常相似。它将处理任何连接的服务器和用户输入的[异步]操作。

提示:根据 DRY 原则 [“不要重复自己”] ...

服务器中已经有一个struct user。客户将需要类似/相同的东西,例如struct server。在概括代码时,不要让两个不同的结构本质上做同样的事情,而是考虑将现有结构重命名为(例如)struct connection

【讨论】:

我已经更新了当前工作的完整代码。也许您可以准确指出需要进行更改的地方。这种类型的编码对我来说似乎很神秘

以上是关于异步接收来自服务器的消息的主要内容,如果未能解决你的问题,请参考以下文章

同步消息发送和接收 AGSCMPP

MicroServices - 通过AMQP与多个接收器场景进行异步通信

有没有办法实现 XMPP 客户端或接收到的消息,可以接收来自 XMPP 服务器的所有消息?

接收来自 emqx 推送消息的长时间运行服务

Rabbitmq的简单概述和源码部署

带有异步计时器的 Python 异步 websocket 客户端