]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ldbm/idl.c
error message from be_entry_put tool backend function
[openldap] / servers / slapd / back-ldbm / idl.c
1 /* idl.c - ldap id list handling routines */
2 /* $OpenLDAP$ */
3 /*
4  * Copyright 1998-2002 The OpenLDAP Foundation, All Rights Reserved.
5  * COPYING RESTRICTIONS APPLY, see COPYRIGHT file
6  */
7
8 #include "portable.h"
9
10 #include <stdio.h>
11
12 #include <ac/string.h>
13 #include <ac/socket.h>
14
15 #include "slap.h"
16 #include "back-ldbm.h"
17
18 static ID_BLOCK* idl_dup( ID_BLOCK *idl );
19
20 static void cont_alloc( Datum *cont, Datum *key )
21 {
22         ldbm_datum_init( *cont );
23         cont->dsize = 1 + sizeof(ID) + key->dsize;
24         cont->dptr = ch_malloc( cont->dsize );
25
26         * (unsigned char *) cont->dptr = SLAP_INDEX_CONT_PREFIX;
27
28         AC_MEMCPY( &((unsigned char *)cont->dptr)[1 + sizeof(ID)],
29                 key->dptr, key->dsize );
30 }
31
32 static void cont_id( Datum *cont, ID id )
33 {
34         int i;
35
36         for( i=1; i <= sizeof(id); i++) {
37                 ((unsigned char *)cont->dptr)[i] = (unsigned char)(id & 0xFF);
38                 id >>= 8;
39         }
40
41 }
42
43 static void cont_free( Datum *cont )
44 {
45         ch_free( cont->dptr );
46 }
47
48 #ifdef LDBM_DEBUG_IDL
49 static void idl_check(ID_BLOCK *idl)
50 {
51         int i;
52         ID_BLOCK last;
53
54         if( ID_BLOCK_INDIRECT(idl) || ID_BLOCK_ALLIDS(idl)
55                 || ID_BLOCK_NIDS(idl) <= 1 )
56         {
57                 return;
58         }
59
60         for( last = ID_BLOCK_ID(idl, 0), i = 1;
61                 i < ID_BLOCK_NIDS(idl);
62                 last = ID_BLOCK_ID(idl, i), i++ )
63         {
64                 assert (last < ID_BLOCK_ID(idl, i) );
65         }
66 }
67 #endif
68
69 /* Allocate an ID_BLOCK with room for nids ids */
70 ID_BLOCK *
71 idl_alloc( unsigned int nids )
72 {
73         ID_BLOCK        *new;
74
75         /* nmax + nids + space for the ids */
76         new = (ID_BLOCK *) ch_calloc( (ID_BLOCK_IDS_OFFSET + nids), sizeof(ID) );
77         ID_BLOCK_NMAX(new) = nids;
78         ID_BLOCK_NIDS(new) = 0;
79
80         return( new );
81 }
82
83
84 /* Allocate an empty ALLIDS ID_BLOCK */
85 ID_BLOCK        *
86 idl_allids( Backend *be )
87 {
88         ID_BLOCK        *idl;
89         ID              id;
90
91         idl = idl_alloc( 0 );
92         ID_BLOCK_NMAX(idl) = ID_BLOCK_ALLIDS_VALUE;
93         if ( next_id_get( be, &id ) ) {
94                 return NULL;
95         }
96         ID_BLOCK_NIDS(idl) = id;
97
98         return( idl );
99 }
100
101 /* Free an ID_BLOCK */
102 void
103 idl_free( ID_BLOCK *idl )
104 {
105         if ( idl == NULL ) {
106 #ifdef NEW_LOGGING
107                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
108                            "idl_free: called with NULL pointer\n" ));
109 #else
110                 Debug( LDAP_DEBUG_TRACE,
111                         "idl_free: called with NULL pointer\n",
112                         0, 0, 0 );
113 #endif
114
115                 return;
116         }
117
118         free( (char *) idl );
119 }
120
121
122 /* Fetch an single ID_BLOCK from the cache */
123 static ID_BLOCK *
124 idl_fetch_one(
125     Backend             *be,
126     DBCache     *db,
127     Datum               key
128 )
129 {
130         Datum   data;
131         ID_BLOCK        *idl;
132
133         /* Debug( LDAP_DEBUG_TRACE, "=> idl_fetch_one\n", 0, 0, 0 ); */
134
135         data = ldbm_cache_fetch( db, key );
136
137         if( data.dptr == NULL ) {
138                 return NULL;
139         }
140
141         idl = idl_dup((ID_BLOCK *) data.dptr);
142
143         ldbm_datum_free( db->dbc_db, data );
144
145         return idl;
146 }
147
148
149 /* Fetch a set of ID_BLOCKs from the cache
150  *      if not INDIRECT
151  *              if block return is an ALLIDS block,
152  *                      return an new ALLIDS block
153  *              otherwise
154  *                      return block
155  *      construct super block from all blocks referenced by INDIRECT block
156  *      return super block
157  */
158 ID_BLOCK *
159 idl_fetch(
160     Backend             *be,
161     DBCache     *db,
162     Datum               key
163 )
164 {
165         Datum   data;
166         ID_BLOCK        *idl;
167         ID_BLOCK        **tmp;
168         int     i, nids;
169
170         idl = idl_fetch_one( be, db, key );
171
172         if ( idl == NULL ) {
173                 return NULL;
174         }
175
176         if ( ID_BLOCK_ALLIDS(idl) ) {
177                 /* all ids block */
178                 /* make sure we have the current value of highest id */
179                 idl_free( idl );
180                 idl = idl_allids( be );
181
182                 return( idl );
183         }
184
185         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
186                 /* regular block */
187                 return( idl );
188         }
189
190         /*
191          * this is an indirect block which points to other blocks.
192          * we need to read in all the blocks it points to and construct
193          * a big id list containing all the ids, which we will return.
194          */
195
196 #ifndef USE_INDIRECT_NIDS
197         /* count the number of blocks & allocate space for pointers to them */
198         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ )
199                 ;       /* NULL */
200 #else
201         i = ID_BLOCK_NIDS(idl);
202 #endif
203         tmp = (ID_BLOCK **) ch_malloc( (i + 1) * sizeof(ID_BLOCK *) );
204
205         /* read in all the blocks */
206         cont_alloc( &data, &key );
207         nids = 0;
208 #ifndef USE_INDIRECT_NIDS
209         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ ) {
210 #else
211         for ( i = 0; i < ID_BLOCK_NIDS(idl); i++ ) {
212 #endif
213                 cont_id( &data, ID_BLOCK_ID(idl, i) );
214
215                 if ( (tmp[i] = idl_fetch_one( be, db, data )) == NULL ) {
216 #ifdef NEW_LOGGING
217                         LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
218                                    "idl_fetch: idl_fetch_one returned NULL\n" ));
219 #else
220                         Debug( LDAP_DEBUG_ANY,
221                             "idl_fetch: one returned NULL\n", 0, 0, 0 );
222 #endif
223
224                         continue;
225                 }
226
227                 nids += ID_BLOCK_NIDS(tmp[i]);
228         }
229         tmp[i] = NULL;
230         cont_free( &data );
231         idl_free( idl );
232
233         /* allocate space for the big block */
234         idl = idl_alloc( nids );
235         ID_BLOCK_NIDS(idl) = nids;
236         nids = 0;
237
238         /* copy in all the ids from the component blocks */
239         for ( i = 0; tmp[i] != NULL; i++ ) {
240                 if ( tmp[i] == NULL ) {
241                         continue;
242                 }
243
244                 AC_MEMCPY(
245                         (char *) &ID_BLOCK_ID(idl, nids),
246                         (char *) &ID_BLOCK_ID(tmp[i], 0),
247                         ID_BLOCK_NIDS(tmp[i]) * sizeof(ID) );
248                 nids += ID_BLOCK_NIDS(tmp[i]);
249
250                 idl_free( tmp[i] );
251         }
252         free( (char *) tmp );
253
254 #ifdef LDBM_DEBUG_IDL
255         idl_check(idl);
256 #endif
257
258 #ifdef NEW_LOGGING
259         LDAP_LOG(( "cache", LDAP_LEVEL_ENTRY,
260                    "idl_fetch: %ld ids (%ld max)\n",
261                    ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl) ));
262 #else
263         Debug( LDAP_DEBUG_TRACE, "<= idl_fetch %ld ids (%ld max)\n",
264                ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl), 0 );
265 #endif
266
267         return( idl );
268 }
269
270
271 /* store a single block */
272 static int
273 idl_store(
274     Backend             *be,
275     DBCache     *db,
276     Datum               key, 
277     ID_BLOCK            *idl
278 )
279 {
280         int     rc, flags;
281         Datum   data;
282
283 #ifdef LDBM_DEBUG_IDL
284         idl_check(idl);
285 #endif
286
287         ldbm_datum_init( data );
288
289         /* Debug( LDAP_DEBUG_TRACE, "=> idl_store\n", 0, 0, 0 ); */
290
291         data.dptr = (char *) idl;
292         data.dsize = (ID_BLOCK_IDS_OFFSET + ID_BLOCK_NMAXN(idl)) * sizeof(ID);
293         
294         flags = LDBM_REPLACE;
295         rc = ldbm_cache_store( db, key, data, flags );
296
297 #ifdef LDBM_DEBUG
298         Statslog( LDAP_DEBUG_STATS, "<= idl_store(): rc=%d\n",
299                 rc, 0, 0, 0, 0 );
300 #endif
301
302         /* Debug( LDAP_DEBUG_TRACE, "<= idl_store %d\n", rc, 0, 0 ); */
303         return( rc );
304 }
305
306 /* Binary search for id in block, return index
307  *    an index is always returned, even with no match. If no
308  * match, the returned index is the insertion point.
309  */
310 static unsigned int
311 idl_find(
312     ID_BLOCK    *b,
313     ID          id
314 )
315 {
316         int lo=0, hi=ID_BLOCK_NIDS(b)-1, nr=0;
317
318         for (;lo<=hi;)
319         {
320             nr = ( lo + hi ) / 2;
321             if (ID_BLOCK_ID(b, nr) == id)
322                 break;
323             if (ID_BLOCK_ID(b, nr) > id)
324                 hi = nr - 1;
325             else
326                 lo = nr + 1;
327         }
328         return nr;
329 }
330
331 /* split the block at id 
332  *      locate ID greater than or equal to id.
333  */
334 static void
335 idl_split_block(
336     ID_BLOCK    *b,
337     ID          id,
338     ID_BLOCK    **right,
339     ID_BLOCK    **left
340 )
341 {
342         unsigned int    nr, nl;
343
344         /* find where to split the block */
345         nr = idl_find(b, id);
346         if ( ID_BLOCK_ID(b,nr) < id )
347                 nr++;
348
349         nl = ID_BLOCK_NIDS(b) - nr;
350
351         *right = idl_alloc( nr == 0 ? 1 : nr );
352         *left = idl_alloc( nl + (nr == 0 ? 0 : 1));
353
354         /*
355          * everything before the id being inserted in the first block
356          * unless there is nothing, in which case the id being inserted
357          * goes there.
358          */
359         if ( nr == 0 ) {
360                 ID_BLOCK_NIDS(*right) = 1;
361                 ID_BLOCK_ID(*right, 0) = id;
362         } else {
363                 AC_MEMCPY(
364                         (char *) &ID_BLOCK_ID(*right, 0),
365                         (char *) &ID_BLOCK_ID(b, 0),
366                         nr * sizeof(ID) );
367                 ID_BLOCK_NIDS(*right) = nr;
368                 ID_BLOCK_ID(*left, 0) = id;
369         }
370
371         /* the id being inserted & everything after in the second block */
372         AC_MEMCPY(
373                 (char *) &ID_BLOCK_ID(*left, (nr == 0 ? 0 : 1)),
374             (char *) &ID_BLOCK_ID(b, nr),
375                 nl * sizeof(ID) );
376         ID_BLOCK_NIDS(*left) = nl + (nr == 0 ? 0 : 1);
377
378 #ifdef LDBM_DEBUG_IDL
379         idl_check(*right);
380         idl_check(*left);
381 #endif
382 }
383
384
385 /*
386  * idl_change_first - called when an indirect block's first key has
387  * changed, meaning it needs to be stored under a new key, and the
388  * header block pointing to it needs updating.
389  */
390 static int
391 idl_change_first(
392     Backend             *be,
393     DBCache     *db,
394     Datum               hkey,           /* header block key     */
395     ID_BLOCK            *h,             /* header block         */
396     int                 pos,            /* pos in h to update   */
397     Datum               bkey,           /* data block key       */
398     ID_BLOCK            *b              /* data block           */
399 )
400 {
401         int     rc;
402
403         /* Debug( LDAP_DEBUG_TRACE, "=> idl_change_first\n", 0, 0, 0 ); */
404
405         /* delete old key block */
406         if ( (rc = ldbm_cache_delete( db, bkey )) != 0 ) {
407 #ifdef NEW_LOGGING
408                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
409                            "idl_change_first: ldbm_cache_delete returned %d\n", rc ));
410 #else
411                 Debug( LDAP_DEBUG_ANY,
412                     "idl_change_first: ldbm_cache_delete returned %d\n",
413                         rc, 0, 0 );
414 #endif
415
416                 return( rc );
417         }
418
419         /* write block with new key */
420         cont_id( &bkey, ID_BLOCK_ID(b, 0) );
421
422         if ( (rc = idl_store( be, db, bkey, b )) != 0 ) {
423 #ifdef NEW_LOGGING
424                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
425                            "idl_change_first: idl_store returned %d\n", rc ));
426 #else
427                 Debug( LDAP_DEBUG_ANY,
428                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
429 #endif
430
431                 return( rc );
432         }
433
434         /* update + write indirect header block */
435         ID_BLOCK_ID(h, pos) = ID_BLOCK_ID(b, 0);
436         if ( (rc = idl_store( be, db, hkey, h )) != 0 ) {
437 #ifdef NEW_LOGGING
438                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
439                            "idl_change_first: idl_store returned %s\n", rc ));
440 #else
441                 Debug( LDAP_DEBUG_ANY,
442                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
443 #endif
444
445                 return( rc );
446         }
447
448         return( 0 );
449 }
450
451
452 int
453 idl_insert_key(
454     Backend             *be,
455     DBCache     *db,
456     Datum               key,
457     ID                  id
458 )
459 {
460         int     i, j, first, rc = 0;
461         ID_BLOCK        *idl, *tmp, *tmp2, *tmp3;
462         Datum   k2;
463
464         if ( (idl = idl_fetch_one( be, db, key )) == NULL ) {
465                 idl = idl_alloc( 1 );
466                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)++) = id;
467                 rc = idl_store( be, db, key, idl );
468
469                 idl_free( idl );
470                 return( rc );
471         }
472
473         if ( ID_BLOCK_ALLIDS( idl ) ) {
474                 /* ALLIDS */
475                 idl_free( idl );
476                 return 0;
477         }
478
479         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
480                 /* regular block */
481                 switch ( idl_insert( &idl, id, db->dbc_maxids ) ) {
482                 case 0:         /* id inserted - store the updated block */
483                 case 1:
484                         rc = idl_store( be, db, key, idl );
485                         break;
486
487                 case 2:         /* id already there - nothing to do */
488                         rc = 0;
489                         break;
490
491                 case 3:         /* id not inserted - block must be split */
492                         /* check threshold for marking this an all-id block */
493                         if ( db->dbc_maxindirect < 2 ) {
494                                 idl_free( idl );
495                                 idl = idl_allids( be );
496                                 rc = idl_store( be, db, key, idl );
497                                 break;
498                         }
499
500                         idl_split_block( idl, id, &tmp, &tmp2 );
501                         idl_free( idl );
502
503                         /* create the header indirect block */
504                         idl = idl_alloc( 3 );
505 #ifndef USE_INDIRECT_NIDS
506                         ID_BLOCK_NMAX(idl) = 3;
507                         ID_BLOCK_NIDS(idl) = ID_BLOCK_INDIRECT_VALUE;
508                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
509                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
510                         ID_BLOCK_ID(idl, 2) = NOID;
511 #else
512                         ID_BLOCK_NMAX(idl) = 2 | ID_BLOCK_INDIRECT_VALUE;
513                         ID_BLOCK_NIDS(idl) = 2;
514                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
515                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
516 #endif
517
518                         /* store it */
519                         rc = idl_store( be, db, key, idl );
520
521                         cont_alloc( &k2, &key );
522                         cont_id( &k2, ID_BLOCK_ID(tmp, 0) );
523
524                         rc = idl_store( be, db, k2, tmp );
525
526                         cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
527                         rc = idl_store( be, db, k2, tmp2 );
528
529                         cont_free( &k2 );
530
531                         idl_free( tmp );
532                         idl_free( tmp2 );
533                         break;
534                 }
535
536                 idl_free( idl );
537                 return( rc );
538         }
539
540         /*
541          * this is an indirect block which points to other blocks.
542          * we need to read in the block into which the id should be
543          * inserted, then insert the id and store the block.  we might
544          * have to split the block if it is full, which means we also
545          * need to write a new "header" block.
546          */
547
548 #ifndef USE_INDIRECT_NIDS
549         /* select the block to try inserting into *//* XXX linear search XXX */
550         for ( i = 0; !ID_BLOCK_NOID(idl, i) && id > ID_BLOCK_ID(idl, i); i++ )
551                 ;       /* NULL */
552 #else
553         i = idl_find(idl, id);
554         if (ID_BLOCK_ID(idl, i) < id)
555                 i++;
556 #endif
557
558         if ( i != 0 ) {
559                 i--;
560                 first = 0;
561         } else {
562                 first = 1;
563         }
564
565         /* get the block */
566         cont_alloc( &k2, &key );
567         cont_id( &k2, ID_BLOCK_ID(idl, i) );
568
569         if ( (tmp = idl_fetch_one( be, db, k2 )) == NULL ) {
570 #ifdef NEW_LOGGING
571                 LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
572                            "idl_insert_key: nonexistent continuation block\n" ));
573 #else
574                 Debug( LDAP_DEBUG_ANY, "idl_insert_key: nonexistent continuation block\n",
575                     0, 0, 0 );
576 #endif
577
578                 cont_free( &k2 );
579                 idl_free( idl );
580                 return( -1 );
581         }
582
583         /* insert the id */
584         switch ( idl_insert( &tmp, id, db->dbc_maxids ) ) {
585         case 0:         /* id inserted ok */
586                 if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
587 #ifdef NEW_LOGGING
588                         LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
589                                    "ids_insert_key: idl_store returned %d\n", rc ));
590 #else
591                         Debug( LDAP_DEBUG_ANY,
592                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
593 #endif
594
595                 }
596                 break;
597
598         case 1:         /* id inserted - first id in block has changed */
599                 /*
600                  * key for this block has changed, so we have to
601                  * write the block under the new key, delete the
602                  * old key block + update and write the indirect
603                  * header block.
604                  */
605
606                 rc = idl_change_first( be, db, key, idl, i, k2, tmp );
607                 break;
608
609         case 2:         /* id not inserted - already there, do nothing */
610                 rc = 0;
611                 break;
612
613         case 3:         /* id not inserted - block is full */
614                 /*
615                  * first, see if it will fit in the next block,
616                  * without splitting, unless we're trying to insert
617                  * into the beginning of the first block.
618                  */
619
620 #ifndef USE_INDIRECT_NIDS
621                 /* is there a next block? */
622                 if ( !first && !ID_BLOCK_NOID(idl, i + 1) ) {
623 #else
624                 if ( !first && (i + 1) < ID_BLOCK_NIDS(idl) ) {
625 #endif
626                         /* read it in */
627                         cont_alloc( &k2, &key );
628                         cont_id( &k2, ID_BLOCK_ID(idl, i) );
629                         if ( (tmp2 = idl_fetch_one( be, db, k2 )) == NULL ) {
630 #ifdef NEW_LOGGING
631                                 LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
632                                            "idl_insert_key: idl_fetch_one returned NULL\n"));
633 #else
634                                 Debug( LDAP_DEBUG_ANY,
635                                     "idl_insert_key: idl_fetch_one returned NULL\n",
636                                     0, 0, 0 );
637 #endif
638
639                                 /* split the original block */
640                                 cont_free( &k2 );
641                                 goto split;
642                         }
643
644                         /* If the new id is less than the last id in the
645                          * current block, it must not be put into the next
646                          * block. Push the last id of the current block
647                          * into the next block instead.
648                          */
649                         if (id < ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1)) {
650                             ID id2 = ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1);
651                             Datum k3;
652
653                             ldbm_datum_init( k3 );
654
655                             --ID_BLOCK_NIDS(tmp);
656                             /* This must succeed since we just popped one
657                              * ID off the end of it.
658                              */
659                             rc = idl_insert( &tmp, id, db->dbc_maxids );
660
661                                 k3.dptr = ch_malloc(k2.dsize);
662                                 k3.dsize = k2.dsize;
663                                 AC_MEMCPY(k3.dptr, k2.dptr, k3.dsize);
664                             if ( (rc = idl_store( be, db, k3, tmp )) != 0 ) {
665 #ifdef NEW_LOGGING
666                                 LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
667                                                "idl_insert_key: idl_store returned %d\n", rc ));
668 #else
669                                 Debug( LDAP_DEBUG_ANY,
670                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
671 #endif
672
673                             }
674
675                                 free( k3.dptr );
676
677                             id = id2;
678                             /* This new id will necessarily be inserted
679                              * as the first id of the next block by the
680                              * following switch() statement.
681                              */
682                         }
683
684                         switch ( (rc = idl_insert( &tmp2, id,
685                             db->dbc_maxids )) ) {
686                         case 1:         /* id inserted first in block */
687                                 rc = idl_change_first( be, db, key, idl,
688                                     i + 1, k2, tmp2 );
689                                 /* FALL */
690
691                         case 2:         /* id already there - how? */
692                         case 0:         /* id inserted: this can never be
693                                          * the result of idl_insert, because
694                                          * we guaranteed that idl_change_first
695                                          * will always be called.
696                                          */
697                                 if ( rc == 2 ) {
698 #ifdef NEW_LOGGING
699                                         LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
700                                                    "idl_insert_key: id %ld is already in next block\n", 
701                                                    id ));
702 #else
703                                         Debug( LDAP_DEBUG_ANY,
704                                             "idl_insert_key: id %ld already in next block\n",
705                                             id, 0, 0 );
706 #endif
707
708                                 }
709
710                                 idl_free( tmp );
711                                 idl_free( tmp2 );
712                                 idl_free( idl );
713                                 return( 0 );
714
715                         case 3:         /* split the original block */
716                                 break;
717                         }
718
719                         idl_free( tmp2 );
720                 }
721
722 split:
723                 /*
724                  * must split the block, write both new blocks + update
725                  * and write the indirect header block.
726                  */
727
728                 rc = 0; /* optimistic */
729
730
731 #ifndef USE_INDIRECT_NIDS
732                 /* count how many indirect blocks *//* XXX linear count XXX */
733                 for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ )
734                         ;       /* NULL */
735 #else
736                 j = ID_BLOCK_NIDS(idl);
737 #endif
738
739                 /* check it against all-id thresholed */
740                 if ( j + 1 > db->dbc_maxindirect ) {
741                         /*
742                          * we've passed the all-id threshold, meaning
743                          * that this set of blocks should be replaced
744                          * by a single "all-id" block.  our job: delete
745                          * all the indirect blocks, and replace the header
746                          * block by an all-id block.
747                          */
748
749                         /* delete all indirect blocks */
750 #ifndef USE_INDIRECT_NIDS
751                         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) {
752 #else
753                         for ( j = 0; j < ID_BLOCK_NIDS(idl); j++ ) {
754 #endif
755                                 cont_id( &k2, ID_BLOCK_ID(idl, j) );
756
757                                 rc = ldbm_cache_delete( db, k2 );
758                         }
759
760                         /* store allid block in place of header block */
761                         idl_free( idl );
762                         idl = idl_allids( be );
763                         rc = idl_store( be, db, key, idl );
764
765                         cont_free( &k2 );
766                         idl_free( idl );
767                         idl_free( tmp );
768                         return( rc );
769                 }
770
771                 idl_split_block( tmp, id, &tmp2, &tmp3 );
772                 idl_free( tmp );
773
774                 /* create a new updated indirect header block */
775                 tmp = idl_alloc( ID_BLOCK_NMAXN(idl) + 1 );
776 #ifndef USE_INDIRECT_NIDS
777                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_INDIRECT_VALUE;
778 #else
779                 ID_BLOCK_NMAX(tmp) |= ID_BLOCK_INDIRECT_VALUE;
780 #endif
781                 /* everything up to the split block */
782                 AC_MEMCPY(
783                         (char *) &ID_BLOCK_ID(tmp, 0),
784                         (char *) &ID_BLOCK_ID(idl, 0),
785                     i * sizeof(ID) );
786                 /* the two new blocks */
787                 ID_BLOCK_ID(tmp, i) = ID_BLOCK_ID(tmp2, 0);
788                 ID_BLOCK_ID(tmp, i + 1) = ID_BLOCK_ID(tmp3, 0);
789                 /* everything after the split block */
790 #ifndef USE_INDIRECT_NIDS
791                 AC_MEMCPY(
792                         (char *) &ID_BLOCK_ID(tmp, i + 2),
793                         (char *) &ID_BLOCK_ID(idl, i + 1),
794                         (ID_BLOCK_NMAXN(idl) - i - 1) * sizeof(ID) );
795 #else
796                 AC_MEMCPY(
797                         (char *) &ID_BLOCK_ID(tmp, i + 2),
798                         (char *) &ID_BLOCK_ID(idl, i + 1),
799                         (ID_BLOCK_NIDS(idl) - i - 1) * sizeof(ID) );
800                 ID_BLOCK_NIDS(idl)++;
801 #endif
802
803                 /* store the header block */
804                 rc = idl_store( be, db, key, tmp );
805
806                 /* store the first id block */
807                 cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
808                 rc = idl_store( be, db, k2, tmp2 );
809
810                 /* store the second id block */
811                 cont_id( &k2, ID_BLOCK_ID(tmp3, 0) );
812                 rc = idl_store( be, db, k2, tmp3 );
813
814                 idl_free( tmp2 );
815                 idl_free( tmp3 );
816                 break;
817         }
818
819         cont_free( &k2 );
820         idl_free( tmp );
821         idl_free( idl );
822         return( rc );
823 }
824
825
826 /*
827  * idl_insert - insert an id into an id list.
828  *
829  *      returns
830  *              0       id inserted
831  *              1       id inserted, first id in block has changed
832  *              2       id not inserted, already there
833  *              3       id not inserted, block must be split
834  */
835 int
836 idl_insert( ID_BLOCK **idl, ID id, unsigned int maxids )
837 {
838         unsigned int    i;
839
840         if ( ID_BLOCK_ALLIDS( *idl ) ) {
841                 return( 2 );    /* already there */
842         }
843
844         /* is it already there? */
845         i = idl_find(*idl, id);
846         if ( ID_BLOCK_ID(*idl, i) == id ) {
847                 return( 2 );    /* already there */
848         }
849         if ( ID_BLOCK_NIDS(*idl) && ID_BLOCK_ID(*idl, i) < id )
850                 i++;
851
852         /* do we need to make room for it? */
853         if ( ID_BLOCK_NIDS(*idl) == ID_BLOCK_NMAXN(*idl) ) {
854                 /* make room or indicate block needs splitting */
855                 if ( ID_BLOCK_NMAXN(*idl) >= maxids ) {
856                         return( 3 );    /* block needs splitting */
857                 }
858
859                 ID_BLOCK_NMAX(*idl) *= 2;
860                 if ( ID_BLOCK_NMAXN(*idl) > maxids ) {
861                         ID_BLOCK_NMAX(*idl) = maxids;
862                 }
863                 *idl = (ID_BLOCK *) ch_realloc( (char *) *idl,
864                     (ID_BLOCK_NMAXN(*idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
865         }
866
867         /* make a slot for the new id */
868         AC_MEMCPY( &ID_BLOCK_ID(*idl, i+1), &ID_BLOCK_ID(*idl, i),
869                     (ID_BLOCK_NIDS(*idl) - i) * sizeof(ID) );
870
871         ID_BLOCK_ID(*idl, i) = id;
872         ID_BLOCK_NIDS(*idl)++;
873         (void) memset(
874                 (char *) &ID_BLOCK_ID((*idl), ID_BLOCK_NIDS(*idl)),
875                 '\0',
876             (ID_BLOCK_NMAXN(*idl) - ID_BLOCK_NIDS(*idl)) * sizeof(ID) );
877
878 #ifdef LDBM_DEBUG_IDL
879         idl_check(*idl);
880 #endif
881
882         return( i == 0 ? 1 : 0 );       /* inserted - first id changed or not */
883 }
884
885
886 int
887 idl_delete_key (
888         Backend         *be,
889         DBCache  *db,
890         Datum           key,
891         ID              id
892 )
893 {
894         Datum  data;
895         ID_BLOCK *idl;
896         unsigned i;
897         int j, nids;
898
899         if ( (idl = idl_fetch_one( be, db, key ) ) == NULL )
900         {
901                 /* It wasn't found.  Hmm... */
902                 return -1;
903         }
904
905         if ( ID_BLOCK_ALLIDS( idl ) ) {
906                 idl_free( idl );
907                 return 0;
908         }
909
910         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
911                 i = idl_find(idl, id);
912                 if ( ID_BLOCK_ID(idl, i) == id ) {
913                         if( --ID_BLOCK_NIDS(idl) == 0 ) {
914                                 ldbm_cache_delete( db, key );
915
916                         } else {
917                                 AC_MEMCPY(
918                                         &ID_BLOCK_ID(idl, i),
919                                         &ID_BLOCK_ID(idl, i+1),
920                                         (ID_BLOCK_NIDS(idl)-i) * sizeof(ID) );
921
922                                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)) = NOID;
923
924                                 idl_store( be, db, key, idl );
925                         }
926
927                         idl_free( idl );
928                         return 0;
929                 }
930                 /*  We didn't find the ID.  Hmmm... */
931                 idl_free( idl );
932                 return -1;
933         }
934         
935         /* We have to go through an indirect block and find the ID
936            in the list of IDL's
937            */
938 #ifndef USE_INDIRECT_NIDS
939         for ( nids = 0; !ID_BLOCK_NOID(idl, nids); nids++ )
940                 ;       /* NULL */
941
942         cont_alloc( &data, &key );
943
944         for ( j = 0; j<nids; j++ ) 
945 #else
946         nids = ID_BLOCK_NIDS(idl);
947         for ( j = idl_find(idl, id); j >= 0; j = -1)    /* execute once */
948 #endif
949         {
950                 ID_BLOCK *tmp;
951                 cont_id( &data, ID_BLOCK_ID(idl, j) );
952
953                 if ( (tmp = idl_fetch_one( be, db, data )) == NULL ) {
954 #ifdef NEW_LOGGING
955                         LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
956                                    "idl_delete_key: idl_fetch_one returned NULL\n" ));
957 #else
958                         Debug( LDAP_DEBUG_ANY,
959                             "idl_delete_key: idl_fetch of returned NULL\n", 0, 0, 0 );
960 #endif
961
962                         continue;
963                 }
964                 /*
965                    Now try to find the ID in tmp
966                 */
967
968                 i = idl_find(tmp, id);
969                 if ( ID_BLOCK_ID(tmp, i) == id )
970                 {
971                         AC_MEMCPY(
972                                 &ID_BLOCK_ID(tmp, i),
973                                 &ID_BLOCK_ID(tmp, i+1),
974                                 (ID_BLOCK_NIDS(tmp)-(i+1)) * sizeof(ID));
975                         ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp)-1 ) = NOID;
976                         ID_BLOCK_NIDS(tmp)--;
977
978                         if ( ID_BLOCK_NIDS(tmp) ) {
979                                 idl_store ( be, db, data, tmp );
980
981                         } else {
982                                 ldbm_cache_delete( db, data );
983                                 AC_MEMCPY(
984                                         &ID_BLOCK_ID(idl, j),
985                                         &ID_BLOCK_ID(idl, j+1),
986                                         (nids-(j+1)) * sizeof(ID));
987                                 ID_BLOCK_ID(idl, nids-1) = NOID;
988                                 nids--;
989 #ifdef USE_INDIRECT_NIDS
990                                 ID_BLOCK_NIDS(idl)--;
991 #endif
992                                 if ( ! nids )
993                                         ldbm_cache_delete( db, key );
994                                 else
995                                         idl_store( be, db, key, idl );
996                         }
997                         idl_free( tmp );
998                         cont_free( &data );
999                         idl_free( idl );
1000                         return 0;
1001                 }
1002                 idl_free( tmp );
1003         }
1004
1005         cont_free( &data );
1006         idl_free( idl );
1007         return -1;
1008 }
1009
1010
1011 /* return a duplicate of a single ID_BLOCK */
1012 static ID_BLOCK *
1013 idl_dup( ID_BLOCK *idl )
1014 {
1015         ID_BLOCK        *new;
1016
1017         if ( idl == NULL ) {
1018                 return( NULL );
1019         }
1020
1021         new = idl_alloc( ID_BLOCK_NMAXN(idl) );
1022
1023         AC_MEMCPY(
1024                 (char *) new,
1025                 (char *) idl,
1026                 (ID_BLOCK_NMAXN(idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
1027
1028 #ifdef LDBM_DEBUG_IDL
1029         idl_check(new);
1030 #endif
1031
1032         return( new );
1033 }
1034
1035
1036 /* return the smaller ID_BLOCK */
1037 static ID_BLOCK *
1038 idl_min( ID_BLOCK *a, ID_BLOCK *b )
1039 {
1040         return( ID_BLOCK_NIDS(a) > ID_BLOCK_NIDS(b) ? b : a );
1041 }
1042
1043
1044 /*
1045  * idl_intersection - return a intersection b
1046  */
1047 ID_BLOCK *
1048 idl_intersection(
1049     Backend     *be,
1050     ID_BLOCK    *a,
1051     ID_BLOCK    *b
1052 )
1053 {
1054         unsigned int    ai, bi, ni;
1055         ID_BLOCK                *n;
1056
1057         if ( a == NULL || b == NULL ) {
1058                 return( NULL );
1059         }
1060         if ( ID_BLOCK_ALLIDS( a ) ) {
1061                 return( idl_dup( b ) );
1062         }
1063         if ( ID_BLOCK_ALLIDS( b ) ) {
1064                 return( idl_dup( a ) );
1065         }
1066
1067         n = idl_dup( idl_min( a, b ) );
1068
1069 #ifdef LDBM_DEBUG_IDL
1070         idl_check(a);
1071         idl_check(b);
1072 #endif
1073
1074         for ( ni = 0, ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1075                 for ( ;
1076                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1077                         bi++ )
1078                 {
1079                         ;       /* NULL */
1080                 }
1081
1082                 if ( bi == ID_BLOCK_NIDS(b) ) {
1083                         break;
1084                 }
1085
1086                 if ( ID_BLOCK_ID(b, bi) == ID_BLOCK_ID(a, ai) ) {
1087                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1088                 }
1089         }
1090
1091         if ( ni == 0 ) {
1092                 idl_free( n );
1093                 return( NULL );
1094         }
1095         ID_BLOCK_NIDS(n) = ni;
1096
1097 #ifdef LDBM_DEBUG_IDL
1098         idl_check(n);
1099 #endif
1100
1101         return( n );
1102 }
1103
1104
1105 /*
1106  * idl_union - return a union b
1107  */
1108 ID_BLOCK *
1109 idl_union(
1110     Backend     *be,
1111     ID_BLOCK    *a,
1112     ID_BLOCK    *b
1113 )
1114 {
1115         unsigned int    ai, bi, ni;
1116         ID_BLOCK                *n;
1117
1118         if ( a == NULL ) {
1119                 return( idl_dup( b ) );
1120         }
1121         if ( b == NULL ) {
1122                 return( idl_dup( a ) );
1123         }
1124         if ( ID_BLOCK_ALLIDS( a ) || ID_BLOCK_ALLIDS( b ) ) {
1125                 return( idl_allids( be ) );
1126         }
1127
1128 #ifdef LDBM_DEBUG_IDL
1129         idl_check(a);
1130         idl_check(b);
1131 #endif
1132
1133         if ( ID_BLOCK_NIDS(b) < ID_BLOCK_NIDS(a) ) {
1134                 n = a;
1135                 a = b;
1136                 b = n;
1137         }
1138
1139         n = idl_alloc( ID_BLOCK_NIDS(a) + ID_BLOCK_NIDS(b) );
1140
1141         for ( ni = 0, ai = 0, bi = 0;
1142                 ai < ID_BLOCK_NIDS(a) && bi < ID_BLOCK_NIDS(b);
1143                 )
1144         {
1145                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1146                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai++);
1147
1148                 } else if ( ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai) ) {
1149                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi++);
1150
1151                 } else {
1152                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1153                         ai++, bi++;
1154                 }
1155         }
1156
1157         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1158                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1159         }
1160         for ( ; bi < ID_BLOCK_NIDS(b); bi++ ) {
1161                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi);
1162         }
1163         ID_BLOCK_NIDS(n) = ni;
1164
1165 #ifdef LDBM_DEBUG_IDL
1166         idl_check(n);
1167 #endif
1168
1169         return( n );
1170 }
1171
1172
1173 /*
1174  * idl_notin - return a intersection ~b (or a minus b)
1175  */
1176 ID_BLOCK *
1177 idl_notin(
1178     Backend     *be,
1179     ID_BLOCK    *a,
1180     ID_BLOCK    *b
1181 )
1182 {
1183         unsigned int    ni, ai, bi;
1184         ID_BLOCK                *n;
1185
1186         if ( a == NULL ) {
1187                 return( NULL );
1188         }
1189         if ( b == NULL || ID_BLOCK_ALLIDS( b )) {
1190                 return( idl_dup( a ) );
1191         }
1192
1193         if ( ID_BLOCK_ALLIDS( a ) ) {
1194                 n = idl_alloc( SLAPD_LDBM_MIN_MAXIDS );
1195                 ni = 0;
1196
1197                 for ( ai = 1, bi = 0;
1198                         ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n) && bi < ID_BLOCK_NMAXN(b);
1199                         ai++ )
1200                 {
1201                         if ( ID_BLOCK_ID(b, bi) == ai ) {
1202                                 bi++;
1203                         } else {
1204                                 ID_BLOCK_ID(n, ni++) = ai;
1205                         }
1206                 }
1207
1208                 for ( ; ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n); ai++ ) {
1209                         ID_BLOCK_ID(n, ni++) = ai;
1210                 }
1211
1212                 if ( ni == ID_BLOCK_NMAXN(n) ) {
1213                         idl_free( n );
1214                         return( idl_allids( be ) );
1215                 } else {
1216                         ID_BLOCK_NIDS(n) = ni;
1217                         return( n );
1218                 }
1219         }
1220
1221         n = idl_dup( a );
1222
1223         ni = 0;
1224         for ( ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1225                 for ( ;
1226                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1227                     bi++ )
1228                 {
1229                         ;       /* NULL */
1230                 }
1231
1232                 if ( bi == ID_BLOCK_NIDS(b) ) {
1233                         break;
1234                 }
1235
1236                 if ( ID_BLOCK_ID(b, bi) != ID_BLOCK_ID(a, ai) ) {
1237                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1238                 }
1239         }
1240
1241         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1242                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1243         }
1244         ID_BLOCK_NIDS(n) = ni;
1245
1246 #ifdef LDBM_DEBUG_IDL
1247         idl_check(n);
1248 #endif
1249
1250         return( n );
1251 }
1252
1253 /*      return the first ID in the block
1254  *      if ALLIDS block
1255  *              NIDS > 1 return 1
1256  *              otherwise return NOID 
1257  *      otherwise return first ID
1258  *
1259  *      cursor is set to 1
1260  */         
1261 ID
1262 idl_firstid( ID_BLOCK *idl, ID *cursor )
1263 {
1264         *cursor = 1;
1265
1266         if ( idl == NULL || ID_BLOCK_NIDS(idl) == 0 ) {
1267                 return( NOID );
1268         }
1269
1270         if ( ID_BLOCK_ALLIDS( idl ) ) {
1271                 return( ID_BLOCK_NIDS(idl) > 1 ? 1 : NOID );
1272         }
1273
1274         return( ID_BLOCK_ID(idl, 0) );
1275 }
1276
1277 /*      return next ID
1278  *      if ALLIDS block, cursor is id.
1279  *              increment id
1280  *              if id < NIDS return id
1281  *              otherwise NOID.
1282  *      otherwise cursor is index into block
1283  *              if index < nids
1284  *                      return id at index then increment
1285  */ 
1286 ID
1287 idl_nextid( ID_BLOCK *idl, ID *cursor )
1288 {
1289         if ( ID_BLOCK_ALLIDS( idl ) ) {
1290                 if( ++(*cursor) < ID_BLOCK_NIDS(idl) ) {
1291                         return *cursor;
1292                 } else {
1293                         return NOID;
1294                 }
1295         }
1296
1297         if ( *cursor < ID_BLOCK_NIDS(idl) ) {
1298                 return( ID_BLOCK_ID(idl, (*cursor)++) );
1299         }
1300
1301         return( NOID );
1302 }