]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Tweak remove last reference to __SQL_C which is not needed anymore.
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #include "bacula.h"
30
31 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
32
33 #include "cats.h"
34 #include "bdb_priv.h"
35 #include "sql_glue.h"
36 #include "lib/htable.h"
37 #include "bvfs.h"
38
39 #define dbglevel 10
40 #define dbglevel_sql 15
41
42 static int result_handler(void *ctx, int fields, char **row)
43 {
44    if (fields == 4) {
45       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3]);
47    } else if (fields == 5) {
48       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4]);
50    } else if (fields == 6) {
51       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5]);
53    } else if (fields == 7) {
54       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
55             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
56    }
57    return 0;
58 }
59
60 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
61    jcr = j;
62    jcr->inc_use_count();
63    db = mdb;                 /* need to inc ref count */
64    jobids = get_pool_memory(PM_NAME);
65    prev_dir = get_pool_memory(PM_NAME);
66    pattern = get_pool_memory(PM_NAME);
67    *jobids = *prev_dir = *pattern = 0;
68    dir_filenameid = pwd_id = offset = 0;
69    see_copies = see_all_versions = false;
70    limit = 1000;
71    attr = new_attr(jcr);
72    list_entries = result_handler;
73    user_data = this;
74    username = NULL;
75 }
76
77 Bvfs::~Bvfs() {
78    free_pool_memory(jobids);
79    free_pool_memory(pattern);
80    free_pool_memory(prev_dir);
81    if (username) {
82       free(username);
83    }
84    free_attr(attr);
85    jcr->dec_use_count();
86 }
87
88 void Bvfs::filter_jobid()
89 {
90    if (!username) {
91       return;
92    }
93
94    /* Query used by Bweb to filter clients, activated when using
95     * set_username() 
96     */
97    POOL_MEM query;
98    Mmsg(query,
99       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
100         "JOIN (SELECT ClientId FROM client_group_member "
101         "JOIN client_group USING (client_group_id) "
102         "JOIN bweb_client_group_acl USING (client_group_id) "
103         "JOIN bweb_user USING (userid) "
104        "WHERE bweb_user.username = '%s' "
105       ") AS filter USING (ClientId) "
106         " WHERE JobId IN (%s)", 
107         username, jobids);
108
109    db_list_ctx ctx;
110    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
111    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
112    pm_strcpy(jobids, ctx.list);
113 }
114
115 void Bvfs::set_jobid(JobId_t id)
116 {
117    Mmsg(jobids, "%lld", (uint64_t)id);
118    filter_jobid();
119 }
120
121 void Bvfs::set_jobids(char *ids)
122 {
123    pm_strcpy(jobids, ids);
124    filter_jobid();
125 }
126
127 /* 
128  * TODO: Find a way to let the user choose how he wants to display
129  * files and directories
130  */
131
132
133 /* 
134  * Working Object to store PathId already seen (avoid
135  * database queries), equivalent to %cache_ppathid in perl
136  */
137
138 #define NITEMS 50000
139 class pathid_cache {
140 private:
141    hlink *nodes;
142    int nb_node;
143    int max_node;
144
145    alist *table_node;
146
147    htable *cache_ppathid;
148
149 public:
150    pathid_cache() {
151       hlink link;
152       cache_ppathid = (htable *)malloc(sizeof(htable));
153       cache_ppathid->init(&link, &link, NITEMS);
154       max_node = NITEMS;
155       nodes = (hlink *) malloc(max_node * sizeof (hlink));
156       nb_node = 0;
157       table_node = New(alist(5, owned_by_alist));
158       table_node->append(nodes);
159    }
160
161    hlink *get_hlink() {
162       if (++nb_node >= max_node) {
163          nb_node = 0;
164          nodes = (hlink *)malloc(max_node * sizeof(hlink));
165          table_node->append(nodes);
166       }
167       return nodes + nb_node;
168    }
169
170    bool lookup(char *pathid) {
171       bool ret = cache_ppathid->lookup(pathid) != NULL;
172       return ret;
173    }
174    
175    void insert(char *pathid) {
176       hlink *h = get_hlink();
177       cache_ppathid->insert(pathid, h);
178    }
179
180    ~pathid_cache() {
181       cache_ppathid->destroy();
182       free(cache_ppathid);
183       delete table_node;
184    }
185 private:
186    pathid_cache(const pathid_cache &); /* prohibit pass by value */
187    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
188 } ;
189
190 /* Return the parent_dir with the trailing /  (update the given string)
191  * TODO: see in the rest of bacula if we don't have already this function
192  * dir=/tmp/toto/
193  * dir=/tmp/
194  * dir=/
195  * dir=
196  */
197 char *bvfs_parent_dir(char *path)
198 {
199    char *p = path;
200    int len = strlen(path) - 1;
201
202    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
203       path[len] = '\0';
204    }
205
206    if (len > 0) {
207       p += len;
208       while (p > path && !IsPathSeparator(*p)) {
209          p--;
210       }
211       p[1] = '\0';
212    }
213    return path;
214 }
215
216 /* Return the basename of the with the trailing /
217  * TODO: see in the rest of bacula if we don't have
218  * this function already
219  */
220 char *bvfs_basename_dir(char *path)
221 {
222    char *p = path;
223    int len = strlen(path) - 1;
224
225    if (path[len] == '/') {      /* if directory, skip last / */
226       len -= 1;
227    }
228
229    if (len > 0) {
230       p += len;
231       while (p > path && !IsPathSeparator(*p)) {
232          p--;
233       }
234       if (*p == '/') {
235          p++;                  /* skip first / */
236       }
237    } 
238    return p;
239 }
240
241 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
242                                  pathid_cache &ppathid_cache, 
243                                  char *org_pathid, char *path)
244 {
245    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
246    char pathid[50];
247    ATTR_DBR parent;
248    char *bkp = mdb->path;
249    strncpy(pathid, org_pathid, sizeof(pathid));
250
251    /* Does the ppathid exist for this ? we use a memory cache...  In order to
252     * avoid the full loop, we consider that if a dir is allready in the
253     * PathHierarchy table, then there is no need to calculate all the
254     * hierarchy
255     */
256    while (path && *path)
257    {
258       if (!ppathid_cache.lookup(pathid))
259       {
260          Mmsg(mdb->cmd, 
261               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
262               pathid);
263
264          QUERY_DB(jcr, mdb, mdb->cmd);
265          /* Do we have a result ? */
266          if (sql_num_rows(mdb) > 0) {
267             ppathid_cache.insert(pathid);
268             /* This dir was in the db ...
269              * It means we can leave, the tree has allready been built for
270              * this dir
271              */
272             goto bail_out;
273          } else {
274             /* search or create parent PathId in Path table */
275             mdb->path = bvfs_parent_dir(path);
276             mdb->pnl = strlen(mdb->path);
277             if (!db_create_path_record(jcr, mdb, &parent)) {
278                goto bail_out;
279             }
280             ppathid_cache.insert(pathid);
281             
282             Mmsg(mdb->cmd,
283                  "INSERT INTO PathHierarchy (PathId, PPathId) "
284                  "VALUES (%s,%lld)",
285                  pathid, (uint64_t) parent.PathId);
286             
287             INSERT_DB(jcr, mdb, mdb->cmd);
288
289             edit_uint64(parent.PathId, pathid);
290             path = mdb->path;   /* already done */
291          }
292       } else {
293          /* It's already in the cache.  We can leave, no time to waste here,
294           * all the parent dirs have allready been done
295           */
296          goto bail_out;
297       }
298    }   
299
300 bail_out:
301    mdb->path = bkp;
302    mdb->fnl = 0;
303 }
304
305 /* 
306  * Internal function to update path_hierarchy cache with a shared pathid cache
307  */
308 static void update_path_hierarchy_cache(JCR *jcr,
309                                         B_DB *mdb,
310                                         pathid_cache &ppathid_cache,
311                                         JobId_t JobId)
312 {
313    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
314
315    uint32_t num;
316    char jobid[50];
317    edit_uint64(JobId, jobid);
318  
319    db_lock(mdb);
320    db_start_transaction(jcr, mdb);
321
322    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
323    
324    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
325       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
326       goto bail_out;
327    }
328
329    /* Inserting path records for JobId */
330    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
331                    "SELECT DISTINCT PathId, JobId "
332                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
333                            "UNION "
334                            "SELECT PathId, BaseFiles.JobId "
335                              "FROM BaseFiles JOIN File AS F USING (FileId) "
336                             "WHERE BaseFiles.JobId = %s) AS B",
337         jobid, jobid);
338    QUERY_DB(jcr, mdb, mdb->cmd);
339
340    /* Now we have to do the directory recursion stuff to determine missing
341     * visibility We try to avoid recursion, to be as fast as possible We also
342     * only work on not allready hierarchised directories...
343     */
344    Mmsg(mdb->cmd, 
345      "SELECT PathVisibility.PathId, Path "
346        "FROM PathVisibility "
347             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
348             "LEFT JOIN PathHierarchy "
349          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
350       "WHERE PathVisibility.JobId = %s "
351         "AND PathHierarchy.PathId IS NULL "
352       "ORDER BY Path", jobid);
353    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
354    QUERY_DB(jcr, mdb, mdb->cmd);
355
356    /* TODO: I need to reuse the DB connection without emptying the result 
357     * So, now i'm copying the result in memory to be able to query the
358     * catalog descriptor again.
359     */
360    num = sql_num_rows(mdb);
361    if (num > 0) {
362       char **result = (char **)malloc (num * 2 * sizeof(char *));
363       
364       SQL_ROW row;
365       int i=0;
366       while((row = sql_fetch_row(mdb))) {
367          result[i++] = bstrdup(row[0]);
368          result[i++] = bstrdup(row[1]);
369       }
370       
371       i=0;
372       while (num > 0) {
373          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
374          free(result[i++]);
375          free(result[i++]);
376          num--;
377       }
378       free(result);
379    }
380
381    Mmsg(mdb->cmd, 
382   "INSERT INTO PathVisibility (PathId, JobId)  "
383    "SELECT a.PathId,%s "
384    "FROM ( "
385      "SELECT DISTINCT h.PPathId AS PathId "
386        "FROM PathHierarchy AS h "
387        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
388       "WHERE p.JobId=%s) AS a LEFT JOIN "
389        "(SELECT PathId "
390           "FROM PathVisibility "
391          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
392    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
393
394    do {
395       QUERY_DB(jcr, mdb, mdb->cmd);
396    } while (sql_affected_rows(mdb) > 0);
397    
398    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
399    UPDATE_DB(jcr, mdb, mdb->cmd);
400
401 bail_out:
402    db_end_transaction(jcr, mdb);
403    db_unlock(mdb);
404 }
405
406 /* 
407  * Find an store the filename descriptor for empty directories Filename.Name=''
408  */
409 DBId_t Bvfs::get_dir_filenameid()
410 {
411    uint32_t id;
412    if (dir_filenameid) {
413       return dir_filenameid;
414    }
415    POOL_MEM q;
416    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
417    db_sql_query(db, q.c_str(), db_int_handler, &id);
418    dir_filenameid = id;
419    return dir_filenameid;
420 }
421
422 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
423 {
424    uint32_t nb=0;
425    db_list_ctx jobids_list;
426
427    db_lock(mdb);
428    db_start_transaction(jcr, mdb);
429
430 #ifdef xxx
431    /* TODO: Remove this code when updating make_bacula_table script */
432    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
433    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
434       Dmsg0(dbglevel, "Creating cache table\n");
435       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
436       QUERY_DB(jcr, mdb, mdb->cmd);
437
438       Mmsg(mdb->cmd,
439            "CREATE TABLE PathHierarchy ( "
440            "PathId integer NOT NULL, "
441            "PPathId integer NOT NULL, "
442            "CONSTRAINT pathhierarchy_pkey "
443            "PRIMARY KEY (PathId))");
444       QUERY_DB(jcr, mdb, mdb->cmd); 
445
446       Mmsg(mdb->cmd,
447            "CREATE INDEX pathhierarchy_ppathid "
448            "ON PathHierarchy (PPathId)");
449       QUERY_DB(jcr, mdb, mdb->cmd);
450
451       Mmsg(mdb->cmd, 
452            "CREATE TABLE PathVisibility ("
453            "PathId integer NOT NULL, "
454            "JobId integer NOT NULL, "
455            "Size int8 DEFAULT 0, "
456            "Files int4 DEFAULT 0, "
457            "CONSTRAINT pathvisibility_pkey "
458            "PRIMARY KEY (JobId, PathId))");
459       QUERY_DB(jcr, mdb, mdb->cmd);
460
461       Mmsg(mdb->cmd, 
462            "CREATE INDEX pathvisibility_jobid "
463            "ON PathVisibility (JobId)");
464       QUERY_DB(jcr, mdb, mdb->cmd);
465
466    }
467 #endif
468
469    Mmsg(mdb->cmd, 
470  "SELECT JobId from Job "
471   "WHERE HasCache = 0 "
472     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
473   "ORDER BY JobId");
474
475    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
476
477    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
478
479    db_end_transaction(jcr, mdb);
480    db_start_transaction(jcr, mdb);
481    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
482    Mmsg(mdb->cmd, 
483         "DELETE FROM PathVisibility "
484          "WHERE NOT EXISTS "
485         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
486    nb = DELETE_DB(jcr, mdb, mdb->cmd);
487    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
488
489    db_end_transaction(jcr, mdb);
490    db_unlock(mdb);
491 }
492
493 /*
494  * Update the bvfs cache for given jobids (1,2,3,4)
495  */
496 void
497 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
498 {
499    pathid_cache ppathid_cache;
500    JobId_t JobId;
501    char *p;
502
503    for (p=jobids; ; ) {
504       int stat = get_next_jobid_from_list(&p, &JobId);
505       if (stat < 0) {
506          return;
507       }
508       if (stat == 0) {
509          break;
510       }
511       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
512       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
513    }
514 }
515
516 /* 
517  * Update the bvfs cache for current jobids
518  */
519 void Bvfs::update_cache()
520 {
521    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
522 }
523
524 /* Change the current directory, returns true if the path exists */
525 bool Bvfs::ch_dir(const char *path)
526 {
527    pm_strcpy(db->path, path);
528    db->pnl = strlen(db->path);
529    ch_dir(db_get_path_record(jcr, db)); 
530    return pwd_id != 0;
531 }
532
533 /* 
534  * Get all file versions for a specified client
535  * TODO: Handle basejobs using different client
536  */
537 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
538 {
539    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
540          (uint64_t)fnid, client);
541    char ed1[50], ed2[50];
542    POOL_MEM q;
543    if (see_copies) {
544       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
545    } else {
546       Mmsg(q, " AND Job.Type = 'B' ");
547    }
548
549    POOL_MEM query;
550
551    Mmsg(query,//    1           2              3       
552 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
553 //         4          5           6
554         "File.JobId, File.LStat, File.FileId, "
555 //         7                    8
556        "Media.VolumeName, Media.InChanger "
557 "FROM File, Job, Client, JobMedia, Media "
558 "WHERE File.FilenameId = %s "
559   "AND File.PathId=%s "
560   "AND File.JobId = Job.JobId "
561   "AND Job.JobId = JobMedia.JobId "
562   "AND File.FileIndex >= JobMedia.FirstIndex "
563   "AND File.FileIndex <= JobMedia.LastIndex "
564   "AND JobMedia.MediaId = Media.MediaId "
565   "AND Job.ClientId = Client.ClientId "
566   "AND Client.Name = '%s' "
567   "%s ORDER BY FileId LIMIT %d OFFSET %d"
568         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
569         limit, offset);
570    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
571    db_sql_query(db, query.c_str(), list_entries, user_data);
572 }
573
574 DBId_t Bvfs::get_root()
575 {
576    *db->path = 0;
577    return db_get_path_record(jcr, db);
578 }
579
580 static int path_handler(void *ctx, int fields, char **row)
581 {
582    Bvfs *fs = (Bvfs *) ctx;
583    return fs->_handle_path(ctx, fields, row);
584 }
585
586 int Bvfs::_handle_path(void *ctx, int fields, char **row)
587 {
588    if (bvfs_is_dir(row)) {
589       /* can have the same path 2 times */
590       if (strcmp(row[BVFS_Name], prev_dir)) {
591          pm_strcpy(prev_dir, row[BVFS_Name]);
592          return list_entries(user_data, fields, row);
593       }
594    }
595    return 0;
596 }
597
598 /* 
599  * Retrieve . and .. information
600  */
601 void Bvfs::ls_special_dirs()
602 {
603    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
604    char ed1[50], ed2[50];
605    if (*jobids == 0) {
606       return;
607    }
608    if (!dir_filenameid) {
609       get_dir_filenameid();
610    }
611
612    /* Will fetch directories  */
613    *prev_dir = 0;
614
615    POOL_MEM query;
616    Mmsg(query, 
617 "(SELECT PPathId AS PathId, '..' AS Path "
618     "FROM  PathHierarchy "
619    "WHERE  PathId = %s "
620 "UNION "
621  "SELECT %s AS PathId, '.' AS Path)",
622         edit_uint64(pwd_id, ed1), ed1);
623
624    POOL_MEM query2;
625    Mmsg(query2,// 1      2     3        4     5       6
626 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
627   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
628        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
629               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
630        "WHERE File1.FilenameId = %s "
631        "AND File1.JobId IN (%s)) AS listfile1 "
632   "ON (tmp.PathId = listfile1.PathId) "
633   "ORDER BY tmp.Path, JobId DESC ",
634         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
635
636    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
637    db_sql_query(db, query2.c_str(), path_handler, this);
638 }
639
640 /* Returns true if we have dirs to read */
641 bool Bvfs::ls_dirs()
642 {
643    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
644    char ed1[50], ed2[50];
645    if (*jobids == 0) {
646       return false;
647    }
648
649    POOL_MEM query;
650    POOL_MEM filter;
651    if (*pattern) {
652       int len = strlen(pattern);
653       query.check_size(len*2+1);
654       db_escape_string(jcr, db, query.c_str(), pattern, len);
655       Mmsg(filter, " AND Path2.Path %s '%s' ", match_query[db_get_type_index(db)], query.c_str());
656    }
657
658    if (!dir_filenameid) {
659       get_dir_filenameid();
660    }
661
662    /* the sql query displays same directory multiple time, take the first one */
663    *prev_dir = 0;
664
665    /* Let's retrieve the list of the visible dirs in this dir ...
666     * First, I need the empty filenameid to locate efficiently
667     * the dirs in the file table
668     * my $dir_filenameid = $self->get_dir_filenameid();
669     */
670    /* Then we get all the dir entries from File ... */
671    Mmsg(query,
672 //       0     1     2   3      4     5       6
673 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
674     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
675            "lower(Path1.Path) AS lpath, "
676            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
677            "listfile1.FileId AS FileId "
678     "FROM ( "
679       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
680       "FROM PathHierarchy AS PathHierarchy1 "
681       "JOIN Path AS Path2 "
682         "ON (PathHierarchy1.PathId = Path2.PathId) "
683       "JOIN PathVisibility AS PathVisibility1 "
684         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
685       "WHERE PathHierarchy1.PPathId = %s "
686       "AND PathVisibility1.JobId IN (%s) "
687            "%s "
688      ") AS listpath1 "
689    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
690
691    "LEFT JOIN ( " /* get attributes if any */
692        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
693               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
694        "WHERE File1.FilenameId = %s "
695        "AND File1.JobId IN (%s)) AS listfile1 "
696        "ON (listpath1.PathId = listfile1.PathId) "
697     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
698         edit_uint64(pwd_id, ed1),
699         jobids,
700         filter.c_str(),
701         edit_uint64(dir_filenameid, ed2),
702         jobids,
703         limit, offset);
704
705    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
706
707    db_lock(db);
708    db_sql_query(db, query.c_str(), path_handler, this);
709    nb_record = sql_num_rows(db);
710    db_unlock(db);
711
712    return nb_record == limit;
713 }
714
715 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
716                           const char *JobId, const char *PathId,  
717                           const char *filter, int64_t limit, int64_t offset)
718 {
719    if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
720       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
721            JobId, PathId, JobId, PathId, 
722            filter, limit, offset);
723    } else {
724       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
725            JobId, PathId, JobId, PathId, 
726            limit, offset, filter, JobId, JobId);
727    }
728 }
729
730 /* Returns true if we have files to read */
731 bool Bvfs::ls_files()
732 {
733    POOL_MEM query;
734    POOL_MEM filter;
735    char pathid[50];
736
737    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
738    if (*jobids == 0) {
739       return false;
740    }
741
742    if (!pwd_id) {
743       ch_dir(get_root());
744    }
745
746    edit_uint64(pwd_id, pathid);
747    if (*pattern) {
748       int len = strlen(pattern);
749       query.check_size(len*2+1);
750       db_escape_string(jcr, db, query.c_str(), pattern, len);
751       Mmsg(filter, " AND Filename.Name %s '%s' ", match_query[db_get_type_index(db)], query.c_str());
752    }
753
754    build_ls_files_query(db, query, 
755                         jobids, pathid, filter.c_str(),
756                         limit, offset);
757
758    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
759
760    db_lock(db);
761    db_sql_query(db, query.c_str(), list_entries, user_data);
762    nb_record = sql_num_rows(db);
763    db_unlock(db);
764
765    return nb_record == limit;
766 }
767
768
769 /* 
770  * Return next Id from comma separated list   
771  *
772  * Returns:
773  *   1 if next Id returned
774  *   0 if no more Ids are in list
775  *  -1 there is an error
776  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
777  */
778 static int get_next_id_from_list(char **p, int64_t *Id)
779 {
780    const int maxlen = 30;
781    char id[maxlen+1];
782    char *q = *p;
783
784    id[0] = 0;
785    for (int i=0; i<maxlen; i++) {
786       if (*q == 0) {
787          break;
788       } else if (*q == ',') {
789          q++;
790          break;
791       }
792       id[i] = *q++;
793       id[i+1] = 0;
794    }
795    if (id[0] == 0) {
796       return 0;
797    } else if (!is_a_number(id)) {
798       return -1;                      /* error */
799    }
800    *p = q;
801    *Id = str_to_int64(id);
802    return 1;
803 }
804
805 static int get_path_handler(void *ctx, int fields, char **row)
806 {
807    POOL_MEM *buf = (POOL_MEM *) ctx;
808    pm_strcpy(*buf, row[0]);
809    return 0;
810 }
811
812 static bool check_temp(char *output_table)
813 {
814    if (output_table[0] == 'b' &&
815        output_table[1] == '2' &&
816        is_an_integer(output_table + 2))
817    {
818       return true;
819    }
820    return false;
821 }
822
823 bool Bvfs::drop_restore_list(char *output_table)
824 {
825    POOL_MEM query;
826    if (check_temp(output_table)) {
827       Mmsg(query, "DROP TABLE %s", output_table);
828       db_sql_query(db, query.c_str(), NULL, NULL);
829       return true;
830    }
831    return false;
832 }
833
834 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
835                                 char *output_table)
836 {
837    POOL_MEM query;
838    POOL_MEM tmp, tmp2;
839    int64_t id, jobid;
840    bool init=false;
841    bool ret=false;
842    /* check args */
843    if ((*fileid   && !is_a_number_list(fileid))  ||
844        (*dirid    && !is_a_number_list(dirid))   ||
845        (*hardlink && !is_a_number_list(hardlink))||
846        (!*hardlink && !*fileid && !*dirid && !*hardlink))
847    {
848       return false;
849    }
850    if (!check_temp(output_table)) {
851       return false;
852    }
853
854    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
855
856    if (*fileid) {               /* Select files with their direct id */
857       init=true;
858       Mmsg(tmp,"SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
859                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
860            fileid);
861       pm_strcat(query, tmp.c_str());
862    }
863
864    /* Add a directory content */
865    while (get_next_id_from_list(&dirid, &id) == 1) {
866       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
867       
868       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
869          Dmsg0(dbglevel, "Can't search for path\n");
870          /* print error */
871          return false;
872       }
873       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
874          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
875                id, tmp.c_str(), tmp2.c_str());
876          break;
877       }
878       /* escape % and _ for LIKE search */
879       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
880       char *p = tmp.c_str();
881       for (char *s = tmp2.c_str(); *s ; s++) {
882          if (*s == '%' || *s == '_' || *s == '\\') {
883             *p = '\\'; 
884             p++;
885          }
886          *p = *s; 
887          p++;
888       }
889       *p = '\0';
890       tmp.strcat("%");
891
892       size_t len = strlen(tmp.c_str());
893       tmp2.check_size((len+1) * 2);
894       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
895
896       if (init) {
897          query.strcat(" UNION ");
898       }
899
900       Mmsg(tmp, "SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
901                         "File.PathId, FileId "
902                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
903                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ", 
904            tmp2.c_str(), jobids); 
905       query.strcat(tmp.c_str());
906       init = true;
907
908       query.strcat(" UNION ");
909
910       /* A directory can have files from a BaseJob */
911       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
912                         "File.FilenameId, File.PathId, BaseFiles.FileId "
913                    "FROM BaseFiles "
914                         "JOIN File USING (FileId) "
915                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
916                         "JOIN Path USING (PathId) "
917                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ", 
918            tmp2.c_str(), jobids); 
919       query.strcat(tmp.c_str());
920    }
921
922    /* expect jobid,fileindex */
923    int64_t prev_jobid=0;
924    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
925       if (get_next_id_from_list(&hardlink, &id) != 1) {
926          Dmsg0(dbglevel, "hardlink should be two by two\n");
927          return false;
928       }
929       if (jobid != prev_jobid) { /* new job */
930          if (prev_jobid == 0) {  /* first jobid */
931             if (init) {
932                query.strcat(" UNION ");
933             }
934          } else {               /* end last job, start new one */
935             tmp.strcat(") UNION ");
936             query.strcat(tmp.c_str());
937          }
938          Mmsg(tmp,   "SELECT JobId, JobTDate, FileIndex, FilenameId, "
939                             "PathId, FileId "
940                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
941                         "AND FileIndex IN (%lld", jobid, id);
942          prev_jobid = jobid;
943
944       } else {                  /* same job, add new findex */
945          Mmsg(tmp2, ", %lld", id);
946          tmp.strcat(tmp2.c_str());
947       }
948    }
949
950    if (prev_jobid != 0) {       /* end last job */
951       tmp.strcat(") ");
952       query.strcat(tmp.c_str());
953       init = true;
954    }
955
956    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
957
958    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
959       Dmsg0(dbglevel, "Can't execute q\n");
960       goto bail_out;
961    }
962
963    /* TODO: handle basejob and SQLite3 */
964    Mmsg(query, sql_bvfs_select[db_get_type_index(db)], output_table, output_table);
965
966    /* TODO: handle jobid filter */
967    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
968    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
969       Dmsg0(dbglevel, "Can't execute q\n");
970       goto bail_out;
971    }
972
973    /* MySQL need it */
974    if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
975       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
976            output_table, output_table);
977    }
978
979    ret = true;
980
981 bail_out:
982    Mmsg(query, "DROP TABLE btemp%s", output_table);
983    db_sql_query(db, query.c_str(), NULL, NULL);
984    return ret;
985 }
986
987 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */