From 37d0105c8328f718740c2ae9bae0dff87287a467 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Mon, 23 Apr 2018 12:20:05 +0300 Subject: [PATCH] Kill misbehaving subscribed clients instead of hanging This change only affects clients that are subscribed to events, which should be the main cause of our problems. In the common case (no buffered data) the behaviour doesn't change at all: the message is sent directly, no ev_io / ev_timeout callback is enabled. Once a write to a client's socket is not completed fully (returns with EAGAIN error), we put the message in the tail of a queue and init an ev_io callback and a corresponding timer. If the timer is triggered first, the socket is closed and the client connection is removed. If the socket becomes writeable before the timeout we either reset the timer if we couldn't push all the buffered data or completely remove it if everything was pushed. We could also replace ipc_send_message() for all client connections in i3, not just those subscribed to events. Furthermore, we could limit the amount of messages stored and increase the timeout (or use multiple timeouts): eg it's ok if a client is not reading for 10 seconds and we are only holding 5KB of messages for them but it is not ok if they are inactive for 5 seconds and we have 30MB of messages held. 
Closes #2999 Closes #2539 --- docs/ipc | 8 + include/config_directives.h | 1 + include/ipc.h | 11 ++ include/libi3.h | 8 + libi3/safewrappers.c | 22 ++- parser-specs/config.spec | 6 + src/config_directives.c | 4 + src/ipc.c | 162 ++++++++++++++++++- testcases/t/201-config-parser.t | 1 + testcases/t/298-ipc-misbehaving-connection.t | 69 ++++++++ 10 files changed, 287 insertions(+), 5 deletions(-) create mode 100644 testcases/t/298-ipc-misbehaving-connection.t diff --git a/docs/ipc b/docs/ipc index e44ffd7c..bcf8df1a 100644 --- a/docs/ipc +++ b/docs/ipc @@ -693,6 +693,14 @@ program does not want to cope which such kinds of race conditions (an event based library may not have a problem here), I suggest you create a separate connection to receive events. +If an event message needs to be sent and the socket is not writeable (write +returns EAGAIN, happens when the socket doesn't have enough buffer space for +writing new data) then i3 uses a queue system to store outgoing messages for +each client. This is combined with a timer: if the message queue for a client is +not empty and no data was successfully written in the past 10 seconds, the +connection is killed. Practically, this means that your client should try to +always read events from the socket to avoid having its connection closed. 
+ === Subscribing to events By sending a message of type SUBSCRIBE with a JSON-encoded array as payload diff --git a/include/config_directives.h b/include/config_directives.h index f21ad8e1..4a20a1f5 100644 --- a/include/config_directives.h +++ b/include/config_directives.h @@ -62,6 +62,7 @@ CFGFUN(assign_output, const char *output); CFGFUN(assign, const char *workspace, bool is_number); CFGFUN(no_focus); CFGFUN(ipc_socket, const char *path); +CFGFUN(ipc_kill_timeout, const long timeout_ms); CFGFUN(restart_state, const char *path); CFGFUN(popup_during_fullscreen, const char *value); CFGFUN(color, const char *colorclass, const char *border, const char *background, const char *text, const char *indicator, const char *child_border); diff --git a/include/ipc.h b/include/ipc.h index c6ad35c7..a1caea82 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -35,6 +35,11 @@ typedef struct ipc_client { * event has been sent by i3. */ bool first_tick_sent; + struct ev_io *callback; + struct ev_timer *timeout; + uint8_t *buffer; + size_t buffer_size; + TAILQ_ENTRY(ipc_client) clients; } ipc_client; @@ -124,3 +129,9 @@ void ipc_send_barconfig_update_event(Barconfig *barconfig); * For the binding events, we send the serialized binding struct. */ void ipc_send_binding_event(const char *event_type, Binding *bind); + +/** + * Set the maximum duration that we allow for a connection with an unwriteable + * socket. + */ +void ipc_set_kill_timeout(ev_tstamp new); diff --git a/include/libi3.h b/include/libi3.h index ebddee96..d3b40796 100644 --- a/include/libi3.h +++ b/include/libi3.h @@ -166,6 +166,14 @@ int sasprintf(char **strp, const char *fmt, ...); */ ssize_t writeall(int fd, const void *buf, size_t count); +/** + * Like writeall, but instead of retrying upon EAGAIN (returned when a write + * would block), the function stops and returns the total number of bytes + * written so far. 
+ * + */ +ssize_t writeall_nonblock(int fd, const void *buf, size_t count); + /** * Safe-wrapper around writeall which exits if it returns -1 (meaning that * write failed) diff --git a/libi3/safewrappers.c b/libi3/safewrappers.c index 94ad4ee6..04bbda44 100644 --- a/libi3/safewrappers.c +++ b/libi3/safewrappers.c @@ -68,10 +68,9 @@ int sasprintf(char **strp, const char *fmt, ...) { ssize_t writeall(int fd, const void *buf, size_t count) { size_t written = 0; - ssize_t n = 0; while (written < count) { - n = write(fd, buf + written, count - written); + const ssize_t n = write(fd, buf + written, count - written); if (n == -1) { if (errno == EINTR || errno == EAGAIN) continue; @@ -83,6 +82,25 @@ ssize_t writeall(int fd, const void *buf, size_t count) { return written; } +ssize_t writeall_nonblock(int fd, const void *buf, size_t count) { + size_t written = 0; + + while (written < count) { + const ssize_t n = write(fd, buf + written, count - written); + if (n == -1) { + if (errno == EAGAIN) { + return written; + } else if (errno == EINTR) { + continue; + } else { + return n; + } + } + written += (size_t)n; + } + return written; +} + ssize_t swrite(int fd, const void *buf, size_t count) { ssize_t n; diff --git a/parser-specs/config.spec b/parser-specs/config.spec index b15c9a80..5cdb7c32 100644 --- a/parser-specs/config.spec +++ b/parser-specs/config.spec @@ -48,6 +48,7 @@ state INITIAL: 'show_marks' -> SHOW_MARKS 'workspace' -> WORKSPACE 'ipc_socket', 'ipc-socket' -> IPC_SOCKET + 'ipc_kill_timeout' -> IPC_KILL_TIMEOUT 'restart_state' -> RESTART_STATE 'popup_during_fullscreen' -> POPUP_DURING_FULLSCREEN exectype = 'exec_always', 'exec' -> EXEC @@ -281,6 +282,11 @@ state IPC_SOCKET: path = string -> call cfg_ipc_socket($path) +# ipc_kill_timeout +state IPC_KILL_TIMEOUT: + timeout = number + -> call cfg_ipc_kill_timeout(&timeout) + # restart_state (for testcases) state RESTART_STATE: path = string diff --git a/src/config_directives.c b/src/config_directives.c index 
17e25cde..dfbb52d8 100644 --- a/src/config_directives.c +++ b/src/config_directives.c @@ -446,6 +446,10 @@ CFGFUN(no_focus) { TAILQ_INSERT_TAIL(&assignments, assignment, assignments); } +CFGFUN(ipc_kill_timeout, const long timeout_ms) { + ipc_set_kill_timeout(timeout_ms / 1000.0); +} + /******************************************************************************* * Bar configuration (i3bar) ******************************************************************************/ diff --git a/src/ipc.c b/src/ipc.c index 240119bc..977fe377 100644 --- a/src/ipc.c +++ b/src/ipc.c @@ -38,9 +38,39 @@ static void set_nonblock(int sockfd) { err(-1, "Could not set O_NONBLOCK"); } +/* + * Given a message and a message type, create the corresponding header, merge it + * with the message and append it to the given client's output buffer. + * + */ +static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) { + const size_t size = strlen(payload); + const i3_ipc_header_t header = { + .magic = {'i', '3', '-', 'i', 'p', 'c'}, + .size = size, + .type = message_type}; + const size_t header_size = sizeof(i3_ipc_header_t); + const size_t message_size = header_size + size; + + client->buffer = srealloc(client->buffer, client->buffer_size + message_size); + memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size); + memcpy(client->buffer + client->buffer_size + header_size, payload, size); + client->buffer_size += message_size; +} + static void free_ipc_client(ipc_client *client) { close(client->fd); - for (int i = 0; i < client->num_events; i++){ + + ev_io_stop(main_loop, client->callback); + FREE(client->callback); + if (client->timeout) { + ev_timer_stop(main_loop, client->timeout); + FREE(client->timeout); + } + + free(client->buffer); + + for (int i = 0; i < client->num_events; i++) { free(client->events[i]); } free(client->events); @@ -48,6 +78,68 @@ static void free_ipc_client(ipc_client *client) { free(client); } +static void 
ipc_client_timeout(EV_P_ ev_timer *w, int revents); +static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents); + +static ev_tstamp kill_timeout = 10.0; + +void ipc_set_kill_timeout(ev_tstamp new) { + kill_timeout = new; +} + +/* + * Try to write the contents of the pending buffer to the client's subscription + * socket. Will set, reset or clear the timeout and io callbacks depending on + * the result of the write operation. + * + */ +static void ipc_push_pending(ipc_client *client) { + const ssize_t result = writeall_nonblock(client->fd, client->buffer, client->buffer_size); + if (result < 0) { + return; + } + + if ((size_t)result == client->buffer_size) { + /* Everything was written successfully: clear the timer and stop the io + * callback. */ + FREE(client->buffer); + client->buffer_size = 0; + if (client->timeout) { + ev_timer_stop(main_loop, client->timeout); + FREE(client->timeout); + } + ev_io_stop(main_loop, client->callback); + return; + } + + /* Otherwise, make sure that the io callback is enabled and create a new + * timer if needed. */ + ev_io_start(main_loop, client->callback); + + if (!client->timeout) { + struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer)); + ev_timer_init(timeout, ipc_client_timeout, kill_timeout, 0.); + timeout->data = client; + client->timeout = timeout; + ev_set_priority(timeout, EV_MINPRI); + ev_timer_start(main_loop, client->timeout); + } else if (result > 0) { + /* Keep the old timeout when nothing is written. Otherwise, we would + * keep a dead connection by continuously renewing its timeouts. */ + ev_timer_stop(main_loop, client->timeout); + ev_timer_set(client->timeout, kill_timeout, 0.0); + ev_timer_start(main_loop, client->timeout); + } + if (result == 0) { + return; + } + + /* Shift the buffer to the left and reduce the allocated space. 
*/ + client->buffer_size -= (size_t)result; + memmove(client->buffer, client->buffer + result, client->buffer_size); + client->buffer = srealloc(client->buffer, client->buffer_size); +} + /* * Sends the specified event to all IPC clients which are currently connected * and subscribed to this kind of event. @@ -67,7 +159,11 @@ void ipc_send_event(const char *event, uint32_t message_type, const char *payloa if (!interested) continue; - ipc_send_message(current->fd, strlen(payload), message_type, (const uint8_t *)payload); + const bool push_now = (current->buffer_size == 0); + append_payload(current, message_type, payload); + if (push_now) { + ipc_push_pending(current); + } } } @@ -1286,6 +1382,60 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) { FREE(message); } +static void ipc_client_timeout(EV_P_ ev_timer *w, int revents) { + /* No need to be polite and check for writeability, the other callback would + * have been called by now. */ + ipc_client *client = (ipc_client *)w->data; + + char *cmdline = NULL; +#if defined(__linux__) && defined(SO_PEERCRED) + struct ucred peercred; + socklen_t so_len = sizeof(peercred); + if (getsockopt(client->fd, SOL_SOCKET, SO_PEERCRED, &peercred, &so_len) != 0) { + goto end; + } + char *exepath; + sasprintf(&exepath, "/proc/%d/cmdline", peercred.pid); + + int fd = open(exepath, O_RDONLY); + free(exepath); + if (fd == -1) { + goto end; + } + char buf[512] = {'\0'}; /* cut off cmdline for the error message. 
*/ + const ssize_t n = read(fd, buf, sizeof(buf)); + close(fd); + if (n < 0) { + goto end; + } + for (char *walk = buf; walk < buf + n - 1; walk++) { + if (*walk == '\0') { + *walk = ' '; + } + } + cmdline = buf; +#endif + +end: + if (cmdline) { + ELOG("client %p with pid %d and cmdline '%s' on fd %d timed out, killing\n", client, peercred.pid, cmdline, client->fd); + } else { + ELOG("client %p on fd %d timed out, killing\n", client, client->fd); + } + + free_ipc_client(client); +} + +static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) { + DLOG("fd %d writeable\n", w->fd); + ipc_client *client = (ipc_client *)w->data; + + /* If this callback is called then there should be a corresponding active + * timer. */ + assert(client->timeout != NULL); + ipc_push_pending(client); +} + /* * Handler for activity on the listening socket, meaning that a new client * has just connected and we should accept() him. Sets up the event handler @@ -1314,10 +1464,16 @@ void ipc_new_client(EV_P_ struct ev_io *w, int revents) { ev_io_init(package, ipc_receive_message, client, EV_READ); ev_io_start(EV_A_ package); + ipc_client *new = scalloc(1, sizeof(ipc_client)); + + package = scalloc(1, sizeof(struct ev_io)); + package->data = new; + ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE); + DLOG("IPC: new client connected on fd %d\n", w->fd); - ipc_client *new = scalloc(1, sizeof(ipc_client)); new->fd = client; + new->callback = package; TAILQ_INSERT_TAIL(&all_clients, new, clients); } diff --git a/testcases/t/201-config-parser.t b/testcases/t/201-config-parser.t index 1e7ebfc4..a58f33c1 100644 --- a/testcases/t/201-config-parser.t +++ b/testcases/t/201-config-parser.t @@ -501,6 +501,7 @@ my $expected_all_tokens = "ERROR: CONFIG: Expected one of these tokens: , ' workspace ipc_socket ipc-socket + ipc_kill_timeout restart_state popup_during_fullscreen exec_always diff --git a/testcases/t/298-ipc-misbehaving-connection.t b/testcases/t/298-ipc-misbehaving-connection.t 
new file mode 100644
index 00000000..d53ee92d
--- /dev/null
+++ b/testcases/t/298-ipc-misbehaving-connection.t
@@ -0,0 +1,69 @@
+#!perl
+# vim:ts=4:sw=4:expandtab
+#
+# Please read the following documents before working on tests:
+# • https://build.i3wm.org/docs/testsuite.html
+#   (or docs/testsuite)
+#
+# • https://build.i3wm.org/docs/lib-i3test.html
+#   (alternatively: perldoc ./testcases/lib/i3test.pm)
+#
+# • https://build.i3wm.org/docs/ipc.html
+#   (or docs/ipc)
+#
+# • http://onyxneon.com/books/modern_perl/modern_perl_a4.pdf
+#   (unless you are already familiar with Perl)
+#
+# Test that i3 will not hang if a connected client stops reading from its
+# subscription socket and that the client is killed after a delay.
+# Ticket: #2999
+# Bug still in: 4.15-180-g715cea61
+use i3test i3_config => <<EOT;
+# i3 config file (v4)
+font -misc-fixed-medium-r-normal--13-120-75-75-C-70-iso10646-1
+# Kill unresponsive IPC clients after 500 ms to keep this test short.
+ipc_kill_timeout 500
+EOT
+
+# Connect to i3 manually so that we can deliberately never read events.
+use IO::Socket::UNIX;
+my $sock = IO::Socket::UNIX->new(Peer => get_socket_path());
+my $magic = "i3-ipc";
+my $payload = '["workspace"]';
+my $message = $magic . pack("LL", length($payload), 2) . $payload;
+print $sock $message;
+
+# Constantly switch between 2 workspaces to generate events.
+fresh_workspace;
+open_window;
+fresh_workspace;
+open_window;
+
+eval {
+    local $SIG{ALRM} = sub { die "Timeout\n" };
+    # 500 is an arbitrarily large number to make sure that the socket becomes
+    # non-writeable.
+    for (my $i = 0; $i < 500; $i++) {
+        alarm 1;
+        cmd 'workspace back_and_forth';
+        alarm 0;
+    }
+};
+ok(!$@, 'i3 didn\'t hang');
+
+# Wait for connection timeout
+sleep 1;
+
+use IO::Select;
+my $s = IO::Select->new($sock);
+my $reached_eof = 0;
+while ($s->can_read(0.05)) {
+    if (read($sock, my $buffer, 100) == 0) {
+        $reached_eof = 1;
+        last;
+    }
+}
+ok($reached_eof, 'socket connection closed');
+
+close $sock;
+done_testing;
-- 
2.39.5