]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/connection.c
ITS#4088 force cursors to use same locker
[openldap] / servers / slapd / connection.c
index d4f77c908e90208645a671cdea475461aa601665..89de750871e83e2aba75a4f7e428cbdee03ee68a 100644 (file)
 static ldap_pvt_thread_mutex_t* connections_mutex;
 static Connection **connections = NULL;
 
-/* set to the number of processors */
-#define NUM_CONNECTION_ARRAY 2
+/* set to the number of processors (round up to a power of 2) */
+#      define NUM_CONNECTION_ARRAY 4
 
 /* partition the array in a modulo manner */
-#define MCA_conn_array_id( fd ) ((int)fd%NUM_CONNECTION_ARRAY)
-#define MCA_conn_array_element_id( fd ) ((int)fd/NUM_CONNECTION_ARRAY)
+#      define MCA_conn_array_id(fd)            ((int)(fd)%NUM_CONNECTION_ARRAY)
+#      define MCA_conn_array_element_id(fd)    ((int)(fd)/NUM_CONNECTION_ARRAY)
+#      define MCA_GET_CONNECTION(fd) (&(connections[MCA_conn_array_id(fd)]) \
+               [MCA_conn_array_element_id(fd)])
+#      define MCA_GET_CONN_MUTEX(fd) (&connections_mutex[MCA_conn_array_id(fd)])
 
-#define MCA_GET_CONNECTION(fd) &(connections[MCA_conn_array_id(fd)])[MCA_conn_array_element_id( fd )]
-#define MCA_GET_CONN_MUTEX(fd) &connections_mutex[MCA_conn_array_id(fd)]
 #else
 /* protected by connections_mutex */
 static ldap_pvt_thread_mutex_t connections_mutex;
 static Connection *connections = NULL;
+
+#      define MCA_GET_CONNECTION(fd) (&connections[s])
+#      define MCA_GET_CONN_MUTEX(fd) (&connections_mutex)
 #endif
 
 static ldap_pvt_thread_mutex_t conn_nextid_mutex;
@@ -99,7 +103,7 @@ connection_state2str( int state )
 
 static Connection* connection_get( ber_socket_t s );
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
 static int connection_input( Connection *c, Operation** op );
 #else
 static int connection_input( Connection *c );
@@ -107,7 +111,7 @@ static int connection_input( Connection *c );
 static void connection_close( Connection *c );
 
 static int connection_op_activate( Operation *op );
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
 static void connection_op_queue( Operation *op );
 #endif
 static int connection_resched( Connection *conn );
@@ -133,26 +137,29 @@ int connections_init(void)
                return -1;
        }
 
-       connections_mutex = (ldap_pvt_thread_mutex_t*) ch_calloc( NUM_CONNECTION_ARRAY, sizeof(ldap_pvt_thread_mutex_t) );
+       connections_mutex = (ldap_pvt_thread_mutex_t*) ch_calloc(
+               NUM_CONNECTION_ARRAY, sizeof(ldap_pvt_thread_mutex_t) );
        if( connections_mutex == NULL ) {
-               Debug( LDAP_DEBUG_ANY,
-                               "connections_init: allocation of connection mutex[%d] failed\n", i, 0, 0 );
+               Debug( LDAP_DEBUG_ANY, "connections_init: "
+                       "allocation of connection mutex[%d] failed\n", i, 0, 0 );
                return -1;
        }
 
-       connections = (Connection**) ch_calloc( NUM_CONNECTION_ARRAY, sizeof(Connection*));
+       connections = (Connection**) ch_calloc(
+               NUM_CONNECTION_ARRAY, sizeof(Connection*));
        if( connections == NULL ) {
-               Debug( LDAP_DEBUG_ANY,
-                       "connections_init: allocation of connection[%d] failed\n", 0, 0, 0 );
+               Debug( LDAP_DEBUG_ANY, "connections_init: "
+                       "allocation of connection[%d] failed\n", 0, 0, 0 );
                return -1;
        }
 
        for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
                ldap_pvt_thread_mutex_init( connections_mutex+i );
-               connections[i] = (Connection*) ch_calloc( (dtblsize/NUM_CONNECTION_ARRAY), sizeof(Connection) );
+               connections[i] = (Connection*) ch_calloc(
+                       dtblsize/NUM_CONNECTION_ARRAY, sizeof(Connection) );
                if( connections[i] == NULL ) {
-                       Debug( LDAP_DEBUG_ANY,
-                               "connections_init: allocation (%d*%ld) of connection array[%d] failed\n",
+                       Debug( LDAP_DEBUG_ANY, "connections_init: "
+                               "allocation (%d*%ld) of connection array[%d] failed\n",
                                dtblsize, (long) sizeof(Connection), i );
                        return -1;
                }
@@ -162,7 +169,8 @@ int connections_init(void)
        ldap_pvt_thread_mutex_init( &conn_nextid_mutex );
 
        assert( connections[0]->c_struct_state == SLAP_C_UNINITIALIZED );
-       assert( connections[NUM_CONNECTION_ARRAY-1]->c_struct_state == SLAP_C_UNINITIALIZED );
+       assert( connections[NUM_CONNECTION_ARRAY-1]->c_struct_state ==
+               SLAP_C_UNINITIALIZED );
 
        for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
                conn = connections[i];
@@ -197,8 +205,8 @@ int connections_init(void)
        connections = (Connection *) ch_calloc( dtblsize, sizeof(Connection) );
 
        if( connections == NULL ) {
-               Debug( LDAP_DEBUG_ANY,
-                       "connections_init: allocation (%d*%ld) of connection array failed\n",
+               Debug( LDAP_DEBUG_ANY, "connections_init: "
+                       "allocation (%d*%ld) of connection array failed\n",
                        dtblsize, (long) sizeof(Connection), 0 );
                return -1;
        }
@@ -382,8 +390,9 @@ int connections_timeout_idle(time_t now)
        {
                /* Don't timeout a slow-running request or a persistent
                 * outbound connection */
-               if( c->c_n_ops_executing ||
-                       c->c_conn_state == SLAP_C_CLIENT ) continue;
+               if( c->c_n_ops_executing || c->c_conn_state == SLAP_C_CLIENT ) {
+                       continue;
+               }
 
                if( difftime( c->c_activitytime+global_idletimeout, now) < 0 ) {
                        /* close it */
@@ -409,16 +418,10 @@ static Connection* connection_get( ber_socket_t s )
 
        assert( connections != NULL );
 
-       if(s == AC_SOCKET_INVALID) {
-               return NULL;
-       }
+       if(s == AC_SOCKET_INVALID) return NULL;
 
 #ifndef HAVE_WINSOCK
-#ifdef SLAP_MULTI_CONN_ARRAY
        c = MCA_GET_CONNECTION(s);
-#else
-       c = &connections[s];
-#endif
 
        assert( c->c_struct_state != SLAP_C_UNINITIALIZED );
 
@@ -534,19 +537,10 @@ long connection_init(
        assert( s < dtblsize );
 #endif
 
-#ifdef SLAP_MULTI_CONN_ARRAY
        ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX(s) );
-#else
-       ldap_pvt_thread_mutex_lock( &connections_mutex );
-#endif
 
 #ifndef HAVE_WINSOCK
-#ifdef SLAP_MULTI_CONN_ARRAY
        c = MCA_GET_CONNECTION(s);
-#else
-       c = &connections[s];
-#endif
-
 #else
        {
                ber_socket_t i;
@@ -573,9 +567,7 @@ long connection_init(
                                break;
                        }
 
-                       if( connections[i].c_conn_state == SLAP_C_CLIENT ) {
-                               continue;
-                       }
+                       if( connections[i].c_conn_state == SLAP_C_CLIENT ) continue;
 
                        assert( connections[i].c_struct_state == SLAP_C_USED );
                        assert( connections[i].c_conn_state != SLAP_C_INVALID );
@@ -672,11 +664,7 @@ long connection_init(
                c->c_close_reason = "?";                        /* should never be needed */
                ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
                ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-#ifdef SLAP_MULTI_CONN_ARRAY
                ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
 
                return 0;
        }
@@ -764,11 +752,7 @@ long connection_init(
        slap_sasl_external( c, ssf, authid );
 
        ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-#ifdef SLAP_MULTI_CONN_ARRAY
        ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-       ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
 
        backend_connection_init(c);
 
@@ -1045,9 +1029,7 @@ Connection* connection_next( Connection *c, ber_socket_t *index )
        assert( index != NULL );
        assert( *index <= (dtblsize/NUM_CONNECTION_ARRAY) );
 
-       if( c != NULL ) {
-               ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-       }
+       if( c != NULL ) ldap_pvt_thread_mutex_unlock( &c->c_mutex );
 
        c = NULL;
 
@@ -1073,9 +1055,7 @@ Connection* connection_next( Connection *c, ber_socket_t *index )
                assert( conn->c_conn_state == SLAP_C_INVALID );
        }
 
-       if( c != NULL ) {
-               ldap_pvt_thread_mutex_lock( &c->c_mutex );
-       }
+       if( c != NULL ) ldap_pvt_thread_mutex_lock( &c->c_mutex );
 
        return c;
 
@@ -1086,9 +1066,7 @@ Connection* connection_next( Connection *c, ber_socket_t *index )
        assert( index != NULL );
        assert( *index <= dtblsize );
 
-       if( c != NULL ) {
-               ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-       }
+       if( c != NULL ) ldap_pvt_thread_mutex_unlock( &c->c_mutex );
 
        c = NULL;
 
@@ -1112,10 +1090,7 @@ Connection* connection_next( Connection *c, ber_socket_t *index )
                assert( connections[*index].c_conn_state == SLAP_C_INVALID );
        }
 
-       if( c != NULL ) {
-               ldap_pvt_thread_mutex_lock( &c->c_mutex );
-       }
-
+       if( c != NULL ) ldap_pvt_thread_mutex_lock( &c->c_mutex );
        return c;
 }
 #endif
@@ -1128,9 +1103,7 @@ void connection_done( Connection *c )
 
        assert( connections != NULL );
 
-       if( c != NULL ) {
-               ldap_pvt_thread_mutex_unlock( &c->c_mutex );
-       }
+       if( c != NULL ) ldap_pvt_thread_mutex_unlock( &c->c_mutex );
 
 #ifdef SLAP_MULTI_CONN_ARRAY
        for ( conn_array_id = 0;
@@ -1328,10 +1301,11 @@ operations_error:
        }
 
        if ( op->o_cancel == SLAP_CANCEL_REQ ) {
-               if ( rc == SLAPD_ABANDON )
+               if ( rc == SLAPD_ABANDON ) {
                        op->o_cancel = SLAP_CANCEL_ACK;
-               else
+               } else {
                        op->o_cancel = LDAP_TOO_LATE;
+               }
        }
        while ( op->o_cancel != SLAP_CANCEL_NONE &&
                op->o_cancel != SLAP_CANCEL_DONE )
@@ -1353,8 +1327,8 @@ operations_error:
        case LBER_ERROR:
        case LDAP_REQ_UNBIND:
                /* c_mutex is locked */
-               connection_closing(
-                       conn, tag == LDAP_REQ_UNBIND ? NULL : "operations error" );
+               connection_closing( conn,
+                       tag == LDAP_REQ_UNBIND ? NULL : "operations error" );
                break;
 
        case LDAP_REQ_BIND:
@@ -1388,9 +1362,10 @@ int connection_client_setup(
        c = connection_get( s );
        c->c_clientfunc = func;
        c->c_clientarg = arg;
-       connection_return( c );
+
        slapd_add_internal( s, 0 );
        slapd_set_read( s, 1 );
+       connection_return( c );
        return 0;
 }
 
@@ -1421,12 +1396,14 @@ void connection_client_stop(
                ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
        }
 
-       connection_return( c );
        slapd_remove( s, 0, 1 );
+       connection_return( c );
 }
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-void* connection_processing_thread( void* ctx, void* argv )
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+static int connection_read( ber_socket_t s, Operation** op );
+
+static void* connection_read_thread( void* ctx, void* argv )
 {
        int rc ;
        Operation* new_op = NULL;
@@ -1451,50 +1428,43 @@ void* connection_processing_thread( void* ctx, void* argv )
        return (void*)rc;
 }
 
-int connection_processing_activate( ber_socket_t s )
+int connection_read_activate( ber_socket_t s )
 {
-       int status;
+       int rc;
 
        /*
         * suspend reading on this file descriptor until a connection processing
         * thread reads data on it. Otherwise the listener thread will repeatedly
         * submit the same event on it to the pool.
         */
-       if( !slapd_suspend( s ) ) return 0;
+       slapd_clr_read( s, 0 );
 
-       status = ldap_pvt_thread_pool_submit( &connection_pool,
-               connection_processing_thread, (void *) s );
+       rc = ldap_pvt_thread_pool_submit( &connection_pool,
+               connection_read_thread, (void *) s );
 
-       if( status != 0 ) {
-               Debug( LDAP_DEBUG_ANY, "connection_processing_activiate(%d): "
-                       "ldap_pvt_thread_pool_submit failed\n",
-                       s, 0, 0 );
-               return -1;
+       if( rc != 0 ) {
+               Debug( LDAP_DEBUG_ANY,
+                       "connection_read_activiate(%d): submit failed (%d)\n",
+                       s, rc, 0 );
        }
 
-       return 1;
+       return rc;
 }
 #endif
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-int connection_read( ber_socket_t s, Operation** op )
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+static int
+connection_read( ber_socket_t s, Operation** op )
 #else
 int connection_read(ber_socket_t s)
 #endif
 {
        int rc = 0;
        Connection *c;
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-       int need_resume = 1;
-#endif
 
        assert( connections != NULL );
 
-#ifdef SLAP_MULTI_CONN_ARRAY
        ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX(s) );
-#else
-       ldap_pvt_thread_mutex_lock( &connections_mutex );
-#endif
 
        /* get (locked) connection */
        c = connection_get( s );
@@ -1505,11 +1475,7 @@ int connection_read(ber_socket_t s)
                        (long) s, 0, 0 );
                slapd_remove(s, 1, 0);
 
-#ifdef SLAP_MULTI_CONN_ARRAY
                ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
                return -1;
        }
 
@@ -1519,33 +1485,26 @@ int connection_read(ber_socket_t s)
                Debug( LDAP_DEBUG_TRACE,
                        "connection_read(%d): closing, ignoring input for id=%lu\n",
                        s, c->c_connid, 0 );
-               connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
-               ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-               slapd_resume( s );
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+               slapd_set_read( s, 1 );
 #endif
+               connection_return( c );
+               ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
                return 0;
        }
 
        if ( c->c_conn_state == SLAP_C_CLIENT ) {
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-               slapd_resume( s );
-#endif
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+               /* read should already be cleared */
+#else
                slapd_clr_read( s, 0 );
+#endif
                ldap_pvt_thread_pool_submit( &connection_pool,
                        c->c_clientfunc, c->c_clientarg );
 
                connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
                ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
                return 0;
        }
 
@@ -1558,7 +1517,7 @@ int connection_read(ber_socket_t s)
                rc = ldap_pvt_tls_accept( c->c_sb, slap_tls_ctx );
                if ( rc < 0 ) {
                        Debug( LDAP_DEBUG_TRACE,
-                               "connection_read(%d): TLS accept error "
+                               "connection_read(%d): TLS accept failure "
                                "error=%d id=%lu, closing\n",
                                s, rc, c->c_connid );
 
@@ -1586,6 +1545,9 @@ int connection_read(ber_socket_t s)
 #endif
 
                        connection_close( c );
+                       connection_return( c );
+                       ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
+                       return 0;
 
                } else if ( rc == 0 ) {
                        void *ssl;
@@ -1618,16 +1580,12 @@ int connection_read(ber_socket_t s)
                if( rc != 0 ||
                        !ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_DATA_READY, NULL ) )
                {
-                       connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
-                       ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-                       ldap_pvt_thread_mutex_unlock( &connections_mutex );
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+                       slapd_set_read( s, 1 );
 #endif
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-                       slapd_resume( s );
-#endif
+                       connection_return( c );
+                       ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
                        return 0;
                }
        }
@@ -1637,16 +1595,12 @@ int connection_read(ber_socket_t s)
        if ( c->c_sasl_layers ) {
                /* If previous layer is not removed yet, give up for now */
                if ( !c->c_sasl_sockctx ) {
-                       connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
-                       ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-                       ldap_pvt_thread_mutex_unlock( &connections_mutex );
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+                       slapd_set_read( s, 1 );
 #endif
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-                       slapd_resume( s );
-#endif
+                       connection_return( c );
+                       ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
                        return 0;
                }
 
@@ -1658,18 +1612,12 @@ int connection_read(ber_socket_t s)
                                "connection_read(%d): SASL install error "
                                "error=%d id=%lu, closing\n",
                                s, rc, c->c_connid );
+
                        /* connections_mutex and c_mutex are locked */
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-                       slapd_resume( s );
-#endif
                        connection_closing( c, "SASL layer install failure" );
                        connection_close( c );
                        connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
                        ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-                       ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
                        return 0;
                }
        }
@@ -1680,20 +1628,10 @@ int connection_read(ber_socket_t s)
 
        do {
                /* How do we do this without getting into a busy loop ? */
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
                rc = connection_input( c, op );
 #else
                rc = connection_input( c );
-#endif
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-               if( *op && (*op)->o_tag == LDAP_REQ_UNBIND ) {
-                       /*
-                        * After the reception of an unbind request,
-                        * no more incoming requests via the connection
-                        * is expected. Therefore, don't resume connection reading.
-                        */
-                       need_resume = 0;
-               }
 #endif
        }
 #ifdef DATA_READY_LOOP
@@ -1713,18 +1651,17 @@ int connection_read(ber_socket_t s)
                connection_closing( c, conn_lost_str );
                connection_close( c );
                connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
                ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
                return 0;
        }
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
-       if ( need_resume ) slapd_resume( s );
-#endif
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+       if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
+               slapd_set_write( s, 0 );
+       }
 
+       slapd_set_read( s, 1 );
+#else
        if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_READ, NULL ) ) {
                slapd_set_read( s, 1 );
        }
@@ -1732,22 +1669,18 @@ int connection_read(ber_socket_t s)
        if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
                slapd_set_write( s, 1 );
        }
+#endif
 
        connection_return( c );
-#ifdef SLAP_MULTI_CONN_ARRAY
        ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-       ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
 
        return 0;
 }
 
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
 static int
-connection_input( Connection *conn, Operation** c_op )
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+connection_input( Connection *conn , Operation** c_op )
 #else
-static int
 connection_input( Connection *conn )
 #endif
 {
@@ -1774,10 +1707,9 @@ connection_input( Connection *conn )
 
 #ifdef LDAP_CONNECTIONLESS
        if ( conn->c_is_udp ) {
-               char    peername[sizeof("IP=255.255.255.255:65336")];
+               char peername[sizeof("IP=255.255.255.255:65336")];
 
-               len = ber_int_sb_read(conn->c_sb, &peeraddr,
-                       sizeof(struct sockaddr));
+               len = ber_int_sb_read(conn->c_sb, &peeraddr, sizeof(struct sockaddr));
                if (len != sizeof(struct sockaddr)) return 1;
 
                sprintf( peername, "IP=%s:%d",
@@ -1814,16 +1746,14 @@ connection_input( Connection *conn )
 
        if ( (tag = ber_get_int( ber, &msgid )) != LDAP_TAG_MSGID ) {
                /* log, close and send error */
-               Debug( LDAP_DEBUG_ANY, "ber_get_int returns 0x%lx\n",
-                       tag, 0, 0 );
+               Debug( LDAP_DEBUG_ANY, "ber_get_int returns 0x%lx\n", tag, 0, 0 );
                ber_free( ber, 1 );
                return -1;
        }
 
        if ( (tag = ber_peek_tag( ber, &len )) == LBER_ERROR ) {
                /* log, close and send error */
-               Debug( LDAP_DEBUG_ANY, "ber_peek_tag returns 0x%lx\n",
-                       tag, 0, 0 );
+               Debug( LDAP_DEBUG_ANY, "ber_peek_tag returns 0x%lx\n", tag, 0, 0 );
                ber_free( ber, 1 );
 
                return -1;
@@ -1853,7 +1783,8 @@ connection_input( Connection *conn )
        op->o_conn = conn;
        /* clear state if the connection is being reused from inactive */
        if ( conn->c_conn_state == SLAP_C_INACTIVE ) {
-               memset( &conn->c_pagedresults_state, 0, sizeof( conn->c_pagedresults_state ) );
+               memset( &conn->c_pagedresults_state, 0,
+                       sizeof( conn->c_pagedresults_state ) );
        }
 
        op->o_res_ber = NULL;
@@ -1932,14 +1863,12 @@ connection_input( Connection *conn )
                        conn->c_connid, defer, 0 );
                conn->c_n_ops_pending++;
                LDAP_STAILQ_INSERT_TAIL( &conn->c_pending_ops, op, o_next );
-               if ( conn->c_n_ops_pending > max ) {
-                       rc = -1;
-               } else {
-                       rc = 1;
-               }
+               rc = ( conn->c_n_ops_pending > max ) ? -1 : 0;
+
        } else {
                conn->c_n_ops_executing++;
-#ifdef SLAP_LIGHTWEIGHT_LISTENER
+
+#if 0
                /*
                 * The first op will be processed in the same thread context,
                 * Subsequent ops will be submitted to the pool by
@@ -1962,8 +1891,8 @@ connection_input( Connection *conn )
                return 1;
        }
 #endif
-       assert( conn->c_struct_state == SLAP_C_USED );
 
+       assert( conn->c_struct_state == SLAP_C_USED );
        return rc;
 }
 
@@ -1978,11 +1907,7 @@ connection_resched( Connection *conn )
                ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd );
 
                /* use trylock to avoid possible deadlock */
-#ifdef SLAP_MULTI_CONN_ARRAY
                rc = ldap_pvt_thread_mutex_trylock( MCA_GET_CONN_MUTEX( sd ) );
-#else
-               rc = ldap_pvt_thread_mutex_trylock( &connections_mutex );
-#endif
 
                if( rc ) {
                        Debug( LDAP_DEBUG_TRACE,
@@ -1994,11 +1919,7 @@ connection_resched( Connection *conn )
                         * so recheck state below.
                         */
                        ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
-#ifdef SLAP_MULTI_CONN_ARRAY
                        ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX ( sd ) );
-#else
-                       ldap_pvt_thread_mutex_lock( &connections_mutex );
-#endif
                        ldap_pvt_thread_mutex_lock( &conn->c_mutex );
                }
 
@@ -2013,11 +1934,7 @@ connection_resched( Connection *conn )
                        connection_close( conn );
                }
 
-#ifdef SLAP_MULTI_CONN_ARRAY
                ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX( sd ) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
                return 0;
        }
 
@@ -2027,11 +1944,11 @@ connection_resched( Connection *conn )
        }
 
        while ((op = LDAP_STAILQ_FIRST( &conn->c_pending_ops )) != NULL) {
-               if ( conn->c_n_ops_executing > connection_pool_max/2 ) {
-                       break;
-               }
+               if ( conn->c_n_ops_executing > connection_pool_max/2 ) break;
+
                LDAP_STAILQ_REMOVE_HEAD( &conn->c_pending_ops, o_next );
                LDAP_STAILQ_NEXT(op, o_next) = NULL;
+
                /* pending operations should not be marked for abandonment */
                assert(!op->o_abandon);
 
@@ -2040,9 +1957,7 @@ connection_resched( Connection *conn )
 
                connection_op_activate( op );
 
-               if ( conn->c_conn_state == SLAP_C_BINDING ) {
-                       break;
-               }
+               if ( conn->c_conn_state == SLAP_C_BINDING ) break;
        }
        return 0;
 }
@@ -2065,7 +1980,7 @@ static void connection_op_queue( Operation *op )
        int status;
        ber_tag_t tag = op->o_tag;
 
-       if(tag == LDAP_REQ_BIND) {
+       if (tag == LDAP_REQ_BIND) {
                op->o_conn->c_conn_state = SLAP_C_BINDING;
        }
 
@@ -2088,8 +2003,8 @@ static void connection_op_queue( Operation *op )
                        ? op->o_conn->c_protocol : LDAP_VERSION3;
        }
 
-       if (op->o_conn->c_conn_state == SLAP_C_INACTIVE
-               && op->o_protocol > LDAP_VERSION2)
+       if (op->o_conn->c_conn_state == SLAP_C_INACTIVE &&
+               op->o_protocol > LDAP_VERSION2)
        {
                op->o_conn->c_conn_state = SLAP_C_ACTIVE;
        }
@@ -2102,24 +2017,53 @@ static void connection_op_queue( Operation *op )
 
 static int connection_op_activate( Operation *op )
 {
-       int status;
+       int rc;
        ber_tag_t tag = op->o_tag;
 
        connection_op_queue( op );
 
-       status = ldap_pvt_thread_pool_submit( &connection_pool,
+       rc = ldap_pvt_thread_pool_submit( &connection_pool,
                connection_operation, (void *) op );
 
-       if ( status != 0 ) {
+       if ( rc != 0 ) {
                Debug( LDAP_DEBUG_ANY,
-                       "ldap_pvt_thread_pool_submit: failed (%d) for conn=%lu\n",
-                       status, op->o_connid, 0 );
+                       "connection_op_activate: submit failed (%d) for conn=%lu\n",
+                       rc, op->o_connid, 0 );
                /* should move op to pending list */
        }
 
-       return status;
+       return rc;
+}
+
+#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
+int connection_write_activate( ber_socket_t s )
+{
+       int rc;
+
+       /*
+        * suspend reading on this file descriptor until a connection processing
+        * thread write data on it. Otherwise the listener thread will repeatedly
+        * submit the same event on it to the pool.
+        */
+
+#ifndef SLAP_LIGHTWEIGHT_DISPATCHER
+       slapd_clr_write( s, 0);
+       c->c_n_write++;
+#endif
+
+       rc = ldap_pvt_thread_pool_submit( &connection_pool,
+               connection_read_thread, (void *) s );
+
+       if( rc != 0 ) {
+               Debug( LDAP_DEBUG_ANY,
+                       "connection_write_activiate(%d): submit failed (%d)\n",
+                       (int) s, rc, 0 );
+       }
+       return rc;
 }
 
+static
+#endif
 int connection_write(ber_socket_t s)
 {
        Connection *c;
@@ -2127,11 +2071,7 @@ int connection_write(ber_socket_t s)
 
        assert( connections != NULL );
 
-#ifdef SLAP_MULTI_CONN_ARRAY
        ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX( s ) );
-#else
-       ldap_pvt_thread_mutex_lock( &connections_mutex );
-#endif
 
        c = connection_get( s );
        if( c == NULL ) {
@@ -2139,16 +2079,14 @@ int connection_write(ber_socket_t s)
                        "connection_write(%ld): no connection!\n",
                        (long)s, 0, 0 );
                slapd_remove(s, 1, 0);
-#ifdef SLAP_MULTI_CONN_ARRAY
                ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX( s ) );
-#else
-               ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
                return -1;
        }
 
+#ifndef SLAP_LIGHTWEIGHT_DISPATCHER
        slapd_clr_write( s, 0);
        c->c_n_write++;
+#endif
 
        Debug( LDAP_DEBUG_TRACE,
                "connection_write(%d): waking output for id=%lu\n",
@@ -2167,11 +2105,11 @@ int connection_write(ber_socket_t s)
         */
        while ((op = LDAP_STAILQ_FIRST( &c->c_pending_ops )) != NULL) {
                if ( !c->c_writewaiter ) break;
-               if ( c->c_n_ops_executing > connection_pool_max/2 ) {
-                       break;
-               }
+               if ( c->c_n_ops_executing > connection_pool_max/2 ) break;
+
                LDAP_STAILQ_REMOVE_HEAD( &c->c_pending_ops, o_next );
                LDAP_STAILQ_NEXT(op, o_next) = NULL;
+
                /* pending operations should not be marked for abandonment */
                assert(!op->o_abandon);
 
@@ -2182,14 +2120,9 @@ int connection_write(ber_socket_t s)
 
                break;
        }
-       connection_return( c );
 
-#ifdef SLAP_MULTI_CONN_ARRAY
+       connection_return( c );
        ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
-#else
-       ldap_pvt_thread_mutex_unlock( &connections_mutex );
-#endif
-
        return 0;
 }
 
@@ -2219,7 +2152,7 @@ connection_fake_init(
        op->o_connid = op->o_conn->c_connid;
        connection_init_log_prefix( op );
 
-       op->o_time = slap_get_time();
+       slap_op_time( &op->o_time, &op->o_tincr );
 }
 
 void