]> git.sur5r.net Git - openldap/blob - libraries/libmdb/mdb.c
Re-use old pages
[openldap] / libraries / libmdb / mdb.c
1 #include <sys/types.h>
2 #include <sys/stat.h>
3 #include <sys/queue.h>
4 #include <sys/param.h>
5 #include <sys/uio.h>
6 #include <sys/mman.h>
7 #ifdef HAVE_SYS_FILE_H
8 #include <sys/file.h>
9 #endif
10 #include <sys/ipc.h>
11 #include <sys/shm.h>
12
13 #include <assert.h>
14 #include <err.h>
15 #include <errno.h>
16 #include <fcntl.h>
17 #include <stddef.h>
18 #include <stdint.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <pthread.h>
25
26 #include "mdb.h"
27 #include "idl.h"
28
29 #ifndef DEBUG
30 #define DEBUG 1
31 #endif
32
33 #if (DEBUG +0) && defined(__GNUC__)
34 # define DPRINTF(fmt, ...) \
35         fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__)
36 #else
37 # define DPRINTF(...)   ((void) 0)
38 #endif
39
40 #define PAGESIZE         4096
41 #define MDB_MINKEYS      4
42 #define MDB_MAGIC        0xBEEFC0DE
43 #define MDB_VERSION      1
44 #define MAXKEYSIZE       255
45
46 #define P_INVALID        (~0UL)
47
48 #define F_ISSET(w, f)    (((w) & (f)) == (f))
49
50 typedef ulong            pgno_t;
51 typedef uint16_t         indx_t;
52
53 #define DEFAULT_READERS 126
54 #define DEFAULT_MAPSIZE 1048576
55
56 /* Lock descriptor stuff */
57 #define RXBODY  \
58         ulong           mr_txnid; \
59         pid_t           mr_pid; \
60         pthread_t       mr_tid
61 typedef struct MDB_rxbody {
62         RXBODY;
63 } MDB_rxbody;
64
65 #ifndef CACHELINE
66 #define CACHELINE       64      /* most CPUs. Itanium uses 128 */
67 #endif
68
69 typedef struct MDB_reader {
70         RXBODY;
71         /* cache line alignment */
72         char pad[CACHELINE-sizeof(MDB_rxbody)];
73 } MDB_reader;
74
75 #define TXBODY \
76         uint32_t        mt_magic;       \
77         uint32_t        mt_version;     \
78         pthread_mutex_t mt_mutex;       \
79         ulong           mt_txnid;       \
80         uint32_t        mt_numreaders
81 typedef struct MDB_txbody {
82         TXBODY;
83 } MDB_txbody;
84
85 typedef struct MDB_txninfo {
86         TXBODY;
87         char pad[CACHELINE-sizeof(MDB_txbody)];
88         pthread_mutex_t mt_wmutex;
89         char pad2[CACHELINE-sizeof(pthread_mutex_t)];
90         MDB_reader      mt_readers[1];
91 } MDB_txninfo;
92
93 /* Common header for all page types. Overflow pages
94  * occupy a number of contiguous pages with no
95  * headers on any page after the first.
96  */
97 typedef struct MDB_page {               /* represents a page of storage */
98         pgno_t          mp_pgno;                /* page number */
99 #define P_BRANCH         0x01           /* branch page */
100 #define P_LEAF           0x02           /* leaf page */
101 #define P_OVERFLOW       0x04           /* overflow page */
102 #define P_META           0x08           /* meta page */
103 #define P_DIRTY          0x10           /* dirty page */
104         uint32_t        mp_flags;
105 #define mp_lower        mp_pb.pb.pb_lower
106 #define mp_upper        mp_pb.pb.pb_upper
107 #define mp_pages        mp_pb.pb_pages
108         union page_bounds {
109                 struct {
110                         indx_t          pb_lower;               /* lower bound of free space */
111                         indx_t          pb_upper;               /* upper bound of free space */
112                 } pb;
113                 uint32_t        pb_pages;       /* number of overflow pages */
114         } mp_pb;
115         indx_t          mp_ptrs[1];             /* dynamic size */
116 } MDB_page;
117
118 #define PAGEHDRSZ        ((unsigned) offsetof(MDB_page, mp_ptrs))
119
120 #define NUMKEYS(p)       (((p)->mp_lower - PAGEHDRSZ) >> 1)
121 #define SIZELEFT(p)      (indx_t)((p)->mp_upper - (p)->mp_lower)
122 #define PAGEFILL(env, p) (1000L * ((env)->me_meta.mm_stat.ms_psize - PAGEHDRSZ - SIZELEFT(p)) / \
123                                 ((env)->me_meta.mm_stat.ms_psize - PAGEHDRSZ))
124 #define IS_LEAF(p)       F_ISSET((p)->mp_flags, P_LEAF)
125 #define IS_BRANCH(p)     F_ISSET((p)->mp_flags, P_BRANCH)
126 #define IS_OVERFLOW(p)   F_ISSET((p)->mp_flags, P_OVERFLOW)
127
128 #define OVPAGES(size, psize)    (PAGEHDRSZ + size + psize - 1) / psize;
129
130 typedef struct MDB_meta {                       /* meta (footer) page content */
131         uint32_t        mm_magic;
132         uint32_t        mm_version;
133         void            *mm_address;            /* address for fixed mapping */
134         size_t          mm_mapsize;                     /* size of mmap region */
135         pgno_t          mm_last_pg;                     /* last used page in file */
136         ulong           mm_txnid;                       /* txnid that committed this page */
137         MDB_stat        mm_stat;
138         pgno_t          mm_root;                        /* page number of root page */
139 } MDB_meta;
140
141 typedef struct MDB_dhead {                                      /* a dirty page */
142         SIMPLEQ_ENTRY(MDB_dpage)         md_next;       /* queue of dirty pages */
143         MDB_page        *md_parent;
144         unsigned        md_pi;                          /* parent index */
145         int                     md_num;
146 } MDB_dhead;
147
148 typedef struct MDB_dpage {
149         MDB_dhead       h;
150         MDB_page        p;
151 } MDB_dpage;
152
153 SIMPLEQ_HEAD(dirty_queue, MDB_dpage);
154
155 typedef struct MDB_oldpages {
156         struct MDB_oldpages *mo_next;
157         ulong           mo_txnid;
158         pgno_t          mo_pages[1];    /* dynamic */
159 } MDB_oldpages;
160
161 typedef struct MDB_pageparent {
162         MDB_page *mp_page;
163         MDB_page *mp_parent;
164         unsigned mp_pi;
165 } MDB_pageparent;
166
167 static MDB_dpage *mdb_newpage(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num);
168 static int              mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
169
170 typedef struct MDB_ppage {                                      /* ordered list of pages */
171         SLIST_ENTRY(MDB_ppage)   mp_entry;
172         MDB_page                *mp_page;
173         unsigned int    mp_ki;          /* cursor index on page */
174 } MDB_ppage;
175 SLIST_HEAD(page_stack, MDB_ppage);
176
177 #define CURSOR_EMPTY(c)          SLIST_EMPTY(&(c)->mc_stack)
178 #define CURSOR_TOP(c)            SLIST_FIRST(&(c)->mc_stack)
179 #define CURSOR_POP(c)            SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
180 #define CURSOR_PUSH(c,p)         SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
181
182 struct MDB_cursor {
183         MDB_db          *mc_db;
184         MDB_txn         *mc_txn;
185         struct page_stack        mc_stack;              /* stack of parent pages */
186         short           mc_initialized; /* 1 if initialized */
187         short           mc_eof;         /* 1 if end is reached */
188 };
189
190 #define METAHASHLEN      offsetof(MDB_meta, mm_hash)
191 #define METADATA(p)      ((void *)((char *)p + PAGEHDRSZ))
192
193 typedef struct MDB_node {
194 #define mn_pgno          mn_p.np_pgno
195 #define mn_dsize         mn_p.np_dsize
196         union {
197                 pgno_t           np_pgno;       /* child page number */
198                 uint32_t         np_dsize;      /* leaf data size */
199         } mn_p;
200         unsigned int    mn_flags:4;
201         unsigned int    mn_ksize:12;                    /* key size */
202 #define F_BIGDATA        0x01                   /* data put on overflow page */
203         char            mn_data[1];
204 } MDB_node;
205
206 struct MDB_txn {
207         pgno_t          mt_root;                /* current / new root page */
208         pgno_t          mt_next_pgno;   /* next unallocated page */
209         pgno_t          mt_first_pgno;
210         ulong           mt_txnid;
211         ulong           mt_oldest;
212         MDB_env         *mt_env;        
213         pgno_t          *mt_free_pgs;   /* this is an IDL */
214         union {
215                 struct dirty_queue      *dirty_queue;   /* modified pages */
216                 MDB_reader      *reader;
217         } mt_u;
218 #define MDB_TXN_RDONLY           0x01           /* read-only transaction */
219 #define MDB_TXN_ERROR            0x02           /* an error has occurred */
220         unsigned int             mt_flags;
221 };
222
223 /* Must be same as MDB_db, minus md_root/md_stat */
224 typedef struct MDB_db0 {
225         unsigned int    md_flags;
226         MDB_cmp_func    *md_cmp;                /* user compare function */
227         MDB_rel_func    *md_rel;                /* user relocate function */
228         MDB_db                  *md_parent;             /* parent tree */
229         MDB_env                 *md_env;
230 } MDB_db0;
231
232 struct MDB_db {
233         unsigned int    md_flags;
234         MDB_cmp_func    *md_cmp;                /* user compare function */
235         MDB_rel_func    *md_rel;                /* user relocate function */
236         MDB_db                  *md_parent;             /* parent tree */
237         MDB_env                 *md_env;
238         MDB_stat                md_stat;
239         pgno_t                  md_root;                /* page number of root page */
240 };
241
242 struct MDB_env {
243         int                     me_fd;
244         key_t           me_shmkey;
245         uint32_t        me_flags;
246         int                     me_maxreaders;
247         int                     me_metatoggle;
248         char            *me_path;
249         char *me_map;
250         MDB_txninfo     *me_txns;
251         MDB_db0         me_db;          /* first DB, overlaps with meta */
252         MDB_meta        me_meta;
253         MDB_txn *me_txn;                /* current write transaction */
254         size_t          me_mapsize;
255         off_t           me_size;                /* current file size */
256         pthread_key_t   me_txkey;       /* thread-key for readers */
257         MDB_oldpages *me_pghead;
258         MDB_oldpages *me_pgtail;
259 };
260
261 #define NODESIZE         offsetof(MDB_node, mn_data)
262
263 #define INDXSIZE(k)      (NODESIZE + ((k) == NULL ? 0 : (k)->mv_size))
264 #define LEAFSIZE(k, d)   (NODESIZE + (k)->mv_size + (d)->mv_size)
265 #define NODEPTR(p, i)    ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
266 #define NODEKEY(node)    (void *)((node)->mn_data)
267 #define NODEDATA(node)   (void *)((char *)(node)->mn_data + (node)->mn_ksize)
268 #define NODEPGNO(node)   ((node)->mn_pgno)
269 #define NODEDSZ(node)    ((node)->mn_dsize)
270
271 #define MDB_COMMIT_PAGES         64     /* max number of pages to write in one commit */
272 #define MDB_MAXCACHE_DEF         1024   /* max number of pages to keep in cache  */
273
274 static int  mdb_search_page_root(MDB_db *db,
275                             MDB_val *key,
276                             MDB_cursor *cursor, int modify,
277                             MDB_pageparent *mpp);
278 static int  mdb_search_page(MDB_db *db,
279                             MDB_txn *txn, MDB_val *key,
280                             MDB_cursor *cursor, int modify,
281                             MDB_pageparent *mpp);
282
283 static int  mdbenv_read_header(MDB_env *env);
284 static int  mdb_check_meta_page(MDB_page *p);
285 static int  mdbenv_read_meta(MDB_env *env);
286 static int  mdbenv_write_meta(MDB_txn *txn);
287 static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno);
288
289 static MDB_node *mdb_search_node(MDB_db *db, MDB_page *mp,
290                             MDB_val *key, int *exactp, unsigned int *kip);
291 static int  mdb_add_node(MDB_db *bt, MDB_page *mp,
292                             indx_t indx, MDB_val *key, MDB_val *data,
293                             pgno_t pgno, uint8_t flags);
294 static void mdb_del_node(MDB_db *bt, MDB_page *mp,
295                             indx_t indx);
296 static int  mdb_read_data(MDB_db *bt, MDB_page *mp,
297                             MDB_node *leaf, MDB_val *data);
298
299 static int               mdb_rebalance(MDB_db *bt, MDB_pageparent *mp);
300 static int               mdb_update_key(MDB_db *bt, MDB_page *mp,
301                             indx_t indx, MDB_val *key);
302 static int               mdb_move_node(MDB_db *bt, 
303                                 MDB_pageparent *src, indx_t srcindx,
304                                 MDB_pageparent *dst, indx_t dstindx);
305 static int               mdb_merge(MDB_db *bt, MDB_pageparent *src,
306                             MDB_pageparent *dst);
307 static int               mdb_split(MDB_db *bt, MDB_page **mpp,
308                             unsigned int *newindxp, MDB_val *newkey,
309                             MDB_val *newdata, pgno_t newpgno);
310 static MDB_dpage *mdbenv_new_page(MDB_env *env, uint32_t flags, int num);
311
312 static void              cursor_pop_page(MDB_cursor *cursor);
313 static MDB_ppage *cursor_push_page(MDB_cursor *cursor,
314                             MDB_page *mp);
315
316 static int               mdb_set_key(MDB_db *bt, MDB_page *mp,
317                             MDB_node *node, MDB_val *key);
318 static int               mdb_sibling(MDB_cursor *cursor, int move_right);
319 static int               mdb_cursor_next(MDB_cursor *cursor,
320                             MDB_val *key, MDB_val *data);
321 static int               mdb_cursor_set(MDB_cursor *cursor,
322                             MDB_val *key, MDB_val *data, int *exactp);
323 static int               mdb_cursor_first(MDB_cursor *cursor,
324                             MDB_val *key, MDB_val *data);
325
326 static size_t            mdb_leaf_size(MDB_db *bt, MDB_val *key,
327                             MDB_val *data);
328 static size_t            mdb_branch_size(MDB_db *bt, MDB_val *key);
329
330 static pgno_t            mdbenv_compact_tree(MDB_env *env, pgno_t pgno,
331                             MDB_env *envc);
332
333 static int               memncmp(const void *s1, size_t n1,
334                                  const void *s2, size_t n2);
335 static int               memnrcmp(const void *s1, size_t n1,
336                                   const void *s2, size_t n2);
337
338 static int
339 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
340 {
341         int diff, len_diff = -1;
342
343         if (n1 >= n2) {
344                 len_diff = (n1 > n2);
345                 n1 = n2;
346         }
347         diff = memcmp(s1, s2, n1);
348         return diff ? diff : len_diff;
349 }
350
351 static int
352 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
353 {
354         const unsigned char     *p1, *p2, *p1_lim;
355
356         if (n2 == 0)
357                 return n1 != 0;
358         if (n1 == 0)
359                 return -1;
360
361         p1 = (const unsigned char *)s1 + n1 - 1;
362         p2 = (const unsigned char *)s2 + n2 - 1;
363
364         for (p1_lim = (n1 <= n2 ? s1 : s2);  *p1 == *p2;  p1--, p2--) {
365                 if (p1 == p1_lim)
366                         return (p1 != s1) ? (p1 != p2) : (p2 != s2) ? -1 : 0;
367         }
368         return *p1 - *p2;
369 }
370
371 int
372 mdb_cmp(MDB_db *db, const MDB_val *a, const MDB_val *b)
373 {
374         return db->md_cmp(a, b);
375 }
376
377 static int
378 _mdb_cmp(MDB_db *db, const MDB_val *key1, const MDB_val *key2)
379 {
380         if (F_ISSET(db->md_flags, MDB_REVERSEKEY))
381                 return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
382         else
383                 return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
384 }
385
386 /* Allocate new page(s) for writing */
387 static MDB_dpage *
388 mdb_newpage(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
389 {
390         MDB_dpage *dp;
391         pgno_t pgno = P_INVALID;
392
393         if (txn->mt_env->me_pghead) {
394                 ulong oldest = txn->mt_txnid - 2;
395                 int i;
396                 for (i=0; i<txn->mt_env->me_txns->mt_numreaders; i++) {
397                         if (txn->mt_env->me_txns->mt_readers[i].mr_txnid < oldest)
398                                 oldest = txn->mt_env->me_txns->mt_readers[i].mr_txnid;
399                 }
400                 if (oldest > txn->mt_env->me_pghead->mo_txnid) {
401                         MDB_oldpages *mop = txn->mt_env->me_pghead;
402                         txn->mt_oldest = oldest;
403                         if (num > 1) {
404                                 /* FIXME */
405                                 ;
406                         } else {
407                                 /* peel pages off tail, so we only have to truncate the list */
408                                 pgno = MDB_IDL_LAST(mop->mo_pages);
409                                 if (MDB_IDL_IS_RANGE(mop->mo_pages)) {
410                                         mop->mo_pages[2]++;
411                                         if (mop->mo_pages[2] > mop->mo_pages[1])
412                                                 mop->mo_pages[0] = 0;
413                                 } else {
414                                         mop->mo_pages[0]--;
415                                 }
416                                 if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
417                                         txn->mt_env->me_pghead = mop->mo_next;
418                                         if (!txn->mt_env->me_pghead)
419                                                 txn->mt_env->me_pgtail = NULL;
420                                         free(mop);
421                                 }
422                         }
423                 }
424         }
425
426         if ((dp = malloc(txn->mt_env->me_meta.mm_stat.ms_psize * num + sizeof(MDB_dhead))) == NULL)
427                 return NULL;
428         dp->h.md_num = num;
429         dp->h.md_parent = parent;
430         dp->h.md_pi = parent_idx;
431         SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
432         if (pgno == P_INVALID) {
433                 dp->p.mp_pgno = txn->mt_next_pgno;
434                 txn->mt_next_pgno += num;
435         } else {
436                 dp->p.mp_pgno = pgno;
437         }
438
439         return dp;
440 }
441
442 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
443  */
444 static int
445 mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
446 {
447         MDB_page *mp = pp->mp_page;
448         pgno_t  pgno;
449         assert(txn != NULL);
450         assert(pp != NULL);
451
452         if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
453                 MDB_dpage *dp;
454                 if ((dp = mdb_newpage(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
455                         return ENOMEM;
456                 DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno);
457                 mdb_idl_insert(txn->mt_free_pgs, mp->mp_pgno);
458                 pgno = dp->p.mp_pgno;
459                 bcopy(mp, &dp->p, txn->mt_env->me_meta.mm_stat.ms_psize);
460                 mp = &dp->p;
461                 mp->mp_pgno = pgno;
462                 mp->mp_flags |= P_DIRTY;
463
464                 /* Update the page number to new touched page. */
465                 if (pp->mp_parent != NULL)
466                         NODEPGNO(NODEPTR(pp->mp_parent, pp->mp_pi)) = mp->mp_pgno;
467                 pp->mp_page = mp;
468         }
469         return 0;
470 }
471
472 int
473 mdbenv_sync(MDB_env *env)
474 {
475         int rc = 0;
476         if (!F_ISSET(env->me_flags, MDB_NOSYNC)) {
477                 if (fsync(env->me_fd))
478                         rc = errno;
479         }
480         return rc;
481 }
482
483 int
484 mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
485 {
486         MDB_txn *txn;
487         int rc;
488
489         if ((txn = calloc(1, sizeof(*txn))) == NULL) {
490                 DPRINTF("calloc: %s", strerror(errno));
491                 return ENOMEM;
492         }
493
494         if (rdonly) {
495                 txn->mt_flags |= MDB_TXN_RDONLY;
496         } else {
497                 txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
498                 if (txn->mt_u.dirty_queue == NULL) {
499                         free(txn);
500                         return ENOMEM;
501                 }
502                 SIMPLEQ_INIT(txn->mt_u.dirty_queue);
503
504                 pthread_mutex_lock(&env->me_txns->mt_wmutex);
505                 env->me_txns->mt_txnid++;
506                 txn->mt_free_pgs = malloc(MDB_IDL_UM_SIZEOF);
507                 if (txn->mt_free_pgs == NULL) {
508                         free(txn->mt_u.dirty_queue);
509                         free(txn);
510                         return ENOMEM;
511                 }
512                 txn->mt_free_pgs[0] = 0;
513         }
514         txn->mt_txnid = env->me_txns->mt_txnid;
515         if (rdonly) {
516                 MDB_reader *r = pthread_getspecific(env->me_txkey);
517                 if (!r) {
518                         int i;
519                         pthread_mutex_lock(&env->me_txns->mt_mutex);
520                         for (i=0; i<env->me_maxreaders; i++) {
521                                 if (env->me_txns->mt_readers[i].mr_pid == 0) {
522                                         env->me_txns->mt_readers[i].mr_pid = getpid();
523                                         env->me_txns->mt_readers[i].mr_tid = pthread_self();
524                                         pthread_setspecific(env->me_txkey, &env->me_txns->mt_readers[i]);
525                                         if (i >= env->me_txns->mt_numreaders)
526                                                 env->me_txns->mt_numreaders = i+1;
527                                         break;
528                                 }
529                         }
530                         pthread_mutex_unlock(&env->me_txns->mt_mutex);
531                         if (i == env->me_maxreaders) {
532                                 return ENOSPC;
533                         }
534                 }
535                 r->mr_txnid = txn->mt_txnid;
536                 txn->mt_u.reader = r;
537         } else {
538                 env->me_txn = txn;
539         }
540
541         txn->mt_env = env;
542
543         if ((rc = mdbenv_read_meta(env)) != MDB_SUCCESS) {
544                 mdb_txn_abort(txn);
545                 return rc;
546         }
547
548         txn->mt_next_pgno = env->me_meta.mm_last_pg+1;
549         txn->mt_first_pgno = txn->mt_next_pgno;
550         txn->mt_root = env->me_meta.mm_root;
551         DPRINTF("begin transaction %lu on mdbenv %p, root page %lu",
552                 txn->mt_txnid, (void *) env, txn->mt_root);
553
554         *ret = txn;
555         return MDB_SUCCESS;
556 }
557
558 void
559 mdb_txn_abort(MDB_txn *txn)
560 {
561         MDB_dpage *dp;
562         MDB_env *env;
563
564         if (txn == NULL)
565                 return;
566
567         env = txn->mt_env;
568         DPRINTF("abort transaction %lu on mdbenv %p, root page %lu",
569                 txn->mt_txnid, (void *) env, txn->mt_root);
570
571         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
572                 txn->mt_u.reader->mr_txnid = 0;
573         } else {
574                 /* Discard all dirty pages. Return any re-used pages
575                  * to the free list.
576                  */
577                 MDB_IDL_ZERO(txn->mt_free_pgs);
578                 while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
579                         dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
580                         SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
581                         if (dp->p.mp_pgno <= env->me_meta.mm_last_pg)
582                                 mdb_idl_insert(txn->mt_free_pgs, dp->p.mp_pgno);
583                         free(dp);
584                 }
585                 /* put back to head of free list */
586                 if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
587                         MDB_oldpages *mop;
588
589                         mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t));
590                         mop->mo_next = env->me_pghead;
591                         mop->mo_txnid = txn->mt_oldest - 1;
592                         if (!env->me_pghead) {
593                                 env->me_pgtail = mop;
594                         }
595                         env->me_pghead = mop;
596                         bcopy(txn->mt_free_pgs, mop->mo_pages, MDB_IDL_SIZEOF(txn->mt_free_pgs));
597                 }
598
599                 free(txn->mt_free_pgs);
600                 free(txn->mt_u.dirty_queue);
601                 env->me_txn = NULL;
602                 pthread_mutex_unlock(&env->me_txns->mt_wmutex);
603         }
604
605         free(txn);
606 }
607
608 int
609 mdb_txn_commit(MDB_txn *txn)
610 {
611         int              n, done;
612         ssize_t          rc;
613         off_t            size;
614         MDB_dpage       *dp;
615         MDB_env *env;
616         pgno_t  next;
617         struct iovec     iov[MDB_COMMIT_PAGES];
618
619         assert(txn != NULL);
620         assert(txn->mt_env != NULL);
621
622         env = txn->mt_env;
623
624         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
625                 DPRINTF("attempt to commit read-only transaction");
626                 mdb_txn_abort(txn);
627                 return EPERM;
628         }
629
630         if (txn != env->me_txn) {
631                 DPRINTF("attempt to commit unknown transaction");
632                 mdb_txn_abort(txn);
633                 return EINVAL;
634         }
635
636         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
637                 DPRINTF("error flag is set, can't commit");
638                 mdb_txn_abort(txn);
639                 return EINVAL;
640         }
641
642         if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
643                 goto done;
644
645         DPRINTF("committing transaction %lu on mdbenv %p, root page %lu",
646             txn->mt_txnid, (void *) env, txn->mt_root);
647
648         /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
649          */
650         next = 0;
651         do {
652                 n = 0;
653                 done = 1;
654                 size = 0;
655                 SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
656                         if (dp->p.mp_pgno != next) {
657                                 lseek(env->me_fd, dp->p.mp_pgno * env->me_meta.mm_stat.ms_psize, SEEK_SET);
658                                 next = dp->p.mp_pgno;
659                                 if (n)
660                                         break;
661                         }
662                         DPRINTF("committing page %lu", dp->p.mp_pgno);
663                         iov[n].iov_len = env->me_meta.mm_stat.ms_psize * dp->h.md_num;
664                         iov[n].iov_base = &dp->p;
665                         size += iov[n].iov_len;
666                         next = dp->p.mp_pgno + dp->h.md_num;
667                         /* clear dirty flag */
668                         dp->p.mp_flags &= ~P_DIRTY;
669                         if (++n >= MDB_COMMIT_PAGES) {
670                                 done = 0;
671                                 break;
672                         }
673                 }
674
675                 if (n == 0)
676                         break;
677
678                 DPRINTF("committing %u dirty pages", n);
679                 rc = writev(env->me_fd, iov, n);
680                 if (rc != size) {
681                         n = errno;
682                         if (rc > 0)
683                                 DPRINTF("short write, filesystem full?");
684                         else
685                                 DPRINTF("writev: %s", strerror(errno));
686                         mdb_txn_abort(txn);
687                         return n;
688                 }
689
690                 /* Drop the dirty pages.
691                  */
692                 while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
693                         dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
694                         SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
695                         free(dp);
696                         if (--n == 0)
697                                 break;
698                 }
699         } while (!done);
700
701         if ((n = mdbenv_sync(env)) != 0 ||
702             (n = mdbenv_write_meta(txn)) != MDB_SUCCESS ||
703             (n = mdbenv_sync(env)) != 0) {
704                 mdb_txn_abort(txn);
705                 return n;
706         }
707         env->me_txn = NULL;
708
709         /* add to tail of free list */
710         if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
711                 MDB_oldpages *mop;
712
713                 mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t));
714                 mop->mo_next = NULL;
715                 if (env->me_pghead) {
716                         env->me_pgtail->mo_next = mop;
717                 } else {
718                         env->me_pghead = mop;
719                 }
720                 env->me_pgtail = mop;
721                 bcopy(txn->mt_free_pgs, mop->mo_pages, MDB_IDL_SIZEOF(txn->mt_free_pgs));
722                 mop->mo_txnid = txn->mt_txnid;
723         }
724
725         pthread_mutex_unlock(&env->me_txns->mt_wmutex);
726         free(txn->mt_free_pgs);
727         free(txn->mt_u.dirty_queue);
728         free(txn);
729         txn = NULL;
730
731 done:
732         mdb_txn_abort(txn);
733
734         return MDB_SUCCESS;
735 }
736
737 static int
738 mdbenv_read_header(MDB_env *env)
739 {
740         char             page[PAGESIZE];
741         MDB_page        *p;
742         MDB_meta        *m;
743         int              rc;
744
745         assert(env != NULL);
746
747         /* We don't know the page size yet, so use a minimum value.
748          */
749
750         if ((rc = pread(env->me_fd, page, PAGESIZE, 0)) == 0) {
751                 return ENOENT;
752         } else if (rc != PAGESIZE) {
753                 if (rc > 0)
754                         errno = EINVAL;
755                 DPRINTF("read: %s", strerror(errno));
756                 return errno;
757         }
758
759         p = (MDB_page *)page;
760
761         if (!F_ISSET(p->mp_flags, P_META)) {
762                 DPRINTF("page %lu not a meta page", p->mp_pgno);
763                 return EINVAL;
764         }
765
766         m = METADATA(p);
767         if (m->mm_magic != MDB_MAGIC) {
768                 DPRINTF("meta has invalid magic");
769                 return EINVAL;
770         }
771
772         if (m->mm_version != MDB_VERSION) {
773                 DPRINTF("database is version %u, expected version %u",
774                     m->mm_version, MDB_VERSION);
775                 return EINVAL;
776         }
777
778         bcopy(m, &env->me_meta, sizeof(*m));
779         return 0;
780 }
781
782 static int
783 mdbenv_init_meta(MDB_env *env)
784 {
785         MDB_page *p, *q;
786         MDB_meta *meta;
787         int rc;
788         unsigned int     psize;
789
790         DPRINTF("writing new meta page");
791         psize = sysconf(_SC_PAGE_SIZE);
792
793         env->me_meta.mm_magic = MDB_MAGIC;
794         env->me_meta.mm_version = MDB_VERSION;
795         env->me_meta.mm_stat.ms_psize = psize;
796         env->me_meta.mm_stat.ms_flags = env->me_flags & 0xffff;
797         env->me_meta.mm_root = P_INVALID;
798         env->me_meta.mm_last_pg = 1;
799
800         p = calloc(2, psize);
801         p->mp_pgno = 0;
802         p->mp_flags = P_META;
803
804         meta = METADATA(p);
805         bcopy(&env->me_meta, meta, sizeof(*meta));
806
807         q = (MDB_page *)((char *)p + psize);
808
809         q->mp_pgno = 1;
810         q->mp_flags = P_META;
811
812         meta = METADATA(q);
813         bcopy(&env->me_meta, meta, sizeof(*meta));
814
815         rc = write(env->me_fd, p, psize * 2);
816         free(p);
817         return (rc == psize * 2) ? MDB_SUCCESS : errno;
818 }
819
820 static int
821 mdbenv_write_meta(MDB_txn *txn)
822 {
823         MDB_env *env;
824         MDB_meta        meta;
825         off_t off;
826         int rc;
827
828         assert(txn != NULL);
829         assert(txn->mt_env != NULL);
830
831         DPRINTF("writing meta page for root page %lu", txn->mt_root);
832
833         env = txn->mt_env;
834
835         bcopy(&env->me_meta, &meta, sizeof(meta));
836         meta.mm_root = txn->mt_root;
837         meta.mm_last_pg = txn->mt_next_pgno - 1;
838         meta.mm_txnid = txn->mt_txnid;
839
840         off = 0;
841         if (!env->me_metatoggle)
842                 off = env->me_meta.mm_stat.ms_psize;
843         off += PAGEHDRSZ;
844
845         lseek(env->me_fd, off, SEEK_SET);
846         rc = write(env->me_fd, &meta, sizeof(meta));
847         if (rc != sizeof(meta)) {
848                 DPRINTF("write failed, disk error?");
849                 return errno;
850         }
851
852         return MDB_SUCCESS;
853 }
854
855 /* Returns true if page p is a valid meta page, false otherwise.
856  */
857 static int
858 mdb_check_meta_page(MDB_page *p)
859 {
860         if (!F_ISSET(p->mp_flags, P_META)) {
861                 DPRINTF("page %lu not a meta page", p->mp_pgno);
862                 return EINVAL;
863         }
864
865         return 0;
866 }
867
868 static int
869 mdbenv_read_meta(MDB_env *env)
870 {
871         MDB_page        *mp0, *mp1;
872         MDB_meta        *meta[2];
873         int toggle = 0, rc;
874
875         assert(env != NULL);
876
877         if ((mp0 = mdbenv_get_page(env, 0)) == NULL ||
878                 (mp1 = mdbenv_get_page(env, 1)) == NULL)
879                 return EIO;
880
881         rc = mdb_check_meta_page(mp0);
882         if (rc) return rc;
883
884         rc = mdb_check_meta_page(mp1);
885         if (rc) return rc;
886
887         meta[0] = METADATA(mp0);
888         meta[1] = METADATA(mp1);
889
890         if (meta[0]->mm_txnid < meta[1]->mm_txnid)
891                 toggle = 1;
892
893         if (meta[toggle]->mm_txnid > env->me_meta.mm_txnid) {
894                 bcopy(meta[toggle], &env->me_meta, sizeof(env->me_meta));
895                 env->me_metatoggle = toggle;
896         }
897
898         DPRINTF("Using meta page %d", toggle);
899
900         return MDB_SUCCESS;
901 }
902
903 int
904 mdbenv_create(MDB_env **env)
905 {
906         MDB_env *e;
907
908         e = calloc(1, sizeof(*e));
909         if (!e) return ENOMEM;
910
911         e->me_meta.mm_mapsize = DEFAULT_MAPSIZE;
912         e->me_maxreaders = DEFAULT_READERS;
913         e->me_db.md_env = e;
914         *env = e;
915         return MDB_SUCCESS;
916 }
917
918 int
919 mdbenv_set_mapsize(MDB_env *env, size_t size)
920 {
921         if (env->me_map)
922                 return EINVAL;
923         env->me_mapsize = env->me_meta.mm_mapsize = size;
924         return MDB_SUCCESS;
925 }
926
927 int
928 mdbenv_set_maxreaders(MDB_env *env, int readers)
929 {
930         env->me_maxreaders = readers;
931         return MDB_SUCCESS;
932 }
933
934 int
935 mdbenv_get_maxreaders(MDB_env *env, int *readers)
936 {
937         if (!env || !readers)
938                 return EINVAL;
939         *readers = env->me_maxreaders;
940         return MDB_SUCCESS;
941 }
942
943 int
944 mdbenv_open2(MDB_env *env, unsigned int flags)
945 {
946         int i, newenv = 0;
947
948         env->me_flags = flags;
949
950         if ((i = mdbenv_read_header(env)) != 0) {
951                 if (i != ENOENT)
952                         return i;
953                 DPRINTF("new mdbenv");
954                 newenv = 1;
955         }
956
957         if (!env->me_mapsize)
958                 env->me_mapsize = env->me_meta.mm_mapsize;
959
960         i = MAP_SHARED;
961         if (env->me_meta.mm_address && (flags & MDB_FIXEDMAP))
962                 i |= MAP_FIXED;
963         env->me_map = mmap(env->me_meta.mm_address, env->me_mapsize, PROT_READ, i,
964                 env->me_fd, 0);
965         if (env->me_map == MAP_FAILED)
966                 return errno;
967
968         if (newenv) {
969                 env->me_meta.mm_mapsize = env->me_mapsize;
970                 if (flags & MDB_FIXEDMAP)
971                         env->me_meta.mm_address = env->me_map;
972                 i = mdbenv_init_meta(env);
973                 if (i != MDB_SUCCESS) {
974                         munmap(env->me_map, env->me_mapsize);
975                         return i;
976                 }
977         }
978
979         if ((i = mdbenv_read_meta(env)) != 0)
980                 return i;
981
982         DPRINTF("opened database version %u, pagesize %u",
983             env->me_meta.mm_version, env->me_meta.mm_stat.ms_psize);
984         DPRINTF("depth: %u", env->me_meta.mm_stat.ms_depth);
985         DPRINTF("entries: %lu", env->me_meta.mm_stat.ms_entries);
986         DPRINTF("branch pages: %lu", env->me_meta.mm_stat.ms_branch_pages);
987         DPRINTF("leaf pages: %lu", env->me_meta.mm_stat.ms_leaf_pages);
988         DPRINTF("overflow pages: %lu", env->me_meta.mm_stat.ms_overflow_pages);
989         DPRINTF("root: %lu", env->me_meta.mm_root);
990
991         return MDB_SUCCESS;
992 }
993
994 static void
995 mdbenv_reader_dest(void *ptr)
996 {
997         MDB_reader *reader = ptr;
998
999         reader->mr_txnid = 0;
1000         reader->mr_pid = 0;
1001         reader->mr_tid = 0;
1002 }
1003
1004 int
1005 mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
1006 {
1007         int             oflags, rc, shmid;
1008         off_t   size;
1009
1010
1011         if (F_ISSET(flags, MDB_RDONLY))
1012                 oflags = O_RDONLY;
1013         else
1014                 oflags = O_RDWR | O_CREAT;
1015
1016         if ((env->me_fd = open(path, oflags, mode)) == -1)
1017                 return errno;
1018
1019         env->me_shmkey = ftok(path, 'm');
1020         size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1021         shmid = shmget(env->me_shmkey, size, IPC_CREAT|IPC_EXCL|mode);
1022         if (shmid == -1) {
1023                 if (errno == EEXIST) {
1024                         shmid = shmget(env->me_shmkey, size, IPC_CREAT|mode);
1025                         if (shmid == -1)
1026                                 return errno;
1027                         env->me_txns = shmat(shmid, NULL, 0);
1028                         if (env->me_txns->mt_magic != MDB_MAGIC ||
1029                                 env->me_txns->mt_version != MDB_VERSION) {
1030                                         DPRINTF("invalid lock region %d", shmid);
1031                                         shmdt(env->me_txns);
1032                                         env->me_txns = NULL;
1033                                         return EIO;
1034                                 }
1035                 } else {
1036                         return errno;
1037                 }
1038         } else {
1039                 pthread_mutexattr_t mattr;
1040
1041                 env->me_txns = shmat(shmid, NULL, 0);
1042                 pthread_mutexattr_init(&mattr);
1043                 pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
1044                 pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
1045                 pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
1046                 env->me_txns->mt_version = MDB_VERSION;
1047                 env->me_txns->mt_magic = MDB_MAGIC;
1048         }
1049
1050         if ((rc = mdbenv_open2(env, flags)) != MDB_SUCCESS) {
1051                 close(env->me_fd);
1052                 env->me_fd = -1;
1053         } else {
1054                 env->me_path = strdup(path);
1055                 DPRINTF("opened dbenv %p", (void *) env);
1056         }
1057
1058         pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
1059
1060         return rc;
1061 }
1062
1063 void
1064 mdbenv_close(MDB_env *env)
1065 {
1066         if (env == NULL)
1067                 return;
1068
1069         free(env->me_path);
1070
1071         if (env->me_map) {
1072                 munmap(env->me_map, env->me_mapsize);
1073         }
1074         close(env->me_fd);
1075         if (env->me_txns) {
1076                 shmdt(env->me_txns);
1077         }
1078         free(env);
1079 }
1080
1081 /* Search for key within a leaf page, using binary search.
1082  * Returns the smallest entry larger or equal to the key.
1083  * If exactp is non-null, stores whether the found entry was an exact match
1084  * in *exactp (1 or 0).
1085  * If kip is non-null, stores the index of the found entry in *kip.
1086  * If no entry larger or equal to the key is found, returns NULL.
1087  */
1088 static MDB_node *
1089 mdb_search_node(MDB_db *bt, MDB_page *mp, MDB_val *key,
1090     int *exactp, unsigned int *kip)
1091 {
1092         unsigned int     i = 0;
1093         int              low, high;
1094         int              rc = 0;
1095         MDB_node        *node;
1096         MDB_val  nodekey;
1097
1098         DPRINTF("searching %u keys in %s page %lu",
1099             NUMKEYS(mp),
1100             IS_LEAF(mp) ? "leaf" : "branch",
1101             mp->mp_pgno);
1102
1103         assert(NUMKEYS(mp) > 0);
1104
1105         bzero(&nodekey, sizeof(nodekey));
1106
1107         low = IS_LEAF(mp) ? 0 : 1;
1108         high = NUMKEYS(mp) - 1;
1109         while (low <= high) {
1110                 i = (low + high) >> 1;
1111                 node = NODEPTR(mp, i);
1112
1113                 nodekey.mv_size = node->mn_ksize;
1114                 nodekey.mv_data = NODEKEY(node);
1115
1116                 if (bt->md_cmp)
1117                         rc = bt->md_cmp(key, &nodekey);
1118                 else
1119                         rc = _mdb_cmp(bt, key, &nodekey);
1120
1121                 if (IS_LEAF(mp))
1122                         DPRINTF("found leaf index %u [%.*s], rc = %i",
1123                             i, (int)nodekey.mv_size, (char *)nodekey.mv_data, rc);
1124                 else
1125                         DPRINTF("found branch index %u [%.*s -> %lu], rc = %i",
1126                             i, (int)node->mn_ksize, (char *)NODEKEY(node),
1127                             node->mn_pgno, rc);
1128
1129                 if (rc == 0)
1130                         break;
1131                 if (rc > 0)
1132                         low = i + 1;
1133                 else
1134                         high = i - 1;
1135         }
1136
1137         if (rc > 0) {   /* Found entry is less than the key. */
1138                 i++;    /* Skip to get the smallest entry larger than key. */
1139                 if (i >= NUMKEYS(mp))
1140                         /* There is no entry larger or equal to the key. */
1141                         return NULL;
1142         }
1143         if (exactp)
1144                 *exactp = (rc == 0);
1145         if (kip)        /* Store the key index if requested. */
1146                 *kip = i;
1147
1148         return NODEPTR(mp, i);
1149 }
1150
1151 static void
1152 cursor_pop_page(MDB_cursor *cursor)
1153 {
1154         MDB_ppage       *top;
1155
1156         top = CURSOR_TOP(cursor);
1157         CURSOR_POP(cursor);
1158
1159         DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
1160
1161         free(top);
1162 }
1163
1164 static MDB_ppage *
1165 cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
1166 {
1167         MDB_ppage       *ppage;
1168
1169         DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor);
1170
1171         if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
1172                 return NULL;
1173         ppage->mp_page = mp;
1174         CURSOR_PUSH(cursor, ppage);
1175         return ppage;
1176 }
1177
1178 static MDB_page *
1179 mdbenv_get_page(MDB_env *env, pgno_t pgno)
1180 {
1181         MDB_page *p = NULL;
1182         MDB_txn *txn = env->me_txn;
1183         int found = 0;
1184
1185         if (txn && !SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
1186                 MDB_dpage *dp;
1187                 SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
1188                         if (dp->p.mp_pgno == pgno) {
1189                                 p = &dp->p;
1190                                 found = 1;
1191                                 break;
1192                         }
1193                 }
1194         }
1195         if (!found) {
1196                 p = (MDB_page *)(env->me_map + env->me_meta.mm_stat.ms_psize * pgno);
1197         }
1198         return p;
1199 }
1200
1201 static int
1202 mdb_search_page_root(MDB_db *bt, MDB_val *key,
1203     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1204 {
1205         MDB_page        *mp = mpp->mp_page;
1206         int rc;
1207
1208         if (cursor && cursor_push_page(cursor, mp) == NULL)
1209                 return MDB_FAIL;
1210
1211         while (IS_BRANCH(mp)) {
1212                 unsigned int     i = 0;
1213                 MDB_node        *node;
1214
1215                 DPRINTF("branch page %lu has %u keys", mp->mp_pgno, NUMKEYS(mp));
1216                 assert(NUMKEYS(mp) > 1);
1217                 DPRINTF("found index 0 to page %lu", NODEPGNO(NODEPTR(mp, 0)));
1218
1219                 if (key == NULL)        /* Initialize cursor to first page. */
1220                         i = 0;
1221                 else {
1222                         int      exact;
1223                         node = mdb_search_node(bt, mp, key, &exact, &i);
1224                         if (node == NULL)
1225                                 i = NUMKEYS(mp) - 1;
1226                         else if (!exact) {
1227                                 assert(i > 0);
1228                                 i--;
1229                         }
1230                 }
1231
1232                 if (key)
1233                         DPRINTF("following index %u for key %.*s",
1234                             i, (int)key->mv_size, (char *)key->mv_data);
1235                 assert(i < NUMKEYS(mp));
1236                 node = NODEPTR(mp, i);
1237
1238                 if (cursor)
1239                         CURSOR_TOP(cursor)->mp_ki = i;
1240
1241                 mpp->mp_parent = mp;
1242                 if ((mp = mdbenv_get_page(bt->md_env, NODEPGNO(node))) == NULL)
1243                         return MDB_FAIL;
1244                 mpp->mp_pi = i;
1245                 mpp->mp_page = mp;
1246
1247                 if (cursor && cursor_push_page(cursor, mp) == NULL)
1248                         return MDB_FAIL;
1249
1250                 if (modify) {
1251                         MDB_dhead *dh = ((MDB_dhead *)mp)-1;
1252                         if ((rc = mdb_touch(bt->md_env->me_txn, mpp)) != 0)
1253                                 return rc;
1254                         dh = ((MDB_dhead *)mpp->mp_page)-1;
1255                         dh->md_parent = mpp->mp_parent;
1256                         dh->md_pi = mpp->mp_pi;
1257                 }
1258
1259                 mp = mpp->mp_page;
1260         }
1261
1262         if (!IS_LEAF(mp)) {
1263                 DPRINTF("internal error, index points to a %02X page!?",
1264                     mp->mp_flags);
1265                 return MDB_FAIL;
1266         }
1267
1268         DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno,
1269             key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL);
1270
1271         return MDB_SUCCESS;
1272 }
1273
1274 /* Search for the page a given key should be in.
1275  * Stores a pointer to the found page in *mpp.
1276  * If key is NULL, search for the lowest page (used by mdb_cursor_first).
1277  * If cursor is non-null, pushes parent pages on the cursor stack.
1278  * If modify is true, visited pages are updated with new page numbers.
1279  */
1280 static int
1281 mdb_search_page(MDB_db *db, MDB_txn *txn, MDB_val *key,
1282     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1283 {
1284         int              rc;
1285         pgno_t           root;
1286
1287         /* Can't modify pages outside a transaction. */
1288         if (txn == NULL && modify) {
1289                 return EINVAL;
1290         }
1291
1292         /* Choose which root page to start with. If a transaction is given
1293          * use the root page from the transaction, otherwise read the last
1294          * committed root page.
1295          */
1296         if (txn == NULL) {
1297                 if ((rc = mdbenv_read_meta(db->md_env)) != MDB_SUCCESS)
1298                         return rc;
1299                 root = db->md_env->me_meta.mm_root;
1300         } else if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
1301                 DPRINTF("transaction has failed, must abort");
1302                 return EINVAL;
1303         } else
1304                 root = txn->mt_root;
1305
1306         if (root == P_INVALID) {                /* Tree is empty. */
1307                 DPRINTF("tree is empty");
1308                 return ENOENT;
1309         }
1310
1311         if ((mpp->mp_page = mdbenv_get_page(db->md_env, root)) == NULL)
1312                 return MDB_FAIL;
1313
1314         DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
1315
1316         if (modify && !F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
1317                 mpp->mp_parent = NULL;
1318                 mpp->mp_pi = 0;
1319                 if ((rc = mdb_touch(txn, mpp)))
1320                         return rc;
1321                 txn->mt_root = mpp->mp_page->mp_pgno;
1322         }
1323
1324         return mdb_search_page_root(db, key, cursor, modify, mpp);
1325 }
1326
1327 static int
1328 mdb_read_data(MDB_db *db, MDB_page *mp, MDB_node *leaf,
1329     MDB_val *data)
1330 {
1331         MDB_page        *omp;           /* overflow mpage */
1332         size_t           max;
1333         pgno_t           pgno;
1334
1335         bzero(data, sizeof(*data));
1336         max = db->md_env->me_meta.mm_stat.ms_psize - PAGEHDRSZ;
1337
1338         if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
1339                 data->mv_size = leaf->mn_dsize;
1340                 data->mv_data = NODEDATA(leaf);
1341                 return MDB_SUCCESS;
1342         }
1343
1344         /* Read overflow data.
1345          */
1346         data->mv_size = leaf->mn_dsize;
1347         bcopy(NODEDATA(leaf), &pgno, sizeof(pgno));
1348         if ((omp = mdbenv_get_page(db->md_env, pgno)) == NULL) {
1349                 DPRINTF("read overflow page %lu failed", pgno);
1350                 return MDB_FAIL;
1351         }
1352         data->mv_data = omp;
1353
1354         return MDB_SUCCESS;
1355 }
1356
1357 int
1358 mdb_get(MDB_db *db, MDB_txn *txn,
1359     MDB_val *key, MDB_val *data)
1360 {
1361         int              rc, exact;
1362         MDB_node        *leaf;
1363         MDB_pageparent mpp;
1364
1365         assert(key);
1366         assert(data);
1367         DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data);
1368
1369         if (db == NULL)
1370                 return EINVAL;
1371
1372         if (txn != NULL && db->md_env != txn->mt_env) {
1373                 return EINVAL;
1374         }
1375
1376         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1377                 return EINVAL;
1378         }
1379
1380         if ((rc = mdb_search_page(db, txn, key, NULL, 0, &mpp)) != MDB_SUCCESS)
1381                 return rc;
1382
1383         leaf = mdb_search_node(db, mpp.mp_page, key, &exact, NULL);
1384         if (leaf && exact)
1385                 rc = mdb_read_data(db, mpp.mp_page, leaf, data);
1386         else {
1387                 rc = ENOENT;
1388         }
1389
1390         return rc;
1391 }
1392
1393 static int
1394 mdb_sibling(MDB_cursor *cursor, int move_right)
1395 {
1396         int              rc;
1397         MDB_node        *indx;
1398         MDB_ppage       *parent, *top;
1399         MDB_page        *mp;
1400
1401         top = CURSOR_TOP(cursor);
1402         if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) {
1403                 return ENOENT;          /* root has no siblings */
1404         }
1405
1406         DPRINTF("parent page is page %lu, index %u",
1407             parent->mp_page->mp_pgno, parent->mp_ki);
1408
1409         cursor_pop_page(cursor);
1410         if (move_right ? (parent->mp_ki + 1 >= NUMKEYS(parent->mp_page))
1411                        : (parent->mp_ki == 0)) {
1412                 DPRINTF("no more keys left, moving to %s sibling",
1413                     move_right ? "right" : "left");
1414                 if ((rc = mdb_sibling(cursor, move_right)) != MDB_SUCCESS)
1415                         return rc;
1416                 parent = CURSOR_TOP(cursor);
1417         } else {
1418                 if (move_right)
1419                         parent->mp_ki++;
1420                 else
1421                         parent->mp_ki--;
1422                 DPRINTF("just moving to %s index key %u",
1423                     move_right ? "right" : "left", parent->mp_ki);
1424         }
1425         assert(IS_BRANCH(parent->mp_page));
1426
1427         indx = NODEPTR(parent->mp_page, parent->mp_ki);
1428         if ((mp = mdbenv_get_page(cursor->mc_db->md_env, indx->mn_pgno)) == NULL)
1429                 return MDB_FAIL;
1430 #if 0
1431         mp->parent = parent->mp_page;
1432         mp->parent_index = parent->mp_ki;
1433 #endif
1434
1435         cursor_push_page(cursor, mp);
1436
1437         return MDB_SUCCESS;
1438 }
1439
1440 static int
1441 mdb_set_key(MDB_db *bt, MDB_page *mp, MDB_node *node,
1442     MDB_val *key)
1443 {
1444         if (key == NULL)
1445                 return 0;
1446
1447         key->mv_size = node->mn_ksize;
1448         key->mv_data = NODEKEY(node);
1449
1450         return 0;
1451 }
1452
1453 static int
1454 mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
1455 {
1456         MDB_ppage       *top;
1457         MDB_page        *mp;
1458         MDB_node        *leaf;
1459
1460         if (cursor->mc_eof) {
1461                 return ENOENT;
1462         }
1463
1464         assert(cursor->mc_initialized);
1465
1466         top = CURSOR_TOP(cursor);
1467         mp = top->mp_page;
1468
1469         DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
1470
1471         if (top->mp_ki + 1 >= NUMKEYS(mp)) {
1472                 DPRINTF("=====> move to next sibling page");
1473                 if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
1474                         cursor->mc_eof = 1;
1475                         return ENOENT;
1476                 }
1477                 top = CURSOR_TOP(cursor);
1478                 mp = top->mp_page;
1479                 DPRINTF("next page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
1480         } else
1481                 top->mp_ki++;
1482
1483         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
1484             mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
1485
1486         assert(IS_LEAF(mp));
1487         leaf = NODEPTR(mp, top->mp_ki);
1488
1489         if (data && mdb_read_data(cursor->mc_db, mp, leaf, data) != MDB_SUCCESS)
1490                 return MDB_FAIL;
1491
1492         return mdb_set_key(cursor->mc_db, mp, leaf, key);
1493 }
1494
1495 static int
1496 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
1497     int *exactp)
1498 {
1499         int              rc;
1500         MDB_node        *leaf;
1501         MDB_ppage       *top;
1502         MDB_pageparent mpp;
1503
1504         assert(cursor);
1505         assert(key);
1506         assert(key->mv_size > 0);
1507
1508         rc = mdb_search_page(cursor->mc_db, cursor->mc_txn, key, cursor, 0, &mpp);
1509         if (rc != MDB_SUCCESS)
1510                 return rc;
1511         assert(IS_LEAF(mpp.mp_page));
1512
1513         top = CURSOR_TOP(cursor);
1514         leaf = mdb_search_node(cursor->mc_db, mpp.mp_page, key, exactp, &top->mp_ki);
1515         if (exactp != NULL && !*exactp) {
1516                 /* MDB_CURSOR_EXACT specified and not an exact match. */
1517                 return ENOENT;
1518         }
1519
1520         if (leaf == NULL) {
1521                 DPRINTF("===> inexact leaf not found, goto sibling");
1522                 if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS)
1523                         return rc;              /* no entries matched */
1524                 top = CURSOR_TOP(cursor);
1525                 top->mp_ki = 0;
1526                 mpp.mp_page = top->mp_page;
1527                 assert(IS_LEAF(mpp.mp_page));
1528                 leaf = NODEPTR(mpp.mp_page, 0);
1529         }
1530
1531         cursor->mc_initialized = 1;
1532         cursor->mc_eof = 0;
1533
1534         if (data && (rc = mdb_read_data(cursor->mc_db, mpp.mp_page, leaf, data)) != MDB_SUCCESS)
1535                 return rc;
1536
1537         rc = mdb_set_key(cursor->mc_db, mpp.mp_page, leaf, key);
1538         if (rc == MDB_SUCCESS) {
1539                 DPRINTF("==> cursor placed on key %.*s",
1540                         (int)key->mv_size, (char *)key->mv_data);
1541                 ;
1542         }
1543
1544         return rc;
1545 }
1546
1547 static int
1548 mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
1549 {
1550         int              rc;
1551         MDB_pageparent  mpp;
1552         MDB_node        *leaf;
1553
1554         rc = mdb_search_page(cursor->mc_db, cursor->mc_txn, NULL, cursor, 0, &mpp);
1555         if (rc != MDB_SUCCESS)
1556                 return rc;
1557         assert(IS_LEAF(mpp.mp_page));
1558
1559         leaf = NODEPTR(mpp.mp_page, 0);
1560         cursor->mc_initialized = 1;
1561         cursor->mc_eof = 0;
1562
1563         if (data && (rc = mdb_read_data(cursor->mc_db, mpp.mp_page, leaf, data)) != MDB_SUCCESS)
1564                 return rc;
1565
1566         return mdb_set_key(cursor->mc_db, mpp.mp_page, leaf, key);
1567 }
1568
1569 int
1570 mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
1571     MDB_cursor_op op)
1572 {
1573         int              rc;
1574         int              exact = 0;
1575
1576         assert(cursor);
1577
1578         switch (op) {
1579         case MDB_CURSOR:
1580         case MDB_CURSOR_EXACT:
1581                 while (CURSOR_TOP(cursor) != NULL)
1582                         cursor_pop_page(cursor);
1583                 if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1584                         rc = EINVAL;
1585                 } else if (op == MDB_CURSOR_EXACT)
1586                         rc = mdb_cursor_set(cursor, key, data, &exact);
1587                 else
1588                         rc = mdb_cursor_set(cursor, key, data, NULL);
1589                 break;
1590         case MDB_NEXT:
1591                 if (!cursor->mc_initialized)
1592                         rc = mdb_cursor_first(cursor, key, data);
1593                 else
1594                         rc = mdb_cursor_next(cursor, key, data);
1595                 break;
1596         case MDB_FIRST:
1597                 while (CURSOR_TOP(cursor) != NULL)
1598                         cursor_pop_page(cursor);
1599                 rc = mdb_cursor_first(cursor, key, data);
1600                 break;
1601         default:
1602                 DPRINTF("unhandled/unimplemented cursor operation %u", op);
1603                 rc = EINVAL;
1604                 break;
1605         }
1606
1607         return rc;
1608 }
1609
1610 /* Allocate a page and initialize it
1611  */
1612 static MDB_dpage *
1613 mdbenv_new_page(MDB_env *env, uint32_t flags, int num)
1614 {
1615         MDB_dpage       *dp;
1616
1617         assert(env != NULL);
1618         assert(env->me_txn != NULL);
1619
1620         if ((dp = mdb_newpage(env->me_txn, NULL, 0, num)) == NULL)
1621                 return NULL;
1622         DPRINTF("allocated new mpage %lu, page size %u",
1623             dp->p.mp_pgno, env->me_meta.mm_stat.ms_psize);
1624         dp->p.mp_flags = flags | P_DIRTY;
1625         dp->p.mp_lower = PAGEHDRSZ;
1626         dp->p.mp_upper = env->me_meta.mm_stat.ms_psize;
1627
1628         if (IS_BRANCH(&dp->p))
1629                 env->me_meta.mm_stat.ms_branch_pages++;
1630         else if (IS_LEAF(&dp->p))
1631                 env->me_meta.mm_stat.ms_leaf_pages++;
1632         else if (IS_OVERFLOW(&dp->p)) {
1633                 env->me_meta.mm_stat.ms_overflow_pages += num;
1634                 dp->p.mp_pages = num;
1635         }
1636
1637         return dp;
1638 }
1639
1640 static size_t
1641 mdb_leaf_size(MDB_db *db, MDB_val *key, MDB_val *data)
1642 {
1643         size_t           sz;
1644
1645         sz = LEAFSIZE(key, data);
1646         if (data->mv_size >= db->md_env->me_meta.mm_stat.ms_psize / MDB_MINKEYS) {
1647                 /* put on overflow page */
1648                 sz -= data->mv_size - sizeof(pgno_t);
1649         }
1650
1651         return sz + sizeof(indx_t);
1652 }
1653
1654 static size_t
1655 mdb_branch_size(MDB_db *db, MDB_val *key)
1656 {
1657         size_t           sz;
1658
1659         sz = INDXSIZE(key);
1660         if (sz >= db->md_env->me_meta.mm_stat.ms_psize / MDB_MINKEYS) {
1661                 /* put on overflow page */
1662                 /* not implemented */
1663                 /* sz -= key->size - sizeof(pgno_t); */
1664         }
1665
1666         return sz + sizeof(indx_t);
1667 }
1668
1669 static int
1670 mdb_add_node(MDB_db *db, MDB_page *mp, indx_t indx,
1671     MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
1672 {
1673         unsigned int     i;
1674         size_t           node_size = NODESIZE;
1675         indx_t           ofs;
1676         MDB_node        *node;
1677         MDB_dpage       *ofp = NULL;            /* overflow page */
1678
1679         assert(mp->mp_upper >= mp->mp_lower);
1680
1681         DPRINTF("add node [%.*s] to %s page %lu at index %i, key size %zu",
1682             key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL,
1683             IS_LEAF(mp) ? "leaf" : "branch",
1684             mp->mp_pgno, indx, key ? key->mv_size : 0);
1685
1686         if (key != NULL)
1687                 node_size += key->mv_size;
1688
1689         if (IS_LEAF(mp)) {
1690                 assert(data);
1691                 if (F_ISSET(flags, F_BIGDATA)) {
1692                         /* Data already on overflow page. */
1693                         node_size += sizeof(pgno_t);
1694                 } else if (data->mv_size >= db->md_env->me_meta.mm_stat.ms_psize / MDB_MINKEYS) {
1695                         int ovpages = OVPAGES(data->mv_size, db->md_env->me_meta.mm_stat.ms_psize);
1696                         /* Put data on overflow page. */
1697                         DPRINTF("data size is %zu, put on overflow page",
1698                             data->mv_size);
1699                         node_size += sizeof(pgno_t);
1700                         if ((ofp = mdbenv_new_page(db->md_env, P_OVERFLOW, ovpages)) == NULL)
1701                                 return MDB_FAIL;
1702                         DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
1703                         flags |= F_BIGDATA;
1704                 } else {
1705                         node_size += data->mv_size;
1706                 }
1707         }
1708
1709         if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
1710                 DPRINTF("not enough room in page %lu, got %u ptrs",
1711                     mp->mp_pgno, NUMKEYS(mp));
1712                 DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower,
1713                     mp->mp_upper - mp->mp_lower);
1714                 DPRINTF("node size = %zu", node_size);
1715                 return ENOSPC;
1716         }
1717
1718         /* Move higher pointers up one slot. */
1719         for (i = NUMKEYS(mp); i > indx; i--)
1720                 mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
1721
1722         /* Adjust free space offsets. */
1723         ofs = mp->mp_upper - node_size;
1724         assert(ofs >= mp->mp_lower + sizeof(indx_t));
1725         mp->mp_ptrs[indx] = ofs;
1726         mp->mp_upper = ofs;
1727         mp->mp_lower += sizeof(indx_t);
1728
1729         /* Write the node data. */
1730         node = NODEPTR(mp, indx);
1731         node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
1732         node->mn_flags = flags;
1733         if (IS_LEAF(mp))
1734                 node->mn_dsize = data->mv_size;
1735         else
1736                 node->mn_pgno = pgno;
1737
1738         if (key)
1739                 bcopy(key->mv_data, NODEKEY(node), key->mv_size);
1740
1741         if (IS_LEAF(mp)) {
1742                 assert(key);
1743                 if (ofp == NULL) {
1744                         if (F_ISSET(flags, F_BIGDATA))
1745                                 bcopy(data->mv_data, node->mn_data + key->mv_size,
1746                                     sizeof(pgno_t));
1747                         else
1748                                 bcopy(data->mv_data, node->mn_data + key->mv_size,
1749                                     data->mv_size);
1750                 } else {
1751                         bcopy(&ofp->p.mp_pgno, node->mn_data + key->mv_size,
1752                             sizeof(pgno_t));
1753                         bcopy(data->mv_data, METADATA(&ofp->p), data->mv_size);
1754                 }
1755         }
1756
1757         return MDB_SUCCESS;
1758 }
1759
1760 static void
1761 mdb_del_node(MDB_db *db, MDB_page *mp, indx_t indx)
1762 {
1763         unsigned int     sz;
1764         indx_t           i, j, numkeys, ptr;
1765         MDB_node        *node;
1766         char            *base;
1767
1768         DPRINTF("delete node %u on %s page %lu", indx,
1769             IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
1770         assert(indx < NUMKEYS(mp));
1771
1772         node = NODEPTR(mp, indx);
1773         sz = NODESIZE + node->mn_ksize;
1774         if (IS_LEAF(mp)) {
1775                 if (F_ISSET(node->mn_flags, F_BIGDATA))
1776                         sz += sizeof(pgno_t);
1777                 else
1778                         sz += NODEDSZ(node);
1779         }
1780
1781         ptr = mp->mp_ptrs[indx];
1782         numkeys = NUMKEYS(mp);
1783         for (i = j = 0; i < numkeys; i++) {
1784                 if (i != indx) {
1785                         mp->mp_ptrs[j] = mp->mp_ptrs[i];
1786                         if (mp->mp_ptrs[i] < ptr)
1787                                 mp->mp_ptrs[j] += sz;
1788                         j++;
1789                 }
1790         }
1791
1792         base = (char *)mp + mp->mp_upper;
1793         bcopy(base, base + sz, ptr - mp->mp_upper);
1794
1795         mp->mp_lower -= sizeof(indx_t);
1796         mp->mp_upper += sz;
1797 }
1798
1799 int
1800 mdb_cursor_open(MDB_db *db, MDB_txn *txn, MDB_cursor **ret)
1801 {
1802         MDB_cursor      *cursor;
1803
1804         if (db == NULL || ret == NULL)
1805                 return EINVAL;
1806
1807         if (txn != NULL && db->md_env != txn->mt_env) {
1808                 return EINVAL;
1809         }
1810
1811         if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
1812                 SLIST_INIT(&cursor->mc_stack);
1813                 cursor->mc_db = db;
1814                 cursor->mc_txn = txn;
1815         }
1816
1817         *ret = cursor;
1818
1819         return MDB_SUCCESS;
1820 }
1821
1822 void
1823 mdb_cursor_close(MDB_cursor *cursor)
1824 {
1825         if (cursor != NULL) {
1826                 while (!CURSOR_EMPTY(cursor))
1827                         cursor_pop_page(cursor);
1828
1829 /*              btree_close(cursor->bt); */
1830                 free(cursor);
1831         }
1832 }
1833
1834 static int
1835 mdb_update_key(MDB_db *db, MDB_page *mp, indx_t indx,
1836     MDB_val *key)
1837 {
1838         indx_t                   ptr, i, numkeys;
1839         int                      delta;
1840         size_t                   len;
1841         MDB_node                *node;
1842         char                    *base;
1843
1844         node = NODEPTR(mp, indx);
1845         ptr = mp->mp_ptrs[indx];
1846         DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %lu",
1847             indx, ptr,
1848             (int)node->mn_ksize, (char *)NODEKEY(node),
1849             (int)key->mv_size, (char *)key->mv_data,
1850             mp->mp_pgno);
1851
1852         delta = key->mv_size - node->mn_ksize;
1853         if (delta) {
1854                 if (delta > 0 && SIZELEFT(mp) < delta) {
1855                         DPRINTF("OUCH! Not enough room, delta = %d", delta);
1856                         return ENOSPC;
1857                 }
1858
1859                 numkeys = NUMKEYS(mp);
1860                 for (i = 0; i < numkeys; i++) {
1861                         if (mp->mp_ptrs[i] <= ptr)
1862                                 mp->mp_ptrs[i] -= delta;
1863                 }
1864
1865                 base = (char *)mp + mp->mp_upper;
1866                 len = ptr - mp->mp_upper + NODESIZE;
1867                 bcopy(base, base - delta, len);
1868                 mp->mp_upper -= delta;
1869
1870                 node = NODEPTR(mp, indx);
1871                 node->mn_ksize = key->mv_size;
1872         }
1873
1874         bcopy(key->mv_data, NODEKEY(node), key->mv_size);
1875
1876         return MDB_SUCCESS;
1877 }
1878
1879 /* Move a node from src to dst.
1880  */
1881 static int
1882 mdb_move_node(MDB_db *bt, MDB_pageparent *src, indx_t srcindx,
1883     MDB_pageparent *dst, indx_t dstindx)
1884 {
1885         int                      rc;
1886         MDB_node                *srcnode;
1887         MDB_val          key, data;
1888
1889         srcnode = NODEPTR(src->mp_page, srcindx);
1890         DPRINTF("moving %s node %u [%.*s] on page %lu to node %u on page %lu",
1891             IS_LEAF(src->mp_page) ? "leaf" : "branch",
1892             srcindx,
1893             (int)srcnode->mn_ksize, (char *)NODEKEY(srcnode),
1894             src->mp_page->mp_pgno,
1895             dstindx, dst->mp_page->mp_pgno);
1896
1897         /* Mark src and dst as dirty. */
1898         if ((rc = mdb_touch(bt->md_env->me_txn, src)) ||
1899             (rc = mdb_touch(bt->md_env->me_txn, dst)))
1900                 return rc;;
1901
1902         /* Add the node to the destination page.
1903          */
1904         key.mv_size = srcnode->mn_ksize;
1905         key.mv_data = NODEKEY(srcnode);
1906         data.mv_size = NODEDSZ(srcnode);
1907         data.mv_data = NODEDATA(srcnode);
1908         rc = mdb_add_node(bt, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode),
1909             srcnode->mn_flags);
1910         if (rc != MDB_SUCCESS)
1911                 return rc;
1912
1913         /* Delete the node from the source page.
1914          */
1915         mdb_del_node(bt, src->mp_page, srcindx);
1916
1917         /* Update the parent separators.
1918          */
1919         if (srcindx == 0 && src->mp_pi != 0) {
1920                 DPRINTF("update separator for source page %lu to [%.*s]",
1921                     src->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data);
1922                 if ((rc = mdb_update_key(bt, src->mp_parent, src->mp_pi,
1923                     &key)) != MDB_SUCCESS)
1924                         return rc;
1925         }
1926
1927         if (srcindx == 0 && IS_BRANCH(src->mp_page)) {
1928                 MDB_val  nullkey;
1929                 nullkey.mv_size = 0;
1930                 assert(mdb_update_key(bt, src->mp_page, 0, &nullkey) == MDB_SUCCESS);
1931         }
1932
1933         if (dstindx == 0 && dst->mp_pi != 0) {
1934                 DPRINTF("update separator for destination page %lu to [%.*s]",
1935                     dst->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data);
1936                 if ((rc = mdb_update_key(bt, dst->mp_parent, dst->mp_pi,
1937                     &key)) != MDB_SUCCESS)
1938                         return rc;
1939         }
1940
1941         if (dstindx == 0 && IS_BRANCH(dst->mp_page)) {
1942                 MDB_val  nullkey;
1943                 nullkey.mv_size = 0;
1944                 assert(mdb_update_key(bt, dst->mp_page, 0, &nullkey) == MDB_SUCCESS);
1945         }
1946
1947         return MDB_SUCCESS;
1948 }
1949
1950 static int
1951 mdb_merge(MDB_db *bt, MDB_pageparent *src, MDB_pageparent *dst)
1952 {
1953         int                      rc;
1954         indx_t                   i;
1955         MDB_node                *srcnode;
1956         MDB_val          key, data;
1957         MDB_pageparent  mpp;
1958         MDB_dhead *dh;
1959
1960         DPRINTF("merging page %lu and %lu", src->mp_page->mp_pgno, dst->mp_page->mp_pgno);
1961
1962         assert(src->mp_parent); /* can't merge root page */
1963         assert(dst->mp_parent);
1964         assert(bt->md_env->me_txn != NULL);
1965
1966         /* Mark src and dst as dirty. */
1967         if ((rc = mdb_touch(bt->md_env->me_txn, src)) ||
1968             (rc = mdb_touch(bt->md_env->me_txn, dst)))
1969                 return rc;
1970
1971         /* Move all nodes from src to dst.
1972          */
1973         for (i = 0; i < NUMKEYS(src->mp_page); i++) {
1974                 srcnode = NODEPTR(src->mp_page, i);
1975
1976                 key.mv_size = srcnode->mn_ksize;
1977                 key.mv_data = NODEKEY(srcnode);
1978                 data.mv_size = NODEDSZ(srcnode);
1979                 data.mv_data = NODEDATA(srcnode);
1980                 rc = mdb_add_node(bt, dst->mp_page, NUMKEYS(dst->mp_page), &key,
1981                     &data, NODEPGNO(srcnode), srcnode->mn_flags);
1982                 if (rc != MDB_SUCCESS)
1983                         return rc;
1984         }
1985
1986         DPRINTF("dst page %lu now has %u keys (%.1f%% filled)",
1987             dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(bt->md_env, dst->mp_page) / 10);
1988
1989         /* Unlink the src page from parent.
1990          */
1991         mdb_del_node(bt, src->mp_parent, src->mp_pi);
1992         if (src->mp_pi == 0) {
1993                 key.mv_size = 0;
1994                 if ((rc = mdb_update_key(bt, src->mp_parent, 0, &key)) != MDB_SUCCESS)
1995                         return rc;
1996         }
1997
1998         if (IS_LEAF(src->mp_page))
1999                 bt->md_env->me_meta.mm_stat.ms_leaf_pages--;
2000         else
2001                 bt->md_env->me_meta.mm_stat.ms_branch_pages--;
2002
2003         mpp.mp_page = src->mp_parent;
2004         dh = (MDB_dhead *)src->mp_parent;
2005         dh--;
2006         mpp.mp_parent = dh->md_parent;
2007         mpp.mp_pi = dh->md_pi;
2008
2009         return mdb_rebalance(bt, &mpp);
2010 }
2011
2012 #define FILL_THRESHOLD   250
2013
2014 static int
2015 mdb_rebalance(MDB_db *db, MDB_pageparent *mpp)
2016 {
2017         MDB_node        *node;
2018         MDB_page        *root;
2019         MDB_pageparent npp;
2020         indx_t           si = 0, di = 0;
2021
2022         assert(db != NULL);
2023         assert(db->md_env->me_txn != NULL);
2024         assert(mpp != NULL);
2025
2026         DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)",
2027             IS_LEAF(mpp->mp_page) ? "leaf" : "branch",
2028             mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(db->md_env, mpp->mp_page) / 10);
2029
2030         if (PAGEFILL(db->md_env, mpp->mp_page) >= FILL_THRESHOLD) {
2031                 DPRINTF("no need to rebalance page %lu, above fill threshold",
2032                     mpp->mp_page->mp_pgno);
2033                 return MDB_SUCCESS;
2034         }
2035
2036         if (mpp->mp_parent == NULL) {
2037                 if (NUMKEYS(mpp->mp_page) == 0) {
2038                         DPRINTF("tree is completely empty");
2039                         db->md_env->me_txn->mt_root = P_INVALID;
2040                         db->md_env->me_meta.mm_stat.ms_depth--;
2041                         db->md_env->me_meta.mm_stat.ms_leaf_pages--;
2042                 } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
2043                         DPRINTF("collapsing root page!");
2044                         db->md_env->me_txn->mt_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
2045                         if ((root = mdbenv_get_page(db->md_env, db->md_env->me_txn->mt_root)) == NULL)
2046                                 return MDB_FAIL;
2047                         db->md_env->me_meta.mm_stat.ms_depth--;
2048                         db->md_env->me_meta.mm_stat.ms_branch_pages--;
2049                 } else
2050                         DPRINTF("root page doesn't need rebalancing");
2051                 return MDB_SUCCESS;
2052         }
2053
2054         /* The parent (branch page) must have at least 2 pointers,
2055          * otherwise the tree is invalid.
2056          */
2057         assert(NUMKEYS(mpp->mp_parent) > 1);
2058
2059         /* Leaf page fill factor is below the threshold.
2060          * Try to move keys from left or right neighbor, or
2061          * merge with a neighbor page.
2062          */
2063
2064         /* Find neighbors.
2065          */
2066         if (mpp->mp_pi == 0) {
2067                 /* We're the leftmost leaf in our parent.
2068                  */
2069                 DPRINTF("reading right neighbor");
2070                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
2071                 if ((npp.mp_page = mdbenv_get_page(db->md_env, NODEPGNO(node))) == NULL)
2072                         return MDB_FAIL;
2073                 npp.mp_pi = mpp->mp_pi + 1;
2074                 si = 0;
2075                 di = NUMKEYS(mpp->mp_page);
2076         } else {
2077                 /* There is at least one neighbor to the left.
2078                  */
2079                 DPRINTF("reading left neighbor");
2080                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
2081                 if ((npp.mp_page = mdbenv_get_page(db->md_env, NODEPGNO(node))) == NULL)
2082                         return MDB_FAIL;
2083                 npp.mp_pi = mpp->mp_pi - 1;
2084                 si = NUMKEYS(npp.mp_page) - 1;
2085                 di = 0;
2086         }
2087         npp.mp_parent = mpp->mp_parent;
2088
2089         DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)",
2090             npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(db->md_env, npp.mp_page) / 10);
2091
2092         /* If the neighbor page is above threshold and has at least two
2093          * keys, move one key from it.
2094          *
2095          * Otherwise we should try to merge them.
2096          */
2097         if (PAGEFILL(db->md_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2)
2098                 return mdb_move_node(db, &npp, si, mpp, di);
2099         else { /* FIXME: if (has_enough_room()) */
2100                 if (mpp->mp_pi == 0)
2101                         return mdb_merge(db, &npp, mpp);
2102                 else
2103                         return mdb_merge(db, mpp, &npp);
2104         }
2105 }
2106
2107 int
2108 mdb_del(MDB_db *bt, MDB_txn *txn,
2109     MDB_val *key, MDB_val *data)
2110 {
2111         int              rc, exact;
2112         unsigned int     ki;
2113         MDB_node        *leaf;
2114         MDB_pageparent  mpp;
2115
2116         DPRINTF("========> delete key %.*s", (int)key->mv_size, (char *)key->mv_data);
2117
2118         assert(key != NULL);
2119
2120         if (bt == NULL || txn == NULL)
2121                 return EINVAL;
2122
2123         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
2124                 return EINVAL;
2125         }
2126
2127         if (bt->md_env->me_txn != txn) {
2128                 return EINVAL;
2129         }
2130
2131         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2132                 return EINVAL;
2133         }
2134
2135         if ((rc = mdb_search_page(bt, txn, key, NULL, 1, &mpp)) != MDB_SUCCESS)
2136                 return rc;
2137
2138         leaf = mdb_search_node(bt, mpp.mp_page, key, &exact, &ki);
2139         if (leaf == NULL || !exact) {
2140                 return ENOENT;
2141         }
2142
2143         if (data && (rc = mdb_read_data(bt, NULL, leaf, data)) != MDB_SUCCESS)
2144                 return rc;
2145
2146         mdb_del_node(bt, mpp.mp_page, ki);
2147         /* add overflow pages to free list */
2148         if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
2149                 int i, ovpages;
2150                 pgno_t pg;
2151
2152                 bcopy(NODEDATA(leaf), &pg, sizeof(pg));
2153                 ovpages = OVPAGES(NODEDSZ(leaf), bt->md_env->me_meta.mm_stat.ms_psize);
2154                 for (i=0; i<ovpages; i++) {
2155                         mdb_idl_insert(txn->mt_free_pgs, pg);
2156                         pg++;
2157                 }
2158         }
2159         bt->md_env->me_meta.mm_stat.ms_entries--;
2160         rc = mdb_rebalance(bt, &mpp);
2161         if (rc != MDB_SUCCESS)
2162                 txn->mt_flags |= MDB_TXN_ERROR;
2163
2164         return rc;
2165 }
2166
2167 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
2168  * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
2169  * *newindxp with the actual values after split, ie if *mpp and *newindxp
2170  * refer to a node in the new right sibling page.
2171  */
2172 static int
2173 mdb_split(MDB_db *bt, MDB_page **mpp, unsigned int *newindxp,
2174     MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
2175 {
2176         uint8_t          flags;
2177         int              rc = MDB_SUCCESS, ins_new = 0;
2178         indx_t           newindx;
2179         pgno_t           pgno = 0;
2180         unsigned int     i, j, split_indx;
2181         MDB_node        *node;
2182         MDB_val  sepkey, rkey, rdata;
2183         MDB_page        *copy;
2184         MDB_dpage       *mdp, *rdp, *pdp;
2185         MDB_dhead *dh;
2186
2187         assert(bt != NULL);
2188         assert(bt->md_env != NULL);
2189
2190         dh = ((MDB_dhead *)*mpp) - 1;
2191         mdp = (MDB_dpage *)dh;
2192         newindx = *newindxp;
2193
2194         DPRINTF("-----> splitting %s page %lu and adding [%.*s] at index %i",
2195             IS_LEAF(&mdp->p) ? "leaf" : "branch", mdp->p.mp_pgno,
2196             (int)newkey->mv_size, (char *)newkey->mv_data, *newindxp);
2197
2198         if (mdp->h.md_parent == NULL) {
2199                 if ((pdp = mdbenv_new_page(bt->md_env, P_BRANCH, 1)) == NULL)
2200                         return MDB_FAIL;
2201                 mdp->h.md_pi = 0;
2202                 mdp->h.md_parent = &pdp->p;
2203                 bt->md_env->me_txn->mt_root = pdp->p.mp_pgno;
2204                 DPRINTF("root split! new root = %lu", pdp->p.mp_pgno);
2205                 bt->md_env->me_meta.mm_stat.ms_depth++;
2206
2207                 /* Add left (implicit) pointer. */
2208                 if (mdb_add_node(bt, &pdp->p, 0, NULL, NULL,
2209                     mdp->p.mp_pgno, 0) != MDB_SUCCESS)
2210                         return MDB_FAIL;
2211         } else {
2212                 DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
2213         }
2214
2215         /* Create a right sibling. */
2216         if ((rdp = mdbenv_new_page(bt->md_env, mdp->p.mp_flags, 1)) == NULL)
2217                 return MDB_FAIL;
2218         rdp->h.md_parent = mdp->h.md_parent;
2219         rdp->h.md_pi = mdp->h.md_pi + 1;
2220         DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
2221
2222         /* Move half of the keys to the right sibling. */
2223         if ((copy = malloc(bt->md_env->me_meta.mm_stat.ms_psize)) == NULL)
2224                 return MDB_FAIL;
2225         bcopy(&mdp->p, copy, bt->md_env->me_meta.mm_stat.ms_psize);
2226         bzero(&mdp->p.mp_ptrs, bt->md_env->me_meta.mm_stat.ms_psize - PAGEHDRSZ);
2227         mdp->p.mp_lower = PAGEHDRSZ;
2228         mdp->p.mp_upper = bt->md_env->me_meta.mm_stat.ms_psize;
2229
2230         split_indx = NUMKEYS(copy) / 2 + 1;
2231
2232         /* First find the separating key between the split pages.
2233          */
2234         bzero(&sepkey, sizeof(sepkey));
2235         if (newindx == split_indx) {
2236                 sepkey.mv_size = newkey->mv_size;
2237                 sepkey.mv_data = newkey->mv_data;
2238         } else {
2239                 node = NODEPTR(copy, split_indx);
2240                 sepkey.mv_size = node->mn_ksize;
2241                 sepkey.mv_data = NODEKEY(node);
2242         }
2243
2244         DPRINTF("separator is [%.*s]", (int)sepkey.mv_size, (char *)sepkey.mv_data);
2245
2246         /* Copy separator key to the parent.
2247          */
2248         if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(bt, &sepkey)) {
2249                 rc = mdb_split(bt, &rdp->h.md_parent, &rdp->h.md_pi,
2250                     &sepkey, NULL, rdp->p.mp_pgno);
2251
2252                 /* Right page might now have changed parent.
2253                  * Check if left page also changed parent.
2254                  */
2255                 if (rdp->h.md_parent != mdp->h.md_parent &&
2256                     mdp->h.md_pi >= NUMKEYS(mdp->h.md_parent)) {
2257                         mdp->h.md_parent = rdp->h.md_parent;
2258                         mdp->h.md_pi = rdp->h.md_pi - 1;
2259                 }
2260         } else {
2261                 rc = mdb_add_node(bt, rdp->h.md_parent, rdp->h.md_pi,
2262                     &sepkey, NULL, rdp->p.mp_pgno, 0);
2263         }
2264         if (rc != MDB_SUCCESS) {
2265                 free(copy);
2266                 return MDB_FAIL;
2267         }
2268
2269         for (i = j = 0; i <= NUMKEYS(copy); j++) {
2270                 if (i < split_indx) {
2271                         /* Re-insert in left sibling. */
2272                         pdp = mdp;
2273                 } else {
2274                         /* Insert in right sibling. */
2275                         if (i == split_indx)
2276                                 /* Reset insert index for right sibling. */
2277                                 j = (i == newindx && ins_new);
2278                         pdp = rdp;
2279                 }
2280
2281                 if (i == newindx && !ins_new) {
2282                         /* Insert the original entry that caused the split. */
2283                         rkey.mv_data = newkey->mv_data;
2284                         rkey.mv_size = newkey->mv_size;
2285                         if (IS_LEAF(&mdp->p)) {
2286                                 rdata.mv_data = newdata->mv_data;
2287                                 rdata.mv_size = newdata->mv_size;
2288                         } else
2289                                 pgno = newpgno;
2290                         flags = 0;
2291
2292                         ins_new = 1;
2293
2294                         /* Update page and index for the new key. */
2295                         *newindxp = j;
2296                         *mpp = &pdp->p;
2297                 } else if (i == NUMKEYS(copy)) {
2298                         break;
2299                 } else {
2300                         node = NODEPTR(copy, i);
2301                         rkey.mv_data = NODEKEY(node);
2302                         rkey.mv_size = node->mn_ksize;
2303                         if (IS_LEAF(&mdp->p)) {
2304                                 rdata.mv_data = NODEDATA(node);
2305                                 rdata.mv_size = node->mn_dsize;
2306                         } else
2307                                 pgno = node->mn_pgno;
2308                         flags = node->mn_flags;
2309
2310                         i++;
2311                 }
2312
2313                 if (!IS_LEAF(&mdp->p) && j == 0) {
2314                         /* First branch index doesn't need key data. */
2315                         rkey.mv_size = 0;
2316                 }
2317
2318                 rc = mdb_add_node(bt, &pdp->p, j, &rkey, &rdata, pgno,flags);
2319         }
2320
2321         free(copy);
2322         return rc;
2323 }
2324
2325 int
2326 mdb_put(MDB_db *bt, MDB_txn *txn,
2327     MDB_val *key, MDB_val *data, unsigned int flags)
2328 {
2329         int              rc = MDB_SUCCESS, exact;
2330         unsigned int     ki;
2331         MDB_node        *leaf;
2332         MDB_pageparent  mpp;
2333
2334         assert(key != NULL);
2335         assert(data != NULL);
2336
2337         if (bt == NULL || txn == NULL)
2338                 return EINVAL;
2339
2340         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
2341                 return EINVAL;
2342         }
2343
2344         if (bt->md_env->me_txn != txn) {
2345                 return EINVAL;
2346         }
2347
2348         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2349                 return EINVAL;
2350         }
2351
2352         DPRINTF("==> put key %.*s, size %zu, data size %zu",
2353                 (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
2354
2355         rc = mdb_search_page(bt, txn, key, NULL, 1, &mpp);
2356         if (rc == MDB_SUCCESS) {
2357                 leaf = mdb_search_node(bt, mpp.mp_page, key, &exact, &ki);
2358                 if (leaf && exact) {
2359                         if (F_ISSET(flags, MDB_NOOVERWRITE)) {
2360                                 DPRINTF("duplicate key %.*s",
2361                                     (int)key->mv_size, (char *)key->mv_data);
2362                                 return EEXIST;
2363                         }
2364                         mdb_del_node(bt, mpp.mp_page, ki);
2365                 }
2366                 if (leaf == NULL) {             /* append if not found */
2367                         ki = NUMKEYS(mpp.mp_page);
2368                         DPRINTF("appending key at index %i", ki);
2369                 }
2370         } else if (rc == ENOENT) {
2371                 MDB_dpage *dp;
2372                 /* new file, just write a root leaf page */
2373                 DPRINTF("allocating new root leaf page");
2374                 if ((dp = mdbenv_new_page(bt->md_env, P_LEAF, 1)) == NULL) {
2375                         return ENOMEM;
2376                 }
2377                 mpp.mp_page = &dp->p;
2378                 txn->mt_root = mpp.mp_page->mp_pgno;
2379                 bt->md_env->me_meta.mm_stat.ms_depth++;
2380                 ki = 0;
2381         }
2382         else
2383                 goto done;
2384
2385         assert(IS_LEAF(mpp.mp_page));
2386         DPRINTF("there are %u keys, should insert new key at index %i",
2387                 NUMKEYS(mpp.mp_page), ki);
2388
2389         if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(bt, key, data)) {
2390                 rc = mdb_split(bt, &mpp.mp_page, &ki, key, data, P_INVALID);
2391         } else {
2392                 /* There is room already in this leaf page. */
2393                 rc = mdb_add_node(bt, mpp.mp_page, ki, key, data, 0, 0);
2394         }
2395
2396         if (rc != MDB_SUCCESS)
2397                 txn->mt_flags |= MDB_TXN_ERROR;
2398         else
2399                 bt->md_env->me_meta.mm_stat.ms_entries++;
2400
2401 done:
2402         return rc;
2403 }
2404
2405 int
2406 mdbenv_get_flags(MDB_env *env, unsigned int *arg)
2407 {
2408         if (!env || !arg)
2409                 return EINVAL;
2410
2411         *arg = env->me_flags;
2412         return MDB_SUCCESS;
2413 }
2414
2415 int
2416 mdbenv_get_path(MDB_env *env, const char **arg)
2417 {
2418         if (!env || !arg)
2419                 return EINVAL;
2420
2421         *arg = env->me_path;
2422         return MDB_SUCCESS;
2423 }
2424
2425 int
2426 mdbenv_stat(MDB_env *env, MDB_stat **arg)
2427 {
2428         if (env == NULL || arg == NULL)
2429                 return EINVAL;
2430
2431         *arg = &env->me_meta.mm_stat;
2432
2433         return MDB_SUCCESS;
2434 }
2435
2436 int mdb_open(MDB_env *env, MDB_txn *txn, const char *name, unsigned int flags, MDB_db **db)
2437 {
2438         if (!name) {
2439                 *db = (MDB_db *)&env->me_db;
2440                 return MDB_SUCCESS;
2441         }
2442         return EINVAL;
2443 }
2444
2445 int mdb_stat(MDB_db *db, MDB_stat **arg)
2446 {
2447         if (db == NULL || arg == NULL)
2448                 return EINVAL;
2449
2450         *arg = &db->md_stat;
2451
2452         return MDB_SUCCESS;
2453 }
2454
2455 void mdb_close(MDB_db *db)
2456 {
2457 }