]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
Merge branch 'master' into basejobv3
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id$
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else {
81       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
82    }
83 #elif HAVE_MYSQL
84    db_type = SQL_TYPE_MYSQL;
85 #elif HAVE_POSTGRESQL
86    db_type = SQL_TYPE_POSTGRESQL;
87 #elif HAVE_SQLITE
88    db_type = SQL_TYPE_SQLITE;
89 #elif HAVE_SQLITE3
90    db_type = SQL_TYPE_SQLITE3;
91 #endif
92
93    return db_init_database(jcr, db_name, db_user, db_password, db_address,
94              db_port, db_socket, mult_db_connections);
95 }
96
97 dbid_list::dbid_list()
98 {
99    memset(this, 0, sizeof(dbid_list));
100    max_ids = 1000;
101    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
102    num_ids = num_seen = tot_ids = 0;
103    PurgedFiles = NULL;
104 }
105
106 dbid_list::~dbid_list()
107 {
108    free(DBId);
109 }
110
111
112 /*
113  * Called here to retrieve an integer from the database
114  */
115 int db_int_handler(void *ctx, int num_fields, char **row)
116 {
117    uint32_t *val = (uint32_t *)ctx;
118
119    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
120
121    if (row[0]) {
122       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
123       *val = str_to_int64(row[0]);
124    } else {
125       Dmsg0(800, "int_handler finds zero\n");
126       *val = 0;
127    }
128    Dmsg0(800, "int_handler finishes\n");
129    return 0;
130 }
131
132 /*
133  * Called here to retrieve a 32/64 bit integer from the database.
134  *   The returned integer will be extended to 64 bit.
135  */
136 int db_int64_handler(void *ctx, int num_fields, char **row)
137 {
138    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
139
140    if (row[0]) {
141       lctx->value = str_to_int64(row[0]);
142       lctx->count++;
143    }
144    return 0;
145 }
146
147 /*
148  * Use to build a comma separated list of values from a query. "10,20,30"
149  */
150 int db_list_handler(void *ctx, int num_fields, char **row)
151 {
152    db_list_ctx *lctx = (db_list_ctx *)ctx;
153    if (num_fields == 1 && row[0]) {
154       if (lctx->list[0]) {
155          pm_strcat(lctx->list, ",");
156       }
157       pm_strcat(lctx->list, row[0]);
158       lctx->count++;
159    }
160    return 0;
161 }
162
163 /* NOTE!!! The following routines expect that the
164  *  calling subroutine sets and clears the mutex
165  */
166
167 /* Check that the tables correspond to the version we want */
168 bool check_tables_version(JCR *jcr, B_DB *mdb)
169 {
170    const char *query = "SELECT VersionId FROM Version";
171
172    bacula_db_version = 0;
173    if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
174       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
175       return false;
176    }
177    if (bacula_db_version != BDB_VERSION) {
178       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
179           mdb->db_name, BDB_VERSION, bacula_db_version);
180       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
181       return false;
182    }
183    return true;
184 }
185
186 /* Utility routine for queries. The database MUST be locked before calling here. */
187 int
188 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
189 {
190    int status;
191
192    sql_free_result(mdb);
193    if ((status=sql_query(mdb, cmd)) != 0) {
194       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
195       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
196       if (verbose) {
197          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
198       }
199       return 0;
200    }
201
202    mdb->result = sql_store_result(mdb);
203
204    return mdb->result != NULL;
205 }
206
207 /*
208  * Utility routine to do inserts
209  * Returns: 0 on failure
210  *          1 on success
211  */
212 int
213 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
214 {
215    if (sql_query(mdb, cmd)) {
216       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
217       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
218       if (verbose) {
219          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
220       }
221       return 0;
222    }
223    if (mdb->have_insert_id) {
224       mdb->num_rows = sql_affected_rows(mdb);
225    } else {
226       mdb->num_rows = 1;
227    }
228    if (mdb->num_rows != 1) {
229       char ed1[30];
230       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
231          edit_uint64(mdb->num_rows, ed1));
232       if (verbose) {
233          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
234       }
235       return 0;
236    }
237    mdb->changes++;
238    return 1;
239 }
240
241 /* Utility routine for updates.
242  *  Returns: 0 on failure
243  *           1 on success
244  */
245 int
246 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
247 {
248
249    if (sql_query(mdb, cmd)) {
250       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
251       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
252       if (verbose) {
253          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
254       }
255       return 0;
256    }
257    mdb->num_rows = sql_affected_rows(mdb);
258    if (mdb->num_rows < 1) {
259       char ed1[30];
260       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
261          edit_uint64(mdb->num_rows, ed1), cmd);
262       if (verbose) {
263 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
264       }
265       return 0;
266    }
267    mdb->changes++;
268    return 1;
269 }
270
271 /* Utility routine for deletes
272  *
273  * Returns: -1 on error
274  *           n number of rows affected
275  */
276 int
277 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
278 {
279
280    if (sql_query(mdb, cmd)) {
281       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
282       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
283       if (verbose) {
284          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
285       }
286       return -1;
287    }
288    mdb->changes++;
289    return sql_affected_rows(mdb);
290 }
291
292
293 /*
294  * Get record max. Query is already in mdb->cmd
295  *  No locking done
296  *
297  * Returns: -1 on failure
298  *          count on success
299  */
300 int get_sql_record_max(JCR *jcr, B_DB *mdb)
301 {
302    SQL_ROW row;
303    int stat = 0;
304
305    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
306       if ((row = sql_fetch_row(mdb)) == NULL) {
307          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
308          stat = -1;
309       } else {
310          stat = str_to_int64(row[0]);
311       }
312       sql_free_result(mdb);
313    } else {
314       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
315       stat = -1;
316    }
317    return stat;
318 }
319
320 /*
321  * Return pre-edited error message
322  */
323 char *db_strerror(B_DB *mdb)
324 {
325    return mdb->errmsg;
326 }
327
328 /*
329  * Lock database, this can be called multiple times by the same
330  *   thread without blocking, but must be unlocked the number of
331  *   times it was locked.
332  */
333 void _db_lock(const char *file, int line, B_DB *mdb)
334 {
335    int errstat;
336    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
337       berrno be;
338       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
339            errstat, be.bstrerror(errstat));
340    }
341 }
342
343 /*
344  * Unlock the database. This can be called multiple times by the
345  *   same thread up to the number of times that thread called
346  *   db_lock()/
347  */
348 void _db_unlock(const char *file, int line, B_DB *mdb)
349 {
350    int errstat;
351    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
352       berrno be;
353       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
354            errstat, be.bstrerror(errstat));
355    }
356 }
357
358 /*
359  * Start a transaction. This groups inserts and makes things
360  *  much more efficient. Usually started when inserting
361  *  file attributes.
362  */
363 void db_start_transaction(JCR *jcr, B_DB *mdb)
364 {
365    if (!jcr->attr) {
366       jcr->attr = get_pool_memory(PM_FNAME);
367    }
368    if (!jcr->ar) {
369       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
370    }
371
372 #ifdef HAVE_SQLITE
373    if (!mdb->allow_transactions) {
374       return;
375    }
376    db_lock(mdb);
377    /* Allow only 10,000 changes per transaction */
378    if (mdb->transaction && mdb->changes > 10000) {
379       db_end_transaction(jcr, mdb);
380    }
381    if (!mdb->transaction) {
382       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
383       Dmsg0(400, "Start SQLite transaction\n");
384       mdb->transaction = 1;
385    }
386    db_unlock(mdb);
387 #endif
388
389 /*
390  * This is turned off because transactions break
391  * if multiple simultaneous jobs are run.
392  */
393 #ifdef HAVE_POSTGRESQL
394    if (!mdb->allow_transactions) {
395       return;
396    }
397    db_lock(mdb);
398    /* Allow only 25,000 changes per transaction */
399    if (mdb->transaction && mdb->changes > 25000) {
400       db_end_transaction(jcr, mdb);
401    }
402    if (!mdb->transaction) {
403       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
404       Dmsg0(400, "Start PosgreSQL transaction\n");
405       mdb->transaction = 1;
406    }
407    db_unlock(mdb);
408 #endif
409
410 #ifdef HAVE_DBI
411    if (db_type == SQL_TYPE_SQLITE) {
412       if (!mdb->allow_transactions) {
413          return;
414       }
415       db_lock(mdb);
416       /* Allow only 10,000 changes per transaction */
417       if (mdb->transaction && mdb->changes > 10000) {
418          db_end_transaction(jcr, mdb);
419       }
420       if (!mdb->transaction) {
421          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
422          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
423          Dmsg0(400, "Start SQLite transaction\n");
424          mdb->transaction = 1;
425       }
426       db_unlock(mdb);
427    } else if (db_type == SQL_TYPE_POSTGRESQL) {
428       if (!mdb->allow_transactions) {
429          return;
430       }
431       db_lock(mdb);
432       /* Allow only 25,000 changes per transaction */
433       if (mdb->transaction && mdb->changes > 25000) {
434          db_end_transaction(jcr, mdb);
435       }
436       if (!mdb->transaction) {
437          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
438          Dmsg0(400, "Start PosgreSQL transaction\n");
439          mdb->transaction = 1;
440       }
441       db_unlock(mdb);
442    }
443 #endif
444 }
445
446 void db_end_transaction(JCR *jcr, B_DB *mdb)
447 {
448    /*
449     * This can be called during thread cleanup and
450     *   the db may already be closed.  So simply return.
451     */
452    if (!mdb) {
453       return;
454    }
455
456    if (jcr && jcr->cached_attribute) {
457       Dmsg0(400, "Flush last cached attribute.\n");
458       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
459          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
460       }
461       jcr->cached_attribute = false;
462    }
463
464 #ifdef HAVE_SQLITE
465    if (!mdb->allow_transactions) {
466       return;
467    }
468    db_lock(mdb);
469    if (mdb->transaction) {
470       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
471       mdb->transaction = 0;
472       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
473    }
474    mdb->changes = 0;
475    db_unlock(mdb);
476 #endif
477
478 #ifdef HAVE_POSTGRESQL
479    if (!mdb->allow_transactions) {
480       return;
481    }
482    db_lock(mdb);
483    if (mdb->transaction) {
484       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
485       mdb->transaction = 0;
486       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
487    }
488    mdb->changes = 0;
489    db_unlock(mdb);
490 #endif
491
492 #ifdef HAVE_DBI
493    if (db_type == SQL_TYPE_SQLITE) {
494       if (!mdb->allow_transactions) {
495          return;
496       }
497       db_lock(mdb);
498       if (mdb->transaction) {
499          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
500          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
501          mdb->transaction = 0;
502          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
503       }
504       mdb->changes = 0;
505       db_unlock(mdb);
506    } else if (db_type == SQL_TYPE_POSTGRESQL) {
507       if (!mdb->allow_transactions) {
508          return;
509       }
510       db_lock(mdb);
511       if (mdb->transaction) {
512          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
513          mdb->transaction = 0;
514          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
515       }
516       mdb->changes = 0;
517       db_unlock(mdb);
518    }
519 #endif
520 }
521
522 /*
523  * Given a full filename, split it into its path
524  *  and filename parts. They are returned in pool memory
525  *  in the mdb structure.
526  */
527 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
528 {
529    const char *p, *f;
530
531    /* Find path without the filename.
532     * I.e. everything after the last / is a "filename".
533     * OK, maybe it is a directory name, but we treat it like
534     * a filename. If we don't find a / then the whole name
535     * must be a path name (e.g. c:).
536     */
537    for (p=f=fname; *p; p++) {
538       if (IsPathSeparator(*p)) {
539          f = p;                       /* set pos of last slash */
540       }
541    }
542    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
543       f++;                            /* yes, point to filename */
544    } else {                           /* no, whole thing must be path name */
545       f = p;
546    }
547
548    /* If filename doesn't exist (i.e. root directory), we
549     * simply create a blank name consisting of a single
550     * space. This makes handling zero length filenames
551     * easier.
552     */
553    mdb->fnl = p - f;
554    if (mdb->fnl > 0) {
555       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
556       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
557       mdb->fname[mdb->fnl] = 0;
558    } else {
559       mdb->fname[0] = 0;
560       mdb->fnl = 0;
561    }
562
563    mdb->pnl = f - fname;
564    if (mdb->pnl > 0) {
565       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
566       memcpy(mdb->path, fname, mdb->pnl);
567       mdb->path[mdb->pnl] = 0;
568    } else {
569       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
570       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
571       mdb->path[0] = 0;
572       mdb->pnl = 0;
573    }
574
575    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
576 }
577
578 /*
579  * Set maximum field length to something reasonable
580  */
581 static int max_length(int max_length)
582 {
583    int max_len = max_length;
584    /* Sanity check */
585    if (max_len < 0) {
586       max_len = 2;
587    } else if (max_len > 100) {
588       max_len = 100;
589    }
590    return max_len;
591 }
592
593 /*
594  * List dashes as part of header for listing SQL results in a table
595  */
596 void
597 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
598 {
599    SQL_FIELD  *field;
600    int i, j;
601    int len;
602
603    sql_field_seek(mdb, 0);
604    send(ctx, "+");
605    for (i = 0; i < sql_num_fields(mdb); i++) {
606       field = sql_fetch_field(mdb);
607       if (!field) {
608          break;
609       }
610       len = max_length(field->max_length + 2);
611       for (j = 0; j < len; j++) {
612          send(ctx, "-");
613       }
614       send(ctx, "+");
615    }
616    send(ctx, "\n");
617 }
618
619 /*
620  * If full_list is set, we list vertically, otherwise, we
621  * list on one line horizontally.
622  */
623 void
624 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
625 {
626    SQL_FIELD *field;
627    SQL_ROW row;
628    int i, col_len, max_len = 0;
629    char buf[2000], ewc[30];
630
631    Dmsg0(800, "list_result starts\n");
632    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
633       send(ctx, _("No results to list.\n"));
634       return;
635    }
636
637    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
638    /* determine column display widths */
639    sql_field_seek(mdb, 0);
640    for (i = 0; i < sql_num_fields(mdb); i++) {
641       Dmsg1(800, "list_result processing field %d\n", i);
642       field = sql_fetch_field(mdb);
643       if (!field) {
644          break;
645       }
646       col_len = cstrlen(field->name);
647       if (type == VERT_LIST) {
648          if (col_len > max_len) {
649             max_len = col_len;
650          }
651       } else {
652          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
653             field->max_length += (field->max_length - 1) / 3;
654          }
655          if (col_len < (int)field->max_length) {
656             col_len = field->max_length;
657          }
658          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
659             col_len = 4;                 /* 4 = length of the word "NULL" */
660          }
661          field->max_length = col_len;    /* reset column info */
662       }
663    }
664
665    Dmsg0(800, "list_result finished first loop\n");
666    if (type == VERT_LIST) {
667       goto vertical_list;
668    }
669
670    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
671    list_dashes(mdb, send, ctx);
672    send(ctx, "|");
673    sql_field_seek(mdb, 0);
674    for (i = 0; i < sql_num_fields(mdb); i++) {
675       Dmsg1(800, "list_result looking at field %d\n", i);
676       field = sql_fetch_field(mdb);
677       if (!field) {
678          break;
679       }
680       max_len = max_length(field->max_length);
681       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
682       send(ctx, buf);
683    }
684    send(ctx, "\n");
685    list_dashes(mdb, send, ctx);
686
687    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
688    while ((row = sql_fetch_row(mdb)) != NULL) {
689       sql_field_seek(mdb, 0);
690       send(ctx, "|");
691       for (i = 0; i < sql_num_fields(mdb); i++) {
692          field = sql_fetch_field(mdb);
693          if (!field) {
694             break;
695          }
696          max_len = max_length(field->max_length);
697          if (row[i] == NULL) {
698             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
699          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
700             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
701                       add_commas(row[i], ewc));
702          } else {
703             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
704          }
705          send(ctx, buf);
706       }
707       send(ctx, "\n");
708    }
709    list_dashes(mdb, send, ctx);
710    return;
711
712 vertical_list:
713
714    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
715    while ((row = sql_fetch_row(mdb)) != NULL) {
716       sql_field_seek(mdb, 0);
717       for (i = 0; i < sql_num_fields(mdb); i++) {
718          field = sql_fetch_field(mdb);
719          if (!field) {
720             break;
721          }
722          if (row[i] == NULL) {
723             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
724          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
725             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
726                 add_commas(row[i], ewc));
727          } else {
728             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
729          }
730          send(ctx, buf);
731       }
732       send(ctx, "\n");
733    }
734    return;
735 }
736
737 /* 
738  * Open a new connexion to mdb catalog. This function is used
739  * by batch and accurate mode.
740  */
741 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
742 {
743    int multi_db=false;
744
745 #ifdef HAVE_BATCH_FILE_INSERT
746    multi_db=true;               /* we force a new connexion only if batch insert is enabled */
747 #endif
748
749    if (!jcr->db_batch) {
750       jcr->db_batch = db_init_database(jcr, 
751                                       mdb->db_name, 
752                                       mdb->db_user,
753                                       mdb->db_password, 
754                                       mdb->db_address,
755                                       mdb->db_port,
756                                       mdb->db_socket,
757                                       multi_db /* multi_db = true when using batch mode */);
758       if (!jcr->db_batch) {
759          Jmsg0(jcr, M_FATAL, 0, "Could not init batch connexion");
760          return false;
761       }
762
763       if (!db_open_database(jcr, jcr->db_batch)) {
764          Mmsg2(&jcr->db_batch->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
765               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
766          Jmsg1(jcr, M_FATAL, 0, "%s", jcr->db_batch->errmsg);
767          return false;
768       }      
769       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
770             jcr->db_batch->connected, jcr->db_batch->db);
771
772    }
773    return true;
774 }
775
776 /*
777  * !!! WARNING !!! Use this function only when bacula is stopped.
778  * ie, after a fatal signal and before exiting the program
779  * Print information about a B_DB object.
780  */
781 void _dbg_print_db(JCR *jcr, FILE *fp)
782 {
783    B_DB *mdb = jcr->db;
784
785    if (!mdb) {
786       return;
787    }
788
789    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
790            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
791    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
792    if (mdb->lock.valid == RWLOCK_VALID) { 
793       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
794    }
795 }
796
797 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/