static void *search_stack( Operation *op );
+typedef struct ww_ctx {
+ MDB_txn *txn;
+ int flag;
+} ww_ctx;
+
+static void
+mdb_writewait( Operation *op, slap_callback *sc )
+{
+ ww_ctx *ww = sc->sc_private;
+ if ( !ww->flag ) {
+ mdb_txn_reset( ww->txn );
+ ww->flag = 1;
+ }
+}
+
int
mdb_search( Operation *op, SlapReply *rs )
{
int tentries = 0;
IdScopes isc;
MDB_cursor *mci, *mcd;
+ ww_ctx wwctx;
+ slap_callback cb = { 0 };
mdb_op_info opinfo = {{{0}}}, *moi = &opinfo;
MDB_txn *ltid = NULL;
id = mdb_idl_first( candidates, &cursor );
}
+ cb.sc_writewait = mdb_writewait;
+ cb.sc_private = &wwctx;
+ wwctx.flag = 0;
+ wwctx.txn = ltid;
+ cb.sc_next = op->o_callback;
+ op->o_callback = &cb;
+
while (id != NOID)
{
int scopeok;
rs->sr_flags = 0;
send_search_reference( op, rs );
+ if ( wwctx.flag ) {
+ wwctx.flag = 0;
+ mdb_txn_renew( ltid );
+ }
mdb_entry_return( op, e );
rs->sr_entry = NULL;
rs->sr_flags = 0;
rs->sr_err = LDAP_SUCCESS;
rs->sr_err = send_search_entry( op, rs );
+ if ( wwctx.flag ) {
+ wwctx.flag = 0;
+ mdb_txn_renew( ltid );
+ }
rs->sr_attrs = NULL;
rs->sr_entry = NULL;
if (e != base)
id = mdb_idl_next( candidates, &cursor );
}
}
+ /* remove our writewait callback */
+ {
+ slap_callback **scp = &op->o_callback;
+ while ( *scp ) {
+ if ( *scp == &cb ) {
+ *scp = cb.sc_next;
+ cb.sc_private = NULL;
+ break;
+ }
+ }
+ }
nochange:
rs->sr_ctrls = NULL;
rs->sr_err = LDAP_SUCCESS;
done:
+ if ( cb.sc_private ) {
+ /* remove our writewait callback */
+ slap_callback **scp = &op->o_callback;
+ while ( *scp ) {
+ if ( *scp == &cb ) {
+ *scp = cb.sc_next;
+ cb.sc_private = NULL;
+ break;
+ }
+ }
+ }
mdb_cursor_close( mcd );
mdb_cursor_close( mci );
if ( moi == &opinfo ) {
return 1;
}
+/* Check for any callbacks that want to be informed about being blocked
+ * on output. These callbacks are expected to leave the callback list
+ * unmodified. Their result is ignored.
+ */
+static void
+slap_writewait_play(
+ Operation *op )
+{
+ slap_callback *sc = op->o_callback;
+
+ for ( ; sc; sc = sc->sc_next ) {
+ if ( sc->sc_writewait )
+ sc->sc_writewait( op, sc );
+ }
+}
+
static long send_ldap_ber(
Operation *op,
BerElement *ber )
conn->c_writewaiter = 1;
ldap_pvt_thread_mutex_unlock( &conn->c_write1_mutex );
ldap_pvt_thread_pool_idle( &connection_pool );
+ slap_writewait_play( op );
err = slapd_wait_writer( conn->c_sd );
conn->c_writewaiter = 0;
ldap_pvt_thread_pool_unidle( &connection_pool );
typedef int (slap_response)( Operation *, SlapReply * );
+struct slap_callback;
+typedef void (slap_writewait)( Operation *, struct slap_callback * );
+
typedef struct slap_callback {
struct slap_callback *sc_next;
slap_response *sc_response;
slap_response *sc_cleanup;
+ slap_writewait *sc_writewait;
void *sc_private;
} slap_callback;