]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Tweak second part of fix for #1893
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
37  */
38 /*
39  * This code only compiles against a recent version of libdbi. The current
40  * release found on the libdbi website (0.8.3) won't work for this code.
41  *
42  * You find the libdbi library on http://sourceforge.net/projects/libdbi
43  *
44  * A fairly recent version of libdbi from CVS works, so either make sure
45  * your distribution has a fairly recent version of libdbi installed or
46  * clone the CVS repositories from sourceforge and compile that code and
47  * install it.
48  *
49  * You need:
50  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
52  */
53
54 #include "bacula.h"
55
56 #ifdef HAVE_DBI
57
58 #include "cats.h"
59 #include "bdb_priv.h"
60 #include <dbi/dbi.h>
61 #include <dbi/dbi-dev.h>
62 #include <bdb_dbi.h>
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /*
72  * List of open databases
73  */
74 static dlist *db_list = NULL;
75
76 /*
77  * Control allocated fields by dbi_getvalue
78  */
79 static dlist *dbi_getvalue_list = NULL;
80
81 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
82
83 typedef int (*custom_function_insert_t)(void*, const char*, int);
84 typedef char* (*custom_function_error_t)(void*);
85 typedef int (*custom_function_end_t)(void*, const char*);
86
87 B_DB_DBI::B_DB_DBI(JCR *jcr,
88                    const char *db_driver,
89                    const char *db_name,
90                    const char *db_user,
91                    const char *db_password,
92                    const char *db_address,
93                    int db_port,
94                    const char *db_socket,
95                    bool mult_db_connections,
96                    bool disable_batch_insert)
97 {
98    char *p;
99    char new_db_driver[10];
100    char db_driverdir[256];
101    DBI_FIELD_GET *field;
102
103    p = (char *)(db_driver + 4);
104    if (strcasecmp(p, "mysql") == 0) {
105       m_db_type = SQL_TYPE_MYSQL;
106       bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
107    } else if (strcasecmp(p, "postgresql") == 0) {
108       m_db_type = SQL_TYPE_POSTGRESQL;
109       bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
110    } else if (strcasecmp(p, "sqlite3") == 0) {
111       m_db_type = SQL_TYPE_SQLITE3;
112       bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
113    } else if (strcasecmp(p, "ingres") == 0) {
114       m_db_type = SQL_TYPE_INGRES;
115       bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
116    } else {
117       Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
118       return;
119    }
120
121    /*
122     * Set db_driverdir whereis is the libdbi drivers
123     */
124    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
125
126    /*
127     * Initialize the parent class members.
128     */
129    m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
130    m_db_name = bstrdup(db_name);
131    m_db_user = bstrdup(db_user);
132    if (db_password) {
133       m_db_password = bstrdup(db_password);
134    }
135    if (db_address) {
136       m_db_address = bstrdup(db_address);
137    }
138    if (db_socket) {
139       m_db_socket = bstrdup(db_socket);
140    }
141    if (db_driverdir) {
142       m_db_driverdir = bstrdup(db_driverdir);
143    }
144    m_db_driver = bstrdup(new_db_driver);
145    m_db_port = db_port;
146    if (disable_batch_insert) {
147       m_disabled_batch_insert = true;
148       m_have_batch_insert = false;
149    } else {
150       m_disabled_batch_insert = false;
151 #if defined(USE_BATCH_FILE_INSERT)
152 #ifdef HAVE_DBI_BATCH_FILE_INSERT
153       m_have_batch_insert = true;
154 #else
155       m_have_batch_insert = false;
156 #endif /* HAVE_DBI_BATCH_FILE_INSERT */
157 #else
158       m_have_batch_insert = false;
159 #endif /* USE_BATCH_FILE_INSERT */
160    }
161    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
162    *errmsg = 0;
163    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
164    cached_path = get_pool_memory(PM_FNAME);
165    cached_path_id = 0;
166    m_ref_count = 1;
167    fname = get_pool_memory(PM_FNAME);
168    path = get_pool_memory(PM_FNAME);
169    esc_name = get_pool_memory(PM_FNAME);
170    esc_path = get_pool_memory(PM_FNAME);
171    esc_obj = get_pool_memory(PM_FNAME);
172    m_allow_transactions = mult_db_connections;
173
174    /* At this time, when mult_db_connections == true, this is for 
175     * specific console command such as bvfs or batch mode, and we don't
176     * want to share a batch mode or bvfs. In the future, we can change
177     * the creation function to add this parameter.
178     */
179    m_dedicated = mult_db_connections; 
180
181    /*
182     * Initialize the private members.
183     */
184    m_db_handle = NULL;
185    m_result = NULL;
186    m_field_get = NULL;
187
188    /*
189     * Put the db in the list.
190     */
191    if (db_list == NULL) {
192       db_list = New(dlist(this, &this->m_link));
193       dbi_getvalue_list = New(dlist(field, &field->link));
194    }
195    db_list->append(this);
196 }
197
198 B_DB_DBI::~B_DB_DBI()
199 {
200 }
201
202 /*
203  * Now actually open the database.  This can generate errors,
204  *   which are returned in the errmsg
205  *
206  * DO NOT close the database or delete mdb here  !!!!
207  */
208 bool B_DB_DBI::db_open_database(JCR *jcr)
209 {
210    bool retval = false;
211    int errstat;
212    int dbstat;
213    uint8_t len;
214    const char *dbi_errmsg;
215    char buf[10], *port;
216    int numdrivers;
217    char *new_db_name = NULL;
218    char *new_db_dir = NULL;
219
220    P(mutex);
221    if (m_connected) {
222       retval = true;
223       goto bail_out;
224    }
225
226    if ((errstat=rwl_init(&m_lock)) != 0) {
227       berrno be;
228       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
229             be.bstrerror(errstat));
230       goto bail_out;
231    }
232
233    if (m_db_port) {
234       bsnprintf(buf, sizeof(buf), "%d", m_db_port);
235       port = buf;
236    } else {
237       port = NULL;
238    }
239
240    numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
241    if (numdrivers < 0) {
242       Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
243                                "db_driverdir=%s. It is probaly not found any drivers\n"),
244                                m_db_driverdir,numdrivers);
245       goto bail_out;
246    }
247    m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
248    /*
249     * Can be many types of databases
250     */
251    switch (m_db_type) {
252    case SQL_TYPE_MYSQL:
253       dbi_conn_set_option(m_db_handle, "host", m_db_address);      /* default = localhost */
254       dbi_conn_set_option(m_db_handle, "port", port);              /* default port */
255       dbi_conn_set_option(m_db_handle, "username", m_db_user);     /* login name */
256       dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
257       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);       /* database name */
258       break;
259    case SQL_TYPE_POSTGRESQL:
260       dbi_conn_set_option(m_db_handle, "host", m_db_address);
261       dbi_conn_set_option(m_db_handle, "port", port);
262       dbi_conn_set_option(m_db_handle, "username", m_db_user);
263       dbi_conn_set_option(m_db_handle, "password", m_db_password);
264       dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
265       break;
266    case SQL_TYPE_SQLITE3:
267       len = strlen(working_directory) + 5;
268       new_db_dir = (char *)malloc(len);
269       strcpy(new_db_dir, working_directory);
270       strcat(new_db_dir, "/");
271       len = strlen(m_db_name) + 5;
272       new_db_name = (char *)malloc(len);
273       strcpy(new_db_name, m_db_name);
274       strcat(new_db_name, ".db");
275       dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
276       dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
277       Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
278       free(new_db_dir);
279       free(new_db_name);
280       break;
281    }
282
283    /*
284     * If connection fails, try at 5 sec intervals for 30 seconds.
285     */
286    for (int retry=0; retry < 6; retry++) {
287       dbstat = dbi_conn_connect(m_db_handle);
288       if (dbstat == 0) {
289          break;
290       }
291
292       dbi_conn_error(m_db_handle, &dbi_errmsg);
293       Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
294
295       bmicrosleep(5, 0);
296    }
297
298    if (dbstat != 0 ) {
299       Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301          m_db_driver, m_db_name, m_db_user);
302       goto bail_out;
303    }
304
305    Dmsg0(50, "dbi_real_connect done\n");
306    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
307          m_db_user, m_db_name,
308         (m_db_password == NULL) ? "(NULL)" : m_db_password);
309
310    m_connected = true;
311
312    if (!check_tables_version(jcr, this)) {
313       goto bail_out;
314    }
315
316    switch (m_db_type) {
317    case SQL_TYPE_MYSQL:
318       /*
319        * Set connection timeout to 8 days specialy for batch mode
320        */
321       sql_query("SET wait_timeout=691200");
322       sql_query("SET interactive_timeout=691200");
323       break;
324    case SQL_TYPE_POSTGRESQL:
325       /*
326        * Tell PostgreSQL we are using standard conforming strings
327        * and avoid warnings such as:
328        * WARNING:  nonstandard use of \\ in a string literal
329        */
330       sql_query("SET datestyle TO 'ISO, YMD'");
331       sql_query("SET standard_conforming_strings=on");
332       break;
333    }
334
335    retval = true;
336
337 bail_out:
338    V(mutex);
339    return retval;
340 }
341
342 void B_DB_DBI::db_close_database(JCR *jcr)
343 {
344    if (m_connected) {
345       db_end_transaction(jcr);
346    }
347    P(mutex);
348    m_ref_count--;
349    if (m_ref_count == 0) {
350       if (m_connected) {
351          sql_free_result();
352       }
353       db_list->remove(this);
354       if (m_connected && m_db_handle) {
355          dbi_shutdown_r(m_instance);
356          m_db_handle = NULL;
357          m_instance = NULL;
358       }
359       if (rwl_is_init(&m_lock)) {
360          rwl_destroy(&m_lock);
361       }
362       free_pool_memory(errmsg);
363       free_pool_memory(cmd);
364       free_pool_memory(cached_path);
365       free_pool_memory(fname);
366       free_pool_memory(path);
367       free_pool_memory(esc_name);
368       free_pool_memory(esc_path);
369       free_pool_memory(esc_obj);
370       if (m_db_driver) {
371          free(m_db_driver);
372       }
373       if (m_db_name) {
374          free(m_db_name);
375       }
376       if (m_db_user) {
377          free(m_db_user);
378       }
379       if (m_db_password) {
380          free(m_db_password);
381       }
382       if (m_db_address) {
383          free(m_db_address);
384       }
385       if (m_db_socket) {
386          free(m_db_socket);
387       }
388       if (m_db_driverdir) {
389          free(m_db_driverdir);
390       }
391       delete this;
392       if (db_list->size() == 0) {
393          delete db_list;
394          db_list = NULL;
395       }
396    }
397    V(mutex);
398 }
399
400 void B_DB_DBI::db_thread_cleanup(void)
401 {
402 }
403
404 /*
405  * Escape strings so that DBI is happy
406  *
407  *   NOTE! len is the length of the old string. Your new
408  *         string must be long enough (max 2*old+1) to hold
409  *         the escaped output.
410  *
411  * dbi_conn_quote_string_copy receives a pointer to pointer.
412  * We need copy the value of pointer to snew because libdbi change the
413  * pointer
414  */
415 void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
416 {
417    char *inew;
418    char *pnew;
419
420    if (len == 0) {
421       snew[0] = 0;
422    } else {
423       /*
424        * Correct the size of old basead in len and copy new string to inew
425        */
426       inew = (char *)malloc(sizeof(char) * len + 1);
427       bstrncpy(inew,old,len + 1);
428       /*
429        * Escape the correct size of old
430        */
431       dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
432       free(inew);
433       /*
434        * Copy the escaped string to snew
435        */
436       bstrncpy(snew, pnew, 2 * len + 1);
437    }
438
439    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
440 }
441
442 /*
443  * Escape binary object so that DBI is happy
444  * Memory is stored in B_DB struct, no need to free it
445  */
446 char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
447 {
448    size_t new_len;
449    char *pnew;
450
451    if (len == 0) {
452       esc_obj[0] = 0;
453    } else {
454       new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
455       esc_obj = check_pool_memory_size(esc_obj, new_len+1);
456       memcpy(esc_obj, pnew, new_len);
457    }
458
459    return esc_obj;
460 }
461
462 /*
463  * Unescape binary object so that DBI is happy
464  */
465 void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
466                                   POOLMEM **dest, int32_t *dest_len)
467 {
468    if (!from) {
469       *dest[0] = 0;
470       *dest_len = 0;
471       return;
472    }
473    *dest = check_pool_memory_size(*dest, expected_len+1);
474    *dest_len = expected_len;
475    memcpy(*dest, from, expected_len);
476    (*dest)[expected_len]=0;
477 }
478
479 /*
480  * Start a transaction. This groups inserts and makes things
481  * much more efficient. Usually started when inserting
482  * file attributes.
483  */
484 void B_DB_DBI::db_start_transaction(JCR *jcr)
485 {
486    if (!jcr->attr) {
487       jcr->attr = get_pool_memory(PM_FNAME);
488    }
489    if (!jcr->ar) {
490       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
491    }
492
493    switch (m_db_type) {
494    case SQL_TYPE_SQLITE3:
495       if (!m_allow_transactions) {
496          return;
497       }
498
499       db_lock(this);
500       /*
501        * Allow only 10,000 changes per transaction
502        */
503       if (m_transaction && changes > 10000) {
504          db_end_transaction(jcr);
505       }
506       if (!m_transaction) {
507          sql_query("BEGIN");  /* begin transaction */
508          Dmsg0(400, "Start SQLite transaction\n");
509          m_transaction = true;
510       }
511       db_unlock(this);
512       break;
513    case SQL_TYPE_POSTGRESQL:
514       /*
515        * This is turned off because transactions break
516        * if multiple simultaneous jobs are run.
517        */
518       if (!m_allow_transactions) {
519          return;
520       }
521
522       db_lock(this);
523       /*
524        * Allow only 25,000 changes per transaction
525        */
526       if (m_transaction && changes > 25000) {
527          db_end_transaction(jcr);
528       }
529       if (!m_transaction) {
530          sql_query("BEGIN");  /* begin transaction */
531          Dmsg0(400, "Start PosgreSQL transaction\n");
532          m_transaction = true;
533       }
534       db_unlock(this);
535       break;
536    case SQL_TYPE_INGRES:
537       if (!m_allow_transactions) {
538          return;
539       }
540
541       db_lock(this);
542       /*
543        * Allow only 25,000 changes per transaction
544        */
545       if (m_transaction && changes > 25000) {
546          db_end_transaction(jcr);
547       }
548       if (!m_transaction) {
549          sql_query("BEGIN");  /* begin transaction */
550          Dmsg0(400, "Start Ingres transaction\n");
551          m_transaction = true;
552       }
553       db_unlock(this);
554       break;
555    default:
556       break;
557    }
558 }
559
560 void B_DB_DBI::db_end_transaction(JCR *jcr)
561 {
562    if (jcr && jcr->cached_attribute) {
563       Dmsg0(400, "Flush last cached attribute.\n");
564       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
565          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
566       }
567       jcr->cached_attribute = false;
568    }
569
570    switch (m_db_type) {
571    case SQL_TYPE_SQLITE3:
572       if (!m_allow_transactions) {
573          return;
574       }
575
576       db_lock(this);
577       if (m_transaction) {
578          sql_query("COMMIT"); /* end transaction */
579          m_transaction = false;
580          Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
581       }
582       changes = 0;
583       db_unlock(this);
584       break;
585    case SQL_TYPE_POSTGRESQL:
586       if (!m_allow_transactions) {
587          return;
588       }
589
590       db_lock(this);
591       if (m_transaction) {
592          sql_query("COMMIT"); /* end transaction */
593          m_transaction = false;
594          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
595       }
596       changes = 0;
597       db_unlock(this);
598       break;
599    case SQL_TYPE_INGRES:
600       if (!m_allow_transactions) {
601          return;
602       }
603
604       db_lock(this);
605       if (m_transaction) {
606          sql_query("COMMIT"); /* end transaction */
607          m_transaction = false;
608          Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
609       }
610       changes = 0;
611       db_unlock(this);
612       break;
613    default:
614       break;
615    }
616 }
617
618 /*
619  * Submit a general SQL command (cmd), and for each row returned,
620  * the result_handler is called with the ctx.
621  */
622 bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
623 {
624    bool retval = true;
625    SQL_ROW row;
626
627    Dmsg1(500, "db_sql_query starts with %s\n", query);
628
629    db_lock(this);
630    if (!sql_query(query, QF_STORE_RESULT)) {
631       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
632       Dmsg0(500, "db_sql_query failed\n");
633       retval = false;
634       goto bail_out;
635    }
636
637    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
638
639    if (result_handler != NULL) {
640       Dmsg0(500, "db_sql_query invoking handler\n");
641       while ((row = sql_fetch_row()) != NULL) {
642          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
643          if (result_handler(ctx, m_num_fields, row))
644             break;
645       }
646       sql_free_result();
647    }
648
649    Dmsg0(500, "db_sql_query finished\n");
650
651 bail_out:
652    db_unlock(this);
653    return retval;
654 }
655
656 /*
657  * Note, if this routine returns 1 (failure), Bacula expects
658  *  that no result has been stored.
659  *
660  *  Returns:  true on success
661  *            false on failure
662  */
663 bool B_DB_DBI::sql_query(const char *query, int flags)
664 {
665    bool retval = false;
666    const char *dbi_errmsg;
667
668    Dmsg1(500, "sql_query starts with %s\n", query);
669
670    /*
671     * We are starting a new query.  reset everything.
672     */
673    m_num_rows     = -1;
674    m_row_number   = -1;
675    m_field_number = -1;
676
677    if (m_result) {
678       dbi_result_free(m_result);  /* hmm, someone forgot to free?? */
679       m_result = NULL;
680    }
681
682    m_result = (void **)dbi_conn_query(m_db_handle, query);
683
684    if (!m_result) {
685       Dmsg2(50, "Query failed: %s %p\n", query, m_result);
686       goto bail_out;
687    }
688
689    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
690    if (m_status == DBI_ERROR_NONE) {
691       Dmsg1(500, "we have a result\n", query);
692
693       /*
694        * How many fields in the set?
695        * num_fields starting at 1
696        */
697       m_num_fields = dbi_result_get_numfields(m_result);
698       Dmsg1(500, "we have %d fields\n", m_num_fields);
699       /*
700        * If no result num_rows is 0
701        */
702       m_num_rows = dbi_result_get_numrows(m_result);
703       Dmsg1(500, "we have %d rows\n", m_num_rows);
704
705       m_status = (dbi_error_flag) 0;                  /* succeed */
706    } else {
707       Dmsg1(50, "Result status failed: %s\n", query);
708       goto bail_out;
709    }
710
711    Dmsg0(500, "sql_query finishing\n");
712    retval = true;
713    goto ok_out;
714
715 bail_out:
716    m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
717    //dbi_conn_error(m_db_handle, &dbi_errmsg);
718    Dmsg4(500, "sql_query we failed dbi error: "
719                    "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
720    dbi_result_free(m_result);
721    m_result = NULL;
722    m_status = (dbi_error_flag) 1;                   /* failed */
723
724 ok_out:
725    return retval;
726 }
727
728 void B_DB_DBI::sql_free_result(void)
729 {
730    DBI_FIELD_GET *f;
731
732    db_lock(this);
733    if (m_result) {
734       dbi_result_free(m_result);
735       m_result = NULL;
736    }
737    if (m_rows) {
738       free(m_rows);
739       m_rows = NULL;
740    }
741    /* 
742     * Now is time to free all value return by dbi_get_value
743     * this is necessary because libdbi don't free memory return by yours results
744     * and Bacula has some routine wich call more than once time sql_fetch_row
745     *
746     * Using a queue to store all pointer allocate is a good way to free all things
747     * when necessary
748     */
749    foreach_dlist(f, dbi_getvalue_list) {
750       free(f->value);
751       free(f);
752    }
753    if (m_fields) {
754       free(m_fields);
755       m_fields = NULL;
756    }
757    m_num_rows = m_num_fields = 0;
758    db_unlock(this);
759 }
760
761 /* dbi_getvalue
762  * like PQgetvalue;
763  * char *PQgetvalue(const PGresult *res,
764  *                int row_number,
765  *                int column_number);
766  *
767  * use dbi_result_seek_row to search in result set
768  * use example to return only strings
769  */
770 static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
771 {
772    char *buf = NULL;
773    const char *dbi_errmsg;
774    const char *field_name;
775    unsigned short dbitype;
776    size_t field_length;
777    int64_t num;
778
779    /* correct the index for dbi interface
780     * dbi index begins 1
781     * I prefer do not change others functions
782     */
783    Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
784          result, row_number, column_number);
785
786    column_number++;
787
788    if(row_number == 0) {
789      row_number++;
790    }
791
792    Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
793                         result, row_number, column_number);
794
795    if(dbi_result_seek_row(result, row_number)) {
796
797       field_name = dbi_result_get_field_name(result, column_number);
798       field_length = dbi_result_get_field_length(result, field_name);
799       dbitype = dbi_result_get_field_type_idx(result,column_number);
800
801       Dmsg3(500, "dbi_getvalue start: type: '%d' "
802             "field_length bytes: '%d' fieldname: '%s'\n",
803             dbitype, field_length, field_name);
804
805       if(field_length) {
806          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
807          buf = (char *)malloc(field_length + 1);
808       } else {
809          /*
810           * if numbers
811           */
812          buf = (char *)malloc(sizeof(char *) * 50);
813       }
814
815       switch (dbitype) {
816       case DBI_TYPE_INTEGER:
817          num = dbi_result_get_longlong(result, field_name);
818          edit_int64(num, buf);
819          field_length = strlen(buf);
820          break;
821       case DBI_TYPE_STRING:
822          if(field_length) {
823             field_length = bsnprintf(buf, field_length + 1, "%s",
824             dbi_result_get_string(result, field_name));
825          } else {
826             buf[0] = 0;
827          }
828          break;
829       case DBI_TYPE_BINARY:
830          /*
831           * dbi_result_get_binary return a NULL pointer if value is empty
832           * following, change this to what Bacula espected
833           */
834          if(field_length) {
835             field_length = bsnprintf(buf, field_length + 1, "%s",
836                   dbi_result_get_binary(result, field_name));
837          } else {
838             buf[0] = 0;
839          }
840          break;
841       case DBI_TYPE_DATETIME:
842          time_t last;
843          struct tm tm;
844
845          last = dbi_result_get_datetime(result, field_name);
846
847          if(last == -1) {
848                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
849          } else {
850             (void)localtime_r(&last, &tm);
851             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
852                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
853                   tm.tm_hour, tm.tm_min, tm.tm_sec);
854          }
855          break;
856       }
857
858    } else {
859       dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
860       Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
861    }
862
863    Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
864       buf, field_length, buf);
865
866    /*
867     * Don't worry about this buf
868     */
869    return buf;
870 }
871
872 SQL_ROW B_DB_DBI::sql_fetch_row(void)
873 {
874    int j;
875    SQL_ROW row = NULL; /* by default, return NULL */
876
877    Dmsg0(500, "sql_fetch_row start\n");
878    if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
879       if (m_rows) {
880          Dmsg0(500, "sql_fetch_row freeing space\n");
881          Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
882          if (m_num_rows != 0) {
883             for (j = 0; j < m_num_fields; j++) {
884                Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
885                   if (m_rows[j]) {
886                      free(m_rows[j]);
887                   }
888             }
889          }
890          free(m_rows);
891       }
892       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
893       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
894       m_rows_size = m_num_fields;
895
896       /*
897        * Now reset the row_number now that we have the space allocated
898        */
899       m_row_number = 1;
900    }
901
902    /*
903     * If still within the result set
904     */
905    if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
906       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
907       /*
908        * Get each value from this row
909        */
910       for (j = 0; j < m_num_fields; j++) {
911          m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
912          /*
913           * Allocate space to queue row
914           */
915          m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
916          /*
917           * Store the pointer in queue
918           */
919          m_field_get->value = m_rows[j];
920          Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
921                j, m_rows[j], m_field_get->value, m_rows[j]);
922          /*
923           * Insert in queue to future free
924           */
925          dbi_getvalue_list->append(m_field_get);
926       }
927       /*
928        * Increment the row number for the next call
929        */
930       m_row_number++;
931
932       row = m_rows;
933    } else {
934       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
935    }
936
937    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
938
939    return row;
940 }
941
942 const char *B_DB_DBI::sql_strerror(void)
943 {
944    const char *dbi_errmsg;
945
946    dbi_conn_error(m_db_handle, &dbi_errmsg);
947
948    return dbi_errmsg;
949 }
950
951 void B_DB_DBI::sql_data_seek(int row)
952 {
953    /*
954     * Set the row number to be returned on the next call to sql_fetch_row
955     */
956    m_row_number = row;
957 }
958
959 int B_DB_DBI::sql_affected_rows(void)
960 {
961 #if 0
962    return dbi_result_get_numrows_affected(result);
963 #else
964    return 1;
965 #endif
966 }
967
968 uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
969 {
970    char sequence[30];
971    uint64_t id = 0;
972
973    /*
974     * First execute the insert query and then retrieve the currval.
975     */
976    if (!sql_query(query)) {
977       return 0;
978    }
979
980    m_num_rows = sql_affected_rows();
981    if (m_num_rows != 1) {
982       return 0;
983    }
984
985    changes++;
986
987    /*
988     * Obtain the current value of the sequence that
989     * provides the serial value for primary key of the table.
990     *
991     * currval is local to our session.  It is not affected by
992     * other transactions.
993     *
994     * Determine the name of the sequence.
995     * PostgreSQL automatically creates a sequence using
996     * <table>_<column>_seq.
997     * At the time of writing, all tables used this format for
998     * for their primary key: <table>id
999     * Except for basefiles which has a primary key on baseid.
1000     * Therefore, we need to special case that one table.
1001     *
1002     * everything else can use the PostgreSQL formula.
1003     */
1004    if (m_db_type == SQL_TYPE_POSTGRESQL) {
1005       if (strcasecmp(table_name, "basefiles") == 0) {
1006          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1007       } else {
1008          bstrncpy(sequence, table_name, sizeof(sequence));
1009          bstrncat(sequence, "_", sizeof(sequence));
1010          bstrncat(sequence, table_name, sizeof(sequence));
1011          bstrncat(sequence, "id", sizeof(sequence));
1012       }
1013
1014       bstrncat(sequence, "_seq", sizeof(sequence));
1015       id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
1016    } else {
1017       id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
1018    }
1019
1020    return id;
1021 }
1022
1023 /* dbi_getisnull
1024  * like PQgetisnull
1025  * int PQgetisnull(const PGresult *res,
1026  *                 int row_number,
1027  *                 int column_number);
1028  *
1029  *  use dbi_result_seek_row to search in result set
1030  */
1031 static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1032    int i;
1033
1034    if (row_number == 0) {
1035       row_number++;
1036    }
1037
1038    column_number++;
1039
1040    if (dbi_result_seek_row(result, row_number)) {
1041       i = dbi_result_field_is_null_idx(result,column_number);
1042       return i;
1043    } else {
1044       return 0;
1045    }
1046 }
1047
1048 SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
1049 {
1050    int i, j;
1051    int dbi_index;
1052    int max_length;
1053    int this_length;
1054    char *cbuf = NULL;
1055
1056    Dmsg0(500, "sql_fetch_field starts\n");
1057
1058    if (!m_fields || m_fields_size < m_num_fields) {
1059       if (m_fields) {
1060          free(m_fields);
1061          m_fields = NULL;
1062       }
1063       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
1064       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
1065       m_fields_size = m_num_fields;
1066
1067       for (i = 0; i < m_num_fields; i++) {
1068          /*
1069           * num_fields is starting at 1, increment i by 1
1070           */
1071          dbi_index = i + 1;
1072          Dmsg1(500, "filling field %d\n", i);
1073          m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
1074          m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
1075          m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
1076
1077          /*
1078           * For a given column, find the max length.
1079           */
1080          max_length = 0;
1081          for (j = 0; j < m_num_rows; j++) {
1082             if (dbi_getisnull(m_result, j, dbi_index)) {
1083                 this_length = 4;        /* "NULL" */
1084             } else {
1085                cbuf = dbi_getvalue(m_result, j, dbi_index);
1086                this_length = cstrlen(cbuf);
1087                /*
1088                 * cbuf is always free
1089                 */
1090                free(cbuf);
1091             }
1092          
1093             if (max_length < this_length) {
1094                max_length = this_length;
1095             }
1096          }
1097          m_fields[i].max_length = max_length;
1098
1099          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
1100                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
1101       }
1102    }
1103
1104    /*
1105     * Increment field number for the next time around
1106     */
1107    return &m_fields[m_field_number++];
1108 }
1109
1110 bool B_DB_DBI::sql_field_is_not_null(int field_type)
1111 {
1112    switch (field_type) {
1113    case (1 << 0):
1114       return true;
1115    default:
1116       return false;
1117    }
1118 }
1119
1120 bool B_DB_DBI::sql_field_is_numeric(int field_type)
1121 {
1122    switch (field_type) {
1123    case 1:
1124    case 2:
1125       return true;
1126    default:
1127       return false;
1128    }
1129 }
1130
1131 /*
1132  * Escape strings so that PostgreSQL is happy on COPY
1133  *
1134  *   NOTE! len is the length of the old string. Your new
1135  *         string must be long enough (max 2*old+1) to hold
1136  *         the escaped output.
1137  */
1138 static char *postgresql_copy_escape(char *dest, char *src, size_t len)
1139 {
1140    /*
1141     * We have to escape \t, \n, \r, \
1142     */
1143    char c = '\0' ;
1144
1145    while (len > 0 && *src) {
1146       switch (*src) {
1147       case '\n':
1148          c = 'n';
1149          break;
1150       case '\\':
1151          c = '\\';
1152          break;
1153       case '\t':
1154          c = 't';
1155          break;
1156       case '\r':
1157          c = 'r';
1158          break;
1159       default:
1160          c = '\0' ;
1161       }
1162
1163       if (c) {
1164          *dest = '\\';
1165          dest++;
1166          *dest = c;
1167       } else {
1168          *dest = *src;
1169       }
1170
1171       len--;
1172       src++;
1173       dest++;
1174    }
1175
1176    *dest = '\0';
1177    return dest;
1178 }
1179
1180 /*
1181  * This can be a bit strang but is the one way to do
1182  *
1183  * Returns true if OK
1184  *         false if failed
1185  */
1186 bool B_DB_DBI::sql_batch_start(JCR *jcr)
1187 {
1188    bool retval = true;
1189    const char *query = "COPY batch FROM STDIN";
1190
1191    Dmsg0(500, "sql_batch_start started\n");
1192
1193    db_lock(this);
1194    switch (m_db_type) {
1195    case SQL_TYPE_MYSQL:
1196       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1197                              "FileIndex integer,"
1198                              "JobId integer,"
1199                              "Path blob,"
1200                              "Name blob,"
1201                              "LStat tinyblob,"
1202                              "MD5 tinyblob,"
1203                              "DeltaSeq smallint)")) {
1204          Dmsg0(500, "sql_batch_start failed\n");
1205          goto bail_out;
1206       }
1207       Dmsg0(500, "sql_batch_start finishing\n");
1208       goto ok_out;
1209    case SQL_TYPE_POSTGRESQL:
1210       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1211                              "FileIndex int,"
1212                              "JobId int,"
1213                              "Path varchar,"
1214                              "Name varchar,"
1215                              "LStat varchar,"
1216                              "MD5 varchar,"
1217                              "DeltaSeq int)")) {
1218          Dmsg0(500, "sql_batch_start failed\n");
1219          goto bail_out;
1220       }
1221
1222       /*
1223        * We are starting a new query.  reset everything.
1224        */
1225       m_num_rows     = -1;
1226       m_row_number   = -1;
1227       m_field_number = -1;
1228
1229       sql_free_result();
1230
1231       for (int i=0; i < 10; i++) {
1232          sql_query(query);
1233          if (m_result) {
1234             break;
1235          }
1236          bmicrosleep(5, 0);
1237       }
1238       if (!m_result) {
1239          Dmsg1(50, "Query failed: %s\n", query);
1240          goto bail_out;
1241       }
1242
1243       m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
1244       //m_status = DBI_ERROR_NONE;
1245
1246       if (m_status == DBI_ERROR_NONE) {
1247          /*
1248           * How many fields in the set?
1249           */
1250          m_num_fields = dbi_result_get_numfields(m_result);
1251          m_num_rows = dbi_result_get_numrows(m_result);
1252          m_status = (dbi_error_flag) 1;
1253       } else {
1254          Dmsg1(50, "Result status failed: %s\n", query);
1255          goto bail_out;
1256       }
1257
1258       Dmsg0(500, "sql_batch_start finishing\n");
1259       goto ok_out;
1260    case SQL_TYPE_SQLITE3:
1261       if (!sql_query("CREATE TEMPORARY TABLE batch ("
1262                              "FileIndex integer,"
1263                              "JobId integer,"
1264                              "Path blob,"
1265                              "Name blob,"
1266                              "LStat tinyblob,"
1267                              "MD5 tinyblob,"
1268                              "DeltaSeq smallint)")) {
1269          Dmsg0(500, "sql_batch_start failed\n");
1270          goto bail_out;
1271       }
1272       Dmsg0(500, "sql_batch_start finishing\n");
1273       goto ok_out;
1274    }
1275
1276 bail_out:
1277    Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
1278    m_status = (dbi_error_flag) 0;
1279    sql_free_result();
1280    m_result = NULL;
1281    retval = false;
1282
1283 ok_out:
1284    db_unlock(this);
1285    return retval;
1286 }
1287
1288 /*
1289  * Set error to something to abort operation
1290  */
1291 bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
1292 {
1293    int res = 0;
1294    int count = 30;
1295    int (*custom_function)(void*, const char*) = NULL;
1296    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1297
1298    Dmsg0(500, "sql_batch_start started\n");
1299
1300    switch (m_db_type) {
1301    case SQL_TYPE_MYSQL:
1302       m_status = (dbi_error_flag) 0;
1303       break;
1304    case SQL_TYPE_POSTGRESQL:
1305       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
1306
1307       do {
1308          res = (*custom_function)(myconn->connection, error);
1309       } while (res == 0 && --count > 0);
1310
1311       if (res == 1) {
1312          Dmsg0(500, "ok\n");
1313          m_status = (dbi_error_flag) 1;
1314       }
1315
1316       if (res <= 0) {
1317          Dmsg0(500, "we failed\n");
1318          m_status = (dbi_error_flag) 0;
1319          //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
1320        }
1321       break;
1322    case SQL_TYPE_SQLITE3:
1323       m_status = (dbi_error_flag) 0;
1324       break;
1325    }
1326
1327    Dmsg0(500, "sql_batch_start finishing\n");
1328
1329    return true;
1330 }
1331
1332 /*
1333  * This function is big and use a big switch.
1334  * In near future is better split in small functions
1335  * and refactory.
1336  */
1337 bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1338 {
1339    int res;
1340    int count=30;
1341    dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
1342    int (*custom_function)(void*, const char*, int) = NULL;
1343    char* (*custom_function_error)(void*) = NULL;
1344    size_t len;
1345    char *digest;
1346    char ed1[50];
1347
1348    Dmsg0(500, "sql_batch_start started \n");
1349
1350    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1351    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1352
1353    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1354       *digest = '\0';
1355    } else {
1356       digest = ar->Digest;
1357    }
1358
1359    switch (m_db_type) {
1360    case SQL_TYPE_MYSQL:
1361       db_escape_string(jcr, esc_name, fname, fnl);
1362       db_escape_string(jcr, esc_path, path, pnl);
1363       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1364                       "(%u,%s,'%s','%s','%s','%s',%u)",
1365                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1366                       esc_name, ar->attr, digest, ar->DeltaSeq);
1367
1368       if (!sql_query(cmd))
1369       {
1370          Dmsg0(500, "sql_batch_start failed\n");
1371          goto bail_out;
1372       }
1373
1374       Dmsg0(500, "sql_batch_start finishing\n");
1375
1376       return true;
1377       break;
1378    case SQL_TYPE_POSTGRESQL:
1379       postgresql_copy_escape(esc_name, fname, fnl);
1380       postgresql_copy_escape(esc_path, path, pnl);
1381       len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
1382                      ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
1383                      esc_name, ar->attr, digest, ar->DeltaSeq);
1384
1385       /*
1386        * libdbi don't support CopyData and we need call a postgresql
1387        * specific function to do this work
1388        */
1389       Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
1390       custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
1391       if (custom_function != NULL) {
1392          do {
1393             res = (*custom_function)(myconn->connection, cmd, len);
1394          } while (res == 0 && --count > 0);
1395
1396          if (res == 1) {
1397             Dmsg0(500, "ok\n");
1398             changes++;
1399             m_status = (dbi_error_flag) 1;
1400          }
1401
1402          if (res <= 0) {
1403             Dmsg0(500, "sql_batch_insert failed\n");
1404             goto bail_out;
1405          }
1406
1407          Dmsg0(500, "sql_batch_insert finishing\n");
1408          return true;
1409       } else {
1410          /*
1411           * Ensure to detect a PQerror
1412           */
1413          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
1414          Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1415          goto bail_out;
1416       }
1417       break;
1418    case SQL_TYPE_SQLITE3:
1419       db_escape_string(jcr, esc_name, fname, fnl);
1420       db_escape_string(jcr, esc_path, path, pnl);
1421       len = Mmsg(cmd, "INSERT INTO batch VALUES "
1422                       "(%u,%s,'%s','%s','%s','%s',%u)",
1423                       ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1424                       esc_name, ar->attr, digest, ar->DeltaSeq);
1425
1426       if (!sql_query(cmd))
1427       {
1428          Dmsg0(500, "sql_batch_insert failed\n");
1429          goto bail_out;
1430       }
1431
1432       Dmsg0(500, "sql_batch_insert finishing\n");
1433
1434       return true;
1435       break;
1436    }
1437
1438 bail_out:
1439    Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
1440    m_status = (dbi_error_flag) 0;
1441    sql_free_result();
1442    return false;
1443 }
1444
1445 /*
1446  * Initialize database data structure. In principal this should
1447  * never have errors, or it is really fatal.
1448  */
1449 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1450                        const char *db_password, const char *db_address, int db_port,
1451                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1452 {
1453    B_DB_DBI *mdb = NULL;
1454
1455    if (!db_driver) {
1456       Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
1457    }
1458
1459    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
1460       Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
1461    }
1462
1463    if (!db_user) {
1464       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
1465       return NULL;
1466    }
1467
1468    P(mutex);                          /* lock DB queue */
1469    if (db_list && !mult_db_connections) {
1470       /*
1471        * Look to see if DB already open
1472        */
1473       foreach_dlist(mdb, db_list) {
1474          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1475             Dmsg1(100, "DB REopen %s\n", db_name);
1476             mdb->increment_refcount();
1477             goto bail_out;
1478          }
1479       }
1480    }
1481    Dmsg0(100, "db_init_database first time\n");
1482    mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
1483                       db_port, db_socket, mult_db_connections, disable_batch_insert));
1484
1485 bail_out:
1486    V(mutex);
1487    return mdb;
1488 }
1489
1490 #endif /* HAVE_DBI */