]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
ebl Fix mysql compilation problem with debug lock code. Add
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id$
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;        /* SQL engine type index */
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60               const char *db_password, const char *db_address, int db_port,
61               const char *db_socket, int mult_db_connections)
62 {
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else if (strcasecmp(p, "sqlite3") == 0) {
79       db_type = SQL_TYPE_SQLITE3;
80    } else {
81       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
82    }
83 #elif HAVE_MYSQL
84    db_type = SQL_TYPE_MYSQL;
85 #elif HAVE_POSTGRESQL
86    db_type = SQL_TYPE_POSTGRESQL;
87 #elif HAVE_SQLITE
88    db_type = SQL_TYPE_SQLITE;
89 #elif HAVE_SQLITE3
90    db_type = SQL_TYPE_SQLITE3;
91 #endif
92
93    return db_init_database(jcr, db_name, db_user, db_password, db_address,
94              db_port, db_socket, mult_db_connections);
95 }
96
97 dbid_list::dbid_list()
98 {
99    memset(this, 0, sizeof(dbid_list));
100    max_ids = 1000;
101    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
102    num_ids = num_seen = tot_ids = 0;
103    PurgedFiles = NULL;
104 }
105
106 dbid_list::~dbid_list()
107 {
108    free(DBId);
109 }
110
111
112 /*
113  * Called here to retrieve an integer from the database
114  */
115 static int int_handler(void *ctx, int num_fields, char **row)
116 {
117    uint32_t *val = (uint32_t *)ctx;
118
119    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
120
121    if (row[0]) {
122       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
123       *val = str_to_int64(row[0]);
124    } else {
125       Dmsg0(800, "int_handler finds zero\n");
126       *val = 0;
127    }
128    Dmsg0(800, "int_handler finishes\n");
129    return 0;
130 }
131
132 /*
133  * Called here to retrieve a 32/64 bit integer from the database.
134  *   The returned integer will be extended to 64 bit.
135  */
136 int db_int64_handler(void *ctx, int num_fields, char **row)
137 {
138    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
139
140    if (row[0]) {
141       lctx->value = str_to_int64(row[0]);
142       lctx->count++;
143    }
144    return 0;
145 }
146
147
148
149 /* NOTE!!! The following routines expect that the
150  *  calling subroutine sets and clears the mutex
151  */
152
153 /* Check that the tables correspond to the version we want */
154 bool check_tables_version(JCR *jcr, B_DB *mdb)
155 {
156    const char *query = "SELECT VersionId FROM Version";
157
158    bacula_db_version = 0;
159    if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
160       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
161       return false;
162    }
163    if (bacula_db_version != BDB_VERSION) {
164       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
165           mdb->db_name, BDB_VERSION, bacula_db_version);
166       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
167       return false;
168    }
169    return true;
170 }
171
172 /* Utility routine for queries. The database MUST be locked before calling here. */
173 int
174 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
175 {
176    int status;
177
178    sql_free_result(mdb);
179    if ((status=sql_query(mdb, cmd)) != 0) {
180       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
181       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
182       if (verbose) {
183          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
184       }
185       return 0;
186    }
187
188    mdb->result = sql_store_result(mdb);
189
190    return mdb->result != NULL;
191 }
192
193 /*
194  * Utility routine to do inserts
195  * Returns: 0 on failure
196  *          1 on success
197  */
198 int
199 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
200 {
201    if (sql_query(mdb, cmd)) {
202       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
203       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
204       if (verbose) {
205          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
206       }
207       return 0;
208    }
209    if (mdb->have_insert_id) {
210       mdb->num_rows = sql_affected_rows(mdb);
211    } else {
212       mdb->num_rows = 1;
213    }
214    if (mdb->num_rows != 1) {
215       char ed1[30];
216       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
217          edit_uint64(mdb->num_rows, ed1));
218       if (verbose) {
219          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
220       }
221       return 0;
222    }
223    mdb->changes++;
224    return 1;
225 }
226
227 /* Utility routine for updates.
228  *  Returns: 0 on failure
229  *           1 on success
230  */
231 int
232 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
233 {
234
235    if (sql_query(mdb, cmd)) {
236       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
237       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
238       if (verbose) {
239          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
240       }
241       return 0;
242    }
243    mdb->num_rows = sql_affected_rows(mdb);
244    if (mdb->num_rows < 1) {
245       char ed1[30];
246       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
247          edit_uint64(mdb->num_rows, ed1), cmd);
248       if (verbose) {
249 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
250       }
251       return 0;
252    }
253    mdb->changes++;
254    return 1;
255 }
256
257 /* Utility routine for deletes
258  *
259  * Returns: -1 on error
260  *           n number of rows affected
261  */
262 int
263 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
264 {
265
266    if (sql_query(mdb, cmd)) {
267       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
268       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
269       if (verbose) {
270          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
271       }
272       return -1;
273    }
274    mdb->changes++;
275    return sql_affected_rows(mdb);
276 }
277
278
279 /*
280  * Get record max. Query is already in mdb->cmd
281  *  No locking done
282  *
283  * Returns: -1 on failure
284  *          count on success
285  */
286 int get_sql_record_max(JCR *jcr, B_DB *mdb)
287 {
288    SQL_ROW row;
289    int stat = 0;
290
291    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
292       if ((row = sql_fetch_row(mdb)) == NULL) {
293          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
294          stat = -1;
295       } else {
296          stat = str_to_int64(row[0]);
297       }
298       sql_free_result(mdb);
299    } else {
300       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
301       stat = -1;
302    }
303    return stat;
304 }
305
306 /*
307  * Return pre-edited error message
308  */
309 char *db_strerror(B_DB *mdb)
310 {
311    return mdb->errmsg;
312 }
313
314 static void update_lock_dbg(B_DB *mdb) 
315 {
316    if (mdb->allow_transactions) { /* batch connection */
317       return;
318    }
319    if (_db_lock_recurse_count && !pthread_equal(_db_lock_threadid, pthread_self())) {
320       Dmsg2(1, "ERROR: not the same threadif %p != %p\n", _db_lock_threadid, pthread_self());
321    }
322    _db_lock_recurse_count++;
323    _db_lock_time = (utime_t) time(NULL);
324    _db_lock_threadid = pthread_self();
325 }
326
327 static void update_unlock_dbg(B_DB *mdb) 
328 {
329    if (mdb->allow_transactions) { /* batch connection */
330       return;
331    }
332    if (!pthread_equal(_db_lock_threadid, pthread_self())) {
333       Dmsg2(1, "ERROR: not the same threadid %p != %p", _db_lock_threadid, pthread_self());
334    }
335    _db_lock_recurse_count--;
336    if (!_db_lock_recurse_count) {
337       memset(&_db_lock_threadid, 0, sizeof(_db_lock_threadid));
338    }
339 }
340
341 /*
342  * Lock database, this can be called multiple times by the same
343  *   thread without blocking, but must be unlocked the number of
344  *   times it was locked.
345  */
346 void _db_lock(const char *file, int line, B_DB *mdb)
347 {
348    int errstat;
349    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
350       berrno be;
351       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
352            errstat, be.bstrerror(errstat));
353    }
354    update_lock_dbg(mdb);
355 }
356
357 /*
358  * Unlock the database. This can be called multiple times by the
359  *   same thread up to the number of times that thread called
360  *   db_lock()/
361  */
362 void _db_unlock(const char *file, int line, B_DB *mdb)
363 {
364    int errstat;
365    update_unlock_dbg(mdb);
366    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
367       berrno be;
368       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
369            errstat, be.bstrerror(errstat));
370    }
371 }
372
373 /*
374  * Start a transaction. This groups inserts and makes things
375  *  much more efficient. Usually started when inserting
376  *  file attributes.
377  */
378 void db_start_transaction(JCR *jcr, B_DB *mdb)
379 {
380    if (!jcr->attr) {
381       jcr->attr = get_pool_memory(PM_FNAME);
382    }
383    if (!jcr->ar) {
384       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
385    }
386
387 #ifdef HAVE_SQLITE
388    if (!mdb->allow_transactions) {
389       return;
390    }
391    db_lock(mdb);
392    /* Allow only 10,000 changes per transaction */
393    if (mdb->transaction && mdb->changes > 10000) {
394       db_end_transaction(jcr, mdb);
395    }
396    if (!mdb->transaction) {
397       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
398       Dmsg0(400, "Start SQLite transaction\n");
399       mdb->transaction = 1;
400    }
401    db_unlock(mdb);
402 #endif
403
404 /*
405  * This is turned off because transactions break
406  * if multiple simultaneous jobs are run.
407  */
408 #ifdef HAVE_POSTGRESQL
409    if (!mdb->allow_transactions) {
410       return;
411    }
412    db_lock(mdb);
413    /* Allow only 25,000 changes per transaction */
414    if (mdb->transaction && mdb->changes > 25000) {
415       db_end_transaction(jcr, mdb);
416    }
417    if (!mdb->transaction) {
418       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
419       Dmsg0(400, "Start PosgreSQL transaction\n");
420       mdb->transaction = 1;
421    }
422    db_unlock(mdb);
423 #endif
424
425 #ifdef HAVE_DBI
426    if (db_type == SQL_TYPE_SQLITE) {
427       if (!mdb->allow_transactions) {
428          return;
429       }
430       db_lock(mdb);
431       /* Allow only 10,000 changes per transaction */
432       if (mdb->transaction && mdb->changes > 10000) {
433          db_end_transaction(jcr, mdb);
434       }
435       if (!mdb->transaction) {
436          //my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
437          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
438          Dmsg0(400, "Start SQLite transaction\n");
439          mdb->transaction = 1;
440       }
441       db_unlock(mdb);
442    } else if (db_type == SQL_TYPE_POSTGRESQL) {
443       if (!mdb->allow_transactions) {
444          return;
445       }
446       db_lock(mdb);
447       /* Allow only 25,000 changes per transaction */
448       if (mdb->transaction && mdb->changes > 25000) {
449          db_end_transaction(jcr, mdb);
450       }
451       if (!mdb->transaction) {
452          db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
453          Dmsg0(400, "Start PosgreSQL transaction\n");
454          mdb->transaction = 1;
455       }
456       db_unlock(mdb);
457    }
458 #endif
459 }
460
461 void db_end_transaction(JCR *jcr, B_DB *mdb)
462 {
463    /*
464     * This can be called during thread cleanup and
465     *   the db may already be closed.  So simply return.
466     */
467    if (!mdb) {
468       return;
469    }
470
471    if (jcr && jcr->cached_attribute) {
472       Dmsg0(400, "Flush last cached attribute.\n");
473       if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
474          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
475       }
476       jcr->cached_attribute = false;
477    }
478
479 #ifdef HAVE_SQLITE
480    if (!mdb->allow_transactions) {
481       return;
482    }
483    db_lock(mdb);
484    if (mdb->transaction) {
485       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
486       mdb->transaction = 0;
487       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
488    }
489    mdb->changes = 0;
490    db_unlock(mdb);
491 #endif
492
493 #ifdef HAVE_POSTGRESQL
494    if (!mdb->allow_transactions) {
495       return;
496    }
497    db_lock(mdb);
498    if (mdb->transaction) {
499       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
500       mdb->transaction = 0;
501       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
502    }
503    mdb->changes = 0;
504    db_unlock(mdb);
505 #endif
506
507 #ifdef HAVE_DBI
508    if (db_type == SQL_TYPE_SQLITE) {
509       if (!mdb->allow_transactions) {
510          return;
511       }
512       db_lock(mdb);
513       if (mdb->transaction) {
514          //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
515          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
516          mdb->transaction = 0;
517          Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
518       }
519       mdb->changes = 0;
520       db_unlock(mdb);
521    } else if (db_type == SQL_TYPE_POSTGRESQL) {
522       if (!mdb->allow_transactions) {
523          return;
524       }
525       db_lock(mdb);
526       if (mdb->transaction) {
527          db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
528          mdb->transaction = 0;
529          Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
530       }
531       mdb->changes = 0;
532       db_unlock(mdb);
533    }
534 #endif
535 }
536
537 /*
538  * Given a full filename, split it into its path
539  *  and filename parts. They are returned in pool memory
540  *  in the mdb structure.
541  */
542 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
543 {
544    const char *p, *f;
545
546    /* Find path without the filename.
547     * I.e. everything after the last / is a "filename".
548     * OK, maybe it is a directory name, but we treat it like
549     * a filename. If we don't find a / then the whole name
550     * must be a path name (e.g. c:).
551     */
552    for (p=f=fname; *p; p++) {
553       if (IsPathSeparator(*p)) {
554          f = p;                       /* set pos of last slash */
555       }
556    }
557    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
558       f++;                            /* yes, point to filename */
559    } else {                           /* no, whole thing must be path name */
560       f = p;
561    }
562
563    /* If filename doesn't exist (i.e. root directory), we
564     * simply create a blank name consisting of a single
565     * space. This makes handling zero length filenames
566     * easier.
567     */
568    mdb->fnl = p - f;
569    if (mdb->fnl > 0) {
570       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
571       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
572       mdb->fname[mdb->fnl] = 0;
573    } else {
574       mdb->fname[0] = 0;
575       mdb->fnl = 0;
576    }
577
578    mdb->pnl = f - fname;
579    if (mdb->pnl > 0) {
580       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
581       memcpy(mdb->path, fname, mdb->pnl);
582       mdb->path[mdb->pnl] = 0;
583    } else {
584       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
585       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
586       mdb->path[0] = 0;
587       mdb->pnl = 0;
588    }
589
590    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
591 }
592
593 /*
594  * List dashes as part of header for listing SQL results in a table
595  */
596 void
597 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
598 {
599    SQL_FIELD  *field;
600    int i, j;
601
602    sql_field_seek(mdb, 0);
603    send(ctx, "+");
604    for (i = 0; i < sql_num_fields(mdb); i++) {
605       field = sql_fetch_field(mdb);
606       if (!field) {
607          break;
608       }
609       for (j = 0; j < (int)field->max_length + 2; j++) {
610          send(ctx, "-");
611       }
612       send(ctx, "+");
613    }
614    send(ctx, "\n");
615 }
616
617 /*
618  * If full_list is set, we list vertically, otherwise, we
619  * list on one line horizontally.
620  */
621 void
622 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
623 {
624    SQL_FIELD *field;
625    SQL_ROW row;
626    int i, col_len, max_len = 0;
627    char buf[2000], ewc[30];
628
629    Dmsg0(800, "list_result starts\n");
630    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
631       send(ctx, _("No results to list.\n"));
632       return;
633    }
634
635    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
636    /* determine column display widths */
637    sql_field_seek(mdb, 0);
638    for (i = 0; i < sql_num_fields(mdb); i++) {
639       Dmsg1(800, "list_result processing field %d\n", i);
640       field = sql_fetch_field(mdb);
641       if (!field) {
642          break;
643       }
644       col_len = cstrlen(field->name);
645       if (type == VERT_LIST) {
646          if (col_len > max_len) {
647             max_len = col_len;
648          }
649       } else {
650          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
651             field->max_length += (field->max_length - 1) / 3;
652          }
653          if (col_len < (int)field->max_length) {
654             col_len = field->max_length;
655          }
656          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
657             col_len = 4;                 /* 4 = length of the word "NULL" */
658          }
659          field->max_length = col_len;    /* reset column info */
660       }
661    }
662
663    Dmsg0(800, "list_result finished first loop\n");
664    if (type == VERT_LIST) {
665       goto vertical_list;
666    }
667
668    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
669    list_dashes(mdb, send, ctx);
670    send(ctx, "|");
671    sql_field_seek(mdb, 0);
672    for (i = 0; i < sql_num_fields(mdb); i++) {
673       Dmsg1(800, "list_result looking at field %d\n", i);
674       field = sql_fetch_field(mdb);
675       if (!field) {
676          break;
677       }
678       bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
679       send(ctx, buf);
680    }
681    send(ctx, "\n");
682    list_dashes(mdb, send, ctx);
683
684    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
685    while ((row = sql_fetch_row(mdb)) != NULL) {
686       sql_field_seek(mdb, 0);
687       send(ctx, "|");
688       for (i = 0; i < sql_num_fields(mdb); i++) {
689          field = sql_fetch_field(mdb);
690          if (!field) {
691             break;
692          }
693          if (row[i] == NULL) {
694             bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
695          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
696             bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
697                       add_commas(row[i], ewc));
698          } else {
699             bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
700          }
701          send(ctx, buf);
702       }
703       send(ctx, "\n");
704    }
705    list_dashes(mdb, send, ctx);
706    return;
707
708 vertical_list:
709
710    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
711    while ((row = sql_fetch_row(mdb)) != NULL) {
712       sql_field_seek(mdb, 0);
713       for (i = 0; i < sql_num_fields(mdb); i++) {
714          field = sql_fetch_field(mdb);
715          if (!field) {
716             break;
717          }
718          if (row[i] == NULL) {
719             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
720          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
721             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
722                 add_commas(row[i], ewc));
723          } else {
724             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
725          }
726          send(ctx, buf);
727       }
728       send(ctx, "\n");
729    }
730    return;
731 }
732
733 /* 
734  * Open a new connexion to mdb catalog. This function is used
735  * by batch and accurate mode.
736  */
737 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
738 {
739    int multi_db=false;
740
741 #ifdef HAVE_BATCH_FILE_INSERT
742    multi_db=true;               /* we force a new connexion only if batch insert is enabled */
743 #endif
744
745    if (!jcr->db_batch) {
746       jcr->db_batch = db_init_database(jcr, 
747                                       mdb->db_name, 
748                                       mdb->db_user,
749                                       mdb->db_password, 
750                                       mdb->db_address,
751                                       mdb->db_port,
752                                       mdb->db_socket,
753                                       multi_db /* multi_db = true when using batch mode */);
754       if (!jcr->db_batch) {
755          Jmsg0(jcr, M_FATAL, 0, "Could not init batch connexion");
756          return false;
757       }
758
759       if (!db_open_database(jcr, jcr->db_batch)) {
760          Mmsg2(&jcr->db_batch->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
761               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
762          Jmsg1(jcr, M_FATAL, 0, "%s", jcr->db_batch->errmsg);
763          return false;
764       }      
765       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
766             jcr->db_batch->connected, jcr->db_batch->db);
767
768    }
769    return true;
770 }
771
772 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/