]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Modify MySQL accurate query with Delta
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_versions = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71    username = NULL;
72 }
73
74 Bvfs::~Bvfs() {
75    free_pool_memory(jobids);
76    free_pool_memory(pattern);
77    free_pool_memory(prev_dir);
78    if (username) {
79       free(username);
80    }
81    free_attr(attr);
82    jcr->dec_use_count();
83 }
84
85 void Bvfs::filter_jobid()
86 {
87    if (!username) {
88       return;
89    }
90
91    /* Query used by Bweb to filter clients, activated when using
92     * set_username() 
93     */
94    POOL_MEM query;
95    Mmsg(query,
96       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
97         "JOIN (SELECT ClientId FROM client_group_member "
98         "JOIN client_group USING (client_group_id) "
99         "JOIN bweb_client_group_acl USING (client_group_id) "
100         "JOIN bweb_user USING (userid) "
101        "WHERE bweb_user.username = '%s' "
102       ") AS filter USING (ClientId) "
103         " WHERE JobId IN (%s)", 
104         username, jobids);
105
106    db_list_ctx ctx;
107    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
108    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
109    pm_strcpy(jobids, ctx.list);
110 }
111
112 void Bvfs::set_jobid(JobId_t id)
113 {
114    Mmsg(jobids, "%lld", (uint64_t)id);
115    filter_jobid();
116 }
117
118 void Bvfs::set_jobids(char *ids)
119 {
120    pm_strcpy(jobids, ids);
121    filter_jobid();
122 }
123
124 /* 
125  * TODO: Find a way to let the user choose how he wants to display
126  * files and directories
127  */
128
129
130 /* 
131  * Working Object to store PathId already seen (avoid
132  * database queries), equivalent to %cache_ppathid in perl
133  */
134
135 #define NITEMS 50000
136 class pathid_cache {
137 private:
138    hlink *nodes;
139    int nb_node;
140    int max_node;
141
142    alist *table_node;
143
144    htable *cache_ppathid;
145
146 public:
147    pathid_cache() {
148       hlink link;
149       cache_ppathid = (htable *)malloc(sizeof(htable));
150       cache_ppathid->init(&link, &link, NITEMS);
151       max_node = NITEMS;
152       nodes = (hlink *) malloc(max_node * sizeof (hlink));
153       nb_node = 0;
154       table_node = New(alist(5, owned_by_alist));
155       table_node->append(nodes);
156    }
157
158    hlink *get_hlink() {
159       if (++nb_node >= max_node) {
160          nb_node = 0;
161          nodes = (hlink *)malloc(max_node * sizeof(hlink));
162          table_node->append(nodes);
163       }
164       return nodes + nb_node;
165    }
166
167    bool lookup(char *pathid) {
168       bool ret = cache_ppathid->lookup(pathid) != NULL;
169       return ret;
170    }
171    
172    void insert(char *pathid) {
173       hlink *h = get_hlink();
174       cache_ppathid->insert(pathid, h);
175    }
176
177    ~pathid_cache() {
178       cache_ppathid->destroy();
179       free(cache_ppathid);
180       delete table_node;
181    }
182 private:
183    pathid_cache(const pathid_cache &); /* prohibit pass by value */
184    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
185 } ;
186
187 /* Return the parent_dir with the trailing /  (update the given string)
188  * TODO: see in the rest of bacula if we don't have already this function
189  * dir=/tmp/toto/
190  * dir=/tmp/
191  * dir=/
192  * dir=
193  */
194 char *bvfs_parent_dir(char *path)
195 {
196    char *p = path;
197    int len = strlen(path) - 1;
198
199    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
200       path[len] = '\0';
201    }
202
203    if (len > 0) {
204       p += len;
205       while (p > path && !IsPathSeparator(*p)) {
206          p--;
207       }
208       p[1] = '\0';
209    }
210    return path;
211 }
212
213 /* Return the basename of the with the trailing /
214  * TODO: see in the rest of bacula if we don't have
215  * this function already
216  */
217 char *bvfs_basename_dir(char *path)
218 {
219    char *p = path;
220    int len = strlen(path) - 1;
221
222    if (path[len] == '/') {      /* if directory, skip last / */
223       len -= 1;
224    }
225
226    if (len > 0) {
227       p += len;
228       while (p > path && !IsPathSeparator(*p)) {
229          p--;
230       }
231       if (*p == '/') {
232          p++;                  /* skip first / */
233       }
234    } 
235    return p;
236 }
237
238 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
239                                  pathid_cache &ppathid_cache, 
240                                  char *org_pathid, char *path)
241 {
242    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
243    char pathid[50];
244    ATTR_DBR parent;
245    char *bkp = mdb->path;
246    strncpy(pathid, org_pathid, sizeof(pathid));
247
248    /* Does the ppathid exist for this ? we use a memory cache...  In order to
249     * avoid the full loop, we consider that if a dir is allready in the
250     * PathHierarchy table, then there is no need to calculate all the
251     * hierarchy
252     */
253    while (path && *path)
254    {
255       if (!ppathid_cache.lookup(pathid))
256       {
257          Mmsg(mdb->cmd, 
258               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
259               pathid);
260
261          QUERY_DB(jcr, mdb, mdb->cmd);
262          /* Do we have a result ? */
263          if (sql_num_rows(mdb) > 0) {
264             ppathid_cache.insert(pathid);
265             /* This dir was in the db ...
266              * It means we can leave, the tree has allready been built for
267              * this dir
268              */
269             goto bail_out;
270          } else {
271             /* search or create parent PathId in Path table */
272             mdb->path = bvfs_parent_dir(path);
273             mdb->pnl = strlen(mdb->path);
274             if (!db_create_path_record(jcr, mdb, &parent)) {
275                goto bail_out;
276             }
277             ppathid_cache.insert(pathid);
278             
279             Mmsg(mdb->cmd,
280                  "INSERT INTO PathHierarchy (PathId, PPathId) "
281                  "VALUES (%s,%lld)",
282                  pathid, (uint64_t) parent.PathId);
283             
284             INSERT_DB(jcr, mdb, mdb->cmd);
285
286             edit_uint64(parent.PathId, pathid);
287             path = mdb->path;   /* already done */
288          }
289       } else {
290          /* It's already in the cache.  We can leave, no time to waste here,
291           * all the parent dirs have allready been done
292           */
293          goto bail_out;
294       }
295    }   
296
297 bail_out:
298    mdb->path = bkp;
299    mdb->fnl = 0;
300 }
301
302 /* 
303  * Internal function to update path_hierarchy cache with a shared pathid cache
304  */
305 static void update_path_hierarchy_cache(JCR *jcr,
306                                         B_DB *mdb,
307                                         pathid_cache &ppathid_cache,
308                                         JobId_t JobId)
309 {
310    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
311
312    uint32_t num;
313    char jobid[50];
314    edit_uint64(JobId, jobid);
315  
316    db_lock(mdb);
317    db_start_transaction(jcr, mdb);
318
319    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
320    
321    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
322       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
323       goto bail_out;
324    }
325
326    /* Inserting path records for JobId */
327    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
328                    "SELECT DISTINCT PathId, JobId "
329                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
330                            "UNION "
331                            "SELECT PathId, BaseFiles.JobId "
332                              "FROM BaseFiles JOIN File AS F USING (FileId) "
333                             "WHERE BaseFiles.JobId = %s) AS B",
334         jobid, jobid);
335    QUERY_DB(jcr, mdb, mdb->cmd);
336
337    /* Now we have to do the directory recursion stuff to determine missing
338     * visibility We try to avoid recursion, to be as fast as possible We also
339     * only work on not allready hierarchised directories...
340     */
341    Mmsg(mdb->cmd, 
342      "SELECT PathVisibility.PathId, Path "
343        "FROM PathVisibility "
344             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
345             "LEFT JOIN PathHierarchy "
346          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
347       "WHERE PathVisibility.JobId = %s "
348         "AND PathHierarchy.PathId IS NULL "
349       "ORDER BY Path", jobid);
350    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
351    QUERY_DB(jcr, mdb, mdb->cmd);
352
353    /* TODO: I need to reuse the DB connection without emptying the result 
354     * So, now i'm copying the result in memory to be able to query the
355     * catalog descriptor again.
356     */
357    num = sql_num_rows(mdb);
358    if (num > 0) {
359       char **result = (char **)malloc (num * 2 * sizeof(char *));
360       
361       SQL_ROW row;
362       int i=0;
363       while((row = sql_fetch_row(mdb))) {
364          result[i++] = bstrdup(row[0]);
365          result[i++] = bstrdup(row[1]);
366       }
367       
368       i=0;
369       while (num > 0) {
370          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
371          free(result[i++]);
372          free(result[i++]);
373          num--;
374       }
375       free(result);
376    }
377
378    Mmsg(mdb->cmd, 
379   "INSERT INTO PathVisibility (PathId, JobId)  "
380    "SELECT a.PathId,%s "
381    "FROM ( "
382      "SELECT DISTINCT h.PPathId AS PathId "
383        "FROM PathHierarchy AS h "
384        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
385       "WHERE p.JobId=%s) AS a LEFT JOIN "
386        "(SELECT PathId "
387           "FROM PathVisibility "
388          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
389    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
390
391    do {
392       QUERY_DB(jcr, mdb, mdb->cmd);
393    } while (sql_affected_rows(mdb) > 0);
394    
395    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
396    UPDATE_DB(jcr, mdb, mdb->cmd);
397
398 bail_out:
399    db_end_transaction(jcr, mdb);
400    db_unlock(mdb);
401 }
402
403 /* 
404  * Find an store the filename descriptor for empty directories Filename.Name=''
405  */
406 DBId_t Bvfs::get_dir_filenameid()
407 {
408    uint32_t id;
409    if (dir_filenameid) {
410       return dir_filenameid;
411    }
412    POOL_MEM q;
413    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
414    db_sql_query(db, q.c_str(), db_int_handler, &id);
415    dir_filenameid = id;
416    return dir_filenameid;
417 }
418
419 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
420 {
421    uint32_t nb=0;
422    db_list_ctx jobids_list;
423
424    db_lock(mdb);
425    db_start_transaction(jcr, mdb);
426
427 #ifdef xxx
428    /* TODO: Remove this code when updating make_bacula_table script */
429    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
430    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
431       Dmsg0(dbglevel, "Creating cache table\n");
432       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
433       QUERY_DB(jcr, mdb, mdb->cmd);
434
435       Mmsg(mdb->cmd,
436            "CREATE TABLE PathHierarchy ( "
437            "PathId integer NOT NULL, "
438            "PPathId integer NOT NULL, "
439            "CONSTRAINT pathhierarchy_pkey "
440            "PRIMARY KEY (PathId))");
441       QUERY_DB(jcr, mdb, mdb->cmd); 
442
443       Mmsg(mdb->cmd,
444            "CREATE INDEX pathhierarchy_ppathid "
445            "ON PathHierarchy (PPathId)");
446       QUERY_DB(jcr, mdb, mdb->cmd);
447
448       Mmsg(mdb->cmd, 
449            "CREATE TABLE PathVisibility ("
450            "PathId integer NOT NULL, "
451            "JobId integer NOT NULL, "
452            "Size int8 DEFAULT 0, "
453            "Files int4 DEFAULT 0, "
454            "CONSTRAINT pathvisibility_pkey "
455            "PRIMARY KEY (JobId, PathId))");
456       QUERY_DB(jcr, mdb, mdb->cmd);
457
458       Mmsg(mdb->cmd, 
459            "CREATE INDEX pathvisibility_jobid "
460            "ON PathVisibility (JobId)");
461       QUERY_DB(jcr, mdb, mdb->cmd);
462
463    }
464 #endif
465
466    Mmsg(mdb->cmd, 
467  "SELECT JobId from Job "
468   "WHERE HasCache = 0 "
469     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
470   "ORDER BY JobId");
471
472    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
473
474    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
475
476    db_end_transaction(jcr, mdb);
477    db_start_transaction(jcr, mdb);
478    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
479    Mmsg(mdb->cmd, 
480         "DELETE FROM PathVisibility "
481          "WHERE NOT EXISTS "
482         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
483    nb = DELETE_DB(jcr, mdb, mdb->cmd);
484    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
485
486    db_end_transaction(jcr, mdb);
487    db_unlock(mdb);
488 }
489
490 /*
491  * Update the bvfs cache for given jobids (1,2,3,4)
492  */
493 void
494 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
495 {
496    pathid_cache ppathid_cache;
497    JobId_t JobId;
498    char *p;
499
500    for (p=jobids; ; ) {
501       int stat = get_next_jobid_from_list(&p, &JobId);
502       if (stat < 0) {
503          return;
504       }
505       if (stat == 0) {
506          break;
507       }
508       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
509       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
510    }
511 }
512
513 /* 
514  * Update the bvfs cache for current jobids
515  */
516 void Bvfs::update_cache()
517 {
518    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
519 }
520
521 /* Change the current directory, returns true if the path exists */
522 bool Bvfs::ch_dir(const char *path)
523 {
524    pm_strcpy(db->path, path);
525    db->pnl = strlen(db->path);
526    ch_dir(db_get_path_record(jcr, db)); 
527    return pwd_id != 0;
528 }
529
530 /* 
531  * Get all file versions for a specified client
532  * TODO: Handle basejobs using different client
533  */
534 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
535 {
536    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
537          (uint64_t)fnid, client);
538    char ed1[50], ed2[50];
539    POOL_MEM q;
540    if (see_copies) {
541       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
542    } else {
543       Mmsg(q, " AND Job.Type = 'B' ");
544    }
545
546    POOL_MEM query;
547
548    Mmsg(query,//    1           2              3       
549 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
550 //         4          5           6
551         "File.JobId, File.LStat, File.FileId, "
552 //         7                    8
553        "Media.VolumeName, Media.InChanger "
554 "FROM File, Job, Client, JobMedia, Media "
555 "WHERE File.FilenameId = %s "
556   "AND File.PathId=%s "
557   "AND File.JobId = Job.JobId "
558   "AND Job.JobId = JobMedia.JobId "
559   "AND File.FileIndex >= JobMedia.FirstIndex "
560   "AND File.FileIndex <= JobMedia.LastIndex "
561   "AND JobMedia.MediaId = Media.MediaId "
562   "AND Job.ClientId = Client.ClientId "
563   "AND Client.Name = '%s' "
564   "%s ORDER BY FileId LIMIT %d OFFSET %d"
565         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
566         limit, offset);
567    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
568    db_sql_query(db, query.c_str(), list_entries, user_data);
569 }
570
571 DBId_t Bvfs::get_root()
572 {
573    *db->path = 0;
574    return db_get_path_record(jcr, db);
575 }
576
577 static int path_handler(void *ctx, int fields, char **row)
578 {
579    Bvfs *fs = (Bvfs *) ctx;
580    return fs->_handle_path(ctx, fields, row);
581 }
582
583 int Bvfs::_handle_path(void *ctx, int fields, char **row)
584 {
585    if (bvfs_is_dir(row)) {
586       /* can have the same path 2 times */
587       if (strcmp(row[BVFS_Name], prev_dir)) {
588          pm_strcpy(prev_dir, row[BVFS_Name]);
589          return list_entries(user_data, fields, row);
590       }
591    }
592    return 0;
593 }
594
595 /* 
596  * Retrieve . and .. information
597  */
598 void Bvfs::ls_special_dirs()
599 {
600    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
601    char ed1[50], ed2[50];
602    if (*jobids == 0) {
603       return;
604    }
605    if (!dir_filenameid) {
606       get_dir_filenameid();
607    }
608
609    /* Will fetch directories  */
610    *prev_dir = 0;
611
612    POOL_MEM query;
613    Mmsg(query, 
614 "((SELECT PPathId AS PathId, '..' AS Path "
615     "FROM  PathHierarchy "
616    "WHERE  PathId = %s) "
617 "UNION "
618  "(SELECT %s AS PathId, '.' AS Path))",
619         edit_uint64(pwd_id, ed1), ed1);
620
621    POOL_MEM query2;
622    Mmsg(query2,// 1      2     3        4     5       6
623 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
624   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
625        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
626               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
627        "WHERE File1.FilenameId = %s "
628        "AND File1.JobId IN (%s)) AS listfile1 "
629   "ON (tmp.PathId = listfile1.PathId) "
630   "ORDER BY tmp.Path, JobId DESC ",
631         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
632
633    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
634    db_sql_query(db, query2.c_str(), path_handler, this);
635 }
636
637 /* Returns true if we have dirs to read */
638 bool Bvfs::ls_dirs()
639 {
640    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
641    char ed1[50], ed2[50];
642    if (*jobids == 0) {
643       return false;
644    }
645
646    POOL_MEM query;
647    POOL_MEM filter;
648    if (*pattern) {
649       int len = strlen(pattern);
650       query.check_size(len*2+1);
651       db_escape_string(jcr, db, query.c_str(), pattern, len);
652       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, query.c_str());
653    }
654
655    if (!dir_filenameid) {
656       get_dir_filenameid();
657    }
658
659    /* the sql query displays same directory multiple time, take the first one */
660    *prev_dir = 0;
661
662    /* Let's retrieve the list of the visible dirs in this dir ...
663     * First, I need the empty filenameid to locate efficiently
664     * the dirs in the file table
665     * my $dir_filenameid = $self->get_dir_filenameid();
666     */
667    /* Then we get all the dir entries from File ... */
668    Mmsg(query,
669 //       0     1     2   3      4     5       6
670 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
671     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
672            "lower(Path1.Path) AS lpath, "
673            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
674            "listfile1.FileId AS FileId "
675     "FROM ( "
676       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
677       "FROM PathHierarchy AS PathHierarchy1 "
678       "JOIN Path AS Path2 "
679         "ON (PathHierarchy1.PathId = Path2.PathId) "
680       "JOIN PathVisibility AS PathVisibility1 "
681         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
682       "WHERE PathHierarchy1.PPathId = %s "
683       "AND PathVisibility1.jobid IN (%s) "
684            "%s "
685      ") AS listpath1 "
686    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
687
688    "LEFT JOIN ( " /* get attributes if any */
689        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
690               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
691        "WHERE File1.FilenameId = %s "
692        "AND File1.JobId IN (%s)) AS listfile1 "
693        "ON (listpath1.PathId = listfile1.PathId) "
694     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
695         edit_uint64(pwd_id, ed1),
696         jobids,
697         filter.c_str(),
698         edit_uint64(dir_filenameid, ed2),
699         jobids,
700         limit, offset);
701
702    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
703
704    db_lock(db);
705    db_sql_query(db, query.c_str(), path_handler, this);
706    nb_record = db->num_rows;
707    db_unlock(db);
708
709    return nb_record == limit;
710 }
711
712 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
713                           const char *JobId, const char *PathId,  
714                           const char *filter, int64_t limit, int64_t offset)
715 {
716    if (db_type == SQL_TYPE_POSTGRESQL) {
717       Mmsg(query, sql_bvfs_list_files[db_type], 
718            JobId, PathId, JobId, PathId, 
719            filter, limit, offset);
720    } else {
721       Mmsg(query, sql_bvfs_list_files[db_type], 
722            JobId, PathId, JobId, PathId, 
723            limit, offset, filter, JobId, JobId);
724    }
725 }
726
727 /* Returns true if we have files to read */
728 bool Bvfs::ls_files()
729 {
730    POOL_MEM query;
731    POOL_MEM filter;
732    char pathid[50];
733
734    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
735    if (*jobids == 0) {
736       return false;
737    }
738
739    if (!pwd_id) {
740       ch_dir(get_root());
741    }
742
743    edit_uint64(pwd_id, pathid);
744    if (*pattern) {
745       int len = strlen(pattern);
746       query.check_size(len*2+1);
747       db_escape_string(jcr, db, query.c_str(), pattern, len);
748       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, query.c_str());
749    }
750
751    build_ls_files_query(db, query, 
752                         jobids, pathid, filter.c_str(),
753                         limit, offset);
754
755    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
756
757    db_lock(db);
758    db_sql_query(db, query.c_str(), list_entries, user_data);
759    nb_record = db->num_rows;
760    db_unlock(db);
761
762    return nb_record == limit;
763 }
764
765
766 /* 
767  * Return next Id from comma separated list   
768  *
769  * Returns:
770  *   1 if next Id returned
771  *   0 if no more Ids are in list
772  *  -1 there is an error
773  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
774  */
775 static int get_next_id_from_list(char **p, int64_t *Id)
776 {
777    const int maxlen = 30;
778    char id[maxlen+1];
779    char *q = *p;
780
781    id[0] = 0;
782    for (int i=0; i<maxlen; i++) {
783       if (*q == 0) {
784          break;
785       } else if (*q == ',') {
786          q++;
787          break;
788       }
789       id[i] = *q++;
790       id[i+1] = 0;
791    }
792    if (id[0] == 0) {
793       return 0;
794    } else if (!is_a_number(id)) {
795       return -1;                      /* error */
796    }
797    *p = q;
798    *Id = str_to_int64(id);
799    return 1;
800 }
801
802 static int get_path_handler(void *ctx, int fields, char **row)
803 {
804    POOL_MEM *buf = (POOL_MEM *) ctx;
805    pm_strcpy(*buf, row[0]);
806    return 0;
807 }
808
809 static bool check_temp(char *output_table)
810 {
811    if (output_table[0] == 'b' &&
812        output_table[1] == '2' &&
813        is_an_integer(output_table + 2))
814    {
815       return true;
816    }
817    return false;
818 }
819
820 bool Bvfs::drop_restore_list(char *output_table)
821 {
822    POOL_MEM query;
823    if (check_temp(output_table)) {
824       Mmsg(query, "DROP TABLE %s", output_table);
825       db_sql_query(db, query.c_str(), NULL, NULL);
826       return true;
827    }
828    return false;
829 }
830
831 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
832                                 char *output_table)
833 {
834    POOL_MEM query;
835    POOL_MEM tmp, tmp2;
836    int64_t id, jobid;
837    bool init=false;
838    bool ret=false;
839    /* check args */
840    if ((*fileid   && !is_a_number_list(fileid))  ||
841        (*dirid    && !is_a_number_list(dirid))   ||
842        (*hardlink && !is_a_number_list(hardlink))||
843        (!*hardlink && !*fileid && !*dirid && !*hardlink))
844    {
845       return false;
846    }
847    if (!check_temp(output_table)) {
848       return false;
849    }
850
851    Mmsg(query, "CREATE TEMPORARY TABLE btemp%s AS ", output_table);
852
853    if (*fileid) {               /* Select files with their direct id */
854       init=true;
855       Mmsg(tmp,"(SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
856                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s))",
857            fileid);
858       pm_strcat(query, tmp.c_str());
859    }
860
861    /* Add a directory content */
862    while (get_next_id_from_list(&dirid, &id) == 1) {
863       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
864       
865       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
866          Dmsg0(dbglevel, "Can't search for path\n");
867          /* print error */
868          return false;
869       }
870       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
871          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
872                id, tmp.c_str(), tmp2.c_str());
873          break;
874       }
875       /* escape % and _ for LIKE search */
876       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
877       char *p = tmp.c_str();
878       for (char *s = tmp2.c_str(); *s ; s++) {
879          if (*s == '%' || *s == '_' || *s == '\\') {
880             *p = '\\'; 
881             p++;
882          }
883          *p = *s; 
884          p++;
885       }
886       *p = '\0';
887       tmp.strcat("%");
888
889       size_t len = strlen(tmp.c_str());
890       tmp2.check_size((len+1) * 2);
891       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
892
893       if (init) {
894          query.strcat(" UNION ");
895       }
896
897       Mmsg(tmp, "(SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
898                         "File.PathId, FileId "
899                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
900                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s)) ", 
901            tmp2.c_str(), jobids); 
902       query.strcat(tmp.c_str());
903       init = true;
904
905       query.strcat(" UNION ");
906
907       /* A directory can have files from a BaseJob */
908       Mmsg(tmp, "(SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
909                         "File.FilenameId, File.PathId, BaseFiles.FileId "
910                    "FROM BaseFiles "
911                         "JOIN File USING (FileId) "
912                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
913                         "JOIN Path USING (PathId) "
914                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s)) ", 
915            tmp2.c_str(), jobids); 
916       query.strcat(tmp.c_str());
917    }
918
919    /* expect jobid,fileindex */
920    int64_t prev_jobid=0;
921    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
922       if (get_next_id_from_list(&hardlink, &id) != 1) {
923          Dmsg0(dbglevel, "hardlink should be two by two\n");
924          return false;
925       }
926       if (jobid != prev_jobid) { /* new job */
927          if (prev_jobid == 0) {  /* first jobid */
928             if (init) {
929                query.strcat(" UNION ");
930             }
931          } else {               /* end last job, start new one */
932             tmp.strcat(")) UNION ");
933             query.strcat(tmp.c_str());
934          }
935          Mmsg(tmp, "(SELECT JobId, JobTDate, FileIndex, FilenameId, "
936                            "PathId, FileId "
937                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
938                         "AND FileIndex IN (%lld", jobid, id);
939          prev_jobid = jobid;
940
941       } else {                  /* same job, add new findex */
942          Mmsg(tmp2, ", %lld", id);
943          tmp.strcat(tmp2.c_str());
944       }
945    }
946
947    if (prev_jobid != 0) {       /* end last job */
948       tmp.strcat(")) ");
949       query.strcat(tmp.c_str());
950       init = true;
951    }
952
953    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
954
955    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
956       Dmsg0(dbglevel, "Can't execute q\n");
957       goto bail_out;
958    }
959
960    /* TODO: handle basejob and SQLite3 */
961    Mmsg(query, sql_bvfs_select[db_type], output_table, output_table);
962
963    /* TODO: handle jobid filter */
964    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
965    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
966       Dmsg0(dbglevel, "Can't execute q\n");
967       goto bail_out;
968    }
969
970    /* MySQL need it */
971    if (db_type == SQL_TYPE_MYSQL) {
972       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
973            output_table, output_table);
974    }
975
976    ret = true;
977
978 bail_out:
979    Mmsg(query, "DROP TABLE btemp%s", output_table);
980    db_sql_query(db, query.c_str(), NULL, NULL);
981    return ret;
982 }