Linux socket poll IO handling - UDP

The Linux socket layer defines several kinds of IO events; when an IO event occurs, the corresponding handler function is called.

struct sock {
	...
	/* socket wait queue and async head: used for asynchronous
	 * notification of IO events */
	struct socket_wq __rcu	*sk_wq;
	...
	atomic_t		sk_wmem_alloc;
	...
	/* callback to indicate a change in the state of the sock */
	void			(*sk_state_change)(struct sock *sk);
	/* callback to indicate there is data in the receive buffer
	 * to be processed */
	void			(*sk_data_ready)(struct sock *sk, int bytes);
	/* callback to indicate there is send buffer space available */
	void			(*sk_write_space)(struct sock *sk);
	/* callback to indicate errors on the socket */
	void			(*sk_error_report)(struct sock *sk);
	...
};
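
Seen from userspace, these callbacks surface as poll(2) events (POLLIN, POLLOUT, POLLERR). Below is a minimal sketch of the user-visible side, assuming an arbitrary example port 9999 and with error handling omitted:

#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	struct sockaddr_in addr;
	struct pollfd pfd;

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_ANY);
	addr.sin_port = htons(9999);		/* arbitrary example port */
	bind(fd, (struct sockaddr *)&addr, sizeof(addr));

	pfd.fd = fd;
	pfd.events = POLLIN | POLLOUT;

	/* blocks in the kernel poll path described below until one of
	 * the sock callbacks wakes the socket's wait queue */
	if (poll(&pfd, 1, 5000) > 0) {
		if (pfd.revents & POLLIN)
			printf("readable: sk_data_ready fired\n");
		if (pfd.revents & POLLOUT)
			printf("writable: send buffer has space\n");
		if (pfd.revents & POLLERR)
			printf("error: sk_error_report fired\n");
	}
	return 0;
}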

When a sock is created, the IO handler functions are initialized to defaults.

void sock_init_data(struct socket *sock, struct sock *sk)
{
	skb_queue_head_init(&sk->sk_receive_queue);
	skb_queue_head_init(&sk->sk_write_queue);
	skb_queue_head_init(&sk->sk_error_queue);
#ifdef CONFIG_NET_DMA
	skb_queue_head_init(&sk->sk_async_wait_queue);
#endif

	sk->sk_send_head	=	NULL;

	init_timer(&sk->sk_timer);

	sk->sk_allocation	=	GFP_KERNEL;
	sk->sk_rcvbuf		=	sysctl_rmem_default;
	sk->sk_sndbuf		=	sysctl_wmem_default;
	sk->sk_state		=	TCP_CLOSE;
	...

	/* install the default IO event callbacks */
	sk->sk_state_change	=	sock_def_wakeup;
	sk->sk_data_ready	=	sock_def_readable;
	sk->sk_write_space	=	sock_def_write_space;
	sk->sk_error_report	=	sock_def_error_report;
	sk->sk_destruct		=	sock_def_destruct;
	...
}
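
Because these are plain function pointers, in-kernel users can replace the defaults (kernel subsystems such as sunrpc do exactly this). A sketch of the usual pattern; my_data_ready, my_saved_ready, and hook_data_ready are illustrative names, not kernel symbols:

/* illustrative only: chain a custom callback in front of the default */
static void (*my_saved_ready)(struct sock *sk, int bytes);

static void my_data_ready(struct sock *sk, int bytes)
{
	/* custom handling of newly arrived data would go here */
	my_saved_ready(sk, bytes);	/* fall through to sock_def_readable */
}

static void hook_data_ready(struct sock *sk)
{
	write_lock_bh(&sk->sk_callback_lock);	/* protects the callback pointers */
	my_saved_ready = sk->sk_data_ready;
	sk->sk_data_ready = my_data_ready;
	write_unlock_bh(&sk->sk_callback_lock);
}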



udp_poll

First look at how the poll wait entry is set up. do_select calls back into each file's poll method:

do_select:
	mask = (*f_op->poll)(f.file, wait);

For a UDP socket this resolves to udp_poll:

const struct proto_ops inet_dgram_ops = {
	....
	.poll		   = udp_poll,
	....
};
unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask = datagram_poll(file, sock, wait);
	struct sock *sk = sock->sk;
	...
	return mask;
}
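
For reference, the lines elided above in the full udp_poll also guard against false readability: if POLLRDNORM was set but every datagram sitting on the receive queue has a bad checksum, first_packet_length() discards them and the POLLIN | POLLRDNORM bits are cleared again for blocking sockets.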



unsigned int datagram_poll(struct file *file, struct socket *sock,
			   poll_table *wait)
{
	struct sock *sk = sock->sk;
	unsigned int mask;

	/* queue the wait entry on sk->sk_wq */
	sock_poll_wait(file, sk_sleep(sk), wait);
	mask = 0;

	/* exceptional events? */
	if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
		mask |= POLLERR |
			(sock_flag(sk, SOCK_SELECT_ERR_QUEUE) ? POLLPRI : 0);

	if (sk->sk_shutdown & RCV_SHUTDOWN)
		mask |= POLLRDHUP | POLLIN | POLLRDNORM;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		mask |= POLLHUP;

	/* readable? the receive queue is not empty */
	if (!skb_queue_empty(&sk->sk_receive_queue))
		mask |= POLLIN | POLLRDNORM;

	/* Connection-based need to check for termination and startup */
	if (connection_based(sk)) {
		if (sk->sk_state == TCP_CLOSE)
			mask |= POLLHUP;
		/* connection hasn't started yet? */
		if (sk->sk_state == TCP_SYN_SENT)
			return mask;
	}

	/* writable? */
	if (sock_writeable(sk))
		mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
	else
		set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

	return mask;
}
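
As a concrete example: on a freshly bound UDP socket with empty receive and send queues, datagram_poll reports POLLOUT | POLLWRNORM | POLLWRBAND (writable but not readable); once a datagram arrives, POLLIN | POLLRDNORM are added to the mask.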

sk_sleep and sock_poll_wait queue the wait entry on sk->sk_wq:

static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
	BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
	return &rcu_dereference_raw(sk->sk_wq)->wait;
}



static inline void sock_poll_wait(struct file *filp,
		wait_queue_head_t *wait_address, poll_table *p)
{
	/* p was set up in do_select; wait_address is sk->sk_wq->wait,
	 * the socket wait queue */
	if (!poll_does_not_wait(p) && wait_address) {
		poll_wait(filp, wait_address, p);
		/* We need to be sure we are in sync with the
		 * socket flags modification.
		 *
		 * This memory barrier is paired in the wq_has_sleeper.
		 */
		smp_mb();
	}
}


static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);	/* calls back the function installed in do_select */
}


In do_select, poll_initwait installs __pollwait as the _qproc callback:

void poll_initwait(struct poll_wqueues *pwq)
{
	init_poll_funcptr(&pwq->pt, __pollwait);	/* __pollwait becomes _qproc */
	pwq->polling_task = current;	/* record the task that is polling */
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	/* add the wait entry to the wait queue wait_address */
	add_wait_queue(wait_address, &entry->wait);
}
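
The wake function installed on the entry above is pollwake. When the socket side later calls wake_up on this queue, roughly the following runs (simplified from fs/select.c of the same era):

static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	struct poll_table_entry *entry;

	entry = container_of(wait, struct poll_table_entry, wait);
	/* ignore wakeups for events the poller did not ask for */
	if (key && !((unsigned long)key & entry->key))
		return 0;
	/* __pollwake() sets pwq->triggered and wakes pwq->polling_task */
	return __pollwake(wait, mode, sync, key);
}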


static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
{
	struct poll_table_page *table = p->table;

	if (p->inline_index < N_INLINE_POLL_ENTRIES)
		return p->inline_entries + p->inline_index++;

	if (!table || POLL_TABLE_FULL(table)) {
		struct poll_table_page *new_table;

		new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
		if (!new_table) {
			p->error = -ENOMEM;
			return NULL;
		}
		new_table->entry = new_table->entries;
		new_table->next = table;
		p->table = new_table;
		table = new_table;
	}

	return table->entry++;
}
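
Putting the pieces together, the overall shape of do_select is roughly this (heavily simplified pseudocode, not the literal fs/select.c source; requested_events stands in for the per-fd in/out/ex bits):

	for (;;) {
		/* for each fd: call its poll method; on the first pass
		 * __pollwait links this task onto the socket's sk_wq */
		mask = (*f_op->poll)(file, wait);
		if (mask & requested_events) {
			retval++;
			wait->_qproc = NULL;	/* stop registering wait entries */
		}

		if (retval || timed_out || signal_pending(current))
			break;

		/* nothing ready: sleep until pollwake() sets pwq->triggered */
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))
			timed_out = 1;
	}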

Read IO event wakeup

Above, the current process puts itself on the sk_wq wait queue and yields the CPU. When the NIC driver receives data and the protocol stack finishes processing it, the process waiting in poll is woken up.

static const struct net_protocol udp_protocol = {
	.handler =	udp_rcv,
	.err_handler =	udp_err,
	.no_policy =	1,
	.netns_ok =	1,
};

When protocol-stack processing completes, udp_rcv is called, which appends the received data to the socket's receive queue.

int udp_rcv(struct sk_buff *skb)
{
	return __udp4_lib_rcv(skb, &udp_table, IPPROTO_UDP);
}

int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
		   int proto)
{
	struct sock *sk;
	struct udphdr *uh;
	unsigned short ulen;
	struct rtable *rt = skb_rtable(skb);
	__be32 saddr, daddr;
	struct net *net = dev_net(skb->dev);
	...

	uh   = udp_hdr(skb);
	ulen = ntohs(uh->len);
	saddr = ip_hdr(skb)->saddr;
	daddr = ip_hdr(skb)->daddr;

	/* look up the socket by destination IP address and port */
	sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);

	if (sk != NULL) {
		int ret = udp_queue_rcv_skb(sk, skb);
		...
	}
	...
	return 0;
}


udp_queue_rcv_skb --> __udp_queue_rcv_skb --> sock_queue_rcv_skb:

int sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
{
	int err;
	int skb_len;
	unsigned long flags;
	struct sk_buff_head *list = &sk->sk_receive_queue;	/* receive queue */
	...
	if (!sk_rmem_schedule(sk, skb, skb->truesize)) {
		atomic_inc(&sk->sk_drops);
		return -ENOBUFS;
	}
	...
	/* cache the length before queueing: once queued, the skb may be
	 * consumed (and freed) by another thread at any time */
	skb_len = skb->len;

	/* append to the receive queue (queue locking elided here) */
	__skb_queue_tail(list, skb);
	...
	if (!sock_flag(sk, SOCK_DEAD))
		sk->sk_data_ready(sk, skb_len);	/* wake up waiting processes */
	return 0;
}


sk->sk_data_ready	=	sock_def_readable;

static void sock_def_readable(struct sock *sk, int len)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	/* wake up the processes sleeping on the wq->wait queue */
	if (wq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
						POLLRDNORM | POLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	rcu_read_unlock();
}
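
For reference, wq_has_sleeper supplies the other half of the barrier pairing mentioned in sock_poll_wait; in this kernel generation it looks roughly like this:

static inline bool wq_has_sleeper(struct socket_wq *wq)
{
	/* Pairs with the smp_mb() in sock_poll_wait(): either the waker
	 * sees the queued wait entry, or the waiter sees the queued data
	 * when it re-checks the mask after poll_wait(). */
	smp_mb();
	return wq && waitqueue_active(&wq->wait);
}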


Write IO event wakeup

When an skb is freed, the write-buffer memory it occupied is released, and the write IO event handler is invoked via the skb destructor.

kfree_skb-->__kfree_skb-->skb_release_head_state-->skb->destructor

skb_set_owner_w sets skb->destructor = sock_wfree:

static inline void skb_set_owner_w(struct sk_buff *skb, struct sock *sk)
{
	skb_orphan(skb);
	skb->sk = sk;
	skb->destructor = sock_wfree;
	/*
	 * We used to take a refcount on sk, but following operation
	 * is enough to guarantee sk_free() wont free this sock until
	 * all in-flight packets are completed
	 */
	atomic_add(skb->truesize, &sk->sk_wmem_alloc);
}

sock_wfree calls sk->sk_write_space, i.e. sock_def_write_space, which wakes the processes waiting on the sk_wq queue. Note the symmetry: skb_set_owner_w added skb->truesize to sk_wmem_alloc, and sock_wfree subtracts it again, which is what re-opens write space.

/*
 * Write buffer destructor automatically called from kfree_skb.
 */
void sock_wfree(struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	unsigned int len = skb->truesize;

	if (!sock_flag(sk, SOCK_USE_WRITE_QUEUE)) {
		/*
		 * Keep a reference on sk_wmem_alloc, this will be released
		 * after sk_write_space() call
		 */
		atomic_sub(len - 1, &sk->sk_wmem_alloc);
		sk->sk_write_space(sk);	/* -> sock_def_write_space */
		len = 1;
	}
	...
}



static void sock_def_write_space(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();

	/* Do not wake up a writer until he can make "significant"
	 * progress.  --DaveM
	 */
	/* i.e. wake the sk_wq queue only once no more than half of the
	 * send buffer is still allocated */
	if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) {
		wq = rcu_dereference(sk->sk_wq);
		if (wq_has_sleeper(wq))
			wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
						POLLWRNORM | POLLWRBAND);

		/* Should agree with poll, otherwise some programs break */
		if (sock_writeable(sk))
			sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
	}

	rcu_read_unlock();
}
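
As a worked example: with a common default sk_sndbuf of 212992 bytes, sock_def_write_space only wakes the queue once the total truesize of in-flight skbs has dropped to 106496 bytes or less. sock_writeable() applies the nearly identical test atomic_read(&sk->sk_wmem_alloc) < (sk->sk_sndbuf >> 1), which is why the comment says it should agree with poll.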

Who calls skb_set_owner_w to set skb->destructor = sock_wfree?

/*
 * Allocate a skb from the socket's send buffer.
 */
struct sk_buff *sock_wmalloc(struct sock *sk, unsigned long size, int force,
			     gfp_t priority)
{
	if (force || atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
		struct sk_buff *skb = alloc_skb(size, priority);
		if (skb) {
			skb_set_owner_w(skb, sk);
			return skb;
		}
	}
	return NULL;
}
EXPORT_SYMBOL(sock_wmalloc);
udp_sendmsg --> ip_append_data --> __ip_append_data --> sock_wmalloc
raw_sendmsg --> ip_append_data --> __ip_append_data --> sock_wmalloc
ping_v4_sendmsg --> ip_append_data --> __ip_append_data --> sock_wmalloc
ip_send_unicast_reply --> ip_append_data --> __ip_append_data --> sock_wmalloc
udp_sendmsg --> ip_make_skb --> sock_wmalloc
udp_sendmsg --> ip_make_skb --> __ip_append_data --> sock_alloc_send_skb --> sock_alloc_send_pskb --> skb_set_owner_w
ip_fragment --> skb_set_owner_w
ip_finish_output2 --> skb_set_owner_w
skb_cow_data --> skb_set_owner_w
