]> git.sur5r.net Git - openldap/blob - libraries/libmdb/mdb.c
More docs
[openldap] / libraries / libmdb / mdb.c
1 /** @file mdb.c
2  *      @brief memory-mapped database library
3  *
4  *      A Btree-based database management library modeled loosely on the
5  *      BerkeleyDB API, but much simplified.
6  */
7 /*
8  * Copyright 2011 Howard Chu, Symas Corp.
9  * All rights reserved.
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted only as authorized by the OpenLDAP
13  * Public License.
14  *
15  * A copy of this license is available in the file LICENSE in the
16  * top-level directory of the distribution or, alternatively, at
17  * <http://www.OpenLDAP.org/license.html>.
18  *
19  * This code is derived from btree.c written by Martin Hedenfalk.
20  *
21  * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
22  *
23  * Permission to use, copy, modify, and distribute this software for any
24  * purpose with or without fee is hereby granted, provided that the above
25  * copyright notice and this permission notice appear in all copies.
26  *
27  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
28  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
29  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
30  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
31  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
32  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
33  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
34  */
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/param.h>
38 #ifdef _WIN32
39 #include <windows.h>
40 #else
41 #include <sys/uio.h>
42 #include <sys/mman.h>
43 #ifdef HAVE_SYS_FILE_H
44 #include <sys/file.h>
45 #endif
46 #include <fcntl.h>
47 #endif
48
49 #include <assert.h>
50 #include <errno.h>
51 #include <stddef.h>
52 #include <stdint.h>
53 #include <stdio.h>
54 #include <stdlib.h>
55 #include <string.h>
56 #include <time.h>
57 #include <unistd.h>
58
59 #ifndef _WIN32
60 #include <pthread.h>
61 #endif
62
63 #include "mdb.h"
64 #include "midl.h"
65
66 /** @defgroup internal  MDB Internals
67  *      @{
68  */
69 /** @defgroup compat    Windows Compatibility Macros
70  *      A bunch of macros to minimize the amount of platform-specific ifdefs
71  *      needed throughout the rest of the code. When the features this library
72  *      needs are similar enough to POSIX to be hidden in a one-or-two line
73  *      replacement, this macro approach is used.
74  *      @{
75  */
76 #ifdef _WIN32
77 #define pthread_t       DWORD
78 #define pthread_mutex_t HANDLE
79 #define pthread_key_t   DWORD
80 #define pthread_self()  GetCurrentThreadId()
81 #define pthread_key_create(x,y) *(x) = TlsAlloc()
82 #define pthread_key_delete(x)   TlsFree(x)
83 #define pthread_getspecific(x)  TlsGetValue(x)
84 #define pthread_setspecific(x,y)        TlsSetValue(x,y)
85 #define pthread_mutex_unlock(x) ReleaseMutex(x)
86 #define pthread_mutex_lock(x)   WaitForSingleObject(x, INFINITE)
87 #define LOCK_MUTEX_R(env)       pthread_mutex_lock(env->me_rmutex)
88 #define UNLOCK_MUTEX_R(env)     pthread_mutex_unlock(env->me_rmutex)
89 #define LOCK_MUTEX_W(env)       pthread_mutex_lock(env->me_wmutex)
90 #define UNLOCK_MUTEX_W(env)     pthread_mutex_unlock(env->me_wmutex)
91 #define getpid()        GetCurrentProcessId()
92 #define fdatasync(fd)   !FlushFileBuffers(fd)
93 #define ErrCode()       GetLastError()
94 #define GetPageSize(x)  {SYSTEM_INFO si; GetSystemInfo(&si); (x) = si.dwPageSize;}
95 #define close(fd)       CloseHandle(fd)
96 #define munmap(ptr,len) UnmapViewOfFile(ptr)
97 #else
98         /** Lock the reader mutex.
99          */
100 #define LOCK_MUTEX_R(env)       pthread_mutex_lock(&env->me_txns->mti_mutex)
101         /** Unlock the reader mutex.
102          */
103 #define UNLOCK_MUTEX_R(env)     pthread_mutex_unlock(&env->me_txns->mti_mutex)
104
105         /** Lock the writer mutex.
106          *      Only a single write transaction is allowed at a time. Other writers
107          *      will block waiting for this mutex.
108          */
109 #define LOCK_MUTEX_W(env)       pthread_mutex_lock(&env->me_txns->mti_wmutex)
110         /** Unlock the writer mutex.
111          */
112 #define UNLOCK_MUTEX_W(env)     pthread_mutex_unlock(&env->me_txns->mti_wmutex)
113
114         /** Get the error code for the last failed system function.
115          */
116 #define ErrCode()       errno
117
118         /** An abstraction for a file handle.
119          *      On POSIX systems file handles are small integers. On Windows
120          *      they're opaque pointers.
121          */
122 #define HANDLE  int
123
124         /**     A value for an invalid file handle.
125          *      Mainly used to initialize file variables and signify that they are
126          *      unused.
127          */
128 #define INVALID_HANDLE_VALUE    -1
129
130         /** Get the size of a memory page for the system.
131          *      This is the basic size that the platform's memory manager uses, and is
132          *      fundamental to the use of memory-mapped files.
133          */
134 #define GetPageSize(x)  (x) = sysconf(_SC_PAGE_SIZE)
135 #endif
136
137 /** @} */
138
139 #ifndef _WIN32
140 /**     A flag for opening a file and requesting synchronous data writes.
141  *      This is only used when writing a meta page. It's not strictly needed;
142  *      we could just do a normal write and then immediately perform a flush.
143  *      But if this flag is available it saves us an extra system call.
144  *
145  *      @note If O_DSYNC is undefined but exists in /usr/include,
146  * preferably set some compiler flag to get the definition.
147  * Otherwise compile with the less efficient -DMDB_DSYNC=O_SYNC.
148  */
149 #ifndef MDB_DSYNC
150 # define MDB_DSYNC      O_DSYNC
151 #endif
152 #endif
153
154         /** A page number in the database.
155          *      Note that 64 bit page numbers are overkill, since pages themselves
156          *      already represent 12-13 bits of addressable memory, and the OS will
157          *      always limit applications to a maximum of 63 bits of address space.
158          *
159          *      @note In the #MDB_node structure, we only store 48 bits of this value,
160          *      which thus limits us to only 60 bits of addressable data.
161          */
162 typedef ULONG           pgno_t;
163
164 /** @defgroup debug     Debug Macros
165  *      @{
166  */
167 #ifndef DEBUG
168         /**     Enable debug output.
169          *      Set this to 1 for copious tracing. Set to 2 to add dumps of all IDLs
170          *      read from and written to the database (used for free space management).
171          */
172 #define DEBUG 0
173 #endif
174
175 #if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
176 # define DPRINTF        (void)  /* Vararg macros may be unsupported */
177 #elif DEBUG
178         /**     Print a debug message with printf formatting. */
179 # define DPRINTF(fmt, ...)      /**< Requires 2 or more args */ \
180         fprintf(stderr, "%s:%d:(%p) " fmt "\n", __func__, __LINE__, pthread_self(), __VA_ARGS__)
181 #else
182 # define DPRINTF(fmt, ...)      ((void) 0)
183 #endif
184         /**     Print a debug string.
185          *      The string is printed literally, with no format processing.
186          */
187 #define DPUTS(arg)      DPRINTF("%s", arg)
188 /** @} */
189
190         /** A default memory page size.
191          *      The actual size is platform-dependent, but we use this for
192          *      boot-strapping. We probably should not be using this any more.
193          *      The #GetPageSize() macro is used to get the actual size.
194          *
195          *      Note that we don't currently support Huge pages. On Linux,
196          *      regular data files cannot use Huge pages, and in general
197          *      Huge pages aren't actually pageable. We rely on the OS
198          *      demand-pager to read our data and page it out when memory
199          *      pressure from other processes is high. So until OSs have
200          *      actual paging support for Huge pages, they're not viable.
201          */
202 #define PAGESIZE         4096
203
204         /** The minimum number of keys required in a database page.
205          *      Setting this to a larger value will place a smaller bound on the
206          *      maximum size of a data item. Data items larger than this size will
207          *      be pushed into overflow pages instead of being stored directly in
208          *      the B-tree node. This value used to default to 4. With a page size
209          *      of 4096 bytes that meant that any item larger than 1024 bytes would
210          *      go into an overflow page. That also meant that on average 2-3KB of
211          *      each overflow page was wasted space. The value cannot be lower than
212          *      2 because then there would no longer be a tree structure. With this
213          *      value, items larger than 2KB will go into overflow pages, and on
214          *      average only 1KB will be wasted.
215          */
216 #define MDB_MINKEYS      2
217
218         /**     A stamp that identifies a file as an MDB file.
219          *      There's nothing special about this value other than that it is easily
220          *      recognizable, and it will reflect any byte order mismatches.
221          */
222 #define MDB_MAGIC        0xBEEFC0DE
223
224         /**     The version number for a database's file format. */
225 #define MDB_VERSION      1
226
227         /**     The maximum size of a key in the database.
228          *      While data items have essentially unbounded size, we require that
229          *      keys all fit onto a regular page. This limit could be raised a bit
230          *      further if needed; to something just under #PAGESIZE / #MDB_MINKEYS.
231          */
232 #define MAXKEYSIZE       511
233
234 #if DEBUG
235         /**     A key buffer.
236          *      @ingroup debug
237          *      This is used for printing a hex dump of a key's contents.
238          */
239 #define DKBUF   char kbuf[(MAXKEYSIZE*2+1)]
240         /**     Display a key in hex.
241          *      @ingroup debug
242          *      Invoke a function to display a key in hex.
243          */
244 #define DKEY(x) mdb_dkey(x, kbuf)
245 #else
246 #define DKBUF
247 #define DKEY(x)
248 #endif
249
250 /**     @defgroup lazylock      Lazy Locking
251  *      Macros for locks that are't actually needed.
252  *      The DB view is always consistent because all writes are wrapped in
253  *      the wmutex. Finer-grained locks aren't necessary.
254  *      @{
255  */
256 #ifndef LAZY_LOCKS
257         /**     Use lazy locking. I.e., don't lock these accesses at all. */
258 #define LAZY_LOCKS      1
259 #endif
260 #if     LAZY_LOCKS
261         /** Grab the reader lock */
262 #define LAZY_MUTEX_LOCK(x)
263         /** Release the reader lock */
264 #define LAZY_MUTEX_UNLOCK(x)
265         /** Release the DB table reader/writer lock */
266 #define LAZY_RWLOCK_UNLOCK(x)
267         /** Grab the DB table write lock */
268 #define LAZY_RWLOCK_WRLOCK(x)
269         /** Grab the DB table read lock */
270 #define LAZY_RWLOCK_RDLOCK(x)
271         /** Declare the DB table rwlock */
272 #define LAZY_RWLOCK_DEF(x)
273         /** Initialize the DB table rwlock */
274 #define LAZY_RWLOCK_INIT(x,y)
275         /**     Destroy the DB table rwlock */
276 #define LAZY_RWLOCK_DESTROY(x)
277 #else
278 #define LAZY_MUTEX_LOCK(x)              pthread_mutex_lock(x)
279 #define LAZY_MUTEX_UNLOCK(x)    pthread_mutex_unlock(x)
280 #define LAZY_RWLOCK_UNLOCK(x)   pthread_rwlock_unlock(x)
281 #define LAZY_RWLOCK_WRLOCK(x)   pthread_rwlock_wrlock(x)
282 #define LAZY_RWLOCK_RDLOCK(x)   pthread_rwlock_rdlock(x)
283 #define LAZY_RWLOCK_DEF(x)              pthread_rwlock_t        x
284 #define LAZY_RWLOCK_INIT(x,y)   pthread_rwlock_init(x,y)
285 #define LAZY_RWLOCK_DESTROY(x)  pthread_rwlock_destroy(x)
286 #endif
287 /** @} */
288
289         /** An invalid page number.
290          *      Mainly used to denote an empty tree.
291          */
292 #define P_INVALID        (~0UL)
293
294         /** Test if a flag \b f is set in a flag word \b w. */
295 #define F_ISSET(w, f)    (((w) & (f)) == (f))
296
297         /**     Used for offsets within a single page.
298          *      Since memory pages are typically 4 or 8KB in size, 12-13 bits,
299          *      this is plenty.
300          */
301 typedef uint16_t         indx_t;
302
303         /**     Default size of memory map.
304          *      This is certainly too small for any actual applications. Apps should always set
305          *      the size explicitly using #mdb_env_set_mapsize().
306          */
307 #define DEFAULT_MAPSIZE 1048576
308
309 /**     @defgroup readers       Reader Lock Table
310  *      Readers don't acquire any locks for their data access. Instead, they
311  *      simply record their transaction ID in the reader table. The reader
312  *      mutex is needed just to find an empty slot in the reader table. The
313  *      slot's address is saved in thread-specific data so that subsequent read
314  *      transactions started by the same thread need no further locking to proceed.
315  *
316  *      Since the database uses multi-version concurrency control, readers don't
317  *      actually need any locking. This table is used to keep track of which
318  *      readers are using data from which old transactions, so that we'll know
319  *      when a particular old transaction is no longer in use, Old transactions
320  *      that have freed any data pages can then have their freed pages reclaimed
321  *      for use by a later write transaction.
322  *
323  *      The lock table is constructed such that reader slots are aligned with the
324  *      processor's cache line size. Any slot is only ever used by one thread.
325  *      This alignment guarantees that there will be no contention or cache
326  *      thrashing as threads update their own slot info, and also eliminates
327  *      any need for locking when accessing a slot.
328  *
329  *      A writer thread will scan every slot in the table to determine the oldest
330  *      outstanding reader transaction. Any freed pages older than this will be
331  *      reclaimed by the writer. The writer doesn't use any locks when scanning
332  *      this table. This means that there's no guarantee that the writer will
333  *      see the most up-to-date reader info, but that's not required for correct
334  *      operation - all we need is to know the upper bound on the oldest reader,
335  *      we don't care at all about the newest reader. So the only consequence of
336  *      reading stale information here is that old pages might hang around a
337  *      while longer before being reclaimed. That's actually good anyway, because
338  *      the longer we delay reclaiming old pages, the more likely it is that a
339  *      string of contiguous pages can be found after coalescing old pages from
340  *      many old transactions together.
341  *
342  *      @todo We don't actually do such coalescing yet, we grab pages from one
343  *      old transaction at a time.
344  *      @{
345  */
346         /**     Number of slots in the reader table.
347          *      This value was chosen somewhat arbitrarily. 126 readers plus a
348          *      couple mutexes fit exactly into 8KB on my development machine.
349          *      Applications should set the table size using #mdb_env_set_maxreaders().
350          */
351 #define DEFAULT_READERS 126
352
353         /**     The size of a CPU cache line in bytes. We want our lock structures
354          *      aligned to this size to avoid false cache line sharing in the
355          *      lock table.
356          *      This value works for most CPUs. For Itanium this should be 128.
357          */
358 #ifndef CACHELINE
359 #define CACHELINE       64
360 #endif
361
362         /**     The information we store in a single slot of the reader table.
363          *      In addition to a transaction ID, we also record the process and
364          *      thread ID that owns a slot, so that we can detect stale information,
365          *      e.g. threads or processes that went away without cleaning up.
366          *      @note We currently don't check for stale records. We simply re-init
367          *      the table when we know that we're the only process opening the
368          *      lock file.
369          */
370 typedef struct MDB_rxbody {
371         /**     The current Transaction ID when this transaction began.
372          *      Multiple readers that start at the same time will probably have the
373          *      same ID here. Again, it's not important to exclude them from
374          *      anything; all we need to know is which version of the DB they
375          *      started from so we can avoid overwriting any data used in that
376          *      particular version.
377          */
378         ULONG           mrb_txnid;
379         /** The process ID of the process owning this reader txn. */
380         pid_t           mrb_pid;
381         /** The thread ID of the thread owning this txn. */
382         pthread_t       mrb_tid;
383 } MDB_rxbody;
384
385         /** The actual reader record, with cacheline padding. */
386 typedef struct MDB_reader {
387         union {
388                 MDB_rxbody mrx;
389                 /** shorthand for mrb_txnid */
390 #define mr_txnid        mru.mrx.mrb_txnid
391 #define mr_pid  mru.mrx.mrb_pid
392 #define mr_tid  mru.mrx.mrb_tid
393                 /** cache line alignment */
394                 char pad[(sizeof(MDB_rxbody)+CACHELINE-1) & ~(CACHELINE-1)];
395         } mru;
396 } MDB_reader;
397
398         /** The header for the reader table.
399          *      The table resides in a memory-mapped file. (This is a different file
400          *      than is used for the main database.)
401          *
402          *      For POSIX the actual mutexes reside in the shared memory of this
403          *      mapped file. On Windows, mutexes are named objects allocated by the
404          *      kernel; we store the mutex names in this mapped file so that other
405          *      processes can grab them. This same approach will also be used on
406          *      MacOSX/Darwin (using named semaphores) since MacOSX doesn't support
407          *      process-shared POSIX mutexes.
408          */
409 typedef struct MDB_txbody {
410                 /** Stamp identifying this as an MDB lock file. It must be set
411                  *      to #MDB_MAGIC. */
412         uint32_t        mtb_magic;
413                 /** Version number of this lock file. Must be set to #MDB_VERSION. */
414         uint32_t        mtb_version;
415 #ifdef _WIN32
416         char    mtb_rmname[32];
417 #else
418                 /** Mutex protecting access to this table.
419                  *      This is the reader lock that #LOCK_MUTEX_R acquires.
420                  */
421         pthread_mutex_t mtb_mutex;
422 #endif
423                 /**     The ID of the last transaction committed to the database.
424                  *      This is recorded here only for convenience; the value can always
425                  *      be determined by reading the main database meta pages.
426                  */
427         ULONG           mtb_txnid;
428                 /** The number of slots that have been used in the reader table.
429                  *      This always records the maximum count, it is not decremented
430                  *      when readers release their slots.
431                  */
432         uint32_t        mtb_numreaders;
433                 /**     The ID of the most recent meta page in the database.
434                  *      This is recorded here only for convenience; the value can always
435                  *      be determined by reading the main database meta pages.
436                  */
437         uint32_t        mtb_me_toggle;
438 } MDB_txbody;
439
440         /** The actual reader table definition. */
441 typedef struct MDB_txninfo {
442         union {
443                 MDB_txbody mtb;
444 #define mti_magic       mt1.mtb.mtb_magic
445 #define mti_version     mt1.mtb.mtb_version
446 #define mti_mutex       mt1.mtb.mtb_mutex
447 #define mti_rmname      mt1.mtb.mtb_rmname
448 #define mti_txnid       mt1.mtb.mtb_txnid
449 #define mti_numreaders  mt1.mtb.mtb_numreaders
450 #define mti_me_toggle   mt1.mtb.mtb_me_toggle
451                 char pad[(sizeof(MDB_txbody)+CACHELINE-1) & ~(CACHELINE-1)];
452         } mt1;
453         union {
454 #ifdef _WIN32
455                 char mt2_wmname[32];
456 #define mti_wmname      mt2.mt2_wmname
457 #else
458                 pthread_mutex_t mt2_wmutex;
459 #define mti_wmutex      mt2.mt2_wmutex
460 #endif
461                 char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
462         } mt2;
463         MDB_reader      mti_readers[1];
464 } MDB_txninfo;
465 /** @} */
466
467 /** Common header for all page types.
468  * Overflow pages occupy a number of contiguous pages with no
469  * headers on any page after the first.
470  */
471 typedef struct MDB_page {
472         union {
473                 pgno_t          mp_pgno;        /**< page number */
474                 void *          mp_next;        /**< for in-memory list of freed structs */
475         };
476 #define P_BRANCH         0x01           /**< branch page */
477 #define P_LEAF           0x02           /**< leaf page */
478 #define P_OVERFLOW       0x04           /**< overflow page */
479 #define P_META           0x08           /**< meta page */
480 #define P_DIRTY          0x10           /**< dirty page */
481 #define P_LEAF2          0x20           /**< for #MDB_DUPFIXED records */
482         uint32_t        mp_flags;
483         union {
484                 struct {
485                         indx_t          mp_lower;               /**< lower bound of free space */
486                         indx_t          mp_upper;               /**< upper bound of free space */
487                 };
488                 uint32_t        mp_pages;       /**< number of overflow pages */
489         };
490         indx_t          mp_ptrs[1];             /**< dynamic size */
491 } MDB_page;
492
493         /** Size of the page header, excluding dynamic data at the end */
494 #define PAGEHDRSZ        ((unsigned) offsetof(MDB_page, mp_ptrs))
495
496         /** Address of first usable data byte in a page, after the header */
497 #define METADATA(p)      ((void *)((char *)(p) + PAGEHDRSZ))
498
499         /** Number of nodes on a page */
500 #define NUMKEYS(p)       (((p)->mp_lower - PAGEHDRSZ) >> 1)
501
502         /** The amount of space remaining in the page */
503 #define SIZELEFT(p)      (indx_t)((p)->mp_upper - (p)->mp_lower)
504
505         /** The percentage of space used in the page, in tenths of a percent. */
506 #define PAGEFILL(env, p) (1000L * ((env)->me_psize - PAGEHDRSZ - SIZELEFT(p)) / \
507                                 ((env)->me_psize - PAGEHDRSZ))
508         /** The minimum page fill factor, in tenths of a percent.
509          *      Pages emptier than this are candidates for merging.
510          */
511 #define FILL_THRESHOLD   250
512
513         /** Test if a page is a leaf page */
514 #define IS_LEAF(p)       F_ISSET((p)->mp_flags, P_LEAF)
515         /** Test if a page is a LEAF2 page */
516 #define IS_LEAF2(p)      F_ISSET((p)->mp_flags, P_LEAF2)
517         /** Test if a page is a branch page */
518 #define IS_BRANCH(p)     F_ISSET((p)->mp_flags, P_BRANCH)
519         /** Test if a page is an overflow page */
520 #define IS_OVERFLOW(p)   F_ISSET((p)->mp_flags, P_OVERFLOW)
521
522         /** The number of overflow pages needed to store the given size. */
523 #define OVPAGES(size, psize)    ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
524
525         /** Header for a single key/data pair within a page.
526          * We guarantee 2-byte alignment for nodes.
527          */
528 typedef struct MDB_node {
529         /** lo and hi are used for data size on leaf nodes and for
530          * child pgno on branch nodes. On 64 bit platforms, flags
531          * is also used for pgno. (branch nodes ignore flags)
532          */
533         unsigned short  mn_lo;
534         unsigned short  mn_hi;                  /**< part of dsize or pgno */
535         unsigned short  mn_flags;               /**< flags for special node types */
536 #define F_BIGDATA        0x01                   /**< data put on overflow page */
537 #define F_SUBDATA        0x02                   /**< data is a sub-database */
538 #define F_DUPDATA        0x04                   /**< data has duplicates */
539         unsigned short  mn_ksize;               /**< key size */
540         char            mn_data[1];                     /**< key and data are appended here */
541 } MDB_node;
542
543         /** Size of the node header, excluding dynamic data at the end */
544 #define NODESIZE         offsetof(MDB_node, mn_data)
545
546         /** Size of a node in a branch page.
547          *      This is just the node header plus the key, there is no data.
548          */
549 #define INDXSIZE(k)      (NODESIZE + ((k) == NULL ? 0 : (k)->mv_size))
550
551         /** Size of a node in a leaf page.
552          *      This is node header plus key plus data size.
553          */
554 #define LEAFSIZE(k, d)   (NODESIZE + (k)->mv_size + (d)->mv_size)
555
556         /** Address of node \i in page \p */
557 #define NODEPTR(p, i)    ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
558
559         /** Address of the key for the node */
560 #define NODEKEY(node)    (void *)((node)->mn_data)
561
562         /** Address of the data for a node */
563 #define NODEDATA(node)   (void *)((char *)(node)->mn_data + (node)->mn_ksize)
564
565         /** Get the page number pointed to by a branch node */
566 #if LONG_MAX == 0x7fffffff
567 #define NODEPGNO(node)   ((node)->mn_lo | ((node)->mn_hi << 16))
568         /** Set the page number in a branch node */
569 #define SETPGNO(node,pgno)      do { \
570         (node)->mn_lo = (pgno) & 0xffff; (node)->mn_hi = (pgno) >> 16;} while(0)
571 #else
572 #define NODEPGNO(node)   ((node)->mn_lo | ((node)->mn_hi << 16) | ((unsigned long)(node)->mn_flags << 32))
573         /** Set the page number in a branch node */
574 #define SETPGNO(node,pgno)      do { \
575         (node)->mn_lo = (pgno) & 0xffff; (node)->mn_hi = (pgno) >> 16; \
576         (node)->mn_flags = (pgno) >> 32; } while(0)
577 #endif
578
579         /** Get the size of the data in a leaf node */
580 #define NODEDSZ(node)    ((node)->mn_lo | ((unsigned)(node)->mn_hi << 16))
581         /** Set the size of the data for a leaf node */
582 #define SETDSZ(node,size)       do { \
583         (node)->mn_lo = (size) & 0xffff; (node)->mn_hi = (size) >> 16;} while(0)
584         /** The size of a key in a node */
585 #define NODEKSZ(node)    ((node)->mn_ksize)
586
587         /** The address of a key in a LEAF2 page.
588          *      LEAF2 pages are used for #MDB_DUPFIXED sorted-duplicate sub-DBs.
589          *      There are no node headers, keys are stored contiguously.
590          */
591 #define LEAF2KEY(p, i, ks)      ((char *)(p) + PAGEHDRSZ + ((i)*(ks)))
592
593         /** Set the \b node's key into \b key, if requested. */
594 #define MDB_SET_KEY(node, key)  if (key!=NULL) {(key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node);}
595
596         /** Information about a single database in the environment. */
597 typedef struct MDB_db {
598         uint32_t        md_pad;         /**< also ksize for LEAF2 pages */
599         uint16_t        md_flags;       /**< @ref mdb_open */
600         uint16_t        md_depth;       /**< depth of this tree */
601         ULONG           md_branch_pages;        /**< number of internal pages */
602         ULONG           md_leaf_pages;          /**< number of leaf pages */
603         ULONG           md_overflow_pages;      /**< number of overflow pages */
604         ULONG           md_entries;             /**< number of data items */
605         pgno_t          md_root;                /**< the root page of this tree */
606 } MDB_db;
607
608         /** Handle for the DB used to track free pages. */
609 #define FREE_DBI        0
610         /** Handle for the default DB. */
611 #define MAIN_DBI        1
612
613         /** Meta page content. */
614 typedef struct MDB_meta {
615                 /** Stamp identifying this as an MDB data file. It must be set
616                  *      to #MDB_MAGIC. */
617         uint32_t        mm_magic;
618                 /** Version number of this lock file. Must be set to #MDB_VERSION. */
619         uint32_t        mm_version;
620         void            *mm_address;            /**< address for fixed mapping */
621         size_t          mm_mapsize;                     /**< size of mmap region */
622         MDB_db          mm_dbs[2];                      /**< first is free space, 2nd is main db */
623         /** The size of pages used in this DB */
624 #define mm_psize        mm_dbs[0].md_pad
625         /** Any persistent environment flags. @ref mdb_env */
626 #define mm_flags        mm_dbs[0].md_flags
627         pgno_t          mm_last_pg;                     /**< last used page in file */
628         ULONG           mm_txnid;                       /**< txnid that committed this page */
629 } MDB_meta;
630
631         /** Auxiliary DB info.
632          *      The information here is mostly static/read-only. There is
633          *      only a single copy of this record in the environment.
634          *      The \b md_dirty flag is not read-only, but only a write
635          *      transaction can ever update it, and only write transactions
636          *      need to worry about it.
637          */
638 typedef struct MDB_dbx {
639         MDB_val         md_name;                /**< name of the database */
640         MDB_cmp_func    *md_cmp;        /**< function for comparing keys */
641         MDB_cmp_func    *md_dcmp;       /**< function for comparing data items */
642         MDB_rel_func    *md_rel;        /**< user relocate function */
643         MDB_dbi md_parent;                      /**< parent DB of a sub-DB */
644         unsigned int    md_dirty;       /**< TRUE if DB was written in this txn */
645 } MDB_dbx;
646
647         /** A database transaction.
648          *      Every operation requires a transaction handle.
649          */
650 struct MDB_txn {
651         pgno_t          mt_next_pgno;   /**< next unallocated page */
652         /** The ID of this transaction. IDs are integers incrementing from 1.
653          *      Only committed write transactions increment the ID. If a transaction
654          *      aborts, the ID may be re-used by the next writer.
655          */
656         ULONG           mt_txnid;
657         MDB_env         *mt_env;                /**< the DB environment */
658         /** The list of pages that became unused during this transaction.
659          *      This is an #IDL.
660          */
661         pgno_t          *mt_free_pgs;
662         union {
663                 ID2L    dirty_list;     /**< modified pages */
664                 MDB_reader      *reader;        /**< this thread's slot in the reader table */
665         } mt_u;
666         /** Array of records for each DB known in the environment. */
667         MDB_dbx         *mt_dbxs;
668         /** Array of MDB_db records for each known DB */
669         MDB_db          *mt_dbs;
670         /**     Number of DB records in use. This number only ever increments;
671          *      we don't decrement it when individual DB handles are closed.
672          */
673         unsigned int    mt_numdbs;
674
675 #define MDB_TXN_RDONLY          0x01            /**< read-only transaction */
676 #define MDB_TXN_ERROR           0x02            /**< an error has occurred */
677         unsigned int    mt_flags;
678         /** Tracks which of the two meta pages was used at the start
679          *      of this transaction.
680          */
681         unsigned int    mt_toggle;
682 };
683
684 /** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty.
685  * At 4 keys per node, enough for 2^64 nodes, so there's probably no need to
686  * raise this on a 64 bit machine.
687  */
688 #define CURSOR_STACK             32
689
690 struct MDB_xcursor;
691
692         /** Cursors are used for all DB operations */
693 struct MDB_cursor {
694         /** Context used for databases with #MDB_DUPSORT, otherwise NULL */
695         struct MDB_xcursor      *mc_xcursor;
696         /** The transaction that owns this cursor */
697         MDB_txn         *mc_txn;
698         /** The database handle this cursor operates on */
699         MDB_dbi         mc_dbi;
700         unsigned short  mc_snum;        /**< number of pushed pages */
701         unsigned short  mc_top;         /**< index of top page, mc_snum-1 */
702         unsigned int    mc_flags;
703 #define C_INITIALIZED   0x01    /**< cursor has been initialized and is valid */
704 #define C_EOF   0x02                    /**< No more data */
705 #define C_XDIRTY        0x04            /**< @deprecated mc_xcursor needs to be flushed */
706         MDB_page        *mc_pg[CURSOR_STACK];   /**< stack of pushed pages */
707         indx_t          mc_ki[CURSOR_STACK];    /**< stack of page indices */
708 };
709
710         /** Context for sorted-dup records.
711          *      We could have gone to a fully recursive design, with arbitrarily
712          *      deep nesting of sub-databases. But for now we only handle these
713          *      levels - main DB, optional sub-DB, sorted-duplicate DB.
714          */
715 typedef struct MDB_xcursor {
716         /** A sub-cursor for traversing the Dup DB */
717         MDB_cursor mx_cursor;
718         /** A fake transaction struct for pointing to our own table
719          *      of DB info.
720          */
721         MDB_txn mx_txn;
722         /**     Our private DB information tables. Slots 0 and 1 are always
723          *      copies of the corresponding slots in the main transaction. These
724          *      hold the FREEDB and MAINDB, respectively. If the main cursor is
725          *      on a sub-database, that will be copied to slot 2, and the duplicate
726          *      database info will be in slot 3. If the main cursor is on the MAINDB
727          *      then the duplicate DB info will be in slot 2 and slot 3 will be unused.
728          */
729         MDB_dbx mx_dbxs[4];
730         /** MDB_db table */
731         MDB_db  mx_dbs[4];
732 } MDB_xcursor;
733
734         /** A set of pages freed by an earlier transaction. */
735 typedef struct MDB_oldpages {
736         /** Usually we only read one record from the FREEDB at a time, but
737          *      in case we read more, this will chain them together.
738          */
739         struct MDB_oldpages *mo_next;
740         /**     The ID of the transaction in which these pages were freed. */
741         ULONG           mo_txnid;
742         /** An #IDL of the pages */
743         pgno_t          mo_pages[1];    /* dynamic */
744 } MDB_oldpages;
745
746         /** The database environment. */
747 struct MDB_env {
748         HANDLE          me_fd;          /**< The main data file */
749         HANDLE          me_lfd;         /**< The lock file */
750         HANDLE          me_mfd;                 /**< just for writing the meta pages */
751 #define MDB_FATAL_ERROR 0x80000000U
752         uint32_t        me_flags;
753         uint32_t        me_extrapad;    /**< unused for now */
754         unsigned int    me_maxreaders;  /**< size of the reader table */
755         unsigned int    me_numdbs;              /**< number of DBs opened */
756         unsigned int    me_maxdbs;              /**< size of the DB table */
757         char            *me_path;               /**< path to the DB files */
758         char            *me_map;                /**< the memory map of the data file */
759         MDB_txninfo     *me_txns;               /**< the memory map of the lock file */
760         MDB_meta        *me_metas[2];   /**< pointers to the two meta pages */
761         MDB_txn         *me_txn;                /**< current write transaction */
762         size_t          me_mapsize;             /**< size of the data memory map */
763         off_t           me_size;                /**< current file size */
764         pgno_t          me_maxpg;               /**< me_mapsize / me_psize */
765         unsigned int    me_psize;       /**< size of a page, from #GetPageSize */
766         unsigned int    me_db_toggle;   /**< which DB table is current */
767         MDB_dbx         *me_dbxs;               /**< array of static DB info */
768         MDB_db          *me_dbs[2];             /**< two arrays of MDB_db info */
769         MDB_oldpages *me_pghead;        /**< list of old page records */
770         pthread_key_t   me_txkey;       /**< thread-key for readers */
771         MDB_page        *me_dpages;             /**< list of malloc'd blocks for re-use */
772         /** IDL of pages that became unused in a write txn */
773         pgno_t          me_free_pgs[MDB_IDL_UM_SIZE];
774         /** ID2L of pages that were written during a write txn */
775         ID2                     me_dirty_list[MDB_IDL_UM_SIZE];
776         /** rwlock for the DB tables, if #LAZY_LOCKS is false */
777         LAZY_RWLOCK_DEF(me_dblock);
778 #ifdef _WIN32
779         HANDLE          me_rmutex;              /* Windows mutexes don't reside in shared mem */
780         HANDLE          me_wmutex;
781 #endif
782 };
783         /** max number of pages to commit in one writev() call */
784 #define MDB_COMMIT_PAGES         64
785
786 static MDB_page *mdb_alloc_page(MDB_cursor *mc, int num);
787 static int              mdb_touch(MDB_cursor *mc);
788
789 static int  mdb_search_page_root(MDB_cursor *mc,
790                             MDB_val *key, int modify);
791 static int  mdb_search_page(MDB_cursor *mc,
792                             MDB_val *key, int modify);
793
794 static int  mdb_env_read_header(MDB_env *env, MDB_meta *meta);
795 static int  mdb_env_read_meta(MDB_env *env, int *which);
796 static int  mdb_env_write_meta(MDB_txn *txn);
797 static int  mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **mp);
798
799 static MDB_node *mdb_search_node(MDB_cursor *mc, MDB_val *key, int *exactp);
800 static int  mdb_add_node(MDB_cursor *mc, indx_t indx,
801                             MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags);
802 static void mdb_del_node(MDB_page *mp, indx_t indx, int ksize);
803 static int mdb_del0(MDB_cursor *mc, MDB_node *leaf);
804 static int  mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
805
806 static int      mdb_rebalance(MDB_cursor *mc);
807 static int      mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key);
808 static int      mdb_move_node(MDB_cursor *csrcrc, MDB_cursor *cdstst);
809 static int      mdb_merge(MDB_cursor *csrcrc, MDB_cursor *cdstst);
810 static int      mdb_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
811                                 pgno_t newpgno);
812 static MDB_page *mdb_new_page(MDB_cursor *mc, uint32_t flags, int num);
813
814 static void     cursor_pop_page(MDB_cursor *mc);
815 static int      cursor_push_page(MDB_cursor *mc, MDB_page *mp);
816
817 static int      mdb_sibling(MDB_cursor *mc, int move_right);
818 static int      mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op);
819 static int      mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op);
820 static int      mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op,
821                                 int *exactp);
822 static int      mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data);
823 static int      mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data);
824
825 static void     mdb_xcursor_init0(MDB_cursor *mc);
826 static void     mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node);
827 static void     mdb_xcursor_init2(MDB_cursor *mc);
828 static void     mdb_xcursor_fini(MDB_cursor *mc);
829
830 static size_t   mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
831 static size_t   mdb_branch_size(MDB_env *env, MDB_val *key);
832
833 static void mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi);
834
835 /** @cond */
836 static MDB_cmp_func     memncmp, memnrcmp, intcmp, cintcmp;
837 /** @endcond */
838
839 #ifdef _WIN32
840 static SECURITY_DESCRIPTOR mdb_null_sd;
841 static SECURITY_ATTRIBUTES mdb_all_sa;
842 static int mdb_sec_inited;
843 #endif
844
845 char *
846 mdb_version(int *major, int *minor, int *patch)
847 {
848         if (major) *major = MDB_VERSION_MAJOR;
849         if (minor) *minor = MDB_VERSION_MINOR;
850         if (patch) *patch = MDB_VERSION_PATCH;
851         return MDB_VERSION_STRING;
852 }
853
854         /** Table of descriptions for MDB @ref error codes */
855 static char *const mdb_errstr[] = {
856         "MDB_KEYEXIST: Key/data pair already exists",
857         "MDB_NOTFOUND: No matching key/data pair found",
858         "MDB_PAGE_NOTFOUND: Requested page not found",
859         "MDB_CORRUPTED: Located page was wrong type",
860         "MDB_PANIC: Update of meta page failed",
861         "MDB_VERSION_MISMATCH: Database environment version mismatch"
862 };
863
864 char *
865 mdb_strerror(int err)
866 {
867         if (!err)
868                 return ("Successful return: 0");
869
870         if (err >= MDB_KEYEXIST && err <= MDB_VERSION_MISMATCH)
871                 return mdb_errstr[err - MDB_KEYEXIST];
872
873         return strerror(err);
874 }
875
876 #if DEBUG
877 static char *
878 mdb_dkey(MDB_val *key, char *buf)
879 {
880         char *ptr = buf;
881         unsigned char *c = key->mv_data;
882         unsigned int i;
883         if (key->mv_size > MAXKEYSIZE)
884                 return "MAXKEYSIZE";
885 #if 1
886         for (i=0; i<key->mv_size; i++)
887                 ptr += sprintf(ptr, "%02x", *c++);
888 #else
889         sprintf(buf, "%.*s", key->mv_size, key->mv_data);
890 #endif
891         return buf;
892 }
893 #endif
894
895 int
896 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
897 {
898         return txn->mt_dbxs[dbi].md_cmp(a, b);
899 }
900
901 int
902 mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
903 {
904         if (txn->mt_dbxs[dbi].md_dcmp)
905                 return txn->mt_dbxs[dbi].md_dcmp(a, b);
906         else
907                 return EINVAL;  /* too bad you can't distinguish this from a valid result */
908 }
909
910 /* Allocate new page(s) for writing */
911 static MDB_page *
912 mdb_alloc_page(MDB_cursor *mc, int num)
913 {
914         MDB_txn *txn = mc->mc_txn;
915         MDB_page *np;
916         pgno_t pgno = P_INVALID;
917         ID2 mid;
918
919         if (txn->mt_txnid > 2) {
920
921                 if (!txn->mt_env->me_pghead && mc->mc_dbi != FREE_DBI &&
922                         txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
923                         /* See if there's anything in the free DB */
924                         MDB_cursor m2;
925                         MDB_node *leaf;
926                         ULONG *kptr, oldest;
927
928                         m2.mc_txn = txn;
929                         m2.mc_dbi = FREE_DBI;
930                         m2.mc_snum = 0;
931                         m2.mc_flags = 0;
932                         mdb_search_page(&m2, NULL, 0);
933                         leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
934                         kptr = (ULONG *)NODEKEY(leaf);
935
936                         {
937                                 unsigned int i;
938                                 oldest = txn->mt_txnid - 1;
939                                 for (i=0; i<txn->mt_env->me_txns->mti_numreaders; i++) {
940                                         ULONG mr = txn->mt_env->me_txns->mti_readers[i].mr_txnid;
941                                         if (mr && mr < oldest)
942                                                 oldest = mr;
943                                 }
944                         }
945
946                         if (oldest > *kptr) {
947                                 /* It's usable, grab it.
948                                  */
949                                 MDB_oldpages *mop;
950                                 MDB_val data;
951                                 pgno_t *idl;
952
953                                 mdb_read_data(txn, leaf, &data);
954                                 idl = (ULONG *)data.mv_data;
955                                 mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
956                                 mop->mo_next = txn->mt_env->me_pghead;
957                                 mop->mo_txnid = *kptr;
958                                 txn->mt_env->me_pghead = mop;
959                                 memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
960
961 #if DEBUG > 1
962                                 {
963                                         unsigned int i;
964                                         DPRINTF("IDL read txn %lu root %lu num %lu",
965                                                 mop->mo_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
966                                         for (i=0; i<idl[0]; i++) {
967                                                 DPRINTF("IDL %lu", idl[i+1]);
968                                         }
969                                 }
970 #endif
971                                 /* drop this IDL from the DB */
972                                 m2.mc_ki[m2.mc_top] = 0;
973                                 m2.mc_flags = C_INITIALIZED;
974                                 mdb_cursor_del(&m2, 0);
975                         }
976                 }
977                 if (txn->mt_env->me_pghead) {
978                         MDB_oldpages *mop = txn->mt_env->me_pghead;
979                         if (num > 1) {
980                                 /* FIXME: For now, always use fresh pages. We
981                                  * really ought to search the free list for a
982                                  * contiguous range.
983                                  */
984                                 ;
985                         } else {
986                                 /* peel pages off tail, so we only have to truncate the list */
987                                 pgno = MDB_IDL_LAST(mop->mo_pages);
988                                 if (MDB_IDL_IS_RANGE(mop->mo_pages)) {
989                                         mop->mo_pages[2]++;
990                                         if (mop->mo_pages[2] > mop->mo_pages[1])
991                                                 mop->mo_pages[0] = 0;
992                                 } else {
993                                         mop->mo_pages[0]--;
994                                 }
995                                 if (MDB_IDL_IS_ZERO(mop->mo_pages)) {
996                                         txn->mt_env->me_pghead = mop->mo_next;
997                                         free(mop);
998                                 }
999                         }
1000                 }
1001         }
1002
1003         if (pgno == P_INVALID) {
1004                 /* DB size is maxed out */
1005                 if (txn->mt_next_pgno + num >= txn->mt_env->me_maxpg)
1006                         return NULL;
1007         }
1008         if (txn->mt_env->me_dpages && num == 1) {
1009                 np = txn->mt_env->me_dpages;
1010                 txn->mt_env->me_dpages = np->mp_next;
1011         } else {
1012                 if ((np = malloc(txn->mt_env->me_psize * num )) == NULL)
1013                         return NULL;
1014         }
1015         if (pgno == P_INVALID) {
1016                 np->mp_pgno = txn->mt_next_pgno;
1017                 txn->mt_next_pgno += num;
1018         } else {
1019                 np->mp_pgno = pgno;
1020         }
1021         mid.mid = np->mp_pgno;
1022         mid.mptr = np;
1023         mdb_mid2l_insert(txn->mt_u.dirty_list, &mid);
1024
1025         return np;
1026 }
1027
1028 /* Touch a page: make it dirty and re-insert into tree with updated pgno.
1029  */
1030 static int
1031 mdb_touch(MDB_cursor *mc)
1032 {
1033         MDB_page *mp = mc->mc_pg[mc->mc_top];
1034         pgno_t  pgno;
1035
1036         if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
1037                 MDB_page *np;
1038                 if ((np = mdb_alloc_page(mc, 1)) == NULL)
1039                         return ENOMEM;
1040                 DPRINTF("touched db %u page %lu -> %lu", mc->mc_dbi, mp->mp_pgno, np->mp_pgno);
1041                 assert(mp->mp_pgno != np->mp_pgno);
1042                 mdb_midl_append(mc->mc_txn->mt_free_pgs, mp->mp_pgno);
1043                 pgno = np->mp_pgno;
1044                 memcpy(np, mp, mc->mc_txn->mt_env->me_psize);
1045                 mp = np;
1046                 mp->mp_pgno = pgno;
1047                 mp->mp_flags |= P_DIRTY;
1048
1049                 mc->mc_pg[mc->mc_top] = mp;
1050                 /* Update the page number to new touched page. */
1051                 if (mc->mc_top)
1052                         SETPGNO(NODEPTR(mc->mc_pg[mc->mc_top-1], mc->mc_ki[mc->mc_top-1]), mp->mp_pgno);
1053         }
1054         return 0;
1055 }
1056
1057 int
1058 mdb_env_sync(MDB_env *env, int force)
1059 {
1060         int rc = 0;
1061         if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
1062                 if (fdatasync(env->me_fd))
1063                         rc = ErrCode();
1064         }
1065         return rc;
1066 }
1067
1068 static inline void
1069 mdb_txn_reset0(MDB_txn *txn);
1070
1071 static inline int
1072 mdb_txn_renew0(MDB_txn *txn)
1073 {
1074         MDB_env *env = txn->mt_env;
1075
1076         if (txn->mt_flags & MDB_TXN_RDONLY) {
1077                 MDB_reader *r = pthread_getspecific(env->me_txkey);
1078                 if (!r) {
1079                         unsigned int i;
1080                         pid_t pid = getpid();
1081                         pthread_t tid = pthread_self();
1082
1083                         LOCK_MUTEX_R(env);
1084                         for (i=0; i<env->me_txns->mti_numreaders; i++)
1085                                 if (env->me_txns->mti_readers[i].mr_pid == 0)
1086                                         break;
1087                         if (i == env->me_maxreaders) {
1088                                 UNLOCK_MUTEX_R(env);
1089                                 return ENOMEM;
1090                         }
1091                         env->me_txns->mti_readers[i].mr_pid = pid;
1092                         env->me_txns->mti_readers[i].mr_tid = tid;
1093                         if (i >= env->me_txns->mti_numreaders)
1094                                 env->me_txns->mti_numreaders = i+1;
1095                         UNLOCK_MUTEX_R(env);
1096                         r = &env->me_txns->mti_readers[i];
1097                         pthread_setspecific(env->me_txkey, r);
1098                 }
1099                 txn->mt_txnid = env->me_txns->mti_txnid;
1100                 txn->mt_toggle = env->me_txns->mti_me_toggle;
1101                 r->mr_txnid = txn->mt_txnid;
1102                 txn->mt_u.reader = r;
1103         } else {
1104                 LOCK_MUTEX_W(env);
1105
1106                 txn->mt_txnid = env->me_txns->mti_txnid+1;
1107                 txn->mt_toggle = env->me_txns->mti_me_toggle;
1108                 txn->mt_u.dirty_list = env->me_dirty_list;
1109                 txn->mt_u.dirty_list[0].mid = 0;
1110                 txn->mt_free_pgs = env->me_free_pgs;
1111                 txn->mt_free_pgs[0] = 0;
1112                 txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
1113                 env->me_txn = txn;
1114         }
1115
1116         /* Copy the DB arrays */
1117         LAZY_RWLOCK_RDLOCK(&env->me_dblock);
1118         txn->mt_numdbs = env->me_numdbs;
1119         txn->mt_dbxs = env->me_dbxs;    /* mostly static anyway */
1120         memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
1121         if (txn->mt_numdbs > 2)
1122                 memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
1123                         (txn->mt_numdbs - 2) * sizeof(MDB_db));
1124         LAZY_RWLOCK_UNLOCK(&env->me_dblock);
1125
1126         return MDB_SUCCESS;
1127 }
1128
1129 int
1130 mdb_txn_renew(MDB_txn *txn)
1131 {
1132         int rc;
1133
1134         if (!txn)
1135                 return EINVAL;
1136
1137         if (txn->mt_env->me_flags & MDB_FATAL_ERROR) {
1138                 DPUTS("environment had fatal error, must shutdown!");
1139                 return MDB_PANIC;
1140         }
1141
1142         rc = mdb_txn_renew0(txn);
1143         if (rc == MDB_SUCCESS) {
1144                 DPRINTF("renew txn %lu%c %p on mdbenv %p, root page %lu",
1145                         txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
1146                         (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
1147         }
1148         return rc;
1149 }
1150
1151 int
1152 mdb_txn_begin(MDB_env *env, unsigned int flags, MDB_txn **ret)
1153 {
1154         MDB_txn *txn;
1155         int rc;
1156
1157         if (env->me_flags & MDB_FATAL_ERROR) {
1158                 DPUTS("environment had fatal error, must shutdown!");
1159                 return MDB_PANIC;
1160         }
1161         if ((txn = calloc(1, sizeof(MDB_txn) + env->me_maxdbs * sizeof(MDB_db))) == NULL) {
1162                 DPRINTF("calloc: %s", strerror(ErrCode()));
1163                 return ENOMEM;
1164         }
1165         txn->mt_dbs = (MDB_db *)(txn+1);
1166         if (flags & MDB_RDONLY) {
1167                 txn->mt_flags |= MDB_TXN_RDONLY;
1168         }
1169         txn->mt_env = env;
1170
1171         rc = mdb_txn_renew0(txn);
1172         if (rc)
1173                 free(txn);
1174         else {
1175                 *ret = txn;
1176                 DPRINTF("begin txn %lu%c %p on mdbenv %p, root page %lu",
1177                         txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
1178                         (void *) env, txn->mt_dbs[MAIN_DBI].md_root);
1179         }
1180
1181         return rc;
1182 }
1183
1184 static inline void
1185 mdb_txn_reset0(MDB_txn *txn)
1186 {
1187         MDB_env *env = txn->mt_env;
1188
1189         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
1190                 txn->mt_u.reader->mr_txnid = 0;
1191         } else {
1192                 MDB_oldpages *mop;
1193                 MDB_page *dp;
1194                 unsigned int i;
1195
1196                 /* return all dirty pages to dpage list */
1197                 for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
1198                         dp = txn->mt_u.dirty_list[i].mptr;
1199                         if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
1200                                 dp->mp_next = txn->mt_env->me_dpages;
1201                                 txn->mt_env->me_dpages = dp;
1202                         } else {
1203                                 /* large pages just get freed directly */
1204                                 free(dp);
1205                         }
1206                 }
1207
1208                 while ((mop = txn->mt_env->me_pghead)) {
1209                         txn->mt_env->me_pghead = mop->mo_next;
1210                         free(mop);
1211                 }
1212
1213                 env->me_txn = NULL;
1214                 for (i=2; i<env->me_numdbs; i++)
1215                         env->me_dbxs[i].md_dirty = 0;
1216                 UNLOCK_MUTEX_W(env);
1217         }
1218 }
1219
1220 void
1221 mdb_txn_reset(MDB_txn *txn)
1222 {
1223         if (txn == NULL)
1224                 return;
1225
1226         DPRINTF("reset txn %lu%c %p on mdbenv %p, root page %lu",
1227                 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
1228                 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
1229
1230         mdb_txn_reset0(txn);
1231 }
1232
1233 void
1234 mdb_txn_abort(MDB_txn *txn)
1235 {
1236         if (txn == NULL)
1237                 return;
1238
1239         DPRINTF("abort txn %lu%c %p on mdbenv %p, root page %lu",
1240                 txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', txn,
1241                 (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root);
1242
1243         mdb_txn_reset0(txn);
1244         free(txn);
1245 }
1246
1247 int
1248 mdb_txn_commit(MDB_txn *txn)
1249 {
1250         int              n, done;
1251         unsigned int i;
1252         ssize_t          rc;
1253         off_t            size;
1254         MDB_page        *dp;
1255         MDB_env *env;
1256         pgno_t  next;
1257         MDB_cursor mc;
1258
1259         assert(txn != NULL);
1260         assert(txn->mt_env != NULL);
1261
1262         env = txn->mt_env;
1263
1264         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
1265                 mdb_txn_abort(txn);
1266                 return MDB_SUCCESS;
1267         }
1268
1269         if (txn != env->me_txn) {
1270                 DPUTS("attempt to commit unknown transaction");
1271                 mdb_txn_abort(txn);
1272                 return EINVAL;
1273         }
1274
1275         if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) {
1276                 DPUTS("error flag is set, can't commit");
1277                 mdb_txn_abort(txn);
1278                 return EINVAL;
1279         }
1280
1281         if (!txn->mt_u.dirty_list[0].mid)
1282                 goto done;
1283
1284         DPRINTF("committing txn %lu %p on mdbenv %p, root page %lu",
1285             txn->mt_txnid, txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root);
1286
1287         mc.mc_txn = txn;
1288         mc.mc_dbi = FREE_DBI;
1289         mc.mc_flags = 0;
1290
1291         /* should only be one record now */
1292         if (env->me_pghead) {
1293                 /* make sure first page of freeDB is touched and on freelist */
1294                 mdb_search_page(&mc, NULL, 1);
1295         }
1296         /* save to free list */
1297         if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
1298                 MDB_val key, data;
1299                 ULONG i;
1300
1301                 /* make sure last page of freeDB is touched and on freelist */
1302                 key.mv_size = MAXKEYSIZE+1;
1303                 key.mv_data = NULL;
1304                 mdb_search_page(&mc, &key, 1);
1305
1306                 mdb_midl_sort(txn->mt_free_pgs);
1307 #if DEBUG > 1
1308                 {
1309                         unsigned int i;
1310                         ULONG *idl = txn->mt_free_pgs;
1311                         DPRINTF("IDL write txn %lu root %lu num %lu",
1312                                 txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, idl[0]);
1313                         for (i=0; i<idl[0]; i++) {
1314                                 DPRINTF("IDL %lu", idl[i+1]);
1315                         }
1316                 }
1317 #endif
1318                 /* write to last page of freeDB */
1319                 key.mv_size = sizeof(pgno_t);
1320                 key.mv_data = (char *)&txn->mt_txnid;
1321                 data.mv_data = txn->mt_free_pgs;
1322                 /* The free list can still grow during this call,
1323                  * despite the pre-emptive touches above. So check
1324                  * and make sure the entire thing got written.
1325                  */
1326                 do {
1327                         i = txn->mt_free_pgs[0];
1328                         data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
1329                         rc = mdb_cursor_put(&mc, &key, &data, 0);
1330                         if (rc) {
1331                                 mdb_txn_abort(txn);
1332                                 return rc;
1333                         }
1334                 } while (i != txn->mt_free_pgs[0]);
1335         }
1336         /* should only be one record now */
1337         if (env->me_pghead) {
1338                 MDB_val key, data;
1339                 MDB_oldpages *mop;
1340
1341                 mop = env->me_pghead;
1342                 key.mv_size = sizeof(pgno_t);
1343                 key.mv_data = (char *)&mop->mo_txnid;
1344                 data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
1345                 data.mv_data = mop->mo_pages;
1346                 mdb_cursor_put(&mc, &key, &data, 0);
1347                 free(env->me_pghead);
1348                 env->me_pghead = NULL;
1349         }
1350
1351         /* Update DB root pointers. Their pages have already been
1352          * touched so this is all in-place and cannot fail.
1353          */
1354         {
1355                 MDB_val data;
1356                 data.mv_size = sizeof(MDB_db);
1357
1358                 mc.mc_dbi = MAIN_DBI;
1359                 mc.mc_flags = 0;
1360                 for (i = 2; i < txn->mt_numdbs; i++) {
1361                         if (txn->mt_dbxs[i].md_dirty) {
1362                                 data.mv_data = &txn->mt_dbs[i];
1363                                 mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0);
1364                         }
1365                 }
1366         }
1367
1368         /* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
1369          */
1370         next = 0;
1371         i = 1;
1372         do {
1373 #ifdef _WIN32
1374                 /* Windows actually supports scatter/gather I/O, but only on
1375                  * unbuffered file handles. Since we're relying on the OS page
1376                  * cache for all our data, that's self-defeating. So we just
1377                  * write pages one at a time. We use the ov structure to set
1378                  * the write offset, to at least save the overhead of a Seek
1379                  * system call.
1380                  */
1381                 OVERLAPPED ov;
1382                 memset(&ov, 0, sizeof(ov));
1383                 for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
1384                         size_t wsize;
1385                         dp = txn->mt_u.dirty_list[i].mptr;
1386                         DPRINTF("committing page %lu", dp->mp_pgno);
1387                         size = dp->mp_pgno * env->me_psize;
1388                         ov.Offset = size & 0xffffffff;
1389                         ov.OffsetHigh = size >> 16;
1390                         ov.OffsetHigh >>= 16;
1391                         /* clear dirty flag */
1392                         dp->mp_flags &= ~P_DIRTY;
1393                         wsize = env->me_psize;
1394                         if (IS_OVERFLOW(dp)) wsize *= dp->mp_pages;
1395                         rc = WriteFile(env->me_fd, dp, wsize, NULL, &ov);
1396                         if (!rc) {
1397                                 n = ErrCode();
1398                                 DPRINTF("WriteFile: %d", n);
1399                                 mdb_txn_abort(txn);
1400                                 return n;
1401                         }
1402                 }
1403                 done = 1;;
1404 #else
1405                 struct iovec     iov[MDB_COMMIT_PAGES];
1406                 n = 0;
1407                 done = 1;
1408                 size = 0;
1409                 for (; i<=txn->mt_u.dirty_list[0].mid; i++) {
1410                         dp = txn->mt_u.dirty_list[i].mptr;
1411                         if (dp->mp_pgno != next) {
1412                                 if (n) {
1413                                         DPRINTF("committing %u dirty pages", n);
1414                                         rc = writev(env->me_fd, iov, n);
1415                                         if (rc != size) {
1416                                                 n = ErrCode();
1417                                                 if (rc > 0)
1418                                                         DPUTS("short write, filesystem full?");
1419                                                 else
1420                                                         DPRINTF("writev: %s", strerror(n));
1421                                                 mdb_txn_abort(txn);
1422                                                 return n;
1423                                         }
1424                                         n = 0;
1425                                         size = 0;
1426                                 }
1427                                 lseek(env->me_fd, dp->mp_pgno * env->me_psize, SEEK_SET);
1428                                 next = dp->mp_pgno;
1429                         }
1430                         DPRINTF("committing page %lu", dp->mp_pgno);
1431                         iov[n].iov_len = env->me_psize;
1432                         if (IS_OVERFLOW(dp)) iov[n].iov_len *= dp->mp_pages;
1433                         iov[n].iov_base = dp;
1434                         size += iov[n].iov_len;
1435                         next = dp->mp_pgno + (IS_OVERFLOW(dp) ? dp->mp_pages : 1);
1436                         /* clear dirty flag */
1437                         dp->mp_flags &= ~P_DIRTY;
1438                         if (++n >= MDB_COMMIT_PAGES) {
1439                                 done = 0;
1440                                 i++;
1441                                 break;
1442                         }
1443                 }
1444
1445                 if (n == 0)
1446                         break;
1447
1448                 DPRINTF("committing %u dirty pages", n);
1449                 rc = writev(env->me_fd, iov, n);
1450                 if (rc != size) {
1451                         n = ErrCode();
1452                         if (rc > 0)
1453                                 DPUTS("short write, filesystem full?");
1454                         else
1455                                 DPRINTF("writev: %s", strerror(n));
1456                         mdb_txn_abort(txn);
1457                         return n;
1458                 }
1459 #endif
1460         } while (!done);
1461
1462         /* Drop the dirty pages.
1463          */
1464         for (i=1; i<=txn->mt_u.dirty_list[0].mid; i++) {
1465                 dp = txn->mt_u.dirty_list[i].mptr;
1466                 if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
1467                         dp->mp_next = txn->mt_env->me_dpages;
1468                         txn->mt_env->me_dpages = dp;
1469                 } else {
1470                         free(dp);
1471                 }
1472                 txn->mt_u.dirty_list[i].mid = 0;
1473         }
1474         txn->mt_u.dirty_list[0].mid = 0;
1475
1476         if ((n = mdb_env_sync(env, 0)) != 0 ||
1477             (n = mdb_env_write_meta(txn)) != MDB_SUCCESS) {
1478                 mdb_txn_abort(txn);
1479                 return n;
1480         }
1481
1482 done:
1483         env->me_txn = NULL;
1484         /* update the DB tables */
1485         {
1486                 int toggle = !env->me_db_toggle;
1487                 MDB_db *ip, *jp;
1488
1489                 ip = &env->me_dbs[toggle][2];
1490                 jp = &txn->mt_dbs[2];
1491                 LAZY_RWLOCK_WRLOCK(&env->me_dblock);
1492                 for (i = 2; i < txn->mt_numdbs; i++) {
1493                         if (ip->md_root != jp->md_root)
1494                                 *ip = *jp;
1495                         ip++; jp++;
1496                 }
1497
1498                 for (i = 2; i < txn->mt_numdbs; i++) {
1499                         if (txn->mt_dbxs[i].md_dirty)
1500                                 txn->mt_dbxs[i].md_dirty = 0;
1501                 }
1502                 env->me_db_toggle = toggle;
1503                 env->me_numdbs = txn->mt_numdbs;
1504                 LAZY_RWLOCK_UNLOCK(&env->me_dblock);
1505         }
1506
1507         UNLOCK_MUTEX_W(env);
1508         free(txn);
1509
1510         return MDB_SUCCESS;
1511 }
1512
1513 static int
1514 mdb_env_read_header(MDB_env *env, MDB_meta *meta)
1515 {
1516         char             page[PAGESIZE];
1517         MDB_page        *p;
1518         MDB_meta        *m;
1519         int              rc, err;
1520
1521         /* We don't know the page size yet, so use a minimum value.
1522          */
1523
1524 #ifdef _WIN32
1525         if (!ReadFile(env->me_fd, page, PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
1526 #else
1527         if ((rc = read(env->me_fd, page, PAGESIZE)) == 0)
1528 #endif
1529         {
1530                 return ENOENT;
1531         }
1532         else if (rc != PAGESIZE) {
1533                 err = ErrCode();
1534                 if (rc > 0)
1535                         err = EINVAL;
1536                 DPRINTF("read: %s", strerror(err));
1537                 return err;
1538         }
1539
1540         p = (MDB_page *)page;
1541
1542         if (!F_ISSET(p->mp_flags, P_META)) {
1543                 DPRINTF("page %lu not a meta page", p->mp_pgno);
1544                 return EINVAL;
1545         }
1546
1547         m = METADATA(p);
1548         if (m->mm_magic != MDB_MAGIC) {
1549                 DPUTS("meta has invalid magic");
1550                 return EINVAL;
1551         }
1552
1553         if (m->mm_version != MDB_VERSION) {
1554                 DPRINTF("database is version %u, expected version %u",
1555                     m->mm_version, MDB_VERSION);
1556                 return MDB_VERSION_MISMATCH;
1557         }
1558
1559         memcpy(meta, m, sizeof(*m));
1560         return 0;
1561 }
1562
1563 static int
1564 mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
1565 {
1566         MDB_page *p, *q;
1567         MDB_meta *m;
1568         int rc;
1569         unsigned int     psize;
1570
1571         DPUTS("writing new meta page");
1572
1573         GetPageSize(psize);
1574
1575         meta->mm_magic = MDB_MAGIC;
1576         meta->mm_version = MDB_VERSION;
1577         meta->mm_psize = psize;
1578         meta->mm_last_pg = 1;
1579         meta->mm_flags = env->me_flags & 0xffff;
1580         meta->mm_flags |= MDB_INTEGERKEY;
1581         meta->mm_dbs[0].md_root = P_INVALID;
1582         meta->mm_dbs[1].md_root = P_INVALID;
1583
1584         p = calloc(2, psize);
1585         p->mp_pgno = 0;
1586         p->mp_flags = P_META;
1587
1588         m = METADATA(p);
1589         memcpy(m, meta, sizeof(*meta));
1590
1591         q = (MDB_page *)((char *)p + psize);
1592
1593         q->mp_pgno = 1;
1594         q->mp_flags = P_META;
1595
1596         m = METADATA(q);
1597         memcpy(m, meta, sizeof(*meta));
1598
1599 #ifdef _WIN32
1600         {
1601                 DWORD len;
1602                 rc = WriteFile(env->me_fd, p, psize * 2, &len, NULL);
1603                 rc = (len == psize * 2) ? MDB_SUCCESS : ErrCode();
1604         }
1605 #else
1606         rc = write(env->me_fd, p, psize * 2);
1607         rc = (rc == (int)psize * 2) ? MDB_SUCCESS : ErrCode();
1608 #endif
1609         free(p);
1610         return rc;
1611 }
1612
1613 static int
1614 mdb_env_write_meta(MDB_txn *txn)
1615 {
1616         MDB_env *env;
1617         MDB_meta        meta, metab;
1618         off_t off;
1619         int rc, len, toggle;
1620         char *ptr;
1621 #ifdef _WIN32
1622         OVERLAPPED ov;
1623 #endif
1624
1625         assert(txn != NULL);
1626         assert(txn->mt_env != NULL);
1627
1628         toggle = !txn->mt_toggle;
1629         DPRINTF("writing meta page %d for root page %lu",
1630                 toggle, txn->mt_dbs[MAIN_DBI].md_root);
1631
1632         env = txn->mt_env;
1633
1634         metab.mm_txnid = env->me_metas[toggle]->mm_txnid;
1635         metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg;
1636
1637         ptr = (char *)&meta;
1638         off = offsetof(MDB_meta, mm_dbs[0].md_depth);
1639         len = sizeof(MDB_meta) - off;
1640
1641         ptr += off;
1642         meta.mm_dbs[0] = txn->mt_dbs[0];
1643         meta.mm_dbs[1] = txn->mt_dbs[1];
1644         meta.mm_last_pg = txn->mt_next_pgno - 1;
1645         meta.mm_txnid = txn->mt_txnid;
1646
1647         if (toggle)
1648                 off += env->me_psize;
1649         off += PAGEHDRSZ;
1650
1651         /* Write to the SYNC fd */
1652 #ifdef _WIN32
1653         {
1654                 memset(&ov, 0, sizeof(ov));
1655                 ov.Offset = off;
1656                 WriteFile(env->me_mfd, ptr, len, (DWORD *)&rc, &ov);
1657         }
1658 #else
1659         rc = pwrite(env->me_mfd, ptr, len, off);
1660 #endif
1661         if (rc != len) {
1662                 int r2;
1663                 rc = ErrCode();
1664                 DPUTS("write failed, disk error?");
1665                 /* On a failure, the pagecache still contains the new data.
1666                  * Write some old data back, to prevent it from being used.
1667                  * Use the non-SYNC fd; we know it will fail anyway.
1668                  */
1669                 meta.mm_last_pg = metab.mm_last_pg;
1670                 meta.mm_txnid = metab.mm_txnid;
1671 #ifdef _WIN32
1672                 WriteFile(env->me_fd, ptr, len, NULL, &ov);
1673 #else
1674                 r2 = pwrite(env->me_fd, ptr, len, off);
1675 #endif
1676                 env->me_flags |= MDB_FATAL_ERROR;
1677                 return rc;
1678         }
1679         /* Memory ordering issues are irrelevant; since the entire writer
1680          * is wrapped by wmutex, all of these changes will become visible
1681          * after the wmutex is unlocked. Since the DB is multi-version,
1682          * readers will get consistent data regardless of how fresh or
1683          * how stale their view of these values is.
1684          */
1685         LAZY_MUTEX_LOCK(&env->me_txns->mti_mutex);
1686         txn->mt_env->me_txns->mti_me_toggle = toggle;
1687         txn->mt_env->me_txns->mti_txnid = txn->mt_txnid;
1688         LAZY_MUTEX_UNLOCK(&env->me_txns->mti_mutex);
1689
1690         return MDB_SUCCESS;
1691 }
1692
1693 static int
1694 mdb_env_read_meta(MDB_env *env, int *which)
1695 {
1696         int toggle = 0;
1697
1698         assert(env != NULL);
1699
1700         if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
1701                 toggle = 1;
1702
1703         DPRINTF("Using meta page %d", toggle);
1704         *which = toggle;
1705
1706         return MDB_SUCCESS;
1707 }
1708
1709 int
1710 mdb_env_create(MDB_env **env)
1711 {
1712         MDB_env *e;
1713
1714         e = calloc(1, sizeof(MDB_env));
1715         if (!e) return ENOMEM;
1716
1717         e->me_maxreaders = DEFAULT_READERS;
1718         e->me_maxdbs = 2;
1719         e->me_fd = INVALID_HANDLE_VALUE;
1720         e->me_lfd = INVALID_HANDLE_VALUE;
1721         e->me_mfd = INVALID_HANDLE_VALUE;
1722         *env = e;
1723         return MDB_SUCCESS;
1724 }
1725
1726 int
1727 mdb_env_set_mapsize(MDB_env *env, size_t size)
1728 {
1729         if (env->me_map)
1730                 return EINVAL;
1731         env->me_mapsize = size;
1732         return MDB_SUCCESS;
1733 }
1734
1735 int
1736 mdb_env_set_maxdbs(MDB_env *env, int dbs)
1737 {
1738         if (env->me_map)
1739                 return EINVAL;
1740         env->me_maxdbs = dbs;
1741         return MDB_SUCCESS;
1742 }
1743
1744 int
1745 mdb_env_set_maxreaders(MDB_env *env, int readers)
1746 {
1747         if (env->me_map)
1748                 return EINVAL;
1749         env->me_maxreaders = readers;
1750         return MDB_SUCCESS;
1751 }
1752
1753 int
1754 mdb_env_get_maxreaders(MDB_env *env, int *readers)
1755 {
1756         if (!env || !readers)
1757                 return EINVAL;
1758         *readers = env->me_maxreaders;
1759         return MDB_SUCCESS;
1760 }
1761
1762 static int
1763 mdb_env_open2(MDB_env *env, unsigned int flags)
1764 {
1765         int i, newenv = 0, toggle;
1766         MDB_meta meta;
1767         MDB_page *p;
1768
1769         env->me_flags = flags;
1770
1771         memset(&meta, 0, sizeof(meta));
1772
1773         if ((i = mdb_env_read_header(env, &meta)) != 0) {
1774                 if (i != ENOENT)
1775                         return i;
1776                 DPUTS("new mdbenv");
1777                 newenv = 1;
1778         }
1779
1780         if (!env->me_mapsize) {
1781                 env->me_mapsize = newenv ? DEFAULT_MAPSIZE : meta.mm_mapsize;
1782         }
1783
1784 #ifdef _WIN32
1785         {
1786                 HANDLE mh;
1787                 LONG sizelo, sizehi;
1788                 sizelo = env->me_mapsize & 0xffffffff;
1789                 sizehi = env->me_mapsize >> 16;         /* pointless on WIN32, only needed on W64 */
1790                 sizehi >>= 16;
1791                 /* Windows won't create mappings for zero length files.
1792                  * Just allocate the maxsize right now.
1793                  */
1794                 if (newenv) {
1795                         SetFilePointer(env->me_fd, sizelo, sizehi ? &sizehi : NULL, 0);
1796                         if (!SetEndOfFile(env->me_fd))
1797                                 return ErrCode();
1798                         SetFilePointer(env->me_fd, 0, NULL, 0);
1799                 }
1800                 mh = CreateFileMapping(env->me_fd, NULL, PAGE_READONLY,
1801                         sizehi, sizelo, NULL);
1802                 if (!mh)
1803                         return ErrCode();
1804                 env->me_map = MapViewOfFileEx(mh, FILE_MAP_READ, 0, 0, env->me_mapsize,
1805                         meta.mm_address);
1806                 CloseHandle(mh);
1807                 if (!env->me_map)
1808                         return ErrCode();
1809         }
1810 #else
1811         i = MAP_SHARED;
1812         if (meta.mm_address && (flags & MDB_FIXEDMAP))
1813                 i |= MAP_FIXED;
1814         env->me_map = mmap(meta.mm_address, env->me_mapsize, PROT_READ, i,
1815                 env->me_fd, 0);
1816         if (env->me_map == MAP_FAILED)
1817                 return ErrCode();
1818 #endif
1819
1820         if (newenv) {
1821                 meta.mm_mapsize = env->me_mapsize;
1822                 if (flags & MDB_FIXEDMAP)
1823                         meta.mm_address = env->me_map;
1824                 i = mdb_env_init_meta(env, &meta);
1825                 if (i != MDB_SUCCESS) {
1826                         munmap(env->me_map, env->me_mapsize);
1827                         return i;
1828                 }
1829         }
1830         env->me_psize = meta.mm_psize;
1831
1832         env->me_maxpg = env->me_mapsize / env->me_psize;
1833
1834         p = (MDB_page *)env->me_map;
1835         env->me_metas[0] = METADATA(p);
1836         env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize);
1837
1838         if ((i = mdb_env_read_meta(env, &toggle)) != 0)
1839                 return i;
1840
1841         DPRINTF("opened database version %u, pagesize %u",
1842             env->me_metas[toggle]->mm_version, env->me_psize);
1843         DPRINTF("depth: %u", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_depth);
1844         DPRINTF("entries: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_entries);
1845         DPRINTF("branch pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_branch_pages);
1846         DPRINTF("leaf pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_leaf_pages);
1847         DPRINTF("overflow pages: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_overflow_pages);
1848         DPRINTF("root: %lu", env->me_metas[toggle]->mm_dbs[MAIN_DBI].md_root);
1849
1850         return MDB_SUCCESS;
1851 }
1852
1853 #ifndef _WIN32
1854 /* Windows doesn't support destructor callbacks for thread-specific storage */
1855 static void
1856 mdb_env_reader_dest(void *ptr)
1857 {
1858         MDB_reader *reader = ptr;
1859
1860         reader->mr_txnid = 0;
1861         reader->mr_pid = 0;
1862         reader->mr_tid = 0;
1863 }
1864 #endif
1865
1866 /* downgrade the exclusive lock on the region back to shared */
1867 static void
1868 mdb_env_share_locks(MDB_env *env)
1869 {
1870         int toggle = 0;
1871
1872         if (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid)
1873                 toggle = 1;
1874         env->me_txns->mti_me_toggle = toggle;
1875         env->me_txns->mti_txnid = env->me_metas[toggle]->mm_txnid;
1876
1877 #ifdef _WIN32
1878         {
1879                 OVERLAPPED ov;
1880                 /* First acquire a shared lock. The Unlock will
1881                  * then release the existing exclusive lock.
1882                  */
1883                 memset(&ov, 0, sizeof(ov));
1884                 LockFileEx(env->me_lfd, 0, 0, 1, 0, &ov);
1885                 UnlockFile(env->me_lfd, 0, 0, 1, 0);
1886         }
1887 #else
1888         {
1889                 struct flock lock_info;
1890                 /* The shared lock replaces the existing lock */
1891                 memset((void *)&lock_info, 0, sizeof(lock_info));
1892                 lock_info.l_type = F_RDLCK;
1893                 lock_info.l_whence = SEEK_SET;
1894                 lock_info.l_start = 0;
1895                 lock_info.l_len = 1;
1896                 fcntl(env->me_lfd, F_SETLK, &lock_info);
1897         }
1898 #endif
1899 }
1900
1901 static int
1902 mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
1903 {
1904         int rc;
1905         off_t size, rsize;
1906
1907         *excl = 0;
1908
1909 #ifdef _WIN32
1910         if ((env->me_lfd = CreateFile(lpath, GENERIC_READ|GENERIC_WRITE,
1911                 FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS,
1912                 FILE_ATTRIBUTE_NORMAL, NULL)) == INVALID_HANDLE_VALUE) {
1913                 rc = ErrCode();
1914                 return rc;
1915         }
1916         /* Try to get exclusive lock. If we succeed, then
1917          * nobody is using the lock region and we should initialize it.
1918          */
1919         {
1920                 if (LockFile(env->me_lfd, 0, 0, 1, 0)) {
1921                         *excl = 1;
1922                 } else {
1923                         OVERLAPPED ov;
1924                         memset(&ov, 0, sizeof(ov));
1925                         if (!LockFileEx(env->me_lfd, 0, 0, 1, 0, &ov)) {
1926                                 rc = ErrCode();
1927                                 goto fail;
1928                         }
1929                 }
1930         }
1931         size = GetFileSize(env->me_lfd, NULL);
1932 #else
1933         if ((env->me_lfd = open(lpath, O_RDWR|O_CREAT, mode)) == -1) {
1934                 rc = ErrCode();
1935                 return rc;
1936         }
1937         /* Try to get exclusive lock. If we succeed, then
1938          * nobody is using the lock region and we should initialize it.
1939          */
1940         {
1941                 struct flock lock_info;
1942                 memset((void *)&lock_info, 0, sizeof(lock_info));
1943                 lock_info.l_type = F_WRLCK;
1944                 lock_info.l_whence = SEEK_SET;
1945                 lock_info.l_start = 0;
1946                 lock_info.l_len = 1;
1947                 rc = fcntl(env->me_lfd, F_SETLK, &lock_info);
1948                 if (rc == 0) {
1949                         *excl = 1;
1950                 } else {
1951                         lock_info.l_type = F_RDLCK;
1952                         rc = fcntl(env->me_lfd, F_SETLKW, &lock_info);
1953                         if (rc) {
1954                                 rc = ErrCode();
1955                                 goto fail;
1956                         }
1957                 }
1958         }
1959         size = lseek(env->me_lfd, 0, SEEK_END);
1960 #endif
1961         rsize = (env->me_maxreaders-1) * sizeof(MDB_reader) + sizeof(MDB_txninfo);
1962         if (size < rsize && *excl) {
1963 #ifdef _WIN32
1964                 SetFilePointer(env->me_lfd, rsize, NULL, 0);
1965                 if (!SetEndOfFile(env->me_lfd)) {
1966                         rc = ErrCode();
1967                         goto fail;
1968                 }
1969 #else
1970                 if (ftruncate(env->me_lfd, rsize) != 0) {
1971                         rc = ErrCode();
1972                         goto fail;
1973                 }
1974 #endif
1975         } else {
1976                 rsize = size;
1977                 size = rsize - sizeof(MDB_txninfo);
1978                 env->me_maxreaders = size/sizeof(MDB_reader) + 1;
1979         }
1980 #ifdef _WIN32
1981         {
1982                 HANDLE mh;
1983                 mh = CreateFileMapping(env->me_lfd, NULL, PAGE_READWRITE,
1984                         0, 0, NULL);
1985                 if (!mh) {
1986                         rc = ErrCode();
1987                         goto fail;
1988                 }
1989                 env->me_txns = MapViewOfFileEx(mh, FILE_MAP_WRITE, 0, 0, rsize, NULL);
1990                 CloseHandle(mh);
1991                 if (!env->me_txns) {
1992                         rc = ErrCode();
1993                         goto fail;
1994                 }
1995         }
1996 #else
1997         env->me_txns = mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
1998                 env->me_lfd, 0);
1999         if (env->me_txns == MAP_FAILED) {
2000                 rc = ErrCode();
2001                 goto fail;
2002         }
2003 #endif
2004         if (*excl) {
2005 #ifdef _WIN32
2006                 char *ptr;
2007                 if (!mdb_sec_inited) {
2008                         InitializeSecurityDescriptor(&mdb_null_sd,
2009                                 SECURITY_DESCRIPTOR_REVISION);
2010                         SetSecurityDescriptorDacl(&mdb_null_sd, TRUE, 0, FALSE);
2011                         mdb_all_sa.nLength = sizeof(SECURITY_ATTRIBUTES);
2012                         mdb_all_sa.bInheritHandle = FALSE;
2013                         mdb_all_sa.lpSecurityDescriptor = &mdb_null_sd;
2014                         mdb_sec_inited = 1;
2015                 }
2016                 /* FIXME: only using up to 20 characters of the env path here,
2017                  * probably not enough to assure uniqueness...
2018                  */
2019                 sprintf(env->me_txns->mti_rmname, "Global\\MDBr%.20s", lpath);
2020                 ptr = env->me_txns->mti_rmname + sizeof("Global\\MDBr");
2021                 while ((ptr = strchr(ptr, '\\')))
2022                         *ptr++ = '/';
2023                 env->me_rmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_rmname);
2024                 if (!env->me_rmutex) {
2025                         rc = ErrCode();
2026                         goto fail;
2027                 }
2028                 sprintf(env->me_txns->mti_rmname, "Global\\MDBw%.20s", lpath);
2029                 ptr = env->me_txns->mti_rmname + sizeof("Global\\MDBw");
2030                 while ((ptr = strchr(ptr, '\\')))
2031                         *ptr++ = '/';
2032                 env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_rmname);
2033                 if (!env->me_wmutex) {
2034                         rc = ErrCode();
2035                         goto fail;
2036                 }
2037 #else
2038                 pthread_mutexattr_t mattr;
2039
2040                 pthread_mutexattr_init(&mattr);
2041                 rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
2042                 if (rc) {
2043                         goto fail;
2044                 }
2045                 pthread_mutex_init(&env->me_txns->mti_mutex, &mattr);
2046                 pthread_mutex_init(&env->me_txns->mti_wmutex, &mattr);
2047 #endif
2048                 env->me_txns->mti_version = MDB_VERSION;
2049                 env->me_txns->mti_magic = MDB_MAGIC;
2050                 env->me_txns->mti_txnid = 0;
2051                 env->me_txns->mti_numreaders = 0;
2052                 env->me_txns->mti_me_toggle = 0;
2053
2054         } else {
2055                 if (env->me_txns->mti_magic != MDB_MAGIC) {
2056                         DPUTS("lock region has invalid magic");
2057                         rc = EINVAL;
2058                         goto fail;
2059                 }
2060                 if (env->me_txns->mti_version != MDB_VERSION) {
2061                         DPRINTF("lock region is version %u, expected version %u",
2062                                 env->me_txns->mti_version, MDB_VERSION);
2063                         rc = MDB_VERSION_MISMATCH;
2064                         goto fail;
2065                 }
2066                 rc = ErrCode();
2067                 if (rc != EACCES && rc != EAGAIN) {
2068                         goto fail;
2069                 }
2070 #ifdef _WIN32
2071                 env->me_rmutex = OpenMutex(SYNCHRONIZE, FALSE, env->me_txns->mti_rmname);
2072                 if (!env->me_rmutex) {
2073                         rc = ErrCode();
2074                         goto fail;
2075                 }
2076                 env->me_wmutex = OpenMutex(SYNCHRONIZE, FALSE, env->me_txns->mti_wmname);
2077                 if (!env->me_wmutex) {
2078                         rc = ErrCode();
2079                         goto fail;
2080                 }
2081 #endif
2082         }
2083         return MDB_SUCCESS;
2084
2085 fail:
2086         close(env->me_lfd);
2087         env->me_lfd = INVALID_HANDLE_VALUE;
2088         return rc;
2089
2090 }
2091
2092         /** The name of the lock file in the DB environment */
2093 #define LOCKNAME        "/lock.mdb"
2094         /** The name of the data file in the DB environment */
2095 #define DATANAME        "/data.mdb"
2096 int
2097 mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
2098 {
2099         int             oflags, rc, len, excl;
2100         char *lpath, *dpath;
2101
2102         len = strlen(path);
2103         lpath = malloc(len + sizeof(LOCKNAME) + len + sizeof(DATANAME));
2104         if (!lpath)
2105                 return ENOMEM;
2106         dpath = lpath + len + sizeof(LOCKNAME);
2107         sprintf(lpath, "%s" LOCKNAME, path);
2108         sprintf(dpath, "%s" DATANAME, path);
2109
2110         rc = mdb_env_setup_locks(env, lpath, mode, &excl);
2111         if (rc)
2112                 goto leave;
2113
2114 #ifdef _WIN32
2115         if (F_ISSET(flags, MDB_RDONLY)) {
2116                 oflags = GENERIC_READ;
2117                 len = OPEN_EXISTING;
2118         } else {
2119                 oflags = GENERIC_READ|GENERIC_WRITE;
2120                 len = OPEN_ALWAYS;
2121         }
2122         mode = FILE_ATTRIBUTE_NORMAL;
2123         if ((env->me_fd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE,
2124                         NULL, len, mode, NULL)) == INVALID_HANDLE_VALUE) {
2125                 rc = ErrCode();
2126                 goto leave;
2127         }
2128 #else
2129         if (F_ISSET(flags, MDB_RDONLY))
2130                 oflags = O_RDONLY;
2131         else
2132                 oflags = O_RDWR | O_CREAT;
2133
2134         if ((env->me_fd = open(dpath, oflags, mode)) == -1) {
2135                 rc = ErrCode();
2136                 goto leave;
2137         }
2138 #endif
2139
2140         if ((rc = mdb_env_open2(env, flags)) == MDB_SUCCESS) {
2141                 /* synchronous fd for meta writes */
2142 #ifdef _WIN32
2143                 if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
2144                         mode |= FILE_FLAG_WRITE_THROUGH;
2145                 if ((env->me_mfd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE,
2146                         NULL, len, mode, NULL)) == INVALID_HANDLE_VALUE) {
2147                         rc = ErrCode();
2148                         goto leave;
2149                 }
2150 #else
2151                 if (!(flags & (MDB_RDONLY|MDB_NOSYNC)))
2152                         oflags |= MDB_DSYNC;
2153                 if ((env->me_mfd = open(dpath, oflags, mode)) == -1) {
2154                         rc = ErrCode();
2155                         goto leave;
2156                 }
2157 #endif
2158                 env->me_path = strdup(path);
2159                 DPRINTF("opened dbenv %p", (void *) env);
2160                 pthread_key_create(&env->me_txkey, mdb_env_reader_dest);
2161                 LAZY_RWLOCK_INIT(&env->me_dblock, NULL);
2162                 if (excl)
2163                         mdb_env_share_locks(env);
2164                 env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
2165                 env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
2166                 env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
2167                 env->me_numdbs = 2;
2168         }
2169
2170 leave:
2171         if (rc) {
2172                 if (env->me_fd != INVALID_HANDLE_VALUE) {
2173                         close(env->me_fd);
2174                         env->me_fd = INVALID_HANDLE_VALUE;
2175                 }
2176                 if (env->me_lfd != INVALID_HANDLE_VALUE) {
2177                         close(env->me_lfd);
2178                         env->me_lfd = INVALID_HANDLE_VALUE;
2179                 }
2180         }
2181         free(lpath);
2182         return rc;
2183 }
2184
2185 void
2186 mdb_env_close(MDB_env *env)
2187 {
2188         MDB_page *dp;
2189
2190         if (env == NULL)
2191                 return;
2192
2193         while (env->me_dpages) {
2194                 dp = env->me_dpages;
2195                 env->me_dpages = dp->mp_next;
2196                 free(dp);
2197         }
2198
2199         free(env->me_dbs[1]);
2200         free(env->me_dbs[0]);
2201         free(env->me_dbxs);
2202         free(env->me_path);
2203
2204         LAZY_RWLOCK_DESTROY(&env->me_dblock);
2205         pthread_key_delete(env->me_txkey);
2206
2207         if (env->me_map) {
2208                 munmap(env->me_map, env->me_mapsize);
2209         }
2210         close(env->me_mfd);
2211         close(env->me_fd);
2212         if (env->me_txns) {
2213                 pid_t pid = getpid();
2214                 unsigned int i;
2215                 for (i=0; i<env->me_txns->mti_numreaders; i++)
2216                         if (env->me_txns->mti_readers[i].mr_pid == pid)
2217                                 env->me_txns->mti_readers[i].mr_pid = 0;
2218                 munmap(env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
2219         }
2220         close(env->me_lfd);
2221         free(env);
2222 }
2223
2224 /* only for aligned ints */
2225 static int
2226 intcmp(const MDB_val *a, const MDB_val *b)
2227 {
2228         if (a->mv_size == sizeof(long))
2229         {
2230                 unsigned long *la, *lb;
2231                 la = a->mv_data;
2232                 lb = b->mv_data;
2233                 return *la - *lb;
2234         } else {
2235                 unsigned int *ia, *ib;
2236                 ia = a->mv_data;
2237                 ib = b->mv_data;
2238                 return *ia - *ib;
2239         }
2240 }
2241
2242 /* ints must always be the same size */
2243 static int
2244 cintcmp(const MDB_val *a, const MDB_val *b)
2245 {
2246 #if __BYTE_ORDER == __LITTLE_ENDIAN
2247         unsigned short *u, *c;
2248         int x;
2249
2250         u = a->mv_data + a->mv_size;
2251         c = b->mv_data + a->mv_size;
2252         do {
2253                 x = *--u - *--c;
2254         } while(!x && u > (unsigned short *)a->mv_data);
2255         return x;
2256 #else
2257         return memcmp(a->mv_data, b->mv_data, a->mv_size);
2258 #endif
2259 }
2260
2261 static int
2262 memncmp(const MDB_val *a, const MDB_val *b)
2263 {
2264         int diff, len_diff;
2265         unsigned int len;
2266
2267         len = a->mv_size;
2268         len_diff = a->mv_size - b->mv_size;
2269         if (len_diff > 0)
2270                 len = b->mv_size;
2271         diff = memcmp(a->mv_data, b->mv_data, len);
2272         return diff ? diff : len_diff;
2273 }
2274
2275 static int
2276 memnrcmp(const MDB_val *a, const MDB_val *b)
2277 {
2278         const unsigned char     *p1, *p2, *p1_lim;
2279         int diff, len_diff;
2280
2281         if (b->mv_size == 0)
2282                 return a->mv_size != 0;
2283         if (a->mv_size == 0)
2284                 return -1;
2285
2286         p1 = (const unsigned char *)a->mv_data + a->mv_size - 1;
2287         p2 = (const unsigned char *)b->mv_data + b->mv_size - 1;
2288
2289         len_diff = a->mv_size - b->mv_size;
2290         if (len_diff < 0)
2291                 p1_lim = p1 - a->mv_size;
2292         else
2293                 p1_lim = p1 - b->mv_size;
2294
2295         while (p1 > p1_lim) {
2296                 diff = *p1 - *p2;
2297                 if (diff)
2298                         return diff;
2299                 p1--;
2300                 p2--;
2301         }
2302         return len_diff;
2303 }
2304
2305 /* Search for key within a leaf page, using binary search.
2306  * Returns the smallest entry larger or equal to the key.
2307  * If exactp is non-null, stores whether the found entry was an exact match
2308  * in *exactp (1 or 0).
2309  * If kip is non-null, stores the index of the found entry in *kip.
2310  * If no entry larger or equal to the key is found, returns NULL.
2311  */
2312 static MDB_node *
2313 mdb_search_node(MDB_cursor *mc, MDB_val *key, int *exactp)
2314 {
2315         unsigned int     i = 0, nkeys;
2316         int              low, high;
2317         int              rc = 0;
2318         MDB_page *mp = mc->mc_pg[mc->mc_top];
2319         MDB_node        *node = NULL;
2320         MDB_val  nodekey;
2321         MDB_cmp_func *cmp;
2322         DKBUF;
2323
2324         nkeys = NUMKEYS(mp);
2325
2326         DPRINTF("searching %u keys in %s page %lu",
2327             nkeys, IS_LEAF(mp) ? "leaf" : "branch",
2328             mp->mp_pgno);
2329
2330         assert(nkeys > 0);
2331
2332         low = IS_LEAF(mp) ? 0 : 1;
2333         high = nkeys - 1;
2334         cmp = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_cmp;
2335         if (IS_LEAF2(mp)) {
2336                 nodekey.mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
2337                 node = NODEPTR(mp, 0);  /* fake */
2338         }
2339         while (low <= high) {
2340                 i = (low + high) >> 1;
2341
2342                 if (IS_LEAF2(mp)) {
2343                         nodekey.mv_data = LEAF2KEY(mp, i, nodekey.mv_size);
2344                 } else {
2345                         node = NODEPTR(mp, i);
2346
2347                         nodekey.mv_size = node->mn_ksize;
2348                         nodekey.mv_data = NODEKEY(node);
2349                 }
2350
2351                 rc = cmp(key, &nodekey);
2352
2353 #if DEBUG
2354                 if (IS_LEAF(mp))
2355                         DPRINTF("found leaf index %u [%s], rc = %i",
2356                             i, DKEY(&nodekey), rc);
2357                 else
2358                         DPRINTF("found branch index %u [%s -> %lu], rc = %i",
2359                             i, DKEY(&nodekey), NODEPGNO(node), rc);
2360 #endif
2361
2362                 if (rc == 0)
2363                         break;
2364                 if (rc > 0)
2365                         low = i + 1;
2366                 else
2367                         high = i - 1;
2368         }
2369
2370         if (rc > 0) {   /* Found entry is less than the key. */
2371                 i++;    /* Skip to get the smallest entry larger than key. */
2372                 if (!IS_LEAF2(mp))
2373                         node = NODEPTR(mp, i);
2374         }
2375         if (exactp)
2376                 *exactp = (rc == 0);
2377         /* store the key index */
2378         mc->mc_ki[mc->mc_top] = i;
2379         if (i >= nkeys)
2380                 /* There is no entry larger or equal to the key. */
2381                 return NULL;
2382
2383         /* nodeptr is fake for LEAF2 */
2384         return node;
2385 }
2386
2387 static void
2388 cursor_pop_page(MDB_cursor *mc)
2389 {
2390         MDB_page        *top;
2391
2392         if (mc->mc_snum) {
2393                 top = mc->mc_pg[mc->mc_top];
2394                 mc->mc_snum--;
2395                 if (mc->mc_snum)
2396                         mc->mc_top--;
2397
2398                 DPRINTF("popped page %lu off db %u cursor %p", top->mp_pgno,
2399                         mc->mc_dbi, (void *) mc);
2400         }
2401 }
2402
2403 static int
2404 cursor_push_page(MDB_cursor *mc, MDB_page *mp)
2405 {
2406         DPRINTF("pushing page %lu on db %u cursor %p", mp->mp_pgno,
2407                 mc->mc_dbi, (void *) mc);
2408
2409         if (mc->mc_snum >= CURSOR_STACK)
2410                 return ENOMEM;
2411
2412         mc->mc_top = mc->mc_snum++;
2413         mc->mc_pg[mc->mc_top] = mp;
2414         mc->mc_ki[mc->mc_top] = 0;
2415
2416         return MDB_SUCCESS;
2417 }
2418
2419 static int
2420 mdb_get_page(MDB_txn *txn, pgno_t pgno, MDB_page **ret)
2421 {
2422         MDB_page *p = NULL;
2423
2424         if (!F_ISSET(txn->mt_flags, MDB_TXN_RDONLY) && txn->mt_u.dirty_list[0].mid) {
2425                 unsigned x;
2426                 x = mdb_mid2l_search(txn->mt_u.dirty_list, pgno);
2427                 if (x <= txn->mt_u.dirty_list[0].mid && txn->mt_u.dirty_list[x].mid == pgno) {
2428                         p = txn->mt_u.dirty_list[x].mptr;
2429                 }
2430         }
2431         if (!p) {
2432                 if (pgno <= txn->mt_env->me_metas[txn->mt_toggle]->mm_last_pg)
2433                         p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno);
2434         }
2435         *ret = p;
2436         if (!p) {
2437                 DPRINTF("page %lu not found", pgno);
2438                 assert(p != NULL);
2439         }
2440         return (p != NULL) ? MDB_SUCCESS : MDB_PAGE_NOTFOUND;
2441 }
2442
2443 static int
2444 mdb_search_page_root(MDB_cursor *mc, MDB_val *key, int modify)
2445 {
2446         MDB_page        *mp = mc->mc_pg[mc->mc_top];
2447         DKBUF;
2448         int rc;
2449
2450
2451         while (IS_BRANCH(mp)) {
2452                 MDB_node        *node;
2453
2454                 DPRINTF("branch page %lu has %u keys", mp->mp_pgno, NUMKEYS(mp));
2455                 assert(NUMKEYS(mp) > 1);
2456                 DPRINTF("found index 0 to page %lu", NODEPGNO(NODEPTR(mp, 0)));
2457
2458                 if (key == NULL)        /* Initialize cursor to first page. */
2459                         mc->mc_ki[mc->mc_top] = 0;
2460                 else if (key->mv_size > MAXKEYSIZE && key->mv_data == NULL) {
2461                                                         /* cursor to last page */
2462                         mc->mc_ki[mc->mc_top] = NUMKEYS(mp)-1;
2463                 } else {
2464                         int      exact;
2465                         node = mdb_search_node(mc, key, &exact);
2466                         if (node == NULL)
2467                                 mc->mc_ki[mc->mc_top] = NUMKEYS(mp) - 1;
2468                         else if (!exact) {
2469                                 assert(mc->mc_ki[mc->mc_top] > 0);
2470                                 mc->mc_ki[mc->mc_top]--;
2471                         }
2472                 }
2473
2474                 if (key)
2475                         DPRINTF("following index %u for key [%s]",
2476                             mc->mc_ki[mc->mc_top], DKEY(key));
2477                 assert(mc->mc_ki[mc->mc_top] < NUMKEYS(mp));
2478                 node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
2479
2480                 if ((rc = mdb_get_page(mc->mc_txn, NODEPGNO(node), &mp)))
2481                         return rc;
2482
2483                 if ((rc = cursor_push_page(mc, mp)))
2484                         return rc;
2485
2486                 if (modify) {
2487                         if ((rc = mdb_touch(mc)) != 0)
2488                                 return rc;
2489                         mp = mc->mc_pg[mc->mc_top];
2490                 }
2491         }
2492
2493         if (!IS_LEAF(mp)) {
2494                 DPRINTF("internal error, index points to a %02X page!?",
2495                     mp->mp_flags);
2496                 return MDB_CORRUPTED;
2497         }
2498
2499         DPRINTF("found leaf page %lu for key [%s]", mp->mp_pgno,
2500             key ? DKEY(key) : NULL);
2501
2502         return MDB_SUCCESS;
2503 }
2504
2505 /* Search for the page a given key should be in.
2506  * Pushes parent pages on the cursor stack.
2507  * If key is NULL, search for the lowest page (used by mdb_cursor_first).
2508  * If modify is true, visited pages are updated with new page numbers.
2509  */
2510 static int
2511 mdb_search_page(MDB_cursor *mc, MDB_val *key, int modify)
2512 {
2513         int              rc;
2514         pgno_t           root;
2515
2516         /* Make sure the txn is still viable, then find the root from
2517          * the txn's db table.
2518          */
2519         if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_ERROR)) {
2520                 DPUTS("transaction has failed, must abort");
2521                 return EINVAL;
2522         } else
2523                 root = mc->mc_txn->mt_dbs[mc->mc_dbi].md_root;
2524
2525         if (root == P_INVALID) {                /* Tree is empty. */
2526                 DPUTS("tree is empty");
2527                 return MDB_NOTFOUND;
2528         }
2529
2530         if ((rc = mdb_get_page(mc->mc_txn, root, &mc->mc_pg[0])))
2531                 return rc;
2532
2533         mc->mc_snum = 1;
2534         mc->mc_top = 0;
2535
2536         DPRINTF("db %u root page %lu has flags 0x%X",
2537                 mc->mc_dbi, root, mc->mc_pg[0]->mp_flags);
2538
2539         if (modify) {
2540                 /* For sub-databases, update main root first */
2541                 if (mc->mc_dbi > MAIN_DBI && !mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty) {
2542                         MDB_cursor mc2;
2543                         mc2.mc_txn = mc->mc_txn;
2544                         mc2.mc_dbi = MAIN_DBI;
2545                         rc = mdb_search_page(&mc2, &mc->mc_txn->mt_dbxs[mc->mc_dbi].md_name, 1);
2546                         if (rc)
2547                                 return rc;
2548                         mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = 1;
2549                 }
2550                 if (!F_ISSET(mc->mc_pg[0]->mp_flags, P_DIRTY)) {
2551                         if ((rc = mdb_touch(mc)))
2552                                 return rc;
2553                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = mc->mc_pg[0]->mp_pgno;
2554                 }
2555         }
2556
2557         return mdb_search_page_root(mc, key, modify);
2558 }
2559
2560 static int
2561 mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
2562 {
2563         MDB_page        *omp;           /* overflow mpage */
2564         pgno_t           pgno;
2565         int rc;
2566
2567         if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
2568                 data->mv_size = NODEDSZ(leaf);
2569                 data->mv_data = NODEDATA(leaf);
2570                 return MDB_SUCCESS;
2571         }
2572
2573         /* Read overflow data.
2574          */
2575         data->mv_size = NODEDSZ(leaf);
2576         memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
2577         if ((rc = mdb_get_page(txn, pgno, &omp))) {
2578                 DPRINTF("read overflow page %lu failed", pgno);
2579                 return rc;
2580         }
2581         data->mv_data = METADATA(omp);
2582
2583         return MDB_SUCCESS;
2584 }
2585
2586 int
2587 mdb_get(MDB_txn *txn, MDB_dbi dbi,
2588     MDB_val *key, MDB_val *data)
2589 {
2590         MDB_cursor      mc;
2591         MDB_xcursor     mx;
2592         int exact = 0;
2593         DKBUF;
2594
2595         assert(key);
2596         assert(data);
2597         DPRINTF("===> get db %u key [%s]", dbi, DKEY(key));
2598
2599         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
2600                 return EINVAL;
2601
2602         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
2603                 return EINVAL;
2604         }
2605
2606         mc.mc_txn = txn;
2607         mc.mc_dbi = dbi;
2608         mc.mc_flags = 0;
2609         if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
2610                 mc.mc_xcursor = &mx;
2611                 mdb_xcursor_init0(&mc);
2612         } else {
2613                 mc.mc_xcursor = NULL;
2614         }
2615         return mdb_cursor_set(&mc, key, data, MDB_SET, &exact);
2616 }
2617
2618 static int
2619 mdb_sibling(MDB_cursor *mc, int move_right)
2620 {
2621         int              rc;
2622         unsigned int    ptop;
2623         MDB_node        *indx;
2624         MDB_page        *mp;
2625
2626         if (mc->mc_snum < 2) {
2627                 return MDB_NOTFOUND;            /* root has no siblings */
2628         }
2629         ptop = mc->mc_top-1;
2630
2631         DPRINTF("parent page is page %lu, index %u",
2632                 mc->mc_pg[ptop]->mp_pgno, mc->mc_ki[ptop]);
2633
2634         cursor_pop_page(mc);
2635         if (move_right ? (mc->mc_ki[ptop] + 1u >= NUMKEYS(mc->mc_pg[ptop]))
2636                        : (mc->mc_ki[ptop] == 0)) {
2637                 DPRINTF("no more keys left, moving to %s sibling",
2638                     move_right ? "right" : "left");
2639                 if ((rc = mdb_sibling(mc, move_right)) != MDB_SUCCESS)
2640                         return rc;
2641         } else {
2642                 if (move_right)
2643                         mc->mc_ki[ptop]++;
2644                 else
2645                         mc->mc_ki[ptop]--;
2646                 DPRINTF("just moving to %s index key %u",
2647                     move_right ? "right" : "left", mc->mc_ki[ptop]);
2648         }
2649         assert(IS_BRANCH(mc->mc_pg[ptop]));
2650
2651         indx = NODEPTR(mc->mc_pg[ptop], mc->mc_ki[ptop]);
2652         if ((rc = mdb_get_page(mc->mc_txn, NODEPGNO(indx), &mp)))
2653                 return rc;;
2654
2655         cursor_push_page(mc, mp);
2656
2657         return MDB_SUCCESS;
2658 }
2659
2660 static int
2661 mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
2662 {
2663         MDB_page        *mp;
2664         MDB_node        *leaf;
2665         int rc;
2666
2667         if (mc->mc_flags & C_EOF) {
2668                 return MDB_NOTFOUND;
2669         }
2670
2671         assert(mc->mc_flags & C_INITIALIZED);
2672
2673         mp = mc->mc_pg[mc->mc_top];
2674
2675         if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) {
2676                 leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
2677                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2678                         if (op == MDB_NEXT || op == MDB_NEXT_DUP) {
2679                                 rc = mdb_cursor_next(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT);
2680                                 if (op != MDB_NEXT || rc == MDB_SUCCESS)
2681                                         return rc;
2682                         }
2683                 } else {
2684                         mc->mc_xcursor->mx_cursor.mc_flags = 0;
2685                         if (op == MDB_NEXT_DUP)
2686                                 return MDB_NOTFOUND;
2687                 }
2688         }
2689
2690         DPRINTF("cursor_next: top page is %lu in cursor %p", mp->mp_pgno, (void *) mc);
2691
2692         if (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mp)) {
2693                 DPUTS("=====> move to next sibling page");
2694                 if (mdb_sibling(mc, 1) != MDB_SUCCESS) {
2695                         mc->mc_flags |= C_EOF;
2696                         return MDB_NOTFOUND;
2697                 }
2698                 mp = mc->mc_pg[mc->mc_top];
2699                 DPRINTF("next page is %lu, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top]);
2700         } else
2701                 mc->mc_ki[mc->mc_top]++;
2702
2703         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
2704             mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]);
2705
2706         if (IS_LEAF2(mp)) {
2707                 key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
2708                 key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size);
2709                 return MDB_SUCCESS;
2710         }
2711
2712         assert(IS_LEAF(mp));
2713         leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
2714
2715         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2716                 mdb_xcursor_init1(mc, leaf);
2717         }
2718         if (data) {
2719                 if ((rc = mdb_read_data(mc->mc_txn, leaf, data) != MDB_SUCCESS))
2720                         return rc;
2721
2722                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2723                         rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL);
2724                         if (rc != MDB_SUCCESS)
2725                                 return rc;
2726                 }
2727         }
2728
2729         MDB_SET_KEY(leaf, key);
2730         return MDB_SUCCESS;
2731 }
2732
2733 static int
2734 mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
2735 {
2736         MDB_page        *mp;
2737         MDB_node        *leaf;
2738         int rc;
2739
2740         assert(mc->mc_flags & C_INITIALIZED);
2741
2742         mp = mc->mc_pg[mc->mc_top];
2743
2744         if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) {
2745                 leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
2746                 if (op == MDB_PREV || op == MDB_PREV_DUP) {
2747                         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2748                                 rc = mdb_cursor_prev(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_PREV);
2749                                 if (op != MDB_PREV || rc == MDB_SUCCESS)
2750                                         return rc;
2751                         } else {
2752                                 mc->mc_xcursor->mx_cursor.mc_flags = 0;
2753                                 if (op == MDB_PREV_DUP)
2754                                         return MDB_NOTFOUND;
2755                         }
2756                 }
2757         }
2758
2759         DPRINTF("cursor_prev: top page is %lu in cursor %p", mp->mp_pgno, (void *) mc);
2760
2761         if (mc->mc_ki[mc->mc_top] == 0)  {
2762                 DPUTS("=====> move to prev sibling page");
2763                 if (mdb_sibling(mc, 0) != MDB_SUCCESS) {
2764                         mc->mc_flags &= ~C_INITIALIZED;
2765                         return MDB_NOTFOUND;
2766                 }
2767                 mp = mc->mc_pg[mc->mc_top];
2768                 mc->mc_ki[mc->mc_top] = NUMKEYS(mp) - 1;
2769                 DPRINTF("prev page is %lu, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top]);
2770         } else
2771                 mc->mc_ki[mc->mc_top]--;
2772
2773         mc->mc_flags &= ~C_EOF;
2774
2775         DPRINTF("==> cursor points to page %lu with %u keys, key index %u",
2776             mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]);
2777
2778         if (IS_LEAF2(mp)) {
2779                 key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
2780                 key->mv_data = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], key->mv_size);
2781                 return MDB_SUCCESS;
2782         }
2783
2784         assert(IS_LEAF(mp));
2785         leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
2786
2787         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2788                 mdb_xcursor_init1(mc, leaf);
2789         }
2790         if (data) {
2791                 if ((rc = mdb_read_data(mc->mc_txn, leaf, data) != MDB_SUCCESS))
2792                         return rc;
2793
2794                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2795                         rc = mdb_cursor_last(&mc->mc_xcursor->mx_cursor, data, NULL);
2796                         if (rc != MDB_SUCCESS)
2797                                 return rc;
2798                 }
2799         }
2800
2801         MDB_SET_KEY(leaf, key);
2802         return MDB_SUCCESS;
2803 }
2804
2805 static int
2806 mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
2807     MDB_cursor_op op, int *exactp)
2808 {
2809         int              rc;
2810         MDB_node        *leaf;
2811         DKBUF;
2812
2813         assert(mc);
2814         assert(key);
2815         assert(key->mv_size > 0);
2816
2817         /* See if we're already on the right page */
2818         if (mc->mc_flags & C_INITIALIZED) {
2819                 MDB_val nodekey;
2820
2821                 if (mc->mc_pg[mc->mc_top]->mp_flags & P_LEAF2) {
2822                         nodekey.mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
2823                         nodekey.mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], 0, nodekey.mv_size);
2824                 } else {
2825                         leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
2826                         MDB_SET_KEY(leaf, &nodekey);
2827                 }
2828                 rc = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_cmp(key, &nodekey);
2829                 if (rc == 0) {
2830                         /* Probably happens rarely, but first node on the page
2831                          * was the one we wanted.
2832                          */
2833                         mc->mc_ki[mc->mc_top] = 0;
2834 set1:
2835                         if (exactp)
2836                                 *exactp = 1;
2837                         rc = 0;
2838                         leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
2839                         goto set3;
2840                 }
2841                 if (rc > 0) {
2842                         unsigned int i;
2843                         if (NUMKEYS(mc->mc_pg[mc->mc_top]) > 1) {
2844                                 if (mc->mc_pg[mc->mc_top]->mp_flags & P_LEAF2) {
2845                                         nodekey.mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top],
2846                                                  NUMKEYS(mc->mc_pg[mc->mc_top])-1, nodekey.mv_size);
2847                                 } else {
2848                                         leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
2849                                         MDB_SET_KEY(leaf, &nodekey);
2850                                 }
2851                                 rc = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_cmp(key, &nodekey);
2852                                 if (rc == 0) {
2853                                         /* last node was the one we wanted */
2854                                         mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top])-1;
2855                                         goto set1;
2856                                 }
2857                                 if (rc < 0) {
2858                                         /* This is definitely the right page, skip search_page */
2859                                         rc = 0;
2860                                         goto set2;
2861                                 }
2862                         }
2863                         /* If any parents have right-sibs, search.
2864                          * Otherwise, there's nothing further.
2865                          */
2866                         for (i=0; i<mc->mc_top; i++)
2867                                 if (mc->mc_ki[i] <
2868                                         NUMKEYS(mc->mc_pg[i])-1)
2869                                         break;
2870                         if (i == mc->mc_top) {
2871                                 /* There are no other pages */
2872                                 mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]);
2873                                 return MDB_NOTFOUND;
2874                         }
2875                 }
2876         }
2877
2878         rc = mdb_search_page(mc, key, 0);
2879         if (rc != MDB_SUCCESS)
2880                 return rc;
2881
2882         assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
2883
2884 set2:
2885         leaf = mdb_search_node(mc, key, exactp);
2886         if (exactp != NULL && !*exactp) {
2887                 /* MDB_SET specified and not an exact match. */
2888                 return MDB_NOTFOUND;
2889         }
2890
2891         if (leaf == NULL) {
2892                 DPUTS("===> inexact leaf not found, goto sibling");
2893                 if ((rc = mdb_sibling(mc, 1)) != MDB_SUCCESS)
2894                         return rc;              /* no entries matched */
2895                 mc->mc_ki[mc->mc_top] = 0;
2896                 assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
2897                 leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
2898         }
2899
2900 set3:
2901         mc->mc_flags |= C_INITIALIZED;
2902         mc->mc_flags &= ~C_EOF;
2903
2904         if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
2905                 key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
2906                 key->mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], key->mv_size);
2907                 return MDB_SUCCESS;
2908         }
2909
2910         if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2911                 mdb_xcursor_init1(mc, leaf);
2912         }
2913         if (data) {
2914                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2915                         if (op == MDB_SET || op == MDB_SET_RANGE) {
2916                                 rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL);
2917                         } else {
2918                                 int ex2, *ex2p;
2919                                 if (op == MDB_GET_BOTH) {
2920                                         ex2p = &ex2;
2921                                         ex2 = 0;
2922                                 } else {
2923                                         ex2p = NULL;
2924                                 }
2925                                 rc = mdb_cursor_set(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_SET_RANGE, ex2p);
2926                                 if (rc != MDB_SUCCESS)
2927                                         return rc;
2928                         }
2929                 } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) {
2930                         MDB_val d2;
2931                         if ((rc = mdb_read_data(mc->mc_txn, leaf, &d2)) != MDB_SUCCESS)
2932                                 return rc;
2933                         rc = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dcmp(data, &d2);
2934                         if (rc) {
2935                                 if (op == MDB_GET_BOTH || rc > 0)
2936                                         return MDB_NOTFOUND;
2937                         }
2938
2939                 } else {
2940                         if ((rc = mdb_read_data(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
2941                                 return rc;
2942                 }
2943         }
2944
2945         /* The key already matches in all other cases */
2946         if (op == MDB_SET_RANGE)
2947                 MDB_SET_KEY(leaf, key);
2948         DPRINTF("==> cursor placed on key [%s]", DKEY(key));
2949
2950         return rc;
2951 }
2952
2953 static int
2954 mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
2955 {
2956         int              rc;
2957         MDB_node        *leaf;
2958
2959         rc = mdb_search_page(mc, NULL, 0);
2960         if (rc != MDB_SUCCESS)
2961                 return rc;
2962         assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
2963
2964         leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
2965         mc->mc_flags |= C_INITIALIZED;
2966         mc->mc_flags &= ~C_EOF;
2967
2968         if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
2969                 key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
2970                 key->mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], 0, key->mv_size);
2971                 return MDB_SUCCESS;
2972         }
2973
2974         if (data) {
2975                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
2976                         mdb_xcursor_init1(mc, leaf);
2977                         rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL);
2978                         if (rc)
2979                                 return rc;
2980                 } else {
2981                         if (mc->mc_xcursor)
2982                                 mc->mc_xcursor->mx_cursor.mc_flags = 0;
2983                         if ((rc = mdb_read_data(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
2984                                 return rc;
2985                 }
2986         }
2987         MDB_SET_KEY(leaf, key);
2988         return MDB_SUCCESS;
2989 }
2990
2991 static int
2992 mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
2993 {
2994         int              rc;
2995         MDB_node        *leaf;
2996         MDB_val lkey;
2997
2998         lkey.mv_size = MAXKEYSIZE+1;
2999         lkey.mv_data = NULL;
3000
3001         rc = mdb_search_page(mc, &lkey, 0);
3002         if (rc != MDB_SUCCESS)
3003                 return rc;
3004         assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
3005
3006         leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
3007         mc->mc_flags |= C_INITIALIZED;
3008         mc->mc_flags &= ~C_EOF;
3009
3010         mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
3011
3012         if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
3013                 key->mv_size = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
3014                 key->mv_data = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], key->mv_size);
3015                 return MDB_SUCCESS;
3016         }
3017
3018         if (data) {
3019                 if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3020                         mdb_xcursor_init1(mc, leaf);
3021                         rc = mdb_cursor_last(&mc->mc_xcursor->mx_cursor, data, NULL);
3022                         if (rc)
3023                                 return rc;
3024                 } else {
3025                         if ((rc = mdb_read_data(mc->mc_txn, leaf, data)) != MDB_SUCCESS)
3026                                 return rc;
3027                 }
3028         }
3029
3030         MDB_SET_KEY(leaf, key);
3031         return MDB_SUCCESS;
3032 }
3033
3034 int
3035 mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
3036     MDB_cursor_op op)
3037 {
3038         int              rc;
3039         int              exact = 0;
3040
3041         assert(mc);
3042
3043         switch (op) {
3044         case MDB_GET_BOTH:
3045         case MDB_GET_BOTH_RANGE:
3046                 if (data == NULL || mc->mc_xcursor == NULL) {
3047                         rc = EINVAL;
3048                         break;
3049                 }
3050                 /* FALLTHRU */
3051         case MDB_SET:
3052         case MDB_SET_RANGE:
3053                 if (key == NULL || key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
3054                         rc = EINVAL;
3055                 } else if (op == MDB_SET_RANGE)
3056                         rc = mdb_cursor_set(mc, key, data, op, NULL);
3057                 else
3058                         rc = mdb_cursor_set(mc, key, data, op, &exact);
3059                 break;
3060         case MDB_GET_MULTIPLE:
3061                 if (data == NULL ||
3062                         !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPFIXED) ||
3063                         !(mc->mc_flags & C_INITIALIZED)) {
3064                         rc = EINVAL;
3065                         break;
3066                 }
3067                 rc = MDB_SUCCESS;
3068                 if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED) ||
3069                         (mc->mc_xcursor->mx_cursor.mc_flags & C_EOF))
3070                         break;
3071                 goto fetchm;
3072         case MDB_NEXT_MULTIPLE:
3073                 if (data == NULL ||
3074                         !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPFIXED)) {
3075                         rc = EINVAL;
3076                         break;
3077                 }
3078                 if (!(mc->mc_flags & C_INITIALIZED))
3079                         rc = mdb_cursor_first(mc, key, data);
3080                 else
3081                         rc = mdb_cursor_next(mc, key, data, MDB_NEXT_DUP);
3082                 if (rc == MDB_SUCCESS) {
3083                         if (mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED) {
3084                                 MDB_cursor *mx;
3085 fetchm:
3086                                 mx = &mc->mc_xcursor->mx_cursor;
3087                                 data->mv_size = NUMKEYS(mx->mc_pg[mx->mc_top]) *
3088                                         mx->mc_txn->mt_dbs[mx->mc_dbi].md_pad;
3089                                 data->mv_data = METADATA(mx->mc_pg[mx->mc_top]);
3090                                 mx->mc_ki[mx->mc_top] = NUMKEYS(mx->mc_pg[mx->mc_top])-1;
3091                         } else {
3092                                 rc = MDB_NOTFOUND;
3093                         }
3094                 }
3095                 break;
3096         case MDB_NEXT:
3097         case MDB_NEXT_DUP:
3098         case MDB_NEXT_NODUP:
3099                 if (!(mc->mc_flags & C_INITIALIZED))
3100                         rc = mdb_cursor_first(mc, key, data);
3101                 else
3102                         rc = mdb_cursor_next(mc, key, data, op);
3103                 break;
3104         case MDB_PREV:
3105         case MDB_PREV_DUP:
3106         case MDB_PREV_NODUP:
3107                 if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF))
3108                         rc = mdb_cursor_last(mc, key, data);
3109                 else
3110                         rc = mdb_cursor_prev(mc, key, data, op);
3111                 break;
3112         case MDB_FIRST:
3113                 rc = mdb_cursor_first(mc, key, data);
3114                 break;
3115         case MDB_FIRST_DUP:
3116                 if (data == NULL ||
3117                         !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) ||
3118                         !(mc->mc_flags & C_INITIALIZED) ||
3119                         !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
3120                         rc = EINVAL;
3121                         break;
3122                 }
3123                 rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL);
3124                 break;
3125         case MDB_LAST:
3126                 rc = mdb_cursor_last(mc, key, data);
3127                 break;
3128         case MDB_LAST_DUP:
3129                 if (data == NULL ||
3130                         !(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT) ||
3131                         !(mc->mc_flags & C_INITIALIZED) ||
3132                         !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
3133                         rc = EINVAL;
3134                         break;
3135                 }
3136                 rc = mdb_cursor_last(&mc->mc_xcursor->mx_cursor, data, NULL);
3137                 break;
3138         default:
3139                 DPRINTF("unhandled/unimplemented cursor operation %u", op);
3140                 rc = EINVAL;
3141                 break;
3142         }
3143
3144         return rc;
3145 }
3146
3147 static int
3148 mdb_cursor_touch(MDB_cursor *mc)
3149 {
3150         int rc;
3151
3152         if (mc->mc_dbi > MAIN_DBI && !mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty) {
3153                 MDB_cursor mc2;
3154                 mc2.mc_txn = mc->mc_txn;
3155                 mc2.mc_dbi = MAIN_DBI;
3156                 rc = mdb_search_page(&mc2, &mc->mc_txn->mt_dbxs[mc->mc_dbi].md_name, 1);
3157                 if (rc) return rc;
3158                 mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = 1;
3159         }
3160         for (mc->mc_top = 0; mc->mc_top < mc->mc_snum; mc->mc_top++) {
3161                 if (!F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY)) {
3162                         rc = mdb_touch(mc);
3163                         if (rc) return rc;
3164                         if (!mc->mc_top) {
3165                                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_root =
3166                                         mc->mc_pg[mc->mc_top]->mp_pgno;
3167                         }
3168                 }
3169         }
3170         mc->mc_top = mc->mc_snum-1;
3171         return MDB_SUCCESS;
3172 }
3173
3174 int
3175 mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
3176     unsigned int flags)
3177 {
3178         MDB_node        *leaf;
3179         MDB_val xdata, *rdata, dkey;
3180         MDB_db dummy;
3181         char dbuf[PAGESIZE];
3182         int do_sub = 0;
3183         size_t nsize;
3184         DKBUF;
3185         int rc, rc2;
3186
3187         if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
3188                 return EACCES;
3189
3190         DPRINTF("==> put db %u key [%s], size %zu, data size %zu",
3191                 mc->mc_dbi, DKEY(key), key->mv_size, data->mv_size);
3192
3193         dkey.mv_size = 0;
3194
3195         if (flags == MDB_CURRENT) {
3196                 if (!(mc->mc_flags & C_INITIALIZED))
3197                         return EINVAL;
3198                 rc = MDB_SUCCESS;
3199         } else if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_root == P_INVALID) {
3200                 MDB_page *np;
3201                 /* new database, write a root leaf page */
3202                 DPUTS("allocating new root leaf page");
3203                 if ((np = mdb_new_page(mc, P_LEAF, 1)) == NULL) {
3204                         return ENOMEM;
3205                 }
3206                 mc->mc_snum = 0;
3207                 cursor_push_page(mc, np);
3208                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = np->mp_pgno;
3209                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth++;
3210                 mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = 1;
3211                 if ((mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
3212                         == MDB_DUPFIXED)
3213                         np->mp_flags |= P_LEAF2;
3214                 mc->mc_flags |= C_INITIALIZED;
3215                 rc = MDB_NOTFOUND;
3216                 goto top;
3217         } else {
3218                 int exact = 0;
3219                 rc = mdb_cursor_set(mc, key, NULL, MDB_SET, &exact);
3220                 if (flags == MDB_NOOVERWRITE && rc == 0) {
3221                         DPRINTF("duplicate key [%s]", DKEY(key));
3222                         return MDB_KEYEXIST;
3223                 }
3224                 if (rc && rc != MDB_NOTFOUND)
3225                         return rc;
3226         }
3227
3228         /* Cursor is positioned, now make sure all pages are writable */
3229         rc2 = mdb_cursor_touch(mc);
3230         if (rc2) return rc2;
3231
3232 top:
3233         /* The key already exists */
3234         if (rc == MDB_SUCCESS) {
3235                 /* there's only a key anyway, so this is a no-op */
3236                 if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
3237                         unsigned int ksize = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
3238                         if (key->mv_size != ksize)
3239                                 return EINVAL;
3240                         if (flags == MDB_CURRENT) {
3241                                 char *ptr = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], ksize);
3242                                 memcpy(ptr, key->mv_data, ksize);
3243                         }
3244                         return MDB_SUCCESS;
3245                 }
3246
3247                 leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
3248
3249                 /* DB has dups? */
3250                 if (F_ISSET(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags, MDB_DUPSORT)) {
3251                         /* Was a single item before, must convert now */
3252                         if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3253                                 dkey.mv_size = NODEDSZ(leaf);
3254                                 dkey.mv_data = dbuf;
3255                                 memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
3256                                 /* data matches, ignore it */
3257                                 if (!mdb_dcmp(mc->mc_txn, mc->mc_dbi, data, &dkey))
3258                                         return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
3259                                 memset(&dummy, 0, sizeof(dummy));
3260                                 if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPFIXED) {
3261                                         dummy.md_pad = data->mv_size;
3262                                         dummy.md_flags = MDB_DUPFIXED;
3263                                         if (mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_INTEGERDUP)
3264                                                 dummy.md_flags |= MDB_INTEGERKEY;
3265                                 }
3266                                 dummy.md_root = P_INVALID;
3267                                 if (dkey.mv_size == sizeof(MDB_db)) {
3268                                         memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
3269                                         goto put_sub;
3270                                 }
3271                                 mdb_del_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
3272                                 do_sub = 1;
3273                                 rdata = &xdata;
3274                                 xdata.mv_size = sizeof(MDB_db);
3275                                 xdata.mv_data = &dummy;
3276                                 goto new_sub;
3277                         }
3278                         goto put_sub;
3279                 }
3280                 /* same size, just replace it */
3281                 if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
3282                         NODEDSZ(leaf) == data->mv_size) {
3283                         memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
3284                         goto done;
3285                 }
3286                 mdb_del_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
3287         } else {
3288                 DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]);
3289         }
3290
3291         rdata = data;
3292
3293 new_sub:
3294         nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(mc->mc_txn->mt_env, key, rdata);
3295         if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
3296                 rc = mdb_split(mc, key, rdata, P_INVALID);
3297         } else {
3298                 /* There is room already in this leaf page. */
3299                 rc = mdb_add_node(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, 0);
3300         }
3301
3302         if (rc != MDB_SUCCESS)
3303                 mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
3304         else {
3305                 /* Remember if we just added a subdatabase */
3306                 if (flags & F_SUBDATA) {
3307                         leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
3308                         leaf->mn_flags |= F_SUBDATA;
3309                 }
3310
3311                 /* Now store the actual data in the child DB. Note that we're
3312                  * storing the user data in the keys field, so there are strict
3313                  * size limits on dupdata. The actual data fields of the child
3314                  * DB are all zero size.
3315                  */
3316                 if (do_sub) {
3317                         leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
3318 put_sub:
3319                         if (flags == MDB_CURRENT)
3320                                 mdb_xcursor_init2(mc);
3321                         else
3322                                 mdb_xcursor_init1(mc, leaf);
3323                         xdata.mv_size = 0;
3324                         xdata.mv_data = "";
3325                         if (flags == MDB_NODUPDATA)
3326                                 flags = MDB_NOOVERWRITE;
3327                         /* converted, write the original data first */
3328                         if (dkey.mv_size) {
3329                                 rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags);
3330                                 if (rc) return rc;
3331                                 leaf->mn_flags |= F_DUPDATA;
3332                         }
3333                         rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags);
3334                         mdb_xcursor_fini(mc);
3335                         memcpy(NODEDATA(leaf),
3336                                 &mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi],
3337                                 sizeof(MDB_db));
3338                 }
3339                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries++;
3340         }
3341 done:
3342         return rc;
3343 }
3344
3345 int
3346 mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
3347 {
3348         MDB_node        *leaf;
3349         int rc;
3350
3351         if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
3352                 return EACCES;
3353
3354         if (!mc->mc_flags & C_INITIALIZED)
3355                 return EINVAL;
3356
3357         rc = mdb_cursor_touch(mc);
3358         if (rc) return rc;
3359
3360         leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
3361
3362         if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3363                 if (flags != MDB_NODUPDATA) {
3364                         mdb_xcursor_init2(mc);
3365                         rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
3366                         mdb_xcursor_fini(mc);
3367                         /* If sub-DB still has entries, we're done */
3368                         if (mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_root
3369                                 != P_INVALID) {
3370                                 memcpy(NODEDATA(leaf),
3371                                         &mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi],
3372                                         sizeof(MDB_db));
3373                                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries--;
3374                                 return rc;
3375                         }
3376                         /* otherwise fall thru and delete the sub-DB */
3377                 }
3378
3379                 /* add all the child DB's pages to the free list */
3380                 rc = mdb_search_page(&mc->mc_xcursor->mx_cursor, NULL, 0);
3381                 if (rc == MDB_SUCCESS) {
3382                         MDB_node *ni;
3383                         MDB_cursor *mx;
3384                         unsigned int i;
3385
3386                         mx = &mc->mc_xcursor->mx_cursor;
3387                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries -=
3388                                 mx->mc_txn->mt_dbs[mx->mc_dbi].md_entries;
3389
3390                         cursor_pop_page(mx);
3391                         if (mx->mc_snum) {
3392                                 while (mx->mc_snum > 1) {
3393                                         for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
3394                                                 pgno_t pg;
3395                                                 ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
3396                                                 pg = NODEPGNO(ni);
3397                                                 /* free it */
3398                                                 mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
3399                                         }
3400                                         rc = mdb_sibling(mx, 1);
3401                                         if (rc) break;
3402                                 }
3403                         }
3404                         /* free it */
3405                         mdb_midl_append(mc->mc_txn->mt_free_pgs,
3406                                 mx->mc_txn->mt_dbs[mx->mc_dbi].md_root);
3407                 }
3408         }
3409
3410         return mdb_del0(mc, leaf);
3411 }
3412
3413 /* Allocate a page and initialize it
3414  */
3415 static MDB_page *
3416 mdb_new_page(MDB_cursor *mc, uint32_t flags, int num)
3417 {
3418         MDB_page        *np;
3419
3420         if ((np = mdb_alloc_page(mc, num)) == NULL)
3421                 return NULL;
3422         DPRINTF("allocated new mpage %lu, page size %u",
3423             np->mp_pgno, mc->mc_txn->mt_env->me_psize);
3424         np->mp_flags = flags | P_DIRTY;
3425         np->mp_lower = PAGEHDRSZ;
3426         np->mp_upper = mc->mc_txn->mt_env->me_psize;
3427
3428         if (IS_BRANCH(np))
3429                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_branch_pages++;
3430         else if (IS_LEAF(np))
3431                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_leaf_pages++;
3432         else if (IS_OVERFLOW(np)) {
3433                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_overflow_pages += num;
3434                 np->mp_pages = num;
3435         }
3436
3437         return np;
3438 }
3439
3440 static size_t
3441 mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data)
3442 {
3443         size_t           sz;
3444
3445         sz = LEAFSIZE(key, data);
3446         if (data->mv_size >= env->me_psize / MDB_MINKEYS) {
3447                 /* put on overflow page */
3448                 sz -= data->mv_size - sizeof(pgno_t);
3449         }
3450         sz += sz & 1;
3451
3452         return sz + sizeof(indx_t);
3453 }
3454
3455 static size_t
3456 mdb_branch_size(MDB_env *env, MDB_val *key)
3457 {
3458         size_t           sz;
3459
3460         sz = INDXSIZE(key);
3461         if (sz >= env->me_psize / MDB_MINKEYS) {
3462                 /* put on overflow page */
3463                 /* not implemented */
3464                 /* sz -= key->size - sizeof(pgno_t); */
3465         }
3466
3467         return sz + sizeof(indx_t);
3468 }
3469
3470 static int
3471 mdb_add_node(MDB_cursor *mc, indx_t indx,
3472     MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags)
3473 {
3474         unsigned int     i;
3475         size_t           node_size = NODESIZE;
3476         indx_t           ofs;
3477         MDB_node        *node;
3478         MDB_page        *mp = mc->mc_pg[mc->mc_top];
3479         MDB_page        *ofp = NULL;            /* overflow page */
3480         DKBUF;
3481
3482         assert(mp->mp_upper >= mp->mp_lower);
3483
3484         DPRINTF("add to %s page %lu index %i, data size %zu key size %zu [%s]",
3485             IS_LEAF(mp) ? "leaf" : "branch",
3486             mp->mp_pgno, indx, data ? data->mv_size : 0,
3487                 key ? key->mv_size : 0, key ? DKEY(key) : NULL);
3488
3489         if (IS_LEAF2(mp)) {
3490                 /* Move higher keys up one slot. */
3491                 int ksize = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad, dif;
3492                 char *ptr = LEAF2KEY(mp, indx, ksize);
3493                 dif = NUMKEYS(mp) - indx;
3494                 if (dif > 0)
3495                         memmove(ptr+ksize, ptr, dif*ksize);
3496                 /* insert new key */
3497                 memcpy(ptr, key->mv_data, ksize);
3498
3499                 /* Just using these for counting */
3500                 mp->mp_lower += sizeof(indx_t);
3501                 mp->mp_upper -= ksize - sizeof(indx_t);
3502                 return MDB_SUCCESS;
3503         }
3504
3505         if (key != NULL)
3506                 node_size += key->mv_size;
3507
3508         if (IS_LEAF(mp)) {
3509                 assert(data);
3510                 if (F_ISSET(flags, F_BIGDATA)) {
3511                         /* Data already on overflow page. */
3512                         node_size += sizeof(pgno_t);
3513                 } else if (data->mv_size >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
3514                         int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
3515                         /* Put data on overflow page. */
3516                         DPRINTF("data size is %zu, put on overflow page",
3517                             data->mv_size);
3518                         node_size += sizeof(pgno_t);
3519                         if ((ofp = mdb_new_page(mc, P_OVERFLOW, ovpages)) == NULL)
3520                                 return ENOMEM;
3521                         DPRINTF("allocated overflow page %lu", ofp->mp_pgno);
3522                         flags |= F_BIGDATA;
3523                 } else {
3524                         node_size += data->mv_size;
3525                 }
3526         }
3527         node_size += node_size & 1;
3528
3529         if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
3530                 DPRINTF("not enough room in page %lu, got %u ptrs",
3531                     mp->mp_pgno, NUMKEYS(mp));
3532                 DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower,
3533                     mp->mp_upper - mp->mp_lower);
3534                 DPRINTF("node size = %zu", node_size);
3535                 return ENOSPC;
3536         }
3537
3538         /* Move higher pointers up one slot. */
3539         for (i = NUMKEYS(mp); i > indx; i--)
3540                 mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
3541
3542         /* Adjust free space offsets. */
3543         ofs = mp->mp_upper - node_size;
3544         assert(ofs >= mp->mp_lower + sizeof(indx_t));
3545         mp->mp_ptrs[indx] = ofs;
3546         mp->mp_upper = ofs;
3547         mp->mp_lower += sizeof(indx_t);
3548
3549         /* Write the node data. */
3550         node = NODEPTR(mp, indx);
3551         node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
3552         node->mn_flags = flags;
3553         if (IS_LEAF(mp))
3554                 SETDSZ(node,data->mv_size);
3555         else
3556                 SETPGNO(node,pgno);
3557
3558         if (key)
3559                 memcpy(NODEKEY(node), key->mv_data, key->mv_size);
3560
3561         if (IS_LEAF(mp)) {
3562                 assert(key);
3563                 if (ofp == NULL) {
3564                         if (F_ISSET(flags, F_BIGDATA))
3565                                 memcpy(node->mn_data + key->mv_size, data->mv_data,
3566                                     sizeof(pgno_t));
3567                         else
3568                                 memcpy(node->mn_data + key->mv_size, data->mv_data,
3569                                     data->mv_size);
3570                 } else {
3571                         memcpy(node->mn_data + key->mv_size, &ofp->mp_pgno,
3572                             sizeof(pgno_t));
3573                         memcpy(METADATA(ofp), data->mv_data, data->mv_size);
3574                 }
3575         }
3576
3577         return MDB_SUCCESS;
3578 }
3579
3580 static void
3581 mdb_del_node(MDB_page *mp, indx_t indx, int ksize)
3582 {
3583         unsigned int     sz;
3584         indx_t           i, j, numkeys, ptr;
3585         MDB_node        *node;
3586         char            *base;
3587
3588         DPRINTF("delete node %u on %s page %lu", indx,
3589             IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno);
3590         assert(indx < NUMKEYS(mp));
3591
3592         if (IS_LEAF2(mp)) {
3593                 int x = NUMKEYS(mp) - 1 - indx;
3594                 base = LEAF2KEY(mp, indx, ksize);
3595                 if (x)
3596                         memmove(base, base + ksize, x * ksize);
3597                 mp->mp_lower -= sizeof(indx_t);
3598                 mp->mp_upper += ksize - sizeof(indx_t);
3599                 return;
3600         }
3601
3602         node = NODEPTR(mp, indx);
3603         sz = NODESIZE + node->mn_ksize;
3604         if (IS_LEAF(mp)) {
3605                 if (F_ISSET(node->mn_flags, F_BIGDATA))
3606                         sz += sizeof(pgno_t);
3607                 else
3608                         sz += NODEDSZ(node);
3609         }
3610         sz += sz & 1;
3611
3612         ptr = mp->mp_ptrs[indx];
3613         numkeys = NUMKEYS(mp);
3614         for (i = j = 0; i < numkeys; i++) {
3615                 if (i != indx) {
3616                         mp->mp_ptrs[j] = mp->mp_ptrs[i];
3617                         if (mp->mp_ptrs[i] < ptr)
3618                                 mp->mp_ptrs[j] += sz;
3619                         j++;
3620                 }
3621         }
3622
3623         base = (char *)mp + mp->mp_upper;
3624         memmove(base + sz, base, ptr - mp->mp_upper);
3625
3626         mp->mp_lower -= sizeof(indx_t);
3627         mp->mp_upper += sz;
3628 }
3629
3630 static void
3631 mdb_xcursor_init0(MDB_cursor *mc)
3632 {
3633         MDB_xcursor *mx = mc->mc_xcursor;
3634         MDB_dbi dbn;
3635
3636         mx->mx_txn = *mc->mc_txn;
3637         mx->mx_txn.mt_dbxs = mx->mx_dbxs;
3638         mx->mx_txn.mt_dbs = mx->mx_dbs;
3639         mx->mx_dbxs[0] = mc->mc_txn->mt_dbxs[0];
3640         mx->mx_dbxs[1] = mc->mc_txn->mt_dbxs[1];
3641         if (mc->mc_dbi > 1) {
3642                 mx->mx_dbxs[2] = mc->mc_txn->mt_dbxs[mc->mc_dbi];
3643                 dbn = 2;
3644         } else {
3645                 dbn = 1;
3646         }
3647         mx->mx_dbxs[dbn+1].md_parent = dbn;
3648         mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
3649         mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
3650         mx->mx_dbxs[dbn+1].md_dirty = 0;
3651         mx->mx_txn.mt_numdbs = dbn+2;
3652         mx->mx_txn.mt_u = mc->mc_txn->mt_u;
3653
3654         mx->mx_cursor.mc_xcursor = NULL;
3655         mx->mx_cursor.mc_txn = &mx->mx_txn;
3656         mx->mx_cursor.mc_dbi = dbn+1;
3657 }
3658
3659 static void
3660 mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
3661 {
3662         MDB_db *db = NODEDATA(node);
3663         MDB_xcursor *mx = mc->mc_xcursor;
3664         MDB_dbi dbn;
3665         mx->mx_dbs[0] = mc->mc_txn->mt_dbs[0];
3666         mx->mx_dbs[1] = mc->mc_txn->mt_dbs[1];
3667         if (mc->mc_dbi > 1) {
3668                 mx->mx_dbs[2] = mc->mc_txn->mt_dbs[mc->mc_dbi];
3669                 mx->mx_dbxs[2].md_dirty = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty;
3670                 dbn = 3;
3671         } else {
3672                 dbn = 2;
3673         }
3674         DPRINTF("Sub-db %u for db %u root page %lu", dbn, mc->mc_dbi, db->md_root);
3675         mx->mx_dbs[dbn] = *db;
3676         if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY))
3677                 mx->mx_dbxs[dbn].md_dirty = 1;
3678         mx->mx_dbxs[dbn].md_name.mv_data = NODEKEY(node);
3679         mx->mx_dbxs[dbn].md_name.mv_size = node->mn_ksize;
3680         mx->mx_txn.mt_next_pgno = mc->mc_txn->mt_next_pgno;
3681         mx->mx_cursor.mc_snum = 0;
3682         mx->mx_cursor.mc_flags = 0;
3683 }
3684
3685 static void
3686 mdb_xcursor_init2(MDB_cursor *mc)
3687 {
3688         MDB_xcursor *mx = mc->mc_xcursor;
3689         MDB_dbi dbn;
3690         mx->mx_dbs[0] = mc->mc_txn->mt_dbs[0];
3691         mx->mx_dbs[1] = mc->mc_txn->mt_dbs[1];
3692         if (mc->mc_dbi > 1) {
3693                 mx->mx_dbs[2] = mc->mc_txn->mt_dbs[mc->mc_dbi];
3694                 mx->mx_dbxs[2].md_dirty = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty;
3695                 dbn = 3;
3696         } else {
3697                 dbn = 2;
3698         }
3699         DPRINTF("Sub-db %u for db %u root page %lu", dbn, mc->mc_dbi,
3700                 mx->mx_dbs[dbn].md_root);
3701         mx->mx_txn.mt_next_pgno = mc->mc_txn->mt_next_pgno;
3702 }
3703
3704 static void
3705 mdb_xcursor_fini(MDB_cursor *mc)
3706 {
3707         MDB_xcursor *mx = mc->mc_xcursor;
3708         mc->mc_txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
3709         mc->mc_txn->mt_dbs[0] = mx->mx_dbs[0];
3710         mc->mc_txn->mt_dbs[1] = mx->mx_dbs[1];
3711         if (mc->mc_dbi > 1) {
3712                 mc->mc_txn->mt_dbs[mc->mc_dbi] = mx->mx_dbs[2];
3713                 mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = mx->mx_dbxs[2].md_dirty;
3714         }
3715 }
3716
3717 int
3718 mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
3719 {
3720         MDB_cursor      *mc;
3721         size_t size = sizeof(MDB_cursor);
3722
3723         if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
3724                 return EINVAL;
3725
3726         if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
3727                 size += sizeof(MDB_xcursor);
3728
3729         if ((mc = calloc(1, size)) != NULL) {
3730                 mc->mc_dbi = dbi;
3731                 mc->mc_txn = txn;
3732                 if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
3733                         MDB_xcursor *mx = (MDB_xcursor *)(mc + 1);
3734                         mc->mc_xcursor = mx;
3735                         mdb_xcursor_init0(mc);
3736                 }
3737         } else {
3738                 return ENOMEM;
3739         }
3740
3741         *ret = mc;
3742
3743         return MDB_SUCCESS;
3744 }
3745
3746 /* Return the count of duplicate data items for the current key */
3747 int
3748 mdb_cursor_count(MDB_cursor *mc, unsigned long *countp)
3749 {
3750         MDB_node        *leaf;
3751
3752         if (mc == NULL || countp == NULL)
3753                 return EINVAL;
3754
3755         if (!(mc->mc_txn->mt_dbs[mc->mc_dbi].md_flags & MDB_DUPSORT))
3756                 return EINVAL;
3757
3758         leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
3759         if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
3760                 *countp = 1;
3761         } else {
3762                 if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED))
3763                         return EINVAL;
3764
3765                 *countp = mc->mc_xcursor->mx_txn.mt_dbs[mc->mc_xcursor->mx_cursor.mc_dbi].md_entries;
3766         }
3767         return MDB_SUCCESS;
3768 }
3769
3770 void
3771 mdb_cursor_close(MDB_cursor *mc)
3772 {
3773         if (mc != NULL) {
3774                 free(mc);
3775         }
3776 }
3777
3778 static int
3779 mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
3780 {
3781         indx_t                   ptr, i, numkeys;
3782         int                      delta;
3783         size_t                   len;
3784         MDB_node                *node;
3785         char                    *base;
3786         DKBUF;
3787
3788         node = NODEPTR(mp, indx);
3789         ptr = mp->mp_ptrs[indx];
3790         DPRINTF("update key %u (ofs %u) [%.*s] to [%s] on page %lu",
3791             indx, ptr,
3792             (int)node->mn_ksize, (char *)NODEKEY(node),
3793                 DKEY(key),
3794             mp->mp_pgno);
3795
3796         delta = key->mv_size - node->mn_ksize;
3797         if (delta) {
3798                 if (delta > 0 && SIZELEFT(mp) < delta) {
3799                         DPRINTF("OUCH! Not enough room, delta = %d", delta);
3800                         return ENOSPC;
3801                 }
3802
3803                 numkeys = NUMKEYS(mp);
3804                 for (i = 0; i < numkeys; i++) {
3805                         if (mp->mp_ptrs[i] <= ptr)
3806                                 mp->mp_ptrs[i] -= delta;
3807                 }
3808
3809                 base = (char *)mp + mp->mp_upper;
3810                 len = ptr - mp->mp_upper + NODESIZE;
3811                 memmove(base - delta, base, len);
3812                 mp->mp_upper -= delta;
3813
3814                 node = NODEPTR(mp, indx);
3815                 node->mn_ksize = key->mv_size;
3816         }
3817
3818         memcpy(NODEKEY(node), key->mv_data, key->mv_size);
3819
3820         return MDB_SUCCESS;
3821 }
3822
3823 /* Move a node from csrc to cdst.
3824  */
3825 static int
3826 mdb_move_node(MDB_cursor *csrc, MDB_cursor *cdst)
3827 {
3828         int                      rc;
3829         MDB_node                *srcnode;
3830         MDB_val          key, data;
3831         DKBUF;
3832
3833         /* Mark src and dst as dirty. */
3834         if ((rc = mdb_touch(csrc)) ||
3835             (rc = mdb_touch(cdst)))
3836                 return rc;;
3837
3838         if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
3839                 srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);        /* fake */
3840                 key.mv_size = csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_pad;
3841                 key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
3842                 data.mv_size = 0;
3843                 data.mv_data = NULL;
3844         } else {
3845                 if (csrc->mc_ki[csrc->mc_top] == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
3846                         unsigned int snum = csrc->mc_snum;
3847                         /* must find the lowest key below src */
3848                         mdb_search_page_root(csrc, NULL, 0);
3849                         srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
3850                         csrc->mc_snum = snum--;
3851                         csrc->mc_top = snum;
3852                 } else {
3853                         srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
3854                 }
3855                 key.mv_size = NODEKSZ(srcnode);
3856                 key.mv_data = NODEKEY(srcnode);
3857                 data.mv_size = NODEDSZ(srcnode);
3858                 data.mv_data = NODEDATA(srcnode);
3859         }
3860         DPRINTF("moving %s node %u [%s] on page %lu to node %u on page %lu",
3861             IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch",
3862             csrc->mc_ki[csrc->mc_top],
3863                 DKEY(&key),
3864             csrc->mc_pg[csrc->mc_top]->mp_pgno,
3865             cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno);
3866
3867         /* Add the node to the destination page.
3868          */
3869         rc = mdb_add_node(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, NODEPGNO(srcnode),
3870             srcnode->mn_flags);
3871         if (rc != MDB_SUCCESS)
3872                 return rc;
3873
3874         /* Delete the node from the source page.
3875          */
3876         mdb_del_node(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
3877
3878         /* Update the parent separators.
3879          */
3880         if (csrc->mc_ki[csrc->mc_top] == 0) {
3881                 if (csrc->mc_ki[csrc->mc_top-1] != 0) {
3882                         if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
3883                                 key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
3884                         } else {
3885                                 srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
3886                                 key.mv_size = NODEKSZ(srcnode);
3887                                 key.mv_data = NODEKEY(srcnode);
3888                         }
3889                         DPRINTF("update separator for source page %lu to [%s]",
3890                                 csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key));
3891                         if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1],
3892                                 &key)) != MDB_SUCCESS)
3893                                 return rc;
3894                 }
3895                 if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
3896                         MDB_val  nullkey;
3897                         nullkey.mv_size = 0;
3898                         assert(mdb_update_key(csrc->mc_pg[csrc->mc_top], 0, &nullkey) == MDB_SUCCESS);
3899                 }
3900         }
3901
3902         if (cdst->mc_ki[cdst->mc_top] == 0) {
3903                 if (cdst->mc_ki[cdst->mc_top-1] != 0) {
3904                         if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
3905                                 key.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, key.mv_size);
3906                         } else {
3907                                 srcnode = NODEPTR(cdst->mc_pg[cdst->mc_top], 0);
3908                                 key.mv_size = NODEKSZ(srcnode);
3909                                 key.mv_data = NODEKEY(srcnode);
3910                         }
3911                         DPRINTF("update separator for destination page %lu to [%s]",
3912                                 cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key));
3913                         if ((rc = mdb_update_key(cdst->mc_pg[cdst->mc_top-1], cdst->mc_ki[cdst->mc_top-1],
3914                                 &key)) != MDB_SUCCESS)
3915                                 return rc;
3916                 }
3917                 if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
3918                         MDB_val  nullkey;
3919                         nullkey.mv_size = 0;
3920                         assert(mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &nullkey) == MDB_SUCCESS);
3921                 }
3922         }
3923
3924         return MDB_SUCCESS;
3925 }
3926
3927 static int
3928 mdb_merge(MDB_cursor *csrc, MDB_cursor *cdst)
3929 {
3930         int                      rc;
3931         indx_t                   i, j;
3932         MDB_node                *srcnode;
3933         MDB_val          key, data;
3934
3935         DPRINTF("merging page %lu into %lu", csrc->mc_pg[csrc->mc_top]->mp_pgno, cdst->mc_pg[cdst->mc_top]->mp_pgno);
3936
3937         assert(csrc->mc_snum > 1);      /* can't merge root page */
3938         assert(cdst->mc_snum > 1);
3939
3940         /* Mark dst as dirty. */
3941         if ((rc = mdb_touch(cdst)))
3942                 return rc;
3943
3944         /* Move all nodes from src to dst.
3945          */
3946         j = NUMKEYS(cdst->mc_pg[cdst->mc_top]);
3947         if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
3948                 key.mv_size = csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_pad;
3949                 key.mv_data = METADATA(csrc->mc_pg[csrc->mc_top]);
3950                 for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
3951                         rc = mdb_add_node(cdst, j, &key, NULL, 0, 0);
3952                         if (rc != MDB_SUCCESS)
3953                                 return rc;
3954                         key.mv_data = (char *)key.mv_data + key.mv_size;
3955                 }
3956         } else {
3957                 for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
3958                         srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], i);
3959
3960                         key.mv_size = srcnode->mn_ksize;
3961                         key.mv_data = NODEKEY(srcnode);
3962                         data.mv_size = NODEDSZ(srcnode);
3963                         data.mv_data = NODEDATA(srcnode);
3964                         rc = mdb_add_node(cdst, j, &key, &data, NODEPGNO(srcnode), srcnode->mn_flags);
3965                         if (rc != MDB_SUCCESS)
3966                                 return rc;
3967                 }
3968         }
3969
3970         DPRINTF("dst page %lu now has %u keys (%.1f%% filled)",
3971             cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]), (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10);
3972
3973         /* Unlink the src page from parent and add to free list.
3974          */
3975         mdb_del_node(csrc->mc_pg[csrc->mc_top-1], csrc->mc_ki[csrc->mc_top-1], 0);
3976         if (csrc->mc_ki[csrc->mc_top-1] == 0) {
3977                 key.mv_size = 0;
3978                 if ((rc = mdb_update_key(csrc->mc_pg[csrc->mc_top-1], 0, &key)) != MDB_SUCCESS)
3979                         return rc;
3980         }
3981
3982         mdb_midl_append(csrc->mc_txn->mt_free_pgs, csrc->mc_pg[csrc->mc_top]->mp_pgno);
3983         if (IS_LEAF(csrc->mc_pg[csrc->mc_top]))
3984                 csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_leaf_pages--;
3985         else
3986                 csrc->mc_txn->mt_dbs[csrc->mc_dbi].md_branch_pages--;
3987         cursor_pop_page(csrc);
3988
3989         return mdb_rebalance(csrc);
3990 }
3991
3992 static void
3993 mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst)
3994 {
3995         unsigned int i;
3996
3997         cdst->mc_txn = csrc->mc_txn;
3998         cdst->mc_dbi = csrc->mc_dbi;
3999         cdst->mc_snum = csrc->mc_snum;
4000         cdst->mc_top = csrc->mc_top;
4001         cdst->mc_flags = csrc->mc_flags;
4002
4003         for (i=0; i<csrc->mc_snum; i++) {
4004                 cdst->mc_pg[i] = csrc->mc_pg[i];
4005                 cdst->mc_ki[i] = csrc->mc_ki[i];
4006         }
4007 }
4008
4009 static int
4010 mdb_rebalance(MDB_cursor *mc)
4011 {
4012         MDB_node        *node;
4013         MDB_page        *root;
4014         int rc;
4015         unsigned int ptop;
4016         MDB_cursor      mn;
4017
4018         DPRINTF("rebalancing %s page %lu (has %u keys, %.1f%% full)",
4019             IS_LEAF(mc->mc_pg[mc->mc_top]) ? "leaf" : "branch",
4020             mc->mc_pg[mc->mc_top]->mp_pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10);
4021
4022         if (PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) >= FILL_THRESHOLD) {
4023                 DPRINTF("no need to rebalance page %lu, above fill threshold",
4024                     mc->mc_pg[mc->mc_top]->mp_pgno);
4025                 return MDB_SUCCESS;
4026         }
4027
4028         if (mc->mc_snum < 2) {
4029                 if (NUMKEYS(mc->mc_pg[mc->mc_top]) == 0) {
4030                         DPUTS("tree is completely empty");
4031                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = P_INVALID;
4032                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth = 0;
4033                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_leaf_pages = 0;
4034                         mdb_midl_append(mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno);
4035                 } else if (IS_BRANCH(mc->mc_pg[mc->mc_top]) && NUMKEYS(mc->mc_pg[mc->mc_top]) == 1) {
4036                         DPUTS("collapsing root page!");
4037                         mdb_midl_append(mc->mc_txn->mt_free_pgs, mc->mc_pg[mc->mc_top]->mp_pgno);
4038                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = NODEPGNO(NODEPTR(mc->mc_pg[mc->mc_top], 0));
4039                         if ((rc = mdb_get_page(mc->mc_txn, mc->mc_txn->mt_dbs[mc->mc_dbi].md_root, &root)))
4040                                 return rc;
4041                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth--;
4042                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_branch_pages--;
4043                 } else
4044                         DPUTS("root page doesn't need rebalancing");
4045                 return MDB_SUCCESS;
4046         }
4047
4048         /* The parent (branch page) must have at least 2 pointers,
4049          * otherwise the tree is invalid.
4050          */
4051         ptop = mc->mc_top-1;
4052         assert(NUMKEYS(mc->mc_pg[ptop]) > 1);
4053
4054         /* Leaf page fill factor is below the threshold.
4055          * Try to move keys from left or right neighbor, or
4056          * merge with a neighbor page.
4057          */
4058
4059         /* Find neighbors.
4060          */
4061         mdb_cursor_copy(mc, &mn);
4062         mn.mc_xcursor = NULL;
4063
4064         if (mc->mc_ki[ptop] == 0) {
4065                 /* We're the leftmost leaf in our parent.
4066                  */
4067                 DPUTS("reading right neighbor");
4068                 mn.mc_ki[ptop]++;
4069                 node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
4070                 if ((rc = mdb_get_page(mc->mc_txn, NODEPGNO(node), &mn.mc_pg[mn.mc_top])))
4071                         return rc;
4072                 mn.mc_ki[mn.mc_top] = 0;
4073                 mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]);
4074         } else {
4075                 /* There is at least one neighbor to the left.
4076                  */
4077                 DPUTS("reading left neighbor");
4078                 mn.mc_ki[ptop]--;
4079                 node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
4080                 if ((rc = mdb_get_page(mc->mc_txn, NODEPGNO(node), &mn.mc_pg[mn.mc_top])))
4081                         return rc;
4082                 mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1;
4083                 mc->mc_ki[mc->mc_top] = 0;
4084         }
4085
4086         DPRINTF("found neighbor page %lu (%u keys, %.1f%% full)",
4087             mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10);
4088
4089         /* If the neighbor page is above threshold and has at least two
4090          * keys, move one key from it.
4091          *
4092          * Otherwise we should try to merge them.
4093          */
4094         if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
4095                 return mdb_move_node(&mn, mc);
4096         else { /* FIXME: if (has_enough_room()) */
4097                 if (mc->mc_ki[ptop] == 0)
4098                         return mdb_merge(&mn, mc);
4099                 else
4100                         return mdb_merge(mc, &mn);
4101         }
4102 }
4103
4104 static int
4105 mdb_del0(MDB_cursor *mc, MDB_node *leaf)
4106 {
4107         int rc;
4108
4109         /* add overflow pages to free list */
4110         if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_BIGDATA)) {
4111                 int i, ovpages;
4112                 pgno_t pg;
4113
4114                 memcpy(&pg, NODEDATA(leaf), sizeof(pg));
4115                 ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
4116                 for (i=0; i<ovpages; i++) {
4117                         DPRINTF("freed ov page %lu", pg);
4118                         mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
4119                         pg++;
4120                 }
4121         }
4122         mdb_del_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad);
4123         mc->mc_txn->mt_dbs[mc->mc_dbi].md_entries--;
4124         rc = mdb_rebalance(mc);
4125         if (rc != MDB_SUCCESS)
4126                 mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
4127
4128         return rc;
4129 }
4130
4131 int
4132 mdb_del(MDB_txn *txn, MDB_dbi dbi,
4133     MDB_val *key, MDB_val *data)
4134 {
4135         MDB_cursor mc;
4136         MDB_xcursor mx;
4137         MDB_cursor_op op;
4138         MDB_val rdata, *xdata;
4139         int              rc, exact;
4140         DKBUF;
4141
4142         assert(key != NULL);
4143
4144         DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key));
4145
4146         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
4147                 return EINVAL;
4148
4149         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
4150                 return EACCES;
4151         }
4152
4153         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
4154                 return EINVAL;
4155         }
4156
4157         mc.mc_txn = txn;
4158         mc.mc_dbi = dbi;
4159         mc.mc_flags = 0;
4160         if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
4161                 mc.mc_xcursor = &mx;
4162                 mdb_xcursor_init0(&mc);
4163         } else {
4164                 mc.mc_xcursor = NULL;
4165         }
4166
4167         exact = 0;
4168         if (data) {
4169                 op = MDB_GET_BOTH;
4170                 rdata = *data;
4171                 xdata = &rdata;
4172         } else {
4173                 op = MDB_SET;
4174                 xdata = NULL;
4175         }
4176         rc = mdb_cursor_set(&mc, key, xdata, op, &exact);
4177         if (rc == 0)
4178                 rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA);
4179         return rc;
4180 }
4181
4182 /* Split page <mc->top>, and insert <key,(data|newpgno)> in either left or
4183  * right sibling, at index <mc->ki> (as if unsplit). Updates mc->top and
4184  * mc->ki with the actual values after split, ie if mc->top and mc->ki
4185  * refer to a node in the new right sibling page.
4186  */
4187 static int
4188 mdb_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno)
4189 {
4190         uint8_t          flags;
4191         int              rc = MDB_SUCCESS, ins_new = 0;
4192         indx_t           newindx;
4193         pgno_t           pgno = 0;
4194         unsigned int     i, j, split_indx, nkeys, pmax;
4195         MDB_node        *node;
4196         MDB_val  sepkey, rkey, rdata;
4197         MDB_page        *copy;
4198         MDB_page        *mp, *rp, *pp;
4199         unsigned int ptop;
4200         MDB_cursor      mn;
4201         DKBUF;
4202
4203         mp = mc->mc_pg[mc->mc_top];
4204         newindx = mc->mc_ki[mc->mc_top];
4205
4206         DPRINTF("-----> splitting %s page %lu and adding [%s] at index %i",
4207             IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno,
4208             DKEY(newkey), mc->mc_ki[mc->mc_top]);
4209
4210         if (mc->mc_snum < 2) {
4211                 if ((pp = mdb_new_page(mc, P_BRANCH, 1)) == NULL)
4212                         return ENOMEM;
4213                 /* shift current top to make room for new parent */
4214                 mc->mc_pg[1] = mc->mc_pg[0];
4215                 mc->mc_ki[1] = mc->mc_ki[0];
4216                 mc->mc_pg[0] = pp;
4217                 mc->mc_ki[0] = 0;
4218                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = pp->mp_pgno;
4219                 DPRINTF("root split! new root = %lu", pp->mp_pgno);
4220                 mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth++;
4221
4222                 /* Add left (implicit) pointer. */
4223                 if ((rc = mdb_add_node(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != MDB_SUCCESS) {
4224                         /* undo the pre-push */
4225                         mc->mc_pg[0] = mc->mc_pg[1];
4226                         mc->mc_ki[0] = mc->mc_ki[1];
4227                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_root = mp->mp_pgno;
4228                         mc->mc_txn->mt_dbs[mc->mc_dbi].md_depth--;
4229                         return rc;
4230                 }
4231                 mc->mc_snum = 2;
4232                 mc->mc_top = 1;
4233                 ptop = 0;
4234         } else {
4235                 ptop = mc->mc_top-1;
4236                 DPRINTF("parent branch page is %lu", mc->mc_pg[ptop]->mp_pgno);
4237         }
4238
4239         /* Create a right sibling. */
4240         if ((rp = mdb_new_page(mc, mp->mp_flags, 1)) == NULL)
4241                 return ENOMEM;
4242         mdb_cursor_copy(mc, &mn);
4243         mn.mc_pg[mn.mc_top] = rp;
4244         mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
4245         DPRINTF("new right sibling: page %lu", rp->mp_pgno);
4246
4247         nkeys = NUMKEYS(mp);
4248         split_indx = nkeys / 2 + 1;
4249
4250         if (IS_LEAF2(rp)) {
4251                 char *split, *ins;
4252                 int x;
4253                 unsigned int lsize, rsize, ksize;
4254                 /* Move half of the keys to the right sibling */
4255                 copy = NULL;
4256                 x = mc->mc_ki[mc->mc_top] - split_indx;
4257                 ksize = mc->mc_txn->mt_dbs[mc->mc_dbi].md_pad;
4258                 split = LEAF2KEY(mp, split_indx, ksize);
4259                 rsize = (nkeys - split_indx) * ksize;
4260                 lsize = (nkeys - split_indx) * sizeof(indx_t);
4261                 mp->mp_lower -= lsize;
4262                 rp->mp_lower += lsize;
4263                 mp->mp_upper += rsize - lsize;
4264                 rp->mp_upper -= rsize - lsize;
4265                 sepkey.mv_size = ksize;
4266                 if (newindx == split_indx) {
4267                         sepkey.mv_data = newkey->mv_data;
4268                 } else {
4269                         sepkey.mv_data = split;
4270                 }
4271                 if (x<0) {
4272                         ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize);
4273                         memcpy(rp->mp_ptrs, split, rsize);
4274                         sepkey.mv_data = rp->mp_ptrs;
4275                         memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize);
4276                         memcpy(ins, newkey->mv_data, ksize);
4277                         mp->mp_lower += sizeof(indx_t);
4278                         mp->mp_upper -= ksize - sizeof(indx_t);
4279                 } else {
4280                         if (x)
4281                                 memcpy(rp->mp_ptrs, split, x * ksize);
4282                         ins = LEAF2KEY(rp, x, ksize);
4283                         memcpy(ins, newkey->mv_data, ksize);
4284                         memcpy(ins+ksize, split + x * ksize, rsize - x * ksize);
4285                         rp->mp_lower += sizeof(indx_t);
4286                         rp->mp_upper -= ksize - sizeof(indx_t);
4287                         mc->mc_ki[mc->mc_top] = x;
4288                         mc->mc_pg[mc->mc_top] = rp;
4289                 }
4290                 goto newsep;
4291         }
4292
4293         /* For leaf pages, check the split point based on what
4294          * fits where, since otherwise add_node can fail.
4295          */
4296         if (IS_LEAF(mp)) {
4297                 unsigned int psize, nsize;
4298                 /* Maximum free space in an empty page */
4299                 pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ;
4300                 nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata);
4301                 if (newindx < split_indx) {
4302                         psize = nsize;
4303                         for (i=0; i<split_indx; i++) {
4304                                 node = NODEPTR(mp, i);
4305                                 psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
4306                                 if (F_ISSET(node->mn_flags, F_BIGDATA))
4307                                         psize += sizeof(pgno_t);
4308                                 else
4309                                         psize += NODEDSZ(node);
4310                                 psize += psize & 1;
4311                                 if (psize > pmax) {
4312                                         split_indx = i;
4313                                         break;
4314                                 }
4315                         }
4316                 } else {
4317                         psize = nsize;
4318                         for (i=nkeys-1; i>=split_indx; i--) {
4319                                 node = NODEPTR(mp, i);
4320                                 psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
4321                                 if (F_ISSET(node->mn_flags, F_BIGDATA))
4322                                         psize += sizeof(pgno_t);
4323                                 else
4324                                         psize += NODEDSZ(node);
4325                                 psize += psize & 1;
4326                                 if (psize > pmax) {
4327                                         split_indx = i+1;
4328                                         break;
4329                                 }
4330                         }
4331                 }
4332         }
4333
4334         /* First find the separating key between the split pages.
4335          */
4336         if (newindx == split_indx) {
4337                 sepkey.mv_size = newkey->mv_size;
4338                 sepkey.mv_data = newkey->mv_data;
4339         } else {
4340                 node = NODEPTR(mp, split_indx);
4341                 sepkey.mv_size = node->mn_ksize;
4342                 sepkey.mv_data = NODEKEY(node);
4343         }
4344
4345 newsep:
4346         DPRINTF("separator is [%s]", DKEY(&sepkey));
4347
4348         /* Copy separator key to the parent.
4349          */
4350         if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) {
4351                 mn.mc_snum--;
4352                 mn.mc_top--;
4353                 rc = mdb_split(&mn, &sepkey, NULL, rp->mp_pgno);
4354
4355                 /* Right page might now have changed parent.
4356                  * Check if left page also changed parent.
4357                  */
4358                 if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
4359                     mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
4360                         mc->mc_pg[ptop] = mn.mc_pg[ptop];
4361                         mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
4362                 }
4363         } else {
4364                 mn.mc_top--;
4365                 rc = mdb_add_node(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
4366                 mn.mc_top++;
4367         }
4368         if (IS_LEAF2(rp)) {
4369                 return rc;
4370         }
4371         if (rc != MDB_SUCCESS) {
4372                 return rc;
4373         }
4374
4375         /* Move half of the keys to the right sibling. */
4376
4377         /* grab a page to hold a temporary copy */
4378         if (mc->mc_txn->mt_env->me_dpages) {
4379                 copy = mc->mc_txn->mt_env->me_dpages;
4380                 mc->mc_txn->mt_env->me_dpages = copy->mp_next;
4381         } else {
4382                 if ((copy = malloc(mc->mc_txn->mt_env->me_psize)) == NULL)
4383                         return ENOMEM;
4384         }
4385
4386         copy->mp_pgno  = mp->mp_pgno;
4387         copy->mp_flags = mp->mp_flags;
4388         copy->mp_lower = PAGEHDRSZ;
4389         copy->mp_upper = mc->mc_txn->mt_env->me_psize;
4390         mc->mc_pg[mc->mc_top] = copy;
4391         for (i = j = 0; i <= nkeys; j++) {
4392                 if (i == split_indx) {
4393                 /* Insert in right sibling. */
4394                 /* Reset insert index for right sibling. */
4395                         j = (i == newindx && ins_new);
4396                         mc->mc_pg[mc->mc_top] = rp;
4397                 }
4398
4399                 if (i == newindx && !ins_new) {
4400                         /* Insert the original entry that caused the split. */
4401                         rkey.mv_data = newkey->mv_data;
4402                         rkey.mv_size = newkey->mv_size;
4403                         if (IS_LEAF(mp)) {
4404                                 rdata.mv_data = newdata->mv_data;
4405                                 rdata.mv_size = newdata->mv_size;
4406                         } else
4407                                 pgno = newpgno;
4408                         flags = 0;
4409
4410                         ins_new = 1;
4411
4412                         /* Update page and index for the new key. */
4413                         mc->mc_ki[mc->mc_top] = j;
4414                 } else if (i == nkeys) {
4415                         break;
4416                 } else {
4417                         node = NODEPTR(mp, i);
4418                         rkey.mv_data = NODEKEY(node);
4419                         rkey.mv_size = node->mn_ksize;
4420                         if (IS_LEAF(mp)) {
4421                                 rdata.mv_data = NODEDATA(node);
4422                                 rdata.mv_size = NODEDSZ(node);
4423                         } else
4424                                 pgno = NODEPGNO(node);
4425                         flags = node->mn_flags;
4426
4427                         i++;
4428                 }
4429
4430                 if (!IS_LEAF(mp) && j == 0) {
4431                         /* First branch index doesn't need key data. */
4432                         rkey.mv_size = 0;
4433                 }
4434
4435                 rc = mdb_add_node(mc, j, &rkey, &rdata, pgno, flags);
4436         }
4437
4438         /* reset back to original page */
4439         if (newindx < split_indx)
4440                 mc->mc_pg[mc->mc_top] = mp;
4441
4442         nkeys = NUMKEYS(copy);
4443         for (i=0; i<nkeys; i++)
4444                 mp->mp_ptrs[i] = copy->mp_ptrs[i];
4445         mp->mp_lower = copy->mp_lower;
4446         mp->mp_upper = copy->mp_upper;
4447         memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1),
4448                 mc->mc_txn->mt_env->me_psize - copy->mp_upper);
4449
4450         /* return tmp page to freelist */
4451         copy->mp_next = mc->mc_txn->mt_env->me_dpages;
4452         mc->mc_txn->mt_env->me_dpages = copy;
4453         return rc;
4454 }
4455
4456 int
4457 mdb_put(MDB_txn *txn, MDB_dbi dbi,
4458     MDB_val *key, MDB_val *data, unsigned int flags)
4459 {
4460         MDB_cursor mc;
4461         MDB_xcursor mx;
4462
4463         assert(key != NULL);
4464         assert(data != NULL);
4465
4466         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
4467                 return EINVAL;
4468
4469         if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
4470                 return EACCES;
4471         }
4472
4473         if (key->mv_size == 0 || key->mv_size > MAXKEYSIZE) {
4474                 return EINVAL;
4475         }
4476
4477         if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA)) != flags)
4478                 return EINVAL;
4479
4480         mc.mc_txn = txn;
4481         mc.mc_dbi = dbi;
4482         mc.mc_snum = 0;
4483         mc.mc_flags = 0;
4484         if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
4485                 mc.mc_xcursor = &mx;
4486                 mdb_xcursor_init0(&mc);
4487         } else {
4488                 mc.mc_xcursor = NULL;
4489         }
4490         return mdb_cursor_put(&mc, key, data, flags);
4491 }
4492
4493 int
4494 mdb_env_set_flags(MDB_env *env, unsigned int flag, int onoff)
4495 {
4496 #define CHANGEABLE      (MDB_NOSYNC)
4497         if ((flag & CHANGEABLE) != flag)
4498                 return EINVAL;
4499         if (onoff)
4500                 env->me_flags |= flag;
4501         else
4502                 env->me_flags &= ~flag;
4503         return MDB_SUCCESS;
4504 }
4505
4506 int
4507 mdb_env_get_flags(MDB_env *env, unsigned int *arg)
4508 {
4509         if (!env || !arg)
4510                 return EINVAL;
4511
4512         *arg = env->me_flags;
4513         return MDB_SUCCESS;
4514 }
4515
4516 int
4517 mdb_env_get_path(MDB_env *env, const char **arg)
4518 {
4519         if (!env || !arg)
4520                 return EINVAL;
4521
4522         *arg = env->me_path;
4523         return MDB_SUCCESS;
4524 }
4525
4526 static int
4527 mdb_stat0(MDB_env *env, MDB_db *db, MDB_stat *arg)
4528 {
4529         arg->ms_psize = env->me_psize;
4530         arg->ms_depth = db->md_depth;
4531         arg->ms_branch_pages = db->md_branch_pages;
4532         arg->ms_leaf_pages = db->md_leaf_pages;
4533         arg->ms_overflow_pages = db->md_overflow_pages;
4534         arg->ms_entries = db->md_entries;
4535
4536         return MDB_SUCCESS;
4537 }
4538 int
4539 mdb_env_stat(MDB_env *env, MDB_stat *arg)
4540 {
4541         int toggle;
4542
4543         if (env == NULL || arg == NULL)
4544                 return EINVAL;
4545
4546         mdb_env_read_meta(env, &toggle);
4547
4548         return mdb_stat0(env, &env->me_metas[toggle]->mm_dbs[MAIN_DBI], arg);
4549 }
4550
4551 static void
4552 mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi)
4553 {
4554         if (txn->mt_dbs[dbi].md_flags & MDB_REVERSEKEY)
4555                 txn->mt_dbxs[dbi].md_cmp = memnrcmp;
4556         else if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERKEY)
4557                 txn->mt_dbxs[dbi].md_cmp = cintcmp;
4558         else
4559                 txn->mt_dbxs[dbi].md_cmp = memncmp;
4560
4561         if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
4562                 if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERDUP) {
4563                         if (txn->mt_dbs[dbi].md_flags & MDB_DUPFIXED)
4564                                 txn->mt_dbxs[dbi].md_dcmp = intcmp;
4565                         else
4566                                 txn->mt_dbxs[dbi].md_dcmp = cintcmp;
4567                 } else if (txn->mt_dbs[dbi].md_flags & MDB_REVERSEDUP) {
4568                         txn->mt_dbxs[dbi].md_dcmp = memnrcmp;
4569                 } else {
4570                         txn->mt_dbxs[dbi].md_dcmp = memncmp;
4571                 }
4572         } else {
4573                 txn->mt_dbxs[dbi].md_dcmp = NULL;
4574         }
4575 }
4576
4577 int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
4578 {
4579         MDB_val key, data;
4580         MDB_dbi i;
4581         int rc, dirty = 0;
4582         size_t len;
4583
4584         if (txn->mt_dbxs[FREE_DBI].md_cmp == NULL) {
4585                 mdb_default_cmp(txn, FREE_DBI);
4586         }
4587
4588         /* main DB? */
4589         if (!name) {
4590                 *dbi = MAIN_DBI;
4591                 if (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY))
4592                         txn->mt_dbs[MAIN_DBI].md_flags |= (flags & (MDB_DUPSORT|MDB_REVERSEKEY|MDB_INTEGERKEY));
4593                 mdb_default_cmp(txn, MAIN_DBI);
4594                 return MDB_SUCCESS;
4595         }
4596
4597         if (txn->mt_dbxs[MAIN_DBI].md_cmp == NULL) {
4598                 mdb_default_cmp(txn, MAIN_DBI);
4599         }
4600
4601         /* Is the DB already open? */
4602         len = strlen(name);
4603         for (i=2; i<txn->mt_numdbs; i++) {
4604                 if (len == txn->mt_dbxs[i].md_name.mv_size &&
4605                         !strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) {
4606                         *dbi = i;
4607                         return MDB_SUCCESS;
4608                 }
4609         }
4610
4611         if (txn->mt_numdbs >= txn->mt_env->me_maxdbs - 1)
4612                 return ENFILE;
4613
4614         /* Find the DB info */
4615         key.mv_size = len;
4616         key.mv_data = (void *)name;
4617         rc = mdb_get(txn, MAIN_DBI, &key, &data);
4618
4619         /* Create if requested */
4620         if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) {
4621                 MDB_cursor mc;
4622                 MDB_db dummy;
4623                 data.mv_size = sizeof(MDB_db);
4624                 data.mv_data = &dummy;
4625                 memset(&dummy, 0, sizeof(dummy));
4626                 dummy.md_root = P_INVALID;
4627                 dummy.md_flags = flags & 0xffff;
4628                 mc.mc_txn = txn;
4629                 mc.mc_dbi = MAIN_DBI;
4630                 mc.mc_flags = 0;
4631                 rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA);
4632                 dirty = 1;
4633         }
4634
4635         /* OK, got info, add to table */
4636         if (rc == MDB_SUCCESS) {
4637                 txn->mt_dbxs[txn->mt_numdbs].md_name.mv_data = strdup(name);
4638                 txn->mt_dbxs[txn->mt_numdbs].md_name.mv_size = len;
4639                 txn->mt_dbxs[txn->mt_numdbs].md_rel = NULL;
4640                 txn->mt_dbxs[txn->mt_numdbs].md_parent = MAIN_DBI;
4641                 txn->mt_dbxs[txn->mt_numdbs].md_dirty = dirty;
4642                 memcpy(&txn->mt_dbs[txn->mt_numdbs], data.mv_data, sizeof(MDB_db));
4643                 *dbi = txn->mt_numdbs;
4644                 txn->mt_env->me_dbs[0][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
4645                 txn->mt_env->me_dbs[1][txn->mt_numdbs] = txn->mt_dbs[txn->mt_numdbs];
4646                 mdb_default_cmp(txn, txn->mt_numdbs);
4647                 txn->mt_numdbs++;
4648         }
4649
4650         return rc;
4651 }
4652
4653 int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *arg)
4654 {
4655         if (txn == NULL || arg == NULL || dbi >= txn->mt_numdbs)
4656                 return EINVAL;
4657
4658         return mdb_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg);
4659 }
4660
4661 void mdb_close(MDB_txn *txn, MDB_dbi dbi)
4662 {
4663         char *ptr;
4664         if (dbi <= MAIN_DBI || dbi >= txn->mt_numdbs)
4665                 return;
4666         ptr = txn->mt_dbxs[dbi].md_name.mv_data;
4667         txn->mt_dbxs[dbi].md_name.mv_data = NULL;
4668         txn->mt_dbxs[dbi].md_name.mv_size = 0;
4669         free(ptr);
4670 }
4671
4672 int mdb_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
4673 {
4674         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
4675                 return EINVAL;
4676
4677         txn->mt_dbxs[dbi].md_cmp = cmp;
4678         return MDB_SUCCESS;
4679 }
4680
4681 int mdb_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp)
4682 {
4683         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
4684                 return EINVAL;
4685
4686         txn->mt_dbxs[dbi].md_dcmp = cmp;
4687         return MDB_SUCCESS;
4688 }
4689
4690 int mdb_set_relfunc(MDB_txn *txn, MDB_dbi dbi, MDB_rel_func *rel)
4691 {
4692         if (txn == NULL || !dbi || dbi >= txn->mt_numdbs)
4693                 return EINVAL;
4694
4695         txn->mt_dbxs[dbi].md_rel = rel;
4696         return MDB_SUCCESS;
4697 }
4698
4699 /** @} */