]> git.sur5r.net Git - i3/i3/commitdiff
Use ipc queue for all messages (#3585)
authorOrestis <orestisf1993@gmail.com>
Sat, 12 Jan 2019 12:13:03 +0000 (14:13 +0200)
committerMichael Stapelberg <stapelberg@users.noreply.github.com>
Sat, 12 Jan 2019 12:13:03 +0000 (13:13 +0100)
I was able to reproduce #3579 in Linux by running:
`sudo sysctl net.core.wmem_default=10000`

If a subscription message was too big to be sent at once, it was
possible to break a client by sending a reply to an other message sent
by the client. Eg:
- Write 8192 out of 11612 bytes of a workspace event.
- Blockingly write the reply to a workspace change message.
- Write the rest 3420 bytes of the workspace event.

This commit fixes this by utilizing the ipc queue for all types of
writes.

ipc_receive_message can only be called from a callback started in
ipc_new_client. This callback uses the same file descriptor with the
client also created in ipc_new_client. When the client is deleted, the
read callback is now also stopped. Thus, we can assume that whenever
ipc_receive_message is called, the corresponding client should still
exist.

- ipc_client now contains pointers to both write and read watchers. When
freed, a client will stop both of them.
- IPC_HANDLERs now work with ipc_clients instead of fds.

Fixes #3579.

include/ipc.h
src/ipc.c

index a1caea827fb7a549cbcde1a3c5c973e14e1ab987..a77439737aea46066961114f32464ce54e5eb7f5 100644 (file)
@@ -35,7 +35,8 @@ typedef struct ipc_client {
      * event has been sent by i3. */
     bool first_tick_sent;
 
-    struct ev_io *callback;
+    struct ev_io *read_callback;
+    struct ev_io *write_callback;
     struct ev_timer *timeout;
     uint8_t *buffer;
     size_t buffer_size;
@@ -54,12 +55,12 @@ typedef struct ipc_client {
  * message_type is the type of the message as the sender specified it.
  *
  */
-typedef void (*handler_t)(int, uint8_t *, int, uint32_t, uint32_t);
+typedef void (*handler_t)(ipc_client *, uint8_t *, int, uint32_t, uint32_t);
 
 /* Macro to declare a callback */
-#define IPC_HANDLER(name)                                      \
-    static void handle_##name(int fd, uint8_t *message,        \
-                              int size, uint32_t message_size, \
+#define IPC_HANDLER(name)                                           \
+    static void handle_##name(ipc_client *client, uint8_t *message, \
+                              int size, uint32_t message_size,      \
                               uint32_t message_type)
 
 /**
index 2ad5ae0f4e3bef4e41325b165fa82e87098e3fb1..2432d7a543451a2059521ca4e7456c330325f3de 100644 (file)
--- a/src/ipc.c
+++ b/src/ipc.c
@@ -38,46 +38,6 @@ static void set_nonblock(int sockfd) {
         err(-1, "Could not set O_NONBLOCK");
 }
 
-/*
- * Given a message and a message type, create the corresponding header, merge it
- * with the message and append it to the given client's output buffer.
- *
- */
-static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) {
-    const size_t size = strlen(payload);
-    const i3_ipc_header_t header = {
-        .magic = {'i', '3', '-', 'i', 'p', 'c'},
-        .size = size,
-        .type = message_type};
-    const size_t header_size = sizeof(i3_ipc_header_t);
-    const size_t message_size = header_size + size;
-
-    client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
-    memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
-    memcpy(client->buffer + client->buffer_size + header_size, payload, size);
-    client->buffer_size += message_size;
-}
-
-static void free_ipc_client(ipc_client *client) {
-    close(client->fd);
-
-    ev_io_stop(main_loop, client->callback);
-    FREE(client->callback);
-    if (client->timeout) {
-        ev_timer_stop(main_loop, client->timeout);
-        FREE(client->timeout);
-    }
-
-    free(client->buffer);
-
-    for (int i = 0; i < client->num_events; i++) {
-        free(client->events[i]);
-    }
-    free(client->events);
-    TAILQ_REMOVE(&all_clients, client, clients);
-    free(client);
-}
-
 static void ipc_client_timeout(EV_P_ ev_timer *w, int revents);
 static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents);
 
@@ -89,8 +49,8 @@ void ipc_set_kill_timeout(ev_tstamp new) {
 
 /*
  * Try to write the contents of the pending buffer to the client's subscription
- * socket. Will set, reset or clear the timeout and io callbacks depending on
- * the result of the write operation.
+ * socket. Will set, reset or clear the timeout and io write callbacks depending
+ * on the result of the write operation.
  *
  */
 static void ipc_push_pending(ipc_client *client) {
@@ -108,13 +68,13 @@ static void ipc_push_pending(ipc_client *client) {
             ev_timer_stop(main_loop, client->timeout);
             FREE(client->timeout);
         }
-        ev_io_stop(main_loop, client->callback);
+        ev_io_stop(main_loop, client->write_callback);
         return;
     }
 
     /* Otherwise, make sure that the io callback is enabled and create a new
      * timer if needed. */
-    ev_io_start(main_loop, client->callback);
+    ev_io_start(main_loop, client->write_callback);
 
     if (!client->timeout) {
         struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer));
@@ -140,6 +100,54 @@ static void ipc_push_pending(ipc_client *client) {
     client->buffer = srealloc(client->buffer, client->buffer_size);
 }
 
+/*
+ * Given a message and a message type, create the corresponding header, merge it
+ * with the message and append it to the given client's output buffer. Also,
+ * send the message if the client's buffer was empty.
+ *
+ */
+static void ipc_send_client_message(ipc_client *client, size_t size, const uint32_t message_type, const uint8_t *payload) {
+    const i3_ipc_header_t header = {
+        .magic = {'i', '3', '-', 'i', 'p', 'c'},
+        .size = size,
+        .type = message_type};
+    const size_t header_size = sizeof(i3_ipc_header_t);
+    const size_t message_size = header_size + size;
+
+    const bool push_now = (client->buffer_size == 0);
+    client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
+    memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
+    memcpy(client->buffer + client->buffer_size + header_size, payload, size);
+    client->buffer_size += message_size;
+
+    if (push_now) {
+        ipc_push_pending(client);
+    }
+}
+
+static void free_ipc_client(ipc_client *client) {
+    DLOG("Disconnecting client on fd %d\n", client->fd);
+    close(client->fd);
+
+    ev_io_stop(main_loop, client->read_callback);
+    FREE(client->read_callback);
+    ev_io_stop(main_loop, client->write_callback);
+    FREE(client->write_callback);
+    if (client->timeout) {
+        ev_timer_stop(main_loop, client->timeout);
+        FREE(client->timeout);
+    }
+
+    free(client->buffer);
+
+    for (int i = 0; i < client->num_events; i++) {
+        free(client->events[i]);
+    }
+    free(client->events);
+    TAILQ_REMOVE(&all_clients, client, clients);
+    free(client);
+}
+
 /*
  * Sends the specified event to all IPC clients which are currently connected
  * and subscribed to this kind of event.
@@ -148,21 +156,11 @@ static void ipc_push_pending(ipc_client *client) {
 void ipc_send_event(const char *event, uint32_t message_type, const char *payload) {
     ipc_client *current;
     TAILQ_FOREACH(current, &all_clients, clients) {
-        /* see if this client is interested in this event */
-        bool interested = false;
         for (int i = 0; i < current->num_events; i++) {
-            if (strcasecmp(current->events[i], event) != 0)
-                continue;
-            interested = true;
-            break;
-        }
-        if (!interested)
-            continue;
-
-        const bool push_now = (current->buffer_size == 0);
-        append_payload(current, message_type, payload);
-        if (push_now) {
-            ipc_push_pending(current);
+            if (strcasecmp(current->events[i], event) == 0) {
+                ipc_send_client_message(current, strlen(payload), message_type, (uint8_t *)payload);
+                break;
+            }
         }
     }
 }
@@ -234,8 +232,8 @@ IPC_HANDLER(run_command) {
     ylength length;
     yajl_gen_get_buf(gen, &reply, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_COMMAND,
-                     (const uint8_t *)reply);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_COMMAND,
+                            (const uint8_t *)reply);
 
     yajl_gen_free(gen);
 }
@@ -843,7 +841,7 @@ IPC_HANDLER(tree) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_TREE, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_TREE, payload);
     y(free);
 }
 
@@ -907,7 +905,7 @@ IPC_HANDLER(get_workspaces) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload);
     y(free);
 }
 
@@ -961,7 +959,7 @@ IPC_HANDLER(get_outputs) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload);
     y(free);
 }
 
@@ -988,7 +986,7 @@ IPC_HANDLER(get_marks) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_MARKS, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_MARKS, payload);
     y(free);
 }
 
@@ -1021,7 +1019,7 @@ IPC_HANDLER(get_version) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_VERSION, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_VERSION, payload);
     y(free);
 }
 
@@ -1046,7 +1044,7 @@ IPC_HANDLER(get_bar_config) {
         ylength length;
         y(get_buf, &payload, &length);
 
-        ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
+        ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
         y(free);
         return;
     }
@@ -1083,7 +1081,7 @@ IPC_HANDLER(get_bar_config) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
     y(free);
 }
 
@@ -1105,7 +1103,7 @@ IPC_HANDLER(get_binding_modes) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload);
     y(free);
 }
 
@@ -1144,21 +1142,6 @@ static int add_subscription(void *extra, const unsigned char *s,
 IPC_HANDLER(subscribe) {
     yajl_handle p;
     yajl_status stat;
-    ipc_client *current, *client = NULL;
-
-    /* Search the ipc_client structure for this connection */
-    TAILQ_FOREACH(current, &all_clients, clients) {
-        if (current->fd != fd)
-            continue;
-
-        client = current;
-        break;
-    }
-
-    if (client == NULL) {
-        ELOG("Could not find ipc_client data structure for fd %d\n", fd);
-        return;
-    }
 
     /* Setup the JSON parser */
     static yajl_callbacks callbacks = {
@@ -1175,13 +1158,13 @@ IPC_HANDLER(subscribe) {
         yajl_free_error(p, err);
 
         const char *reply = "{\"success\":false}";
-        ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
+        ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
         yajl_free(p);
         return;
     }
     yajl_free(p);
     const char *reply = "{\"success\":true}";
-    ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
+    ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
 
     if (client->first_tick_sent) {
         return;
@@ -1200,7 +1183,7 @@ IPC_HANDLER(subscribe) {
 
     client->first_tick_sent = true;
     const char *payload = "{\"first\":true,\"payload\":\"\"}";
-    ipc_send_message(client->fd, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload);
+    ipc_send_client_message(client, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload);
 }
 
 /*
@@ -1220,7 +1203,7 @@ IPC_HANDLER(get_config) {
     ylength length;
     y(get_buf, &payload, &length);
 
-    ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_CONFIG, payload);
+    ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_CONFIG, payload);
     y(free);
 }
 
@@ -1249,7 +1232,7 @@ IPC_HANDLER(send_tick) {
     y(free);
 
     const char *reply = "{\"success\":true}";
-    ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply);
+    ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply);
     DLOG("Sent tick event\n");
 }
 
@@ -1300,7 +1283,7 @@ IPC_HANDLER(sync) {
         yajl_free_error(p, err);
 
         const char *reply = "{\"success\":false}";
-        ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
+        ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
         yajl_free(p);
         return;
     }
@@ -1309,7 +1292,7 @@ IPC_HANDLER(sync) {
     DLOG("received IPC sync request (rnd = %d, window = 0x%08x)\n", state.rnd, state.window);
     sync_respond(state.window, state.rnd);
     const char *reply = "{\"success\":true}";
-    ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
+    ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
 }
 
 /* The index of each callback function corresponds to the numeric
@@ -1343,6 +1326,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
     uint32_t message_type;
     uint32_t message_length;
     uint8_t *message = NULL;
+    ipc_client *client = (ipc_client *)w->data;
+    assert(client->fd == w->fd);
 
     int ret = ipc_recv_message(w->fd, &message_type, &message_length, &message);
     /* EOF or other error */
@@ -1355,25 +1340,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
 
         /* If not, there was some kind of error. We don’t bother and close the
          * connection. Delete the client from the list of clients. */
-        bool closed = false;
-        ipc_client *current;
-        TAILQ_FOREACH(current, &all_clients, clients) {
-            if (current->fd != w->fd)
-                continue;
-
-            free_ipc_client(current);
-            closed = true;
-            break;
-        }
-        if (!closed) {
-            close(w->fd);
-        }
-
-        ev_io_stop(EV_A_ w);
-        free(w);
+        free_ipc_client(client);
         FREE(message);
-
-        DLOG("IPC: client disconnected\n");
         return;
     }
 
@@ -1381,7 +1349,7 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
         DLOG("Unhandled message type: %d\n", message_type);
     else {
         handler_t h = handlers[message_type];
-        h(w->fd, message, 0, message_length, message_type);
+        h(client, message, 0, message_length, message_type);
     }
 
     FREE(message);
@@ -1453,36 +1421,33 @@ static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) {
 void ipc_new_client(EV_P_ struct ev_io *w, int revents) {
     struct sockaddr_un peer;
     socklen_t len = sizeof(struct sockaddr_un);
-    int client;
-    if ((client = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) {
-        if (errno == EINTR)
-            return;
-        else
+    int fd;
+    if ((fd = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) {
+        if (errno != EINTR) {
             perror("accept()");
+        }
         return;
     }
 
     /* Close this file descriptor on exec() */
-    (void)fcntl(client, F_SETFD, FD_CLOEXEC);
+    (void)fcntl(fd, F_SETFD, FD_CLOEXEC);
 
-    set_nonblock(client);
+    set_nonblock(fd);
 
-    struct ev_io *package = scalloc(1, sizeof(struct ev_io));
-    ev_io_init(package, ipc_receive_message, client, EV_READ);
-    ev_io_start(EV_A_ package);
+    ipc_client *client = scalloc(1, sizeof(ipc_client));
+    client->fd = fd;
 
-    ipc_client *new = scalloc(1, sizeof(ipc_client));
+    client->read_callback = scalloc(1, sizeof(struct ev_io));
+    client->read_callback->data = client;
+    ev_io_init(client->read_callback, ipc_receive_message, fd, EV_READ);
+    ev_io_start(EV_A_ client->read_callback);
 
-    package = scalloc(1, sizeof(struct ev_io));
-    package->data = new;
-    ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE);
+    client->write_callback = scalloc(1, sizeof(struct ev_io));
+    client->write_callback->data = client;
+    ev_io_init(client->write_callback, ipc_socket_writeable_cb, fd, EV_WRITE);
 
     DLOG("IPC: new client connected on fd %d\n", w->fd);
-
-    new->fd = client;
-    new->callback = package;
-
-    TAILQ_INSERT_TAIL(&all_clients, new, clients);
+    TAILQ_INSERT_TAIL(&all_clients, client, clients);
 }
 
 /*