]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Improve speed of BVFS with SQLite, Thanks to J.Starek
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #include "bacula.h"
30
31 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
32
33 #include "cats.h"
34 #include "bdb_priv.h"
35 #include "sql_glue.h"
36 #include "lib/htable.h"
37 #include "bvfs.h"
38
39 #define dbglevel 10
40 #define dbglevel_sql 15
41
42 static int result_handler(void *ctx, int fields, char **row)
43 {
44    if (fields == 4) {
45       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3]);
47    } else if (fields == 5) {
48       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4]);
50    } else if (fields == 6) {
51       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5]);
53    } else if (fields == 7) {
54       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
55             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
56    }
57    return 0;
58 }
59
60 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
61    jcr = j;
62    jcr->inc_use_count();
63    db = mdb;                 /* need to inc ref count */
64    jobids = get_pool_memory(PM_NAME);
65    prev_dir = get_pool_memory(PM_NAME);
66    pattern = get_pool_memory(PM_NAME);
67    *jobids = *prev_dir = *pattern = 0;
68    dir_filenameid = pwd_id = offset = 0;
69    see_copies = see_all_versions = false;
70    limit = 1000;
71    attr = new_attr(jcr);
72    list_entries = result_handler;
73    user_data = this;
74    username = NULL;
75 }
76
77 Bvfs::~Bvfs() {
78    free_pool_memory(jobids);
79    free_pool_memory(pattern);
80    free_pool_memory(prev_dir);
81    if (username) {
82       free(username);
83    }
84    free_attr(attr);
85    jcr->dec_use_count();
86 }
87
88 void Bvfs::filter_jobid()
89 {
90    if (!username) {
91       return;
92    }
93
94    /* Query used by Bweb to filter clients, activated when using
95     * set_username() 
96     */
97    POOL_MEM query;
98    Mmsg(query,
99       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
100         "JOIN (SELECT ClientId FROM client_group_member "
101         "JOIN client_group USING (client_group_id) "
102         "JOIN bweb_client_group_acl USING (client_group_id) "
103         "JOIN bweb_user USING (userid) "
104        "WHERE bweb_user.username = '%s' "
105       ") AS filter USING (ClientId) "
106         " WHERE JobId IN (%s)", 
107         username, jobids);
108
109    db_list_ctx ctx;
110    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
111    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
112    pm_strcpy(jobids, ctx.list);
113 }
114
115 void Bvfs::set_jobid(JobId_t id)
116 {
117    Mmsg(jobids, "%lld", (uint64_t)id);
118    filter_jobid();
119 }
120
121 void Bvfs::set_jobids(char *ids)
122 {
123    pm_strcpy(jobids, ids);
124    filter_jobid();
125 }
126
127 /* 
128  * TODO: Find a way to let the user choose how he wants to display
129  * files and directories
130  */
131
132
133 /* 
134  * Working Object to store PathId already seen (avoid
135  * database queries), equivalent to %cache_ppathid in perl
136  */
137
138 #define NITEMS 50000
139 class pathid_cache {
140 private:
141    hlink *nodes;
142    int nb_node;
143    int max_node;
144
145    alist *table_node;
146
147    htable *cache_ppathid;
148
149 public:
150    pathid_cache() {
151       hlink link;
152       cache_ppathid = (htable *)malloc(sizeof(htable));
153       cache_ppathid->init(&link, &link, NITEMS);
154       max_node = NITEMS;
155       nodes = (hlink *) malloc(max_node * sizeof (hlink));
156       nb_node = 0;
157       table_node = New(alist(5, owned_by_alist));
158       table_node->append(nodes);
159    }
160
161    hlink *get_hlink() {
162       if (++nb_node >= max_node) {
163          nb_node = 0;
164          nodes = (hlink *)malloc(max_node * sizeof(hlink));
165          table_node->append(nodes);
166       }
167       return nodes + nb_node;
168    }
169
170    bool lookup(char *pathid) {
171       bool ret = cache_ppathid->lookup(pathid) != NULL;
172       return ret;
173    }
174    
175    void insert(char *pathid) {
176       hlink *h = get_hlink();
177       cache_ppathid->insert(pathid, h);
178    }
179
180    ~pathid_cache() {
181       cache_ppathid->destroy();
182       free(cache_ppathid);
183       delete table_node;
184    }
185 private:
186    pathid_cache(const pathid_cache &); /* prohibit pass by value */
187    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
188 } ;
189
190 /* Return the parent_dir with the trailing /  (update the given string)
191  * TODO: see in the rest of bacula if we don't have already this function
192  * dir=/tmp/toto/
193  * dir=/tmp/
194  * dir=/
195  * dir=
196  */
197 char *bvfs_parent_dir(char *path)
198 {
199    char *p = path;
200    int len = strlen(path) - 1;
201
202    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
203       path[len] = '\0';
204    }
205
206    if (len > 0) {
207       p += len;
208       while (p > path && !IsPathSeparator(*p)) {
209          p--;
210       }
211       p[1] = '\0';
212    }
213    return path;
214 }
215
216 /* Return the basename of the with the trailing /
217  * TODO: see in the rest of bacula if we don't have
218  * this function already
219  */
220 char *bvfs_basename_dir(char *path)
221 {
222    char *p = path;
223    int len = strlen(path) - 1;
224
225    if (path[len] == '/') {      /* if directory, skip last / */
226       len -= 1;
227    }
228
229    if (len > 0) {
230       p += len;
231       while (p > path && !IsPathSeparator(*p)) {
232          p--;
233       }
234       if (*p == '/') {
235          p++;                  /* skip first / */
236       }
237    } 
238    return p;
239 }
240
241 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
242                                  pathid_cache &ppathid_cache, 
243                                  char *org_pathid, char *path)
244 {
245    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
246    char pathid[50];
247    ATTR_DBR parent;
248    char *bkp = mdb->path;
249    strncpy(pathid, org_pathid, sizeof(pathid));
250
251    /* Does the ppathid exist for this ? we use a memory cache...  In order to
252     * avoid the full loop, we consider that if a dir is allready in the
253     * PathHierarchy table, then there is no need to calculate all the
254     * hierarchy
255     */
256    while (path && *path)
257    {
258       if (!ppathid_cache.lookup(pathid))
259       {
260          Mmsg(mdb->cmd, 
261               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
262               pathid);
263
264          QUERY_DB(jcr, mdb, mdb->cmd);
265          /* Do we have a result ? */
266          if (sql_num_rows(mdb) > 0) {
267             ppathid_cache.insert(pathid);
268             /* This dir was in the db ...
269              * It means we can leave, the tree has allready been built for
270              * this dir
271              */
272             goto bail_out;
273          } else {
274             /* search or create parent PathId in Path table */
275             mdb->path = bvfs_parent_dir(path);
276             mdb->pnl = strlen(mdb->path);
277             if (!db_create_path_record(jcr, mdb, &parent)) {
278                goto bail_out;
279             }
280             ppathid_cache.insert(pathid);
281             
282             Mmsg(mdb->cmd,
283                  "INSERT INTO PathHierarchy (PathId, PPathId) "
284                  "VALUES (%s,%lld)",
285                  pathid, (uint64_t) parent.PathId);
286             
287             INSERT_DB(jcr, mdb, mdb->cmd);
288
289             edit_uint64(parent.PathId, pathid);
290             path = mdb->path;   /* already done */
291          }
292       } else {
293          /* It's already in the cache.  We can leave, no time to waste here,
294           * all the parent dirs have allready been done
295           */
296          goto bail_out;
297       }
298    }   
299
300 bail_out:
301    mdb->path = bkp;
302    mdb->fnl = 0;
303 }
304
305 /* 
306  * Internal function to update path_hierarchy cache with a shared pathid cache
307  */
308 static void update_path_hierarchy_cache(JCR *jcr,
309                                         B_DB *mdb,
310                                         pathid_cache &ppathid_cache,
311                                         JobId_t JobId)
312 {
313    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
314
315    uint32_t num;
316    char jobid[50];
317    edit_uint64(JobId, jobid);
318  
319    db_lock(mdb);
320    db_start_transaction(jcr, mdb);
321
322    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
323    
324    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
325       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
326       goto bail_out;
327    }
328
329    /* Inserting path records for JobId */
330    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
331                    "SELECT DISTINCT PathId, JobId "
332                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
333                            "UNION "
334                            "SELECT PathId, BaseFiles.JobId "
335                              "FROM BaseFiles JOIN File AS F USING (FileId) "
336                             "WHERE BaseFiles.JobId = %s) AS B",
337         jobid, jobid);
338    QUERY_DB(jcr, mdb, mdb->cmd);
339
340    /* Now we have to do the directory recursion stuff to determine missing
341     * visibility We try to avoid recursion, to be as fast as possible We also
342     * only work on not allready hierarchised directories...
343     */
344    Mmsg(mdb->cmd, 
345      "SELECT PathVisibility.PathId, Path "
346        "FROM PathVisibility "
347             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
348             "LEFT JOIN PathHierarchy "
349          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
350       "WHERE PathVisibility.JobId = %s "
351         "AND PathHierarchy.PathId IS NULL "
352       "ORDER BY Path", jobid);
353    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
354    QUERY_DB(jcr, mdb, mdb->cmd);
355
356    /* TODO: I need to reuse the DB connection without emptying the result 
357     * So, now i'm copying the result in memory to be able to query the
358     * catalog descriptor again.
359     */
360    num = sql_num_rows(mdb);
361    if (num > 0) {
362       char **result = (char **)malloc (num * 2 * sizeof(char *));
363       
364       SQL_ROW row;
365       int i=0;
366       while((row = sql_fetch_row(mdb))) {
367          result[i++] = bstrdup(row[0]);
368          result[i++] = bstrdup(row[1]);
369       }
370       
371       i=0;
372       while (num > 0) {
373          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
374          free(result[i++]);
375          free(result[i++]);
376          num--;
377       }
378       free(result);
379    }
380
381    if (mdb->db_get_type_index() == SQL_TYPE_SQLITE3) {
382       Mmsg(mdb->cmd, 
383  "INSERT INTO PathVisibility (PathId, JobId) "
384    "SELECT DISTINCT h.PPathId AS PathId, %s "
385      "FROM PathHierarchy AS h "
386     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
387       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
388            jobid, jobid, jobid );
389
390    } else {
391       Mmsg(mdb->cmd, 
392   "INSERT INTO PathVisibility (PathId, JobId)  "
393    "SELECT a.PathId,%s "
394    "FROM ( "
395      "SELECT DISTINCT h.PPathId AS PathId "
396        "FROM PathHierarchy AS h "
397        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
398       "WHERE p.JobId=%s) AS a LEFT JOIN "
399        "(SELECT PathId "
400           "FROM PathVisibility "
401          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
402    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
403    }
404
405    do {
406       QUERY_DB(jcr, mdb, mdb->cmd);
407    } while (sql_affected_rows(mdb) > 0);
408    
409    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
410    UPDATE_DB(jcr, mdb, mdb->cmd);
411
412 bail_out:
413    db_end_transaction(jcr, mdb);
414    db_unlock(mdb);
415 }
416
417 /* 
418  * Find an store the filename descriptor for empty directories Filename.Name=''
419  */
420 DBId_t Bvfs::get_dir_filenameid()
421 {
422    uint32_t id;
423    if (dir_filenameid) {
424       return dir_filenameid;
425    }
426    POOL_MEM q;
427    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
428    db_sql_query(db, q.c_str(), db_int_handler, &id);
429    dir_filenameid = id;
430    return dir_filenameid;
431 }
432
433 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
434 {
435    uint32_t nb=0;
436    db_list_ctx jobids_list;
437
438    db_lock(mdb);
439    db_start_transaction(jcr, mdb);
440
441 #ifdef xxx
442    /* TODO: Remove this code when updating make_bacula_table script */
443    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
444    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
445       Dmsg0(dbglevel, "Creating cache table\n");
446       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
447       QUERY_DB(jcr, mdb, mdb->cmd);
448
449       Mmsg(mdb->cmd,
450            "CREATE TABLE PathHierarchy ( "
451            "PathId integer NOT NULL, "
452            "PPathId integer NOT NULL, "
453            "CONSTRAINT pathhierarchy_pkey "
454            "PRIMARY KEY (PathId))");
455       QUERY_DB(jcr, mdb, mdb->cmd); 
456
457       Mmsg(mdb->cmd,
458            "CREATE INDEX pathhierarchy_ppathid "
459            "ON PathHierarchy (PPathId)");
460       QUERY_DB(jcr, mdb, mdb->cmd);
461
462       Mmsg(mdb->cmd, 
463            "CREATE TABLE PathVisibility ("
464            "PathId integer NOT NULL, "
465            "JobId integer NOT NULL, "
466            "Size int8 DEFAULT 0, "
467            "Files int4 DEFAULT 0, "
468            "CONSTRAINT pathvisibility_pkey "
469            "PRIMARY KEY (JobId, PathId))");
470       QUERY_DB(jcr, mdb, mdb->cmd);
471
472       Mmsg(mdb->cmd, 
473            "CREATE INDEX pathvisibility_jobid "
474            "ON PathVisibility (JobId)");
475       QUERY_DB(jcr, mdb, mdb->cmd);
476
477    }
478 #endif
479
480    Mmsg(mdb->cmd, 
481  "SELECT JobId from Job "
482   "WHERE HasCache = 0 "
483     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
484   "ORDER BY JobId");
485
486    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
487
488    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
489
490    db_end_transaction(jcr, mdb);
491    db_start_transaction(jcr, mdb);
492    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
493    Mmsg(mdb->cmd, 
494         "DELETE FROM PathVisibility "
495          "WHERE NOT EXISTS "
496         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
497    nb = DELETE_DB(jcr, mdb, mdb->cmd);
498    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
499
500    db_end_transaction(jcr, mdb);
501    db_unlock(mdb);
502 }
503
504 /*
505  * Update the bvfs cache for given jobids (1,2,3,4)
506  */
507 void
508 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
509 {
510    pathid_cache ppathid_cache;
511    JobId_t JobId;
512    char *p;
513
514    for (p=jobids; ; ) {
515       int stat = get_next_jobid_from_list(&p, &JobId);
516       if (stat < 0) {
517          return;
518       }
519       if (stat == 0) {
520          break;
521       }
522       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
523       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
524    }
525 }
526
527 /* 
528  * Update the bvfs cache for current jobids
529  */
530 void Bvfs::update_cache()
531 {
532    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
533 }
534
535 /* Change the current directory, returns true if the path exists */
536 bool Bvfs::ch_dir(const char *path)
537 {
538    pm_strcpy(db->path, path);
539    db->pnl = strlen(db->path);
540    ch_dir(db_get_path_record(jcr, db)); 
541    return pwd_id != 0;
542 }
543
544 /* 
545  * Get all file versions for a specified client
546  * TODO: Handle basejobs using different client
547  */
548 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
549 {
550    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
551          (uint64_t)fnid, client);
552    char ed1[50], ed2[50];
553    POOL_MEM q;
554    if (see_copies) {
555       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
556    } else {
557       Mmsg(q, " AND Job.Type = 'B' ");
558    }
559
560    POOL_MEM query;
561
562    Mmsg(query,//    1           2              3       
563 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
564 //         4          5           6
565         "File.JobId, File.LStat, File.FileId, "
566 //         7                    8
567        "Media.VolumeName, Media.InChanger "
568 "FROM File, Job, Client, JobMedia, Media "
569 "WHERE File.FilenameId = %s "
570   "AND File.PathId=%s "
571   "AND File.JobId = Job.JobId "
572   "AND Job.JobId = JobMedia.JobId "
573   "AND File.FileIndex >= JobMedia.FirstIndex "
574   "AND File.FileIndex <= JobMedia.LastIndex "
575   "AND JobMedia.MediaId = Media.MediaId "
576   "AND Job.ClientId = Client.ClientId "
577   "AND Client.Name = '%s' "
578   "%s ORDER BY FileId LIMIT %d OFFSET %d"
579         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
580         limit, offset);
581    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
582    db_sql_query(db, query.c_str(), list_entries, user_data);
583 }
584
585 DBId_t Bvfs::get_root()
586 {
587    *db->path = 0;
588    return db_get_path_record(jcr, db);
589 }
590
591 static int path_handler(void *ctx, int fields, char **row)
592 {
593    Bvfs *fs = (Bvfs *) ctx;
594    return fs->_handle_path(ctx, fields, row);
595 }
596
597 int Bvfs::_handle_path(void *ctx, int fields, char **row)
598 {
599    if (bvfs_is_dir(row)) {
600       /* can have the same path 2 times */
601       if (strcmp(row[BVFS_Name], prev_dir)) {
602          pm_strcpy(prev_dir, row[BVFS_Name]);
603          return list_entries(user_data, fields, row);
604       }
605    }
606    return 0;
607 }
608
609 /* 
610  * Retrieve . and .. information
611  */
612 void Bvfs::ls_special_dirs()
613 {
614    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
615    char ed1[50], ed2[50];
616    if (*jobids == 0) {
617       return;
618    }
619    if (!dir_filenameid) {
620       get_dir_filenameid();
621    }
622
623    /* Will fetch directories  */
624    *prev_dir = 0;
625
626    POOL_MEM query;
627    Mmsg(query, 
628 "(SELECT PPathId AS PathId, '..' AS Path "
629     "FROM  PathHierarchy "
630    "WHERE  PathId = %s "
631 "UNION "
632  "SELECT %s AS PathId, '.' AS Path)",
633         edit_uint64(pwd_id, ed1), ed1);
634
635    POOL_MEM query2;
636    Mmsg(query2,// 1      2     3        4     5       6
637 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
638   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
639        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
640               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
641        "WHERE File1.FilenameId = %s "
642        "AND File1.JobId IN (%s)) AS listfile1 "
643   "ON (tmp.PathId = listfile1.PathId) "
644   "ORDER BY tmp.Path, JobId DESC ",
645         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
646
647    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
648    db_sql_query(db, query2.c_str(), path_handler, this);
649 }
650
651 /* Returns true if we have dirs to read */
652 bool Bvfs::ls_dirs()
653 {
654    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
655    char ed1[50], ed2[50];
656    if (*jobids == 0) {
657       return false;
658    }
659
660    POOL_MEM query;
661    POOL_MEM filter;
662    if (*pattern) {
663       int len = strlen(pattern);
664       query.check_size(len*2+1);
665       db_escape_string(jcr, db, query.c_str(), pattern, len);
666       Mmsg(filter, " AND Path2.Path %s '%s' ", match_query[db_get_type_index(db)], query.c_str());
667    }
668
669    if (!dir_filenameid) {
670       get_dir_filenameid();
671    }
672
673    /* the sql query displays same directory multiple time, take the first one */
674    *prev_dir = 0;
675
676    /* Let's retrieve the list of the visible dirs in this dir ...
677     * First, I need the empty filenameid to locate efficiently
678     * the dirs in the file table
679     * my $dir_filenameid = $self->get_dir_filenameid();
680     */
681    /* Then we get all the dir entries from File ... */
682    Mmsg(query,
683 //       0     1     2   3      4     5       6
684 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
685     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
686            "lower(Path1.Path) AS lpath, "
687            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
688            "listfile1.FileId AS FileId "
689     "FROM ( "
690       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
691       "FROM PathHierarchy AS PathHierarchy1 "
692       "JOIN Path AS Path2 "
693         "ON (PathHierarchy1.PathId = Path2.PathId) "
694       "JOIN PathVisibility AS PathVisibility1 "
695         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
696       "WHERE PathHierarchy1.PPathId = %s "
697       "AND PathVisibility1.JobId IN (%s) "
698            "%s "
699      ") AS listpath1 "
700    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
701
702    "LEFT JOIN ( " /* get attributes if any */
703        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
704               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
705        "WHERE File1.FilenameId = %s "
706        "AND File1.JobId IN (%s)) AS listfile1 "
707        "ON (listpath1.PathId = listfile1.PathId) "
708     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
709         edit_uint64(pwd_id, ed1),
710         jobids,
711         filter.c_str(),
712         edit_uint64(dir_filenameid, ed2),
713         jobids,
714         limit, offset);
715
716    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
717
718    db_lock(db);
719    db_sql_query(db, query.c_str(), path_handler, this);
720    nb_record = sql_num_rows(db);
721    db_unlock(db);
722
723    return nb_record == limit;
724 }
725
726 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
727                           const char *JobId, const char *PathId,  
728                           const char *filter, int64_t limit, int64_t offset)
729 {
730    if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
731       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
732            JobId, PathId, JobId, PathId, 
733            filter, limit, offset);
734    } else {
735       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
736            JobId, PathId, JobId, PathId, 
737            limit, offset, filter, JobId, JobId);
738    }
739 }
740
741 /* Returns true if we have files to read */
742 bool Bvfs::ls_files()
743 {
744    POOL_MEM query;
745    POOL_MEM filter;
746    char pathid[50];
747
748    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
749    if (*jobids == 0) {
750       return false;
751    }
752
753    if (!pwd_id) {
754       ch_dir(get_root());
755    }
756
757    edit_uint64(pwd_id, pathid);
758    if (*pattern) {
759       int len = strlen(pattern);
760       query.check_size(len*2+1);
761       db_escape_string(jcr, db, query.c_str(), pattern, len);
762       Mmsg(filter, " AND Filename.Name %s '%s' ", match_query[db_get_type_index(db)], query.c_str());
763    }
764
765    build_ls_files_query(db, query, 
766                         jobids, pathid, filter.c_str(),
767                         limit, offset);
768
769    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
770
771    db_lock(db);
772    db_sql_query(db, query.c_str(), list_entries, user_data);
773    nb_record = sql_num_rows(db);
774    db_unlock(db);
775
776    return nb_record == limit;
777 }
778
779
780 /* 
781  * Return next Id from comma separated list   
782  *
783  * Returns:
784  *   1 if next Id returned
785  *   0 if no more Ids are in list
786  *  -1 there is an error
787  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
788  */
789 static int get_next_id_from_list(char **p, int64_t *Id)
790 {
791    const int maxlen = 30;
792    char id[maxlen+1];
793    char *q = *p;
794
795    id[0] = 0;
796    for (int i=0; i<maxlen; i++) {
797       if (*q == 0) {
798          break;
799       } else if (*q == ',') {
800          q++;
801          break;
802       }
803       id[i] = *q++;
804       id[i+1] = 0;
805    }
806    if (id[0] == 0) {
807       return 0;
808    } else if (!is_a_number(id)) {
809       return -1;                      /* error */
810    }
811    *p = q;
812    *Id = str_to_int64(id);
813    return 1;
814 }
815
816 static int get_path_handler(void *ctx, int fields, char **row)
817 {
818    POOL_MEM *buf = (POOL_MEM *) ctx;
819    pm_strcpy(*buf, row[0]);
820    return 0;
821 }
822
823 static bool check_temp(char *output_table)
824 {
825    if (output_table[0] == 'b' &&
826        output_table[1] == '2' &&
827        is_an_integer(output_table + 2))
828    {
829       return true;
830    }
831    return false;
832 }
833
834 bool Bvfs::drop_restore_list(char *output_table)
835 {
836    POOL_MEM query;
837    if (check_temp(output_table)) {
838       Mmsg(query, "DROP TABLE %s", output_table);
839       db_sql_query(db, query.c_str(), NULL, NULL);
840       return true;
841    }
842    return false;
843 }
844
845 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
846                                 char *output_table)
847 {
848    POOL_MEM query;
849    POOL_MEM tmp, tmp2;
850    int64_t id, jobid;
851    bool init=false;
852    bool ret=false;
853    /* check args */
854    if ((*fileid   && !is_a_number_list(fileid))  ||
855        (*dirid    && !is_a_number_list(dirid))   ||
856        (*hardlink && !is_a_number_list(hardlink))||
857        (!*hardlink && !*fileid && !*dirid && !*hardlink))
858    {
859       return false;
860    }
861    if (!check_temp(output_table)) {
862       return false;
863    }
864
865    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
866
867    if (*fileid) {               /* Select files with their direct id */
868       init=true;
869       Mmsg(tmp,"SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
870                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
871            fileid);
872       pm_strcat(query, tmp.c_str());
873    }
874
875    /* Add a directory content */
876    while (get_next_id_from_list(&dirid, &id) == 1) {
877       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
878       
879       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
880          Dmsg0(dbglevel, "Can't search for path\n");
881          /* print error */
882          return false;
883       }
884       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
885          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
886                id, tmp.c_str(), tmp2.c_str());
887          break;
888       }
889       /* escape % and _ for LIKE search */
890       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
891       char *p = tmp.c_str();
892       for (char *s = tmp2.c_str(); *s ; s++) {
893          if (*s == '%' || *s == '_' || *s == '\\') {
894             *p = '\\'; 
895             p++;
896          }
897          *p = *s; 
898          p++;
899       }
900       *p = '\0';
901       tmp.strcat("%");
902
903       size_t len = strlen(tmp.c_str());
904       tmp2.check_size((len+1) * 2);
905       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
906
907       if (init) {
908          query.strcat(" UNION ");
909       }
910
911       Mmsg(tmp, "SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
912                         "File.PathId, FileId "
913                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
914                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ", 
915            tmp2.c_str(), jobids); 
916       query.strcat(tmp.c_str());
917       init = true;
918
919       query.strcat(" UNION ");
920
921       /* A directory can have files from a BaseJob */
922       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
923                         "File.FilenameId, File.PathId, BaseFiles.FileId "
924                    "FROM BaseFiles "
925                         "JOIN File USING (FileId) "
926                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
927                         "JOIN Path USING (PathId) "
928                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ", 
929            tmp2.c_str(), jobids); 
930       query.strcat(tmp.c_str());
931    }
932
933    /* expect jobid,fileindex */
934    int64_t prev_jobid=0;
935    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
936       if (get_next_id_from_list(&hardlink, &id) != 1) {
937          Dmsg0(dbglevel, "hardlink should be two by two\n");
938          return false;
939       }
940       if (jobid != prev_jobid) { /* new job */
941          if (prev_jobid == 0) {  /* first jobid */
942             if (init) {
943                query.strcat(" UNION ");
944             }
945          } else {               /* end last job, start new one */
946             tmp.strcat(") UNION ");
947             query.strcat(tmp.c_str());
948          }
949          Mmsg(tmp,   "SELECT JobId, JobTDate, FileIndex, FilenameId, "
950                             "PathId, FileId "
951                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
952                         "AND FileIndex IN (%lld", jobid, id);
953          prev_jobid = jobid;
954
955       } else {                  /* same job, add new findex */
956          Mmsg(tmp2, ", %lld", id);
957          tmp.strcat(tmp2.c_str());
958       }
959    }
960
961    if (prev_jobid != 0) {       /* end last job */
962       tmp.strcat(") ");
963       query.strcat(tmp.c_str());
964       init = true;
965    }
966
967    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
968
969    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
970       Dmsg0(dbglevel, "Can't execute q\n");
971       goto bail_out;
972    }
973
974    /* TODO: handle basejob and SQLite3 */
975    Mmsg(query, sql_bvfs_select[db_get_type_index(db)], output_table, output_table);
976
977    /* TODO: handle jobid filter */
978    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
979    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
980       Dmsg0(dbglevel, "Can't execute q\n");
981       goto bail_out;
982    }
983
984    /* MySQL need it */
985    if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
986       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
987            output_table, output_table);
988       Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
989       if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
990          Dmsg0(dbglevel, "Can't execute q\n");
991          goto bail_out;
992       }
993    }
994
995    ret = true;
996
997 bail_out:
998    Mmsg(query, "DROP TABLE btemp%s", output_table);
999    db_sql_query(db, query.c_str(), NULL, NULL);
1000    return ret;
1001 }
1002
1003 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */