]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Fix #5346 .bvfs_lsfiles and .bvfs_restore to handle deleted files
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #include "bacula.h"
30
31 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
32
33 #include "cats.h"
34 #include "bdb_priv.h"
35 #include "sql_glue.h"
36 #include "lib/htable.h"
37 #include "bvfs.h"
38
39 #define dbglevel 10
40 #define dbglevel_sql 15
41
42 static int result_handler(void *ctx, int fields, char **row)
43 {
44    if (fields == 4) {
45       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3]);
47    } else if (fields == 5) {
48       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4]);
50    } else if (fields == 6) {
51       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5]);
53    } else if (fields == 7) {
54       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
55             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
56    }
57    return 0;
58 }
59
60 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
61    jcr = j;
62    jcr->inc_use_count();
63    db = mdb;                 /* need to inc ref count */
64    jobids = get_pool_memory(PM_NAME);
65    prev_dir = get_pool_memory(PM_NAME);
66    pattern = get_pool_memory(PM_NAME);
67    *jobids = *prev_dir = *pattern = 0;
68    dir_filenameid = pwd_id = offset = 0;
69    see_copies = see_all_versions = false;
70    limit = 1000;
71    attr = new_attr(jcr);
72    list_entries = result_handler;
73    user_data = this;
74    username = NULL;
75 }
76
77 Bvfs::~Bvfs() {
78    free_pool_memory(jobids);
79    free_pool_memory(pattern);
80    free_pool_memory(prev_dir);
81    if (username) {
82       free(username);
83    }
84    free_attr(attr);
85    jcr->dec_use_count();
86 }
87
88 void Bvfs::filter_jobid()
89 {
90    if (!username) {
91       return;
92    }
93
94    /* Query used by Bweb to filter clients, activated when using
95     * set_username() 
96     */
97    POOL_MEM query;
98    Mmsg(query,
99       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
100         "JOIN (SELECT ClientId FROM client_group_member "
101         "JOIN client_group USING (client_group_id) "
102         "JOIN bweb_client_group_acl USING (client_group_id) "
103         "JOIN bweb_user USING (userid) "
104        "WHERE bweb_user.username = '%s' "
105       ") AS filter USING (ClientId) "
106         " WHERE JobId IN (%s)", 
107         username, jobids);
108
109    db_list_ctx ctx;
110    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
111    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
112    pm_strcpy(jobids, ctx.list);
113 }
114
115 void Bvfs::set_jobid(JobId_t id)
116 {
117    Mmsg(jobids, "%lld", (uint64_t)id);
118    filter_jobid();
119 }
120
121 void Bvfs::set_jobids(char *ids)
122 {
123    pm_strcpy(jobids, ids);
124    filter_jobid();
125 }
126
127 /* 
128  * TODO: Find a way to let the user choose how he wants to display
129  * files and directories
130  */
131
132
133 /* 
134  * Working Object to store PathId already seen (avoid
135  * database queries), equivalent to %cache_ppathid in perl
136  */
137
138 #define NITEMS 50000
139 class pathid_cache {
140 private:
141    hlink *nodes;
142    int nb_node;
143    int max_node;
144
145    alist *table_node;
146
147    htable *cache_ppathid;
148
149 public:
150    pathid_cache() {
151       hlink link;
152       cache_ppathid = (htable *)malloc(sizeof(htable));
153       cache_ppathid->init(&link, &link, NITEMS);
154       max_node = NITEMS;
155       nodes = (hlink *) malloc(max_node * sizeof (hlink));
156       nb_node = 0;
157       table_node = New(alist(5, owned_by_alist));
158       table_node->append(nodes);
159    }
160
161    hlink *get_hlink() {
162       if (++nb_node >= max_node) {
163          nb_node = 0;
164          nodes = (hlink *)malloc(max_node * sizeof(hlink));
165          table_node->append(nodes);
166       }
167       return nodes + nb_node;
168    }
169
170    bool lookup(char *pathid) {
171       bool ret = cache_ppathid->lookup(pathid) != NULL;
172       return ret;
173    }
174    
175    void insert(char *pathid) {
176       hlink *h = get_hlink();
177       cache_ppathid->insert(pathid, h);
178    }
179
180    ~pathid_cache() {
181       cache_ppathid->destroy();
182       free(cache_ppathid);
183       delete table_node;
184    }
185 private:
186    pathid_cache(const pathid_cache &); /* prohibit pass by value */
187    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
188 } ;
189
190 /* Return the parent_dir with the trailing /  (update the given string)
191  * TODO: see in the rest of bacula if we don't have already this function
192  * dir=/tmp/toto/
193  * dir=/tmp/
194  * dir=/
195  * dir=
196  */
197 char *bvfs_parent_dir(char *path)
198 {
199    char *p = path;
200    int len = strlen(path) - 1;
201
202    /* windows directory / */
203    if (len == 2 && B_ISALPHA(path[0]) 
204                 && path[1] == ':' 
205                 && path[2] == '/')
206    {
207       len = 0;
208       path[0] = '\0';
209    }
210
211    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
212       path[len] = '\0';
213    }
214
215    if (len > 0) {
216       p += len;
217       while (p > path && !IsPathSeparator(*p)) {
218          p--;
219       }
220       p[1] = '\0';
221    }
222    return path;
223 }
224
225 /* Return the basename of the with the trailing /
226  * TODO: see in the rest of bacula if we don't have
227  * this function already
228  */
229 char *bvfs_basename_dir(char *path)
230 {
231    char *p = path;
232    int len = strlen(path) - 1;
233
234    if (path[len] == '/') {      /* if directory, skip last / */
235       len -= 1;
236    }
237
238    if (len > 0) {
239       p += len;
240       while (p > path && !IsPathSeparator(*p)) {
241          p--;
242       }
243       if (*p == '/') {
244          p++;                  /* skip first / */
245       }
246    } 
247    return p;
248 }
249
250 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
251                                  pathid_cache &ppathid_cache, 
252                                  char *org_pathid, char *path)
253 {
254    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
255    char pathid[50];
256    ATTR_DBR parent;
257    char *bkp = mdb->path;
258    strncpy(pathid, org_pathid, sizeof(pathid));
259
260    /* Does the ppathid exist for this ? we use a memory cache...  In order to
261     * avoid the full loop, we consider that if a dir is allready in the
262     * PathHierarchy table, then there is no need to calculate all the
263     * hierarchy
264     */
265    while (path && *path)
266    {
267       if (!ppathid_cache.lookup(pathid))
268       {
269          Mmsg(mdb->cmd, 
270               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
271               pathid);
272
273          if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
274             goto bail_out;      /* Query failed, just leave */
275          }
276
277          /* Do we have a result ? */
278          if (sql_num_rows(mdb) > 0) {
279             ppathid_cache.insert(pathid);
280             /* This dir was in the db ...
281              * It means we can leave, the tree has allready been built for
282              * this dir
283              */
284             goto bail_out;
285          } else {
286             /* search or create parent PathId in Path table */
287             mdb->path = bvfs_parent_dir(path);
288             mdb->pnl = strlen(mdb->path);
289             if (!db_create_path_record(jcr, mdb, &parent)) {
290                goto bail_out;
291             }
292             ppathid_cache.insert(pathid);
293             
294             Mmsg(mdb->cmd,
295                  "INSERT INTO PathHierarchy (PathId, PPathId) "
296                  "VALUES (%s,%lld)",
297                  pathid, (uint64_t) parent.PathId);
298             
299             if (!INSERT_DB(jcr, mdb, mdb->cmd)) {
300                goto bail_out;   /* Can't insert the record, just leave */
301             }
302
303             edit_uint64(parent.PathId, pathid);
304             path = mdb->path;   /* already done */
305          }
306       } else {
307          /* It's already in the cache.  We can leave, no time to waste here,
308           * all the parent dirs have allready been done
309           */
310          goto bail_out;
311       }
312    }   
313
314 bail_out:
315    mdb->path = bkp;
316    mdb->fnl = 0;
317 }
318
319 /* 
320  * Internal function to update path_hierarchy cache with a shared pathid cache
321  * return Error 0
322  *        OK    1
323  */
324 static int update_path_hierarchy_cache(JCR *jcr,
325                                         B_DB *mdb,
326                                         pathid_cache &ppathid_cache,
327                                         JobId_t JobId)
328 {
329    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
330    int ret=0;
331    uint32_t num;
332    char jobid[50];
333    edit_uint64(JobId, jobid);
334  
335    db_lock(mdb);
336    db_start_transaction(jcr, mdb);
337
338    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
339    
340    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
341       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
342       ret = 1;
343       goto bail_out;
344    }
345
346    /* Inserting path records for JobId */
347    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
348                    "SELECT DISTINCT PathId, JobId "
349                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
350                            "UNION "
351                            "SELECT PathId, BaseFiles.JobId "
352                              "FROM BaseFiles JOIN File AS F USING (FileId) "
353                             "WHERE BaseFiles.JobId = %s) AS B",
354         jobid, jobid);
355
356    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
357       Dmsg1(dbglevel, "Can't fill PathVisibility %d\n", (uint32_t)JobId );
358       goto bail_out;
359    }
360
361    /* Now we have to do the directory recursion stuff to determine missing
362     * visibility We try to avoid recursion, to be as fast as possible We also
363     * only work on not allready hierarchised directories...
364     */
365    Mmsg(mdb->cmd, 
366      "SELECT PathVisibility.PathId, Path "
367        "FROM PathVisibility "
368             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
369             "LEFT JOIN PathHierarchy "
370          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
371       "WHERE PathVisibility.JobId = %s "
372         "AND PathHierarchy.PathId IS NULL "
373       "ORDER BY Path", jobid);
374    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
375
376    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
377       Dmsg1(dbglevel, "Can't get new Path %d\n", (uint32_t)JobId );
378       goto bail_out;
379    }
380
381    /* TODO: I need to reuse the DB connection without emptying the result 
382     * So, now i'm copying the result in memory to be able to query the
383     * catalog descriptor again.
384     */
385    num = sql_num_rows(mdb);
386    if (num > 0) {
387       char **result = (char **)malloc (num * 2 * sizeof(char *));
388       
389       SQL_ROW row;
390       int i=0;
391       while((row = sql_fetch_row(mdb))) {
392          result[i++] = bstrdup(row[0]);
393          result[i++] = bstrdup(row[1]);
394       }
395       
396       i=0;
397       while (num > 0) {
398          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
399          free(result[i++]);
400          free(result[i++]);
401          num--;
402       }
403       free(result);
404    }
405
406    if (mdb->db_get_type_index() == SQL_TYPE_SQLITE3) {
407       Mmsg(mdb->cmd, 
408  "INSERT INTO PathVisibility (PathId, JobId) "
409    "SELECT DISTINCT h.PPathId AS PathId, %s "
410      "FROM PathHierarchy AS h "
411     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
412       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
413            jobid, jobid, jobid );
414
415    } else {
416       Mmsg(mdb->cmd, 
417   "INSERT INTO PathVisibility (PathId, JobId)  "
418    "SELECT a.PathId,%s "
419    "FROM ( "
420      "SELECT DISTINCT h.PPathId AS PathId "
421        "FROM PathHierarchy AS h "
422        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
423       "WHERE p.JobId=%s) AS a LEFT JOIN "
424        "(SELECT PathId "
425           "FROM PathVisibility "
426          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
427    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
428    }
429
430    do {
431       ret = QUERY_DB(jcr, mdb, mdb->cmd);
432    } while (ret && sql_affected_rows(mdb) > 0);
433    
434    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
435    UPDATE_DB(jcr, mdb, mdb->cmd);
436
437 bail_out:
438    db_end_transaction(jcr, mdb);
439    db_unlock(mdb);
440    return ret;
441 }
442
443 /* 
444  * Find an store the filename descriptor for empty directories Filename.Name=''
445  */
446 DBId_t Bvfs::get_dir_filenameid()
447 {
448    uint32_t id;
449    if (dir_filenameid) {
450       return dir_filenameid;
451    }
452    POOL_MEM q;
453    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
454    db_sql_query(db, q.c_str(), db_int_handler, &id);
455    dir_filenameid = id;
456    return dir_filenameid;
457 }
458
459 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
460 {
461    uint32_t nb=0;
462    db_list_ctx jobids_list;
463
464    db_lock(mdb);
465
466 #ifdef xxx
467    /* TODO: Remove this code when updating make_bacula_table script */
468    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
469    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
470       Dmsg0(dbglevel, "Creating cache table\n");
471       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
472       QUERY_DB(jcr, mdb, mdb->cmd);
473
474       Mmsg(mdb->cmd,
475            "CREATE TABLE PathHierarchy ( "
476            "PathId integer NOT NULL, "
477            "PPathId integer NOT NULL, "
478            "CONSTRAINT pathhierarchy_pkey "
479            "PRIMARY KEY (PathId))");
480       QUERY_DB(jcr, mdb, mdb->cmd); 
481
482       Mmsg(mdb->cmd,
483            "CREATE INDEX pathhierarchy_ppathid "
484            "ON PathHierarchy (PPathId)");
485       QUERY_DB(jcr, mdb, mdb->cmd);
486
487       Mmsg(mdb->cmd, 
488            "CREATE TABLE PathVisibility ("
489            "PathId integer NOT NULL, "
490            "JobId integer NOT NULL, "
491            "Size int8 DEFAULT 0, "
492            "Files int4 DEFAULT 0, "
493            "CONSTRAINT pathvisibility_pkey "
494            "PRIMARY KEY (JobId, PathId))");
495       QUERY_DB(jcr, mdb, mdb->cmd);
496
497       Mmsg(mdb->cmd, 
498            "CREATE INDEX pathvisibility_jobid "
499            "ON PathVisibility (JobId)");
500       QUERY_DB(jcr, mdb, mdb->cmd);
501
502    }
503 #endif
504
505    Mmsg(mdb->cmd, 
506  "SELECT JobId from Job "
507   "WHERE HasCache = 0 "
508     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
509   "ORDER BY JobId");
510
511    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
512
513    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
514
515    db_start_transaction(jcr, mdb);
516    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
517    Mmsg(mdb->cmd, 
518         "DELETE FROM PathVisibility "
519          "WHERE NOT EXISTS "
520         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
521    nb = DELETE_DB(jcr, mdb, mdb->cmd);
522    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
523
524    db_end_transaction(jcr, mdb);
525    db_unlock(mdb);
526 }
527
528 /*
529  * Update the bvfs cache for given jobids (1,2,3,4)
530  */
531 int
532 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
533 {
534    pathid_cache ppathid_cache;
535    JobId_t JobId;
536    char *p;
537    int ret=1;
538
539    for (p=jobids; ; ) {
540       int stat = get_next_jobid_from_list(&p, &JobId);
541       if (stat < 0) {
542          return 0;
543       }
544       if (stat == 0) {
545          break;
546       }
547       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
548       if (!update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId)) {
549          ret = 0;
550       }
551    }
552    return ret;
553 }
554
555 /* 
556  * Update the bvfs cache for current jobids
557  */
558 void Bvfs::update_cache()
559 {
560    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
561 }
562
563 /* Change the current directory, returns true if the path exists */
564 bool Bvfs::ch_dir(const char *path)
565 {
566    pm_strcpy(db->path, path);
567    db->pnl = strlen(db->path);
568    db_lock(db);
569    ch_dir(db_get_path_record(jcr, db)); 
570    db_unlock(db);
571    return pwd_id != 0;
572 }
573
574 /* 
575  * Get all file versions for a specified client
576  * TODO: Handle basejobs using different client
577  */
578 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
579 {
580    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
581          (uint64_t)fnid, client);
582    char ed1[50], ed2[50];
583    POOL_MEM q;
584    if (see_copies) {
585       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
586    } else {
587       Mmsg(q, " AND Job.Type = 'B' ");
588    }
589
590    POOL_MEM query;
591
592    Mmsg(query,//    1           2              3       
593 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
594 //         4          5           6
595         "File.JobId, File.LStat, File.FileId, "
596 //         7                    8
597        "Media.VolumeName, Media.InChanger "
598 "FROM File, Job, Client, JobMedia, Media "
599 "WHERE File.FilenameId = %s "
600   "AND File.PathId=%s "
601   "AND File.JobId = Job.JobId "
602   "AND Job.JobId = JobMedia.JobId "
603   "AND File.FileIndex >= JobMedia.FirstIndex "
604   "AND File.FileIndex <= JobMedia.LastIndex "
605   "AND JobMedia.MediaId = Media.MediaId "
606   "AND Job.ClientId = Client.ClientId "
607   "AND Client.Name = '%s' "
608   "%s ORDER BY FileId LIMIT %d OFFSET %d"
609         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
610         limit, offset);
611    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
612    db_sql_query(db, query.c_str(), list_entries, user_data);
613 }
614
615 DBId_t Bvfs::get_root()
616 {
617    int p;
618    *db->path = 0;
619    db_lock(db);
620    p = db_get_path_record(jcr, db);
621    db_unlock(db);
622    return p;
623 }
624
625 static int path_handler(void *ctx, int fields, char **row)
626 {
627    Bvfs *fs = (Bvfs *) ctx;
628    return fs->_handle_path(ctx, fields, row);
629 }
630
631 int Bvfs::_handle_path(void *ctx, int fields, char **row)
632 {
633    if (bvfs_is_dir(row)) {
634       /* can have the same path 2 times */
635       if (strcmp(row[BVFS_Name], prev_dir)) {
636          pm_strcpy(prev_dir, row[BVFS_Name]);
637          return list_entries(user_data, fields, row);
638       }
639    }
640    return 0;
641 }
642
643 /* 
644  * Retrieve . and .. information
645  */
646 void Bvfs::ls_special_dirs()
647 {
648    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
649    char ed1[50], ed2[50];
650    if (*jobids == 0) {
651       return;
652    }
653    if (!dir_filenameid) {
654       get_dir_filenameid();
655    }
656
657    /* Will fetch directories  */
658    *prev_dir = 0;
659
660    POOL_MEM query;
661    Mmsg(query, 
662 "(SELECT PPathId AS PathId, '..' AS Path "
663     "FROM  PathHierarchy "
664    "WHERE  PathId = %s "
665 "UNION "
666  "SELECT %s AS PathId, '.' AS Path)",
667         edit_uint64(pwd_id, ed1), ed1);
668
669    POOL_MEM query2;
670    Mmsg(query2,// 1      2     3        4     5       6
671 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
672   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
673        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
674               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
675        "WHERE File1.FilenameId = %s "
676        "AND File1.JobId IN (%s)) AS listfile1 "
677   "ON (tmp.PathId = listfile1.PathId) "
678   "ORDER BY tmp.Path, JobId DESC ",
679         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
680
681    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
682    db_sql_query(db, query2.c_str(), path_handler, this);
683 }
684
685 /* Returns true if we have dirs to read */
686 bool Bvfs::ls_dirs()
687 {
688    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
689    char ed1[50], ed2[50];
690    if (*jobids == 0) {
691       return false;
692    }
693
694    POOL_MEM query;
695    POOL_MEM filter;
696    if (*pattern) {
697       Mmsg(filter, " AND Path2.Path %s '%s' ", 
698            match_query[db_get_type_index(db)], pattern);
699    }
700
701    if (!dir_filenameid) {
702       get_dir_filenameid();
703    }
704
705    /* the sql query displays same directory multiple time, take the first one */
706    *prev_dir = 0;
707
708    /* Let's retrieve the list of the visible dirs in this dir ...
709     * First, I need the empty filenameid to locate efficiently
710     * the dirs in the file table
711     * my $dir_filenameid = $self->get_dir_filenameid();
712     */
713    /* Then we get all the dir entries from File ... */
714    Mmsg(query,
715 //       0     1     2   3      4     5       6
716 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
717     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
718            "lower(Path1.Path) AS lpath, "
719            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
720            "listfile1.FileId AS FileId "
721     "FROM ( "
722       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
723       "FROM PathHierarchy AS PathHierarchy1 "
724       "JOIN Path AS Path2 "
725         "ON (PathHierarchy1.PathId = Path2.PathId) "
726       "JOIN PathVisibility AS PathVisibility1 "
727         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
728       "WHERE PathHierarchy1.PPathId = %s "
729       "AND PathVisibility1.JobId IN (%s) "
730            "%s "
731      ") AS listpath1 "
732    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
733
734    "LEFT JOIN ( " /* get attributes if any */
735        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
736               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
737        "WHERE File1.FilenameId = %s "
738        "AND File1.JobId IN (%s)) AS listfile1 "
739        "ON (listpath1.PathId = listfile1.PathId) "
740     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
741         edit_uint64(pwd_id, ed1),
742         jobids,
743         filter.c_str(),
744         edit_uint64(dir_filenameid, ed2),
745         jobids,
746         limit, offset);
747
748    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
749
750    db_lock(db);
751    db_sql_query(db, query.c_str(), path_handler, this);
752    nb_record = sql_num_rows(db);
753    db_unlock(db);
754
755    return nb_record == limit;
756 }
757
758 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
759                           const char *JobId, const char *PathId,  
760                           const char *filter, int64_t limit, int64_t offset)
761 {
762    if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
763       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
764            JobId, PathId, JobId, PathId, 
765            filter, limit, offset);
766    } else {
767       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
768            JobId, PathId, JobId, PathId, 
769            limit, offset, filter, JobId, JobId);
770    }
771 }
772
773 /* Returns true if we have files to read */
774 bool Bvfs::ls_files()
775 {
776    POOL_MEM query;
777    POOL_MEM filter;
778    char pathid[50];
779
780    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
781    if (*jobids == 0) {
782       return false;
783    }
784
785    if (!pwd_id) {
786       ch_dir(get_root());
787    }
788
789    edit_uint64(pwd_id, pathid);
790    if (*pattern) {
791       Mmsg(filter, " AND Filename.Name %s '%s' ", 
792            match_query[db_get_type_index(db)], pattern);
793    }
794
795    build_ls_files_query(db, query, 
796                         jobids, pathid, filter.c_str(),
797                         limit, offset);
798
799    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
800
801    db_lock(db);
802    db_sql_query(db, query.c_str(), list_entries, user_data);
803    nb_record = sql_num_rows(db);
804    db_unlock(db);
805
806    return nb_record == limit;
807 }
808
809
810 /* 
811  * Return next Id from comma separated list   
812  *
813  * Returns:
814  *   1 if next Id returned
815  *   0 if no more Ids are in list
816  *  -1 there is an error
817  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
818  */
819 static int get_next_id_from_list(char **p, int64_t *Id)
820 {
821    const int maxlen = 30;
822    char id[maxlen+1];
823    char *q = *p;
824
825    id[0] = 0;
826    for (int i=0; i<maxlen; i++) {
827       if (*q == 0) {
828          break;
829       } else if (*q == ',') {
830          q++;
831          break;
832       }
833       id[i] = *q++;
834       id[i+1] = 0;
835    }
836    if (id[0] == 0) {
837       return 0;
838    } else if (!is_a_number(id)) {
839       return -1;                      /* error */
840    }
841    *p = q;
842    *Id = str_to_int64(id);
843    return 1;
844 }
845
846 static int get_path_handler(void *ctx, int fields, char **row)
847 {
848    POOL_MEM *buf = (POOL_MEM *) ctx;
849    pm_strcpy(*buf, row[0]);
850    return 0;
851 }
852
853 static bool check_temp(char *output_table)
854 {
855    if (output_table[0] == 'b' &&
856        output_table[1] == '2' &&
857        is_an_integer(output_table + 2))
858    {
859       return true;
860    }
861    return false;
862 }
863
864 void Bvfs::clear_cache()
865 {
866    db_sql_query(db, "BEGIN",                     NULL, NULL);
867    db_sql_query(db, "UPDATE Job SET HasCache=0", NULL, NULL);
868    db_sql_query(db, "TRUNCATE PathHierarchy",    NULL, NULL);
869    db_sql_query(db, "TRUNCATE PathVisibility",   NULL, NULL);
870    db_sql_query(db, "COMMIT",                    NULL, NULL);
871 }
872
873 bool Bvfs::drop_restore_list(char *output_table)
874 {
875    POOL_MEM query;
876    if (check_temp(output_table)) {
877       Mmsg(query, "DROP TABLE %s", output_table);
878       db_sql_query(db, query.c_str(), NULL, NULL);
879       return true;
880    }
881    return false;
882 }
883
884 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
885                                 char *output_table)
886 {
887    POOL_MEM query;
888    POOL_MEM tmp, tmp2;
889    int64_t id, jobid, prev_jobid;
890    bool init=false;
891    bool ret=false;
892    /* check args */
893    if ((*fileid   && !is_a_number_list(fileid))  ||
894        (*dirid    && !is_a_number_list(dirid))   ||
895        (*hardlink && !is_a_number_list(hardlink))||
896        (!*hardlink && !*fileid && !*dirid && !*hardlink))
897    {
898       return false;
899    }
900    if (!check_temp(output_table)) {
901       return false;
902    }
903
904    db_lock(db);
905
906    /* Cleanup old tables first */
907    Mmsg(query, "DROP TABLE btemp%s", output_table);
908    db_sql_query(db, query.c_str());
909
910    Mmsg(query, "DROP TABLE %s", output_table);
911    db_sql_query(db, query.c_str());
912
913    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
914
915    if (*fileid) {               /* Select files with their direct id */
916       init=true;
917       Mmsg(tmp,"SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
918                       "PathId, FileId "
919                  "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
920            fileid);
921       pm_strcat(query, tmp.c_str());
922    }
923
924    /* Add a directory content */
925    while (get_next_id_from_list(&dirid, &id) == 1) {
926       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
927       
928       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
929          Dmsg0(dbglevel, "Can't search for path\n");
930          /* print error */
931          goto bail_out;
932       }
933       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
934          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
935                id, tmp.c_str(), tmp2.c_str());
936          break;
937       }
938       /* escape % and _ for LIKE search */
939       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
940       char *p = tmp.c_str();
941       for (char *s = tmp2.c_str(); *s ; s++) {
942          if (*s == '%' || *s == '_' || *s == '\\') {
943             *p = '\\'; 
944             p++;
945          }
946          *p = *s; 
947          p++;
948       }
949       *p = '\0';
950       tmp.strcat("%");
951
952       size_t len = strlen(tmp.c_str());
953       tmp2.check_size((len+1) * 2);
954       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
955
956       if (init) {
957          query.strcat(" UNION ");
958       }
959
960       Mmsg(tmp, "SELECT Job.JobId, JobTDate, File.FileIndex, File.FilenameId, "
961                         "File.PathId, FileId "
962                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
963                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ", 
964            tmp2.c_str(), jobids); 
965       query.strcat(tmp.c_str());
966       init = true;
967
968       query.strcat(" UNION ");
969
970       /* A directory can have files from a BaseJob */
971       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
972                         "File.FilenameId, File.PathId, BaseFiles.FileId "
973                    "FROM BaseFiles "
974                         "JOIN File USING (FileId) "
975                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
976                         "JOIN Path USING (PathId) "
977                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ", 
978            tmp2.c_str(), jobids); 
979       query.strcat(tmp.c_str());
980    }
981
982    /* expect jobid,fileindex */
983    prev_jobid=0;
984    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
985       if (get_next_id_from_list(&hardlink, &id) != 1) {
986          Dmsg0(dbglevel, "hardlink should be two by two\n");
987          goto bail_out;
988       }
989       if (jobid != prev_jobid) { /* new job */
990          if (prev_jobid == 0) {  /* first jobid */
991             if (init) {
992                query.strcat(" UNION ");
993             }
994          } else {               /* end last job, start new one */
995             tmp.strcat(") UNION ");
996             query.strcat(tmp.c_str());
997          }
998          Mmsg(tmp,   "SELECT Job.JobId, JobTDate, FileIndex, FilenameId, "
999                             "PathId, FileId "
1000                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
1001                         "AND FileIndex IN (%lld", jobid, id);
1002          prev_jobid = jobid;
1003
1004       } else {                  /* same job, add new findex */
1005          Mmsg(tmp2, ", %lld", id);
1006          tmp.strcat(tmp2.c_str());
1007       }
1008    }
1009
1010    if (prev_jobid != 0) {       /* end last job */
1011       tmp.strcat(") ");
1012       query.strcat(tmp.c_str());
1013       init = true;
1014    }
1015
1016    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1017
1018    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
1019       Dmsg0(dbglevel, "Can't execute q\n");
1020       goto bail_out;
1021    }
1022
1023    Mmsg(query, sql_bvfs_select[db_get_type_index(db)], 
1024         output_table, output_table, output_table);
1025
1026    /* TODO: handle jobid filter */
1027    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1028    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
1029       Dmsg0(dbglevel, "Can't execute q\n");
1030       goto bail_out;
1031    }
1032
1033    /* MySQL need it */
1034    if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
1035       Mmsg(query, "CREATE INDEX idx_%s ON %s (JobId)", 
1036            output_table, output_table);
1037       Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1038       if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
1039          Dmsg0(dbglevel, "Can't execute q\n");
1040          goto bail_out;
1041       }
1042    }
1043
1044    ret = true;
1045
1046 bail_out:
1047    Mmsg(query, "DROP TABLE btemp%s", output_table);
1048    db_sql_query(db, query.c_str(), NULL, NULL);
1049    db_unlock(db);
1050    return ret;
1051 }
1052
1053 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */