]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Adapt bvfs for BaseJobs (path is ok)
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_versions = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98
99    alist *table_node;
100
101    htable *cache_ppathid;
102
103 public:
104    pathid_cache() {
105       hlink link;
106       cache_ppathid = (htable *)malloc(sizeof(htable));
107       cache_ppathid->init(&link, &link, NITEMS);
108       max_node = NITEMS;
109       nodes = (hlink *) malloc(max_node * sizeof (hlink));
110       nb_node = 0;
111       table_node = New(alist(5, owned_by_alist));
112       table_node->append(nodes);
113    }
114
115    hlink *get_hlink() {
116       if (++nb_node >= max_node) {
117          nb_node = 0;
118          nodes = (hlink *)malloc(max_node * sizeof(hlink));
119          table_node->append(nodes);
120       }
121       return nodes + nb_node;
122    }
123
124    bool lookup(char *pathid) {
125       bool ret = cache_ppathid->lookup(pathid) != NULL;
126       return ret;
127    }
128    
129    void insert(char *pathid) {
130       hlink *h = get_hlink();
131       cache_ppathid->insert(pathid, h);
132    }
133
134    ~pathid_cache() {
135       cache_ppathid->destroy();
136       free(cache_ppathid);
137       delete table_node;
138    }
139 private:
140    pathid_cache(const pathid_cache &); /* prohibit pass by value */
141    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
142 } ;
143
144 /* Return the parent_dir with the trailing /  (update the given string)
145  * TODO: see in the rest of bacula if we don't have already this function
146  * dir=/tmp/toto/
147  * dir=/tmp/
148  * dir=/
149  * dir=
150  */
151 char *bvfs_parent_dir(char *path)
152 {
153    char *p = path;
154    int len = strlen(path) - 1;
155
156    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
157       path[len] = '\0';
158    }
159
160    if (len > 0) {
161       p += len;
162       while (p > path && !IsPathSeparator(*p)) {
163          p--;
164       }
165       p[1] = '\0';
166    }
167    return path;
168 }
169
170 /* Return the basename of the with the trailing /
171  * TODO: see in the rest of bacula if we don't have
172  * this function already
173  */
174 char *bvfs_basename_dir(char *path)
175 {
176    char *p = path;
177    int len = strlen(path) - 1;
178
179    if (path[len] == '/') {      /* if directory, skip last / */
180       len -= 1;
181    }
182
183    if (len > 0) {
184       p += len;
185       while (p > path && !IsPathSeparator(*p)) {
186          p--;
187       }
188       if (*p == '/') {
189          p++;                  /* skip first / */
190       }
191    } 
192    return p;
193 }
194
195 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
196                                  pathid_cache &ppathid_cache, 
197                                  char *org_pathid, char *path)
198 {
199    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
200    char pathid[50];
201    ATTR_DBR parent;
202    char *bkp = mdb->path;
203    strncpy(pathid, org_pathid, sizeof(pathid));
204
205    /* Does the ppathid exist for this ? we use a memory cache...  In order to
206     * avoid the full loop, we consider that if a dir is allready in the
207     * PathHierarchy table, then there is no need to calculate all the
208     * hierarchy
209     */
210    while (path && *path)
211    {
212       if (!ppathid_cache.lookup(pathid))
213       {
214          Mmsg(mdb->cmd, 
215               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
216               pathid);
217
218          QUERY_DB(jcr, mdb, mdb->cmd);
219          /* Do we have a result ? */
220          if (sql_num_rows(mdb) > 0) {
221             ppathid_cache.insert(pathid);
222             /* This dir was in the db ...
223              * It means we can leave, the tree has allready been built for
224              * this dir
225              */
226             goto bail_out;
227          } else {
228             /* search or create parent PathId in Path table */
229             mdb->path = bvfs_parent_dir(path);
230             mdb->pnl = strlen(mdb->path);
231             if (!db_create_path_record(jcr, mdb, &parent)) {
232                goto bail_out;
233             }
234             ppathid_cache.insert(pathid);
235             
236             Mmsg(mdb->cmd,
237                  "INSERT INTO PathHierarchy (PathId, PPathId) "
238                  "VALUES (%s,%lld)",
239                  pathid, (uint64_t) parent.PathId);
240             
241             INSERT_DB(jcr, mdb, mdb->cmd);
242
243             edit_uint64(parent.PathId, pathid);
244             path = mdb->path;   /* already done */
245          }
246       } else {
247          /* It's already in the cache.  We can leave, no time to waste here,
248           * all the parent dirs have allready been done
249           */
250          goto bail_out;
251       }
252    }   
253
254 bail_out:
255    mdb->path = bkp;
256    mdb->fnl = 0;
257 }
258
259 /* 
260  * Internal function to update path_hierarchy cache with a shared pathid cache
261  */
262 static void update_path_hierarchy_cache(JCR *jcr,
263                                         B_DB *mdb,
264                                         pathid_cache &ppathid_cache,
265                                         JobId_t JobId)
266 {
267    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
268
269    uint32_t num;
270    char jobid[50];
271    edit_uint64(JobId, jobid);
272  
273    db_lock(mdb);
274    db_start_transaction(jcr, mdb);
275
276    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
277    
278    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
279       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
280       goto bail_out;
281    }
282
283    /* Inserting path records for JobId */
284    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
285                    "SELECT DISTINCT PathId, JobId "
286                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
287                            "UNION "
288                            "SELECT PathId, BaseFiles.JobId FROM BaseFiles JOIN File AS F USING (FileId) "
289                             "WHERE BaseFiles.JobId = %s) AS B",
290         jobid, jobid);
291    QUERY_DB(jcr, mdb, mdb->cmd);
292
293    /* Now we have to do the directory recursion stuff to determine missing
294     * visibility We try to avoid recursion, to be as fast as possible We also
295     * only work on not allready hierarchised directories...
296     */
297    Mmsg(mdb->cmd, 
298      "SELECT PathVisibility.PathId, Path "
299        "FROM PathVisibility "
300             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
301             "LEFT JOIN PathHierarchy "
302          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
303       "WHERE PathVisibility.JobId = %s "
304         "AND PathHierarchy.PathId IS NULL "
305       "ORDER BY Path", jobid);
306    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
307    QUERY_DB(jcr, mdb, mdb->cmd);
308
309    /* TODO: I need to reuse the DB connection without emptying the result 
310     * So, now i'm copying the result in memory to be able to query the
311     * catalog descriptor again.
312     */
313    num = sql_num_rows(mdb);
314    if (num > 0) {
315       char **result = (char **)malloc (num * 2 * sizeof(char *));
316       
317       SQL_ROW row;
318       int i=0;
319       while((row = sql_fetch_row(mdb))) {
320          result[i++] = bstrdup(row[0]);
321          result[i++] = bstrdup(row[1]);
322       }
323       
324       i=0;
325       while (num > 0) {
326          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
327          free(result[i++]);
328          free(result[i++]);
329          num--;
330       }
331       free(result);
332    }
333
334    Mmsg(mdb->cmd, 
335   "INSERT INTO PathVisibility (PathId, JobId)  "
336    "SELECT a.PathId,%s "
337    "FROM ( "
338      "SELECT DISTINCT h.PPathId AS PathId "
339        "FROM PathHierarchy AS h "
340        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
341       "WHERE p.JobId=%s) AS a LEFT JOIN "
342        "(SELECT PathId "
343           "FROM PathVisibility "
344          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
345    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
346
347    do {
348       QUERY_DB(jcr, mdb, mdb->cmd);
349    } while (sql_affected_rows(mdb) > 0);
350    
351    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
352    UPDATE_DB(jcr, mdb, mdb->cmd);
353
354 bail_out:
355    db_end_transaction(jcr, mdb);
356    db_unlock(mdb);
357 }
358
359 /* 
360  * Find an store the filename descriptor for empty directories Filename.Name=''
361  */
362 DBId_t Bvfs::get_dir_filenameid()
363 {
364    uint32_t id;
365    if (dir_filenameid) {
366       return dir_filenameid;
367    }
368    POOL_MEM q;
369    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
370    db_sql_query(db, q.c_str(), db_int_handler, &id);
371    dir_filenameid = id;
372    return dir_filenameid;
373 }
374
375 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
376 {
377    uint32_t nb=0;
378    db_list_ctx jobids_list;
379
380    db_lock(mdb);
381    db_start_transaction(jcr, mdb);
382
383 #ifdef xxx
384    /* TODO: Remove this code when updating make_bacula_table script */
385    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
386    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
387       Dmsg0(dbglevel, "Creating cache table\n");
388       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
389       QUERY_DB(jcr, mdb, mdb->cmd);
390
391       Mmsg(mdb->cmd,
392            "CREATE TABLE PathHierarchy ( "
393            "PathId integer NOT NULL, "
394            "PPathId integer NOT NULL, "
395            "CONSTRAINT pathhierarchy_pkey "
396            "PRIMARY KEY (PathId))");
397       QUERY_DB(jcr, mdb, mdb->cmd); 
398
399       Mmsg(mdb->cmd,
400            "CREATE INDEX pathhierarchy_ppathid "
401            "ON PathHierarchy (PPathId)");
402       QUERY_DB(jcr, mdb, mdb->cmd);
403
404       Mmsg(mdb->cmd, 
405            "CREATE TABLE PathVisibility ("
406            "PathId integer NOT NULL, "
407            "JobId integer NOT NULL, "
408            "Size int8 DEFAULT 0, "
409            "Files int4 DEFAULT 0, "
410            "CONSTRAINT pathvisibility_pkey "
411            "PRIMARY KEY (JobId, PathId))");
412       QUERY_DB(jcr, mdb, mdb->cmd);
413
414       Mmsg(mdb->cmd, 
415            "CREATE INDEX pathvisibility_jobid "
416            "ON PathVisibility (JobId)");
417       QUERY_DB(jcr, mdb, mdb->cmd);
418
419    }
420 #endif
421
422    Mmsg(mdb->cmd, 
423  "SELECT JobId from Job "
424   "WHERE HasCache = 0 "
425     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
426   "ORDER BY JobId");
427
428    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
429
430    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
431
432    db_end_transaction(jcr, mdb);
433    db_start_transaction(jcr, mdb);
434    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
435    Mmsg(mdb->cmd, 
436         "DELETE FROM PathVisibility "
437          "WHERE NOT EXISTS "
438         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
439    nb = DELETE_DB(jcr, mdb, mdb->cmd);
440    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
441
442    db_end_transaction(jcr, mdb);
443    db_unlock(mdb);
444 }
445
446 /*
447  * Update the bvfs cache for given jobids (1,2,3,4)
448  */
449 void
450 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
451 {
452    pathid_cache ppathid_cache;
453    JobId_t JobId;
454    char *p;
455
456    for (p=jobids; ; ) {
457       int stat = get_next_jobid_from_list(&p, &JobId);
458       if (stat < 0) {
459          return;
460       }
461       if (stat == 0) {
462          break;
463       }
464       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
465       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
466    }
467 }
468
469 /* 
470  * Update the bvfs cache for current jobids
471  */
472 void Bvfs::update_cache()
473 {
474    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
475 }
476
477 /* Change the current directory, returns true if the path exists */
478 bool Bvfs::ch_dir(const char *path)
479 {
480    pm_strcpy(db->path, path);
481    db->pnl = strlen(db->path);
482    ch_dir(db_get_path_record(jcr, db)); 
483    return pwd_id != 0;
484 }
485
486 /* 
487  * Get all file versions for a specified client
488  */
489 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
490 {
491    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
492          (uint64_t)fnid, client);
493    char ed1[50], ed2[50];
494    POOL_MEM q;
495    if (see_copies) {
496       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
497    } else {
498       Mmsg(q, " AND Job.Type = 'B' ");
499    }
500
501    POOL_MEM query;
502
503    Mmsg(query,//    1           2              3       
504 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
505 //         4          5           6
506         "File.JobId, File.LStat, File.FileId, "
507 //         7                    8
508        "Media.VolumeName, Media.InChanger "
509 "FROM File, Job, Client, JobMedia, Media "
510 "WHERE File.FilenameId = %s "
511   "AND File.PathId=%s "
512   "AND File.JobId = Job.JobId "
513   "AND Job.ClientId = Client.ClientId "
514   "AND Job.JobId = JobMedia.JobId "
515   "AND File.FileIndex >= JobMedia.FirstIndex "
516   "AND File.FileIndex <= JobMedia.LastIndex "
517   "AND JobMedia.MediaId = Media.MediaId "
518   "AND Client.Name = '%s' "
519   "%s ORDER BY FileId LIMIT %d OFFSET %d"
520         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
521         limit, offset);
522    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
523    db_sql_query(db, query.c_str(), list_entries, user_data);
524
525    /* Handle BaseJobs version */
526    Mmsg(query,//    1           2              3       
527 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
528 //         4          5           6
529         "File.JobId, File.LStat, File.FileId, "
530 //         7                    8
531        "Media.VolumeName, Media.InChanger "
532 "FROM File, Job, Client, JobMedia, Media, BaseFiles "
533 "WHERE File.FilenameId = %s "
534   "AND File.PathId = %s "
535   "AND Job.JobId = BaseFiles.JobId "
536   "AND File.JobId = BaseFiles.BaseJobId "
537   "AND File.FileId = BaseFiles.FileId "
538   "AND Job.ClientId = Client.ClientId "
539   "AND Job.JobId = JobMedia.JobId "
540   "AND File.FileIndex >= JobMedia.FirstIndex "
541   "AND File.FileIndex <= JobMedia.LastIndex "
542   "AND JobMedia.MediaId = Media.MediaId "
543   "AND Client.Name = '%s' "
544   "%s ORDER BY FileId LIMIT %d OFFSET %d"
545         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
546         limit, offset);
547    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
548    db_sql_query(db, query.c_str(), list_entries, user_data);
549 }
550
551 DBId_t Bvfs::get_root()
552 {
553    *db->path = 0;
554    return db_get_path_record(jcr, db);
555 }
556
557 static int path_handler(void *ctx, int fields, char **row)
558 {
559    Bvfs *fs = (Bvfs *) ctx;
560    return fs->_handle_path(ctx, fields, row);
561 }
562
563 int Bvfs::_handle_path(void *ctx, int fields, char **row)
564 {
565    if (bvfs_is_dir(row)) {
566       /* can have the same path 2 times */
567       if (strcmp(row[BVFS_Name], prev_dir)) {
568          pm_strcpy(prev_dir, row[BVFS_Name]);
569          return list_entries(user_data, fields, row);
570       }
571    }
572    return 0;
573 }
574
575 /* 
576  * Retrieve . and .. information
577  */
578 void Bvfs::ls_special_dirs()
579 {
580    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
581    char ed1[50], ed2[50];
582    if (*jobids == 0) {
583       return;
584    }
585    if (!dir_filenameid) {
586       get_dir_filenameid();
587    }
588
589    /* Will fetch directories  */
590    *prev_dir = 0;
591
592    POOL_MEM query;
593    Mmsg(query, 
594 "((SELECT PPathId AS PathId, '..' AS Path "
595     "FROM  PathHierarchy "
596    "WHERE  PathId = %s) "
597 "UNION "
598  "(SELECT %s AS PathId, '.' AS Path))",
599         edit_uint64(pwd_id, ed1), ed1);
600
601    POOL_MEM query2;
602    Mmsg(query2,// 1      2     3        4     5       6
603 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
604   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
605        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
606               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
607        "WHERE File1.FilenameId = %s "
608        "AND File1.JobId IN (%s)) AS listfile1 "
609   "ON (tmp.PathId = listfile1.PathId) "
610   "ORDER BY tmp.Path, JobId DESC ",
611         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
612
613    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
614    db_sql_query(db, query2.c_str(), path_handler, this);
615 }
616
617 /* Returns true if we have dirs to read */
618 bool Bvfs::ls_dirs()
619 {
620    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
621    char ed1[50], ed2[50];
622    if (*jobids == 0) {
623       return false;
624    }
625
626    POOL_MEM filter;
627    if (*pattern) {
628       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
629    }
630
631    if (!dir_filenameid) {
632       get_dir_filenameid();
633    }
634
635    /* the sql query displays same directory multiple time, take the first one */
636    *prev_dir = 0;
637
638    /* Let's retrieve the list of the visible dirs in this dir ...
639     * First, I need the empty filenameid to locate efficiently
640     * the dirs in the file table
641     * my $dir_filenameid = $self->get_dir_filenameid();
642     */
643    /* Then we get all the dir entries from File ... */
644    POOL_MEM query;
645    Mmsg(query,
646 //       0     1     2   3      4     5       6
647 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
648     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
649            "lower(Path1.Path) AS lpath, "
650            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
651            "listfile1.FileId AS FileId "
652     "FROM ( "
653       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
654       "FROM PathHierarchy AS PathHierarchy1 "
655       "JOIN Path AS Path2 "
656         "ON (PathHierarchy1.PathId = Path2.PathId) "
657       "JOIN PathVisibility AS PathVisibility1 "
658         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
659       "WHERE PathHierarchy1.PPathId = %s "
660       "AND PathVisibility1.jobid IN (%s) "
661            "%s "
662      ") AS listpath1 "
663    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
664
665    "LEFT JOIN ( " /* get attributes if any */
666        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
667               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
668        "WHERE File1.FilenameId = %s "
669        "AND File1.JobId IN (%s)) AS listfile1 "
670        "ON (listpath1.PathId = listfile1.PathId) "
671     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
672         edit_uint64(pwd_id, ed1),
673         jobids,
674         filter.c_str(),
675         edit_uint64(dir_filenameid, ed2),
676         jobids,
677         limit, offset);
678
679    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
680
681    db_lock(db);
682    db_sql_query(db, query.c_str(), path_handler, this);
683    nb_record = db->num_rows;
684    db_unlock(db);
685
686    return nb_record == limit;
687 }
688
689 /* Returns true if we have files to read */
690 bool Bvfs::ls_files()
691 {
692    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
693    char ed1[50];
694    if (*jobids == 0) {
695       return false;
696    }
697
698    if (!pwd_id) {
699       ch_dir(get_root());
700    }
701
702    POOL_MEM filter;
703    if (*pattern) {
704       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
705    }
706    /* TODO: Use JobTDate instead of FileId to determine the latest version */
707    POOL_MEM query;
708    Mmsg(query, //    1              2             3          4
709 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
710         "File.LStat, listfiles.id "
711 "FROM File, ( "
712        "SELECT Filename.Name as Name, max(File.FileId) as id "
713          "FROM File, Filename "
714         "WHERE File.FilenameId = Filename.FilenameId "
715           "AND Filename.Name != '' "
716           "AND File.PathId = %s "
717           "AND File.JobId IN (%s) "
718           "%s "
719         "GROUP BY Filename.Name "
720         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
721      ") AS listfiles "
722 "WHERE File.FileId = listfiles.id",
723         edit_uint64(pwd_id, ed1),
724         jobids,
725         filter.c_str(),
726         limit,
727         offset);
728    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
729
730    db_lock(db);
731    db_sql_query(db, query.c_str(), list_entries, user_data);
732    nb_record = db->num_rows;
733    db_unlock(db);
734
735    return nb_record == limit;
736 }
737
738
739 /* 
740  * Return next Id from comma separated list   
741  *
742  * Returns:
743  *   1 if next Id returned
744  *   0 if no more Ids are in list
745  *  -1 there is an error
746  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
747  */
748 static int get_next_id_from_list(char **p, int64_t *Id)
749 {
750    const int maxlen = 30;
751    char id[maxlen+1];
752    char *q = *p;
753
754    id[0] = 0;
755    for (int i=0; i<maxlen; i++) {
756       if (*q == 0) {
757          break;
758       } else if (*q == ',') {
759          q++;
760          break;
761       }
762       id[i] = *q++;
763       id[i+1] = 0;
764    }
765    if (id[0] == 0) {
766       return 0;
767    } else if (!is_a_number(id)) {
768       return -1;                      /* error */
769    }
770    *p = q;
771    *Id = str_to_int64(id);
772    return 1;
773 }
774
775 static int get_path_handler(void *ctx, int fields, char **row)
776 {
777    POOL_MEM *buf = (POOL_MEM *) ctx;
778    pm_strcpy(*buf, row[0]);
779    return 0;
780 }
781
782 static bool check_temp(char *output_table)
783 {
784    if (output_table[0] == 'b' &&
785        output_table[1] == '2' &&
786        is_an_integer(output_table + 2))
787    {
788       return true;
789    }
790    return false;
791 }
792
793 bool Bvfs::drop_restore_list(char *output_table)
794 {
795    POOL_MEM query;
796    if (check_temp(output_table)) {
797       Mmsg(query, "DROP TABLE %s", output_table);
798       db_sql_query(db, query.c_str(), NULL, NULL);
799       return true;
800    }
801    return false;
802 }
803
804 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
805                                 char *output_table)
806 {
807    POOL_MEM query;
808    POOL_MEM tmp, tmp2;
809    int64_t id, jobid;
810    bool init=false;
811    bool ret=false;
812    /* check args */
813    if ((*fileid   && !is_a_number_list(fileid))  ||
814        (*dirid    && !is_a_number_list(dirid))   ||
815        (*hardlink && !is_a_number_list(hardlink))||
816        (!*hardlink && !*fileid && !*dirid && !*hardlink))
817    {
818       return false;
819    }
820    if (!check_temp(output_table)) {
821       return false;
822    }
823
824    Mmsg(query, "CREATE TEMPORARY TABLE btemp%s AS ", output_table);
825
826    if (*fileid) {
827       init=true;
828       Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
829                    "FROM File WHERE FileId IN (%s))", fileid);
830       pm_strcat(query, tmp.c_str());
831    }
832
833    while (get_next_id_from_list(&dirid, &id) == 1) {
834       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
835       
836       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
837          /* print error */
838          return false;
839       }
840       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
841          Dmsg3(0, "Path not found %lld q=%s s=%s\n",
842                id, tmp.c_str(), tmp2.c_str());
843          break;
844       }
845       /* escape % and _ for LIKE search */
846       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
847       char *p = tmp.c_str();
848       for (char *s = tmp2.c_str(); *s ; s++) {
849          if (*s == '%' || *s == '_' || *s == '\\') {
850             *p = '\\'; 
851             p++;
852          }
853          *p = *s; 
854          p++;
855       }
856       *p = '\0';
857       tmp.strcat("%");
858
859       size_t len = strlen(tmp.c_str());
860       tmp2.check_size((len+1) * 2);
861       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
862
863       if (init) {
864          query.strcat(" UNION ");
865       }
866       Mmsg(tmp, "(SELECT File.JobId, File.FileIndex, File.FilenameId, "
867                         "File.PathId, FileId "
868                    "FROM Path JOIN File USING (PathId) "
869                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s)) ", 
870            tmp2.c_str(), jobids); 
871       query.strcat(tmp.c_str());
872       init = true;
873    }
874
875    /* expect jobid,fileindex */
876    int64_t prev_jobid=0;
877    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
878       if (get_next_id_from_list(&hardlink, &id) != 1) {
879          return false;
880       }
881       if (jobid != prev_jobid) { /* new job */
882          if (prev_jobid == 0) {  /* first jobid */
883             if (init) {
884                query.strcat(" UNION ");
885             }
886          } else {               /* end last job, start new one */
887             tmp.strcat(")) UNION ");
888             query.strcat(tmp.c_str());
889          }
890          Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
891                        "FROM File WHERE JobId = %lld " 
892                         "AND FileIndex IN (%lld", jobid, id);
893          prev_jobid = jobid;
894
895       } else {                  /* same job, add new findex */
896          Mmsg(tmp2, ", %lld", id);
897          tmp.strcat(tmp2.c_str());
898       }
899    }
900
901    if (prev_jobid != 0) {       /* end last job */
902       tmp.strcat(")) ");
903       query.strcat(tmp.c_str());
904       init = true;
905    }
906
907    Dmsg1(0, "q=%s\n", query.c_str());
908
909    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
910       goto bail_out;
911    }
912
913    Mmsg(query, "CREATE TABLE %s AS ( "
914         "SELECT JobId, FileIndex, FileId "
915           "FROM ( "
916      "SELECT DISTINCT ON (PathId, FilenameId) JobId, FileIndex, FileId "
917        "FROM btemp%s "
918       "ORDER BY PathId, FilenameId, JobId DESC "
919           ") AS T "
920           "WHERE FileIndex > 0)", output_table, output_table);
921
922    Dmsg1(0, "q=%s\n", query.c_str());
923    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
924       goto bail_out;
925    }
926    ret = true;
927
928 bail_out:
929    Mmsg(query, "DROP TABLE btemp%s", output_table);
930    db_sql_query(db, query.c_str(), NULL, NULL);
931    return ret;
932 }