]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/postgresql.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / cats / postgresql.c
index 618b45b0abb938a8ee5b010ef6299b51a20e7ca8..dabc4b827bf8c446635956b5f95615f75124c4f4 100644 (file)
@@ -1,29 +1,20 @@
 /*
-   Bacula® - The Network Backup Solution
-
-   Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
-
-   The main author of Bacula is Kern Sibbald, with contributions from
-   many others, a complete list can be found in the file AUTHORS.
-   This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version three of the GNU Affero General Public
-   License as published by the Free Software Foundation and included
-   in the file LICENSE.
-
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-   General Public License for more details.
-
-   You should have received a copy of the GNU Affero General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA.
-
-   Bacula® is a registered trademark of Kern Sibbald.
-   The licensor of Bacula is the Free Software Foundation Europe
-   (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
-   Switzerland, email:ftf@fsfeurope.org.
+   Bacula(R) - The Network Backup Solution
+
+   Copyright (C) 2000-2017 Kern Sibbald
+
+   The original author of Bacula is Kern Sibbald, with contributions
+   from many others, a complete list can be found in the file AUTHORS.
+
+   You may use this file and others of this release according to the
+   license defined in the LICENSE file, which includes the Affero General
+   Public License, v3.0 ("AGPLv3") and some additional permissions and
+   terms pursuant to its AGPLv3 Section 7.
+
+   This notice must be preserved when any source code is 
+   conveyed and/or propagated.
+
+   Bacula(R) is a registered trademark of Kern Sibbald.
 */
 /*
  * Bacula Catalog Database routines specific to PostgreSQL
  *    Dan Langille, December 2003
  *    based upon work done by Kern Sibbald, March 2000
  *
- * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
+ * Note: at one point, this file was changed to class based by a certain 
+ *  programmer, and other than "wrapping" in a class, which is a trivial
+ *  change for a C++ programmer, nothing substantial was done, yet all the
+ *  code was recommitted under this programmer's name.  Consequently, we
+ *  undo those changes here.  Unfortunately, it is too difficult to put
+ *  back the original author's name (Dan Langille) on the parts he wrote.
  */
 
 #include "bacula.h"
 
 #ifdef HAVE_POSTGRESQL
 
-#include "cats.h"
-#include "bdb_priv.h"
-#include "libpq-fe.h"
-#include "postgres_ext.h"       /* needed for NAMEDATALEN */
-#include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
-#include "bdb_postgresql.h"
+#include  "cats.h"
+#include  "libpq-fe.h"
+#include  "postgres_ext.h"       /* needed for NAMEDATALEN */
+#include  "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
+#define __BDB_POSTGRESQL_H_ 1
+#include  "bdb_postgresql.h"
+
+#define dbglvl_dbg   DT_SQL|100
+#define dbglvl_info  DT_SQL|50
+#define dbglvl_err   DT_SQL|10
 
 /* -----------------------------------------------------------------------
  *
  * -----------------------------------------------------------------------
  */
 
+/* List of open databases */
+static dlist *db_list = NULL; 
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
+
+BDB_POSTGRESQL::BDB_POSTGRESQL(): BDB()
+{
+   BDB_POSTGRESQL *mdb = this;
+
+   if (db_list == NULL) {
+      db_list = New(dlist(mdb, &mdb->m_link));
+   }
+   mdb->m_db_driver_type = SQL_DRIVER_TYPE_POSTGRESQL;
+   mdb->m_db_type = SQL_TYPE_POSTGRESQL;
+   mdb->m_db_driver = bstrdup("PostgreSQL");
+
+   mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
+   mdb->errmsg[0] = 0;
+   mdb->cmd = get_pool_memory(PM_EMSG);    /* get command buffer */
+   mdb->cached_path = get_pool_memory(PM_FNAME);
+   mdb->cached_path_id = 0;
+   mdb->m_ref_count = 1;
+   mdb->fname = get_pool_memory(PM_FNAME);
+   mdb->path = get_pool_memory(PM_FNAME);
+   mdb->esc_name = get_pool_memory(PM_FNAME);
+   mdb->esc_path = get_pool_memory(PM_FNAME);
+   mdb->esc_obj = get_pool_memory(PM_FNAME);
+   mdb->m_use_fatal_jmsg = true;
+
+   /* Initialize the private members. */
+   mdb->m_db_handle = NULL;
+   mdb->m_result = NULL;
+   mdb->m_buf =  get_pool_memory(PM_FNAME);
+
+   db_list->append(this);
+}
+
+BDB_POSTGRESQL::~BDB_POSTGRESQL()
+{
+}
+
 /*
- * List of open databases
+ * Initialize database data structure. In principal this should
+ * never have errors, or it is really fatal.
  */
-static dlist *db_list = NULL;
-
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
-                                 const char *db_driver,
-                                 const char *db_name,
-                                 const char *db_user,
-                                 const char *db_password,
-                                 const char *db_address,
-                                 int db_port, 
-                                 const char *db_socket,
-                                 bool mult_db_connections,
-                                 bool disable_batch_insert)
+BDB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
+                       const char *db_password, const char *db_address, int db_port, const char *db_socket, 
+                       const char *db_ssl_key, const char *db_ssl_cert, const char *db_ssl_ca,
+                       const char *db_ssl_capath, const char *db_ssl_cipher,
+                       bool mult_db_connections, bool disable_batch_insert) 
 {
-   /*
-    * Initialize the parent class members.
-    */
-   m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
-   m_db_type = SQL_TYPE_POSTGRESQL;
-   m_db_driver = bstrdup("PostgreSQL");
-   m_db_name = bstrdup(db_name);
-   m_db_user = bstrdup(db_user);
+   BDB_POSTGRESQL *mdb = NULL;
+
+   if (!db_user) {
+      Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
+      return NULL;
+   }
+   P(mutex);                          /* lock DB queue */
+   if (db_list && !mult_db_connections) {
+      /*
+       * Look to see if DB already open
+       */
+      foreach_dlist(mdb, db_list) {
+         if (mdb->bdb_match_database(db_driver, db_name, db_address, db_port)) {
+            Dmsg1(dbglvl_info, "DB REopen %s\n", db_name);
+            mdb->increment_refcount();
+            goto get_out;
+         }
+      }
+   }
+   Dmsg0(dbglvl_info, "db_init_database first time\n");
+   /* Create the global Bacula db context */
+   mdb = New(BDB_POSTGRESQL());
+   if (!mdb) goto get_out;
+
+   /* Initialize the parent class members. */
+   mdb->m_db_name = bstrdup(db_name);
+   mdb->m_db_user = bstrdup(db_user);
    if (db_password) {
-      m_db_password = bstrdup(db_password);
+      mdb->m_db_password = bstrdup(db_password);
    }
    if (db_address) {
-      m_db_address = bstrdup(db_address);
+      mdb->m_db_address = bstrdup(db_address);
    }
    if (db_socket) {
-      m_db_socket = bstrdup(db_socket);
-   }
-   m_db_port = db_port;
-   if (disable_batch_insert) {
-      m_disabled_batch_insert = true;
-      m_have_batch_insert = false;
-   } else {
-      m_disabled_batch_insert = false;
-#if defined(USE_BATCH_FILE_INSERT)
-#if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
-#ifdef HAVE_PQISTHREADSAFE
-      m_have_batch_insert = PQisthreadsafe();
-#else
-      m_have_batch_insert = true;
-#endif /* HAVE_PQISTHREADSAFE */
-#else
-      m_have_batch_insert = true;
-#endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
-#else
-      m_have_batch_insert = false;
-#endif /* USE_BATCH_FILE_INSERT */
-   }
-   errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
-   *errmsg = 0;
-   cmd = get_pool_memory(PM_EMSG); /* get command buffer */
-   cached_path = get_pool_memory(PM_FNAME);
-   cached_path_id = 0;
-   m_ref_count = 1;
-   fname = get_pool_memory(PM_FNAME);
-   path = get_pool_memory(PM_FNAME);
-   esc_name = get_pool_memory(PM_FNAME);
-   esc_path = get_pool_memory(PM_FNAME);
-   esc_obj = get_pool_memory(PM_FNAME);
-   m_buf =  get_pool_memory(PM_FNAME);
-   m_allow_transactions = mult_db_connections;
-
-   /* At this time, when mult_db_connections == true, this is for 
+      mdb->m_db_socket = bstrdup(db_socket);
+   } 
+   mdb->m_db_port = db_port;
+
+   if (disable_batch_insert) { 
+      mdb->m_disabled_batch_insert = true;
+      mdb->m_have_batch_insert = false;
+   } else { 
+      mdb->m_disabled_batch_insert = false;
+#ifdef USE_BATCH_FILE_INSERT
+#if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE) 
+#ifdef HAVE_PQISTHREADSAFE 
+      mdb->m_have_batch_insert = PQisthreadsafe();
+#else 
+      mdb->m_have_batch_insert = true;
+#endif /* HAVE_PQISTHREADSAFE */ 
+#else 
+      mdb->m_have_batch_insert = true;
+#endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */ 
+#else 
+      mdb->m_have_batch_insert = false;
+#endif /* USE_BATCH_FILE_INSERT */ 
+   } 
+   mdb->m_allow_transactions = mult_db_connections;
+   /* At this time, when mult_db_connections == true, this is for
     * specific console command such as bvfs or batch mode, and we don't
     * want to share a batch mode or bvfs. In the future, we can change
     * the creation function to add this parameter.
     */
-   m_dedicated = mult_db_connections; 
-
-   /*
-    * Initialize the private members.
-    */
-   m_db_handle = NULL;
-   m_result = NULL;
+   mdb->m_dedicated = mult_db_connections;
 
-   /*
-    * Put the db in the list.
-    */
-   if (db_list == NULL) {
-      db_list = New(dlist(this, &this->m_link));
-   }
-   db_list->append(this);
-}
-
-B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
-{
-}
-
-/*
- * Check that the database correspond to the encoding we want
- */
-static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
+get_out:
+   V(mutex);
+   return mdb;
+} 
+  
+/* Check that the database corresponds to the encoding we want  */
+static bool pgsql_check_database_encoding(JCR *jcr, BDB_POSTGRESQL *mdb)
 {
    SQL_ROW row;
-   int ret = false;
+   int ret = false; 
 
-   if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
+   if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) { 
       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
-      return false;
+      return false; 
    }
 
-   if ((row = mdb->sql_fetch_row()) == NULL) {
-      Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
+   if ((row = mdb->sql_fetch_row()) == NULL) { 
+      Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror()); 
       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
-   } else {
+   } else { 
       ret = bstrcmp(row[0], "SQL_ASCII");
 
       if (ret) {
-         /*
-          * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
-          */
-         mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
-
-      } else {
-         /*
-          * Something is wrong with database encoding
-          */
-         Mmsg(mdb->errmsg, 
+         /* If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
+         mdb->sql_query("SET client_encoding TO 'SQL_ASCII'"); 
+
+      } else { 
+         /* Something is wrong with database encoding */
+         Mmsg(mdb->errmsg,
               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
-              mdb->get_db_name(), row[0]);
+              mdb->get_db_name(), row[0]); 
          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
-         Dmsg1(50, "%s", mdb->errmsg);
-      } 
+         Dmsg1(dbglvl_err, "%s", mdb->errmsg);
+      }
    }
    return ret;
 }
@@ -191,29 +213,30 @@ static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
  * Now actually open the database.  This can generate errors,
  *   which are returned in the errmsg
  *
- * DO NOT close the database or delete mdb here !!!!
+ *  DO NOT close the database or delete mdb here !!!!
  */
-bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
+bool BDB_POSTGRESQL::bdb_open_database(JCR *jcr)
 {
-   bool retval = false;
+   bool retval = false; 
    int errstat;
    char buf[10], *port;
+   BDB_POSTGRESQL *mdb = this;
 
    P(mutex);
-   if (m_connected) {
-      retval = true;
-      goto bail_out;
+   if (mdb->m_connected) {
+      retval = true; 
+      goto get_out;
    }
 
-   if ((errstat=rwl_init(&m_lock)) != 0) {
+   if ((errstat=rwl_init(&mdb->m_lock)) != 0) {
       berrno be;
-      Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
+      Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
             be.bstrerror(errstat));
-      goto bail_out;
+      goto get_out;
    }
 
-   if (m_db_port) {
-      bsnprintf(buf, sizeof(buf), "%d", m_db_port);
+   if (mdb->m_db_port) {
+      bsnprintf(buf, sizeof(buf), "%d", mdb->m_db_port);
       port = buf;
    } else {
       port = NULL;
@@ -222,100 +245,105 @@ bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
    /* If connection fails, try at 5 sec intervals for 30 seconds. */
    for (int retry=0; retry < 6; retry++) {
       /* connect to the database */
-      m_db_handle = PQsetdbLogin(
-           m_db_address,         /* default = localhost */
-           port,                 /* default port */
-           NULL,                 /* pg options */
-           NULL,                 /* tty, ignored */
-           m_db_name,            /* database name */
-           m_db_user,            /* login name */
-           m_db_password);       /* password */
+      mdb->m_db_handle = PQsetdbLogin(
+           mdb->m_db_address,         /* default = localhost */
+           port,                      /* default port */
+           NULL,                      /* pg options */
+           NULL,                      /* tty, ignored */
+           mdb->m_db_name,            /* database name */
+           mdb->m_db_user,            /* login name */
+           mdb->m_db_password);       /* password */
 
       /* If no connect, try once more in case it is a timing problem */
-      if (PQstatus(m_db_handle) == CONNECTION_OK) {
+      if (PQstatus(mdb->m_db_handle) == CONNECTION_OK) {
          break;
       }
       bmicrosleep(5, 0);
    }
 
-   Dmsg0(50, "pg_real_connect done\n");
-   Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
-        (m_db_password == NULL) ? "(NULL)" : m_db_password);
+   Dmsg0(dbglvl_info, "pg_real_connect done\n");
+   Dmsg3(dbglvl_info, "db_user=%s db_name=%s db_password=%s\n", mdb->m_db_user, mdb->m_db_name,
+        mdb->m_db_password==NULL?"(NULL)":mdb->m_db_password);
 
-   if (PQstatus(m_db_handle) != CONNECTION_OK) {
-      Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
+   if (PQstatus(mdb->m_db_handle) != CONNECTION_OK) {
+      Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
-         m_db_name, m_db_user);
-      goto bail_out;
+         mdb->m_db_name, mdb->m_db_user);
+      goto get_out;
    }
 
-   m_connected = true;
-   if (!check_tables_version(jcr, this)) {
-      goto bail_out;
+   mdb->m_connected = true;
+   if (!bdb_check_version(jcr)) {
+      goto get_out;
    }
 
-   sql_query("SET datestyle TO 'ISO, YMD'");
+   sql_query("SET datestyle TO 'ISO, YMD'"); 
    sql_query("SET cursor_tuple_fraction=1");
-   
-   /*
-    * Tell PostgreSQL we are using standard conforming strings
-    * and avoid warnings such as:
-    *  WARNING:  nonstandard use of \\ in a string literal
-    */
-   sql_query("SET standard_conforming_strings=on");
-
-   /*
-    * Check that encoding is SQL_ASCII
-    */
-   pgsql_check_database_encoding(jcr, this);
-
-   retval = true;
 
-bail_out:
+   /* 
+    * Tell PostgreSQL we are using standard conforming strings and avoid warnings such as:
+    *   WARNING:  nonstandard use of \\ in a string literal
+    */ 
+   sql_query("SET standard_conforming_strings=on"); 
+   /* Check that encoding is SQL_ASCII */
+   pgsql_check_database_encoding(jcr, mdb);
+   retval = true; 
+
+get_out:
    V(mutex);
-   return retval;
-}
+   return retval; 
+} 
 
-void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
+void BDB_POSTGRESQL::bdb_close_database(JCR *jcr)
 {
-   db_end_transaction(jcr);
+   BDB_POSTGRESQL *mdb = this;
+
+   if (mdb->m_connected) {
+      bdb_end_transaction(jcr);
+   } 
    P(mutex);
-   m_ref_count--;
-   if (m_ref_count == 0) {
-      sql_free_result();
-      db_list->remove(this);
-      if (m_connected && m_db_handle) {
-         PQfinish(m_db_handle);
-      }
-      rwl_destroy(&m_lock);
-      free_pool_memory(errmsg);
-      free_pool_memory(cmd);
-      free_pool_memory(cached_path);
-      free_pool_memory(fname);
-      free_pool_memory(path);
-      free_pool_memory(esc_name);
-      free_pool_memory(esc_path);
-      free_pool_memory(esc_obj);
-      free_pool_memory(m_buf);
-      if (m_db_driver) {
-         free(m_db_driver);
-      }
-      if (m_db_name) {
-         free(m_db_name);
-      }
-      if (m_db_user) {
-         free(m_db_user);
-      }
-      if (m_db_password) {
-         free(m_db_password);
-      }
-      if (m_db_address) {
-         free(m_db_address);
-      }
-      if (m_db_socket) {
-         free(m_db_socket);
-      }
-      delete this;
+   mdb->m_ref_count--;
+   if (mdb->m_ref_count == 0) {
+      if (mdb->m_connected) {
+         sql_free_result(); 
+      } 
+      db_list->remove(mdb);
+      if (mdb->m_connected && mdb->m_db_handle) {
+         PQfinish(mdb->m_db_handle);
+      } 
+      if (is_rwl_valid(&mdb->m_lock)) {
+         rwl_destroy(&mdb->m_lock);
+      } 
+      free_pool_memory(mdb->errmsg);
+      free_pool_memory(mdb->cmd);
+      free_pool_memory(mdb->cached_path);
+      free_pool_memory(mdb->fname);
+      free_pool_memory(mdb->path);
+      free_pool_memory(mdb->esc_name);
+      free_pool_memory(mdb->esc_path);
+      free_pool_memory(mdb->esc_obj);
+      free_pool_memory(mdb->m_buf);
+      if (mdb->m_db_driver) {
+         free(mdb->m_db_driver);
+      } 
+      if (mdb->m_db_name) {
+         free(mdb->m_db_name);
+      } 
+      if (mdb->m_db_user) {
+         free(mdb->m_db_user);
+      } 
+      if (mdb->m_db_password) {
+         free(mdb->m_db_password);
+      } 
+      if (mdb->m_db_address) {
+         free(mdb->m_db_address);
+      } 
+      if (mdb->m_db_socket) {
+         free(mdb->m_db_socket);
+      } 
+      delete mdb;
       if (db_list->size() == 0) {
          delete db_list;
          db_list = NULL;
@@ -324,59 +352,60 @@ void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
    V(mutex);
 }
 
-void B_DB_POSTGRESQL::db_thread_cleanup(void)
+void BDB_POSTGRESQL::bdb_thread_cleanup(void)
 {
 }
 
 /*
- * Escape strings so that PostgreSQL is happy
+ * Escape strings so PostgreSQL is happy
  *
- *   NOTE! len is the length of the old string. Your new
- *         string must be long enough (max 2*old+1) to hold
- *         the escaped output.
+ *  len is the length of the old string. Your new
+ *    string must be long enough (max 2*old+1) to hold
+ *    the escaped output.
  */
-void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
+void BDB_POSTGRESQL::bdb_escape_string(JCR *jcr, char *snew, char *old, int len)
 {
-   int error;
-  
-   PQescapeStringConn(m_db_handle, snew, old, len, &error);
-   if (error) {
-      Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
-      /* error on encoding, probably invalid multibyte encoding in the source string
-        see PQescapeStringConn documentation for details. */
-      Dmsg0(500, "PQescapeStringConn failed\n");
-   }
+   BDB_POSTGRESQL *mdb = this;
+   int failed; 
+
+   PQescapeStringConn(mdb->m_db_handle, snew, old, len, &failed);
+   if (failed) {
+      Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); 
+      /* failed on encoding, probably invalid multibyte encoding in the source string
+         see PQescapeStringConn documentation for details. */
+      Dmsg0(dbglvl_err, "PQescapeStringConn failed\n");
+   } 
 }
 
 /*
  * Escape binary so that PostgreSQL is happy
  *
  */
-char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
+char *BDB_POSTGRESQL::bdb_escape_object(JCR *jcr, char *old, int len)
 {
    size_t new_len;
-   unsigned char *obj;
+   unsigned char *obj; 
+   BDB_POSTGRESQL *mdb = this;
 
-   obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
-   if (!obj) {
+   mdb->esc_obj[0] = 0;
+   obj = PQescapeByteaConn(mdb->m_db_handle, (unsigned const char *)old, len, &new_len);
+   if (!obj) { 
       Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
+   } else {
+      mdb->esc_obj = check_pool_memory_size(mdb->esc_obj, new_len+1);
+      memcpy(mdb->esc_obj, obj, new_len);
+      mdb->esc_obj[new_len] = 0;
+      PQfreemem(obj);
    }
-
-   esc_obj = check_pool_memory_size(esc_obj, new_len+1);
-   memcpy(esc_obj, obj, new_len);
-   esc_obj[new_len]=0;
-
-   PQfreemem(obj);
-
-   return (char *)esc_obj;
+   return (char *)mdb->esc_obj;
 }
 
 /*
  * Unescape binary object so that PostgreSQL is happy
  *
  */
-void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
-                                         POOLMEM **dest, int32_t *dest_len)
+void BDB_POSTGRESQL::bdb_unescape_object(JCR *jcr, char *from, int32_t expected_len,
+                       POOLMEM **dest, int32_t *dest_len)
 {
    size_t new_len;
    unsigned char *obj;
@@ -397,98 +426,102 @@ void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_
    *dest = check_pool_memory_size(*dest, new_len+1);
    memcpy(*dest, obj, new_len);
    (*dest)[new_len]=0;
-   
+
    PQfreemem(obj);
 
-   Dmsg1(010, "obj size: %d\n", *dest_len);
+   Dmsg1(dbglvl_info, "obj size: %d\n", *dest_len);
 }
 
 /*
- * Start a transaction. This groups inserts and makes things
- * much more efficient. Usually started when inserting
- * file attributes.
+ * Start a transaction. This groups inserts and makes things more efficient.
+ *  Usually started when inserting file attributes.
  */
-void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
+void BDB_POSTGRESQL::bdb_start_transaction(JCR *jcr)
 {
-   if (!jcr->attr) {
-      jcr->attr = get_pool_memory(PM_FNAME);
-   }
-   if (!jcr->ar) {
-      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
-   }
-
-   /*
-    * This is turned off because transactions break
-    * if multiple simultaneous jobs are run.
-    */
-   if (!m_allow_transactions) {
-      return;
-   }
-
-   db_lock(this);
-   /*
-    * Allow only 25,000 changes per transaction
-    */
-   if (m_transaction && changes > 25000) {
-      db_end_transaction(jcr);
-   }
-   if (!m_transaction) {
-      sql_query("BEGIN");  /* begin transaction */
-      Dmsg0(400, "Start PosgreSQL transaction\n");
-      m_transaction = true;
-   }
-   db_unlock(this);
+   BDB_POSTGRESQL *mdb = this;
+
+   if (!jcr->attr) { 
+      jcr->attr = get_pool_memory(PM_FNAME); 
+   }
+   if (!jcr->ar) { 
+      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR)); 
+      memset(jcr->ar, 0, sizeof(ATTR_DBR));
+   }
+
+   /* 
+    * This is turned off because transactions break if
+    *  multiple simultaneous jobs are run.
+    */ 
+   if (!mdb->m_allow_transactions) {
+      return; 
+   }
+
+   bdb_lock();
+   /* Allow only 25,000 changes per transaction */
+   if (mdb->m_transaction && changes > 25000) {
+      bdb_end_transaction(jcr);
+   } 
+   if (!mdb->m_transaction) {
+      sql_query("BEGIN");             /* begin transaction */
+      Dmsg0(dbglvl_info, "Start PosgreSQL transaction\n");
+      mdb->m_transaction = true;
+   } 
+   bdb_unlock();
 }
 
-void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
+void BDB_POSTGRESQL::bdb_end_transaction(JCR *jcr)
 {
-   if (jcr && jcr->cached_attribute) {
-      Dmsg0(400, "Flush last cached attribute.\n");
-      if (!db_create_attributes_record(jcr, this, jcr->ar)) {
-         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
+   BDB_POSTGRESQL *mdb = this;
+
+   if (jcr && jcr->cached_attribute) { 
+      Dmsg0(dbglvl_info, "Flush last cached attribute.\n");
+      if (!bdb_create_attributes_record(jcr, jcr->ar)) {
+         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), jcr->db->bdb_strerror());
       }
-      jcr->cached_attribute = false;
+      jcr->cached_attribute = false; 
    }
 
-   if (!m_allow_transactions) {
-      return;
+   if (!mdb->m_allow_transactions) {
+      return; 
    }
 
-   db_lock(this);
-   if (m_transaction) {
-      sql_query("COMMIT"); /* end transaction */
-      m_transaction = false;
-      Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
+   bdb_lock();
+   if (mdb->m_transaction) {
+      sql_query("COMMIT");            /* end transaction */
+      mdb->m_transaction = false;
+      Dmsg1(dbglvl_info, "End PostgreSQL transaction changes=%d\n", changes);
    }
-   changes = 0;
-   db_unlock(this);
+   changes = 0; 
+   bdb_unlock();
 }
 
 
 /*
- * Submit a general SQL command (cmd), and for each row returned,
- * the result_handler is called with the ctx.
+ * Submit a general SQL command, and for each row returned,
+ *  the result_handler is called with the ctx.
  */
-bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
-                                       DB_RESULT_HANDLER *result_handler, 
+bool BDB_POSTGRESQL::bdb_big_sql_query(const char *query,
+                                       DB_RESULT_HANDLER *result_handler,
                                        void *ctx)
 {
-   SQL_ROW row;
-   bool retval = false;
-   bool in_transaction = m_transaction;
-   
-   Dmsg1(500, "db_sql_query starts with '%s'\n", query);
+   BDB_POSTGRESQL *mdb = this;
+   SQL_ROW row; 
+   bool retval = false; 
+   bool in_transaction = mdb->m_transaction;
+
+   Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
 
+   mdb->errmsg[0] = 0;
    /* This code handles only SELECT queries */
    if (strncasecmp(query, "SELECT", 6) != 0) {
-      return db_sql_query(query, result_handler, ctx);
+      return bdb_sql_query(query, result_handler, ctx);
    }
 
    if (!result_handler) {       /* no need of big_query without handler */
-      return false;
+      return false; 
    }
 
-   db_lock(this);
+   bdb_lock();
 
    if (!in_transaction) {       /* CURSOR needs transaction */
       sql_query("BEGIN");
@@ -496,281 +529,274 @@ bool B_DB_POSTGRESQL::db_big_sql_query(const char *query,
 
    Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
 
-   if (!sql_query(m_buf)) {
-      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
-      Dmsg0(50, "db_sql_query failed\n");
-      goto bail_out;
+   if (!sql_query(mdb->m_buf)) {
+      Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), mdb->m_buf, sql_strerror());
+      Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
+      goto get_out;
    }
 
    do {
       if (!sql_query("FETCH 100 FROM _bac_cursor")) {
-         goto bail_out;
+         Mmsg(mdb->errmsg, _("Fetch failed: ERR=%s\n"), sql_strerror());
+         Dmsg1(dbglvl_err, "%s\n", mdb->errmsg);
+         goto get_out;
       }
       while ((row = sql_fetch_row()) != NULL) {
-         Dmsg1(500, "Fetching %d rows\n", m_num_rows);
-         if (result_handler(ctx, m_num_fields, row))
+         Dmsg1(dbglvl_info, "Fetching %d rows\n", mdb->m_num_rows);
+         if (result_handler(ctx, mdb->m_num_fields, row))
             break;
       }
-      PQclear(m_result);
+      PQclear(mdb->m_result);
       m_result = NULL;
-      
+
    } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
 
    sql_query("CLOSE _bac_cursor");
 
-   Dmsg0(500, "db_big_sql_query finished\n");
+   Dmsg0(dbglvl_info, "db_big_sql_query finished\n");
    sql_free_result();
    retval = true;
 
-bail_out:
+get_out:
    if (!in_transaction) {
       sql_query("COMMIT");  /* end transaction */
    }
 
-   db_unlock(this);
+   bdb_unlock();
    return retval;
 }
 
-/*
- * Submit a general SQL command (cmd), and for each row returned,
- * the result_handler is called with the ctx.
- */
-bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
+/* 
+ * Submit a general SQL command, and for each row returned,
+ *  the result_handler is called with the ctx.
+ */ 
+bool BDB_POSTGRESQL::bdb_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
 {
-   SQL_ROW row;
-   bool retval = true;
-
-   Dmsg1(500, "db_sql_query starts with '%s'\n", query);
-
-   db_lock(this);
-   if (!sql_query(query, QF_STORE_RESULT)) {
-      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
-      Dmsg0(500, "db_sql_query failed\n");
-      retval = false;
-      goto bail_out;
-   }
-
-   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
-
-   if (result_handler != NULL) {
-      Dmsg0(500, "db_sql_query invoking handler\n");
-      while ((row = sql_fetch_row()) != NULL) {
-         Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
-         if (result_handler(ctx, m_num_fields, row))
-            break;
-      }
-      sql_free_result();
-   }
+   SQL_ROW row; 
+   bool retval = true; 
+   BDB_POSTGRESQL *mdb = this;
+
+   Dmsg1(dbglvl_info, "db_sql_query starts with '%s'\n", query);
+
+   bdb_lock();
+   mdb->errmsg[0] = 0;
+   if (!sql_query(query, QF_STORE_RESULT)) { 
+      Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
+      Dmsg0(dbglvl_err, "db_sql_query failed\n");
+      retval = false; 
+      goto get_out;
+   } 
+
+   Dmsg0(dbglvl_info, "db_sql_query succeeded. checking handler\n");
+
+   if (result_handler) {
+      Dmsg0(dbglvl_dbg, "db_sql_query invoking handler\n");
+      while ((row = sql_fetch_row())) {
+         Dmsg0(dbglvl_dbg, "db_sql_query sql_fetch_row worked\n");
+         if (result_handler(ctx, mdb->m_num_fields, row))
+            break; 
+      } 
+      sql_free_result(); 
+   } 
 
-   Dmsg0(500, "db_sql_query finished\n");
+   Dmsg0(dbglvl_info, "db_sql_query finished\n");
 
-bail_out:
-   db_unlock(this);
-   return retval;
+get_out:
+   bdb_unlock();
+   return retval; 
 }
 
 /*
- * Note, if this routine returns false (failure), Bacula expects
- * that no result has been stored.
- * This is where QUERY_DB comes with Postgresql.
+ * If this routine returns false (failure), Bacula expects
+ *   that no result has been stored.
+ * This is where QueryDB calls to with Postgresql.
  *
- *  Returns:  true  on success
- *            false on failure
+ *  Returns: true  on success
+ *           false on failure
  *
  */
-bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
+bool BDB_POSTGRESQL::sql_query(const char *query, int flags)
 {
-   int i;
-   bool retval = false;
+   int i; 
+   bool retval = false; 
+   BDB_POSTGRESQL *mdb = this;
 
-   Dmsg1(500, "sql_query starts with '%s'\n", query);
-   /*
-    * We are starting a new query. reset everything.
-    */
-   m_num_rows     = -1;
-   m_row_number   = -1;
-   m_field_number = -1;
+   Dmsg1(dbglvl_info, "sql_query starts with '%s'\n", query);
 
-   if (m_result) {
-      PQclear(m_result);  /* hmm, someone forgot to free?? */
-      m_result = NULL;
-   }
+   /* We are starting a new query. reset everything. */
+   mdb->m_num_rows     = -1;
+   mdb->m_row_number   = -1;
+   mdb->m_field_number = -1;
 
-   for (i = 0; i < 10; i++) {
-      m_result = PQexec(m_db_handle, query);
-      if (m_result) {
+   if (mdb->m_result) {
+      PQclear(mdb->m_result);  /* hmm, someone forgot to free?? */
+      mdb->m_result = NULL;
+   } 
+
+   for (i = 0; i < 10; i++) { 
+      mdb->m_result = PQexec(mdb->m_db_handle, query);
+      if (mdb->m_result) {
          break;
       }
       bmicrosleep(5, 0);
    }
-   if (!m_result) {
-      Dmsg1(50, "Query failed: %s\n", query);
-      goto bail_out;
+   if (!mdb->m_result) {
+      Dmsg1(dbglvl_err, "Query failed: %s\n", query);
+      goto get_out;
    }
 
-   m_status = PQresultStatus(m_result);
-   if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
-      Dmsg0(500, "we have a result\n");
+   mdb->m_status = PQresultStatus(mdb->m_result);
+   if (mdb->m_status == PGRES_TUPLES_OK || mdb->m_status == PGRES_COMMAND_OK) {
+      Dmsg0(dbglvl_dbg, "we have a result\n");
 
-      /*
-       * How many fields in the set?
-       */
-      m_num_fields = (int)PQnfields(m_result);
-      Dmsg1(500, "we have %d fields\n", m_num_fields);
+      /* How many fields in the set? */
+      mdb->m_num_fields = (int)PQnfields(mdb->m_result);
+      Dmsg1(dbglvl_dbg, "we have %d fields\n", mdb->m_num_fields);
 
-      m_num_rows = PQntuples(m_result);
-      Dmsg1(500, "we have %d rows\n", m_num_rows);
+      mdb->m_num_rows = PQntuples(mdb->m_result);
+      Dmsg1(dbglvl_dbg, "we have %d rows\n", mdb->m_num_rows);
 
-      m_row_number = 0;      /* we can start to fetch something */
-      m_status = 0;          /* succeed */
-      retval = true;
+      mdb->m_row_number = 0;      /* we can start to fetch something */
+      mdb->m_status = 0;          /* succeed */
+      retval = true; 
    } else {
-      Dmsg1(50, "Result status failed: %s\n", query);
-      goto bail_out;
-   }
-
-   Dmsg0(500, "sql_query finishing\n");
-   goto ok_out;
-
-bail_out:
-   Dmsg0(500, "we failed\n");
-   PQclear(m_result);
-   m_result = NULL;
-   m_status = 1;                   /* failed */
-
-ok_out:
-   return retval;
-}
-
-void B_DB_POSTGRESQL::sql_free_result(void)
+      Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
+      goto get_out;
+   }
+
+   Dmsg0(dbglvl_info, "sql_query finishing\n");
+   goto ok_out; 
+
+get_out:
+   Dmsg0(dbglvl_err, "we failed\n");
+   PQclear(mdb->m_result);
+   mdb->m_result = NULL;
+   mdb->m_status = 1;                   /* failed */
+ok_out: 
+   return retval; 
+}  
+void BDB_POSTGRESQL::sql_free_result(void)
 {
-   db_lock(this);
-   if (m_result) {
-      PQclear(m_result);
-      m_result = NULL;
-   }
-   if (m_rows) {
-      free(m_rows);
-      m_rows = NULL;
-   }
-   if (m_fields) {
-      free(m_fields);
-      m_fields = NULL;
-   }
-   m_num_rows = m_num_fields = 0;
-   db_unlock(this);
-}
-
-SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
-{
-   int j;
-   SQL_ROW row = NULL; /* by default, return NULL */
-
-   Dmsg0(500, "sql_fetch_row start\n");
-
-   if (m_num_fields == 0) {     /* No field, no row */
-      Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
+   BDB_POSTGRESQL *mdb = this;
+
+   bdb_lock();
+   if (mdb->m_result) {
+      PQclear(mdb->m_result);
+      mdb->m_result = NULL;
+   }
+   if (mdb->m_rows) {
+      free(mdb->m_rows);
+      mdb->m_rows = NULL;
+   }
+   if (mdb->m_fields) {
+      free(mdb->m_fields);
+      mdb->m_fields = NULL;
+   } 
+   mdb->m_num_rows = mdb->m_num_fields = 0;
+   bdb_unlock();
+} 
+
+SQL_ROW BDB_POSTGRESQL::sql_fetch_row(void)
+{ 
+   SQL_ROW row = NULL;            /* by default, return NULL */
+   BDB_POSTGRESQL *mdb = this;
+   Dmsg0(dbglvl_info, "sql_fetch_row start\n");
+   if (mdb->m_num_fields == 0) {     /* No field, no row */
+      Dmsg0(dbglvl_err, "sql_fetch_row finishes returning NULL, no fields\n");
       return NULL;
    }
 
-   if (!m_rows || m_rows_size < m_num_fields) {
-      if (m_rows) {
-         Dmsg0(500, "sql_fetch_row freeing space\n");
-         free(m_rows);
-      }
-      Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
-      m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
-      m_rows_size = m_num_fields;
-
-      /*
-       * Now reset the row_number now that we have the space allocated
-       */
-      m_row_number = 0;
-   }
-
-   /*
-    * If still within the result set
-    */
-   if (m_row_number >= 0 && m_row_number < m_num_rows) {
-      Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
-      /*
-       * Get each value from this row
-       */
-      for (j = 0; j < m_num_fields; j++) {
-         m_rows[j] = PQgetvalue(m_result, m_row_number, j);
-         Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
-      }
-      /*
-       * Increment the row number for the next call
-       */
-      m_row_number++;
-      row = m_rows;
-   } else {
-      Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
-   }
-
-   Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
-
-   return row;
-}
-
-const char *B_DB_POSTGRESQL::sql_strerror(void)
-{
-   return PQerrorMessage(m_db_handle);
-}
-
-void B_DB_POSTGRESQL::sql_data_seek(int row)
-{
-   /*
-    * Set the row number to be returned on the next call to sql_fetch_row
-    */
-   m_row_number = row;
-}
-
-int B_DB_POSTGRESQL::sql_affected_rows(void)
-{
-   return (unsigned) str_to_int32(PQcmdTuples(m_result));
-}
-
-uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
-{
-   int i;
-   uint64_t id = 0;
-   char sequence[NAMEDATALEN-1];
-   char getkeyval_query[NAMEDATALEN+50];
-   PGresult *pg_result;
-
-   /*
-    * First execute the insert query and then retrieve the currval.
-    */
-   if (!sql_query(query)) {
-      return 0;
-   }
-
-   m_num_rows = sql_affected_rows();
-   if (m_num_rows != 1) {
-      return 0;
-   }
-
-   changes++;
-
-   /*
-    * Obtain the current value of the sequence that
-    * provides the serial value for primary key of the table.
-    *
-    * currval is local to our session.  It is not affected by
-    * other transactions.
-    *
-    * Determine the name of the sequence.
-    * PostgreSQL automatically creates a sequence using
-    * <table>_<column>_seq.
-    * At the time of writing, all tables used this format for
-    * for their primary key: <table>id
-    * Except for basefiles which has a primary key on baseid.
-    * Therefore, we need to special case that one table.
-    *
-    * everything else can use the PostgreSQL formula.
-    */
+   if (!mdb->m_rows || mdb->m_rows_size < mdb->m_num_fields) {
+      if (mdb->m_rows) {
+         Dmsg0(dbglvl_dbg, "sql_fetch_row freeing space\n");
+         free(mdb->m_rows);
+      } 
+      Dmsg1(dbglvl_dbg, "we need space for %d bytes\n", sizeof(char *) * mdb->m_num_fields);
+      mdb->m_rows = (SQL_ROW)malloc(sizeof(char *) * mdb->m_num_fields);
+      mdb->m_rows_size = mdb->m_num_fields;
+      /* Now reset the row_number now that we have the space allocated */
+      mdb->m_row_number = 0;
+   }
+
+   /* If still within the result set */
+   if (mdb->m_row_number >= 0 && mdb->m_row_number < mdb->m_num_rows) {
+      Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
+
+      /* Get each value from this row */
+      for (int j = 0; j < mdb->m_num_fields; j++) {
+         mdb->m_rows[j] = PQgetvalue(mdb->m_result, mdb->m_row_number, j);
+         Dmsg2(dbglvl_dbg, "sql_fetch_row field '%d' has value '%s'\n", j, mdb->m_rows[j]);
+      } 
+      mdb->m_row_number++;  /* Increment the row number for the next call */
+      row = mdb->m_rows;
+   } else { 
+      Dmsg2(dbglvl_dbg, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->m_row_number, m_num_rows);
+   }
+   Dmsg1(dbglvl_info, "sql_fetch_row finishes returning %p\n", row);
+   return row; 
+} 
+
+const char *BDB_POSTGRESQL::sql_strerror(void)
+{   
+   BDB_POSTGRESQL *mdb = this;
+   return PQerrorMessage(mdb->m_db_handle);
+} 
+
+void BDB_POSTGRESQL::sql_data_seek(int row)
+{ 
+   BDB_POSTGRESQL *mdb = this;
+   /* Set the row number to be returned on the next call to sql_fetch_row */
+   mdb->m_row_number = row;
+} 
+
+int BDB_POSTGRESQL::sql_affected_rows(void)
+{ 
+   BDB_POSTGRESQL *mdb = this;
+   return (unsigned)str_to_int32(PQcmdTuples(mdb->m_result));
+} 
+uint64_t BDB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
+{ 
+   uint64_t id = 0; 
+   char sequence[NAMEDATALEN-1]; 
+   char getkeyval_query[NAMEDATALEN+50]; 
+   PGresult *p_result;
+   BDB_POSTGRESQL *mdb = this;
+
+   /* First execute the insert query and then retrieve the currval. */
+   if (!sql_query(query)) { 
+      return 0; 
+   } 
+
+   mdb->m_num_rows = sql_affected_rows();
+   if (mdb->m_num_rows != 1) {
+      return 0; 
+   } 
+   mdb->changes++;
+   /* 
+    *  Obtain the current value of the sequence that
+    *  provides the serial value for primary key of the table.
+    * 
+    *  currval is local to our session.  It is not affected by
+    *  other transactions.
+    * 
+    *  Determine the name of the sequence.
+    *  PostgreSQL automatically creates a sequence using
+    *   <table>_<column>_seq.
+    *  At the time of writing, all tables used this format for
+    *   for their primary key: <table>id
+    *  Except for basefiles which has a primary key on baseid.
+    *  Therefore, we need to special case that one table.
+    * 
+    *  everything else can use the PostgreSQL formula.
+    */ 
    if (strcasecmp(table_name, "basefiles") == 0) {
       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
    } else {
@@ -781,170 +807,164 @@ uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const cha
    }
 
    bstrncat(sequence, "_seq", sizeof(sequence));
-   bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
+   bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence); 
 
-   Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
-   for (i = 0; i < 10; i++) {
-      pg_result = PQexec(m_db_handle, getkeyval_query);
-      if (pg_result) {
+   Dmsg1(dbglvl_info, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
+   for (int i = 0; i < 10; i++) { 
+      p_result = PQexec(mdb->m_db_handle, getkeyval_query);
+      if (p_result) {
          break;
       }
       bmicrosleep(5, 0);
    }
-   if (!pg_result) {
-      Dmsg1(50, "Query failed: %s\n", getkeyval_query);
-      goto bail_out;
+   if (!p_result) {
+      Dmsg1(dbglvl_err, "Query failed: %s\n", getkeyval_query);
+      goto get_out;
    }
 
-   Dmsg0(500, "exec done");
+   Dmsg0(dbglvl_dbg, "exec done");
 
-   if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
-      Dmsg0(500, "getting value");
-      id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
-      Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
+   if (PQresultStatus(p_result) == PGRES_TUPLES_OK) {
+      Dmsg0(dbglvl_dbg, "getting value");
+      id = str_to_uint64(PQgetvalue(p_result, 0, 0));
+      Dmsg2(dbglvl_dbg, "got value '%s' which became %d\n", PQgetvalue(p_result, 0, 0), id);
    } else {
-      Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
-      Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
+      Dmsg1(dbglvl_err, "Result status failed: %s\n", getkeyval_query);
+      Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->m_db_handle));
    }
 
-bail_out:
-   PQclear(pg_result);
-
+get_out:
+   PQclear(p_result);
    return id;
-}
-
-SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
-{
-   int i, j;
-   int max_length;
-   int this_length;
-
-   Dmsg0(500, "sql_fetch_field starts\n");
-
-   if (!m_fields || m_fields_size < m_num_fields) {
-      if (m_fields) {
-         free(m_fields);
-         m_fields = NULL;
-      }
-      Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
-      m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
-      m_fields_size = m_num_fields;
-
-      for (i = 0; i < m_num_fields; i++) {
-         Dmsg1(500, "filling field %d\n", i);
-         m_fields[i].name = PQfname(m_result, i);
-         m_fields[i].type = PQftype(m_result, i);
-         m_fields[i].flags = 0;
-
-         /*
-          * For a given column, find the max length.
-          */
-         max_length = 0;
-         for (j = 0; j < m_num_rows; j++) {
-            if (PQgetisnull(m_result, j, i)) {
-                this_length = 4;        /* "NULL" */
-            } else {
-                this_length = cstrlen(PQgetvalue(m_result, j, i));
-            }
-         
-            if (max_length < this_length) {
-               max_length = this_length;
-            }
-         }
-         m_fields[i].max_length = max_length;
-
-         Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
-               m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
-      }
-   }
-
-   /*
-    * Increment field number for the next time around
-    */
-   return &m_fields[m_field_number++];
-}
-
-bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
-{
-   switch (field_type) {
-   case 1:
-      return true;
-   default:
-      return false;
-   }
-}
-
-bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
-{
-   /*
-    * TEMP: the following is taken from select OID, typname from pg_type;
-    */
-   switch (field_type) {
-   case 20:
-   case 21:
-   case 23:
-   case 700:
-   case 701:
-      return true;
-   default:
-      return false;
-   }
-}
-
-/*
- * Escape strings so that PostgreSQL is happy on COPY
- *
- *   NOTE! len is the length of the old string. Your new
- *         string must be long enough (max 2*old+1) to hold
- *         the escaped output.
- */
-static char *pgsql_copy_escape(char *dest, char *src, size_t len)
-{
-   /* we have to escape \t, \n, \r, \ */
-   char c = '\0' ;
-
-   while (len > 0 && *src) {
-      switch (*src) {
-      case '\n':
-         c = 'n';
-         break;
-      case '\\':
-         c = '\\';
-         break;
-      case '\t':
-         c = 't';
-         break;
-      case '\r':
-         c = 'r';
-         break;
-      default:
-         c = '\0' ;
-      }
-
-      if (c) {
-         *dest = '\\';
-         dest++;
-         *dest = c;
-      } else {
-         *dest = *src;
-      }
-
-      len--;
-      src++;
-      dest++;
-   }
-
-   *dest = '\0';
-   return dest;
-}
-
-bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
+} 
+
+SQL_FIELD *BDB_POSTGRESQL::sql_fetch_field(void)
+{ 
+   int max_len; 
+   int this_len; 
+   BDB_POSTGRESQL *mdb = this;
+   Dmsg0(dbglvl_dbg, "sql_fetch_field starts\n");
+   if (!mdb->m_fields || mdb->m_fields_size < mdb->m_num_fields) {
+      if (mdb->m_fields) {
+         free(mdb->m_fields);
+         mdb->m_fields = NULL;
+      } 
+      Dmsg1(dbglvl_dbg, "allocating space for %d fields\n", mdb->m_num_fields);
+      mdb->m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * mdb->m_num_fields);
+      mdb->m_fields_size = mdb->m_num_fields;
+      for (int i = 0; i < mdb->m_num_fields; i++) {
+         Dmsg1(dbglvl_dbg, "filling field %d\n", i);
+         mdb->m_fields[i].name = PQfname(mdb->m_result, i);
+         mdb->m_fields[i].type = PQftype(mdb->m_result, i);
+         mdb->m_fields[i].flags = 0;
+         /* For a given column, find the max length. */
+         max_len = 0; 
+         for (int j = 0; j < mdb->m_num_rows; j++) {
+            if (PQgetisnull(mdb->m_result, j, i)) {
+               this_len = 4;         /* "NULL" */
+            } else { 
+               this_len = cstrlen(PQgetvalue(mdb->m_result, j, i));
+            } 
+
+            if (max_len < this_len) { 
+               max_len = this_len; 
+            } 
+         } 
+         mdb->m_fields[i].max_length = max_len;
+  
+         Dmsg4(dbglvl_dbg, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
+               mdb->m_fields[i].name, mdb->m_fields[i].max_length, mdb->m_fields[i].type, mdb->m_fields[i].flags);
+      } 
+   } 
+   /* Increment field number for the next time around */
+   return &mdb->m_fields[mdb->m_field_number++];
+}  
+bool BDB_POSTGRESQL::sql_field_is_not_null(int field_type)
+{  
+   if (field_type == 1) {
+      return true; 
+   } 
+   return false; 
+} 
+bool BDB_POSTGRESQL::sql_field_is_numeric(int field_type)
+{ 
+    /*
+     * TEMP: the following is taken from select OID, typname from pg_type;
+     */
+    switch (field_type) {
+    case 20:
+    case 21:
+    case 23:
+    case 700:
+    case 701:
+       return true; 
+    default:
+       return false; 
+    }
+} 
+/* 
+ * Escape strings so PostgreSQL is happy on COPY
+ * 
+ * len is the length of the old string. Your new
+ *   string must be long enough (max 2*old+1) to hold
+ *   the escaped output.
+ */ 
+static char *pgsql_copy_escape(char *dest, char *src, size_t len) 
+{ 
+    /* we have to escape \t, \n, \r, \ */
+    char c = '\0' ;
+    while (len > 0 && *src) {
+       switch (*src) {
+       case '\n':
+          c = 'n';
+          break;
+       case '\\':
+          c = '\\';
+          break;
+       case '\t':
+          c = 't';
+          break;
+       case '\r':
+          c = 'r';
+          break;
+       default:
+          c = '\0' ;
+       }
+
+       if (c) {
+          *dest = '\\';
+          dest++;
+          *dest = c;
+       } else {
+          *dest = *src;
+       }
+
+       len--;
+       src++;
+       dest++;
+    }
+
+    *dest = '\0';
+    return dest;
+} 
+bool BDB_POSTGRESQL::sql_batch_start(JCR *jcr)
 {
+   BDB_POSTGRESQL *mdb = this;
    const char *query = "COPY batch FROM STDIN";
-
-   Dmsg0(500, "sql_batch_start started\n");
-
-   if (!sql_query("CREATE TEMPORARY TABLE batch ("
+   Dmsg0(dbglvl_info, "sql_batch_start started\n");
+   if  (!sql_query("CREATE TEMPORARY TABLE batch ("
                           "FileIndex int,"
                           "JobId int,"
                           "Path varchar,"
@@ -952,109 +972,109 @@ bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
                           "LStat varchar,"
                           "Md5 varchar,"
                           "DeltaSeq smallint)")) {
-      Dmsg0(500, "sql_batch_start failed\n");
-      return false;
+      Dmsg0(dbglvl_err, "sql_batch_start failed\n");
+      return false; 
    }
-   
-   /*
-    * We are starting a new query.  reset everything.
-    */
-   m_num_rows     = -1;
-   m_row_number   = -1;
-   m_field_number = -1;
 
-   sql_free_result();
+   /* We are starting a new query.  reset everything. */
+   mdb->m_num_rows     = -1;
+   mdb->m_row_number   = -1;
+   mdb->m_field_number = -1;
+
+   sql_free_result(); 
 
    for (int i=0; i < 10; i++) {
-      m_result = PQexec(m_db_handle, query);
-      if (m_result) {
+      mdb->m_result = PQexec(mdb->m_db_handle, query);
+      if (mdb->m_result) {
          break;
       }
       bmicrosleep(5, 0);
    }
-   if (!m_result) {
-      Dmsg1(50, "Query failed: %s\n", query);
-      goto bail_out;
+   if (!mdb->m_result) {
+      Dmsg1(dbglvl_err, "Query failed: %s\n", query);
+      goto get_out;
    }
 
-   m_status = PQresultStatus(m_result);
-   if (m_status == PGRES_COPY_IN) {
-      /*
-       * How many fields in the set?
-       */
-      m_num_fields = (int) PQnfields(m_result);
-      m_num_rows = 0;
-      m_status = 1;
+   mdb->m_status = PQresultStatus(mdb->m_result);
+   if (mdb->m_status == PGRES_COPY_IN) {
+      /* How many fields in the set? */
+      mdb->m_num_fields = (int) PQnfields(mdb->m_result);
+      mdb->m_num_rows = 0;
+      mdb->m_status = 1;
    } else {
-      Dmsg1(50, "Result status failed: %s\n", query);
-      goto bail_out;
+      Dmsg1(dbglvl_err, "Result status failed: %s\n", query);
+      goto get_out;
    }
 
-   Dmsg0(500, "sql_batch_start finishing\n");
+   Dmsg0(dbglvl_info, "sql_batch_start finishing\n");
 
-   return true;
+   return true; 
 
-bail_out:
-   Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
-   m_status = 0;
-   PQclear(m_result);
-   m_result = NULL;
-   return false;
+get_out:
+   Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
+   mdb->m_status = 0;
+   PQclear(mdb->m_result);
+   mdb->m_result = NULL;
+   return false; 
 }
 
-/*
- * Set error to something to abort operation
- */
-bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
+/* 
+ * Set error to something to abort the operation
+ */ 
+bool BDB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
 {
    int res;
    int count=30;
-   PGresult *pg_result;
+   PGresult *p_result;
+   BDB_POSTGRESQL *mdb = this;
 
-   Dmsg0(500, "sql_batch_end started\n");
+   Dmsg0(dbglvl_info, "sql_batch_end started\n");
 
-   do { 
-      res = PQputCopyEnd(m_db_handle, error);
+   do {
+      res = PQputCopyEnd(mdb->m_db_handle, error);
    } while (res == 0 && --count > 0);
 
    if (res == 1) {
-      Dmsg0(500, "ok\n");
-      m_status = 1;
+      Dmsg0(dbglvl_dbg, "ok\n");
+      mdb->m_status = 0;
    }
-   
+
    if (res <= 0) {
-      Dmsg0(500, "we failed\n");
-      m_status = 0;
-      Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
-      Dmsg1(500, "failure %s\n", errmsg);
+      mdb->m_status = 1;
+      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
+      Dmsg1(dbglvl_err, "failure %s\n", errmsg);
    }
 
    /* Check command status and return to normal libpq state */
-   pg_result = PQgetResult(m_db_handle);
-   if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
-      Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
-      m_status = 0;
+   p_result = PQgetResult(mdb->m_db_handle);
+   if (PQresultStatus(p_result) != PGRES_COMMAND_OK) {
+      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
+      mdb->m_status = 1;
    }
-   PQclear(pg_result); 
 
-   Dmsg0(500, "sql_batch_end finishing\n");
+   /* Get some statistics to compute the best plan */
+   sql_query("ANALYZE batch");
 
-   return true;
+   PQclear(p_result);
+
+   Dmsg0(dbglvl_info, "sql_batch_end finishing\n");
+   return true; 
 }
 
-bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
+bool BDB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
 {
    int res;
    int count=30;
    size_t len;
    const char *digest;
    char ed1[50];
+   BDB_POSTGRESQL *mdb = this;
 
-   esc_name = check_pool_memory_size(esc_name, fnl*2+1);
-   pgsql_copy_escape(esc_name, fname, fnl);
+   mdb->esc_name = check_pool_memory_size(mdb->esc_name, fnl*2+1);
+   pgsql_copy_escape(mdb->esc_name, fname, fnl);
 
-   esc_path = check_pool_memory_size(esc_path, pnl*2+1);
-   pgsql_copy_escape(esc_path, path, pnl);
+   mdb->esc_path = check_pool_memory_size(mdb->esc_path, pnl*2+1);
+   pgsql_copy_escape(mdb->esc_path, path, pnl);
 
    if (ar->Digest == NULL || ar->Digest[0] == 0) {
       digest = "0";
@@ -1062,69 +1082,30 @@ bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
       digest = ar->Digest;
    }
 
-   len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
-              ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
-              esc_name, ar->attr, digest, ar->DeltaSeq);
+   len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
+              ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
+              mdb->esc_name, ar->attr, digest, ar->DeltaSeq);
 
-   do { 
-      res = PQputCopyData(m_db_handle, cmd, len);
+   do {
+      res = PQputCopyData(mdb->m_db_handle, mdb->cmd, len);
    } while (res == 0 && --count > 0);
 
    if (res == 1) {
-      Dmsg0(500, "ok\n");
-      changes++;
-      m_status = 1;
+      Dmsg0(dbglvl_dbg, "ok\n");
+      mdb->changes++;
+      mdb->m_status = 1;
    }
 
    if (res <= 0) {
-      Dmsg0(500, "we failed\n");
-      m_status = 0;
-      Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
-      Dmsg1(500, "failure %s\n", errmsg);
+      mdb->m_status = 0;
+      Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->m_db_handle));
+      Dmsg1(dbglvl_err, "failure %s\n", mdb->errmsg);
    }
 
-   Dmsg0(500, "sql_batch_insert finishing\n");
+   Dmsg0(dbglvl_info, "sql_batch_insert finishing\n");
 
-   return true;
+   return true; 
 }
 
-/*
- * Initialize database data structure. In principal this should
- * never have errors, or it is really fatal.
- */
-B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
-                       const char *db_user, const char *db_password, 
-                       const char *db_address, int db_port, 
-                       const char *db_socket, bool mult_db_connections, 
-                       bool disable_batch_insert)
-{
-   B_DB_POSTGRESQL *mdb = NULL;
-
-   if (!db_user) {
-      Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
-      return NULL;
-   }
-   P(mutex);                          /* lock DB queue */
-   if (db_list && !mult_db_connections) {
-      /*
-       * Look to see if DB already open
-       */
-      foreach_dlist(mdb, db_list) {
-         if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
-            Dmsg1(100, "DB REopen %s\n", db_name);
-            mdb->increment_refcount();
-            goto bail_out;
-         }
-      }
-   }
-   Dmsg0(100, "db_init_database first time\n");
-   mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
-                             db_address, db_port, db_socket, 
-                             mult_db_connections, disable_batch_insert));
-
-bail_out:
-   V(mutex);
-   return mdb;
-}
 
 #endif /* HAVE_POSTGRESQL */