Understanding The Memcached Source Code - LRU II

The slab allocator (I, II, III) is the core module of the cache system, and it largely determines how efficiently the bottleneck resource, memory, is utilized. The other three parts, namely,

the LRU algorithm (I, II - this article, III) for entry expiration;

an event-driven model (I, II, III) based on libevent; and

consistent hashing (not complete) for data distribution,

are built around it.

More often than not, the LRU algorithm is combined with a hash map, and the combination is referred to as an LRU cache.

In an LRU cache, the hash map enables fast access to cached objects, and the LRU algorithm keeps the cache from growing indefinitely by marking expired, or so-called least recently used, objects. This time we examine memcached's implementation of the hash map.

Overview (textbook material, skip if familiar)

A hash map is basically a fixed-size array that indexes values by integers hashed from keys. An array entry is referred to as a bucket, to distinguish it from an entry of the hash map itself, i.e., an item. If the hash value exceeds the number of buckets (i.e., the array size), it wraps around using 'mod' (%). A collision occurs when two or more keys produce the same hash value, or when different hash values wrap around to the same bucket; a linked list is then formed on the colliding bucket.

Collisions slow lookups down because a linked list must be traversed sequentially. Hence the number of buckets must be increased, and the entries rehashed against the new bucket count, before performance degrades too far. This process is discussed below.
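To make the bucket and chain vocabulary concrete, here is a minimal sketch of a chained hash map. It is not memcached code: the names entry, toy_hash, table_put and table_get are made up for illustration, the bucket count is fixed at 8, and the FNV-1a hash is chosen only because it is short.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define NBUCKETS 8u  /* illustrative bucket count, not a memcached value */

typedef struct entry {
    const char *key;
    int value;
    struct entry *next;  /* next entry in the same bucket's chain */
} entry;

/* Toy hash (FNV-1a), used here only to keep the sketch short. */
static uint32_t toy_hash(const char *key) {
    uint32_t h = 2166136261u;
    for (; *key; ++key) {
        h ^= (unsigned char)*key;
        h *= 16777619u;
    }
    return h;
}

/* Push the entry at the head of its bucket's chain. */
static void table_put(entry **buckets, entry *e) {
    assert(e != NULL && e->key != NULL);
    uint32_t b = toy_hash(e->key) % NBUCKETS;  /* wrap around with mod */
    e->next = buckets[b];
    buckets[b] = e;
}

/* Walk the chain and compare full keys; colliding keys share a bucket,
 * so the bucket index alone cannot identify an entry. */
static entry *table_get(entry **buckets, const char *key) {
    entry *e = buckets[toy_hash(key) % NBUCKETS];
    while (e != NULL && strcmp(e->key, key) != 0)
        e = e->next;
    return e;
}
```

Lookups stay correct no matter how many keys collide; they only get slower as chains grow, which is what motivates the expansion discussed later.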

Module initialization

The first relevant method is

hash_init

which simply determines the hash algorithm type.

int hash_init(enum hashfunc_type type) {
    switch(type) {
        case JENKINS_HASH:
            hash = jenkins_hash;
            settings.hash_algorithm = "jenkins";
            break;
        case MURMUR3_HASH:
            hash = MurmurHash3_x86_32;
            settings.hash_algorithm = "murmur3";
            break;
        default:
            return -1;
    }
    return 0;
}
hash_init@hash.c

This method is called from main as one of the init steps before the logic enters the main event loop.


...
if (hash_init(hash_type) != 0) {
    fprintf(stderr, "Failed to initialize hash_algorithm!\n");
    exit(EX_USAGE);
}
...

main@memcached.c:5849

The parameter hash_type is set to MURMUR3_HASH by the command-line argument modern mentioned earlier.


...
case MODERN:
    /* Modernized defaults. Need to add equivalent no_* flags
     * before making truly default. */
    ...
    hash_type = MURMUR3_HASH;
    ...
    break;
...

main@memcached.c:5849

The second method

assoc_init

allocates the fixed-size array mentioned at the beginning.

void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    ...// scr: stat
}
assoc_init@assoc.c
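The listings in this post lean on two helpers, hashsize and hashmask, without showing them. The sketch below assumes what their usage implies: the table has 2^hashpower buckets, and a bucket index is the lowest hashpower bits of the hash value. The function names bucket_count, bucket_mask and bucket_index are stand-ins of mine, not memcached identifiers.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed equivalent of hashsize(n): number of buckets for a given power. */
static uint64_t bucket_count(unsigned int hashpower) {
    assert(hashpower < 64);
    return (uint64_t)1 << hashpower;
}

/* Assumed equivalent of hashmask(n): all ones in the lowest n bits. */
static uint64_t bucket_mask(unsigned int hashpower) {
    return bucket_count(hashpower) - 1;
}

/* Because the bucket count is a power of two, `hv & mask` gives the
 * same result as `hv % bucket_count` but avoids a division. */
static uint64_t bucket_index(uint32_t hv, unsigned int hashpower) {
    return hv & bucket_mask(hashpower);
}
```

With a power of 16, for example, the table holds 65536 buckets and the index is simply the low 16 bits of the hash value.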

Like hash_init, this method is called from main as part of the system bootstrap process.


...
assoc_init(settings.hashpower_init);
...

main@memcached.c:5976

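The actual initial size is determined by the command-line argument hashpower, which is the exponent of the bucket count rather than the count itself. As the checks below show, the value must be between 12 and 64.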


...
case HASHPOWER_INIT:
    if (subopts_value == NULL) {
        fprintf(stderr, "Missing numeric argument for hashpower\n");
        return 1;
    }
    settings.hashpower_init = atoi(subopts_value);
    if (settings.hashpower_init < 12) {
        fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
                settings.hashpower_init);
        return 1;
    } else if (settings.hashpower_init > 64) {
        fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
                "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
                settings.hashpower_init);
        return 1;
    }
    break;
...

main@memcached.c:5677

As said before, the array can be replaced with a newly allocated, larger one if performance drops due to excessive collisions. Next we discuss the process of scaling up and migrating entries.

Scale up & entry migration

In memcached, the threshold is 1.5: once the number of items exceeds 1.5 times the number of buckets, the expansion process starts.

Expand hash map

...
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
    assoc_start_expand();
}
...
assoc_insert@assoc.c:173
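The condition can be pulled out into a small predicate to see where the boundary lies. This is a sketch only: should_expand is a hypothetical name, and it assumes hashsize(n) is 2^n.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mirrors the check in assoc_insert: expand once the item count is
 * strictly greater than 1.5 times the bucket count, unless an
 * expansion is already running. Integer arithmetic, as in the original. */
static bool should_expand(uint64_t hash_items, unsigned int hashpower,
                          bool expanding) {
    assert(hashpower < 62);  /* keep buckets * 3 from overflowing */
    uint64_t buckets = (uint64_t)1 << hashpower;
    return !expanding && hash_items > (buckets * 3) / 2;
}
```

For a power of 16 the table has 65536 buckets, so the 98305th item is the first one to trigger an expansion.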

assoc_start_expand simply sets a flag (started_expanding) and signals the condition variable maintenance_cond to wake the maintenance thread, which does the actual work.

static void assoc_start_expand(void) {
    if (started_expanding)
        return;

    started_expanding = true;
    pthread_cond_signal(&maintenance_cond);
}
assoc_start_expand@assoc.c

Maintenance thread main loop

static void *assoc_maintenance_thread(void *arg) {

    mutex_lock(&maintenance_lock);
    while (do_run_maintenance_thread/* scr: the flag*/) {
        int ii = 0;

        /* There is only one expansion thread, so no need to global lock. */
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { // scr: ----> 2)
            item *it, *next;
            int bucket;
            void *item_lock = NULL;

            /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
             * is the lowest N bits of the hv, and the bucket of item_locks is
             * also the lowest M bits of hv, and N is greater than M.
             * So we can process expanding with only one item_lock. cool! */
            if ((item_lock = item_trylock(expand_bucket))) { // scr: --------> 3)
                for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                    next = it->h_next; // scr: ----------------------------------> 4)
                    bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                    it->h_next = primary_hashtable[bucket];
                    primary_hashtable[bucket] = it;
                }

                old_hashtable[expand_bucket] = NULL; // scr: ----------------> 4.1)

                expand_bucket++; // scr: --------------------------------------> 5)
                if (expand_bucket == hashsize(hashpower - 1)) { // scr: -------> 6)
                    expanding = false;
                    free(old_hashtable);
                    ... // scr: ---------------------------------------------------> stat & log
                }
            } else {
                usleep(10*1000); // scr: ------------------------------------> 3.1)
            }

            if (item_lock) { // scr: --------------------------------------> 3.2)
                item_trylock_unlock(item_lock);
                item_lock = NULL;
            }
        }

        if (!expanding) {
            /* We are done expanding.. just wait for next invocation */
            started_expanding = false;
            pthread_cond_wait(&maintenance_cond, &maintenance_lock); // scr: > 0)
            /* assoc_expand() swaps out the hash table entirely, so we need
             * all threads to not hold any references related to the hash
             * table while this happens.
             * This is instead of a more complex, possibly slower algorithm to
             * allow dynamic hash table expansion without causing significant
             * wait times.
             */
            pause_threads(PAUSE_ALL_THREADS);
            assoc_expand(); // scr: -----------------------------------------> 1)
            pause_threads(RESUME_ALL_THREADS);
        }
    }
    return NULL;
}
assoc_maintenance_thread@assoc.c
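The inner for loop of the listing above (marked 4 and 4.1) can be tried in isolation. The sketch below is a simplification under stated assumptions: node and migrate_bucket are hypothetical names, and each node stores its hash value directly, whereas memcached recomputes it from the key with hash(ITEM_key(it), it->nkey).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct node {
    uint32_t hv;          /* stored hash value (simplification) */
    struct node *h_next;  /* chain link, named as in memcached's item */
} node;

/* Move every node of old_table[b] into new_table, which is assumed to
 * have 2^new_power buckets (twice as many as old_table). */
static void migrate_bucket(node **old_table, node **new_table,
                           uint32_t b, unsigned int new_power) {
    assert(new_power >= 1 && new_power < 32);
    uint32_t mask = ((uint32_t)1 << new_power) - 1;
    node *it, *next;
    for (it = old_table[b]; it != NULL; it = next) {
        next = it->h_next;          /* save the link before overwriting it */
        uint32_t nb = it->hv & mask;
        it->h_next = new_table[nb]; /* push at the head of the new chain */
        new_table[nb] = it;
    }
    old_table[b] = NULL;            /* the old bucket is now empty */
}
```

Since the new mask has exactly one more bit than the old one, the items of old bucket b can only land in new bucket b or in new bucket b plus the old table size. With 4 old buckets, for instance, hash values 1, 5, 9 and 13 all sit in old bucket 1 and split into new buckets 1 and 5.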

0) This is where the thread goes to sleep when there is nothing to be done, and where it wakes up to start working.

1) assoc_expand allocates the new, twice as large hash map that is meant to replace the old one initialized in assoc_init.


/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;

    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        ... // scr: log
        hashpower++;
        expanding = true;
        expand_bucket = 0;
        ... // scr: stat
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}

assoc_expand@assoc.c

2) Only a certain number (hash_bulk_move) of buckets are migrated in one batch. When stop_assoc_maintenance_thread is called, the thread stops as soon as the current batch is complete, rather than hanging around to complete the migration of the whole hash map.


#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;

assoc.c:207

...
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
    hash_bulk_move = atoi(env);
    if (hash_bulk_move == 0) {
        hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
    }
}
...

start_assoc_maintenance_thread@assoc.c:281

3) (The "item lock" actually works on a whole bucket, hence bucket lock could be a better name.) Use item_trylock (i.e., pthread_mutex_trylock) to acquire the lock for the bucket; 3.1) sleep for 10 ms (usleep(10*1000)) when the bucket is not available; and 3.2) release the lock using item_trylock_unlock when the migration of this bucket completes.


void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

item_trylock@thread.c


void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

item_trylock_unlock@thread.c

4) Rehash all the items in the bucket, and migrate them to the new hash map.

5) Move on to the next bucket.

6) Last bucket reached -> go to 0).
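The comment in the main loop claims that a single item lock is enough for migrating one bucket, because both the bucket index and the lock index are low bits of the same hash value. The sketch below checks that claim with plain bit arithmetic. The function names and the example powers (16, 17 and 10) are illustrative assumptions, not values read from memcached.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Keep only the lowest `power` bits, like `hv & hashmask(power)`. */
static uint32_t low_bits(uint32_t hv, unsigned int power) {
    assert(power < 32);
    return hv & (((uint32_t)1 << power) - 1);
}

/* True when the lock taken by passing the bucket index in place of hv
 * (as item_trylock(expand_bucket) does) is the same lock that an item
 * with hash value hv would take on its own. */
static bool bucket_lock_covers_item(uint32_t hv, unsigned int table_power,
                                    unsigned int lock_power) {
    uint32_t bucket = low_bits(hv, table_power);
    return low_bits(bucket, lock_power) == low_bits(hv, lock_power);
}
```

As long as the lock power does not exceed the table power, the predicate holds for the old table and for the doubled one alike, so the lock taken for the old bucket also guards both destination buckets. If the lock table were larger than the hash table, the guarantee would break, which is why the comment insists that N is greater than M.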

Maintenance thread start

int start_assoc_maintenance_thread() {
    int ret;
    char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
    if (env != NULL) {
        hash_bulk_move = atoi(env);
        if (hash_bulk_move == 0) {
            hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
        }
    }
    pthread_mutex_init(&maintenance_lock, NULL);
    if ((ret = pthread_create(&maintenance_tid, NULL,
                              assoc_maintenance_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        return -1;
    }
    return 0;
}
start_assoc_maintenance_thread@assoc.c

Similar to the initialization methods, it is called during system bootstrap.


...
if (start_assoc_maintenance_thread() == -1) {
    exit(EXIT_FAILURE);
}
...

main@memcached.c:5992

Maintenance thread stop

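stop_assoc_maintenance_thread is called during the system shutdown process. Semantically it is the opposite of start_assoc_maintenance_thread; mechanically, it works like assoc_start_expand in reverse: it clears a flag (do_run_maintenance_thread), signals maintenance_cond so that the thread wakes up and notices, and then waits for the thread to exit with pthread_join.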


...
stop_assoc_maintenance_thread();
...

main@memcached.c:6098

void stop_assoc_maintenance_thread() {
    mutex_lock(&maintenance_lock);
    do_run_maintenance_thread = 0;
    pthread_cond_signal(&maintenance_cond);
    mutex_unlock(&maintenance_lock);

    /* Wait for the maintenance thread to stop */
    pthread_join(maintenance_tid, NULL);
}
stop_assoc_maintenance_thread@assoc.c

The expansion and migration process discussed here affects the logic of all hash map operations. In the next section, we look at these operations.

CRUD

assoc_insert

int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it->h_next = old_hashtable[oldbucket]; // scr: -------------------> 1)
        old_hashtable[oldbucket] = it;
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)]; // scr: > 2)
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    pthread_mutex_lock(&hash_items_counter_lock);
    hash_items++; // scr: ------------------------------------------------> 3)
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand();
    }
    pthread_mutex_unlock(&hash_items_counter_lock);

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}
assoc_insert@assoc.c

1) If an expansion is in progress and the bucket associated with the hash key has not been migrated yet, insert the item into old_hashtable. Note that the old bucket mask (i.e., hashmask(hashpower - 1)) is used to calculate the bucket index here.

2) Otherwise, insert the item to primary_hashtable directly.

3) Increase the global variable hash_items (the number of items). If it exceeds the threshold after the item is added, start the expansion and migration process that was the main focus of the last section.


...
static unsigned int hash_items = 0;
...

assoc.c:51
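The table selection at the top of assoc_insert is repeated in the read and delete paths, so it helps to look at it on its own. The sketch below is a hypothetical standalone version: bucket_ref and locate_bucket are names of mine, and the global state (hashpower, expanding, expand_bucket) is passed in as parameters so the function can be exercised without the rest of memcached.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct {
    bool use_old;     /* true: old_hashtable, false: primary_hashtable */
    uint32_t bucket;  /* index within the chosen table */
} bucket_ref;

/* hashpower is the power of the new (primary) table; the old table,
 * if one exists, has hashpower - 1. Buckets below expand_bucket have
 * already been migrated. */
static bucket_ref locate_bucket(uint32_t hv, unsigned int hashpower,
                                bool expanding, uint32_t expand_bucket) {
    assert(hashpower >= 1 && hashpower < 32);
    bucket_ref ref;
    uint32_t old_bucket = hv & (((uint32_t)1 << (hashpower - 1)) - 1);
    if (expanding && old_bucket >= expand_bucket) {
        ref.use_old = true;          /* not migrated yet */
        ref.bucket = old_bucket;
    } else {
        ref.use_old = false;         /* migrated, or no expansion running */
        ref.bucket = hv & (((uint32_t)1 << hashpower) - 1);
    }
    return ref;
}
```

For example, with hashpower 5 and expand_bucket 4, hash value 19 maps to old bucket 3, which is already migrated, so it resolves to primary bucket 19; hash value 22 maps to old bucket 6, which is still pending, so it stays in the old table.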

assoc_find

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket]; // scr: ---------------------------> 1)
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)]; // scr: --------> 2)
    }

    item *ret = NULL;
    int depth = 0;
    while (it) { // scr: -------------------------------------------------> 3)
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}
assoc_find@assoc.c

1) Similar to that of assoc_insert, this step locates the bucket from old_hashtable when the key is yet to be rehashed.

2) Use primary_hashtable directly otherwise.

3) Walk the linked list and compare the keys themselves (rather than the hash index) to find the item in case of a collision.

assoc_delete

Based on the last post, the only delete-related method that deserves revisiting in light of expansion and migration is _hashitem_before. Similar to assoc_find, it simply reads from old_hashtable when the key is yet to be rehashed.

static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
    item **pos;
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        pos = &old_hashtable[oldbucket]; // scr: ---------------------------> diff)
    } else {
        pos = &primary_hashtable[hv & hashmask(hashpower)];
    }

    while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
        pos = &(*pos)->h_next;
    }
    return pos;
}
_hashitem_before@assoc.c

One thing worth noting is that assoc_find is very similar to _hashitem_before. The difference is that _hashitem_before returns the address of the h_next member of the element before the found one (pos = &(*pos)->h_next;), or the address of the bucket slot itself when the found element is first in the chain. This is what is required when removing entries from a singly linked list. assoc_find, on the other hand, returns the found element directly (ret = it;).
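Why the address of the preceding link is the useful return value becomes clearer with a standalone sketch. The code below is not memcached code: node, find_link and unlink_key are hypothetical names, and integer keys replace memcached's byte-string keys to keep it short.

```c
#include <assert.h>
#include <stddef.h>

typedef struct node {
    int key;
    struct node *h_next;
} node;

/* Return the address of the pointer that points at the matching node:
 * either the bucket head itself or some predecessor's h_next. If no
 * node matches, the returned address holds NULL (end of the chain). */
static node **find_link(node **head, int key) {
    node **pos = head;
    while (*pos != NULL && (*pos)->key != key)
        pos = &(*pos)->h_next;
    return pos;
}

/* Unlink and return the node with the given key, or NULL if absent.
 * One assignment handles both the head case and the middle case. */
static node *unlink_key(node **head, int key) {
    assert(head != NULL);
    node **before = find_link(head, key);
    node *found = *before;
    if (found != NULL) {
        *before = found->h_next;  /* bypass the found node */
        found->h_next = NULL;
    }
    return found;
}
```

Had the search returned the node itself, as assoc_find does, the caller would need a second pass or a separate previous pointer to patch the chain, plus a special case for the first element.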

That's it. Did I make a serious mistake, or miss anything important? Or did you simply enjoy the read? I'd be chuffed to hear your feedback.