From: Howard Chu Date: Thu, 25 Oct 2007 02:22:40 +0000 (+0000) Subject: Concurrency tweaks: X-Git-Tag: OPENLDAP_REL_ENG_2_4_9~20^2~476 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=68ebee4726e0080a4dc3c07add3012b65a105a31;p=openldap Concurrency tweaks: store conn->c_sd, don't use ber_sockbuf_ctrl to retrieve it. use per-thread free lists for operations, no mutexes needed. --- diff --git a/servers/slapd/abandon.c b/servers/slapd/abandon.c index c77dc1e25e..3f8b608c45 100644 --- a/servers/slapd/abandon.c +++ b/servers/slapd/abandon.c @@ -98,7 +98,7 @@ do_abandon( Operation *op, SlapReply *rs ) o, Operation, o_next ); LDAP_STAILQ_NEXT(o, o_next) = NULL; op->o_conn->c_n_ops_pending--; - slap_op_free( o ); + slap_op_free( o, NULL ); break; } } diff --git a/servers/slapd/cancel.c b/servers/slapd/cancel.c index dd4c79730c..d11ad6a001 100644 --- a/servers/slapd/cancel.c +++ b/servers/slapd/cancel.c @@ -70,7 +70,7 @@ int cancel_extop( Operation *op, SlapReply *rs ) LDAP_STAILQ_REMOVE( &op->o_conn->c_pending_ops, o, Operation, o_next ); LDAP_STAILQ_NEXT(o, o_next) = NULL; op->o_conn->c_n_ops_pending--; - slap_op_free( o ); + slap_op_free( o, NULL ); ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); return LDAP_SUCCESS; } diff --git a/servers/slapd/connection.c b/servers/slapd/connection.c index 9ec67f503a..d179496497 100644 --- a/servers/slapd/connection.c +++ b/servers/slapd/connection.c @@ -89,6 +89,7 @@ typedef struct conn_readinfo { Operation *op; ldap_pvt_thread_start_t *func; void *arg; + void *ctx; int nullop; } conn_readinfo; @@ -267,8 +268,6 @@ static Connection* connection_get( ber_socket_t s ) #else c = NULL; { - ber_socket_t i, sd; - ldap_pvt_thread_mutex_lock( &connections_mutex ); for(i=0; ic_mutex ); assert( c->c_struct_state != SLAP_C_UNINITIALIZED ); - ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd ); #ifdef HAVE_WINSOCK /* Avoid race condition after releasing * connections_mutex */ - if ( sd != s ) { + if ( c->c_sd != s ) { ldap_pvt_thread_mutex_unlock( &c->c_mutex ); return NULL; } @@ -323,7 +316,7 @@ static Connection* connection_get( ber_socket_t s ) /* connection must have been closed due to resched */ assert( c->c_conn_state == SLAP_C_INVALID ); - assert( sd == AC_SOCKET_INVALID ); + assert( c->c_sd == AC_SOCKET_INVALID ); Debug( LDAP_DEBUG_TRACE, "connection_get(%d): connection not used\n", @@ -341,7 +334,7 @@ static Connection* connection_get( ber_socket_t s ) assert( c->c_struct_state == SLAP_C_USED ); assert( c->c_conn_state != SLAP_C_INVALID ); - assert( sd != AC_SOCKET_INVALID ); + assert( c->c_sd != AC_SOCKET_INVALID ); #ifndef SLAPD_MONITOR if ( global_idletimeout > 0 ) @@ -405,8 +398,6 @@ Connection * connection_init( ldap_pvt_thread_mutex_lock( &connections_mutex ); for( i=0; i < dtblsize; i++) { - ber_socket_t sd; - if ( connections[i].c_struct_state == SLAP_C_PENDING ) continue; @@ -418,14 +409,7 @@ Connection * connection_init( break; } - sd = AC_SOCKET_INVALID; - if (connections[i].c_sb != NULL) { - ber_sockbuf_ctrl( connections[i].c_sb, - LBER_SB_OPT_GET_FD, &sd ); - } - if( connections[i].c_struct_state == SLAP_C_UNUSED ) { - assert( sd == AC_SOCKET_INVALID ); c = &connections[i]; c->c_struct_state = SLAP_C_PENDING; break; @@ -435,7 +419,6 @@ Connection * connection_init( assert( connections[i].c_struct_state == SLAP_C_USED ); assert( connections[i].c_conn_state != SLAP_C_INVALID ); - assert( sd != AC_SOCKET_INVALID ); } ldap_pvt_thread_mutex_unlock( &connections_mutex ); @@ -525,6 +508,7 @@ Connection * connection_init( assert( c->c_writewaiter == 0); c->c_listener = listener; + c->c_sd = s; if ( flags & CONN_IS_CLIENT ) { c->c_connid = 0; @@ -677,7 +661,6 @@ void connection2anonymous( Connection *c ) static void connection_destroy( Connection *c ) { - ber_socket_t sd; unsigned long connid; const char *close_reason; Sockbuf *sb; @@ -754,21 +737,20 @@ connection_destroy( Connection *c ) ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max ); } - ber_sockbuf_ctrl( sb, LBER_SB_OPT_GET_FD, &sd ); - /* c must be fully reset by this point; when we call slapd_remove * it may get immediately reused by a new connection. */ - if ( sd != AC_SOCKET_INVALID ) { - slapd_remove( sd, sb, 1, 0, 0 ); + if ( c->c_sd != AC_SOCKET_INVALID ) { + slapd_remove( c->c_sd, sb, 1, 0, 0 ); if ( close_reason == NULL ) { Statslog( LDAP_DEBUG_STATS, "conn=%lu fd=%ld closed\n", - connid, (long) sd, 0, 0, 0 ); + connid, (long) c->c_sd, 0, 0, 0 ); } else { Statslog( LDAP_DEBUG_STATS, "conn=%lu fd=%ld closed (%s)\n", - connid, (long) sd, close_reason, 0, 0 ); + connid, (long) c->c_sd, close_reason, 0, 0 ); } + c->c_sd = AC_SOCKET_INVALID; } } @@ -813,7 +795,7 @@ static void connection_abandon( Connection *c ) while ( (o = LDAP_STAILQ_FIRST( &c->c_txn_ops )) != NULL) { LDAP_STAILQ_REMOVE_HEAD( &c->c_txn_ops, o_next ); LDAP_STAILQ_NEXT(o, o_next) = NULL; - slap_op_free( o ); + slap_op_free( o, NULL ); } /* clear transaction */ @@ -825,7 +807,7 @@ static void connection_abandon( Connection *c ) while ( (o = LDAP_STAILQ_FIRST( &c->c_pending_ops )) != NULL) { LDAP_STAILQ_REMOVE_HEAD( &c->c_pending_ops, o_next ); LDAP_STAILQ_NEXT(o, o_next) = NULL; - slap_op_free( o ); + slap_op_free( o, NULL ); } } @@ -839,18 +821,15 @@ void connection_closing( Connection *c, const char *why ) /* c_mutex must be locked by caller */ if( c->c_conn_state != SLAP_C_CLOSING ) { - ber_socket_t sd; - - ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd ); Debug( LDAP_DEBUG_TRACE, "connection_closing: readying conn=%lu sd=%d for close\n", - c->c_connid, sd, 0 ); + c->c_connid, c->c_sd, 0 ); /* update state to closing */ c->c_conn_state = SLAP_C_CLOSING; c->c_close_reason = why; /* don't listen on this port anymore */ - slapd_clr_read( sd, 0 ); + slapd_clr_read( c->c_sd, 0 ); /* abandon active operations */ connection_abandon( c ); @@ -862,13 +841,13 @@ void connection_closing( Connection *c, const char *why ) * connection_resched / connection_close before we * finish, but that's OK. */ - slapd_clr_write( sd, 1 ); + slapd_clr_write( c->c_sd, 1 ); ldap_pvt_thread_mutex_unlock( &c->c_mutex ); ldap_pvt_thread_mutex_lock( &c->c_write_mutex ); ldap_pvt_thread_mutex_lock( &c->c_mutex ); ldap_pvt_thread_mutex_unlock( &c->c_write_mutex ); } else { - slapd_clr_write( sd, 1 ); + slapd_clr_write( c->c_sd, 1 ); } } else if( why == NULL && c->c_close_reason == conn_lost_str ) { @@ -880,8 +859,6 @@ void connection_closing( Connection *c, const char *why ) static void connection_close( Connection *c ) { - ber_socket_t sd = AC_SOCKET_INVALID; - assert( connections != NULL ); assert( c != NULL ); @@ -894,22 +871,17 @@ connection_close( Connection *c ) /* NOTE: c_mutex should be locked by caller */ - /* NOTE: don't get the file descriptor if not needed */ - if ( LogTest( LDAP_DEBUG_TRACE ) ) { - ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd ); - } - if ( !LDAP_STAILQ_EMPTY(&c->c_ops) || !LDAP_STAILQ_EMPTY(&c->c_pending_ops) ) { Debug( LDAP_DEBUG_TRACE, "connection_close: deferring conn=%lu sd=%d\n", - c->c_connid, sd, 0 ); + c->c_connid, c->c_sd, 0 ); return; } Debug( LDAP_DEBUG_TRACE, "connection_close: conn=%lu sd=%d\n", - c->c_connid, sd, 0 ); + c->c_connid, c->c_sd, 0 ); connection_destroy( c ); } @@ -1175,7 +1147,6 @@ operations_error: LDAP_STAILQ_REMOVE( &conn->c_ops, op, Operation, o_next); LDAP_STAILQ_NEXT(op, o_next) = NULL; - slap_op_free( op ); conn->c_n_ops_executing--; conn->c_n_ops_completed++; @@ -1190,6 +1161,7 @@ operations_error: connection_resched( conn ); ldap_pvt_thread_mutex_unlock( &conn->c_mutex ); + slap_op_free( op, ctx ); return NULL; } @@ -1261,6 +1233,7 @@ static void* connection_read_thread( void* ctx, void* argv ) * read incoming LDAP requests. If there is more than one, * the first one is returned with new_op */ + cri.ctx = ctx; if( ( rc = connection_read( s, &cri ) ) < 0 ) { Debug( LDAP_DEBUG_CONNS, "connection_read(%d) error\n", s, 0, 0 ); return (void*)(long)rc; @@ -1509,6 +1482,7 @@ connection_input( Connection *conn ) char *cdn = NULL; #endif char *defer = NULL; + void *ctx; if ( conn->c_currentber == NULL && ( conn->c_currentber = ber_alloc()) == NULL ) @@ -1538,15 +1512,12 @@ connection_input( Connection *conn ) tag = ber_get_next( conn->c_sb, &len, conn->c_currentber ); if ( tag != LDAP_TAG_MESSAGE ) { int err = sock_errno(); - ber_socket_t sd; - - ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd ); if ( err != EWOULDBLOCK && err != EAGAIN ) { /* log, close and send error */ Debug( LDAP_DEBUG_TRACE, "ber_get_next on fd %d failed errno=%d (%s)\n", - sd, err, sock_errstr(err) ); + conn->c_sd, err, sock_errstr(err) ); ber_free( conn->c_currentber, 1 ); conn->c_currentber = NULL; @@ -1592,7 +1563,12 @@ connection_input( Connection *conn ) connection_abandon( conn ); } - op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++ ); +#ifdef SLAP_LIGHTWEIGHT_DISPATCHER + ctx = cri->ctx; +#else + ctx = NULL; +#endif + op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++, ctx ); op->o_conn = conn; /* clear state if the connection is being reused from inactive */ @@ -1723,12 +1699,9 @@ connection_resched( Connection *conn ) Operation *op; if( conn->c_conn_state == SLAP_C_CLOSING ) { - ber_socket_t sd; - ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd ); - Debug( LDAP_DEBUG_TRACE, "connection_resched: " "attempting closing conn=%lu sd=%d\n", - conn->c_connid, sd, 0 ); + conn->c_connid, conn->c_sd, 0 ); connection_close( conn ); return 0; } @@ -2010,6 +1983,7 @@ connection_fake_init2( Operation *op = (Operation *) opbuf; conn->c_connid = -1; + conn->c_conn_idx = -1; conn->c_send_ldap_result = slap_send_ldap_result; conn->c_send_search_entry = slap_send_search_entry; conn->c_send_search_reference = slap_send_search_reference; diff --git a/servers/slapd/operation.c b/servers/slapd/operation.c index b5b2f3d1e6..124a3b85b9 100644 --- a/servers/slapd/operation.c +++ b/servers/slapd/operation.c @@ -38,26 +38,27 @@ #endif static ldap_pvt_thread_mutex_t slap_op_mutex; -static LDAP_STAILQ_HEAD(s_o, Operation) slap_free_ops; static time_t last_time; static int last_incr; void slap_op_init(void) { ldap_pvt_thread_mutex_init( &slap_op_mutex ); - LDAP_STAILQ_INIT(&slap_free_ops); } void slap_op_destroy(void) { - Operation *o; + ldap_pvt_thread_mutex_destroy( &slap_op_mutex ); +} - while ( (o = LDAP_STAILQ_FIRST( &slap_free_ops )) != NULL) { - LDAP_STAILQ_REMOVE_HEAD( &slap_free_ops, o_next ); - LDAP_STAILQ_NEXT(o, o_next) = NULL; - ch_free( o ); +static void +slap_op_q_destroy( void *key, void *data ) +{ + Operation *op, *op2; + for ( op = data; op; op = op2 ) { + op2 = LDAP_STAILQ_NEXT( op, o_next ); + ber_memfree_x( op, NULL ); } - ldap_pvt_thread_mutex_destroy( &slap_op_mutex ); } void @@ -72,7 +73,7 @@ slap_op_groups_free( Operation *op ) } void -slap_op_free( Operation *op ) +slap_op_free( Operation *op, void *ctx ) { OperationBuffer *opbuf; @@ -110,15 +111,22 @@ slap_op_free( Operation *op ) } #endif /* defined( LDAP_SLAPI ) */ - opbuf = (OperationBuffer *) op; memset( opbuf, 0, sizeof(*opbuf) ); op->o_hdr = &opbuf->ob_hdr; op->o_controls = opbuf->ob_controls; - ldap_pvt_thread_mutex_lock( &slap_op_mutex ); - LDAP_STAILQ_INSERT_HEAD( &slap_free_ops, op, o_next ); - ldap_pvt_thread_mutex_unlock( &slap_op_mutex ); + if ( ctx ) { + Operation *op2; + void *otmp = NULL; + ldap_pvt_thread_pool_getkey( ctx, (void *)slap_op_free, &otmp, NULL ); + op2 = otmp; + LDAP_STAILQ_NEXT( op, o_next ) = op2; + ldap_pvt_thread_pool_setkey( ctx, (void *)slap_op_free, (void *)op, + slap_op_q_destroy ); + } else { + ber_memfree_x( op, NULL ); + } } void @@ -141,16 +149,21 @@ slap_op_alloc( BerElement *ber, ber_int_t msgid, ber_tag_t tag, - ber_int_t id ) + ber_int_t id, + void *ctx ) { - Operation *op; - - ldap_pvt_thread_mutex_lock( &slap_op_mutex ); - if ((op = LDAP_STAILQ_FIRST( &slap_free_ops ))) { - LDAP_STAILQ_REMOVE_HEAD( &slap_free_ops, o_next ); + Operation *op = NULL; + + if ( ctx ) { + void *otmp = NULL; + ldap_pvt_thread_pool_getkey( ctx, (void *)slap_op_free, &otmp, NULL ); + if ( otmp ) { + op = otmp; + otmp = LDAP_STAILQ_NEXT( op, o_next ); + ldap_pvt_thread_pool_setkey( ctx, (void *)slap_op_free, otmp, + slap_op_q_destroy ); + } } - ldap_pvt_thread_mutex_unlock( &slap_op_mutex ); - if (!op) { op = (Operation *) ch_calloc( 1, sizeof(OperationBuffer) ); op->o_hdr = &((OperationBuffer *) op)->ob_hdr; diff --git a/servers/slapd/proto-slap.h b/servers/slapd/proto-slap.h index fbbc9ae675..2a56f59c12 100644 --- a/servers/slapd/proto-slap.h +++ b/servers/slapd/proto-slap.h @@ -1372,11 +1372,11 @@ LDAP_SLAPD_F (int) parse_oidm LDAP_P(( LDAP_SLAPD_F (void) slap_op_init LDAP_P(( void )); LDAP_SLAPD_F (void) slap_op_destroy LDAP_P(( void )); LDAP_SLAPD_F (void) slap_op_groups_free LDAP_P(( Operation *op )); -LDAP_SLAPD_F (void) slap_op_free LDAP_P(( Operation *op )); +LDAP_SLAPD_F (void) slap_op_free LDAP_P(( Operation *op, void *ctx )); LDAP_SLAPD_F (void) slap_op_time LDAP_P(( time_t *t, int *n )); LDAP_SLAPD_F (Operation *) slap_op_alloc LDAP_P(( BerElement *ber, ber_int_t msgid, - ber_tag_t tag, ber_int_t id )); + ber_tag_t tag, ber_int_t id, void *ctx )); LDAP_SLAPD_F (int) slap_op_add LDAP_P(( Operation **olist, Operation *op )); LDAP_SLAPD_F (int) slap_op_remove LDAP_P(( Operation **olist, Operation *op )); diff --git a/servers/slapd/result.c b/servers/slapd/result.c index c3e3861590..2befb75233 100644 --- a/servers/slapd/result.c +++ b/servers/slapd/result.c @@ -149,7 +149,6 @@ static long send_ldap_ber( /* write the pdu */ while( 1 ) { int err; - ber_socket_t sd; if ( connection_state_closing( conn ) ) { ldap_pvt_thread_mutex_unlock( &conn->c_mutex ); @@ -184,8 +183,7 @@ static long send_ldap_ber( /* wait for socket to be write-ready */ conn->c_writewaiter = 1; - ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd ); - slapd_set_write( sd, 1 ); + slapd_set_write( conn->c_sd, 1 ); ldap_pvt_thread_cond_wait( &conn->c_write_cv, &conn->c_mutex ); conn->c_writewaiter = 0; diff --git a/servers/slapd/slap.h b/servers/slapd/slap.h index eae710d0ce..13b1224673 100644 --- a/servers/slapd/slap.h +++ b/servers/slapd/slap.h @@ -2672,6 +2672,7 @@ struct Connection { ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */ Sockbuf *c_sb; /* ber connection stuff */ + ber_socket_t c_sd; /* only can be changed by connect_init */ time_t c_starttime; /* when the connection was opened */