]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/back-bdb/search.c
Use a thread for LDIF parsing in slapadd -q
[openldap] / servers / slapd / back-bdb / search.c
index 48cc77318d037eea36907110f26bdd6723d075fc..1d7e52fb4e7c1ede5488dfc0e45bab873dd74936 100644 (file)
@@ -2,7 +2,7 @@
 /* $OpenLDAP$ */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *
- * Copyright 2000-2008 The OpenLDAP Foundation.
+ * Copyright 2000-2011 The OpenLDAP Foundation.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -103,6 +103,8 @@ static Entry * deref_base (
 
                rs->sr_err = bdb_dn2entry( op, txn, &ndn, &ei,
                        0, &lockr );
+               if ( rs->sr_err == DB_LOCK_DEADLOCK )
+                       return NULL;
 
                if ( ei ) {
                        e = ei->bei_e;
@@ -204,14 +206,15 @@ static int search_aliases(
                BDB_IDL_CPY( curscop, aliases );
                rs->sr_err = bdb_dn2idl( op, txn, &e->e_nname, BEI(e), subscop,
                        subscop2+BDB_IDL_DB_SIZE );
-               if ( rs->sr_err == DB_LOCK_DEADLOCK )
-                       return rs->sr_err;
 
                if (first) {
                        first = 0;
                } else {
                        bdb_cache_return_entry_r (bdb, e, &locka);
                }
+               if ( rs->sr_err == DB_LOCK_DEADLOCK )
+                       return rs->sr_err;
+
                BDB_IDL_CPY(subscop2, subscop);
                rs->sr_err = bdb_idl_intersection(curscop, subscop);
                bdb_idl_union( ids, subscop2 );
@@ -307,6 +310,36 @@ sameido:
        return rs->sr_err;
 }
 
+/* Get the next ID from the DB. Used if the candidate list is
+ * a range and simple iteration hits missing entryIDs
+ */
+static int
+bdb_get_nextid(struct bdb_info *bdb, DB_TXN *ltid, ID *cursor)
+{
+       DBC *curs;
+       DBT key, data;
+       ID id, nid;
+       int rc;
+
+       id = *cursor + 1;
+       BDB_ID2DISK( id, &nid );
+       rc = bdb->bi_id2entry->bdi_db->cursor(
+               bdb->bi_id2entry->bdi_db, ltid, &curs, bdb->bi_db_opflags );
+       if ( rc )
+               return rc;
+       key.data = &nid;
+       key.size = key.ulen = sizeof(ID);
+       key.flags = DB_DBT_USERMEM;
+       data.flags = DB_DBT_USERMEM | DB_DBT_PARTIAL;
+       data.dlen = data.ulen = 0;
+       rc = curs->c_get( curs, &key, &data, DB_SET_RANGE );
+       curs->c_close( curs );
+       if ( rc )
+               return rc;
+       BDB_DISK2ID( &nid, cursor );
+       return 0;
+}
+
 int
 bdb_search( Operation *op, SlapReply *rs )
 {
@@ -323,7 +356,8 @@ bdb_search( Operation *op, SlapReply *rs )
        slap_mask_t     mask;
        time_t          stoptime;
        int             manageDSAit;
-       int             tentries = 0, nentries = 0;
+       int             tentries = 0;
+       unsigned        nentries = 0;
        int             idflag = 0;
 
        DB_LOCK         lock;
@@ -379,8 +413,10 @@ dn2entry_retry:
                e = ei->bei_e;
                break;
        case DB_LOCK_DEADLOCK:
-               if ( !opinfo )
+               if ( !opinfo ) {
+                       ltid->flags &= ~TXN_DEADLOCK;
                        goto dn2entry_retry;
+               }
                opinfo->boi_err = rs->sr_err;
                /* FALLTHRU */
        case LDAP_BUSY:
@@ -574,13 +610,17 @@ dn2entry_retry:
                rs->sr_err = base_candidate( op->o_bd, &base, candidates );
 
        } else {
+cand_retry:
                BDB_IDL_ZERO( candidates );
                BDB_IDL_ZERO( scopes );
                rs->sr_err = search_candidates( op, rs, &base,
                        ltid, candidates, scopes );
                if ( rs->sr_err == DB_LOCK_DEADLOCK ) {
-                       if ( opinfo )
-                               opinfo->boi_err = rs->sr_err;
+                       if ( !opinfo ) {
+                               ltid->flags &= ~TXN_DEADLOCK;
+                               goto cand_retry;
+                       }
+                       opinfo->boi_err = rs->sr_err;
                        send_ldap_error( op, rs, LDAP_BUSY, "ldap server busy" );
                        return LDAP_BUSY;
                }
@@ -658,6 +698,7 @@ loop_begin:
                /* check for abandon */
                if ( op->o_abandon ) {
                        rs->sr_err = SLAPD_ABANDON;
+                       send_ldap_result( op, rs );
                        goto done;
                }
 
@@ -686,54 +727,78 @@ loop_begin:
                 * any subsequent entries
                 */
                nentries++;
-               if ( nentries > bdb->bi_cache.c_maxsize && !idflag )
+               if ( nentries > bdb->bi_cache.c_maxsize && !idflag ) {
                        idflag = ID_NOCACHE;
+               }
 
 fetch_entry_retry:
-                       /* get the entry with reader lock */
-                       ei = NULL;
-                       rs->sr_err = bdb_cache_find_id( op, ltid,
-                               id, &ei, idflag, &lock );
-
-                       if (rs->sr_err == LDAP_BUSY) {
-                               rs->sr_text = "ldap server busy";
-                               send_ldap_result( op, rs );
-                               goto done;
+               /* get the entry with reader lock */
+               ei = NULL;
+               rs->sr_err = bdb_cache_find_id( op, ltid,
+                       id, &ei, idflag, &lock );
 
-                       } else if ( rs->sr_err == DB_LOCK_DEADLOCK ) {
-                               if ( opinfo )
-                                       opinfo->boi_err = rs->sr_err;
-                               send_ldap_error( op, rs, LDAP_BUSY, "ldap server busy" );
-                               goto done;
+               if (rs->sr_err == LDAP_BUSY) {
+                       rs->sr_text = "ldap server busy";
+                       send_ldap_result( op, rs );
+                       goto done;
 
-                       } else if ( rs->sr_err == DB_LOCK_NOTGRANTED )
-                       {
+               } else if ( rs->sr_err == DB_LOCK_DEADLOCK ) {
+                       if ( !opinfo ) {
+                               ltid->flags &= ~TXN_DEADLOCK;
                                goto fetch_entry_retry;
-                       } else if ( rs->sr_err == LDAP_OTHER ) {
-                               rs->sr_text = "internal error";
-                               send_ldap_result( op, rs );
-                               goto done;
                        }
+txnfail:
+                       opinfo->boi_err = rs->sr_err;
+                       send_ldap_error( op, rs, LDAP_BUSY, "ldap server busy" );
+                       goto done;
 
-                       if ( ei && rs->sr_err == LDAP_SUCCESS ) {
-                               e = ei->bei_e;
-                       } else {
-                               e = NULL;
-                       }
+               } else if ( rs->sr_err == DB_LOCK_NOTGRANTED )
+               {
+                       goto fetch_entry_retry;
+               } else if ( rs->sr_err == LDAP_OTHER ) {
+                       rs->sr_text = "internal error";
+                       send_ldap_result( op, rs );
+                       goto done;
+               }
 
-                       if ( e == NULL ) {
-                               if( !BDB_IDL_IS_RANGE(candidates) ) {
-                                       /* only complain for non-range IDLs */
-                                       Debug( LDAP_DEBUG_TRACE,
-                                               LDAP_XSTRING(bdb_search)
-                                               ": candidate %ld not found\n",
-                                               (long) id, 0, 0 );
-                               }
+               if ( ei && rs->sr_err == LDAP_SUCCESS ) {
+                       e = ei->bei_e;
+               } else {
+                       e = NULL;
+               }
 
-                               goto loop_continue;
+               if ( e == NULL ) {
+                       if( !BDB_IDL_IS_RANGE(candidates) ) {
+                               /* only complain for non-range IDLs */
+                               Debug( LDAP_DEBUG_TRACE,
+                                       LDAP_XSTRING(bdb_search)
+                                       ": candidate %ld not found\n",
+                                       (long) id, 0, 0 );
+                       } else {
+                               /* get the next ID from the DB */
+id_retry:
+                               rs->sr_err = bdb_get_nextid( bdb, ltid, &cursor );
+                               if ( rs->sr_err == DB_NOTFOUND ) {
+                                       break;
+                               } else if ( rs->sr_err == DB_LOCK_DEADLOCK ) {
+                                       if ( opinfo )
+                                               goto txnfail;
+                                       ltid->flags &= ~TXN_DEADLOCK;
+                                       goto id_retry;
+                               } else if ( rs->sr_err == DB_LOCK_NOTGRANTED ) {
+                                       goto id_retry;
+                               }
+                               if ( rs->sr_err ) {
+                                       rs->sr_err = LDAP_OTHER;
+                                       rs->sr_text = "internal error in get_nextid";
+                                       send_ldap_result( op, rs );
+                                       goto done;
+                               }
+                               cursor--;
                        }
 
-               rs->sr_entry = e;
+                       goto loop_continue;
+               }
 
                if ( is_entry_subentry( e ) ) {
                        if( op->oq_search.rs_scope != LDAP_SCOPE_BASE ) {
@@ -840,13 +905,52 @@ fetch_entry_retry:
                if ( !manageDSAit && op->oq_search.rs_scope != LDAP_SCOPE_BASE
                        && is_entry_referral( e ) )
                {
+                       struct bdb_op_info bois;
+                       struct bdb_lock_info blis;
                        BerVarray erefs = get_entry_referrals( op, e );
                        rs->sr_ref = referral_rewrite( erefs, &e->e_name, NULL,
                                op->oq_search.rs_scope == LDAP_SCOPE_ONELEVEL
                                        ? LDAP_SCOPE_BASE : LDAP_SCOPE_SUBTREE );
 
+                       /* Must set lockinfo so that entry_release will work */
+                       if (!opinfo) {
+                               bois.boi_oe.oe_key = bdb;
+                               bois.boi_txn = NULL;
+                               bois.boi_err = 0;
+                               bois.boi_acl_cache = op->o_do_not_cache;
+                               bois.boi_flag = BOI_DONTFREE;
+                               bois.boi_locks = &blis;
+                               blis.bli_next = NULL;
+                               LDAP_SLIST_INSERT_HEAD( &op->o_extra, &bois.boi_oe,
+                                       oe_next );
+                       } else {
+                               blis.bli_next = opinfo->boi_locks;
+                               opinfo->boi_locks = &blis;
+                       }
+                       blis.bli_id = e->e_id;
+                       blis.bli_lock = lock;
+                       blis.bli_flag = BLI_DONTFREE;
+
+                       rs->sr_entry = e;
+                       rs->sr_flags = REP_ENTRY_MUSTRELEASE;
+
                        send_search_reference( op, rs );
 
+                       if ( blis.bli_flag ) {
+#ifdef SLAP_ZONE_ALLOC
+                               slap_zn_runlock(bdb->bi_cache.c_zctx, e);
+#endif
+                               bdb_cache_return_entry_r(bdb, e, &lock);
+                               if ( opinfo ) {
+                                       opinfo->boi_locks = blis.bli_next;
+                               } else {
+                                       LDAP_SLIST_REMOVE( &op->o_extra, &bois.boi_oe,
+                                               OpExtra, oe_next );
+                               }
+                       }
+                       rs->sr_entry = NULL;
+                       e = NULL;
+
                        ber_bvarray_free( rs->sr_ref );
                        ber_bvarray_free( erefs );
                        rs->sr_ref = NULL;
@@ -859,7 +963,7 @@ fetch_entry_retry:
                }
 
                /* if it matches the filter and scope, send it */
-               rs->sr_err = test_filter( op, rs->sr_entry, op->oq_search.rs_filter );
+               rs->sr_err = test_filter( op, e, op->oq_search.rs_filter );
 
                if ( rs->sr_err == LDAP_COMPARE_TRUE ) {
                        /* check size limit */
@@ -877,13 +981,57 @@ fetch_entry_retry:
                        }
 
                        if (e) {
+                               struct bdb_op_info bois;
+                               struct bdb_lock_info blis;
+
+                               /* Must set lockinfo so that entry_release will work */
+                               if (!opinfo) {
+                                       bois.boi_oe.oe_key = bdb;
+                                       bois.boi_txn = NULL;
+                                       bois.boi_err = 0;
+                                       bois.boi_acl_cache = op->o_do_not_cache;
+                                       bois.boi_flag = BOI_DONTFREE;
+                                       bois.boi_locks = &blis;
+                                       blis.bli_next = NULL;
+                                       LDAP_SLIST_INSERT_HEAD( &op->o_extra, &bois.boi_oe,
+                                               oe_next );
+                               } else {
+                                       blis.bli_next = opinfo->boi_locks;
+                                       opinfo->boi_locks = &blis;
+                               }
+                               blis.bli_id = e->e_id;
+                               blis.bli_lock = lock;
+                               blis.bli_flag = BLI_DONTFREE;
+
                                /* safe default */
                                rs->sr_attrs = op->oq_search.rs_attrs;
                                rs->sr_operational_attrs = NULL;
                                rs->sr_ctrls = NULL;
-                               rs->sr_flags = 0;
+                               rs->sr_entry = e;
+                               RS_ASSERT( e->e_private != NULL );
+                               rs->sr_flags = REP_ENTRY_MUSTRELEASE;
                                rs->sr_err = LDAP_SUCCESS;
                                rs->sr_err = send_search_entry( op, rs );
+                               rs->sr_attrs = NULL;
+                               rs->sr_entry = NULL;
+
+                               /* send_search_entry will usually free it.
+                                * an overlay might leave its own copy here;
+                                * bli_flag will be 0 if lock was already released.
+                                */
+                               if ( blis.bli_flag ) {
+#ifdef SLAP_ZONE_ALLOC
+                                       slap_zn_runlock(bdb->bi_cache.c_zctx, e);
+#endif
+                                       bdb_cache_return_entry_r(bdb, e, &lock);
+                                       if ( opinfo ) {
+                                               opinfo->boi_locks = blis.bli_next;
+                                       } else {
+                                               LDAP_SLIST_REMOVE( &op->o_extra, &bois.boi_oe,
+                                                       OpExtra, oe_next );
+                                       }
+                               }
+                               e = NULL;
 
                                switch ( rs->sr_err ) {
                                case LDAP_SUCCESS:      /* entry sent ok */
@@ -892,12 +1040,6 @@ fetch_entry_retry:
                                        break;
                                case LDAP_UNAVAILABLE:
                                case LDAP_SIZELIMIT_EXCEEDED:
-#ifdef SLAP_ZONE_ALLOC
-                                       slap_zn_runlock(bdb->bi_cache.c_zctx, e);
-#endif
-                                       bdb_cache_return_entry_r(bdb, e, &lock);
-                                       e = NULL;
-                                       rs->sr_entry = NULL;
                                        if ( rs->sr_err == LDAP_SIZELIMIT_EXCEEDED ) {
                                                rs->sr_ref = rs->sr_v2ref;
                                                send_ldap_result( op, rs );
@@ -924,6 +1066,7 @@ loop_continue:
                        slap_zn_runlock(bdb->bi_cache.c_zctx, e);
 #endif
                        bdb_cache_return_entry_r( bdb, e , &lock );
+                       RS_ASSERT( rs->sr_entry == NULL );
                        e = NULL;
                        rs->sr_entry = NULL;
                }
@@ -1174,6 +1317,9 @@ parse_paged_cookie( Operation *op, SlapReply *rs )
                        goto done;
                }
 
+       } else {
+               /* we're going to use ps_cookie */
+               op->o_conn->c_pagedresults_state.ps_cookie = 0;
        }
 
 done:;
@@ -1188,7 +1334,7 @@ send_paged_response(
        ID              *lastid,
        int             tentries )
 {
-       LDAPControl     ctrl, *ctrls[2];
+       LDAPControl     *ctrls[2];
        BerElementBuffer berbuf;
        BerElement      *ber = (BerElement *)&berbuf;
        PagedResultsCookie respcookie;
@@ -1198,8 +1344,6 @@ send_paged_response(
                "send_paged_response: lastid=0x%08lx nentries=%d\n", 
                lastid ? *lastid : 0, rs->sr_nentries, NULL );
 
-       BER_BVZERO( &ctrl.ldctl_value );
-       ctrls[0] = &ctrl;
        ctrls[1] = NULL;
 
        ber_init2( ber, NULL, LBER_USE_DER );
@@ -1222,6 +1366,7 @@ send_paged_response(
        /* return size of 0 -- no estimate */
        ber_printf( ber, "{iO}", 0, &cookie ); 
 
+       ctrls[0] = op->o_tmpalloc( sizeof(LDAPControl), op->o_tmpmemctx );
        if ( ber_flatten2( ber, &ctrls[0]->ldctl_value, 0 ) == -1 ) {
                goto done;
        }
@@ -1229,12 +1374,10 @@ send_paged_response(
        ctrls[0]->ldctl_oid = LDAP_CONTROL_PAGEDRESULTS;
        ctrls[0]->ldctl_iscritical = 0;
 
-       rs->sr_ctrls = ctrls;
+       slap_add_ctrls( op, rs, ctrls );
        rs->sr_err = LDAP_SUCCESS;
        send_ldap_result( op, rs );
-       rs->sr_ctrls = NULL;
 
 done:
        (void) ber_free_buf( ber );
 }
-