跳表实现弱试

Posted 比较普通的程序员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了跳表实现弱试相关的知识,希望对你有一定的参考价值。

跳表弱试

到了我这般码龄,再记录代码直接相关篇的文章是很难的,有很多难述清楚的原因。
可今天起的太早花了一部分时间做了其他事情,剩下的时间不足以再做点其他的事情,不如……
记录一篇?就选择跳表实现吧——记录跟实现顺序有差异。

1 跳表个人弱理

1.1 最大高度

/**
 * [1] 跳表最大高度
 * 设层级增长概率为 P ,
 * 结点在层 L 的概率为 P^(L-1)
 * 当跳表拥有 N 结点时
 * 层 L 结点数的期望为 N * P^(L-1) 。
 * 层 L 有结点需满足 N * P^(L-1) >= 1 即
 * P^(L-1) >= 1/N --> (1/P)^(L-1) <= N
 *                --> L-1 <= log(1/P)(N)
 *                --> L   <= log(1/P)(N) + 1
 * 即理论上层
 * Lx = log(1/P)(N) + 1 为跳表最高层,只 1 结点。
 * -------------------------------------------
 * 此时最高层 Lx 有 1 结点的概率为 P^(Lx-1) = 1/N。
 * 即当跳表容纳兆量级数据时,最高层有结点的概率为
 * 1/(2^20),即当数据量较大时最高层有结点概率太小,
 * 几乎不可能发生。
 * 
 * 所以最高层的选择可考虑小于 log(1/P)(N) + 1 的层次
 * 设所选择最高层比理论最高层小 m 层即 Lm = Lx - m
 *  = log(1/P)(N) + 1 - m
 *  = log(1/P)(N) + log(1/P)(1/P) - log(1/P)((1/P)^m)
 *  = log(1/P)(N*P^(m-1))
 * 即层 Lm 有 1 结点的概率 P^(Lm-1)  = 1/(N*P^(m-1))
 * 即层 Lm 结点数期望为 N * p^(Lm-1) = 1/(P^(m-1))
 * 当 m = 1 时,层 Lm 结点数期望为 1
 *    m = 2 时,层 Lm 结点数期望为 1/P
 *    m = 3 时,层 Lm 结点数期望为 1/p^2
 * 以此类推。
 * 
 * “Skip Lists: ......”
 * 推荐当 P = 2 时 m = 2,即推荐选择跳表最高层为 log(1/P)(N)。*/

1.2 额外空间消耗

/**
 *[2] 跳表结点平均空间消耗
 * 层级增长概率跟跳表结点平均空间消耗相关,结点平均层级
 * 指针数期望为
 * 1*(1-P) + 2*P*(1-P) + ...+ L*P^(L-1)*(1-P) + ... ∞
 * 通过极限计算,上式约为 1/(1-P) 。
 * 即 P=1/4,跳表结点的平均层级指针为 4/3 个。
 * 
 * 我们可以根据数据量 N 及性能要求(搜索函数处分析)来选择相应的
 * 层级增长概率 P,最大层数 Lm 。*/

综上1.1和1.2,1/2 和 64 可分别为跳表最大层级增长概率和最大适宜高度值,此时跳表结点均占 2 个额外层级指针空间。

1.3 搜索性能

/**
 * 根据 “Skip Lists: ...” 一文方法反向分析查找性能。寻找结点 x 
 * 的过程约“最高层结点数 + C(k)”;C(k) 为  x 结点从 m 层到达最高
 * 层 L 所路过结点数,k = L - m 。
 * [1] 最高层结点数
 *   跳表的较大概率的最大高度为 L = log(1/P)(N),
 *   L 层结点数的期望为 1/P 。
 * [2] 从 x 所在层 m 到达最高层 L 将路过结点数
 *   K = 0, C(k) = 0                                (1)
 *   K > 0, C(K) = (1-P)(1+C(K)) + P(1 + C(K-1))    (2)
 *   即 (1-P)(向左走一结点 + 还需上升 K 层的结点数) + 
 *       P(向上走一结点 + 还需上升 K-1 层的结点数) 
 * 
 * 由(1) (2) 可得 C(1)=1/P, C(2)=2/P,... 用数学归纳法假设
 * C(K-1)=(K-1)/P 可得 C(K)=K/P,所以 C(K) = K/P 。
 * 
 * 综上,搜索结点 x 的需遍历的结点数约为 C(K)+1/P,
 * K 的级别大概率为 O(log(1/P)(N)),所以跳表搜索效率级别为
 * O(log(1/P)(N)) 。*/

2 跳表相关类型定义

2.1 容纳 C 中基本及复合数据类型

typedef union sklData COMM_DATA_U;
union sklData

    void    *a;
    double   d;
    int64_t  s64;
    uint64_t u64;
;

2.2 跳表结点类型

typedef struct sklNode SKL_NODE_S;
/**
 * 跳表结点类型:
 * d     数据;
 * prev  前向结点地址;
 * lvp[] level pointer 层级指针。*/
struct sklNode

    union sklData    d;
    struct sklNode  *prev;
    struct sklNode  *lvp[];
;
#define SKL_NEXT(x) ((x)->lvp[0])

2.3 跳表管理类型

typedef struct sklList SKL_S;
/**
 * 跳表管理类型:
 * app  任命参数;
 * iter 用于第一层链表结点前/后向迭代遍历;
 * cnt  当前结点数;
 * ver  结点数变化次数;
 * muse 内存使用量(字节);
 * lvc  level current 当前层数;
 * seed rand_r(&seed) 用于随机数产生;
 * head 头结点(末尾勿动)。*/
struct sklList

    struct sklApp   app;
    struct sklNode *iter[2];
    uint64_t cnt, ver, muse;
    unsigned int seed;
    signed   int lvc;
    struct sklNode head;
;
/* frontward, backward */
#define SKL_ITERF(s)    ((s)->iter[0])
#define SKL_ITERB(s)    ((s)->iter[1])
#define SKL_NODEV(s, x) ((x) == &(s)->head ? NULL : &(x)->d)

3 接口实现

3.1 原型

/**
 * 外部参数任命(appoint)类型:
 * uext   用户扩展,无则赋 NULL;
 * drset  去赋值回调,无则赋 NULL;
 * dset   赋值回调,无则赋 NULL;
 * dcmp   值比较回调;
 * step   1/step 为跳表结点增长概率
 * lvm    跳表最大层数限制。*/
typedef struct sklApp

    void *uext;
    int (*drset)(struct sklData *d, const void *u);
    int (*dset) (struct sklData *d, const void *u);
    int (*dcmp) (struct sklData *d, const void *u);
    int step, lvm;
 SKL_APP_S;

SKL_S * sklAppoint(const SKL_APP_S *app);
void sklRemove(SKL_S *skl);

const COMM_DATA_U * sklPut(SKL_S *skl, const void *usr);
const COMM_DATA_U * sklGet(const SKL_S *skl, const void *usr);
const COMM_DATA_U * sklDel(SKL_S *skl, const void *usr);

const COMM_DATA_U * sklSeekLast(SKL_S *skl);
const COMM_DATA_U * sklSeekFirst(SKL_S *skl);
const COMM_DATA_U * sklSeek(SKL_S *skl, const void *usr);
const COMM_DATA_U * sklPrev(SKL_S *skl);
const COMM_DATA_U * sklNext(SKL_S *skl);

3.2 接口实现参考

初始化和销毁比较简单就不贴码了。

#define SKL_LVMAX       (64)
#define SKL_LVSTEPMIN   (2)
#define SKL_CNT(skl, lv, mb, op)    \\
(                                  \\
    (skl)->ver  += 1;               \\
    (skl)->cnt  op##= 1;            \\
    (skl)->muse op##= mb;           \\
    (skl)->adj.lvn[lv-1] op##= 1;   \\
)

/**
 * 随机生成跳表结点,理论上单结点拥有 L 
 * 层的概率为(1/step)^(L-1) 。*/
static int sklRandLevel(SKL_S *skl)

    int lv   = 1;
    int lvm  = skl->app.lvm;
    int step = skl->app.step;
    unsigned int seed = skl->seed;

    while (rand_r(&seed) % step == 0 && lv < lvm) ++lv;
    skl->seed = seed;
    return lv;


static SKL_NODE_S * 
sklFindEqualOrLarge(const SKL_S *skl, const void *d)

    int r = -1;
    int i = skl->lvc;
    SKL_VCMPF dcmp = skl->app.dcmp;
    const SKL_NODE_S *n = &skl->head, *x;

    while (--i >= 0) 
        x = n->lvp[i];
        while (x != &skl->head && (r = dcmp(&x->d, d)) < 0)
            n = x, x = x->lvp[i];
        if (!r) break;
    
    return (SKL_NODE_S *)x;


/**
 * 在跳表 skl 中根据 skl->app.dcmp 寻找等于或恰大于(不存在等于)
 * d 的结点,同时查找出指向等于或恰大于 d 结点的各层级指针以及
 * d 结点层级(等于存在时)。
 * 
 * 为减少 CPU 分支预测开销,与 sklFindEqualOrLarge 各成函数。*/
static SKL_NODE_S *
sklFindUpdateLevel(const SKL_S *skl, const void *d, SKL_NODE_S **u[], int *eqlv)

    int i = skl->lvc;
    int r = -1, eq = -1;
    SKL_VCMPF dcmp = skl->app.dcmp;
    const SKL_NODE_S *n = &skl->head, *x;

    while (--i >= 0) 
        x = n->lvp[i];
        while (x != &skl->head && (r = dcmp(&x->d, d)) < 0)
            n = x, x = x->lvp[i];
        u[i] = (SKL_NODE_S **)n->lvp + i;
        if (eq < i && 0 == r) eq = i;
    
    *eqlv = eq + 1;
    return (SKL_NODE_S *)x;


inline static void sklIterSeek(SKL_S *skl, SKL_NODE_S *x)

    SKL_ITERB(skl) = SKL_ITERF(skl) = x;
    return ;


inline static void sklIterUpdate(SKL_S *skl, SKL_NODE_S *del)

    if (SKL_ITERF(skl) == del) SKL_ITERF(skl) = del->prev;
    if (SKL_ITERB(skl) == del) SKL_ITERB(skl) = SKL_NEXT(del);
    return ;


#if 0 /* 外部接口 */
#endif
const COMM_DATA_U * sklPut(SKL_S *skl, const void *usr)

    int i, lv;
    size_t mb;
    SKL_NODE_S *x, *in, *prev;
    SKL_NODE_S **u[SKL_LVMAX];

    x = sklFindUpdateLevel(skl, usr, u, &lv);
    if (lv > 0) return &x->d;

    lv = sklRandLevel(skl);
    mb = sizeof(*in) + lv * sizeof(SKL_NODE_S *);
    in = (SKL_NODE_S *) malloc(mb);
    if (in == NULL) return NULL;

    int ret = app->dset(&in->d, usr);
    if (__glibc_unlikely(SKL_OK != ret)) 
        free(in);
        return NULL;
    

    if (skl->lvc < lv) 
        SKL_NODE_S **lvp = skl->head.lvp;
        for (i = skl->lvc; i < lv; ++i)
            u[i] = lvp + i;
        skl->lvc = lv;
    
    prev = x->prev;
    for (i = 0; i < lv; ++i) 
        in->lvp[i] = *u[i];
        *u[i] = in;
    
    in->prev = prev;
    SKL_NEXT(in)->prev = in;

    SKL_CNT(skl, lv, mb, +);
    return &in->d;


const COMM_DATA_U * sklGet(const SKL_S *skl, const void *usr)

    SKL_NODE_S *x;

    x = sklFindEqualOrLarge(skl, usr);
    return SKL_NODEV(skl, x);


const COMM_DATA_U * sklDel(SKL_S *skl, const void *usr)

    SKL_NODE_S *x, *n;
    SKL_NODE_S **u[SKL_LVMAX];
    int i, lv, lvc = skl->lvc;
    SKL_APP_S *app = &skl->app;

    x = sklFindUpdateLevel(skl, usr, u, &lv);
    if (lv <= 0) return SKL_NODEV(skl, x);

    int ret = app->drset(&x->d, app->uext);
    if (SKL_OK != ret) return NULL;

    sklIterUpdate(skl, x);
    for (i = 0; i < lv; ++i) 
        n = x->lvp[i];
        if (n->lvp[i] == x)
            --lvc;
        *u[i] = n;
    
    SKL_NEXT(x)->prev = x->prev;
    free(x);

    skl->lvc = CMAX(1, lvc);
    SKL_CNT(skl, lv, sizeof(*x) + lv * sizeof(x), -);
    return SKL_NODEV(skl, *u[0]);


const COMM_DATA_U * sklSeek(SKL_S *skl, const void *usr)

    SKL_NODE_S *x;

    x = sklFindEqualOrLarge(skl, usr);
    sklIterSeek(skl, x);
    return SKL_NODEV(skl, x);


const COMM_DATA_U * sklSeekFirst(SKL_S *skl)

    SKL_NODE_S *x;

    x = SKL_NEXT(&skl->head);
    sklIterSeek(skl, x);
    return SKL_NODEV(skl, x);


const COMM_DATA_U * sklSeekLast(SKL_S *skl)

    SKL_NODE_S *x;

    x = skl->head.prev;
    sklIterSeek(skl, x);
    return SKL_NODEV(skl, x);


const COMM_DATA_U * sklPrev(SKL_S *skl)

    SKL_NODE_S *x = SKL_ITERF(skl);
    SKL_ITERF(skl) = x->prev;
    return SKL_NODEV(skl, x);


const COMM_DATA_U * sklNext(SKL_S *skl)

    SKL_NODE_S *x = SKL_ITERB(skl);
    SKL_ITERB(skl) = SKL_NEXT(x);
    return SKL_NODEV(skl, x);

4 扩展:跳表模型补偿尝试

在实现跳表层级增长时,随跳表容纳量增加,伪随机数可能模拟不了理想跳表模型,可再内部为此作一些补偿尝试。
对于 C 伪随机函数 rand_r(),阀值 thr 取 1MiB-2MiB 时,当数据量上升到几十兆/百兆/GiB时搜索性能可提升一些。

/**
 * 跳表模型补偿类型:
 * lvn  各层级结点数;
 * seed 层级随机种子;
 * lvb  起始层级。*/
struct sklAdj

    uint64_t lvn[SKL_LVMAX];
    unsigned seed;
    int lvb;
;

/**
 * 外部参数任命(appoint)类型:
 * uext   用户扩展,无则赋 NULL;
 * drset  去赋值回调,无则赋 NULL;
 * dset   赋值回调,无则赋 NULL;
 * dcmp   值比较回调;
 * adjthr 补偿(调整阈值);
 * adjust 调整标识(0不调整);
 * step   1/step 为跳表结点增长概率;
 * lvm    跳表最大层数限制。*/
typedef struct sklApp

    void *uext;
    int (*drset)(struct sklData *d, const void *u);
    int (*dset) (struct sklData *d, const void *u);
    int (*dcmp) (struct sklData *d, const void *u);
    uint64_t adjthr;
    int adjust;
    int step,lvm;
 SKL_APP_S;

/**
 * 根据跳表理论模型补偿结点层级,进一步提升跳表搜索性能。
 * 得到补偿后,诸如 MiB/GiB 级可提升 2~3 倍。*/
inline static int sklLevelAdjust(SKL_S *skl, int lv)

    int step = skl->app.step;
    uint64_t cnt = skl->cnt + 1;
    uint64_t *lvn = skl->adj.lvn;

    /**
     * 若当前层级数较少则放弃此次调整:
     * lv 层结点数应为 lvn[lv-1] < cnt * (1/step)^(lv-1) */
    if (lvn[lv-1] < cnt * (int)pow(1/(float)step, lv-1))
        return lv;

    /**
     * 以伪随机数最大能匹配的模型层数做一次延伸补偿:
     * 延伸补偿的最高层满足结点期望数 1/P 时停止补偿。*/
    float theroy = log(cnt) / log(step);
    int limit = (int)theroy < theroy ? (int)theroy + 1 : (int)theroy;
    if (__glibc_likely(skl->adj.lvb != -1))
        limit = CMIN(limit, skl->adj.lvb<<1);

    /* 延伸补偿最高层结点数为 1/P,达此数则补偿完成。*/
    if (lvn[limit-1] >= step) return lv;

    if (__glibc_unlikely(skl->adj.lvb == -1)) 
        /**
         * 寻找补偿起始层;
         * 该层作为延伸补偿层数的最大值*/
        int lvb = limit - 1;
        while (lvb >= 0 && 0 == lvn[lvb]) --lvb;
        if (__glibc_unlikely(0 > lvb)) lvb = 0;

        skl->adj.lvb  = lvb;
        skl->adj.seed = time(NULL) ^ lvb;
        limit = CMIN(limit, skl->adj.lvb<<1);
    

    /* 预防相同层级连续生成 */
    int base = skl->adj.lvb;
    if (base > limit - step) base = limit - step;
    for(lv = 1; rand_r(&skl->adj.seed) % step == 0 && lv < limit; ++lv)
        ;/* none */

    lv = base + CMAX(lv % (limit - base), 1) + 1;
    return lv;


/**
 * 随机生成跳表结点,理论上单结点拥有 L 
 * 层的概率为(1/step)^(L-1) 。*/
static int sklRandLevel(SKL_S *skl)

    int lv   = 1;
    int lvm  = skl->app.lvm;
    int step = skl->app.step;
    unsigned int seed = skl->seed;

    while (rand_r(&seed) % step == 0 && lv < lvm) ++lv;
    skl->seed = seed;

    if (skl->app.adjust && (skl->cnt > skl->app.adjthr))
        lv = sklLevelAdjust(skl, lv);
    return lv;

以上是关于跳表实现弱试的主要内容,如果未能解决你的问题,请参考以下文章

跳表与模型补偿实现

跳表与模型补偿实现

学习笔记Redis中有序集合zset的实现原理——跳表

参考Redis源码实现不带span的简单跳表

参考Redis源码实现不带span的简单跳表

面试官:为何Redis使用跳表而非红黑树实现SortedSet?