]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ldbm/idl.c
BDB_INDEX code does no harm (but no good yet, not used by filters yet).
[openldap] / servers / slapd / back-ldbm / idl.c
1 /* idl.c - ldap id list handling routines */
2 /* $OpenLDAP$ */
3 /*
4  * Copyright 1998-2000 The OpenLDAP Foundation, All Rights Reserved.
5  * COPYING RESTRICTIONS APPLY, see COPYRIGHT file
6  */
7
8 #include "portable.h"
9
10 #include <stdio.h>
11
12 #include <ac/string.h>
13 #include <ac/socket.h>
14
15 #include "slap.h"
16 #include "back-ldbm.h"
17
18 static ID_BLOCK* idl_dup( ID_BLOCK *idl );
19
20 static void cont_alloc( Datum *cont, Datum *key )
21 {
22         ldbm_datum_init( *cont );
23         cont->dsize = 1 + sizeof(ID) + key->dsize;
24         cont->dptr = ch_malloc( cont->dsize );
25
26         * (unsigned char *) cont->dptr = SLAP_INDEX_CONT_PREFIX;
27
28         AC_MEMCPY( &((unsigned char *)cont->dptr)[1 + sizeof(ID)],
29                 key->dptr, key->dsize );
30 }
31
32 static void cont_id( Datum *cont, ID id )
33 {
34         int i;
35
36         for( i=1; i <= sizeof(id); i++) {
37                 ((unsigned char *)cont->dptr)[i] = (unsigned char)(id & 0xFF);
38                 id >>= 8;
39         }
40
41 }
42
43 static void cont_free( Datum *cont )
44 {
45         ch_free( cont->dptr );
46 }
47
#ifdef LDBM_DEBUG_IDL
/*
 * idl_check - debugging aid: assert that the ids held by a regular block
 * are in strictly increasing order.  Indirect blocks, ALLIDS blocks and
 * blocks holding at most one id are not examined.
 */
static void idl_check(ID_BLOCK *idl)
{
	int pos;
	ID_BLOCK prev;

	if( ID_BLOCK_INDIRECT(idl) ) {
		return;
	}
	if( ID_BLOCK_ALLIDS(idl) ) {
		return;
	}
	if( ID_BLOCK_NIDS(idl) <= 1 ) {
		return;
	}

	/* compare every id with its predecessor */
	prev = ID_BLOCK_ID(idl, 0);
	pos = 1;
	while ( pos < ID_BLOCK_NIDS(idl) ) {
		assert (prev < ID_BLOCK_ID(idl, pos) );
		prev = ID_BLOCK_ID(idl, pos);
		pos++;
	}
}
#endif
68
69 /* Allocate an ID_BLOCK with room for nids ids */
70 ID_BLOCK *
71 idl_alloc( unsigned int nids )
72 {
73         ID_BLOCK        *new;
74
75         /* nmax + nids + space for the ids */
76         new = (ID_BLOCK *) ch_calloc( (ID_BLOCK_IDS_OFFSET + nids), sizeof(ID) );
77         ID_BLOCK_NMAX(new) = nids;
78         ID_BLOCK_NIDS(new) = 0;
79
80         return( new );
81 }
82
83
84 /* Allocate an empty ALLIDS ID_BLOCK */
85 ID_BLOCK        *
86 idl_allids( Backend *be )
87 {
88         ID_BLOCK        *idl;
89         ID              id;
90
91         idl = idl_alloc( 0 );
92         ID_BLOCK_NMAX(idl) = ID_BLOCK_ALLIDS_VALUE;
93         if ( next_id_get( be, &id ) ) {
94                 return NULL;
95         }
96         ID_BLOCK_NIDS(idl) = id;
97
98         return( idl );
99 }
100
101 /* Free an ID_BLOCK */
102 void
103 idl_free( ID_BLOCK *idl )
104 {
105         if ( idl == NULL ) {
106 #ifdef NEW_LOGGING
107                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
108                            "idl_free: called with NULL pointer\n" ));
109 #else
110                 Debug( LDAP_DEBUG_TRACE,
111                         "idl_free: called with NULL pointer\n",
112                         0, 0, 0 );
113 #endif
114
115                 return;
116         }
117
118         free( (char *) idl );
119 }
120
121
122 /* Fetch an single ID_BLOCK from the cache */
123 static ID_BLOCK *
124 idl_fetch_one(
125     Backend             *be,
126     DBCache     *db,
127     Datum               key
128 )
129 {
130         Datum   data;
131         ID_BLOCK        *idl;
132
133         /* Debug( LDAP_DEBUG_TRACE, "=> idl_fetch_one\n", 0, 0, 0 ); */
134
135         data = ldbm_cache_fetch( db, key );
136
137         if( data.dptr == NULL ) {
138                 return NULL;
139         }
140
141         idl = idl_dup((ID_BLOCK *) data.dptr);
142
143         ldbm_datum_free( db->dbc_db, data );
144
145         return idl;
146 }
147
148
149 /* Fetch a set of ID_BLOCKs from the cache
150  *      if not INDIRECT
151  *              if block return is an ALLIDS block,
152  *                      return an new ALLIDS block
153  *              otherwise
154  *                      return block
155  *      construct super block from all blocks referenced by INDIRECT block
156  *      return super block
157  */
158 ID_BLOCK *
159 idl_fetch(
160     Backend             *be,
161     DBCache     *db,
162     Datum               key
163 )
164 {
165         Datum   data;
166         ID_BLOCK        *idl;
167         ID_BLOCK        **tmp;
168         int     i, nids;
169
170         idl = idl_fetch_one( be, db, key );
171
172         if ( idl == NULL ) {
173                 return NULL;
174         }
175
176         if ( ID_BLOCK_ALLIDS(idl) ) {
177                 /* all ids block */
178                 /* make sure we have the current value of highest id */
179                 idl_free( idl );
180                 idl = idl_allids( be );
181
182                 return( idl );
183         }
184
185         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
186                 /* regular block */
187                 return( idl );
188         }
189
190         /*
191          * this is an indirect block which points to other blocks.
192          * we need to read in all the blocks it points to and construct
193          * a big id list containing all the ids, which we will return.
194          */
195
196 #ifndef USE_INDIRECT_NIDS
197         /* count the number of blocks & allocate space for pointers to them */
198         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ )
199                 ;       /* NULL */
200 #else
201         i = ID_BLOCK_NIDS(idl);
202 #endif
203         tmp = (ID_BLOCK **) ch_malloc( (i + 1) * sizeof(ID_BLOCK *) );
204
205         /* read in all the blocks */
206         cont_alloc( &data, &key );
207         nids = 0;
208 #ifndef USE_INDIRECT_NIDS
209         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ ) {
210 #else
211         for ( i = 0; i < ID_BLOCK_NIDS(idl); i++ ) {
212 #endif
213                 cont_id( &data, ID_BLOCK_ID(idl, i) );
214
215                 if ( (tmp[i] = idl_fetch_one( be, db, data )) == NULL ) {
216 #ifdef NEW_LOGGING
217                         LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
218                                    "idl_fetch: idl_fetch_one returned NULL\n" ));
219 #else
220                         Debug( LDAP_DEBUG_ANY,
221                             "idl_fetch: one returned NULL\n", 0, 0, 0 );
222 #endif
223
224                         continue;
225                 }
226
227                 nids += ID_BLOCK_NIDS(tmp[i]);
228         }
229         tmp[i] = NULL;
230         cont_free( &data );
231         idl_free( idl );
232
233         /* allocate space for the big block */
234         idl = idl_alloc( nids );
235         ID_BLOCK_NIDS(idl) = nids;
236         nids = 0;
237
238         /* copy in all the ids from the component blocks */
239         for ( i = 0; tmp[i] != NULL; i++ ) {
240                 if ( tmp[i] == NULL ) {
241                         continue;
242                 }
243
244                 AC_MEMCPY(
245                         (char *) &ID_BLOCK_ID(idl, nids),
246                         (char *) &ID_BLOCK_ID(tmp[i], 0),
247                         ID_BLOCK_NIDS(tmp[i]) * sizeof(ID) );
248                 nids += ID_BLOCK_NIDS(tmp[i]);
249
250                 idl_free( tmp[i] );
251         }
252         free( (char *) tmp );
253
254 #ifdef LDBM_DEBUG_IDL
255         idl_check(idl);
256 #endif
257
258 #ifdef NEW_LOGGING
259         LDAP_LOG(( "cache", LDAP_LEVEL_ENTRY,
260                    "idl_fetch: %ld ids (%ld max)\n",
261                    ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl) ));
262 #else
263         Debug( LDAP_DEBUG_TRACE, "<= idl_fetch %ld ids (%ld max)\n",
264                ID_BLOCK_NIDS(idl), ID_BLOCK_NMAXN(idl), 0 );
265 #endif
266
267         return( idl );
268 }
269
270
271 /* store a single block */
272 static int
273 idl_store(
274     Backend             *be,
275     DBCache     *db,
276     Datum               key, 
277     ID_BLOCK            *idl
278 )
279 {
280         int     rc, flags;
281         Datum   data;
282         struct ldbminfo *li = (struct ldbminfo *) be->be_private;
283
284 #ifdef LDBM_DEBUG_IDL
285         idl_check(idl);
286 #endif
287
288         ldbm_datum_init( data );
289
290         /* Debug( LDAP_DEBUG_TRACE, "=> idl_store\n", 0, 0, 0 ); */
291
292         data.dptr = (char *) idl;
293         data.dsize = (ID_BLOCK_IDS_OFFSET + ID_BLOCK_NMAXN(idl)) * sizeof(ID);
294         
295 #ifdef LDBM_DEBUG
296         Statslog( LDAP_DEBUG_STATS, "<= idl_store(): rc=%d\n",
297                 rc, 0, 0, 0, 0 );
298 #endif
299
300         flags = LDBM_REPLACE;
301         rc = ldbm_cache_store( db, key, data, flags );
302
303         /* Debug( LDAP_DEBUG_TRACE, "<= idl_store %d\n", rc, 0, 0 ); */
304         return( rc );
305 }
306
307 /* Binary search for id in block, return index
308  *    an index is always returned, even with no match. If no
309  * match, the returned index is the insertion point.
310  */
311 static unsigned int
312 idl_find(
313     ID_BLOCK    *b,
314     ID          id
315 )
316 {
317         int lo=0, hi=ID_BLOCK_NIDS(b)-1, nr=0;
318
319         for (;lo<=hi;)
320         {
321             nr = ( lo + hi ) / 2;
322             if (ID_BLOCK_ID(b, nr) == id)
323                 break;
324             if (ID_BLOCK_ID(b, nr) > id)
325                 hi = nr - 1;
326             else
327                 lo = nr + 1;
328         }
329         return nr;
330 }
331
332 /* split the block at id 
333  *      locate ID greater than or equal to id.
334  */
335 static void
336 idl_split_block(
337     ID_BLOCK    *b,
338     ID          id,
339     ID_BLOCK    **right,
340     ID_BLOCK    **left
341 )
342 {
343         unsigned int    nr, nl;
344
345         /* find where to split the block */
346         nr = idl_find(b, id);
347         if ( ID_BLOCK_ID(b,nr) < id )
348                 nr++;
349
350         nl = ID_BLOCK_NIDS(b) - nr;
351
352         *right = idl_alloc( nr == 0 ? 1 : nr );
353         *left = idl_alloc( nl + (nr == 0 ? 0 : 1));
354
355         /*
356          * everything before the id being inserted in the first block
357          * unless there is nothing, in which case the id being inserted
358          * goes there.
359          */
360         if ( nr == 0 ) {
361                 ID_BLOCK_NIDS(*right) = 1;
362                 ID_BLOCK_ID(*right, 0) = id;
363         } else {
364                 AC_MEMCPY(
365                         (char *) &ID_BLOCK_ID(*right, 0),
366                         (char *) &ID_BLOCK_ID(b, 0),
367                         nr * sizeof(ID) );
368                 ID_BLOCK_NIDS(*right) = nr;
369                 ID_BLOCK_ID(*left, 0) = id;
370         }
371
372         /* the id being inserted & everything after in the second block */
373         AC_MEMCPY(
374                 (char *) &ID_BLOCK_ID(*left, (nr == 0 ? 0 : 1)),
375             (char *) &ID_BLOCK_ID(b, nr),
376                 nl * sizeof(ID) );
377         ID_BLOCK_NIDS(*left) = nl + (nr == 0 ? 0 : 1);
378
379 #ifdef LDBM_DEBUG_IDL
380         idl_check(*right);
381         idl_check(*left);
382 #endif
383 }
384
385
386 /*
387  * idl_change_first - called when an indirect block's first key has
388  * changed, meaning it needs to be stored under a new key, and the
389  * header block pointing to it needs updating.
390  */
391 static int
392 idl_change_first(
393     Backend             *be,
394     DBCache     *db,
395     Datum               hkey,           /* header block key     */
396     ID_BLOCK            *h,             /* header block         */
397     int                 pos,            /* pos in h to update   */
398     Datum               bkey,           /* data block key       */
399     ID_BLOCK            *b              /* data block           */
400 )
401 {
402         int     rc;
403
404         /* Debug( LDAP_DEBUG_TRACE, "=> idl_change_first\n", 0, 0, 0 ); */
405
406         /* delete old key block */
407         if ( (rc = ldbm_cache_delete( db, bkey )) != 0 ) {
408 #ifdef NEW_LOGGING
409                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
410                            "idl_change_first: ldbm_cache_delete returned %d\n", rc ));
411 #else
412                 Debug( LDAP_DEBUG_ANY,
413                     "idl_change_first: ldbm_cache_delete returned %d\n",
414                         rc, 0, 0 );
415 #endif
416
417                 return( rc );
418         }
419
420         /* write block with new key */
421         cont_id( &bkey, ID_BLOCK_ID(b, 0) );
422
423         if ( (rc = idl_store( be, db, bkey, b )) != 0 ) {
424 #ifdef NEW_LOGGING
425                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
426                            "idl_change_first: idl_store returned %d\n", rc ));
427 #else
428                 Debug( LDAP_DEBUG_ANY,
429                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
430 #endif
431
432                 return( rc );
433         }
434
435         /* update + write indirect header block */
436         ID_BLOCK_ID(h, pos) = ID_BLOCK_ID(b, 0);
437         if ( (rc = idl_store( be, db, hkey, h )) != 0 ) {
438 #ifdef NEW_LOGGING
439                 LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
440                            "idl_change_first: idl_store returned %s\n", rc ));
441 #else
442                 Debug( LDAP_DEBUG_ANY,
443                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
444 #endif
445
446                 return( rc );
447         }
448
449         return( 0 );
450 }
451
452
453 int
454 idl_insert_key(
455     Backend             *be,
456     DBCache     *db,
457     Datum               key,
458     ID                  id
459 )
460 {
461         int     i, j, first, rc;
462         ID_BLOCK        *idl, *tmp, *tmp2, *tmp3;
463         Datum   k2;
464
465         if ( (idl = idl_fetch_one( be, db, key )) == NULL ) {
466                 idl = idl_alloc( 1 );
467                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)++) = id;
468                 rc = idl_store( be, db, key, idl );
469
470                 idl_free( idl );
471                 return( rc );
472         }
473
474         if ( ID_BLOCK_ALLIDS( idl ) ) {
475                 /* ALLIDS */
476                 idl_free( idl );
477                 return 0;
478         }
479
480         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
481                 /* regular block */
482                 switch ( idl_insert( &idl, id, db->dbc_maxids ) ) {
483                 case 0:         /* id inserted - store the updated block */
484                 case 1:
485                         rc = idl_store( be, db, key, idl );
486                         break;
487
488                 case 2:         /* id already there - nothing to do */
489                         rc = 0;
490                         break;
491
492                 case 3:         /* id not inserted - block must be split */
493                         /* check threshold for marking this an all-id block */
494                         if ( db->dbc_maxindirect < 2 ) {
495                                 idl_free( idl );
496                                 idl = idl_allids( be );
497                                 rc = idl_store( be, db, key, idl );
498                                 break;
499                         }
500
501                         idl_split_block( idl, id, &tmp, &tmp2 );
502                         idl_free( idl );
503
504                         /* create the header indirect block */
505                         idl = idl_alloc( 3 );
506 #ifndef USE_INDIRECT_NIDS
507                         ID_BLOCK_NMAX(idl) = 3;
508                         ID_BLOCK_NIDS(idl) = ID_BLOCK_INDIRECT_VALUE;
509                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
510                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
511                         ID_BLOCK_ID(idl, 2) = NOID;
512 #else
513                         ID_BLOCK_NMAX(idl) = 2 | ID_BLOCK_INDIRECT_VALUE;
514                         ID_BLOCK_NIDS(idl) = 2;
515                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
516                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
517 #endif
518
519                         /* store it */
520                         rc = idl_store( be, db, key, idl );
521
522                         cont_alloc( &k2, &key );
523                         cont_id( &k2, ID_BLOCK_ID(tmp, 0) );
524
525                         rc = idl_store( be, db, k2, tmp );
526
527                         cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
528                         rc = idl_store( be, db, k2, tmp2 );
529
530                         cont_free( &k2 );
531
532                         idl_free( tmp );
533                         idl_free( tmp2 );
534                         break;
535                 }
536
537                 idl_free( idl );
538                 return( rc );
539         }
540
541         /*
542          * this is an indirect block which points to other blocks.
543          * we need to read in the block into which the id should be
544          * inserted, then insert the id and store the block.  we might
545          * have to split the block if it is full, which means we also
546          * need to write a new "header" block.
547          */
548
549 #ifndef USE_INDIRECT_NIDS
550         /* select the block to try inserting into *//* XXX linear search XXX */
551         for ( i = 0; !ID_BLOCK_NOID(idl, i) && id > ID_BLOCK_ID(idl, i); i++ )
552                 ;       /* NULL */
553 #else
554         i = idl_find(idl, id);
555         if (ID_BLOCK_ID(idl, i) < id)
556                 i++;
557 #endif
558
559         if ( i != 0 ) {
560                 i--;
561                 first = 0;
562         } else {
563                 first = 1;
564         }
565
566         /* get the block */
567         cont_alloc( &k2, &key );
568         cont_id( &k2, ID_BLOCK_ID(idl, i) );
569
570         if ( (tmp = idl_fetch_one( be, db, k2 )) == NULL ) {
571 #ifdef NEW_LOGGING
572                 LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
573                            "idl_insert_key: nonexistent continuation block\n" ));
574 #else
575                 Debug( LDAP_DEBUG_ANY, "idl_insert_key: nonexistent continuation block\n",
576                     0, 0, 0 );
577 #endif
578
579                 cont_free( &k2 );
580                 idl_free( idl );
581                 return( -1 );
582         }
583
584         /* insert the id */
585         switch ( idl_insert( &tmp, id, db->dbc_maxids ) ) {
586         case 0:         /* id inserted ok */
587                 if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
588 #ifdef NEW_LOGGING
589                         LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
590                                    "ids_insert_key: idl_store returned %d\n", rc ));
591 #else
592                         Debug( LDAP_DEBUG_ANY,
593                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
594 #endif
595
596                 }
597                 break;
598
599         case 1:         /* id inserted - first id in block has changed */
600                 /*
601                  * key for this block has changed, so we have to
602                  * write the block under the new key, delete the
603                  * old key block + update and write the indirect
604                  * header block.
605                  */
606
607                 rc = idl_change_first( be, db, key, idl, i, k2, tmp );
608                 break;
609
610         case 2:         /* id not inserted - already there, do nothing */
611                 rc = 0;
612                 break;
613
614         case 3:         /* id not inserted - block is full */
615                 /*
616                  * first, see if it will fit in the next block,
617                  * without splitting, unless we're trying to insert
618                  * into the beginning of the first block.
619                  */
620
621 #ifndef USE_INDIRECT_NIDS
622                 /* is there a next block? */
623                 if ( !first && !ID_BLOCK_NOID(idl, i + 1) ) {
624 #else
625                 if ( !first && (i + 1) < ID_BLOCK_NIDS(idl) ) {
626 #endif
627                         /* read it in */
628                         cont_alloc( &k2, &key );
629                         cont_id( &k2, ID_BLOCK_ID(idl, i) );
630                         if ( (tmp2 = idl_fetch_one( be, db, k2 )) == NULL ) {
631 #ifdef NEW_LOGGING
632                                 LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
633                                            "idl_insert_key: idl_fetch_one returned NULL\n"));
634 #else
635                                 Debug( LDAP_DEBUG_ANY,
636                                     "idl_insert_key: idl_fetch_one returned NULL\n",
637                                     0, 0, 0 );
638 #endif
639
640                                 /* split the original block */
641                                 cont_free( &k2 );
642                                 goto split;
643                         }
644
645                         /* If the new id is less than the last id in the
646                          * current block, it must not be put into the next
647                          * block. Push the last id of the current block
648                          * into the next block instead.
649                          */
650                         if (id < ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1)) {
651                             ID id2 = ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1);
652                             Datum k3;
653
654                             ldbm_datum_init( k3 );
655
656                             --ID_BLOCK_NIDS(tmp);
657                             /* This must succeed since we just popped one
658                              * ID off the end of it.
659                              */
660                             rc = idl_insert( &tmp, id, db->dbc_maxids );
661
662                                 k3.dptr = ch_malloc(k2.dsize);
663                                 k3.dsize = k2.dsize;
664                                 AC_MEMCPY(k3.dptr, k2.dptr, k3.dsize);
665                             if ( (rc = idl_store( be, db, k3, tmp )) != 0 ) {
666 #ifdef NEW_LOGGING
667                                 LDAP_LOG(( "cache", LDAP_LEVEL_ERR,
668                                                "idl_insert_key: idl_store returned %d\n", rc ));
669 #else
670                                 Debug( LDAP_DEBUG_ANY,
671                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
672 #endif
673
674                             }
675
676                                 free( k3.dptr );
677
678                             id = id2;
679                             /* This new id will necessarily be inserted
680                              * as the first id of the next block by the
681                              * following switch() statement.
682                              */
683                         }
684
685                         switch ( (rc = idl_insert( &tmp2, id,
686                             db->dbc_maxids )) ) {
687                         case 1:         /* id inserted first in block */
688                                 rc = idl_change_first( be, db, key, idl,
689                                     i + 1, k2, tmp2 );
690                                 /* FALL */
691
692                         case 2:         /* id already there - how? */
693                         case 0:         /* id inserted: this can never be
694                                          * the result of idl_insert, because
695                                          * we guaranteed that idl_change_first
696                                          * will always be called.
697                                          */
698                                 if ( rc == 2 ) {
699 #ifdef NEW_LOGGING
700                                         LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
701                                                    "idl_insert_key: id %ld is already in next block\n", 
702                                                    id ));
703 #else
704                                         Debug( LDAP_DEBUG_ANY,
705                                             "idl_insert_key: id %ld already in next block\n",
706                                             id, 0, 0 );
707 #endif
708
709                                 }
710
711                                 idl_free( tmp );
712                                 idl_free( tmp2 );
713                                 idl_free( idl );
714                                 return( 0 );
715
716                         case 3:         /* split the original block */
717                                 break;
718                         }
719
720                         idl_free( tmp2 );
721                 }
722
723 split:
724                 /*
725                  * must split the block, write both new blocks + update
726                  * and write the indirect header block.
727                  */
728
729                 rc = 0; /* optimistic */
730
731
732 #ifndef USE_INDIRECT_NIDS
733                 /* count how many indirect blocks *//* XXX linear count XXX */
734                 for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ )
735                         ;       /* NULL */
736 #else
737                 j = ID_BLOCK_NIDS(idl);
738 #endif
739
740                 /* check it against all-id thresholed */
741                 if ( j + 1 > db->dbc_maxindirect ) {
742                         /*
743                          * we've passed the all-id threshold, meaning
744                          * that this set of blocks should be replaced
745                          * by a single "all-id" block.  our job: delete
746                          * all the indirect blocks, and replace the header
747                          * block by an all-id block.
748                          */
749
750                         /* delete all indirect blocks */
751 #ifndef USE_INDIRECT_NIDS
752                         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) {
753 #else
754                         for ( j = 0; j < ID_BLOCK_NIDS(idl); j++ ) {
755 #endif
756                                 cont_id( &k2, ID_BLOCK_ID(idl, j) );
757
758                                 rc = ldbm_cache_delete( db, k2 );
759                         }
760
761                         /* store allid block in place of header block */
762                         idl_free( idl );
763                         idl = idl_allids( be );
764                         rc = idl_store( be, db, key, idl );
765
766                         cont_free( &k2 );
767                         idl_free( idl );
768                         idl_free( tmp );
769                         return( rc );
770                 }
771
772                 idl_split_block( tmp, id, &tmp2, &tmp3 );
773                 idl_free( tmp );
774
775                 /* create a new updated indirect header block */
776                 tmp = idl_alloc( ID_BLOCK_NMAXN(idl) + 1 );
777 #ifndef USE_INDIRECT_NIDS
778                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_INDIRECT_VALUE;
779 #else
780                 ID_BLOCK_NMAX(tmp) |= ID_BLOCK_INDIRECT_VALUE;
781 #endif
782                 /* everything up to the split block */
783                 AC_MEMCPY(
784                         (char *) &ID_BLOCK_ID(tmp, 0),
785                         (char *) &ID_BLOCK_ID(idl, 0),
786                     i * sizeof(ID) );
787                 /* the two new blocks */
788                 ID_BLOCK_ID(tmp, i) = ID_BLOCK_ID(tmp2, 0);
789                 ID_BLOCK_ID(tmp, i + 1) = ID_BLOCK_ID(tmp3, 0);
790                 /* everything after the split block */
791 #ifndef USE_INDIRECT_NIDS
792                 AC_MEMCPY(
793                         (char *) &ID_BLOCK_ID(tmp, i + 2),
794                         (char *) &ID_BLOCK_ID(idl, i + 1),
795                         (ID_BLOCK_NMAXN(idl) - i - 1) * sizeof(ID) );
796 #else
797                 AC_MEMCPY(
798                         (char *) &ID_BLOCK_ID(tmp, i + 2),
799                         (char *) &ID_BLOCK_ID(idl, i + 1),
800                         (ID_BLOCK_NIDS(idl) - i - 1) * sizeof(ID) );
801                 ID_BLOCK_NIDS(idl)++;
802 #endif
803
804                 /* store the header block */
805                 rc = idl_store( be, db, key, tmp );
806
807                 /* store the first id block */
808                 cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
809                 rc = idl_store( be, db, k2, tmp2 );
810
811                 /* store the second id block */
812                 cont_id( &k2, ID_BLOCK_ID(tmp3, 0) );
813                 rc = idl_store( be, db, k2, tmp3 );
814
815                 idl_free( tmp2 );
816                 idl_free( tmp3 );
817                 break;
818         }
819
820         cont_free( &k2 );
821         idl_free( tmp );
822         idl_free( idl );
823         return( rc );
824 }
825
826
827 /*
828  * idl_insert - insert an id into an id list.
829  *
830  *      returns
831  *              0       id inserted
832  *              1       id inserted, first id in block has changed
833  *              2       id not inserted, already there
834  *              3       id not inserted, block must be split
835  */
836 int
837 idl_insert( ID_BLOCK **idl, ID id, unsigned int maxids )
838 {
839         unsigned int    i;
840
841         if ( ID_BLOCK_ALLIDS( *idl ) ) {
842                 return( 2 );    /* already there */
843         }
844
845         /* is it already there? */
846         i = idl_find(*idl, id);
847         if ( ID_BLOCK_ID(*idl, i) == id ) {
848                 return( 2 );    /* already there */
849         }
850         if ( ID_BLOCK_NIDS(*idl) && ID_BLOCK_ID(*idl, i) < id )
851                 i++;
852
853         /* do we need to make room for it? */
854         if ( ID_BLOCK_NIDS(*idl) == ID_BLOCK_NMAXN(*idl) ) {
855                 /* make room or indicate block needs splitting */
856                 if ( ID_BLOCK_NMAXN(*idl) >= maxids ) {
857                         return( 3 );    /* block needs splitting */
858                 }
859
860                 ID_BLOCK_NMAX(*idl) *= 2;
861                 if ( ID_BLOCK_NMAXN(*idl) > maxids ) {
862                         ID_BLOCK_NMAX(*idl) = maxids;
863                 }
864                 *idl = (ID_BLOCK *) ch_realloc( (char *) *idl,
865                     (ID_BLOCK_NMAXN(*idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
866         }
867
868         /* make a slot for the new id */
869         AC_MEMCPY( &ID_BLOCK_ID(*idl, i+1), &ID_BLOCK_ID(*idl, i),
870                     (ID_BLOCK_NIDS(*idl) - i) * sizeof(ID) );
871
872         ID_BLOCK_ID(*idl, i) = id;
873         ID_BLOCK_NIDS(*idl)++;
874         (void) memset(
875                 (char *) &ID_BLOCK_ID((*idl), ID_BLOCK_NIDS(*idl)),
876                 '\0',
877             (ID_BLOCK_NMAXN(*idl) - ID_BLOCK_NIDS(*idl)) * sizeof(ID) );
878
879 #ifdef LDBM_DEBUG_IDL
880         idl_check(*idl);
881 #endif
882
883         return( i == 0 ? 1 : 0 );       /* inserted - first id changed or not */
884 }
885
886
887 int
888 idl_delete_key (
889         Backend         *be,
890         DBCache  *db,
891         Datum           key,
892         ID              id
893 )
894 {
895         Datum  data;
896         ID_BLOCK *idl;
897         unsigned i;
898         int j, nids;
899
900         if ( (idl = idl_fetch_one( be, db, key ) ) == NULL )
901         {
902                 /* It wasn't found.  Hmm... */
903                 return -1;
904         }
905
906         if ( ID_BLOCK_ALLIDS( idl ) ) {
907                 idl_free( idl );
908                 return 0;
909         }
910
911         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
912                 i = idl_find(idl, id);
913                 if ( ID_BLOCK_ID(idl, i) == id ) {
914                         if( --ID_BLOCK_NIDS(idl) == 0 ) {
915                                 ldbm_cache_delete( db, key );
916
917                         } else {
918                                 AC_MEMCPY(
919                                         &ID_BLOCK_ID(idl, i),
920                                         &ID_BLOCK_ID(idl, i+1),
921                                         (ID_BLOCK_NIDS(idl)-i) * sizeof(ID) );
922
923                                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)) = NOID;
924
925                                 idl_store( be, db, key, idl );
926                         }
927
928                         idl_free( idl );
929                         return 0;
930                 }
931                 /*  We didn't find the ID.  Hmmm... */
932                 idl_free( idl );
933                 return -1;
934         }
935         
936         /* We have to go through an indirect block and find the ID
937            in the list of IDL's
938            */
939 #ifndef USE_INDIRECT_NIDS
940         for ( nids = 0; !ID_BLOCK_NOID(idl, nids); nids++ )
941                 ;       /* NULL */
942
943         cont_alloc( &data, &key );
944
945         for ( j = 0; j<nids; j++ ) 
946 #else
947         nids = ID_BLOCK_NIDS(idl);
948         for ( j = idl_find(idl, id); j >= 0; j = -1)    /* execute once */
949 #endif
950         {
951                 ID_BLOCK *tmp;
952                 cont_id( &data, ID_BLOCK_ID(idl, j) );
953
954                 if ( (tmp = idl_fetch_one( be, db, data )) == NULL ) {
955 #ifdef NEW_LOGGING
956                         LDAP_LOG(( "cache", LDAP_LEVEL_INFO,
957                                    "idl_delete_key: idl_fetch_one returned NULL\n" ));
958 #else
959                         Debug( LDAP_DEBUG_ANY,
960                             "idl_delete_key: idl_fetch of returned NULL\n", 0, 0, 0 );
961 #endif
962
963                         continue;
964                 }
965                 /*
966                    Now try to find the ID in tmp
967                 */
968
969                 i = idl_find(tmp, id);
970                 if ( ID_BLOCK_ID(tmp, i) == id )
971                 {
972                         AC_MEMCPY(
973                                 &ID_BLOCK_ID(tmp, i),
974                                 &ID_BLOCK_ID(tmp, i+1),
975                                 (ID_BLOCK_NIDS(tmp)-(i+1)) * sizeof(ID));
976                         ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp)-1 ) = NOID;
977                         ID_BLOCK_NIDS(tmp)--;
978
979                         if ( ID_BLOCK_NIDS(tmp) ) {
980                                 idl_store ( be, db, data, tmp );
981
982                         } else {
983                                 ldbm_cache_delete( db, data );
984                                 AC_MEMCPY(
985                                         &ID_BLOCK_ID(idl, j),
986                                         &ID_BLOCK_ID(idl, j+1),
987                                         (nids-(j+1)) * sizeof(ID));
988                                 ID_BLOCK_ID(idl, nids-1) = NOID;
989                                 nids--;
990 #ifdef USE_INDIRECT_NIDS
991                                 ID_BLOCK_NIDS(idl)--;
992 #endif
993                                 if ( ! nids )
994                                         ldbm_cache_delete( db, key );
995                                 else
996                                         idl_store( be, db, key, idl );
997                         }
998                         idl_free( tmp );
999                         cont_free( &data );
1000                         idl_free( idl );
1001                         return 0;
1002                 }
1003                 idl_free( tmp );
1004         }
1005
1006         cont_free( &data );
1007         idl_free( idl );
1008         return -1;
1009 }
1010
1011
1012 /* return a duplicate of a single ID_BLOCK */
1013 static ID_BLOCK *
1014 idl_dup( ID_BLOCK *idl )
1015 {
1016         ID_BLOCK        *new;
1017
1018         if ( idl == NULL ) {
1019                 return( NULL );
1020         }
1021
1022         new = idl_alloc( ID_BLOCK_NMAXN(idl) );
1023
1024         AC_MEMCPY(
1025                 (char *) new,
1026                 (char *) idl,
1027                 (ID_BLOCK_NMAXN(idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
1028
1029 #ifdef LDBM_DEBUG_IDL
1030         idl_check(new);
1031 #endif
1032
1033         return( new );
1034 }
1035
1036
1037 /* return the smaller ID_BLOCK */
1038 static ID_BLOCK *
1039 idl_min( ID_BLOCK *a, ID_BLOCK *b )
1040 {
1041         return( ID_BLOCK_NIDS(a) > ID_BLOCK_NIDS(b) ? b : a );
1042 }
1043
1044
1045 /*
1046  * idl_intersection - return a intersection b
1047  */
1048 ID_BLOCK *
1049 idl_intersection(
1050     Backend     *be,
1051     ID_BLOCK    *a,
1052     ID_BLOCK    *b
1053 )
1054 {
1055         unsigned int    ai, bi, ni;
1056         ID_BLOCK                *n;
1057
1058         if ( a == NULL || b == NULL ) {
1059                 return( NULL );
1060         }
1061         if ( ID_BLOCK_ALLIDS( a ) ) {
1062                 return( idl_dup( b ) );
1063         }
1064         if ( ID_BLOCK_ALLIDS( b ) ) {
1065                 return( idl_dup( a ) );
1066         }
1067
1068         n = idl_dup( idl_min( a, b ) );
1069
1070 #ifdef LDBM_DEBUG_IDL
1071         idl_check(a);
1072         idl_check(b);
1073 #endif
1074
1075         for ( ni = 0, ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1076                 for ( ;
1077                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1078                         bi++ )
1079                 {
1080                         ;       /* NULL */
1081                 }
1082
1083                 if ( bi == ID_BLOCK_NIDS(b) ) {
1084                         break;
1085                 }
1086
1087                 if ( ID_BLOCK_ID(b, bi) == ID_BLOCK_ID(a, ai) ) {
1088                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1089                 }
1090         }
1091
1092         if ( ni == 0 ) {
1093                 idl_free( n );
1094                 return( NULL );
1095         }
1096         ID_BLOCK_NIDS(n) = ni;
1097
1098 #ifdef LDBM_DEBUG_IDL
1099         idl_check(n);
1100 #endif
1101
1102         return( n );
1103 }
1104
1105
1106 /*
1107  * idl_union - return a union b
1108  */
1109 ID_BLOCK *
1110 idl_union(
1111     Backend     *be,
1112     ID_BLOCK    *a,
1113     ID_BLOCK    *b
1114 )
1115 {
1116         unsigned int    ai, bi, ni;
1117         ID_BLOCK                *n;
1118
1119         if ( a == NULL ) {
1120                 return( idl_dup( b ) );
1121         }
1122         if ( b == NULL ) {
1123                 return( idl_dup( a ) );
1124         }
1125         if ( ID_BLOCK_ALLIDS( a ) || ID_BLOCK_ALLIDS( b ) ) {
1126                 return( idl_allids( be ) );
1127         }
1128
1129 #ifdef LDBM_DEBUG_IDL
1130         idl_check(a);
1131         idl_check(b);
1132 #endif
1133
1134         if ( ID_BLOCK_NIDS(b) < ID_BLOCK_NIDS(a) ) {
1135                 n = a;
1136                 a = b;
1137                 b = n;
1138         }
1139
1140         n = idl_alloc( ID_BLOCK_NIDS(a) + ID_BLOCK_NIDS(b) );
1141
1142         for ( ni = 0, ai = 0, bi = 0;
1143                 ai < ID_BLOCK_NIDS(a) && bi < ID_BLOCK_NIDS(b);
1144                 )
1145         {
1146                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1147                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai++);
1148
1149                 } else if ( ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai) ) {
1150                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi++);
1151
1152                 } else {
1153                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1154                         ai++, bi++;
1155                 }
1156         }
1157
1158         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1159                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1160         }
1161         for ( ; bi < ID_BLOCK_NIDS(b); bi++ ) {
1162                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi);
1163         }
1164         ID_BLOCK_NIDS(n) = ni;
1165
1166 #ifdef LDBM_DEBUG_IDL
1167         idl_check(n);
1168 #endif
1169
1170         return( n );
1171 }
1172
1173
1174 /*
1175  * idl_notin - return a intersection ~b (or a minus b)
1176  */
1177 ID_BLOCK *
1178 idl_notin(
1179     Backend     *be,
1180     ID_BLOCK    *a,
1181     ID_BLOCK    *b
1182 )
1183 {
1184         unsigned int    ni, ai, bi;
1185         ID_BLOCK                *n;
1186
1187         if ( a == NULL ) {
1188                 return( NULL );
1189         }
1190         if ( b == NULL || ID_BLOCK_ALLIDS( b )) {
1191                 return( idl_dup( a ) );
1192         }
1193
1194         if ( ID_BLOCK_ALLIDS( a ) ) {
1195                 n = idl_alloc( SLAPD_LDBM_MIN_MAXIDS );
1196                 ni = 0;
1197
1198                 for ( ai = 1, bi = 0;
1199                         ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n) && bi < ID_BLOCK_NMAXN(b);
1200                         ai++ )
1201                 {
1202                         if ( ID_BLOCK_ID(b, bi) == ai ) {
1203                                 bi++;
1204                         } else {
1205                                 ID_BLOCK_ID(n, ni++) = ai;
1206                         }
1207                 }
1208
1209                 for ( ; ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAXN(n); ai++ ) {
1210                         ID_BLOCK_ID(n, ni++) = ai;
1211                 }
1212
1213                 if ( ni == ID_BLOCK_NMAXN(n) ) {
1214                         idl_free( n );
1215                         return( idl_allids( be ) );
1216                 } else {
1217                         ID_BLOCK_NIDS(n) = ni;
1218                         return( n );
1219                 }
1220         }
1221
1222         n = idl_dup( a );
1223
1224         ni = 0;
1225         for ( ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1226                 for ( ;
1227                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1228                     bi++ )
1229                 {
1230                         ;       /* NULL */
1231                 }
1232
1233                 if ( bi == ID_BLOCK_NIDS(b) ) {
1234                         break;
1235                 }
1236
1237                 if ( ID_BLOCK_ID(b, bi) != ID_BLOCK_ID(a, ai) ) {
1238                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1239                 }
1240         }
1241
1242         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1243                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1244         }
1245         ID_BLOCK_NIDS(n) = ni;
1246
1247 #ifdef LDBM_DEBUG_IDL
1248         idl_check(n);
1249 #endif
1250
1251         return( n );
1252 }
1253
1254 /*      return the first ID in the block
1255  *      if ALLIDS block
1256  *              NIDS > 1 return 1
1257  *              otherwise return NOID 
1258  *      otherwise return first ID
1259  *
1260  *      cursor is set to 1
1261  */         
1262 ID
1263 idl_firstid( ID_BLOCK *idl, ID *cursor )
1264 {
1265         *cursor = 1;
1266
1267         if ( idl == NULL || ID_BLOCK_NIDS(idl) == 0 ) {
1268                 return( NOID );
1269         }
1270
1271         if ( ID_BLOCK_ALLIDS( idl ) ) {
1272                 return( ID_BLOCK_NIDS(idl) > 1 ? 1 : NOID );
1273         }
1274
1275         return( ID_BLOCK_ID(idl, 0) );
1276 }
1277
1278 /*      return next ID
1279  *      if ALLIDS block, cursor is id.
1280  *              increment id
1281  *              if id < NIDS return id
1282  *              otherwise NOID.
1283  *      otherwise cursor is index into block
1284  *              if index < nids
1285  *                      return id at index then increment
1286  */ 
1287 ID
1288 idl_nextid( ID_BLOCK *idl, ID *cursor )
1289 {
1290         if ( ID_BLOCK_ALLIDS( idl ) ) {
1291                 if( ++(*cursor) < ID_BLOCK_NIDS(idl) ) {
1292                         return *cursor;
1293                 } else {
1294                         return NOID;
1295                 }
1296         }
1297
1298         if ( *cursor < ID_BLOCK_NIDS(idl) ) {
1299                 return( ID_BLOCK_ID(idl, (*cursor)++) );
1300         }
1301
1302         return( NOID );
1303 }