redis源码学习rax,我愿称之为“升级版字典树”
Posted 看,未来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码学习rax,我愿称之为“升级版字典树”相关的知识,希望对你有一定的参考价值。
rax VS trie
对于这个rax,那我们可更不陌生了,我觉得它就是把 trie 进行一个变种、压缩、强化。
关于 trie,字典树,咱也是手写过的:数据结构(12)-- 前缀树(字典树、Trie)
看一下前缀树和基数树的差别哈,以后有空了自己再手写一个基数树。
* 假设要存三个字符串:foo, footer, foobar
* 这是一个没有压缩的结构
*
*              (f) ""
*                \
*                (o) "f"
*                  \
*                  (o) "fo"
*                    \
*                  [t   b] "foo"
*                  /     \
*         "foot" (e)     (a) "foob"
*                /         \
*      "foote" (r)         (r) "fooba"
*              /             \
*    "footer" []             [] "foobar"
* 我们进行一下压缩:
*                  ["foo"] ""
*                     |
*                  [t   b] "foo"
*                  /     \
*        "foot" ("er")    ("ar") "foob"
*                 /          \
*       "footer" []          [] "foobar"
* redis中基本就是这个样子
* 如果我们再插入一个first:
*
*                    (f) ""
*                    /
*                 (i o) "f"
*                 /   \
*    "firs"  ("rst")  (o) "fo"
*              /        \
*    "first" []       [t   b] "foo"
*                     /     \
*           "foot" ("er")    ("ar") "foob"
*                    /          \
*          "footer" []          [] "foobar"
* 那就变成了这个样子
RAX不仅可以存储字符串,同时还可以为这个字符串设置一个值,也就是 key-value。
数据结构
/* Forward declaration: lets this snippet name raxNode before the node
 * layout itself has been declared. */
typedef struct raxNode raxNode;

/* A radix tree: the head node plus two bookkeeping counters. */
typedef struct rax {
    raxNode *head;     /* Pointer to the head node. */
    uint64_t numele;   /* Number of elements (i.e. number of keys). */
    uint64_t numnodes; /* Number of nodes. */
} rax;
/* A single node of the radix tree. The four bit-fields below add up to
 * 32 bits (1 + 1 + 1 + 29); everything else lives in the variable-length
 * 'data' area. */
typedef struct raxNode {
    uint32_t iskey:1;   /* Does this node contain a key? (1 bit) */
    uint32_t isnull:1;  /* Is the value associated with the key NULL? (1 bit) */
    uint32_t iscompr:1; /* Is this a compressed node? (1 bit) */
    uint32_t size:29;   /* Length of the compressed string if compressed,
                         * otherwise the number of children. (29 bits) */
    /* Data layout is as follows:
     *
     * If node is not compressed we have 'size' bytes, one for each children
     * character, and 'size' raxNode pointers, point to each child node.
     * Note how the character is not stored in the children but in the
     * edge of the parents:
     *
     * [header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
     *
     * if node is compressed (iscompr bit is 1) the node has 1 children.
     * In that case the 'size' bytes of the string stored immediately at
     * the start of the data section, represent a sequence of successive
     * nodes linked one after the other, for which only the last one in
     * the sequence is actually represented as a node, and pointed to by
     * the current compressed node.
     *
     * [header iscompr=1][xyz][z-ptr](value-ptr?)
     *
     * Both compressed and not compressed nodes can represent a key
     * with associated data in the radix tree at any level (not just terminal
     * nodes).
     *
     * If the node has an associated key (iskey=1) and is not NULL
     * (isnull=0), then after the raxNode pointers pointing to the
     * children, an additional value pointer is present (as you can see
     * in the representation above as "value-ptr" field).
     */
    unsigned char data[];
    /* 'data' holds: padding, the characters stored in this node, the
     * child node pointers, and the value pointer associated with the key. */
} raxNode;
为了实现Rax树的遍历,redis提供了 raxStack、raxIterator 两种结构。
/* Stack data structure used by raxLowWalk() in order to, optionally, return
* a list of parent nodes to the caller. The nodes do not have a "parent"
* field for space concerns, so we use the auxiliary stack when needed. */
#define RAX_STACK_STATIC_ITEMS 32
/* Auxiliary stack used to store the path from the root node to the
 * current node. */
typedef struct raxStack {
    void **stack; /* Points to static_items or an heap allocated array. */
    size_t items, maxitems; /* Number of items contained and total space. */
    /* Up to RAX_STACK_STATIC_ITEMS items we avoid to allocate on the heap
     * and use this static array of pointers instead. */
    void *static_items[RAX_STACK_STATIC_ITEMS]; /* Inline array of pointer slots. */
    int oom; /* True if pushing into this stack failed for OOM at some point. */
} raxStack;
/* Iterator used to visit all the keys stored in a rax tree. */
typedef struct raxIterator {
    int flags;          /* Iterator flags: see the RAX_ITER_* values below. */
    rax *rt;            /* Radix tree we are iterating. */
    unsigned char *key; /* The key the iteration is currently at. */
    void *data;         /* Data associated to this key. */
    size_t key_len;     /* Current key length. */
    size_t key_max;     /* Max key len the current key buffer can hold. */
    /* Inline key buffer; per the original note, larger keys are stored in
     * memory requested from the heap instead. */
    unsigned char key_static_string[RAX_ITER_STATIC_LEN];
    raxNode *node;      /* Current node. Only for unsafe iteration. */
    raxStack stack;     /* Stack used for unsafe iteration. */
    raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */
} raxIterator;
/* Iterator was just seeked. Return current element for the first iteration
 * and clear the flag. */
#define RAX_ITER_JUST_SEEKED (1 << 0)
/* End of iteration reached. */
#define RAX_ITER_EOF (1 << 1)
/* Safe iterator, allows operations while iterating. But it is slower. */
#define RAX_ITER_SAFE (1 << 2)
查找函数
这里可以稍微先瞄一眼,反正后面都是要去手写一下基数树的,先偷瞄一眼。
绕过重重包围,我们发现其实最终调用的是下面这个函数进行查找:
(不做解释,英文注释已经够清楚了)
/* Low level function that walks the tree looking for the string
* 's' of 'len' bytes. The function returns the number of characters
* of the key that was possible to process: if the returned integer
* is the same as 'len', then it means that the node corresponding to the
* string was found (however it may not be a key in case the node->iskey is
* zero or if simply we stopped in the middle of a compressed node, so that
* 'splitpos' is non zero).
*
* Otherwise if the returned integer is not the same as 'len', there was an
* early stop during the tree walk because of a character mismatch.
*
* The node where the search ended (because the full string was processed
* or because there was an early stop) is returned by reference as
* '*stopnode' if the passed pointer is not NULL. This node link in the
* parent's node is returned as '*plink' if not NULL. Finally, if the
* search stopped in a compressed node, '*splitpos' returns the index
* inside the compressed node where the search ended. This is useful to
* know where to split the node for insertion.
*
* Note that when we stop in the middle of a compressed node with
* a perfect match, this function will return a length equal to the
* 'len' argument (all the key matched), and will return a *splitpos which is
* always positive (that will represent the index of the character immediately
* *after* the last match in the current compressed node).
*
* When instead we stop at a compressed node and *splitpos is zero, it
* means that the current node represents the key (that is, none of the
* compressed node characters are needed to represent the key, just all
* its parents nodes). */
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {
    /* Walk down from the head following the bytes of 's'; returns how many
     * bytes of 's' were consumed before the walk stopped. */
    raxNode *h = rax->head;
    raxNode **parentlink = &rax->head;

    size_t i = 0; /* Position in the string. */
    size_t j = 0; /* Position in the node children (or bytes if compressed).*/
    while(h->size && i < len) {
        debugnode("Lookup current node",h);
        unsigned char *v = h->data;

        if (h->iscompr) {
            /* Compressed node: every stored byte must match in sequence. */
            for (j = 0; j < h->size && i < len; j++, i++) {
                if (v[j] != s[i]) break;
            }
            /* Stopped before the end of the node (mismatch or key exhausted). */
            if (j != h->size) break;
        } else {
            /* Even when h->size is large, linear scan provides good
             * performances compared to other approaches that are in theory
             * more sounding, like performing a binary search. */
            for (j = 0; j < h->size; j++) {
                if (v[j] == s[i]) break;
            }
            /* No child edge carries the current byte. */
            if (j == h->size) break;
            i++;
        }

        if (ts) raxStackPush(ts,h); /* Save stack of parent nodes. */
        raxNode **children = raxNodeFirstChildPtr(h);
        if (h->iscompr) j = 0; /* Compressed node only child is at index 0. */
        /* Child pointers are read with memcpy rather than dereferenced. */
        memcpy(&h,children+j,sizeof(h));
        parentlink = children+j;
        j = 0; /* If the new node is non compressed and we do not
                  iterate again (since i == len) set the split
                  position to 0 to signal this node represents
                  the searched key. */
    }
    debugnode("Lookup stop node is",h);
    /* Every output parameter is optional; *splitpos is only written when
     * the walk ended on a compressed node. */
    if (stopnode) *stopnode = h;
    if (plink) *plink = parentlink;
    if (splitpos && h->iscompr) *splitpos = j;
    return i;
}
插入元素
我们发现真正插入的函数是这个(有点长哈)
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)
size_t i;
int j = 0; /* Split position. If raxLowWalk() stops in a compressed
node, the index 'j' represents the char we stopped within the
compressed node, that is, the position where to split the
node for insertion. */
raxNode *h, **parentlink;
debugf("### Insert %.*s with value %p\\n", (int)len, s, data);
i = raxLowWalk(rax,s,len,&h,&parentlink,&j,NULL);
/* If i == len we walked following the whole string. If we are not
* in the middle of a compressed node, the string is either already
* inserted or this middle node is currently not a key, but can represent
* our key. We have just to reallocate the node and make space for the
* data pointer. */
if (i == len && (!h->iscompr || j == 0 /* not in the middle if j is 0 */))
debugf("### Insert: node representing key exists\\n");
/* Make space for the value pointer if needed. */
if (!h->iskey || (h->isnull && overwrite))
h = raxReallocForData(h,data);
if (h) memcpy(parentlink,&h,sizeof(h));
if (h == NULL)
errno = ENOMEM;
return 0;
/* Update the existing key if there is already one. */
if (h->iskey)
if (old) *old = raxGetData(h);
if (overwrite) raxSetData(h,data);
errno = 0;
return 0; /* Element already exists. */
/* Otherwise set the node as a key. Note that raxSetData()
* will set h->iskey. */
raxSetData(h,data);
rax->numele++;
return 1; /* Element inserted. */
/* If the node we stopped at is a compressed node, we need to
* split it before to continue.
*
* Splitting a compressed node have a few possible cases.
* Imagine that the node 'h' we are currently at is a compressed
* node containing the string "ANNIBALE" (it means that it represents
* nodes A -> N -> N -> I -> B -> A -> L -> E with the only child
* pointer of this node pointing at the 'E' node, because remember that
* we have characters at the edges of the graph, not inside the nodes
* themselves.
*
* In order to show a real case imagine our node to also point to
* another compressed node, that finally points at the node without
* children, representing 'O':
*
* "ANNIBALE" -> "SCO" -> []
*
* When inserting we may face the following cases. Note that all the cases
* require the insertion of a non compressed node with exactly two
* children, except for the last case which just requires splitting a
* compressed node.
*
* 1) Inserting "ANNIENTARE"
*
* |B| -> "ALE" -> "SCO" -> []
* "ANNI" -> |-|
* |E| -> (... continue algo ...) "NTARE" -> []
*
* 2) Inserting "ANNIBALI"
*
* |E| -> "SCO" -> []
* "ANNIBAL" -> |-|
* |I| -> (... continue algo ...) []
*
* 3) Inserting "AGO" (Like case 1, but set iscompr = 0 into original node)
*
* |N| -> "NIBALE" -> "SCO" -> []
* |A| -> |-|
* |G| -> (... continue algo ...) |O| -> []
*
* 4) Inserting "CIAO"
*
* |A| -> "NNIBALE" -> "SCO" -> []
* |-|
* |C| -> (... continue algo ...) "IAO" -> []
*
* 5) Inserting "ANNI"
*
* "ANNI" -> "BALE" -> "SCO" -> []
*
* The final algorithm for insertion covering all the above cases is as
* follows.
*
* ============================= ALGO 1 =============================
*
* For the above cases 1 to 4, that is, all cases where we stopped in
* the middle of a compressed node for a character mismatch, do:
*
* Let $SPLITPOS be the zero-based index at which, in the
* compressed node array of characters, we found the mismatching
* character. For example if the node contains "ANNIBALE" and we add
* "ANNIENTARE" the $SPLITPOS is 4, that is, the index at which the
* mismatching character is found.
*
* 1. Save the current compressed node $NEXT pointer (the pointer to the
* child element, that is always present in compressed nodes).
*
* 2. Create "split node" having as child the non common letter
* at the compressed node. The other non common letter (at the key)
* will be added later as we continue the normal insertion algorithm
* at step "6".
*
* 3a. IF $SPLITPOS == 0:
* Replace the old node with the split node, by copying the auxiliary
* data if any. Fix parent's reference. Free old node eventually
* (we still need its data for the next steps of the algorithm).
*
* 3b. IF $SPLITPOS != 0:
* Trim the compressed node (reallocating it as well) in order to
* contain $splitpos characters. Change child pointer in order to link
* to the split node. If new compressed node len is just 1, set
* iscompr to 0 (layout is the same). Fix parent's reference.
*
* 4a. IF the postfix len (the length of the remaining string of the
* original compressed node after the split character) is non zero,
* create a "postfix node". If the postfix node has just one character
* set iscompr to 0, otherwise iscompr to 1. Set the postfix node
* child pointer to $NEXT.
*
* 4b. IF the postfix len is zero, just use $NEXT as postfix pointer.
*
* 5. Set child[0] of split node to postfix node.
*
* 6. Set the split node as the current node, set current index at child[1]
* and continue insertion algorithm as usually.
*
* ============================= ALGO 2 =============================
*
* For case 5, that is, if we stopped in the middle of a compressed
* node but no mismatch was found, do:
*
* Let $SPLITPOS be the zero-based index at which, in the
* compressed node array of characters, we stopped iterating because
* there were no more keys character to match. So in the example of
* the node "ANNIBALE", adding the string "ANNI", the $SPLITPOS is 4.
*
* 1. Save the current compressed node $NEXT pointer (the pointer to the
* child element, that is always present in compressed nodes).
*
* 2. Create a "postfix node" containing all the characters from $SPLITPOS
* to the end. Use $NEXT as the postfix node child pointer.
* If the postfix node length is 1, set iscompr to 0.
* Set the node as a key with the associated value of the new
* inserted key.
*
* 3. Trim the current node to contain the first $SPLITPOS characters.
* As usually if the new node length is just 1, set iscompr to 0.
* Take the iskey / associated value as it was in the original node.
* Fix the parent's reference.
*
* 4. Set the postfix node as the only child pointer of the trimmed
* node created at step 3.
*/
/* ------------------------- ALGORITHM 1 --------------------------- */
if (h->iscompr && i != len)
debugf("ALGO 1: Stopped at compressed node %.*s (%p)\\n",
h->size, h->data, (void*)h);
debugf("Still to insert: %.*s\\n", (int)(len-i), s+i);
debugf("Splitting at %d: '%c'\\n", j, ((char*)h->data)[j]);
debugf("Other (key) letter is '%c'\\n", s[i]);
/* 1: Save next pointer. */
raxNode **childfield = raxNodeLastChildPtr(h);
raxNode *next;
memcpy(&next,childfield,sizeof(next));
debugf("Next is %p\\n", (void*)next);
debugf("iskey %d\\n", h->iskey);
if (h->iskey)
debugf("key value is %p\\n", raxGetData(h));
/* Set the length of the additional nodes we will need. */
size_t trimmedlen = j;
size_t postfixlen = h->size - j - 1;
int split_node_is_key = !trimmedlen && h->iskey && !h->isnull;
size_t nodesize;
/* 2: Create the split node. Also allocate the other nodes we'll need
* ASAP, so that it will be simpler to handle OOM. */
raxNode *splitnode = raxNewNode(1, split_node_is_key);
raxNode *trimmed = NULL;
raxNode *postfix = NULL;
if (trimmedlen)
nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
sizeof(raxNode*);
if (h->iskey && !h->isnull) nodesize += sizeof(void*);
trimmed = rax_malloc(nodesize);
if (postfixlen)
nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
sizeof(raxNode*);
postfix = rax_malloc(nodesize);
/* OOM? Abort now that the tree is untouched. */
if (splitnode == NULL ||
(trimmedlen && trimmed == NULL) ||
(postfixlen && postfix == NULL))
rax_free(splitnode);
rax_free(trimmed);
rax_free(postfix);
errno = ENOMEM;
return 0;
splitnode->data[0] = h->data[j];
if (j == 0)
/* 3a: Replace the old node with the split node. */
if (h->iskey)
void *ndata = raxGetData(h);
raxSetData(splitnode,ndata);
memcpy(parentlink,&splitnode,sizeof(splitnode));
else
/* 3b: Trim the compressed node. */
trimmed->size = j;
memcpy(trimmed->data,h->data,j);
trimmed->iscompr = j > 1 ? 1 : 0;
trimmed->iskey = h->iskey;
trimmed->isnull = h->isnull;
if (h->iskey && !h->isnull)
void *ndata = raxGetData(h);
raxSetData(trimmed,ndata);
raxNode **cp = raxNodeLastChildPtr(trimmed);
memcpy(cp,&splitnode,sizeof(splitnode));
memcpy(parentlink,&trimmed,sizeof(trimmed));
parentlink = cp; /* Set parentlink to splitnode parent. */
rax->numnodes++;
/* 4: Create the postfix node: what remains of the original
* compressed node after the split. */
if (postfixlen)
/* 4a: create a postfix node. */
postfix->iskey = 0;
postfix->isnull = 0;
postfix->size = postfixlen;
postfix->iscompr = postfixlen > 1;
memcpy(postfix->data,h->data+j+1,postfixlen);
raxNode **cp = raxNodeLastChildPtr(postfix);
memcpy(cp,&next,sizeof(next));
rax->numnodes++;
else
/* 4b: just use next as postfix node. */
postfix = next;
/* ……(原文代码在此处被截断)…… */
以上是关于redis源码学习rax,我愿称之为“升级版字典树”的主要内容,如果未能解决你的问题,请参考以下文章
redis源码学习紧凑列表 listpack,t_hash的御用底层结构