]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Big backport from Enterprise
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19
20 #include "bacula.h"
21 #include "cats.h" 
22 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
23 #include "lib/htable.h"
24 #include "bvfs.h"
25
26 #define can_access(x) (true)
27
28 /* from libbacfind */
29 extern int decode_stat(char *buf, struct stat *statp, int stat_size, int32_t *LinkFI);
30
31 #define dbglevel      DT_BVFS|10
32 #define dbglevel_sql  DT_SQL|15
33
34 static int result_handler(void *ctx, int fields, char **row)
35 {
36    if (fields == 4) {
37       Pmsg4(0, "%s\t%s\t%s\t%s\n",
38             row[0], row[1], row[2], row[3]);
39    } else if (fields == 5) {
40       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n",
41             row[0], row[1], row[2], row[3], row[4]);
42    } else if (fields == 6) {
43       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n",
44             row[0], row[1], row[2], row[3], row[4], row[5]);
45    } else if (fields == 7) {
46       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
47             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
48    }
49    return 0;
50 }
51
52 Bvfs::Bvfs(JCR *j, BDB *mdb)
53 {
54    jcr = j;
55    jcr->inc_use_count();
56    db = mdb;                 /* need to inc ref count */
57    jobids = get_pool_memory(PM_NAME);
58    prev_dir = get_pool_memory(PM_NAME);
59    pattern = get_pool_memory(PM_NAME);
60    filename = get_pool_memory(PM_NAME);
61    tmp = get_pool_memory(PM_NAME);
62    escaped_list = get_pool_memory(PM_NAME);
63    *filename = *jobids = *prev_dir = *pattern = 0;
64    pwd_id = offset = 0;
65    see_copies = see_all_versions = false;
66    compute_delta = true;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71    username = NULL;
72    job_acl = client_acl = pool_acl = fileset_acl = NULL;
73    last_dir_acl = NULL;
74    dir_acl = NULL;
75    use_acl = false;
76 }
77
78 Bvfs::~Bvfs() {
79    free_pool_memory(jobids);
80    free_pool_memory(pattern);
81    free_pool_memory(prev_dir);
82    free_pool_memory(filename);
83    free_pool_memory(tmp);
84    free_pool_memory(escaped_list);
85    if (username) {
86       free(username);
87    }
88    free_attr(attr);
89    jcr->dec_use_count();
90    if (dir_acl) {
91       delete dir_acl;
92    }
93 }
94
95 char *Bvfs::escape_list(alist *lst)
96 {
97    char *elt;
98    int len;
99
100    /* List is empty, reject everything */
101    if (!lst || lst->size() == 0) {
102       Mmsg(escaped_list, "''");
103       return escaped_list;
104    }
105
106    *tmp = 0;
107    *escaped_list = 0;
108
109    foreach_alist(elt, lst) {
110       if (elt && *elt) {
111          len = strlen(elt);
112          /* Escape + ' ' */
113          tmp = check_pool_memory_size(tmp, 2 * len + 2 + 2);
114
115          tmp[0] = '\'';
116          db->bdb_escape_string(jcr, tmp + 1 , elt, len);
117          pm_strcat(tmp, "'");
118
119          if (*escaped_list) {
120             pm_strcat(escaped_list, ",");
121          }
122
123          pm_strcat(escaped_list, tmp);
124       }
125    }
126    return escaped_list;
127 }
128
129 /* Returns the number of jobids in the result */
130 int Bvfs::filter_jobid()
131 {
132    POOL_MEM query;
133    POOL_MEM sub_where;
134    POOL_MEM sub_join;
135
136    /* No ACL, no username, no check */
137    if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
138       Dmsg0(dbglevel_sql, "No ACL\n");
139       /* Just count the number of items in the list */
140       int nb = (*jobids != 0) ? 1 : 0;
141       for (char *p=jobids; *p ; p++) {
142          if (*p == ',') {
143             nb++;
144          }
145       }
146       return nb;
147    }
148
149    if (job_acl) {
150       Mmsg(sub_where, " AND Job.Name IN (%s) ", escape_list(job_acl));
151    }
152
153    if (fileset_acl) {
154       Mmsg(query, " AND FileSet.FileSet IN (%s) ", escape_list(fileset_acl));
155       pm_strcat(sub_where, query.c_str());
156       pm_strcat(sub_join, " JOIN FileSet USING (FileSetId) ");
157    }
158
159    if (client_acl) {
160       Mmsg(query, " AND Client.Name IN (%s) ", escape_list(client_acl));
161       pm_strcat(sub_where, query.c_str());
162    }
163
164    if (pool_acl) {
165       Mmsg(query, " AND Pool.Name IN (%s) ", escape_list(pool_acl));
166       pm_strcat(sub_where, query.c_str());
167       pm_strcat(sub_join, " JOIN Pool USING (PoolId) ");
168    }
169
170    if (username) {
171       /* Query used by Bweb to filter clients, activated when using
172        * set_username()
173        */
174       Mmsg(query,
175       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
176         "JOIN (SELECT ClientId FROM client_group_member "
177         "JOIN client_group USING (client_group_id) "
178         "JOIN bweb_client_group_acl USING (client_group_id) "
179         "JOIN bweb_user USING (userid) "
180        "WHERE bweb_user.username = '%s' "
181       ") AS filter USING (ClientId) "
182         " WHERE JobId IN (%s) %s",
183            sub_join.c_str(), username, jobids, sub_where.c_str());
184
185    } else {
186       Mmsg(query,
187       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
188       " WHERE JobId IN (%s) %s",
189            sub_join.c_str(), jobids, sub_where.c_str());
190    }
191
192    db_list_ctx ctx;
193    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
194    db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
195    pm_strcpy(jobids, ctx.list);
196    return ctx.count;
197 }
198
199 /* Return the number of jobids after the filter */
200 int Bvfs::set_jobid(JobId_t id)
201 {
202    Mmsg(jobids, "%lld", (uint64_t)id);
203    return filter_jobid();
204 }
205
206 /* Return the number of jobids after the filter */
207 int Bvfs::set_jobids(char *ids)
208 {
209    pm_strcpy(jobids, ids);
210    return filter_jobid();
211 }
212
213 /*
214  * TODO: Find a way to let the user choose how he wants to display
215  * files and directories
216  */
217
218
219 /*
220  * Working Object to store PathId already seen (avoid
221  * database queries), equivalent to %cache_ppathid in perl
222  */
223
224 #define NITEMS 50000
225 class pathid_cache {
226 private:
227    hlink *nodes;
228    int nb_node;
229    int max_node;
230
231    alist *table_node;
232
233    htable *cache_ppathid;
234
235 public:
236    pathid_cache() {
237       hlink link;
238       cache_ppathid = (htable *)malloc(sizeof(htable));
239       cache_ppathid->init(&link, &link, NITEMS);
240       max_node = NITEMS;
241       nodes = (hlink *) malloc(max_node * sizeof (hlink));
242       nb_node = 0;
243       table_node = New(alist(5, owned_by_alist));
244       table_node->append(nodes);
245    };
246
247    hlink *get_hlink() {
248       if (++nb_node >= max_node) {
249          nb_node = 0;
250          nodes = (hlink *)malloc(max_node * sizeof(hlink));
251          table_node->append(nodes);
252       }
253       return nodes + nb_node;
254    };
255
256    bool lookup(char *pathid) {
257       bool ret = cache_ppathid->lookup(pathid) != NULL;
258       return ret;
259    };
260
261    void insert(char *pathid) {
262       hlink *h = get_hlink();
263       cache_ppathid->insert(pathid, h);
264    };
265
266    ~pathid_cache() {
267       cache_ppathid->destroy();
268       free(cache_ppathid);
269       delete table_node;
270    };
271 private:
272    pathid_cache(const pathid_cache &); /* prohibit pass by value */
273    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
274 } ;
275
276 /* Return the parent_dir with the trailing /  (update the given string)
277  * TODO: see in the rest of bacula if we don't have already this function
278  * dir=/tmp/toto/
279  * dir=/tmp/
280  * dir=/
281  * dir=
282  */
283 char *bvfs_parent_dir(char *path)
284 {
285    char *p = path;
286    int len = strlen(path) - 1;
287
288    /* windows directory / */
289    if (len == 2 && B_ISALPHA(path[0])
290                 && path[1] == ':'
291                 && path[2] == '/')
292    {
293       len = 0;
294       path[0] = '\0';
295    }
296
297    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
298       path[len] = '\0';
299    }
300
301    if (len > 0) {
302       p += len;
303       while (p > path && !IsPathSeparator(*p)) {
304          p--;
305       }
306       p[1] = '\0';
307    }
308    return path;
309 }
310
311 /* Return the basename of the with the trailing /
312  * TODO: see in the rest of bacula if we don't have
313  * this function already
314  */
315 char *bvfs_basename_dir(char *path)
316 {
317    char *p = path;
318    int len = strlen(path) - 1;
319
320    if (path[len] == '/') {      /* if directory, skip last / */
321       len -= 1;
322    }
323
324    if (len > 0) {
325       p += len;
326       while (p > path && !IsPathSeparator(*p)) {
327          p--;
328       }
329       if (*p == '/') {
330          p++;                  /* skip first / */
331       }
332    }
333    return p;
334 }
335
336 static void build_path_hierarchy(JCR *jcr, BDB *mdb,
337                                  pathid_cache &ppathid_cache,
338                                  char *org_pathid, char *path)
339 {
340    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
341    char pathid[50];
342    ATTR_DBR parent;
343    char *bkp = mdb->path;
344    strncpy(pathid, org_pathid, sizeof(pathid));
345
346    /* Does the ppathid exist for this ? we use a memory cache...  In order to
347     * avoid the full loop, we consider that if a dir is allready in the
348     * PathHierarchy table, then there is no need to calculate all the
349     * hierarchy
350     */
351    while (path && *path)
352    {
353       if (!ppathid_cache.lookup(pathid))
354       {
355          Mmsg(mdb->cmd,
356               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
357               pathid);
358
359          if (!mdb->QueryDB(jcr, mdb->cmd)) {
360             goto bail_out;      /* Query failed, just leave */
361          }
362
363          /* Do we have a result ? */
364          if (mdb->sql_num_rows() > 0) {
365             ppathid_cache.insert(pathid);
366             /* This dir was in the db ...
367              * It means we can leave, the tree has allready been built for
368              * this dir
369              */
370             goto bail_out;
371          } else {
372             /* search or create parent PathId in Path table */
373             mdb->path = bvfs_parent_dir(path);
374             mdb->pnl = strlen(mdb->path);
375             if (!mdb->bdb_create_path_record(jcr, &parent)) {
376                goto bail_out;
377             }
378             ppathid_cache.insert(pathid);
379
380             Mmsg(mdb->cmd,
381                  "INSERT INTO PathHierarchy (PathId, PPathId) "
382                  "VALUES (%s,%lld)",
383                  pathid, (uint64_t) parent.PathId);
384
385             if (!mdb->InsertDB(jcr, mdb->cmd)) {
386                goto bail_out;   /* Can't insert the record, just leave */
387             }
388
389             edit_uint64(parent.PathId, pathid);
390             path = mdb->path;   /* already done */
391          }
392       } else {
393          /* It's already in the cache.  We can leave, no time to waste here,
394           * all the parent dirs have allready been done
395           */
396          goto bail_out;
397       }
398    }
399
400 bail_out:
401    mdb->path = bkp;
402    mdb->fnl = 0;
403 }
404
405 /*
406  * Internal function to update path_hierarchy cache with a shared pathid cache
407  * return Error 0
408  *        OK    1
409  */
410 static int update_path_hierarchy_cache(JCR *jcr,
411                                         BDB *mdb,
412                                         pathid_cache &ppathid_cache,
413                                         JobId_t JobId)
414 {
415    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
416    uint32_t ret=0;
417    uint32_t num;
418    char jobid[50];
419    edit_uint64(JobId, jobid);
420
421    mdb->bdb_lock();
422
423    /* We don't really want to harm users with spurious messages,
424     * everything is handled by transaction
425     */
426    mdb->set_use_fatal_jmsg(false);
427
428    mdb->bdb_start_transaction(jcr);
429
430    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
431
432    if (!mdb->QueryDB(jcr, mdb->cmd) || mdb->sql_num_rows() > 0) {
433       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
434       ret = 1;
435       goto bail_out;
436    }
437
438    /* Inserting path records for JobId */
439    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
440                    "SELECT DISTINCT PathId, JobId "
441                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
442                            "UNION "
443                            "SELECT PathId, BaseFiles.JobId "
444                              "FROM BaseFiles JOIN File AS F USING (FileId) "
445                             "WHERE BaseFiles.JobId = %s) AS B",
446         jobid, jobid);
447
448    if (!mdb->QueryDB(jcr, mdb->cmd)) {
449       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
450       goto bail_out;
451    }
452
453    /* Now we have to do the directory recursion stuff to determine missing
454     * visibility We try to avoid recursion, to be as fast as possible We also
455     * only work on not allready hierarchised directories...
456     */
457    Mmsg(mdb->cmd,
458      "SELECT PathVisibility.PathId, Path "
459        "FROM PathVisibility "
460             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
461             "LEFT JOIN PathHierarchy "
462          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
463       "WHERE PathVisibility.JobId = %s "
464         "AND PathHierarchy.PathId IS NULL "
465       "ORDER BY Path", jobid);
466    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
467
468    if (!mdb->QueryDB(jcr, mdb->cmd)) {
469       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
470       goto bail_out;
471    }
472
473    /* TODO: I need to reuse the DB connection without emptying the result
474     * So, now i'm copying the result in memory to be able to query the
475     * catalog descriptor again.
476     */
477    num = mdb->sql_num_rows();
478    if (num > 0) {
479       char **result = (char **)malloc (num * 2 * sizeof(char *));
480
481       SQL_ROW row;
482       int i=0;
483       while((row = mdb->sql_fetch_row())) {
484          result[i++] = bstrdup(row[0]);
485          result[i++] = bstrdup(row[1]);
486       }
487
488       i=0;
489       while (num > 0) {
490          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
491          free(result[i++]);
492          free(result[i++]);
493          num--;
494       }
495       free(result);
496    }
497
498    if (mdb->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
499       Mmsg(mdb->cmd,
500  "INSERT INTO PathVisibility (PathId, JobId) "
501    "SELECT DISTINCT h.PPathId AS PathId, %s "
502      "FROM PathHierarchy AS h "
503     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
504       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
505            jobid, jobid, jobid );
506
507    } else if (mdb->bdb_get_type_index() == SQL_TYPE_MYSQL) {
508       Mmsg(mdb->cmd,
509   "INSERT INTO PathVisibility (PathId, JobId)  "
510    "SELECT a.PathId,%s "
511    "FROM ( "
512      "SELECT DISTINCT h.PPathId AS PathId "
513        "FROM PathHierarchy AS h "
514        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
515       "WHERE p.JobId=%s) AS a "
516       "LEFT JOIN PathVisibility AS b ON (b.JobId=%s and a.PathId = b.PathId) "
517       "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
518
519    } else {
520       Mmsg(mdb->cmd,
521   "INSERT INTO PathVisibility (PathId, JobId)  "
522    "SELECT a.PathId,%s "
523    "FROM ( "
524      "SELECT DISTINCT h.PPathId AS PathId "
525        "FROM PathHierarchy AS h "
526        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
527       "WHERE p.JobId=%s) AS a LEFT JOIN "
528        "(SELECT PathId "
529           "FROM PathVisibility "
530          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
531    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
532    }
533
534    do {
535       ret = mdb->QueryDB(jcr, mdb->cmd);
536    } while (ret && mdb->sql_affected_rows() > 0);
537
538    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
539    ret = mdb->UpdateDB(jcr, mdb->cmd, false);
540
541 bail_out:
542    mdb->bdb_end_transaction(jcr);
543
544    if (!ret) {
545       Mmsg(mdb->cmd, "SELECT HasCache FROM Job WHERE JobId=%s", jobid);
546       mdb->bdb_sql_query(mdb->cmd, db_int_handler, &ret);
547    }
548
549    /* Enable back the FATAL message if something is wrong */
550    mdb->set_use_fatal_jmsg(true);
551
552    mdb->bdb_unlock();
553    return ret;
554 }
555
556 /* Compute the cache for the bfileview compoment */
557 void Bvfs::fv_update_cache()
558 {
559    int64_t pathid;
560    int64_t size=0, count=0;
561
562    Dmsg0(dbglevel, "fv_update_cache()\n");
563
564    if (!*jobids) {
565       return;                   /* Nothing to build */
566    }
567
568    db->bdb_lock();
569    /* We don't really want to harm users with spurious messages,
570     * everything is handled by transaction
571     */
572    db->set_use_fatal_jmsg(false);
573
574    db->bdb_start_transaction(jcr);
575
576    pathid = get_root();
577
578    fv_compute_size_and_count(pathid, &size, &count);
579
580    db->bdb_end_transaction(jcr);
581
582    /* Enable back the FATAL message if something is wrong */
583    db->set_use_fatal_jmsg(true);
584
585    db->bdb_unlock();
586 }
587
588 /* 
589  * Find an store the filename descriptor for empty directories Filename.Name=''
590  */
591 DBId_t Bvfs::get_dir_filenameid()
592 {
593    uint32_t id;
594    if (dir_filenameid) {
595       return dir_filenameid;
596    }
597    Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
598    db_sql_query(db, db->cmd, db_int_handler, &id);
599    dir_filenameid = id;
600    return dir_filenameid;
601 }
602
603 /* Not yet working */
604 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
605 {
606    Mmsg(db->cmd, 
607         "SELECT FilenameId AS filenameid, Name AS name, size "
608           "FROM ( "
609          "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
610            "FROM File "
611           "WHERE PathId  = %lld "
612             "AND JobId = %s "
613         ") AS S INNER JOIN Filename USING (FilenameId) "
614      "WHERE S.size > %lld "
615      "ORDER BY S.size DESC "
616      "LIMIT %d ", pathid, jobids, min_size, limit);
617 }
618
619
620 /* Get the current path size and files count */
621 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
622 {
623    SQL_ROW row;
624
625    *size = *count = 0;
626
627    Mmsg(db->cmd,
628  "SELECT Size AS size, Files AS files "
629   " FROM PathVisibility "
630  " WHERE PathId = %lld "
631    " AND JobId = %s ", pathid, jobids);
632
633    if (!db->QueryDB(jcr, db->cmd)) {
634       return;
635    }
636
637    if ((row = db->sql_fetch_row())) {
638       *size = str_to_int64(row[0]);
639       *count =  str_to_int64(row[1]);
640    }
641 }
642
643 /* Compute for the current path the size and files count */
644 void Bvfs::fv_get_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
645 {
646    SQL_ROW row;
647
648    *size = *count = 0;
649
650    Mmsg(db->cmd,
651  "SELECT sum(base64_decode_lstat(8,LStat)) AS size, count(1) AS files "
652   " FROM File "
653  " WHERE PathId = %lld "
654    " AND JobId = %s ", pathid, jobids);
655
656    if (!db->QueryDB(jcr, db->cmd)) {
657       return;
658    }
659
660    if ((row = db->sql_fetch_row())) {
661       *size = str_to_int64(row[0]);
662       *count =  str_to_int64(row[1]);
663    }
664 }
665
666 void Bvfs::fv_compute_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
667 {
668    Dmsg1(dbglevel, "fv_compute_size_and_count(%lld)\n", pathid);
669
670    fv_get_current_size_and_count(pathid, size, count);
671    if (*size > 0) {
672       return;
673    }
674
675    /* Update stats for the current directory */
676    fv_get_size_and_count(pathid, size, count);
677
678    /* Update stats for all sub directories */
679    Mmsg(db->cmd,
680         " SELECT PathId "
681           " FROM PathVisibility "
682                " INNER JOIN PathHierarchy USING (PathId) "
683          " WHERE PPathId  = %lld "
684            " AND JobId = %s ", pathid, jobids);
685
686    db->QueryDB(jcr, db->cmd);
687    int num = db->sql_num_rows();
688
689    if (num > 0) {
690       int64_t *result = (int64_t *)malloc (num * sizeof(int64_t));
691       SQL_ROW row;
692       int i=0;
693
694       while((row = db->sql_fetch_row())) {
695          result[i++] = str_to_int64(row[0]); /* PathId */
696       }
697
698       i=0;
699       while (num > 0) {
700          int64_t c=0, s=0;
701          fv_compute_size_and_count(result[i], &s, &c);
702          *size += s;
703          *count += c;
704
705          i++;
706          num--;
707       }
708       free(result);
709    }
710
711    fv_update_size_and_count(pathid, *size, *count);
712 }
713
714 void Bvfs::fv_update_size_and_count(int64_t pathid, int64_t size, int64_t count)
715 {
716    Mmsg(db->cmd,
717         "UPDATE PathVisibility SET Files = %lld, Size = %lld "
718         " WHERE JobId = %s "
719         " AND PathId = %lld ", count, size, jobids, pathid);
720
721    db->UpdateDB(jcr, db->cmd, false);
722 }
723
724 void bvfs_update_cache(JCR *jcr, BDB *mdb)
725 {
726    uint32_t nb=0;
727    db_list_ctx jobids_list;
728
729    mdb->bdb_lock();
730
731 #ifdef xxx
732    /* TODO: Remove this code when updating make_bacula_table script */
733    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
734    if (!mdb->QueryDB(jcr, mdb->cmd)) {
735       Dmsg0(dbglevel, "Creating cache table\n");
736       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
737       mdb->QueryDB(jcr, mdb->cmd);
738
739       Mmsg(mdb->cmd,
740            "CREATE TABLE PathHierarchy ( "
741            "PathId integer NOT NULL, "
742            "PPathId integer NOT NULL, "
743            "CONSTRAINT pathhierarchy_pkey "
744            "PRIMARY KEY (PathId))");
745       mdb->QueryDB(jcr, mdb->cmd);
746
747       Mmsg(mdb->cmd,
748            "CREATE INDEX pathhierarchy_ppathid "
749            "ON PathHierarchy (PPathId)");
750       mdb->QueryDB(jcr, mdb->cmd);
751
752       Mmsg(mdb->cmd,
753            "CREATE TABLE PathVisibility ("
754            "PathId integer NOT NULL, "
755            "JobId integer NOT NULL, "
756            "Size int8 DEFAULT 0, "
757            "Files int4 DEFAULT 0, "
758            "CONSTRAINT pathvisibility_pkey "
759            "PRIMARY KEY (JobId, PathId))");
760       mdb->QueryDB(jcr, mdb->cmd);
761
762       Mmsg(mdb->cmd,
763            "CREATE INDEX pathvisibility_jobid "
764            "ON PathVisibility (JobId)");
765       mdb->QueryDB(jcr, mdb->cmd);
766
767    }
768 #endif
769
770    Mmsg(mdb->cmd,
771  "SELECT JobId from Job "
772   "WHERE HasCache = 0 "
773     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
774   "ORDER BY JobId");
775
776    mdb->bdb_sql_query(mdb->cmd, db_list_handler, &jobids_list);
777
778    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
779
780    mdb->bdb_start_transaction(jcr);
781    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
782    Mmsg(mdb->cmd,
783         "DELETE FROM PathVisibility "
784          "WHERE NOT EXISTS "
785         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
786    nb = mdb->DeleteDB(jcr, mdb->cmd);
787    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
788
789    mdb->bdb_end_transaction(jcr);
790    mdb->bdb_unlock();
791 }
792
793 /*
794  * Update the bvfs cache for given jobids (1,2,3,4)
795  */
796 int
797 bvfs_update_path_hierarchy_cache(JCR *jcr, BDB *mdb, char *jobids)
798 {
799    pathid_cache ppathid_cache;
800    JobId_t JobId;
801    char *p;
802    int ret=1;
803
804    for (p=jobids; ; ) {
805       int stat = get_next_jobid_from_list(&p, &JobId);
806       if (stat < 0) {
807          ret = 0;
808          break;
809       }
810       if (stat == 0) {
811          break;
812       }
813       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
814       if (!update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId)) {
815          ret = 0;
816       }
817    }
818    return ret;
819 }
820
821 /*
822  * Update the bvfs fileview for given jobids
823  */
824 void
825 bvfs_update_fv_cache(JCR *jcr, BDB *mdb, char *jobids)
826 {
827    char *p;
828    JobId_t JobId;
829    Bvfs bvfs(jcr, mdb);
830
831    for (p=jobids; ; ) {
832       int stat = get_next_jobid_from_list(&p, &JobId);
833       if (stat < 0) {
834          return;
835       }
836       if (stat == 0) {
837          break;
838       }
839
840       Dmsg1(dbglevel, "Trying to create cache for %lld\n", (int64_t)JobId);
841
842       bvfs.set_jobid(JobId);
843       bvfs.fv_update_cache();
844    }
845 }
846
847 /*
848  * Update the bvfs cache for current jobids
849  */
850 void Bvfs::update_cache()
851 {
852    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
853 }
854
855
856 bool Bvfs::ch_dir(DBId_t pathid)
857 {
858    reset_offset();
859
860    pwd_id = pathid;
861    return pwd_id != 0;
862 }
863
864
865 /* Change the current directory, returns true if the path exists */
866 bool Bvfs::ch_dir(const char *path)
867 {
868    db->bdb_lock();
869    pm_strcpy(db->path, path);
870    db->pnl = strlen(db->path);
871    ch_dir(db->bdb_get_path_record(jcr));
872    db->bdb_unlock();
873    return pwd_id != 0;
874 }
875
876 /*
877  * Get all file versions for a specified client
878  * TODO: Handle basejobs using different client
879  */
880 void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, const char *client)
881 {
882    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
883          (uint64_t)fnid, client);
884    char ed1[50], ed2[50];
885    POOL_MEM q;
886    if (see_copies) {
887       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
888    } else {
889       Mmsg(q, " AND Job.Type = 'B' ");
890    }
891
892    POOL_MEM query;
893
894    Mmsg(query,//    1           2              3       
895 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
896 //         4          5           6
897         "File.JobId, File.LStat, File.FileId, "
898 //         7                    8
899        "Media.VolumeName, Media.InChanger "
900 "FROM File, Job, Client, JobMedia, Media "
901 "WHERE File.FilenameId = %s "
902   "AND File.PathId=%s "
903   "AND File.JobId = Job.JobId "
904   "AND Job.JobId = JobMedia.JobId "
905   "AND File.FileIndex >= JobMedia.FirstIndex "
906   "AND File.FileIndex <= JobMedia.LastIndex "
907   "AND JobMedia.MediaId = Media.MediaId "
908   "AND Job.ClientId = Client.ClientId "
909   "AND Client.Name = '%s' "
910   "%s ORDER BY FileId LIMIT %d OFFSET %d"
911         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
912         limit, offset);
913    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
914    db->bdb_sql_query(query.c_str(), list_entries, user_data);
915 }
916
917 /*
918  * Get all file versions for a specified client
919  * TODO: Handle basejobs using different client
920  */
921 bool Bvfs::get_delta(FileId_t fileid)
922 {
923    Dmsg1(dbglevel, "get_delta(%lld)\n", (uint64_t)fileid);
924    char ed1[50];
925    int32_t num;
926    SQL_ROW row;
927    POOL_MEM q;
928    POOL_MEM query;
929    char *fn = NULL;
930    bool ret = false;
931    db->bdb_lock();
932
933    /* Check if some FileId have DeltaSeq > 0
934     * Foreach of them we need to get the accurate_job list, and compute
935     * what are dependencies
936     */
937    Mmsg(query,
938         "SELECT F.JobId, FN.Name, F.PathId, F.DeltaSeq "
939         "FROM File AS F, Filename AS FN WHERE FileId = %lld "
940         "AND FN.FilenameId = F.FilenameId AND DeltaSeq > 0", fileid);
941
942    if (!db->QueryDB(jcr, query.c_str())) {
943       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
944       goto bail_out;
945    }
946
947    /* TODO: Use an other DB connection can avoid to copy the result of the
948     * previous query into a temporary buffer
949     */
950    num = db->sql_num_rows();
951    Dmsg2(dbglevel, "Found %d Delta parts q=%s\n",
952          num, query.c_str());
953
954    if (num > 0 && (row = db->sql_fetch_row())) {
955       JOB_DBR jr, jr2;
956       db_list_ctx lst;
957       memset(&jr, 0, sizeof(jr));
958       memset(&jr2, 0, sizeof(jr2));
959
960       fn = bstrdup(row[1]);               /* Filename */
961       int64_t jid = str_to_int64(row[0]);     /* JobId */
962       int64_t pid = str_to_int64(row[2]);     /* PathId */
963
964       /* Need to limit the query to StartTime, Client/Fileset */
965       jr2.JobId = jid;
966       if (!db->bdb_get_job_record(jcr, &jr2)) {
967          Dmsg1(0, "Unable to get job record for jobid %d\n", jid);
968          goto bail_out;
969       }
970
971       jr.JobId = jid;
972       jr.ClientId = jr2.ClientId;
973       jr.FileSetId = jr2.FileSetId;
974       jr.JobLevel = L_INCREMENTAL;
975       jr.StartTime = jr2.StartTime;
976
977       /* Get accurate jobid list */
978       if (!db->bdb_get_accurate_jobids(jcr, &jr, &lst)) {
979          Dmsg1(0, "Unable to get Accurate list for jobid %d\n", jid);
980          goto bail_out;
981       }
982
983       /* Escape filename */
984       db->fnl = strlen(fn);
985       db->esc_name = check_pool_memory_size(db->esc_name, 2*db->fnl+2);
986       db->bdb_escape_string(jcr, db->esc_name, fn, db->fnl);
987
988       edit_int64(pid, ed1);     /* pathid */
989
990       int id=db->bdb_get_type_index();
991       Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
992            lst.list, db->esc_name, ed1,
993            lst.list, db->esc_name, ed1,
994            lst.list, lst.list);
995
996       Mmsg(db->cmd,
997            //       0     1     2   3      4      5      6            7
998            "SELECT 'd', PathId, 0, JobId, LStat, FileId, DeltaSeq, JobTDate"
999            " FROM (%s) AS F1 "
1000            "ORDER BY DeltaSeq ASC",
1001            query.c_str());
1002
1003       Dmsg1(dbglevel_sql, "q=%s\n", db->cmd);
1004
1005       if (!db->bdb_sql_query(db->cmd, list_entries, user_data)) {
1006          Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1007          goto bail_out;
1008       }
1009    }
1010    ret = true;
1011 bail_out:
1012    if (fn) {
1013       free(fn);
1014    }
1015    db->bdb_unlock();
1016    return ret;
1017 }
1018
1019 /*
1020  * Get all volumes for a specific file
1021  */
1022 void Bvfs::get_volumes(FileId_t fileid)
1023 {
1024    Dmsg1(dbglevel, "get_volumes(%lld)\n", (uint64_t)fileid);
1025
1026    char ed1[50];
1027    POOL_MEM query;
1028
1029    Mmsg(query,
1030 //                                   7                8
1031 "SELECT DISTINCT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
1032 "FROM File JOIN JobMedia USING (JobId) JOIN Media USING (MediaId) "
1033 "WHERE File.FileId = %s "
1034   "AND File.FileIndex >= JobMedia.FirstIndex "
1035   "AND File.FileIndex <= JobMedia.LastIndex "
1036   " LIMIT %d OFFSET %d"
1037         ,edit_uint64(fileid, ed1), limit, offset);
1038    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1039    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1040 }
1041
1042 DBId_t Bvfs::get_root()
1043 {
1044    int p;
1045    *db->path = 0;
1046    db->bdb_lock();
1047    p = db->bdb_get_path_record(jcr);
1048    db->bdb_unlock();
1049    return p;
1050 }
1051
1052 static int path_handler(void *ctx, int fields, char **row)
1053 {
1054    Bvfs *fs = (Bvfs *) ctx;
1055    return fs->_handle_path(ctx, fields, row);
1056 }
1057
1058 int Bvfs::_handle_path(void *ctx, int fields, char **row)
1059 {
1060    if (bvfs_is_dir(row)) {
1061       /* can have the same path 2 times */
1062       if (strcmp(row[BVFS_PathId], prev_dir)) {
1063          pm_strcpy(prev_dir, row[BVFS_PathId]);
1064          if (strcmp(NPRTB(row[BVFS_FileIndex]), "0") == 0 &&
1065              strcmp(NPRTB(row[BVFS_FileId]), "0") != 0)
1066          {
1067             /* The directory was probably deleted */
1068             return 0;
1069          }
1070          return list_entries(user_data, fields, row);
1071       }
1072    }
1073    return 0;
1074 }
1075
1076 /*
1077  * Retrieve . and .. information
1078  */
1079 void Bvfs::ls_special_dirs()
1080 {
1081    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
1082    char ed1[50], ed2[50];
1083    if (*jobids == 0) {
1084       return;
1085    }
1086    if (!dir_filenameid) {
1087       get_dir_filenameid();
1088    }
1089
1090    /* Will fetch directories  */
1091    *prev_dir = 0;
1092
1093    POOL_MEM query;
1094    Mmsg(query,
1095 "(SELECT PPathId AS PathId, '..' AS Path "
1096     "FROM  PathHierarchy "
1097    "WHERE  PathId = %s "
1098 "UNION "
1099  "SELECT %s AS PathId, '.' AS Path)",
1100         edit_uint64(pwd_id, ed1), ed1);
1101
1102    POOL_MEM query2;
1103    Mmsg(query2,// 1      2     3        4     5       6
1104 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
1105   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
1106        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1107               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
1108        "WHERE File1.FilenameId = %s "
1109        "AND File1.JobId IN (%s)) AS listfile1 "
1110   "ON (tmp.PathId = listfile1.PathId) "
1111   "ORDER BY tmp.Path, JobId DESC ",
1112         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
1113
1114    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
1115    db->bdb_sql_query(query2.c_str(), path_handler, this);
1116 }
1117
1118 /* Returns true if we have dirs to read */
1119 bool Bvfs::ls_dirs()
1120 {
1121    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
1122    char ed1[50], ed2[50];
1123    if (*jobids == 0) {
1124       return false;
1125    }
1126
1127    POOL_MEM query;
1128    POOL_MEM filter;
1129    if (*pattern) {
1130       Mmsg(filter, " AND Path2.Path %s '%s' ",
1131            match_query[db->bdb_get_type_index()], pattern);
1132
1133    }
1134
1135    if (!dir_filenameid) {
1136       get_dir_filenameid();
1137    }
1138
1139    /* the sql query displays same directory multiple time, take the first one */
1140    *prev_dir = 0;
1141
1142    /* Let's retrieve the list of the visible dirs in this dir ...
1143     * First, I need the empty filenameid to locate efficiently
1144     * the dirs in the file table
1145     * my $dir_filenameid = $self->get_dir_filenameid();
1146     */
1147    /* Then we get all the dir entries from File ... */
1148    Mmsg(query,
1149 //       0     1     2   3      4     5       6
1150 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
1151     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
1152            "lower(Path1.Path) AS lpath, "
1153            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
1154            "listfile1.FileId AS FileId "
1155     "FROM ( "
1156       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
1157       "FROM PathHierarchy AS PathHierarchy1 "
1158       "JOIN Path AS Path2 "
1159         "ON (PathHierarchy1.PathId = Path2.PathId) "
1160       "JOIN PathVisibility AS PathVisibility1 "
1161         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
1162       "WHERE PathHierarchy1.PPathId = %s "
1163       "AND PathVisibility1.JobId IN (%s) "
1164            "%s "
1165      ") AS listpath1 "
1166    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
1167
1168    "LEFT JOIN ( " /* get attributes if any */
1169        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1170               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
1171        "WHERE File1.FilenameId = %s "
1172        "AND File1.JobId IN (%s)) AS listfile1 "
1173        "ON (listpath1.PathId = listfile1.PathId) "
1174     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
1175         edit_uint64(pwd_id, ed1),
1176         jobids,
1177         filter.c_str(),
1178         edit_uint64(dir_filenameid, ed2),
1179         jobids,
1180         limit, offset);
1181
1182    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1183
1184    db->bdb_lock();
1185    db->bdb_sql_query(query.c_str(), path_handler, this);
1186    nb_record = db->sql_num_rows();
1187    db->bdb_unlock();
1188
1189    return nb_record == limit;
1190 }
1191
1192 void build_ls_files_query(BDB *db, POOL_MEM &query,
1193                           const char *JobId, const char *PathId,
1194                           const char *filter, int64_t limit, int64_t offset)
1195 {
1196    if (db->bdb_get_type_index() == SQL_TYPE_POSTGRESQL) {
1197       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1198            JobId, PathId, JobId, PathId,
1199            filter, limit, offset);
1200    } else {
1201       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1202            JobId, PathId, JobId, PathId,
1203            limit, offset, filter, JobId, JobId);
1204    }
1205 }
1206
1207 /* Returns true if we have files to read */
1208 bool Bvfs::ls_files()
1209 {
1210    POOL_MEM query;
1211    POOL_MEM filter;
1212    char pathid[50];
1213
1214    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
1215    if (*jobids == 0) {
1216       return false;
1217    }
1218
1219    if (!pwd_id) {
1220       ch_dir(get_root());
1221    }
1222
1223    edit_uint64(pwd_id, pathid);
1224    if (*pattern) {
1225       Mmsg(filter, " AND Filename.Name %s '%s' ", 
1226            match_query[db_get_type_index(db)], pattern);
1227
1228    } else if (*filename) {
1229       Mmsg(filter, " AND Filename.Name = '%s' ", filename);
1230    }
1231
1232    build_ls_files_query(db, query,
1233                         jobids, pathid, filter.c_str(),
1234                         limit, offset);
1235
1236    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1237
1238    db->bdb_lock();
1239    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1240    nb_record = db->sql_num_rows();
1241    db->bdb_unlock();
1242
1243    return nb_record == limit;
1244 }
1245
1246
1247 /*
1248  * Return next Id from comma separated list
1249  *
1250  * Returns:
1251  *   1 if next Id returned
1252  *   0 if no more Ids are in list
1253  *  -1 there is an error
1254  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
1255  */
1256 static int get_next_id_from_list(char **p, int64_t *Id)
1257 {
1258    const int maxlen = 30;
1259    char id[maxlen+1];
1260    char *q = *p;
1261
1262    id[0] = 0;
1263    for (int i=0; i<maxlen; i++) {
1264       if (*q == 0) {
1265          break;
1266       } else if (*q == ',') {
1267          q++;
1268          break;
1269       }
1270       id[i] = *q++;
1271       id[i+1] = 0;
1272    }
1273    if (id[0] == 0) {
1274       return 0;
1275    } else if (!is_a_number(id)) {
1276       return -1;                      /* error */
1277    }
1278    *p = q;
1279    *Id = str_to_int64(id);
1280    return 1;
1281 }
1282
1283 static int get_path_handler(void *ctx, int fields, char **row)
1284 {
1285    POOL_MEM *buf = (POOL_MEM *) ctx;
1286    pm_strcpy(*buf, row[0]);
1287    return 0;
1288 }
1289
1290 static bool check_temp(char *output_table)
1291 {
1292    if (output_table[0] == 'b' &&
1293        output_table[1] == '2' &&
1294        is_an_integer(output_table + 2))
1295    {
1296       return true;
1297    }
1298    return false;
1299 }
1300
1301 void Bvfs::clear_cache()
1302 {
1303    db->bdb_sql_query("BEGIN",                     NULL, NULL);
1304    db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
1305    db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
1306    db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
1307    db->bdb_sql_query("COMMIT",                    NULL, NULL);
1308 }
1309
1310 bool Bvfs::drop_restore_list(char *output_table)
1311 {
1312    POOL_MEM query;
1313    if (check_temp(output_table)) {
1314       Mmsg(query, "DROP TABLE %s", output_table);
1315       db->bdb_sql_query(query.c_str(), NULL, NULL);
1316       return true;
1317    }
1318    return false;
1319 }
1320
1321 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
1322                                 char *output_table)
1323 {
1324    POOL_MEM query;
1325    POOL_MEM tmp, tmp2;
1326    int64_t id, jobid, prev_jobid;
1327    int num;
1328    bool init=false;
1329    bool ret=false;
1330    /* check args */
1331    if ((*fileid   && !is_a_number_list(fileid))  ||
1332        (*dirid    && !is_a_number_list(dirid))   ||
1333        (*hardlink && !is_a_number_list(hardlink))||
1334        (!*hardlink && !*fileid && !*dirid && !*hardlink))
1335    {
1336       return false;
1337    }
1338    if (!check_temp(output_table)) {
1339       return false;
1340    }
1341
1342    db->bdb_lock();
1343
1344    /* Cleanup old tables first */
1345    Mmsg(query, "DROP TABLE btemp%s", output_table);
1346    db->bdb_sql_query(query.c_str());
1347
1348    Mmsg(query, "DROP TABLE %s", output_table);
1349    db->bdb_sql_query(query.c_str());
1350
1351    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
1352
1353    if (*fileid) {               /* Select files with their direct id */
1354       init=true;
1355       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, Filename.Name, "
1356                       "PathId, FileId "
1357                  "FROM File,Job,Filename WHERE Job.JobId=File.Jobid "
1358                  "AND File.FilenameId=Filename.FilenameId AND  FileId IN (%s)",
1359            fileid);
1360       pm_strcat(query, tmp.c_str());
1361    }
1362
1363    /* Add a directory content */
1364    while (get_next_id_from_list(&dirid, &id) == 1) {
1365       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
1366
1367       if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
1368          Dmsg0(dbglevel, "Can't search for path\n");
1369          /* print error */
1370          goto bail_out;
1371       }
1372       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
1373          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
1374                id, tmp.c_str(), tmp2.c_str());
1375          break;
1376       }
1377       /* escape % and _ for LIKE search */
1378       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
1379       char *p = tmp.c_str();
1380       for (char *s = tmp2.c_str(); *s ; s++) {
1381          if (*s == '%' || *s == '_' || *s == '\\') {
1382             *p = '\\';
1383             p++;
1384          }
1385          *p = *s;
1386          p++;
1387       }
1388       *p = '\0';
1389       tmp.strcat("%");
1390
1391       size_t len = strlen(tmp.c_str());
1392       tmp2.check_size((len+1) * 2);
1393       db->bdb_escape_string(jcr, tmp2.c_str(), tmp.c_str(), len);
1394
1395       if (init) {
1396          query.strcat(" UNION ");
1397       }
1398
1399       Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
1400                         "File.PathId, FileId "
1401                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
1402                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ",
1403            tmp2.c_str(), jobids);
1404       query.strcat(tmp.c_str());
1405       init = true;
1406
1407       query.strcat(" UNION ");
1408
1409       /* A directory can have files from a BaseJob */
1410       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
1411                         "File.FilenameId, File.PathId, BaseFiles.FileId "
1412                    "FROM BaseFiles "
1413                         "JOIN File USING (FileId) "
1414                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
1415                         "JOIN Path USING (PathId) "
1416                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ",
1417            tmp2.c_str(), jobids);
1418       query.strcat(tmp.c_str());
1419    }
1420
1421    /* expect jobid,fileindex */
1422    prev_jobid=0;
1423    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
1424       if (get_next_id_from_list(&hardlink, &id) != 1) {
1425          Dmsg0(dbglevel, "hardlink should be two by two\n");
1426          goto bail_out;
1427       }
1428       if (jobid != prev_jobid) { /* new job */
1429          if (prev_jobid == 0) {  /* first jobid */
1430             if (init) {
1431                query.strcat(" UNION ");
1432             }
1433          } else {               /* end last job, start new one */
1434             tmp.strcat(") UNION ");
1435             query.strcat(tmp.c_str());
1436          }
1437          Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1438                             "PathId, FileId "
1439                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld "
1440                         "AND FileIndex IN (%lld", jobid, id);
1441          prev_jobid = jobid;
1442
1443       } else {                  /* same job, add new findex */
1444          Mmsg(tmp2, ", %lld", id);
1445          tmp.strcat(tmp2.c_str());
1446       }
1447    }
1448
1449    if (prev_jobid != 0) {       /* end last job */
1450       tmp.strcat(") ");
1451       query.strcat(tmp.c_str());
1452       init = true;
1453    }
1454
1455    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1456
1457    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1458       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1459       goto bail_out;
1460    }
1461
1462    Mmsg(query, sql_bvfs_select[db->bdb_get_type_index()],
1463         output_table, output_table, output_table);
1464
1465    /* TODO: handle jobid filter */
1466    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1467    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1468       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1469       goto bail_out;
1470    }
1471
1472    /* MySQL needs the index */
1473    if (db->bdb_get_type_index() == SQL_TYPE_MYSQL) {
1474       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)",
1475            output_table, output_table);
1476       Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1477       if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1478          Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1479          goto bail_out; 
1480       } 
1481    }
1482
1483    /* Check if some FileId have DeltaSeq > 0
1484     * Foreach of them we need to get the accurate_job list, and compute
1485     * what are dependencies
1486     */
1487    Mmsg(query, 
1488         "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
1489           "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
1490          "WHERE DeltaSeq > 0", output_table);
1491
1492    if (!db->QueryDB(jcr, query.c_str())) {
1493       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
1494    }
1495
1496    /* TODO: Use an other DB connection can avoid to copy the result of the
1497     * previous query into a temporary buffer
1498     */
1499    num = db->sql_num_rows();
1500    Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
1501
1502    if (num > 0) {
1503       int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
1504       SQL_ROW row;
1505       int i=0;
1506
1507       while((row = db->sql_fetch_row())) {
1508          result[i++] = str_to_int64(row[0]); /* FileId */
1509          result[i++] = str_to_int64(row[1]); /* JobId */
1510          result[i++] = str_to_int64(row[2]); /* FilenameId */
1511          result[i++] = str_to_int64(row[3]); /* PathId */
1512       } 
1513
1514       i=0;
1515       while (num > 0) {
1516          insert_missing_delta(output_table, result + i);
1517          i += 4;
1518          num--;
1519       }
1520       free(result);
1521    }
1522
1523    ret = true;
1524
1525 bail_out:
1526    Mmsg(query, "DROP TABLE btemp%s", output_table);
1527    db->bdb_sql_query(query.c_str(), NULL, NULL);
1528    db->bdb_unlock();
1529    return ret;
1530 }
1531  
1532 void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
1533 {
1534    char ed1[50];
1535    db_list_ctx lst;
1536    POOL_MEM query;
1537    JOB_DBR jr, jr2;
1538    memset(&jr, 0, sizeof(jr));
1539    memset(&jr2, 0, sizeof(jr2));
1540
1541    /* Need to limit the query to StartTime, Client/Fileset */
1542    jr2.JobId = res[1];
1543    db->bdb_get_job_record(jcr, &jr2);
1544
1545    jr.JobId = res[1];
1546    jr.ClientId = jr2.ClientId;
1547    jr.FileSetId = jr2.FileSetId;
1548    jr.JobLevel = L_INCREMENTAL;
1549    jr.StartTime = jr2.StartTime;
1550
1551    /* Get accurate jobid list */
1552    db->bdb_get_accurate_jobids(jcr, &jr, &lst);
1553
1554    Dmsg2(dbglevel_sql, "JobId list for %lld is %s\n", res[0], lst.list);
1555
1556    /* The list contains already the last DeltaSeq element, so
1557     * we don't need to select it in the next query
1558     */
1559    for (int l = strlen(lst.list); l > 0; l--) {
1560       if (lst.list[l] == ',') {
1561          lst.list[l] = '\0';
1562          break;
1563       }
1564    }
1565
1566    Dmsg1(dbglevel_sql, "JobId list after strip is %s\n", lst.list);
1567
1568    /* Escape filename */
1569    db->fnl = strlen((char *)res[2]);
1570    db->esc_name = check_pool_memory_size(db->esc_name, 2*db->fnl+2);
1571    db->bdb_escape_string(jcr, db->esc_name, (char *)res[2], db->fnl);
1572
1573    edit_int64(res[3], ed1);     /* pathid */
1574
1575    int id=db->bdb_get_type_index();
1576    Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
1577         lst.list, db->esc_name, ed1,
1578         lst.list, db->esc_name, ed1,
1579         lst.list, lst.list);
1580
1581    Mmsg(db->cmd, "INSERT INTO %s "
1582                    "SELECT JobId, FileIndex, FileId FROM (%s) AS F1",
1583         output_table, query.c_str());
1584
1585    if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
1586       Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1587    }
1588 }
1589
1590 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */