]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
1bf948fecc2dfcc56670001464dbf36f55ea3cf6
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
37  */
38 /*
39  * This code only compiles against a recent version of libdbi. The current
40  * release found on the libdbi website (0.8.3) won't work for this code.
41  *
42  * You find the libdbi library on http://sourceforge.net/projects/libdbi
43  *
44  * A fairly recent version of libdbi from CVS works, so either make sure
45  * your distribution has a fairly recent version of libdbi installed or
46  * clone the CVS repositories from sourceforge and compile that code and
47  * install it.
48  *
49  * You need:
50  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
52  */
53
54 #include "bacula.h"
55
56 #ifdef HAVE_DBI
57
58 #include "cats.h"
59 #include "bdb_priv.h"
60 #include <dbi/dbi.h>
61 #include <dbi/dbi-dev.h>
62 #include <bdb_dbi.h>
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /*
72  * List of open databases
73  */
74 static dlist *db_list = NULL;
75
76 /*
77  * Control allocated fields by dbi_getvalue
78  */
79 static dlist *dbi_getvalue_list = NULL;
80
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
86
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88                    const char *db_driver,
89                    const char *db_name,
90                    const char *db_user,
91                    const char *db_password,
92                    const char *db_address,
93                    int db_port,
94                    const char *db_socket,
95                    bool mult_db_connections,
96                    bool disable_batch_insert)
97 {
98    char *p;
99    char new_db_driver[10];
100    char db_driverdir[256];
101    DBI_FIELD_GET *field;
102
103    p = (char *)(db_driver + 4);
104    if (strcasecmp(p, "mysql") == 0) {
105       m_db_type = SQL_TYPE_MYSQL;
106       bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107    } else if (strcasecmp(p, "postgresql") == 0) {
108       m_db_type = SQL_TYPE_POSTGRESQL;
109       bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110    } else if (strcasecmp(p, "sqlite3") == 0) {
111       m_db_type = SQL_TYPE_SQLITE3;
112       bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113    } else if (strcasecmp(p, "ingres") == 0) {
114       m_db_type = SQL_TYPE_INGRES;
115       bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
116    } else {
117       Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
118       return;
119    }
120
121    /*
122     * Set db_driverdir whereis is the libdbi drivers
123     */
124    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
125
126    /*
127     * Initialize the parent class members.
128     */
129    m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130    m_db_name = bstrdup(db_name);
131    m_db_user = bstrdup(db_user);
132    if (db_password) {
133       m_db_password = bstrdup(db_password);
134    }
135    if (db_address) {
136       m_db_address = bstrdup(db_address);
137    }
138    if (db_socket) {
139       m_db_socket = bstrdup(db_socket);
140    }
141    if (db_driverdir) {
142       m_db_driverdir = bstrdup(db_driverdir);
143    }
144    m_db_driver = bstrdup(new_db_driver);
145    m_db_port = db_port;
146    if (disable_batch_insert) {
147       m_disabled_batch_insert = true;
148       m_have_batch_insert = false;
149    } else {
150       m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153       m_have_batch_insert = true;
154 #else
155       m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
157 #else
158       m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
160    }
161    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
162    *errmsg = 0;
163    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164    cached_path = get_pool_memory(PM_FNAME);
165    cached_path_id = 0;
166    m_ref_count = 1;
167    fname = get_pool_memory(PM_FNAME);
168    path = get_pool_memory(PM_FNAME);
169    esc_name = get_pool_memory(PM_FNAME);
170    esc_path = get_pool_memory(PM_FNAME);
171    esc_obj = get_pool_memory(PM_FNAME);
172    m_allow_transactions = mult_db_connections;
173
174    /* At this time, when mult_db_connections == true, this is for 
175     * specific console command such as bvfs or batch mode, and we don't
176     * want to share a batch mode or bvfs. In the future, we can change
177     * the creation function to add this parameter.
178     */
179    m_dedicated = mult_db_connections; 
180
181    /*
182     * Initialize the private members.
183     */
184    m_db_handle = NULL;
185    m_result = NULL;
186    m_field_get = NULL;
187
188    /*
189     * Put the db in the list.
190     */
191    if (db_list == NULL) {
192       db_list = New(dlist(this, &this->m_link));
193       dbi_getvalue_list = New(dlist(field, &field->link));
194    }
195    db_list->append(this);
196 }
197
198 B_DB_DBI::~B_DB_DBI()
199 {
200 }
201
202 /*
203  * Now actually open the database.  This can generate errors,
204  *   which are returned in the errmsg
205  *
206  * DO NOT close the database or delete mdb here  !!!!
207  */
208 bool B_DB_DBI::db_open_database(JCR *jcr)
209 {
210    bool retval = false;
211    int errstat;
212    int dbstat;
213    uint8_t len;
214    const char *dbi_errmsg;
215    char buf[10], *port;
216    int numdrivers;
217    char *new_db_name = NULL;
218    char *new_db_dir = NULL;
219
220    P(mutex);
221    if (m_connected) {
222       retval = true;
223       goto bail_out;
224    }
225
226    if ((errstat=rwl_init(&m_lock)) != 0) {
227       berrno be;
228       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
229             be.bstrerror(errstat));
230       goto bail_out;
231    }
232
233    if (m_db_port) {
234       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
235       port = buf;
236    } else {
237       port = NULL;
238    }
239
240    numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
241    if (numdrivers < 0) {
242       Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
243                                "db_driverdir=%s. It is probaly not found any drivers\n"),
244                                m_db_driverdir,numdrivers);
245       goto bail_out;
246    }
247    m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
248    /*
249     * Can be many types of databases
250     */
251    switch (m_db_type) {
252    case SQL_TYPE_MYSQL:
253       dbi_conn_set_option(m_db_handle, "host", m_db_address);      /* default = localhost */
254       dbi_conn_set_option(m_db_handle, "port", port);              /* default port */
255       dbi_conn_set_option(m_db_handle, "username", m_db_user);     /* login name */
256       dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
257       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);       /* database name */
258       break;
259    case SQL_TYPE_POSTGRESQL:
260       dbi_conn_set_option(m_db_handle, "host", m_db_address);
261       dbi_conn_set_option(m_db_handle, "port", port);
262       dbi_conn_set_option(m_db_handle, "username", m_db_user);
263       dbi_conn_set_option(m_db_handle, "password", m_db_password);
264       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
265       break;
266    case SQL_TYPE_SQLITE3:
267       len = strlen(working_directory) + 5;
268       new_db_dir = (char *)malloc(len);
269       strcpy(new_db_dir, working_directory);
270       strcat(new_db_dir, "/");
271       len = strlen(m_db_name) + 5;
272       new_db_name = (char *)malloc(len);
273       strcpy(new_db_name, m_db_name);
274       strcat(new_db_name, ".db");
275       dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
276       dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
277       Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
278       free(new_db_dir);
279       free(new_db_name);
280       break;
281    }
282
283    /*
284     * If connection fails, try at 5 sec intervals for 30 seconds.
285     */
286    for (int retry=0; retry < 6; retry++) {
287       dbstat = dbi_conn_connect(m_db_handle);
288       if (dbstat == 0) {
289          break;
290       }
291
292       dbi_conn_error(m_db_handle, &dbi_errmsg);
293       Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
294
295       bmicrosleep(5, 0);
296    }
297
298    if (dbstat != 0 ) {
299       Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301          m_db_driver, m_db_name, m_db_user);
302       goto bail_out;
303    }
304
305    Dmsg0(50, "dbi_real_connect done\n");
306    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
307          m_db_user, m_db_name,
308         (m_db_password == NULL) ? "(NULL)" : m_db_password);
309
310    m_connected = true;
311
312    if (!check_tables_version(jcr, this)) {
313       goto bail_out;
314    }
315
316    switch (m_db_type) {
317    case SQL_TYPE_MYSQL:
318       /*
319        * Set connection timeout to 8 days specialy for batch mode
320        */
321       sql_query("SET wait_timeout=691200");
322       sql_query("SET interactive_timeout=691200");
323       break;
324    case SQL_TYPE_POSTGRESQL:
325       /*
326        * Tell PostgreSQL we are using standard conforming strings
327        * and avoid warnings such as:
328        * WARNING:  nonstandard use of \\ in a string literal
329        */
330       sql_query("SET datestyle TO 'ISO, YMD'");
331       sql_query("SET standard_conforming_strings=on");
332       break;
333    }
334
335    retval = true;
336
337 bail_out:
338    V(mutex);
339    return retval;
340 }
341
342 void B_DB_DBI::db_close_database(JCR *jcr)
343 {
344    db_end_transaction(jcr);
345    P(mutex);
346    m_ref_count--;
347    if (m_ref_count == 0) {
348       sql_free_result();
349       db_list->remove(this);
350       if (m_connected && m_db_handle) {
351          dbi_shutdown_r(m_instance);
352          m_db_handle = NULL;
353          m_instance = NULL;
354       }
355       rwl_destroy(&m_lock);
356       free_pool_memory(errmsg);
357       free_pool_memory(cmd);
358       free_pool_memory(cached_path);
359       free_pool_memory(fname);
360       free_pool_memory(path);
361       free_pool_memory(esc_name);
362       free_pool_memory(esc_path);
363       free_pool_memory(esc_obj);
364       if (m_db_driver) {
365          free(m_db_driver);
366       }
367       if (m_db_name) {
368          free(m_db_name);
369       }
370       if (m_db_user) {
371          free(m_db_user);
372       }
373       if (m_db_password) {
374          free(m_db_password);
375       }
376       if (m_db_address) {
377          free(m_db_address);
378       }
379       if (m_db_socket) {
380          free(m_db_socket);
381       }
382       if (m_db_driverdir) {
383          free(m_db_driverdir);
384       }
385       delete this;
386       if (db_list->size() == 0) {
387          delete db_list;
388          db_list = NULL;
389       }
390    }
391    V(mutex);
392 }
393
394 void B_DB_DBI::db_thread_cleanup(void)
395 {
396 }
397
398 /*
399  * Escape strings so that DBI is happy
400  *
401  *   NOTE! len is the length of the old string. Your new
402  *         string must be long enough (max 2*old+1) to hold
403  *         the escaped output.
404  *
405  * dbi_conn_quote_string_copy receives a pointer to pointer.
406  * We need copy the value of pointer to snew because libdbi change the
407  * pointer
408  */
409 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
410 {
411    char *inew;
412    char *pnew;
413
414    if (len == 0) {
415       snew[0] = 0;
416    } else {
417       /*
418        * Correct the size of old basead in len and copy new string to inew
419        */
420       inew = (char *)malloc(sizeof(char) * len + 1);
421       bstrncpy(inew,old,len + 1);
422       /*
423        * Escape the correct size of old
424        */
425       dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
426       free(inew);
427       /*
428        * Copy the escaped string to snew
429        */
430       bstrncpy(snew, pnew, 2 * len + 1);
431    }
432
433    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
434 }
435
436 /*
437  * Escape binary object so that DBI is happy
438  * Memory is stored in B_DB struct, no need to free it
439  */
440 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
441 {
442    size_t new_len;
443    char *pnew;
444
445    if (len == 0) {
446       esc_obj[0] = 0;
447    } else {
448       new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
449       esc_obj = check_pool_memory_size(esc_obj, new_len+1);
450       memcpy(esc_obj, pnew, new_len);
451    }
452
453    return esc_obj;
454 }
455
456 /*
457  * Unescape binary object so that DBI is happy
458  */
459 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
460                                   POOLMEM **dest, int32_t *dest_len)
461 {
462    if (!from) {
463       *dest[0] = 0;
464       *dest_len = 0;
465       return;
466    }
467    *dest = check_pool_memory_size(*dest, expected_len+1);
468    *dest_len = expected_len;
469    memcpy(*dest, from, expected_len);
470    (*dest)[expected_len]=0;
471 }
472
473 /*
474  * Start a transaction. This groups inserts and makes things
475  * much more efficient. Usually started when inserting
476  * file attributes.
477  */
478 void B_DB_DBI::db_start_transaction(JCR *jcr)
479 {
480    if (!jcr->attr) {
481       jcr->attr = get_pool_memory(PM_FNAME);
482    }
483    if (!jcr->ar) {
484       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
485    }
486
487    switch (m_db_type) {
488    case SQL_TYPE_SQLITE3:
489       if (!m_allow_transactions) {
490          return;
491       }
492
493       db_lock(this);
494       /*
495        * Allow only 10,000 changes per transaction
496        */
497       if (m_transaction && changes > 10000) {
498          db_end_transaction(jcr);
499       }
500       if (!m_transaction) {
501          sql_query("BEGIN");  /* begin transaction */
502          Dmsg0(400, "Start SQLite transaction\n");
503          m_transaction = true;
504       }
505       db_unlock(this);
506       break;
507    case SQL_TYPE_POSTGRESQL:
508       /*
509        * This is turned off because transactions break
510        * if multiple simultaneous jobs are run.
511        */
512       if (!m_allow_transactions) {
513          return;
514       }
515
516       db_lock(this);
517       /*
518        * Allow only 25,000 changes per transaction
519        */
520       if (m_transaction && changes > 25000) {
521          db_end_transaction(jcr);
522       }
523       if (!m_transaction) {
524          sql_query("BEGIN");  /* begin transaction */
525          Dmsg0(400, "Start PosgreSQL transaction\n");
526          m_transaction = true;
527       }
528       db_unlock(this);
529       break;
530    case SQL_TYPE_INGRES:
531       if (!m_allow_transactions) {
532          return;
533       }
534
535       db_lock(this);
536       /*
537        * Allow only 25,000 changes per transaction
538        */
539       if (m_transaction && changes > 25000) {
540          db_end_transaction(jcr);
541       }
542       if (!m_transaction) {
543          sql_query("BEGIN");  /* begin transaction */
544          Dmsg0(400, "Start Ingres transaction\n");
545          m_transaction = true;
546       }
547       db_unlock(this);
548       break;
549    default:
550       break;
551    }
552 }
553
554 void B_DB_DBI::db_end_transaction(JCR *jcr)
555 {
556    if (jcr && jcr->cached_attribute) {
557       Dmsg0(400, "Flush last cached attribute.\n");
558       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
559          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
560       }
561       jcr->cached_attribute = false;
562    }
563
564    switch (m_db_type) {
565    case SQL_TYPE_SQLITE3:
566       if (!m_allow_transactions) {
567          return;
568       }
569
570       db_lock(this);
571       if (m_transaction) {
572          sql_query("COMMIT"); /* end transaction */
573          m_transaction = false;
574          Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
575       }
576       changes = 0;
577       db_unlock(this);
578       break;
579    case SQL_TYPE_POSTGRESQL:
580       if (!m_allow_transactions) {
581          return;
582       }
583
584       db_lock(this);
585       if (m_transaction) {
586          sql_query("COMMIT"); /* end transaction */
587          m_transaction = false;
588          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
589       }
590       changes = 0;
591       db_unlock(this);
592       break;
593    case SQL_TYPE_INGRES:
594       if (!m_allow_transactions) {
595          return;
596       }
597
598       db_lock(this);
599       if (m_transaction) {
600          sql_query("COMMIT"); /* end transaction */
601          m_transaction = false;
602          Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
603       }
604       changes = 0;
605       db_unlock(this);
606       break;
607    default:
608       break;
609    }
610 }
611
612 /*
613  * Submit a general SQL command (cmd), and for each row returned,
614  * the result_handler is called with the ctx.
615  */
616 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
617 {
618    bool retval = true;
619    SQL_ROW row;
620
621    Dmsg1(500, "db_sql_query starts with %s\n", query);
622
623    db_lock(this);
624    if (!sql_query(query, QF_STORE_RESULT)) {
625       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
626       Dmsg0(500, "db_sql_query failed\n");
627       retval = false;
628       goto bail_out;
629    }
630
631    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
632
633    if (result_handler != NULL) {
634       Dmsg0(500, "db_sql_query invoking handler\n");
635       while ((row = sql_fetch_row()) != NULL) {
636          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
637          if (result_handler(ctx, m_num_fields, row))
638             break;
639       }
640       sql_free_result();
641    }
642
643    Dmsg0(500, "db_sql_query finished\n");
644
645 bail_out:
646    db_unlock(this);
647    return retval;
648 }
649
650 /*
651  * Note, if this routine returns 1 (failure), Bacula expects
652  *  that no result has been stored.
653  *
654  *  Returns:  true on success
655  *            false on failure
656  */
657 bool B_DB_DBI::sql_query(const char *query, int flags)
658 {
659    bool retval = false;
660    const char *dbi_errmsg;
661
662    Dmsg1(500, "sql_query starts with %s\n", query);
663
664    /*
665     * We are starting a new query.  reset everything.
666     */
667    m_num_rows     = -1;
668    m_row_number   = -1;
669    m_field_number = -1;
670
671    if (m_result) {
672       dbi_result_free(m_result);  /* hmm, someone forgot to free?? */
673       m_result = NULL;
674    }
675
676    m_result = (void **)dbi_conn_query(m_db_handle, query);
677
678    if (!m_result) {
679       Dmsg2(50, "Query failed: %s %p\n", query, m_result);
680       goto bail_out;
681    }
682
683    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
684    if (m_status == DBI_ERROR_NONE) {
685       Dmsg1(500, "we have a result\n", query);
686
687       /*
688        * How many fields in the set?
689        * num_fields starting at 1
690        */
691       m_num_fields = dbi_result_get_numfields(m_result);
692       Dmsg1(500, "we have %d fields\n", m_num_fields);
693       /*
694        * If no result num_rows is 0
695        */
696       m_num_rows = dbi_result_get_numrows(m_result);
697       Dmsg1(500, "we have %d rows\n", m_num_rows);
698
699       m_status = (dbi_error_flag) 0;                  /* succeed */
700    } else {
701       Dmsg1(50, "Result status failed: %s\n", query);
702       goto bail_out;
703    }
704
705    Dmsg0(500, "sql_query finishing\n");
706    retval = true;
707    goto ok_out;
708
709 bail_out:
710    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
711    //dbi_conn_error(m_db_handle, &dbi_errmsg);
712    Dmsg4(500, "sql_query we failed dbi error: "
713                    "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
714    dbi_result_free(m_result);
715    m_result = NULL;
716    m_status = (dbi_error_flag) 1;                   /* failed */
717
718 ok_out:
719    return retval;
720 }
721
722 void B_DB_DBI::sql_free_result(void)
723 {
724    DBI_FIELD_GET *f;
725
726    db_lock(this);
727    if (m_result) {
728       dbi_result_free(m_result);
729       m_result = NULL;
730    }
731    if (m_rows) {
732       free(m_rows);
733       m_rows = NULL;
734    }
735    /* 
736     * Now is time to free all value return by dbi_get_value
737     * this is necessary because libdbi don't free memory return by yours results
738     * and Bacula has some routine wich call more than once time sql_fetch_row
739     *
740     * Using a queue to store all pointer allocate is a good way to free all things
741     * when necessary
742     */
743    foreach_dlist(f, dbi_getvalue_list) {
744       free(f->value);
745       free(f);
746    }
747    if (m_fields) {
748       free(m_fields);
749       m_fields = NULL;
750    }
751    m_num_rows = m_num_fields = 0;
752    db_unlock(this);
753 }
754
755 /* dbi_getvalue
756  * like PQgetvalue;
757  * char *PQgetvalue(const PGresult *res,
758  *                int row_number,
759  *                int column_number);
760  *
761  * use dbi_result_seek_row to search in result set
762  * use example to return only strings
763  */
764 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
765 {
766    char *buf = NULL;
767    const char *dbi_errmsg;
768    const char *field_name;
769    unsigned short dbitype;
770    size_t field_length;
771    int64_t num;
772
773    /* correct the index for dbi interface
774     * dbi index begins 1
775     * I prefer do not change others functions
776     */
777    Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
778          result, row_number, column_number);
779
780    column_number++;
781
782    if(row_number == 0) {
783      row_number++;
784    }
785
786    Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
787                         result, row_number, column_number);
788
789    if(dbi_result_seek_row(result, row_number)) {
790
791       field_name = dbi_result_get_field_name(result, column_number);
792       field_length = dbi_result_get_field_length(result, field_name);
793       dbitype = dbi_result_get_field_type_idx(result,column_number);
794
795       Dmsg3(500, "dbi_getvalue start: type: '%d' "
796             "field_length bytes: '%d' fieldname: '%s'\n",
797             dbitype, field_length, field_name);
798
799       if(field_length) {
800          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
801          buf = (char *)malloc(field_length + 1);
802       } else {
803          /*
804           * if numbers
805           */
806          buf = (char *)malloc(sizeof(char *) * 50);
807       }
808
809       switch (dbitype) {
810       case DBI_TYPE_INTEGER:
811          num = dbi_result_get_longlong(result, field_name);
812          edit_int64(num, buf);
813          field_length = strlen(buf);
814          break;
815       case DBI_TYPE_STRING:
816          if(field_length) {
817             field_length = bsnprintf(buf, field_length + 1, "%s",
818             dbi_result_get_string(result, field_name));
819          } else {
820             buf[0] = 0;
821          }
822          break;
823       case DBI_TYPE_BINARY:
824          /*
825           * dbi_result_get_binary return a NULL pointer if value is empty
826           * following, change this to what Bacula espected
827           */
828          if(field_length) {
829             field_length = bsnprintf(buf, field_length + 1, "%s",
830                   dbi_result_get_binary(result, field_name));
831          } else {
832             buf[0] = 0;
833          }
834          break;
835       case DBI_TYPE_DATETIME:
836          time_t last;
837          struct tm tm;
838
839          last = dbi_result_get_datetime(result, field_name);
840
841          if(last == -1) {
842                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
843          } else {
844             (void)localtime_r(&last, &tm);
845             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
846                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
847                   tm.tm_hour, tm.tm_min, tm.tm_sec);
848          }
849          break;
850       }
851
852    } else {
853       dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
854       Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
855    }
856
857    Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
858       buf, field_length, buf);
859
860    /*
861     * Don't worry about this buf
862     */
863    return buf;
864 }
865
866 SQL_ROW B_DB_DBI::sql_fetch_row(void)
867 {
868    int j;
869    SQL_ROW row = NULL; /* by default, return NULL */
870
871    Dmsg0(500, "sql_fetch_row start\n");
872    if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
873       if (m_rows) {
874          Dmsg0(500, "sql_fetch_row freeing space\n");
875          Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
876          if (m_num_rows != 0) {
877             for (j = 0; j < m_num_fields; j++) {
878                Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
879                   if (m_rows[j]) {
880                      free(m_rows[j]);
881                   }
882             }
883          }
884          free(m_rows);
885       }
886       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
887       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
888       m_rows_size = m_num_fields;
889
890       /*
891        * Now reset the row_number now that we have the space allocated
892        */
893       m_row_number = 1;
894    }
895
896    /*
897     * If still within the result set
898     */
899    if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
900       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
901       /*
902        * Get each value from this row
903        */
904       for (j = 0; j < m_num_fields; j++) {
905          m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
906          /*
907           * Allocate space to queue row
908           */
909          m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
910          /*
911           * Store the pointer in queue
912           */
913          m_field_get->value = m_rows[j];
914          Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
915                j, m_rows[j], m_field_get->value, m_rows[j]);
916          /*
917           * Insert in queue to future free
918           */
919          dbi_getvalue_list->append(m_field_get);
920       }
921       /*
922        * Increment the row number for the next call
923        */
924       m_row_number++;
925
926       row = m_rows;
927    } else {
928       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
929    }
930
931    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
932
933    return row;
934 }
935
936 const char *B_DB_DBI::sql_strerror(void)
937 {
938    const char *dbi_errmsg;
939
940    dbi_conn_error(m_db_handle, &dbi_errmsg);
941
942    return dbi_errmsg;
943 }
944
945 void B_DB_DBI::sql_data_seek(int row)
946 {
947    /*
948     * Set the row number to be returned on the next call to sql_fetch_row
949     */
950    m_row_number = row;
951 }
952
953 int B_DB_DBI::sql_affected_rows(void)
954 {
955 #if 0
956    return dbi_result_get_numrows_affected(result);
957 #else
958    return 1;
959 #endif
960 }
961
962 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
963 {
964    char sequence[30];
965    uint64_t id = 0;
966
967    /*
968     * First execute the insert query and then retrieve the currval.
969     */
970    if (!sql_query(query)) {
971       return 0;
972    }
973
974    m_num_rows = sql_affected_rows();
975    if (m_num_rows != 1) {
976       return 0;
977    }
978
979    changes++;
980
981    /*
982     * Obtain the current value of the sequence that
983     * provides the serial value for primary key of the table.
984     *
985     * currval is local to our session.  It is not affected by
986     * other transactions.
987     *
988     * Determine the name of the sequence.
989     * PostgreSQL automatically creates a sequence using
990     * <table>_<column>_seq.
991     * At the time of writing, all tables used this format for
992     * for their primary key: <table>id
993     * Except for basefiles which has a primary key on baseid.
994     * Therefore, we need to special case that one table.
995     *
996     * everything else can use the PostgreSQL formula.
997     */
998    if (m_db_type == SQL_TYPE_POSTGRESQL) {
999       if (strcasecmp(table_name, "basefiles") == 0) {
1000          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1001       } else {
1002          bstrncpy(sequence, table_name, sizeof(sequence));
1003          bstrncat(sequence, "_", sizeof(sequence));
1004          bstrncat(sequence, table_name, sizeof(sequence));
1005          bstrncat(sequence, "id", sizeof(sequence));
1006       }
1007
1008       bstrncat(sequence, "_seq", sizeof(sequence));
1009       id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1010    } else {
1011       id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1012    }
1013
1014    return id;
1015 }
1016
1017 /* dbi_getisnull
1018  * like PQgetisnull
1019  * int PQgetisnull(const PGresult *res,
1020  *                 int row_number,
1021  *                 int column_number);
1022  *
1023  *  use dbi_result_seek_row to search in result set
1024  */
1025 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1026    int i;
1027
1028    if (row_number == 0) {
1029       row_number++;
1030    }
1031
1032    column_number++;
1033
1034    if (dbi_result_seek_row(result, row_number)) {
1035       i = dbi_result_field_is_null_idx(result,column_number);
1036       return i;
1037    } else {
1038       return 0;
1039    }
1040 }
1041
1042 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1043 {
1044    int i, j;
1045    int dbi_index;
1046    int max_length;
1047    int this_length;
1048    char *cbuf = NULL;
1049
1050    Dmsg0(500, "sql_fetch_field starts\n");
1051
1052    if (!m_fields || m_fields_size < m_num_fields) {
1053       if (m_fields) {
1054          free(m_fields);
1055          m_fields = NULL;
1056       }
1057       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1058       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1059       m_fields_size = m_num_fields;
1060
1061       for (i = 0; i < m_num_fields; i++) {
1062          /*
1063           * num_fields is starting at 1, increment i by 1
1064           */
1065          dbi_index = i + 1;
1066          Dmsg1(500, "filling field %d\n", i);
1067          m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1068          m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1069          m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1070
1071          /*
1072           * For a given column, find the max length.
1073           */
1074          max_length = 0;
1075          for (j = 0; j < m_num_rows; j++) {
1076             if (dbi_getisnull(m_result, j, dbi_index)) {
1077                 this_length = 4;        /* "NULL" */
1078             } else {
1079                cbuf = dbi_getvalue(m_result, j, dbi_index);
1080                this_length = cstrlen(cbuf);
1081                /*
1082                 * cbuf is always free
1083                 */
1084                free(cbuf);
1085             }
1086          
1087             if (max_length < this_length) {
1088                max_length = this_length;
1089             }
1090          }
1091          m_fields[i].max_length = max_length;
1092
1093          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1094                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1095       }
1096    }
1097
1098    /*
1099     * Increment field number for the next time around
1100     */
1101    return &m_fields[m_field_number++];
1102 }
1103
1104 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1105 {
1106    switch (field_type) {
1107    case (1 << 0):
1108       return true;
1109    default:
1110       return false;
1111    }
1112 }
1113
1114 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1115 {
1116    switch (field_type) {
1117    case 1:
1118    case 2:
1119       return true;
1120    default:
1121       return false;
1122    }
1123 }
1124
1125 /*
1126  * Escape strings so that PostgreSQL is happy on COPY
1127  *
1128  *   NOTE! len is the length of the old string. Your new
1129  *         string must be long enough (max 2*old+1) to hold
1130  *         the escaped output.
1131  */
1132 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1133 {
1134    /*
1135     * We have to escape \t, \n, \r, \
1136     */
1137    char c = '\0' ;
1138
1139    while (len > 0 && *src) {
1140       switch (*src) {
1141       case '\n':
1142          c = 'n';
1143          break;
1144       case '\\':
1145          c = '\\';
1146          break;
1147       case '\t':
1148          c = 't';
1149          break;
1150       case '\r':
1151          c = 'r';
1152          break;
1153       default:
1154          c = '\0' ;
1155       }
1156
1157       if (c) {
1158          *dest = '\\';
1159          dest++;
1160          *dest = c;
1161       } else {
1162          *dest = *src;
1163       }
1164
1165       len--;
1166       src++;
1167       dest++;
1168    }
1169
1170    *dest = '\0';
1171    return dest;
1172 }
1173
1174 /*
1175  * This can be a bit strang but is the one way to do
1176  *
1177  * Returns true if OK
1178  *         false if failed
1179  */
1180 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1181 {
1182    bool retval = true;
1183    const char *query = "COPY batch FROM STDIN";
1184
1185    Dmsg0(500, "sql_batch_start started\n");
1186
1187    db_lock(this);
1188    switch (m_db_type) {
1189    case SQL_TYPE_MYSQL:
1190       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1191                              "FileIndex integer,"
1192                              "JobId integer,"
1193                              "Path blob,"
1194                              "Name blob,"
1195                              "LStat tinyblob,"
1196                              "MD5 tinyblob,"
1197                              "DeltaSeq smallint)")) {
1198          Dmsg0(500, "sql_batch_start failed\n");
1199          goto bail_out;
1200       }
1201       Dmsg0(500, "sql_batch_start finishing\n");
1202       goto ok_out;
1203    case SQL_TYPE_POSTGRESQL:
1204       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1205                              "FileIndex int,"
1206                              "JobId int,"
1207                              "Path varchar,"
1208                              "Name varchar,"
1209                              "LStat varchar,"
1210                              "MD5 varchar,"
1211                              "DeltaSeq int)")) {
1212          Dmsg0(500, "sql_batch_start failed\n");
1213          goto bail_out;
1214       }
1215
1216       /*
1217        * We are starting a new query.  reset everything.
1218        */
1219       m_num_rows     = -1;
1220       m_row_number   = -1;
1221       m_field_number = -1;
1222
1223       sql_free_result();
1224
1225       for (int i=0; i < 10; i++) {
1226          sql_query(query);
1227          if (m_result) {
1228             break;
1229          }
1230          bmicrosleep(5, 0);
1231       }
1232       if (!m_result) {
1233          Dmsg1(50, "Query failed: %s\n", query);
1234          goto bail_out;
1235       }
1236
1237       m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1238       //m_status = DBI_ERROR_NONE;
1239
1240       if (m_status == DBI_ERROR_NONE) {
1241          /*
1242           * How many fields in the set?
1243           */
1244          m_num_fields = dbi_result_get_numfields(m_result);
1245          m_num_rows = dbi_result_get_numrows(m_result);
1246          m_status = (dbi_error_flag) 1;
1247       } else {
1248          Dmsg1(50, "Result status failed: %s\n", query);
1249          goto bail_out;
1250       }
1251
1252       Dmsg0(500, "sql_batch_start finishing\n");
1253       goto ok_out;
1254    case SQL_TYPE_SQLITE3:
1255       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1256                              "FileIndex integer,"
1257                              "JobId integer,"
1258                              "Path blob,"
1259                              "Name blob,"
1260                              "LStat tinyblob,"
1261                              "MD5 tinyblob,"
1262                              "DeltaSeq smallint)")) {
1263          Dmsg0(500, "sql_batch_start failed\n");
1264          goto bail_out;
1265       }
1266       Dmsg0(500, "sql_batch_start finishing\n");
1267       goto ok_out;
1268    }
1269
1270 bail_out:
1271    Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1272    m_status = (dbi_error_flag) 0;
1273    sql_free_result();
1274    m_result = NULL;
1275    retval = false;
1276
1277 ok_out:
1278    db_unlock(this);
1279    return retval;
1280 }
1281
1282 /*
1283  * Set error to something to abort operation
1284  */
1285 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1286 {
1287    int res = 0;
1288    int count = 30;
1289    int (*custom_function)(void*, const char*) = NULL;
1290    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1291
1292    Dmsg0(500, "sql_batch_start started\n");
1293
1294    switch (m_db_type) {
1295    case SQL_TYPE_MYSQL:
1296       m_status = (dbi_error_flag) 0;
1297       break;
1298    case SQL_TYPE_POSTGRESQL:
1299       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1300
1301       do {
1302          res = (*custom_function)(myconn->connection, error);
1303       } while (res == 0 && --count > 0);
1304
1305       if (res == 1) {
1306          Dmsg0(500, "ok\n");
1307          m_status = (dbi_error_flag) 1;
1308       }
1309
1310       if (res <= 0) {
1311          Dmsg0(500, "we failed\n");
1312          m_status = (dbi_error_flag) 0;
1313          //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1314        }
1315       break;
1316    case SQL_TYPE_SQLITE3:
1317       m_status = (dbi_error_flag) 0;
1318       break;
1319    }
1320
1321    Dmsg0(500, "sql_batch_start finishing\n");
1322
1323    return true;
1324 }
1325
1326 /*
1327  * This function is big and use a big switch.
1328  * In near future is better split in small functions
1329  * and refactory.
1330  */
1331 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1332 {
1333    int res;
1334    int count=30;
1335    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1336    int (*custom_function)(void*, const char*, int) = NULL;
1337    char* (*custom_function_error)(void*) = NULL;
1338    size_t len;
1339    char *digest;
1340    char ed1[50];
1341
1342    Dmsg0(500, "sql_batch_start started \n");
1343
1344    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1345    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1346
1347    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1348       *digest = '\0';
1349    } else {
1350       digest = ar->Digest;
1351    }
1352
1353    switch (m_db_type) {
1354    case SQL_TYPE_MYSQL:
1355       db_escape_string(jcr, esc_name, fname, fnl);
1356       db_escape_string(jcr, esc_path, path, pnl);
1357       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1358                       "(%u,%s,'%s','%s','%s','%s',%u)",
1359                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1360                       esc_name, ar->attr, digest, ar->DeltaSeq);
1361
1362       if (!sql_query(cmd))
1363       {
1364          Dmsg0(500, "sql_batch_start failed\n");
1365          goto bail_out;
1366       }
1367
1368       Dmsg0(500, "sql_batch_start finishing\n");
1369
1370       return true;
1371       break;
1372    case SQL_TYPE_POSTGRESQL:
1373       postgresql_copy_escape(esc_name, fname, fnl);
1374       postgresql_copy_escape(esc_path, path, pnl);
1375       len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1376                      ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1377                      esc_name, ar->attr, digest, ar->DeltaSeq);
1378
1379       /*
1380        * libdbi don't support CopyData and we need call a postgresql
1381        * specific function to do this work
1382        */
1383       Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1384       custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1385       if (custom_function != NULL) {
1386          do {
1387             res = (*custom_function)(myconn->connection, cmd, len);
1388          } while (res == 0 && --count > 0);
1389
1390          if (res == 1) {
1391             Dmsg0(500, "ok\n");
1392             changes++;
1393             m_status = (dbi_error_flag) 1;
1394          }
1395
1396          if (res <= 0) {
1397             Dmsg0(500, "sql_batch_insert failed\n");
1398             goto bail_out;
1399          }
1400
1401          Dmsg0(500, "sql_batch_insert finishing\n");
1402          return true;
1403       } else {
1404          /*
1405           * Ensure to detect a PQerror
1406           */
1407          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1408          Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1409          goto bail_out;
1410       }
1411       break;
1412    case SQL_TYPE_SQLITE3:
1413       db_escape_string(jcr, esc_name, fname, fnl);
1414       db_escape_string(jcr, esc_path, path, pnl);
1415       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1416                       "(%u,%s,'%s','%s','%s','%s',%u)",
1417                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1418                       esc_name, ar->attr, digest, ar->DeltaSeq);
1419
1420       if (!sql_query(cmd))
1421       {
1422          Dmsg0(500, "sql_batch_insert failed\n");
1423          goto bail_out;
1424       }
1425
1426       Dmsg0(500, "sql_batch_insert finishing\n");
1427
1428       return true;
1429       break;
1430    }
1431
1432 bail_out:
1433    Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1434    m_status = (dbi_error_flag) 0;
1435    sql_free_result();
1436    return false;
1437 }
1438
1439 /*
1440  * Initialize database data structure. In principal this should
1441  * never have errors, or it is really fatal.
1442  */
1443 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1444                        const char *db_password, const char *db_address, int db_port,
1445                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1446 {
1447    B_DB_DBI *mdb = NULL;
1448
1449    if (!db_driver) {
1450       Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1451    }
1452
1453    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1454       Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1455    }
1456
1457    if (!db_user) {
1458       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1459       return NULL;
1460    }
1461
1462    P(mutex);                          /* lock DB queue */
1463    if (db_list && !mult_db_connections) {
1464       /*
1465        * Look to see if DB already open
1466        */
1467       foreach_dlist(mdb, db_list) {
1468          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1469             Dmsg1(100, "DB REopen %s\n", db_name);
1470             mdb->increment_refcount();
1471             goto bail_out;
1472          }
1473       }
1474    }
1475    Dmsg0(100, "db_init_database first time\n");
1476    mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1477                       db_port, db_socket, mult_db_connections, disable_batch_insert));
1478
1479 bail_out:
1480    V(mutex);
1481    return mdb;
1482 }
1483
1484 #endif /* HAVE_DBI */