static void bdb_lru_print(Cache *cache);
#endif
+static int bdb_txn_get( Operation *op, DB_ENV *env, DB_TXN **txn );
+
static EntryInfo *
bdb_cache_entryinfo_new( Cache *cache )
{
if ( (*eip)->bei_state & CACHE_ENTRY_DELETED ) {
rc = DB_NOTFOUND;
} else {
+ int load = 0;
+ /* Make sure only one thread tries to load the entry */
+load1: if ( !(*eip)->bei_e && !((*eip)->bei_state & CACHE_ENTRY_LOADING)) {
+ load = 1;
+ (*eip)->bei_state |= CACHE_ENTRY_LOADING;
+ }
if ( islocked ) {
bdb_cache_entryinfo_unlock( *eip );
islocked = 0;
rc = DB_NOTFOUND;
bdb_cache_entry_db_unlock( bdb->bi_dbenv, lock );
} else if ( rc == 0 ) {
- if ( !(*eip)->bei_e ) {
- bdb_cache_entry_db_relock( bdb->bi_dbenv, locker,
+ if ( load ) {
+ DB_TXN *ltid = NULL;
+ u_int32_t locker2 = locker;
+
+ /* We don't wrap entire read operations in txn's, but
+ * we need our cache entry lock and any DB page locks
+ * to be associated, in order for deadlock detection
+ * to work properly. So we use a long-lived per-thread
+ * txn for this step.
+ */
+ if ( !tid ) {
+ rc = bdb_txn_get( op, bdb->bi_dbenv, &ltid );
+ if ( ltid )
+ locker2 = TXN_ID( ltid );
+ } else {
+ ltid = tid;
+ }
+ /* Give up original read lock, obtain write lock with
+ * (possibly) new locker ID.
+ */
+ bdb_cache_entry_db_relock( bdb->bi_dbenv, locker2,
*eip, 1, 0, lock );
if (!ep) {
- rc = bdb_id2entry( op->o_bd, tid, id, &ep );
+ rc = bdb_id2entry( op->o_bd, ltid, id, &ep );
}
if ( rc == 0 ) {
- /* Make sure no other modifier beat us to it */
- if ( (*eip)->bei_e ) {
- bdb_entry_return( ep );
- ep = NULL;
-#ifdef BDB_HIER
- /* Check for subtree renames */
- rc = bdb_fix_dn( (*eip)->bei_e, 1 );
- if ( rc ) rc = bdb_fix_dn( (*eip)->bei_e, 2 );
-#endif
- } else {
- ep->e_private = *eip;
+ ep->e_private = *eip;
#ifdef BDB_HIER
- bdb_fix_dn( ep, 0 );
+ bdb_fix_dn( ep, 0 );
#endif
- (*eip)->bei_e = ep;
- ep = NULL;
- }
+ (*eip)->bei_e = ep;
+ ep = NULL;
}
- bdb_cache_entry_db_relock( bdb->bi_dbenv, locker,
- *eip, 0, 0, lock );
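+ /* Done loading, whether or not it succeeded; clear the
+ * flag so other threads spinning at load1 can proceed.
+ */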
+ (*eip)->bei_state ^= CACHE_ENTRY_LOADING;
+ if ( rc == 0 ) {
+ /* If we succeeded, downgrade back to a readlock. */
+ bdb_cache_entry_db_relock( bdb->bi_dbenv, locker,
+ *eip, 0, 0, lock );
+ } else {
+ /* Otherwise, release the lock. */
+ bdb_cache_entry_db_unlock( bdb->bi_dbenv, lock );
+ }
+ if ( !tid ) {
+ /* If we're using the per-thread txn, release all
+ * of its page locks now.
+ */
+ DB_LOCKREQ list;
+ list.op = DB_LOCK_PUT_ALL;
+ list.obj = NULL;
+ bdb->bi_dbenv->lock_vec( bdb->bi_dbenv, locker2,
+ 0, &list, 1, NULL );
+ }
+ } else if ( !(*eip)->bei_e ) {
+ /* Some other thread is trying to load the entry,
+ * give it a chance to finish.
+ */
+ bdb_cache_entry_db_unlock( bdb->bi_dbenv, lock );
+ ldap_pvt_thread_yield();
+ bdb_cache_entryinfo_lock( *eip );
+ islocked = 1;
+ goto load1;
#ifdef BDB_HIER
	} else {
		/* Check for subtree renames */
		rc = bdb_fix_dn( (*eip)->bei_e, 1 );
		if ( rc ) rc = bdb_fix_dn( (*eip)->bei_e, 2 );
#endif
	}
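+/* Free routine for the per-thread txn key; registered with
+ * ldap_pvt_thread_pool_setkey() in bdb_txn_get() below, it aborts
+ * the thread's long-lived txn when the key is released.
+ */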
+static void
+bdb_txn_free( void *key, void *data )
+{
+ DB_TXN *txn = data;
+ TXN_ABORT( txn );
+}
+
+/* Obtain a long-lived transaction for the current thread */
+static int
+bdb_txn_get( Operation *op, DB_ENV *env, DB_TXN **txn )
+{
+ int i, rc;
+ void *ctx, *data;
+
+ /* If no op was provided, try to find the ctx anyway... */
+ if ( op ) {
+ ctx = op->o_threadctx;
+ } else {
+ ctx = ldap_pvt_thread_pool_context();
+ }
+
+ /* Shouldn't happen unless we're single-threaded */
+ if ( !ctx ) {
+ *txn = NULL;
+ return -1;
+ }
+
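+ /* The thread's long-lived txn is stashed in its pool context,
+ * keyed off env+1.
+ */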
+ if ( ldap_pvt_thread_pool_getkey( ctx, ((char *)env)+1, &data, NULL ) ) {
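+ /* No txn yet for this thread: start one, retrying a few times
+ * and yielding between attempts if TXN_BEGIN fails.
+ */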
+ for ( i=0, rc=1; rc != 0 && i<4; i++ ) {
+ rc = TXN_BEGIN( env, NULL, txn, 0 );
+ if (rc) ldap_pvt_thread_yield();
+ }
+ if ( rc != 0 ) {
+ return rc;
+ }
+ if ( ( rc = ldap_pvt_thread_pool_setkey( ctx, ((char *)env)+1,
+ *txn, bdb_txn_free ) ) ) {
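+ /* Couldn't stash the txn in the thread's context; don't leak it. */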
+ TXN_ABORT( *txn );
+#ifdef NEW_LOGGING
+ LDAP_LOG( BACK_BDB, ERR, "bdb_txn_get: err %s(%d)\n",
+ db_strerror(rc), rc, 0 );
+#else
+ Debug( LDAP_DEBUG_ANY, "bdb_txn_get: err %s(%d)\n",
+ db_strerror(rc), rc, 0 );
+#endif
+
+ return rc;
+ }
+ } else {
+ *txn = data;
+ }
+ return 0;
+}
+
#ifdef BDB_REUSE_LOCKERS
static void
bdb_locker_id_free( void *key, void *data )
"bdb_locker_id_free: %d err %s(%d)\n",
lockid, db_strerror(rc), rc );
#endif
- memset( &lr, 0, sizeof(lr) );
-
/* release all locks held by this locker. */
lr.op = DB_LOCK_PUT_ALL;
+ lr.obj = NULL;
env->lock_vec( env, lockid, 0, &lr, 1, NULL );
XLOCK_ID_FREE( env, lockid );
}