Redis之跳表源码阅读

zhyjc6于2020-07-11发布 约9121字·约20分钟 本文总阅读量

前言

跳表是什么?计算机专业的同学毕业了也不一定听说过,因为课本上根本没有跳表。包括鼎鼎大名的《算法4》和《算法导论》都没有提及跳表这一数据结构。我也是在看面经的过程中发现的,我发现大多数关于跳表的问题都会涉及到 redis ,于是我就去找了本 《Redis设计与实现》,看看其中关于跳表的描述,差不多就知道了跳表是个什么玩意。然后再去Google看一些大牛的分析文章,自己再将其总结一遍。

定义

跳表本质上其实就是个优化后的双向链表,是可以进行二分查找的有序链表。

跳表是一个随机化的数据结构,可以被看做二叉树的一个变种,它在性能上和红黑树,AVL树不相上下,但是跳表的原理非常简单,目前在 Redis 和 LeveIDB 中都有用到。

复杂度

算法 平均 最差
空间 $O(N)$ $O(NlogN)$
搜索 $O(logN)$ $O(N)$
插入 $O(logN)$ $O(N)$
删除 $O(logN)$ $O(N)$

原理

既然跳表的本质是个双向链表,那么它是如何进行二分查找的呢?当然不是重载运算符加号 “+”,再一个一个加上去。那么一个链表要如何才能实现二分查找呢?这就是理解跳表的核心问题。

我们不妨从时间复杂度的角度逆向进行分析吧!跳表的搜索、插入和删除的平均时间复杂度都是 $O(logN)$ ,这个时间复杂度一般对应的就是二分法。如果我们实现了二分查找,那么搜索的时间复杂度就能满足。如果搜索的时间复杂度为 $O(logN)$ ,那么插入的时间复杂度就是 $O(logN)$ 的查找插入位置 + $O(1)$ 的插入,也就是 $O(logN)$ 的时间复杂度,同理删除也能达到 $O(logN)$ 的时间复杂度。

现在我们的问题就是跳表是如何实现二分查找的?

答案是多级指针。什么意思呢?就是一个跳表的结点里面有一个基础的前向指针和后向指针,但是该结点可能会存在多个多级前向指针指针指向前面不同距离的结点。如图:

搜索时先从头结点的最高级指针遍历,如果下一结点值大于目标值,那么就说明目标值在当前结点和下一节点之间。于是在该结点我们取低一级指针,继续向前遍历,同样找到第一个大于目标值的结点,再该结点前一个结点停下,继续取低一级指针,依次循环以上操作,直到最后取到最低一级指针,也就是最原始的指针(指针指向下一节点,不会跳过结点)。当到达最低一级指针时,我们已经离目标值很近了,因此只需要简单的向前遍历就好了。如图:

实现

我们就以 redis-6.0.5(当前最新稳定版) 中的实现为例,看一下跳表是如何实现的。

跳表结构

typedef char *sds; //sds.h

/* ZSETs use a specialized version of Skiplists */
  typedef struct zskiplistNode {
      sds ele; 
      double score;
      struct zskiplistNode *backward;                                                                                                
      struct zskiplistLevel {
          struct zskiplistNode *forward;
          unsigned long span;
      } level[];
  } zskiplistNode;
  
  typedef struct zskiplist {
      struct zskiplistNode *header, *tail;
      unsigned long length;
      int level;
  } zskiplist;

在文件 server.h 中我们可以看到跳表就是一个没有方法只有成员变量的结构体(C语言实现)。其中 zskiplistNode 是跳表结点,zskiplist 是跳表加上头尾结点将跳表结点包装起来的一个列表结构。

zskiplistNode

zskiplist

一个例子

带有不同层高的结点之间的指向关系

创建跳表

我们看到zslCreate函数就是创建并返回一个空的跳表,其中头结点被赋值,但尾结点被赋值为NULL。

  /* Create a new skiplist. */
  zskiplist *zslCreate(void) {
      int j;
      zskiplist *zsl;
  
      zsl = zmalloc(sizeof(*zsl));
      zsl->level = 1; 
      zsl->length = 0; 
      zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
      for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
          zsl->header->level[j].forward = NULL;
          zsl->header->level[j].span = 0; 
      }    
      zsl->header->backward = NULL;                                                                                                  
      zsl->tail = NULL;
      return zsl; 
  }

我们进一下zmalloc函数看看它都干了些什么。

//看一下zmalloc
static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom;
void *zmalloc(size_t size) {
    void *ptr = malloc(size+PREFIX_SIZE);

    if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
    update_zmalloc_stat_alloc(zmalloc_size(ptr));
    return ptr;
#else
    *((size_t*)ptr) = size;
    update_zmalloc_stat_alloc(size+PREFIX_SIZE);
    return (char*)ptr+PREFIX_SIZE;
#endif
}
//看一下zmalloc_default_oom
static void zmalloc_default_oom(size_t size) {
    fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
        size);
    fflush(stderr);
    abort();
}

再进一下zslCreateNode函数

  /* Create a skiplist node with the specified number of levels.
   * The SDS string 'ele' is referenced by the node after the call. */
  zskiplistNode *zslCreateNode(int level, double score, sds ele) {                                                                   
      zskiplistNode *zn =
          zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
      zn->score = score;
      zn->ele = ele;
      return zn;
  }

销毁跳表结点

/* Free the specified skiplist node. The referenced SDS string representation
   * of the element is freed too, unless node->ele is set to NULL before calling
   * this function. */
  void zslFreeNode(zskiplistNode *node) {
      sdsfree(node->ele);
      zfree(node);
  }
//跟进一下sdsfree函数
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
    if (s == NULL) return;
    s_free((char*)s-sdsHdrSize(s[-1]));
}
//再跟进一下s_free函数
#define s_free zfree
//竟然是个宏定义?算了继续跟进zfree函数
void zfree(void *ptr) {                                                                                                              
#ifndef HAVE_MALLOC_SIZE
    void *realptr;
    size_t oldsize;
#endif

    if (ptr == NULL) return;
#ifdef HAVE_MALLOC_SIZE
    update_zmalloc_stat_free(zmalloc_size(ptr));
    free(ptr);
#else     
    realptr = (char*)ptr-PREFIX_SIZE;
    oldsize = *((size_t*)realptr);
    update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
    free(realptr);
#endif
}
//不跟了,太多调用关系了

销毁跳表

  /* Free a whole skiplist. */                                                                                                       
  void zslFree(zskiplist *zsl) {
      zskiplistNode *node = zsl->header->level[0].forward, *next;
  
      zfree(zsl->header);
      while(node) {
          next = node->level[0].forward;
          zslFreeNode(node);
          node = next;
      }
      zfree(zsl);
  }
//可以看到先通过头结点得到第一个结点,然后头结点就没用了,直接销毁
//然后就是利用最低级指针遍历所有结点,
//通过销毁节点函数一一销毁
//最后是zfree掉 跳表这个变量

插入结点

变量

流程

首先查找要插入的位置,这个过程中记录下新插入结点每一级指针的的后向结点update[i],同时记录其排位rank[i]。方便后续更新。

调用随机函数取得新结点的指针层数,然后调用创建结点函数得到新结点。

最后将第一步查找得到的update和rank信息用上,更新新的指针指向和跨度。

/* Insert a new node in the skiplist. Assumes the element does not already
   * exist (up to the caller to enforce that). The skiplist takes ownership
   * of the passed SDS string 'ele'. */
  zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
      unsigned int rank[ZSKIPLIST_MAXLEVEL];
      int i, level;
  
      serverAssert(!isnan(score));
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
          /* store rank that is crossed to reach the insert position */
          rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
          while (x->level[i].forward &&
                  (x->level[i].forward->score < score ||
                      (x->level[i].forward->score == score &&
                      sdscmp(x->level[i].forward->ele,ele) < 0)))
          {
              rank[i] += x->level[i].span;
              x = x->level[i].forward;
          }
          update[i] = x;
      }
      /* we assume the element is not already inside, since we allow duplicated
       * scores, reinserting the same element should never happen since the
       * caller of zslInsert() should test in the hash table if the element is
       * already inside or not. */
      level = zslRandomLevel();
      if (level > zsl->level) {
          for (i = zsl->level; i < level; i++) {
              rank[i] = 0;
              update[i] = zsl->header;
              update[i]->level[i].span = zsl->length;
          }
          zsl->level = level;
      }
      x = zslCreateNode(level,score,ele);
      for (i = 0; i < level; i++) {
          x->level[i].forward = update[i]->level[i].forward;
          update[i]->level[i].forward = x;
  
          /* update span covered by update[i] as x is inserted here */
          x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
          update[i]->level[i].span = (rank[0] - rank[i]) + 1;
      }
  
      /* increment span for untouched levels */
      for (i = level; i < zsl->level; i++) {
          update[i]->level[i].span++;
      }
  
      x->backward = (update[0] == zsl->header) ? NULL : update[0];
      if (x->level[0].forward)
          x->level[0].forward->backward = x;
      else
          zsl->tail = x;
      zsl->length++;
      return x;
  }

这个随机函数有必要列出来一下

  /* Returns a random level for the new skiplist node we are going to create.
   * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
   * (both inclusive), with a powerlaw-alike distribution where higher
   * levels are less likely to be returned. */
  int zslRandomLevel(void) {
      int level = 1;
      while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
          level += 1;
      return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
  }

删除结点

变量

流程

首先查找待删除结点位置,并记录update结点数组。

如果找到待删除结点并且该结点的分数和内容都匹配,那么就将其删除,否则说明没找到,返回0。

  /* Delete an element with matching score/element from the skiplist.
   * The function returns 1 if the node was found and deleted, otherwise
   * 0 is returned.
   *
   * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
   * it is not freed (but just unlinked) and *node is set to the node pointer,
   * so that it is possible for the caller to reuse the node (including the
   * referenced SDS string at node->ele). */
  int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
      int i;
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
          while (x->level[i].forward &&
                  (x->level[i].forward->score < score ||
                      (x->level[i].forward->score == score &&
                       sdscmp(x->level[i].forward->ele,ele) < 0)))
          {
              x = x->level[i].forward;
          }
          update[i] = x;
      }
      /* We may have multiple elements with the same score, what we need
       * is to find the element with both the right score and object. */
      x = x->level[0].forward;
      if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
          zslDeleteNode(zsl, x, update);
          if (!node)
              zslFreeNode(x);
          else
              *node = x;
          return 1;
      }
      return 0; /* not found */
  }

//跟进zslDeleteNode函数
  /* Internal function used by zslDelete, zslDeleteRangeByScore and
   * zslDeleteRangeByRank. */
  void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
      int i;
      for (i = 0; i < zsl->level; i++) {
          if (update[i]->level[i].forward == x) {
              update[i]->level[i].span += x->level[i].span - 1;
              update[i]->level[i].forward = x->level[i].forward;
          } else {
              update[i]->level[i].span -= 1;
          }
      }
      if (x->level[0].forward) {
          x->level[0].forward->backward = x->backward;
      } else {
          zsl->tail = x->backward;
      }
      while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
          zsl->level--;
      zsl->length--;
  }

查找结点

通过排位查找

该方法的查找很简单,就是从最高层级向低级遍历,累加遇到的跨度,直到达到参数rank即可返回当前结点。否则返回null。

  /* Finds an element by its rank. The rank argument needs to be 1-based. */
  zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
      zskiplistNode *x;
      unsigned long traversed = 0;
      int i;
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
          while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
          {
              traversed += x->level[i].span;
              x = x->level[i].forward;
          }
          if (traversed == rank) {
              return x;
          }
      }
      return NULL;                                                                                                                   
  }

后记

这是第一次看redis源码,之前打算暑假看看redis源码,但这一看就看傻了,一个文件就有几千行代码,总共一百多文件,这谁顶得住啊!不看了,看看书了解一下就打住。跳表的部分也还有好大一部分没看,就先看这些吧!

参考资料