]> git.sur5r.net Git - openldap/blob - libraries/libmdb/mdb.c
9c6af1edbaf98125cb92f922af641f38150c6860
[openldap] / libraries / libmdb / mdb.c
1 /* mdb.c - memory-mapped database library */
2 /*
3  * Copyright 2011 Howard Chu, Symas Corp.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted only as authorized by the OpenLDAP
8  * Public License.
9  *
10  * A copy of this license is available in the file LICENSE in the
11  * top-level directory of the distribution or, alternatively, at
12  * <http://www.OpenLDAP.org/license.html>.
13  *
14  * This code is derived from btree.c written by Martin Hedenfalk.
15  *
16  * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
17  *
18  * Permission to use, copy, modify, and distribute this software for any
19  * purpose with or without fee is hereby granted, provided that the above
20  * copyright notice and this permission notice appear in all copies.
21  *
22  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
23  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
24  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
25  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
26  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
27  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
28  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
29  */
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <sys/param.h>
33 #include <sys/uio.h>
34 #include <sys/mman.h>
35 #ifdef HAVE_SYS_FILE_H
36 #include <sys/file.h>
37 #endif
38 #include <fcntl.h>
39
40 #include <assert.h>
41 #include <errno.h>
42 #include <stddef.h>
43 #include <stdint.h>
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47 #include <time.h>
48 #include <unistd.h>
49 #include <pthread.h>
50
51 #include "mdb.h"
52
53 #define ULONG           unsigned long
54 typedef ULONG           pgno_t;
55
56 #include "midl.h"
57
58 /* Note: If O_DSYNC is undefined but exists in /usr/include,
59  * preferably set some compiler flag to get the definition.
60  * Otherwise compile with the less efficient -DMDB_DSYNC=O_SYNC.
61  */
62 #ifndef MDB_DSYNC
63 # define MDB_DSYNC      O_DSYNC
64 #endif
65
66 #ifndef DEBUG
67 #define DEBUG 1
68 #endif
69
70 #if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
71 # define DPRINTF        (void)  /* Vararg macros may be unsupported */
72 #elif DEBUG
73 # define DPRINTF(fmt, ...)      /* Requires 2 or more args */ \
74         fprintf(stderr, "%s:%d: " fmt "\n", __func__, __LINE__, __VA_ARGS__)
75 #else
76 # define DPRINTF(fmt, ...)      ((void) 0)
77 #endif
78 #define DPUTS(arg)      DPRINTF("%s", arg)
79
80 #define PAGESIZE         4096
81 #define MDB_MINKEYS      2
82 #define MDB_MAGIC        0xBEEFC0DE
83 #define MDB_VERSION      1
84 #define MAXKEYSIZE       511
85 #if DEBUG
86 #define KBUF    (MAXKEYSIZE*2+1)
87 #define DKBUF   char kbuf[KBUF]
88 #define DKEY(x) mdb_dkey(x, kbuf)
89 #else
90 #define DKBUF
91 #define DKEY(x)
92 #endif
93
94 #define P_INVALID        (~0UL)
95
96 #define F_ISSET(w, f)    (((w) & (f)) == (f))
97
98 typedef uint16_t         indx_t;
99
100 #define DEFAULT_READERS 126
101 #define DEFAULT_MAPSIZE 1048576
102
103 /* Lock descriptor stuff */
104 #ifndef CACHELINE
105 #define CACHELINE       64      /* most CPUs. Itanium uses 128 */
106 #endif
107
108 typedef struct MDB_rxbody {
109         ULONG           mrb_txnid;
110         pid_t           mrb_pid;
111         pthread_t       mrb_tid;
112 } MDB_rxbody;
113
114 typedef struct MDB_reader {
115         union {
116                 MDB_rxbody mrx;
117 #define mr_txnid        mru.mrx.mrb_txnid
118 #define mr_pid  mru.mrx.mrb_pid
119 #define mr_tid  mru.mrx.mrb_tid
120                 /* cache line alignment */
121                 char pad[(sizeof(MDB_rxbody)+CACHELINE-1) & ~(CACHELINE-1)];
122         } mru;
123 } MDB_reader;
124
125 typedef struct MDB_txbody {
126         uint32_t        mtb_magic;
127         uint32_t        mtb_version;
128         pthread_mutex_t mtb_mutex;
129         ULONG           mtb_txnid;
130         uint32_t        mtb_numreaders;
131         uint32_t        mtb_me_toggle;
132 } MDB_txbody;
133
134 typedef struct MDB_txninfo {
135         union {
136                 MDB_txbody mtb;
137 #define mti_magic       mt1.mtb.mtb_magic
138 #define mti_version     mt1.mtb.mtb_version
139 #define mti_mutex       mt1.mtb.mtb_mutex
140 #define mti_txnid       mt1.mtb.mtb_txnid
141 #define mti_numreaders  mt1.mtb.mtb_numreaders
142 #define mti_me_toggle   mt1.mtb.mtb_me_toggle
143                 char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
144         } mt1;
145         union {
146                 pthread_mutex_t mt2_wmutex;
147 #define mti_wmutex      mt2.mt2_wmutex
148                 char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
149         } mt2;
150         MDB_reader      mti_readers[1];
151 } MDB_txninfo;
152
153 /* Common header for all page types. Overflow pages
154  * occupy a number of contiguous pages with no
155  * headers on any page after the first.
156  */
157 typedef struct MDB_page {               /* represents a page of storage */
158 #define mp_pgno         mp_p.p_pgno
159         union padded {
160                 pgno_t          p_pgno;         /* page number */
161                 void *          p_align;        /* for IL32P64 */
162         } mp_p;
163 #define P_BRANCH         0x01           /* branch page */
164 #define P_LEAF           0x02           /* leaf page */
165 #define P_OVERFLOW       0x04           /* overflow page */
166 #define P_META           0x08           /* meta page */
167 #define P_DIRTY          0x10           /* dirty page */
168 #define P_LEAF2          0x20           /* DB with small, fixed size keys and no data */
169         uint32_t        mp_flags;
170 #define mp_lower        mp_pb.pb.pb_lower
171 #define mp_upper        mp_pb.pb.pb_upper
172 #define mp_pages        mp_pb.pb_pages
173         union page_bounds {
174                 struct {
175                         indx_t          pb_lower;               /* lower bound of free space */
176                         indx_t          pb_upper;               /* upper bound of free space */
177                 } pb;
178                 uint32_t        pb_pages;       /* number of overflow pages */
179         } mp_pb;
180         indx_t          mp_ptrs[1];             /* dynamic size */
181 } MDB_page;
182
183 #define PAGEHDRSZ        ((unsigned) offsetof(MDB_page, mp_ptrs))
184
185 #define NUMKEYS(p)       (((p)->mp_lower - PAGEHDRSZ) >> 1)
186 #define SIZELEFT(p)      (indx_t)((p)->mp_upper - (p)->mp_lower)
187 #define PAGEFILL(env, p) (1000L * ((env)->me_psize - PAGEHDRSZ - SIZELEFT(p)) / \
188                                 ((env)->me_psize - PAGEHDRSZ))
189 #define IS_LEAF(p)       F_ISSET((p)->mp_flags, P_LEAF)
190 #define IS_LEAF2(p)      F_ISSET((p)->mp_flags, P_LEAF2)
191 #define IS_BRANCH(p)     F_ISSET((p)->mp_flags, P_BRANCH)
192 #define IS_OVERFLOW(p)   F_ISSET((p)->mp_flags, P_OVERFLOW)
193
194 #define OVPAGES(size, psize)    ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
195
196 typedef struct MDB_db {
197         uint32_t        md_pad;         /* also ksize for LEAF2 pages */
198         uint16_t        md_flags;
199         uint16_t        md_depth;
200         ULONG           md_branch_pages;
201         ULONG           md_leaf_pages;
202         ULONG           md_overflow_pages;
203         ULONG           md_entries;
204         pgno_t          md_root;
205 } MDB_db;
206
207 #define FREE_DBI        0
208 #define MAIN_DBI        1
209
210 typedef struct MDB_meta {                       /* meta (footer) page content */
211         uint32_t        mm_magic;
212         uint32_t        mm_version;
213         void            *mm_address;            /* address for fixed mapping */
214         size_t          mm_mapsize;                     /* size of mmap region */
215         MDB_db          mm_dbs[2];                      /* first is free space, 2nd is main db */
216 #define mm_psize        mm_dbs[0].md_pad
217 #define mm_flags        mm_dbs[0].md_flags
218         pgno_t          mm_last_pg;                     /* last used page in file */
219         ULONG           mm_txnid;                       /* txnid that committed this page */
220 } MDB_meta;
221
222 typedef struct MDB_dhead {                                      /* a dirty page */
223         MDB_page        *md_parent;
224         unsigned        md_pi;                          /* parent index */
225         int                     md_num;
226 } MDB_dhead;
227
228 typedef struct MDB_dpage {
229         MDB_dhead       h;
230         MDB_page        p;
231 } MDB_dpage;
232
233 typedef struct MDB_oldpages {
234         struct MDB_oldpages *mo_next;
235         ULONG           mo_txnid;
236         pgno_t          mo_pages[1];    /* dynamic */
237 } MDB_oldpages;
238
239 typedef struct MDB_pageparent {
240         MDB_page *mp_page;
241         MDB_page *mp_parent;
242         unsigned mp_pi;
243 } MDB_pageparent;
244
245 static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num);
246 static int              mdb_touch(MDB_txn *txn, MDB_pageparent *mp);
247
248 typedef struct MDB_ppage {                                      /* ordered list of pages */
249         MDB_page                *mp_page;
250         unsigned int    mp_ki;          /* cursor index on page */
251 } MDB_ppage;
252
253 #define CURSOR_TOP(c)            (&(c)->mc_stack[(c)->mc_snum-1])
254 #define CURSOR_PARENT(c)         (&(c)->mc_stack[(c)->mc_snum-2])
255 #define CURSOR_STACK             32
256
257 struct MDB_xcursor;
258
259 struct MDB_cursor {
260         MDB_txn         *mc_txn;
261         MDB_ppage       mc_stack[CURSOR_STACK];         /* stack of parent pages */
262         unsigned int    mc_snum;                /* number of pushed pages */
263         MDB_dbi         mc_dbi;
264         short           mc_initialized; /* 1 if initialized */
265         short           mc_eof;         /* 1 if end is reached */
266         struct MDB_xcursor      *mc_xcursor;
267 };
268
269 #define METADATA(p)      ((void *)((char *)(p) + PAGEHDRSZ))
270
271 typedef struct MDB_node {
272 #define mn_pgno          mn_p.np_pgno
273 #define mn_dsize         mn_p.np_dsize
274         union {
275                 pgno_t           np_pgno;       /* child page number */
276                 uint32_t         np_dsize;      /* leaf data size */
277         } mn_p;
278         unsigned int    mn_flags:4;
279         unsigned int    mn_ksize:12;                    /* key size */
280 #define F_BIGDATA        0x01                   /* data put on overflow page */
281 #define F_SUBDATA        0x02                   /* data is a sub-database */
282 #define F_DUPDATA        0x04                   /* data has duplicates */
283         char            mn_data[1];
284 } MDB_node;
285
286 typedef struct MDB_dbx {
287         MDB_val         md_name;
288         MDB_cmp_func    *md_cmp;                /* user compare function */
289         MDB_cmp_func    *md_dcmp;               /* user dupsort function */
290         MDB_rel_func    *md_rel;                /* user relocate function */
291         MDB_dbi md_parent;
292         unsigned int    md_dirty;
293 } MDB_dbx;
294
295 struct MDB_txn {
296         pgno_t          mt_next_pgno;   /* next unallocated page */
297         ULONG           mt_txnid;
298         ULONG           mt_oldest;
299         MDB_env         *mt_env;        
300         pgno_t          *mt_free_pgs;   /* this is an IDL */
301         union {
302                 MIDL2   *dirty_list;    /* modified pages */
303                 MDB_reader      *reader;
304         } mt_u;
305         MDB_dbx         *mt_dbxs;               /* array */
306         MDB_db          *mt_dbs;
307         unsigned int    mt_numdbs;
308
309 #define MDB_TXN_RDONLY          0x01            /* read-only transaction */
310 #define MDB_TXN_ERROR           0x02            /* an error has occurred */
311 #define MDB_TXN_METOGGLE        0x04            /* used meta page 1 */
312         unsigned int    mt_flags;
313 };
314
315 /* Context for sorted-dup records */
316 typedef struct MDB_xcursor {
317         MDB_cursor mx_cursor;
318         MDB_txn mx_txn;
319         MDB_dbx mx_dbxs[4];
320         MDB_db  mx_dbs[4];
321 } MDB_xcursor;
322
323 struct MDB_env {
324         int                     me_fd;
325         int                     me_lfd;
326         int                     me_mfd;                 /* just for writing the meta pages */
327 #define MDB_FATAL_ERROR 0x80000000U
328         uint32_t        me_flags;
329         uint32_t        me_extrapad;    /* unused for now */
330         unsigned int    me_maxreaders;
331         unsigned int    me_numdbs;
332         unsigned int    me_maxdbs;
333         char            *me_path;
334         char            *me_map;
335         MDB_txninfo     *me_txns;
336         MDB_meta        *me_metas[2];
337         MDB_meta        *me_meta;
338         MDB_txn         *me_txn;                /* current write transaction */
339         size_t          me_mapsize;
340         off_t           me_size;                /* current file size */
341         pgno_t          me_maxpg;               /* me_mapsize / me_psize */
342         unsigned int    me_psize;
343         unsigned int    me_db_toggle;
344         MDB_dbx         *me_dbxs;               /* array */
345         MDB_db          *me_dbs[2];
346         MDB_oldpages *me_pghead;
347         pthread_key_t   me_txkey;       /* thread-key for readers */
348         MDB_dpage       *me_dpages;
349         pgno_t          me_free_pgs[MDB_IDL_UM_SIZE];
350         MIDL2           me_dirty_list[MDB_IDL_DB_SIZE];
351 };
352
353 #define NODESIZE         offsetof(MDB_node, mn_data)
354
355 #define INDXSIZE(k)      (NODESIZE + ((k) == NULL ? 0 : (k)->mv_size))
356 #define LEAFSIZE(k, d)   (NODESIZE + (k)->mv_size + (d)->mv_size)
357 #define NODEPTR(p, i)    ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
358 #define NODEKEY(node)    (void *)((node)->mn_data)
359 #define NODEDATA(node)   (void *)((char *)(node)->mn_data + (node)->mn_ksize)
360 #define NODEPGNO(node)   ((node)->mn_pgno)
361 #define NODEDSZ(node)    ((node)->mn_dsize)
362 #define NODEKSZ(node)    ((node)->mn_ksize)
363 #define LEAF2KEY(p, i, ks)      ((char *)(p) + PAGEHDRSZ + ((i)*(ks)))
364
365 #define MDB_SET_KEY(node, key)  if (key!=NULL) {(key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node);}
366
367 #define MDB_COMMIT_PAGES         64     /* max number of pages to write in one commit */
368
369 static int  mdb_search_page_root(MDB_txn *txn,
370                             MDB_dbi dbi, MDB_val *key,
371                             MDB_cursor *cursor, int modify,
372                             MDB_pageparent *mpp);
373 static int  mdb_search_page(MDB_txn *txn,
374                             MDB_dbi dbi, MDB_val *key,
375                             MDB_cursor *cursor, int modify,
376                             MDB_pageparent *mpp);
377
378 static int  mdb_env_read_header(MDB_env *env, MDB_meta *meta);
379 static int  mdb_env_read_meta(MDB_env *env, int *which);
380 static int  mdb_env_write_meta(MDB_txn *txn);
381 static int  mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
382
383 static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
384                             MDB_val *key, int *exactp, unsigned int *kip);
385 static int  mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
386                             indx_t indx, MDB_val *key, MDB_val *data,
387                             pgno_t pgno, uint8_t flags);
388 static void mdb_del_node(MDB_page *mp, indx_t indx, int ksize);
389 static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki,
390     MDB_pageparent *mpp, MDB_node *leaf);
391 static int mdb_put0(MDB_txn *txn, MDB_dbi dbi,
392     MDB_val *key, MDB_val *data, unsigned int flags);
393 static int  mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
394
395 static int               mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
396 static int               mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
397 static int               mdb_move_node(MDB_txn *txn, MDB_dbi dbi, 
398                                 MDB_pageparent *src, indx_t srcindx,
399                                 MDB_pageparent *dst, indx_t dstindx);
400 static int               mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src,
401                             MDB_pageparent *dst);
402 static int               mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp,
403                             unsigned int *newindxp, MDB_val *newkey,
404                             MDB_val *newdata, pgno_t newpgno);
405 static MDB_dpage *mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num);
406
407 static void              cursor_pop_page(MDB_cursor *cursor);
408 static MDB_ppage *cursor_push_page(MDB_cursor *cursor,
409                             MDB_page *mp);
410
411 static int               mdb_sibling(MDB_cursor *cursor, int move_right);
412 static int               mdb_cursor_next(MDB_cursor *cursor,
413                             MDB_val *key, MDB_val *data, MDB_cursor_op op);
414 static int               mdb_cursor_prev(MDB_cursor *cursor,
415                             MDB_val *key, MDB_val *data, MDB_cursor_op op);
416 static int               mdb_cursor_set(MDB_cursor *cursor,
417                             MDB_val *key, MDB_val *data, MDB_cursor_op op, int *exactp);
418 static int               mdb_cursor_first(MDB_cursor *cursor,
419                             MDB_val *key, MDB_val *data);
420 static int               mdb_cursor_last(MDB_cursor *cursor,
421                             MDB_val *key, MDB_val *data);
422
423 static void             mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
424 static void             mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node);
425 static void             mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
426
427 static size_t            mdb_leaf_size(MDB_env *env, MDB_val *key,
428                             MDB_val *data);
429 static size_t            mdb_branch_size(MDB_env *env, MDB_val *key);
430
431 static int               memncmp(const void *s1, size_t n1,
432                                  const void *s2, size_t n2);
433 static int               memnrcmp(const void *s1, size_t n1,
434                                   const void *s2, size_t n2);
435
436 static int
437 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
438 {
439         int diff, len_diff = -1;
440
441         if (n1 >= n2) {
442                 len_diff = (n1 > n2);
443                 n1 = n2;
444         }
445         diff = memcmp(s1, s2, n1);
446         return diff ? diff : len_diff;
447 }
448
449 static int
450 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
451 {
452         const unsigned char     *p1, *p2, *p1_lim;
453
454         if (n2 == 0)
455                 return n1 != 0;
456         if (n1 == 0)
457                 return -1;
458
459         p1 = (const unsigned char *)s1 + n1 - 1;
460         p2 = (const unsigned char *)s2 + n2 - 1;
461
462         for (p1_lim = (n1 <= n2 ? s1 : s2);  *p1 == *p2;  p1--, p2--) {
463                 if (p1 == p1_lim)
464                         return (p1 != s1) ? (p1 != p2) : (p2 != s2) ? -1 : 0;
465         }
466         return *p1 - *p2;
467 }
468
469 char *
470 mdb_version(int *maj, int *min, int *pat)
471 {
472         *maj = MDB_VERSION_MAJOR;
473         *min = MDB_VERSION_MINOR;
474         *pat = MDB_VERSION_PATCH;
475         return MDB_VERSION_STRING;
476 }
477
478 static char *const errstr[] = {
479         "MDB_KEYEXIST: Key/data pair already exists",
480         "MDB_NOTFOUND: No matching key/data pair found",
481         "MDB_PAGE_NOTFOUND: Requested page not found",
482         "MDB_CORRUPTED: Located page was wrong type",
483         "MDB_PANIC: Update of meta page failed",
484         "MDB_VERSION_MISMATCH: Database environment version mismatch"
485 };
486
487 char *
488 mdb_strerror(int err)
489 {
490         if (!err)
491                 return ("Successful return: 0");
492
493         if (err >= MDB_KEYEXIST && err <= MDB_VERSION_MISMATCH)
494                 return errstr[err - MDB_KEYEXIST];
495
496         return strerror(err);
497 }
498
499 static char *
500 mdb_dkey(MDB_val *key, char *buf)
501 {
502         char *ptr = buf;
503         unsigned char *c = key->mv_data;
504         unsigned int i;
505         if (key->mv_size > MAXKEYSIZE)
506                 return "MAXKEYSIZE";
507         for (i=0; i<key->mv_size; i++)
508                 ptr += sprintf(ptr, "%02x", *c++);
509         return buf;
510 }
511
512 int
513 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
514 {
515         if (txn->mt_dbxs[dbi].md_cmp)
516                 return txn->mt_dbxs[dbi].md_cmp(a, b);
517
518         if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY
519 #if __BYTE_ORDER == __LITTLE_ENDIAN
520                 |MDB_INTEGERKEY
521 #endif
522         ))
523                 return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size);
524         else
525                 return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
526 }
527
528 int
529 mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
530 {
531         if (txn->mt_dbxs[dbi].md_dcmp)
532                 return txn->mt_dbxs[dbi].md_dcmp(a, b);
533
534         if (txn->mt_dbs[dbi].md_flags & (0
535 #if __BYTE_ORDER == __LITTLE_ENDIAN
536                 |MDB_INTEGERDUP
537 #endif
538         ))
539                 return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size);
540         else
541                 return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
542 }
543
544 /* Allocate new page(s) for writing */
545 static MDB_dpage *
546 mdb_alloc_page(MDB_txn *txn, MDB_page *parent, unsigned int parent_idx, int num)
547 {
548         MDB_dpage *dp;
549         pgno_t pgno = P_INVALID;
550         ULONG oldest;
551         MIDL2 mid;
552
553         if (txn->mt_txnid > 2) {
554
555         oldest = txn->mt_txnid - 2;
556         if (!txn->mt_env->me_pghead && txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
557                 /* See if there's anything in the free DB */
558                 MDB_pageparent mpp;
559                 MDB_node *leaf;
560                 ULONG *kptr;
561
562                 mpp.mp_parent = NULL;
563                 mpp.mp_pi = 0;
564                 mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp);
565                 leaf = NODEPTR(mpp.mp_page, 0);
566                 kptr = (ULONG *)NODEKEY(leaf);
567
568                 /* It's potentially usable, unless there are still
569                  * older readers outstanding. Grab it.
570                  */
571                 if (oldest > *kptr) {
572                         MDB_oldpages *mop;
573                         MDB_val data;
574                         pgno_t *idl;
575
576                         mdb_read_data(txn, leaf, &data);
577                         idl = (ULONG *)data.mv_data;
578                         mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
579                         mop->mo_next = txn->mt_env->me_pghead;
580                         mop->mo_txnid = *kptr;
581                         txn->mt_env->me_pghead = mop;
582                         memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
583
584 #if DEBUG > 1
585                         {
586                                 unsigned int i;
587                                 DPRINTF("IDL read txn %lu root %lu num %lu",
588                                         mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
589                                 for (i=0; i<idl[0]; i++) {
590                                         DPRINTF("IDL %lu", idl[i+1]);
591                                 }
592                         }
593 #endif
594                         /* drop this IDL from the DB */
595                         mpp.mp_parent = NULL;
596                         mpp.mp_pi = 0;
597                         mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
598                         leaf = NODEPTR(mpp.mp_page, 0);
599                         mdb_del0(txn, FREE_DBI, 0, &mpp, leaf);
600                 }
601         }
602         if (txn->mt_env->me_pghead) {
603                 unsigned int i;
604                 for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
605                         ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
606                         if (!mr) continue;
607                         if (mr < oldest)
608                                 oldest = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
609                 }
610                 if (oldest > txn->mt_env->me_pghead->mo_txnid) {
611                         MDB_oldpages *mop = txn->mt_env->me_pghead;
612                         txn->mt_oldest = oldest;
613                         if (num > 1) {
614                                 /* FIXME: For now, always use fresh pages. We
615                                  * really ought to search the free list for a
616                                  * contiguous range.
617                                  */
618                                 ;
619                         } else {
620                                 /* peel pages off tail, so we only have to truncate the list */
621                                 pgno = MDB_IDL_LAST(mop->mo_pages);
622                                 if (MDB_IDL_IS_RANGE(mop->mo_pages)) {
623                                         mop->mo_pages[2]++;
624                                         if (mop->mo_pages[2] > mop->mo_pages[1])
625                                                 mop->mo_pages[0] = 0;
626                                 } else {
627                                         mop->mo_pages[0]--;
628                                 }
629                                 if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
630                                         txn->mt_env->me_pghead = mop->mo_next;
631                                         free(mop);
632                                 }
633                         }
634                 }
635         }
636         }
637
638         if (pgno == P_INVALID) {
639                 /* DB size is maxed out */
640                 if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg)
641                         return NULL;
642         }
643         if (txn->mt_env->me_dpages && num == 1) {
644                 dp = txn->mt_env->me_dpages;
645                 txn->mt_env->me_dpages = (MDB_dpage *)dp->h.md_parent;
646         } else {
647                 if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL)
648                         return NULL;
649         }
650         dp->h.md_num = num;
651         dp->h.md_parent = parent;
652         dp->h.md_pi = parent_idx;
653         if (pgno == P_INVALID) {
654                 dp->p.mp_pgno = txn->mt_next_pgno;
655                 txn->mt_next_pgno += num;
656         } else {
657                 dp->p.mp_pgno = pgno;
658         }
659         mid.mid = dp->p.mp_pgno;
660         mid.mptr = dp;
661         mdb_midl2_insert(txn->mt_u.dirty_list, &mid);
662
663         return dp;
664 }
665
666 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
667  */
668 static int
669 mdb_touch(MDB_txn *txn, MDB_pageparent *pp)
670 {
671         MDB_page *mp = pp->mp_page;
672         pgno_t  pgno;
673         assert(txn != NULL);
674         assert(pp != NULL);
675
676         if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
677                 MDB_dpage *dp;
678                 if ((dp = mdb_alloc_page(txn, pp->mp_parent, pp->mp_pi, 1)) == NULL)
679                         return ENOMEM;
680                 DPRINTF("touched page %lu -> %lu", mp->mp_pgno, dp->p.mp_pgno);
681                 assert(mp->mp_pgno != dp->p.mp_pgno);
682                 mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno);
683                 pgno = dp->p.mp_pgno;
684                 memcpy(&dp->p, mp, txn->mt_env->me_psize);
685                 mp = &dp->p;
686                 mp->mp_pgno = pgno;
687                 mp->mp_flags |= P_DIRTY;
688
689                 /* Update the page number to new touched page. */
690                 if (pp->mp_parent != NULL)
691                         NODEPGNO(NODEPTR(pp->mp_parent, pp->mp_pi)) = mp->mp_pgno;
692                 pp->mp_page = mp;
693         }
694         return 0;
695 }
696
697 int
698 mdb_env_sync(MDB_env *env, int force)
699 {
700         int rc = 0;
701         if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
702                 if (fdatasync(env->me_fd))
703                         rc = errno;
704         }
705         return rc;
706 }
707
708 static inline void
709 mdb_txn_reset0(MDB_txn *txn);
710
711 static inline int
712 mdb_txn_renew0(MDB_txn *txn)
713 {
714         MDB_env *env = txn->mt_env;
715
716         int rc, toggle;
717
718         if (env->me_flags & MDB_FATAL_ERROR) {
719                 DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!");
720                 return MDB_PANIC;
721         }
722
723         if (txn->mt_flags & MDB_TXN_RDONLY) {
724                 MDB_reader *r = pthread_getspecific(env->me_txkey);
725                 if (!r) {
726                         unsigned int i;
727                         pthread_mutex_lock(&env->me_txns->mti_mutex);
728                         for (i=0; i<env->me_txns->mti_numreaders; i++)
729                                 if (env->me_txns->mti_readers[i].mr_pid == 0)
730                                         break;
731                         if (i == env->me_maxreaders) {
732                                 pthread_mutex_unlock(&env->me_txns->mti_mutex);
733                                 return ENOSPC;
734                         }
735                         env->me_txns->mti_readers[i].mr_pid = getpid();
736                         env->me_txns->mti_readers[i].mr_tid = pthread_self();
737                         r = &env->me_txns->mti_readers[i];
738                         pthread_setspecific(env->me_txkey, r);
739                         if (i >= env->me_txns->mti_numreaders)
740                                 env->me_txns->mti_numreaders = i+1;
741                         pthread_mutex_unlock(&env->me_txns->mti_mutex);
742                 }
743                 txn->mt_txnid = env->me_txns->mti_txnid;
744                 r->mr_txnid = txn->mt_txnid;
745                 txn->mt_u.reader = r;
746         } else {
747                 pthread_mutex_lock(&env->me_txns->mti_wmutex);
748
749                 txn->mt_txnid = env->me_txns->mti_txnid+1;
750                 txn->mt_u.dirty_list = env->me_dirty_list;
751                 txn->mt_u.dirty_list[0].mid = 0;
752                 txn->mt_free_pgs = env->me_free_pgs;
753                 txn->mt_free_pgs[0] = 0;
754                 env->me_txn = txn;
755         }
756
757         toggle = env->me_txns->mti_me_toggle;
758         if ((rc = mdb_env_read_meta(env, &toggle)) != MDB_SUCCESS) {
759                 mdb_txn_reset0(txn);
760                 return rc;
761         }
762
763         /* Copy the DB arrays */
764         txn->mt_numdbs = env->me_numdbs;
765         txn->mt_dbxs = env->me_dbxs;    /* mostly static anyway */
766         memcpy(txn->mt_dbs, env->me_meta->mm_dbs, 2 * sizeof(MDB_db));
767         if (txn->mt_numdbs > 2)
768                 memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
769                         (txn->mt_numdbs - 2) * sizeof(MDB_db));
770
771         if (!(txn->mt_flags & MDB_TXN_RDONLY)) {
772                 if (toggle)
773                         txn->mt_flags |= MDB_TXN_METOGGLE;
774                 txn->mt_next_pgno = env->me_meta->mm_last_pg+1;
775         }
776
777         DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn,
778                 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
779                 (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
780
781         return MDB_SUCCESS;
782 }
783
784 int
785 mdb_txn_renew(MDB_txn *txn)
786 {
787         int rc;
788
789         if (!txn)
790                 return EINVAL;
791
792         rc = mdb_txn_renew0(txn);
793         if (rc == MDB_SUCCESS) {
794                 DPRINTF("reset txn %p %lu%c on mdbenv %p, root page %lu", txn,
795                         txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
796                         (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
797         }
798         return rc;
799 }
800
801 int
802 mdb_txn_begin(MDB_env *env, int rdonly, MDB_txn **ret)
803 {
804         MDB_txn *txn;
805         int rc;
806
807         if (env->me_flags & MDB_FATAL_ERROR) {
808                 DPUTS("mdb_txn_begin: environment had fatal error, must shutdown!");
809                 return MDB_PANIC;
810         }
811         if ((txn = calloc(1, sizeof(MDB_txn) + env->me_maxdbs * sizeof(MDB_db))) == NULL) {
812                 DPRINTF("calloc: %s", strerror(errno));
813                 return ENOMEM;
814         }
815         txn->mt_dbs = (MDB_db *)(txn+1);
816         if (rdonly) {
817                 txn->mt_flags |= MDB_TXN_RDONLY;
818         }
819         txn->mt_env = env;
820
821         rc = mdb_txn_renew0(txn);
822         if (rc)
823                 free(txn);
824         else {
825                 *ret = txn;
826                 DPRINTF("begin txn %p %lu%c on mdbenv %p, root page %lu", txn,
827                         txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
828                         (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
829         }
830
831         return rc;
832 }
833
834 static inline void
835 mdb_txn_reset0(MDB_txn *txn)
836 {
837         MDB_env *env = txn->mt_env;
838
839         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
840                 txn->mt_u.reader->mr_txnid = 0;
841         } else {
842                 MDB_oldpages *mop;
843                 MDB_dpage *dp;
844                 unsigned int i;
845
846                 /* return all dirty pages to dpage list */
847                 for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
848                         dp = txn->mt_u.dirty_list[i].mptr;
849                         if (dp->h.md_num == 1) {
850                                 dp->h.md_parent = (MDB_page *)txn->mt_env->me_dpages;
851                                 txn->mt_env->me_dpages = dp;
852                         } else {
853                                 /* large pages just get freed directly */
854                                 free(dp);
855                         }
856                 }
857
858                 while ((mop = txn->mt_env->me_pghead)) {
859                         txn->mt_env->me_pghead = mop->mo_next;
860                         free(mop);
861                 }
862
863                 env->me_txn = NULL;
864                 for (i=2; i<env->me_numdbs; i++)
865                         env->me_dbxs[i].md_dirty = 0;
866                 pthread_mutex_unlock(&env->me_txns->mti_wmutex);
867         }
868 }
869
870 void
871 mdb_txn_reset(MDB_txn *txn)
872 {
873         if (txn == NULL)
874                 return;
875
876         DPRINTF("reset txn %p %lu%c on mdbenv %p, root page %lu", txn,
877                 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
878                 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
879
880         mdb_txn_reset0(txn);
881 }
882
883 void
884 mdb_txn_abort(MDB_txn *txn)
885 {
886         if (txn == NULL)
887                 return;
888
889         DPRINTF("abort txn %p %lu%c on mdbenv %p, root page %lu", txn,
890                 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
891                 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
892
893         mdb_txn_reset0(txn);
894         free(txn);
895 }
896
897 int
898 mdb_txn_commit(MDB_txn *txn)
899 {
900         int              n, done;
901         unsigned int i;
902         ssize_t          rc;
903         off_t            size;
904         MDB_dpage       *dp;
905         MDB_env *env;
906         pgno_t  next;
907         struct iovec     iov[MDB_COMMIT_PAGES];
908
909         assert(txn != NULL);
910         assert(txn->mt_env != NULL);
911
912         env = txn->mt_env;
913
914         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
915                 mdb_txn_abort(txn);
916                 return MDB_SUCCESS;
917         }
918
919         if (txn != env->me_txn) {
920                 DPUTS("attempt to commit unknown transaction");
921                 mdb_txn_abort(txn);
922                 return EINVAL;
923         }
924
925         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
926                 DPUTS("error flag is set, can't commit");
927                 mdb_txn_abort(txn);
928                 return EINVAL;
929         }
930
931         if (!txn->mt_u.dirty_list[0].mid)
932                 goto done;
933
934         DPRINTF("committing txn %p %lu on mdbenv %p, root page %lu", txn,
935             txn->mt_txnid, (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
936
937         /* should only be one record now */
938         if (env->me_pghead) {
939                 MDB_val key, data;
940                 MDB_oldpages *mop;
941
942                 mop = env->me_pghead;
943                 key.mv_size = sizeof(pgno_t);
944                 key.mv_data = (char *)&mop->mo_txnid;
945                 data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
946                 data.mv_data = mop->mo_pages;
947                 mdb_put0(txn, FREE_DBI, &key, &data, 0);
948                 free(env->me_pghead);
949                 env->me_pghead = NULL;
950         }
951         /* save to free list */
952         if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
953                 MDB_val key, data;
954                 MDB_pageparent mpp;
955
956                 /* make sure last page of freeDB is touched and on freelist */
957                 key.mv_size = MAXKEYSIZE+1;
958                 key.mv_data = NULL;
959                 mpp.mp_parent = NULL;
960                 mpp.mp_pi = 0;
961                 mdb_search_page(txn, FREE_DBI, &key, NULL, 1, &mpp);
962
963 #if DEBUG > 1
964                 {
965                         unsigned int i;
966                         ULONG *idl = txn->mt_free_pgs;
967                         DPRINTF("IDL write txn %lu root %lu num %lu",
968                                 txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
969                         for (i=0; i<idl[0]; i++) {
970                                 DPRINTF("IDL %lu", idl[i+1]);
971                         }
972                 }
973 #endif
974                 /* write to last page of freeDB */
975                 key.mv_size = sizeof(pgno_t);
976                 key.mv_data = (char *)&txn->mt_txnid;
977                 data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
978                 data.mv_data = txn->mt_free_pgs;
979                 mdb_put0(txn, FREE_DBI, &key, &data, 0);
980         }
981
982         /* Update DB root pointers. Their pages have already been
983          * touched so this is all in-place and cannot fail.
984          */
985         {
986                 MDB_val data;
987                 data.mv_size = sizeof(MDB_db);
988
989                 for (i = 2; i < txn->mt_numdbs; i++) {
990                         if (txn->mt_dbxs[i].md_dirty) {
991                                 data.mv_data = &txn->mt_dbs[i];
992                                 mdb_put0(txn, MAIN_DBI, &txn->mt_dbxs[i].md_name, &data, 0);
993                         }
994                 }
995         }
996
997         /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
998          */
999         next = 0;
1000         i = 1;
1001         do {
1002                 n = 0;
1003                 done = 1;
1004                 size = 0;
1005                 for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
1006                         dp = txn->mt_u.dirty_list[i].mptr;
1007                         if (dp->p.mp_pgno != next) {
1008                                 if (n) {
1009                                         DPRINTF("committing %u dirty pages", n);
1010                                         rc = writev(env->me_fd, iov, n);
1011                                         if (rc != size) {
1012                                                 n = errno;
1013                                                 if (rc > 0)
1014                                                         DPUTS("short write, filesystem full?");
1015                                                 else
1016                                                         DPRINTF("writev: %s", strerror(errno));
1017                                                 mdb_txn_abort(txn);
1018                                                 return n;
1019                                         }
1020                                         n = 0;
1021                                         size = 0;
1022                                 }
1023                                 lseek(env->me_fd, dp->p.mp_pgno * env->me_psize, SEEK_SET);
1024                                 next = dp->p.mp_pgno;
1025                         }
1026                         DPRINTF("committing page %lu", dp->p.mp_pgno);
1027                         iov[n].iov_len = env->me_psize * dp->h.md_num;
1028                         iov[n].iov_base = &dp->p;
1029                         size += iov[n].iov_len;
1030                         next = dp->p.mp_pgno + dp->h.md_num;
1031                         /* clear dirty flag */
1032                         dp->p.mp_flags &= ~P_DIRTY;
1033                         if (++n >= MDB_COMMIT_PAGES) {
1034                                 done = 0;
1035                                 i++;
1036                                 break;
1037                         }
1038                 }
1039
1040                 if (n == 0)
1041                         break;
1042
1043                 DPRINTF("committing %u dirty pages", n);
1044                 rc = writev(env->me_fd, iov, n);
1045                 if (rc != size) {
1046                         n = errno;
1047                         if (rc > 0)
1048                                 DPUTS("short write, filesystem full?");
1049                         else
1050                                 DPRINTF("writev: %s", strerror(errno));
1051                         mdb_txn_abort(txn);
1052                         return n;
1053                 }
1054
1055         } while (!done);
1056
1057         /* Drop the dirty pages.
1058          */
1059         for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
1060                 dp = txn->mt_u.dirty_list[i].mptr;
1061                 if (dp->h.md_num == 1) {
1062                         dp->h.md_parent = (MDB_page *)txn->mt_env->me_dpages;
1063                         txn->mt_env->me_dpages = dp;
1064                 } else {
1065                         free(dp);
1066                 }
1067                 txn->mt_u.dirty_list[i].mid = 0;
1068         }
1069         txn->mt_u.dirty_list[0].mid = 0;
1070
1071         if ((n = mdb_env_sync(env, 0)) != 0 ||
1072             (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) {
1073                 mdb_txn_abort(txn);
1074                 return n;
1075         }
1076
1077         env->me_txns->mti_txnid = txn->mt_txnid;
1078
1079 done:
1080         env->me_txn = NULL;
1081         /* update the DB tables */
1082         {
1083                 int toggle = !env->me_db_toggle;
1084
1085                 for (i = 2; i < env->me_numdbs; i++) {
1086                         if (txn->mt_dbxs[i].md_dirty) {
1087                                 env->me_dbs[toggle][i] = txn->mt_dbs[i];
1088                                 txn->mt_dbxs[i].md_dirty = 0;
1089                         }
1090                 }
1091                 for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
1092                         txn->mt_dbxs[i].md_dirty = 0;
1093                         env->me_dbxs[i] = txn->mt_dbxs[i];
1094                         env->me_dbs[toggle][i] = txn->mt_dbs[i];
1095                 }
1096                 env->me_db_toggle = toggle;
1097                 env->me_numdbs = txn->mt_numdbs;
1098         }
1099
1100         pthread_mutex_unlock(&env->me_txns->mti_wmutex);
1101         free(txn);
1102
1103         return MDB_SUCCESS;
1104 }
1105
1106 static int
1107 mdb_env_read_header(MDB_env *env, MDB_meta *meta)
1108 {
1109         char             page[PAGESIZE];
1110         MDB_page        *p;
1111         MDB_meta        *m;
1112         int              rc;
1113
1114         assert(env != NULL);
1115
1116         /* We don't know the page size yet, so use a minimum value.
1117          */
1118
1119         if ((rc = pread(env->me_fd, page, PAGESIZE, 0)) == 0) {
1120                 return ENOENT;
1121         } else if (rc != PAGESIZE) {
1122                 if (rc > 0)
1123                         errno = EINVAL;
1124                 DPRINTF("read: %s", strerror(errno));
1125                 return errno;
1126         }
1127
1128         p = (MDB_page *)page;
1129
1130         if (!F_ISSET(p->mp_flags, P_META)) {
1131                 DPRINTF("page %lu not a meta page", p->mp_pgno);
1132                 return EINVAL;
1133         }
1134
1135         m = METADATA(p);
1136         if (m->mm_magic != MDB_MAGIC) {
1137                 DPUTS("meta has invalid magic");
1138                 return EINVAL;
1139         }
1140
1141         if (m->mm_version != MDB_VERSION) {
1142                 DPRINTF("database is version %u, expected version %u",
1143                     m->mm_version, MDB_VERSION);
1144                 return MDB_VERSION_MISMATCH;
1145         }
1146
1147         memcpy(meta, m, sizeof(*m));
1148         return 0;
1149 }
1150
1151 static int
1152 mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
1153 {
1154         MDB_page *p, *q;
1155         MDB_meta *m;
1156         int rc;
1157         unsigned int     psize;
1158
1159         DPUTS("writing new meta page");
1160         psize = sysconf(_SC_PAGE_SIZE);
1161
1162         meta->mm_magic = MDB_MAGIC;
1163         meta->mm_version = MDB_VERSION;
1164         meta->mm_psize = psize;
1165         meta->mm_last_pg = 1;
1166         meta->mm_flags = env->me_flags & 0xffff;
1167         meta->mm_flags |= MDB_INTEGERKEY;
1168         meta->mm_dbs[0].md_root = P_INVALID;
1169         meta->mm_dbs[1].md_root = P_INVALID;
1170
1171         p = calloc(2, psize);
1172         p->mp_pgno = 0;
1173         p->mp_flags = P_META;
1174
1175         m = METADATA(p);
1176         memcpy(m, meta, sizeof(*meta));
1177
1178         q = (MDB_page *)((char *)p + psize);
1179
1180         q->mp_pgno = 1;
1181         q->mp_flags = P_META;
1182
1183         m = METADATA(q);
1184         memcpy(m, meta, sizeof(*meta));
1185
1186         rc = write(env->me_fd, p, psize * 2);
1187         free(p);
1188         return (rc == (int)psize * 2) ? MDB_SUCCESS : errno;
1189 }
1190
1191 static int
1192 mdb_env_write_meta(MDB_txn *txn)
1193 {
1194         MDB_env *env;
1195         MDB_meta        meta, metab;
1196         off_t off;
1197         int rc, len, toggle;
1198         char *ptr;
1199
1200         assert(txn != NULL);
1201         assert(txn->mt_env != NULL);
1202
1203         toggle = !F_ISSET(txn->mt_flags, MDB_TXN_METOGGLE);
1204         DPRINTF("writing meta page %d for root page %lu",
1205                 toggle, txn->mt_dbs[MAIN_DBI].md_root);
1206
1207         env = txn->mt_env;
1208
1209         metab.mm_txnid = env->me_metas[toggle]->mm_txnid;
1210         metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg;
1211
1212         ptr = (char *)&meta;
1213         off = offsetof(MDB_meta, mm_dbs[0].md_depth);
1214         len = sizeof(MDB_meta) - off;
1215
1216         ptr += off;
1217         meta.mm_dbs[0] = txn->mt_dbs[0];
1218         meta.mm_dbs[1] = txn->mt_dbs[1];
1219         meta.mm_last_pg = txn->mt_next_pgno - 1;
1220         meta.mm_txnid = txn->mt_txnid;
1221
1222         if (toggle)
1223                 off += env->me_psize;
1224         off += PAGEHDRSZ;
1225
1226         /* Write to the SYNC fd */
1227         rc = pwrite(env->me_mfd, ptr, len, off);
1228         if (rc != len) {
1229                 int r2;
1230                 rc = errno;
1231                 DPUTS("write failed, disk error?");
1232                 /* On a failure, the pagecache still contains the new data.
1233                  * Write some old data back, to prevent it from being used.
1234                  * Use the non-SYNC fd; we know it will fail anyway.
1235                  */
1236                 meta.mm_last_pg = metab.mm_last_pg;
1237                 meta.mm_txnid = metab.mm_txnid;
1238                 r2 = pwrite(env->me_fd, ptr, len, off);
1239                 env->me_flags |= MDB_FATAL_ERROR;
1240                 return rc;
1241         }
1242         txn->mt_env->me_txns->mti_me_toggle = toggle;
1243
1244         return MDB_SUCCESS;
1245 }
1246
1247 static int
1248 mdb_env_read_meta(MDB_env *env, int *which)
1249 {
1250         int toggle = 0;
1251
1252         assert(env != NULL);
1253
1254         if (which)
1255                 toggle = *which;
1256         else if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
1257                 toggle = 1;
1258
1259         if (env->me_meta != env->me_metas[toggle])
1260                 env->me_meta = env->me_metas[toggle];
1261
1262         DPRINTF("Using meta page %d", toggle);
1263
1264         return MDB_SUCCESS;
1265 }
1266
1267 int
1268 mdb_env_create(MDB_env **env)
1269 {
1270         MDB_env *e;
1271
1272         e = calloc(1, sizeof(MDB_env));
1273         if (!e) return ENOMEM;
1274
1275         e->me_maxreaders = DEFAULT_READERS;
1276         e->me_maxdbs = 2;
1277         e->me_fd = -1;
1278         e->me_lfd = -1;
1279         e->me_mfd = -1;
1280         *env = e;
1281         return MDB_SUCCESS;
1282 }
1283
1284 int
1285 mdb_env_set_mapsize(MDB_env *env, size_t size)
1286 {
1287         if (env->me_map)
1288                 return EINVAL;
1289         env->me_mapsize = size;
1290         return MDB_SUCCESS;
1291 }
1292
1293 int
1294 mdb_env_set_maxdbs(MDB_env *env, int dbs)
1295 {
1296         env->me_maxdbs = dbs;
1297         return MDB_SUCCESS;
1298 }
1299
1300 int
1301 mdb_env_set_maxreaders(MDB_env *env, int readers)
1302 {
1303         env->me_maxreaders = readers;
1304         return MDB_SUCCESS;
1305 }
1306
1307 int
1308 mdb_env_get_maxreaders(MDB_env *env, int *readers)
1309 {
1310         if (!env || !readers)
1311                 return EINVAL;
1312         *readers = env->me_maxreaders;
1313         return MDB_SUCCESS;
1314 }
1315
1316 static int
1317 mdb_env_open2(MDB_env *env, unsigned int flags)
1318 {
1319         int i, newenv = 0;
1320         MDB_meta meta;
1321         MDB_page *p;
1322
1323         env->me_flags = flags;
1324
1325         memset(&meta, 0, sizeof(meta));
1326
1327         if ((i = mdb_env_read_header(env, &meta)) != 0) {
1328                 if (i != ENOENT)
1329                         return i;
1330                 DPUTS("new mdbenv");
1331                 newenv = 1;
1332         }
1333
1334         if (!env->me_mapsize) {
1335                 env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize;
1336         }
1337
1338         i = MAP_SHARED;
1339         if (meta.mm_address && (flags & MDB_FIXEDMAP))
1340                 i |= MAP_FIXED;
1341         env->me_map = mmap(meta.mm_address, env->me_mapsize, PROT_READ, i,
1342                 env->me_fd, 0);
1343         if (env->me_map == MAP_FAILED)
1344                 return errno;
1345
1346         if (newenv) {
1347                 meta.mm_mapsize = env->me_mapsize;
1348                 if (flags & MDB_FIXEDMAP)
1349                         meta.mm_address = env->me_map;
1350                 i = mdb_env_init_meta(env, &meta);
1351                 if (i != MDB_SUCCESS) {
1352                         munmap(env->me_map, env->me_mapsize);
1353                         return i;
1354                 }
1355         }
1356         env->me_psize = meta.mm_psize;
1357
1358         env->me_maxpg = env->me_mapsize / env->me_psize;
1359
1360         p = (MDB_page *)env->me_map;
1361         env->me_metas[0] = METADATA(p);
1362         env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
1363
1364         if ((i = mdb_env_read_meta(env, NULL)) != 0)
1365                 return i;
1366
1367         DPRINTF("opened database version %u, pagesize %u",
1368             env->me_meta->mm_version, env->me_psize);
1369         DPRINTF("depth: %u", env->me_meta->mm_dbs[MAIN_DBI].md_depth);
1370         DPRINTF("entries: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_entries);
1371         DPRINTF("branch pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_branch_pages);
1372         DPRINTF("leaf pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_leaf_pages);
1373         DPRINTF("overflow pages: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_overflow_pages);
1374         DPRINTF("root: %lu", env->me_meta->mm_dbs[MAIN_DBI].md_root);
1375
1376         return MDB_SUCCESS;
1377 }
1378
1379 static void
1380 mdb_env_reader_dest(void *ptr)
1381 {
1382         MDB_reader *reader = ptr;
1383
1384         reader->mr_txnid = 0;
1385         reader->mr_pid = 0;
1386         reader->mr_tid = 0;
1387 }
1388
1389 /* downgrade the exclusive lock on the region back to shared */
1390 static void
1391 mdb_env_share_locks(MDB_env *env)
1392 {
1393         struct flock lock_info;
1394
1395         env->me_txns->mti_txnid = env->me_meta->mm_txnid;
1396         if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
1397                 env->me_txns->mti_me_toggle = 1;
1398
1399         memset((void *)&lock_info, 0, sizeof(lock_info));
1400         lock_info.l_type = F_RDLCK;
1401         lock_info.l_whence = SEEK_SET;
1402         lock_info.l_start = 0;
1403         lock_info.l_len = 1;
1404         fcntl(env->me_lfd, F_SETLK, &lock_info);
1405 }
1406
1407 static int
1408 mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
1409 {
1410         int rc;
1411         off_t size, rsize;
1412         struct flock lock_info;
1413
1414         *excl = 0;
1415
1416         if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
1417                 rc = errno;
1418                 return rc;
1419         }
1420         /* Try to get exclusive lock. If we succeed, then
1421          * nobody is using the lock region and we should initialize it.
1422          */
1423         memset((void *)&lock_info, 0, sizeof(lock_info));
1424         lock_info.l_type = F_WRLCK;
1425         lock_info.l_whence = SEEK_SET;
1426         lock_info.l_start = 0;
1427         lock_info.l_len = 1;
1428         rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1429         if (rc == 0) {
1430                 *excl = 1;
1431         } else {
1432                 lock_info.l_type = F_RDLCK;
1433                 rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1434                 if (rc) {
1435                         rc = errno;
1436                         goto fail;
1437                 }
1438         }
1439         size = lseek(env->me_lfd, 0, SEEK_END);
1440         rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1441         if (size < rsize && *excl) {
1442                 if (ftruncate(env->me_lfd, rsize) != 0) {
1443                         rc = errno;
1444                         goto fail;
1445                 }
1446         } else {
1447                 rsize = size;
1448                 size = rsize - sizeof(MDB_txninfo);
1449                 env->me_maxreaders = size/sizeof(MDB_reader) + 1;
1450         }
1451         env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
1452                 env->me_lfd, 0);
1453         if (env->me_txns == MAP_FAILED) {
1454                 rc = errno;
1455                 goto fail;
1456         }
1457         if (*excl) {
1458                 pthread_mutexattr_t mattr;
1459
1460                 pthread_mutexattr_init(&mattr);
1461                 rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
1462                 if (rc) {
1463                         goto fail;
1464                 }
1465                 pthread_mutex_init(&env->me_txns->mti_mutex, &mattr);
1466                 pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr);
1467                 env->me_txns->mti_version = MDB_VERSION;
1468                 env->me_txns->mti_magic = MDB_MAGIC;
1469                 env->me_txns->mti_txnid = 0;
1470                 env->me_txns->mti_numreaders = 0;
1471                 env->me_txns->mti_me_toggle = 0;
1472
1473         } else {
1474                 if (env->me_txns->mti_magic != MDB_MAGIC) {
1475                         DPUTS("lock region has invalid magic");
1476                         rc = EINVAL;
1477                         goto fail;
1478                 }
1479                 if (env->me_txns->mti_version != MDB_VERSION) {
1480                         DPRINTF("lock region is version %u, expected version %u",
1481                                 env->me_txns->mti_version, MDB_VERSION);
1482                         rc = MDB_VERSION_MISMATCH;
1483                         goto fail;
1484                 }
1485                 if (errno != EACCES && errno != EAGAIN) {
1486                         rc = errno;
1487                         goto fail;
1488                 }
1489         }
1490         return MDB_SUCCESS;
1491
1492 fail:
1493         close(env->me_lfd);
1494         env->me_lfd = -1;
1495         return rc;
1496
1497 }
1498
1499 #define LOCKNAME        "/lock.mdb"
1500 #define DATANAME        "/data.mdb"
1501 int
1502 mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
1503 {
1504         int             oflags, rc, len, excl;
1505         char *lpath, *dpath;
1506
1507         len = strlen(path);
1508         lpath = malloc(len + sizeof(LOCKNAME) + len + sizeof(DATANAME));
1509         if (!lpath)
1510                 return ENOMEM;
1511         dpath = lpath + len + sizeof(LOCKNAME);
1512         sprintf(lpath, "%s" LOCKNAME, path);
1513         sprintf(dpath, "%s" DATANAME, path);
1514
1515         rc = mdb_env_setup_locks(env, lpath, mode, &excl);
1516         if (rc)
1517                 goto leave;
1518
1519         if (F_ISSET(flags, MDB_RDONLY))
1520                 oflags = O_RDONLY;
1521         else
1522                 oflags = O_RDWR | O_CREAT;
1523
1524         if ((env->me_fd = open(dpath, oflags, mode)) == -1) {
1525                 rc = errno;
1526                 goto leave;
1527         }
1528
1529         if ((rc = mdb_env_open2(env, flags)) == MDB_SUCCESS) {
1530                 /* synchronous fd for meta writes */
1531                 if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
1532                         oflags |= MDB_DSYNC;
1533                 if ((env->me_mfd = open(dpath, oflags, mode)) == -1) {
1534                         rc = errno;
1535                         goto leave;
1536                 }
1537
1538                 env->me_path = strdup(path);
1539                 DPRINTF("opened dbenv %p", (void *) env);
1540                 pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
1541                 if (excl)
1542                         mdb_env_share_locks(env);
1543                 env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
1544                 env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
1545                 env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
1546                 env->me_numdbs = 2;
1547         }
1548
1549 leave:
1550         if (rc) {
1551                 if (env->me_fd >= 0) {
1552                         close(env->me_fd);
1553                         env->me_fd = -1;
1554                 }
1555                 if (env->me_lfd >= 0) {
1556                         close(env->me_lfd);
1557                         env->me_lfd = -1;
1558                 }
1559         }
1560         free(lpath);
1561         return rc;
1562 }
1563
1564 void
1565 mdb_env_close(MDB_env *env)
1566 {
1567         MDB_dpage *dp;
1568
1569         if (env == NULL)
1570                 return;
1571
1572         while (env->me_dpages) {
1573                 dp = env->me_dpages;
1574                 env->me_dpages = (MDB_dpage *)dp->h.md_parent;
1575                 free(dp);
1576         }
1577
1578         free(env->me_dbs[1]);
1579         free(env->me_dbs[0]);
1580         free(env->me_dbxs);
1581         free(env->me_path);
1582
1583         pthread_key_delete(env->me_txkey);
1584
1585         if (env->me_map) {
1586                 munmap(env->me_map, env->me_mapsize);
1587         }
1588         close(env->me_mfd);
1589         close(env->me_fd);
1590         if (env->me_txns) {
1591                 pid_t pid = getpid();
1592                 size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1593                 int i;
1594                 for (i=0; i<env->me_txns->mti_numreaders; i++)
1595                         if (env->me_txns->mti_readers[i].mr_pid == pid)
1596                                 env->me_txns->mti_readers[i].mr_pid = 0;
1597                 munmap(env->me_txns, size);
1598         }
1599         close(env->me_lfd);
1600         free(env);
1601 }
1602
1603 /* Search for key within a leaf page, using binary search.
1604  * Returns the smallest entry larger or equal to the key.
1605  * If exactp is non-null, stores whether the found entry was an exact match
1606  * in *exactp (1 or 0).
1607  * If kip is non-null, stores the index of the found entry in *kip.
1608  * If no entry larger or equal to the key is found, returns NULL.
1609  */
1610 static MDB_node *
1611 mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key,
1612     int *exactp, unsigned int *kip)
1613 {
1614         unsigned int     i = 0;
1615         int              low, high;
1616         int              rc = 0;
1617         MDB_node        *node = NULL;
1618         MDB_val  nodekey;
1619         DKBUF;
1620
1621         DPRINTF("searching %u keys in %s page %lu",
1622             NUMKEYS(mp),
1623             IS_LEAF(mp) ? "leaf" : "branch",
1624             mp->mp_pgno);
1625
1626         assert(NUMKEYS(mp) > 0);
1627
1628         memset(&nodekey, 0, sizeof(nodekey));
1629
1630         low = IS_LEAF(mp) ? 0 : 1;
1631         high = NUMKEYS(mp) - 1;
1632         while (low <= high) {
1633                 i = (low + high) >> 1;
1634
1635                 if (IS_LEAF2(mp)) {
1636                         nodekey.mv_size = txn->mt_dbs[dbi].md_pad;
1637                         nodekey.mv_data = LEAF2KEY(mp, i, nodekey.mv_size);
1638                 } else {
1639                         node = NODEPTR(mp, i);
1640
1641                         nodekey.mv_size = node->mn_ksize;
1642                         nodekey.mv_data = NODEKEY(node);
1643                 }
1644
1645                 rc = mdb_cmp(txn, dbi, key, &nodekey);
1646
1647                 if (IS_LEAF(mp))
1648                         DPRINTF("found leaf index %u [%s], rc = %i",
1649                             i, DKEY(&nodekey), rc);
1650                 else
1651                         DPRINTF("found branch index %u [%s -> %lu], rc = %i",
1652                             i, DKEY(&nodekey), NODEPGNO(node), rc);
1653
1654                 if (rc == 0)
1655                         break;
1656                 if (rc > 0)
1657                         low = i + 1;
1658                 else
1659                         high = i - 1;
1660         }
1661
1662         if (rc > 0) {   /* Found entry is less than the key. */
1663                 i++;    /* Skip to get the smallest entry larger than key. */
1664                 if (i >= NUMKEYS(mp))
1665                         /* There is no entry larger or equal to the key. */
1666                         return NULL;
1667         }
1668         if (exactp)
1669                 *exactp = (rc == 0);
1670         if (kip)        /* Store the key index if requested. */
1671                 *kip = i;
1672
1673         /* nodeptr is fake for LEAF2 */
1674         return IS_LEAF2(mp) ? NODEPTR(mp, 0) : NODEPTR(mp, i);
1675 }
1676
1677 static void
1678 cursor_pop_page(MDB_cursor *cursor)
1679 {
1680         MDB_ppage       *top;
1681
1682         if (cursor->mc_snum) {
1683                 top = CURSOR_TOP(cursor);
1684                 cursor->mc_snum--;
1685
1686                 DPRINTF("popped page %lu off db %u cursor %p", top->mp_page->mp_pgno,
1687                         cursor->mc_dbi, (void *) cursor);
1688         }
1689 }
1690
1691 static MDB_ppage *
1692 cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
1693 {
1694         MDB_ppage       *ppage;
1695
1696         DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno,
1697                 cursor->mc_dbi, (void *) cursor);
1698
1699         assert(cursor->mc_snum < CURSOR_STACK);
1700
1701         ppage = &cursor->mc_stack[cursor->mc_snum++];
1702         ppage->mp_page = mp;
1703         ppage->mp_ki = 0;
1704         return ppage;
1705 }
1706
1707 static int
1708 mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
1709 {
1710         MDB_page *p = NULL;
1711
1712         if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
1713                 MDB_dpage *dp;
1714                 MIDL2 id;
1715                 unsigned x;
1716                 id.mid = pgno;
1717                 x = mdb_midl2_search(txn->mt_u.dirty_list, &id);
1718                 if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
1719                         dp = txn->mt_u.dirty_list[x].mptr;
1720                         p = &dp->p;
1721                 }
1722         }
1723         if (!p) {
1724                 if (pgno <= txn->mt_env->me_meta->mm_last_pg)
1725                         p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
1726         }
1727         *ret = p;
1728         if (!p) {
1729                 DPRINTF("page %lu not found", pgno);
1730                 assert(p != NULL);
1731         }
1732         return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
1733 }
1734
1735 static int
1736 mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
1737     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1738 {
1739         MDB_page        *mp = mpp->mp_page;
1740         DKBUF;
1741         int rc;
1742
1743         if (cursor && cursor_push_page(cursor, mp) == NULL)
1744                 return ENOMEM;
1745
1746         while (IS_BRANCH(mp)) {
1747                 unsigned int     i = 0;
1748                 MDB_node        *node;
1749
1750                 DPRINTF("branch page %lu has %u keys", mp->mp_pgno, NUMKEYS(mp));
1751                 assert(NUMKEYS(mp) > 1);
1752                 DPRINTF("found index 0 to page %lu", NODEPGNO(NODEPTR(mp, 0)));
1753
1754                 if (key == NULL)        /* Initialize cursor to first page. */
1755                         i = 0;
1756                 else if (key->mv_size > MAXKEYSIZE && key->mv_data == NULL) {
1757                                                         /* cursor to last page */
1758                         i = NUMKEYS(mp)-1;
1759                 } else {
1760                         int      exact;
1761                         node = mdb_search_node(txn, dbi, mp, key, &exact, &i);
1762                         if (node == NULL)
1763                                 i = NUMKEYS(mp) - 1;
1764                         else if (!exact) {
1765                                 assert(i > 0);
1766                                 i--;
1767                         }
1768                 }
1769
1770                 if (key)
1771                         DPRINTF("following index %u for key [%s]",
1772                             i, DKEY(key));
1773                 assert(i < NUMKEYS(mp));
1774                 node = NODEPTR(mp, i);
1775
1776                 if (cursor)
1777                         CURSOR_TOP(cursor)->mp_ki = i;
1778
1779                 mpp->mp_parent = mp;
1780                 if ((rc = mdb_get_page(txn, NODEPGNO(node), &mp)))
1781                         return rc;
1782                 mpp->mp_pi = i;
1783                 mpp->mp_page = mp;
1784
1785                 if (cursor && cursor_push_page(cursor, mp) == NULL)
1786                         return ENOMEM;
1787
1788                 if (modify) {
1789                         MDB_dhead *dh = ((MDB_dhead *)mp)-1;
1790                         if ((rc = mdb_touch(txn, mpp)) != 0)
1791                                 return rc;
1792                         dh = ((MDB_dhead *)mpp->mp_page)-1;
1793                         dh->md_parent = mpp->mp_parent;
1794                         dh->md_pi = mpp->mp_pi;
1795                 }
1796
1797                 mp = mpp->mp_page;
1798         }
1799
1800         if (!IS_LEAF(mp)) {
1801                 DPRINTF("internal error, index points to a %02X page!?",
1802                     mp->mp_flags);
1803                 return MDB_CORRUPTED;
1804         }
1805
1806         DPRINTF("found leaf page %lu for key [%s]", mp->mp_pgno,
1807             key ? DKEY(key) : NULL);
1808
1809         return MDB_SUCCESS;
1810 }
1811
1812 /* Search for the page a given key should be in.
1813  * Stores a pointer to the found page in *mpp.
1814  * If key is NULL, search for the lowest page (used by mdb_cursor_first).
1815  * If cursor is non-null, pushes parent pages on the cursor stack.
1816  * If modify is true, visited pages are updated with new page numbers.
1817  */
1818 static int
1819 mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
1820     MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1821 {
1822         int              rc;
1823         pgno_t           root;
1824
1825         /* Choose which root page to start with. If a transaction is given
1826          * use the root page from the transaction, otherwise read the last
1827          * committed root page.
1828          */
1829         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
1830                 DPUTS("transaction has failed, must abort");
1831                 return EINVAL;
1832         } else
1833                 root = txn->mt_dbs[dbi].md_root;
1834
1835         if (root == P_INVALID) {                /* Tree is empty. */
1836                 DPUTS("tree is empty");
1837                 return MDB_NOTFOUND;
1838         }
1839
1840         if (rc = mdb_get_page(txn, root, &mpp->mp_page))
1841                 return rc;
1842
1843         DPRINTF("db %u root page %lu has flags 0x%X",
1844                 dbi, root,  mpp->mp_page->mp_flags);
1845
1846         if (modify) {
1847                 /* For sub-databases, update main root first */
1848                 if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) {
1849                         MDB_pageparent mp2;
1850                         rc = mdb_search_page(txn, MAIN_DBI, &txn->mt_dbxs[dbi].md_name,
1851                                 NULL, 1, &mp2);
1852                         if (rc)
1853                                 return rc;
1854                         txn->mt_dbxs[dbi].md_dirty = 1;
1855                 }
1856                 if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
1857                         mpp->mp_parent = NULL;
1858                         mpp->mp_pi = 0;
1859                         if ((rc = mdb_touch(txn, mpp)))
1860                                 return rc;
1861                         txn->mt_dbs[dbi].md_root = mpp->mp_page->mp_pgno;
1862                 }
1863         }
1864
1865         return mdb_search_page_root(txn, dbi, key, cursor, modify, mpp);
1866 }
1867
1868 static int
1869 mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
1870 {
1871         MDB_page        *omp;           /* overflow mpage */
1872         pgno_t           pgno;
1873         int rc;
1874
1875         if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
1876                 data->mv_size = leaf->mn_dsize;
1877                 data->mv_data = NODEDATA(leaf);
1878                 return MDB_SUCCESS;
1879         }
1880
1881         /* Read overflow data.
1882          */
1883         data->mv_size = leaf->mn_dsize;
1884         memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
1885         if (rc = mdb_get_page(txn, pgno, &omp)) {
1886                 DPRINTF("read overflow page %lu failed", pgno);
1887                 return rc;
1888         }
1889         data->mv_data = METADATA(omp);
1890
1891         return MDB_SUCCESS;
1892 }
1893
1894 int
1895 mdb_get(MDB_txn *txn, MDB_dbi dbi,
1896     MDB_val *key, MDB_val *data)
1897 {
1898         int              rc, exact;
1899         MDB_node        *leaf;
1900         MDB_pageparent mpp;
1901         DKBUF;
1902
1903         assert(key);
1904         assert(data);
1905         DPRINTF("===> get db %u key [%s]", dbi, DKEY(key));
1906
1907         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
1908                 return EINVAL;
1909
1910         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1911                 return EINVAL;
1912         }
1913
1914         if ((rc = mdb_search_page(txn, dbi, key, NULL, 0, &mpp)) != MDB_SUCCESS)
1915                 return rc;
1916
1917         leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
1918         if (leaf && exact) {
1919                 /* Return first duplicate data item */
1920                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
1921                         MDB_xcursor mx;
1922
1923                         mdb_xcursor_init0(txn, dbi, &mx);
1924                         mdb_xcursor_init1(txn, dbi, &mx, leaf);
1925                         rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp);
1926                         if (rc != MDB_SUCCESS)
1927                                 return rc;
1928                         if (IS_LEAF2(mpp.mp_page)) {
1929                                 data->mv_size = txn->mt_dbs[dbi].md_pad;
1930                                 data->mv_data = LEAF2KEY(mpp.mp_page, 0, data->mv_size);
1931                         } else {
1932                                 leaf = NODEPTR(mpp.mp_page, 0);
1933                                 data->mv_size = NODEKSZ(leaf);
1934                                 data->mv_data = NODEKEY(leaf);
1935                         }
1936                 } else {
1937                         rc = mdb_read_data(txn, leaf, data);
1938                 }
1939         } else {
1940                 rc = MDB_NOTFOUND;
1941         }
1942
1943         return rc;
1944 }
1945
1946 static int
1947 mdb_sibling(MDB_cursor *cursor, int move_right)
1948 {
1949         int              rc;
1950         MDB_node        *indx;
1951         MDB_ppage       *parent;
1952         MDB_page        *mp;
1953
1954         if (cursor->mc_snum < 2) {
1955                 return MDB_NOTFOUND;            /* root has no siblings */
1956         }
1957         parent = CURSOR_PARENT(cursor);
1958
1959         DPRINTF("parent page is page %lu, index %u",
1960             parent->mp_page->mp_pgno, parent->mp_ki);
1961
1962         cursor_pop_page(cursor);
1963         if (move_right ? (parent->mp_ki + 1 >= NUMKEYS(parent->mp_page))
1964                        : (parent->mp_ki == 0)) {
1965                 DPRINTF("no more keys left, moving to %s sibling",
1966                     move_right ? "right" : "left");
1967                 if ((rc = mdb_sibling(cursor, move_right)) != MDB_SUCCESS)
1968                         return rc;
1969                 parent = CURSOR_TOP(cursor);
1970         } else {
1971                 if (move_right)
1972                         parent->mp_ki++;
1973                 else
1974                         parent->mp_ki--;
1975                 DPRINTF("just moving to %s index key %u",
1976                     move_right ? "right" : "left", parent->mp_ki);
1977         }
1978         assert(IS_BRANCH(parent->mp_page));
1979
1980         indx = NODEPTR(parent->mp_page, parent->mp_ki);
1981         if (rc = mdb_get_page(cursor->mc_txn, NODEPGNO(indx), &mp))
1982                 return rc;;
1983 #if 0
1984         mp->parent = parent->mp_page;
1985         mp->parent_index = parent->mp_ki;
1986 #endif
1987
1988         cursor_push_page(cursor, mp);
1989
1990         return MDB_SUCCESS;
1991 }
1992
1993 static int
1994 mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op)
1995 {
1996         MDB_ppage       *top;
1997         MDB_page        *mp;
1998         MDB_node        *leaf;
1999         int rc;
2000
2001         if (cursor->mc_eof) {
2002                 return MDB_NOTFOUND;
2003         }
2004
2005         assert(cursor->mc_initialized);
2006
2007         top = CURSOR_TOP(cursor);
2008         mp = top->mp_page;
2009
2010         if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
2011                 leaf = NODEPTR(mp, top->mp_ki);
2012                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2013                         if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
2014                                 rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
2015                                 if (op != MDB_NEXT || rc == MDB_SUCCESS)
2016                                         return rc;
2017                         }
2018                 } else {
2019                         cursor->mc_xcursor->mx_cursor.mc_initialized = 0;
2020                         if (op == MDB_NEXT_DUP)
2021                                 return MDB_NOTFOUND;
2022                 }
2023         }
2024
2025         DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
2026
2027         if (top->mp_ki + 1 >= NUMKEYS(mp)) {
2028                 DPUTS("=====> move to next sibling page");
2029                 if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
2030                         cursor->mc_eof = 1;
2031                         return MDB_NOTFOUND;
2032                 }
2033                 top = CURSOR_TOP(cursor);
2034                 mp = top->mp_page;
2035                 DPRINTF("next page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
2036         } else
2037                 top->mp_ki++;
2038
2039         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
2040             mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
2041
2042         if (IS_LEAF2(mp)) {
2043                 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2044                 key->mv_data = LEAF2KEY(mp, top->mp_ki, key->mv_size);
2045                 return MDB_SUCCESS;
2046         }
2047
2048         assert(IS_LEAF(mp));
2049         leaf = NODEPTR(mp, top->mp_ki);
2050
2051         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2052                 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
2053         }
2054         if (data) {
2055                 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
2056                         return rc;
2057
2058                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2059                         rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
2060                         if (rc != MDB_SUCCESS)
2061                                 return rc;
2062                 }
2063         }
2064
2065         MDB_SET_KEY(leaf, key);
2066         return MDB_SUCCESS;
2067 }
2068
2069 static int
2070 mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op)
2071 {
2072         MDB_ppage       *top;
2073         MDB_page        *mp;
2074         MDB_node        *leaf;
2075         int rc;
2076
2077         assert(cursor->mc_initialized);
2078
2079         top = CURSOR_TOP(cursor);
2080         mp = top->mp_page;
2081
2082         if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
2083                 leaf = NODEPTR(mp, top->mp_ki);
2084                 if (op == MDB_PREV || op == MDB_PREV_DUP) {
2085                         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2086                                 rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
2087                                 if (op != MDB_PREV || rc == MDB_SUCCESS)
2088                                         return rc;
2089                         } else {
2090                                 cursor->mc_xcursor->mx_cursor.mc_initialized = 0;
2091                                 if (op == MDB_PREV_DUP)
2092                                         return MDB_NOTFOUND;
2093                         }
2094                 }
2095         }
2096
2097         DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
2098
2099         if (top->mp_ki == 0)  {
2100                 DPUTS("=====> move to prev sibling page");
2101                 if (mdb_sibling(cursor, 0) != MDB_SUCCESS) {
2102                         cursor->mc_initialized = 0;
2103                         return MDB_NOTFOUND;
2104                 }
2105                 top = CURSOR_TOP(cursor);
2106                 mp = top->mp_page;
2107                 top->mp_ki = NUMKEYS(mp) - 1;
2108                 DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
2109         } else
2110                 top->mp_ki--;
2111
2112         cursor->mc_eof = 0;
2113
2114         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
2115             mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
2116
2117         if (IS_LEAF2(mp)) {
2118                 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2119                 key->mv_data = LEAF2KEY(mp, top->mp_ki, key->mv_size);
2120                 return MDB_SUCCESS;
2121         }
2122
2123         assert(IS_LEAF(mp));
2124         leaf = NODEPTR(mp, top->mp_ki);
2125
2126         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2127                 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
2128         }
2129         if (data) {
2130                 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data) != MDB_SUCCESS))
2131                         return rc;
2132
2133                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2134                         rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
2135                         if (rc != MDB_SUCCESS)
2136                                 return rc;
2137                 }
2138         }
2139
2140         MDB_SET_KEY(leaf, key);
2141         return MDB_SUCCESS;
2142 }
2143
2144 static int
2145 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
2146     MDB_cursor_op op, int *exactp)
2147 {
2148         int              rc;
2149         MDB_node        *leaf;
2150         MDB_ppage       *top;
2151         MDB_pageparent mpp;
2152         DKBUF;
2153
2154         assert(cursor);
2155         assert(key);
2156         assert(key->mv_size > 0);
2157
2158         /* See if we're already on the right page */
2159         if (cursor->mc_initialized) {
2160                 MDB_val nodekey;
2161                 top = CURSOR_TOP(cursor);
2162                 /* Don't try this for LEAF2 pages. Maybe support that later. */
2163                 if ((top->mp_page->mp_flags & (P_LEAF|P_LEAF2)) == P_LEAF) {
2164                         leaf = NODEPTR(top->mp_page, 0);
2165                         MDB_SET_KEY(leaf, &nodekey);
2166                         rc = mdb_cmp(cursor->mc_txn, cursor->mc_dbi, key, &nodekey);
2167                         if (rc >= 0) {
2168                                 leaf = NODEPTR(top->mp_page, NUMKEYS(top->mp_page)-1);
2169                                 MDB_SET_KEY(leaf, &nodekey);
2170                                 rc = mdb_cmp(cursor->mc_txn, cursor->mc_dbi, key, &nodekey);
2171                                 if (rc <= 0) {
2172                                         /* we're already on the right page */
2173                                         mpp.mp_page = top->mp_page;
2174                                         rc = 0;
2175                                         goto set2;
2176                                 }
2177                         }
2178                 }
2179         }
2180         cursor->mc_snum = 0;
2181
2182         rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp);
2183         if (rc != MDB_SUCCESS)
2184                 return rc;
2185
2186         assert(IS_LEAF(mpp.mp_page));
2187
2188         top = CURSOR_TOP(cursor);
2189 set2:
2190         leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki);
2191         if (exactp != NULL && !*exactp) {
2192                 /* MDB_SET specified and not an exact match. */
2193                 return MDB_NOTFOUND;
2194         }
2195
2196         if (leaf == NULL) {
2197                 DPUTS("===> inexact leaf not found, goto sibling");
2198                 if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS)
2199                         return rc;              /* no entries matched */
2200                 top = CURSOR_TOP(cursor);
2201                 top->mp_ki = 0;
2202                 mpp.mp_page = top->mp_page;
2203                 assert(IS_LEAF(mpp.mp_page));
2204                 leaf = NODEPTR(mpp.mp_page, 0);
2205         }
2206
2207         cursor->mc_initialized = 1;
2208         cursor->mc_eof = 0;
2209
2210         if (IS_LEAF2(mpp.mp_page)) {
2211                 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2212                 key->mv_data = LEAF2KEY(mpp.mp_page, top->mp_ki, key->mv_size);
2213                 return MDB_SUCCESS;
2214         }
2215
2216         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2217                 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
2218         }
2219         if (data) {
2220                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2221                         if (op == MDB_SET || op == MDB_SET_RANGE) {
2222                                 rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
2223                         } else {
2224                                 int ex2, *ex2p;
2225                                 if (op == MDB_GET_BOTH) {
2226                                         ex2p = &ex2;
2227                                 } else {
2228                                         ex2p = NULL;
2229                                 }
2230                                 rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_SET_RANGE, ex2p);
2231                                 if (rc != MDB_SUCCESS)
2232                                         return rc;
2233                         }
2234                 } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) {
2235                         MDB_val d2;
2236                         if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
2237                                 return rc;
2238                         rc = mdb_dcmp(cursor->mc_txn, cursor->mc_dbi, data, &d2);
2239                         if (rc) {
2240                                 if (op == MDB_GET_BOTH || rc > 0)
2241                                         return MDB_NOTFOUND;
2242                         }
2243
2244                 } else {
2245                         if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2246                                 return rc;
2247                 }
2248         }
2249
2250         /* The key already matches in all other cases */
2251         if (op == MDB_SET_RANGE)
2252                 MDB_SET_KEY(leaf, key);
2253         DPRINTF("==> cursor placed on key [%s]", DKEY(key));
2254
2255         return rc;
2256 }
2257
2258 static int
2259 mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
2260 {
2261         int              rc;
2262         MDB_pageparent  mpp;
2263         MDB_node        *leaf;
2264
2265         cursor->mc_snum = 0;
2266
2267         rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp);
2268         if (rc != MDB_SUCCESS)
2269                 return rc;
2270         assert(IS_LEAF(mpp.mp_page));
2271
2272         leaf = NODEPTR(mpp.mp_page, 0);
2273         cursor->mc_initialized = 1;
2274         cursor->mc_eof = 0;
2275
2276         if (IS_LEAF2(mpp.mp_page)) {
2277                 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2278                 key->mv_data = LEAF2KEY(mpp.mp_page, 0, key->mv_size);
2279                 return MDB_SUCCESS;
2280         }
2281
2282         if (data) {
2283                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2284                         mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
2285                         rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
2286                         if (rc)
2287                                 return rc;
2288                 } else {
2289                         if (cursor->mc_xcursor)
2290                                 cursor->mc_xcursor->mx_cursor.mc_initialized = 0;
2291                         if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2292                                 return rc;
2293                 }
2294         }
2295         MDB_SET_KEY(leaf, key);
2296         return MDB_SUCCESS;
2297 }
2298
2299 static int
2300 mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
2301 {
2302         int              rc;
2303         MDB_ppage       *top;
2304         MDB_pageparent  mpp;
2305         MDB_node        *leaf;
2306         MDB_val lkey;
2307
2308         cursor->mc_snum = 0;
2309
2310         lkey.mv_size = MAXKEYSIZE+1;
2311         lkey.mv_data = NULL;
2312
2313         rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, &lkey, cursor, 0, &mpp);
2314         if (rc != MDB_SUCCESS)
2315                 return rc;
2316         assert(IS_LEAF(mpp.mp_page));
2317
2318         leaf = NODEPTR(mpp.mp_page, NUMKEYS(mpp.mp_page)-1);
2319         cursor->mc_initialized = 1;
2320         cursor->mc_eof = 0;
2321
2322         top = CURSOR_TOP(cursor);
2323         top->mp_ki = NUMKEYS(top->mp_page) - 1;
2324
2325         if (IS_LEAF2(mpp.mp_page)) {
2326                 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2327                 key->mv_data = LEAF2KEY(mpp.mp_page, top->mp_ki, key->mv_size);
2328                 return MDB_SUCCESS;
2329         }
2330
2331         if (data) {
2332                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2333                         mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, leaf);
2334                         rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
2335                         if (rc)
2336                                 return rc;
2337                 } else {
2338                         if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2339                                 return rc;
2340                 }
2341         }
2342
2343         MDB_SET_KEY(leaf, key);
2344         return MDB_SUCCESS;
2345 }
2346
2347 int
2348 mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
2349     MDB_cursor_op op)
2350 {
2351         int              rc;
2352         int              exact = 0;
2353
2354         assert(cursor);
2355
2356         switch (op) {
2357         case MDB_GET_BOTH:
2358         case MDB_GET_BOTH_RANGE:
2359                 if (data == NULL || cursor->mc_xcursor == NULL) {
2360                         rc = EINVAL;
2361                         break;
2362                 }
2363                 /* FALLTHRU */
2364         case MDB_SET:
2365         case MDB_SET_RANGE:
2366                 if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2367                         rc = EINVAL;
2368                 } else if (op == MDB_SET_RANGE)
2369                         rc = mdb_cursor_set(cursor, key, data, op, NULL);
2370                 else
2371                         rc = mdb_cursor_set(cursor, key, data, op, &exact);
2372                 break;
2373         case MDB_GET_MULTIPLE:
2374                 if (data == NULL ||
2375                         !(cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPFIXED) ||
2376                         !cursor->mc_initialized) {
2377                         rc = EINVAL;
2378                         break;
2379                 }
2380                 rc = MDB_SUCCESS;
2381                 if (!cursor->mc_xcursor->mx_cursor.mc_initialized || cursor->mc_xcursor->mx_cursor.mc_eof)
2382                         break;
2383                 goto fetchm;
2384         case MDB_NEXT_MULTIPLE:
2385                 if (data == NULL ||
2386                         !(cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPFIXED)) {
2387                         rc = EINVAL;
2388                         break;
2389                 }
2390                 if (!cursor->mc_initialized)
2391                         rc = mdb_cursor_first(cursor, key, data);
2392                 else
2393                         rc = mdb_cursor_next(cursor, key, data, MDB_NEXT_DUP);
2394                 if (rc == MDB_SUCCESS) {
2395                         if (cursor->mc_xcursor->mx_cursor.mc_initialized) {
2396                                 MDB_ppage       *top;
2397 fetchm:
2398                                 top = CURSOR_TOP(&cursor->mc_xcursor->mx_cursor);
2399                                 data->mv_size = NUMKEYS(top->mp_page) *
2400                                         cursor->mc_xcursor->mx_txn.mt_dbs[cursor->mc_xcursor->mx_cursor.mc_dbi].md_pad;
2401                                 data->mv_data = METADATA(top->mp_page);
2402                                 top->mp_ki = NUMKEYS(top->mp_page)-1;
2403                         } else {
2404                                 rc = MDB_NOTFOUND;
2405                         }
2406                 }
2407                 break;
2408         case MDB_NEXT:
2409         case MDB_NEXT_DUP:
2410         case MDB_NEXT_NODUP:
2411                 if (!cursor->mc_initialized)
2412                         rc = mdb_cursor_first(cursor, key, data);
2413                 else
2414                         rc = mdb_cursor_next(cursor, key, data, op);
2415                 break;
2416         case MDB_PREV:
2417         case MDB_PREV_DUP:
2418         case MDB_PREV_NODUP:
2419                 if (!cursor->mc_initialized || cursor->mc_eof)
2420                         rc = mdb_cursor_last(cursor, key, data);
2421                 else
2422                         rc = mdb_cursor_prev(cursor, key, data, op);
2423                 break;
2424         case MDB_FIRST:
2425                 rc = mdb_cursor_first(cursor, key, data);
2426                 break;
2427         case MDB_LAST:
2428                 rc = mdb_cursor_last(cursor, key, data);
2429                 break;
2430         default:
2431                 DPRINTF("unhandled/unimplemented cursor operation %u", op);
2432                 rc = EINVAL;
2433                 break;
2434         }
2435
2436         return rc;
2437 }
2438
2439 /* Allocate a page and initialize it
2440  */
2441 static MDB_dpage *
2442 mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num)
2443 {
2444         MDB_dpage       *dp;
2445
2446         if ((dp = mdb_alloc_page(txn, NULL, 0, num)) == NULL)
2447                 return NULL;
2448         DPRINTF("allocated new mpage %lu, page size %u",
2449             dp->p.mp_pgno, txn->mt_env->me_psize);
2450         dp->p.mp_flags = flags | P_DIRTY;
2451         dp->p.mp_lower = PAGEHDRSZ;
2452         dp->p.mp_upper = txn->mt_env->me_psize;
2453
2454         if (IS_BRANCH(&dp->p))
2455                 txn->mt_dbs[dbi].md_branch_pages++;
2456         else if (IS_LEAF(&dp->p))
2457                 txn->mt_dbs[dbi].md_leaf_pages++;
2458         else if (IS_OVERFLOW(&dp->p)) {
2459                 txn->mt_dbs[dbi].md_overflow_pages += num;
2460                 dp->p.mp_pages = num;
2461         }
2462
2463         return dp;
2464 }
2465
2466 static size_t
2467 mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data)
2468 {
2469         size_t           sz;
2470
2471         sz = LEAFSIZE(key, data);
2472         if (data->mv_size >= env->me_psize / MDB_MINKEYS) {
2473                 /* put on overflow page */
2474                 sz -= data->mv_size - sizeof(pgno_t);
2475         }
2476
2477         return sz + sizeof(indx_t);
2478 }
2479
2480 static size_t
2481 mdb_branch_size(MDB_env *env, MDB_val *key)
2482 {
2483         size_t           sz;
2484
2485         sz = INDXSIZE(key);
2486         if (sz >= env->me_psize / MDB_MINKEYS) {
2487                 /* put on overflow page */
2488                 /* not implemented */
2489                 /* sz -= key->size - sizeof(pgno_t); */
2490         }
2491
2492         return sz + sizeof(indx_t);
2493 }
2494
2495 static int
2496 mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
2497     MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
2498 {
2499         unsigned int     i;
2500         size_t           node_size = NODESIZE;
2501         indx_t           ofs;
2502         MDB_node        *node;
2503         MDB_dpage       *ofp = NULL;            /* overflow page */
2504         DKBUF;
2505
2506         assert(mp->mp_upper >= mp->mp_lower);
2507
2508         DPRINTF("add node [%s] to %s page %lu at index %i, key size %zu",
2509             key ? DKEY(key) : NULL,
2510             IS_LEAF(mp) ? "leaf" : "branch",
2511             mp->mp_pgno, indx, key ? key->mv_size : 0);
2512
2513         if (IS_LEAF2(mp)) {
2514                 /* Move higher keys up one slot. */
2515                 int ksize = txn->mt_dbs[dbi].md_pad, dif;
2516                 char *ptr = LEAF2KEY(mp, indx, ksize);
2517                 dif = NUMKEYS(mp) - indx;
2518                 if (dif > 0)
2519                         memmove(ptr+ksize, ptr, dif*ksize);
2520                 /* insert new key */
2521                 memcpy(ptr, key->mv_data, ksize);
2522
2523                 /* Just using these for counting */
2524                 mp->mp_lower += sizeof(indx_t);
2525                 mp->mp_upper -= ksize - sizeof(indx_t);
2526                 return MDB_SUCCESS;
2527         }
2528
2529         if (key != NULL)
2530                 node_size += key->mv_size;
2531
2532         if (IS_LEAF(mp)) {
2533                 assert(data);
2534                 if (F_ISSET(flags, F_BIGDATA)) {
2535                         /* Data already on overflow page. */
2536                         node_size += sizeof(pgno_t);
2537                 } else if (data->mv_size >= txn->mt_env->me_psize / MDB_MINKEYS) {
2538                         int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_psize);
2539                         /* Put data on overflow page. */
2540                         DPRINTF("data size is %zu, put on overflow page",
2541                             data->mv_size);
2542                         node_size += sizeof(pgno_t);
2543                         if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL)
2544                                 return ENOMEM;
2545                         DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
2546                         flags |= F_BIGDATA;
2547                 } else {
2548                         node_size += data->mv_size;
2549                 }
2550         }
2551
2552         if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
2553                 DPRINTF("not enough room in page %lu, got %u ptrs",
2554                     mp->mp_pgno, NUMKEYS(mp));
2555                 DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower,
2556                     mp->mp_upper - mp->mp_lower);
2557                 DPRINTF("node size = %zu", node_size);
2558                 return ENOSPC;
2559         }
2560
2561         /* Move higher pointers up one slot. */
2562         for (i = NUMKEYS(mp); i > indx; i--)
2563                 mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
2564
2565         /* Adjust free space offsets. */
2566         ofs = mp->mp_upper - node_size;
2567         assert(ofs >= mp->mp_lower + sizeof(indx_t));
2568         mp->mp_ptrs[indx] = ofs;
2569         mp->mp_upper = ofs;
2570         mp->mp_lower += sizeof(indx_t);
2571
2572         /* Write the node data. */
2573         node = NODEPTR(mp, indx);
2574         node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
2575         node->mn_flags = flags;
2576         if (IS_LEAF(mp))
2577                 node->mn_dsize = data->mv_size;
2578         else
2579                 NODEPGNO(node) = pgno;
2580
2581         if (key)
2582                 memcpy(NODEKEY(node), key->mv_data, key->mv_size);
2583
2584         if (IS_LEAF(mp)) {
2585                 assert(key);
2586                 if (ofp == NULL) {
2587                         if (F_ISSET(flags, F_BIGDATA))
2588                                 memcpy(node->mn_data + key->mv_size, data->mv_data,
2589                                     sizeof(pgno_t));
2590                         else
2591                                 memcpy(node->mn_data + key->mv_size, data->mv_data,
2592                                     data->mv_size);
2593                 } else {
2594                         memcpy(node->mn_data + key->mv_size, &ofp->p.mp_pgno,
2595                             sizeof(pgno_t));
2596                         memcpy(METADATA(&ofp->p), data->mv_data, data->mv_size);
2597                 }
2598         }
2599
2600         return MDB_SUCCESS;
2601 }
2602
2603 static void
2604 mdb_del_node(MDB_page *mp, indx_t indx, int ksize)
2605 {
2606         unsigned int     sz;
2607         indx_t           i, j, numkeys, ptr;
2608         MDB_node        *node;
2609         char            *base;
2610
2611         DPRINTF("delete node %u on %s page %lu", indx,
2612             IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
2613         assert(indx < NUMKEYS(mp));
2614
2615         if (IS_LEAF2(mp)) {
2616                 int x = NUMKEYS(mp) - 1 - indx;
2617                 base = LEAF2KEY(mp, indx, ksize);
2618                 if (x)
2619                         memmove(base, base + ksize, x * ksize);
2620                 mp->mp_lower -= sizeof(indx_t);
2621                 mp->mp_upper += ksize - sizeof(indx_t);
2622                 return;
2623         }
2624
2625         node = NODEPTR(mp, indx);
2626         sz = NODESIZE + node->mn_ksize;
2627         if (IS_LEAF(mp)) {
2628                 if (F_ISSET(node->mn_flags, F_BIGDATA))
2629                         sz += sizeof(pgno_t);
2630                 else
2631                         sz += NODEDSZ(node);
2632         }
2633
2634         ptr = mp->mp_ptrs[indx];
2635         numkeys = NUMKEYS(mp);
2636         for (i = j = 0; i < numkeys; i++) {
2637                 if (i != indx) {
2638                         mp->mp_ptrs[j] = mp->mp_ptrs[i];
2639                         if (mp->mp_ptrs[i] < ptr)
2640                                 mp->mp_ptrs[j] += sz;
2641                         j++;
2642                 }
2643         }
2644
2645         base = (char *)mp + mp->mp_upper;
2646         memmove(base + sz, base, ptr - mp->mp_upper);
2647
2648         mp->mp_lower -= sizeof(indx_t);
2649         mp->mp_upper += sz;
2650 }
2651
2652 static void
2653 mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
2654 {
2655         MDB_dbi dbn;
2656
2657         mx->mx_txn = *txn;
2658         mx->mx_txn.mt_dbxs = mx->mx_dbxs;
2659         mx->mx_txn.mt_dbs = mx->mx_dbs;
2660         mx->mx_dbxs[0] = txn->mt_dbxs[0];
2661         mx->mx_dbxs[1] = txn->mt_dbxs[1];
2662         if (dbi > 1) {
2663                 mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
2664                 dbn = 2;
2665         } else {
2666                 dbn = 1;
2667         }
2668         mx->mx_dbxs[dbn+1].md_parent = dbn;
2669         mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
2670         mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
2671         mx->mx_dbxs[dbn+1].md_dirty = 0;
2672         mx->mx_txn.mt_numdbs = dbn+2;
2673
2674         mx->mx_cursor.mc_snum = 0;
2675         mx->mx_cursor.mc_txn = &mx->mx_txn;
2676         mx->mx_cursor.mc_dbi = dbn+1;
2677 }
2678
2679 static void
2680 mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_node *node)
2681 {
2682         MDB_db *db = NODEDATA(node);
2683         MDB_dbi dbn;
2684         mx->mx_dbs[0] = txn->mt_dbs[0];
2685         mx->mx_dbs[1] = txn->mt_dbs[1];
2686         if (dbi > 1) {
2687                 mx->mx_dbs[2] = txn->mt_dbs[dbi];
2688                 dbn = 3;
2689         } else {
2690                 dbn = 2;
2691         }
2692         DPRINTF("Sub-db %u for db %u root page %lu", dbn, dbi, db->md_root);
2693         mx->mx_dbs[dbn] = *db;
2694         mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
2695         mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
2696         mx->mx_txn.mt_next_pgno = txn->mt_next_pgno;
2697         mx->mx_txn.mt_oldest = txn->mt_oldest;
2698         mx->mx_txn.mt_u = txn->mt_u;
2699         mx->mx_cursor.mc_initialized = 0;
2700         mx->mx_cursor.mc_eof = 0;
2701 }
2702
2703 static void
2704 mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
2705 {
2706         txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
2707         txn->mt_oldest = mx->mx_txn.mt_oldest;
2708         txn->mt_u = mx->mx_txn.mt_u;
2709         txn->mt_dbs[0] = mx->mx_dbs[0];
2710         txn->mt_dbs[1] = mx->mx_dbs[1];
2711         txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
2712         txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
2713         if (dbi > 1) {
2714                 txn->mt_dbs[dbi] = mx->mx_dbs[2];
2715                 txn->mt_dbxs[dbi].md_dirty = mx->mx_dbxs[2].md_dirty;
2716         }
2717 }
2718
2719 int
2720 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
2721 {
2722         MDB_cursor      *cursor;
2723         size_t size = sizeof(MDB_cursor);
2724
2725         if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
2726                 return EINVAL;
2727
2728         if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
2729                 size += sizeof(MDB_xcursor);
2730
2731         if ((cursor = calloc(1, size)) != NULL) {
2732                 cursor->mc_dbi = dbi;
2733                 cursor->mc_txn = txn;
2734                 if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
2735                         MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
2736                         cursor->mc_xcursor = mx;
2737                         mdb_xcursor_init0(txn, dbi, mx);
2738                 }
2739         } else {
2740                 return ENOMEM;
2741         }
2742
2743         *ret = cursor;
2744
2745         return MDB_SUCCESS;
2746 }
2747
2748 /* Return the count of duplicate data items for the current key */
2749 int
2750 mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
2751 {
2752         MDB_ppage       *top;
2753         MDB_node        *leaf;
2754
2755         if (mc == NULL || countp == NULL)
2756                 return EINVAL;
2757
2758         if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
2759                 return EINVAL;
2760
2761         top = CURSOR_TOP(mc);
2762         leaf = NODEPTR(top->mp_page, top->mp_ki);
2763         if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2764                 *countp = 1;
2765         } else {
2766                 if (!mc->mc_xcursor->mx_cursor.mc_initialized)
2767                         return EINVAL;
2768
2769                 *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
2770         }
2771         return MDB_SUCCESS;
2772 }
2773
2774 void
2775 mdb_cursor_close(MDB_cursor *cursor)
2776 {
2777         if (cursor != NULL) {
2778                 free(cursor);
2779         }
2780 }
2781
2782 static int
2783 mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
2784 {
2785         indx_t                   ptr, i, numkeys;
2786         int                      delta;
2787         size_t                   len;
2788         MDB_node                *node;
2789         char                    *base;
2790         DKBUF;
2791
2792         node = NODEPTR(mp, indx);
2793         ptr = mp->mp_ptrs[indx];
2794         DPRINTF("update key %u (ofs %u) [%.*s] to [%s] on page %lu",
2795             indx, ptr,
2796             (int)node->mn_ksize, (char *)NODEKEY(node),
2797                 DKEY(key),
2798             mp->mp_pgno);
2799
2800         delta = key->mv_size - node->mn_ksize;
2801         if (delta) {
2802                 if (delta > 0 && SIZELEFT(mp) < delta) {
2803                         DPRINTF("OUCH! Not enough room, delta = %d", delta);
2804                         return ENOSPC;
2805                 }
2806
2807                 numkeys = NUMKEYS(mp);
2808                 for (i = 0; i < numkeys; i++) {
2809                         if (mp->mp_ptrs[i] <= ptr)
2810                                 mp->mp_ptrs[i] -= delta;
2811                 }
2812
2813                 base = (char *)mp + mp->mp_upper;
2814                 len = ptr - mp->mp_upper + NODESIZE;
2815                 memmove(base - delta, base, len);
2816                 mp->mp_upper -= delta;
2817
2818                 node = NODEPTR(mp, indx);
2819                 node->mn_ksize = key->mv_size;
2820         }
2821
2822         memcpy(NODEKEY(node), key->mv_data, key->mv_size);
2823
2824         return MDB_SUCCESS;
2825 }
2826
2827 /* Move a node from src to dst.
2828  */
2829 static int
2830 mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx,
2831     MDB_pageparent *dst, indx_t dstindx)
2832 {
2833         int                      rc;
2834         MDB_node                *srcnode;
2835         MDB_val          key, data;
2836         DKBUF;
2837
2838         /* Mark src and dst as dirty. */
2839         if ((rc = mdb_touch(txn, src)) ||
2840             (rc = mdb_touch(txn, dst)))
2841                 return rc;;
2842
2843         if (IS_LEAF2(src->mp_page)) {
2844                 srcnode = NODEPTR(src->mp_page, 0);     /* fake */
2845                 key.mv_size = txn->mt_dbs[dbi].md_pad;
2846                 key.mv_data = LEAF2KEY(src->mp_page, srcindx, key.mv_size);
2847                 data.mv_size = 0;
2848                 data.mv_data = NULL;
2849         } else {
2850                 srcnode = NODEPTR(src->mp_page, srcindx);
2851                 key.mv_size = NODEKSZ(srcnode);
2852                 key.mv_data = NODEKEY(srcnode);
2853                 data.mv_size = NODEDSZ(srcnode);
2854                 data.mv_data = NODEDATA(srcnode);
2855         }
2856         DPRINTF("moving %s node %u [%s] on page %lu to node %u on page %lu",
2857             IS_LEAF(src->mp_page) ? "leaf" : "branch",
2858             srcindx,
2859                 DKEY(&key),
2860             src->mp_page->mp_pgno,
2861             dstindx, dst->mp_page->mp_pgno);
2862
2863         /* Add the node to the destination page.
2864          */
2865         rc = mdb_add_node(txn, dbi, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode),
2866             srcnode->mn_flags);
2867         if (rc != MDB_SUCCESS)
2868                 return rc;
2869
2870         /* Delete the node from the source page.
2871          */
2872         mdb_del_node(src->mp_page, srcindx, key.mv_size);
2873
2874         /* The key value just changed due to del_node, find it again.
2875          */
2876         if (!IS_LEAF2(src->mp_page)) {
2877                 srcnode = NODEPTR(src->mp_page, srcindx);
2878                 key.mv_data = NODEKEY(srcnode);
2879         }
2880
2881         /* Update the parent separators.
2882          */
2883         if (srcindx == 0) {
2884                 if (src->mp_pi != 0) {
2885                         DPRINTF("update separator for source page %lu to [%s]",
2886                                 src->mp_page->mp_pgno, DKEY(&key));
2887                         if ((rc = mdb_update_key(src->mp_parent, src->mp_pi,
2888                                 &key)) != MDB_SUCCESS)
2889                                 return rc;
2890                 }
2891                 if (IS_BRANCH(src->mp_page)) {
2892                         MDB_val  nullkey;
2893                         nullkey.mv_size = 0;
2894                         assert(mdb_update_key(src->mp_page, 0, &nullkey) == MDB_SUCCESS);
2895                 }
2896         }
2897
2898         if (dstindx == 0) {
2899                 if (dst->mp_pi != 0) {
2900                         DPRINTF("update separator for destination page %lu to [%s]",
2901                                 dst->mp_page->mp_pgno, DKEY(&key));
2902                         if ((rc = mdb_update_key(dst->mp_parent, dst->mp_pi,
2903                                 &key)) != MDB_SUCCESS)
2904                                 return rc;
2905                 }
2906                 if (IS_BRANCH(dst->mp_page)) {
2907                         MDB_val  nullkey;
2908                         nullkey.mv_size = 0;
2909                         assert(mdb_update_key(dst->mp_page, 0, &nullkey) == MDB_SUCCESS);
2910                 }
2911         }
2912
2913         return MDB_SUCCESS;
2914 }
2915
2916 static int
2917 mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst)
2918 {
2919         int                      rc;
2920         indx_t                   i;
2921         MDB_node                *srcnode;
2922         MDB_val          key, data;
2923         MDB_pageparent  mpp;
2924         MDB_dhead *dh;
2925
2926         DPRINTF("merging page %lu and %lu", src->mp_page->mp_pgno, dst->mp_page->mp_pgno);
2927
2928         assert(txn != NULL);
2929         assert(src->mp_parent); /* can't merge root page */
2930         assert(dst->mp_parent);
2931
2932         /* Mark src and dst as dirty. */
2933         if ((rc = mdb_touch(txn, src)) ||
2934             (rc = mdb_touch(txn, dst)))
2935                 return rc;
2936
2937         /* Move all nodes from src to dst.
2938          */
2939         if (IS_LEAF2(src->mp_page)) {
2940                 key.mv_size = txn->mt_dbs[dbi].md_pad;
2941                 key.mv_data = METADATA(src->mp_page);
2942                 for (i = 0; i < NUMKEYS(src->mp_page); i++) {
2943                         rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key,
2944                                 NULL, 0, 0);
2945                         if (rc != MDB_SUCCESS)
2946                                 return rc;
2947                         key.mv_data = (char *)key.mv_data + key.mv_size;
2948                 }
2949         } else {
2950                 for (i = 0; i < NUMKEYS(src->mp_page); i++) {
2951                         srcnode = NODEPTR(src->mp_page, i);
2952
2953                         key.mv_size = srcnode->mn_ksize;
2954                         key.mv_data = NODEKEY(srcnode);
2955                         data.mv_size = NODEDSZ(srcnode);
2956                         data.mv_data = NODEDATA(srcnode);
2957                         rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key,
2958                                 &data, NODEPGNO(srcnode), srcnode->mn_flags);
2959                         if (rc != MDB_SUCCESS)
2960                                 return rc;
2961                 }
2962         }
2963
2964         DPRINTF("dst page %lu now has %u keys (%.1f%% filled)",
2965             dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(txn->mt_env, dst->mp_page) / 10);
2966
2967         /* Unlink the src page from parent.
2968          */
2969         mdb_del_node(src->mp_parent, src->mp_pi, 0);
2970         if (src->mp_pi == 0) {
2971                 key.mv_size = 0;
2972                 if ((rc = mdb_update_key(src->mp_parent, 0, &key)) != MDB_SUCCESS)
2973                         return rc;
2974         }
2975
2976         if (IS_LEAF(src->mp_page))
2977                 txn->mt_dbs[dbi].md_leaf_pages--;
2978         else
2979                 txn->mt_dbs[dbi].md_branch_pages--;
2980
2981         mpp.mp_page = src->mp_parent;
2982         dh = (MDB_dhead *)src->mp_parent;
2983         dh--;
2984         mpp.mp_parent = dh->md_parent;
2985         mpp.mp_pi = dh->md_pi;
2986
2987         return mdb_rebalance(txn, dbi, &mpp);
2988 }
2989
2990 #define FILL_THRESHOLD   250
2991
2992 static int
2993 mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
2994 {
2995         MDB_node        *node;
2996         MDB_page        *root;
2997         MDB_pageparent npp;
2998         indx_t           si = 0, di = 0;
2999         int rc;
3000
3001         assert(txn != NULL);
3002         assert(mpp != NULL);
3003
3004         DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)",
3005             IS_LEAF(mpp->mp_page) ? "leaf" : "branch",
3006             mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(txn->mt_env, mpp->mp_page) / 10);
3007
3008         if (PAGEFILL(txn->mt_env, mpp->mp_page) >= FILL_THRESHOLD) {
3009                 DPRINTF("no need to rebalance page %lu, above fill threshold",
3010                     mpp->mp_page->mp_pgno);
3011                 return MDB_SUCCESS;
3012         }
3013
3014         if (mpp->mp_parent == NULL) {
3015                 if (NUMKEYS(mpp->mp_page) == 0) {
3016                         DPUTS("tree is completely empty");
3017                         txn->mt_dbs[dbi].md_root = P_INVALID;
3018                         txn->mt_dbs[dbi].md_depth = 0;
3019                         txn->mt_dbs[dbi].md_leaf_pages = 0;
3020                 } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
3021                         DPUTS("collapsing root page!");
3022                         txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
3023                         if (rc = mdb_get_page(txn, txn->mt_dbs[dbi].md_root, &root))
3024                                 return rc;
3025                         txn->mt_dbs[dbi].md_depth--;
3026                         txn->mt_dbs[dbi].md_branch_pages--;
3027                 } else
3028                         DPUTS("root page doesn't need rebalancing");
3029                 return MDB_SUCCESS;
3030         }
3031
3032         /* The parent (branch page) must have at least 2 pointers,
3033          * otherwise the tree is invalid.
3034          */
3035         assert(NUMKEYS(mpp->mp_parent) > 1);
3036
3037         /* Leaf page fill factor is below the threshold.
3038          * Try to move keys from left or right neighbor, or
3039          * merge with a neighbor page.
3040          */
3041
3042         /* Find neighbors.
3043          */
3044         if (mpp->mp_pi == 0) {
3045                 /* We're the leftmost leaf in our parent.
3046                  */
3047                 DPUTS("reading right neighbor");
3048                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
3049                 if (rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page))
3050                         return rc;
3051                 npp.mp_pi = mpp->mp_pi + 1;
3052                 si = 0;
3053                 di = NUMKEYS(mpp->mp_page);
3054         } else {
3055                 /* There is at least one neighbor to the left.
3056                  */
3057                 DPUTS("reading left neighbor");
3058                 node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
3059                 if (rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page))
3060                         return rc;
3061                 npp.mp_pi = mpp->mp_pi - 1;
3062                 si = NUMKEYS(npp.mp_page) - 1;
3063                 di = 0;
3064         }
3065         npp.mp_parent = mpp->mp_parent;
3066
3067         DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)",
3068             npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(txn->mt_env, npp.mp_page) / 10);
3069
3070         /* If the neighbor page is above threshold and has at least two
3071          * keys, move one key from it.
3072          *
3073          * Otherwise we should try to merge them.
3074          */
3075         if (PAGEFILL(txn->mt_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2)
3076                 return mdb_move_node(txn, dbi, &npp, si, mpp, di);
3077         else { /* FIXME: if (has_enough_room()) */
3078                 if (mpp->mp_pi == 0)
3079                         return mdb_merge(txn, dbi, &npp, mpp);
3080                 else
3081                         return mdb_merge(txn, dbi, mpp, &npp);
3082         }
3083 }
3084
3085 static int
3086 mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf)
3087 {
3088         int rc;
3089
3090         /* add overflow pages to free list */
3091         if (!IS_LEAF2(mpp->mp_page) && F_ISSET(leaf->mn_flags, F_BIGDATA)) {
3092                 int i, ovpages;
3093                 pgno_t pg;
3094
3095                 memcpy(&pg, NODEDATA(leaf), sizeof(pg));
3096                 ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize);
3097                 for (i=0; i<ovpages; i++) {
3098                         DPRINTF("freed ov page %lu", pg);
3099                         mdb_midl_insert(txn->mt_free_pgs, pg);
3100                         pg++;
3101                 }
3102         }
3103         mdb_del_node(mpp->mp_page, ki, txn->mt_dbs[dbi].md_pad);
3104         txn->mt_dbs[dbi].md_entries--;
3105         rc = mdb_rebalance(txn, dbi, mpp);
3106         if (rc != MDB_SUCCESS)
3107                 txn->mt_flags |= MDB_TXN_ERROR;
3108
3109         return rc;
3110 }
3111
3112 int
3113 mdb_del(MDB_txn *txn, MDB_dbi dbi,
3114     MDB_val *key, MDB_val *data,
3115         unsigned int flags)
3116 {
3117         int              rc, exact;
3118         unsigned int     ki;
3119         MDB_node        *leaf;
3120         MDB_pageparent  mpp;
3121         DKBUF;
3122
3123         assert(key != NULL);
3124
3125         DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key));
3126
3127         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3128                 return EINVAL;
3129
3130         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
3131                 return EINVAL;
3132         }
3133
3134         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
3135                 return EINVAL;
3136         }
3137
3138         mpp.mp_parent = NULL;
3139         mpp.mp_pi = 0;
3140         if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS)
3141                 return rc;
3142
3143         leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
3144         if (leaf == NULL || !exact) {
3145                 return MDB_NOTFOUND;
3146         }
3147
3148         if (!IS_LEAF2(mpp.mp_page) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3149                 MDB_xcursor mx;
3150                 MDB_pageparent mp2;
3151
3152                 mdb_xcursor_init0(txn, dbi, &mx);
3153                 mdb_xcursor_init1(txn, dbi, &mx, leaf);
3154                 if (flags == MDB_DEL_DUP) {
3155                         rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL, 0);
3156                         mdb_xcursor_fini(txn, dbi, &mx);
3157                         if (rc != MDB_SUCCESS)
3158                                 return rc;
3159                         /* If sub-DB still has entries, we're done */
3160                         if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) {
3161                                 memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
3162                                         sizeof(MDB_db));
3163                                 txn->mt_dbs[dbi].md_entries--;
3164                                 return rc;
3165                         }
3166                         /* otherwise fall thru and delete the sub-DB */
3167                 } else {
3168                         /* add all the child DB's pages to the free list */
3169                         rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi,
3170                                 NULL, &mx.mx_cursor, 0, &mp2);
3171                         if (rc == MDB_SUCCESS) {
3172                                 MDB_ppage *top, *parent;
3173                                 MDB_node *ni;
3174                                 unsigned int i;
3175
3176                                 cursor_pop_page(&mx.mx_cursor);
3177                                 if (mx.mx_cursor.mc_snum) {
3178                                         top = CURSOR_TOP(&mx.mx_cursor);
3179                                         while (mx.mx_cursor.mc_snum > 1) {
3180                                                 parent = CURSOR_PARENT(&mx.mx_cursor);
3181                                                 for (i=0; i<NUMKEYS(top->mp_page); i++) {
3182                                                         ni = NODEPTR(top->mp_page, i);
3183                                                         mdb_midl_insert(txn->mt_free_pgs, NODEPGNO(ni));
3184                                                 }
3185                                                 parent->mp_ki++;
3186                                                 if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
3187                                                         cursor_pop_page(&mx.mx_cursor);
3188                                                         top = parent;
3189                                                 } else {
3190                                                         ni = NODEPTR(parent->mp_page, parent->mp_ki);
3191                                                         rc = mdb_get_page(&mx.mx_txn, NODEPGNO(ni), &top->mp_page);
3192                                                 }
3193                                         }
3194                                 }
3195                                 mdb_midl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root);
3196                         }
3197                 }
3198         }
3199
3200         if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
3201                 return rc;
3202
3203         return mdb_del0(txn, dbi, ki, &mpp, leaf);
3204 }
3205
3206 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
3207  * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
3208  * *newindxp with the actual values after split, ie if *mpp and *newindxp
3209  * refer to a node in the new right sibling page.
3210  */
3211 static int
3212 mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
3213     MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
3214 {
3215         uint8_t          flags;
3216         int              rc = MDB_SUCCESS, ins_new = 0;
3217         indx_t           newindx;
3218         pgno_t           pgno = 0;
3219         unsigned int     i, j, split_indx;
3220         MDB_node        *node;
3221         MDB_val  sepkey, rkey, rdata;
3222         MDB_page        *copy;
3223         MDB_dpage       *mdp, *rdp, *pdp;
3224         MDB_dhead *dh;
3225         DKBUF;
3226
3227         assert(txn != NULL);
3228
3229         dh = ((MDB_dhead *)*mpp) - 1;
3230         mdp = (MDB_dpage *)dh;
3231         newindx = *newindxp;
3232
3233         DPRINTF("-----> splitting %s page %lu and adding [%s] at index %i",
3234             IS_LEAF(&mdp->p) ? "leaf" : "branch", mdp->p.mp_pgno,
3235             DKEY(newkey), *newindxp);
3236
3237         if (mdp->h.md_parent == NULL) {
3238                 if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL)
3239                         return ENOMEM;
3240                 mdp->h.md_pi = 0;
3241                 mdp->h.md_parent = &pdp->p;
3242                 txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno;
3243                 DPRINTF("root split! new root = %lu", pdp->p.mp_pgno);
3244                 txn->mt_dbs[dbi].md_depth++;
3245
3246                 /* Add left (implicit) pointer. */
3247                 if ((rc = mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
3248                     mdp->p.mp_pgno, 0)) != MDB_SUCCESS)
3249                         return rc;
3250         } else {
3251                 DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
3252         }
3253
3254         /* Create a right sibling. */
3255         if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL)
3256                 return ENOMEM;
3257         rdp->h.md_parent = mdp->h.md_parent;
3258         rdp->h.md_pi = mdp->h.md_pi + 1;
3259         DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
3260
3261         split_indx = NUMKEYS(&mdp->p) / 2 + 1;
3262
3263         if (IS_LEAF2(&rdp->p)) {
3264                 char *split, *ins;
3265                 int x;
3266                 unsigned int nkeys = NUMKEYS(&mdp->p), lsize, rsize, ksize;
3267                 /* Move half of the keys to the right sibling */
3268                 copy = NULL;
3269                 x = *newindxp - split_indx;
3270                 ksize = txn->mt_dbs[dbi].md_pad;
3271                 split = LEAF2KEY(&mdp->p, split_indx, ksize);
3272                 rsize = (nkeys - split_indx) * ksize;
3273                 lsize = (nkeys - split_indx) * sizeof(indx_t);
3274                 mdp->p.mp_lower -= lsize;
3275                 rdp->p.mp_lower += lsize;
3276                 mdp->p.mp_upper += rsize - lsize;
3277                 rdp->p.mp_upper -= rsize - lsize;
3278                 sepkey.mv_size = ksize;
3279                 if (newindx == split_indx) {
3280                         sepkey.mv_data = newkey->mv_data;
3281                 } else {
3282                         sepkey.mv_data = split;
3283                 }
3284                 if (x<0) {
3285                         ins = LEAF2KEY(&mdp->p, *newindxp, ksize);
3286                         memcpy(&rdp->p.mp_ptrs, split, rsize);
3287                         sepkey.mv_data = &rdp->p.mp_ptrs;
3288                         memmove(ins+ksize, ins, (split_indx - *newindxp) * ksize);
3289                         memcpy(ins, newkey->mv_data, ksize);
3290                         mdp->p.mp_lower += sizeof(indx_t);
3291                         mdp->p.mp_upper -= ksize - sizeof(indx_t);
3292                 } else {
3293                         if (x)
3294                                 memcpy(&rdp->p.mp_ptrs, split, x * ksize);
3295                         ins = LEAF2KEY(&rdp->p, x, ksize);
3296                         memcpy(ins, newkey->mv_data, ksize);
3297                         memcpy(ins+ksize, split + x * ksize, rsize - x * ksize);
3298                         rdp->p.mp_lower += sizeof(indx_t);
3299                         rdp->p.mp_upper -= ksize - sizeof(indx_t);
3300                         *newindxp = x;
3301                         *mpp = &rdp->p;
3302                 }
3303                 goto newsep;
3304         }
3305
3306         /* Move half of the keys to the right sibling. */
3307         if ((copy = malloc(txn->mt_env->me_psize)) == NULL)
3308                 return ENOMEM;
3309         memcpy(copy, &mdp->p, txn->mt_env->me_psize);
3310         memset(&mdp->p.mp_ptrs, 0, txn->mt_env->me_psize - PAGEHDRSZ);
3311         mdp->p.mp_lower = PAGEHDRSZ;
3312         mdp->p.mp_upper = txn->mt_env->me_psize;
3313
3314         /* First find the separating key between the split pages.
3315          */
3316         memset(&sepkey, 0, sizeof(sepkey));
3317         if (newindx == split_indx) {
3318                 sepkey.mv_size = newkey->mv_size;
3319                 sepkey.mv_data = newkey->mv_data;
3320         } else {
3321                 node = NODEPTR(copy, split_indx);
3322                 sepkey.mv_size = node->mn_ksize;
3323                 sepkey.mv_data = NODEKEY(node);
3324         }
3325
3326 newsep:
3327         DPRINTF("separator is [%s]", DKEY(&sepkey));
3328
3329         /* Copy separator key to the parent.
3330          */
3331         if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(txn->mt_env, &sepkey)) {
3332                 rc = mdb_split(txn, dbi, &rdp->h.md_parent, &rdp->h.md_pi,
3333                     &sepkey, NULL, rdp->p.mp_pgno);
3334
3335                 /* Right page might now have changed parent.
3336                  * Check if left page also changed parent.
3337                  */
3338                 if (rdp->h.md_parent != mdp->h.md_parent &&
3339                     mdp->h.md_pi >= NUMKEYS(mdp->h.md_parent)) {
3340                         mdp->h.md_parent = rdp->h.md_parent;
3341                         mdp->h.md_pi = rdp->h.md_pi - 1;
3342                 }
3343         } else {
3344                 rc = mdb_add_node(txn, dbi, rdp->h.md_parent, rdp->h.md_pi,
3345                     &sepkey, NULL, rdp->p.mp_pgno, 0);
3346         }
3347         if (IS_LEAF2(&rdp->p)) {
3348                 return rc;
3349         }
3350         if (rc != MDB_SUCCESS) {
3351                 free(copy);
3352                 return rc;
3353         }
3354
3355         for (i = j = 0; i <= NUMKEYS(copy); j++) {
3356                 if (i < split_indx) {
3357                         /* Re-insert in left sibling. */
3358                         pdp = mdp;
3359                 } else {
3360                         /* Insert in right sibling. */
3361                         if (i == split_indx)
3362                                 /* Reset insert index for right sibling. */
3363                                 j = (i == newindx && ins_new);
3364                         pdp = rdp;
3365                 }
3366
3367                 if (i == newindx && !ins_new) {
3368                         /* Insert the original entry that caused the split. */
3369                         rkey.mv_data = newkey->mv_data;
3370                         rkey.mv_size = newkey->mv_size;
3371                         if (IS_LEAF(&mdp->p)) {
3372                                 rdata.mv_data = newdata->mv_data;
3373                                 rdata.mv_size = newdata->mv_size;
3374                         } else
3375                                 pgno = newpgno;
3376                         flags = 0;
3377
3378                         ins_new = 1;
3379
3380                         /* Update page and index for the new key. */
3381                         *newindxp = j;
3382                         *mpp = &pdp->p;
3383                 } else if (i == NUMKEYS(copy)) {
3384                         break;
3385                 } else {
3386                         node = NODEPTR(copy, i);
3387                         rkey.mv_data = NODEKEY(node);
3388                         rkey.mv_size = node->mn_ksize;
3389                         if (IS_LEAF(&mdp->p)) {
3390                                 rdata.mv_data = NODEDATA(node);
3391                                 rdata.mv_size = node->mn_dsize;
3392                         } else
3393                                 pgno = NODEPGNO(node);
3394                         flags = node->mn_flags;
3395
3396                         i++;
3397                 }
3398
3399                 if (!IS_LEAF(&mdp->p) && j == 0) {
3400                         /* First branch index doesn't need key data. */
3401                         rkey.mv_size = 0;
3402                 }
3403
3404                 rc = mdb_add_node(txn, dbi, &pdp->p, j, &rkey, &rdata, pgno,flags);
3405         }
3406
3407         free(copy);
3408         return rc;
3409 }
3410
3411 static int
3412 mdb_put0(MDB_txn *txn, MDB_dbi dbi,
3413     MDB_val *key, MDB_val *data, unsigned int flags)
3414 {
3415         int              rc = MDB_SUCCESS, exact;
3416         unsigned int     ki;
3417         MDB_node        *leaf;
3418         MDB_pageparent  mpp;
3419         MDB_val xdata, *rdata, dkey;
3420         MDB_db dummy;
3421         char dbuf[PAGESIZE];
3422         int do_sub = 0;
3423         size_t nsize;
3424         DKBUF;
3425
3426         DPRINTF("==> put db %u key [%s], size %zu, data size %zu",
3427                 dbi, DKEY(key), key->mv_size, data->mv_size);
3428
3429         dkey.mv_size = 0;
3430         mpp.mp_parent = NULL;
3431         mpp.mp_pi = 0;
3432         rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
3433         if (rc == MDB_SUCCESS) {
3434                 leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
3435                 if (leaf && exact) {
3436                         if (flags == MDB_NOOVERWRITE) {
3437                                 DPRINTF("duplicate key [%s]", DKEY(key));
3438                                 return MDB_KEYEXIST;
3439                         }
3440                         /* there's only a key anyway, so this is a no-op */
3441                         if (IS_LEAF2(mpp.mp_page))
3442                                 return MDB_SUCCESS;
3443
3444                         if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
3445                                 /* Was a single item before, must convert now */
3446                                 if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3447                                         dkey.mv_size = NODEDSZ(leaf);
3448                                         dkey.mv_data = dbuf;
3449                                         memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
3450                                         /* data matches, ignore it */
3451                                         if (!mdb_dcmp(txn, dbi, data, &dkey))
3452                                                 return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
3453                                         memset(&dummy, 0, sizeof(dummy));
3454                                         if (txn->mt_dbs[dbi].md_flags & MDB_DUPFIXED) {
3455                                                 dummy.md_pad = data->mv_size;
3456                                                 dummy.md_flags = MDB_DUPFIXED;
3457                                                 if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERDUP)
3458                                                         dummy.md_flags |= MDB_INTEGERKEY;
3459                                         }
3460                                         dummy.md_root = P_INVALID;
3461                                         if (dkey.mv_size == sizeof(MDB_db)) {
3462                                                 memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
3463                                                 goto put_sub;
3464                                         }
3465                                         mdb_del_node(mpp.mp_page, ki, 0);
3466                                         do_sub = 1;
3467                                         rdata = &xdata;
3468                                         xdata.mv_size = sizeof(MDB_db);
3469                                         xdata.mv_data = &dummy;
3470                                         goto new_sub;
3471                                 }
3472                                 goto put_sub;
3473                         }
3474                         /* same size, just replace it */
3475                         if (NODEDSZ(leaf) == data->mv_size) {
3476                                 memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
3477                                 goto done;
3478                         }
3479                         mdb_del_node(mpp.mp_page, ki, 0);
3480                 }
3481                 if (leaf == NULL) {             /* append if not found */
3482                         ki = NUMKEYS(mpp.mp_page);
3483                         DPRINTF("appending key at index %i", ki);
3484                 }
3485         } else if (rc == MDB_NOTFOUND) {
3486                 MDB_dpage *dp;
3487                 /* new file, just write a root leaf page */
3488                 DPUTS("allocating new root leaf page");
3489                 if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) {
3490                         return ENOMEM;
3491                 }
3492                 mpp.mp_page = &dp->p;
3493                 txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno;
3494                 txn->mt_dbs[dbi].md_depth++;
3495                 txn->mt_dbxs[dbi].md_dirty = 1;
3496                 if ((txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED)) == MDB_DUPFIXED)
3497                         mpp.mp_page->mp_flags |= P_LEAF2;
3498                 ki = 0;
3499         }
3500         else
3501                 goto done;
3502
3503         assert(IS_LEAF(mpp.mp_page));
3504         DPRINTF("there are %u keys, should insert new key at index %i",
3505                 NUMKEYS(mpp.mp_page), ki);
3506
3507         rdata = data;
3508
3509 new_sub:
3510         nsize = IS_LEAF2(mpp.mp_page) ? key->mv_size : mdb_leaf_size(txn->mt_env, key, rdata);
3511         if (SIZELEFT(mpp.mp_page) < nsize) {
3512                 rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
3513         } else {
3514                 /* There is room already in this leaf page. */
3515                 rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, rdata, 0, 0);
3516         }
3517
3518         if (rc != MDB_SUCCESS)
3519                 txn->mt_flags |= MDB_TXN_ERROR;
3520         else {
3521                 /* Remember if we just added a subdatabase */
3522                 if (flags & F_SUBDATA) {
3523                         leaf = NODEPTR(mpp.mp_page, ki);
3524                         leaf->mn_flags |= F_SUBDATA;
3525                 }
3526
3527                 /* Now store the actual data in the child DB. Note that we're
3528                  * storing the user data in the keys field, so there are strict
3529                  * size limits on dupdata. The actual data fields of the child
3530                  * DB are all zero size.
3531                  */
3532                 if (do_sub) {
3533                         MDB_xcursor mx;
3534
3535                         leaf = NODEPTR(mpp.mp_page, ki);
3536 put_sub:
3537                         mdb_xcursor_init0(txn, dbi, &mx);
3538                         mdb_xcursor_init1(txn, dbi, &mx, leaf);
3539                         xdata.mv_size = 0;
3540                         xdata.mv_data = "";
3541                         if (flags == MDB_NODUPDATA)
3542                                 flags = MDB_NOOVERWRITE;
3543                         /* converted, write the original data first */
3544                         if (dkey.mv_size) {
3545                                 rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
3546                                 if (rc) return rc;
3547                                 leaf->mn_flags |= F_DUPDATA;
3548                         }
3549                         rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
3550                         mdb_xcursor_fini(txn, dbi, &mx);
3551                         memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
3552                                 sizeof(MDB_db));
3553                 }
3554                 txn->mt_dbs[dbi].md_entries++;
3555         }
3556
3557 done:
3558         return rc;
3559 }
3560
3561 int
3562 mdb_put(MDB_txn *txn, MDB_dbi dbi,
3563     MDB_val *key, MDB_val *data, unsigned int flags)
3564 {
3565         assert(key != NULL);
3566         assert(data != NULL);
3567
3568         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3569                 return EINVAL;
3570
3571         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
3572                 return EINVAL;
3573         }
3574
3575         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
3576                 return EINVAL;
3577         }
3578
3579         if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags)
3580                 return EINVAL;
3581
3582         return mdb_put0(txn, dbi, key, data, flags);
3583 }
3584
3585 int
3586 mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
3587 {
3588 #define CHANGEABLE      (MDB_NOSYNC)
3589         if ((flag & CHANGEABLE) != flag)
3590                 return EINVAL;
3591         if (onoff)
3592                 env->me_flags |= flag;
3593         else
3594                 env->me_flags &= ~flag;
3595         return MDB_SUCCESS;
3596 }
3597
3598 int
3599 mdb_env_get_flags(MDB_env *env, unsigned int *arg)
3600 {
3601         if (!env || !arg)
3602                 return EINVAL;
3603
3604         *arg = env->me_flags;
3605         return MDB_SUCCESS;
3606 }
3607
3608 int
3609 mdb_env_get_path(MDB_env *env, const char **arg)
3610 {
3611         if (!env || !arg)
3612                 return EINVAL;
3613
3614         *arg = env->me_path;
3615         return MDB_SUCCESS;
3616 }
3617
3618 static int
3619 mdb_stat0(MDB_env *env, MDB_db *db, MDB_stat *arg)
3620 {
3621         arg->ms_psize = env->me_psize;
3622         arg->ms_depth = db->md_depth;
3623         arg->ms_branch_pages = db->md_branch_pages;
3624         arg->ms_leaf_pages = db->md_leaf_pages;
3625         arg->ms_overflow_pages = db->md_overflow_pages;
3626         arg->ms_entries = db->md_entries;
3627
3628         return MDB_SUCCESS;
3629 }
3630 int
3631 mdb_env_stat(MDB_env *env, MDB_stat *arg)
3632 {
3633         if (env == NULL || arg == NULL)
3634                 return EINVAL;
3635
3636         return mdb_stat0(env, &env->me_meta->mm_dbs[MAIN_DBI], arg);
3637 }
3638
3639 int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
3640 {
3641         MDB_val key, data;
3642         MDB_dbi i;
3643         int rc, dirty = 0;
3644         size_t len;
3645
3646         /* main DB? */
3647         if (!name) {
3648                 *dbi = MAIN_DBI;
3649                 if (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY))
3650                         txn->mt_dbs[MAIN_DBI].md_flags |= (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY));
3651                 return MDB_SUCCESS;
3652         }
3653
3654         /* Is the DB already open? */
3655         len = strlen(name);
3656         for (i=2; i<txn->mt_numdbs; i++) {
3657                 if (len == txn->mt_dbxs[i].md_name.mv_size &&
3658                         !strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) {
3659                         *dbi = i;
3660                         return MDB_SUCCESS;
3661                 }
3662         }
3663
3664         if (txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1)
3665                 return ENFILE;
3666
3667         /* Find the DB info */
3668         key.mv_size = len;
3669         key.mv_data = (void *)name;
3670         rc = mdb_get(txn, MAIN_DBI, &key, &data);
3671
3672         /* Create if requested */
3673         if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) {
3674                 MDB_db dummy;
3675                 data.mv_size = sizeof(MDB_db);
3676                 data.mv_data = &dummy;
3677                 memset(&dummy, 0, sizeof(dummy));
3678                 dummy.md_root = P_INVALID;
3679                 dummy.md_flags = flags & 0xffff;
3680                 rc = mdb_put0(txn, MAIN_DBI, &key, &data, F_SUBDATA);
3681                 dirty = 1;
3682         }
3683
3684         /* OK, got info, add to table */
3685         if (rc == MDB_SUCCESS) {
3686                 txn->mt_dbxs[txn->mt_numdbs].md_name.mv_data = strdup(name);
3687                 txn->mt_dbxs[txn->mt_numdbs].md_name.mv_size = len;
3688                 txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL;
3689                 txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL;
3690                 txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
3691                 txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI;
3692                 txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty;
3693                 memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
3694                 *dbi = txn->mt_numdbs;
3695                 txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
3696                 txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
3697                 txn->mt_numdbs++;
3698         }
3699
3700         return rc;
3701 }
3702
3703 int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg)
3704 {
3705         if (txn == NULL || arg == NULL || dbi >= txn->mt_numdbs)
3706                 return EINVAL;
3707
3708         return mdb_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg);
3709 }
3710
3711 void mdb_close(MDB_txn *txn, MDB_dbi dbi)
3712 {
3713         char *ptr;
3714         if (dbi <= MAIN_DBI || dbi >= txn->mt_numdbs)
3715                 return;
3716         ptr = txn->mt_dbxs[dbi].md_name.mv_data;
3717         txn->mt_dbxs[dbi].md_name.mv_data = NULL;
3718         txn->mt_dbxs[dbi].md_name.mv_size = 0;
3719         free(ptr);
3720 }
3721
3722 int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
3723 {
3724         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3725                 return EINVAL;
3726
3727         txn->mt_dbxs[dbi].md_cmp = cmp;
3728         return MDB_SUCCESS;
3729 }
3730
3731 int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
3732 {
3733         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3734                 return EINVAL;
3735
3736         txn->mt_dbxs[dbi].md_dcmp = cmp;
3737         return MDB_SUCCESS;
3738 }
3739
3740 int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel)
3741 {
3742         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3743                 return EINVAL;
3744
3745         txn->mt_dbxs[dbi].md_rel = rel;
3746         return MDB_SUCCESS;
3747 }