Threads:
The main rule here is that only the main R thread can call a function from the R API. The civetweb callbacks run on the civetweb threads, so they cannot call any R API. Currently we only use the begin_request()
callback, and this is called from the request threads. So we need to synchronize between the request threads and the main R thread.
We essentially have a producer-consumer problem, with a single consumer, the main R thread, and multiple producers, the request threads. Having a single consumer means that the queue to store the jobs is of length one. This is a good guide on how to solve such a problem: https://docs.oracle.com/cd/E36784_01/html/E36868/sync-31.html
We need two conditions, to signal that 1) there is something to work on, and that 2) new work may come in. We also need a mutex to be able to wait on these conditions. These are stored in the user_data
of the civetweb server instance:
struct server_user_data {
...
pthread_cond_t process_more; /* there is something to process */
pthread_cond_t process_less; /* we can process something */
pthread_mutex_t process_lock;
struct mg_connection *nextconn;
...
};
nextconn
is the queue, it is used to pass the request from a request thread to the main R thread. When a request thread comes in, it has to make sure that nextconn
is NULL
, so waits on process_less
. Once given the green light, it sets nextconn
to the civetweb connection object, and then wait on its own finish_cond
condition, which is stored in the connection specific user data:
struct connection_user_data {
pthread_cond_t finish_cond; /* can finish callback? */
pthread_mutex_t finish_lock;
int main_todo; /* what should the main thread do? */
int req_todo; /* what shoudl the request thread do? */
double secs; /* how much should we wait? */
SEXP req;
...
};
The main R thread can use the user_data
of next_conn
to access all information about the connection and the request. Once the main R thread is done with processing the request, it sets the connection’s req_todo
field to non-zero, and signals the connection’s finish_cond
condition to allow the request thread to continue. Then it also signals the process_less
condition of the server, to let other request threads in.
Currently the main R thread can set req_todo
to two different values.
WEBFAKES_DONE
means that the request was processed, and the request thread can quit. Most requests are like this.WEBFAKES_WAIT
means that the request thread still needs to stay around and sleep for the specified number of secs
. After sleeping for the specified amount of time, the request thread will signal process_more
again, notifying the main R thread, but it also sets main_todo
to WEBFAKES_WAIT
, so the main R thread knows that this is not a new request. The main R thread can just take the stored request from the req
field of the connection user data in this case.While the server is running, all errors must be handled and the server must keep running.
They are caught and re-thrown, with the civetweb error log added. The error log typically contains more information. E.g. the most common failure is that the specified port is not free and the error log has a meaningful error message in this case.
Errors that happen in the R request handler functions are caught and the server will send an HTTP 500 response, with the R error message:
while (TRUE) {
req <- server_poll(srv)
tryCatch(
self$.process_request(req),
error = function(err) {
cat(as.character(err), file = stderr())
response_send_error(req, as.character(err), 500L)
}
)
}
For a response that is sent out in multiple pieces, this is not possible if the status code and the headers have been sent out already. In this case we just send out the R error message.
Errors that happen in the C code while processing the request or the response are different, because most probably we cannot send anything meaningful to the client. E.g. the most frequent such error happens when the connection breaks or the client closes the connection. These errors are caught in the server_poll()
and response_*()
R functions, and printed to the screen (see server.R
). If they originate from civetweb, then they are also logged in the civetweb error log. These errors invalidate the request, and finish the processing callback. This is implemented by the server.R
functions (re)throwing a webfakes_error
, which is caught and then silently ignored by the processing loop.
See ‘Resource cleanup’ below for how resources are cleaned up on error.
See the ‘Multithreading’ section above as well.
We create a req
object for an incoming request, before passing it to R from C. This object is an environment and it is kept until the response to the request is completely sent out. (Or the connection is closed for some reason.) The req
object is also added to the connection user data of civetweb. Additionally, the server keeps an list (environment) of all request objects. The latter makes sure that the request object is not garbage collected, so we don’t need to worry about that.
When a response is delayed, the app makes a note about the position of the handler function in the handler stack (in .stackptr
), so that this handler function can be called again, after the delay.
Then it calls response_write()
which sends a WEBFAKES_WAIT
message to the request thread. Then the main R thread can continue processing and potentially serving other requests, assuming the server has been started with at least two threads. After the wait, the request thread sends a message to the main R thread again, and the app’s poll call will get the same request object for the second (etc.) time. The app starts calling the handler functions from the recorded .stackptr
position.
The server runs until it is interrupted. (From the console or remotely via processx::process$interrupt()
.)
We need to make sure that the server can be interrupted while waiting for new requests (i.e. the main R thread waiting on the process_more
condition, see ‘Multithreading’ above). pthread_cond_wait()
is interrupted by SIGINT
on Unix, seemingly, but not on Windows, so we need to use pthread_cond_timedwait()
. We currently check for interrupts every 50ms. If the server is interrupted at this point, no cleanup is needed needed, as it does not hold any resources. In fact the functions in server.R
will keep the server intact, with all delayed responses, and it is possible to call server_poll()
again. But the app$listen()
method does clean up the server in this case. Maybe this will change in the future.
In theory the C code cannot be interrupted at any other points. On the other hand the R API functions might error any time, so we do need proper cleanup everywhere, see the ‘Resource cleanup’ section below.
If the R code is interrupted, the server.R
functions do not need any cleanup. (In theory some error messages might get lost if the timing is extremely unfortunate and a server.R
function is handling an error when the interrupt happens.)
The app$listen()
method cleans up the server in on.exit()
.
See also ‘Interruption’ just above.
There are some points in the C code where R errors happen while the code is holding resources. We use (a copy of) the cleancall package to take care of resource cleanup here.
The first such place is the server_poll()
C function, after a request is in from the request thread. Creating an R object for the request involves a lot of R API calls, and if one of them fails, we need to clean up the resources associated with this connection. (The error will be logged and the R server_poll()
function will continue polling.) The cleanup in this case involves:
WEBFAKES_DONE
message to the request thread, so it will quit.process_less
condition, to let other threads know that we are again ready to process requests.Other functions that need cleanup are the C functions that work on the response: response_delay()
, response_send_headers()
, response_send()
and response_write()
. These cleanups are very similar to the one for server_poll()
.
The finalizer of the server object takes care of cleaning up all resources associated with a server, including all request objects and request threads. This is also called by the server_stop()
R function, which is in turn called by the on.exit()
on the listen()
method. The finalizer uses the list of requests in the tag of the xptr object to walk over all requests, and finish all request threads.
Thread that are sleeping because of a delayed response frequently check a server-wide shutdown flag, which the finalizer also sets, so these threads quit as well. Then the finalizer calls the civetweb function mg_stop()
. mg_stop()
has its own shutdown flag and waits for all request and worker threads to quit. Given that we just cleaned up all of them, there shouldn’t be too many, and if there were any just coming in, they’ll also observe out shutdown flag and quit quickly.