]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
bvfs: support for bweb user acl
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_versions = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71    username = NULL;
72 }
73
74 Bvfs::~Bvfs() {
75    free_pool_memory(jobids);
76    free_pool_memory(pattern);
77    free_pool_memory(prev_dir);
78    if (username) {
79       free(username);
80    }
81    free_attr(attr);
82    jcr->dec_use_count();
83 }
84
85 void Bvfs::filter_jobid()
86 {
87    if (!username) {
88       return;
89    }
90
91    /* Query used by Bweb to filter clients, activated when using
92     * set_username() 
93     */
94    POOL_MEM query;
95    Mmsg(query,
96       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
97         "JOIN (SELECT ClientId FROM client_group_member "
98         "JOIN client_group USING (client_group_id) "
99         "JOIN bweb_client_group_acl USING (client_group_id) "
100         "JOIN bweb_user USING (userid) "
101        "WHERE bweb_user.username = '%s' "
102       ") AS filter USING (ClientId) "
103         " WHERE JobId IN (%s)", 
104         username, jobids);
105
106    db_list_ctx ctx;
107    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
108    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
109    pm_strcpy(jobids, ctx.list);
110 }
111
112 void Bvfs::set_jobid(JobId_t id)
113 {
114    Mmsg(jobids, "%lld", (uint64_t)id);
115    filter_jobid();
116 }
117
118 void Bvfs::set_jobids(char *ids)
119 {
120    pm_strcpy(jobids, ids);
121    filter_jobid();
122 }
123
124 /* 
125  * TODO: Find a way to let the user choose how he wants to display
126  * files and directories
127  */
128
129
130 /* 
131  * Working Object to store PathId already seen (avoid
132  * database queries), equivalent to %cache_ppathid in perl
133  */
134
135 #define NITEMS 50000
136 class pathid_cache {
137 private:
138    hlink *nodes;
139    int nb_node;
140    int max_node;
141
142    alist *table_node;
143
144    htable *cache_ppathid;
145
146 public:
147    pathid_cache() {
148       hlink link;
149       cache_ppathid = (htable *)malloc(sizeof(htable));
150       cache_ppathid->init(&link, &link, NITEMS);
151       max_node = NITEMS;
152       nodes = (hlink *) malloc(max_node * sizeof (hlink));
153       nb_node = 0;
154       table_node = New(alist(5, owned_by_alist));
155       table_node->append(nodes);
156    }
157
158    hlink *get_hlink() {
159       if (++nb_node >= max_node) {
160          nb_node = 0;
161          nodes = (hlink *)malloc(max_node * sizeof(hlink));
162          table_node->append(nodes);
163       }
164       return nodes + nb_node;
165    }
166
167    bool lookup(char *pathid) {
168       bool ret = cache_ppathid->lookup(pathid) != NULL;
169       return ret;
170    }
171    
172    void insert(char *pathid) {
173       hlink *h = get_hlink();
174       cache_ppathid->insert(pathid, h);
175    }
176
177    ~pathid_cache() {
178       cache_ppathid->destroy();
179       free(cache_ppathid);
180       delete table_node;
181    }
182 private:
183    pathid_cache(const pathid_cache &); /* prohibit pass by value */
184    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
185 } ;
186
187 /* Return the parent_dir with the trailing /  (update the given string)
188  * TODO: see in the rest of bacula if we don't have already this function
189  * dir=/tmp/toto/
190  * dir=/tmp/
191  * dir=/
192  * dir=
193  */
194 char *bvfs_parent_dir(char *path)
195 {
196    char *p = path;
197    int len = strlen(path) - 1;
198
199    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
200       path[len] = '\0';
201    }
202
203    if (len > 0) {
204       p += len;
205       while (p > path && !IsPathSeparator(*p)) {
206          p--;
207       }
208       p[1] = '\0';
209    }
210    return path;
211 }
212
213 /* Return the basename of the with the trailing /
214  * TODO: see in the rest of bacula if we don't have
215  * this function already
216  */
217 char *bvfs_basename_dir(char *path)
218 {
219    char *p = path;
220    int len = strlen(path) - 1;
221
222    if (path[len] == '/') {      /* if directory, skip last / */
223       len -= 1;
224    }
225
226    if (len > 0) {
227       p += len;
228       while (p > path && !IsPathSeparator(*p)) {
229          p--;
230       }
231       if (*p == '/') {
232          p++;                  /* skip first / */
233       }
234    } 
235    return p;
236 }
237
238 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
239                                  pathid_cache &ppathid_cache, 
240                                  char *org_pathid, char *path)
241 {
242    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
243    char pathid[50];
244    ATTR_DBR parent;
245    char *bkp = mdb->path;
246    strncpy(pathid, org_pathid, sizeof(pathid));
247
248    /* Does the ppathid exist for this ? we use a memory cache...  In order to
249     * avoid the full loop, we consider that if a dir is allready in the
250     * PathHierarchy table, then there is no need to calculate all the
251     * hierarchy
252     */
253    while (path && *path)
254    {
255       if (!ppathid_cache.lookup(pathid))
256       {
257          Mmsg(mdb->cmd, 
258               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
259               pathid);
260
261          QUERY_DB(jcr, mdb, mdb->cmd);
262          /* Do we have a result ? */
263          if (sql_num_rows(mdb) > 0) {
264             ppathid_cache.insert(pathid);
265             /* This dir was in the db ...
266              * It means we can leave, the tree has allready been built for
267              * this dir
268              */
269             goto bail_out;
270          } else {
271             /* search or create parent PathId in Path table */
272             mdb->path = bvfs_parent_dir(path);
273             mdb->pnl = strlen(mdb->path);
274             if (!db_create_path_record(jcr, mdb, &parent)) {
275                goto bail_out;
276             }
277             ppathid_cache.insert(pathid);
278             
279             Mmsg(mdb->cmd,
280                  "INSERT INTO PathHierarchy (PathId, PPathId) "
281                  "VALUES (%s,%lld)",
282                  pathid, (uint64_t) parent.PathId);
283             
284             INSERT_DB(jcr, mdb, mdb->cmd);
285
286             edit_uint64(parent.PathId, pathid);
287             path = mdb->path;   /* already done */
288          }
289       } else {
290          /* It's already in the cache.  We can leave, no time to waste here,
291           * all the parent dirs have allready been done
292           */
293          goto bail_out;
294       }
295    }   
296
297 bail_out:
298    mdb->path = bkp;
299    mdb->fnl = 0;
300 }
301
302 /* 
303  * Internal function to update path_hierarchy cache with a shared pathid cache
304  */
305 static void update_path_hierarchy_cache(JCR *jcr,
306                                         B_DB *mdb,
307                                         pathid_cache &ppathid_cache,
308                                         JobId_t JobId)
309 {
310    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
311
312    uint32_t num;
313    char jobid[50];
314    edit_uint64(JobId, jobid);
315  
316    db_lock(mdb);
317    db_start_transaction(jcr, mdb);
318
319    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
320    
321    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
322       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
323       goto bail_out;
324    }
325
326    /* Inserting path records for JobId */
327    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
328                    "SELECT DISTINCT PathId, JobId "
329                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
330                            "UNION "
331                            "SELECT PathId, BaseFiles.JobId "
332                              "FROM BaseFiles JOIN File AS F USING (FileId) "
333                             "WHERE BaseFiles.JobId = %s) AS B",
334         jobid, jobid);
335    QUERY_DB(jcr, mdb, mdb->cmd);
336
337    /* Now we have to do the directory recursion stuff to determine missing
338     * visibility We try to avoid recursion, to be as fast as possible We also
339     * only work on not allready hierarchised directories...
340     */
341    Mmsg(mdb->cmd, 
342      "SELECT PathVisibility.PathId, Path "
343        "FROM PathVisibility "
344             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
345             "LEFT JOIN PathHierarchy "
346          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
347       "WHERE PathVisibility.JobId = %s "
348         "AND PathHierarchy.PathId IS NULL "
349       "ORDER BY Path", jobid);
350    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
351    QUERY_DB(jcr, mdb, mdb->cmd);
352
353    /* TODO: I need to reuse the DB connection without emptying the result 
354     * So, now i'm copying the result in memory to be able to query the
355     * catalog descriptor again.
356     */
357    num = sql_num_rows(mdb);
358    if (num > 0) {
359       char **result = (char **)malloc (num * 2 * sizeof(char *));
360       
361       SQL_ROW row;
362       int i=0;
363       while((row = sql_fetch_row(mdb))) {
364          result[i++] = bstrdup(row[0]);
365          result[i++] = bstrdup(row[1]);
366       }
367       
368       i=0;
369       while (num > 0) {
370          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
371          free(result[i++]);
372          free(result[i++]);
373          num--;
374       }
375       free(result);
376    }
377
378    Mmsg(mdb->cmd, 
379   "INSERT INTO PathVisibility (PathId, JobId)  "
380    "SELECT a.PathId,%s "
381    "FROM ( "
382      "SELECT DISTINCT h.PPathId AS PathId "
383        "FROM PathHierarchy AS h "
384        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
385       "WHERE p.JobId=%s) AS a LEFT JOIN "
386        "(SELECT PathId "
387           "FROM PathVisibility "
388          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
389    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
390
391    do {
392       QUERY_DB(jcr, mdb, mdb->cmd);
393    } while (sql_affected_rows(mdb) > 0);
394    
395    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
396    UPDATE_DB(jcr, mdb, mdb->cmd);
397
398 bail_out:
399    db_end_transaction(jcr, mdb);
400    db_unlock(mdb);
401 }
402
403 /* 
404  * Find an store the filename descriptor for empty directories Filename.Name=''
405  */
406 DBId_t Bvfs::get_dir_filenameid()
407 {
408    uint32_t id;
409    if (dir_filenameid) {
410       return dir_filenameid;
411    }
412    POOL_MEM q;
413    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
414    db_sql_query(db, q.c_str(), db_int_handler, &id);
415    dir_filenameid = id;
416    return dir_filenameid;
417 }
418
419 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
420 {
421    uint32_t nb=0;
422    db_list_ctx jobids_list;
423
424    db_lock(mdb);
425    db_start_transaction(jcr, mdb);
426
427 #ifdef xxx
428    /* TODO: Remove this code when updating make_bacula_table script */
429    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
430    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
431       Dmsg0(dbglevel, "Creating cache table\n");
432       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
433       QUERY_DB(jcr, mdb, mdb->cmd);
434
435       Mmsg(mdb->cmd,
436            "CREATE TABLE PathHierarchy ( "
437            "PathId integer NOT NULL, "
438            "PPathId integer NOT NULL, "
439            "CONSTRAINT pathhierarchy_pkey "
440            "PRIMARY KEY (PathId))");
441       QUERY_DB(jcr, mdb, mdb->cmd); 
442
443       Mmsg(mdb->cmd,
444            "CREATE INDEX pathhierarchy_ppathid "
445            "ON PathHierarchy (PPathId)");
446       QUERY_DB(jcr, mdb, mdb->cmd);
447
448       Mmsg(mdb->cmd, 
449            "CREATE TABLE PathVisibility ("
450            "PathId integer NOT NULL, "
451            "JobId integer NOT NULL, "
452            "Size int8 DEFAULT 0, "
453            "Files int4 DEFAULT 0, "
454            "CONSTRAINT pathvisibility_pkey "
455            "PRIMARY KEY (JobId, PathId))");
456       QUERY_DB(jcr, mdb, mdb->cmd);
457
458       Mmsg(mdb->cmd, 
459            "CREATE INDEX pathvisibility_jobid "
460            "ON PathVisibility (JobId)");
461       QUERY_DB(jcr, mdb, mdb->cmd);
462
463    }
464 #endif
465
466    Mmsg(mdb->cmd, 
467  "SELECT JobId from Job "
468   "WHERE HasCache = 0 "
469     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
470   "ORDER BY JobId");
471
472    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
473
474    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
475
476    db_end_transaction(jcr, mdb);
477    db_start_transaction(jcr, mdb);
478    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
479    Mmsg(mdb->cmd, 
480         "DELETE FROM PathVisibility "
481          "WHERE NOT EXISTS "
482         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
483    nb = DELETE_DB(jcr, mdb, mdb->cmd);
484    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
485
486    db_end_transaction(jcr, mdb);
487    db_unlock(mdb);
488 }
489
490 /*
491  * Update the bvfs cache for given jobids (1,2,3,4)
492  */
493 void
494 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
495 {
496    pathid_cache ppathid_cache;
497    JobId_t JobId;
498    char *p;
499
500    for (p=jobids; ; ) {
501       int stat = get_next_jobid_from_list(&p, &JobId);
502       if (stat < 0) {
503          return;
504       }
505       if (stat == 0) {
506          break;
507       }
508       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
509       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
510    }
511 }
512
513 /* 
514  * Update the bvfs cache for current jobids
515  */
516 void Bvfs::update_cache()
517 {
518    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
519 }
520
521 /* Change the current directory, returns true if the path exists */
522 bool Bvfs::ch_dir(const char *path)
523 {
524    pm_strcpy(db->path, path);
525    db->pnl = strlen(db->path);
526    ch_dir(db_get_path_record(jcr, db)); 
527    return pwd_id != 0;
528 }
529
530 /* 
531  * Get all file versions for a specified client
532  * TODO: Handle basejobs using different client
533  */
534 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
535 {
536    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
537          (uint64_t)fnid, client);
538    char ed1[50], ed2[50];
539    POOL_MEM q;
540    if (see_copies) {
541       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
542    } else {
543       Mmsg(q, " AND Job.Type = 'B' ");
544    }
545
546    POOL_MEM query;
547
548    Mmsg(query,//    1           2              3       
549 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
550 //         4          5           6
551         "File.JobId, File.LStat, File.FileId, "
552 //         7                    8
553        "Media.VolumeName, Media.InChanger "
554 "FROM File, Job, Client, JobMedia, Media "
555 "WHERE File.FilenameId = %s "
556   "AND File.PathId=%s "
557   "AND File.JobId = Job.JobId "
558   "AND Job.JobId = JobMedia.JobId "
559   "AND File.FileIndex >= JobMedia.FirstIndex "
560   "AND File.FileIndex <= JobMedia.LastIndex "
561   "AND JobMedia.MediaId = Media.MediaId "
562   "AND Job.ClientId = Client.ClientId "
563   "AND Client.Name = '%s' "
564   "%s ORDER BY FileId LIMIT %d OFFSET %d"
565         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
566         limit, offset);
567    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
568    db_sql_query(db, query.c_str(), list_entries, user_data);
569 }
570
571 DBId_t Bvfs::get_root()
572 {
573    *db->path = 0;
574    return db_get_path_record(jcr, db);
575 }
576
577 static int path_handler(void *ctx, int fields, char **row)
578 {
579    Bvfs *fs = (Bvfs *) ctx;
580    return fs->_handle_path(ctx, fields, row);
581 }
582
583 int Bvfs::_handle_path(void *ctx, int fields, char **row)
584 {
585    if (bvfs_is_dir(row)) {
586       /* can have the same path 2 times */
587       if (strcmp(row[BVFS_Name], prev_dir)) {
588          pm_strcpy(prev_dir, row[BVFS_Name]);
589          return list_entries(user_data, fields, row);
590       }
591    }
592    return 0;
593 }
594
595 /* 
596  * Retrieve . and .. information
597  */
598 void Bvfs::ls_special_dirs()
599 {
600    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
601    char ed1[50], ed2[50];
602    if (*jobids == 0) {
603       return;
604    }
605    if (!dir_filenameid) {
606       get_dir_filenameid();
607    }
608
609    /* Will fetch directories  */
610    *prev_dir = 0;
611
612    POOL_MEM query;
613    Mmsg(query, 
614 "((SELECT PPathId AS PathId, '..' AS Path "
615     "FROM  PathHierarchy "
616    "WHERE  PathId = %s) "
617 "UNION "
618  "(SELECT %s AS PathId, '.' AS Path))",
619         edit_uint64(pwd_id, ed1), ed1);
620
621    POOL_MEM query2;
622    Mmsg(query2,// 1      2     3        4     5       6
623 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
624   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
625        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
626               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
627        "WHERE File1.FilenameId = %s "
628        "AND File1.JobId IN (%s)) AS listfile1 "
629   "ON (tmp.PathId = listfile1.PathId) "
630   "ORDER BY tmp.Path, JobId DESC ",
631         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
632
633    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
634    db_sql_query(db, query2.c_str(), path_handler, this);
635 }
636
637 /* Returns true if we have dirs to read */
638 bool Bvfs::ls_dirs()
639 {
640    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
641    char ed1[50], ed2[50];
642    if (*jobids == 0) {
643       return false;
644    }
645
646    POOL_MEM filter;
647    if (*pattern) {
648       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
649    }
650
651    if (!dir_filenameid) {
652       get_dir_filenameid();
653    }
654
655    /* the sql query displays same directory multiple time, take the first one */
656    *prev_dir = 0;
657
658    /* Let's retrieve the list of the visible dirs in this dir ...
659     * First, I need the empty filenameid to locate efficiently
660     * the dirs in the file table
661     * my $dir_filenameid = $self->get_dir_filenameid();
662     */
663    /* Then we get all the dir entries from File ... */
664    POOL_MEM query;
665    Mmsg(query,
666 //       0     1     2   3      4     5       6
667 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
668     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
669            "lower(Path1.Path) AS lpath, "
670            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
671            "listfile1.FileId AS FileId "
672     "FROM ( "
673       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
674       "FROM PathHierarchy AS PathHierarchy1 "
675       "JOIN Path AS Path2 "
676         "ON (PathHierarchy1.PathId = Path2.PathId) "
677       "JOIN PathVisibility AS PathVisibility1 "
678         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
679       "WHERE PathHierarchy1.PPathId = %s "
680       "AND PathVisibility1.jobid IN (%s) "
681            "%s "
682      ") AS listpath1 "
683    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
684
685    "LEFT JOIN ( " /* get attributes if any */
686        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
687               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
688        "WHERE File1.FilenameId = %s "
689        "AND File1.JobId IN (%s)) AS listfile1 "
690        "ON (listpath1.PathId = listfile1.PathId) "
691     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
692         edit_uint64(pwd_id, ed1),
693         jobids,
694         filter.c_str(),
695         edit_uint64(dir_filenameid, ed2),
696         jobids,
697         limit, offset);
698
699    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
700
701    db_lock(db);
702    db_sql_query(db, query.c_str(), path_handler, this);
703    nb_record = db->num_rows;
704    db_unlock(db);
705
706    return nb_record == limit;
707 }
708
709 /* Returns true if we have files to read */
710 bool Bvfs::ls_files()
711 {
712    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
713    char ed1[50];
714    if (*jobids == 0) {
715       return false;
716    }
717
718    if (!pwd_id) {
719       ch_dir(get_root());
720    }
721
722    POOL_MEM filter;
723    if (*pattern) {
724       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
725    }
726    /* TODO: Use JobTDate instead of FileId to determine the latest version */
727
728 /* Postgresql
729  SELECT DISTINCT ON (FilenameId) 'F', PathId, T.FilenameId, 
730   Filename.Name, JobId, LStat, FileId
731    FROM 
732        (SELECT FileId, JobId, PathId, FilenameId, FileIndex, LStat, MD5 
733           FROM File WHERE JobId IN (7) AND PathId = 9
734          UNION ALL 
735         SELECT File.FileId, File.JobId, PathId, FilenameId, 
736                File.FileIndex, LStat, MD5 
737           FROM BaseFiles JOIN File USING (FileId) 
738          WHERE BaseFiles.JobId IN (7) AND File.PathId = 9
739         ) AS T JOIN Job USING (JobId) JOIN Filename USING (FilenameId)
740    ORDER BY FilenameId, StartTime DESC
741
742 Mysql
743 SELECT FileId, Job.JobId AS JobId, FileIndex, File.PathId AS PathId, 
744        File.FilenameId AS FilenameId, Filename.Name, LStat, MD5 
745 FROM Job, File, ( 
746     SELECT MAX(JobTDate) AS JobTDate, PathId, FilenameId 
747       FROM ( 
748         SELECT JobTDate, PathId, FilenameId
749           FROM File JOIN Job USING (JobId)
750          WHERE File.JobId IN (7) AND PathId = 9
751           UNION ALL 
752         SELECT JobTDate, PathId, FilenameId 
753           FROM BaseFiles                 
754                JOIN File USING (FileId) 
755                JOIN Job  ON    (BaseJobId = Job.JobId) 
756          WHERE BaseFiles.JobId IN (7)   AND PathId = 9
757        ) AS tmp GROUP BY PathId, FilenameId
758     ) AS T1 JOIN Filename USING (FilenameId)
759 WHERE (Job.JobId IN (  
760         SELECT DISTINCT BaseJobId FROM BaseFiles WHERE JobId IN (7)) 
761         OR Job.JobId IN (7)) 
762   AND T1.JobTDate = Job.JobTDate
763   AND Job.JobId = File.JobId
764   AND T1.PathId = File.PathId 
765   AND T1.FilenameId = File.FilenameId
766
767
768
769 */
770
771    POOL_MEM query;
772    Mmsg(query, //    1              2             3          4
773 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
774         "File.LStat, listfiles.id "
775 "FROM File, ( "
776        "SELECT Filename.Name as Name, max(File.FileId) as id "
777          "FROM File, Filename "
778         "WHERE File.FilenameId = Filename.FilenameId "
779           "AND Filename.Name != '' "
780           "AND File.PathId = %s "
781           "AND File.JobId IN (%s) "
782           "%s "
783         "GROUP BY Filename.Name "
784         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
785      ") AS listfiles "
786 "WHERE File.FileId = listfiles.id",
787         edit_uint64(pwd_id, ed1),
788         jobids,
789         filter.c_str(),
790         limit,
791         offset);
792    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
793
794    db_lock(db);
795    db_sql_query(db, query.c_str(), list_entries, user_data);
796    nb_record = db->num_rows;
797    db_unlock(db);
798
799    return nb_record == limit;
800 }
801
802
803 /* 
804  * Return next Id from comma separated list   
805  *
806  * Returns:
807  *   1 if next Id returned
808  *   0 if no more Ids are in list
809  *  -1 there is an error
810  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
811  */
812 static int get_next_id_from_list(char **p, int64_t *Id)
813 {
814    const int maxlen = 30;
815    char id[maxlen+1];
816    char *q = *p;
817
818    id[0] = 0;
819    for (int i=0; i<maxlen; i++) {
820       if (*q == 0) {
821          break;
822       } else if (*q == ',') {
823          q++;
824          break;
825       }
826       id[i] = *q++;
827       id[i+1] = 0;
828    }
829    if (id[0] == 0) {
830       return 0;
831    } else if (!is_a_number(id)) {
832       return -1;                      /* error */
833    }
834    *p = q;
835    *Id = str_to_int64(id);
836    return 1;
837 }
838
839 static int get_path_handler(void *ctx, int fields, char **row)
840 {
841    POOL_MEM *buf = (POOL_MEM *) ctx;
842    pm_strcpy(*buf, row[0]);
843    return 0;
844 }
845
846 static bool check_temp(char *output_table)
847 {
848    if (output_table[0] == 'b' &&
849        output_table[1] == '2' &&
850        is_an_integer(output_table + 2))
851    {
852       return true;
853    }
854    return false;
855 }
856
857 bool Bvfs::drop_restore_list(char *output_table)
858 {
859    POOL_MEM query;
860    if (check_temp(output_table)) {
861       Mmsg(query, "DROP TABLE %s", output_table);
862       db_sql_query(db, query.c_str(), NULL, NULL);
863       return true;
864    }
865    return false;
866 }
867
868 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
869                                 char *output_table)
870 {
871    POOL_MEM query;
872    POOL_MEM tmp, tmp2;
873    int64_t id, jobid;
874    bool init=false;
875    bool ret=false;
876    /* check args */
877    if ((*fileid   && !is_a_number_list(fileid))  ||
878        (*dirid    && !is_a_number_list(dirid))   ||
879        (*hardlink && !is_a_number_list(hardlink))||
880        (!*hardlink && !*fileid && !*dirid && !*hardlink))
881    {
882       return false;
883    }
884    if (!check_temp(output_table)) {
885       return false;
886    }
887
888    Mmsg(query, "CREATE TEMPORARY TABLE btemp%s AS ", output_table);
889
890    if (*fileid) {
891       init=true;
892       Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
893                    "FROM File WHERE FileId IN (%s))", fileid);
894       pm_strcat(query, tmp.c_str());
895    }
896
897    while (get_next_id_from_list(&dirid, &id) == 1) {
898       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
899       
900       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
901          Dmsg0(dbglevel, "Can't search for path\n");
902          /* print error */
903          return false;
904       }
905       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
906          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
907                id, tmp.c_str(), tmp2.c_str());
908          break;
909       }
910       /* escape % and _ for LIKE search */
911       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
912       char *p = tmp.c_str();
913       for (char *s = tmp2.c_str(); *s ; s++) {
914          if (*s == '%' || *s == '_' || *s == '\\') {
915             *p = '\\'; 
916             p++;
917          }
918          *p = *s; 
919          p++;
920       }
921       *p = '\0';
922       tmp.strcat("%");
923
924       size_t len = strlen(tmp.c_str());
925       tmp2.check_size((len+1) * 2);
926       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
927
928       if (init) {
929          query.strcat(" UNION ");
930       }
931       Mmsg(tmp, "(SELECT File.JobId, File.FileIndex, File.FilenameId, "
932                         "File.PathId, FileId "
933                    "FROM Path JOIN File USING (PathId) "
934                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s)) ", 
935            tmp2.c_str(), jobids); 
936       query.strcat(tmp.c_str());
937       init = true;
938    }
939
940    /* expect jobid,fileindex */
941    int64_t prev_jobid=0;
942    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
943       if (get_next_id_from_list(&hardlink, &id) != 1) {
944          Dmsg0(dbglevel, "hardlink should be two by two\n");
945          return false;
946       }
947       if (jobid != prev_jobid) { /* new job */
948          if (prev_jobid == 0) {  /* first jobid */
949             if (init) {
950                query.strcat(" UNION ");
951             }
952          } else {               /* end last job, start new one */
953             tmp.strcat(")) UNION ");
954             query.strcat(tmp.c_str());
955          }
956          Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
957                        "FROM File WHERE JobId = %lld " 
958                         "AND FileIndex IN (%lld", jobid, id);
959          prev_jobid = jobid;
960
961       } else {                  /* same job, add new findex */
962          Mmsg(tmp2, ", %lld", id);
963          tmp.strcat(tmp2.c_str());
964       }
965    }
966
967    if (prev_jobid != 0) {       /* end last job */
968       tmp.strcat(")) ");
969       query.strcat(tmp.c_str());
970       init = true;
971    }
972
973    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
974
975    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
976       Dmsg0(dbglevel, "Can't execute q\n");
977       goto bail_out;
978    }
979
980    /* TODO: handle basejob and SQLite3 */
981    Mmsg(query, sql_bvfs_select[db_type], output_table, output_table);
982
983    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
984    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
985       Dmsg0(dbglevel, "Can't execute q\n");
986       goto bail_out;
987    }
988    ret = true;
989
990 bail_out:
991    Mmsg(query, "DROP TABLE btemp%s", output_table);
992    db_sql_query(db, query.c_str(), NULL, NULL);
993    return ret;
994 }