]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Add TODO in bvfs
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_version = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98
99    alist *table_node;
100
101    htable *cache_ppathid;
102
103 public:
104    pathid_cache() {
105       hlink link;
106       cache_ppathid = (htable *)malloc(sizeof(htable));
107       cache_ppathid->init(&link, &link, NITEMS);
108       max_node = NITEMS;
109       nodes = (hlink *) malloc(max_node * sizeof (hlink));
110       nb_node = 0;
111       table_node = New(alist(5, owned_by_alist));
112       table_node->append(nodes);
113    }
114
115    hlink *get_hlink() {
116       if (++nb_node >= max_node) {
117          nb_node = 0;
118          nodes = (hlink *)malloc(max_node * sizeof(hlink));
119          table_node->append(nodes);
120       }
121       return nodes + nb_node;
122    }
123
124    bool lookup(char *pathid) {
125       bool ret = cache_ppathid->lookup(pathid) != NULL;
126       return ret;
127    }
128    
129    void insert(char *pathid) {
130       hlink *h = get_hlink();
131       cache_ppathid->insert(pathid, h);
132    }
133
134    ~pathid_cache() {
135       cache_ppathid->destroy();
136       free(cache_ppathid);
137       delete table_node;
138    }
139 private:
140    pathid_cache(const pathid_cache &); /* prohibit pass by value */
141    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
142 } ;
143
144 /* Return the parent_dir with the trailing /  (update the given string)
145  * TODO: see in the rest of bacula if we don't have already this function
146  * dir=/tmp/toto/
147  * dir=/tmp/
148  * dir=/
149  * dir=
150  */
151 char *bvfs_parent_dir(char *path)
152 {
153    char *p = path;
154    int len = strlen(path) - 1;
155
156    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
157       path[len] = '\0';
158    }
159
160    if (len > 0) {
161       p += len;
162       while (p > path && !IsPathSeparator(*p)) {
163          p--;
164       }
165       p[1] = '\0';
166    }
167    return path;
168 }
169
170 /* Return the basename of the with the trailing /
171  * TODO: see in the rest of bacula if we don't have
172  * this function already
173  */
174 char *bvfs_basename_dir(char *path)
175 {
176    char *p = path;
177    int len = strlen(path) - 1;
178
179    if (path[len] == '/') {      /* if directory, skip last / */
180       len -= 1;
181    }
182
183    if (len > 0) {
184       p += len;
185       while (p > path && !IsPathSeparator(*p)) {
186          p--;
187       }
188       if (*p == '/') {
189          p++;                  /* skip first / */
190       }
191    } 
192    return p;
193 }
194
195 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
196                                  pathid_cache &ppathid_cache, 
197                                  char *org_pathid, char *path)
198 {
199    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
200    char pathid[50];
201    ATTR_DBR parent;
202    char *bkp = mdb->path;
203    strncpy(pathid, org_pathid, sizeof(pathid));
204
205    /* Does the ppathid exist for this ? we use a memory cache...  In order to
206     * avoid the full loop, we consider that if a dir is allready in the
207     * PathHierarchy table, then there is no need to calculate all the
208     * hierarchy
209     */
210    while (path && *path)
211    {
212       if (!ppathid_cache.lookup(pathid))
213       {
214          Mmsg(mdb->cmd, 
215               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
216               pathid);
217
218          QUERY_DB(jcr, mdb, mdb->cmd);
219          /* Do we have a result ? */
220          if (sql_num_rows(mdb) > 0) {
221             ppathid_cache.insert(pathid);
222             /* This dir was in the db ...
223              * It means we can leave, the tree has allready been built for
224              * this dir
225              */
226             goto bail_out;
227          } else {
228             /* search or create parent PathId in Path table */
229             mdb->path = bvfs_parent_dir(path);
230             mdb->pnl = strlen(mdb->path);
231             if (!db_create_path_record(jcr, mdb, &parent)) {
232                goto bail_out;
233             }
234             ppathid_cache.insert(pathid);
235             
236             Mmsg(mdb->cmd,
237                  "INSERT INTO PathHierarchy (PathId, PPathId) "
238                  "VALUES (%s,%lld)",
239                  pathid, (uint64_t) parent.PathId);
240             
241             INSERT_DB(jcr, mdb, mdb->cmd);
242
243             edit_uint64(parent.PathId, pathid);
244             path = mdb->path;   /* already done */
245          }
246       } else {
247          /* It's already in the cache.  We can leave, no time to waste here,
248           * all the parent dirs have allready been done
249           */
250          goto bail_out;
251       }
252    }   
253
254 bail_out:
255    mdb->path = bkp;
256    mdb->fnl = 0;
257 }
258
259 /* 
260  * Internal function to update path_hierarchy cache with a shared pathid cache
261  */
262 static void update_path_hierarchy_cache(JCR *jcr,
263                                         B_DB *mdb,
264                                         pathid_cache &ppathid_cache,
265                                         JobId_t JobId)
266 {
267    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
268
269    uint32_t num;
270    char jobid[50];
271    edit_uint64(JobId, jobid);
272  
273    db_lock(mdb);
274    db_start_transaction(jcr, mdb);
275
276    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
277    
278    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
279       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
280       goto bail_out;
281    }
282
283    /* Inserting path records for JobId */
284    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
285                   "SELECT DISTINCT PathId, JobId FROM File WHERE JobId = %s",
286         jobid);
287    QUERY_DB(jcr, mdb, mdb->cmd);
288
289
290    /* Now we have to do the directory recursion stuff to determine missing
291     * visibility We try to avoid recursion, to be as fast as possible We also
292     * only work on not allready hierarchised directories...
293     */
294    Mmsg(mdb->cmd, 
295      "SELECT PathVisibility.PathId, Path "
296        "FROM PathVisibility "
297             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
298             "LEFT JOIN PathHierarchy "
299          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
300       "WHERE PathVisibility.JobId = %s "
301         "AND PathHierarchy.PathId IS NULL "
302       "ORDER BY Path", jobid);
303    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
304    QUERY_DB(jcr, mdb, mdb->cmd);
305
306    /* TODO: I need to reuse the DB connection without emptying the result 
307     * So, now i'm copying the result in memory to be able to query the
308     * catalog descriptor again.
309     */
310    num = sql_num_rows(mdb);
311    if (num > 0) {
312       char **result = (char **)malloc (num * 2 * sizeof(char *));
313       
314       SQL_ROW row;
315       int i=0;
316       while((row = sql_fetch_row(mdb))) {
317          result[i++] = bstrdup(row[0]);
318          result[i++] = bstrdup(row[1]);
319       }
320       
321       i=0;
322       while (num > 0) {
323          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
324          free(result[i++]);
325          free(result[i++]);
326          num--;
327       }
328       free(result);
329    }
330    
331    Mmsg(mdb->cmd, 
332   "INSERT INTO PathVisibility (PathId, JobId)  "
333    "SELECT a.PathId,%s "
334    "FROM ( "
335      "SELECT DISTINCT h.PPathId AS PathId "
336        "FROM PathHierarchy AS h "
337        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
338       "WHERE p.JobId=%s) AS a LEFT JOIN "
339        "(SELECT PathId "
340           "FROM PathVisibility "
341          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
342    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
343
344    do {
345       QUERY_DB(jcr, mdb, mdb->cmd);
346    } while (sql_affected_rows(mdb) > 0);
347    
348    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
349    UPDATE_DB(jcr, mdb, mdb->cmd);
350
351 bail_out:
352    db_end_transaction(jcr, mdb);
353    db_unlock(mdb);
354 }
355
356 /* 
357  * Find an store the filename descriptor for empty directories Filename.Name=''
358  */
359 DBId_t Bvfs::get_dir_filenameid()
360 {
361    uint32_t id;
362    if (dir_filenameid) {
363       return dir_filenameid;
364    }
365    POOL_MEM q;
366    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
367    db_sql_query(db, q.c_str(), db_int_handler, &id);
368    dir_filenameid = id;
369    return dir_filenameid;
370 }
371
372 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
373 {
374    uint32_t nb=0;
375    db_list_ctx jobids_list;
376
377    db_lock(mdb);
378    db_start_transaction(jcr, mdb);
379
380 #ifdef xxx
381    /* TODO: Remove this code when updating make_bacula_table script */
382    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
383    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
384       Dmsg0(dbglevel, "Creating cache table\n");
385       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
386       QUERY_DB(jcr, mdb, mdb->cmd);
387
388       Mmsg(mdb->cmd,
389            "CREATE TABLE PathHierarchy ( "
390            "PathId integer NOT NULL, "
391            "PPathId integer NOT NULL, "
392            "CONSTRAINT pathhierarchy_pkey "
393            "PRIMARY KEY (PathId))");
394       QUERY_DB(jcr, mdb, mdb->cmd); 
395
396       Mmsg(mdb->cmd,
397            "CREATE INDEX pathhierarchy_ppathid "
398            "ON PathHierarchy (PPathId)");
399       QUERY_DB(jcr, mdb, mdb->cmd);
400
401       Mmsg(mdb->cmd, 
402            "CREATE TABLE PathVisibility ("
403            "PathId integer NOT NULL, "
404            "JobId integer NOT NULL, "
405            "Size int8 DEFAULT 0, "
406            "Files int4 DEFAULT 0, "
407            "CONSTRAINT pathvisibility_pkey "
408            "PRIMARY KEY (JobId, PathId))");
409       QUERY_DB(jcr, mdb, mdb->cmd);
410
411       Mmsg(mdb->cmd, 
412            "CREATE INDEX pathvisibility_jobid "
413            "ON PathVisibility (JobId)");
414       QUERY_DB(jcr, mdb, mdb->cmd);
415
416    }
417 #endif
418
419    Mmsg(mdb->cmd, 
420  "SELECT JobId from Job "
421   "WHERE HasCache = 0 "
422     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
423   "ORDER BY JobId");
424
425    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
426
427    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
428
429    db_end_transaction(jcr, mdb);
430    db_start_transaction(jcr, mdb);
431    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
432    Mmsg(mdb->cmd, 
433         "DELETE FROM PathVisibility "
434          "WHERE NOT EXISTS "
435         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
436    nb = DELETE_DB(jcr, mdb, mdb->cmd);
437    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
438
439    db_end_transaction(jcr, mdb);
440 }
441
442 /*
443  * Update the bvfs cache for given jobids (1,2,3,4)
444  */
445 void
446 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
447 {
448    pathid_cache ppathid_cache;
449    JobId_t JobId;
450    char *p;
451
452    for (p=jobids; ; ) {
453       int stat = get_next_jobid_from_list(&p, &JobId);
454       if (stat < 0) {
455          return;
456       }
457       if (stat == 0) {
458          break;
459       }
460       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
461       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
462    }
463 }
464
465 /* 
466  * Update the bvfs cache for current jobids
467  */
468 void Bvfs::update_cache()
469 {
470    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
471 }
472
473 /* Change the current directory, returns true if the path exists */
474 bool Bvfs::ch_dir(const char *path)
475 {
476    pm_strcpy(db->path, path);
477    db->pnl = strlen(db->path);
478    ch_dir(db_get_path_record(jcr, db)); 
479    return pwd_id != 0;
480 }
481
482 /* 
483  * Get all file versions for a specified client
484  */
485 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
486 {
487    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
488          (uint64_t)fnid, client);
489    char ed1[50], ed2[50];
490    POOL_MEM q;
491    if (see_copies) {
492       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
493    } else {
494       Mmsg(q, " AND Job.Type = 'B' ");
495    }
496
497    POOL_MEM query;
498
499    Mmsg(query,//    1           2          3       4
500 "SELECT 'V', File.FileId, File.Md5, File.JobId, File.LStat, "
501 //         5                6
502        "Media.VolumeName, Media.InChanger "
503 "FROM File, Job, Client, JobMedia, Media "
504 "WHERE File.FilenameId = %s "
505   "AND File.PathId=%s "
506   "AND File.JobId = Job.JobId "
507   "AND Job.ClientId = Client.ClientId "
508   "AND Job.JobId = JobMedia.JobId "
509   "AND File.FileIndex >= JobMedia.FirstIndex "
510   "AND File.FileIndex <= JobMedia.LastIndex "
511   "AND JobMedia.MediaId = Media.MediaId "
512   "AND Client.Name = '%s' "
513   "%s ORDER BY FileId LIMIT %d OFFSET %d"
514         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
515         limit, offset);
516    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
517    db_sql_query(db, query.c_str(), list_entries, user_data);
518 }
519
520 DBId_t Bvfs::get_root()
521 {
522    *db->path = 0;
523    return db_get_path_record(jcr, db);
524 }
525
526 static int path_handler(void *ctx, int fields, char **row)
527 {
528    Bvfs *fs = (Bvfs *) ctx;
529    return fs->_handle_path(ctx, fields, row);
530 }
531
532 int Bvfs::_handle_path(void *ctx, int fields, char **row)
533 {
534    if (bvfs_is_dir(row)) {
535       /* can have the same path 2 times */
536       if (strcmp(row[BVFS_Name], prev_dir)) {
537          pm_strcpy(prev_dir, row[BVFS_Name]);
538          return list_entries(user_data, fields, row);
539       }
540    }
541    return 0;
542 }
543
544 /* 
545  * Retrieve . and .. information
546  */
547 void Bvfs::ls_special_dirs()
548 {
549    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
550    char ed1[50], ed2[50];
551    if (*jobids == 0) {
552       return;
553    }
554    if (!dir_filenameid) {
555       get_dir_filenameid();
556    }
557
558    /* Will fetch directories  */
559    *prev_dir = 0;
560
561    POOL_MEM query;
562    Mmsg(query, 
563 "((SELECT PPathId AS PathId, '..' AS Path "
564     "FROM  PathHierarchy "
565    "WHERE  PathId = %s) "
566 "UNION "
567  "(SELECT %s AS PathId, '.' AS Path))",
568         edit_uint64(pwd_id, ed1), ed1);
569
570    POOL_MEM query2;
571    Mmsg(query2,// 1      2     3        4     5       6
572 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
573   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
574        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
575               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
576        "WHERE File1.FilenameId = %s "
577        "AND File1.JobId IN (%s)) AS listfile1 "
578   "ON (tmp.PathId = listfile1.PathId) "
579   "ORDER BY tmp.Path, JobId DESC ",
580         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
581
582    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
583    db_sql_query(db, query2.c_str(), path_handler, this);
584 }
585
586 /* Returns true if we have dirs to read */
587 bool Bvfs::ls_dirs()
588 {
589    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
590    char ed1[50], ed2[50];
591    if (*jobids == 0) {
592       return false;
593    }
594
595    POOL_MEM filter;
596    if (*pattern) {
597       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
598    }
599
600    if (!dir_filenameid) {
601       get_dir_filenameid();
602    }
603
604    /* the sql query displays same directory multiple time, take the first one */
605    *prev_dir = 0;
606
607    /* Let's retrieve the list of the visible dirs in this dir ...
608     * First, I need the empty filenameid to locate efficiently
609     * the dirs in the file table
610     * my $dir_filenameid = $self->get_dir_filenameid();
611     */
612    /* Then we get all the dir entries from File ... */
613    POOL_MEM query;
614    Mmsg(query,
615 //       0     1     2   3      4     5       6
616 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
617     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
618            "lower(Path1.Path) AS lpath, "
619            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
620            "listfile1.FileId AS FileId "
621     "FROM ( "
622       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
623       "FROM PathHierarchy AS PathHierarchy1 "
624       "JOIN Path AS Path2 "
625         "ON (PathHierarchy1.PathId = Path2.PathId) "
626       "JOIN PathVisibility AS PathVisibility1 "
627         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
628       "WHERE PathHierarchy1.PPathId = %s "
629       "AND PathVisibility1.jobid IN (%s) "
630            "%s "
631      ") AS listpath1 "
632    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
633
634    "LEFT JOIN ( " /* get attributes if any */
635        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
636               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
637        "WHERE File1.FilenameId = %s "
638        "AND File1.JobId IN (%s)) AS listfile1 "
639        "ON (listpath1.PathId = listfile1.PathId) "
640     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
641         edit_uint64(pwd_id, ed1),
642         jobids,
643         filter.c_str(),
644         edit_uint64(dir_filenameid, ed2),
645         jobids,
646         limit, offset);
647
648    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
649
650    db_lock(db);
651    db_sql_query(db, query.c_str(), path_handler, this);
652    nb_record = db->num_rows;
653    db_unlock(db);
654
655    return nb_record == limit;
656 }
657
658 /* Returns true if we have files to read */
659 bool Bvfs::ls_files()
660 {
661    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
662    char ed1[50];
663    if (*jobids == 0) {
664       return false;
665    }
666
667    if (!pwd_id) {
668       ch_dir(get_root());
669    }
670
671    POOL_MEM filter;
672    if (*pattern) {
673       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
674    }
675    /* TODO: Use JobTDate instead of FileId to determine the latest version */
676    POOL_MEM query;
677    Mmsg(query, //    1              2             3          4
678 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
679         "File.LStat, listfiles.id "
680 "FROM File, ( "
681        "SELECT Filename.Name as Name, max(File.FileId) as id "
682          "FROM File, Filename "
683         "WHERE File.FilenameId = Filename.FilenameId "
684           "AND Filename.Name != '' "
685           "AND File.PathId = %s "
686           "AND File.JobId IN (%s) "
687           "%s "
688         "GROUP BY Filename.Name "
689         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
690      ") AS listfiles "
691 "WHERE File.FileId = listfiles.id",
692         edit_uint64(pwd_id, ed1),
693         jobids,
694         filter.c_str(),
695         limit,
696         offset);
697    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
698
699    db_lock(db);
700    db_sql_query(db, query.c_str(), list_entries, user_data);
701    nb_record = db->num_rows;
702    db_unlock(db);
703
704    return nb_record == limit;
705 }