]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ldbm/idl.c
ITS#2888 don't return LDAP_SIZELIMIT_EXCEEDED prematurely
[openldap] / servers / slapd / back-ldbm / idl.c
1 /* idl.c - ldap id list handling routines */
2 /* $OpenLDAP$ */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 1998-2003 The OpenLDAP Foundation.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted only as authorized by the OpenLDAP
10  * Public License.
11  *
12  * A copy of this license is available in the file LICENSE in the
13  * top-level directory of the distribution or, alternatively, at
14  * <http://www.OpenLDAP.org/license.html>.
15  */
16
17 #include "portable.h"
18
19 #include <stdio.h>
20
21 #include <ac/string.h>
22 #include <ac/socket.h>
23
24 #include "slap.h"
25 #include "back-ldbm.h"
26
27 static ID_BLOCK* idl_dup( ID_BLOCK *idl );
28
29 static void cont_alloc( Datum *cont, Datum *key )
30 {
31         ldbm_datum_init( *cont );
32         cont->dsize = 1 + sizeof(ID) + key->dsize;
33         cont->dptr = ch_malloc( cont->dsize );
34
35         * (unsigned char *) cont->dptr = SLAP_INDEX_CONT_PREFIX;
36
37         AC_MEMCPY( &((unsigned char *)cont->dptr)[1 + sizeof(ID)],
38                 key->dptr, key->dsize );
39 }
40
41 static void cont_id( Datum *cont, ID id )
42 {
43         unsigned int i;
44
45         for( i=1; i <= sizeof(id); i++) {
46                 ((unsigned char *)cont->dptr)[i] = (unsigned char)(id & 0xFF);
47                 id >>= 8;
48         }
49
50 }
51
52 static void cont_free( Datum *cont )
53 {
54         ch_free( cont->dptr );
55 }
56
#ifdef LDBM_DEBUG_IDL
/*
 * idl_check - debugging aid, compiled only when LDBM_DEBUG_IDL is defined.
 *
 * Asserts that the ids held in idl are strictly increasing.  ALLIDS blocks
 * and blocks with fewer than two entries are accepted without inspection.
 */
static void idl_check(ID_BLOCK *idl)
{
	int i, max;
	ID last;	/* previously visited id: a single id, not a whole block */

	if( ID_BLOCK_ALLIDS(idl) )
	{
		return;
	}
#ifndef USE_INDIRECT_NIDS
	if( ID_BLOCK_INDIRECT(idl) )
	{
		/* indirect header: entries run up to the NOID marker */
		for ( max = 0; !ID_BLOCK_NOID(idl, max); max++ ) ;
	} else
#endif
	{
		max = ID_BLOCK_NIDS(idl);
	}
	if ( max <= 1 )
	{
		return;
	}

	for( last = ID_BLOCK_ID(idl, 0), i = 1;
		i < max;
		last = ID_BLOCK_ID(idl, i), i++ )
	{
		assert (last < ID_BLOCK_ID(idl, i) );
	}
}
#endif
89
90 /* Allocate an ID_BLOCK with room for nids ids */
91 ID_BLOCK *
92 idl_alloc( unsigned int nids )
93 {
94         ID_BLOCK        *new;
95
96         /* nmax + nids + space for the ids */
97         new = (ID_BLOCK *) ch_calloc( (ID_BLOCK_IDS_OFFSET + nids), sizeof(ID) );
98         ID_BLOCK_NMAX(new) = nids;
99         ID_BLOCK_NIDS(new) = 0;
100
101         return( new );
102 }
103
104
105 /* Allocate an empty ALLIDS ID_BLOCK */
106 ID_BLOCK        *
107 idl_allids( Backend *be )
108 {
109         ID_BLOCK        *idl;
110         ID              id;
111
112         idl = idl_alloc( 0 );
113         ID_BLOCK_NMAX(idl) = ID_BLOCK_ALLIDS_VALUE;
114         if ( next_id_get( be, &id ) ) {
115                 return NULL;
116         }
117         ID_BLOCK_NIDS(idl) = id;
118
119         return( idl );
120 }
121
122 /* Free an ID_BLOCK */
123 void
124 idl_free( ID_BLOCK *idl )
125 {
126         if ( idl == NULL ) {
127 #ifdef NEW_LOGGING
128                 LDAP_LOG( INDEX, INFO, "idl_free: called with NULL pointer\n" , 0,0,0);
129 #else
130                 Debug( LDAP_DEBUG_TRACE,
131                         "idl_free: called with NULL pointer\n",
132                         0, 0, 0 );
133 #endif
134
135                 return;
136         }
137
138         free( (char *) idl );
139 }
140
141
142 /* Fetch an single ID_BLOCK from the cache */
143 static ID_BLOCK *
144 idl_fetch_one(
145     Backend             *be,
146     DBCache     *db,
147     Datum               key
148 )
149 {
150         Datum   data;
151         ID_BLOCK        *idl;
152
153         /* Debug( LDAP_DEBUG_TRACE, "=> idl_fetch_one\n", 0, 0, 0 ); */
154
155         data = ldbm_cache_fetch( db, key );
156
157         if( data.dptr == NULL ) {
158                 return NULL;
159         }
160
161         idl = (ID_BLOCK *) data.dptr;
162         if ( ID_BLOCK_ALLIDS(idl) ) {
163                 /* make sure we have the current value of highest id */
164                 idl = idl_allids( be );
165         } else {
166                 idl = idl_dup((ID_BLOCK *) data.dptr);
167         }
168
169         ldbm_datum_free( db->dbc_db, data );
170
171         return idl;
172 }
173
174
175 /* Fetch a set of ID_BLOCKs from the cache
176  *      if not INDIRECT
177  *              if block return is an ALLIDS block,
178  *                      return an new ALLIDS block
179  *              otherwise
180  *                      return block
181  *      construct super block from all blocks referenced by INDIRECT block
182  *      return super block
183  */
184 ID_BLOCK *
185 idl_fetch(
186     Backend             *be,
187     DBCache     *db,
188     Datum               key
189 )
190 {
191         Datum   data;
192         ID_BLOCK        *idl;
193         ID_BLOCK        **tmp;
194         unsigned        i, nids, nblocks;
195
196         idl = idl_fetch_one( be, db, key );
197
198         if ( idl == NULL ) {
199                 return NULL;
200         }
201
202         if ( ID_BLOCK_ALLIDS(idl) ) {
203                 /* all ids block */
204                 return( idl );
205         }
206
207         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
208                 /* regular block */
209                 return( idl );
210         }
211
212         /*
213          * this is an indirect block which points to other blocks.
214          * we need to read in all the blocks it points to and construct
215          * a big id list containing all the ids, which we will return.
216          */
217
218 #ifndef USE_INDIRECT_NIDS
219         /* count the number of blocks & allocate space for pointers to them */
220         for ( nblocks = 0; !ID_BLOCK_NOID(idl, nblocks); nblocks++ )
221                 ;       /* NULL */
222 #else
223         nblocks = ID_BLOCK_NIDS(idl);
224 #endif
225         tmp = (ID_BLOCK **) ch_malloc( nblocks * sizeof(ID_BLOCK *) );
226
227         /* read in all the blocks */
228         cont_alloc( &data, &key );
229         nids = 0;
230         for ( i = 0; i < nblocks; i++ ) {
231                 cont_id( &data, ID_BLOCK_ID(idl, i) );
232
233                 if ( (tmp[i] = idl_fetch_one( be, db, data )) == NULL ) {
234 #ifdef NEW_LOGGING
235                         LDAP_LOG( INDEX, INFO,
236                                    "idl_fetch: idl_fetch_one returned NULL\n", 0,0,0 );
237 #else
238                         Debug( LDAP_DEBUG_ANY,
239                             "idl_fetch: one returned NULL\n", 0, 0, 0 );
240 #endif
241
242                         continue;
243                 }
244
245                 nids += ID_BLOCK_NIDS(tmp[i]);
246         }
247         cont_free( &data );
248         idl_free( idl );
249
250         /* allocate space for the big block */
251         idl = idl_alloc( nids );
252         ID_BLOCK_NIDS(idl) = nids;
253         nids = 0;
254
255         /* copy in all the ids from the component blocks */
256         for ( i = 0; i < nblocks; i++ ) {
257                 if ( tmp[i] == NULL ) {
258                         continue;
259                 }
260
261                 AC_MEMCPY(
262                         (char *) &ID_BLOCK_ID(idl, nids),
263                         (char *) &ID_BLOCK_ID(tmp[i], 0),
264                         ID_BLOCK_NIDS(tmp[i]) * sizeof(ID) );
265                 nids += ID_BLOCK_NIDS(tmp[i]);
266
267                 idl_free( tmp[i] );
268         }
269         free( (char *) tmp );
270
271         assert( ID_BLOCK_NIDS(idl) == nids );
272
273 #ifdef LDBM_DEBUG_IDL
274         idl_check(idl);
275 #endif
276
277 #ifdef NEW_LOGGING
278         LDAP_LOG( INDEX, ENTRY, 
279                    "idl_fetch: %ld ids (%ld max)\n",
280                    ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl), 0 );
281 #else
282         Debug( LDAP_DEBUG_TRACE, "<= idl_fetch %ld ids (%ld max)\n",
283                ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl), 0 );
284 #endif
285
286         return( idl );
287 }
288
289
290 /* store a single block */
291 static int
292 idl_store(
293     Backend             *be,
294     DBCache     *db,
295     Datum               key, 
296     ID_BLOCK            *idl
297 )
298 {
299         int     rc, flags;
300         Datum   data;
301
302 #ifdef LDBM_DEBUG_IDL
303         idl_check(idl);
304 #endif
305
306         ldbm_datum_init( data );
307
308         /* Debug( LDAP_DEBUG_TRACE, "=> idl_store\n", 0, 0, 0 ); */
309
310         data.dptr = (char *) idl;
311         data.dsize = (ID_BLOCK_IDS_OFFSET + ID_BLOCK_NMAXN(idl)) * sizeof(ID);
312         
313         flags = LDBM_REPLACE;
314         rc = ldbm_cache_store( db, key, data, flags );
315
316         /* Debug( LDAP_DEBUG_TRACE, "<= idl_store %d\n", rc, 0, 0 ); */
317         return( rc );
318 }
319
320 /* Binary search for id in block, return index
321  *    an index is always returned, even with no match. If no
322  * match, the returned index is the insertion point.
323  */
324 static unsigned int
325 idl_find(
326     ID_BLOCK    *b,
327     ID          id
328 )
329 {
330         int lo=0, hi=ID_BLOCK_NIDS(b)-1, nr=0;
331
332         for (;lo<=hi;)
333         {
334             nr = ( lo + hi ) / 2;
335             if (ID_BLOCK_ID(b, nr) == id)
336                 break;
337             if (ID_BLOCK_ID(b, nr) > id)
338                 hi = nr - 1;
339             else
340                 lo = nr + 1;
341         }
342         return nr;
343 }
344
345 /* split the block at id 
346  *      locate ID greater than or equal to id.
347  */
348 static void
349 idl_split_block(
350     ID_BLOCK    *b,
351     ID          id,
352     ID_BLOCK    **right,
353     ID_BLOCK    **left
354 )
355 {
356         unsigned int    nr, nl;
357
358         /* find where to split the block */
359         nr = idl_find(b, id);
360         if ( ID_BLOCK_ID(b,nr) < id )
361                 nr++;
362
363         nl = ID_BLOCK_NIDS(b) - nr;
364
365         *right = idl_alloc( nr == 0 ? 1 : nr );
366         *left = idl_alloc( nl + (nr == 0 ? 0 : 1));
367
368         /*
369          * everything before the id being inserted in the first block
370          * unless there is nothing, in which case the id being inserted
371          * goes there.
372          */
373         if ( nr == 0 ) {
374                 ID_BLOCK_NIDS(*right) = 1;
375                 ID_BLOCK_ID(*right, 0) = id;
376         } else {
377                 AC_MEMCPY(
378                         (char *) &ID_BLOCK_ID(*right, 0),
379                         (char *) &ID_BLOCK_ID(b, 0),
380                         nr * sizeof(ID) );
381                 ID_BLOCK_NIDS(*right) = nr;
382                 ID_BLOCK_ID(*left, 0) = id;
383         }
384
385         /* the id being inserted & everything after in the second block */
386         AC_MEMCPY(
387                 (char *) &ID_BLOCK_ID(*left, (nr == 0 ? 0 : 1)),
388             (char *) &ID_BLOCK_ID(b, nr),
389                 nl * sizeof(ID) );
390         ID_BLOCK_NIDS(*left) = nl + (nr == 0 ? 0 : 1);
391
392 #ifdef LDBM_DEBUG_IDL
393         idl_check(*right);
394         idl_check(*left);
395 #endif
396 }
397
398
399 /*
400  * idl_change_first - called when an indirect block's first key has
401  * changed, meaning it needs to be stored under a new key, and the
402  * header block pointing to it needs updating.
403  */
404 static int
405 idl_change_first(
406     Backend             *be,
407     DBCache     *db,
408     Datum               hkey,           /* header block key     */
409     ID_BLOCK            *h,             /* header block         */
410     int                 pos,            /* pos in h to update   */
411     Datum               bkey,           /* data block key       */
412     ID_BLOCK            *b              /* data block           */
413 )
414 {
415         int     rc;
416
417         /* Debug( LDAP_DEBUG_TRACE, "=> idl_change_first\n", 0, 0, 0 ); */
418
419         /* delete old key block */
420         if ( (rc = ldbm_cache_delete( db, bkey )) != 0 ) {
421 #ifdef NEW_LOGGING
422                 LDAP_LOG( INDEX, INFO, 
423                            "idl_change_first: ldbm_cache_delete returned %d\n", rc, 0, 0 );
424 #else
425                 Debug( LDAP_DEBUG_ANY,
426                     "idl_change_first: ldbm_cache_delete returned %d\n",
427                         rc, 0, 0 );
428 #endif
429
430                 return( rc );
431         }
432
433         /* write block with new key */
434         cont_id( &bkey, ID_BLOCK_ID(b, 0) );
435
436         if ( (rc = idl_store( be, db, bkey, b )) != 0 ) {
437 #ifdef NEW_LOGGING
438                 LDAP_LOG( INDEX, INFO, 
439                            "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
440 #else
441                 Debug( LDAP_DEBUG_ANY,
442                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
443 #endif
444
445                 return( rc );
446         }
447
448         /* update + write indirect header block */
449         ID_BLOCK_ID(h, pos) = ID_BLOCK_ID(b, 0);
450         if ( (rc = idl_store( be, db, hkey, h )) != 0 ) {
451 #ifdef NEW_LOGGING
452                 LDAP_LOG( INDEX, INFO, 
453                            "idl_change_first: idl_store returned %s\n", rc, 0, 0 );
454 #else
455                 Debug( LDAP_DEBUG_ANY,
456                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
457 #endif
458
459                 return( rc );
460         }
461
462         return( 0 );
463 }
464
465
466 int
467 idl_insert_key(
468     Backend             *be,
469     DBCache     *db,
470     Datum               key,
471     ID                  id
472 )
473 {
474         int     i, j, first, rc = 0;
475         ID_BLOCK        *idl, *tmp, *tmp2, *tmp3;
476         Datum   k2;
477
478         if ( (idl = idl_fetch_one( be, db, key )) == NULL ) {
479                 idl = idl_alloc( 1 );
480                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)++) = id;
481                 rc = idl_store( be, db, key, idl );
482
483                 idl_free( idl );
484                 return( rc );
485         }
486
487         if ( ID_BLOCK_ALLIDS( idl ) ) {
488                 /* ALLIDS */
489                 idl_free( idl );
490                 return 0;
491         }
492
493         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
494                 /* regular block */
495                 switch ( idl_insert( &idl, id, db->dbc_maxids ) ) {
496                 case 0:         /* id inserted - store the updated block */
497                 case 1:
498                         rc = idl_store( be, db, key, idl );
499                         break;
500
501                 case 2:         /* id already there - nothing to do */
502                         rc = 0;
503                         break;
504
505                 case 3:         /* id not inserted - block must be split */
506                         /* check threshold for marking this an all-id block */
507                         if ( db->dbc_maxindirect < 2 ) {
508                                 idl_free( idl );
509                                 idl = idl_allids( be );
510                                 rc = idl_store( be, db, key, idl );
511                                 break;
512                         }
513
514                         idl_split_block( idl, id, &tmp, &tmp2 );
515                         idl_free( idl );
516
517                         /* create the header indirect block */
518 #ifndef USE_INDIRECT_NIDS
519                         idl = idl_alloc( 3 );
520                         ID_BLOCK_NMAX(idl) = 3;
521                         ID_BLOCK_NIDS(idl) = ID_BLOCK_INDIRECT_VALUE;
522                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
523                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
524                         ID_BLOCK_ID(idl, 2) = NOID;
525 #else
526                         idl = idl_alloc( 2 );
527                         ID_BLOCK_NMAX(idl) = 2 | ID_BLOCK_INDIRECT_VALUE;
528                         ID_BLOCK_NIDS(idl) = 2;
529                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
530                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
531 #endif
532
533                         /* store it */
534                         rc = idl_store( be, db, key, idl );
535
536                         cont_alloc( &k2, &key );
537                         cont_id( &k2, ID_BLOCK_ID(tmp, 0) );
538
539                         rc = idl_store( be, db, k2, tmp );
540
541                         cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
542                         rc = idl_store( be, db, k2, tmp2 );
543
544                         cont_free( &k2 );
545
546                         idl_free( tmp );
547                         idl_free( tmp2 );
548                         break;
549                 }
550
551                 idl_free( idl );
552                 return( rc );
553         }
554
555         /*
556          * this is an indirect block which points to other blocks.
557          * we need to read in the block into which the id should be
558          * inserted, then insert the id and store the block.  we might
559          * have to split the block if it is full, which means we also
560          * need to write a new "header" block.
561          */
562
563 #ifndef USE_INDIRECT_NIDS
564         /* select the block to try inserting into *//* XXX linear search XXX */
565         for ( i = 0; !ID_BLOCK_NOID(idl, i) && id >= ID_BLOCK_ID(idl, i); i++ )
566                 ;       /* NULL */
567 #else
568         i = idl_find(idl, id);
569         if (ID_BLOCK_ID(idl, i) <= id)
570                 i++;
571 #endif
572         if ( i != 0 ) {
573                 i--;
574                 first = 0;
575         } else {
576                 first = 1;
577         }
578
579         /* At this point, the following condition must be true:
580          * ID_BLOCK_ID(idl, i) <= id && id < ID_BLOCK_ID(idl, i+1)
581          * except when i is the first or the last block.
582          */
583
584         /* get the block */
585         cont_alloc( &k2, &key );
586         cont_id( &k2, ID_BLOCK_ID(idl, i) );
587
588         if ( (tmp = idl_fetch_one( be, db, k2 )) == NULL ) {
589 #ifdef NEW_LOGGING
590                 LDAP_LOG( INDEX, ERR,
591                            "idl_insert_key: nonexistent continuation block\n", 0, 0, 0 );
592 #else
593                 Debug( LDAP_DEBUG_ANY, "idl_insert_key: nonexistent continuation block\n",
594                     0, 0, 0 );
595 #endif
596
597                 cont_free( &k2 );
598                 idl_free( idl );
599                 return( -1 );
600         }
601
602         /* insert the id */
603         switch ( idl_insert( &tmp, id, db->dbc_maxids ) ) {
604         case 0:         /* id inserted ok */
605                 if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
606 #ifdef NEW_LOGGING
607                         LDAP_LOG( INDEX, ERR, 
608                                    "ids_insert_key: idl_store returned %d\n", rc, 0, 0 );
609 #else
610                         Debug( LDAP_DEBUG_ANY,
611                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
612 #endif
613
614                 }
615                 break;
616
617         case 1:         /* id inserted - first id in block has changed */
618                 /*
619                  * key for this block has changed, so we have to
620                  * write the block under the new key, delete the
621                  * old key block + update and write the indirect
622                  * header block.
623                  */
624
625                 rc = idl_change_first( be, db, key, idl, i, k2, tmp );
626                 break;
627
628         case 2:         /* id not inserted - already there, do nothing */
629                 rc = 0;
630                 break;
631
632         case 3:         /* id not inserted - block is full */
633                 /*
634                  * first, see if it will fit in the next block,
635                  * without splitting, unless we're trying to insert
636                  * into the beginning of the first block.
637                  */
638
639 #ifndef USE_INDIRECT_NIDS
640                 /* is there a next block? */
641                 if ( !first && !ID_BLOCK_NOID(idl, i + 1) ) {
642 #else
643                 if ( !first && (unsigned long)(i + 1) < ID_BLOCK_NIDS(idl) ) {
644 #endif
645                         Datum k3;
646                         /* read it in */
647                         cont_alloc( &k3, &key );
648                         cont_id( &k3, ID_BLOCK_ID(idl, i + 1) );
649                         if ( (tmp2 = idl_fetch_one( be, db, k3 )) == NULL ) {
650 #ifdef NEW_LOGGING
651                                 LDAP_LOG( INDEX, ERR,
652                                            "idl_insert_key: idl_fetch_one returned NULL\n", 0, 0, 0);
653 #else
654                                 Debug( LDAP_DEBUG_ANY,
655                                     "idl_insert_key: idl_fetch_one returned NULL\n",
656                                     0, 0, 0 );
657 #endif
658
659                                 /* split the original block */
660                                 cont_free( &k3 );
661                                 goto split;
662                         }
663
664                         /* If the new id is less than the last id in the
665                          * current block, it must not be put into the next
666                          * block. Push the last id of the current block
667                          * into the next block instead.
668                          */
669                         if (id < ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1)) {
670                             ID id2 = ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1);
671
672                             --ID_BLOCK_NIDS(tmp);
673                             /* This must succeed since we just popped one
674                              * ID off the end of it.
675                              */
676                             rc = idl_insert( &tmp, id, db->dbc_maxids );
677
678                             if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
679 #ifdef NEW_LOGGING
680                                 LDAP_LOG( INDEX, ERR, 
681                                         "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
682 #else
683                                 Debug( LDAP_DEBUG_ANY,
684                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
685 #endif
686
687                             }
688
689                             id = id2;
690                             /* This new id will necessarily be inserted
691                              * as the first id of the next block by the
692                              * following switch() statement.
693                              */
694                         }
695
696                         switch ( (rc = idl_insert( &tmp2, id,
697                             db->dbc_maxids )) ) {
698                         case 1:         /* id inserted first in block */
699                                 rc = idl_change_first( be, db, key, idl,
700                                     i + 1, k3, tmp2 );
701                                 /* FALL */
702
703                         case 2:         /* id already there - how? */
704                         case 0:         /* id inserted: this can never be
705                                          * the result of idl_insert, because
706                                          * we guaranteed that idl_change_first
707                                          * will always be called.
708                                          */
709                                 if ( rc == 2 ) {
710 #ifdef NEW_LOGGING
711                                         LDAP_LOG( INDEX, INFO, 
712                                                    "idl_insert_key: id %ld is already in next block\n", 
713                                                    id, 0, 0 );
714 #else
715                                         Debug( LDAP_DEBUG_ANY,
716                                             "idl_insert_key: id %ld already in next block\n",
717                                             id, 0, 0 );
718 #endif
719
720                                 }
721
722                                 idl_free( tmp );
723                                 idl_free( tmp2 );
724                                 cont_free( &k3 );
725                                 cont_free( &k2 );
726                                 idl_free( idl );
727                                 return( 0 );
728
729                         case 3:         /* split the original block */
730                                 break;
731                         }
732
733                         idl_free( tmp2 );
734                         cont_free( &k3 );
735                 }
736
737 split:
738                 /*
739                  * must split the block, write both new blocks + update
740                  * and write the indirect header block.
741                  */
742
743                 rc = 0; /* optimistic */
744
745
746 #ifndef USE_INDIRECT_NIDS
747                 /* count how many indirect blocks *//* XXX linear count XXX */
748                 for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ )
749                         ;       /* NULL */
750 #else
751                 j = ID_BLOCK_NIDS(idl);
752 #endif
753
754                 /* check it against all-id thresholed */
755                 if ( j + 1 > db->dbc_maxindirect ) {
756                         /*
757                          * we've passed the all-id threshold, meaning
758                          * that this set of blocks should be replaced
759                          * by a single "all-id" block.  our job: delete
760                          * all the indirect blocks, and replace the header
761                          * block by an all-id block.
762                          */
763
764                         /* delete all indirect blocks */
765 #ifndef USE_INDIRECT_NIDS
766                         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) {
767 #else
768                         for ( j = 0; (unsigned long) j < ID_BLOCK_NIDS(idl); j++ ) {
769 #endif
770                                 cont_id( &k2, ID_BLOCK_ID(idl, j) );
771
772                                 rc = ldbm_cache_delete( db, k2 );
773                         }
774
775                         /* store allid block in place of header block */
776                         idl_free( idl );
777                         idl = idl_allids( be );
778                         rc = idl_store( be, db, key, idl );
779
780                         cont_free( &k2 );
781                         idl_free( idl );
782                         idl_free( tmp );
783                         return( rc );
784                 }
785
786                 idl_split_block( tmp, id, &tmp2, &tmp3 );
787                 idl_free( tmp );
788
789                 /* create a new updated indirect header block */
790                 tmp = idl_alloc( ID_BLOCK_NMAXN(idl) + 1 );
791 #ifndef USE_INDIRECT_NIDS
792                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_INDIRECT_VALUE;
793 #else
794                 ID_BLOCK_NMAX(tmp) |= ID_BLOCK_INDIRECT_VALUE;
795 #endif
796                 /* everything up to the split block */
797                 AC_MEMCPY(
798                         (char *) &ID_BLOCK_ID(tmp, 0),
799                         (char *) &ID_BLOCK_ID(idl, 0),
800                     i * sizeof(ID) );
801                 /* the two new blocks */
802                 ID_BLOCK_ID(tmp, i) = ID_BLOCK_ID(tmp2, 0);
803                 ID_BLOCK_ID(tmp, i + 1) = ID_BLOCK_ID(tmp3, 0);
804                 /* everything after the split block */
805 #ifndef USE_INDIRECT_NIDS
806                 AC_MEMCPY(
807                         (char *) &ID_BLOCK_ID(tmp, i + 2),
808                         (char *) &ID_BLOCK_ID(idl, i + 1),
809                         (ID_BLOCK_NMAXN(idl) - i - 1) * sizeof(ID) );
810 #else
811                 AC_MEMCPY(
812                         (char *) &ID_BLOCK_ID(tmp, i + 2),
813                         (char *) &ID_BLOCK_ID(idl, i + 1),
814                         (ID_BLOCK_NIDS(idl) - i - 1) * sizeof(ID) );
815                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_NIDS(idl) + 1;
816 #endif
817
818                 /* store the header block */
819                 rc = idl_store( be, db, key, tmp );
820
821                 /* store the first id block */
822                 cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
823                 rc = idl_store( be, db, k2, tmp2 );
824
825                 /* store the second id block */
826                 cont_id( &k2, ID_BLOCK_ID(tmp3, 0) );
827                 rc = idl_store( be, db, k2, tmp3 );
828
829                 idl_free( tmp2 );
830                 idl_free( tmp3 );
831                 break;
832         }
833
834         cont_free( &k2 );
835         idl_free( tmp );
836         idl_free( idl );
837         return( rc );
838 }
839
840
841 /*
842  * idl_insert - insert an id into an id list.
843  *
844  *      returns
845  *              0       id inserted
846  *              1       id inserted, first id in block has changed
847  *              2       id not inserted, already there
848  *              3       id not inserted, block must be split
849  */
850 int
851 idl_insert( ID_BLOCK **idl, ID id, unsigned int maxids )
852 {
853         unsigned int    i;
854
855         if ( ID_BLOCK_ALLIDS( *idl ) ) {
856                 return( 2 );    /* already there */
857         }
858
859         /* is it already there? */
860         i = idl_find(*idl, id);
861         if ( ID_BLOCK_ID(*idl, i) == id ) {
862                 return( 2 );    /* already there */
863         }
864         if ( ID_BLOCK_NIDS(*idl) && ID_BLOCK_ID(*idl, i) < id )
865                 i++;
866
867         /* do we need to make room for it? */
868         if ( ID_BLOCK_NIDS(*idl) == ID_BLOCK_NMAXN(*idl) ) {
869                 /* make room or indicate block needs splitting */
870                 if ( ID_BLOCK_NMAXN(*idl) >= maxids ) {
871                         return( 3 );    /* block needs splitting */
872                 }
873
874                 ID_BLOCK_NMAX(*idl) *= 2;
875                 if ( ID_BLOCK_NMAXN(*idl) > maxids ) {
876                         ID_BLOCK_NMAX(*idl) = maxids;
877                 }
878                 *idl = (ID_BLOCK *) ch_realloc( (char *) *idl,
879                     (ID_BLOCK_NMAXN(*idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
880         }
881
882         /* make a slot for the new id */
883         AC_MEMCPY( &ID_BLOCK_ID(*idl, i+1), &ID_BLOCK_ID(*idl, i),
884                     (ID_BLOCK_NIDS(*idl) - i) * sizeof(ID) );
885
886         ID_BLOCK_ID(*idl, i) = id;
887         ID_BLOCK_NIDS(*idl)++;
888         (void) memset(
889                 (char *) &ID_BLOCK_ID((*idl), ID_BLOCK_NIDS(*idl)),
890                 '\0',
891             (ID_BLOCK_NMAXN(*idl) - ID_BLOCK_NIDS(*idl)) * sizeof(ID) );
892
893 #ifdef LDBM_DEBUG_IDL
894         idl_check(*idl);
895 #endif
896
897         return( i == 0 ? 1 : 0 );       /* inserted - first id changed or not */
898 }
899
900
901 int
902 idl_delete_key (
903         Backend         *be,
904         DBCache  *db,
905         Datum           key,
906         ID              id
907 )
908 {
909         Datum  data;
910         ID_BLOCK *idl;
911         unsigned i;
912         int j, nids;
913
914         if ( (idl = idl_fetch_one( be, db, key ) ) == NULL )
915         {
916                 /* It wasn't found.  Hmm... */
917                 return -1;
918         }
919
920         if ( ID_BLOCK_ALLIDS( idl ) ) {
921                 idl_free( idl );
922                 return 0;
923         }
924
925         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
926                 i = idl_find(idl, id);
927                 if ( ID_BLOCK_ID(idl, i) == id ) {
928                         if( --ID_BLOCK_NIDS(idl) == 0 ) {
929                                 ldbm_cache_delete( db, key );
930
931                         } else {
932                                 AC_MEMCPY(
933                                         &ID_BLOCK_ID(idl, i),
934                                         &ID_BLOCK_ID(idl, i+1),
935                                         (ID_BLOCK_NIDS(idl)-i) * sizeof(ID) );
936
937                                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)) = NOID;
938
939                                 idl_store( be, db, key, idl );
940                         }
941
942                         idl_free( idl );
943                         return 0;
944                 }
945                 /*  We didn't find the ID.  Hmmm... */
946                 idl_free( idl );
947                 return -1;
948         }
949         
950         /* We have to go through an indirect block and find the ID
951            in the list of IDL's
952            */
953         cont_alloc( &data, &key );
954 #ifndef USE_INDIRECT_NIDS
955         for ( nids = 0; !ID_BLOCK_NOID(idl, nids); nids++ )
956                 ;       /* NULL */
957
958         for ( j = 0; j<nids; j++ ) 
959 #else
960         nids = ID_BLOCK_NIDS(idl);
961         for ( j = idl_find(idl, id); j >= 0; j = -1)    /* execute once */
962 #endif
963         {
964                 ID_BLOCK *tmp;
965                 cont_id( &data, ID_BLOCK_ID(idl, j) );
966
967                 if ( (tmp = idl_fetch_one( be, db, data )) == NULL ) {
968 #ifdef NEW_LOGGING
969                         LDAP_LOG( INDEX, INFO,
970                                    "idl_delete_key: idl_fetch_one returned NULL\n", 0, 0, 0 );
971 #else
972                         Debug( LDAP_DEBUG_ANY,
973                             "idl_delete_key: idl_fetch of returned NULL\n", 0, 0, 0 );
974 #endif
975
976                         continue;
977                 }
978                 /*
979                    Now try to find the ID in tmp
980                 */
981
982                 i = idl_find(tmp, id);
983                 if ( ID_BLOCK_ID(tmp, i) == id )
984                 {
985                         AC_MEMCPY(
986                                 &ID_BLOCK_ID(tmp, i),
987                                 &ID_BLOCK_ID(tmp, i+1),
988                                 (ID_BLOCK_NIDS(tmp)-(i+1)) * sizeof(ID));
989                         ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp)-1 ) = NOID;
990                         ID_BLOCK_NIDS(tmp)--;
991
992                         if ( ID_BLOCK_NIDS(tmp) ) {
993                                 idl_store ( be, db, data, tmp );
994
995                         } else {
996                                 ldbm_cache_delete( db, data );
997                                 AC_MEMCPY(
998                                         &ID_BLOCK_ID(idl, j),
999                                         &ID_BLOCK_ID(idl, j+1),
1000                                         (nids-(j+1)) * sizeof(ID));
1001                                 ID_BLOCK_ID(idl, nids-1) = NOID;
1002                                 nids--;
1003 #ifdef USE_INDIRECT_NIDS
1004                                 ID_BLOCK_NIDS(idl)--;
1005 #endif
1006                                 if ( ! nids )
1007                                         ldbm_cache_delete( db, key );
1008                                 else
1009                                         idl_store( be, db, key, idl );
1010                         }
1011                         idl_free( tmp );
1012                         cont_free( &data );
1013                         idl_free( idl );
1014                         return 0;
1015                 }
1016                 idl_free( tmp );
1017         }
1018
1019         cont_free( &data );
1020         idl_free( idl );
1021         return -1;
1022 }
1023
1024
1025 /* return a duplicate of a single ID_BLOCK */
1026 static ID_BLOCK *
1027 idl_dup( ID_BLOCK *idl )
1028 {
1029         ID_BLOCK        *new;
1030
1031         if ( idl == NULL ) {
1032                 return( NULL );
1033         }
1034
1035         new = idl_alloc( ID_BLOCK_NMAXN(idl) );
1036
1037         AC_MEMCPY(
1038                 (char *) new,
1039                 (char *) idl,
1040                 (ID_BLOCK_NMAXN(idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
1041
1042 #ifdef LDBM_DEBUG_IDL
1043         idl_check(new);
1044 #endif
1045
1046         return( new );
1047 }
1048
1049
1050 /* return the smaller ID_BLOCK */
1051 static ID_BLOCK *
1052 idl_min( ID_BLOCK *a, ID_BLOCK *b )
1053 {
1054         return( ID_BLOCK_NIDS(a) > ID_BLOCK_NIDS(b) ? b : a );
1055 }
1056
1057
1058 /*
1059  * idl_intersection - return a intersection b
1060  */
1061 ID_BLOCK *
1062 idl_intersection(
1063     Backend     *be,
1064     ID_BLOCK    *a,
1065     ID_BLOCK    *b
1066 )
1067 {
1068         unsigned int    ai, bi, ni;
1069         ID_BLOCK                *n;
1070
1071         if ( a == NULL || b == NULL ) {
1072                 return( NULL );
1073         }
1074         if ( ID_BLOCK_ALLIDS( a ) ) {
1075                 return( idl_dup( b ) );
1076         }
1077         if ( ID_BLOCK_ALLIDS( b ) ) {
1078                 return( idl_dup( a ) );
1079         }
1080         if ( ID_BLOCK_NIDS(a) == 0 || ID_BLOCK_NIDS(b) == 0 ) {
1081                 return( NULL );
1082         }
1083
1084         n = idl_dup( idl_min( a, b ) );
1085
1086 #ifdef LDBM_DEBUG_IDL
1087         idl_check(a);
1088         idl_check(b);
1089 #endif
1090
1091         for ( ni = 0, ai = 0, bi = 0; ; ) {
1092                 if ( ID_BLOCK_ID(b, bi) == ID_BLOCK_ID(a, ai) ) {
1093                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1094                         ai++;
1095                         bi++;
1096                         if ( ai >= ID_BLOCK_NIDS(a) || bi >= ID_BLOCK_NIDS(b) )
1097                                 break;
1098                 } else if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1099                         ai++;
1100                         if ( ai >= ID_BLOCK_NIDS(a) )
1101                                 break;
1102                 } else {
1103                         bi++;
1104                         if ( bi >= ID_BLOCK_NIDS(b) )
1105                                 break;
1106                 }
1107         }
1108
1109         if ( ni == 0 ) {
1110                 idl_free( n );
1111                 return( NULL );
1112         }
1113         ID_BLOCK_NIDS(n) = ni;
1114
1115 #ifdef LDBM_DEBUG_IDL
1116         idl_check(n);
1117 #endif
1118
1119         return( n );
1120 }
1121
1122
1123 /*
1124  * idl_union - return a union b
1125  */
1126 ID_BLOCK *
1127 idl_union(
1128     Backend     *be,
1129     ID_BLOCK    *a,
1130     ID_BLOCK    *b
1131 )
1132 {
1133         unsigned int    ai, bi, ni;
1134         ID_BLOCK                *n;
1135
1136         if ( a == NULL ) {
1137                 return( idl_dup( b ) );
1138         }
1139         if ( b == NULL ) {
1140                 return( idl_dup( a ) );
1141         }
1142         if ( ID_BLOCK_ALLIDS( a ) || ID_BLOCK_ALLIDS( b ) ) {
1143                 return( idl_allids( be ) );
1144         }
1145
1146 #ifdef LDBM_DEBUG_IDL
1147         idl_check(a);
1148         idl_check(b);
1149 #endif
1150
1151         if ( ID_BLOCK_NIDS(b) < ID_BLOCK_NIDS(a) ) {
1152                 n = a;
1153                 a = b;
1154                 b = n;
1155         }
1156
1157         n = idl_alloc( ID_BLOCK_NIDS(a) + ID_BLOCK_NIDS(b) );
1158
1159         for ( ni = 0, ai = 0, bi = 0;
1160                 ai < ID_BLOCK_NIDS(a) && bi < ID_BLOCK_NIDS(b);
1161                 )
1162         {
1163                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1164                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai++);
1165
1166                 } else if ( ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai) ) {
1167                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi++);
1168
1169                 } else {
1170                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1171                         ai++, bi++;
1172                 }
1173         }
1174
1175         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1176                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1177         }
1178         for ( ; bi < ID_BLOCK_NIDS(b); bi++ ) {
1179                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi);
1180         }
1181         ID_BLOCK_NIDS(n) = ni;
1182
1183 #ifdef LDBM_DEBUG_IDL
1184         idl_check(n);
1185 #endif
1186
1187         return( n );
1188 }
1189
1190
1191 /*
1192  * idl_notin - return a intersection ~b (or a minus b)
1193  */
1194 ID_BLOCK *
1195 idl_notin(
1196     Backend     *be,
1197     ID_BLOCK    *a,
1198     ID_BLOCK    *b
1199 )
1200 {
1201         unsigned int    ni, ai, bi;
1202         ID_BLOCK                *n;
1203
1204         if ( a == NULL ) {
1205                 return( NULL );
1206         }
1207         if ( b == NULL || ID_BLOCK_ALLIDS( b )) {
1208                 return( idl_dup( a ) );
1209         }
1210
1211         if ( ID_BLOCK_ALLIDS( a ) ) {
1212                 n = idl_alloc( SLAPD_LDBM_MIN_MAXIDS );
1213                 ni = 0;
1214
1215                 for ( ai = 1, bi = 0;
1216                         ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n) && bi < ID_BLOCK_NMAXN(b);
1217                         ai++ )
1218                 {
1219                         if ( ID_BLOCK_ID(b, bi) == ai ) {
1220                                 bi++;
1221                         } else {
1222                                 ID_BLOCK_ID(n, ni++) = ai;
1223                         }
1224                 }
1225
1226                 for ( ; ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n); ai++ ) {
1227                         ID_BLOCK_ID(n, ni++) = ai;
1228                 }
1229
1230                 if ( ni == ID_BLOCK_NMAXN(n) ) {
1231                         idl_free( n );
1232                         return( idl_allids( be ) );
1233                 } else {
1234                         ID_BLOCK_NIDS(n) = ni;
1235                         return( n );
1236                 }
1237         }
1238
1239         n = idl_dup( a );
1240
1241         ni = 0;
1242         for ( ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1243                 for ( ;
1244                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1245                     bi++ )
1246                 {
1247                         ;       /* NULL */
1248                 }
1249
1250                 if ( bi == ID_BLOCK_NIDS(b) ) {
1251                         break;
1252                 }
1253
1254                 if ( ID_BLOCK_ID(b, bi) != ID_BLOCK_ID(a, ai) ) {
1255                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1256                 }
1257         }
1258
1259         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1260                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1261         }
1262         ID_BLOCK_NIDS(n) = ni;
1263
1264 #ifdef LDBM_DEBUG_IDL
1265         idl_check(n);
1266 #endif
1267
1268         return( n );
1269 }
1270
1271 /*      return the first ID in the block
1272  *      if ALLIDS block
1273  *              NIDS > 1 return 1
1274  *              otherwise return NOID 
1275  *      otherwise return first ID
1276  *
1277  *      cursor is set to 1
1278  */         
1279 ID
1280 idl_firstid( ID_BLOCK *idl, ID *cursor )
1281 {
1282         *cursor = 1;
1283
1284         if ( idl == NULL || ID_BLOCK_NIDS(idl) == 0 ) {
1285                 return( NOID );
1286         }
1287
1288         if ( ID_BLOCK_ALLIDS( idl ) ) {
1289                 return( ID_BLOCK_NIDS(idl) > 1 ? 1 : NOID );
1290         }
1291
1292         return( ID_BLOCK_ID(idl, 0) );
1293 }
1294
1295 /*      return next ID
1296  *      if ALLIDS block, cursor is id.
1297  *              increment id
1298  *              if id < NIDS return id
1299  *              otherwise NOID.
1300  *      otherwise cursor is index into block
1301  *              if index < nids
1302  *                      return id at index then increment
1303  */ 
1304 ID
1305 idl_nextid( ID_BLOCK *idl, ID *cursor )
1306 {
1307         if ( ID_BLOCK_ALLIDS( idl ) ) {
1308                 if( ++(*cursor) < ID_BLOCK_NIDS(idl) ) {
1309                         return *cursor;
1310                 } else {
1311                         return NOID;
1312                 }
1313         }
1314
1315         if ( *cursor < ID_BLOCK_NIDS(idl) ) {
1316                 return( ID_BLOCK_ID(idl, (*cursor)++) );
1317         }
1318
1319         return( NOID );
1320 }