]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
bvfs: Tweak BaseJob code, should be ok now
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_versions = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71    username = NULL;
72 }
73
74 Bvfs::~Bvfs() {
75    free_pool_memory(jobids);
76    free_pool_memory(pattern);
77    free_pool_memory(prev_dir);
78    if (username) {
79       free(username);
80    }
81    free_attr(attr);
82    jcr->dec_use_count();
83 }
84
85 void Bvfs::filter_jobid()
86 {
87    if (!username) {
88       return;
89    }
90
91    /* Query used by Bweb to filter clients, activated when using
92     * set_username() 
93     */
94    POOL_MEM query;
95    Mmsg(query,
96       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
97         "JOIN (SELECT ClientId FROM client_group_member "
98         "JOIN client_group USING (client_group_id) "
99         "JOIN bweb_client_group_acl USING (client_group_id) "
100         "JOIN bweb_user USING (userid) "
101        "WHERE bweb_user.username = '%s' "
102       ") AS filter USING (ClientId) "
103         " WHERE JobId IN (%s)", 
104         username, jobids);
105
106    db_list_ctx ctx;
107    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
108    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
109    pm_strcpy(jobids, ctx.list);
110 }
111
112 void Bvfs::set_jobid(JobId_t id)
113 {
114    Mmsg(jobids, "%lld", (uint64_t)id);
115    filter_jobid();
116 }
117
118 void Bvfs::set_jobids(char *ids)
119 {
120    pm_strcpy(jobids, ids);
121    filter_jobid();
122 }
123
124 /* 
125  * TODO: Find a way to let the user choose how he wants to display
126  * files and directories
127  */
128
129
130 /* 
131  * Working Object to store PathId already seen (avoid
132  * database queries), equivalent to %cache_ppathid in perl
133  */
134
135 #define NITEMS 50000
136 class pathid_cache {
137 private:
138    hlink *nodes;
139    int nb_node;
140    int max_node;
141
142    alist *table_node;
143
144    htable *cache_ppathid;
145
146 public:
147    pathid_cache() {
148       hlink link;
149       cache_ppathid = (htable *)malloc(sizeof(htable));
150       cache_ppathid->init(&link, &link, NITEMS);
151       max_node = NITEMS;
152       nodes = (hlink *) malloc(max_node * sizeof (hlink));
153       nb_node = 0;
154       table_node = New(alist(5, owned_by_alist));
155       table_node->append(nodes);
156    }
157
158    hlink *get_hlink() {
159       if (++nb_node >= max_node) {
160          nb_node = 0;
161          nodes = (hlink *)malloc(max_node * sizeof(hlink));
162          table_node->append(nodes);
163       }
164       return nodes + nb_node;
165    }
166
167    bool lookup(char *pathid) {
168       bool ret = cache_ppathid->lookup(pathid) != NULL;
169       return ret;
170    }
171    
172    void insert(char *pathid) {
173       hlink *h = get_hlink();
174       cache_ppathid->insert(pathid, h);
175    }
176
177    ~pathid_cache() {
178       cache_ppathid->destroy();
179       free(cache_ppathid);
180       delete table_node;
181    }
182 private:
183    pathid_cache(const pathid_cache &); /* prohibit pass by value */
184    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
185 } ;
186
187 /* Return the parent_dir with the trailing /  (update the given string)
188  * TODO: see in the rest of bacula if we don't have already this function
189  * dir=/tmp/toto/
190  * dir=/tmp/
191  * dir=/
192  * dir=
193  */
194 char *bvfs_parent_dir(char *path)
195 {
196    char *p = path;
197    int len = strlen(path) - 1;
198
199    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
200       path[len] = '\0';
201    }
202
203    if (len > 0) {
204       p += len;
205       while (p > path && !IsPathSeparator(*p)) {
206          p--;
207       }
208       p[1] = '\0';
209    }
210    return path;
211 }
212
213 /* Return the basename of the with the trailing /
214  * TODO: see in the rest of bacula if we don't have
215  * this function already
216  */
217 char *bvfs_basename_dir(char *path)
218 {
219    char *p = path;
220    int len = strlen(path) - 1;
221
222    if (path[len] == '/') {      /* if directory, skip last / */
223       len -= 1;
224    }
225
226    if (len > 0) {
227       p += len;
228       while (p > path && !IsPathSeparator(*p)) {
229          p--;
230       }
231       if (*p == '/') {
232          p++;                  /* skip first / */
233       }
234    } 
235    return p;
236 }
237
238 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
239                                  pathid_cache &ppathid_cache, 
240                                  char *org_pathid, char *path)
241 {
242    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
243    char pathid[50];
244    ATTR_DBR parent;
245    char *bkp = mdb->path;
246    strncpy(pathid, org_pathid, sizeof(pathid));
247
248    /* Does the ppathid exist for this ? we use a memory cache...  In order to
249     * avoid the full loop, we consider that if a dir is allready in the
250     * PathHierarchy table, then there is no need to calculate all the
251     * hierarchy
252     */
253    while (path && *path)
254    {
255       if (!ppathid_cache.lookup(pathid))
256       {
257          Mmsg(mdb->cmd, 
258               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
259               pathid);
260
261          QUERY_DB(jcr, mdb, mdb->cmd);
262          /* Do we have a result ? */
263          if (sql_num_rows(mdb) > 0) {
264             ppathid_cache.insert(pathid);
265             /* This dir was in the db ...
266              * It means we can leave, the tree has allready been built for
267              * this dir
268              */
269             goto bail_out;
270          } else {
271             /* search or create parent PathId in Path table */
272             mdb->path = bvfs_parent_dir(path);
273             mdb->pnl = strlen(mdb->path);
274             if (!db_create_path_record(jcr, mdb, &parent)) {
275                goto bail_out;
276             }
277             ppathid_cache.insert(pathid);
278             
279             Mmsg(mdb->cmd,
280                  "INSERT INTO PathHierarchy (PathId, PPathId) "
281                  "VALUES (%s,%lld)",
282                  pathid, (uint64_t) parent.PathId);
283             
284             INSERT_DB(jcr, mdb, mdb->cmd);
285
286             edit_uint64(parent.PathId, pathid);
287             path = mdb->path;   /* already done */
288          }
289       } else {
290          /* It's already in the cache.  We can leave, no time to waste here,
291           * all the parent dirs have allready been done
292           */
293          goto bail_out;
294       }
295    }   
296
297 bail_out:
298    mdb->path = bkp;
299    mdb->fnl = 0;
300 }
301
302 /* 
303  * Internal function to update path_hierarchy cache with a shared pathid cache
304  */
305 static void update_path_hierarchy_cache(JCR *jcr,
306                                         B_DB *mdb,
307                                         pathid_cache &ppathid_cache,
308                                         JobId_t JobId)
309 {
310    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
311
312    uint32_t num;
313    char jobid[50];
314    edit_uint64(JobId, jobid);
315  
316    db_lock(mdb);
317    db_start_transaction(jcr, mdb);
318
319    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
320    
321    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
322       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
323       goto bail_out;
324    }
325
326    /* Inserting path records for JobId */
327    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
328                    "SELECT DISTINCT PathId, JobId "
329                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
330                            "UNION "
331                            "SELECT PathId, BaseFiles.JobId "
332                              "FROM BaseFiles JOIN File AS F USING (FileId) "
333                             "WHERE BaseFiles.JobId = %s) AS B",
334         jobid, jobid);
335    QUERY_DB(jcr, mdb, mdb->cmd);
336
337    /* Now we have to do the directory recursion stuff to determine missing
338     * visibility We try to avoid recursion, to be as fast as possible We also
339     * only work on not allready hierarchised directories...
340     */
341    Mmsg(mdb->cmd, 
342      "SELECT PathVisibility.PathId, Path "
343        "FROM PathVisibility "
344             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
345             "LEFT JOIN PathHierarchy "
346          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
347       "WHERE PathVisibility.JobId = %s "
348         "AND PathHierarchy.PathId IS NULL "
349       "ORDER BY Path", jobid);
350    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
351    QUERY_DB(jcr, mdb, mdb->cmd);
352
353    /* TODO: I need to reuse the DB connection without emptying the result 
354     * So, now i'm copying the result in memory to be able to query the
355     * catalog descriptor again.
356     */
357    num = sql_num_rows(mdb);
358    if (num > 0) {
359       char **result = (char **)malloc (num * 2 * sizeof(char *));
360       
361       SQL_ROW row;
362       int i=0;
363       while((row = sql_fetch_row(mdb))) {
364          result[i++] = bstrdup(row[0]);
365          result[i++] = bstrdup(row[1]);
366       }
367       
368       i=0;
369       while (num > 0) {
370          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
371          free(result[i++]);
372          free(result[i++]);
373          num--;
374       }
375       free(result);
376    }
377
378    Mmsg(mdb->cmd, 
379   "INSERT INTO PathVisibility (PathId, JobId)  "
380    "SELECT a.PathId,%s "
381    "FROM ( "
382      "SELECT DISTINCT h.PPathId AS PathId "
383        "FROM PathHierarchy AS h "
384        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
385       "WHERE p.JobId=%s) AS a LEFT JOIN "
386        "(SELECT PathId "
387           "FROM PathVisibility "
388          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
389    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
390
391    do {
392       QUERY_DB(jcr, mdb, mdb->cmd);
393    } while (sql_affected_rows(mdb) > 0);
394    
395    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
396    UPDATE_DB(jcr, mdb, mdb->cmd);
397
398 bail_out:
399    db_end_transaction(jcr, mdb);
400    db_unlock(mdb);
401 }
402
403 /* 
404  * Find an store the filename descriptor for empty directories Filename.Name=''
405  */
406 DBId_t Bvfs::get_dir_filenameid()
407 {
408    uint32_t id;
409    if (dir_filenameid) {
410       return dir_filenameid;
411    }
412    POOL_MEM q;
413    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
414    db_sql_query(db, q.c_str(), db_int_handler, &id);
415    dir_filenameid = id;
416    return dir_filenameid;
417 }
418
419 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
420 {
421    uint32_t nb=0;
422    db_list_ctx jobids_list;
423
424    db_lock(mdb);
425    db_start_transaction(jcr, mdb);
426
427 #ifdef xxx
428    /* TODO: Remove this code when updating make_bacula_table script */
429    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
430    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
431       Dmsg0(dbglevel, "Creating cache table\n");
432       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
433       QUERY_DB(jcr, mdb, mdb->cmd);
434
435       Mmsg(mdb->cmd,
436            "CREATE TABLE PathHierarchy ( "
437            "PathId integer NOT NULL, "
438            "PPathId integer NOT NULL, "
439            "CONSTRAINT pathhierarchy_pkey "
440            "PRIMARY KEY (PathId))");
441       QUERY_DB(jcr, mdb, mdb->cmd); 
442
443       Mmsg(mdb->cmd,
444            "CREATE INDEX pathhierarchy_ppathid "
445            "ON PathHierarchy (PPathId)");
446       QUERY_DB(jcr, mdb, mdb->cmd);
447
448       Mmsg(mdb->cmd, 
449            "CREATE TABLE PathVisibility ("
450            "PathId integer NOT NULL, "
451            "JobId integer NOT NULL, "
452            "Size int8 DEFAULT 0, "
453            "Files int4 DEFAULT 0, "
454            "CONSTRAINT pathvisibility_pkey "
455            "PRIMARY KEY (JobId, PathId))");
456       QUERY_DB(jcr, mdb, mdb->cmd);
457
458       Mmsg(mdb->cmd, 
459            "CREATE INDEX pathvisibility_jobid "
460            "ON PathVisibility (JobId)");
461       QUERY_DB(jcr, mdb, mdb->cmd);
462
463    }
464 #endif
465
466    Mmsg(mdb->cmd, 
467  "SELECT JobId from Job "
468   "WHERE HasCache = 0 "
469     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
470   "ORDER BY JobId");
471
472    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
473
474    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
475
476    db_end_transaction(jcr, mdb);
477    db_start_transaction(jcr, mdb);
478    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
479    Mmsg(mdb->cmd, 
480         "DELETE FROM PathVisibility "
481          "WHERE NOT EXISTS "
482         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
483    nb = DELETE_DB(jcr, mdb, mdb->cmd);
484    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
485
486    db_end_transaction(jcr, mdb);
487    db_unlock(mdb);
488 }
489
490 /*
491  * Update the bvfs cache for given jobids (1,2,3,4)
492  */
493 void
494 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
495 {
496    pathid_cache ppathid_cache;
497    JobId_t JobId;
498    char *p;
499
500    for (p=jobids; ; ) {
501       int stat = get_next_jobid_from_list(&p, &JobId);
502       if (stat < 0) {
503          return;
504       }
505       if (stat == 0) {
506          break;
507       }
508       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
509       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
510    }
511 }
512
513 /* 
514  * Update the bvfs cache for current jobids
515  */
516 void Bvfs::update_cache()
517 {
518    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
519 }
520
521 /* Change the current directory, returns true if the path exists */
522 bool Bvfs::ch_dir(const char *path)
523 {
524    pm_strcpy(db->path, path);
525    db->pnl = strlen(db->path);
526    ch_dir(db_get_path_record(jcr, db)); 
527    return pwd_id != 0;
528 }
529
530 /* 
531  * Get all file versions for a specified client
532  * TODO: Handle basejobs using different client
533  */
534 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
535 {
536    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
537          (uint64_t)fnid, client);
538    char ed1[50], ed2[50];
539    POOL_MEM q;
540    if (see_copies) {
541       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
542    } else {
543       Mmsg(q, " AND Job.Type = 'B' ");
544    }
545
546    POOL_MEM query;
547
548    Mmsg(query,//    1           2              3       
549 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
550 //         4          5           6
551         "File.JobId, File.LStat, File.FileId, "
552 //         7                    8
553        "Media.VolumeName, Media.InChanger "
554 "FROM File, Job, Client, JobMedia, Media "
555 "WHERE File.FilenameId = %s "
556   "AND File.PathId=%s "
557   "AND File.JobId = Job.JobId "
558   "AND Job.JobId = JobMedia.JobId "
559   "AND File.FileIndex >= JobMedia.FirstIndex "
560   "AND File.FileIndex <= JobMedia.LastIndex "
561   "AND JobMedia.MediaId = Media.MediaId "
562   "AND Job.ClientId = Client.ClientId "
563   "AND Client.Name = '%s' "
564   "%s ORDER BY FileId LIMIT %d OFFSET %d"
565         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
566         limit, offset);
567    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
568    db_sql_query(db, query.c_str(), list_entries, user_data);
569 }
570
571 DBId_t Bvfs::get_root()
572 {
573    *db->path = 0;
574    return db_get_path_record(jcr, db);
575 }
576
577 static int path_handler(void *ctx, int fields, char **row)
578 {
579    Bvfs *fs = (Bvfs *) ctx;
580    return fs->_handle_path(ctx, fields, row);
581 }
582
583 int Bvfs::_handle_path(void *ctx, int fields, char **row)
584 {
585    if (bvfs_is_dir(row)) {
586       /* can have the same path 2 times */
587       if (strcmp(row[BVFS_Name], prev_dir)) {
588          pm_strcpy(prev_dir, row[BVFS_Name]);
589          return list_entries(user_data, fields, row);
590       }
591    }
592    return 0;
593 }
594
595 /* 
596  * Retrieve . and .. information
597  */
598 void Bvfs::ls_special_dirs()
599 {
600    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
601    char ed1[50], ed2[50];
602    if (*jobids == 0) {
603       return;
604    }
605    if (!dir_filenameid) {
606       get_dir_filenameid();
607    }
608
609    /* Will fetch directories  */
610    *prev_dir = 0;
611
612    POOL_MEM query;
613    Mmsg(query, 
614 "((SELECT PPathId AS PathId, '..' AS Path "
615     "FROM  PathHierarchy "
616    "WHERE  PathId = %s) "
617 "UNION "
618  "(SELECT %s AS PathId, '.' AS Path))",
619         edit_uint64(pwd_id, ed1), ed1);
620
621    POOL_MEM query2;
622    Mmsg(query2,// 1      2     3        4     5       6
623 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
624   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
625        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
626               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
627        "WHERE File1.FilenameId = %s "
628        "AND File1.JobId IN (%s)) AS listfile1 "
629   "ON (tmp.PathId = listfile1.PathId) "
630   "ORDER BY tmp.Path, JobId DESC ",
631         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
632
633    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
634    db_sql_query(db, query2.c_str(), path_handler, this);
635 }
636
637 /* Returns true if we have dirs to read */
638 bool Bvfs::ls_dirs()
639 {
640    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
641    char ed1[50], ed2[50];
642    if (*jobids == 0) {
643       return false;
644    }
645
646    POOL_MEM filter;
647    if (*pattern) {
648       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
649    }
650
651    if (!dir_filenameid) {
652       get_dir_filenameid();
653    }
654
655    /* the sql query displays same directory multiple time, take the first one */
656    *prev_dir = 0;
657
658    /* Let's retrieve the list of the visible dirs in this dir ...
659     * First, I need the empty filenameid to locate efficiently
660     * the dirs in the file table
661     * my $dir_filenameid = $self->get_dir_filenameid();
662     */
663    /* Then we get all the dir entries from File ... */
664    POOL_MEM query;
665    Mmsg(query,
666 //       0     1     2   3      4     5       6
667 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
668     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
669            "lower(Path1.Path) AS lpath, "
670            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
671            "listfile1.FileId AS FileId "
672     "FROM ( "
673       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
674       "FROM PathHierarchy AS PathHierarchy1 "
675       "JOIN Path AS Path2 "
676         "ON (PathHierarchy1.PathId = Path2.PathId) "
677       "JOIN PathVisibility AS PathVisibility1 "
678         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
679       "WHERE PathHierarchy1.PPathId = %s "
680       "AND PathVisibility1.jobid IN (%s) "
681            "%s "
682      ") AS listpath1 "
683    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
684
685    "LEFT JOIN ( " /* get attributes if any */
686        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
687               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
688        "WHERE File1.FilenameId = %s "
689        "AND File1.JobId IN (%s)) AS listfile1 "
690        "ON (listpath1.PathId = listfile1.PathId) "
691     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
692         edit_uint64(pwd_id, ed1),
693         jobids,
694         filter.c_str(),
695         edit_uint64(dir_filenameid, ed2),
696         jobids,
697         limit, offset);
698
699    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
700
701    db_lock(db);
702    db_sql_query(db, query.c_str(), path_handler, this);
703    nb_record = db->num_rows;
704    db_unlock(db);
705
706    return nb_record == limit;
707 }
708
709 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
710                           const char *JobId, const char *PathId,  
711                           const char *filter, int64_t limit, int64_t offset)
712 {
713    if (db_type == SQL_TYPE_POSTGRESQL) {
714       Mmsg(query, sql_bvfs_list_files[db_type], 
715            JobId, PathId, JobId, PathId, 
716            filter, limit, offset);
717    } else {
718       Mmsg(query, sql_bvfs_list_files[db_type], 
719            JobId, PathId, JobId, PathId, 
720            limit, offset, filter, JobId, JobId);
721    }
722 }
723
724 /* Returns true if we have files to read */
725 bool Bvfs::ls_files()
726 {
727    POOL_MEM query;
728    POOL_MEM filter;
729    char pathid[50];
730
731    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
732    if (*jobids == 0) {
733       return false;
734    }
735
736    if (!pwd_id) {
737       ch_dir(get_root());
738    }
739
740    edit_uint64(pwd_id, pathid);
741    if (*pattern) {
742       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
743    }
744
745    build_ls_files_query(db, query, 
746                         jobids, pathid, filter.c_str(),
747                         limit, offset);
748
749    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
750
751    db_lock(db);
752    db_sql_query(db, query.c_str(), list_entries, user_data);
753    nb_record = db->num_rows;
754    db_unlock(db);
755
756    return nb_record == limit;
757 }
758
759
760 /* 
761  * Return next Id from comma separated list   
762  *
763  * Returns:
764  *   1 if next Id returned
765  *   0 if no more Ids are in list
766  *  -1 there is an error
767  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
768  */
769 static int get_next_id_from_list(char **p, int64_t *Id)
770 {
771    const int maxlen = 30;
772    char id[maxlen+1];
773    char *q = *p;
774
775    id[0] = 0;
776    for (int i=0; i<maxlen; i++) {
777       if (*q == 0) {
778          break;
779       } else if (*q == ',') {
780          q++;
781          break;
782       }
783       id[i] = *q++;
784       id[i+1] = 0;
785    }
786    if (id[0] == 0) {
787       return 0;
788    } else if (!is_a_number(id)) {
789       return -1;                      /* error */
790    }
791    *p = q;
792    *Id = str_to_int64(id);
793    return 1;
794 }
795
796 static int get_path_handler(void *ctx, int fields, char **row)
797 {
798    POOL_MEM *buf = (POOL_MEM *) ctx;
799    pm_strcpy(*buf, row[0]);
800    return 0;
801 }
802
803 static bool check_temp(char *output_table)
804 {
805    if (output_table[0] == 'b' &&
806        output_table[1] == '2' &&
807        is_an_integer(output_table + 2))
808    {
809       return true;
810    }
811    return false;
812 }
813
814 bool Bvfs::drop_restore_list(char *output_table)
815 {
816    POOL_MEM query;
817    if (check_temp(output_table)) {
818       Mmsg(query, "DROP TABLE %s", output_table);
819       db_sql_query(db, query.c_str(), NULL, NULL);
820       return true;
821    }
822    return false;
823 }
824
825 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
826                                 char *output_table)
827 {
828    POOL_MEM query;
829    POOL_MEM tmp, tmp2;
830    int64_t id, jobid;
831    bool init=false;
832    bool ret=false;
833    /* check args */
834    if ((*fileid   && !is_a_number_list(fileid))  ||
835        (*dirid    && !is_a_number_list(dirid))   ||
836        (*hardlink && !is_a_number_list(hardlink))||
837        (!*hardlink && !*fileid && !*dirid && !*hardlink))
838    {
839       return false;
840    }
841    if (!check_temp(output_table)) {
842       return false;
843    }
844
845    Mmsg(query, "CREATE TEMPORARY TABLE btemp%s AS ", output_table);
846
847    if (*fileid) {               /* Select files with their direct id */
848       init=true;
849       Mmsg(tmp,"(SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
850                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s))",
851            fileid);
852       pm_strcat(query, tmp.c_str());
853    }
854
855    /* Add a directory content */
856    while (get_next_id_from_list(&dirid, &id) == 1) {
857       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
858       
859       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
860          Dmsg0(dbglevel, "Can't search for path\n");
861          /* print error */
862          return false;
863       }
864       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
865          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
866                id, tmp.c_str(), tmp2.c_str());
867          break;
868       }
869       /* escape % and _ for LIKE search */
870       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
871       char *p = tmp.c_str();
872       for (char *s = tmp2.c_str(); *s ; s++) {
873          if (*s == '%' || *s == '_' || *s == '\\') {
874             *p = '\\'; 
875             p++;
876          }
877          *p = *s; 
878          p++;
879       }
880       *p = '\0';
881       tmp.strcat("%");
882
883       size_t len = strlen(tmp.c_str());
884       tmp2.check_size((len+1) * 2);
885       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
886
887       if (init) {
888          query.strcat(" UNION ");
889       }
890
891       Mmsg(tmp, "(SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
892                         "File.PathId, FileId "
893                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
894                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s)) ", 
895            tmp2.c_str(), jobids); 
896       query.strcat(tmp.c_str());
897       init = true;
898
899       query.strcat(" UNION ");
900
901       /* A directory can have files from a BaseJob */
902       Mmsg(tmp, "(SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
903                         "File.FilenameId, File.PathId, BaseFiles.FileId "
904                    "FROM BaseFiles "
905                         "JOIN File USING (FileId) "
906                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
907                         "JOIN Path USING (PathId) "
908                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s)) ", 
909            tmp2.c_str(), jobids); 
910       query.strcat(tmp.c_str());
911    }
912
913    /* expect jobid,fileindex */
914    int64_t prev_jobid=0;
915    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
916       if (get_next_id_from_list(&hardlink, &id) != 1) {
917          Dmsg0(dbglevel, "hardlink should be two by two\n");
918          return false;
919       }
920       if (jobid != prev_jobid) { /* new job */
921          if (prev_jobid == 0) {  /* first jobid */
922             if (init) {
923                query.strcat(" UNION ");
924             }
925          } else {               /* end last job, start new one */
926             tmp.strcat(")) UNION ");
927             query.strcat(tmp.c_str());
928          }
929          Mmsg(tmp, "(SELECT JobId, JobTDate, FileIndex, FilenameId, "
930                            "PathId, FileId "
931                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
932                         "AND FileIndex IN (%lld", jobid, id);
933          prev_jobid = jobid;
934
935       } else {                  /* same job, add new findex */
936          Mmsg(tmp2, ", %lld", id);
937          tmp.strcat(tmp2.c_str());
938       }
939    }
940
941    if (prev_jobid != 0) {       /* end last job */
942       tmp.strcat(")) ");
943       query.strcat(tmp.c_str());
944       init = true;
945    }
946
947    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
948
949    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
950       Dmsg0(dbglevel, "Can't execute q\n");
951       goto bail_out;
952    }
953
954    /* TODO: handle basejob and SQLite3 */
955    Mmsg(query, sql_bvfs_select[db_type], output_table, output_table);
956
957    /* TODO: handle jobid filter */
958    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
959    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
960       Dmsg0(dbglevel, "Can't execute q\n");
961       goto bail_out;
962    }
963
964    /* MySQL need it */
965    if (db_type == SQL_TYPE_MYSQL) {
966       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
967            output_table, output_table);
968    }
969
970    ret = true;
971
972 bail_out:
973    Mmsg(query, "DROP TABLE btemp%s", output_table);
974    db_sql_query(db, query.c_str(), NULL, NULL);
975    return ret;
976 }