]> git.sur5r.net Git - openldap/blob - libraries/libmdb/mdb.c
5715eac78c3d4f9e82cb086f0354a95532f735df
[openldap] / libraries / libmdb / mdb.c
1 #include <sys/types.h>
2 #include <sys/stat.h>
3 #include <sys/queue.h>
4 #include <sys/param.h>
5 #include <sys/uio.h>
6 #include <sys/mman.h>
7 #ifdef HAVE_SYS_FILE_H
8 #include <sys/file.h>
9 #endif
10 #include <fcntl.h>
11
12 #include <assert.h>
13 #include <err.h>
14 #include <errno.h>
15 #include <fcntl.h>
16 #include <stddef.h>
17 #include <stdint.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <pthread.h>
24
25 #include "mdb.h"
26
27 #define ULONG           unsigned long
28 typedef ULONG           pgno_t;
29
30 #include "idl.h"
31
32 #ifndef DEBUG
33 #define DEBUG 1
34 #endif
35
36 #if (DEBUG +0) && defined(__GNUC__)
37 # define DPRINTF(fmt, ...) \
38         fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, ##__VA_ARGS__)
39 #else
40 # define DPRINTF(...)   ((void) 0)
41 #endif
42
43 #define PAGESIZE         4096
44 #define MDB_MINKEYS      4
45 #define MDB_MAGIC        0xBEEFC0DE
46 #define MDB_VERSION      1
47 #define MAXKEYSIZE       255
48
49 #define P_INVALID        (~0UL)
50
51 #define F_ISSET(w, f)    (((w) & (f)) == (f))
52
53 typedef uint16_t         indx_t;
54
55 #define DEFAULT_READERS 126
56 #define DEFAULT_MAPSIZE 1048576
57
58 /* Lock descriptor stuff */
59 #define RXBODY  \
60         ULONG           mr_txnid; \
61         pid_t           mr_pid; \
62         pthread_t       mr_tid
63 typedef struct MDB_rxbody {
64         RXBODY;
65 } MDB_rxbody;
66
67 #ifndef CACHELINE
68 #define CACHELINE       64      /* most CPUs. Itanium uses 128 */
69 #endif
70
71 typedef struct MDB_reader {
72         RXBODY;
73         /* cache line alignment */
74         char pad[CACHELINE-sizeof(MDB_rxbody)];
75 } MDB_reader;
76
77 #define TXBODY \
78         uint32_t        mt_magic;       \
79         uint32_t        mt_version;     \
80         pthread_mutex_t mt_mutex;       \
81         ULONG           mt_txnid;       \
82         uint32_t        mt_numreaders
83 typedef struct MDB_txbody {
84         TXBODY;
85 } MDB_txbody;
86
87 typedef struct MDB_txninfo {
88         TXBODY;
89         char pad[CACHELINE-sizeof(MDB_txbody)];
90         pthread_mutex_t mt_wmutex;
91         char pad2[CACHELINE-sizeof(pthread_mutex_t)];
92         MDB_reader      mt_readers[1];
93 } MDB_txninfo;
94
95 /* Common header for all page types. Overflow pages
96  * occupy a number of contiguous pages with no
97  * headers on any page after the first.
98  */
99 typedef struct MDB_page {               /* represents a page of storage */
100         pgno_t          mp_pgno;                /* page number */
101 #define P_BRANCH         0x01           /* branch page */
102 #define P_LEAF           0x02           /* leaf page */
103 #define P_OVERFLOW       0x04           /* overflow page */
104 #define P_META           0x08           /* meta page */
105 #define P_DIRTY          0x10           /* dirty page */
106         uint32_t        mp_flags;
107 #define mp_lower        mp_pb.pb.pb_lower
108 #define mp_upper        mp_pb.pb.pb_upper
109 #define mp_pages        mp_pb.pb_pages
110         union page_bounds {
111                 struct {
112                         indx_t          pb_lower;               /* lower bound of free space */
113                         indx_t          pb_upper;               /* upper bound of free space */
114                 } pb;
115                 uint32_t        pb_pages;       /* number of overflow pages */
116         } mp_pb;
117         indx_t          mp_ptrs[1];             /* dynamic size */
118 } MDB_page;
119
120 #define PAGEHDRSZ        ((unsigned) offsetof(MDB_page, mp_ptrs))
121
122 #define NUMKEYS(p)       (((p)->mp_lower - PAGEHDRSZ) >> 1)
123 #define SIZELEFT(p)      (indx_t)((p)->mp_upper - (p)->mp_lower)
124 #define PAGEFILL(env, p) (1000L * ((env)->me_meta.mm_psize - PAGEHDRSZ - SIZELEFT(p)) / \
125                                 ((env)->me_meta.mm_psize - PAGEHDRSZ))
126 #define IS_LEAF(p)       F_ISSET((p)->mp_flags, P_LEAF)
127 #define IS_BRANCH(p)     F_ISSET((p)->mp_flags, P_BRANCH)
128 #define IS_OVERFLOW(p)   F_ISSET((p)->mp_flags, P_OVERFLOW)
129
130 #define OVPAGES(size, psize)    (PAGEHDRSZ + size + psize - 1) / psize;
131
132 typedef struct MDB_meta {                       /* meta (footer) page content */
133         uint32_t        mm_magic;
134         uint32_t        mm_version;
135         void            *mm_address;            /* address for fixed mapping */
136         size_t          mm_mapsize;                     /* size of mmap region */
137         pgno_t          mm_last_pg;                     /* last used page in file */
138         ULONG           mm_txnid;                       /* txnid that committed this page */
139         uint32_t        mm_psize;
140         uint16_t        mm_flags;
141         uint16_t        mm_depth;
142         ULONG           mm_branch_pages;
143         ULONG           mm_leaf_pages;
144         ULONG           mm_overflow_pages;
145         ULONG           mm_entries;
146         pgno_t          mm_root;
147 } MDB_meta;
148
149 typedef struct MDB_dhead {                                      /* a dirty page */
150         SIMPLEQ_ENTRY(MDB_dpage)         md_next;       /* queue of dirty pages */
151         MDB_page        *md_parent;
152         unsigned        md_pi;                          /* parent index */
153         int                     md_num;
154 } MDB_dhead;
155
156 typedef struct MDB_dpage {
157         MDB_dhead       h;
158         MDB_page        p;
159 } MDB_dpage;
160
161 SIMPLEQ_HEAD(dirty_queue, MDB_dpage);
162
163 typedef struct MDB_oldpages {
164         struct MDB_oldpages *mo_next;
165         ULONG           mo_txnid;
166         pgno_t          mo_pages[1];    /* dynamic */
167 } MDB_oldpages;
168
169 typedef struct MDB_pageparent {
170         MDB_page *mp_page;
171         MDB_page *mp_parent;
172         unsigned mp_pi;
173 } MDB_pageparent;
174
175 static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num);
176 static int              mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
177
178 typedef struct MDB_ppage {                                      /* ordered list of pages */
179         SLIST_ENTRY(MDB_ppage)   mp_entry;
180         MDB_page                *mp_page;
181         unsigned int    mp_ki;          /* cursor index on page */
182 } MDB_ppage;
183 SLIST_HEAD(page_stack, MDB_ppage);
184
185 #define CURSOR_EMPTY(c)          SLIST_EMPTY(&(c)->mc_stack)
186 #define CURSOR_TOP(c)            SLIST_FIRST(&(c)->mc_stack)
187 #define CURSOR_POP(c)            SLIST_REMOVE_HEAD(&(c)->mc_stack, mp_entry)
188 #define CURSOR_PUSH(c,p)         SLIST_INSERT_HEAD(&(c)->mc_stack, p, mp_entry)
189
190 struct MDB_cursor {
191         MDB_txn         *mc_txn;
192         struct page_stack        mc_stack;              /* stack of parent pages */
193         MDB_dbi         mc_dbi;
194         short           mc_initialized; /* 1 if initialized */
195         short           mc_eof;         /* 1 if end is reached */
196 };
197
198 #define METAHASHLEN      offsetof(MDB_meta, mm_hash)
199 #define METADATA(p)      ((void *)((char *)p + PAGEHDRSZ))
200
201 typedef struct MDB_node {
202 #define mn_pgno          mn_p.np_pgno
203 #define mn_dsize         mn_p.np_dsize
204         union {
205                 pgno_t           np_pgno;       /* child page number */
206                 uint32_t         np_dsize;      /* leaf data size */
207         } mn_p;
208         unsigned int    mn_flags:4;
209         unsigned int    mn_ksize:12;                    /* key size */
210 #define F_BIGDATA        0x01                   /* data put on overflow page */
211         char            mn_data[1];
212 } MDB_node;
213
214 typedef struct MDB_dbx {
215         char            *md_name;
216         MDB_cmp_func    *md_cmp;                /* user compare function */
217         MDB_rel_func    *md_rel;                /* user relocate function */
218 } MDB_dbx;
219
220 typedef struct MDB_db {
221         uint32_t        md_pad;
222         uint16_t        md_flags;
223         uint16_t        md_depth;
224         ULONG           md_branch_pages;
225         ULONG           md_leaf_pages;
226         ULONG           md_overflow_pages;
227         ULONG           md_entries;
228         pgno_t          md_root;
229 } MDB_db;
230
231 struct MDB_txn {
232         pgno_t          mt_next_pgno;   /* next unallocated page */
233         ULONG           mt_txnid;
234         ULONG           mt_oldest;
235         MDB_env         *mt_env;        
236         pgno_t          *mt_free_pgs;   /* this is an IDL */
237         union {
238                 struct dirty_queue      *dirty_queue;   /* modified pages */
239                 MDB_reader      *reader;
240         } mt_u;
241         MDB_dbx         *mt_dbxs;               /* array */
242         MDB_db          **mt_dbs;               /* array of ptrs */
243         MDB_db          mt_db0;                 /* just for write txns */
244         unsigned int    mt_numdbs;
245
246 #define MDB_TXN_RDONLY           0x01           /* read-only transaction */
247 #define MDB_TXN_ERROR            0x02           /* an error has occurred */
248 #define MDB_TXN_METOGGLE        0x04            /* used meta page 1 */
249         unsigned int             mt_flags;
250 };
251
252 struct MDB_env {
253         int                     me_fd;
254         int                     me_lfd;
255         uint32_t        me_flags;
256         unsigned int                    me_maxreaders;
257         char            *me_path;
258         char            *me_map;
259         MDB_txninfo     *me_txns;
260         MDB_meta        me_meta;
261         MDB_txn         *me_txn;                /* current write transaction */
262         size_t          me_mapsize;
263         off_t           me_size;                /* current file size */
264         pthread_key_t   me_txkey;       /* thread-key for readers */
265         MDB_oldpages *me_pghead;
266         MDB_oldpages *me_pgtail;
267         MDB_dbx         *me_dbxs;               /* array */
268         MDB_db          **me_dbs;               /* array of ptrs */
269         unsigned int    me_numdbs;
270 };
271
272 #define NODESIZE         offsetof(MDB_node, mn_data)
273
274 #define INDXSIZE(k)      (NODESIZE + ((k) == NULL ? 0 : (k)->mv_size))
275 #define LEAFSIZE(k, d)   (NODESIZE + (k)->mv_size + (d)->mv_size)
276 #define NODEPTR(p, i)    ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
277 #define NODEKEY(node)    (void *)((node)->mn_data)
278 #define NODEDATA(node)   (void *)((char *)(node)->mn_data + (node)->mn_ksize)
279 #define NODEPGNO(node)   ((node)->mn_pgno)
280 #define NODEDSZ(node)    ((node)->mn_dsize)
281
282 #define MDB_COMMIT_PAGES         64     /* max number of pages to write in one commit */
283 #define MDB_MAXCACHE_DEF         1024   /* max number of pages to keep in cache  */
284
285 static int  mdb_search_page_root(MDB_txn *txn,
286                             MDB_dbi dbi, MDB_val *key,
287                             MDB_cursor *cursor, int modify,
288                             MDB_pageparent *mpp);
289 static int  mdb_search_page(MDB_txn *txn,
290                             MDB_dbi dbi, MDB_val *key,
291                             MDB_cursor *cursor, int modify,
292                             MDB_pageparent *mpp);
293
294 static int  mdbenv_read_header(MDB_env *env);
295 static int  mdb_check_meta_page(MDB_page *p);
296 static int  mdbenv_read_meta(MDB_env *env, int *which);
297 static int  mdbenv_write_meta(MDB_txn *txn);
298 static MDB_page *mdbenv_get_page(MDB_env *env, pgno_t pgno);
299
300 static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
301                             MDB_val *key, int *exactp, unsigned int *kip);
302 static int  mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
303                             indx_t indx, MDB_val *key, MDB_val *data,
304                             pgno_t pgno, uint8_t flags);
305 static void mdb_del_node(MDB_page *mp, indx_t indx);
306 static int  mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data);
307
308 static int               mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
309 static int               mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
310 static int               mdb_move_node(MDB_txn *txn, MDB_dbi dbi, 
311                                 MDB_pageparent *src, indx_t srcindx,
312                                 MDB_pageparent *dst, indx_t dstindx);
313 static int               mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src,
314                             MDB_pageparent *dst);
315 static int               mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp,
316                             unsigned int *newindxp, MDB_val *newkey,
317                             MDB_val *newdata, pgno_t newpgno);
318 static MDB_dpage *mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num);
319
320 static void              cursor_pop_page(MDB_cursor *cursor);
321 static MDB_ppage *cursor_push_page(MDB_cursor *cursor,
322                             MDB_page *mp);
323
324 static int               mdb_set_key(MDB_node *node, MDB_val *key);
325 static int               mdb_sibling(MDB_cursor *cursor, int move_right);
326 static int               mdb_cursor_next(MDB_cursor *cursor,
327                             MDB_val *key, MDB_val *data);
328 static int               mdb_cursor_set(MDB_cursor *cursor,
329                             MDB_val *key, MDB_val *data, int *exactp);
330 static int               mdb_cursor_first(MDB_cursor *cursor,
331                             MDB_val *key, MDB_val *data);
332
333 static size_t            mdb_leaf_size(MDB_env *env, MDB_val *key,
334                             MDB_val *data);
335 static size_t            mdb_branch_size(MDB_env *env, MDB_val *key);
336
337 static int               memncmp(const void *s1, size_t n1,
338                                  const void *s2, size_t n2);
339 static int               memnrcmp(const void *s1, size_t n1,
340                                   const void *s2, size_t n2);
341
342 static int
343 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
344 {
345         int diff, len_diff = -1;
346
347         if (n1 >= n2) {
348                 len_diff = (n1 > n2);
349                 n1 = n2;
350         }
351         diff = memcmp(s1, s2, n1);
352         return diff ? diff : len_diff;
353 }
354
355 static int
356 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
357 {
358         const unsigned char     *p1, *p2, *p1_lim;
359
360         if (n2 == 0)
361                 return n1 != 0;
362         if (n1 == 0)
363                 return -1;
364
365         p1 = (const unsigned char *)s1 + n1 - 1;
366         p2 = (const unsigned char *)s2 + n2 - 1;
367
368         for (p1_lim = (n1 <= n2 ? s1 : s2);  *p1 == *p2;  p1--, p2--) {
369                 if (p1 == p1_lim)
370                         return (p1 != s1) ? (p1 != p2) : (p2 != s2) ? -1 : 0;
371         }
372         return *p1 - *p2;
373 }
374
375 int
376 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
377 {
378         return txn->mt_dbxs[dbi].md_cmp(a, b);
379 }
380
381 static int
382 _mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *key1, const MDB_val *key2)
383 {
384         if (F_ISSET(txn->mt_dbs[dbi]->md_flags, MDB_REVERSEKEY))
385                 return memnrcmp(key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
386         else
387                 return memncmp((char *)key1->mv_data, key1->mv_size, key2->mv_data, key2->mv_size);
388 }
389
390 /* Allocate new page(s) for writing */
391 static MDB_dpage *
392 mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
393 {
394         MDB_dpage *dp;
395         pgno_t pgno = P_INVALID;
396
397         if (txn->mt_env->me_pghead) {
398                 ULONG oldest = txn->mt_txnid - 2;
399                 unsigned int i;
400                 for (i=0; i<txn->mt_env->me_txns->mt_numreaders; i++) {
401                         if (txn->mt_env->me_txns->mt_readers[i].mr_txnid < oldest)
402                                 oldest = txn->mt_env->me_txns->mt_readers[i].mr_txnid;
403                 }
404                 if (oldest > txn->mt_env->me_pghead->mo_txnid) {
405                         MDB_oldpages *mop = txn->mt_env->me_pghead;
406                         txn->mt_oldest = oldest;
407                         if (num > 1) {
408                                 /* FIXME: For now, always use fresh pages. We
409                                  * really ought to search the free list for a
410                                  * contiguous range.
411                                  */
412                                 ;
413                         } else {
414                                 /* peel pages off tail, so we only have to truncate the list */
415                                 pgno = MDB_IDL_LAST(mop->mo_pages);
416                                 if (MDB_IDL_IS_RANGE(mop->mo_pages)) {
417                                         mop->mo_pages[2]++;
418                                         if (mop->mo_pages[2] > mop->mo_pages[1])
419                                                 mop->mo_pages[0] = 0;
420                                 } else {
421                                         mop->mo_pages[0]--;
422                                 }
423                                 if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
424                                         txn->mt_env->me_pghead = mop->mo_next;
425                                         if (!txn->mt_env->me_pghead)
426                                                 txn->mt_env->me_pgtail = NULL;
427                                         free(mop);
428                                 }
429                         }
430                 }
431         }
432
433         if ((dp = malloc(txn->mt_env->me_meta.mm_psize * num + sizeof(MDB_dhead))) == NULL)
434                 return NULL;
435         dp->h.md_num = num;
436         dp->h.md_parent = parent;
437         dp->h.md_pi = parent_idx;
438         SIMPLEQ_INSERT_TAIL(txn->mt_u.dirty_queue, dp, h.md_next);
439         if (pgno == P_INVALID) {
440                 dp->p.mp_pgno = txn->mt_next_pgno;
441                 txn->mt_next_pgno += num;
442         } else {
443                 dp->p.mp_pgno = pgno;
444         }
445
446         return dp;
447 }
448
449 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
450  */
451 static int
452 mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
453 {
454         MDB_page *mp = pp->mp_page;
455         pgno_t  pgno;
456         assert(txn != NULL);
457         assert(pp != NULL);
458
459         if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
460                 MDB_dpage *dp;
461                 if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
462                         return ENOMEM;
463                 DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno);
464                 mdb_idl_insert(txn->mt_free_pgs, mp->mp_pgno);
465                 pgno = dp->p.mp_pgno;
466                 memcpy(&dp->p, mp, txn->mt_env->me_meta.mm_psize);
467                 mp = &dp->p;
468                 mp->mp_pgno = pgno;
469                 mp->mp_flags |= P_DIRTY;
470
471                 /* Update the page number to new touched page. */
472                 if (pp->mp_parent != NULL)
473                         NODEPGNO(NODEPTR(pp->mp_parent, pp->mp_pi)) = mp->mp_pgno;
474                 pp->mp_page = mp;
475         }
476         return 0;
477 }
478
479 int
480 mdbenv_sync(MDB_env *env)
481 {
482         int rc = 0;
483         if (!F_ISSET(env->me_flags, MDB_NOSYNC)) {
484                 if (fsync(env->me_fd))
485                         rc = errno;
486         }
487         return rc;
488 }
489
490 #define DBX_CHUNK       16      /* space for 16 DBs at a time */
491
492 int
493 mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
494 {
495         MDB_txn *txn;
496         int rc, toggle;
497
498         if ((txn = calloc(1, sizeof(*txn))) == NULL) {
499                 DPRINTF("calloc: %s", strerror(errno));
500                 return ENOMEM;
501         }
502
503         if (rdonly) {
504                 txn->mt_flags |= MDB_TXN_RDONLY;
505         } else {
506                 txn->mt_u.dirty_queue = calloc(1, sizeof(*txn->mt_u.dirty_queue));
507                 if (txn->mt_u.dirty_queue == NULL) {
508                         free(txn);
509                         return ENOMEM;
510                 }
511                 SIMPLEQ_INIT(txn->mt_u.dirty_queue);
512
513                 pthread_mutex_lock(&env->me_txns->mt_wmutex);
514                 env->me_txns->mt_txnid++;
515                 txn->mt_free_pgs = malloc(MDB_IDL_UM_SIZEOF);
516                 if (txn->mt_free_pgs == NULL) {
517                         free(txn->mt_u.dirty_queue);
518                         free(txn);
519                         return ENOMEM;
520                 }
521                 txn->mt_free_pgs[0] = 0;
522         }
523
524         txn->mt_txnid = env->me_txns->mt_txnid;
525         if (rdonly) {
526                 MDB_reader *r = pthread_getspecific(env->me_txkey);
527                 if (!r) {
528                         unsigned int i;
529                         pthread_mutex_lock(&env->me_txns->mt_mutex);
530                         for (i=0; i<env->me_maxreaders; i++) {
531                                 if (env->me_txns->mt_readers[i].mr_pid == 0) {
532                                         env->me_txns->mt_readers[i].mr_pid = getpid();
533                                         env->me_txns->mt_readers[i].mr_tid = pthread_self();
534                                         r = &env->me_txns->mt_readers[i];
535                                         pthread_setspecific(env->me_txkey, r);
536                                         if (i >= env->me_txns->mt_numreaders)
537                                                 env->me_txns->mt_numreaders = i+1;
538                                         break;
539                                 }
540                         }
541                         pthread_mutex_unlock(&env->me_txns->mt_mutex);
542                         if (i == env->me_maxreaders) {
543                                 return ENOSPC;
544                         }
545                 }
546                 r->mr_txnid = txn->mt_txnid;
547                 txn->mt_u.reader = r;
548         } else {
549                 env->me_txn = txn;
550         }
551
552         txn->mt_env = env;
553
554         if ((rc = mdbenv_read_meta(env, &toggle)) != MDB_SUCCESS) {
555                 mdb_txn_abort(txn);
556                 return rc;
557         }
558
559         /* Copy the DB arrays */
560         txn->mt_numdbs = env->me_numdbs;
561         rc = (txn->mt_numdbs % DBX_CHUNK) + 1;
562         txn->mt_dbxs = malloc(rc * DBX_CHUNK * sizeof(MDB_dbx));
563         txn->mt_dbs = malloc(rc * DBX_CHUNK * sizeof(MDB_db *));
564         memcpy(txn->mt_dbxs, env->me_dbxs, txn->mt_numdbs * sizeof(MDB_dbx));
565         memcpy(txn->mt_dbs, env->me_dbs, txn->mt_numdbs * sizeof(MDB_db *));
566
567         if (!rdonly) {
568                 memcpy(&txn->mt_db0, txn->mt_dbs[0], sizeof(txn->mt_db0));
569                 txn->mt_dbs[0] = &txn->mt_db0;
570                 if (toggle)
571                         txn->mt_flags |= MDB_TXN_METOGGLE;
572                 txn->mt_next_pgno = env->me_meta.mm_last_pg+1;
573         }
574
575         DPRINTF("begin transaction %lu on mdbenv %p, root page %lu",
576                 txn->mt_txnid, (void *) env, txn->mt_dbs[0]->md_root);
577
578         *ret = txn;
579         return MDB_SUCCESS;
580 }
581
582 void
583 mdb_txn_abort(MDB_txn *txn)
584 {
585         MDB_dpage *dp;
586         MDB_env *env;
587
588         if (txn == NULL)
589                 return;
590
591         env = txn->mt_env;
592         DPRINTF("abort transaction %lu on mdbenv %p, root page %lu",
593                 txn->mt_txnid, (void *) env, txn->mt_dbs[0]->md_root);
594
595         free(txn->mt_dbs);
596         free(txn->mt_dbxs);
597
598         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
599                 txn->mt_u.reader->mr_txnid = 0;
600         } else {
601                 /* Discard all dirty pages. Return any re-used pages
602                  * to the free list.
603                  */
604                 MDB_IDL_ZERO(txn->mt_free_pgs);
605                 while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
606                         dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
607                         SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
608                         if (dp->p.mp_pgno <= env->me_meta.mm_last_pg)
609                                 mdb_idl_insert(txn->mt_free_pgs, dp->p.mp_pgno);
610                         free(dp);
611                 }
612                 /* put back to head of free list */
613                 if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
614                         MDB_oldpages *mop;
615
616                         mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t));
617                         mop->mo_next = env->me_pghead;
618                         mop->mo_txnid = txn->mt_oldest - 1;
619                         if (!env->me_pghead) {
620                                 env->me_pgtail = mop;
621                         }
622                         env->me_pghead = mop;
623                         memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs));
624                 }
625
626                 free(txn->mt_free_pgs);
627                 free(txn->mt_u.dirty_queue);
628                 env->me_txn = NULL;
629                 env->me_txns->mt_txnid--;
630                 pthread_mutex_unlock(&env->me_txns->mt_wmutex);
631         }
632
633         free(txn);
634 }
635
636 int
637 mdb_txn_commit(MDB_txn *txn)
638 {
639         int              n, done;
640         ssize_t          rc;
641         off_t            size;
642         MDB_dpage       *dp;
643         MDB_env *env;
644         pgno_t  next;
645         struct iovec     iov[MDB_COMMIT_PAGES];
646
647         assert(txn != NULL);
648         assert(txn->mt_env != NULL);
649
650         env = txn->mt_env;
651
652         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
653                 DPRINTF("attempt to commit read-only transaction");
654                 mdb_txn_abort(txn);
655                 return EPERM;
656         }
657
658         if (txn != env->me_txn) {
659                 DPRINTF("attempt to commit unknown transaction");
660                 mdb_txn_abort(txn);
661                 return EINVAL;
662         }
663
664         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
665                 DPRINTF("error flag is set, can't commit");
666                 mdb_txn_abort(txn);
667                 return EINVAL;
668         }
669
670         if (SIMPLEQ_EMPTY(txn->mt_u.dirty_queue))
671                 goto done;
672
673         DPRINTF("committing transaction %lu on mdbenv %p, root page %lu",
674             txn->mt_txnid, (void *) env, txn->mt_dbs[0]->md_root);
675
676         /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
677          */
678         next = 0;
679         do {
680                 n = 0;
681                 done = 1;
682                 size = 0;
683                 SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
684                         if (dp->p.mp_pgno != next) {
685                                 if (n) {
686                                         DPRINTF("committing %u dirty pages", n);
687                                         rc = writev(env->me_fd, iov, n);
688                                         if (rc != size) {
689                                                 n = errno;
690                                                 if (rc > 0)
691                                                         DPRINTF("short write, filesystem full?");
692                                                 else
693                                                         DPRINTF("writev: %s", strerror(errno));
694                                                 mdb_txn_abort(txn);
695                                                 return n;
696                                         }
697                                         n = 0;
698                                         size = 0;
699                                 }
700                                 lseek(env->me_fd, dp->p.mp_pgno * env->me_meta.mm_psize, SEEK_SET);
701                                 next = dp->p.mp_pgno;
702                         }
703                         DPRINTF("committing page %lu", dp->p.mp_pgno);
704                         iov[n].iov_len = env->me_meta.mm_psize * dp->h.md_num;
705                         iov[n].iov_base = &dp->p;
706                         size += iov[n].iov_len;
707                         next = dp->p.mp_pgno + dp->h.md_num;
708                         /* clear dirty flag */
709                         dp->p.mp_flags &= ~P_DIRTY;
710                         if (++n >= MDB_COMMIT_PAGES) {
711                                 done = 0;
712                                 break;
713                         }
714                 }
715
716                 if (n == 0)
717                         break;
718
719                 DPRINTF("committing %u dirty pages", n);
720                 rc = writev(env->me_fd, iov, n);
721                 if (rc != size) {
722                         n = errno;
723                         if (rc > 0)
724                                 DPRINTF("short write, filesystem full?");
725                         else
726                                 DPRINTF("writev: %s", strerror(errno));
727                         mdb_txn_abort(txn);
728                         return n;
729                 }
730
731         } while (!done);
732
733         /* Drop the dirty pages.
734          */
735         while (!SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
736                 dp = SIMPLEQ_FIRST(txn->mt_u.dirty_queue);
737                 SIMPLEQ_REMOVE_HEAD(txn->mt_u.dirty_queue, h.md_next);
738                 free(dp);
739         }
740
741         if ((n = mdbenv_sync(env)) != 0 ||
742             (n = mdbenv_write_meta(txn)) != MDB_SUCCESS ||
743             (n = mdbenv_sync(env)) != 0) {
744                 mdb_txn_abort(txn);
745                 return n;
746         }
747         env->me_txn = NULL;
748
749         {
750                 MDB_dbx *p1 = env->me_dbxs;
751                 MDB_db **p2 = env->me_dbs;
752
753                 txn->mt_dbs[0] = env->me_dbs[0];
754                 env->me_dbxs = txn->mt_dbxs;
755                 env->me_dbs = txn->mt_dbs;
756                 env->me_numdbs = txn->mt_numdbs;
757
758                 free(p2);
759                 free(p1);
760         }
761
762         /* add to tail of free list */
763         if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
764                 MDB_oldpages *mop;
765
766                 mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(txn->mt_free_pgs) - sizeof(pgno_t));
767                 mop->mo_next = NULL;
768                 if (env->me_pghead) {
769                         env->me_pgtail->mo_next = mop;
770                 } else {
771                         env->me_pghead = mop;
772                 }
773                 env->me_pgtail = mop;
774                 memcpy(mop->mo_pages, txn->mt_free_pgs, MDB_IDL_SIZEOF(txn->mt_free_pgs));
775                 mop->mo_txnid = txn->mt_txnid;
776         }
777
778         pthread_mutex_unlock(&env->me_txns->mt_wmutex);
779         free(txn->mt_free_pgs);
780         free(txn->mt_u.dirty_queue);
781         free(txn);
782         txn = NULL;
783
784 done:
785         mdb_txn_abort(txn);
786
787         return MDB_SUCCESS;
788 }
789
790 static int
791 mdbenv_read_header(MDB_env *env)
792 {
793         char             page[PAGESIZE];
794         MDB_page        *p;
795         MDB_meta        *m;
796         int              rc;
797
798         assert(env != NULL);
799
800         /* We don't know the page size yet, so use a minimum value.
801          */
802
803         if ((rc = pread(env->me_fd, page, PAGESIZE, 0)) == 0) {
804                 return ENOENT;
805         } else if (rc != PAGESIZE) {
806                 if (rc > 0)
807                         errno = EINVAL;
808                 DPRINTF("read: %s", strerror(errno));
809                 return errno;
810         }
811
812         p = (MDB_page *)page;
813
814         if (!F_ISSET(p->mp_flags, P_META)) {
815                 DPRINTF("page %lu not a meta page", p->mp_pgno);
816                 return EINVAL;
817         }
818
819         m = METADATA(p);
820         if (m->mm_magic != MDB_MAGIC) {
821                 DPRINTF("meta has invalid magic");
822                 return EINVAL;
823         }
824
825         if (m->mm_version != MDB_VERSION) {
826                 DPRINTF("database is version %u, expected version %u",
827                     m->mm_version, MDB_VERSION);
828                 return EINVAL;
829         }
830
831         memcpy(&env->me_meta, m, sizeof(*m));
832         return 0;
833 }
834
835 static int
836 mdbenv_init_meta(MDB_env *env)
837 {
838         MDB_page *p, *q;
839         MDB_meta *meta;
840         int rc;
841         unsigned int     psize;
842
843         DPRINTF("writing new meta page");
844         psize = sysconf(_SC_PAGE_SIZE);
845
846         env->me_meta.mm_magic = MDB_MAGIC;
847         env->me_meta.mm_version = MDB_VERSION;
848         env->me_meta.mm_psize = psize;
849         env->me_meta.mm_flags = env->me_flags & 0xffff;
850         env->me_meta.mm_root = P_INVALID;
851         env->me_meta.mm_last_pg = 1;
852
853         p = calloc(2, psize);
854         p->mp_pgno = 0;
855         p->mp_flags = P_META;
856
857         meta = METADATA(p);
858         memcpy(meta, &env->me_meta, sizeof(*meta));
859
860         q = (MDB_page *)((char *)p + psize);
861
862         q->mp_pgno = 1;
863         q->mp_flags = P_META;
864
865         meta = METADATA(q);
866         memcpy(meta, &env->me_meta, sizeof(*meta));
867
868         rc = write(env->me_fd, p, psize * 2);
869         free(p);
870         return (rc == (int)psize * 2) ? MDB_SUCCESS : errno;
871 }
872
873 static int
874 mdbenv_write_meta(MDB_txn *txn)
875 {
876         MDB_env *env;
877         MDB_meta        meta;
878         off_t off;
879         int rc, len;
880         char *ptr;
881
882         assert(txn != NULL);
883         assert(txn->mt_env != NULL);
884
885         DPRINTF("writing meta page for root page %lu", txn->mt_dbs[0]->md_root);
886
887         env = txn->mt_env;
888
889         ptr = (char *)&meta;
890         off = offsetof(MDB_meta, mm_last_pg);
891         len = sizeof(MDB_meta) - off;
892
893         ptr += off;
894         meta.mm_last_pg = txn->mt_next_pgno - 1;
895         meta.mm_txnid = txn->mt_txnid;
896         meta.mm_psize = env->me_meta.mm_psize;
897         meta.mm_flags = env->me_meta.mm_flags;
898         meta.mm_depth = txn->mt_dbs[0]->md_depth;
899         meta.mm_branch_pages = txn->mt_dbs[0]->md_branch_pages;
900         meta.mm_leaf_pages = txn->mt_dbs[0]->md_leaf_pages;
901         meta.mm_overflow_pages = txn->mt_dbs[0]->md_overflow_pages;
902         meta.mm_entries = txn->mt_dbs[0]->md_entries;
903         meta.mm_root = txn->mt_dbs[0]->md_root;
904
905         if (!F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE))
906                 off += env->me_meta.mm_psize;
907         off += PAGEHDRSZ;
908
909         lseek(env->me_fd, off, SEEK_SET);
910         rc = write(env->me_fd, ptr, len);
911         if (rc != len) {
912                 DPRINTF("write failed, disk error?");
913                 return errno;
914         }
915
916         return MDB_SUCCESS;
917 }
918
919 /* Returns true if page p is a valid meta page, false otherwise.
920  */
921 static int
922 mdb_check_meta_page(MDB_page *p)
923 {
924         if (!F_ISSET(p->mp_flags, P_META)) {
925                 DPRINTF("page %lu not a meta page", p->mp_pgno);
926                 return EINVAL;
927         }
928
929         return 0;
930 }
931
932 static int
933 mdbenv_read_meta(MDB_env *env, int *which)
934 {
935         MDB_page        *mp0, *mp1;
936         MDB_meta        *meta[2];
937         int toggle = 0, rc;
938
939         assert(env != NULL);
940
941         if ((mp0 = mdbenv_get_page(env, 0)) == NULL ||
942                 (mp1 = mdbenv_get_page(env, 1)) == NULL)
943                 return EIO;
944
945         rc = mdb_check_meta_page(mp0);
946         if (rc) return rc;
947
948         rc = mdb_check_meta_page(mp1);
949         if (rc) return rc;
950
951         meta[0] = METADATA(mp0);
952         meta[1] = METADATA(mp1);
953
954         if (meta[0]->mm_txnid < meta[1]->mm_txnid)
955                 toggle = 1;
956
957         if (meta[toggle]->mm_txnid > env->me_meta.mm_txnid) {
958                 memcpy(&env->me_meta, meta[toggle], sizeof(env->me_meta));
959                 if (which)
960                         *which = toggle;
961         }
962
963         DPRINTF("Using meta page %d", toggle);
964
965         return MDB_SUCCESS;
966 }
967
968 int
969 mdbenv_create(MDB_env **env)
970 {
971         MDB_env *e;
972
973         e = calloc(1, sizeof(*e));
974         if (!e) return ENOMEM;
975
976         e->me_meta.mm_mapsize = DEFAULT_MAPSIZE;
977         e->me_maxreaders = DEFAULT_READERS;
978         e->me_fd = -1;
979         e->me_lfd = -1;
980         *env = e;
981         return MDB_SUCCESS;
982 }
983
984 int
985 mdbenv_set_mapsize(MDB_env *env, size_t size)
986 {
987         if (env->me_map)
988                 return EINVAL;
989         env->me_mapsize = env->me_meta.mm_mapsize = size;
990         return MDB_SUCCESS;
991 }
992
993 int
994 mdbenv_set_maxreaders(MDB_env *env, int readers)
995 {
996         env->me_maxreaders = readers;
997         return MDB_SUCCESS;
998 }
999
1000 int
1001 mdbenv_get_maxreaders(MDB_env *env, int *readers)
1002 {
1003         if (!env || !readers)
1004                 return EINVAL;
1005         *readers = env->me_maxreaders;
1006         return MDB_SUCCESS;
1007 }
1008
1009 int
1010 mdbenv_open2(MDB_env *env, unsigned int flags)
1011 {
1012         int i, newenv = 0;
1013
1014         env->me_flags = flags;
1015
1016         if ((i = mdbenv_read_header(env)) != 0) {
1017                 if (i != ENOENT)
1018                         return i;
1019                 DPRINTF("new mdbenv");
1020                 newenv = 1;
1021         }
1022
1023         if (!env->me_mapsize)
1024                 env->me_mapsize = env->me_meta.mm_mapsize;
1025
1026         i = MAP_SHARED;
1027         if (env->me_meta.mm_address && (flags & MDB_FIXEDMAP))
1028                 i |= MAP_FIXED;
1029         env->me_map = mmap(env->me_meta.mm_address, env->me_mapsize, PROT_READ, i,
1030                 env->me_fd, 0);
1031         if (env->me_map == MAP_FAILED)
1032                 return errno;
1033
1034         if (newenv) {
1035                 env->me_meta.mm_mapsize = env->me_mapsize;
1036                 if (flags & MDB_FIXEDMAP)
1037                         env->me_meta.mm_address = env->me_map;
1038                 i = mdbenv_init_meta(env);
1039                 if (i != MDB_SUCCESS) {
1040                         munmap(env->me_map, env->me_mapsize);
1041                         return i;
1042                 }
1043         }
1044
1045         if ((i = mdbenv_read_meta(env, NULL)) != 0)
1046                 return i;
1047
1048         DPRINTF("opened database version %u, pagesize %u",
1049             env->me_meta.mm_version, env->me_meta.mm_psize);
1050         DPRINTF("depth: %u", env->me_meta.mm_depth);
1051         DPRINTF("entries: %lu", env->me_meta.mm_entries);
1052         DPRINTF("branch pages: %lu", env->me_meta.mm_branch_pages);
1053         DPRINTF("leaf pages: %lu", env->me_meta.mm_leaf_pages);
1054         DPRINTF("overflow pages: %lu", env->me_meta.mm_overflow_pages);
1055         DPRINTF("root: %lu", env->me_meta.mm_root);
1056
1057         return MDB_SUCCESS;
1058 }
1059
1060 static void
1061 mdbenv_reader_dest(void *ptr)
1062 {
1063         MDB_reader *reader = ptr;
1064
1065         reader->mr_txnid = 0;
1066         reader->mr_pid = 0;
1067         reader->mr_tid = 0;
1068 }
1069
1070 static void
1071 mdbenv_share_locks(MDB_env *env)
1072 {
1073         struct flock lock_info;
1074
1075         env->me_txns->mt_txnid = env->me_meta.mm_txnid;
1076
1077         memset((void *)&lock_info, 0, sizeof(lock_info));
1078         lock_info.l_type = F_RDLCK;
1079         lock_info.l_whence = SEEK_SET;
1080         lock_info.l_start = 0;
1081         lock_info.l_len = 1;
1082         fcntl(env->me_lfd, F_SETLK, &lock_info);
1083 }
1084
1085 static int
1086 mdbenv_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
1087 {
1088         int rc;
1089         off_t size, rsize;
1090         struct flock lock_info;
1091
1092         *excl = 0;
1093
1094         if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
1095                 rc = errno;
1096                 return rc;
1097         }
1098         /* Try to get exclusive lock. If we succeed, then
1099          * nobody is using the lock region and we should initialize it.
1100          */
1101         memset((void *)&lock_info, 0, sizeof(lock_info));
1102         lock_info.l_type = F_WRLCK;
1103         lock_info.l_whence = SEEK_SET;
1104         lock_info.l_start = 0;
1105         lock_info.l_len = 1;
1106         rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1107         if (rc == 0) {
1108                 *excl = 1;
1109         } else {
1110                 lock_info.l_type = F_RDLCK;
1111                 rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1112                 if (rc) {
1113                         rc = errno;
1114                         goto fail;
1115                 }
1116         }
1117         size = lseek(env->me_lfd, 0, SEEK_END);
1118         rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1119         if (size < rsize && *excl) {
1120                 if (ftruncate(env->me_lfd, rsize) != 0) {
1121                         rc = errno;
1122                         goto fail;
1123                 }
1124         } else {
1125                 rsize = size;
1126                 size = rsize - sizeof(MDB_txninfo);
1127                 env->me_maxreaders = size/sizeof(MDB_reader) + 1;
1128         }
1129         env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
1130                 env->me_lfd, 0);
1131         if (env->me_txns == MAP_FAILED) {
1132                 rc = errno;
1133                 goto fail;
1134         }
1135         if (*excl) {
1136                 pthread_mutexattr_t mattr;
1137
1138                 pthread_mutexattr_init(&mattr);
1139                 pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
1140                 pthread_mutex_init(&env->me_txns->mt_mutex, &mattr);
1141                 pthread_mutex_init(&env->me_txns->mt_wmutex, &mattr);
1142                 env->me_txns->mt_version = MDB_VERSION;
1143                 env->me_txns->mt_magic = MDB_MAGIC;
1144                 env->me_txns->mt_txnid = 0;
1145                 env->me_txns->mt_numreaders = 0;
1146
1147         } else {
1148                 if (env->me_txns->mt_magic != MDB_MAGIC) {
1149                         DPRINTF("lock region has invalid magic");
1150                         errno = EINVAL;
1151                 }
1152                 if (env->me_txns->mt_version != MDB_VERSION) {
1153                         DPRINTF("lock region is version %u, expected version %u",
1154                                 env->me_txns->mt_version, MDB_VERSION);
1155                         errno = EINVAL;
1156                 }
1157                 if (errno != EACCES && errno != EAGAIN) {
1158                         rc = errno;
1159                         goto fail;
1160                 }
1161         }
1162         return MDB_SUCCESS;
1163
1164 fail:
1165         close(env->me_lfd);
1166         return rc;
1167
1168 }
1169
1170 int
1171 mdbenv_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
1172 {
1173         int             oflags, rc, len, excl;
1174         char *lpath, *dpath;
1175
1176         len = strlen(path);
1177         lpath = malloc(len + sizeof("/lock.mdb") + len + sizeof("/data.db"));
1178         if (!lpath)
1179                 return ENOMEM;
1180         dpath = lpath + len + sizeof("/lock.mdb");
1181         sprintf(lpath, "%s/lock.mdb", path);
1182         sprintf(dpath, "%s/data.mdb", path);
1183
1184         rc = mdbenv_setup_locks(env, lpath, mode, &excl);
1185         if (rc)
1186                 goto leave;
1187
1188         if (F_ISSET(flags, MDB_RDONLY))
1189                 oflags = O_RDONLY;
1190         else
1191                 oflags = O_RDWR | O_CREAT;
1192
1193         if ((env->me_fd = open(dpath, oflags, mode)) == -1)
1194                 return errno;
1195
1196         if ((rc = mdbenv_open2(env, flags)) != MDB_SUCCESS) {
1197                 close(env->me_fd);
1198                 env->me_fd = -1;
1199         } else {
1200                 env->me_path = strdup(path);
1201                 DPRINTF("opened dbenv %p", (void *) env);
1202                 pthread_key_create(&env->me_txkey, mdbenv_reader_dest);
1203                 if (excl)
1204                         mdbenv_share_locks(env);
1205                 env->me_dbxs = calloc(DBX_CHUNK, sizeof(MDB_dbx));
1206                 env->me_dbs = calloc(DBX_CHUNK, sizeof(MDB_db *));
1207                 env->me_numdbs = 1;
1208                 env->me_dbs[0] = (MDB_db *)&env->me_meta.mm_psize;
1209         }
1210
1211
1212 leave:
1213         free(lpath);
1214         return rc;
1215 }
1216
1217 void
1218 mdbenv_close(MDB_env *env)
1219 {
1220         if (env == NULL)
1221                 return;
1222
1223         free(env->me_dbs);
1224         free(env->me_dbxs);
1225         free(env->me_path);
1226
1227         if (env->me_map) {
1228                 munmap(env->me_map, env->me_mapsize);
1229         }
1230         close(env->me_fd);
1231         if (env->me_txns) {
1232                 size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1233                 munmap(env->me_txns, size);
1234         }
1235         close(env->me_lfd);
1236         free(env);
1237 }
1238
1239 /* Search for key within a leaf page, using binary search.
1240  * Returns the smallest entry larger or equal to the key.
1241  * If exactp is non-null, stores whether the found entry was an exact match
1242  * in *exactp (1 or 0).
1243  * If kip is non-null, stores the index of the found entry in *kip.
1244  * If no entry larger or equal to the key is found, returns NULL.
1245  */
1246 static MDB_node *
1247 mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key,
1248     int *exactp, unsigned int *kip)
1249 {
1250         unsigned int     i = 0;
1251         int              low, high;
1252         int              rc = 0;
1253         MDB_node        *node;
1254         MDB_val  nodekey;
1255
1256         DPRINTF("searching %u keys in %s page %lu",
1257             NUMKEYS(mp),
1258             IS_LEAF(mp) ? "leaf" : "branch",
1259             mp->mp_pgno);
1260
1261         assert(NUMKEYS(mp) > 0);
1262
1263         memset(&nodekey, 0, sizeof(nodekey));
1264
1265         low = IS_LEAF(mp) ? 0 : 1;
1266         high = NUMKEYS(mp) - 1;
1267         while (low <= high) {
1268                 i = (low + high) >> 1;
1269                 node = NODEPTR(mp, i);
1270
1271                 nodekey.mv_size = node->mn_ksize;
1272                 nodekey.mv_data = NODEKEY(node);
1273
1274                 if (txn->mt_dbxs[dbi].md_cmp)
1275                         rc = txn->mt_dbxs[dbi].md_cmp(key, &nodekey);
1276                 else
1277                         rc = _mdb_cmp(txn, dbi, key, &nodekey);
1278
1279                 if (IS_LEAF(mp))
1280                         DPRINTF("found leaf index %u [%.*s], rc = %i",
1281                             i, (int)nodekey.mv_size, (char *)nodekey.mv_data, rc);
1282                 else
1283                         DPRINTF("found branch index %u [%.*s -> %lu], rc = %i",
1284                             i, (int)node->mn_ksize, (char *)NODEKEY(node),
1285                             node->mn_pgno, rc);
1286
1287                 if (rc == 0)
1288                         break;
1289                 if (rc > 0)
1290                         low = i + 1;
1291                 else
1292                         high = i - 1;
1293         }
1294
1295         if (rc > 0) {   /* Found entry is less than the key. */
1296                 i++;    /* Skip to get the smallest entry larger than key. */
1297                 if (i >= NUMKEYS(mp))
1298                         /* There is no entry larger or equal to the key. */
1299                         return NULL;
1300         }
1301         if (exactp)
1302                 *exactp = (rc == 0);
1303         if (kip)        /* Store the key index if requested. */
1304                 *kip = i;
1305
1306         return NODEPTR(mp, i);
1307 }
1308
1309 static void
1310 cursor_pop_page(MDB_cursor *cursor)
1311 {
1312         MDB_ppage       *top;
1313
1314         top = CURSOR_TOP(cursor);
1315         CURSOR_POP(cursor);
1316
1317         DPRINTF("popped page %lu off cursor %p", top->mp_page->mp_pgno, (void *) cursor);
1318
1319         free(top);
1320 }
1321
1322 static MDB_ppage *
1323 cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
1324 {
1325         MDB_ppage       *ppage;
1326
1327         DPRINTF("pushing page %lu on cursor %p", mp->mp_pgno, (void *) cursor);
1328
1329         if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
1330                 return NULL;
1331         ppage->mp_page = mp;
1332         CURSOR_PUSH(cursor, ppage);
1333         return ppage;
1334 }
1335
1336 static MDB_page *
1337 mdbenv_get_page(MDB_env *env, pgno_t pgno)
1338 {
1339         MDB_page *p = NULL;
1340         MDB_txn *txn = env->me_txn;
1341         int found = 0;
1342
1343         if (txn && !SIMPLEQ_EMPTY(txn->mt_u.dirty_queue)) {
1344                 MDB_dpage *dp;
1345                 SIMPLEQ_FOREACH(dp, txn->mt_u.dirty_queue, h.md_next) {
1346                         if (dp->p.mp_pgno == pgno) {
1347                                 p = &dp->p;
1348                                 found = 1;
1349                                 break;
1350                         }
1351                 }
1352         }
1353         if (!found) {
1354                 p = (MDB_page *)(env->me_map + env->me_meta.mm_psize * pgno);
1355         }
1356         return p;
1357 }
1358
1359 static int
1360 mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
1361     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1362 {
1363         MDB_page        *mp = mpp->mp_page;
1364         int rc;
1365
1366         if (cursor && cursor_push_page(cursor, mp) == NULL)
1367                 return MDB_FAIL;
1368
1369         while (IS_BRANCH(mp)) {
1370                 unsigned int     i = 0;
1371                 MDB_node        *node;
1372
1373                 DPRINTF("branch page %lu has %u keys", mp->mp_pgno, NUMKEYS(mp));
1374                 assert(NUMKEYS(mp) > 1);
1375                 DPRINTF("found index 0 to page %lu", NODEPGNO(NODEPTR(mp, 0)));
1376
1377                 if (key == NULL)        /* Initialize cursor to first page. */
1378                         i = 0;
1379                 else {
1380                         int      exact;
1381                         node = mdb_search_node(txn, dbi, mp, key, &exact, &i);
1382                         if (node == NULL)
1383                                 i = NUMKEYS(mp) - 1;
1384                         else if (!exact) {
1385                                 assert(i > 0);
1386                                 i--;
1387                         }
1388                 }
1389
1390                 if (key)
1391                         DPRINTF("following index %u for key %.*s",
1392                             i, (int)key->mv_size, (char *)key->mv_data);
1393                 assert(i < NUMKEYS(mp));
1394                 node = NODEPTR(mp, i);
1395
1396                 if (cursor)
1397                         CURSOR_TOP(cursor)->mp_ki = i;
1398
1399                 mpp->mp_parent = mp;
1400                 if ((mp = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
1401                         return MDB_FAIL;
1402                 mpp->mp_pi = i;
1403                 mpp->mp_page = mp;
1404
1405                 if (cursor && cursor_push_page(cursor, mp) == NULL)
1406                         return MDB_FAIL;
1407
1408                 if (modify) {
1409                         MDB_dhead *dh = ((MDB_dhead *)mp)-1;
1410                         if ((rc = mdb_touch(txn, mpp)) != 0)
1411                                 return rc;
1412                         dh = ((MDB_dhead *)mpp->mp_page)-1;
1413                         dh->md_parent = mpp->mp_parent;
1414                         dh->md_pi = mpp->mp_pi;
1415                 }
1416
1417                 mp = mpp->mp_page;
1418         }
1419
1420         if (!IS_LEAF(mp)) {
1421                 DPRINTF("internal error, index points to a %02X page!?",
1422                     mp->mp_flags);
1423                 return MDB_FAIL;
1424         }
1425
1426         DPRINTF("found leaf page %lu for key %.*s", mp->mp_pgno,
1427             key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL);
1428
1429         return MDB_SUCCESS;
1430 }
1431
1432 /* Search for the page a given key should be in.
1433  * Stores a pointer to the found page in *mpp.
1434  * If key is NULL, search for the lowest page (used by mdb_cursor_first).
1435  * If cursor is non-null, pushes parent pages on the cursor stack.
1436  * If modify is true, visited pages are updated with new page numbers.
1437  */
1438 static int
1439 mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
1440     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1441 {
1442         int              rc;
1443         pgno_t           root;
1444
1445         /* Choose which root page to start with. If a transaction is given
1446          * use the root page from the transaction, otherwise read the last
1447          * committed root page.
1448          */
1449         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
1450                 DPRINTF("transaction has failed, must abort");
1451                 return EINVAL;
1452         } else
1453                 root = txn->mt_dbs[dbi]->md_root;
1454
1455         if (root == P_INVALID) {                /* Tree is empty. */
1456                 DPRINTF("tree is empty");
1457                 return ENOENT;
1458         }
1459
1460         if ((mpp->mp_page = mdbenv_get_page(txn->mt_env, root)) == NULL)
1461                 return MDB_FAIL;
1462
1463         DPRINTF("root page has flags 0x%X", mpp->mp_page->mp_flags);
1464
1465         if (modify && !F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
1466                 mpp->mp_parent = NULL;
1467                 mpp->mp_pi = 0;
1468                 if ((rc = mdb_touch(txn, mpp)))
1469                         return rc;
1470                 txn->mt_dbs[dbi]->md_root = mpp->mp_page->mp_pgno;
1471         }
1472
1473         return mdb_search_page_root(txn, dbi, key, cursor, modify, mpp);
1474 }
1475
1476 static int
1477 mdb_read_data(MDB_env *env, MDB_node *leaf, MDB_val *data)
1478 {
1479         MDB_page        *omp;           /* overflow mpage */
1480         pgno_t           pgno;
1481
1482         if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
1483                 data->mv_size = leaf->mn_dsize;
1484                 data->mv_data = NODEDATA(leaf);
1485                 return MDB_SUCCESS;
1486         }
1487
1488         /* Read overflow data.
1489          */
1490         data->mv_size = leaf->mn_dsize;
1491         memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
1492         if ((omp = mdbenv_get_page(env, pgno)) == NULL) {
1493                 DPRINTF("read overflow page %lu failed", pgno);
1494                 return MDB_FAIL;
1495         }
1496         data->mv_data = omp;
1497
1498         return MDB_SUCCESS;
1499 }
1500
1501 int
1502 mdb_get(MDB_txn *txn, MDB_dbi dbi,
1503     MDB_val *key, MDB_val *data)
1504 {
1505         int              rc, exact;
1506         MDB_node        *leaf;
1507         MDB_pageparent mpp;
1508
1509         assert(key);
1510         assert(data);
1511         DPRINTF("===> get key [%.*s]", (int)key->mv_size, (char *)key->mv_data);
1512
1513         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1514                 return EINVAL;
1515         }
1516
1517         if ((rc = mdb_search_page(txn, dbi, key, NULL, 0, &mpp)) != MDB_SUCCESS)
1518                 return rc;
1519
1520         leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
1521         if (leaf && exact)
1522                 rc = mdb_read_data(txn->mt_env, leaf, data);
1523         else {
1524                 rc = ENOENT;
1525         }
1526
1527         return rc;
1528 }
1529
1530 static int
1531 mdb_sibling(MDB_cursor *cursor, int move_right)
1532 {
1533         int              rc;
1534         MDB_node        *indx;
1535         MDB_ppage       *parent, *top;
1536         MDB_page        *mp;
1537
1538         top = CURSOR_TOP(cursor);
1539         if ((parent = SLIST_NEXT(top, mp_entry)) == NULL) {
1540                 return ENOENT;          /* root has no siblings */
1541         }
1542
1543         DPRINTF("parent page is page %lu, index %u",
1544             parent->mp_page->mp_pgno, parent->mp_ki);
1545
1546         cursor_pop_page(cursor);
1547         if (move_right ? (parent->mp_ki + 1 >= NUMKEYS(parent->mp_page))
1548                        : (parent->mp_ki == 0)) {
1549                 DPRINTF("no more keys left, moving to %s sibling",
1550                     move_right ? "right" : "left");
1551                 if ((rc = mdb_sibling(cursor, move_right)) != MDB_SUCCESS)
1552                         return rc;
1553                 parent = CURSOR_TOP(cursor);
1554         } else {
1555                 if (move_right)
1556                         parent->mp_ki++;
1557                 else
1558                         parent->mp_ki--;
1559                 DPRINTF("just moving to %s index key %u",
1560                     move_right ? "right" : "left", parent->mp_ki);
1561         }
1562         assert(IS_BRANCH(parent->mp_page));
1563
1564         indx = NODEPTR(parent->mp_page, parent->mp_ki);
1565         if ((mp = mdbenv_get_page(cursor->mc_txn->mt_env, indx->mn_pgno)) == NULL)
1566                 return MDB_FAIL;
1567 #if 0
1568         mp->parent = parent->mp_page;
1569         mp->parent_index = parent->mp_ki;
1570 #endif
1571
1572         cursor_push_page(cursor, mp);
1573
1574         return MDB_SUCCESS;
1575 }
1576
1577 static int
1578 mdb_set_key(MDB_node *node, MDB_val *key)
1579 {
1580         if (key == NULL)
1581                 return 0;
1582
1583         key->mv_size = node->mn_ksize;
1584         key->mv_data = NODEKEY(node);
1585
1586         return 0;
1587 }
1588
1589 static int
1590 mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
1591 {
1592         MDB_ppage       *top;
1593         MDB_page        *mp;
1594         MDB_node        *leaf;
1595
1596         if (cursor->mc_eof) {
1597                 return ENOENT;
1598         }
1599
1600         assert(cursor->mc_initialized);
1601
1602         top = CURSOR_TOP(cursor);
1603         mp = top->mp_page;
1604
1605         DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
1606
1607         if (top->mp_ki + 1 >= NUMKEYS(mp)) {
1608                 DPRINTF("=====> move to next sibling page");
1609                 if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
1610                         cursor->mc_eof = 1;
1611                         return ENOENT;
1612                 }
1613                 top = CURSOR_TOP(cursor);
1614                 mp = top->mp_page;
1615                 DPRINTF("next page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
1616         } else
1617                 top->mp_ki++;
1618
1619         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
1620             mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
1621
1622         assert(IS_LEAF(mp));
1623         leaf = NODEPTR(mp, top->mp_ki);
1624
1625         if (data && mdb_read_data(cursor->mc_txn->mt_env, leaf, data) != MDB_SUCCESS)
1626                 return MDB_FAIL;
1627
1628         return mdb_set_key(leaf, key);
1629 }
1630
1631 static int
1632 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
1633     int *exactp)
1634 {
1635         int              rc;
1636         MDB_node        *leaf;
1637         MDB_ppage       *top;
1638         MDB_pageparent mpp;
1639
1640         assert(cursor);
1641         assert(key);
1642         assert(key->mv_size > 0);
1643
1644         rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp);
1645         if (rc != MDB_SUCCESS)
1646                 return rc;
1647         assert(IS_LEAF(mpp.mp_page));
1648
1649         top = CURSOR_TOP(cursor);
1650         leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki);
1651         if (exactp != NULL && !*exactp) {
1652                 /* MDB_CURSOR_EXACT specified and not an exact match. */
1653                 return ENOENT;
1654         }
1655
1656         if (leaf == NULL) {
1657                 DPRINTF("===> inexact leaf not found, goto sibling");
1658                 if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS)
1659                         return rc;              /* no entries matched */
1660                 top = CURSOR_TOP(cursor);
1661                 top->mp_ki = 0;
1662                 mpp.mp_page = top->mp_page;
1663                 assert(IS_LEAF(mpp.mp_page));
1664                 leaf = NODEPTR(mpp.mp_page, 0);
1665         }
1666
1667         cursor->mc_initialized = 1;
1668         cursor->mc_eof = 0;
1669
1670         if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
1671                 return rc;
1672
1673         rc = mdb_set_key(leaf, key);
1674         if (rc == MDB_SUCCESS) {
1675                 DPRINTF("==> cursor placed on key %.*s",
1676                         (int)key->mv_size, (char *)key->mv_data);
1677                 ;
1678         }
1679
1680         return rc;
1681 }
1682
1683 static int
1684 mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
1685 {
1686         int              rc;
1687         MDB_pageparent  mpp;
1688         MDB_node        *leaf;
1689
1690         rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp);
1691         if (rc != MDB_SUCCESS)
1692                 return rc;
1693         assert(IS_LEAF(mpp.mp_page));
1694
1695         leaf = NODEPTR(mpp.mp_page, 0);
1696         cursor->mc_initialized = 1;
1697         cursor->mc_eof = 0;
1698
1699         if (data && (rc = mdb_read_data(cursor->mc_txn->mt_env, leaf, data)) != MDB_SUCCESS)
1700                 return rc;
1701
1702         return mdb_set_key(leaf, key);
1703 }
1704
1705 int
1706 mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
1707     MDB_cursor_op op)
1708 {
1709         int              rc;
1710         int              exact = 0;
1711
1712         assert(cursor);
1713
1714         switch (op) {
1715         case MDB_CURSOR:
1716         case MDB_CURSOR_EXACT:
1717                 while (CURSOR_TOP(cursor) != NULL)
1718                         cursor_pop_page(cursor);
1719                 if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1720                         rc = EINVAL;
1721                 } else if (op == MDB_CURSOR_EXACT)
1722                         rc = mdb_cursor_set(cursor, key, data, &exact);
1723                 else
1724                         rc = mdb_cursor_set(cursor, key, data, NULL);
1725                 break;
1726         case MDB_NEXT:
1727                 if (!cursor->mc_initialized)
1728                         rc = mdb_cursor_first(cursor, key, data);
1729                 else
1730                         rc = mdb_cursor_next(cursor, key, data);
1731                 break;
1732         case MDB_FIRST:
1733                 while (CURSOR_TOP(cursor) != NULL)
1734                         cursor_pop_page(cursor);
1735                 rc = mdb_cursor_first(cursor, key, data);
1736                 break;
1737         default:
1738                 DPRINTF("unhandled/unimplemented cursor operation %u", op);
1739                 rc = EINVAL;
1740                 break;
1741         }
1742
1743         return rc;
1744 }
1745
1746 /* Allocate a page and initialize it
1747  */
1748 static MDB_dpage *
1749 mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num)
1750 {
1751         MDB_dpage       *dp;
1752
1753         if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL)
1754                 return NULL;
1755         DPRINTF("allocated new mpage %lu, page size %u",
1756             dp->p.mp_pgno, txn->mt_env->me_meta.mm_psize);
1757         dp->p.mp_flags = flags | P_DIRTY;
1758         dp->p.mp_lower = PAGEHDRSZ;
1759         dp->p.mp_upper = txn->mt_env->me_meta.mm_psize;
1760
1761         if (IS_BRANCH(&dp->p))
1762                 txn->mt_dbs[dbi]->md_branch_pages++;
1763         else if (IS_LEAF(&dp->p))
1764                 txn->mt_dbs[dbi]->md_leaf_pages++;
1765         else if (IS_OVERFLOW(&dp->p)) {
1766                 txn->mt_dbs[dbi]->md_overflow_pages += num;
1767                 dp->p.mp_pages = num;
1768         }
1769
1770         return dp;
1771 }
1772
1773 static size_t
1774 mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data)
1775 {
1776         size_t           sz;
1777
1778         sz = LEAFSIZE(key, data);
1779         if (data->mv_size >= env->me_meta.mm_psize / MDB_MINKEYS) {
1780                 /* put on overflow page */
1781                 sz -= data->mv_size - sizeof(pgno_t);
1782         }
1783
1784         return sz + sizeof(indx_t);
1785 }
1786
1787 static size_t
1788 mdb_branch_size(MDB_env *env, MDB_val *key)
1789 {
1790         size_t           sz;
1791
1792         sz = INDXSIZE(key);
1793         if (sz >= env->me_meta.mm_psize / MDB_MINKEYS) {
1794                 /* put on overflow page */
1795                 /* not implemented */
1796                 /* sz -= key->size - sizeof(pgno_t); */
1797         }
1798
1799         return sz + sizeof(indx_t);
1800 }
1801
1802 static int
1803 mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
1804     MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
1805 {
1806         unsigned int     i;
1807         size_t           node_size = NODESIZE;
1808         indx_t           ofs;
1809         MDB_node        *node;
1810         MDB_dpage       *ofp = NULL;            /* overflow page */
1811
1812         assert(mp->mp_upper >= mp->mp_lower);
1813
1814         DPRINTF("add node [%.*s] to %s page %lu at index %i, key size %zu",
1815             key ? (int)key->mv_size : 0, key ? (char *)key->mv_data : NULL,
1816             IS_LEAF(mp) ? "leaf" : "branch",
1817             mp->mp_pgno, indx, key ? key->mv_size : 0);
1818
1819         if (key != NULL)
1820                 node_size += key->mv_size;
1821
1822         if (IS_LEAF(mp)) {
1823                 assert(data);
1824                 if (F_ISSET(flags, F_BIGDATA)) {
1825                         /* Data already on overflow page. */
1826                         node_size += sizeof(pgno_t);
1827                 } else if (data->mv_size >= txn->mt_env->me_meta.mm_psize / MDB_MINKEYS) {
1828                         int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_meta.mm_psize);
1829                         /* Put data on overflow page. */
1830                         DPRINTF("data size is %zu, put on overflow page",
1831                             data->mv_size);
1832                         node_size += sizeof(pgno_t);
1833                         if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL)
1834                                 return MDB_FAIL;
1835                         DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
1836                         flags |= F_BIGDATA;
1837                 } else {
1838                         node_size += data->mv_size;
1839                 }
1840         }
1841
1842         if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
1843                 DPRINTF("not enough room in page %lu, got %u ptrs",
1844                     mp->mp_pgno, NUMKEYS(mp));
1845                 DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower,
1846                     mp->mp_upper - mp->mp_lower);
1847                 DPRINTF("node size = %zu", node_size);
1848                 return ENOSPC;
1849         }
1850
1851         /* Move higher pointers up one slot. */
1852         for (i = NUMKEYS(mp); i > indx; i--)
1853                 mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
1854
1855         /* Adjust free space offsets. */
1856         ofs = mp->mp_upper - node_size;
1857         assert(ofs >= mp->mp_lower + sizeof(indx_t));
1858         mp->mp_ptrs[indx] = ofs;
1859         mp->mp_upper = ofs;
1860         mp->mp_lower += sizeof(indx_t);
1861
1862         /* Write the node data. */
1863         node = NODEPTR(mp, indx);
1864         node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
1865         node->mn_flags = flags;
1866         if (IS_LEAF(mp))
1867                 node->mn_dsize = data->mv_size;
1868         else
1869                 node->mn_pgno = pgno;
1870
1871         if (key)
1872                 memcpy(NODEKEY(node), key->mv_data, key->mv_size);
1873
1874         if (IS_LEAF(mp)) {
1875                 assert(key);
1876                 if (ofp == NULL) {
1877                         if (F_ISSET(flags, F_BIGDATA))
1878                                 memcpy(node->mn_data + key->mv_size, data->mv_data,
1879                                     sizeof(pgno_t));
1880                         else
1881                                 memcpy(node->mn_data + key->mv_size, data->mv_data,
1882                                     data->mv_size);
1883                 } else {
1884                         memcpy(node->mn_data + key->mv_size, &ofp->p.mp_pgno,
1885                             sizeof(pgno_t));
1886                         memcpy(METADATA(&ofp->p), data->mv_data, data->mv_size);
1887                 }
1888         }
1889
1890         return MDB_SUCCESS;
1891 }
1892
1893 static void
1894 mdb_del_node(MDB_page *mp, indx_t indx)
1895 {
1896         unsigned int     sz;
1897         indx_t           i, j, numkeys, ptr;
1898         MDB_node        *node;
1899         char            *base;
1900
1901         DPRINTF("delete node %u on %s page %lu", indx,
1902             IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
1903         assert(indx < NUMKEYS(mp));
1904
1905         node = NODEPTR(mp, indx);
1906         sz = NODESIZE + node->mn_ksize;
1907         if (IS_LEAF(mp)) {
1908                 if (F_ISSET(node->mn_flags, F_BIGDATA))
1909                         sz += sizeof(pgno_t);
1910                 else
1911                         sz += NODEDSZ(node);
1912         }
1913
1914         ptr = mp->mp_ptrs[indx];
1915         numkeys = NUMKEYS(mp);
1916         for (i = j = 0; i < numkeys; i++) {
1917                 if (i != indx) {
1918                         mp->mp_ptrs[j] = mp->mp_ptrs[i];
1919                         if (mp->mp_ptrs[i] < ptr)
1920                                 mp->mp_ptrs[j] += sz;
1921                         j++;
1922                 }
1923         }
1924
1925         base = (char *)mp + mp->mp_upper;
1926         memmove(base + sz, base, ptr - mp->mp_upper);
1927
1928         mp->mp_lower -= sizeof(indx_t);
1929         mp->mp_upper += sz;
1930 }
1931
1932 int
1933 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
1934 {
1935         MDB_cursor      *cursor;
1936
1937         if (txn == NULL || ret == NULL)
1938                 return EINVAL;
1939
1940         if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
1941                 SLIST_INIT(&cursor->mc_stack);
1942                 cursor->mc_dbi = dbi;
1943                 cursor->mc_txn = txn;
1944         }
1945
1946         *ret = cursor;
1947
1948         return MDB_SUCCESS;
1949 }
1950
1951 void
1952 mdb_cursor_close(MDB_cursor *cursor)
1953 {
1954         if (cursor != NULL) {
1955                 while (!CURSOR_EMPTY(cursor))
1956                         cursor_pop_page(cursor);
1957
1958 /*              btree_close(cursor->bt); */
1959                 free(cursor);
1960         }
1961 }
1962
1963 static int
1964 mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
1965 {
1966         indx_t                   ptr, i, numkeys;
1967         int                      delta;
1968         size_t                   len;
1969         MDB_node                *node;
1970         char                    *base;
1971
1972         node = NODEPTR(mp, indx);
1973         ptr = mp->mp_ptrs[indx];
1974         DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %lu",
1975             indx, ptr,
1976             (int)node->mn_ksize, (char *)NODEKEY(node),
1977             (int)key->mv_size, (char *)key->mv_data,
1978             mp->mp_pgno);
1979
1980         delta = key->mv_size - node->mn_ksize;
1981         if (delta) {
1982                 if (delta > 0 && SIZELEFT(mp) < delta) {
1983                         DPRINTF("OUCH! Not enough room, delta = %d", delta);
1984                         return ENOSPC;
1985                 }
1986
1987                 numkeys = NUMKEYS(mp);
1988                 for (i = 0; i < numkeys; i++) {
1989                         if (mp->mp_ptrs[i] <= ptr)
1990                                 mp->mp_ptrs[i] -= delta;
1991                 }
1992
1993                 base = (char *)mp + mp->mp_upper;
1994                 len = ptr - mp->mp_upper + NODESIZE;
1995                 memmove(base - delta, base, len);
1996                 mp->mp_upper -= delta;
1997
1998                 node = NODEPTR(mp, indx);
1999                 node->mn_ksize = key->mv_size;
2000         }
2001
2002         memcpy(NODEKEY(node), key->mv_data, key->mv_size);
2003
2004         return MDB_SUCCESS;
2005 }
2006
2007 /* Move a node from src to dst.
2008  */
2009 static int
2010 mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx,
2011     MDB_pageparent *dst, indx_t dstindx)
2012 {
2013         int                      rc;
2014         MDB_node                *srcnode;
2015         MDB_val          key, data;
2016
2017         srcnode = NODEPTR(src->mp_page, srcindx);
2018         DPRINTF("moving %s node %u [%.*s] on page %lu to node %u on page %lu",
2019             IS_LEAF(src->mp_page) ? "leaf" : "branch",
2020             srcindx,
2021             (int)srcnode->mn_ksize, (char *)NODEKEY(srcnode),
2022             src->mp_page->mp_pgno,
2023             dstindx, dst->mp_page->mp_pgno);
2024
2025         /* Mark src and dst as dirty. */
2026         if ((rc = mdb_touch(txn, src)) ||
2027             (rc = mdb_touch(txn, dst)))
2028                 return rc;;
2029
2030         /* Add the node to the destination page.
2031          */
2032         key.mv_size = srcnode->mn_ksize;
2033         key.mv_data = NODEKEY(srcnode);
2034         data.mv_size = NODEDSZ(srcnode);
2035         data.mv_data = NODEDATA(srcnode);
2036         rc = mdb_add_node(txn, dbi, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode),
2037             srcnode->mn_flags);
2038         if (rc != MDB_SUCCESS)
2039                 return rc;
2040
2041         /* Delete the node from the source page.
2042          */
2043         mdb_del_node(src->mp_page, srcindx);
2044
2045         /* Update the parent separators.
2046          */
2047         if (srcindx == 0 && src->mp_pi != 0) {
2048                 DPRINTF("update separator for source page %lu to [%.*s]",
2049                     src->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data);
2050                 if ((rc = mdb_update_key(src->mp_parent, src->mp_pi,
2051                     &key)) != MDB_SUCCESS)
2052                         return rc;
2053         }
2054
2055         if (srcindx == 0 && IS_BRANCH(src->mp_page)) {
2056                 MDB_val  nullkey;
2057                 nullkey.mv_size = 0;
2058                 assert(mdb_update_key(src->mp_page, 0, &nullkey) == MDB_SUCCESS);
2059         }
2060
2061         if (dstindx == 0 && dst->mp_pi != 0) {
2062                 DPRINTF("update separator for destination page %lu to [%.*s]",
2063                     dst->mp_page->mp_pgno, (int)key.mv_size, (char *)key.mv_data);
2064                 if ((rc = mdb_update_key(dst->mp_parent, dst->mp_pi,
2065                     &key)) != MDB_SUCCESS)
2066                         return rc;
2067         }
2068
2069         if (dstindx == 0 && IS_BRANCH(dst->mp_page)) {
2070                 MDB_val  nullkey;
2071                 nullkey.mv_size = 0;
2072                 assert(mdb_update_key(dst->mp_page, 0, &nullkey) == MDB_SUCCESS);
2073         }
2074
2075         return MDB_SUCCESS;
2076 }
2077
2078 static int
2079 mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst)
2080 {
2081         int                      rc;
2082         indx_t                   i;
2083         MDB_node                *srcnode;
2084         MDB_val          key, data;
2085         MDB_pageparent  mpp;
2086         MDB_dhead *dh;
2087
2088         DPRINTF("merging page %lu and %lu", src->mp_page->mp_pgno, dst->mp_page->mp_pgno);
2089
2090         assert(txn != NULL);
2091         assert(src->mp_parent); /* can't merge root page */
2092         assert(dst->mp_parent);
2093
2094         /* Mark src and dst as dirty. */
2095         if ((rc = mdb_touch(txn, src)) ||
2096             (rc = mdb_touch(txn, dst)))
2097                 return rc;
2098
2099         /* Move all nodes from src to dst.
2100          */
2101         for (i = 0; i < NUMKEYS(src->mp_page); i++) {
2102                 srcnode = NODEPTR(src->mp_page, i);
2103
2104                 key.mv_size = srcnode->mn_ksize;
2105                 key.mv_data = NODEKEY(srcnode);
2106                 data.mv_size = NODEDSZ(srcnode);
2107                 data.mv_data = NODEDATA(srcnode);
2108                 rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key,
2109                     &data, NODEPGNO(srcnode), srcnode->mn_flags);
2110                 if (rc != MDB_SUCCESS)
2111                         return rc;
2112         }
2113
2114         DPRINTF("dst page %lu now has %u keys (%.1f%% filled)",
2115             dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(txn->mt_env, dst->mp_page) / 10);
2116
2117         /* Unlink the src page from parent.
2118          */
2119         mdb_del_node(src->mp_parent, src->mp_pi);
2120         if (src->mp_pi == 0) {
2121                 key.mv_size = 0;
2122                 if ((rc = mdb_update_key(src->mp_parent, 0, &key)) != MDB_SUCCESS)
2123                         return rc;
2124         }
2125
2126         if (IS_LEAF(src->mp_page))
2127                 txn->mt_dbs[dbi]->md_leaf_pages--;
2128         else
2129                 txn->mt_dbs[dbi]->md_branch_pages--;
2130
2131         mpp.mp_page = src->mp_parent;
2132         dh = (MDB_dhead *)src->mp_parent;
2133         dh--;
2134         mpp.mp_parent = dh->md_parent;
2135         mpp.mp_pi = dh->md_pi;
2136
2137         return mdb_rebalance(txn, dbi, &mpp);
2138 }
2139
2140 #define FILL_THRESHOLD   250
2141
2142 static int
2143 mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
2144 {
2145         MDB_node        *node;
2146         MDB_page        *root;
2147         MDB_pageparent npp;
2148         indx_t           si = 0, di = 0;
2149
2150         assert(txn != NULL);
2151         assert(mpp != NULL);
2152
2153         DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)",
2154             IS_LEAF(mpp->mp_page) ? "leaf" : "branch",
2155             mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(txn->mt_env, mpp->mp_page) / 10);
2156
2157         if (PAGEFILL(txn->mt_env, mpp->mp_page) >= FILL_THRESHOLD) {
2158                 DPRINTF("no need to rebalance page %lu, above fill threshold",
2159                     mpp->mp_page->mp_pgno);
2160                 return MDB_SUCCESS;
2161         }
2162
2163         if (mpp->mp_parent == NULL) {
2164                 if (NUMKEYS(mpp->mp_page) == 0) {
2165                         DPRINTF("tree is completely empty");
2166                         txn->mt_dbs[dbi]->md_root = P_INVALID;
2167                         txn->mt_dbs[dbi]->md_depth--;
2168                         txn->mt_dbs[dbi]->md_leaf_pages--;
2169                 } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
2170                         DPRINTF("collapsing root page!");
2171                         txn->mt_dbs[dbi]->md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
2172                         if ((root = mdbenv_get_page(txn->mt_env, txn->mt_dbs[dbi]->md_root)) == NULL)
2173                                 return MDB_FAIL;
2174                         txn->mt_dbs[dbi]->md_depth--;
2175                         txn->mt_dbs[dbi]->md_branch_pages--;
2176                 } else
2177                         DPRINTF("root page doesn't need rebalancing");
2178                 return MDB_SUCCESS;
2179         }
2180
2181         /* The parent (branch page) must have at least 2 pointers,
2182          * otherwise the tree is invalid.
2183          */
2184         assert(NUMKEYS(mpp->mp_parent) > 1);
2185
2186         /* Leaf page fill factor is below the threshold.
2187          * Try to move keys from left or right neighbor, or
2188          * merge with a neighbor page.
2189          */
2190
2191         /* Find neighbors.
2192          */
2193         if (mpp->mp_pi == 0) {
2194                 /* We're the leftmost leaf in our parent.
2195                  */
2196                 DPRINTF("reading right neighbor");
2197                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
2198                 if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
2199                         return MDB_FAIL;
2200                 npp.mp_pi = mpp->mp_pi + 1;
2201                 si = 0;
2202                 di = NUMKEYS(mpp->mp_page);
2203         } else {
2204                 /* There is at least one neighbor to the left.
2205                  */
2206                 DPRINTF("reading left neighbor");
2207                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
2208                 if ((npp.mp_page = mdbenv_get_page(txn->mt_env, NODEPGNO(node))) == NULL)
2209                         return MDB_FAIL;
2210                 npp.mp_pi = mpp->mp_pi - 1;
2211                 si = NUMKEYS(npp.mp_page) - 1;
2212                 di = 0;
2213         }
2214         npp.mp_parent = mpp->mp_parent;
2215
2216         DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)",
2217             npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(txn->mt_env, npp.mp_page) / 10);
2218
2219         /* If the neighbor page is above threshold and has at least two
2220          * keys, move one key from it.
2221          *
2222          * Otherwise we should try to merge them.
2223          */
2224         if (PAGEFILL(txn->mt_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2)
2225                 return mdb_move_node(txn, dbi, &npp, si, mpp, di);
2226         else { /* FIXME: if (has_enough_room()) */
2227                 if (mpp->mp_pi == 0)
2228                         return mdb_merge(txn, dbi, &npp, mpp);
2229                 else
2230                         return mdb_merge(txn, dbi, mpp, &npp);
2231         }
2232 }
2233
2234 int
2235 mdb_del(MDB_txn *txn, MDB_dbi dbi,
2236     MDB_val *key, MDB_val *data)
2237 {
2238         int              rc, exact;
2239         unsigned int     ki;
2240         MDB_node        *leaf;
2241         MDB_pageparent  mpp;
2242
2243         DPRINTF("========> delete key %.*s", (int)key->mv_size, (char *)key->mv_data);
2244
2245         assert(key != NULL);
2246
2247         if (txn == NULL || dbi >= txn->mt_numdbs)
2248                 return EINVAL;
2249
2250         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
2251                 return EINVAL;
2252         }
2253
2254         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2255                 return EINVAL;
2256         }
2257
2258         if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS)
2259                 return rc;
2260
2261         leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
2262         if (leaf == NULL || !exact) {
2263                 return ENOENT;
2264         }
2265
2266         if (data && (rc = mdb_read_data(txn->mt_env, leaf, data)) != MDB_SUCCESS)
2267                 return rc;
2268
2269         mdb_del_node(mpp.mp_page, ki);
2270         /* add overflow pages to free list */
2271         if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
2272                 int i, ovpages;
2273                 pgno_t pg;
2274
2275                 memcpy(&pg, NODEDATA(leaf), sizeof(pg));
2276                 ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_meta.mm_psize);
2277                 for (i=0; i<ovpages; i++) {
2278                         mdb_idl_insert(txn->mt_free_pgs, pg);
2279                         pg++;
2280                 }
2281         }
2282         txn->mt_dbs[dbi]->md_entries--;
2283         rc = mdb_rebalance(txn, dbi, &mpp);
2284         if (rc != MDB_SUCCESS)
2285                 txn->mt_flags |= MDB_TXN_ERROR;
2286
2287         return rc;
2288 }
2289
2290 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
2291  * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
2292  * *newindxp with the actual values after split, ie if *mpp and *newindxp
2293  * refer to a node in the new right sibling page.
2294  */
2295 static int
2296 mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
2297     MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
2298 {
2299         uint8_t          flags;
2300         int              rc = MDB_SUCCESS, ins_new = 0;
2301         indx_t           newindx;
2302         pgno_t           pgno = 0;
2303         unsigned int     i, j, split_indx;
2304         MDB_node        *node;
2305         MDB_val  sepkey, rkey, rdata;
2306         MDB_page        *copy;
2307         MDB_dpage       *mdp, *rdp, *pdp;
2308         MDB_dhead *dh;
2309
2310         assert(txn != NULL);
2311
2312         dh = ((MDB_dhead *)*mpp) - 1;
2313         mdp = (MDB_dpage *)dh;
2314         newindx = *newindxp;
2315
2316         DPRINTF("-----> splitting %s page %lu and adding [%.*s] at index %i",
2317             IS_LEAF(&mdp->p) ? "leaf" : "branch", mdp->p.mp_pgno,
2318             (int)newkey->mv_size, (char *)newkey->mv_data, *newindxp);
2319
2320         if (mdp->h.md_parent == NULL) {
2321                 if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL)
2322                         return MDB_FAIL;
2323                 mdp->h.md_pi = 0;
2324                 mdp->h.md_parent = &pdp->p;
2325                 txn->mt_dbs[dbi]->md_root = pdp->p.mp_pgno;
2326                 DPRINTF("root split! new root = %lu", pdp->p.mp_pgno);
2327                 txn->mt_dbs[dbi]->md_depth++;
2328
2329                 /* Add left (implicit) pointer. */
2330                 if (mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
2331                     mdp->p.mp_pgno, 0) != MDB_SUCCESS)
2332                         return MDB_FAIL;
2333         } else {
2334                 DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
2335         }
2336
2337         /* Create a right sibling. */
2338         if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL)
2339                 return MDB_FAIL;
2340         rdp->h.md_parent = mdp->h.md_parent;
2341         rdp->h.md_pi = mdp->h.md_pi + 1;
2342         DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
2343
2344         /* Move half of the keys to the right sibling. */
2345         if ((copy = malloc(txn->mt_env->me_meta.mm_psize)) == NULL)
2346                 return MDB_FAIL;
2347         memcpy(copy, &mdp->p, txn->mt_env->me_meta.mm_psize);
2348         memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_meta.mm_psize - PAGEHDRSZ);
2349         mdp->p.mp_lower = PAGEHDRSZ;
2350         mdp->p.mp_upper = txn->mt_env->me_meta.mm_psize;
2351
2352         split_indx = NUMKEYS(copy) / 2 + 1;
2353
2354         /* First find the separating key between the split pages.
2355          */
2356         memset(&sepkey, 0, sizeof(sepkey));
2357         if (newindx == split_indx) {
2358                 sepkey.mv_size = newkey->mv_size;
2359                 sepkey.mv_data = newkey->mv_data;
2360         } else {
2361                 node = NODEPTR(copy, split_indx);
2362                 sepkey.mv_size = node->mn_ksize;
2363                 sepkey.mv_data = NODEKEY(node);
2364         }
2365
2366         DPRINTF("separator is [%.*s]", (int)sepkey.mv_size, (char *)sepkey.mv_data);
2367
2368         /* Copy separator key to the parent.
2369          */
2370         if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(txn->mt_env, &sepkey)) {
2371                 rc = mdb_split(txn, dbi, &rdp->h.md_parent, &rdp->h.md_pi,
2372                     &sepkey, NULL, rdp->p.mp_pgno);
2373
2374                 /* Right page might now have changed parent.
2375                  * Check if left page also changed parent.
2376                  */
2377                 if (rdp->h.md_parent != mdp->h.md_parent &&
2378                     mdp->h.md_pi >= NUMKEYS(mdp->h.md_parent)) {
2379                         mdp->h.md_parent = rdp->h.md_parent;
2380                         mdp->h.md_pi = rdp->h.md_pi - 1;
2381                 }
2382         } else {
2383                 rc = mdb_add_node(txn, dbi, rdp->h.md_parent, rdp->h.md_pi,
2384                     &sepkey, NULL, rdp->p.mp_pgno, 0);
2385         }
2386         if (rc != MDB_SUCCESS) {
2387                 free(copy);
2388                 return MDB_FAIL;
2389         }
2390
2391         for (i = j = 0; i <= NUMKEYS(copy); j++) {
2392                 if (i < split_indx) {
2393                         /* Re-insert in left sibling. */
2394                         pdp = mdp;
2395                 } else {
2396                         /* Insert in right sibling. */
2397                         if (i == split_indx)
2398                                 /* Reset insert index for right sibling. */
2399                                 j = (i == newindx && ins_new);
2400                         pdp = rdp;
2401                 }
2402
2403                 if (i == newindx && !ins_new) {
2404                         /* Insert the original entry that caused the split. */
2405                         rkey.mv_data = newkey->mv_data;
2406                         rkey.mv_size = newkey->mv_size;
2407                         if (IS_LEAF(&mdp->p)) {
2408                                 rdata.mv_data = newdata->mv_data;
2409                                 rdata.mv_size = newdata->mv_size;
2410                         } else
2411                                 pgno = newpgno;
2412                         flags = 0;
2413
2414                         ins_new = 1;
2415
2416                         /* Update page and index for the new key. */
2417                         *newindxp = j;
2418                         *mpp = &pdp->p;
2419                 } else if (i == NUMKEYS(copy)) {
2420                         break;
2421                 } else {
2422                         node = NODEPTR(copy, i);
2423                         rkey.mv_data = NODEKEY(node);
2424                         rkey.mv_size = node->mn_ksize;
2425                         if (IS_LEAF(&mdp->p)) {
2426                                 rdata.mv_data = NODEDATA(node);
2427                                 rdata.mv_size = node->mn_dsize;
2428                         } else
2429                                 pgno = node->mn_pgno;
2430                         flags = node->mn_flags;
2431
2432                         i++;
2433                 }
2434
2435                 if (!IS_LEAF(&mdp->p) && j == 0) {
2436                         /* First branch index doesn't need key data. */
2437                         rkey.mv_size = 0;
2438                 }
2439
2440                 rc = mdb_add_node(txn, dbi, &pdp->p, j, &rkey, &rdata, pgno,flags);
2441         }
2442
2443         free(copy);
2444         return rc;
2445 }
2446
2447 int
2448 mdb_put(MDB_txn *txn, MDB_dbi dbi,
2449     MDB_val *key, MDB_val *data, unsigned int flags)
2450 {
2451         int              rc = MDB_SUCCESS, exact;
2452         unsigned int     ki;
2453         MDB_node        *leaf;
2454         MDB_pageparent  mpp;
2455
2456         assert(key != NULL);
2457         assert(data != NULL);
2458
2459         if (txn == NULL)
2460                 return EINVAL;
2461
2462         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
2463                 return EINVAL;
2464         }
2465
2466         if (txn->mt_env->me_txn != txn) {
2467                 return EINVAL;
2468         }
2469
2470         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2471                 return EINVAL;
2472         }
2473
2474         DPRINTF("==> put key %.*s, size %zu, data size %zu",
2475                 (int)key->mv_size, (char *)key->mv_data, key->mv_size, data->mv_size);
2476
2477         rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
2478         if (rc == MDB_SUCCESS) {
2479                 leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
2480                 if (leaf && exact) {
2481                         if (F_ISSET(flags, MDB_NOOVERWRITE)) {
2482                                 DPRINTF("duplicate key %.*s",
2483                                     (int)key->mv_size, (char *)key->mv_data);
2484                                 return EEXIST;
2485                         }
2486                         mdb_del_node(mpp.mp_page, ki);
2487                 }
2488                 if (leaf == NULL) {             /* append if not found */
2489                         ki = NUMKEYS(mpp.mp_page);
2490                         DPRINTF("appending key at index %i", ki);
2491                 }
2492         } else if (rc == ENOENT) {
2493                 MDB_dpage *dp;
2494                 /* new file, just write a root leaf page */
2495                 DPRINTF("allocating new root leaf page");
2496                 if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) {
2497                         return ENOMEM;
2498                 }
2499                 mpp.mp_page = &dp->p;
2500                 txn->mt_dbs[dbi]->md_root = mpp.mp_page->mp_pgno;
2501                 txn->mt_dbs[dbi]->md_depth++;
2502                 ki = 0;
2503         }
2504         else
2505                 goto done;
2506
2507         assert(IS_LEAF(mpp.mp_page));
2508         DPRINTF("there are %u keys, should insert new key at index %i",
2509                 NUMKEYS(mpp.mp_page), ki);
2510
2511         if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
2512                 rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
2513         } else {
2514                 /* There is room already in this leaf page. */
2515                 rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, data, 0, 0);
2516         }
2517
2518         if (rc != MDB_SUCCESS)
2519                 txn->mt_flags |= MDB_TXN_ERROR;
2520         else
2521                 txn->mt_dbs[dbi]->md_entries++;
2522
2523 done:
2524         return rc;
2525 }
2526
2527 int
2528 mdbenv_get_flags(MDB_env *env, unsigned int *arg)
2529 {
2530         if (!env || !arg)
2531                 return EINVAL;
2532
2533         *arg = env->me_flags;
2534         return MDB_SUCCESS;
2535 }
2536
2537 int
2538 mdbenv_get_path(MDB_env *env, const char **arg)
2539 {
2540         if (!env || !arg)
2541                 return EINVAL;
2542
2543         *arg = env->me_path;
2544         return MDB_SUCCESS;
2545 }
2546
2547 int
2548 mdbenv_stat(MDB_env *env, MDB_stat *arg)
2549 {
2550         if (env == NULL || arg == NULL)
2551                 return EINVAL;
2552
2553         arg->ms_psize = env->me_meta.mm_psize;
2554         arg->ms_depth = env->me_meta.mm_depth;
2555         arg->ms_branch_pages = env->me_meta.mm_branch_pages;
2556         arg->ms_leaf_pages = env->me_meta.mm_leaf_pages;
2557         arg->ms_overflow_pages = env->me_meta.mm_overflow_pages;
2558         arg->ms_entries = env->me_meta.mm_entries;
2559
2560         return MDB_SUCCESS;
2561 }
2562
2563 int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
2564 {
2565         MDB_val key, data;
2566         MDB_dbi i;
2567         int rc;
2568
2569         /* main DB? */
2570         if (!name) {
2571                 *dbi = 0;
2572                 return MDB_SUCCESS;
2573         }
2574
2575         /* Is the DB already open? */
2576         for (i=0; i<txn->mt_numdbs; i++) {
2577                 if (!strcmp(name, txn->mt_dbxs[i].md_name)) {
2578                         *dbi = i;
2579                         return MDB_SUCCESS;
2580                 }
2581         }
2582
2583         /* Find the DB info */
2584         key.mv_size = strlen(name);
2585         key.mv_data = (void *)name;
2586         rc = mdb_get(txn, 0, &key, &data);
2587
2588         /* Create if requested */
2589         if (rc == ENOENT && (flags & MDB_CREATE)) {
2590                 MDB_db dummy;
2591                 data.mv_size = sizeof(MDB_db);
2592                 data.mv_data = &dummy;
2593                 memset(&dummy, 0, sizeof(dummy));
2594                 dummy.md_root = P_INVALID;
2595                 rc = mdb_put(txn, 0, &key, &data, 0);
2596                 if (rc == MDB_SUCCESS)
2597                         rc = mdb_get(txn, 0, &key, &data);
2598         }
2599
2600         /* OK, got info, add to table */
2601         if (rc == MDB_SUCCESS) {
2602                 /* Is there a free slot? */
2603                 if ((txn->mt_numdbs & (DBX_CHUNK-1)) == 0) {
2604                         MDB_dbx *p1;
2605                         MDB_db **p2;
2606                         int i;
2607                         i = txn->mt_numdbs + DBX_CHUNK;
2608                         p1 = realloc(txn->mt_dbxs, i * sizeof(MDB_dbx));
2609                         if (p1 == NULL)
2610                                 return ENOMEM;
2611                         txn->mt_dbxs = p1;
2612                         p2 = realloc(txn->mt_dbs, i * sizeof(MDB_db *));
2613                         if (p2 == NULL)
2614                                 return ENOMEM;
2615                         txn->mt_dbs = p2;
2616                 }
2617                 txn->mt_dbxs[txn->mt_numdbs].md_name = strdup(name);
2618                 txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL;
2619                 txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
2620                 txn->mt_dbs[txn->mt_numdbs] = data.mv_data;
2621                 *dbi = txn->mt_numdbs;
2622                 txn->mt_numdbs++;
2623         }
2624
2625         return rc;
2626 }
2627
2628 int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg)
2629 {
2630         if (txn == NULL || arg == NULL)
2631                 return EINVAL;
2632
2633         arg->ms_psize = txn->mt_env->me_meta.mm_psize;
2634         arg->ms_depth = txn->mt_dbs[dbi]->md_depth;
2635         arg->ms_branch_pages = txn->mt_dbs[dbi]->md_branch_pages;
2636         arg->ms_leaf_pages = txn->mt_dbs[dbi]->md_leaf_pages;
2637         arg->ms_overflow_pages = txn->mt_dbs[dbi]->md_overflow_pages;
2638         arg->ms_entries = txn->mt_dbs[dbi]->md_entries;
2639
2640         return MDB_SUCCESS;
2641 }
2642
2643 void mdb_close(MDB_txn *txn, MDB_dbi dbi)
2644 {
2645         if (dbi >= txn->mt_numdbs)
2646                 return;
2647         free(txn->mt_dbxs[dbi].md_name);
2648         txn->mt_dbxs[dbi].md_name = NULL;
2649 }