]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
bvfs: Do not insert deleted directories in PathVisibility table
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19
20 #include "bacula.h"
21 #include "cats.h" 
22 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
23 #include "lib/htable.h"
24 #include "bvfs.h"
25
26 #define can_access(x) (true)
27
28 /* from libbacfind */
29 extern int decode_stat(char *buf, struct stat *statp, int stat_size, int32_t *LinkFI);
30
31 #define dbglevel      DT_BVFS|10
32 #define dbglevel_sql  DT_SQL|15
33
34 static int result_handler(void *ctx, int fields, char **row)
35 {
36    if (fields == 4) {
37       Pmsg4(0, "%s\t%s\t%s\t%s\n",
38             row[0], row[1], row[2], row[3]);
39    } else if (fields == 5) {
40       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n",
41             row[0], row[1], row[2], row[3], row[4]);
42    } else if (fields == 6) {
43       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n",
44             row[0], row[1], row[2], row[3], row[4], row[5]);
45    } else if (fields == 7) {
46       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
47             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
48    }
49    return 0;
50 }
51
52 Bvfs::Bvfs(JCR *j, BDB *mdb)
53 {
54    jcr = j;
55    jcr->inc_use_count();
56    db = mdb;                 /* need to inc ref count */
57    jobids = get_pool_memory(PM_NAME);
58    prev_dir = get_pool_memory(PM_NAME);
59    pattern = get_pool_memory(PM_NAME);
60    filename = get_pool_memory(PM_NAME);
61    tmp = get_pool_memory(PM_NAME);
62    escaped_list = get_pool_memory(PM_NAME);
63    *filename = *jobids = *prev_dir = *pattern = 0;
64    pwd_id = offset = 0;
65    see_copies = see_all_versions = false;
66    compute_delta = true;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71    username = NULL;
72    job_acl = client_acl = pool_acl = fileset_acl = NULL;
73    last_dir_acl = NULL;
74    dir_acl = NULL;
75    use_acl = false;
76    dir_filenameid = 0;       /* special FilenameId where Name='' */
77 }
78
79 Bvfs::~Bvfs() {
80    free_pool_memory(jobids);
81    free_pool_memory(pattern);
82    free_pool_memory(prev_dir);
83    free_pool_memory(filename);
84    free_pool_memory(tmp);
85    free_pool_memory(escaped_list);
86    if (username) {
87       free(username);
88    }
89    free_attr(attr);
90    jcr->dec_use_count();
91    if (dir_acl) {
92       delete dir_acl;
93    }
94 }
95
96 char *Bvfs::escape_list(alist *lst)
97 {
98    char *elt;
99    int len;
100
101    /* List is empty, reject everything */
102    if (!lst || lst->size() == 0) {
103       Mmsg(escaped_list, "''");
104       return escaped_list;
105    }
106
107    *tmp = 0;
108    *escaped_list = 0;
109
110    foreach_alist(elt, lst) {
111       if (elt && *elt) {
112          len = strlen(elt);
113          /* Escape + ' ' */
114          tmp = check_pool_memory_size(tmp, 2 * len + 2 + 2);
115
116          tmp[0] = '\'';
117          db->bdb_escape_string(jcr, tmp + 1 , elt, len);
118          pm_strcat(tmp, "'");
119
120          if (*escaped_list) {
121             pm_strcat(escaped_list, ",");
122          }
123
124          pm_strcat(escaped_list, tmp);
125       }
126    }
127    return escaped_list;
128 }
129
130 /* Returns the number of jobids in the result */
131 int Bvfs::filter_jobid()
132 {
133    POOL_MEM query;
134    POOL_MEM sub_where;
135    POOL_MEM sub_join;
136
137    /* No ACL, no username, no check */
138    if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
139       Dmsg0(dbglevel_sql, "No ACL\n");
140       /* Just count the number of items in the list */
141       int nb = (*jobids != 0) ? 1 : 0;
142       for (char *p=jobids; *p ; p++) {
143          if (*p == ',') {
144             nb++;
145          }
146       }
147       return nb;
148    }
149
150    if (job_acl) {
151       Mmsg(sub_where, " AND Job.Name IN (%s) ", escape_list(job_acl));
152    }
153
154    if (fileset_acl) {
155       Mmsg(query, " AND FileSet.FileSet IN (%s) ", escape_list(fileset_acl));
156       pm_strcat(sub_where, query.c_str());
157       pm_strcat(sub_join, " JOIN FileSet USING (FileSetId) ");
158    }
159
160    if (client_acl) {
161       Mmsg(query, " AND Client.Name IN (%s) ", escape_list(client_acl));
162       pm_strcat(sub_where, query.c_str());
163    }
164
165    if (pool_acl) {
166       Mmsg(query, " AND Pool.Name IN (%s) ", escape_list(pool_acl));
167       pm_strcat(sub_where, query.c_str());
168       pm_strcat(sub_join, " JOIN Pool USING (PoolId) ");
169    }
170
171    if (username) {
172       /* Query used by Bweb to filter clients, activated when using
173        * set_username()
174        */
175       Mmsg(query,
176       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
177         "JOIN (SELECT ClientId FROM client_group_member "
178         "JOIN client_group USING (client_group_id) "
179         "JOIN bweb_client_group_acl USING (client_group_id) "
180         "JOIN bweb_user USING (userid) "
181        "WHERE bweb_user.username = '%s' "
182       ") AS filter USING (ClientId) "
183         " WHERE JobId IN (%s) %s",
184            sub_join.c_str(), username, jobids, sub_where.c_str());
185
186    } else {
187       Mmsg(query,
188       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
189       " WHERE JobId IN (%s) %s",
190            sub_join.c_str(), jobids, sub_where.c_str());
191    }
192
193    db_list_ctx ctx;
194    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
195    db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
196    pm_strcpy(jobids, ctx.list);
197    return ctx.count;
198 }
199
200 /* Return the number of jobids after the filter */
201 int Bvfs::set_jobid(JobId_t id)
202 {
203    Mmsg(jobids, "%lld", (uint64_t)id);
204    return filter_jobid();
205 }
206
207 /* Return the number of jobids after the filter */
208 int Bvfs::set_jobids(char *ids)
209 {
210    pm_strcpy(jobids, ids);
211    return filter_jobid();
212 }
213
214 /*
215  * TODO: Find a way to let the user choose how he wants to display
216  * files and directories
217  */
218
219
220 /*
221  * Working Object to store PathId already seen (avoid
222  * database queries), equivalent to %cache_ppathid in perl
223  */
224
225 #define NITEMS 50000
226 class pathid_cache {
227 private:
228    hlink *nodes;
229    int nb_node;
230    int max_node;
231
232    alist *table_node;
233
234    htable *cache_ppathid;
235
236 public:
237    pathid_cache() {
238       hlink link;
239       cache_ppathid = (htable *)malloc(sizeof(htable));
240       cache_ppathid->init(&link, &link, NITEMS);
241       max_node = NITEMS;
242       nodes = (hlink *) malloc(max_node * sizeof (hlink));
243       nb_node = 0;
244       table_node = New(alist(5, owned_by_alist));
245       table_node->append(nodes);
246    };
247
248    hlink *get_hlink() {
249       if (++nb_node >= max_node) {
250          nb_node = 0;
251          nodes = (hlink *)malloc(max_node * sizeof(hlink));
252          table_node->append(nodes);
253       }
254       return nodes + nb_node;
255    };
256
257    bool lookup(char *pathid) {
258       bool ret = cache_ppathid->lookup(pathid) != NULL;
259       return ret;
260    };
261
262    void insert(char *pathid) {
263       hlink *h = get_hlink();
264       cache_ppathid->insert(pathid, h);
265    };
266
267    ~pathid_cache() {
268       cache_ppathid->destroy();
269       free(cache_ppathid);
270       delete table_node;
271    };
272 private:
273    pathid_cache(const pathid_cache &); /* prohibit pass by value */
274    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
275 } ;
276
277 /* Return the parent_dir with the trailing /  (update the given string)
278  * TODO: see in the rest of bacula if we don't have already this function
279  * dir=/tmp/toto/
280  * dir=/tmp/
281  * dir=/
282  * dir=
283  */
284 char *bvfs_parent_dir(char *path)
285 {
286    char *p = path;
287    int len = strlen(path) - 1;
288
289    /* windows directory / */
290    if (len == 2 && B_ISALPHA(path[0])
291                 && path[1] == ':'
292                 && path[2] == '/')
293    {
294       len = 0;
295       path[0] = '\0';
296    }
297
298    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
299       path[len] = '\0';
300    }
301
302    if (len > 0) {
303       p += len;
304       while (p > path && !IsPathSeparator(*p)) {
305          p--;
306       }
307       p[1] = '\0';
308    }
309    return path;
310 }
311
312 /* Return the basename of the with the trailing /
313  * TODO: see in the rest of bacula if we don't have
314  * this function already
315  */
316 char *bvfs_basename_dir(char *path)
317 {
318    char *p = path;
319    int len = strlen(path) - 1;
320
321    if (path[len] == '/') {      /* if directory, skip last / */
322       len -= 1;
323    }
324
325    if (len > 0) {
326       p += len;
327       while (p > path && !IsPathSeparator(*p)) {
328          p--;
329       }
330       if (*p == '/') {
331          p++;                  /* skip first / */
332       }
333    }
334    return p;
335 }
336
337 static void build_path_hierarchy(JCR *jcr, BDB *mdb,
338                                  pathid_cache &ppathid_cache,
339                                  char *org_pathid, char *path)
340 {
341    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
342    char pathid[50];
343    ATTR_DBR parent;
344    char *bkp = mdb->path;
345    strncpy(pathid, org_pathid, sizeof(pathid));
346
347    /* Does the ppathid exist for this ? we use a memory cache...  In order to
348     * avoid the full loop, we consider that if a dir is allready in the
349     * PathHierarchy table, then there is no need to calculate all the
350     * hierarchy
351     */
352    while (path && *path)
353    {
354       if (!ppathid_cache.lookup(pathid))
355       {
356          Mmsg(mdb->cmd,
357               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
358               pathid);
359
360          if (!mdb->QueryDB(jcr, mdb->cmd)) {
361             goto bail_out;      /* Query failed, just leave */
362          }
363
364          /* Do we have a result ? */
365          if (mdb->sql_num_rows() > 0) {
366             ppathid_cache.insert(pathid);
367             /* This dir was in the db ...
368              * It means we can leave, the tree has allready been built for
369              * this dir
370              */
371             goto bail_out;
372          } else {
373             /* search or create parent PathId in Path table */
374             mdb->path = bvfs_parent_dir(path);
375             mdb->pnl = strlen(mdb->path);
376             if (!mdb->bdb_create_path_record(jcr, &parent)) {
377                goto bail_out;
378             }
379             ppathid_cache.insert(pathid);
380
381             Mmsg(mdb->cmd,
382                  "INSERT INTO PathHierarchy (PathId, PPathId) "
383                  "VALUES (%s,%lld)",
384                  pathid, (uint64_t) parent.PathId);
385
386             if (!mdb->InsertDB(jcr, mdb->cmd)) {
387                goto bail_out;   /* Can't insert the record, just leave */
388             }
389
390             edit_uint64(parent.PathId, pathid);
391             path = mdb->path;   /* already done */
392          }
393       } else {
394          /* It's already in the cache.  We can leave, no time to waste here,
395           * all the parent dirs have allready been done
396           */
397          goto bail_out;
398       }
399    }
400
401 bail_out:
402    mdb->path = bkp;
403    mdb->fnl = 0;
404 }
405
406 /*
407  * Internal function to update path_hierarchy cache with a shared pathid cache
408  * return Error 0
409  *        OK    1
410  */
411 static int update_path_hierarchy_cache(JCR *jcr,
412                                         BDB *mdb,
413                                         pathid_cache &ppathid_cache,
414                                         JobId_t JobId)
415 {
416    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
417    uint32_t ret=0;
418    uint32_t num;
419    char jobid[50];
420    edit_uint64(JobId, jobid);
421
422    mdb->bdb_lock();
423
424    /* We don't really want to harm users with spurious messages,
425     * everything is handled by transaction
426     */
427    mdb->set_use_fatal_jmsg(false);
428
429    mdb->bdb_start_transaction(jcr);
430
431    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
432
433    if (!mdb->QueryDB(jcr, mdb->cmd) || mdb->sql_num_rows() > 0) {
434       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
435       ret = 1;
436       goto bail_out;
437    }
438
439    /* Inserting path records for JobId */
440    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
441                    "SELECT DISTINCT PathId, JobId "
442                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s AND FileIndex <> 0 "
443                            "UNION "
444                            "SELECT PathId, BaseFiles.JobId "
445                              "FROM BaseFiles JOIN File AS F USING (FileId) "
446                             "WHERE BaseFiles.JobId = %s) AS B",
447         jobid, jobid);
448
449    if (!mdb->QueryDB(jcr, mdb->cmd)) {
450       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
451       goto bail_out;
452    }
453
454    /* Now we have to do the directory recursion stuff to determine missing
455     * visibility We try to avoid recursion, to be as fast as possible We also
456     * only work on not allready hierarchised directories...
457     */
458    Mmsg(mdb->cmd,
459      "SELECT PathVisibility.PathId, Path "
460        "FROM PathVisibility "
461             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
462             "LEFT JOIN PathHierarchy "
463          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
464       "WHERE PathVisibility.JobId = %s "
465         "AND PathHierarchy.PathId IS NULL "
466       "ORDER BY Path", jobid);
467    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
468
469    if (!mdb->QueryDB(jcr, mdb->cmd)) {
470       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
471       goto bail_out;
472    }
473
474    /* TODO: I need to reuse the DB connection without emptying the result
475     * So, now i'm copying the result in memory to be able to query the
476     * catalog descriptor again.
477     */
478    num = mdb->sql_num_rows();
479    if (num > 0) {
480       char **result = (char **)malloc (num * 2 * sizeof(char *));
481
482       SQL_ROW row;
483       int i=0;
484       while((row = mdb->sql_fetch_row())) {
485          result[i++] = bstrdup(row[0]);
486          result[i++] = bstrdup(row[1]);
487       }
488
489       i=0;
490       while (num > 0) {
491          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
492          free(result[i++]);
493          free(result[i++]);
494          num--;
495       }
496       free(result);
497    }
498
499    if (mdb->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
500       Mmsg(mdb->cmd,
501  "INSERT INTO PathVisibility (PathId, JobId) "
502    "SELECT DISTINCT h.PPathId AS PathId, %s "
503      "FROM PathHierarchy AS h "
504     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
505       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
506            jobid, jobid, jobid );
507
508    } else if (mdb->bdb_get_type_index() == SQL_TYPE_MYSQL) {
509       Mmsg(mdb->cmd,
510   "INSERT INTO PathVisibility (PathId, JobId)  "
511    "SELECT a.PathId,%s "
512    "FROM ( "
513      "SELECT DISTINCT h.PPathId AS PathId "
514        "FROM PathHierarchy AS h "
515        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
516       "WHERE p.JobId=%s) AS a "
517       "LEFT JOIN PathVisibility AS b ON (b.JobId=%s and a.PathId = b.PathId) "
518       "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
519
520    } else {
521       Mmsg(mdb->cmd,
522   "INSERT INTO PathVisibility (PathId, JobId)  "
523    "SELECT a.PathId,%s "
524    "FROM ( "
525      "SELECT DISTINCT h.PPathId AS PathId "
526        "FROM PathHierarchy AS h "
527        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
528       "WHERE p.JobId=%s) AS a LEFT JOIN "
529        "(SELECT PathId "
530           "FROM PathVisibility "
531          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
532    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
533    }
534
535    do {
536       ret = mdb->QueryDB(jcr, mdb->cmd);
537    } while (ret && mdb->sql_affected_rows() > 0);
538
539    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
540    ret = mdb->UpdateDB(jcr, mdb->cmd, false);
541
542 bail_out:
543    mdb->bdb_end_transaction(jcr);
544
545    if (!ret) {
546       Mmsg(mdb->cmd, "SELECT HasCache FROM Job WHERE JobId=%s", jobid);
547       mdb->bdb_sql_query(mdb->cmd, db_int_handler, &ret);
548    }
549
550    /* Enable back the FATAL message if something is wrong */
551    mdb->set_use_fatal_jmsg(true);
552
553    mdb->bdb_unlock();
554    return ret;
555 }
556
557 /* Compute the cache for the bfileview compoment */
558 void Bvfs::fv_update_cache()
559 {
560    int64_t pathid;
561    int64_t size=0, count=0;
562
563    Dmsg0(dbglevel, "fv_update_cache()\n");
564
565    if (!*jobids) {
566       return;                   /* Nothing to build */
567    }
568
569    db->bdb_lock();
570    /* We don't really want to harm users with spurious messages,
571     * everything is handled by transaction
572     */
573    db->set_use_fatal_jmsg(false);
574
575    db->bdb_start_transaction(jcr);
576
577    pathid = get_root();
578
579    fv_compute_size_and_count(pathid, &size, &count);
580
581    db->bdb_end_transaction(jcr);
582
583    /* Enable back the FATAL message if something is wrong */
584    db->set_use_fatal_jmsg(true);
585
586    db->bdb_unlock();
587 }
588
589 /* 
590  * Find an store the filename descriptor for empty directories Filename.Name=''
591  */
592 DBId_t Bvfs::get_dir_filenameid()
593 {
594    uint32_t id;
595    if (dir_filenameid) {
596       return dir_filenameid;
597    }
598    Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
599    db_sql_query(db, db->cmd, db_int_handler, &id);
600    dir_filenameid = id;
601    return dir_filenameid;
602 }
603
604 /* Not yet working */
605 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
606 {
607    Mmsg(db->cmd, 
608         "SELECT FilenameId AS filenameid, Name AS name, size "
609           "FROM ( "
610          "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
611            "FROM File "
612           "WHERE PathId  = %lld "
613             "AND JobId = %s "
614         ") AS S INNER JOIN Filename USING (FilenameId) "
615      "WHERE S.size > %lld "
616      "ORDER BY S.size DESC "
617      "LIMIT %d ", pathid, jobids, min_size, limit);
618 }
619
620
621 /* Get the current path size and files count */
622 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
623 {
624    SQL_ROW row;
625
626    *size = *count = 0;
627
628    Mmsg(db->cmd,
629  "SELECT Size AS size, Files AS files "
630   " FROM PathVisibility "
631  " WHERE PathId = %lld "
632    " AND JobId = %s ", pathid, jobids);
633
634    if (!db->QueryDB(jcr, db->cmd)) {
635       return;
636    }
637
638    if ((row = db->sql_fetch_row())) {
639       *size = str_to_int64(row[0]);
640       *count =  str_to_int64(row[1]);
641    }
642 }
643
644 /* Compute for the current path the size and files count */
645 void Bvfs::fv_get_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
646 {
647    SQL_ROW row;
648
649    *size = *count = 0;
650
651    Mmsg(db->cmd,
652  "SELECT sum(base64_decode_lstat(8,LStat)) AS size, count(1) AS files "
653   " FROM File "
654  " WHERE PathId = %lld "
655    " AND JobId = %s ", pathid, jobids);
656
657    if (!db->QueryDB(jcr, db->cmd)) {
658       return;
659    }
660
661    if ((row = db->sql_fetch_row())) {
662       *size = str_to_int64(row[0]);
663       *count =  str_to_int64(row[1]);
664    }
665 }
666
667 void Bvfs::fv_compute_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
668 {
669    Dmsg1(dbglevel, "fv_compute_size_and_count(%lld)\n", pathid);
670
671    fv_get_current_size_and_count(pathid, size, count);
672    if (*size > 0) {
673       return;
674    }
675
676    /* Update stats for the current directory */
677    fv_get_size_and_count(pathid, size, count);
678
679    /* Update stats for all sub directories */
680    Mmsg(db->cmd,
681         " SELECT PathId "
682           " FROM PathVisibility "
683                " INNER JOIN PathHierarchy USING (PathId) "
684          " WHERE PPathId  = %lld "
685            " AND JobId = %s ", pathid, jobids);
686
687    db->QueryDB(jcr, db->cmd);
688    int num = db->sql_num_rows();
689
690    if (num > 0) {
691       int64_t *result = (int64_t *)malloc (num * sizeof(int64_t));
692       SQL_ROW row;
693       int i=0;
694
695       while((row = db->sql_fetch_row())) {
696          result[i++] = str_to_int64(row[0]); /* PathId */
697       }
698
699       i=0;
700       while (num > 0) {
701          int64_t c=0, s=0;
702          fv_compute_size_and_count(result[i], &s, &c);
703          *size += s;
704          *count += c;
705
706          i++;
707          num--;
708       }
709       free(result);
710    }
711
712    fv_update_size_and_count(pathid, *size, *count);
713 }
714
715 void Bvfs::fv_update_size_and_count(int64_t pathid, int64_t size, int64_t count)
716 {
717    Mmsg(db->cmd,
718         "UPDATE PathVisibility SET Files = %lld, Size = %lld "
719         " WHERE JobId = %s "
720         " AND PathId = %lld ", count, size, jobids, pathid);
721
722    db->UpdateDB(jcr, db->cmd, false);
723 }
724
725 void bvfs_update_cache(JCR *jcr, BDB *mdb)
726 {
727    uint32_t nb=0;
728    db_list_ctx jobids_list;
729
730    mdb->bdb_lock();
731
732 #ifdef xxx
733    /* TODO: Remove this code when updating make_bacula_table script */
734    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
735    if (!mdb->QueryDB(jcr, mdb->cmd)) {
736       Dmsg0(dbglevel, "Creating cache table\n");
737       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
738       mdb->QueryDB(jcr, mdb->cmd);
739
740       Mmsg(mdb->cmd,
741            "CREATE TABLE PathHierarchy ( "
742            "PathId integer NOT NULL, "
743            "PPathId integer NOT NULL, "
744            "CONSTRAINT pathhierarchy_pkey "
745            "PRIMARY KEY (PathId))");
746       mdb->QueryDB(jcr, mdb->cmd);
747
748       Mmsg(mdb->cmd,
749            "CREATE INDEX pathhierarchy_ppathid "
750            "ON PathHierarchy (PPathId)");
751       mdb->QueryDB(jcr, mdb->cmd);
752
753       Mmsg(mdb->cmd,
754            "CREATE TABLE PathVisibility ("
755            "PathId integer NOT NULL, "
756            "JobId integer NOT NULL, "
757            "Size int8 DEFAULT 0, "
758            "Files int4 DEFAULT 0, "
759            "CONSTRAINT pathvisibility_pkey "
760            "PRIMARY KEY (JobId, PathId))");
761       mdb->QueryDB(jcr, mdb->cmd);
762
763       Mmsg(mdb->cmd,
764            "CREATE INDEX pathvisibility_jobid "
765            "ON PathVisibility (JobId)");
766       mdb->QueryDB(jcr, mdb->cmd);
767
768    }
769 #endif
770
771    Mmsg(mdb->cmd,
772  "SELECT JobId from Job "
773   "WHERE HasCache = 0 "
774     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
775   "ORDER BY JobId");
776
777    mdb->bdb_sql_query(mdb->cmd, db_list_handler, &jobids_list);
778
779    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
780
781    mdb->bdb_start_transaction(jcr);
782    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
783    Mmsg(mdb->cmd,
784         "DELETE FROM PathVisibility "
785          "WHERE NOT EXISTS "
786         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
787    nb = mdb->DeleteDB(jcr, mdb->cmd);
788    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
789
790    mdb->bdb_end_transaction(jcr);
791    mdb->bdb_unlock();
792 }
793
794 /*
795  * Update the bvfs cache for given jobids (1,2,3,4)
796  */
797 int
798 bvfs_update_path_hierarchy_cache(JCR *jcr, BDB *mdb, char *jobids)
799 {
800    pathid_cache ppathid_cache;
801    JobId_t JobId;
802    char *p;
803    int ret=1;
804
805    for (p=jobids; ; ) {
806       int stat = get_next_jobid_from_list(&p, &JobId);
807       if (stat < 0) {
808          ret = 0;
809          break;
810       }
811       if (stat == 0) {
812          break;
813       }
814       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
815       if (!update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId)) {
816          ret = 0;
817       }
818    }
819    return ret;
820 }
821
822 /*
823  * Update the bvfs fileview for given jobids
824  */
825 void
826 bvfs_update_fv_cache(JCR *jcr, BDB *mdb, char *jobids)
827 {
828    char *p;
829    JobId_t JobId;
830    Bvfs bvfs(jcr, mdb);
831
832    for (p=jobids; ; ) {
833       int stat = get_next_jobid_from_list(&p, &JobId);
834       if (stat < 0) {
835          return;
836       }
837       if (stat == 0) {
838          break;
839       }
840
841       Dmsg1(dbglevel, "Trying to create cache for %lld\n", (int64_t)JobId);
842
843       bvfs.set_jobid(JobId);
844       bvfs.fv_update_cache();
845    }
846 }
847
848 /*
849  * Update the bvfs cache for current jobids
850  */
851 void Bvfs::update_cache()
852 {
853    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
854 }
855
856
857 bool Bvfs::ch_dir(DBId_t pathid)
858 {
859    reset_offset();
860
861    pwd_id = pathid;
862    return pwd_id != 0;
863 }
864
865
866 /* Change the current directory, returns true if the path exists */
867 bool Bvfs::ch_dir(const char *path)
868 {
869    db->bdb_lock();
870    pm_strcpy(db->path, path);
871    db->pnl = strlen(db->path);
872    ch_dir(db->bdb_get_path_record(jcr));
873    db->bdb_unlock();
874    return pwd_id != 0;
875 }
876
877 /*
878  * Get all file versions for a specified client
879  * TODO: Handle basejobs using different client
880  */
881 void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, const char *client)
882 {
883    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
884          (uint64_t)fnid, client);
885    char ed1[50], ed2[50];
886    POOL_MEM q;
887    if (see_copies) {
888       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
889    } else {
890       Mmsg(q, " AND Job.Type = 'B' ");
891    }
892
893    POOL_MEM query;
894
895    Mmsg(query,//    1           2           3      4 
896 "SELECT 'V', File.PathId, File.FilenameId,  0, File.JobId, "
897 //            5           6          7
898         "File.LStat, File.FileId, File.Md5, "
899 //         8                    9
900         "Media.VolumeName, Media.InChanger "
901 "FROM File, Job, Client, JobMedia, Media "
902 "WHERE File.FilenameId = %s "
903   "AND File.PathId=%s "
904   "AND File.JobId = Job.JobId "
905   "AND Job.JobId = JobMedia.JobId "
906   "AND File.FileIndex >= JobMedia.FirstIndex "
907   "AND File.FileIndex <= JobMedia.LastIndex "
908   "AND JobMedia.MediaId = Media.MediaId "
909   "AND Job.ClientId = Client.ClientId "
910   "AND Client.Name = '%s' "
911   "%s ORDER BY FileId LIMIT %d OFFSET %d"
912         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
913         limit, offset);
914    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
915    db->bdb_sql_query(query.c_str(), list_entries, user_data);
916 }
917
918 /*
919  * Get all file versions for a specified client
920  * TODO: Handle basejobs using different client
921  */
922 bool Bvfs::get_delta(FileId_t fileid)
923 {
924    Dmsg1(dbglevel, "get_delta(%lld)\n", (uint64_t)fileid);
925    char ed1[50];
926    int32_t num;
927    SQL_ROW row;
928    POOL_MEM q;
929    POOL_MEM query;
930    char *fn = NULL;
931    bool ret = false;
932    db->bdb_lock();
933
934    /* Check if some FileId have DeltaSeq > 0
935     * Foreach of them we need to get the accurate_job list, and compute
936     * what are dependencies
937     */
938    Mmsg(query,
939         "SELECT F.JobId, FN.Name, F.PathId, F.DeltaSeq "
940         "FROM File AS F, Filename AS FN WHERE FileId = %lld "
941         "AND FN.FilenameId = F.FilenameId AND DeltaSeq > 0", fileid);
942
943    if (!db->QueryDB(jcr, query.c_str())) {
944       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
945       goto bail_out;
946    }
947
948    /* TODO: Use an other DB connection can avoid to copy the result of the
949     * previous query into a temporary buffer
950     */
951    num = db->sql_num_rows();
952    Dmsg2(dbglevel, "Found %d Delta parts q=%s\n",
953          num, query.c_str());
954
955    if (num > 0 && (row = db->sql_fetch_row())) {
956       JOB_DBR jr, jr2;
957       db_list_ctx lst;
958       memset(&jr, 0, sizeof(jr));
959       memset(&jr2, 0, sizeof(jr2));
960
961       fn = bstrdup(row[1]);               /* Filename */
962       int64_t jid = str_to_int64(row[0]);     /* JobId */
963       int64_t pid = str_to_int64(row[2]);     /* PathId */
964
965       /* Need to limit the query to StartTime, Client/Fileset */
966       jr2.JobId = jid;
967       if (!db->bdb_get_job_record(jcr, &jr2)) {
968          Dmsg1(0, "Unable to get job record for jobid %d\n", jid);
969          goto bail_out;
970       }
971
972       jr.JobId = jid;
973       jr.ClientId = jr2.ClientId;
974       jr.FileSetId = jr2.FileSetId;
975       jr.JobLevel = L_INCREMENTAL;
976       jr.StartTime = jr2.StartTime;
977
978       /* Get accurate jobid list */
979       if (!db->bdb_get_accurate_jobids(jcr, &jr, &lst)) {
980          Dmsg1(0, "Unable to get Accurate list for jobid %d\n", jid);
981          goto bail_out;
982       }
983
984       /* Escape filename */
985       db->fnl = strlen(fn);
986       db->esc_name = check_pool_memory_size(db->esc_name, 2*db->fnl+2);
987       db->bdb_escape_string(jcr, db->esc_name, fn, db->fnl);
988
989       edit_int64(pid, ed1);     /* pathid */
990
991       int id=db->bdb_get_type_index();
992       Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
993            lst.list, db->esc_name, ed1,
994            lst.list, db->esc_name, ed1,
995            lst.list, lst.list);
996
997       Mmsg(db->cmd,
998            //       0     1     2   3      4      5      6            7
999            "SELECT 'd', PathId, 0, JobId, LStat, FileId, DeltaSeq, JobTDate"
1000            " FROM (%s) AS F1 "
1001            "ORDER BY DeltaSeq ASC",
1002            query.c_str());
1003
1004       Dmsg1(dbglevel_sql, "q=%s\n", db->cmd);
1005
1006       if (!db->bdb_sql_query(db->cmd, list_entries, user_data)) {
1007          Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1008          goto bail_out;
1009       }
1010    }
1011    ret = true;
1012 bail_out:
1013    if (fn) {
1014       free(fn);
1015    }
1016    db->bdb_unlock();
1017    return ret;
1018 }
1019
1020 /*
1021  * Get all volumes for a specific file
1022  */
1023 void Bvfs::get_volumes(FileId_t fileid)
1024 {
1025    Dmsg1(dbglevel, "get_volumes(%lld)\n", (uint64_t)fileid);
1026
1027    char ed1[50];
1028    POOL_MEM query;
1029
1030    Mmsg(query,
1031 //                                   7                8
1032 "SELECT DISTINCT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
1033 "FROM File JOIN JobMedia USING (JobId) JOIN Media USING (MediaId) "
1034 "WHERE File.FileId = %s "
1035   "AND File.FileIndex >= JobMedia.FirstIndex "
1036   "AND File.FileIndex <= JobMedia.LastIndex "
1037   " LIMIT %d OFFSET %d"
1038         ,edit_uint64(fileid, ed1), limit, offset);
1039    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1040    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1041 }
1042
1043 DBId_t Bvfs::get_root()
1044 {
1045    int p;
1046    *db->path = 0;
1047    db->bdb_lock();
1048    p = db->bdb_get_path_record(jcr);
1049    db->bdb_unlock();
1050    return p;
1051 }
1052
1053 static int path_handler(void *ctx, int fields, char **row)
1054 {
1055    Bvfs *fs = (Bvfs *) ctx;
1056    return fs->_handle_path(ctx, fields, row);
1057 }
1058
1059 int Bvfs::_handle_path(void *ctx, int fields, char **row)
1060 {
1061    if (bvfs_is_dir(row)) {
1062       /* can have the same path 2 times */
1063       if (strcmp(row[BVFS_PathId], prev_dir)) {
1064          pm_strcpy(prev_dir, row[BVFS_PathId]);
1065          if (strcmp(NPRTB(row[BVFS_FileIndex]), "0") == 0 &&
1066              strcmp(NPRTB(row[BVFS_FileId]), "0") != 0)
1067          {
1068             /* The directory was probably deleted */
1069             return 0;
1070          }
1071          return list_entries(user_data, fields, row);
1072       }
1073    }
1074    return 0;
1075 }
1076
1077 /*
1078  * Retrieve . and .. information
1079  */
1080 void Bvfs::ls_special_dirs()
1081 {
1082    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
1083    char ed1[50], ed2[50];
1084    if (*jobids == 0) {
1085       return;
1086    }
1087    if (!dir_filenameid) {
1088       get_dir_filenameid();
1089    }
1090
1091    /* Will fetch directories  */
1092    *prev_dir = 0;
1093
1094    POOL_MEM query;
1095    Mmsg(query,
1096 "(SELECT PathHierarchy.PPathId AS PathId, '..' AS Path "
1097     "FROM  PathHierarchy JOIN PathVisibility USING (PathId) "
1098    "WHERE  PathHierarchy.PathId = %s "
1099    "AND PathVisibility.JobId IN (%s) "
1100 "UNION "
1101  "SELECT %s AS PathId, '.' AS Path)",
1102         edit_uint64(pwd_id, ed1), jobids, ed1);
1103
1104    POOL_MEM query2;
1105    Mmsg(query2,// 1      2     3        4     5       6
1106 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId, FileIndex "
1107   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
1108        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1109               "File1.LStat AS LStat, File1.FileId AS FileId, "
1110               "File1.FileIndex AS FileIndex, "
1111               "Job1.JobTDate AS JobTDate "
1112       "FROM File AS File1 JOIN Job AS Job1 USING (JobId)"
1113        "WHERE File1.FilenameId = %s "
1114        "AND File1.JobId IN (%s)) AS listfile1 "
1115   "ON (tmp.PathId = listfile1.PathId) "
1116   "ORDER BY tmp.Path, JobTDate DESC ",
1117         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
1118
1119    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
1120    db->bdb_sql_query(query2.c_str(), path_handler, this);
1121 }
1122
1123 /* Returns true if we have dirs to read */
1124 bool Bvfs::ls_dirs()
1125 {
1126    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
1127    char ed1[50], ed2[50];
1128    if (*jobids == 0) {
1129       return false;
1130    }
1131
1132    POOL_MEM query;
1133    POOL_MEM filter;
1134    if (*pattern) {
1135       Mmsg(filter, " AND Path2.Path %s '%s' ",
1136            match_query[db->bdb_get_type_index()], pattern);
1137
1138    }
1139
1140    if (!dir_filenameid) {
1141       get_dir_filenameid();
1142    }
1143
1144    /* the sql query displays same directory multiple time, take the first one */
1145    *prev_dir = 0;
1146
1147    /* Let's retrieve the list of the visible dirs in this dir ...
1148     * First, I need the empty filenameid to locate efficiently
1149     * the dirs in the file table
1150     * my $dir_filenameid = $self->get_dir_filenameid();
1151     */
1152    /* Then we get all the dir entries from File ... */
1153    Mmsg(query,
1154 //       0     1      2      3      4     5       6
1155 "SELECT 'D', PathId,  0,    Path, JobId, LStat, FileId, FileIndex FROM ( "
1156     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
1157            "lower(Path1.Path) AS lpath, "
1158            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
1159            "listfile1.FileId AS FileId, "
1160            "listfile1.JobTDate AS JobTDate, "
1161            "listfile1.FileIndex AS FileIndex "
1162     "FROM ( "
1163       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
1164       "FROM PathHierarchy AS PathHierarchy1 "
1165       "JOIN Path AS Path2 "
1166         "ON (PathHierarchy1.PathId = Path2.PathId) "
1167       "JOIN PathVisibility AS PathVisibility1 "
1168         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
1169       "WHERE PathHierarchy1.PPathId = %s "
1170       "AND PathVisibility1.JobId IN (%s) "
1171            "%s "
1172      ") AS listpath1 "
1173    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
1174
1175    "LEFT JOIN ( " /* get attributes if any */
1176        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1177               "File1.LStat AS LStat, File1.FileId AS FileId, "
1178               "File1.FileIndex, Job1.JobTDate AS JobTDate "
1179      "FROM File AS File1 JOIN Job AS Job1 USING (JobId) "
1180        "WHERE File1.FilenameId = %s "
1181        "AND File1.JobId IN (%s)) AS listfile1 "
1182        "ON (listpath1.PathId = listfile1.PathId) "
1183     ") AS A ORDER BY Path,JobTDate DESC LIMIT %d OFFSET %d",
1184         edit_uint64(pwd_id, ed1),
1185         jobids,
1186         filter.c_str(),
1187         edit_uint64(dir_filenameid, ed2),
1188         jobids,
1189         limit, offset);
1190
1191    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1192
1193    db->bdb_lock();
1194    db->bdb_sql_query(query.c_str(), path_handler, this);
1195    nb_record = db->sql_num_rows();
1196    db->bdb_unlock();
1197
1198    return nb_record == limit;
1199 }
1200
1201 void build_ls_files_query(BDB *db, POOL_MEM &query,
1202                           const char *JobId, const char *PathId,
1203                           const char *filter, int64_t limit, int64_t offset)
1204 {
1205    if (db->bdb_get_type_index() == SQL_TYPE_POSTGRESQL) {
1206       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1207            JobId, PathId, JobId, PathId,
1208            filter, limit, offset);
1209    } else {
1210       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1211            JobId, PathId, JobId, PathId,
1212            limit, offset, filter, JobId, JobId);
1213    }
1214 }
1215
1216 /* Returns true if we have files to read */
1217 bool Bvfs::ls_files()
1218 {
1219    POOL_MEM query;
1220    POOL_MEM filter;
1221    char pathid[50];
1222
1223    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
1224    if (*jobids == 0) {
1225       return false;
1226    }
1227
1228    if (!pwd_id) {
1229       ch_dir(get_root());
1230    }
1231
1232    edit_uint64(pwd_id, pathid);
1233    if (*pattern) {
1234       Mmsg(filter, " AND Filename.Name %s '%s' ", 
1235            match_query[db_get_type_index(db)], pattern);
1236
1237    } else if (*filename) {
1238       Mmsg(filter, " AND Filename.Name = '%s' ", filename);
1239    }
1240
1241    build_ls_files_query(db, query,
1242                         jobids, pathid, filter.c_str(),
1243                         limit, offset);
1244
1245    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1246
1247    db->bdb_lock();
1248    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1249    nb_record = db->sql_num_rows();
1250    db->bdb_unlock();
1251
1252    return nb_record == limit;
1253 }
1254
1255
1256 /*
1257  * Return next Id from comma separated list
1258  *
1259  * Returns:
1260  *   1 if next Id returned
1261  *   0 if no more Ids are in list
1262  *  -1 there is an error
1263  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
1264  */
1265 static int get_next_id_from_list(char **p, int64_t *Id)
1266 {
1267    const int maxlen = 30;
1268    char id[maxlen+1];
1269    char *q = *p;
1270
1271    id[0] = 0;
1272    for (int i=0; i<maxlen; i++) {
1273       if (*q == 0) {
1274          break;
1275       } else if (*q == ',') {
1276          q++;
1277          break;
1278       }
1279       id[i] = *q++;
1280       id[i+1] = 0;
1281    }
1282    if (id[0] == 0) {
1283       return 0;
1284    } else if (!is_a_number(id)) {
1285       return -1;                      /* error */
1286    }
1287    *p = q;
1288    *Id = str_to_int64(id);
1289    return 1;
1290 }
1291
1292 static int get_path_handler(void *ctx, int fields, char **row)
1293 {
1294    POOL_MEM *buf = (POOL_MEM *) ctx;
1295    pm_strcpy(*buf, row[0]);
1296    return 0;
1297 }
1298
1299 static bool check_temp(char *output_table)
1300 {
1301    if (output_table[0] == 'b' &&
1302        output_table[1] == '2' &&
1303        is_an_integer(output_table + 2))
1304    {
1305       return true;
1306    }
1307    return false;
1308 }
1309
1310 void Bvfs::clear_cache()
1311 {
1312    db->bdb_sql_query("BEGIN",                     NULL, NULL);
1313    db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
1314    db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
1315    db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
1316    db->bdb_sql_query("COMMIT",                    NULL, NULL);
1317 }
1318
1319 bool Bvfs::drop_restore_list(char *output_table)
1320 {
1321    POOL_MEM query;
1322    if (check_temp(output_table)) {
1323       Mmsg(query, "DROP TABLE %s", output_table);
1324       db->bdb_sql_query(query.c_str(), NULL, NULL);
1325       return true;
1326    }
1327    return false;
1328 }
1329
1330 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
1331                                 char *output_table)
1332 {
1333    POOL_MEM query;
1334    POOL_MEM tmp, tmp2;
1335    int64_t id, jobid, prev_jobid;
1336    int num;
1337    bool init=false;
1338    bool ret=false;
1339    /* check args */
1340    if ((*fileid   && !is_a_number_list(fileid))  ||
1341        (*dirid    && !is_a_number_list(dirid))   ||
1342        (*hardlink && !is_a_number_list(hardlink))||
1343        (!*hardlink && !*fileid && !*dirid && !*hardlink))
1344    {
1345       Dmsg0(dbglevel, "ERROR: One or more of FileId, DirId or HardLink is not given or not a number.\n");
1346       return false;
1347    }
1348    if (!check_temp(output_table)) {
1349       Dmsg0(dbglevel, "ERROR: Wrong format for table name (in path field).\n");
1350       return false;
1351    }
1352
1353    db->bdb_lock();
1354
1355    /* Cleanup old tables first */
1356    Mmsg(query, "DROP TABLE btemp%s", output_table);
1357    db->bdb_sql_query(query.c_str());
1358
1359    Mmsg(query, "DROP TABLE %s", output_table);
1360    db->bdb_sql_query(query.c_str());
1361
1362    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
1363
1364    if (*fileid) {               /* Select files with their direct id */
1365       init=true;
1366       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1367                       "PathId, FileId "
1368                  "FROM File,Job WHERE Job.JobId=File.Jobid "
1369                  "AND FileId IN (%s)",
1370            fileid);
1371       pm_strcat(query, tmp.c_str());
1372    }
1373
1374    /* Add a directory content */
1375    while (get_next_id_from_list(&dirid, &id) == 1) {
1376       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
1377
1378       if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
1379          Dmsg3(dbglevel, "ERROR: Path not found %lld q=%s s=%s\n",
1380                id, tmp.c_str(), tmp2.c_str());
1381          /* print error */
1382          goto bail_out;
1383       }
1384       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
1385          Dmsg3(dbglevel, "ERROR: Path not found %lld q=%s s=%s\n",
1386                id, tmp.c_str(), tmp2.c_str());
1387          break;
1388       }
1389       /* escape % and _ for LIKE search */
1390       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
1391       char *p = tmp.c_str();
1392       for (char *s = tmp2.c_str(); *s ; s++) {
1393          if (*s == '%' || *s == '_' || *s == '\\') {
1394             *p = '\\';
1395             p++;
1396          }
1397          *p = *s;
1398          p++;
1399       }
1400       *p = '\0';
1401       tmp.strcat("%");
1402
1403       size_t len = strlen(tmp.c_str());
1404       tmp2.check_size((len+1) * 2);
1405       db->bdb_escape_string(jcr, tmp2.c_str(), tmp.c_str(), len);
1406
1407       if (init) {
1408          query.strcat(" UNION ");
1409       }
1410
1411       Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
1412                         "File.PathId, FileId "
1413                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
1414                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ",
1415            tmp2.c_str(), jobids);
1416       query.strcat(tmp.c_str());
1417       init = true;
1418
1419       query.strcat(" UNION ");
1420
1421       /* A directory can have files from a BaseJob */
1422       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
1423                         "File.FilenameId, File.PathId, BaseFiles.FileId "
1424                    "FROM BaseFiles "
1425                         "JOIN File USING (FileId) "
1426                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
1427                         "JOIN Path USING (PathId) "
1428                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ",
1429            tmp2.c_str(), jobids);
1430       query.strcat(tmp.c_str());
1431    }
1432
1433    /* expect jobid,fileindex */
1434    prev_jobid=0;
1435    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
1436       if (get_next_id_from_list(&hardlink, &id) != 1) {
1437          Dmsg0(dbglevel, "ERROR: hardlink should be two by two\n");
1438          goto bail_out;
1439       }
1440       if (jobid != prev_jobid) { /* new job */
1441          if (prev_jobid == 0) {  /* first jobid */
1442             if (init) {
1443                query.strcat(" UNION ");
1444             }
1445          } else {               /* end last job, start new one */
1446             tmp.strcat(") UNION ");
1447             query.strcat(tmp.c_str());
1448          }
1449          Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1450                             "PathId, FileId "
1451                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld "
1452                         "AND FileIndex IN (%lld", jobid, id);
1453          prev_jobid = jobid;
1454
1455       } else {                  /* same job, add new findex */
1456          Mmsg(tmp2, ", %lld", id);
1457          tmp.strcat(tmp2.c_str());
1458       }
1459    }
1460
1461    if (prev_jobid != 0) {       /* end last job */
1462       tmp.strcat(") ");
1463       query.strcat(tmp.c_str());
1464       init = true;
1465    }
1466
1467    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1468
1469    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1470       Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
1471       goto bail_out;
1472    }
1473
1474    Mmsg(query, sql_bvfs_select[db->bdb_get_type_index()],
1475         output_table, output_table, output_table);
1476
1477    /* TODO: handle jobid filter */
1478    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1479    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1480       Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
1481       goto bail_out;
1482    }
1483
1484    /* MySQL needs the index */
1485    if (db->bdb_get_type_index() == SQL_TYPE_MYSQL) {
1486       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)",
1487            output_table, output_table);
1488       Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1489       if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1490          Dmsg1(dbglevel, "ERROR executing query=%s\n", query.c_str());
1491          goto bail_out; 
1492       } 
1493    }
1494
1495    /* Check if some FileId have DeltaSeq > 0
1496     * Foreach of them we need to get the accurate_job list, and compute
1497     * what are dependencies
1498     */
1499    Mmsg(query, 
1500         "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
1501           "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
1502          "WHERE DeltaSeq > 0", output_table);
1503
1504    if (!db->QueryDB(jcr, query.c_str())) {
1505       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
1506    }
1507
1508    /* TODO: Use an other DB connection can avoid to copy the result of the
1509     * previous query into a temporary buffer
1510     */
1511    num = db->sql_num_rows();
1512    Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
1513
1514    if (num > 0) {
1515       int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
1516       SQL_ROW row;
1517       int i=0;
1518
1519       while((row = db->sql_fetch_row())) {
1520          result[i++] = str_to_int64(row[0]); /* FileId */
1521          result[i++] = str_to_int64(row[1]); /* JobId */
1522          result[i++] = str_to_int64(row[2]); /* FilenameId */
1523          result[i++] = str_to_int64(row[3]); /* PathId */
1524       } 
1525
1526       i=0;
1527       while (num > 0) {
1528          insert_missing_delta(output_table, result + i);
1529          i += 4;
1530          num--;
1531       }
1532       free(result);
1533    }
1534
1535    ret = true;
1536
1537 bail_out:
1538    Mmsg(query, "DROP TABLE btemp%s", output_table);
1539    db->bdb_sql_query(query.c_str(), NULL, NULL);
1540    db->bdb_unlock();
1541    return ret;
1542 }
1543
1544 void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
1545 {
1546    char ed1[50];
1547    db_list_ctx lst;
1548    POOL_MEM query;
1549    JOB_DBR jr, jr2;
1550    memset(&jr, 0, sizeof(jr));
1551    memset(&jr2, 0, sizeof(jr2));
1552
1553    /* Need to limit the query to StartTime, Client/Fileset */
1554    jr2.JobId = res[1];
1555    db->bdb_get_job_record(jcr, &jr2);
1556
1557    jr.JobId = res[1];
1558    jr.ClientId = jr2.ClientId;
1559    jr.FileSetId = jr2.FileSetId;
1560    jr.JobLevel = L_INCREMENTAL;
1561    jr.StartTime = jr2.StartTime;
1562
1563    /* Get accurate jobid list */
1564    db->bdb_get_accurate_jobids(jcr, &jr, &lst);
1565
1566    Dmsg2(dbglevel_sql, "JobId list for %lld is %s\n", res[0], lst.list);
1567
1568    /* The list contains already the last DeltaSeq element, so
1569     * we don't need to select it in the next query
1570     */
1571    for (int l = strlen(lst.list); l > 0; l--) {
1572       if (lst.list[l] == ',') {
1573          lst.list[l] = '\0';
1574          break;
1575       }
1576    }
1577
1578    Dmsg1(dbglevel_sql, "JobId list after strip is %s\n", lst.list);
1579
1580    /* Escape filename */
1581    db->fnl = strlen((char *)res[2]);
1582    db->esc_name = check_pool_memory_size(db->esc_name, 2*db->fnl+2);
1583    db->bdb_escape_string(jcr, db->esc_name, (char *)res[2], db->fnl);
1584
1585    edit_int64(res[3], ed1);     /* pathid */
1586
1587    int id=db->bdb_get_type_index();
1588    Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
1589         lst.list, db->esc_name, ed1,
1590         lst.list, db->esc_name, ed1,
1591         lst.list, lst.list);
1592
1593    Mmsg(db->cmd, "INSERT INTO %s "
1594                    "SELECT JobId, FileIndex, FileId FROM (%s) AS F1",
1595         output_table, query.c_str());
1596
1597    if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
1598       Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1599    }
1600 }
1601
1602 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */