]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
Fix SQL warning message about concurrency pointed out by Graham
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else if (strcasecmp(p, "ingres") == 0) {
81       db_type = SQL_TYPE_INGRES;
82    } else {
83       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
84    }
85 #elif HAVE_MYSQL
86    db_type = SQL_TYPE_MYSQL;
87 #elif HAVE_POSTGRESQL
88    db_type = SQL_TYPE_POSTGRESQL;
89 #elif HAVE_INGRES
90    db_type = SQL_TYPE_INGRES;
91 #elif HAVE_SQLITE
92    db_type = SQL_TYPE_SQLITE;
93 #elif HAVE_SQLITE3
94    db_type = SQL_TYPE_SQLITE3;
95 #endif
96
97    return db_init_database(jcr, db_name, db_user, db_password, db_address,
98              db_port, db_socket, mult_db_connections);
99 }
100
101 dbid_list::dbid_list()
102 {
103    memset(this, 0, sizeof(dbid_list));
104    max_ids = 1000;
105    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
106    num_ids = num_seen = tot_ids = 0;
107    PurgedFiles = NULL;
108 }
109
110 dbid_list::~dbid_list()
111 {
112    free(DBId);
113 }
114
115 /*
116  * Called here to retrieve an integer from the database
117  */
118 int db_int_handler(void *ctx, int num_fields, char **row)
119 {
120    uint32_t *val = (uint32_t *)ctx;
121
122    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
123
124    if (row[0]) {
125       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
126       *val = str_to_int64(row[0]);
127    } else {
128       Dmsg0(800, "int_handler finds zero\n");
129       *val = 0;
130    }
131    Dmsg0(800, "int_handler finishes\n");
132    return 0;
133 }
134
135 /*
136  * Called here to retrieve a 32/64 bit integer from the database.
137  *   The returned integer will be extended to 64 bit.
138  */
139 int db_int64_handler(void *ctx, int num_fields, char **row)
140 {
141    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
142
143    if (row[0]) {
144       lctx->value = str_to_int64(row[0]);
145       lctx->count++;
146    }
147    return 0;
148 }
149
150 /*
151  * Use to build a comma separated list of values from a query. "10,20,30"
152  */
153 int db_list_handler(void *ctx, int num_fields, char **row)
154 {
155    db_list_ctx *lctx = (db_list_ctx *)ctx;
156    if (num_fields == 1 && row[0]) {
157       if (lctx->list[0]) {
158          pm_strcat(lctx->list, ",");
159       }
160       pm_strcat(lctx->list, row[0]);
161       lctx->count++;
162    }
163    return 0;
164 }
165
166
167 /*
168  * Called here to retrieve an integer from the database
169  */
170 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
171 {
172    uint32_t *val = (uint32_t *)ctx;
173    uint32_t index = sql_get_max_connections_index[db_type];
174    if (row[index]) {
175       *val = str_to_int64(row[index]);
176    } else {
177       Dmsg0(800, "int_handler finds zero\n");
178       *val = 0;
179    }
180    return 0;
181 }
182
183 /* 
184  * Check catalog max_connections setting
185  */
186 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
187 {
188    uint32_t max_conn=0;
189    int ret=true;
190
191    /* Without Batch insert, no need to verify max_connections */
192 #ifndef HAVE_BATCH_FILE_INSERT
193    return ret;
194 #endif
195
196    /* Check max_connections setting */
197    if (!db_sql_query(mdb, sql_get_max_connections[db_type], 
198                      db_max_connections_handler, &max_conn)) {
199       Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
200       return ret;
201    }
202    if (max_conn && max_concurrent_jobs && max_concurrent_jobs > max_conn) {
203       Mmsg(mdb->errmsg, 
204            _("Potential performance problem:\n"
205              "max_connections=%d set for %s database \"%s\" should be larger than Director's "
206              "MaxConcurrentJobs=%d\n"),
207            max_conn, db_get_type(), mdb->db_name, max_concurrent_jobs);
208       Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
209       ret = false;
210    }
211
212    return ret;
213 }
214
215 /* NOTE!!! The following routines expect that the
216  *  calling subroutine sets and clears the mutex
217  */
218
219 /* Check that the tables correspond to the version we want */
220 bool check_tables_version(JCR *jcr, B_DB *mdb)
221 {
222    const char *query = "SELECT VersionId FROM Version";
223
224    bacula_db_version = 0;
225    if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
226       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
227       return false;
228    }
229    if (bacula_db_version != BDB_VERSION) {
230       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
231           mdb->db_name, BDB_VERSION, bacula_db_version);
232       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
233       return false;
234    }
235    return true;
236 }
237
238 /* Utility routine for queries. The database MUST be locked before calling here. */
239 int
240 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
241 {
242    int status;
243
244    sql_free_result(mdb);
245    if ((status=sql_query(mdb, cmd)) != 0) {
246       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
247       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
248       if (verbose) {
249          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
250       }
251       return 0;
252    }
253
254    mdb->result = sql_store_result(mdb);
255
256    return mdb->result != NULL;
257 }
258
259 /*
260  * Utility routine to do inserts
261  * Returns: 0 on failure
262  *          1 on success
263  */
264 int
265 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
266 {
267    if (sql_query(mdb, cmd)) {
268       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
269       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
270       if (verbose) {
271          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
272       }
273       return 0;
274    }
275    if (mdb->have_insert_id) {
276       mdb->num_rows = sql_affected_rows(mdb);
277    } else {
278       mdb->num_rows = 1;
279    }
280    if (mdb->num_rows != 1) {
281       char ed1[30];
282       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
283          edit_uint64(mdb->num_rows, ed1));
284       if (verbose) {
285          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
286       }
287       return 0;
288    }
289    mdb->changes++;
290    return 1;
291 }
292
293 /* Utility routine for updates.
294  *  Returns: 0 on failure
295  *           1 on success
296  */
297 int
298 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
299 {
300
301    if (sql_query(mdb, cmd)) {
302       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
303       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
304       if (verbose) {
305          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
306       }
307       return 0;
308    }
309    mdb->num_rows = sql_affected_rows(mdb);
310    if (mdb->num_rows < 1) {
311       char ed1[30];
312       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
313          edit_uint64(mdb->num_rows, ed1), cmd);
314       if (verbose) {
315 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
316       }
317       return 0;
318    }
319    mdb->changes++;
320    return 1;
321 }
322
323 /* Utility routine for deletes
324  *
325  * Returns: -1 on error
326  *           n number of rows affected
327  */
328 int
329 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
330 {
331
332    if (sql_query(mdb, cmd)) {
333       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
334       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
335       if (verbose) {
336          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
337       }
338       return -1;
339    }
340    mdb->changes++;
341    return sql_affected_rows(mdb);
342 }
343
344
345 /*
346  * Get record max. Query is already in mdb->cmd
347  *  No locking done
348  *
349  * Returns: -1 on failure
350  *          count on success
351  */
352 int get_sql_record_max(JCR *jcr, B_DB *mdb)
353 {
354    SQL_ROW row;
355    int stat = 0;
356
357    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
358       if ((row = sql_fetch_row(mdb)) == NULL) {
359          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
360          stat = -1;
361       } else {
362          stat = str_to_int64(row[0]);
363       }
364       sql_free_result(mdb);
365    } else {
366       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
367       stat = -1;
368    }
369    return stat;
370 }
371
372 /*
373  * Return pre-edited error message
374  */
375 char *db_strerror(B_DB *mdb)
376 {
377    return mdb->errmsg;
378 }
379
380 /*
381  * Lock database, this can be called multiple times by the same
382  *   thread without blocking, but must be unlocked the number of
383  *   times it was locked.
384  */
385 void _db_lock(const char *file, int line, B_DB *mdb)
386 {
387    int errstat;
388    if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
389       berrno be;
390       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
391            errstat, be.bstrerror(errstat));
392    }
393 }
394
395 /*
396  * Unlock the database. This can be called multiple times by the
397  *   same thread up to the number of times that thread called
398  *   db_lock()/
399  */
400 void _db_unlock(const char *file, int line, B_DB *mdb)
401 {
402    int errstat;
403    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
404       berrno be;
405       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
406            errstat, be.bstrerror(errstat));
407    }
408 }
409
410 /*
411  * Start a transaction. This groups inserts and makes things
412  *  much more efficient. Usually started when inserting
413  *  file attributes.
414  */
415 void db_start_transaction(JCR *jcr, B_DB *mdb)
416 {
417    if (!jcr->attr) {
418       jcr->attr = get_pool_memory(PM_FNAME);
419    }
420    if (!jcr->ar) {
421       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
422    }
423
424 #ifdef HAVE_SQLITE
425    if (!mdb->allow_transactions) {
426       return;
427    }
428    db_lock(mdb);
429    /* Allow only 10,000 changes per transaction */
430    if (mdb->transaction && mdb->changes > 10000) {
431       db_end_transaction(jcr, mdb);
432    }
433    if (!mdb->transaction) {
434       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
435       Dmsg0(400, "Start SQLite transaction\n");
436       mdb->transaction = 1;
437    }
438    db_unlock(mdb);
439 #endif
440
441 /*
442  * This is turned off because transactions break
443  * if multiple simultaneous jobs are run.
444  */
445 #ifdef HAVE_POSTGRESQL
446    if (!mdb->allow_transactions) {
447       return;
448    }
449    db_lock(mdb);
450    /* Allow only 25,000 changes per transaction */
451    if (mdb->transaction && mdb->changes > 25000) {
452       db_end_transaction(jcr, mdb);
453    }
454    if (!mdb->transaction) {
455       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
456       Dmsg0(400, "Start PosgreSQL transaction\n");
457       mdb->transaction = 1;
458    }
459    db_unlock(mdb);
460 #endif
461
462 #ifdef HAVE_INGRES
463    if (!mdb->allow_transactions) {
464       return;
465    }
466    db_lock(mdb);
467    /* Allow only 25,000 changes per transaction */
468    if (mdb->transaction && mdb->changes > 25000) {
469       db_end_transaction(jcr, mdb);
470    }
471    if (!mdb->transaction) {
472       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
473       Dmsg0(400, "Start Ingres transaction\n");
474       mdb->transaction = 1;
475    }
476    db_unlock(mdb);
477 #endif
478
479 #ifdef HAVE_DBI
480    if (db_type == SQL_TYPE_SQLITE) {
481       if (!mdb->allow_transactions) {
482          return;
483       }
484       db_lock(mdb);
485       /* Allow only 10,000 changes per transaction */
486       if (mdb->transaction && mdb->changes > 10000) {
487          db_end_transaction(jcr, mdb);
488       }
489       if (!mdb->transaction) {
490          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
491          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
492          Dmsg0(400, "Start SQLite transaction\n");
493          mdb->transaction = 1;
494       }
495       db_unlock(mdb);
496    } else if (db_type == SQL_TYPE_POSTGRESQL) {
497       if (!mdb->allow_transactions) {
498          return;
499       }
500       db_lock(mdb);
501       /* Allow only 25,000 changes per transaction */
502       if (mdb->transaction && mdb->changes > 25000) {
503          db_end_transaction(jcr, mdb);
504       }
505       if (!mdb->transaction) {
506          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
507          Dmsg0(400, "Start PosgreSQL transaction\n");
508          mdb->transaction = 1;
509       }
510       db_unlock(mdb);
511    }
512 #endif
513 }
514
515 void db_end_transaction(JCR *jcr, B_DB *mdb)
516 {
517    /*
518     * This can be called during thread cleanup and
519     *   the db may already be closed.  So simply return.
520     */
521    if (!mdb) {
522       return;
523    }
524
525    if (jcr && jcr->cached_attribute) {
526       Dmsg0(400, "Flush last cached attribute.\n");
527       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
528          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
529       }
530       jcr->cached_attribute = false;
531    }
532
533 #ifdef HAVE_SQLITE
534    if (!mdb->allow_transactions) {
535       return;
536    }
537    db_lock(mdb);
538    if (mdb->transaction) {
539       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
540       mdb->transaction = 0;
541       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
542    }
543    mdb->changes = 0;
544    db_unlock(mdb);
545 #endif
546
547
548
549 #ifdef HAVE_INGRES
550    if (!mdb->allow_transactions) {
551       return;
552    }
553    db_lock(mdb);
554    if (mdb->transaction) {
555       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
556       mdb->transaction = 0;
557       Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
558    }
559    mdb->changes = 0;
560    db_unlock(mdb);
561 #endif
562
563
564 #ifdef HAVE_POSTGRESQL
565    if (!mdb->allow_transactions) {
566       return;
567    }
568    db_lock(mdb);
569    if (mdb->transaction) {
570       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
571       mdb->transaction = 0;
572       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
573    }
574    mdb->changes = 0;
575    db_unlock(mdb);
576 #endif
577
578 #ifdef HAVE_DBI
579    if (db_type == SQL_TYPE_SQLITE) {
580       if (!mdb->allow_transactions) {
581          return;
582       }
583       db_lock(mdb);
584       if (mdb->transaction) {
585          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
586          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
587          mdb->transaction = 0;
588          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
589       }
590       mdb->changes = 0;
591       db_unlock(mdb);
592    } else if (db_type == SQL_TYPE_POSTGRESQL) {
593       if (!mdb->allow_transactions) {
594          return;
595       }
596       db_lock(mdb);
597       if (mdb->transaction) {
598          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
599          mdb->transaction = 0;
600          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
601       }
602       mdb->changes = 0;
603       db_unlock(mdb);
604    }
605 #endif
606 }
607
608 /*
609  * Given a full filename, split it into its path
610  *  and filename parts. They are returned in pool memory
611  *  in the mdb structure.
612  */
613 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
614 {
615    const char *p, *f;
616
617    /* Find path without the filename.
618     * I.e. everything after the last / is a "filename".
619     * OK, maybe it is a directory name, but we treat it like
620     * a filename. If we don't find a / then the whole name
621     * must be a path name (e.g. c:).
622     */
623    for (p=f=fname; *p; p++) {
624       if (IsPathSeparator(*p)) {
625          f = p;                       /* set pos of last slash */
626       }
627    }
628    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
629       f++;                            /* yes, point to filename */
630    } else {                           /* no, whole thing must be path name */
631       f = p;
632    }
633
634    /* If filename doesn't exist (i.e. root directory), we
635     * simply create a blank name consisting of a single
636     * space. This makes handling zero length filenames
637     * easier.
638     */
639    mdb->fnl = p - f;
640    if (mdb->fnl > 0) {
641       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
642       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
643       mdb->fname[mdb->fnl] = 0;
644    } else {
645       mdb->fname[0] = 0;
646       mdb->fnl = 0;
647    }
648
649    mdb->pnl = f - fname;
650    if (mdb->pnl > 0) {
651       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
652       memcpy(mdb->path, fname, mdb->pnl);
653       mdb->path[mdb->pnl] = 0;
654    } else {
655       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
656       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
657       mdb->path[0] = 0;
658       mdb->pnl = 0;
659    }
660
661    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
662 }
663
664 /*
665  * Set maximum field length to something reasonable
666  */
667 static int max_length(int max_length)
668 {
669    int max_len = max_length;
670    /* Sanity check */
671    if (max_len < 0) {
672       max_len = 2;
673    } else if (max_len > 100) {
674       max_len = 100;
675    }
676    return max_len;
677 }
678
679 /*
680  * List dashes as part of header for listing SQL results in a table
681  */
682 void
683 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
684 {
685    SQL_FIELD  *field;
686    int i, j;
687    int len;
688
689    sql_field_seek(mdb, 0);
690    send(ctx, "+");
691    for (i = 0; i < sql_num_fields(mdb); i++) {
692       field = sql_fetch_field(mdb);
693       if (!field) {
694          break;
695       }
696       len = max_length(field->max_length + 2);
697       for (j = 0; j < len; j++) {
698          send(ctx, "-");
699       }
700       send(ctx, "+");
701    }
702    send(ctx, "\n");
703 }
704
705 /*
706  * If full_list is set, we list vertically, otherwise, we
707  * list on one line horizontally.
708  */
709 void
710 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
711 {
712    SQL_FIELD *field;
713    SQL_ROW row;
714    int i, col_len, max_len = 0;
715    char buf[2000], ewc[30];
716
717    Dmsg0(800, "list_result starts\n");
718    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
719       send(ctx, _("No results to list.\n"));
720       return;
721    }
722
723    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
724    /* determine column display widths */
725    sql_field_seek(mdb, 0);
726    for (i = 0; i < sql_num_fields(mdb); i++) {
727       Dmsg1(800, "list_result processing field %d\n", i);
728       field = sql_fetch_field(mdb);
729       if (!field) {
730          break;
731       }
732       col_len = cstrlen(field->name);
733       if (type == VERT_LIST) {
734          if (col_len > max_len) {
735             max_len = col_len;
736          }
737       } else {
738          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
739             field->max_length += (field->max_length - 1) / 3;
740          }
741          if (col_len < (int)field->max_length) {
742             col_len = field->max_length;
743          }
744          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
745             col_len = 4;                 /* 4 = length of the word "NULL" */
746          }
747          field->max_length = col_len;    /* reset column info */
748       }
749    }
750
751    Dmsg0(800, "list_result finished first loop\n");
752    if (type == VERT_LIST) {
753       goto vertical_list;
754    }
755
756    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
757    list_dashes(mdb, send, ctx);
758    send(ctx, "|");
759    sql_field_seek(mdb, 0);
760    for (i = 0; i < sql_num_fields(mdb); i++) {
761       Dmsg1(800, "list_result looking at field %d\n", i);
762       field = sql_fetch_field(mdb);
763       if (!field) {
764          break;
765       }
766       max_len = max_length(field->max_length);
767       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
768       send(ctx, buf);
769    }
770    send(ctx, "\n");
771    list_dashes(mdb, send, ctx);
772
773    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
774    while ((row = sql_fetch_row(mdb)) != NULL) {
775       sql_field_seek(mdb, 0);
776       send(ctx, "|");
777       for (i = 0; i < sql_num_fields(mdb); i++) {
778          field = sql_fetch_field(mdb);
779          if (!field) {
780             break;
781          }
782          max_len = max_length(field->max_length);
783          if (row[i] == NULL) {
784             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
785          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
786             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
787                       add_commas(row[i], ewc));
788          } else {
789             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
790          }
791          send(ctx, buf);
792       }
793       send(ctx, "\n");
794    }
795    list_dashes(mdb, send, ctx);
796    return;
797
798 vertical_list:
799
800    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
801    while ((row = sql_fetch_row(mdb)) != NULL) {
802       sql_field_seek(mdb, 0);
803       for (i = 0; i < sql_num_fields(mdb); i++) {
804          field = sql_fetch_field(mdb);
805          if (!field) {
806             break;
807          }
808          if (row[i] == NULL) {
809             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
810          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
811             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
812                 add_commas(row[i], ewc));
813          } else {
814             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
815          }
816          send(ctx, buf);
817       }
818       send(ctx, "\n");
819    }
820    return;
821 }
822
823 /* 
824  * Open a new connexion to mdb catalog. This function is used
825  * by batch and accurate mode.
826  */
827 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
828 {
829 #ifdef HAVE_BATCH_FILE_INSERT
830    const int multi_db = true;   /* we force a new connection only if batch insert is enabled */
831 #else
832    const int multi_db = false;
833 #endif
834
835    if (!jcr->db_batch) {
836       jcr->db_batch = db_init_database(jcr, 
837                                       mdb->db_name, 
838                                       mdb->db_user,
839                                       mdb->db_password, 
840                                       mdb->db_address,
841                                       mdb->db_port,
842                                       mdb->db_socket,
843                                       multi_db /* multi_db = true when using batch mode */);
844       if (!jcr->db_batch) {
845          Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
846          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
847          return false;
848       }
849
850       if (!db_open_database(jcr, jcr->db_batch)) {
851          Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
852               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
853          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
854          return false;
855       }      
856       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
857             jcr->db_batch->connected, jcr->db_batch->db);
858
859    }
860    return true;
861 }
862
863 /*
864  * !!! WARNING !!! Use this function only when bacula is stopped.
865  * ie, after a fatal signal and before exiting the program
866  * Print information about a B_DB object.
867  */
868 void db_debug_print(JCR *jcr, FILE *fp)
869 {
870    B_DB *mdb = jcr->db;
871
872    if (!mdb) {
873       return;
874    }
875
876    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
877            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
878    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
879    if (mdb->lock.valid == RWLOCK_VALID) { 
880       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
881    }
882 }
883
884 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES*/