跳表与模型补偿实现
Posted 资质平庸的程序员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了跳表与模型补偿实现相关的知识,希望对你有一定的参考价值。
到了我这般码龄,再记录代码直接相关篇的文章是很难的,有很多难述清楚的原因。
可今天起的太早花了一部分时间做了其他事情,剩下的时间不足以再做点其他的事情,不如……
记录一篇?就选择跳表实现吧——记录跟实现顺序有差异。
1 跳表个人弱理
1.1 最大高度
/**
* [1] 跳表最大高度
* 设层级增长概率为 P ,
* 结点在层 L 的概率为 P^(L-1)
* 当跳表拥有 N 结点时
* 层 L 结点数的期望为 N * P^(L-1) 。
* 层 L 有结点需满足 N * P^(L-1) >= 1 即
* P^(L-1) >= 1/N --> (1/P)^(L-1) <= N
* --> L-1 <= log(1/P)(N)
* --> L <= log(1/P)(N) + 1
* 即理论上层
* Lx = log(1/P)(N) + 1 为跳表最高层,只 1 结点。
* -------------------------------------------
* 此时最高层 Lx 有 1 结点的概率为 P^(Lx-1) = 1/N。
* 即当跳表容纳兆量级数据时,最高层有结点的概率为
* 1/(2^20),即当数据量较大时最高层有结点概率太小,
* 几乎不可能发生。
*
* 所以最高层的选择可考虑小于 log(1/P)(N) + 1 的层次
* 设所选择最高层比理论最高层小 m 层即 Lm = Lx - m
* = log(1/P)(N) + 1 - m
* = log(1/P)(N) + log(1/P)(1/P) - log(1/P)((1/P)^m)
* = log(1/P)(N*P^(m-1))
* 即层 Lm 有 1 结点的概率 P^(Lm-1) = 1/(N*P^(m-1))
* 即层 Lm 结点数期望为 N * p^(Lm-1) = 1/(P^(m-1))
* 当 m = 1 时,层 Lm 结点数期望为 1
* m = 2 时,层 Lm 结点数期望为 1/P
* m = 3 时,层 Lm 结点数期望为 1/p^2
* 以此类推。
*
* “Skip Lists: ......”
* 推荐当 P = 2 时 m = 2,即推荐选择跳表最高层为 log(1/P)(N)。*/
1.2 额外空间消耗
/**
*[2] 跳表结点平均空间消耗
* 层级增长概率跟跳表结点平均空间消耗相关,结点平均层级
* 指针数期望为
* 1*(1-P) + 2*P*(1-P) + ...+ L*P^(L-1)*(1-P) + ... ∞
* 通过极限计算,上式约为 1/(1-P) 。
* 即 P=1/4,跳表结点的平均层级指针为 4/3 个。
*
* 我们可以根据数据量 N 及性能要求(搜索函数处分析)来选择相应的
* 层级增长概率 P,最大层数 Lm 。*/
综上1.1和1.2,1/2 和 64 可分别为跳表最大层级增长概率和最大适宜高度值,此时跳表结点均占 2 个额外层级指针空间。
1.3 搜索性能
/**
* 根据 “Skip Lists: ...” 一文方法反向分析查找性能。寻找结点 x
* 的过程约“最高层结点数 + C(k)”;C(k) 为 x 结点从 m 层到达最高
* 层 L 所路过结点数,k = L - m 。
* [1] 最高层结点数
* 跳表的较大概率的最大高度为 L = log(1/P)(N),
* L 层结点数的期望为 1/P 。
* [2] 从 x 所在层 m 到达最高层 L 将路过结点数
* K = 0, C(k) = 0 (1)
* K > 0, C(K) = (1-P)(1+C(K)) + P(1 + C(K-1)) (2)
* 即 (1-P)(向左走一结点 + 还需上升 K 层的结点数) +
* P(向上走一结点 + 还需上升 K-1 层的结点数)
*
* 由(1) (2) 可得 C(1)=1/P, C(2)=2/P,... 用数学归纳法假设
* C(K-1)=(K-1)/P 可得 C(K)=K/P,所以 C(K) = K/P 。
*
* 综上,搜索结点 x 的需遍历的结点数约为 C(K)+1/P,
* K 的级别大概率为 O(log(1/P)(N)),所以跳表搜索效率级别为
* O(log(1/P)(N)) 。*/
2 跳表相关类型定义
2.1 容纳 C 中基本及复合数据类型
typedef union sklData COMM_DATA_U;
union sklData
void *a;
double d;
int64_t s64;
uint64_t u64;
;
2.2 跳表结点类型
typedef struct sklNode SKL_NODE_S;
/**
* 跳表结点类型:
* d 数据;
* prev 前向结点地址;
* lvp[] level pointer 层级指针。*/
struct sklNode
union sklData d;
struct sklNode *prev;
struct sklNode *lvp[];
;
#define SKL_NEXT(x) ((x)->lvp[0])
2.3 跳表管理类型
typedef struct sklList SKL_S;
/**
* 跳表管理类型:
* app 任命参数;
* iter 用于第一层链表结点前/后向迭代遍历;
* cnt 当前结点数;
* ver 结点数变化次数;
* muse 内存使用量(字节);
* lvc level current 当前层数;
* seed rand_r(&seed) 用于随机数产生;
* head 头结点(末尾勿动)。*/
struct sklList
struct sklApp app;
struct sklNode *iter[2];
uint64_t cnt, ver, muse;
unsigned int seed;
signed int lvc;
struct sklNode head;
;
/* frontward, backward */
#define SKL_ITERF(s) ((s)->iter[0])
#define SKL_ITERB(s) ((s)->iter[1])
#define SKL_NODEV(s, x) ((x) == &(s)->head ? NULL : &(x)->d)
3 接口实现
3.1 原型
/**
* 外部参数任命(appoint)类型:
* uext 用户扩展,无则赋 NULL;
* drset 去赋值回调,无则赋 NULL;
* dset 赋值回调,无则赋 NULL;
* dcmp 值比较回调;
* step 1/step 为跳表结点增长概率
* lvm 跳表最大层数限制。*/
typedef struct sklApp
void *uext;
int (*drset)(struct sklData *d, const void *u);
int (*dset) (struct sklData *d, const void *u);
int (*dcmp) (struct sklData *d, const void *u);
int step, lvm;
SKL_APP_S;
SKL_S * sklAppoint(const SKL_APP_S *app);
void sklRemove(SKL_S *skl);
const COMM_DATA_U * sklPut(SKL_S *skl, const void *usr);
const COMM_DATA_U * sklGet(const SKL_S *skl, const void *usr);
const COMM_DATA_U * sklDel(SKL_S *skl, const void *usr);
const COMM_DATA_U * sklSeekLast(SKL_S *skl);
const COMM_DATA_U * sklSeekFirst(SKL_S *skl);
const COMM_DATA_U * sklSeek(SKL_S *skl, const void *usr);
const COMM_DATA_U * sklPrev(SKL_S *skl);
const COMM_DATA_U * sklNext(SKL_S *skl);
3.2 接口实现参考
初始化和销毁比较简单就不贴码了。
#define SKL_LVMAX (64)
#define SKL_LVSTEPMIN (2)
#define SKL_CNT(skl, lv, mb, op) \\
( \\
(skl)->ver += 1; \\
(skl)->cnt op##= 1; \\
(skl)->muse op##= mb; \\
(skl)->adj.lvn[lv-1] op##= 1; \\
)
/**
* 随机生成跳表结点,理论上单结点拥有 L
* 层的概率为(1/step)^(L-1) 。*/
static int sklRandLevel(SKL_S *skl)
int lv = 1;
int lvm = skl->app.lvm;
int step = skl->app.step;
unsigned int seed = skl->seed;
while (rand_r(&seed) % step == 0 && lv < lvm) ++lv;
skl->seed = seed;
return lv;
static SKL_NODE_S *
sklFindEqualOrLarge(const SKL_S *skl, const void *d)
int r = -1;
int i = skl->lvc;
SKL_VCMPF dcmp = skl->app.dcmp;
const SKL_NODE_S *n = &skl->head, *x;
while (--i >= 0)
x = n->lvp[i];
while (x != &skl->head && (r = dcmp(&x->d, d)) < 0)
n = x, x = x->lvp[i];
if (!r) break;
return (SKL_NODE_S *)x;
/**
* 在跳表 skl 中根据 skl->app.dcmp 寻找等于或恰大于(不存在等于)
* d 的结点,同时查找出指向等于或恰大于 d 结点的各层级指针以及
* d 结点层级(等于存在时)。
*
* 为减少 CPU 分支预测开销,与 sklFindEqualOrLarge 各成函数。*/
static SKL_NODE_S *
sklFindUpdateLevel(const SKL_S *skl, const void *d, SKL_NODE_S **u[], int *eqlv)
int i = skl->lvc;
int r = -1, eq = -1;
SKL_VCMPF dcmp = skl->app.dcmp;
const SKL_NODE_S *n = &skl->head, *x;
while (--i >= 0)
x = n->lvp[i];
while (x != &skl->head && (r = dcmp(&x->d, d)) < 0)
n = x, x = x->lvp[i];
u[i] = (SKL_NODE_S **)n->lvp + i;
if (eq < i && 0 == r) eq = i;
*eqlv = eq + 1;
return (SKL_NODE_S *)x;
inline static void sklIterSeek(SKL_S *skl, SKL_NODE_S *x)
SKL_ITERB(skl) = SKL_ITERF(skl) = x;
return ;
inline static void sklIterUpdate(SKL_S *skl, SKL_NODE_S *del)
if (SKL_ITERF(skl) == del) SKL_ITERF(skl) = del->prev;
if (SKL_ITERB(skl) == del) SKL_ITERB(skl) = SKL_NEXT(del);
return ;
#if 0 /* 外部接口 */
#endif
const COMM_DATA_U * sklPut(SKL_S *skl, const void *usr)
int i, lv;
size_t mb;
SKL_NODE_S *x, *in, *prev;
SKL_NODE_S **u[SKL_LVMAX];
x = sklFindUpdateLevel(skl, usr, u, &lv);
if (lv > 0) return &x->d;
lv = sklRandLevel(skl);
mb = sizeof(*in) + lv * sizeof(SKL_NODE_S *);
in = (SKL_NODE_S *) malloc(mb);
if (in == NULL) return NULL;
int ret = app->dset(&in->d, usr);
if (__glibc_unlikely(SKL_OK != ret))
free(in);
return NULL;
if (skl->lvc < lv)
SKL_NODE_S **lvp = skl->head.lvp;
for (i = skl->lvc; i < lv; ++i)
u[i] = lvp + i;
skl->lvc = lv;
prev = x->prev;
for (i = 0; i < lv; ++i)
in->lvp[i] = *u[i];
*u[i] = in;
in->prev = prev;
SKL_NEXT(in)->prev = in;
SKL_CNT(skl, lv, mb, +);
return &in->d;
const COMM_DATA_U * sklGet(const SKL_S *skl, const void *usr)
SKL_NODE_S *x;
x = sklFindEqualOrLarge(skl, usr);
return SKL_NODEV(skl, x);
const COMM_DATA_U * sklDel(SKL_S *skl, const void *usr)
SKL_NODE_S *x, *n;
SKL_NODE_S **u[SKL_LVMAX];
int i, lv, lvc = skl->lvc;
SKL_APP_S *app = &skl->app;
x = sklFindUpdateLevel(skl, usr, u, &lv);
if (lv <= 0) return SKL_NODEV(skl, x);
int ret = app->drset(&x->d, app->uext);
if (SKL_OK != ret) return NULL;
sklIterUpdate(skl, x);
for (i = 0; i < lv; ++i)
n = x->lvp[i];
if (n->lvp[i] == x)
--lvc;
*u[i] = n;
SKL_NEXT(x)->prev = x->prev;
free(x);
skl->lvc = CMAX(1, lvc);
SKL_CNT(skl, lv, sizeof(*x) + lv * sizeof(x), -);
return SKL_NODEV(skl, *u[0]);
const COMM_DATA_U * sklSeek(SKL_S *skl, const void *usr)
SKL_NODE_S *x;
x = sklFindEqualOrLarge(skl, usr);
sklIterSeek(skl, x);
return SKL_NODEV(skl, x);
const COMM_DATA_U * sklSeekFirst(SKL_S *skl)
SKL_NODE_S *x;
x = SKL_NEXT(&skl->head);
sklIterSeek(skl, x);
return SKL_NODEV(skl, x);
const COMM_DATA_U * sklSeekLast(SKL_S *skl)
SKL_NODE_S *x;
x = skl->head.prev;
sklIterSeek(skl, x);
return SKL_NODEV(skl, x);
const COMM_DATA_U * sklPrev(SKL_S *skl)
SKL_NODE_S *x = SKL_ITERF(skl);
SKL_ITERF(skl) = x->prev;
return SKL_NODEV(skl, x);
const COMM_DATA_U * sklNext(SKL_S *skl)
SKL_NODE_S *x = SKL_ITERB(skl);
SKL_ITERB(skl) = SKL_NEXT(x);
return SKL_NODEV(skl, x);
4 扩展:跳表模型补偿尝试
在实现跳表层级增长时,随跳表容纳量增加,伪随机数可能模拟不了理想跳表模型,可再内部为此作一些补偿尝试。
对于 C 伪随机函数 rand_r(),阀值 thr 取 1MiB-2MiB 时,当数据量上升到几十兆/百兆/GiB时搜索性能可提升一些。
/**
* 跳表模型补偿类型:
* lvn 各层级结点数;
* seed 层级随机种子;
* lvb 起始层级。*/
struct sklAdj
uint64_t lvn[SKL_LVMAX];
unsigned seed;
int lvb;
;
/**
* 外部参数任命(appoint)类型:
* uext 用户扩展,无则赋 NULL;
* drset 去赋值回调,无则赋 NULL;
* dset 赋值回调,无则赋 NULL;
* dcmp 值比较回调;
* adjthr 补偿(调整阈值);
* adjust 调整标识(0不调整);
* step 1/step 为跳表结点增长概率;
* lvm 跳表最大层数限制。*/
typedef struct sklApp
void *uext;
int (*drset)(struct sklData *d, const void *u);
int (*dset) (struct sklData *d, const void *u);
int (*dcmp) (struct sklData *d, const void *u);
uint64_t adjthr;
int adjust;
int step,lvm;
SKL_APP_S;
/**
* 根据跳表理论模型补偿结点层级,进一步提升跳表搜索性能。
* 得到补偿后,诸如 MiB/GiB 级可提升 2~3 倍。*/
inline static int sklLevelAdjust(SKL_S *skl, int lv)
int step = skl->app.step;
uint64_t cnt = skl->cnt + 1;
uint64_t *lvn = skl->adj.lvn;
/**
* 若当前层级数较少则放弃此次调整:
* lv 层结点数应为 lvn[lv-1] < cnt * (1/step)^(lv-1) */
if (lvn[lv-1] < cnt * (int)pow(1/(float)step, lv-1))
return lv;
/**
* 以伪随机数最大能匹配的模型层数做一次延伸补偿:
* 延伸补偿的最高层满足结点期望数 1/P 时停止补偿。*/
float theroy = log(cnt) / log(step);
int limit = (int)theroy < theroy ? (int)theroy + 1 : (int)theroy;
if (__glibc_likely(skl->adj.lvb != -1))
limit = CMIN(limit, skl->adj.lvb<<1);
/* 延伸补偿最高层结点数为 1/P,达此数则补偿完成。*/
if (lvn[limit-1] >= step) return lv;
if (__glibc_unlikely(skl->adj.lvb == -1))
/**
* 寻找补偿起始层;
* 该层作为延伸补偿层数的最大值*/
int lvb = limit - 1;
while (lvb >= 0 && 0 == lvn[lvb]) --lvb;
if (__glibc_unlikely(0 > lvb)) lvb = 0;
skl->adj.lvb = lvb;
skl->adj.seed = time(NULL) ^ lvb;
limit = CMIN(limit, skl->adj.lvb<<1);
/* 预防相同层级连续生成 */
int base = skl->adj.lvb;
if (base > limit - step) base = limit - step;
for(lv = 1; rand_r(&skl->adj.seed) % step == 0 && lv < limit; ++lv)
;/* none */
lv = base + CMAX(lv % (limit - base), 1) + 1;
return lv;
/**
* 随机生成跳表结点,理论上单结点拥有 L
* 层的概率为(1/step)^(L-1) 。*/
static int sklRandLevel(SKL_S *skl)
int lv = 1;
int lvm = skl->app.lvm;
int step = skl->app.step;
unsigned int seed = skl->seed;
while (rand_r(&seed) % step == 0 && lv < lvm) ++lv;
skl->seed = seed;
if (skl->app.adjust && (skl->cnt > skl->app.adjthr))
lv = sklLevelAdjust(skl, lv);
return lv;
以上是关于跳表与模型补偿实现的主要内容,如果未能解决你的问题,请参考以下文章