]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ldbm/idl.c
0afef2333351f3ab7b2472591a22d97dbbd90ebb
[openldap] / servers / slapd / back-ldbm / idl.c
1 /* idl.c - ldap id list handling routines */
2 /*
3  * Copyright 1998-1999 The OpenLDAP Foundation, All Rights Reserved.
4  * COPYING RESTRICTIONS APPLY, see COPYRIGHT file
5  */
6
7 #include "portable.h"
8
9 #include <stdio.h>
10
11 #include <ac/string.h>
12 #include <ac/socket.h>
13
14 #include "ldap_defaults.h"
15 #include "slap.h"
16 #include "back-ldbm.h"
17
18 static ID_BLOCK* idl_dup( ID_BLOCK *idl );
19
20 /* Allocate an ID_BLOCK with room for nids ids */
21 ID_BLOCK *
22 idl_alloc( unsigned int nids )
23 {
24         ID_BLOCK        *new;
25
26         /* nmax + nids + space for the ids */
27         new = (ID_BLOCK *) ch_calloc( (ID_BLOCK_IDS_OFFSET + nids), sizeof(ID) );
28         ID_BLOCK_NMAX(new) = nids;
29         ID_BLOCK_NIDS(new) = 0;
30
31         return( new );
32 }
33
34
35 /* Allocate an empty ALLIDS ID_BLOCK */
36 ID_BLOCK        *
37 idl_allids( Backend *be )
38 {
39         ID_BLOCK        *idl;
40
41         idl = idl_alloc( 0 );
42         ID_BLOCK_NMAX(idl) = ID_BLOCK_ALLIDS_VALUE;
43         ID_BLOCK_NIDS(idl) = next_id_get( be );
44
45         return( idl );
46 }
47
48
49 /* Free an ID_BLOCK */
50 void
51 idl_free( ID_BLOCK *idl )
52 {
53         if ( idl == NULL ) {
54                 Debug( LDAP_DEBUG_TRACE,
55                         "idl_free: called with NULL pointer\n",
56                         0, 0, 0 );
57                 return;
58         }
59
60         free( (char *) idl );
61 }
62
63
64 /* Fetch an single ID_BLOCK from the cache */
65 static ID_BLOCK *
66 idl_fetch_one(
67     Backend             *be,
68     DBCache     *db,
69     Datum               key
70 )
71 {
72         Datum   data;
73         ID_BLOCK        *idl;
74
75         ldbm_datum_init( data );
76
77         /* Debug( LDAP_DEBUG_TRACE, "=> idl_fetch_one\n", 0, 0, 0 ); */
78
79         data = ldbm_cache_fetch( db, key );
80
81         if( data.dptr == NULL ) {
82                 return NULL;
83         }
84
85         idl = idl_dup( (ID_BLOCK *) data.dptr);
86         ldbm_datum_free( db->dbc_db, data );
87
88         return( idl );
89 }
90
91
92 /* Fetch a set of ID_BLOCKs from the cache
93  *      if not INDIRECT
94  *              if block return is an ALLIDS block,
95  *                      return an new ALLIDS block
96  *              otherwise
97  *                      return block
98  *      construct super block from all blocks referenced by INDIRECT block
99  *      return super block
100  */
101 ID_BLOCK *
102 idl_fetch(
103     Backend             *be,
104     DBCache     *db,
105     Datum               key
106 )
107 {
108         Datum   data;
109         ID_BLOCK        *idl;
110         ID_BLOCK        **tmp;
111         char    *kstr;
112         int     i, nids;
113
114         idl = idl_fetch_one( be, db, key );
115
116         if ( idl == NULL ) {
117                 return NULL;
118         }
119
120         if ( ID_BLOCK_ALLIDS(idl) ) {
121                 /* all ids block */
122                 /* make sure we have the current value of highest id */
123                 idl_free( idl );
124                 idl = idl_allids( be );
125
126                 return( idl );
127         }
128
129         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
130                 /* regular block */
131                 return( idl );
132         }
133
134         /*
135          * this is an indirect block which points to other blocks.
136          * we need to read in all the blocks it points to and construct
137          * a big id list containing all the ids, which we will return.
138          */
139
140         /* count the number of blocks & allocate space for pointers to them */
141         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ )
142                 ;       /* NULL */
143         tmp = (ID_BLOCK **) ch_malloc( (i + 1) * sizeof(ID_BLOCK *) );
144
145         /* read in all the blocks */
146         kstr = (char *) ch_malloc( key.dsize + 20 );
147         nids = 0;
148         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ ) {
149                 ldbm_datum_init( data );
150
151                 sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr, ID_BLOCK_ID(idl, i) );
152                 data.dptr = kstr;
153                 data.dsize = strlen( kstr ) + 1;
154
155                 if ( (tmp[i] = idl_fetch_one( be, db, data )) == NULL ) {
156                         Debug( LDAP_DEBUG_ANY,
157                             "idl_fetch of (%s) returns NULL\n", data.dptr, 0, 0 );
158                         continue;
159                 }
160
161                 nids += ID_BLOCK_NIDS(tmp[i]);
162         }
163         tmp[i] = NULL;
164         free( kstr );
165         idl_free( idl );
166
167         /* allocate space for the big block */
168         idl = idl_alloc( nids );
169         ID_BLOCK_NIDS(idl) = nids;
170         nids = 0;
171
172         /* copy in all the ids from the component blocks */
173         for ( i = 0; tmp[i] != NULL; i++ ) {
174                 if ( tmp[i] == NULL ) {
175                         continue;
176                 }
177
178                 SAFEMEMCPY(
179                         (char *) &ID_BLOCK_ID(idl, nids),
180                         (char *) &ID_BLOCK_ID(tmp[i], 0),
181                         ID_BLOCK_NIDS(tmp[i]) * sizeof(ID) );
182                 nids += ID_BLOCK_NIDS(tmp[i]);
183
184                 idl_free( tmp[i] );
185         }
186         free( (char *) tmp );
187
188         Debug( LDAP_DEBUG_TRACE, "<= idl_fetch %ld ids (%ld max)\n",
189                ID_BLOCK_NIDS(idl), ID_BLOCK_NMAX(idl), 0 );
190         return( idl );
191 }
192
193
194 /* store a single block */
195 static int
196 idl_store(
197     Backend             *be,
198     DBCache     *db,
199     Datum               key, 
200     ID_BLOCK            *idl
201 )
202 {
203         int     rc, flags;
204         Datum   data;
205         struct ldbminfo *li = (struct ldbminfo *) be->be_private;
206
207         ldbm_datum_init( data );
208
209         /* Debug( LDAP_DEBUG_TRACE, "=> idl_store\n", 0, 0, 0 ); */
210
211         data.dptr = (char *) idl;
212         data.dsize = (ID_BLOCK_IDS_OFFSET + ID_BLOCK_NMAX(idl)) * sizeof(ID);
213         
214 #ifdef LDBM_DEBUG
215         Statslog( LDAP_DEBUG_STATS, "<= idl_store(): rc=%d\n",
216                 rc, 0, 0, 0, 0 );
217 #endif
218
219         flags = LDBM_REPLACE;
220         if( li->li_dbcachewsync ) flags |= LDBM_SYNC;
221         rc = ldbm_cache_store( db, key, data, flags );
222
223         /* Debug( LDAP_DEBUG_TRACE, "<= idl_store %d\n", rc, 0, 0 ); */
224         return( rc );
225 }
226
227 /* split the block at id 
228  *      locate ID greater than or equal to id.
229  */
230 static void
231 idl_split_block(
232     ID_BLOCK    *b,
233     ID          id,
234     ID_BLOCK    **right,
235     ID_BLOCK    **left
236 )
237 {
238         unsigned int    nr, nl;
239
240         /* find where to split the block *//* XXX linear search XXX */
241         for ( nr = 0; nr < ID_BLOCK_NIDS(b) && id > ID_BLOCK_ID(b, nr); nr++ )
242                 ;       /* NULL */
243
244         nl = ID_BLOCK_NIDS(b) - nr;
245
246         *right = idl_alloc( nr == 0 ? 1 : nr );
247         *left = idl_alloc( nl + (nr == 0 ? 0 : 1));
248
249         /*
250          * everything before the id being inserted in the first block
251          * unless there is nothing, in which case the id being inserted
252          * goes there.
253          */
254         if ( nr == 0 ) {
255                 ID_BLOCK_NIDS(*right) = 1;
256                 ID_BLOCK_ID(*right, 0) = id;
257         } else {
258                 SAFEMEMCPY(
259                         (char *) &ID_BLOCK_ID(*right, 0),
260                         (char *) &ID_BLOCK_ID(b, 0),
261                         nr * sizeof(ID) );
262                 ID_BLOCK_NIDS(*right) = nr;
263                 ID_BLOCK_ID(*left, 0) = id;
264         }
265
266         /* the id being inserted & everything after in the second block */
267         SAFEMEMCPY(
268                 (char *) &ID_BLOCK_ID(*left, (nr == 0 ? 0 : 1)),
269             (char *) &ID_BLOCK_ID(b, nr),
270                 nl * sizeof(ID) );
271         ID_BLOCK_NIDS(*left) = nl + (nr == 0 ? 0 : 1);
272 }
273
274
275 /*
276  * idl_change_first - called when an indirect block's first key has
277  * changed, meaning it needs to be stored under a new key, and the
278  * header block pointing to it needs updating.
279  */
280 static int
281 idl_change_first(
282     Backend             *be,
283     DBCache     *db,
284     Datum               hkey,           /* header block key     */
285     ID_BLOCK            *h,             /* header block         */
286     int                 pos,            /* pos in h to update   */
287     Datum               bkey,           /* data block key       */
288     ID_BLOCK            *b              /* data block           */
289 )
290 {
291         int     rc;
292
293         /* Debug( LDAP_DEBUG_TRACE, "=> idl_change_first\n", 0, 0, 0 ); */
294
295         /* delete old key block */
296         if ( (rc = ldbm_cache_delete( db, bkey )) != 0 ) {
297                 Debug( LDAP_DEBUG_ANY,
298                     "ldbm_delete of (%s) returns %d\n", bkey.dptr, rc,
299                     0 );
300                 return( rc );
301         }
302
303         /* write block with new key */
304         sprintf( bkey.dptr, "%c%s%ld", CONT_PREFIX, hkey.dptr, ID_BLOCK_ID(b, 0) );
305         bkey.dsize = strlen( bkey.dptr ) + 1;
306         if ( (rc = idl_store( be, db, bkey, b )) != 0 ) {
307                 Debug( LDAP_DEBUG_ANY,
308                     "idl_store of (%s) returns %d\n", bkey.dptr, rc, 0 );
309                 return( rc );
310         }
311
312         /* update + write indirect header block */
313         ID_BLOCK_ID(h, pos) = ID_BLOCK_ID(b, 0);
314         if ( (rc = idl_store( be, db, hkey, h )) != 0 ) {
315                 Debug( LDAP_DEBUG_ANY,
316                     "idl_store of (%s) returns %d\n", hkey.dptr, rc, 0 );
317                 return( rc );
318         }
319
320         return( 0 );
321 }
322
323
324 int
325 idl_insert_key(
326     Backend             *be,
327     DBCache     *db,
328     Datum               key,
329     ID                  id
330 )
331 {
332         int     i, j, first, rc;
333         ID_BLOCK        *idl, *tmp, *tmp2, *tmp3;
334         char    *kstr;
335         Datum   k2;
336
337         ldbm_datum_init( k2 );
338
339         if ( (idl = idl_fetch_one( be, db, key )) == NULL ) {
340 #ifdef LDBM_DEBUG
341                 Statslog( LDAP_DEBUG_STATS, "=> idl_insert_key(): no key yet\n",
342                         0, 0, 0, 0, 0 );
343 #endif
344
345                 idl = idl_alloc( 1 );
346                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)++) = id;
347                 rc = idl_store( be, db, key, idl );
348
349                 idl_free( idl );
350                 return( rc );
351         }
352
353         if ( ID_BLOCK_ALLIDS( idl ) ) {
354                 /* ALLIDS */
355                 idl_free( idl );
356                 return 0;
357         }
358
359         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
360                 /* regular block */
361                 switch ( idl_insert( &idl, id, db->dbc_maxids ) ) {
362                 case 0:         /* id inserted - store the updated block */
363                 case 1:
364                         rc = idl_store( be, db, key, idl );
365                         break;
366
367                 case 2:         /* id already there - nothing to do */
368                         rc = 0;
369                         break;
370
371                 case 3:         /* id not inserted - block must be split */
372                         /* check threshold for marking this an all-id block */
373                         if ( db->dbc_maxindirect < 2 ) {
374                                 idl_free( idl );
375                                 idl = idl_allids( be );
376                                 rc = idl_store( be, db, key, idl );
377                                 break;
378                         }
379
380                         idl_split_block( idl, id, &tmp, &tmp2 );
381                         idl_free( idl );
382
383                         /* create the header indirect block */
384                         idl = idl_alloc( 3 );
385                         ID_BLOCK_NMAX(idl) = 3;
386                         ID_BLOCK_NIDS(idl) = ID_BLOCK_INDIRECT_VALUE;
387                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
388                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
389                         ID_BLOCK_ID(idl, 2) = NOID;
390
391                         /* store it */
392                         rc = idl_store( be, db, key, idl );
393
394                         /* store the first id block */
395                         kstr = (char *) ch_malloc( key.dsize + 20 );
396                         sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr,
397                             ID_BLOCK_ID(tmp, 0) );
398                         k2.dptr = kstr;
399                         k2.dsize = strlen( kstr ) + 1;
400                         rc = idl_store( be, db, k2, tmp );
401
402                         /* store the second id block */
403                         sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr,
404                             ID_BLOCK_ID(tmp2, 0) );
405                         k2.dptr = kstr;
406                         k2.dsize = strlen( kstr ) + 1;
407                         rc = idl_store( be, db, k2, tmp2 );
408
409                         free( kstr );
410                         idl_free( tmp );
411                         idl_free( tmp2 );
412                         break;
413                 }
414
415                 idl_free( idl );
416                 return( rc );
417         }
418
419         /*
420          * this is an indirect block which points to other blocks.
421          * we need to read in the block into which the id should be
422          * inserted, then insert the id and store the block.  we might
423          * have to split the block if it is full, which means we also
424          * need to write a new "header" block.
425          */
426
427         /* select the block to try inserting into *//* XXX linear search XXX */
428         for ( i = 0; !ID_BLOCK_NOID(idl, i) && id > ID_BLOCK_ID(idl, i); i++ )
429                 ;       /* NULL */
430         if ( i != 0 ) {
431                 i--;
432                 first = 0;
433         } else {
434                 first = 1;
435         }
436
437         /* get the block */
438         kstr = (char *) ch_malloc( key.dsize + 20 );
439         sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr, ID_BLOCK_ID(idl, i) );
440         k2.dptr = kstr;
441         k2.dsize = strlen( kstr ) + 1;
442         if ( (tmp = idl_fetch_one( be, db, k2 )) == NULL ) {
443                 Debug( LDAP_DEBUG_ANY, "nonexistent continuation block (%s)\n",
444                     k2.dptr, 0, 0 );
445                 free( kstr );
446                 idl_free( idl );
447                 return( -1 );
448         }
449
450         /* insert the id */
451         switch ( idl_insert( &tmp, id, db->dbc_maxids ) ) {
452         case 0:         /* id inserted ok */
453                 if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
454                         Debug( LDAP_DEBUG_ANY,
455                             "idl_store of (%s) returns %d\n", k2.dptr, rc, 0 );
456                 }
457                 break;
458
459         case 1:         /* id inserted - first id in block has changed */
460                 /*
461                  * key for this block has changed, so we have to
462                  * write the block under the new key, delete the
463                  * old key block + update and write the indirect
464                  * header block.
465                  */
466
467                 rc = idl_change_first( be, db, key, idl, i, k2, tmp );
468                 break;
469
470         case 2:         /* id not inserted - already there */
471                 break;
472
473         case 3:         /* id not inserted - block is full */
474                 /*
475                  * first, see if it will fit in the next block,
476                  * without splitting, unless we're trying to insert
477                  * into the beginning of the first block.
478                  */
479
480                 /* is there a next block? */
481                 if ( !first && !ID_BLOCK_NOID(idl, i + 1) ) {
482                         /* read it in */
483                         sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr,
484                             ID_BLOCK_ID(idl, i + 1) );
485                         k2.dptr = kstr;
486                         k2.dsize = strlen( kstr ) + 1;
487                         if ( (tmp2 = idl_fetch_one( be, db, k2 )) == NULL ) {
488                                 Debug( LDAP_DEBUG_ANY,
489                                     "idl_fetch_one (%s) returns NULL\n",
490                                     k2.dptr, 0, 0 );
491                                 break;
492                         }
493
494                         switch ( (rc = idl_insert( &tmp2, id,
495                             db->dbc_maxids )) ) {
496                         case 1:         /* id inserted first in block */
497                                 rc = idl_change_first( be, db, key, idl,
498                                     i + 1, k2, tmp2 );
499                                 /* FALL */
500
501                         case 2:         /* id already there - how? */
502                         case 0:         /* id inserted */
503                                 if ( rc == 2 ) {
504                                         Debug( LDAP_DEBUG_ANY,
505                                             "id %ld already in next block\n",
506                                             id, 0, 0 );
507                                 }
508                                 free( kstr );
509                                 idl_free( tmp );
510                                 idl_free( tmp2 );
511                                 idl_free( idl );
512                                 return( 0 );
513
514                         case 3:         /* split the original block */
515                                 break;
516                         }
517                         idl_free( tmp2 );
518                 }
519
520                 /*
521                  * must split the block, write both new blocks + update
522                  * and write the indirect header block.
523                  */
524
525                 /* count how many indirect blocks *//* XXX linear count XXX */
526                 for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ )
527                         ;       /* NULL */
528
529                 /* check it against all-id thresholed */
530                 if ( j + 1 > db->dbc_maxindirect ) {
531                         /*
532                          * we've passed the all-id threshold, meaning
533                          * that this set of blocks should be replaced
534                          * by a single "all-id" block.  our job: delete
535                          * all the indirect blocks, and replace the header
536                          * block by an all-id block.
537                          */
538
539                         /* delete all indirect blocks */
540                         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) {
541                                 sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr,
542                                     ID_BLOCK_ID(idl, j) );
543                                 k2.dptr = kstr;
544                                 k2.dsize = strlen( kstr ) + 1;
545
546                                 rc = ldbm_cache_delete( db, k2 );
547                         }
548
549                         /* store allid block in place of header block */
550                         idl_free( idl );
551                         idl = idl_allids( be );
552                         rc = idl_store( be, db, key, idl );
553
554                         free( kstr );
555                         idl_free( idl );
556                         idl_free( tmp );
557                         return( rc );
558                 }
559
560                 idl_split_block( tmp, id, &tmp2, &tmp3 );
561                 idl_free( tmp );
562
563                 /* create a new updated indirect header block */
564                 tmp = idl_alloc( ID_BLOCK_NMAX(idl) + 1 );
565                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_INDIRECT_VALUE;
566                 /* everything up to the split block */
567                 SAFEMEMCPY(
568                         (char *) &ID_BLOCK_ID(tmp, 0),
569                         (char *) &ID_BLOCK_ID(idl, 0),
570                     i * sizeof(ID) );
571                 /* the two new blocks */
572                 ID_BLOCK_ID(tmp, i) = ID_BLOCK_ID(tmp2, 0);
573                 ID_BLOCK_ID(tmp, i + 1) = ID_BLOCK_ID(tmp3, 0);
574                 /* everything after the split block */
575                 SAFEMEMCPY(
576                         (char *) &ID_BLOCK_ID(tmp, i + 2),
577                         (char *) &ID_BLOCK_ID(idl, i + 1),
578                         (ID_BLOCK_NMAX(idl) - i - 1) * sizeof(ID) );
579
580                 /* store the header block */
581                 rc = idl_store( be, db, key, tmp );
582
583                 /* store the first id block */
584                 sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr,
585                     ID_BLOCK_ID(tmp2, 0) );
586                 k2.dptr = kstr;
587                 k2.dsize = strlen( kstr ) + 1;
588                 rc = idl_store( be, db, k2, tmp2 );
589
590                 /* store the second id block */
591                 sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr,
592                     ID_BLOCK_ID(tmp3, 0) );
593                 k2.dptr = kstr;
594                 k2.dsize = strlen( kstr ) + 1;
595                 rc = idl_store( be, db, k2, tmp3 );
596
597                 idl_free( tmp2 );
598                 idl_free( tmp3 );
599                 break;
600         }
601
602         free( kstr );
603         idl_free( tmp );
604         idl_free( idl );
605         return( rc );
606 }
607
608
609 /*
610  * idl_insert - insert an id into an id list.
611  *
612  *      returns
613  *              0       id inserted
614  *              1       id inserted, first id in block has changed
615  *              2       id not inserted, already there
616  *              3       id not inserted, block must be split
617  */
618 int
619 idl_insert( ID_BLOCK **idl, ID id, unsigned int maxids )
620 {
621         unsigned int    i, j;
622
623         if ( ID_BLOCK_ALLIDS( *idl ) ) {
624                 return( 2 );    /* already there */
625         }
626
627         /* is it already there? *//* XXX linear search XXX */
628         for ( i = 0; i < ID_BLOCK_NIDS(*idl) && id > ID_BLOCK_ID(*idl, i); i++ ) {
629                 ;       /* NULL */
630         }
631         if ( i < ID_BLOCK_NIDS(*idl) && ID_BLOCK_ID(*idl, i) == id ) {
632                 return( 2 );    /* already there */
633         }
634
635         /* do we need to make room for it? */
636         if ( ID_BLOCK_NIDS(*idl) == ID_BLOCK_NMAX(*idl) ) {
637                 /* make room or indicate block needs splitting */
638                 if ( ID_BLOCK_NMAX(*idl) >= maxids ) {
639                         return( 3 );    /* block needs splitting */
640                 }
641
642                 ID_BLOCK_NMAX(*idl) *= 2;
643                 if ( ID_BLOCK_NMAX(*idl) > maxids ) {
644                         ID_BLOCK_NMAX(*idl) = maxids;
645                 }
646                 *idl = (ID_BLOCK *) ch_realloc( (char *) *idl,
647                     (ID_BLOCK_NMAX(*idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
648         }
649
650         /* make a slot for the new id *//* XXX bubble move XXX */
651 #define BUBBLE 1
652 #ifdef BUBBLE
653         for ( j = ID_BLOCK_NIDS(*idl); j != i; j-- ) {
654                 ID_BLOCK_ID(*idl, j) = ID_BLOCK_ID(*idl, j-1);
655         }
656 #else
657         SAFEMEMCPY( &ID_BLOCK_ID(*idl, i), &ID_BLOCK_ID(*idl, i+1), 
658                 ID_BLOCK_NIDS(*idl) - i );
659 #endif
660
661         ID_BLOCK_ID(*idl, i) = id;
662         ID_BLOCK_NIDS(*idl)++;
663         (void) memset(
664                 (char *) &ID_BLOCK_ID((*idl), ID_BLOCK_NIDS(*idl)),
665                 '\0',
666             (ID_BLOCK_NMAX(*idl) - ID_BLOCK_NIDS(*idl)) * sizeof(ID) );
667
668         return( i == 0 ? 1 : 0 );       /* inserted - first id changed or not */
669 }
670
671
672 int
673 idl_delete_key (
674         Backend         *be,
675         DBCache  *db,
676         Datum           key,
677         ID              id
678 )
679 {
680         Datum  data;
681         ID_BLOCK *idl, *tmp;
682         unsigned i;
683         int j, nids;
684         char    *kstr;
685
686         if ( (idl = idl_fetch_one( be, db, key ) ) == NULL )
687         {
688                 /* It wasn't found.  Hmm... */
689                 return -1;
690         }
691
692         if ( ID_BLOCK_ALLIDS( idl ) ) {
693                 idl_free( idl );
694                 return 0;
695         }
696
697         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
698                 for ( i=0; i < ID_BLOCK_NIDS(idl); i++ ) {
699                         if ( ID_BLOCK_ID(idl, i) == id ) {
700                                 if( --ID_BLOCK_NIDS(idl) == 0 ) {
701                                         ldbm_cache_delete( db, key );
702
703                                 } else {
704                                         SAFEMEMCPY (
705                                                 &ID_BLOCK_ID(idl, i),
706                                                 &ID_BLOCK_ID(idl, i+1),
707                                                 (ID_BLOCK_NIDS(idl)-i) * sizeof(ID) );
708
709                                         ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)) = NOID;
710
711                                         idl_store( be, db, key, idl );
712                                 }
713
714                                 idl_free( idl );
715                                 return 0;
716                         }
717                         /*  We didn't find the ID.  Hmmm... */
718                 }
719                 idl_free( idl );
720                 return -1;
721         }
722         
723         /* We have to go through an indirect block and find the ID
724            in the list of IDL's
725            */
726         for ( nids = 0; !ID_BLOCK_NOID(idl, nids); nids++ )
727                 ;       /* NULL */
728         kstr = (char *) ch_malloc( key.dsize + 20 );
729
730         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) 
731         {
732                 ldbm_datum_init( data );
733                 sprintf( kstr, "%c%s%ld", CONT_PREFIX, key.dptr, ID_BLOCK_ID(idl, j) );
734                 data.dptr = kstr;
735                 data.dsize = strlen( kstr ) + 1;
736
737                 if ( (tmp = idl_fetch_one( be, db, data )) == NULL ) {
738                         Debug( LDAP_DEBUG_ANY,
739                             "idl_fetch of (%s) returns NULL\n", data.dptr, 0, 0 );
740                         continue;
741                 }
742                 /*
743                    Now try to find the ID in tmp
744                 */
745                 for ( i=0; i < ID_BLOCK_NIDS(tmp); i++ )
746                 {
747                         if ( ID_BLOCK_ID(tmp, i) == id )
748                         {
749                                 SAFEMEMCPY(
750                                         &ID_BLOCK_ID(tmp, i),
751                                         &ID_BLOCK_ID(tmp, i+1),
752                                         (ID_BLOCK_NIDS(tmp)-(i+1)) * sizeof(ID));
753                                 ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp)-1 ) = NOID;
754                                 ID_BLOCK_NIDS(tmp)--;
755
756                                 if ( ID_BLOCK_NIDS(tmp) ) {
757                                         idl_store ( be, db, data, tmp );
758
759                                 } else {
760                                         ldbm_cache_delete( db, data );
761                                         SAFEMEMCPY(
762                                                 &ID_BLOCK_ID(idl, j),
763                                                 &ID_BLOCK_ID(idl, j+1),
764                                                 (nids-(j+1)) * sizeof(ID));
765                                         ID_BLOCK_ID(idl, nids-1) = NOID;
766                                         nids--;
767                                         if ( ! nids )
768                                                 ldbm_cache_delete( db, key );
769                                         else
770                                                 idl_store( be, db, key, idl );
771                                 }
772                                 idl_free( tmp );
773                                 free( kstr );
774                                 idl_free( idl );
775                                 return 0;
776                         }
777                 }
778                 idl_free( tmp );
779         }
780         free( kstr );
781         idl_free( idl );
782         return -1;
783 }
784
785
786 /* return a duplicate of a single ID_BLOCK */
787 static ID_BLOCK *
788 idl_dup( ID_BLOCK *idl )
789 {
790         ID_BLOCK        *new;
791
792         if ( idl == NULL ) {
793                 return( NULL );
794         }
795
796         new = idl_alloc( ID_BLOCK_NMAX(idl) );
797
798         SAFEMEMCPY(
799                 (char *) new,
800                 (char *) idl,
801                 (ID_BLOCK_NMAX(idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
802
803         return( new );
804 }
805
806
807 /* return the smaller ID_BLOCK */
808 static ID_BLOCK *
809 idl_min( ID_BLOCK *a, ID_BLOCK *b )
810 {
811         return( ID_BLOCK_NIDS(a) > ID_BLOCK_NIDS(b) ? b : a );
812 }
813
814
815 /*
816  * idl_intersection - return a intersection b
817  */
818 ID_BLOCK *
819 idl_intersection(
820     Backend     *be,
821     ID_BLOCK    *a,
822     ID_BLOCK    *b
823 )
824 {
825         unsigned int    ai, bi, ni;
826         ID_BLOCK                *n;
827
828         if ( a == NULL || b == NULL ) {
829                 return( NULL );
830         }
831         if ( ID_BLOCK_ALLIDS( a ) ) {
832                 return( idl_dup( b ) );
833         }
834         if ( ID_BLOCK_ALLIDS( b ) ) {
835                 return( idl_dup( a ) );
836         }
837
838         n = idl_dup( idl_min( a, b ) );
839
840         for ( ni = 0, ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
841                 for ( ;
842                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
843                         bi++ )
844                 {
845                         ;       /* NULL */
846                 }
847
848                 if ( bi == ID_BLOCK_NIDS(b) ) {
849                         break;
850                 }
851
852                 if ( ID_BLOCK_ID(b, bi) == ID_BLOCK_ID(a, ai) ) {
853                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
854                 }
855         }
856
857         if ( ni == 0 ) {
858                 idl_free( n );
859                 return( NULL );
860         }
861         ID_BLOCK_NIDS(n) = ni;
862
863         return( n );
864 }
865
866
867 /*
868  * idl_union - return a union b
869  */
870 ID_BLOCK *
871 idl_union(
872     Backend     *be,
873     ID_BLOCK    *a,
874     ID_BLOCK    *b
875 )
876 {
877         unsigned int    ai, bi, ni;
878         ID_BLOCK                *n;
879
880         if ( a == NULL ) {
881                 return( idl_dup( b ) );
882         }
883         if ( b == NULL ) {
884                 return( idl_dup( a ) );
885         }
886         if ( ID_BLOCK_ALLIDS( a ) || ID_BLOCK_ALLIDS( b ) ) {
887                 return( idl_allids( be ) );
888         }
889
890         if ( ID_BLOCK_NIDS(b) < ID_BLOCK_NIDS(a) ) {
891                 n = a;
892                 a = b;
893                 b = n;
894         }
895
896         n = idl_alloc( ID_BLOCK_NIDS(a) + ID_BLOCK_NIDS(b) );
897
898         for ( ni = 0, ai = 0, bi = 0;
899                 ai < ID_BLOCK_NIDS(a) && bi < ID_BLOCK_NIDS(b);
900                 )
901         {
902                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
903                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai++);
904
905                 } else if ( ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai) ) {
906                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi++);
907
908                 } else {
909                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
910                         ai++, bi++;
911                 }
912         }
913
914         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
915                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
916         }
917         for ( ; bi < ID_BLOCK_NIDS(b); bi++ ) {
918                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi);
919         }
920         ID_BLOCK_NIDS(n) = ni;
921
922         return( n );
923 }
924
925
926 /*
927  * idl_notin - return a intersection ~b (or a minus b)
928  */
929 ID_BLOCK *
930 idl_notin(
931     Backend     *be,
932     ID_BLOCK    *a,
933     ID_BLOCK    *b
934 )
935 {
936         unsigned int    ni, ai, bi;
937         ID_BLOCK                *n;
938
939         if ( a == NULL ) {
940                 return( NULL );
941         }
942         if ( b == NULL || ID_BLOCK_ALLIDS( b )) {
943                 return( idl_dup( a ) );
944         }
945
946         if ( ID_BLOCK_ALLIDS( a ) ) {
947                 n = idl_alloc( SLAPD_LDBM_MIN_MAXIDS );
948                 ni = 0;
949
950                 for ( ai = 1, bi = 0;
951                         ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAX(n) && bi < ID_BLOCK_NMAX(b);
952                         ai++ )
953                 {
954                         if ( ID_BLOCK_ID(b, bi) == ai ) {
955                                 bi++;
956                         } else {
957                                 ID_BLOCK_ID(n, ni++) = ai;
958                         }
959                 }
960
961                 for ( ; ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAX(n); ai++ ) {
962                         ID_BLOCK_ID(n, ni++) = ai;
963                 }
964
965                 if ( ni == ID_BLOCK_NMAX(n) ) {
966                         idl_free( n );
967                         return( idl_allids( be ) );
968                 } else {
969                         ID_BLOCK_NIDS(n) = ni;
970                         return( n );
971                 }
972         }
973
974         n = idl_dup( a );
975
976         ni = 0;
977         for ( ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
978                 for ( ;
979                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
980                     bi++ )
981                 {
982                         ;       /* NULL */
983                 }
984
985                 if ( bi == ID_BLOCK_NIDS(b) ) {
986                         break;
987                 }
988
989                 if ( ID_BLOCK_ID(b, bi) != ID_BLOCK_ID(a, ai) ) {
990                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
991                 }
992         }
993
994         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
995                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
996         }
997         ID_BLOCK_NIDS(n) = ni;
998
999         return( n );
1000 }
1001
1002 /*      return the first ID in the block
1003  *      if ALLIDS block
1004  *              NIDS > 1 return 1
1005  *              otherwise return NOID 
1006  *      otherwise return first ID
1007  */         
1008 ID
1009 idl_firstid( ID_BLOCK *idl )
1010 {
1011         if ( idl == NULL || ID_BLOCK_NIDS(idl) == 0 ) {
1012                 return( NOID );
1013         }
1014
1015         if ( ID_BLOCK_ALLIDS( idl ) ) {
1016                 return( ID_BLOCK_NIDS(idl) > 1 ? 1 : NOID );
1017         }
1018
1019         return( ID_BLOCK_ID(idl, 0) );
1020 }
1021
1022 /*      return next ID after id
1023  *      if ALLIDS block, increment id. 
1024  *              if id < NIDS return id
1025  *              otherwise NOID.
1026  *      otherwise SEARCH for next id (ugh!)
1027  */ 
1028 ID
1029 idl_nextid( ID_BLOCK *idl, ID id )
1030 {
1031         unsigned int    i;
1032
1033         if ( ID_BLOCK_ALLIDS( idl ) ) {
1034                 return( ++id < ID_BLOCK_NIDS(idl) ? id : NOID );
1035         }
1036
1037         for ( i = 0; i < ID_BLOCK_NIDS(idl) && ID_BLOCK_ID(idl, i) <= id; i++ ) {
1038                 ;       /* NULL */
1039         }
1040
1041         if ( i >= ID_BLOCK_NIDS(idl) ) {
1042                 return( NOID );
1043         } else {
1044                 return( ID_BLOCK_ID(idl, i) );
1045         }
1046 }