]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
bvfs: Handle windows drive when building path hierarchy
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #include "bacula.h"
30
31 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
32
33 #include "cats.h"
34 #include "bdb_priv.h"
35 #include "sql_glue.h"
36 #include "lib/htable.h"
37 #include "bvfs.h"
38
39 #define dbglevel 10
40 #define dbglevel_sql 15
41
42 static int result_handler(void *ctx, int fields, char **row)
43 {
44    if (fields == 4) {
45       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3]);
47    } else if (fields == 5) {
48       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4]);
50    } else if (fields == 6) {
51       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5]);
53    } else if (fields == 7) {
54       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
55             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
56    }
57    return 0;
58 }
59
60 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
61    jcr = j;
62    jcr->inc_use_count();
63    db = mdb;                 /* need to inc ref count */
64    jobids = get_pool_memory(PM_NAME);
65    prev_dir = get_pool_memory(PM_NAME);
66    pattern = get_pool_memory(PM_NAME);
67    *jobids = *prev_dir = *pattern = 0;
68    dir_filenameid = pwd_id = offset = 0;
69    see_copies = see_all_versions = false;
70    limit = 1000;
71    attr = new_attr(jcr);
72    list_entries = result_handler;
73    user_data = this;
74    username = NULL;
75 }
76
77 Bvfs::~Bvfs() {
78    free_pool_memory(jobids);
79    free_pool_memory(pattern);
80    free_pool_memory(prev_dir);
81    if (username) {
82       free(username);
83    }
84    free_attr(attr);
85    jcr->dec_use_count();
86 }
87
88 void Bvfs::filter_jobid()
89 {
90    if (!username) {
91       return;
92    }
93
94    /* Query used by Bweb to filter clients, activated when using
95     * set_username() 
96     */
97    POOL_MEM query;
98    Mmsg(query,
99       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
100         "JOIN (SELECT ClientId FROM client_group_member "
101         "JOIN client_group USING (client_group_id) "
102         "JOIN bweb_client_group_acl USING (client_group_id) "
103         "JOIN bweb_user USING (userid) "
104        "WHERE bweb_user.username = '%s' "
105       ") AS filter USING (ClientId) "
106         " WHERE JobId IN (%s)", 
107         username, jobids);
108
109    db_list_ctx ctx;
110    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
111    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
112    pm_strcpy(jobids, ctx.list);
113 }
114
115 void Bvfs::set_jobid(JobId_t id)
116 {
117    Mmsg(jobids, "%lld", (uint64_t)id);
118    filter_jobid();
119 }
120
121 void Bvfs::set_jobids(char *ids)
122 {
123    pm_strcpy(jobids, ids);
124    filter_jobid();
125 }
126
127 /* 
128  * TODO: Find a way to let the user choose how he wants to display
129  * files and directories
130  */
131
132
133 /* 
134  * Working Object to store PathId already seen (avoid
135  * database queries), equivalent to %cache_ppathid in perl
136  */
137
138 #define NITEMS 50000
139 class pathid_cache {
140 private:
141    hlink *nodes;
142    int nb_node;
143    int max_node;
144
145    alist *table_node;
146
147    htable *cache_ppathid;
148
149 public:
150    pathid_cache() {
151       hlink link;
152       cache_ppathid = (htable *)malloc(sizeof(htable));
153       cache_ppathid->init(&link, &link, NITEMS);
154       max_node = NITEMS;
155       nodes = (hlink *) malloc(max_node * sizeof (hlink));
156       nb_node = 0;
157       table_node = New(alist(5, owned_by_alist));
158       table_node->append(nodes);
159    }
160
161    hlink *get_hlink() {
162       if (++nb_node >= max_node) {
163          nb_node = 0;
164          nodes = (hlink *)malloc(max_node * sizeof(hlink));
165          table_node->append(nodes);
166       }
167       return nodes + nb_node;
168    }
169
170    bool lookup(char *pathid) {
171       bool ret = cache_ppathid->lookup(pathid) != NULL;
172       return ret;
173    }
174    
175    void insert(char *pathid) {
176       hlink *h = get_hlink();
177       cache_ppathid->insert(pathid, h);
178    }
179
180    ~pathid_cache() {
181       cache_ppathid->destroy();
182       free(cache_ppathid);
183       delete table_node;
184    }
185 private:
186    pathid_cache(const pathid_cache &); /* prohibit pass by value */
187    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
188 } ;
189
190 /* Return the parent_dir with the trailing /  (update the given string)
191  * TODO: see in the rest of bacula if we don't have already this function
192  * dir=/tmp/toto/
193  * dir=/tmp/
194  * dir=/
195  * dir=
196  */
197 char *bvfs_parent_dir(char *path)
198 {
199    char *p = path;
200    int len = strlen(path) - 1;
201
202    /* windows directory / */
203    if (len == 2 && B_ISALPHA(path[0]) 
204                 && path[1] == ':' 
205                 && path[2] == '/')
206    {
207       len = 0;
208       path[0] = '\0';
209    }
210
211    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
212       path[len] = '\0';
213    }
214
215    if (len > 0) {
216       p += len;
217       while (p > path && !IsPathSeparator(*p)) {
218          p--;
219       }
220       p[1] = '\0';
221    }
222    return path;
223 }
224
225 /* Return the basename of the with the trailing /
226  * TODO: see in the rest of bacula if we don't have
227  * this function already
228  */
229 char *bvfs_basename_dir(char *path)
230 {
231    char *p = path;
232    int len = strlen(path) - 1;
233
234    if (path[len] == '/') {      /* if directory, skip last / */
235       len -= 1;
236    }
237
238    if (len > 0) {
239       p += len;
240       while (p > path && !IsPathSeparator(*p)) {
241          p--;
242       }
243       if (*p == '/') {
244          p++;                  /* skip first / */
245       }
246    } 
247    return p;
248 }
249
250 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
251                                  pathid_cache &ppathid_cache, 
252                                  char *org_pathid, char *path)
253 {
254    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
255    char pathid[50];
256    ATTR_DBR parent;
257    char *bkp = mdb->path;
258    strncpy(pathid, org_pathid, sizeof(pathid));
259
260    /* Does the ppathid exist for this ? we use a memory cache...  In order to
261     * avoid the full loop, we consider that if a dir is allready in the
262     * PathHierarchy table, then there is no need to calculate all the
263     * hierarchy
264     */
265    while (path && *path)
266    {
267       if (!ppathid_cache.lookup(pathid))
268       {
269          Mmsg(mdb->cmd, 
270               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
271               pathid);
272
273          QUERY_DB(jcr, mdb, mdb->cmd);
274          /* Do we have a result ? */
275          if (sql_num_rows(mdb) > 0) {
276             ppathid_cache.insert(pathid);
277             /* This dir was in the db ...
278              * It means we can leave, the tree has allready been built for
279              * this dir
280              */
281             goto bail_out;
282          } else {
283             /* search or create parent PathId in Path table */
284             mdb->path = bvfs_parent_dir(path);
285             mdb->pnl = strlen(mdb->path);
286             if (!db_create_path_record(jcr, mdb, &parent)) {
287                goto bail_out;
288             }
289             ppathid_cache.insert(pathid);
290             
291             Mmsg(mdb->cmd,
292                  "INSERT INTO PathHierarchy (PathId, PPathId) "
293                  "VALUES (%s,%lld)",
294                  pathid, (uint64_t) parent.PathId);
295             
296             INSERT_DB(jcr, mdb, mdb->cmd);
297
298             edit_uint64(parent.PathId, pathid);
299             path = mdb->path;   /* already done */
300          }
301       } else {
302          /* It's already in the cache.  We can leave, no time to waste here,
303           * all the parent dirs have allready been done
304           */
305          goto bail_out;
306       }
307    }   
308
309 bail_out:
310    mdb->path = bkp;
311    mdb->fnl = 0;
312 }
313
314 /* 
315  * Internal function to update path_hierarchy cache with a shared pathid cache
316  */
317 static void update_path_hierarchy_cache(JCR *jcr,
318                                         B_DB *mdb,
319                                         pathid_cache &ppathid_cache,
320                                         JobId_t JobId)
321 {
322    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
323
324    uint32_t num;
325    char jobid[50];
326    edit_uint64(JobId, jobid);
327  
328    db_lock(mdb);
329    db_start_transaction(jcr, mdb);
330
331    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
332    
333    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
334       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
335       goto bail_out;
336    }
337
338    /* Inserting path records for JobId */
339    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
340                    "SELECT DISTINCT PathId, JobId "
341                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
342                            "UNION "
343                            "SELECT PathId, BaseFiles.JobId "
344                              "FROM BaseFiles JOIN File AS F USING (FileId) "
345                             "WHERE BaseFiles.JobId = %s) AS B",
346         jobid, jobid);
347    QUERY_DB(jcr, mdb, mdb->cmd);
348
349    /* Now we have to do the directory recursion stuff to determine missing
350     * visibility We try to avoid recursion, to be as fast as possible We also
351     * only work on not allready hierarchised directories...
352     */
353    Mmsg(mdb->cmd, 
354      "SELECT PathVisibility.PathId, Path "
355        "FROM PathVisibility "
356             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
357             "LEFT JOIN PathHierarchy "
358          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
359       "WHERE PathVisibility.JobId = %s "
360         "AND PathHierarchy.PathId IS NULL "
361       "ORDER BY Path", jobid);
362    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
363    QUERY_DB(jcr, mdb, mdb->cmd);
364
365    /* TODO: I need to reuse the DB connection without emptying the result 
366     * So, now i'm copying the result in memory to be able to query the
367     * catalog descriptor again.
368     */
369    num = sql_num_rows(mdb);
370    if (num > 0) {
371       char **result = (char **)malloc (num * 2 * sizeof(char *));
372       
373       SQL_ROW row;
374       int i=0;
375       while((row = sql_fetch_row(mdb))) {
376          result[i++] = bstrdup(row[0]);
377          result[i++] = bstrdup(row[1]);
378       }
379       
380       i=0;
381       while (num > 0) {
382          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
383          free(result[i++]);
384          free(result[i++]);
385          num--;
386       }
387       free(result);
388    }
389
390    if (mdb->db_get_type_index() == SQL_TYPE_SQLITE3) {
391       Mmsg(mdb->cmd, 
392  "INSERT INTO PathVisibility (PathId, JobId) "
393    "SELECT DISTINCT h.PPathId AS PathId, %s "
394      "FROM PathHierarchy AS h "
395     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
396       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
397            jobid, jobid, jobid );
398
399    } else {
400       Mmsg(mdb->cmd, 
401   "INSERT INTO PathVisibility (PathId, JobId)  "
402    "SELECT a.PathId,%s "
403    "FROM ( "
404      "SELECT DISTINCT h.PPathId AS PathId "
405        "FROM PathHierarchy AS h "
406        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
407       "WHERE p.JobId=%s) AS a LEFT JOIN "
408        "(SELECT PathId "
409           "FROM PathVisibility "
410          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
411    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
412    }
413
414    do {
415       QUERY_DB(jcr, mdb, mdb->cmd);
416    } while (sql_affected_rows(mdb) > 0);
417    
418    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
419    UPDATE_DB(jcr, mdb, mdb->cmd);
420
421 bail_out:
422    db_end_transaction(jcr, mdb);
423    db_unlock(mdb);
424 }
425
426 /* 
427  * Find an store the filename descriptor for empty directories Filename.Name=''
428  */
429 DBId_t Bvfs::get_dir_filenameid()
430 {
431    uint32_t id;
432    if (dir_filenameid) {
433       return dir_filenameid;
434    }
435    POOL_MEM q;
436    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
437    db_sql_query(db, q.c_str(), db_int_handler, &id);
438    dir_filenameid = id;
439    return dir_filenameid;
440 }
441
442 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
443 {
444    uint32_t nb=0;
445    db_list_ctx jobids_list;
446
447    db_lock(mdb);
448    db_start_transaction(jcr, mdb);
449
450 #ifdef xxx
451    /* TODO: Remove this code when updating make_bacula_table script */
452    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
453    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
454       Dmsg0(dbglevel, "Creating cache table\n");
455       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
456       QUERY_DB(jcr, mdb, mdb->cmd);
457
458       Mmsg(mdb->cmd,
459            "CREATE TABLE PathHierarchy ( "
460            "PathId integer NOT NULL, "
461            "PPathId integer NOT NULL, "
462            "CONSTRAINT pathhierarchy_pkey "
463            "PRIMARY KEY (PathId))");
464       QUERY_DB(jcr, mdb, mdb->cmd); 
465
466       Mmsg(mdb->cmd,
467            "CREATE INDEX pathhierarchy_ppathid "
468            "ON PathHierarchy (PPathId)");
469       QUERY_DB(jcr, mdb, mdb->cmd);
470
471       Mmsg(mdb->cmd, 
472            "CREATE TABLE PathVisibility ("
473            "PathId integer NOT NULL, "
474            "JobId integer NOT NULL, "
475            "Size int8 DEFAULT 0, "
476            "Files int4 DEFAULT 0, "
477            "CONSTRAINT pathvisibility_pkey "
478            "PRIMARY KEY (JobId, PathId))");
479       QUERY_DB(jcr, mdb, mdb->cmd);
480
481       Mmsg(mdb->cmd, 
482            "CREATE INDEX pathvisibility_jobid "
483            "ON PathVisibility (JobId)");
484       QUERY_DB(jcr, mdb, mdb->cmd);
485
486    }
487 #endif
488
489    Mmsg(mdb->cmd, 
490  "SELECT JobId from Job "
491   "WHERE HasCache = 0 "
492     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
493   "ORDER BY JobId");
494
495    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
496
497    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
498
499    db_end_transaction(jcr, mdb);
500    db_start_transaction(jcr, mdb);
501    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
502    Mmsg(mdb->cmd, 
503         "DELETE FROM PathVisibility "
504          "WHERE NOT EXISTS "
505         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
506    nb = DELETE_DB(jcr, mdb, mdb->cmd);
507    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
508
509    db_end_transaction(jcr, mdb);
510    db_unlock(mdb);
511 }
512
513 /*
514  * Update the bvfs cache for given jobids (1,2,3,4)
515  */
516 void
517 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
518 {
519    pathid_cache ppathid_cache;
520    JobId_t JobId;
521    char *p;
522
523    for (p=jobids; ; ) {
524       int stat = get_next_jobid_from_list(&p, &JobId);
525       if (stat < 0) {
526          return;
527       }
528       if (stat == 0) {
529          break;
530       }
531       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
532       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
533    }
534 }
535
536 /* 
537  * Update the bvfs cache for current jobids
538  */
539 void Bvfs::update_cache()
540 {
541    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
542 }
543
544 /* Change the current directory, returns true if the path exists */
545 bool Bvfs::ch_dir(const char *path)
546 {
547    pm_strcpy(db->path, path);
548    db->pnl = strlen(db->path);
549    ch_dir(db_get_path_record(jcr, db)); 
550    return pwd_id != 0;
551 }
552
553 /* 
554  * Get all file versions for a specified client
555  * TODO: Handle basejobs using different client
556  */
557 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
558 {
559    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
560          (uint64_t)fnid, client);
561    char ed1[50], ed2[50];
562    POOL_MEM q;
563    if (see_copies) {
564       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
565    } else {
566       Mmsg(q, " AND Job.Type = 'B' ");
567    }
568
569    POOL_MEM query;
570
571    Mmsg(query,//    1           2              3       
572 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
573 //         4          5           6
574         "File.JobId, File.LStat, File.FileId, "
575 //         7                    8
576        "Media.VolumeName, Media.InChanger "
577 "FROM File, Job, Client, JobMedia, Media "
578 "WHERE File.FilenameId = %s "
579   "AND File.PathId=%s "
580   "AND File.JobId = Job.JobId "
581   "AND Job.JobId = JobMedia.JobId "
582   "AND File.FileIndex >= JobMedia.FirstIndex "
583   "AND File.FileIndex <= JobMedia.LastIndex "
584   "AND JobMedia.MediaId = Media.MediaId "
585   "AND Job.ClientId = Client.ClientId "
586   "AND Client.Name = '%s' "
587   "%s ORDER BY FileId LIMIT %d OFFSET %d"
588         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
589         limit, offset);
590    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
591    db_sql_query(db, query.c_str(), list_entries, user_data);
592 }
593
594 DBId_t Bvfs::get_root()
595 {
596    *db->path = 0;
597    return db_get_path_record(jcr, db);
598 }
599
600 static int path_handler(void *ctx, int fields, char **row)
601 {
602    Bvfs *fs = (Bvfs *) ctx;
603    return fs->_handle_path(ctx, fields, row);
604 }
605
606 int Bvfs::_handle_path(void *ctx, int fields, char **row)
607 {
608    if (bvfs_is_dir(row)) {
609       /* can have the same path 2 times */
610       if (strcmp(row[BVFS_Name], prev_dir)) {
611          pm_strcpy(prev_dir, row[BVFS_Name]);
612          return list_entries(user_data, fields, row);
613       }
614    }
615    return 0;
616 }
617
618 /* 
619  * Retrieve . and .. information
620  */
621 void Bvfs::ls_special_dirs()
622 {
623    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
624    char ed1[50], ed2[50];
625    if (*jobids == 0) {
626       return;
627    }
628    if (!dir_filenameid) {
629       get_dir_filenameid();
630    }
631
632    /* Will fetch directories  */
633    *prev_dir = 0;
634
635    POOL_MEM query;
636    Mmsg(query, 
637 "(SELECT PPathId AS PathId, '..' AS Path "
638     "FROM  PathHierarchy "
639    "WHERE  PathId = %s "
640 "UNION "
641  "SELECT %s AS PathId, '.' AS Path)",
642         edit_uint64(pwd_id, ed1), ed1);
643
644    POOL_MEM query2;
645    Mmsg(query2,// 1      2     3        4     5       6
646 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
647   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
648        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
649               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
650        "WHERE File1.FilenameId = %s "
651        "AND File1.JobId IN (%s)) AS listfile1 "
652   "ON (tmp.PathId = listfile1.PathId) "
653   "ORDER BY tmp.Path, JobId DESC ",
654         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
655
656    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
657    db_sql_query(db, query2.c_str(), path_handler, this);
658 }
659
660 /* Returns true if we have dirs to read */
661 bool Bvfs::ls_dirs()
662 {
663    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
664    char ed1[50], ed2[50];
665    if (*jobids == 0) {
666       return false;
667    }
668
669    POOL_MEM query;
670    POOL_MEM filter;
671    if (*pattern) {
672       Mmsg(filter, " AND Path2.Path %s '%s' ", 
673            match_query[db_get_type_index(db)], pattern);
674    }
675
676    if (!dir_filenameid) {
677       get_dir_filenameid();
678    }
679
680    /* the sql query displays same directory multiple time, take the first one */
681    *prev_dir = 0;
682
683    /* Let's retrieve the list of the visible dirs in this dir ...
684     * First, I need the empty filenameid to locate efficiently
685     * the dirs in the file table
686     * my $dir_filenameid = $self->get_dir_filenameid();
687     */
688    /* Then we get all the dir entries from File ... */
689    Mmsg(query,
690 //       0     1     2   3      4     5       6
691 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
692     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
693            "lower(Path1.Path) AS lpath, "
694            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
695            "listfile1.FileId AS FileId "
696     "FROM ( "
697       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
698       "FROM PathHierarchy AS PathHierarchy1 "
699       "JOIN Path AS Path2 "
700         "ON (PathHierarchy1.PathId = Path2.PathId) "
701       "JOIN PathVisibility AS PathVisibility1 "
702         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
703       "WHERE PathHierarchy1.PPathId = %s "
704       "AND PathVisibility1.JobId IN (%s) "
705            "%s "
706      ") AS listpath1 "
707    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
708
709    "LEFT JOIN ( " /* get attributes if any */
710        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
711               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
712        "WHERE File1.FilenameId = %s "
713        "AND File1.JobId IN (%s)) AS listfile1 "
714        "ON (listpath1.PathId = listfile1.PathId) "
715     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
716         edit_uint64(pwd_id, ed1),
717         jobids,
718         filter.c_str(),
719         edit_uint64(dir_filenameid, ed2),
720         jobids,
721         limit, offset);
722
723    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
724
725    db_lock(db);
726    db_sql_query(db, query.c_str(), path_handler, this);
727    nb_record = sql_num_rows(db);
728    db_unlock(db);
729
730    return nb_record == limit;
731 }
732
733 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
734                           const char *JobId, const char *PathId,  
735                           const char *filter, int64_t limit, int64_t offset)
736 {
737    if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
738       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
739            JobId, PathId, JobId, PathId, 
740            filter, limit, offset);
741    } else {
742       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
743            JobId, PathId, JobId, PathId, 
744            limit, offset, filter, JobId, JobId);
745    }
746 }
747
748 /* Returns true if we have files to read */
749 bool Bvfs::ls_files()
750 {
751    POOL_MEM query;
752    POOL_MEM filter;
753    char pathid[50];
754
755    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
756    if (*jobids == 0) {
757       return false;
758    }
759
760    if (!pwd_id) {
761       ch_dir(get_root());
762    }
763
764    edit_uint64(pwd_id, pathid);
765    if (*pattern) {
766       Mmsg(filter, " AND Filename.Name %s '%s' ", 
767            match_query[db_get_type_index(db)], pattern);
768    }
769
770    build_ls_files_query(db, query, 
771                         jobids, pathid, filter.c_str(),
772                         limit, offset);
773
774    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
775
776    db_lock(db);
777    db_sql_query(db, query.c_str(), list_entries, user_data);
778    nb_record = sql_num_rows(db);
779    db_unlock(db);
780
781    return nb_record == limit;
782 }
783
784
785 /* 
786  * Return next Id from comma separated list   
787  *
788  * Returns:
789  *   1 if next Id returned
790  *   0 if no more Ids are in list
791  *  -1 there is an error
792  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
793  */
794 static int get_next_id_from_list(char **p, int64_t *Id)
795 {
796    const int maxlen = 30;
797    char id[maxlen+1];
798    char *q = *p;
799
800    id[0] = 0;
801    for (int i=0; i<maxlen; i++) {
802       if (*q == 0) {
803          break;
804       } else if (*q == ',') {
805          q++;
806          break;
807       }
808       id[i] = *q++;
809       id[i+1] = 0;
810    }
811    if (id[0] == 0) {
812       return 0;
813    } else if (!is_a_number(id)) {
814       return -1;                      /* error */
815    }
816    *p = q;
817    *Id = str_to_int64(id);
818    return 1;
819 }
820
821 static int get_path_handler(void *ctx, int fields, char **row)
822 {
823    POOL_MEM *buf = (POOL_MEM *) ctx;
824    pm_strcpy(*buf, row[0]);
825    return 0;
826 }
827
828 static bool check_temp(char *output_table)
829 {
830    if (output_table[0] == 'b' &&
831        output_table[1] == '2' &&
832        is_an_integer(output_table + 2))
833    {
834       return true;
835    }
836    return false;
837 }
838
839 bool Bvfs::drop_restore_list(char *output_table)
840 {
841    POOL_MEM query;
842    if (check_temp(output_table)) {
843       Mmsg(query, "DROP TABLE %s", output_table);
844       db_sql_query(db, query.c_str(), NULL, NULL);
845       return true;
846    }
847    return false;
848 }
849
850 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
851                                 char *output_table)
852 {
853    POOL_MEM query;
854    POOL_MEM tmp, tmp2;
855    int64_t id, jobid;
856    bool init=false;
857    bool ret=false;
858    /* check args */
859    if ((*fileid   && !is_a_number_list(fileid))  ||
860        (*dirid    && !is_a_number_list(dirid))   ||
861        (*hardlink && !is_a_number_list(hardlink))||
862        (!*hardlink && !*fileid && !*dirid && !*hardlink))
863    {
864       return false;
865    }
866    if (!check_temp(output_table)) {
867       return false;
868    }
869
870    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
871
872    if (*fileid) {               /* Select files with their direct id */
873       init=true;
874       Mmsg(tmp,"SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
875                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
876            fileid);
877       pm_strcat(query, tmp.c_str());
878    }
879
880    /* Add a directory content */
881    while (get_next_id_from_list(&dirid, &id) == 1) {
882       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
883       
884       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
885          Dmsg0(dbglevel, "Can't search for path\n");
886          /* print error */
887          return false;
888       }
889       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
890          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
891                id, tmp.c_str(), tmp2.c_str());
892          break;
893       }
894       /* escape % and _ for LIKE search */
895       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
896       char *p = tmp.c_str();
897       for (char *s = tmp2.c_str(); *s ; s++) {
898          if (*s == '%' || *s == '_' || *s == '\\') {
899             *p = '\\'; 
900             p++;
901          }
902          *p = *s; 
903          p++;
904       }
905       *p = '\0';
906       tmp.strcat("%");
907
908       size_t len = strlen(tmp.c_str());
909       tmp2.check_size((len+1) * 2);
910       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
911
912       if (init) {
913          query.strcat(" UNION ");
914       }
915
916       Mmsg(tmp, "SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
917                         "File.PathId, FileId "
918                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
919                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ", 
920            tmp2.c_str(), jobids); 
921       query.strcat(tmp.c_str());
922       init = true;
923
924       query.strcat(" UNION ");
925
926       /* A directory can have files from a BaseJob */
927       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
928                         "File.FilenameId, File.PathId, BaseFiles.FileId "
929                    "FROM BaseFiles "
930                         "JOIN File USING (FileId) "
931                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
932                         "JOIN Path USING (PathId) "
933                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ", 
934            tmp2.c_str(), jobids); 
935       query.strcat(tmp.c_str());
936    }
937
938    /* expect jobid,fileindex */
939    int64_t prev_jobid=0;
940    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
941       if (get_next_id_from_list(&hardlink, &id) != 1) {
942          Dmsg0(dbglevel, "hardlink should be two by two\n");
943          return false;
944       }
945       if (jobid != prev_jobid) { /* new job */
946          if (prev_jobid == 0) {  /* first jobid */
947             if (init) {
948                query.strcat(" UNION ");
949             }
950          } else {               /* end last job, start new one */
951             tmp.strcat(") UNION ");
952             query.strcat(tmp.c_str());
953          }
954          Mmsg(tmp,   "SELECT JobId, JobTDate, FileIndex, FilenameId, "
955                             "PathId, FileId "
956                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
957                         "AND FileIndex IN (%lld", jobid, id);
958          prev_jobid = jobid;
959
960       } else {                  /* same job, add new findex */
961          Mmsg(tmp2, ", %lld", id);
962          tmp.strcat(tmp2.c_str());
963       }
964    }
965
966    if (prev_jobid != 0) {       /* end last job */
967       tmp.strcat(") ");
968       query.strcat(tmp.c_str());
969       init = true;
970    }
971
972    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
973
974    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
975       Dmsg0(dbglevel, "Can't execute q\n");
976       goto bail_out;
977    }
978
979    /* TODO: handle basejob and SQLite3 */
980    Mmsg(query, sql_bvfs_select[db_get_type_index(db)], output_table, output_table);
981
982    /* TODO: handle jobid filter */
983    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
984    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
985       Dmsg0(dbglevel, "Can't execute q\n");
986       goto bail_out;
987    }
988
989    /* MySQL need it */
990    if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
991       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
992            output_table, output_table);
993       Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
994       if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
995          Dmsg0(dbglevel, "Can't execute q\n");
996          goto bail_out;
997       }
998    }
999
1000    ret = true;
1001
1002 bail_out:
1003    Mmsg(query, "DROP TABLE btemp%s", output_table);
1004    db_sql_query(db, query.c_str(), NULL, NULL);
1005    return ret;
1006 }
1007
1008 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */