Unix/Linux 编程:网络编程之 基于Reactor实现WebSocket服务
Posted sesiria
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Unix/Linux 编程:网络编程之 基于Reactor实现WebSocket服务相关的知识,希望对你有一定的参考价值。
使用到的上一篇文章中基于epoll实现的Reactor模型。
OpenSSL使用其中的SHA1,base64 encode等库
openSSL源码使用1.1.0l
$ tar xzvf OpenSSL-1.1.0l.tar.gz
$ cd OpenSSL-1.1.0l
$ ./config –-prefix=/usr/local/openssl
$ make
$ sudo make install
安装完以后需要将openSSL的库和头文件添加到默认的搜索路径。否则后续编译需要加一大串 -I "...." -L "..." 这样的命令
执行
这是给全局用户修改
sudo vim /etc/profile
加入这两行然后重新登录shell后,gcc会将上面路径默认添加到搜索路径。
之后就可以编译了比如
websocket简介 参考文档RFC6455
1. 握手 基于http
2. 传输,握手后可以传输自己的应用层定义数据。 websocket是双向的,服务器也可以主动通知客户端。
2. 数据帧
websocket协议的实现
/* Server based on EPOLL Reactor Model
* compile command: gcc -I /usr/local/openssl/include -L /usr/local/openssl/lib websocket_server.c -o websocket_server -lcrypto
* if the environment variable has been added. just add the '-lcrypto' for compile.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <openssl/sha.h> // import openssl for sha-1
#include <openssl/pem.h>
#include <openssl/bio.h>
#include <openssl/evp.h>
#define BUFFER_LENGTH 1024
#define EVENT_SIZE 1024
#define GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
// define the state machine for web socket protocol
enum WS_STATUS {
WS_INIT = 0,
WS_HANDSHAKE = 1,
WS_DATATRANSFORM = 2,
WS_DATAEND
};
struct sockitem {
int sockfd;
int (*callback)(int events, void* arg);
char recvbuffer[BUFFER_LENGTH];
char sendbuffer[BUFFER_LENGTH];
int rlength; // length of the received buffer
int slength; // length of the send buffer.
int status;
};
struct reactor{
int epollfd;
struct epoll_event events[EVENT_SIZE];
};
// store for big-endian. // for FIN RSV1 RSV2 RSV3 opcode
struct _websocket_ophdr {
unsigned char opcode:4,
rsv3:1,
rsv2:1,
rsv1:1,
fin:1;
unsigned char payload_length:7,
mask:1;
}__attribute__((packed));
// struct for RFC6455 Websocket data frame.
struct _websocket_head_126 {
// length:1 0-->125
// length:2 126 --> 2^23
// length:3 127 --> 2^71
unsigned short payload_length;
char mask_key[4];
unsigned char data[8];
}__attribute__((packed));
struct _websocket_head_127 {
unsigned long long payload_length;
char mask_key[4];
unsigned char data[8];
} __attribute__((packed));
typedef struct _websocket_head_127 websocket_head_127;
typedef struct _websocket_head_126 websocket_head_126;
typedef struct _websocket_ophdr ophdr;
struct reactor * g_eventloop = NULL;
//==============function declaration==============
int recv_cb(int events, void* arg);
char* decode_packet(char *stream, char *mask, int length, int *ret);
int encode_packet(char *buffer,char *mask, char *stream, int length);
//===============end declaration==================
int send_cb(int events, void *arg) {
if (!(events & EPOLLOUT) || arg == NULL)
return -1;
struct sockitem* si = (struct sockitem*)arg;
int clientfd = si->sockfd;
int epoll_fd = g_eventloop->epollfd;
if (send(clientfd, si->sendbuffer, si->slength, 0)
== -1) {
if(errno == EAGAIN || errno == EWOULDBLOCK) {
// the send buffer is full.
// add send into EPOLLOUT event.
return 0;
}
// send data error maybe some network issue.
perror("send error");
close(clientfd);
return -1;
}
// change state into EPOLLIN
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
//WS_DATATRANSFORM; // once we have finished the transfer of header. change the state.
if(si->status == WS_HANDSHAKE)
si->status++;
si->callback = recv_cb;
ev.data.ptr = si;
epoll_ctl(epoll_fd, EPOLL_CTL_MOD, clientfd, &ev);
return 0;
}
int base64_encode(char *in_str, int in_len, char *out_str) {
BIO *b64, *bio;
BUF_MEM *bptr = NULL;
size_t size = 0;
if (in_str == NULL || out_str == NULL)
return -1;
b64 = BIO_new(BIO_f_base64());
bio = BIO_new(BIO_s_mem());
bio = BIO_push(b64, bio);
BIO_write(bio, in_str, in_len);
BIO_flush(bio);
BIO_get_mem_ptr(bio, &bptr);
memcpy(out_str, bptr->data, bptr->length);
out_str[bptr->length-1] = '\\0';
size = bptr->length;
BIO_free_all(bio);
return size;
}
int readline(char *allbuf, int level, char*linebuf) {
int len = strlen(allbuf);
for(; level < len; ++level) {
if(allbuf[level] == '\\r' && allbuf[level + 1] == '\\n')
return level + 2;
else
*(linebuf++) = allbuf[level];
}
return -1;
}
// implement for state handshake of the websocket protocol.
int handshake(struct sockitem *si, struct reactor *mainloop) {
char linebuf[BUFSIZ] = {0};
char sec_accept[32] = {0};
unsigned char sha1_data[SHA_DIGEST_LENGTH + 1] = {0};
char head[BUFFER_LENGTH] = {0};
int level = 0;
// si->recvbuffer, si->rlength;
do {
memset(linebuf, 0, sizeof(linebuf));
level = readline(si->recvbuffer, level, linebuf);
// if this line container the Sec-WebSocket-Key.
if(strstr(linebuf, "Sec-WebSocket-Key") != NULL) {
strcat(linebuf, GUID); // add the GUID
// encode by SHA1 alrogithm from openSSL
SHA1((unsigned char*)&linebuf + 19, strlen(linebuf + 19), (unsigned char*)&sha1_data);
base64_encode(sha1_data, strlen(sha1_data), sec_accept);
sprintf(head, "HTTP/1.1 101 Switching Protocols\\r\\n" \\
"Upgrade: websocket\\r\\n" \\
"Connection: Upgrade\\r\\n" \\
"Sec-WebSocket-Accept: %s\\r\\n" \\
"\\r\\n" , sec_accept);
printf("response\\n");
printf("%s\\n\\n\\n", head);
memset(si->sendbuffer, 0, BUFFER_LENGTH);
memset(si->recvbuffer, 0, BUFFER_LENGTH);
// copy head to sendbuffer
si->slength = strlen(head);
memcpy(si->sendbuffer, head, si->slength);
si->rlength = 0;
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
//si->sockfd = si->sockfd;
si->callback = send_cb;
ev.data.ptr = si;
epoll_ctl(mainloop->epollfd, EPOLL_CTL_MOD, si->sockfd, &ev);
break;
}
} while((si->recvbuffer[level] != 'r' || si->recvbuffer[level + 1] != '\\n') && level!= -1);
return 0;
}
// implementation for data transform for WS_DATATRANSFORM state.
int datatransform(struct sockitem *si, struct reactor *mainloop) {
// si->status --> WS_DATAFRANSFORM
// si->recvbuffer
int ret = 0;
char mask[4] = {0};
char *data = decode_packet(si->recvbuffer, mask, si->rlength, &ret);
// process the data frame from the client.
printf("data : %s , length : %d\\n", data, ret);
ret = encode_packet(si->sendbuffer, mask, data, ret);
si->slength = ret;
memset(si->recvbuffer, 0, BUFFER_LENGTH);
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
// si->sockfd = si->sockfd;
si->callback = send_cb;
si->status = WS_DATATRANSFORM;
ev.data.ptr = si;
epoll_ctl(mainloop->epollfd, EPOLL_CTL_MOD, si->sockfd, &ev);
}
void umask(char *data,int len,char *mask) {
int i;
for (i = 0;i < len;i ++)
*(data+i) ^= *(mask+(i%4));
}
char* decode_packet(char *stream, char *mask, int length, int *ret) {
ophdr *hdr = (ophdr*)stream;
unsigned char *data = stream + sizeof(ophdr);
int size = 0;
int start = 0;
//char mask[4] = {0};
int i = 0;
//if (hdr->fin == 1) return NULL;
if ((hdr->mask & 0x7F) == 126) {
websocket_head_126 *hdr126 = (websocket_head_126*)data;
size = hdr126->payload_length;
for (i = 0;i < 4;i ++) {
mask[i] = hdr126->mask_key[i];
}
start = 8;
} else if ((hdr->mask & 0x7F) == 127) {
websocket_head_127 *hdr127 = (websocket_head_127*)data;
size = hdr127->payload_length;
for (i = 0;i < 4;i ++) {
mask[i] = hdr127->mask_key[i];
}
start = 14;
} else {
size = hdr->payload_length;
memcpy(mask, data, 4);
start = 6;
}
*ret = size;
umask(stream+start, size, mask);
return stream + start;
}
int encode_packet(char *buffer,char *mask, char *stream, int length) {
ophdr head = {0};
head.fin = 1;
head.opcode = 1;
int size = 0;
if (length < 126) {
head.payload_length = length;
memcpy(buffer, &head, sizeof(ophdr));
size = 2;
} else if (length < 0xffff) {
websocket_head_126 hdr = {0};
hdr.payload_length = length;
memcpy(hdr.mask_key, mask, 4);
memcpy(buffer, &head, sizeof(ophdr));
memcpy(buffer+sizeof(ophdr), &hdr, sizeof(websocket_head_126));
size = sizeof(websocket_head_126);
} else {
websocket_head_127 hdr = {0};
hdr.payload_length = length;
memcpy(hdr.mask_key, mask, 4);
memcpy(buffer, &head, sizeof(ophdr));
memcpy(buffer+sizeof(ophdr), &hdr, sizeof(websocket_head_127));
size = sizeof(websocket_head_127);
}
memcpy(buffer+2, stream, length);
return length + 2;
}
int recv_cb(int events, void* arg) {
if (!(events & EPOLLIN) || arg == NULL)
return -1;
struct sockitem* si = (struct sockitem*)arg;
int clientfd = si->sockfd;
int epoll_fd = g_eventloop->epollfd;
// char buffer[BUFFER_LENGTH] = { 0 };
struct epoll_event ev;
int ret = recv(clientfd, si->recvbuffer, BUFFER_LENGTH, 0);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("read all data\\n");
}
close(clientfd);
ev.events = EPOLLIN | EPOLLET;
//ev->data.fd = clientfd;
ev.data.ptr = NULL;
free(si);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
}
else if (ret == 0) {
printf(" disconnect clientfd:%d\\n", clientfd);
close(clientfd);
ev.events = EPOLLIN | EPOLLET;
//ev->data.fd = clientfd;
ev.data.ptr = NULL;
free(si);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
return 0;
}
else {
printf("Recv: \\n%.*s\\nTotal: %d Bytes\\n", ret, si->recvbuffer, ret);
#if 0
si->rlength = ret;
memcpy(si->sendbuffer, si->recvbuffer, si->rlength);
si->slength = si->rlength;
#else
// state machine.
if (si->status == WS_HANDSHAKE) {
handshake(si, g_eventloop);
} else if (si->status == WS_DATATRANSFORM) {
datatransform(si, g_eventloop);
} else if (si->status == WS_DATAEND) {
} else {
assert(0);
}
#endif
}
}
// callback handler for accept events of the sockfd.
int accept_cb(int events, void* arg) {
if (!(events & EPOLLIN) || arg == NULL)
return -1;
struct sockitem* psi = (struct sockitem*)arg;
int epoll_fd = g_eventloop->epollfd;
int sockfd = psi->sockfd;
struct sockaddr_in client_addr;
struct epoll_event ev;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0)
return -1; // do nothing.
char str[INET_ADDRSTRLEN] = { 0 };
struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = clientfd;
si->callback = recv_cb;
si->status = WS_HANDSHAKE;
printf("received from %s at port:%d, sockfd:%d, clientfd:%d\\n",
inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
ntohs(client_addr.sin_port), sockfd, clientfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = si;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
}
// ./epoll 8080.
int main(int argc, char* argv[]) {
if (argc < 2) {
printf("parameter error!\\n");
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port); // convert network bytes
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
exit(EXIT_FAILURE);
}
printf("start server and wait for connection...\\n");
if (listen(sockfd, 5) < 0) {
exit(EXIT_FAILURE);
}
g_eventloop = (struct reactor*) malloc(sizeof(struct reactor));
// epoll coding.
g_eventloop->epollfd = epoll_create(1); // create the epoll root node for epoll object.
struct epoll_event ev;
ev.events = EPOLLIN;
//ev.data.fd = sockfd;
// define socketitem
struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = sockfd;
si->status = WS_INIT;
si->callback = accept_cb;
ev.data.ptr = si;
epoll_ctl(g_eventloop->epollfd, EPOLL_CTL_ADD, sockfd, &ev);
while (1) {
// condition wait.
// max amount of events could be caught by 1 epoll_wait call.
int nready = epoll_wait(g_eventloop->epollfd, g_eventloop->events, EVENT_SIZE, -1);
if (nready < -1) {
break;
}
int i = 0;
for (i = 0; i < nready; i++) {
if (g_eventloop->events[i].events & EPOLLIN) {
struct sockitem* si = (struct sockitem*)g_eventloop->events[i].data.ptr;
if(si && si->callback)
si->callback(g_eventloop->events[i].events, si);
}
if (g_eventloop->events[i].events & EPOLLOUT) {
struct sockitem* si = (struct sockitem*)g_eventloop->events[i].data.ptr;
if(si && si->callback)
si->callback(g_eventloop->events[i].events, si);
}
}
}
close(sockfd);
exit(EXIT_SUCCESS);
}
测试:
客户端
服务端:
以上是关于Unix/Linux 编程:网络编程之 基于Reactor实现WebSocket服务的主要内容,如果未能解决你的问题,请参考以下文章
Unix/Linux 编程:网络编程之 epoll与Reactor
从事UNIX/LInux服务器编程最方便的代码编译工具------(eclipse for c/c++)(FileZilla)(Secure CRT) 这三种一定要一起使用 之3
从事UNIX/LInux服务器编程最方便的代码编译工具------(eclipse for c/c++)(FileZilla)(Secure CRT) 这三种一定要一起使用 之4