]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Fix bug in bvfs_update function, should work much better now
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_version = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98    htable *cache_ppathid;
99
100 public:
101    pathid_cache() {
102       hlink link;
103       cache_ppathid = (htable *)malloc(sizeof(htable));
104       cache_ppathid->init(&link, &link, NITEMS);
105       max_node = NITEMS;
106       nodes = (hlink *) malloc(max_node * sizeof (hlink));
107       nb_node = 0;
108    }
109
110    hlink *get_hlink() {
111       if (nb_node >= max_node) {
112          max_node *= 2;
113          nodes = (hlink *)brealloc(nodes, sizeof(hlink) * max_node);
114       }
115       return nodes + nb_node++;
116    }
117
118    bool lookup(char *pathid) {
119       bool ret = cache_ppathid->lookup(pathid) != NULL;
120       return ret;
121    }
122    
123    void insert(char *pathid) {
124       hlink *h = get_hlink();
125       cache_ppathid->insert(pathid, h);
126    }
127
128    ~pathid_cache() {
129       cache_ppathid->destroy();
130       free(cache_ppathid);
131       free(nodes);
132    }
133 private:
134    pathid_cache(const pathid_cache &); /* prohibit pass by value */
135    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
136 } ;
137
138 /* Return the parent_dir with the trailing /  (update the given string)
139  * TODO: see in the rest of bacula if we don't have already this function
140  * dir=/tmp/toto/
141  * dir=/tmp/
142  * dir=/
143  * dir=
144  */
145 char *bvfs_parent_dir(char *path)
146 {
147    char *p = path;
148    int len = strlen(path) - 1;
149
150    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
151       path[len] = '\0';
152    }
153
154    if (len > 0) {
155       p += len;
156       while (p > path && !IsPathSeparator(*p)) {
157          p--;
158       }
159       p[1] = '\0';
160    }
161    return path;
162 }
163
164 /* Return the basename of the with the trailing /
165  * TODO: see in the rest of bacula if we don't have
166  * this function already
167  */
168 char *bvfs_basename_dir(char *path)
169 {
170    char *p = path;
171    int len = strlen(path) - 1;
172
173    if (path[len] == '/') {      /* if directory, skip last / */
174       len -= 1;
175    }
176
177    if (len > 0) {
178       p += len;
179       while (p > path && !IsPathSeparator(*p)) {
180          p--;
181       }
182       if (*p == '/') {
183          p++;                  /* skip first / */
184       }
185    } 
186    return p;
187 }
188
189 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
190                                  pathid_cache &ppathid_cache, 
191                                  char *org_pathid, char *path)
192 {
193    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
194    char pathid[50];
195    ATTR_DBR parent;
196    char *bkp = mdb->path;
197    strncpy(pathid, org_pathid, sizeof(pathid));
198
199    /* Does the ppathid exist for this ? we use a memory cache...  In order to
200     * avoid the full loop, we consider that if a dir is allready in the
201     * PathHierarchy table, then there is no need to calculate all the
202     * hierarchy
203     */
204    while (path && *path)
205    {
206       if (!ppathid_cache.lookup(pathid))
207       {
208          Mmsg(mdb->cmd, 
209               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
210               pathid);
211
212          QUERY_DB(jcr, mdb, mdb->cmd);
213          /* Do we have a result ? */
214          if (sql_num_rows(mdb) > 0) {
215             ppathid_cache.insert(pathid);
216             /* This dir was in the db ...
217              * It means we can leave, the tree has allready been built for
218              * this dir
219              */
220             goto bail_out;
221          } else {
222             /* search or create parent PathId in Path table */
223             mdb->path = bvfs_parent_dir(path);
224             mdb->pnl = strlen(mdb->path);
225             if (!db_create_path_record(jcr, mdb, &parent)) {
226                goto bail_out;
227             }
228             ppathid_cache.insert(pathid);
229             
230             Mmsg(mdb->cmd,
231                  "INSERT INTO PathHierarchy (PathId, PPathId) "
232                  "VALUES (%s,%lld)",
233                  pathid, (uint64_t) parent.PathId);
234             
235             INSERT_DB(jcr, mdb, mdb->cmd);
236
237             edit_uint64(parent.PathId, pathid);
238             path = mdb->path;   /* already done */
239          }
240       } else {
241          /* It's already in the cache.  We can leave, no time to waste here,
242           * all the parent dirs have allready been done
243           */
244          goto bail_out;
245       }
246    }   
247
248 bail_out:
249    mdb->path = bkp;
250    mdb->fnl = 0;
251 }
252
253 /* 
254  * Internal function to update path_hierarchy cache with a shared pathid cache
255  */
256 static void update_path_hierarchy_cache(JCR *jcr,
257                                         B_DB *mdb,
258                                         pathid_cache &ppathid_cache,
259                                         JobId_t JobId)
260 {
261    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
262
263    uint32_t num;
264    char jobid[50];
265    edit_uint64(JobId, jobid);
266  
267    db_lock(mdb);
268    db_start_transaction(jcr, mdb);
269
270    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
271    
272    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
273       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
274       goto bail_out;
275    }
276
277    /* Inserting path records for JobId */
278    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
279                   "SELECT DISTINCT PathId, JobId FROM File WHERE JobId = %s",
280         jobid);
281    QUERY_DB(jcr, mdb, mdb->cmd);
282
283
284    /* Now we have to do the directory recursion stuff to determine missing
285     * visibility We try to avoid recursion, to be as fast as possible We also
286     * only work on not allready hierarchised directories...
287     */
288    Mmsg(mdb->cmd, 
289      "SELECT PathVisibility.PathId, Path "
290        "FROM PathVisibility "
291             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
292             "LEFT JOIN PathHierarchy "
293          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
294       "WHERE PathVisibility.JobId = %s "
295         "AND PathHierarchy.PathId IS NULL "
296       "ORDER BY Path", jobid);
297    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
298    QUERY_DB(jcr, mdb, mdb->cmd);
299
300    /* TODO: I need to reuse the DB connection without emptying the result 
301     * So, now i'm copying the result in memory to be able to query the
302     * catalog descriptor again.
303     */
304    num = sql_num_rows(mdb);
305    if (num > 0) {
306       char **result = (char **)malloc (num * 2 * sizeof(char *));
307       
308       SQL_ROW row;
309       int i=0;
310       while((row = sql_fetch_row(mdb))) {
311          result[i++] = bstrdup(row[0]);
312          result[i++] = bstrdup(row[1]);
313       }
314       
315       i=0;
316       while (num > 0) {
317          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
318          free(result[i++]);
319          free(result[i++]);
320          num--;
321       }
322       free(result);
323    }
324    
325    Mmsg(mdb->cmd, 
326   "INSERT INTO PathVisibility (PathId, JobId)  "
327    "SELECT a.PathId,%s "
328    "FROM ( "
329      "SELECT DISTINCT h.PPathId AS PathId "
330        "FROM PathHierarchy AS h "
331        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
332       "WHERE p.JobId=%s) AS a LEFT JOIN "
333        "(SELECT PathId "
334           "FROM PathVisibility "
335          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
336    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
337
338    do {
339       QUERY_DB(jcr, mdb, mdb->cmd);
340    } while (sql_affected_rows(mdb) > 0);
341    
342    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
343    UPDATE_DB(jcr, mdb, mdb->cmd);
344
345 bail_out:
346    db_end_transaction(jcr, mdb);
347    db_unlock(mdb);
348 }
349
350 /* 
351  * Find an store the filename descriptor for empty directories Filename.Name=''
352  */
353 DBId_t Bvfs::get_dir_filenameid()
354 {
355    uint32_t id;
356    if (dir_filenameid) {
357       return dir_filenameid;
358    }
359    POOL_MEM q;
360    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
361    db_sql_query(db, q.c_str(), db_int_handler, &id);
362    dir_filenameid = id;
363    return dir_filenameid;
364 }
365
366 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
367 {
368    uint32_t nb=0;
369    db_list_ctx jobids_list;
370
371    db_lock(mdb);
372    db_start_transaction(jcr, mdb);
373
374 #ifdef xxx
375    /* TODO: Remove this code when updating make_bacula_table script */
376    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
377    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
378       Dmsg0(dbglevel, "Creating cache table\n");
379       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
380       QUERY_DB(jcr, mdb, mdb->cmd);
381
382       Mmsg(mdb->cmd,
383            "CREATE TABLE PathHierarchy ( "
384            "PathId integer NOT NULL, "
385            "PPathId integer NOT NULL, "
386            "CONSTRAINT pathhierarchy_pkey "
387            "PRIMARY KEY (PathId))");
388       QUERY_DB(jcr, mdb, mdb->cmd); 
389
390       Mmsg(mdb->cmd,
391            "CREATE INDEX pathhierarchy_ppathid "
392            "ON PathHierarchy (PPathId)");
393       QUERY_DB(jcr, mdb, mdb->cmd);
394
395       Mmsg(mdb->cmd, 
396            "CREATE TABLE PathVisibility ("
397            "PathId integer NOT NULL, "
398            "JobId integer NOT NULL, "
399            "Size int8 DEFAULT 0, "
400            "Files int4 DEFAULT 0, "
401            "CONSTRAINT pathvisibility_pkey "
402            "PRIMARY KEY (JobId, PathId))");
403       QUERY_DB(jcr, mdb, mdb->cmd);
404
405       Mmsg(mdb->cmd, 
406            "CREATE INDEX pathvisibility_jobid "
407            "ON PathVisibility (JobId)");
408       QUERY_DB(jcr, mdb, mdb->cmd);
409
410    }
411 #endif
412
413    Mmsg(mdb->cmd, 
414  "SELECT JobId from Job "
415   "WHERE HasCache = 0 "
416     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
417   "ORDER BY JobId");
418
419    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
420
421    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
422
423    db_end_transaction(jcr, mdb);
424    db_start_transaction(jcr, mdb);
425    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
426    Mmsg(mdb->cmd, 
427         "DELETE FROM PathVisibility "
428          "WHERE NOT EXISTS "
429         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
430    nb = DELETE_DB(jcr, mdb, mdb->cmd);
431    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
432
433    db_end_transaction(jcr, mdb);
434 }
435
436 /*
437  * Update the bvfs cache for given jobids (1,2,3,4)
438  */
439 void
440 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
441 {
442    pathid_cache ppathid_cache;
443    JobId_t JobId;
444    char *p;
445
446    for (p=jobids; ; ) {
447       int stat = get_next_jobid_from_list(&p, &JobId);
448       if (stat < 0) {
449          return;
450       }
451       if (stat == 0) {
452          break;
453       }
454       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
455       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
456    }
457 }
458
459 /* 
460  * Update the bvfs cache for current jobids
461  */
462 void Bvfs::update_cache()
463 {
464    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
465 }
466
467 /* Change the current directory, returns true if the path exists */
468 bool Bvfs::ch_dir(const char *path)
469 {
470    pm_strcpy(db->path, path);
471    db->pnl = strlen(db->path);
472    ch_dir(db_get_path_record(jcr, db)); 
473    return pwd_id != 0;
474 }
475
476 /* 
477  * Get all file versions for a specified client
478  */
479 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
480 {
481    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
482          (uint64_t)fnid, client);
483    char ed1[50], ed2[50];
484    POOL_MEM q;
485    if (see_copies) {
486       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
487    } else {
488       Mmsg(q, " AND Job.Type = 'B' ");
489    }
490
491    POOL_MEM query;
492
493    Mmsg(query,//    1           2          3       4
494 "SELECT 'V', File.FileId, File.Md5, File.JobId, File.LStat, "
495 //         5                6
496        "Media.VolumeName, Media.InChanger "
497 "FROM File, Job, Client, JobMedia, Media "
498 "WHERE File.FilenameId = %s "
499   "AND File.PathId=%s "
500   "AND File.JobId = Job.JobId "
501   "AND Job.ClientId = Client.ClientId "
502   "AND Job.JobId = JobMedia.JobId "
503   "AND File.FileIndex >= JobMedia.FirstIndex "
504   "AND File.FileIndex <= JobMedia.LastIndex "
505   "AND JobMedia.MediaId = Media.MediaId "
506   "AND Client.Name = '%s' "
507   "%s ORDER BY FileId LIMIT %d OFFSET %d"
508         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
509         limit, offset);
510    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
511    db_sql_query(db, query.c_str(), list_entries, user_data);
512 }
513
514 DBId_t Bvfs::get_root()
515 {
516    *db->path = 0;
517    return db_get_path_record(jcr, db);
518 }
519
520 static int path_handler(void *ctx, int fields, char **row)
521 {
522    Bvfs *fs = (Bvfs *) ctx;
523    return fs->_handle_path(ctx, fields, row);
524 }
525
526 int Bvfs::_handle_path(void *ctx, int fields, char **row)
527 {
528    if (bvfs_is_dir(row)) {
529       /* can have the same path 2 times */
530       if (strcmp(row[BVFS_Name], prev_dir)) {
531          pm_strcpy(prev_dir, row[BVFS_Name]);
532          return list_entries(user_data, fields, row);
533       }
534    }
535    return 0;
536 }
537
538 /* 
539  * Retrieve . and .. information
540  */
541 void Bvfs::ls_special_dirs()
542 {
543    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
544    char ed1[50], ed2[50];
545    if (*jobids == 0) {
546       return;
547    }
548    if (!dir_filenameid) {
549       get_dir_filenameid();
550    }
551
552    /* Will fetch directories  */
553    *prev_dir = 0;
554
555    POOL_MEM query;
556    Mmsg(query, 
557 "((SELECT PPathId AS PathId, '..' AS Path "
558     "FROM  PathHierarchy "
559    "WHERE  PathId = %s) "
560 "UNION "
561  "(SELECT %s AS PathId, '.' AS Path))",
562         edit_uint64(pwd_id, ed1), ed1);
563
564    POOL_MEM query2;
565    Mmsg(query2,// 1      2     3        4     5       6
566 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
567   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
568        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
569               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
570        "WHERE File1.FilenameId = %s "
571        "AND File1.JobId IN (%s)) AS listfile1 "
572   "ON (tmp.PathId = listfile1.PathId) "
573   "ORDER BY tmp.Path, JobId DESC ",
574         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
575
576    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
577    db_sql_query(db, query2.c_str(), path_handler, this);
578 }
579
580 /* Returns true if we have dirs to read */
581 bool Bvfs::ls_dirs()
582 {
583    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
584    char ed1[50], ed2[50];
585    if (*jobids == 0) {
586       return false;
587    }
588
589    POOL_MEM filter;
590    if (*pattern) {
591       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
592    }
593
594    if (!dir_filenameid) {
595       get_dir_filenameid();
596    }
597
598    /* the sql query displays same directory multiple time, take the first one */
599    *prev_dir = 0;
600
601    /* Let's retrieve the list of the visible dirs in this dir ...
602     * First, I need the empty filenameid to locate efficiently
603     * the dirs in the file table
604     * my $dir_filenameid = $self->get_dir_filenameid();
605     */
606    /* Then we get all the dir entries from File ... */
607    POOL_MEM query;
608    Mmsg(query,
609 //       0     1     2   3      4     5       6
610 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
611     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
612            "lower(Path1.Path) AS lpath, "
613            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
614            "listfile1.FileId AS FileId "
615     "FROM ( "
616       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
617       "FROM PathHierarchy AS PathHierarchy1 "
618       "JOIN Path AS Path2 "
619         "ON (PathHierarchy1.PathId = Path2.PathId) "
620       "JOIN PathVisibility AS PathVisibility1 "
621         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
622       "WHERE PathHierarchy1.PPathId = %s "
623       "AND PathVisibility1.jobid IN (%s) "
624            "%s "
625      ") AS listpath1 "
626    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
627
628    "LEFT JOIN ( " /* get attributes if any */
629        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
630               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
631        "WHERE File1.FilenameId = %s "
632        "AND File1.JobId IN (%s)) AS listfile1 "
633        "ON (listpath1.PathId = listfile1.PathId) "
634     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
635         edit_uint64(pwd_id, ed1),
636         jobids,
637         filter.c_str(),
638         edit_uint64(dir_filenameid, ed2),
639         jobids,
640         limit, offset);
641
642    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
643
644    db_lock(db);
645    db_sql_query(db, query.c_str(), path_handler, this);
646    nb_record = db->num_rows;
647    db_unlock(db);
648
649    return nb_record == limit;
650 }
651
652 /* Returns true if we have files to read */
653 bool Bvfs::ls_files()
654 {
655    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
656    char ed1[50];
657    if (*jobids == 0) {
658       return false;
659    }
660
661    if (!pwd_id) {
662       ch_dir(get_root());
663    }
664
665    POOL_MEM filter;
666    if (*pattern) {
667       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
668    }
669
670    POOL_MEM query;
671    Mmsg(query, //    1              2             3          4
672 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
673         "File.LStat, listfiles.id "
674 "FROM File, ( "
675        "SELECT Filename.Name as Name, max(File.FileId) as id "
676          "FROM File, Filename "
677         "WHERE File.FilenameId = Filename.FilenameId "
678           "AND Filename.Name != '' "
679           "AND File.PathId = %s "
680           "AND File.JobId IN (%s) "
681           "%s "
682         "GROUP BY Filename.Name "
683         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
684      ") AS listfiles "
685 "WHERE File.FileId = listfiles.id",
686         edit_uint64(pwd_id, ed1),
687         jobids,
688         filter.c_str(),
689         limit,
690         offset);
691    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
692
693    db_lock(db);
694    db_sql_query(db, query.c_str(), list_entries, user_data);
695    nb_record = db->num_rows;
696    db_unlock(db);
697
698    return nb_record == limit;
699 }