static int connection_input( Connection *c );
static void connection_close( Connection *c );
-static int connection_op_activate( Connection *conn, Operation *op );
+static int connection_op_activate( Operation *op );
static int connection_resched( Connection *conn );
static void connection_abandon( Connection *conn );
static void connection_destroy( Connection *c );
static ldap_pvt_thread_start_t connection_operation;
-struct co_arg {
- Connection *co_conn;
- Operation *co_op;
-};
-
/*
* Initialize connection management infrastructure.
*/
/* should check return of every call */
ldap_pvt_thread_mutex_init( &connections_mutex );
- connections = (Connection *) calloc( dtblsize, sizeof(Connection) );
+ connections = (Connection *) ch_calloc( dtblsize, sizeof(Connection) );
if( connections == NULL ) {
#ifdef NEW_LOGGING
if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
c->c_send_ldap_result = slap_send_ldap_result;
c->c_send_search_entry = slap_send_search_entry;
- c->c_send_search_result = slap_send_search_result;
c->c_send_search_reference = slap_send_search_reference;
c->c_send_ldap_extended = slap_send_ldap_extended;
+#ifdef LDAP_RES_INTERMEDIATE_RESP
+ c->c_send_ldap_intermediate_resp = slap_send_ldap_intermediate_resp;
+#endif
c->c_authmech.bv_val = NULL;
c->c_authmech.bv_len = 0;
static void *
connection_operation( void *ctx, void *arg_v )
{
- int rc;
- struct co_arg *arg = arg_v;
- ber_tag_t tag = arg->co_op->o_tag;
+ int rc = SLAPD_DISCONNECT;
+ Operation *op = arg_v;
+ SlapReply rs = {REP_RESULT};
+ ber_tag_t tag = op->o_tag;
#ifdef SLAPD_MONITOR
ber_tag_t oldtag = tag;
#endif /* SLAPD_MONITOR */
- Connection *conn = arg->co_conn;
+ Connection *conn = op->o_conn;
+ void *memctx;
+ ber_len_t memsiz;
ldap_pvt_thread_mutex_lock( &num_ops_mutex );
num_ops_initiated++;
ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
- arg->co_op->o_threadctx = ctx;
+ op->o_threadctx = ctx;
if( conn->c_sasl_bind_in_progress && tag != LDAP_REQ_BIND ) {
#ifdef NEW_LOGGING
"error: SASL bind in progress (tag=%ld).\n",
(long) tag, 0, 0 );
#endif
- send_ldap_result( conn, arg->co_op,
- rc = LDAP_OPERATIONS_ERROR,
- NULL, "SASL bind in progress", NULL, NULL );
+ send_ldap_error( op, &rs, LDAP_OPERATIONS_ERROR,
+ "SASL bind in progress" );
goto operations_error;
}
+ /* We can use Thread-Local storage for most mallocs. We can
+ * also use TL for ber parsing, but not on Add or Modify.
+ */
+#define SLAB_SIZE 1048576
+#if 0
+ memsiz = ber_len( op->o_ber ) * 64;
+ if ( SLAB_SIZE > memsiz ) memsiz = SLAB_SIZE;
+#endif
+ memsiz = SLAB_SIZE;
+
+ memctx = sl_mem_create( memsiz, ctx );
+ op->o_tmpmemctx = memctx;
+ op->o_tmpmfuncs = &sl_mfuncs;
+ if ( tag != LDAP_REQ_ADD && tag != LDAP_REQ_MODIFY ) {
+ ber_set_option( op->o_ber, LBER_OPT_BER_MEMCTX, memctx );
+ }
+
switch ( tag ) {
case LDAP_REQ_BIND:
INCR_OP(num_ops_initiated_, SLAP_OP_BIND);
- rc = do_bind( conn, arg->co_op );
+ rc = do_bind( op, &rs );
break;
case LDAP_REQ_UNBIND:
INCR_OP(num_ops_initiated_, SLAP_OP_UNBIND);
- rc = do_unbind( conn, arg->co_op );
+ rc = do_unbind( op, &rs );
break;
case LDAP_REQ_ADD:
INCR_OP(num_ops_initiated_, SLAP_OP_ADD);
- rc = do_add( conn, arg->co_op );
+ rc = do_add( op, &rs );
break;
case LDAP_REQ_DELETE:
INCR_OP(num_ops_initiated_, SLAP_OP_DELETE);
- rc = do_delete( conn, arg->co_op );
+ rc = do_delete( op, &rs );
break;
case LDAP_REQ_MODRDN:
INCR_OP(num_ops_initiated_, SLAP_OP_MODRDN);
- rc = do_modrdn( conn, arg->co_op );
+ rc = do_modrdn( op, &rs );
break;
case LDAP_REQ_MODIFY:
INCR_OP(num_ops_initiated_, SLAP_OP_MODIFY);
- rc = do_modify( conn, arg->co_op );
+ rc = do_modify( op, &rs );
break;
case LDAP_REQ_COMPARE:
INCR_OP(num_ops_initiated_, SLAP_OP_COMPARE);
- rc = do_compare( conn, arg->co_op );
+ rc = do_compare( op, &rs );
break;
case LDAP_REQ_SEARCH:
INCR_OP(num_ops_initiated_, SLAP_OP_SEARCH);
- rc = do_search( conn, arg->co_op );
+ rc = do_search( op, &rs );
break;
case LDAP_REQ_ABANDON:
INCR_OP(num_ops_initiated_, SLAP_OP_ABANDON);
- rc = do_abandon( conn, arg->co_op );
+ rc = do_abandon( op, &rs );
break;
case LDAP_REQ_EXTENDED:
INCR_OP(num_ops_initiated_, SLAP_OP_EXTENDED);
- rc = do_extended( conn, arg->co_op );
+ rc = do_extended( op, &rs );
break;
default:
Debug( LDAP_DEBUG_ANY, "unknown LDAP request 0x%lx\n",
tag, 0, 0 );
#endif
- arg->co_op->o_tag = LBER_ERROR;
- send_ldap_disconnect( conn, arg->co_op,
- LDAP_PROTOCOL_ERROR, "unknown LDAP request" );
+ op->o_tag = LBER_ERROR;
+ rs.sr_err = LDAP_PROTOCOL_ERROR;
+ rs.sr_text = "unknown LDAP request";
+ send_ldap_disconnect( op, &rs );
rc = -1;
break;
}
#endif /* SLAPD_MONITOR */
ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
- if ( arg->co_op->o_cancel == LDAP_CANCEL_REQ )
- arg->co_op->o_cancel = LDAP_TOO_LATE;
+#ifdef LDAP_EXOP_X_CANCEL
+ if ( op->o_cancel == SLAP_CANCEL_REQ ) {
+ op->o_cancel = LDAP_TOO_LATE;
+ }
- while ( arg->co_op->o_cancel != LDAP_CANCEL_NONE &&
- arg->co_op->o_cancel != LDAP_CANCEL_ACK &&
- arg->co_op->o_cancel != LDAP_CANCEL_NOTDONE ) {
- ldap_pvt_thread_yield();
+ while ( op->o_cancel != SLAP_CANCEL_NONE &&
+ op->o_cancel != SLAP_CANCEL_DONE )
+ {
+ ldap_pvt_thread_yield();
}
+#endif
ldap_pvt_thread_mutex_lock( &conn->c_mutex );
conn->c_n_ops_executing--;
conn->c_n_ops_completed++;
- LDAP_STAILQ_REMOVE( &conn->c_ops, arg->co_op, slap_op, o_next);
- LDAP_STAILQ_NEXT(arg->co_op, o_next) = NULL;
+ LDAP_STAILQ_REMOVE( &conn->c_ops, op, slap_op, o_next);
+ LDAP_STAILQ_NEXT(op, o_next) = NULL;
+
+#if defined(LDAP_CLIENT_UPDATE) || defined(LDAP_SYNC)
+ if ( op->o_cancel == SLAP_CANCEL_ACK )
+ goto co_op_free;
+#endif
#ifdef LDAP_CLIENT_UPDATE
- if ( !( arg->co_op->o_clientupdate_type & SLAP_LCUP_PERSIST ) )
-#endif /* LDAP_CLIENT_UPDATE */
- {
- slap_op_free( arg->co_op );
- }
- arg->co_op = NULL;
- arg->co_conn = NULL;
- free( (char *) arg );
- arg = NULL;
+ if ( ( op->o_clientupdate_type & SLAP_LCUP_PERSIST ) )
+ goto no_co_op_free;
+#endif
+#ifdef LDAP_SYNC
+ if ( ( op->o_sync_mode & SLAP_SYNC_PERSIST ) )
+ goto no_co_op_free;
+#endif
+
+co_op_free:
+
+ ber_set_option( op->o_ber, LBER_OPT_BER_MEMCTX, NULL );
+ slap_op_free( op );
+
+no_co_op_free:
switch( tag ) {
case LBER_ERROR:
ber_len_t len;
ber_int_t msgid;
BerElement *ber;
+ int rc;
#ifdef LDAP_CONNECTIONLESS
Sockaddr peeraddr;
char *cdn = NULL;
op->o_conn = conn;
op->vrFilter = NULL;
+#ifdef LDAP_CONTROL_PAGEDRESULTS
op->o_pagedresults_state = conn->c_pagedresults_state;
+#endif
#ifdef LDAP_CONNECTIONLESS
- op->o_peeraddr = peeraddr;
- if (cdn ) {
- ber_str2bv( cdn, 0, 1, &op->o_dn );
- op->o_protocol = LDAP_VERSION2;
- }
if (conn->c_is_udp) {
- int rc;
+ if ( cdn ) {
+ ber_str2bv( cdn, 0, 1, &op->o_dn );
+ op->o_protocol = LDAP_VERSION2;
+ }
op->o_res_ber = ber_alloc_t( LBER_USE_DER );
if (op->o_res_ber == NULL)
return 1;
- rc = ber_write(op->o_res_ber, (char *)&op->o_peeraddr, sizeof(struct sockaddr), 0);
+ rc = ber_write(op->o_res_ber, (char *)&peeraddr, sizeof(struct sockaddr), 0);
if (rc != sizeof(struct sockaddr)) {
#ifdef NEW_LOGGING
LDAP_LOG( CONNECTION, INFO,
return 1;
}
- if (conn->c_protocol == LDAP_VERSION2) {
- rc = ber_printf(op->o_res_ber, "{i{" /*}}*/, op->o_msgid);
+ if (op->o_protocol == LDAP_VERSION2) {
+ rc = ber_printf(op->o_res_ber, "{is{" /*}}*/, op->o_msgid, "");
if (rc == -1) {
#ifdef NEW_LOGGING
LDAP_LOG( CONNECTION, INFO,
}
#endif /* LDAP_CONNECTIONLESS */
+ rc = 0;
+
+ /* Don't process requests when the conn is in the middle of a
+ * Bind, or if it's closing. Also, don't let any single conn
+ * use up all the available threads, and don't execute if we're
+ * currently blocked on output. And don't execute if there are
+ * already pending ops, let them go first.
+ */
if ( conn->c_conn_state == SLAP_C_BINDING
- || conn->c_conn_state == SLAP_C_CLOSING )
+ || conn->c_conn_state == SLAP_C_CLOSING
+ || conn->c_n_ops_executing >= connection_pool_max/2
+ || conn->c_n_ops_pending
+ || conn->c_writewaiter)
{
+ int max = conn->c_dn.bv_len ? slap_conn_max_pending_auth
+ : slap_conn_max_pending;
#ifdef NEW_LOGGING
LDAP_LOG( CONNECTION, INFO,
"connection_input: conn %lu deferring operation\n",
#endif
conn->c_n_ops_pending++;
LDAP_STAILQ_INSERT_TAIL( &conn->c_pending_ops, op, o_next );
-
+ if ( conn->c_n_ops_pending > max ) {
+ rc = -1;
+ } else {
+ rc = 1;
+ }
} else {
conn->c_n_ops_executing++;
- connection_op_activate( conn, op );
+ connection_op_activate( op );
}
#ifdef NO_THREADS
#endif
assert( conn->c_struct_state == SLAP_C_USED );
- return 0;
+ return rc;
}
static int
return 0;
}
- if( conn->c_conn_state != SLAP_C_ACTIVE ) {
+ if( conn->c_conn_state != SLAP_C_ACTIVE || conn->c_writewaiter ) {
/* other states need different handling */
return 0;
}
while ((op = LDAP_STAILQ_FIRST( &conn->c_pending_ops )) != NULL) {
+ if ( conn->c_n_ops_executing > connection_pool_max/2 ) {
+ break;
+ }
LDAP_STAILQ_REMOVE_HEAD( &conn->c_pending_ops, o_next );
LDAP_STAILQ_NEXT(op, o_next) = NULL;
/* pending operations should not be marked for abandonment */
conn->c_n_ops_pending--;
conn->c_n_ops_executing++;
- connection_op_activate( conn, op );
+ connection_op_activate( op );
if ( conn->c_conn_state == SLAP_C_BINDING ) {
break;
return 0;
}
-static int connection_op_activate( Connection *conn, Operation *op )
+static int connection_op_activate( Operation *op )
{
- struct co_arg *arg;
int status;
ber_tag_t tag = op->o_tag;
if(tag == LDAP_REQ_BIND) {
- conn->c_conn_state = SLAP_C_BINDING;
+ op->o_conn->c_conn_state = SLAP_C_BINDING;
}
- arg = (struct co_arg *) ch_malloc( sizeof(struct co_arg) );
- arg->co_conn = conn;
- arg->co_op = op;
-
- if (!arg->co_op->o_dn.bv_len) {
- arg->co_op->o_authz = conn->c_authz;
- arg->co_op->o_dn.bv_val = ch_strdup( conn->c_dn.bv_val ?
- conn->c_dn.bv_val : "" );
- arg->co_op->o_ndn.bv_val = ch_strdup( conn->c_ndn.bv_val ?
- conn->c_ndn.bv_val : "" );
+ if (!op->o_dn.bv_len) {
+ op->o_authz = op->o_conn->c_authz;
+ ber_dupbv( &op->o_dn, &op->o_conn->c_dn );
+ ber_dupbv( &op->o_ndn, &op->o_conn->c_ndn );
}
- arg->co_op->o_authtype = conn->c_authtype;
- ber_dupbv( &arg->co_op->o_authmech, &conn->c_authmech );
+ op->o_authtype = op->o_conn->c_authtype;
+ ber_dupbv( &op->o_authmech, &op->o_conn->c_authmech );
- if (!arg->co_op->o_protocol) {
- arg->co_op->o_protocol = conn->c_protocol
- ? conn->c_protocol : LDAP_VERSION3;
+ if (!op->o_protocol) {
+ op->o_protocol = op->o_conn->c_protocol
+ ? op->o_conn->c_protocol : LDAP_VERSION3;
+ }
+ if (op->o_conn->c_conn_state == SLAP_C_INACTIVE
+ && op->o_protocol > LDAP_VERSION2) {
+ op->o_conn->c_conn_state = SLAP_C_ACTIVE;
}
- arg->co_op->o_connid = conn->c_connid;
- LDAP_STAILQ_INSERT_TAIL( &conn->c_ops, arg->co_op, o_next );
+ op->o_connid = op->o_conn->c_connid;
+
+ LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op, o_next );
status = ldap_pvt_thread_pool_submit( &connection_pool,
- connection_operation, (void *) arg );
+ connection_operation, (void *) op );
if ( status != 0 ) {
#ifdef NEW_LOGGING
LDAP_LOG( CONNECTION, ERR,
"connection_op_activate: conn %lu thread pool submit failed.\n",
- conn->c_connid, 0, 0 );
+ op->o_connid, 0, 0 );
#else
Debug( LDAP_DEBUG_ANY,
"ldap_pvt_thread_pool_submit failed (%d)\n", status, 0, 0 );