]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Enhance mountcache with rescan option after interval.
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
37  */
38 /*
39  * This code only compiles against a recent version of libdbi. The current
40  * release found on the libdbi website (0.8.3) won't work for this code.
41  *
42  * You find the libdbi library on http://sourceforge.net/projects/libdbi
43  *
44  * A fairly recent version of libdbi from CVS works, so either make sure
45  * your distribution has a fairly recent version of libdbi installed or
46  * clone the CVS repositories from sourceforge and compile that code and
47  * install it.
48  *
49  * You need:
50  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
52  */
53
54 #include "bacula.h"
55
56 #ifdef HAVE_DBI
57
58 #include "cats.h"
59 #include "bdb_priv.h"
60 #include <dbi/dbi.h>
61 #include <dbi/dbi-dev.h>
62 #include <bdb_dbi.h>
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /*
72  * List of open databases
73  */
74 static dlist *db_list = NULL;
75
76 /*
77  * Control allocated fields by dbi_getvalue
78  */
79 static dlist *dbi_getvalue_list = NULL;
80
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
86
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88                    const char *db_driver,
89                    const char *db_name,
90                    const char *db_user,
91                    const char *db_password,
92                    const char *db_address,
93                    int db_port,
94                    const char *db_socket,
95                    bool mult_db_connections,
96                    bool disable_batch_insert)
97 {
98    char *p;
99    char new_db_driver[10];
100    char db_driverdir[256];
101    DBI_FIELD_GET *field;
102
103    p = (char *)(db_driver + 4);
104    if (strcasecmp(p, "mysql") == 0) {
105       m_db_type = SQL_TYPE_MYSQL;
106       bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107    } else if (strcasecmp(p, "postgresql") == 0) {
108       m_db_type = SQL_TYPE_POSTGRESQL;
109       bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110    } else if (strcasecmp(p, "sqlite3") == 0) {
111       m_db_type = SQL_TYPE_SQLITE3;
112       bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113    } else if (strcasecmp(p, "ingres") == 0) {
114       m_db_type = SQL_TYPE_INGRES;
115       bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
116    } else {
117       Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
118       return;
119    }
120
121    /*
122     * Set db_driverdir whereis is the libdbi drivers
123     */
124    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
125
126    /*
127     * Initialize the parent class members.
128     */
129    m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130    m_db_name = bstrdup(db_name);
131    m_db_user = bstrdup(db_user);
132    if (db_password) {
133       m_db_password = bstrdup(db_password);
134    }
135    if (db_address) {
136       m_db_address = bstrdup(db_address);
137    }
138    if (db_socket) {
139       m_db_socket = bstrdup(db_socket);
140    }
141    if (db_driverdir) {
142       m_db_driverdir = bstrdup(db_driverdir);
143    }
144    m_db_driver = bstrdup(new_db_driver);
145    m_db_port = db_port;
146    if (disable_batch_insert) {
147       m_disabled_batch_insert = true;
148       m_have_batch_insert = false;
149    } else {
150       m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153       m_have_batch_insert = true;
154 #else
155       m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
157 #else
158       m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
160    }
161    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
162    *errmsg = 0;
163    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164    cached_path = get_pool_memory(PM_FNAME);
165    cached_path_id = 0;
166    m_ref_count = 1;
167    fname = get_pool_memory(PM_FNAME);
168    path = get_pool_memory(PM_FNAME);
169    esc_name = get_pool_memory(PM_FNAME);
170    esc_path = get_pool_memory(PM_FNAME);
171    esc_obj = get_pool_memory(PM_FNAME);
172    m_allow_transactions = mult_db_connections;
173
174    /*
175     * Initialize the private members.
176     */
177    m_db_handle = NULL;
178    m_result = NULL;
179    m_field_get = NULL;
180
181    /*
182     * Put the db in the list.
183     */
184    if (db_list == NULL) {
185       db_list = New(dlist(this, &this->m_link));
186       dbi_getvalue_list = New(dlist(field, &field->link));
187    }
188    db_list->append(this);
189 }
190
191 B_DB_DBI::~B_DB_DBI()
192 {
193 }
194
195 /*
196  * Now actually open the database.  This can generate errors,
197  *   which are returned in the errmsg
198  *
199  * DO NOT close the database or delete mdb here  !!!!
200  */
201 bool B_DB_DBI::db_open_database(JCR *jcr)
202 {
203    bool retval = false;
204    int errstat;
205    int dbstat;
206    uint8_t len;
207    const char *dbi_errmsg;
208    char buf[10], *port;
209    int numdrivers;
210    char *new_db_name = NULL;
211    char *new_db_dir = NULL;
212
213    P(mutex);
214    if (m_connected) {
215       retval = true;
216       goto bail_out;
217    }
218
219    if ((errstat=rwl_init(&m_lock)) != 0) {
220       berrno be;
221       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
222             be.bstrerror(errstat));
223       goto bail_out;
224    }
225
226    if (m_db_port) {
227       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
228       port = buf;
229    } else {
230       port = NULL;
231    }
232
233    numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
234    if (numdrivers < 0) {
235       Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
236                                "db_driverdir=%s. It is probaly not found any drivers\n"),
237                                m_db_driverdir,numdrivers);
238       goto bail_out;
239    }
240    m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
241    /*
242     * Can be many types of databases
243     */
244    switch (m_db_type) {
245    case SQL_TYPE_MYSQL:
246       dbi_conn_set_option(m_db_handle, "host", m_db_address);      /* default = localhost */
247       dbi_conn_set_option(m_db_handle, "port", port);              /* default port */
248       dbi_conn_set_option(m_db_handle, "username", m_db_user);     /* login name */
249       dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
250       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);       /* database name */
251       break;
252    case SQL_TYPE_POSTGRESQL:
253       dbi_conn_set_option(m_db_handle, "host", m_db_address);
254       dbi_conn_set_option(m_db_handle, "port", port);
255       dbi_conn_set_option(m_db_handle, "username", m_db_user);
256       dbi_conn_set_option(m_db_handle, "password", m_db_password);
257       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
258       break;
259    case SQL_TYPE_SQLITE3:
260       len = strlen(working_directory) + 5;
261       new_db_dir = (char *)malloc(len);
262       strcpy(new_db_dir, working_directory);
263       strcat(new_db_dir, "/");
264       len = strlen(m_db_name) + 5;
265       new_db_name = (char *)malloc(len);
266       strcpy(new_db_name, m_db_name);
267       strcat(new_db_name, ".db");
268       dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
269       dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
270       Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
271       free(new_db_dir);
272       free(new_db_name);
273       break;
274    }
275
276    /*
277     * If connection fails, try at 5 sec intervals for 30 seconds.
278     */
279    for (int retry=0; retry < 6; retry++) {
280       dbstat = dbi_conn_connect(m_db_handle);
281       if (dbstat == 0) {
282          break;
283       }
284
285       dbi_conn_error(m_db_handle, &dbi_errmsg);
286       Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
287
288       bmicrosleep(5, 0);
289    }
290
291    if (dbstat != 0 ) {
292       Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
293          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
294          m_db_driver, m_db_name, m_db_user);
295       goto bail_out;
296    }
297
298    Dmsg0(50, "dbi_real_connect done\n");
299    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
300          m_db_user, m_db_name,
301         (m_db_password == NULL) ? "(NULL)" : m_db_password);
302
303    m_connected = true;
304
305    if (!check_tables_version(jcr, this)) {
306       goto bail_out;
307    }
308
309    switch (m_db_type) {
310    case SQL_TYPE_MYSQL:
311       /*
312        * Set connection timeout to 8 days specialy for batch mode
313        */
314       sql_query("SET wait_timeout=691200");
315       sql_query("SET interactive_timeout=691200");
316       break;
317    case SQL_TYPE_POSTGRESQL:
318       /*
319        * Tell PostgreSQL we are using standard conforming strings
320        * and avoid warnings such as:
321        * WARNING:  nonstandard use of \\ in a string literal
322        */
323       sql_query("SET datestyle TO 'ISO, YMD'");
324       sql_query("SET standard_conforming_strings=on");
325       break;
326    }
327
328    retval = true;
329
330 bail_out:
331    V(mutex);
332    return retval;
333 }
334
335 void B_DB_DBI::db_close_database(JCR *jcr)
336 {
337    db_end_transaction(jcr);
338    P(mutex);
339    m_ref_count--;
340    if (m_ref_count == 0) {
341       sql_free_result();
342       db_list->remove(this);
343       if (m_connected && m_db_handle) {
344          dbi_shutdown_r(m_instance);
345          m_db_handle = NULL;
346          m_instance = NULL;
347       }
348       rwl_destroy(&m_lock);
349       free_pool_memory(errmsg);
350       free_pool_memory(cmd);
351       free_pool_memory(cached_path);
352       free_pool_memory(fname);
353       free_pool_memory(path);
354       free_pool_memory(esc_name);
355       free_pool_memory(esc_path);
356       free_pool_memory(esc_obj);
357       if (m_db_driver) {
358          free(m_db_driver);
359       }
360       if (m_db_name) {
361          free(m_db_name);
362       }
363       if (m_db_user) {
364          free(m_db_user);
365       }
366       if (m_db_password) {
367          free(m_db_password);
368       }
369       if (m_db_address) {
370          free(m_db_address);
371       }
372       if (m_db_socket) {
373          free(m_db_socket);
374       }
375       if (m_db_driverdir) {
376          free(m_db_driverdir);
377       }
378       delete this;
379       if (db_list->size() == 0) {
380          delete db_list;
381          db_list = NULL;
382       }
383    }
384    V(mutex);
385 }
386
387 void B_DB_DBI::db_thread_cleanup(void)
388 {
389 }
390
391 /*
392  * Escape strings so that DBI is happy
393  *
394  *   NOTE! len is the length of the old string. Your new
395  *         string must be long enough (max 2*old+1) to hold
396  *         the escaped output.
397  *
398  * dbi_conn_quote_string_copy receives a pointer to pointer.
399  * We need copy the value of pointer to snew because libdbi change the
400  * pointer
401  */
402 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
403 {
404    char *inew;
405    char *pnew;
406
407    if (len == 0) {
408       snew[0] = 0;
409    } else {
410       /*
411        * Correct the size of old basead in len and copy new string to inew
412        */
413       inew = (char *)malloc(sizeof(char) * len + 1);
414       bstrncpy(inew,old,len + 1);
415       /*
416        * Escape the correct size of old
417        */
418       dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
419       free(inew);
420       /*
421        * Copy the escaped string to snew
422        */
423       bstrncpy(snew, pnew, 2 * len + 1);
424    }
425
426    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
427 }
428
429 /*
430  * Escape binary object so that DBI is happy
431  * Memory is stored in B_DB struct, no need to free it
432  */
433 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
434 {
435    size_t new_len;
436    char *pnew;
437
438    if (len == 0) {
439       esc_obj[0] = 0;
440    } else {
441       new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
442       esc_obj = check_pool_memory_size(esc_obj, new_len+1);
443       memcpy(esc_obj, pnew, new_len);
444    }
445
446    return esc_obj;
447 }
448
449 /*
450  * Unescape binary object so that DBI is happy
451  */
452 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
453                                   POOLMEM **dest, int32_t *dest_len)
454 {
455    if (!from) {
456       *dest[0] = 0;
457       *dest_len = 0;
458       return;
459    }
460    *dest = check_pool_memory_size(*dest, expected_len+1);
461    *dest_len = expected_len;
462    memcpy(*dest, from, expected_len);
463    (*dest)[expected_len]=0;
464 }
465
466 /*
467  * Start a transaction. This groups inserts and makes things
468  * much more efficient. Usually started when inserting
469  * file attributes.
470  */
471 void B_DB_DBI::db_start_transaction(JCR *jcr)
472 {
473    if (!jcr->attr) {
474       jcr->attr = get_pool_memory(PM_FNAME);
475    }
476    if (!jcr->ar) {
477       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
478    }
479
480    switch (m_db_type) {
481    case SQL_TYPE_SQLITE3:
482       if (!m_allow_transactions) {
483          return;
484       }
485
486       db_lock(this);
487       /*
488        * Allow only 10,000 changes per transaction
489        */
490       if (m_transaction && changes > 10000) {
491          db_end_transaction(jcr);
492       }
493       if (!m_transaction) {
494          sql_query("BEGIN");  /* begin transaction */
495          Dmsg0(400, "Start SQLite transaction\n");
496          m_transaction = true;
497       }
498       db_unlock(this);
499       break;
500    case SQL_TYPE_POSTGRESQL:
501       /*
502        * This is turned off because transactions break
503        * if multiple simultaneous jobs are run.
504        */
505       if (!m_allow_transactions) {
506          return;
507       }
508
509       db_lock(this);
510       /*
511        * Allow only 25,000 changes per transaction
512        */
513       if (m_transaction && changes > 25000) {
514          db_end_transaction(jcr);
515       }
516       if (!m_transaction) {
517          sql_query("BEGIN");  /* begin transaction */
518          Dmsg0(400, "Start PosgreSQL transaction\n");
519          m_transaction = true;
520       }
521       db_unlock(this);
522       break;
523    case SQL_TYPE_INGRES:
524       if (!m_allow_transactions) {
525          return;
526       }
527
528       db_lock(this);
529       /*
530        * Allow only 25,000 changes per transaction
531        */
532       if (m_transaction && changes > 25000) {
533          db_end_transaction(jcr);
534       }
535       if (!m_transaction) {
536          sql_query("BEGIN");  /* begin transaction */
537          Dmsg0(400, "Start Ingres transaction\n");
538          m_transaction = true;
539       }
540       db_unlock(this);
541       break;
542    default:
543       break;
544    }
545 }
546
547 void B_DB_DBI::db_end_transaction(JCR *jcr)
548 {
549    if (jcr && jcr->cached_attribute) {
550       Dmsg0(400, "Flush last cached attribute.\n");
551       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
552          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
553       }
554       jcr->cached_attribute = false;
555    }
556
557    switch (m_db_type) {
558    case SQL_TYPE_SQLITE3:
559       if (!m_allow_transactions) {
560          return;
561       }
562
563       db_lock(this);
564       if (m_transaction) {
565          sql_query("COMMIT"); /* end transaction */
566          m_transaction = false;
567          Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
568       }
569       changes = 0;
570       db_unlock(this);
571       break;
572    case SQL_TYPE_POSTGRESQL:
573       if (!m_allow_transactions) {
574          return;
575       }
576
577       db_lock(this);
578       if (m_transaction) {
579          sql_query("COMMIT"); /* end transaction */
580          m_transaction = false;
581          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
582       }
583       changes = 0;
584       db_unlock(this);
585       break;
586    case SQL_TYPE_INGRES:
587       if (!m_allow_transactions) {
588          return;
589       }
590
591       db_lock(this);
592       if (m_transaction) {
593          sql_query("COMMIT"); /* end transaction */
594          m_transaction = false;
595          Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
596       }
597       changes = 0;
598       db_unlock(this);
599       break;
600    default:
601       break;
602    }
603 }
604
605 /*
606  * Submit a general SQL command (cmd), and for each row returned,
607  * the result_handler is called with the ctx.
608  */
609 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
610 {
611    bool retval = true;
612    SQL_ROW row;
613
614    Dmsg1(500, "db_sql_query starts with %s\n", query);
615
616    db_lock(this);
617    if (!sql_query(query, QF_STORE_RESULT)) {
618       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
619       Dmsg0(500, "db_sql_query failed\n");
620       retval = false;
621       goto bail_out;
622    }
623
624    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
625
626    if (result_handler != NULL) {
627       Dmsg0(500, "db_sql_query invoking handler\n");
628       while ((row = sql_fetch_row()) != NULL) {
629          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
630          if (result_handler(ctx, m_num_fields, row))
631             break;
632       }
633       sql_free_result();
634    }
635
636    Dmsg0(500, "db_sql_query finished\n");
637
638 bail_out:
639    db_unlock(this);
640    return retval;
641 }
642
643 /*
644  * Note, if this routine returns 1 (failure), Bacula expects
645  *  that no result has been stored.
646  *
647  *  Returns:  true on success
648  *            false on failure
649  */
650 bool B_DB_DBI::sql_query(const char *query, int flags)
651 {
652    bool retval = false;
653    const char *dbi_errmsg;
654
655    Dmsg1(500, "sql_query starts with %s\n", query);
656
657    /*
658     * We are starting a new query.  reset everything.
659     */
660    m_num_rows     = -1;
661    m_row_number   = -1;
662    m_field_number = -1;
663
664    if (m_result) {
665       dbi_result_free(m_result);  /* hmm, someone forgot to free?? */
666       m_result = NULL;
667    }
668
669    m_result = (void **)dbi_conn_query(m_db_handle, query);
670
671    if (!m_result) {
672       Dmsg2(50, "Query failed: %s %p\n", query, m_result);
673       goto bail_out;
674    }
675
676    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
677    if (m_status == DBI_ERROR_NONE) {
678       Dmsg1(500, "we have a result\n", query);
679
680       /*
681        * How many fields in the set?
682        * num_fields starting at 1
683        */
684       m_num_fields = dbi_result_get_numfields(m_result);
685       Dmsg1(500, "we have %d fields\n", m_num_fields);
686       /*
687        * If no result num_rows is 0
688        */
689       m_num_rows = dbi_result_get_numrows(m_result);
690       Dmsg1(500, "we have %d rows\n", m_num_rows);
691
692       m_status = (dbi_error_flag) 0;                  /* succeed */
693    } else {
694       Dmsg1(50, "Result status failed: %s\n", query);
695       goto bail_out;
696    }
697
698    Dmsg0(500, "sql_query finishing\n");
699    retval = true;
700    goto ok_out;
701
702 bail_out:
703    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
704    //dbi_conn_error(m_db_handle, &dbi_errmsg);
705    Dmsg4(500, "sql_query we failed dbi error: "
706                    "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
707    dbi_result_free(m_result);
708    m_result = NULL;
709    m_status = (dbi_error_flag) 1;                   /* failed */
710
711 ok_out:
712    return retval;
713 }
714
715 void B_DB_DBI::sql_free_result(void)
716 {
717    DBI_FIELD_GET *f;
718
719    db_lock(this);
720    if (m_result) {
721       dbi_result_free(m_result);
722       m_result = NULL;
723    }
724    if (m_rows) {
725       free(m_rows);
726       m_rows = NULL;
727    }
728    /* 
729     * Now is time to free all value return by dbi_get_value
730     * this is necessary because libdbi don't free memory return by yours results
731     * and Bacula has some routine wich call more than once time sql_fetch_row
732     *
733     * Using a queue to store all pointer allocate is a good way to free all things
734     * when necessary
735     */
736    foreach_dlist(f, dbi_getvalue_list) {
737       free(f->value);
738       free(f);
739    }
740    if (m_fields) {
741       free(m_fields);
742       m_fields = NULL;
743    }
744    m_num_rows = m_num_fields = 0;
745    db_unlock(this);
746 }
747
748 /* dbi_getvalue
749  * like PQgetvalue;
750  * char *PQgetvalue(const PGresult *res,
751  *                int row_number,
752  *                int column_number);
753  *
754  * use dbi_result_seek_row to search in result set
755  * use example to return only strings
756  */
757 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
758 {
759    char *buf = NULL;
760    const char *dbi_errmsg;
761    const char *field_name;
762    unsigned short dbitype;
763    size_t field_length;
764    int64_t num;
765
766    /* correct the index for dbi interface
767     * dbi index begins 1
768     * I prefer do not change others functions
769     */
770    Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
771          result, row_number, column_number);
772
773    column_number++;
774
775    if(row_number == 0) {
776      row_number++;
777    }
778
779    Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
780                         result, row_number, column_number);
781
782    if(dbi_result_seek_row(result, row_number)) {
783
784       field_name = dbi_result_get_field_name(result, column_number);
785       field_length = dbi_result_get_field_length(result, field_name);
786       dbitype = dbi_result_get_field_type_idx(result,column_number);
787
788       Dmsg3(500, "dbi_getvalue start: type: '%d' "
789             "field_length bytes: '%d' fieldname: '%s'\n",
790             dbitype, field_length, field_name);
791
792       if(field_length) {
793          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
794          buf = (char *)malloc(field_length + 1);
795       } else {
796          /*
797           * if numbers
798           */
799          buf = (char *)malloc(sizeof(char *) * 50);
800       }
801
802       switch (dbitype) {
803       case DBI_TYPE_INTEGER:
804          num = dbi_result_get_longlong(result, field_name);
805          edit_int64(num, buf);
806          field_length = strlen(buf);
807          break;
808       case DBI_TYPE_STRING:
809          if(field_length) {
810             field_length = bsnprintf(buf, field_length + 1, "%s",
811             dbi_result_get_string(result, field_name));
812          } else {
813             buf[0] = 0;
814          }
815          break;
816       case DBI_TYPE_BINARY:
817          /*
818           * dbi_result_get_binary return a NULL pointer if value is empty
819           * following, change this to what Bacula espected
820           */
821          if(field_length) {
822             field_length = bsnprintf(buf, field_length + 1, "%s",
823                   dbi_result_get_binary(result, field_name));
824          } else {
825             buf[0] = 0;
826          }
827          break;
828       case DBI_TYPE_DATETIME:
829          time_t last;
830          struct tm tm;
831
832          last = dbi_result_get_datetime(result, field_name);
833
834          if(last == -1) {
835                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
836          } else {
837             (void)localtime_r(&last, &tm);
838             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
839                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
840                   tm.tm_hour, tm.tm_min, tm.tm_sec);
841          }
842          break;
843       }
844
845    } else {
846       dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
847       Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
848    }
849
850    Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
851       buf, field_length, buf);
852
853    /*
854     * Don't worry about this buf
855     */
856    return buf;
857 }
858
859 SQL_ROW B_DB_DBI::sql_fetch_row(void)
860 {
861    int j;
862    SQL_ROW row = NULL; /* by default, return NULL */
863
864    Dmsg0(500, "sql_fetch_row start\n");
865    if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
866       if (m_rows) {
867          Dmsg0(500, "sql_fetch_row freeing space\n");
868          Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
869          if (m_num_rows != 0) {
870             for (j = 0; j < m_num_fields; j++) {
871                Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
872                   if (m_rows[j]) {
873                      free(m_rows[j]);
874                   }
875             }
876          }
877          free(m_rows);
878       }
879       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
880       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
881       m_rows_size = m_num_fields;
882
883       /*
884        * Now reset the row_number now that we have the space allocated
885        */
886       m_row_number = 1;
887    }
888
889    /*
890     * If still within the result set
891     */
892    if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
893       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
894       /*
895        * Get each value from this row
896        */
897       for (j = 0; j < m_num_fields; j++) {
898          m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
899          /*
900           * Allocate space to queue row
901           */
902          m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
903          /*
904           * Store the pointer in queue
905           */
906          m_field_get->value = m_rows[j];
907          Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
908                j, m_rows[j], m_field_get->value, m_rows[j]);
909          /*
910           * Insert in queue to future free
911           */
912          dbi_getvalue_list->append(m_field_get);
913       }
914       /*
915        * Increment the row number for the next call
916        */
917       m_row_number++;
918
919       row = m_rows;
920    } else {
921       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
922    }
923
924    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
925
926    return row;
927 }
928
929 const char *B_DB_DBI::sql_strerror(void)
930 {
931    const char *dbi_errmsg;
932
933    dbi_conn_error(m_db_handle, &dbi_errmsg);
934
935    return dbi_errmsg;
936 }
937
938 void B_DB_DBI::sql_data_seek(int row)
939 {
940    /*
941     * Set the row number to be returned on the next call to sql_fetch_row
942     */
943    m_row_number = row;
944 }
945
946 int B_DB_DBI::sql_affected_rows(void)
947 {
948 #if 0
949    return dbi_result_get_numrows_affected(result);
950 #else
951    return 1;
952 #endif
953 }
954
955 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
956 {
957    char sequence[30];
958    uint64_t id = 0;
959
960    /*
961     * First execute the insert query and then retrieve the currval.
962     */
963    if (!sql_query(query)) {
964       return 0;
965    }
966
967    m_num_rows = sql_affected_rows();
968    if (m_num_rows != 1) {
969       return 0;
970    }
971
972    changes++;
973
974    /*
975     * Obtain the current value of the sequence that
976     * provides the serial value for primary key of the table.
977     *
978     * currval is local to our session.  It is not affected by
979     * other transactions.
980     *
981     * Determine the name of the sequence.
982     * PostgreSQL automatically creates a sequence using
983     * <table>_<column>_seq.
984     * At the time of writing, all tables used this format for
985     * for their primary key: <table>id
986     * Except for basefiles which has a primary key on baseid.
987     * Therefore, we need to special case that one table.
988     *
989     * everything else can use the PostgreSQL formula.
990     */
991    if (m_db_type == SQL_TYPE_POSTGRESQL) {
992       if (strcasecmp(table_name, "basefiles") == 0) {
993          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
994       } else {
995          bstrncpy(sequence, table_name, sizeof(sequence));
996          bstrncat(sequence, "_", sizeof(sequence));
997          bstrncat(sequence, table_name, sizeof(sequence));
998          bstrncat(sequence, "id", sizeof(sequence));
999       }
1000
1001       bstrncat(sequence, "_seq", sizeof(sequence));
1002       id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1003    } else {
1004       id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1005    }
1006
1007    return id;
1008 }
1009
1010 /* dbi_getisnull
1011  * like PQgetisnull
1012  * int PQgetisnull(const PGresult *res,
1013  *                 int row_number,
1014  *                 int column_number);
1015  *
1016  *  use dbi_result_seek_row to search in result set
1017  */
1018 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1019    int i;
1020
1021    if (row_number == 0) {
1022       row_number++;
1023    }
1024
1025    column_number++;
1026
1027    if (dbi_result_seek_row(result, row_number)) {
1028       i = dbi_result_field_is_null_idx(result,column_number);
1029       return i;
1030    } else {
1031       return 0;
1032    }
1033 }
1034
1035 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1036 {
1037    int i, j;
1038    int dbi_index;
1039    int max_length;
1040    int this_length;
1041    char *cbuf = NULL;
1042
1043    Dmsg0(500, "sql_fetch_field starts\n");
1044
1045    if (!m_fields || m_fields_size < m_num_fields) {
1046       if (m_fields) {
1047          free(m_fields);
1048          m_fields = NULL;
1049       }
1050       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1051       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1052       m_fields_size = m_num_fields;
1053
1054       for (i = 0; i < m_num_fields; i++) {
1055          /*
1056           * num_fields is starting at 1, increment i by 1
1057           */
1058          dbi_index = i + 1;
1059          Dmsg1(500, "filling field %d\n", i);
1060          m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1061          m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1062          m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1063
1064          /*
1065           * For a given column, find the max length.
1066           */
1067          max_length = 0;
1068          for (j = 0; j < m_num_rows; j++) {
1069             if (dbi_getisnull(m_result, j, dbi_index)) {
1070                 this_length = 4;        /* "NULL" */
1071             } else {
1072                cbuf = dbi_getvalue(m_result, j, dbi_index);
1073                this_length = cstrlen(cbuf);
1074                /*
1075                 * cbuf is always free
1076                 */
1077                free(cbuf);
1078             }
1079          
1080             if (max_length < this_length) {
1081                max_length = this_length;
1082             }
1083          }
1084          m_fields[i].max_length = max_length;
1085
1086          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1087                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1088       }
1089    }
1090
1091    /*
1092     * Increment field number for the next time around
1093     */
1094    return &m_fields[m_field_number++];
1095 }
1096
1097 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1098 {
1099    switch (field_type) {
1100    case (1 << 0):
1101       return true;
1102    default:
1103       return false;
1104    }
1105 }
1106
1107 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1108 {
1109    switch (field_type) {
1110    case 1:
1111    case 2:
1112       return true;
1113    default:
1114       return false;
1115    }
1116 }
1117
1118 /*
1119  * Escape strings so that PostgreSQL is happy on COPY
1120  *
1121  *   NOTE! len is the length of the old string. Your new
1122  *         string must be long enough (max 2*old+1) to hold
1123  *         the escaped output.
1124  */
1125 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1126 {
1127    /*
1128     * We have to escape \t, \n, \r, \
1129     */
1130    char c = '\0' ;
1131
1132    while (len > 0 && *src) {
1133       switch (*src) {
1134       case '\n':
1135          c = 'n';
1136          break;
1137       case '\\':
1138          c = '\\';
1139          break;
1140       case '\t':
1141          c = 't';
1142          break;
1143       case '\r':
1144          c = 'r';
1145          break;
1146       default:
1147          c = '\0' ;
1148       }
1149
1150       if (c) {
1151          *dest = '\\';
1152          dest++;
1153          *dest = c;
1154       } else {
1155          *dest = *src;
1156       }
1157
1158       len--;
1159       src++;
1160       dest++;
1161    }
1162
1163    *dest = '\0';
1164    return dest;
1165 }
1166
1167 /*
1168  * This can be a bit strang but is the one way to do
1169  *
1170  * Returns true if OK
1171  *         false if failed
1172  */
1173 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1174 {
1175    bool retval = true;
1176    const char *query = "COPY batch FROM STDIN";
1177
1178    Dmsg0(500, "sql_batch_start started\n");
1179
1180    db_lock(this);
1181    switch (m_db_type) {
1182    case SQL_TYPE_MYSQL:
1183       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1184                              "FileIndex integer,"
1185                              "JobId integer,"
1186                              "Path blob,"
1187                              "Name blob,"
1188                              "LStat tinyblob,"
1189                              "MD5 tinyblob,"
1190                              "DeltaSeq smallint)")) {
1191          Dmsg0(500, "sql_batch_start failed\n");
1192          goto bail_out;
1193       }
1194       Dmsg0(500, "sql_batch_start finishing\n");
1195       goto ok_out;
1196    case SQL_TYPE_POSTGRESQL:
1197       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1198                              "FileIndex int,"
1199                              "JobId int,"
1200                              "Path varchar,"
1201                              "Name varchar,"
1202                              "LStat varchar,"
1203                              "MD5 varchar,"
1204                              "DeltaSeq int)")) {
1205          Dmsg0(500, "sql_batch_start failed\n");
1206          goto bail_out;
1207       }
1208
1209       /*
1210        * We are starting a new query.  reset everything.
1211        */
1212       m_num_rows     = -1;
1213       m_row_number   = -1;
1214       m_field_number = -1;
1215
1216       sql_free_result();
1217
1218       for (int i=0; i < 10; i++) {
1219          sql_query(query);
1220          if (m_result) {
1221             break;
1222          }
1223          bmicrosleep(5, 0);
1224       }
1225       if (!m_result) {
1226          Dmsg1(50, "Query failed: %s\n", query);
1227          goto bail_out;
1228       }
1229
1230       m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1231       //m_status = DBI_ERROR_NONE;
1232
1233       if (m_status == DBI_ERROR_NONE) {
1234          /*
1235           * How many fields in the set?
1236           */
1237          m_num_fields = dbi_result_get_numfields(m_result);
1238          m_num_rows = dbi_result_get_numrows(m_result);
1239          m_status = (dbi_error_flag) 1;
1240       } else {
1241          Dmsg1(50, "Result status failed: %s\n", query);
1242          goto bail_out;
1243       }
1244
1245       Dmsg0(500, "sql_batch_start finishing\n");
1246       goto ok_out;
1247    case SQL_TYPE_SQLITE3:
1248       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1249                              "FileIndex integer,"
1250                              "JobId integer,"
1251                              "Path blob,"
1252                              "Name blob,"
1253                              "LStat tinyblob,"
1254                              "MD5 tinyblob,"
1255                              "DeltaSeq smallint)")) {
1256          Dmsg0(500, "sql_batch_start failed\n");
1257          goto bail_out;
1258       }
1259       Dmsg0(500, "sql_batch_start finishing\n");
1260       goto ok_out;
1261    }
1262
1263 bail_out:
1264    Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1265    m_status = (dbi_error_flag) 0;
1266    sql_free_result();
1267    m_result = NULL;
1268    retval = false;
1269
1270 ok_out:
1271    db_unlock(this);
1272    return retval;
1273 }
1274
1275 /*
1276  * Set error to something to abort operation
1277  */
1278 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1279 {
1280    int res = 0;
1281    int count = 30;
1282    int (*custom_function)(void*, const char*) = NULL;
1283    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1284
1285    Dmsg0(500, "sql_batch_start started\n");
1286
1287    switch (m_db_type) {
1288    case SQL_TYPE_MYSQL:
1289       m_status = (dbi_error_flag) 0;
1290       break;
1291    case SQL_TYPE_POSTGRESQL:
1292       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1293
1294       do {
1295          res = (*custom_function)(myconn->connection, error);
1296       } while (res == 0 && --count > 0);
1297
1298       if (res == 1) {
1299          Dmsg0(500, "ok\n");
1300          m_status = (dbi_error_flag) 1;
1301       }
1302
1303       if (res <= 0) {
1304          Dmsg0(500, "we failed\n");
1305          m_status = (dbi_error_flag) 0;
1306          //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1307        }
1308       break;
1309    case SQL_TYPE_SQLITE3:
1310       m_status = (dbi_error_flag) 0;
1311       break;
1312    }
1313
1314    Dmsg0(500, "sql_batch_start finishing\n");
1315
1316    return true;
1317 }
1318
1319 /*
1320  * This function is big and use a big switch.
1321  * In near future is better split in small functions
1322  * and refactory.
1323  */
1324 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1325 {
1326    int res;
1327    int count=30;
1328    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1329    int (*custom_function)(void*, const char*, int) = NULL;
1330    char* (*custom_function_error)(void*) = NULL;
1331    size_t len;
1332    char *digest;
1333    char ed1[50];
1334
1335    Dmsg0(500, "sql_batch_start started \n");
1336
1337    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1338    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1339
1340    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1341       *digest = '\0';
1342    } else {
1343       digest = ar->Digest;
1344    }
1345
1346    switch (m_db_type) {
1347    case SQL_TYPE_MYSQL:
1348       db_escape_string(jcr, esc_name, fname, fnl);
1349       db_escape_string(jcr, esc_path, path, pnl);
1350       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1351                       "(%u,%s,'%s','%s','%s','%s',%u)",
1352                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1353                       esc_name, ar->attr, digest, ar->DeltaSeq);
1354
1355       if (!sql_query(cmd))
1356       {
1357          Dmsg0(500, "sql_batch_start failed\n");
1358          goto bail_out;
1359       }
1360
1361       Dmsg0(500, "sql_batch_start finishing\n");
1362
1363       return true;
1364       break;
1365    case SQL_TYPE_POSTGRESQL:
1366       postgresql_copy_escape(esc_name, fname, fnl);
1367       postgresql_copy_escape(esc_path, path, pnl);
1368       len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1369                      ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1370                      esc_name, ar->attr, digest, ar->DeltaSeq);
1371
1372       /*
1373        * libdbi don't support CopyData and we need call a postgresql
1374        * specific function to do this work
1375        */
1376       Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1377       custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1378       if (custom_function != NULL) {
1379          do {
1380             res = (*custom_function)(myconn->connection, cmd, len);
1381          } while (res == 0 && --count > 0);
1382
1383          if (res == 1) {
1384             Dmsg0(500, "ok\n");
1385             changes++;
1386             m_status = (dbi_error_flag) 1;
1387          }
1388
1389          if (res <= 0) {
1390             Dmsg0(500, "sql_batch_insert failed\n");
1391             goto bail_out;
1392          }
1393
1394          Dmsg0(500, "sql_batch_insert finishing\n");
1395          return true;
1396       } else {
1397          /*
1398           * Ensure to detect a PQerror
1399           */
1400          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1401          Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1402          goto bail_out;
1403       }
1404       break;
1405    case SQL_TYPE_SQLITE3:
1406       db_escape_string(jcr, esc_name, fname, fnl);
1407       db_escape_string(jcr, esc_path, path, pnl);
1408       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1409                       "(%u,%s,'%s','%s','%s','%s',%u)",
1410                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1411                       esc_name, ar->attr, digest, ar->DeltaSeq);
1412
1413       if (!sql_query(cmd))
1414       {
1415          Dmsg0(500, "sql_batch_insert failed\n");
1416          goto bail_out;
1417       }
1418
1419       Dmsg0(500, "sql_batch_insert finishing\n");
1420
1421       return true;
1422       break;
1423    }
1424
1425 bail_out:
1426    Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1427    m_status = (dbi_error_flag) 0;
1428    sql_free_result();
1429    return false;
1430 }
1431
1432 /*
1433  * Initialize database data structure. In principal this should
1434  * never have errors, or it is really fatal.
1435  */
1436 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1437                        const char *db_password, const char *db_address, int db_port,
1438                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1439 {
1440    B_DB_DBI *mdb = NULL;
1441
1442    if (!db_driver) {
1443       Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1444    }
1445
1446    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1447       Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1448    }
1449
1450    if (!db_user) {
1451       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1452       return NULL;
1453    }
1454
1455    P(mutex);                          /* lock DB queue */
1456    if (db_list && !mult_db_connections) {
1457       /*
1458        * Look to see if DB already open
1459        */
1460       foreach_dlist(mdb, db_list) {
1461          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1462             Dmsg1(100, "DB REopen %s\n", db_name);
1463             mdb->increment_refcount();
1464             goto bail_out;
1465          }
1466       }
1467    }
1468    Dmsg0(100, "db_init_database first time\n");
1469    mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1470                       db_port, db_socket, mult_db_connections, disable_batch_insert));
1471
1472 bail_out:
1473    V(mutex);
1474    return mdb;
1475 }
1476
1477 #endif /* HAVE_DBI */