From 5aa0459be0a8471226dabda620c028c149966b29 Mon Sep 17 00:00:00 2001 From: Orestis Date: Sat, 12 Jan 2019 14:13:03 +0200 Subject: [PATCH] Use ipc queue for all messages (#3585) I was able to reproduce #3579 in Linux by running: `sudo sysctl net.core.wmem_default=10000` If a subscription message was too big to be sent at once, it was possible to break a client by sending a reply to an other message sent by the client. Eg: - Write 8192 out of 11612 bytes of a workspace event. - Blockingly write the reply to a workspace change message. - Write the rest 3420 bytes of the workspace event. This commit fixes this by utilizing the ipc queue for all types of writes. ipc_receive_message can only be called from a callback started in ipc_new_client. This callback uses the same file descriptor with the client also created in ipc_new_client. When the client is deleted, the read callback is now also stopped. Thus, we can assume that whenever ipc_receive_message is called, the corresponding client should still exist. - ipc_client now contains pointers to both write and read watchers. When freed, a client will stop both of them. - IPC_HANDLERs now work with ipc_clients instead of fds. Fixes #3579. --- include/ipc.h | 11 +-- src/ipc.c | 221 +++++++++++++++++++++----------------------------- 2 files changed, 99 insertions(+), 133 deletions(-) diff --git a/include/ipc.h b/include/ipc.h index a1caea82..a7743973 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -35,7 +35,8 @@ typedef struct ipc_client { * event has been sent by i3. */ bool first_tick_sent; - struct ev_io *callback; + struct ev_io *read_callback; + struct ev_io *write_callback; struct ev_timer *timeout; uint8_t *buffer; size_t buffer_size; @@ -54,12 +55,12 @@ typedef struct ipc_client { * message_type is the type of the message as the sender specified it. * */ -typedef void (*handler_t)(int, uint8_t *, int, uint32_t, uint32_t); +typedef void (*handler_t)(ipc_client *, uint8_t *, int, uint32_t, uint32_t); /* Macro to declare a callback */ -#define IPC_HANDLER(name) \ - static void handle_##name(int fd, uint8_t *message, \ - int size, uint32_t message_size, \ +#define IPC_HANDLER(name) \ + static void handle_##name(ipc_client *client, uint8_t *message, \ + int size, uint32_t message_size, \ uint32_t message_type) /** diff --git a/src/ipc.c b/src/ipc.c index 2ad5ae0f..2432d7a5 100644 --- a/src/ipc.c +++ b/src/ipc.c @@ -38,46 +38,6 @@ static void set_nonblock(int sockfd) { err(-1, "Could not set O_NONBLOCK"); } -/* - * Given a message and a message type, create the corresponding header, merge it - * with the message and append it to the given client's output buffer. - * - */ -static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) { - const size_t size = strlen(payload); - const i3_ipc_header_t header = { - .magic = {'i', '3', '-', 'i', 'p', 'c'}, - .size = size, - .type = message_type}; - const size_t header_size = sizeof(i3_ipc_header_t); - const size_t message_size = header_size + size; - - client->buffer = srealloc(client->buffer, client->buffer_size + message_size); - memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size); - memcpy(client->buffer + client->buffer_size + header_size, payload, size); - client->buffer_size += message_size; -} - -static void free_ipc_client(ipc_client *client) { - close(client->fd); - - ev_io_stop(main_loop, client->callback); - FREE(client->callback); - if (client->timeout) { - ev_timer_stop(main_loop, client->timeout); - FREE(client->timeout); - } - - free(client->buffer); - - for (int i = 0; i < client->num_events; i++) { - free(client->events[i]); - } - free(client->events); - TAILQ_REMOVE(&all_clients, client, clients); - free(client); -} - static void ipc_client_timeout(EV_P_ ev_timer *w, int revents); static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents); @@ -89,8 +49,8 @@ void ipc_set_kill_timeout(ev_tstamp new) { /* * Try to write the contents of the pending buffer to the client's subscription - * socket. Will set, reset or clear the timeout and io callbacks depending on - * the result of the write operation. + * socket. Will set, reset or clear the timeout and io write callbacks depending + * on the result of the write operation. * */ static void ipc_push_pending(ipc_client *client) { @@ -108,13 +68,13 @@ static void ipc_push_pending(ipc_client *client) { ev_timer_stop(main_loop, client->timeout); FREE(client->timeout); } - ev_io_stop(main_loop, client->callback); + ev_io_stop(main_loop, client->write_callback); return; } /* Otherwise, make sure that the io callback is enabled and create a new * timer if needed. */ - ev_io_start(main_loop, client->callback); + ev_io_start(main_loop, client->write_callback); if (!client->timeout) { struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer)); @@ -140,6 +100,54 @@ static void ipc_push_pending(ipc_client *client) { client->buffer = srealloc(client->buffer, client->buffer_size); } +/* + * Given a message and a message type, create the corresponding header, merge it + * with the message and append it to the given client's output buffer. Also, + * send the message if the client's buffer was empty. + * + */ +static void ipc_send_client_message(ipc_client *client, size_t size, const uint32_t message_type, const uint8_t *payload) { + const i3_ipc_header_t header = { + .magic = {'i', '3', '-', 'i', 'p', 'c'}, + .size = size, + .type = message_type}; + const size_t header_size = sizeof(i3_ipc_header_t); + const size_t message_size = header_size + size; + + const bool push_now = (client->buffer_size == 0); + client->buffer = srealloc(client->buffer, client->buffer_size + message_size); + memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size); + memcpy(client->buffer + client->buffer_size + header_size, payload, size); + client->buffer_size += message_size; + + if (push_now) { + ipc_push_pending(client); + } +} + +static void free_ipc_client(ipc_client *client) { + DLOG("Disconnecting client on fd %d\n", client->fd); + close(client->fd); + + ev_io_stop(main_loop, client->read_callback); + FREE(client->read_callback); + ev_io_stop(main_loop, client->write_callback); + FREE(client->write_callback); + if (client->timeout) { + ev_timer_stop(main_loop, client->timeout); + FREE(client->timeout); + } + + free(client->buffer); + + for (int i = 0; i < client->num_events; i++) { + free(client->events[i]); + } + free(client->events); + TAILQ_REMOVE(&all_clients, client, clients); + free(client); +} + /* * Sends the specified event to all IPC clients which are currently connected * and subscribed to this kind of event. @@ -148,21 +156,11 @@ static void ipc_push_pending(ipc_client *client) { void ipc_send_event(const char *event, uint32_t message_type, const char *payload) { ipc_client *current; TAILQ_FOREACH(current, &all_clients, clients) { - /* see if this client is interested in this event */ - bool interested = false; for (int i = 0; i < current->num_events; i++) { - if (strcasecmp(current->events[i], event) != 0) - continue; - interested = true; - break; - } - if (!interested) - continue; - - const bool push_now = (current->buffer_size == 0); - append_payload(current, message_type, payload); - if (push_now) { - ipc_push_pending(current); + if (strcasecmp(current->events[i], event) == 0) { + ipc_send_client_message(current, strlen(payload), message_type, (uint8_t *)payload); + break; + } } } } @@ -234,8 +232,8 @@ IPC_HANDLER(run_command) { ylength length; yajl_gen_get_buf(gen, &reply, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_COMMAND, - (const uint8_t *)reply); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_COMMAND, + (const uint8_t *)reply); yajl_gen_free(gen); } @@ -843,7 +841,7 @@ IPC_HANDLER(tree) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_TREE, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_TREE, payload); y(free); } @@ -907,7 +905,7 @@ IPC_HANDLER(get_workspaces) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload); y(free); } @@ -961,7 +959,7 @@ IPC_HANDLER(get_outputs) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload); y(free); } @@ -988,7 +986,7 @@ IPC_HANDLER(get_marks) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_MARKS, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_MARKS, payload); y(free); } @@ -1021,7 +1019,7 @@ IPC_HANDLER(get_version) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_VERSION, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_VERSION, payload); y(free); } @@ -1046,7 +1044,7 @@ IPC_HANDLER(get_bar_config) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload); y(free); return; } @@ -1083,7 +1081,7 @@ IPC_HANDLER(get_bar_config) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload); y(free); } @@ -1105,7 +1103,7 @@ IPC_HANDLER(get_binding_modes) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload); y(free); } @@ -1144,21 +1142,6 @@ static int add_subscription(void *extra, const unsigned char *s, IPC_HANDLER(subscribe) { yajl_handle p; yajl_status stat; - ipc_client *current, *client = NULL; - - /* Search the ipc_client structure for this connection */ - TAILQ_FOREACH(current, &all_clients, clients) { - if (current->fd != fd) - continue; - - client = current; - break; - } - - if (client == NULL) { - ELOG("Could not find ipc_client data structure for fd %d\n", fd); - return; - } /* Setup the JSON parser */ static yajl_callbacks callbacks = { @@ -1175,13 +1158,13 @@ IPC_HANDLER(subscribe) { yajl_free_error(p, err); const char *reply = "{\"success\":false}"; - ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply); + ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply); yajl_free(p); return; } yajl_free(p); const char *reply = "{\"success\":true}"; - ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply); + ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply); if (client->first_tick_sent) { return; @@ -1200,7 +1183,7 @@ IPC_HANDLER(subscribe) { client->first_tick_sent = true; const char *payload = "{\"first\":true,\"payload\":\"\"}"; - ipc_send_message(client->fd, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload); + ipc_send_client_message(client, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload); } /* @@ -1220,7 +1203,7 @@ IPC_HANDLER(get_config) { ylength length; y(get_buf, &payload, &length); - ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_CONFIG, payload); + ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_CONFIG, payload); y(free); } @@ -1249,7 +1232,7 @@ IPC_HANDLER(send_tick) { y(free); const char *reply = "{\"success\":true}"; - ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply); + ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply); DLOG("Sent tick event\n"); } @@ -1300,7 +1283,7 @@ IPC_HANDLER(sync) { yajl_free_error(p, err); const char *reply = "{\"success\":false}"; - ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply); + ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply); yajl_free(p); return; } @@ -1309,7 +1292,7 @@ IPC_HANDLER(sync) { DLOG("received IPC sync request (rnd = %d, window = 0x%08x)\n", state.rnd, state.window); sync_respond(state.window, state.rnd); const char *reply = "{\"success\":true}"; - ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply); + ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply); } /* The index of each callback function corresponds to the numeric @@ -1343,6 +1326,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) { uint32_t message_type; uint32_t message_length; uint8_t *message = NULL; + ipc_client *client = (ipc_client *)w->data; + assert(client->fd == w->fd); int ret = ipc_recv_message(w->fd, &message_type, &message_length, &message); /* EOF or other error */ @@ -1355,25 +1340,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) { /* If not, there was some kind of error. We don’t bother and close the * connection. Delete the client from the list of clients. */ - bool closed = false; - ipc_client *current; - TAILQ_FOREACH(current, &all_clients, clients) { - if (current->fd != w->fd) - continue; - - free_ipc_client(current); - closed = true; - break; - } - if (!closed) { - close(w->fd); - } - - ev_io_stop(EV_A_ w); - free(w); + free_ipc_client(client); FREE(message); - - DLOG("IPC: client disconnected\n"); return; } @@ -1381,7 +1349,7 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) { DLOG("Unhandled message type: %d\n", message_type); else { handler_t h = handlers[message_type]; - h(w->fd, message, 0, message_length, message_type); + h(client, message, 0, message_length, message_type); } FREE(message); @@ -1453,36 +1421,33 @@ static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) { void ipc_new_client(EV_P_ struct ev_io *w, int revents) { struct sockaddr_un peer; socklen_t len = sizeof(struct sockaddr_un); - int client; - if ((client = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) { - if (errno == EINTR) - return; - else + int fd; + if ((fd = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) { + if (errno != EINTR) { perror("accept()"); + } return; } /* Close this file descriptor on exec() */ - (void)fcntl(client, F_SETFD, FD_CLOEXEC); + (void)fcntl(fd, F_SETFD, FD_CLOEXEC); - set_nonblock(client); + set_nonblock(fd); - struct ev_io *package = scalloc(1, sizeof(struct ev_io)); - ev_io_init(package, ipc_receive_message, client, EV_READ); - ev_io_start(EV_A_ package); + ipc_client *client = scalloc(1, sizeof(ipc_client)); + client->fd = fd; - ipc_client *new = scalloc(1, sizeof(ipc_client)); + client->read_callback = scalloc(1, sizeof(struct ev_io)); + client->read_callback->data = client; + ev_io_init(client->read_callback, ipc_receive_message, fd, EV_READ); + ev_io_start(EV_A_ client->read_callback); - package = scalloc(1, sizeof(struct ev_io)); - package->data = new; - ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE); + client->write_callback = scalloc(1, sizeof(struct ev_io)); + client->write_callback->data = client; + ev_io_init(client->write_callback, ipc_socket_writeable_cb, fd, EV_WRITE); DLOG("IPC: new client connected on fd %d\n", w->fd); - - new->fd = client; - new->callback = package; - - TAILQ_INSERT_TAIL(&all_clients, new, clients); + TAILQ_INSERT_TAIL(&all_clients, client, clients); } /* -- 2.39.5