]> git.sur5r.net Git - openldap/commitdiff
Simplify write waiter handling
authorHoward Chu <hyc@openldap.org>
Thu, 15 Aug 2013 00:25:11 +0000 (17:25 -0700)
committerHoward Chu <hyc@openldap.org>
Sat, 7 Sep 2013 21:53:02 +0000 (14:53 -0700)
Writer threads do their own wait using select/poll instead of
asking the listener thread. Eliminates one mutex+one condvar
per conn plus multiple wakeups of the listener thread. Also
fixes writetimeout to wait an exact time, instead of the
approximation used in the listener thread.

servers/slapd/connection.c
servers/slapd/daemon.c
servers/slapd/proto-slap.h
servers/slapd/result.c

index c47114c7fafc29af55cc32e874fce8ccdfe3b920..e169494bb0dd2877fc755afa496df8dc79aace5e 100644 (file)
@@ -154,9 +154,7 @@ int connections_destroy(void)
                        ber_sockbuf_free( connections[i].c_sb );
                        ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex );
                        ldap_pvt_thread_mutex_destroy( &connections[i].c_write1_mutex );
-                       ldap_pvt_thread_mutex_destroy( &connections[i].c_write2_mutex );
                        ldap_pvt_thread_cond_destroy( &connections[i].c_write1_cv );
-                       ldap_pvt_thread_cond_destroy( &connections[i].c_write2_cv );
 #ifdef LDAP_SLAPI
                        if ( slapi_plugins_used ) {
                                slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION,
@@ -211,17 +209,13 @@ int connections_timeout_idle(time_t now)
        int i = 0, writers = 0;
        ber_socket_t connindex;
        Connection* c;
-       time_t old;
-
-       old = slapd_get_writetime();
 
        for( c = connection_first( &connindex );
                c != NULL;
                c = connection_next( c, &connindex ) )
        {
                /* Don't timeout a slow-running request or a persistent
-                * outbound connection. But if it has a writewaiter, see
-                * if the waiter has been there too long.
+                * outbound connection.
                 */
                if(( c->c_n_ops_executing && !c->c_writewaiter)
                        || c->c_conn_state == SLAP_C_CLIENT ) {
@@ -236,20 +230,8 @@ int connections_timeout_idle(time_t now)
                        i++;
                        continue;
                }
-               if ( c->c_writewaiter && global_writetimeout ) {
-                       writers = 1;
-                       if( difftime( c->c_activitytime+global_writetimeout, now) < 0 ) {
-                               /* close it */
-                               connection_closing( c, "writetimeout" );
-                               connection_close( c );
-                               i++;
-                               continue;
-                       }
-               }
        }
        connection_done( c );
-       if ( old && !writers )
-               slapd_clr_writetime( old );
 
        return i;
 }
@@ -420,9 +402,7 @@ Connection * connection_init(
                /* should check status of thread calls */
                ldap_pvt_thread_mutex_init( &c->c_mutex );
                ldap_pvt_thread_mutex_init( &c->c_write1_mutex );
-               ldap_pvt_thread_mutex_init( &c->c_write2_mutex );
                ldap_pvt_thread_cond_init( &c->c_write1_cv );
-               ldap_pvt_thread_cond_init( &c->c_write2_cv );
 
 #ifdef LDAP_SLAPI
                if ( slapi_plugins_used ) {
@@ -780,10 +760,7 @@ connection_wake_writers( Connection *c )
                ldap_pvt_thread_cond_broadcast( &c->c_write1_cv );
                ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex );
                if ( c->c_writewaiter ) {
-                       ldap_pvt_thread_mutex_lock( &c->c_write2_mutex );
-                       ldap_pvt_thread_cond_signal( &c->c_write2_cv );
-                       slapd_clr_write( c->c_sd, 1 );
-                       ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex );
+                       slapd_shutsock( c->c_sd );
                }
                ldap_pvt_thread_mutex_lock( &c->c_write1_mutex );
                while ( c->c_writers ) {
@@ -1311,11 +1288,6 @@ int connection_read_activate( ber_socket_t s )
        if ( rc )
                return rc;
 
-       /* Don't let blocked writers block a pause request */
-       if ( connections[s].c_writewaiter &&
-               ldap_pvt_thread_pool_pausing( &connection_pool ))
-               connection_wake_writers( &connections[s] );
-
        rc = ldap_pvt_thread_pool_submit( &connection_pool,
                connection_read_thread, (void *)(long)s );
 
@@ -1918,6 +1890,7 @@ int connection_write(ber_socket_t s)
 {
        Connection *c;
        Operation *op;
+       int wantwrite;
 
        assert( connections != NULL );
 
@@ -1944,14 +1917,13 @@ int connection_write(ber_socket_t s)
        Debug( LDAP_DEBUG_TRACE,
                "connection_write(%d): waking output for id=%lu\n",
                s, c->c_connid, 0 );
-       ldap_pvt_thread_mutex_lock( &c->c_write2_mutex );
-       ldap_pvt_thread_cond_signal( &c->c_write2_cv );
-       ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex );
 
-       if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) {
-               slapd_set_read( s, 1 );
+       wantwrite = ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL );
+       if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL )) {
+               /* don't wakeup twice */
+               slapd_set_read( s, !wantwrite );
        }
-       if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
+       if ( wantwrite ) {
                slapd_set_write( s, 1 );
        }
 
index 31e4fcf8b7c4c27b72c15e336288d94942b25f0d..299ac62a5b04f3ee54a1b9c5146f220ec266bedd 100644 (file)
 
 #include "ldap_rq.h"
 
+#ifdef HAVE_POLL
+#include <poll.h>
+#endif
+
 #if defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL)
 # include <sys/epoll.h>
 #elif defined(SLAP_X_DEVPOLL) && defined(HAVE_SYS_DEVPOLL_H) && defined(HAVE_DEVPOLL)
@@ -97,8 +101,6 @@ static ldap_pvt_thread_t *listener_tid;
 static ber_socket_t wake_sds[SLAPD_MAX_DAEMON_THREADS][2];
 static int emfile;
 
-static time_t chk_writetime;
-
 static volatile int waking;
 #ifdef NO_THREADS
 #define WAKE_LISTENER(l,w)     do { \
@@ -964,14 +966,6 @@ slapd_set_write( ber_socket_t s, int wake )
                SLAP_SOCK_SET_WRITE( id, s );
                slap_daemon[id].sd_nwriters++;
        }
-       if (( wake & 2 ) && global_writetimeout && !chk_writetime ) {
-               if (id)
-                       ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex );
-               if (!chk_writetime)
-                       chk_writetime = slap_get_time();
-               if (id)
-                       ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex );
-       }
 
        ldap_pvt_thread_mutex_unlock( &slap_daemon[id].sd_mutex );
        WAKE_LISTENER(id,wake);
@@ -1011,25 +1005,6 @@ slapd_set_read( ber_socket_t s, int wake )
                WAKE_LISTENER(id,wake);
 }
 
-time_t
-slapd_get_writetime()
-{
-       time_t cur;
-       ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex );
-       cur = chk_writetime;
-       ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex );
-       return cur;
-}
-
-void
-slapd_clr_writetime( time_t old )
-{
-       ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex );
-       if ( chk_writetime == old )
-               chk_writetime = 0;
-       ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex );
-}
-
 static void
 slapd_close( ber_socket_t s )
 {
@@ -1041,6 +1016,14 @@ slapd_close( ber_socket_t s )
 #endif
 }
 
+void
+slapd_shutsock( ber_socket_t s )
+{
+       Debug( LDAP_DEBUG_CONNS, "daemon: shutdown socket %ld\n",
+               (long) s, 0, 0 );
+       shutdown( SLAP_FD2SOCK(s), 2 );
+}
+
 static void
 slap_free_listener_addresses( struct sockaddr **sal )
 {
@@ -2365,18 +2348,13 @@ loop:
 
                now = slap_get_time();
 
-               if ( !tid && ( global_idletimeout > 0 || chk_writetime )) {
+               if ( !tid && ( global_idletimeout > 0 )) {
                        int check = 0;
                        /* Set the select timeout.
                         * Don't just truncate, preserve the fractions of
                         * seconds to prevent sleeping for zero time.
                         */
-                       if ( chk_writetime ) {
-                               tv.tv_sec = global_writetimeout;
-                               tv.tv_usec = 0;
-                               if ( difftime( chk_writetime, now ) < 0 )
-                                       check = 2;
-                       } else {
+                       {
                                tv.tv_sec = global_idletimeout / SLAPD_IDLE_CHECK_LIMIT;
                                tv.tv_usec = global_idletimeout - \
                                        ( tv.tv_sec * SLAPD_IDLE_CHECK_LIMIT );
@@ -2453,7 +2431,7 @@ loop:
 
                nfds = SLAP_EVENT_MAX(tid);
 
-               if (( chk_writetime || global_idletimeout ) && slap_daemon[tid].sd_nactives ) at = 1;
+               if (( global_idletimeout ) && slap_daemon[tid].sd_nactives ) at = 1;
 
                ldap_pvt_thread_mutex_unlock( &slap_daemon[tid].sd_mutex );
 
@@ -3083,3 +3061,35 @@ slap_wake_listener()
 {
        WAKE_LISTENER(0,1);
 }
+
+/* return 0 on timeout, 1 on writer ready
+ * -1 on general error
+ */
+int
+slapd_wait_writer( ber_socket_t sd )
+{
+#ifdef HAVE_WINSOCK
+       fd_set writefds;
+       struct timeval tv, *tvp;
+       int i;
+
+       FD_ZERO( &writefds );
+       FD_SET( slapd_ws_sockets[sd], &writefds );
+       if ( global_writetimeout ) {
+               tv.tv_sec = global_writetimeout;
+               tv.tv_usec = 0;
+               tvp = &tv;
+       } else {
+               tv = NULL;
+       }
+       return select( 0, NULL, &writefds, NULL, tvp );
+#else
+       struct pollfd fds;
+       int timeout = global_writetimeout ? global_writetimeout * 1000 : -1;
+
+       fds.fd = sd;
+       fds.events = POLLOUT;
+
+       return poll( &fds, 1, timeout );
+#endif
+}
index 31c73da16e214f6e3ca3bccf570c413b90bb11f7..ccf3384aa6392470e27cdd04291fb6e86e931d5d 100644 (file)
@@ -877,8 +877,8 @@ LDAP_SLAPD_F (void) slapd_set_write LDAP_P((ber_socket_t s, int wake));
 LDAP_SLAPD_F (void) slapd_clr_write LDAP_P((ber_socket_t s, int wake));
 LDAP_SLAPD_F (void) slapd_set_read LDAP_P((ber_socket_t s, int wake));
 LDAP_SLAPD_F (int) slapd_clr_read LDAP_P((ber_socket_t s, int wake));
-LDAP_SLAPD_F (void) slapd_clr_writetime LDAP_P((time_t old));
-LDAP_SLAPD_F (time_t) slapd_get_writetime LDAP_P((void));
+LDAP_SLAPD_F (int) slapd_wait_writer( ber_socket_t sd );
+LDAP_SLAPD_F (void) slapd_shutsock( ber_socket_t sd );
 
 LDAP_SLAPD_V (volatile sig_atomic_t) slapd_abrupt_shutdown;
 LDAP_SLAPD_V (volatile sig_atomic_t) slapd_shutdown;
index 49d742933900c385fa858e09f43a20b82c9a10b8..5858496b14d093d41f20e4c6012f7f878bd71526 100644 (file)
@@ -286,6 +286,7 @@ static long send_ldap_ber(
        Connection *conn = op->o_conn;
        ber_len_t bytes;
        long ret = 0;
+       char *close_reason;
 
        ber_get_option( ber, LBER_OPT_BER_BYTES_TO_WRITE, &bytes );
 
@@ -300,7 +301,9 @@ static long send_ldap_ber(
        conn->c_writers++;
 
        while ( conn->c_writers > 0 && conn->c_writing ) {
+               ldap_pvt_thread_pool_idle( &connection_pool );
                ldap_pvt_thread_cond_wait( &conn->c_write1_cv, &conn->c_write1_mutex );
+               ldap_pvt_thread_pool_unidle( &connection_pool );
        }
 
        /* connection was closed under us */
@@ -337,28 +340,36 @@ static long send_ldap_ber(
                    err, sock_errstr(err), 0 );
 
                if ( err != EWOULDBLOCK && err != EAGAIN ) {
+                       close_reason = "connection lost on write";
+fail:
                        conn->c_writers--;
                        conn->c_writing = 0;
                        ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex );
                        ldap_pvt_thread_mutex_lock( &conn->c_mutex );
-                       connection_closing( conn, "connection lost on write" );
-
+                       connection_closing( conn, close_reason );
                        ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
                        return -1;
                }
 
                /* wait for socket to be write-ready */
-               ldap_pvt_thread_mutex_lock( &conn->c_write2_mutex );
                conn->c_writewaiter = 1;
-               slapd_set_write( conn->c_sd, 2 );
-
                ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex );
                ldap_pvt_thread_pool_idle( &connection_pool );
-               ldap_pvt_thread_cond_wait( &conn->c_write2_cv, &conn->c_write2_mutex );
+               err = slapd_wait_writer( conn->c_sd );
                conn->c_writewaiter = 0;
-               ldap_pvt_thread_mutex_unlock( &conn->c_write2_mutex );
                ldap_pvt_thread_pool_unidle( &connection_pool );
                ldap_pvt_thread_mutex_lock( &conn->c_write1_mutex );
+               /* 0 is timeout, so we close it.
+                * -1 is an error, close it.
+                */
+               if ( err <= 0 ) {
+                       if ( err == 0 )
+                               close_reason = "writetimeout";
+                       else
+                               close_reason = "connection lost on writewait";
+                       goto fail;
+               }
+
                if ( conn->c_writers < 0 ) {
                        ret = 0;
                        break;