]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Backport of class based catalog backends into Branch-5.1.
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
37  */
38 /*
39  * This code only compiles against a recent version of libdbi. The current
40  * release found on the libdbi website (0.8.3) won't work for this code.
41  *
42  * You find the libdbi library on http://sourceforge.net/projects/libdbi
43  *
44  * A fairly recent version of libdbi from CVS works, so either make sure
45  * your distribution has a fairly recent version of libdbi installed or
46  * clone the CVS repositories from sourceforge and compile that code and
47  * install it.
48  *
49  * You need:
50  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
52  */
53
54 #include "bacula.h"
55
56 #ifdef HAVE_DBI
57
58 #include "cats.h"
59 #include "bdb_priv.h"
60 #include <dbi/dbi.h>
61 #include <dbi/dbi-dev.h>
62 #include <bdb_dbi.h>
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /*
72  * List of open databases
73  */
74 static dlist *db_list = NULL;
75
76 /*
77  * Control allocated fields by dbi_getvalue
78  */
79 static dlist *dbi_getvalue_list = NULL;
80
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
86
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88                    const char *db_driver,
89                    const char *db_name,
90                    const char *db_user,
91                    const char *db_password,
92                    const char *db_address,
93                    int db_port,
94                    const char *db_socket,
95                    bool mult_db_connections,
96                    bool disable_batch_insert)
97 {
98    char *p;
99    char new_db_driver[10];
100    char db_driverdir[256];
101    DBI_FIELD_GET *field;
102
103    p = (char *)(db_driver + 4);
104    if (strcasecmp(p, "mysql") == 0) {
105       m_db_type = SQL_TYPE_MYSQL;
106       bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107    } else if (strcasecmp(p, "postgresql") == 0) {
108       m_db_type = SQL_TYPE_POSTGRESQL;
109       bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110    } else if (strcasecmp(p, "sqlite3") == 0) {
111       m_db_type = SQL_TYPE_SQLITE3;
112       bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113    } else if (strcasecmp(p, "ingres") == 0) {
114       m_db_type = SQL_TYPE_INGRES;
115       bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
116    } else {
117       Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
118       return;
119    }
120
121    /*
122     * Set db_driverdir whereis is the libdbi drivers
123     */
124    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
125
126    /*
127     * Initialize the parent class members.
128     */
129    m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130    m_db_name = bstrdup(db_name);
131    m_db_user = bstrdup(db_user);
132    if (db_password) {
133       m_db_password = bstrdup(db_password);
134    }
135    if (db_address) {
136       m_db_address = bstrdup(db_address);
137    }
138    if (db_socket) {
139       m_db_socket = bstrdup(db_socket);
140    }
141    if (db_driverdir) {
142       m_db_driverdir = bstrdup(db_driverdir);
143    }
144    m_db_driver = bstrdup(new_db_driver);
145    m_db_port = db_port;
146    if (disable_batch_insert) {
147       m_disabled_batch_insert = true;
148       m_have_batch_insert = false;
149    } else {
150       m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153       m_have_batch_insert = true;
154 #else
155       m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
157 #else
158       m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
160    }
161    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
162    *errmsg = 0;
163    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164    cached_path = get_pool_memory(PM_FNAME);
165    cached_path_id = 0;
166    m_ref_count = 1;
167    fname = get_pool_memory(PM_FNAME);
168    path = get_pool_memory(PM_FNAME);
169    esc_name = get_pool_memory(PM_FNAME);
170    esc_path = get_pool_memory(PM_FNAME);
171    m_allow_transactions = mult_db_connections;
172
173    /*
174     * Initialize the private members.
175     */
176    m_db_handle = NULL;
177    m_result = NULL;
178    m_field_get = NULL;
179
180    /*
181     * Put the db in the list.
182     */
183    if (db_list == NULL) {
184       db_list = New(dlist(this, &this->m_link));
185       dbi_getvalue_list = New(dlist(field, &field->link));
186    }
187    db_list->append(this);
188 }
189
190 B_DB_DBI::~B_DB_DBI()
191 {
192 }
193
194 /*
195  * Now actually open the database.  This can generate errors,
196  *   which are returned in the errmsg
197  *
198  * DO NOT close the database or delete mdb here  !!!!
199  */
200 bool B_DB_DBI::db_open_database(JCR *jcr)
201 {
202    bool retval = false;
203    int errstat;
204    int dbstat;
205    uint8_t len;
206    const char *dbi_errmsg;
207    char buf[10], *port;
208    int numdrivers;
209    char *new_db_name = NULL;
210    char *new_db_dir = NULL;
211
212    P(mutex);
213    if (m_connected) {
214       retval = true;
215       goto bail_out;
216    }
217
218    if ((errstat=rwl_init(&m_lock)) != 0) {
219       berrno be;
220       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
221             be.bstrerror(errstat));
222       goto bail_out;
223    }
224
225    if (m_db_port) {
226       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
227       port = buf;
228    } else {
229       port = NULL;
230    }
231
232    numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
233    if (numdrivers < 0) {
234       Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
235                                "db_driverdir=%s. It is probaly not found any drivers\n"),
236                                m_db_driverdir,numdrivers);
237       goto bail_out;
238    }
239    m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
240    /*
241     * Can be many types of databases
242     */
243    switch (m_db_type) {
244    case SQL_TYPE_MYSQL:
245       dbi_conn_set_option(m_db_handle, "host", m_db_address);      /* default = localhost */
246       dbi_conn_set_option(m_db_handle, "port", port);              /* default port */
247       dbi_conn_set_option(m_db_handle, "username", m_db_user);     /* login name */
248       dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
249       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);       /* database name */
250       break;
251    case SQL_TYPE_POSTGRESQL:
252       dbi_conn_set_option(m_db_handle, "host", m_db_address);
253       dbi_conn_set_option(m_db_handle, "port", port);
254       dbi_conn_set_option(m_db_handle, "username", m_db_user);
255       dbi_conn_set_option(m_db_handle, "password", m_db_password);
256       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
257       break;
258    case SQL_TYPE_SQLITE3:
259       len = strlen(working_directory) + 5;
260       new_db_dir = (char *)malloc(len);
261       strcpy(new_db_dir, working_directory);
262       strcat(new_db_dir, "/");
263       len = strlen(m_db_name) + 5;
264       new_db_name = (char *)malloc(len);
265       strcpy(new_db_name, m_db_name);
266       strcat(new_db_name, ".db");
267       dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
268       dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
269       Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
270       free(new_db_dir);
271       free(new_db_name);
272       break;
273    }
274
275    /*
276     * If connection fails, try at 5 sec intervals for 30 seconds.
277     */
278    for (int retry=0; retry < 6; retry++) {
279       dbstat = dbi_conn_connect(m_db_handle);
280       if (dbstat == 0) {
281          break;
282       }
283
284       dbi_conn_error(m_db_handle, &dbi_errmsg);
285       Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
286
287       bmicrosleep(5, 0);
288    }
289
290    if (dbstat != 0 ) {
291       Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
292          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
293          m_db_driver, m_db_name, m_db_user);
294       goto bail_out;
295    }
296
297    Dmsg0(50, "dbi_real_connect done\n");
298    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
299          m_db_user, m_db_name,
300         (m_db_password == NULL) ? "(NULL)" : m_db_password);
301
302    m_connected = true;
303
304    if (!check_tables_version(jcr, this)) {
305       goto bail_out;
306    }
307
308    switch (m_db_type) {
309    case SQL_TYPE_MYSQL:
310       /*
311        * Set connection timeout to 8 days specialy for batch mode
312        */
313       sql_query("SET wait_timeout=691200");
314       sql_query("SET interactive_timeout=691200");
315       break;
316    case SQL_TYPE_POSTGRESQL:
317       /*
318        * Tell PostgreSQL we are using standard conforming strings
319        * and avoid warnings such as:
320        * WARNING:  nonstandard use of \\ in a string literal
321        */
322       sql_query("SET datestyle TO 'ISO, YMD'");
323       sql_query("SET standard_conforming_strings=on");
324       break;
325    }
326
327    retval = true;
328
329 bail_out:
330    V(mutex);
331    return retval;
332 }
333
334 void B_DB_DBI::db_close_database(JCR *jcr)
335 {
336    db_end_transaction(jcr);
337    P(mutex);
338    sql_free_result();
339    m_ref_count--;
340    if (m_ref_count == 0) {
341       db_list->remove(this);
342       if (m_connected && m_db_handle) {
343          dbi_shutdown_r(m_instance);
344          m_db_handle = NULL;
345          m_instance = NULL;
346       }
347       rwl_destroy(&m_lock);
348       free_pool_memory(errmsg);
349       free_pool_memory(cmd);
350       free_pool_memory(cached_path);
351       free_pool_memory(fname);
352       free_pool_memory(path);
353       free_pool_memory(esc_name);
354       free_pool_memory(esc_path);
355       if (m_db_driver) {
356          free(m_db_driver);
357       }
358       if (m_db_name) {
359          free(m_db_name);
360       }
361       if (m_db_user) {
362          free(m_db_user);
363       }
364       if (m_db_password) {
365          free(m_db_password);
366       }
367       if (m_db_address) {
368          free(m_db_address);
369       }
370       if (m_db_socket) {
371          free(m_db_socket);
372       }
373       if (m_db_driverdir) {
374          free(m_db_driverdir);
375       }
376       delete this;
377       if (db_list->size() == 0) {
378          delete db_list;
379          db_list = NULL;
380       }
381    }
382    V(mutex);
383 }
384
385 void B_DB_DBI::db_thread_cleanup(void)
386 {
387 }
388
389 /*
390  * Escape strings so that DBI is happy
391  *
392  *   NOTE! len is the length of the old string. Your new
393  *         string must be long enough (max 2*old+1) to hold
394  *         the escaped output.
395  *
396  * dbi_conn_quote_string_copy receives a pointer to pointer.
397  * We need copy the value of pointer to snew because libdbi change the
398  * pointer
399  */
400 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
401 {
402    char *inew;
403    char *pnew;
404
405    if (len == 0) {
406       snew[0] = 0;
407    } else {
408       /*
409        * Correct the size of old basead in len and copy new string to inew
410        */
411       inew = (char *)malloc(sizeof(char) * len + 1);
412       bstrncpy(inew,old,len + 1);
413       /*
414        * Escape the correct size of old
415        */
416       dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
417       free(inew);
418       /*
419        * Copy the escaped string to snew
420        */
421       bstrncpy(snew, pnew, 2 * len + 1);
422    }
423
424    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
425 }
426
427 /*
428  * Escape binary object so that DBI is happy
429  * Memory is stored in B_DB struct, no need to free it
430  */
431 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
432 {
433    size_t new_len;
434    char *pnew;
435
436    if (len == 0) {
437       esc_obj[0] = 0;
438    } else {
439       new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
440       esc_obj = check_pool_memory_size(esc_obj, new_len+1);
441       memcpy(esc_obj, pnew, new_len);
442    }
443
444    return esc_obj;
445 }
446
447 /*
448  * Unescape binary object so that DBI is happy
449  */
450 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
451                                   POOLMEM **dest, int32_t *dest_len)
452 {
453    if (!from) {
454       *dest[0] = 0;
455       *dest_len = 0;
456       return;
457    }
458    *dest = check_pool_memory_size(*dest, expected_len+1);
459    *dest_len = expected_len;
460    memcpy(*dest, from, expected_len);
461    (*dest)[expected_len]=0;
462 }
463
464 /*
465  * Start a transaction. This groups inserts and makes things
466  * much more efficient. Usually started when inserting
467  * file attributes.
468  */
469 void B_DB_DBI::db_start_transaction(JCR *jcr)
470 {
471    if (!jcr->attr) {
472       jcr->attr = get_pool_memory(PM_FNAME);
473    }
474    if (!jcr->ar) {
475       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
476    }
477
478    switch (m_db_type) {
479    case SQL_TYPE_SQLITE3:
480       if (!m_allow_transactions) {
481          return;
482       }
483
484       db_lock(this);
485       /*
486        * Allow only 10,000 changes per transaction
487        */
488       if (m_transaction && changes > 10000) {
489          db_end_transaction(jcr);
490       }
491       if (!m_transaction) {
492          sql_query("BEGIN");  /* begin transaction */
493          Dmsg0(400, "Start SQLite transaction\n");
494          m_transaction = true;
495       }
496       db_unlock(this);
497       break;
498    case SQL_TYPE_POSTGRESQL:
499       /*
500        * This is turned off because transactions break
501        * if multiple simultaneous jobs are run.
502        */
503       if (!m_allow_transactions) {
504          return;
505       }
506
507       db_lock(this);
508       /*
509        * Allow only 25,000 changes per transaction
510        */
511       if (m_transaction && changes > 25000) {
512          db_end_transaction(jcr);
513       }
514       if (!m_transaction) {
515          sql_query("BEGIN");  /* begin transaction */
516          Dmsg0(400, "Start PosgreSQL transaction\n");
517          m_transaction = true;
518       }
519       db_unlock(this);
520       break;
521    case SQL_TYPE_INGRES:
522       if (!m_allow_transactions) {
523          return;
524       }
525
526       db_lock(this);
527       /*
528        * Allow only 25,000 changes per transaction
529        */
530       if (m_transaction && changes > 25000) {
531          db_end_transaction(jcr);
532       }
533       if (!m_transaction) {
534          sql_query("BEGIN");  /* begin transaction */
535          Dmsg0(400, "Start Ingres transaction\n");
536          m_transaction = true;
537       }
538       db_unlock(this);
539       break;
540    default:
541       break;
542    }
543 }
544
545 void B_DB_DBI::db_end_transaction(JCR *jcr)
546 {
547    if (jcr && jcr->cached_attribute) {
548       Dmsg0(400, "Flush last cached attribute.\n");
549       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
550          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
551       }
552       jcr->cached_attribute = false;
553    }
554
555    switch (m_db_type) {
556    case SQL_TYPE_SQLITE3:
557       if (!m_allow_transactions) {
558          return;
559       }
560
561       db_lock(this);
562       if (m_transaction) {
563          sql_query("COMMIT"); /* end transaction */
564          m_transaction = false;
565          Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
566       }
567       changes = 0;
568       db_unlock(this);
569       break;
570    case SQL_TYPE_POSTGRESQL:
571       if (!m_allow_transactions) {
572          return;
573       }
574
575       db_lock(this);
576       if (m_transaction) {
577          sql_query("COMMIT"); /* end transaction */
578          m_transaction = false;
579          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
580       }
581       changes = 0;
582       db_unlock(this);
583       break;
584    case SQL_TYPE_INGRES:
585       if (!m_allow_transactions) {
586          return;
587       }
588
589       db_lock(this);
590       if (m_transaction) {
591          sql_query("COMMIT"); /* end transaction */
592          m_transaction = false;
593          Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
594       }
595       changes = 0;
596       db_unlock(this);
597       break;
598    default:
599       break;
600    }
601 }
602
603 /*
604  * Submit a general SQL command (cmd), and for each row returned,
605  * the result_handler is called with the ctx.
606  */
607 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
608 {
609    bool retval = true;
610    SQL_ROW row;
611
612    Dmsg1(500, "db_sql_query starts with %s\n", query);
613
614    db_lock(this);
615    if (!sql_query(query, QF_STORE_RESULT)) {
616       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
617       Dmsg0(500, "db_sql_query failed\n");
618       retval = false;
619       goto bail_out;
620    }
621
622    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
623
624    if (result_handler != NULL) {
625       Dmsg0(500, "db_sql_query invoking handler\n");
626       while ((row = sql_fetch_row()) != NULL) {
627          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
628          if (result_handler(ctx, m_num_fields, row))
629             break;
630       }
631       sql_free_result();
632    }
633
634    Dmsg0(500, "db_sql_query finished\n");
635
636 bail_out:
637    db_unlock(this);
638    return retval;
639 }
640
641 /*
642  * Note, if this routine returns 1 (failure), Bacula expects
643  *  that no result has been stored.
644  *
645  *  Returns:  true on success
646  *            false on failure
647  */
648 bool B_DB_DBI::sql_query(const char *query, int flags)
649 {
650    bool retval = false;
651    const char *dbi_errmsg;
652
653    Dmsg1(500, "sql_query starts with %s\n", query);
654
655    /*
656     * We are starting a new query.  reset everything.
657     */
658    m_num_rows     = -1;
659    m_row_number   = -1;
660    m_field_number = -1;
661
662    if (m_result) {
663       dbi_result_free(m_result);  /* hmm, someone forgot to free?? */
664       m_result = NULL;
665    }
666
667    m_result = (void **)dbi_conn_query(m_db_handle, query);
668
669    if (!m_result) {
670       Dmsg2(50, "Query failed: %s %p\n", query, m_result);
671       goto bail_out;
672    }
673
674    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
675    if (m_status == DBI_ERROR_NONE) {
676       Dmsg1(500, "we have a result\n", query);
677
678       /*
679        * How many fields in the set?
680        * num_fields starting at 1
681        */
682       m_num_fields = dbi_result_get_numfields(m_result);
683       Dmsg1(500, "we have %d fields\n", m_num_fields);
684       /*
685        * If no result num_rows is 0
686        */
687       m_num_rows = dbi_result_get_numrows(m_result);
688       Dmsg1(500, "we have %d rows\n", m_num_rows);
689
690       m_status = (dbi_error_flag) 0;                  /* succeed */
691    } else {
692       Dmsg1(50, "Result status failed: %s\n", query);
693       goto bail_out;
694    }
695
696    Dmsg0(500, "sql_query finishing\n");
697    retval = true;
698    goto ok_out;
699
700 bail_out:
701    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
702    //dbi_conn_error(m_db_handle, &dbi_errmsg);
703    Dmsg4(500, "sql_query we failed dbi error: "
704                    "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
705    dbi_result_free(m_result);
706    m_result = NULL;
707    m_status = (dbi_error_flag) 1;                   /* failed */
708
709 ok_out:
710    return retval;
711 }
712
713 void B_DB_DBI::sql_free_result(void)
714 {
715    DBI_FIELD_GET *f;
716
717    db_lock(this);
718    if (m_result) {
719       dbi_result_free(m_result);
720       m_result = NULL;
721    }
722    if (m_rows) {
723       free(m_rows);
724       m_rows = NULL;
725    }
726    /* 
727     * Now is time to free all value return by dbi_get_value
728     * this is necessary because libdbi don't free memory return by yours results
729     * and Bacula has some routine wich call more than once time sql_fetch_row
730     *
731     * Using a queue to store all pointer allocate is a good way to free all things
732     * when necessary
733     */
734    foreach_dlist(f, dbi_getvalue_list) {
735       free(f->value);
736       free(f);
737    }
738    if (m_fields) {
739       free(m_fields);
740       m_fields = NULL;
741    }
742    m_num_rows = m_num_fields = 0;
743    db_unlock(this);
744 }
745
746 /* dbi_getvalue
747  * like PQgetvalue;
748  * char *PQgetvalue(const PGresult *res,
749  *                int row_number,
750  *                int column_number);
751  *
752  * use dbi_result_seek_row to search in result set
753  * use example to return only strings
754  */
755 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
756 {
757    char *buf = NULL;
758    const char *dbi_errmsg;
759    const char *field_name;
760    unsigned short dbitype;
761    size_t field_length;
762    int64_t num;
763
764    /* correct the index for dbi interface
765     * dbi index begins 1
766     * I prefer do not change others functions
767     */
768    Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
769          result, row_number, column_number);
770
771    column_number++;
772
773    if(row_number == 0) {
774      row_number++;
775    }
776
777    Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
778                         result, row_number, column_number);
779
780    if(dbi_result_seek_row(result, row_number)) {
781
782       field_name = dbi_result_get_field_name(result, column_number);
783       field_length = dbi_result_get_field_length(result, field_name);
784       dbitype = dbi_result_get_field_type_idx(result,column_number);
785
786       Dmsg3(500, "dbi_getvalue start: type: '%d' "
787             "field_length bytes: '%d' fieldname: '%s'\n",
788             dbitype, field_length, field_name);
789
790       if(field_length) {
791          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
792          buf = (char *)malloc(field_length + 1);
793       } else {
794          /*
795           * if numbers
796           */
797          buf = (char *)malloc(sizeof(char *) * 50);
798       }
799
800       switch (dbitype) {
801       case DBI_TYPE_INTEGER:
802          num = dbi_result_get_longlong(result, field_name);
803          edit_int64(num, buf);
804          field_length = strlen(buf);
805          break;
806       case DBI_TYPE_STRING:
807          if(field_length) {
808             field_length = bsnprintf(buf, field_length + 1, "%s",
809             dbi_result_get_string(result, field_name));
810          } else {
811             buf[0] = 0;
812          }
813          break;
814       case DBI_TYPE_BINARY:
815          /*
816           * dbi_result_get_binary return a NULL pointer if value is empty
817           * following, change this to what Bacula espected
818           */
819          if(field_length) {
820             field_length = bsnprintf(buf, field_length + 1, "%s",
821                   dbi_result_get_binary(result, field_name));
822          } else {
823             buf[0] = 0;
824          }
825          break;
826       case DBI_TYPE_DATETIME:
827          time_t last;
828          struct tm tm;
829
830          last = dbi_result_get_datetime(result, field_name);
831
832          if(last == -1) {
833                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
834          } else {
835             (void)localtime_r(&last, &tm);
836             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
837                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
838                   tm.tm_hour, tm.tm_min, tm.tm_sec);
839          }
840          break;
841       }
842
843    } else {
844       dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
845       Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
846    }
847
848    Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
849       buf, field_length, buf);
850
851    /*
852     * Don't worry about this buf
853     */
854    return buf;
855 }
856
857 SQL_ROW B_DB_DBI::sql_fetch_row(void)
858 {
859    int j;
860    SQL_ROW row = NULL; /* by default, return NULL */
861
862    Dmsg0(500, "sql_fetch_row start\n");
863    if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
864       if (m_rows) {
865          Dmsg0(500, "sql_fetch_row freeing space\n");
866          Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
867          if (m_num_rows != 0) {
868             for (j = 0; j < m_num_fields; j++) {
869                Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
870                   if (m_rows[j]) {
871                      free(m_rows[j]);
872                   }
873             }
874          }
875          free(m_rows);
876       }
877       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
878       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
879       m_rows_size = m_num_fields;
880
881       /*
882        * Now reset the row_number now that we have the space allocated
883        */
884       m_row_number = 1;
885    }
886
887    /*
888     * If still within the result set
889     */
890    if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
891       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
892       /*
893        * Get each value from this row
894        */
895       for (j = 0; j < m_num_fields; j++) {
896          m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
897          /*
898           * Allocate space to queue row
899           */
900          m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
901          /*
902           * Store the pointer in queue
903           */
904          m_field_get->value = m_rows[j];
905          Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
906                j, m_rows[j], m_field_get->value, m_rows[j]);
907          /*
908           * Insert in queue to future free
909           */
910          dbi_getvalue_list->append(m_field_get);
911       }
912       /*
913        * Increment the row number for the next call
914        */
915       m_row_number++;
916
917       row = m_rows;
918    } else {
919       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
920    }
921
922    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
923
924    return row;
925 }
926
927 const char *B_DB_DBI::sql_strerror(void)
928 {
929    const char *dbi_errmsg;
930
931    dbi_conn_error(m_db_handle, &dbi_errmsg);
932
933    return dbi_errmsg;
934 }
935
936 void B_DB_DBI::sql_data_seek(int row)
937 {
938    /*
939     * Set the row number to be returned on the next call to sql_fetch_row
940     */
941    m_row_number = row;
942 }
943
944 int B_DB_DBI::sql_affected_rows(void)
945 {
946 #if 0
947    return dbi_result_get_numrows_affected(result);
948 #else
949    return 1;
950 #endif
951 }
952
953 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
954 {
955    char sequence[30];
956    uint64_t id = 0;
957
958    /*
959     * First execute the insert query and then retrieve the currval.
960     */
961    if (!sql_query(query)) {
962       return 0;
963    }
964
965    m_num_rows = sql_affected_rows();
966    if (m_num_rows != 1) {
967       return 0;
968    }
969
970    changes++;
971
972    /*
973     * Obtain the current value of the sequence that
974     * provides the serial value for primary key of the table.
975     *
976     * currval is local to our session.  It is not affected by
977     * other transactions.
978     *
979     * Determine the name of the sequence.
980     * PostgreSQL automatically creates a sequence using
981     * <table>_<column>_seq.
982     * At the time of writing, all tables used this format for
983     * for their primary key: <table>id
984     * Except for basefiles which has a primary key on baseid.
985     * Therefore, we need to special case that one table.
986     *
987     * everything else can use the PostgreSQL formula.
988     */
989    if (m_db_type == SQL_TYPE_POSTGRESQL) {
990       if (strcasecmp(table_name, "basefiles") == 0) {
991          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
992       } else {
993          bstrncpy(sequence, table_name, sizeof(sequence));
994          bstrncat(sequence, "_", sizeof(sequence));
995          bstrncat(sequence, table_name, sizeof(sequence));
996          bstrncat(sequence, "id", sizeof(sequence));
997       }
998
999       bstrncat(sequence, "_seq", sizeof(sequence));
1000       id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1001    } else {
1002       id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1003    }
1004
1005    return id;
1006 }
1007
1008 /* dbi_getisnull
1009  * like PQgetisnull
1010  * int PQgetisnull(const PGresult *res,
1011  *                 int row_number,
1012  *                 int column_number);
1013  *
1014  *  use dbi_result_seek_row to search in result set
1015  */
1016 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1017    int i;
1018
1019    if (row_number == 0) {
1020       row_number++;
1021    }
1022
1023    column_number++;
1024
1025    if (dbi_result_seek_row(result, row_number)) {
1026       i = dbi_result_field_is_null_idx(result,column_number);
1027       return i;
1028    } else {
1029       return 0;
1030    }
1031 }
1032
1033 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1034 {
1035    int i, j;
1036    int dbi_index;
1037    int max_length;
1038    int this_length;
1039    char *cbuf = NULL;
1040
1041    Dmsg0(500, "sql_fetch_field starts\n");
1042
1043    if (!m_fields || m_fields_size < m_num_fields) {
1044       if (m_fields) {
1045          free(m_fields);
1046          m_fields = NULL;
1047       }
1048       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1049       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1050       m_fields_size = m_num_fields;
1051
1052       for (i = 0; i < m_num_fields; i++) {
1053          /*
1054           * num_fields is starting at 1, increment i by 1
1055           */
1056          dbi_index = i + 1;
1057          Dmsg1(500, "filling field %d\n", i);
1058          m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1059          m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1060          m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1061
1062          /*
1063           * For a given column, find the max length.
1064           */
1065          max_length = 0;
1066          for (j = 0; j < m_num_rows; j++) {
1067             if (dbi_getisnull(m_result, j, dbi_index)) {
1068                 this_length = 4;        /* "NULL" */
1069             } else {
1070                cbuf = dbi_getvalue(m_result, j, dbi_index);
1071                this_length = cstrlen(cbuf);
1072                /*
1073                 * cbuf is always free
1074                 */
1075                free(cbuf);
1076             }
1077          
1078             if (max_length < this_length) {
1079                max_length = this_length;
1080             }
1081          }
1082          m_fields[i].max_length = max_length;
1083
1084          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1085                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1086       }
1087    }
1088
1089    /*
1090     * Increment field number for the next time around
1091     */
1092    return &m_fields[m_field_number++];
1093 }
1094
1095 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1096 {
1097    switch (field_type) {
1098    case (1 << 0):
1099       return true;
1100    default:
1101       return false;
1102    }
1103 }
1104
1105 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1106 {
1107    switch (field_type) {
1108    case 1:
1109    case 2:
1110       return true;
1111    default:
1112       return false;
1113    }
1114 }
1115
1116 /*
1117  * Escape strings so that PostgreSQL is happy on COPY
1118  *
1119  *   NOTE! len is the length of the old string. Your new
1120  *         string must be long enough (max 2*old+1) to hold
1121  *         the escaped output.
1122  */
1123 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1124 {
1125    /*
1126     * We have to escape \t, \n, \r, \
1127     */
1128    char c = '\0' ;
1129
1130    while (len > 0 && *src) {
1131       switch (*src) {
1132       case '\n':
1133          c = 'n';
1134          break;
1135       case '\\':
1136          c = '\\';
1137          break;
1138       case '\t':
1139          c = 't';
1140          break;
1141       case '\r':
1142          c = 'r';
1143          break;
1144       default:
1145          c = '\0' ;
1146       }
1147
1148       if (c) {
1149          *dest = '\\';
1150          dest++;
1151          *dest = c;
1152       } else {
1153          *dest = *src;
1154       }
1155
1156       len--;
1157       src++;
1158       dest++;
1159    }
1160
1161    *dest = '\0';
1162    return dest;
1163 }
1164
1165 /*
1166  * This can be a bit strang but is the one way to do
1167  *
1168  * Returns true if OK
1169  *         false if failed
1170  */
1171 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1172 {
1173    bool retval = true;
1174    const char *query = "COPY batch FROM STDIN";
1175
1176    Dmsg0(500, "sql_batch_start started\n");
1177
1178    db_lock(this);
1179    switch (m_db_type) {
1180    case SQL_TYPE_MYSQL:
1181       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1182                              "FileIndex integer,"
1183                              "JobId integer,"
1184                              "Path blob,"
1185                              "Name blob,"
1186                              "LStat tinyblob,"
1187                              "MD5 tinyblob,"
1188                              "MarkId integer)")) {
1189          Dmsg0(500, "sql_batch_start failed\n");
1190          goto bail_out;
1191       }
1192       Dmsg0(500, "sql_batch_start finishing\n");
1193       goto ok_out;
1194    case SQL_TYPE_POSTGRESQL:
1195       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1196                              "fileindex int,"
1197                              "jobid int,"
1198                              "path varchar,"
1199                              "name varchar,"
1200                              "lstat varchar,"
1201                              "md5 varchar,"
1202                              "markid int)")) {
1203          Dmsg0(500, "sql_batch_start failed\n");
1204          goto bail_out;
1205       }
1206
1207       /*
1208        * We are starting a new query.  reset everything.
1209        */
1210       m_num_rows     = -1;
1211       m_row_number   = -1;
1212       m_field_number = -1;
1213
1214       sql_free_result();
1215
1216       for (int i=0; i < 10; i++) {
1217          sql_query(query);
1218          if (m_result) {
1219             break;
1220          }
1221          bmicrosleep(5, 0);
1222       }
1223       if (!m_result) {
1224          Dmsg1(50, "Query failed: %s\n", query);
1225          goto bail_out;
1226       }
1227
1228       m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1229       //m_status = DBI_ERROR_NONE;
1230
1231       if (m_status == DBI_ERROR_NONE) {
1232          /*
1233           * How many fields in the set?
1234           */
1235          m_num_fields = dbi_result_get_numfields(m_result);
1236          m_num_rows = dbi_result_get_numrows(m_result);
1237          m_status = (dbi_error_flag) 1;
1238       } else {
1239          Dmsg1(50, "Result status failed: %s\n", query);
1240          goto bail_out;
1241       }
1242
1243       Dmsg0(500, "sql_batch_start finishing\n");
1244       goto ok_out;
1245    case SQL_TYPE_SQLITE3:
1246       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1247                              "FileIndex integer,"
1248                              "JobId integer,"
1249                              "Path blob,"
1250                              "Name blob,"
1251                              "LStat tinyblob,"
1252                              "MD5 tinyblob,"
1253                              "MarkId integer)")) {
1254          Dmsg0(500, "sql_batch_start failed\n");
1255          goto bail_out;
1256       }
1257       Dmsg0(500, "sql_batch_start finishing\n");
1258       goto ok_out;
1259    }
1260
1261 bail_out:
1262    Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1263    m_status = (dbi_error_flag) 0;
1264    sql_free_result();
1265    m_result = NULL;
1266    retval = false;
1267
1268 ok_out:
1269    db_unlock(this);
1270    return retval;
1271 }
1272
1273 /*
1274  * Set error to something to abort operation
1275  */
1276 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1277 {
1278    int res = 0;
1279    int count = 30;
1280    int (*custom_function)(void*, const char*) = NULL;
1281    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1282
1283    Dmsg0(500, "sql_batch_start started\n");
1284
1285    switch (m_db_type) {
1286    case SQL_TYPE_MYSQL:
1287       m_status = (dbi_error_flag) 0;
1288       break;
1289    case SQL_TYPE_POSTGRESQL:
1290       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1291
1292       do {
1293          res = (*custom_function)(myconn->connection, error);
1294       } while (res == 0 && --count > 0);
1295
1296       if (res == 1) {
1297          Dmsg0(500, "ok\n");
1298          m_status = (dbi_error_flag) 1;
1299       }
1300
1301       if (res <= 0) {
1302          Dmsg0(500, "we failed\n");
1303          m_status = (dbi_error_flag) 0;
1304          //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1305        }
1306       break;
1307    case SQL_TYPE_SQLITE3:
1308       m_status = (dbi_error_flag) 0;
1309       break;
1310    }
1311
1312    Dmsg0(500, "sql_batch_start finishing\n");
1313
1314    return true;
1315 }
1316
1317 /*
1318  * This function is big and use a big switch.
1319  * In near future is better split in small functions
1320  * and refactory.
1321  */
1322 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1323 {
1324    int res;
1325    int count=30;
1326    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1327    int (*custom_function)(void*, const char*, int) = NULL;
1328    char* (*custom_function_error)(void*) = NULL;
1329    size_t len;
1330    char *digest;
1331    char ed1[50];
1332
1333    Dmsg0(500, "sql_batch_start started \n");
1334
1335    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1336    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1337
1338    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1339       *digest = '\0';
1340    } else {
1341       digest = ar->Digest;
1342    }
1343
1344    switch (m_db_type) {
1345    case SQL_TYPE_MYSQL:
1346       db_escape_string(jcr, esc_name, fname, fnl);
1347       db_escape_string(jcr, esc_path, path, pnl);
1348       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1349                       "(%u,%s,'%s','%s','%s','%s',%u)",
1350                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1351                       esc_name, ar->attr, digest, ar->DeltaSeq);
1352
1353       if (!sql_query(cmd))
1354       {
1355          Dmsg0(500, "sql_batch_start failed\n");
1356          goto bail_out;
1357       }
1358
1359       Dmsg0(500, "sql_batch_start finishing\n");
1360
1361       return true;
1362       break;
1363    case SQL_TYPE_POSTGRESQL:
1364       postgresql_copy_escape(esc_name, fname, fnl);
1365       postgresql_copy_escape(esc_path, path, pnl);
1366       len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1367                      ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1368                      esc_name, ar->attr, digest, ar->DeltaSeq);
1369
1370       /*
1371        * libdbi don't support CopyData and we need call a postgresql
1372        * specific function to do this work
1373        */
1374       Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1375       custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1376       if (custom_function != NULL) {
1377          do {
1378             res = (*custom_function)(myconn->connection, cmd, len);
1379          } while (res == 0 && --count > 0);
1380
1381          if (res == 1) {
1382             Dmsg0(500, "ok\n");
1383             changes++;
1384             m_status = (dbi_error_flag) 1;
1385          }
1386
1387          if (res <= 0) {
1388             Dmsg0(500, "sql_batch_insert failed\n");
1389             goto bail_out;
1390          }
1391
1392          Dmsg0(500, "sql_batch_insert finishing\n");
1393          return true;
1394       } else {
1395          /*
1396           * Ensure to detect a PQerror
1397           */
1398          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1399          Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1400          goto bail_out;
1401       }
1402       break;
1403    case SQL_TYPE_SQLITE3:
1404       db_escape_string(jcr, esc_name, fname, fnl);
1405       db_escape_string(jcr, esc_path, path, pnl);
1406       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1407                       "(%u,%s,'%s','%s','%s','%s',%u)",
1408                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1409                       esc_name, ar->attr, digest, ar->DeltaSeq);
1410
1411       if (!sql_query(cmd))
1412       {
1413          Dmsg0(500, "sql_batch_insert failed\n");
1414          goto bail_out;
1415       }
1416
1417       Dmsg0(500, "sql_batch_insert finishing\n");
1418
1419       return true;
1420       break;
1421    }
1422
1423 bail_out:
1424    Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1425    m_status = (dbi_error_flag) 0;
1426    sql_free_result();
1427    return false;
1428 }
1429
1430 /*
1431  * Initialize database data structure. In principal this should
1432  * never have errors, or it is really fatal.
1433  */
1434 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1435                        const char *db_password, const char *db_address, int db_port,
1436                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1437 {
1438    B_DB_DBI *mdb = NULL;
1439
1440    if (!db_driver) {
1441       Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1442    }
1443
1444    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1445       Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1446    }
1447
1448    if (!db_user) {
1449       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1450       return NULL;
1451    }
1452
1453    P(mutex);                          /* lock DB queue */
1454    if (db_list && !mult_db_connections) {
1455       /*
1456        * Look to see if DB already open
1457        */
1458       foreach_dlist(mdb, db_list) {
1459          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1460             Dmsg1(100, "DB REopen %s\n", db_name);
1461             mdb->increment_refcount();
1462             goto bail_out;
1463          }
1464       }
1465    }
1466    Dmsg0(100, "db_init_database first time\n");
1467    mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1468                       db_port, db_socket, mult_db_connections, disable_batch_insert));
1469
1470 bail_out:
1471    V(mutex);
1472    return mdb;
1473 }
1474
1475 #endif /* HAVE_DBI */