]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
0be85604cd506fe73e150572712cfebac7950d6c
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    prev_dir = get_pool_memory(PM_NAME);
62    jobids = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *prev_dir = *pattern = *jobids = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_version = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98    htable *cache_ppathid;
99
100 public:
101    pathid_cache() {
102       hlink link;
103       cache_ppathid = (htable *)malloc(sizeof(htable));
104       cache_ppathid->init(&link, &link, NITEMS);
105       max_node = NITEMS;
106       nodes = (hlink *) malloc(max_node * sizeof (hlink));
107       nb_node = 0;
108    }
109
110    hlink *get_hlink() {
111       if (nb_node >= max_node) {
112          max_node *= 2;
113          nodes = (hlink *)brealloc(nodes, sizeof(hlink) * max_node);
114       }
115       return nodes + nb_node++;
116    }
117
118    bool lookup(char *pathid) {
119       bool ret = cache_ppathid->lookup(pathid) != NULL;
120       return ret;
121    }
122    
123    void insert(char *pathid) {
124       hlink *h = get_hlink();
125       cache_ppathid->insert(pathid, h);
126    }
127
128    ~pathid_cache() {
129       cache_ppathid->destroy();
130       free(cache_ppathid);
131       free(nodes);
132    }
133 private:
134    pathid_cache(const pathid_cache &); /* prohibit pass by value */
135    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
136 } ;
137
138 /* Return the parent_dir with the trailing /  (update the given string)
139  * TODO: see in the rest of bacula if we don't have already this function
140  * dir=/tmp/toto/
141  * dir=/tmp/
142  * dir=/
143  * dir=
144  */
145 char *bvfs_parent_dir(char *path)
146 {
147    char *p = path;
148    int len = strlen(path) - 1;
149
150    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
151       path[len] = '\0';
152    }
153
154    if (len > 0) {
155       p += len;
156       while (p > path && !IsPathSeparator(*p)) {
157          p--;
158       }
159       p[1] = '\0';
160    }
161    return path;
162 }
163
164 /* Return the basename of the with the trailing /
165  * TODO: see in the rest of bacula if we don't have
166  * this function already
167  */
168 char *bvfs_basename_dir(char *path)
169 {
170    char *p = path;
171    int len = strlen(path) - 1;
172
173    if (path[len] == '/') {      /* if directory, skip last / */
174       len -= 1;
175    }
176
177    if (len > 0) {
178       p += len;
179       while (p > path && !IsPathSeparator(*p)) {
180          p--;
181       }
182       if (*p == '/') {
183          p++;                  /* skip first / */
184       }
185    } 
186    return p;
187 }
188
189 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
190                                  pathid_cache &ppathid_cache, 
191                                  char *org_pathid, char *path)
192 {
193    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
194    char pathid[50];
195    ATTR_DBR parent;
196    char *bkp = mdb->path;
197    strncpy(pathid, org_pathid, sizeof(pathid));
198
199    /* Does the ppathid exist for this ? we use a memory cache...  In order to
200     * avoid the full loop, we consider that if a dir is allready in the
201     * brestore_pathhierarchy table, then there is no need to calculate all the
202     * hierarchy
203     */
204    while (path && *path)
205    {
206       if (!ppathid_cache.lookup(pathid))
207       {
208          Mmsg(mdb->cmd, 
209               "SELECT PPathId FROM brestore_pathhierarchy WHERE PathId = %s",
210               pathid);
211
212          QUERY_DB(jcr, mdb, mdb->cmd);
213          /* Do we have a result ? */
214          if (sql_num_rows(mdb) > 0) {
215             ppathid_cache.insert(pathid);
216             /* This dir was in the db ...
217              * It means we can leave, the tree has allready been built for
218              * this dir
219              */
220             goto bail_out;
221          } else {
222             /* search or create parent PathId in Path table */
223             mdb->path = bvfs_parent_dir(path);
224             mdb->pnl = strlen(mdb->path);
225             if (!db_create_path_record(jcr, mdb, &parent)) {
226                goto bail_out;
227             }
228             ppathid_cache.insert(pathid);
229             
230             Mmsg(mdb->cmd,
231                  "INSERT INTO brestore_pathhierarchy (PathId, PPathId) "
232                  "VALUES (%s,%lld)",
233                  pathid, (uint64_t) parent.PathId);
234             
235             INSERT_DB(jcr, mdb, mdb->cmd);
236
237             edit_uint64(parent.PathId, pathid);
238             path = mdb->path;   /* already done */
239          }
240       } else {
241          /* It's allready in the cache.  We can leave, no time to waste here,
242           * all the parent dirs have allready been done
243           */
244          goto bail_out;
245       }
246    }   
247
248 bail_out:
249    mdb->path = bkp;
250    mdb->fnl = 0;
251 }
252
253 /* 
254  * Internal function to update path_hierarchy cache with a shared pathid cache
255  */
256 static void update_path_hierarchy_cache(JCR *jcr,
257                                         B_DB *mdb,
258                                         pathid_cache &ppathid_cache,
259                                         JobId_t JobId)
260 {
261    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
262
263    uint32_t num;
264    char jobid[50];
265    edit_uint64(JobId, jobid);
266  
267    db_lock(mdb);
268    db_start_transaction(jcr, mdb);
269
270    Mmsg(mdb->cmd, "SELECT 1 FROM brestore_knownjobid WHERE JobId = %s", jobid);
271    
272    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
273       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
274       goto bail_out;
275    }
276
277    /* Inserting path records for JobId */
278    Mmsg(mdb->cmd, "INSERT INTO brestore_pathvisibility (PathId, JobId) "
279                   "SELECT DISTINCT PathId, JobId FROM File WHERE JobId = %s",
280         jobid);
281    QUERY_DB(jcr, mdb, mdb->cmd);
282
283
284    /* Now we have to do the directory recursion stuff to determine missing
285     * visibility We try to avoid recursion, to be as fast as possible We also
286     * only work on not allready hierarchised directories...
287     */
288    Mmsg(mdb->cmd, 
289      "SELECT brestore_pathvisibility.PathId, Path "
290        "FROM brestore_pathvisibility "
291             "JOIN Path ON( brestore_pathvisibility.PathId = Path.PathId) "
292             "LEFT JOIN brestore_pathhierarchy "
293          "ON (brestore_pathvisibility.PathId = brestore_pathhierarchy.PathId) "
294       "WHERE brestore_pathvisibility.JobId = %s "
295         "AND brestore_pathhierarchy.PathId IS NULL "
296       "ORDER BY Path", jobid);
297    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
298    QUERY_DB(jcr, mdb, mdb->cmd);
299
300    /* TODO: I need to reuse the DB connection without emptying the result 
301     * So, now i'm copying the result in memory to be able to query the
302     * catalog descriptor again.
303     */
304    num = sql_num_rows(mdb);
305    if (num > 0) {
306       char **result = (char **)malloc (num * 2 * sizeof(char *));
307       
308       SQL_ROW row;
309       int i=0;
310       while((row = sql_fetch_row(mdb))) {
311          result[i++] = bstrdup(row[0]);
312          result[i++] = bstrdup(row[1]);
313       }
314       
315       i=0;
316       while (num > 0) {
317          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
318          free(result[i++]);
319          free(result[i++]);
320          num--;
321       }
322       free(result);
323    }
324    
325    Mmsg(mdb->cmd, 
326   "INSERT INTO brestore_pathvisibility (PathId, JobId)  "
327    "SELECT a.PathId,%s "
328    "FROM ( "
329      "SELECT DISTINCT h.PPathId AS PathId "
330        "FROM brestore_pathhierarchy AS h "
331        "JOIN  brestore_pathvisibility AS p ON (h.PathId=p.PathId) "
332       "WHERE p.JobId=%s) AS a LEFT JOIN "
333        "(SELECT PathId "
334           "FROM brestore_pathvisibility "
335          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
336    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
337
338    do {
339       QUERY_DB(jcr, mdb, mdb->cmd);
340    } while (sql_affected_rows(mdb) > 0);
341    
342    Mmsg(mdb->cmd, "INSERT INTO brestore_knownjobid (JobId) VALUES (%s)", jobid);
343    INSERT_DB(jcr, mdb, mdb->cmd);
344
345 bail_out:
346    db_end_transaction(jcr, mdb);
347    db_unlock(mdb);
348 }
349
350 /* 
351  * Find an store the filename descriptor for empty directories Filename.Name=''
352  */
353 DBId_t Bvfs::get_dir_filenameid()
354 {
355    uint32_t id;
356    if (dir_filenameid) {
357       return dir_filenameid;
358    }
359    POOL_MEM q;
360    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
361    db_sql_query(db, q.c_str(), db_int_handler, &id);
362    dir_filenameid = id;
363    return dir_filenameid;
364 }
365
366 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
367 {
368    uint32_t nb=0;
369    db_lock(mdb);
370    db_start_transaction(jcr, mdb);
371
372    Mmsg(mdb->cmd, "SELECT 1 from brestore_knownjobid LIMIT 1");
373    /* TODO: Add this code in the make_bacula_table script */
374    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
375       Dmsg0(dbglevel, "Creating cache table\n");
376       Mmsg(mdb->cmd,
377            "CREATE TABLE brestore_knownjobid ("
378            "JobId integer NOT NULL, "
379            "CONSTRAINT brestore_knownjobid_pkey PRIMARY KEY (JobId))");
380       QUERY_DB(jcr, mdb, mdb->cmd);
381
382       Mmsg(mdb->cmd,
383            "CREATE TABLE brestore_pathhierarchy ( "
384            "PathId integer NOT NULL, "
385            "PPathId integer NOT NULL, "
386            "CONSTRAINT brestore_pathhierarchy_pkey "
387            "PRIMARY KEY (PathId))");
388       QUERY_DB(jcr, mdb, mdb->cmd); 
389
390       Mmsg(mdb->cmd,
391            "CREATE INDEX brestore_pathhierarchy_ppathid "
392            "ON brestore_pathhierarchy (PPathId)");
393       QUERY_DB(jcr, mdb, mdb->cmd);
394
395       Mmsg(mdb->cmd, 
396            "CREATE TABLE brestore_pathvisibility ("
397            "PathId integer NOT NULL, "
398            "JobId integer NOT NULL, "
399            "Size int8 DEFAULT 0, "
400            "Files int4 DEFAULT 0, "
401            "CONSTRAINT brestore_pathvisibility_pkey "
402            "PRIMARY KEY (JobId, PathId))");
403       QUERY_DB(jcr, mdb, mdb->cmd);
404
405       Mmsg(mdb->cmd, 
406            "CREATE INDEX brestore_pathvisibility_jobid "
407            "ON brestore_pathvisibility (JobId)");
408       QUERY_DB(jcr, mdb, mdb->cmd);
409
410    }
411
412    POOLMEM *jobids = get_pool_memory(PM_NAME);
413    *jobids = 0;
414
415    Mmsg(mdb->cmd, 
416  "SELECT JobId from Job "
417   "WHERE JobId NOT IN (SELECT JobId FROM brestore_knownjobid) "
418     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
419   "ORDER BY JobId");
420
421    db_sql_query(mdb, mdb->cmd, db_get_int_handler, jobids);
422
423    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids);
424
425    db_end_transaction(jcr, mdb);
426    db_start_transaction(jcr, mdb);
427    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
428    Mmsg(mdb->cmd, 
429         "DELETE FROM brestore_pathvisibility "
430          "WHERE NOT EXISTS "
431         "(SELECT 1 FROM Job WHERE JobId=brestore_pathvisibility.JobId)");
432    nb = DELETE_DB(jcr, mdb, mdb->cmd);
433    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
434
435    Dmsg0(dbglevel, "Cleaning knownjobid\n");
436    Mmsg(mdb->cmd,         
437         "DELETE FROM brestore_knownjobid "
438          "WHERE NOT EXISTS "
439         "(SELECT 1 FROM Job WHERE JobId=brestore_knownjobid.JobId)");
440    nb = DELETE_DB(jcr, mdb, mdb->cmd);
441    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
442
443    db_end_transaction(jcr, mdb);
444    free_pool_memory(jobids);
445 }
446
447 /*
448  * Update the bvfs cache for given jobids (1,2,3,4)
449  */
450 void
451 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
452 {
453    pathid_cache ppathid_cache;
454    JobId_t JobId;
455    char *p;
456
457    for (p=jobids; ; ) {
458       int stat = get_next_jobid_from_list(&p, &JobId);
459       if (stat < 0) {
460          return;
461       }
462       if (stat == 0) {
463          break;
464       }
465       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t) JobId);
466       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
467    }
468 }
469
470 /* 
471  * Update the bvfs cache for current jobids
472  */
473 void Bvfs::update_cache()
474 {
475    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
476 }
477
478 /* Change the current directory, returns true if the path exists */
479 bool Bvfs::ch_dir(const char *path)
480 {
481    pm_strcpy(db->path, path);
482    db->pnl = strlen(db->path);
483    ch_dir(db_get_path_record(jcr, db)); 
484    return pwd_id != 0;
485 }
486
487 /* 
488  * Get all file versions for a specified client
489  */
490 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
491 {
492    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
493          (uint64_t)fnid, client);
494    char ed1[50], ed2[50];
495    POOL_MEM q;
496    if (see_copies) {
497       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
498    } else {
499       Mmsg(q, " AND Job.Type = 'B' ");
500    }
501
502    POOL_MEM query;
503
504    Mmsg(query,//0       1           2          3
505 "SELECT File.FileId, File.Md5, File.JobId, File.LStat, "
506 //         4                5          
507        "Media.VolumeName, Media.InChanger "
508 "FROM File, Job, Client, JobMedia, Media "
509 "WHERE File.FilenameId = %s "
510   "AND File.PathId=%s "
511   "AND File.JobId = Job.JobId "
512   "AND Job.ClientId = Client.ClientId "
513   "AND Job.JobId = JobMedia.JobId "
514   "AND File.FileIndex >= JobMedia.FirstIndex "
515   "AND File.FileIndex <= JobMedia.LastIndex "
516   "AND JobMedia.MediaId = Media.MediaId "
517   "AND Client.Name = '%s' "
518   "%s ORDER BY FileId LIMIT %d OFFSET %d"
519         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
520         limit, offset);
521    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
522    db_sql_query(db, query.c_str(), list_entries, user_data);
523 }
524
525 DBId_t Bvfs::get_root()
526 {
527    *db->path = 0;
528    return db_get_path_record(jcr, db);
529 }
530
531 static int path_handler(void *ctx, int fields, char **row)
532 {
533    Bvfs *fs = (Bvfs *) ctx;
534    return fs->_handle_path(ctx, fields, row);
535 }
536
537 int Bvfs::_handle_path(void *ctx, int fields, char **row)
538 {
539    if (fields == BVFS_DIR_RECORD) {
540       /* can have the same path 2 times */
541       if (strcmp(row[BVFS_Name], prev_dir)) {
542          pm_strcpy(prev_dir, row[BVFS_Name]);
543          return list_entries(user_data, fields, row);
544       }
545    }
546    return 0;
547 }
548
549 /* 
550  * Retrieve . and .. information
551  */
552 void Bvfs::ls_special_dirs()
553 {
554    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
555    char ed1[50], ed2[50];
556    if (!*jobids) {
557       return;
558    }
559    if (!dir_filenameid) {
560       get_dir_filenameid();
561    }
562
563    /* Will fetch directories  */
564    *prev_dir = 0;
565
566    POOL_MEM query;
567    Mmsg(query, 
568 "((SELECT PPathId AS PathId, '..' AS Path "
569     "FROM  brestore_pathhierarchy "
570    "WHERE  PathId = %s) "
571 "UNION "
572  "(SELECT %s AS PathId, '.' AS Path))",
573         edit_uint64(pwd_id, ed1), ed1);
574
575    POOL_MEM query2;
576    Mmsg(query2, 
577 "SELECT tmp.PathId, tmp.Path, JobId, LStat "
578   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
579        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
580               "File1.LStat AS LStat FROM File AS File1 "
581        "WHERE File1.FilenameId = %s "
582        "AND File1.JobId IN (%s)) AS listfile1 "
583   "ON (tmp.PathId = listfile1.PathId) "
584   "ORDER BY tmp.Path, JobId DESC ",
585         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
586
587    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
588    db_sql_query(db, query2.c_str(), path_handler, this);
589 }
590
591 /* Returns true if we have dirs to read */
592 bool Bvfs::ls_dirs()
593 {
594    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
595    char ed1[50], ed2[50];
596    if (!*jobids) {
597       return false;
598    }
599
600    POOL_MEM filter;
601    if (*pattern) {
602       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
603    }
604
605    if (!dir_filenameid) {
606       get_dir_filenameid();
607    }
608
609    /* the sql query displays same directory multiple time, take the first one */
610    *prev_dir = 0;
611
612    /* Let's retrieve the list of the visible dirs in this dir ...
613     * First, I need the empty filenameid to locate efficiently
614     * the dirs in the file table
615     * my $dir_filenameid = $self->get_dir_filenameid();
616     */
617    /* Then we get all the dir entries from File ... */
618    POOL_MEM query;
619    Mmsg(query,
620 //        0     1      2      3
621 "SELECT PathId, Path, JobId, LStat FROM ( "
622     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
623            "lower(Path1.Path) AS lpath, "
624            "listfile1.JobId AS JobId, listfile1.LStat AS LStat "
625     "FROM ( "
626       "SELECT DISTINCT brestore_pathhierarchy1.PathId AS PathId "
627       "FROM brestore_pathhierarchy AS brestore_pathhierarchy1 "
628       "JOIN Path AS Path2 "
629         "ON (brestore_pathhierarchy1.PathId = Path2.PathId) "
630       "JOIN brestore_pathvisibility AS brestore_pathvisibility1 "
631         "ON (brestore_pathhierarchy1.PathId = brestore_pathvisibility1.PathId) "
632       "WHERE brestore_pathhierarchy1.PPathId = %s "
633       "AND brestore_pathvisibility1.jobid IN (%s) "
634            "%s "
635      ") AS listpath1 "
636    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
637
638    "LEFT JOIN ( " /* get attributes if any */
639        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
640               "File1.LStat AS LStat FROM File AS File1 "
641        "WHERE File1.FilenameId = %s "
642        "AND File1.JobId IN (%s)) AS listfile1 "
643        "ON (listpath1.PathId = listfile1.PathId) "
644     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
645         edit_uint64(pwd_id, ed1),
646         jobids,
647         filter.c_str(),
648         edit_uint64(dir_filenameid, ed2),
649         jobids,
650         limit, offset);
651
652    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
653
654    db_lock(db);
655    db_sql_query(db, query.c_str(), path_handler, this);
656    nb_record = db->num_rows;
657    db_unlock(db);
658
659    return nb_record == limit;
660 }
661
662 /* Returns true if we have files to read */
663 bool Bvfs::ls_files()
664 {
665    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
666    char ed1[50];
667    if (!*jobids) {
668       return false;
669    }
670
671    if (!pwd_id) {
672       ch_dir(get_root());
673    }
674
675    POOL_MEM filter;
676    if (*pattern) {
677       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
678    }
679
680    POOL_MEM query;
681    Mmsg(query, // 0         1              2             3          4
682 "SELECT File.FilenameId, listfiles.Name, File.JobId, File.LStat, listfiles.id "
683 "FROM File, ( "
684        "SELECT Filename.Name as Name, max(File.FileId) as id "
685          "FROM File, Filename "
686         "WHERE File.FilenameId = Filename.FilenameId "
687           "AND Filename.Name != '' "
688           "AND File.PathId = %s "
689           "AND File.JobId IN (%s) "
690           "%s "
691         "GROUP BY Filename.Name "
692         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
693      ") AS listfiles "
694 "WHERE File.FileId = listfiles.id",
695         edit_uint64(pwd_id, ed1),
696         jobids,
697         filter.c_str(),
698         limit,
699         offset);
700    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
701
702    db_lock(db);
703    db_sql_query(db, query.c_str(), list_entries, user_data);
704    nb_record = db->num_rows;
705    db_unlock(db);
706
707    return nb_record == limit;
708 }