From: Howard Chu Date: Thu, 15 Aug 2013 00:25:11 +0000 (-0700) Subject: Simplify write waiter handling X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=88d22a1ca3b28e10e6557e8aebbd75dd61fed511;p=openldap Simplify write waiter handling Writer threads do their own wait using select/poll instead of asking the listener thread. Eliminates one mutex+one condvar per conn plus multiple wakeups of the listener thread. Also fixes writetimeout to wait an exact time, instead of the approximation used in the listener thread. --- diff --git a/servers/slapd/connection.c b/servers/slapd/connection.c index c47114c7fa..e169494bb0 100644 --- a/servers/slapd/connection.c +++ b/servers/slapd/connection.c @@ -154,9 +154,7 @@ int connections_destroy(void) ber_sockbuf_free( connections[i].c_sb ); ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex ); ldap_pvt_thread_mutex_destroy( &connections[i].c_write1_mutex ); - ldap_pvt_thread_mutex_destroy( &connections[i].c_write2_mutex ); ldap_pvt_thread_cond_destroy( &connections[i].c_write1_cv ); - ldap_pvt_thread_cond_destroy( &connections[i].c_write2_cv ); #ifdef LDAP_SLAPI if ( slapi_plugins_used ) { slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION, @@ -211,17 +209,13 @@ int connections_timeout_idle(time_t now) int i = 0, writers = 0; ber_socket_t connindex; Connection* c; - time_t old; - - old = slapd_get_writetime(); for( c = connection_first( &connindex ); c != NULL; c = connection_next( c, &connindex ) ) { /* Don't timeout a slow-running request or a persistent - * outbound connection. But if it has a writewaiter, see - * if the waiter has been there too long. + * outbound connection. */ if(( c->c_n_ops_executing && !c->c_writewaiter) || c->c_conn_state == SLAP_C_CLIENT ) { @@ -236,20 +230,8 @@ int connections_timeout_idle(time_t now) i++; continue; } - if ( c->c_writewaiter && global_writetimeout ) { - writers = 1; - if( difftime( c->c_activitytime+global_writetimeout, now) < 0 ) { - /* close it */ - connection_closing( c, "writetimeout" ); - connection_close( c ); - i++; - continue; - } - } } connection_done( c ); - if ( old && !writers ) - slapd_clr_writetime( old ); return i; } @@ -420,9 +402,7 @@ Connection * connection_init( /* should check status of thread calls */ ldap_pvt_thread_mutex_init( &c->c_mutex ); ldap_pvt_thread_mutex_init( &c->c_write1_mutex ); - ldap_pvt_thread_mutex_init( &c->c_write2_mutex ); ldap_pvt_thread_cond_init( &c->c_write1_cv ); - ldap_pvt_thread_cond_init( &c->c_write2_cv ); #ifdef LDAP_SLAPI if ( slapi_plugins_used ) { @@ -780,10 +760,7 @@ connection_wake_writers( Connection *c ) ldap_pvt_thread_cond_broadcast( &c->c_write1_cv ); ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex ); if ( c->c_writewaiter ) { - ldap_pvt_thread_mutex_lock( &c->c_write2_mutex ); - ldap_pvt_thread_cond_signal( &c->c_write2_cv ); - slapd_clr_write( c->c_sd, 1 ); - ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex ); + slapd_shutsock( c->c_sd ); } ldap_pvt_thread_mutex_lock( &c->c_write1_mutex ); while ( c->c_writers ) { @@ -1311,11 +1288,6 @@ int connection_read_activate( ber_socket_t s ) if ( rc ) return rc; - /* Don't let blocked writers block a pause request */ - if ( connections[s].c_writewaiter && - ldap_pvt_thread_pool_pausing( &connection_pool )) - connection_wake_writers( &connections[s] ); - rc = ldap_pvt_thread_pool_submit( &connection_pool, connection_read_thread, (void *)(long)s ); @@ -1918,6 +1890,7 @@ int connection_write(ber_socket_t s) { Connection *c; Operation *op; + int wantwrite; assert( connections != NULL ); @@ -1944,14 +1917,13 @@ int connection_write(ber_socket_t s) Debug( LDAP_DEBUG_TRACE, "connection_write(%d): waking output for id=%lu\n", s, c->c_connid, 0 ); - ldap_pvt_thread_mutex_lock( &c->c_write2_mutex ); - ldap_pvt_thread_cond_signal( &c->c_write2_cv ); - ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex ); - if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) { - slapd_set_read( s, 1 ); + wantwrite = ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ); + if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL )) { + /* don't wakeup twice */ + slapd_set_read( s, !wantwrite ); } - if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) { + if ( wantwrite ) { slapd_set_write( s, 1 ); } diff --git a/servers/slapd/daemon.c b/servers/slapd/daemon.c index 31e4fcf8b7..299ac62a5b 100644 --- a/servers/slapd/daemon.c +++ b/servers/slapd/daemon.c @@ -41,6 +41,10 @@ #include "ldap_rq.h" +#ifdef HAVE_POLL +#include +#endif + #if defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL) # include #elif defined(SLAP_X_DEVPOLL) && defined(HAVE_SYS_DEVPOLL_H) && defined(HAVE_DEVPOLL) @@ -97,8 +101,6 @@ static ldap_pvt_thread_t *listener_tid; static ber_socket_t wake_sds[SLAPD_MAX_DAEMON_THREADS][2]; static int emfile; -static time_t chk_writetime; - static volatile int waking; #ifdef NO_THREADS #define WAKE_LISTENER(l,w) do { \ @@ -964,14 +966,6 @@ slapd_set_write( ber_socket_t s, int wake ) SLAP_SOCK_SET_WRITE( id, s ); slap_daemon[id].sd_nwriters++; } - if (( wake & 2 ) && global_writetimeout && !chk_writetime ) { - if (id) - ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex ); - if (!chk_writetime) - chk_writetime = slap_get_time(); - if (id) - ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex ); - } ldap_pvt_thread_mutex_unlock( &slap_daemon[id].sd_mutex ); WAKE_LISTENER(id,wake); @@ -1011,25 +1005,6 @@ slapd_set_read( ber_socket_t s, int wake ) WAKE_LISTENER(id,wake); } -time_t -slapd_get_writetime() -{ - time_t cur; - ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex ); - cur = chk_writetime; - ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex ); - return cur; -} - -void -slapd_clr_writetime( time_t old ) -{ - ldap_pvt_thread_mutex_lock( &slap_daemon[0].sd_mutex ); - if ( chk_writetime == old ) - chk_writetime = 0; - ldap_pvt_thread_mutex_unlock( &slap_daemon[0].sd_mutex ); -} - static void slapd_close( ber_socket_t s ) { @@ -1041,6 +1016,14 @@ slapd_close( ber_socket_t s ) #endif } +void +slapd_shutsock( ber_socket_t s ) +{ + Debug( LDAP_DEBUG_CONNS, "daemon: shutdown socket %ld\n", + (long) s, 0, 0 ); + shutdown( SLAP_FD2SOCK(s), 2 ); +} + static void slap_free_listener_addresses( struct sockaddr **sal ) { @@ -2365,18 +2348,13 @@ loop: now = slap_get_time(); - if ( !tid && ( global_idletimeout > 0 || chk_writetime )) { + if ( !tid && ( global_idletimeout > 0 )) { int check = 0; /* Set the select timeout. * Don't just truncate, preserve the fractions of * seconds to prevent sleeping for zero time. */ - if ( chk_writetime ) { - tv.tv_sec = global_writetimeout; - tv.tv_usec = 0; - if ( difftime( chk_writetime, now ) < 0 ) - check = 2; - } else { + { tv.tv_sec = global_idletimeout / SLAPD_IDLE_CHECK_LIMIT; tv.tv_usec = global_idletimeout - \ ( tv.tv_sec * SLAPD_IDLE_CHECK_LIMIT ); @@ -2453,7 +2431,7 @@ loop: nfds = SLAP_EVENT_MAX(tid); - if (( chk_writetime || global_idletimeout ) && slap_daemon[tid].sd_nactives ) at = 1; + if (( global_idletimeout ) && slap_daemon[tid].sd_nactives ) at = 1; ldap_pvt_thread_mutex_unlock( &slap_daemon[tid].sd_mutex ); @@ -3083,3 +3061,35 @@ slap_wake_listener() { WAKE_LISTENER(0,1); } + +/* return 0 on timeout, 1 on writer ready + * -1 on general error + */ +int +slapd_wait_writer( ber_socket_t sd ) +{ +#ifdef HAVE_WINSOCK + fd_set writefds; + struct timeval tv, *tvp; + int i; + + FD_ZERO( &writefds ); + FD_SET( slapd_ws_sockets[sd], &writefds ); + if ( global_writetimeout ) { + tv.tv_sec = global_writetimeout; + tv.tv_usec = 0; + tvp = &tv; + } else { + tv = NULL; + } + return select( 0, NULL, &writefds, NULL, tvp ); +#else + struct pollfd fds; + int timeout = global_writetimeout ? global_writetimeout * 1000 : -1; + + fds.fd = sd; + fds.events = POLLOUT; + + return poll( &fds, 1, timeout ); +#endif +} diff --git a/servers/slapd/proto-slap.h b/servers/slapd/proto-slap.h index 31c73da16e..ccf3384aa6 100644 --- a/servers/slapd/proto-slap.h +++ b/servers/slapd/proto-slap.h @@ -877,8 +877,8 @@ LDAP_SLAPD_F (void) slapd_set_write LDAP_P((ber_socket_t s, int wake)); LDAP_SLAPD_F (void) slapd_clr_write LDAP_P((ber_socket_t s, int wake)); LDAP_SLAPD_F (void) slapd_set_read LDAP_P((ber_socket_t s, int wake)); LDAP_SLAPD_F (int) slapd_clr_read LDAP_P((ber_socket_t s, int wake)); -LDAP_SLAPD_F (void) slapd_clr_writetime LDAP_P((time_t old)); -LDAP_SLAPD_F (time_t) slapd_get_writetime LDAP_P((void)); +LDAP_SLAPD_F (int) slapd_wait_writer( ber_socket_t sd ); +LDAP_SLAPD_F (void) slapd_shutsock( ber_socket_t sd ); LDAP_SLAPD_V (volatile sig_atomic_t) slapd_abrupt_shutdown; LDAP_SLAPD_V (volatile sig_atomic_t) slapd_shutdown; diff --git a/servers/slapd/result.c b/servers/slapd/result.c index 49d7429339..5858496b14 100644 --- a/servers/slapd/result.c +++ b/servers/slapd/result.c @@ -286,6 +286,7 @@ static long send_ldap_ber( Connection *conn = op->o_conn; ber_len_t bytes; long ret = 0; + char *close_reason; ber_get_option( ber, LBER_OPT_BER_BYTES_TO_WRITE, &bytes ); @@ -300,7 +301,9 @@ static long send_ldap_ber( conn->c_writers++; while ( conn->c_writers > 0 && conn->c_writing ) { + ldap_pvt_thread_pool_idle( &connection_pool ); ldap_pvt_thread_cond_wait( &conn->c_write1_cv, &conn->c_write1_mutex ); + ldap_pvt_thread_pool_unidle( &connection_pool ); } /* connection was closed under us */ @@ -337,28 +340,36 @@ static long send_ldap_ber( err, sock_errstr(err), 0 ); if ( err != EWOULDBLOCK && err != EAGAIN ) { + close_reason = "connection lost on write"; +fail: conn->c_writers--; conn->c_writing = 0; ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex ); ldap_pvt_thread_mutex_lock( &conn->c_mutex ); - connection_closing( conn, "connection lost on write" ); - + connection_closing( conn, close_reason ); ldap_pvt_thread_mutex_unlock( &conn->c_mutex ); return -1; } /* wait for socket to be write-ready */ - ldap_pvt_thread_mutex_lock( &conn->c_write2_mutex ); conn->c_writewaiter = 1; - slapd_set_write( conn->c_sd, 2 ); - ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex ); ldap_pvt_thread_pool_idle( &connection_pool ); - ldap_pvt_thread_cond_wait( &conn->c_write2_cv, &conn->c_write2_mutex ); + err = slapd_wait_writer( conn->c_sd ); conn->c_writewaiter = 0; - ldap_pvt_thread_mutex_unlock( &conn->c_write2_mutex ); ldap_pvt_thread_pool_unidle( &connection_pool ); ldap_pvt_thread_mutex_lock( &conn->c_write1_mutex ); + /* 0 is timeout, so we close it. + * -1 is an error, close it. + */ + if ( err <= 0 ) { + if ( err == 0 ) + close_reason = "writetimeout"; + else + close_reason = "connection lost on writewait"; + goto fail; + } + if ( conn->c_writers < 0 ) { ret = 0; break;