]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
Merge branch 'master' into basejobv3
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id$
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else {
81       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
82    }
83 #elif HAVE_MYSQL
84    db_type = SQL_TYPE_MYSQL;
85 #elif HAVE_POSTGRESQL
86    db_type = SQL_TYPE_POSTGRESQL;
87 #elif HAVE_SQLITE
88    db_type = SQL_TYPE_SQLITE;
89 #elif HAVE_SQLITE3
90    db_type = SQL_TYPE_SQLITE3;
91 #endif
92
93    return db_init_database(jcr, db_name, db_user, db_password, db_address,
94              db_port, db_socket, mult_db_connections);
95 }
96
97 dbid_list::dbid_list()
98 {
99    memset(this, 0, sizeof(dbid_list));
100    max_ids = 1000;
101    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
102    num_ids = num_seen = tot_ids = 0;
103    PurgedFiles = NULL;
104 }
105
106 dbid_list::~dbid_list()
107 {
108    free(DBId);
109 }
110
111
112 /*
113  * Called here to retrieve an integer from the database
114  */
115 static int int_handler(void *ctx, int num_fields, char **row)
116 {
117    uint32_t *val = (uint32_t *)ctx;
118
119    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
120
121    if (row[0]) {
122       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
123       *val = str_to_int64(row[0]);
124    } else {
125       Dmsg0(800, "int_handler finds zero\n");
126       *val = 0;
127    }
128    Dmsg0(800, "int_handler finishes\n");
129    return 0;
130 }
131
132 /*
133  * Called here to retrieve a 32/64 bit integer from the database.
134  *   The returned integer will be extended to 64 bit.
135  */
136 int db_int64_handler(void *ctx, int num_fields, char **row)
137 {
138    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
139
140    if (row[0]) {
141       lctx->value = str_to_int64(row[0]);
142       lctx->count++;
143    }
144    return 0;
145 }
146
147
148
149 /* NOTE!!! The following routines expect that the
150  *  calling subroutine sets and clears the mutex
151  */
152
153 /* Check that the tables correspond to the version we want */
154 bool check_tables_version(JCR *jcr, B_DB *mdb)
155 {
156    const char *query = "SELECT VersionId FROM Version";
157
158    bacula_db_version = 0;
159    if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
160       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
161       return false;
162    }
163    if (bacula_db_version != BDB_VERSION) {
164       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
165           mdb->db_name, BDB_VERSION, bacula_db_version);
166       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
167       return false;
168    }
169    return true;
170 }
171
172 /* Utility routine for queries. The database MUST be locked before calling here. */
173 int
174 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
175 {
176    int status;
177
178    sql_free_result(mdb);
179    if ((status=sql_query(mdb, cmd)) != 0) {
180       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
181       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
182       if (verbose) {
183          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
184       }
185       return 0;
186    }
187
188    mdb->result = sql_store_result(mdb);
189
190    return mdb->result != NULL;
191 }
192
193 /*
194  * Utility routine to do inserts
195  * Returns: 0 on failure
196  *          1 on success
197  */
198 int
199 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
200 {
201    if (sql_query(mdb, cmd)) {
202       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
203       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
204       if (verbose) {
205          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
206       }
207       return 0;
208    }
209    if (mdb->have_insert_id) {
210       mdb->num_rows = sql_affected_rows(mdb);
211    } else {
212       mdb->num_rows = 1;
213    }
214    if (mdb->num_rows != 1) {
215       char ed1[30];
216       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
217          edit_uint64(mdb->num_rows, ed1));
218       if (verbose) {
219          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
220       }
221       return 0;
222    }
223    mdb->changes++;
224    return 1;
225 }
226
227 /* Utility routine for updates.
228  *  Returns: 0 on failure
229  *           1 on success
230  */
231 int
232 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
233 {
234
235    if (sql_query(mdb, cmd)) {
236       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
237       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
238       if (verbose) {
239          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
240       }
241       return 0;
242    }
243    mdb->num_rows = sql_affected_rows(mdb);
244    if (mdb->num_rows < 1) {
245       char ed1[30];
246       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
247          edit_uint64(mdb->num_rows, ed1), cmd);
248       if (verbose) {
249 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
250       }
251       return 0;
252    }
253    mdb->changes++;
254    return 1;
255 }
256
257 /* Utility routine for deletes
258  *
259  * Returns: -1 on error
260  *           n number of rows affected
261  */
262 int
263 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
264 {
265
266    if (sql_query(mdb, cmd)) {
267       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
268       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
269       if (verbose) {
270          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
271       }
272       return -1;
273    }
274    mdb->changes++;
275    return sql_affected_rows(mdb);
276 }
277
278
279 /*
280  * Get record max. Query is already in mdb->cmd
281  *  No locking done
282  *
283  * Returns: -1 on failure
284  *          count on success
285  */
286 int get_sql_record_max(JCR *jcr, B_DB *mdb)
287 {
288    SQL_ROW row;
289    int stat = 0;
290
291    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
292       if ((row = sql_fetch_row(mdb)) == NULL) {
293          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
294          stat = -1;
295       } else {
296          stat = str_to_int64(row[0]);
297       }
298       sql_free_result(mdb);
299    } else {
300       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
301       stat = -1;
302    }
303    return stat;
304 }
305
306 /*
307  * Return pre-edited error message
308  */
309 char *db_strerror(B_DB *mdb)
310 {
311    return mdb->errmsg;
312 }
313
314 /*
315  * Lock database, this can be called multiple times by the same
316  *   thread without blocking, but must be unlocked the number of
317  *   times it was locked.
318  */
319 void _db_lock(const char *file, int line, B_DB *mdb)
320 {
321    int errstat;
322    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
323       berrno be;
324       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
325            errstat, be.bstrerror(errstat));
326    }
327 }
328
329 /*
330  * Unlock the database. This can be called multiple times by the
331  *   same thread up to the number of times that thread called
332  *   db_lock()/
333  */
334 void _db_unlock(const char *file, int line, B_DB *mdb)
335 {
336    int errstat;
337    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
338       berrno be;
339       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
340            errstat, be.bstrerror(errstat));
341    }
342 }
343
344 /*
345  * Start a transaction. This groups inserts and makes things
346  *  much more efficient. Usually started when inserting
347  *  file attributes.
348  */
349 void db_start_transaction(JCR *jcr, B_DB *mdb)
350 {
351    if (!jcr->attr) {
352       jcr->attr = get_pool_memory(PM_FNAME);
353    }
354    if (!jcr->ar) {
355       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
356    }
357
358 #ifdef HAVE_SQLITE
359    if (!mdb->allow_transactions) {
360       return;
361    }
362    db_lock(mdb);
363    /* Allow only 10,000 changes per transaction */
364    if (mdb->transaction && mdb->changes > 10000) {
365       db_end_transaction(jcr, mdb);
366    }
367    if (!mdb->transaction) {
368       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
369       Dmsg0(400, "Start SQLite transaction\n");
370       mdb->transaction = 1;
371    }
372    db_unlock(mdb);
373 #endif
374
375 /*
376  * This is turned off because transactions break
377  * if multiple simultaneous jobs are run.
378  */
379 #ifdef HAVE_POSTGRESQL
380    if (!mdb->allow_transactions) {
381       return;
382    }
383    db_lock(mdb);
384    /* Allow only 25,000 changes per transaction */
385    if (mdb->transaction && mdb->changes > 25000) {
386       db_end_transaction(jcr, mdb);
387    }
388    if (!mdb->transaction) {
389       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
390       Dmsg0(400, "Start PosgreSQL transaction\n");
391       mdb->transaction = 1;
392    }
393    db_unlock(mdb);
394 #endif
395
396 #ifdef HAVE_DBI
397    if (db_type == SQL_TYPE_SQLITE) {
398       if (!mdb->allow_transactions) {
399          return;
400       }
401       db_lock(mdb);
402       /* Allow only 10,000 changes per transaction */
403       if (mdb->transaction && mdb->changes > 10000) {
404          db_end_transaction(jcr, mdb);
405       }
406       if (!mdb->transaction) {
407          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
408          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
409          Dmsg0(400, "Start SQLite transaction\n");
410          mdb->transaction = 1;
411       }
412       db_unlock(mdb);
413    } else if (db_type == SQL_TYPE_POSTGRESQL) {
414       if (!mdb->allow_transactions) {
415          return;
416       }
417       db_lock(mdb);
418       /* Allow only 25,000 changes per transaction */
419       if (mdb->transaction && mdb->changes > 25000) {
420          db_end_transaction(jcr, mdb);
421       }
422       if (!mdb->transaction) {
423          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
424          Dmsg0(400, "Start PosgreSQL transaction\n");
425          mdb->transaction = 1;
426       }
427       db_unlock(mdb);
428    }
429 #endif
430 }
431
432 void db_end_transaction(JCR *jcr, B_DB *mdb)
433 {
434    /*
435     * This can be called during thread cleanup and
436     *   the db may already be closed.  So simply return.
437     */
438    if (!mdb) {
439       return;
440    }
441
442    if (jcr && jcr->cached_attribute) {
443       Dmsg0(400, "Flush last cached attribute.\n");
444       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
445          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
446       }
447       jcr->cached_attribute = false;
448    }
449
450 #ifdef HAVE_SQLITE
451    if (!mdb->allow_transactions) {
452       return;
453    }
454    db_lock(mdb);
455    if (mdb->transaction) {
456       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
457       mdb->transaction = 0;
458       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
459    }
460    mdb->changes = 0;
461    db_unlock(mdb);
462 #endif
463
464 #ifdef HAVE_POSTGRESQL
465    if (!mdb->allow_transactions) {
466       return;
467    }
468    db_lock(mdb);
469    if (mdb->transaction) {
470       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
471       mdb->transaction = 0;
472       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
473    }
474    mdb->changes = 0;
475    db_unlock(mdb);
476 #endif
477
478 #ifdef HAVE_DBI
479    if (db_type == SQL_TYPE_SQLITE) {
480       if (!mdb->allow_transactions) {
481          return;
482       }
483       db_lock(mdb);
484       if (mdb->transaction) {
485          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
486          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
487          mdb->transaction = 0;
488          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
489       }
490       mdb->changes = 0;
491       db_unlock(mdb);
492    } else if (db_type == SQL_TYPE_POSTGRESQL) {
493       if (!mdb->allow_transactions) {
494          return;
495       }
496       db_lock(mdb);
497       if (mdb->transaction) {
498          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
499          mdb->transaction = 0;
500          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
501       }
502       mdb->changes = 0;
503       db_unlock(mdb);
504    }
505 #endif
506 }
507
508 /*
509  * Given a full filename, split it into its path
510  *  and filename parts. They are returned in pool memory
511  *  in the mdb structure.
512  */
513 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
514 {
515    const char *p, *f;
516
517    /* Find path without the filename.
518     * I.e. everything after the last / is a "filename".
519     * OK, maybe it is a directory name, but we treat it like
520     * a filename. If we don't find a / then the whole name
521     * must be a path name (e.g. c:).
522     */
523    for (p=f=fname; *p; p++) {
524       if (IsPathSeparator(*p)) {
525          f = p;                       /* set pos of last slash */
526       }
527    }
528    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
529       f++;                            /* yes, point to filename */
530    } else {                           /* no, whole thing must be path name */
531       f = p;
532    }
533
534    /* If filename doesn't exist (i.e. root directory), we
535     * simply create a blank name consisting of a single
536     * space. This makes handling zero length filenames
537     * easier.
538     */
539    mdb->fnl = p - f;
540    if (mdb->fnl > 0) {
541       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
542       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
543       mdb->fname[mdb->fnl] = 0;
544    } else {
545       mdb->fname[0] = 0;
546       mdb->fnl = 0;
547    }
548
549    mdb->pnl = f - fname;
550    if (mdb->pnl > 0) {
551       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
552       memcpy(mdb->path, fname, mdb->pnl);
553       mdb->path[mdb->pnl] = 0;
554    } else {
555       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
556       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
557       mdb->path[0] = 0;
558       mdb->pnl = 0;
559    }
560
561    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
562 }
563
564 /*
565  * Set maximum field length to something reasonable
566  */
567 static int max_length(int max_length)
568 {
569    int max_len = max_length;
570    /* Sanity check */
571    if (max_len < 0) {
572       max_len = 2;
573    } else if (max_len > 100) {
574       max_len = 100;
575    }
576    return max_len;
577 }
578
579 /*
580  * List dashes as part of header for listing SQL results in a table
581  */
582 void
583 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
584 {
585    SQL_FIELD  *field;
586    int i, j;
587    int len;
588
589    sql_field_seek(mdb, 0);
590    send(ctx, "+");
591    for (i = 0; i < sql_num_fields(mdb); i++) {
592       field = sql_fetch_field(mdb);
593       if (!field) {
594          break;
595       }
596       len = max_length(field->max_length + 2);
597       for (j = 0; j < len; j++) {
598          send(ctx, "-");
599       }
600       send(ctx, "+");
601    }
602    send(ctx, "\n");
603 }
604
605 /*
606  * If full_list is set, we list vertically, otherwise, we
607  * list on one line horizontally.
608  */
609 void
610 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
611 {
612    SQL_FIELD *field;
613    SQL_ROW row;
614    int i, col_len, max_len = 0;
615    char buf[2000], ewc[30];
616
617    Dmsg0(800, "list_result starts\n");
618    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
619       send(ctx, _("No results to list.\n"));
620       return;
621    }
622
623    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
624    /* determine column display widths */
625    sql_field_seek(mdb, 0);
626    for (i = 0; i < sql_num_fields(mdb); i++) {
627       Dmsg1(800, "list_result processing field %d\n", i);
628       field = sql_fetch_field(mdb);
629       if (!field) {
630          break;
631       }
632       col_len = cstrlen(field->name);
633       if (type == VERT_LIST) {
634          if (col_len > max_len) {
635             max_len = col_len;
636          }
637       } else {
638          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
639             field->max_length += (field->max_length - 1) / 3;
640          }
641          if (col_len < (int)field->max_length) {
642             col_len = field->max_length;
643          }
644          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
645             col_len = 4;                 /* 4 = length of the word "NULL" */
646          }
647          field->max_length = col_len;    /* reset column info */
648       }
649    }
650
651    Dmsg0(800, "list_result finished first loop\n");
652    if (type == VERT_LIST) {
653       goto vertical_list;
654    }
655
656    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
657    list_dashes(mdb, send, ctx);
658    send(ctx, "|");
659    sql_field_seek(mdb, 0);
660    for (i = 0; i < sql_num_fields(mdb); i++) {
661       Dmsg1(800, "list_result looking at field %d\n", i);
662       field = sql_fetch_field(mdb);
663       if (!field) {
664          break;
665       }
666       max_len = max_length(field->max_length);
667       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
668       send(ctx, buf);
669    }
670    send(ctx, "\n");
671    list_dashes(mdb, send, ctx);
672
673    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
674    while ((row = sql_fetch_row(mdb)) != NULL) {
675       sql_field_seek(mdb, 0);
676       send(ctx, "|");
677       for (i = 0; i < sql_num_fields(mdb); i++) {
678          field = sql_fetch_field(mdb);
679          if (!field) {
680             break;
681          }
682          max_len = max_length(field->max_length);
683          if (row[i] == NULL) {
684             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
685          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
686             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
687                       add_commas(row[i], ewc));
688          } else {
689             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
690          }
691          send(ctx, buf);
692       }
693       send(ctx, "\n");
694    }
695    list_dashes(mdb, send, ctx);
696    return;
697
698 vertical_list:
699
700    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
701    while ((row = sql_fetch_row(mdb)) != NULL) {
702       sql_field_seek(mdb, 0);
703       for (i = 0; i < sql_num_fields(mdb); i++) {
704          field = sql_fetch_field(mdb);
705          if (!field) {
706             break;
707          }
708          if (row[i] == NULL) {
709             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
710          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
711             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
712                 add_commas(row[i], ewc));
713          } else {
714             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
715          }
716          send(ctx, buf);
717       }
718       send(ctx, "\n");
719    }
720    return;
721 }
722
723 /* 
724  * Open a new connexion to mdb catalog. This function is used
725  * by batch and accurate mode.
726  */
727 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
728 {
729    int multi_db=false;
730
731 #ifdef HAVE_BATCH_FILE_INSERT
732    multi_db=true;               /* we force a new connexion only if batch insert is enabled */
733 #endif
734
735    if (!jcr->db_batch) {
736       jcr->db_batch = db_init_database(jcr, 
737                                       mdb->db_name, 
738                                       mdb->db_user,
739                                       mdb->db_password, 
740                                       mdb->db_address,
741                                       mdb->db_port,
742                                       mdb->db_socket,
743                                       multi_db /* multi_db = true when using batch mode */);
744       if (!jcr->db_batch) {
745          Jmsg0(jcr, M_FATAL, 0, "Could not init batch connexion");
746          return false;
747       }
748
749       if (!db_open_database(jcr, jcr->db_batch)) {
750          Mmsg2(&jcr->db_batch->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
751               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
752          Jmsg1(jcr, M_FATAL, 0, "%s", jcr->db_batch->errmsg);
753          return false;
754       }      
755       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
756             jcr->db_batch->connected, jcr->db_batch->db);
757
758    }
759    return true;
760 }
761
762 /*
763  * !!! WARNING !!! Use this function only when bacula is stopped.
764  * ie, after a fatal signal and before exiting the program
765  * Print information about a B_DB object.
766  */
767 void _dbg_print_db(JCR *jcr, FILE *fp)
768 {
769    B_DB *mdb = jcr->db;
770
771    if (!mdb) {
772       return;
773    }
774
775    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
776            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
777    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
778    if (mdb->lock.valid == RWLOCK_VALID) { 
779       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
780    }
781 }
782
783 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/