]> git.sur5r.net Git - openldap/blob - libraries/libmdb/mdb.c
Drop header page
[openldap] / libraries / libmdb / mdb.c
1 #include <sys/types.h>
2 #include <sys/stat.h>
3 #include <sys/queue.h>
4 #include <sys/param.h>
5 #include <sys/uio.h>
6 #include <sys/mman.h>
7 #ifdef HAVE_SYS_FILE_H
8 #include <sys/file.h>
9 #endif
10 #include <sys/ipc.h>
11 #include <sys/shm.h>
12
13 #include <assert.h>
14 #include <err.h>
15 #include <errno.h>
16 #include <fcntl.h>
17 #include <stddef.h>
18 #include <stdint.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <pthread.h>
25
26 #include "mdb.h"
27
28 #ifndef DEBUG
29 #define DEBUG 1
30 #endif
31
32 #if (DEBUG +0) && defined(__GNUC__)
33 # define DPRINTF(fmt, ...) \
34         fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__)
35 #else
36 # define DPRINTF(...)   ((void) 0)
37 #endif
38
39 #define PAGESIZE         4096
40 #define MDB_MINKEYS      4
41 #define MDB_MAGIC        0xBEEFC0DE
42 #define MDB_VERSION      1
43 #define MAXKEYSIZE       255
44
45 #define P_INVALID        (~0UL)
46
47 #define F_ISSET(w, f)    (((w) & (f)) == (f))
48
49 typedef ulong            pgno_t;
50 typedef uint16_t         indx_t;
51
52 #define DEFAULT_READERS 126
53 #define DEFAULT_MAPSIZE 1048576
54
55 /* Lock descriptor stuff */
56 #define RXBODY  \
57         ulong           mr_txnid; \
58         pid_t           mr_pid; \
59         pthread_t       mr_tid
60 typedef struct MDB_rxbody {
61         RXBODY;
62 } MDB_rxbody;
63
64 #ifndef CACHELINE
65 #define CACHELINE       64      /* most CPUs. Itanium uses 128 */
66 #endif
67
68 typedef struct MDB_reader {
69         RXBODY;
70         /* cache line alignment */
71         char pad[CACHELINE-sizeof(MDB_rxbody)];
72 } MDB_reader;
73
74 #define TXBODY \
75         uint32_t        mt_magic;       \
76         uint32_t        mt_version;     \
77         pthread_mutex_t mt_mutex;       \
78         ulong           mt_txnid;       \
79         uint32_t        mt_numreaders
80 typedef struct MDB_txbody {
81         TXBODY;
82 } MDB_txbody;
83
84 typedef struct MDB_txninfo {
85         TXBODY;
86         char pad[CACHELINE-sizeof(MDB_txbody)];
87         pthread_mutex_t mt_wmutex;
88         char pad2[CACHELINE-sizeof(pthread_mutex_t)];
89         MDB_reader      mt_readers[1];
90 } MDB_txninfo;
91
92 /* Common header for all page types. Overflow pages
93  * occupy a number of contiguous pages with no
94  * headers on any page after the first.
95  */
96 typedef struct MDB_page {               /* represents a page of storage */
97         pgno_t          mp_pgno;                /* page number */
98 #define P_BRANCH         0x01           /* branch page */
99 #define P_LEAF           0x02           /* leaf page */
100 #define P_OVERFLOW       0x04           /* overflow page */
101 #define P_META           0x08           /* meta page */
102 #define P_DIRTY          0x10           /* dirty page */
103         uint32_t        mp_flags;
104 #define mp_lower        mp_pb.pb.pb_lower
105 #define mp_upper        mp_pb.pb.pb_upper
106 #define mp_pages        mp_pb.pb_pages
107         union page_bounds {
108                 struct {
109                         indx_t          pb_lower;               /* lower bound of free space */
110                         indx_t          pb_upper;               /* upper bound of free space */
111                 } pb;
112                 uint32_t        pb_pages;       /* number of overflow pages */
113         } mp_pb;
114         indx_t          mp_ptrs[1];             /* dynamic size */
115 } MDB_page;
116
117 #define PAGEHDRSZ        ((unsigned) offsetof(MDB_page, mp_ptrs))
118
119 #define NUMKEYS(p)       (((p)->mp_lower - PAGEHDRSZ) >> 1)
120 #define SIZELEFT(p)      (indx_t)((p)->mp_upper - (p)->mp_lower)
121 #define PAGEFILL(env, p) (1000L * ((env)->me_meta.mm_stat.ms_psize - PAGEHDRSZ - SIZELEFT(p)) / \
122                                 ((env)->me_meta.mm_stat.ms_psize - PAGEHDRSZ))
123 #define IS_LEAF(p)       F_ISSET((p)->mp_flags, P_LEAF)
124 #define IS_BRANCH(p)     F_ISSET((p)->mp_flags, P_BRANCH)
125 #define IS_OVERFLOW(p)   F_ISSET((p)->mp_flags, P_OVERFLOW)
126
127 typedef struct MDB_meta {                       /* meta (footer) page content */
128         uint32_t        mm_magic;
129         uint32_t        mm_version;
130         void            *mm_address;            /* address for fixed mapping */
131         size_t          mm_mapsize;                     /* size of mmap region */
132         pgno_t          mm_last_pg;                     /* last used page in file */
133         ulong           mm_txnid;                       /* txnid that committed this page */
134         MDB_stat        mm_stat;
135         pgno_t          mm_root;                        /* page number of root page */
136 } MDB_meta;
137
138 typedef struct MDB_dhead {                                      /* a dirty page */
139         SIMPLEQ_ENTRY(MDB_dpage)         md_next;       /* queue of dirty pages */
140         MDB_page        *md_parent;
141         unsigned        md_pi;                          /* parent index */
142         int                     md_num;
143 } MDB_dhead;
144
145 typedef struct MDB_dpage {
146         MDB_dhead       h;
147         MDB_page        p;
148 } MDB_dpage;
149
150 SIMPLEQ_HEAD(dirty_queue, MDB_dpage);
151
152 typedef struct MDB_pageparent {
153         MDB_page *mp_page;
154         MDB_page *mp_parent;
155         unsigned mp_pi;
156 } MDB_pageparent;
157
158 static MDB_dpage *mdb_newpage(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num);
159 static int              mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
160
161 typedef struct MDB_ppage {                                      /* ordered list of pages */
162         SLIST_ENTRY(MDB_ppage)   mp_entry;
163         MDB_page                *mp_page;
164         unsigned int    mp_ki;          /* cursor index on page */
165 } MDB_ppage;
166 SLIST_HEAD(page_stack, MDB_ppage);
167
168 #define CURSOR_EMPTY(c)          SLIST_EMPTY(&(c)->mc_stack)
169 #define CURSOR_TOP(c)            SLIST_FIRST(&(c)->mc_stack)
170 #define CURSOR_POP(c)            SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
171 #define CURSOR_PUSH(c,p)         SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
172
173 struct MDB_cursor {
174         MDB_db          *mc_db;
175         MDB_txn         *mc_txn;
176         struct page_stack        mc_stack;              /* stack of parent pages */
177         short           mc_initialized; /* 1 if initialized */
178         short           mc_eof;         /* 1 if end is reached */
179 };
180
181 #define METAHASHLEN      offsetof(MDB_meta, mm_hash)
182 #define METADATA(p)      ((void *)((char *)p + PAGEHDRSZ))
183
184 typedef struct MDB_node {
185 #define mn_pgno          mn_p.np_pgno
186 #define mn_dsize         mn_p.np_dsize
187         union {
188                 pgno_t           np_pgno;       /* child page number */
189                 uint32_t         np_dsize;      /* leaf data size */
190         } mn_p;
191         unsigned int    mn_flags:4;
192         unsigned int    mn_ksize:12;                    /* key size */
193 #define F_BIGDATA        0x01                   /* data put on overflow page */
194         char            mn_data[1];
195 } MDB_node;
196
197 struct MDB_txn {
198         pgno_t          mt_root;                /* current / new root page */
199         pgno_t          mt_next_pgno;   /* next unallocated page */
200         pgno_t          mt_first_pgno;
201         ulong           mt_txnid;
202         MDB_env         *mt_env;        
203         union {
204                 struct dirty_queue      *dirty_queue;   /* modified pages */
205                 MDB_reader      *reader;
206         } mt_u;
207 #define MDB_TXN_RDONLY           0x01           /* read-only transaction */
208 #define MDB_TXN_ERROR            0x02           /* an error has occurred */
209         unsigned int             mt_flags;
210 };
211
212 /* Must be same as MDB_db, minus md_root/md_stat */
213 typedef struct MDB_db0 {
214         unsigned int    md_flags;
215         MDB_cmp_func    *md_cmp;                /* user compare function */
216         MDB_rel_func    *md_rel;                /* user relocate function */
217         MDB_db                  *md_parent;             /* parent tree */
218         MDB_env                 *md_env;
219 } MDB_db0;
220
221 struct MDB_db {
222         unsigned int    md_flags;
223         MDB_cmp_func    *md_cmp;                /* user compare function */
224         MDB_rel_func    *md_rel;                /* user relocate function */
225         MDB_db                  *md_parent;             /* parent tree */
226         MDB_env                 *md_env;
227         MDB_stat                md_stat;
228         pgno_t                  md_root;                /* page number of root page */
229 };
230
231 struct MDB_env {
232         int                     me_fd;
233         key_t           me_shmkey;
234         uint32_t        me_flags;
235         int                     me_maxreaders;
236         int                     me_metatoggle;
237         char            *me_path;
238         char *me_map;
239         MDB_txninfo     *me_txns;
240         MDB_db0         me_db;          /* first DB, overlaps with meta */
241         MDB_meta        me_meta;
242         MDB_txn *me_txn;                /* current write transaction */
243         size_t          me_mapsize;
244         off_t           me_size;                /* current file size */
245         pthread_key_t   me_txkey;       /* thread-key for readers */
246 };
247
248 #define NODESIZE         offsetof(MDB_node, mn_data)
249
250 #define INDXSIZE(k)      (NODESIZE + ((k) == NULL ? 0 : (k)->mv_size))
251 #define LEAFSIZE(k, d)   (NODESIZE + (k)->mv_size + (d)->mv_size)
252 #define NODEPTR(p, i)    ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
253 #define NODEKEY(node)    (void *)((node)->mn_data)
254 #define NODEDATA(node)   (void *)((char *)(node)->mn_data + (node)->mn_ksize)
255 #define NODEPGNO(node)   ((node)->mn_pgno)
256 #define NODEDSZ(node)    ((node)->mn_dsize)
257
258 #define MDB_COMMIT_PAGES         64     /* max number of pages to write in one commit */
259 #define MDB_MAXCACHE_DEF         1024   /* max number of pages to keep in cache  */
260
261 static int  mdb_search_page_root(MDB_db *db,
262                             MDB_val *key,
263                             MDB_cursor *cursor, int modify,
264                             MDB_pageparent *mpp);
265 static int  mdb_search_page(MDB_db *db,
266                             MDB_txn *txn, MDB_val *key,
267                             MDB_cursor *cursor, int modify,
268                             MDB_pageparent *mpp);
269
270 static int  mdbenv_read_header(MDB_env *env);
271 static int  mdb_check_meta_page(MDB_page *p);
272 static int  mdbenv_read_meta(MDB_env *env);
273 static int  mdbenv_write_meta(MDB_txn *txn);
274 static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno);
275
276 static MDB_node *mdb_search_node(MDB_db *db, MDB_page *mp,
277                             MDB_val *key, int *exactp, unsigned int *kip);
278 static int  mdb_add_node(MDB_db *bt, MDB_page *mp,
279                             indx_t indx, MDB_val *key, MDB_val *data,
280                             pgno_t pgno, uint8_t flags);
281 static void mdb_del_node(MDB_db *bt, MDB_page *mp,
282                             indx_t indx);
283 static int  mdb_read_data(MDB_db *bt, MDB_page *mp,
284                             MDB_node *leaf, MDB_val *data);
285
286 static int               mdb_rebalance(MDB_db *bt, MDB_pageparent *mp);
287 static int               mdb_update_key(MDB_db *bt, MDB_page *mp,
288                             indx_t indx, MDB_val *key);
289 static int               mdb_move_node(MDB_db *bt, 
290                                 MDB_pageparent *src, indx_t srcindx,
291                                 MDB_pageparent *dst, indx_t dstindx);
292 static int               mdb_merge(MDB_db *bt, MDB_pageparent *src,
293                             MDB_pageparent *dst);
294 static int               mdb_split(MDB_db *bt, MDB_page **mpp,
295                             unsigned int *newindxp, MDB_val *newkey,
296                             MDB_val *newdata, pgno_t newpgno);
297 static MDB_dpage *mdbenv_new_page(MDB_env *env, uint32_t flags, int num);
298
299 static void              cursor_pop_page(MDB_cursor *cursor);
300 static MDB_ppage *cursor_push_page(MDB_cursor *cursor,
301                             MDB_page *mp);
302
303 static int               mdb_set_key(MDB_db *bt, MDB_page *mp,
304                             MDB_node *node, MDB_val *key);
305 static int               mdb_sibling(MDB_cursor *cursor, int move_right);
306 static int               mdb_cursor_next(MDB_cursor *cursor,
307                             MDB_val *key, MDB_val *data);
308 static int               mdb_cursor_set(MDB_cursor *cursor,
309                             MDB_val *key, MDB_val *data, int *exactp);
310 static int               mdb_cursor_first(MDB_cursor *cursor,
311                             MDB_val *key, MDB_val *data);
312
313 static size_t            mdb_leaf_size(MDB_db *bt, MDB_val *key,
314                             MDB_val *data);
315 static size_t            mdb_branch_size(MDB_db *bt, MDB_val *key);
316
317 static pgno_t            mdbenv_compact_tree(MDB_env *env, pgno_t pgno,
318                             MDB_env *envc);
319
320 static int               memncmp(const void *s1, size_t n1,
321                                  const void *s2, size_t n2);
322 static int               memnrcmp(const void *s1, size_t n1,
323                                   const void *s2, size_t n2);
324
325 static int
326 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
327 {
328         int diff, len_diff = -1;
329
330         if (n1 >= n2) {
331                 len_diff = (n1 > n2);
332                 n1 = n2;
333         }
334         diff = memcmp(s1, s2, n1);
335         return diff ? diff : len_diff;
336 }
337
338 static int
339 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
340 {
341         const unsigned char     *p1, *p2, *p1_lim;
342
343         if (n2 == 0)
344                 return n1 != 0;
345         if (n1 == 0)
346                 return -1;
347
348         p1 = (const unsigned char *)s1 + n1 - 1;
349         p2 = (const unsigned char *)s2 + n2 - 1;
350
351         for (p1_lim = (n1 <= n2 ? s1 : s2);  *p1 == *p2;  p1--, p2--) {
352                 if (p1 == p1_lim)
353                         return (p1 != s1) ? (p1 != p2) : (p2 != s2) ? -1 : 0;
354         }
355         return *p1 - *p2;
356 }
357
358 int
359 mdb_cmp(MDB_db *db, const MDB_val *a, const MDB_val *b)
360 {
361         return db->md_cmp(a, b);
362 }
363
364 static int
365 _mdb_cmp(MDB_db *db, const MDB_val *key1, const MDB_val *key2)
366 {
367         if (F_ISSET(db->md_flags, MDB_REVERSEKEY))
368                 return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
369         else
370                 return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
371 }
372
373 /* Allocate new page(s) for writing */
374 static MDB_dpage *
375 mdb_newpage(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
376 {
377         MDB_dpage *dp;
378
379         if ((dp = malloc(txn->mt_env->me_meta.mm_stat.ms_psize * num + sizeof(MDB_dhead))) == NULL)
380                 return NULL;
381         dp->h.md_num = num;
382         dp->h.md_parent = parent;
383         dp->h.md_pi = parent_idx;
384         SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
385         dp->p.mp_pgno = txn->mt_next_pgno;
386         txn->mt_next_pgno += num;
387
388         return dp;
389 }
390
391 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
392  */
393 static int
394 mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
395 {
396         MDB_page *mp = pp->mp_page;
397         pgno_t  pgno;
398         assert(txn != NULL);
399         assert(pp != NULL);
400
401         if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
402                 MDB_dpage *dp;
403                 DPRINTF("touching page %lu -> %lu", mp->mp_pgno, txn->mt_next_pgno);
404                 if ((dp = mdb_newpage(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
405                         return ENOMEM;
406                 pgno = dp->p.mp_pgno;
407                 bcopy(mp, &dp->p, txn->mt_env->me_meta.mm_stat.ms_psize);
408                 mp = &dp->p;
409                 mp->mp_pgno = pgno;
410                 mp->mp_flags |= P_DIRTY;
411
412                 /* Update the page number to new touched page. */
413                 if (pp->mp_parent != NULL)
414                         NODEPGNO(NODEPTR(pp->mp_parent, pp->mp_pi)) = mp->mp_pgno;
415                 pp->mp_page = mp;
416         }
417         return 0;
418 }
419
420 int
421 mdbenv_sync(MDB_env *env)
422 {
423         int rc = 0;
424         if (!F_ISSET(env->me_flags, MDB_NOSYNC)) {
425                 if (fsync(env->me_fd))
426                         rc = errno;
427         }
428         return rc;
429 }
430
431 int
432 mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
433 {
434         MDB_txn *txn;
435         int rc;
436
437         if ((txn = calloc(1, sizeof(*txn))) == NULL) {
438                 DPRINTF("calloc: %s", strerror(errno));
439                 return ENOMEM;
440         }
441
442         if (rdonly) {
443                 txn->mt_flags |= MDB_TXN_RDONLY;
444         } else {
445                 txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
446                 if (txn->mt_u.dirty_queue == NULL) {
447                         free(txn);
448                         return ENOMEM;
449                 }
450                 SIMPLEQ_INIT(txn->mt_u.dirty_queue);
451
452                 pthread_mutex_lock(&env->me_txns->mt_wmutex);
453                 env->me_txns->mt_txnid++;
454         }
455         txn->mt_txnid = env->me_txns->mt_txnid;
456         if (rdonly) {
457                 MDB_reader *r = pthread_getspecific(env->me_txkey);
458                 if (!r) {
459                         int i;
460                         pthread_mutex_lock(&env->me_txns->mt_mutex);
461                         for (i=0; i<env->me_maxreaders; i++) {
462                                 if (env->me_txns->mt_readers[i].mr_pid == 0) {
463                                         env->me_txns->mt_readers[i].mr_pid = getpid();
464                                         env->me_txns->mt_readers[i].mr_tid = pthread_self();
465                                         pthread_setspecific(env->me_txkey, &env->me_txns->mt_readers[i]);
466                                         if (i >= env->me_txns->mt_numreaders)
467                                                 env->me_txns->mt_numreaders = i+1;
468                                         break;
469                                 }
470                         }
471                         pthread_mutex_unlock(&env->me_txns->mt_mutex);
472                         if (i == env->me_maxreaders) {
473                                 return ENOSPC;
474                         }
475                 }
476                 r->mr_txnid = txn->mt_txnid;
477                 txn->mt_u.reader = r;
478         } else {
479                 env->me_txn = txn;
480         }
481
482         txn->mt_env = env;
483
484         if ((rc = mdbenv_read_meta(env)) != MDB_SUCCESS) {
485                 mdb_txn_abort(txn);
486                 return rc;
487         }
488
489         txn->mt_next_pgno = env->me_meta.mm_last_pg+1;
490         txn->mt_first_pgno = txn->mt_next_pgno;
491         txn->mt_root = env->me_meta.mm_root;
492         DPRINTF("begin transaction %lu on mdbenv %p, root page %lu",
493                 txn->mt_txnid, (void *) env, txn->mt_root);
494
495         *ret = txn;
496         return MDB_SUCCESS;
497 }
498
499 void
500 mdb_txn_abort(MDB_txn *txn)
501 {
502         MDB_dpage *dp;
503         MDB_env *env;
504
505         if (txn == NULL)
506                 return;
507
508         env = txn->mt_env;
509         DPRINTF("abort transaction %lu on mdbenv %p, root page %lu",
510                 txn->mt_txnid, (void *) env, txn->mt_root);
511
512         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
513                 txn->mt_u.reader->mr_txnid = 0;
514         } else {
515                 /* Discard all dirty pages.
516                  */
517                 while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
518                         dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
519                         SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
520                         free(dp);
521                 }
522
523 #if 0
524                 DPRINTF("releasing write lock on txn %p", txn);
525                 txn->bt->txn = NULL;
526                 if (flock(txn->bt->fd, LOCK_UN) != 0) {
527                         DPRINTF("failed to unlock fd %d: %s",
528                             txn->bt->fd, strerror(errno));
529                 }
530 #endif
531                 free(txn->mt_u.dirty_queue);
532                 env->me_txn = NULL;
533                 pthread_mutex_unlock(&env->me_txns->mt_wmutex);
534         }
535
536         free(txn);
537 }
538
539 int
540 mdb_txn_commit(MDB_txn *txn)
541 {
542         int              n, done;
543         ssize_t          rc;
544         off_t            size;
545         MDB_dpage       *dp;
546         MDB_env *env;
547         pgno_t  next;
548         struct iovec     iov[MDB_COMMIT_PAGES];
549
550         assert(txn != NULL);
551         assert(txn->mt_env != NULL);
552
553         env = txn->mt_env;
554
555         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
556                 DPRINTF("attempt to commit read-only transaction");
557                 mdb_txn_abort(txn);
558                 return EPERM;
559         }
560
561         if (txn != env->me_txn) {
562                 DPRINTF("attempt to commit unknown transaction");
563                 mdb_txn_abort(txn);
564                 return EINVAL;
565         }
566
567         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
568                 DPRINTF("error flag is set, can't commit");
569                 mdb_txn_abort(txn);
570                 return EINVAL;
571         }
572
573         if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
574                 goto done;
575
576         DPRINTF("committing transaction %lu on mdbenv %p, root page %lu",
577             txn->mt_txnid, (void *) env, txn->mt_root);
578
579         /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
580          */
581         next = 0;
582         do {
583                 n = 0;
584                 done = 1;
585                 size = 0;
586                 SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
587                         if (dp->p.mp_pgno != next) {
588                                 lseek(env->me_fd, dp->p.mp_pgno * env->me_meta.mm_stat.ms_psize, SEEK_SET);
589                                 next = dp->p.mp_pgno;
590                                 if (n)
591                                         break;
592                         }
593                         DPRINTF("committing page %lu", dp->p.mp_pgno);
594                         iov[n].iov_len = env->me_meta.mm_stat.ms_psize * dp->h.md_num;
595                         iov[n].iov_base = &dp->p;
596                         size += iov[n].iov_len;
597                         next = dp->p.mp_pgno + dp->h.md_num;
598                         /* clear dirty flag */
599                         dp->p.mp_flags &= ~P_DIRTY;
600                         if (++n >= MDB_COMMIT_PAGES) {
601                                 done = 0;
602                                 break;
603                         }
604                 }
605
606                 if (n == 0)
607                         break;
608
609                 DPRINTF("committing %u dirty pages", n);
610                 rc = writev(env->me_fd, iov, n);
611                 if (rc != size) {
612                         n = errno;
613                         if (rc > 0)
614                                 DPRINTF("short write, filesystem full?");
615                         else
616                                 DPRINTF("writev: %s", strerror(errno));
617                         mdb_txn_abort(txn);
618                         return n;
619                 }
620
621                 /* Drop the dirty pages.
622                  */
623                 while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
624                         dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
625                         SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
626                         free(dp);
627                         if (--n == 0)
628                                 break;
629                 }
630         } while (!done);
631
632         if ((n = mdbenv_sync(env)) != 0 ||
633             (n = mdbenv_write_meta(txn)) != MDB_SUCCESS ||
634             (n = mdbenv_sync(env)) != 0) {
635                 mdb_txn_abort(txn);
636                 return n;
637         }
638         env->me_txn = NULL;
639         pthread_mutex_unlock(&env->me_txns->mt_wmutex);
640         free(txn->mt_u.dirty_queue);
641         free(txn);
642         txn = NULL;
643
644 done:
645         mdb_txn_abort(txn);
646
647         return MDB_SUCCESS;
648 }
649
650 static int
651 mdbenv_read_header(MDB_env *env)
652 {
653         char             page[PAGESIZE];
654         MDB_page        *p;
655         MDB_meta        *m;
656         int              rc;
657
658         assert(env != NULL);
659
660         /* We don't know the page size yet, so use a minimum value.
661          */
662
663         if ((rc = pread(env->me_fd, page, PAGESIZE, 0)) == 0) {
664                 return ENOENT;
665         } else if (rc != PAGESIZE) {
666                 if (rc > 0)
667                         errno = EINVAL;
668                 DPRINTF("read: %s", strerror(errno));
669                 return errno;
670         }
671
672         p = (MDB_page *)page;
673
674         if (!F_ISSET(p->mp_flags, P_META)) {
675                 DPRINTF("page %lu not a meta page", p->mp_pgno);
676                 return EINVAL;
677         }
678
679         m = METADATA(p);
680         if (m->mm_magic != MDB_MAGIC) {
681                 DPRINTF("meta has invalid magic");
682                 return EINVAL;
683         }
684
685         if (m->mm_version != MDB_VERSION) {
686                 DPRINTF("database is version %u, expected version %u",
687                     m->mm_version, MDB_VERSION);
688                 return EINVAL;
689         }
690
691         bcopy(m, &env->me_meta, sizeof(*m));
692         return 0;
693 }
694
695 static int
696 mdbenv_init_meta(MDB_env *env)
697 {
698         MDB_page *p, *q;
699         MDB_meta *meta;
700         int rc;
701         unsigned int     psize;
702
703         DPRINTF("writing new meta page");
704         psize = sysconf(_SC_PAGE_SIZE);
705
706         env->me_meta.mm_magic = MDB_MAGIC;
707         env->me_meta.mm_version = MDB_VERSION;
708         env->me_meta.mm_stat.ms_psize = psize;
709         env->me_meta.mm_stat.ms_flags = env->me_flags & 0xffff;
710         env->me_meta.mm_root = P_INVALID;
711         env->me_meta.mm_last_pg = 1;
712
713         p = calloc(2, psize);
714         p->mp_pgno = 0;
715         p->mp_flags = P_META;
716
717         meta = METADATA(p);
718         bcopy(&env->me_meta, meta, sizeof(*meta));
719
720         q = (MDB_page *)((char *)p + psize);
721
722         q->mp_pgno = 1;
723         q->mp_flags = P_META;
724
725         meta = METADATA(q);
726         bcopy(&env->me_meta, meta, sizeof(*meta));
727
728         rc = write(env->me_fd, p, psize * 2);
729         free(p);
730         return (rc == psize * 2) ? MDB_SUCCESS : errno;
731 }
732
733 static int
734 mdbenv_write_meta(MDB_txn *txn)
735 {
736         MDB_env *env;
737         MDB_meta        meta;
738         off_t off;
739         int rc;
740
741         assert(txn != NULL);
742         assert(txn->mt_env != NULL);
743
744         DPRINTF("writing meta page for root page %lu", txn->mt_root);
745
746         env = txn->mt_env;
747
748         bcopy(&env->me_meta, &meta, sizeof(meta));
749         meta.mm_root = txn->mt_root;
750         meta.mm_last_pg = txn->mt_next_pgno - 1;
751         meta.mm_txnid = txn->mt_txnid;
752
753         off = 0;
754         if (env->me_metatoggle)
755                 off = env->me_meta.mm_stat.ms_psize;
756         off += PAGEHDRSZ;
757
758         lseek(env->me_fd, off, SEEK_SET);
759         rc = write(env->me_fd, &meta, sizeof(meta));
760         if (rc != sizeof(meta)) {
761                 DPRINTF("write failed, disk error?");
762                 return errno;
763         }
764
765         return MDB_SUCCESS;
766 }
767
768 /* Returns true if page p is a valid meta page, false otherwise.
769  */
770 static int
771 mdb_check_meta_page(MDB_page *p)
772 {
773         if (!F_ISSET(p->mp_flags, P_META)) {
774                 DPRINTF("page %lu not a meta page", p->mp_pgno);
775                 return EINVAL;
776         }
777
778         return 0;
779 }
780
781 static int
782 mdbenv_read_meta(MDB_env *env)
783 {
784         MDB_page        *mp0, *mp1;
785         MDB_meta        *meta[2];
786         int toggle = 0, rc;
787
788         assert(env != NULL);
789
790         if ((mp0 = mdbenv_get_page(env, 0)) == NULL ||
791                 (mp1 = mdbenv_get_page(env, 1)) == NULL)
792                 return EIO;
793
794         rc = mdb_check_meta_page(mp0);
795         if (rc) return rc;
796
797         rc = mdb_check_meta_page(mp1);
798         if (rc) return rc;
799
800         meta[0] = METADATA(mp0);
801         meta[1] = METADATA(mp1);
802
803         if (meta[0]->mm_txnid < meta[1]->mm_txnid)
804                 toggle = 1;
805
806         if (meta[toggle]->mm_txnid > env->me_meta.mm_txnid) {
807                 bcopy(meta[toggle], &env->me_meta, sizeof(env->me_meta));
808                 env->me_metatoggle = toggle;
809         }
810
811         DPRINTF("Using meta page %d", toggle);
812
813         return MDB_SUCCESS;
814 }
815
816 int
817 mdbenv_create(MDB_env **env)
818 {
819         MDB_env *e;
820
821         e = calloc(1, sizeof(*e));
822         if (!e) return ENOMEM;
823
824         e->me_meta.mm_mapsize = DEFAULT_MAPSIZE;
825         e->me_maxreaders = DEFAULT_READERS;
826         e->me_db.md_env = e;
827         *env = e;
828         return MDB_SUCCESS;
829 }
830
831 int
832 mdbenv_set_mapsize(MDB_env *env, size_t size)
833 {
834         if (env->me_map)
835                 return EINVAL;
836         env->me_mapsize = env->me_meta.mm_mapsize = size;
837         return MDB_SUCCESS;
838 }
839
840 int
841 mdbenv_set_maxreaders(MDB_env *env, int readers)
842 {
843         env->me_maxreaders = readers;
844         return MDB_SUCCESS;
845 }
846
847 int
848 mdbenv_get_maxreaders(MDB_env *env, int *readers)
849 {
850         if (!env || !readers)
851                 return EINVAL;
852         *readers = env->me_maxreaders;
853         return MDB_SUCCESS;
854 }
855
856 int
857 mdbenv_open2(MDB_env *env, unsigned int flags)
858 {
859         int i, newenv = 0;
860
861         env->me_flags = flags;
862
863         if ((i = mdbenv_read_header(env)) != 0) {
864                 if (i != ENOENT)
865                         return i;
866                 DPRINTF("new mdbenv");
867                 newenv = 1;
868         }
869
870         if (!env->me_mapsize)
871                 env->me_mapsize = env->me_meta.mm_mapsize;
872
873         i = MAP_SHARED;
874         if (env->me_meta.mm_address && (flags & MDB_FIXEDMAP))
875                 i |= MAP_FIXED;
876         env->me_map = mmap(env->me_meta.mm_address, env->me_mapsize, PROT_READ, i,
877                 env->me_fd, 0);
878         if (env->me_map == MAP_FAILED)
879                 return errno;
880
881         if (newenv) {
882                 env->me_meta.mm_mapsize = env->me_mapsize;
883                 if (flags & MDB_FIXEDMAP)
884                         env->me_meta.mm_address = env->me_map;
885                 i = mdbenv_init_meta(env);
886                 if (i != MDB_SUCCESS) {
887                         munmap(env->me_map, env->me_mapsize);
888                         return i;
889                 }
890         }
891
892         if ((i = mdbenv_read_meta(env)) != 0)
893                 return i;
894
895         DPRINTF("opened database version %u, pagesize %u",
896             env->me_meta.mm_version, env->me_meta.mm_stat.ms_psize);
897         DPRINTF("depth: %u", env->me_meta.mm_stat.ms_depth);
898         DPRINTF("entries: %lu", env->me_meta.mm_stat.ms_entries);
899         DPRINTF("branch pages: %lu", env->me_meta.mm_stat.ms_branch_pages);
900         DPRINTF("leaf pages: %lu", env->me_meta.mm_stat.ms_leaf_pages);
901         DPRINTF("overflow pages: %lu", env->me_meta.mm_stat.ms_overflow_pages);
902         DPRINTF("root: %lu", env->me_meta.mm_root);
903
904         return MDB_SUCCESS;
905 }
906
907 static void
908 mdbenv_reader_dest(void *ptr)
909 {
910         MDB_reader *reader = ptr;
911
912         reader->mr_txnid = 0;
913         reader->mr_pid = 0;
914         reader->mr_tid = 0;
915 }
916
917 int
918 mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
919 {
920         int             oflags, rc, shmid;
921         off_t   size;
922
923
924         if (F_ISSET(flags, MDB_RDONLY))
925                 oflags = O_RDONLY;
926         else
927                 oflags = O_RDWR | O_CREAT;
928
929         if ((env->me_fd = open(path, oflags, mode)) == -1)
930                 return errno;
931
932         env->me_shmkey = ftok(path, 'm');
933         size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
934         shmid = shmget(env->me_shmkey, size, IPC_CREAT|IPC_EXCL|mode);
935         if (shmid == -1) {
936                 if (errno == EEXIST) {
937                         shmid = shmget(env->me_shmkey, size, IPC_CREAT|mode);
938                         if (shmid == -1)
939                                 return errno;
940                         env->me_txns = shmat(shmid, NULL, 0);
941                         if (env->me_txns->mt_magic != MDB_MAGIC ||
942                                 env->me_txns->mt_version != MDB_VERSION) {
943                                         DPRINTF("invalid lock region %d", shmid);
944                                         shmdt(env->me_txns);
945                                         env->me_txns = NULL;
946                                         return EIO;
947                                 }
948                 } else {
949                         return errno;
950                 }
951         } else {
952                 pthread_mutexattr_t mattr;
953
954                 env->me_txns = shmat(shmid, NULL, 0);
955                 pthread_mutexattr_init(&mattr);
956                 pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
957                 pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
958                 pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
959                 env->me_txns->mt_version = MDB_VERSION;
960                 env->me_txns->mt_magic = MDB_MAGIC;
961         }
962
963         if ((rc = mdbenv_open2(env, flags)) != MDB_SUCCESS) {
964                 close(env->me_fd);
965                 env->me_fd = -1;
966         } else {
967                 env->me_path = strdup(path);
968                 DPRINTF("opened dbenv %p", (void *) env);
969         }
970
971         pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
972
973         return rc;
974 }
975
976 void
977 mdbenv_close(MDB_env *env)
978 {
979         if (env == NULL)
980                 return;
981
982         free(env->me_path);
983
984         if (env->me_map) {
985                 munmap(env->me_map, env->me_mapsize);
986         }
987         close(env->me_fd);
988         if (env->me_txns) {
989                 shmdt(env->me_txns);
990         }
991         free(env);
992 }
993
994 /* Search for key within a leaf page, using binary search.
995  * Returns the smallest entry larger or equal to the key.
996  * If exactp is non-null, stores whether the found entry was an exact match
997  * in *exactp (1 or 0).
998  * If kip is non-null, stores the index of the found entry in *kip.
999  * If no entry larger or equal to the key is found, returns NULL.
1000  */
1001 static MDB_node *
1002 mdb_search_node(MDB_db *bt, MDB_page *mp, MDB_val *key,
1003     int *exactp, unsigned int *kip)
1004 {
1005         unsigned int     i = 0;
1006         int              low, high;
1007         int              rc = 0;
1008         MDB_node        *node;
1009         MDB_val  nodekey;
1010
1011         DPRINTF("searching %u keys in %s page %lu",
1012             NUMKEYS(mp),
1013             IS_LEAF(mp) ? "leaf" : "branch",
1014             mp->mp_pgno);
1015
1016         assert(NUMKEYS(mp) > 0);
1017
1018         bzero(&nodekey, sizeof(nodekey));
1019
1020         low = IS_LEAF(mp) ? 0 : 1;
1021         high = NUMKEYS(mp) - 1;
1022         while (low <= high) {
1023                 i = (low + high) >> 1;
1024                 node = NODEPTR(mp, i);
1025
1026                 nodekey.mv_size = node->mn_ksize;
1027                 nodekey.mv_data = NODEKEY(node);
1028
1029                 if (bt->md_cmp)
1030                         rc = bt->md_cmp(key, &nodekey);
1031                 else
1032                         rc = _mdb_cmp(bt, key, &nodekey);
1033
1034                 if (IS_LEAF(mp))
1035                         DPRINTF("found leaf index %u [%.*s], rc = %i",
1036                             i, (int)nodekey.mv_size, (char *)nodekey.mv_data, rc);
1037                 else
1038                         DPRINTF("found branch index %u [%.*s -> %lu], rc = %i",
1039                             i, (int)node->mn_ksize, (char *)NODEKEY(node),
1040                             node->mn_pgno, rc);
1041
1042                 if (rc == 0)
1043                         break;
1044                 if (rc > 0)
1045                         low = i + 1;
1046                 else
1047                         high = i - 1;
1048         }
1049
1050         if (rc > 0) {   /* Found entry is less than the key. */
1051                 i++;    /* Skip to get the smallest entry larger than key. */
1052                 if (i >= NUMKEYS(mp))
1053                         /* There is no entry larger or equal to the key. */
1054                         return NULL;
1055         }
1056         if (exactp)
1057                 *exactp = (rc == 0);
1058         if (kip)        /* Store the key index if requested. */
1059                 *kip = i;
1060
1061         return NODEPTR(mp, i);
1062 }
1063
1064 static void
1065 cursor_pop_page(MDB_cursor *cursor)
1066 {
1067         MDB_ppage       *top;
1068
1069         top = CURSOR_TOP(cursor);
1070         CURSOR_POP(cursor);
1071
1072         DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
1073
1074         free(top);
1075 }
1076
1077 static MDB_ppage *
1078 cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
1079 {
1080         MDB_ppage       *ppage;
1081
1082         DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor);
1083
1084         if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
1085                 return NULL;
1086         ppage->mp_page = mp;
1087         CURSOR_PUSH(cursor, ppage);
1088         return ppage;
1089 }
1090
1091 static MDB_page *
1092 mdbenv_get_page(MDB_env *env, pgno_t pgno)
1093 {
1094         MDB_page *p = NULL;
1095         MDB_txn *txn = env->me_txn;
1096
1097         if (txn && pgno >= txn->mt_first_pgno &&
1098                 !SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
1099                 MDB_dpage *dp;
1100                 SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
1101                         if (dp->p.mp_pgno == pgno) {
1102                                 p = &dp->p;
1103                                 break;
1104                         }
1105                 }
1106         } else {
1107                 p = (MDB_page *)(env->me_map + env->me_meta.mm_stat.ms_psize * pgno);
1108         }
1109         return p;
1110 }
1111
1112 static int
1113 mdb_search_page_root(MDB_db *bt, MDB_val *key,
1114     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1115 {
1116         MDB_page        *mp = mpp->mp_page;
1117         int rc;
1118
1119         if (cursor && cursor_push_page(cursor, mp) == NULL)
1120                 return MDB_FAIL;
1121
1122         while (IS_BRANCH(mp)) {
1123                 unsigned int     i = 0;
1124                 MDB_node        *node;
1125
1126                 DPRINTF("branch page %lu has %u keys", mp->mp_pgno, NUMKEYS(mp));
1127                 assert(NUMKEYS(mp) > 1);
1128                 DPRINTF("found index 0 to page %lu", NODEPGNO(NODEPTR(mp, 0)));
1129
1130                 if (key == NULL)        /* Initialize cursor to first page. */
1131                         i = 0;
1132                 else {
1133                         int      exact;
1134                         node = mdb_search_node(bt, mp, key, &exact, &i);
1135                         if (node == NULL)
1136                                 i = NUMKEYS(mp) - 1;
1137                         else if (!exact) {
1138                                 assert(i > 0);
1139                                 i--;
1140                         }
1141                 }
1142
1143                 if (key)
1144                         DPRINTF("following index %u for key %.*s",
1145                             i, (int)key->mv_size, (char *)key->mv_data);
1146                 assert(i < NUMKEYS(mp));
1147                 node = NODEPTR(mp, i);
1148
1149                 if (cursor)
1150                         CURSOR_TOP(cursor)->mp_ki = i;
1151
1152                 mpp->mp_parent = mp;
1153                 if ((mp = mdbenv_get_page(bt->md_env, NODEPGNO(node))) == NULL)
1154                         return MDB_FAIL;
1155                 mpp->mp_pi = i;
1156                 mpp->mp_page = mp;
1157
1158                 if (cursor && cursor_push_page(cursor, mp) == NULL)
1159                         return MDB_FAIL;
1160
1161                 if (modify) {
1162                         MDB_dhead *dh = ((MDB_dhead *)mp)-1;
1163                         if ((rc = mdb_touch(bt->md_env->me_txn, mpp)) != 0)
1164                                 return rc;
1165                         dh = ((MDB_dhead *)mpp->mp_page)-1;
1166                         dh->md_parent = mpp->mp_parent;
1167                         dh->md_pi = mpp->mp_pi;
1168                 }
1169
1170                 mp = mpp->mp_page;
1171         }
1172
1173         if (!IS_LEAF(mp)) {
1174                 DPRINTF("internal error, index points to a %02X page!?",
1175                     mp->mp_flags);
1176                 return MDB_FAIL;
1177         }
1178
1179         DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno,
1180             key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL);
1181
1182         return MDB_SUCCESS;
1183 }
1184
1185 /* Search for the page a given key should be in.
1186  * Stores a pointer to the found page in *mpp.
1187  * If key is NULL, search for the lowest page (used by mdb_cursor_first).
1188  * If cursor is non-null, pushes parent pages on the cursor stack.
1189  * If modify is true, visited pages are updated with new page numbers.
1190  */
1191 static int
1192 mdb_search_page(MDB_db *db, MDB_txn *txn, MDB_val *key,
1193     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1194 {
1195         int              rc;
1196         pgno_t           root;
1197
1198         /* Can't modify pages outside a transaction. */
1199         if (txn == NULL && modify) {
1200                 return EINVAL;
1201         }
1202
1203         /* Choose which root page to start with. If a transaction is given
1204          * use the root page from the transaction, otherwise read the last
1205          * committed root page.
1206          */
1207         if (txn == NULL) {
1208                 if ((rc = mdbenv_read_meta(db->md_env)) != MDB_SUCCESS)
1209                         return rc;
1210                 root = db->md_env->me_meta.mm_root;
1211         } else if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
1212                 DPRINTF("transaction has failed, must abort");
1213                 return EINVAL;
1214         } else
1215                 root = txn->mt_root;
1216
1217         if (root == P_INVALID) {                /* Tree is empty. */
1218                 DPRINTF("tree is empty");
1219                 return ENOENT;
1220         }
1221
1222         if ((mpp->mp_page = mdbenv_get_page(db->md_env, root)) == NULL)
1223                 return MDB_FAIL;
1224
1225         DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
1226
1227         if (modify && !F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
1228                 mpp->mp_parent = NULL;
1229                 mpp->mp_pi = 0;
1230                 if ((rc = mdb_touch(txn, mpp)))
1231                         return rc;
1232                 txn->mt_root = mpp->mp_page->mp_pgno;
1233         }
1234
1235         return mdb_search_page_root(db, key, cursor, modify, mpp);
1236 }
1237
1238 static int
1239 mdb_read_data(MDB_db *db, MDB_page *mp, MDB_node *leaf,
1240     MDB_val *data)
1241 {
1242         MDB_page        *omp;           /* overflow mpage */
1243         size_t           max;
1244         pgno_t           pgno;
1245
1246         bzero(data, sizeof(*data));
1247         max = db->md_env->me_meta.mm_stat.ms_psize - PAGEHDRSZ;
1248
1249         if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
1250                 data->mv_size = leaf->mn_dsize;
1251                 data->mv_data = NODEDATA(leaf);
1252                 return MDB_SUCCESS;
1253         }
1254
1255         /* Read overflow data.
1256          */
1257         data->mv_size = leaf->mn_dsize;
1258         bcopy(NODEDATA(leaf), &pgno, sizeof(pgno));
1259         if ((omp = mdbenv_get_page(db->md_env, pgno)) == NULL) {
1260                 DPRINTF("read overflow page %lu failed", pgno);
1261                 return MDB_FAIL;
1262         }
1263         data->mv_data = omp;
1264
1265         return MDB_SUCCESS;
1266 }
1267
1268 int
1269 mdb_get(MDB_db *db, MDB_txn *txn,
1270     MDB_val *key, MDB_val *data)
1271 {
1272         int              rc, exact;
1273         MDB_node        *leaf;
1274         MDB_pageparent mpp;
1275
1276         assert(key);
1277         assert(data);
1278         DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data);
1279
1280         if (db == NULL)
1281                 return EINVAL;
1282
1283         if (txn != NULL && db->md_env != txn->mt_env) {
1284                 return EINVAL;
1285         }
1286
1287         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1288                 return EINVAL;
1289         }
1290
1291         if ((rc = mdb_search_page(db, txn, key, NULL, 0, &mpp)) != MDB_SUCCESS)
1292                 return rc;
1293
1294         leaf = mdb_search_node(db, mpp.mp_page, key, &exact, NULL);
1295         if (leaf && exact)
1296                 rc = mdb_read_data(db, mpp.mp_page, leaf, data);
1297         else {
1298                 rc = ENOENT;
1299         }
1300
1301         return rc;
1302 }
1303
1304 static int
1305 mdb_sibling(MDB_cursor *cursor, int move_right)
1306 {
1307         int              rc;
1308         MDB_node        *indx;
1309         MDB_ppage       *parent, *top;
1310         MDB_page        *mp;
1311
1312         top = CURSOR_TOP(cursor);
1313         if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) {
1314                 return ENOENT;          /* root has no siblings */
1315         }
1316
1317         DPRINTF("parent page is page %lu, index %u",
1318             parent->mp_page->mp_pgno, parent->mp_ki);
1319
1320         cursor_pop_page(cursor);
1321         if (move_right ? (parent->mp_ki + 1 >= NUMKEYS(parent->mp_page))
1322                        : (parent->mp_ki == 0)) {
1323                 DPRINTF("no more keys left, moving to %s sibling",
1324                     move_right ? "right" : "left");
1325                 if ((rc = mdb_sibling(cursor, move_right)) != MDB_SUCCESS)
1326                         return rc;
1327                 parent = CURSOR_TOP(cursor);
1328         } else {
1329                 if (move_right)
1330                         parent->mp_ki++;
1331                 else
1332                         parent->mp_ki--;
1333                 DPRINTF("just moving to %s index key %u",
1334                     move_right ? "right" : "left", parent->mp_ki);
1335         }
1336         assert(IS_BRANCH(parent->mp_page));
1337
1338         indx = NODEPTR(parent->mp_page, parent->mp_ki);
1339         if ((mp = mdbenv_get_page(cursor->mc_db->md_env, indx->mn_pgno)) == NULL)
1340                 return MDB_FAIL;
1341 #if 0
1342         mp->parent = parent->mp_page;
1343         mp->parent_index = parent->mp_ki;
1344 #endif
1345
1346         cursor_push_page(cursor, mp);
1347
1348         return MDB_SUCCESS;
1349 }
1350
1351 static int
1352 mdb_set_key(MDB_db *bt, MDB_page *mp, MDB_node *node,
1353     MDB_val *key)
1354 {
1355         if (key == NULL)
1356                 return 0;
1357
1358         key->mv_size = node->mn_ksize;
1359         key->mv_data = NODEKEY(node);
1360
1361         return 0;
1362 }
1363
1364 static int
1365 mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
1366 {
1367         MDB_ppage       *top;
1368         MDB_page        *mp;
1369         MDB_node        *leaf;
1370
1371         if (cursor->mc_eof) {
1372                 return ENOENT;
1373         }
1374
1375         assert(cursor->mc_initialized);
1376
1377         top = CURSOR_TOP(cursor);
1378         mp = top->mp_page;
1379
1380         DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
1381
1382         if (top->mp_ki + 1 >= NUMKEYS(mp)) {
1383                 DPRINTF("=====> move to next sibling page");
1384                 if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
1385                         cursor->mc_eof = 1;
1386                         return ENOENT;
1387                 }
1388                 top = CURSOR_TOP(cursor);
1389                 mp = top->mp_page;
1390                 DPRINTF("next page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
1391         } else
1392                 top->mp_ki++;
1393
1394         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
1395             mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
1396
1397         assert(IS_LEAF(mp));
1398         leaf = NODEPTR(mp, top->mp_ki);
1399
1400         if (data && mdb_read_data(cursor->mc_db, mp, leaf, data) != MDB_SUCCESS)
1401                 return MDB_FAIL;
1402
1403         return mdb_set_key(cursor->mc_db, mp, leaf, key);
1404 }
1405
1406 static int
1407 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
1408     int *exactp)
1409 {
1410         int              rc;
1411         MDB_node        *leaf;
1412         MDB_ppage       *top;
1413         MDB_pageparent mpp;
1414
1415         assert(cursor);
1416         assert(key);
1417         assert(key->mv_size > 0);
1418
1419         rc = mdb_search_page(cursor->mc_db, cursor->mc_txn, key, cursor, 0, &mpp);
1420         if (rc != MDB_SUCCESS)
1421                 return rc;
1422         assert(IS_LEAF(mpp.mp_page));
1423
1424         top = CURSOR_TOP(cursor);
1425         leaf = mdb_search_node(cursor->mc_db, mpp.mp_page, key, exactp, &top->mp_ki);
1426         if (exactp != NULL && !*exactp) {
1427                 /* MDB_CURSOR_EXACT specified and not an exact match. */
1428                 return ENOENT;
1429         }
1430
1431         if (leaf == NULL) {
1432                 DPRINTF("===> inexact leaf not found, goto sibling");
1433                 if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS)
1434                         return rc;              /* no entries matched */
1435                 top = CURSOR_TOP(cursor);
1436                 top->mp_ki = 0;
1437                 mpp.mp_page = top->mp_page;
1438                 assert(IS_LEAF(mpp.mp_page));
1439                 leaf = NODEPTR(mpp.mp_page, 0);
1440         }
1441
1442         cursor->mc_initialized = 1;
1443         cursor->mc_eof = 0;
1444
1445         if (data && (rc = mdb_read_data(cursor->mc_db, mpp.mp_page, leaf, data)) != MDB_SUCCESS)
1446                 return rc;
1447
1448         rc = mdb_set_key(cursor->mc_db, mpp.mp_page, leaf, key);
1449         if (rc == MDB_SUCCESS) {
1450                 DPRINTF("==> cursor placed on key %.*s",
1451                         (int)key->mv_size, (char *)key->mv_data);
1452                 ;
1453         }
1454
1455         return rc;
1456 }
1457
1458 static int
1459 mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
1460 {
1461         int              rc;
1462         MDB_pageparent  mpp;
1463         MDB_node        *leaf;
1464
1465         rc = mdb_search_page(cursor->mc_db, cursor->mc_txn, NULL, cursor, 0, &mpp);
1466         if (rc != MDB_SUCCESS)
1467                 return rc;
1468         assert(IS_LEAF(mpp.mp_page));
1469
1470         leaf = NODEPTR(mpp.mp_page, 0);
1471         cursor->mc_initialized = 1;
1472         cursor->mc_eof = 0;
1473
1474         if (data && (rc = mdb_read_data(cursor->mc_db, mpp.mp_page, leaf, data)) != MDB_SUCCESS)
1475                 return rc;
1476
1477         return mdb_set_key(cursor->mc_db, mpp.mp_page, leaf, key);
1478 }
1479
1480 int
1481 mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
1482     MDB_cursor_op op)
1483 {
1484         int              rc;
1485         int              exact = 0;
1486
1487         assert(cursor);
1488
1489         switch (op) {
1490         case MDB_CURSOR:
1491         case MDB_CURSOR_EXACT:
1492                 while (CURSOR_TOP(cursor) != NULL)
1493                         cursor_pop_page(cursor);
1494                 if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1495                         rc = EINVAL;
1496                 } else if (op == MDB_CURSOR_EXACT)
1497                         rc = mdb_cursor_set(cursor, key, data, &exact);
1498                 else
1499                         rc = mdb_cursor_set(cursor, key, data, NULL);
1500                 break;
1501         case MDB_NEXT:
1502                 if (!cursor->mc_initialized)
1503                         rc = mdb_cursor_first(cursor, key, data);
1504                 else
1505                         rc = mdb_cursor_next(cursor, key, data);
1506                 break;
1507         case MDB_FIRST:
1508                 while (CURSOR_TOP(cursor) != NULL)
1509                         cursor_pop_page(cursor);
1510                 rc = mdb_cursor_first(cursor, key, data);
1511                 break;
1512         default:
1513                 DPRINTF("unhandled/unimplemented cursor operation %u", op);
1514                 rc = EINVAL;
1515                 break;
1516         }
1517
1518         return rc;
1519 }
1520
1521 /* Allocate a page and initialize it
1522  */
1523 static MDB_dpage *
1524 mdbenv_new_page(MDB_env *env, uint32_t flags, int num)
1525 {
1526         MDB_dpage       *dp;
1527
1528         assert(env != NULL);
1529         assert(env->me_txn != NULL);
1530
1531         DPRINTF("allocating new mpage %lu, page size %u",
1532             env->me_txn->mt_next_pgno, env->me_meta.mm_stat.ms_psize);
1533         if ((dp = mdb_newpage(env->me_txn, NULL, 0, num)) == NULL)
1534                 return NULL;
1535         dp->p.mp_flags = flags | P_DIRTY;
1536         dp->p.mp_lower = PAGEHDRSZ;
1537         dp->p.mp_upper = env->me_meta.mm_stat.ms_psize;
1538
1539         if (IS_BRANCH(&dp->p))
1540                 env->me_meta.mm_stat.ms_branch_pages++;
1541         else if (IS_LEAF(&dp->p))
1542                 env->me_meta.mm_stat.ms_leaf_pages++;
1543         else if (IS_OVERFLOW(&dp->p)) {
1544                 env->me_meta.mm_stat.ms_overflow_pages += num;
1545                 dp->p.mp_pages = num;
1546         }
1547
1548         return dp;
1549 }
1550
1551 static size_t
1552 mdb_leaf_size(MDB_db *db, MDB_val *key, MDB_val *data)
1553 {
1554         size_t           sz;
1555
1556         sz = LEAFSIZE(key, data);
1557         if (data->mv_size >= db->md_env->me_meta.mm_stat.ms_psize / MDB_MINKEYS) {
1558                 /* put on overflow page */
1559                 sz -= data->mv_size - sizeof(pgno_t);
1560         }
1561
1562         return sz + sizeof(indx_t);
1563 }
1564
1565 static size_t
1566 mdb_branch_size(MDB_db *db, MDB_val *key)
1567 {
1568         size_t           sz;
1569
1570         sz = INDXSIZE(key);
1571         if (sz >= db->md_env->me_meta.mm_stat.ms_psize / MDB_MINKEYS) {
1572                 /* put on overflow page */
1573                 /* not implemented */
1574                 /* sz -= key->size - sizeof(pgno_t); */
1575         }
1576
1577         return sz + sizeof(indx_t);
1578 }
1579
1580 static int
1581 mdb_add_node(MDB_db *db, MDB_page *mp, indx_t indx,
1582     MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
1583 {
1584         unsigned int     i;
1585         size_t           node_size = NODESIZE;
1586         indx_t           ofs;
1587         MDB_node        *node;
1588         MDB_dpage       *ofp = NULL;            /* overflow page */
1589
1590         assert(mp->mp_upper >= mp->mp_lower);
1591
1592         DPRINTF("add node [%.*s] to %s page %lu at index %i, key size %zu",
1593             key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL,
1594             IS_LEAF(mp) ? "leaf" : "branch",
1595             mp->mp_pgno, indx, key ? key->mv_size : 0);
1596
1597         if (key != NULL)
1598                 node_size += key->mv_size;
1599
1600         if (IS_LEAF(mp)) {
1601                 assert(data);
1602                 if (F_ISSET(flags, F_BIGDATA)) {
1603                         /* Data already on overflow page. */
1604                         node_size += sizeof(pgno_t);
1605                 } else if (data->mv_size >= db->md_env->me_meta.mm_stat.ms_psize / MDB_MINKEYS) {
1606                         int ovpages = PAGEHDRSZ + data->mv_size + db->md_env->me_meta.mm_stat.ms_psize - 1;
1607                         ovpages /= db->md_env->me_meta.mm_stat.ms_psize;
1608                         /* Put data on overflow page. */
1609                         DPRINTF("data size is %zu, put on overflow page",
1610                             data->mv_size);
1611                         node_size += sizeof(pgno_t);
1612                         if ((ofp = mdbenv_new_page(db->md_env, P_OVERFLOW, ovpages)) == NULL)
1613                                 return MDB_FAIL;
1614                         DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
1615                         flags |= F_BIGDATA;
1616                 } else {
1617                         node_size += data->mv_size;
1618                 }
1619         }
1620
1621         if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
1622                 DPRINTF("not enough room in page %lu, got %u ptrs",
1623                     mp->mp_pgno, NUMKEYS(mp));
1624                 DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower,
1625                     mp->mp_upper - mp->mp_lower);
1626                 DPRINTF("node size = %zu", node_size);
1627                 return ENOSPC;
1628         }
1629
1630         /* Move higher pointers up one slot. */
1631         for (i = NUMKEYS(mp); i > indx; i--)
1632                 mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
1633
1634         /* Adjust free space offsets. */
1635         ofs = mp->mp_upper - node_size;
1636         assert(ofs >= mp->mp_lower + sizeof(indx_t));
1637         mp->mp_ptrs[indx] = ofs;
1638         mp->mp_upper = ofs;
1639         mp->mp_lower += sizeof(indx_t);
1640
1641         /* Write the node data. */
1642         node = NODEPTR(mp, indx);
1643         node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
1644         node->mn_flags = flags;
1645         if (IS_LEAF(mp))
1646                 node->mn_dsize = data->mv_size;
1647         else
1648                 node->mn_pgno = pgno;
1649
1650         if (key)
1651                 bcopy(key->mv_data, NODEKEY(node), key->mv_size);
1652
1653         if (IS_LEAF(mp)) {
1654                 assert(key);
1655                 if (ofp == NULL) {
1656                         if (F_ISSET(flags, F_BIGDATA))
1657                                 bcopy(data->mv_data, node->mn_data + key->mv_size,
1658                                     sizeof(pgno_t));
1659                         else
1660                                 bcopy(data->mv_data, node->mn_data + key->mv_size,
1661                                     data->mv_size);
1662                 } else {
1663                         bcopy(&ofp->p.mp_pgno, node->mn_data + key->mv_size,
1664                             sizeof(pgno_t));
1665                         bcopy(data->mv_data, METADATA(&ofp->p), data->mv_size);
1666                 }
1667         }
1668
1669         return MDB_SUCCESS;
1670 }
1671
1672 static void
1673 mdb_del_node(MDB_db *db, MDB_page *mp, indx_t indx)
1674 {
1675         unsigned int     sz;
1676         indx_t           i, j, numkeys, ptr;
1677         MDB_node        *node;
1678         char            *base;
1679
1680         DPRINTF("delete node %u on %s page %lu", indx,
1681             IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
1682         assert(indx < NUMKEYS(mp));
1683
1684         node = NODEPTR(mp, indx);
1685         sz = NODESIZE + node->mn_ksize;
1686         if (IS_LEAF(mp)) {
1687                 if (F_ISSET(node->mn_flags, F_BIGDATA))
1688                         sz += sizeof(pgno_t);
1689                 else
1690                         sz += NODEDSZ(node);
1691         }
1692
1693         ptr = mp->mp_ptrs[indx];
1694         numkeys = NUMKEYS(mp);
1695         for (i = j = 0; i < numkeys; i++) {
1696                 if (i != indx) {
1697                         mp->mp_ptrs[j] = mp->mp_ptrs[i];
1698                         if (mp->mp_ptrs[i] < ptr)
1699                                 mp->mp_ptrs[j] += sz;
1700                         j++;
1701                 }
1702         }
1703
1704         base = (char *)mp + mp->mp_upper;
1705         bcopy(base, base + sz, ptr - mp->mp_upper);
1706
1707         mp->mp_lower -= sizeof(indx_t);
1708         mp->mp_upper += sz;
1709 }
1710
1711 int
1712 mdb_cursor_open(MDB_db *db, MDB_txn *txn, MDB_cursor **ret)
1713 {
1714         MDB_cursor      *cursor;
1715
1716         if (db == NULL || ret == NULL)
1717                 return EINVAL;
1718
1719         if (txn != NULL && db->md_env != txn->mt_env) {
1720                 return EINVAL;
1721         }
1722
1723         if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
1724                 SLIST_INIT(&cursor->mc_stack);
1725                 cursor->mc_db = db;
1726                 cursor->mc_txn = txn;
1727         }
1728
1729         *ret = cursor;
1730
1731         return MDB_SUCCESS;
1732 }
1733
1734 void
1735 mdb_cursor_close(MDB_cursor *cursor)
1736 {
1737         if (cursor != NULL) {
1738                 while (!CURSOR_EMPTY(cursor))
1739                         cursor_pop_page(cursor);
1740
1741 /*              btree_close(cursor->bt); */
1742                 free(cursor);
1743         }
1744 }
1745
1746 static int
1747 mdb_update_key(MDB_db *db, MDB_page *mp, indx_t indx,
1748     MDB_val *key)
1749 {
1750         indx_t                   ptr, i, numkeys;
1751         int                      delta;
1752         size_t                   len;
1753         MDB_node                *node;
1754         char                    *base;
1755
1756         node = NODEPTR(mp, indx);
1757         ptr = mp->mp_ptrs[indx];
1758         DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %lu",
1759             indx, ptr,
1760             (int)node->mn_ksize, (char *)NODEKEY(node),
1761             (int)key->mv_size, (char *)key->mv_data,
1762             mp->mp_pgno);
1763
1764         delta = key->mv_size - node->mn_ksize;
1765         if (delta) {
1766                 if (delta > 0 && SIZELEFT(mp) < delta) {
1767                         DPRINTF("OUCH! Not enough room, delta = %d", delta);
1768                         return ENOSPC;
1769                 }
1770
1771                 numkeys = NUMKEYS(mp);
1772                 for (i = 0; i < numkeys; i++) {
1773                         if (mp->mp_ptrs[i] <= ptr)
1774                                 mp->mp_ptrs[i] -= delta;
1775                 }
1776
1777                 base = (char *)mp + mp->mp_upper;
1778                 len = ptr - mp->mp_upper + NODESIZE;
1779                 bcopy(base, base - delta, len);
1780                 mp->mp_upper -= delta;
1781
1782                 node = NODEPTR(mp, indx);
1783                 node->mn_ksize = key->mv_size;
1784         }
1785
1786         bcopy(key->mv_data, NODEKEY(node), key->mv_size);
1787
1788         return MDB_SUCCESS;
1789 }
1790
1791 /* Move a node from src to dst.
1792  */
1793 static int
1794 mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx,
1795     MDB_pageparent *dst, indx_t dstindx)
1796 {
1797         int                      rc;
1798         MDB_node                *srcnode;
1799         MDB_val          key, data;
1800
1801         srcnode = NODEPTR(src->mp_page, srcindx);
1802         DPRINTF("moving %s node %u [%.*s] on page %lu to node %u on page %lu",
1803             IS_LEAF(src->mp_page) ? "leaf" : "branch",
1804             srcindx,
1805             (int)srcnode->mn_ksize, (char *)NODEKEY(srcnode),
1806             src->mp_page->mp_pgno,
1807             dstindx, dst->mp_page->mp_pgno);
1808
1809         /* Mark src and dst as dirty. */
1810         if ((rc = mdb_touch(bt->md_env->me_txn, src)) ||
1811             (rc = mdb_touch(bt->md_env->me_txn, dst)))
1812                 return rc;;
1813
1814         /* Add the node to the destination page.
1815          */
1816         key.mv_size = srcnode->mn_ksize;
1817         key.mv_data = NODEKEY(srcnode);
1818         data.mv_size = NODEDSZ(srcnode);
1819         data.mv_data = NODEDATA(srcnode);
1820         rc = mdb_add_node(bt, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode),
1821             srcnode->mn_flags);
1822         if (rc != MDB_SUCCESS)
1823                 return rc;
1824
1825         /* Delete the node from the source page.
1826          */
1827         mdb_del_node(bt, src->mp_page, srcindx);
1828
1829         /* Update the parent separators.
1830          */
1831         if (srcindx == 0 && src->mp_pi != 0) {
1832                 DPRINTF("update separator for source page %lu to [%.*s]",
1833                     src->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data);
1834                 if ((rc = mdb_update_key(bt, src->mp_parent, src->mp_pi,
1835                     &key)) != MDB_SUCCESS)
1836                         return rc;
1837         }
1838
1839         if (srcindx == 0 && IS_BRANCH(src->mp_page)) {
1840                 MDB_val  nullkey;
1841                 nullkey.mv_size = 0;
1842                 assert(mdb_update_key(bt, src->mp_page, 0, &nullkey) == MDB_SUCCESS);
1843         }
1844
1845         if (dstindx == 0 && dst->mp_pi != 0) {
1846                 DPRINTF("update separator for destination page %lu to [%.*s]",
1847                     dst->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data);
1848                 if ((rc = mdb_update_key(bt, dst->mp_parent, dst->mp_pi,
1849                     &key)) != MDB_SUCCESS)
1850                         return rc;
1851         }
1852
1853         if (dstindx == 0 && IS_BRANCH(dst->mp_page)) {
1854                 MDB_val  nullkey;
1855                 nullkey.mv_size = 0;
1856                 assert(mdb_update_key(bt, dst->mp_page, 0, &nullkey) == MDB_SUCCESS);
1857         }
1858
1859         return MDB_SUCCESS;
1860 }
1861
1862 static int
1863 mdb_merge(MDB_db *bt, MDB_pageparent *src, MDB_pageparent *dst)
1864 {
1865         int                      rc;
1866         indx_t                   i;
1867         MDB_node                *srcnode;
1868         MDB_val          key, data;
1869         MDB_pageparent  mpp;
1870         MDB_dhead *dh;
1871
1872         DPRINTF("merging page %lu and %lu", src->mp_page->mp_pgno, dst->mp_page->mp_pgno);
1873
1874         assert(src->mp_parent); /* can't merge root page */
1875         assert(dst->mp_parent);
1876         assert(bt->md_env->me_txn != NULL);
1877
1878         /* Mark src and dst as dirty. */
1879         if ((rc = mdb_touch(bt->md_env->me_txn, src)) ||
1880             (rc = mdb_touch(bt->md_env->me_txn, dst)))
1881                 return rc;
1882
1883         /* Move all nodes from src to dst.
1884          */
1885         for (i = 0; i < NUMKEYS(src->mp_page); i++) {
1886                 srcnode = NODEPTR(src->mp_page, i);
1887
1888                 key.mv_size = srcnode->mn_ksize;
1889                 key.mv_data = NODEKEY(srcnode);
1890                 data.mv_size = NODEDSZ(srcnode);
1891                 data.mv_data = NODEDATA(srcnode);
1892                 rc = mdb_add_node(bt, dst->mp_page, NUMKEYS(dst->mp_page), &key,
1893                     &data, NODEPGNO(srcnode), srcnode->mn_flags);
1894                 if (rc != MDB_SUCCESS)
1895                         return rc;
1896         }
1897
1898         DPRINTF("dst page %lu now has %u keys (%.1f%% filled)",
1899             dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(bt->md_env, dst->mp_page) / 10);
1900
1901         /* Unlink the src page from parent.
1902          */
1903         mdb_del_node(bt, src->mp_parent, src->mp_pi);
1904         if (src->mp_pi == 0) {
1905                 key.mv_size = 0;
1906                 if ((rc = mdb_update_key(bt, src->mp_parent, 0, &key)) != MDB_SUCCESS)
1907                         return rc;
1908         }
1909
1910         if (IS_LEAF(src->mp_page))
1911                 bt->md_env->me_meta.mm_stat.ms_leaf_pages--;
1912         else
1913                 bt->md_env->me_meta.mm_stat.ms_branch_pages--;
1914
1915         mpp.mp_page = src->mp_parent;
1916         dh = (MDB_dhead *)src->mp_parent;
1917         dh--;
1918         mpp.mp_parent = dh->md_parent;
1919         mpp.mp_pi = dh->md_pi;
1920
1921         return mdb_rebalance(bt, &mpp);
1922 }
1923
1924 #define FILL_THRESHOLD   250
1925
1926 static int
1927 mdb_rebalance(MDB_db *db, MDB_pageparent *mpp)
1928 {
1929         MDB_node        *node;
1930         MDB_page        *root;
1931         MDB_pageparent npp;
1932         indx_t           si = 0, di = 0;
1933
1934         assert(db != NULL);
1935         assert(db->md_env->me_txn != NULL);
1936         assert(mpp != NULL);
1937
1938         DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)",
1939             IS_LEAF(mpp->mp_page) ? "leaf" : "branch",
1940             mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(db->md_env, mpp->mp_page) / 10);
1941
1942         if (PAGEFILL(db->md_env, mpp->mp_page) >= FILL_THRESHOLD) {
1943                 DPRINTF("no need to rebalance page %lu, above fill threshold",
1944                     mpp->mp_page->mp_pgno);
1945                 return MDB_SUCCESS;
1946         }
1947
1948         if (mpp->mp_parent == NULL) {
1949                 if (NUMKEYS(mpp->mp_page) == 0) {
1950                         DPRINTF("tree is completely empty");
1951                         db->md_env->me_txn->mt_root = P_INVALID;
1952                         db->md_env->me_meta.mm_stat.ms_depth--;
1953                         db->md_env->me_meta.mm_stat.ms_leaf_pages--;
1954                 } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
1955                         DPRINTF("collapsing root page!");
1956                         db->md_env->me_txn->mt_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
1957                         if ((root = mdbenv_get_page(db->md_env, db->md_env->me_txn->mt_root)) == NULL)
1958                                 return MDB_FAIL;
1959                         db->md_env->me_meta.mm_stat.ms_depth--;
1960                         db->md_env->me_meta.mm_stat.ms_branch_pages--;
1961                 } else
1962                         DPRINTF("root page doesn't need rebalancing");
1963                 return MDB_SUCCESS;
1964         }
1965
1966         /* The parent (branch page) must have at least 2 pointers,
1967          * otherwise the tree is invalid.
1968          */
1969         assert(NUMKEYS(mpp->mp_parent) > 1);
1970
1971         /* Leaf page fill factor is below the threshold.
1972          * Try to move keys from left or right neighbor, or
1973          * merge with a neighbor page.
1974          */
1975
1976         /* Find neighbors.
1977          */
1978         if (mpp->mp_pi == 0) {
1979                 /* We're the leftmost leaf in our parent.
1980                  */
1981                 DPRINTF("reading right neighbor");
1982                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
1983                 if ((npp.mp_page = mdbenv_get_page(db->md_env, NODEPGNO(node))) == NULL)
1984                         return MDB_FAIL;
1985                 npp.mp_pi = mpp->mp_pi + 1;
1986                 si = 0;
1987                 di = NUMKEYS(mpp->mp_page);
1988         } else {
1989                 /* There is at least one neighbor to the left.
1990                  */
1991                 DPRINTF("reading left neighbor");
1992                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
1993                 if ((npp.mp_page = mdbenv_get_page(db->md_env, NODEPGNO(node))) == NULL)
1994                         return MDB_FAIL;
1995                 npp.mp_pi = mpp->mp_pi - 1;
1996                 si = NUMKEYS(npp.mp_page) - 1;
1997                 di = 0;
1998         }
1999         npp.mp_parent = mpp->mp_parent;
2000
2001         DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)",
2002             npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(db->md_env, npp.mp_page) / 10);
2003
2004         /* If the neighbor page is above threshold and has at least two
2005          * keys, move one key from it.
2006          *
2007          * Otherwise we should try to merge them.
2008          */
2009         if (PAGEFILL(db->md_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2)
2010                 return mdb_move_node(db, &npp, si, mpp, di);
2011         else { /* FIXME: if (has_enough_room()) */
2012                 if (mpp->mp_pi == 0)
2013                         return mdb_merge(db, &npp, mpp);
2014                 else
2015                         return mdb_merge(db, mpp, &npp);
2016         }
2017 }
2018
2019 int
2020 mdb_del(MDB_db *bt, MDB_txn *txn,
2021     MDB_val *key, MDB_val *data)
2022 {
2023         int              rc, exact;
2024         unsigned int     ki;
2025         MDB_node        *leaf;
2026         MDB_pageparent  mpp;
2027
2028         DPRINTF("========> delete key %.*s", (int)key->mv_size, (char *)key->mv_data);
2029
2030         assert(key != NULL);
2031
2032         if (bt == NULL || txn == NULL)
2033                 return EINVAL;
2034
2035         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
2036                 return EINVAL;
2037         }
2038
2039         if (bt->md_env->me_txn != txn) {
2040                 return EINVAL;
2041         }
2042
2043         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2044                 return EINVAL;
2045         }
2046
2047         if ((rc = mdb_search_page(bt, txn, key, NULL, 1, &mpp)) != MDB_SUCCESS)
2048                 return rc;
2049
2050         leaf = mdb_search_node(bt, mpp.mp_page, key, &exact, &ki);
2051         if (leaf == NULL || !exact) {
2052                 return ENOENT;
2053         }
2054
2055         if (data && (rc = mdb_read_data(bt, NULL, leaf, data)) != MDB_SUCCESS)
2056                 return rc;
2057
2058         mdb_del_node(bt, mpp.mp_page, ki);
2059         bt->md_env->me_meta.mm_stat.ms_entries--;
2060         rc = mdb_rebalance(bt, &mpp);
2061         if (rc != MDB_SUCCESS)
2062                 txn->mt_flags |= MDB_TXN_ERROR;
2063
2064         return rc;
2065 }
2066
2067 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
2068  * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
2069  * *newindxp with the actual values after split, ie if *mpp and *newindxp
2070  * refer to a node in the new right sibling page.
2071  */
2072 static int
2073 mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp,
2074     MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
2075 {
2076         uint8_t          flags;
2077         int              rc = MDB_SUCCESS, ins_new = 0;
2078         indx_t           newindx;
2079         pgno_t           pgno = 0;
2080         unsigned int     i, j, split_indx;
2081         MDB_node        *node;
2082         MDB_val  sepkey, rkey, rdata;
2083         MDB_page        *copy;
2084         MDB_dpage       *mdp, *rdp, *pdp;
2085         MDB_dhead *dh;
2086
2087         assert(bt != NULL);
2088         assert(bt->md_env != NULL);
2089
2090         dh = ((MDB_dhead *)*mpp) - 1;
2091         mdp = (MDB_dpage *)dh;
2092         newindx = *newindxp;
2093
2094         DPRINTF("-----> splitting %s page %lu and adding [%.*s] at index %i",
2095             IS_LEAF(&mdp->p) ? "leaf" : "branch", mdp->p.mp_pgno,
2096             (int)newkey->mv_size, (char *)newkey->mv_data, *newindxp);
2097
2098         if (mdp->h.md_parent == NULL) {
2099                 if ((pdp = mdbenv_new_page(bt->md_env, P_BRANCH, 1)) == NULL)
2100                         return MDB_FAIL;
2101                 mdp->h.md_pi = 0;
2102                 mdp->h.md_parent = &pdp->p;
2103                 bt->md_env->me_txn->mt_root = pdp->p.mp_pgno;
2104                 DPRINTF("root split! new root = %lu", pdp->p.mp_pgno);
2105                 bt->md_env->me_meta.mm_stat.ms_depth++;
2106
2107                 /* Add left (implicit) pointer. */
2108                 if (mdb_add_node(bt, &pdp->p, 0, NULL, NULL,
2109                     mdp->p.mp_pgno, 0) != MDB_SUCCESS)
2110                         return MDB_FAIL;
2111         } else {
2112                 DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
2113         }
2114
2115         /* Create a right sibling. */
2116         if ((rdp = mdbenv_new_page(bt->md_env, mdp->p.mp_flags, 1)) == NULL)
2117                 return MDB_FAIL;
2118         rdp->h.md_parent = mdp->h.md_parent;
2119         rdp->h.md_pi = mdp->h.md_pi + 1;
2120         DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
2121
2122         /* Move half of the keys to the right sibling. */
2123         if ((copy = malloc(bt->md_env->me_meta.mm_stat.ms_psize)) == NULL)
2124                 return MDB_FAIL;
2125         bcopy(&mdp->p, copy, bt->md_env->me_meta.mm_stat.ms_psize);
2126         bzero(&mdp->p.mp_ptrs, bt->md_env->me_meta.mm_stat.ms_psize - PAGEHDRSZ);
2127         mdp->p.mp_lower = PAGEHDRSZ;
2128         mdp->p.mp_upper = bt->md_env->me_meta.mm_stat.ms_psize;
2129
2130         split_indx = NUMKEYS(copy) / 2 + 1;
2131
2132         /* First find the separating key between the split pages.
2133          */
2134         bzero(&sepkey, sizeof(sepkey));
2135         if (newindx == split_indx) {
2136                 sepkey.mv_size = newkey->mv_size;
2137                 sepkey.mv_data = newkey->mv_data;
2138         } else {
2139                 node = NODEPTR(copy, split_indx);
2140                 sepkey.mv_size = node->mn_ksize;
2141                 sepkey.mv_data = NODEKEY(node);
2142         }
2143
2144         DPRINTF("separator is [%.*s]", (int)sepkey.mv_size, (char *)sepkey.mv_data);
2145
2146         /* Copy separator key to the parent.
2147          */
2148         if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(bt, &sepkey)) {
2149                 rc = mdb_split(bt, &rdp->h.md_parent, &rdp->h.md_pi,
2150                     &sepkey, NULL, rdp->p.mp_pgno);
2151
2152                 /* Right page might now have changed parent.
2153                  * Check if left page also changed parent.
2154                  */
2155                 if (rdp->h.md_parent != mdp->h.md_parent &&
2156                     mdp->h.md_pi >= NUMKEYS(mdp->h.md_parent)) {
2157                         mdp->h.md_parent = rdp->h.md_parent;
2158                         mdp->h.md_pi = rdp->h.md_pi - 1;
2159                 }
2160         } else {
2161                 rc = mdb_add_node(bt, rdp->h.md_parent, rdp->h.md_pi,
2162                     &sepkey, NULL, rdp->p.mp_pgno, 0);
2163         }
2164         if (rc != MDB_SUCCESS) {
2165                 free(copy);
2166                 return MDB_FAIL;
2167         }
2168
2169         for (i = j = 0; i <= NUMKEYS(copy); j++) {
2170                 if (i < split_indx) {
2171                         /* Re-insert in left sibling. */
2172                         pdp = mdp;
2173                 } else {
2174                         /* Insert in right sibling. */
2175                         if (i == split_indx)
2176                                 /* Reset insert index for right sibling. */
2177                                 j = (i == newindx && ins_new);
2178                         pdp = rdp;
2179                 }
2180
2181                 if (i == newindx && !ins_new) {
2182                         /* Insert the original entry that caused the split. */
2183                         rkey.mv_data = newkey->mv_data;
2184                         rkey.mv_size = newkey->mv_size;
2185                         if (IS_LEAF(&mdp->p)) {
2186                                 rdata.mv_data = newdata->mv_data;
2187                                 rdata.mv_size = newdata->mv_size;
2188                         } else
2189                                 pgno = newpgno;
2190                         flags = 0;
2191
2192                         ins_new = 1;
2193
2194                         /* Update page and index for the new key. */
2195                         *newindxp = j;
2196                         *mpp = &pdp->p;
2197                 } else if (i == NUMKEYS(copy)) {
2198                         break;
2199                 } else {
2200                         node = NODEPTR(copy, i);
2201                         rkey.mv_data = NODEKEY(node);
2202                         rkey.mv_size = node->mn_ksize;
2203                         if (IS_LEAF(&mdp->p)) {
2204                                 rdata.mv_data = NODEDATA(node);
2205                                 rdata.mv_size = node->mn_dsize;
2206                         } else
2207                                 pgno = node->mn_pgno;
2208                         flags = node->mn_flags;
2209
2210                         i++;
2211                 }
2212
2213                 if (!IS_LEAF(&mdp->p) && j == 0) {
2214                         /* First branch index doesn't need key data. */
2215                         rkey.mv_size = 0;
2216                 }
2217
2218                 rc = mdb_add_node(bt, &pdp->p, j, &rkey, &rdata, pgno,flags);
2219         }
2220
2221         free(copy);
2222         return rc;
2223 }
2224
2225 int
2226 mdb_put(MDB_db *bt, MDB_txn *txn,
2227     MDB_val *key, MDB_val *data, unsigned int flags)
2228 {
2229         int              rc = MDB_SUCCESS, exact;
2230         unsigned int     ki;
2231         MDB_node        *leaf;
2232         MDB_pageparent  mpp;
2233
2234         assert(key != NULL);
2235         assert(data != NULL);
2236
2237         if (bt == NULL || txn == NULL)
2238                 return EINVAL;
2239
2240         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
2241                 return EINVAL;
2242         }
2243
2244         if (bt->md_env->me_txn != txn) {
2245                 return EINVAL;
2246         }
2247
2248         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2249                 return EINVAL;
2250         }
2251
2252         DPRINTF("==> put key %.*s, size %zu, data size %zu",
2253                 (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
2254
2255         rc = mdb_search_page(bt, txn, key, NULL, 1, &mpp);
2256         if (rc == MDB_SUCCESS) {
2257                 leaf = mdb_search_node(bt, mpp.mp_page, key, &exact, &ki);
2258                 if (leaf && exact) {
2259                         if (F_ISSET(flags, MDB_NOOVERWRITE)) {
2260                                 DPRINTF("duplicate key %.*s",
2261                                     (int)key->mv_size, (char *)key->mv_data);
2262                                 return EEXIST;
2263                         }
2264                         mdb_del_node(bt, mpp.mp_page, ki);
2265                 }
2266                 if (leaf == NULL) {             /* append if not found */
2267                         ki = NUMKEYS(mpp.mp_page);
2268                         DPRINTF("appending key at index %i", ki);
2269                 }
2270         } else if (rc == ENOENT) {
2271                 MDB_dpage *dp;
2272                 /* new file, just write a root leaf page */
2273                 DPRINTF("allocating new root leaf page");
2274                 if ((dp = mdbenv_new_page(bt->md_env, P_LEAF, 1)) == NULL) {
2275                         return ENOMEM;
2276                 }
2277                 mpp.mp_page = &dp->p;
2278                 txn->mt_root = mpp.mp_page->mp_pgno;
2279                 bt->md_env->me_meta.mm_stat.ms_depth++;
2280                 ki = 0;
2281         }
2282         else
2283                 goto done;
2284
2285         assert(IS_LEAF(mpp.mp_page));
2286         DPRINTF("there are %u keys, should insert new key at index %i",
2287                 NUMKEYS(mpp.mp_page), ki);
2288
2289         if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(bt, key, data)) {
2290                 rc = mdb_split(bt, &mpp.mp_page, &ki, key, data, P_INVALID);
2291         } else {
2292                 /* There is room already in this leaf page. */
2293                 rc = mdb_add_node(bt, mpp.mp_page, ki, key, data, 0, 0);
2294         }
2295
2296         if (rc != MDB_SUCCESS)
2297                 txn->mt_flags |= MDB_TXN_ERROR;
2298         else
2299                 bt->md_env->me_meta.mm_stat.ms_entries++;
2300
2301 done:
2302         return rc;
2303 }
2304
2305 int
2306 mdbenv_get_flags(MDB_env *env, unsigned int *arg)
2307 {
2308         if (!env || !arg)
2309                 return EINVAL;
2310
2311         *arg = env->me_flags;
2312         return MDB_SUCCESS;
2313 }
2314
2315 int
2316 mdbenv_get_path(MDB_env *env, const char **arg)
2317 {
2318         if (!env || !arg)
2319                 return EINVAL;
2320
2321         *arg = env->me_path;
2322         return MDB_SUCCESS;
2323 }
2324
2325 int
2326 mdbenv_stat(MDB_env *env, MDB_stat **arg)
2327 {
2328         if (env == NULL || arg == NULL)
2329                 return EINVAL;
2330
2331         *arg = &env->me_meta.mm_stat;
2332
2333         return MDB_SUCCESS;
2334 }
2335
2336 int mdb_open(MDB_env *env, MDB_txn *txn, const char *name, unsigned int flags, MDB_db **db)
2337 {
2338         if (!name) {
2339                 *db = (MDB_db *)&env->me_db;
2340                 return MDB_SUCCESS;
2341         }
2342         return EINVAL;
2343 }
2344
2345 int mdb_stat(MDB_db *db, MDB_stat **arg)
2346 {
2347         if (db == NULL || arg == NULL)
2348                 return EINVAL;
2349
2350         *arg = &db->md_stat;
2351
2352         return MDB_SUCCESS;
2353 }
2354
2355 void mdb_close(MDB_db *db)
2356 {
2357 }