]> git.sur5r.net Git - openldap/commitdiff
Add IDL caching for slapadd/slapindex quick mode
authorHoward Chu <hyc@openldap.org>
Sat, 22 Oct 2005 21:41:58 +0000 (21:41 +0000)
committerHoward Chu <hyc@openldap.org>
Sat, 22 Oct 2005 21:41:58 +0000 (21:41 +0000)
servers/slapd/back-bdb/key.c
servers/slapd/back-bdb/proto-bdb.h
servers/slapd/back-bdb/tools.c

index e64236693578b93efa160842596e3ec413bd1a8e..87679612a8bf6a72724eb3cf5b491159a1ebe41b 100644 (file)
@@ -84,7 +84,11 @@ bdb_key_change(
 
        if (op == SLAP_INDEX_ADD_OP) {
                /* Add values */
-               rc = bdb_idl_insert_key( be, db, txn, &key, id );
+
+               if ( slapMode & SLAP_TOOL_QUICK )
+                       rc = bdb_tool_idl_add( be, db, txn, &key, id );
+               else
+                       rc = bdb_idl_insert_key( be, db, txn, &key, id );
                if ( rc == DB_KEYEXIST ) rc = 0;
        } else {
                /* Delete values */
index 82cb7c1b34da8cfc2713b031eae611bd283f1c84..b6555523e9f4f12bc21710173befc03d88f8c989 100644 (file)
@@ -595,6 +595,7 @@ bdb_trans_backoff( int num_retries );
 #define bdb_tool_dn2id_get             BDB_SYMBOL(tool_dn2id_get)
 #define bdb_tool_id2entry_get          BDB_SYMBOL(tool_id2entry_get)
 #define bdb_tool_entry_modify          BDB_SYMBOL(tool_entry_modify)
+#define bdb_tool_idl_add               BDB_SYMBOL(tool_idl_add)
 
 extern BI_init                         bdb_back_initialize;
 
@@ -626,6 +627,8 @@ extern BI_tool_dn2id_get            bdb_tool_dn2id_get;
 extern BI_tool_id2entry_get            bdb_tool_id2entry_get;
 extern BI_tool_entry_modify            bdb_tool_entry_modify;
 
+int bdb_tool_idl_add( BackendDB *be, DB *db, DB_TXN *txn, DBT *key, ID id );
+
 LDAP_END_DECL
 
 #endif /* _PROTO_BDB_H */
index 3c9761c8e79bf8a94c53e0a1d3d4079c7d766582..25cbd8b55f26000092f67ad12696b46a13239a1c 100644 (file)
@@ -21,6 +21,7 @@
 
 #define AVL_INTERNAL
 #include "back-bdb.h"
+#include "idl.h"
 
 static DBC *cursor = NULL;
 static DBT key, data;
@@ -37,6 +38,12 @@ static unsigned nholes;
 
 static Avlnode *index_attrs, index_dummy;
 
+#define bdb_tool_idl_cmp               BDB_SYMBOL(tool_idl_cmp)
+#define bdb_tool_idl_flush_one         BDB_SYMBOL(tool_idl_flush_one)
+#define bdb_tool_idl_flush             BDB_SYMBOL(tool_idl_flush)
+
+static int bdb_tool_idl_flush( BackendDB *be );
+
 int bdb_tool_entry_open(
        BackendDB *be, int mode )
 {
@@ -79,6 +86,8 @@ int bdb_tool_entry_close(
                cursor = NULL;
        }
 
+       bdb_tool_idl_flush( be );
+
        if( nholes ) {
                unsigned i;
                fprintf( stderr, "Error, entries missing!\n");
@@ -630,3 +639,258 @@ done:
 
        return e->e_id;
 }
+
+#define        IDBLOCK 1024
+
+typedef struct bdb_tool_idl_cache_entry {
+       struct bdb_tool_idl_cache_entry *next;
+       ID ids[IDBLOCK];
+} bdb_tool_idl_cache_entry;
+
+typedef struct bdb_tool_idl_cache {
+       struct berval kstr;
+       bdb_tool_idl_cache_entry *head, *tail;
+       ID first, last;
+       int count;
+} bdb_tool_idl_cache;
+
+static bdb_tool_idl_cache_entry *bdb_tool_idl_free_list;
+
+static int
+bdb_tool_idl_cmp( const void *v1, const void *v2 )
+{
+       const bdb_tool_idl_cache *c1 = v1, *c2 = v2;
+       int rc;
+
+       if (( rc = c1->kstr.bv_len - c2->kstr.bv_len )) return rc;
+       return memcmp( c1->kstr.bv_val, c2->kstr.bv_val, c1->kstr.bv_len );
+}
+
+static int
+bdb_tool_idl_flush_one( void *v1, void *arg )
+{
+       bdb_tool_idl_cache *ic = v1;
+       DB *db = arg;
+       bdb_tool_idl_cache_entry *ice;
+       DBC *curs;
+       DBT key, data;
+       int i, rc;
+       ID id, nid;
+
+       rc = db->cursor( db, NULL, &curs, 0 );
+       if ( rc )
+               return -1;
+
+       DBTzero( &key );
+       DBTzero( &data );
+
+       bv2DBT( &ic->kstr, &key );
+
+       data.size = data.ulen = sizeof( ID );
+       data.flags = DB_DBT_USERMEM;
+       data.data = &nid;
+
+       rc = curs->c_get( curs, &key, &data, DB_SET );
+       /* If key already exists and we're writing a range... */
+       if ( rc == 0 && ic->head == NULL ) {
+               /* If it's not currently a range, must delete old info */
+               if ( nid ) {
+                       /* Skip lo */
+                       while ( curs->c_get( curs, &key, &data, DB_NEXT_DUP ) == 0 )
+                               curs->c_del( curs, 0 );
+
+                       nid = 0;
+                       /* Store range marker */
+                       curs->c_put( curs, &key, &data, DB_KEYFIRST );
+               } else {
+                       
+                       /* Skip lo */
+                       rc = curs->c_get( curs, &key, &data, DB_NEXT_DUP );
+
+                       /* Get hi */
+                       rc = curs->c_get( curs, &key, &data, DB_NEXT_DUP );
+
+                       /* Delete hi */
+                       curs->c_del( curs, 0 );
+               }
+               BDB_ID2DISK( ic->last, &nid );
+               curs->c_put( curs, &key, &data, DB_KEYLAST );
+               rc = 0;
+       } else if ( rc && rc != DB_NOTFOUND ) {
+               rc = -1;
+       } else if ( ic->head == NULL ) {
+               /* range, didn't exist before */
+               nid = 0;
+               rc = curs->c_put( curs, &key, &data, DB_KEYLAST );
+               if ( rc == 0 ) {
+                       BDB_ID2DISK( ic->first, &nid );
+                       rc = curs->c_put( curs, &key, &data, DB_KEYLAST );
+                       if ( rc == 0 ) {
+                               BDB_ID2DISK( ic->last, &nid );
+                               rc = curs->c_put( curs, &key, &data, DB_KEYLAST );
+                       }
+               }
+               if ( rc ) {
+                       rc = -1;
+               }
+       } else {
+               /* Just a normal write */
+               rc = 0;
+               for ( ice = ic->head; ice; ice = ic->head ) {
+                       int end = ice->next ? IDBLOCK : (ic->count & (IDBLOCK-1));
+                       ic->head = ice->next;
+                       for ( i=0; i<end; i++ ) {
+                               if ( !ice->ids[i] ) continue;
+                               BDB_ID2DISK( ice->ids[i], &nid );
+                               rc = curs->c_put( curs, &key, &data, DB_NODUPDATA );
+                               if ( rc ) {
+                                       if ( rc == DB_KEYEXIST ) {
+                                               rc = 0;
+                                               continue;
+                                       }
+                                       rc = -1;
+                                       break;
+                               }
+                       }
+                       ice->next = bdb_tool_idl_free_list;
+                       bdb_tool_idl_free_list = ice;
+                       if ( rc ) {
+                               rc = -1;
+                               break;
+                       }
+               }
+       }
+       ch_free( ic );
+       curs->c_close( curs );
+       return rc;
+}
+
+static int
+bdb_tool_idl_flush( BackendDB *be )
+{
+       struct bdb_info *bdb = (struct bdb_info *) be->be_private;
+       DB *db;
+       Avlnode *root;
+       int i, rc = 0;
+
+       for ( i=BDB_NDB; i < bdb->bi_ndatabases; i++ ) {
+               db = bdb->bi_databases[i]->bdi_db;
+               if ( !db->app_private ) continue;
+               rc = avl_apply( db->app_private, bdb_tool_idl_flush_one, db, -1,
+                       AVL_INORDER );
+               avl_free( db->app_private, NULL );
+               db->app_private = NULL;
+               if ( rc == -1 ) break;
+               rc = 0;
+       }
+       if ( !rc )
+               bdb->bi_idl_cache_size = 0;
+       return rc;
+}
+
+int bdb_tool_idl_add(
+       BackendDB *be,
+       DB *db,
+       DB_TXN *txn,
+       DBT *key,
+       ID id )
+{
+       struct bdb_info *bdb = (struct bdb_info *) be->be_private;
+       bdb_tool_idl_cache *ic, itmp;
+       bdb_tool_idl_cache_entry *ice;
+       Avlnode *root;
+       int rc;
+
+       if ( !bdb->bi_idl_cache_max_size )
+               return bdb_idl_insert_key( be, db, txn, key, id );
+
+       root = db->app_private;
+
+       DBT2bv( key, &itmp.kstr );
+
+       ic = avl_find( root, &itmp, bdb_tool_idl_cmp );
+
+       /* No entry yet, create one */
+       if ( !ic ) {
+               DBC *curs;
+               DBT data;
+               ID nid;
+               int rc;
+
+               ic = ch_malloc( sizeof( bdb_tool_idl_cache ) + itmp.kstr.bv_len );
+               ic->kstr.bv_len = itmp.kstr.bv_len;
+               ic->kstr.bv_val = (char *)(ic+1);
+               AC_MEMCPY( ic->kstr.bv_val, itmp.kstr.bv_val, ic->kstr.bv_len );
+               ic->head = ic->tail = NULL;
+               ic->last = 0;
+               ic->count = 0;
+               avl_insert( &root, ic, bdb_tool_idl_cmp, avl_dup_error );
+               db->app_private = root;
+
+               /* load existing key count here */
+               rc = db->cursor( db, NULL, &curs, 0 );
+               if ( rc ) return rc;
+
+               data.ulen = sizeof( ID );
+               data.flags = DB_DBT_USERMEM;
+               data.data = &nid;
+               rc = curs->c_get( curs, key, &data, DB_SET );
+               if ( rc == 0 ) {
+                       if ( nid == 0 ) {
+                               ic->count = BDB_IDL_DB_SIZE+1;
+                       } else {
+                               db_recno_t count;
+
+                               curs->c_count( curs, &count, 0 );
+                               ic->count = count;
+                               BDB_DISK2ID( &nid, &ic->first );
+                       }
+               }
+               curs->c_close( curs );
+       }
+       /* are we a range already? */
+       if ( ic->count > BDB_IDL_DB_SIZE ) {
+               ic->last = id;
+               return 0;
+       /* Are we at the limit, and converting to a range? */
+       } else if ( ic->count == BDB_IDL_DB_SIZE ) {
+               for (ice = ic->head; ice; ice = ice->next ) {
+                       bdb->bi_idl_cache_size--;
+               }
+               ic->tail->next = bdb_tool_idl_free_list;
+               bdb_tool_idl_free_list = ic->head;
+               ic->head = ic->tail = NULL;
+               ic->last = id;
+               ic->count++;
+               return 0;
+       }
+       /* No free block, create that too */
+       if ( !ic->tail || ( ic->count & (IDBLOCK-1)) == 0) {
+               if ( bdb_tool_idl_free_list ) {
+                       ice = bdb_tool_idl_free_list;
+                       bdb_tool_idl_free_list = ice->next;
+               } else {
+                       ice = ch_malloc( sizeof( bdb_tool_idl_cache_entry ));
+               }
+               memset( ice, 0, sizeof( *ice ));
+               if ( !ic->head ) {
+                       ic->head = ice;
+               } else {
+                       ic->tail->next = ice;
+               }
+               ic->tail = ice;
+               bdb->bi_idl_cache_size++;
+               if ( !ic->count )
+                       ic->first = id;
+       }
+       ice = ic->tail;
+       ice->ids[ ic->count & (IDBLOCK-1) ] = id;
+       ic->count++;
+
+       rc = 0;
+       /* Check for flush */
+       if ( bdb->bi_idl_cache_size > bdb->bi_idl_cache_max_size ) {
+               rc = bdb_tool_idl_flush( be );
+       }
+       return rc;
+}