一个发包乱序问题记录
Posted 安庆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个发包乱序问题记录相关的知识,希望对你有一定的参考价值。
在用户线程绑定某个核的情况下,从某个线程发送的udp报文,偶尔出现了乱序。我们来分析下发包流程:
0xffffffff81593b30 : dev_hard_start_xmit+0x0/0x1a0 [kernel]------------进入driver层 0xffffffff81596b08 : __dev_queue_xmit+0x448/0x550 [kernel]--------------其实少了一个qdisc层的堆栈 0xffffffff81596c20 : dev_queue_xmit+0x10/0x20 [kernel]-----------------dev层 0xffffffff815a284d : neigh_resolve_output+0x11d/0x220 [kernel] 0xffffffff815dd65c : ip_finish_output+0x2ac/0x7a0 [kernel] 0xffffffff815dde53 : ip_output+0x73/0xe0 [kernel] 0xffffffff815dba87 : ip_local_out_sk+0x37/0x40 [kernel] 0xffffffff815dbdf3 : ip_queue_xmit+0x143/0x3a0 [kernel]----------------ip层 0xffffffff815f60fc : tcp_transmit_skb+0x52c/0xa20 [kernel] 0xffffffff815f687c : tcp_write_xmit+0x28c/0xcf0 [kernel] 0xffffffff815f755e : __tcp_push_pending_frames+0x2e/0xc0 [kernel] 0xffffffff815e54ac : tcp_push+0xec/0x120 [kernel] 0xffffffff815e8e70 : tcp_sendmsg+0xd0/0xc30 [kernel]---------------------tcp层 0xffffffff816153d9 : inet_sendmsg+0x69/0xb0 [kernel]---------------------inet层 0xffffffff815765ad : sock_aio_write+0x15d/0x180 [kernel] 0xffffffff81208093 : do_sync_write+0x93/0xe0 [kernel]---------------------vfs层 0xffffffff81208c75 : vfs_write+0x1c5/0x1f0 [kernel] 0xffffffff8120998f : sys_write+0x7f/0xe0 [kernel] 0xffffffff816c5715 : system_call_fastpath+0x1c/0x21 [kernel]
没有故意去抓udp的堆栈,除了tcp层那部分不太一样,其他都应该一样,不影响我们分析。
可以看到,这个是sys态直接发送的案例,后面其实就是
也就是,整个发送过程都体现出来了。而且这个流程是在sys态完成的,不是在软中断中,软中断的话,需要从net_tx_action 中看起。
但是,如果对整个流程非常了解的人,可以看到堆栈中缺少一部分,那就是
从qdisc角度来看,虽然它的作用在于流量控制,但是也可以看做是dev层到驱动层的一个缓存层,从前面堆栈看,如果一个socket 固定从某个
cpu上发送,不会出现乱序,到了网卡驱动层,更不会乱序,那么到了qdisc层会怎么样?
下面详细分析下qdisc的可能乱序行为:
int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv) { 。。。 txq = netdev_pick_tx(dev, skb, accel_priv); q = rcu_dereference_bh(txq->qdisc); 。。。 if (q->enqueue) {------------enqueue不为空,流控 rc = __dev_xmit_skb(skb, q, dev, txq); goto out; } 。。。 }
拿到skb之后,怎么选择哪个queue发送呢?
//队列选择函数,如果不是多队列,ops->ndo_select_queue非空的话,则调用ops->ndo_select_queue,否则__netdev_pick_tx struct netdev_queue *netdev_pick_tx(struct net_device *dev, struct sk_buff *skb, void *accel_priv) { int queue_index = 0; #ifdef CONFIG_XPS u32 sender_cpu = skb->sender_cpu - 1; if (sender_cpu >= (u32)NR_CPUS) skb->sender_cpu = raw_smp_processor_id() + 1;-------------开启了xps的话,会设置sender_cpu的值为当前cpu号+1 #endif if (dev->real_num_tx_queues != 1) {---------------------------多队列的设备 const struct net_device_ops *ops = dev->netdev_ops; if (ops->ndo_select_queue)-----------i40e这个地方是NULL queue_index = ops->ndo_select_queue(dev, skb, accel_priv, __netdev_pick_tx); else queue_index = __netdev_pick_tx(dev, skb);-------------所以i40e走这个流程 if (!accel_priv) queue_index = netdev_cap_txqueue(dev, queue_index); } skb_set_queue_mapping(skb, queue_index);------------------保存对应的queue_index到skb->queue_mapping,发送的时候,会取这个来获取对应的queue return netdev_get_tx_queue(dev, queue_index); }
既然i40e走的是默认的选queue_index的流程,那么就需要看一下这个函数:
static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb) { struct sock *sk = skb->sk; int queue_index = sk_tx_queue_get(sk);------之前保存的queue_index if (queue_index < 0 || skb->ooo_okay || queue_index >= dev->real_num_tx_queues) { int new_index = get_xps_queue(dev, skb);------开启了xps,优先选择 if (new_index < 0)--------没选到 new_index = skb_tx_hash(dev, skb);--------则直接hash if (queue_index != new_index && sk && rcu_access_pointer(sk->sk_dst_cache)) sk_tx_queue_set(sk, new_index); queue_index = new_index; } return queue_index; }
在开启了xps的情况下,get_xps_queue 会根据xps_map来选择队列,如果xps只绑定了一个cpu,则用那个对应的queue-index,否则根据skb来hash选择:
//开启xps,根据sender_cpu来选择map static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb) { #ifdef CONFIG_XPS struct xps_dev_maps *dev_maps; struct xps_map *map; int queue_index = -1; rcu_read_lock(); dev_maps = rcu_dereference(dev->xps_maps); if (dev_maps) { map = rcu_dereference( dev_maps->cpu_map[skb->sender_cpu - 1]);//由于之前设置了sendercpu,所以这里取该cpu,找到对应的map if (map) { if (map->len == 1) queue_index = map->queues[0];//如果cpu只关联当前net_device的一个队列,当然直接选择 else queue_index = map->queues[reciprocal_scale(skb_get_hash(skb), map->len)];//当前cpu关联了多个队列,则做下hash选择 if (unlikely(queue_index >= dev->real_num_tx_queues)) queue_index = -1; } } rcu_read_unlock(); return queue_index; #else return -1; #endif }
我们目前设置的xps_map如下:
cat /sys/class/net/eth0/queues/tx-0/xps_cpus 00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001
也就是一个队列,只对应一个cpu。这个展示的结果,是从队列的角度来展示对应的cpu,但是内核实现的时候,
实际上代码内使用了反向映射,通过xps_dev_maps存放到cpu到tx队列集合的映射:参照:dev_maps->cpu_map 函数的 实现过程,
按照我们目前这样的这样的话,一个队列对应的是一个核,一个核针对某个网卡,也只是某一个队列,按道理也不会乱序啊。因为txq因为queue-index唯一确定,
而qdisc 又是由queue->qdisc唯一确定的。
选择好了queue,那么来看对应的qdisc实现:
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q, struct net_device *dev, struct netdev_queue *txq) { spinlock_t *root_lock = qdisc_lock(q); bool contended; int rc; qdisc_pkt_len_init(skb); qdisc_calculate_pkt_len(skb, q); /* * Heuristic to force contended enqueues to serialize on a * separate lock before trying to get qdisc main lock. * This permits __QDISC_STATE_RUNNING owner to get the lock more often * and dequeue packets faster. */ contended = qdisc_is_running(q); if (unlikely(contended)) spin_lock(&q->busylock); spin_lock(root_lock); ........//只保留一种条件,其他忽略 rc = q->enqueue(skb, q) & NET_XMIT_MASK;//入队qdisc,对于fq,其实就是将当前的skb按顺序加到flow的尾部 if (qdisc_run_begin(q)) { if (unlikely(contended)) { spin_unlock(&q->busylock); contended = false; } __qdisc_run(q);//取包调dequeue,也是按顺序的, } } spin_unlock(root_lock); if (unlikely(contended)) spin_unlock(&q->busylock); return rc; }
好的,正式进入了qdisc层,来看一下q->enqueue的实现:
当然如果把qdisc看做一个对象的话,它的背后还有一堆class和filter,由于我们环境使用的是:
[[email protected] ~]# tc qdisc show dev eth0 qdisc mq 0: root qdisc fq 0: parent :1 limit 10000p flow_limit 100p buckets 1024 quantum 3028 initial_quantum 15140
针对的是fq的实现:
static struct Qdisc_ops fq_qdisc_ops __read_mostly = { .id = "fq", .priv_size = sizeof(struct fq_sched_data), .enqueue = fq_enqueue,
看看实现:
static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch) { struct fq_sched_data *q = qdisc_priv(sch); struct fq_flow *f; if (unlikely(sch->q.qlen >= sch->limit))//fq管理的flow个数超过阈值 return qdisc_drop(skb, sch);------------------------------丢包,但不会乱序 f = fq_classify(skb, q);-------------根据skb的sk来获取对应的flow,如果没有,则申请一个。 if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) { q->stat_flows_plimit++; return qdisc_drop(skb, sch); } f->qlen++; if (skb_is_retransmit(skb)) q->stat_tcp_retrans++; qdisc_qstats_backlog_inc(sch, skb); if (fq_flow_is_detached(f)) { fq_flow_add_tail(&q->new_flows, f);----------------------------------加到skb加到对应的flow中去,关键函数 if (time_after(jiffies, f->age + q->flow_refill_delay)) f->credit = max_t(u32, f->credit, q->quantum); q->inactive_flows--; } /* Note: this overwrites f->age */ flow_queue_add(f, skb); if (unlikely(f == &q->internal)) { q->stat_internal_packets++; } sch->q.qlen++; return NET_XMIT_SUCCESS; }
我们看一下加skb到flow中去的过程:
static void flow_queue_add(struct fq_flow *flow, struct sk_buff *skb) { struct sk_buff *prev, *head = flow->head; skb->next = NULL; if (!head) { flow->head = skb; flow->tail = skb; return; } if (likely(!skb_is_retransmit(skb))) { flow->tail->next = skb;--------------------skb加到链表尾,所以绝对不会乱序 flow->tail = skb; return; }
不管怎么样,skb是加入到了对应的flow中了,就等着dequeue的时候发送了,间接地保证了时序。
同理也可以分析 :
static bool i40e_clean_tx_irq(struct i40e_vsi *vsi, struct i40e_ring *tx_ring, int napi_budget)//发送完之后资源回收 { 。。。。 if (__netif_subqueue_stopped(tx_ring->netdev, tx_ring->queue_index) && !test_bit(__I40E_DOWN, &vsi->state)) { netif_wake_subqueue(tx_ring->netdev, tx_ring->queue_index); ++tx_ring->tx_stats.restart_queue; } 。。。。。 }
具体查看 netif_wake_subqueue 的实现,
static inline void __netif_reschedule(struct Qdisc *q) { struct softnet_data *sd; unsigned long flags; local_irq_save(flags); sd = this_cpu_ptr(&softnet_data); q->next_sched = NULL; *sd->output_queue_tailp = q; sd->output_queue_tailp = &q->next_sched;//output_queue_tailp只会在中断中操作,net_tx_action会获取sd->output_queue,然后调用qdisc_run发包。
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}
触发软中断,而根据中断绑核的设置,该中断号绑定的cpu并不一定和xps的queue映射的cpu是同一个,这样的话,就存在两个cpu发送一个流的情况。一个是在sys态,调用qdisc_run发包,
一个是在软中断处理时,调用qdisc_run发包,这两个cpu可能不是同一个。
由于发送的时候,还是要调用qdisc的spin_lock,所以虽然cpu不是同一个,但还是依靠自旋锁控制了并发dequeue一个skb的情况。那就是不可能两个cpu都dequeue到同一个skb的情况。
中断里面处理流程:
root_lock = qdisc_lock(q); if (spin_trylock(root_lock)) {-------------获取qdisc的自旋锁 smp_mb__before_clear_bit(); clear_bit(__QDISC_STATE_SCHED, &q->state);//清理__QDISC_STATE_SCHED状态, qdisc_run(q);-------------------------这个里面还是会判断qdisc的state,如果是running状态,则直接返回,不会调用__qdisc_run spin_unlock(root_lock);
殊途同归,进入__qdisc_run:
void __qdisc_run(struct Qdisc *q)//进入该函数,此时应持有qdisc_lock { int quota = weight_p; while (qdisc_restart(q)) { /* * Ordered by possible occurrence: Postpone processing if * 1. we‘ve exceeded packet quota * 2. another process needs the CPU; */ if (--quota <= 0 || need_resched()) {//配额用完了,__netif_schedule会触发软中断 __netif_schedule(q); break; } } qdisc_run_end(q); }
所以按道理也能保证发送时序,但是由于我劫持了响应的网卡发包驱动,自己再做了一次缓存,而这些缓存的管理,是percpu的,所以qdisc持有锁发送的时候,最终到了两个不同的percpu的缓存,然后就存在发包乱序的可能了,毕竟两个cpu再也看不到对方的存在了,也不会按顺序从qdisc中取skb了,他们只管自己percpu缓存中的skb,所以会分别尝试获取:
也可以减小因为发送完成中断造成的cache miss。
因此xps_cpus的配置最好结合/proc/irq//smp_affinity, 映射最好在同一个cpu或者同一个numa node的cpu上。
当然还有一种情况是cpu的热插拔,这个不在本文讨论范围之内。
附:
一个net_device在初始化的时候:
void dev_init_scheduler(struct net_device *dev) { dev->qdisc = &noop_qdisc; 。。。。 }
但是当网卡up的时候,调用dev_activate,会重新设置,如果是多队列网卡,则设置为 mq_qdisc_ops:
void dev_activate(struct net_device *dev) { int need_watchdog; /* No queueing discipline is attached to device; * create default one for devices, which need queueing * and noqueue_qdisc for virtual interfaces */ if (dev->qdisc == &noop_qdisc) attach_default_qdiscs(dev); 。。。。。 } static void attach_default_qdiscs(struct net_device *dev) { struct netdev_queue *txq; struct Qdisc *qdisc; txq = netdev_get_tx_queue(dev, 0); if (!netif_is_multiqueue(dev) || dev->priv_flags & IFF_NO_QUEUE) { netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL); dev->qdisc = txq->qdisc_sleeping; atomic_inc(&dev->qdisc->refcnt); } else { qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);-------------默认使用多队列策略设置root的策略,即mq_qdisc_ops if (qdisc) { dev->qdisc = qdisc; qdisc->ops->attach(qdisc); } } }
以上是关于一个发包乱序问题记录的主要内容,如果未能解决你的问题,请参考以下文章