理解 Memcached 源码 - LRU II

多半情况下,LRU 会和 哈希表 一起使用,然后我们把这个组合称为 LRU-cache

LRU-cache 中, 哈希表 提供快速索引的功能; 而 LRU 则是将 最近最少使用 (least recently used) 的元素在适当的时机标记为过期,并予以删除,用来防止缓存无限增大。本篇我们主要看 哈希表 的实现。

概述 (课本都讲过了, 跳过

哈希表 本质上是一块 固定大小 的数组。数组的索引是由哈希生成的整形。在 哈希表 中,这个数组的元素也被称作 ,以区别于 哈希表 本身的元素,item。当然,如果生成的索引超过 的数量,则取模之。如果两个 键值哈希 成同一个整形索引值,或者两个索引值被取模后的值刚好相同,则 冲突 发生。 所以一般做法是在每个 上建立一个链表来存储所有 冲突 于此的键值对。

需要提一下,冲突 会降低 哈希表 的查找速度,原因很简单,因为要遍历链表 O(n)。处理方法也很简单,把数组大小扩容,然后重新对所有的元素进行 重新哈希,因为 的数量增加了,冲突 也就减少了,而由 冲突 所形成的链表也就变少或变短了。 当然,重新哈希 的代价很大,所以一般是在性能实际下降时严重时才会发起。具体过程很快会就会讲到。

模块初始化

第一个方法

hash_init

用来确定 哈希算法

int hash_init(enum hashfunc_type type) {
switch(type) {
case JENKINS_HASH:
hash = jenkins_hash;
settings.hash_algorithm = "jenkins";
break;
case MURMUR3_HASH:
hash = MurmurHash3_x86_32;
settings.hash_algorithm = "murmur3";
break;
default:
return -1;
}
return 0;
}
hash_init@hash.c

这个方法在 初始化过程中 被调用。


...
if (hash_init(hash_type) != 0) {
fprintf(stderr, "Failed to initialize hash_algorithm!\n");
exit(EX_USAGE);
}
...

main@memcached.c:5849

其中参数 hash_type 被设置为 MURMUR3_HASH。这个是由命令行参数 modern 决定的。


...
case MODERN:
/* Modernized defaults. Need to add equivalent no_* flags
* before making truly default. */
...
hash_type = MURMUR3_HASH;
...
break;
...

main@memcached.c:5849

第二个方法

assoc_init

为上述固定大小数组分配内存。

void assoc_init(const int hashtable_init) {
if (hashtable_init) {
hashpower = hashtable_init;
}
primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
}
...// scr: stat
}
hash_init@hash.c

本方法也是在初始化过程中被调用


...
assoc_init(settings.hashpower_init);
...

main@memcached.c:5976

而初始大小则是由命令行参数hashpower确定。


...
case HASHPOWER_INIT:
if (subopts_value == NULL) {
fprintf(stderr, "Missing numeric argument for hashpower\n");
return 1;
}
settings.hashpower_init = atoi(subopts_value);
if (settings.hashpower_init < 12) {
fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
settings.hashpower_init);
return 1;
} else if (settings.hashpower_init > 64) {
fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
"Choose a value based on \"STAT hash_power_level\" from a running instance\n",
settings.hashpower_init);
return 1;
}
break;
...

main@memcached.c:5677

如上所述,如果 冲突 过多而导致查询性能下降,我们需要用更大的数组来替换当前数组。接下来我们看一下这个过程

扩容和迁移

memcached, 这个扩容的阈值是 1.5, 即, 当 item(或者是 哈希表 元素) 的数量超过 数量的 1.5 倍,上述机制会被激活。

Expand hash map

...
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
...
assoc_insert@assoc.c:173

这里, assoc_start_expand 仅仅是设置了一个标志位 (i.e., do_run_maintenance_thread), 然后给辅助维护线程发信号并唤起之。然后由这个线程进行真正的扩容迁移工作。

static void assoc_start_expand(void) {
if (started_expanding)
return;

started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}
assoc_insert@assoc.c:173

辅助维护线程主循环

static void *assoc_maintenance_thread(void *arg) {

mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread/* scr: the flag*/) {
int ii = 0;

/* There is only one expansion thread, so no need to global lock. */
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { // scr: ----> 2)
item *it, *next;
int bucket;
void *item_lock = NULL;

/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
if ((item_lock = item_trylock(expand_bucket))) { // scr: --------> 3)
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next; // scr: ----------------------------------> 4)
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}

old_hashtable[expand_bucket] = NULL; // scr: ----------------> 4.1)

expand_bucket++; // scr: --------------------------------------> 5)
if (expand_bucket == hashsize(hashpower - 1)) { // scr: -------> 6)
expanding = false;
free(old_hashtable);
... // scr: ---------------------------------------------------> stat & log
}
} else {
usleep(10*1000); // scr: ------------------------------------> 3.1)
}

if (item_lock) { // scr: --------------------------------------> 3.2)
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}

if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &maintenance_lock); // scr: > 0)
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand(); // scr: -----------------------------------------> 1)
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}
assoc_maintenance_thread@assoc.c

0) 从这里开始,线程被唤起并开始工作;而当整个工作完成后,或者根本就没有迁移的需要时,此线程也是在这里被挂起。

1) assoc_expand 为新的 哈希表 分配内存。这个新的 哈希表 后面会完全替换由初始化阶段建立的既有 哈希表


/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
old_hashtable = primary_hashtable;

primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
... // scr: log
hashpower++;
expanding = true;
expand_bucket = 0;
... // scr: stat
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}

assoc_expand@assoc.c

2) 一个批次仅迁移固定数量 (hash_bulk_move)的 items。这样做的好处是在stop_assoc_maintenance_thread被调用时,辅助维护线程可以在完成当前批次立即停止,而不是要等到全量迁移完成。


#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;

assoc.c:207

...
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
hash_bulk_move = atoi(env);
if (hash_bulk_move == 0) {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
...

start_assoc_maintenance_thread@assoc.c:281

3) (这里 “item lock” 实际上作用于 上,所以bucket lock应该是个更合适的名字) 用 item_trylock (i.e., pthread_mutex_trylock) 来访问 ; 3.1) 如果 不可用,线程休眠10秒; 3.2) 当 迁移完成时释放锁 (item_trylock_unlock)。


void *item_trylock(uint32_t hv) {
pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
if (pthread_mutex_trylock(lock) == 0) {
return lock;
}
return NULL;
}

item_trylock@thread.c


void item_trylock_unlock(void *lock) {
mutex_unlock((pthread_mutex_t *) lock);
}

item_trylock@thread.c

4) 对 bucket 上的所有 item 进行 重新哈希,并迁移至新 哈希表

5) 继续对下一个 进行迁移。

6) 所有的 都迁移完成 -> 回到 0)。

辅助迁移线程启动

int start_assoc_maintenance_thread() {
int ret;
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
hash_bulk_move = atoi(env);
if (hash_bulk_move == 0) {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
return -1;
}
return 0;
}
start_assoc_maintenance_thread@assoc.c

其他的初始化方法类似,这个方法也是在系统初始化时调用的。


...
if (start_assoc_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
...

main@memcached.c:5992

辅助维护线程终止

stop_assoc_maintenance_thread 是在系统终止时被调用的。语义上来看本方法对应的是 start_assoc_maintenance_thread,而功能上看,则和 assoc_start_expand 里的操作正好相反。


...
stop_assoc_maintenance_thread();
...

main@memcached.c:6098

void stop_assoc_maintenance_thread() {
mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
mutex_unlock(&maintenance_lock);

/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
}
stop_assoc_maintenance_thread@assoc.c

如上述,扩容和迁移的状态对所有的 哈希表 相关操作都有影响。下面我们具体看看有哪些影响。

CRUD

assoc_insert

int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket]; // scr: -------------------> 1)
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)]; // scr: > 2)
primary_hashtable[hv & hashmask(hashpower)] = it;
}

pthread_mutex_lock(&hash_items_counter_lock);
hash_items++; // scr: ------------------------------------------------> 3)
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
pthread_mutex_unlock(&hash_items_counter_lock);

MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
return 1;
}
assoc_insert@assoc.c

1) 如果正在扩容中,并且指定对应的 还没有被迁移,则将 item 插入至 old_hashtable。这里注意我们是用老的 数(i.e., hashmask(hashpower - 1)))来计算下标。

2) 如果不满足以上情况, 将 item 插入 primary_hashtable

3) 增加全局计数 hash_items (即item 数)。 如果 hash_items 超过了阈值, 则启动上文描述的扩容过程。


...
static unsigned int hash_items = 0;
...

assoc.c:51

assoc_find

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket]; // scr: ---------------------------> 1)
} else {
it = primary_hashtable[hv & hashmask(hashpower)]; // scr: --------> 2)
}

item *ret = NULL;
int depth = 0;
while (it) { // scr: -------------------------------------------------> 3)
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}
assoc_find@assoc.c

1) 和assoc_insert类似,这步用于定位尚未被重新哈希。并从 old_hashtable 取出对应的

2) 同样,如果没有进行扩容或者已经完成迁移,则从 primary_hashtable 中取。

3) 遍历链表并找到对应的 item。这个也是标准操作了。

assoc_delete

基于上篇的讲解,当考虑到扩容迁移时,这里唯一需要重新讨论的函数是 _hashitem_before。和assoc_find类似,这里的变化仅仅是,当对应的尚未完成迁移时从old_hashtable读取。

static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
item **pos;
unsigned int oldbucket;

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
pos = &old_hashtable[oldbucket]; // scr: ---------------------------> diff)
} else {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}

while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}
_hashitem_before@assoc.c