dpdk无锁队列rte_ring实现分析

Posted rayylee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dpdk无锁队列rte_ring实现分析相关的知识,希望对你有一定的参考价值。

1. 概述

rte_ring(以下简称ring)是一个高效率的无锁环形队列,它具有以下特点:

  • FIFO
  • 队列长度是固定的,所有指针存放在数组中
  • 无锁实现(lockless)
  • 多消费者或单消费者出队
  • 多生产者或单消费者入队
  • 批量(bulk)出队 - 出队N个对象,否则失败
  • 批量(bulk)入队 - 入队N个对象,否则失败
  • 突发(burst)出队 - 尽可能地出队N个对象
  • 突发(burst)入队 - 尽可能地入队N个对象

与链表实现的队列相比,ring有以下优点:

  • 更快 - 仅需要一次CAS(Compare-And-Swap)操作
  • 比完全无锁的队列实现更简单
  • 适配批量操作 - 由于指针存放在数组中,相比链表式队列多个对象的操作没有太大的cache miss

当然,ring也有缺点:

  • 队列长度固定
  • 比链表式队列更消耗内存(因为创建的时候队列长度便固定了)

ring的实现借鉴了 [freebsd_ring] 和 [linux_ringbuffer] 。每个ring都有唯一的名字。 用户不可能创建两个具有相同名称的ring(如果尝试调用rte_ring_create()这样做的话,将返回NULL)。

2. ret_ring无锁队列操作图解

下面将以多生产者(multi-producer, mp)的情形来说明ring入队时的操作,多消费者出队的基本原理可以此类比。

每个ring都有两对head,tail指针,一对用于生产者(入队),另一对用于消费者(出队)。在下面各图中,上半部分表示lcore入队函数的局部变量, 下半部分表示ring的成员变量。objX表示队列中的对象。

Step1

一开始,lcore1和lcore2局部变量pro_head和cons_tail都和queue成员一致,局部变量prod_next都指向队列插入位置,即prod_head的前面。

Step2

接下来两个lcore通过CAS指令进行竞争,更新ring->prod_head改为胜者lcore的prod_next:

  • 如果ring->prod_head != prod_head, CAS失败,返回Step1
  • 否则,CAS成功,ring->prod_head = prod_next

下图中,lcore1竞争获胜,而lcore2需要重新进行Step1:

Step3

lcore2上的CAS操作也成功。lcore1将obj4入队,lcore2将obj5入队。

Step4

两个lcore进行竞争,更新ring->prod_tail:

  • 如果ring->prod_tail != prod_head,CAS失败,继续尝试
  • 否则,CAS成功, ring->prod_tail = prod_next

下图中,lcore1竞争获胜,lcore1上的入队操作到此结束。

Step5

lcore2如Step4一样,更新ring->prod_tail。至此lcore2的入队操作也已完成。

3. 代码分析

3.1 rte_ring 结构体:

 1 struct rte_ring
 2    
char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */
 3     int flags;                       /**< Flags supplied at creation. */
 4     const struct rte_memzone *memzone;
 5            
/**< Memzone, if any, containing the rte_ring */
 6
 7    
struct prod
 8        
uint32_t watermark;      /**< Maximum items before EDQUOT. */
 9         uint32_t sp_enqueue;     /**< True, if single producer. */
10         uint32_t size;           /**< Size of ring. */
11         uint32_t mask;           /**< Mask (size-1) of ring. */
12         volatile uint32_t head;  /**< Producer head. */
13         volatile uint32_t tail;  /**< Producer tail. */
14     prod __rte_cache_aligned;
15
16    
struct cons
17        
uint32_t sc_dequeue;     /**< True, if single consumer. */
18         uint32_t size;           /**< Size of the ring. */
19         uint32_t mask;           /**< Mask (size-1) of ring. */
20         volatile uint32_t head;  /**< Consumer head. */
21         volatile uint32_t tail;  /**< Consumer tail. */
22 #ifdef RTE_RING_SPLIT_PROD_CONS
23     cons __rte_cache_aligned;
24
#else
25     cons;
26
#endif
27
28
#ifdef RTE_LIBRTE_RING_DEBUG
29     struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
30
#endif
31
32    
void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
33                                          * not volatile so need to be careful
34                                          * about compiler re-ordering */
35 ;

3.2 入队函数 __rte_ring_mp_do_enqueue

 1 static inline int __attribute__((always_inline))
 2 __rte_ring_mp_do_enqueue(
struct rte_ring *r, void * const *obj_table,
 3             
unsigned n, enum rte_ring_queue_behavior behavior)
 4
 5    
uint32_t prod_head, prod_next;
 6    
uint32_t cons_tail, free_entries;
 7    
const unsigned max = n;
 8    
int success;
 9    
unsigned i, rep = 0;
10    
uint32_t mask = r->prod.mask;
11    
int ret;
12
13    
do
14         n
= max;
15
16         prod_head
= r->prod.head;
17         cons_tail
= r->cons.tail;
18         free_entries
= (mask + cons_tail - prod_head);
19
20        
if (unlikely(n > free_entries))
21            
if (behavior == RTE_RING_QUEUE_FIXED)
22                
return -ENOBUFS;
23            
24            
else
25                
if (unlikely(free_entries == 0))
26                    
return 0;
27                
28
29                 n
= free_entries;
30            
31        
32
33         prod_next
= prod_head + n;
34         success = rte_atomic32_cmpset(&r->prod.head, prod_head,
35                           prod_next);
36    
while (unlikely(success == 0));
37
38     ENQUEUE_PTRS();

39     rte_smp_wmb();
40
41    
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark))
42         ret
= (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
43                 (int)(n | RTE_RING_QUOT_EXCEED);
44    
45    
else
46         ret
= (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
47    
48

49     while (unlikely(r->prod.tail != prod_head))
50         rte_pause();

51
52        
if (RTE_RING_PAUSE_REP_COUNT &&
53             ++rep == RTE_RING_PAUSE_REP_COUNT)
54             rep
= 0;
55             sched_yield();
56        
57    
58     r
->prod.tail = prod_next;
59    
return ret;
60

第34-36行处理多个producer的竞争,没有竞争到写入位置的线程将继续循环。

第39行插入了一个rte_smp_wmb()调用,对这个函数DPDK文档的解释是:

Write memory barrier between lcores. Guarantees that the STORE operations that precede the rte_smp_wmb() call are globally visible across the lcores before the the STORE operations that follows it.

第49行的循环用于无锁同步对prod.tail的修改。

ENQUEUE_PTRS宏函数:

 1 #define ENQUEUE_PTRS() do \\
 2     const uint32_t size = r->prod.size; \\
 3     uint32_t idx = prod_head & mask; \\
 4     if (likely(idx + n < size)) \\
 5         for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) \\
 6             r->ring[idx] = obj_table[i]; \\
 7             r->ring[idx+1] = obj_table[i+1]; \\
 8             r->ring[idx+2] = obj_table[i+2]; \\
 9             r->ring[idx+3] = obj_table[i+3]; \\
10          \\
11         switch (n & 0x3) \\
12             case 3: r->ring[idx++] = obj_table[i++]; \\
13             case 2: r->ring[idx++] = obj_table[i++]; \\
14             case 1: r->ring[idx++] = obj_table[i++]; \\
15          \\
16      else \\
17         for (i = 0; idx < size; i++, idx++)\\
18             r->ring[idx] = obj_table[i]; \\
19         for (idx = 0; i < n; i++, idx++) \\
20             r->ring[idx] = obj_table[i]; \\
21      \\
22 while(0)

第5行,如果n>4,则把它分成数次写入,每次写入4个指针;不足4的余数在switch语句中写入。

3.3 出队函数 __rte_ring_mc_do_dequeue

 1 static inline int __attribute__((always_inline))
 2 __rte_ring_mc_do_dequeue(
struct rte_ring *r, void **obj_table,
 3         
unsigned n, enum rte_ring_queue_behavior behavior)
 4
 5    
uint32_t cons_head, prod_tail;
 6    
uint32_t cons_next, entries;
 7    
const unsigned max = n;
 8    
int success;
 9    
unsigned i, rep = 0;
10    
uint32_t mask = r->prod.mask;
11
12    
do
13         n
= max;
14
15         cons_head
= r->cons.head;
16         prod_tail
= r->prod.tail;
17         entries
= (prod_tail - cons_head);
18
19        
if (n > entries)
20            
if (behavior == RTE_RING_QUEUE_FIXED)
21                
return -ENOENT;
22            
23            
else
24                
if (unlikely(entries == 0))
25                    
return 0;
26                
27
28                 n
= entries;
29            
30        
31
32         cons_next
= cons_head + n;
33         success = rte_atomic32_cmpset(&r->cons.head, cons_head,
34                           cons_next);
35    
while (unlikely(success == 0));
36
37     DEQUEUE_PTRS();

38     rte_smp_rmb();
39
40     while (unlikely(r->cons.tail != cons_head))
41         rte_pause();

42
43        
if (RTE_RING_PAUSE_REP_COUNT &&
44             ++rep == RTE_RING_PAUSE_REP_COUNT)
45             rep
= 0;
46             sched_yield();
47        
48    
49     r
->cons.tail = cons_next;
50
51    
return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
52

 1 #define DEQUEUE_PTRS() do \\
 2     uint32_t idx = cons_head & mask; \\
 3     const uint32_t size = r->cons.size; \\
 4     if (likely(idx + n < size)) \\
 5         for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) \\
 6             obj_table[i] = r->ring[idx]; \\
 7             obj_table[i+1] = r->ring[idx+1]; \\
 8             obj_table[i+2] = r->ring[idx+2]; \\
 9             obj_table[i+3] = r->ring[idx+3]; \\
10          \\
11         switch (n & 0x3) \\
12             case 3: obj_table[i++] = r->ring[idx++]; \\
13             case 2: obj_table[i++] = r->ring[idx++]; \\
14             case 1: obj_table[i++] = r->ring[idx++]; \\
15          \\
16      else \\
17         for (i = 0; idx < size; i++, idx++) \\
18             obj_table[i] = r->ring[idx]; \\
19         for (idx = 0; i < n; i++, idx++) \\
20             obj_table[i] = r->ring[idx]; \\
21      \\
22 while (0)

3.4 32-bit取模索引

在前面介绍中,prod_head, prod_tail, cons_head 和 cons_tail索引由箭头表示。 但是,在实际实现中,这些值不会假定在0和 size(ring)-1 之间。 索引值在 0 ~ 2^32 -1之间,当我们访问ring本身时,我们屏蔽他们的值。 32bit模数也意味着如果溢出32bit的范围,对索引的操作将自动执行2^32 模。

下面解释索引值如何在ring中使用。为了简化说明,使用模16bit操作,而不是32bit。 另外,四个索引被定义为16bit无符号整数,与实际情况下的32bit无符号数相反。

这个ring包含11000对象。

这个ring包含12536个对象。

我们在上面的例子中使用模65536操作。 在实际执行情况中,这种低效操作是多余的,当溢出时会自动执行。代码始终保证生产者和消费者之间的距离在0 ~ size(ring)-1之间。 基于这个属性,我们可以对两个索引值做减法,而不用考虑溢出问题任何情况下,ring中的对象和空闲对象都在 0 ~ size(ring)-1之间,即便第一个减法操作已经溢出:

uint32_t entries = (prod_tail - cons_head);
uint32_t free_entries = (mask + cons_tail -prod_head);

参考文档

[dpdk_guide_ring]   DPDK programmer’s guide - Ring Library

[freebsd_ring]          FreeBSD buf_ring

[linux_ringbuffer]     Linux Lockless Ring Buffer

[lockfree_queue]     Yet another implementation of a lock-free circular array queue

[lockfree_coolshell] 酷壳:无锁队列的实现

以上是关于dpdk无锁队列rte_ring实现分析的主要内容,如果未能解决你的问题,请参考以下文章

DPDKring从DPDK的ring来看x86无锁队列的实现

DPDK — RING(librte_ring,Ring Manager,环缓冲区管理组件)

DPDK — RING(librte_ring,Ring Manager,环缓冲区管理组件)

DPDK — 数据加速方案的核心思想

DPDK — 数据加速方案的核心思想

dpdk专题