Linux 网络协议栈开发代码分析篇之数据收发 —— dev_queue_xmit()函数

Posted zqixiao_09

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux 网络协议栈开发代码分析篇之数据收发 —— dev_queue_xmit()函数相关的知识,希望对你有一定的参考价值。

     当所有的信息都准备好了之后,例如,出口设备,下一跳的地址,以及链路层地址。就会调用dev.c文件中的dev_queue_xmin函数,该函数是设备驱动程序执行传输的接口。也就是所有的数据包在填充完成后,最终发送数据时,都会调用该函数。

     Dev_queue_xmit函数只接收一个skb_buff结构作为输入的值。此数据结构包含了此函数所需要的一切信息。Skb->dev是出口设备,skb->data为有效的载荷的开头,其长度为skb->len

int dev_queue_xmit(struct sk_buff *skb)     
     
    struct net_device *dev = skb->dev;     
    struct netdev_queue *txq;     
    struct Qdisc *q;     
    int rc = -ENOMEM;     
    /* GSO will handle the following emulations directly. */    
    if (netif_needs_gso(dev, skb))     
        goto gso;     
    //首先判断skb是否被分段,如果分了段并且网卡不支持分散读的话需要将所有段重新组合成一个段     
    //这里__skb_linearize其实就是__pskb_pull_tail(skb, skb->data_len),这个函数基本上等同于pskb_may_pull     
    //pskb_may_pull的作用就是检测skb对应的主buf中是否有足够的空间来pull出len长度,     
    //如果不够就重新分配skb并将frags中的数据拷贝入新分配的主buff中,而这里将参数len设置为skb->datalen,     
    //也就是会将所有的数据全部拷贝到主buff中,以这种方式完成skb的线性化     
    if (skb_shinfo(skb)->frag_list &&     
        !(dev->features & NETIF_F_FRAGLIST) &&     
        __skb_linearize(skb))     
        goto out_kfree_skb;     
    /* Fragmented skb is linearized if device does not support SG,   
     * or if at least one of fragments is in highmem and device   
     * does not support DMA from it.   
     */    
     //如果上面已经线性化了一次,这里的__skb_linearize就会直接返回     
     //注意区别frags和frag_list,     
     //前者是将多的数据放到单独分配的页面中,sk_buff只有一个。而后者则是连接多个sk_buff     
    if (skb_shinfo(skb)->nr_frags &&     
        (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&     
        __skb_linearize(skb))     
        goto out_kfree_skb;     
    /* 如果此包的校验和还没有计算并且驱动不支持硬件校验和计算,那么需要在这里计算校验和*/    
    if (skb->ip_summed == CHECKSUM_PARTIAL)      
        skb_set_transport_header(skb, skb->csum_start -     
                          skb_headroom(skb));     
        if (!dev_can_checksum(dev, skb) && skb_checksum_help(skb))     
            goto out_kfree_skb;     
         
gso:     
    /* Disable soft irqs for various locks below. Also   
     * stops preemption for RCU.   
     */    
    rcu_read_lock_bh();     
    //选择一个发送队列,如果设备提供了select_queue回调函数就使用它,否则由内核选择一个队列     
    //大部分驱动都不会设置多个队列,而是在调用alloc_etherdev分配net_device时将队列个数设置为1     
    //也就是只有一个队列     
    txq = dev_pick_tx(dev, skb);     
    //从netdev_queue结构上取下设备的qdisc     
    q = rcu_dereference(txq->qdisc);     
#ifdef CONFIG_NET_CLS_ACT     
    skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);     
#endif     
    //上面说大部分驱动只有一个队列,但是只有一个队列也不代表设备准备使用它     
    //这里检查这个队列中是否有enqueue函数,如果有则说明设备会使用这个队列,否则需另外处理     
    //关于enqueue函数的设置,我找到dev_open->dev_activate中调用了qdisc_create_dflt来设置,     
    //不知道一般驱动怎么设置这个queue     
    //需要注意的是,这里并不是将传进来的skb直接发送,而是先入队,然后调度队列,     
    //具体发送哪个包由enqueue和dequeue函数决定,这体现了设备的排队规则     
    if (q->enqueue)      
        spinlock_t *root_lock = qdisc_lock(q);     
        spin_lock(root_lock);     
        if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state)))      
            kfree_skb(skb);     
            rc = NET_XMIT_DROP;     
         else      
            //将skb加入到设备发送队列中,然后调用qdisc_run来发送     
            rc = qdisc_enqueue_root(skb, q);     
            qdisc_run(q); //下面看     
             
        spin_unlock(root_lock);     
        goto out;     
         
    //下面是处理不使用发送队列的情况,注意看下面一段注释     
    /* The device has no queue. Common case for software devices:   
       loopback, all the sorts of tunnels...   
       Really, it is unlikely that netif_tx_lock protection is necessary   
       here.  (f.e. loopback and IP tunnels are clean ignoring statistics   
       counters.)   
       However, it is possible, that they rely on protection   
       made by us here.   
       Check this and shot the lock. It is not prone from deadlocks.   
       Either shot noqueue qdisc, it is even simpler 8)   
     */    
    //要确定设备是开启的,下面还要确定队列是运行的。启动和停止队列由驱动程序决定     
    //详见ULNI中文版P251     
    //如上面英文注释所说,设备没有输出队列典型情况是回环设备     
    //我们所要做的就是直接调用驱动的hard_start_xmit将它发送出去     
    //如果发送失败就直接丢弃,因为没有队列可以保存它     
    if (dev->flags & IFF_UP)      
        int cpu = smp_processor_id(); /* ok because BHs are off */    
        if (txq->xmit_lock_owner != cpu)      
            HARD_TX_LOCK(dev, txq, cpu);     
            if (!netif_tx_queue_stopped(txq))      
                rc = 0;     
                //对于loopback设备,它的hard_start_xmit函数是loopback_xmit     
                //我们可以看到,在loopback_xmit末尾直接调用了netif_rx函数     
                //将带发送的包直接接收了回来     
                //这个函数下面具体分析,返回0表示成功,skb已被free     
                if (!dev_hard_start_xmit(skb, dev, txq))       
                    HARD_TX_UNLOCK(dev, txq);     
                    goto out;     
                     
                 
            HARD_TX_UNLOCK(dev, txq);     
            if (net_ratelimit())     
                printk(KERN_CRIT "Virtual device %s asks to "    
                       "queue packet!\\n", dev->name);     
         else      
            /* Recursion is detected! It is possible,   
             * unfortunately */    
            if (net_ratelimit())     
                printk(KERN_CRIT "Dead loop on virtual device "    
                       "%s, fix it urgently!\\n", dev->name);     
             
         
    rc = -ENETDOWN;     
    rcu_read_unlock_bh();     
out_kfree_skb:     
    kfree_skb(skb);     
    return rc;     
out:     
    rcu_read_unlock_bh();     
    return rc;     
     

    从此函数可以看出,当驱动使用发送队列的时候会循环从队列中取出包发, 而不使用队列的时候只发送一次,如果没发送成功就直接丢弃

    下面看下第二个函数:

int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,     
            struct netdev_queue *txq)     
     
    if (likely(!skb->next))      
        //从这里可以看出,对于每一个发送的包也会给ptype_all一份,     
        //而packet套接字创建时对于proto为ETH_P_ALL的会在ptype_all中注册一个成员     
        //因此对于协议号为ETH_P_ALL的packet套接字来说,发送和接受的数据都能收到     
        //而其他成员似乎不行,这个要回去试试     
        if (!list_empty(&ptype_all))       
            dev_queue_xmit_nit(skb, dev);     
        if (netif_needs_gso(dev, skb))      
            if (unlikely(dev_gso_segment(skb)))     
                goto out_kfree_skb;     
            if (skb->next)     
                goto gso;     
             
        //这个就是驱动提供的发送回调函数了     
        return dev->hard_start_xmit(skb, dev);     
         
gso:     
    do      
        struct sk_buff *nskb = skb->next;     
        int rc;     
        skb->next = nskb->next;     
        nskb->next = NULL;     
        rc = dev->hard_start_xmit(nskb, dev);     
        if (unlikely(rc))      
            nskb->next = skb->next;     
            skb->next = nskb;     
            return rc;     
             
        if (unlikely(netif_tx_queue_stopped(txq) && skb->next))     
            return NETDEV_TX_BUSY;     
     while (skb->next);     
    skb->destructor = DEV_GSO_CB(skb)->destructor;     
out_kfree_skb:     
    kfree_skb(skb);     
    return 0;     
  

qdisc_run和__qdisc_run的功能很简单,就是检查队列是否处于运行状态

 

  1. static inline void qdisc_run(struct Qdisc *q)     
  2.      
  3.     if (!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state))     
  4.         __qdisc_run(q);     
  5.      
  6. void __qdisc_run(struct Qdisc *q)     
  7.      
  8.     unsigned long start_time = jiffies;     
  9.     //真正的操作在这个函数里面     
  10.     while (qdisc_restart(q))      
  11.         /*   
  12.          * Postpone processing if   
  13.          * 1. another process needs the CPU;   
  14.          * 2. we've been doing it for too long.   
  15.          */    
  16.         if (need_resched() || jiffies != start_time)      
  17.             //当需要进行调度或者时间超过了1个时间片的时候就退出循环,退出之前发出软中断请求     
  18.             __netif_schedule(q);     
  19.             break;     
  20.              
  21.          
  22.     clear_bit(__QDISC_STATE_RUNNING, &q->state);     
  23.     

 

然后循环调用qdisc_restart发送数据

下面这个函数qdisc_restart是真正发送数据包的函数,它从队列上取下一个帧,然后尝试将它发送出去,若发送失败则一般是重新入队。

此函数返回值为:发送成功时返回剩余队列长度

                             发送失败时返回0(若发送成功且剩余队列长度为0也返回0)

    

  1. static inline int qdisc_restart(struct Qdisc *q)     
  2.      
  3.     struct netdev_queue *txq;     
  4.     int ret = NETDEV_TX_BUSY;     
  5.     struct net_device *dev;     
  6.     spinlock_t *root_lock;     
  7.     struct sk_buff *skb;     
  8.     /* Dequeue packet */    
  9.     if (unlikely((skb = dequeue_skb(q)) == NULL))     
  10.         return 0;     
  11.     root_lock = qdisc_lock(q);     
  12.     /* And release qdisc */    
  13.     spin_unlock(root_lock);     
  14.     dev = qdisc_dev(q);     
  15.     txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));     
  16.     HARD_TX_LOCK(dev, txq, smp_processor_id());     
  17.     if (!netif_tx_queue_stopped(txq) &&     
  18.         !netif_tx_queue_frozen(txq))     
  19.         ret = dev_hard_start_xmit(skb, dev, txq);     
  20.     HARD_TX_UNLOCK(dev, txq);     
  21.     spin_lock(root_lock);     
  22.     switch (ret)      
  23.     case NETDEV_TX_OK:     
  24.         /* Driver sent out skb successfully */    
  25.         ret = qdisc_qlen(q);     
  26.         break;     
  27.     case NETDEV_TX_LOCKED:     
  28.         /* Driver try lock failed */    
  29.         //有可能其他CPU正占有这个锁,这里处理冲突     
  30.         //函数就不看了,里面的处理流程简单说下     
  31.         //分两种情况,如果占有锁的cpu就是当前cpu,那么释放这个包同时打印一条警告     
  32.         //否则将包重新入队     
  33.         ret = handle_dev_cpu_collision(skb, txq, q);     
  34.         break;     
  35.     default:     
  36.         /* Driver returned NETDEV_TX_BUSY - requeue skb */    
  37.         //当发送队列处于停止状态并且队列中有数据待发送时会返回NETDEV_TX_BUSY     
  38.         //代表发送忙,这种情况下就将包重新入队     
  39.         if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))     
  40.             printk(KERN_WARNING "BUG %s code %d qlen %d\\n",     
  41.                    dev->name, ret, q->q.qlen);     
  42.         ret = dev_requeue_skb(skb, q);     
  43.         break;     
  44.          
  45.     if (ret && (netif_tx_queue_stopped(txq) ||     
  46.             netif_tx_queue_frozen(txq)))     
  47.         ret = 0;     
  48.     return ret;     
  49.     

 

     至此,dev_queue_xmit到驱动层的发送流程就分析完了。

     已经有了dev_queue_xmit函数,为什么还需要软中断来发送呢?

     我们可以看到在dev_queue_xmit中将skb进行了一些处理(比如合并成一个包,计算校验和等)处理完的skb是可以直接发送的了,这时dev_queue_xmit也会先将skb入队(skb一般都是在这个函数中入队的)并且调用qdisc_run尝试发送,但是有可能发送失败,这时就将skb重新入队,调度软中断,并且自己直接返回。软中断只是发送队列中的skb以及释放已经发送的skb,它无需再对skb进行线性化或者校验和处理。另外在队列被停止的情况下,dev_queue_xmit仍然可以把包加入队列,但是不能发送这样在队列被唤醒的时候就需要通过软中断来发送停止期间积压的包。

      简而言之,dev_queue_xmit是对skb做些最后的处理并且第一次尝试发送,软中断是将前者发送失败或者没发完的包发送出去。(其实发送软中断还有一个作用,就是释放已经发送的包,因为某些情况下发送是在硬件中断中完成的,为了提高硬件中断处理效率,内核提供一种方式将释放skb放到软中断中进行,这时只要调用dev_kfree_skb_irq,它将skb加入softnet_data的completion_queue中,然后开启发送软中断,net_tx_action会在软中断中将completion_queue中的skb全部释放掉)




以上是关于Linux 网络协议栈开发代码分析篇之数据收发 —— dev_queue_xmit()函数的主要内容,如果未能解决你的问题,请参考以下文章

Linux 网络协议栈开发代码分析篇之VLAN—— VLAN收发处理

Linux 网络协议栈开发代码分析篇之VLAN—— Linux下VLAN功能的实现概述

linux内核源码分析之网络数据收发流程

SylixOS网络协议栈数据收发流程

Linux网络协议栈7--ipsec收发包流程

Linux从青铜到王者第十八篇:Linux网络基础第二篇之TCP协议