X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslapd%2Fconnection.c;h=7d14e0c81875bdfd0ff815b358c1f78308a8e38e;hb=ef7f5f5e32e6e0f129aee7fa1626017a7dadcb48;hp=d1794964979b55e22fef6be199d34899ad900dae;hpb=68ebee4726e0080a4dc3c07add3012b65a105a31;p=openldap diff --git a/servers/slapd/connection.c b/servers/slapd/connection.c index d179496497..7d14e0c818 100644 --- a/servers/slapd/connection.c +++ b/servers/slapd/connection.c @@ -1,7 +1,7 @@ /* $OpenLDAP$ */ /* This work is part of OpenLDAP Software . * - * Copyright 1998-2007 The OpenLDAP Foundation. + * Copyright 1998-2011 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,6 +39,10 @@ #include "lutil.h" #include "slap.h" +#ifdef LDAP_CONNECTIONLESS +#include "../../libraries/liblber/lber-int.h" /* ber_int_sb_read() */ +#endif + #ifdef LDAP_SLAPI #include "slapi/slapi.h" #endif @@ -48,33 +52,19 @@ static ldap_pvt_thread_mutex_t connections_mutex; static Connection *connections = NULL; static ldap_pvt_thread_mutex_t conn_nextid_mutex; -static unsigned long conn_nextid = 0; +static unsigned long conn_nextid = SLAPD_SYNC_SYNCCONN_OFFSET; static const char conn_lost_str[] = "connection lost"; -/* structure state (protected by connections_mutex) */ -#define SLAP_C_UNINITIALIZED 0x00 /* MUST BE ZERO (0) */ -#define SLAP_C_UNUSED 0x01 -#define SLAP_C_USED 0x02 -#define SLAP_C_PENDING 0x03 - -/* connection state (protected by c_mutex ) */ -#define SLAP_C_INVALID 0x00 /* MUST BE ZERO (0) */ -#define SLAP_C_INACTIVE 0x01 /* zero threads */ -#define SLAP_C_ACTIVE 0x02 /* one or more threads */ -#define SLAP_C_BINDING 0x03 /* binding */ -#define SLAP_C_CLOSING 0x04 /* closing */ -#define SLAP_C_CLIENT 0x05 /* outbound client conn */ - const char * connection_state2str( int state ) { switch( state ) { case SLAP_C_INVALID: return "!"; case SLAP_C_INACTIVE: return "|"; + case SLAP_C_CLOSING: return "C"; case SLAP_C_ACTIVE: return ""; case SLAP_C_BINDING: return "B"; - case SLAP_C_CLOSING: return "C"; case SLAP_C_CLIENT: return "L"; } @@ -83,8 +73,6 @@ connection_state2str( int state ) static Connection* connection_get( ber_socket_t s ); -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER - typedef struct conn_readinfo { Operation *op; ldap_pvt_thread_start_t *func; @@ -94,15 +82,10 @@ typedef struct conn_readinfo { } conn_readinfo; static int connection_input( Connection *c, conn_readinfo *cri ); -#else -static int connection_input( Connection *c ); -#endif static void connection_close( Connection *c ); static int connection_op_activate( Operation *op ); -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER static void connection_op_queue( Operation *op ); -#endif static int connection_resched( Connection *conn ); static void connection_abandon( Connection *conn ); static void connection_destroy( Connection *c ); @@ -170,8 +153,10 @@ int connections_destroy(void) if( connections[i].c_struct_state != SLAP_C_UNINITIALIZED ) { ber_sockbuf_free( connections[i].c_sb ); ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex ); - ldap_pvt_thread_mutex_destroy( &connections[i].c_write_mutex ); - ldap_pvt_thread_cond_destroy( &connections[i].c_write_cv ); + ldap_pvt_thread_mutex_destroy( &connections[i].c_write1_mutex ); + ldap_pvt_thread_mutex_destroy( &connections[i].c_write2_mutex ); + ldap_pvt_thread_cond_destroy( &connections[i].c_write1_cv ); + ldap_pvt_thread_cond_destroy( &connections[i].c_write2_cv ); #ifdef LDAP_SLAPI if ( slapi_plugins_used ) { slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION, @@ -223,32 +208,76 @@ int connections_shutdown(void) */ int connections_timeout_idle(time_t now) { - int i = 0; + int i = 0, writers = 0; int connindex; Connection* c; + time_t old; + + old = slapd_get_writetime(); for( c = connection_first( &connindex ); c != NULL; c = connection_next( c, &connindex ) ) { /* Don't timeout a slow-running request or a persistent - * outbound connection */ - if( c->c_n_ops_executing || c->c_conn_state == SLAP_C_CLIENT ) { + * outbound connection. But if it has a writewaiter, see + * if the waiter has been there too long. + */ + if(( c->c_n_ops_executing && !c->c_writewaiter) + || c->c_conn_state == SLAP_C_CLIENT ) { + connection_done( c ); continue; } - if( difftime( c->c_activitytime+global_idletimeout, now) < 0 ) { + if( global_idletimeout && + difftime( c->c_activitytime+global_idletimeout, now) < 0 ) { /* close it */ connection_closing( c, "idletimeout" ); connection_close( c ); i++; + continue; } + if ( c->c_writewaiter && global_writetimeout ) { + writers = 1; + if( difftime( c->c_activitytime+global_writetimeout, now) < 0 ) { + /* close it */ + connection_closing( c, "writetimeout" ); + connection_close( c ); + i++; + continue; + } + } + connection_done( c ); } - connection_done( c ); + if ( old && !writers ) + slapd_clr_writetime( old ); return i; } +/* Drop all client connections */ +void connections_drop() +{ + Connection* c; + int connindex; + + for( c = connection_first( &connindex ); + c != NULL; + c = connection_next( c, &connindex ) ) + { + /* Don't close a slow-running request or a persistent + * outbound connection. + */ + if(( c->c_n_ops_executing && !c->c_writewaiter) + || c->c_conn_state == SLAP_C_CLIENT ) { + connection_done( c ); + continue; + } + connection_closing( c, "dropping" ); + connection_close( c ); + } +} + static Connection* connection_get( ber_socket_t s ) { Connection *c; @@ -261,66 +290,22 @@ static Connection* connection_get( ber_socket_t s ) if(s == AC_SOCKET_INVALID) return NULL; -#ifndef HAVE_WINSOCK assert( s < dtblsize ); c = &connections[s]; -#else - c = NULL; - { - ldap_pvt_thread_mutex_lock( &connections_mutex ); - for(i=0; i unused by resched, - * so don't assert details here. - */ - - if( connections[i].c_sd == s ) { - c = &connections[i]; - break; - } - } - ldap_pvt_thread_mutex_unlock( &connections_mutex ); - } -#endif - if( c != NULL ) { ldap_pvt_thread_mutex_lock( &c->c_mutex ); assert( c->c_struct_state != SLAP_C_UNINITIALIZED ); -#ifdef HAVE_WINSOCK - /* Avoid race condition after releasing - * connections_mutex - */ - if ( c->c_sd != s ) { - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - return NULL; - } -#endif if( c->c_struct_state != SLAP_C_USED ) { /* connection must have been closed due to resched */ - assert( c->c_conn_state == SLAP_C_INVALID ); - assert( c->c_sd == AC_SOCKET_INVALID ); - - Debug( LDAP_DEBUG_TRACE, + Debug( LDAP_DEBUG_CONNS, "connection_get(%d): connection not used\n", s, 0, 0 ); + assert( c->c_conn_state == SLAP_C_INVALID ); + assert( c->c_sd == AC_SOCKET_INVALID ); ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return NULL; @@ -365,6 +350,7 @@ Connection * connection_init( unsigned long id; Connection *c; int doinit = 0; + ber_socket_t sfd = SLAP_FD2SOCK(s); assert( connections != NULL ); @@ -383,7 +369,6 @@ Connection * connection_init( } assert( s >= 0 ); -#ifndef HAVE_WINSOCK assert( s < dtblsize ); c = &connections[s]; if( c->c_struct_state == SLAP_C_UNINITIALIZED ) { @@ -391,45 +376,6 @@ Connection * connection_init( } else { assert( c->c_struct_state == SLAP_C_UNUSED ); } -#else - { - ber_socket_t i; - c = NULL; - - ldap_pvt_thread_mutex_lock( &connections_mutex ); - for( i=0; i < dtblsize; i++) { - if ( connections[i].c_struct_state == SLAP_C_PENDING ) - continue; - - if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) { - assert( connections[i].c_sb == 0 ); - c = &connections[i]; - c->c_struct_state = SLAP_C_PENDING; - doinit = 1; - break; - } - - if( connections[i].c_struct_state == SLAP_C_UNUSED ) { - c = &connections[i]; - c->c_struct_state = SLAP_C_PENDING; - break; - } - - if( connections[i].c_conn_state == SLAP_C_CLIENT ) continue; - - assert( connections[i].c_struct_state == SLAP_C_USED ); - assert( connections[i].c_conn_state != SLAP_C_INVALID ); - } - ldap_pvt_thread_mutex_unlock( &connections_mutex ); - - if( c == NULL ) { - Debug( LDAP_DEBUG_ANY, - "connection_init(%d): connection table full " - "(%d/%d)\n", s, i, dtblsize); - return NULL; - } - } -#endif if( doinit ) { c->c_send_ldap_result = slap_send_ldap_result; @@ -473,8 +419,10 @@ Connection * connection_init( /* should check status of thread calls */ ldap_pvt_thread_mutex_init( &c->c_mutex ); - ldap_pvt_thread_mutex_init( &c->c_write_mutex ); - ldap_pvt_thread_cond_init( &c->c_write_cv ); + ldap_pvt_thread_mutex_init( &c->c_write1_mutex ); + ldap_pvt_thread_mutex_init( &c->c_write2_mutex ); + ldap_pvt_thread_cond_init( &c->c_write1_cv ); + ldap_pvt_thread_cond_init( &c->c_write2_cv ); #ifdef LDAP_SLAPI if ( slapi_plugins_used ) { @@ -506,16 +454,19 @@ Connection * connection_init( assert( c->c_sasl_bindop == NULL ); assert( c->c_currentber == NULL ); assert( c->c_writewaiter == 0); + assert( c->c_writers == 0); c->c_listener = listener; c->c_sd = s; if ( flags & CONN_IS_CLIENT ) { c->c_connid = 0; + ldap_pvt_thread_mutex_lock( &connections_mutex ); c->c_conn_state = SLAP_C_CLIENT; c->c_struct_state = SLAP_C_USED; + ldap_pvt_thread_mutex_unlock( &connections_mutex ); c->c_close_reason = "?"; /* should never be needed */ - ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s ); + ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &sfd ); ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return c; @@ -552,7 +503,7 @@ Connection * connection_init( LBER_SBIOD_LEVEL_PROVIDER, (void*)"udp_" ); #endif ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_udp, - LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); + LBER_SBIOD_LEVEL_PROVIDER, (void *)&sfd ); ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_readahead, LBER_SBIOD_LEVEL_PROVIDER, NULL ); } else @@ -564,7 +515,7 @@ Connection * connection_init( LBER_SBIOD_LEVEL_PROVIDER, (void*)"ipc_" ); #endif ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd, - LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); + LBER_SBIOD_LEVEL_PROVIDER, (void *)&sfd ); #ifdef LDAP_PF_LOCAL_SENDMSG if ( !BER_BVISEMPTY( peerbv )) ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_UNGET_BUF, peerbv ); @@ -577,7 +528,7 @@ Connection * connection_init( LBER_SBIOD_LEVEL_PROVIDER, (void*)"tcp_" ); #endif ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp, - LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); + LBER_SBIOD_LEVEL_PROVIDER, (void *)&sfd ); } #ifdef LDAP_DEBUG @@ -597,8 +548,10 @@ Connection * connection_init( id = c->c_connid = conn_nextid++; ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex ); + ldap_pvt_thread_mutex_lock( &connections_mutex ); c->c_conn_state = SLAP_C_INACTIVE; c->c_struct_state = SLAP_C_USED; + ldap_pvt_thread_mutex_unlock( &connections_mutex ); c->c_close_reason = "?"; /* should never be needed */ c->c_ssf = c->c_transport_ssf = ssf; @@ -618,9 +571,9 @@ Connection * connection_init( slap_sasl_external( c, ssf, authid ); slapd_add_internal( s, 1 ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); backend_connection_init(c); + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return c; } @@ -664,6 +617,7 @@ connection_destroy( Connection *c ) unsigned long connid; const char *close_reason; Sockbuf *sb; + ber_socket_t sd; assert( connections != NULL ); assert( c != NULL ); @@ -677,6 +631,7 @@ connection_destroy( Connection *c ) assert( LDAP_STAILQ_EMPTY(&c->c_txn_ops) ); #endif assert( c->c_writewaiter == 0); + assert( c->c_writers == 0); /* only for stats (print -1 as "%lu" may give unexpected results ;) */ connid = c->c_connid; @@ -726,6 +681,8 @@ connection_destroy( Connection *c ) } #endif + sd = c->c_sd; + c->c_sd = AC_SOCKET_INVALID; c->c_conn_state = SLAP_C_INVALID; c->c_struct_state = SLAP_C_UNUSED; c->c_close_reason = "?"; /* should never be needed */ @@ -740,33 +697,28 @@ connection_destroy( Connection *c ) /* c must be fully reset by this point; when we call slapd_remove * it may get immediately reused by a new connection. */ - if ( c->c_sd != AC_SOCKET_INVALID ) { - slapd_remove( c->c_sd, sb, 1, 0, 0 ); + if ( sd != AC_SOCKET_INVALID ) { + slapd_remove( sd, sb, 1, 0, 0 ); if ( close_reason == NULL ) { Statslog( LDAP_DEBUG_STATS, "conn=%lu fd=%ld closed\n", - connid, (long) c->c_sd, 0, 0, 0 ); + connid, (long) sd, 0, 0, 0 ); } else { Statslog( LDAP_DEBUG_STATS, "conn=%lu fd=%ld closed (%s)\n", - connid, (long) c->c_sd, close_reason, 0, 0 ); + connid, (long) sd, close_reason, 0, 0 ); } - c->c_sd = AC_SOCKET_INVALID; } } -int connection_state_closing( Connection *c ) +int connection_valid( Connection *c ) { /* c_mutex must be locked by caller */ - int state; assert( c != NULL ); - assert( c->c_struct_state == SLAP_C_USED ); - - state = c->c_conn_state; - - assert( state != SLAP_C_INVALID ); - return state == SLAP_C_CLOSING; + return c->c_struct_state == SLAP_C_USED && + c->c_conn_state >= SLAP_C_ACTIVE && + c->c_conn_state <= SLAP_C_CLIENT; } static void connection_abandon( Connection *c ) @@ -775,7 +727,6 @@ static void connection_abandon( Connection *c ) Operation *o, *next, op = {0}; Opheader ohdr = {0}; - SlapReply rs = {0}; op.o_hdr = &ohdr; op.o_conn = c; @@ -783,6 +734,8 @@ static void connection_abandon( Connection *c ) op.o_tag = LDAP_REQ_ABANDON; for ( o = LDAP_STAILQ_FIRST( &c->c_ops ); o; o=next ) { + SlapReply rs = {REP_RESULT}; + next = LDAP_STAILQ_NEXT( o, o_next ); op.orn_msgid = o->o_msgid; o->o_abandon = 1; @@ -811,17 +764,45 @@ static void connection_abandon( Connection *c ) } } +static void +connection_wake_writers( Connection *c ) +{ + /* wake write blocked operations */ + ldap_pvt_thread_mutex_lock( &c->c_write1_mutex ); + if ( c->c_writers > 0 ) { + c->c_writers = -c->c_writers; + ldap_pvt_thread_cond_broadcast( &c->c_write1_cv ); + ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex ); + if ( c->c_writewaiter ) { + ldap_pvt_thread_mutex_lock( &c->c_write2_mutex ); + ldap_pvt_thread_cond_signal( &c->c_write2_cv ); + slapd_clr_write( c->c_sd, 1 ); + ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex ); + } + ldap_pvt_thread_mutex_lock( &c->c_write1_mutex ); + while ( c->c_writers ) { + ldap_pvt_thread_cond_wait( &c->c_write1_cv, &c->c_write1_mutex ); + } + ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex ); + } else { + ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex ); + slapd_clr_write( c->c_sd, 1 ); + } +} + void connection_closing( Connection *c, const char *why ) { assert( connections != NULL ); assert( c != NULL ); - assert( c->c_struct_state == SLAP_C_USED ); + + if ( c->c_struct_state != SLAP_C_USED ) return; + assert( c->c_conn_state != SLAP_C_INVALID ); /* c_mutex must be locked by caller */ if( c->c_conn_state != SLAP_C_CLOSING ) { - Debug( LDAP_DEBUG_TRACE, + Debug( LDAP_DEBUG_CONNS, "connection_closing: readying conn=%lu sd=%d for close\n", c->c_connid, c->c_sd, 0 ); /* update state to closing */ @@ -835,20 +816,7 @@ void connection_closing( Connection *c, const char *why ) connection_abandon( c ); /* wake write blocked operations */ - if ( c->c_writewaiter ) { - ldap_pvt_thread_cond_signal( &c->c_write_cv ); - /* ITS#4667 this may allow another thread to drop into - * connection_resched / connection_close before we - * finish, but that's OK. - */ - slapd_clr_write( c->c_sd, 1 ); - ldap_pvt_thread_mutex_unlock( &c->c_mutex ); - ldap_pvt_thread_mutex_lock( &c->c_write_mutex ); - ldap_pvt_thread_mutex_lock( &c->c_mutex ); - ldap_pvt_thread_mutex_unlock( &c->c_write_mutex ); - } else { - slapd_clr_write( c->c_sd, 1 ); - } + connection_wake_writers( c ); } else if( why == NULL && c->c_close_reason == conn_lost_str ) { /* Client closed connection after doing Unbind. */ @@ -862,11 +830,8 @@ connection_close( Connection *c ) assert( connections != NULL ); assert( c != NULL ); - /* ITS#4667 we may have gotten here twice */ - if ( c->c_conn_state == SLAP_C_INVALID ) - return; + if ( c->c_struct_state != SLAP_C_USED ) return; - assert( c->c_struct_state == SLAP_C_USED ); assert( c->c_conn_state == SLAP_C_CLOSING ); /* NOTE: c_mutex should be locked by caller */ @@ -874,7 +839,7 @@ connection_close( Connection *c ) if ( !LDAP_STAILQ_EMPTY(&c->c_ops) || !LDAP_STAILQ_EMPTY(&c->c_pending_ops) ) { - Debug( LDAP_DEBUG_TRACE, + Debug( LDAP_DEBUG_CONNS, "connection_close: deferring conn=%lu sd=%d\n", c->c_connid, c->c_sd, 0 ); return; @@ -930,16 +895,12 @@ Connection* connection_next( Connection *c, ber_socket_t *index ) for(; *index < dtblsize; (*index)++) { int c_struct; if( connections[*index].c_struct_state == SLAP_C_UNINITIALIZED ) { + /* FIXME: accessing c_conn_state without locking c_mutex */ assert( connections[*index].c_conn_state == SLAP_C_INVALID ); -#ifdef HAVE_WINSOCK - break; -#else continue; -#endif } if( connections[*index].c_struct_state == SLAP_C_USED ) { - assert( connections[*index].c_conn_state != SLAP_C_INVALID ); c = &connections[(*index)++]; if ( ldap_pvt_thread_mutex_trylock( &c->c_mutex )) { /* avoid deadlock */ @@ -952,6 +913,7 @@ Connection* connection_next( Connection *c, ber_socket_t *index ) continue; } } + assert( c->c_conn_state != SLAP_C_INVALID ); break; } @@ -959,6 +921,7 @@ Connection* connection_next( Connection *c, ber_socket_t *index ) if ( c_struct == SLAP_C_PENDING ) continue; assert( c_struct == SLAP_C_UNUSED ); + /* FIXME: accessing c_conn_state without locking c_mutex */ assert( connections[*index].c_conn_state == SLAP_C_INVALID ); } @@ -983,24 +946,24 @@ void connection_done( Connection *c ) /* FIXME: returns 0 in case of failure */ #define INCR_OP_INITIATED(index) \ do { \ - ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex ); \ - ldap_pvt_mp_add_ulong(slap_counters.sc_ops_initiated_[(index)], 1); \ - ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex ); \ + ldap_pvt_thread_mutex_lock( &op->o_counters->sc_mutex ); \ + ldap_pvt_mp_add_ulong(op->o_counters->sc_ops_initiated_[(index)], 1); \ + ldap_pvt_thread_mutex_unlock( &op->o_counters->sc_mutex ); \ } while (0) #define INCR_OP_COMPLETED(index) \ do { \ - ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex ); \ - ldap_pvt_mp_add_ulong(slap_counters.sc_ops_completed, 1); \ - ldap_pvt_mp_add_ulong(slap_counters.sc_ops_completed_[(index)], 1); \ - ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex ); \ + ldap_pvt_thread_mutex_lock( &op->o_counters->sc_mutex ); \ + ldap_pvt_mp_add_ulong(op->o_counters->sc_ops_completed, 1); \ + ldap_pvt_mp_add_ulong(op->o_counters->sc_ops_completed_[(index)], 1); \ + ldap_pvt_thread_mutex_unlock( &op->o_counters->sc_mutex ); \ } while (0) #else /* !SLAPD_MONITOR */ #define INCR_OP_INITIATED(index) do { } while (0) #define INCR_OP_COMPLETED(index) \ do { \ - ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex ); \ - ldap_pvt_mp_add_ulong(slap_counters.sc_ops_completed, 1); \ - ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex ); \ + ldap_pvt_thread_mutex_lock( &op->o_counters->sc_mutex ); \ + ldap_pvt_mp_add_ulong(op->o_counters->sc_ops_completed, 1); \ + ldap_pvt_thread_mutex_unlock( &op->o_counters->sc_mutex ); \ } while (0) #endif /* !SLAPD_MONITOR */ @@ -1021,10 +984,67 @@ static BI_op_func *opfun[] = { NULL }; +/* Counters are per-thread, not per-connection. + */ +static void +conn_counter_destroy( void *key, void *data ) +{ + slap_counters_t **prev, *sc; + + ldap_pvt_thread_mutex_lock( &slap_counters.sc_mutex ); + for ( prev = &slap_counters.sc_next, sc = slap_counters.sc_next; sc; + prev = &sc->sc_next, sc = sc->sc_next ) { + if ( sc == data ) { + int i; + + *prev = sc->sc_next; + /* Copy data to main counter */ + ldap_pvt_mp_add( slap_counters.sc_bytes, sc->sc_bytes ); + ldap_pvt_mp_add( slap_counters.sc_pdu, sc->sc_pdu ); + ldap_pvt_mp_add( slap_counters.sc_entries, sc->sc_entries ); + ldap_pvt_mp_add( slap_counters.sc_refs, sc->sc_refs ); + ldap_pvt_mp_add( slap_counters.sc_ops_initiated, sc->sc_ops_initiated ); + ldap_pvt_mp_add( slap_counters.sc_ops_completed, sc->sc_ops_completed ); +#ifdef SLAPD_MONITOR + for ( i = 0; i < SLAP_OP_LAST; i++ ) { + ldap_pvt_mp_add( slap_counters.sc_ops_initiated_[ i ], sc->sc_ops_initiated_[ i ] ); + ldap_pvt_mp_add( slap_counters.sc_ops_initiated_[ i ], sc->sc_ops_completed_[ i ] ); + } +#endif /* SLAPD_MONITOR */ + slap_counters_destroy( sc ); + ber_memfree_x( data, NULL ); + break; + } + } + ldap_pvt_thread_mutex_unlock( &slap_counters.sc_mutex ); +} + +static void +conn_counter_init( Operation *op, void *ctx ) +{ + slap_counters_t *sc; + void *vsc = NULL; + + if ( ldap_pvt_thread_pool_getkey( + ctx, (void *)conn_counter_init, &vsc, NULL ) || !vsc ) { + vsc = ch_malloc( sizeof( slap_counters_t )); + sc = vsc; + slap_counters_init( sc ); + ldap_pvt_thread_pool_setkey( ctx, (void*)conn_counter_init, vsc, + conn_counter_destroy, NULL, NULL ); + + ldap_pvt_thread_mutex_lock( &slap_counters.sc_mutex ); + sc->sc_next = slap_counters.sc_next; + slap_counters.sc_next = sc; + ldap_pvt_thread_mutex_unlock( &slap_counters.sc_mutex ); + } + op->o_counters = vsc; +} + static void * connection_operation( void *ctx, void *arg_v ) { - int rc = LDAP_OTHER; + int rc = LDAP_OTHER, cancel; Operation *op = arg_v; SlapReply rs = {REP_RESULT}; ber_tag_t tag = op->o_tag; @@ -1034,10 +1054,11 @@ connection_operation( void *ctx, void *arg_v ) void *memctx_null = NULL; ber_len_t memsiz; - ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex ); + conn_counter_init( op, ctx ); + ldap_pvt_thread_mutex_lock( &op->o_counters->sc_mutex ); /* FIXME: returns 0 in case of failure */ - ldap_pvt_mp_add_ulong(slap_counters.sc_ops_initiated, 1); - ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex ); + ldap_pvt_mp_add_ulong(op->o_counters->sc_ops_initiated, 1); + ldap_pvt_thread_mutex_unlock( &op->o_counters->sc_mutex ); op->o_threadctx = ctx; op->o_tid = ldap_pvt_thread_pool_tid( ctx ); @@ -1127,22 +1148,32 @@ operations_error: INCR_OP_COMPLETED( opidx ); } - if ( op->o_cancel == SLAP_CANCEL_REQ ) { - if ( rc == SLAPD_ABANDON ) { - op->o_cancel = SLAP_CANCEL_ACK; - } else { - op->o_cancel = LDAP_TOO_LATE; + ldap_pvt_thread_mutex_lock( &conn->c_mutex ); + + if ( opidx == SLAP_OP_BIND && conn->c_conn_state == SLAP_C_BINDING ) + conn->c_conn_state = SLAP_C_ACTIVE; + + cancel = op->o_cancel; + if ( cancel != SLAP_CANCEL_NONE && cancel != SLAP_CANCEL_DONE ) { + if ( cancel == SLAP_CANCEL_REQ ) { + op->o_cancel = rc == SLAPD_ABANDON + ? SLAP_CANCEL_ACK : LDAP_TOO_LATE; } - } - while ( op->o_cancel != SLAP_CANCEL_NONE && - op->o_cancel != SLAP_CANCEL_DONE ) - { - ldap_pvt_thread_yield(); + do { + /* Fake a cond_wait with thread_yield, then + * verify the result properly mutex-protected. + */ + ldap_pvt_thread_mutex_unlock( &conn->c_mutex ); + do { + ldap_pvt_thread_yield(); + } while ( (cancel = op->o_cancel) != SLAP_CANCEL_NONE + && cancel != SLAP_CANCEL_DONE ); + ldap_pvt_thread_mutex_lock( &conn->c_mutex ); + } while ( (cancel = op->o_cancel) != SLAP_CANCEL_NONE + && cancel != SLAP_CANCEL_DONE ); } - ldap_pvt_thread_mutex_lock( &conn->c_mutex ); - ber_set_option( op->o_ber, LBER_OPT_BER_MEMCTX, &memctx_null ); LDAP_STAILQ_REMOVE( &conn->c_ops, op, Operation, o_next); @@ -1167,46 +1198,47 @@ operations_error: static const Listener dummy_list = { BER_BVC(""), BER_BVC("") }; -int connection_client_setup( +Connection *connection_client_setup( ber_socket_t s, ldap_pvt_thread_start_t *func, void *arg ) { Connection *c; + ber_socket_t sfd = SLAP_SOCKNEW( s ); - c = connection_init( s, (Listener *)&dummy_list, "", "", + c = connection_init( sfd, (Listener *)&dummy_list, "", "", CONN_IS_CLIENT, 0, NULL LDAP_PF_LOCAL_SENDMSG_ARG(NULL)); - if ( !c ) return -1; + if ( c ) { + c->c_clientfunc = func; + c->c_clientarg = arg; - c->c_clientfunc = func; - c->c_clientarg = arg; - - slapd_add_internal( s, 0 ); - slapd_set_read( s, 1 ); - return 0; + slapd_add_internal( sfd, 0 ); + } + return c; } void connection_client_enable( - ber_socket_t s ) + Connection *c ) { - slapd_set_read( s, 1 ); + slapd_set_read( c->c_sd, 1 ); } void connection_client_stop( - ber_socket_t s ) + Connection *c ) { - Connection *c; Sockbuf *sb; + ber_socket_t s = c->c_sd; /* get (locked) connection */ c = connection_get( s ); - + assert( c->c_conn_state == SLAP_C_CLIENT ); c->c_listener = NULL; c->c_conn_state = SLAP_C_INVALID; c->c_struct_state = SLAP_C_UNUSED; + c->c_sd = AC_SOCKET_INVALID; c->c_close_reason = "?"; /* should never be needed */ sb = c->c_sb; c->c_sb = ber_sockbuf_alloc( ); @@ -1219,14 +1251,12 @@ void connection_client_stop( connection_return( c ); } -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER - static int connection_read( ber_socket_t s, conn_readinfo *cri ); static void* connection_read_thread( void* ctx, void* argv ) { int rc ; - conn_readinfo cri = { NULL, NULL, NULL, 0 }; + conn_readinfo cri = { NULL, NULL, NULL, NULL, 0 }; ber_socket_t s = (long)argv; /* @@ -1262,6 +1292,11 @@ int connection_read_activate( ber_socket_t s ) if ( rc ) return rc; + /* Don't let blocked writers block a pause request */ + if ( connections[s].c_writewaiter && + ldap_pvt_thread_pool_pausing( &connection_pool )) + connection_wake_writers( &connections[s] ); + rc = ldap_pvt_thread_pool_submit( &connection_pool, connection_read_thread, (void *)(long)s ); @@ -1273,14 +1308,9 @@ int connection_read_activate( ber_socket_t s ) return rc; } -#endif -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER static int connection_read( ber_socket_t s, conn_readinfo *cri ) -#else -int connection_read(ber_socket_t s) -#endif { int rc = 0; Connection *c; @@ -1301,7 +1331,7 @@ int connection_read(ber_socket_t s) c->c_n_read++; if( c->c_conn_state == SLAP_C_CLOSING ) { - Debug( LDAP_DEBUG_TRACE, + Debug( LDAP_DEBUG_CONNS, "connection_read(%d): closing, ignoring input for id=%lu\n", s, c->c_connid, 0 ); connection_return( c ); @@ -1309,15 +1339,9 @@ int connection_read(ber_socket_t s) } if ( c->c_conn_state == SLAP_C_CLIENT ) { -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER cri->func = c->c_clientfunc; cri->arg = c->c_clientarg; /* read should already be cleared */ -#else - slapd_clr_read( s, 0 ); - ldap_pvt_thread_pool_submit( &connection_pool, - c->c_clientfunc, c->c_clientarg ); -#endif connection_return( c ); return 0; } @@ -1367,15 +1391,17 @@ int connection_read(ber_socket_t s) c->c_connid, (int) s, c->c_tls_ssf, c->c_ssf, 0 ); slap_sasl_external( c, c->c_tls_ssf, &authid ); if ( authid.bv_val ) free( authid.bv_val ); + } else if ( rc == 1 && ber_sockbuf_ctrl( c->c_sb, + LBER_SB_OPT_NEEDS_WRITE, NULL )) { /* need to retry */ + slapd_set_write( s, 1 ); + connection_return( c ); + return 0; } /* if success and data is ready, fall thru to data input loop */ if( !ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DATA_READY, NULL ) ) { -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER slapd_set_read( s, 1 ); -#endif - connection_return( c ); return 0; } @@ -1386,10 +1412,7 @@ int connection_read(ber_socket_t s) if ( c->c_sasl_layers ) { /* If previous layer is not removed yet, give up for now */ if ( !c->c_sasl_sockctx ) { -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER slapd_set_read( s, 1 ); -#endif - connection_return( c ); return 0; } @@ -1417,11 +1440,7 @@ int connection_read(ber_socket_t s) do { /* How do we do this without getting into a busy loop ? */ -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER rc = connection_input( c, cri ); -#else - rc = connection_input( c ); -#endif } #ifdef DATA_READY_LOOP while( !rc && ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DATA_READY, NULL )); @@ -1443,33 +1462,18 @@ int connection_read(ber_socket_t s) return 0; } -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) { slapd_set_write( s, 0 ); } slapd_set_read( s, 1 ); -#else - if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) { - slapd_set_read( s, 1 ); - } - - if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) { - slapd_set_write( s, 1 ); - } -#endif - connection_return( c ); return 0; } static int -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER connection_input( Connection *conn , conn_readinfo *cri ) -#else -connection_input( Connection *conn ) -#endif { Operation *op; ber_tag_t tag; @@ -1496,12 +1500,20 @@ connection_input( Connection *conn ) #ifdef LDAP_CONNECTIONLESS if ( conn->c_is_udp ) { char peername[sizeof("IP=255.255.255.255:65336")]; + const char *peeraddr_string = NULL; len = ber_int_sb_read(conn->c_sb, &peeraddr, sizeof(struct sockaddr)); if (len != sizeof(struct sockaddr)) return 1; +#if defined( HAVE_GETADDRINFO ) && defined( HAVE_INET_NTOP ) + char addr[INET_ADDRSTRLEN]; + peeraddr_string = inet_ntop( AF_INET, &peeraddr.sa_in_addr.sin_addr, + addr, sizeof(addr) ); +#else /* ! HAVE_GETADDRINFO || ! HAVE_INET_NTOP */ + peeraddr_string = inet_ntoa( peeraddr.sa_in_addr.sin_addr ); +#endif /* ! HAVE_GETADDRINFO || ! HAVE_INET_NTOP */ sprintf( peername, "IP=%s:%d", - inet_ntoa( peeraddr.sa_in_addr.sin_addr ), + peeraddr_string, (unsigned) ntohs( peeraddr.sa_in_addr.sin_port ) ); Statslog( LDAP_DEBUG_STATS, "conn=%lu UDP request from %s (%s) accepted.\n", @@ -1547,8 +1559,8 @@ connection_input( Connection *conn ) #ifdef LDAP_CONNECTIONLESS if( conn->c_is_udp ) { if( tag == LBER_OCTETSTRING ) { - ber_get_stringa( ber, &cdn ); - tag = ber_peek_tag(ber, &len); + if ( (tag = ber_get_stringa( ber, &cdn )) != LBER_ERROR ) + tag = ber_peek_tag( ber, &len ); } if( tag != LDAP_REQ_ABANDON && tag != LDAP_REQ_SEARCH ) { Debug( LDAP_DEBUG_ANY, "invalid req for UDP 0x%lx\n", tag, 0, 0 ); @@ -1563,13 +1575,12 @@ connection_input( Connection *conn ) connection_abandon( conn ); } -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER ctx = cri->ctx; -#else - ctx = NULL; -#endif op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++, ctx ); + Debug( LDAP_DEBUG_TRACE, "op tag 0x%lx, time %ld\n", tag, + (long) op->o_time, 0); + op->o_conn = conn; /* clear state if the connection is being reused from inactive */ if ( conn->c_conn_state == SLAP_C_INACTIVE ) { @@ -1658,7 +1669,6 @@ connection_input( Connection *conn ) } else { conn->c_n_ops_executing++; -#ifdef SLAP_LIGHTWEIGHT_DISPATCHER /* * The first op will be processed in the same thread context, * as long as there is only one op total. @@ -1677,9 +1687,6 @@ connection_input( Connection *conn ) } connection_op_activate( op ); } -#else - connection_op_activate( op ); -#endif } #ifdef NO_THREADS @@ -1698,15 +1705,18 @@ connection_resched( Connection *conn ) { Operation *op; + if( conn->c_writewaiter ) + return 0; + if( conn->c_conn_state == SLAP_C_CLOSING ) { - Debug( LDAP_DEBUG_TRACE, "connection_resched: " + Debug( LDAP_DEBUG_CONNS, "connection_resched: " "attempting closing conn=%lu sd=%d\n", conn->c_connid, conn->c_sd, 0 ); connection_close( conn ); return 0; } - if( conn->c_conn_state != SLAP_C_ACTIVE || conn->c_writewaiter ) { + if( conn->c_conn_state != SLAP_C_ACTIVE ) { /* other states need different handling */ return 0; } @@ -1756,8 +1766,6 @@ static int connection_bind_cleanup_cb( Operation *op, SlapReply *rs ) static int connection_bind_cb( Operation *op, SlapReply *rs ) { ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); - if ( op->o_conn->c_conn_state == SLAP_C_BINDING ) - op->o_conn->c_conn_state = SLAP_C_ACTIVE; op->o_conn->c_sasl_bind_in_progress = ( rs->sr_err == LDAP_SASL_BIND_IN_PROGRESS ); @@ -1895,12 +1903,22 @@ int connection_write(ber_socket_t s) return -1; } +#ifdef HAVE_TLS + if ( c->c_is_tls && c->c_needs_tls_accept ) { + connection_return( c ); + connection_read_activate( s ); + return 0; + } +#endif + c->c_n_write++; Debug( LDAP_DEBUG_TRACE, "connection_write(%d): waking output for id=%lu\n", s, c->c_connid, 0 ); - ldap_pvt_thread_cond_signal( &c->c_write_cv ); + ldap_pvt_thread_mutex_lock( &c->c_write2_mutex ); + ldap_pvt_thread_cond_signal( &c->c_write2_cv ); + ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex ); if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) { slapd_set_read( s, 1 ); @@ -1973,6 +1991,27 @@ connection_fake_init( connection_fake_init2( conn, opbuf, ctx, 1 ); } +void +operation_fake_init( + Connection *conn, + Operation *op, + void *ctx, + int newmem ) +{ + /* set memory context */ + op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, + newmem ); + op->o_tmpmfuncs = &slap_sl_mfuncs; + op->o_threadctx = ctx; + op->o_tid = ldap_pvt_thread_pool_tid( ctx ); + + op->o_counters = &slap_counters; + op->o_conn = conn; + op->o_connid = op->o_conn->c_connid; + connection_init_log_prefix( op ); +} + + void connection_fake_init2( Connection *conn, @@ -1987,6 +2026,8 @@ connection_fake_init2( conn->c_send_ldap_result = slap_send_ldap_result; conn->c_send_search_entry = slap_send_search_entry; conn->c_send_search_reference = slap_send_search_reference; + conn->c_send_ldap_extended = slap_send_ldap_extended; + conn->c_send_ldap_intermediate = slap_send_ldap_intermediate; conn->c_listener = (Listener *)&dummy_list; conn->c_peer_domain = slap_empty_bv; conn->c_peer_name = slap_empty_bv; @@ -1995,16 +2036,7 @@ connection_fake_init2( op->o_hdr = &opbuf->ob_hdr; op->o_controls = opbuf->ob_controls; - /* set memory context */ - op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, - newmem ); - op->o_tmpmfuncs = &slap_sl_mfuncs; - op->o_threadctx = ctx; - op->o_tid = ldap_pvt_thread_pool_tid( ctx ); - - op->o_conn = conn; - op->o_connid = op->o_conn->c_connid; - connection_init_log_prefix( op ); + operation_fake_init( conn, op, ctx, newmem ); #ifdef LDAP_SLAPI if ( slapi_plugins_used ) { @@ -2012,15 +2044,15 @@ connection_fake_init2( void *ebx = NULL; /* Use thread keys to make sure these eventually get cleaned up */ - if ( ldap_pvt_thread_pool_getkey( ctx, connection_fake_init, &ebx, - NULL )) { + if ( ldap_pvt_thread_pool_getkey( ctx, (void *)connection_fake_init, + &ebx, NULL )) { eb = ch_malloc( sizeof( *eb )); slapi_int_create_object_extensions( SLAPI_X_EXT_CONNECTION, conn ); slapi_int_create_object_extensions( SLAPI_X_EXT_OPERATION, op ); eb->eb_conn = conn->c_extensions; eb->eb_op = op->o_hdr->oh_extensions; - ldap_pvt_thread_pool_setkey( ctx, connection_fake_init, eb, - connection_fake_destroy ); + ldap_pvt_thread_pool_setkey( ctx, (void *)connection_fake_init, + eb, connection_fake_destroy, NULL, NULL ); } else { eb = ebx; conn->c_extensions = eb->eb_conn;