]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
small cleanup
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id$
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else {
81       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
82    }
83 #elif HAVE_MYSQL
84    db_type = SQL_TYPE_MYSQL;
85 #elif HAVE_POSTGRESQL
86    db_type = SQL_TYPE_POSTGRESQL;
87 #elif HAVE_SQLITE
88    db_type = SQL_TYPE_SQLITE;
89 #elif HAVE_SQLITE3
90    db_type = SQL_TYPE_SQLITE3;
91 #endif
92
93    return db_init_database(jcr, db_name, db_user, db_password, db_address,
94              db_port, db_socket, mult_db_connections);
95 }
96
97 dbid_list::dbid_list()
98 {
99    memset(this, 0, sizeof(dbid_list));
100    max_ids = 1000;
101    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
102    num_ids = num_seen = tot_ids = 0;
103    PurgedFiles = NULL;
104 }
105
106 dbid_list::~dbid_list()
107 {
108    free(DBId);
109 }
110
111 /*
112  * Called here to retrieve an integer from the database
113  */
114 int db_int_handler(void *ctx, int num_fields, char **row)
115 {
116    uint32_t *val = (uint32_t *)ctx;
117
118    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
119
120    if (row[0]) {
121       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
122       *val = str_to_int64(row[0]);
123    } else {
124       Dmsg0(800, "int_handler finds zero\n");
125       *val = 0;
126    }
127    Dmsg0(800, "int_handler finishes\n");
128    return 0;
129 }
130
131 /*
132  * Called here to retrieve a 32/64 bit integer from the database.
133  *   The returned integer will be extended to 64 bit.
134  */
135 int db_int64_handler(void *ctx, int num_fields, char **row)
136 {
137    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
138
139    if (row[0]) {
140       lctx->value = str_to_int64(row[0]);
141       lctx->count++;
142    }
143    return 0;
144 }
145
146 /*
147  * Use to build a comma separated list of values from a query. "10,20,30"
148  */
149 int db_list_handler(void *ctx, int num_fields, char **row)
150 {
151    db_list_ctx *lctx = (db_list_ctx *)ctx;
152    if (num_fields == 1 && row[0]) {
153       if (lctx->list[0]) {
154          pm_strcat(lctx->list, ",");
155       }
156       pm_strcat(lctx->list, row[0]);
157       lctx->count++;
158    }
159    return 0;
160 }
161
162
163 /*
164  * Called here to retrieve an integer from the database
165  */
166 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
167 {
168    uint32_t *val = (uint32_t *)ctx;
169    uint32_t index = sql_get_max_connections_index[db_type];
170    if (row[index]) {
171       *val = str_to_int64(row[index]);
172    } else {
173       Dmsg0(800, "int_handler finds zero\n");
174       *val = 0;
175    }
176    return 0;
177 }
178
179 /* 
180  * Check catalog max_connections setting
181  */
182 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
183 {
184    uint32_t max_conn=0;
185    int ret=true;
186
187    /* Without Batch insert, no need to verify max_connections */
188 #ifndef HAVE_BATCH_FILE_INSERT
189    return ret;
190 #endif
191
192    /* Check max_connections setting */
193    if (!db_sql_query(mdb, sql_get_max_connections[db_type], 
194                      db_max_connections_handler, &max_conn)) {
195       Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
196       return ret;
197    }
198    if (max_conn && max_concurrent_jobs && max_concurrent_jobs > max_conn) {
199       Mmsg(mdb->errmsg, 
200            _("On db_name=%s, %s max_connections=%d is lower than Director "
201              "MaxConcurentJobs=%d\n"),
202            mdb->db_name, db_get_type(), max_conn, max_concurrent_jobs);
203       Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
204       ret = false;
205    }
206
207    return ret;
208 }
209
210 /* NOTE!!! The following routines expect that the
211  *  calling subroutine sets and clears the mutex
212  */
213
214 /* Check that the tables correspond to the version we want */
215 bool check_tables_version(JCR *jcr, B_DB *mdb)
216 {
217    const char *query = "SELECT VersionId FROM Version";
218
219    bacula_db_version = 0;
220    if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
221       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
222       return false;
223    }
224    if (bacula_db_version != BDB_VERSION) {
225       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
226           mdb->db_name, BDB_VERSION, bacula_db_version);
227       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
228       return false;
229    }
230    return true;
231 }
232
233 /* Utility routine for queries. The database MUST be locked before calling here. */
234 int
235 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
236 {
237    int status;
238
239    sql_free_result(mdb);
240    if ((status=sql_query(mdb, cmd)) != 0) {
241       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
242       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
243       if (verbose) {
244          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
245       }
246       return 0;
247    }
248
249    mdb->result = sql_store_result(mdb);
250
251    return mdb->result != NULL;
252 }
253
254 /*
255  * Utility routine to do inserts
256  * Returns: 0 on failure
257  *          1 on success
258  */
259 int
260 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
261 {
262    if (sql_query(mdb, cmd)) {
263       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
264       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
265       if (verbose) {
266          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
267       }
268       return 0;
269    }
270    if (mdb->have_insert_id) {
271       mdb->num_rows = sql_affected_rows(mdb);
272    } else {
273       mdb->num_rows = 1;
274    }
275    if (mdb->num_rows != 1) {
276       char ed1[30];
277       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
278          edit_uint64(mdb->num_rows, ed1));
279       if (verbose) {
280          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
281       }
282       return 0;
283    }
284    mdb->changes++;
285    return 1;
286 }
287
288 /* Utility routine for updates.
289  *  Returns: 0 on failure
290  *           1 on success
291  */
292 int
293 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
294 {
295
296    if (sql_query(mdb, cmd)) {
297       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
298       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
299       if (verbose) {
300          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
301       }
302       return 0;
303    }
304    mdb->num_rows = sql_affected_rows(mdb);
305    if (mdb->num_rows < 1) {
306       char ed1[30];
307       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
308          edit_uint64(mdb->num_rows, ed1), cmd);
309       if (verbose) {
310 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
311       }
312       return 0;
313    }
314    mdb->changes++;
315    return 1;
316 }
317
318 /* Utility routine for deletes
319  *
320  * Returns: -1 on error
321  *           n number of rows affected
322  */
323 int
324 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
325 {
326
327    if (sql_query(mdb, cmd)) {
328       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
329       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
330       if (verbose) {
331          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
332       }
333       return -1;
334    }
335    mdb->changes++;
336    return sql_affected_rows(mdb);
337 }
338
339
340 /*
341  * Get record max. Query is already in mdb->cmd
342  *  No locking done
343  *
344  * Returns: -1 on failure
345  *          count on success
346  */
347 int get_sql_record_max(JCR *jcr, B_DB *mdb)
348 {
349    SQL_ROW row;
350    int stat = 0;
351
352    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
353       if ((row = sql_fetch_row(mdb)) == NULL) {
354          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
355          stat = -1;
356       } else {
357          stat = str_to_int64(row[0]);
358       }
359       sql_free_result(mdb);
360    } else {
361       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
362       stat = -1;
363    }
364    return stat;
365 }
366
367 /*
368  * Return pre-edited error message
369  */
370 char *db_strerror(B_DB *mdb)
371 {
372    return mdb->errmsg;
373 }
374
375 /*
376  * Lock database, this can be called multiple times by the same
377  *   thread without blocking, but must be unlocked the number of
378  *   times it was locked.
379  */
380 void _db_lock(const char *file, int line, B_DB *mdb)
381 {
382    int errstat;
383    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
384       berrno be;
385       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
386            errstat, be.bstrerror(errstat));
387    }
388 }
389
390 /*
391  * Unlock the database. This can be called multiple times by the
392  *   same thread up to the number of times that thread called
393  *   db_lock()/
394  */
395 void _db_unlock(const char *file, int line, B_DB *mdb)
396 {
397    int errstat;
398    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
399       berrno be;
400       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
401            errstat, be.bstrerror(errstat));
402    }
403 }
404
405 /*
406  * Start a transaction. This groups inserts and makes things
407  *  much more efficient. Usually started when inserting
408  *  file attributes.
409  */
410 void db_start_transaction(JCR *jcr, B_DB *mdb)
411 {
412    if (!jcr->attr) {
413       jcr->attr = get_pool_memory(PM_FNAME);
414    }
415    if (!jcr->ar) {
416       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
417    }
418
419 #ifdef HAVE_SQLITE
420    if (!mdb->allow_transactions) {
421       return;
422    }
423    db_lock(mdb);
424    /* Allow only 10,000 changes per transaction */
425    if (mdb->transaction && mdb->changes > 10000) {
426       db_end_transaction(jcr, mdb);
427    }
428    if (!mdb->transaction) {
429       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
430       Dmsg0(400, "Start SQLite transaction\n");
431       mdb->transaction = 1;
432    }
433    db_unlock(mdb);
434 #endif
435
436 /*
437  * This is turned off because transactions break
438  * if multiple simultaneous jobs are run.
439  */
440 #ifdef HAVE_POSTGRESQL
441    if (!mdb->allow_transactions) {
442       return;
443    }
444    db_lock(mdb);
445    /* Allow only 25,000 changes per transaction */
446    if (mdb->transaction && mdb->changes > 25000) {
447       db_end_transaction(jcr, mdb);
448    }
449    if (!mdb->transaction) {
450       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
451       Dmsg0(400, "Start PosgreSQL transaction\n");
452       mdb->transaction = 1;
453    }
454    db_unlock(mdb);
455 #endif
456
457 #ifdef HAVE_DBI
458    if (db_type == SQL_TYPE_SQLITE) {
459       if (!mdb->allow_transactions) {
460          return;
461       }
462       db_lock(mdb);
463       /* Allow only 10,000 changes per transaction */
464       if (mdb->transaction && mdb->changes > 10000) {
465          db_end_transaction(jcr, mdb);
466       }
467       if (!mdb->transaction) {
468          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
469          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
470          Dmsg0(400, "Start SQLite transaction\n");
471          mdb->transaction = 1;
472       }
473       db_unlock(mdb);
474    } else if (db_type == SQL_TYPE_POSTGRESQL) {
475       if (!mdb->allow_transactions) {
476          return;
477       }
478       db_lock(mdb);
479       /* Allow only 25,000 changes per transaction */
480       if (mdb->transaction && mdb->changes > 25000) {
481          db_end_transaction(jcr, mdb);
482       }
483       if (!mdb->transaction) {
484          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
485          Dmsg0(400, "Start PosgreSQL transaction\n");
486          mdb->transaction = 1;
487       }
488       db_unlock(mdb);
489    }
490 #endif
491 }
492
493 void db_end_transaction(JCR *jcr, B_DB *mdb)
494 {
495    /*
496     * This can be called during thread cleanup and
497     *   the db may already be closed.  So simply return.
498     */
499    if (!mdb) {
500       return;
501    }
502
503    if (jcr && jcr->cached_attribute) {
504       Dmsg0(400, "Flush last cached attribute.\n");
505       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
506          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
507       }
508       jcr->cached_attribute = false;
509    }
510
511 #ifdef HAVE_SQLITE
512    if (!mdb->allow_transactions) {
513       return;
514    }
515    db_lock(mdb);
516    if (mdb->transaction) {
517       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
518       mdb->transaction = 0;
519       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
520    }
521    mdb->changes = 0;
522    db_unlock(mdb);
523 #endif
524
525 #ifdef HAVE_POSTGRESQL
526    if (!mdb->allow_transactions) {
527       return;
528    }
529    db_lock(mdb);
530    if (mdb->transaction) {
531       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
532       mdb->transaction = 0;
533       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
534    }
535    mdb->changes = 0;
536    db_unlock(mdb);
537 #endif
538
539 #ifdef HAVE_DBI
540    if (db_type == SQL_TYPE_SQLITE) {
541       if (!mdb->allow_transactions) {
542          return;
543       }
544       db_lock(mdb);
545       if (mdb->transaction) {
546          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
547          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
548          mdb->transaction = 0;
549          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
550       }
551       mdb->changes = 0;
552       db_unlock(mdb);
553    } else if (db_type == SQL_TYPE_POSTGRESQL) {
554       if (!mdb->allow_transactions) {
555          return;
556       }
557       db_lock(mdb);
558       if (mdb->transaction) {
559          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
560          mdb->transaction = 0;
561          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
562       }
563       mdb->changes = 0;
564       db_unlock(mdb);
565    }
566 #endif
567 }
568
569 /*
570  * Given a full filename, split it into its path
571  *  and filename parts. They are returned in pool memory
572  *  in the mdb structure.
573  */
574 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
575 {
576    const char *p, *f;
577
578    /* Find path without the filename.
579     * I.e. everything after the last / is a "filename".
580     * OK, maybe it is a directory name, but we treat it like
581     * a filename. If we don't find a / then the whole name
582     * must be a path name (e.g. c:).
583     */
584    for (p=f=fname; *p; p++) {
585       if (IsPathSeparator(*p)) {
586          f = p;                       /* set pos of last slash */
587       }
588    }
589    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
590       f++;                            /* yes, point to filename */
591    } else {                           /* no, whole thing must be path name */
592       f = p;
593    }
594
595    /* If filename doesn't exist (i.e. root directory), we
596     * simply create a blank name consisting of a single
597     * space. This makes handling zero length filenames
598     * easier.
599     */
600    mdb->fnl = p - f;
601    if (mdb->fnl > 0) {
602       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
603       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
604       mdb->fname[mdb->fnl] = 0;
605    } else {
606       mdb->fname[0] = 0;
607       mdb->fnl = 0;
608    }
609
610    mdb->pnl = f - fname;
611    if (mdb->pnl > 0) {
612       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
613       memcpy(mdb->path, fname, mdb->pnl);
614       mdb->path[mdb->pnl] = 0;
615    } else {
616       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
617       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
618       mdb->path[0] = 0;
619       mdb->pnl = 0;
620    }
621
622    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
623 }
624
625 /*
626  * Set maximum field length to something reasonable
627  */
628 static int max_length(int max_length)
629 {
630    int max_len = max_length;
631    /* Sanity check */
632    if (max_len < 0) {
633       max_len = 2;
634    } else if (max_len > 100) {
635       max_len = 100;
636    }
637    return max_len;
638 }
639
640 /*
641  * List dashes as part of header for listing SQL results in a table
642  */
643 void
644 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
645 {
646    SQL_FIELD  *field;
647    int i, j;
648    int len;
649
650    sql_field_seek(mdb, 0);
651    send(ctx, "+");
652    for (i = 0; i < sql_num_fields(mdb); i++) {
653       field = sql_fetch_field(mdb);
654       if (!field) {
655          break;
656       }
657       len = max_length(field->max_length + 2);
658       for (j = 0; j < len; j++) {
659          send(ctx, "-");
660       }
661       send(ctx, "+");
662    }
663    send(ctx, "\n");
664 }
665
666 /*
667  * If full_list is set, we list vertically, otherwise, we
668  * list on one line horizontally.
669  */
670 void
671 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
672 {
673    SQL_FIELD *field;
674    SQL_ROW row;
675    int i, col_len, max_len = 0;
676    char buf[2000], ewc[30];
677
678    Dmsg0(800, "list_result starts\n");
679    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
680       send(ctx, _("No results to list.\n"));
681       return;
682    }
683
684    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
685    /* determine column display widths */
686    sql_field_seek(mdb, 0);
687    for (i = 0; i < sql_num_fields(mdb); i++) {
688       Dmsg1(800, "list_result processing field %d\n", i);
689       field = sql_fetch_field(mdb);
690       if (!field) {
691          break;
692       }
693       col_len = cstrlen(field->name);
694       if (type == VERT_LIST) {
695          if (col_len > max_len) {
696             max_len = col_len;
697          }
698       } else {
699          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
700             field->max_length += (field->max_length - 1) / 3;
701          }
702          if (col_len < (int)field->max_length) {
703             col_len = field->max_length;
704          }
705          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
706             col_len = 4;                 /* 4 = length of the word "NULL" */
707          }
708          field->max_length = col_len;    /* reset column info */
709       }
710    }
711
712    Dmsg0(800, "list_result finished first loop\n");
713    if (type == VERT_LIST) {
714       goto vertical_list;
715    }
716
717    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
718    list_dashes(mdb, send, ctx);
719    send(ctx, "|");
720    sql_field_seek(mdb, 0);
721    for (i = 0; i < sql_num_fields(mdb); i++) {
722       Dmsg1(800, "list_result looking at field %d\n", i);
723       field = sql_fetch_field(mdb);
724       if (!field) {
725          break;
726       }
727       max_len = max_length(field->max_length);
728       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
729       send(ctx, buf);
730    }
731    send(ctx, "\n");
732    list_dashes(mdb, send, ctx);
733
734    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
735    while ((row = sql_fetch_row(mdb)) != NULL) {
736       sql_field_seek(mdb, 0);
737       send(ctx, "|");
738       for (i = 0; i < sql_num_fields(mdb); i++) {
739          field = sql_fetch_field(mdb);
740          if (!field) {
741             break;
742          }
743          max_len = max_length(field->max_length);
744          if (row[i] == NULL) {
745             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
746          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
747             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
748                       add_commas(row[i], ewc));
749          } else {
750             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
751          }
752          send(ctx, buf);
753       }
754       send(ctx, "\n");
755    }
756    list_dashes(mdb, send, ctx);
757    return;
758
759 vertical_list:
760
761    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
762    while ((row = sql_fetch_row(mdb)) != NULL) {
763       sql_field_seek(mdb, 0);
764       for (i = 0; i < sql_num_fields(mdb); i++) {
765          field = sql_fetch_field(mdb);
766          if (!field) {
767             break;
768          }
769          if (row[i] == NULL) {
770             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
771          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
772             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
773                 add_commas(row[i], ewc));
774          } else {
775             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
776          }
777          send(ctx, buf);
778       }
779       send(ctx, "\n");
780    }
781    return;
782 }
783
784 /* 
785  * Open a new connexion to mdb catalog. This function is used
786  * by batch and accurate mode.
787  */
788 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
789 {
790 #ifdef HAVE_BATCH_FILE_INSERT
791    const int multi_db = true;   /* we force a new connection only if batch insert is enabled */
792 #else
793    const int multi_db = false;
794 #endif
795
796    if (!jcr->db_batch) {
797       jcr->db_batch = db_init_database(jcr, 
798                                       mdb->db_name, 
799                                       mdb->db_user,
800                                       mdb->db_password, 
801                                       mdb->db_address,
802                                       mdb->db_port,
803                                       mdb->db_socket,
804                                       multi_db /* multi_db = true when using batch mode */);
805       if (!jcr->db_batch) {
806          Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
807          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
808          return false;
809       }
810
811       if (!db_open_database(jcr, jcr->db_batch)) {
812          Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
813               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
814          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
815          return false;
816       }      
817       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
818             jcr->db_batch->connected, jcr->db_batch->db);
819
820    }
821    return true;
822 }
823
824 /*
825  * !!! WARNING !!! Use this function only when bacula is stopped.
826  * ie, after a fatal signal and before exiting the program
827  * Print information about a B_DB object.
828  */
829 void dbg_print_db(JCR *jcr, FILE *fp)
830 {
831    B_DB *mdb = jcr->db;
832
833    if (!mdb) {
834       return;
835    }
836
837    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
838            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
839    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
840    if (mdb->lock.valid == RWLOCK_VALID) { 
841       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
842    }
843 }
844
845 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/