]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Tweak mutex order for SD
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_version = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98    htable *cache_ppathid;
99
100 public:
101    pathid_cache() {
102       hlink link;
103       cache_ppathid = (htable *)malloc(sizeof(htable));
104       cache_ppathid->init(&link, &link, NITEMS);
105       max_node = NITEMS;
106       nodes = (hlink *) malloc(max_node * sizeof (hlink));
107       nb_node = 0;
108    }
109
110    hlink *get_hlink() {
111       if (nb_node >= max_node) {
112          max_node *= 2;
113          nodes = (hlink *)brealloc(nodes, sizeof(hlink) * max_node);
114       }
115       return nodes + nb_node++;
116    }
117
118    bool lookup(char *pathid) {
119       bool ret = cache_ppathid->lookup(pathid) != NULL;
120       return ret;
121    }
122    
123    void insert(char *pathid) {
124       hlink *h = get_hlink();
125       cache_ppathid->insert(pathid, h);
126    }
127
128    ~pathid_cache() {
129       cache_ppathid->destroy();
130       free(cache_ppathid);
131       free(nodes);
132    }
133 private:
134    pathid_cache(const pathid_cache &); /* prohibit pass by value */
135    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
136 } ;
137
138 /* Return the parent_dir with the trailing /  (update the given string)
139  * TODO: see in the rest of bacula if we don't have already this function
140  * dir=/tmp/toto/
141  * dir=/tmp/
142  * dir=/
143  * dir=
144  */
145 char *bvfs_parent_dir(char *path)
146 {
147    char *p = path;
148    int len = strlen(path) - 1;
149
150    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
151       path[len] = '\0';
152    }
153
154    if (len > 0) {
155       p += len;
156       while (p > path && !IsPathSeparator(*p)) {
157          p--;
158       }
159       p[1] = '\0';
160    }
161    return path;
162 }
163
164 /* Return the basename of the with the trailing /
165  * TODO: see in the rest of bacula if we don't have
166  * this function already
167  */
168 char *bvfs_basename_dir(char *path)
169 {
170    char *p = path;
171    int len = strlen(path) - 1;
172
173    if (path[len] == '/') {      /* if directory, skip last / */
174       len -= 1;
175    }
176
177    if (len > 0) {
178       p += len;
179       while (p > path && !IsPathSeparator(*p)) {
180          p--;
181       }
182       if (*p == '/') {
183          p++;                  /* skip first / */
184       }
185    } 
186    return p;
187 }
188
189 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
190                                  pathid_cache &ppathid_cache, 
191                                  char *org_pathid, char *path)
192 {
193    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
194    char pathid[50];
195    ATTR_DBR parent;
196    char *bkp = mdb->path;
197    strncpy(pathid, org_pathid, sizeof(pathid));
198
199    /* Does the ppathid exist for this ? we use a memory cache...  In order to
200     * avoid the full loop, we consider that if a dir is allready in the
201     * PathHierarchy table, then there is no need to calculate all the
202     * hierarchy
203     */
204    while (path && *path)
205    {
206       if (!ppathid_cache.lookup(pathid))
207       {
208          Mmsg(mdb->cmd, 
209               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
210               pathid);
211
212          QUERY_DB(jcr, mdb, mdb->cmd);
213          /* Do we have a result ? */
214          if (sql_num_rows(mdb) > 0) {
215             ppathid_cache.insert(pathid);
216             /* This dir was in the db ...
217              * It means we can leave, the tree has allready been built for
218              * this dir
219              */
220             goto bail_out;
221          } else {
222             /* search or create parent PathId in Path table */
223             mdb->path = bvfs_parent_dir(path);
224             mdb->pnl = strlen(mdb->path);
225             if (!db_create_path_record(jcr, mdb, &parent)) {
226                goto bail_out;
227             }
228             ppathid_cache.insert(pathid);
229             
230             Mmsg(mdb->cmd,
231                  "INSERT INTO PathHierarchy (PathId, PPathId) "
232                  "VALUES (%s,%lld)",
233                  pathid, (uint64_t) parent.PathId);
234             
235             INSERT_DB(jcr, mdb, mdb->cmd);
236
237             edit_uint64(parent.PathId, pathid);
238             path = mdb->path;   /* already done */
239          }
240       } else {
241          /* It's already in the cache.  We can leave, no time to waste here,
242           * all the parent dirs have allready been done
243           */
244          goto bail_out;
245       }
246    }   
247
248 bail_out:
249    mdb->path = bkp;
250    mdb->fnl = 0;
251 }
252
253 /* 
254  * Internal function to update path_hierarchy cache with a shared pathid cache
255  */
256 static void update_path_hierarchy_cache(JCR *jcr,
257                                         B_DB *mdb,
258                                         pathid_cache &ppathid_cache,
259                                         JobId_t JobId)
260 {
261    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
262
263    uint32_t num;
264    char jobid[50];
265    edit_uint64(JobId, jobid);
266  
267    db_lock(mdb);
268    db_start_transaction(jcr, mdb);
269
270    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
271    
272    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
273       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
274       goto bail_out;
275    }
276
277    /* Inserting path records for JobId */
278    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
279                   "SELECT DISTINCT PathId, JobId FROM File WHERE JobId = %s",
280         jobid);
281    QUERY_DB(jcr, mdb, mdb->cmd);
282
283
284    /* Now we have to do the directory recursion stuff to determine missing
285     * visibility We try to avoid recursion, to be as fast as possible We also
286     * only work on not allready hierarchised directories...
287     */
288    Mmsg(mdb->cmd, 
289      "SELECT PathVisibility.PathId, Path "
290        "FROM PathVisibility "
291             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
292             "LEFT JOIN PathHierarchy "
293          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
294       "WHERE PathVisibility.JobId = %s "
295         "AND PathHierarchy.PathId IS NULL "
296       "ORDER BY Path", jobid);
297    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
298    QUERY_DB(jcr, mdb, mdb->cmd);
299
300    /* TODO: I need to reuse the DB connection without emptying the result 
301     * So, now i'm copying the result in memory to be able to query the
302     * catalog descriptor again.
303     */
304    num = sql_num_rows(mdb);
305    if (num > 0) {
306       char **result = (char **)malloc (num * 2 * sizeof(char *));
307       
308       SQL_ROW row;
309       int i=0;
310       while((row = sql_fetch_row(mdb))) {
311          result[i++] = bstrdup(row[0]);
312          result[i++] = bstrdup(row[1]);
313       }
314       
315       i=0;
316       while (num > 0) {
317          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
318          free(result[i++]);
319          free(result[i++]);
320          num--;
321       }
322       free(result);
323    }
324    
325    Mmsg(mdb->cmd, 
326   "INSERT INTO PathVisibility (PathId, JobId)  "
327    "SELECT a.PathId,%s "
328    "FROM ( "
329      "SELECT DISTINCT h.PPathId AS PathId "
330        "FROM PathHierarchy AS h "
331        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
332       "WHERE p.JobId=%s) AS a LEFT JOIN "
333        "(SELECT PathId "
334           "FROM PathVisibility "
335          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
336    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
337
338    do {
339       QUERY_DB(jcr, mdb, mdb->cmd);
340    } while (sql_affected_rows(mdb) > 0);
341    
342    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
343    UPDATE_DB(jcr, mdb, mdb->cmd);
344
345 bail_out:
346    db_end_transaction(jcr, mdb);
347    db_unlock(mdb);
348 }
349
350 /* 
351  * Find an store the filename descriptor for empty directories Filename.Name=''
352  */
353 DBId_t Bvfs::get_dir_filenameid()
354 {
355    uint32_t id;
356    if (dir_filenameid) {
357       return dir_filenameid;
358    }
359    POOL_MEM q;
360    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
361    db_sql_query(db, q.c_str(), db_int_handler, &id);
362    dir_filenameid = id;
363    return dir_filenameid;
364 }
365
366 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
367 {
368    uint32_t nb=0;
369    db_list_ctx jobids_list;
370
371    db_lock(mdb);
372    db_start_transaction(jcr, mdb);
373
374    /* TODO: Remove this code when updating make_bacula_table script */
375    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
376    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
377       Dmsg0(dbglevel, "Creating cache table\n");
378       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
379       QUERY_DB(jcr, mdb, mdb->cmd);
380
381       Mmsg(mdb->cmd,
382            "CREATE TABLE PathHierarchy ( "
383            "PathId integer NOT NULL, "
384            "PPathId integer NOT NULL, "
385            "CONSTRAINT pathhierarchy_pkey "
386            "PRIMARY KEY (PathId))");
387       QUERY_DB(jcr, mdb, mdb->cmd); 
388
389       Mmsg(mdb->cmd,
390            "CREATE INDEX pathhierarchy_ppathid "
391            "ON PathHierarchy (PPathId)");
392       QUERY_DB(jcr, mdb, mdb->cmd);
393
394       Mmsg(mdb->cmd, 
395            "CREATE TABLE PathVisibility ("
396            "PathId integer NOT NULL, "
397            "JobId integer NOT NULL, "
398            "Size int8 DEFAULT 0, "
399            "Files int4 DEFAULT 0, "
400            "CONSTRAINT pathvisibility_pkey "
401            "PRIMARY KEY (JobId, PathId))");
402       QUERY_DB(jcr, mdb, mdb->cmd);
403
404       Mmsg(mdb->cmd, 
405            "CREATE INDEX pathvisibility_jobid "
406            "ON PathVisibility (JobId)");
407       QUERY_DB(jcr, mdb, mdb->cmd);
408
409    }
410
411    Mmsg(mdb->cmd, 
412  "SELECT JobId from Job "
413   "WHERE HashCache = 0 "
414     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
415   "ORDER BY JobId");
416
417    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
418
419    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
420
421    db_end_transaction(jcr, mdb);
422    db_start_transaction(jcr, mdb);
423    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
424    Mmsg(mdb->cmd, 
425         "DELETE FROM PathVisibility "
426          "WHERE NOT EXISTS "
427         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
428    nb = DELETE_DB(jcr, mdb, mdb->cmd);
429    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
430
431    db_end_transaction(jcr, mdb);
432 }
433
434 /*
435  * Update the bvfs cache for given jobids (1,2,3,4)
436  */
437 void
438 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
439 {
440    pathid_cache ppathid_cache;
441    JobId_t JobId;
442    char *p;
443
444    for (p=jobids; ; ) {
445       int stat = get_next_jobid_from_list(&p, &JobId);
446       if (stat < 0) {
447          return;
448       }
449       if (stat == 0) {
450          break;
451       }
452       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
453       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
454    }
455 }
456
457 /* 
458  * Update the bvfs cache for current jobids
459  */
460 void Bvfs::update_cache()
461 {
462    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
463 }
464
465 /* Change the current directory, returns true if the path exists */
466 bool Bvfs::ch_dir(const char *path)
467 {
468    pm_strcpy(db->path, path);
469    db->pnl = strlen(db->path);
470    ch_dir(db_get_path_record(jcr, db)); 
471    return pwd_id != 0;
472 }
473
474 /* 
475  * Get all file versions for a specified client
476  */
477 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
478 {
479    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
480          (uint64_t)fnid, client);
481    char ed1[50], ed2[50];
482    POOL_MEM q;
483    if (see_copies) {
484       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
485    } else {
486       Mmsg(q, " AND Job.Type = 'B' ");
487    }
488
489    POOL_MEM query;
490
491    Mmsg(query,//    1           2          3       4
492 "SELECT 'V', File.FileId, File.Md5, File.JobId, File.LStat, "
493 //         5                6
494        "Media.VolumeName, Media.InChanger "
495 "FROM File, Job, Client, JobMedia, Media "
496 "WHERE File.FilenameId = %s "
497   "AND File.PathId=%s "
498   "AND File.JobId = Job.JobId "
499   "AND Job.ClientId = Client.ClientId "
500   "AND Job.JobId = JobMedia.JobId "
501   "AND File.FileIndex >= JobMedia.FirstIndex "
502   "AND File.FileIndex <= JobMedia.LastIndex "
503   "AND JobMedia.MediaId = Media.MediaId "
504   "AND Client.Name = '%s' "
505   "%s ORDER BY FileId LIMIT %d OFFSET %d"
506         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
507         limit, offset);
508    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
509    db_sql_query(db, query.c_str(), list_entries, user_data);
510 }
511
512 DBId_t Bvfs::get_root()
513 {
514    *db->path = 0;
515    return db_get_path_record(jcr, db);
516 }
517
518 static int path_handler(void *ctx, int fields, char **row)
519 {
520    Bvfs *fs = (Bvfs *) ctx;
521    return fs->_handle_path(ctx, fields, row);
522 }
523
524 int Bvfs::_handle_path(void *ctx, int fields, char **row)
525 {
526    if (bvfs_is_dir(row)) {
527       /* can have the same path 2 times */
528       if (strcmp(row[BVFS_Name], prev_dir)) {
529          pm_strcpy(prev_dir, row[BVFS_Name]);
530          return list_entries(user_data, fields, row);
531       }
532    }
533    return 0;
534 }
535
536 /* 
537  * Retrieve . and .. information
538  */
539 void Bvfs::ls_special_dirs()
540 {
541    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
542    char ed1[50], ed2[50];
543    if (*jobids == 0) {
544       return;
545    }
546    if (!dir_filenameid) {
547       get_dir_filenameid();
548    }
549
550    /* Will fetch directories  */
551    *prev_dir = 0;
552
553    POOL_MEM query;
554    Mmsg(query, 
555 "((SELECT PPathId AS PathId, '..' AS Path "
556     "FROM  PathHierarchy "
557    "WHERE  PathId = %s) "
558 "UNION "
559  "(SELECT %s AS PathId, '.' AS Path))",
560         edit_uint64(pwd_id, ed1), ed1);
561
562    POOL_MEM query2;
563    Mmsg(query2,// 1      2     3        4     5       6
564 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
565   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
566        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
567               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
568        "WHERE File1.FilenameId = %s "
569        "AND File1.JobId IN (%s)) AS listfile1 "
570   "ON (tmp.PathId = listfile1.PathId) "
571   "ORDER BY tmp.Path, JobId DESC ",
572         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
573
574    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
575    db_sql_query(db, query2.c_str(), path_handler, this);
576 }
577
578 /* Returns true if we have dirs to read */
579 bool Bvfs::ls_dirs()
580 {
581    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
582    char ed1[50], ed2[50];
583    if (*jobids == 0) {
584       return false;
585    }
586
587    POOL_MEM filter;
588    if (*pattern) {
589       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
590    }
591
592    if (!dir_filenameid) {
593       get_dir_filenameid();
594    }
595
596    /* the sql query displays same directory multiple time, take the first one */
597    *prev_dir = 0;
598
599    /* Let's retrieve the list of the visible dirs in this dir ...
600     * First, I need the empty filenameid to locate efficiently
601     * the dirs in the file table
602     * my $dir_filenameid = $self->get_dir_filenameid();
603     */
604    /* Then we get all the dir entries from File ... */
605    POOL_MEM query;
606    Mmsg(query,
607 //       0     1     2   3      4     5       6
608 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
609     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
610            "lower(Path1.Path) AS lpath, "
611            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
612            "listfile1.FileId AS FileId "
613     "FROM ( "
614       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
615       "FROM PathHierarchy AS PathHierarchy1 "
616       "JOIN Path AS Path2 "
617         "ON (PathHierarchy1.PathId = Path2.PathId) "
618       "JOIN PathVisibility AS PathVisibility1 "
619         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
620       "WHERE PathHierarchy1.PPathId = %s "
621       "AND PathVisibility1.jobid IN (%s) "
622            "%s "
623      ") AS listpath1 "
624    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
625
626    "LEFT JOIN ( " /* get attributes if any */
627        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
628               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
629        "WHERE File1.FilenameId = %s "
630        "AND File1.JobId IN (%s)) AS listfile1 "
631        "ON (listpath1.PathId = listfile1.PathId) "
632     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
633         edit_uint64(pwd_id, ed1),
634         jobids,
635         filter.c_str(),
636         edit_uint64(dir_filenameid, ed2),
637         jobids,
638         limit, offset);
639
640    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
641
642    db_lock(db);
643    db_sql_query(db, query.c_str(), path_handler, this);
644    nb_record = db->num_rows;
645    db_unlock(db);
646
647    return nb_record == limit;
648 }
649
650 /* Returns true if we have files to read */
651 bool Bvfs::ls_files()
652 {
653    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
654    char ed1[50];
655    if (*jobids == 0) {
656       return false;
657    }
658
659    if (!pwd_id) {
660       ch_dir(get_root());
661    }
662
663    POOL_MEM filter;
664    if (*pattern) {
665       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
666    }
667
668    POOL_MEM query;
669    Mmsg(query, //    1              2             3          4
670 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
671         "File.LStat, listfiles.id "
672 "FROM File, ( "
673        "SELECT Filename.Name as Name, max(File.FileId) as id "
674          "FROM File, Filename "
675         "WHERE File.FilenameId = Filename.FilenameId "
676           "AND Filename.Name != '' "
677           "AND File.PathId = %s "
678           "AND File.JobId IN (%s) "
679           "%s "
680         "GROUP BY Filename.Name "
681         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
682      ") AS listfiles "
683 "WHERE File.FileId = listfiles.id",
684         edit_uint64(pwd_id, ed1),
685         jobids,
686         filter.c_str(),
687         limit,
688         offset);
689    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
690
691    db_lock(db);
692    db_sql_query(db, query.c_str(), list_entries, user_data);
693    nb_record = db->num_rows;
694    db_unlock(db);
695
696    return nb_record == limit;
697 }