]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/sql.c
Update version date
[bacula/bacula] / bacula / src / cats / sql.c
index 10bfaf3085f253171cff7b51dd9cdfd74b72846b..2c4fa431eea984f7731af79abf59191dac2725d6 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
 
-   Bacula® is a registered trademark of John Walker.
+   Bacula® is a registered trademark of Kern Sibbald.
    The licensor of Bacula is the Free Software Foundation Europe
    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
    Switzerland, email:ftf@fsfeurope.org.
  *
  *     Almost generic set of SQL database interface routines
  *      (with a little more work)
+ *     SQL engine specific routines are in mysql.c, postgresql.c,
+ *       sqlite.c, ...
  *
  *    Kern Sibbald, March 2000
  *
- *    Version $Id$
+ *    Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
  */
 
-/* The following is necessary so that we do not include
- * the dummy external definition of B_DB.
- */
-#define __SQL_C                       /* indicate that this is sql.c */
-
 #include "bacula.h"
-#include "cats.h"
 
-#if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL
+#if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
 
-uint32_t bacula_db_version = 0;
+#include "cats.h"
+#include "bdb_priv.h"
+#include "sql_glue.h"
 
 /* Forward referenced subroutines */
 void print_dashes(B_DB *mdb);
 void print_result(B_DB *mdb);
 
-dbid_list::dbid_list() 
+dbid_list::dbid_list()
 {
    memset(this, 0, sizeof(dbid_list));
    max_ids = 1000;
@@ -61,16 +59,15 @@ dbid_list::dbid_list()
    PurgedFiles = NULL;
 }
 
-dbid_list::~dbid_list() 
-{ 
+dbid_list::~dbid_list()
+{
    free(DBId);
 }
 
-
 /*
  * Called here to retrieve an integer from the database
  */
-static int int_handler(void *ctx, int num_fields, char **row)
+int db_int_handler(void *ctx, int num_fields, char **row)
 {
    uint32_t *val = (uint32_t *)ctx;
 
@@ -102,7 +99,98 @@ int db_int64_handler(void *ctx, int num_fields, char **row)
    return 0;
 }
 
+/*
+ * Called here to retrieve a btime from the database.
+ *   The returned integer will be extended to 64 bit.
+ */
+int db_strtime_handler(void *ctx, int num_fields, char **row)
+{
+   db_int64_ctx *lctx = (db_int64_ctx *)ctx;
+
+   if (row[0]) {
+      lctx->value = str_to_utime(row[0]);
+      lctx->count++;
+   }
+   return 0;
+}
+
+/*
+ * Use to build a comma separated list of values from a query. "10,20,30"
+ */
+int db_list_handler(void *ctx, int num_fields, char **row)
+{
+   db_list_ctx *lctx = (db_list_ctx *)ctx;
+   if (num_fields == 1 && row[0]) {
+      lctx->add(row[0]);
+   }
+   return 0;
+}
+
+/*
+ * specific context passed from db_check_max_connections to db_max_connections_handler.
+ */
+struct max_connections_context {
+   B_DB *db;
+   uint32_t nr_connections;
+};
+
+/*
+ * Called here to retrieve an integer from the database
+ */
+static inline int db_max_connections_handler(void *ctx, int num_fields, char **row)
+{
+   struct max_connections_context *context;
+   uint32_t index;
+
+   context = (struct max_connections_context *)ctx;
+   switch (db_get_type_index(context->db)) {
+   case SQL_TYPE_MYSQL:
+      index = 1;
+   default:
+      index = 0;
+   }
+
+   if (row[index]) {
+      context->nr_connections = str_to_int64(row[index]);
+   } else {
+      Dmsg0(800, "int_handler finds zero\n");
+      context->nr_connections = 0;
+   }
+   return 0;
+}
+
+/*
+ * Check catalog max_connections setting
+ */
+bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
+{
+   struct max_connections_context context;
+
+   /* Without Batch insert, no need to verify max_connections */
+   if (!mdb->batch_insert_available())
+      return true;
+
+   context.db = mdb;
+   context.nr_connections = 0;
+
+   /* Check max_connections setting */
+   if (!db_sql_query(mdb, sql_get_max_connections[db_get_type_index(mdb)],
+                     db_max_connections_handler, &context)) {
+      Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
+      return false;
+   }
+   if (context.nr_connections && max_concurrent_jobs && max_concurrent_jobs > context.nr_connections) {
+      Mmsg(mdb->errmsg,
+           _("Potential performance problem:\n"
+             "max_connections=%d set for %s database \"%s\" should be larger than Director's "
+             "MaxConcurrentJobs=%d\n"),
+           context.nr_connections, db_get_type(mdb), mdb->get_db_name(), max_concurrent_jobs);
+      Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
+      return false;
+   }
 
+   return true;
+}
 
 /* NOTE!!! The following routines expect that the
  *  calling subroutine sets and clears the mutex
@@ -111,31 +199,33 @@ int db_int64_handler(void *ctx, int num_fields, char **row)
 /* Check that the tables correspond to the version we want */
 bool check_tables_version(JCR *jcr, B_DB *mdb)
 {
+   uint32_t bacula_db_version = 0;
    const char *query = "SELECT VersionId FROM Version";
 
    bacula_db_version = 0;
-   if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
-      Mmsg(mdb->errmsg, "Database not created or server not running.\n");
+   if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
       return false;
    }
    if (bacula_db_version != BDB_VERSION) {
       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
-          mdb->db_name, BDB_VERSION, bacula_db_version);
+          mdb->get_db_name(), BDB_VERSION, bacula_db_version);
       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
       return false;
    }
    return true;
 }
 
-/* Utility routine for queries. The database MUST be locked before calling here. */
+/*
+ * Utility routine for queries. The database MUST be locked before calling here.
+ * Returns: 0 on failure
+ *          1 on success
+ */
 int
 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
 {
-   int status;
-
    sql_free_result(mdb);
-   if ((status=sql_query(mdb, cmd)) != 0) {
+   if (!sql_query(mdb, cmd, QF_STORE_RESULT)) {
       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
       if (verbose) {
@@ -144,9 +234,7 @@ QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
       return 0;
    }
 
-   mdb->result = sql_store_result(mdb);
-
-   return mdb->result != NULL;
+   return 1;
 }
 
 /*
@@ -157,7 +245,9 @@ QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
 int
 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
 {
-   if (sql_query(mdb, cmd)) {
+   int num_rows;
+
+   if (!sql_query(mdb, cmd)) {
       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
       if (verbose) {
@@ -165,15 +255,11 @@ InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
       }
       return 0;
    }
-   if (mdb->have_insert_id) {
-      mdb->num_rows = sql_affected_rows(mdb);
-   } else {
-      mdb->num_rows = 1;
-   }
-   if (mdb->num_rows != 1) {
+   num_rows = sql_affected_rows(mdb);
+   if (num_rows != 1) {
       char ed1[30];
       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
-         edit_uint64(mdb->num_rows, ed1));
+         edit_uint64(num_rows, ed1));
       if (verbose) {
          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
       }
@@ -190,8 +276,9 @@ InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
 int
 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
 {
+   int num_rows;
 
-   if (sql_query(mdb, cmd)) {
+   if (!sql_query(mdb, cmd)) {
       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
       if (verbose) {
@@ -199,11 +286,11 @@ UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
       }
       return 0;
    }
-   mdb->num_rows = sql_affected_rows(mdb);
-   if (mdb->num_rows < 1) {
+   num_rows = sql_affected_rows(mdb);
+   if (num_rows < 1) {
       char ed1[30];
       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
-         edit_uint64(mdb->num_rows, ed1), cmd);
+         edit_uint64(num_rows, ed1), cmd);
       if (verbose) {
 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
       }
@@ -213,7 +300,8 @@ UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
    return 1;
 }
 
-/* Utility routine for deletes
+/*
+ * Utility routine for deletes
  *
  * Returns: -1 on error
  *           n number of rows affected
@@ -222,7 +310,7 @@ int
 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
 {
 
-   if (sql_query(mdb, cmd)) {
+   if (!sql_query(mdb, cmd)) {
       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
       if (verbose) {
@@ -270,136 +358,6 @@ char *db_strerror(B_DB *mdb)
    return mdb->errmsg;
 }
 
-/*
- * Lock database, this can be called multiple times by the same
- *   thread without blocking, but must be unlocked the number of
- *   times it was locked.
- */
-void _db_lock(const char *file, int line, B_DB *mdb)
-{
-   int errstat;
-   if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
-      berrno be;
-      e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
-           errstat, be.bstrerror(errstat));
-   }
-}
-
-/*
- * Unlock the database. This can be called multiple times by the
- *   same thread up to the number of times that thread called
- *   db_lock()/
- */
-void _db_unlock(const char *file, int line, B_DB *mdb)
-{
-   int errstat;
-   if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
-      berrno be;
-      e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
-           errstat, be.bstrerror(errstat));
-   }
-}
-
-/*
- * Start a transaction. This groups inserts and makes things
- *  much more efficient. Usually started when inserting
- *  file attributes.
- */
-void db_start_transaction(JCR *jcr, B_DB *mdb)
-{
-   if (!jcr->attr) {
-      jcr->attr = get_pool_memory(PM_FNAME);
-   }
-   if (!jcr->ar) {
-      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
-   }
-
-#ifdef HAVE_SQLITE
-   if (!mdb->allow_transactions) {
-      return;
-   }
-   db_lock(mdb);
-   /* Allow only 10,000 changes per transaction */
-   if (mdb->transaction && mdb->changes > 10000) {
-      db_end_transaction(jcr, mdb);
-   }
-   if (!mdb->transaction) {
-      my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
-      Dmsg0(400, "Start SQLite transaction\n");
-      mdb->transaction = 1;
-   }
-   db_unlock(mdb);
-#endif
-
-/*
- * This is turned off because transactions break
- * if multiple simultaneous jobs are run.
- */
-#ifdef HAVE_POSTGRESQL
-   if (!mdb->allow_transactions) {
-      return;
-   }
-   db_lock(mdb);
-   /* Allow only 25,000 changes per transaction */
-   if (mdb->transaction && mdb->changes > 25000) {
-      db_end_transaction(jcr, mdb);
-   }
-   if (!mdb->transaction) {
-      db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
-      Dmsg0(400, "Start PosgreSQL transaction\n");
-      mdb->transaction = 1;
-   }
-   db_unlock(mdb);
-#endif
-}
-
-void db_end_transaction(JCR *jcr, B_DB *mdb)
-{
-   /*
-    * This can be called during thread cleanup and
-    *   the db may already be closed.  So simply return.
-    */
-   if (!mdb) {
-      return;
-   }
-
-   if (jcr && jcr->cached_attribute) {
-      Dmsg0(400, "Flush last cached attribute.\n");
-      if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
-         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
-      }
-      jcr->cached_attribute = false;
-   }
-
-#ifdef HAVE_SQLITE
-   if (!mdb->allow_transactions) {
-      return;
-   }
-   db_lock(mdb);
-   if (mdb->transaction) {
-      my_sqlite_query(mdb, "COMMIT"); /* end transaction */
-      mdb->transaction = 0;
-      Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
-   }
-   mdb->changes = 0;
-   db_unlock(mdb);
-#endif
-
-#ifdef HAVE_POSTGRESQL
-   if (!mdb->allow_transactions) {
-      return;
-   }
-   db_lock(mdb);
-   if (mdb->transaction) {
-      db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
-      mdb->transaction = 0;
-      Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
-   }
-   mdb->changes = 0;
-   db_unlock(mdb);
-#endif
-}
-
 /*
  * Given a full filename, split it into its path
  *  and filename parts. They are returned in pool memory
@@ -456,20 +414,41 @@ void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
 }
 
+/*
+ * Set maximum field length to something reasonable
+ */
+static int max_length(int max_length)
+{
+   int max_len = max_length;
+   /* Sanity check */
+   if (max_len < 0) {
+      max_len = 2;
+   } else if (max_len > 100) {
+      max_len = 100;
+   }
+   return max_len;
+}
+
 /*
  * List dashes as part of header for listing SQL results in a table
  */
-void
-list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
+void list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
 {
    SQL_FIELD  *field;
    int i, j;
+   int len;
+   int num_fields;
 
    sql_field_seek(mdb, 0);
    send(ctx, "+");
-   for (i = 0; i < sql_num_fields(mdb); i++) {
+   num_fields = sql_num_fields(mdb);
+   for (i = 0; i < num_fields; i++) {
       field = sql_fetch_field(mdb);
-      for (j = 0; j < (int)field->max_length + 2; j++) {
+      if (!field) {
+         break;
+      }
+      len = max_length(field->max_length + 2);
+      for (j = 0; j < len; j++) {
          send(ctx, "-");
       }
       send(ctx, "+");
@@ -477,43 +456,177 @@ list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
    send(ctx, "\n");
 }
 
+/* Small handler to print the last line of a list xxx command */
+static void last_line_handler(void *vctx, const char *str)
+{
+   LIST_CTX *ctx = (LIST_CTX *)vctx;
+   bstrncat(ctx->line, str, sizeof(ctx->line));
+}
+
+int list_result(void *vctx, int nb_col, char **row)
+{
+   SQL_FIELD *field;
+   int i, col_len, max_len = 0;
+   int num_fields;
+   char buf[2000], ewc[30];
+
+   LIST_CTX *pctx = (LIST_CTX *)vctx;
+   DB_LIST_HANDLER *send = pctx->send;
+   e_list_type type = pctx->type;
+   B_DB *mdb = pctx->mdb;
+   void *ctx = pctx->ctx;
+   JCR *jcr = pctx->jcr;
+
+   num_fields = sql_num_fields(mdb);
+   if (!pctx->once) {
+      pctx->once = true;
+
+      Dmsg1(800, "list_result starts looking at %d fields\n", num_fields);
+      /* determine column display widths */
+      sql_field_seek(mdb, 0);
+      for (i = 0; i < num_fields; i++) {
+         Dmsg1(800, "list_result processing field %d\n", i);
+         field = sql_fetch_field(mdb);
+         if (!field) {
+            break;
+         }
+         col_len = cstrlen(field->name);
+         if (type == VERT_LIST) {
+            if (col_len > max_len) {
+               max_len = col_len;
+            }
+         } else {
+            if (sql_field_is_numeric(mdb, field->type) && (int)field->max_length > 0) { /* fixup for commas */
+               field->max_length += (field->max_length - 1) / 3;
+            }  
+            if (col_len < (int)field->max_length) {
+               col_len = field->max_length;
+            }  
+            if (col_len < 4 && !sql_field_is_not_null(mdb, field->flags)) {
+               col_len = 4;                 /* 4 = length of the word "NULL" */
+            }
+            field->max_length = col_len;    /* reset column info */
+         }
+      }
+
+      pctx->num_rows++;
+
+      Dmsg0(800, "list_result finished first loop\n");
+      if (type == VERT_LIST) {
+         goto vertical_list;
+      }
+
+      Dmsg1(800, "list_result starts second loop looking at %d fields\n", num_fields);
+
+      /* Keep the result to display the same line at the end of the table */
+      list_dashes(mdb, last_line_handler, pctx);
+      send(ctx, pctx->line);
+
+      send(ctx, "|");
+      sql_field_seek(mdb, 0);
+      for (i = 0; i < num_fields; i++) {
+         Dmsg1(800, "list_result looking at field %d\n", i);
+         field = sql_fetch_field(mdb);
+         if (!field) {
+            break;
+         }
+         max_len = max_length(field->max_length);
+         bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
+         send(ctx, buf);
+      }
+      send(ctx, "\n");
+      list_dashes(mdb, send, ctx);      
+   }
+   
+   Dmsg1(800, "list_result starts third loop looking at %d fields\n", num_fields);
+
+   sql_field_seek(mdb, 0);
+   send(ctx, "|");
+   for (i = 0; i < num_fields; i++) {
+      field = sql_fetch_field(mdb);
+      if (!field) {
+         break;
+      }
+      max_len = max_length(field->max_length);
+      if (row[i] == NULL) {
+         bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
+      } else if (sql_field_is_numeric(mdb, field->type) && !jcr->gui && is_an_integer(row[i])) {
+         bsnprintf(buf, sizeof(buf), " %*s |", max_len,
+                   add_commas(row[i], ewc));
+      } else {
+         bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
+      }
+      send(ctx, buf);
+   }
+   send(ctx, "\n");
+   return 0;
+
+vertical_list:
+
+   Dmsg1(800, "list_result starts vertical list at %d fields\n", num_fields);
+
+   sql_field_seek(mdb, 0);
+   for (i = 0; i < num_fields; i++) {
+      field = sql_fetch_field(mdb);
+      if (!field) {
+         break;
+      }
+      if (row[i] == NULL) {
+         bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
+      } else if (sql_field_is_numeric(mdb, field->type) && !jcr->gui && is_an_integer(row[i])) {
+         bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
+                   add_commas(row[i], ewc));
+      } else {
+         bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
+      }
+      send(ctx, buf);
+   }
+   send(ctx, "\n");
+   return 0;
+}
+
 /*
  * If full_list is set, we list vertically, otherwise, we
- * list on one line horizontally.
+ *  list on one line horizontally.
+ * Return number of rows
  */
-void
-list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
+int list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
 {
    SQL_FIELD *field;
    SQL_ROW row;
    int i, col_len, max_len = 0;
+   int num_fields;
    char buf[2000], ewc[30];
 
    Dmsg0(800, "list_result starts\n");
-   if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
+   if (sql_num_rows(mdb) == 0) {
       send(ctx, _("No results to list.\n"));
-      return;
+      return sql_num_rows(mdb);
    }
 
-   Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
+   num_fields = sql_num_fields(mdb);
+   Dmsg1(800, "list_result starts looking at %d fields\n", num_fields);
    /* determine column display widths */
    sql_field_seek(mdb, 0);
-   for (i = 0; i < sql_num_fields(mdb); i++) {
+   for (i = 0; i < num_fields; i++) {
       Dmsg1(800, "list_result processing field %d\n", i);
       field = sql_fetch_field(mdb);
+      if (!field) {
+         break;
+      }
       col_len = cstrlen(field->name);
       if (type == VERT_LIST) {
          if (col_len > max_len) {
             max_len = col_len;
          }
       } else {
-         if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
+         if (sql_field_is_numeric(mdb, field->type) && (int)field->max_length > 0) { /* fixup for commas */
             field->max_length += (field->max_length - 1) / 3;
-         }
+         }  
          if (col_len < (int)field->max_length) {
             col_len = field->max_length;
-         }
-         if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
+         }  
+         if (col_len < 4 && !sql_field_is_not_null(mdb, field->flags)) {
             col_len = 4;                 /* 4 = length of the word "NULL" */
          }
          field->max_length = col_len;    /* reset column info */
@@ -525,50 +638,61 @@ list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type t
       goto vertical_list;
    }
 
-   Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
+   Dmsg1(800, "list_result starts second loop looking at %d fields\n", num_fields);
    list_dashes(mdb, send, ctx);
    send(ctx, "|");
    sql_field_seek(mdb, 0);
-   for (i = 0; i < sql_num_fields(mdb); i++) {
+   for (i = 0; i < num_fields; i++) {
       Dmsg1(800, "list_result looking at field %d\n", i);
       field = sql_fetch_field(mdb);
-      bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
+      if (!field) {
+         break;
+      }
+      max_len = max_length(field->max_length);
+      bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
       send(ctx, buf);
    }
    send(ctx, "\n");
    list_dashes(mdb, send, ctx);
 
-   Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
+   Dmsg1(800, "list_result starts third loop looking at %d fields\n", num_fields);
    while ((row = sql_fetch_row(mdb)) != NULL) {
       sql_field_seek(mdb, 0);
       send(ctx, "|");
-      for (i = 0; i < sql_num_fields(mdb); i++) {
+      for (i = 0; i < num_fields; i++) {
          field = sql_fetch_field(mdb);
+         if (!field) {
+            break;
+         }
+         max_len = max_length(field->max_length);
          if (row[i] == NULL) {
-            bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
-         } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
-            bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
+            bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
+         } else if (sql_field_is_numeric(mdb, field->type) && !jcr->gui && is_an_integer(row[i])) {
+            bsnprintf(buf, sizeof(buf), " %*s |", max_len,
                       add_commas(row[i], ewc));
          } else {
-            bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
+            bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
          }
          send(ctx, buf);
       }
       send(ctx, "\n");
    }
    list_dashes(mdb, send, ctx);
-   return;
+   return sql_num_rows(mdb);
 
 vertical_list:
 
-   Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
+   Dmsg1(800, "list_result starts vertical list at %d fields\n", num_fields);
    while ((row = sql_fetch_row(mdb)) != NULL) {
       sql_field_seek(mdb, 0);
-      for (i = 0; i < sql_num_fields(mdb); i++) {
+      for (i = 0; i < num_fields; i++) {
          field = sql_fetch_field(mdb);
+         if (!field) {
+            break;
+         }
          if (row[i] == NULL) {
             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
-         } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
+         } else if (sql_field_is_numeric(mdb, field->type) && !jcr->gui && is_an_integer(row[i])) {
             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
                 add_commas(row[i], ewc));
          } else {
@@ -578,8 +702,57 @@ vertical_list:
       }
       send(ctx, "\n");
    }
-   return;
+   return sql_num_rows(mdb);
 }
 
+/* 
+ * Open a new connexion to mdb catalog. This function is used
+ * by batch and accurate mode.
+ */
+bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
+{
+   bool multi_db;
+
+   if (mdb->batch_insert_available())
+      multi_db = true;   /* we force a new connection only if batch insert is enabled */
+   else
+      multi_db = false;
+
+   if (!jcr->db_batch) {
+      jcr->db_batch = db_clone_database_connection(mdb, jcr, multi_db);
+      if (!jcr->db_batch) {
+         Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
+         Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+         return false;
+      }
+
+      if (!db_open_database(jcr, jcr->db_batch)) {
+         Mmsg2(&mdb->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
+              jcr->db_batch->get_db_name(), db_strerror(jcr->db_batch));
+         Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
+         return false;
+      }      
+   }
+   return true;
+}
+
+/*
+ * !!! WARNING !!! Use this function only when bacula is stopped.
+ * ie, after a fatal signal and before exiting the program
+ * Print information about a B_DB object.
+ */
+void db_debug_print(JCR *jcr, FILE *fp)
+{
+   B_DB *mdb = jcr->db;
+
+   if (!mdb) {
+      return;
+   }
+
+   fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%s\n",
+           mdb, NPRTB(mdb->get_db_name()), NPRTB(mdb->get_db_user()), mdb->is_connected() ? "true" : "false");
+   fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
+   mdb->print_lock_info(fp);
+}
 
-#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/
+#endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */