linux socket poll io处理-udp
Posted osnet
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了linux socket poll io处理-udp相关的知识,希望对你有一定的参考价值。
文章目录
linux socket中定义了多种io事件,io事件发生时会调用它们对应的处理函数。
struct sock
//sock wait queue and async head
struct socket_wq __rcu *sk_wq; // socket等待事件队列,用于io事件异步通知
atomic_t sk_wmem_alloc;
void (*sk_state_change)(struct sock *sk);//callback to indicate change in the state of the sock,socket状态发生变化
void (*sk_data_ready)(struct sock *sk, int bytes);//callback to indicate there is data to be processed,socket缓冲区有数据可读时继续回调
void (*sk_write_space)(struct sock *sk);//callback to indicate there is bf sending space available,socket缓冲区有足够空间发送数据时进行回调
void (*sk_error_report)(struct sock *sk); //callback to indicate errors, 发生socket 错误时
;
创建sock时会初始化io处理函数。
void sock_init_data(struct socket *sock, struct sock *sk)
skb_queue_head_init(&sk->sk_receive_queue);
skb_queue_head_init(&sk->sk_write_queue);
skb_queue_head_init(&sk->sk_error_queue);
#ifdef CONFIG_NET_DMA
skb_queue_head_init(&sk->sk_async_wait_queue);
#endif
sk->sk_send_head = NULL;
init_timer(&sk->sk_timer);
sk->sk_allocation = GFP_KERNEL;
sk->sk_rcvbuf = sysctl_rmem_default;
sk->sk_sndbuf = sysctl_wmem_default;
sk->sk_state = TCP_CLOSE;
//初始化io事件回调处理函数
sk->sk_state_change = sock_def_wakeup;
sk->sk_data_ready = sock_def_readable;
sk->sk_write_space = sock_def_write_space;
sk->sk_error_report = sock_def_error_report;
sk->sk_destruct = sock_def_destruct;
udp_poll
先看poll等待事件的设置,do_select函数中回调
do_select:
mask = (*f_op->poll)(f.file, wait);
const struct proto_ops inet_dgram_ops =
....
.poll = udp_poll,
....
;
/*
 * udp_poll() (simplified excerpt): UDP's ->poll handler delegates to the
 * generic datagram_poll().  (The full kernel version additionally scans
 * the receive queue for checksum-broken packets before returning;
 * that part is omitted here.)
 */
unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask = datagram_poll(file, sock, wait);
	struct sock *sk = sock->sk;	/* used by the omitted checksum check */

	/* ... */
	return mask;
}
/*
 * datagram_poll(): generic poll for datagram sockets.  Registers the
 * caller on the socket's wait queue, then reports the currently
 * pending I/O events as a POLL* mask.
 */
unsigned int datagram_poll(struct file *file, struct socket *sock,
			   poll_table *wait)
{
	struct sock *sk = sock->sk;
	unsigned int mask;

	/* enqueue "wait" on the socket's wait queue sk->sk_wq */
	sock_poll_wait(file, sk_sleep(sk), wait);
	mask = 0;

	/* exceptional events? */
	if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
		mask |= POLLERR |
			(sock_flag(sk, SOCK_SELECT_ERR_QUEUE) ? POLLPRI : 0);

	if (sk->sk_shutdown & RCV_SHUTDOWN)
		mask |= POLLRDHUP | POLLIN | POLLRDNORM;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		mask |= POLLHUP;

	/* readable?  The receive queue is non-empty. */
	if (!skb_queue_empty(&sk->sk_receive_queue))
		mask |= POLLIN | POLLRDNORM;

	/* Connection-based need to check for termination and startup */
	if (connection_based(sk)) {
		if (sk->sk_state == TCP_CLOSE)
			mask |= POLLHUP;
		/* connection hasn't started yet? */
		if (sk->sk_state == TCP_SYN_SENT)
			return mask;
	}

	/* writable? */
	if (sock_writeable(sk))
		mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
	else
		set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

	return mask;
}
把wait插入等待队列sk->sk_wq
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
return &rcu_dereference_raw(sk->sk_wq)->wait;
/*
 * sock_poll_wait(): register the poller on the socket's wait queue.
 * p was set up in do_select(); wait_address is sk->sk_wq->wait.
 */
static inline void sock_poll_wait(struct file *filp,
				  wait_queue_head_t *wait_address, poll_table *p)
{
	if (!poll_does_not_wait(p) && wait_address) {
		poll_wait(filp, wait_address, p);
		/* We need to be sure we are in sync with the
		 * socket flags modification.
		 *
		 * This memory barrier is paired in the wq_has_sleeper.
		 */
		smp_mb();
	}
}
/* Invoke the queueing callback (_qproc) installed by do_select(). */
static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);	/* __pollwait, set up in do_select() */
}
do_select中:
void poll_initwait(struct poll_wqueues *pwq)
init_poll_funcptr(&pwq->pt, __pollwait);//__pollwait为_qproc
pwq->polling_task = current; //设置当前进程
pwq->triggered = 0;
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
		       poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	/* pollwake will run when the socket wakes this queue entry */
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	/* link the entry onto the wait queue (e.g. sk->sk_wq->wait) */
	add_wait_queue(wait_address, &entry->wait);
}
static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
struct poll_table_page *table = p->table;
if (p->inline_index < N_INLINE_POLL_ENTRIES)
return p->inline_entries + p->inline_index++;
if (!table || POLL_TABLE_FULL(table))
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
if (!new_table)
p->error = -ENOMEM;
return NULL;
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
return table->entry++;
读IO事件唤醒
上面当前进程挂载到sk_wq等待队列,然后让出cpu,当网卡驱动收到数据,协议栈处理完成,会唤醒poll等待进程。
static const struct net_protocol udp_protocol =
.handler = udp_rcv,
.err_handler = udp_err,
.no_policy = 1,
.netns_ok = 1,
;
协议栈处理完成,会调用udp_rcv函数,把接收数据挂到接收队列。
int udp_rcv(struct sk_buff *skb)
return __udp4_lib_rcv(skb, &udp_table, IPPROTO_UDP);
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
int proto)
struct sock *sk;
struct udphdr *uh;
unsigned short ulen;
struct rtable *rt = skb_rtable(skb);
__be32 saddr, daddr;
struct net *net = dev_net(skb->dev);
uh = udp_hdr(skb);
ulen = ntohs(uh->len);
saddr = ip_hdr(skb)->saddr;
daddr = ip_hdr(skb)->daddr;
//根据目的ip地址,端口号找到对应socket
sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);
if (sk != NULL)
int ret = udp_queue_rcv_skb(sk, skb);
return 0;
udp_queue_rcv_skb --> __udp_queue_rcv_skb --> sock_queue_rcv_skb
int sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
int err;
int skb_len;
unsigned long flags;
struct sk_buff_head *list = &sk->sk_receive_queue; //接收队列
if (!sk_rmem_schedule(sk, skb, skb->truesize))
atomic_inc(&sk->sk_drops);
return -ENOBUFS;
//插入接收队列
__skb_queue_tail(list, skb);
if (!sock_flag(sk, SOCK_DEAD))
sk->sk_data_ready(sk, skb_len); //唤醒等待进程
return 0;
sk->sk_data_ready = sock_def_readable;
static void sock_def_readable(struct sock *sk, int len)
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
//唤醒wq->wait队列中的进程
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
写IO事件唤醒
skb在释放后,使用的写buf被释放,会回调写io事件处理函数。
kfree_skb-->__kfree_skb-->skb_release_head_state-->skb->destructor
skb_set_owner_w 设置skb->destructor = sock_wfree
static inline void skb_set_owner_w(struct sk_buff *skb, struct sock *sk)
skb_orphan(skb);
skb->sk = sk;
skb->destructor = sock_wfree;
/*
* We used to take a refcount on sk, but following operation
* is enough to guarantee sk_free() wont free this sock until
* all in-flight packets are completed
*/
atomic_add(skb->truesize, &sk->sk_wmem_alloc);
sock_wfree中调用sk->sk_write_space,进而调用sock_def_write_space,唤醒sk_wq队列上等待进程。
/*
* Write buffer destructor automatically called from kfree_skb.
*/
void sock_wfree(struct sk_buff *skb)
struct sock *sk = skb->sk;
unsigned int len = skb->truesize;
if (!sock_flag(sk, SOCK_USE_WRITE_QUEUE))
/*
* Keep a reference on sk_wmem_alloc, this will be released
* after sk_write_space() call
*/
atomic_sub(len - 1, &sk->sk_wmem_alloc);
sk->sk_write_space(sk);
len = 1;
static void sock_def_write_space(struct sock *sk)
struct socket_wq *wq;
rcu_read_lock();
/* Do not wake up a writer until he can make "significant"
* progress. --DaveM
*/
//唤醒sk_wq队列上进程
if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf)
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
POLLWRNORM | POLLWRBAND);
/* Should agree with poll, otherwise some programs break */
if (sock_writeable(sk))
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
rcu_read_unlock();
哪里调用skb_set_owner_w 设置skb->destructor = sock_wfree;
/*
 * Allocate a skb from the socket's send buffer.
 */
struct sk_buff *sock_wmalloc(struct sock *sk, unsigned long size, int force,
			     gfp_t priority)
{
	/* allow the allocation if forced or if within the send-buffer limit */
	if (force || atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
		struct sk_buff *skb = alloc_skb(size, priority);
		if (skb) {
			skb_set_owner_w(skb, sk);	/* destructor = sock_wfree */
			return skb;
		}
	}
	return NULL;
}
EXPORT_SYMBOL(sock_wmalloc);
udp_sendmsg --> ip_append_data --> __ip_append_data --> sock_wmalloc
raw_sendmsg --> ip_append_data --> __ip_append_data --> sock_wmalloc
ping_v4_sendmsg --> ip_append_data --> __ip_append_data --> sock_wmalloc
ip_send_unicast_reply --> ip_append_data --> __ip_append_data --> sock_wmalloc
udp_sendmsg --> ip_make_skb --> sock_wmalloc
udp_sendmsg --> ip_make_skb --> __ip_append_data --> sock_alloc_send_skb --> sock_alloc_send_pskb --> skb_set_owner_w
ip_fragment--》skb_set_owner_w
ip_finish_output2--》skb_set_owner_w
skb_cow_data--》skb_set_owner_w
以上是关于linux socket poll io处理-udp的主要内容,如果未能解决你的问题,请参考以下文章
(OK) Linux epoll模型—socket epoll server client chat
socket.io 不适用于传输:['xhr-polling']