一个发包乱序问题记录

Posted 安庆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个发包乱序问题记录相关的知识,希望对你有一定的参考价值。

在用户线程绑定某个核的情况下,从某个线程发送的udp报文,偶尔出现了乱序。我们来分析下发包流程:

 0xffffffff81593b30 : dev_hard_start_xmit+0x0/0x1a0 [kernel]------------进入driver层
 0xffffffff81596b08 : __dev_queue_xmit+0x448/0x550 [kernel]--------------其实少了一个qdisc层的堆栈
 0xffffffff81596c20 : dev_queue_xmit+0x10/0x20 [kernel]-----------------dev层
 0xffffffff815a284d : neigh_resolve_output+0x11d/0x220 [kernel]
 0xffffffff815dd65c : ip_finish_output+0x2ac/0x7a0 [kernel]
 0xffffffff815dde53 : ip_output+0x73/0xe0 [kernel]
 0xffffffff815dba87 : ip_local_out_sk+0x37/0x40 [kernel]
 0xffffffff815dbdf3 : ip_queue_xmit+0x143/0x3a0 [kernel]----------------ip层
 0xffffffff815f60fc : tcp_transmit_skb+0x52c/0xa20 [kernel]
 0xffffffff815f687c : tcp_write_xmit+0x28c/0xcf0 [kernel]
 0xffffffff815f755e : __tcp_push_pending_frames+0x2e/0xc0 [kernel]
 0xffffffff815e54ac : tcp_push+0xec/0x120 [kernel]
 0xffffffff815e8e70 : tcp_sendmsg+0xd0/0xc30 [kernel]---------------------tcp层
 0xffffffff816153d9 : inet_sendmsg+0x69/0xb0 [kernel]---------------------inet层
 0xffffffff815765ad : sock_aio_write+0x15d/0x180 [kernel]
 0xffffffff81208093 : do_sync_write+0x93/0xe0 [kernel]---------------------vfs层
 0xffffffff81208c75 : vfs_write+0x1c5/0x1f0 [kernel]
 0xffffffff8120998f : sys_write+0x7f/0xe0 [kernel]
 0xffffffff816c5715 : system_call_fastpath+0x1c/0x21 [kernel]

没有故意去抓udp的堆栈,除了tcp层那部分不太一样,其他都应该一样,不影响我们分析。

可以看到,这个是sys态直接发送的案例,后面其实就是

dev_hard_start_xmit--》xmit_one--》
netdev_start_xmit--》
__netdev_start_xmit--》
ops->ndo_start_xmit,这个对于bond,这个是
bond_start_xmit,最终还是会走到实体设备,如i40e的驱动,是i40e_lan_xmit_frame ,而ixegb的驱动则是:ixgbe_xmit_frame

也就是,整个发送过程都体现出来了。而且这个流程是在sys态完成的,不是在软中断中,软中断的话,需要从net_tx_action 中看起。

但是,如果对整个流程非常了解的人,可以看到堆栈中缺少一部分,那就是

__qdisc_run--》qdisc_restart--》
sch_direct_xmit--》
dev_hard_start_xmit这部分并没有在此体现。

从qdisc角度来看,虽然它的作用在于流量控制,但是也可以看做是dev层到驱动层的一个缓存层,从前面堆栈看,如果一个socket 固定从某个

cpu上发送,不会出现乱序,到了网卡驱动层,更不会乱序,那么到了qdisc层会怎么样?

下面详细分析下qdisc的可能乱序行为:

int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{
。。。
    txq = netdev_pick_tx(dev, skb, accel_priv);
    q = rcu_dereference_bh(txq->qdisc);
。。。
    if (q->enqueue) {------------enqueue不为空,流控
        rc = __dev_xmit_skb(skb, q, dev, txq);
        goto out;
    }
。。。
}

拿到skb之后,怎么选择哪个queue发送呢?

//队列选择函数,如果不是多队列,ops->ndo_select_queue非空的话,则调用ops->ndo_select_queue,否则__netdev_pick_tx
struct netdev_queue *netdev_pick_tx(struct net_device *dev,
                    struct sk_buff *skb,
                    void *accel_priv)
{
    int queue_index = 0;

#ifdef CONFIG_XPS
    u32 sender_cpu = skb->sender_cpu - 1;

    if (sender_cpu >= (u32)NR_CPUS)
        skb->sender_cpu = raw_smp_processor_id() + 1;-------------开启了xps的话,会设置sender_cpu的值为当前cpu号+1
#endif

    if (dev->real_num_tx_queues != 1) {---------------------------多队列的设备
        const struct net_device_ops *ops = dev->netdev_ops;
        if (ops->ndo_select_queue)-----------i40e这个地方是NULL
            queue_index = ops->ndo_select_queue(dev, skb, accel_priv,
                                __netdev_pick_tx);
        else
            queue_index = __netdev_pick_tx(dev, skb);-------------所以i40e走这个流程

        if (!accel_priv)
            queue_index = netdev_cap_txqueue(dev, queue_index);
    }

    skb_set_queue_mapping(skb, queue_index);------------------保存对应的queue_index到skb->queue_mapping,发送的时候,会取这个来获取对应的queue
    return netdev_get_tx_queue(dev, queue_index);
}

既然i40e走的是默认的选queue_index的流程,那么就需要看一下这个函数:

static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{
    struct sock *sk = skb->sk;
    int queue_index = sk_tx_queue_get(sk);------之前保存的queue_index

    if (queue_index < 0 || skb->ooo_okay ||
        queue_index >= dev->real_num_tx_queues) {
        int new_index = get_xps_queue(dev, skb);------开启了xps,优先选择
        if (new_index < 0)--------没选到
            new_index = skb_tx_hash(dev, skb);--------则直接hash

        if (queue_index != new_index && sk &&
            rcu_access_pointer(sk->sk_dst_cache))
            sk_tx_queue_set(sk, new_index);

        queue_index = new_index;
    }

    return queue_index;
}

 在开启了xps的情况下,get_xps_queue 会根据xps_map来选择队列,如果xps只绑定了一个cpu,则用那个对应的queue-index,否则根据skb来hash选择:

//开启xps,根据sender_cpu来选择map
static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb)
{
#ifdef CONFIG_XPS
    struct xps_dev_maps *dev_maps;
    struct xps_map *map;
    int queue_index = -1;

    rcu_read_lock();
    dev_maps = rcu_dereference(dev->xps_maps);
    if (dev_maps) {
        map = rcu_dereference(
            dev_maps->cpu_map[skb->sender_cpu - 1]);//由于之前设置了sendercpu,所以这里取该cpu,找到对应的map
        if (map) {
            if (map->len == 1)
                queue_index = map->queues[0];//如果cpu只关联当前net_device的一个队列,当然直接选择
            else
                queue_index = map->queues[reciprocal_scale(skb_get_hash(skb),
                                       map->len)];//当前cpu关联了多个队列,则做下hash选择
            if (unlikely(queue_index >= dev->real_num_tx_queues))
                queue_index = -1;
        }
    }
    rcu_read_unlock();

    return queue_index;
#else
    return -1;
#endif
}

 我们目前设置的xps_map如下:

cat /sys/class/net/eth0/queues/tx-0/xps_cpus
00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001

也就是一个队列,只对应一个cpu。这个展示的结果,是从队列的角度来展示对应的cpu,但是内核实现的时候,
实际上代码内使用了反向映射,通过xps_dev_maps存放到cpu到tx队列集合的映射:参照:dev_maps->cpu_map 函数的 实现过程,

按照我们目前这样的这样的话,一个队列对应的是一个核,一个核针对某个网卡,也只是某一个队列,按道理也不会乱序啊。因为txq因为queue-index唯一确定,

而qdisc 又是由queue->qdisc唯一确定的。

选择好了queue,那么来看对应的qdisc实现:

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                 struct net_device *dev,
                 struct netdev_queue *txq)
{
    spinlock_t *root_lock = qdisc_lock(q);
    bool contended;
    int rc;

    qdisc_pkt_len_init(skb);
    qdisc_calculate_pkt_len(skb, q);
    /*
     * Heuristic to force contended enqueues to serialize on a
     * separate lock before trying to get qdisc main lock.
     * This permits __QDISC_STATE_RUNNING owner to get the lock more often
     * and dequeue packets faster.
     */
    contended = qdisc_is_running(q);
    if (unlikely(contended))
        spin_lock(&q->busylock);

    spin_lock(root_lock);
    ........//只保留一种条件,其他忽略
        rc = q->enqueue(skb, q) & NET_XMIT_MASK;//入队qdisc,对于fq,其实就是将当前的skb按顺序加到flow的尾部
        if (qdisc_run_begin(q)) {
            if (unlikely(contended)) {
                spin_unlock(&q->busylock);
                contended = false;
            }
            __qdisc_run(q);//取包调dequeue,也是按顺序的,
        }
    }
    spin_unlock(root_lock);
    if (unlikely(contended))
        spin_unlock(&q->busylock);
    return rc;
}

好的,正式进入了qdisc层,来看一下q->enqueue的实现:

当然如果把qdisc看做一个对象的话,它的背后还有一堆class和filter,由于我们环境使用的是:

[[email protected] ~]# tc qdisc show dev eth0
qdisc mq 0: root
qdisc fq 0: parent :1 limit 10000p flow_limit 100p buckets 1024 quantum 3028 initial_quantum 15140

针对的是fq的实现:

static struct Qdisc_ops fq_qdisc_ops __read_mostly = {
    .id        =    "fq",
    .priv_size    =    sizeof(struct fq_sched_data),

    .enqueue    =    fq_enqueue,

看看实现:

static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
    struct fq_sched_data *q = qdisc_priv(sch);
    struct fq_flow *f;

    if (unlikely(sch->q.qlen >= sch->limit))//fq管理的flow个数超过阈值
        return qdisc_drop(skb, sch);------------------------------丢包,但不会乱序

    f = fq_classify(skb, q);-------------根据skb的sk来获取对应的flow,如果没有,则申请一个。
    if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) {
        q->stat_flows_plimit++;
        return qdisc_drop(skb, sch);
    }

    f->qlen++;
    if (skb_is_retransmit(skb))
        q->stat_tcp_retrans++;
    qdisc_qstats_backlog_inc(sch, skb);
    if (fq_flow_is_detached(f)) {
        fq_flow_add_tail(&q->new_flows, f);----------------------------------加到skb加到对应的flow中去,关键函数
        if (time_after(jiffies, f->age + q->flow_refill_delay))
            f->credit = max_t(u32, f->credit, q->quantum);
        q->inactive_flows--;
    }

    /* Note: this overwrites f->age */
    flow_queue_add(f, skb);

    if (unlikely(f == &q->internal)) {
        q->stat_internal_packets++;
    }
    sch->q.qlen++;

    return NET_XMIT_SUCCESS;
}

我们看一下加skb到flow中去的过程:

static void flow_queue_add(struct fq_flow *flow, struct sk_buff *skb)
{
    struct sk_buff *prev, *head = flow->head;

    skb->next = NULL;
    if (!head) {
        flow->head = skb;
        flow->tail = skb;
        return;
    }
    if (likely(!skb_is_retransmit(skb))) {
        flow->tail->next = skb;--------------------skb加到链表尾,所以绝对不会乱序
        flow->tail = skb;
        return;
    }

不管怎么样,skb是加入到了对应的flow中了,就等着dequeue的时候发送了,间接地保证了时序。

同理也可以分析 :

static struct sk_buff *fq_dequeue(struct Qdisc *sch)
这个也是按顺序取包发送,不会乱序。
从这个流程看,应该不会乱序,那么最终乱序的原因是?仔细查看我们的发包流程,我们一直强调是在sys态,还有一个中断时发包的流程没有分析。
我们来看i40e的napi发包模式:
i40e_napi_poll函数调用 i40e_clean_tx_irq,清理发送队列的数据。
static bool i40e_clean_tx_irq(struct i40e_vsi *vsi,
                  struct i40e_ring *tx_ring, int napi_budget)//发送完之后资源回收
{
。。。。
        if (__netif_subqueue_stopped(tx_ring->netdev,
                         tx_ring->queue_index) &&
           !test_bit(__I40E_DOWN, &vsi->state)) {
            netif_wake_subqueue(tx_ring->netdev,
                        tx_ring->queue_index);
            ++tx_ring->tx_stats.restart_queue;
        }
。。。。。
}

具体查看 netif_wake_subqueue 的实现,

netif_wake_subqueue --》__netif_schedule --》__netif_reschedule 
static inline void __netif_reschedule(struct Qdisc *q)
{
    struct softnet_data *sd;
    unsigned long flags;

    local_irq_save(flags);
    sd = this_cpu_ptr(&softnet_data);
    q->next_sched = NULL;
    *sd->output_queue_tailp = q;
    sd->output_queue_tailp = &q->next_sched;//output_queue_tailp只会在中断中操作,net_tx_action会获取sd->output_queue,然后调用qdisc_run发包。
    raise_softirq_irqoff(NET_TX_SOFTIRQ);
    local_irq_restore(flags);
}

触发软中断,而根据中断绑核的设置,该中断号绑定的cpu并不一定和xps的queue映射的cpu是同一个,这样的话,就存在两个cpu发送一个流的情况。一个是在sys态,调用qdisc_run发包,

一个是在软中断处理时,调用qdisc_run发包,这两个cpu可能不是同一个。

由于发送的时候,还是要调用qdisc的spin_lock,所以虽然cpu不是同一个,但还是依靠自旋锁控制了并发dequeue一个skb的情况。那就是不可能两个cpu都dequeue到同一个skb的情况。

中断里面处理流程:

        root_lock = qdisc_lock(q);
            if (spin_trylock(root_lock)) {-------------获取qdisc的自旋锁
                smp_mb__before_clear_bit();
                clear_bit(__QDISC_STATE_SCHED,
                      &q->state);//清理__QDISC_STATE_SCHED状态,
                qdisc_run(q);-------------------------这个里面还是会判断qdisc的state,如果是running状态,则直接返回,不会调用__qdisc_run
                spin_unlock(root_lock);

殊途同归,进入__qdisc_run:

void __qdisc_run(struct Qdisc *q)//进入该函数,此时应持有qdisc_lock
{
    int quota = weight_p;

    while (qdisc_restart(q)) {
        /*
         * Ordered by possible occurrence: Postpone processing if
         * 1. we‘ve exceeded packet quota
         * 2. another process needs the CPU;
         */
        if (--quota <= 0 || need_resched()) {//配额用完了,__netif_schedule会触发软中断
            __netif_schedule(q);
            break;
        }
    }

    qdisc_run_end(q);
}


static inline int qdisc_restart(struct Qdisc *q)
{
    struct netdev_queue *txq;
    struct net_device *dev;
    spinlock_t *root_lock;
    struct sk_buff *skb;
    bool validate;

    /* Dequeue packet */
    skb = dequeue_skb(q, &validate);
    if (unlikely(!skb))
        return 0;

    root_lock = qdisc_lock(q);
    dev = qdisc_dev(q);
    txq = skb_get_tx_queue(dev, skb);

    return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
}

所以按道理也能保证发送时序,但是由于我劫持了响应的网卡发包驱动,自己再做了一次缓存,而这些缓存的管理,是percpu的,所以qdisc持有锁发送的时候,最终到了两个不同的percpu的缓存,然后就存在发包乱序的可能了,毕竟两个cpu再也看不到对方的存在了,也不会按顺序从qdisc中取skb了,他们只管自己percpu缓存中的skb,所以会分别尝试获取:

HARD_TX_LOCK(dev, txq, smp_processor_id());这把锁,然后发包。乱序就很正常了。
 
结论:
1.如果过多的cpu使用相同的tx队列,那么加重tx对应的qdisc锁的争抢,也会增加对txq->_xmit_lock的争抢,
为了将qdisc的锁争抢降低到最低,最好就是:。 如果每个cpu只关联了一个tx,甚至能消除竞争
也可以减小因为发送完成中断造成的cache miss。
因此xps_cpus的配置最好结合/proc/irq//smp_affinity, 映射最好在同一个cpu或者同一个numa node的cpu上。
 
2.如果在驱动层再做了缓存,要保证各个cpu的时序非常困难,所以只能保证开启了xps的情况下,将xps映射到的cpu和网卡对应中断完全对应,比1的要求还要严格,
因为多个cpu访问的percpu缓存再也不相关了,同一个流如果有skb在不同的percpu缓存,时序无法保证。
 

当然还有一种情况是cpu的热插拔,这个不在本文讨论范围之内。

 附:

一个net_device在初始化的时候:

void dev_init_scheduler(struct net_device *dev)
{
    dev->qdisc = &noop_qdisc;
。。。。
}

但是当网卡up的时候,调用dev_activate,会重新设置,如果是多队列网卡,则设置为 mq_qdisc_ops:

void dev_activate(struct net_device *dev)
{
    int need_watchdog;

    /* No queueing discipline is attached to device;
     * create default one for devices, which need queueing
     * and noqueue_qdisc for virtual interfaces
     */

    if (dev->qdisc == &noop_qdisc)
        attach_default_qdiscs(dev);
。。。。。
}

static void attach_default_qdiscs(struct net_device *dev)
{
    struct netdev_queue *txq;
    struct Qdisc *qdisc;

    txq = netdev_get_tx_queue(dev, 0);

    if (!netif_is_multiqueue(dev) ||
        dev->priv_flags & IFF_NO_QUEUE) {
        netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
        dev->qdisc = txq->qdisc_sleeping;
        atomic_inc(&dev->qdisc->refcnt);
    } else {
        qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);-------------默认使用多队列策略设置root的策略,即mq_qdisc_ops
        if (qdisc) {
            dev->qdisc = qdisc;
            qdisc->ops->attach(qdisc);
        }
    }
}
 
参考资料:
https://lwn.net/Articles/412062/
 

以上是关于一个发包乱序问题记录的主要内容,如果未能解决你的问题,请参考以下文章

一道面试题目

并发包java.util.concurrent.locks.Lock

用于从 cloudkit 检索单列的代码模式/片段

CSP核心代码片段记录

npm发包记录

数据库使用order by排序乱序的问题