使用无锁队列(环形缓冲区)注意事项

Posted wangshaowei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用无锁队列(环形缓冲区)注意事项相关的知识,希望对你有一定的参考价值。

环形缓冲区是生产者和消费者模型中常用的数据结构。生产者将数据放入数组的尾端,而消费者从数组的另一端移走数据,当达到数组的尾部时,生产者绕回到数组的头部。如果只有一个生产者和一个消费者,那么就可以做到免锁访问环形缓冲区(Ring Buffer)。写入索引只允许生产者访问并修改,只要写入者在更新索引之前将新的值保存到缓冲区中,则读者将始终看到一致的数据结构。同理,读取索引也只允许消费者访问并修改。

环形缓冲区实现原理图

                       技术分享图片

如图所示,当读者和写者指针相等时,表明缓冲区是空的,而只要写入指针在读取指针后面时,表明缓冲区已满。

清单 9. 2.6.10 环形缓冲区实现代码

技术分享图片
 /*

 * __kfifo_put - puts some data into the FIFO, no locking version

 * Note that with only one concurrent reader and one concurrent

 * writer, you don‘t need extra locking to use these functions.

 */

 unsigned int __kfifo_put(struct kfifo *fifo,

       unsigned char *buffer, unsigned int len)

 {

  unsigned int l;

  len = min(len, fifo->size - fifo->in + fifo->out);

  /* first put the data starting from fifo->in to buffer end */

  l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));

  memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

  /* then put the rest (if any) at the beginning of the buffer */

  memcpy(fifo->buffer, buffer + l, len - l);

  fifo->in += len;

  return len;

 }

 

 /*

 * __kfifo_get - gets some data from the FIFO, no locking version

 * Note that with only one concurrent reader and one concurrent

 * writer, you don‘t need extra locking to use these functions.

 */

 unsigned int __kfifo_get(struct kfifo *fifo,

     unsigned char *buffer, unsigned int len)

 {

  unsigned int l;

  len = min(len, fifo->in - fifo->out);

  /* first get the data from fifo->out until the end of the buffer */

  l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));

  memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

  /* then get the rest (if any) from the beginning of the buffer */

  memcpy(buffer + l, fifo->buffer, len - l);

  fifo->out += len;

  return len;

 }
技术分享图片

 

需要注意的是

使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)时,如果返回参数与传入参数len不相等时,则操作失败

 

我们定义一个

//注意student_info 共17字节 按照内存排列占24字节

typedef struct student_info

{

    uint64_t stu_id; //8个字节

    uint32_t age;  //4字节

    uint32_t score;//4字节

     char sex;//1字节

}student_info;

 

我们建立一个环形缓冲区,里面只有64字节大小(虽然我们实际使用时大小远大于此),向里面多次存入24字节student_info,看有什么反应

 

技术分享图片
//打印学生信息

void print_student_info(const student_info *stu_info)

{

    assert(stu_info);

    printf("id:%lu	",stu_info->stu_id);

    printf("age:%u	",stu_info->age);

     printf("sex:%d	",stu_info->sex);

    printf("score:%u
",stu_info->score);

}

 

student_info * get_student_info(time_t timer)

{

    student_info *stu_info = (student_info *)malloc(sizeof(student_info));

    srand(timer);

    stu_info->stu_id = 10000 + rand() % 9999;

    stu_info->age = rand() % 30;

    stu_info->score = rand() % 101;

     stu_info->sex=rand() % 2;

    print_student_info(stu_info);

    return stu_info;

}

void print_ring_buffer_len(struct ring_buffer *ring_buf)

{

     //用于打印缓冲区长度

     uint32_t ring_buf_len = 0;

     //取得已经使用缓冲区长度 size-ring_buf_len为未使用缓冲区的长度

     ring_buf_len=ring_buffer_len(ring_buf);

     printf("no use ring_buf_len:%d
",(ring_buf->size-ring_buf_len));

}

int main(int argc, char *argv[])

{

    uint32_t size = 0;

     //用于判断存储或者取得数据的字节数

     uint32_t oklen = 0;

    struct ring_buffer *ring_buf = NULL;

    //64字节

    size=BUFFER_SIZE;

    ring_buf = ring_buffer_alloc(size);

     printf("input student
");

     {

         student_info *stu_info;

         student_info stu_temp;

         uint32_t student_len=sizeof(student_info);

         printf("ring_buf_len:%d
",ring_buf->size);

         printf("student_len:%d
",student_len);

         //此时环形缓冲区没有数据我们去取数据当然为空

         memset(&stu_temp,0,student_len);

         oklen=ring_buffer_get(ring_buf, (void *)(&stu_temp), student_len);

         if(oklen==student_len)

         {

            printf("get student data
");

         }

         else

         {

            printf("no student data
");

         }

         printf("
");

        //第一次调用时用字节结束后还有64-24 =40字节

         stu_info = get_student_info(976686458);

         oklen = ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("1 put student data success
");

         }

         else

         {

            printf("1 put student data failure
");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("
");

         //第二次调用时用字节结束后还有64-48 =16字节

         stu_info = get_student_info(976686464);

         oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("2 put student data success
");

         }

         else

         {

            printf("2 put student data failure
");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("
");

         //第三次调用时需要用字节但只有字节失败

         //把字节都写满了

        //验证了在调用__kfifo_put函数或者__kfifo_get函数时,如果返回参数与传入参数len不相等时,则操作失败

         stu_info = get_student_info(976686445);

         oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("3 put student data success
");

         }

         else

         {

            printf("3 put student data failure
");

         }

         print_ring_buffer_len(ring_buf);

       

         printf("
");

         //第四次调用时需要用字节但无字节

         ////验证了在调用__kfifo_put函数或者__kfifo_get函数时,如果返回参数与传入参数len不相等时,则操作失败

         stu_info = get_student_info(976686421);

         oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("4 put student data success
");

         }

         else

         {

            printf("4 put student data failure
");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("
");

         //现在开始取学生数据里面保存了个学生数据我们取三次看效果

         printf("output student
");

 

         printf("
");

         //第一次取得数据并打印

         memset(stu_info,0,student_len);

          oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

            print_student_info(stu_info);

           printf("1 get student data success
");

         }

         else

         {

            printf("1 get student data failure
");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("
");

         ////第二次取得数据并打印

         memset(stu_info,0,student_len);

          oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

            print_student_info(stu_info);

           printf("2 get student data success
");

         }

         else

         {

            printf("2 get student data failure
");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("
");

        //第三次取得数据失败

         memset(stu_info,0,student_len);

         oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

            print_student_info(stu_info);

           printf("3 get student data success
");

         }

         else

         {

             printf("3 get student data failure
");

         }

         print_ring_buffer_len(ring_buf);

 

     }

 

     return 1;

}
技术分享图片

 

     技术分享图片

 

结论:在使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)时,如果返回参数与传入参数len不相等时,则操作失败。代码下载:tessc.rarhttp://files.cnblogs.com/dragonsuc/tessc.rar

需要注意的地方:

1.只有一个线程负责读,另一个线程负责写的时候,数据是线程安全的。上面的实现是基于这个原理实现的,当有多个线程读或者多个线程写的时候,不保证数据的正确性。
所以使用的时候,一个线程写,一个线程读。网络应用中比较常用,就是开一个线程接口数据,然后把数据写入队列。然后开一个调度线程读取网络数据,然后分发到处理线程。

2.数据长度默认宏定义了一个长度,超过这个长度的时候,后续的数据会写入失败。

本文参考文章:

http://blog.csdn.net/mergerly/article/details/39009473

http://www.cnblogs.com/Anker/p/3481373.html


以上是关于使用无锁队列(环形缓冲区)注意事项的主要内容,如果未能解决你的问题,请参考以下文章

高效无锁环形队列

高效无锁环形队列

高效无锁环形队列

高效无锁环形队列

CAS无锁队列的实现

DPDK ring库:环形缓冲区的解剖