]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/dbi.c
Enhance mountcache with rescan option after interval.
[bacula/bacula] / bacula / src / cats / dbi.c
index 6cdaf88b2bce8b5f96e9a259fad3befddce5b65b..3181e8b49beb422d2ceb20a8ac3173a7808cc584 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
+   Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
 
-   Bacula® is a registered trademark of John Walker.
+   Bacula® is a registered trademark of Kern Sibbald.
    The licensor of Bacula is the Free Software Foundation Europe
    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
    Switzerland, email:ftf@fsfeurope.org.
  *    based upon work done by Dan Langille, December 2003 and
  *    by Kern Sibbald, March 2000
  *
- *    Version $Id$
+ * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
  */
-
-
-/* The following is necessary so that we do not include
- * the dummy external definition of DB.
+/*
+ * This code only compiles against a recent version of libdbi. The current
+ * release found on the libdbi website (0.8.3) won't work for this code.
+ *
+ * You find the libdbi library on http://sourceforge.net/projects/libdbi
+ *
+ * A fairly recent version of libdbi from CVS works, so either make sure
+ * your distribution has a fairly recent version of libdbi installed or
+ * clone the CVS repositories from sourceforge and compile that code and
+ * install it.
+ *
+ * You need:
+ * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
+ * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
  */
-#define __SQL_C                       /* indicate that this is sql.c */
 
 #include "bacula.h"
-#include "cats.h"
 
 #ifdef HAVE_DBI
 
+#include "cats.h"
+#include "bdb_priv.h"
+#include <dbi/dbi.h>
+#include <dbi/dbi-dev.h>
+#include <bdb_dbi.h>
+
 /* -----------------------------------------------------------------------
  *
  *   DBI dependent defines and subroutines
  * -----------------------------------------------------------------------
  */
 
-/* List of open databases */
-static BQUEUE db_list = {&db_list, &db_list};
-
-/* Control allocated fields by my_dbi_getvalue */
-static BQUEUE dbi_getvalue_list = {&dbi_getvalue_list, &dbi_getvalue_list};
-
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
 /*
- * Retrieve database type
+ * List of open databases
  */
-const char *
-db_get_type(void)
-{
-   return "DBI";
-}
+static dlist *db_list = NULL;
 
 /*
- * Initialize database data structure. In principal this should
- * never have errors, or it is really fatal.
+ * Control allocated fields by dbi_getvalue
  */
-B_DB *
-db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
-                 const char *db_address, int db_port, const char *db_socket, 
-                 int mult_db_connections)
+static dlist *dbi_getvalue_list = NULL;
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+typedef int (*custom_function_insert_t)(void*, const char*, int);
+typedef char* (*custom_function_error_t)(void*);
+typedef int (*custom_function_end_t)(void*, const char*);
+
+B_DB_DBI::B_DB_DBI(JCR *jcr,
+                   const char *db_driver,
+                   const char *db_name,
+                   const char *db_user,
+                   const char *db_password,
+                   const char *db_address,
+                   int db_port,
+                   const char *db_socket,
+                   bool mult_db_connections,
+                   bool disable_batch_insert)
 {
-   B_DB *mdb;
-   char db_driver[10];
+   char *p;
+   char new_db_driver[10];
    char db_driverdir[256];
-
-   /* Constraint the db_driver */
-   if(db_type  == -1) {
-      Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
-      return NULL;
+   DBI_FIELD_GET *field;
+
+   p = (char *)(db_driver + 4);
+   if (strcasecmp(p, "mysql") == 0) {
+      m_db_type = SQL_TYPE_MYSQL;
+      bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
+   } else if (strcasecmp(p, "postgresql") == 0) {
+      m_db_type = SQL_TYPE_POSTGRESQL;
+      bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
+   } else if (strcasecmp(p, "sqlite3") == 0) {
+      m_db_type = SQL_TYPE_SQLITE3;
+      bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
+   } else if (strcasecmp(p, "ingres") == 0) {
+      m_db_type = SQL_TYPE_INGRES;
+      bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
+   } else {
+      Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
+      return;
    }
-   
-   /* Do the correct selection of driver. 
-    * Can be one of the varius supported by libdbi 
+
+   /*
+    * Set db_driverdir whereis is the libdbi drivers
     */
-   switch (db_type) {
-   case SQL_TYPE_MYSQL:
-      bstrncpy(db_driver,"mysql", sizeof(db_driver));
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      bstrncpy(db_driver,"pgsql", sizeof(db_driver));
-      break;
-   case SQL_TYPE_SQLITE:
-      bstrncpy(db_driver,"pgsql", sizeof(db_driver));
-      break;
-   }
-   
-   /* Set db_driverdir whereis is the libdbi drivers */
    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
-   
-   if (!db_user) {
-      Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
-      return NULL;
-   }
-   P(mutex);                          /* lock DB queue */
-   if (!mult_db_connections) {
-      /* Look to see if DB already open */
-      for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
-         if (bstrcmp(mdb->db_name, db_name) &&
-             bstrcmp(mdb->db_address, db_address) &&
-             bstrcmp(mdb->db_driver, db_driver) &&
-             mdb->db_port == db_port) { 
-            Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name, 
-                  dbi_conn_error(mdb->db, NULL));
-            mdb->ref_count++;
-            V(mutex);
-            return mdb;                  /* already open */
-         }
-      }
-   }
-   Dmsg0(100, "db_open first time\n");
-   mdb = (B_DB *)malloc(sizeof(B_DB));
-   memset(mdb, 0, sizeof(B_DB));
-   mdb->db_name = bstrdup(db_name);
-   mdb->db_user = bstrdup(db_user);
+
+   /*
+    * Initialize the parent class members.
+    */
+   m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
+   m_db_name = bstrdup(db_name);
+   m_db_user = bstrdup(db_user);
    if (db_password) {
-      mdb->db_password = bstrdup(db_password);
+      m_db_password = bstrdup(db_password);
    }
    if (db_address) {
-      mdb->db_address  = bstrdup(db_address);
+      m_db_address = bstrdup(db_address);
    }
    if (db_socket) {
-      mdb->db_socket   = bstrdup(db_socket);
+      m_db_socket = bstrdup(db_socket);
    }
    if (db_driverdir) {
-      mdb->db_driverdir = bstrdup(db_driverdir);
+      m_db_driverdir = bstrdup(db_driverdir);
    }
-   if (db_driver) {
-      mdb->db_driver    = bstrdup(db_driver);
+   m_db_driver = bstrdup(new_db_driver);
+   m_db_port = db_port;
+   if (disable_batch_insert) {
+      m_disabled_batch_insert = true;
+      m_have_batch_insert = false;
+   } else {
+      m_disabled_batch_insert = false;
+#if defined(USE_BATCH_FILE_INSERT)
+#ifdef HAVE_DBI_BATCH_FILE_INSERT
+      m_have_batch_insert = true;
+#else
+      m_have_batch_insert = false;
+#endif /* HAVE_DBI_BATCH_FILE_INSERT */
+#else
+      m_have_batch_insert = false;
+#endif /* USE_BATCH_FILE_INSERT */
    }
-   mdb->db_type        = db_type;
-   mdb->db_port        = db_port;
-   mdb->have_insert_id = TRUE;
-   mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
-   *mdb->errmsg        = 0;
-   mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
-   mdb->cached_path    = get_pool_memory(PM_FNAME);
-   mdb->cached_path_id = 0;
-   mdb->ref_count      = 1;
-   mdb->fname          = get_pool_memory(PM_FNAME);
-   mdb->path           = get_pool_memory(PM_FNAME);
-   mdb->esc_name       = get_pool_memory(PM_FNAME);
-   mdb->esc_path      = get_pool_memory(PM_FNAME);
-   mdb->allow_transactions = mult_db_connections;
-   qinsert(&db_list, &mdb->bq);            /* put db in list */
-   V(mutex);
-   return mdb;
+   errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
+   *errmsg = 0;
+   cmd = get_pool_memory(PM_EMSG); /* get command buffer */
+   cached_path = get_pool_memory(PM_FNAME);
+   cached_path_id = 0;
+   m_ref_count = 1;
+   fname = get_pool_memory(PM_FNAME);
+   path = get_pool_memory(PM_FNAME);
+   esc_name = get_pool_memory(PM_FNAME);
+   esc_path = get_pool_memory(PM_FNAME);
+   esc_obj = get_pool_memory(PM_FNAME);
+   m_allow_transactions = mult_db_connections;
+
+   /*
+    * Initialize the private members.
+    */
+   m_db_handle = NULL;
+   m_result = NULL;
+   m_field_get = NULL;
+
+   /*
+    * Put the db in the list.
+    */
+   if (db_list == NULL) {
+      db_list = New(dlist(this, &this->m_link));
+      dbi_getvalue_list = New(dlist(field, &field->link));
+   }
+   db_list->append(this);
+}
+
+B_DB_DBI::~B_DB_DBI()
+{
 }
 
 /*
  * Now actually open the database.  This can generate errors,
  *   which are returned in the errmsg
  *
- * DO NOT close the database or free(mdb) here  !!!!
+ * DO NOT close the database or delete mdb here  !!!!
  */
-int
-db_open_database(JCR *jcr, B_DB *mdb)
-{   
+bool B_DB_DBI::db_open_database(JCR *jcr)
+{
+   bool retval = false;
    int errstat;
    int dbstat;
-   const char *errmsg;
+   uint8_t len;
+   const char *dbi_errmsg;
    char buf[10], *port;
-   int numdrivers; 
-   
+   int numdrivers;
+   char *new_db_name = NULL;
+   char *new_db_dir = NULL;
+
    P(mutex);
-   if (mdb->connected) {
-      V(mutex);
-      return 1;
+   if (m_connected) {
+      retval = true;
+      goto bail_out;
    }
-   mdb->connected = false;
 
-   if ((errstat=rwl_init(&mdb->lock)) != 0) {
+   if ((errstat=rwl_init(&m_lock)) != 0) {
       berrno be;
-      Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
+      Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
             be.bstrerror(errstat));
-      V(mutex);
-      return 0;
+      goto bail_out;
    }
 
-   if (mdb->db_port) {
-      bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
+   if (m_db_port) {
+      bsnprintf(buf, sizeof(buf), "%d", m_db_port);
       port = buf;
    } else {
       port = NULL;
    }
-   
-   numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
+
+   numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
    if (numdrivers < 0) {
-      Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
+      Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
                                "db_driverdir=%s. It is probaly not found any drivers\n"),
-                               mdb->db_driverdir,numdrivers);
-      V(mutex);
-      return 0;
+                               m_db_driverdir,numdrivers);
+      goto bail_out;
    }
-   mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
-   dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
-   dbi_conn_set_option(mdb->db, "port", port);            /* default port */
-   dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
-   dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
-   dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
-      
-   /* If connection fails, try at 5 sec intervals for 30 seconds. */
+   m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
+   /*
+    * Can be many types of databases
+    */
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      dbi_conn_set_option(m_db_handle, "host", m_db_address);      /* default = localhost */
+      dbi_conn_set_option(m_db_handle, "port", port);              /* default port */
+      dbi_conn_set_option(m_db_handle, "username", m_db_user);     /* login name */
+      dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
+      dbi_conn_set_option(m_db_handle, "dbname", m_db_name);       /* database name */
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      dbi_conn_set_option(m_db_handle, "host", m_db_address);
+      dbi_conn_set_option(m_db_handle, "port", port);
+      dbi_conn_set_option(m_db_handle, "username", m_db_user);
+      dbi_conn_set_option(m_db_handle, "password", m_db_password);
+      dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
+      break;
+   case SQL_TYPE_SQLITE3:
+      len = strlen(working_directory) + 5;
+      new_db_dir = (char *)malloc(len);
+      strcpy(new_db_dir, working_directory);
+      strcat(new_db_dir, "/");
+      len = strlen(m_db_name) + 5;
+      new_db_name = (char *)malloc(len);
+      strcpy(new_db_name, m_db_name);
+      strcat(new_db_name, ".db");
+      dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
+      dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
+      Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
+      free(new_db_dir);
+      free(new_db_name);
+      break;
+   }
+
+   /*
+    * If connection fails, try at 5 sec intervals for 30 seconds.
+    */
    for (int retry=0; retry < 6; retry++) {
-         
-      dbstat = dbi_conn_connect(mdb->db); 
-      if ( dbstat == 0) {
+      dbstat = dbi_conn_connect(m_db_handle);
+      if (dbstat == 0) {
          break;
       }
-         
-      dbi_conn_error(mdb->db, &errmsg);
-      Dmsg1(50, "dbi error: %s\n", errmsg);
-                   
+
+      dbi_conn_error(m_db_handle, &dbi_errmsg);
+      Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
+
       bmicrosleep(5, 0);
-         
    }
-   
-   if ( dbstat != 0 ) {
-      Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface.\n"
-                       "Type=%s Database=%s User=%s\n"
-                       "It is probably not running or your password is incorrect.\n"),
-                        mdb->db_driver, mdb->db_name, mdb->db_user);
-      V(mutex);
-      return 0;           
-   } 
-   
+
+   if (dbstat != 0 ) {
+      Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
+         "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
+         m_db_driver, m_db_name, m_db_user);
+      goto bail_out;
+   }
+
    Dmsg0(50, "dbi_real_connect done\n");
    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
-                    mdb->db_user, mdb->db_name,
-                    mdb->db_password==NULL?"(NULL)":mdb->db_password);
+         m_db_user, m_db_name,
+        (m_db_password == NULL) ? "(NULL)" : m_db_password);
 
-   mdb->connected = true;
+   m_connected = true;
 
-   if (!check_tables_version(jcr, mdb)) {
-      V(mutex);
-      return 0;
+   if (!check_tables_version(jcr, this)) {
+      goto bail_out;
    }
-   
-   switch (mdb->db_type) {
+
+   switch (m_db_type) {
    case SQL_TYPE_MYSQL:
-      /* Set connection timeout to 8 days specialy for batch mode */
-      sql_query(mdb, "SET wait_timeout=691200");
-      sql_query(mdb, "SET interactive_timeout=691200");
+      /*
+       * Set connection timeout to 8 days specialy for batch mode
+       */
+      sql_query("SET wait_timeout=691200");
+      sql_query("SET interactive_timeout=691200");
       break;
    case SQL_TYPE_POSTGRESQL:
-      /* tell PostgreSQL we are using standard conforming strings
-         and avoid warnings such as:
-         WARNING:  nonstandard use of \\ in a string literal
-      */
-      sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
-      sql_query(mdb, "set standard_conforming_strings=on");
-      break;
-   case SQL_TYPE_SQLITE:
+      /*
+       * Tell PostgreSQL we are using standard conforming strings
+       * and avoid warnings such as:
+       * WARNING:  nonstandard use of \\ in a string literal
+       */
+      sql_query("SET datestyle TO 'ISO, YMD'");
+      sql_query("SET standard_conforming_strings=on");
       break;
    }
-   
+
+   retval = true;
+
+bail_out:
    V(mutex);
-   return 1;
+   return retval;
 }
 
-void
-db_close_database(JCR *jcr, B_DB *mdb)
+void B_DB_DBI::db_close_database(JCR *jcr)
 {
-   if (!mdb) {
-      return;
-   }
-   db_end_transaction(jcr, mdb);
+   db_end_transaction(jcr);
    P(mutex);
-   sql_free_result(mdb);
-   mdb->ref_count--;
-   if (mdb->ref_count == 0) {
-      qdchain(&mdb->bq);
-      if (mdb->connected && mdb->db) {
-         //sql_close(mdb);
-         dbi_shutdown_r(mdb->instance);
-         mdb->db = NULL;
-         mdb->instance = NULL;
+   m_ref_count--;
+   if (m_ref_count == 0) {
+      sql_free_result();
+      db_list->remove(this);
+      if (m_connected && m_db_handle) {
+         dbi_shutdown_r(m_instance);
+         m_db_handle = NULL;
+         m_instance = NULL;
       }
-      rwl_destroy(&mdb->lock);
-      free_pool_memory(mdb->errmsg);
-      free_pool_memory(mdb->cmd);
-      free_pool_memory(mdb->cached_path);
-      free_pool_memory(mdb->fname);
-      free_pool_memory(mdb->path);
-      free_pool_memory(mdb->esc_name);
-      free_pool_memory(mdb->esc_path);
-      if (mdb->db_name) {
-         free(mdb->db_name);
+      rwl_destroy(&m_lock);
+      free_pool_memory(errmsg);
+      free_pool_memory(cmd);
+      free_pool_memory(cached_path);
+      free_pool_memory(fname);
+      free_pool_memory(path);
+      free_pool_memory(esc_name);
+      free_pool_memory(esc_path);
+      free_pool_memory(esc_obj);
+      if (m_db_driver) {
+         free(m_db_driver);
       }
-      if (mdb->db_user) {
-         free(mdb->db_user);
+      if (m_db_name) {
+         free(m_db_name);
       }
-      if (mdb->db_password) {
-         free(mdb->db_password);
+      if (m_db_user) {
+         free(m_db_user);
       }
-      if (mdb->db_address) {
-         free(mdb->db_address);
+      if (m_db_password) {
+         free(m_db_password);
       }
-      if (mdb->db_socket) {
-         free(mdb->db_socket);
+      if (m_db_address) {
+         free(m_db_address);
       }
-      if (mdb->db_driverdir) {
-         free(mdb->db_driverdir);
+      if (m_db_socket) {
+         free(m_db_socket);
       }
-      if (mdb->db_driver) {
-          free(mdb->db_driver);
+      if (m_db_driverdir) {
+         free(m_db_driverdir);
       }
-      free(mdb);            
-   }   
+      delete this;
+      if (db_list->size() == 0) {
+         delete db_list;
+         db_list = NULL;
+      }
+   }
    V(mutex);
 }
 
-void db_thread_cleanup()
-{ }
-
-/*
- * Return the next unique index (auto-increment) for
- * the given table.  Return NULL on error.
- *
- */
-int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
+void B_DB_DBI::db_thread_cleanup(void)
 {
-   strcpy(index, "NULL");
-   return 1;
 }
 
-
 /*
  * Escape strings so that DBI is happy
  *
  *   NOTE! len is the length of the old string. Your new
  *         string must be long enough (max 2*old+1) to hold
  *         the escaped output.
- * 
+ *
  * dbi_conn_quote_string_copy receives a pointer to pointer.
- * We need copy the value of pointer to snew because libdbi change the 
+ * We need copy the value of pointer to snew because libdbi change the
  * pointer
  */
-void
-db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
+void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
 {
    char *inew;
    char *pnew;
-   
+
    if (len == 0) {
-      snew[0] = 0; 
+      snew[0] = 0;
    } else {
-      /* correct the size of old basead in len
-       * and copy new string to inew
+      /*
+       * Correct the size of old basead in len and copy new string to inew
        */
       inew = (char *)malloc(sizeof(char) * len + 1);
       bstrncpy(inew,old,len + 1);
-      /* escape the correct size of old */
-      dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
+      /*
+       * Escape the correct size of old
+       */
+      dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
       free(inew);
-      /* copy the escaped string to snew */
-      bstrncpy(snew, pnew, 2 * len + 1);   
+      /*
+       * Copy the escaped string to snew
+       */
+      bstrncpy(snew, pnew, 2 * len + 1);
    }
-   
+
    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
-   
 }
 
 /*
- * Submit a general SQL command (cmd), and for each row returned,
- *  the sqlite_handler is called with the ctx.
+ * Escape binary object so that DBI is happy
+ * Memory is stored in B_DB struct, no need to free it
  */
-bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
+char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
 {
-   SQL_ROW row;
-
-   Dmsg0(500, "db_sql_query started\n");
+   size_t new_len;
+   char *pnew;
 
-   db_lock(mdb);
-   if (sql_query(mdb, query) != 0) {
-      Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
-      db_unlock(mdb);
-      Dmsg0(500, "db_sql_query failed\n");
-      return false;
+   if (len == 0) {
+      esc_obj[0] = 0;
+   } else {
+      new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
+      esc_obj = check_pool_memory_size(esc_obj, new_len+1);
+      memcpy(esc_obj, pnew, new_len);
    }
-   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
-
-   if (result_handler != NULL) {
-      Dmsg0(500, "db_sql_query invoking handler\n");
-      if ((mdb->result = sql_store_result(mdb)) != NULL) {
-         int num_fields = sql_num_fields(mdb);
-
-         Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
-         while ((row = sql_fetch_row(mdb)) != NULL) {
 
-            Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
-            if (result_handler(ctx, num_fields, row))
-               break;
-         }
+   return esc_obj;
+}
 
-        sql_free_result(mdb);
-      }
+/*
+ * Unescape binary object so that DBI is happy
+ */
+void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
+                                  POOLMEM **dest, int32_t *dest_len)
+{
+   if (!from) {
+      *dest[0] = 0;
+      *dest_len = 0;
+      return;
    }
-   db_unlock(mdb);
-
-   Dmsg0(500, "db_sql_query finished\n");
-
-   return true;
+   *dest = check_pool_memory_size(*dest, expected_len+1);
+   *dest_len = expected_len;
+   memcpy(*dest, from, expected_len);
+   (*dest)[expected_len]=0;
 }
 
-
-
-DBI_ROW my_dbi_fetch_row(B_DB *mdb)
+/*
+ * Start a transaction. This groups inserts and makes things
+ * much more efficient. Usually started when inserting
+ * file attributes.
+ */
+void B_DB_DBI::db_start_transaction(JCR *jcr)
 {
-   int j;
-   DBI_ROW row = NULL; // by default, return NULL
-
-   Dmsg0(500, "my_dbi_fetch_row start\n");
-   
-
-   if (!mdb->row || mdb->row_size < mdb->num_fields) {   
-      int num_fields = mdb->num_fields;
-      Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
-
-      if (mdb->row) {
-         Dmsg0(500, "my_dbi_fetch_row freeing space\n");
-         Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
-         if (mdb->num_rows != 0) {
-            for(j = 0; j < mdb->num_fields; j++) {
-               Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);               
-                  if(mdb->row[j]) {
-                     free(mdb->row[j]);
-                  }                  
-            } 
-         }
-         free(mdb->row);
+   if (!jcr->attr) {
+      jcr->attr = get_pool_memory(PM_FNAME);
+   }
+   if (!jcr->ar) {
+      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
+   }
+
+   switch (m_db_type) {
+   case SQL_TYPE_SQLITE3:
+      if (!m_allow_transactions) {
+         return;
       }
-      //num_fields += 20;                  /* add a bit extra */
-      mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
-      mdb->row_size = num_fields;
 
-      // now reset the row_number now that we have the space allocated
-      mdb->row_number = 1;
-   }
+      db_lock(this);
+      /*
+       * Allow only 10,000 changes per transaction
+       */
+      if (m_transaction && changes > 10000) {
+         db_end_transaction(jcr);
+      }
+      if (!m_transaction) {
+         sql_query("BEGIN");  /* begin transaction */
+         Dmsg0(400, "Start SQLite transaction\n");
+         m_transaction = true;
+      }
+      db_unlock(this);
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      /*
+       * This is turned off because transactions break
+       * if multiple simultaneous jobs are run.
+       */
+      if (!m_allow_transactions) {
+         return;
+      }
 
-   // if still within the result set
-   if (mdb->row_number <= mdb->num_rows) {
-      Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
-      // get each value from this row
-      for (j = 0; j < mdb->num_fields; j++) {
-         mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);         
-         // allocate space to queue row 
-         mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
-         // store the pointer in queue
-         mdb->field_get->value = mdb->row[j];  
-         Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
-               j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
-         // insert in queue to future free
-         qinsert(&dbi_getvalue_list, &mdb->field_get->bq);         
+      db_lock(this);
+      /*
+       * Allow only 25,000 changes per transaction
+       */
+      if (m_transaction && changes > 25000) {
+         db_end_transaction(jcr);
+      }
+      if (!m_transaction) {
+         sql_query("BEGIN");  /* begin transaction */
+         Dmsg0(400, "Start PosgreSQL transaction\n");
+         m_transaction = true;
+      }
+      db_unlock(this);
+      break;
+   case SQL_TYPE_INGRES:
+      if (!m_allow_transactions) {
+         return;
       }
-      // increment the row number for the next call
-      mdb->row_number++;
 
-      row = mdb->row;
-   } else {
-      Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
+      db_lock(this);
+      /*
+       * Allow only 25,000 changes per transaction
+       */
+      if (m_transaction && changes > 25000) {
+         db_end_transaction(jcr);
+      }
+      if (!m_transaction) {
+         sql_query("BEGIN");  /* begin transaction */
+         Dmsg0(400, "Start Ingres transaction\n");
+         m_transaction = true;
+      }
+      db_unlock(this);
+      break;
+   default:
+      break;
    }
+}
 
-   Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
+void B_DB_DBI::db_end_transaction(JCR *jcr)
+{
+   if (jcr && jcr->cached_attribute) {
+      Dmsg0(400, "Flush last cached attribute.\n");
+      if (!db_create_attributes_record(jcr, this, jcr->ar)) {
+         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
+      }
+      jcr->cached_attribute = false;
+   }
 
-   return row;
-}
+   switch (m_db_type) {
+   case SQL_TYPE_SQLITE3:
+      if (!m_allow_transactions) {
+         return;
+      }
 
-int my_dbi_max_length(B_DB *mdb, int field_num) {
-   //
-   // for a given column, find the max length
-   //
-   int max_length;
-   int i;
-   int this_length;
-   char *cbuf = NULL;
+      db_lock(this);
+      if (m_transaction) {
+         sql_query("COMMIT"); /* end transaction */
+         m_transaction = false;
+         Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
+      }
+      changes = 0;
+      db_unlock(this);
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      if (!m_allow_transactions) {
+         return;
+      }
 
-   max_length = 0;
-   for (i = 0; i < mdb->num_rows; i++) {
-      if (my_dbi_getisnull(mdb->result, i, field_num)) {
-          this_length = 4;        // "NULL"
-      } else {
-         cbuf = my_dbi_getvalue(mdb->result, i, field_num);
-         this_length = cstrlen(cbuf);
-         // cbuf is always free
-         free(cbuf);
+      db_lock(this);
+      if (m_transaction) {
+         sql_query("COMMIT"); /* end transaction */
+         m_transaction = false;
+         Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
+      }
+      changes = 0;
+      db_unlock(this);
+      break;
+   case SQL_TYPE_INGRES:
+      if (!m_allow_transactions) {
+         return;
       }
 
-      if (max_length < this_length) {
-          max_length = this_length;
+      db_lock(this);
+      if (m_transaction) {
+         sql_query("COMMIT"); /* end transaction */
+         m_transaction = false;
+         Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
       }
+      changes = 0;
+      db_unlock(this);
+      break;
+   default:
+      break;
    }
-
-   return max_length;
 }
 
-DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
+/*
+ * Submit a general SQL command (cmd), and for each row returned,
+ * the result_handler is called with the ctx.
+ */
+bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
 {
-   int     i;
-   int     dbi_index;
-
-   Dmsg0(500, "my_dbi_fetch_field starts\n");
+   bool retval = true;
+   SQL_ROW row;
 
-   if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
-      if (mdb->fields) {
-         free(mdb->fields);
-      }
-      Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
-      mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
-      mdb->fields_size = mdb->num_fields;
-      
-      for (i = 0; i < mdb->num_fields; i++) {
-         // num_fileds is starting at 1, increment i by 1
-         dbi_index = i + 1;
-         Dmsg1(500, "filling field %d\n", i);
-         mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
-         mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
-         mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
-         mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
+   Dmsg1(500, "db_sql_query starts with %s\n", query);
 
-         Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
-            mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
-            mdb->fields[i].flags);
-      } // end for
-   } // end if
+   db_lock(this);
+   if (!sql_query(query, QF_STORE_RESULT)) {
+      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
+      Dmsg0(500, "db_sql_query failed\n");
+      retval = false;
+      goto bail_out;
+   }
 
-   // increment field number for the next time around
+   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
 
-   Dmsg0(500, "my_dbi_fetch_field finishes\n");
-   return &mdb->fields[mdb->field_number++];
-}
+   if (result_handler != NULL) {
+      Dmsg0(500, "db_sql_query invoking handler\n");
+      while ((row = sql_fetch_row()) != NULL) {
+         Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
+         if (result_handler(ctx, m_num_fields, row))
+            break;
+      }
+      sql_free_result();
+   }
 
-void my_dbi_data_seek(B_DB *mdb, int row)
-{
-   // set the row number to be returned on the next call
-   // to my_dbi_fetch_row
-   mdb->row_number = row;
-}
+   Dmsg0(500, "db_sql_query finished\n");
 
-void my_dbi_field_seek(B_DB *mdb, int field)
-{
-   mdb->field_number = field;
+bail_out:
+   db_unlock(this);
+   return retval;
 }
 
 /*
  * Note, if this routine returns 1 (failure), Bacula expects
  *  that no result has been stored.
  *
- *  Returns:  0  on success
- *            1  on failure
- *
+ *  Returns:  true on success
+ *            false on failure
  */
-int my_dbi_query(B_DB *mdb, const char *query)
+bool B_DB_DBI::sql_query(const char *query, int flags)
 {
-   const char *errmsg;
-   Dmsg1(500, "my_dbi_query started %s\n", query);
-   // We are starting a new query.  reset everything.
-   mdb->num_rows     = -1;
-   mdb->row_number   = -1;
-   mdb->field_number = -1;
-
-   if (mdb->result) {
-      dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
-      mdb->result = NULL;
+   bool retval = false;
+   const char *dbi_errmsg;
+
+   Dmsg1(500, "sql_query starts with %s\n", query);
+
+   /*
+    * We are starting a new query.  reset everything.
+    */
+   m_num_rows     = -1;
+   m_row_number   = -1;
+   m_field_number = -1;
+
+   if (m_result) {
+      dbi_result_free(m_result);  /* hmm, someone forgot to free?? */
+      m_result = NULL;
    }
-         
-   mdb->result = (void **)dbi_conn_query(mdb->db, query);
-      
-   if (!mdb->result) {
-      Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);      
+
+   m_result = (void **)dbi_conn_query(m_db_handle, query);
+
+   if (!m_result) {
+      Dmsg2(50, "Query failed: %s %p\n", query, m_result);
       goto bail_out;
    }
-   
-   mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
-   
-   if (mdb->status == DBI_ERROR_NONE) {
+
+   m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
+   if (m_status == DBI_ERROR_NONE) {
       Dmsg1(500, "we have a result\n", query);
 
-      // how many fields in the set?
-      // num_fields starting at 1
-      mdb->num_fields = dbi_result_get_numfields(mdb->result);
-      Dmsg1(500, "we have %d fields\n", mdb->num_fields);
-      // if no result num_rows is 0
-      mdb->num_rows = dbi_result_get_numrows(mdb->result);
-      Dmsg1(500, "we have %d rows\n", mdb->num_rows);
+      /*
+       * How many fields in the set?
+       * num_fields starting at 1
+       */
+      m_num_fields = dbi_result_get_numfields(m_result);
+      Dmsg1(500, "we have %d fields\n", m_num_fields);
+      /*
+       * If no result num_rows is 0
+       */
+      m_num_rows = dbi_result_get_numrows(m_result);
+      Dmsg1(500, "we have %d rows\n", m_num_rows);
 
-      mdb->status = (dbi_error_flag) 0;                  /* succeed */
+      m_status = (dbi_error_flag) 0;                  /* succeed */
    } else {
       Dmsg1(50, "Result status failed: %s\n", query);
       goto bail_out;
    }
 
-   Dmsg0(500, "my_dbi_query finishing\n");
-   return mdb->status;
+   Dmsg0(500, "sql_query finishing\n");
+   retval = true;
+   goto ok_out;
 
 bail_out:
-   mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
-   //dbi_conn_error(mdb->db, &errmsg);
-   Dmsg4(500, "my_dbi_query we failed dbi error: "
-                   "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
-   dbi_result_free(mdb->result);
-   mdb->result = NULL;
-   mdb->status = (dbi_error_flag) 1;                   /* failed */
-   return mdb->status;
+   m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
+   //dbi_conn_error(m_db_handle, &dbi_errmsg);
+   Dmsg4(500, "sql_query we failed dbi error: "
+                   "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
+   dbi_result_free(m_result);
+   m_result = NULL;
+   m_status = (dbi_error_flag) 1;                   /* failed */
+
+ok_out:
+   return retval;
 }
 
-void my_dbi_free_result(B_DB *mdb)
-{ 
-   
+void B_DB_DBI::sql_free_result(void)
+{
    DBI_FIELD_GET *f;
-   db_lock(mdb);  
-   int i = 0;
-   if (mdb->result) {
-      Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
-      dbi_result_free(mdb->result);
-   }
 
-   mdb->result = NULL;
-   
-   if (mdb->row) {
-      free(mdb->row);
+   db_lock(this);
+   if (m_result) {
+      dbi_result_free(m_result);
+      m_result = NULL;
    }
-   
-   /* now is time to free all value return by my_dbi_get_value
+   if (m_rows) {
+      free(m_rows);
+      m_rows = NULL;
+   }
+   /* 
+    * Now is time to free all value return by dbi_get_value
     * this is necessary because libdbi don't free memory return by yours results
-    * and Bacula has some routine wich call more than once time my_dbi_fetch_row
-    * 
+    * and Bacula has some routine wich call more than once time sql_fetch_row
+    *
     * Using a queue to store all pointer allocate is a good way to free all things
     * when necessary
     */
-   while((f=(DBI_FIELD_GET *)qremove(&dbi_getvalue_list))) {
-      Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
+   foreach_dlist(f, dbi_getvalue_list) {
       free(f->value);
       free(f);
    }
-      
-   mdb->row = NULL;
-   
-   if (mdb->fields) {
-      free(mdb->fields);
-      mdb->fields = NULL;
+   if (m_fields) {
+      free(m_fields);
+      m_fields = NULL;
    }
-   db_unlock(mdb);
-   Dmsg0(500, "my_dbi_free_result finish\n");
-   
-}
-
-const char *my_dbi_strerror(B_DB *mdb) 
-{
-   const char *errmsg;
-   
-   dbi_conn_error(mdb->db, &errmsg);
-        
-   return errmsg;
+   m_num_rows = m_num_fields = 0;
+   db_unlock(this);
 }
 
-#ifdef HAVE_BATCH_FILE_INSERT
-
-/*
- * This can be a bit strang but is the one way to do
- * 
- * Returns 1 if OK
- *         0 if failed
+/* dbi_getvalue
+ * like PQgetvalue;
+ * char *PQgetvalue(const PGresult *res,
+ *                int row_number,
+ *                int column_number);
+ *
+ * use dbi_result_seek_row to search in result set
+ * use example to return only strings
  */
-int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
+static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
 {
-   char *query = "COPY batch FROM STDIN";
-   
-   Dmsg0(500, "my_dbi_batch_start started\n");
-   
-   switch (mdb->db_type) {
-   case SQL_TYPE_MYSQL:
-      db_lock(mdb);
-      if (my_dbi_query(mdb,
-                              "CREATE TEMPORARY TABLE batch ("
-                                  "FileIndex integer,"
-                                  "JobId integer,"
-                                  "Path blob,"
-                                  "Name blob,"
-                                  "LStat tinyblob,"
-                                  "MD5 tinyblob)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         return 1;
-      }
-      db_unlock(mdb);
-      Dmsg0(500, "my_dbi_batch_start finishing\n");
-      return 1;
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      
-      //query = "COPY batch FROM STDIN";
-
-      if (my_dbi_query(mdb,
-                              "CREATE TEMPORARY TABLE batch ("
-                                  "fileindex int,"
-                                  "jobid int,"
-                                  "path varchar,"
-                                  "name varchar,"
-                                  "lstat varchar,"
-                                  "md5 varchar)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         return 1;
-      }
-      
-      // We are starting a new query.  reset everything.
-      mdb->num_rows     = -1;
-      mdb->row_number   = -1;
-      mdb->field_number = -1;
+   char *buf = NULL;
+   const char *dbi_errmsg;
+   const char *field_name;
+   unsigned short dbitype;
+   size_t field_length;
+   int64_t num;
 
-      my_dbi_free_result(mdb);
+   /* correct the index for dbi interface
+    * dbi index begins 1
+    * I prefer do not change others functions
+    */
+   Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
+         result, row_number, column_number);
 
-      for (int i=0; i < 10; i++) {
-         my_dbi_query(mdb, query);
-         if (mdb->result) {
-            break;
-         }
-         bmicrosleep(5, 0);
-      }
-      if (!mdb->result) {
-         Dmsg1(50, "Query failed: %s\n", query);
-         goto bail_out;
-      }
+   column_number++;
 
-      mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
-      //mdb->status = DBI_ERROR_NONE;
-         
-      if (mdb->status == DBI_ERROR_NONE) {
-         // how many fields in the set?
-         mdb->num_fields = dbi_result_get_numfields(mdb->result);
-         mdb->num_rows   = dbi_result_get_numrows(mdb->result);
-         mdb->status = (dbi_error_flag) 1;
+   if(row_number == 0) {
+     row_number++;
+   }
+
+   Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
+                        result, row_number, column_number);
+
+   if(dbi_result_seek_row(result, row_number)) {
+
+      field_name = dbi_result_get_field_name(result, column_number);
+      field_length = dbi_result_get_field_length(result, field_name);
+      dbitype = dbi_result_get_field_type_idx(result,column_number);
+
+      Dmsg3(500, "dbi_getvalue start: type: '%d' "
+            "field_length bytes: '%d' fieldname: '%s'\n",
+            dbitype, field_length, field_name);
+
+      if(field_length) {
+         //buf = (char *)malloc(sizeof(char *) * field_length + 1);
+         buf = (char *)malloc(field_length + 1);
       } else {
-         Dmsg1(50, "Result status failed: %s\n", query);
-         goto bail_out;
+         /*
+          * if numbers
+          */
+         buf = (char *)malloc(sizeof(char *) * 50);
       }
 
-      Dmsg0(500, "my_postgresql_batch_start finishing\n");
+      switch (dbitype) {
+      case DBI_TYPE_INTEGER:
+         num = dbi_result_get_longlong(result, field_name);
+         edit_int64(num, buf);
+         field_length = strlen(buf);
+         break;
+      case DBI_TYPE_STRING:
+         if(field_length) {
+            field_length = bsnprintf(buf, field_length + 1, "%s",
+            dbi_result_get_string(result, field_name));
+         } else {
+            buf[0] = 0;
+         }
+         break;
+      case DBI_TYPE_BINARY:
+         /*
+          * dbi_result_get_binary return a NULL pointer if value is empty
+          * following, change this to what Bacula espected
+          */
+         if(field_length) {
+            field_length = bsnprintf(buf, field_length + 1, "%s",
+                  dbi_result_get_binary(result, field_name));
+         } else {
+            buf[0] = 0;
+         }
+         break;
+      case DBI_TYPE_DATETIME:
+         time_t last;
+         struct tm tm;
+
+         last = dbi_result_get_datetime(result, field_name);
 
-      return mdb->status;
-      break;
-   case SQL_TYPE_SQLITE:
-      db_lock(mdb);
-      if (my_dbi_query(mdb,
-                              "CREATE TEMPORARY TABLE batch ("
-                                  "FileIndex integer,"
-                                  "JobId integer,"
-                                  "Path blob,"
-                                  "Name blob,"
-                                  "LStat tinyblob,"
-                                  "MD5 tinyblob)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         goto bail_out;
+         if(last == -1) {
+                field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
+         } else {
+            (void)localtime_r(&last, &tm);
+            field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
+                  (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
+                  tm.tm_hour, tm.tm_min, tm.tm_sec);
+         }
+         break;
       }
-      db_unlock(mdb);
-      Dmsg0(500, "my_dbi_batch_start finishing\n");
-      return 1;
-      break;
+
+   } else {
+      dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
+      Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
    }
-   
-bail_out:
-   Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
-   mdb->status = (dbi_error_flag) 0;
-   my_dbi_free_result(mdb);
-   mdb->result = NULL;
-   return mdb->status;
+
+   Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
+      buf, field_length, buf);
+
+   /*
+    * Don't worry about this buf
+    */
+   return buf;
 }
 
-/* set error to something to abort operation */
-int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
+SQL_ROW B_DB_DBI::sql_fetch_row(void)
 {
-   int res = 0;
-   int count = 30;
-   int (*custom_function)(void*, const char*) = NULL;  
-   dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
-   
-   Dmsg0(500, "my_dbi_batch_end started\n");
+   int j;
+   SQL_ROW row = NULL; /* by default, return NULL */
+
+   Dmsg0(500, "sql_fetch_row start\n");
+   if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
+      if (m_rows) {
+         Dmsg0(500, "sql_fetch_row freeing space\n");
+         Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
+         if (m_num_rows != 0) {
+            for (j = 0; j < m_num_fields; j++) {
+               Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
+                  if (m_rows[j]) {
+                     free(m_rows[j]);
+                  }
+            }
+         }
+         free(m_rows);
+      }
+      Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
+      m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
+      m_rows_size = m_num_fields;
 
-   if (!mdb) {                  /* no files ? */
-      return 0;
+      /*
+       * Now reset the row_number now that we have the space allocated
+       */
+      m_row_number = 1;
    }
 
-   switch (mdb->db_type) {
-   case SQL_TYPE_MYSQL:
-      if(mdb) {
-         mdb->status = (dbi_error_flag) 0;
+   /*
+    * If still within the result set
+    */
+   if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
+      Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
+      /*
+       * Get each value from this row
+       */
+      for (j = 0; j < m_num_fields; j++) {
+         m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
+         /*
+          * Allocate space to queue row
+          */
+         m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
+         /*
+          * Store the pointer in queue
+          */
+         m_field_get->value = m_rows[j];
+         Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
+               j, m_rows[j], m_field_get->value, m_rows[j]);
+         /*
+          * Insert in queue to future free
+          */
+         dbi_getvalue_list->append(m_field_get);
       }
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
+      /*
+       * Increment the row number for the next call
+       */
+      m_row_number++;
 
-                  
-      do { 
-         res = (*custom_function)(myconn->connection, error);         
-      } while (res == 0 && --count > 0);
+      row = m_rows;
+   } else {
+      Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
+   }
 
-      if (res == 1) {
-         Dmsg0(500, "ok\n");
-         mdb->status = (dbi_error_flag) 1;
-      }
-         
-      if (res <= 0) {
-         Dmsg0(500, "we failed\n");
-         mdb->status = (dbi_error_flag) 0;
-         //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
-       }
-      break;
-   case SQL_TYPE_SQLITE:
-      if(mdb) {
-         mdb->status = (dbi_error_flag) 0;
-      }
-      break;
+   Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
+
+   return row;
+}
+
+const char *B_DB_DBI::sql_strerror(void)
+{
+   const char *dbi_errmsg;
+
+   dbi_conn_error(m_db_handle, &dbi_errmsg);
+
+   return dbi_errmsg;
+}
+
+void B_DB_DBI::sql_data_seek(int row)
+{
+   /*
+    * Set the row number to be returned on the next call to sql_fetch_row
+    */
+   m_row_number = row;
+}
+
+int B_DB_DBI::sql_affected_rows(void)
+{
+#if 0
+   return dbi_result_get_numrows_affected(result);
+#else
+   return 1;
+#endif
+}
+
+uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
+{
+   char sequence[30];
+   uint64_t id = 0;
+
+   /*
+    * First execute the insert query and then retrieve the currval.
+    */
+   if (!sql_query(query)) {
+      return 0;
    }
 
-   Dmsg0(500, "my_dbi_batch_end finishing\n");
+   m_num_rows = sql_affected_rows();
+   if (m_num_rows != 1) {
+      return 0;
+   }
+
+   changes++;
+
+   /*
+    * Obtain the current value of the sequence that
+    * provides the serial value for primary key of the table.
+    *
+    * currval is local to our session.  It is not affected by
+    * other transactions.
+    *
+    * Determine the name of the sequence.
+    * PostgreSQL automatically creates a sequence using
+    * <table>_<column>_seq.
+    * At the time of writing, all tables used this format for
+    * for their primary key: <table>id
+    * Except for basefiles which has a primary key on baseid.
+    * Therefore, we need to special case that one table.
+    *
+    * everything else can use the PostgreSQL formula.
+    */
+   if (m_db_type == SQL_TYPE_POSTGRESQL) {
+      if (strcasecmp(table_name, "basefiles") == 0) {
+         bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
+      } else {
+         bstrncpy(sequence, table_name, sizeof(sequence));
+         bstrncat(sequence, "_", sizeof(sequence));
+         bstrncat(sequence, table_name, sizeof(sequence));
+         bstrncat(sequence, "id", sizeof(sequence));
+      }
+
+      bstrncat(sequence, "_seq", sizeof(sequence));
+      id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
+   } else {
+      id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
+   }
 
-   return true;      
+   return id;
 }
 
-/* 
- * This function is big and use a big switch.  
- * In near future is better split in small functions
- * and refactory.
- * 
+/* dbi_getisnull
+ * like PQgetisnull
+ * int PQgetisnull(const PGresult *res,
+ *                 int row_number,
+ *                 int column_number);
+ *
+ *  use dbi_result_seek_row to search in result set
  */
-int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
-{
-   int res;
-   int count=30;   
-   dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
-   int (*custom_function)(void*, const char*, int) = NULL;
-   char* (*custom_function_error)(void*) = NULL;
-   size_t len;
-   char *digest;
-   char ed1[50];
+static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
+   int i;
 
-   Dmsg0(500, "my_dbi_batch_insert started \n");
+   if (row_number == 0) {
+      row_number++;
+   }
 
-   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);    
-   mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
+   column_number++;
 
-   if (ar->Digest == NULL || ar->Digest[0] == 0) {
-      digest = "0";
+   if (dbi_result_seek_row(result, row_number)) {
+      i = dbi_result_field_is_null_idx(result,column_number);
+      return i;
    } else {
-      digest = ar->Digest;
+      return 0;
    }
+}
 
-   switch (mdb->db_type) {
-   case SQL_TYPE_MYSQL:
-      db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-      db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
-                      ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
-                      mdb->esc_name, ar->attr, digest);
-      
-      if (my_dbi_query(mdb,mdb->cmd) == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_insert failed\n");
-         goto bail_out;
-      }
-      
-      Dmsg0(500, "my_dbi_batch_insert finishing\n");
-      
-      return 1;
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
-      my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
-                     ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
-                     mdb->esc_name, ar->attr, digest);
-      
-      /* libdbi don't support CopyData and we need call a postgresql
-       * specific function to do this work
-       */
-      Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
-      if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
-            "PQputCopyData")) != NULL) {
-         do { 
-            res = (*custom_function)(myconn->connection, mdb->cmd, len);
-         } while (res == 0 && --count > 0);
+SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
+{
+   int i, j;
+   int dbi_index;
+   int max_length;
+   int this_length;
+   char *cbuf = NULL;
 
-         if (res == 1) {
-            Dmsg0(500, "ok\n");
-            mdb->changes++;
-            mdb->status = (dbi_error_flag) 1;
-         }
+   Dmsg0(500, "sql_fetch_field starts\n");
 
-         if (res <= 0) {
-            Dmsg0(500, "my_dbi_batch_insert failed\n");
-            goto bail_out;
+   if (!m_fields || m_fields_size < m_num_fields) {
+      if (m_fields) {
+         free(m_fields);
+         m_fields = NULL;
+      }
+      Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
+      m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
+      m_fields_size = m_num_fields;
+
+      for (i = 0; i < m_num_fields; i++) {
+         /*
+          * num_fields is starting at 1, increment i by 1
+          */
+         dbi_index = i + 1;
+         Dmsg1(500, "filling field %d\n", i);
+         m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
+         m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
+         m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
+
+         /*
+          * For a given column, find the max length.
+          */
+         max_length = 0;
+         for (j = 0; j < m_num_rows; j++) {
+            if (dbi_getisnull(m_result, j, dbi_index)) {
+                this_length = 4;        /* "NULL" */
+            } else {
+               cbuf = dbi_getvalue(m_result, j, dbi_index);
+               this_length = cstrlen(cbuf);
+               /*
+                * cbuf is always free
+                */
+               free(cbuf);
+            }
+         
+            if (max_length < this_length) {
+               max_length = this_length;
+            }
          }
+         m_fields[i].max_length = max_length;
 
-         Dmsg0(500, "my_dbi_batch_insert finishing\n");
-         return mdb->status;
-      } else {
-         // ensure to detect a PQerror 
-         custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
-         Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
-         goto bail_out;
+         Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
+               m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
       }
-      break;
-   case SQL_TYPE_SQLITE:
-      db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-      db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
-                      ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
-                      mdb->esc_name, ar->attr, digest);
-      if (my_dbi_query(mdb,mdb->cmd) == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_insert failed\n");
-         goto bail_out;
-      }
-      
-      Dmsg0(500, "my_dbi_batch_insert finishing\n");
-      
-      return 1;
-      break;
    }
-    
-bail_out:
-  Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
-  mdb->status = (dbi_error_flag) 0;
-  my_dbi_free_result(mdb);  
-  return mdb->status;
+
+   /*
+    * Increment field number for the next time around
+    */
+   return &m_fields[m_field_number++];
+}
+
+bool B_DB_DBI::sql_field_is_not_null(int field_type)
+{
+   switch (field_type) {
+   case (1 << 0):
+      return true;
+   default:
+      return false;
+   }
+}
+
+bool B_DB_DBI::sql_field_is_numeric(int field_type)
+{
+   switch (field_type) {
+   case 1:
+   case 2:
+      return true;
+   default:
+      return false;
+   }
 }
 
 /*
@@ -953,9 +1122,11 @@ bail_out:
  *         string must be long enough (max 2*old+1) to hold
  *         the escaped output.
  */
-char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
+static char *postgresql_copy_escape(char *dest, char *src, size_t len)
 {
-   /* we have to escape \t, \n, \r, \ */
+   /*
+    * We have to escape \t, \n, \r, \
+    */
    char c = '\0' ;
 
    while (len > 0 && *src) {
@@ -993,242 +1164,314 @@ char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
    return dest;
 }
 
-#endif /* HAVE_BATCH_FILE_INSERT */
-
-/* my_dbi_getisnull
- * like PQgetisnull
- * int PQgetisnull(const PGresult *res,
- *              int row_number,
- *               int column_number);
- * 
- *  use dbi_result_seek_row to search in result set
+/*
+ * This can be a bit strang but is the one way to do
+ *
+ * Returns true if OK
+ *         false if failed
  */
-int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
-   int i;
+bool B_DB_DBI::sql_batch_start(JCR *jcr)
+{
+   bool retval = true;
+   const char *query = "COPY batch FROM STDIN";
 
-   if(row_number == 0) {
-      row_number++;
+   Dmsg0(500, "sql_batch_start started\n");
+
+   db_lock(this);
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                             "FileIndex integer,"
+                             "JobId integer,"
+                             "Path blob,"
+                             "Name blob,"
+                             "LStat tinyblob,"
+                             "MD5 tinyblob,"
+                             "DeltaSeq smallint)")) {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
+      Dmsg0(500, "sql_batch_start finishing\n");
+      goto ok_out;
+   case SQL_TYPE_POSTGRESQL:
+      if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                             "FileIndex int,"
+                             "JobId int,"
+                             "Path varchar,"
+                             "Name varchar,"
+                             "LStat varchar,"
+                             "MD5 varchar,"
+                             "DeltaSeq int)")) {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
+
+      /*
+       * We are starting a new query.  reset everything.
+       */
+      m_num_rows     = -1;
+      m_row_number   = -1;
+      m_field_number = -1;
+
+      sql_free_result();
+
+      for (int i=0; i < 10; i++) {
+         sql_query(query);
+         if (m_result) {
+            break;
+         }
+         bmicrosleep(5, 0);
+      }
+      if (!m_result) {
+         Dmsg1(50, "Query failed: %s\n", query);
+         goto bail_out;
+      }
+
+      m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
+      //m_status = DBI_ERROR_NONE;
+
+      if (m_status == DBI_ERROR_NONE) {
+         /*
+          * How many fields in the set?
+          */
+         m_num_fields = dbi_result_get_numfields(m_result);
+         m_num_rows = dbi_result_get_numrows(m_result);
+         m_status = (dbi_error_flag) 1;
+      } else {
+         Dmsg1(50, "Result status failed: %s\n", query);
+         goto bail_out;
+      }
+
+      Dmsg0(500, "sql_batch_start finishing\n");
+      goto ok_out;
+   case SQL_TYPE_SQLITE3:
+      if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                             "FileIndex integer,"
+                             "JobId integer,"
+                             "Path blob,"
+                             "Name blob,"
+                             "LStat tinyblob,"
+                             "MD5 tinyblob,"
+                             "DeltaSeq smallint)")) {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
+      Dmsg0(500, "sql_batch_start finishing\n");
+      goto ok_out;
    }
-   
-   column_number++;
-   
-   if(dbi_result_seek_row(result, row_number)) {
 
-      i = dbi_result_field_is_null_idx(result,column_number);
+bail_out:
+   Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
+   m_status = (dbi_error_flag) 0;
+   sql_free_result();
+   m_result = NULL;
+   retval = false;
+
+ok_out:
+   db_unlock(this);
+   return retval;
+}
 
-      return i;
-   } else {
-           
-      return 0;
+/*
+ * Set error to something to abort operation
+ */
+bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
+{
+   int res = 0;
+   int count = 30;
+   int (*custom_function)(void*, const char*) = NULL;
+   dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
+
+   Dmsg0(500, "sql_batch_start started\n");
+
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      m_status = (dbi_error_flag) 0;
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
+
+      do {
+         res = (*custom_function)(myconn->connection, error);
+      } while (res == 0 && --count > 0);
+
+      if (res == 1) {
+         Dmsg0(500, "ok\n");
+         m_status = (dbi_error_flag) 1;
+      }
+
+      if (res <= 0) {
+         Dmsg0(500, "we failed\n");
+         m_status = (dbi_error_flag) 0;
+         //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
+       }
+      break;
+   case SQL_TYPE_SQLITE3:
+      m_status = (dbi_error_flag) 0;
+      break;
    }
-                
+
+   Dmsg0(500, "sql_batch_start finishing\n");
+
+   return true;
 }
-/* my_dbi_getvalue 
- * like PQgetvalue;
- * char *PQgetvalue(const PGresult *res,
- *                int row_number,
- *                int column_number);
- *
- * use dbi_result_seek_row to search in result set
- * use example to return only strings
+
+/*
+ * This function is big and use a big switch.
+ * In near future is better split in small functions
+ * and refactory.
  */
-char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
+bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
+{
+   int res;
+   int count=30;
+   dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
+   int (*custom_function)(void*, const char*, int) = NULL;
+   char* (*custom_function_error)(void*) = NULL;
+   size_t len;
+   char *digest;
+   char ed1[50];
 
-   char *buf = NULL;
-   const char *errmsg;
-   const char *field_name;     
-   unsigned short dbitype;
-   size_t field_length;
-   int64_t num;
-        
-   /* correct the index for dbi interface
-    * dbi index begins 1
-    * I prefer do not change others functions
-    */
-   Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n", 
-                                result, row_number, column_number);
-        
-   column_number++;
+   Dmsg0(500, "sql_batch_start started \n");
 
-   if(row_number == 0) {
-     row_number++;
+   esc_name = check_pool_memory_size(esc_name, fnl*2+1);
+   esc_path = check_pool_memory_size(esc_path, pnl*2+1);
+
+   if (ar->Digest == NULL || ar->Digest[0] == 0) {
+      *digest = '\0';
+   } else {
+      digest = ar->Digest;
    }
-      
-   Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n", 
-                        result, row_number, column_number);
-   
-   if(dbi_result_seek_row(result, row_number)) {
 
-      field_name = dbi_result_get_field_name(result, column_number);
-      field_length = dbi_result_get_field_length(result, field_name);
-      dbitype = dbi_result_get_field_type_idx(result,column_number);
-      
-      Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
-            "field_length bytes: '%d' fieldname: '%s'\n", 
-            dbitype, field_length, field_name);
-      
-      if(field_length) {
-         //buf = (char *)malloc(sizeof(char *) * field_length + 1);
-         buf = (char *)malloc(field_length + 1);
-      } else {
-         /* if numbers */
-         buf = (char *)malloc(sizeof(char *) * 50);
-      }           
-      
-      switch (dbitype) {
-      case DBI_TYPE_INTEGER:
-         num = dbi_result_get_longlong(result, field_name);         
-         edit_int64(num, buf);
-         field_length = strlen(buf);
-         break;
-      case DBI_TYPE_STRING:
-         if(field_length) {
-            field_length = bsnprintf(buf, field_length + 1, "%s", 
-            dbi_result_get_string(result, field_name));
-         } else {
-            buf[0] = 0;
-         }
-         break;
-      case DBI_TYPE_BINARY:
-         /* dbi_result_get_binary return a NULL pointer if value is empty
-         * following, change this to what Bacula espected
-         */
-         if(field_length) {
-            field_length = bsnprintf(buf, field_length + 1, "%s", 
-                  dbi_result_get_binary(result, field_name));
-         } else {
-            buf[0] = 0;
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      db_escape_string(jcr, esc_name, fname, fnl);
+      db_escape_string(jcr, esc_path, path, pnl);
+      len = Mmsg(cmd, "INSERT INTO batch VALUES "
+                      "(%u,%s,'%s','%s','%s','%s',%u)",
+                      ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
+                      esc_name, ar->attr, digest, ar->DeltaSeq);
+
+      if (!sql_query(cmd))
+      {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
+
+      Dmsg0(500, "sql_batch_start finishing\n");
+
+      return true;
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      postgresql_copy_escape(esc_name, fname, fnl);
+      postgresql_copy_escape(esc_path, path, pnl);
+      len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
+                     ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
+                     esc_name, ar->attr, digest, ar->DeltaSeq);
+
+      /*
+       * libdbi don't support CopyData and we need call a postgresql
+       * specific function to do this work
+       */
+      Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
+      custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
+      if (custom_function != NULL) {
+         do {
+            res = (*custom_function)(myconn->connection, cmd, len);
+         } while (res == 0 && --count > 0);
+
+         if (res == 1) {
+            Dmsg0(500, "ok\n");
+            changes++;
+            m_status = (dbi_error_flag) 1;
          }
-         break;
-      case DBI_TYPE_DATETIME:
-         time_t last;
-         struct tm tm;
-         
-         last = dbi_result_get_datetime(result, field_name);
-         
-         if(last == -1) {
-                field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00"); 
-         } else {
-            (void)localtime_r(&last, &tm);
-            field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
-                  (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
-                  tm.tm_hour, tm.tm_min, tm.tm_sec);
+
+         if (res <= 0) {
+            Dmsg0(500, "sql_batch_insert failed\n");
+            goto bail_out;
          }
-         break;
+
+         Dmsg0(500, "sql_batch_insert finishing\n");
+         return true;
+      } else {
+         /*
+          * Ensure to detect a PQerror
+          */
+         custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
+         Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
+         goto bail_out;
+      }
+      break;
+   case SQL_TYPE_SQLITE3:
+      db_escape_string(jcr, esc_name, fname, fnl);
+      db_escape_string(jcr, esc_path, path, pnl);
+      len = Mmsg(cmd, "INSERT INTO batch VALUES "
+                      "(%u,%s,'%s','%s','%s','%s',%u)",
+                      ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
+                      esc_name, ar->attr, digest, ar->DeltaSeq);
+
+      if (!sql_query(cmd))
+      {
+         Dmsg0(500, "sql_batch_insert failed\n");
+         goto bail_out;
       }
 
-   } else {
-      dbi_conn_error(dbi_result_get_conn(result), &errmsg);
-      Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
+      Dmsg0(500, "sql_batch_insert finishing\n");
+
+      return true;
+      break;
    }
-                
-   Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n", 
-      buf, field_length, buf);
-      
-   // don't worry about this buf
-   return buf;
+
+bail_out:
+   Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
+   m_status = (dbi_error_flag) 0;
+   sql_free_result();
+   return false;
 }
 
-int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
+/*
+ * Initialize database data structure. In principal this should
+ * never have errors, or it is really fatal.
+ */
+B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
+                       const char *db_password, const char *db_address, int db_port,
+                       const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
 {
-   /*
-    Obtain the current value of the sequence that
-    provides the serial value for primary key of the table.
-
-    currval is local to our session.  It is not affected by
-    other transactions.
-
-    Determine the name of the sequence.
-    PostgreSQL automatically creates a sequence using
-    <table>_<column>_seq.
-    At the time of writing, all tables used this format for
-    for their primary key: <table>id
-    Except for basefiles which has a primary key on baseid.
-    Therefore, we need to special case that one table.
-
-    everything else can use the PostgreSQL formula.
-   */
-   
-   char      sequence[30];  
-   uint64_t    id = 0;
-
-   if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
-           
-      if (strcasecmp(table_name, "basefiles") == 0) {
-         bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
-      } else {
-         bstrncpy(sequence, table_name, sizeof(sequence));
-         bstrncat(sequence, "_",        sizeof(sequence));
-         bstrncat(sequence, table_name, sizeof(sequence));
-         bstrncat(sequence, "id",       sizeof(sequence));
-      }
+   B_DB_DBI *mdb = NULL;
 
-      bstrncat(sequence, "_seq", sizeof(sequence));
-      id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
-   } else {
-      id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
+   if (!db_driver) {
+      Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
    }
-   
-   return id;
-}
 
-#ifdef HAVE_BATCH_FILE_INSERT
-const char *my_dbi_batch_lock_path_query[3] = {
-   /* Mysql */
-   "LOCK TABLES Path write, batch write, Path as p write",
-   /* Postgresql */
-   "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
-   /* SQLite */
-   "BEGIN"};  
-
-const char *my_dbi_batch_lock_filename_query[3] = {
-   /* Mysql */
-   "LOCK TABLES Filename write, batch write, Filename as f write",
-   /* Postgresql */
-   "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
-   /* SQLite */
-   "BEGIN"};
-
-const char *my_dbi_batch_unlock_tables_query[3] = {
-   /* Mysql */
-   "UNLOCK TABLES",
-   /* Postgresql */
-   "COMMIT",
-   /* SQLite */
-   "COMMIT"};
-
-const char *my_dbi_batch_fill_path_query[3] = {
-   /* Mysql */
-   "INSERT INTO Path (Path) "
-   "SELECT a.Path FROM " 
-   "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
-   "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
-   /* Postgresql */
-   "INSERT INTO Path (Path) "
-   "SELECT a.Path FROM "
-   "(SELECT DISTINCT Path FROM batch) AS a "
-   "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
-   /* SQLite */
-   "INSERT INTO Path (Path)" 
-   " SELECT DISTINCT Path FROM batch"
-   " EXCEPT SELECT Path FROM Path"};
-
-const char *my_dbi_batch_fill_filename_query[3] = {
-   /* Mysql */
-   "INSERT INTO Filename (Name) "
-   "SELECT a.Name FROM " 
-   "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
-   "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
-   /* Postgresql */
-   "INSERT INTO Filename (Name) "
-   "SELECT a.Name FROM "
-   "(SELECT DISTINCT Name FROM batch) as a "
-   "WHERE NOT EXISTS "
-   "(SELECT Name FROM Filename WHERE Name = a.Name)",
-   /* SQLite */
-   "INSERT INTO Filename (Name)"
-   " SELECT DISTINCT Name FROM batch "
-   " EXCEPT SELECT Name FROM Filename"};
-#endif /* HAVE_BATCH_FILE_INSERT */
+   if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
+      Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
+   }
+
+   if (!db_user) {
+      Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
+      return NULL;
+   }
+
+   P(mutex);                          /* lock DB queue */
+   if (db_list && !mult_db_connections) {
+      /*
+       * Look to see if DB already open
+       */
+      foreach_dlist(mdb, db_list) {
+         if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
+            Dmsg1(100, "DB REopen %s\n", db_name);
+            mdb->increment_refcount();
+            goto bail_out;
+         }
+      }
+   }
+   Dmsg0(100, "db_init_database first time\n");
+   mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
+                      db_port, db_socket, mult_db_connections, disable_batch_insert));
+
+bail_out:
+   V(mutex);
+   return mdb;
+}
 
 #endif /* HAVE_DBI */