]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Change copyright as per agreement with FSFE
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19
20 #include "bacula.h"
21 #include "cats.h" 
22 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
23 #include "lib/htable.h"
24 #include "bvfs.h"
25
26 #define dbglevel      DT_BVFS|10
27 #define dbglevel_sql  DT_SQL|15
28
29 static int result_handler(void *ctx, int fields, char **row)
30 {
31    if (fields == 4) {
32       Pmsg4(0, "%s\t%s\t%s\t%s\n",
33             row[0], row[1], row[2], row[3]);
34    } else if (fields == 5) {
35       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n",
36             row[0], row[1], row[2], row[3], row[4]);
37    } else if (fields == 6) {
38       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n",
39             row[0], row[1], row[2], row[3], row[4], row[5]);
40    } else if (fields == 7) {
41       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
42             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
43    }
44    return 0;
45 }
46
47 Bvfs::Bvfs(JCR *j, BDB *mdb) {
48    jcr = j;
49    jcr->inc_use_count();
50    db = mdb;                 /* need to inc ref count */
51    jobids = get_pool_memory(PM_NAME);
52    prev_dir = get_pool_memory(PM_NAME);
53    pattern = get_pool_memory(PM_NAME);
54    filename = get_pool_memory(PM_NAME);
55    tmp = get_pool_memory(PM_NAME);
56    escaped_list = get_pool_memory(PM_NAME);
57    *filename = *jobids = *prev_dir = *pattern = 0;
58    dir_filenameid = pwd_id = offset = 0;
59    see_copies = see_all_versions = false;
60    limit = 1000;
61    attr = new_attr(jcr);
62    list_entries = result_handler;
63    user_data = this;
64    username = NULL;
65    job_acl = client_acl = pool_acl = fileset_acl = NULL;
66 }
67
68 Bvfs::~Bvfs() {
69    free_pool_memory(jobids);
70    free_pool_memory(pattern);
71    free_pool_memory(prev_dir);
72    free_pool_memory(filename);
73    free_pool_memory(tmp);
74    free_pool_memory(escaped_list);
75    if (username) {
76       free(username);
77    }
78    free_attr(attr);
79    jcr->dec_use_count();
80 }
81
82 char *Bvfs::escape_list(alist *lst)
83 {
84    char *elt;
85    int len;
86
87    /* List is empty, reject everything */
88    if (!lst || lst->size() == 0) {
89       Mmsg(escaped_list, "''");
90       return escaped_list;
91    }
92
93    *tmp = 0;
94    *escaped_list = 0;
95
96    foreach_alist(elt, lst) {
97       if (elt && *elt) {
98          len = strlen(elt);
99          /* Escape + ' ' */
100          tmp = check_pool_memory_size(tmp, 2 * len + 2 + 2);
101
102          tmp[0] = '\'';
103          db->bdb_escape_string(jcr, tmp + 1 , elt, len);
104          pm_strcat(tmp, "'");
105
106          if (*escaped_list) {
107             pm_strcat(escaped_list, ",");
108          }
109
110          pm_strcat(escaped_list, tmp);
111       }
112    }
113    return escaped_list;
114 }
115
116 void Bvfs::filter_jobid()
117 {
118    POOL_MEM query;
119    POOL_MEM sub_where;
120    POOL_MEM sub_join;
121
122    /* No ACL, no username, no check */
123    if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
124       Dmsg0(dbglevel_sql, "No ACL\n");
125       return;
126    }
127
128    if (job_acl) {
129       Mmsg(sub_where, " AND Job.Name IN (%s) ", escape_list(job_acl));
130    }
131
132    if (fileset_acl) {
133       Mmsg(query, " AND FileSet.FileSet IN (%s) ", escape_list(fileset_acl));
134       pm_strcat(sub_where, query.c_str());
135       pm_strcat(sub_join, " JOIN FileSet USING (FileSetId) ");
136    }
137
138    if (client_acl) {
139       Mmsg(query, " AND Client.Name IN (%s) ", escape_list(client_acl));
140       pm_strcat(sub_where, query.c_str());
141    }
142
143    if (pool_acl) {
144       Mmsg(query, " AND Pool.Name IN (%s) ", escape_list(pool_acl));
145       pm_strcat(sub_where, query.c_str());
146       pm_strcat(sub_join, " JOIN Pool USING (PoolId) ");
147    }
148
149    if (username) {
150       /* Query used by Bweb to filter clients, activated when using
151        * set_username()
152        */
153       Mmsg(query,
154       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
155         "JOIN (SELECT ClientId FROM client_group_member "
156         "JOIN client_group USING (client_group_id) "
157         "JOIN bweb_client_group_acl USING (client_group_id) "
158         "JOIN bweb_user USING (userid) "
159        "WHERE bweb_user.username = '%s' "
160       ") AS filter USING (ClientId) "
161         " WHERE JobId IN (%s) %s",
162            sub_join.c_str(), username, jobids, sub_where.c_str());
163
164    } else {
165       Mmsg(query,
166       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
167       " WHERE JobId IN (%s) %s",
168            sub_join.c_str(), jobids, sub_where.c_str());
169    }
170
171    db_list_ctx ctx;
172    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
173    db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
174    pm_strcpy(jobids, ctx.list);
175 }
176
177 void Bvfs::set_jobid(JobId_t id)
178 {
179    Mmsg(jobids, "%lld", (uint64_t)id);
180    filter_jobid();
181 }
182
183 void Bvfs::set_jobids(char *ids)
184 {
185    pm_strcpy(jobids, ids);
186    filter_jobid();
187 }
188
189 /*
190  * TODO: Find a way to let the user choose how he wants to display
191  * files and directories
192  */
193
194
195 /*
196  * Working Object to store PathId already seen (avoid
197  * database queries), equivalent to %cache_ppathid in perl
198  */
199
200 #define NITEMS 50000
201 class pathid_cache {
202 private:
203    hlink *nodes;
204    int nb_node;
205    int max_node;
206
207    alist *table_node;
208
209    htable *cache_ppathid;
210
211 public:
212    pathid_cache() {
213       hlink link;
214       cache_ppathid = (htable *)malloc(sizeof(htable));
215       cache_ppathid->init(&link, &link, NITEMS);
216       max_node = NITEMS;
217       nodes = (hlink *) malloc(max_node * sizeof (hlink));
218       nb_node = 0;
219       table_node = New(alist(5, owned_by_alist));
220       table_node->append(nodes);
221    }
222
223    hlink *get_hlink() {
224       if (++nb_node >= max_node) {
225          nb_node = 0;
226          nodes = (hlink *)malloc(max_node * sizeof(hlink));
227          table_node->append(nodes);
228       }
229       return nodes + nb_node;
230    }
231
232    bool lookup(char *pathid) {
233       bool ret = cache_ppathid->lookup(pathid) != NULL;
234       return ret;
235    }
236
237    void insert(char *pathid) {
238       hlink *h = get_hlink();
239       cache_ppathid->insert(pathid, h);
240    }
241
242    ~pathid_cache() {
243       cache_ppathid->destroy();
244       free(cache_ppathid);
245       delete table_node;
246    }
247 private:
248    pathid_cache(const pathid_cache &); /* prohibit pass by value */
249    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
250 } ;
251
252 /* Return the parent_dir with the trailing /  (update the given string)
253  * TODO: see in the rest of bacula if we don't have already this function
254  * dir=/tmp/toto/
255  * dir=/tmp/
256  * dir=/
257  * dir=
258  */
259 char *bvfs_parent_dir(char *path)
260 {
261    char *p = path;
262    int len = strlen(path) - 1;
263
264    /* windows directory / */
265    if (len == 2 && B_ISALPHA(path[0])
266                 && path[1] == ':'
267                 && path[2] == '/')
268    {
269       len = 0;
270       path[0] = '\0';
271    }
272
273    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
274       path[len] = '\0';
275    }
276
277    if (len > 0) {
278       p += len;
279       while (p > path && !IsPathSeparator(*p)) {
280          p--;
281       }
282       p[1] = '\0';
283    }
284    return path;
285 }
286
287 /* Return the basename of the with the trailing /
288  * TODO: see in the rest of bacula if we don't have
289  * this function already
290  */
291 char *bvfs_basename_dir(char *path)
292 {
293    char *p = path;
294    int len = strlen(path) - 1;
295
296    if (path[len] == '/') {      /* if directory, skip last / */
297       len -= 1;
298    }
299
300    if (len > 0) {
301       p += len;
302       while (p > path && !IsPathSeparator(*p)) {
303          p--;
304       }
305       if (*p == '/') {
306          p++;                  /* skip first / */
307       }
308    }
309    return p;
310 }
311
312 static void build_path_hierarchy(JCR *jcr, BDB *mdb,
313                                  pathid_cache &ppathid_cache,
314                                  char *org_pathid, char *path)
315 {
316    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
317    char pathid[50];
318    ATTR_DBR parent;
319    char *bkp = mdb->path;
320    strncpy(pathid, org_pathid, sizeof(pathid));
321
322    /* Does the ppathid exist for this ? we use a memory cache...  In order to
323     * avoid the full loop, we consider that if a dir is allready in the
324     * PathHierarchy table, then there is no need to calculate all the
325     * hierarchy
326     */
327    while (path && *path)
328    {
329       if (!ppathid_cache.lookup(pathid))
330       {
331          Mmsg(mdb->cmd,
332               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
333               pathid);
334
335          if (!mdb->QueryDB(jcr, mdb->cmd)) {
336             goto bail_out;      /* Query failed, just leave */
337          }
338
339          /* Do we have a result ? */
340          if (mdb->sql_num_rows() > 0) {
341             ppathid_cache.insert(pathid);
342             /* This dir was in the db ...
343              * It means we can leave, the tree has allready been built for
344              * this dir
345              */
346             goto bail_out;
347          } else {
348             /* search or create parent PathId in Path table */
349             mdb->path = bvfs_parent_dir(path);
350             mdb->pnl = strlen(mdb->path);
351             if (!mdb->bdb_create_path_record(jcr, &parent)) {
352                goto bail_out;
353             }
354             ppathid_cache.insert(pathid);
355
356             Mmsg(mdb->cmd,
357                  "INSERT INTO PathHierarchy (PathId, PPathId) "
358                  "VALUES (%s,%lld)",
359                  pathid, (uint64_t) parent.PathId);
360
361             if (!mdb->InsertDB(jcr, mdb->cmd)) {
362                goto bail_out;   /* Can't insert the record, just leave */
363             }
364
365             edit_uint64(parent.PathId, pathid);
366             path = mdb->path;   /* already done */
367          }
368       } else {
369          /* It's already in the cache.  We can leave, no time to waste here,
370           * all the parent dirs have allready been done
371           */
372          goto bail_out;
373       }
374    }
375
376 bail_out:
377    mdb->path = bkp;
378    mdb->fnl = 0;
379 }
380
381 /*
382  * Internal function to update path_hierarchy cache with a shared pathid cache
383  * return Error 0
384  *        OK    1
385  */
386 static int update_path_hierarchy_cache(JCR *jcr,
387                                         BDB *mdb,
388                                         pathid_cache &ppathid_cache,
389                                         JobId_t JobId)
390 {
391    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
392    uint32_t ret=0;
393    uint32_t num;
394    char jobid[50];
395    edit_uint64(JobId, jobid);
396
397    mdb->bdb_lock();
398
399    /* We don't really want to harm users with spurious messages,
400     * everything is handled by transaction
401     */
402    mdb->set_use_fatal_jmsg(false);
403
404    mdb->bdb_start_transaction(jcr);
405
406    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
407
408    if (!mdb->QueryDB(jcr, mdb->cmd) || mdb->sql_num_rows() > 0) {
409       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
410       ret = 1;
411       goto bail_out;
412    }
413
414    /* Inserting path records for JobId */
415    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
416                    "SELECT DISTINCT PathId, JobId "
417                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
418                            "UNION "
419                            "SELECT PathId, BaseFiles.JobId "
420                              "FROM BaseFiles JOIN File AS F USING (FileId) "
421                             "WHERE BaseFiles.JobId = %s) AS B",
422         jobid, jobid);
423
424    if (!mdb->QueryDB(jcr, mdb->cmd)) {
425       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
426       goto bail_out;
427    }
428
429    /* Now we have to do the directory recursion stuff to determine missing
430     * visibility We try to avoid recursion, to be as fast as possible We also
431     * only work on not allready hierarchised directories...
432     */
433    Mmsg(mdb->cmd,
434      "SELECT PathVisibility.PathId, Path "
435        "FROM PathVisibility "
436             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
437             "LEFT JOIN PathHierarchy "
438          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
439       "WHERE PathVisibility.JobId = %s "
440         "AND PathHierarchy.PathId IS NULL "
441       "ORDER BY Path", jobid);
442    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
443
444    if (!mdb->QueryDB(jcr, mdb->cmd)) {
445       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
446       goto bail_out;
447    }
448
449    /* TODO: I need to reuse the DB connection without emptying the result
450     * So, now i'm copying the result in memory to be able to query the
451     * catalog descriptor again.
452     */
453    num = mdb->sql_num_rows();
454    if (num > 0) {
455       char **result = (char **)malloc (num * 2 * sizeof(char *));
456
457       SQL_ROW row;
458       int i=0;
459       while((row = mdb->sql_fetch_row())) {
460          result[i++] = bstrdup(row[0]);
461          result[i++] = bstrdup(row[1]);
462       }
463
464       i=0;
465       while (num > 0) {
466          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
467          free(result[i++]);
468          free(result[i++]);
469          num--;
470       }
471       free(result);
472    }
473
474    
475    if (mdb->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
476       Mmsg(mdb->cmd,
477  "INSERT INTO PathVisibility (PathId, JobId) "
478    "SELECT DISTINCT h.PPathId AS PathId, %s "
479      "FROM PathHierarchy AS h "
480     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
481       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
482            jobid, jobid, jobid );
483
484    } else if (mdb->bdb_get_type_index() == SQL_TYPE_MYSQL) {
485       Mmsg(mdb->cmd,
486   "INSERT INTO PathVisibility (PathId, JobId)  "
487    "SELECT a.PathId,%s "
488    "FROM ( "
489      "SELECT DISTINCT h.PPathId AS PathId "
490        "FROM PathHierarchy AS h "
491        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
492       "WHERE p.JobId=%s) AS a "
493       "LEFT JOIN PathVisibility AS b ON (b.JobId=%s and a.PathId = b.PathId) "
494       "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
495
496    } else {
497       Mmsg(mdb->cmd,
498   "INSERT INTO PathVisibility (PathId, JobId)  "
499    "SELECT a.PathId,%s "
500    "FROM ( "
501      "SELECT DISTINCT h.PPathId AS PathId "
502        "FROM PathHierarchy AS h "
503        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
504       "WHERE p.JobId=%s) AS a LEFT JOIN "
505        "(SELECT PathId "
506           "FROM PathVisibility "
507          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
508    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
509    }
510
511    do {
512       ret = mdb->QueryDB(jcr, mdb->cmd);
513    } while (ret && mdb->sql_affected_rows() > 0);
514
515    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
516    ret = mdb->UpdateDB(jcr, mdb->cmd);
517
518 bail_out:
519    mdb->bdb_end_transaction(jcr);
520
521    if (!ret) {
522       Mmsg(mdb->cmd, "SELECT HasCache FROM Job WHERE JobId=%s", jobid);
523       mdb->bdb_sql_query(mdb->cmd, db_int_handler, &ret);
524    }
525
526    /* Enable back the FATAL message if something is wrong */
527    mdb->set_use_fatal_jmsg(true);
528
529    mdb->bdb_unlock();
530    return ret;
531 }
532
533 /* 
534  * Find an store the filename descriptor for empty directories Filename.Name=''
535  */
536 DBId_t Bvfs::get_dir_filenameid()
537 {
538    uint32_t id;
539    if (dir_filenameid) {
540       return dir_filenameid;
541    }
542    Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
543    db_sql_query(db, db->cmd, db_int_handler, &id);
544    dir_filenameid = id;
545    return dir_filenameid;
546 }
547
548 /* Compute the cache for the bfileview compoment */
549 void Bvfs::fv_update_cache()
550 {
551    int64_t pathid;
552    int64_t size=0, count=0;
553
554    Dmsg0(dbglevel, "fv_update_cache()\n");
555
556    if (!*jobids) {
557       return;                   /* Nothing to build */
558    }
559
560    db->bdb_lock();
561    /* We don't really want to harm users with spurious messages,
562     * everything is handled by transaction
563     */
564    db->set_use_fatal_jmsg(false);
565
566    db->bdb_start_transaction(jcr);
567
568    pathid = get_root();
569
570    fv_compute_size_and_count(pathid, &size, &count);
571
572    db->bdb_end_transaction(jcr);
573
574    /* Enable back the FATAL message if something is wrong */
575    db->set_use_fatal_jmsg(true);
576
577    db->bdb_unlock();
578 }
579
580 /* Not yet working */
581 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
582 {
583    Mmsg(db->cmd, 
584         "SELECT FilenameId AS filenameid, Name AS name, size "
585           "FROM ( "
586          "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
587            "FROM File "
588           "WHERE PathId  = %lld "
589             "AND JobId = %s "
590         ") AS S INNER JOIN Filename USING (FilenameId) "
591      "WHERE S.size > %lld "
592      "ORDER BY S.size DESC "
593      "LIMIT %d ", pathid, jobids, min_size, limit);
594 }
595
596 /* Get the current path size and files count */
597 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
598 {
599    SQL_ROW row;
600
601    *size = *count = 0;
602
603    Mmsg(db->cmd,
604  "SELECT Size AS size, Files AS files "
605   " FROM PathVisibility "
606  " WHERE PathId = %lld "
607    " AND JobId = %s ", pathid, jobids);
608
609    if (!db->QueryDB(jcr, db->cmd)) {
610       return;
611    }
612
613    if ((row = db->sql_fetch_row())) {
614       *size = str_to_int64(row[0]);
615       *count =  str_to_int64(row[1]);
616    }
617 }
618
619 /* Compute for the current path the size and files count */
620 void Bvfs::fv_get_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
621 {
622    SQL_ROW row;
623
624    *size = *count = 0;
625
626    Mmsg(db->cmd,
627  "SELECT sum(base64_decode_lstat(8,LStat)) AS size, count(1) AS files "
628   " FROM File "
629  " WHERE PathId = %lld "
630    " AND JobId = %s ", pathid, jobids);
631
632    if (!db->QueryDB(jcr, db->cmd)) {
633       return;
634    }
635
636    if ((row = db->sql_fetch_row())) {
637       *size = str_to_int64(row[0]);
638       *count =  str_to_int64(row[1]);
639    }
640 }
641
642 void Bvfs::fv_compute_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
643 {
644    Dmsg1(dbglevel, "fv_compute_size_and_count(%lld)\n", pathid);
645
646    fv_get_current_size_and_count(pathid, size, count);
647    if (*size > 0) {
648       return;
649    }
650
651    /* Update stats for the current directory */
652    fv_get_size_and_count(pathid, size, count);
653
654    /* Update stats for all sub directories */
655    Mmsg(db->cmd,
656         " SELECT PathId "
657           " FROM PathVisibility "
658                " INNER JOIN PathHierarchy USING (PathId) "
659          " WHERE PPathId  = %lld "
660            " AND JobId = %s ", pathid, jobids);
661
662    db->QueryDB(jcr, db->cmd);
663    int num = db->sql_num_rows();
664
665    if (num > 0) {
666       int64_t *result = (int64_t *)malloc (num * sizeof(int64_t));
667       SQL_ROW row;
668       int i=0;
669
670       while((row = db->sql_fetch_row())) {
671          result[i++] = str_to_int64(row[0]); /* PathId */
672       }
673
674       i=0;
675       while (num > 0) {
676          int64_t c=0, s=0;
677          fv_compute_size_and_count(result[i], &s, &c);
678          *size += s;
679          *count += c;
680
681          i++;
682          num--;
683       }
684       free(result);
685    }
686
687    fv_update_size_and_count(pathid, *size, *count);
688 }
689
690 void Bvfs::fv_update_size_and_count(int64_t pathid, int64_t size, int64_t count)
691 {
692    Mmsg(db->cmd,
693         "UPDATE PathVisibility SET Files = %lld, Size = %lld "
694         " WHERE JobId = %s "
695         " AND PathId = %lld ", count, size, jobids, pathid);
696
697    db->UpdateDB(jcr, db->cmd);
698 }
699
700 void bvfs_update_cache(JCR *jcr, BDB *mdb)
701 {
702    uint32_t nb=0;
703    db_list_ctx jobids_list;
704
705    mdb->bdb_lock();
706
707 #ifdef xxx
708    /* TODO: Remove this code when updating make_bacula_table script */
709    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
710    if (!mdb->QueryDB(jcr, mdb->cmd)) {
711       Dmsg0(dbglevel, "Creating cache table\n");
712       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
713       mdb->QueryDB(jcr, mdb->cmd);
714
715       Mmsg(mdb->cmd,
716            "CREATE TABLE PathHierarchy ( "
717            "PathId integer NOT NULL, "
718            "PPathId integer NOT NULL, "
719            "CONSTRAINT pathhierarchy_pkey "
720            "PRIMARY KEY (PathId))");
721       mdb->QueryDB(jcr, mdb->cmd);
722
723       Mmsg(mdb->cmd,
724            "CREATE INDEX pathhierarchy_ppathid "
725            "ON PathHierarchy (PPathId)");
726       mdb->QueryDB(jcr, mdb->cmd);
727
728       Mmsg(mdb->cmd,
729            "CREATE TABLE PathVisibility ("
730            "PathId integer NOT NULL, "
731            "JobId integer NOT NULL, "
732            "Size int8 DEFAULT 0, "
733            "Files int4 DEFAULT 0, "
734            "CONSTRAINT pathvisibility_pkey "
735            "PRIMARY KEY (JobId, PathId))");
736       mdb->QueryDB(jcr, mdb->cmd);
737
738       Mmsg(mdb->cmd,
739            "CREATE INDEX pathvisibility_jobid "
740            "ON PathVisibility (JobId)");
741       mdb->QueryDB(jcr, mdb->cmd);
742
743    }
744 #endif
745
746    Mmsg(mdb->cmd,
747  "SELECT JobId from Job "
748   "WHERE HasCache = 0 "
749     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
750   "ORDER BY JobId");
751
752    mdb->bdb_sql_query(mdb->cmd, db_list_handler, &jobids_list);
753
754    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
755
756    mdb->bdb_start_transaction(jcr);
757    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
758    Mmsg(mdb->cmd,
759         "DELETE FROM PathVisibility "
760          "WHERE NOT EXISTS "
761         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
762    nb = mdb->DeleteDB(jcr, mdb->cmd);
763    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
764
765    mdb->bdb_end_transaction(jcr);
766    mdb->bdb_unlock();
767 }
768
769 /*
770  * Update the bvfs cache for given jobids (1,2,3,4)
771  */
772 int
773 bvfs_update_path_hierarchy_cache(JCR *jcr, BDB *mdb, char *jobids)
774 {
775    pathid_cache ppathid_cache;
776    JobId_t JobId;
777    char *p;
778    int ret=1;
779
780    for (p=jobids; ; ) {
781       int stat = get_next_jobid_from_list(&p, &JobId);
782       if (stat < 0) {
783          ret = 0;
784          break;
785       }
786       if (stat == 0) {
787          break;
788       }
789       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
790       if (!update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId)) {
791          ret = 0;
792       }
793    }
794    return ret;
795 }
796
797 /*
798  * Update the bvfs fileview for given jobids
799  */
800 void
801 bvfs_update_fv_cache(JCR *jcr, BDB *mdb, char *jobids)
802 {
803    char *p;
804    JobId_t JobId;
805    Bvfs bvfs(jcr, mdb);
806
807    for (p=jobids; ; ) {
808       int stat = get_next_jobid_from_list(&p, &JobId);
809       if (stat < 0) {
810          return;
811       }
812       if (stat == 0) {
813          break;
814       }
815
816       Dmsg1(dbglevel, "Trying to create cache for %lld\n", (int64_t)JobId);
817
818       bvfs.set_jobid(JobId);
819       bvfs.fv_update_cache();
820    }
821 }
822
823 /*
824  * Update the bvfs cache for current jobids
825  */
826 void Bvfs::update_cache()
827 {
828    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
829 }
830
831 /* Change the current directory, returns true if the path exists */
832 bool Bvfs::ch_dir(const char *path)
833 {
834    pm_strcpy(db->path, path);
835    db->pnl = strlen(db->path);
836    db->bdb_lock();
837    ch_dir(db->bdb_get_path_record(jcr));
838    db->bdb_unlock();
839    return pwd_id != 0;
840 }
841
842 /*
843  * Get all file versions for a specified client
844  * TODO: Handle basejobs using different client
845  */
846 void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, const char *client)
847 {
848    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
849          (uint64_t)fnid, client);
850    char ed1[50], ed2[50];
851    POOL_MEM q;
852    if (see_copies) {
853       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
854    } else {
855       Mmsg(q, " AND Job.Type = 'B' ");
856    }
857
858    POOL_MEM query;
859
860    Mmsg(query,//    1           2              3       
861 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
862 //         4          5           6
863         "File.JobId, File.LStat, File.FileId, "
864 //         7                    8
865        "Media.VolumeName, Media.InChanger "
866 "FROM File, Job, Client, JobMedia, Media "
867 "WHERE File.FilenameId = %s "
868   "AND File.PathId=%s "
869   "AND File.JobId = Job.JobId "
870   "AND Job.JobId = JobMedia.JobId "
871   "AND File.FileIndex >= JobMedia.FirstIndex "
872   "AND File.FileIndex <= JobMedia.LastIndex "
873   "AND JobMedia.MediaId = Media.MediaId "
874   "AND Job.ClientId = Client.ClientId "
875   "AND Client.Name = '%s' "
876   "%s ORDER BY FileId LIMIT %d OFFSET %d"
877         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
878         limit, offset);
879    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
880    db->bdb_sql_query(query.c_str(), list_entries, user_data);
881 }
882
883 /*
884  * Get all volumes for a specific file
885  */
886 void Bvfs::get_volumes(FileId_t fileid)
887 {
888    Dmsg1(dbglevel, "get_volumes(%lld)\n", (uint64_t)fileid);
889
890    char ed1[50];
891    POOL_MEM query;
892
893    Mmsg(query,
894 //                                   7                8
895 "SELECT DISTINCT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
896 "FROM File JOIN JobMedia USING (JobId) JOIN Media USING (MediaId) "
897 "WHERE File.FileId = %s "
898   "AND File.FileIndex >= JobMedia.FirstIndex "
899   "AND File.FileIndex <= JobMedia.LastIndex "
900   " LIMIT %d OFFSET %d"
901         ,edit_uint64(fileid, ed1), limit, offset);
902    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
903    db->bdb_sql_query(query.c_str(), list_entries, user_data);
904 }
905
906 DBId_t Bvfs::get_root()
907 {
908    int p;
909    *db->path = 0;
910    db->bdb_lock();
911    p = db->bdb_get_path_record(jcr);
912    db->bdb_unlock();
913    return p;
914 }
915
916 static int path_handler(void *ctx, int fields, char **row)
917 {
918    Bvfs *fs = (Bvfs *) ctx;
919    return fs->_handle_path(ctx, fields, row);
920 }
921
922 int Bvfs::_handle_path(void *ctx, int fields, char **row)
923 {
924    if (bvfs_is_dir(row)) {
925       /* can have the same path 2 times */
926       if (strcmp(row[BVFS_PathId], prev_dir)) {
927          pm_strcpy(prev_dir, row[BVFS_PathId]);
928          return list_entries(user_data, fields, row);
929       }
930    }
931    return 0;
932 }
933
934 /*
935  * Retrieve . and .. information
936  */
937 void Bvfs::ls_special_dirs()
938 {
939    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
940    char ed1[50], ed2[50];
941    if (*jobids == 0) {
942       return;
943    }
944    if (!dir_filenameid) {
945       get_dir_filenameid();
946    }
947
948    /* Will fetch directories  */
949    *prev_dir = 0;
950
951    POOL_MEM query;
952    Mmsg(query,
953 "(SELECT PPathId AS PathId, '..' AS Path "
954     "FROM  PathHierarchy "
955    "WHERE  PathId = %s "
956 "UNION "
957  "SELECT %s AS PathId, '.' AS Path)",
958         edit_uint64(pwd_id, ed1), ed1);
959
960    POOL_MEM query2;
961    Mmsg(query2,// 1      2     3        4     5       6
962 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
963   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
964        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
965               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
966        "WHERE File1.FilenameId = %s "
967        "AND File1.JobId IN (%s)) AS listfile1 "
968   "ON (tmp.PathId = listfile1.PathId) "
969   "ORDER BY tmp.Path, JobId DESC ",
970         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
971
972    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
973    db->bdb_sql_query(query2.c_str(), path_handler, this);
974 }
975
976 /* Returns true if we have dirs to read */
977 bool Bvfs::ls_dirs()
978 {
979    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
980    char ed1[50], ed2[50];
981    if (*jobids == 0) {
982       return false;
983    }
984
985    POOL_MEM query;
986    POOL_MEM filter;
987    if (*pattern) {
988       Mmsg(filter, " AND Path2.Path %s '%s' ",
989            match_query[db->bdb_get_type_index()], pattern);
990
991    }
992
993    if (!dir_filenameid) {
994       get_dir_filenameid();
995    }
996
997    /* the sql query displays same directory multiple time, take the first one */
998    *prev_dir = 0;
999
1000    /* Let's retrieve the list of the visible dirs in this dir ...
1001     * First, I need the empty filenameid to locate efficiently
1002     * the dirs in the file table
1003     * my $dir_filenameid = $self->get_dir_filenameid();
1004     */
1005    /* Then we get all the dir entries from File ... */
1006    Mmsg(query,
1007 //       0     1     2   3      4     5       6
1008 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
1009     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
1010            "lower(Path1.Path) AS lpath, "
1011            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
1012            "listfile1.FileId AS FileId "
1013     "FROM ( "
1014       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
1015       "FROM PathHierarchy AS PathHierarchy1 "
1016       "JOIN Path AS Path2 "
1017         "ON (PathHierarchy1.PathId = Path2.PathId) "
1018       "JOIN PathVisibility AS PathVisibility1 "
1019         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
1020       "WHERE PathHierarchy1.PPathId = %s "
1021       "AND PathVisibility1.JobId IN (%s) "
1022            "%s "
1023      ") AS listpath1 "
1024    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
1025
1026    "LEFT JOIN ( " /* get attributes if any */
1027        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1028               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
1029        "WHERE File1.FilenameId = %s "
1030        "AND File1.JobId IN (%s)) AS listfile1 "
1031        "ON (listpath1.PathId = listfile1.PathId) "
1032     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
1033         edit_uint64(pwd_id, ed1),
1034         jobids,
1035         filter.c_str(),
1036         edit_uint64(dir_filenameid, ed2),
1037         jobids,
1038         limit, offset);
1039
1040    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1041
1042    db->bdb_lock();
1043    db->bdb_sql_query(query.c_str(), path_handler, this);
1044    nb_record = db->sql_num_rows();
1045    db->bdb_unlock();
1046
1047    return nb_record == limit;
1048 }
1049
1050 void build_ls_files_query(BDB *db, POOL_MEM &query,
1051                           const char *JobId, const char *PathId,
1052                           const char *filter, int64_t limit, int64_t offset)
1053 {
1054    if (db->bdb_get_type_index() == SQL_TYPE_POSTGRESQL) {
1055       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1056            JobId, PathId, JobId, PathId,
1057            filter, limit, offset);
1058    } else {
1059       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1060            JobId, PathId, JobId, PathId,
1061            limit, offset, filter, JobId, JobId);
1062    }
1063 }
1064
1065 /* Returns true if we have files to read */
1066 bool Bvfs::ls_files()
1067 {
1068    POOL_MEM query;
1069    POOL_MEM filter;
1070    char pathid[50];
1071
1072    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
1073    if (*jobids == 0) {
1074       return false;
1075    }
1076
1077    if (!pwd_id) {
1078       ch_dir(get_root());
1079    }
1080
1081    edit_uint64(pwd_id, pathid);
1082    if (*pattern) {
1083       Mmsg(filter, " AND Filename.Name %s '%s' ", 
1084            match_query[db_get_type_index(db)], pattern);
1085
1086    } else if (*filename) {
1087       Mmsg(filter, " AND Filename.Name = '%s' ", filename);
1088    }
1089
1090    build_ls_files_query(db, query,
1091                         jobids, pathid, filter.c_str(),
1092                         limit, offset);
1093
1094    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1095
1096    db->bdb_lock();
1097    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1098    nb_record = db->sql_num_rows();
1099    db->bdb_unlock();
1100
1101    return nb_record == limit;
1102 }
1103
1104
1105 /*
1106  * Return next Id from comma separated list
1107  *
1108  * Returns:
1109  *   1 if next Id returned
1110  *   0 if no more Ids are in list
1111  *  -1 there is an error
1112  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
1113  */
1114 static int get_next_id_from_list(char **p, int64_t *Id)
1115 {
1116    const int maxlen = 30;
1117    char id[maxlen+1];
1118    char *q = *p;
1119
1120    id[0] = 0;
1121    for (int i=0; i<maxlen; i++) {
1122       if (*q == 0) {
1123          break;
1124       } else if (*q == ',') {
1125          q++;
1126          break;
1127       }
1128       id[i] = *q++;
1129       id[i+1] = 0;
1130    }
1131    if (id[0] == 0) {
1132       return 0;
1133    } else if (!is_a_number(id)) {
1134       return -1;                      /* error */
1135    }
1136    *p = q;
1137    *Id = str_to_int64(id);
1138    return 1;
1139 }
1140
1141 static int get_path_handler(void *ctx, int fields, char **row)
1142 {
1143    POOL_MEM *buf = (POOL_MEM *) ctx;
1144    pm_strcpy(*buf, row[0]);
1145    return 0;
1146 }
1147
1148 static bool check_temp(char *output_table)
1149 {
1150    if (output_table[0] == 'b' &&
1151        output_table[1] == '2' &&
1152        is_an_integer(output_table + 2))
1153    {
1154       return true;
1155    }
1156    return false;
1157 }
1158
1159 void Bvfs::clear_cache()
1160 {
1161    db->bdb_sql_query("BEGIN",                     NULL, NULL);
1162    db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
1163    db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
1164    db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
1165    db->bdb_sql_query("COMMIT",                    NULL, NULL);
1166 }
1167
1168 bool Bvfs::drop_restore_list(char *output_table)
1169 {
1170    POOL_MEM query;
1171    if (check_temp(output_table)) {
1172       Mmsg(query, "DROP TABLE %s", output_table);
1173       db->bdb_sql_query(query.c_str(), NULL, NULL);
1174       return true;
1175    }
1176    return false;
1177 }
1178
1179 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
1180                                 char *output_table)
1181 {
1182    POOL_MEM query;
1183    POOL_MEM tmp, tmp2;
1184    int64_t id, jobid, prev_jobid;
1185    int num;
1186    bool init=false;
1187    bool ret=false;
1188    /* check args */
1189    if ((*fileid   && !is_a_number_list(fileid))  ||
1190        (*dirid    && !is_a_number_list(dirid))   ||
1191        (*hardlink && !is_a_number_list(hardlink))||
1192        (!*hardlink && !*fileid && !*dirid && !*hardlink))
1193    {
1194       return false;
1195    }
1196    if (!check_temp(output_table)) {
1197       return false;
1198    }
1199
1200    db->bdb_lock();
1201
1202    /* Cleanup old tables first */
1203    Mmsg(query, "DROP TABLE btemp%s", output_table);
1204    db->bdb_sql_query(query.c_str());
1205
1206    Mmsg(query, "DROP TABLE %s", output_table);
1207    db->bdb_sql_query(query.c_str());
1208
1209    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
1210
1211    if (*fileid) {               /* Select files with their direct id */
1212       init=true;
1213       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1214                       "PathId, FileId "
1215                  "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
1216            fileid);
1217       pm_strcat(query, tmp.c_str());
1218    }
1219
1220    /* Add a directory content */
1221    while (get_next_id_from_list(&dirid, &id) == 1) {
1222       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
1223
1224       if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
1225          Dmsg0(dbglevel, "Can't search for path\n");
1226          /* print error */
1227          goto bail_out;
1228       }
1229       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
1230          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
1231                id, tmp.c_str(), tmp2.c_str());
1232          break;
1233       }
1234       /* escape % and _ for LIKE search */
1235       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
1236       char *p = tmp.c_str();
1237       for (char *s = tmp2.c_str(); *s ; s++) {
1238          if (*s == '%' || *s == '_' || *s == '\\') {
1239             *p = '\\';
1240             p++;
1241          }
1242          *p = *s;
1243          p++;
1244       }
1245       *p = '\0';
1246       tmp.strcat("%");
1247
1248       size_t len = strlen(tmp.c_str());
1249       tmp2.check_size((len+1) * 2);
1250       db->bdb_escape_string(jcr, tmp2.c_str(), tmp.c_str(), len);
1251
1252       if (init) {
1253          query.strcat(" UNION ");
1254       }
1255
1256       Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
1257                         "File.PathId, FileId "
1258                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
1259                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ",
1260            tmp2.c_str(), jobids);
1261       query.strcat(tmp.c_str());
1262       init = true;
1263
1264       query.strcat(" UNION ");
1265
1266       /* A directory can have files from a BaseJob */
1267       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
1268                         "File.FilenameId, File.PathId, BaseFiles.FileId "
1269                    "FROM BaseFiles "
1270                         "JOIN File USING (FileId) "
1271                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
1272                         "JOIN Path USING (PathId) "
1273                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ",
1274            tmp2.c_str(), jobids);
1275       query.strcat(tmp.c_str());
1276    }
1277
1278    /* expect jobid,fileindex */
1279    prev_jobid=0;
1280    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
1281       if (get_next_id_from_list(&hardlink, &id) != 1) {
1282          Dmsg0(dbglevel, "hardlink should be two by two\n");
1283          goto bail_out;
1284       }
1285       if (jobid != prev_jobid) { /* new job */
1286          if (prev_jobid == 0) {  /* first jobid */
1287             if (init) {
1288                query.strcat(" UNION ");
1289             }
1290          } else {               /* end last job, start new one */
1291             tmp.strcat(") UNION ");
1292             query.strcat(tmp.c_str());
1293          }
1294          Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1295                             "PathId, FileId "
1296                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld "
1297                         "AND FileIndex IN (%lld", jobid, id);
1298          prev_jobid = jobid;
1299
1300       } else {                  /* same job, add new findex */
1301          Mmsg(tmp2, ", %lld", id);
1302          tmp.strcat(tmp2.c_str());
1303       }
1304    }
1305
1306    if (prev_jobid != 0) {       /* end last job */
1307       tmp.strcat(") ");
1308       query.strcat(tmp.c_str());
1309       init = true;
1310    }
1311
1312    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1313
1314    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1315       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1316       goto bail_out;
1317    }
1318
1319    Mmsg(query, sql_bvfs_select[db->bdb_get_type_index()],
1320         output_table, output_table, output_table);
1321
1322    /* TODO: handle jobid filter */
1323    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1324    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1325       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1326       goto bail_out;
1327    }
1328
1329    /* MySQL needs the index */
1330    if (db->bdb_get_type_index() == SQL_TYPE_MYSQL) {
1331       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)",
1332            output_table, output_table);
1333       Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1334       if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1335          Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1336          goto bail_out; 
1337       } 
1338    }
1339
1340    /* Check if some FileId have DeltaSeq > 0
1341     * Foreach of them we need to get the accurate_job list, and compute
1342     * what are dependencies
1343     */
1344    Mmsg(query, 
1345         "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
1346           "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
1347          "WHERE DeltaSeq > 0", output_table);
1348
1349    if (!db->QueryDB(jcr, query.c_str())) {
1350       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
1351    }
1352
1353    /* TODO: Use an other DB connection can avoid to copy the result of the
1354     * previous query into a temporary buffer
1355     */
1356    num = db->sql_num_rows();
1357    Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
1358
1359    if (num > 0) {
1360       int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
1361       SQL_ROW row;
1362       int i=0;
1363
1364       while((row = db->sql_fetch_row())) {
1365          result[i++] = str_to_int64(row[0]); /* FileId */
1366          result[i++] = str_to_int64(row[1]); /* JobId */
1367          result[i++] = str_to_int64(row[2]); /* FilenameId */
1368          result[i++] = str_to_int64(row[3]); /* PathId */
1369       } 
1370
1371       i=0;
1372       while (num > 0) {
1373          insert_missing_delta(output_table, result + i);
1374          i += 4;
1375          num--;
1376       }
1377       free(result);
1378    }
1379
1380    ret = true;
1381
1382 bail_out:
1383    Mmsg(query, "DROP TABLE btemp%s", output_table);
1384    db->bdb_sql_query(query.c_str(), NULL, NULL);
1385    db->bdb_unlock();
1386    return ret;
1387 }
1388  
1389 void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
1390 {
1391    char ed1[50], ed2[50];
1392    db_list_ctx lst;
1393    POOL_MEM query;
1394    JOB_DBR jr, jr2;
1395    memset(&jr, 0, sizeof(jr));
1396    memset(&jr2, 0, sizeof(jr2));
1397
1398    /* Need to limit the query to StartTime, Client/Fileset */
1399    jr2.JobId = res[1];
1400    db->bdb_get_job_record(jcr, &jr2);
1401
1402    jr.JobId = res[1];
1403    jr.ClientId = jr2.ClientId;
1404    jr.FileSetId = jr2.FileSetId;
1405    jr.JobLevel = L_INCREMENTAL;
1406    jr.StartTime = jr2.StartTime;
1407
1408    /* Get accurate jobid list */
1409    db->bdb_get_accurate_jobids(jcr, &jr, &lst);
1410
1411    Dmsg2(dbglevel_sql, "JobId list for %lld is %s\n", res[0], lst.list);
1412
1413    /* The list contains already the last DeltaSeq element, so
1414     * we don't need to select it in the next query
1415     */
1416    for (int l = strlen(lst.list); l > 0; l--) {
1417       if (lst.list[l] == ',') {
1418          lst.list[l] = '\0';
1419          break;
1420       }
1421    }
1422
1423    Dmsg1(dbglevel_sql, "JobId list after strip is %s\n", lst.list);
1424
1425    edit_int64(res[2], ed1);     /* fnid */
1426    edit_int64(res[3], ed2);     /* pathid */
1427
1428    int id=db->bdb_get_type_index();
1429    Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
1430         lst.list, ed1, ed2, 
1431         lst.list, ed1, ed2, 
1432         lst.list, lst.list);
1433
1434    Mmsg(db->cmd, "INSERT INTO %s "
1435                    "SELECT JobId, FileIndex, FileId FROM (%s) AS F1",
1436         output_table, query.c_str());
1437
1438    if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
1439       Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1440    }
1441 }
1442  
1443 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */