/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
- * Copyright 1998-2007 The OpenLDAP Foundation.
+ * Copyright 1998-2009 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
if( connections[i].c_struct_state != SLAP_C_UNINITIALIZED ) {
ber_sockbuf_free( connections[i].c_sb );
ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex );
- ldap_pvt_thread_mutex_destroy( &connections[i].c_write_mutex );
- ldap_pvt_thread_cond_destroy( &connections[i].c_write_cv );
+ ldap_pvt_thread_mutex_destroy( &connections[i].c_write1_mutex );
+ ldap_pvt_thread_mutex_destroy( &connections[i].c_write2_mutex );
+ ldap_pvt_thread_cond_destroy( &connections[i].c_write1_cv );
+ ldap_pvt_thread_cond_destroy( &connections[i].c_write2_cv );
#ifdef LDAP_SLAPI
if ( slapi_plugins_used ) {
slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION,
*/
int connections_timeout_idle(time_t now)
{
- int i = 0;
+ int i = 0, writers = 0;
int connindex;
Connection* c;
+ time_t old;
+
+ old = slapd_get_writetime();
for( c = connection_first( &connindex );
c != NULL;
c = connection_next( c, &connindex ) )
{
/* Don't timeout a slow-running request or a persistent
- * outbound connection */
- if( c->c_n_ops_executing || c->c_conn_state == SLAP_C_CLIENT ) {
+ * outbound connection. But if it has a writewaiter, see
+ * if the waiter has been there too long.
+ */
+ if(( c->c_n_ops_executing && !c->c_writewaiter)
+ || c->c_conn_state == SLAP_C_CLIENT ) {
continue;
}
connection_closing( c, "idletimeout" );
connection_close( c );
i++;
+ continue;
+ }
+ if ( c->c_writewaiter ) {
+ writers = 1;
+ if( difftime( c->c_activitytime+global_writetimeout, now) < 0 ) {
+ /* close it */
+ connection_closing( c, "writetimeout" );
+ connection_close( c );
+ i++;
+ }
}
}
connection_done( c );
+ if ( !writers )
+ slapd_clr_writetime( old );
return i;
}
if(s == AC_SOCKET_INVALID) return NULL;
-#ifndef HAVE_WINSOCK
assert( s < dtblsize );
c = &connections[s];
-#else
- c = NULL;
- {
- ldap_pvt_thread_mutex_lock( &connections_mutex );
- for(i=0; i<dtblsize; i++) {
- if( connections[i].c_struct_state == SLAP_C_PENDING )
- continue;
-
- if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
- assert( connections[i].c_conn_state == SLAP_C_INVALID );
- assert( connections[i].c_sb == 0 );
- break;
- }
-
- if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
- assert( connections[i].c_conn_state == SLAP_C_INVALID );
- assert( connections[i].c_sd == AC_SOCKET_INVALID );
- continue;
- }
-
- /* state can actually change from used -> unused by resched,
- * so don't assert details here.
- */
-
- if( connections[i].c_sd == s ) {
- c = &connections[i];
- break;
- }
- }
- ldap_pvt_thread_mutex_unlock( &connections_mutex );
- }
-#endif
-
if( c != NULL ) {
ldap_pvt_thread_mutex_lock( &c->c_mutex );
assert( c->c_struct_state != SLAP_C_UNINITIALIZED );
-#ifdef HAVE_WINSOCK
- /* Avoid race condition after releasing
- * connections_mutex
- */
- if ( c->c_sd != s ) {
- ldap_pvt_thread_mutex_unlock( &c->c_mutex );
- return NULL;
- }
-#endif
if( c->c_struct_state != SLAP_C_USED ) {
/* connection must have been closed due to resched */
assert( c->c_conn_state == SLAP_C_INVALID );
assert( c->c_sd == AC_SOCKET_INVALID );
- Debug( LDAP_DEBUG_TRACE,
+ Debug( LDAP_DEBUG_CONNS,
"connection_get(%d): connection not used\n",
s, 0, 0 );
unsigned long id;
Connection *c;
int doinit = 0;
+ ber_socket_t sfd = SLAP_FD2SOCK(s);
assert( connections != NULL );
}
assert( s >= 0 );
-#ifndef HAVE_WINSOCK
assert( s < dtblsize );
c = &connections[s];
if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
} else {
assert( c->c_struct_state == SLAP_C_UNUSED );
}
-#else
- {
- ber_socket_t i;
- c = NULL;
-
- ldap_pvt_thread_mutex_lock( &connections_mutex );
- for( i=0; i < dtblsize; i++) {
- if ( connections[i].c_struct_state == SLAP_C_PENDING )
- continue;
-
- if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
- assert( connections[i].c_sb == 0 );
- c = &connections[i];
- c->c_struct_state = SLAP_C_PENDING;
- doinit = 1;
- break;
- }
-
- if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
- c = &connections[i];
- c->c_struct_state = SLAP_C_PENDING;
- break;
- }
-
- if( connections[i].c_conn_state == SLAP_C_CLIENT ) continue;
-
- assert( connections[i].c_struct_state == SLAP_C_USED );
- assert( connections[i].c_conn_state != SLAP_C_INVALID );
- }
- ldap_pvt_thread_mutex_unlock( &connections_mutex );
-
- if( c == NULL ) {
- Debug( LDAP_DEBUG_ANY,
- "connection_init(%d): connection table full "
- "(%d/%d)\n", s, i, dtblsize);
- return NULL;
- }
- }
-#endif
if( doinit ) {
c->c_send_ldap_result = slap_send_ldap_result;
/* should check status of thread calls */
ldap_pvt_thread_mutex_init( &c->c_mutex );
- ldap_pvt_thread_mutex_init( &c->c_write_mutex );
- ldap_pvt_thread_cond_init( &c->c_write_cv );
+ ldap_pvt_thread_mutex_init( &c->c_write1_mutex );
+ ldap_pvt_thread_mutex_init( &c->c_write2_mutex );
+ ldap_pvt_thread_cond_init( &c->c_write1_cv );
+ ldap_pvt_thread_cond_init( &c->c_write2_cv );
#ifdef LDAP_SLAPI
if ( slapi_plugins_used ) {
assert( c->c_sasl_bindop == NULL );
assert( c->c_currentber == NULL );
assert( c->c_writewaiter == 0);
+ assert( c->c_writers == 0);
c->c_listener = listener;
c->c_sd = s;
if ( flags & CONN_IS_CLIENT ) {
c->c_connid = 0;
+ ldap_pvt_thread_mutex_lock( &connections_mutex );
c->c_conn_state = SLAP_C_CLIENT;
c->c_struct_state = SLAP_C_USED;
+ ldap_pvt_thread_mutex_unlock( &connections_mutex );
c->c_close_reason = "?"; /* should never be needed */
- ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
+ ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &sfd );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return c;
LBER_SBIOD_LEVEL_PROVIDER, (void*)"udp_" );
#endif
ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_udp,
- LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
+ LBER_SBIOD_LEVEL_PROVIDER, (void *)&sfd );
ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_readahead,
LBER_SBIOD_LEVEL_PROVIDER, NULL );
} else
LBER_SBIOD_LEVEL_PROVIDER, (void*)"ipc_" );
#endif
ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd,
- LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
+ LBER_SBIOD_LEVEL_PROVIDER, (void *)&sfd );
#ifdef LDAP_PF_LOCAL_SENDMSG
if ( !BER_BVISEMPTY( peerbv ))
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_UNGET_BUF, peerbv );
LBER_SBIOD_LEVEL_PROVIDER, (void*)"tcp_" );
#endif
ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp,
- LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
+ LBER_SBIOD_LEVEL_PROVIDER, (void *)&sfd );
}
#ifdef LDAP_DEBUG
id = c->c_connid = conn_nextid++;
ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
+ ldap_pvt_thread_mutex_lock( &connections_mutex );
c->c_conn_state = SLAP_C_INACTIVE;
c->c_struct_state = SLAP_C_USED;
+ ldap_pvt_thread_mutex_unlock( &connections_mutex );
c->c_close_reason = "?"; /* should never be needed */
c->c_ssf = c->c_transport_ssf = ssf;
assert( LDAP_STAILQ_EMPTY(&c->c_txn_ops) );
#endif
assert( c->c_writewaiter == 0);
+ assert( c->c_writers == 0);
/* only for stats (print -1 as "%lu" may give unexpected results ;) */
connid = c->c_connid;
/* c_mutex must be locked by caller */
if( c->c_conn_state != SLAP_C_CLOSING ) {
- Debug( LDAP_DEBUG_TRACE,
+ Debug( LDAP_DEBUG_CONNS,
"connection_closing: readying conn=%lu sd=%d for close\n",
c->c_connid, c->c_sd, 0 );
/* update state to closing */
connection_abandon( c );
/* wake write blocked operations */
- if ( c->c_writewaiter ) {
- ldap_pvt_thread_cond_signal( &c->c_write_cv );
- /* ITS#4667 this may allow another thread to drop into
- * connection_resched / connection_close before we
- * finish, but that's OK.
- */
- slapd_clr_write( c->c_sd, 1 );
- ldap_pvt_thread_mutex_unlock( &c->c_mutex );
- ldap_pvt_thread_mutex_lock( &c->c_write_mutex );
- ldap_pvt_thread_mutex_lock( &c->c_mutex );
- ldap_pvt_thread_mutex_unlock( &c->c_write_mutex );
+ ldap_pvt_thread_mutex_lock( &c->c_write1_mutex );
+ if ( c->c_writers > 0 ) {
+ c->c_writers = -c->c_writers;
+ ldap_pvt_thread_cond_broadcast( &c->c_write1_cv );
+ ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex );
+ if ( c->c_writewaiter ) {
+ ldap_pvt_thread_mutex_lock( &c->c_write2_mutex );
+ ldap_pvt_thread_cond_signal( &c->c_write2_cv );
+ slapd_clr_write( c->c_sd, 1 );
+ ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex );
+ }
+ ldap_pvt_thread_mutex_lock( &c->c_write1_mutex );
+ while ( c->c_writers ) {
+ ldap_pvt_thread_cond_wait( &c->c_write1_cv, &c->c_write1_mutex );
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex );
} else {
+ ldap_pvt_thread_mutex_unlock( &c->c_write1_mutex );
slapd_clr_write( c->c_sd, 1 );
}
{
assert( connections != NULL );
assert( c != NULL );
-
- /* ITS#4667 we may have gotten here twice */
- if ( c->c_conn_state == SLAP_C_INVALID )
- return;
-
assert( c->c_struct_state == SLAP_C_USED );
assert( c->c_conn_state == SLAP_C_CLOSING );
if ( !LDAP_STAILQ_EMPTY(&c->c_ops) ||
!LDAP_STAILQ_EMPTY(&c->c_pending_ops) )
{
- Debug( LDAP_DEBUG_TRACE,
+ Debug( LDAP_DEBUG_CONNS,
"connection_close: deferring conn=%lu sd=%d\n",
c->c_connid, c->c_sd, 0 );
return;
for(; *index < dtblsize; (*index)++) {
int c_struct;
if( connections[*index].c_struct_state == SLAP_C_UNINITIALIZED ) {
+ /* FIXME: accessing c_conn_state without locking c_mutex */
assert( connections[*index].c_conn_state == SLAP_C_INVALID );
-#ifdef HAVE_WINSOCK
- break;
-#else
continue;
-#endif
}
if( connections[*index].c_struct_state == SLAP_C_USED ) {
- assert( connections[*index].c_conn_state != SLAP_C_INVALID );
c = &connections[(*index)++];
if ( ldap_pvt_thread_mutex_trylock( &c->c_mutex )) {
/* avoid deadlock */
continue;
}
}
+ assert( c->c_conn_state != SLAP_C_INVALID );
break;
}
if ( c_struct == SLAP_C_PENDING )
continue;
assert( c_struct == SLAP_C_UNUSED );
+ /* FIXME: accessing c_conn_state without locking c_mutex */
assert( connections[*index].c_conn_state == SLAP_C_INVALID );
}
slap_counters_t *sc;
void *vsc = NULL;
- if ( ldap_pvt_thread_pool_getkey( ctx, conn_counter_init, &vsc, NULL ) || !vsc ) {
+ if ( ldap_pvt_thread_pool_getkey(
+ ctx, (void *)conn_counter_init, &vsc, NULL ) || !vsc ) {
vsc = ch_malloc( sizeof( slap_counters_t ));
sc = vsc;
slap_counters_init( sc );
- ldap_pvt_thread_pool_setkey( ctx, conn_counter_init, vsc,
- conn_counter_destroy );
+ ldap_pvt_thread_pool_setkey( ctx, (void*)conn_counter_init, vsc,
+ conn_counter_destroy, NULL, NULL );
ldap_pvt_thread_mutex_lock( &slap_counters.sc_mutex );
sc->sc_next = slap_counters.sc_next;
static const Listener dummy_list = { BER_BVC(""), BER_BVC("") };
-int connection_client_setup(
+Connection *connection_client_setup(
ber_socket_t s,
ldap_pvt_thread_start_t *func,
void *arg )
{
Connection *c;
+ ber_socket_t sfd = SLAP_SOCKNEW( s );
- c = connection_init( s, (Listener *)&dummy_list, "", "",
+ c = connection_init( sfd, (Listener *)&dummy_list, "", "",
CONN_IS_CLIENT, 0, NULL
LDAP_PF_LOCAL_SENDMSG_ARG(NULL));
- if ( !c ) return -1;
-
- c->c_clientfunc = func;
- c->c_clientarg = arg;
+ if ( c ) {
+ c->c_clientfunc = func;
+ c->c_clientarg = arg;
- slapd_add_internal( s, 0 );
- slapd_set_read( s, 1 );
- return 0;
+ slapd_add_internal( sfd, 0 );
+ }
+ return c;
}
void connection_client_enable(
- ber_socket_t s )
+ Connection *c )
{
- slapd_set_read( s, 1 );
+ slapd_set_read( c->c_sd, 1 );
}
void connection_client_stop(
- ber_socket_t s )
+ Connection *c )
{
- Connection *c;
Sockbuf *sb;
+ ber_socket_t s = c->c_sd;
/* get (locked) connection */
c = connection_get( s );
-
+
assert( c->c_conn_state == SLAP_C_CLIENT );
c->c_listener = NULL;
static void* connection_read_thread( void* ctx, void* argv )
{
int rc ;
- conn_readinfo cri = { NULL, NULL, NULL, 0 };
+ conn_readinfo cri = { NULL, NULL, NULL, NULL, 0 };
ber_socket_t s = (long)argv;
/*
return rc;
}
+void
+connection_hangup( ber_socket_t s )
+{
+ Connection *c;
+
+ c = connection_get( s );
+ if ( c ) {
+ if ( c->c_conn_state == SLAP_C_CLIENT ) {
+ connection_return( c );
+ connection_read_activate( s );
+ } else {
+ connection_closing( c, "connection lost" );
+ connection_close( c );
+ connection_return( c );
+ }
+ }
+}
+
static int
connection_read( ber_socket_t s, conn_readinfo *cri )
{
c->c_n_read++;
if( c->c_conn_state == SLAP_C_CLOSING ) {
- Debug( LDAP_DEBUG_TRACE,
+ Debug( LDAP_DEBUG_CONNS,
"connection_read(%d): closing, ignoring input for id=%lu\n",
s, c->c_connid, 0 );
connection_return( c );
{
Operation *op;
+ if( conn->c_writewaiter )
+ return 0;
+
if( conn->c_conn_state == SLAP_C_CLOSING ) {
- Debug( LDAP_DEBUG_TRACE, "connection_resched: "
+ Debug( LDAP_DEBUG_CONNS, "connection_resched: "
"attempting closing conn=%lu sd=%d\n",
conn->c_connid, conn->c_sd, 0 );
connection_close( conn );
return 0;
}
- if( conn->c_conn_state != SLAP_C_ACTIVE || conn->c_writewaiter ) {
+ if( conn->c_conn_state != SLAP_C_ACTIVE ) {
/* other states need different handling */
return 0;
}
Debug( LDAP_DEBUG_TRACE,
"connection_write(%d): waking output for id=%lu\n",
s, c->c_connid, 0 );
- ldap_pvt_thread_cond_signal( &c->c_write_cv );
+ ldap_pvt_thread_mutex_lock( &c->c_write2_mutex );
+ ldap_pvt_thread_cond_signal( &c->c_write2_cv );
+ ldap_pvt_thread_mutex_unlock( &c->c_write2_mutex );
if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) {
slapd_set_read( s, 1 );
connection_fake_init2( conn, opbuf, ctx, 1 );
}
+void
+operation_fake_init(
+ Connection *conn,
+ Operation *op,
+ void *ctx,
+ int newmem )
+{
+ /* set memory context */
+ op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx,
+ newmem );
+ op->o_tmpmfuncs = &slap_sl_mfuncs;
+ op->o_threadctx = ctx;
+ op->o_tid = ldap_pvt_thread_pool_tid( ctx );
+
+ op->o_counters = &slap_counters;
+ op->o_conn = conn;
+ op->o_connid = op->o_conn->c_connid;
+ connection_init_log_prefix( op );
+}
+
+
void
connection_fake_init2(
Connection *conn,
op->o_hdr = &opbuf->ob_hdr;
op->o_controls = opbuf->ob_controls;
- /* set memory context */
- op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx,
- newmem );
- op->o_tmpmfuncs = &slap_sl_mfuncs;
- op->o_threadctx = ctx;
- op->o_tid = ldap_pvt_thread_pool_tid( ctx );
-
- op->o_counters = &slap_counters;
- op->o_conn = conn;
- op->o_connid = op->o_conn->c_connid;
- connection_init_log_prefix( op );
+ operation_fake_init( conn, op, ctx, newmem );
#ifdef LDAP_SLAPI
if ( slapi_plugins_used ) {
void *ebx = NULL;
/* Use thread keys to make sure these eventually get cleaned up */
- if ( ldap_pvt_thread_pool_getkey( ctx, connection_fake_init, &ebx,
- NULL )) {
+ if ( ldap_pvt_thread_pool_getkey( ctx, (void *)connection_fake_init,
+ &ebx, NULL )) {
eb = ch_malloc( sizeof( *eb ));
slapi_int_create_object_extensions( SLAPI_X_EXT_CONNECTION, conn );
slapi_int_create_object_extensions( SLAPI_X_EXT_OPERATION, op );
eb->eb_conn = conn->c_extensions;
eb->eb_op = op->o_hdr->oh_extensions;
- ldap_pvt_thread_pool_setkey( ctx, connection_fake_init, eb,
- connection_fake_destroy );
+ ldap_pvt_thread_pool_setkey( ctx, (void *)connection_fake_init,
+ eb, connection_fake_destroy, NULL, NULL );
} else {
eb = ebx;
conn->c_extensions = eb->eb_conn;