]> git.sur5r.net Git - openldap/commitdiff
Use IDL caching in tool/quick mode
authorHoward Chu <hyc@openldap.org>
Mon, 3 Oct 2011 02:31:18 +0000 (19:31 -0700)
committerHoward Chu <hyc@openldap.org>
Mon, 3 Oct 2011 02:38:58 +0000 (19:38 -0700)
servers/slapd/back-mdb/index.c
servers/slapd/back-mdb/proto-mdb.h
servers/slapd/back-mdb/tools.c

index 58366c88df109e80a656355d9cd3faa28d808403..8d361bede1df1af6d27c1372fc7af9fc6cf1d4d1 100644 (file)
@@ -184,9 +184,12 @@ static int indexer(
        rc = mdb_cursor_open( txn, dbi, &mc );
        if ( rc ) goto done;
 
-       if ( opid == SLAP_INDEX_ADD_OP )
-               keyfunc = mdb_idl_insert_keys;
-       else
+       if ( opid == SLAP_INDEX_ADD_OP ) {
+               if ( slapMode & SLAP_TOOL_QUICK )
+                       keyfunc = mdb_tool_idl_add;
+               else
+                       keyfunc = mdb_idl_insert_keys;
+       } else
                keyfunc = mdb_idl_delete_keys;
 
        if( IS_SLAP_INDEX( mask, SLAP_INDEX_PRESENT ) ) {
index 28f74210d1cf6e892dc111870414671c0d712722..3c2e255b253ec74c34a35b9648524b72e17b083f 100644 (file)
@@ -359,6 +359,11 @@ extern BI_tool_entry_reindex               mdb_tool_entry_reindex;
 extern BI_tool_dn2id_get               mdb_tool_dn2id_get;
 extern BI_tool_entry_modify            mdb_tool_entry_modify;
 
+int mdb_tool_idl_add(
+       MDB_cursor *mc,
+       struct berval *keys,
+       ID id );
+
 LDAP_END_DECL
 
 #endif /* _PROTO_MDB_H */
index 08dc01d9affd5e1e2d72e10ff38689a97ffcd87a..dfc956543471b78be22b751896c1b727bb505e12 100644 (file)
 #include "back-mdb.h"
 #include "idl.h"
 
+static int mdb_tool_idl_flush( BackendDB *be, MDB_txn *txn );
+
+#define        IDBLOCK 1024
+
+typedef struct mdb_tool_idl_cache_entry {
+       struct mdb_tool_idl_cache_entry *next;
+       ID ids[IDBLOCK];
+} mdb_tool_idl_cache_entry;
+
+typedef struct mdb_tool_idl_cache {
+       struct berval kstr;
+       mdb_tool_idl_cache_entry *head, *tail;
+       ID first, last;
+       int count;
+} mdb_tool_idl_cache;
+
+static mdb_tool_idl_cache_entry *mdb_tool_idl_free_list;
+static Avlnode *mdb_tool_roots[MDB_INDICES];
+
 static MDB_txn *txn = NULL, *txi = NULL;
 static MDB_cursor *cursor = NULL, *idcursor = NULL;
 static MDB_val key, data;
@@ -138,6 +157,7 @@ int mdb_tool_entry_close(
                mdb_cursor_close( cursor );
                cursor = NULL;
        }
+       mdb_tool_idl_flush( be, txn );
        if( txn ) {
                if ( mdb_txn_commit( txn ))
                        return -1;
@@ -594,6 +614,7 @@ done:
        if( rc == 0 ) {
                mdb_writes++;
                if ( mdb_writes >= mdb_writes_per_commit ) {
+                       mdb_tool_idl_flush( be, txn );
                        rc = mdb_txn_commit( txn );
                        mdb_writes = 0;
                        txn = NULL;
@@ -748,6 +769,7 @@ done:
        if( rc == 0 ) {
                mdb_writes++;
                if ( mdb_writes >= mdb_writes_per_commit ) {
+                       mdb_tool_idl_flush( be, txi );
                        rc = mdb_txn_commit( txi );
                        if( rc != 0 ) {
                                Debug( LDAP_DEBUG_ANY,
@@ -887,3 +909,244 @@ mdb_tool_index_task( void *ctx, void *ptr )
        return NULL;
 }
 #endif
+
+static int
+mdb_tool_idl_cmp( const void *v1, const void *v2 )
+{
+       const mdb_tool_idl_cache *c1 = v1, *c2 = v2;
+       int rc;
+
+       if (( rc = c1->kstr.bv_len - c2->kstr.bv_len )) return rc;
+       return memcmp( c1->kstr.bv_val, c2->kstr.bv_val, c1->kstr.bv_len );
+}
+
+static int
+mdb_tool_idl_flush_one( MDB_cursor *mc, mdb_tool_idl_cache *ic )
+{
+       mdb_tool_idl_cache_entry *ice;
+       MDB_val key, data;
+       int i, rc;
+       ID id, nid;
+
+       /* Freshly allocated, ignore it */
+       if ( !ic->head && ic->count <= MDB_IDL_DB_SIZE ) {
+               return 0;
+       }
+
+       key.mv_data = ic->kstr.bv_val;
+       key.mv_size = ic->kstr.bv_len;
+
+       rc = mdb_cursor_get( mc, &key, &data, MDB_SET );
+       /* If key already exists and we're writing a range... */
+       if ( rc == 0 && ic->count > MDB_IDL_DB_SIZE ) {
+               nid = *(ID *)data.mv_data;
+               /* If it's not currently a range, must delete old info */
+               if ( nid ) {
+                       ic->first = nid;
+                       mdb_cursor_del( mc, MDB_NODUPDATA );
+
+                       goto setrange;
+               } else {
+                       /* Skip lo */
+                       rc = mdb_cursor_get( mc, &key, &data, MDB_NEXT_DUP );
+
+                       /* Get hi */
+                       rc = mdb_cursor_get( mc, &key, &data, MDB_NEXT_DUP );
+
+                       /* Store range hi */
+                       data.mv_data = &ic->last;
+                       rc = mdb_cursor_put( mc, &key, &data, MDB_CURRENT );
+               }
+               rc = 0;
+       } else if ( rc && rc != MDB_NOTFOUND ) {
+               rc = -1;
+       } else if ( ic->count > MDB_IDL_DB_SIZE ) {
+               /* range, didn't exist before */
+setrange:
+               nid = 0;
+               data.mv_size = sizeof(ID);
+               data.mv_data = &nid;
+               rc = mdb_cursor_put( mc, &key, &data, 0 );
+               if ( rc == 0 ) {
+                       data.mv_data = &ic->first;
+                       rc = mdb_cursor_put( mc, &key, &data, 0 );
+                       if ( rc == 0 ) {
+                               data.mv_data = &ic->last;
+                               rc = mdb_cursor_put( mc, &key, &data, 0 );
+                       }
+               }
+               if ( rc ) {
+                       rc = -1;
+               }
+       } else {
+               int n;
+
+               data.mv_size = sizeof(ID);
+               /* Just a normal write */
+               rc = 0;
+               for ( ice = ic->head, n=0; ice; ice = ice->next, n++ ) {
+                       int end;
+                       if ( ice->next ) {
+                               end = IDBLOCK;
+                       } else {
+                               end = ic->count & (IDBLOCK-1);
+                               if ( !end )
+                                       end = IDBLOCK;
+                       }
+                       for ( i=0; i<end; i++ ) {
+                               if ( !ice->ids[i] ) continue;
+                               data.mv_data = &ice->ids[i];
+                               rc = mdb_cursor_put( mc, &key, &data, MDB_NODUPDATA );
+                               if ( rc ) {
+                                       if ( rc == MDB_KEYEXIST ) {
+                                               rc = 0;
+                                               continue;
+                                       }
+                                       rc = -1;
+                                       break;
+                               }
+                       }
+                       if ( rc ) {
+                               rc = -1;
+                               break;
+                       }
+               }
+               if ( ic->head ) {
+                       ic->tail->next = mdb_tool_idl_free_list;
+                       mdb_tool_idl_free_list = ic->head;
+               }
+       }
+       ch_free( ic );
+       return rc;
+}
+
+static int
+mdb_tool_idl_flush_db( MDB_txn *txn, MDB_dbi dbi, Avlnode *root )
+{
+       MDB_cursor *mc;
+       int rc;
+
+       mdb_cursor_open( txn, dbi, &mc );
+       root = tavl_end( root, TAVL_DIR_LEFT );
+       do {
+               rc = mdb_tool_idl_flush_one( mc, root->avl_data );
+               if ( rc != -1 )
+                       rc = 0;
+       } while ((root = tavl_next(root, TAVL_DIR_RIGHT)));
+       mdb_cursor_close( mc );
+
+       return rc;
+}
+
+static int
+mdb_tool_idl_flush( BackendDB *be, MDB_txn *txn )
+{
+       struct mdb_info *mdb = (struct mdb_info *) be->be_private;
+       int rc = 0;
+       unsigned int i;
+
+       for ( i=MDB_NDB; i < mdb->mi_nattrs+MDB_NDB; i++ ) {
+               if ( !mdb_tool_roots[i] ) continue;
+               rc = mdb_tool_idl_flush_db( txn, i, mdb_tool_roots[i] );
+               tavl_free(mdb_tool_roots[i], NULL);
+               mdb_tool_roots[i] = NULL;
+               if ( rc )
+                       break;
+       }
+       return rc;
+}
+
+int mdb_tool_idl_add(
+       MDB_cursor *mc,
+       struct berval *keys,
+       ID id )
+{
+       MDB_dbi dbi;
+       mdb_tool_idl_cache *ic, itmp;
+       mdb_tool_idl_cache_entry *ice;
+       int i, rc;
+
+       dbi = mdb_cursor_dbi(mc);
+       for (i=0; keys[i].bv_val; i++) {
+       itmp.kstr = keys[i];
+       ic = tavl_find( (Avlnode *)mdb_tool_roots[dbi], &itmp, mdb_tool_idl_cmp );
+
+       /* No entry yet, create one */
+       if ( !ic ) {
+               MDB_val key, data;
+               ID nid;
+               int rc;
+
+               ic = ch_malloc( sizeof( mdb_tool_idl_cache ) + itmp.kstr.bv_len );
+               ic->kstr.bv_len = itmp.kstr.bv_len;
+               ic->kstr.bv_val = (char *)(ic+1);
+               memcpy( ic->kstr.bv_val, itmp.kstr.bv_val, ic->kstr.bv_len );
+               ic->head = ic->tail = NULL;
+               ic->last = 0;
+               ic->count = 0;
+               tavl_insert( (Avlnode **)&mdb_tool_roots[dbi], ic, mdb_tool_idl_cmp,
+                       avl_dup_error );
+
+               /* load existing key count here */
+               key.mv_size = keys[i].bv_len;
+               key.mv_data = keys[i].bv_val;
+               data.mv_size = sizeof( ID );
+               data.mv_data = &nid;
+               rc = mdb_cursor_get( mc, &key, &data, MDB_SET );
+               if ( rc == 0 ) {
+                       if ( nid == 0 ) {
+                               ic->count = MDB_IDL_DB_SIZE+1;
+                       } else {
+                               size_t count;
+
+                               mdb_cursor_count( mc, &count );
+                               ic->count = count;
+                               ic->first = nid;
+                       }
+               }
+       }
+       /* are we a range already? */
+       if ( ic->count > MDB_IDL_DB_SIZE ) {
+               ic->last = id;
+               continue;
+       /* Are we at the limit, and converting to a range? */
+       } else if ( ic->count == MDB_IDL_DB_SIZE ) {
+               int n;
+               for ( ice = ic->head, n=0; ice; ice = ice->next, n++ )
+                       /* counting */ ;
+               if ( n ) {
+                       ic->tail->next = mdb_tool_idl_free_list;
+                       mdb_tool_idl_free_list = ic->head;
+               }
+               ic->head = ic->tail = NULL;
+               ic->last = id;
+               ic->count++;
+               continue;
+       }
+       /* No free block, create that too */
+       if ( !ic->tail || ( ic->count & (IDBLOCK-1)) == 0) {
+               ice = NULL;
+               if ( mdb_tool_idl_free_list ) {
+                       ice = mdb_tool_idl_free_list;
+                       mdb_tool_idl_free_list = ice->next;
+               }
+               if ( !ice ) {
+                       ice = ch_malloc( sizeof( mdb_tool_idl_cache_entry ));
+               }
+               memset( ice, 0, sizeof( *ice ));
+               if ( !ic->head ) {
+                       ic->head = ice;
+               } else {
+                       ic->tail->next = ice;
+               }
+               ic->tail = ice;
+               if ( !ic->count )
+                       ic->first = id;
+       }
+       ice = ic->tail;
+       ice->ids[ ic->count & (IDBLOCK-1) ] = id;
+       ic->count++;
+       }
+
+       return 0;
+}