]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
349407380ce2a16bc28ba00f9443f41e791e3fe9
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2009-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20
21 #include "bacula.h"
22 #include "cats.h" 
23 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
24 #include "lib/htable.h"
25 #include "bvfs.h"
26
27 #define dbglevel      DT_BVFS|10
28 #define dbglevel_sql  DT_SQL|15
29
30 static int result_handler(void *ctx, int fields, char **row)
31 {
32    if (fields == 4) {
33       Pmsg4(0, "%s\t%s\t%s\t%s\n",
34             row[0], row[1], row[2], row[3]);
35    } else if (fields == 5) {
36       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n",
37             row[0], row[1], row[2], row[3], row[4]);
38    } else if (fields == 6) {
39       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n",
40             row[0], row[1], row[2], row[3], row[4], row[5]);
41    } else if (fields == 7) {
42       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
43             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
44    }
45    return 0;
46 }
47
48 Bvfs::Bvfs(JCR *j, BDB *mdb) {
49    jcr = j;
50    jcr->inc_use_count();
51    db = mdb;                 /* need to inc ref count */
52    jobids = get_pool_memory(PM_NAME);
53    prev_dir = get_pool_memory(PM_NAME);
54    pattern = get_pool_memory(PM_NAME);
55    filename = get_pool_memory(PM_NAME);
56    tmp = get_pool_memory(PM_NAME);
57    escaped_list = get_pool_memory(PM_NAME);
58    *filename = *jobids = *prev_dir = *pattern = 0;
59    dir_filenameid = pwd_id = offset = 0;
60    see_copies = see_all_versions = false;
61    limit = 1000;
62    attr = new_attr(jcr);
63    list_entries = result_handler;
64    user_data = this;
65    username = NULL;
66    job_acl = client_acl = pool_acl = fileset_acl = NULL;
67 }
68
69 Bvfs::~Bvfs() {
70    free_pool_memory(jobids);
71    free_pool_memory(pattern);
72    free_pool_memory(prev_dir);
73    free_pool_memory(filename);
74    free_pool_memory(tmp);
75    free_pool_memory(escaped_list);
76    if (username) {
77       free(username);
78    }
79    free_attr(attr);
80    jcr->dec_use_count();
81 }
82
83 char *Bvfs::escape_list(alist *lst)
84 {
85    char *elt;
86    int len;
87
88    /* List is empty, reject everything */
89    if (!lst || lst->size() == 0) {
90       Mmsg(escaped_list, "''");
91       return escaped_list;
92    }
93
94    *tmp = 0;
95    *escaped_list = 0;
96
97    foreach_alist(elt, lst) {
98       if (elt && *elt) {
99          len = strlen(elt);
100          /* Escape + ' ' */
101          tmp = check_pool_memory_size(tmp, 2 * len + 2 + 2);
102
103          tmp[0] = '\'';
104          db->bdb_escape_string(jcr, tmp + 1 , elt, len);
105          pm_strcat(tmp, "'");
106
107          if (*escaped_list) {
108             pm_strcat(escaped_list, ",");
109          }
110
111          pm_strcat(escaped_list, tmp);
112       }
113    }
114    return escaped_list;
115 }
116
117 void Bvfs::filter_jobid()
118 {
119    POOL_MEM query;
120    POOL_MEM sub_where;
121    POOL_MEM sub_join;
122
123    /* No ACL, no username, no check */
124    if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
125       Dmsg0(dbglevel_sql, "No ACL\n");
126       return;
127    }
128
129    if (job_acl) {
130       Mmsg(sub_where, " AND Job.Name IN (%s) ", escape_list(job_acl));
131    }
132
133    if (fileset_acl) {
134       Mmsg(query, " AND FileSet.FileSet IN (%s) ", escape_list(fileset_acl));
135       pm_strcat(sub_where, query.c_str());
136       pm_strcat(sub_join, " JOIN FileSet USING (FileSetId) ");
137    }
138
139    if (client_acl) {
140       Mmsg(query, " AND Client.Name IN (%s) ", escape_list(client_acl));
141       pm_strcat(sub_where, query.c_str());
142    }
143
144    if (pool_acl) {
145       Mmsg(query, " AND Pool.Name IN (%s) ", escape_list(pool_acl));
146       pm_strcat(sub_where, query.c_str());
147       pm_strcat(sub_join, " JOIN Pool USING (PoolId) ");
148    }
149
150    if (username) {
151       /* Query used by Bweb to filter clients, activated when using
152        * set_username()
153        */
154       Mmsg(query,
155       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
156         "JOIN (SELECT ClientId FROM client_group_member "
157         "JOIN client_group USING (client_group_id) "
158         "JOIN bweb_client_group_acl USING (client_group_id) "
159         "JOIN bweb_user USING (userid) "
160        "WHERE bweb_user.username = '%s' "
161       ") AS filter USING (ClientId) "
162         " WHERE JobId IN (%s) %s",
163            sub_join.c_str(), username, jobids, sub_where.c_str());
164
165    } else {
166       Mmsg(query,
167       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
168       " WHERE JobId IN (%s) %s",
169            sub_join.c_str(), jobids, sub_where.c_str());
170    }
171
172    db_list_ctx ctx;
173    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
174    db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
175    pm_strcpy(jobids, ctx.list);
176 }
177
178 void Bvfs::set_jobid(JobId_t id)
179 {
180    Mmsg(jobids, "%lld", (uint64_t)id);
181    filter_jobid();
182 }
183
184 void Bvfs::set_jobids(char *ids)
185 {
186    pm_strcpy(jobids, ids);
187    filter_jobid();
188 }
189
190 /*
191  * TODO: Find a way to let the user choose how he wants to display
192  * files and directories
193  */
194
195
196 /*
197  * Working Object to store PathId already seen (avoid
198  * database queries), equivalent to %cache_ppathid in perl
199  */
200
201 #define NITEMS 50000
202 class pathid_cache {
203 private:
204    hlink *nodes;
205    int nb_node;
206    int max_node;
207
208    alist *table_node;
209
210    htable *cache_ppathid;
211
212 public:
213    pathid_cache() {
214       hlink link;
215       cache_ppathid = (htable *)malloc(sizeof(htable));
216       cache_ppathid->init(&link, &link, NITEMS);
217       max_node = NITEMS;
218       nodes = (hlink *) malloc(max_node * sizeof (hlink));
219       nb_node = 0;
220       table_node = New(alist(5, owned_by_alist));
221       table_node->append(nodes);
222    }
223
224    hlink *get_hlink() {
225       if (++nb_node >= max_node) {
226          nb_node = 0;
227          nodes = (hlink *)malloc(max_node * sizeof(hlink));
228          table_node->append(nodes);
229       }
230       return nodes + nb_node;
231    }
232
233    bool lookup(char *pathid) {
234       bool ret = cache_ppathid->lookup(pathid) != NULL;
235       return ret;
236    }
237
238    void insert(char *pathid) {
239       hlink *h = get_hlink();
240       cache_ppathid->insert(pathid, h);
241    }
242
243    ~pathid_cache() {
244       cache_ppathid->destroy();
245       free(cache_ppathid);
246       delete table_node;
247    }
248 private:
249    pathid_cache(const pathid_cache &); /* prohibit pass by value */
250    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
251 } ;
252
253 /* Return the parent_dir with the trailing /  (update the given string)
254  * TODO: see in the rest of bacula if we don't have already this function
255  * dir=/tmp/toto/
256  * dir=/tmp/
257  * dir=/
258  * dir=
259  */
260 char *bvfs_parent_dir(char *path)
261 {
262    char *p = path;
263    int len = strlen(path) - 1;
264
265    /* windows directory / */
266    if (len == 2 && B_ISALPHA(path[0])
267                 && path[1] == ':'
268                 && path[2] == '/')
269    {
270       len = 0;
271       path[0] = '\0';
272    }
273
274    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
275       path[len] = '\0';
276    }
277
278    if (len > 0) {
279       p += len;
280       while (p > path && !IsPathSeparator(*p)) {
281          p--;
282       }
283       p[1] = '\0';
284    }
285    return path;
286 }
287
288 /* Return the basename of the with the trailing /
289  * TODO: see in the rest of bacula if we don't have
290  * this function already
291  */
292 char *bvfs_basename_dir(char *path)
293 {
294    char *p = path;
295    int len = strlen(path) - 1;
296
297    if (path[len] == '/') {      /* if directory, skip last / */
298       len -= 1;
299    }
300
301    if (len > 0) {
302       p += len;
303       while (p > path && !IsPathSeparator(*p)) {
304          p--;
305       }
306       if (*p == '/') {
307          p++;                  /* skip first / */
308       }
309    }
310    return p;
311 }
312
313 static void build_path_hierarchy(JCR *jcr, BDB *mdb,
314                                  pathid_cache &ppathid_cache,
315                                  char *org_pathid, char *path)
316 {
317    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
318    char pathid[50];
319    ATTR_DBR parent;
320    char *bkp = mdb->path;
321    strncpy(pathid, org_pathid, sizeof(pathid));
322
323    /* Does the ppathid exist for this ? we use a memory cache...  In order to
324     * avoid the full loop, we consider that if a dir is allready in the
325     * PathHierarchy table, then there is no need to calculate all the
326     * hierarchy
327     */
328    while (path && *path)
329    {
330       if (!ppathid_cache.lookup(pathid))
331       {
332          Mmsg(mdb->cmd,
333               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
334               pathid);
335
336          if (!mdb->QueryDB(jcr, mdb->cmd)) {
337             goto bail_out;      /* Query failed, just leave */
338          }
339
340          /* Do we have a result ? */
341          if (mdb->sql_num_rows() > 0) {
342             ppathid_cache.insert(pathid);
343             /* This dir was in the db ...
344              * It means we can leave, the tree has allready been built for
345              * this dir
346              */
347             goto bail_out;
348          } else {
349             /* search or create parent PathId in Path table */
350             mdb->path = bvfs_parent_dir(path);
351             mdb->pnl = strlen(mdb->path);
352             if (!mdb->bdb_create_path_record(jcr, &parent)) {
353                goto bail_out;
354             }
355             ppathid_cache.insert(pathid);
356
357             Mmsg(mdb->cmd,
358                  "INSERT INTO PathHierarchy (PathId, PPathId) "
359                  "VALUES (%s,%lld)",
360                  pathid, (uint64_t) parent.PathId);
361
362             if (!mdb->InsertDB(jcr, mdb->cmd)) {
363                goto bail_out;   /* Can't insert the record, just leave */
364             }
365
366             edit_uint64(parent.PathId, pathid);
367             path = mdb->path;   /* already done */
368          }
369       } else {
370          /* It's already in the cache.  We can leave, no time to waste here,
371           * all the parent dirs have allready been done
372           */
373          goto bail_out;
374       }
375    }
376
377 bail_out:
378    mdb->path = bkp;
379    mdb->fnl = 0;
380 }
381
382 /*
383  * Internal function to update path_hierarchy cache with a shared pathid cache
384  * return Error 0
385  *        OK    1
386  */
387 static int update_path_hierarchy_cache(JCR *jcr,
388                                         BDB *mdb,
389                                         pathid_cache &ppathid_cache,
390                                         JobId_t JobId)
391 {
392    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
393    uint32_t ret=0;
394    uint32_t num;
395    char jobid[50];
396    edit_uint64(JobId, jobid);
397
398    mdb->bdb_lock();
399
400    /* We don't really want to harm users with spurious messages,
401     * everything is handled by transaction
402     */
403    mdb->set_use_fatal_jmsg(false);
404
405    mdb->bdb_start_transaction(jcr);
406
407    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
408
409    if (!mdb->QueryDB(jcr, mdb->cmd) || mdb->sql_num_rows() > 0) {
410       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
411       ret = 1;
412       goto bail_out;
413    }
414
415    /* Inserting path records for JobId */
416    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
417                    "SELECT DISTINCT PathId, JobId "
418                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
419                            "UNION "
420                            "SELECT PathId, BaseFiles.JobId "
421                              "FROM BaseFiles JOIN File AS F USING (FileId) "
422                             "WHERE BaseFiles.JobId = %s) AS B",
423         jobid, jobid);
424
425    if (!mdb->QueryDB(jcr, mdb->cmd)) {
426       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
427       goto bail_out;
428    }
429
430    /* Now we have to do the directory recursion stuff to determine missing
431     * visibility We try to avoid recursion, to be as fast as possible We also
432     * only work on not allready hierarchised directories...
433     */
434    Mmsg(mdb->cmd,
435      "SELECT PathVisibility.PathId, Path "
436        "FROM PathVisibility "
437             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
438             "LEFT JOIN PathHierarchy "
439          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
440       "WHERE PathVisibility.JobId = %s "
441         "AND PathHierarchy.PathId IS NULL "
442       "ORDER BY Path", jobid);
443    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
444
445    if (!mdb->QueryDB(jcr, mdb->cmd)) {
446       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
447       goto bail_out;
448    }
449
450    /* TODO: I need to reuse the DB connection without emptying the result
451     * So, now i'm copying the result in memory to be able to query the
452     * catalog descriptor again.
453     */
454    num = mdb->sql_num_rows();
455    if (num > 0) {
456       char **result = (char **)malloc (num * 2 * sizeof(char *));
457
458       SQL_ROW row;
459       int i=0;
460       while((row = mdb->sql_fetch_row())) {
461          result[i++] = bstrdup(row[0]);
462          result[i++] = bstrdup(row[1]);
463       }
464
465       i=0;
466       while (num > 0) {
467          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
468          free(result[i++]);
469          free(result[i++]);
470          num--;
471       }
472       free(result);
473    }
474
475    if (mdb->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
476       Mmsg(mdb->cmd,
477  "INSERT INTO PathVisibility (PathId, JobId) "
478    "SELECT DISTINCT h.PPathId AS PathId, %s "
479      "FROM PathHierarchy AS h "
480     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
481       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
482            jobid, jobid, jobid );
483
484    }  else if (mdb->bdb_get_type_index() == SQL_TYPE_MYSQL) {
485       Mmsg(mdb->cmd,
486   "INSERT INTO PathVisibility (PathId, JobId)  "
487    "SELECT a.PathId,%s "
488    "FROM ( "
489      "SELECT DISTINCT h.PPathId AS PathId "
490        "FROM PathHierarchy AS h "
491        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
492       "WHERE p.JobId=%s) AS a "
493       "LEFT JOIN PathVisibility AS b ON (b.JobId=%s and a.PathId = b.PathId) "
494       "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
495
496    } else {                     /* TODO: Test the MYSQL Query with PostgreSQL */
497       Mmsg(mdb->cmd,
498   "INSERT INTO PathVisibility (PathId, JobId)  "
499    "SELECT a.PathId,%s "
500    "FROM ( "
501      "SELECT DISTINCT h.PPathId AS PathId "
502        "FROM PathHierarchy AS h "
503        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
504       "WHERE p.JobId=%s) AS a LEFT JOIN "
505        "(SELECT PathId "
506           "FROM PathVisibility "
507          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
508    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
509    }
510
511    do {
512       ret = mdb->QueryDB(jcr, mdb->cmd);
513    } while (ret && mdb->sql_affected_rows() > 0);
514
515    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
516    ret = mdb->UpdateDB(jcr, mdb->cmd);
517
518 bail_out:
519    mdb->bdb_end_transaction(jcr);
520
521    if (!ret) {
522       Mmsg(mdb->cmd, "SELECT HasCache FROM Job WHERE JobId=%s", jobid);
523       mdb->bdb_sql_query(mdb->cmd, db_int_handler, &ret);
524    }
525
526    /* Enable back the FATAL message if something is wrong */
527    mdb->set_use_fatal_jmsg(true);
528
529    mdb->bdb_unlock();
530    return ret;
531 }
532
533 /* 
534  * Find an store the filename descriptor for empty directories Filename.Name=''
535  */
536 DBId_t Bvfs::get_dir_filenameid()
537 {
538    uint32_t id;
539    if (dir_filenameid) {
540       return dir_filenameid;
541    }
542    Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
543    db_sql_query(db, db->cmd, db_int_handler, &id);
544    dir_filenameid = id;
545    return dir_filenameid;
546 }
547
548 /* Compute the cache for the bfileview compoment */
549 void Bvfs::fv_update_cache()
550 {
551    int64_t pathid;
552    int64_t size=0, count=0;
553
554    Dmsg0(dbglevel, "fv_update_cache()\n");
555
556    if (!*jobids) {
557       return;                   /* Nothing to build */
558    }
559
560    db->bdb_lock();
561    /* We don't really want to harm users with spurious messages,
562     * everything is handled by transaction
563     */
564    db->set_use_fatal_jmsg(false);
565
566    db->bdb_start_transaction(jcr);
567
568    pathid = get_root();
569
570    fv_compute_size_and_count(pathid, &size, &count);
571
572    db->bdb_end_transaction(jcr);
573
574    /* Enable back the FATAL message if something is wrong */
575    db->set_use_fatal_jmsg(true);
576
577    db->bdb_unlock();
578 }
579
580 /* Not yet working */
581 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
582 {
583    Mmsg(db->cmd, 
584         "SELECT FilenameId AS filenameid, Name AS name, size "
585           "FROM ( "
586          "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
587            "FROM File "
588           "WHERE PathId  = %lld "
589             "AND JobId = %s "
590         ") AS S INNER JOIN Filename USING (FilenameId) "
591      "WHERE S.size > %lld "
592      "ORDER BY S.size DESC "
593      "LIMIT %d ", pathid, jobids, min_size, limit);
594 }
595
596 /* Get the current path size and files count */
597 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
598 {
599    SQL_ROW row;
600
601    *size = *count = 0;
602
603    Mmsg(db->cmd,
604  "SELECT Size AS size, Files AS files "
605   " FROM PathVisibility "
606  " WHERE PathId = %lld "
607    " AND JobId = %s ", pathid, jobids);
608
609    if (!db->QueryDB(jcr, db->cmd)) {
610       return;
611    }
612
613    if ((row = db->sql_fetch_row())) {
614       *size = str_to_int64(row[0]);
615       *count =  str_to_int64(row[1]);
616    }
617 }
618
619 /* Compute for the current path the size and files count */
620 void Bvfs::fv_get_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
621 {
622    SQL_ROW row;
623
624    *size = *count = 0;
625
626    Mmsg(db->cmd,
627  "SELECT sum(base64_decode_lstat(8,LStat)) AS size, count(1) AS files "
628   " FROM File "
629  " WHERE PathId = %lld "
630    " AND JobId = %s ", pathid, jobids);
631
632    if (!db->QueryDB(jcr, db->cmd)) {
633       return;
634    }
635
636    if ((row = db->sql_fetch_row())) {
637       *size = str_to_int64(row[0]);
638       *count =  str_to_int64(row[1]);
639    }
640 }
641
642 void Bvfs::fv_compute_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
643 {
644    Dmsg1(dbglevel, "fv_compute_size_and_count(%lld)\n", pathid);
645
646    fv_get_current_size_and_count(pathid, size, count);
647    if (*size > 0) {
648       return;
649    }
650
651    /* Update stats for the current directory */
652    fv_get_size_and_count(pathid, size, count);
653
654    /* Update stats for all sub directories */
655    Mmsg(db->cmd,
656         " SELECT PathId "
657           " FROM PathVisibility "
658                " INNER JOIN PathHierarchy USING (PathId) "
659          " WHERE PPathId  = %lld "
660            " AND JobId = %s ", pathid, jobids);
661
662    db->QueryDB(jcr, db->cmd);
663    int num = db->sql_num_rows();
664
665    if (num > 0) {
666       int64_t *result = (int64_t *)malloc (num * sizeof(int64_t));
667       SQL_ROW row;
668       int i=0;
669
670       while((row = db->sql_fetch_row())) {
671          result[i++] = str_to_int64(row[0]); /* PathId */
672       }
673
674       i=0;
675       while (num > 0) {
676          int64_t c=0, s=0;
677          fv_compute_size_and_count(result[i], &s, &c);
678          *size += s;
679          *count += c;
680
681          i++;
682          num--;
683       }
684       free(result);
685    }
686
687    fv_update_size_and_count(pathid, *size, *count);
688 }
689
690 void Bvfs::fv_update_size_and_count(int64_t pathid, int64_t size, int64_t count)
691 {
692    Mmsg(db->cmd,
693         "UPDATE PathVisibility SET Files = %lld, Size = %lld "
694         " WHERE JobId = %s "
695         " AND PathId = %lld ", count, size, jobids, pathid);
696
697    db->UpdateDB(jcr, db->cmd);
698 }
699
700 void bvfs_update_cache(JCR *jcr, BDB *mdb)
701 {
702    uint32_t nb=0;
703    db_list_ctx jobids_list;
704
705    mdb->bdb_lock();
706
707 #ifdef xxx
708    /* TODO: Remove this code when updating make_bacula_table script */
709    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
710    if (!mdb->QueryDB(jcr, mdb->cmd)) {
711       Dmsg0(dbglevel, "Creating cache table\n");
712       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
713       mdb->QueryDB(jcr, mdb->cmd);
714
715       Mmsg(mdb->cmd,
716            "CREATE TABLE PathHierarchy ( "
717            "PathId integer NOT NULL, "
718            "PPathId integer NOT NULL, "
719            "CONSTRAINT pathhierarchy_pkey "
720            "PRIMARY KEY (PathId))");
721       mdb->QueryDB(jcr, mdb->cmd);
722
723       Mmsg(mdb->cmd,
724            "CREATE INDEX pathhierarchy_ppathid "
725            "ON PathHierarchy (PPathId)");
726       mdb->QueryDB(jcr, mdb->cmd);
727
728       Mmsg(mdb->cmd,
729            "CREATE TABLE PathVisibility ("
730            "PathId integer NOT NULL, "
731            "JobId integer NOT NULL, "
732            "Size int8 DEFAULT 0, "
733            "Files int4 DEFAULT 0, "
734            "CONSTRAINT pathvisibility_pkey "
735            "PRIMARY KEY (JobId, PathId))");
736       mdb->QueryDB(jcr, mdb->cmd);
737
738       Mmsg(mdb->cmd,
739            "CREATE INDEX pathvisibility_jobid "
740            "ON PathVisibility (JobId)");
741       mdb->QueryDB(jcr, mdb->cmd);
742
743    }
744 #endif
745
746    Mmsg(mdb->cmd,
747  "SELECT JobId from Job "
748   "WHERE HasCache = 0 "
749     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
750   "ORDER BY JobId");
751
752    mdb->bdb_sql_query(mdb->cmd, db_list_handler, &jobids_list);
753
754    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
755
756    mdb->bdb_start_transaction(jcr);
757    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
758    Mmsg(mdb->cmd,
759         "DELETE FROM PathVisibility "
760          "WHERE NOT EXISTS "
761         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
762    nb = mdb->DeleteDB(jcr, mdb->cmd);
763    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
764
765    mdb->bdb_end_transaction(jcr);
766    mdb->bdb_unlock();
767 }
768
769 /*
770  * Update the bvfs cache for given jobids (1,2,3,4)
771  */
772 int
773 bvfs_update_path_hierarchy_cache(JCR *jcr, BDB *mdb, char *jobids)
774 {
775    pathid_cache ppathid_cache;
776    JobId_t JobId;
777    char *p;
778    int ret=1;
779
780    for (p=jobids; ; ) {
781       int stat = get_next_jobid_from_list(&p, &JobId);
782       if (stat < 0) {
783          ret = 0;
784          break;
785       }
786       if (stat == 0) {
787          break;
788       }
789       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
790       if (!update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId)) {
791          ret = 0;
792       }
793    }
794    return ret;
795 }
796
797 /*
798  * Update the bvfs fileview for given jobids
799  */
800 void
801 bvfs_update_fv_cache(JCR *jcr, BDB *mdb, char *jobids)
802 {
803    char *p;
804    JobId_t JobId;
805    Bvfs bvfs(jcr, mdb);
806
807    for (p=jobids; ; ) {
808       int stat = get_next_jobid_from_list(&p, &JobId);
809       if (stat < 0) {
810          return;
811       }
812       if (stat == 0) {
813          break;
814       }
815
816       Dmsg1(dbglevel, "Trying to create cache for %lld\n", (int64_t)JobId);
817
818       bvfs.set_jobid(JobId);
819       bvfs.fv_update_cache();
820    }
821 }
822
823 /*
824  * Update the bvfs cache for current jobids
825  */
826 void Bvfs::update_cache()
827 {
828    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
829 }
830
831 /* Change the current directory, returns true if the path exists */
832 bool Bvfs::ch_dir(const char *path)
833 {
834    pm_strcpy(db->path, path);
835    db->pnl = strlen(db->path);
836    db->bdb_lock();
837    ch_dir(db->bdb_get_path_record(jcr));
838    db->bdb_unlock();
839    return pwd_id != 0;
840 }
841
842 /*
843  * Get all file versions for a specified client
844  * TODO: Handle basejobs using different client
845  */
846 void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, const char *client)
847 {
848    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
849          (uint64_t)fnid, client);
850    char ed1[50], ed2[50];
851    POOL_MEM q;
852    if (see_copies) {
853       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
854    } else {
855       Mmsg(q, " AND Job.Type = 'B' ");
856    }
857
858    POOL_MEM query;
859
860    Mmsg(query,//    1           2              3       
861 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
862 //         4          5           6
863         "File.JobId, File.LStat, File.FileId, "
864 //         7                    8
865        "Media.VolumeName, Media.InChanger "
866 "FROM File, Job, Client, JobMedia, Media "
867 "WHERE File.FilenameId = %s "
868   "AND File.PathId=%s "
869   "AND File.JobId = Job.JobId "
870   "AND Job.JobId = JobMedia.JobId "
871   "AND File.FileIndex >= JobMedia.FirstIndex "
872   "AND File.FileIndex <= JobMedia.LastIndex "
873   "AND JobMedia.MediaId = Media.MediaId "
874   "AND Job.ClientId = Client.ClientId "
875   "AND Client.Name = '%s' "
876   "%s ORDER BY FileId LIMIT %d OFFSET %d"
877         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
878         limit, offset);
879    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
880    db->bdb_sql_query(query.c_str(), list_entries, user_data);
881 }
882
883 /*
884  * Get all volumes for a specific file
885  */
886 void Bvfs::get_volumes(FileId_t fileid)
887 {
888    Dmsg1(dbglevel, "get_volumes(%lld)\n", (uint64_t)fileid);
889
890    char ed1[50];
891    POOL_MEM query;
892
893    Mmsg(query,
894 //                                   7                8
895 "SELECT DISTINCT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
896 "FROM File JOIN JobMedia USING (JobId) JOIN Media USING (MediaId) "
897 "WHERE File.FileId = %s "
898   "AND File.FileIndex >= JobMedia.FirstIndex "
899   "AND File.FileIndex <= JobMedia.LastIndex "
900   " LIMIT %d OFFSET %d"
901         ,edit_uint64(fileid, ed1), limit, offset);
902    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
903    db->bdb_sql_query(query.c_str(), list_entries, user_data);
904 }
905
906 DBId_t Bvfs::get_root()
907 {
908    int p;
909    *db->path = 0;
910    db->bdb_lock();
911    p = db->bdb_get_path_record(jcr);
912    db->bdb_unlock();
913    return p;
914 }
915
916 static int path_handler(void *ctx, int fields, char **row)
917 {
918    Bvfs *fs = (Bvfs *) ctx;
919    return fs->_handle_path(ctx, fields, row);
920 }
921
922 int Bvfs::_handle_path(void *ctx, int fields, char **row)
923 {
924    if (bvfs_is_dir(row)) {
925       /* can have the same path 2 times */
926       if (strcmp(row[BVFS_PathId], prev_dir)) {
927          pm_strcpy(prev_dir, row[BVFS_PathId]);
928          return list_entries(user_data, fields, row);
929       }
930    }
931    return 0;
932 }
933
934 /*
935  * Retrieve . and .. information
936  */
937 void Bvfs::ls_special_dirs()
938 {
939    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
940    char ed1[50], ed2[50];
941    if (*jobids == 0) {
942       return;
943    }
944    if (!dir_filenameid) {
945       get_dir_filenameid();
946    }
947
948    /* Will fetch directories  */
949    *prev_dir = 0;
950
951    POOL_MEM query;
952    Mmsg(query,
953 "(SELECT PPathId AS PathId, '..' AS Path "
954     "FROM  PathHierarchy "
955    "WHERE  PathId = %s "
956 "UNION "
957  "SELECT %s AS PathId, '.' AS Path)",
958         edit_uint64(pwd_id, ed1), ed1);
959
960    POOL_MEM query2;
961    Mmsg(query2,// 1      2     3        4     5       6
962 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
963   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
964        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
965               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
966        "WHERE File1.FilenameId = %s "
967        "AND File1.JobId IN (%s)) AS listfile1 "
968   "ON (tmp.PathId = listfile1.PathId) "
969   "ORDER BY tmp.Path, JobId DESC ",
970         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
971
972    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
973    db->bdb_sql_query(query2.c_str(), path_handler, this);
974 }
975
976 /* Returns true if we have dirs to read */
977 bool Bvfs::ls_dirs()
978 {
979    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
980    char ed1[50], ed2[50];
981    if (*jobids == 0) {
982       return false;
983    }
984
985    POOL_MEM query;
986    POOL_MEM filter;
987    if (*pattern) {
988       Mmsg(filter, " AND Path2.Path %s '%s' ",
989            match_query[db->bdb_get_type_index()], pattern);
990
991    }
992
993    if (!dir_filenameid) {
994       get_dir_filenameid();
995    }
996
997    /* the sql query displays same directory multiple time, take the first one */
998    *prev_dir = 0;
999
1000    /* Let's retrieve the list of the visible dirs in this dir ...
1001     * First, I need the empty filenameid to locate efficiently
1002     * the dirs in the file table
1003     * my $dir_filenameid = $self->get_dir_filenameid();
1004     */
1005    /* Then we get all the dir entries from File ... */
1006    Mmsg(query,
1007 //       0     1     2   3      4     5       6
1008 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
1009     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
1010            "lower(Path1.Path) AS lpath, "
1011            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
1012            "listfile1.FileId AS FileId "
1013     "FROM ( "
1014       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
1015       "FROM PathHierarchy AS PathHierarchy1 "
1016       "JOIN Path AS Path2 "
1017         "ON (PathHierarchy1.PathId = Path2.PathId) "
1018       "JOIN PathVisibility AS PathVisibility1 "
1019         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
1020       "WHERE PathHierarchy1.PPathId = %s "
1021       "AND PathVisibility1.JobId IN (%s) "
1022            "%s "
1023      ") AS listpath1 "
1024    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
1025
1026    "LEFT JOIN ( " /* get attributes if any */
1027        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1028               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
1029        "WHERE File1.FilenameId = %s "
1030        "AND File1.JobId IN (%s)) AS listfile1 "
1031        "ON (listpath1.PathId = listfile1.PathId) "
1032     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
1033         edit_uint64(pwd_id, ed1),
1034         jobids,
1035         filter.c_str(),
1036         edit_uint64(dir_filenameid, ed2),
1037         jobids,
1038         limit, offset);
1039
1040    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1041
1042    db->bdb_lock();
1043    db->bdb_sql_query(query.c_str(), path_handler, this);
1044    nb_record = db->sql_num_rows();
1045    db->bdb_unlock();
1046
1047    return nb_record == limit;
1048 }
1049
1050 void build_ls_files_query(BDB *db, POOL_MEM &query,
1051                           const char *JobId, const char *PathId,
1052                           const char *filter, int64_t limit, int64_t offset)
1053 {
1054    if (db->bdb_get_type_index() == SQL_TYPE_POSTGRESQL) {
1055       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1056            JobId, PathId, JobId, PathId,
1057            filter, limit, offset);
1058    } else {
1059       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1060            JobId, PathId, JobId, PathId,
1061            limit, offset, filter, JobId, JobId);
1062    }
1063 }
1064
1065 /* Returns true if we have files to read */
1066 bool Bvfs::ls_files()
1067 {
1068    POOL_MEM query;
1069    POOL_MEM filter;
1070    char pathid[50];
1071
1072    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
1073    if (*jobids == 0) {
1074       return false;
1075    }
1076
1077    if (!pwd_id) {
1078       ch_dir(get_root());
1079    }
1080
1081    edit_uint64(pwd_id, pathid);
1082    if (*pattern) {
1083       Mmsg(filter, " AND Filename.Name %s '%s' ", 
1084            match_query[db_get_type_index(db)], pattern);
1085
1086    } else if (*filename) {
1087       Mmsg(filter, " AND Filename.Name = '%s' ", filename);
1088    }
1089
1090    build_ls_files_query(db, query,
1091                         jobids, pathid, filter.c_str(),
1092                         limit, offset);
1093
1094    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1095
1096    db->bdb_lock();
1097    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1098    nb_record = db->sql_num_rows();
1099    db->bdb_unlock();
1100
1101    return nb_record == limit;
1102 }
1103
1104
1105 /*
1106  * Return next Id from comma separated list
1107  *
1108  * Returns:
1109  *   1 if next Id returned
1110  *   0 if no more Ids are in list
1111  *  -1 there is an error
1112  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
1113  */
1114 static int get_next_id_from_list(char **p, int64_t *Id)
1115 {
1116    const int maxlen = 30;
1117    char id[maxlen+1];
1118    char *q = *p;
1119
1120    id[0] = 0;
1121    for (int i=0; i<maxlen; i++) {
1122       if (*q == 0) {
1123          break;
1124       } else if (*q == ',') {
1125          q++;
1126          break;
1127       }
1128       id[i] = *q++;
1129       id[i+1] = 0;
1130    }
1131    if (id[0] == 0) {
1132       return 0;
1133    } else if (!is_a_number(id)) {
1134       return -1;                      /* error */
1135    }
1136    *p = q;
1137    *Id = str_to_int64(id);
1138    return 1;
1139 }
1140
1141 static int get_path_handler(void *ctx, int fields, char **row)
1142 {
1143    POOL_MEM *buf = (POOL_MEM *) ctx;
1144    pm_strcpy(*buf, row[0]);
1145    return 0;
1146 }
1147
1148 static bool check_temp(char *output_table)
1149 {
1150    if (output_table[0] == 'b' &&
1151        output_table[1] == '2' &&
1152        is_an_integer(output_table + 2))
1153    {
1154       return true;
1155    }
1156    return false;
1157 }
1158
1159 void Bvfs::clear_cache()
1160 {
1161    db->bdb_sql_query("BEGIN",                     NULL, NULL);
1162    db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
1163    db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
1164    db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
1165    db->bdb_sql_query("COMMIT",                    NULL, NULL);
1166 }
1167
1168 bool Bvfs::drop_restore_list(char *output_table)
1169 {
1170    POOL_MEM query;
1171    if (check_temp(output_table)) {
1172       Mmsg(query, "DROP TABLE %s", output_table);
1173       db->bdb_sql_query(query.c_str(), NULL, NULL);
1174       return true;
1175    }
1176    return false;
1177 }
1178
1179 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
1180                                 char *output_table)
1181 {
1182    POOL_MEM query;
1183    POOL_MEM tmp, tmp2;
1184    int64_t id, jobid, prev_jobid;
1185    int num;
1186    bool init=false;
1187    bool ret=false;
1188    /* check args */
1189    if ((*fileid   && !is_a_number_list(fileid))  ||
1190        (*dirid    && !is_a_number_list(dirid))   ||
1191        (*hardlink && !is_a_number_list(hardlink))||
1192        (!*hardlink && !*fileid && !*dirid && !*hardlink))
1193    {
1194       return false;
1195    }
1196    if (!check_temp(output_table)) {
1197       return false;
1198    }
1199
1200    db->bdb_lock();
1201
1202    /* Cleanup old tables first */
1203    Mmsg(query, "DROP TABLE btemp%s", output_table);
1204    db->bdb_sql_query(query.c_str());
1205
1206    Mmsg(query, "DROP TABLE %s", output_table);
1207    db->bdb_sql_query(query.c_str());
1208
1209    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
1210
1211    if (*fileid) {               /* Select files with their direct id */
1212       init=true;
1213       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1214                       "PathId, FileId "
1215                  "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
1216            fileid);
1217       pm_strcat(query, tmp.c_str());
1218    }
1219
1220    /* Add a directory content */
1221    while (get_next_id_from_list(&dirid, &id) == 1) {
1222       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
1223
1224       if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
1225          Dmsg0(dbglevel, "Can't search for path\n");
1226          /* print error */
1227          goto bail_out;
1228       }
1229       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
1230          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
1231                id, tmp.c_str(), tmp2.c_str());
1232          break;
1233       }
1234       /* escape % and _ for LIKE search */
1235       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
1236       char *p = tmp.c_str();
1237       for (char *s = tmp2.c_str(); *s ; s++) {
1238          if (*s == '%' || *s == '_' || *s == '\\') {
1239             *p = '\\';
1240             p++;
1241          }
1242          *p = *s;
1243          p++;
1244       }
1245       *p = '\0';
1246       tmp.strcat("%");
1247
1248       size_t len = strlen(tmp.c_str());
1249       tmp2.check_size((len+1) * 2);
1250       db->bdb_escape_string(jcr, tmp2.c_str(), tmp.c_str(), len);
1251
1252       if (init) {
1253          query.strcat(" UNION ");
1254       }
1255
1256       Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
1257                         "File.PathId, FileId "
1258                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
1259                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ",
1260            tmp2.c_str(), jobids);
1261       query.strcat(tmp.c_str());
1262       init = true;
1263
1264       query.strcat(" UNION ");
1265
1266       /* A directory can have files from a BaseJob */
1267       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
1268                         "File.FilenameId, File.PathId, BaseFiles.FileId "
1269                    "FROM BaseFiles "
1270                         "JOIN File USING (FileId) "
1271                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
1272                         "JOIN Path USING (PathId) "
1273                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ",
1274            tmp2.c_str(), jobids);
1275       query.strcat(tmp.c_str());
1276    }
1277
1278    /* expect jobid,fileindex */
1279    prev_jobid=0;
1280    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
1281       if (get_next_id_from_list(&hardlink, &id) != 1) {
1282          Dmsg0(dbglevel, "hardlink should be two by two\n");
1283          goto bail_out;
1284       }
1285       if (jobid != prev_jobid) { /* new job */
1286          if (prev_jobid == 0) {  /* first jobid */
1287             if (init) {
1288                query.strcat(" UNION ");
1289             }
1290          } else {               /* end last job, start new one */
1291             tmp.strcat(") UNION ");
1292             query.strcat(tmp.c_str());
1293          }
1294          Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1295                             "PathId, FileId "
1296                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld "
1297                         "AND FileIndex IN (%lld", jobid, id);
1298          prev_jobid = jobid;
1299
1300       } else {                  /* same job, add new findex */
1301          Mmsg(tmp2, ", %lld", id);
1302          tmp.strcat(tmp2.c_str());
1303       }
1304    }
1305
1306    if (prev_jobid != 0) {       /* end last job */
1307       tmp.strcat(") ");
1308       query.strcat(tmp.c_str());
1309       init = true;
1310    }
1311
1312    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1313
1314    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1315       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1316       goto bail_out;
1317    }
1318
1319    Mmsg(query, sql_bvfs_select[db->bdb_get_type_index()],
1320         output_table, output_table, output_table);
1321
1322    /* TODO: handle jobid filter */
1323    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1324    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1325       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1326       goto bail_out;
1327    }
1328
1329    /* MySQL needs the index */
1330    if (db->bdb_get_type_index() == SQL_TYPE_MYSQL) {
1331       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)",
1332            output_table, output_table);
1333       Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1334       if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1335          Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1336          goto bail_out; 
1337       } 
1338    }
1339
1340    /* Check if some FileId have DeltaSeq > 0
1341     * Foreach of them we need to get the accurate_job list, and compute
1342     * what are dependencies
1343     */
1344    Mmsg(query, 
1345         "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
1346           "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
1347          "WHERE DeltaSeq > 0", output_table);
1348
1349    if (!db->QueryDB(jcr, query.c_str())) {
1350       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
1351    }
1352
1353    /* TODO: Use an other DB connection can avoid to copy the result of the
1354     * previous query into a temporary buffer
1355     */
1356    num = db->sql_num_rows();
1357    Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
1358
1359    if (num > 0) {
1360       int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
1361       SQL_ROW row;
1362       int i=0;
1363
1364       while((row = db->sql_fetch_row())) {
1365          result[i++] = str_to_int64(row[0]); /* FileId */
1366          result[i++] = str_to_int64(row[1]); /* JobId */
1367          result[i++] = str_to_int64(row[2]); /* FilenameId */
1368          result[i++] = str_to_int64(row[3]); /* PathId */
1369       }
1370
1371       i=0;
1372       while (num > 0) {
1373          insert_missing_delta(output_table, result + i);
1374          i += 4;
1375          num--;
1376       }
1377       free(result);
1378    }
1379
1380    ret = true;
1381
1382 bail_out:
1383    Mmsg(query, "DROP TABLE btemp%s", output_table);
1384    db->bdb_sql_query(query.c_str(), NULL, NULL);
1385    db->bdb_unlock();
1386    return ret;
1387 }
1388  
1389 void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
1390 {
1391    char ed1[50], ed2[50];
1392    db_list_ctx lst;
1393    POOL_MEM query;
1394    JOB_DBR jr, jr2;
1395    memset(&jr, 0, sizeof(jr));
1396    memset(&jr2, 0, sizeof(jr2));
1397
1398    /* Need to limit the query to StartTime, Client/Fileset */
1399    jr2.JobId = res[1];
1400    db->bdb_get_job_record(jcr, &jr2);
1401
1402    jr.JobId = res[1];
1403    jr.ClientId = jr2.ClientId;
1404    jr.FileSetId = jr2.FileSetId;
1405    jr.JobLevel = L_INCREMENTAL;
1406    jr.StartTime = jr2.StartTime;
1407
1408    /* Get accurate jobid list */
1409    db->bdb_get_accurate_jobids(jcr, &jr, &lst);
1410
1411    Dmsg2(dbglevel_sql, "JobId list for %lld is %s\n", res[0], lst.list);
1412
1413    /* The list contains already the last DeltaSeq element, so
1414     * we don't need to select it in the next query
1415     */
1416    for (int l = strlen(lst.list); l > 0; l--) {
1417       if (lst.list[l] == ',') {
1418          lst.list[l] = '\0';
1419          break;
1420       }
1421    }
1422
1423    Dmsg1(dbglevel_sql, "JobId list after strip is %s\n", lst.list);
1424
1425    edit_int64(res[2], ed1);     /* fnid */
1426    edit_int64(res[3], ed2);     /* pathid */
1427
1428    int id=db->bdb_get_type_index();
1429    Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
1430         lst.list, ed1, ed2, 
1431         lst.list, ed1, ed2, 
1432         lst.list, lst.list);
1433
1434    Mmsg(db->cmd, "INSERT INTO %s "
1435                    "SELECT JobId, FileIndex, FileId FROM (%s) AS F1",
1436         output_table, query.c_str());
1437
1438    if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
1439       Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1440    }
1441 }
1442
1443 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */