The slab allocator (I, II, III) is the core module of the cache system, and it largely determines how efficiently the bottleneck resource, memory, is utilized. The other three parts, namely, ...
In classic multithreading, a large number of slow, blocking operations, mostly I/O, can easily exhaust the available threads, which severely constrains the maximum number of requests a server can handle per unit time. More specifically, a thread is scheduled out and put to sleep in the middle of a procedure that contains blocking I/O, even while request packets keep piling up in the network stack. In such a situation, the server shows low throughput, low CPU saturation and high latency.
Here is a post about server-side performance. Feel free to check it out.
An introduction to event driven
This is where the asynchronous event driven model comes in, which drops the idea that the context of a session must be coupled with a thread. In this model, session contexts are contained within and managed by a drive machine, and a thread is fully unleashed by means of non-blocking I/O operations. More specifically, 1) when I/O occurs in the middle of a procedure, the thread does not block but instantly switches to processing another request; and 2) when the I/O completes, the context is picked up by the drive machine to resume the interrupted session. As such, a potentially slow procedure is effectively divided into multiple manageable pieces, and the cutting points are marked by I/O operations. This results in a more performant single threaded architecture in comparison to those that employ thousands of threads.
In my understanding, the event driven model is, in essence, yet another instance of “divide and conquer” and “trade space for time”, in a not very obvious way.
On the other hand, multithreading can still be used in the event driven model, purely for the purpose of parallelism. Thus, in practice, the number of threads employed does not exceed the number of CPU cores. I will discuss Memcached multithreading soon in Thread model.
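To make the idea concrete before looking at Memcached itself, here is a minimal, hypothetical sketch (not Memcached code; toy_event_loop and struct session are made-up names) of a single threaded event loop built on poll(2). The per-connection context lives outside any thread, and the only place the thread ever blocks is the poll() call itself.

/* toy_event_loop.c - a sketch of the event driven idea, NOT Memcached code */
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>

#define MAX_FDS 1024

struct session {               /* the per-connection "context" */
    char   buf[512];
    size_t used;
};

static struct session sessions[MAX_FDS];

/* assumes listen_fd is already bound, listening, and set to O_NONBLOCK */
void toy_event_loop(int listen_fd) {
    struct pollfd pfds[MAX_FDS];
    int nfds = 1;
    pfds[0].fd = listen_fd;
    pfds[0].events = POLLIN;

    for (;;) {
        poll(pfds, nfds, -1);                  /* the only blocking point */

        for (int i = 0; i < nfds; i++) {
            if (!(pfds[i].revents & POLLIN))
                continue;

            if (pfds[i].fd == listen_fd) {     /* a new connection arrives */
                int cfd = accept(listen_fd, NULL, NULL);
                if (cfd >= 0 && cfd < MAX_FDS && nfds < MAX_FDS) {
                    fcntl(cfd, F_SETFL, O_NONBLOCK);
                    sessions[cfd].used = 0;    /* fresh context for this fd */
                    pfds[nfds].fd = cfd;
                    pfds[nfds].events = POLLIN;
                    nfds++;
                }
            } else {                           /* resume an existing session */
                struct session *s = &sessions[pfds[i].fd];
                ssize_t n = read(pfds[i].fd, s->buf + s->used,
                                 sizeof(s->buf) - s->used);
                if (n > 0)
                    s->used += (size_t)n;      /* partial data: keep it and
                                                  go back to poll() */
                else if (n == 0)
                    close(pfds[i].fd);         /* peer closed (slot cleanup omitted) */
            }
        }
    }
}

A half-received request simply leaves its bytes in the session context and returns to poll(); the thread is free to serve whichever fd becomes ready next.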
The drive machine
From a developer’s point of view, there are numerous ways to program an asynchronous event driven server. Memcached adopts an approach called a state machine, in which the logic flow is divided into non-linear fragments identified by states, normally controlled by a huge switch case. The bright side of this approach is that the mentioned breakdown of a slow procedure is faithfully reflected by the logic fragments. But it makes the code style a bit different from what most developers are already used to.
The following is what the event driven state machine actually looks like.
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
    ...
        case conn_nread:
            ...
            break;

        case conn_swallow:
            ...
            break;

        case conn_write:
            ...
        case conn_mwrite:
            ...
            break;

        case conn_closing:
            if (IS_UDP(c->transport))
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;

        case conn_closed:
            /* This only happens if dormando is an idiot. */
            abort();
            break;

        case conn_watch:
            /* We handed off our connection to the logger thread. */
            stop = true;
            break;

        case conn_max_state:
            assert(false);
            break;
        }
    }

    return;
}
drive_machine@memcached.c
The omitted switch blocks will be discussed in detail in following posts, so no worries.
Thread model
The thread model of Memcached is quite standard. There is a dispatcher thread, and there is a preconfigured number of worker threads. Each thread runs an independent drive machine as described above. The dispatcher thread, whose responsibility is to distribute requests among the worker threads, only executes the code under conn_listening. The actual requests are completed by the worker threads running the rest of the states.
Thread model
Next we go through the bootstrap portion of the main function, which establishes the various building blocks of the event driven model as well as the multithreading mechanism. We will also see the locations of the discussed sub-system ***_init methods in relation to the whole initialization process.
First things first: all the procedures relevant to system initialization are executed in the discussed dispatcher thread.
int main (int argc, char **argv) {
    ...// scr: -----------------------------------------------------> *)
    ...// scr: initialize `settings` using default values and command line arguments
    ...// scr: sanity check

    if (hash_init(hash_type) != 0) {                    // scr: ----> LRU II
        fprintf(stderr, "Failed to initialize hash_algorithm!\n");
        exit(EX_USAGE);
    }

    ...// scr: initialize `settings` & sanity check

    if (maxcore != 0) {                                 // scr: ----> 1)
        struct rlimit rlim_new;
        /*
         * First try raising to infinity; if that fails, try bringing
         * the soft limit to the hard.
         */
        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
            if (setrlimit(RLIMIT_CORE, &rlim_new) != 0) {
                /* failed. try raising just to the old max */
                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
                (void)setrlimit(RLIMIT_CORE, &rlim_new);
            }
        }
        /*
         * getrlimit again to see what we ended up with. Only fail if
         * the soft limit ends up 0, because then no core files will be
         * created at all.
         */
        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
            fprintf(stderr, "failed to ensure corefile creation\n");
            exit(EX_OSERR);
        }
    }

    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {         // scr: ----> 1)
        fprintf(stderr, "failed to getrlimit number of files\n");
        exit(EX_OSERR);
    } else {
        rlim.rlim_cur = settings.maxconns;
        rlim.rlim_max = settings.maxconns;
        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
            fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
            exit(EX_OSERR);
        }
    }

    /* create the listening socket, bind it, and init */
    if (settings.socketpath == NULL) {
        ...// scr: not applicable
        if (portnumber_filename != NULL) {
            ...// scr: not applicable
        }

        errno = 0;                                      // scr: ----> 5)
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                            portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }

        errno = 0;                                      // scr: ----> 5)
        if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                                portnumber_file)) {
            vperror("failed to listen on UDP port %d", settings.udpport);
            exit(EX_OSERR);
        }
        if (portnumber_file) {
            ...// scr: not applicable
        }
The two relevant steps are 4) and 5) which will be discussed in the following sections.
1) Raise the limit for core dump file size as well as the number of file descriptors.
2) Call event_init to initialize the libevent framework. The value returned is called an event base.
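For readers who have not used libevent's classic API, the following hypothetical sketch (not Memcached code; sketch_event_base and on_ready are made-up names) shows what obtaining an event base and registering a callback on it looks like. The same primitives reappear later in setup_thread and conn_new.

/* a sketch of the legacy libevent calls used throughout memcached, NOT Memcached code */
#include <event.h>

static void on_ready(int fd, short which, void *arg) {
    /* invoked by libevent whenever fd becomes readable */
}

int sketch_event_base(int fd) {
    struct event_base *base = event_init();  /* the returned value is the "event base" */
    struct event ev;

    event_set(&ev, fd, EV_READ | EV_PERSIST, on_ready, NULL);
    event_base_set(base, &ev);               /* attach the event to this base */
    if (event_add(&ev, 0) == -1)             /* start monitoring fd */
        return -1;

    return event_base_loop(base, 0);         /* dispatch callbacks until stopped */
}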
3) For all potential connections, call conn_init to allocate space to store their respective contexts (located by file descriptor in the global variable conns). The role of the context in the event driven model has already been discussed in the introduction.
static void conn_init(void) {
    ...
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
        max_fds = rl.rlim_max;
    } else {
        fprintf(stderr, "Failed to query maximum file descriptor; "
                        "falling back to maxconns\n");
    }
    ...
    if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
        /* This is unrecoverable so bail out early. */
        exit(1);
    }
}
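As a follow-up, locating a context later is just an array lookup by fd; a simplified sketch of what conn_new does with conns (abridged from the real function):

    /* sketch: locating/creating the context for an fd (abridged from conn_new) */
    conn *c = conns[sfd];
    if (c == NULL) {
        c = calloc(1, sizeof(conn));   /* first use of this fd: allocate the context */
        c->sfd = sfd;
        conns[sfd] = c;
    }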
*) Other miscellaneous system operations, such as setting the signal handlers for SIGINT and SIGTERM, calling setbuf(stderr, NULL), dropping the root privileges of the process, and daemonizing. If those names do not ring a bell, <<Advanced UNIX Programming>> is your friend.
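Purely for illustration, here is a rough, hypothetical sketch of those miscellaneous steps (simplified; Memcached's actual code differs in the details, e.g. it has its own daemonize helper, and misc_init below is a made-up name):

/* a sketch of the *) miscellaneous steps, NOT Memcached's exact code */
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pwd.h>

static void sig_handler(const int sig) {
    fprintf(stderr, "Signal handled: %d.\n", sig);
    exit(EXIT_SUCCESS);
}

static void misc_init(const char *username, bool do_daemonize) {
    signal(SIGINT, sig_handler);              /* terminate cleanly on Ctrl-C */
    signal(SIGTERM, sig_handler);

    setbuf(stderr, NULL);                     /* make stderr unbuffered */

    if (getuid() == 0 && username != NULL) {  /* started as root: drop privileges */
        struct passwd *pw = getpwnam(username);
        if (pw != NULL) {
            setgid(pw->pw_gid);
            setuid(pw->pw_uid);
        }
    }

    if (do_daemonize)
        daemon(0, 0);                         /* detach from the controlling terminal */
}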
Threads initialization
The core data structure of the multithreading mechanism is
typedef struct {
    pthread_t thread_id;               /* unique ID of this thread */
    struct event_base *base;           /* libevent handle this thread uses */
    struct event notify_event;         /* listen event for notify pipe */
    int notify_receive_fd;             /* receiving end of notify pipe */
    int notify_send_fd;                /* sending end of notify pipe */
    ...// scr: stat
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    ...// scr: cas & log
} LIBEVENT_THREAD;
LIBEVENT_THREAD@memcached.h
memcached_thread_init
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    ...// scr: initialize all sorts of mutexes and condition variables

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {                                // scr: ----> 4)
            perror("Can't create notify pipe");
            exit(1);
        }

        setup_thread(&threads[i]);                      // scr: ----> 5)
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);    // scr: ----> 6)
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
memcached_thread_init@thread.c
1) Allocate memory for an array of LIBEVENT_THREAD whose length is num_threads. Each element represents one thread. As described above, it is better that num_threads does not exceed the number of cores.
    ...
    settings.num_threads = 4;         /* N workers */
    ...
    case 't':
        settings.num_threads = atoi(optarg);
        if (settings.num_threads <= 0) {
            fprintf(stderr, "Number of threads must be greater than 0\n");
            return 1;
        }
        /* There're other problems when you get above 64 threads.
         * In the future we should portably detect # of cores for the
         * default.
         */
        if (settings.num_threads > 64) {
            fprintf(stderr, "WARNING: Setting a high number of worker"
                    "threads is not recommended.\n"
                    " Set this value to the number of cores in"
                    " your machine or less.\n");
        }
        break;
    ...
num_threads@memcached.c
2) Set the event base for dispatcher_thread, which represents the main thread itself. Note that dispatcher_thread is a global variable, so the reference is accessible to all the worker threads.
4) Initialize the pipe fds for each of the worker threads. Here notify_send_fd is used for communication between the dispatcher thread and the worker threads - whenever the dispatcher thread writes to notify_send_fd, an event is generated on the other end, notify_receive_fd, which is listened to by the worker thread. Again, <<Advanced UNIX Programming>> gives more information about pipes.
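A hedged sketch of that notification pattern follows (simplified; the real wiring lives in setup_thread and thread_libevent_process, and watch_pipe/notify_worker below are made-up names): the worker registers a libevent read event on notify_receive_fd, and the dispatcher wakes it up by writing a single byte to notify_send_fd.

/* sketch of the pipe based wake-up, NOT the exact Memcached code */
#include <event.h>
#include <unistd.h>

static void on_notify(int fd, short which, void *arg) {
    char byte;
    if (read(fd, &byte, 1) != 1)      /* drain the wake-up byte */
        return;
    /* ...pop pending work from the connection queue and handle it... */
}

/* worker side: listen on the receiving end of the pipe */
void watch_pipe(struct event_base *base, int notify_receive_fd) {
    static struct event notify_event;
    event_set(&notify_event, notify_receive_fd, EV_READ | EV_PERSIST,
              on_notify, NULL);
    event_base_set(base, &notify_event);
    event_add(&notify_event, 0);
}

/* dispatcher side: wake a worker up */
void notify_worker(int notify_send_fd) {
    char byte = 'c';                  /* the byte value itself is just a tag */
    write(notify_send_fd, &byte, 1);
}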
5) A more descriptive name for this method would be setup_libevent_for_each_thread. We will examine this method in the next section.
    me->new_conn_queue = malloc(sizeof(struct conn_queue));  // scr: 3)
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    ...// scr: stat & cas
}
setup_thread@thread.c
1) Call event_init to initialize the libevent instance for the worker thread. As discussed in the thread model, each worker thread runs its own drive machine.

2) Set thread_libevent_process as the callback for events emitted by the discussed notify_receive_fd. The major function of thread_libevent_process is to link the actual drive machine to those events, which we will see very soon in inter-thread communication.
3) Allocate and initialize the connection queue of the worker thread.
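The cq_* helpers live in thread.c; below is a hedged sketch of the underlying idea, a mutex protected singly linked list of CQ_ITEMs (simplified from the real implementation; the CQ_ITEM struct itself is shown later in this post).

/* sketch of the connection queue, simplified from cq_init/cq_push/cq_pop in thread.c */
#include <pthread.h>
#include <stdlib.h>

typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* dispatcher thread: append a pending connection */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* worker thread: take the oldest pending connection, or NULL if empty */
static CQ_ITEM *cq_pop(CQ *cq) {
    pthread_mutex_lock(&cq->lock);
    CQ_ITEM *item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}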
create_worker
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t attr;
    int ret;
As mentioned, this method calls pthread_create to create the actual worker threads. The callback passed in is worker_libevent, which essentially starts the event loop using event_base_loop, this time on the worker threads rather than the dispatcher thread.
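A hedged sketch of what that entry function boils down to (simplified; the real worker_libevent also sets up per-thread logging, and init_lock/init_cond/init_count are thread.c internals referenced here only to show the hand-shake with memcached_thread_init):

/* sketch of the worker thread entry point, simplified from worker_libevent */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* let memcached_thread_init() know this thread is ready;
       wait_for_thread_registration() is blocked on init_cond */
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    event_base_loop(me->base, 0);    /* run this worker's own drive machine forever */
    return NULL;
}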
The methods involved in socket initialization handle both TCP and UDP, while the following discussion covers only the TCP branch. We also assume portnumber_file is not set, so as to focus on the critical path.
        portnumber_file = fopen(temp_portnumber_filename, "a");
        if (portnumber_file == NULL) {
            fprintf(stderr, "Failed to open \"%s\": %s\n",
                    temp_portnumber_filename, strerror(errno));
        }
    }
memcached.c:6029
Unlike the worker threads, which listen to internal (pipe) fds, the dispatcher thread is responsible for events generated from external socket fds (by network requests). The method that initializes the sockets is server_sockets.
If the network interface is not specified via inter, server_sockets boils down to a single server_socket call; roughly (per the Memcached source), it is equivalent to the following.
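    /* roughly, server_sockets with settings.inter == NULL (per the Memcached source) */
    return server_socket(settings.inter, port, transport, portnumber_file);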
6) conn_new initializes the context for the fd and adds it to libevent, with the initial state set to conn_listening and the callback set to event_handler. Here event_handler is another transient method leading to the drive machine on the dispatcher thread. Likewise, this method will be discussed soon in inter-thread communication.
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }
    ...// scr: stat
    return c;
}
conn_new@memcached.c
7) Add the context to the head of a global list listen_conn.
...
static conn *listen_conn = NULL;
...
memcached.c:120
Next we briefly go through the process that handles a new connection, to wrap up this post.
Inter-thread communication
event_handler
Firstly, after a TCP connection is established, the fd monitored by the dispatcher thread notifies libevent, which invokes the mentioned event_handler. Next, the logic flow enters the code snippet we saw at the beginning - the drive machine, with the context state initialized to conn_listening during socket initialization.
void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    ...

    drive_machine(c);

    /* wait for next event */
    return;
}
event_handler@memcached.c
static void drive_machine(conn *c) {
    ...
    while (!stop) {
At this stage, the drive machine 1) accepts the connection and obtains another fd that can be read from; it 2) then calls dispatch_conn_new with the new fd and other relevant information, including the next state, conn_new_cmd.
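A hedged sketch of that branch follows (simplified; the real conn_listening case also deals with accept4, EAGAIN/EMFILE and the maxconns_fast check).

/* sketch of the conn_listening case inside drive_machine, simplified */
        case conn_listening:
            addrlen = sizeof(addr);
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);  // scr: 1)
            if (sfd == -1) {
                /* ...EAGAIN / EMFILE handling omitted... */
                stop = true;
                break;
            }

            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, // scr: 2)
                              DATA_BUFFER_SIZE, c->transport);
            stop = true;
            break;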
dispatch_conn_new
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        ...// scr: error handling
    }

    int tid = (last_thread + 1) % settings.num_threads;     // scr: ----> 1)
1) Pick the worker thread that will handle the new connection, rotating through the threads in a round-robin fashion.

2) Initialize a CQ_ITEM instance. Here CQ_ITEM is an intermediate object passed to a worker thread through its connection queue, so the worker thread can create a new context based on it.
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int                     sfd;
    enum conn_states        init_state;
    int                     event_flags;
    int                     read_buffer_size;
    enum network_transport  transport;
    CQ_ITEM                *next;
};
3) Call conn_new. From server_socket we know that conn_new establishes the context, this time for the new connection, and adds the accepted fd to libevent. Here, on the worker thread, the callback is again set to event_handler, which essentially connects the drive machine to the upcoming events on the same connection.
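To tie the two ends together, here is a hedged sketch of the hand-off (simplified from the remainder of dispatch_conn_new and from thread_libevent_process): the dispatcher fills in the CQ_ITEM, pushes it onto the chosen worker's connection queue and writes one byte to that worker's notify_send_fd; the worker's pipe callback reads the byte, pops the CQ_ITEM and calls conn_new with event_handler as the callback.

/* dispatcher side - the remainder of dispatch_conn_new, simplified */
    LIBEVENT_THREAD *thread = threads + tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);     /* hand the work over */

    buf[0] = 'c';                              /* 'c' stands for "new connection" */
    write(thread->notify_send_fd, buf, 1);     /* wake the worker up */

/* worker side - thread_libevent_process, simplified */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    char buf[1];

    read(fd, buf, 1);                          /* fd is me->notify_receive_fd */

    CQ_ITEM *item = cq_pop(me->new_conn_queue);
    if (item == NULL)
        return;

    /* create the context for the accepted fd on THIS worker's event base;
       event_handler routes its future events into the drive machine */
    conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                       item->read_buffer_size, item->transport, me->base);
    if (c == NULL)
        close(item->sfd);                      /* could not register: drop the connection */
    cqi_free(item);
}

From this point on, every subsequent event on the connection lands on the worker's event base, and the cycle of event_handler and drive_machine repeats entirely within that worker thread.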
That's it. Did I make a serious mistake, or miss out on anything important? Or did you simply like the read? Link me on -- I'd be chuffed to hear your feedback.