]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
403f063c8de332ecc490ed24733c613fed172795
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5    Copyright (C) 2009-2014 Free Software Foundation Europe e.V.
6
7    The original author of Bacula is Kern Sibbald, with contributions
8    from many others, a complete list can be found in the file AUTHORS.
9
10    You may use this file and others of this release according to the
11    license defined in the LICENSE file, which includes the Affero General
12    Public License, v3.0 ("AGPLv3") and some additional permissions and
13    terms pursuant to its AGPLv3 Section 7.
14
15    This notice must be preserved when any source code is 
16    conveyed and/or propagated.
17
18    Bacula(R) is a registered trademark of Kern Sibbald.
19 */
20
21 #include "bacula.h"
22 #include "cats.h" 
23 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL
24 #include "lib/htable.h"
25 #include "bvfs.h"
26
27 #define dbglevel      DT_BVFS|10
28 #define dbglevel_sql  DT_SQL|15
29
30 static int result_handler(void *ctx, int fields, char **row)
31 {
32    if (fields == 4) {
33       Pmsg4(0, "%s\t%s\t%s\t%s\n",
34             row[0], row[1], row[2], row[3]);
35    } else if (fields == 5) {
36       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n",
37             row[0], row[1], row[2], row[3], row[4]);
38    } else if (fields == 6) {
39       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n",
40             row[0], row[1], row[2], row[3], row[4], row[5]);
41    } else if (fields == 7) {
42       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
43             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
44    }
45    return 0;
46 }
47
48 Bvfs::Bvfs(JCR *j, BDB *mdb) {
49    jcr = j;
50    jcr->inc_use_count();
51    db = mdb;                 /* need to inc ref count */
52    jobids = get_pool_memory(PM_NAME);
53    prev_dir = get_pool_memory(PM_NAME);
54    pattern = get_pool_memory(PM_NAME);
55    filename = get_pool_memory(PM_NAME);
56    tmp = get_pool_memory(PM_NAME);
57    escaped_list = get_pool_memory(PM_NAME);
58    *filename = *jobids = *prev_dir = *pattern = 0;
59    dir_filenameid = pwd_id = offset = 0;
60    see_copies = see_all_versions = false;
61    limit = 1000;
62    attr = new_attr(jcr);
63    list_entries = result_handler;
64    user_data = this;
65    username = NULL;
66    job_acl = client_acl = pool_acl = fileset_acl = NULL;
67 }
68
69 Bvfs::~Bvfs() {
70    free_pool_memory(jobids);
71    free_pool_memory(pattern);
72    free_pool_memory(prev_dir);
73    free_pool_memory(filename);
74    free_pool_memory(tmp);
75    free_pool_memory(escaped_list);
76    if (username) {
77       free(username);
78    }
79    free_attr(attr);
80    jcr->dec_use_count();
81 }
82
83 char *Bvfs::escape_list(alist *lst)
84 {
85    char *elt;
86    int len;
87
88    /* List is empty, reject everything */
89    if (!lst || lst->size() == 0) {
90       Mmsg(escaped_list, "''");
91       return escaped_list;
92    }
93
94    *tmp = 0;
95    *escaped_list = 0;
96
97    foreach_alist(elt, lst) {
98       if (elt && *elt) {
99          len = strlen(elt);
100          /* Escape + ' ' */
101          tmp = check_pool_memory_size(tmp, 2 * len + 2 + 2);
102
103          tmp[0] = '\'';
104          db->bdb_escape_string(jcr, tmp + 1 , elt, len);
105          pm_strcat(tmp, "'");
106
107          if (*escaped_list) {
108             pm_strcat(escaped_list, ",");
109          }
110
111          pm_strcat(escaped_list, tmp);
112       }
113    }
114    return escaped_list;
115 }
116
117 void Bvfs::filter_jobid()
118 {
119    POOL_MEM query;
120    POOL_MEM sub_where;
121    POOL_MEM sub_join;
122
123    /* No ACL, no username, no check */
124    if (!job_acl && !fileset_acl && !client_acl && !pool_acl && !username) {
125       Dmsg0(dbglevel_sql, "No ACL\n");
126       return;
127    }
128
129    if (job_acl) {
130       Mmsg(sub_where, " AND Job.Name IN (%s) ", escape_list(job_acl));
131    }
132
133    if (fileset_acl) {
134       Mmsg(query, " AND FileSet.FileSet IN (%s) ", escape_list(fileset_acl));
135       pm_strcat(sub_where, query.c_str());
136       pm_strcat(sub_join, " JOIN FileSet USING (FileSetId) ");
137    }
138
139    if (client_acl) {
140       Mmsg(query, " AND Client.Name IN (%s) ", escape_list(client_acl));
141       pm_strcat(sub_where, query.c_str());
142    }
143
144    if (pool_acl) {
145       Mmsg(query, " AND Pool.Name IN (%s) ", escape_list(pool_acl));
146       pm_strcat(sub_where, query.c_str());
147       pm_strcat(sub_join, " JOIN Pool USING (PoolId) ");
148    }
149
150    if (username) {
151       /* Query used by Bweb to filter clients, activated when using
152        * set_username()
153        */
154       Mmsg(query,
155       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
156         "JOIN (SELECT ClientId FROM client_group_member "
157         "JOIN client_group USING (client_group_id) "
158         "JOIN bweb_client_group_acl USING (client_group_id) "
159         "JOIN bweb_user USING (userid) "
160        "WHERE bweb_user.username = '%s' "
161       ") AS filter USING (ClientId) "
162         " WHERE JobId IN (%s) %s",
163            sub_join.c_str(), username, jobids, sub_where.c_str());
164
165    } else {
166       Mmsg(query,
167       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) %s "
168       " WHERE JobId IN (%s) %s",
169            sub_join.c_str(), jobids, sub_where.c_str());
170    }
171
172    db_list_ctx ctx;
173    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
174    db->bdb_sql_query(query.c_str(), db_list_handler, &ctx);
175    pm_strcpy(jobids, ctx.list);
176 }
177
178 void Bvfs::set_jobid(JobId_t id)
179 {
180    Mmsg(jobids, "%lld", (uint64_t)id);
181    filter_jobid();
182 }
183
184 void Bvfs::set_jobids(char *ids)
185 {
186    pm_strcpy(jobids, ids);
187    filter_jobid();
188 }
189
190 /*
191  * TODO: Find a way to let the user choose how he wants to display
192  * files and directories
193  */
194
195
196 /*
197  * Working Object to store PathId already seen (avoid
198  * database queries), equivalent to %cache_ppathid in perl
199  */
200
201 #define NITEMS 50000
202 class pathid_cache {
203 private:
204    hlink *nodes;
205    int nb_node;
206    int max_node;
207
208    alist *table_node;
209
210    htable *cache_ppathid;
211
212 public:
213    pathid_cache() {
214       hlink link;
215       cache_ppathid = (htable *)malloc(sizeof(htable));
216       cache_ppathid->init(&link, &link, NITEMS);
217       max_node = NITEMS;
218       nodes = (hlink *) malloc(max_node * sizeof (hlink));
219       nb_node = 0;
220       table_node = New(alist(5, owned_by_alist));
221       table_node->append(nodes);
222    }
223
224    hlink *get_hlink() {
225       if (++nb_node >= max_node) {
226          nb_node = 0;
227          nodes = (hlink *)malloc(max_node * sizeof(hlink));
228          table_node->append(nodes);
229       }
230       return nodes + nb_node;
231    }
232
233    bool lookup(char *pathid) {
234       bool ret = cache_ppathid->lookup(pathid) != NULL;
235       return ret;
236    }
237
238    void insert(char *pathid) {
239       hlink *h = get_hlink();
240       cache_ppathid->insert(pathid, h);
241    }
242
243    ~pathid_cache() {
244       cache_ppathid->destroy();
245       free(cache_ppathid);
246       delete table_node;
247    }
248 private:
249    pathid_cache(const pathid_cache &); /* prohibit pass by value */
250    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
251 } ;
252
253 /* Return the parent_dir with the trailing /  (update the given string)
254  * TODO: see in the rest of bacula if we don't have already this function
255  * dir=/tmp/toto/
256  * dir=/tmp/
257  * dir=/
258  * dir=
259  */
260 char *bvfs_parent_dir(char *path)
261 {
262    char *p = path;
263    int len = strlen(path) - 1;
264
265    /* windows directory / */
266    if (len == 2 && B_ISALPHA(path[0])
267                 && path[1] == ':'
268                 && path[2] == '/')
269    {
270       len = 0;
271       path[0] = '\0';
272    }
273
274    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
275       path[len] = '\0';
276    }
277
278    if (len > 0) {
279       p += len;
280       while (p > path && !IsPathSeparator(*p)) {
281          p--;
282       }
283       p[1] = '\0';
284    }
285    return path;
286 }
287
288 /* Return the basename of the with the trailing /
289  * TODO: see in the rest of bacula if we don't have
290  * this function already
291  */
292 char *bvfs_basename_dir(char *path)
293 {
294    char *p = path;
295    int len = strlen(path) - 1;
296
297    if (path[len] == '/') {      /* if directory, skip last / */
298       len -= 1;
299    }
300
301    if (len > 0) {
302       p += len;
303       while (p > path && !IsPathSeparator(*p)) {
304          p--;
305       }
306       if (*p == '/') {
307          p++;                  /* skip first / */
308       }
309    }
310    return p;
311 }
312
313 static void build_path_hierarchy(JCR *jcr, BDB *mdb,
314                                  pathid_cache &ppathid_cache,
315                                  char *org_pathid, char *path)
316 {
317    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
318    char pathid[50];
319    ATTR_DBR parent;
320    char *bkp = mdb->path;
321    strncpy(pathid, org_pathid, sizeof(pathid));
322
323    /* Does the ppathid exist for this ? we use a memory cache...  In order to
324     * avoid the full loop, we consider that if a dir is allready in the
325     * PathHierarchy table, then there is no need to calculate all the
326     * hierarchy
327     */
328    while (path && *path)
329    {
330       if (!ppathid_cache.lookup(pathid))
331       {
332          Mmsg(mdb->cmd,
333               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
334               pathid);
335
336          if (!mdb->QueryDB(jcr, mdb->cmd)) {
337             goto bail_out;      /* Query failed, just leave */
338          }
339
340          /* Do we have a result ? */
341          if (mdb->sql_num_rows() > 0) {
342             ppathid_cache.insert(pathid);
343             /* This dir was in the db ...
344              * It means we can leave, the tree has allready been built for
345              * this dir
346              */
347             goto bail_out;
348          } else {
349             /* search or create parent PathId in Path table */
350             mdb->path = bvfs_parent_dir(path);
351             mdb->pnl = strlen(mdb->path);
352             if (!mdb->bdb_create_path_record(jcr, &parent)) {
353                goto bail_out;
354             }
355             ppathid_cache.insert(pathid);
356
357             Mmsg(mdb->cmd,
358                  "INSERT INTO PathHierarchy (PathId, PPathId) "
359                  "VALUES (%s,%lld)",
360                  pathid, (uint64_t) parent.PathId);
361
362             if (!mdb->InsertDB(jcr, mdb->cmd)) {
363                goto bail_out;   /* Can't insert the record, just leave */
364             }
365
366             edit_uint64(parent.PathId, pathid);
367             path = mdb->path;   /* already done */
368          }
369       } else {
370          /* It's already in the cache.  We can leave, no time to waste here,
371           * all the parent dirs have allready been done
372           */
373          goto bail_out;
374       }
375    }
376
377 bail_out:
378    mdb->path = bkp;
379    mdb->fnl = 0;
380 }
381
382 /*
383  * Internal function to update path_hierarchy cache with a shared pathid cache
384  * return Error 0
385  *        OK    1
386  */
387 static int update_path_hierarchy_cache(JCR *jcr,
388                                         BDB *mdb,
389                                         pathid_cache &ppathid_cache,
390                                         JobId_t JobId)
391 {
392    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
393    uint32_t ret=0;
394    uint32_t num;
395    char jobid[50];
396    edit_uint64(JobId, jobid);
397
398    mdb->bdb_lock();
399
400    /* We don't really want to harm users with spurious messages,
401     * everything is handled by transaction
402     */
403    mdb->set_use_fatal_jmsg(false);
404
405    mdb->bdb_start_transaction(jcr);
406
407    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
408
409    if (!mdb->QueryDB(jcr, mdb->cmd) || mdb->sql_num_rows() > 0) {
410       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
411       ret = 1;
412       goto bail_out;
413    }
414
415    /* Inserting path records for JobId */
416    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
417                    "SELECT DISTINCT PathId, JobId "
418                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
419                            "UNION "
420                            "SELECT PathId, BaseFiles.JobId "
421                              "FROM BaseFiles JOIN File AS F USING (FileId) "
422                             "WHERE BaseFiles.JobId = %s) AS B",
423         jobid, jobid);
424
425    if (!mdb->QueryDB(jcr, mdb->cmd)) {
426       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
427       goto bail_out;
428    }
429
430    /* Now we have to do the directory recursion stuff to determine missing
431     * visibility We try to avoid recursion, to be as fast as possible We also
432     * only work on not allready hierarchised directories...
433     */
434    Mmsg(mdb->cmd,
435      "SELECT PathVisibility.PathId, Path "
436        "FROM PathVisibility "
437             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
438             "LEFT JOIN PathHierarchy "
439          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
440       "WHERE PathVisibility.JobId = %s "
441         "AND PathHierarchy.PathId IS NULL "
442       "ORDER BY Path", jobid);
443    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
444
445    if (!mdb->QueryDB(jcr, mdb->cmd)) {
446       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
447       goto bail_out;
448    }
449
450    /* TODO: I need to reuse the DB connection without emptying the result
451     * So, now i'm copying the result in memory to be able to query the
452     * catalog descriptor again.
453     */
454    num = mdb->sql_num_rows();
455    if (num > 0) {
456       char **result = (char **)malloc (num * 2 * sizeof(char *));
457
458       SQL_ROW row;
459       int i=0;
460       while((row = mdb->sql_fetch_row())) {
461          result[i++] = bstrdup(row[0]);
462          result[i++] = bstrdup(row[1]);
463       }
464
465       i=0;
466       while (num > 0) {
467          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
468          free(result[i++]);
469          free(result[i++]);
470          num--;
471       }
472       free(result);
473    }
474
475    
476    if (mdb->bdb_get_type_index() == SQL_TYPE_SQLITE3) {
477       Mmsg(mdb->cmd,
478  "INSERT INTO PathVisibility (PathId, JobId) "
479    "SELECT DISTINCT h.PPathId AS PathId, %s "
480      "FROM PathHierarchy AS h "
481     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
482       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
483            jobid, jobid, jobid );
484
485    } else if (mdb->bdb_get_type_index() == SQL_TYPE_MYSQL) {
486       Mmsg(mdb->cmd,
487   "INSERT INTO PathVisibility (PathId, JobId)  "
488    "SELECT a.PathId,%s "
489    "FROM ( "
490      "SELECT DISTINCT h.PPathId AS PathId "
491        "FROM PathHierarchy AS h "
492        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
493       "WHERE p.JobId=%s) AS a "
494       "LEFT JOIN PathVisibility AS b ON (b.JobId=%s and a.PathId = b.PathId) "
495       "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
496
497    } else {
498       Mmsg(mdb->cmd,
499   "INSERT INTO PathVisibility (PathId, JobId)  "
500    "SELECT a.PathId,%s "
501    "FROM ( "
502      "SELECT DISTINCT h.PPathId AS PathId "
503        "FROM PathHierarchy AS h "
504        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
505       "WHERE p.JobId=%s) AS a LEFT JOIN "
506        "(SELECT PathId "
507           "FROM PathVisibility "
508          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
509    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
510    }
511
512    do {
513       ret = mdb->QueryDB(jcr, mdb->cmd);
514    } while (ret && mdb->sql_affected_rows() > 0);
515
516    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
517    ret = mdb->UpdateDB(jcr, mdb->cmd);
518
519 bail_out:
520    mdb->bdb_end_transaction(jcr);
521
522    if (!ret) {
523       Mmsg(mdb->cmd, "SELECT HasCache FROM Job WHERE JobId=%s", jobid);
524       mdb->bdb_sql_query(mdb->cmd, db_int_handler, &ret);
525    }
526
527    /* Enable back the FATAL message if something is wrong */
528    mdb->set_use_fatal_jmsg(true);
529
530    mdb->bdb_unlock();
531    return ret;
532 }
533
534 /* 
535  * Find an store the filename descriptor for empty directories Filename.Name=''
536  */
537 DBId_t Bvfs::get_dir_filenameid()
538 {
539    uint32_t id;
540    if (dir_filenameid) {
541       return dir_filenameid;
542    }
543    Mmsg(db->cmd, "SELECT FilenameId FROM Filename WHERE Name = ''");
544    db_sql_query(db, db->cmd, db_int_handler, &id);
545    dir_filenameid = id;
546    return dir_filenameid;
547 }
548
549 /* Compute the cache for the bfileview compoment */
550 void Bvfs::fv_update_cache()
551 {
552    int64_t pathid;
553    int64_t size=0, count=0;
554
555    Dmsg0(dbglevel, "fv_update_cache()\n");
556
557    if (!*jobids) {
558       return;                   /* Nothing to build */
559    }
560
561    db->bdb_lock();
562    /* We don't really want to harm users with spurious messages,
563     * everything is handled by transaction
564     */
565    db->set_use_fatal_jmsg(false);
566
567    db->bdb_start_transaction(jcr);
568
569    pathid = get_root();
570
571    fv_compute_size_and_count(pathid, &size, &count);
572
573    db->bdb_end_transaction(jcr);
574
575    /* Enable back the FATAL message if something is wrong */
576    db->set_use_fatal_jmsg(true);
577
578    db->bdb_unlock();
579 }
580
581 /* Not yet working */
582 void Bvfs::fv_get_big_files(int64_t pathid, int64_t min_size, int32_t limit)
583 {
584    Mmsg(db->cmd, 
585         "SELECT FilenameId AS filenameid, Name AS name, size "
586           "FROM ( "
587          "SELECT FilenameId, base64_decode_lstat(8,LStat) AS size "
588            "FROM File "
589           "WHERE PathId  = %lld "
590             "AND JobId = %s "
591         ") AS S INNER JOIN Filename USING (FilenameId) "
592      "WHERE S.size > %lld "
593      "ORDER BY S.size DESC "
594      "LIMIT %d ", pathid, jobids, min_size, limit);
595 }
596
597 /* Get the current path size and files count */
598 void Bvfs::fv_get_current_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
599 {
600    SQL_ROW row;
601
602    *size = *count = 0;
603
604    Mmsg(db->cmd,
605  "SELECT Size AS size, Files AS files "
606   " FROM PathVisibility "
607  " WHERE PathId = %lld "
608    " AND JobId = %s ", pathid, jobids);
609
610    if (!db->QueryDB(jcr, db->cmd)) {
611       return;
612    }
613
614    if ((row = db->sql_fetch_row())) {
615       *size = str_to_int64(row[0]);
616       *count =  str_to_int64(row[1]);
617    }
618 }
619
620 /* Compute for the current path the size and files count */
621 void Bvfs::fv_get_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
622 {
623    SQL_ROW row;
624
625    *size = *count = 0;
626
627    Mmsg(db->cmd,
628  "SELECT sum(base64_decode_lstat(8,LStat)) AS size, count(1) AS files "
629   " FROM File "
630  " WHERE PathId = %lld "
631    " AND JobId = %s ", pathid, jobids);
632
633    if (!db->QueryDB(jcr, db->cmd)) {
634       return;
635    }
636
637    if ((row = db->sql_fetch_row())) {
638       *size = str_to_int64(row[0]);
639       *count =  str_to_int64(row[1]);
640    }
641 }
642
643 void Bvfs::fv_compute_size_and_count(int64_t pathid, int64_t *size, int64_t *count)
644 {
645    Dmsg1(dbglevel, "fv_compute_size_and_count(%lld)\n", pathid);
646
647    fv_get_current_size_and_count(pathid, size, count);
648    if (*size > 0) {
649       return;
650    }
651
652    /* Update stats for the current directory */
653    fv_get_size_and_count(pathid, size, count);
654
655    /* Update stats for all sub directories */
656    Mmsg(db->cmd,
657         " SELECT PathId "
658           " FROM PathVisibility "
659                " INNER JOIN PathHierarchy USING (PathId) "
660          " WHERE PPathId  = %lld "
661            " AND JobId = %s ", pathid, jobids);
662
663    db->QueryDB(jcr, db->cmd);
664    int num = db->sql_num_rows();
665
666    if (num > 0) {
667       int64_t *result = (int64_t *)malloc (num * sizeof(int64_t));
668       SQL_ROW row;
669       int i=0;
670
671       while((row = db->sql_fetch_row())) {
672          result[i++] = str_to_int64(row[0]); /* PathId */
673       }
674
675       i=0;
676       while (num > 0) {
677          int64_t c=0, s=0;
678          fv_compute_size_and_count(result[i], &s, &c);
679          *size += s;
680          *count += c;
681
682          i++;
683          num--;
684       }
685       free(result);
686    }
687
688    fv_update_size_and_count(pathid, *size, *count);
689 }
690
691 void Bvfs::fv_update_size_and_count(int64_t pathid, int64_t size, int64_t count)
692 {
693    Mmsg(db->cmd,
694         "UPDATE PathVisibility SET Files = %lld, Size = %lld "
695         " WHERE JobId = %s "
696         " AND PathId = %lld ", count, size, jobids, pathid);
697
698    db->UpdateDB(jcr, db->cmd);
699 }
700
701 void bvfs_update_cache(JCR *jcr, BDB *mdb)
702 {
703    uint32_t nb=0;
704    db_list_ctx jobids_list;
705
706    mdb->bdb_lock();
707
708 #ifdef xxx
709    /* TODO: Remove this code when updating make_bacula_table script */
710    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
711    if (!mdb->QueryDB(jcr, mdb->cmd)) {
712       Dmsg0(dbglevel, "Creating cache table\n");
713       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
714       mdb->QueryDB(jcr, mdb->cmd);
715
716       Mmsg(mdb->cmd,
717            "CREATE TABLE PathHierarchy ( "
718            "PathId integer NOT NULL, "
719            "PPathId integer NOT NULL, "
720            "CONSTRAINT pathhierarchy_pkey "
721            "PRIMARY KEY (PathId))");
722       mdb->QueryDB(jcr, mdb->cmd);
723
724       Mmsg(mdb->cmd,
725            "CREATE INDEX pathhierarchy_ppathid "
726            "ON PathHierarchy (PPathId)");
727       mdb->QueryDB(jcr, mdb->cmd);
728
729       Mmsg(mdb->cmd,
730            "CREATE TABLE PathVisibility ("
731            "PathId integer NOT NULL, "
732            "JobId integer NOT NULL, "
733            "Size int8 DEFAULT 0, "
734            "Files int4 DEFAULT 0, "
735            "CONSTRAINT pathvisibility_pkey "
736            "PRIMARY KEY (JobId, PathId))");
737       mdb->QueryDB(jcr, mdb->cmd);
738
739       Mmsg(mdb->cmd,
740            "CREATE INDEX pathvisibility_jobid "
741            "ON PathVisibility (JobId)");
742       mdb->QueryDB(jcr, mdb->cmd);
743
744    }
745 #endif
746
747    Mmsg(mdb->cmd,
748  "SELECT JobId from Job "
749   "WHERE HasCache = 0 "
750     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
751   "ORDER BY JobId");
752
753    mdb->bdb_sql_query(mdb->cmd, db_list_handler, &jobids_list);
754
755    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
756
757    mdb->bdb_start_transaction(jcr);
758    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
759    Mmsg(mdb->cmd,
760         "DELETE FROM PathVisibility "
761          "WHERE NOT EXISTS "
762         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
763    nb = mdb->DeleteDB(jcr, mdb->cmd);
764    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
765
766    mdb->bdb_end_transaction(jcr);
767    mdb->bdb_unlock();
768 }
769
770 /*
771  * Update the bvfs cache for given jobids (1,2,3,4)
772  */
773 int
774 bvfs_update_path_hierarchy_cache(JCR *jcr, BDB *mdb, char *jobids)
775 {
776    pathid_cache ppathid_cache;
777    JobId_t JobId;
778    char *p;
779    int ret=1;
780
781    for (p=jobids; ; ) {
782       int stat = get_next_jobid_from_list(&p, &JobId);
783       if (stat < 0) {
784          ret = 0;
785          break;
786       }
787       if (stat == 0) {
788          break;
789       }
790       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
791       if (!update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId)) {
792          ret = 0;
793       }
794    }
795    return ret;
796 }
797
798 /*
799  * Update the bvfs fileview for given jobids
800  */
801 void
802 bvfs_update_fv_cache(JCR *jcr, BDB *mdb, char *jobids)
803 {
804    char *p;
805    JobId_t JobId;
806    Bvfs bvfs(jcr, mdb);
807
808    for (p=jobids; ; ) {
809       int stat = get_next_jobid_from_list(&p, &JobId);
810       if (stat < 0) {
811          return;
812       }
813       if (stat == 0) {
814          break;
815       }
816
817       Dmsg1(dbglevel, "Trying to create cache for %lld\n", (int64_t)JobId);
818
819       bvfs.set_jobid(JobId);
820       bvfs.fv_update_cache();
821    }
822 }
823
824 /*
825  * Update the bvfs cache for current jobids
826  */
827 void Bvfs::update_cache()
828 {
829    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
830 }
831
832 /* Change the current directory, returns true if the path exists */
833 bool Bvfs::ch_dir(const char *path)
834 {
835    pm_strcpy(db->path, path);
836    db->pnl = strlen(db->path);
837    db->bdb_lock();
838    ch_dir(db->bdb_get_path_record(jcr));
839    db->bdb_unlock();
840    return pwd_id != 0;
841 }
842
843 /*
844  * Get all file versions for a specified client
845  * TODO: Handle basejobs using different client
846  */
847 void Bvfs::get_all_file_versions(DBId_t pathid, FileId_t fnid, const char *client)
848 {
849    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
850          (uint64_t)fnid, client);
851    char ed1[50], ed2[50];
852    POOL_MEM q;
853    if (see_copies) {
854       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
855    } else {
856       Mmsg(q, " AND Job.Type = 'B' ");
857    }
858
859    POOL_MEM query;
860
861    Mmsg(query,//    1           2              3       
862 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
863 //         4          5           6
864         "File.JobId, File.LStat, File.FileId, "
865 //         7                    8
866        "Media.VolumeName, Media.InChanger "
867 "FROM File, Job, Client, JobMedia, Media "
868 "WHERE File.FilenameId = %s "
869   "AND File.PathId=%s "
870   "AND File.JobId = Job.JobId "
871   "AND Job.JobId = JobMedia.JobId "
872   "AND File.FileIndex >= JobMedia.FirstIndex "
873   "AND File.FileIndex <= JobMedia.LastIndex "
874   "AND JobMedia.MediaId = Media.MediaId "
875   "AND Job.ClientId = Client.ClientId "
876   "AND Client.Name = '%s' "
877   "%s ORDER BY FileId LIMIT %d OFFSET %d"
878         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
879         limit, offset);
880    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
881    db->bdb_sql_query(query.c_str(), list_entries, user_data);
882 }
883
884 /*
885  * Get all volumes for a specific file
886  */
887 void Bvfs::get_volumes(FileId_t fileid)
888 {
889    Dmsg1(dbglevel, "get_volumes(%lld)\n", (uint64_t)fileid);
890
891    char ed1[50];
892    POOL_MEM query;
893
894    Mmsg(query,
895 //                                   7                8
896 "SELECT DISTINCT 'L',0,0,0,0,0,0, Media.VolumeName, Media.InChanger "
897 "FROM File JOIN JobMedia USING (JobId) JOIN Media USING (MediaId) "
898 "WHERE File.FileId = %s "
899   "AND File.FileIndex >= JobMedia.FirstIndex "
900   "AND File.FileIndex <= JobMedia.LastIndex "
901   " LIMIT %d OFFSET %d"
902         ,edit_uint64(fileid, ed1), limit, offset);
903    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
904    db->bdb_sql_query(query.c_str(), list_entries, user_data);
905 }
906
907 DBId_t Bvfs::get_root()
908 {
909    int p;
910    *db->path = 0;
911    db->bdb_lock();
912    p = db->bdb_get_path_record(jcr);
913    db->bdb_unlock();
914    return p;
915 }
916
917 static int path_handler(void *ctx, int fields, char **row)
918 {
919    Bvfs *fs = (Bvfs *) ctx;
920    return fs->_handle_path(ctx, fields, row);
921 }
922
923 int Bvfs::_handle_path(void *ctx, int fields, char **row)
924 {
925    if (bvfs_is_dir(row)) {
926       /* can have the same path 2 times */
927       if (strcmp(row[BVFS_PathId], prev_dir)) {
928          pm_strcpy(prev_dir, row[BVFS_PathId]);
929          return list_entries(user_data, fields, row);
930       }
931    }
932    return 0;
933 }
934
935 /*
936  * Retrieve . and .. information
937  */
938 void Bvfs::ls_special_dirs()
939 {
940    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
941    char ed1[50], ed2[50];
942    if (*jobids == 0) {
943       return;
944    }
945    if (!dir_filenameid) {
946       get_dir_filenameid();
947    }
948
949    /* Will fetch directories  */
950    *prev_dir = 0;
951
952    POOL_MEM query;
953    Mmsg(query,
954 "(SELECT PPathId AS PathId, '..' AS Path "
955     "FROM  PathHierarchy "
956    "WHERE  PathId = %s "
957 "UNION "
958  "SELECT %s AS PathId, '.' AS Path)",
959         edit_uint64(pwd_id, ed1), ed1);
960
961    POOL_MEM query2;
962    Mmsg(query2,// 1      2     3        4     5       6
963 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
964   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
965        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
966               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
967        "WHERE File1.FilenameId = %s "
968        "AND File1.JobId IN (%s)) AS listfile1 "
969   "ON (tmp.PathId = listfile1.PathId) "
970   "ORDER BY tmp.Path, JobId DESC ",
971         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
972
973    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
974    db->bdb_sql_query(query2.c_str(), path_handler, this);
975 }
976
977 /* Returns true if we have dirs to read */
978 bool Bvfs::ls_dirs()
979 {
980    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
981    char ed1[50], ed2[50];
982    if (*jobids == 0) {
983       return false;
984    }
985
986    POOL_MEM query;
987    POOL_MEM filter;
988    if (*pattern) {
989       Mmsg(filter, " AND Path2.Path %s '%s' ",
990            match_query[db->bdb_get_type_index()], pattern);
991
992    }
993
994    if (!dir_filenameid) {
995       get_dir_filenameid();
996    }
997
998    /* the sql query displays same directory multiple time, take the first one */
999    *prev_dir = 0;
1000
1001    /* Let's retrieve the list of the visible dirs in this dir ...
1002     * First, I need the empty filenameid to locate efficiently
1003     * the dirs in the file table
1004     * my $dir_filenameid = $self->get_dir_filenameid();
1005     */
1006    /* Then we get all the dir entries from File ... */
1007    Mmsg(query,
1008 //       0     1     2   3      4     5       6
1009 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
1010     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
1011            "lower(Path1.Path) AS lpath, "
1012            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
1013            "listfile1.FileId AS FileId "
1014     "FROM ( "
1015       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
1016       "FROM PathHierarchy AS PathHierarchy1 "
1017       "JOIN Path AS Path2 "
1018         "ON (PathHierarchy1.PathId = Path2.PathId) "
1019       "JOIN PathVisibility AS PathVisibility1 "
1020         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
1021       "WHERE PathHierarchy1.PPathId = %s "
1022       "AND PathVisibility1.JobId IN (%s) "
1023            "%s "
1024      ") AS listpath1 "
1025    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
1026
1027    "LEFT JOIN ( " /* get attributes if any */
1028        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
1029               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
1030        "WHERE File1.FilenameId = %s "
1031        "AND File1.JobId IN (%s)) AS listfile1 "
1032        "ON (listpath1.PathId = listfile1.PathId) "
1033     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
1034         edit_uint64(pwd_id, ed1),
1035         jobids,
1036         filter.c_str(),
1037         edit_uint64(dir_filenameid, ed2),
1038         jobids,
1039         limit, offset);
1040
1041    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1042
1043    db->bdb_lock();
1044    db->bdb_sql_query(query.c_str(), path_handler, this);
1045    nb_record = db->sql_num_rows();
1046    db->bdb_unlock();
1047
1048    return nb_record == limit;
1049 }
1050
1051 void build_ls_files_query(BDB *db, POOL_MEM &query,
1052                           const char *JobId, const char *PathId,
1053                           const char *filter, int64_t limit, int64_t offset)
1054 {
1055    if (db->bdb_get_type_index() == SQL_TYPE_POSTGRESQL) {
1056       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1057            JobId, PathId, JobId, PathId,
1058            filter, limit, offset);
1059    } else {
1060       Mmsg(query, sql_bvfs_list_files[db->bdb_get_type_index()],
1061            JobId, PathId, JobId, PathId,
1062            limit, offset, filter, JobId, JobId);
1063    }
1064 }
1065
1066 /* Returns true if we have files to read */
1067 bool Bvfs::ls_files()
1068 {
1069    POOL_MEM query;
1070    POOL_MEM filter;
1071    char pathid[50];
1072
1073    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
1074    if (*jobids == 0) {
1075       return false;
1076    }
1077
1078    if (!pwd_id) {
1079       ch_dir(get_root());
1080    }
1081
1082    edit_uint64(pwd_id, pathid);
1083    if (*pattern) {
1084       Mmsg(filter, " AND Filename.Name %s '%s' ", 
1085            match_query[db_get_type_index(db)], pattern);
1086
1087    } else if (*filename) {
1088       Mmsg(filter, " AND Filename.Name = '%s' ", filename);
1089    }
1090
1091    build_ls_files_query(db, query,
1092                         jobids, pathid, filter.c_str(),
1093                         limit, offset);
1094
1095    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1096
1097    db->bdb_lock();
1098    db->bdb_sql_query(query.c_str(), list_entries, user_data);
1099    nb_record = db->sql_num_rows();
1100    db->bdb_unlock();
1101
1102    return nb_record == limit;
1103 }
1104
1105
1106 /*
1107  * Return next Id from comma separated list
1108  *
1109  * Returns:
1110  *   1 if next Id returned
1111  *   0 if no more Ids are in list
1112  *  -1 there is an error
1113  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
1114  */
1115 static int get_next_id_from_list(char **p, int64_t *Id)
1116 {
1117    const int maxlen = 30;
1118    char id[maxlen+1];
1119    char *q = *p;
1120
1121    id[0] = 0;
1122    for (int i=0; i<maxlen; i++) {
1123       if (*q == 0) {
1124          break;
1125       } else if (*q == ',') {
1126          q++;
1127          break;
1128       }
1129       id[i] = *q++;
1130       id[i+1] = 0;
1131    }
1132    if (id[0] == 0) {
1133       return 0;
1134    } else if (!is_a_number(id)) {
1135       return -1;                      /* error */
1136    }
1137    *p = q;
1138    *Id = str_to_int64(id);
1139    return 1;
1140 }
1141
1142 static int get_path_handler(void *ctx, int fields, char **row)
1143 {
1144    POOL_MEM *buf = (POOL_MEM *) ctx;
1145    pm_strcpy(*buf, row[0]);
1146    return 0;
1147 }
1148
1149 static bool check_temp(char *output_table)
1150 {
1151    if (output_table[0] == 'b' &&
1152        output_table[1] == '2' &&
1153        is_an_integer(output_table + 2))
1154    {
1155       return true;
1156    }
1157    return false;
1158 }
1159
1160 void Bvfs::clear_cache()
1161 {
1162    db->bdb_sql_query("BEGIN",                     NULL, NULL);
1163    db->bdb_sql_query("UPDATE Job SET HasCache=0", NULL, NULL);
1164    db->bdb_sql_query("TRUNCATE PathHierarchy",    NULL, NULL);
1165    db->bdb_sql_query("TRUNCATE PathVisibility",   NULL, NULL);
1166    db->bdb_sql_query("COMMIT",                    NULL, NULL);
1167 }
1168
1169 bool Bvfs::drop_restore_list(char *output_table)
1170 {
1171    POOL_MEM query;
1172    if (check_temp(output_table)) {
1173       Mmsg(query, "DROP TABLE %s", output_table);
1174       db->bdb_sql_query(query.c_str(), NULL, NULL);
1175       return true;
1176    }
1177    return false;
1178 }
1179
1180 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink,
1181                                 char *output_table)
1182 {
1183    POOL_MEM query;
1184    POOL_MEM tmp, tmp2;
1185    int64_t id, jobid, prev_jobid;
1186    int num;
1187    bool init=false;
1188    bool ret=false;
1189    /* check args */
1190    if ((*fileid   && !is_a_number_list(fileid))  ||
1191        (*dirid    && !is_a_number_list(dirid))   ||
1192        (*hardlink && !is_a_number_list(hardlink))||
1193        (!*hardlink && !*fileid && !*dirid && !*hardlink))
1194    {
1195       return false;
1196    }
1197    if (!check_temp(output_table)) {
1198       return false;
1199    }
1200
1201    db->bdb_lock();
1202
1203    /* Cleanup old tables first */
1204    Mmsg(query, "DROP TABLE btemp%s", output_table);
1205    db->bdb_sql_query(query.c_str());
1206
1207    Mmsg(query, "DROP TABLE %s", output_table);
1208    db->bdb_sql_query(query.c_str());
1209
1210    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
1211
1212    if (*fileid) {               /* Select files with their direct id */
1213       init=true;
1214       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1215                       "PathId, FileId "
1216                  "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
1217            fileid);
1218       pm_strcat(query, tmp.c_str());
1219    }
1220
1221    /* Add a directory content */
1222    while (get_next_id_from_list(&dirid, &id) == 1) {
1223       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
1224
1225       if (!db->bdb_sql_query(tmp.c_str(), get_path_handler, (void *)&tmp2)) {
1226          Dmsg0(dbglevel, "Can't search for path\n");
1227          /* print error */
1228          goto bail_out;
1229       }
1230       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
1231          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
1232                id, tmp.c_str(), tmp2.c_str());
1233          break;
1234       }
1235       /* escape % and _ for LIKE search */
1236       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
1237       char *p = tmp.c_str();
1238       for (char *s = tmp2.c_str(); *s ; s++) {
1239          if (*s == '%' || *s == '_' || *s == '\\') {
1240             *p = '\\';
1241             p++;
1242          }
1243          *p = *s;
1244          p++;
1245       }
1246       *p = '\0';
1247       tmp.strcat("%");
1248
1249       size_t len = strlen(tmp.c_str());
1250       tmp2.check_size((len+1) * 2);
1251       db->bdb_escape_string(jcr, tmp2.c_str(), tmp.c_str(), len);
1252
1253       if (init) {
1254          query.strcat(" UNION ");
1255       }
1256
1257       Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
1258                         "File.PathId, FileId "
1259                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
1260                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ",
1261            tmp2.c_str(), jobids);
1262       query.strcat(tmp.c_str());
1263       init = true;
1264
1265       query.strcat(" UNION ");
1266
1267       /* A directory can have files from a BaseJob */
1268       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
1269                         "File.FilenameId, File.PathId, BaseFiles.FileId "
1270                    "FROM BaseFiles "
1271                         "JOIN File USING (FileId) "
1272                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
1273                         "JOIN Path USING (PathId) "
1274                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ",
1275            tmp2.c_str(), jobids);
1276       query.strcat(tmp.c_str());
1277    }
1278
1279    /* expect jobid,fileindex */
1280    prev_jobid=0;
1281    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
1282       if (get_next_id_from_list(&hardlink, &id) != 1) {
1283          Dmsg0(dbglevel, "hardlink should be two by two\n");
1284          goto bail_out;
1285       }
1286       if (jobid != prev_jobid) { /* new job */
1287          if (prev_jobid == 0) {  /* first jobid */
1288             if (init) {
1289                query.strcat(" UNION ");
1290             }
1291          } else {               /* end last job, start new one */
1292             tmp.strcat(") UNION ");
1293             query.strcat(tmp.c_str());
1294          }
1295          Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
1296                             "PathId, FileId "
1297                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld "
1298                         "AND FileIndex IN (%lld", jobid, id);
1299          prev_jobid = jobid;
1300
1301       } else {                  /* same job, add new findex */
1302          Mmsg(tmp2, ", %lld", id);
1303          tmp.strcat(tmp2.c_str());
1304       }
1305    }
1306
1307    if (prev_jobid != 0) {       /* end last job */
1308       tmp.strcat(") ");
1309       query.strcat(tmp.c_str());
1310       init = true;
1311    }
1312
1313    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1314
1315    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1316       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1317       goto bail_out;
1318    }
1319
1320    Mmsg(query, sql_bvfs_select[db->bdb_get_type_index()],
1321         output_table, output_table, output_table);
1322
1323    /* TODO: handle jobid filter */
1324    Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1325    if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1326       Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1327       goto bail_out;
1328    }
1329
1330    /* MySQL needs the index */
1331    if (db->bdb_get_type_index() == SQL_TYPE_MYSQL) {
1332       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)",
1333            output_table, output_table);
1334       Dmsg1(dbglevel_sql, "query=%s\n", query.c_str());
1335       if (!db->bdb_sql_query(query.c_str(), NULL, NULL)) {
1336          Dmsg1(dbglevel, "Can't execute query=%s\n", query.c_str());
1337          goto bail_out; 
1338       } 
1339    }
1340
1341    /* Check if some FileId have DeltaSeq > 0
1342     * Foreach of them we need to get the accurate_job list, and compute
1343     * what are dependencies
1344     */
1345    Mmsg(query, 
1346         "SELECT F.FileId, F.JobId, F.FilenameId, F.PathId, F.DeltaSeq "
1347           "FROM File AS F JOIN Job USING (JobId) JOIN %s USING (FileId) "
1348          "WHERE DeltaSeq > 0", output_table);
1349
1350    if (!db->QueryDB(jcr, query.c_str())) {
1351       Dmsg1(dbglevel_sql, "Can't execute query=%s\n", query.c_str());
1352    }
1353
1354    /* TODO: Use an other DB connection can avoid to copy the result of the
1355     * previous query into a temporary buffer
1356     */
1357    num = db->sql_num_rows();
1358    Dmsg2(dbglevel, "Found %d Delta parts in restore selection q=%s\n", num, query.c_str());
1359
1360    if (num > 0) {
1361       int64_t *result = (int64_t *)malloc (num * 4 * sizeof(int64_t));
1362       SQL_ROW row;
1363       int i=0;
1364
1365       while((row = db->sql_fetch_row())) {
1366          result[i++] = str_to_int64(row[0]); /* FileId */
1367          result[i++] = str_to_int64(row[1]); /* JobId */
1368          result[i++] = str_to_int64(row[2]); /* FilenameId */
1369          result[i++] = str_to_int64(row[3]); /* PathId */
1370       }
1371
1372       i=0;
1373       while (num > 0) {
1374          insert_missing_delta(output_table, result + i);
1375          i += 4;
1376          num--;
1377       }
1378       free(result);
1379    }
1380
1381    ret = true;
1382
1383 bail_out:
1384    Mmsg(query, "DROP TABLE btemp%s", output_table);
1385    db->bdb_sql_query(query.c_str(), NULL, NULL);
1386    db->bdb_unlock();
1387    return ret;
1388 }
1389  
1390 void Bvfs::insert_missing_delta(char *output_table, int64_t *res)
1391 {
1392    char ed1[50], ed2[50];
1393    db_list_ctx lst;
1394    POOL_MEM query;
1395    JOB_DBR jr, jr2;
1396    memset(&jr, 0, sizeof(jr));
1397    memset(&jr2, 0, sizeof(jr2));
1398
1399    /* Need to limit the query to StartTime, Client/Fileset */
1400    jr2.JobId = res[1];
1401    db->bdb_get_job_record(jcr, &jr2);
1402
1403    jr.JobId = res[1];
1404    jr.ClientId = jr2.ClientId;
1405    jr.FileSetId = jr2.FileSetId;
1406    jr.JobLevel = L_INCREMENTAL;
1407    jr.StartTime = jr2.StartTime;
1408
1409    /* Get accurate jobid list */
1410    db->bdb_get_accurate_jobids(jcr, &jr, &lst);
1411
1412    Dmsg2(dbglevel_sql, "JobId list for %lld is %s\n", res[0], lst.list);
1413
1414    /* The list contains already the last DeltaSeq element, so
1415     * we don't need to select it in the next query
1416     */
1417    for (int l = strlen(lst.list); l > 0; l--) {
1418       if (lst.list[l] == ',') {
1419          lst.list[l] = '\0';
1420          break;
1421       }
1422    }
1423
1424    Dmsg1(dbglevel_sql, "JobId list after strip is %s\n", lst.list);
1425
1426    edit_int64(res[2], ed1);     /* fnid */
1427    edit_int64(res[3], ed2);     /* pathid */
1428
1429    int id=db->bdb_get_type_index();
1430    Mmsg(query, bvfs_select_delta_version_with_basejob_and_delta[id],
1431         lst.list, ed1, ed2, 
1432         lst.list, ed1, ed2, 
1433         lst.list, lst.list);
1434
1435    Mmsg(db->cmd, "INSERT INTO %s "
1436                    "SELECT JobId, FileIndex, FileId FROM (%s) AS F1",
1437         output_table, query.c_str());
1438
1439    if (!db->bdb_sql_query(db->cmd, NULL, NULL)) {
1440       Dmsg1(dbglevel_sql, "Can't exec q=%s\n", db->cmd);
1441    }
1442 }
1443
1444 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL */