]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ldbm/idl.c
ITS#1570: IDL patch
[openldap] / servers / slapd / back-ldbm / idl.c
1 /* idl.c - ldap id list handling routines */
2 /* $OpenLDAP$ */
3 /*
4  * Copyright 1998-2002 The OpenLDAP Foundation, All Rights Reserved.
5  * COPYING RESTRICTIONS APPLY, see COPYRIGHT file
6  */
7
8 #include "portable.h"
9
10 #include <stdio.h>
11
12 #include <ac/string.h>
13 #include <ac/socket.h>
14
15 #include "slap.h"
16 #include "back-ldbm.h"
17
18 static ID_BLOCK* idl_dup( ID_BLOCK *idl );
19
20 static void cont_alloc( Datum *cont, Datum *key )
21 {
22         ldbm_datum_init( *cont );
23         cont->dsize = 1 + sizeof(ID) + key->dsize;
24         cont->dptr = ch_malloc( cont->dsize );
25
26         * (unsigned char *) cont->dptr = SLAP_INDEX_CONT_PREFIX;
27
28         AC_MEMCPY( &((unsigned char *)cont->dptr)[1 + sizeof(ID)],
29                 key->dptr, key->dsize );
30 }
31
32 static void cont_id( Datum *cont, ID id )
33 {
34         int i;
35
36         for( i=1; i <= sizeof(id); i++) {
37                 ((unsigned char *)cont->dptr)[i] = (unsigned char)(id & 0xFF);
38                 id >>= 8;
39         }
40
41 }
42
43 static void cont_free( Datum *cont )
44 {
45         ch_free( cont->dptr );
46 }
47
#ifdef LDBM_DEBUG_IDL
/*
 * idl_check - debugging aid: assert that the ids held in a block are
 * strictly increasing.  Indirect blocks, ALLIDS blocks and blocks with
 * fewer than two ids are not examined.
 */
static void idl_check(ID_BLOCK *idl)
{
	int pos;
	ID_BLOCK prev;

	if ( ID_BLOCK_INDIRECT(idl) ) {
		return;
	}
	if ( ID_BLOCK_ALLIDS(idl) ) {
		return;
	}
	if ( ID_BLOCK_NIDS(idl) <= 1 ) {
		return;
	}

	prev = ID_BLOCK_ID(idl, 0);
	for ( pos = 1; pos < ID_BLOCK_NIDS(idl); pos++ ) {
		assert (prev < ID_BLOCK_ID(idl, pos) );
		prev = ID_BLOCK_ID(idl, pos);
	}
}
#endif
68
69 /* Allocate an ID_BLOCK with room for nids ids */
70 ID_BLOCK *
71 idl_alloc( unsigned int nids )
72 {
73         ID_BLOCK        *new;
74
75         /* nmax + nids + space for the ids */
76         new = (ID_BLOCK *) ch_calloc( (ID_BLOCK_IDS_OFFSET + nids), sizeof(ID) );
77         ID_BLOCK_NMAX(new) = nids;
78         ID_BLOCK_NIDS(new) = 0;
79
80         return( new );
81 }
82
83
84 /* Allocate an empty ALLIDS ID_BLOCK */
85 ID_BLOCK        *
86 idl_allids( Backend *be )
87 {
88         ID_BLOCK        *idl;
89         ID              id;
90
91         idl = idl_alloc( 0 );
92         ID_BLOCK_NMAX(idl) = ID_BLOCK_ALLIDS_VALUE;
93         if ( next_id_get( be, &id ) ) {
94                 return NULL;
95         }
96         ID_BLOCK_NIDS(idl) = id;
97
98         return( idl );
99 }
100
101 /* Free an ID_BLOCK */
102 void
103 idl_free( ID_BLOCK *idl )
104 {
105         if ( idl == NULL ) {
106                 Debug( LDAP_DEBUG_TRACE,
107                         "idl_free: called with NULL pointer\n",
108                         0, 0, 0 );
109                 return;
110         }
111
112         free( (char *) idl );
113 }
114
115
116 /* Fetch an single ID_BLOCK from the cache */
117 static ID_BLOCK *
118 idl_fetch_one(
119     Backend             *be,
120     DBCache     *db,
121     Datum               key
122 )
123 {
124         Datum   data;
125         ID_BLOCK        *idl;
126
127         /* Debug( LDAP_DEBUG_TRACE, "=> idl_fetch_one\n", 0, 0, 0 ); */
128
129         data = ldbm_cache_fetch( db, key );
130
131         if( data.dptr == NULL ) {
132                 return NULL;
133         }
134
135         idl = idl_dup((ID_BLOCK *) data.dptr);
136
137         ldbm_datum_free( db->dbc_db, data );
138
139         return idl;
140 }
141
142
143 /* Fetch a set of ID_BLOCKs from the cache
144  *      if not INDIRECT
145  *              if block return is an ALLIDS block,
146  *                      return an new ALLIDS block
147  *              otherwise
148  *                      return block
149  *      construct super block from all blocks referenced by INDIRECT block
150  *      return super block
151  */
152 ID_BLOCK *
153 idl_fetch(
154     Backend             *be,
155     DBCache     *db,
156     Datum               key
157 )
158 {
159         Datum   data;
160         ID_BLOCK        *idl;
161         ID_BLOCK        **tmp;
162         int     i, nids;
163
164         idl = idl_fetch_one( be, db, key );
165
166         if ( idl == NULL ) {
167                 return NULL;
168         }
169
170         if ( ID_BLOCK_ALLIDS(idl) ) {
171                 /* all ids block */
172                 /* make sure we have the current value of highest id */
173                 idl_free( idl );
174                 idl = idl_allids( be );
175
176                 return( idl );
177         }
178
179         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
180                 /* regular block */
181                 return( idl );
182         }
183
184         /*
185          * this is an indirect block which points to other blocks.
186          * we need to read in all the blocks it points to and construct
187          * a big id list containing all the ids, which we will return.
188          */
189
190         /* count the number of blocks & allocate space for pointers to them */
191         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ )
192                 ;       /* NULL */
193         tmp = (ID_BLOCK **) ch_malloc( (i + 1) * sizeof(ID_BLOCK *) );
194
195         /* read in all the blocks */
196         cont_alloc( &data, &key );
197         nids = 0;
198         for ( i = 0; !ID_BLOCK_NOID(idl, i); i++ ) {
199                 cont_id( &data, ID_BLOCK_ID(idl, i) );
200
201                 if ( (tmp[i] = idl_fetch_one( be, db, data )) == NULL ) {
202                         Debug( LDAP_DEBUG_ANY,
203                             "idl_fetch: one returned NULL\n", 0, 0, 0 );
204                         continue;
205                 }
206
207                 nids += ID_BLOCK_NIDS(tmp[i]);
208         }
209         tmp[i] = NULL;
210         cont_free( &data );
211         idl_free( idl );
212
213         /* allocate space for the big block */
214         idl = idl_alloc( nids );
215         ID_BLOCK_NIDS(idl) = nids;
216         nids = 0;
217
218         /* copy in all the ids from the component blocks */
219         for ( i = 0; tmp[i] != NULL; i++ ) {
220                 if ( tmp[i] == NULL ) {
221                         continue;
222                 }
223
224                 AC_MEMCPY(
225                         (char *) &ID_BLOCK_ID(idl, nids),
226                         (char *) &ID_BLOCK_ID(tmp[i], 0),
227                         ID_BLOCK_NIDS(tmp[i]) * sizeof(ID) );
228                 nids += ID_BLOCK_NIDS(tmp[i]);
229
230                 idl_free( tmp[i] );
231         }
232         free( (char *) tmp );
233
234 #ifdef LDBM_DEBUG_IDL
235         idl_check(idl);
236 #endif
237
238         Debug( LDAP_DEBUG_TRACE, "<= idl_fetch %ld ids (%ld max)\n",
239                ID_BLOCK_NIDS(idl), ID_BLOCK_NMAX(idl), 0 );
240         return( idl );
241 }
242
243
244 /* store a single block */
245 static int
246 idl_store(
247     Backend             *be,
248     DBCache     *db,
249     Datum               key, 
250     ID_BLOCK            *idl
251 )
252 {
253         int     rc, flags;
254         Datum   data;
255         struct ldbminfo *li = (struct ldbminfo *) be->be_private;
256
257 #ifdef LDBM_DEBUG_IDL
258         idl_check(idl);
259 #endif
260
261         ldbm_datum_init( data );
262
263         /* Debug( LDAP_DEBUG_TRACE, "=> idl_store\n", 0, 0, 0 ); */
264
265         data.dptr = (char *) idl;
266         data.dsize = (ID_BLOCK_IDS_OFFSET + ID_BLOCK_NMAX(idl)) * sizeof(ID);
267         
268 #ifdef LDBM_DEBUG
269         Statslog( LDAP_DEBUG_STATS, "<= idl_store(): rc=%d\n",
270                 rc, 0, 0, 0, 0 );
271 #endif
272
273         flags = LDBM_REPLACE;
274         rc = ldbm_cache_store( db, key, data, flags );
275
276         /* Debug( LDAP_DEBUG_TRACE, "<= idl_store %d\n", rc, 0, 0 ); */
277         return( rc );
278 }
279
280 /* split the block at id 
281  *      locate ID greater than or equal to id.
282  */
283 static void
284 idl_split_block(
285     ID_BLOCK    *b,
286     ID          id,
287     ID_BLOCK    **right,
288     ID_BLOCK    **left
289 )
290 {
291         unsigned int    nr, nl;
292
293         /* find where to split the block *//* XXX linear search XXX */
294         for ( nr = 0; nr < ID_BLOCK_NIDS(b) && id > ID_BLOCK_ID(b, nr); nr++ )
295                 ;       /* NULL */
296
297         nl = ID_BLOCK_NIDS(b) - nr;
298
299         *right = idl_alloc( nr == 0 ? 1 : nr );
300         *left = idl_alloc( nl + (nr == 0 ? 0 : 1));
301
302         /*
303          * everything before the id being inserted in the first block
304          * unless there is nothing, in which case the id being inserted
305          * goes there.
306          */
307         if ( nr == 0 ) {
308                 ID_BLOCK_NIDS(*right) = 1;
309                 ID_BLOCK_ID(*right, 0) = id;
310         } else {
311                 AC_MEMCPY(
312                         (char *) &ID_BLOCK_ID(*right, 0),
313                         (char *) &ID_BLOCK_ID(b, 0),
314                         nr * sizeof(ID) );
315                 ID_BLOCK_NIDS(*right) = nr;
316                 ID_BLOCK_ID(*left, 0) = id;
317         }
318
319         /* the id being inserted & everything after in the second block */
320         AC_MEMCPY(
321                 (char *) &ID_BLOCK_ID(*left, (nr == 0 ? 0 : 1)),
322             (char *) &ID_BLOCK_ID(b, nr),
323                 nl * sizeof(ID) );
324         ID_BLOCK_NIDS(*left) = nl + (nr == 0 ? 0 : 1);
325
326 #ifdef LDBM_DEBUG_IDL
327         idl_check(*right);
328         idl_check(*left);
329 #endif
330 }
331
332
333 /*
334  * idl_change_first - called when an indirect block's first key has
335  * changed, meaning it needs to be stored under a new key, and the
336  * header block pointing to it needs updating.
337  */
338 static int
339 idl_change_first(
340     Backend             *be,
341     DBCache     *db,
342     Datum               hkey,           /* header block key     */
343     ID_BLOCK            *h,             /* header block         */
344     int                 pos,            /* pos in h to update   */
345     Datum               bkey,           /* data block key       */
346     ID_BLOCK            *b              /* data block           */
347 )
348 {
349         int     rc;
350
351         /* Debug( LDAP_DEBUG_TRACE, "=> idl_change_first\n", 0, 0, 0 ); */
352
353         /* delete old key block */
354         if ( (rc = ldbm_cache_delete( db, bkey )) != 0 ) {
355                 Debug( LDAP_DEBUG_ANY,
356                     "idl_change_first: ldbm_cache_delete returned %d\n",
357                         rc, 0, 0 );
358                 return( rc );
359         }
360
361         /* write block with new key */
362         cont_id( &bkey, ID_BLOCK_ID(b, 0) );
363
364         if ( (rc = idl_store( be, db, bkey, b )) != 0 ) {
365                 Debug( LDAP_DEBUG_ANY,
366                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
367                 return( rc );
368         }
369
370         /* update + write indirect header block */
371         ID_BLOCK_ID(h, pos) = ID_BLOCK_ID(b, 0);
372         if ( (rc = idl_store( be, db, hkey, h )) != 0 ) {
373                 Debug( LDAP_DEBUG_ANY,
374                     "idl_change_first: idl_store returned %d\n", rc, 0, 0 );
375                 return( rc );
376         }
377
378         return( 0 );
379 }
380
381
382 int
383 idl_insert_key(
384     Backend             *be,
385     DBCache     *db,
386     Datum               key,
387     ID                  id
388 )
389 {
390         int     i, j, first, rc;
391         ID_BLOCK        *idl, *tmp, *tmp2, *tmp3;
392         Datum   k2;
393
394         if ( (idl = idl_fetch_one( be, db, key )) == NULL ) {
395                 idl = idl_alloc( 1 );
396                 ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)++) = id;
397                 rc = idl_store( be, db, key, idl );
398
399                 idl_free( idl );
400                 return( rc );
401         }
402
403         if ( ID_BLOCK_ALLIDS( idl ) ) {
404                 /* ALLIDS */
405                 idl_free( idl );
406                 return 0;
407         }
408
409         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
410                 /* regular block */
411                 switch ( idl_insert( &idl, id, db->dbc_maxids ) ) {
412                 case 0:         /* id inserted - store the updated block */
413                 case 1:
414                         rc = idl_store( be, db, key, idl );
415                         break;
416
417                 case 2:         /* id already there - nothing to do */
418                         rc = 0;
419                         break;
420
421                 case 3:         /* id not inserted - block must be split */
422                         /* check threshold for marking this an all-id block */
423                         if ( db->dbc_maxindirect < 2 ) {
424                                 idl_free( idl );
425                                 idl = idl_allids( be );
426                                 rc = idl_store( be, db, key, idl );
427                                 break;
428                         }
429
430                         idl_split_block( idl, id, &tmp, &tmp2 );
431                         idl_free( idl );
432
433                         /* create the header indirect block */
434                         idl = idl_alloc( 3 );
435                         ID_BLOCK_NMAX(idl) = 3;
436                         ID_BLOCK_NIDS(idl) = ID_BLOCK_INDIRECT_VALUE;
437                         ID_BLOCK_ID(idl, 0) = ID_BLOCK_ID(tmp, 0);
438                         ID_BLOCK_ID(idl, 1) = ID_BLOCK_ID(tmp2, 0);
439                         ID_BLOCK_ID(idl, 2) = NOID;
440
441                         /* store it */
442                         rc = idl_store( be, db, key, idl );
443
444                         cont_alloc( &k2, &key );
445                         cont_id( &k2, ID_BLOCK_ID(tmp, 0) );
446
447                         rc = idl_store( be, db, k2, tmp );
448
449                         cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
450                         rc = idl_store( be, db, k2, tmp2 );
451
452                         cont_free( &k2 );
453
454                         idl_free( tmp );
455                         idl_free( tmp2 );
456                         break;
457                 }
458
459                 idl_free( idl );
460                 return( rc );
461         }
462
463         /*
464          * this is an indirect block which points to other blocks.
465          * we need to read in the block into which the id should be
466          * inserted, then insert the id and store the block.  we might
467          * have to split the block if it is full, which means we also
468          * need to write a new "header" block.
469          */
470
471         /* select the block to try inserting into *//* XXX linear search XXX */
472         for ( i = 0; !ID_BLOCK_NOID(idl, i) && id > ID_BLOCK_ID(idl, i); i++ )
473                 ;       /* NULL */
474
475         if ( i != 0 ) {
476                 i--;
477                 first = 0;
478         } else {
479                 first = 1;
480         }
481
482         /* get the block */
483         cont_alloc( &k2, &key );
484         cont_id( &k2, ID_BLOCK_ID(idl, i) );
485
486         if ( (tmp = idl_fetch_one( be, db, k2 )) == NULL ) {
487                 Debug( LDAP_DEBUG_ANY, "idl_insert_key: nonexistent continuation block\n",
488                     0, 0, 0 );
489                 cont_free( &k2 );
490                 idl_free( idl );
491                 return( -1 );
492         }
493
494         /* insert the id */
495         switch ( idl_insert( &tmp, id, db->dbc_maxids ) ) {
496         case 0:         /* id inserted ok */
497                 if ( (rc = idl_store( be, db, k2, tmp )) != 0 ) {
498                         Debug( LDAP_DEBUG_ANY,
499                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
500                 }
501                 break;
502
503         case 1:         /* id inserted - first id in block has changed */
504                 /*
505                  * key for this block has changed, so we have to
506                  * write the block under the new key, delete the
507                  * old key block + update and write the indirect
508                  * header block.
509                  */
510
511                 rc = idl_change_first( be, db, key, idl, i, k2, tmp );
512                 break;
513
514         case 2:         /* id not inserted - already there, do nothing */
515                 rc = 0;
516                 break;
517
518         case 3:         /* id not inserted - block is full */
519                 /*
520                  * first, see if it will fit in the next block,
521                  * without splitting, unless we're trying to insert
522                  * into the beginning of the first block.
523                  */
524
525                 /* is there a next block? */
526                 if ( !first && !ID_BLOCK_NOID(idl, i + 1) ) {
527                         /* read it in */
528                         cont_alloc( &k2, &key );
529                         cont_id( &k2, ID_BLOCK_ID(idl, i) );
530                         if ( (tmp2 = idl_fetch_one( be, db, k2 )) == NULL ) {
531                                 Debug( LDAP_DEBUG_ANY,
532                                     "idl_insert_key: idl_fetch_one returned NULL\n",
533                                     0, 0, 0 );
534                                 /* split the original block */
535                                 cont_free( &k2 );
536                                 goto split;
537                         }
538
539                         /* If the new id is less than the last id in the
540                          * current block, it must not be put into the next
541                          * block. Push the last id of the current block
542                          * into the next block instead.
543                          */
544                         if (id < ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1)) {
545                             ID id2 = ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp) - 1);
546                             Datum k3;
547
548                             ldbm_datum_init( k3 );
549
550                             --ID_BLOCK_NIDS(tmp);
551                             /* This must succeed since we just popped one
552                              * ID off the end of it.
553                              */
554                             rc = idl_insert( &tmp, id, db->dbc_maxids );
555
556                                 k3.dptr = ch_malloc(k2.dsize);
557                                 k3.dsize = k2.dsize;
558                                 AC_MEMCPY(k3.dptr, k2.dptr, k3.dsize);
559                             if ( (rc = idl_store( be, db, k3, tmp )) != 0 ) {
560                                 Debug( LDAP_DEBUG_ANY,
561                             "idl_insert_key: idl_store returned %d\n", rc, 0, 0 );
562                             }
563
564                                 free( k3.dptr );
565
566                             id = id2;
567                             /* This new id will necessarily be inserted
568                              * as the first id of the next block by the
569                              * following switch() statement.
570                              */
571                         }
572
573                         switch ( (rc = idl_insert( &tmp2, id,
574                             db->dbc_maxids )) ) {
575                         case 1:         /* id inserted first in block */
576                                 rc = idl_change_first( be, db, key, idl,
577                                     i + 1, k2, tmp2 );
578                                 /* FALL */
579
580                         case 2:         /* id already there - how? */
581                         case 0:         /* id inserted: this can never be
582                                          * the result of idl_insert, because
583                                          * we guaranteed that idl_change_first
584                                          * will always be called.
585                                          */
586                                 if ( rc == 2 ) {
587                                         Debug( LDAP_DEBUG_ANY,
588                                             "idl_insert_key: id %ld already in next block\n",
589                                             id, 0, 0 );
590                                 }
591
592                                 idl_free( tmp );
593                                 idl_free( tmp2 );
594                                 idl_free( idl );
595                                 return( 0 );
596
597                         case 3:         /* split the original block */
598                                 break;
599                         }
600
601                         idl_free( tmp2 );
602                 }
603
604 split:
605                 /*
606                  * must split the block, write both new blocks + update
607                  * and write the indirect header block.
608                  */
609
610                 rc = 0; /* optimistic */
611
612
613                 /* count how many indirect blocks *//* XXX linear count XXX */
614                 for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ )
615                         ;       /* NULL */
616
617                 /* check it against all-id thresholed */
618                 if ( j + 1 > db->dbc_maxindirect ) {
619                         /*
620                          * we've passed the all-id threshold, meaning
621                          * that this set of blocks should be replaced
622                          * by a single "all-id" block.  our job: delete
623                          * all the indirect blocks, and replace the header
624                          * block by an all-id block.
625                          */
626
627                         /* delete all indirect blocks */
628                         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) {
629                                 cont_id( &k2, ID_BLOCK_ID(idl, j) );
630
631                                 rc = ldbm_cache_delete( db, k2 );
632                         }
633
634                         /* store allid block in place of header block */
635                         idl_free( idl );
636                         idl = idl_allids( be );
637                         rc = idl_store( be, db, key, idl );
638
639                         cont_free( &k2 );
640                         idl_free( idl );
641                         idl_free( tmp );
642                         return( rc );
643                 }
644
645                 idl_split_block( tmp, id, &tmp2, &tmp3 );
646                 idl_free( tmp );
647
648                 /* create a new updated indirect header block */
649                 tmp = idl_alloc( ID_BLOCK_NMAX(idl) + 1 );
650                 ID_BLOCK_NIDS(tmp) = ID_BLOCK_INDIRECT_VALUE;
651                 /* everything up to the split block */
652                 AC_MEMCPY(
653                         (char *) &ID_BLOCK_ID(tmp, 0),
654                         (char *) &ID_BLOCK_ID(idl, 0),
655                     i * sizeof(ID) );
656                 /* the two new blocks */
657                 ID_BLOCK_ID(tmp, i) = ID_BLOCK_ID(tmp2, 0);
658                 ID_BLOCK_ID(tmp, i + 1) = ID_BLOCK_ID(tmp3, 0);
659                 /* everything after the split block */
660                 AC_MEMCPY(
661                         (char *) &ID_BLOCK_ID(tmp, i + 2),
662                         (char *) &ID_BLOCK_ID(idl, i + 1),
663                         (ID_BLOCK_NMAX(idl) - i - 1) * sizeof(ID) );
664
665                 /* store the header block */
666                 rc = idl_store( be, db, key, tmp );
667
668                 /* store the first id block */
669                 cont_id( &k2, ID_BLOCK_ID(tmp2, 0) );
670                 rc = idl_store( be, db, k2, tmp2 );
671
672                 /* store the second id block */
673                 cont_id( &k2, ID_BLOCK_ID(tmp3, 0) );
674                 rc = idl_store( be, db, k2, tmp3 );
675
676                 idl_free( tmp2 );
677                 idl_free( tmp3 );
678                 break;
679         }
680
681         cont_free( &k2 );
682         idl_free( tmp );
683         idl_free( idl );
684         return( rc );
685 }
686
687
688 /*
689  * idl_insert - insert an id into an id list.
690  *
691  *      returns
692  *              0       id inserted
693  *              1       id inserted, first id in block has changed
694  *              2       id not inserted, already there
695  *              3       id not inserted, block must be split
696  */
697 int
698 idl_insert( ID_BLOCK **idl, ID id, unsigned int maxids )
699 {
700         unsigned int    i;
701
702         if ( ID_BLOCK_ALLIDS( *idl ) ) {
703                 return( 2 );    /* already there */
704         }
705
706         /* is it already there? *//* XXX linear search XXX */
707         for ( i = 0; i < ID_BLOCK_NIDS(*idl) && id > ID_BLOCK_ID(*idl, i); i++ ) {
708                 ;       /* NULL */
709         }
710         if ( i < ID_BLOCK_NIDS(*idl) && ID_BLOCK_ID(*idl, i) == id ) {
711                 return( 2 );    /* already there */
712         }
713
714         /* do we need to make room for it? */
715         if ( ID_BLOCK_NIDS(*idl) == ID_BLOCK_NMAX(*idl) ) {
716                 /* make room or indicate block needs splitting */
717                 if ( ID_BLOCK_NMAX(*idl) >= maxids ) {
718                         return( 3 );    /* block needs splitting */
719                 }
720
721                 ID_BLOCK_NMAX(*idl) *= 2;
722                 if ( ID_BLOCK_NMAX(*idl) > maxids ) {
723                         ID_BLOCK_NMAX(*idl) = maxids;
724                 }
725                 *idl = (ID_BLOCK *) ch_realloc( (char *) *idl,
726                     (ID_BLOCK_NMAX(*idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
727         }
728
729         /* make a slot for the new id */
730         AC_MEMCPY( &ID_BLOCK_ID(*idl, i+1), &ID_BLOCK_ID(*idl, i),
731                     (ID_BLOCK_NIDS(*idl) - i) * sizeof(ID) );
732
733         ID_BLOCK_ID(*idl, i) = id;
734         ID_BLOCK_NIDS(*idl)++;
735         (void) memset(
736                 (char *) &ID_BLOCK_ID((*idl), ID_BLOCK_NIDS(*idl)),
737                 '\0',
738             (ID_BLOCK_NMAX(*idl) - ID_BLOCK_NIDS(*idl)) * sizeof(ID) );
739
740 #ifdef LDBM_DEBUG_IDL
741         idl_check(*idl);
742 #endif
743
744         return( i == 0 ? 1 : 0 );       /* inserted - first id changed or not */
745 }
746
747
748 int
749 idl_delete_key (
750         Backend         *be,
751         DBCache  *db,
752         Datum           key,
753         ID              id
754 )
755 {
756         Datum  data;
757         ID_BLOCK *idl;
758         unsigned i;
759         int j, nids;
760
761         if ( (idl = idl_fetch_one( be, db, key ) ) == NULL )
762         {
763                 /* It wasn't found.  Hmm... */
764                 return -1;
765         }
766
767         if ( ID_BLOCK_ALLIDS( idl ) ) {
768                 idl_free( idl );
769                 return 0;
770         }
771
772         if ( ! ID_BLOCK_INDIRECT( idl ) ) {
773                 for ( i=0; i < ID_BLOCK_NIDS(idl); i++ ) {
774                         if ( ID_BLOCK_ID(idl, i) == id ) {
775                                 if( --ID_BLOCK_NIDS(idl) == 0 ) {
776                                         ldbm_cache_delete( db, key );
777
778                                 } else {
779                                         AC_MEMCPY(
780                                                 &ID_BLOCK_ID(idl, i),
781                                                 &ID_BLOCK_ID(idl, i+1),
782                                                 (ID_BLOCK_NIDS(idl)-i) * sizeof(ID) );
783
784                                         ID_BLOCK_ID(idl, ID_BLOCK_NIDS(idl)) = NOID;
785
786                                         idl_store( be, db, key, idl );
787                                 }
788
789                                 idl_free( idl );
790                                 return 0;
791                         }
792                         /*  We didn't find the ID.  Hmmm... */
793                 }
794                 idl_free( idl );
795                 return -1;
796         }
797         
798         /* We have to go through an indirect block and find the ID
799            in the list of IDL's
800            */
801         for ( nids = 0; !ID_BLOCK_NOID(idl, nids); nids++ )
802                 ;       /* NULL */
803
804         cont_alloc( &data, &key );
805
806         for ( j = 0; !ID_BLOCK_NOID(idl, j); j++ ) 
807         {
808                 ID_BLOCK *tmp;
809                 cont_id( &data, ID_BLOCK_ID(idl, j) );
810
811                 if ( (tmp = idl_fetch_one( be, db, data )) == NULL ) {
812                         Debug( LDAP_DEBUG_ANY,
813                             "idl_delete_key: idl_fetch of returned NULL\n", 0, 0, 0 );
814                         continue;
815                 }
816                 /*
817                    Now try to find the ID in tmp
818                 */
819                 for ( i=0; i < ID_BLOCK_NIDS(tmp); i++ )
820                 {
821                         if ( ID_BLOCK_ID(tmp, i) == id )
822                         {
823                                 AC_MEMCPY(
824                                         &ID_BLOCK_ID(tmp, i),
825                                         &ID_BLOCK_ID(tmp, i+1),
826                                         (ID_BLOCK_NIDS(tmp)-(i+1)) * sizeof(ID));
827                                 ID_BLOCK_ID(tmp, ID_BLOCK_NIDS(tmp)-1 ) = NOID;
828                                 ID_BLOCK_NIDS(tmp)--;
829
830                                 if ( ID_BLOCK_NIDS(tmp) ) {
831                                         idl_store ( be, db, data, tmp );
832
833                                 } else {
834                                         ldbm_cache_delete( db, data );
835                                         AC_MEMCPY(
836                                                 &ID_BLOCK_ID(idl, j),
837                                                 &ID_BLOCK_ID(idl, j+1),
838                                                 (nids-(j+1)) * sizeof(ID));
839                                         ID_BLOCK_ID(idl, nids-1) = NOID;
840                                         nids--;
841                                         if ( ! nids )
842                                                 ldbm_cache_delete( db, key );
843                                         else
844                                                 idl_store( be, db, key, idl );
845                                 }
846                                 idl_free( tmp );
847                                 cont_free( &data );
848                                 idl_free( idl );
849                                 return 0;
850                         }
851                 }
852                 idl_free( tmp );
853         }
854
855         cont_free( &data );
856         idl_free( idl );
857         return -1;
858 }
859
860
861 /* return a duplicate of a single ID_BLOCK */
862 static ID_BLOCK *
863 idl_dup( ID_BLOCK *idl )
864 {
865         ID_BLOCK        *new;
866
867         if ( idl == NULL ) {
868                 return( NULL );
869         }
870
871         new = idl_alloc( ID_BLOCK_NMAX(idl) );
872
873         AC_MEMCPY(
874                 (char *) new,
875                 (char *) idl,
876                 (ID_BLOCK_NMAX(idl) + ID_BLOCK_IDS_OFFSET) * sizeof(ID) );
877
878 #ifdef LDBM_DEBUG_IDL
879         idl_check(new);
880 #endif
881
882         return( new );
883 }
884
885
886 /* return the smaller ID_BLOCK */
887 static ID_BLOCK *
888 idl_min( ID_BLOCK *a, ID_BLOCK *b )
889 {
890         return( ID_BLOCK_NIDS(a) > ID_BLOCK_NIDS(b) ? b : a );
891 }
892
893
894 /*
895  * idl_intersection - return a intersection b
896  */
897 ID_BLOCK *
898 idl_intersection(
899     Backend     *be,
900     ID_BLOCK    *a,
901     ID_BLOCK    *b
902 )
903 {
904         unsigned int    ai, bi, ni;
905         ID_BLOCK                *n;
906
907         if ( a == NULL || b == NULL ) {
908                 return( NULL );
909         }
910         if ( ID_BLOCK_ALLIDS( a ) ) {
911                 return( idl_dup( b ) );
912         }
913         if ( ID_BLOCK_ALLIDS( b ) ) {
914                 return( idl_dup( a ) );
915         }
916
917         n = idl_dup( idl_min( a, b ) );
918
919 #ifdef LDBM_DEBUG_IDL
920         idl_check(a);
921         idl_check(b);
922 #endif
923
924         for ( ni = 0, ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
925                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
926                         continue;
927                 }
928                 for ( ;
929                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
930                         bi++ )
931                 {
932                         ;       /* NULL */
933                 }
934
935                 if ( bi == ID_BLOCK_NIDS(b) ) {
936                         break;
937                 }
938
939                 if ( ID_BLOCK_ID(b, bi) == ID_BLOCK_ID(a, ai) ) {
940                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
941                         bi++;
942                 }
943         }
944
945         if ( ni == 0 ) {
946                 idl_free( n );
947                 return( NULL );
948         }
949         ID_BLOCK_NIDS(n) = ni;
950
951 #ifdef LDBM_DEBUG_IDL
952         idl_check(n);
953 #endif
954
955         return( n );
956 }
957
958
959 /*
960  * idl_union - return a union b
961  */
962 ID_BLOCK *
963 idl_union(
964     Backend     *be,
965     ID_BLOCK    *a,
966     ID_BLOCK    *b
967 )
968 {
969         unsigned int    ai, bi, ni;
970         ID_BLOCK                *n;
971
972         if ( a == NULL ) {
973                 return( idl_dup( b ) );
974         }
975         if ( b == NULL ) {
976                 return( idl_dup( a ) );
977         }
978         if ( ID_BLOCK_ALLIDS( a ) || ID_BLOCK_ALLIDS( b ) ) {
979                 return( idl_allids( be ) );
980         }
981
982 #ifdef LDBM_DEBUG_IDL
983         idl_check(a);
984         idl_check(b);
985 #endif
986
987         if ( ID_BLOCK_NIDS(b) < ID_BLOCK_NIDS(a) ) {
988                 n = a;
989                 a = b;
990                 b = n;
991         }
992
993         n = idl_alloc( ID_BLOCK_NIDS(a) + ID_BLOCK_NIDS(b) );
994
995         for ( ni = 0, ai = 0, bi = 0;
996                 ai < ID_BLOCK_NIDS(a) && bi < ID_BLOCK_NIDS(b);
997                 )
998         {
999                 if ( ID_BLOCK_ID(a, ai) < ID_BLOCK_ID(b, bi) ) {
1000                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai++);
1001
1002                 } else if ( ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai) ) {
1003                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi++);
1004
1005                 } else {
1006                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1007                         ai++, bi++;
1008                 }
1009         }
1010
1011         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1012                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1013         }
1014         for ( ; bi < ID_BLOCK_NIDS(b); bi++ ) {
1015                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(b, bi);
1016         }
1017         ID_BLOCK_NIDS(n) = ni;
1018
1019 #ifdef LDBM_DEBUG_IDL
1020         idl_check(n);
1021 #endif
1022
1023         return( n );
1024 }
1025
1026
1027 /*
1028  * idl_notin - return a intersection ~b (or a minus b)
1029  */
1030 ID_BLOCK *
1031 idl_notin(
1032     Backend     *be,
1033     ID_BLOCK    *a,
1034     ID_BLOCK    *b
1035 )
1036 {
1037         unsigned int    ni, ai, bi;
1038         ID_BLOCK                *n;
1039
1040         if ( a == NULL ) {
1041                 return( NULL );
1042         }
1043         if ( b == NULL || ID_BLOCK_ALLIDS( b )) {
1044                 return( idl_dup( a ) );
1045         }
1046
1047         if ( ID_BLOCK_ALLIDS( a ) ) {
1048                 n = idl_alloc( SLAPD_LDBM_MIN_MAXIDS );
1049                 ni = 0;
1050
1051                 for ( ai = 1, bi = 0;
1052                         ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAX(n) && bi < ID_BLOCK_NMAX(b);
1053                         ai++ )
1054                 {
1055                         if ( ID_BLOCK_ID(b, bi) == ai ) {
1056                                 bi++;
1057                         } else {
1058                                 ID_BLOCK_ID(n, ni++) = ai;
1059                         }
1060                 }
1061
1062                 for ( ; ai < ID_BLOCK_NIDS(a) && ni < ID_BLOCK_NMAX(n); ai++ ) {
1063                         ID_BLOCK_ID(n, ni++) = ai;
1064                 }
1065
1066                 if ( ni == ID_BLOCK_NMAX(n) ) {
1067                         idl_free( n );
1068                         return( idl_allids( be ) );
1069                 } else {
1070                         ID_BLOCK_NIDS(n) = ni;
1071                         return( n );
1072                 }
1073         }
1074
1075         n = idl_dup( a );
1076
1077         ni = 0;
1078         for ( ai = 0, bi = 0; ai < ID_BLOCK_NIDS(a); ai++ ) {
1079                 for ( ;
1080                         bi < ID_BLOCK_NIDS(b) && ID_BLOCK_ID(b, bi) < ID_BLOCK_ID(a, ai);
1081                     bi++ )
1082                 {
1083                         ;       /* NULL */
1084                 }
1085
1086                 if ( bi == ID_BLOCK_NIDS(b) ) {
1087                         break;
1088                 }
1089
1090                 if ( ID_BLOCK_ID(b, bi) != ID_BLOCK_ID(a, ai) ) {
1091                         ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1092                 }
1093         }
1094
1095         for ( ; ai < ID_BLOCK_NIDS(a); ai++ ) {
1096                 ID_BLOCK_ID(n, ni++) = ID_BLOCK_ID(a, ai);
1097         }
1098         ID_BLOCK_NIDS(n) = ni;
1099
1100 #ifdef LDBM_DEBUG_IDL
1101         idl_check(n);
1102 #endif
1103
1104         return( n );
1105 }
1106
1107 /*      return the first ID in the block
1108  *      if ALLIDS block
1109  *              NIDS > 1 return 1
1110  *              otherwise return NOID 
1111  *      otherwise return first ID
1112  *
1113  *      cursor is set to 1
1114  */         
1115 ID
1116 idl_firstid( ID_BLOCK *idl, ID *cursor )
1117 {
1118         *cursor = 1;
1119
1120         if ( idl == NULL || ID_BLOCK_NIDS(idl) == 0 ) {
1121                 return( NOID );
1122         }
1123
1124         if ( ID_BLOCK_ALLIDS( idl ) ) {
1125                 return( ID_BLOCK_NIDS(idl) > 1 ? 1 : NOID );
1126         }
1127
1128         return( ID_BLOCK_ID(idl, 0) );
1129 }
1130
1131 /*      return next ID
1132  *      if ALLIDS block, cursor is id.
1133  *              increment id
1134  *              if id < NIDS return id
1135  *              otherwise NOID.
1136  *      otherwise cursor is index into block
1137  *              if index < nids
1138  *                      return id at index then increment
1139  */ 
1140 ID
1141 idl_nextid( ID_BLOCK *idl, ID *cursor )
1142 {
1143         if ( ID_BLOCK_ALLIDS( idl ) ) {
1144                 if( ++(*cursor) < ID_BLOCK_NIDS(idl) ) {
1145                         return *cursor;
1146                 } else {
1147                         return NOID;
1148                 }
1149         }
1150
1151         if ( *cursor < ID_BLOCK_NIDS(idl) ) {
1152                 return( ID_BLOCK_ID(idl, (*cursor)++) );
1153         }
1154
1155         return( NOID );
1156 }