]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/ingres.c
Split messages line by line before sending it to syslog() fix #3325
[bacula/bacula] / bacula / src / cats / ingres.c
index 5a9291847e39a219e8088a550bc4c32dd619bc2b..feb1aecf4983aa1574b638f6e9afd1ee9f00eab4 100755 (executable)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
+   Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
@@ -15,7 +15,7 @@
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
 */
 /*
  * Bacula Catalog Database routines specific to Ingres
- *   These are Ingres specific routines
+ * These are Ingres specific routines
  *
- *    Stefan Reddig, June 2009
+ *    Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
  *    based uopn work done 
  *    by Dan Langille, December 2003 and
  *    by Kern Sibbald, March 2000
  *
+ * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
  */
 
-
-/* The following is necessary so that we do not include
- * the dummy external definition of DB.
- */
-#define __SQL_C                       /* indicate that this is sql.c */
-
 #include "bacula.h"
-#include "cats.h"
 
 #ifdef HAVE_INGRES
 
+#include "cats.h"
+#include "bdb_priv.h"
 #include "myingres.h"
+#include "bdb_ingres.h"
+#include "lib/breg.h"
 
 /* -----------------------------------------------------------------------
  *
  * -----------------------------------------------------------------------
  */
 
-/* List of open databases */  /* SRE: needed for ingres? */
-static BQUEUE db_list = {&db_list, &db_list};
+/*
+ * List of open databases.
+ */
+static dlist *db_list = NULL;
 
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
+struct B_DB_RWRULE {
+   int pattern_length;
+   char *search_pattern;
+   BREGEXP *rewrite_regexp;
+   bool trigger;
+};
+
 /*
- * Retrieve database type
+ * Create a new query filter.
  */
-const char *
-db_get_type(void)
+static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
+                                     const char *search_pattern, const char *filter)
 {
-   return "Ingres";
+   B_DB_RWRULE *rewrite_rule;
+
+   rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
+
+   rewrite_rule->pattern_length = pattern_length;
+   rewrite_rule->search_pattern = bstrdup(search_pattern);
+   rewrite_rule->rewrite_regexp = new_bregexp(filter);
+   rewrite_rule->trigger = false;
+
+   if (!rewrite_rule->rewrite_regexp) {
+      Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
+      free(rewrite_rule->search_pattern);
+      free(rewrite_rule);
+      return false;
+   } else {
+      query_filters->append(rewrite_rule);
+      return true;
+   }
 }
 
 /*
- * Initialize database data structure. In principal this should
- * never have errors, or it is really fatal.
+ * Create a stack of all filters that should be applied to a SQL query
+ * before submitting it to the database backend.
  */
-B_DB *
-db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
-                 const char *db_address, int db_port, const char *db_socket,
-                 int mult_db_connections)
+static inline alist *db_initialize_query_filters(JCR *jcr)
 {
-   B_DB *mdb;
+   alist *query_filters;
 
-   if (!db_user) {
-      Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
+   query_filters = New(alist(10, not_owned_by_alist));
+
+   if (!query_filters) {
+      Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
       return NULL;
    }
-   P(mutex);                          /* lock DB queue */
-   if (!mult_db_connections) {
-      /* Look to see if DB already open */
-      for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
-         if (bstrcmp(mdb->db_name, db_name) &&
-             bstrcmp(mdb->db_address, db_address) &&
-             mdb->db_port == db_port) {
-            Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
-            mdb->ref_count++;
-            V(mutex);
-            return mdb;                  /* already open */
+
+   db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
+                            "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
+   db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
+                            "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
+   db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
+                            "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
+
+   return query_filters;
+}
+
+/*
+ * Free all query filters.
+ */
+static inline void db_destroy_query_filters(alist *query_filters)
+{
+   B_DB_RWRULE *rewrite_rule;
+
+   foreach_alist(rewrite_rule, query_filters) {
+      free_bregexp(rewrite_rule->rewrite_regexp);
+      free(rewrite_rule->search_pattern);
+      free(rewrite_rule);
+   }
+
+   delete query_filters;
+}
+
+B_DB_INGRES::B_DB_INGRES(JCR *jcr,
+                         const char *db_driver,
+                         const char *db_name,
+                         const char *db_user,
+                         const char *db_password,
+                         const char *db_address,
+                         int db_port,
+                         const char *db_socket,
+                         bool mult_db_connections,
+                         bool disable_batch_insert)
+{
+   B_DB_INGRES *mdb;
+   int next_session_id = 0;
+
+   /*
+    * See what the next available session_id is.
+    * We first see what the highest session_id is used now.
+    */
+   if (db_list) {
+      foreach_dlist(mdb, db_list) {
+         if (mdb->m_session_id > next_session_id) {
+            next_session_id = mdb->m_session_id;
          }
       }
    }
-   Dmsg0(100, "db_open first time\n");
-   mdb = (B_DB *)malloc(sizeof(B_DB));
-   memset(mdb, 0, sizeof(B_DB));
-   mdb->db_name = bstrdup(db_name);
-   mdb->db_user = bstrdup(db_user);
+
+   /*
+    * Initialize the parent class members.
+    */
+   m_db_interface_type  = SQL_INTERFACE_TYPE_INGRES;
+   m_db_type = SQL_TYPE_INGRES;
+   m_db_driver = bstrdup("ingres");
+   m_db_name = bstrdup(db_name);
+   m_db_user = bstrdup(db_user);
    if (db_password) {
-      mdb->db_password = bstrdup(db_password);
+      m_db_password = bstrdup(db_password);
    }
    if (db_address) {
-      mdb->db_address  = bstrdup(db_address);
+      m_db_address = bstrdup(db_address);
    }
    if (db_socket) {
-      mdb->db_socket   = bstrdup(db_socket);
-   }
-   mdb->db_port        = db_port;
-   mdb->have_insert_id = TRUE;
-   mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
-   *mdb->errmsg        = 0;
-   mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
-   mdb->cached_path    = get_pool_memory(PM_FNAME);
-   mdb->cached_path_id = 0;
-   mdb->ref_count      = 1;
-   mdb->fname          = get_pool_memory(PM_FNAME);
-   mdb->path           = get_pool_memory(PM_FNAME);
-   mdb->esc_name       = get_pool_memory(PM_FNAME);
-   mdb->esc_path      = get_pool_memory(PM_FNAME);
-   mdb->allow_transactions = mult_db_connections;
-   qinsert(&db_list, &mdb->bq);            /* put db in list */
-   V(mutex);
-   return mdb;
-}
-
-/* Check that the database correspond to the encoding we want */
-static bool check_database_encoding(JCR *jcr, B_DB *mdb)
-{
-/* SRE: TODO! Needed?
- SQL_ROW row;
-   int ret=false;
-
-   if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
-      Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
-      return false;
+      m_db_socket = bstrdup(db_socket);
    }
-
-   if ((row = sql_fetch_row(mdb)) == NULL) {
-      Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
-      Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
+   m_db_port = db_port;
+   if (disable_batch_insert) {
+      m_disabled_batch_insert = true;
+      m_have_batch_insert = false;
    } else {
-      ret = bstrcmp(row[0], "SQL_ASCII");
-      if (!ret) {
-         Mmsg(mdb->errmsg, 
-              _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
-              mdb->db_name, row[0]);
-         Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
-         Dmsg1(50, "%s", mdb->errmsg);
-      } 
-   }
-   return ret;
-*/
-    return true;
+      m_disabled_batch_insert = false;
+#if defined(USE_BATCH_FILE_INSERT)
+      m_have_batch_insert = true;
+#else
+      m_have_batch_insert = false;
+#endif
+   }
+
+   errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
+   *errmsg = 0;
+   cmd = get_pool_memory(PM_EMSG); /* get command buffer */
+   cached_path = get_pool_memory(PM_FNAME);
+   cached_path_id = 0;
+   m_ref_count = 1;
+   fname = get_pool_memory(PM_FNAME);
+   path = get_pool_memory(PM_FNAME);
+   esc_name = get_pool_memory(PM_FNAME);
+   esc_path = get_pool_memory(PM_FNAME);
+   esc_obj = get_pool_memory(PM_FNAME);
+   m_allow_transactions = mult_db_connections;
+
+   /*
+    * Initialize the private members.
+    */
+   m_db_handle = NULL;
+   m_result = NULL;
+   m_explicit_commit = true;
+   m_session_id = ++next_session_id;
+   m_query_filters = db_initialize_query_filters(jcr);
+
+   /*
+    * Put the db in the list.
+    */
+   if (db_list == NULL) {
+      db_list = New(dlist(this, &this->m_link));
+   }
+   db_list->append(this);
 }
 
-/*
- * Check for errors in DBMS work
- */
-static int sql_check(B_DB *mdb)
+B_DB_INGRES::~B_DB_INGRES()
 {
-    return INGcheck();
 }
 
 /*
  * Now actually open the database.  This can generate errors,
  *   which are returned in the errmsg
  *
- * DO NOT close the database or free(mdb) here !!!!
+ * DO NOT close the database or delete mdb here !!!!
  */
-int
-db_open_database(JCR *jcr, B_DB *mdb)
+bool B_DB_INGRES::db_open_database(JCR *jcr)
 {
+   bool retval = false;
    int errstat;
-   char buf[10], *port;
 
    P(mutex);
-   if (mdb->connected) {
-      V(mutex);
-      return 1;
+   if (m_connected) {
+      retval = true;
+      goto bail_out;
    }
-   mdb->connected = false;
 
-   if ((errstat=rwl_init(&mdb->lock)) != 0) {
+   if ((errstat=rwl_init(&m_lock)) != 0) {
       berrno be;
-      Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
+      Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
             be.bstrerror(errstat));
-      V(mutex);
-      return 0;
-   }
-
-   if (mdb->db_port) {
-      bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
-      port = buf;
-   } else {
-      port = NULL;
+      goto bail_out;
    }
 
-   mdb->db = INGconnectDB(mdb->db_name, mdb->db_user, mdb->db_password);
+   m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
 
    Dmsg0(50, "Ingres real CONNECT done\n");
-   Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
-            mdb->db_password==NULL?"(NULL)":mdb->db_password);
+   Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
+              m_db_password == NULL ? "(NULL)" : m_db_password);
 
-   if (sql_check(mdb)) {
-      Mmsg2(&mdb->errmsg, _("Unable to connect to Ingres server.\n"
+   if (!m_db_handle) {
+      Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
             "Database=%s User=%s\n"
             "It is probably not running or your password is incorrect.\n"),
-             mdb->db_name, mdb->db_user);
-      V(mutex);
-      return 0;
+             m_db_name, m_db_user);
+      goto bail_out;
    }
 
-   mdb->connected = true;
+   m_connected = true;
+
+   INGsetDefaultLockingMode(m_db_handle);
 
-   if (!check_tables_version(jcr, mdb)) {
-      V(mutex);
-      return 0;
+   if (!check_tables_version(jcr, this)) {
+      goto bail_out;
    }
 
-   //sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
-   
-   /* check that encoding is SQL_ASCII */
-   check_database_encoding(jcr, mdb);
+   retval = true;
 
+bail_out:
    V(mutex);
-   return 1;
+   return retval;
 }
 
-void
-db_close_database(JCR *jcr, B_DB *mdb)
+void B_DB_INGRES::db_close_database(JCR *jcr)
 {
-   if (!mdb) {
-      return;
-   }
-   db_end_transaction(jcr, mdb);
+   db_end_transaction(jcr);
    P(mutex);
-   sql_free_result(mdb);
-   mdb->ref_count--;
-   if (mdb->ref_count == 0) {
-      qdchain(&mdb->bq);
-      if (mdb->connected && mdb->db) {
-         sql_close(mdb);
+   m_ref_count--;
+   if (m_ref_count == 0) {
+      sql_free_result();
+      db_list->remove(this);
+      if (m_connected && m_db_handle) {
+         INGdisconnectDB(m_db_handle);
       }
-      rwl_destroy(&mdb->lock);
-      free_pool_memory(mdb->errmsg);
-      free_pool_memory(mdb->cmd);
-      free_pool_memory(mdb->cached_path);
-      free_pool_memory(mdb->fname);
-      free_pool_memory(mdb->path);
-      free_pool_memory(mdb->esc_name);
-      free_pool_memory(mdb->esc_path);
-      if (mdb->db_name) {
-         free(mdb->db_name);
+      if (m_query_filters) {
+         db_destroy_query_filters(m_query_filters);
       }
-      if (mdb->db_user) {
-         free(mdb->db_user);
+      rwl_destroy(&m_lock);
+      free_pool_memory(errmsg);
+      free_pool_memory(cmd);
+      free_pool_memory(cached_path);
+      free_pool_memory(fname);
+      free_pool_memory(path);
+      free_pool_memory(esc_name);
+      free_pool_memory(esc_path);
+      free_pool_memory(esc_obj);
+      free(m_db_driver);
+      free(m_db_name);
+      free(m_db_user);
+      if (m_db_password) {
+         free(m_db_password);
       }
-      if (mdb->db_password) {
-         free(mdb->db_password);
+      if (m_db_address) {
+         free(m_db_address);
       }
-      if (mdb->db_address) {
-         free(mdb->db_address);
+      if (m_db_socket) {
+         free(m_db_socket);
       }
-      if (mdb->db_socket) {
-         free(mdb->db_socket);
+      delete this;
+      if (db_list->size() == 0) {
+         delete db_list;
+         db_list = NULL;
       }
-      free(mdb);
    }
    V(mutex);
 }
 
-void db_thread_cleanup()
-{ }
+void B_DB_INGRES::db_thread_cleanup(void)
+{
+}
 
 /*
- * Return the next unique index (auto-increment) for
- * the given table.  Return NULL on error.
+ * Escape strings so that Ingres is happy
  *
- * For Ingres, NULL causes the auto-increment value    SRE: true?
- *  to be updated.
+ *   NOTE! len is the length of the old string. Your new
+ *         string must be long enough (max 2*old+1) to hold
+ *         the escaped output.
  */
-int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
+void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
 {
-   strcpy(index, "NULL");
-   return 1;
+   char *n, *o;
+
+   n = snew;
+   o = old;
+   while (len--) {
+      switch (*o) {
+      case '\'':
+         *n++ = '\'';
+         *n++ = '\'';
+         o++;
+         break;
+      case 0:
+         *n++ = '\\';
+         *n++ = 0;
+         o++;
+         break;
+      default:
+         *n++ = *o++;
+         break;
+      }
+   }
+   *n = 0;
 }
 
+/*
+ * Escape binary so that Ingres is happy
+ *
+ *   NOTE! Need to be implemented (escape \0)
+ *
+ */
+char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
+{
+   char *n, *o;
+
+   n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
+   o = old;
+   while (len--) {
+      switch (*o) {
+      case '\'':
+         *n++ = '\'';
+         *n++ = '\'';
+         o++;
+         break;
+      case 0:
+         *n++ = '\\';
+         *n++ = 0;
+         o++;
+         break;
+      default:
+         *n++ = *o++;
+         break;
+      }
+   }
+   *n = 0;
+   return esc_obj;
+}
 
 /*
- * Escape strings so that Ingres is happy
+ * Unescape binary object so that Ingres is happy
  *
- *   NOTE! len is the length of the old string. Your new
- *         string must be long enough (max 2*old+1) to hold
- *         the escaped output.
- * SRE: TODO! 
+ * TODO: need to be implemented (escape \0)
  */
-void
-db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
+void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
+                                     POOLMEM **dest, int32_t *dest_len)
 {
+   if (!from) {
+      *dest[0] = 0;
+      *dest_len = 0;
+      return;
+   }
+   *dest = check_pool_memory_size(*dest, expected_len+1);
+   *dest_len = expected_len;
+   memcpy(*dest, from, expected_len);
+   (*dest)[expected_len]=0;
+}
+
 /*
-   int error;
-  
-   PQescapeStringConn(mdb->db, snew, old, len, &error);
-   if (error) {
-      Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));*/
-      /* error on encoding, probably invalid multibyte encoding in the source string
-        see PQescapeStringConn documentation for details. */
-/*      Dmsg0(500, "PQescapeStringConn failed\n");
-   }*/
+ * Start a transaction. This groups inserts and makes things
+ * much more efficient. Usually started when inserting
+ * file attributes.
+ */
+void B_DB_INGRES::db_start_transaction(JCR *jcr)
+{
+   if (!jcr->attr) {
+      jcr->attr = get_pool_memory(PM_FNAME);
+   }
+   if (!jcr->ar) {
+      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
+   }
+
+   if (!m_allow_transactions) {
+      return;
+   }
+
+   db_lock(this);
+   /* Allow only 25,000 changes per transaction */
+   if (m_transaction && changes > 25000) {
+      db_end_transaction(jcr);
+   }
+   if (!m_transaction) {
+      sql_query("BEGIN");  /* begin transaction */
+      Dmsg0(400, "Start Ingres transaction\n");
+      m_transaction = true;
+   }
+   db_unlock(this);
+}
+
+void B_DB_INGRES::db_end_transaction(JCR *jcr)
+{
+   if (jcr && jcr->cached_attribute) {
+      Dmsg0(400, "Flush last cached attribute.\n");
+      if (!db_create_attributes_record(jcr, this, jcr->ar)) {
+         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
+      }
+      jcr->cached_attribute = false;
+   }
+
+   if (!m_allow_transactions) {
+      return;
+   }
+
+   db_lock(this);
+   if (m_transaction) {
+      sql_query("COMMIT"); /* end transaction */
+      m_transaction = false;
+      Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
+   }
+   changes = 0;
+   db_unlock(this);
 }
 
 /*
  * Submit a general SQL command (cmd), and for each row returned,
- *  the sqlite_handler is called with the ctx.
+ *  the result_handler is called with the ctx.
  */
-bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
+bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
 {
    SQL_ROW row;
+   bool retval = true;
 
-   Dmsg0(500, "db_sql_query started\n");
+   Dmsg1(500, "db_sql_query starts with %s\n", query);
 
-   db_lock(mdb);
-   if (sql_query(mdb, query) != 0) {
-      Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
-      db_unlock(mdb);
+   db_lock(this);
+   if (!sql_query(query, QF_STORE_RESULT)) {
+      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
       Dmsg0(500, "db_sql_query failed\n");
-      return false;
+      retval = false;
+      goto bail_out;
    }
-   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
 
    if (result_handler != NULL) {
       Dmsg0(500, "db_sql_query invoking handler\n");
-      if ((mdb->result = sql_store_result(mdb)) != NULL) {
-         int num_fields = sql_num_fields(mdb);
-
-         Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
-         while ((row = sql_fetch_row(mdb)) != NULL) {
-
-            Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
-            if (result_handler(ctx, num_fields, row))
-               break;
-         }
-
-        sql_free_result(mdb);
+      while ((row = sql_fetch_row()) != NULL) {
+         Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
+         if (result_handler(ctx, m_num_fields, row))
+            break;
       }
+      sql_free_result();
    }
-   db_unlock(mdb);
 
    Dmsg0(500, "db_sql_query finished\n");
 
-   return true;
+bail_out:
+   db_unlock(this);
+   return retval;
 }
 
 /*
- * Close database connection
+ * Note, if this routine returns false (failure), Bacula expects
+ * that no result has been stored.
+ *
+ *  Returns:  true  on success
+ *            false on failure
+ *
  */
-void my_ingres_close(B_DB *mdb)
-{
-    INGdisconnectDB(mdb->db);
-    //SRE: error handling? 
-}
-
-INGRES_ROW my_ingres_fetch_row(B_DB *mdb)
+bool B_DB_INGRES::sql_query(const char *query, int flags)
 {
-   int j;
-   INGRES_ROW row = NULL; // by default, return NULL
-
-   Dmsg0(500, "my_ingres_fetch_row start\n");
+   int cols;
+   char *cp, *bp;
+   char *dup_query, *new_query;
+   bool retval = true;
+   bool start_of_transaction = false;
+   bool end_of_transaction = false;
+   B_DB_RWRULE *rewrite_rule;
+
+   Dmsg1(500, "query starts with '%s'\n", query);
+   /*
+    * We always make a private copy of the query as we are doing serious
+    * rewrites in this engine. When running the private copy through the
+    * different query filters we loose the orginal private copy so we
+    * first make a extra reference to it so we can free it on exit from the
+    * function.
+    */
+   dup_query = new_query = bstrdup(query);
+
+   /*
+    * Iterate over the query string and perform any needed operations.
+    * We use a sliding window over the query string where bp points to
+    * the previous position in the query and cp to the current position
+    * in the query.
+    */
+   bp = new_query;
+   while (bp != NULL) {
+      if ((cp = strchr(bp, ' ')) != NULL) {
+         *cp++;
+      }
 
-   if (!mdb->row || mdb->row_size < mdb->num_fields) {
-      int num_fields = mdb->num_fields;
-      Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
+      if (!strncasecmp(bp, "BEGIN", 5)) {
+         /*
+          * This is the start of a transaction.
+          * Inline copy the rest of the query over the BEGIN keyword.
+          */
+         if (cp) {
+            strcpy(bp, cp);
+         } else {
+            *bp = '\0';
+         }
+         start_of_transaction = true;
+      } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
+         /*
+          * This is the end of a transaction. We cannot check for just the COMMIT
+          * keyword as a DECLARE of an tempory table also has the word COMMIT in it
+          * but its followed by the word PRESERVE.
+          * Inline copy the rest of the query over the COMMIT keyword.
+          */
+         if (cp) {
+            strcpy(bp, cp);
+         } else {
+            *bp = '\0';
+         }
+         end_of_transaction = true;
+      }
 
-      if (mdb->row) {
-         Dmsg0(500, "my_ingres_fetch_row freeing space\n");
-         free(mdb->row);
+      /*
+       * See what query filter might match.
+       */
+      foreach_alist(rewrite_rule, m_query_filters) {
+         if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
+            rewrite_rule->trigger = true;
+         }
       }
-      num_fields += 20;                  /* add a bit extra */
-      mdb->row = (INGRES_ROW)malloc(sizeof(char *) * num_fields);
-      mdb->row_size = num_fields;
 
-      // now reset the row_number now that we have the space allocated
-      mdb->row_number = 0;
+      /*
+       * Slide window.
+       */
+      bp = cp;
    }
 
-   // if still within the result set
-   if (mdb->row_number < mdb->num_rows) {
-      Dmsg2(500, "my_ingres_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
-      // get each value from this row
-      for (j = 0; j < mdb->num_fields; j++) {
-         mdb->row[j] = INGgetvalue(mdb->result, mdb->row_number, j);
-         Dmsg2(500, "my_ingres_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
+   /*
+    * Run the query through all query filters that apply e.g. have the trigger set in the
+    * previous loop.
+    */
+   foreach_alist(rewrite_rule, m_query_filters) {
+      if (rewrite_rule->trigger) {
+         new_query = rewrite_rule->rewrite_regexp->replace(new_query);
+         rewrite_rule->trigger = false;
       }
-      // increment the row number for the next call
-      mdb->row_number++;
-
-      row = mdb->row;
-   } else {
-      Dmsg2(500, "my_ingres_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
    }
 
-   Dmsg1(500, "my_ingres_fetch_row finishes returning %p\n", row);
+   if (start_of_transaction) {
+      Dmsg0(500,"sql_query: Start of transaction\n");
+      m_explicit_commit = false;
+   }
 
-   return row;
-}
+   /*
+    * See if there is any query left after filtering for certain keywords.
+    */
+   bp = new_query;
+   while (bp != NULL && strlen(bp) > 0) {
+      /*
+       * We are starting a new query.  reset everything.
+       */
+      m_num_rows     = -1;
+      m_row_number   = -1;
+      m_field_number = -1;
+
+      if (m_result) {
+         INGclear(m_result);  /* hmm, someone forgot to free?? */
+         m_result = NULL;
+      }
 
+      /*
+       * See if this is a multi-statement query. We split a multi-statement query
+       * on the semi-column and feed the individual queries to the Ingres functions.
+       * We use a sliding window over the query string where bp points to
+       * the previous position in the query and cp to the current position
+       * in the query.
+       */
+      if ((cp = strchr(bp, ';')) != NULL) {
+         *cp++ = '\0';
+      }
 
-int my_ingres_max_length(B_DB *mdb, int field_num) {
-   //
-   // for a given column, find the max length
-   //
-   int max_length;
-   int i;
-   int this_length;
+      Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
 
-   max_length = 0;
-   for (i = 0; i < mdb->num_rows; i++) {
-      if (INGgetisnull(mdb->result, i, field_num)) {
-          this_length = 4;        // "NULL"
+      /*
+       * See if we got a store_result hint which could mean we are running a select.
+       * If flags has QF_STORE_RESULT not set we are sure its not a query that we
+       * need to store anything for.
+       */
+      if (flags & QF_STORE_RESULT) {
+         cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
       } else {
-          this_length = cstrlen(INGgetvalue(mdb->result, i, field_num));
+         cols = 0;
       }
 
-      if (max_length < this_length) {
-          max_length = this_length;
+      if (cols <= 0) {
+         if (cols < 0 ) {
+            Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
+            retval = false;
+            goto bail_out;
+         }
+         Dmsg0(500,"sql_query (non SELECT) starting...\n");
+         /*
+          * non SELECT
+          */
+         m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
+         if (m_num_rows == -1) {
+           Dmsg0(500,"sql_query (non SELECT) went wrong\n");
+           retval = false;
+           goto bail_out;
+         } else {
+           Dmsg0(500,"sql_query (non SELECT) seems ok\n");
+         }
+      } else {
+         /*
+          * SELECT
+          */
+         Dmsg0(500,"sql_query (SELECT) starting...\n");
+         m_result = INGquery(m_db_handle, bp, m_explicit_commit);
+         if (m_result != NULL) {
+            Dmsg0(500, "we have a result\n");
+
+            /*
+             * How many fields in the set?
+             */
+            m_num_fields = (int)INGnfields(m_result);
+            Dmsg1(500, "we have %d fields\n", m_num_fields);
+
+            m_num_rows = INGntuples(m_result);
+            Dmsg1(500, "we have %d rows\n", m_num_rows);
+         } else {
+            Dmsg0(500, "No resultset...\n");
+            retval = false;
+            goto bail_out;
+         }
       }
+
+      bp = cp;
    }
 
-   return max_length;
+bail_out:
+   if (end_of_transaction) {
+      Dmsg0(500,"sql_query: End of transaction, commiting work\n");
+      m_explicit_commit = true;
+      INGcommit(m_db_handle);
+   }
+
+   free(dup_query);
+   Dmsg0(500, "sql_query finishing\n");
+
+   return retval;
+}  
+
+void B_DB_INGRES::sql_free_result(void)
+{
+   db_lock(this);
+   if (m_result) {
+      INGclear(m_result);
+      m_result = NULL;
+   }
+   if (m_rows) {
+      free(m_rows);
+      m_rows = NULL;
+   }
+   if (m_fields) {
+      free(m_fields);
+      m_fields = NULL;
+   }
+   m_num_rows = m_num_fields = 0;
+   db_unlock(this);
 }
 
-INGRES_FIELD * my_ingres_fetch_field(B_DB *mdb)
+SQL_ROW B_DB_INGRES::sql_fetch_row(void)
 {
-   int     i;
+   int j;
+   SQL_ROW row = NULL; /* by default, return NULL */
 
-   Dmsg0(500, "my_ingres_fetch_field starts\n");
+   if (!m_result) {
+      return row;
+   }
+   if (m_result->num_rows <= 0) {
+      return row;
+   }
 
-   if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
-      if (mdb->fields) {
-         free(mdb->fields);
+   Dmsg0(500, "sql_fetch_row start\n");
+
+   if (!m_rows || m_rows_size < m_num_fields) {
+      if (m_rows) {
+         Dmsg0(500, "sql_fetch_row freeing space\n");
+         free(m_rows);
       }
-      Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
-      mdb->fields = (INGRES_FIELD *)malloc(sizeof(INGRES_FIELD) * mdb->num_fields);
-      mdb->fields_size = mdb->num_fields;
+      Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
+      m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
+      m_rows_size = m_num_fields;
+
+      /*
+       * Now reset the row_number now that we have the space allocated
+       */
+      m_row_number = 0;
+   }
 
-      for (i = 0; i < mdb->num_fields; i++) {
-         Dmsg1(500, "filling field %d\n", i);
-         strcpy(mdb->fields[i].name,INGfname(mdb->result, i));
-         mdb->fields[i].max_length = my_ingres_max_length(mdb, i);
-         mdb->fields[i].type       = INGftype(mdb->result, i);
-         mdb->fields[i].flags      = 0;
+   /*
+    * If still within the result set
+    */
+   if (m_row_number < m_num_rows) {
+      Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
+      /*
+       * Get each value from this row
+       */
+      for (j = 0; j < m_num_fields; j++) {
+         m_rows[j] = INGgetvalue(m_result, m_row_number, j);
+         Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
+      }
+      /*
+       * Increment the row number for the next call
+       */
+      m_row_number++;
 
-         Dmsg4(500, "my_ingres_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
-            mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
-            mdb->fields[i].flags);
-      } // end for
-   } // end if
+      row = m_rows;
+   } else {
+      Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
+   }
 
-   // increment field number for the next time around
+   Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
 
-   Dmsg0(500, "my_ingres_fetch_field finishes\n");
-   return &mdb->fields[mdb->field_number++];
+   return row;
+}
+
+const char *B_DB_INGRES::sql_strerror(void)
+{
+   return INGerrorMessage(m_db_handle);
 }
 
-void my_ingres_data_seek(B_DB *mdb, int row)
+void B_DB_INGRES::sql_data_seek(int row)
 {
-   // set the row number to be returned on the next call
-   // to my_ingres_fetch_row
-   mdb->row_number = row;
+   /*
+    * Set the row number to be returned on the next call to sql_fetch_row
+    */
+   m_row_number = row;
 }
 
-void my_ingres_field_seek(B_DB *mdb, int field)
+int B_DB_INGRES::sql_affected_rows(void)
 {
-   mdb->field_number = field;
+   return m_num_rows;
 }
 
 /*
- * Note, if this routine returns 1 (failure), Bacula expects
- *  that no result has been stored.
- * This is where QUERY_DB comes with Ingres.   SRE: true?
- *
- *  Returns:  0  on success
- *            1  on failure
- *
+ * First execute the insert query and then retrieve the currval.
+ * By setting transaction to true we make it an atomic transaction
+ * and as such we can get the currval after which we commit if
+ * transaction is false. This way things are an atomic operation
+ * for Ingres and things work. We save the current transaction status
+ * and set transaction in the mdb to true and at the end of this
+ * function we restore the actual transaction status.
  */
-int my_ingres_query(B_DB *mdb, const char *query)
+uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
 {
-   Dmsg0(500, "my_ingres_query started\n");
-   // We are starting a new query.  reset everything.
-   mdb->num_rows     = -1;
-   mdb->row_number   = -1;
-   mdb->field_number = -1;
+   char sequence[64];
+   char getkeyval_query[256];
+   char *currval;
+   uint64_t id = 0;
+   bool current_explicit_commit;
+
+   /*
+    * Save the current transaction status and pretend we are in a transaction.
+    */
+   current_explicit_commit = m_explicit_commit;
+   m_explicit_commit = false;
+
+   /*
+    * Execute the INSERT query.
+    */
+   m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
+   if (m_num_rows == -1) {
+      goto bail_out;
+   }
 
-   if (mdb->result) {
-      INGclear(mdb->result);  /* hmm, someone forgot to free?? */
-      mdb->result = NULL;
+   changes++;
+
+   /*
+    * Obtain the current value of the sequence that
+    * provides the serial value for primary key of the table.
+    *
+    * currval is local to our session. It is not affected by
+    * other transactions.
+    *
+    * Determine the name of the sequence.
+    * As we name all sequences as <table>_seq this is easy.
+    */
+   bstrncpy(sequence, table_name, sizeof(sequence));
+   bstrncat(sequence, "_seq", sizeof(sequence));
+
+   bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
+
+   if (m_result) {
+      INGclear(m_result);
+      m_result = NULL;
    }
+   m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
 
-   Dmsg1(500, "my_ingres_query starts with '%s'\n", query);
-   mdb->result = INGexec(mdb->db, query);
-   if (!mdb->result) {
-      Dmsg1(50, "Query failed: %s\n", query);
+   if (!m_result) {
+      Dmsg1(50, "Query failed: %s\n", getkeyval_query);
       goto bail_out;
    }
 
-   mdb->status = INGresultStatus(mdb->result);
-   if (mdb->status == ING_COMMAND_OK) {
-      Dmsg1(500, "we have a result\n", query);
+   Dmsg0(500, "exec done");
 
-      // how many fields in the set?
-      mdb->num_fields = (int)INGnfields(mdb->result);
-      Dmsg1(500, "we have %d fields\n", mdb->num_fields);
-
-      mdb->num_rows = INGntuples(mdb->result);
-      Dmsg1(500, "we have %d rows\n", mdb->num_rows);
-
-      mdb->status = 0;                  /* succeed */
-   } else {
-      Dmsg1(50, "Result status failed: %s\n", query);
-      goto bail_out;
+   currval = INGgetvalue(m_result, 0, 0);
+   if (currval) {
+      id = str_to_uint64(currval);
    }
 
-   Dmsg0(500, "my_ingres_query finishing\n");
-   return mdb->status;
+   INGclear(m_result);
+   m_result = NULL;
 
 bail_out:
-   Dmsg1(500, "we failed\n", query);
-   INGclear(mdb->result);
-   mdb->result = NULL;
-   mdb->status = 1;                   /* failed */
-   return mdb->status;
-}
-
-void my_ingres_free_result(B_DB *mdb)
-{
-   
-   db_lock(mdb);
-   if (mdb->result) {
-      INGclear(mdb->result);
-      mdb->result = NULL;
+   /*
+    * Restore the actual explicit_commit status.
+    */
+   m_explicit_commit = current_explicit_commit;
+
+   /*
+    * Commit if explicit_commit is not set.
+    */
+   if (m_explicit_commit) {
+      INGcommit(m_db_handle);
    }
 
-   if (mdb->row) {
-      free(mdb->row);
-      mdb->row = NULL;
-   }
-
-   if (mdb->fields) {
-      free(mdb->fields);
-      mdb->fields = NULL;
-   }
-   db_unlock(mdb);
+   return id;
 }
 
-int my_ingres_currval(B_DB *mdb, const char *table_name)
+SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
 {
-   // TODO!
-   return -1;
-}
+   int i, j;
+   int max_length;
+   int this_length;
 
-#ifdef HAVE_BATCH_FILE_INSERT
+   if (!m_fields || m_fields_size < m_num_fields) {
+      if (m_fields) {
+         free(m_fields);
+         m_fields = NULL;
+      }
+      Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
+      m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
+      m_fields_size = m_num_fields;
 
-int my_ingres_batch_start(JCR *jcr, B_DB *mdb)
-{
-    //TODO!
-   return ING_ERROR;
+      for (i = 0; i < m_num_fields; i++) {
+         Dmsg1(500, "filling field %d\n", i);
+         m_fields[i].name = INGfname(m_result, i);
+         m_fields[i].type = INGftype(m_result, i);
+         m_fields[i].flags = 0;
+
+         /*
+          * For a given column, find the max length.
+          */
+         max_length = 0;
+         for (j = 0; j < m_num_rows; j++) {
+            if (INGgetisnull(m_result, j, i)) {
+                this_length = 4;        /* "NULL" */
+            } else {
+                this_length = cstrlen(INGgetvalue(m_result, j, i));
+            }
+
+            if (max_length < this_length) {
+               max_length = this_length;
+            }
+         }
+         m_fields[i].max_length = max_length;
+
+         Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
+               m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
+      }
+   }
+
+   /*
+    * Increment field number for the next time around
+    */
+   return &m_fields[m_field_number++];
 }
 
-/* set error to something to abort operation */
-int my_ingres_batch_end(JCR *jcr, B_DB *mdb, const char *error)
+bool B_DB_INGRES::sql_field_is_not_null(int field_type)
 {
-    //TODO!
-   return ING_ERROR;
+   switch (field_type) {
+   case 1:
+      return true;
+   default:
+      return false;
+   }
 }
 
-int my_ingres_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+bool B_DB_INGRES::sql_field_is_numeric(int field_type)
 {
-    //TODO!
-   return ING_ERROR;
+   /*
+    * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
+    */
+   switch (field_type) {
+   case IISQ_DEC_TYPE:
+   case IISQ_INT_TYPE:
+   case IISQ_FLT_TYPE:
+      return true;
+   default:
+      return false;
+   }
 }
 
-#endif /* HAVE_BATCH_FILE_INSERT */
-
 /*
  * Escape strings so that Ingres is happy on COPY
  *
@@ -596,7 +958,7 @@ int my_ingres_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
  *         string must be long enough (max 2*old+1) to hold
  *         the escaped output.
  */
-char *my_ingres_copy_escape(char *dest, char *src, size_t len)
+static char *ingres_copy_escape(char *dest, char *src, size_t len)
 {
    /* we have to escape \t, \n, \r, \ */
    char c = '\0' ;
@@ -636,29 +998,103 @@ char *my_ingres_copy_escape(char *dest, char *src, size_t len)
    return dest;
 }
 
-#ifdef HAVE_BATCH_FILE_INSERT
-const char *my_ingres_batch_lock_path_query = 
-   "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
+/* 
+ * Returns true if OK
+ *         false if failed
+ */
+bool B_DB_INGRES::sql_batch_start(JCR *jcr)
+{
+   bool ok;
+
+   db_lock(this);
+   ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
+                           "FileIndex INTEGER,"
+                           "JobId INTEGER,"
+                           "Path VARBYTE(32000),"
+                           "Name VARBYTE(32000),"
+                           "LStat VARBYTE(255),"
+                           "MD5 VARBYTE(255),"
+                           "DeltaSeq SMALLINT)"
+                           " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
+   db_unlock(this);
+   return ok;
+}
 
+/* 
+ * Returns true if OK
+ *         false if failed
+ */
+bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
+{
+   m_status = 0;
+   return true;
+}
+
+/* 
+ * Returns true if OK
+ *         false if failed
+ */
+bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
+{
+   size_t len;
+   const char *digest;
+   char ed1[50];
+
+   esc_name = check_pool_memory_size(esc_name, fnl*2+1);
+   db_escape_string(jcr, esc_name, fname, fnl);
+
+   esc_path = check_pool_memory_size(esc_path, pnl*2+1);
+   db_escape_string(jcr, esc_path, path, pnl);
+
+   if (ar->Digest == NULL || ar->Digest[0] == 0) {
+      digest = "0";
+   } else {
+      digest = ar->Digest;
+   }
 
-const char *my_ingres_batch_lock_filename_query = 
-   "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
+   len = Mmsg(cmd, "INSERT INTO batch VALUES "
+                   "(%u,%s,'%s','%s','%s','%s',%u)",
+                   ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
+                   esc_name, ar->attr, digest, ar->DeltaSeq);
+
+   return sql_query(cmd);
+}
 
-const char *my_ingres_batch_unlock_tables_query = "COMMIT";
+/*
+ * Initialize database data structure. In principal this should
+ * never have errors, or it is really fatal.
+ */
+B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
+                       const char *db_password, const char *db_address, int db_port,
+                       const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
+{
+   B_DB_INGRES *mdb = NULL;
 
-const char *my_ingres_batch_fill_path_query = 
-   "INSERT INTO Path (Path) "
-    "SELECT a.Path FROM "
-     "(SELECT DISTINCT Path FROM batch) AS a "
-      "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
+   if (!db_user) {
+      Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
+      return NULL;
+   }
 
+   P(mutex);                          /* lock DB queue */
+   if (db_list && !mult_db_connections) {
+      /*
+       * Look to see if DB already open
+       */
+      foreach_dlist(mdb, db_list) {
+         if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
+            Dmsg1(100, "DB REopen %s\n", db_name);
+            mdb->increment_refcount();
+            goto bail_out;
+         }
+      }
+   }
 
-const char *my_ingres_batch_fill_filename_query = 
-   "INSERT INTO Filename (Name) "
-    "SELECT a.Name FROM "
-     "(SELECT DISTINCT Name FROM batch) as a "
-      "WHERE NOT EXISTS "
-       "(SELECT Name FROM Filename WHERE Name = a.Name)";
-#endif /* HAVE_BATCH_FILE_INSERT */
+   Dmsg0(100, "db_init_database first time\n");
+   mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
+                         db_port, db_socket, mult_db_connections, disable_batch_insert));
 
+bail_out:
+   V(mutex);
+   return mdb;
+}
 #endif /* HAVE_INGRES */