]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
Drop Ingres specific versions of queries with #ifdefs
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else if (strcasecmp(p, "ingres") == 0) {
81       db_type = SQL_TYPE_INGRES;
82    } else {
83       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
84    }
85 #elif HAVE_MYSQL
86    db_type = SQL_TYPE_MYSQL;
87 #elif HAVE_POSTGRESQL
88    db_type = SQL_TYPE_POSTGRESQL;
89 #elif HAVE_INGRES
90    db_type = SQL_TYPE_INGRES;
91 #elif HAVE_SQLITE
92    db_type = SQL_TYPE_SQLITE;
93 #elif HAVE_SQLITE3
94    db_type = SQL_TYPE_SQLITE3;
95 #endif
96
97    return db_init_database(jcr, db_name, db_user, db_password, db_address,
98              db_port, db_socket, mult_db_connections);
99 }
100
101 dbid_list::dbid_list()
102 {
103    memset(this, 0, sizeof(dbid_list));
104    max_ids = 1000;
105    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
106    num_ids = num_seen = tot_ids = 0;
107    PurgedFiles = NULL;
108 }
109
110 dbid_list::~dbid_list()
111 {
112    free(DBId);
113 }
114
115 /*
116  * Called here to retrieve an integer from the database
117  */
118 int db_int_handler(void *ctx, int num_fields, char **row)
119 {
120    uint32_t *val = (uint32_t *)ctx;
121
122    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
123
124    if (row[0]) {
125       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
126       *val = str_to_int64(row[0]);
127    } else {
128       Dmsg0(800, "int_handler finds zero\n");
129       *val = 0;
130    }
131    Dmsg0(800, "int_handler finishes\n");
132    return 0;
133 }
134
135 /*
136  * Called here to retrieve a 32/64 bit integer from the database.
137  *   The returned integer will be extended to 64 bit.
138  */
139 int db_int64_handler(void *ctx, int num_fields, char **row)
140 {
141    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
142
143    if (row[0]) {
144       lctx->value = str_to_int64(row[0]);
145       lctx->count++;
146    }
147    return 0;
148 }
149
150 /*
151  * Use to build a comma separated list of values from a query. "10,20,30"
152  */
153 int db_list_handler(void *ctx, int num_fields, char **row)
154 {
155    db_list_ctx *lctx = (db_list_ctx *)ctx;
156    if (num_fields == 1 && row[0]) {
157       if (lctx->list[0]) {
158          pm_strcat(lctx->list, ",");
159       }
160       pm_strcat(lctx->list, row[0]);
161       lctx->count++;
162    }
163    return 0;
164 }
165
166
167 /*
168  * Called here to retrieve an integer from the database
169  */
170 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
171 {
172    uint32_t *val = (uint32_t *)ctx;
173    uint32_t index = sql_get_max_connections_index[db_type];
174    if (row[index]) {
175       *val = str_to_int64(row[index]);
176    } else {
177       Dmsg0(800, "int_handler finds zero\n");
178       *val = 0;
179    }
180    return 0;
181 }
182
183 /* 
184  * Check catalog max_connections setting
185  */
186 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
187 {
188 #ifdef HAVE_BATCH_FILE_INSERT
189
190    uint32_t max_conn = 0;
191
192    /* With Batch insert, verify max_connections */
193    if (!db_sql_query(mdb, sql_get_max_connections[db_type], 
194                      db_max_connections_handler, &max_conn)) {
195       Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
196       return false;
197    }
198    if (max_conn && max_concurrent_jobs > max_conn) {
199       Mmsg(mdb->errmsg, 
200            _("Potential performance problem:\n"
201              "max_connections=%d set for %s database \"%s\" should be larger than Director's "
202              "MaxConcurrentJobs=%d\n"),
203            max_conn, db_get_type(), mdb->db_name, max_concurrent_jobs);
204       Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
205       return false;
206    }
207
208 #endif
209
210    return true;
211 }
212
213 /* NOTE!!! The following routines expect that the
214  *  calling subroutine sets and clears the mutex
215  */
216
217 /* Check that the tables correspond to the version we want */
218 bool check_tables_version(JCR *jcr, B_DB *mdb)
219 {
220    const char *query = "SELECT VersionId FROM Version";
221
222    bacula_db_version = 0;
223    if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
224       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
225       return false;
226    }
227    if (bacula_db_version != BDB_VERSION) {
228       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
229           mdb->db_name, BDB_VERSION, bacula_db_version);
230       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
231       return false;
232    }
233    return true;
234 }
235
236 /* Utility routine for queries. The database MUST be locked before calling here. */
237 int
238 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
239 {
240    int status;
241
242    sql_free_result(mdb);
243    if ((status=sql_query(mdb, cmd)) != 0) {
244       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
245       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
246       if (verbose) {
247          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
248       }
249       return 0;
250    }
251
252    mdb->result = sql_store_result(mdb);
253
254    return mdb->result != NULL;
255 }
256
257 /*
258  * Utility routine to do inserts
259  * Returns: 0 on failure
260  *          1 on success
261  */
262 int
263 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
264 {
265    if (sql_query(mdb, cmd)) {
266       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
267       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
268       if (verbose) {
269          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
270       }
271       return 0;
272    }
273    if (mdb->have_insert_id) {
274       mdb->num_rows = sql_affected_rows(mdb);
275    } else {
276       mdb->num_rows = 1;
277    }
278    if (mdb->num_rows != 1) {
279       char ed1[30];
280       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
281          edit_uint64(mdb->num_rows, ed1));
282       if (verbose) {
283          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
284       }
285       return 0;
286    }
287    mdb->changes++;
288    return 1;
289 }
290
291 /* Utility routine for updates.
292  *  Returns: 0 on failure
293  *           1 on success
294  */
295 int
296 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
297 {
298
299    if (sql_query(mdb, cmd)) {
300       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
301       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
302       if (verbose) {
303          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
304       }
305       return 0;
306    }
307    mdb->num_rows = sql_affected_rows(mdb);
308    if (mdb->num_rows < 1) {
309       char ed1[30];
310       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
311          edit_uint64(mdb->num_rows, ed1), cmd);
312       if (verbose) {
313 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
314       }
315       return 0;
316    }
317    mdb->changes++;
318    return 1;
319 }
320
321 /* Utility routine for deletes
322  *
323  * Returns: -1 on error
324  *           n number of rows affected
325  */
326 int
327 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
328 {
329
330    if (sql_query(mdb, cmd)) {
331       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
332       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
333       if (verbose) {
334          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
335       }
336       return -1;
337    }
338    mdb->changes++;
339    return sql_affected_rows(mdb);
340 }
341
342
343 /*
344  * Get record max. Query is already in mdb->cmd
345  *  No locking done
346  *
347  * Returns: -1 on failure
348  *          count on success
349  */
350 int get_sql_record_max(JCR *jcr, B_DB *mdb)
351 {
352    SQL_ROW row;
353    int stat = 0;
354
355    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
356       if ((row = sql_fetch_row(mdb)) == NULL) {
357          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
358          stat = -1;
359       } else {
360          stat = str_to_int64(row[0]);
361       }
362       sql_free_result(mdb);
363    } else {
364       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
365       stat = -1;
366    }
367    return stat;
368 }
369
370 /*
371  * Return pre-edited error message
372  */
373 char *db_strerror(B_DB *mdb)
374 {
375    return mdb->errmsg;
376 }
377
378 /*
379  * Lock database, this can be called multiple times by the same
380  *   thread without blocking, but must be unlocked the number of
381  *   times it was locked.
382  */
383 void _db_lock(const char *file, int line, B_DB *mdb)
384 {
385    int errstat;
386    if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
387       berrno be;
388       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
389            errstat, be.bstrerror(errstat));
390    }
391 }
392
393 /*
394  * Unlock the database. This can be called multiple times by the
395  *   same thread up to the number of times that thread called
396  *   db_lock()/
397  */
398 void _db_unlock(const char *file, int line, B_DB *mdb)
399 {
400    int errstat;
401    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
402       berrno be;
403       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
404            errstat, be.bstrerror(errstat));
405    }
406 }
407
408 /*
409  * Start a transaction. This groups inserts and makes things
410  *  much more efficient. Usually started when inserting
411  *  file attributes.
412  */
413 void db_start_transaction(JCR *jcr, B_DB *mdb)
414 {
415    if (!jcr->attr) {
416       jcr->attr = get_pool_memory(PM_FNAME);
417    }
418    if (!jcr->ar) {
419       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
420    }
421
422 #ifdef HAVE_SQLITE
423    if (!mdb->allow_transactions) {
424       return;
425    }
426    db_lock(mdb);
427    /* Allow only 10,000 changes per transaction */
428    if (mdb->transaction && mdb->changes > 10000) {
429       db_end_transaction(jcr, mdb);
430    }
431    if (!mdb->transaction) {
432       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
433       Dmsg0(400, "Start SQLite transaction\n");
434       mdb->transaction = 1;
435    }
436    db_unlock(mdb);
437 #endif
438
439 /*
440  * This is turned off because transactions break
441  * if multiple simultaneous jobs are run.
442  */
443 #ifdef HAVE_POSTGRESQL
444    if (!mdb->allow_transactions) {
445       return;
446    }
447    db_lock(mdb);
448    /* Allow only 25,000 changes per transaction */
449    if (mdb->transaction && mdb->changes > 25000) {
450       db_end_transaction(jcr, mdb);
451    }
452    if (!mdb->transaction) {
453       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
454       Dmsg0(400, "Start PosgreSQL transaction\n");
455       mdb->transaction = 1;
456    }
457    db_unlock(mdb);
458 #endif
459
460 #ifdef HAVE_INGRES
461    if (!mdb->allow_transactions) {
462       return;
463    }
464    db_lock(mdb);
465    /* Allow only 25,000 changes per transaction */
466    if (mdb->transaction && mdb->changes > 25000) {
467       db_end_transaction(jcr, mdb);
468    }
469    if (!mdb->transaction) {
470       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
471       Dmsg0(400, "Start Ingres transaction\n");
472       mdb->transaction = 1;
473    }
474    db_unlock(mdb);
475 #endif
476
477 #ifdef HAVE_DBI
478    if (db_type == SQL_TYPE_SQLITE) {
479       if (!mdb->allow_transactions) {
480          return;
481       }
482       db_lock(mdb);
483       /* Allow only 10,000 changes per transaction */
484       if (mdb->transaction && mdb->changes > 10000) {
485          db_end_transaction(jcr, mdb);
486       }
487       if (!mdb->transaction) {
488          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
489          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
490          Dmsg0(400, "Start SQLite transaction\n");
491          mdb->transaction = 1;
492       }
493       db_unlock(mdb);
494    } else if (db_type == SQL_TYPE_POSTGRESQL) {
495       if (!mdb->allow_transactions) {
496          return;
497       }
498       db_lock(mdb);
499       /* Allow only 25,000 changes per transaction */
500       if (mdb->transaction && mdb->changes > 25000) {
501          db_end_transaction(jcr, mdb);
502       }
503       if (!mdb->transaction) {
504          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
505          Dmsg0(400, "Start PosgreSQL transaction\n");
506          mdb->transaction = 1;
507       }
508       db_unlock(mdb);
509    }
510 #endif
511 }
512
513 void db_end_transaction(JCR *jcr, B_DB *mdb)
514 {
515    /*
516     * This can be called during thread cleanup and
517     *   the db may already be closed.  So simply return.
518     */
519    if (!mdb) {
520       return;
521    }
522
523    if (jcr && jcr->cached_attribute) {
524       Dmsg0(400, "Flush last cached attribute.\n");
525       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
526          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
527       }
528       jcr->cached_attribute = false;
529    }
530
531 #ifdef HAVE_SQLITE
532    if (!mdb->allow_transactions) {
533       return;
534    }
535    db_lock(mdb);
536    if (mdb->transaction) {
537       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
538       mdb->transaction = 0;
539       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
540    }
541    mdb->changes = 0;
542    db_unlock(mdb);
543 #endif
544
545
546
547 #ifdef HAVE_INGRES
548    if (!mdb->allow_transactions) {
549       return;
550    }
551    db_lock(mdb);
552    if (mdb->transaction) {
553       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
554       mdb->transaction = 0;
555       Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
556    }
557    mdb->changes = 0;
558    db_unlock(mdb);
559 #endif
560
561
562 #ifdef HAVE_POSTGRESQL
563    if (!mdb->allow_transactions) {
564       return;
565    }
566    db_lock(mdb);
567    if (mdb->transaction) {
568       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
569       mdb->transaction = 0;
570       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
571    }
572    mdb->changes = 0;
573    db_unlock(mdb);
574 #endif
575
576 #ifdef HAVE_DBI
577    if (db_type == SQL_TYPE_SQLITE) {
578       if (!mdb->allow_transactions) {
579          return;
580       }
581       db_lock(mdb);
582       if (mdb->transaction) {
583          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
584          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
585          mdb->transaction = 0;
586          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
587       }
588       mdb->changes = 0;
589       db_unlock(mdb);
590    } else if (db_type == SQL_TYPE_POSTGRESQL) {
591       if (!mdb->allow_transactions) {
592          return;
593       }
594       db_lock(mdb);
595       if (mdb->transaction) {
596          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
597          mdb->transaction = 0;
598          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
599       }
600       mdb->changes = 0;
601       db_unlock(mdb);
602    }
603 #endif
604 }
605
606 /*
607  * Given a full filename, split it into its path
608  *  and filename parts. They are returned in pool memory
609  *  in the mdb structure.
610  */
611 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
612 {
613    const char *p, *f;
614
615    /* Find path without the filename.
616     * I.e. everything after the last / is a "filename".
617     * OK, maybe it is a directory name, but we treat it like
618     * a filename. If we don't find a / then the whole name
619     * must be a path name (e.g. c:).
620     */
621    for (p=f=fname; *p; p++) {
622       if (IsPathSeparator(*p)) {
623          f = p;                       /* set pos of last slash */
624       }
625    }
626    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
627       f++;                            /* yes, point to filename */
628    } else {                           /* no, whole thing must be path name */
629       f = p;
630    }
631
632    /* If filename doesn't exist (i.e. root directory), we
633     * simply create a blank name consisting of a single
634     * space. This makes handling zero length filenames
635     * easier.
636     */
637    mdb->fnl = p - f;
638    if (mdb->fnl > 0) {
639       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
640       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
641       mdb->fname[mdb->fnl] = 0;
642    } else {
643       mdb->fname[0] = 0;
644       mdb->fnl = 0;
645    }
646
647    mdb->pnl = f - fname;
648    if (mdb->pnl > 0) {
649       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
650       memcpy(mdb->path, fname, mdb->pnl);
651       mdb->path[mdb->pnl] = 0;
652    } else {
653       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
654       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
655       mdb->path[0] = 0;
656       mdb->pnl = 0;
657    }
658
659    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
660 }
661
662 /*
663  * Set maximum field length to something reasonable
664  */
665 static int max_length(int max_length)
666 {
667    int max_len = max_length;
668    /* Sanity check */
669    if (max_len < 0) {
670       max_len = 2;
671    } else if (max_len > 100) {
672       max_len = 100;
673    }
674    return max_len;
675 }
676
677 /*
678  * List dashes as part of header for listing SQL results in a table
679  */
680 void
681 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
682 {
683    SQL_FIELD  *field;
684    int i, j;
685    int len;
686
687    sql_field_seek(mdb, 0);
688    send(ctx, "+");
689    for (i = 0; i < sql_num_fields(mdb); i++) {
690       field = sql_fetch_field(mdb);
691       if (!field) {
692          break;
693       }
694       len = max_length(field->max_length + 2);
695       for (j = 0; j < len; j++) {
696          send(ctx, "-");
697       }
698       send(ctx, "+");
699    }
700    send(ctx, "\n");
701 }
702
703 /*
704  * If full_list is set, we list vertically, otherwise, we
705  * list on one line horizontally.
706  */
707 void
708 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
709 {
710    SQL_FIELD *field;
711    SQL_ROW row;
712    int i, col_len, max_len = 0;
713    char buf[2000], ewc[30];
714
715    Dmsg0(800, "list_result starts\n");
716    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
717       send(ctx, _("No results to list.\n"));
718       return;
719    }
720
721    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
722    /* determine column display widths */
723    sql_field_seek(mdb, 0);
724    for (i = 0; i < sql_num_fields(mdb); i++) {
725       Dmsg1(800, "list_result processing field %d\n", i);
726       field = sql_fetch_field(mdb);
727       if (!field) {
728          break;
729       }
730       col_len = cstrlen(field->name);
731       if (type == VERT_LIST) {
732          if (col_len > max_len) {
733             max_len = col_len;
734          }
735       } else {
736          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
737             field->max_length += (field->max_length - 1) / 3;
738          }
739          if (col_len < (int)field->max_length) {
740             col_len = field->max_length;
741          }
742          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
743             col_len = 4;                 /* 4 = length of the word "NULL" */
744          }
745          field->max_length = col_len;    /* reset column info */
746       }
747    }
748
749    Dmsg0(800, "list_result finished first loop\n");
750    if (type == VERT_LIST) {
751       goto vertical_list;
752    }
753
754    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
755    list_dashes(mdb, send, ctx);
756    send(ctx, "|");
757    sql_field_seek(mdb, 0);
758    for (i = 0; i < sql_num_fields(mdb); i++) {
759       Dmsg1(800, "list_result looking at field %d\n", i);
760       field = sql_fetch_field(mdb);
761       if (!field) {
762          break;
763       }
764       max_len = max_length(field->max_length);
765       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
766       send(ctx, buf);
767    }
768    send(ctx, "\n");
769    list_dashes(mdb, send, ctx);
770
771    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
772    while ((row = sql_fetch_row(mdb)) != NULL) {
773       sql_field_seek(mdb, 0);
774       send(ctx, "|");
775       for (i = 0; i < sql_num_fields(mdb); i++) {
776          field = sql_fetch_field(mdb);
777          if (!field) {
778             break;
779          }
780          max_len = max_length(field->max_length);
781          if (row[i] == NULL) {
782             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
783          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
784             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
785                       add_commas(row[i], ewc));
786          } else {
787             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
788          }
789          send(ctx, buf);
790       }
791       send(ctx, "\n");
792    }
793    list_dashes(mdb, send, ctx);
794    return;
795
796 vertical_list:
797
798    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
799    while ((row = sql_fetch_row(mdb)) != NULL) {
800       sql_field_seek(mdb, 0);
801       for (i = 0; i < sql_num_fields(mdb); i++) {
802          field = sql_fetch_field(mdb);
803          if (!field) {
804             break;
805          }
806          if (row[i] == NULL) {
807             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
808          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
809             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
810                 add_commas(row[i], ewc));
811          } else {
812             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
813          }
814          send(ctx, buf);
815       }
816       send(ctx, "\n");
817    }
818    return;
819 }
820
821 /* 
822  * Open a new connexion to mdb catalog. This function is used
823  * by batch and accurate mode.
824  */
825 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
826 {
827 #ifdef HAVE_BATCH_FILE_INSERT
828    const int multi_db = true;   /* we force a new connection only if batch insert is enabled */
829 #else
830    const int multi_db = false;
831 #endif
832
833    if (!jcr->db_batch) {
834       jcr->db_batch = db_init_database(jcr, 
835                                       mdb->db_name, 
836                                       mdb->db_user,
837                                       mdb->db_password, 
838                                       mdb->db_address,
839                                       mdb->db_port,
840                                       mdb->db_socket,
841                                       multi_db /* multi_db = true when using batch mode */);
842       if (!jcr->db_batch) {
843          Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
844          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
845          return false;
846       }
847
848       if (!db_open_database(jcr, jcr->db_batch)) {
849          Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
850               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
851          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
852          return false;
853       }      
854       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
855             jcr->db_batch->connected, jcr->db_batch->db);
856
857    }
858    return true;
859 }
860
861 /*
862  * !!! WARNING !!! Use this function only when bacula is stopped.
863  * ie, after a fatal signal and before exiting the program
864  * Print information about a B_DB object.
865  */
866 void db_debug_print(JCR *jcr, FILE *fp)
867 {
868    B_DB *mdb = jcr->db;
869
870    if (!mdb) {
871       return;
872    }
873
874    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
875            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
876    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
877    if (mdb->lock.valid == RWLOCK_VALID) { 
878       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
879    }
880 }
881
882 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES*/