]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
Remove extra _ of _dbg_print_db func
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id$
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else {
81       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
82    }
83 #elif HAVE_MYSQL
84    db_type = SQL_TYPE_MYSQL;
85 #elif HAVE_POSTGRESQL
86    db_type = SQL_TYPE_POSTGRESQL;
87 #elif HAVE_SQLITE
88    db_type = SQL_TYPE_SQLITE;
89 #elif HAVE_SQLITE3
90    db_type = SQL_TYPE_SQLITE3;
91 #endif
92
93    return db_init_database(jcr, db_name, db_user, db_password, db_address,
94              db_port, db_socket, mult_db_connections);
95 }
96
97 dbid_list::dbid_list()
98 {
99    memset(this, 0, sizeof(dbid_list));
100    max_ids = 1000;
101    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
102    num_ids = num_seen = tot_ids = 0;
103    PurgedFiles = NULL;
104 }
105
106 dbid_list::~dbid_list()
107 {
108    free(DBId);
109 }
110
111 /*
112  * Called here to retrieve an integer from the database
113  */
114 int db_int_handler(void *ctx, int num_fields, char **row)
115 {
116    uint32_t *val = (uint32_t *)ctx;
117
118    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
119
120    if (row[0]) {
121       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
122       *val = str_to_int64(row[0]);
123    } else {
124       Dmsg0(800, "int_handler finds zero\n");
125       *val = 0;
126    }
127    Dmsg0(800, "int_handler finishes\n");
128    return 0;
129 }
130
131 /*
132  * Called here to retrieve a 32/64 bit integer from the database.
133  *   The returned integer will be extended to 64 bit.
134  */
135 int db_int64_handler(void *ctx, int num_fields, char **row)
136 {
137    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
138
139    if (row[0]) {
140       lctx->value = str_to_int64(row[0]);
141       lctx->count++;
142    }
143    return 0;
144 }
145
146 /*
147  * Use to build a comma separated list of values from a query. "10,20,30"
148  */
149 int db_list_handler(void *ctx, int num_fields, char **row)
150 {
151    db_list_ctx *lctx = (db_list_ctx *)ctx;
152    if (num_fields == 1 && row[0]) {
153       if (lctx->list[0]) {
154          pm_strcat(lctx->list, ",");
155       }
156       pm_strcat(lctx->list, row[0]);
157       lctx->count++;
158    }
159    return 0;
160 }
161
162
163 /*
164  * Called here to retrieve an integer from the database
165  */
166 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
167 {
168    uint32_t *val = (uint32_t *)ctx;
169    uint32_t index = sql_get_max_connections_index[db_type];
170    if (row[index]) {
171       *val = str_to_int64(row[index]);
172    } else {
173       Dmsg0(800, "int_handler finds zero\n");
174       *val = 0;
175    }
176    return 0;
177 }
178
179 /* 
180  * Check catalog max_connections setting
181  */
182 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
183 {
184    uint32_t max_conn=0;
185    int ret=true;
186
187    /* Without Batch insert, no need to verify max_connections */
188 #ifndef HAVE_BATCH_FILE_INSERT
189    return ret;
190 #endif
191
192    /* Check max_connections setting */
193    if (!db_sql_query(mdb, sql_get_max_connections[db_type], db_max_connections_handler, &max_conn)) {
194       Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
195       return ret;
196    }
197    if (max_conn && max_concurrent_jobs && max_concurrent_jobs > max_conn) {
198       Mmsg(mdb->errmsg, 
199            _("On db_name=%s, %s max_connections=%d is lower than Director MaxConcurentJobs=%d\n"),
200            mdb->db_name, db_get_type(), max_conn, max_concurrent_jobs);
201       Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
202       ret = false;
203    }
204
205    return ret;
206 }
207
208 /* NOTE!!! The following routines expect that the
209  *  calling subroutine sets and clears the mutex
210  */
211
212 /* Check that the tables correspond to the version we want */
213 bool check_tables_version(JCR *jcr, B_DB *mdb)
214 {
215    const char *query = "SELECT VersionId FROM Version";
216
217    bacula_db_version = 0;
218    if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
219       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
220       return false;
221    }
222    if (bacula_db_version != BDB_VERSION) {
223       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
224           mdb->db_name, BDB_VERSION, bacula_db_version);
225       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
226       return false;
227    }
228    return true;
229 }
230
231 /* Utility routine for queries. The database MUST be locked before calling here. */
232 int
233 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
234 {
235    int status;
236
237    sql_free_result(mdb);
238    if ((status=sql_query(mdb, cmd)) != 0) {
239       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
240       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
241       if (verbose) {
242          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
243       }
244       return 0;
245    }
246
247    mdb->result = sql_store_result(mdb);
248
249    return mdb->result != NULL;
250 }
251
252 /*
253  * Utility routine to do inserts
254  * Returns: 0 on failure
255  *          1 on success
256  */
257 int
258 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
259 {
260    if (sql_query(mdb, cmd)) {
261       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
262       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
263       if (verbose) {
264          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
265       }
266       return 0;
267    }
268    if (mdb->have_insert_id) {
269       mdb->num_rows = sql_affected_rows(mdb);
270    } else {
271       mdb->num_rows = 1;
272    }
273    if (mdb->num_rows != 1) {
274       char ed1[30];
275       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
276          edit_uint64(mdb->num_rows, ed1));
277       if (verbose) {
278          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
279       }
280       return 0;
281    }
282    mdb->changes++;
283    return 1;
284 }
285
286 /* Utility routine for updates.
287  *  Returns: 0 on failure
288  *           1 on success
289  */
290 int
291 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
292 {
293
294    if (sql_query(mdb, cmd)) {
295       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
296       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
297       if (verbose) {
298          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
299       }
300       return 0;
301    }
302    mdb->num_rows = sql_affected_rows(mdb);
303    if (mdb->num_rows < 1) {
304       char ed1[30];
305       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
306          edit_uint64(mdb->num_rows, ed1), cmd);
307       if (verbose) {
308 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
309       }
310       return 0;
311    }
312    mdb->changes++;
313    return 1;
314 }
315
316 /* Utility routine for deletes
317  *
318  * Returns: -1 on error
319  *           n number of rows affected
320  */
321 int
322 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
323 {
324
325    if (sql_query(mdb, cmd)) {
326       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
327       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
328       if (verbose) {
329          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
330       }
331       return -1;
332    }
333    mdb->changes++;
334    return sql_affected_rows(mdb);
335 }
336
337
338 /*
339  * Get record max. Query is already in mdb->cmd
340  *  No locking done
341  *
342  * Returns: -1 on failure
343  *          count on success
344  */
345 int get_sql_record_max(JCR *jcr, B_DB *mdb)
346 {
347    SQL_ROW row;
348    int stat = 0;
349
350    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
351       if ((row = sql_fetch_row(mdb)) == NULL) {
352          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
353          stat = -1;
354       } else {
355          stat = str_to_int64(row[0]);
356       }
357       sql_free_result(mdb);
358    } else {
359       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
360       stat = -1;
361    }
362    return stat;
363 }
364
365 /*
366  * Return pre-edited error message
367  */
368 char *db_strerror(B_DB *mdb)
369 {
370    return mdb->errmsg;
371 }
372
373 /*
374  * Lock database, this can be called multiple times by the same
375  *   thread without blocking, but must be unlocked the number of
376  *   times it was locked.
377  */
378 void _db_lock(const char *file, int line, B_DB *mdb)
379 {
380    int errstat;
381    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
382       berrno be;
383       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
384            errstat, be.bstrerror(errstat));
385    }
386 }
387
388 /*
389  * Unlock the database. This can be called multiple times by the
390  *   same thread up to the number of times that thread called
391  *   db_lock()/
392  */
393 void _db_unlock(const char *file, int line, B_DB *mdb)
394 {
395    int errstat;
396    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
397       berrno be;
398       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
399            errstat, be.bstrerror(errstat));
400    }
401 }
402
403 /*
404  * Start a transaction. This groups inserts and makes things
405  *  much more efficient. Usually started when inserting
406  *  file attributes.
407  */
408 void db_start_transaction(JCR *jcr, B_DB *mdb)
409 {
410    if (!jcr->attr) {
411       jcr->attr = get_pool_memory(PM_FNAME);
412    }
413    if (!jcr->ar) {
414       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
415    }
416
417 #ifdef HAVE_SQLITE
418    if (!mdb->allow_transactions) {
419       return;
420    }
421    db_lock(mdb);
422    /* Allow only 10,000 changes per transaction */
423    if (mdb->transaction && mdb->changes > 10000) {
424       db_end_transaction(jcr, mdb);
425    }
426    if (!mdb->transaction) {
427       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
428       Dmsg0(400, "Start SQLite transaction\n");
429       mdb->transaction = 1;
430    }
431    db_unlock(mdb);
432 #endif
433
434 /*
435  * This is turned off because transactions break
436  * if multiple simultaneous jobs are run.
437  */
438 #ifdef HAVE_POSTGRESQL
439    if (!mdb->allow_transactions) {
440       return;
441    }
442    db_lock(mdb);
443    /* Allow only 25,000 changes per transaction */
444    if (mdb->transaction && mdb->changes > 25000) {
445       db_end_transaction(jcr, mdb);
446    }
447    if (!mdb->transaction) {
448       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
449       Dmsg0(400, "Start PosgreSQL transaction\n");
450       mdb->transaction = 1;
451    }
452    db_unlock(mdb);
453 #endif
454
455 #ifdef HAVE_DBI
456    if (db_type == SQL_TYPE_SQLITE) {
457       if (!mdb->allow_transactions) {
458          return;
459       }
460       db_lock(mdb);
461       /* Allow only 10,000 changes per transaction */
462       if (mdb->transaction && mdb->changes > 10000) {
463          db_end_transaction(jcr, mdb);
464       }
465       if (!mdb->transaction) {
466          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
467          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
468          Dmsg0(400, "Start SQLite transaction\n");
469          mdb->transaction = 1;
470       }
471       db_unlock(mdb);
472    } else if (db_type == SQL_TYPE_POSTGRESQL) {
473       if (!mdb->allow_transactions) {
474          return;
475       }
476       db_lock(mdb);
477       /* Allow only 25,000 changes per transaction */
478       if (mdb->transaction && mdb->changes > 25000) {
479          db_end_transaction(jcr, mdb);
480       }
481       if (!mdb->transaction) {
482          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
483          Dmsg0(400, "Start PosgreSQL transaction\n");
484          mdb->transaction = 1;
485       }
486       db_unlock(mdb);
487    }
488 #endif
489 }
490
491 void db_end_transaction(JCR *jcr, B_DB *mdb)
492 {
493    /*
494     * This can be called during thread cleanup and
495     *   the db may already be closed.  So simply return.
496     */
497    if (!mdb) {
498       return;
499    }
500
501    if (jcr && jcr->cached_attribute) {
502       Dmsg0(400, "Flush last cached attribute.\n");
503       if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
504          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
505       }
506       jcr->cached_attribute = false;
507    }
508
509 #ifdef HAVE_SQLITE
510    if (!mdb->allow_transactions) {
511       return;
512    }
513    db_lock(mdb);
514    if (mdb->transaction) {
515       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
516       mdb->transaction = 0;
517       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
518    }
519    mdb->changes = 0;
520    db_unlock(mdb);
521 #endif
522
523 #ifdef HAVE_POSTGRESQL
524    if (!mdb->allow_transactions) {
525       return;
526    }
527    db_lock(mdb);
528    if (mdb->transaction) {
529       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
530       mdb->transaction = 0;
531       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
532    }
533    mdb->changes = 0;
534    db_unlock(mdb);
535 #endif
536
537 #ifdef HAVE_DBI
538    if (db_type == SQL_TYPE_SQLITE) {
539       if (!mdb->allow_transactions) {
540          return;
541       }
542       db_lock(mdb);
543       if (mdb->transaction) {
544          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
545          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
546          mdb->transaction = 0;
547          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
548       }
549       mdb->changes = 0;
550       db_unlock(mdb);
551    } else if (db_type == SQL_TYPE_POSTGRESQL) {
552       if (!mdb->allow_transactions) {
553          return;
554       }
555       db_lock(mdb);
556       if (mdb->transaction) {
557          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
558          mdb->transaction = 0;
559          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
560       }
561       mdb->changes = 0;
562       db_unlock(mdb);
563    }
564 #endif
565 }
566
567 /*
568  * Given a full filename, split it into its path
569  *  and filename parts. They are returned in pool memory
570  *  in the mdb structure.
571  */
572 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
573 {
574    const char *p, *f;
575
576    /* Find path without the filename.
577     * I.e. everything after the last / is a "filename".
578     * OK, maybe it is a directory name, but we treat it like
579     * a filename. If we don't find a / then the whole name
580     * must be a path name (e.g. c:).
581     */
582    for (p=f=fname; *p; p++) {
583       if (IsPathSeparator(*p)) {
584          f = p;                       /* set pos of last slash */
585       }
586    }
587    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
588       f++;                            /* yes, point to filename */
589    } else {                           /* no, whole thing must be path name */
590       f = p;
591    }
592
593    /* If filename doesn't exist (i.e. root directory), we
594     * simply create a blank name consisting of a single
595     * space. This makes handling zero length filenames
596     * easier.
597     */
598    mdb->fnl = p - f;
599    if (mdb->fnl > 0) {
600       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
601       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
602       mdb->fname[mdb->fnl] = 0;
603    } else {
604       mdb->fname[0] = 0;
605       mdb->fnl = 0;
606    }
607
608    mdb->pnl = f - fname;
609    if (mdb->pnl > 0) {
610       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
611       memcpy(mdb->path, fname, mdb->pnl);
612       mdb->path[mdb->pnl] = 0;
613    } else {
614       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
615       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
616       mdb->path[0] = 0;
617       mdb->pnl = 0;
618    }
619
620    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
621 }
622
623 /*
624  * Set maximum field length to something reasonable
625  */
626 static int max_length(int max_length)
627 {
628    int max_len = max_length;
629    /* Sanity check */
630    if (max_len < 0) {
631       max_len = 2;
632    } else if (max_len > 100) {
633       max_len = 100;
634    }
635    return max_len;
636 }
637
638 /*
639  * List dashes as part of header for listing SQL results in a table
640  */
641 void
642 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
643 {
644    SQL_FIELD  *field;
645    int i, j;
646    int len;
647
648    sql_field_seek(mdb, 0);
649    send(ctx, "+");
650    for (i = 0; i < sql_num_fields(mdb); i++) {
651       field = sql_fetch_field(mdb);
652       if (!field) {
653          break;
654       }
655       len = max_length(field->max_length + 2);
656       for (j = 0; j < len; j++) {
657          send(ctx, "-");
658       }
659       send(ctx, "+");
660    }
661    send(ctx, "\n");
662 }
663
664 /*
665  * If full_list is set, we list vertically, otherwise, we
666  * list on one line horizontally.
667  */
668 void
669 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
670 {
671    SQL_FIELD *field;
672    SQL_ROW row;
673    int i, col_len, max_len = 0;
674    char buf[2000], ewc[30];
675
676    Dmsg0(800, "list_result starts\n");
677    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
678       send(ctx, _("No results to list.\n"));
679       return;
680    }
681
682    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
683    /* determine column display widths */
684    sql_field_seek(mdb, 0);
685    for (i = 0; i < sql_num_fields(mdb); i++) {
686       Dmsg1(800, "list_result processing field %d\n", i);
687       field = sql_fetch_field(mdb);
688       if (!field) {
689          break;
690       }
691       col_len = cstrlen(field->name);
692       if (type == VERT_LIST) {
693          if (col_len > max_len) {
694             max_len = col_len;
695          }
696       } else {
697          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
698             field->max_length += (field->max_length - 1) / 3;
699          }
700          if (col_len < (int)field->max_length) {
701             col_len = field->max_length;
702          }
703          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
704             col_len = 4;                 /* 4 = length of the word "NULL" */
705          }
706          field->max_length = col_len;    /* reset column info */
707       }
708    }
709
710    Dmsg0(800, "list_result finished first loop\n");
711    if (type == VERT_LIST) {
712       goto vertical_list;
713    }
714
715    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
716    list_dashes(mdb, send, ctx);
717    send(ctx, "|");
718    sql_field_seek(mdb, 0);
719    for (i = 0; i < sql_num_fields(mdb); i++) {
720       Dmsg1(800, "list_result looking at field %d\n", i);
721       field = sql_fetch_field(mdb);
722       if (!field) {
723          break;
724       }
725       max_len = max_length(field->max_length);
726       bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
727       send(ctx, buf);
728    }
729    send(ctx, "\n");
730    list_dashes(mdb, send, ctx);
731
732    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
733    while ((row = sql_fetch_row(mdb)) != NULL) {
734       sql_field_seek(mdb, 0);
735       send(ctx, "|");
736       for (i = 0; i < sql_num_fields(mdb); i++) {
737          field = sql_fetch_field(mdb);
738          if (!field) {
739             break;
740          }
741          max_len = max_length(field->max_length);
742          if (row[i] == NULL) {
743             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
744          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
745             bsnprintf(buf, sizeof(buf), " %*s |", max_len,
746                       add_commas(row[i], ewc));
747          } else {
748             bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
749          }
750          send(ctx, buf);
751       }
752       send(ctx, "\n");
753    }
754    list_dashes(mdb, send, ctx);
755    return;
756
757 vertical_list:
758
759    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
760    while ((row = sql_fetch_row(mdb)) != NULL) {
761       sql_field_seek(mdb, 0);
762       for (i = 0; i < sql_num_fields(mdb); i++) {
763          field = sql_fetch_field(mdb);
764          if (!field) {
765             break;
766          }
767          if (row[i] == NULL) {
768             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
769          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
770             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
771                 add_commas(row[i], ewc));
772          } else {
773             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
774          }
775          send(ctx, buf);
776       }
777       send(ctx, "\n");
778    }
779    return;
780 }
781
782 /* 
783  * Open a new connexion to mdb catalog. This function is used
784  * by batch and accurate mode.
785  */
786 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
787 {
788 #ifdef HAVE_BATCH_FILE_INSERT
789    const int multi_db = true;   /* we force a new connection only if batch insert is enabled */
790 #else
791    const int multi_db = false;
792 #endif
793
794    if (!jcr->db_batch) {
795       jcr->db_batch = db_init_database(jcr, 
796                                       mdb->db_name, 
797                                       mdb->db_user,
798                                       mdb->db_password, 
799                                       mdb->db_address,
800                                       mdb->db_port,
801                                       mdb->db_socket,
802                                       multi_db /* multi_db = true when using batch mode */);
803       if (!jcr->db_batch) {
804          Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
805          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
806          return false;
807       }
808
809       if (!db_open_database(jcr, jcr->db_batch)) {
810          Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
811               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
812          Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
813          return false;
814       }      
815       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
816             jcr->db_batch->connected, jcr->db_batch->db);
817
818    }
819    return true;
820 }
821
822 /*
823  * !!! WARNING !!! Use this function only when bacula is stopped.
824  * ie, after a fatal signal and before exiting the program
825  * Print information about a B_DB object.
826  */
827 void dbg_print_db(JCR *jcr, FILE *fp)
828 {
829    B_DB *mdb = jcr->db;
830
831    if (!mdb) {
832       return;
833    }
834
835    fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
836            mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
837    fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
838    if (mdb->lock.valid == RWLOCK_VALID) { 
839       fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
840    }
841 }
842
843 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/