节点大小可变的环形队列实现

Posted rtoax

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了节点大小可变的环形队列实现相关的知识,希望对你有一定的参考价值。

在文章《Linux C语言在用户态实现一个低时延通知(eventfd)+轮询(无锁队列ring)机制的消息队列》中介绍了用户态低时延消息队列的实现,下面我们简单介绍优化步骤。同时在《对POSIX和SystemV消息队列优化:用户态消息队列》对消息队列接口进行了优化,里面使用的环形队列的节点大小是固定的,如何实现不固定节点大小的环形队列呢?直接上代码:

源码链接:https://github.com/Rtoax/test/tree/master/ipc/ring/variable-node-size

本文意在解决日志管理系统的时延和吞吐瓶颈问题:

 

 

 

ring.h

/**
 *  可变节点大小的队列
 *  
 *  作者:荣涛
 *  日期:
 *      2021年6月1日 创建 并完成初始版本
 *  
 */
#ifndef ____RING_H
#define ____RING_H 1

#include <assert.h>
#include <malloc.h>
#include <string.h>
#include <math.h>
#include <stdbool.h>

#ifndef likely
#define likely(x)    __builtin_expect(!!(x), 1)
#endif
#ifndef unlikely
#define unlikely(x)  __builtin_expect(!!(x), 0)
#endif
#ifndef __cachelinealigned
#define __cachelinealigned __attribute__((aligned(64)))
#endif
#ifndef _unused
#define _unused             __attribute__((unused))
#endif

static void  inline _unused mbarrier() { asm volatile("": : :"memory"); }
static void  inline _unused mrwbarrier() { asm volatile("mfence":::"memory"); }
static void  inline _unused mrbarrier()  { asm volatile("lfence":::"memory"); }
static void  inline _unused mwbarrier()  { asm volatile("sfence":::"memory"); }
static void  inline _unused __relax()  { asm volatile ("pause":::"memory"); }

struct __ring_node {
    unsigned int _valide:1;
    unsigned int _reserve:7;
    unsigned int _data_size:8;  //最大数据大小 256
    char _data[];
}__attribute__((packed));

struct __ring {
    size_t _nodes_size;   //总大小
    
    volatile size_t  _head;
    volatile size_t  _tail;

    char _nodes[];
} __cachelinealigned;


static size_t _unused __power_of_2(unsigned int size) 
{
    unsigned int i;
    for (i=0; (1U << i) < size; i++);
    return 1U << i;
}


static struct __ring* _unused __ring_create(size_t size)
{
    size_t nodes_size = __power_of_2(size);
    size_t total_size = nodes_size + sizeof(struct __ring);

    printf("nodes_size = %d, total_size = %d\\n", nodes_size, total_size);

    struct __ring *new_ring = (struct __ring *)malloc(total_size);
    
    assert(new_ring && "OOM error");

    memset(new_ring, 0x00, total_size);
    
    new_ring->_nodes_size = nodes_size;
    new_ring->_head = size/2;
    new_ring->_tail = size/2;


    return new_ring;
}


/*
----- 空闲
##### 已使用
***** 即将填充
%%%%% 空闲但不使用
*/
static inline bool _unused __ring_enqueue(struct __ring *ring, const void *msg, const size_t size) 
{
    assert(ring);
    assert(msg);
    assert(size < ring->_nodes_size);
    
    const size_t node_size = size + sizeof(struct __ring_node);

    size_t head = ring->_head;
    size_t tail = ring->_tail;
    size_t next_tail = (tail + node_size) & (ring->_nodes_size-1);

    /* tail指针将翻转
        
         next_tail   head    tail  
            **-------##########%%%%%%%%%%%
     */
    bool beyond = (next_tail < tail);
    if(unlikely(beyond)) {
//        printf("beyond.\\n");
        struct __ring_node *tmp = (struct __ring_node *)&ring->_nodes[tail];
        tmp->_data_size = ring->_nodes_size - tail;
        tmp->_valide    = 0;
        tail = 0;
        next_tail = node_size;
        if(next_tail >= head) {
//            printf("full1. (%d,%d)\\n", ring->_head, ring->_tail);
            return false;
        }
    } else {
        /*
                    head    tail  next_tail
            ---------##########*****------
         */
        if(ring->_nodes_size - tail < node_size) {
//            printf("full3. (%d,%d)\\n", ring->_head, ring->_tail);
            return false;
        }
        /*
             tail  head      
            ###***---#################%%%%
               next_tail
         */
        if(tail < head && next_tail > head) {
//            printf("full4. (%d,%d)\\n", ring->_head, ring->_tail);
            return false;
        }
    }
    
    struct __ring_node *node = (struct __ring_node *)&ring->_nodes[tail];

    node->_data_size    = size;
    node->_valide       = 1;
    memcpy(node->_data, msg, size);
    
    mwbarrier();
    
//    printf("insert: head = %d, tail = %d, size = %ld, node = %p\\n", ring->_head, tail, node->_data_size, node);
    
    ring->_tail = tail + node_size;
    
    return true;
}


static bool inline _unused __ring_dequeue( struct __ring *const ring, void *msg, size_t *size) 
{
    assert(ring);
    assert(msg);
    assert(size);

    size_t tail = ring->_tail;
    size_t head = ring->_head;

try_again:

    
    if (head == tail) {
//        printf("empty.\\n");
        return false;
    }

    struct __ring_node *node = (struct __ring_node *)&ring->_nodes[head];
    
    /* 结尾的 不可用包 %%%
       
        next_tail   head    tail  
           xxxxxxxxxxxxxxxxxxxxxxxx%%%
    */
    if(!node->_valide) {
//        printf("invalide node.\\n");
        mbarrier();
        ring->_head = 0;
        head = 0;
        goto try_again;
    }
    
    *size = node->_data_size;
    
    const size_t node_size = (*size) + sizeof(struct __ring_node);

    memcpy(msg, node->_data, *size);

    mbarrier();
    

    bool beyond = !!((head + node_size) > ring->_nodes_size);

    ring->_head = beyond?0:(head + node_size) & (ring->_nodes_size-1);
    
//    printf("delete: head = %d, tail = %d, size = %ld, node = %p\\n", ring->_head, tail, node->_data_size, node);
    
    return true;
}



#endif /*<____RING_H>*/

 

test.c

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#include "ring.h"

struct msg {
#define MSG_MAGIC   0x1234abcd
    int magic;
    int magic2;
    int magic3;
    int magic4;
    int magic5;
    char pad[];
};

struct __ring* ring = NULL;


void *enqueue(void*arg)
{
    int i = 0;
    while(1) {
        i ++;
        size_t size = sizeof(struct msg) + i%100;
        struct msg *m = (struct msg *)malloc(size);
        m->magic = MSG_MAGIC;
        while(!__ring_enqueue(ring, m, size)) {
            __relax();
        }
//        printf("enqueue: msg.magic = 0x%x, size = %ld\\n", m->magic, size);
        free(m);
    }
}

void *dequeue(void*arg)
{
    char buffer[1024];
    size_t size;
    while(1) {
        while(__ring_dequeue(ring, buffer, &size)) {
            struct msg *m = (struct msg *)buffer;
//            printf("dequeue: msg.magic = 0x%x, size = %ld\\n", m->magic, size);
        }
        __relax();
    }
}


int main()
{
    unsigned int ring_size = getpagesize()*100;
//    unsigned int ring_size = 128;
    ring = __ring_create(ring_size);

    pthread_t tasks[4];

    pthread_create(&tasks[0], NULL, dequeue, NULL);
    pthread_create(&tasks[1], NULL, enqueue, NULL);

    pthread_join(tasks[0], NULL);
    pthread_join(tasks[1], NULL);

    return 0;
}

 

以上是关于节点大小可变的环形队列实现的主要内容,如果未能解决你的问题,请参考以下文章

数据结构(10)---队列之环形队列

什么是环形队列,采用什么方法实现环形队列

栈和队列----向有序的环形单链表中插入新节点

java实现环形队列

java数据结构与算法:单向队列与环形队列详解(图片+代码)

柔性数组和环形队列之间的故事