]> git.sur5r.net Git - bacula/bacula/blobdiff - bacula/src/cats/bvfs.c
bvfs: Do not insert deleted directories in PathVisibility table
[bacula/bacula] / bacula / src / cats / bvfs.c
index 460f683a2e48b7b1cafde9aba0b446636f936874..91d82785b3c35806ae9c9475107aba5127266656 100644 (file)
@@ -1,31 +1,35 @@
 /*
-   Bacula® - The Network Backup Solution
+   Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2009-2014 Free Software Foundation Europe e.V.
+   Copyright (C) 2000-2017 Kern Sibbald
 
-   The main author of Bacula is Kern Sibbald, with contributions from many
-   others, a complete list can be found in the file AUTHORS.
+   The original author of Bacula is Kern Sibbald, with contributions
+   from many others, a complete list can be found in the file AUTHORS.
 
    You may use this file and others of this release according to the
    license defined in the LICENSE file, which includes the Affero General
    Public License, v3.0 ("AGPLv3") and some additional permissions and
    terms pursuant to its AGPLv3 Section 7.
 
-   Bacula® is a registered trademark of Kern Sibbald.
+   This notice must be preserved when any source code is
+   conveyed and/or propagated.
+
+   Bacula(R) is a registered trademark of Kern Sibbald.
 */
 
 #include "bacula.h"
-
+#include "cats.h" 
 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
-
-#include "cats.h"
-#include "bdb_priv.h"
-#include "sql_glue.h"
 #include "lib/htable.h"
 #include "bvfs.h"
 
-#define dbglevel 10
-#define dbglevel_sql 15
+#define can_access(x) (true)
+
+/* from libbacfind */
+extern int decode_stat(char *buf, struct stat *statp, int stat_size, int32_t *LinkFI);
+
+#define dbglevel      DT_BVFS|10
+#define dbglevel_sql  DT_SQL|15
 
 static int result_handler(void *ctx, int fields, char **row)
 {
@@ -45,7 +49,8 @@ static int result_handler(void *ctx, int fields, char **row)
    return 0;
 }
 
-Bvfs::Bvfs(JCR *j, B_DB *mdb) {
+Bvfs::Bvfs(JCR *j, BDB *mdb)
+{
    jcr = j;
    jcr->inc_use_count();
    db = mdb;                 /* need to inc ref count */
@@ -56,14 +61,19 @@ Bvfs::Bvfs(JCR *j, B_DB *mdb) {
    tmp = get_pool_memory(PM_NAME);
    escaped_list = get_pool_memory(PM_NAME);
    *filename = *jobids = *prev_dir = *pattern = 0;
-   dir_filenameid = pwd_id = offset = 0;
+   pwd_id = offset = 0;
    see_copies = see_all_versions = false;
+   compute_delta = true;
    limit = 1000;
    attr = new_attr(jcr);
    list_entries = result_handler;
    user_data = this;
    username = NULL;
    job_acl = client_acl = pool_acl = fileset_acl = NULL;
+   last_dir_acl = NULL;
+   dir_acl = NULL;
+   use_acl = false;
+   dir_filenameid = 0;       /* special FilenameId where Name='' */
 }
 
 Bvfs::~Bvfs() {
@@ -78,6 +88,9 @@ Bvfs::~Bvfs() {
    }
    free_attr(attr);
    jcr->dec_use_count();
+   if (dir_acl) {
+      delete dir_acl;
+   }
 }
 
 char *Bvfs::escape_list(alist *lst)
@@ -101,7 +114,7 @@ char *Bvfs::escape_list(alist *lst)
          tmp = check_pool_memory_size(tmp, 2 * len + 2 + 2);
 
          tmp[0] = '\'';
-         db_escape_string(jcr, db, tmp + 1 , elt, len);
+         db->bdb_escape_string(jcr, tmp + 1 , elt, len);
          pm_strcat(tmp, "'");
 
          if (*escaped_list) {
@@ -114,7 +127,8 @@ char *Bvfs::escape_list(alist *lst)
    return escaped_list;
 }
 
-void Bvfs::filter_jobid()
+/* Returns the number of jobids in the result */
+int Bvfs::filter_jobid()
 {
    POOL_MEM query;
    POOL_MEM sub_where;
@@ -123,7 +137,14 @@ void Bvfs::filter_jobid()
    /* No ACL, no username, no check */
    if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
       Dmsg0(dbglevel_sql, "No ACL\n");
-      return;
+      /* Just count the number of items in the list */
+      int nb = (*jobids != 0) ? 1 : 0;
+      for (char *p=jobids; *p ; p++) {
+         if (*p == ',') {
+            nb++;
+         }
+      }
+      return nb;
    }
 
    if (job_acl) {
@@ -171,20 +192,23 @@ void Bvfs::filter_jobid()
 
    db_list_ctx ctx;
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
-   db_sql_query(db, query.c_str(), db_list_handler, &ctx);
+   db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
    pm_strcpy(jobids, ctx.list);
+   return ctx.count;
 }
 
-void Bvfs::set_jobid(JobId_t id)
+/* Return the number of jobids after the filter */
+int Bvfs::set_jobid(JobId_t id)
 {
    Mmsg(jobids, "%lld", (uint64_t)id);
-   filter_jobid();
+   return filter_jobid();
 }
 
-void Bvfs::set_jobids(char *ids)
+/* Return the number of jobids after the filter */
+int Bvfs::set_jobids(char *ids)
 {
    pm_strcpy(jobids, ids);
-   filter_jobid();
+   return filter_jobid();
 }
 
 /*
@@ -219,7 +243,7 @@ public:
       nb_node = 0;
       table_node = New(alist(5, owned_by_alist));
       table_node->append(nodes);
-   }
+   };
 
    hlink *get_hlink() {
       if (++nb_node >= max_node) {
@@ -228,23 +252,23 @@ public:
          table_node->append(nodes);
       }
       return nodes + nb_node;
-   }
+   };
 
    bool lookup(char *pathid) {
       bool ret = cache_ppathid->lookup(pathid) != NULL;
       return ret;
-   }
+   };
 
    void insert(char *pathid) {
       hlink *h = get_hlink();
       cache_ppathid->insert(pathid, h);
-   }
+   };
 
    ~pathid_cache() {
       cache_ppathid->destroy();
       free(cache_ppathid);
       delete table_node;
-   }
+   };
 private:
    pathid_cache(const pathid_cache &); /* prohibit pass by value */
    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
@@ -310,7 +334,7 @@ char *bvfs_basename_dir(char *path)
    return p;
 }
 
-static void build_path_hierarchy(JCR *jcr, B_DB *mdb,
+static void build_path_hierarchy(JCR *jcr, BDB *mdb,
                                  pathid_cache &ppathid_cache,
                                  char *org_pathid, char *path)
 {
@@ -333,12 +357,12 @@ static void build_path_hierarchy(JCR *jcr, B_DB *mdb,
               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
               pathid);
 
-         if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
+         if (!mdb->QueryDB(jcr, mdb->cmd)) {
             goto bail_out;      /* Query failed, just leave */
          }
 
          /* Do we have a result ? */
-         if (sql_num_rows(mdb) > 0) {
+         if (mdb->sql_num_rows() > 0) {
             ppathid_cache.insert(pathid);
             /* This dir was in the db ...
              * It means we can leave, the tree has allready been built for
@@ -349,7 +373,7 @@ static void build_path_hierarchy(JCR *jcr, B_DB *mdb,
             /* search or create parent PathId in Path table */
             mdb->path = bvfs_parent_dir(path);
             mdb->pnl = strlen(mdb->path);
-            if (!db_create_path_record(jcr, mdb, &parent)) {
+            if (!mdb->bdb_create_path_record(jcr, &parent)) {
                goto bail_out;
             }
             ppathid_cache.insert(pathid);
@@ -359,7 +383,7 @@ static void build_path_hierarchy(JCR *jcr, B_DB *mdb,
                  "VALUES (%s,%lld)",
                  pathid, (uint64_t) parent.PathId);
 
-            if (!INSERT_DB(jcr, mdb, mdb->cmd)) {
+            if (!mdb->InsertDB(jcr, mdb->cmd)) {
                goto bail_out;   /* Can't insert the record, just leave */
             }
 
@@ -385,22 +409,28 @@ bail_out:
  *        OK    1
  */
 static int update_path_hierarchy_cache(JCR *jcr,
-                                        B_DB *mdb,
+                                        BDB *mdb,
                                         pathid_cache &ppathid_cache,
                                         JobId_t JobId)
 {
    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
-   int ret=0;
+   uint32_t ret=0;
    uint32_t num;
    char jobid[50];
    edit_uint64(JobId, jobid);
 
-   db_lock(mdb);
-   db_start_transaction(jcr, mdb);
+   mdb->bdb_lock();
+
+   /* We don't really want to harm users with spurious messages,
+    * everything is handled by transaction
+    */
+   mdb->set_use_fatal_jmsg(false);
+
+   mdb->bdb_start_transaction(jcr);
 
    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
 
-   if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
+   if (!mdb->QueryDB(jcr, mdb->cmd) || mdb->sql_num_rows() > 0) {
       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
       ret = 1;
       goto bail_out;
@@ -409,14 +439,14 @@ static int update_path_hierarchy_cache(JCR *jcr,
    /* Inserting path records for JobId */
    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
                    "SELECT DISTINCT PathId, JobId "
-                     "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
+                     "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s AND FileIndex <> 0 "
                            "UNION "
                            "SELECT PathId, BaseFiles.JobId "
                              "FROM BaseFiles JOIN File AS F USING (FileId) "
                             "WHERE BaseFiles.JobId = %s) AS B",
         jobid, jobid);
 
-   if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
+   if (!mdb->QueryDB(jcr, mdb->cmd)) {
       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
       goto bail_out;
    }
@@ -436,7 +466,7 @@ static int update_path_hierarchy_cache(JCR *jcr,
       "ORDER BY Path", jobid);
    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
 
-   if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
+   if (!mdb->QueryDB(jcr, mdb->cmd)) {
       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
       goto bail_out;
    }
@@ -445,13 +475,13 @@ static int update_path_hierarchy_cache(JCR *jcr,
     * So, now i'm copying the result in memory to be able to query the
     * catalog descriptor again.
     */
-   num = sql_num_rows(mdb);
+   num = mdb->sql_num_rows();
    if (num > 0) {
       char **result = (char **)malloc (num * 2 * sizeof(char *));
 
       SQL_ROW row;
       int i=0;
-      while((row = sql_fetch_row(mdb))) {
+      while((row = mdb->sql_fetch_row())) {
          result[i++] = bstrdup(row[0]);
          result[i++] = bstrdup(row[1]);
       }
@@ -466,7 +496,7 @@ static int update_path_hierarchy_cache(JCR *jcr,
       free(result);
    }
 
-   if (mdb->db_get_type_index() == SQL_TYPE_SQLITE3) {
+   if (mdb->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
       Mmsg(mdb->cmd,
  "INSERT INTO PathVisibility (PathId, JobId) "
    "SELECT DISTINCT h.PPathId AS PathId, %s "
@@ -475,6 +505,18 @@ static int update_path_hierarchy_cache(JCR *jcr,
       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
            jobid, jobid, jobid );
 
+   } else if (mdb->bdb_get_type_index() == SQL_TYPE_MYSQL) {
+      Mmsg(mdb->cmd,
+  "INSERT INTO PathVisibility (PathId, JobId)  "
+   "SELECT a.PathId,%s "
+   "FROM ( "
+     "SELECT DISTINCT h.PPathId AS PathId "
+       "FROM PathHierarchy AS h "
+       "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
+      "WHERE p.JobId=%s) AS a "
+      "LEFT JOIN PathVisibility AS b ON (b.JobId=%s and a.PathId = b.PathId) "
+      "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
+
    } else {
       Mmsg(mdb->cmd,
   "INSERT INTO PathVisibility (PathId, JobId)  "
@@ -491,31 +533,25 @@ static int update_path_hierarchy_cache(JCR *jcr,
    }
 
    do {
-      ret = QUERY_DB(jcr, mdb, mdb->cmd);
-   } while (ret && sql_affected_rows(mdb) > 0);
+      ret = mdb->QueryDB(jcr, mdb->cmd);
+   } while (ret && mdb->sql_affected_rows() > 0);
 
    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
-   UPDATE_DB(jcr, mdb, mdb->cmd);
+   ret = mdb->UpdateDB(jcr, mdb->cmd, false);
 
 bail_out:
-   db_end_transaction(jcr, mdb);
-   db_unlock(mdb);
-   return ret;
-}
+   mdb->bdb_end_transaction(jcr);
 
-/*
- * Find an store the filename descriptor for empty directories Filename.Name=''
- */
-DBId_t Bvfs::get_dir_filenameid()
-{
-   uint32_t id;
-   if (dir_filenameid) {
-      return dir_filenameid;
+   if (!ret) {
+      Mmsg(mdb->cmd, "SELECT HasCache FROM Job WHERE JobId=%s", jobid);
+      mdb->bdb_sql_query(mdb->cmd, db_int_handler, &ret);
    }
-   Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
-   db_sql_query(db, db->cmd, db_int_handler, &id);
-   dir_filenameid = id;
-   return dir_filenameid;
+
+   /* Enable back the FATAL message if something is wrong */
+   mdb->set_use_fatal_jmsg(true);
+
+   mdb->bdb_unlock();
+   return ret;
 }
 
 /* Compute the cache for the bfileview compoment */
@@ -530,21 +566,45 @@ void Bvfs::fv_update_cache()
       return;                   /* Nothing to build */
    }
 
-   db_lock(db);
-   db_start_transaction(jcr, db);
+   db->bdb_lock();
+   /* We don't really want to harm users with spurious messages,
+    * everything is handled by transaction
+    */
+   db->set_use_fatal_jmsg(false);
+
+   db->bdb_start_transaction(jcr);
 
    pathid = get_root();
 
    fv_compute_size_and_count(pathid, &size, &count);
 
-   db_end_transaction(jcr, db);
-   db_unlock(db);
+   db->bdb_end_transaction(jcr);
+
+   /* Enable back the FATAL message if something is wrong */
+   db->set_use_fatal_jmsg(true);
+
+   db->bdb_unlock();
+}
+
+/* 
+ * Find an store the filename descriptor for empty directories Filename.Name=''
+ */
+DBId_t Bvfs::get_dir_filenameid()
+{
+   uint32_t id;
+   if (dir_filenameid) {
+      return dir_filenameid;
+   }
+   Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
+   db_sql_query(db, db->cmd, db_int_handler, &id);
+   dir_filenameid = id;
+   return dir_filenameid;
 }
 
 /* Not yet working */
 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
 {
-   Mmsg(db->cmd,
+   Mmsg(db->cmd, 
         "SELECT FilenameId AS filenameid, Name AS name, size "
           "FROM ( "
          "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
@@ -557,6 +617,7 @@ void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
      "LIMIT %d ", pathid, jobids, min_size, limit);
 }
 
+
 /* Get the current path size and files count */
 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
 {
@@ -570,11 +631,11 @@ void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t
  " WHERE PathId = %lld "
    " AND JobId = %s ", pathid, jobids);
 
-   if (!QUERY_DB(jcr, db, db->cmd)) {
+   if (!db->QueryDB(jcr, db->cmd)) {
       return;
    }
 
-   if ((row = sql_fetch_row(db))) {
+   if ((row = db->sql_fetch_row())) {
       *size = str_to_int64(row[0]);
       *count =  str_to_int64(row[1]);
    }
@@ -593,11 +654,11 @@ void Bvfs::fv_get_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
  " WHERE PathId = %lld "
    " AND JobId = %s ", pathid, jobids);
 
-   if (!QUERY_DB(jcr, db, db->cmd)) {
+   if (!db->QueryDB(jcr, db->cmd)) {
       return;
    }
 
-   if ((row = sql_fetch_row(db))) {
+   if ((row = db->sql_fetch_row())) {
       *size = str_to_int64(row[0]);
       *count =  str_to_int64(row[1]);
    }
@@ -623,15 +684,15 @@ void Bvfs::fv_compute_size_and_count(int64_t pathid, int64_t *size, int64_t *cou
          " WHERE PPathId  = %lld "
            " AND JobId = %s ", pathid, jobids);
 
-   QUERY_DB(jcr, db, db->cmd);
-   int num = sql_num_rows(db);
+   db->QueryDB(jcr, db->cmd);
+   int num = db->sql_num_rows();
 
    if (num > 0) {
       int64_t *result = (int64_t *)malloc (num * sizeof(int64_t));
       SQL_ROW row;
       int i=0;
 
-      while((row = sql_fetch_row(db))) {
+      while((row = db->sql_fetch_row())) {
          result[i++] = str_to_int64(row[0]); /* PathId */
       }
 
@@ -658,23 +719,23 @@ void Bvfs::fv_update_size_and_count(int64_t pathid, int64_t size, int64_t count)
         " WHERE JobId = %s "
         " AND PathId = %lld ", count, size, jobids, pathid);
 
-   UPDATE_DB(jcr, db, db->cmd);
+   db->UpdateDB(jcr, db->cmd, false);
 }
 
-void bvfs_update_cache(JCR *jcr, B_DB *mdb)
+void bvfs_update_cache(JCR *jcr, BDB *mdb)
 {
    uint32_t nb=0;
    db_list_ctx jobids_list;
 
-   db_lock(mdb);
+   mdb->bdb_lock();
 
 #ifdef xxx
    /* TODO: Remove this code when updating make_bacula_table script */
    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
-   if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
+   if (!mdb->QueryDB(jcr, mdb->cmd)) {
       Dmsg0(dbglevel, "Creating cache table\n");
       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
-      QUERY_DB(jcr, mdb, mdb->cmd);
+      mdb->QueryDB(jcr, mdb->cmd);
 
       Mmsg(mdb->cmd,
            "CREATE TABLE PathHierarchy ( "
@@ -682,12 +743,12 @@ void bvfs_update_cache(JCR *jcr, B_DB *mdb)
            "PPathId integer NOT NULL, "
            "CONSTRAINT pathhierarchy_pkey "
            "PRIMARY KEY (PathId))");
-      QUERY_DB(jcr, mdb, mdb->cmd);
+      mdb->QueryDB(jcr, mdb->cmd);
 
       Mmsg(mdb->cmd,
            "CREATE INDEX pathhierarchy_ppathid "
            "ON PathHierarchy (PPathId)");
-      QUERY_DB(jcr, mdb, mdb->cmd);
+      mdb->QueryDB(jcr, mdb->cmd);
 
       Mmsg(mdb->cmd,
            "CREATE TABLE PathVisibility ("
@@ -697,12 +758,12 @@ void bvfs_update_cache(JCR *jcr, B_DB *mdb)
            "Files int4 DEFAULT 0, "
            "CONSTRAINT pathvisibility_pkey "
            "PRIMARY KEY (JobId, PathId))");
-      QUERY_DB(jcr, mdb, mdb->cmd);
+      mdb->QueryDB(jcr, mdb->cmd);
 
       Mmsg(mdb->cmd,
            "CREATE INDEX pathvisibility_jobid "
            "ON PathVisibility (JobId)");
-      QUERY_DB(jcr, mdb, mdb->cmd);
+      mdb->QueryDB(jcr, mdb->cmd);
 
    }
 #endif
@@ -713,28 +774,28 @@ void bvfs_update_cache(JCR *jcr, B_DB *mdb)
     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
   "ORDER BY JobId");
 
-   db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
+   mdb->bdb_sql_query(mdb->cmd, db_list_handler, &jobids_list);
 
    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
 
-   db_start_transaction(jcr, mdb);
+   mdb->bdb_start_transaction(jcr);
    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
    Mmsg(mdb->cmd,
         "DELETE FROM PathVisibility "
          "WHERE NOT EXISTS "
         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
-   nb = DELETE_DB(jcr, mdb, mdb->cmd);
+   nb = mdb->DeleteDB(jcr, mdb->cmd);
    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
 
-   db_end_transaction(jcr, mdb);
-   db_unlock(mdb);
+   mdb->bdb_end_transaction(jcr);
+   mdb->bdb_unlock();
 }
 
 /*
  * Update the bvfs cache for given jobids (1,2,3,4)
  */
 int
-bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
+bvfs_update_path_hierarchy_cache(JCR *jcr, BDB *mdb, char *jobids)
 {
    pathid_cache ppathid_cache;
    JobId_t JobId;
@@ -744,7 +805,8 @@ bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
    for (p=jobids; ; ) {
       int stat = get_next_jobid_from_list(&p, &JobId);
       if (stat < 0) {
-         return 0;
+         ret = 0;
+         break;
       }
       if (stat == 0) {
          break;
@@ -761,7 +823,7 @@ bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
  * Update the bvfs fileview for given jobids
  */
 void
-bvfs_update_fv_cache(JCR *jcr, B_DB *mdb, char *jobids)
+bvfs_update_fv_cache(JCR *jcr, BDB *mdb, char *jobids)
 {
    char *p;
    JobId_t JobId;
@@ -791,14 +853,24 @@ void Bvfs::update_cache()
    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
 }
 
+
+bool Bvfs::ch_dir(DBId_t pathid)
+{
+   reset_offset();
+
+   pwd_id = pathid;
+   return pwd_id != 0;
+}
+
+
 /* Change the current directory, returns true if the path exists */
 bool Bvfs::ch_dir(const char *path)
 {
+   db->bdb_lock();
    pm_strcpy(db->path, path);
    db->pnl = strlen(db->path);
-   db_lock(db);
-   ch_dir(db_get_path_record(jcr, db));
-   db_unlock(db);
+   ch_dir(db->bdb_get_path_record(jcr));
+   db->bdb_unlock();
    return pwd_id != 0;
 }
 
@@ -806,7 +878,7 @@ bool Bvfs::ch_dir(const char *path)
  * Get all file versions for a specified client
  * TODO: Handle basejobs using different client
  */
-void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
+void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, const char *client)
 {
    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
          (uint64_t)fnid, client);
@@ -820,12 +892,12 @@ void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
 
    POOL_MEM query;
 
-   Mmsg(query,//    1           2              3
-"SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
-//         4          5           6
-        "File.JobId, File.LStat, File.FileId, "
-//         7                    8
-       "Media.VolumeName, Media.InChanger "
+   Mmsg(query,//    1           2           3      4 
+"SELECT 'V', File.PathId, File.FilenameId,  0, File.JobId, "
+//            5           6          7
+        "File.LStat, File.FileId, File.Md5, "
+//         8                    9
+        "Media.VolumeName, Media.InChanger "
 "FROM File, Job, Client, JobMedia, Media "
 "WHERE File.FilenameId = %s "
   "AND File.PathId=%s "
@@ -840,13 +912,115 @@ void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
         limit, offset);
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
-   db_sql_query(db, query.c_str(), list_entries, user_data);
+   db->bdb_sql_query(query.c_str(), list_entries, user_data);
+}
+
+/*
+ * Get all file versions for a specified client
+ * TODO: Handle basejobs using different client
+ */
+bool Bvfs::get_delta(FileId_t fileid)
+{
+   Dmsg1(dbglevel, "get_delta(%lld)\n", (uint64_t)fileid);
+   char ed1[50];
+   int32_t num;
+   SQL_ROW row;
+   POOL_MEM q;
+   POOL_MEM query;
+   char *fn = NULL;
+   bool ret = false;
+   db->bdb_lock();
+
+   /* Check if some FileId have DeltaSeq > 0
+    * Foreach of them we need to get the accurate_job list, and compute
+    * what are dependencies
+    */
+   Mmsg(query,
+        "SELECT F.JobId, FN.Name, F.PathId, F.DeltaSeq "
+        "FROM File AS F, Filename AS FN WHERE FileId = %lld "
+        "AND FN.FilenameId = F.FilenameId AND DeltaSeq > 0", fileid);
+
+   if (!db->QueryDB(jcr, query.c_str())) {
+      Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
+      goto bail_out;
+   }
+
+   /* TODO: Use an other DB connection can avoid to copy the result of the
+    * previous query into a temporary buffer
+    */
+   num = db->sql_num_rows();
+   Dmsg2(dbglevel, "Found %d Delta parts q=%s\n",
+         num, query.c_str());
+
+   if (num > 0 && (row = db->sql_fetch_row())) {
+      JOB_DBR jr, jr2;
+      db_list_ctx lst;
+      memset(&jr, 0, sizeof(jr));
+      memset(&jr2, 0, sizeof(jr2));
+
+      fn = bstrdup(row[1]);               /* Filename */
+      int64_t jid = str_to_int64(row[0]);     /* JobId */
+      int64_t pid = str_to_int64(row[2]);     /* PathId */
+
+      /* Need to limit the query to StartTime, Client/Fileset */
+      jr2.JobId = jid;
+      if (!db->bdb_get_job_record(jcr, &jr2)) {
+         Dmsg1(0, "Unable to get job record for jobid %d\n", jid);
+         goto bail_out;
+      }
+
+      jr.JobId = jid;
+      jr.ClientId = jr2.ClientId;
+      jr.FileSetId = jr2.FileSetId;
+      jr.JobLevel = L_INCREMENTAL;
+      jr.StartTime = jr2.StartTime;
+
+      /* Get accurate jobid list */
+      if (!db->bdb_get_accurate_jobids(jcr, &jr, &lst)) {
+         Dmsg1(0, "Unable to get Accurate list for jobid %d\n", jid);
+         goto bail_out;
+      }
+
+      /* Escape filename */
+      db->fnl = strlen(fn);
+      db->esc_name = check_pool_memory_size(db->esc_name, 2*db->fnl+2);
+      db->bdb_escape_string(jcr, db->esc_name, fn, db->fnl);
+
+      edit_int64(pid, ed1);     /* pathid */
+
+      int id=db->bdb_get_type_index();
+      Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
+           lst.list, db->esc_name, ed1,
+           lst.list, db->esc_name, ed1,
+           lst.list, lst.list);
+
+      Mmsg(db->cmd,
+           //       0     1     2   3      4      5      6            7
+           "SELECT 'd', PathId, 0, JobId, LStat, FileId, DeltaSeq, JobTDate"
+           " FROM (%s) AS F1 "
+           "ORDER BY DeltaSeq ASC",
+           query.c_str());
+
+      Dmsg1(dbglevel_sql, "q=%s\n", db->cmd);
+
+      if (!db->bdb_sql_query(db->cmd, list_entries, user_data)) {
+         Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
+         goto bail_out;
+      }
+   }
+   ret = true;
+bail_out:
+   if (fn) {
+      free(fn);
+   }
+   db->bdb_unlock();
+   return ret;
 }
 
 /*
  * Get all volumes for a specific file
  */
-void Bvfs::get_volumes(DBId_t  fileid)
+void Bvfs::get_volumes(FileId_t fileid)
 {
    Dmsg1(dbglevel, "get_volumes(%lld)\n", (uint64_t)fileid);
 
@@ -854,25 +1028,25 @@ void Bvfs::get_volumes(DBId_t  fileid)
    POOL_MEM query;
 
    Mmsg(query,
-//                          7                8
-"SELECT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
+//                                   7                8
+"SELECT DISTINCT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
 "FROM File JOIN JobMedia USING (JobId) JOIN Media USING (MediaId) "
 "WHERE File.FileId = %s "
   "AND File.FileIndex >= JobMedia.FirstIndex "
   "AND File.FileIndex <= JobMedia.LastIndex "
-  " ORDER BY JobMediaId LIMIT %d OFFSET %d"
+  " LIMIT %d OFFSET %d"
         ,edit_uint64(fileid, ed1), limit, offset);
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
-   db_sql_query(db, query.c_str(), list_entries, user_data);
+   db->bdb_sql_query(query.c_str(), list_entries, user_data);
 }
 
 DBId_t Bvfs::get_root()
 {
    int p;
    *db->path = 0;
-   db_lock(db);
-   p = db_get_path_record(jcr, db);
-   db_unlock(db);
+   db->bdb_lock();
+   p = db->bdb_get_path_record(jcr);
+   db->bdb_unlock();
    return p;
 }
 
@@ -888,6 +1062,12 @@ int Bvfs::_handle_path(void *ctx, int fields, char **row)
       /* can have the same path 2 times */
       if (strcmp(row[BVFS_PathId], prev_dir)) {
          pm_strcpy(prev_dir, row[BVFS_PathId]);
+         if (strcmp(NPRTB(row[BVFS_FileIndex]), "0") == 0 &&
+             strcmp(NPRTB(row[BVFS_FileId]), "0") != 0)
+         {
+            /* The directory was probably deleted */
+            return 0;
+         }
          return list_entries(user_data, fields, row);
       }
    }
@@ -913,27 +1093,31 @@ void Bvfs::ls_special_dirs()
 
    POOL_MEM query;
    Mmsg(query,
-"(SELECT PPathId AS PathId, '..' AS Path "
-    "FROM  PathHierarchy "
-   "WHERE  PathId = %s "
+"(SELECT PathHierarchy.PPathId AS PathId, '..' AS Path "
+    "FROM  PathHierarchy JOIN PathVisibility USING (PathId) "
+   "WHERE  PathHierarchy.PathId = %s "
+   "AND PathVisibility.JobId IN (%s) "
 "UNION "
  "SELECT %s AS PathId, '.' AS Path)",
-        edit_uint64(pwd_id, ed1), ed1);
+        edit_uint64(pwd_id, ed1), jobids, ed1);
 
    POOL_MEM query2;
    Mmsg(query2,// 1      2     3        4     5       6
-"SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
+"SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId, FileIndex "
   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
-              "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
+              "File1.LStat AS LStat, File1.FileId AS FileId, "
+              "File1.FileIndex AS FileIndex, "
+              "Job1.JobTDate AS JobTDate "
+      "FROM File AS File1 JOIN Job AS Job1 USING (JobId)"
        "WHERE File1.FilenameId = %s "
        "AND File1.JobId IN (%s)) AS listfile1 "
   "ON (tmp.PathId = listfile1.PathId) "
-  "ORDER BY tmp.Path, JobId DESC ",
+  "ORDER BY tmp.Path, JobTDate DESC ",
         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
 
    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
-   db_sql_query(db, query2.c_str(), path_handler, this);
+   db->bdb_sql_query(query2.c_str(), path_handler, this);
 }
 
 /* Returns true if we have dirs to read */
@@ -949,7 +1133,7 @@ bool Bvfs::ls_dirs()
    POOL_MEM filter;
    if (*pattern) {
       Mmsg(filter, " AND Path2.Path %s '%s' ",
-           match_query[db_get_type_index(db)], pattern);
+           match_query[db->bdb_get_type_index()], pattern);
 
    }
 
@@ -967,12 +1151,14 @@ bool Bvfs::ls_dirs()
     */
    /* Then we get all the dir entries from File ... */
    Mmsg(query,
-//       0     1     2   3      4     5       6
-"SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
+//       0     1      2      3      4     5       6
+"SELECT 'D', PathId,  0,    Path, JobId, LStat, FileId, FileIndex FROM ( "
     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
            "lower(Path1.Path) AS lpath, "
            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
-           "listfile1.FileId AS FileId "
+           "listfile1.FileId AS FileId, "
+           "listfile1.JobTDate AS JobTDate, "
+           "listfile1.FileIndex AS FileIndex "
     "FROM ( "
       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
       "FROM PathHierarchy AS PathHierarchy1 "
@@ -988,11 +1174,13 @@ bool Bvfs::ls_dirs()
 
    "LEFT JOIN ( " /* get attributes if any */
        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
-              "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
+              "File1.LStat AS LStat, File1.FileId AS FileId, "
+              "File1.FileIndex, Job1.JobTDate AS JobTDate "
+     "FROM File AS File1 JOIN Job AS Job1 USING (JobId) "
        "WHERE File1.FilenameId = %s "
        "AND File1.JobId IN (%s)) AS listfile1 "
        "ON (listpath1.PathId = listfile1.PathId) "
-    ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
+    ") AS A ORDER BY Path,JobTDate DESC LIMIT %d OFFSET %d",
         edit_uint64(pwd_id, ed1),
         jobids,
         filter.c_str(),
@@ -1002,24 +1190,24 @@ bool Bvfs::ls_dirs()
 
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
 
-   db_lock(db);
-   db_sql_query(db, query.c_str(), path_handler, this);
-   nb_record = sql_num_rows(db);
-   db_unlock(db);
+   db->bdb_lock();
+   db->bdb_sql_query(query.c_str(), path_handler, this);
+   nb_record = db->sql_num_rows();
+   db->bdb_unlock();
 
    return nb_record == limit;
 }
 
-void build_ls_files_query(B_DB *db, POOL_MEM &query,
+void build_ls_files_query(BDB *db, POOL_MEM &query,
                           const char *JobId, const char *PathId,
                           const char *filter, int64_t limit, int64_t offset)
 {
-   if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
-      Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)],
+   if (db->bdb_get_type_index() == SQL_TYPE_POSTGRESQL) {
+      Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
            JobId, PathId, JobId, PathId,
            filter, limit, offset);
    } else {
-      Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)],
+      Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
            JobId, PathId, JobId, PathId,
            limit, offset, filter, JobId, JobId);
    }
@@ -1043,7 +1231,7 @@ bool Bvfs::ls_files()
 
    edit_uint64(pwd_id, pathid);
    if (*pattern) {
-      Mmsg(filter, " AND Filename.Name %s '%s' ",
+      Mmsg(filter, " AND Filename.Name %s '%s' ", 
            match_query[db_get_type_index(db)], pattern);
 
    } else if (*filename) {
@@ -1056,10 +1244,10 @@ bool Bvfs::ls_files()
 
    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
 
-   db_lock(db);
-   db_sql_query(db, query.c_str(), list_entries, user_data);
-   nb_record = sql_num_rows(db);
-   db_unlock(db);
+   db->bdb_lock();
+   db->bdb_sql_query(query.c_str(), list_entries, user_data);
+   nb_record = db->sql_num_rows();
+   db->bdb_unlock();
 
    return nb_record == limit;
 }
@@ -1121,11 +1309,11 @@ static bool check_temp(char *output_table)
 
 void Bvfs::clear_cache()
 {
-   db_sql_query(db, "BEGIN",                     NULL, NULL);
-   db_sql_query(db, "UPDATE Job SET HasCache=0", NULL, NULL);
-   db_sql_query(db, "TRUNCATE PathHierarchy",    NULL, NULL);
-   db_sql_query(db, "TRUNCATE PathVisibility",   NULL, NULL);
-   db_sql_query(db, "COMMIT",                    NULL, NULL);
+   db->bdb_sql_query("BEGIN",                     NULL, NULL);
+   db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
+   db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
+   db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
+   db->bdb_sql_query("COMMIT",                    NULL, NULL);
 }
 
 bool Bvfs::drop_restore_list(char *output_table)
@@ -1133,7 +1321,7 @@ bool Bvfs::drop_restore_list(char *output_table)
    POOL_MEM query;
    if (check_temp(output_table)) {
       Mmsg(query, "DROP TABLE %s", output_table);
-      db_sql_query(db, query.c_str(), NULL, NULL);
+      db->bdb_sql_query(query.c_str(), NULL, NULL);
       return true;
    }
    return false;
@@ -1145,6 +1333,7 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
    POOL_MEM query;
    POOL_MEM tmp, tmp2;
    int64_t id, jobid, prev_jobid;
+   int num;
    bool init=false;
    bool ret=false;
    /* check args */
@@ -1153,20 +1342,22 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
        (*hardlink && !is_a_number_list(hardlink))||
        (!*hardlink && !*fileid && !*dirid && !*hardlink))
    {
+      Dmsg0(dbglevel, "ERROR: One or more of FileId, DirId or HardLink is not given or not a number.\n");
       return false;
    }
    if (!check_temp(output_table)) {
+      Dmsg0(dbglevel, "ERROR: Wrong format for table name (in path field).\n");
       return false;
    }
 
-   db_lock(db);
+   db->bdb_lock();
 
    /* Cleanup old tables first */
    Mmsg(query, "DROP TABLE btemp%s", output_table);
-   db_sql_query(db, query.c_str());
+   db->bdb_sql_query(query.c_str());
 
    Mmsg(query, "DROP TABLE %s", output_table);
-   db_sql_query(db, query.c_str());
+   db->bdb_sql_query(query.c_str());
 
    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
 
@@ -1174,7 +1365,8 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
       init=true;
       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
                       "PathId, FileId "
-                 "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
+                 "FROM File,Job WHERE Job.JobId=File.Jobid "
+                 "AND FileId IN (%s)",
            fileid);
       pm_strcat(query, tmp.c_str());
    }
@@ -1183,13 +1375,14 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
    while (get_next_id_from_list(&dirid, &id) == 1) {
       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
 
-      if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
-         Dmsg0(dbglevel, "Can't search for path\n");
+      if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
+         Dmsg3(dbglevel, "ERROR: Path not found %lld q=%s s=%s\n",
+               id, tmp.c_str(), tmp2.c_str());
          /* print error */
          goto bail_out;
       }
       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
-         Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
+         Dmsg3(dbglevel, "ERROR: Path not found %lld q=%s s=%s\n",
                id, tmp.c_str(), tmp2.c_str());
          break;
       }
@@ -1209,7 +1402,7 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
 
       size_t len = strlen(tmp.c_str());
       tmp2.check_size((len+1) * 2);
-      db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
+      db->bdb_escape_string(jcr, tmp2.c_str(), tmp.c_str(), len);
 
       if (init) {
          query.strcat(" UNION ");
@@ -1241,7 +1434,7 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
    prev_jobid=0;
    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
       if (get_next_id_from_list(&hardlink, &id) != 1) {
-         Dmsg0(dbglevel, "hardlink should be two by two\n");
+         Dmsg0(dbglevel, "ERROR: hardlink should be two by two\n");
          goto bail_out;
       }
       if (jobid != prev_jobid) { /* new job */
@@ -1271,41 +1464,139 @@ bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
       init = true;
    }
 
-   Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
+   Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
 
-   if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
-      Dmsg0(dbglevel, "Can't execute q\n");
+   if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+      Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
       goto bail_out;
    }
 
-   Mmsg(query, sql_bvfs_select[db_get_type_index(db)],
+   Mmsg(query, sql_bvfs_select[db->bdb_get_type_index()],
         output_table, output_table, output_table);
 
    /* TODO: handle jobid filter */
-   Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
-   if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
-      Dmsg0(dbglevel, "Can't execute q\n");
+   Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
+   if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+      Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
       goto bail_out;
    }
 
-   /* MySQL need it */
-   if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
+   /* MySQL needs the index */
+   if (db->bdb_get_type_index() == SQL_TYPE_MYSQL) {
       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)",
            output_table, output_table);
-      Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
-      if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
-         Dmsg0(dbglevel, "Can't execute q\n");
-         goto bail_out;
+      Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
+      if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
+         Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
+         goto bail_out; 
+      } 
+   }
+
+   /* Check if some FileId have DeltaSeq > 0
+    * Foreach of them we need to get the accurate_job list, and compute
+    * what are dependencies
+    */
+   Mmsg(query, 
+        "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
+          "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
+         "WHERE DeltaSeq > 0", output_table);
+
+   if (!db->QueryDB(jcr, query.c_str())) {
+      Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
+   }
+
+   /* TODO: Use an other DB connection can avoid to copy the result of the
+    * previous query into a temporary buffer
+    */
+   num = db->sql_num_rows();
+   Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
+
+   if (num > 0) {
+      int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
+      SQL_ROW row;
+      int i=0;
+
+      while((row = db->sql_fetch_row())) {
+         result[i++] = str_to_int64(row[0]); /* FileId */
+         result[i++] = str_to_int64(row[1]); /* JobId */
+         result[i++] = str_to_int64(row[2]); /* FilenameId */
+         result[i++] = str_to_int64(row[3]); /* PathId */
+      } 
+
+      i=0;
+      while (num > 0) {
+         insert_missing_delta(output_table, result + i);
+         i += 4;
+         num--;
       }
+      free(result);
    }
 
    ret = true;
 
 bail_out:
    Mmsg(query, "DROP TABLE btemp%s", output_table);
-   db_sql_query(db, query.c_str(), NULL, NULL);
-   db_unlock(db);
+   db->bdb_sql_query(query.c_str(), NULL, NULL);
+   db->bdb_unlock();
    return ret;
 }
 
+void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
+{
+   char ed1[50];
+   db_list_ctx lst;
+   POOL_MEM query;
+   JOB_DBR jr, jr2;
+   memset(&jr, 0, sizeof(jr));
+   memset(&jr2, 0, sizeof(jr2));
+
+   /* Need to limit the query to StartTime, Client/Fileset */
+   jr2.JobId = res[1];
+   db->bdb_get_job_record(jcr, &jr2);
+
+   jr.JobId = res[1];
+   jr.ClientId = jr2.ClientId;
+   jr.FileSetId = jr2.FileSetId;
+   jr.JobLevel = L_INCREMENTAL;
+   jr.StartTime = jr2.StartTime;
+
+   /* Get accurate jobid list */
+   db->bdb_get_accurate_jobids(jcr, &jr, &lst);
+
+   Dmsg2(dbglevel_sql, "JobId list for %lld is %s\n", res[0], lst.list);
+
+   /* The list contains already the last DeltaSeq element, so
+    * we don't need to select it in the next query
+    */
+   for (int l = strlen(lst.list); l > 0; l--) {
+      if (lst.list[l] == ',') {
+         lst.list[l] = '\0';
+         break;
+      }
+   }
+
+   Dmsg1(dbglevel_sql, "JobId list after strip is %s\n", lst.list);
+
+   /* Escape filename */
+   db->fnl = strlen((char *)res[2]);
+   db->esc_name = check_pool_memory_size(db->esc_name, 2*db->fnl+2);
+   db->bdb_escape_string(jcr, db->esc_name, (char *)res[2], db->fnl);
+
+   edit_int64(res[3], ed1);     /* pathid */
+
+   int id=db->bdb_get_type_index();
+   Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
+        lst.list, db->esc_name, ed1,
+        lst.list, db->esc_name, ed1,
+        lst.list, lst.list);
+
+   Mmsg(db->cmd, "INSERT INTO %s "
+                   "SELECT JobId, FileIndex, FileId FROM (%s) AS F1",
+        output_table, query.c_str());
+
+   if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
+      Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
+   }
+}
+
 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */