]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/dbi.c
Enhance mountcache with rescan option after interval.
[bacula/bacula] / bacula / src / cats / dbi.c
index f3c360ad7195363865fafa8d5de1f596b623ce25..3181e8b49beb422d2ceb20a8ac3173a7808cc584 100644 (file)
@@ -1,12 +1,12 @@
 /*
    Bacula® - The Network Backup Solution
 
-   Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
+   Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
 
    The main author of Bacula is Kern Sibbald, with contributions from
    many others, a complete list can be found in the file AUTHORS.
    This program is Free Software; you can redistribute it and/or
-   modify it under the terms of version two of the GNU General Public
+   modify it under the terms of version three of the GNU Affero General Public
    License as published by the Free Software Foundation and included
    in the file LICENSE.
 
@@ -15,7 +15,7 @@
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Affero General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301, USA.
@@ -33,6 +33,7 @@
  *    based upon work done by Dan Langille, December 2003 and
  *    by Kern Sibbald, March 2000
  *
+ * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
  */
 /*
  * This code only compiles against a recent version of libdbi. The current
  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
  */
 
-
-/* The following is necessary so that we do not include
- * the dummy external definition of DB.
- */
-#define __SQL_C                       /* indicate that this is sql.c */
-
 #include "bacula.h"
-#include "cats.h"
 
 #ifdef HAVE_DBI
 
+#include "cats.h"
+#include "bdb_priv.h"
+#include <dbi/dbi.h>
+#include <dbi/dbi-dev.h>
+#include <bdb_dbi.h>
+
 /* -----------------------------------------------------------------------
  *
  *   DBI dependent defines and subroutines
  * -----------------------------------------------------------------------
  */
 
-/* List of open databases */
+/*
+ * List of open databases
+ */
 static dlist *db_list = NULL;
 
-/* Control allocated fields by my_dbi_getvalue */
+/*
+ * Control allocated fields by dbi_getvalue
+ */
 static dlist *dbi_getvalue_list = NULL;
 
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
-/*
- * Retrieve database type
- */
-const char *
-db_get_type(void)
-{
-   return "DBI";
-}
-
-/*
- * Initialize database data structure. In principal this should
- * never have errors, or it is really fatal.
- */
-B_DB *
-db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
-                 const char *db_address, int db_port, const char *db_socket,
-                 int mult_db_connections)
+typedef int (*custom_function_insert_t)(void*, const char*, int);
+typedef char* (*custom_function_error_t)(void*);
+typedef int (*custom_function_end_t)(void*, const char*);
+
+B_DB_DBI::B_DB_DBI(JCR *jcr,
+                   const char *db_driver,
+                   const char *db_name,
+                   const char *db_user,
+                   const char *db_password,
+                   const char *db_address,
+                   int db_port,
+                   const char *db_socket,
+                   bool mult_db_connections,
+                   bool disable_batch_insert)
 {
-   B_DB *mdb = NULL;
-   DBI_FIELD_GET *field;
-   char db_driver[10];
+   char *p;
+   char new_db_driver[10];
    char db_driverdir[256];
+   DBI_FIELD_GET *field;
 
-   /* Constraint the db_driver */
-   if(db_type  == -1) {
-      Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
-      return NULL;
+   p = (char *)(db_driver + 4);
+   if (strcasecmp(p, "mysql") == 0) {
+      m_db_type = SQL_TYPE_MYSQL;
+      bstrncpy(new_db_driver, "mysql", sizeof(new_db_driver));
+   } else if (strcasecmp(p, "postgresql") == 0) {
+      m_db_type = SQL_TYPE_POSTGRESQL;
+      bstrncpy(new_db_driver, "pgsql", sizeof(new_db_driver));
+   } else if (strcasecmp(p, "sqlite3") == 0) {
+      m_db_type = SQL_TYPE_SQLITE3;
+      bstrncpy(new_db_driver, "sqlite3", sizeof(new_db_driver));
+   } else if (strcasecmp(p, "ingres") == 0) {
+      m_db_type = SQL_TYPE_INGRES;
+      bstrncpy(new_db_driver, "ingres", sizeof(new_db_driver));
+   } else {
+      Jmsg(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
+      return;
    }
 
-   /* Do the correct selection of driver.
-    * Can be one of the varius supported by libdbi
+   /*
+    * Set db_driverdir whereis is the libdbi drivers
     */
-   switch (db_type) {
-   case SQL_TYPE_MYSQL:
-      bstrncpy(db_driver,"mysql", sizeof(db_driver));
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      bstrncpy(db_driver,"pgsql", sizeof(db_driver));
-      break;
-   case SQL_TYPE_SQLITE:
-      bstrncpy(db_driver,"sqlite", sizeof(db_driver));
-      break;
-   case SQL_TYPE_SQLITE3:
-      bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
-      break;
-   }
-
-   /* Set db_driverdir whereis is the libdbi drivers */
    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
 
-   if (!db_user) {
-      Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
-      return NULL;
-   }
-   P(mutex);                          /* lock DB queue */
-   if (db_list == NULL) {
-      db_list = New(dlist(mdb, &mdb->link));
-      dbi_getvalue_list = New(dlist(field, &field->link));
-   }
-   if (!mult_db_connections) {
-      /* Look to see if DB already open */
-      foreach_dlist(mdb, db_list) {
-         if (bstrcmp(mdb->db_name, db_name) &&
-             bstrcmp(mdb->db_address, db_address) &&
-             bstrcmp(mdb->db_driver, db_driver) &&
-             mdb->db_port == db_port) {
-            Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
-                  dbi_conn_error(mdb->db, NULL));
-            mdb->ref_count++;
-            V(mutex);
-            return mdb;                  /* already open */
-         }
-      }
-   }
-   Dmsg0(100, "db_open first time\n");
-   mdb = (B_DB *)malloc(sizeof(B_DB));
-   memset(mdb, 0, sizeof(B_DB));
-   mdb->db_name = bstrdup(db_name);
-   mdb->db_user = bstrdup(db_user);
+   /*
+    * Initialize the parent class members.
+    */
+   m_db_interface_type = SQL_INTERFACE_TYPE_DBI;
+   m_db_name = bstrdup(db_name);
+   m_db_user = bstrdup(db_user);
    if (db_password) {
-      mdb->db_password = bstrdup(db_password);
+      m_db_password = bstrdup(db_password);
    }
    if (db_address) {
-      mdb->db_address  = bstrdup(db_address);
+      m_db_address = bstrdup(db_address);
    }
    if (db_socket) {
-      mdb->db_socket   = bstrdup(db_socket);
+      m_db_socket = bstrdup(db_socket);
    }
    if (db_driverdir) {
-      mdb->db_driverdir = bstrdup(db_driverdir);
+      m_db_driverdir = bstrdup(db_driverdir);
    }
-   if (db_driver) {
-      mdb->db_driver    = bstrdup(db_driver);
+   m_db_driver = bstrdup(new_db_driver);
+   m_db_port = db_port;
+   if (disable_batch_insert) {
+      m_disabled_batch_insert = true;
+      m_have_batch_insert = false;
+   } else {
+      m_disabled_batch_insert = false;
+#if defined(USE_BATCH_FILE_INSERT)
+#ifdef HAVE_DBI_BATCH_FILE_INSERT
+      m_have_batch_insert = true;
+#else
+      m_have_batch_insert = false;
+#endif /* HAVE_DBI_BATCH_FILE_INSERT */
+#else
+      m_have_batch_insert = false;
+#endif /* USE_BATCH_FILE_INSERT */
    }
-   mdb->db_type        = db_type;
-   mdb->db_port        = db_port;
-   mdb->have_insert_id = TRUE;
-   mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
-   *mdb->errmsg        = 0;
-   mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
-   mdb->cached_path    = get_pool_memory(PM_FNAME);
-   mdb->cached_path_id = 0;
-   mdb->ref_count      = 1;
-   mdb->fname          = get_pool_memory(PM_FNAME);
-   mdb->path           = get_pool_memory(PM_FNAME);
-   mdb->esc_name       = get_pool_memory(PM_FNAME);
-   mdb->esc_path      = get_pool_memory(PM_FNAME);
-   mdb->allow_transactions = mult_db_connections;
-   db_list->append(mdb);                   /* put db in list */
-   V(mutex);
-   return mdb;
+   errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
+   *errmsg = 0;
+   cmd = get_pool_memory(PM_EMSG); /* get command buffer */
+   cached_path = get_pool_memory(PM_FNAME);
+   cached_path_id = 0;
+   m_ref_count = 1;
+   fname = get_pool_memory(PM_FNAME);
+   path = get_pool_memory(PM_FNAME);
+   esc_name = get_pool_memory(PM_FNAME);
+   esc_path = get_pool_memory(PM_FNAME);
+   esc_obj = get_pool_memory(PM_FNAME);
+   m_allow_transactions = mult_db_connections;
+
+   /*
+    * Initialize the private members.
+    */
+   m_db_handle = NULL;
+   m_result = NULL;
+   m_field_get = NULL;
+
+   /*
+    * Put the db in the list.
+    */
+   if (db_list == NULL) {
+      db_list = New(dlist(this, &this->m_link));
+      dbi_getvalue_list = New(dlist(field, &field->link));
+   }
+   db_list->append(this);
+}
+
+B_DB_DBI::~B_DB_DBI()
+{
 }
 
 /*
  * Now actually open the database.  This can generate errors,
  *   which are returned in the errmsg
  *
- * DO NOT close the database or free(mdb) here  !!!!
+ * DO NOT close the database or delete mdb here  !!!!
  */
-int
-db_open_database(JCR *jcr, B_DB *mdb)
+bool B_DB_DBI::db_open_database(JCR *jcr)
 {
+   bool retval = false;
    int errstat;
    int dbstat;
    uint8_t len;
-   const char *errmsg;
+   const char *dbi_errmsg;
    char buf[10], *port;
    int numdrivers;
-   char *db_name = NULL;
-   char *db_dir = NULL;
+   char *new_db_name = NULL;
+   char *new_db_dir = NULL;
 
    P(mutex);
-   if (mdb->connected) {
-      V(mutex);
-      return 1;
+   if (m_connected) {
+      retval = true;
+      goto bail_out;
    }
-   mdb->connected = false;
 
-   if ((errstat=rwl_init(&mdb->lock)) != 0) {
+   if ((errstat=rwl_init(&m_lock)) != 0) {
       berrno be;
-      Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
+      Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
             be.bstrerror(errstat));
-      V(mutex);
-      return 0;
+      goto bail_out;
    }
 
-   if (mdb->db_port) {
-      bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
+   if (m_db_port) {
+      bsnprintf(buf, sizeof(buf), "%d", m_db_port);
       port = buf;
    } else {
       port = NULL;
    }
 
-   numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
+   numdrivers = dbi_initialize_r(m_db_driverdir, &(m_instance));
    if (numdrivers < 0) {
-      Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
+      Mmsg2(&errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
                                "db_driverdir=%s. It is probaly not found any drivers\n"),
-                               mdb->db_driverdir,numdrivers);
-      V(mutex);
-      return 0;
+                               m_db_driverdir,numdrivers);
+      goto bail_out;
    }
-   mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
-   /* Can be many types of databases */
-   switch (mdb->db_type) {
+   m_db_handle = (void **)dbi_conn_new_r(m_db_driver, m_instance);
+   /*
+    * Can be many types of databases
+    */
+   switch (m_db_type) {
    case SQL_TYPE_MYSQL:
-      dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
-      dbi_conn_set_option(mdb->db, "port", port);            /* default port */
-      dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
-      dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
-      dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
+      dbi_conn_set_option(m_db_handle, "host", m_db_address);      /* default = localhost */
+      dbi_conn_set_option(m_db_handle, "port", port);              /* default port */
+      dbi_conn_set_option(m_db_handle, "username", m_db_user);     /* login name */
+      dbi_conn_set_option(m_db_handle, "password", m_db_password); /* password */
+      dbi_conn_set_option(m_db_handle, "dbname", m_db_name);       /* database name */
       break;
    case SQL_TYPE_POSTGRESQL:
-      dbi_conn_set_option(mdb->db, "host", mdb->db_address);
-      dbi_conn_set_option(mdb->db, "port", port);
-      dbi_conn_set_option(mdb->db, "username", mdb->db_user);
-      dbi_conn_set_option(mdb->db, "password", mdb->db_password);
-      dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
-      break;
-   case SQL_TYPE_SQLITE:
-      len = strlen(working_directory) + 5;
-      db_dir = (char *)malloc(len);
-      strcpy(db_dir, working_directory);
-      strcat(db_dir, "/");
-      len = strlen(mdb->db_name) + 5;
-      db_name = (char *)malloc(len);
-      strcpy(db_name, mdb->db_name);
-      strcat(db_name, ".db");
-      dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
-      dbi_conn_set_option(mdb->db, "dbname", db_name);
+      dbi_conn_set_option(m_db_handle, "host", m_db_address);
+      dbi_conn_set_option(m_db_handle, "port", port);
+      dbi_conn_set_option(m_db_handle, "username", m_db_user);
+      dbi_conn_set_option(m_db_handle, "password", m_db_password);
+      dbi_conn_set_option(m_db_handle, "dbname", m_db_name);
       break;
    case SQL_TYPE_SQLITE3:
       len = strlen(working_directory) + 5;
-      db_dir = (char *)malloc(len);
-      strcpy(db_dir, working_directory);
-      strcat(db_dir, "/");
-      len = strlen(mdb->db_name) + 5;
-      db_name = (char *)malloc(len);
-      strcpy(db_name, mdb->db_name);
-      strcat(db_name, ".db");
-      dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
-      dbi_conn_set_option(mdb->db, "dbname", db_name);
-      Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
+      new_db_dir = (char *)malloc(len);
+      strcpy(new_db_dir, working_directory);
+      strcat(new_db_dir, "/");
+      len = strlen(m_db_name) + 5;
+      new_db_name = (char *)malloc(len);
+      strcpy(new_db_name, m_db_name);
+      strcat(new_db_name, ".db");
+      dbi_conn_set_option(m_db_handle, "sqlite3_dbdir", new_db_dir);
+      dbi_conn_set_option(m_db_handle, "dbname", new_db_name);
+      Dmsg2(500, "SQLITE: %s %s\n", new_db_dir, new_db_name);
+      free(new_db_dir);
+      free(new_db_name);
       break;
    }
 
-   /* If connection fails, try at 5 sec intervals for 30 seconds. */
+   /*
+    * If connection fails, try at 5 sec intervals for 30 seconds.
+    */
    for (int retry=0; retry < 6; retry++) {
-
-      dbstat = dbi_conn_connect(mdb->db);
-      if ( dbstat == 0) {
+      dbstat = dbi_conn_connect(m_db_handle);
+      if (dbstat == 0) {
          break;
       }
 
-      dbi_conn_error(mdb->db, &errmsg);
-      Dmsg1(50, "dbi error: %s\n", errmsg);
+      dbi_conn_error(m_db_handle, &dbi_errmsg);
+      Dmsg1(50, "dbi error: %s\n", dbi_errmsg);
 
       bmicrosleep(5, 0);
-
    }
 
-   if ( dbstat != 0 ) {
-      Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
+   if (dbstat != 0 ) {
+      Mmsg3(&errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
-         mdb->db_driver, mdb->db_name, mdb->db_user);
-      V(mutex);
-      return 0;
+         m_db_driver, m_db_name, m_db_user);
+      goto bail_out;
    }
 
    Dmsg0(50, "dbi_real_connect done\n");
    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
-                    mdb->db_user, mdb->db_name,
-                    mdb->db_password==NULL?"(NULL)":mdb->db_password);
+         m_db_user, m_db_name,
+        (m_db_password == NULL) ? "(NULL)" : m_db_password);
 
-   mdb->connected = true;
+   m_connected = true;
 
-   if (!check_tables_version(jcr, mdb)) {
-      V(mutex);
-      return 0;
+   if (!check_tables_version(jcr, this)) {
+      goto bail_out;
    }
 
-   switch (mdb->db_type) {
+   switch (m_db_type) {
    case SQL_TYPE_MYSQL:
-      /* Set connection timeout to 8 days specialy for batch mode */
-      sql_query(mdb, "SET wait_timeout=691200");
-      sql_query(mdb, "SET interactive_timeout=691200");
+      /*
+       * Set connection timeout to 8 days specialy for batch mode
+       */
+      sql_query("SET wait_timeout=691200");
+      sql_query("SET interactive_timeout=691200");
       break;
    case SQL_TYPE_POSTGRESQL:
-      /* tell PostgreSQL we are using standard conforming strings
-         and avoid warnings such as:
-         WARNING:  nonstandard use of \\ in a string literal
-      */
-      sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
-      sql_query(mdb, "set standard_conforming_strings=on");
+      /*
+       * Tell PostgreSQL we are using standard conforming strings
+       * and avoid warnings such as:
+       * WARNING:  nonstandard use of \\ in a string literal
+       */
+      sql_query("SET datestyle TO 'ISO, YMD'");
+      sql_query("SET standard_conforming_strings=on");
       break;
    }
 
-   if(db_dir) {
-      free(db_dir);
-   }
-   if(db_name) {
-      free(db_name);
-   }
+   retval = true;
 
+bail_out:
    V(mutex);
-   return 1;
+   return retval;
 }
 
-void
-db_close_database(JCR *jcr, B_DB *mdb)
+void B_DB_DBI::db_close_database(JCR *jcr)
 {
-   if (!mdb) {
-      return;
-   }
-   db_end_transaction(jcr, mdb);
+   db_end_transaction(jcr);
    P(mutex);
-   sql_free_result(mdb);
-   mdb->ref_count--;
-   if (mdb->ref_count == 0) {
-      db_list->remove(mdb);
-      if (mdb->connected && mdb->db) {
-         //sql_close(mdb);
-         dbi_shutdown_r(mdb->instance);
-         mdb->db = NULL;
-         mdb->instance = NULL;
+   m_ref_count--;
+   if (m_ref_count == 0) {
+      sql_free_result();
+      db_list->remove(this);
+      if (m_connected && m_db_handle) {
+         dbi_shutdown_r(m_instance);
+         m_db_handle = NULL;
+         m_instance = NULL;
       }
-      rwl_destroy(&mdb->lock);
-      free_pool_memory(mdb->errmsg);
-      free_pool_memory(mdb->cmd);
-      free_pool_memory(mdb->cached_path);
-      free_pool_memory(mdb->fname);
-      free_pool_memory(mdb->path);
-      free_pool_memory(mdb->esc_name);
-      free_pool_memory(mdb->esc_path);
-      if (mdb->db_name) {
-         free(mdb->db_name);
+      rwl_destroy(&m_lock);
+      free_pool_memory(errmsg);
+      free_pool_memory(cmd);
+      free_pool_memory(cached_path);
+      free_pool_memory(fname);
+      free_pool_memory(path);
+      free_pool_memory(esc_name);
+      free_pool_memory(esc_path);
+      free_pool_memory(esc_obj);
+      if (m_db_driver) {
+         free(m_db_driver);
       }
-      if (mdb->db_user) {
-         free(mdb->db_user);
+      if (m_db_name) {
+         free(m_db_name);
       }
-      if (mdb->db_password) {
-         free(mdb->db_password);
+      if (m_db_user) {
+         free(m_db_user);
       }
-      if (mdb->db_address) {
-         free(mdb->db_address);
+      if (m_db_password) {
+         free(m_db_password);
       }
-      if (mdb->db_socket) {
-         free(mdb->db_socket);
+      if (m_db_address) {
+         free(m_db_address);
       }
-      if (mdb->db_driverdir) {
-         free(mdb->db_driverdir);
+      if (m_db_socket) {
+         free(m_db_socket);
       }
-      if (mdb->db_driver) {
-          free(mdb->db_driver);
+      if (m_db_driverdir) {
+         free(m_db_driverdir);
       }
-      free(mdb);
+      delete this;
       if (db_list->size() == 0) {
          delete db_list;
          db_list = NULL;
@@ -399,21 +384,10 @@ db_close_database(JCR *jcr, B_DB *mdb)
    V(mutex);
 }
 
-void db_thread_cleanup()
-{ }
-
-/*
- * Return the next unique index (auto-increment) for
- * the given table.  Return NULL on error.
- *
- */
-int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
+void B_DB_DBI::db_thread_cleanup(void)
 {
-   strcpy(index, "NULL");
-   return 1;
 }
 
-
 /*
  * Escape strings so that DBI is happy
  *
@@ -425,8 +399,7 @@ int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
  * We need copy the value of pointer to snew because libdbi change the
  * pointer
  */
-void
-db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
+void B_DB_DBI::db_escape_string(JCR *jcr, char *snew, char *old, int len)
 {
    char *inew;
    char *pnew;
@@ -434,617 +407,712 @@ db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
    if (len == 0) {
       snew[0] = 0;
    } else {
-      /* correct the size of old basead in len
-       * and copy new string to inew
+      /*
+       * Correct the size of old basead in len and copy new string to inew
        */
       inew = (char *)malloc(sizeof(char) * len + 1);
       bstrncpy(inew,old,len + 1);
-      /* escape the correct size of old */
-      dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
+      /*
+       * Escape the correct size of old
+       */
+      dbi_conn_escape_string_copy(m_db_handle, inew, &pnew);
       free(inew);
-      /* copy the escaped string to snew */
+      /*
+       * Copy the escaped string to snew
+       */
       bstrncpy(snew, pnew, 2 * len + 1);
    }
 
    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
-
 }
 
 /*
- * Submit a general SQL command (cmd), and for each row returned,
- *  the sqlite_handler is called with the ctx.
+ * Escape binary object so that DBI is happy
+ * Memory is stored in B_DB struct, no need to free it
  */
-bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
+char *B_DB_DBI::db_escape_object(JCR *jcr, char *old, int len)
 {
-   SQL_ROW row;
-
-   Dmsg0(500, "db_sql_query started\n");
+   size_t new_len;
+   char *pnew;
 
-   db_lock(mdb);
-   if (sql_query(mdb, query) != 0) {
-      Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
-      db_unlock(mdb);
-      Dmsg0(500, "db_sql_query failed\n");
-      return false;
+   if (len == 0) {
+      esc_obj[0] = 0;
+   } else {
+      new_len = dbi_conn_escape_string_copy(m_db_handle, esc_obj, &pnew);
+      esc_obj = check_pool_memory_size(esc_obj, new_len+1);
+      memcpy(esc_obj, pnew, new_len);
    }
-   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
-
-   if (result_handler != NULL) {
-      Dmsg0(500, "db_sql_query invoking handler\n");
-      if ((mdb->result = sql_store_result(mdb)) != NULL) {
-         int num_fields = sql_num_fields(mdb);
-
-         Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
-         while ((row = sql_fetch_row(mdb)) != NULL) {
 
-            Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
-            if (result_handler(ctx, num_fields, row))
-               break;
-         }
+   return esc_obj;
+}
 
-        sql_free_result(mdb);
-      }
+/*
+ * Unescape binary object so that DBI is happy
+ */
+void B_DB_DBI::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
+                                  POOLMEM **dest, int32_t *dest_len)
+{
+   if (!from) {
+      *dest[0] = 0;
+      *dest_len = 0;
+      return;
    }
-   db_unlock(mdb);
-
-   Dmsg0(500, "db_sql_query finished\n");
-
-   return true;
+   *dest = check_pool_memory_size(*dest, expected_len+1);
+   *dest_len = expected_len;
+   memcpy(*dest, from, expected_len);
+   (*dest)[expected_len]=0;
 }
 
-
-
-DBI_ROW my_dbi_fetch_row(B_DB *mdb)
+/*
+ * Start a transaction. This groups inserts and makes things
+ * much more efficient. Usually started when inserting
+ * file attributes.
+ */
+void B_DB_DBI::db_start_transaction(JCR *jcr)
 {
-   int j;
-   DBI_ROW row = NULL; // by default, return NULL
-
-   Dmsg0(500, "my_dbi_fetch_row start\n");
-   if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
-      int num_fields = mdb->num_fields;
-      Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
-
-      if (mdb->row) {
-         Dmsg0(500, "my_dbi_fetch_row freeing space\n");
-         Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
-         if (mdb->num_rows != 0) {
-            for(j = 0; j < mdb->num_fields; j++) {
-               Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
-                  if(mdb->row[j]) {
-                     free(mdb->row[j]);
-                  }
-            }
-         }
-         free(mdb->row);
+   if (!jcr->attr) {
+      jcr->attr = get_pool_memory(PM_FNAME);
+   }
+   if (!jcr->ar) {
+      jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
+   }
+
+   switch (m_db_type) {
+   case SQL_TYPE_SQLITE3:
+      if (!m_allow_transactions) {
+         return;
       }
-      //num_fields += 20;                  /* add a bit extra */
-      mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
-      mdb->row_size = num_fields;
 
-      // now reset the row_number now that we have the space allocated
-      mdb->row_number = 1;
-   }
+      db_lock(this);
+      /*
+       * Allow only 10,000 changes per transaction
+       */
+      if (m_transaction && changes > 10000) {
+         db_end_transaction(jcr);
+      }
+      if (!m_transaction) {
+         sql_query("BEGIN");  /* begin transaction */
+         Dmsg0(400, "Start SQLite transaction\n");
+         m_transaction = true;
+      }
+      db_unlock(this);
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      /*
+       * This is turned off because transactions break
+       * if multiple simultaneous jobs are run.
+       */
+      if (!m_allow_transactions) {
+         return;
+      }
 
-   // if still within the result set
-   if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
-      Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
-      // get each value from this row
-      for (j = 0; j < mdb->num_fields; j++) {
-         mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
-         // allocate space to queue row
-         mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
-         // store the pointer in queue
-         mdb->field_get->value = mdb->row[j];
-         Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
-               j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
-         // insert in queue to future free
-         dbi_getvalue_list->append(mdb->field_get);
+      db_lock(this);
+      /*
+       * Allow only 25,000 changes per transaction
+       */
+      if (m_transaction && changes > 25000) {
+         db_end_transaction(jcr);
+      }
+      if (!m_transaction) {
+         sql_query("BEGIN");  /* begin transaction */
+         Dmsg0(400, "Start PosgreSQL transaction\n");
+         m_transaction = true;
+      }
+      db_unlock(this);
+      break;
+   case SQL_TYPE_INGRES:
+      if (!m_allow_transactions) {
+         return;
       }
-      // increment the row number for the next call
-      mdb->row_number++;
 
-      row = mdb->row;
-   } else {
-      Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
+      db_lock(this);
+      /*
+       * Allow only 25,000 changes per transaction
+       */
+      if (m_transaction && changes > 25000) {
+         db_end_transaction(jcr);
+      }
+      if (!m_transaction) {
+         sql_query("BEGIN");  /* begin transaction */
+         Dmsg0(400, "Start Ingres transaction\n");
+         m_transaction = true;
+      }
+      db_unlock(this);
+      break;
+   default:
+      break;
    }
+}
 
-   Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
+void B_DB_DBI::db_end_transaction(JCR *jcr)
+{
+   if (jcr && jcr->cached_attribute) {
+      Dmsg0(400, "Flush last cached attribute.\n");
+      if (!db_create_attributes_record(jcr, this, jcr->ar)) {
+         Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
+      }
+      jcr->cached_attribute = false;
+   }
 
-   return row;
-}
+   switch (m_db_type) {
+   case SQL_TYPE_SQLITE3:
+      if (!m_allow_transactions) {
+         return;
+      }
 
-int my_dbi_max_length(B_DB *mdb, int field_num) {
-   //
-   // for a given column, find the max length
-   //
-   int max_length;
-   int i;
-   int this_length;
-   char *cbuf = NULL;
+      db_lock(this);
+      if (m_transaction) {
+         sql_query("COMMIT"); /* end transaction */
+         m_transaction = false;
+         Dmsg1(400, "End SQLite transaction changes=%d\n", changes);
+      }
+      changes = 0;
+      db_unlock(this);
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      if (!m_allow_transactions) {
+         return;
+      }
 
-   max_length = 0;
-   for (i = 0; i < mdb->num_rows; i++) {
-      if (my_dbi_getisnull(mdb->result, i, field_num)) {
-          this_length = 4;        // "NULL"
-      } else {
-         cbuf = my_dbi_getvalue(mdb->result, i, field_num);
-         this_length = cstrlen(cbuf);
-         // cbuf is always free
-         free(cbuf);
+      db_lock(this);
+      if (m_transaction) {
+         sql_query("COMMIT"); /* end transaction */
+         m_transaction = false;
+         Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
+      }
+      changes = 0;
+      db_unlock(this);
+      break;
+   case SQL_TYPE_INGRES:
+      if (!m_allow_transactions) {
+         return;
       }
 
-      if (max_length < this_length) {
-          max_length = this_length;
+      db_lock(this);
+      if (m_transaction) {
+         sql_query("COMMIT"); /* end transaction */
+         m_transaction = false;
+         Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
       }
+      changes = 0;
+      db_unlock(this);
+      break;
+   default:
+      break;
    }
-
-   return max_length;
 }
 
-DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
+/*
+ * Submit a general SQL command (cmd), and for each row returned,
+ * the result_handler is called with the ctx.
+ */
+bool B_DB_DBI::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
 {
-   int     i;
-   int     dbi_index;
-
-   Dmsg0(500, "my_dbi_fetch_field starts\n");
-
-   if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
-      if (mdb->fields) {
-         free(mdb->fields);
-      }
-      Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
-      mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
-      mdb->fields_size = mdb->num_fields;
+   bool retval = true;
+   SQL_ROW row;
 
-      for (i = 0; i < mdb->num_fields; i++) {
-         // num_fileds is starting at 1, increment i by 1
-         dbi_index = i + 1;
-         Dmsg1(500, "filling field %d\n", i);
-         mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
-         mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
-         mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
-         mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
+   Dmsg1(500, "db_sql_query starts with %s\n", query);
 
-         Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
-            mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
-            mdb->fields[i].flags);
-      } // end for
-   } // end if
+   db_lock(this);
+   if (!sql_query(query, QF_STORE_RESULT)) {
+      Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
+      Dmsg0(500, "db_sql_query failed\n");
+      retval = false;
+      goto bail_out;
+   }
 
-   // increment field number for the next time around
+   Dmsg0(500, "db_sql_query succeeded. checking handler\n");
 
-   Dmsg0(500, "my_dbi_fetch_field finishes\n");
-   return &mdb->fields[mdb->field_number++];
-}
+   if (result_handler != NULL) {
+      Dmsg0(500, "db_sql_query invoking handler\n");
+      while ((row = sql_fetch_row()) != NULL) {
+         Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
+         if (result_handler(ctx, m_num_fields, row))
+            break;
+      }
+      sql_free_result();
+   }
 
-void my_dbi_data_seek(B_DB *mdb, int row)
-{
-   // set the row number to be returned on the next call
-   // to my_dbi_fetch_row
-   mdb->row_number = row;
-}
+   Dmsg0(500, "db_sql_query finished\n");
 
-void my_dbi_field_seek(B_DB *mdb, int field)
-{
-   mdb->field_number = field;
+bail_out:
+   db_unlock(this);
+   return retval;
 }
 
 /*
  * Note, if this routine returns 1 (failure), Bacula expects
  *  that no result has been stored.
  *
- *  Returns:  0  on success
- *            1  on failure
- *
+ *  Returns:  true on success
+ *            false on failure
  */
-int my_dbi_query(B_DB *mdb, const char *query)
+bool B_DB_DBI::sql_query(const char *query, int flags)
 {
-   const char *errmsg;
-   Dmsg1(500, "my_dbi_query started %s\n", query);
-   // We are starting a new query.  reset everything.
-   mdb->num_rows     = -1;
-   mdb->row_number   = -1;
-   mdb->field_number = -1;
-
-   if (mdb->result) {
-      dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
-      mdb->result = NULL;
+   bool retval = false;
+   const char *dbi_errmsg;
+
+   Dmsg1(500, "sql_query starts with %s\n", query);
+
+   /*
+    * We are starting a new query.  reset everything.
+    */
+   m_num_rows     = -1;
+   m_row_number   = -1;
+   m_field_number = -1;
+
+   if (m_result) {
+      dbi_result_free(m_result);  /* hmm, someone forgot to free?? */
+      m_result = NULL;
    }
 
-   mdb->result = (void **)dbi_conn_query(mdb->db, query);
+   m_result = (void **)dbi_conn_query(m_db_handle, query);
 
-   if (!mdb->result) {
-      Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
+   if (!m_result) {
+      Dmsg2(50, "Query failed: %s %p\n", query, m_result);
       goto bail_out;
    }
 
-   mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
-
-   if (mdb->status == DBI_ERROR_NONE) {
+   m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
+   if (m_status == DBI_ERROR_NONE) {
       Dmsg1(500, "we have a result\n", query);
 
-      // how many fields in the set?
-      // num_fields starting at 1
-      mdb->num_fields = dbi_result_get_numfields(mdb->result);
-      Dmsg1(500, "we have %d fields\n", mdb->num_fields);
-      // if no result num_rows is 0
-      mdb->num_rows = dbi_result_get_numrows(mdb->result);
-      Dmsg1(500, "we have %d rows\n", mdb->num_rows);
+      /*
+       * How many fields in the set?
+       * num_fields starting at 1
+       */
+      m_num_fields = dbi_result_get_numfields(m_result);
+      Dmsg1(500, "we have %d fields\n", m_num_fields);
+      /*
+       * If no result num_rows is 0
+       */
+      m_num_rows = dbi_result_get_numrows(m_result);
+      Dmsg1(500, "we have %d rows\n", m_num_rows);
 
-      mdb->status = (dbi_error_flag) 0;                  /* succeed */
+      m_status = (dbi_error_flag) 0;                  /* succeed */
    } else {
       Dmsg1(50, "Result status failed: %s\n", query);
       goto bail_out;
    }
 
-   Dmsg0(500, "my_dbi_query finishing\n");
-   return mdb->status;
+   Dmsg0(500, "sql_query finishing\n");
+   retval = true;
+   goto ok_out;
 
 bail_out:
-   mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
-   //dbi_conn_error(mdb->db, &errmsg);
-   Dmsg4(500, "my_dbi_query we failed dbi error: "
-                   "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
-   dbi_result_free(mdb->result);
-   mdb->result = NULL;
-   mdb->status = (dbi_error_flag) 1;                   /* failed */
-   return mdb->status;
+   m_status = (dbi_error_flag) dbi_conn_error(m_db_handle, &dbi_errmsg);
+   //dbi_conn_error(m_db_handle, &dbi_errmsg);
+   Dmsg4(500, "sql_query we failed dbi error: "
+                   "'%s' '%p' '%d' flag '%d''\n", dbi_errmsg, m_result, m_result, m_status);
+   dbi_result_free(m_result);
+   m_result = NULL;
+   m_status = (dbi_error_flag) 1;                   /* failed */
+
+ok_out:
+   return retval;
 }
 
-void my_dbi_free_result(B_DB *mdb)
+void B_DB_DBI::sql_free_result(void)
 {
-
    DBI_FIELD_GET *f;
-   db_lock(mdb);
-   if (mdb->result) {
-      Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
-      dbi_result_free(mdb->result);
-   }
 
-   mdb->result = NULL;
-
-   if (mdb->row) {
-      free(mdb->row);
+   db_lock(this);
+   if (m_result) {
+      dbi_result_free(m_result);
+      m_result = NULL;
    }
-
-   /* now is time to free all value return by my_dbi_get_value
+   if (m_rows) {
+      free(m_rows);
+      m_rows = NULL;
+   }
+   /* 
+    * Now is time to free all value return by dbi_get_value
     * this is necessary because libdbi don't free memory return by yours results
-    * and Bacula has some routine wich call more than once time my_dbi_fetch_row
+    * and Bacula has some routine wich call more than once time sql_fetch_row
     *
     * Using a queue to store all pointer allocate is a good way to free all things
     * when necessary
     */
    foreach_dlist(f, dbi_getvalue_list) {
-      Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
       free(f->value);
       free(f);
    }
-
-   mdb->row = NULL;
-
-   if (mdb->fields) {
-      free(mdb->fields);
-      mdb->fields = NULL;
+   if (m_fields) {
+      free(m_fields);
+      m_fields = NULL;
    }
-   db_unlock(mdb);
-   Dmsg0(500, "my_dbi_free_result finish\n");
-
-}
-
-const char *my_dbi_strerror(B_DB *mdb)
-{
-   const char *errmsg;
-
-   dbi_conn_error(mdb->db, &errmsg);
-
-   return errmsg;
+   m_num_rows = m_num_fields = 0;
+   db_unlock(this);
 }
 
-#ifdef HAVE_BATCH_FILE_INSERT
-
-/*
- * This can be a bit strang but is the one way to do
+/* dbi_getvalue
+ * like PQgetvalue;
+ * char *PQgetvalue(const PGresult *res,
+ *                int row_number,
+ *                int column_number);
  *
- * Returns 1 if OK
- *         0 if failed
+ * use dbi_result_seek_row to search in result set
+ * use example to return only strings
  */
-int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
+static char *dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number)
 {
-   const char *query = "COPY batch FROM STDIN";
+   char *buf = NULL;
+   const char *dbi_errmsg;
+   const char *field_name;
+   unsigned short dbitype;
+   size_t field_length;
+   int64_t num;
 
-   Dmsg0(500, "my_dbi_batch_start started\n");
+   /* correct the index for dbi interface
+    * dbi index begins 1
+    * I prefer do not change others functions
+    */
+   Dmsg3(600, "dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
+         result, row_number, column_number);
 
-   switch (mdb->db_type) {
-   case SQL_TYPE_MYSQL:
-      db_lock(mdb);
-      if (my_dbi_query(mdb,
-                              "CREATE TEMPORARY TABLE batch ("
-                                  "FileIndex integer,"
-                                  "JobId integer,"
-                                  "Path blob,"
-                                  "Name blob,"
-                                  "LStat tinyblob,"
-                                  "MD5 tinyblob)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         return 1;
-      }
-      db_unlock(mdb);
-      Dmsg0(500, "my_dbi_batch_start finishing\n");
-      return 1;
-      break;
-   case SQL_TYPE_POSTGRESQL:
+   column_number++;
 
-      if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
-                                  "fileindex int,"
-                                  "jobid int,"
-                                  "path varchar,"
-                                  "name varchar,"
-                                  "lstat varchar,"
-                                  "md5 varchar)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         return 1;
-      }
+   if(row_number == 0) {
+     row_number++;
+   }
 
-      // We are starting a new query.  reset everything.
-      mdb->num_rows     = -1;
-      mdb->row_number   = -1;
-      mdb->field_number = -1;
+   Dmsg3(600, "dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
+                        result, row_number, column_number);
 
-      my_dbi_free_result(mdb);
+   if(dbi_result_seek_row(result, row_number)) {
 
-      for (int i=0; i < 10; i++) {
-         my_dbi_query(mdb, query);
-         if (mdb->result) {
-            break;
-         }
-         bmicrosleep(5, 0);
-      }
-      if (!mdb->result) {
-         Dmsg1(50, "Query failed: %s\n", query);
-         goto bail_out;
-      }
+      field_name = dbi_result_get_field_name(result, column_number);
+      field_length = dbi_result_get_field_length(result, field_name);
+      dbitype = dbi_result_get_field_type_idx(result,column_number);
 
-      mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
-      //mdb->status = DBI_ERROR_NONE;
+      Dmsg3(500, "dbi_getvalue start: type: '%d' "
+            "field_length bytes: '%d' fieldname: '%s'\n",
+            dbitype, field_length, field_name);
 
-      if (mdb->status == DBI_ERROR_NONE) {
-         // how many fields in the set?
-         mdb->num_fields = dbi_result_get_numfields(mdb->result);
-         mdb->num_rows   = dbi_result_get_numrows(mdb->result);
-         mdb->status = (dbi_error_flag) 1;
+      if(field_length) {
+         //buf = (char *)malloc(sizeof(char *) * field_length + 1);
+         buf = (char *)malloc(field_length + 1);
       } else {
-         Dmsg1(50, "Result status failed: %s\n", query);
-         goto bail_out;
+         /*
+          * if numbers
+          */
+         buf = (char *)malloc(sizeof(char *) * 50);
       }
 
-      Dmsg0(500, "my_postgresql_batch_start finishing\n");
+      switch (dbitype) {
+      case DBI_TYPE_INTEGER:
+         num = dbi_result_get_longlong(result, field_name);
+         edit_int64(num, buf);
+         field_length = strlen(buf);
+         break;
+      case DBI_TYPE_STRING:
+         if(field_length) {
+            field_length = bsnprintf(buf, field_length + 1, "%s",
+            dbi_result_get_string(result, field_name));
+         } else {
+            buf[0] = 0;
+         }
+         break;
+      case DBI_TYPE_BINARY:
+         /*
+          * dbi_result_get_binary return a NULL pointer if value is empty
+          * following, change this to what Bacula espected
+          */
+         if(field_length) {
+            field_length = bsnprintf(buf, field_length + 1, "%s",
+                  dbi_result_get_binary(result, field_name));
+         } else {
+            buf[0] = 0;
+         }
+         break;
+      case DBI_TYPE_DATETIME:
+         time_t last;
+         struct tm tm;
+
+         last = dbi_result_get_datetime(result, field_name);
 
-      return mdb->status;
-      break;
-   case SQL_TYPE_SQLITE:
-      db_lock(mdb);
-      if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
-                                  "FileIndex integer,"
-                                  "JobId integer,"
-                                  "Path blob,"
-                                  "Name blob,"
-                                  "LStat tinyblob,"
-                                  "MD5 tinyblob)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         goto bail_out;
-      }
-      db_unlock(mdb);
-      Dmsg0(500, "my_dbi_batch_start finishing\n");
-      return 1;
-      break;
-   case SQL_TYPE_SQLITE3:
-      db_lock(mdb);
-      if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
-                                  "FileIndex integer,"
-                                  "JobId integer,"
-                                  "Path blob,"
-                                  "Name blob,"
-                                  "LStat tinyblob,"
-                                  "MD5 tinyblob)") == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_start failed\n");
-         goto bail_out;
+         if(last == -1) {
+                field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
+         } else {
+            (void)localtime_r(&last, &tm);
+            field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
+                  (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
+                  tm.tm_hour, tm.tm_min, tm.tm_sec);
+         }
+         break;
       }
-      db_unlock(mdb);
-      Dmsg0(500, "my_dbi_batch_start finishing\n");
-      return 1;
-      break;
+
+   } else {
+      dbi_conn_error(dbi_result_get_conn(result), &dbi_errmsg);
+      Dmsg1(500, "dbi_getvalue error: %s\n", dbi_errmsg);
    }
 
-bail_out:
-   Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
-   mdb->status = (dbi_error_flag) 0;
-   my_dbi_free_result(mdb);
-   mdb->result = NULL;
-   return mdb->status;
+   Dmsg3(500, "dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
+      buf, field_length, buf);
+
+   /*
+    * Don't worry about this buf
+    */
+   return buf;
 }
 
-/* set error to something to abort operation */
-int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
+SQL_ROW B_DB_DBI::sql_fetch_row(void)
 {
-   int res = 0;
-   int count = 30;
-   int (*custom_function)(void*, const char*) = NULL;
-   dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
-
-   Dmsg0(500, "my_dbi_batch_end started\n");
+   int j;
+   SQL_ROW row = NULL; /* by default, return NULL */
+
+   Dmsg0(500, "sql_fetch_row start\n");
+   if ((!m_rows || m_rows_size < m_num_fields) && m_num_rows > 0) {
+      if (m_rows) {
+         Dmsg0(500, "sql_fetch_row freeing space\n");
+         Dmsg2(500, "sql_fetch_row row: '%p' num_fields: '%d'\n", m_rows, m_num_fields);
+         if (m_num_rows != 0) {
+            for (j = 0; j < m_num_fields; j++) {
+               Dmsg2(500, "sql_fetch_row row '%p' '%d'\n", m_rows[j], j);
+                  if (m_rows[j]) {
+                     free(m_rows[j]);
+                  }
+            }
+         }
+         free(m_rows);
+      }
+      Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
+      m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
+      m_rows_size = m_num_fields;
 
-   if (!mdb) {                  /* no files ? */
-      return 0;
+      /*
+       * Now reset the row_number now that we have the space allocated
+       */
+      m_row_number = 1;
    }
 
-   switch (mdb->db_type) {
-   case SQL_TYPE_MYSQL:
-      if(mdb) {
-         mdb->status = (dbi_error_flag) 0;
+   /*
+    * If still within the result set
+    */
+   if (m_row_number <= m_num_rows && m_row_number != DBI_ERROR_BADPTR) {
+      Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (1..%d)\n", m_row_number, m_num_rows);
+      /*
+       * Get each value from this row
+       */
+      for (j = 0; j < m_num_fields; j++) {
+         m_rows[j] = dbi_getvalue(m_result, m_row_number, j);
+         /*
+          * Allocate space to queue row
+          */
+         m_field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
+         /*
+          * Store the pointer in queue
+          */
+         m_field_get->value = m_rows[j];
+         Dmsg4(500, "sql_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
+               j, m_rows[j], m_field_get->value, m_rows[j]);
+         /*
+          * Insert in queue to future free
+          */
+         dbi_getvalue_list->append(m_field_get);
       }
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
+      /*
+       * Increment the row number for the next call
+       */
+      m_row_number++;
 
+      row = m_rows;
+   } else {
+      Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (1..%d)\n", m_row_number, m_num_rows);
+   }
 
-      do {
-         res = (*custom_function)(myconn->connection, error);
-      } while (res == 0 && --count > 0);
+   Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
 
-      if (res == 1) {
-         Dmsg0(500, "ok\n");
-         mdb->status = (dbi_error_flag) 1;
-      }
+   return row;
+}
 
-      if (res <= 0) {
-         Dmsg0(500, "we failed\n");
-         mdb->status = (dbi_error_flag) 0;
-         //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
-       }
-      break;
-   case SQL_TYPE_SQLITE:
-      if(mdb) {
-         mdb->status = (dbi_error_flag) 0;
-      }
-      break;
-   case SQL_TYPE_SQLITE3:
-      if(mdb) {
-         mdb->status = (dbi_error_flag) 0;
-      }
-      break;
-   }
+const char *B_DB_DBI::sql_strerror(void)
+{
+   const char *dbi_errmsg;
 
-   Dmsg0(500, "my_dbi_batch_end finishing\n");
+   dbi_conn_error(m_db_handle, &dbi_errmsg);
 
-   return true;
+   return dbi_errmsg;
 }
 
-/*
- * This function is big and use a big switch.
- * In near future is better split in small functions
- * and refactory.
- *
- */
-int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
+void B_DB_DBI::sql_data_seek(int row)
 {
-   int res;
-   int count=30;
-   dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
-   int (*custom_function)(void*, const char*, int) = NULL;
-   char* (*custom_function_error)(void*) = NULL;
-   size_t len;
-   char *digest;
-   char ed1[50];
+   /*
+    * Set the row number to be returned on the next call to sql_fetch_row
+    */
+   m_row_number = row;
+}
 
-   Dmsg0(500, "my_dbi_batch_insert started \n");
+int B_DB_DBI::sql_affected_rows(void)
+{
+#if 0
+   return dbi_result_get_numrows_affected(result);
+#else
+   return 1;
+#endif
+}
 
-   mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
-   mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
+uint64_t B_DB_DBI::sql_insert_autokey_record(const char *query, const char *table_name)
+{
+   char sequence[30];
+   uint64_t id = 0;
 
-   if (ar->Digest == NULL || ar->Digest[0] == 0) {
-      *digest = '\0';
-   } else {
-      digest = ar->Digest;
+   /*
+    * First execute the insert query and then retrieve the currval.
+    */
+   if (!sql_query(query)) {
+      return 0;
    }
 
-   switch (mdb->db_type) {
-   case SQL_TYPE_MYSQL:
-      db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-      db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
-                      ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
-                      mdb->esc_name, ar->attr, digest);
+   m_num_rows = sql_affected_rows();
+   if (m_num_rows != 1) {
+      return 0;
+   }
 
-      if (my_dbi_query(mdb,mdb->cmd) == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_insert failed\n");
-         goto bail_out;
+   changes++;
+
+   /*
+    * Obtain the current value of the sequence that
+    * provides the serial value for primary key of the table.
+    *
+    * currval is local to our session.  It is not affected by
+    * other transactions.
+    *
+    * Determine the name of the sequence.
+    * PostgreSQL automatically creates a sequence using
+    * <table>_<column>_seq.
+    * At the time of writing, all tables used this format for
+    * for their primary key: <table>id
+    * Except for basefiles which has a primary key on baseid.
+    * Therefore, we need to special case that one table.
+    *
+    * everything else can use the PostgreSQL formula.
+    */
+   if (m_db_type == SQL_TYPE_POSTGRESQL) {
+      if (strcasecmp(table_name, "basefiles") == 0) {
+         bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
+      } else {
+         bstrncpy(sequence, table_name, sizeof(sequence));
+         bstrncat(sequence, "_", sizeof(sequence));
+         bstrncat(sequence, table_name, sizeof(sequence));
+         bstrncat(sequence, "id", sizeof(sequence));
       }
 
-      Dmsg0(500, "my_dbi_batch_insert finishing\n");
+      bstrncat(sequence, "_seq", sizeof(sequence));
+      id = dbi_conn_sequence_last(m_db_handle, NT_(sequence));
+   } else {
+      id = dbi_conn_sequence_last(m_db_handle, NT_(table_name));
+   }
 
-      return 1;
-      break;
-   case SQL_TYPE_POSTGRESQL:
-      my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
-      my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
-                     ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
-                     mdb->esc_name, ar->attr, digest);
+   return id;
+}
 
-      /* libdbi don't support CopyData and we need call a postgresql
-       * specific function to do this work
-       */
-      Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
-      if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
-            "PQputCopyData")) != NULL) {
-         do {
-            res = (*custom_function)(myconn->connection, mdb->cmd, len);
-         } while (res == 0 && --count > 0);
+/* dbi_getisnull
+ * like PQgetisnull
+ * int PQgetisnull(const PGresult *res,
+ *                 int row_number,
+ *                 int column_number);
+ *
+ *  use dbi_result_seek_row to search in result set
+ */
+static int dbi_getisnull(dbi_result *result, int row_number, int column_number) {
+   int i;
 
-         if (res == 1) {
-            Dmsg0(500, "ok\n");
-            mdb->changes++;
-            mdb->status = (dbi_error_flag) 1;
-         }
+   if (row_number == 0) {
+      row_number++;
+   }
 
-         if (res <= 0) {
-            Dmsg0(500, "my_dbi_batch_insert failed\n");
-            goto bail_out;
-         }
+   column_number++;
 
-         Dmsg0(500, "my_dbi_batch_insert finishing\n");
-         return mdb->status;
-      } else {
-         // ensure to detect a PQerror
-         custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
-         Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
-         goto bail_out;
-      }
-      break;
-   case SQL_TYPE_SQLITE:
-      db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-      db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
-                      ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
-                      mdb->esc_name, ar->attr, digest);
-      if (my_dbi_query(mdb,mdb->cmd) == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_insert failed\n");
-         goto bail_out;
-      }
+   if (dbi_result_seek_row(result, row_number)) {
+      i = dbi_result_field_is_null_idx(result,column_number);
+      return i;
+   } else {
+      return 0;
+   }
+}
 
-      Dmsg0(500, "my_dbi_batch_insert finishing\n");
+SQL_FIELD *B_DB_DBI::sql_fetch_field(void)
+{
+   int i, j;
+   int dbi_index;
+   int max_length;
+   int this_length;
+   char *cbuf = NULL;
 
-      return 1;
-      break;
-   case SQL_TYPE_SQLITE3:
-      db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
-      db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
-      len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
-                      ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
-                      mdb->esc_name, ar->attr, digest);
-      if (my_dbi_query(mdb,mdb->cmd) == 1)
-      {
-         Dmsg0(500, "my_dbi_batch_insert failed\n");
-         goto bail_out;
+   Dmsg0(500, "sql_fetch_field starts\n");
+
+   if (!m_fields || m_fields_size < m_num_fields) {
+      if (m_fields) {
+         free(m_fields);
+         m_fields = NULL;
       }
+      Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
+      m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
+      m_fields_size = m_num_fields;
+
+      for (i = 0; i < m_num_fields; i++) {
+         /*
+          * num_fields is starting at 1, increment i by 1
+          */
+         dbi_index = i + 1;
+         Dmsg1(500, "filling field %d\n", i);
+         m_fields[i].name = (char *)dbi_result_get_field_name(m_result, dbi_index);
+         m_fields[i].type = dbi_result_get_field_type_idx(m_result, dbi_index);
+         m_fields[i].flags = dbi_result_get_field_attribs_idx(m_result, dbi_index);
+
+         /*
+          * For a given column, find the max length.
+          */
+         max_length = 0;
+         for (j = 0; j < m_num_rows; j++) {
+            if (dbi_getisnull(m_result, j, dbi_index)) {
+                this_length = 4;        /* "NULL" */
+            } else {
+               cbuf = dbi_getvalue(m_result, j, dbi_index);
+               this_length = cstrlen(cbuf);
+               /*
+                * cbuf is always free
+                */
+               free(cbuf);
+            }
+         
+            if (max_length < this_length) {
+               max_length = this_length;
+            }
+         }
+         m_fields[i].max_length = max_length;
 
-      Dmsg0(500, "my_dbi_batch_insert finishing\n");
+         Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
+               m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
+      }
+   }
 
-      return 1;
-      break;
+   /*
+    * Increment field number for the next time around
+    */
+   return &m_fields[m_field_number++];
+}
+
+bool B_DB_DBI::sql_field_is_not_null(int field_type)
+{
+   switch (field_type) {
+   case (1 << 0):
+      return true;
+   default:
+      return false;
    }
+}
 
-bail_out:
-  Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
-  mdb->status = (dbi_error_flag) 0;
-  my_dbi_free_result(mdb);
-  return mdb->status;
+bool B_DB_DBI::sql_field_is_numeric(int field_type)
+{
+   switch (field_type) {
+   case 1:
+   case 2:
+      return true;
+   default:
+      return false;
+   }
 }
 
 /*
@@ -1054,9 +1122,11 @@ bail_out:
  *         string must be long enough (max 2*old+1) to hold
  *         the escaped output.
  */
-char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
+static char *postgresql_copy_escape(char *dest, char *src, size_t len)
 {
-   /* we have to escape \t, \n, \r, \ */
+   /*
+    * We have to escape \t, \n, \r, \
+    */
    char c = '\0' ;
 
    while (len > 0 && *src) {
@@ -1094,292 +1164,314 @@ char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
    return dest;
 }
 
-#endif /* HAVE_BATCH_FILE_INSERT */
-
-/* my_dbi_getisnull
- * like PQgetisnull
- * int PQgetisnull(const PGresult *res,
- *              int row_number,
- *               int column_number);
+/*
+ * This can be a bit strang but is the one way to do
  *
- *  use dbi_result_seek_row to search in result set
+ * Returns true if OK
+ *         false if failed
  */
-int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
-   int i;
+bool B_DB_DBI::sql_batch_start(JCR *jcr)
+{
+   bool retval = true;
+   const char *query = "COPY batch FROM STDIN";
 
-   if(row_number == 0) {
-      row_number++;
-   }
+   Dmsg0(500, "sql_batch_start started\n");
 
-   column_number++;
+   db_lock(this);
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                             "FileIndex integer,"
+                             "JobId integer,"
+                             "Path blob,"
+                             "Name blob,"
+                             "LStat tinyblob,"
+                             "MD5 tinyblob,"
+                             "DeltaSeq smallint)")) {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
+      Dmsg0(500, "sql_batch_start finishing\n");
+      goto ok_out;
+   case SQL_TYPE_POSTGRESQL:
+      if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                             "FileIndex int,"
+                             "JobId int,"
+                             "Path varchar,"
+                             "Name varchar,"
+                             "LStat varchar,"
+                             "MD5 varchar,"
+                             "DeltaSeq int)")) {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
 
-   if(dbi_result_seek_row(result, row_number)) {
+      /*
+       * We are starting a new query.  reset everything.
+       */
+      m_num_rows     = -1;
+      m_row_number   = -1;
+      m_field_number = -1;
 
-      i = dbi_result_field_is_null_idx(result,column_number);
+      sql_free_result();
 
-      return i;
-   } else {
+      for (int i=0; i < 10; i++) {
+         sql_query(query);
+         if (m_result) {
+            break;
+         }
+         bmicrosleep(5, 0);
+      }
+      if (!m_result) {
+         Dmsg1(50, "Query failed: %s\n", query);
+         goto bail_out;
+      }
 
-      return 0;
+      m_status = (dbi_error_flag)dbi_conn_error(m_db_handle, NULL);
+      //m_status = DBI_ERROR_NONE;
+
+      if (m_status == DBI_ERROR_NONE) {
+         /*
+          * How many fields in the set?
+          */
+         m_num_fields = dbi_result_get_numfields(m_result);
+         m_num_rows = dbi_result_get_numrows(m_result);
+         m_status = (dbi_error_flag) 1;
+      } else {
+         Dmsg1(50, "Result status failed: %s\n", query);
+         goto bail_out;
+      }
+
+      Dmsg0(500, "sql_batch_start finishing\n");
+      goto ok_out;
+   case SQL_TYPE_SQLITE3:
+      if (!sql_query("CREATE TEMPORARY TABLE batch ("
+                             "FileIndex integer,"
+                             "JobId integer,"
+                             "Path blob,"
+                             "Name blob,"
+                             "LStat tinyblob,"
+                             "MD5 tinyblob,"
+                             "DeltaSeq smallint)")) {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
+      }
+      Dmsg0(500, "sql_batch_start finishing\n");
+      goto ok_out;
    }
 
+bail_out:
+   Mmsg1(&errmsg, _("error starting batch mode: %s"), sql_strerror());
+   m_status = (dbi_error_flag) 0;
+   sql_free_result();
+   m_result = NULL;
+   retval = false;
+
+ok_out:
+   db_unlock(this);
+   return retval;
 }
-/* my_dbi_getvalue
- * like PQgetvalue;
- * char *PQgetvalue(const PGresult *res,
- *                int row_number,
- *                int column_number);
- *
- * use dbi_result_seek_row to search in result set
- * use example to return only strings
+
+/*
+ * Set error to something to abort operation
  */
-char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
+bool B_DB_DBI::sql_batch_end(JCR *jcr, const char *error)
+{
+   int res = 0;
+   int count = 30;
+   int (*custom_function)(void*, const char*) = NULL;
+   dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
 
-   char *buf = NULL;
-   const char *errmsg;
-   const char *field_name;
-   unsigned short dbitype;
-   size_t field_length;
-   int64_t num;
+   Dmsg0(500, "sql_batch_start started\n");
 
-   /* correct the index for dbi interface
-    * dbi index begins 1
-    * I prefer do not change others functions
-    */
-   Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
-                                result, row_number, column_number);
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      m_status = (dbi_error_flag) 0;
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQputCopyEnd");
 
-   column_number++;
+      do {
+         res = (*custom_function)(myconn->connection, error);
+      } while (res == 0 && --count > 0);
 
-   if(row_number == 0) {
-     row_number++;
+      if (res == 1) {
+         Dmsg0(500, "ok\n");
+         m_status = (dbi_error_flag) 1;
+      }
+
+      if (res <= 0) {
+         Dmsg0(500, "we failed\n");
+         m_status = (dbi_error_flag) 0;
+         //Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(myconn));
+       }
+      break;
+   case SQL_TYPE_SQLITE3:
+      m_status = (dbi_error_flag) 0;
+      break;
    }
 
-   Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
-                        result, row_number, column_number);
+   Dmsg0(500, "sql_batch_start finishing\n");
 
-   if(dbi_result_seek_row(result, row_number)) {
+   return true;
+}
 
-      field_name = dbi_result_get_field_name(result, column_number);
-      field_length = dbi_result_get_field_length(result, field_name);
-      dbitype = dbi_result_get_field_type_idx(result,column_number);
+/*
+ * This function is big and use a big switch.
+ * In near future is better split in small functions
+ * and refactory.
+ */
+bool B_DB_DBI::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
+{
+   int res;
+   int count=30;
+   dbi_conn_t *myconn = (dbi_conn_t *)(m_db_handle);
+   int (*custom_function)(void*, const char*, int) = NULL;
+   char* (*custom_function_error)(void*) = NULL;
+   size_t len;
+   char *digest;
+   char ed1[50];
 
-      Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
-            "field_length bytes: '%d' fieldname: '%s'\n",
-            dbitype, field_length, field_name);
+   Dmsg0(500, "sql_batch_start started \n");
 
-      if(field_length) {
-         //buf = (char *)malloc(sizeof(char *) * field_length + 1);
-         buf = (char *)malloc(field_length + 1);
-      } else {
-         /* if numbers */
-         buf = (char *)malloc(sizeof(char *) * 50);
+   esc_name = check_pool_memory_size(esc_name, fnl*2+1);
+   esc_path = check_pool_memory_size(esc_path, pnl*2+1);
+
+   if (ar->Digest == NULL || ar->Digest[0] == 0) {
+      *digest = '\0';
+   } else {
+      digest = ar->Digest;
+   }
+
+   switch (m_db_type) {
+   case SQL_TYPE_MYSQL:
+      db_escape_string(jcr, esc_name, fname, fnl);
+      db_escape_string(jcr, esc_path, path, pnl);
+      len = Mmsg(cmd, "INSERT INTO batch VALUES "
+                      "(%u,%s,'%s','%s','%s','%s',%u)",
+                      ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
+                      esc_name, ar->attr, digest, ar->DeltaSeq);
+
+      if (!sql_query(cmd))
+      {
+         Dmsg0(500, "sql_batch_start failed\n");
+         goto bail_out;
       }
 
-      switch (dbitype) {
-      case DBI_TYPE_INTEGER:
-         num = dbi_result_get_longlong(result, field_name);
-         edit_int64(num, buf);
-         field_length = strlen(buf);
-         break;
-      case DBI_TYPE_STRING:
-         if(field_length) {
-            field_length = bsnprintf(buf, field_length + 1, "%s",
-            dbi_result_get_string(result, field_name));
-         } else {
-            buf[0] = 0;
-         }
-         break;
-      case DBI_TYPE_BINARY:
-         /* dbi_result_get_binary return a NULL pointer if value is empty
-         * following, change this to what Bacula espected
-         */
-         if(field_length) {
-            field_length = bsnprintf(buf, field_length + 1, "%s",
-                  dbi_result_get_binary(result, field_name));
-         } else {
-            buf[0] = 0;
-         }
-         break;
-      case DBI_TYPE_DATETIME:
-         time_t last;
-         struct tm tm;
+      Dmsg0(500, "sql_batch_start finishing\n");
 
-         last = dbi_result_get_datetime(result, field_name);
+      return true;
+      break;
+   case SQL_TYPE_POSTGRESQL:
+      postgresql_copy_escape(esc_name, fname, fnl);
+      postgresql_copy_escape(esc_path, path, pnl);
+      len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
+                     ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
+                     esc_name, ar->attr, digest, ar->DeltaSeq);
+
+      /*
+       * libdbi don't support CopyData and we need call a postgresql
+       * specific function to do this work
+       */
+      Dmsg2(500, "sql_batch_insert :\n %s \ncmd_size: %d",cmd, len);
+      custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn),"PQputCopyData");
+      if (custom_function != NULL) {
+         do {
+            res = (*custom_function)(myconn->connection, cmd, len);
+         } while (res == 0 && --count > 0);
 
-         if(last == -1) {
-                field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
-         } else {
-            (void)localtime_r(&last, &tm);
-            field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
-                  (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
-                  tm.tm_hour, tm.tm_min, tm.tm_sec);
+         if (res == 1) {
+            Dmsg0(500, "ok\n");
+            changes++;
+            m_status = (dbi_error_flag) 1;
          }
-         break;
+
+         if (res <= 0) {
+            Dmsg0(500, "sql_batch_insert failed\n");
+            goto bail_out;
+         }
+
+         Dmsg0(500, "sql_batch_insert finishing\n");
+         return true;
+      } else {
+         /*
+          * Ensure to detect a PQerror
+          */
+         custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(myconn), "PQerrorMessage");
+         Dmsg1(500, "sql_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
+         goto bail_out;
+      }
+      break;
+   case SQL_TYPE_SQLITE3:
+      db_escape_string(jcr, esc_name, fname, fnl);
+      db_escape_string(jcr, esc_path, path, pnl);
+      len = Mmsg(cmd, "INSERT INTO batch VALUES "
+                      "(%u,%s,'%s','%s','%s','%s',%u)",
+                      ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
+                      esc_name, ar->attr, digest, ar->DeltaSeq);
+
+      if (!sql_query(cmd))
+      {
+         Dmsg0(500, "sql_batch_insert failed\n");
+         goto bail_out;
       }
 
-   } else {
-      dbi_conn_error(dbi_result_get_conn(result), &errmsg);
-      Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
-   }
+      Dmsg0(500, "sql_batch_insert finishing\n");
 
-   Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
-      buf, field_length, buf);
+      return true;
+      break;
+   }
 
-   // don't worry about this buf
-   return buf;
+bail_out:
+   Mmsg1(&errmsg, _("error inserting batch mode: %s"), sql_strerror());
+   m_status = (dbi_error_flag) 0;
+   sql_free_result();
+   return false;
 }
 
-int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
+/*
+ * Initialize database data structure. In principal this should
+ * never have errors, or it is really fatal.
+ */
+B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
+                       const char *db_password, const char *db_address, int db_port,
+                       const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
 {
-   /*
-    Obtain the current value of the sequence that
-    provides the serial value for primary key of the table.
-
-    currval is local to our session.  It is not affected by
-    other transactions.
+   B_DB_DBI *mdb = NULL;
 
-    Determine the name of the sequence.
-    PostgreSQL automatically creates a sequence using
-    <table>_<column>_seq.
-    At the time of writing, all tables used this format for
-    for their primary key: <table>id
-    Except for basefiles which has a primary key on baseid.
-    Therefore, we need to special case that one table.
-
-    everything else can use the PostgreSQL formula.
-   */
+   if (!db_driver) {
+      Jmsg(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
+   }
 
-   char      sequence[30];
-   uint64_t    id = 0;
+   if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
+      Jmsg(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
+   }
 
-   if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
+   if (!db_user) {
+      Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
+      return NULL;
+   }
 
-      if (strcasecmp(table_name, "basefiles") == 0) {
-         bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
-      } else {
-         bstrncpy(sequence, table_name, sizeof(sequence));
-         bstrncat(sequence, "_",        sizeof(sequence));
-         bstrncat(sequence, table_name, sizeof(sequence));
-         bstrncat(sequence, "id",       sizeof(sequence));
+   P(mutex);                          /* lock DB queue */
+   if (db_list && !mult_db_connections) {
+      /*
+       * Look to see if DB already open
+       */
+      foreach_dlist(mdb, db_list) {
+         if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
+            Dmsg1(100, "DB REopen %s\n", db_name);
+            mdb->increment_refcount();
+            goto bail_out;
+         }
       }
-
-      bstrncat(sequence, "_seq", sizeof(sequence));
-      id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
-   } else {
-      id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
    }
+   Dmsg0(100, "db_init_database first time\n");
+   mdb = New(B_DB_DBI(jcr, db_driver, db_name, db_user, db_password, db_address,
+                      db_port, db_socket, mult_db_connections, disable_batch_insert));
 
-   return id;
+bail_out:
+   V(mutex);
+   return mdb;
 }
 
-#ifdef HAVE_BATCH_FILE_INSERT
-const char *my_dbi_batch_lock_path_query[5] = {
-   /* Mysql */
-   "LOCK TABLES Path write, batch write, Path as p write",
-   /* Postgresql */
-   "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
-   /* SQLite */
-   "BEGIN",
-   /* SQLite3 */
-   "BEGIN",
-   /* Ingres */
-   "BEGIN"
-};
-
-const char *my_dbi_batch_lock_filename_query[5] = {
-   /* Mysql */
-   "LOCK TABLES Filename write, batch write, Filename as f write",
-   /* Postgresql */
-   "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
-   /* SQLite */
-   "BEGIN",
-   /* SQLite3 */
-   "BEGIN",
-   /* Ingres */
-   "BEGIN"
-};
-
-const char *my_dbi_batch_unlock_tables_query[5] = {
-   /* Mysql */
-   "UNLOCK TABLES",
-   /* Postgresql */
-   "COMMIT",
-   /* SQLite */
-   "COMMIT",
-   /* SQLite3 */
-   "COMMIT",
-   /* Ingres */
-   "COMMIT"
-};
-
-const char *my_dbi_batch_fill_path_query[5] = {
-   /* Mysql */
-   "INSERT INTO Path (Path) "
-   "SELECT a.Path FROM "
-   "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
-   "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
-   /* Postgresql */
-   "INSERT INTO Path (Path) "
-   "SELECT a.Path FROM "
-   "(SELECT DISTINCT Path FROM batch) AS a "
-   "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
-   /* SQLite */
-   "INSERT INTO Path (Path)"
-   " SELECT DISTINCT Path FROM batch"
-   " EXCEPT SELECT Path FROM Path",
-   /* SQLite3 */
-   "INSERT INTO Path (Path)"
-   " SELECT DISTINCT Path FROM batch"
-   " EXCEPT SELECT Path FROM Path",
-   /* Ingres */
-   "INSERT INTO Path (Path) "
-   "SELECT a.Path FROM "
-   "(SELECT DISTINCT Path FROM batch) AS a "
-   "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
-};
-
-const char *my_dbi_batch_fill_filename_query[5] = {
-   /* Mysql */
-   "INSERT INTO Filename (Name) "
-   "SELECT a.Name FROM "
-   "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
-   "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
-   /* Postgresql */
-   "INSERT INTO Filename (Name) "
-   "SELECT a.Name FROM "
-   "(SELECT DISTINCT Name FROM batch) as a "
-   "WHERE NOT EXISTS "
-   "(SELECT Name FROM Filename WHERE Name = a.Name)",
-   /* SQLite */
-   "INSERT INTO Filename (Name)"
-   " SELECT DISTINCT Name FROM batch "
-   " EXCEPT SELECT Name FROM Filename",
-   /* SQLite3 */
-   "INSERT INTO Filename (Name)"
-   " SELECT DISTINCT Name FROM batch "
-   " EXCEPT SELECT Name FROM Filename",
-   /* Ingres */
-   "INSERT INTO Filename (Name) "
-   "SELECT a.Name FROM "
-   "(SELECT DISTINCT Name FROM batch) as a "
-   "WHERE NOT EXISTS "
-   "(SELECT Name FROM Filename WHERE Name = a.Name)"
-};
-
-#endif /* HAVE_BATCH_FILE_INSERT */
-
-const char *my_dbi_match[5] = {
-   /* Mysql */
-   "MATCH",
-   /* Postgresql */
-   "~",
-   /* SQLite */
-   "MATCH",
-   /* SQLite3 */
-   "MATCH",
-   /* Ingres */
-   "~"
-};
-
 #endif /* HAVE_DBI */