]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/postgresql.c
Tweak version date
[bacula/bacula] / bacula / src / cats / postgresql.c
index 5f3d5bcb882869a2b392673cab803fb82b61b55b..1f35b1634f282856f4c51d0daaab83f47193ab65 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
+   Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
 
-   Bacula® is a registered trademark of John Walker.
+   Bacula® is a registered trademark of Kern Sibbald.
    The licensor of Bacula is the Free Software Foundation Europe
    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
    Switzerland, email:ftf@fsfeurope.org.
  *    Dan Langille, December 2003
  *    based upon work done by Kern Sibbald, March 2000
  *
- *    Version $Id$
+ * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
  */
 
-
-/* The following is necessary so that we do not include
- * the dummy external definition of DB.
- */
-#define __SQL_C                       /* indicate that this is sql.c */
-
 #include "bacula.h"
-#include "cats.h"
 
 #ifdef HAVE_POSTGRESQL
 
+#include "cats.h"
+#include "bdb_priv.h"
+#include "libpq-fe.h"
 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
+#include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
+#include "bdb_postgresql.h"
 
 /* -----------------------------------------------------------------------
  *
  * -----------------------------------------------------------------------
  */
 
-/* List of open databases */
-static BQUEUE db_list = {&db_list, &db_list};
+/*
+ * List of open databases
+ */
+static dlist *db_list = NULL;
 
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
-/*
- * Retrieve database type
- */
-const char *
-db_get_type(void)
+B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
+                                 const char *db_driver,
+                                 const char *db_name,
+                                 const char *db_user,
+                                 const char *db_password,
+                                 const char *db_address,
+                                 int db_port, 
+                                 const char *db_socket,
+                                 bool mult_db_connections,
+                                 bool disable_batch_insert)
 {
-   return "PostgreSQL";
+   /*
+    * Initialize the parent class members.
+    */
+   m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
+   m_db_type = SQL_TYPE_POSTGRESQL;
+   m_db_driver = bstrdup("PostgreSQL");
+   m_db_name = bstrdup(db_name);
+   m_db_user = bstrdup(db_user);
+   if (db_password) {
+      m_db_password = bstrdup(db_password);
+   }
+   if (db_address) {
+      m_db_address = bstrdup(db_address);
+   }
+   if (db_socket) {
+      m_db_socket = bstrdup(db_socket);
+   }
+   m_db_port = db_port;
+   if (disable_batch_insert) {
+      m_disabled_batch_insert = true;
+      m_have_batch_insert = false;
+   } else {
+      m_disabled_batch_insert = false;
+#if defined(USE_BATCH_FILE_INSERT)
+#if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
+#ifdef HAVE_PQISTHREADSAFE
+      m_have_batch_insert = PQisthreadsafe();
+#else
+      m_have_batch_insert = true;
+#endif /* HAVE_PQISTHREADSAFE */
+#else
+      m_have_batch_insert = true;
+#endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
+#else
+      m_have_batch_insert = false;
+#endif /* USE_BATCH_FILE_INSERT */
+   }
+   errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
+   *errmsg = 0;
+   cmd = get_pool_memory(PM_EMSG); /* get command buffer */
+   cached_path = get_pool_memory(PM_FNAME);
+   cached_path_id = 0;
+   m_ref_count = 1;
+   fname = get_pool_memory(PM_FNAME);
+   path = get_pool_memory(PM_FNAME);
+   esc_name = get_pool_memory(PM_FNAME);
+   esc_path = get_pool_memory(PM_FNAME);
+   esc_obj = get_pool_memory(PM_FNAME);
+   m_buf =  get_pool_memory(PM_FNAME);
+   m_allow_transactions = mult_db_connections;
+
+   /*
+    * Initialize the private members.
+    */
+   m_db_handle = NULL;
+   m_result = NULL;
+
+   /*
+    * Put the db in the list.
+    */
+   if (db_list == NULL) {
+      db_list = New(dlist(this, &this->m_link));
+   }
+   db_list->append(this);
+}
 
+B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
+{
 }
 
 /*
- * Initialize database data structure. In principal this should
- * never have errors, or it is really fatal.
+ * Check that the database correspond to the encoding we want
  */
-B_DB *
-db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
-                 const char *db_address, int db_port, const char *db_socket,
-                 int mult_db_connections)
+static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
 {
-   B_DB *mdb;
+   SQL_ROW row;
+   int ret = false;
 
-   if (!db_user) {
-      Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
-      return NULL;
-   }
-   P(mutex);                          /* lock DB queue */
-   if (!mult_db_connections) {
-      /* Look to see if DB already open */
-      for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
-         if (bstrcmp(mdb->db_name, db_name) &&
-             bstrcmp(mdb->db_address, db_address) &&
-             mdb->db_port == db_port) {
-            Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
-            mdb->ref_count++;
-            V(mutex);
-            return mdb;                  /* already open */
-         }
-      }
+   if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
+      Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
+      return false;
    }
-   Dmsg0(100, "db_open first time\n");
-   mdb = (B_DB *)malloc(sizeof(B_DB));
-   memset(mdb, 0, sizeof(B_DB));
-   mdb->db_name = bstrdup(db_name);
-   mdb->db_user = bstrdup(db_user);
-   if (db_password) {
-      mdb->db_password = bstrdup(db_password);
-   }
-   if (db_address) {
-      mdb->db_address  = bstrdup(db_address);
+
+   if ((row = mdb->sql_fetch_row()) == NULL) {
+      Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
+      Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
+   } else {
+      ret = bstrcmp(row[0], "SQL_ASCII");
+
+      if (ret) {
+         /*
+          * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
+          */
+         mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
+
+      } else {
+         /*
+          * Something is wrong with database encoding
+          */
+         Mmsg(mdb->errmsg, 
+              _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
+              mdb->get_db_name(), row[0]);
+         Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
+         Dmsg1(50, "%s", mdb->errmsg);
+      } 
    }
-   if (db_socket) {
-      mdb->db_socket   = bstrdup(db_socket);
-   }
-   mdb->db_port        = db_port;
-   mdb->have_insert_id = TRUE;
-   mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
-   *mdb->errmsg        = 0;
-   mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
-   mdb->cached_path    = get_pool_memory(PM_FNAME);
-   mdb->cached_path_id = 0;
-   mdb->ref_count      = 1;
-   mdb->fname          = get_pool_memory(PM_FNAME);
-   mdb->path           = get_pool_memory(PM_FNAME);
-   mdb->esc_name       = get_pool_memory(PM_FNAME);
-   mdb->esc_path      = get_pool_memory(PM_FNAME);
-   mdb->allow_transactions = mult_db_connections;
-   qinsert(&db_list, &mdb->bq);            /* put db in list */
-   V(mutex);
-   return mdb;
+   return ret;
 }
 
 /*
  * Now actually open the database.  This can generate errors,
  *   which are returned in the errmsg
  *
- * DO NOT close the database or free(mdb) here !!!!
+ * DO NOT close the database or delete mdb here !!!!
  */
-int
-db_open_database(JCR *jcr, B_DB *mdb)
+bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
 {
+   bool retval = false;
    int errstat;
    char buf[10], *port;
 
-#ifdef xxx
-   if (!PQisthreadsafe()) {
-      Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
-           "PostgreSQL library is not thread safe. Connot continue.\n"));
-   }
-#endif
    P(mutex);
-   if (mdb->connected) {
-      V(mutex);
-      return 1;
+   if (m_connected) {
+      retval = true;
+      goto bail_out;
    }
-   mdb->connected = false;
 
-   if ((errstat=rwl_init(&mdb->lock)) != 0) {
+   if ((errstat=rwl_init(&m_lock)) != 0) {
       berrno be;
-      Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
+      Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
             be.bstrerror(errstat));
-      V(mutex);
-      return 0;
+      goto bail_out;
    }
 
-   if (mdb->db_port) {
-      bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
+   if (m_db_port) {
+      bsnprintf(buf, sizeof(buf), "%d", m_db_port);
       port = buf;
    } else {
       port = NULL;
@@ -174,114 +215,112 @@ db_open_database(JCR *jcr, B_DB *mdb)
    /* If connection fails, try at 5 sec intervals for 30 seconds. */
    for (int retry=0; retry < 6; retry++) {
       /* connect to the database */
-      mdb->db = PQsetdbLogin(
-           mdb->db_address,           /* default = localhost */
-           port,                      /* default port */
-           NULL,                      /* pg options */
-           NULL,                      /* tty, ignored */
-           mdb->db_name,              /* database name */
-           mdb->db_user,              /* login name */
-           mdb->db_password);         /* password */
+      m_db_handle = PQsetdbLogin(
+           m_db_address,         /* default = localhost */
+           port,                 /* default port */
+           NULL,                 /* pg options */
+           NULL,                 /* tty, ignored */
+           m_db_name,            /* database name */
+           m_db_user,            /* login name */
+           m_db_password);       /* password */
 
       /* If no connect, try once more in case it is a timing problem */
-      if (PQstatus(mdb->db) == CONNECTION_OK) {
+      if (PQstatus(m_db_handle) == CONNECTION_OK) {
          break;
       }
       bmicrosleep(5, 0);
    }
 
    Dmsg0(50, "pg_real_connect done\n");
-   Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
-            mdb->db_password==NULL?"(NULL)":mdb->db_password);
-
-   if (PQstatus(mdb->db) != CONNECTION_OK) {
-      Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
-            "Database=%s User=%s\n"
-            "It is probably not running or your password is incorrect.\n"),
-             mdb->db_name, mdb->db_user);
-      V(mutex);
-      return 0;
-   }
+   Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
+        (m_db_password == NULL) ? "(NULL)" : m_db_password);
 
-   mdb->connected = true;
+   if (PQstatus(m_db_handle) != CONNECTION_OK) {
+      Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
+         "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
+         m_db_name, m_db_user);
+      goto bail_out;
+   }
 
-   if (!check_tables_version(jcr, mdb)) {
-      V(mutex);
-      return 0;
+   m_connected = true;
+   if (!check_tables_version(jcr, this)) {
+      goto bail_out;
    }
 
-   sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
+   sql_query("SET datestyle TO 'ISO, YMD'");
+   sql_query("SET cursor_tuple_fraction=1");
    
-   /* tell PostgreSQL we are using standard conforming strings
-      and avoid warnings such as:
-       WARNING:  nonstandard use of \\ in a string literal
-   */
-   sql_query(mdb, "set standard_conforming_strings=on");
+   /*
+    * Tell PostgreSQL we are using standard conforming strings
+    * and avoid warnings such as:
+    *  WARNING:  nonstandard use of \\ in a string literal
+    */
+   sql_query("SET standard_conforming_strings=on");
+
+   /*
+    * Check that encoding is SQL_ASCII
+    */
+   pgsql_check_database_encoding(jcr, this);
+
+   retval = true;
 
+bail_out:
    V(mutex);
-   return 1;
+   return retval;
 }
 
-void
-db_close_database(JCR *jcr, B_DB *mdb)
+void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
 {
-   if (!mdb) {
-      return;
-   }
-   db_end_transaction(jcr, mdb);
+   db_end_transaction(jcr);
    P(mutex);
-   sql_free_result(mdb);
-   mdb->ref_count--;
-   if (mdb->ref_count == 0) {
-      qdchain(&mdb->bq);
-      if (mdb->connected && mdb->db) {
-         sql_close(mdb);
+   m_ref_count--;
+   if (m_ref_count == 0) {
+      sql_free_result();
+      db_list->remove(this);
+      if (m_connected && m_db_handle) {
+         PQfinish(m_db_handle);
+      }
+      rwl_destroy(&m_lock);
+      free_pool_memory(errmsg);
+      free_pool_memory(cmd);
+      free_pool_memory(cached_path);
+      free_pool_memory(fname);
+      free_pool_memory(path);
+      free_pool_memory(esc_name);
+      free_pool_memory(esc_path);
+      free_pool_memory(esc_obj);
+      free_pool_memory(m_buf);
+      if (m_db_driver) {
+         free(m_db_driver);
+      }
+      if (m_db_name) {
+         free(m_db_name);
       }
-      rwl_destroy(&mdb->lock);
-      free_pool_memory(mdb->errmsg);
-      free_pool_memory(mdb->cmd);
-      free_pool_memory(mdb->cached_path);
-      free_pool_memory(mdb->fname);
-      free_pool_memory(mdb->path);
-      free_pool_memory(mdb->esc_name);
-      free_pool_memory(mdb->esc_path);
-      if (mdb->db_name) {
-         free(mdb->db_name);
+      if (m_db_user) {
+         free(m_db_user);
       }
-      if (mdb->db_user) {
-         free(mdb->db_user);
+      if (m_db_password) {
+         free(m_db_password);
       }
-      if (mdb->db_password) {
-         free(mdb->db_password);
+      if (m_db_address) {
+         free(m_db_address);
       }
-      if (mdb->db_address) {
-         free(mdb->db_address);
+      if (m_db_socket) {
+         free(m_db_socket);
       }
-      if (mdb->db_socket) {
-         free(mdb->db_socket);
+      delete this;
+      if (db_list->size() == 0) {
+         delete db_list;
+         db_list = NULL;
       }
-      free(mdb);
    }
    V(mutex);
 }
 
-void db_thread_cleanup()
-{ }
-
-/*
- * Return the next unique index (auto-increment) for
- * the given table.  Return NULL on error.
- *
- * For PostgreSQL, NULL causes the auto-increment value
- *  to be updated.
- */
-int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
+void B_DB_POSTGRESQL::db_thread_cleanup(void)
 {
-   strcpy(index, "NULL");
-   return 1;
 }
 
-
 /*
  * Escape strings so that PostgreSQL is happy
  *
@@ -289,12 +328,11 @@ int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
  *         string must be long enough (max 2*old+1) to hold
  *         the escaped output.
  */
-void
-db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
+void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
 {
    int error;
   
-   PQescapeStringConn(mdb->db, snew, old, len, &error);
+   PQescapeStringConn(m_db_handle, snew, old, len, &error);
    if (error) {
       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
       /* error on encoding, probably invalid multibyte encoding in the source string
@@ -304,270 +342,428 @@ db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
 }
 
 /*
- * Submit a general SQL command (cmd), and for each row returned,
- *  the sqlite_handler is called with the ctx.
+ * Escape binary so that PostgreSQL is happy
+ *
  */
-int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
+char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
 {
-   SQL_ROW row;
-
-   Dmsg0(500, "db_sql_query started\n");
+   size_t new_len;
+   unsigned char *obj;
 
-   db_lock(mdb);
-   if (sql_query(mdb, query) != 0) {
-      Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
-      db_unlock(mdb);
-      Dmsg0(500, "db_sql_query failed\n");
-      return 0;
+   obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
+   if (!obj) {
+      Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
    }
-   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
 
-   if (result_handler != NULL) {
-      Dmsg0(500, "db_sql_query invoking handler\n");
-      if ((mdb->result = sql_store_result(mdb)) != NULL) {
-         int num_fields = sql_num_fields(mdb);
+   esc_obj = check_pool_memory_size(esc_obj, new_len+1);
+   memcpy(esc_obj, obj, new_len);
+   esc_obj[new_len]=0;
 
-         Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
-         while ((row = sql_fetch_row(mdb)) != NULL) {
+   PQfreemem(obj);
 
-            Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
-            if (result_handler(ctx, num_fields, row))
-               break;
-         }
+   return (char *)esc_obj;
+}
 
-        sql_free_result(mdb);
-      }
+/*
+ * Unescape binary object so that PostgreSQL is happy
+ *
+ */
+void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
+                                         POOLMEM **dest, int32_t *dest_len)
+{
+   size_t new_len;
+   unsigned char *obj;
+
+   if (!from) {
+      *dest[0] = 0;
+      *dest_len = 0;
+      return;
    }
-   db_unlock(mdb);
 
-   Dmsg0(500, "db_sql_query finished\n");
+   obj = PQunescapeBytea((unsigned const char *)from, &new_len);
 
-   return 1;
-}
+   if (!obj) {
+      Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
+   }
 
+   *dest_len = new_len;
+   *dest = check_pool_memory_size(*dest, new_len+1);
+   memcpy(*dest, obj, new_len);
+   (*dest)[new_len]=0;
+   
+   PQfreemem(obj);
 
+   Dmsg1(010, "obj size: %d\n", *dest_len);
+}
 
-POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
+/*
+ * Start a transaction. This groups inserts and makes things
+ * much more efficient. Usually started when inserting
+ * file attributes.
+ */
+void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
 {
-   int j;
-   POSTGRESQL_ROW row = NULL; // by default, return NULL
-
-   Dmsg0(500, "my_postgresql_fetch_row start\n");
-
-   if (!mdb->row || mdb->row_size < mdb->num_fields) {
-      int num_fields = mdb->num_fields;
-      Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
+   if (!jcr->attr) {
+      jcr->attr = get_pool_memory(PM_FNAME);
+   }
+   if (!jcr->ar) {
+      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
+   }
 
-      if (mdb->row) {
-         Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
-         free(mdb->row);
-      }
-      num_fields += 20;                  /* add a bit extra */
-      mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
-      mdb->row_size = num_fields;
+   /*
+    * This is turned off because transactions break
+    * if multiple simultaneous jobs are run.
+    */
+   if (!m_allow_transactions) {
+      return;
+   }
 
-      // now reset the row_number now that we have the space allocated
-      mdb->row_number = 0;
+   db_lock(this);
+   /*
+    * Allow only 25,000 changes per transaction
+    */
+   if (m_transaction && changes > 25000) {
+      db_end_transaction(jcr);
+   }
+   if (!m_transaction) {
+      sql_query("BEGIN");  /* begin transaction */
+      Dmsg0(400, "Start PosgreSQL transaction\n");
+      m_transaction = true;
    }
+   db_unlock(this);
+}
 
-   // if still within the result set
-   if (mdb->row_number < mdb->num_rows) {
-      Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
-      // get each value from this row
-      for (j = 0; j < mdb->num_fields; j++) {
-         mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
-         Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
+void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
+{
+   if (jcr && jcr->cached_attribute) {
+      Dmsg0(400, "Flush last cached attribute.\n");
+      if (!db_create_attributes_record(jcr, this, jcr->ar)) {
+         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
       }
-      // increment the row number for the next call
-      mdb->row_number++;
-
-      row = mdb->row;
-   } else {
-      Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
+      jcr->cached_attribute = false;
    }
 
-   Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
+   if (!m_allow_transactions) {
+      return;
+   }
 
-   return row;
+   db_lock(this);
+   if (m_transaction) {
+      sql_query("COMMIT"); /* end transaction */
+      m_transaction = false;
+      Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
+   }
+   changes = 0;
+   db_unlock(this);
 }
 
-int my_postgresql_max_length(B_DB *mdb, int field_num) {
-   //
-   // for a given column, find the max length
-   //
-   int max_length;
-   int i;
-   int this_length;
 
-   max_length = 0;
-   for (i = 0; i < mdb->num_rows; i++) {
-      if (PQgetisnull(mdb->result, i, field_num)) {
-          this_length = 4;        // "NULL"
-      } else {
-          this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
-      }
+/*
+ * Submit a general SQL command (cmd), and for each row returned,
+ * the result_handler is called with the ctx.
+ */
+bool B_DB_POSTGRESQL::db_big_sql_query(const char *query, 
+                                       DB_RESULT_HANDLER *result_handler, 
+                                       void *ctx)
+{
+   SQL_ROW row;
+   bool retval = false;
+   bool in_transaction = m_transaction;
+   
+   Dmsg1(500, "db_sql_query starts with '%s'\n", query);
 
-      if (max_length < this_length) {
-          max_length = this_length;
-      }
+   /* This code handles only SELECT queries */
+   if (strncasecmp(query, "SELECT", 6) != 0) {
+      return db_sql_query(query, result_handler, ctx);
    }
 
-   return max_length;
-}
+   if (!result_handler) {       /* no need of big_query without handler */
+      return false;
+   }
 
-POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
-{
-   int     i;
+   db_lock(this);
 
-   Dmsg0(500, "my_postgresql_fetch_field starts\n");
+   if (!in_transaction) {       /* CURSOR needs transaction */
+      sql_query("BEGIN");
+   }
+
+   Mmsg(m_buf, "DECLARE _bac_cursor CURSOR FOR %s", query);
+
+   if (!sql_query(m_buf)) {
+      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), m_buf, sql_strerror());
+      Dmsg0(50, "db_sql_query failed\n");
+      goto bail_out;
+   }
 
-   if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
-      if (mdb->fields) {
-         free(mdb->fields);
+   do {
+      if (!sql_query("FETCH 100 FROM _bac_cursor")) {
+         goto bail_out;
       }
-      Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
-      mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
-      mdb->fields_size = mdb->num_fields;
+      while ((row = sql_fetch_row()) != NULL) {
+         Dmsg1(500, "Fetching %d rows\n", m_num_rows);
+         if (result_handler(ctx, m_num_fields, row))
+            break;
+      }
+      PQclear(m_result);
+      m_result = NULL;
+      
+   } while (m_num_rows > 0);    /* TODO: Can probably test against 100 */
 
-      for (i = 0; i < mdb->num_fields; i++) {
-         Dmsg1(500, "filling field %d\n", i);
-         mdb->fields[i].name           = PQfname(mdb->result, i);
-         mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
-         mdb->fields[i].type       = PQftype(mdb->result, i);
-         mdb->fields[i].flags      = 0;
+   sql_query("CLOSE _bac_cursor");
 
-         Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
-            mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
-            mdb->fields[i].flags);
-      } // end for
-   } // end if
+   Dmsg0(500, "db_big_sql_query finished\n");
+   sql_free_result();
+   retval = true;
 
-   // increment field number for the next time around
+bail_out:
+   if (!in_transaction) {
+      sql_query("COMMIT");  /* end transaction */
+   }
 
-   Dmsg0(500, "my_postgresql_fetch_field finishes\n");
-   return &mdb->fields[mdb->field_number++];
+   db_unlock(this);
+   return retval;
 }
 
-void my_postgresql_data_seek(B_DB *mdb, int row)
+/*
+ * Submit a general SQL command (cmd), and for each row returned,
+ * the result_handler is called with the ctx.
+ */
+bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
 {
-   // set the row number to be returned on the next call
-   // to my_postgresql_fetch_row
-   mdb->row_number = row;
-}
+   SQL_ROW row;
+   bool retval = true;
 
-void my_postgresql_field_seek(B_DB *mdb, int field)
-{
-   mdb->field_number = field;
+   Dmsg1(500, "db_sql_query starts with '%s'\n", query);
+
+   db_lock(this);
+   if (!sql_query(query, QF_STORE_RESULT)) {
+      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
+      Dmsg0(500, "db_sql_query failed\n");
+      retval = false;
+      goto bail_out;
+   }
+
+   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
+
+   if (result_handler != NULL) {
+      Dmsg0(500, "db_sql_query invoking handler\n");
+      while ((row = sql_fetch_row()) != NULL) {
+         Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
+         if (result_handler(ctx, m_num_fields, row))
+            break;
+      }
+      sql_free_result();
+   }
+
+   Dmsg0(500, "db_sql_query finished\n");
+
+bail_out:
+   db_unlock(this);
+   return retval;
 }
 
 /*
- * Note, if this routine returns 1 (failure), Bacula expects
- *  that no result has been stored.
+ * Note, if this routine returns false (failure), Bacula expects
+ * that no result has been stored.
  * This is where QUERY_DB comes with Postgresql.
  *
- *  Returns:  0  on success
- *             on failure
+ *  Returns:  true  on success
+ *            false on failure
  *
  */
-int my_postgresql_query(B_DB *mdb, const char *query)
+bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
 {
-   Dmsg0(500, "my_postgresql_query started\n");
-   // We are starting a new query.  reset everything.
-   mdb->num_rows     = -1;
-   mdb->row_number   = -1;
-   mdb->field_number = -1;
-
-   if (mdb->result) {
-      PQclear(mdb->result);  /* hmm, someone forgot to free?? */
-      mdb->result = NULL;
+   int i;
+   bool retval = false;
+
+   Dmsg1(500, "sql_query starts with '%s'\n", query);
+   /*
+    * We are starting a new query. reset everything.
+    */
+   m_num_rows     = -1;
+   m_row_number   = -1;
+   m_field_number = -1;
+
+   if (m_result) {
+      PQclear(m_result);  /* hmm, someone forgot to free?? */
+      m_result = NULL;
    }
 
-   Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
-
-   for (int i=0; i < 10; i++) {
-      mdb->result = PQexec(mdb->db, query);
-      if (mdb->result) {
+   for (i = 0; i < 10; i++) {
+      m_result = PQexec(m_db_handle, query);
+      if (m_result) {
          break;
       }
       bmicrosleep(5, 0);
    }
-   if (!mdb->result) {
+   if (!m_result) {
       Dmsg1(50, "Query failed: %s\n", query);
       goto bail_out;
    }
 
-   mdb->status = PQresultStatus(mdb->result);
-   if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
-      Dmsg1(500, "we have a result\n", query);
+   m_status = PQresultStatus(m_result);
+   if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
+      Dmsg0(500, "we have a result\n");
 
-      // how many fields in the set?
-      mdb->num_fields = (int)PQnfields(mdb->result);
-      Dmsg1(500, "we have %d fields\n", mdb->num_fields);
+      /*
+       * How many fields in the set?
+       */
+      m_num_fields = (int)PQnfields(m_result);
+      Dmsg1(500, "we have %d fields\n", m_num_fields);
 
-      mdb->num_rows = PQntuples(mdb->result);
-      Dmsg1(500, "we have %d rows\n", mdb->num_rows);
+      m_num_rows = PQntuples(m_result);
+      Dmsg1(500, "we have %d rows\n", m_num_rows);
 
-      mdb->status = 0;                  /* succeed */
+      m_row_number = 0;      /* we can start to fetch something */
+      m_status = 0;          /* succeed */
+      retval = true;
    } else {
       Dmsg1(50, "Result status failed: %s\n", query);
       goto bail_out;
    }
 
-   Dmsg0(500, "my_postgresql_query finishing\n");
-   return mdb->status;
+   Dmsg0(500, "sql_query finishing\n");
+   goto ok_out;
 
 bail_out:
-   Dmsg1(500, "we failed\n", query);
-   PQclear(mdb->result);
-   mdb->result = NULL;
-   mdb->status = 1;                   /* failed */
-   return mdb->status;
+   Dmsg0(500, "we failed\n");
+   PQclear(m_result);
+   m_result = NULL;
+   m_status = 1;                   /* failed */
+
+ok_out:
+   return retval;
 }
 
-void my_postgresql_free_result(B_DB *mdb)
+void B_DB_POSTGRESQL::sql_free_result(void)
 {
-   
-   db_lock(mdb);
-   if (mdb->result) {
-      PQclear(mdb->result);
-      mdb->result = NULL;
+   db_lock(this);
+   if (m_result) {
+      PQclear(m_result);
+      m_result = NULL;
+   }
+   if (m_rows) {
+      free(m_rows);
+      m_rows = NULL;
+   }
+   if (m_fields) {
+      free(m_fields);
+      m_fields = NULL;
+   }
+   m_num_rows = m_num_fields = 0;
+   db_unlock(this);
+}
+
+SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
+{
+   int j;
+   SQL_ROW row = NULL; /* by default, return NULL */
+
+   Dmsg0(500, "sql_fetch_row start\n");
+
+   if (m_num_fields == 0) {     /* No field, no row */
+      Dmsg0(500, "sql_fetch_row finishes returning NULL, no fields\n");
+      return NULL;
    }
 
-   if (mdb->row) {
-      free(mdb->row);
-      mdb->row = NULL;
+   if (!m_rows || m_rows_size < m_num_fields) {
+      if (m_rows) {
+         Dmsg0(500, "sql_fetch_row freeing space\n");
+         free(m_rows);
+      }
+      Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
+      m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
+      m_rows_size = m_num_fields;
+
+      /*
+       * Now reset the row_number now that we have the space allocated
+       */
+      m_row_number = 0;
    }
 
-   if (mdb->fields) {
-      free(mdb->fields);
-      mdb->fields = NULL;
+   /*
+    * If still within the result set
+    */
+   if (m_row_number >= 0 && m_row_number < m_num_rows) {
+      Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
+      /*
+       * Get each value from this row
+       */
+      for (j = 0; j < m_num_fields; j++) {
+         m_rows[j] = PQgetvalue(m_result, m_row_number, j);
+         Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
+      }
+      /*
+       * Increment the row number for the next call
+       */
+      m_row_number++;
+      row = m_rows;
+   } else {
+      Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
    }
-   db_unlock(mdb);
+
+   Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
+
+   return row;
 }
 
-int my_postgresql_currval(B_DB *mdb, char *table_name)
+const char *B_DB_POSTGRESQL::sql_strerror(void)
 {
-   // Obtain the current value of the sequence that
-   // provides the serial value for primary key of the table.
+   return PQerrorMessage(m_db_handle);
+}
 
-   // currval is local to our session.  It is not affected by
-   // other transactions.
+void B_DB_POSTGRESQL::sql_data_seek(int row)
+{
+   /*
+    * Set the row number to be returned on the next call to sql_fetch_row
+    */
+   m_row_number = row;
+}
 
-   // Determine the name of the sequence.
-   // PostgreSQL automatically creates a sequence using
-   // <table>_<column>_seq.
-   // At the time of writing, all tables used this format for
-   // for their primary key: <table>id
-   // Except for basefiles which has a primary key on baseid.
-   // Therefore, we need to special case that one table.
+int B_DB_POSTGRESQL::sql_affected_rows(void)
+{
+   return (unsigned) str_to_int32(PQcmdTuples(m_result));
+}
 
-   // everything else can use the PostgreSQL formula.
+uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
+{
+   int i;
+   uint64_t id = 0;
+   char sequence[NAMEDATALEN-1];
+   char getkeyval_query[NAMEDATALEN+50];
+   PGresult *pg_result;
+
+   /*
+    * First execute the insert query and then retrieve the currval.
+    */
+   if (!sql_query(query)) {
+      return 0;
+   }
 
-   char      sequence[NAMEDATALEN-1];
-   char      query   [NAMEDATALEN+50];
-   PGresult *result;
-   int       id = 0;
+   m_num_rows = sql_affected_rows();
+   if (m_num_rows != 1) {
+      return 0;
+   }
 
+   changes++;
+
+   /*
+    * Obtain the current value of the sequence that
+    * provides the serial value for primary key of the table.
+    *
+    * currval is local to our session.  It is not affected by
+    * other transactions.
+    *
+    * Determine the name of the sequence.
+    * PostgreSQL automatically creates a sequence using
+    * <table>_<column>_seq.
+    * At the time of writing, all tables used this format for
+    * for their primary key: <table>id
+    * Except for basefiles which has a primary key on baseid.
+    * Therefore, we need to special case that one table.
+    *
+    * everything else can use the PostgreSQL formula.
+    */
    if (strcasecmp(table_name, "basefiles") == 0) {
       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
    } else {
@@ -578,144 +774,280 @@ int my_postgresql_currval(B_DB *mdb, char *table_name)
    }
 
    bstrncat(sequence, "_seq", sizeof(sequence));
-   bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
+   bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
 
-   Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
-   for (int i=0; i < 10; i++) {
-      result = PQexec(mdb->db, query);
-      if (result) {
+   Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
+   for (i = 0; i < 10; i++) {
+      pg_result = PQexec(m_db_handle, getkeyval_query);
+      if (pg_result) {
          break;
       }
       bmicrosleep(5, 0);
    }
-   if (!result) {
-      Dmsg1(50, "Query failed: %s\n", query);
+   if (!pg_result) {
+      Dmsg1(50, "Query failed: %s\n", getkeyval_query);
       goto bail_out;
    }
 
    Dmsg0(500, "exec done");
 
-   if (PQresultStatus(result) == PGRES_TUPLES_OK) {
+   if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
       Dmsg0(500, "getting value");
-      id = atoi(PQgetvalue(result, 0, 0));
-      Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
+      id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
+      Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
    } else {
-      Dmsg1(50, "Result status failed: %s\n", query);
-      Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
+      Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
+      Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
    }
 
 bail_out:
-   PQclear(result);
+   PQclear(pg_result);
 
    return id;
 }
 
-#ifdef HAVE_BATCH_FILE_INSERT
+SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
+{
+   int i, j;
+   int max_length;
+   int this_length;
+
+   Dmsg0(500, "sql_fetch_field starts\n");
+
+   if (!m_fields || m_fields_size < m_num_fields) {
+      if (m_fields) {
+         free(m_fields);
+         m_fields = NULL;
+      }
+      Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
+      m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
+      m_fields_size = m_num_fields;
+
+      for (i = 0; i < m_num_fields; i++) {
+         Dmsg1(500, "filling field %d\n", i);
+         m_fields[i].name = PQfname(m_result, i);
+         m_fields[i].type = PQftype(m_result, i);
+         m_fields[i].flags = 0;
+
+         /*
+          * For a given column, find the max length.
+          */
+         max_length = 0;
+         for (j = 0; j < m_num_rows; j++) {
+            if (PQgetisnull(m_result, j, i)) {
+                this_length = 4;        /* "NULL" */
+            } else {
+                this_length = cstrlen(PQgetvalue(m_result, j, i));
+            }
+         
+            if (max_length < this_length) {
+               max_length = this_length;
+            }
+         }
+         m_fields[i].max_length = max_length;
+
+         Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
+               m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
+      }
+   }
+
+   /*
+    * Increment field number for the next time around
+    */
+   return &m_fields[m_field_number++];
+}
+
+bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
+{
+   switch (field_type) {
+   case 1:
+      return true;
+   default:
+      return false;
+   }
+}
 
-int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
+bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
 {
-   char *query = "COPY batch FROM STDIN";
-
-   Dmsg0(500, "my_postgresql_batch_start started\n");
-
-   if (my_postgresql_query(mdb,
-                           "CREATE TEMPORARY TABLE batch ("
-                               "fileindex int,"
-                               "jobid int,"
-                               "path varchar,"
-                               "name varchar,"
-                               "lstat varchar,"
-                               "md5 varchar)") == 1)
-   {
-      Dmsg0(500, "my_postgresql_batch_start failed\n");
-      return 1;
+   /*
+    * TEMP: the following is taken from select OID, typname from pg_type;
+    */
+   switch (field_type) {
+   case 20:
+   case 21:
+   case 23:
+   case 700:
+   case 701:
+      return true;
+   default:
+      return false;
+   }
+}
+
+/*
+ * Escape strings so that PostgreSQL is happy on COPY
+ *
+ *   NOTE! len is the length of the old string. Your new
+ *         string must be long enough (max 2*old+1) to hold
+ *         the escaped output.
+ */
+static char *pgsql_copy_escape(char *dest, char *src, size_t len)
+{
+   /* we have to escape \t, \n, \r, \ */
+   char c = '\0' ;
+
+   while (len > 0 && *src) {
+      switch (*src) {
+      case '\n':
+         c = 'n';
+         break;
+      case '\\':
+         c = '\\';
+         break;
+      case '\t':
+         c = 't';
+         break;
+      case '\r':
+         c = 'r';
+         break;
+      default:
+         c = '\0' ;
+      }
+
+      if (c) {
+         *dest = '\\';
+         dest++;
+         *dest = c;
+      } else {
+         *dest = *src;
+      }
+
+      len--;
+      src++;
+      dest++;
+   }
+
+   *dest = '\0';
+   return dest;
+}
+
+bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
+{
+   const char *query = "COPY batch FROM STDIN";
+
+   Dmsg0(500, "sql_batch_start started\n");
+
+   if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                          "FileIndex int,"
+                          "JobId int,"
+                          "Path varchar,"
+                          "Name varchar,"
+                          "LStat varchar,"
+                          "Md5 varchar,"
+                          "DeltaSeq smallint)")) {
+      Dmsg0(500, "sql_batch_start failed\n");
+      return false;
    }
    
-   // We are starting a new query.  reset everything.
-   mdb->num_rows     = -1;
-   mdb->row_number   = -1;
-   mdb->field_number = -1;
+   /*
+    * We are starting a new query.  reset everything.
+    */
+   m_num_rows     = -1;
+   m_row_number   = -1;
+   m_field_number = -1;
 
-   my_postgresql_free_result(mdb);
+   sql_free_result();
 
    for (int i=0; i < 10; i++) {
-      mdb->result = PQexec(mdb->db, query);
-      if (mdb->result) {
+      m_result = PQexec(m_db_handle, query);
+      if (m_result) {
          break;
       }
       bmicrosleep(5, 0);
    }
-   if (!mdb->result) {
+   if (!m_result) {
       Dmsg1(50, "Query failed: %s\n", query);
       goto bail_out;
    }
 
-   mdb->status = PQresultStatus(mdb->result);
-   if (mdb->status == PGRES_COPY_IN) {
-      // how many fields in the set?
-      mdb->num_fields = (int) PQnfields(mdb->result);
-      mdb->num_rows   = 0;
-      mdb->status = 1;
+   m_status = PQresultStatus(m_result);
+   if (m_status == PGRES_COPY_IN) {
+      /*
+       * How many fields in the set?
+       */
+      m_num_fields = (int) PQnfields(m_result);
+      m_num_rows = 0;
+      m_status = 1;
    } else {
       Dmsg1(50, "Result status failed: %s\n", query);
       goto bail_out;
    }
 
-   Dmsg0(500, "my_postgresql_batch_start finishing\n");
+   Dmsg0(500, "sql_batch_start finishing\n");
 
-   return mdb->status;
+   return true;
 
 bail_out:
-   mdb->status = 0;
-   PQclear(mdb->result);
-   mdb->result = NULL;
-   return mdb->status;
+   Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
+   m_status = 0;
+   PQclear(m_result);
+   m_result = NULL;
+   return false;
 }
 
-/* set error to something to abort operation */
-int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
+/*
+ * Set error to something to abort operation
+ */
+bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
 {
    int res;
    int count=30;
-   Dmsg0(500, "my_postgresql_batch_end started\n");
+   PGresult *pg_result;
 
-   if (!mdb) {                  /* no files ? */
-      return 0;
-   }
+   Dmsg0(500, "sql_batch_end started\n");
 
    do { 
-      res = PQputCopyEnd(mdb->db, error);
+      res = PQputCopyEnd(m_db_handle, error);
    } while (res == 0 && --count > 0);
 
    if (res == 1) {
       Dmsg0(500, "ok\n");
-      mdb->status = 1;
+      m_status = 1;
    }
    
    if (res <= 0) {
       Dmsg0(500, "we failed\n");
-      mdb->status = 0;
-      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
+      m_status = 0;
+      Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
+      Dmsg1(500, "failure %s\n", errmsg);
    }
-   
-   Dmsg0(500, "my_postgresql_batch_end finishing\n");
 
-   return mdb->status;
+   /* Check command status and return to normal libpq state */
+   pg_result = PQgetResult(m_db_handle);
+   if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
+      Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
+      m_status = 0;
+   }
+   PQclear(pg_result); 
+
+   Dmsg0(500, "sql_batch_end finishing\n");
+
+   return true;
 }
 
-int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
 {
    int res;
    int count=30;
    size_t len;
-   char *digest;
+   const char *digest;
    char ed1[50];
 
-   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
-   my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
+   esc_name = check_pool_memory_size(esc_name, fnl*2+1);
+   pgsql_copy_escape(esc_name, fname, fnl);
 
-   mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
-   my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
+   esc_path = check_pool_memory_size(esc_path, pnl*2+1);
+   pgsql_copy_escape(esc_path, path, pnl);
 
    if (ar->Digest == NULL || ar->Digest[0] == 0) {
       digest = "0";
@@ -723,105 +1055,69 @@ int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
       digest = ar->Digest;
    }
 
-   len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
-              ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
-              mdb->esc_name, ar->attr, digest);
+   len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n", 
+              ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path, 
+              esc_name, ar->attr, digest, ar->DeltaSeq);
 
    do { 
-      res = PQputCopyData(mdb->db,
-                          mdb->cmd,
-                          len);
+      res = PQputCopyData(m_db_handle, cmd, len);
    } while (res == 0 && --count > 0);
 
    if (res == 1) {
       Dmsg0(500, "ok\n");
-      mdb->changes++;
-      mdb->status = 1;
+      changes++;
+      m_status = 1;
    }
 
    if (res <= 0) {
       Dmsg0(500, "we failed\n");
-      mdb->status = 0;
-      Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
+      m_status = 0;
+      Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
+      Dmsg1(500, "failure %s\n", errmsg);
    }
 
-   Dmsg0(500, "my_postgresql_batch_insert finishing\n");
+   Dmsg0(500, "sql_batch_insert finishing\n");
 
-   return mdb->status;
+   return true;
 }
 
-#endif /* HAVE_BATCH_FILE_INSERT */
-
 /*
- * Escape strings so that PostgreSQL is happy on COPY
- *
- *   NOTE! len is the length of the old string. Your new
- *         string must be long enough (max 2*old+1) to hold
- *         the escaped output.
+ * Initialize database data structure. In principal this should
+ * never have errors, or it is really fatal.
  */
-char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
+B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, 
+                       const char *db_user, const char *db_password, 
+                       const char *db_address, int db_port, 
+                       const char *db_socket, bool mult_db_connections, 
+                       bool disable_batch_insert)
 {
-   /* we have to escape \t, \n, \r, \ */
-   char c = '\0' ;
+   B_DB_POSTGRESQL *mdb = NULL;
 
-   while (len > 0 && *src) {
-      switch (*src) {
-      case '\n':
-         c = 'n';
-         break;
-      case '\\':
-         c = '\\';
-         break;
-      case '\t':
-         c = 't';
-         break;
-      case '\r':
-         c = 'r';
-         break;
-      default:
-         c = '\0' ;
-      }
-
-      if (c) {
-         *dest = '\\';
-         dest++;
-         *dest = c;
-      } else {
-         *dest = *src;
+   if (!db_user) {
+      Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
+      return NULL;
+   }
+   P(mutex);                          /* lock DB queue */
+   if (db_list && !mult_db_connections) {
+      /*
+       * Look to see if DB already open
+       */
+      foreach_dlist(mdb, db_list) {
+         if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
+            Dmsg1(100, "DB REopen %s\n", db_name);
+            mdb->increment_refcount();
+            goto bail_out;
+         }
       }
-
-      len--;
-      src++;
-      dest++;
    }
+   Dmsg0(100, "db_init_database first time\n");
+   mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password, 
+                             db_address, db_port, db_socket, 
+                             mult_db_connections, disable_batch_insert));
 
-   *dest = '\0';
-   return dest;
+bail_out:
+   V(mutex);
+   return mdb;
 }
 
-#ifdef HAVE_BATCH_FILE_INSERT
-const char *my_pg_batch_lock_path_query = 
-   "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
-
-
-const char *my_pg_batch_lock_filename_query = 
-   "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
-
-const char *my_pg_batch_unlock_tables_query = "COMMIT";
-
-const char *my_pg_batch_fill_path_query = 
-   "INSERT INTO Path (Path) "
-    "SELECT a.Path FROM "
-     "(SELECT DISTINCT Path FROM batch) AS a "
-      "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
-
-
-const char *my_pg_batch_fill_filename_query = 
-   "INSERT INTO Filename (Name) "
-    "SELECT a.Name FROM "
-     "(SELECT DISTINCT Name FROM batch) as a "
-      "WHERE NOT EXISTS "
-       "(SELECT Name FROM Filename WHERE Name = a.Name)";
-#endif /* HAVE_BATCH_FILE_INSERT */
-
 #endif /* HAVE_POSTGRESQL */