]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ldbm/idl.c
eae6e3db81f25415b4e4a5dd7ebfbd5800c7009a
[openldap] / servers / slapd / back-ldbm / idl.c
1 /* idl.c - ldap id list handling routines */
2 /* $OpenLDAP$ */
3 /*
4  * Copyright 1998-2002 The OpenLDAP Foundation, All Rights Reserved.
5  * COPYING RESTRICTIONS APPLY, see COPYRIGHT file
6  */
7
8 #include "portable.h"
9
10 #include <stdio.h>
11
12 #include <ac/string.h>
13 #include <ac/socket.h>
14
15 #include "slap.h"
16 #include "back-ldbm.h"
17
18 static ID_BLOCK* idl_dup( ID_BLOCK *idl );
19
20 static void cont_alloc( Datum *cont, Datum *key )
21 {
22         ldbm_datum_init( *cont );
23         cont->dsize = 1 + sizeof(ID) + key->dsize;
24         cont->dptr = ch_malloc( cont->dsize );
25
26         * (unsigned char *) cont->dptr = SLAP_INDEX_CONT_PREFIX;
27
28         AC_MEMCPY( &((unsigned char *)cont->dptr)[1 + sizeof(ID)],
29                 key->dptr, key->dsize );
30 }
31
32 static void cont_id( Datum *cont, ID id )
33 {
34         unsigned int i;
35
36         for( i=1; i <= sizeof(id); i++) {
37                 ((unsigned char *)cont->dptr)[i] = (unsigned char)(id & 0xFF);
38                 id >>= 8;
39         }
40
41 }
42
43 static void cont_free( Datum *cont )
44 {
45         ch_free( cont->dptr );
46 }
47
48 #ifdef LDBM_DEBUG_IDL
49 static void idl_check(ID_BLOCK *idl)
50 {
51         int i, max;
52         ID_BLOCK last;
53
54         if( ID_BLOCK_ALLIDS(idl) )
55         {
56                 return;
57         }
58 #ifndef USE_INDIRECT_NIDS
59         if( ID_BLOCK_INDIRECT(idl) )
60         {
61                 for ( max = 0; !ID_BLOCK_NOID(idl, max); max++ ) ;
62         } else
63 #endif
64         {
65                 max = ID_BLOCK_NIDS(idl);
66         }
67         if ( max <= 1 )
68         {
69                 return;
70         }
71
72         for( last = ID_BLOCK_ID(idl, 0), i = 1;
73                 i < max;
74                 last = ID_BLOCK_ID(idl, i), i++ )
75         {
76                 assert (last < ID_BLOCK_ID(idl, i) );
77         }
78 }
79 #endif
80
81 /* Allocate an ID_BLOCK with room for nids ids */
82 ID_BLOCK *
83 idl_alloc( unsigned int nids )
84 {
85         ID_BLOCK        *new;
86
87         /* nmax + nids + space for the ids */
88         new = (ID_BLOCK *) ch_calloc( (ID_BLOCK_IDS_OFFSET + nids), sizeof(ID) );
89         ID_BLOCK_NMAX(new) = nids;
90         ID_BLOCK_NIDS(new) = 0;
91
92         return( new );
93 }
94
95
96 /* Allocate an empty ALLIDS ID_BLOCK */
97 ID_BLOCK        *
98 idl_allids( Backend *be )
99 {
100         ID_BLOCK        *idl;
101         ID              id;
102
103         idl = idl_alloc( 0 );
104         ID_BLOCK_NMAX(idl) = ID_BLOCK_ALLIDS_VALUE;
105         if ( next_id_get( be, &id ) ) {
106                 return NULL;
107         }
108         ID_BLOCK_NIDS(idl) = id;
109
110         return( idl );
111 }
112
113 /* Free an ID_BLOCK */
114 void
115 idl_free( ID_BLOCK *idl )
116 {
117         if ( idl == NULL ) {
118 #ifdef NEW_LOGGING
119                 LDAP_LOG( INDEX, INFO, "idl_free: called with NULL pointer\n" , 0,0,0);
120 #else
121                 Debug( LDAP_DEBUG_TRACE,
122                         "idl_free: called with NULL pointer\n",
123                         0, 0, 0 );
124 #endif
125
126                 return;
127         }
128
129         free( (char *) idl );
130 }
131
132
133 /* Fetch an single ID_BLOCK from the cache */
134 static ID_BLOCK *
135 idl_fetch_one(
136     Backend             *be,
137     DBCache     *db,
138     Datum               key
139 )
140 {
141         Datum   data;
142         ID_BLOCK        *idl;
143
144         /* Debug( LDAP_DEBUG_TRACE, "=> idl_fetch_one\n", 0, 0, 0 ); */
145
146         data = ldbm_cache_fetch( db, key );
147
148         if( data.dptr == NULL ) {
149                 return NULL;
150         }
151
152         idl = (ID_BLOCK *) data.dptr;
153         if ( ID_BLOCK_ALLIDS(idl) ) {
154                 /* make sure we have the current value of highest id */
155                 idl = idl_allids( be );
156         } else {
157                 idl = idl_dup((ID_BLOCK *) data.dptr);
158         }
159
160         ldbm_datum_free( db->dbc_db, data );
161
162         return idl;
163 }
164
165
166 /* Fetch a set of ID_BLOCKs from the cache
167  *      if not INDIRECT
168  *              if block return is an ALLIDS block,
169  *                      return an new ALLIDS block
170  *              otherwise
171  *                      return block
172  *      construct super block from all blocks referenced by INDIRECT block
173  *      return super block
174  */
175 ID_BLOCK *
176 idl_fetch(
177     Backend             *be,
178     DBCache     *db,
179     Datum               key
180 )
181 {
182         Datum   data;
183         ID_BLOCK        *idl;
184         ID_BLOCK        **tmp;
185         int     nids, nblocks;
186         unsigned i;
187
188         idl = idl_fetch_one( be, db, key );
189
190         if ( idl == NULL ) {
191                 return NULL;
192         }
193
194         if ( ID_BLOCK_ALLIDS(idl) ) {
195                 /* all ids block */
196                 return( idl );
197         }
198
199         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
200                 /* regular block */
201                 return( idl );
202         }
203
204         /*
205          * this is an indirect block which points to other blocks.
206          * we need to read in all the blocks it points to and construct
207          * a big id list containing all the ids, which we will return.
208          */
209
210 #ifndef USE_INDIRECT_NIDS
211         /* count the number of blocks & allocate space for pointers to them */
212         for ( nblocks = 0; !ID_BLOCK_NOID(idl, nblocks); nblocks++ )
213                 ;       /* NULL */
214 #else
215         nblocks = ID_BLOCK_NIDS(idl);
216 #endif
217         tmp = (ID_BLOCK **) ch_malloc( nblocks * sizeof(ID_BLOCK *) );
218
219         /* read in all the blocks */
220         cont_alloc( &data, &key );
221         nids = 0;
222         for ( i = 0; i < nblocks; i++ ) {
223                 cont_id( &data, ID_BLOCK_ID(idl, i) );
224
225                 if ( (tmp[i] = idl_fetch_one( be, db, data )) == NULL ) {
226 #ifdef NEW_LOGGING
227                         LDAP_LOG( INDEX, INFO,
228                                    "idl_fetch: idl_fetch_one returned NULL\n", 0,0,0 );
229 #else
230                         Debug( LDAP_DEBUG_ANY,
231                             "idl_fetch: one returned NULL\n", 0, 0, 0 );
232 #endif
233
234                         continue;
235                 }
236
237                 nids += ID_BLOCK_NIDS(tmp[i]);
238         }
239         cont_free( &data );
240         idl_free( idl );
241
242         /* allocate space for the big block */
243         idl = idl_alloc( nids );
244         ID_BLOCK_NIDS(idl) = nids;
245         nids = 0;
246
247         /* copy in all the ids from the component blocks */
248         for ( i = 0; i < nblocks; i++ ) {
249                 if ( tmp[i] == NULL ) {
250                         continue;
251                 }
252
253                 AC_MEMCPY(
254                         (char *) &ID_BLOCK_ID(idl, nids),
255                         (char *) &ID_BLOCK_ID(tmp[i], 0),
256                         ID_BLOCK_NIDS(tmp[i]) * sizeof(ID) );
257                 nids += ID_BLOCK_NIDS(tmp[i]);
258
259                 idl_free( tmp[i] );
260         }
261         free( (char *) tmp );
262
263 #ifdef LDBM_DEBUG_IDL
264         idl_check(idl);
265 #endif
266
267 #ifdef NEW_LOGGING
268         LDAP_LOG( INDEX, ENTRY, 
269                    "idl_fetch: %ld ids (%ld max)\n",
270                    ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl), 0 );
271 #else
272         Debug( LDAP_DEBUG_TRACE, "<= idl_fetch %ld ids (%ld max)\n",
273                ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl), 0 );
274 #endif
275
276         return( idl );
277 }
278
279
280 /* store a single block */
281 static int
282 idl_store(
283     Backend             *be,
284     DBCache     *db,
285     Datum               key, 
286     ID_BLOCK            *idl
287 )
288 {
289         int     rc, flags;
290         Datum   data;
291
292 #ifdef LDBM_DEBUG_IDL
293         idl_check(idl);
294 #endif
295
296         ldbm_datum_init( data );
297
298         /* Debug( LDAP_DEBUG_TRACE, "=> idl_store\n", 0, 0, 0 ); */
299
300         data.dptr = (char *) idl;
301         data.dsize = (ID_BLOCK_IDS_OFFSET + ID_BLOCK_NMAXN(idl)) * sizeof(ID);
302         
303         flags = LDBM_REPLACE;
304         rc = ldbm_cache_store( db, key, data, flags );
305
306         /* Debug( LDAP_DEBUG_TRACE, "<= idl_store %d\n", rc, 0, 0 ); */
307         return( rc );
308 }
309
310 /* Binary search for id in block, return index
311  *    an index is always returned, even with no match. If no
312  * match, the returned index is the insertion point.
313  */
314 static unsigned int
315 idl_find(
316     ID_BLOCK    *b,
317     ID          id
318 )
319 {
320         int lo=0, hi=ID_BLOCK_NIDS(b)-1, nr=0;
321
322         for (;lo<=hi;)
323         {
324             nr = ( lo + hi ) / 2;
325             if (ID_BLOCK_ID(b, nr) == id)
326                 break;
327             if (ID_BLOCK_ID(b, nr) > id)
328                 hi = nr - 1;
329             else
330                 lo = nr + 1;
331         }
332         return nr;
333 }
334
335 /* split the block at id 
336  *      locate ID greater than or equal to id.
337  */
338 static void
339 idl_split_block(
340     ID_BLOCK    *b,
341     ID          id,
342     ID_BLOCK    **right,
343     ID_BLOCK    **left
344 )
345 {
346         unsigned int    nr, nl;
347
348         /* find where to split the block */
349         nr = idl_find(b, id);
350         if ( ID_BLOCK_ID(b,nr) < id )
351                 nr++;
352
353         nl = ID_BLOCK_NIDS(b) - nr;
354
355         *right = idl_alloc( nr == 0 ? 1 : nr );
356         *left = idl_alloc( nl + (nr == 0 ? 0 : 1));
357
358         /*
359          * everything before the id being inserted in the first block
360          * unless there is nothing, in which case the id being inserted
361          * goes there.
362          */
363         if ( nr == 0 ) {
364                 ID_BLOCK_NIDS(*right) = 1;
365                 ID_BLOCK_ID(*right, 0) = id;
366         } else {
367                 AC_MEMCPY(
368                         (char *) &ID_BLOCK_ID(*right, 0),
369                         (char *) &ID_BLOCK_ID(b, 0),
370                         nr * sizeof(ID) );
371                 ID_BLOCK_NIDS(*right) = nr;
372                 ID_BLOCK_ID(*left, 0) = id;
373         }
374
375         /* the id being inserted & everything after in the second block */
376         AC_MEMCPY(
377                 (char *) &ID_BLOCK_ID(*left, (nr == 0 ? 0 : 1)),
378             (char *) &ID_BLOCK_ID(b, nr),
379                 nl * sizeof(ID) );
380         ID_BLOCK_NIDS(*left) = nl + (nr == 0 ? 0 : 1);
381
382 #ifdef LDBM_DEBUG_IDL
383         idl_check(*right);
384         idl_check(*left);
385 #endif
386 }
387
388
389 /*
390  * idl_change_first - called when an indirect block's first key has
391  * changed, meaning it needs to be stored under a new key, and the
392  * header block pointing to it needs updating.
393  */
394 static int
395 idl_change_first(
396     Backend             *be,
397     DBCache     *db,
398     Datum               hkey,           /* header block key     */
399     ID_BLOCK            *h,             /* header block         */
400     int                 pos,            /* pos in h to update   */
401     Datum               bkey,           /* data block key       */
402     ID_BLOCK            *b              /* data block           */
403 )
404 {
405         int     rc;
406
407         /* Debug( LDAP_DEBUG_TRACE, "=> idl_change_first\n", 0, 0, 0 ); */
408
409         /* delete old key block */
410         if ( (rc = ldbm_cache_delete( db, bkey )) != 0 ) {
411 #ifdef NEW_LOGGING
412                 LDAP_LOG( INDEX, INFO, 
413                            "idl_change_first: ldbm_cache_delete returned %d\n", rc, 0, 0 );
414 #else
415                 Debug( LDAP_DEBUG_ANY,
416                     "idl_change_first: ldbm_cache_delete returned %d\n",
417                         rc, 0, 0 );
418 #endif
419
420                 return( rc );
421         }
422
423         /* write block with new key */
424         cont_id( &bkey, ID_BLOCK_ID(b, 0) );
425
426         if ( (rc = idl_store( be, db, bkey, b )) != 0 ) {
427 #ifdef NEW_LOGGING
428                 LDAP_LOG( INDEX, INFO, 
429                            "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
430 #else
431                 Debug( LDAP_DEBUG_ANY,
432                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
433 #endif
434
435                 return( rc );
436         }
437
438         /* update + write indirect header block */
439         ID_BLOCK_ID(h, pos) = ID_BLOCK_ID(b, 0);
440         if ( (rc = idl_store( be, db, hkey, h )) != 0 ) {
441 #ifdef NEW_LOGGING
442                 LDAP_LOG( INDEX, INFO, 
443                            "idl_change_first: idl_store returned %s\n", rc, 0, 0 );
444 #else
445                 Debug( LDAP_DEBUG_ANY,
446                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
447 #endif
448
449                 return( rc );
450         }
451
452         return( 0 );
453 }
454
455
456 int
457 idl_insert_key(
458     Backend             *be,
459     DBCache     *db,
460     Datum               key,
461     ID                  id
462 )
463 {
464         int     i, j, first, rc = 0;
465         ID_BLOCK        *idl, *tmp, *tmp2, *tmp3;
466         Datum   k2;
467
468         if ( (idl = idl_fetch_one( be, db, key )) == NULL ) {
469                 idl = idl_alloc( 1 );
470                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)++) = id;
471                 rc = idl_store( be, db, key, idl );
472
473                 idl_free( idl );
474                 return( rc );
475         }
476
477         if ( ID_BLOCK_ALLIDS( idl ) ) {
478                 /* ALLIDS */
479                 idl_free( idl );
480                 return 0;
481         }
482
483         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
484                 /* regular block */
485                 switch ( idl_insert( &idl, id, db->dbc_maxids ) ) {
486                 case 0:         /* id inserted - store the updated block */
487                 case 1:
488                         rc = idl_store( be, db, key, idl );
489                         break;
490
491                 case 2:         /* id already there - nothing to do */
492                         rc = 0;
493                         break;
494
495                 case 3:         /* id not inserted - block must be split */
496                         /* check threshold for marking this an all-id block */
497                         if ( db->dbc_maxindirect < 2 ) {
498                                 idl_free( idl );
499                                 idl = idl_allids( be );
500                                 rc = idl_store( be, db, key, idl );
501                                 break;
502                         }
503
504                         idl_split_block( idl, id, &tmp, &tmp2 );
505                         idl_free( idl );
506
507                         /* create the header indirect block */
508 #ifndef USE_INDIRECT_NIDS
509                         idl = idl_alloc( 3 );
510                         ID_BLOCK_NMAX(idl) = 3;
511                         ID_BLOCK_NIDS(idl) = ID_BLOCK_INDIRECT_VALUE;
512                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
513                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
514                         ID_BLOCK_ID(idl, 2) = NOID;
515 #else
516                         idl = idl_alloc( 2 );
517                         ID_BLOCK_NMAX(idl) = 2 | ID_BLOCK_INDIRECT_VALUE;
518                         ID_BLOCK_NIDS(idl) = 2;
519                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
520                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
521 #endif
522
523                         /* store it */
524                         rc = idl_store( be, db, key, idl );
525
526                         cont_alloc( &k2, &key );
527                         cont_id( &k2, ID_BLOCK_ID(tmp, 0) );
528
529                         rc = idl_store( be, db, k2, tmp );
530
531                         cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
532                         rc = idl_store( be, db, k2, tmp2 );
533
534                         cont_free( &k2 );
535
536                         idl_free( tmp );
537                         idl_free( tmp2 );
538                         break;
539                 }
540
541                 idl_free( idl );
542                 return( rc );
543         }
544
545         /*
546          * this is an indirect block which points to other blocks.
547          * we need to read in the block into which the id should be
548          * inserted, then insert the id and store the block.  we might
549          * have to split the block if it is full, which means we also
550          * need to write a new "header" block.
551          */
552
553 #ifndef USE_INDIRECT_NIDS
554         /* select the block to try inserting into *//* XXX linear search XXX */
555         for ( i = 0; !ID_BLOCK_NOID(idl, i) && id > ID_BLOCK_ID(idl, i); i++ )
556                 ;       /* NULL */
557 #else
558         i = idl_find(idl, id);
559         if (ID_BLOCK_ID(idl, i) < id)
560                 i++;
561 #endif
562
563         if ( i != 0 ) {
564                 i--;
565                 first = 0;
566         } else {
567                 first = 1;
568         }
569
570         /* get the block */
571         cont_alloc( &k2, &key );
572         cont_id( &k2, ID_BLOCK_ID(idl, i) );
573
574         if ( (tmp = idl_fetch_one( be, db, k2 )) == NULL ) {
575 #ifdef NEW_LOGGING
576                 LDAP_LOG( INDEX, ERR,
577                            "idl_insert_key: nonexistent continuation block\n", 0, 0, 0 );
578 #else
579                 Debug( LDAP_DEBUG_ANY, "idl_insert_key: nonexistent continuation block\n",
580                     0, 0, 0 );
581 #endif
582
583                 cont_free( &k2 );
584                 idl_free( idl );
585                 return( -1 );
586         }
587
588         /* insert the id */
589         switch ( idl_insert( &tmp, id, db->dbc_maxids ) ) {
590         case 0:         /* id inserted ok */
591                 if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
592 #ifdef NEW_LOGGING
593                         LDAP_LOG( INDEX, ERR, 
594                                    "ids_insert_key: idl_store returned %d\n", rc, 0, 0 );
595 #else
596                         Debug( LDAP_DEBUG_ANY,
597                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
598 #endif
599
600                 }
601                 break;
602
603         case 1:         /* id inserted - first id in block has changed */
604                 /*
605                  * key for this block has changed, so we have to
606                  * write the block under the new key, delete the
607                  * old key block + update and write the indirect
608                  * header block.
609                  */
610
611                 rc = idl_change_first( be, db, key, idl, i, k2, tmp );
612                 break;
613
614         case 2:         /* id not inserted - already there, do nothing */
615                 rc = 0;
616                 break;
617
618         case 3:         /* id not inserted - block is full */
619                 /*
620                  * first, see if it will fit in the next block,
621                  * without splitting, unless we're trying to insert
622                  * into the beginning of the first block.
623                  */
624
625 #ifndef USE_INDIRECT_NIDS
626                 /* is there a next block? */
627                 if ( !first && !ID_BLOCK_NOID(idl, i + 1) ) {
628 #else
629                 if ( !first && (unsigned long)(i + 1) < ID_BLOCK_NIDS(idl) ) {
630 #endif
631                         /* read it in */
632                         cont_alloc( &k2, &key );
633                         cont_id( &k2, ID_BLOCK_ID(idl, i + 1) );
634                         if ( (tmp2 = idl_fetch_one( be, db, k2 )) == NULL ) {
635 #ifdef NEW_LOGGING
636                                 LDAP_LOG( INDEX, ERR,
637                                            "idl_insert_key: idl_fetch_one returned NULL\n", 0, 0, 0);
638 #else
639                                 Debug( LDAP_DEBUG_ANY,
640                                     "idl_insert_key: idl_fetch_one returned NULL\n",
641                                     0, 0, 0 );
642 #endif
643
644                                 /* split the original block */
645                                 cont_free( &k2 );
646                                 goto split;
647                         }
648
649                         /* If the new id is less than the last id in the
650                          * current block, it must not be put into the next
651                          * block. Push the last id of the current block
652                          * into the next block instead.
653                          */
654                         if (id < ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1)) {
655                             ID id2 = ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1);
656
657                             --ID_BLOCK_NIDS(tmp);
658                             /* This must succeed since we just popped one
659                              * ID off the end of it.
660                              */
661                             rc = idl_insert( &tmp, id, db->dbc_maxids );
662
663                             if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
664 #ifdef NEW_LOGGING
665                                 LDAP_LOG( INDEX, ERR, 
666                                         "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
667 #else
668                                 Debug( LDAP_DEBUG_ANY,
669                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
670 #endif
671
672                             }
673
674                             id = id2;
675                             /* This new id will necessarily be inserted
676                              * as the first id of the next block by the
677                              * following switch() statement.
678                              */
679                         }
680
681                         switch ( (rc = idl_insert( &tmp2, id,
682                             db->dbc_maxids )) ) {
683                         case 1:         /* id inserted first in block */
684                                 rc = idl_change_first( be, db, key, idl,
685                                     i + 1, k2, tmp2 );
686                                 /* FALL */
687
688                         case 2:         /* id already there - how? */
689                         case 0:         /* id inserted: this can never be
690                                          * the result of idl_insert, because
691                                          * we guaranteed that idl_change_first
692                                          * will always be called.
693                                          */
694                                 if ( rc == 2 ) {
695 #ifdef NEW_LOGGING
696                                         LDAP_LOG( INDEX, INFO, 
697                                                    "idl_insert_key: id %ld is already in next block\n", 
698                                                    id, 0, 0 );
699 #else
700                                         Debug( LDAP_DEBUG_ANY,
701                                             "idl_insert_key: id %ld already in next block\n",
702                                             id, 0, 0 );
703 #endif
704
705                                 }
706
707                                 idl_free( tmp );
708                                 idl_free( tmp2 );
709                                 idl_free( idl );
710                                 return( 0 );
711
712                         case 3:         /* split the original block */
713                                 break;
714                         }
715
716                         idl_free( tmp2 );
717                 }
718
719 split:
720                 /*
721                  * must split the block, write both new blocks + update
722                  * and write the indirect header block.
723                  */
724
725                 rc = 0; /* optimistic */
726
727
728 #ifndef USE_INDIRECT_NIDS
729                 /* count how many indirect blocks *//* XXX linear count XXX */
730                 for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ )
731                         ;       /* NULL */
732 #else
733                 j = ID_BLOCK_NIDS(idl);
734 #endif
735
736                 /* check it against all-id thresholed */
737                 if ( j + 1 > db->dbc_maxindirect ) {
738                         /*
739                          * we've passed the all-id threshold, meaning
740                          * that this set of blocks should be replaced
741                          * by a single "all-id" block.  our job: delete
742                          * all the indirect blocks, and replace the header
743                          * block by an all-id block.
744                          */
745
746                         /* delete all indirect blocks */
747 #ifndef USE_INDIRECT_NIDS
748                         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) {
749 #else
750                         for ( j = 0; (unsigned long) j < ID_BLOCK_NIDS(idl); j++ ) {
751 #endif
752                                 cont_id( &k2, ID_BLOCK_ID(idl, j) );
753
754                                 rc = ldbm_cache_delete( db, k2 );
755                         }
756
757                         /* store allid block in place of header block */
758                         idl_free( idl );
759                         idl = idl_allids( be );
760                         rc = idl_store( be, db, key, idl );
761
762                         cont_free( &k2 );
763                         idl_free( idl );
764                         idl_free( tmp );
765                         return( rc );
766                 }
767
768                 idl_split_block( tmp, id, &tmp2, &tmp3 );
769                 idl_free( tmp );
770
771                 /* create a new updated indirect header block */
772                 tmp = idl_alloc( ID_BLOCK_NMAXN(idl) + 1 );
773 #ifndef USE_INDIRECT_NIDS
774                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_INDIRECT_VALUE;
775 #else
776                 ID_BLOCK_NMAX(tmp) |= ID_BLOCK_INDIRECT_VALUE;
777 #endif
778                 /* everything up to the split block */
779                 AC_MEMCPY(
780                         (char *) &ID_BLOCK_ID(tmp, 0),
781                         (char *) &ID_BLOCK_ID(idl, 0),
782                     i * sizeof(ID) );
783                 /* the two new blocks */
784                 ID_BLOCK_ID(tmp, i) = ID_BLOCK_ID(tmp2, 0);
785                 ID_BLOCK_ID(tmp, i + 1) = ID_BLOCK_ID(tmp3, 0);
786                 /* everything after the split block */
787 #ifndef USE_INDIRECT_NIDS
788                 AC_MEMCPY(
789                         (char *) &ID_BLOCK_ID(tmp, i + 2),
790                         (char *) &ID_BLOCK_ID(idl, i + 1),
791                         (ID_BLOCK_NMAXN(idl) - i - 1) * sizeof(ID) );
792 #else
793                 AC_MEMCPY(
794                         (char *) &ID_BLOCK_ID(tmp, i + 2),
795                         (char *) &ID_BLOCK_ID(idl, i + 1),
796                         (ID_BLOCK_NIDS(idl) - i - 1) * sizeof(ID) );
797                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_NIDS(idl) + 1;
798 #endif
799
800                 /* store the header block */
801                 rc = idl_store( be, db, key, tmp );
802
803                 /* store the first id block */
804                 cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
805                 rc = idl_store( be, db, k2, tmp2 );
806
807                 /* store the second id block */
808                 cont_id( &k2, ID_BLOCK_ID(tmp3, 0) );
809                 rc = idl_store( be, db, k2, tmp3 );
810
811                 idl_free( tmp2 );
812                 idl_free( tmp3 );
813                 break;
814         }
815
816         cont_free( &k2 );
817         idl_free( tmp );
818         idl_free( idl );
819         return( rc );
820 }
821
822
823 /*
824  * idl_insert - insert an id into an id list.
825  *
826  *      returns
827  *              0       id inserted
828  *              1       id inserted, first id in block has changed
829  *              2       id not inserted, already there
830  *              3       id not inserted, block must be split
831  */
832 int
833 idl_insert( ID_BLOCK **idl, ID id, unsigned int maxids )
834 {
835         unsigned int    i;
836
837         if ( ID_BLOCK_ALLIDS( *idl ) ) {
838                 return( 2 );    /* already there */
839         }
840
841         /* is it already there? */
842         i = idl_find(*idl, id);
843         if ( ID_BLOCK_ID(*idl, i) == id ) {
844                 return( 2 );    /* already there */
845         }
846         if ( ID_BLOCK_NIDS(*idl) && ID_BLOCK_ID(*idl, i) < id )
847                 i++;
848
849         /* do we need to make room for it? */
850         if ( ID_BLOCK_NIDS(*idl) == ID_BLOCK_NMAXN(*idl) ) {
851                 /* make room or indicate block needs splitting */
852                 if ( ID_BLOCK_NMAXN(*idl) >= maxids ) {
853                         return( 3 );    /* block needs splitting */
854                 }
855
856                 ID_BLOCK_NMAX(*idl) *= 2;
857                 if ( ID_BLOCK_NMAXN(*idl) > maxids ) {
858                         ID_BLOCK_NMAX(*idl) = maxids;
859                 }
860                 *idl = (ID_BLOCK *) ch_realloc( (char *) *idl,
861                     (ID_BLOCK_NMAXN(*idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
862         }
863
864         /* make a slot for the new id */
865         AC_MEMCPY( &ID_BLOCK_ID(*idl, i+1), &ID_BLOCK_ID(*idl, i),
866                     (ID_BLOCK_NIDS(*idl) - i) * sizeof(ID) );
867
868         ID_BLOCK_ID(*idl, i) = id;
869         ID_BLOCK_NIDS(*idl)++;
870         (void) memset(
871                 (char *) &ID_BLOCK_ID((*idl), ID_BLOCK_NIDS(*idl)),
872                 '\0',
873             (ID_BLOCK_NMAXN(*idl) - ID_BLOCK_NIDS(*idl)) * sizeof(ID) );
874
875 #ifdef LDBM_DEBUG_IDL
876         idl_check(*idl);
877 #endif
878
879         return( i == 0 ? 1 : 0 );       /* inserted - first id changed or not */
880 }
881
882
883 int
884 idl_delete_key (
885         Backend         *be,
886         DBCache  *db,
887         Datum           key,
888         ID              id
889 )
890 {
891         Datum  data;
892         ID_BLOCK *idl;
893         unsigned i;
894         int j, nids;
895
896         if ( (idl = idl_fetch_one( be, db, key ) ) == NULL )
897         {
898                 /* It wasn't found.  Hmm... */
899                 return -1;
900         }
901
902         if ( ID_BLOCK_ALLIDS( idl ) ) {
903                 idl_free( idl );
904                 return 0;
905         }
906
907         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
908                 i = idl_find(idl, id);
909                 if ( ID_BLOCK_ID(idl, i) == id ) {
910                         if( --ID_BLOCK_NIDS(idl) == 0 ) {
911                                 ldbm_cache_delete( db, key );
912
913                         } else {
914                                 AC_MEMCPY(
915                                         &ID_BLOCK_ID(idl, i),
916                                         &ID_BLOCK_ID(idl, i+1),
917                                         (ID_BLOCK_NIDS(idl)-i) * sizeof(ID) );
918
919                                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)) = NOID;
920
921                                 idl_store( be, db, key, idl );
922                         }
923
924                         idl_free( idl );
925                         return 0;
926                 }
927                 /*  We didn't find the ID.  Hmmm... */
928                 idl_free( idl );
929                 return -1;
930         }
931         
932         /* We have to go through an indirect block and find the ID
933            in the list of IDL's
934            */
935         cont_alloc( &data, &key );
936 #ifndef USE_INDIRECT_NIDS
937         for ( nids = 0; !ID_BLOCK_NOID(idl, nids); nids++ )
938                 ;       /* NULL */
939
940         for ( j = 0; j<nids; j++ ) 
941 #else
942         nids = ID_BLOCK_NIDS(idl);
943         for ( j = idl_find(idl, id); j >= 0; j = -1)    /* execute once */
944 #endif
945         {
946                 ID_BLOCK *tmp;
947                 cont_id( &data, ID_BLOCK_ID(idl, j) );
948
949                 if ( (tmp = idl_fetch_one( be, db, data )) == NULL ) {
950 #ifdef NEW_LOGGING
951                         LDAP_LOG( INDEX, INFO,
952                                    "idl_delete_key: idl_fetch_one returned NULL\n", 0, 0, 0 );
953 #else
954                         Debug( LDAP_DEBUG_ANY,
955                             "idl_delete_key: idl_fetch of returned NULL\n", 0, 0, 0 );
956 #endif
957
958                         continue;
959                 }
960                 /*
961                    Now try to find the ID in tmp
962                 */
963
964                 i = idl_find(tmp, id);
965                 if ( ID_BLOCK_ID(tmp, i) == id )
966                 {
967                         AC_MEMCPY(
968                                 &ID_BLOCK_ID(tmp, i),
969                                 &ID_BLOCK_ID(tmp, i+1),
970                                 (ID_BLOCK_NIDS(tmp)-(i+1)) * sizeof(ID));
971                         ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp)-1 ) = NOID;
972                         ID_BLOCK_NIDS(tmp)--;
973
974                         if ( ID_BLOCK_NIDS(tmp) ) {
975                                 idl_store ( be, db, data, tmp );
976
977                         } else {
978                                 ldbm_cache_delete( db, data );
979                                 AC_MEMCPY(
980                                         &ID_BLOCK_ID(idl, j),
981                                         &ID_BLOCK_ID(idl, j+1),
982                                         (nids-(j+1)) * sizeof(ID));
983                                 ID_BLOCK_ID(idl, nids-1) = NOID;
984                                 nids--;
985 #ifdef USE_INDIRECT_NIDS
986                                 ID_BLOCK_NIDS(idl)--;
987 #endif
988                                 if ( ! nids )
989                                         ldbm_cache_delete( db, key );
990                                 else
991                                         idl_store( be, db, key, idl );
992                         }
993                         idl_free( tmp );
994                         cont_free( &data );
995                         idl_free( idl );
996                         return 0;
997                 }
998                 idl_free( tmp );
999         }
1000
1001         cont_free( &data );
1002         idl_free( idl );
1003         return -1;
1004 }
1005
1006
1007 /* return a duplicate of a single ID_BLOCK */
1008 static ID_BLOCK *
1009 idl_dup( ID_BLOCK *idl )
1010 {
1011         ID_BLOCK        *new;
1012
1013         if ( idl == NULL ) {
1014                 return( NULL );
1015         }
1016
1017         new = idl_alloc( ID_BLOCK_NMAXN(idl) );
1018
1019         AC_MEMCPY(
1020                 (char *) new,
1021                 (char *) idl,
1022                 (ID_BLOCK_NMAXN(idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
1023
1024 #ifdef LDBM_DEBUG_IDL
1025         idl_check(new);
1026 #endif
1027
1028         return( new );
1029 }
1030
1031
1032 /* return the smaller ID_BLOCK */
1033 static ID_BLOCK *
1034 idl_min( ID_BLOCK *a, ID_BLOCK *b )
1035 {
1036         return( ID_BLOCK_NIDS(a) > ID_BLOCK_NIDS(b) ? b : a );
1037 }
1038
1039
1040 /*
1041  * idl_intersection - return a intersection b
1042  */
1043 ID_BLOCK *
1044 idl_intersection(
1045     Backend     *be,
1046     ID_BLOCK    *a,
1047     ID_BLOCK    *b
1048 )
1049 {
1050         unsigned int    ai, bi, ni;
1051         ID_BLOCK                *n;
1052
1053         if ( a == NULL || b == NULL ) {
1054                 return( NULL );
1055         }
1056         if ( ID_BLOCK_ALLIDS( a ) ) {
1057                 return( idl_dup( b ) );
1058         }
1059         if ( ID_BLOCK_ALLIDS( b ) ) {
1060                 return( idl_dup( a ) );
1061         }
1062         if ( ID_BLOCK_NIDS(a) == 0 || ID_BLOCK_NIDS(b) == 0 ) {
1063                 return( NULL );
1064         }
1065
1066         n = idl_dup( idl_min( a, b ) );
1067
1068 #ifdef LDBM_DEBUG_IDL
1069         idl_check(a);
1070         idl_check(b);
1071 #endif
1072
1073         for ( ni = 0, ai = 0, bi = 0; ; ) {
1074                 if ( ID_BLOCK_ID(b, bi) == ID_BLOCK_ID(a, ai) ) {
1075                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1076                         ai++;
1077                         bi++;
1078                         if ( ai >= ID_BLOCK_NIDS(a) || bi >= ID_BLOCK_NIDS(b) )
1079                                 break;
1080                 } else if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1081                         ai++;
1082                         if ( ai >= ID_BLOCK_NIDS(a) )
1083                                 break;
1084                 } else {
1085                         bi++;
1086                         if ( bi >= ID_BLOCK_NIDS(b) )
1087                                 break;
1088                 }
1089         }
1090
1091         if ( ni == 0 ) {
1092                 idl_free( n );
1093                 return( NULL );
1094         }
1095         ID_BLOCK_NIDS(n) = ni;
1096
1097 #ifdef LDBM_DEBUG_IDL
1098         idl_check(n);
1099 #endif
1100
1101         return( n );
1102 }
1103
1104
1105 /*
1106  * idl_union - return a union b
1107  */
1108 ID_BLOCK *
1109 idl_union(
1110     Backend     *be,
1111     ID_BLOCK    *a,
1112     ID_BLOCK    *b
1113 )
1114 {
1115         unsigned int    ai, bi, ni;
1116         ID_BLOCK                *n;
1117
1118         if ( a == NULL ) {
1119                 return( idl_dup( b ) );
1120         }
1121         if ( b == NULL ) {
1122                 return( idl_dup( a ) );
1123         }
1124         if ( ID_BLOCK_ALLIDS( a ) || ID_BLOCK_ALLIDS( b ) ) {
1125                 return( idl_allids( be ) );
1126         }
1127
1128 #ifdef LDBM_DEBUG_IDL
1129         idl_check(a);
1130         idl_check(b);
1131 #endif
1132
1133         if ( ID_BLOCK_NIDS(b) < ID_BLOCK_NIDS(a) ) {
1134                 n = a;
1135                 a = b;
1136                 b = n;
1137         }
1138
1139         n = idl_alloc( ID_BLOCK_NIDS(a) + ID_BLOCK_NIDS(b) );
1140
1141         for ( ni = 0, ai = 0, bi = 0;
1142                 ai < ID_BLOCK_NIDS(a) && bi < ID_BLOCK_NIDS(b);
1143                 )
1144         {
1145                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1146                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai++);
1147
1148                 } else if ( ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai) ) {
1149                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi++);
1150
1151                 } else {
1152                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1153                         ai++, bi++;
1154                 }
1155         }
1156
1157         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1158                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1159         }
1160         for ( ; bi < ID_BLOCK_NIDS(b); bi++ ) {
1161                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi);
1162         }
1163         ID_BLOCK_NIDS(n) = ni;
1164
1165 #ifdef LDBM_DEBUG_IDL
1166         idl_check(n);
1167 #endif
1168
1169         return( n );
1170 }
1171
1172
1173 /*
1174  * idl_notin - return a intersection ~b (or a minus b)
1175  */
1176 ID_BLOCK *
1177 idl_notin(
1178     Backend     *be,
1179     ID_BLOCK    *a,
1180     ID_BLOCK    *b
1181 )
1182 {
1183         unsigned int    ni, ai, bi;
1184         ID_BLOCK                *n;
1185
1186         if ( a == NULL ) {
1187                 return( NULL );
1188         }
1189         if ( b == NULL || ID_BLOCK_ALLIDS( b )) {
1190                 return( idl_dup( a ) );
1191         }
1192
1193         if ( ID_BLOCK_ALLIDS( a ) ) {
1194                 n = idl_alloc( SLAPD_LDBM_MIN_MAXIDS );
1195                 ni = 0;
1196
1197                 for ( ai = 1, bi = 0;
1198                         ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n) && bi < ID_BLOCK_NMAXN(b);
1199                         ai++ )
1200                 {
1201                         if ( ID_BLOCK_ID(b, bi) == ai ) {
1202                                 bi++;
1203                         } else {
1204                                 ID_BLOCK_ID(n, ni++) = ai;
1205                         }
1206                 }
1207
1208                 for ( ; ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n); ai++ ) {
1209                         ID_BLOCK_ID(n, ni++) = ai;
1210                 }
1211
1212                 if ( ni == ID_BLOCK_NMAXN(n) ) {
1213                         idl_free( n );
1214                         return( idl_allids( be ) );
1215                 } else {
1216                         ID_BLOCK_NIDS(n) = ni;
1217                         return( n );
1218                 }
1219         }
1220
1221         n = idl_dup( a );
1222
1223         ni = 0;
1224         for ( ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1225                 for ( ;
1226                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1227                     bi++ )
1228                 {
1229                         ;       /* NULL */
1230                 }
1231
1232                 if ( bi == ID_BLOCK_NIDS(b) ) {
1233                         break;
1234                 }
1235
1236                 if ( ID_BLOCK_ID(b, bi) != ID_BLOCK_ID(a, ai) ) {
1237                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1238                 }
1239         }
1240
1241         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1242                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1243         }
1244         ID_BLOCK_NIDS(n) = ni;
1245
1246 #ifdef LDBM_DEBUG_IDL
1247         idl_check(n);
1248 #endif
1249
1250         return( n );
1251 }
1252
1253 /*      return the first ID in the block
1254  *      if ALLIDS block
1255  *              NIDS > 1 return 1
1256  *              otherwise return NOID 
1257  *      otherwise return first ID
1258  *
1259  *      cursor is set to 1
1260  */         
1261 ID
1262 idl_firstid( ID_BLOCK *idl, ID *cursor )
1263 {
1264         *cursor = 1;
1265
1266         if ( idl == NULL || ID_BLOCK_NIDS(idl) == 0 ) {
1267                 return( NOID );
1268         }
1269
1270         if ( ID_BLOCK_ALLIDS( idl ) ) {
1271                 return( ID_BLOCK_NIDS(idl) > 1 ? 1 : NOID );
1272         }
1273
1274         return( ID_BLOCK_ID(idl, 0) );
1275 }
1276
1277 /*      return next ID
1278  *      if ALLIDS block, cursor is id.
1279  *              increment id
1280  *              if id < NIDS return id
1281  *              otherwise NOID.
1282  *      otherwise cursor is index into block
1283  *              if index < nids
1284  *                      return id at index then increment
1285  */ 
1286 ID
1287 idl_nextid( ID_BLOCK *idl, ID *cursor )
1288 {
1289         if ( ID_BLOCK_ALLIDS( idl ) ) {
1290                 if( ++(*cursor) < ID_BLOCK_NIDS(idl) ) {
1291                         return *cursor;
1292                 } else {
1293                         return NOID;
1294                 }
1295         }
1296
1297         if ( *cursor < ID_BLOCK_NIDS(idl) ) {
1298                 return( ID_BLOCK_ID(idl, (*cursor)++) );
1299         }
1300
1301         return( NOID );
1302 }