Understanding The Memcached Source Code - LRU III

slab allocator (I, II, III) is the core module of the cache system, which largely determines how efficient the bottleneck resource, memory, can be utilized. The other 3 parts, namely,

LRU algorithm (I, II, III - this article) for entry expiration; and an

event driven model (I , II , III) based on libevent; and the

consistent hashing (not complete) for data distribution,

are built around it.

In previous posts, we have discussed different facets of an item, i.e., slab, hash map and LRU list as well as their associated (CRUD) methods, which build up the internal procedures and perform client requests after the corresponding commands are parsed by the drive machine. This time we will go through those procedures by issuing telnet commands to a Memcached instance and see how the discussed modules work together on various item operations. We will also see the whole picture of LRU lists that maintain the property of ‘least recently used’ in accordance to those operations.

On top of standard LRU algorithm, the Memcached (1.4.28) emploies 3 lists instead of just 1, i.e., hot, warm and cold, a.k.a., Segmented LRU for each slab class. This heuristic is implemented to reduce the lock contention between item bumping (an action that moves recently accessed item to the list head) and item read. Moreover, unlike a casual implementation (such as the one I coded), Memcached does not bump item right on read action. Rather, the bumping is delayed to other operations when the resource is in short, which could reflect Memcached‘s read-first design decision.

In normal use cases, let’s say, a social media, the volume of read requests are more than that of other operations combined by orders of magnitude, hence it’s a critical point that worth extensive optimizations, I suppose.

We start this post by issuing an item read command to a Memcached instance.

~telnet localhost 11211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
...// add some items
> get test

Read

The the normal execution of this procedure,

ref
1 |~Drive machine & command parser
|-process_get_command
++ |-item_get
|-assoc_find (LRU II)
|-item_update
|-item_unlink_q (LRU I)
|-item_link_q (LRU I)

process_get_command

static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
char *key;
size_t nkey;
int i = 0;
item *it;
token_t *key_token = &tokens[KEY_TOKEN];
char *suffix;
assert(c != NULL);

do {
while(key_token->length != 0) { // scr: -----------------> *)

key = key_token->value;
nkey = key_token->length;

if(nkey > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
while (i-- > 0) {
item_remove(*(c->ilist + i));
}
return;
}

it = item_get(key, nkey, c); // scr: ----------------> 1)
if (settings.detail_enabled) {
stats_prefix_record_get(key, nkey, NULL != it);
}
if (it) {
if (i >= c->isize) { // scr: --------------------> *)
item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
if (new_list) {
c->isize *= 2;
c->ilist = new_list;
} else {
... // scr: stat
item_remove(it);
break;
}
}

if (return_cas)
{
... // scr: cas
}
else
{
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
if (add_iov(c, "VALUE ", 6) != 0 || // scr: ---> *)
add_iov(c, ITEM_key(it), it->nkey) != 0 ||
add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
{
item_remove(it);
break;
}
}

... // scr: verbose & stat

item_update(it); // scr: ------------------------> 2)
*(c->ilist + i) = it;
i++;

} else {
... // scr: stat
}

key_token++; // scr: --------------------------------> *)
}

/*
* If the command string hasn't been fully processed, get the next set
* of tokens.
*/
if(key_token->value != NULL) { // scr: ------------------> *)
ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
key_token = tokens;
}

} while(key_token->value != NULL);

c->icurr = c->ilist;
c->ileft = i;
... // scr: cas & verbose

if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
out_of_memory(c, "SERVER_ERROR out of memory writing get response");
}
else { // scr: ----------------------------------------------> *)
conn_set_state(c, conn_mwrite);
c->msgcurr = 0;
}
}
process_get_command@memcached.c

The only relevant step here are 1) item_get and 2) item_update. Steps marked as *) are mostly command parsing and I/O which will be discussed in later posts when we examine event driven mechanism.

do_item_get

Like other methods discussed before, item_get is a thread-safe wrapper of do_item_get.


item *item_get(const char *key, const size_t nkey, conn *c) {
item *it;
uint32_t hv;
hv = hash(key, nkey);
item_lock(hv);
it = do_item_get(key, nkey, hv, c);
item_unlock(hv);
return it;
}

item_get@thread.c

item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c) {
item *it = assoc_find(key, nkey, hv); // scr: -----------------> 1)
if (it != NULL) {
refcount_incr(&it->refcount); // scr: ---------------------> 2)
...// scr: comments
}
int was_found = 0;

...// scr: verbose

if (it != NULL) {
was_found = 1;
if (item_is_flushed(it)) {
...// scr: item flush
// scr: -----------------------------------------------------------> 3)
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
...// scr: stat & verbose
was_found = 3;
} else { // scr: ------------------------------------------> 4)
it->it_flags |= ITEM_FETCHED|ITEM_ACTIVE;
DEBUG_REFCNT(it, '+');
}
}

...// scr: verbose

return it;
}
do_item_get@items.c

1) Use the discussed assoc_find to locate the item using the hash key.

2) Increase the discussed reference count.

3) If the item has expired, remove it. Note that do_item_unlink decreases the reference count held by the last step, and do_item_remove actually removes the item. These two methods will be discussed soon in item delete.

4) Simply mark the item as ITEM_ACTIVE rather than perform item bumping which is offloaded to other operations associated procedures. This is part of the heuristic discussed in the beginning.

do_item_update

item_update is a thread-safe wrapper of do_item_update.


void item_update(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);

item_lock(hv);
do_item_update(item);
item_unlock(hv);
}
item_update@thread.c

void do_item_update(item *it) {
MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
assert((it->it_flags & ITEM_SLABBED) == 0);

if ((it->it_flags & ITEM_LINKED) != 0) {
it->time = current_time; // scr: ----------------------> 1)
if (!settings.lru_maintainer_thread) {
item_unlink_q(it);
item_link_q(it);
}
}
}
}
do_item_update@items.c

1) The only line effective in this method is to set the access time for the item in (passively) an interval of 60 seconds. lru_maintainer_thread is set to true by command line argument modern so the operations inside if (!settings.lru_maintainer_thread) is not applicable.


#define ITEM_UPDATE_INTERVAL 60

memcached.h:73


case MODERN:
...
start_lru_maintainer = true;
break;

memcached.c:5828

Next,

> delete test

Delete

Call stack in normal execution,

ref
1 |~Drive machine & command parser
|-process_delete_command
++ |-do_item_get
|-do_item_unlink
|-assoc_delete (LRU II)
|-item_unlink_q (LRU I)
-- |-do_item_remove
-- |-do_item_remove
|-do_item_free
|-do_slabs_free (Slab II)

process_delete_command

static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
char *key;
size_t nkey;
item *it;

...// scr: sanity check

key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;

...// scr: sanity check & stat

it = item_get(key, nkey, c); // scr: -------------------------> 1)
if (it) {
MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);

...// scr: stat

item_unlink(it); // scr: ---------------------------------> 2)
item_remove(it); /* release our reference */
out_string(c, "DELETED");
} else {
...// scr: stat

out_string(c, "NOT_FOUND");
}
}
process_delete_command@memcached.c

1) Get the item using item_get discussed in last section. Note that the reference count is increased in item_get.

2) Delete it.

item_unlink is a thread safe wrapper of do_item_unlink.


void item_unlink(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);
item_lock(hv);
do_item_unlink(item, hv);
item_unlock(hv);
}
item_unlink@thread.c

void do_item_unlink(item *it, const uint32_t hv) {
...
if ((it->it_flags & ITEM_LINKED) != 0) {
it->it_flags &= ~ITEM_LINKED;
...// scr: stat
assoc_delete(ITEM_key(it), it->nkey, hv); // scr: --------> 1)
item_unlink_q(it); // scr: -------------------------------> 2)
do_item_remove(it); // scr: ------------------------------> 3)
}
}
do_item_unlink@items.c

1) As discussed in last post, assoc_delete removes the item from the hash map; and

2) item_unlink_q removes the item from the LRU list that the item belongs to.

3) This time do_item_remove simply decreases the reference count. The item will be removed when do_item_remove is called the second time from

do_item_remove

item_remove is a thread safe wrapper of do_item_remove.


void item_remove(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);

item_lock(hv);
do_item_remove(item);
item_unlock(hv);
}
item_remove@thread.c

void do_item_remove(item *it) {
...// scr: sanity check

if (refcount_decr(&it->refcount) == 0) { // scr: --------> 1)
item_free(it);
}
}
...
void item_free(item *it) {
size_t ntotal = ITEM_ntotal(it);
unsigned int clsid;
...// scr: sanity check

/* so slab size changer can tell later if item is already free or not */
clsid = ITEM_clsid(it); // scr: -------------------------> 2)
DEBUG_REFCNT(it, 'F');
slabs_free(it, ntotal, clsid); // scr: ------------------> 3)
}
do_item_remove@items.c

1) Decrease the reference count, if it reaches 0, goto 2) and free the item.

2) Use ITEM_clsid to get the slab class the item belongs. This macro removes the list type from slabs_clsid.


#define ITEM_clsid(item) ((item)->slabs_clsid & ~(3<<6))

memcached.h:116

3) Call slabs_free to release the memory to slab subsystem.

Create

The Item creating procedure is divided into several logic fragments by the mentioned drive machine, 1) creating an empty item object with the key and other meta data sent through; 2) read the value (from the socket) and fill the item object with it; and 3) link the item object. The workflow controller - drive machine will be discussed in the next post.

Now we send an add command to the Memcached instance.

> add test 0 60 11 (\r\n)
> hello world

Creating an empty item object

After the first line of the above command

> add test 0 60 11 (\r\n)

the procedure described in this section starts, the call stack of the hot path is,

ref
1 |~Drive machine & command parser
|-process_update_command
|-do_item_alloc
|-slabs_clsid (Slab III)
++ |-do_slabs_alloc (Slab III)
|-lru_pull_tail (on hot list)
|-do_item_update_nolock (same to do_item_update)
|-do_item_remove
|-item_link_q (LRU I)
|-do_item_remove
|-lru_pull_tail (on warm list)
|-same as hot list
|-lru_pull_tail (on cold list)
|-do_item_unlink_nolock (same to do_item_unlink LRU I)

We start from

process_update_command

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
char *key;
size_t nkey;
unsigned int flags;
int32_t exptime_int = 0;
time_t exptime;
int vlen;
uint64_t req_cas_id=0;
item *it;

...// scr: irrelevant code & sanity checks

key = tokens[KEY_TOKEN].value; // scr: ----------------------> 1)
nkey = tokens[KEY_TOKEN].length;

if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
&& safe_strtol(tokens[3].value, &exptime_int)
&& safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}

/* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
exptime = exptime_int;

...// scr: cas & sanity checks

vlen += 2; // scr: ------------------------------------------> 2)

...// scr: stat

it = item_alloc(key, nkey, flags, realtime(exptime), vlen); // 3)

...// scr: cas & error handling

c->item = it; // scr: ---------------------------------------> 4)
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
c->cmd = comm;
conn_set_state(c, conn_nread);
}
process_update_command@memcached.c

1) Set the key (i.e., test), as well as the meta data (i.e., flags, 0; exptime, 60;vlen,11`), to local variables.

2) Increase vlen by 2, to populate the \n\r in addition to the key string.

3) Call item_alloc to allocate the memory (from slab) for the item.

4) After item_alloc is called, set the properties of conn. Here ritem points to the data portion of an item chunk; and rlbytes is set to vlen. These two fields will be used to populate the data portion with the content, i.e., hello world, in the next post.

do_item_alloc

Unlike other methods we have discussed, item_alloc is a wrapper of do_item_alloc without adding any locks. I would assume this wrapper is added simply for consistent code style.


item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
item *it;
/* do_item_alloc handles its own locks */
it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
return it;
}

main@memcached.c:5849

item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
const rel_time_t exptime, const int nbytes,
const uint32_t cur_hv) {
int i;
uint8_t nsuffix;
item *it = NULL;
char suffix[40];
unsigned int total_chunks; // scr: -----------------------------> 1)
size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
... // scr: cas

unsigned int id = slabs_clsid(ntotal); // scr: -----------------------> 2)
if (id == 0)
return 0;

/* If no memory is available, attempt a direct LRU juggle/eviction */
/* This is a race in order to simplify lru_pull_tail; in cases where
* locked items are on the tail, you want them to fall out and cause
* occasional OOM's, rather than internally work around them.
* This also gives one fewer code path for slab alloc/free
*/
for (i = 0; i < 5; i++) {
/* Try to reclaim memory first */
... // scr: legacy, no lru_maintainer_thread
it = slabs_alloc(ntotal, id, &total_chunks, 0); // scr: ----------> 3)
... // scr: no-expire setting
if (it == NULL) {
if (settings.lru_maintainer_thread) { // scr: ----------------> 4)
lru_pull_tail(id, HOT_LRU, total_chunks, false, cur_hv);
lru_pull_tail(id, WARM_LRU, total_chunks, false, cur_hv);
lru_pull_tail(id, COLD_LRU, total_chunks, true, cur_hv);
} else {
... // scr: legacy, no lru_maintainer_thread
}
} else {
break;
}
}

... // scr: stat & sanity check

/* Refcount is seeded to 1 by slabs_alloc() */
it->next = it->prev = it->h_next = 0; // scr: ------------------------> 5)
/* Items are initially loaded into the HOT_LRU. This is '0' but I want at
* least a note here. Compiler (hopefully?) optimizes this out.
*/
if (settings.lru_maintainer_thread) {
... // scr: no expire setting
} else {
id |= HOT_LRU; // scr: ---------------------------------------> 6)
}
} else {
... // scr: legacy, no lru_maintainer_thread
}
it->slabs_clsid = id; // scr: ----------------------------------------> 7)

DEBUG_REFCNT(it, '*');
it->it_flags = settings.use_cas ? ITEM_CAS : 0;
it->nkey = nkey;
it->nbytes = nbytes;
memcpy(ITEM_key(it), key, nkey);
it->exptime = exptime;
memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
it->nsuffix = nsuffix;
return it;
}
do_item_alloc@items.c

1) item_make_header initializes suffix portion of the item chunk using the meta data (flags) and key.


static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
char *suffix, uint8_t *nsuffix) {
/* suffix is defined at 40 chars elsewhere.. */
*nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
return sizeof(item) + nkey + *nsuffix + nbytes;
}

item_make_header@items.c

2), 3) are discussed in detail in Slab III. To recap, slabs_clsid select the most optimal slab class and slab_alloc allocates one item chunk from slab sub-system.

4) If slab_alloc fails, try to release some memory using lru_pull_tail and retry the allocation for at most 5 times. lru_pull_tail is the focus of the next section.

5) Initialize the pointers of LRU list and hash collision list.

6) Set the list type (HOT_LRU) to the slabs_clsid, which indicates that this item belongs to the “HOT” LRU list of its respective slab class.

7) Initialize other fields of item.

lru_pull_tail

static int lru_pull_tail(const int orig_id, const int cur_lru,
const unsigned int total_chunks, const bool do_evict, const uint32_t cur_hv) {
item *it = NULL;
int id = orig_id; // scr: ---------------------------------------> p)
int removed = 0;
if (id == 0)
return 0;

int tries = 5;
item *search;
item *next_it;
void *hold_lock = NULL;
unsigned int move_to_lru = 0;
uint64_t limit;

id |= cur_lru; // scr: ------------------------------------------> p)
pthread_mutex_lock(&lru_locks[id]);
search = tails[id]; // scr: -------------------------------------> p)
/* We walk up *only* for locked items, and if bottom is expired. */
for (; tries > 0 && search != NULL; tries--, search=next_it) {//s: p)
/* we might relink search mid-loop, so search->prev isn't reliable */
next_it = search->prev; // scr: -----------------------------> p)
...// scr: irrelevant code here
uint32_t hv = hash(ITEM_key(search), search->nkey);
/* Attempt to hash item lock the "search" item. If locked, no
* other callers can incr the refcount. Also skip ourselves. */
if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
continue;
/* Now see if the item is refcount locked */
if (refcount_incr(&search->refcount) != 2) { // scr: --------> s)
/* Note pathological case with ref'ed items in tail.
* Can still unlink the item, but it won't be reusable yet */
itemstats[id].lrutail_reflocked++;
/* In case of refcount leaks, enable for quick workaround. */
/* WARNING: This can cause terrible corruption */
if (settings.tail_repair_time &&
search->time + settings.tail_repair_time < current_time) {
itemstats[id].tailrepairs++;
search->refcount = 1;
/* This will call item_remove -> item_free since refcnt is 1 */
do_item_unlink_nolock(search, hv);
item_trylock_unlock(hold_lock);
continue;
}
}

/* Expired or flushed */ // scr: ---------------------------> e1)
if ((search->exptime != 0 && search->exptime < current_time)
|| item_is_flushed(search)) {
...// scr: stat
/* refcnt 2 -> 1 */
do_item_unlink_nolock(search, hv);
/* refcnt 1 -> 0 -> item_free */
do_item_remove(search);
item_trylock_unlock(hold_lock);
removed++;

/* If all we're finding are expired, can keep going */
continue;
}

/* If we're HOT_LRU or WARM_LRU and over size limit, send to COLD_LRU.
* If we're COLD_LRU, send to WARM_LRU unless we need to evict
*/
switch (cur_lru) {
case HOT_LRU: // scr: -----------------------------------> 1)
limit = total_chunks * settings.hot_lru_pct / 100;
case WARM_LRU:
limit = total_chunks * settings.warm_lru_pct / 100; // 1)
if (sizes[id] > limit) { // scr: --------------------> 2)
...// scr: stat
move_to_lru = COLD_LRU;
do_item_unlink_q(search);
it = search;
removed++;
break;
} else if ((search->it_flags & ITEM_ACTIVE) != 0) { //e3)
...// scr: stat
search->it_flags &= ~ITEM_ACTIVE;
do_item_update_nolock(search);
do_item_remove(search);
item_trylock_unlock(hold_lock);
} else { // scr: ------------------------------------> 3)
/* Don't want to move to COLD, not active, bail out */
it = search;
}
break;
case COLD_LRU:
it = search; /* No matter what, we're stopping */
if (do_evict) {
if (settings.evict_to_free == 0) {
...// scr: not applied here
}
...// scr: stat
LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EVICTION, search);
do_item_unlink_nolock(search, hv); // scr: ------> 4)
removed++;
if (settings.slab_automove == 2) {
...// scr: not applied here
}
} else if
...// scr: not applied here
}
break;
}
if (it != NULL)
break;
}

pthread_mutex_unlock(&lru_locks[id]);

if (it != NULL) { // scr: --------------------------------------> e2)
if (move_to_lru) {
it->slabs_clsid = ITEM_clsid(it);
it->slabs_clsid |= move_to_lru;
item_link_q(it);
}
do_item_remove(it);
item_trylock_unlock(hold_lock);
}

return removed;
}
lru_pull_tail@items.c

Method start & end

p) This method starts by selecting the tail element of the designated LRU list using the slab class id and the list type, assuming that the element can be a release candidate. And it iterates over (at most 5 entries) the list in reverse order to find a entry in case that elements near the tail are recently accessed.

s) For each item selected, increase its reference count. In normal situation, the original value of reference count should be 1 (as you will see in the last step of the create operation). Hence a != 2 value after the increment indicates an exception that needs to be corrected. Note that the reference count is now 2 so it is required to decrease at least one time (back to 1) when the processing of the current item is done (e1, e2 or e3 is reached).

e1) Remove the item directly when an expiration is detected. Here the do_item_unlink_nolock is exactly the same as the discussed do_item_unlink (I think the code is duplicated to emphasize that this method is not thread-safe), and it follows the same “unlink and remove” routine as in item delete.

e2) When a candidate is found, we might need to relocate it to another list (when move_to_lru is set in the switch case) by calling item_link_q. And we do need to call do_item_remove to reduce the reference count back to 1. The decision is made by the steps discussed bellow.

e3) If an item is recently accessed, reset the ITEM_ACTIVE flag; bump it to the head of the list; decrease its reference count and iterate to the next one (maximum 5 times). Remember that the flag ITEM_ACTIVE is set by item_get, and here is the place where the item gets bumped.

Hot & warm

1) The only difference of HOT_LRU and WARM_LRU is the threshold (limit) which are indicated by their respective configurations hot_lru_pct and warm_lru_pct.


settings.hot_lru_pct = 32;
...
case HOT_LRU_PCT:
if (subopts_value == NULL) {
fprintf(stderr, "Missing hot_lru_pct argument\n");
return 1;
};
settings.hot_lru_pct = atoi(subopts_value);
if (settings.hot_lru_pct < 1 || settings.hot_lru_pct >= 80) {
fprintf(stderr, "hot_lru_pct must be > 1 and < 80\n");
return 1;
}
break;
...

hot_lru_pct@memcached.c


...
settings.warm_lru_pct = 32;
...
case WARM_LRU_PCT:
if (subopts_value == NULL) {
fprintf(stderr, "Missing warm_lru_pct argument\n");
return 1;
};
settings.warm_lru_pct = atoi(subopts_value);
if (settings.warm_lru_pct < 1 || settings.warm_lru_pct >= 80) {
fprintf(stderr, "warm_lru_pct must be > 1 and < 80\n");
return 1;
}
break;
...

warm_lru_pct@memcached.c

2) If the threshold of HOT_LRU or WARM_LRU is reached, remove the item from the current list using do_item_unlink_q, and goto e2). Therefore, e2) is responsible to relink it to the COLD_LRU and decrease the reference count.

3) If the current item is not “active”, and the threshold is not reached, finish this method without any item relocation, nor release.

Cold

4) If there are any item in the list, evict it directly with the discussed do_item_unlink_nolock to free up its resource, and goto e1). Note that the default values of evict_to_free and slab_automove are set to values that disable their respective if blocks.


...
settings.evict_to_free = 1;
...

memcached.c:215


...
case MODERN:
/* Modernized defaults. Need to add equivalent no_* flags
* before making truly default. */
...
settings.slab_automove = 1;
...

memcached.c:5824

Now we input the second line of the command

> hello world

and trigger the following procedures.

Populate the item with content

As mentioned in process_update_command, the content we input is populated to conn.item using conn.ritem and conn.rlbytes. This step is handled by drive machine which will be discussed in detail in the next post.


...
res = read(c->sfd, c->ritem, c->rlbytes);
...

memcached.c:4421

Now we consider conn.item is filled with all relevant information, hence the next and final step is to

the call stack of this step is

ref
2 |~Drive machine & command parser
|-complete_nread
|-complete_nread_ascii
|-do_store_item
|=do_item_link (LRU I)
-- |-do_item_remove
complete_nread checks the protocol in use and moves directly to


static void complete_nread(conn *c) {
assert(c != NULL);
assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);

if (c->protocol == ascii_prot) {
complete_nread_ascii(c);
} else if (c->protocol == binary_prot) {
complete_nread_binary(c);
}
}
complete_nread@memcached.c

complete_nread_ascii

static void complete_nread_ascii(conn *c) {
assert(c != NULL);

item *it = c->item;
int comm = c->cmd;
enum store_item_type ret;
...// scr: stat

if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk"); // scr: -----> 1)
} else {
ret = store_item(it, comm, c);

switch (ret) {
case STORED:
out_string(c, "STORED");
break;
case EXISTS:
out_string(c, "EXISTS");
break;
case NOT_FOUND:
out_string(c, "NOT_FOUND");
break;
case NOT_STORED:
out_string(c, "NOT_STORED");
break;
default:
out_string(c, "SERVER_ERROR Unhandled storage type.");
}

}
// scr: -------------------------------------------------------> 2)
item_remove(c->item); /* release the c->item reference */
c->item = 0;
}
do_store_item@items.c

1) Call store_item to link the item to the LRU list and hash map.

2) The reference count is set to 1 by do_slab_alloc in do_item_alloc and increased by do_item_link in do_store_item. So reduce it to the normal value, 1, with item_remove. The methods of those have all been discussed in detail.

do_store_item

enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
char *key = ITEM_key(it);
item *old_it = do_item_get(key, it->nkey, hv, c); // scr: ------------> 1)
enum store_item_type stored = NOT_STORED;

item *new_it = NULL;
uint32_t flags;

if (old_it != NULL && comm == NREAD_ADD) {
... // scr: update logic
} else if (!old_it && (comm == NREAD_REPLACE
|| comm == NREAD_APPEND || comm == NREAD_PREPEND))
{
/* replace only replaces an existing value; don't store */
} else if (comm == NREAD_CAS) {
... // scr: cas
} else {
...
if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
... // scr: comm is NREAD_ADD (1)
}

if (stored == NOT_STORED && failed_alloc == 0) {
if (old_it != NULL)
item_replace(old_it, it, hv);
else
do_item_link(it, hv); // scr: ----------------------------> 2)

...

stored = STORED;
}
}

... // scr: irrelevant code

// scr: cas
LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
stored, comm, ITEM_key(it), it->nkey);

return stored;
}
do_store_item@items.c

1) The newly created item exists only in the slab subsystem, hence do_item_get returns null as there is no such record in hash map yet.

2) So in the context of item creation, do_store_item is essentially the same as do_item_link.

Reference

Replacing the cache replacement algorithm in memcached

Examples of Memcached telnet commands

Memcached telnet command summary

That's it. Did I make a serious mistake? or miss out on anything important? Or you simply like the read. Link me on -- I'd be chuffed to hear your feedback.