2 * @brief memory-mapped database library
4 * A Btree-based database management library modeled loosely on the
5 * BerkeleyDB API, but much simplified.
8 * Copyright 2011 Howard Chu, Symas Corp.
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted only as authorized by the OpenLDAP
15 * A copy of this license is available in the file LICENSE in the
16 * top-level directory of the distribution or, alternatively, at
17 * <http://www.OpenLDAP.org/license.html>.
19 * This code is derived from btree.c written by Martin Hedenfalk.
21 * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
23 * Permission to use, copy, modify, and distribute this software for any
24 * purpose with or without fee is hereby granted, provided that the above
25 * copyright notice and this permission notice appear in all copies.
27 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
28 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
29 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
30 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
31 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
32 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
33 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
35 #include <sys/types.h>
37 #include <sys/param.h>
40 #ifdef HAVE_SYS_FILE_H
/*
 * Basic type alias, sync-flag selection and debug-print macros.
 *
 * ULONG is shorthand for unsigned long. MDB_DSYNC defaults to O_DSYNC.
 * DPRINTF appears with three alternative definitions: a "(void)" fallback
 * for compilers that may lack variadic macros, an fprintf(stderr, ...) form
 * that prefixes the function name, line number and pthread_self(), and a
 * no-op "((void) 0)" form. DPUTS(arg) prints one string through DPRINTF.
 *
 * NOTE(review): the preprocessor conditions selecting between the last two
 * DPRINTF forms are not visible in this excerpt. The fprintf form prints
 * pthread_self() with "%p"; this assumes pthread_t is pointer-like -- confirm
 * for the target platforms.
 */
58 #define ULONG unsigned long
63 /* Note: If O_DSYNC is undefined but exists in /usr/include,
64 * preferably set some compiler flag to get the definition.
65 * Otherwise compile with the less efficient -DMDB_DSYNC=O_SYNC.
68 # define MDB_DSYNC O_DSYNC
75 #if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
76 # define DPRINTF (void) /* Vararg macros may be unsupported */
78 # define DPRINTF(fmt, ...) /* Requires 2 or more args */ \
79 fprintf(stderr, "%s:%d:(%p) " fmt "\n", __func__, __LINE__, pthread_self(), __VA_ARGS__)
81 # define DPRINTF(fmt, ...) ((void) 0)
83 #define DPUTS(arg) DPRINTF("%s", arg)
/*
 * File magic number and key-size limit, plus helpers for printing keys in
 * debug output. KBUF leaves room for two hex digits per key byte plus a
 * terminating NUL; DKBUF declares such a buffer named kbuf, and DKEY(x)
 * formats key x into that buffer through mdb_dkey().
 */
87 #define MDB_MAGIC 0xBEEFC0DE
89 #define MAXKEYSIZE 511
91 #define KBUF (MAXKEYSIZE*2+1)
92 #define DKBUF char kbuf[KBUF]
93 #define DKEY(x) mdb_dkey(x, kbuf)
/*
 * LAZY_* locking wrappers. Two alternative definition sets follow: the first
 * expands every macro to nothing (no locking at all), the second maps each
 * macro onto the matching pthread mutex / rwlock call. LAZY_RWLOCK_DEF(x)
 * declares the lock object itself, so it also vanishes in the first set.
 * NOTE(review): the preprocessor condition choosing between the two sets is
 * not visible in this excerpt.
 */
99 /* The DB view is always consistent because all writes are wrapped in
100 * the wmutex. Finer-grained locks aren't necessary.
106 #define LAZY_MUTEX_LOCK(x)
107 #define LAZY_MUTEX_UNLOCK(x)
108 #define LAZY_RWLOCK_UNLOCK(x)
109 #define LAZY_RWLOCK_WRLOCK(x)
110 #define LAZY_RWLOCK_RDLOCK(x)
111 #define LAZY_RWLOCK_DEF(x)
112 #define LAZY_RWLOCK_INIT(x,y)
113 #define LAZY_RWLOCK_DESTROY(x)
115 #define LAZY_MUTEX_LOCK(x) pthread_mutex_lock(x)
116 #define LAZY_MUTEX_UNLOCK(x) pthread_mutex_unlock(x)
117 #define LAZY_RWLOCK_UNLOCK(x) pthread_rwlock_unlock(x)
118 #define LAZY_RWLOCK_WRLOCK(x) pthread_rwlock_wrlock(x)
119 #define LAZY_RWLOCK_RDLOCK(x) pthread_rwlock_rdlock(x)
120 #define LAZY_RWLOCK_DEF(x) pthread_rwlock_t x
121 #define LAZY_RWLOCK_INIT(x,y) pthread_rwlock_init(x,y)
122 #define LAZY_RWLOCK_DESTROY(x) pthread_rwlock_destroy(x)
/*
 * P_INVALID is the all-ones value used as the "no page" sentinel (for
 * example as the root of an empty DB and as the initial value of a page
 * number that has not been chosen yet).
 * F_ISSET(w, f) is true only when every bit of f is set in w.
 * indx_t is the 16-bit type used for offsets and slot indexes inside a page.
 * DEFAULT_READERS is the initial reader-slot count of a new environment;
 * DEFAULT_MAPSIZE (1 MiB) is the map size used for a new environment when
 * none was configured.
 */
125 #define P_INVALID (~0UL)
127 #define F_ISSET(w, f) (((w) & (f)) == (f))
129 typedef uint16_t indx_t;
131 #define DEFAULT_READERS 126
132 #define DEFAULT_MAPSIZE 1048576
/*
 * Reader-table slot types. The mr_txnid / mr_pid / mr_tid macros reach the
 * fields nested inside an MDB_reader (via mru.mrx), and the pad member
 * rounds sizeof(MDB_rxbody) up to a multiple of CACHELINE.
 */
134 /* Lock descriptor stuff */
136 #define CACHELINE 64 /* most CPUs. Itanium uses 128 */
139 typedef struct MDB_rxbody {
145 typedef struct MDB_reader {
148 #define mr_txnid mru.mrx.mrb_txnid
149 #define mr_pid mru.mrx.mrb_pid
150 #define mr_tid mru.mrx.mrb_tid
151 /* cache line alignment */
152 char pad[(sizeof(MDB_rxbody)+CACHELINE-1) & ~(CACHELINE-1)];
/*
 * Layout of the shared lock region. MDB_txbody carries the version, the
 * mutex (mti_mutex), the reader count and the current meta-page toggle; the
 * mti_* macros alias those fields through the mt1 member of MDB_txninfo. The
 * writer mutex (mti_wmutex) lives in a separate mt2 member. Each part has a
 * pad array that rounds its size up to a multiple of CACHELINE.
 * mti_readers[1] is the first element of the reader table; the region is
 * sized elsewhere for (me_maxreaders - 1) additional MDB_reader slots.
 */
156 typedef struct MDB_txbody {
158 uint32_t mtb_version;
159 pthread_mutex_t mtb_mutex;
161 uint32_t mtb_numreaders;
162 uint32_t mtb_me_toggle;
165 typedef struct MDB_txninfo {
168 #define mti_magic mt1.mtb.mtb_magic
169 #define mti_version mt1.mtb.mtb_version
170 #define mti_mutex mt1.mtb.mtb_mutex
171 #define mti_txnid mt1.mtb.mtb_txnid
172 #define mti_numreaders mt1.mtb.mtb_numreaders
173 #define mti_me_toggle mt1.mtb.mtb_me_toggle
174 char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
177 pthread_mutex_t mt2_wmutex;
178 #define mti_wmutex mt2.mt2_wmutex
179 char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
181 MDB_reader mti_readers[1];
/*
 * Page header layout and the macros that interpret it.
 *
 * PAGEHDRSZ   - bytes before the mp_ptrs slot array.
 * NUMKEYS(p)  - entries on the page: (mp_lower - PAGEHDRSZ) / 2, i.e. one
 *               2-byte indx_t slot per entry.
 * SIZELEFT(p) - free bytes between mp_lower and mp_upper.
 * PAGEFILL    - used fraction of the page body scaled to 0..1000.
 * IS_*(p)     - page-type tests on mp_flags using F_ISSET.
 * OVPAGES     - pages needed for "size" data bytes plus one page header,
 *               rounded up.
 * mp_lower/mp_upper and mp_pages alias members of the same mp_pb member.
 */
184 /* Common header for all page types. Overflow pages
185 * occupy a number of contiguous pages with no
186 * headers on any page after the first.
188 typedef struct MDB_page { /* represents a page of storage */
189 #define mp_pgno mp_p.p_pgno
191 pgno_t p_pgno; /* page number */
192 void * p_align; /* for IL32P64 */
194 #define P_BRANCH 0x01 /* branch page */
195 #define P_LEAF 0x02 /* leaf page */
196 #define P_OVERFLOW 0x04 /* overflow page */
197 #define P_META 0x08 /* meta page */
198 #define P_DIRTY 0x10 /* dirty page */
199 #define P_LEAF2 0x20 /* DB with small, fixed size keys and no data */
201 #define mp_lower mp_pb.pb.pb_lower
202 #define mp_upper mp_pb.pb.pb_upper
203 #define mp_pages mp_pb.pb_pages
206 indx_t pb_lower; /* lower bound of free space */
207 indx_t pb_upper; /* upper bound of free space */
209 uint32_t pb_pages; /* number of overflow pages */
211 indx_t mp_ptrs[1]; /* dynamic size */
214 #define PAGEHDRSZ ((unsigned) offsetof(MDB_page, mp_ptrs))
216 #define NUMKEYS(p) (((p)->mp_lower - PAGEHDRSZ) >> 1)
217 #define SIZELEFT(p) (indx_t)((p)->mp_upper - (p)->mp_lower)
218 #define PAGEFILL(env, p) (1000L * ((env)->me_psize - PAGEHDRSZ - SIZELEFT(p)) / \
219 ((env)->me_psize - PAGEHDRSZ))
220 #define IS_LEAF(p) F_ISSET((p)->mp_flags, P_LEAF)
221 #define IS_LEAF2(p) F_ISSET((p)->mp_flags, P_LEAF2)
222 #define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH)
223 #define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW)
225 #define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
/*
 * Per-database record: padding word (reused as the key size for LEAF2 pages)
 * and page-count statistics. Other members referenced elsewhere in the file
 * (md_flags, md_depth, md_entries, md_leaf_pages, md_root) are not visible
 * in this excerpt.
 */
227 typedef struct MDB_db {
228 uint32_t md_pad; /* also ksize for LEAF2 pages */
231 ULONG md_branch_pages;
233 ULONG md_overflow_pages;
/*
 * Contents of a meta page. mm_dbs[0] describes the free-space DB and
 * mm_dbs[1] the main DB. The page size and environment flags have no fields
 * of their own: mm_psize and mm_flags alias md_pad and md_flags of mm_dbs[0].
 * mm_txnid records which transaction committed this copy of the meta page.
 */
241 typedef struct MDB_meta { /* meta (footer) page content */
244 void *mm_address; /* address for fixed mapping */
245 size_t mm_mapsize; /* size of mmap region */
246 MDB_db mm_dbs[2]; /* first is free space, 2nd is main db */
247 #define mm_psize mm_dbs[0].md_pad
248 #define mm_flags mm_dbs[0].md_flags
249 pgno_t mm_last_pg; /* last used page in file */
250 ULONG mm_txnid; /* txnid that committed this page */
/*
 * Bookkeeping types for pages held in memory during a write transaction:
 * MDB_dhead is the header placed in front of a dirty page buffer (it records
 * the slot index within the parent page), MDB_dpage is the dirty page buffer
 * itself, MDB_oldpages is a singly linked list node holding a variable-length
 * array of reclaimed page numbers, and MDB_pageparent pairs a page with its
 * parent. Only part of each definition is visible in this excerpt.
 */
253 typedef struct MDB_dhead { /* a dirty page */
255 unsigned md_pi; /* parent index */
259 typedef struct MDB_dpage {
264 typedef struct MDB_oldpages {
265 struct MDB_oldpages *mo_next;
267 pgno_t mo_pages[1]; /* dynamic */
270 typedef struct MDB_pageparent {
/* Forward declarations for the page allocation and page-touch helpers. */
276 static MDB_dpage *mdb_alloc_page(MDB_txn *txn, MDB_dbi dbi, MDB_page *parent, unsigned int parent_idx, int num);
277 static int mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
/*
 * Cursor position tracking. A cursor keeps a fixed-size stack (CURSOR_STACK
 * entries) of MDB_ppage records, each holding a key index within a page.
 * CURSOR_TOP(c) is the most recently pushed entry and CURSOR_PARENT(c) the
 * one beneath it; neither macro checks mc_snum, so the caller must ensure
 * enough entries are on the stack.
 * METADATA(p) is the address just past the page header of p.
 */
279 typedef struct MDB_ppage { /* ordered list of pages */
281 unsigned int mp_ki; /* cursor index on page */
284 #define CURSOR_TOP(c) (&(c)->mc_stack[(c)->mc_snum-1])
285 #define CURSOR_PARENT(c) (&(c)->mc_stack[(c)->mc_snum-2])
286 #define CURSOR_STACK 32
292 MDB_ppage mc_stack[CURSOR_STACK]; /* stack of parent pages */
293 unsigned int mc_snum; /* number of pushed pages */
295 short mc_initialized; /* 1 if initialized */
296 short mc_eof; /* 1 if end is reached */
297 struct MDB_xcursor *mc_xcursor;
/*
 * On-page node header. mn_pgno (child page number) and mn_dsize (leaf data
 * size) alias members of the same mn_p member. Flags occupy a 4-bit field
 * and the key size a 12-bit field, so a node key size can be at most 4095.
 * The F_* values are the node flag bits stored in mn_flags.
 */
302 typedef struct MDB_node {
303 #define mn_pgno mn_p.np_pgno
304 #define mn_dsize mn_p.np_dsize
306 pgno_t np_pgno; /* child page number */
307 uint32_t np_dsize; /* leaf data size */
309 unsigned int mn_flags:4;
310 unsigned int mn_ksize:12; /* key size */
311 #define F_BIGDATA 0x01 /* data put on overflow page */
312 #define F_SUBDATA 0x02 /* data is a sub-database */
313 #define F_DUPDATA 0x04 /* data has duplicates */
/*
 * Auxiliary per-database state that is not stored in MDB_db: optional
 * user-supplied key compare, duplicate compare and relocate callbacks, and a
 * dirty flag that commit consults to decide whether the DB record must be
 * written back.
 */
317 typedef struct MDB_dbx {
319 MDB_cmp_func *md_cmp; /* user compare function */
320 MDB_cmp_func *md_dcmp; /* user dupsort function */
321 MDB_rel_func *md_rel; /* user relocate function */
323 unsigned int md_dirty;
/*
 * Transaction state (member list only; the enclosing struct header is not
 * visible in this excerpt). dirty_list is used by write transactions; it
 * is reached through the mt_u member, which also carries the reader slot
 * of a read-only transaction. mt_toggle records which meta page the
 * transaction started from.
 */
327 pgno_t mt_next_pgno; /* next unallocated page */
330 pgno_t *mt_free_pgs; /* this is an IDL */
332 MIDL2 *dirty_list; /* modified pages */
335 MDB_dbx *mt_dbxs; /* array */
337 unsigned int mt_numdbs;
339 #define MDB_TXN_RDONLY 0x01 /* read-only transaction */
340 #define MDB_TXN_ERROR 0x02 /* an error has occurred */
341 unsigned int mt_flags;
342 unsigned int mt_toggle;
/*
 * MDB_xcursor embeds a complete MDB_cursor as its first visible member;
 * MDB_cursor in turn points at an MDB_xcursor through mc_xcursor.
 */
345 /* Context for sorted-dup records */
346 typedef struct MDB_xcursor {
347 MDB_cursor mx_cursor;
/*
 * Environment state (member list only; the enclosing struct header is not
 * visible in this excerpt). Highlights:
 *  - me_mfd is a second descriptor used only for meta-page writes.
 *  - MDB_FATAL_ERROR is a bit in me_flags; once set, new transactions are
 *    refused.
 *  - me_txns points at the shared lock region; me_metas[] at the two meta
 *    pages inside the data map.
 *  - me_dpages is a free list of single-page dirty buffers kept for reuse.
 *  - me_free_pgs and me_dirty_list are fixed-size arrays lent to the
 *    current write transaction.
 *  - me_dblock is declared through LAZY_RWLOCK_DEF and therefore only exists
 *    when the LAZY_* macros map to real pthread locks.
 */
356 int me_mfd; /* just for writing the meta pages */
357 #define MDB_FATAL_ERROR 0x80000000U
359 uint32_t me_extrapad; /* unused for now */
360 unsigned int me_maxreaders;
361 unsigned int me_numdbs;
362 unsigned int me_maxdbs;
365 MDB_txninfo *me_txns;
366 MDB_meta *me_metas[2];
367 MDB_txn *me_txn; /* current write transaction */
369 off_t me_size; /* current file size */
370 pgno_t me_maxpg; /* me_mapsize / me_psize */
371 unsigned int me_psize;
372 unsigned int me_db_toggle;
373 MDB_dbx *me_dbxs; /* array */
375 MDB_oldpages *me_pghead;
376 pthread_key_t me_txkey; /* thread-key for readers */
377 MDB_dpage *me_dpages;
378 pgno_t me_free_pgs[MDB_IDL_UM_SIZE];
379 MIDL2 me_dirty_list[MDB_IDL_DB_SIZE];
380 LAZY_RWLOCK_DEF(me_dblock);
/*
 * Node accessors.
 *  NODESIZE       - size of the node header (offset of mn_data).
 *  INDXSIZE(k)    - branch node size for key k; k may be NULL (header only).
 *  LEAFSIZE(k, d) - leaf node size for key k and inline data d.
 *  NODEPTR(p, i)  - node addressed by slot i of page p.
 *  NODEKEY/NODEDATA - key bytes, and the data bytes that follow the key.
 *  LEAF2KEY       - i-th fixed-size key (ks bytes each) on a LEAF2 page.
 *  MDB_SET_KEY    - fills *key from a node when key is non-NULL.
 * NOTE(review): MDB_SET_KEY expands to a bare if-statement and uses "key"
 * unparenthesised in its test, so it is unsafe directly before an "else" or
 * with a non-trivial expression as the key argument.
 */
383 #define NODESIZE offsetof(MDB_node, mn_data)
385 #define INDXSIZE(k) (NODESIZE + ((k) == NULL ? 0 : (k)->mv_size))
386 #define LEAFSIZE(k, d) (NODESIZE + (k)->mv_size + (d)->mv_size)
387 #define NODEPTR(p, i) ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
388 #define NODEKEY(node) (void *)((node)->mn_data)
389 #define NODEDATA(node) (void *)((char *)(node)->mn_data + (node)->mn_ksize)
390 #define NODEPGNO(node) ((node)->mn_pgno)
391 #define NODEDSZ(node) ((node)->mn_dsize)
392 #define NODEKSZ(node) ((node)->mn_ksize)
393 #define LEAF2KEY(p, i, ks) ((char *)(p) + PAGEHDRSZ + ((i)*(ks)))
395 #define MDB_SET_KEY(node, key) if (key!=NULL) {(key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node);}
397 #define MDB_COMMIT_PAGES 64 /* max number of pages to write in one commit */
/*
 * Forward declarations of the file-local helpers, grouped roughly as: tree
 * search, environment header/meta I/O, node manipulation, rebalancing and
 * splitting, cursor stack and cursor movement, sorted-duplicate cursor
 * setup, size calculations, and the two raw byte-string comparators.
 */
399 static int mdb_search_page_root(MDB_txn *txn,
400 MDB_dbi dbi, MDB_val *key,
401 MDB_cursor *cursor, int modify,
402 MDB_pageparent *mpp);
403 static int mdb_search_page(MDB_txn *txn,
404 MDB_dbi dbi, MDB_val *key,
405 MDB_cursor *cursor, int modify,
406 MDB_pageparent *mpp);
408 static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
409 static int mdb_env_read_meta(MDB_env *env, int *which);
410 static int mdb_env_write_meta(MDB_txn *txn);
411 static int mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
413 static MDB_node *mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
414 MDB_val *key, int *exactp, unsigned int *kip);
415 static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
416 indx_t indx, MDB_val *key, MDB_val *data,
417 pgno_t pgno, uint8_t flags);
418 static void mdb_del_node(MDB_page *mp, indx_t indx, int ksize);
419 static int mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki,
420 MDB_pageparent *mpp, MDB_node *leaf);
421 static int mdb_put0(MDB_txn *txn, MDB_dbi dbi,
422 MDB_val *key, MDB_val *data, unsigned int flags);
423 static int mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
425 static int mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
426 static int mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
427 static int mdb_move_node(MDB_txn *txn, MDB_dbi dbi,
428 MDB_pageparent *src, indx_t srcindx,
429 MDB_pageparent *dst, indx_t dstindx);
430 static int mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src,
431 MDB_pageparent *dst);
432 static int mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp,
433 unsigned int *newindxp, MDB_val *newkey,
434 MDB_val *newdata, pgno_t newpgno);
435 static MDB_dpage *mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num);
437 static void cursor_pop_page(MDB_cursor *cursor);
438 static MDB_ppage *cursor_push_page(MDB_cursor *cursor,
441 static int mdb_sibling(MDB_cursor *cursor, int move_right);
442 static int mdb_cursor_next(MDB_cursor *cursor,
443 MDB_val *key, MDB_val *data, MDB_cursor_op op);
444 static int mdb_cursor_prev(MDB_cursor *cursor,
445 MDB_val *key, MDB_val *data, MDB_cursor_op op);
446 static int mdb_cursor_set(MDB_cursor *cursor,
447 MDB_val *key, MDB_val *data, MDB_cursor_op op, int *exactp);
448 static int mdb_cursor_first(MDB_cursor *cursor,
449 MDB_val *key, MDB_val *data);
450 static int mdb_cursor_last(MDB_cursor *cursor,
451 MDB_val *key, MDB_val *data);
453 static void mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
454 static void mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx,
455 MDB_page *mp, MDB_node *node);
456 static void mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx);
458 static size_t mdb_leaf_size(MDB_env *env, MDB_val *key,
460 static size_t mdb_branch_size(MDB_env *env, MDB_val *key);
462 static int memncmp(const void *s1, size_t n1,
463 const void *s2, size_t n2);
464 static int memnrcmp(const void *s1, size_t n1,
465 const void *s2, size_t n2);
/*
 * memncmp: compare two byte strings of possibly different lengths.
 * The bytes are compared with memcmp(); a non-zero memcmp() result is
 * returned as is. When the compared bytes are equal the result is len_diff,
 * which starts at -1 and is overwritten with (n1 > n2), i.e. 1 or 0.
 * NOTE(review): the condition guarding the len_diff assignment, and any
 * clamping of n1 before the memcmp() call, are not visible in this excerpt;
 * presumably n1 is reduced to the shorter length -- confirm.
 */
468 memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
470 int diff, len_diff = -1;
473 len_diff = (n1 > n2);
476 diff = memcmp(s1, s2, n1);
477 return diff ? diff : len_diff;
/*
 * memnrcmp: compare two byte strings from their LAST byte backwards.
 *
 * Used for keys whose most significant byte is stored last. Only the
 * min(n1, n2) trailing bytes of each buffer are examined:
 *   - at the first mismatch (scanning from the end) the difference of the
 *     two bytes, taken as unsigned char, is returned (<0 or >0);
 *   - if the common tail is identical, the shorter string sorts first:
 *     -1 when n1 < n2, 0 when n1 == n2, 1 when n1 > n2.
 *
 * Either length may be 0; the corresponding pointer is then never
 * dereferenced.
 *
 * The scan is bounded by a byte count instead of comparing the s1 cursor
 * against a pointer into s2: the two buffers are unrelated objects, so such
 * a comparison never matches when n1 > n2 and the scan would continue in
 * front of the start of s2.
 */
static int
memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
{
	const unsigned char *b1 = s1;
	const unsigned char *b2 = s2;
	size_t common = (n1 < n2) ? n1 : n2;
	size_t k;

	/* k counts bytes from the end: k == 1 is the last byte of each string */
	for (k = 1; k <= common; k++) {
		int diff = (int)b1[n1 - k] - (int)b2[n2 - k];
		if (diff)
			return diff;
	}

	/* common tail is equal: order by length */
	return (n1 > n2) - (n1 < n2);
}
/*
 * mdb_version: report the library version.
 * Each of maj, min and pat may be NULL; for every non-NULL pointer the
 * corresponding MDB_VERSION_* number is stored through it. Returns the
 * MDB_VERSION_STRING constant.
 */
501 mdb_version(int *maj, int *min, int *pat)
503 if (maj) *maj = MDB_VERSION_MAJOR;
504 if (min) *min = MDB_VERSION_MINOR;
505 if (pat) *pat = MDB_VERSION_PATCH;
506 return MDB_VERSION_STRING;
/*
 * errstr / mdb_strerror: map an error code to a message.
 * errstr is indexed by (err - MDB_KEYEXIST), so its entries must stay in the
 * same order as the MDB_* error codes from MDB_KEYEXIST through
 * MDB_VERSION_MISMATCH. Codes inside that range come from the table; any
 * other code is passed on to strerror(). A fixed "Successful return: 0"
 * string is also returned -- presumably for err == 0; the guarding condition
 * is not visible in this excerpt.
 */
509 static char *const errstr[] = {
510 "MDB_KEYEXIST: Key/data pair already exists",
511 "MDB_NOTFOUND: No matching key/data pair found",
512 "MDB_PAGE_NOTFOUND: Requested page not found",
513 "MDB_CORRUPTED: Located page was wrong type",
514 "MDB_PANIC: Update of meta page failed",
515 "MDB_VERSION_MISMATCH: Database environment version mismatch"
519 mdb_strerror(int err)
522 return ("Successful return: 0");
524 if (err >= MDB_KEYEXIST && err <= MDB_VERSION_MISMATCH)
525 return errstr[err - MDB_KEYEXIST];
527 return strerror(err);
/*
 * mdb_dkey: format a key as hexadecimal text for debug output.
 * Writes two lowercase hex digits per key byte into buf with sprintf().
 * Keys longer than MAXKEYSIZE are special-cased (the action taken is not
 * visible in this excerpt). buf is expected to hold at least KBUF bytes, as
 * declared by DKBUF.
 */
532 mdb_dkey(MDB_val *key, char *buf)
535 unsigned char *c = key->mv_data;
537 if (key->mv_size > MAXKEYSIZE)
539 for (i=0; i<key->mv_size; i++)
540 ptr += sprintf(ptr, "%02x", *c++);
/*
 * mdb_cmp: compare two keys of database dbi.
 * A user-supplied md_cmp callback takes precedence. Otherwise the DB flags
 * select the comparator: when MDB_REVERSEKEY (or a further flag that is only
 * included on little-endian hosts; its name is not visible in this excerpt)
 * is set the keys are compared from the last byte backwards with memnrcmp(),
 * else front to back with memncmp().
 */
546 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
548 if (txn->mt_dbxs[dbi].md_cmp)
549 return txn->mt_dbxs[dbi].md_cmp(a, b);
551 if (txn->mt_dbs[dbi].md_flags & (MDB_REVERSEKEY
552 #if __BYTE_ORDER == __LITTLE_ENDIAN
556 return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size);
558 return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
/*
 * mdb_dcmp: compare two data items (duplicates) of database dbi.
 * Mirrors mdb_cmp(): a user-supplied md_dcmp callback takes precedence;
 * otherwise memnrcmp() is used when one of the selected DB flags is set and
 * memncmp() when none is. The flag mask starts from 0 and gains a flag only
 * on little-endian hosts (its name is not visible in this excerpt).
 */
562 mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
564 if (txn->mt_dbxs[dbi].md_dcmp)
565 return txn->mt_dbxs[dbi].md_dcmp(a, b);
567 if (txn->mt_dbs[dbi].md_flags & (0
568 #if __BYTE_ORDER == __LITTLE_ENDIAN
572 return memnrcmp(a->mv_data, a->mv_size, b->mv_data, b->mv_size);
574 return memncmp((char *)a->mv_data, a->mv_size, b->mv_data, b->mv_size);
/*
 * mdb_alloc_page: obtain a dirty-page buffer covering "num" pages, assign it
 * a page number and register it in the transaction's dirty list.
 *
 * Visible flow:
 *  1. Only for txnids above 2: when no reclaimed-page list is cached in
 *     env->me_pghead, the request is not for the free DB itself, and the
 *     free DB is non-empty, the first record of the free DB is looked up.
 *     Its key is the txnid that freed the pages. The oldest txnid still
 *     published in the reader table is computed (starting from
 *     mt_txnid - 1) and the record is taken only if that oldest txnid is
 *     greater than the record's key.
 *  2. A taken record is copied into a freshly allocated MDB_oldpages node
 *     that is pushed onto me_pghead, and the record is deleted from the
 *     free DB.
 *  3. Page numbers are taken from the tail of the cached list; an emptied
 *     list node is unlinked from me_pghead.
 *  4. If no reusable page number was found (pgno stays P_INVALID) the page
 *     comes from txn->mt_next_pgno, after checking the me_maxpg limit.
 *  5. A single-page request reuses a buffer from env->me_dpages when one is
 *     available; a buffer of me_psize * num + sizeof(MDB_dhead) bytes is
 *     malloc'ed otherwise.
 *
 * The parent page and parent slot index are recorded in the buffer header.
 *
 * NOTE(review): the malloc() result for the MDB_oldpages node is
 * dereferenced on the very next line without a NULL check, and the results
 * of mdb_search_page(), mdb_read_data() and mdb_del0() are discarded.
 */
577 /* Allocate new page(s) for writing */
579 mdb_alloc_page(MDB_txn *txn, MDB_dbi dbi, MDB_page *parent, unsigned int parent_idx, int num)
582 pgno_t pgno = P_INVALID;
585 if (txn->mt_txnid > 2) {
587 if (!txn->mt_env->me_pghead && dbi != FREE_DBI &&
588 txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
589 /* See if there's anything in the free DB */
594 mpp.mp_parent = NULL;
596 mdb_search_page(txn, FREE_DBI, NULL, NULL, 0, &mpp);
597 leaf = NODEPTR(mpp.mp_page, 0);
598 kptr = (ULONG *)NODEKEY(leaf);
602 oldest = txn->mt_txnid - 1;
603 for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
604 ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
605 if (mr && mr < oldest)
610 if (oldest > *kptr) {
611 /* It's usable, grab it.
617 mdb_read_data(txn, leaf, &data);
618 idl = (ULONG *)data.mv_data;
619 mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
620 mop->mo_next = txn->mt_env->me_pghead;
621 mop->mo_txnid = *kptr;
622 txn->mt_env->me_pghead = mop;
623 memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
628 DPRINTF("IDL read txn %lu root %lu num %lu",
629 mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
630 for (i=0; i<idl[0]; i++) {
631 DPRINTF("IDL %lu", idl[i+1]);
635 /* drop this IDL from the DB */
636 mpp.mp_parent = NULL;
638 mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
639 leaf = NODEPTR(mpp.mp_page, 0);
640 mdb_del0(txn, FREE_DBI, 0, &mpp, leaf);
643 if (txn->mt_env->me_pghead) {
644 MDB_oldpages *mop = txn->mt_env->me_pghead;
646 /* FIXME: For now, always use fresh pages. We
647 * really ought to search the free list for a
652 /* peel pages off tail, so we only have to truncate the list */
653 pgno = MDB_IDL_LAST(mop->mo_pages);
654 if (MDB_IDL_IS_RANGE(mop->mo_pages)) {
656 if (mop->mo_pages[2] > mop->mo_pages[1])
657 mop->mo_pages[0] = 0;
661 if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
662 txn->mt_env->me_pghead = mop->mo_next;
669 if (pgno == P_INVALID) {
670 /* DB size is maxed out */
671 if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg)
674 if (txn->mt_env->me_dpages && num == 1) {
675 dp = txn->mt_env->me_dpages;
676 txn->mt_env->me_dpages = (MDB_dpage *)dp->h.md_parent;
678 if ((dp = malloc(txn->mt_env->me_psize * num + sizeof(MDB_dhead))) == NULL)
682 dp->h.md_parent = parent;
683 dp->h.md_pi = parent_idx;
684 if (pgno == P_INVALID) {
685 dp->p.mp_pgno = txn->mt_next_pgno;
686 txn->mt_next_pgno += num;
688 dp->p.mp_pgno = pgno;
690 mid.mid = dp->p.mp_pgno;
692 mdb_midl2_insert(txn->mt_u.dirty_list, &mid);
/*
 * mdb_touch: make the page referenced by pp writable within this
 * transaction.
 * If the page is not yet flagged P_DIRTY, a new dirty page is obtained from
 * mdb_alloc_page() (which must yield a different page number -- asserted),
 * the old page number is added to txn->mt_free_pgs, and the old contents
 * (me_psize bytes) are copied into the new buffer. The page is then flagged
 * P_DIRTY and, when it has a parent, the parent's node at slot pp->mp_pi is
 * updated with the page's number.
 */
697 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
700 mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *pp)
702 MDB_page *mp = pp->mp_page;
707 if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
709 if ((dp = mdb_alloc_page(txn, dbi, pp->mp_parent, pp->mp_pi, 1)) == NULL)
711 DPRINTF("touched db %u page %lu -> %lu", dbi, mp->mp_pgno, dp->p.mp_pgno);
712 assert(mp->mp_pgno != dp->p.mp_pgno);
713 mdb_midl_insert(txn->mt_free_pgs, mp->mp_pgno);
714 pgno = dp->p.mp_pgno;
715 memcpy(&dp->p, mp, txn->mt_env->me_psize);
718 mp->mp_flags |= P_DIRTY;
720 /* Update the page number to new touched page. */
721 if (pp->mp_parent != NULL)
722 NODEPGNO(NODEPTR(pp->mp_parent, pp->mp_pi)) = mp->mp_pgno;
/*
 * mdb_env_sync: flush the data file to stable storage.
 * fdatasync() is called on env->me_fd when force is non-zero or when the
 * environment was not opened with MDB_NOSYNC; a non-zero fdatasync() result
 * is treated as failure.
 */
729 mdb_env_sync(MDB_env *env, int force)
732 if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
733 if (fdatasync(env->me_fd))
/*
 * Forward declaration of mdb_txn_reset0(), followed by mdb_txn_renew0():
 * (re)initialise a transaction handle against the current environment state.
 *
 * Read-only transactions: the calling thread's reader slot is fetched from
 * thread-specific storage. Claiming a new slot happens under mti_mutex: the
 * table is scanned for an entry whose mr_pid is 0; if the scan index reaches
 * me_maxreaders the table is full, otherwise the slot is stamped with the
 * pid and thread id, mti_numreaders is raised if needed, and the slot is
 * stored with pthread_setspecific(). The current txnid and meta toggle are
 * then sampled and the txnid is published in the slot.
 *
 * Write transactions: mti_wmutex is locked (it is released in
 * mdb_txn_reset0() or at the end of commit), the txnid is the current one
 * plus 1, the environment's dirty list and free-page list are borrowed and
 * emptied, and mt_next_pgno starts just past the last page recorded in the
 * current meta page.
 *
 * In both cases the DB records are copied into the transaction: the first
 * two from the selected meta page, the remainder from env->me_dbs.
 */
740 mdb_txn_reset0(MDB_txn *txn);
743 mdb_txn_renew0(MDB_txn *txn)
745 MDB_env *env = txn->mt_env;
747 if (txn->mt_flags & MDB_TXN_RDONLY) {
748 MDB_reader *r = pthread_getspecific(env->me_txkey);
751 pid_t pid = getpid();
752 pthread_t tid = pthread_self();
754 pthread_mutex_lock(&env->me_txns->mti_mutex);
755 for (i=0; i<env->me_txns->mti_numreaders; i++)
756 if (env->me_txns->mti_readers[i].mr_pid == 0)
758 if (i == env->me_maxreaders) {
759 pthread_mutex_unlock(&env->me_txns->mti_mutex);
762 env->me_txns->mti_readers[i].mr_pid = pid;
763 env->me_txns->mti_readers[i].mr_tid = tid;
764 if (i >= env->me_txns->mti_numreaders)
765 env->me_txns->mti_numreaders = i+1;
766 pthread_mutex_unlock(&env->me_txns->mti_mutex);
767 r = &env->me_txns->mti_readers[i];
768 pthread_setspecific(env->me_txkey, r);
770 txn->mt_txnid = env->me_txns->mti_txnid;
771 txn->mt_toggle = env->me_txns->mti_me_toggle;
772 r->mr_txnid = txn->mt_txnid;
773 txn->mt_u.reader = r;
775 pthread_mutex_lock(&env->me_txns->mti_wmutex);
777 txn->mt_txnid = env->me_txns->mti_txnid+1;
778 txn->mt_toggle = env->me_txns->mti_me_toggle;
779 txn->mt_u.dirty_list = env->me_dirty_list;
780 txn->mt_u.dirty_list[0].mid = 0;
781 txn->mt_free_pgs = env->me_free_pgs;
782 txn->mt_free_pgs[0] = 0;
783 txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
787 /* Copy the DB arrays */
788 LAZY_RWLOCK_RDLOCK(&env->me_dblock);
789 txn->mt_numdbs = env->me_numdbs;
790 txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
791 memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
792 if (txn->mt_numdbs > 2)
793 memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
794 (txn->mt_numdbs - 2) * sizeof(MDB_db));
795 LAZY_RWLOCK_UNLOCK(&env->me_dblock);
/*
 * mdb_txn_renew: reuse an existing transaction handle for a new transaction.
 * Refuses to proceed once the environment carries MDB_FATAL_ERROR; otherwise
 * delegates to mdb_txn_renew0() and emits a debug trace on success.
 */
801 mdb_txn_renew(MDB_txn *txn)
808 if (txn->mt_env->me_flags & MDB_FATAL_ERROR) {
809 DPUTS("environment had fatal error, must shutdown!");
813 rc = mdb_txn_renew0(txn);
814 if (rc == MDB_SUCCESS) {
815 DPRINTF("renew txn %lu%c %p on mdbenv %p, root page %lu",
816 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
817 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
/*
 * mdb_txn_begin: allocate and start a new transaction.
 * Refuses to proceed once the environment carries MDB_FATAL_ERROR. The
 * handle and its array of me_maxdbs MDB_db records are obtained with a
 * single calloc(); mt_dbs points at the memory immediately following the
 * MDB_txn structure. MDB_RDONLY in flags marks the transaction read-only.
 * Initialisation is completed by mdb_txn_renew0().
 */
823 mdb_txn_begin(MDB_env *env, unsigned int flags, MDB_txn **ret)
828 if (env->me_flags & MDB_FATAL_ERROR) {
829 DPUTS("environment had fatal error, must shutdown!");
832 if ((txn = calloc(1, sizeof(MDB_txn) + env->me_maxdbs * sizeof(MDB_db))) == NULL) {
833 DPRINTF("calloc: %s", strerror(errno));
836 txn->mt_dbs = (MDB_db *)(txn+1);
837 if (flags & MDB_RDONLY) {
838 txn->mt_flags |= MDB_TXN_RDONLY;
842 rc = mdb_txn_renew0(txn);
847 DPRINTF("begin txn %lu%c %p on mdbenv %p, root page %lu",
848 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
849 (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
/*
 * mdb_txn_reset0: release the resources held by a transaction without
 * freeing the handle.
 * Read-only: the reader slot's txnid is cleared so the slot no longer pins
 * old pages.
 * Write: every single-page dirty buffer is pushed back onto the
 * environment's me_dpages free list (linked through h.md_parent), the
 * cached reclaimed-page lists on me_pghead are unlinked one by one, the
 * md_dirty flag of every named DB (index 2 and up) is cleared, and the
 * writer mutex is unlocked.
 */
856 mdb_txn_reset0(MDB_txn *txn)
858 MDB_env *env = txn->mt_env;
860 if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
861 txn->mt_u.reader->mr_txnid = 0;
867 /* return all dirty pages to dpage list */
868 for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
869 dp = txn->mt_u.dirty_list[i].mptr;
870 if (dp->h.md_num == 1) {
871 dp->h.md_parent = (MDB_page *)txn->mt_env->me_dpages;
872 txn->mt_env->me_dpages = dp;
874 /* large pages just get freed directly */
879 while ((mop = txn->mt_env->me_pghead)) {
880 txn->mt_env->me_pghead = mop->mo_next;
885 for (i=2; i<env->me_numdbs; i++)
886 env->me_dbxs[i].md_dirty = 0;
887 pthread_mutex_unlock(&env->me_txns->mti_wmutex);
/*
 * mdb_txn_reset: public reset entry point. Only its debug trace is visible
 * in this excerpt; presumably it then calls mdb_txn_reset0() -- confirm.
 */
892 mdb_txn_reset(MDB_txn *txn)
897 DPRINTF("reset txn %lu%c %p on mdbenv %p, root page %lu",
898 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
899 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
/*
 * mdb_txn_abort: public abort entry point. Only its debug trace is visible
 * in this excerpt; presumably it then resets and frees the handle --
 * confirm.
 */
905 mdb_txn_abort(MDB_txn *txn)
910 DPRINTF("abort txn %lu%c %p on mdbenv %p, root page %lu",
911 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
912 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
/*
 * mdb_txn_commit: make the changes of a write transaction durable.
 *
 * Visible flow:
 *  1. Early exits are taken for read-only transactions, for a handle that
 *     is not the environment's current write transaction, for a transaction
 *     flagged MDB_TXN_ERROR, and when the dirty list is empty.
 *  2. Free-list bookkeeping: the first and last pages of the free DB are
 *     touched in advance, then the transaction's freed-page list is stored
 *     in the free DB under the transaction id. Because storing the record
 *     can itself free more pages, the put is repeated until the list length
 *     is unchanged after the call.
 *  3. A partially consumed reclaimed-page list still cached in me_pghead is
 *     written back under its original txnid and released.
 *  4. For every named DB (index 2 and up) flagged dirty, its MDB_db record
 *     is stored in the main DB under the DB's name.
 *  5. Dirty pages are written with writev(), at most MDB_COMMIT_PAGES
 *     buffers per call; a batch is flushed and the file offset re-positioned
 *     with lseek() whenever the next page is not contiguous with the
 *     previous one. P_DIRTY is cleared on each page as it is queued.
 *  6. Single-page buffers go back to the me_dpages free list and the dirty
 *     list is emptied.
 *  7. The data file is synced and the meta page written; on success the
 *     environment's DB table is switched to the updated copy under the
 *     me_dblock write lock. Finally the writer mutex is released.
 *
 * NOTE(review): the mdb_put0() results in steps 3 and 4 are discarded, as is
 * the lseek() result in step 5. The lseek() offset is computed as
 * mp_pgno * me_psize before any widening to off_t -- check for overflow on
 * targets where unsigned long is 32 bits.
 */
919 mdb_txn_commit(MDB_txn *txn)
928 struct iovec iov[MDB_COMMIT_PAGES];
931 assert(txn->mt_env != NULL);
935 if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
940 if (txn != env->me_txn) {
941 DPUTS("attempt to commit unknown transaction");
946 if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
947 DPUTS("error flag is set, can't commit");
952 if (!txn->mt_u.dirty_list[0].mid)
955 DPRINTF("committing txn %lu %p on mdbenv %p, root page %lu",
956 txn->mt_txnid, txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
958 /* should only be one record now */
959 if (env->me_pghead) {
962 /* make sure first page of freeDB is touched and on freelist */
963 mpp.mp_parent = NULL;
965 mdb_search_page(txn, FREE_DBI, NULL, NULL, 1, &mpp);
967 /* save to free list */
968 if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
973 /* make sure last page of freeDB is touched and on freelist */
974 key.mv_size = MAXKEYSIZE+1;
976 mpp.mp_parent = NULL;
978 mdb_search_page(txn, FREE_DBI, &key, NULL, 1, &mpp);
983 ULONG *idl = txn->mt_free_pgs;
984 DPRINTF("IDL write txn %lu root %lu num %lu",
985 txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
986 for (i=0; i<idl[0]; i++) {
987 DPRINTF("IDL %lu", idl[i+1]);
991 /* write to last page of freeDB */
992 key.mv_size = sizeof(pgno_t);
993 key.mv_data = (char *)&txn->mt_txnid;
994 data.mv_data = txn->mt_free_pgs;
995 /* The free list can still grow during this call,
996 * despite the pre-emptive touches above. So check
997 * and make sure the entire thing got written.
1000 i = txn->mt_free_pgs[0];
1001 data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
1002 rc = mdb_put0(txn, FREE_DBI, &key, &data, 0);
1007 } while (i != txn->mt_free_pgs[0]);
1009 /* should only be one record now */
1010 if (env->me_pghead) {
1014 mop = env->me_pghead;
1015 key.mv_size = sizeof(pgno_t);
1016 key.mv_data = (char *)&mop->mo_txnid;
1017 data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
1018 data.mv_data = mop->mo_pages;
1019 mdb_put0(txn, FREE_DBI, &key, &data, 0);
1020 free(env->me_pghead);
1021 env->me_pghead = NULL;
1024 /* Update DB root pointers. Their pages have already been
1025 * touched so this is all in-place and cannot fail.
1029 data.mv_size = sizeof(MDB_db);
1031 for (i = 2; i < txn->mt_numdbs; i++) {
1032 if (txn->mt_dbxs[i].md_dirty) {
1033 data.mv_data = &txn->mt_dbs[i];
1034 mdb_put0(txn, MAIN_DBI, &txn->mt_dbxs[i].md_name, &data, 0);
1039 /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
1047 for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
1048 dp = txn->mt_u.dirty_list[i].mptr;
1049 if (dp->p.mp_pgno != next) {
1051 DPRINTF("committing %u dirty pages", n);
1052 rc = writev(env->me_fd, iov, n);
1056 DPUTS("short write, filesystem full?");
1058 DPRINTF("writev: %s", strerror(errno));
1065 lseek(env->me_fd, dp->p.mp_pgno * env->me_psize, SEEK_SET);
1066 next = dp->p.mp_pgno;
1068 DPRINTF("committing page %lu", dp->p.mp_pgno);
1069 iov[n].iov_len = env->me_psize * dp->h.md_num;
1070 iov[n].iov_base = &dp->p;
1071 size += iov[n].iov_len;
1072 next = dp->p.mp_pgno + dp->h.md_num;
1073 /* clear dirty flag */
1074 dp->p.mp_flags &= ~P_DIRTY;
1075 if (++n >= MDB_COMMIT_PAGES) {
1085 DPRINTF("committing %u dirty pages", n);
1086 rc = writev(env->me_fd, iov, n);
1090 DPUTS("short write, filesystem full?");
1092 DPRINTF("writev: %s", strerror(errno));
1099 /* Drop the dirty pages.
1101 for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
1102 dp = txn->mt_u.dirty_list[i].mptr;
1103 if (dp->h.md_num == 1) {
1104 dp->h.md_parent = (MDB_page *)txn->mt_env->me_dpages;
1105 txn->mt_env->me_dpages = dp;
1109 txn->mt_u.dirty_list[i].mid = 0;
1111 txn->mt_u.dirty_list[0].mid = 0;
1113 if ((n = mdb_env_sync(env, 0)) != 0 ||
1114 (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) {
1121 /* update the DB tables */
1123 int toggle = !env->me_db_toggle;
1126 ip = &env->me_dbs[toggle][2];
1127 jp = &txn->mt_dbs[2];
1128 LAZY_RWLOCK_WRLOCK(&env->me_dblock);
1129 for (i = 2; i < txn->mt_numdbs; i++) {
1130 if (ip->md_root != jp->md_root)
1135 for (i = 2; i < txn->mt_numdbs; i++) {
1136 if (txn->mt_dbxs[i].md_dirty)
1137 txn->mt_dbxs[i].md_dirty = 0;
1139 env->me_db_toggle = toggle;
1140 env->me_numdbs = txn->mt_numdbs;
1141 LAZY_RWLOCK_UNLOCK(&env->me_dblock);
1144 pthread_mutex_unlock(&env->me_txns->mti_wmutex);
/*
 * mdb_env_read_header: read and validate the first meta page of the data
 * file.
 * PAGESIZE bytes are read from offset 0 with pread() into a stack buffer. A
 * read of 0 bytes and a read of any length other than PAGESIZE are handled
 * as separate cases (0 bytes presumably means a brand-new file -- confirm).
 * The page must carry the P_META flag, the expected MDB_MAGIC and the
 * expected MDB_VERSION (otherwise MDB_VERSION_MISMATCH is returned). On
 * success the meta contents are copied into *meta.
 */
1151 mdb_env_read_header(MDB_env *env, MDB_meta *meta)
1153 char page[PAGESIZE];
1158 assert(env != NULL);
1160 /* We don't know the page size yet, so use a minimum value.
1163 if ((rc = pread(env->me_fd, page, PAGESIZE, 0)) == 0) {
1165 } else if (rc != PAGESIZE) {
1168 DPRINTF("read: %s", strerror(errno));
1172 p = (MDB_page *)page;
1174 if (!F_ISSET(p->mp_flags, P_META)) {
1175 DPRINTF("page %lu not a meta page", p->mp_pgno);
1180 if (m->mm_magic != MDB_MAGIC) {
1181 DPUTS("meta has invalid magic");
1185 if (m->mm_version != MDB_VERSION) {
1186 DPRINTF("database is version %u, expected version %u",
1187 m->mm_version, MDB_VERSION);
1188 return MDB_VERSION_MISMATCH;
1191 memcpy(meta, m, sizeof(*m));
/*
 * mdb_env_init_meta: create the two initial meta pages of a new data file.
 * The page size is taken from sysconf(_SC_PAGE_SIZE). *meta is filled with
 * the magic, version, page size, last page number 1, the low 16 bits of the
 * environment flags plus MDB_INTEGERKEY (mm_flags aliases the free DB's
 * flags), and empty roots for both built-in DBs. Two zeroed pages are then
 * allocated, each is marked P_META and given a copy of *meta, and both are
 * written with one write() call.
 *
 * NOTE(review): when write() reports fewer than psize * 2 bytes without
 * failing, errno is returned although a short write does not set it, so the
 * caller may see a stale value or 0 (success). Whether the calloc() result
 * is checked is not visible in this excerpt.
 */
1196 mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
1203 DPUTS("writing new meta page");
1204 psize = sysconf(_SC_PAGE_SIZE);
1206 meta->mm_magic = MDB_MAGIC;
1207 meta->mm_version = MDB_VERSION;
1208 meta->mm_psize = psize;
1209 meta->mm_last_pg = 1;
1210 meta->mm_flags = env->me_flags & 0xffff;
1211 meta->mm_flags |= MDB_INTEGERKEY;
1212 meta->mm_dbs[0].md_root = P_INVALID;
1213 meta->mm_dbs[1].md_root = P_INVALID;
1215 p = calloc(2, psize);
1217 p->mp_flags = P_META;
1220 memcpy(m, meta, sizeof(*meta));
1222 q = (MDB_page *)((char *)p + psize);
1225 q->mp_flags = P_META;
1228 memcpy(m, meta, sizeof(*meta));
1230 rc = write(env->me_fd, p, psize * 2);
1232 return (rc == (int)psize * 2) ? MDB_SUCCESS : errno;
/*
 * mdb_env_write_meta: publish a committed transaction by updating the meta
 * page that is NOT currently in use (toggle = !txn->mt_toggle).
 *
 * The txnid and last page number presently stored in that meta page are
 * saved in metab. Only the tail of MDB_meta, starting at
 * mm_dbs[0].md_depth, is written; it carries the two built-in DB records,
 * the new last page number and the new txnid. The write goes through
 * env->me_mfd with pwrite(); the file offset is advanced by one page size
 * in a step whose condition is not visible in this excerpt (presumably
 * when the second meta page is the target).
 *
 * On a failed write the saved old values are written back through
 * env->me_fd and the environment is flagged MDB_FATAL_ERROR. On success the
 * new toggle and txnid are stored in the shared lock region so that
 * subsequently started transactions pick them up.
 */
1236 mdb_env_write_meta(MDB_txn *txn)
1239 MDB_meta meta, metab;
1241 int rc, len, toggle;
1244 assert(txn != NULL);
1245 assert(txn->mt_env != NULL);
1247 toggle = !txn->mt_toggle;
1248 DPRINTF("writing meta page %d for root page %lu",
1249 toggle, txn->mt_dbs[MAIN_DBI].md_root);
1253 metab.mm_txnid = env->me_metas[toggle]->mm_txnid;
1254 metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg;
1256 ptr = (char *)&meta;
1257 off = offsetof(MDB_meta, mm_dbs[0].md_depth);
1258 len = sizeof(MDB_meta) - off;
1261 meta.mm_dbs[0] = txn->mt_dbs[0];
1262 meta.mm_dbs[1] = txn->mt_dbs[1];
1263 meta.mm_last_pg = txn->mt_next_pgno - 1;
1264 meta.mm_txnid = txn->mt_txnid;
1267 off += env->me_psize;
1270 /* Write to the SYNC fd */
1271 rc = pwrite(env->me_mfd, ptr, len, off);
1275 DPUTS("write failed, disk error?");
1276 /* On a failure, the pagecache still contains the new data.
1277 * Write some old data back, to prevent it from being used.
1278 * Use the non-SYNC fd; we know it will fail anyway.
1280 meta.mm_last_pg = metab.mm_last_pg;
1281 meta.mm_txnid = metab.mm_txnid;
1282 r2 = pwrite(env->me_fd, ptr, len, off);
1283 env->me_flags |= MDB_FATAL_ERROR;
1286 /* Memory ordering issues are irrelevant; since the entire writer
1287 * is wrapped by wmutex, all of these changes will become visible
1288 * after the wmutex is unlocked. Since the DB is multi-version,
1289 * readers will get consistent data regardless of how fresh or
1290 * how stale their view of these values is.
1292 LAZY_MUTEX_LOCK(&env->me_txns->mti_mutex);
1293 txn->mt_env->me_txns->mti_me_toggle = toggle;
1294 txn->mt_env->me_txns->mti_txnid = txn->mt_txnid;
1295 LAZY_MUTEX_UNLOCK(&env->me_txns->mti_mutex);
/*
 * mdb_env_read_meta: decide which of the two mapped meta pages is current
 * by comparing their mm_txnid values, and log the choice. The selected
 * index is presumably returned through *which -- the assignment is not
 * visible in this excerpt.
 */
1301 mdb_env_read_meta(MDB_env *env, int *which)
1305 assert(env != NULL);
1307 if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
1310 DPRINTF("Using meta page %d", toggle);
/*
 * mdb_env_create: allocate a zero-initialised environment handle.
 * Returns ENOMEM when the allocation fails. The reader-table size defaults
 * to DEFAULT_READERS.
 */
1317 mdb_env_create(MDB_env **env)
1321 e = calloc(1, sizeof(MDB_env));
1322 if (!e) return ENOMEM;
1324 e->me_maxreaders = DEFAULT_READERS;
/*
 * mdb_env_set_mapsize: record the requested memory-map size (bytes) in the
 * environment. Any precondition check is not visible in this excerpt.
 */
1334 mdb_env_set_mapsize(MDB_env *env, size_t size)
1338 env->me_mapsize = size;
/*
 * mdb_env_set_maxdbs: record the maximum number of databases in the
 * environment. The int argument is stored into an unsigned field; any
 * range or precondition check is not visible in this excerpt.
 */
1343 mdb_env_set_maxdbs(MDB_env *env, int dbs)
1347 env->me_maxdbs = dbs;
/*
 * mdb_env_set_maxreaders: record the maximum number of reader slots. The
 * int argument is stored into an unsigned field; any range or precondition
 * check is not visible in this excerpt.
 */
1352 mdb_env_set_maxreaders(MDB_env *env, int readers)
1356 env->me_maxreaders = readers;
/*
 * mdb_env_get_maxreaders: store the configured maximum number of reader
 * slots in *readers. Both arguments are checked for NULL first.
 */
1361 mdb_env_get_maxreaders(MDB_env *env, int *readers)
1363 if (!env || !readers)
1365 *readers = env->me_maxreaders;
/*
 * mdb_env_open2: second stage of opening an environment, mapping the data
 * file and locating the meta pages.
 *
 * Visible flow:
 *  - Store the open flags and try to read an existing header; a failing
 *    read may instead mean a new environment (see "new mdbenv").
 *  - If no map size was configured, use DEFAULT_MAPSIZE for a new
 *    environment or the size recorded in the existing meta page.
 *  - Map the file read-only; when the meta page records an address and
 *    MDB_FIXEDMAP is requested, that address is passed to mmap().
 *  - For a new environment, record the map size (and, with MDB_FIXEDMAP,
 *    the map address) in the meta data and write the initial meta pages;
 *    the mapping is removed again if that fails.
 *  - Derive me_psize and me_maxpg, point me_metas[0] just past the header
 *    of the first mapped page and me_metas[1] one page size later, then
 *    select the current meta page and log its statistics.
 */
1370 mdb_env_open2(MDB_env *env, unsigned int flags)
1372 int i, newenv = 0, toggle;
1376 env->me_flags = flags;
1378 memset(&meta, 0, sizeof(meta));
1380 if ((i = mdb_env_read_header(env, &meta)) != 0) {
1383 DPUTS("new mdbenv");
1387 if (!env->me_mapsize) {
1388 env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize;
1392 if (meta.mm_address && (flags & MDB_FIXEDMAP))
1394 env->me_map = mmap(meta.mm_address, env->me_mapsize, PROT_READ, i,
1396 if (env->me_map == MAP_FAILED)
1400 meta.mm_mapsize = env->me_mapsize;
1401 if (flags & MDB_FIXEDMAP)
1402 meta.mm_address = env->me_map;
1403 i = mdb_env_init_meta(env, &meta);
1404 if (i != MDB_SUCCESS) {
1405 munmap(env->me_map, env->me_mapsize);
1409 env->me_psize = meta.mm_psize;
1411 env->me_maxpg = env->me_mapsize / env->me_psize;
1413 p = (MDB_page *)env->me_map;
1414 env->me_metas[0] = METADATA(p);
1415 env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
1417 if ((i = mdb_env_read_meta(env, &toggle)) != 0)
1420 DPRINTF("opened database version %u, pagesize %u",
1421 env->me_metas[toggle]->mm_version, env->me_psize);
1422 DPRINTF("depth: %u", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_depth);
1423 DPRINTF("entries: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_entries);
1424 DPRINTF("branch pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_branch_pages);
1425 DPRINTF("leaf pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_leaf_pages);
1426 DPRINTF("overflow pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_overflow_pages);
1427 DPRINTF("root: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_root);
/* Destructor for the me_txkey thread-specific key (registered with
 * pthread_key_create in mdb_env_open): clears the txnid of the reader
 * slot passed in ptr.
 */
1433 mdb_env_reader_dest(void *ptr)
1435 MDB_reader *reader = ptr;
1437 reader->mr_txnid = 0;
1442 /* downgrade the exclusive lock on the region back to shared */
/* Before downgrading, the transaction ids of the two meta pages are
 * compared and the chosen toggle plus its txnid are published in the
 * shared lock region.
 */
1444 mdb_env_share_locks(MDB_env *env)
1446 struct flock lock_info;
/* NOTE(review): the assignment to toggle guarded by this test is not
 * visible; presumably the meta page with the larger txnid wins - confirm.
 */
1449 if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
1451 env->me_txns->mti_me_toggle = toggle;
1452 env->me_txns->mti_txnid = env->me_metas[toggle]->mm_txnid;
/* Request a read lock on the first byte of the lock file. */
1454 memset((void *)&lock_info, 0, sizeof(lock_info));
1455 lock_info.l_type = F_RDLCK;
1456 lock_info.l_whence = SEEK_SET;
1457 lock_info.l_start = 0;
1458 lock_info.l_len = 1;
/* NOTE(review): the fcntl result is not checked on this line. */
1459 fcntl(env->me_lfd, F_SETLK, &lock_info);
/* Open (creating if needed) the lock file lpath and map the reader table.
 *
 * Steps visible in this excerpt:
 *  - try a write lock on byte 0 of the lock file, then a read lock;
 *  - compute the region size for me_maxreaders reader slots and grow the
 *    file with ftruncate when it is smaller and *excl is set; otherwise
 *    me_maxreaders is recomputed from a size (the branch structure is not
 *    fully visible here);
 *  - mmap the region shared and read/write;
 *  - initialise two process-shared mutexes and the header fields
 *    (version, magic, txnid, numreaders, toggle);
 *  - check the magic and version of the region, reporting
 *    MDB_VERSION_MISMATCH for a wrong version.
 * NOTE(review): which of the last two steps runs presumably depends on
 * *excl; the selecting condition is not visible - confirm.
 */
1463 mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
1467 struct flock lock_info;
1471 if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
1475 /* Try to get exclusive lock. If we succeed, then
1476 * nobody is using the lock region and we should initialize it.
1478 memset((void *)&lock_info, 0, sizeof(lock_info));
1479 lock_info.l_type = F_WRLCK;
1480 lock_info.l_whence = SEEK_SET;
1481 lock_info.l_start = 0;
1482 lock_info.l_len = 1;
1483 rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1487 lock_info.l_type = F_RDLCK;
1488 rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1494 size = lseek(env->me_lfd, 0, SEEK_END);
1495 rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1496 if (size < rsize && *excl) {
1497 if (ftruncate(env->me_lfd, rsize) != 0) {
1503 size = rsize - sizeof(MDB_txninfo);
1504 env->me_maxreaders = size/sizeof(MDB_reader) + 1;
1506 env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
1508 if (env->me_txns == MAP_FAILED) {
1513 pthread_mutexattr_t mattr;
1515 pthread_mutexattr_init(&mattr);
1516 rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
1520 pthread_mutex_init(&env->me_txns->mti_mutex, &mattr);
1521 pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr);
1522 env->me_txns->mti_version = MDB_VERSION;
1523 env->me_txns->mti_magic = MDB_MAGIC;
1524 env->me_txns->mti_txnid = 0;
1525 env->me_txns->mti_numreaders = 0;
1526 env->me_txns->mti_me_toggle = 0;
1529 if (env->me_txns->mti_magic != MDB_MAGIC) {
1530 DPUTS("lock region has invalid magic");
1534 if (env->me_txns->mti_version != MDB_VERSION) {
1535 DPRINTF("lock region is version %u, expected version %u",
1536 env->me_txns->mti_version, MDB_VERSION);
1537 rc = MDB_VERSION_MISMATCH;
1540 if (errno != EACCES && errno != EAGAIN) {
1554 #define LOCKNAME "/lock.mdb"
1555 #define DATANAME "/data.mdb"
/* Open the environment stored in directory path: set up the lock file,
 * open the data file, map it via mdb_env_open2, open a second descriptor
 * for meta writes, and allocate the per-environment database tables.
 */
1557 mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
1559 int oflags, rc, len, excl;
1560 char *lpath, *dpath;
/* One allocation holds both file names; dpath points just past the
 * lock-file name inside the same buffer.
 */
1563 lpath = malloc(len + sizeof(LOCKNAME) + len + sizeof(DATANAME));
1566 dpath = lpath + len + sizeof(LOCKNAME);
1567 sprintf(lpath, "%s" LOCKNAME, path);
1568 sprintf(dpath, "%s" DATANAME, path);
1570 rc = mdb_env_setup_locks(env, lpath, mode, &excl);
1574 if (F_ISSET(flags, MDB_RDONLY))
1577 oflags = O_RDWR | O_CREAT;
1579 if ((env->me_fd = open(dpath, oflags, mode)) == -1) {
1584 if ((rc = mdb_env_open2(env, flags)) == MDB_SUCCESS) {
1585 /* synchronous fd for meta writes */
/* MDB_DSYNC is added unless the environment is read-only or MDB_NOSYNC
 * was requested.
 */
1586 if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
1587 oflags |= MDB_DSYNC;
1588 if ((env->me_mfd = open(dpath, oflags, mode)) == -1) {
/* NOTE(review): the strdup result is not checked on this line. */
1593 env->me_path = strdup(path);
1594 DPRINTF("opened dbenv %p", (void *) env);
/* mdb_env_reader_dest is the destructor for the per-thread key. */
1595 pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
1596 LAZY_RWLOCK_INIT(&env->me_dblock, NULL);
1598 mdb_env_share_locks(env);
/* NOTE(review): none of the three calloc results is checked in the
 * visible lines.
 */
1599 env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
1600 env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
1601 env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
/* The two tests below act only on descriptors that were opened. */
1607 if (env->me_fd >= 0) {
1611 if (env->me_lfd >= 0) {
/* Tear down an environment: walk the me_dpages list, free the database
 * tables, destroy the db lock and the thread key, unmap the data file,
 * release this process's reader slots and unmap the lock region.
 */
1621 mdb_env_close(MDB_env *env)
/* Unlink each entry from the list head; the link is kept in h.md_parent. */
1628 while (env->me_dpages) {
1629 dp = env->me_dpages;
1630 env->me_dpages = (MDB_dpage *)dp->h.md_parent;
1634 free(env->me_dbs[1]);
1635 free(env->me_dbs[0]);
1639 LAZY_RWLOCK_DESTROY(&env->me_dblock);
1640 pthread_key_delete(env->me_txkey);
1643 munmap(env->me_map, env->me_mapsize);
/* Same size formula as the one used when the lock region was mapped. */
1648 pid_t pid = getpid();
1649 size_t size = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
/* Clear the pid of every reader slot owned by this process. */
1651 for (i=0; i<env->me_txns->mti_numreaders; i++)
1652 if (env->me_txns->mti_readers[i].mr_pid == pid)
1653 env->me_txns->mti_readers[i].mr_pid = 0;
1654 munmap(env->me_txns, size);
1660 /* Search for key within a page (leaf or branch), using binary search.
1661 * Returns the smallest entry larger or equal to the key.
1662 * If exactp is non-null, stores whether the found entry was an exact match
1663 * in *exactp (1 or 0).
1664 * If kip is non-null, stores the index of the found entry in *kip.
1665 * If no entry larger or equal to the key is found, returns NULL.
/* Binary search over the keys of page mp, comparing with mdb_cmp. */
1668 mdb_search_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, MDB_val *key,
1669 int *exactp, unsigned int *kip)
1674 MDB_node *node = NULL;
1678 DPRINTF("searching %u keys in %s page %lu",
1680 IS_LEAF(mp) ? "leaf" : "branch",
1683 assert(NUMKEYS(mp) > 0);
1685 memset(&nodekey, 0, sizeof(nodekey));
/* Leaf pages are searched from index 0, branch pages from index 1. */
1687 low = IS_LEAF(mp) ? 0 : 1;
1688 high = NUMKEYS(mp) - 1;
1689 while (low <= high) {
1690 i = (low + high) >> 1;
/* Two ways to form the key at index i: a key of md_pad bytes read with
 * LEAF2KEY, or the key stored in a regular node.
 * NOTE(review): the selecting condition is not visible; presumably
 * IS_LEAF2(mp) - confirm.
 */
1693 nodekey.mv_size = txn->mt_dbs[dbi].md_pad;
1694 nodekey.mv_data = LEAF2KEY(mp, i, nodekey.mv_size);
1696 node = NODEPTR(mp, i);
1698 nodekey.mv_size = node->mn_ksize;
1699 nodekey.mv_data = NODEKEY(node);
1702 rc = mdb_cmp(txn, dbi, key, &nodekey);
1705 DPRINTF("found leaf index %u [%s], rc = %i",
1706 i, DKEY(&nodekey), rc);
1708 DPRINTF("found branch index %u [%s -> %lu], rc = %i",
1709 i, DKEY(&nodekey), NODEPGNO(node), rc);
1719 if (rc > 0) { /* Found entry is less than the key. */
1720 i++; /* Skip to get the smallest entry larger than key. */
1721 if (i >= NUMKEYS(mp))
1722 /* There is no entry larger or equal to the key. */
1726 *exactp = (rc == 0);
1727 if (kip) /* Store the key index if requested. */
1730 /* nodeptr is fake for LEAF2 */
1731 return IS_LEAF2(mp) ? NODEPTR(mp, 0) : NODEPTR(mp, i);
/* Remove the top entry of the cursor's page stack, if the stack is not
 * empty (the stack-depth update itself is not visible in this excerpt).
 */
1735 cursor_pop_page(MDB_cursor *cursor)
1739 if (cursor->mc_snum) {
1740 top = CURSOR_TOP(cursor);
1743 DPRINTF("popped page %lu off db %u cursor %p", top->mp_page->mp_pgno,
1744 cursor->mc_dbi, (void *) cursor);
/* Push page mp on the cursor's page stack. */
1749 cursor_push_page(MDB_cursor *cursor, MDB_page *mp)
1753 DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno,
1754 cursor->mc_dbi, (void *) cursor);
/* The stack holds at most CURSOR_STACK entries. */
1756 assert(cursor->mc_snum < CURSOR_STACK);
/* Take the next free slot and bump the depth in one step. */
1758 ppage = &cursor->mc_stack[cursor->mc_snum++];
1759 ppage->mp_page = mp;
/* Resolve page number pgno to a page pointer. Returns MDB_SUCCESS when a
 * page was found and MDB_PAGE_NOTFOUND otherwise.
 */
1765 mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
/* Writable transactions with a non-empty dirty list look there first;
 * entry 0 of the list holds the number of entries in its mid field.
 */
1769 if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
1774 x = mdb_midl2_search(txn->mt_u.dirty_list, &id);
1775 if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
1776 dp = txn->mt_u.dirty_list[x].mptr;
/* Page numbers up to mm_last_pg of the transaction's meta page are read
 * straight from the map at offset me_psize * pgno.
 */
1781 if (pgno <= txn->mt_env->me_metas[txn->mt_toggle]->mm_last_pg)
1782 p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
1786 DPRINTF("page %lu not found", pgno);
1789 return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
/* Descend from the page in mpp->mp_page through branch pages, choosing a
 * child at every level, until a non-branch page is reached. When cursor
 * is non-NULL every visited page is pushed on its stack.
 */
1793 mdb_search_page_root(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
1794 MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1796 MDB_page *mp = mpp->mp_page;
1800 if (cursor && cursor_push_page(cursor, mp) == NULL)
1803 while (IS_BRANCH(mp)) {
1807 DPRINTF("branch page %lu has %u keys", mp->mp_pgno, NUMKEYS(mp));
1808 assert(NUMKEYS(mp) > 1);
1809 DPRINTF("found index 0 to page %lu", NODEPGNO(NODEPTR(mp, 0)));
/* The child index is chosen in one of three ways: a NULL key, the
 * sentinel key built by mdb_cursor_last (size above MAXKEYSIZE with NULL
 * data), or a search of this page for the key.
 */
1811 if (key == NULL) /* Initialize cursor to first page. */
1813 else if (key->mv_size > MAXKEYSIZE && key->mv_data == NULL) {
1814 /* cursor to last page */
1818 node = mdb_search_node(txn, dbi, mp, key, &exact, &i);
/* NOTE(review): the condition guarding the fallback to the last index is
 * not visible; presumably it applies when the search returned NULL.
 */
1820 i = NUMKEYS(mp) - 1;
1828 DPRINTF("following index %u for key [%s]",
1830 assert(i < NUMKEYS(mp));
1831 node = NODEPTR(mp, i);
/* Remember which child was followed on the cursor's top entry. */
1834 CURSOR_TOP(cursor)->mp_ki = i;
1836 mpp->mp_parent = mp;
1837 if ((rc = mdb_get_page(txn, NODEPGNO(node), &mp)))
1842 if (cursor && cursor_push_page(cursor, mp) == NULL)
/* The MDB_dhead of a page is addressed as the structure immediately
 * before the page; after mdb_touch the header of the resulting page gets
 * the parent pointer and parent index.
 * NOTE(review): presumably done only when modify is set - confirm.
 */
1846 MDB_dhead *dh = ((MDB_dhead *)mp)-1;
1847 if ((rc = mdb_touch(txn, dbi, mpp)) != 0)
1849 dh = ((MDB_dhead *)mpp->mp_page)-1;
1850 dh->md_parent = mpp->mp_parent;
1851 dh->md_pi = mpp->mp_pi;
1858 DPRINTF("internal error, index points to a %02X page!?",
1860 return MDB_CORRUPTED;
/* NOTE(review): a NULL pointer is passed for %s when key is NULL. */
1863 DPRINTF("found leaf page %lu for key [%s]", mp->mp_pgno,
1864 key ? DKEY(key) : NULL);
1869 /* Search for the page a given key should be in.
1870 * Stores a pointer to the found page in *mpp.
1871 * If key is NULL, search for the lowest page (used by mdb_cursor_first).
1872 * If cursor is non-null, pushes parent pages on the cursor stack.
1873 * If modify is true, visited pages are updated with new page numbers.
/* Entry point of a tree search: check the transaction state, fetch the
 * root page of database dbi and hand over to mdb_search_page_root.
 * Returns MDB_NOTFOUND when the tree has no root (P_INVALID).
 */
1876 mdb_search_page(MDB_txn *txn, MDB_dbi dbi, MDB_val *key,
1877 MDB_cursor *cursor, int modify, MDB_pageparent *mpp)
1882 /* Choose which root page to start with. If a transaction is given
1883 * use the root page from the transaction, otherwise read the last
1884 * committed root page.
1886 if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
1887 DPUTS("transaction has failed, must abort");
1890 root = txn->mt_dbs[dbi].md_root;
1892 if (root == P_INVALID) { /* Tree is empty. */
1893 DPUTS("tree is empty");
1894 return MDB_NOTFOUND;
1897 if ((rc = mdb_get_page(txn, root, &mpp->mp_page)))
1900 DPRINTF("db %u root page %lu has flags 0x%X",
1901 dbi, root, mpp->mp_page->mp_flags);
1904 /* For sub-databases, update main root first */
/* The sub-database is looked up by name in MAIN_DBI once, then marked
 * dirty so the lookup is not repeated.
 */
1905 if (dbi > MAIN_DBI && !txn->mt_dbxs[dbi].md_dirty) {
1907 rc = mdb_search_page(txn, MAIN_DBI, &txn->mt_dbxs[dbi].md_name,
1911 txn->mt_dbxs[dbi].md_dirty = 1;
/* A root page that is not yet dirty is touched, and the page number of
 * the resulting page becomes the new root of this database.
 */
1913 if (!F_ISSET(mpp->mp_page->mp_flags, P_DIRTY)) {
1914 mpp->mp_parent = NULL;
1916 if ((rc = mdb_touch(txn, dbi, mpp)))
1918 txn->mt_dbs[dbi].md_root = mpp->mp_page->mp_pgno;
1922 return mdb_search_page_root(txn, dbi, key, cursor, modify, mpp);
/* Point data at the value of leaf node leaf.
 * Without F_BIGDATA the value is stored in the node itself. With
 * F_BIGDATA the node holds a page number (copied out with memcpy) and the
 * value is the METADATA area of that overflow page.
 */
1926 mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
1928 MDB_page *omp; /* overflow mpage */
1932 if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
1933 data->mv_size = leaf->mn_dsize;
1934 data->mv_data = NODEDATA(leaf);
1938 /* Read overflow data.
1940 data->mv_size = leaf->mn_dsize;
1941 memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
1942 if ((rc = mdb_get_page(txn, pgno, &omp))) {
1943 DPRINTF("read overflow page %lu failed", pgno);
1946 data->mv_data = METADATA(omp);
/* Look up key in database dbi and point data at its value. For a key
 * whose node carries F_DUPDATA the first item of the nested database is
 * returned.
 */
1952 mdb_get(MDB_txn *txn, MDB_dbi dbi,
1953 MDB_val *key, MDB_val *data)
1962 DPRINTF("===> get db %u key [%s]", dbi, DKEY(key));
/* Rejects a NULL transaction, dbi 0, and dbi beyond the open databases. */
1964 if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
/* Key length must be between 1 and MAXKEYSIZE. */
1967 if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
1971 if ((rc = mdb_search_page(txn, dbi, key, NULL, 0, &mpp)) != MDB_SUCCESS)
1974 leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, NULL);
1975 if (leaf && exact) {
1976 /* Return first duplicate data item */
1977 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
/* A temporary nested cursor is set up and searched with a NULL key,
 * which selects the first page of the nested tree.
 */
1980 mdb_xcursor_init0(txn, dbi, &mx);
1981 mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
1982 rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi, NULL, NULL, 0, &mpp);
1983 if (rc != MDB_SUCCESS)
/* The returned data is taken from the key position of entry 0. */
1985 if (IS_LEAF2(mpp.mp_page)) {
1986 data->mv_size = txn->mt_dbs[dbi].md_pad;
1987 data->mv_data = LEAF2KEY(mpp.mp_page, 0, data->mv_size);
1989 leaf = NODEPTR(mpp.mp_page, 0);
1990 data->mv_size = NODEKSZ(leaf);
1991 data->mv_data = NODEKEY(leaf);
1994 rc = mdb_read_data(txn, leaf, data);
/* Replace the page on top of the cursor stack by its right (move_right
 * non-zero) or left sibling. Recurses towards the root when the parent
 * has no further key in that direction.
 */
2004 mdb_sibling(MDB_cursor *cursor, int move_right)
2011 if (cursor->mc_snum < 2) {
2012 return MDB_NOTFOUND; /* root has no siblings */
2014 parent = CURSOR_PARENT(cursor);
2016 DPRINTF("parent page is page %lu, index %u",
2017 parent->mp_page->mp_pgno, parent->mp_ki);
2019 cursor_pop_page(cursor);
/* The parent is exhausted when it is already at its last key (moving
 * right) or at index 0 (moving left).
 */
2020 if (move_right ? (parent->mp_ki + 1 >= NUMKEYS(parent->mp_page))
2021 : (parent->mp_ki == 0)) {
2022 DPRINTF("no more keys left, moving to %s sibling",
2023 move_right ? "right" : "left");
2024 if ((rc = mdb_sibling(cursor, move_right)) != MDB_SUCCESS)
2026 parent = CURSOR_TOP(cursor);
2032 DPRINTF("just moving to %s index key %u",
2033 move_right ? "right" : "left", parent->mp_ki);
2035 assert(IS_BRANCH(parent->mp_page));
/* Load the child page referenced by the parent's current key. */
2037 indx = NODEPTR(parent->mp_page, parent->mp_ki);
2038 if ((rc = mdb_get_page(cursor->mc_txn, NODEPGNO(indx), &mp)))
/* NOTE(review): parent and parent_index are not referenced anywhere else
 * in the visible code; these two lines may sit in a disabled region.
 */
2041 mp->parent = parent->mp_page;
2042 mp->parent_index = parent->mp_ki;
2045 cursor_push_page(cursor, mp);
/* Advance the cursor by one item and return its key and data.
 * Returns MDB_NOTFOUND once the cursor is at end-of-file or no sibling
 * page exists to the right.
 */
2051 mdb_cursor_next(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op)
2058 if (cursor->mc_eof) {
2059 return MDB_NOTFOUND;
2062 assert(cursor->mc_initialized);
2064 top = CURSOR_TOP(cursor);
/* In a DUPSORT database a node with duplicates is first advanced through
 * its nested cursor; for MDB_NEXT the outer cursor moves on only when the
 * nested one is exhausted.
 */
2067 if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
2068 leaf = NODEPTR(mp, top->mp_ki);
2069 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2070 if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
2071 rc = mdb_cursor_next(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
2072 if (op != MDB_NEXT || rc == MDB_SUCCESS)
2076 cursor->mc_xcursor->mx_cursor.mc_initialized = 0;
2077 if (op == MDB_NEXT_DUP)
2078 return MDB_NOTFOUND;
2082 DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
/* Already on the last key of this page: continue on the right sibling. */
2084 if (top->mp_ki + 1 >= NUMKEYS(mp)) {
2085 DPUTS("=====> move to next sibling page");
2086 if (mdb_sibling(cursor, 1) != MDB_SUCCESS) {
2088 return MDB_NOTFOUND;
2090 top = CURSOR_TOP(cursor);
2092 DPRINTF("next page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
2096 DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
2097 mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
/* Key read with LEAF2KEY using the md_pad size. */
2100 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2101 key->mv_data = LEAF2KEY(mp, top->mp_ki, key->mv_size);
2105 assert(IS_LEAF(mp));
2106 leaf = NODEPTR(mp, top->mp_ki);
2108 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2109 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mp, leaf);
/* The assignment is parenthesised so that rc receives the status of
 * mdb_read_data itself, not the 0/1 result of the comparison.
 */
2112 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2115 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2116 rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
2117 if (rc != MDB_SUCCESS)
2122 MDB_SET_KEY(leaf, key);
/* Move the cursor back by one item and return its key and data.
 * Returns MDB_NOTFOUND when no sibling page exists to the left; the
 * cursor is then marked uninitialised.
 */
2127 mdb_cursor_prev(MDB_cursor *cursor, MDB_val *key, MDB_val *data, MDB_cursor_op op)
2134 assert(cursor->mc_initialized);
2136 top = CURSOR_TOP(cursor);
/* In a DUPSORT database a node with duplicates is first stepped back
 * through its nested cursor; for MDB_PREV the outer cursor moves only
 * when the nested one is exhausted.
 */
2139 if (cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPSORT) {
2140 leaf = NODEPTR(mp, top->mp_ki);
2141 if (op == MDB_PREV || op == MDB_PREV_DUP) {
2142 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2143 rc = mdb_cursor_prev(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
2144 if (op != MDB_PREV || rc == MDB_SUCCESS)
2147 cursor->mc_xcursor->mx_cursor.mc_initialized = 0;
2148 if (op == MDB_PREV_DUP)
2149 return MDB_NOTFOUND;
2154 DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) cursor);
/* Already on the first key of this page: continue on the left sibling
 * and position on its last key.
 */
2156 if (top->mp_ki == 0) {
2157 DPUTS("=====> move to prev sibling page");
2158 if (mdb_sibling(cursor, 0) != MDB_SUCCESS) {
2159 cursor->mc_initialized = 0;
2160 return MDB_NOTFOUND;
2162 top = CURSOR_TOP(cursor);
2164 top->mp_ki = NUMKEYS(mp) - 1;
2165 DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, top->mp_ki);
2171 DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
2172 mp->mp_pgno, NUMKEYS(mp), top->mp_ki);
/* Key read with LEAF2KEY using the md_pad size. */
2175 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2176 key->mv_data = LEAF2KEY(mp, top->mp_ki, key->mv_size);
2180 assert(IS_LEAF(mp));
2181 leaf = NODEPTR(mp, top->mp_ki);
2183 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2184 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mp, leaf);
/* The assignment is parenthesised so that rc receives the status of
 * mdb_read_data itself, not the 0/1 result of the comparison.
 */
2187 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2190 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2191 rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
2192 if (rc != MDB_SUCCESS)
2197 MDB_SET_KEY(leaf, key);
/* Position the cursor at key (or, for range operations, at the first
 * entry not smaller than key) and return the matching data. When exactp
 * is non-NULL and the match is not exact, MDB_NOTFOUND is returned.
 */
2202 mdb_cursor_set(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
2203 MDB_cursor_op op, int *exactp)
2213 assert(key->mv_size > 0);
2215 /* See if we're already on the right page */
/* The key is compared with the first and the last key of the current
 * leaf page before a full search from the root is started.
 */
2216 if (cursor->mc_initialized) {
2218 top = CURSOR_TOP(cursor);
2219 /* Don't try this for LEAF2 pages. Maybe support that later. */
2220 if ((top->mp_page->mp_flags & (P_LEAF|P_LEAF2)) == P_LEAF) {
2221 leaf = NODEPTR(top->mp_page, 0);
2222 MDB_SET_KEY(leaf, &nodekey);
2223 rc = mdb_cmp(cursor->mc_txn, cursor->mc_dbi, key, &nodekey);
2225 leaf = NODEPTR(top->mp_page, NUMKEYS(top->mp_page)-1);
2226 MDB_SET_KEY(leaf, &nodekey);
2227 rc = mdb_cmp(cursor->mc_txn, cursor->mc_dbi, key, &nodekey);
2229 /* we're already on the right page */
2230 mpp.mp_page = top->mp_page;
/* Full search: empty the page stack and descend from the root. */
2237 cursor->mc_snum = 0;
2239 rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, key, cursor, 0, &mpp);
2240 if (rc != MDB_SUCCESS)
2243 assert(IS_LEAF(mpp.mp_page));
2245 top = CURSOR_TOP(cursor);
2247 leaf = mdb_search_node(cursor->mc_txn, cursor->mc_dbi, mpp.mp_page, key, exactp, &top->mp_ki);
2248 if (exactp != NULL && !*exactp) {
2249 /* MDB_SET specified and not an exact match. */
2250 return MDB_NOTFOUND;
/* No entry on this page is large enough: the first entry of the right
 * sibling is used instead.
 */
2254 DPUTS("===> inexact leaf not found, goto sibling");
2255 if ((rc = mdb_sibling(cursor, 1)) != MDB_SUCCESS)
2256 return rc; /* no entries matched */
2257 top = CURSOR_TOP(cursor);
2259 mpp.mp_page = top->mp_page;
2260 assert(IS_LEAF(mpp.mp_page));
2261 leaf = NODEPTR(mpp.mp_page, 0);
2264 cursor->mc_initialized = 1;
2267 if (IS_LEAF2(mpp.mp_page)) {
2268 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2269 key->mv_data = LEAF2KEY(mpp.mp_page, top->mp_ki, key->mv_size);
2273 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2274 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mpp.mp_page, leaf);
/* With duplicates: MDB_SET and MDB_SET_RANGE return the first duplicate,
 * the other operations search the nested cursor for data.
 */
2277 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2278 if (op == MDB_SET || op == MDB_SET_RANGE) {
2279 rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
2282 if (op == MDB_GET_BOTH) {
2287 rc = mdb_cursor_set(&cursor->mc_xcursor->mx_cursor, data, NULL, MDB_SET_RANGE, ex2p);
2288 if (rc != MDB_SUCCESS)
/* Without duplicates the single stored value is compared with data using
 * mdb_dcmp.
 */
2291 } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) {
2293 if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
2295 rc = mdb_dcmp(cursor->mc_txn, cursor->mc_dbi, data, &d2);
2297 if (op == MDB_GET_BOTH || rc > 0)
2298 return MDB_NOTFOUND;
2302 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2307 /* The key already matches in all other cases */
2308 if (op == MDB_SET_RANGE)
2309 MDB_SET_KEY(leaf, key);
2310 DPRINTF("==> cursor placed on key [%s]", DKEY(key));
/* Position the cursor on the first entry of the database and return its
 * key and data.
 */
2316 mdb_cursor_first(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
/* Empty the page stack; a NULL search key makes the descent follow the
 * first-page branch of mdb_search_page_root.
 */
2322 cursor->mc_snum = 0;
2324 rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, NULL, cursor, 0, &mpp);
2325 if (rc != MDB_SUCCESS)
2327 assert(IS_LEAF(mpp.mp_page));
2329 leaf = NODEPTR(mpp.mp_page, 0);
2330 cursor->mc_initialized = 1;
2333 if (IS_LEAF2(mpp.mp_page)) {
2334 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2335 key->mv_data = LEAF2KEY(mpp.mp_page, 0, key->mv_size);
/* With duplicates the data comes from the first entry of the nested
 * cursor; otherwise the nested cursor (if any) is reset and the value is
 * read from the node.
 */
2340 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2341 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mpp.mp_page, leaf);
2342 rc = mdb_cursor_first(&cursor->mc_xcursor->mx_cursor, data, NULL);
2346 if (cursor->mc_xcursor)
2347 cursor->mc_xcursor->mx_cursor.mc_initialized = 0;
2348 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2352 MDB_SET_KEY(leaf, key);
/* Position the cursor on the last entry of the database and return its
 * key and data.
 */
2357 mdb_cursor_last(MDB_cursor *cursor, MDB_val *key, MDB_val *data)
2365 cursor->mc_snum = 0;
/* Sentinel key (size above MAXKEYSIZE, NULL data) that
 * mdb_search_page_root recognises as "go to the last page".
 */
2367 lkey.mv_size = MAXKEYSIZE+1;
2368 lkey.mv_data = NULL;
2370 rc = mdb_search_page(cursor->mc_txn, cursor->mc_dbi, &lkey, cursor, 0, &mpp);
2371 if (rc != MDB_SUCCESS)
2373 assert(IS_LEAF(mpp.mp_page));
2375 leaf = NODEPTR(mpp.mp_page, NUMKEYS(mpp.mp_page)-1);
2376 cursor->mc_initialized = 1;
/* The cursor index is set to the last key of the page. */
2379 top = CURSOR_TOP(cursor);
2380 top->mp_ki = NUMKEYS(top->mp_page) - 1;
2382 if (IS_LEAF2(mpp.mp_page)) {
2383 key->mv_size = cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_pad;
2384 key->mv_data = LEAF2KEY(mpp.mp_page, top->mp_ki, key->mv_size);
/* With duplicates the data comes from the last entry of the nested
 * cursor.
 */
2389 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2390 mdb_xcursor_init1(cursor->mc_txn, cursor->mc_dbi, cursor->mc_xcursor, mpp.mp_page, leaf);
2391 rc = mdb_cursor_last(&cursor->mc_xcursor->mx_cursor, data, NULL);
2395 if ((rc = mdb_read_data(cursor->mc_txn, leaf, data)) != MDB_SUCCESS)
2400 MDB_SET_KEY(leaf, key);
/* Dispatch a cursor operation to the matching positioning routine
 * (set, first, last, next, prev and the MULTIPLE variants). Unknown
 * operations are reported through DPRINTF.
 */
2405 mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
/* The check below needs data and a nested cursor to be present. */
2415 case MDB_GET_BOTH_RANGE:
2416 if (data == NULL || cursor->mc_xcursor == NULL) {
/* Key must be present with a length between 1 and MAXKEYSIZE.
 * MDB_SET_RANGE passes no exact-match flag, the other set operations do.
 */
2423 if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2425 } else if (op == MDB_SET_RANGE)
2426 rc = mdb_cursor_set(cursor, key, data, op, NULL);
2428 rc = mdb_cursor_set(cursor, key, data, op, &exact);
/* The MULTIPLE operations test the MDB_DUPFIXED flag of the database. */
2430 case MDB_GET_MULTIPLE:
2432 !(cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPFIXED) ||
2433 !cursor->mc_initialized) {
2438 if (!cursor->mc_xcursor->mx_cursor.mc_initialized || cursor->mc_xcursor->mx_cursor.mc_eof)
2441 case MDB_NEXT_MULTIPLE:
2443 !(cursor->mc_txn->mt_dbs[cursor->mc_dbi].md_flags & MDB_DUPFIXED)) {
2447 if (!cursor->mc_initialized)
2448 rc = mdb_cursor_first(cursor, key, data);
2450 rc = mdb_cursor_next(cursor, key, data, MDB_NEXT_DUP);
2451 if (rc == MDB_SUCCESS) {
2452 if (cursor->mc_xcursor->mx_cursor.mc_initialized) {
/* Return the whole nested page at once: NUMKEYS items of md_pad bytes
 * starting at METADATA, and leave the nested cursor on the last item.
 */
2455 top = CURSOR_TOP(&cursor->mc_xcursor->mx_cursor);
2456 data->mv_size = NUMKEYS(top->mp_page) *
2457 cursor->mc_xcursor->mx_txn.mt_dbs[cursor->mc_xcursor->mx_cursor.mc_dbi].md_pad;
2458 data->mv_data = METADATA(top->mp_page);
2459 top->mp_ki = NUMKEYS(top->mp_page)-1;
/* An uninitialised cursor starts at the first entry. */
2467 case MDB_NEXT_NODUP:
2468 if (!cursor->mc_initialized)
2469 rc = mdb_cursor_first(cursor, key, data);
2471 rc = mdb_cursor_next(cursor, key, data, op);
/* An uninitialised or end-of-file cursor starts at the last entry. */
2475 case MDB_PREV_NODUP:
2476 if (!cursor->mc_initialized || cursor->mc_eof)
2477 rc = mdb_cursor_last(cursor, key, data);
2479 rc = mdb_cursor_prev(cursor, key, data, op);
2482 rc = mdb_cursor_first(cursor, key, data);
2485 rc = mdb_cursor_last(cursor, key, data);
2488 DPRINTF("unhandled/unimplemented cursor operation %u", op);
2496 /* Allocate a page and initialize it
/* Obtain num page(s) from mdb_alloc_page, set up the header of the first
 * one with flags plus P_DIRTY, and update the page counters of database
 * dbi according to the page type.
 */
2499 mdb_new_page(MDB_txn *txn, MDB_dbi dbi, uint32_t flags, int num)
2503 if ((dp = mdb_alloc_page(txn, dbi, NULL, 0, num)) == NULL)
2505 DPRINTF("allocated new mpage %lu, page size %u",
2506 dp->p.mp_pgno, txn->mt_env->me_psize);
2507 dp->p.mp_flags = flags | P_DIRTY;
/* Empty page: lower bound right after the header, upper bound at the end
 * of the page.
 */
2508 dp->p.mp_lower = PAGEHDRSZ;
2509 dp->p.mp_upper = txn->mt_env->me_psize;
2511 if (IS_BRANCH(&dp->p))
2512 txn->mt_dbs[dbi].md_branch_pages++;
2513 else if (IS_LEAF(&dp->p))
2514 txn->mt_dbs[dbi].md_leaf_pages++;
2515 else if (IS_OVERFLOW(&dp->p)) {
/* Overflow pages are counted per page and the run length is stored in
 * the first page.
 */
2516 txn->mt_dbs[dbi].md_overflow_pages += num;
2517 dp->p.mp_pages = num;
/* Space needed on a leaf page for key and data, including the indx_t
 * slot. Data of at least me_psize / MDB_MINKEYS bytes is counted as a
 * page number only.
 */
2524 mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data)
2528 sz = LEAFSIZE(key, data);
2529 if (data->mv_size >= env->me_psize / MDB_MINKEYS) {
2530 /* put on overflow page */
2531 sz -= data->mv_size - sizeof(pgno_t);
2534 return sz + sizeof(indx_t);
/* Space needed on a branch page for key, including the indx_t slot. The
 * visible body of the size test holds only comments, so oversized keys
 * are not reduced here.
 */
2538 mdb_branch_size(MDB_env *env, MDB_val *key)
2543 if (sz >= env->me_psize / MDB_MINKEYS) {
2544 /* put on overflow page */
2545 /* not implemented */
2546 /* sz -= key->size - sizeof(pgno_t); */
2549 return sz + sizeof(indx_t);
/* Insert a node with key (and either data or the child page number pgno)
 * at index indx of page mp. Data of at least me_psize / MDB_MINKEYS
 * bytes is written to a newly allocated overflow page and the node keeps
 * only that page number.
 */
2553 mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
2554 MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
2557 size_t node_size = NODESIZE;
2560 MDB_dpage *ofp = NULL; /* overflow page */
2563 assert(mp->mp_upper >= mp->mp_lower);
/* NOTE(review): a NULL pointer is passed for %s when key is NULL. */
2565 DPRINTF("add to %s page %lu index %i, data size %zu key size %zu [%s]",
2566 IS_LEAF(mp) ? "leaf" : "branch",
2567 mp->mp_pgno, indx, data ? data->mv_size : 0,
2568 key ? key->mv_size : 0, key ? DKEY(key) : NULL);
/* Keys of md_pad bytes are stored back to back: the tail is shifted by
 * one key and the new key copied into the gap.
 * NOTE(review): presumably guarded by IS_LEAF2(mp); the condition is not
 * visible - confirm.
 */
2571 /* Move higher keys up one slot. */
2572 int ksize = txn->mt_dbs[dbi].md_pad, dif;
2573 char *ptr = LEAF2KEY(mp, indx, ksize);
2574 dif = NUMKEYS(mp) - indx;
2576 memmove(ptr+ksize, ptr, dif*ksize);
2577 /* insert new key */
2578 memcpy(ptr, key->mv_data, ksize);
2580 /* Just using these for counting */
2581 mp->mp_lower += sizeof(indx_t);
2582 mp->mp_upper -= ksize - sizeof(indx_t);
/* Regular node: accumulate the node size from key and data. */
2587 node_size += key->mv_size;
2591 if (F_ISSET(flags, F_BIGDATA)) {
2592 /* Data already on overflow page. */
2593 node_size += sizeof(pgno_t);
2594 } else if (data->mv_size >= txn->mt_env->me_psize / MDB_MINKEYS) {
2595 int ovpages = OVPAGES(data->mv_size, txn->mt_env->me_psize);
2596 /* Put data on overflow page. */
2597 DPRINTF("data size is %zu, put on overflow page",
2599 node_size += sizeof(pgno_t);
2600 if ((ofp = mdb_new_page(txn, dbi, P_OVERFLOW, ovpages)) == NULL)
2602 DPRINTF("allocated overflow page %lu", ofp->p.mp_pgno);
2605 node_size += data->mv_size;
/* The node plus its indx_t slot must fit in the free space of the page. */
2609 if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
2610 DPRINTF("not enough room in page %lu, got %u ptrs",
2611 mp->mp_pgno, NUMKEYS(mp));
2612 DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower,
2613 mp->mp_upper - mp->mp_lower);
2614 DPRINTF("node size = %zu", node_size);
2618 /* Move higher pointers up one slot. */
2619 for (i = NUMKEYS(mp); i > indx; i--)
2620 mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
2622 /* Adjust free space offsets. */
/* The new node is placed directly below the current upper bound. */
2623 ofs = mp->mp_upper - node_size;
2624 assert(ofs >= mp->mp_lower + sizeof(indx_t));
2625 mp->mp_ptrs[indx] = ofs;
2627 mp->mp_lower += sizeof(indx_t);
2629 /* Write the node data. */
2630 node = NODEPTR(mp, indx);
2631 node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
2632 node->mn_flags = flags;
2634 node->mn_dsize = data->mv_size;
2636 NODEPGNO(node) = pgno;
2639 memcpy(NODEKEY(node), key->mv_data, key->mv_size);
/* The data area starts right after the key inside mn_data; it receives
 * the data itself or, when an overflow page was allocated, that page's
 * number, with the data copied to the overflow page.
 */
2644 if (F_ISSET(flags, F_BIGDATA))
2645 memcpy(node->mn_data + key->mv_size, data->mv_data,
2648 memcpy(node->mn_data + key->mv_size, data->mv_data,
2651 memcpy(node->mn_data + key->mv_size, &ofp->p.mp_pgno,
2653 memcpy(METADATA(&ofp->p), data->mv_data, data->mv_size);
/* Remove the entry at index indx from page mp and close the gap it
 * leaves. ksize is the key size used by the LEAF2KEY path.
 */
2661 mdb_del_node(MDB_page *mp, indx_t indx, int ksize)
2664 indx_t i, j, numkeys, ptr;
2668 DPRINTF("delete node %u on %s page %lu", indx,
2669 IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
2670 assert(indx < NUMKEYS(mp));
/* Back-to-back keys: slide the x following keys down by one key and undo
 * the space accounting of one entry.
 * NOTE(review): presumably guarded by IS_LEAF2(mp) - confirm.
 */
2673 int x = NUMKEYS(mp) - 1 - indx;
2674 base = LEAF2KEY(mp, indx, ksize);
2676 memmove(base, base + ksize, x * ksize);
2677 mp->mp_lower -= sizeof(indx_t);
2678 mp->mp_upper += ksize - sizeof(indx_t);
/* Regular node: sz is the number of bytes the node occupies. */
2682 node = NODEPTR(mp, indx);
2683 sz = NODESIZE + node->mn_ksize;
2685 if (F_ISSET(node->mn_flags, F_BIGDATA))
2686 sz += sizeof(pgno_t);
2688 sz += NODEDSZ(node);
/* Compact the pointer array; every pointer below the removed node's
 * offset moves up by sz because the data below it is shifted.
 */
2691 ptr = mp->mp_ptrs[indx];
2692 numkeys = NUMKEYS(mp);
2693 for (i = j = 0; i < numkeys; i++) {
2695 mp->mp_ptrs[j] = mp->mp_ptrs[i];
2696 if (mp->mp_ptrs[i] < ptr)
2697 mp->mp_ptrs[j] += sz;
/* Shift the node data between mp_upper and the removed node up by sz. */
2702 base = (char *)mp + mp->mp_upper;
2703 memmove(base + sz, base, ptr - mp->mp_upper);
2705 mp->mp_lower -= sizeof(indx_t);
/* One-time setup of a nested cursor mx for database dbi: the embedded
 * transaction uses mx's own small db tables, which start as copies of the
 * parent's entries, followed by one extra slot for the nested database.
 */
2710 mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
2715 mx->mx_txn.mt_dbxs = mx->mx_dbxs;
2716 mx->mx_txn.mt_dbs = mx->mx_dbs;
2717 mx->mx_dbxs[0] = txn->mt_dbxs[0];
2718 mx->mx_dbxs[1] = txn->mt_dbxs[1];
/* NOTE(review): slot 2 is presumably used only when dbi is not one of the
 * first two databases; the condition and the value of dbn are not
 * visible - confirm.
 */
2720 mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
/* The extra slot dbn+1 is compared with the parent's data comparison
 * function (md_dcmp).
 */
2725 mx->mx_dbxs[dbn+1].md_parent = dbn;
2726 mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
2727 mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
2728 mx->mx_dbxs[dbn+1].md_dirty = 0;
2729 mx->mx_txn.mt_numdbs = dbn+2;
2731 mx->mx_cursor.mc_snum = 0;
2732 mx->mx_cursor.mc_txn = &mx->mx_txn;
2733 mx->mx_cursor.mc_dbi = dbn+1;
/* Bind nested cursor mx to one node: the node's data area is read as an
 * MDB_db record and becomes the nested database; the node's key becomes
 * its name.
 */
2737 mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_page *mp, MDB_node *node)
2739 MDB_db *db = NODEDATA(node);
/* Refresh the copies of the parent's db records. */
2741 mx->mx_dbs[0] = txn->mt_dbs[0];
2742 mx->mx_dbs[1] = txn->mt_dbs[1];
2744 mx->mx_dbs[2] = txn->mt_dbs[dbi];
2749 DPRINTF("Sub-db %u for db %u root page %lu", dbn, dbi, db->md_root);
2750 mx->mx_dbs[dbn] = *db;
/* A nested database living on a dirty page is itself marked dirty. */
2751 if (F_ISSET(mp->mp_flags, P_DIRTY))
2752 mx->mx_dbxs[dbn].md_dirty = 1;
2753 mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
2754 mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
/* Carry over the parent's allocation state and reset the cursor flags. */
2755 mx->mx_txn.mt_next_pgno = txn->mt_next_pgno;
2756 mx->mx_txn.mt_u = txn->mt_u;
2757 mx->mx_cursor.mc_initialized = 0;
2758 mx->mx_cursor.mc_eof = 0;
/* Copy state changed through the nested transaction back into the parent
 * transaction: next page number, mt_u, the db records and dirty flags.
 */
2762 mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
2764 txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
2765 txn->mt_u = mx->mx_txn.mt_u;
2766 txn->mt_dbs[0] = mx->mx_dbs[0];
2767 txn->mt_dbs[1] = mx->mx_dbs[1];
2768 txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
2769 txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
/* Slot 2 maps back to dbi (the guarding condition is not visible). */
2771 txn->mt_dbs[dbi] = mx->mx_dbs[2];
2772 txn->mt_dbxs[dbi].md_dirty = mx->mx_dbxs[2].md_dirty;
/* Allocate a cursor on database dbi of transaction txn. For MDB_DUPSORT
 * databases the nested cursor is allocated in the same block, directly
 * after the MDB_cursor.
 */
2777 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
2780 size_t size = sizeof(MDB_cursor);
/* Rejects NULL arguments, dbi 0, and dbi beyond the open databases. */
2782 if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
2785 if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
2786 size += sizeof(MDB_xcursor);
2788 if ((cursor = calloc(1, size)) != NULL) {
2789 cursor->mc_dbi = dbi;
2790 cursor->mc_txn = txn;
2791 if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
/* cursor + 1 is the first byte after the MDB_cursor in the allocation. */
2792 MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
2793 cursor->mc_xcursor = mx;
2794 mdb_xcursor_init0(txn, dbi, mx);
2805 /* Return the count of duplicate data items for the current key */
/* The count is the md_entries field of the nested database. The checks
 * cover NULL arguments, databases without MDB_DUPSORT, nodes without
 * F_DUPDATA and an uninitialised nested cursor (their results are not
 * visible in this excerpt).
 */
2807 mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
2812 if (mc == NULL || countp == NULL)
2815 if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
2818 top = CURSOR_TOP(mc);
2819 leaf = NODEPTR(top->mp_page, top->mp_ki);
2820 if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2823 if (!mc->mc_xcursor->mx_cursor.mc_initialized)
2826 *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
/* Dispose of a cursor; a NULL cursor is accepted and ignored (the release
 * statement itself is not visible in this excerpt).
 */
2832 mdb_cursor_close(MDB_cursor *cursor)
2834 if (cursor != NULL) {
/* Replace the key of the node at index indx on page mp by key, growing
 * or shrinking the node in place.
 */
2840 mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
2842 indx_t ptr, i, numkeys;
2849 node = NODEPTR(mp, indx);
2850 ptr = mp->mp_ptrs[indx];
2851 DPRINTF("update key %u (ofs %u) [%.*s] to [%s] on page %lu",
2853 (int)node->mn_ksize, (char *)NODEKEY(node),
/* delta is the change in key length; a longer key needs that much free
 * space on the page.
 */
2857 delta = key->mv_size - node->mn_ksize;
2859 if (delta > 0 && SIZELEFT(mp) < delta) {
2860 DPRINTF("OUCH! Not enough room, delta = %d", delta);
/* Every node stored at or below this node's offset moves by delta. */
2864 numkeys = NUMKEYS(mp);
2865 for (i = 0; i < numkeys; i++) {
2866 if (mp->mp_ptrs[i] <= ptr)
2867 mp->mp_ptrs[i] -= delta;
/* Shift everything from mp_upper up to and including this node's header
 * (NODESIZE bytes) by delta.
 */
2870 base = (char *)mp + mp->mp_upper;
2871 len = ptr - mp->mp_upper + NODESIZE;
2872 memmove(base - delta, base, len);
2873 mp->mp_upper -= delta;
/* Re-fetch the node at its new position before writing the key. */
2875 node = NODEPTR(mp, indx);
2876 node->mn_ksize = key->mv_size;
2879 memcpy(NODEKEY(node), key->mv_data, key->mv_size);
2884 /* Move a node from src to dst.
/* Move the entry at index srcindx of the source page to index dstindx of
 * the destination page, then fix up the separator keys in the parents.
 * Both pages are touched (made dirty) first.
 */
2887 mdb_move_node(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, indx_t srcindx,
2888 MDB_pageparent *dst, indx_t dstindx)
2895 /* Mark src and dst as dirty. */
2896 if ((rc = mdb_touch(txn, dbi, src)) ||
2897 (rc = mdb_touch(txn, dbi, dst)))
/* Collect key and data of the entry to move, either from the LEAF2KEY
 * area or from a regular node.
 */
2900 if (IS_LEAF2(src->mp_page)) {
2901 srcnode = NODEPTR(src->mp_page, 0); /* fake */
2902 key.mv_size = txn->mt_dbs[dbi].md_pad;
2903 key.mv_data = LEAF2KEY(src->mp_page, srcindx, key.mv_size);
2905 data.mv_data = NULL;
2907 srcnode = NODEPTR(src->mp_page, srcindx);
2908 key.mv_size = NODEKSZ(srcnode);
2909 key.mv_data = NODEKEY(srcnode);
2910 data.mv_size = NODEDSZ(srcnode);
2911 data.mv_data = NODEDATA(srcnode);
2913 DPRINTF("moving %s node %u [%s] on page %lu to node %u on page %lu",
2914 IS_LEAF(src->mp_page) ? "leaf" : "branch",
2917 src->mp_page->mp_pgno,
2918 dstindx, dst->mp_page->mp_pgno);
2920 /* Add the node to the destination page.
2922 rc = mdb_add_node(txn, dbi, dst->mp_page, dstindx, &key, &data, NODEPGNO(srcnode),
2924 if (rc != MDB_SUCCESS)
2927 /* Delete the node from the source page.
2929 mdb_del_node(src->mp_page, srcindx, key.mv_size);
2931 /* The key value just changed due to del_node, find it again.
2933 if (!IS_LEAF2(src->mp_page)) {
2934 srcnode = NODEPTR(src->mp_page, srcindx);
2935 key.mv_data = NODEKEY(srcnode);
2938 /* Update the parent separators.
2941 if (src->mp_pi != 0) {
2942 DPRINTF("update separator for source page %lu to [%s]",
2943 src->mp_page->mp_pgno, DKEY(&key));
2944 if ((rc = mdb_update_key(src->mp_parent, src->mp_pi,
2945 &key)) != MDB_SUCCESS)
2948 if (IS_BRANCH(src->mp_page)) {
2950 nullkey.mv_size = 0;
/* The call is made outside assert() so that it is still executed when
 * assertions are compiled out (NDEBUG).
 */
2951 rc = mdb_update_key(src->mp_page, 0, &nullkey);
assert(rc == MDB_SUCCESS);
2956 if (dst->mp_pi != 0) {
2957 DPRINTF("update separator for destination page %lu to [%s]",
2958 dst->mp_page->mp_pgno, DKEY(&key));
2959 if ((rc = mdb_update_key(dst->mp_parent, dst->mp_pi,
2960 &key)) != MDB_SUCCESS)
2963 if (IS_BRANCH(dst->mp_page)) {
2965 nullkey.mv_size = 0;
/* Same as above: keep the side effect out of assert(). */
2966 rc = mdb_update_key(dst->mp_page, 0, &nullkey);
assert(rc == MDB_SUCCESS);
/* Merge page src into page dst and unlink src from its parent.
 *
 * Every node of src->mp_page is appended to dst->mp_page with
 * mdb_add_node(), always inserting at index NUMKEYS(dst->mp_page). On a
 * LEAF2 page the keys have the fixed size md_pad and are read one after
 * another starting at METADATA(src->mp_page). The entry for src is then
 * removed from src->mp_parent with mdb_del_node(), the DB's leaf or branch
 * page counter is decremented, and mdb_rebalance() is run on the parent
 * page; its result is returned.
 *
 * Both src and dst must have a parent (asserted), so the root page can
 * never be merged.
 *
 * NOTE(review): nothing here checks beforehand that dst has room for all
 * of src's nodes; a failing mdb_add_node() is detected only after earlier
 * nodes have already been copied -- TODO confirm callers guarantee enough
 * space.
 */
2974 mdb_merge(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *src, MDB_pageparent *dst)
2983 DPRINTF("merging page %lu and %lu", src->mp_page->mp_pgno, dst->mp_page->mp_pgno);
2985 assert(txn != NULL);
2986 assert(src->mp_parent); /* can't merge root page */
2987 assert(dst->mp_parent);
2989 /* Mark src and dst as dirty. */
2990 if ((rc = mdb_touch(txn, dbi, src)) ||
2991 (rc = mdb_touch(txn, dbi, dst)))
2994 /* Move all nodes from src to dst.
2996 if (IS_LEAF2(src->mp_page)) {
2997 key.mv_size = txn->mt_dbs[dbi].md_pad;
2998 key.mv_data = METADATA(src->mp_page);
2999 for (i = 0; i < NUMKEYS(src->mp_page); i++) {
3000 rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key,
3002 if (rc != MDB_SUCCESS)
3004 key.mv_data = (char *)key.mv_data + key.mv_size;
3007 for (i = 0; i < NUMKEYS(src->mp_page); i++) {
3008 srcnode = NODEPTR(src->mp_page, i);
3010 key.mv_size = srcnode->mn_ksize;
3011 key.mv_data = NODEKEY(srcnode);
3012 data.mv_size = NODEDSZ(srcnode);
3013 data.mv_data = NODEDATA(srcnode);
3014 rc = mdb_add_node(txn, dbi, dst->mp_page, NUMKEYS(dst->mp_page), &key,
3015 &data, NODEPGNO(srcnode), srcnode->mn_flags);
3016 if (rc != MDB_SUCCESS)
3021 DPRINTF("dst page %lu now has %u keys (%.1f%% filled)",
3022 dst->mp_page->mp_pgno, NUMKEYS(dst->mp_page), (float)PAGEFILL(txn->mt_env, dst->mp_page) / 10);
3024 /* Unlink the src page from parent.
3026 mdb_del_node(src->mp_parent, src->mp_pi, 0);
3027 if (src->mp_pi == 0) {
3029 if ((rc = mdb_update_key(src->mp_parent, 0, &key)) != MDB_SUCCESS)
3033 if (IS_LEAF(src->mp_page))
3034 txn->mt_dbs[dbi].md_leaf_pages--;
3036 txn->mt_dbs[dbi].md_branch_pages--;
3038 mpp.mp_page = src->mp_parent;
3039 dh = (MDB_dhead *)src->mp_parent;
3041 mpp.mp_parent = dh->md_parent;
3042 mpp.mp_pi = dh->md_pi;
3044 return mdb_rebalance(txn, dbi, &mpp);
/* Fill level below which mdb_rebalance() acts on a page. The debug output
 * prints PAGEFILL() divided by 10 as a percentage, so 250 presumably means
 * 25.0% -- TODO confirm against the PAGEFILL definition.
 */
3047 #define FILL_THRESHOLD 250
/* Rebalance the tree around page mpp->mp_page after a removal.
 *
 * - A page whose PAGEFILL() is at or above FILL_THRESHOLD needs no work.
 * - A page without a parent is the root. With no keys left the DB is marked
 *   empty (md_root = P_INVALID, md_depth = 0, md_leaf_pages = 0). A branch
 *   root holding a single key is replaced as root by the page that key
 *   points to, and md_depth and md_branch_pages are decremented.
 * - Otherwise a sibling is read through the parent: the right one when
 *   this page is the parent's first entry (mp_pi == 0), else the left one.
 *   If the sibling is at or above the threshold and has at least two keys,
 *   one node is moved over with mdb_move_node(); if not, the two pages are
 *   combined with mdb_merge().
 *
 * txn and mpp must be non-NULL (asserted).
 */
3050 mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mpp)
3055 indx_t si = 0, di = 0;
3058 assert(txn != NULL);
3059 assert(mpp != NULL);
3061 DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)",
3062 IS_LEAF(mpp->mp_page) ? "leaf" : "branch",
3063 mpp->mp_page->mp_pgno, NUMKEYS(mpp->mp_page), (float)PAGEFILL(txn->mt_env, mpp->mp_page) / 10);
3065 if (PAGEFILL(txn->mt_env, mpp->mp_page) >= FILL_THRESHOLD) {
3066 DPRINTF("no need to rebalance page %lu, above fill threshold",
3067 mpp->mp_page->mp_pgno);
/* No parent: this page is the root of the tree. */
3071 if (mpp->mp_parent == NULL) {
3072 if (NUMKEYS(mpp->mp_page) == 0) {
3073 DPUTS("tree is completely empty");
3074 txn->mt_dbs[dbi].md_root = P_INVALID;
3075 txn->mt_dbs[dbi].md_depth = 0;
3076 txn->mt_dbs[dbi].md_leaf_pages = 0;
3077 } else if (IS_BRANCH(mpp->mp_page) && NUMKEYS(mpp->mp_page) == 1) {
3078 DPUTS("collapsing root page!");
3079 txn->mt_dbs[dbi].md_root = NODEPGNO(NODEPTR(mpp->mp_page, 0));
3080 if ((rc = mdb_get_page(txn, txn->mt_dbs[dbi].md_root, &root)))
3082 txn->mt_dbs[dbi].md_depth--;
3083 txn->mt_dbs[dbi].md_branch_pages--;
3085 DPUTS("root page doesn't need rebalancing");
3089 /* The parent (branch page) must have at least 2 pointers,
3090 * otherwise the tree is invalid.
3092 assert(NUMKEYS(mpp->mp_parent) > 1);
3094 /* Leaf page fill factor is below the threshold.
3095 * Try to move keys from left or right neighbor, or
3096 * merge with a neighbor page.
3101 if (mpp->mp_pi == 0) {
3102 /* We're the leftmost leaf in our parent.
3104 DPUTS("reading right neighbor");
3105 node = NODEPTR(mpp->mp_parent, mpp->mp_pi + 1);
3106 if ((rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page)))
3108 npp.mp_pi = mpp->mp_pi + 1;
3110 di = NUMKEYS(mpp->mp_page);
3112 /* There is at least one neighbor to the left.
3114 DPUTS("reading left neighbor");
3115 node = NODEPTR(mpp->mp_parent, mpp->mp_pi - 1);
3116 if ((rc = mdb_get_page(txn, NODEPGNO(node), &npp.mp_page)))
3118 npp.mp_pi = mpp->mp_pi - 1;
3119 si = NUMKEYS(npp.mp_page) - 1;
3122 npp.mp_parent = mpp->mp_parent;
3124 DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)",
3125 npp.mp_page->mp_pgno, NUMKEYS(npp.mp_page), (float)PAGEFILL(txn->mt_env, npp.mp_page) / 10);
3127 /* If the neighbor page is above threshold and has at least two
3128 * keys, move one key from it.
3130 * Otherwise we should try to merge them.
 *
 * A move takes the first node of a right neighbor (si = 0) to the end of
 * this page (di = NUMKEYS), or the last node of a left neighbor to the
 * front of this page (di = 0).
 * A merge always folds the right-hand page into the left-hand one: the
 * right neighbor into this page when mp_pi == 0, otherwise this page
 * into its left neighbor.
3132 if (PAGEFILL(txn->mt_env, npp.mp_page) >= FILL_THRESHOLD && NUMKEYS(npp.mp_page) >= 2)
3133 return mdb_move_node(txn, dbi, &npp, si, mpp, di);
3134 else { /* FIXME: if (has_enough_room()) */
3135 if (mpp->mp_pi == 0)
3136 return mdb_merge(txn, dbi, &npp, mpp);
3138 return mdb_merge(txn, dbi, mpp, &npp);
/* Remove node ki (pointed to by leaf) from page mpp->mp_page and rebalance.
 *
 * If the page is not LEAF2 and the node carries F_BIGDATA, the page number
 * stored in the node's data area is read and the OVPAGES(...) overflow
 * pages are put on txn->mt_free_pgs. The node is then deleted with
 * mdb_del_node(), md_entries is decremented and mdb_rebalance() is run;
 * if that fails, MDB_TXN_ERROR is set in txn->mt_flags.
 *
 * NOTE(review): the return value of mdb_midl_insert() is not examined
 * here -- TODO confirm it cannot fail in a way that matters.
 */
3143 mdb_del0(MDB_txn *txn, MDB_dbi dbi, unsigned int ki, MDB_pageparent *mpp, MDB_node *leaf)
3147 /* add overflow pages to free list */
3148 if (!IS_LEAF2(mpp->mp_page) && F_ISSET(leaf->mn_flags, F_BIGDATA)) {
/* The node data holds the page number of the overflow chain, not the value
 * itself; memcpy is used to read it out of the node.
 */
3152 memcpy(&pg, NODEDATA(leaf), sizeof(pg));
3153 ovpages = OVPAGES(NODEDSZ(leaf), txn->mt_env->me_psize);
3154 for (i=0; i<ovpages; i++) {
3155 DPRINTF("freed ov page %lu", pg);
3156 mdb_midl_insert(txn->mt_free_pgs, pg);
3160 mdb_del_node(mpp->mp_page, ki, txn->mt_dbs[dbi].md_pad);
3161 txn->mt_dbs[dbi].md_entries--;
3162 rc = mdb_rebalance(txn, dbi, mpp);
/* A failed rebalance marks the whole transaction as being in error. */
3163 if (rc != MDB_SUCCESS)
3164 txn->mt_flags |= MDB_TXN_ERROR;
/* Delete an entry from database dbi.
 *
 * key must be non-NULL (asserted). The call is rejected when txn is NULL,
 * dbi is zero or not below txn->mt_numdbs, the transaction is
 * MDB_TXN_RDONLY, or key->mv_size is zero or larger than MAXKEYSIZE. The
 * leaf page is located with mdb_search_page() and the node with
 * mdb_search_node(); MDB_NOTFOUND is returned unless the match is exact.
 *
 * For a node flagged F_DUPDATA (on a non-LEAF2 page) a nested
 * cursor/transaction (mx) is set up over the node's sub-database.
 * mdb_del() is applied to that sub-database with data as the key -- TODO
 * confirm how a NULL data is handled. If the sub-database still has a root
 * afterwards, its MDB_db record is copied back into the node and
 * md_entries is decremented. Otherwise the pages of the sub-database are
 * put on txn->mt_free_pgs and the node itself is removed.
 *
 * Ordinary nodes, and emptied sub-databases, are removed through
 * mdb_del0(), whose result is returned.
 *
 * NOTE(review): the results of the mdb_midl_insert() calls below are not
 * examined -- TODO confirm.
 */
3170 mdb_del(MDB_txn *txn, MDB_dbi dbi,
3171 MDB_val *key, MDB_val *data)
3179 assert(key != NULL);
3181 DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key));
3183 if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3186 if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
3190 if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
3194 mpp.mp_parent = NULL;
3196 if ((rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp)) != MDB_SUCCESS)
3199 leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
3200 if (leaf == NULL || !exact) {
3201 return MDB_NOTFOUND;
/* Duplicate data lives in a sub-database whose MDB_db record is stored as
 * the node's data.
 */
3204 if (!IS_LEAF2(mpp.mp_page) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3208 mdb_xcursor_init0(txn, dbi, &mx);
3209 mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
3211 rc = mdb_del(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, NULL);
3212 mdb_xcursor_fini(txn, dbi, &mx);
3213 /* If sub-DB still has entries, we're done */
3214 if (mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root != P_INVALID) {
3215 memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
3217 txn->mt_dbs[dbi].md_entries--;
3220 /* otherwise fall thru and delete the sub-DB */
3222 /* add all the child DB's pages to the free list */
3223 rc = mdb_search_page(&mx.mx_txn, mx.mx_cursor.mc_dbi,
3224 NULL, &mx.mx_cursor, 0, &mp2);
3225 if (rc == MDB_SUCCESS) {
3226 MDB_ppage *top, *parent;
/* Walk the cursor's page stack: the page numbers referenced by the nodes of
 * each page on top of the stack go to the free list, and the sub-DB root
 * is added last.
 */
3230 cursor_pop_page(&mx.mx_cursor);
3231 if (mx.mx_cursor.mc_snum) {
3232 top = CURSOR_TOP(&mx.mx_cursor);
3233 while (mx.mx_cursor.mc_snum > 1) {
3234 parent = CURSOR_PARENT(&mx.mx_cursor);
3235 for (i=0; i<NUMKEYS(top->mp_page); i++) {
3236 ni = NODEPTR(top->mp_page, i);
3237 mdb_midl_insert(txn->mt_free_pgs, NODEPGNO(ni));
3240 if (parent->mp_ki >= NUMKEYS(parent->mp_page)) {
3241 cursor_pop_page(&mx.mx_cursor);
3244 ni = NODEPTR(parent->mp_page, parent->mp_ki);
3245 rc = mdb_get_page(&mx.mx_txn, NODEPGNO(ni), &top->mp_page);
3249 mdb_midl_insert(txn->mt_free_pgs, mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi].md_root);
3254 return mdb_del0(txn, dbi, ki, &mpp, leaf);
3257 /* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
3258 * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
3259 * *newindxp with the actual values after split, ie if *mpp and *newindxp
3260 * refer to a node in the new right sibling page.
 *
 * Outline of the visible steps:
 * - If the page has no parent (h.md_parent == NULL) a new branch page is
 *   allocated and becomes the DB root, md_depth is incremented, and the
 *   page being split is added to it as node 0 without a key.
 * - A right sibling is allocated with the same page flags; the initial
 *   split point is nkeys / 2 + 1.
 * - LEAF2 pages hold fixed-size keys (md_pad bytes) and are split with
 *   memcpy/memmove directly on the key arrays.
 * - For other leaf pages the split point is re-checked against node sizes,
 *   since otherwise mdb_add_node() could fail.
 * - The separator is the new key when newindx == split_indx, else the key
 *   of the node at split_indx. It is added to the parent with
 *   mdb_add_node(), or through a recursive mdb_split() of the parent when
 *   SIZELEFT(parent) is smaller than mdb_branch_size() for that key.
 * - Non-LEAF2 pages are then rebuilt node by node with mdb_add_node() into
 *   a malloc'ed scratch page and the right sibling, and the scratch page is
 *   copied back over the original page.
 *
 * NOTE(review): the matching free() of the scratch page is not among the
 * statements shown here -- TODO confirm every exit path releases it.
3263 mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
3264 MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
3267 int rc = MDB_SUCCESS, ins_new = 0;
3270 unsigned int i, j, split_indx, nkeys, pmax;
3272 MDB_val sepkey, rkey, rdata;
3273 MDB_page *copy, *cptr;
3274 MDB_dpage *mdp, *rdp, *pdp;
3278 assert(txn != NULL);
3280 dh = ((MDB_dhead *)*mpp) - 1;
3281 mdp = (MDB_dpage *)dh;
3282 newindx = *newindxp;
3284 DPRINTF("-----> splitting %s page %lu and adding [%s] at index %i",
3285 IS_LEAF(&mdp->p) ? "leaf" : "branch", mdp->p.mp_pgno,
3286 DKEY(newkey), *newindxp);
3288 if (mdp->h.md_parent == NULL) {
3289 if ((pdp = mdb_new_page(txn, dbi, P_BRANCH, 1)) == NULL)
3292 mdp->h.md_parent = &pdp->p;
3293 txn->mt_dbs[dbi].md_root = pdp->p.mp_pgno;
3294 DPRINTF("root split! new root = %lu", pdp->p.mp_pgno);
3295 txn->mt_dbs[dbi].md_depth++;
3297 /* Add left (implicit) pointer. */
3298 if ((rc = mdb_add_node(txn, dbi, &pdp->p, 0, NULL, NULL,
3299 mdp->p.mp_pgno, 0)) != MDB_SUCCESS)
3302 DPRINTF("parent branch page is %lu", mdp->h.md_parent->mp_pgno);
3305 /* Create a right sibling. */
3306 if ((rdp = mdb_new_page(txn, dbi, mdp->p.mp_flags, 1)) == NULL)
3308 rdp->h.md_parent = mdp->h.md_parent;
3309 rdp->h.md_pi = mdp->h.md_pi + 1;
3310 DPRINTF("new right sibling: page %lu", rdp->p.mp_pgno);
3312 nkeys = NUMKEYS(&mdp->p);
/* Default split point: just past the middle key. */
3313 split_indx = nkeys / 2 + 1;
3315 if (IS_LEAF2(&rdp->p)) {
3318 unsigned int lsize, rsize, ksize;
3319 /* Move half of the keys to the right sibling */
3321 x = *newindxp - split_indx;
3322 ksize = txn->mt_dbs[dbi].md_pad;
3323 split = LEAF2KEY(&mdp->p, split_indx, ksize);
/* rsize: bytes of key data going to the right page; lsize: the same number
 * of keys counted in indx_t-sized units for the mp_lower bookkeeping.
 */
3324 rsize = (nkeys - split_indx) * ksize;
3325 lsize = (nkeys - split_indx) * sizeof(indx_t);
3326 mdp->p.mp_lower -= lsize;
3327 rdp->p.mp_lower += lsize;
3328 mdp->p.mp_upper += rsize - lsize;
3329 rdp->p.mp_upper -= rsize - lsize;
3330 sepkey.mv_size = ksize;
3331 if (newindx == split_indx) {
3332 sepkey.mv_data = newkey->mv_data;
3334 sepkey.mv_data = split;
3337 ins = LEAF2KEY(&mdp->p, *newindxp, ksize);
3338 memcpy(&rdp->p.mp_ptrs, split, rsize);
3339 sepkey.mv_data = &rdp->p.mp_ptrs;
3340 memmove(ins+ksize, ins, (split_indx - *newindxp) * ksize);
3341 memcpy(ins, newkey->mv_data, ksize);
3342 mdp->p.mp_lower += sizeof(indx_t);
3343 mdp->p.mp_upper -= ksize - sizeof(indx_t);
3346 memcpy(&rdp->p.mp_ptrs, split, x * ksize);
3347 ins = LEAF2KEY(&rdp->p, x, ksize);
3348 memcpy(ins, newkey->mv_data, ksize);
3349 memcpy(ins+ksize, split + x * ksize, rsize - x * ksize);
3350 rdp->p.mp_lower += sizeof(indx_t);
3351 rdp->p.mp_upper -= ksize - sizeof(indx_t);
3358 /* For leaf pages, check the split point based on what
3359 * fits where, since otherwise add_node can fail.
3361 if (IS_LEAF(&mdp->p)) {
3362 unsigned int psize, nsize;
3363 /* Maximum free space in an empty page */
3364 pmax = txn->mt_env->me_psize - PAGEHDRSZ;
3365 nsize = mdb_leaf_size(txn->mt_env, newkey, newdata);
3366 if (newindx <= split_indx) {
3369 for (i=0; i<split_indx; i++) {
3370 node = NODEPTR(&mdp->p, i);
3371 psize += NODESIZE + NODEKSZ(node);
3372 if (F_ISSET(node->mn_flags, F_BIGDATA))
3373 psize += sizeof(pgno_t);
3375 psize += NODEDSZ(node);
3384 for (i=split_indx; i<nkeys; i++) {
3385 node = NODEPTR(&mdp->p, i);
3386 psize += NODESIZE + NODEKSZ(node);
3387 if (F_ISSET(node->mn_flags, F_BIGDATA))
3388 psize += sizeof(pgno_t);
3390 psize += NODEDSZ(node);
3399 /* First find the separating key between the split pages.
3401 memset(&sepkey, 0, sizeof(sepkey));
3402 if (newindx == split_indx) {
3403 sepkey.mv_size = newkey->mv_size;
3404 sepkey.mv_data = newkey->mv_data;
3406 node = NODEPTR(&mdp->p, split_indx);
3407 sepkey.mv_size = node->mn_ksize;
3408 sepkey.mv_data = NODEKEY(node);
3412 DPRINTF("separator is [%s]", DKEY(&sepkey));
3414 /* Copy separator key to the parent.
3416 if (SIZELEFT(rdp->h.md_parent) < mdb_branch_size(txn->mt_env, &sepkey)) {
3417 rc = mdb_split(txn, dbi, &rdp->h.md_parent, &rdp->h.md_pi,
3418 &sepkey, NULL, rdp->p.mp_pgno);
3420 /* Right page might now have changed parent.
3421 * Check if left page also changed parent.
3423 if (rdp->h.md_parent != mdp->h.md_parent &&
3424 mdp->h.md_pi >= NUMKEYS(mdp->h.md_parent)) {
3425 mdp->h.md_parent = rdp->h.md_parent;
3426 mdp->h.md_pi = rdp->h.md_pi - 1;
3429 rc = mdb_add_node(txn, dbi, rdp->h.md_parent, rdp->h.md_pi,
3430 &sepkey, NULL, rdp->p.mp_pgno, 0);
3432 if (IS_LEAF2(&rdp->p)) {
3435 if (rc != MDB_SUCCESS) {
3439 /* Move half of the keys to the right sibling. */
3440 if ((copy = malloc(txn->mt_env->me_psize)) == NULL)
/* The scratch page starts out empty but keeps the page number and flags of
 * the page being split.
 */
3443 copy->mp_pgno = mdp->p.mp_pgno;
3444 copy->mp_flags = mdp->p.mp_flags;
3445 copy->mp_lower = PAGEHDRSZ;
3446 copy->mp_upper = txn->mt_env->me_psize;
/* i walks the nodes of the original page, with one extra round (i == nkeys)
 * so a new entry belonging at the very end is still inserted; j is the
 * insert index in the page currently being filled.
 */
3448 for (i = j = 0; i <= nkeys; j++) {
3449 if (i == split_indx) {
3450 /* Insert in right sibling. */
3451 /* Reset insert index for right sibling. */
3452 j = (i == newindx && ins_new);
3456 if (i == newindx && !ins_new) {
3457 /* Insert the original entry that caused the split. */
3458 rkey.mv_data = newkey->mv_data;
3459 rkey.mv_size = newkey->mv_size;
3460 if (IS_LEAF(&mdp->p)) {
3461 rdata.mv_data = newdata->mv_data;
3462 rdata.mv_size = newdata->mv_size;
3469 /* Update page and index for the new key. */
3471 if (cptr == &rdp->p)
3473 } else if (i == nkeys) {
3476 node = NODEPTR(&mdp->p, i);
3477 rkey.mv_data = NODEKEY(node);
3478 rkey.mv_size = node->mn_ksize;
3479 if (IS_LEAF(&mdp->p)) {
3480 rdata.mv_data = NODEDATA(node);
3481 rdata.mv_size = node->mn_dsize;
3483 pgno = NODEPGNO(node);
3484 flags = node->mn_flags;
3489 if (!IS_LEAF(&mdp->p) && j == 0) {
3490 /* First branch index doesn't need key data. */
3494 rc = mdb_add_node(txn, dbi, cptr, j, &rkey, &rdata, pgno, flags);
/* Copy the rebuilt left page from the scratch buffer back over the original
 * page: the index slots, the lower/upper bounds, then the node area
 * (me_psize - mp_upper bytes).
 */
3496 nkeys = NUMKEYS(copy);
3497 for (i=0; i<nkeys; i++)
3498 mdp->p.mp_ptrs[i] = copy->mp_ptrs[i];
3499 mdp->p.mp_lower = copy->mp_lower;
3500 mdp->p.mp_upper = copy->mp_upper;
3501 memcpy(NODEPTR(&mdp->p, nkeys-1), NODEPTR(copy, nkeys-1),
3502 txn->mt_env->me_psize - copy->mp_upper);
/* Store key/data in database dbi. This is the worker behind mdb_put(); it
 * is also called by mdb_open() and, recursively, to write duplicate data
 * into a child DB.
 *
 * The leaf page for key is looked up with mdb_search_page() and the node
 * with mdb_search_node(). On an exact match:
 * - flags == MDB_NOOVERWRITE yields MDB_KEYEXIST;
 * - on a LEAF2 page there is only a key, so nothing needs to be written;
 * - in an MDB_DUPSORT DB a node that is not yet F_DUPDATA has its single
 *   value copied to dbuf and compared with mdb_dcmp(). Equal data yields
 *   MDB_KEYEXIST for flags == MDB_NODUPDATA and MDB_SUCCESS otherwise;
 *   different data converts the node to hold a fresh MDB_db record (root
 *   P_INVALID; md_pad and MDB_DUPFIXED set for MDB_DUPFIXED databases);
 * - otherwise data of identical size (and not F_BIGDATA) is overwritten
 *   in place, else the old node is deleted so that it can be re-added.
 * When the search reports MDB_NOTFOUND a new root leaf page is allocated.
 *
 * The node is inserted with mdb_add_node(), or through mdb_split() when
 * SIZELEFT() of the page is too small. A failure sets MDB_TXN_ERROR in
 * txn->mt_flags. On success md_entries is incremented.
 *
 * NOTE(review): existing node data is copied into the PAGESIZE-byte stack
 * buffer dbuf with a length of NODEDSZ(leaf) and no bound check is visible
 * before that memcpy -- TODO confirm the size can never exceed
 * sizeof(dbuf), including for F_BIGDATA nodes.
 * NOTE(review): flags is compared with == against MDB_NOOVERWRITE and
 * MDB_NODUPDATA but tested with & against F_SUBDATA; combined flag values
 * would not match the equality tests -- TODO confirm this is intended.
 */
3509 mdb_put0(MDB_txn *txn, MDB_dbi dbi,
3510 MDB_val *key, MDB_val *data, unsigned int flags)
3512 int rc = MDB_SUCCESS, exact;
3516 MDB_val xdata, *rdata, dkey;
3518 char dbuf[PAGESIZE];
3523 DPRINTF("==> put db %u key [%s], size %zu, data size %zu",
3524 dbi, DKEY(key), key->mv_size, data->mv_size);
3527 mpp.mp_parent = NULL;
3529 rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
3530 if (rc == MDB_SUCCESS) {
3531 leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
3532 if (leaf && exact) {
3533 if (flags == MDB_NOOVERWRITE) {
3534 DPRINTF("duplicate key [%s]", DKEY(key));
3535 return MDB_KEYEXIST;
3537 /* there's only a key anyway, so this is a no-op */
3538 if (IS_LEAF2(mpp.mp_page))
3541 if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
3542 /* Was a single item before, must convert now */
3543 if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
/* Keep a private copy of the old value: it is compared against the new data
 * and later written into the child DB as its first entry.
 */
3544 dkey.mv_size = NODEDSZ(leaf);
3545 dkey.mv_data = dbuf;
3546 memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
3547 /* data matches, ignore it */
3548 if (!mdb_dcmp(txn, dbi, data, &dkey))
3549 return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
3550 memset(&dummy, 0, sizeof(dummy));
3551 if (txn->mt_dbs[dbi].md_flags & MDB_DUPFIXED) {
3552 dummy.md_pad = data->mv_size;
3553 dummy.md_flags = MDB_DUPFIXED;
3554 if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERDUP)
3555 dummy.md_flags |= MDB_INTEGERKEY;
3557 dummy.md_root = P_INVALID;
/* If the old value happens to be exactly sizeof(MDB_db) bytes, the empty
 * sub-DB record can overwrite it in place.
 */
3558 if (dkey.mv_size == sizeof(MDB_db)) {
3559 memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
3562 mdb_del_node(mpp.mp_page, ki, 0);
3565 xdata.mv_size = sizeof(MDB_db);
3566 xdata.mv_data = &dummy;
3571 /* same size, just replace it */
3572 if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
3573 NODEDSZ(leaf) == data->mv_size) {
3574 memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
3577 mdb_del_node(mpp.mp_page, ki, 0);
3579 if (leaf == NULL) { /* append if not found */
3580 ki = NUMKEYS(mpp.mp_page);
3581 DPRINTF("appending key at index %i", ki);
3583 } else if (rc == MDB_NOTFOUND) {
3585 /* new file, just write a root leaf page */
3586 DPUTS("allocating new root leaf page");
3587 if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) {
3590 mpp.mp_page = &dp->p;
3591 txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno;
3592 txn->mt_dbs[dbi].md_depth++;
3593 txn->mt_dbxs[dbi].md_dirty = 1;
/* MDB_DUPFIXED without MDB_DUPSORT: the new root is flagged as a LEAF2 page. */
3594 if ((txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED)) == MDB_DUPFIXED)
3595 mpp.mp_page->mp_flags |= P_LEAF2;
3601 assert(IS_LEAF(mpp.mp_page));
3602 DPRINTF("there are %u keys, should insert new key at index %i",
3603 NUMKEYS(mpp.mp_page), ki);
/* A LEAF2 page stores bare keys, so only the key size counts there. */
3608 nsize = IS_LEAF2(mpp.mp_page) ? key->mv_size : mdb_leaf_size(txn->mt_env, key, rdata);
3609 if (SIZELEFT(mpp.mp_page) < nsize) {
3610 rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
3612 /* There is room already in this leaf page. */
3613 rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, rdata, 0, 0);
3616 if (rc != MDB_SUCCESS)
3617 txn->mt_flags |= MDB_TXN_ERROR;
3619 /* Remember if we just added a subdatabase */
3620 if (flags & F_SUBDATA) {
3621 leaf = NODEPTR(mpp.mp_page, ki);
3622 leaf->mn_flags |= F_SUBDATA;
3625 /* Now store the actual data in the child DB. Note that we're
3626 * storing the user data in the keys field, so there are strict
3627 * size limits on dupdata. The actual data fields of the child
3628 * DB are all zero size.
 * The child DB is written through mdb_put0() itself, using the nested
 * transaction and cursor prepared by mdb_xcursor_init0() and
 * mdb_xcursor_init1(); afterwards the child's MDB_db record is copied
 * back into the node's data area.
3633 leaf = NODEPTR(mpp.mp_page, ki);
3635 mdb_xcursor_init0(txn, dbi, &mx);
3636 mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
3639 if (flags == MDB_NODUPDATA)
3640 flags = MDB_NOOVERWRITE;
3641 /* converted, write the original data first */
3643 rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
3645 leaf->mn_flags |= F_DUPDATA;
3647 rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
3648 mdb_xcursor_fini(txn, dbi, &mx);
3649 memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
3652 txn->mt_dbs[dbi].md_entries++;
/* Public entry point for storing key/data in database dbi.
 *
 * key and data must be non-NULL (asserted). The call is rejected when txn
 * is NULL, dbi is zero or not below txn->mt_numdbs, the transaction is
 * MDB_TXN_RDONLY, key->mv_size is zero or larger than MAXKEYSIZE, or flags
 * contains any bit other than MDB_NOOVERWRITE and MDB_NODUPDATA. Valid
 * calls are forwarded to mdb_put0(), whose result is returned.
 */
3660 mdb_put(MDB_txn *txn, MDB_dbi dbi,
3661 MDB_val *key, MDB_val *data, unsigned int flags)
3663 assert(key != NULL);
3664 assert(data != NULL);
3666 if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3669 if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
3673 if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
/* Only the two user-visible flags are accepted from callers. */
3677 if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags)
3680 return mdb_put0(txn, dbi, key, data, flags);
/* Set or clear environment flags.
 *
 * Only bits contained in CHANGEABLE (MDB_NOSYNC) may be altered; a flag
 * value with any other bit set is rejected. The bits are either OR'ed into
 * or cleared from env->me_flags -- presumably selected by onoff, TODO
 * confirm.
 */
3684 mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
3686 #define CHANGEABLE (MDB_NOSYNC)
3687 if ((flag & CHANGEABLE) != flag)
3690 env->me_flags |= flag;
3692 env->me_flags &= ~flag;
/* Report the environment's current flags by storing env->me_flags in *arg. */
3697 mdb_env_get_flags(MDB_env *env, unsigned int *arg)
3702 *arg = env->me_flags;
/* Report the environment's path by storing the env->me_path pointer in
 * *arg. The string is not copied, so the caller receives a pointer to the
 * environment's own storage.
 */
3707 mdb_env_get_path(MDB_env *env, const char **arg)
3712 *arg = env->me_path;
/* Fill *arg with statistics for one database: the page size comes from
 * the environment (env->me_psize); depth, branch/leaf/overflow page counts
 * and the entry count are copied from the MDB_db record db.
 */
3717 mdb_stat0(MDB_env *env, MDB_db *db, MDB_stat *arg)
3719 arg->ms_psize = env->me_psize;
3720 arg->ms_depth = db->md_depth;
3721 arg->ms_branch_pages = db->md_branch_pages;
3722 arg->ms_leaf_pages = db->md_leaf_pages;
3723 arg->ms_overflow_pages = db->md_overflow_pages;
3724 arg->ms_entries = db->md_entries;
/* Return statistics for the main database of an environment.
 *
 * env and arg must be non-NULL. mdb_env_read_meta() supplies the index
 * (toggle) of the meta page to use, and the MAIN_DBI record of that meta
 * page is passed to mdb_stat0().
 *
 * NOTE(review): the return value of mdb_env_read_meta() is discarded, so
 * toggle is used even if that call failed -- TODO confirm.
 */
3729 mdb_env_stat(MDB_env *env, MDB_stat *arg)
3733 if (env == NULL || arg == NULL)
3736 mdb_env_read_meta(env, &toggle);
3738 return mdb_stat0(env, &env->me_metas[toggle]->mm_dbs[MAIN_DBI], arg);
/* Open a named database within transaction txn and return its handle in
 * *dbi.
 *
 * Visible steps:
 * - Under a condition not repeated here, the MDB_DUPSORT, MDB_REVERSEKEY
 *   and MDB_INTEGERKEY bits of flags are OR'ed into the MAIN_DBI record
 *   (presumably the "no name" case -- TODO confirm).
 * - Already-open databases are searched from slot 2 upwards, comparing
 *   name length and contents.
 * - The open is refused once mt_numdbs has reached me_maxdbs - 1.
 * - The name is looked up as a key in MAIN_DBI with mdb_get(). If it is
 *   missing and MDB_CREATE is given, a fresh MDB_db record (md_root =
 *   P_INVALID, md_flags = flags & 0xffff) is stored with
 *   mdb_put0(..., F_SUBDATA).
 * - On success the next slot of mt_dbxs/mt_dbs is filled in: a strdup()'ed
 *   copy of the name, comparators and relocation function reset to NULL,
 *   parent MAIN_DBI, and the MDB_db record, which is also copied into both
 *   txn->mt_env->me_dbs[0] and me_dbs[1].
 *
 * NOTE(review): the result of strdup() is stored without a NULL check in
 * the visible statements -- TODO confirm allocation failure is handled.
 */
3741 int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
3751 if (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY))
3752 txn->mt_dbs[MAIN_DBI].md_flags |= (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY));
3756 /* Is the DB already open? */
3758 for (i=2; i<txn->mt_numdbs; i++) {
3759 if (len == txn->mt_dbxs[i].md_name.mv_size &&
3760 !strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) {
3766 if (txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1)
3769 /* Find the DB info */
3771 key.mv_data = (void *)name;
3772 rc = mdb_get(txn, MAIN_DBI, &key, &data);
3774 /* Create if requested */
3775 if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) {
3777 data.mv_size = sizeof(MDB_db);
3778 data.mv_data = &dummy;
3779 memset(&dummy, 0, sizeof(dummy));
3780 dummy.md_root = P_INVALID;
3781 dummy.md_flags = flags & 0xffff;
3782 rc = mdb_put0(txn, MAIN_DBI, &key, &data, F_SUBDATA);
3786 /* OK, got info, add to table */
3787 if (rc == MDB_SUCCESS) {
3788 txn->mt_dbxs[txn->mt_numdbs].md_name.mv_data = strdup(name);
3789 txn->mt_dbxs[txn->mt_numdbs].md_name.mv_size = len;
3790 txn->mt_dbxs[txn->mt_numdbs].md_cmp = NULL;
3791 txn->mt_dbxs[txn->mt_numdbs].md_dcmp = NULL;
3792 txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
3793 txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI;
3794 txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty;
3795 memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
3796 *dbi = txn->mt_numdbs;
/* Both copies of the environment's DB table receive the new record. */
3797 txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
3798 txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
/* Return statistics for database dbi as seen by transaction txn.
 *
 * txn and arg must be non-NULL and dbi must be below txn->mt_numdbs; the
 * database's MDB_db record is then passed to mdb_stat0(). Unlike the
 * setter functions in this file, a dbi of 0 is not rejected here.
 */
3805 int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg)
3807 if (txn == NULL || arg == NULL || dbi >= txn->mt_numdbs)
3810 return mdb_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg);
/* Close a named database handle.
 *
 * Handles up to and including MAIN_DBI, or not below txn->mt_numdbs, are
 * ignored. For any other handle the stored name pointer is saved in ptr
 * and the slot's name is reset to NULL with size 0; ptr is presumably
 * released afterwards -- TODO confirm.
 *
 * NOTE(review): txn is dereferenced without a NULL check.
 */
3813 void mdb_close(MDB_txn *txn, MDB_dbi dbi)
3816 if (dbi <= MAIN_DBI || dbi >= txn->mt_numdbs)
3818 ptr = txn->mt_dbxs[dbi].md_name.mv_data;
3819 txn->mt_dbxs[dbi].md_name.mv_data = NULL;
3820 txn->mt_dbxs[dbi].md_name.mv_size = 0;
/* Install cmp as the key comparison function (md_cmp) of database dbi.
 * The call is rejected when txn is NULL, dbi is zero, or dbi is not below
 * txn->mt_numdbs.
 */
3824 int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
3826 if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3829 txn->mt_dbxs[dbi].md_cmp = cmp;
/* Install cmp as the duplicate-data comparison function (md_dcmp) of
 * database dbi. The call is rejected when txn is NULL, dbi is zero, or dbi
 * is not below txn->mt_numdbs.
 */
3833 int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
3835 if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3838 txn->mt_dbxs[dbi].md_dcmp = cmp;
/* Install rel as the relocation function (md_rel) of database dbi. The
 * call is rejected when txn is NULL, dbi is zero, or dbi is not below
 * txn->mt_numdbs.
 */
3842 int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel)
3844 if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
3847 txn->mt_dbxs[dbi].md_rel = rel;