]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
4327955ce77bad370b02e210f2259c994e50d3f3
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else {
81       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
82    }
83 #elif HAVE_MYSQL
84    db_type = SQL_TYPE_MYSQL;
85 #elif HAVE_POSTGRESQL
86    db_type = SQL_TYPE_POSTGRESQL;
87 #elif HAVE_INGRES
88    db_type = SQL_TYPE_INGRES;
89 #elif HAVE_SQLITE
90    db_type = SQL_TYPE_SQLITE;
91 #elif HAVE_SQLITE3
92    db_type = SQL_TYPE_SQLITE3;
93 #endif
94
95    return db_init_database(jcr, db_name, db_user, db_password, db_address,
96              db_port, db_socket, mult_db_connections);
97 }
98
99 dbid_list::dbid_list()
100 {
101    memset(this, 0, sizeof(dbid_list));
102    max_ids = 1000;
103    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
104    num_ids = num_seen = tot_ids = 0;
105    PurgedFiles = NULL;
106 }
107
108 dbid_list::~dbid_list()
109 {
110    free(DBId);
111 }
112
113 /*
114  * Called here to retrieve an integer from the database
115  */
116 int db_int_handler(void *ctx, int num_fields, char **row)
117 {
118    uint32_t *val = (uint32_t *)ctx;
119
120    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
121
122    if (row[0]) {
123       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
124       *val = str_to_int64(row[0]);
125    } else {
126       Dmsg0(800, "int_handler finds zero\n");
127       *val = 0;
128    }
129    Dmsg0(800, "int_handler finishes\n");
130    return 0;
131 }
132
133 /*
134  * Called here to retrieve a 32/64 bit integer from the database.
135  *   The returned integer will be extended to 64 bit.
136  */
137 int db_int64_handler(void *ctx, int num_fields, char **row)
138 {
139    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
140
141    if (row[0]) {
142       lctx->value = str_to_int64(row[0]);
143       lctx->count++;
144    }
145    return 0;
146 }
147
148 /*
149  * Use to build a comma separated list of values from a query. "10,20,30"
150  */
151 int db_list_handler(void *ctx, int num_fields, char **row)
152 {
153    db_list_ctx *lctx = (db_list_ctx *)ctx;
154    if (num_fields == 1 && row[0]) {
155       if (lctx->list[0]) {
156          pm_strcat(lctx->list, ",");
157       }
158       pm_strcat(lctx->list, row[0]);
159       lctx->count++;
160    }
161    return 0;
162 }
163
164
165 /*
166  * Called here to retrieve an integer from the database
167  */
168 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
169 {
170    uint32_t *val = (uint32_t *)ctx;
171    uint32_t index = sql_get_max_connections_index[db_type];
172    if (row[index]) {
173       *val = str_to_int64(row[index]);
174    } else {
175       Dmsg0(800, "int_handler finds zero\n");
176       *val = 0;
177    }
178    return 0;
179 }
180
181 /* 
182  * Check catalog max_connections setting
183  */
184 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
185 {
186    uint32_t max_conn=0;
187    int ret=true;
188
189    /* Without Batch insert, no need to verify max_connections */
190 #ifndef HAVE_BATCH_FILE_INSERT
191    return ret;
192 #endif
193
194    /* Check max_connections setting */
195    if (!db_sql_query(mdb, sql_get_max_connections[db_type], 
196                      db_max_connections_handler, &max_conn)) {
197       Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
198       return ret;
199    }
200    if (max_conn && max_concurrent_jobs && max_concurrent_jobs > max_conn) {
201       Mmsg(mdb->errmsg, 
202            _("On db_name=%s, %s max_connections=%d is lower than Director "
203              "MaxConcurentJobs=%d\n"),
204            mdb->db_name, db_get_type(), max_conn, max_concurrent_jobs);
205       Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
206       ret = false;
207    }
208
209    return ret;
210 }
211
212 /* NOTE!!! The following routines expect that the
213  *  calling subroutine sets and clears the mutex
214  */
215
216 /* Check that the tables correspond to the version we want */
217 bool check_tables_version(JCR *jcr, B_DB *mdb)
218 {
219    const char *query = "SELECT VersionId FROM Version";
220
221    bacula_db_version = 0;
222    if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
223       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
224       return false;
225    }
226    if (bacula_db_version != BDB_VERSION) {
227       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
228           mdb->db_name, BDB_VERSION, bacula_db_version);
229       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
230       return false;
231    }
232    return true;
233 }
234
235 /* Utility routine for queries. The database MUST be locked before calling here. */
236 int
237 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
238 {
239    int status;
240
241    sql_free_result(mdb);
242    if ((status=sql_query(mdb, cmd)) != 0) {
243       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
244       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
245       if (verbose) {
246          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
247       }
248       return 0;
249    }
250
251    mdb->result = sql_store_result(mdb);
252
253    return mdb->result != NULL;
254 }
255
256 /*
257  * Utility routine to do inserts
258  * Returns: 0 on failure
259  *          1 on success
260  */
261 int
262 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
263 {
264    if (sql_query(mdb, cmd)) {
265       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
266       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
267       if (verbose) {
268          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
269       }
270       return 0;
271    }
272    if (mdb->have_insert_id) {
273       mdb->num_rows = sql_affected_rows(mdb);
274    } else {
275       mdb->num_rows = 1;
276    }
277    if (mdb->num_rows != 1) {
278       char ed1[30];
279       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
280          edit_uint64(mdb->num_rows, ed1));
281       if (verbose) {
282          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
283       }
284       return 0;
285    }
286    mdb->changes++;
287    return 1;
288 }
289
290 /* Utility routine for updates.
291  *  Returns: 0 on failure
292  *           1 on success
293  */
294 int
295 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
296 {
297
298    if (sql_query(mdb, cmd)) {
299       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
300       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
301       if (verbose) {
302          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
303       }
304       return 0;
305    }
306    mdb->num_rows = sql_affected_rows(mdb);
307    if (mdb->num_rows < 1) {
308       char ed1[30];
309       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
310          edit_uint64(mdb->num_rows, ed1), cmd);
311       if (verbose) {
312 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
313       }
314       return 0;
315    }
316    mdb->changes++;
317    return 1;
318 }
319
320 /* Utility routine for deletes
321  *
322  * Returns: -1 on error
323  *           n number of rows affected
324  */
325 int
326 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
327 {
328
329    if (sql_query(mdb, cmd)) {
330       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
331       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
332       if (verbose) {
333          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
334       }
335       return -1;
336    }
337    mdb->changes++;
338    return sql_affected_rows(mdb);
339 }
340
341
342 /*
343  * Get record max. Query is already in mdb->cmd
344  *  No locking done
345  *
346  * Returns: -1 on failure
347  *          count on success
348  */
349 int get_sql_record_max(JCR *jcr, B_DB *mdb)
350 {
351    SQL_ROW row;
352    int stat = 0;
353
354    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
355       if ((row = sql_fetch_row(mdb)) == NULL) {
356          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
357          stat = -1;
358       } else {
359          stat = str_to_int64(row[0]);
360       }
361       sql_free_result(mdb);
362    } else {
363       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
364       stat = -1;
365    }
366    return stat;
367 }
368
369 /*
370  * Return pre-edited error message
371  */
372 char *db_strerror(B_DB *mdb)
373 {
374    return mdb->errmsg;
375 }
376
377 /*
378  * Lock database, this can be called multiple times by the same
379  *   thread without blocking, but must be unlocked the number of
380  *   times it was locked.
381  */
382 void _db_lock(const char *file, int line, B_DB *mdb)
383 {
384    int errstat;
385    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
386       berrno be;
387       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
388            errstat, be.bstrerror(errstat));
389    }
390 }
391
392 /*
393  * Unlock the database. This can be called multiple times by the
394  *   same thread up to the number of times that thread called
395  *   db_lock()/
396  */
397 void _db_unlock(const char *file, int line, B_DB *mdb)
398 {
399    int errstat;
400    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
401       berrno be;
402       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
403            errstat, be.bstrerror(errstat));
404    }
405 }
406
407 /*
408  * Start a transaction. This groups inserts and makes things
409  *  much more efficient. Usually started when inserting
410  *  file attributes.
411  */
412 void db_start_transaction(JCR *jcr, B_DB *mdb)
413 {
414    if (!jcr->attr) {
415       jcr->attr = get_pool_memory(PM_FNAME);
416    }
417    if (!jcr->ar) {
418       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
419    }
420
421 #ifdef HAVE_SQLITE
422    if (!mdb->allow_transactions) {
423       return;
424    }
425    db_lock(mdb);
426    /* Allow only 10,000 changes per transaction */
427    if (mdb->transaction && mdb->changes > 10000) {
428       db_end_transaction(jcr, mdb);
429    }
430    if (!mdb->transaction) {
431       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
432       Dmsg0(400, "Start SQLite transaction\n");
433       mdb->transaction = 1;
434    }
435    db_unlock(mdb);
436 #endif
437
438 /*
439  * This is turned off because transactions break
440  * if multiple simultaneous jobs are run.
441  */
442 #ifdef HAVE_POSTGRESQL
443    if (!mdb->allow_transactions) {
444       return;
445    }
446    db_lock(mdb);
447    /* Allow only 25,000 changes per transaction */
448    if (mdb->transaction && mdb->changes > 25000) {
449       db_end_transaction(jcr, mdb);
450    }
451    if (!mdb->transaction) {
452       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
453       Dmsg0(400, "Start PosgreSQL transaction\n");
454       mdb->transaction = 1;
455    }
456    db_unlock(mdb);
457 #endif
458
459 #ifdef HAVE_INGRES
460    if (!mdb->allow_transactions) {
461       return;
462    }
463    db_lock(mdb);
464    /* Allow only 25,000 changes per transaction */
465    if (mdb->transaction && mdb->changes > 25000) {
466       db_end_transaction(jcr, mdb);
467    }
468    if (!mdb->transaction) {
469       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
470       Dmsg0(400, "Start Ingres transaction\n");
471       mdb->transaction = 1;
472    }
473    db_unlock(mdb);
474 #endif
475
476 #ifdef HAVE_DBI
477    if (db_type == SQL_TYPE_SQLITE) {
478       if (!mdb->allow_transactions) {
479          return;
480       }
481       db_lock(mdb);
482       /* Allow only 10,000 changes per transaction */
483       if (mdb->transaction && mdb->changes > 10000) {
484          db_end_transaction(jcr, mdb);
485       }
486       if (!mdb->transaction) {
487          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
488          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
489          Dmsg0(400, "Start SQLite transaction\n");
490          mdb->transaction = 1;
491       }
492       db_unlock(mdb);
493    } else if (db_type == SQL_TYPE_POSTGRESQL) {
494       if (!mdb->allow_transactions) {
495          return;
496       }
497       db_lock(mdb);
498       /* Allow only 25,000 changes per transaction */
499       if (mdb->transaction && mdb->changes > 25000) {
500          db_end_transaction(jcr, mdb);
501       }
502       if (!mdb->transaction) {
503          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
504          Dmsg0(400, "Start PosgreSQL transaction\n");
505          mdb->transaction = 1;
506       }
507       db_unlock(mdb);
508    }
509 #endif
510 }
511
512 void db_end_transaction(JCR *jcr, B_DB *mdb)
513 {
514    /*
515     * This can be called during thread cleanup and
516     *   the db may already be closed.  So simply return.
517     */
518    if (!mdb) {
519       return;
520    }
521
522    if (jcr && jcr->cached_attribute) {
523       Dmsg0(400, "Flush last cached attribute.\n");
524       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
525          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
526       }
527       jcr->cached_attribute = false;
528    }
529
530 #ifdef HAVE_SQLITE
531    if (!mdb->allow_transactions) {
532       return;
533    }
534    db_lock(mdb);
535    if (mdb->transaction) {
536       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
537       mdb->transaction = 0;
538       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
539    }
540    mdb->changes = 0;
541    db_unlock(mdb);
542 #endif
543
544
545
546 #ifdef HAVE_INGRES
547    if (!mdb->allow_transactions) {
548       return;
549    }
550    db_lock(mdb);
551    if (mdb->transaction) {
552       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
553       mdb->transaction = 0;
554       Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
555    }
556    mdb->changes = 0;
557    db_unlock(mdb);
558 #endif
559
560
561 #ifdef HAVE_POSTGRESQL
562    if (!mdb->allow_transactions) {
563       return;
564    }
565    db_lock(mdb);
566    if (mdb->transaction) {
567       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
568       mdb->transaction = 0;
569       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
570    }
571    mdb->changes = 0;
572    db_unlock(mdb);
573 #endif
574
575 #ifdef HAVE_DBI
576    if (db_type == SQL_TYPE_SQLITE) {
577       if (!mdb->allow_transactions) {
578          return;
579       }
580       db_lock(mdb);
581       if (mdb->transaction) {
582          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
583          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
584          mdb->transaction = 0;
585          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
586       }
587       mdb->changes = 0;
588       db_unlock(mdb);
589    } else if (db_type == SQL_TYPE_POSTGRESQL) {
590       if (!mdb->allow_transactions) {
591          return;
592       }
593       db_lock(mdb);
594       if (mdb->transaction) {
595          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
596          mdb->transaction = 0;
597          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
598       }
599       mdb->changes = 0;
600       db_unlock(mdb);
601    }
602 #endif
603 }
604
605 /*
606  * Given a full filename, split it into its path
607  *  and filename parts. They are returned in pool memory
608  *  in the mdb structure.
609  */
610 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
611 {
612    const char *p, *f;
613
614    /* Find path without the filename.
615     * I.e. everything after the last / is a "filename".
616     * OK, maybe it is a directory name, but we treat it like
617     * a filename. If we don't find a / then the whole name
618     * must be a path name (e.g. c:).
619     */
620    for (p=f=fname; *p; p++) {
621       if (IsPathSeparator(*p)) {
622          f = p;                       /* set pos of last slash */
623       }
624    }
625    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
626       f++;                            /* yes, point to filename */
627    } else {                           /* no, whole thing must be path name */
628       f = p;
629    }
630
631    /* If filename doesn't exist (i.e. root directory), we
632     * simply create a blank name consisting of a single
633     * space. This makes handling zero length filenames
634     * easier.
635     */
636    mdb->fnl = p - f;
637    if (mdb->fnl > 0) {
638       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
639       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
640       mdb->fname[mdb->fnl] = 0;
641    } else {
642       mdb->fname[0] = 0;
643       mdb->fnl = 0;
644    }
645
646    mdb->pnl = f - fname;
647    if (mdb->pnl > 0) {
648       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
649       memcpy(mdb->path, fname, mdb->pnl);
650       mdb->path[mdb->pnl] = 0;
651    } else {
652       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
653       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
654       mdb->path[0] = 0;
655       mdb->pnl = 0;
656    }
657
658    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
659 }
660
661 /*
662  * Set maximum field length to something reasonable
663  */
664 static int max_length(int max_length)
665 {
666    int max_len = max_length;
667    /* Sanity check */
668    if (max_len < 0) {
669       max_len = 2;
670    } else if (max_len > 100) {
671       max_len = 100;
672    }
673    return max_len;
674 }
675
676 /*
677  * List dashes as part of header for listing SQL results in a table
678  */
679 void
680 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
681 {
682    SQL_FIELD  *field;
683    int i, j;
684    int len;
685
686    sql_field_seek(mdb, 0);
687    send(ctx, "+");
688    for (i = 0; i < sql_num_fields(mdb); i++) {
689       field = sql_fetch_field(mdb);
690       if (!field) {
691          break;
692       }
693       len = max_length(field->max_length + 2);
694       for (j = 0; j < len; j++) {
695          send(ctx, "-");
696       }
697       send(ctx, "+");
698    }
699    send(ctx, "\n");
700 }
701
702 /*
703  * If full_list is set, we list vertically, otherwise, we
704  * list on one line horizontally.
705  */
706 void
707 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
708 {
709    SQL_FIELD *field;
710    SQL_ROW row;
711    int i, col_len, max_len = 0;
712    char buf[2000], ewc[30];
713
714    Dmsg0(800, "list_result starts\n");
715    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
716       send(ctx, _("No results to list.\n"));
717       return;
718    }
719
720    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
721    /* determine column display widths */
722    sql_field_seek(mdb, 0);
723    for (i = 0; i < sql_num_fields(mdb); i++) {
724       Dmsg1(800, "list_result processing field %d\n", i);
725       field = sql_fetch_field(mdb);
726       if (!field) {
727          break;
728       }
729       col_len = cstrlen(field->name);
730       if (type == VERT_LIST) {
731          if (col_len > max_len) {
732             max_len = col_len;
733          }
734       } else {
735          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
736             field->max_length += (field->max_length - 1) / 3;
737          }
738          if (col_len < (int)field->max_length) {
739             col_len = field->max_length;
740          }
741          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
742             col_len = 4;                 /* 4 = length of the word "NULL" */
743          }
744          field->max_length = col_len;    /* reset column info */
745       }
746    }
747
748    Dmsg0(800, "list_result finished first loop\n");
749    if (type == VERT_LIST) {
750       goto vertical_list;
751    }
752
753    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
754    list_dashes(mdb, send, ctx);
755    send(ctx, "|");
756    sql_field_seek(mdb, 0);
757    for (i = 0; i < sql_num_fields(mdb); i++) {
758       Dmsg1(800, "list_result looking at field %d\n", i);
759       field = sql_fetch_field(mdb);
760       if (!field) {
761          break;
762       }
763       max_len = max_length(field->max_length);
764       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
765       send(ctx, buf);
766    }
767    send(ctx, "\n");
768    list_dashes(mdb, send, ctx);
769
770    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
771    while ((row = sql_fetch_row(mdb)) != NULL) {
772       sql_field_seek(mdb, 0);
773       send(ctx, "|");
774       for (i = 0; i < sql_num_fields(mdb); i++) {
775          field = sql_fetch_field(mdb);
776          if (!field) {
777             break;
778          }
779          max_len = max_length(field->max_length);
780          if (row[i] == NULL) {
781             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
782          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
783             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
784                       add_commas(row[i], ewc));
785          } else {
786             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
787          }
788          send(ctx, buf);
789       }
790       send(ctx, "\n");
791    }
792    list_dashes(mdb, send, ctx);
793    return;
794
795 vertical_list:
796
797    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
798    while ((row = sql_fetch_row(mdb)) != NULL) {
799       sql_field_seek(mdb, 0);
800       for (i = 0; i < sql_num_fields(mdb); i++) {
801          field = sql_fetch_field(mdb);
802          if (!field) {
803             break;
804          }
805          if (row[i] == NULL) {
806             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
807          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
808             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
809                 add_commas(row[i], ewc));
810          } else {
811             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
812          }
813          send(ctx, buf);
814       }
815       send(ctx, "\n");
816    }
817    return;
818 }
819
820 /* 
821  * Open a new connexion to mdb catalog. This function is used
822  * by batch and accurate mode.
823  */
824 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
825 {
826 #ifdef HAVE_BATCH_FILE_INSERT
827    const int multi_db = true;   /* we force a new connection only if batch insert is enabled */
828 #else
829    const int multi_db = false;
830 #endif
831
832    if (!jcr->db_batch) {
833       jcr->db_batch = db_init_database(jcr, 
834                                       mdb->db_name, 
835                                       mdb->db_user,
836                                       mdb->db_password, 
837                                       mdb->db_address,
838                                       mdb->db_port,
839                                       mdb->db_socket,
840                                       multi_db /* multi_db = true when using batch mode */);
841       if (!jcr->db_batch) {
842          Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
843          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
844          return false;
845       }
846
847       if (!db_open_database(jcr, jcr->db_batch)) {
848          Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
849               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
850          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
851          return false;
852       }      
853       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
854             jcr->db_batch->connected, jcr->db_batch->db);
855
856    }
857    return true;
858 }
859
860 /*
861  * !!! WARNING !!! Use this function only when bacula is stopped.
862  * ie, after a fatal signal and before exiting the program
863  * Print information about a B_DB object.
864  */
865 void db_debug_print(JCR *jcr, FILE *fp)
866 {
867    B_DB *mdb = jcr->db;
868
869    if (!mdb) {
870       return;
871    }
872
873    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
874            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
875    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
876    if (mdb->lock.valid == RWLOCK_VALID) { 
877       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
878    }
879 }
880
881 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES*/